-- |
-- Module: Simulation.Aivika.Trans.Stream
--
-- Defines the 'Stream' type, a sequence of values where every element is
-- obtained by running a 'Process' computation, together with combinators
-- for merging, splitting, zipping, mapping, filtering and memoizing
-- streams, and for converting between streams and signals.
module Simulation.Aivika.Trans.Stream
(
Stream(..),
emptyStream,
mergeStreams,
mergeQueuedStreams,
mergePriorityStreams,
concatStreams,
concatQueuedStreams,
concatPriorityStreams,
splitStream,
splitStreamQueueing,
splitStreamPrioritising,
streamUsingId,
prefetchStream,
delayStream,
arrivalStream,
memoStream,
zipStreamSeq,
zipStreamParallel,
zip3StreamSeq,
zip3StreamParallel,
unzipStream,
streamSeq,
streamParallel,
consumeStream,
sinkStream,
repeatProcess,
mapStream,
mapStreamM,
apStream,
apStreamM,
filterStream,
filterStreamM,
signalStream,
streamSignal,
leftStream,
rightStream,
replaceLeftStream,
replaceRightStream,
partitionEitherStream) where
import Data.Maybe
import Data.Monoid
import Control.Applicative
import Control.Monad
import Control.Monad.Trans
import Simulation.Aivika.Trans.Session
import Simulation.Aivika.Trans.ProtoRef
import Simulation.Aivika.Trans.Comp
import Simulation.Aivika.Trans.Parameter
import Simulation.Aivika.Trans.Simulation
import Simulation.Aivika.Trans.Dynamics
import Simulation.Aivika.Trans.Event
import Simulation.Aivika.Trans.Cont
import Simulation.Aivika.Trans.Process
import Simulation.Aivika.Trans.Signal
import Simulation.Aivika.Trans.Resource
import Simulation.Aivika.Trans.QueueStrategy
import Simulation.Aivika.Trans.Queue.Infinite
import Simulation.Aivika.Arrival (Arrival(..))
-- | A stream of values of type @a@ whose elements are produced by
-- 'Process' computations over the underlying computation @m@.
--
-- The type has no terminating constructor: running a stream yields a
-- value paired with the stream of the remaining values.
newtype Stream m a =
  Cons
    { runStream :: Process m (a, Stream m a)
      -- ^ Run one step of the stream, producing the current value and
      -- the rest of the stream.
    }
-- | Mapping over a stream is delegated to 'mapStream'.
instance MonadComp m => Functor (Stream m) where
  fmap f stream = mapStream f stream
-- | 'pure' builds a stream that yields the same value at every step
-- without any other effect; '<*>' is 'apStream'.
instance MonadComp m => Applicative (Stream m) where
  pure a = constant
    where
      -- The stream is its own tail, so the value repeats forever.
      constant = Cons (return (a, constant))
  sf <*> sa = apStream sf sa
-- | Streams form a monoid: the identity is 'emptyStream', two streams are
-- combined with 'mergeStreams' and a list of streams with 'concatStreams'.
instance MonadComp m => Monoid (Stream m a) where
  mempty = emptyStream
  mappend x y = mergeStreams x y
  mconcat streams = concatStreams streams
-- | Wrap the computation that produces the first step of the stream with
-- 'processUsingId' for the given process identifier.
--
-- Only the head computation is wrapped here; the tail returned by that
-- computation is passed through unchanged.
streamUsingId :: MonadComp m => ProcessId m -> Stream m a -> Stream m a
streamUsingId pid stream =
  Cons (processUsingId pid (runStream stream))
