module Simulation.Aivika.Stream
(
Stream(..),
emptyStream,
mergeStreams,
mergeQueuedStreams,
mergePriorityStreams,
concatStreams,
concatQueuedStreams,
concatPriorityStreams,
splitStream,
splitStreamQueueing,
splitStreamPrioritising,
splitStreamFiltering,
splitStreamFilteringQueueing,
streamUsingId,
prefetchStream,
delayStream,
arrivalStream,
memoStream,
zipStreamSeq,
zipStreamParallel,
zip3StreamSeq,
zip3StreamParallel,
unzipStream,
streamSeq,
streamParallel,
consumeStream,
sinkStream,
repeatProcess,
mapStream,
mapStreamM,
accumStream,
apStream,
apStreamM,
filterStream,
filterStreamM,
takeStream,
takeStreamWhile,
takeStreamWhileM,
dropStream,
dropStreamWhile,
dropStreamWhileM,
singletonStream,
joinStream,
failoverStream,
signalStream,
streamSignal,
queuedSignalStream,
leftStream,
rightStream,
replaceLeftStream,
replaceRightStream,
partitionEitherStream,
cloneStream,
firstArrivalStream,
lastArrivalStream,
assembleAccumStream,
traceStream) where
import Data.IORef
import Data.Maybe
import Data.Monoid hiding ((<>))
import Data.Semigroup (Semigroup(..))
import Data.List.NonEmpty (NonEmpty((:|)))
import Control.Applicative
import Control.Monad
import Control.Monad.Trans
import Simulation.Aivika.Simulation
import Simulation.Aivika.Dynamics
import Simulation.Aivika.Event
import Simulation.Aivika.Composite
import Simulation.Aivika.Cont
import Simulation.Aivika.Process
import Simulation.Aivika.Signal
import Simulation.Aivika.Resource.Base
import Simulation.Aivika.QueueStrategy
import qualified Simulation.Aivika.Queue.Infinite.Base as IQ
import Simulation.Aivika.Internal.Arrival
-- | A stream of values of type @a@, represented as a 'Process' that yields
-- the next value paired with the remainder of the stream.
newtype Stream a = Cons { runStream :: Process (a, Stream a)
                          -- ^ The process producing the head value and the rest of the stream.
                        }
-- | Mapping over a stream is delegated to 'mapStream'.
instance Functor Stream where
  fmap f = mapStream f
-- | 'pure' builds a self-referential stream that yields the same value at
-- every step; '<*>' is 'apStream'.
instance Applicative Stream where
  pure a = constant
    where
      constant = Cons (return (a, constant))
  fs <*> xs = apStream fs xs
-- | 'empty' is 'emptyStream' and '<|>' merges two streams with 'mergeStreams'.
instance Alternative Stream where
  empty = emptyStream
  xs <|> ys = mergeStreams xs ys
-- | '<>' merges two streams with 'mergeStreams'; 'sconcat' hands the whole
-- non-empty list to 'concatStreams'.
instance Semigroup (Stream a) where
  xs <> ys = mergeStreams xs ys
  sconcat (s :| rest) = concatStreams (s : rest)
-- | The identity is 'emptyStream'; 'mconcat' hands the list to
-- 'concatStreams'.
instance Monoid (Stream a) where
  mempty = emptyStream
  mappend xs ys = xs <> ys
  mconcat streams = concatStreams streams
