{-# LANGUAGE FlexibleContexts #-}
module Simulation.Aivika.Trans.Processor
(
Processor(..),
emptyProcessor,
arrProcessor,
accumProcessor,
withinProcessor,
processorUsingId,
prefetchProcessor,
delayProcessor,
bufferProcessor,
bufferProcessorLoop,
queueProcessor,
queueProcessorLoopMerging,
queueProcessorLoopSeq,
queueProcessorLoopParallel,
processorSeq,
processorParallel,
processorQueuedParallel,
processorPrioritisingOutputParallel,
processorPrioritisingInputParallel,
processorPrioritisingInputOutputParallel,
arrivalProcessor,
joinProcessor,
failoverProcessor,
channelProcessor,
processorChannel,
queuedChannelProcessor,
queuedProcessorChannel,
traceProcessor) where
import qualified Control.Category as C
import Control.Arrow
import Data.Monoid
import Simulation.Aivika.Trans.DES
import Simulation.Aivika.Trans.Simulation
import Simulation.Aivika.Trans.Dynamics
import Simulation.Aivika.Trans.Event
import Simulation.Aivika.Trans.Composite
import Simulation.Aivika.Trans.Cont
import Simulation.Aivika.Trans.Process
import Simulation.Aivika.Trans.Stream
import Simulation.Aivika.Trans.QueueStrategy
import Simulation.Aivika.Trans.Signal
import Simulation.Aivika.Trans.Channel
import Simulation.Aivika.Arrival (Arrival(..))
newtype Processor m a b =
Processor { Processor m a b -> Stream m a -> Stream m b
runProcessor :: Stream m a -> Stream m b
}
instance C.Category (Processor m) where
{-# INLINE id #-}
id :: Processor m a a
id = (Stream m a -> Stream m a) -> Processor m a a
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor Stream m a -> Stream m a
forall a. a -> a
id
{-# INLINE (.) #-}
Processor Stream m b -> Stream m c
x . :: Processor m b c -> Processor m a b -> Processor m a c
. Processor Stream m a -> Stream m b
y = (Stream m a -> Stream m c) -> Processor m a c
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor (Stream m b -> Stream m c
x (Stream m b -> Stream m c)
-> (Stream m a -> Stream m b) -> Stream m a -> Stream m c
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream m b
y)
instance MonadDES m => Arrow (Processor m) where
{-# INLINABLE arr #-}
arr :: (b -> c) -> Processor m b c
arr = (Stream m b -> Stream m c) -> Processor m b c
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m b -> Stream m c) -> Processor m b c)
-> ((b -> c) -> Stream m b -> Stream m c)
-> (b -> c)
-> Processor m b c
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (b -> c) -> Stream m b -> Stream m c
forall (m :: * -> *) a b.
MonadDES m =>
(a -> b) -> Stream m a -> Stream m b
mapStream
{-# INLINABLE first #-}
first :: Processor m b c -> Processor m (b, d) (c, d)
first (Processor Stream m b -> Stream m c
f) =
(Stream m (b, d) -> Stream m (c, d)) -> Processor m (b, d) (c, d)
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m (b, d) -> Stream m (c, d)) -> Processor m (b, d) (c, d))
-> (Stream m (b, d) -> Stream m (c, d))
-> Processor m (b, d) (c, d)
forall a b. (a -> b) -> a -> b
$ \Stream m (b, d)
xys ->
Process m ((c, d), Stream m (c, d)) -> Stream m (c, d)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m ((c, d), Stream m (c, d)) -> Stream m (c, d))
-> Process m ((c, d), Stream m (c, d)) -> Stream m (c, d)
forall a b. (a -> b) -> a -> b
$
do (Stream m b
xs, Stream m d
ys) <- Simulation m (Stream m b, Stream m d)
-> Process m (Stream m b, Stream m d)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Stream m b, Stream m d)
-> Process m (Stream m b, Stream m d))
-> Simulation m (Stream m b, Stream m d)
-> Process m (Stream m b, Stream m d)
forall a b. (a -> b) -> a -> b
$ Stream m (b, d) -> Simulation m (Stream m b, Stream m d)
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (a, b) -> Simulation m (Stream m a, Stream m b)
unzipStream Stream m (b, d)
xys
Stream m (c, d) -> Process m ((c, d), Stream m (c, d))
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m (c, d) -> Process m ((c, d), Stream m (c, d)))
-> Stream m (c, d) -> Process m ((c, d), Stream m (c, d))
forall a b. (a -> b) -> a -> b
$ Stream m c -> Stream m d -> Stream m (c, d)
forall (m :: * -> *) a b.
MonadDES m =>
Stream m a -> Stream m b -> Stream m (a, b)
zipStreamParallel (Stream m b -> Stream m c
f Stream m b
xs) Stream m d
ys
{-# INLINABLE second #-}
second :: Processor m b c -> Processor m (d, b) (d, c)
second (Processor Stream m b -> Stream m c
f) =
(Stream m (d, b) -> Stream m (d, c)) -> Processor m (d, b) (d, c)
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m (d, b) -> Stream m (d, c)) -> Processor m (d, b) (d, c))
-> (Stream m (d, b) -> Stream m (d, c))
-> Processor m (d, b) (d, c)
forall a b. (a -> b) -> a -> b
$ \Stream m (d, b)
xys ->
Process m ((d, c), Stream m (d, c)) -> Stream m (d, c)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m ((d, c), Stream m (d, c)) -> Stream m (d, c))
-> Process m ((d, c), Stream m (d, c)) -> Stream m (d, c)
forall a b. (a -> b) -> a -> b
$
do (Stream m d
xs, Stream m b
ys) <- Simulation m (Stream m d, Stream m b)
-> Process m (Stream m d, Stream m b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Stream m d, Stream m b)
-> Process m (Stream m d, Stream m b))
-> Simulation m (Stream m d, Stream m b)
-> Process m (Stream m d, Stream m b)
forall a b. (a -> b) -> a -> b
$ Stream m (d, b) -> Simulation m (Stream m d, Stream m b)
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (a, b) -> Simulation m (Stream m a, Stream m b)
unzipStream Stream m (d, b)
xys
Stream m (d, c) -> Process m ((d, c), Stream m (d, c))
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m (d, c) -> Process m ((d, c), Stream m (d, c)))
-> Stream m (d, c) -> Process m ((d, c), Stream m (d, c))
forall a b. (a -> b) -> a -> b
$ Stream m d -> Stream m c -> Stream m (d, c)
forall (m :: * -> *) a b.
MonadDES m =>
Stream m a -> Stream m b -> Stream m (a, b)
zipStreamParallel Stream m d
xs (Stream m b -> Stream m c
f Stream m b
ys)
{-# INLINABLE (***) #-}
Processor Stream m b -> Stream m c
f *** :: Processor m b c -> Processor m b' c' -> Processor m (b, b') (c, c')
*** Processor Stream m b' -> Stream m c'
g =
(Stream m (b, b') -> Stream m (c, c'))
-> Processor m (b, b') (c, c')
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m (b, b') -> Stream m (c, c'))
-> Processor m (b, b') (c, c'))
-> (Stream m (b, b') -> Stream m (c, c'))
-> Processor m (b, b') (c, c')
forall a b. (a -> b) -> a -> b
$ \Stream m (b, b')
xys ->
Process m ((c, c'), Stream m (c, c')) -> Stream m (c, c')
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m ((c, c'), Stream m (c, c')) -> Stream m (c, c'))
-> Process m ((c, c'), Stream m (c, c')) -> Stream m (c, c')
forall a b. (a -> b) -> a -> b
$
do (Stream m b
xs, Stream m b'
ys) <- Simulation m (Stream m b, Stream m b')
-> Process m (Stream m b, Stream m b')
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Stream m b, Stream m b')
-> Process m (Stream m b, Stream m b'))
-> Simulation m (Stream m b, Stream m b')
-> Process m (Stream m b, Stream m b')
forall a b. (a -> b) -> a -> b
$ Stream m (b, b') -> Simulation m (Stream m b, Stream m b')
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (a, b) -> Simulation m (Stream m a, Stream m b)
unzipStream Stream m (b, b')
xys
Stream m (c, c') -> Process m ((c, c'), Stream m (c, c'))
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m (c, c') -> Process m ((c, c'), Stream m (c, c')))
-> Stream m (c, c') -> Process m ((c, c'), Stream m (c, c'))
forall a b. (a -> b) -> a -> b
$ Stream m c -> Stream m c' -> Stream m (c, c')
forall (m :: * -> *) a b.
MonadDES m =>
Stream m a -> Stream m b -> Stream m (a, b)
zipStreamParallel (Stream m b -> Stream m c
f Stream m b
xs) (Stream m b' -> Stream m c'
g Stream m b'
ys)
{-# INLINABLE (&&&) #-}
Processor Stream m b -> Stream m c
f &&& :: Processor m b c -> Processor m b c' -> Processor m b (c, c')
&&& Processor Stream m b -> Stream m c'
g =
(Stream m b -> Stream m (c, c')) -> Processor m b (c, c')
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m b -> Stream m (c, c')) -> Processor m b (c, c'))
-> (Stream m b -> Stream m (c, c')) -> Processor m b (c, c')
forall a b. (a -> b) -> a -> b
$ \Stream m b
xs -> Stream m c -> Stream m c' -> Stream m (c, c')
forall (m :: * -> *) a b.
MonadDES m =>
Stream m a -> Stream m b -> Stream m (a, b)
zipStreamParallel (Stream m b -> Stream m c
f Stream m b
xs) (Stream m b -> Stream m c'
g Stream m b
xs)
instance MonadDES m => ArrowChoice (Processor m) where
{-# INLINABLE left #-}
left :: Processor m b c -> Processor m (Either b d) (Either c d)
left (Processor Stream m b -> Stream m c
f) =
(Stream m (Either b d) -> Stream m (Either c d))
-> Processor m (Either b d) (Either c d)
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m (Either b d) -> Stream m (Either c d))
-> Processor m (Either b d) (Either c d))
-> (Stream m (Either b d) -> Stream m (Either c d))
-> Processor m (Either b d) (Either c d)
forall a b. (a -> b) -> a -> b
$ \Stream m (Either b d)
xs ->
Process m (Either c d, Stream m (Either c d))
-> Stream m (Either c d)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (Either c d, Stream m (Either c d))
-> Stream m (Either c d))
-> Process m (Either c d, Stream m (Either c d))
-> Stream m (Either c d)
forall a b. (a -> b) -> a -> b
$
do Stream m (Either b d)
ys <- Simulation m (Stream m (Either b d))
-> Process m (Stream m (Either b d))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Stream m (Either b d))
-> Process m (Stream m (Either b d)))
-> Simulation m (Stream m (Either b d))
-> Process m (Stream m (Either b d))
forall a b. (a -> b) -> a -> b
$ Stream m (Either b d) -> Simulation m (Stream m (Either b d))
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Simulation m (Stream m a)
memoStream Stream m (Either b d)
xs
Stream m (Either c d)
-> Process m (Either c d, Stream m (Either c d))
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m (Either c d)
-> Process m (Either c d, Stream m (Either c d)))
-> Stream m (Either c d)
-> Process m (Either c d, Stream m (Either c d))
forall a b. (a -> b) -> a -> b
$ Stream m (Either b d) -> Stream m c -> Stream m (Either c d)
forall (m :: * -> *) a b c.
MonadDES m =>
Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
replaceLeftStream Stream m (Either b d)
ys (Stream m b -> Stream m c
f (Stream m b -> Stream m c) -> Stream m b -> Stream m c
forall a b. (a -> b) -> a -> b
$ Stream m (Either b d) -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (Either a b) -> Stream m a
leftStream Stream m (Either b d)
ys)
{-# INLINABLE right #-}
right :: Processor m b c -> Processor m (Either d b) (Either d c)
right (Processor Stream m b -> Stream m c
f) =
(Stream m (Either d b) -> Stream m (Either d c))
-> Processor m (Either d b) (Either d c)
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m (Either d b) -> Stream m (Either d c))
-> Processor m (Either d b) (Either d c))
-> (Stream m (Either d b) -> Stream m (Either d c))
-> Processor m (Either d b) (Either d c)
forall a b. (a -> b) -> a -> b
$ \Stream m (Either d b)
xs ->
Process m (Either d c, Stream m (Either d c))
-> Stream m (Either d c)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (Either d c, Stream m (Either d c))
-> Stream m (Either d c))
-> Process m (Either d c, Stream m (Either d c))
-> Stream m (Either d c)
forall a b. (a -> b) -> a -> b
$
do Stream m (Either d b)
ys <- Simulation m (Stream m (Either d b))
-> Process m (Stream m (Either d b))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Stream m (Either d b))
-> Process m (Stream m (Either d b)))
-> Simulation m (Stream m (Either d b))
-> Process m (Stream m (Either d b))
forall a b. (a -> b) -> a -> b
$ Stream m (Either d b) -> Simulation m (Stream m (Either d b))
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Simulation m (Stream m a)
memoStream Stream m (Either d b)
xs
Stream m (Either d c)
-> Process m (Either d c, Stream m (Either d c))
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m (Either d c)
-> Process m (Either d c, Stream m (Either d c)))
-> Stream m (Either d c)
-> Process m (Either d c, Stream m (Either d c))
forall a b. (a -> b) -> a -> b
$ Stream m (Either d b) -> Stream m c -> Stream m (Either d c)
forall (m :: * -> *) a b c.
MonadDES m =>
Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
replaceRightStream Stream m (Either d b)
ys (Stream m b -> Stream m c
f (Stream m b -> Stream m c) -> Stream m b -> Stream m c
forall a b. (a -> b) -> a -> b
$ Stream m (Either d b) -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (Either a b) -> Stream m b
rightStream Stream m (Either d b)
ys)
instance MonadDES m => ArrowZero (Processor m) where
{-# INLINE zeroArrow #-}
zeroArrow :: Processor m b c
zeroArrow = Processor m b c
forall (m :: * -> *) a b. MonadDES m => Processor m a b
emptyProcessor
instance MonadDES m => ArrowPlus (Processor m) where
{-# INLINABLE (<+>) #-}
(Processor Stream m b -> Stream m c
f) <+> :: Processor m b c -> Processor m b c -> Processor m b c
<+> (Processor Stream m b -> Stream m c
g) =
(Stream m b -> Stream m c) -> Processor m b c
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m b -> Stream m c) -> Processor m b c)
-> (Stream m b -> Stream m c) -> Processor m b c
forall a b. (a -> b) -> a -> b
$ \Stream m b
xs ->
Process m (c, Stream m c) -> Stream m c
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (c, Stream m c) -> Stream m c)
-> Process m (c, Stream m c) -> Stream m c
forall a b. (a -> b) -> a -> b
$
do [Stream m b
xs1, Stream m b
xs2] <- Simulation m [Stream m b] -> Process m [Stream m b]
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m [Stream m b] -> Process m [Stream m b])
-> Simulation m [Stream m b] -> Process m [Stream m b]
forall a b. (a -> b) -> a -> b
$ Int -> Stream m b -> Simulation m [Stream m b]
forall (m :: * -> *) a.
MonadDES m =>
Int -> Stream m a -> Simulation m [Stream m a]
splitStream Int
2 Stream m b
xs
Stream m c -> Process m (c, Stream m c)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m c -> Process m (c, Stream m c))
-> Stream m c -> Process m (c, Stream m c)
forall a b. (a -> b) -> a -> b
$ Stream m c -> Stream m c -> Stream m c
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Stream m a -> Stream m a
mergeStreams (Stream m b -> Stream m c
f Stream m b
xs1) (Stream m b -> Stream m c
g Stream m b
xs2)
emptyProcessor :: MonadDES m => Processor m a b
{-# INLINABLE emptyProcessor #-}
emptyProcessor :: Processor m a b
emptyProcessor = (Stream m a -> Stream m b) -> Processor m a b
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m a -> Stream m b) -> Processor m a b)
-> (Stream m a -> Stream m b) -> Processor m a b
forall a b. (a -> b) -> a -> b
$ Stream m b -> Stream m a -> Stream m b
forall a b. a -> b -> a
const Stream m b
forall (m :: * -> *) a. MonadDES m => Stream m a
emptyStream
arrProcessor :: MonadDES m => (a -> Process m b) -> Processor m a b
{-# INLINABLE arrProcessor #-}
arrProcessor :: (a -> Process m b) -> Processor m a b
arrProcessor = (Stream m a -> Stream m b) -> Processor m a b
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m a -> Stream m b) -> Processor m a b)
-> ((a -> Process m b) -> Stream m a -> Stream m b)
-> (a -> Process m b)
-> Processor m a b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> Process m b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
(a -> Process m b) -> Stream m a -> Stream m b
mapStreamM
accumProcessor :: MonadDES m => (acc -> a -> Process m (acc, b)) -> acc -> Processor m a b
{-# INLINABLE accumProcessor #-}
accumProcessor :: (acc -> a -> Process m (acc, b)) -> acc -> Processor m a b
accumProcessor acc -> a -> Process m (acc, b)
f acc
acc =
(Stream m a -> Stream m b) -> Processor m a b
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m a -> Stream m b) -> Processor m a b)
-> (Stream m a -> Stream m b) -> Processor m a b
forall a b. (a -> b) -> a -> b
$ \Stream m a
xs -> Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (b, Stream m b) -> Stream m b)
-> Process m (b, Stream m b) -> Stream m b
forall a b. (a -> b) -> a -> b
$ Stream m a -> acc -> Process m (b, Stream m b)
loop Stream m a
xs acc
acc where
loop :: Stream m a -> acc -> Process m (b, Stream m b)
loop Stream m a
xs acc
acc =
do (a
a, Stream m a
xs') <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
xs
(acc
acc', b
b) <- acc -> a -> Process m (acc, b)
f acc
acc a
a
(b, Stream m b) -> Process m (b, Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (b
b, Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (b, Stream m b) -> Stream m b)
-> Process m (b, Stream m b) -> Stream m b
forall a b. (a -> b) -> a -> b
$ Stream m a -> acc -> Process m (b, Stream m b)
loop Stream m a
xs' acc
acc')
withinProcessor :: MonadDES m => Process m () -> Processor m a a
{-# INLINABLE withinProcessor #-}
withinProcessor :: Process m () -> Processor m a a
withinProcessor Process m ()
m =
(Stream m a -> Stream m a) -> Processor m a a
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m a -> Stream m a) -> Processor m a a)
-> (Stream m a -> Stream m a) -> Processor m a a
forall a b. (a -> b) -> a -> b
$
(a -> Process m a) -> Stream m a -> Stream m a
forall (m :: * -> *) a b.
MonadDES m =>
(a -> Process m b) -> Stream m a -> Stream m b
mapStreamM ((a -> Process m a) -> Stream m a -> Stream m a)
-> (a -> Process m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ \a
a ->
do { Process m ()
m; a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a }
processorUsingId :: MonadDES m => ProcessId m -> Processor m a b -> Processor m a b
{-# INLINABLE processorUsingId #-}
processorUsingId :: ProcessId m -> Processor m a b -> Processor m a b
processorUsingId ProcessId m
pid (Processor Stream m a -> Stream m b
f) =
(Stream m a -> Stream m b) -> Processor m a b
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m a -> Stream m b) -> Processor m a b)
-> (Stream m a -> Stream m b) -> Processor m a b
forall a b. (a -> b) -> a -> b
$ Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (b, Stream m b) -> Stream m b)
-> (Stream m a -> Process m (b, Stream m b))
-> Stream m a
-> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessId m
-> Process m (b, Stream m b) -> Process m (b, Stream m b)
forall (m :: * -> *) a.
MonadDES m =>
ProcessId m -> Process m a -> Process m a
processUsingId ProcessId m
pid (Process m (b, Stream m b) -> Process m (b, Stream m b))
-> (Stream m a -> Process m (b, Stream m b))
-> Stream m a
-> Process m (b, Stream m b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m b -> Process m (b, Stream m b)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m b -> Process m (b, Stream m b))
-> (Stream m a -> Stream m b)
-> Stream m a
-> Process m (b, Stream m b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream m b
f
processorQueuedParallel :: (MonadDES m,
EnqueueStrategy m si,
EnqueueStrategy m so)
=> si
-> so
-> [Processor m a b]
-> Processor m a b
{-# INLINABLE processorQueuedParallel #-}
processorQueuedParallel :: si -> so -> [Processor m a b] -> Processor m a b
processorQueuedParallel si
si so
so [Processor m a b]
ps =
(Stream m a -> Stream m b) -> Processor m a b
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m a -> Stream m b) -> Processor m a b)
-> (Stream m a -> Stream m b) -> Processor m a b
forall a b. (a -> b) -> a -> b
$ \Stream m a
xs ->
Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (b, Stream m b) -> Stream m b)
-> Process m (b, Stream m b) -> Stream m b
forall a b. (a -> b) -> a -> b
$
do let n :: Int
n = [Processor m a b] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Processor m a b]
ps
[Stream m a]
input <- Simulation m [Stream m a] -> Process m [Stream m a]
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m [Stream m a] -> Process m [Stream m a])
-> Simulation m [Stream m a] -> Process m [Stream m a]
forall a b. (a -> b) -> a -> b
$ si -> Int -> Stream m a -> Simulation m [Stream m a]
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
s -> Int -> Stream m a -> Simulation m [Stream m a]
splitStreamQueueing si
si Int
n Stream m a
xs
let results :: [Stream m b]
results = (((Stream m a, Processor m a b) -> Stream m b)
-> [(Stream m a, Processor m a b)] -> [Stream m b])
-> [(Stream m a, Processor m a b)]
-> ((Stream m a, Processor m a b) -> Stream m b)
-> [Stream m b]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((Stream m a, Processor m a b) -> Stream m b)
-> [(Stream m a, Processor m a b)] -> [Stream m b]
forall a b. (a -> b) -> [a] -> [b]
map ([Stream m a]
-> [Processor m a b] -> [(Stream m a, Processor m a b)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Stream m a]
input [Processor m a b]
ps) (((Stream m a, Processor m a b) -> Stream m b) -> [Stream m b])
-> ((Stream m a, Processor m a b) -> Stream m b) -> [Stream m b]
forall a b. (a -> b) -> a -> b
$ \(Stream m a
input, Processor m a b
p) ->
Processor m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Processor m a b -> Stream m a -> Stream m b
runProcessor Processor m a b
p Stream m a
input
output :: Stream m b
output = so -> [Stream m b] -> Stream m b
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
s -> [Stream m a] -> Stream m a
concatQueuedStreams so
so [Stream m b]
results
Stream m b -> Process m (b, Stream m b)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m b
output
processorPrioritisingOutputParallel :: (MonadDES m,
EnqueueStrategy m si,
PriorityQueueStrategy m so po)
=> si
-> so
-> [Processor m a (po, b)]
-> Processor m a b
{-# INLINABLE processorPrioritisingOutputParallel #-}
processorPrioritisingOutputParallel :: si -> so -> [Processor m a (po, b)] -> Processor m a b
processorPrioritisingOutputParallel si
si so
so [Processor m a (po, b)]
ps =
(Stream m a -> Stream m b) -> Processor m a b
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m a -> Stream m b) -> Processor m a b)
-> (Stream m a -> Stream m b) -> Processor m a b
forall a b. (a -> b) -> a -> b
$ \Stream m a
xs ->
Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (b, Stream m b) -> Stream m b)
-> Process m (b, Stream m b) -> Stream m b
forall a b. (a -> b) -> a -> b
$
do let n :: Int
n = [Processor m a (po, b)] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Processor m a (po, b)]
ps
[Stream m a]
input <- Simulation m [Stream m a] -> Process m [Stream m a]
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m [Stream m a] -> Process m [Stream m a])
-> Simulation m [Stream m a] -> Process m [Stream m a]
forall a b. (a -> b) -> a -> b
$ si -> Int -> Stream m a -> Simulation m [Stream m a]
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
s -> Int -> Stream m a -> Simulation m [Stream m a]
splitStreamQueueing si
si Int
n Stream m a
xs
let results :: [Stream m (po, b)]
results = (((Stream m a, Processor m a (po, b)) -> Stream m (po, b))
-> [(Stream m a, Processor m a (po, b))] -> [Stream m (po, b)])
-> [(Stream m a, Processor m a (po, b))]
-> ((Stream m a, Processor m a (po, b)) -> Stream m (po, b))
-> [Stream m (po, b)]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((Stream m a, Processor m a (po, b)) -> Stream m (po, b))
-> [(Stream m a, Processor m a (po, b))] -> [Stream m (po, b)]
forall a b. (a -> b) -> [a] -> [b]
map ([Stream m a]
-> [Processor m a (po, b)] -> [(Stream m a, Processor m a (po, b))]
forall a b. [a] -> [b] -> [(a, b)]
zip [Stream m a]
input [Processor m a (po, b)]
ps) (((Stream m a, Processor m a (po, b)) -> Stream m (po, b))
-> [Stream m (po, b)])
-> ((Stream m a, Processor m a (po, b)) -> Stream m (po, b))
-> [Stream m (po, b)]
forall a b. (a -> b) -> a -> b
$ \(Stream m a
input, Processor m a (po, b)
p) ->
Processor m a (po, b) -> Stream m a -> Stream m (po, b)
forall (m :: * -> *) a b.
Processor m a b -> Stream m a -> Stream m b
runProcessor Processor m a (po, b)
p Stream m a
input
output :: Stream m b
output = so -> [Stream m (po, b)] -> Stream m b
forall (m :: * -> *) s p a.
(MonadDES m, PriorityQueueStrategy m s p) =>
s -> [Stream m (p, a)] -> Stream m a
concatPriorityStreams so
so [Stream m (po, b)]
results
Stream m b -> Process m (b, Stream m b)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m b
output
processorPrioritisingInputParallel :: (MonadDES m,
PriorityQueueStrategy m si pi,
EnqueueStrategy m so)
=> si
-> so
-> [(Stream m pi, Processor m a b)]
-> Processor m a b
{-# INLINABLE processorPrioritisingInputParallel #-}
processorPrioritisingInputParallel :: si -> so -> [(Stream m pi, Processor m a b)] -> Processor m a b
processorPrioritisingInputParallel si
si so
so [(Stream m pi, Processor m a b)]
ps =
(Stream m a -> Stream m b) -> Processor m a b
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m a -> Stream m b) -> Processor m a b)
-> (Stream m a -> Stream m b) -> Processor m a b
forall a b. (a -> b) -> a -> b
$ \Stream m a
xs ->
Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (b, Stream m b) -> Stream m b)
-> Process m (b, Stream m b) -> Stream m b
forall a b. (a -> b) -> a -> b
$
do [Stream m a]
input <- Simulation m [Stream m a] -> Process m [Stream m a]
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m [Stream m a] -> Process m [Stream m a])
-> Simulation m [Stream m a] -> Process m [Stream m a]
forall a b. (a -> b) -> a -> b
$ si -> [Stream m pi] -> Stream m a -> Simulation m [Stream m a]
forall (m :: * -> *) s p a.
(MonadDES m, PriorityQueueStrategy m s p) =>
s -> [Stream m p] -> Stream m a -> Simulation m [Stream m a]
splitStreamPrioritising si
si (((Stream m pi, Processor m a b) -> Stream m pi)
-> [(Stream m pi, Processor m a b)] -> [Stream m pi]
forall a b. (a -> b) -> [a] -> [b]
map (Stream m pi, Processor m a b) -> Stream m pi
forall a b. (a, b) -> a
fst [(Stream m pi, Processor m a b)]
ps) Stream m a
xs
let results :: [Stream m b]
results = (((Stream m a, (Stream m pi, Processor m a b)) -> Stream m b)
-> [(Stream m a, (Stream m pi, Processor m a b))] -> [Stream m b])
-> [(Stream m a, (Stream m pi, Processor m a b))]
-> ((Stream m a, (Stream m pi, Processor m a b)) -> Stream m b)
-> [Stream m b]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((Stream m a, (Stream m pi, Processor m a b)) -> Stream m b)
-> [(Stream m a, (Stream m pi, Processor m a b))] -> [Stream m b]
forall a b. (a -> b) -> [a] -> [b]
map ([Stream m a]
-> [(Stream m pi, Processor m a b)]
-> [(Stream m a, (Stream m pi, Processor m a b))]
forall a b. [a] -> [b] -> [(a, b)]
zip [Stream m a]
input [(Stream m pi, Processor m a b)]
ps) (((Stream m a, (Stream m pi, Processor m a b)) -> Stream m b)
-> [Stream m b])
-> ((Stream m a, (Stream m pi, Processor m a b)) -> Stream m b)
-> [Stream m b]
forall a b. (a -> b) -> a -> b
$ \(Stream m a
input, (Stream m pi
_, Processor m a b
p)) ->
Processor m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Processor m a b -> Stream m a -> Stream m b
runProcessor Processor m a b
p Stream m a
input
output :: Stream m b
output = so -> [Stream m b] -> Stream m b
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
s -> [Stream m a] -> Stream m a
concatQueuedStreams so
so [Stream m b]
results
Stream m b -> Process m (b, Stream m b)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m b
output
processorPrioritisingInputOutputParallel :: (MonadDES m,
PriorityQueueStrategy m si pi,
PriorityQueueStrategy m so po)
=> si
-> so
-> [(Stream m pi, Processor m a (po, b))]
-> Processor m a b
{-# INLINABLE processorPrioritisingInputOutputParallel #-}
processorPrioritisingInputOutputParallel :: si
-> so -> [(Stream m pi, Processor m a (po, b))] -> Processor m a b
processorPrioritisingInputOutputParallel si
si so
so [(Stream m pi, Processor m a (po, b))]
ps =
(Stream m a -> Stream m b) -> Processor m a b
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m a -> Stream m b) -> Processor m a b)
-> (Stream m a -> Stream m b) -> Processor m a b
forall a b. (a -> b) -> a -> b
$ \Stream m a
xs ->
Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (b, Stream m b) -> Stream m b)
-> Process m (b, Stream m b) -> Stream m b
forall a b. (a -> b) -> a -> b
$
do [Stream m a]
input <- Simulation m [Stream m a] -> Process m [Stream m a]
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m [Stream m a] -> Process m [Stream m a])
-> Simulation m [Stream m a] -> Process m [Stream m a]
forall a b. (a -> b) -> a -> b
$ si -> [Stream m pi] -> Stream m a -> Simulation m [Stream m a]
forall (m :: * -> *) s p a.
(MonadDES m, PriorityQueueStrategy m s p) =>
s -> [Stream m p] -> Stream m a -> Simulation m [Stream m a]
splitStreamPrioritising si
si (((Stream m pi, Processor m a (po, b)) -> Stream m pi)
-> [(Stream m pi, Processor m a (po, b))] -> [Stream m pi]
forall a b. (a -> b) -> [a] -> [b]
map (Stream m pi, Processor m a (po, b)) -> Stream m pi
forall a b. (a, b) -> a
fst [(Stream m pi, Processor m a (po, b))]
ps) Stream m a
xs
let results :: [Stream m (po, b)]
results = (((Stream m a, (Stream m pi, Processor m a (po, b)))
-> Stream m (po, b))
-> [(Stream m a, (Stream m pi, Processor m a (po, b)))]
-> [Stream m (po, b)])
-> [(Stream m a, (Stream m pi, Processor m a (po, b)))]
-> ((Stream m a, (Stream m pi, Processor m a (po, b)))
-> Stream m (po, b))
-> [Stream m (po, b)]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((Stream m a, (Stream m pi, Processor m a (po, b)))
-> Stream m (po, b))
-> [(Stream m a, (Stream m pi, Processor m a (po, b)))]
-> [Stream m (po, b)]
forall a b. (a -> b) -> [a] -> [b]
map ([Stream m a]
-> [(Stream m pi, Processor m a (po, b))]
-> [(Stream m a, (Stream m pi, Processor m a (po, b)))]
forall a b. [a] -> [b] -> [(a, b)]
zip [Stream m a]
input [(Stream m pi, Processor m a (po, b))]
ps) (((Stream m a, (Stream m pi, Processor m a (po, b)))
-> Stream m (po, b))
-> [Stream m (po, b)])
-> ((Stream m a, (Stream m pi, Processor m a (po, b)))
-> Stream m (po, b))
-> [Stream m (po, b)]
forall a b. (a -> b) -> a -> b
$ \(Stream m a
input, (Stream m pi
_, Processor m a (po, b)
p)) ->
Processor m a (po, b) -> Stream m a -> Stream m (po, b)
forall (m :: * -> *) a b.
Processor m a b -> Stream m a -> Stream m b
runProcessor Processor m a (po, b)
p Stream m a
input
output :: Stream m b
output = so -> [Stream m (po, b)] -> Stream m b
forall (m :: * -> *) s p a.
(MonadDES m, PriorityQueueStrategy m s p) =>
s -> [Stream m (p, a)] -> Stream m a
concatPriorityStreams so
so [Stream m (po, b)]
results
Stream m b -> Process m (b, Stream m b)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m b
output
processorParallel :: MonadDES m => [Processor m a b] -> Processor m a b
{-# INLINABLE processorParallel #-}
processorParallel :: [Processor m a b] -> Processor m a b
processorParallel = FCFS -> FCFS -> [Processor m a b] -> Processor m a b
forall (m :: * -> *) si so a b.
(MonadDES m, EnqueueStrategy m si, EnqueueStrategy m so) =>
si -> so -> [Processor m a b] -> Processor m a b
processorQueuedParallel FCFS
FCFS FCFS
FCFS
processorSeq :: MonadDES m => [Processor m a a] -> Processor m a a
{-# INLINABLE processorSeq #-}
processorSeq :: [Processor m a a] -> Processor m a a
processorSeq [] = Processor m a a
forall (m :: * -> *) a b. MonadDES m => Processor m a b
emptyProcessor
processorSeq [Processor m a a
p] = Processor m a a
p
processorSeq (Processor m a a
p : [Processor m a a]
ps) = Processor m a a
p Processor m a a -> Processor m a a -> Processor m a a
forall k (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> Processor m a a
forall (m :: * -> *) a. MonadDES m => Processor m a a
prefetchProcessor Processor m a a -> Processor m a a -> Processor m a a
forall k (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> [Processor m a a] -> Processor m a a
forall (m :: * -> *) a.
MonadDES m =>
[Processor m a a] -> Processor m a a
processorSeq [Processor m a a]
ps
bufferProcessor :: MonadDES m
=> (Stream m a -> Process m ())
-> Stream m b
-> Processor m a b
{-# INLINABLE bufferProcessor #-}
bufferProcessor :: (Stream m a -> Process m ()) -> Stream m b -> Processor m a b
bufferProcessor Stream m a -> Process m ()
consume Stream m b
output =
(Stream m a -> Stream m b) -> Processor m a b
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m a -> Stream m b) -> Processor m a b)
-> (Stream m a -> Stream m b) -> Processor m a b
forall a b. (a -> b) -> a -> b
$ \Stream m a
xs ->
Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (b, Stream m b) -> Stream m b)
-> Process m (b, Stream m b) -> Stream m b
forall a b. (a -> b) -> a -> b
$
do Process m () -> Process m ()
forall (m :: * -> *). MonadDES m => Process m () -> Process m ()
spawnProcess (Stream m a -> Process m ()
consume Stream m a
xs)
Stream m b -> Process m (b, Stream m b)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m b
output
bufferProcessorLoop :: MonadDES m
=> (Stream m a -> Stream m c -> Process m ())
-> Stream m d
-> Processor m d (Either e b)
-> Processor m e c
-> Processor m a b
{-# INLINABLE bufferProcessorLoop #-}
bufferProcessorLoop :: (Stream m a -> Stream m c -> Process m ())
-> Stream m d
-> Processor m d (Either e b)
-> Processor m e c
-> Processor m a b
bufferProcessorLoop Stream m a -> Stream m c -> Process m ()
consume Stream m d
preoutput Processor m d (Either e b)
cond Processor m e c
body =
(Stream m a -> Stream m b) -> Processor m a b
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m a -> Stream m b) -> Processor m a b)
-> (Stream m a -> Stream m b) -> Processor m a b
forall a b. (a -> b) -> a -> b
$ \Stream m a
xs ->
Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (b, Stream m b) -> Stream m b)
-> Process m (b, Stream m b) -> Stream m b
forall a b. (a -> b) -> a -> b
$
do (Stream m e
reverted, Stream m b
output) <-
Simulation m (Stream m e, Stream m b)
-> Process m (Stream m e, Stream m b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Stream m e, Stream m b)
-> Process m (Stream m e, Stream m b))
-> Simulation m (Stream m e, Stream m b)
-> Process m (Stream m e, Stream m b)
forall a b. (a -> b) -> a -> b
$
Stream m (Either e b) -> Simulation m (Stream m e, Stream m b)
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (Either a b) -> Simulation m (Stream m a, Stream m b)
partitionEitherStream (Stream m (Either e b) -> Simulation m (Stream m e, Stream m b))
-> Stream m (Either e b) -> Simulation m (Stream m e, Stream m b)
forall a b. (a -> b) -> a -> b
$
Processor m d (Either e b) -> Stream m d -> Stream m (Either e b)
forall (m :: * -> *) a b.
Processor m a b -> Stream m a -> Stream m b
runProcessor Processor m d (Either e b)
cond Stream m d
preoutput
Process m () -> Process m ()
forall (m :: * -> *). MonadDES m => Process m () -> Process m ()
spawnProcess
(Stream m a -> Stream m c -> Process m ()
consume Stream m a
xs (Stream m c -> Process m ()) -> Stream m c -> Process m ()
forall a b. (a -> b) -> a -> b
$ Processor m e c -> Stream m e -> Stream m c
forall (m :: * -> *) a b.
Processor m a b -> Stream m a -> Stream m b
runProcessor Processor m e c
body Stream m e
reverted)
Stream m b -> Process m (b, Stream m b)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m b
output
queueProcessor :: MonadDES m =>
(a -> Process m ())
-> Process m b
-> Processor m a b
{-# INLINABLE queueProcessor #-}
queueProcessor :: (a -> Process m ()) -> Process m b -> Processor m a b
queueProcessor a -> Process m ()
enqueue Process m b
dequeue =
(Stream m a -> Process m ()) -> Stream m b -> Processor m a b
forall (m :: * -> *) a b.
MonadDES m =>
(Stream m a -> Process m ()) -> Stream m b -> Processor m a b
bufferProcessor
((a -> Process m ()) -> Stream m a -> Process m ()
forall (m :: * -> *) a.
MonadDES m =>
(a -> Process m ()) -> Stream m a -> Process m ()
consumeStream a -> Process m ()
enqueue)
(Process m b -> Stream m b
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess Process m b
dequeue)
queueProcessorLoopMerging :: MonadDES m
=> (Stream m a -> Stream m d -> Stream m e)
-> (e -> Process m ())
-> Process m c
-> Processor m c (Either f b)
-> Processor m f d
-> Processor m a b
{-# INLINABLE queueProcessorLoopMerging #-}
queueProcessorLoopMerging :: (Stream m a -> Stream m d -> Stream m e)
-> (e -> Process m ())
-> Process m c
-> Processor m c (Either f b)
-> Processor m f d
-> Processor m a b
queueProcessorLoopMerging Stream m a -> Stream m d -> Stream m e
merge e -> Process m ()
enqueue Process m c
dequeue =
(Stream m a -> Stream m d -> Process m ())
-> Stream m c
-> Processor m c (Either f b)
-> Processor m f d
-> Processor m a b
forall (m :: * -> *) a c d e b.
MonadDES m =>
(Stream m a -> Stream m c -> Process m ())
-> Stream m d
-> Processor m d (Either e b)
-> Processor m e c
-> Processor m a b
bufferProcessorLoop
(\Stream m a
bs Stream m d
cs ->
(e -> Process m ()) -> Stream m e -> Process m ()
forall (m :: * -> *) a.
MonadDES m =>
(a -> Process m ()) -> Stream m a -> Process m ()
consumeStream e -> Process m ()
enqueue (Stream m e -> Process m ()) -> Stream m e -> Process m ()
forall a b. (a -> b) -> a -> b
$
Stream m a -> Stream m d -> Stream m e
merge Stream m a
bs Stream m d
cs)
(Process m c -> Stream m c
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess Process m c
dequeue)
queueProcessorLoopSeq :: MonadDES m
=> (a -> Process m ())
-> Process m c
-> Processor m c (Either e b)
-> Processor m e a
-> Processor m a b
{-# INLINABLE queueProcessorLoopSeq #-}
queueProcessorLoopSeq :: (a -> Process m ())
-> Process m c
-> Processor m c (Either e b)
-> Processor m e a
-> Processor m a b
queueProcessorLoopSeq =
(Stream m a -> Stream m a -> Stream m a)
-> (a -> Process m ())
-> Process m c
-> Processor m c (Either e b)
-> Processor m e a
-> Processor m a b
forall (m :: * -> *) a d e c f b.
MonadDES m =>
(Stream m a -> Stream m d -> Stream m e)
-> (e -> Process m ())
-> Process m c
-> Processor m c (Either f b)
-> Processor m f d
-> Processor m a b
queueProcessorLoopMerging Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Stream m a -> Stream m a
mergeStreams
queueProcessorLoopParallel :: MonadDES m
=> (a -> Process m ())
-> Process m c
-> Processor m c (Either e b)
-> Processor m e a
-> Processor m a b
{-# INLINABLE queueProcessorLoopParallel #-}
queueProcessorLoopParallel :: (a -> Process m ())
-> Process m c
-> Processor m c (Either e b)
-> Processor m e a
-> Processor m a b
queueProcessorLoopParallel a -> Process m ()
enqueue Process m c
dequeue =
(Stream m a -> Stream m a -> Process m ())
-> Stream m c
-> Processor m c (Either e b)
-> Processor m e a
-> Processor m a b
forall (m :: * -> *) a c d e b.
MonadDES m =>
(Stream m a -> Stream m c -> Process m ())
-> Stream m d
-> Processor m d (Either e b)
-> Processor m e c
-> Processor m a b
bufferProcessorLoop
(\Stream m a
bs Stream m a
cs ->
do Process m () -> Process m ()
forall (m :: * -> *). MonadDES m => Process m () -> Process m ()
spawnProcess (Process m () -> Process m ()) -> Process m () -> Process m ()
forall a b. (a -> b) -> a -> b
$
(a -> Process m ()) -> Stream m a -> Process m ()
forall (m :: * -> *) a.
MonadDES m =>
(a -> Process m ()) -> Stream m a -> Process m ()
consumeStream a -> Process m ()
enqueue Stream m a
bs
Process m () -> Process m ()
forall (m :: * -> *). MonadDES m => Process m () -> Process m ()
spawnProcess (Process m () -> Process m ()) -> Process m () -> Process m ()
forall a b. (a -> b) -> a -> b
$
(a -> Process m ()) -> Stream m a -> Process m ()
forall (m :: * -> *) a.
MonadDES m =>
(a -> Process m ()) -> Stream m a -> Process m ()
consumeStream a -> Process m ()
enqueue Stream m a
cs)
(Process m c -> Stream m c
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess Process m c
dequeue)
prefetchProcessor :: MonadDES m => Processor m a a
{-# INLINABLE prefetchProcessor #-}
prefetchProcessor :: Processor m a a
prefetchProcessor = (Stream m a -> Stream m a) -> Processor m a a
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor Stream m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Stream m a -> Stream m a
prefetchStream
channelProcessor :: MonadDES m => Channel m a b -> Processor m a b
{-# INLINABLE channelProcessor #-}
channelProcessor :: Channel m a b -> Processor m a b
channelProcessor Channel m a b
f =
(Stream m a -> Stream m b) -> Processor m a b
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m a -> Stream m b) -> Processor m a b)
-> (Stream m a -> Stream m b) -> Processor m a b
forall a b. (a -> b) -> a -> b
$ \Stream m a
xs ->
Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (b, Stream m b) -> Stream m b)
-> Process m (b, Stream m b) -> Stream m b
forall a b. (a -> b) -> a -> b
$
do let composite :: Composite m (Stream m b)
composite =
do Signal m a
sa <- Stream m a -> Composite m (Signal m a)
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Composite m (Signal m a)
streamSignal Stream m a
xs
Signal m b
sb <- Channel m a b -> Signal m a -> Composite m (Signal m b)
forall (m :: * -> *) a b.
Channel m a b -> Signal m a -> Composite m (Signal m b)
runChannel Channel m a b
f Signal m a
sa
Signal m b -> Composite m (Stream m b)
forall (m :: * -> *) a.
MonadDES m =>
Signal m a -> Composite m (Stream m a)
signalStream Signal m b
sb
(Stream m b
ys, DisposableEvent m
h) <- Event m (Stream m b, DisposableEvent m)
-> Process m (Stream m b, DisposableEvent m)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Stream m b, DisposableEvent m)
-> Process m (Stream m b, DisposableEvent m))
-> Event m (Stream m b, DisposableEvent m)
-> Process m (Stream m b, DisposableEvent m)
forall a b. (a -> b) -> a -> b
$
Composite m (Stream m b)
-> DisposableEvent m -> Event m (Stream m b, DisposableEvent m)
forall (m :: * -> *) a.
Composite m a
-> DisposableEvent m -> Event m (a, DisposableEvent m)
runComposite Composite m (Stream m b)
composite DisposableEvent m
forall a. Monoid a => a
mempty
Event m () -> Process m ()
forall (m :: * -> *). MonadDES m => Event m () -> Process m ()
whenCancellingProcess (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$
DisposableEvent m -> Event m ()
forall (m :: * -> *). DisposableEvent m -> Event m ()
disposeEvent DisposableEvent m
h
Stream m b -> Process m (b, Stream m b)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m b
ys
processorChannel :: MonadDES m => Processor m a b -> Channel m a b
{-# INLINABLE processorChannel #-}
processorChannel :: Processor m a b -> Channel m a b
processorChannel (Processor Stream m a -> Stream m b
f) =
(Signal m a -> Composite m (Signal m b)) -> Channel m a b
forall (m :: * -> *) a b.
(Signal m a -> Composite m (Signal m b)) -> Channel m a b
Channel ((Signal m a -> Composite m (Signal m b)) -> Channel m a b)
-> (Signal m a -> Composite m (Signal m b)) -> Channel m a b
forall a b. (a -> b) -> a -> b
$ \Signal m a
sa ->
do Stream m a
xs <- Signal m a -> Composite m (Stream m a)
forall (m :: * -> *) a.
MonadDES m =>
Signal m a -> Composite m (Stream m a)
signalStream Signal m a
sa
let ys :: Stream m b
ys = Stream m a -> Stream m b
f Stream m a
xs
Stream m b -> Composite m (Signal m b)
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Composite m (Signal m a)
streamSignal Stream m b
ys
queuedChannelProcessor :: MonadDES m
=> (b -> Event m ())
-> Process m b
-> Channel m a b
-> Processor m a b
{-# INLINABLE queuedChannelProcessor #-}
queuedChannelProcessor :: (b -> Event m ())
-> Process m b -> Channel m a b -> Processor m a b
queuedChannelProcessor b -> Event m ()
enqueue Process m b
dequeue Channel m a b
f =
(Stream m a -> Stream m b) -> Processor m a b
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m a -> Stream m b) -> Processor m a b)
-> (Stream m a -> Stream m b) -> Processor m a b
forall a b. (a -> b) -> a -> b
$ \Stream m a
xs ->
Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (b, Stream m b) -> Stream m b)
-> Process m (b, Stream m b) -> Stream m b
forall a b. (a -> b) -> a -> b
$
do let composite :: Composite m (Stream m b)
composite =
do Signal m a
sa <- Stream m a -> Composite m (Signal m a)
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Composite m (Signal m a)
streamSignal Stream m a
xs
Signal m b
sb <- Channel m a b -> Signal m a -> Composite m (Signal m b)
forall (m :: * -> *) a b.
Channel m a b -> Signal m a -> Composite m (Signal m b)
runChannel Channel m a b
f Signal m a
sa
(b -> Event m ())
-> Process m b -> Signal m b -> Composite m (Stream m b)
forall (m :: * -> *) a.
MonadDES m =>
(a -> Event m ())
-> Process m a -> Signal m a -> Composite m (Stream m a)
queuedSignalStream b -> Event m ()
enqueue Process m b
dequeue Signal m b
sb
(Stream m b
ys, DisposableEvent m
h) <- Event m (Stream m b, DisposableEvent m)
-> Process m (Stream m b, DisposableEvent m)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Stream m b, DisposableEvent m)
-> Process m (Stream m b, DisposableEvent m))
-> Event m (Stream m b, DisposableEvent m)
-> Process m (Stream m b, DisposableEvent m)
forall a b. (a -> b) -> a -> b
$
Composite m (Stream m b)
-> DisposableEvent m -> Event m (Stream m b, DisposableEvent m)
forall (m :: * -> *) a.
Composite m a
-> DisposableEvent m -> Event m (a, DisposableEvent m)
runComposite Composite m (Stream m b)
composite DisposableEvent m
forall a. Monoid a => a
mempty
Event m () -> Process m ()
forall (m :: * -> *). MonadDES m => Event m () -> Process m ()
whenCancellingProcess (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$
DisposableEvent m -> Event m ()
forall (m :: * -> *). DisposableEvent m -> Event m ()
disposeEvent DisposableEvent m
h
Stream m b -> Process m (b, Stream m b)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m b
ys
queuedProcessorChannel :: MonadDES m =>
(a -> Event m ())
-> (Process m a)
-> Processor m a b
-> Channel m a b
{-# INLINABLE queuedProcessorChannel #-}
queuedProcessorChannel :: (a -> Event m ())
-> Process m a -> Processor m a b -> Channel m a b
queuedProcessorChannel a -> Event m ()
enqueue Process m a
dequeue (Processor Stream m a -> Stream m b
f) =
(Signal m a -> Composite m (Signal m b)) -> Channel m a b
forall (m :: * -> *) a b.
(Signal m a -> Composite m (Signal m b)) -> Channel m a b
Channel ((Signal m a -> Composite m (Signal m b)) -> Channel m a b)
-> (Signal m a -> Composite m (Signal m b)) -> Channel m a b
forall a b. (a -> b) -> a -> b
$ \Signal m a
sa ->
do Stream m a
xs <- (a -> Event m ())
-> Process m a -> Signal m a -> Composite m (Stream m a)
forall (m :: * -> *) a.
MonadDES m =>
(a -> Event m ())
-> Process m a -> Signal m a -> Composite m (Stream m a)
queuedSignalStream a -> Event m ()
enqueue Process m a
dequeue Signal m a
sa
let ys :: Stream m b
ys = Stream m a -> Stream m b
f Stream m a
xs
Stream m b -> Composite m (Signal m b)
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Composite m (Signal m a)
streamSignal Stream m b
ys
arrivalProcessor :: MonadDES m => Processor m a (Arrival a)
{-# INLINABLE arrivalProcessor #-}
arrivalProcessor :: Processor m a (Arrival a)
arrivalProcessor = (Stream m a -> Stream m (Arrival a)) -> Processor m a (Arrival a)
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor Stream m a -> Stream m (Arrival a)
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Stream m (Arrival a)
arrivalStream
delayProcessor :: MonadDES m => a -> Processor m a a
{-# INLINABLE delayProcessor #-}
delayProcessor :: a -> Processor m a a
delayProcessor a
a0 = (Stream m a -> Stream m a) -> Processor m a a
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m a -> Stream m a) -> Processor m a a)
-> (Stream m a -> Stream m a) -> Processor m a a
forall a b. (a -> b) -> a -> b
$ a -> Stream m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => a -> Stream m a -> Stream m a
delayStream a
a0
joinProcessor :: MonadDES m => Process m (Processor m a b) -> Processor m a b
{-# INLINABLE joinProcessor #-}
joinProcessor :: Process m (Processor m a b) -> Processor m a b
joinProcessor Process m (Processor m a b)
m =
(Stream m a -> Stream m b) -> Processor m a b
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m a -> Stream m b) -> Processor m a b)
-> (Stream m a -> Stream m b) -> Processor m a b
forall a b. (a -> b) -> a -> b
$ \Stream m a
xs ->
Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (b, Stream m b) -> Stream m b)
-> Process m (b, Stream m b) -> Stream m b
forall a b. (a -> b) -> a -> b
$
do Processor Stream m a -> Stream m b
f <- Process m (Processor m a b)
m
Stream m b -> Process m (b, Stream m b)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m b -> Process m (b, Stream m b))
-> Stream m b -> Process m (b, Stream m b)
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m b
f Stream m a
xs
failoverProcessor :: MonadDES m => [Processor m a b] -> Processor m a b
{-# INLINABLE failoverProcessor #-}
failoverProcessor :: [Processor m a b] -> Processor m a b
failoverProcessor [Processor m a b]
ps =
(Stream m a -> Stream m b) -> Processor m a b
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m a -> Stream m b) -> Processor m a b)
-> (Stream m a -> Stream m b) -> Processor m a b
forall a b. (a -> b) -> a -> b
$ \Stream m a
xs -> [Stream m b] -> Stream m b
forall (m :: * -> *) a. MonadDES m => [Stream m a] -> Stream m a
failoverStream [Processor m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Processor m a b -> Stream m a -> Stream m b
runProcessor Processor m a b
p Stream m a
xs | Processor m a b
p <- [Processor m a b]
ps]
traceProcessor :: MonadDES m
=> Maybe String
-> Maybe String
-> Processor m a b
-> Processor m a b
{-# INLINABLE traceProcessor #-}
traceProcessor :: Maybe String -> Maybe String -> Processor m a b -> Processor m a b
traceProcessor Maybe String
request Maybe String
response (Processor Stream m a -> Stream m b
f) =
(Stream m a -> Stream m b) -> Processor m a b
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m a -> Stream m b) -> Processor m a b)
-> (Stream m a -> Stream m b) -> Processor m a b
forall a b. (a -> b) -> a -> b
$ Maybe String -> Maybe String -> Stream m b -> Stream m b
forall (m :: * -> *) a.
MonadDES m =>
Maybe String -> Maybe String -> Stream m a -> Stream m a
traceStream Maybe String
request Maybe String
response (Stream m b -> Stream m b)
-> (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream m b
f