-- | Memoize the stream: the step computation is wrapped with
-- 'memoProcess', and the tail it returns is memoized recursively in
-- the same way.
--
-- This is what allows several consumers (see 'unzipStream' and
-- 'partitionEitherStream') to read the same underlying stream.
memoStream :: MonadComp m => Stream m a -> Simulation m (Stream m a)
memoStream stream =
  memoProcess step >>= \memoized -> return (Cons memoized)
  where
    step =
      do -- The lazy pattern keeps the match on the result irrefutable.
         ~(x, rest) <- runStream stream
         rest' <- liftSimulation (memoStream rest)
         return (x, rest')
-- | Zip two streams, requesting the element of the first stream and
-- then the element of the second stream, one after the other.
zipStreamSeq :: MonadComp m => Stream m a -> Stream m b -> Stream m (a, b)
zipStreamSeq left right = Cons pairUp
  where
    pairUp =
      do ~(a, restA) <- runStream left
         ~(b, restB) <- runStream right
         return ((a, b), zipStreamSeq restA restB)
-- | Zip two streams, obtaining both elements through
-- 'zipProcessParallel' instead of requesting them one after the other.
zipStreamParallel :: MonadComp m => Stream m a -> Stream m b -> Stream m (a, b)
zipStreamParallel left right = Cons pairUp
  where
    pairUp =
      do ~((a, restA), (b, restB)) <-
           zipProcessParallel (runStream left) (runStream right)
         return ((a, b), zipStreamParallel restA restB)
-- | Zip three streams, requesting their elements one after the other
-- in argument order.
zip3StreamSeq :: MonadComp m => Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
zip3StreamSeq first second third = Cons tripleUp
  where
    tripleUp =
      do ~(a, restA) <- runStream first
         ~(b, restB) <- runStream second
         ~(c, restC) <- runStream third
         return ((a, b, c), zip3StreamSeq restA restB restC)
-- | Zip three streams, obtaining all three elements through
-- 'zip3ProcessParallel'.
zip3StreamParallel :: MonadComp m => Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
zip3StreamParallel first second third = Cons tripleUp
  where
    tripleUp =
      do ~((a, restA), (b, restB), (c, restC)) <-
           zip3ProcessParallel (runStream first) (runStream second) (runStream third)
         return ((a, b, c), zip3StreamParallel restA restB restC)
-- | Split a stream of pairs into a stream of first components and a
-- stream of second components.
--
-- The input is memoized first ('memoStream') so that both resulting
-- streams read from the same underlying stream.
unzipStream :: MonadComp m => Stream m (a, b) -> Simulation m (Stream m a, Stream m b)
unzipStream s =
  memoStream s >>= \shared ->
    return (mapStream fst shared, mapStream snd shared)
-- | Combine a list of streams into a stream of lists, requesting one
-- element from each input stream sequentially, in list order.
streamSeq :: MonadComp m => [Stream m a] -> Stream m [a]
streamSeq streams = Cons step
  where
    step =
      do results <- mapM runStream streams
         let heads = map fst results
             tails = map snd results
         return (heads, streamSeq tails)
-- | Combine a list of streams into a stream of lists, requesting one
-- element from each input stream through 'processParallel'.
streamParallel :: MonadComp m => [Stream m a] -> Stream m [a]
streamParallel streams = Cons step
  where
    step =
      do results <- processParallel (map runStream streams)
         let heads = map fst results
             tails = map snd results
         return (heads, streamParallel tails)
-- | Build a stream by running the given process again for every
-- element that is requested.
repeatProcess :: MonadComp m => Process m a -> Stream m a
repeatProcess p =
  Cons (p >>= \a -> return (a, repeatProcess p))
-- | Apply a pure function to every element of the stream.
mapStream :: MonadComp m => (a -> b) -> Stream m a -> Stream m b
mapStream f stream =
  Cons $
    runStream stream >>= \(a, rest) ->
      return (f a, mapStream f rest)
-- | Apply a 'Process'-valued function to every element of the stream;
-- the function runs after the element has been received.
mapStreamM :: MonadComp m => (a -> Process m b) -> Stream m a -> Stream m b
mapStreamM f stream =
  Cons $
    runStream stream >>= \(a, rest) ->
      f a >>= \b ->
        return (b, mapStreamM f rest)
-- | Apply a stream of functions to a stream of arguments element-wise.
-- At each step the function is requested first, then the argument.
apStream :: MonadComp m => Stream m (a -> b) -> Stream m a -> Stream m b
apStream funs args =
  Cons $
    do (f, funs') <- runStream funs
       (a, args') <- runStream args
       return (f a, apStream funs' args')
-- | Apply a stream of 'Process'-valued functions to a stream of
-- arguments element-wise. At each step the function is requested
-- first, then the argument, and then the function is run.
apStreamM :: MonadComp m => Stream m (a -> Process m b) -> Stream m a -> Stream m b
apStreamM funs args =
  Cons $
    do (f, funs') <- runStream funs
       (a, args') <- runStream args
       b <- f a
       return (b, apStreamM funs' args')
-- | Keep only the elements that satisfy the predicate.
--
-- A rejected element is dropped and the next one is requested
-- immediately within the same step.
filterStream :: MonadComp m => (a -> Bool) -> Stream m a -> Stream m a
filterStream p stream =
  Cons $
    do (a, rest) <- runStream stream
       let kept = filterStream p rest
       if p a
         then return (a, kept)
         else runStream kept
-- | Keep only the elements for which the 'Process'-valued predicate
-- returns 'True'.
--
-- A rejected element is dropped and the next one is requested
-- immediately within the same step.
filterStreamM :: MonadComp m => (a -> Process m Bool) -> Stream m a -> Stream m a
filterStreamM p stream =
  Cons $
    do (a, rest) <- runStream stream
       accepted <- p a
       let kept = filterStreamM p rest
       if accepted
         then return (a, kept)
         else runStream kept
-- | Extract the 'Left' values of the stream, skipping every 'Right'.
leftStream :: MonadComp m => Stream m (Either a b) -> Stream m a
leftStream stream =
  Cons $
    do (e, rest) <- runStream stream
       let lefts = leftStream rest
       either
         (\a -> return (a, lefts))
         (const (runStream lefts))  -- skip a Right and continue with the rest
         e
-- | Extract the 'Right' values of the stream, skipping every 'Left'.
rightStream :: MonadComp m => Stream m (Either a b) -> Stream m b
rightStream stream =
  Cons $
    do (e, rest) <- runStream stream
       let rights = rightStream rest
       either
         (const (runStream rights))  -- skip a Left and continue with the rest
         (\b -> return (b, rights))
         e
-- | Replace every 'Left' value of the first stream with the next value
-- taken from the second stream; 'Right' values pass through unchanged.
--
-- The second stream is advanced only when a 'Left' is encountered; on a
-- 'Right' it is carried over untouched to the next step.
replaceLeftStream :: MonadComp m => Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
-- The as-pattern is written without whitespace around '@' and with the
-- lazy pattern parenthesised so that it parses under GHC's
-- whitespace-sensitive operator rules (GHC 9.0 and later) as well as
-- under older compilers.
replaceLeftStream (Cons sab) ys0@(~(Cons sc)) = Cons z
  where
    z =
      do (e, xs) <- sab
         case e of
           Left _ ->
             do (c, ys) <- sc
                return (Left c, replaceLeftStream xs ys)
           Right b ->
             return (Right b, replaceLeftStream xs ys0)
-- | Replace every 'Right' value of the first stream with the next value
-- taken from the second stream; 'Left' values pass through unchanged.
--
-- The second stream is advanced only when a 'Right' is encountered; on a
-- 'Left' it is carried over untouched to the next step.
replaceRightStream :: MonadComp m => Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
-- The as-pattern is written without whitespace around '@' and with the
-- lazy pattern parenthesised so that it parses under GHC's
-- whitespace-sensitive operator rules (GHC 9.0 and later) as well as
-- under older compilers.
replaceRightStream (Cons sab) ys0@(~(Cons sc)) = Cons z
  where
    z =
      do (e, xs) <- sab
         case e of
           Right _ ->
             do (c, ys) <- sc
                return (Right c, replaceRightStream xs ys)
           Left a ->
             return (Left a, replaceRightStream xs ys0)
-- | Partition a stream of 'Either' values into the stream of its 'Left'
-- values and the stream of its 'Right' values.
--
-- The input is memoized first ('memoStream') so that both resulting
-- streams read from the same underlying stream.
partitionEitherStream :: MonadComp m => Stream m (Either a b) -> Simulation m (Stream m a, Stream m b)
partitionEitherStream s =
  memoStream s >>= \shared ->
    return (leftStream shared, rightStream shared)
-- | Split the input stream into the given number of output streams,
-- using the 'FCFS' strategy (see 'splitStreamQueueing').
splitStream :: MonadComp m => Int -> Stream m a -> Simulation m [Stream m a]
splitStream n stream = splitStreamQueueing FCFS n stream
-- | Split the input stream into @n@ output streams.
--
-- All output streams share one mutable reference holding the current
-- position in the input. Each read takes the next input element and
-- advances that position, and is performed under a resource created
-- with the given strategy @s@ and an initial count of 1.
splitStreamQueueing :: (MonadComp m, EnqueueStrategy m s)
                       => s
                       -> Int
                       -> Stream m a
                       -> Simulation m [Stream m a]
splitStreamQueueing s n x =
  do session <- liftParameter simulationSession
     cursor  <- liftComp $ newProtoRef session x
     lock    <- newResource s 1
     let pull =
           usingResource lock $
             do current <- liftComp $ readProtoRef cursor
                (a, rest) <- runStream current
                liftComp $ writeProtoRef cursor rest
                return a
     -- Every output stream repeats the same shared reader.
     return $ replicate n (repeatProcess pull)
-- | Split the input stream into one output stream per priority stream
-- in @ps@.
--
-- For each element, an output stream first takes the next priority from
-- its own priority stream and then reads the next input element under a
-- shared resource (strategy @s@, initial count 1), requested with that
-- priority. All outputs share one mutable reference holding the current
-- position in the input.
splitStreamPrioritising :: (MonadComp m, PriorityQueueStrategy m s p)
                           => s
                           -> [Stream m p]
                           -> Stream m a
                           -> Simulation m [Stream m a]
splitStreamPrioritising s ps x =
  do session <- liftParameter simulationSession
     cursor  <- liftComp $ newProtoRef session x
     lock    <- newResource s 1
     let outputFor priorities =
           Cons $
             do (priority, priorities') <- runStream priorities
                a <- usingResourceWithPriority lock priority $
                       do current <- liftComp $ readProtoRef cursor
                          (item, rest) <- runStream current
                          liftComp $ writeProtoRef cursor rest
                          return item
                return (a, outputFor priorities')
     return $ map outputFor ps
-- | Concatenate (merge) a list of streams into one, using the 'FCFS'
-- strategy (see 'concatQueuedStreams').
concatStreams :: MonadComp m => [Stream m a] -> Stream m a
concatStreams streams = concatQueuedStreams FCFS streams
-- | Merge a list of streams into a single stream.
--
-- When the first element of the result is requested, one writer process
-- per input stream is spawned ('CancelTogether'). Writers hand their
-- items to the reader one at a time through a shared reference guarded
-- by three resources:
--
-- * a "write" gate (strategy @s@, initially available), taken by a
--   writer before it stores an item and released by the reader after
--   the item has been taken;
--
-- * a "read" gate (initially unavailable), released by a writer once an
--   item is stored and requested by the reader before taking it;
--
-- * a "continue" gate (initially unavailable), requested by a writer
--   after handing over an item and released each time a subsequent
--   element of the result is requested.
concatQueuedStreams :: (MonadComp m, EnqueueStrategy m s)
                       => s
                       -> [Stream m a]
                       -> Stream m a
concatQueuedStreams s streams = Cons start
  where
    start =
      do readGate  <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writeGate <- liftSimulation $ newResourceWithMaxCount s 1 (Just 1)
         contGate  <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         session   <- liftParameter simulationSession
         cell      <- liftComp $ newProtoRef session Nothing
         let produce src =
               do (item, rest) <- runStream src
                  requestResource writeGate
                  liftComp $ writeProtoRef cell (Just item)
                  releaseResource readGate
                  requestResource contGate
                  produce rest
             consume =
               do requestResource readGate
                  -- The read gate is only released after a writer has
                  -- stored an item, so the cell is expected to be full.
                  Just item <- liftComp $ readProtoRef cell
                  liftComp $ writeProtoRef cell Nothing
                  releaseResource writeGate
                  return item
         mapM_ (spawnProcess CancelTogether . produce) streams
         firstItem <- consume
         return (firstItem, repeatProcess (releaseResource contGate >> consume))
-- | Merge a list of streams of prioritised items into a single stream
-- of the items themselves.
--
-- When the first element of the result is requested, one writer process
-- per input stream is spawned ('CancelTogether'). Writers hand their
-- items to the reader one at a time through a shared reference guarded
-- by three resources:
--
-- * a "write" gate (strategy @s@, initially available), requested by a
--   writer with the priority attached to its item and released by the
--   reader after the item has been taken;
--
-- * a "read" gate (initially unavailable), released by a writer once an
--   item is stored and requested by the reader before taking it;
--
-- * a "continue" gate (initially unavailable), requested by a writer
--   after handing over an item and released each time a subsequent
--   element of the result is requested.
concatPriorityStreams :: (MonadComp m, PriorityQueueStrategy m s p)
                         => s
                         -> [Stream m (p, a)]
                         -> Stream m a
concatPriorityStreams s streams = Cons start
  where
    start =
      do readGate  <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writeGate <- liftSimulation $ newResourceWithMaxCount s 1 (Just 1)
         contGate  <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         session   <- liftParameter simulationSession
         cell      <- liftComp $ newProtoRef session Nothing
         let produce src =
               do ((priority, item), rest) <- runStream src
                  requestResourceWithPriority writeGate priority
                  liftComp $ writeProtoRef cell (Just item)
                  releaseResource readGate
                  requestResource contGate
                  produce rest
             consume =
               do requestResource readGate
                  -- The read gate is only released after a writer has
                  -- stored an item, so the cell is expected to be full.
                  Just item <- liftComp $ readProtoRef cell
                  liftComp $ writeProtoRef cell Nothing
                  releaseResource writeGate
                  return item
         mapM_ (spawnProcess CancelTogether . produce) streams
         firstItem <- consume
         return (firstItem, repeatProcess (releaseResource contGate >> consume))
-- | Merge two streams into one, using the 'FCFS' strategy
-- (see 'mergeQueuedStreams').
mergeStreams :: MonadComp m => Stream m a -> Stream m a -> Stream m a
mergeStreams x y = mergeQueuedStreams FCFS x y
-- | Merge two streams into one with the given strategy; this is
-- 'concatQueuedStreams' applied to the two-element list of its inputs.
mergeQueuedStreams :: (MonadComp m, EnqueueStrategy m s)
                      => s
                      -> Stream m a
                      -> Stream m a
                      -> Stream m a
mergeQueuedStreams s x y = concatQueuedStreams s both
  where
    both = [x, y]
-- | Merge two streams of prioritised items into one; this is
-- 'concatPriorityStreams' applied to the two-element list of its inputs.
mergePriorityStreams :: (MonadComp m, PriorityQueueStrategy m s p)
                        => s
                        -> Stream m (p, a)
                        -> Stream m (p, a)
                        -> Stream m a
mergePriorityStreams s x y = concatPriorityStreams s both
  where
    both = [x, y]
-- | The stream whose step computation is 'neverProcess'; it serves as
-- the identity ('mempty') of the 'Monoid' instance for streams.
emptyStream :: MonadComp m => Stream m a
emptyStream = Cons never
  where
    never = neverProcess
-- | Consume the stream forever: request an element, run the given
-- action on it, then continue with the rest of the stream.
consumeStream :: MonadComp m => (a -> Process m ()) -> Stream m a -> Process m ()
consumeStream f stream =
  do (a, rest) <- runStream stream
     f a
     consumeStream f rest
-- | Consume the stream forever, discarding every element.
sinkStream :: MonadComp m => Stream m a -> Process m ()
sinkStream stream =
  runStream stream >>= \(_, rest) -> sinkStream rest
-- | Prefetch the input stream: a spawned writer process
-- ('CancelTogether') requests the next input element ahead of the
-- consumer and parks it in a one-slot reference.
--
-- Two resources coordinate the hand-over: a "write" gate (initially
-- available) that the writer takes before storing an item and the
-- reader releases after taking it, and a "read" gate (initially
-- unavailable) that the writer releases once an item is stored and the
-- reader requests before taking it.
--
-- The writer is spawned when the first element of the result is
-- requested.
prefetchStream :: MonadComp m => Stream m a -> Stream m a
prefetchStream s = Cons start
  where
    start =
      do readGate  <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writeGate <- liftSimulation $ newResourceWithMaxCount FCFS 1 (Just 1)
         session   <- liftParameter simulationSession
         cell      <- liftComp $ newProtoRef session Nothing
         let produce src =
               do (item, rest) <- runStream src
                  requestResource writeGate
                  liftComp $ writeProtoRef cell (Just item)
                  releaseResource readGate
                  produce rest
             consume =
               do requestResource readGate
                  -- The read gate is only released after the writer has
                  -- stored an item, so the cell is expected to be full.
                  Just item <- liftComp $ readProtoRef cell
                  liftComp $ writeProtoRef cell Nothing
                  releaseResource writeGate
                  return item
         spawnProcess CancelTogether (produce s)
         runStream (repeatProcess consume)
-- | Turn a signal into a stream.
--
-- A new FCFS queue is created and a handler is subscribed to the signal
-- that enqueues every signalled value; the resulting stream dequeues
-- from that queue. The subscription is disposed when the current
-- process is being cancelled.
signalStream :: MonadComp m => Signal m a -> Process m (Stream m a)
signalStream s =
  do queue <- liftEvent newFCFSQueue
     subscription <- liftEvent (handleSignal s (enqueue queue))
     whenCancellingProcess (disposeEvent subscription)
     return (repeatProcess (dequeue queue))
-- | Turn a stream into a signal.
--
-- A new signal source is created and a process is spawned
-- ('CancelTogether') that consumes the stream, triggering the source
-- with every element. The published signal of that source is returned.
streamSignal :: MonadComp m => Stream m a -> Process m (Signal m a)
streamSignal z =
  do source <- liftSimulation newSignalSource
     spawnProcess CancelTogether $
       consumeStream (\a -> liftEvent (triggerSignal source a)) z
     return (publishSignal source)
-- | Annotate every element of the stream with arrival information.
--
-- For each element the current simulation time is read right after the
-- element has been received. The resulting 'Arrival' carries the value,
-- that time, and the delay since the previous element: 'Nothing' for
-- the first element, otherwise the current time minus the time at
-- which the previous element was received.
arrivalStream :: MonadComp m => Stream m a -> Stream m (Arrival a)
arrivalStream s = Cons $ loop s Nothing
  where
    -- @tPrev@ is the arrival time of the previous element, if any.
    loop stream tPrev =
      do (a, rest) <- runStream stream
         t <- liftDynamics time
         let arrival =
               Arrival { arrivalValue = a,
                         arrivalTime  = t,
                         -- Elapsed time since the previous arrival.
                         arrivalDelay = fmap (\t0 -> t - t0) tPrev }
         return (arrival, Cons $ loop rest (Just t))
-- | Prepend an initial value to the stream: the first element of the
-- result is @a0@, produced without running the input stream, and the
-- rest is the input stream itself.
delayStream :: MonadComp m => a -> Stream m a -> Stream m a
delayStream a0 s = Cons (return (a0, s))