-- | Build a stream whose head process is the head process of the given
-- stream passed through 'processUsingId' with the specified identifier.
streamUsingId :: ProcessId -> Stream a -> Stream a
streamUsingId pid stream = Cons (processUsingId pid (runStream stream))
-- | Memoize the stream: the head process is wrapped with 'memoProcess', and
-- the tail it returns is itself memoized recursively.
memoStream :: Stream a -> Simulation (Stream a)
memoStream stream = fmap Cons (memoProcess step)
  where
    step = do
      -- The lazy pattern is kept exactly as in the original definition.
      ~(x, rest) <- runStream stream
      rest' <- liftSimulation (memoStream rest)
      return (x, rest')
-- | Zip two streams, requesting the value of the first stream and then the
-- value of the second one, sequentially.
zipStreamSeq :: Stream a -> Stream b -> Stream (a, b)
zipStreamSeq left right = Cons $ do
  ~(a, restA) <- runStream left
  ~(b, restB) <- runStream right
  return ((a, b), zipStreamSeq restA restB)
-- | Zip two streams, obtaining both heads through 'zipProcessParallel'.
zipStreamParallel :: Stream a -> Stream b -> Stream (a, b)
zipStreamParallel left right = Cons $ do
  ~((a, restA), (b, restB)) <- zipProcessParallel (runStream left) (runStream right)
  return ((a, b), zipStreamParallel restA restB)
-- | Zip three streams, requesting their values one after another in
-- argument order.
zip3StreamSeq :: Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamSeq first second third = Cons $ do
  ~(a, restA) <- runStream first
  ~(b, restB) <- runStream second
  ~(c, restC) <- runStream third
  return ((a, b, c), zip3StreamSeq restA restB restC)
-- | Zip three streams, obtaining the three heads through
-- 'zip3ProcessParallel'.
zip3StreamParallel :: Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamParallel first second third = Cons $ do
  ~((a, restA), (b, restB), (c, restC)) <-
    zip3ProcessParallel (runStream first) (runStream second) (runStream third)
  return ((a, b, c), zip3StreamParallel restA restB restC)
-- | Split a stream of pairs into a stream of first components and a stream
-- of second components. The source is memoized with 'memoStream' first, and
-- both results are projections of that memoized stream.
unzipStream :: Stream (a, b) -> Simulation (Stream a, Stream b)
unzipStream s = do
  shared <- memoStream s
  return (mapStream fst shared, mapStream snd shared)
-- | Combine a list of streams into a stream of lists, running the head of
-- each stream one after another in list order.
streamSeq :: [Stream a] -> Stream [a]
streamSeq streams = Cons $ do
  cells <- mapM runStream streams
  let (values, rests) = unzip cells
  return (values, streamSeq rests)
-- | Combine a list of streams into a stream of lists, obtaining the heads
-- of all streams through 'processParallel'.
streamParallel :: [Stream a] -> Stream [a]
streamParallel streams = Cons $ do
  cells <- processParallel (map runStream streams)
  let (values, rests) = unzip cells
  return (values, streamParallel rests)
-- | A stream that obtains every value by running the given process again.
repeatProcess :: Process a -> Stream a
repeatProcess p = Cons $ p >>= \a -> return (a, repeatProcess p)
-- | Apply a pure function to every value of the stream.
mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f stream = Cons $
  runStream stream >>= \(a, rest) -> return (f a, mapStream f rest)
-- | Transform every value of the stream with a function that runs inside
-- the 'Process' computation.
mapStreamM :: (a -> Process b) -> Stream a -> Stream b
mapStreamM f stream = Cons $ do
  (a, rest) <- runStream stream
  b <- f a
  return (b, mapStreamM f rest)
-- | Transform the stream while threading an accumulator: for each input
-- value the function receives the current accumulator and returns the
-- updated accumulator together with the output value.
accumStream :: (acc -> a -> Process (acc, b)) -> acc -> Stream a -> Stream b
accumStream f = go
  where
    go state stream = Cons $ do
      (a, rest) <- runStream stream
      (state', b) <- f state a
      return (b, go state' rest)
-- | Sequential application: take a function from the first stream, then a
-- value from the second stream, and yield the function applied to the value.
apStream :: Stream (a -> b) -> Stream a -> Stream b
apStream functions values = Cons $ do
  (f, functions') <- runStream functions
  (a, values') <- runStream values
  return (f a, apStream functions' values')
-- | Like 'apStream', but the functions taken from the first stream run
-- inside the 'Process' computation.
apStreamM :: Stream (a -> Process b) -> Stream a -> Stream b
apStreamM functions values = Cons $ do
  (f, functions') <- runStream functions
  (a, values') <- runStream values
  b <- f a
  return (b, apStreamM functions' values')
-- | Keep only the values satisfying the predicate; rejected values are
-- skipped by immediately continuing with the filtered remainder.
filterStream :: (a -> Bool) -> Stream a -> Stream a
filterStream p stream = Cons $ do
  (a, rest) <- runStream stream
  let kept = filterStream p rest
  if p a
    then return (a, kept)
    else runStream kept
-- | Keep only the values for which the predicate, run inside the 'Process'
-- computation, returns 'True'.
filterStreamM :: (a -> Process Bool) -> Stream a -> Stream a
filterStreamM p stream = Cons $ do
  (a, rest) <- runStream stream
  accepted <- p a
  let kept = filterStreamM p rest
  if accepted
    then return (a, kept)
    else runStream kept
-- | Yield the payloads of the 'Left' values, skipping every 'Right' value.
leftStream :: Stream (Either a b) -> Stream a
leftStream stream = Cons $ do
  (e, rest) <- runStream stream
  let next = leftStream rest
  either (\a -> return (a, next)) (const (runStream next)) e
-- | Yield the payloads of the 'Right' values, skipping every 'Left' value.
rightStream :: Stream (Either a b) -> Stream b
rightStream stream = Cons $ do
  (e, rest) <- runStream stream
  let next = rightStream rest
  either (const (runStream next)) (\b -> return (b, next)) e
-- | Replace every 'Left' value of the first stream with the next value
-- taken from the second stream; 'Right' values pass through unchanged and
-- do not consume anything from the second stream.
--
-- The as-pattern is written without whitespace around @\@@: since GHC 9.0
-- an @\@@ surrounded by spaces is parsed as an infix operator rather than
-- an as-pattern, so the spaced form no longer compiles.
replaceLeftStream :: Stream (Either a b) -> Stream c -> Stream (Either c b)
replaceLeftStream (Cons sab) ys0@(~(Cons sc)) = Cons z
  where
    z = do
      (a, xs) <- sab
      case a of
        Left _ -> do
          -- Only now is the replacement stream actually requested.
          (b, ys) <- sc
          return (Left b, replaceLeftStream xs ys)
        Right a' ->
          -- The replacement stream is passed along untouched.
          return (Right a', replaceLeftStream xs ys0)
-- | Replace every 'Right' value of the first stream with the next value
-- taken from the second stream; 'Left' values pass through unchanged and
-- do not consume anything from the second stream.
--
-- The as-pattern is written without whitespace around @\@@: since GHC 9.0
-- an @\@@ surrounded by spaces is parsed as an infix operator rather than
-- an as-pattern, so the spaced form no longer compiles.
replaceRightStream :: Stream (Either a b) -> Stream c -> Stream (Either a c)
replaceRightStream (Cons sab) ys0@(~(Cons sc)) = Cons z
  where
    z = do
      (a, xs) <- sab
      case a of
        Right _ -> do
          -- Only now is the replacement stream actually requested.
          (b, ys) <- sc
          return (Right b, replaceRightStream xs ys)
        Left a' ->
          -- The replacement stream is passed along untouched.
          return (Left a', replaceRightStream xs ys0)
-- | Split a stream of 'Either' values into the stream of 'Left' payloads
-- and the stream of 'Right' payloads. The source is memoized with
-- 'memoStream' first and both results read from the memoized stream.
partitionEitherStream :: Stream (Either a b) -> Simulation (Stream a, Stream b)
partitionEitherStream s =
  memoStream s >>= \shared ->
    return (leftStream shared, rightStream shared)
-- | Split the input stream into the specified number of output streams,
-- using the 'FCFS' strategy for 'splitStreamQueueing'.
splitStream :: Int -> Stream a -> Simulation [Stream a]
splitStream n stream = splitStreamQueueing FCFS n stream
-- | Split the input stream into @n@ output streams. All outputs share one
-- reader process that, while holding a resource created with the given
-- strategy, takes the next value from the shared source and stores the
-- remainder back into a mutable reference.
splitStreamQueueing :: EnqueueStrategy s
                       => s          -- ^ strategy passed to 'newResource'
                       -> Int        -- ^ number of output streams
                       -> Stream a   -- ^ input stream
                       -> Simulation [Stream a]
splitStreamQueueing s n x = do
  ref <- liftIO $ newIORef x
  res <- newResource s 1
  let reader = usingResource res $ do
        current <- liftIO $ readIORef ref
        (a, rest) <- runStream current
        liftIO $ writeIORef ref rest
        return a
  return $ replicate n (repeatProcess reader)
-- | Split the input stream into one output stream per priority stream.
-- Each output first takes the next priority from its own priority stream
-- and then, holding the shared resource with that priority, takes the next
-- value from the shared source.
splitStreamPrioritising :: PriorityQueueStrategy s p
                           => s            -- ^ strategy passed to 'newResource'
                           -> [Stream p]   -- ^ one priority stream per output
                           -> Stream a     -- ^ input stream
                           -> Simulation [Stream a]
splitStreamPrioritising s ps x = do
  ref <- liftIO $ newIORef x
  res <- newResource s 1
  let pull = do
        current <- liftIO $ readIORef ref
        (a, rest) <- runStream current
        liftIO $ writeIORef ref rest
        return a
      stream priorities = Cons $ do
        (priority, priorities') <- runStream priorities
        a <- usingResourceWithPriority res priority pull
        return (a, stream priorities')
  return $ map stream ps
-- | Split the input stream by predicates, using the 'FCFS' strategy for
-- 'splitStreamFilteringQueueing'.
splitStreamFiltering :: [a -> Event Bool] -> Stream a -> Simulation [Stream a]
splitStreamFiltering preds stream = splitStreamFilteringQueueing FCFS preds stream
-- | Split the input stream into one output stream per predicate. A reader
-- takes the head of the shared source while holding the resource; when its
-- predicate accepts the value the value is consumed, otherwise the value is
-- put back in front of the source and the reader tries again.
splitStreamFilteringQueueing :: EnqueueStrategy s
                                => s                   -- ^ strategy passed to 'newResource'
                                -> [a -> Event Bool]   -- ^ one predicate per output
                                -> Stream a            -- ^ input stream
                                -> Simulation [Stream a]
splitStreamFilteringQueueing s preds x = do
  ref <- liftIO $ newIORef x
  res <- newResource s 1
  let attempt accept = usingResource res $ do
        current <- liftIO $ readIORef ref
        (a, rest) <- runStream current
        liftEvent $ do
          ok <- accept a
          if ok
            then do
              liftIO $ writeIORef ref rest
              return $ Just a
            else do
              -- Push the rejected value back so another reader can take it.
              liftIO $ writeIORef ref $ Cons (return (a, rest))
              return Nothing
      reader accept = attempt accept >>= maybe (reader accept) return
  return $ map (repeatProcess . reader) preds
-- | Combine the input streams into one output stream, using the 'FCFS'
-- strategy for 'concatQueuedStreams'.
concatStreams :: [Stream a] -> Stream a
concatStreams streams = concatQueuedStreams FCFS streams
-- | Combine the input streams into one output stream. One writer process is
-- spawned per input stream; writers and the single reader hand values over
-- through a one-slot mutable reference guarded by three resources.
concatQueuedStreams :: EnqueueStrategy s
                       => s
                       -> [Stream a]
                       -> Stream a
concatQueuedStreams s streams = Cons z where
  -- NOTE(review): the numeric arguments of 'newResourceWithMaxCount' are
  -- presumably the initial count and the optional maximum count — confirm.
  z = do reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writing <- liftSimulation $ newResourceWithMaxCount s 1 (Just 1)
         conting <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         ref <- liftIO $ newIORef Nothing
         -- A writer takes the next value of its stream, requests 'writing',
         -- stores the value in the slot, releases 'reading', and then
         -- requests 'conting' before moving on to the rest of its stream.
         let writer p =
               do (a, xs) <- runStream p
                  requestResource writing
                  liftIO $ writeIORef ref (Just a)
                  releaseResource reading
                  requestResource conting
                  writer xs
             -- The reader requests 'reading', takes the value out of the
             -- slot, clears the slot, and releases 'writing'.
             reader =
               do requestResource reading
                  Just a <- liftIO $ readIORef ref
                  liftIO $ writeIORef ref Nothing
                  releaseResource writing
                  return a
         forM_ streams $ spawnProcess . writer
         a <- reader
         -- Every later step first releases 'conting' and then reads again.
         let xs = repeatProcess (releaseResource conting >> reader)
         return (a, xs)
-- | Combine the input streams of prioritised values into one output stream.
-- One writer process is spawned per input stream; each writer requests the
-- 'writing' resource with the priority attached to the value it wants to
-- hand over.
concatPriorityStreams :: PriorityQueueStrategy s p
                         => s
                         -> [Stream (p, a)]
                         -> Stream a
concatPriorityStreams s streams = Cons z where
  -- NOTE(review): the numeric arguments of 'newResourceWithMaxCount' are
  -- presumably the initial count and the optional maximum count — confirm.
  z = do reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writing <- liftSimulation $ newResourceWithMaxCount s 1 (Just 1)
         conting <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         ref <- liftIO $ newIORef Nothing
         -- A writer takes the next (priority, value) pair of its stream,
         -- requests 'writing' with that priority, stores the value in the
         -- slot, releases 'reading', and then requests 'conting' before
         -- moving on to the rest of its stream.
         let writer p =
               do ((priority, a), xs) <- runStream p
                  requestResourceWithPriority writing priority
                  liftIO $ writeIORef ref (Just a)
                  releaseResource reading
                  requestResource conting
                  writer xs
             -- The reader requests 'reading', takes the value out of the
             -- slot, clears the slot, and releases 'writing'.
             reader =
               do requestResource reading
                  Just a <- liftIO $ readIORef ref
                  liftIO $ writeIORef ref Nothing
                  releaseResource writing
                  return a
         forM_ streams $ spawnProcess . writer
         a <- reader
         -- Every later step first releases 'conting' and then reads again.
         let xs = repeatProcess (releaseResource conting >> reader)
         return (a, xs)
-- | Merge two streams, using the 'FCFS' strategy for 'mergeQueuedStreams'.
mergeStreams :: Stream a -> Stream a -> Stream a
mergeStreams x y = mergeQueuedStreams FCFS x y
-- | Merge two streams by passing them as a two-element list to
-- 'concatQueuedStreams' with the given strategy.
mergeQueuedStreams :: EnqueueStrategy s
                      => s          -- ^ strategy passed to 'concatQueuedStreams'
                      -> Stream a   -- ^ first input stream
                      -> Stream a   -- ^ second input stream
                      -> Stream a
mergeQueuedStreams s x y = concatQueuedStreams s both
  where
    both = [x, y]
-- | Merge two streams of prioritised values by passing them as a
-- two-element list to 'concatPriorityStreams' with the given strategy.
mergePriorityStreams :: PriorityQueueStrategy s p
                        => s               -- ^ strategy passed to 'concatPriorityStreams'
                        -> Stream (p, a)   -- ^ first input stream
                        -> Stream (p, a)   -- ^ second input stream
                        -> Stream a
mergePriorityStreams s x y = concatPriorityStreams s both
  where
    both = [x, y]
-- | The stream whose head process is 'neverProcess'.
-- NOTE(review): presumably this means the stream never yields a value —
-- confirm against the definition of 'neverProcess'.
emptyStream :: Stream a
emptyStream = Cons neverProcess
-- | Consume the stream forever, applying the given action to every value
-- before requesting the next one.
consumeStream :: (a -> Process ()) -> Stream a -> Process ()
consumeStream f = go
  where
    go stream = do
      (a, rest) <- runStream stream
      f a
      go rest
-- | Consume the stream forever, discarding every value.
sinkStream :: Stream a -> Process ()
sinkStream stream = runStream stream >>= \(_, rest) -> sinkStream rest
-- | Wrap the stream so that a separately spawned writer process takes
-- values from the source and hands them to the consumer through a one-slot
-- mutable reference guarded by two resources.
prefetchStream :: Stream a -> Stream a
prefetchStream s = Cons z where
  -- NOTE(review): the numeric arguments of 'newResourceWithMaxCount' are
  -- presumably the initial count and the optional maximum count — confirm.
  z = do reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writing <- liftSimulation $ newResourceWithMaxCount FCFS 1 (Just 1)
         ref <- liftIO $ newIORef Nothing
         -- The writer takes the next source value first and only then
         -- requests 'writing', stores the value and releases 'reading'.
         let writer p =
               do (a, xs) <- runStream p
                  requestResource writing
                  liftIO $ writeIORef ref (Just a)
                  releaseResource reading
                  writer xs
             -- The reader requests 'reading', takes the value out of the
             -- slot, clears the slot, and releases 'writing'.
             reader =
               do requestResource reading
                  Just a <- liftIO $ readIORef ref
                  liftIO $ writeIORef ref Nothing
                  releaseResource writing
                  return a
         spawnProcess $ writer s
         runStream $ repeatProcess reader
-- | Turn a signal into a stream: the signal is handled with the supplied
-- enqueueing action, the handler is registered with 'disposableComposite',
-- and the resulting stream repeatedly runs the supplied dequeueing process.
queuedSignalStream :: (a -> Event ())   -- ^ action applied to every signal value
                      -> Process a      -- ^ process run to obtain each stream value
                      -> Signal a       -- ^ input signal
                      -> Composite (Stream a)
queuedSignalStream enqueue dequeue s = do
  handler <- liftEvent (handleSignal s enqueue)
  disposableComposite handler
  return (repeatProcess dequeue)
-- | Turn a signal into a stream, buffering the signal values in a queue
-- created with 'IQ.newFCFSQueue'.
signalStream :: Signal a -> Composite (Stream a)
signalStream s =
  liftSimulation IQ.newFCFSQueue >>= \q ->
    queuedSignalStream (IQ.enqueue q) (IQ.dequeue q) s
-- | Turn a stream into a signal: a process with a fresh identifier consumes
-- the stream and triggers the signal source with every value. Cancelling
-- that process is registered with 'disposableComposite'.
streamSignal :: Stream a -> Composite (Signal a)
streamSignal z = do
  source <- liftSimulation newSignalSource
  pid <- liftSimulation newProcessId
  let publish a = liftEvent (triggerSignal source a)
  liftEvent $ runProcessUsingId pid (consumeStream publish z)
  disposableComposite $ DisposableEvent (cancelProcessWithId pid)
  return (publishSignal source)
-- | Wrap every value in an 'Arrival' record holding the value, the time at
-- which it was received, and the delay since the previous value was
-- received ('Nothing' for the first value).
arrivalStream :: Stream a -> Stream (Arrival a)
arrivalStream = go Nothing
  where
    go previous stream = Cons $ do
      (a, rest) <- runStream stream
      t <- liftDynamics time
      let arrival = Arrival { arrivalValue = a
                            , arrivalTime = t
                            , arrivalDelay = fmap (t -) previous }
      return (arrival, go (Just t) rest)
-- | Prepend the given value to the stream.
delayStream :: a -> Stream a -> Stream a
delayStream a0 s = Cons (return (a0, s))
-- | A stream that yields the given value and then continues as
-- 'emptyStream'.
singletonStream :: a -> Stream a
singletonStream a = Cons (return (a, emptyStream))
-- | Flatten a process that returns a stream into a stream: the process is
-- run when the head of the result is requested.
joinStream :: Process (Stream a) -> Stream a
joinStream m = Cons (runStream =<< m)
-- | Take values from the first input stream and, when the process reading
-- it is cancelled, switch to the next input stream, and so on through the
-- list.
--
-- NOTE(review): the nesting below was restored from a source that had lost
-- its indentation; in particular, the final @runProcess $ loop ps@ inside
-- @loop@ is placed in the @unless cancelled@ branch — confirm against the
-- upstream layout.
failoverStream :: [Stream a] -> Stream a
failoverStream ps = Cons z where
  -- NOTE(review): the numeric arguments of 'newResourceWithMaxCount' are
  -- presumably the initial count and the optional maximum count — confirm.
  z = do reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writing <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         ref <- liftIO $ newIORef Nothing
         -- Identifier of the process that requests the head of the result.
         pid <- processId
         -- The writer waits for 'writing', takes the next value of its
         -- stream, stores it in the slot and releases 'reading'. If the
         -- writer's own process was cancelled while taking the value, the
         -- finalizer releases 'writing' again.
         let writer p =
               do requestResource writing
                  pid' <- processId
                  (a, xs) <-
                    finallyProcess (runStream p) $
                    liftEvent $
                    do cancelled' <- processCancelled pid'
                       when cancelled' $
                         releaseResourceWithinEvent writing
                  liftIO $ writeIORef ref (Just a)
                  releaseResource reading
                  writer xs
             -- The reader releases 'writing' to ask for a value, waits for
             -- 'reading', then takes the value out of the slot and clears it.
             reader =
               do releaseResource writing
                  requestResource reading
                  Just a <- liftIO $ readIORef ref
                  liftIO $ writeIORef ref Nothing
                  return a
             loop [] = return ()
             loop (p: ps) =
               do pid' <- processId
                  -- Cancel this sub-process when the process 'pid' is being
                  -- cancelled.
                  h' <- liftEvent $
                        handleSignal (processCancelling pid) $ \() ->
                        cancelProcessWithId pid'
                  -- When the writer finishes, drop the handler; unless 'pid'
                  -- itself was cancelled, the sub-process must have been
                  -- cancelled, and the remaining streams are started.
                  finallyProcess (writer p) $
                    liftEvent $
                    do disposeEvent h'
                       cancelled <- processCancelled pid
                       unless cancelled $
                         do cancelled' <- processCancelled pid'
                            unless cancelled' $
                              error "Expected the sub-process to be cancelled: failoverStream"
                            runProcess $ loop ps
         liftEvent $ runProcess $ loop ps
         runStream $ repeatProcess reader
-- | Yield at most the given number of values from the stream and then
-- continue as 'emptyStream'. A non-positive count gives 'emptyStream'
-- immediately.
takeStream :: Int -> Stream a -> Stream a
takeStream n s
  | n > 0 = Cons $ do
      (a, rest) <- runStream s
      return (a, takeStream (n - 1) rest)
  | otherwise = emptyStream
-- | Yield values while the predicate holds; on the first value that fails
-- the predicate the stream continues as 'neverProcess'.
takeStreamWhile :: (a -> Bool) -> Stream a -> Stream a
takeStreamWhile p = go
  where
    go stream = Cons (runStream stream >>= step)
    step (a, rest)
      | p a = return (a, go rest)
      | otherwise = neverProcess
-- | Yield values while the predicate, run inside the 'Process' computation,
-- returns 'True'; on the first rejected value the stream continues as
-- 'neverProcess'.
takeStreamWhileM :: (a -> Process Bool) -> Stream a -> Stream a
takeStreamWhileM p = go
  where
    go stream = Cons $ do
      (a, rest) <- runStream stream
      accepted <- p a
      if accepted
        then return (a, go rest)
        else neverProcess
-- | Skip the given number of values and continue with the rest of the
-- stream. A non-positive count returns the stream unchanged.
dropStream :: Int -> Stream a -> Stream a
dropStream n s
  | n > 0 = Cons $ runStream s >>= \(_, rest) -> runStream (dropStream (n - 1) rest)
  | otherwise = s
-- | Skip values while the predicate holds; the first value that fails the
-- predicate is yielded, followed by the untouched rest of the stream.
dropStreamWhile :: (a -> Bool) -> Stream a -> Stream a
dropStreamWhile p = go
  where
    go stream = Cons (runStream stream >>= step)
    step cell@(a, rest)
      | p a = runStream (go rest)
      | otherwise = return cell
-- | Skip values while the predicate, run inside the 'Process' computation,
-- returns 'True'; the first rejected value is yielded, followed by the
-- untouched rest of the stream.
dropStreamWhileM :: (a -> Process Bool) -> Stream a -> Stream a
dropStreamWhileM p = go
  where
    go stream = Cons $ do
      cell@(a, rest) <- runStream stream
      skip <- p a
      if skip
        then runStream (go rest)
        else return cell
-- | Create the given number of copies of the stream. Each copy has its own
-- queue; a copy first tries its queue, and if the queue is empty it takes
-- the shared resource, re-checks the queue, and otherwise takes the next
-- value from the source and enqueues it for every other copy.
cloneStream :: Int -> Stream a -> Simulation [Stream a]
cloneStream n s = do
  qs <- replicateM n IQ.newFCFSQueue
  rs <- newFCFSResource 1
  ref <- liftIO $ newIORef s
  let indexed = zip [1 ..] qs
      -- Take the next source value and distribute it to all other copies.
      fetch m = do
        current <- liftIO $ readIORef ref
        (a, rest) <- runStream current
        liftIO $ writeIORef ref rest
        forM_ indexed $ \(i, q) ->
          unless (i == m) $
            liftEvent $ IQ.enqueue q a
        return a
      reader m q = do
        queued <- liftEvent $ IQ.tryDequeue q
        case queued of
          Just a -> return a
          Nothing -> usingResource rs $ do
            -- Another copy may have filled the queue while we were waiting.
            queued' <- liftEvent $ IQ.tryDequeue q
            maybe (fetch m) return queued'
  return [repeatProcess (reader i q) | (i, q) <- indexed]
-- | For every group of @n@ consecutive input values, yield the first value
-- of the group once the last value of the group has arrived. The counter is
-- reduced modulo @n@, so @n@ must be non-zero.
firstArrivalStream :: Int -> Stream a -> Stream a
firstArrivalStream n = assembleAccumStream step (1, Nothing)
  where
    step (i, first) a
      | i `mod` n == 0 = return ((1, Nothing), first')
      | otherwise = return ((i + 1, first'), Nothing)
      where
        -- Remember the first value of the current group.
        first' = Just (fromMaybe a first)
-- | For every group of @n@ consecutive input values, yield the last value
-- of the group. The counter is reduced modulo @n@, so @n@ must be non-zero.
lastArrivalStream :: Int -> Stream a -> Stream a
lastArrivalStream n = assembleAccumStream step 1
  where
    step i a
      | i `mod` n == 0 = return (1, Just a)
      | otherwise = return (i + 1, Nothing)
-- | Transform the stream while threading an accumulator, yielding an output
-- only for those inputs where the function returns 'Just'.
--
-- For each input value the function receives the current accumulator and
-- returns the updated accumulator together with an optional output. Inputs
-- that produce 'Nothing' are skipped by continuing straight on to the next
-- input. This is a single pass with an explicit case analysis, so no
-- partial 'fromJust' is needed.
assembleAccumStream :: (acc -> a -> Process (acc, Maybe b)) -> acc -> Stream a -> Stream b
assembleAccumStream f = go
  where
    go state stream = Cons (step state stream)
    step state stream = do
      (a, rest) <- runStream stream
      (state', output) <- f state a
      case output of
        Just b -> return (b, go state' rest)
        -- Nothing to emit yet: keep consuming the input.
        Nothing -> step state' rest
-- | Wrap the stream with optional tracing: when a request message is given,
-- requesting each value is run under 'traceProcess' with that message; when
-- a response message is given, returning each value is run under
-- 'traceProcess' with that message.
traceStream :: Maybe String   -- ^ message used when requesting the next value
               -> Maybe String   -- ^ message used when returning the value
               -> Stream a       -- ^ the stream to trace
               -> Stream a
traceStream request response = go
  where
    traced label action = maybe action (\message -> traceProcess message action) label
    go s = Cons $ do
      (a, xs) <- traced request (runStream s)
      traced response (return (a, go xs))