{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-| Module : TimeSeries.Routing Copyright : (C) 2013 Parallel Scientific Labs, LLC License : GPL-2 Stability : experimental Portability : non-portable Data routing for time series analysis. -} module TimeSeries.Routing where import Control.Monad.State import Control.Monad.Writer import Data.List (intersperse) import Data.Word import Data.IntMap (IntMap) import Data.Sequence (Seq) import System.IO import qualified Data.Foldable as F import qualified Data.IntMap as IM import qualified Data.Sequence as Seq import TimeSeries.Window (Window, RandomVector(..), Size) import qualified TimeSeries.Window as W import qualified TimeSeries.Correlation as Crr -- -------------------------------------------------------------------------- -- * Types -- | Configuration values for detecting correlations from -- uncooperative time series data. -- -- This data type shall relate to bootstrapping in future, but at the -- moment, nothing related. -- data Config = Config { -- | Number of random elements in sketch. sketchSize :: Int -- | Number of subsequences for grouping sketch. , sketchGroups :: Word8 -- | Cutoff value between of correlation, between 0 to 1. , corrCutoff :: Double -- | Size of sliding window. , swSize :: Word64 -- | Size of basic window. , bwSize :: Word64 -- | Seed for random vectors. , randSeed :: Integer -- | Implementation. , impls :: Implementation -- | To show trace output or not. , verbose :: Bool } deriving (Eq, Show) -- | Representing basic windows as 'IntMap' of 'Seq' of 'Window's. -- -- Key of outer 'IntMap' is ID for concurrent input stream, number of -- keys matches to number of concurrent input stream. Inner 'Seq' -- is indexed basic window within sliding window, number of elements -- matches to /nb/, where `nb = sw / bw`, /sw/ is sliding window size -- and /bw/ is basic window size. -- type BasicWindows = IntMap (Seq Window) -- | Summary of sliding window. -- -- We don't need to maintain whole basic windows when sketch -- implementation is the only concern. To show correlation values with -- direct function, preserving the whole basic window contents. From -- \"3.5 The Issues in Implementation section\": -- -- /... We need to maintain/ -- -- * ∑i=0,nb-1(sum(Xbwⁱ)) -- -- * ∑i=0,nb-1(sum((Xbwⁱ)²)) -- -- /for a sliding window .../ -- data Summary = Summary { -- | Sum of elements in sliding window. -- In other words, @∑i=0,nb-1(sum(Xbwⁱ))@. smrSum :: Double -- | Sum of squared elements in sliding window. -- In other words, @∑i=0,nb-1(sum((Xbwⁱ)²))@. , smrSumSq :: Double -- | Sketch vector, size of this window matches to sketch size. , smrSketch :: Window -- | Pre-sketches. \"/Pre/\" means that those sketch values -- before applying dot product with control vector. , smrPreSketches :: [Window] -- | Summaries for basic windows, length of 'Seq' is /nb/, where -- @nb = sliding_window_size / basic_window_size@. , smrBasicWindows :: Seq BWSummary } deriving (Eq, Show) -- | Summary of basic window. -- -- From \"3.5 The Issues in Implementation section\": -- -- /... We need to maintain/ -- -- * Xbwⁱ⋅R -- -- * sum(Xbwⁱ) -- -- * sum((Xbwⁱ)²) -- -- /for each basic window./ -- data BWSummary = BWSummary { -- | Sketch of basic window, currently not in use. bwSketch :: Window -- | Sum of elements in basic window. , bwsSum :: Double -- | Sum of squared elements in basic window. , bwsSumSq :: Double } deriving (Eq, Show) {- XXX: Are those fields in 'Summary' and 'BWSummry' enough? Update whenever found more needs.... -} -- | State stored in analysis system. -- -- Later this shall relate to memory data in hardware implementation, -- but at the moment its 100% Haskell data type and values. -- data SysState = SysState { sysBasicWindows :: BasicWindows , sysRandomVectors :: IntMap RandomVector , sysAppendCount :: Word64 , sysCurrentTime :: Word64 , sysNumberOfBasicWindows :: Word64 , sysIsReadyToTell :: Bool , sysSummaries :: IntMap Summary , sysConfig :: Config } deriving (Eq, Show) -- XXX: There are more data to hold in state, more parameters to configure. -- Just enough to starting the main loop .... -- | Result of correlation analysis. -- -- Inspired from output found in \"statStream\". See: -- -- * <http://cs.nyu.edu/cs/faculty/shasha/papers/statstream/doc.html> -- data CorrResult = CorrResult { -- | Index of time series input A. crIndexA :: Int -- | Index of time series input B. , crIndexB :: Int -- | Start time of base window. , crStartTime :: Word64 -- | 'crTimeStart' + (size of base window) , crEndTime :: Word64 -- | A value between -1 to 1. , crValue :: Double } deriving (Eq, Show) -- | Implementations. -- -- As for a prototype, used to analyse and compre resulting values for -- different implementations. -- data Implementation = Direct -- ^ Direct correlation computation. | Sketch -- ^ Sketch based computation. deriving (Eq, Show, Read) -- | Loop for analysing correlation of input data and updating -- analysis results and windowed data. newtype Loop a = Loop {unLoop :: StateT SysState (Writer [Either String CorrResult]) a} deriving ( Functor, Monad, MonadState SysState , MonadWriter [Either String CorrResult] ) -- -------------------------------------------------------------------------- -- * Looping actions -- -- $ Looping with 'State' and 'Writer'. -- -- | The main loop of time series analysis. -- -- Specifying number of concurrent time series data in this function. -- In hardware implementation, this may relates to fixed value, which -- possibly been configured at the time of code generation. -- loop :: Handle -- ^ Where to show results. -> Config -- ^ Configuration values used for computation. -> [[Double]] -- ^ Input values. -> IO () loop hdl conf ins0 = go st0 ins0 where nTimeSeries = length (head ins0) st0 = initialSysState conf nTimeSeries go st ins = case ins of [] -> putStrLn "done." ds:dss -> do let ((_,st'),res) = runLoop (step conf ds) st mapM_ (hPutStrLn hdl . either id simpleCorrResult) res go st' dss -- | Unwrap 'Loop', run internal 'State' and 'Writer'. runLoop :: Loop a -> SysState -> ((a, SysState), [Either String CorrResult]) runLoop (Loop lp) = runWriter . runStateT lp -- | Single step to take with new input data. -- -- Handle state management and if found any, report analysis result. -- step :: Config -> [Double] -> Loop () step Config{..} values = do -- What we do with new inputs. updateStates values -- Notify correlation value. -- Some of the state may relate to recursive values in FPGA registers, -- some may relate to contents of external memory, in hardware -- implementation. notify <- sysIsReadyToTell `fmap` get -- In later phase, 'tell' below should be a response to external -- call for getting analysis result. when notify $ do -- Update summaries used for standardization of sketch distances, -- and shift basic windows when basic window is full state. updateSummariesAndShift -- ... And get state. SysState{..} <- get -- Computation of correlation with current system state. let results = corrPermute sysCurrentTime bwSize swSize sysBasicWindows sysSummaries tell $ concatMap (map Right . filterImpl impls corrCutoff) results -- For tracing. when verbose $ do -- Differences of direct correlation and sketch correlation. -- tell [ Left "\nDiffs:" -- , let diff :: Double -> Double -> String -- diff x y = printf "%.22f" (abs (x-y)) -- ds = map (uncurry (diff `on` crValue)) results -- in Left (unlines $ map (" " ++) ds) -- ] -- Averages and variances of sketch. tell [ Left "\nIntMap (avg,var) of sketches:" , let av :: Summary -> (Double,Double) av Summary{..} = (W.average smrSketch, W.variance smrSketch) in Left (IM.showTree $ IM.map av sysSummaries) ] -- | Filter results with implementation and cutoff value. filterImpl :: Implementation -- ^ Which result to choose. -> Double -- ^ Cutoff value. -> (CorrResult,CorrResult) -- ^ (Direct correlation result, sketch correlation result). -> [CorrResult] filterImpl impl coff (d,s) = case impl of Direct | abs (crValue d) > coff -> [d] Sketch | abs (crValue s) > coff -> [s] _ -> [] -- | Compute correlations. corrPermute :: Word64 -- ^ System current time. -- ... What is the time format used in hardware implementation? -> Word64 -- ^ Basic window size. -> Word64 -- ^ Sliding window size. -> BasicWindows -- ^ Basic windows. -> IntMap Summary -- ^ 'Summary' for each input stream. -> [(CorrResult,CorrResult)] -- ^ Direct result and sketch result. corrPermute time bwsz swsz im smrs = let im' = IM.toList $ slidingWindow swsz im f (mykey,mywin) = concatMap g im' where g (otherkey,otherwin) | mykey < otherkey = [(directCrr,sketchCrr)] | otherwise = [] where directCrr = crr $ Crr.direct mywin otherwin sketchCrr = crr $ Crr.sketch mysketch othersketch crr = CorrResult mykey otherkey startTime endTime startTime = time - bwsz + 1 endTime = time _mytup = (mysketch,mysum,mysumsq) mysum = smrSum (smrs IM.! mykey) mysumsq = smrSumSq (smrs IM.! mykey) _othertup = (othersketch,othersum,othersumsq) othersum = smrSum (smrs IM.! otherkey) othersumsq = smrSumSq (smrs IM.! otherkey) -- sw' = fromIntegral swsz mysketch = smrSketch (smrs IM.! mykey) othersketch = smrSketch (smrs IM.! otherkey) in concatMap f im' -- | Struct sliding windows from basic windows. -- -- This function is not needed in hardware implementation, should used -- for direct calculation only. -- slidingWindow :: Word64 -> BasicWindows -> IntMap Window slidingWindow swsz bw = fmap (F.foldr W.append (W.empty swsz)) bw -- | Adding new elements to windows, removing old elements. updateStates :: [Double] -> Loop () updateStates values = modify $ \st@SysState{..} -> let values' = IM.fromList . zip [0..] . map (Seq.singleton . W.singleton) $ values count = (sysAppendCount+1) `mod` bwSize sysConfig shifting = count == (bwSize sysConfig - 1) bwin = IM.unionWith consBW values' sysBasicWindows vmap = IM.fromList $ zip [0..] values -- Sum and sum of squares in head basic window are updated in -- smry'. smry' = IM.mapWithKey f sysSummaries f k s@Summary{..} = let v = vmap IM.! k sbw0 = Seq.index smrBasicWindows 0 sbw0' = sbw0 { bwsSum = bwsSum sbw0 + v , bwsSumSq = bwsSumSq sbw0 + v^(2::Int) } sbw' = Seq.update 0 sbw0' smrBasicWindows in s {smrBasicWindows = sbw'} in st { sysBasicWindows = bwin , sysAppendCount = count , sysIsReadyToTell = shifting , sysCurrentTime = sysCurrentTime + 1 , sysSummaries = smry' } -- | Append first argument to second argument. consBW :: Seq Window -- ^ Singleton 'Seq' of singleton 'Window'. -> Seq Window -- ^ Basic windows in system status. -> Seq Window consBW a b = b' where a0 = Seq.index a 0 b0 = Seq.index b 0 b' = W.append a0 b0 Seq.<| Seq.drop 1 b -- | Update sum of elements in sliding window, and sum of squared -- elements in sliding window, and shift contents of basic windows. updateSummariesAndShift :: Loop () updateSummariesAndShift = modify $ \st@SysState{..} -> let -- Popping out last element, pushing size /bw/ zero filled window to -- store incoming data. bws = IM.map (rotateSeq (W.empty (bwSize sysConfig))) sysBasicWindows -- Word64 to Int conversion. nb = fromIntegral sysNumberOfBasicWindows -- Updating sums and sum of squares. smry' = IM.mapWithKey f sysSummaries f k s = s { smrSum = sum' , smrSumSq = sumsq' , smrBasicWindows = smrbws' , smrSketch = sketch' , smrPreSketches = preSketches' } where -- Sum and sum of squares, of elements in sliding window. sum' = smrSum s - bwsSum bwsmrynb + bwsSum bwsmry0 sumsq' = smrSumSq s - bwsSumSq bwsmrynb + bwsSumSq bwsmry0 -- Basic window summaries. bwsmry0 = Seq.index smrbws 0 bwsmrynb = Seq.index smrbws (nb-1) smrbws = smrBasicWindows s bws' = sysBasicWindows IM.! k -- Sketch of sliding window (Naively standardized) -- Sums and sums of squares are not used .... sketch' = W.fromList [(xi-mu)/sigma|xi<-sks] sks = zipWith W.dotp cs preSketches' mu = W.average $ W.fromList sks sigma = W.variance $ W.fromList sks -- Using these 'mu' and 'sigma', symmetrical shape is -- preserved but scaling is inappropriate... -- -- mu = (sum'/swSize') -- sigma = (sumsq'/swSize') - mu^(2::Int) -- swSize' = fromIntegral (swSize sysConfig) -- Summaries of basic windows. smrbws' = rotateSeq iniBW smrbws iniBW = initialBWSummary (fromIntegral $ sketchSize sysConfig) -- Pre-sketches, stored for next looping step. preSketches' :: [Window] preSketches' = zipWith (\a b -> W.append (W.singleton a) b) (W.toList $ W.dotps urs bw0) (smrPreSketches s) -- New basic window. bw0 :: Window bw0 = Seq.index bws' 0 -- Unit random vectors. urs :: [Window] urs = map unitRV . IM.elems $ sysRandomVectors -- Control vectors. cs :: [Window] cs = map controlRV . IM.elems $ sysRandomVectors in st { sysBasicWindows = bws , sysSummaries = smry' } -- | Cons given element to given 'Seq' and remove last element in -- 'Seq'. rotateSeq :: a -> Seq a -> Seq a rotateSeq x sq = x Seq.<| Seq.take (Seq.length sq - 1) sq {- XXX: * Do 'incremental approach' described in Chapter 4 of 'High Performance Algorithms for ... ' pdf to update state. * Use partitioned sketches, with parameters: N, d, c, and f. * Make code closer to model hardware implementation. * Asynchrnous correlation (i.e. correlations between time shifted windows) are not computed. -} -- -------------------------------------------------------------------------- -- * Initial values -- | Initial state. -- -- Other than 'Config', number of concurrent time series input data is -- passed. -- initialSysState :: Config -> Int -> SysState initialSysState conf@Config{..} numTimeSeries = SysState { sysBasicWindows = initialBasicWindows bwSize nb numTimeSeries , sysRandomVectors = randomVectors randSeed bwSize swSize sketchSize , sysAppendCount = 0 , sysNumberOfBasicWindows = swSize `div` bwSize , sysIsReadyToTell = False , sysCurrentTime = 0 , sysSummaries = initialSummaries conf numTimeSeries , sysConfig = conf } where nb = swSize `div` bwSize initialSlidingWindows :: Size -> Int -> IntMap Window initialSlidingWindows sz n = IM.fromList $ zip [0..n-1] (repeat $ W.empty sz) initialBasicWindows :: Size -> Size -> Int -> BasicWindows initialBasicWindows sz nb n = IM.fromList $ zip [0..n-1] (repeat sw0) where sw0 = Seq.fromList (replicate (fromIntegral nb) bw0) bw0 = W.empty sz initialSummaries :: Config -- ^ Main configuration. -> Int -- ^ Number of concurrent input streams. -> IntMap Summary initialSummaries Config{..} nt = IM.fromList $ zip [0..nt-1] smrs where smrs = repeat $ Summary { smrSum = 0 , smrSumSq = 0 , smrSketch = W.empty (fromIntegral sketchSize) , smrPreSketches = replicate sketchSize (W.empty nb) , smrBasicWindows = Seq.fromList . replicate nb . initialBWSummary $ fromIntegral sketchSize } nb :: Num a => a nb = fromIntegral (swSize `div` bwSize) -- | Initial basic window summary data. initialBWSummary :: Size -- ^ Sketch size. -> BWSummary initialBWSummary d = BWSummary { bwSketch = W.empty d , bwsSum = 0 , bwsSumSq = 0 } -- | Initial random vectors. randomVectors :: Integer -- ^ Random seed. -> Size -- ^ Basic window size. -> Size -- ^ Sliding window size. -> Int -- ^ Sketch size. -> IntMap RandomVector randomVectors seed bw sw d = IM.fromList wins where wins = zip ks rs ks = [0..d-1] rs = [W.randomVector (fromIntegral k + seed) bw nb | k <- ks] nb = fromIntegral (sw `div` bw) -- -------------------------------------------------------------------------- -- * Pretty printer -- -- $ Actually, not much pretty. -- | Simplified string representation of 'CorrResult'. -- -- >>> simpleCorrResult (CorrResult 1 2 10 20 0.5) -- "1, 2, 10, 20, 0.5" -- simpleCorrResult :: CorrResult -> String simpleCorrResult CorrResult{..} = concat $ intersperse ", " [ show crIndexA, show crIndexB , show crStartTime, show crEndTime , show crValue ]