{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
Module      : TimeSeries.Routing
Copyright   : (C) 2013 Parallel Scientific Labs, LLC
License     : GPL-2
Stability   : experimental
Portability : non-portable

Data routing for time series analysis.

module TimeSeries.Routing where

import Control.Monad.State
import Control.Monad.Writer
import Data.List (intersperse)
import Data.Word
import Data.IntMap (IntMap)
import Data.Sequence (Seq)
import System.IO
import qualified Data.Foldable as F
import qualified Data.IntMap as IM
import qualified Data.Sequence as Seq

import TimeSeries.Window (Window, RandomVector(..), Size)
import qualified TimeSeries.Window as W
import qualified TimeSeries.Correlation as Crr

-- --------------------------------------------------------------------------
-- * Types

-- | Configuration values for detecting correlations from
-- uncooperative time series data.
-- This data type shall relate to bootstrapping in future, but at the
-- moment, nothing related.
data Config = Config
  { -- | Number of random elements in sketch.
    sketchSize :: Int
    -- | Number of subsequences for grouping sketch.
  , sketchGroups :: Word8
    -- | Cutoff value between of correlation, between 0 to 1.
  , corrCutoff :: Double
    -- | Size of sliding window.
  , swSize :: Word64
    -- | Size of basic window.
  , bwSize :: Word64
    -- | Seed for random vectors.
  , randSeed :: Integer
    -- | Implementation.
  , impls :: Implementation
    -- | To show trace output or not.
  , verbose :: Bool
  } deriving (Eq, Show)

-- | Representing basic windows as 'IntMap' of 'Seq' of 'Window's.
-- Key of outer 'IntMap' is ID for concurrent input stream, number of
-- keys matches to number of concurrent input stream. Inner 'Seq'
-- is indexed basic window within sliding window, number of elements
-- matches to /nb/, where `nb = sw / bw`, /sw/ is sliding window size
-- and /bw/ is basic window size.
type BasicWindows = IntMap (Seq Window)

-- | Summary of sliding window.
-- We don't need to maintain whole basic windows when sketch
-- implementation is the only concern. To show correlation values with
-- direct function, preserving the whole basic window contents. From
-- \"3.5 The Issues in Implementation section\":
-- /... We need to maintain/
-- * ∑i=0,nb-1(sum(Xbwⁱ))
-- * ∑i=0,nb-1(sum((Xbwⁱ)²))
-- /for a sliding window .../
data Summary = Summary
  { -- | Sum of elements in sliding window.
    -- In other words, @∑i=0,nb-1(sum(Xbwⁱ))@.
    smrSum :: Double
    -- | Sum of squared elements in sliding window.
    -- In other words, @∑i=0,nb-1(sum((Xbwⁱ)²))@.
  , smrSumSq :: Double
    -- | Sketch vector, size of this window matches to sketch size.
  , smrSketch :: Window
    -- | Pre-sketches. \"/Pre/\" means that those sketch values
    -- before applying dot product with control vector.
  , smrPreSketches :: [Window]
    -- | Summaries for basic windows, length of 'Seq' is /nb/, where
    -- @nb = sliding_window_size / basic_window_size@.
  , smrBasicWindows :: Seq BWSummary
  } deriving (Eq, Show)

-- | Summary of basic window.
-- From \"3.5 The Issues in Implementation section\":
-- /... We need to maintain/
-- * Xbwⁱ⋅R
-- * sum(Xbwⁱ)
-- * sum((Xbwⁱ)²)
-- /for each basic window./
data BWSummary = BWSummary
  { -- | Sketch of basic window, currently not in use.
    bwSketch :: Window
    -- | Sum of elements in basic window.
  , bwsSum :: Double
    -- | Sum of squared elements in basic window.
  , bwsSumSq :: Double
  } deriving (Eq, Show)

XXX: Are those fields in 'Summary' and 'BWSummry' enough?
Update whenever found more needs....

-- | State stored in analysis system.
-- Later this shall relate to memory data in hardware implementation,
-- but at the moment its 100% Haskell data type and values.
data SysState = SysState
  { sysBasicWindows :: BasicWindows
  , sysRandomVectors :: IntMap RandomVector
  , sysAppendCount :: Word64
  , sysCurrentTime :: Word64
  , sysNumberOfBasicWindows :: Word64
  , sysIsReadyToTell :: Bool
  , sysSummaries :: IntMap Summary
  , sysConfig :: Config
  } deriving (Eq, Show)

-- XXX: There are more data to hold in state, more parameters to configure.
-- Just enough to starting the main loop ....

-- | Result of correlation analysis.
-- Inspired from output found in \"statStream\". See:
-- * <http://cs.nyu.edu/cs/faculty/shasha/papers/statstream/doc.html>
data CorrResult = CorrResult
  { -- | Index of time series input A.
    crIndexA :: Int
    -- | Index of time series input B.
  , crIndexB :: Int
    -- | Start time of base window.
  , crStartTime :: Word64
    -- | 'crTimeStart' + (size of base window)
  , crEndTime :: Word64
    -- | A value between -1 to 1.
  , crValue :: Double
  } deriving (Eq, Show)

-- | Implementations.
-- As for a prototype, used to analyse and compre resulting values for
-- different implementations.
data Implementation
  = Direct -- ^ Direct correlation computation.
  | Sketch -- ^ Sketch based computation.
  deriving (Eq, Show, Read)

-- | Loop for analysing correlation of input data and updating
-- analysis results and windowed data.
newtype Loop a =
  Loop {unLoop :: StateT SysState (Writer [Either String CorrResult]) a}
    ( Functor, Monad, MonadState SysState
    , MonadWriter [Either String CorrResult]

-- --------------------------------------------------------------------------
-- * Looping actions
-- $ Looping with 'State' and 'Writer'.

-- | The main loop of time series analysis.
-- Specifying number of concurrent time series data in this function.
-- In hardware implementation, this may relates to fixed value, which
-- possibly been configured at the time of code generation.
loop ::
  Handle        -- ^ Where to show results.
  -> Config     -- ^ Configuration values used for computation.
  -> [[Double]] -- ^ Input values.
  -> IO ()
loop hdl conf ins0 = go st0 ins0 where
  nTimeSeries = length (head ins0)
  st0 = initialSysState conf nTimeSeries
  go st ins = case ins of
    []     -> putStrLn "done."
    ds:dss -> do
      let ((_,st'),res) = runLoop (step conf ds) st
      mapM_ (hPutStrLn hdl . either id simpleCorrResult) res
      go st' dss

-- | Unwrap 'Loop', run internal 'State' and 'Writer'.
runLoop ::
  Loop a -> SysState -> ((a, SysState), [Either String CorrResult])
runLoop (Loop lp) = runWriter . runStateT lp

-- | Single step to take with new input data.
-- Handle state management and if found any, report analysis result.
step :: Config -> [Double] -> Loop ()
step Config{..} values = do

  -- What we do with new inputs.
  updateStates values

  -- Notify correlation value.
  -- Some of the state may relate to recursive values in FPGA registers,
  -- some may relate to contents of external memory, in hardware
  -- implementation.
  notify <- sysIsReadyToTell `fmap` get

  -- In later phase, 'tell' below should be a response to external
  -- call for getting analysis result.
  when notify $ do

    -- Update summaries used for standardization of sketch distances,
    -- and shift basic windows when basic window is full state.

    -- ... And get state.
    SysState{..} <- get

    -- Computation of correlation with current system state.
    let results = corrPermute
          sysCurrentTime bwSize swSize sysBasicWindows sysSummaries
    tell $ concatMap (map Right . filterImpl impls corrCutoff) results

    -- For tracing.
    when verbose $ do

      -- Differences of direct correlation and sketch correlation.
      -- tell [ Left "\nDiffs:"
      --      , let diff :: Double -> Double -> String
      --            diff x y = printf "%.22f" (abs (x-y))
      --            ds = map (uncurry (diff `on` crValue)) results
      --        in  Left (unlines $ map ("  " ++) ds)
      --      ]

      -- Averages and variances of sketch.
      tell [ Left "\nIntMap (avg,var) of sketches:"
           , let av :: Summary -> (Double,Double)
                 av Summary{..} = (W.average smrSketch, W.variance smrSketch)
             in  Left (IM.showTree $ IM.map av sysSummaries)

-- | Filter results with implementation and cutoff value.
filterImpl ::
  -- ^ Which result to choose.
  -> Double
  -- ^ Cutoff value.
  -> (CorrResult,CorrResult)
  -- ^ (Direct correlation result, sketch correlation result).
  -> [CorrResult]
filterImpl impl coff (d,s) = case impl of
  Direct | abs (crValue d) > coff -> [d]
  Sketch | abs (crValue s) > coff -> [s]
  _                               -> []

-- | Compute correlations.
corrPermute ::
  -- ^ System current time.
  -- ... What is the time format used in hardware implementation?
  -> Word64
  -- ^ Basic window size.
  -> Word64
  -- ^ Sliding window size.
  -> BasicWindows
  -- ^ Basic windows.
  -> IntMap Summary
  -- ^ 'Summary' for each input stream.
  -> [(CorrResult,CorrResult)]
  -- ^ Direct result and sketch result.
corrPermute time bwsz swsz im smrs =
  let im' = IM.toList $ slidingWindow swsz im
      f (mykey,mywin) = concatMap g im' where
        g (otherkey,otherwin)
          | mykey < otherkey = [(directCrr,sketchCrr)]
          | otherwise        = []
            directCrr = crr $ Crr.direct mywin otherwin
            sketchCrr = crr $ Crr.sketch mysketch othersketch
            crr = CorrResult mykey otherkey startTime endTime
            startTime = time - bwsz + 1
            endTime = time
            _mytup = (mysketch,mysum,mysumsq)
            mysum = smrSum (smrs IM.! mykey)
            mysumsq = smrSumSq (smrs IM.! mykey)
            _othertup = (othersketch,othersum,othersumsq)
            othersum = smrSum (smrs IM.! otherkey)
            othersumsq = smrSumSq (smrs IM.! otherkey)
            -- sw' = fromIntegral swsz
            mysketch = smrSketch (smrs IM.! mykey)
            othersketch = smrSketch (smrs IM.! otherkey)
  in  concatMap f im'

-- | Struct sliding windows from basic windows.
-- This function is not needed in hardware implementation, should used
-- for direct calculation only.
slidingWindow :: Word64 -> BasicWindows -> IntMap Window
slidingWindow swsz bw = fmap (F.foldr W.append (W.empty swsz)) bw

-- | Adding new elements to windows, removing old elements.
updateStates :: [Double] -> Loop ()
updateStates values =
  modify $ \st@SysState{..} ->
    let values' =
          IM.fromList . zip [0..] . map (Seq.singleton . W.singleton) $
        count = (sysAppendCount+1) `mod` bwSize sysConfig
        shifting = count == (bwSize sysConfig - 1)
        bwin = IM.unionWith consBW values' sysBasicWindows
        vmap = IM.fromList $ zip [0..] values
        -- Sum and sum of squares in head basic window are updated in
        -- smry'.
        smry' = IM.mapWithKey f sysSummaries
        f k s@Summary{..} =
          let v = vmap IM.! k
              sbw0 = Seq.index smrBasicWindows 0
              sbw0' = sbw0
                { bwsSum = bwsSum sbw0 + v
                , bwsSumSq = bwsSumSq sbw0 + v^(2::Int)
              sbw' = Seq.update 0 sbw0' smrBasicWindows
          in  s {smrBasicWindows = sbw'}
    in  st { sysBasicWindows = bwin
           , sysAppendCount = count
           , sysIsReadyToTell = shifting
           , sysCurrentTime = sysCurrentTime + 1
           , sysSummaries = smry'

-- | Append first argument to second argument.
consBW ::
  Seq Window -- ^ Singleton 'Seq' of singleton 'Window'.
  -> Seq Window -- ^ Basic windows in system status.
  -> Seq Window
consBW a b = b' where
  a0 = Seq.index a 0
  b0 = Seq.index b 0
  b' = W.append a0 b0 Seq.<| Seq.drop 1 b

-- | Update sum of elements in sliding window, and sum of squared
-- elements in sliding window, and shift contents of basic windows.
updateSummariesAndShift :: Loop ()
updateSummariesAndShift = modify $ \st@SysState{..} ->
  let -- Popping out last element, pushing size /bw/ zero filled window to
      -- store incoming data.
      bws = IM.map (rotateSeq (W.empty (bwSize sysConfig))) sysBasicWindows

      -- Word64 to Int conversion.
      nb = fromIntegral sysNumberOfBasicWindows

      -- Updating sums and sum of squares.
      smry' = IM.mapWithKey f sysSummaries
      f k s = s { smrSum = sum'
                , smrSumSq = sumsq'
                , smrBasicWindows = smrbws'
                , smrSketch = sketch'
                , smrPreSketches = preSketches'
          -- Sum and sum of squares, of elements in sliding window.
          sum' = smrSum s - bwsSum bwsmrynb + bwsSum bwsmry0
          sumsq' = smrSumSq s - bwsSumSq bwsmrynb + bwsSumSq bwsmry0

          -- Basic window summaries.
          bwsmry0 = Seq.index smrbws 0
          bwsmrynb = Seq.index smrbws (nb-1)
          smrbws = smrBasicWindows s
          bws' = sysBasicWindows IM.! k

          -- Sketch of sliding window (Naively standardized)
          -- Sums and sums of squares are not used ....
          sketch' = W.fromList [(xi-mu)/sigma|xi<-sks]
          sks = zipWith W.dotp cs preSketches'

          mu = W.average $ W.fromList sks
          sigma = W.variance $ W.fromList sks

          -- Using these 'mu' and 'sigma', symmetrical shape is
          -- preserved but scaling is inappropriate...
          -- mu = (sum'/swSize')
          -- sigma = (sumsq'/swSize') - mu^(2::Int)
          -- swSize' = fromIntegral (swSize sysConfig)

          -- Summaries of basic windows.
          smrbws' = rotateSeq iniBW smrbws
          iniBW = initialBWSummary (fromIntegral $ sketchSize sysConfig)

          -- Pre-sketches, stored for next looping step.
          preSketches' :: [Window]
          preSketches' =
            zipWith (\a b -> W.append (W.singleton a) b)
              (W.toList $ W.dotps urs bw0) (smrPreSketches s)

          -- New basic window.
          bw0 :: Window
          bw0 = Seq.index bws' 0

          -- Unit random vectors.
          urs :: [Window]
          urs = map unitRV . IM.elems $ sysRandomVectors

          -- Control vectors.
          cs :: [Window]
          cs = map controlRV . IM.elems $ sysRandomVectors

  in  st { sysBasicWindows = bws
         , sysSummaries = smry'

-- | Cons given element to given 'Seq' and remove last element in
-- 'Seq'.
rotateSeq :: a -> Seq a -> Seq a
rotateSeq x sq = x Seq.<| Seq.take (Seq.length sq - 1) sq


* Do 'incremental approach' described in Chapter 4 of
'High Performance Algorithms for ... ' pdf to update state.

* Use partitioned sketches, with parameters: N, d, c, and f.

* Make code closer to model hardware implementation.

* Asynchrnous correlation (i.e. correlations between time shifted
windows) are not computed.


-- --------------------------------------------------------------------------
-- * Initial values

-- | Initial state.
-- Other than 'Config', number of concurrent time series input data is
-- passed.
initialSysState :: Config -> Int -> SysState
initialSysState conf@Config{..} numTimeSeries = SysState
  { sysBasicWindows = initialBasicWindows bwSize nb numTimeSeries
  , sysRandomVectors = randomVectors randSeed bwSize swSize sketchSize
  , sysAppendCount = 0
  , sysNumberOfBasicWindows = swSize `div` bwSize
  , sysIsReadyToTell = False
  , sysCurrentTime = 0
  , sysSummaries = initialSummaries conf numTimeSeries
  , sysConfig = conf
  } where nb = swSize `div` bwSize

initialSlidingWindows :: Size -> Int -> IntMap Window
initialSlidingWindows sz n = IM.fromList $ zip [0..n-1] (repeat $ W.empty sz)

initialBasicWindows :: Size -> Size -> Int -> BasicWindows
initialBasicWindows sz nb n = IM.fromList $ zip [0..n-1] (repeat sw0) where
  sw0 = Seq.fromList (replicate (fromIntegral nb) bw0)
  bw0 = W.empty sz

initialSummaries ::
  Config -- ^ Main configuration.
  -> Int -- ^ Number of concurrent input streams.
  -> IntMap Summary
initialSummaries Config{..} nt = IM.fromList $ zip [0..nt-1] smrs where
  smrs = repeat $ Summary
    { smrSum = 0
    , smrSumSq = 0
    , smrSketch = W.empty (fromIntegral sketchSize)
    , smrPreSketches = replicate sketchSize (W.empty nb)
    , smrBasicWindows = Seq.fromList . replicate nb . initialBWSummary $
                        fromIntegral sketchSize
  nb :: Num a => a
  nb = fromIntegral (swSize `div` bwSize)

-- | Initial basic window summary data.
initialBWSummary ::
  Size -- ^ Sketch size.
  -> BWSummary
initialBWSummary d = BWSummary
  { bwSketch = W.empty d
  , bwsSum = 0
  , bwsSumSq = 0

-- | Initial random vectors.
randomVectors ::
  Integer -- ^ Random seed.
  -> Size -- ^ Basic window size.
  -> Size -- ^ Sliding window size.
  -> Int  -- ^ Sketch size.
  -> IntMap RandomVector
randomVectors seed bw sw d = IM.fromList wins where
  wins = zip ks rs
  ks = [0..d-1]
  rs = [W.randomVector (fromIntegral k + seed) bw nb | k <- ks]
  nb = fromIntegral (sw `div` bw)

-- --------------------------------------------------------------------------
-- * Pretty printer
-- $ Actually, not much pretty.

-- | Simplified string representation of 'CorrResult'.
-- >>> simpleCorrResult (CorrResult 1 2 10 20 0.5)
-- "1, 2, 10, 20, 0.5"
simpleCorrResult :: CorrResult -> String
simpleCorrResult CorrResult{..} = concat $ intersperse ", "
   [ show crIndexA, show crIndexB
   , show crStartTime, show crEndTime
   , show crValue