{-# LANGUAGE FlexibleContexts #-}
module Simulation.Aivika.Trans.Stream
(
Stream(..),
emptyStream,
mergeStreams,
mergeQueuedStreams,
mergePriorityStreams,
concatStreams,
concatQueuedStreams,
concatPriorityStreams,
splitStream,
splitStreamQueueing,
splitStreamPrioritising,
splitStreamFiltering,
splitStreamFilteringQueueing,
streamUsingId,
prefetchStream,
delayStream,
arrivalStream,
memoStream,
zipStreamSeq,
zipStreamParallel,
zip3StreamSeq,
zip3StreamParallel,
unzipStream,
streamSeq,
streamParallel,
consumeStream,
sinkStream,
repeatProcess,
mapStream,
mapStreamM,
accumStream,
apStream,
apStreamM,
filterStream,
filterStreamM,
takeStream,
takeStreamWhile,
takeStreamWhileM,
dropStream,
dropStreamWhile,
dropStreamWhileM,
singletonStream,
joinStream,
failoverStream,
signalStream,
streamSignal,
queuedSignalStream,
leftStream,
rightStream,
replaceLeftStream,
replaceRightStream,
partitionEitherStream,
cloneStream,
firstArrivalStream,
lastArrivalStream,
assembleAccumStream,
traceStream) where
import Data.Maybe
import Data.Monoid hiding ((<>))
import Data.Semigroup (Semigroup(..))
import Data.List.NonEmpty (NonEmpty((:|)))
import Control.Applicative
import Control.Monad
import Control.Monad.Trans
import Simulation.Aivika.Trans.Ref.Base
import Simulation.Aivika.Trans.DES
import Simulation.Aivika.Trans.Parameter
import Simulation.Aivika.Trans.Simulation
import Simulation.Aivika.Trans.Dynamics
import Simulation.Aivika.Trans.Event
import Simulation.Aivika.Trans.Composite
import Simulation.Aivika.Trans.Cont
import Simulation.Aivika.Trans.Process
import Simulation.Aivika.Trans.Signal
import Simulation.Aivika.Trans.Resource.Base
import Simulation.Aivika.Trans.QueueStrategy
import qualified Simulation.Aivika.Trans.Queue.Infinite.Base as IQ
import Simulation.Aivika.Arrival (Arrival(..))
newtype Stream m a = Cons { Stream m a -> Process m (a, Stream m a)
runStream :: Process m (a, Stream m a)
}
instance MonadDES m => Functor (Stream m) where
{-# INLINE fmap #-}
fmap :: (a -> b) -> Stream m a -> Stream m b
fmap = (a -> b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
(a -> b) -> Stream m a -> Stream m b
mapStream
instance MonadDES m => Applicative (Stream m) where
{-# INLINE pure #-}
pure :: a -> Stream m a
pure a
a = let y :: Stream m a
y = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons ((a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m a
y)) in Stream m a
y
{-# INLINE (<*>) #-}
<*> :: Stream m (a -> b) -> Stream m a -> Stream m b
(<*>) = Stream m (a -> b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (a -> b) -> Stream m a -> Stream m b
apStream
instance MonadDES m => Alternative (Stream m) where
{-# INLINE empty #-}
empty :: Stream m a
empty = Stream m a
forall (m :: * -> *) a. MonadDES m => Stream m a
emptyStream
{-# INLINE (<|>) #-}
<|> :: Stream m a -> Stream m a -> Stream m a
(<|>) = Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Stream m a -> Stream m a
mergeStreams
instance MonadDES m => Semigroup (Stream m a) where
{-# INLINE (<>) #-}
<> :: Stream m a -> Stream m a -> Stream m a
(<>) = Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Stream m a -> Stream m a
mergeStreams
{-# INLINE sconcat #-}
sconcat :: NonEmpty (Stream m a) -> Stream m a
sconcat (Stream m a
h :| [Stream m a]
t) = [Stream m a] -> Stream m a
forall (m :: * -> *) a. MonadDES m => [Stream m a] -> Stream m a
concatStreams (Stream m a
h Stream m a -> [Stream m a] -> [Stream m a]
forall a. a -> [a] -> [a]
: [Stream m a]
t)
instance MonadDES m => Monoid (Stream m a) where
{-# INLINE mempty #-}
mempty :: Stream m a
mempty = Stream m a
forall (m :: * -> *) a. MonadDES m => Stream m a
emptyStream
{-# INLINE mappend #-}
mappend :: Stream m a -> Stream m a -> Stream m a
mappend = Stream m a -> Stream m a -> Stream m a
forall a. Semigroup a => a -> a -> a
(<>)
{-# INLINE mconcat #-}
mconcat :: [Stream m a] -> Stream m a
mconcat = [Stream m a] -> Stream m a
forall (m :: * -> *) a. MonadDES m => [Stream m a] -> Stream m a
concatStreams
streamUsingId :: MonadDES m => ProcessId m -> Stream m a -> Stream m a
{-# INLINABLE streamUsingId #-}
streamUsingId :: ProcessId m -> Stream m a -> Stream m a
streamUsingId ProcessId m
pid (Cons Process m (a, Stream m a)
s) =
Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ ProcessId m
-> Process m (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a.
MonadDES m =>
ProcessId m -> Process m a -> Process m a
processUsingId ProcessId m
pid Process m (a, Stream m a)
s
memoStream :: MonadDES m => Stream m a -> Simulation m (Stream m a)
{-# INLINABLE memoStream #-}
memoStream :: Stream m a -> Simulation m (Stream m a)
memoStream (Cons Process m (a, Stream m a)
s) =
do Process m (a, Stream m a)
p <- Process m (a, Stream m a)
-> Simulation m (Process m (a, Stream m a))
forall (m :: * -> *) a.
MonadDES m =>
Process m a -> Simulation m (Process m a)
memoProcess (Process m (a, Stream m a)
-> Simulation m (Process m (a, Stream m a)))
-> Process m (a, Stream m a)
-> Simulation m (Process m (a, Stream m a))
forall a b. (a -> b) -> a -> b
$
do ~(a
x, Stream m a
xs) <- Process m (a, Stream m a)
s
Stream m a
xs' <- Simulation m (Stream m a) -> Process m (Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Stream m a) -> Process m (Stream m a))
-> Simulation m (Stream m a) -> Process m (Stream m a)
forall a b. (a -> b) -> a -> b
$ Stream m a -> Simulation m (Stream m a)
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Simulation m (Stream m a)
memoStream Stream m a
xs
(a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
x, Stream m a
xs')
Stream m a -> Simulation m (Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
p)
zipStreamSeq :: MonadDES m => Stream m a -> Stream m b -> Stream m (a, b)
{-# INLINABLE zipStreamSeq #-}
zipStreamSeq :: Stream m a -> Stream m b -> Stream m (a, b)
zipStreamSeq (Cons Process m (a, Stream m a)
sa) (Cons Process m (b, Stream m b)
sb) = Process m ((a, b), Stream m (a, b)) -> Stream m (a, b)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m ((a, b), Stream m (a, b))
y where
y :: Process m ((a, b), Stream m (a, b))
y = do ~(a
x, Stream m a
xs) <- Process m (a, Stream m a)
sa
~(b
y, Stream m b
ys) <- Process m (b, Stream m b)
sb
((a, b), Stream m (a, b)) -> Process m ((a, b), Stream m (a, b))
forall (m :: * -> *) a. Monad m => a -> m a
return ((a
x, b
y), Stream m a -> Stream m b -> Stream m (a, b)
forall (m :: * -> *) a b.
MonadDES m =>
Stream m a -> Stream m b -> Stream m (a, b)
zipStreamSeq Stream m a
xs Stream m b
ys)
zipStreamParallel :: MonadDES m => Stream m a -> Stream m b -> Stream m (a, b)
{-# INLINABLE zipStreamParallel #-}
zipStreamParallel :: Stream m a -> Stream m b -> Stream m (a, b)
zipStreamParallel (Cons Process m (a, Stream m a)
sa) (Cons Process m (b, Stream m b)
sb) = Process m ((a, b), Stream m (a, b)) -> Stream m (a, b)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m ((a, b), Stream m (a, b))
y where
y :: Process m ((a, b), Stream m (a, b))
y = do ~((a
x, Stream m a
xs), (b
y, Stream m b
ys)) <- Process m (a, Stream m a)
-> Process m (b, Stream m b)
-> Process m ((a, Stream m a), (b, Stream m b))
forall (m :: * -> *) a b.
MonadDES m =>
Process m a -> Process m b -> Process m (a, b)
zipProcessParallel Process m (a, Stream m a)
sa Process m (b, Stream m b)
sb
((a, b), Stream m (a, b)) -> Process m ((a, b), Stream m (a, b))
forall (m :: * -> *) a. Monad m => a -> m a
return ((a
x, b
y), Stream m a -> Stream m b -> Stream m (a, b)
forall (m :: * -> *) a b.
MonadDES m =>
Stream m a -> Stream m b -> Stream m (a, b)
zipStreamParallel Stream m a
xs Stream m b
ys)
zip3StreamSeq :: MonadDES m => Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
{-# INLINABLE zip3StreamSeq #-}
zip3StreamSeq :: Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
zip3StreamSeq (Cons Process m (a, Stream m a)
sa) (Cons Process m (b, Stream m b)
sb) (Cons Process m (c, Stream m c)
sc) = Process m ((a, b, c), Stream m (a, b, c)) -> Stream m (a, b, c)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m ((a, b, c), Stream m (a, b, c))
y where
y :: Process m ((a, b, c), Stream m (a, b, c))
y = do ~(a
x, Stream m a
xs) <- Process m (a, Stream m a)
sa
~(b
y, Stream m b
ys) <- Process m (b, Stream m b)
sb
~(c
z, Stream m c
zs) <- Process m (c, Stream m c)
sc
((a, b, c), Stream m (a, b, c))
-> Process m ((a, b, c), Stream m (a, b, c))
forall (m :: * -> *) a. Monad m => a -> m a
return ((a
x, b
y, c
z), Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
forall (m :: * -> *) a b c.
MonadDES m =>
Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
zip3StreamSeq Stream m a
xs Stream m b
ys Stream m c
zs)
zip3StreamParallel :: MonadDES m => Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
{-# INLINABLE zip3StreamParallel #-}
zip3StreamParallel :: Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
zip3StreamParallel (Cons Process m (a, Stream m a)
sa) (Cons Process m (b, Stream m b)
sb) (Cons Process m (c, Stream m c)
sc) = Process m ((a, b, c), Stream m (a, b, c)) -> Stream m (a, b, c)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m ((a, b, c), Stream m (a, b, c))
y where
y :: Process m ((a, b, c), Stream m (a, b, c))
y = do ~((a
x, Stream m a
xs), (b
y, Stream m b
ys), (c
z, Stream m c
zs)) <- Process m (a, Stream m a)
-> Process m (b, Stream m b)
-> Process m (c, Stream m c)
-> Process m ((a, Stream m a), (b, Stream m b), (c, Stream m c))
forall (m :: * -> *) a b c.
MonadDES m =>
Process m a -> Process m b -> Process m c -> Process m (a, b, c)
zip3ProcessParallel Process m (a, Stream m a)
sa Process m (b, Stream m b)
sb Process m (c, Stream m c)
sc
((a, b, c), Stream m (a, b, c))
-> Process m ((a, b, c), Stream m (a, b, c))
forall (m :: * -> *) a. Monad m => a -> m a
return ((a
x, b
y, c
z), Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
forall (m :: * -> *) a b c.
MonadDES m =>
Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
zip3StreamParallel Stream m a
xs Stream m b
ys Stream m c
zs)
unzipStream :: MonadDES m => Stream m (a, b) -> Simulation m (Stream m a, Stream m b)
{-# INLINABLE unzipStream #-}
unzipStream :: Stream m (a, b) -> Simulation m (Stream m a, Stream m b)
unzipStream Stream m (a, b)
s =
do Stream m (a, b)
s' <- Stream m (a, b) -> Simulation m (Stream m (a, b))
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Simulation m (Stream m a)
memoStream Stream m (a, b)
s
let sa :: Stream m a
sa = ((a, b) -> a) -> Stream m (a, b) -> Stream m a
forall (m :: * -> *) a b.
MonadDES m =>
(a -> b) -> Stream m a -> Stream m b
mapStream (a, b) -> a
forall a b. (a, b) -> a
fst Stream m (a, b)
s'
sb :: Stream m b
sb = ((a, b) -> b) -> Stream m (a, b) -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
(a -> b) -> Stream m a -> Stream m b
mapStream (a, b) -> b
forall a b. (a, b) -> b
snd Stream m (a, b)
s'
(Stream m a, Stream m b) -> Simulation m (Stream m a, Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream m a
sa, Stream m b
sb)
streamSeq :: MonadDES m => [Stream m a] -> Stream m [a]
{-# INLINABLE streamSeq #-}
streamSeq :: [Stream m a] -> Stream m [a]
streamSeq [Stream m a]
xs = Process m ([a], Stream m [a]) -> Stream m [a]
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m ([a], Stream m [a])
y where
y :: Process m ([a], Stream m [a])
y = do [(a, Stream m a)]
ps <- [Stream m a]
-> (Stream m a -> Process m (a, Stream m a))
-> Process m [(a, Stream m a)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Stream m a]
xs Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream
([a], Stream m [a]) -> Process m ([a], Stream m [a])
forall (m :: * -> *) a. Monad m => a -> m a
return (((a, Stream m a) -> a) -> [(a, Stream m a)] -> [a]
forall a b. (a -> b) -> [a] -> [b]
map (a, Stream m a) -> a
forall a b. (a, b) -> a
fst [(a, Stream m a)]
ps, [Stream m a] -> Stream m [a]
forall (m :: * -> *) a. MonadDES m => [Stream m a] -> Stream m [a]
streamSeq ([Stream m a] -> Stream m [a]) -> [Stream m a] -> Stream m [a]
forall a b. (a -> b) -> a -> b
$ ((a, Stream m a) -> Stream m a)
-> [(a, Stream m a)] -> [Stream m a]
forall a b. (a -> b) -> [a] -> [b]
map (a, Stream m a) -> Stream m a
forall a b. (a, b) -> b
snd [(a, Stream m a)]
ps)
streamParallel :: MonadDES m => [Stream m a] -> Stream m [a]
{-# INLINABLE streamParallel #-}
streamParallel :: [Stream m a] -> Stream m [a]
streamParallel [Stream m a]
xs = Process m ([a], Stream m [a]) -> Stream m [a]
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m ([a], Stream m [a])
y where
y :: Process m ([a], Stream m [a])
y = do [(a, Stream m a)]
ps <- [Process m (a, Stream m a)] -> Process m [(a, Stream m a)]
forall (m :: * -> *) a.
MonadDES m =>
[Process m a] -> Process m [a]
processParallel ([Process m (a, Stream m a)] -> Process m [(a, Stream m a)])
-> [Process m (a, Stream m a)] -> Process m [(a, Stream m a)]
forall a b. (a -> b) -> a -> b
$ (Stream m a -> Process m (a, Stream m a))
-> [Stream m a] -> [Process m (a, Stream m a)]
forall a b. (a -> b) -> [a] -> [b]
map Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream [Stream m a]
xs
([a], Stream m [a]) -> Process m ([a], Stream m [a])
forall (m :: * -> *) a. Monad m => a -> m a
return (((a, Stream m a) -> a) -> [(a, Stream m a)] -> [a]
forall a b. (a -> b) -> [a] -> [b]
map (a, Stream m a) -> a
forall a b. (a, b) -> a
fst [(a, Stream m a)]
ps, [Stream m a] -> Stream m [a]
forall (m :: * -> *) a. MonadDES m => [Stream m a] -> Stream m [a]
streamParallel ([Stream m a] -> Stream m [a]) -> [Stream m a] -> Stream m [a]
forall a b. (a -> b) -> a -> b
$ ((a, Stream m a) -> Stream m a)
-> [(a, Stream m a)] -> [Stream m a]
forall a b. (a -> b) -> [a] -> [b]
map (a, Stream m a) -> Stream m a
forall a b. (a, b) -> b
snd [(a, Stream m a)]
ps)
repeatProcess :: MonadDES m => Process m a -> Stream m a
{-# INLINABLE repeatProcess #-}
repeatProcess :: Process m a -> Stream m a
repeatProcess Process m a
p = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
y where
y :: Process m (a, Stream m a)
y = do a
a <- Process m a
p
(a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess Process m a
p)
mapStream :: MonadDES m => (a -> b) -> Stream m a -> Stream m b
{-# INLINABLE mapStream #-}
mapStream :: (a -> b) -> Stream m a -> Stream m b
mapStream a -> b
f (Cons Process m (a, Stream m a)
s) = Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (b, Stream m b)
y where
y :: Process m (b, Stream m b)
y = do (a
a, Stream m a
xs) <- Process m (a, Stream m a)
s
(b, Stream m b) -> Process m (b, Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> b
f a
a, (a -> b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
(a -> b) -> Stream m a -> Stream m b
mapStream a -> b
f Stream m a
xs)
mapStreamM :: MonadDES m => (a -> Process m b) -> Stream m a -> Stream m b
{-# INLINABLE mapStreamM #-}
mapStreamM :: (a -> Process m b) -> Stream m a -> Stream m b
mapStreamM a -> Process m b
f (Cons Process m (a, Stream m a)
s) = Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (b, Stream m b)
y where
y :: Process m (b, Stream m b)
y = do (a
a, Stream m a
xs) <- Process m (a, Stream m a)
s
b
b <- a -> Process m b
f a
a
(b, Stream m b) -> Process m (b, Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (b
b, (a -> Process m b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
(a -> Process m b) -> Stream m a -> Stream m b
mapStreamM a -> Process m b
f Stream m a
xs)
accumStream :: MonadDES m => (acc -> a -> Process m (acc, b)) -> acc -> Stream m a -> Stream m b
{-# INLINABLE accumStream #-}
accumStream :: (acc -> a -> Process m (acc, b)) -> acc -> Stream m a -> Stream m b
accumStream acc -> a -> Process m (acc, b)
f acc
acc Stream m a
xs = Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (b, Stream m b) -> Stream m b)
-> Process m (b, Stream m b) -> Stream m b
forall a b. (a -> b) -> a -> b
$ Stream m a -> acc -> Process m (b, Stream m b)
loop Stream m a
xs acc
acc where
loop :: Stream m a -> acc -> Process m (b, Stream m b)
loop (Cons Process m (a, Stream m a)
s) acc
acc =
do (a
a, Stream m a
xs) <- Process m (a, Stream m a)
s
(acc
acc', b
b) <- acc -> a -> Process m (acc, b)
f acc
acc a
a
(b, Stream m b) -> Process m (b, Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (b
b, Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (b, Stream m b) -> Stream m b)
-> Process m (b, Stream m b) -> Stream m b
forall a b. (a -> b) -> a -> b
$ Stream m a -> acc -> Process m (b, Stream m b)
loop Stream m a
xs acc
acc')
apStream :: MonadDES m => Stream m (a -> b) -> Stream m a -> Stream m b
{-# INLINABLE apStream #-}
apStream :: Stream m (a -> b) -> Stream m a -> Stream m b
apStream (Cons Process m (a -> b, Stream m (a -> b))
sf) (Cons Process m (a, Stream m a)
sa) = Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (b, Stream m b)
y where
y :: Process m (b, Stream m b)
y = do (a -> b
f, Stream m (a -> b)
sf') <- Process m (a -> b, Stream m (a -> b))
sf
(a
a, Stream m a
sa') <- Process m (a, Stream m a)
sa
(b, Stream m b) -> Process m (b, Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> b
f a
a, Stream m (a -> b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (a -> b) -> Stream m a -> Stream m b
apStream Stream m (a -> b)
sf' Stream m a
sa')
apStreamM :: MonadDES m => Stream m (a -> Process m b) -> Stream m a -> Stream m b
{-# INLINABLE apStreamM #-}
apStreamM :: Stream m (a -> Process m b) -> Stream m a -> Stream m b
apStreamM (Cons Process m (a -> Process m b, Stream m (a -> Process m b))
sf) (Cons Process m (a, Stream m a)
sa) = Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (b, Stream m b)
y where
y :: Process m (b, Stream m b)
y = do (a -> Process m b
f, Stream m (a -> Process m b)
sf') <- Process m (a -> Process m b, Stream m (a -> Process m b))
sf
(a
a, Stream m a
sa') <- Process m (a, Stream m a)
sa
b
x <- a -> Process m b
f a
a
(b, Stream m b) -> Process m (b, Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (b
x, Stream m (a -> Process m b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (a -> Process m b) -> Stream m a -> Stream m b
apStreamM Stream m (a -> Process m b)
sf' Stream m a
sa')
filterStream :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
{-# INLINABLE filterStream #-}
filterStream :: (a -> Bool) -> Stream m a -> Stream m a
filterStream a -> Bool
p (Cons Process m (a, Stream m a)
s) = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
y where
y :: Process m (a, Stream m a)
y = do (a
a, Stream m a
xs) <- Process m (a, Stream m a)
s
if a -> Bool
p a
a
then (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
(a -> Bool) -> Stream m a -> Stream m a
filterStream a -> Bool
p Stream m a
xs)
else let Cons Process m (a, Stream m a)
z = (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
(a -> Bool) -> Stream m a -> Stream m a
filterStream a -> Bool
p Stream m a
xs in Process m (a, Stream m a)
z
filterStreamM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
{-# INLINABLE filterStreamM #-}
filterStreamM :: (a -> Process m Bool) -> Stream m a -> Stream m a
filterStreamM a -> Process m Bool
p (Cons Process m (a, Stream m a)
s) = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
y where
y :: Process m (a, Stream m a)
y = do (a
a, Stream m a
xs) <- Process m (a, Stream m a)
s
Bool
b <- a -> Process m Bool
p a
a
if Bool
b
then (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, (a -> Process m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
(a -> Process m Bool) -> Stream m a -> Stream m a
filterStreamM a -> Process m Bool
p Stream m a
xs)
else let Cons Process m (a, Stream m a)
z = (a -> Process m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
(a -> Process m Bool) -> Stream m a -> Stream m a
filterStreamM a -> Process m Bool
p Stream m a
xs in Process m (a, Stream m a)
z
leftStream :: MonadDES m => Stream m (Either a b) -> Stream m a
{-# INLINABLE leftStream #-}
leftStream :: Stream m (Either a b) -> Stream m a
leftStream (Cons Process m (Either a b, Stream m (Either a b))
s) = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
y where
y :: Process m (a, Stream m a)
y = do (Either a b
a, Stream m (Either a b)
xs) <- Process m (Either a b, Stream m (Either a b))
s
case Either a b
a of
Left a
a -> (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m (Either a b) -> Stream m a
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (Either a b) -> Stream m a
leftStream Stream m (Either a b)
xs)
Right b
_ -> let Cons Process m (a, Stream m a)
z = Stream m (Either a b) -> Stream m a
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (Either a b) -> Stream m a
leftStream Stream m (Either a b)
xs in Process m (a, Stream m a)
z
rightStream :: MonadDES m => Stream m (Either a b) -> Stream m b
{-# INLINABLE rightStream #-}
rightStream :: Stream m (Either a b) -> Stream m b
rightStream (Cons Process m (Either a b, Stream m (Either a b))
s) = Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (b, Stream m b)
y where
y :: Process m (b, Stream m b)
y = do (Either a b
a, Stream m (Either a b)
xs) <- Process m (Either a b, Stream m (Either a b))
s
case Either a b
a of
Left a
_ -> let Cons Process m (b, Stream m b)
z = Stream m (Either a b) -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (Either a b) -> Stream m b
rightStream Stream m (Either a b)
xs in Process m (b, Stream m b)
z
Right b
a -> (b, Stream m b) -> Process m (b, Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (b
a, Stream m (Either a b) -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (Either a b) -> Stream m b
rightStream Stream m (Either a b)
xs)
replaceLeftStream :: MonadDES m => Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
{-# INLINABLE replaceLeftStream #-}
replaceLeftStream :: Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
replaceLeftStream (Cons Process m (Either a b, Stream m (Either a b))
sab) (ys0 :: Stream m c
ys0@(Cons Process m (c, Stream m c)
sc)) = Process m (Either c b, Stream m (Either c b))
-> Stream m (Either c b)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (Either c b, Stream m (Either c b))
z where
z :: Process m (Either c b, Stream m (Either c b))
z = do (Either a b
a, Stream m (Either a b)
xs) <- Process m (Either a b, Stream m (Either a b))
sab
case Either a b
a of
Left a
_ ->
do (c
b, Stream m c
ys) <- Process m (c, Stream m c)
sc
(Either c b, Stream m (Either c b))
-> Process m (Either c b, Stream m (Either c b))
forall (m :: * -> *) a. Monad m => a -> m a
return (c -> Either c b
forall a b. a -> Either a b
Left c
b, Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
forall (m :: * -> *) a b c.
MonadDES m =>
Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
replaceLeftStream Stream m (Either a b)
xs Stream m c
ys)
Right b
a ->
(Either c b, Stream m (Either c b))
-> Process m (Either c b, Stream m (Either c b))
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> Either c b
forall a b. b -> Either a b
Right b
a, Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
forall (m :: * -> *) a b c.
MonadDES m =>
Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
replaceLeftStream Stream m (Either a b)
xs Stream m c
ys0)
replaceRightStream :: MonadDES m => Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
{-# INLINABLE replaceRightStream #-}
replaceRightStream :: Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
replaceRightStream (Cons Process m (Either a b, Stream m (Either a b))
sab) (ys0 :: Stream m c
ys0@(Cons Process m (c, Stream m c)
sc)) = Process m (Either a c, Stream m (Either a c))
-> Stream m (Either a c)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (Either a c, Stream m (Either a c))
z where
z :: Process m (Either a c, Stream m (Either a c))
z = do (Either a b
a, Stream m (Either a b)
xs) <- Process m (Either a b, Stream m (Either a b))
sab
case Either a b
a of
Right b
_ ->
do (c
b, Stream m c
ys) <- Process m (c, Stream m c)
sc
(Either a c, Stream m (Either a c))
-> Process m (Either a c, Stream m (Either a c))
forall (m :: * -> *) a. Monad m => a -> m a
return (c -> Either a c
forall a b. b -> Either a b
Right c
b, Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
forall (m :: * -> *) a b c.
MonadDES m =>
Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
replaceRightStream Stream m (Either a b)
xs Stream m c
ys)
Left a
a ->
(Either a c, Stream m (Either a c))
-> Process m (Either a c, Stream m (Either a c))
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Either a c
forall a b. a -> Either a b
Left a
a, Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
forall (m :: * -> *) a b c.
MonadDES m =>
Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
replaceRightStream Stream m (Either a b)
xs Stream m c
ys0)
partitionEitherStream :: MonadDES m => Stream m (Either a b) -> Simulation m (Stream m a, Stream m b)
{-# INLINABLE partitionEitherStream #-}
partitionEitherStream :: Stream m (Either a b) -> Simulation m (Stream m a, Stream m b)
partitionEitherStream Stream m (Either a b)
s =
do Stream m (Either a b)
s' <- Stream m (Either a b) -> Simulation m (Stream m (Either a b))
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Simulation m (Stream m a)
memoStream Stream m (Either a b)
s
(Stream m a, Stream m b) -> Simulation m (Stream m a, Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream m (Either a b) -> Stream m a
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (Either a b) -> Stream m a
leftStream Stream m (Either a b)
s', Stream m (Either a b) -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (Either a b) -> Stream m b
rightStream Stream m (Either a b)
s')
splitStream :: MonadDES m => Int -> Stream m a -> Simulation m [Stream m a]
{-# INLINABLE splitStream #-}
splitStream :: Int -> Stream m a -> Simulation m [Stream m a]
splitStream = FCFS -> Int -> Stream m a -> Simulation m [Stream m a]
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
s -> Int -> Stream m a -> Simulation m [Stream m a]
splitStreamQueueing FCFS
FCFS
splitStreamQueueing :: (MonadDES m, EnqueueStrategy m s)
=> s
-> Int
-> Stream m a
-> Simulation m [Stream m a]
{-# INLINABLE splitStreamQueueing #-}
splitStreamQueueing :: s -> Int -> Stream m a -> Simulation m [Stream m a]
splitStreamQueueing s
s Int
n Stream m a
x =
do Ref m (Stream m a)
ref <- Stream m a -> Simulation m (Ref m (Stream m a))
forall (m :: * -> *) a. MonadRef m => a -> Simulation m (Ref m a)
newRef Stream m a
x
Resource m s
res <- s -> Int -> Simulation m (Resource m s)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Simulation m (Resource m s)
newResource s
s Int
1
let reader :: Process m a
reader =
Resource m s -> Process m a -> Process m a
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m a -> Process m a
usingResource Resource m s
res (Process m a -> Process m a) -> Process m a -> Process m a
forall a b. (a -> b) -> a -> b
$
do Stream m a
p <- Event m (Stream m a) -> Process m (Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Stream m a) -> Process m (Stream m a))
-> Event m (Stream m a) -> Process m (Stream m a)
forall a b. (a -> b) -> a -> b
$ Ref m (Stream m a) -> Event m (Stream m a)
forall (m :: * -> *) a. MonadRef m => Ref m a -> Event m a
readRef Ref m (Stream m a)
ref
(a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
p
Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Stream m a) -> Stream m a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Stream m a)
ref Stream m a
xs
a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
[Stream m a] -> Simulation m [Stream m a]
forall (m :: * -> *) a. Monad m => a -> m a
return ([Stream m a] -> Simulation m [Stream m a])
-> [Stream m a] -> Simulation m [Stream m a]
forall a b. (a -> b) -> a -> b
$ (Int -> Stream m a) -> [Int] -> [Stream m a]
forall a b. (a -> b) -> [a] -> [b]
map (\Int
i -> Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess Process m a
reader) [Int
1..Int
n]
splitStreamPrioritising :: (MonadDES m, PriorityQueueStrategy m s p)
=> s
-> [Stream m p]
-> Stream m a
-> Simulation m [Stream m a]
{-# INLINABLE splitStreamPrioritising #-}
splitStreamPrioritising :: s -> [Stream m p] -> Stream m a -> Simulation m [Stream m a]
splitStreamPrioritising s
s [Stream m p]
ps Stream m a
x =
do Ref m (Stream m a)
ref <- Stream m a -> Simulation m (Ref m (Stream m a))
forall (m :: * -> *) a. MonadRef m => a -> Simulation m (Ref m a)
newRef Stream m a
x
Resource m s
res <- s -> Int -> Simulation m (Resource m s)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Simulation m (Resource m s)
newResource s
s Int
1
let stream :: Stream m a -> Stream m a
stream (Cons Process m (a, Stream m a)
p) = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
z where
z :: Process m (a, Stream m a)
z = do (a
p', Stream m a
ps) <- Process m (a, Stream m a)
p
a
a <- Resource m s -> a -> Process m a -> Process m a
forall (m :: * -> *) s p a.
(MonadDES m, PriorityQueueStrategy m s p) =>
Resource m s -> p -> Process m a -> Process m a
usingResourceWithPriority Resource m s
res a
p' (Process m a -> Process m a) -> Process m a -> Process m a
forall a b. (a -> b) -> a -> b
$
do Stream m a
p <- Event m (Stream m a) -> Process m (Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Stream m a) -> Process m (Stream m a))
-> Event m (Stream m a) -> Process m (Stream m a)
forall a b. (a -> b) -> a -> b
$ Ref m (Stream m a) -> Event m (Stream m a)
forall (m :: * -> *) a. MonadRef m => Ref m a -> Event m a
readRef Ref m (Stream m a)
ref
(a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
p
Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Stream m a) -> Stream m a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Stream m a)
ref Stream m a
xs
a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
(a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m a -> Stream m a
stream Stream m a
ps)
[Stream m a] -> Simulation m [Stream m a]
forall (m :: * -> *) a. Monad m => a -> m a
return ([Stream m a] -> Simulation m [Stream m a])
-> [Stream m a] -> Simulation m [Stream m a]
forall a b. (a -> b) -> a -> b
$ (Stream m p -> Stream m a) -> [Stream m p] -> [Stream m a]
forall a b. (a -> b) -> [a] -> [b]
map Stream m p -> Stream m a
forall a. PriorityQueueStrategy m s a => Stream m a -> Stream m a
stream [Stream m p]
ps
splitStreamFiltering :: MonadDES m => [a -> Event m Bool] -> Stream m a -> Simulation m [Stream m a]
{-# INLINABLE splitStreamFiltering #-}
splitStreamFiltering :: [a -> Event m Bool] -> Stream m a -> Simulation m [Stream m a]
splitStreamFiltering = FCFS
-> [a -> Event m Bool] -> Stream m a -> Simulation m [Stream m a]
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
s -> [a -> Event m Bool] -> Stream m a -> Simulation m [Stream m a]
splitStreamFilteringQueueing FCFS
FCFS
splitStreamFilteringQueueing :: (MonadDES m, EnqueueStrategy m s)
=> s
-> [a -> Event m Bool]
-> Stream m a
-> Simulation m [Stream m a]
{-# INLINABLE splitStreamFilteringQueueing #-}
splitStreamFilteringQueueing :: s -> [a -> Event m Bool] -> Stream m a -> Simulation m [Stream m a]
splitStreamFilteringQueueing s
s [a -> Event m Bool]
preds Stream m a
x =
do Ref m (Stream m a)
ref <- Simulation m (Ref m (Stream m a))
-> Simulation m (Ref m (Stream m a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Ref m (Stream m a))
-> Simulation m (Ref m (Stream m a)))
-> Simulation m (Ref m (Stream m a))
-> Simulation m (Ref m (Stream m a))
forall a b. (a -> b) -> a -> b
$ Stream m a -> Simulation m (Ref m (Stream m a))
forall (m :: * -> *) a. MonadRef m => a -> Simulation m (Ref m a)
newRef Stream m a
x
Resource m s
res <- s -> Int -> Simulation m (Resource m s)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Simulation m (Resource m s)
newResource s
s Int
1
let reader :: (a -> Event m Bool) -> Process m a
reader a -> Event m Bool
pred =
do Maybe a
a <-
Resource m s -> Process m (Maybe a) -> Process m (Maybe a)
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m a -> Process m a
usingResource Resource m s
res (Process m (Maybe a) -> Process m (Maybe a))
-> Process m (Maybe a) -> Process m (Maybe a)
forall a b. (a -> b) -> a -> b
$
do Stream m a
p <- Event m (Stream m a) -> Process m (Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Stream m a) -> Process m (Stream m a))
-> Event m (Stream m a) -> Process m (Stream m a)
forall a b. (a -> b) -> a -> b
$ Ref m (Stream m a) -> Event m (Stream m a)
forall (m :: * -> *) a. MonadRef m => Ref m a -> Event m a
readRef Ref m (Stream m a)
ref
(a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
p
Event m (Maybe a) -> Process m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Maybe a) -> Process m (Maybe a))
-> Event m (Maybe a) -> Process m (Maybe a)
forall a b. (a -> b) -> a -> b
$
do Bool
f <- a -> Event m Bool
pred a
a
if Bool
f
then do Ref m (Stream m a) -> Stream m a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Stream m a)
ref Stream m a
xs
Maybe a -> Event m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe a -> Event m (Maybe a)) -> Maybe a -> Event m (Maybe a)
forall a b. (a -> b) -> a -> b
$ a -> Maybe a
forall a. a -> Maybe a
Just a
a
else do Ref m (Stream m a) -> Stream m a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Stream m a)
ref (Stream m a -> Event m ()) -> Stream m a -> Event m ()
forall a b. (a -> b) -> a -> b
$ Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons ((a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m a
xs))
Maybe a -> Event m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
case Maybe a
a of
Just a
a -> a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
Maybe a
Nothing -> (a -> Event m Bool) -> Process m a
reader a -> Event m Bool
pred
[Stream m a] -> Simulation m [Stream m a]
forall (m :: * -> *) a. Monad m => a -> m a
return ([Stream m a] -> Simulation m [Stream m a])
-> [Stream m a] -> Simulation m [Stream m a]
forall a b. (a -> b) -> a -> b
$ ((a -> Event m Bool) -> Stream m a)
-> [a -> Event m Bool] -> [Stream m a]
forall a b. (a -> b) -> [a] -> [b]
map (Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess (Process m a -> Stream m a)
-> ((a -> Event m Bool) -> Process m a)
-> (a -> Event m Bool)
-> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> Event m Bool) -> Process m a
reader) [a -> Event m Bool]
preds
concatStreams :: MonadDES m => [Stream m a] -> Stream m a
{-# INLINABLE concatStreams #-}
concatStreams :: [Stream m a] -> Stream m a
concatStreams = FCFS -> [Stream m a] -> Stream m a
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
s -> [Stream m a] -> Stream m a
concatQueuedStreams FCFS
FCFS
concatQueuedStreams :: (MonadDES m, EnqueueStrategy m s)
=> s
-> [Stream m a]
-> Stream m a
{-# INLINABLE concatQueuedStreams #-}
concatQueuedStreams :: s -> [Stream m a] -> Stream m a
concatQueuedStreams s
s [Stream m a]
streams = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
z where
z :: Process m (a, Stream m a)
z = do Resource m FCFS
reading <- Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m FCFS) -> Process m (Resource m FCFS))
-> Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation m (Resource m FCFS)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
Resource m s
writing <- Simulation m (Resource m s) -> Process m (Resource m s)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m s) -> Process m (Resource m s))
-> Simulation m (Resource m s) -> Process m (Resource m s)
forall a b. (a -> b) -> a -> b
$ s -> Int -> Maybe Int -> Simulation m (Resource m s)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount s
s Int
1 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
Resource m FCFS
conting <- Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m FCFS) -> Process m (Resource m FCFS))
-> Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation m (Resource m FCFS)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
Ref m (Maybe a)
ref <- Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a)))
-> Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a))
forall a b. (a -> b) -> a -> b
$ Maybe a -> Simulation m (Ref m (Maybe a))
forall (m :: * -> *) a. MonadRef m => a -> Simulation m (Ref m a)
newRef Maybe a
forall a. Maybe a
Nothing
let writer :: Stream m a -> Process m b
writer Stream m a
p =
do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
p
Resource m s -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m s
writing
Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Maybe a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
reading
Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
conting
Stream m a -> Process m b
writer Stream m a
xs
reader :: Process m a
reader =
do Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
reading
Just a
a <- Event m (Maybe a) -> Process m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Maybe a) -> Process m (Maybe a))
-> Event m (Maybe a) -> Process m (Maybe a)
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Event m (Maybe a)
forall (m :: * -> *) a. MonadRef m => Ref m a -> Event m a
readRef Ref m (Maybe a)
ref
Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Maybe a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref Maybe a
forall a. Maybe a
Nothing
Resource m s -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m s
writing
a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
[Stream m a] -> (Stream m a -> Process m ()) -> Process m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Stream m a]
streams ((Stream m a -> Process m ()) -> Process m ())
-> (Stream m a -> Process m ()) -> Process m ()
forall a b. (a -> b) -> a -> b
$ Process m () -> Process m ()
forall (m :: * -> *). MonadDES m => Process m () -> Process m ()
spawnProcess (Process m () -> Process m ())
-> (Stream m a -> Process m ()) -> Stream m a -> Process m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Process m ()
forall b. Stream m a -> Process m b
writer
a
a <- Process m a
reader
let xs :: Stream m a
xs = Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess (Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
conting Process m () -> Process m a -> Process m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Process m a
reader)
(a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m a
xs)
concatPriorityStreams :: (MonadDES m, PriorityQueueStrategy m s p)
=> s
-> [Stream m (p, a)]
-> Stream m a
{-# INLINABLE concatPriorityStreams #-}
concatPriorityStreams :: s -> [Stream m (p, a)] -> Stream m a
concatPriorityStreams s
s [Stream m (p, a)]
streams = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
z where
z :: Process m (a, Stream m a)
z = do Resource m FCFS
reading <- Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m FCFS) -> Process m (Resource m FCFS))
-> Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation m (Resource m FCFS)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
Resource m s
writing <- Simulation m (Resource m s) -> Process m (Resource m s)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m s) -> Process m (Resource m s))
-> Simulation m (Resource m s) -> Process m (Resource m s)
forall a b. (a -> b) -> a -> b
$ s -> Int -> Maybe Int -> Simulation m (Resource m s)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount s
s Int
1 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
Resource m FCFS
conting <- Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m FCFS) -> Process m (Resource m FCFS))
-> Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation m (Resource m FCFS)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
Ref m (Maybe a)
ref <- Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a)))
-> Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a))
forall a b. (a -> b) -> a -> b
$ Maybe a -> Simulation m (Ref m (Maybe a))
forall (m :: * -> *) a. MonadRef m => a -> Simulation m (Ref m a)
newRef Maybe a
forall a. Maybe a
Nothing
let writer :: Stream m (p, a) -> Process m b
writer Stream m (p, a)
p =
do ((p
priority, a
a), Stream m (p, a)
xs) <- Stream m (p, a) -> Process m ((p, a), Stream m (p, a))
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m (p, a)
p
Resource m s -> p -> Process m ()
forall (m :: * -> *) s p.
(MonadDES m, PriorityQueueStrategy m s p) =>
Resource m s -> p -> Process m ()
requestResourceWithPriority Resource m s
writing p
priority
Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Maybe a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
reading
Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
conting
Stream m (p, a) -> Process m b
writer Stream m (p, a)
xs
reader :: Process m a
reader =
do Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
reading
Just a
a <- Event m (Maybe a) -> Process m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Maybe a) -> Process m (Maybe a))
-> Event m (Maybe a) -> Process m (Maybe a)
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Event m (Maybe a)
forall (m :: * -> *) a. MonadRef m => Ref m a -> Event m a
readRef Ref m (Maybe a)
ref
Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Maybe a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref Maybe a
forall a. Maybe a
Nothing
Resource m s -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m s
writing
a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
[Stream m (p, a)]
-> (Stream m (p, a) -> Process m ()) -> Process m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Stream m (p, a)]
streams ((Stream m (p, a) -> Process m ()) -> Process m ())
-> (Stream m (p, a) -> Process m ()) -> Process m ()
forall a b. (a -> b) -> a -> b
$ Process m () -> Process m ()
forall (m :: * -> *). MonadDES m => Process m () -> Process m ()
spawnProcess (Process m () -> Process m ())
-> (Stream m (p, a) -> Process m ())
-> Stream m (p, a)
-> Process m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m (p, a) -> Process m ()
forall p b.
PriorityQueueStrategy m s p =>
Stream m (p, a) -> Process m b
writer
a
a <- Process m a
reader
let xs :: Stream m a
xs = Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess (Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
conting Process m () -> Process m a -> Process m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Process m a
reader)
(a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m a
xs)
mergeStreams :: MonadDES m => Stream m a -> Stream m a -> Stream m a
{-# INLINABLE mergeStreams #-}
mergeStreams :: Stream m a -> Stream m a -> Stream m a
mergeStreams = FCFS -> Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
s -> Stream m a -> Stream m a -> Stream m a
mergeQueuedStreams FCFS
FCFS
mergeQueuedStreams :: (MonadDES m, EnqueueStrategy m s)
=> s
-> Stream m a
-> Stream m a
-> Stream m a
{-# INLINABLE mergeQueuedStreams #-}
mergeQueuedStreams :: s -> Stream m a -> Stream m a -> Stream m a
mergeQueuedStreams s
s Stream m a
x Stream m a
y = s -> [Stream m a] -> Stream m a
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
s -> [Stream m a] -> Stream m a
concatQueuedStreams s
s [Stream m a
x, Stream m a
y]
mergePriorityStreams :: (MonadDES m, PriorityQueueStrategy m s p)
=> s
-> Stream m (p, a)
-> Stream m (p, a)
-> Stream m a
{-# INLINABLE mergePriorityStreams #-}
mergePriorityStreams :: s -> Stream m (p, a) -> Stream m (p, a) -> Stream m a
mergePriorityStreams s
s Stream m (p, a)
x Stream m (p, a)
y = s -> [Stream m (p, a)] -> Stream m a
forall (m :: * -> *) s p a.
(MonadDES m, PriorityQueueStrategy m s p) =>
s -> [Stream m (p, a)] -> Stream m a
concatPriorityStreams s
s [Stream m (p, a)
x, Stream m (p, a)
y]
emptyStream :: MonadDES m => Stream m a
{-# INLINABLE emptyStream #-}
emptyStream :: Stream m a
emptyStream = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
forall (m :: * -> *) a. MonadDES m => Process m a
neverProcess
consumeStream :: MonadDES m => (a -> Process m ()) -> Stream m a -> Process m ()
{-# INLINABLE consumeStream #-}
consumeStream :: (a -> Process m ()) -> Stream m a -> Process m ()
consumeStream a -> Process m ()
f (Cons Process m (a, Stream m a)
s) =
do (a
a, Stream m a
xs) <- Process m (a, Stream m a)
s
a -> Process m ()
f a
a
(a -> Process m ()) -> Stream m a -> Process m ()
forall (m :: * -> *) a.
MonadDES m =>
(a -> Process m ()) -> Stream m a -> Process m ()
consumeStream a -> Process m ()
f Stream m a
xs
sinkStream :: MonadDES m => Stream m a -> Process m ()
{-# INLINABLE sinkStream #-}
sinkStream :: Stream m a -> Process m ()
sinkStream (Cons Process m (a, Stream m a)
s) =
do (a
a, Stream m a
xs) <- Process m (a, Stream m a)
s
Stream m a -> Process m ()
forall (m :: * -> *) a. MonadDES m => Stream m a -> Process m ()
sinkStream Stream m a
xs
prefetchStream :: MonadDES m => Stream m a -> Stream m a
{-# INLINABLE prefetchStream #-}
prefetchStream :: Stream m a -> Stream m a
prefetchStream Stream m a
s = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
z where
z :: Process m (a, Stream m a)
z = do Resource m FCFS
reading <- Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m FCFS) -> Process m (Resource m FCFS))
-> Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation m (Resource m FCFS)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
Resource m FCFS
writing <- Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m FCFS) -> Process m (Resource m FCFS))
-> Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation m (Resource m FCFS)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
1 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
Ref m (Maybe a)
ref <- Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a)))
-> Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a))
forall a b. (a -> b) -> a -> b
$ Maybe a -> Simulation m (Ref m (Maybe a))
forall (m :: * -> *) a. MonadRef m => a -> Simulation m (Ref m a)
newRef Maybe a
forall a. Maybe a
Nothing
let writer :: Stream m a -> Process m b
writer Stream m a
p =
do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
p
Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
writing
Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Maybe a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
reading
Stream m a -> Process m b
writer Stream m a
xs
reader :: Process m a
reader =
do Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
reading
Just a
a <- Event m (Maybe a) -> Process m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Maybe a) -> Process m (Maybe a))
-> Event m (Maybe a) -> Process m (Maybe a)
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Event m (Maybe a)
forall (m :: * -> *) a. MonadRef m => Ref m a -> Event m a
readRef Ref m (Maybe a)
ref
Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Maybe a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref Maybe a
forall a. Maybe a
Nothing
Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
writing
a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
Process m () -> Process m ()
forall (m :: * -> *). MonadDES m => Process m () -> Process m ()
spawnProcess (Process m () -> Process m ()) -> Process m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Stream m a -> Process m ()
forall b. Stream m a -> Process m b
writer Stream m a
s
Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m a -> Process m (a, Stream m a))
-> Stream m a -> Process m (a, Stream m a)
forall a b. (a -> b) -> a -> b
$ Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess Process m a
reader
queuedSignalStream :: MonadDES m
=> (a -> Event m ())
-> Process m a
-> Signal m a
-> Composite m (Stream m a)
{-# INLINABLE queuedSignalStream #-}
queuedSignalStream :: (a -> Event m ())
-> Process m a -> Signal m a -> Composite m (Stream m a)
queuedSignalStream a -> Event m ()
enqueue Process m a
dequeue Signal m a
s =
do DisposableEvent m
h <- Event m (DisposableEvent m) -> Composite m (DisposableEvent m)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (DisposableEvent m) -> Composite m (DisposableEvent m))
-> Event m (DisposableEvent m) -> Composite m (DisposableEvent m)
forall a b. (a -> b) -> a -> b
$
Signal m a -> (a -> Event m ()) -> Event m (DisposableEvent m)
forall (m :: * -> *) a.
Signal m a -> (a -> Event m ()) -> Event m (DisposableEvent m)
handleSignal Signal m a
s a -> Event m ()
enqueue
DisposableEvent m -> Composite m ()
forall (m :: * -> *).
Monad m =>
DisposableEvent m -> Composite m ()
disposableComposite DisposableEvent m
h
Stream m a -> Composite m (Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream m a -> Composite m (Stream m a))
-> Stream m a -> Composite m (Stream m a)
forall a b. (a -> b) -> a -> b
$ Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess Process m a
dequeue
signalStream :: MonadDES m => Signal m a -> Composite m (Stream m a)
{-# INLINABLE signalStream #-}
signalStream :: Signal m a -> Composite m (Stream m a)
signalStream Signal m a
s =
do FCFSQueue m a
q <- Simulation m (FCFSQueue m a) -> Composite m (FCFSQueue m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation Simulation m (FCFSQueue m a)
forall (m :: * -> *) a. MonadDES m => Simulation m (FCFSQueue m a)
IQ.newFCFSQueue
(a -> Event m ())
-> Process m a -> Signal m a -> Composite m (Stream m a)
forall (m :: * -> *) a.
MonadDES m =>
(a -> Event m ())
-> Process m a -> Signal m a -> Composite m (Stream m a)
queuedSignalStream (FCFSQueue m a -> a -> Event m ()
forall (m :: * -> *) sm so a.
(MonadDES m, EnqueueStrategy m sm, DequeueStrategy m so) =>
Queue m sm so a -> a -> Event m ()
IQ.enqueue FCFSQueue m a
q) (FCFSQueue m a -> Process m a
forall (m :: * -> *) sm so a.
(MonadDES m, DequeueStrategy m sm, EnqueueStrategy m so) =>
Queue m sm so a -> Process m a
IQ.dequeue FCFSQueue m a
q) Signal m a
s
streamSignal :: MonadDES m => Stream m a -> Composite m (Signal m a)
{-# INLINABLE streamSignal #-}
streamSignal :: Stream m a -> Composite m (Signal m a)
streamSignal Stream m a
z =
do SignalSource m a
s <- Simulation m (SignalSource m a) -> Composite m (SignalSource m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation Simulation m (SignalSource m a)
forall (m :: * -> *) a.
MonadDES m =>
Simulation m (SignalSource m a)
newSignalSource
ProcessId m
pid <- Simulation m (ProcessId m) -> Composite m (ProcessId m)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation Simulation m (ProcessId m)
forall (m :: * -> *). MonadDES m => Simulation m (ProcessId m)
newProcessId
Event m () -> Composite m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Composite m ()) -> Event m () -> Composite m ()
forall a b. (a -> b) -> a -> b
$
ProcessId m -> Process m () -> Event m ()
forall (m :: * -> *).
MonadDES m =>
ProcessId m -> Process m () -> Event m ()
runProcessUsingId ProcessId m
pid (Process m () -> Event m ()) -> Process m () -> Event m ()
forall a b. (a -> b) -> a -> b
$
(a -> Process m ()) -> Stream m a -> Process m ()
forall (m :: * -> *) a.
MonadDES m =>
(a -> Process m ()) -> Stream m a -> Process m ()
consumeStream (Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ())
-> (a -> Event m ()) -> a -> Process m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SignalSource m a -> a -> Event m ()
forall (m :: * -> *) a. SignalSource m a -> a -> Event m ()
triggerSignal SignalSource m a
s) Stream m a
z
DisposableEvent m -> Composite m ()
forall (m :: * -> *).
Monad m =>
DisposableEvent m -> Composite m ()
disposableComposite (DisposableEvent m -> Composite m ())
-> DisposableEvent m -> Composite m ()
forall a b. (a -> b) -> a -> b
$
Event m () -> DisposableEvent m
forall (m :: * -> *). Event m () -> DisposableEvent m
DisposableEvent (Event m () -> DisposableEvent m)
-> Event m () -> DisposableEvent m
forall a b. (a -> b) -> a -> b
$
ProcessId m -> Event m ()
forall (m :: * -> *). MonadDES m => ProcessId m -> Event m ()
cancelProcessWithId ProcessId m
pid
Signal m a -> Composite m (Signal m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Signal m a -> Composite m (Signal m a))
-> Signal m a -> Composite m (Signal m a)
forall a b. (a -> b) -> a -> b
$ SignalSource m a -> Signal m a
forall (m :: * -> *) a. SignalSource m a -> Signal m a
publishSignal SignalSource m a
s
arrivalStream :: MonadDES m => Stream m a -> Stream m (Arrival a)
{-# INLINABLE arrivalStream #-}
arrivalStream :: Stream m a -> Stream m (Arrival a)
arrivalStream Stream m a
s = Process m (Arrival a, Stream m (Arrival a)) -> Stream m (Arrival a)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (Arrival a, Stream m (Arrival a))
-> Stream m (Arrival a))
-> Process m (Arrival a, Stream m (Arrival a))
-> Stream m (Arrival a)
forall a b. (a -> b) -> a -> b
$ Stream m a
-> Maybe Double -> Process m (Arrival a, Stream m (Arrival a))
forall (m :: * -> *) a.
MonadDES m =>
Stream m a
-> Maybe Double -> Process m (Arrival a, Stream m (Arrival a))
loop Stream m a
s Maybe Double
forall a. Maybe a
Nothing where
loop :: Stream m a
-> Maybe Double -> Process m (Arrival a, Stream m (Arrival a))
loop Stream m a
s Maybe Double
t0 = do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
Double
t <- Dynamics m Double -> Process m Double
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
DynamicsLift t m =>
Dynamics m a -> t m a
liftDynamics Dynamics m Double
forall (m :: * -> *). Monad m => Dynamics m Double
time
let b :: Arrival a
b = Arrival :: forall a. a -> Double -> Maybe Double -> Arrival a
Arrival { arrivalValue :: a
arrivalValue = a
a,
arrivalTime :: Double
arrivalTime = Double
t,
arrivalDelay :: Maybe Double
arrivalDelay =
case Maybe Double
t0 of
Maybe Double
Nothing -> Maybe Double
forall a. Maybe a
Nothing
Just t0 -> Double -> Maybe Double
forall a. a -> Maybe a
Just (Double
t Double -> Double -> Double
forall a. Num a => a -> a -> a
- Double
t0) }
(Arrival a, Stream m (Arrival a))
-> Process m (Arrival a, Stream m (Arrival a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Arrival a
b, Process m (Arrival a, Stream m (Arrival a)) -> Stream m (Arrival a)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (Arrival a, Stream m (Arrival a))
-> Stream m (Arrival a))
-> Process m (Arrival a, Stream m (Arrival a))
-> Stream m (Arrival a)
forall a b. (a -> b) -> a -> b
$ Stream m a
-> Maybe Double -> Process m (Arrival a, Stream m (Arrival a))
loop Stream m a
xs (Double -> Maybe Double
forall a. a -> Maybe a
Just Double
t))
delayStream :: MonadDES m => a -> Stream m a -> Stream m a
{-# INLINABLE delayStream #-}
delayStream :: a -> Stream m a -> Stream m a
delayStream a
a0 Stream m a
s = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a0, Stream m a
s)
singletonStream :: MonadDES m => a -> Stream m a
{-# INLINABLE singletonStream #-}
singletonStream :: a -> Stream m a
singletonStream a
a = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m a
forall (m :: * -> *) a. MonadDES m => Stream m a
emptyStream)
joinStream :: MonadDES m => Process m (Stream m a) -> Stream m a
{-# INLINABLE joinStream #-}
joinStream :: Process m (Stream m a) -> Stream m a
joinStream Process m (Stream m a)
m = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ Process m (Stream m a)
m Process m (Stream m a)
-> (Stream m a -> Process m (a, Stream m a))
-> Process m (a, Stream m a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream
failoverStream :: MonadDES m => [Stream m a] -> Stream m a
{-# INLINABLE failoverStream #-}
failoverStream :: [Stream m a] -> Stream m a
failoverStream [Stream m a]
ps = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
z where
z :: Process m (a, Stream m a)
z = do Resource m FCFS
reading <- Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m FCFS) -> Process m (Resource m FCFS))
-> Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation m (Resource m FCFS)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
Resource m FCFS
writing <- Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m FCFS) -> Process m (Resource m FCFS))
-> Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation m (Resource m FCFS)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
Ref m (Maybe a)
ref <- Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a)))
-> Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a))
forall a b. (a -> b) -> a -> b
$ Maybe a -> Simulation m (Ref m (Maybe a))
forall (m :: * -> *) a. MonadRef m => a -> Simulation m (Ref m a)
newRef Maybe a
forall a. Maybe a
Nothing
ProcessId m
pid <- Process m (ProcessId m)
forall (m :: * -> *). MonadDES m => Process m (ProcessId m)
processId
let writer :: Stream m a -> Process m b
writer Stream m a
p =
do Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
writing
ProcessId m
pid' <- Process m (ProcessId m)
forall (m :: * -> *). MonadDES m => Process m (ProcessId m)
processId
(a
a, Stream m a
xs) <-
Process m (a, Stream m a)
-> Process m () -> Process m (a, Stream m a)
forall (m :: * -> *) a b.
MonadDES m =>
Process m a -> Process m b -> Process m a
finallyProcess (Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
p) (Process m () -> Process m (a, Stream m a))
-> Process m () -> Process m (a, Stream m a)
forall a b. (a -> b) -> a -> b
$
Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$
do Bool
cancelled' <- ProcessId m -> Event m Bool
forall (m :: * -> *). MonadDES m => ProcessId m -> Event m Bool
processCancelled ProcessId m
pid'
Bool -> Event m () -> Event m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
cancelled' (Event m () -> Event m ()) -> Event m () -> Event m ()
forall a b. (a -> b) -> a -> b
$
Resource m FCFS -> Event m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Event m ()
releaseResourceWithinEvent Resource m FCFS
writing
Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Maybe a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
reading
Stream m a -> Process m b
writer Stream m a
xs
reader :: Process m a
reader =
do Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
writing
Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
reading
Just a
a <- Event m (Maybe a) -> Process m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Maybe a) -> Process m (Maybe a))
-> Event m (Maybe a) -> Process m (Maybe a)
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Event m (Maybe a)
forall (m :: * -> *) a. MonadRef m => Ref m a -> Event m a
readRef Ref m (Maybe a)
ref
Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Maybe a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref Maybe a
forall a. Maybe a
Nothing
a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
loop :: [Stream m a] -> Process m ()
loop [] = () -> Process m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
loop (Stream m a
p: [Stream m a]
ps) =
do ProcessId m
pid' <- Process m (ProcessId m)
forall (m :: * -> *). MonadDES m => Process m (ProcessId m)
processId
DisposableEvent m
h' <- Event m (DisposableEvent m) -> Process m (DisposableEvent m)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (DisposableEvent m) -> Process m (DisposableEvent m))
-> Event m (DisposableEvent m) -> Process m (DisposableEvent m)
forall a b. (a -> b) -> a -> b
$
Signal m () -> (() -> Event m ()) -> Event m (DisposableEvent m)
forall (m :: * -> *) a.
Signal m a -> (a -> Event m ()) -> Event m (DisposableEvent m)
handleSignal (ProcessId m -> Signal m ()
forall (m :: * -> *). MonadDES m => ProcessId m -> Signal m ()
processCancelling ProcessId m
pid) ((() -> Event m ()) -> Event m (DisposableEvent m))
-> (() -> Event m ()) -> Event m (DisposableEvent m)
forall a b. (a -> b) -> a -> b
$ \() ->
ProcessId m -> Event m ()
forall (m :: * -> *). MonadDES m => ProcessId m -> Event m ()
cancelProcessWithId ProcessId m
pid'
Process m () -> Process m () -> Process m ()
forall (m :: * -> *) a b.
MonadDES m =>
Process m a -> Process m b -> Process m a
finallyProcess (Stream m a -> Process m ()
forall b. Stream m a -> Process m b
writer Stream m a
p) (Process m () -> Process m ()) -> Process m () -> Process m ()
forall a b. (a -> b) -> a -> b
$
Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$
do DisposableEvent m -> Event m ()
forall (m :: * -> *). DisposableEvent m -> Event m ()
disposeEvent DisposableEvent m
h'
Bool
cancelled <- ProcessId m -> Event m Bool
forall (m :: * -> *). MonadDES m => ProcessId m -> Event m Bool
processCancelled ProcessId m
pid
Bool -> Event m () -> Event m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
cancelled (Event m () -> Event m ()) -> Event m () -> Event m ()
forall a b. (a -> b) -> a -> b
$
do Bool
cancelled' <- ProcessId m -> Event m Bool
forall (m :: * -> *). MonadDES m => ProcessId m -> Event m Bool
processCancelled ProcessId m
pid'
Bool -> Event m () -> Event m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
cancelled' (Event m () -> Event m ()) -> Event m () -> Event m ()
forall a b. (a -> b) -> a -> b
$
[Char] -> Event m ()
forall a. HasCallStack => [Char] -> a
error [Char]
"Expected the sub-process to be cancelled: failoverStream"
Process m () -> Event m ()
forall (m :: * -> *). MonadDES m => Process m () -> Event m ()
runProcess (Process m () -> Event m ()) -> Process m () -> Event m ()
forall a b. (a -> b) -> a -> b
$ [Stream m a] -> Process m ()
loop [Stream m a]
ps
Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Process m () -> Event m ()
forall (m :: * -> *). MonadDES m => Process m () -> Event m ()
runProcess (Process m () -> Event m ()) -> Process m () -> Event m ()
forall a b. (a -> b) -> a -> b
$ [Stream m a] -> Process m ()
loop [Stream m a]
ps
Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m a -> Process m (a, Stream m a))
-> Stream m a -> Process m (a, Stream m a)
forall a b. (a -> b) -> a -> b
$ Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess Process m a
reader
takeStream :: MonadDES m => Int -> Stream m a -> Stream m a
{-# INLINABLE takeStream #-}
takeStream :: Int -> Stream m a -> Stream m a
takeStream Int
n Stream m a
s
| Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 = Stream m a
forall (m :: * -> *) a. MonadDES m => Stream m a
emptyStream
| Bool
otherwise =
Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$
do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
(a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Int -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
Int -> Stream m a -> Stream m a
takeStream (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1) Stream m a
xs)
takeStreamWhile :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
{-# INLINABLE takeStreamWhile #-}
takeStreamWhile :: (a -> Bool) -> Stream m a -> Stream m a
takeStreamWhile a -> Bool
p Stream m a
s =
Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$
do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
if a -> Bool
p a
a
then (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
(a -> Bool) -> Stream m a -> Stream m a
takeStreamWhile a -> Bool
p Stream m a
xs)
else Process m (a, Stream m a)
forall (m :: * -> *) a. MonadDES m => Process m a
neverProcess
takeStreamWhileM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
{-# INLINABLE takeStreamWhileM #-}
takeStreamWhileM :: (a -> Process m Bool) -> Stream m a -> Stream m a
takeStreamWhileM a -> Process m Bool
p Stream m a
s =
Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$
do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
Bool
f <- a -> Process m Bool
p a
a
if Bool
f
then (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, (a -> Process m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
(a -> Process m Bool) -> Stream m a -> Stream m a
takeStreamWhileM a -> Process m Bool
p Stream m a
xs)
else Process m (a, Stream m a)
forall (m :: * -> *) a. MonadDES m => Process m a
neverProcess
dropStream :: MonadDES m => Int -> Stream m a -> Stream m a
{-# INLINABLE dropStream #-}
dropStream :: Int -> Stream m a -> Stream m a
dropStream Int
n Stream m a
s
| Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 = Stream m a
s
| Bool
otherwise =
Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$
do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m a -> Process m (a, Stream m a))
-> Stream m a -> Process m (a, Stream m a)
forall a b. (a -> b) -> a -> b
$ Int -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
Int -> Stream m a -> Stream m a
dropStream (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1) Stream m a
xs
dropStreamWhile :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
{-# INLINABLE dropStreamWhile #-}
dropStreamWhile :: (a -> Bool) -> Stream m a -> Stream m a
dropStreamWhile a -> Bool
p Stream m a
s =
Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$
do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
if a -> Bool
p a
a
then Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m a -> Process m (a, Stream m a))
-> Stream m a -> Process m (a, Stream m a)
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
(a -> Bool) -> Stream m a -> Stream m a
dropStreamWhile a -> Bool
p Stream m a
xs
else (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m a
xs)
dropStreamWhileM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
{-# INLINABLE dropStreamWhileM #-}
dropStreamWhileM :: (a -> Process m Bool) -> Stream m a -> Stream m a
dropStreamWhileM a -> Process m Bool
p Stream m a
s =
Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$
do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
Bool
f <- a -> Process m Bool
p a
a
if Bool
f
then Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m a -> Process m (a, Stream m a))
-> Stream m a -> Process m (a, Stream m a)
forall a b. (a -> b) -> a -> b
$ (a -> Process m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
(a -> Process m Bool) -> Stream m a -> Stream m a
dropStreamWhileM a -> Process m Bool
p Stream m a
xs
else (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m a
xs)
cloneStream :: MonadDES m => Int -> Stream m a -> Simulation m [Stream m a]
{-# INLINABLE cloneStream #-}
cloneStream :: Int -> Stream m a -> Simulation m [Stream m a]
cloneStream Int
n Stream m a
s =
do [FCFSQueue m a]
qs <- [Int]
-> (Int -> Simulation m (FCFSQueue m a))
-> Simulation m [FCFSQueue m a]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Int
1..Int
n] ((Int -> Simulation m (FCFSQueue m a))
-> Simulation m [FCFSQueue m a])
-> (Int -> Simulation m (FCFSQueue m a))
-> Simulation m [FCFSQueue m a]
forall a b. (a -> b) -> a -> b
$ \Int
i -> Simulation m (FCFSQueue m a)
forall (m :: * -> *) a. MonadDES m => Simulation m (FCFSQueue m a)
IQ.newFCFSQueue
FCFSResource m
rs <- Int -> Simulation m (FCFSResource m)
forall (m :: * -> *).
MonadDES m =>
Int -> Simulation m (FCFSResource m)
newFCFSResource Int
1
Ref m (Stream m a)
ref <- Stream m a -> Simulation m (Ref m (Stream m a))
forall (m :: * -> *) a. MonadRef m => a -> Simulation m (Ref m a)
newRef Stream m a
s
let reader :: a -> Queue m sm so a -> Process m a
reader a
m Queue m sm so a
q =
do Maybe a
a <- Event m (Maybe a) -> Process m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Maybe a) -> Process m (Maybe a))
-> Event m (Maybe a) -> Process m (Maybe a)
forall a b. (a -> b) -> a -> b
$ Queue m sm so a -> Event m (Maybe a)
forall (m :: * -> *) sm so a.
(MonadDES m, DequeueStrategy m sm) =>
Queue m sm so a -> Event m (Maybe a)
IQ.tryDequeue Queue m sm so a
q
case Maybe a
a of
Just a
a -> a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
Maybe a
Nothing ->
FCFSResource m -> Process m a -> Process m a
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m a -> Process m a
usingResource FCFSResource m
rs (Process m a -> Process m a) -> Process m a -> Process m a
forall a b. (a -> b) -> a -> b
$
do Maybe a
a <- Event m (Maybe a) -> Process m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Maybe a) -> Process m (Maybe a))
-> Event m (Maybe a) -> Process m (Maybe a)
forall a b. (a -> b) -> a -> b
$ Queue m sm so a -> Event m (Maybe a)
forall (m :: * -> *) sm so a.
(MonadDES m, DequeueStrategy m sm) =>
Queue m sm so a -> Event m (Maybe a)
IQ.tryDequeue Queue m sm so a
q
case Maybe a
a of
Just a
a -> a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
Maybe a
Nothing ->
do Stream m a
s <- Event m (Stream m a) -> Process m (Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Stream m a) -> Process m (Stream m a))
-> Event m (Stream m a) -> Process m (Stream m a)
forall a b. (a -> b) -> a -> b
$ Ref m (Stream m a) -> Event m (Stream m a)
forall (m :: * -> *) a. MonadRef m => Ref m a -> Event m a
readRef Ref m (Stream m a)
ref
(a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Stream m a) -> Stream m a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Stream m a)
ref Stream m a
xs
[(a, FCFSQueue m a)]
-> ((a, FCFSQueue m a) -> Process m ()) -> Process m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([a] -> [FCFSQueue m a] -> [(a, FCFSQueue m a)]
forall a b. [a] -> [b] -> [(a, b)]
zip [a
1..] [FCFSQueue m a]
qs) (((a, FCFSQueue m a) -> Process m ()) -> Process m ())
-> ((a, FCFSQueue m a) -> Process m ()) -> Process m ()
forall a b. (a -> b) -> a -> b
$ \(a
i, FCFSQueue m a
q) ->
Bool -> Process m () -> Process m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (a
i a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
m) (Process m () -> Process m ()) -> Process m () -> Process m ()
forall a b. (a -> b) -> a -> b
$
Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ FCFSQueue m a -> a -> Event m ()
forall (m :: * -> *) sm so a.
(MonadDES m, EnqueueStrategy m sm, DequeueStrategy m so) =>
Queue m sm so a -> a -> Event m ()
IQ.enqueue FCFSQueue m a
q a
a
a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
[(Integer, FCFSQueue m a)]
-> ((Integer, FCFSQueue m a) -> Simulation m (Stream m a))
-> Simulation m [Stream m a]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM ([Integer] -> [FCFSQueue m a] -> [(Integer, FCFSQueue m a)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Integer
1..] [FCFSQueue m a]
qs) (((Integer, FCFSQueue m a) -> Simulation m (Stream m a))
-> Simulation m [Stream m a])
-> ((Integer, FCFSQueue m a) -> Simulation m (Stream m a))
-> Simulation m [Stream m a]
forall a b. (a -> b) -> a -> b
$ \(Integer
i, FCFSQueue m a
q) ->
Stream m a -> Simulation m (Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream m a -> Simulation m (Stream m a))
-> Stream m a -> Simulation m (Stream m a)
forall a b. (a -> b) -> a -> b
$ Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess (Process m a -> Stream m a) -> Process m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ Integer -> FCFSQueue m a -> Process m a
forall a sm so.
(Num a, Enum a, Eq a, DequeueStrategy m sm) =>
a -> Queue m sm so a -> Process m a
reader Integer
i FCFSQueue m a
q
firstArrivalStream :: MonadDES m => Int -> Stream m a -> Stream m a
{-# INLINABLE firstArrivalStream #-}
firstArrivalStream :: Int -> Stream m a -> Stream m a
firstArrivalStream Int
n Stream m a
s = ((Int, Maybe a) -> a -> Process m ((Int, Maybe a), Maybe a))
-> (Int, Maybe a) -> Stream m a -> Stream m a
forall (m :: * -> *) acc a b.
MonadDES m =>
(acc -> a -> Process m (acc, Maybe b))
-> acc -> Stream m a -> Stream m b
assembleAccumStream (Int, Maybe a) -> a -> Process m ((Int, Maybe a), Maybe a)
forall (m :: * -> *) a.
Monad m =>
(Int, Maybe a) -> a -> m ((Int, Maybe a), Maybe a)
f (Int
1, Maybe a
forall a. Maybe a
Nothing) Stream m a
s
where f :: (Int, Maybe a) -> a -> m ((Int, Maybe a), Maybe a)
f (Int
i, Maybe a
a0) a
a =
let a0' :: Maybe a
a0' = a -> Maybe a
forall a. a -> Maybe a
Just (a -> Maybe a) -> a -> Maybe a
forall a b. (a -> b) -> a -> b
$ a -> Maybe a -> a
forall a. a -> Maybe a -> a
fromMaybe a
a Maybe a
a0
in if Int
i Int -> Int -> Int
forall a. Integral a => a -> a -> a
`mod` Int
n Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
then ((Int, Maybe a), Maybe a) -> m ((Int, Maybe a), Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return ((Int
1, Maybe a
forall a. Maybe a
Nothing), Maybe a
a0')
else ((Int, Maybe a), Maybe a) -> m ((Int, Maybe a), Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return ((Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1, Maybe a
a0'), Maybe a
forall a. Maybe a
Nothing)
lastArrivalStream :: MonadDES m => Int -> Stream m a -> Stream m a
{-# INLINABLE lastArrivalStream #-}
lastArrivalStream :: Int -> Stream m a -> Stream m a
lastArrivalStream Int
n Stream m a
s = (Int -> a -> Process m (Int, Maybe a))
-> Int -> Stream m a -> Stream m a
forall (m :: * -> *) acc a b.
MonadDES m =>
(acc -> a -> Process m (acc, Maybe b))
-> acc -> Stream m a -> Stream m b
assembleAccumStream Int -> a -> Process m (Int, Maybe a)
forall (m :: * -> *) a. Monad m => Int -> a -> m (Int, Maybe a)
f Int
1 Stream m a
s
where f :: Int -> a -> m (Int, Maybe a)
f Int
i a
a =
if Int
i Int -> Int -> Int
forall a. Integral a => a -> a -> a
`mod` Int
n Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
then (Int, Maybe a) -> m (Int, Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
1, a -> Maybe a
forall a. a -> Maybe a
Just a
a)
else (Int, Maybe a) -> m (Int, Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1, Maybe a
forall a. Maybe a
Nothing)
assembleAccumStream :: MonadDES m => (acc -> a -> Process m (acc, Maybe b)) -> acc -> Stream m a -> Stream m b
{-# INLINABLE assembleAccumStream #-}
assembleAccumStream :: (acc -> a -> Process m (acc, Maybe b))
-> acc -> Stream m a -> Stream m b
assembleAccumStream acc -> a -> Process m (acc, Maybe b)
f acc
acc Stream m a
s =
(Maybe b -> b) -> Stream m (Maybe b) -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
(a -> b) -> Stream m a -> Stream m b
mapStream Maybe b -> b
forall a. HasCallStack => Maybe a -> a
fromJust (Stream m (Maybe b) -> Stream m b)
-> Stream m (Maybe b) -> Stream m b
forall a b. (a -> b) -> a -> b
$
(Maybe b -> Bool) -> Stream m (Maybe b) -> Stream m (Maybe b)
forall (m :: * -> *) a.
MonadDES m =>
(a -> Bool) -> Stream m a -> Stream m a
filterStream Maybe b -> Bool
forall a. Maybe a -> Bool
isJust (Stream m (Maybe b) -> Stream m (Maybe b))
-> Stream m (Maybe b) -> Stream m (Maybe b)
forall a b. (a -> b) -> a -> b
$
(acc -> a -> Process m (acc, Maybe b))
-> acc -> Stream m a -> Stream m (Maybe b)
forall (m :: * -> *) acc a b.
MonadDES m =>
(acc -> a -> Process m (acc, b)) -> acc -> Stream m a -> Stream m b
accumStream acc -> a -> Process m (acc, Maybe b)
f acc
acc Stream m a
s
traceStream :: MonadDES m
=> Maybe String
-> Maybe String
-> Stream m a
-> Stream m a
{-# INLINABLE traceStream #-}
traceStream :: Maybe [Char] -> Maybe [Char] -> Stream m a -> Stream m a
traceStream Maybe [Char]
request Maybe [Char]
response Stream m a
s = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Process m (a, Stream m a)
loop Stream m a
s where
loop :: Stream m a -> Process m (a, Stream m a)
loop Stream m a
s = do (a
a, Stream m a
xs) <-
case Maybe [Char]
request of
Maybe [Char]
Nothing -> Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
Just [Char]
message ->
[Char] -> Process m (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a.
MonadDES m =>
[Char] -> Process m a -> Process m a
traceProcess [Char]
message (Process m (a, Stream m a) -> Process m (a, Stream m a))
-> Process m (a, Stream m a) -> Process m (a, Stream m a)
forall a b. (a -> b) -> a -> b
$
Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
case Maybe [Char]
response of
Maybe [Char]
Nothing -> (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Process m (a, Stream m a)
loop Stream m a
xs)
Just [Char]
message ->
[Char] -> Process m (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a.
MonadDES m =>
[Char] -> Process m a -> Process m a
traceProcess [Char]
message (Process m (a, Stream m a) -> Process m (a, Stream m a))
-> Process m (a, Stream m a) -> Process m (a, Stream m a)
forall a b. (a -> b) -> a -> b
$
(a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Process m (a, Stream m a)
loop Stream m a
xs)