module Simulation.Aivika.Processor
(
Processor(..),
emptyProcessor,
arrProcessor,
accumProcessor,
withinProcessor,
processorUsingId,
prefetchProcessor,
delayProcessor,
bufferProcessor,
bufferProcessorLoop,
queueProcessor,
queueProcessorLoopMerging,
queueProcessorLoopSeq,
queueProcessorLoopParallel,
processorSeq,
processorParallel,
processorQueuedParallel,
processorPrioritisingOutputParallel,
processorPrioritisingInputParallel,
processorPrioritisingInputOutputParallel,
arrivalProcessor,
joinProcessor,
failoverProcessor,
channelProcessor,
processorChannel,
queuedChannelProcessor,
queuedProcessorChannel,
traceProcessor) where
import qualified Control.Category as C
import Control.Arrow
import Data.Monoid
import Simulation.Aivika.Simulation
import Simulation.Aivika.Dynamics
import Simulation.Aivika.Event
import Simulation.Aivika.Composite
import Simulation.Aivika.Cont
import Simulation.Aivika.Process
import Simulation.Aivika.Stream
import Simulation.Aivika.QueueStrategy
import Simulation.Aivika.Signal
import Simulation.Aivika.Channel
import Simulation.Aivika.Internal.Arrival
newtype Processor a b =
Processor { Processor a b -> Stream a -> Stream b
runProcessor :: Stream a -> Stream b
}
instance C.Category Processor where
id :: Processor a a
id = (Stream a -> Stream a) -> Processor a a
forall a b. (Stream a -> Stream b) -> Processor a b
Processor Stream a -> Stream a
forall a. a -> a
id
Processor Stream b -> Stream c
x . :: Processor b c -> Processor a b -> Processor a c
. Processor Stream a -> Stream b
y = (Stream a -> Stream c) -> Processor a c
forall a b. (Stream a -> Stream b) -> Processor a b
Processor (Stream b -> Stream c
x (Stream b -> Stream c)
-> (Stream a -> Stream b) -> Stream a -> Stream c
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream a -> Stream b
y)
instance Arrow Processor where
arr :: (b -> c) -> Processor b c
arr = (Stream b -> Stream c) -> Processor b c
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream b -> Stream c) -> Processor b c)
-> ((b -> c) -> Stream b -> Stream c) -> (b -> c) -> Processor b c
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (b -> c) -> Stream b -> Stream c
forall a b. (a -> b) -> Stream a -> Stream b
mapStream
first :: Processor b c -> Processor (b, d) (c, d)
first (Processor Stream b -> Stream c
f) =
(Stream (b, d) -> Stream (c, d)) -> Processor (b, d) (c, d)
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream (b, d) -> Stream (c, d)) -> Processor (b, d) (c, d))
-> (Stream (b, d) -> Stream (c, d)) -> Processor (b, d) (c, d)
forall a b. (a -> b) -> a -> b
$ \Stream (b, d)
xys ->
Process ((c, d), Stream (c, d)) -> Stream (c, d)
forall a. Process (a, Stream a) -> Stream a
Cons (Process ((c, d), Stream (c, d)) -> Stream (c, d))
-> Process ((c, d), Stream (c, d)) -> Stream (c, d)
forall a b. (a -> b) -> a -> b
$
do (Stream b
xs, Stream d
ys) <- Simulation (Stream b, Stream d) -> Process (Stream b, Stream d)
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation (Stream b, Stream d) -> Process (Stream b, Stream d))
-> Simulation (Stream b, Stream d) -> Process (Stream b, Stream d)
forall a b. (a -> b) -> a -> b
$ Stream (b, d) -> Simulation (Stream b, Stream d)
forall a b. Stream (a, b) -> Simulation (Stream a, Stream b)
unzipStream Stream (b, d)
xys
Stream (c, d) -> Process ((c, d), Stream (c, d))
forall a. Stream a -> Process (a, Stream a)
runStream (Stream (c, d) -> Process ((c, d), Stream (c, d)))
-> Stream (c, d) -> Process ((c, d), Stream (c, d))
forall a b. (a -> b) -> a -> b
$ Stream c -> Stream d -> Stream (c, d)
forall a b. Stream a -> Stream b -> Stream (a, b)
zipStreamParallel (Stream b -> Stream c
f Stream b
xs) Stream d
ys
second :: Processor b c -> Processor (d, b) (d, c)
second (Processor Stream b -> Stream c
f) =
(Stream (d, b) -> Stream (d, c)) -> Processor (d, b) (d, c)
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream (d, b) -> Stream (d, c)) -> Processor (d, b) (d, c))
-> (Stream (d, b) -> Stream (d, c)) -> Processor (d, b) (d, c)
forall a b. (a -> b) -> a -> b
$ \Stream (d, b)
xys ->
Process ((d, c), Stream (d, c)) -> Stream (d, c)
forall a. Process (a, Stream a) -> Stream a
Cons (Process ((d, c), Stream (d, c)) -> Stream (d, c))
-> Process ((d, c), Stream (d, c)) -> Stream (d, c)
forall a b. (a -> b) -> a -> b
$
do (Stream d
xs, Stream b
ys) <- Simulation (Stream d, Stream b) -> Process (Stream d, Stream b)
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation (Stream d, Stream b) -> Process (Stream d, Stream b))
-> Simulation (Stream d, Stream b) -> Process (Stream d, Stream b)
forall a b. (a -> b) -> a -> b
$ Stream (d, b) -> Simulation (Stream d, Stream b)
forall a b. Stream (a, b) -> Simulation (Stream a, Stream b)
unzipStream Stream (d, b)
xys
Stream (d, c) -> Process ((d, c), Stream (d, c))
forall a. Stream a -> Process (a, Stream a)
runStream (Stream (d, c) -> Process ((d, c), Stream (d, c)))
-> Stream (d, c) -> Process ((d, c), Stream (d, c))
forall a b. (a -> b) -> a -> b
$ Stream d -> Stream c -> Stream (d, c)
forall a b. Stream a -> Stream b -> Stream (a, b)
zipStreamParallel Stream d
xs (Stream b -> Stream c
f Stream b
ys)
Processor Stream b -> Stream c
f *** :: Processor b c -> Processor b' c' -> Processor (b, b') (c, c')
*** Processor Stream b' -> Stream c'
g =
(Stream (b, b') -> Stream (c, c')) -> Processor (b, b') (c, c')
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream (b, b') -> Stream (c, c')) -> Processor (b, b') (c, c'))
-> (Stream (b, b') -> Stream (c, c')) -> Processor (b, b') (c, c')
forall a b. (a -> b) -> a -> b
$ \Stream (b, b')
xys ->
Process ((c, c'), Stream (c, c')) -> Stream (c, c')
forall a. Process (a, Stream a) -> Stream a
Cons (Process ((c, c'), Stream (c, c')) -> Stream (c, c'))
-> Process ((c, c'), Stream (c, c')) -> Stream (c, c')
forall a b. (a -> b) -> a -> b
$
do (Stream b
xs, Stream b'
ys) <- Simulation (Stream b, Stream b') -> Process (Stream b, Stream b')
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation (Stream b, Stream b') -> Process (Stream b, Stream b'))
-> Simulation (Stream b, Stream b')
-> Process (Stream b, Stream b')
forall a b. (a -> b) -> a -> b
$ Stream (b, b') -> Simulation (Stream b, Stream b')
forall a b. Stream (a, b) -> Simulation (Stream a, Stream b)
unzipStream Stream (b, b')
xys
Stream (c, c') -> Process ((c, c'), Stream (c, c'))
forall a. Stream a -> Process (a, Stream a)
runStream (Stream (c, c') -> Process ((c, c'), Stream (c, c')))
-> Stream (c, c') -> Process ((c, c'), Stream (c, c'))
forall a b. (a -> b) -> a -> b
$ Stream c -> Stream c' -> Stream (c, c')
forall a b. Stream a -> Stream b -> Stream (a, b)
zipStreamParallel (Stream b -> Stream c
f Stream b
xs) (Stream b' -> Stream c'
g Stream b'
ys)
Processor Stream b -> Stream c
f &&& :: Processor b c -> Processor b c' -> Processor b (c, c')
&&& Processor Stream b -> Stream c'
g =
(Stream b -> Stream (c, c')) -> Processor b (c, c')
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream b -> Stream (c, c')) -> Processor b (c, c'))
-> (Stream b -> Stream (c, c')) -> Processor b (c, c')
forall a b. (a -> b) -> a -> b
$ \Stream b
xs -> Stream c -> Stream c' -> Stream (c, c')
forall a b. Stream a -> Stream b -> Stream (a, b)
zipStreamParallel (Stream b -> Stream c
f Stream b
xs) (Stream b -> Stream c'
g Stream b
xs)
instance ArrowChoice Processor where
left :: Processor b c -> Processor (Either b d) (Either c d)
left (Processor Stream b -> Stream c
f) =
(Stream (Either b d) -> Stream (Either c d))
-> Processor (Either b d) (Either c d)
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream (Either b d) -> Stream (Either c d))
-> Processor (Either b d) (Either c d))
-> (Stream (Either b d) -> Stream (Either c d))
-> Processor (Either b d) (Either c d)
forall a b. (a -> b) -> a -> b
$ \Stream (Either b d)
xs ->
Process (Either c d, Stream (Either c d)) -> Stream (Either c d)
forall a. Process (a, Stream a) -> Stream a
Cons (Process (Either c d, Stream (Either c d)) -> Stream (Either c d))
-> Process (Either c d, Stream (Either c d)) -> Stream (Either c d)
forall a b. (a -> b) -> a -> b
$
do Stream (Either b d)
ys <- Simulation (Stream (Either b d)) -> Process (Stream (Either b d))
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation (Stream (Either b d)) -> Process (Stream (Either b d)))
-> Simulation (Stream (Either b d))
-> Process (Stream (Either b d))
forall a b. (a -> b) -> a -> b
$ Stream (Either b d) -> Simulation (Stream (Either b d))
forall a. Stream a -> Simulation (Stream a)
memoStream Stream (Either b d)
xs
Stream (Either c d) -> Process (Either c d, Stream (Either c d))
forall a. Stream a -> Process (a, Stream a)
runStream (Stream (Either c d) -> Process (Either c d, Stream (Either c d)))
-> Stream (Either c d) -> Process (Either c d, Stream (Either c d))
forall a b. (a -> b) -> a -> b
$ Stream (Either b d) -> Stream c -> Stream (Either c d)
forall a b c.
Stream (Either a b) -> Stream c -> Stream (Either c b)
replaceLeftStream Stream (Either b d)
ys (Stream b -> Stream c
f (Stream b -> Stream c) -> Stream b -> Stream c
forall a b. (a -> b) -> a -> b
$ Stream (Either b d) -> Stream b
forall a b. Stream (Either a b) -> Stream a
leftStream Stream (Either b d)
ys)
right :: Processor b c -> Processor (Either d b) (Either d c)
right (Processor Stream b -> Stream c
f) =
(Stream (Either d b) -> Stream (Either d c))
-> Processor (Either d b) (Either d c)
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream (Either d b) -> Stream (Either d c))
-> Processor (Either d b) (Either d c))
-> (Stream (Either d b) -> Stream (Either d c))
-> Processor (Either d b) (Either d c)
forall a b. (a -> b) -> a -> b
$ \Stream (Either d b)
xs ->
Process (Either d c, Stream (Either d c)) -> Stream (Either d c)
forall a. Process (a, Stream a) -> Stream a
Cons (Process (Either d c, Stream (Either d c)) -> Stream (Either d c))
-> Process (Either d c, Stream (Either d c)) -> Stream (Either d c)
forall a b. (a -> b) -> a -> b
$
do Stream (Either d b)
ys <- Simulation (Stream (Either d b)) -> Process (Stream (Either d b))
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation (Stream (Either d b)) -> Process (Stream (Either d b)))
-> Simulation (Stream (Either d b))
-> Process (Stream (Either d b))
forall a b. (a -> b) -> a -> b
$ Stream (Either d b) -> Simulation (Stream (Either d b))
forall a. Stream a -> Simulation (Stream a)
memoStream Stream (Either d b)
xs
Stream (Either d c) -> Process (Either d c, Stream (Either d c))
forall a. Stream a -> Process (a, Stream a)
runStream (Stream (Either d c) -> Process (Either d c, Stream (Either d c)))
-> Stream (Either d c) -> Process (Either d c, Stream (Either d c))
forall a b. (a -> b) -> a -> b
$ Stream (Either d b) -> Stream c -> Stream (Either d c)
forall a b c.
Stream (Either a b) -> Stream c -> Stream (Either a c)
replaceRightStream Stream (Either d b)
ys (Stream b -> Stream c
f (Stream b -> Stream c) -> Stream b -> Stream c
forall a b. (a -> b) -> a -> b
$ Stream (Either d b) -> Stream b
forall a b. Stream (Either a b) -> Stream b
rightStream Stream (Either d b)
ys)
instance ArrowZero Processor where
zeroArrow :: Processor b c
zeroArrow = (Stream b -> Stream c) -> Processor b c
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream b -> Stream c) -> Processor b c)
-> (Stream b -> Stream c) -> Processor b c
forall a b. (a -> b) -> a -> b
$ Stream c -> Stream b -> Stream c
forall a b. a -> b -> a
const Stream c
forall a. Stream a
emptyStream
instance ArrowPlus Processor where
(Processor Stream b -> Stream c
f) <+> :: Processor b c -> Processor b c -> Processor b c
<+> (Processor Stream b -> Stream c
g) =
(Stream b -> Stream c) -> Processor b c
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream b -> Stream c) -> Processor b c)
-> (Stream b -> Stream c) -> Processor b c
forall a b. (a -> b) -> a -> b
$ \Stream b
xs ->
Process (c, Stream c) -> Stream c
forall a. Process (a, Stream a) -> Stream a
Cons (Process (c, Stream c) -> Stream c)
-> Process (c, Stream c) -> Stream c
forall a b. (a -> b) -> a -> b
$
do [Stream b
xs1, Stream b
xs2] <- Simulation [Stream b] -> Process [Stream b]
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation [Stream b] -> Process [Stream b])
-> Simulation [Stream b] -> Process [Stream b]
forall a b. (a -> b) -> a -> b
$ Int -> Stream b -> Simulation [Stream b]
forall a. Int -> Stream a -> Simulation [Stream a]
splitStream Int
2 Stream b
xs
Stream c -> Process (c, Stream c)
forall a. Stream a -> Process (a, Stream a)
runStream (Stream c -> Process (c, Stream c))
-> Stream c -> Process (c, Stream c)
forall a b. (a -> b) -> a -> b
$ Stream c -> Stream c -> Stream c
forall a. Stream a -> Stream a -> Stream a
mergeStreams (Stream b -> Stream c
f Stream b
xs1) (Stream b -> Stream c
g Stream b
xs2)
emptyProcessor :: Processor a b
emptyProcessor :: Processor a b
emptyProcessor = (Stream a -> Stream b) -> Processor a b
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream a -> Stream b) -> Processor a b)
-> (Stream a -> Stream b) -> Processor a b
forall a b. (a -> b) -> a -> b
$ Stream b -> Stream a -> Stream b
forall a b. a -> b -> a
const Stream b
forall a. Stream a
emptyStream
arrProcessor :: (a -> Process b) -> Processor a b
arrProcessor :: (a -> Process b) -> Processor a b
arrProcessor = (Stream a -> Stream b) -> Processor a b
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream a -> Stream b) -> Processor a b)
-> ((a -> Process b) -> Stream a -> Stream b)
-> (a -> Process b)
-> Processor a b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> Process b) -> Stream a -> Stream b
forall a b. (a -> Process b) -> Stream a -> Stream b
mapStreamM
accumProcessor :: (acc -> a -> Process (acc, b)) -> acc -> Processor a b
accumProcessor :: (acc -> a -> Process (acc, b)) -> acc -> Processor a b
accumProcessor acc -> a -> Process (acc, b)
f acc
acc = (Stream a -> Stream b) -> Processor a b
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream a -> Stream b) -> Processor a b)
-> (Stream a -> Stream b) -> Processor a b
forall a b. (a -> b) -> a -> b
$ (acc -> a -> Process (acc, b)) -> acc -> Stream a -> Stream b
forall acc a b.
(acc -> a -> Process (acc, b)) -> acc -> Stream a -> Stream b
accumStream acc -> a -> Process (acc, b)
f acc
acc
withinProcessor :: Process () -> Processor a a
withinProcessor :: Process () -> Processor a a
withinProcessor Process ()
m =
(Stream a -> Stream a) -> Processor a a
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream a -> Stream a) -> Processor a a)
-> (Stream a -> Stream a) -> Processor a a
forall a b. (a -> b) -> a -> b
$
(a -> Process a) -> Stream a -> Stream a
forall a b. (a -> Process b) -> Stream a -> Stream b
mapStreamM ((a -> Process a) -> Stream a -> Stream a)
-> (a -> Process a) -> Stream a -> Stream a
forall a b. (a -> b) -> a -> b
$ \a
a ->
do { Process ()
m; a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a }
processorUsingId :: ProcessId -> Processor a b -> Processor a b
processorUsingId :: ProcessId -> Processor a b -> Processor a b
processorUsingId ProcessId
pid (Processor Stream a -> Stream b
f) =
(Stream a -> Stream b) -> Processor a b
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream a -> Stream b) -> Processor a b)
-> (Stream a -> Stream b) -> Processor a b
forall a b. (a -> b) -> a -> b
$ Process (b, Stream b) -> Stream b
forall a. Process (a, Stream a) -> Stream a
Cons (Process (b, Stream b) -> Stream b)
-> (Stream a -> Process (b, Stream b)) -> Stream a -> Stream b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessId -> Process (b, Stream b) -> Process (b, Stream b)
forall a. ProcessId -> Process a -> Process a
processUsingId ProcessId
pid (Process (b, Stream b) -> Process (b, Stream b))
-> (Stream a -> Process (b, Stream b))
-> Stream a
-> Process (b, Stream b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream b -> Process (b, Stream b)
forall a. Stream a -> Process (a, Stream a)
runStream (Stream b -> Process (b, Stream b))
-> (Stream a -> Stream b) -> Stream a -> Process (b, Stream b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream a -> Stream b
f
processorQueuedParallel :: (EnqueueStrategy si,
EnqueueStrategy so)
=> si
-> so
-> [Processor a b]
-> Processor a b
processorQueuedParallel :: si -> so -> [Processor a b] -> Processor a b
processorQueuedParallel si
si so
so [Processor a b]
ps =
(Stream a -> Stream b) -> Processor a b
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream a -> Stream b) -> Processor a b)
-> (Stream a -> Stream b) -> Processor a b
forall a b. (a -> b) -> a -> b
$ \Stream a
xs ->
Process (b, Stream b) -> Stream b
forall a. Process (a, Stream a) -> Stream a
Cons (Process (b, Stream b) -> Stream b)
-> Process (b, Stream b) -> Stream b
forall a b. (a -> b) -> a -> b
$
do let n :: Int
n = [Processor a b] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Processor a b]
ps
[Stream a]
input <- Simulation [Stream a] -> Process [Stream a]
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation [Stream a] -> Process [Stream a])
-> Simulation [Stream a] -> Process [Stream a]
forall a b. (a -> b) -> a -> b
$ si -> Int -> Stream a -> Simulation [Stream a]
forall s a.
EnqueueStrategy s =>
s -> Int -> Stream a -> Simulation [Stream a]
splitStreamQueueing si
si Int
n Stream a
xs
let results :: [Stream b]
results = (((Stream a, Processor a b) -> Stream b)
-> [(Stream a, Processor a b)] -> [Stream b])
-> [(Stream a, Processor a b)]
-> ((Stream a, Processor a b) -> Stream b)
-> [Stream b]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((Stream a, Processor a b) -> Stream b)
-> [(Stream a, Processor a b)] -> [Stream b]
forall a b. (a -> b) -> [a] -> [b]
map ([Stream a] -> [Processor a b] -> [(Stream a, Processor a b)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Stream a]
input [Processor a b]
ps) (((Stream a, Processor a b) -> Stream b) -> [Stream b])
-> ((Stream a, Processor a b) -> Stream b) -> [Stream b]
forall a b. (a -> b) -> a -> b
$ \(Stream a
input, Processor a b
p) ->
Processor a b -> Stream a -> Stream b
forall a b. Processor a b -> Stream a -> Stream b
runProcessor Processor a b
p Stream a
input
output :: Stream b
output = so -> [Stream b] -> Stream b
forall s a. EnqueueStrategy s => s -> [Stream a] -> Stream a
concatQueuedStreams so
so [Stream b]
results
Stream b -> Process (b, Stream b)
forall a. Stream a -> Process (a, Stream a)
runStream Stream b
output
processorPrioritisingOutputParallel :: (EnqueueStrategy si,
PriorityQueueStrategy so po)
=> si
-> so
-> [Processor a (po, b)]
-> Processor a b
processorPrioritisingOutputParallel :: si -> so -> [Processor a (po, b)] -> Processor a b
processorPrioritisingOutputParallel si
si so
so [Processor a (po, b)]
ps =
(Stream a -> Stream b) -> Processor a b
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream a -> Stream b) -> Processor a b)
-> (Stream a -> Stream b) -> Processor a b
forall a b. (a -> b) -> a -> b
$ \Stream a
xs ->
Process (b, Stream b) -> Stream b
forall a. Process (a, Stream a) -> Stream a
Cons (Process (b, Stream b) -> Stream b)
-> Process (b, Stream b) -> Stream b
forall a b. (a -> b) -> a -> b
$
do let n :: Int
n = [Processor a (po, b)] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [Processor a (po, b)]
ps
[Stream a]
input <- Simulation [Stream a] -> Process [Stream a]
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation [Stream a] -> Process [Stream a])
-> Simulation [Stream a] -> Process [Stream a]
forall a b. (a -> b) -> a -> b
$ si -> Int -> Stream a -> Simulation [Stream a]
forall s a.
EnqueueStrategy s =>
s -> Int -> Stream a -> Simulation [Stream a]
splitStreamQueueing si
si Int
n Stream a
xs
let results :: [Stream (po, b)]
results = (((Stream a, Processor a (po, b)) -> Stream (po, b))
-> [(Stream a, Processor a (po, b))] -> [Stream (po, b)])
-> [(Stream a, Processor a (po, b))]
-> ((Stream a, Processor a (po, b)) -> Stream (po, b))
-> [Stream (po, b)]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((Stream a, Processor a (po, b)) -> Stream (po, b))
-> [(Stream a, Processor a (po, b))] -> [Stream (po, b)]
forall a b. (a -> b) -> [a] -> [b]
map ([Stream a]
-> [Processor a (po, b)] -> [(Stream a, Processor a (po, b))]
forall a b. [a] -> [b] -> [(a, b)]
zip [Stream a]
input [Processor a (po, b)]
ps) (((Stream a, Processor a (po, b)) -> Stream (po, b))
-> [Stream (po, b)])
-> ((Stream a, Processor a (po, b)) -> Stream (po, b))
-> [Stream (po, b)]
forall a b. (a -> b) -> a -> b
$ \(Stream a
input, Processor a (po, b)
p) ->
Processor a (po, b) -> Stream a -> Stream (po, b)
forall a b. Processor a b -> Stream a -> Stream b
runProcessor Processor a (po, b)
p Stream a
input
output :: Stream b
output = so -> [Stream (po, b)] -> Stream b
forall s p a.
PriorityQueueStrategy s p =>
s -> [Stream (p, a)] -> Stream a
concatPriorityStreams so
so [Stream (po, b)]
results
Stream b -> Process (b, Stream b)
forall a. Stream a -> Process (a, Stream a)
runStream Stream b
output
processorPrioritisingInputParallel :: (PriorityQueueStrategy si pi,
EnqueueStrategy so)
=> si
-> so
-> [(Stream pi, Processor a b)]
-> Processor a b
processorPrioritisingInputParallel :: si -> so -> [(Stream pi, Processor a b)] -> Processor a b
processorPrioritisingInputParallel si
si so
so [(Stream pi, Processor a b)]
ps =
(Stream a -> Stream b) -> Processor a b
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream a -> Stream b) -> Processor a b)
-> (Stream a -> Stream b) -> Processor a b
forall a b. (a -> b) -> a -> b
$ \Stream a
xs ->
Process (b, Stream b) -> Stream b
forall a. Process (a, Stream a) -> Stream a
Cons (Process (b, Stream b) -> Stream b)
-> Process (b, Stream b) -> Stream b
forall a b. (a -> b) -> a -> b
$
do [Stream a]
input <- Simulation [Stream a] -> Process [Stream a]
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation [Stream a] -> Process [Stream a])
-> Simulation [Stream a] -> Process [Stream a]
forall a b. (a -> b) -> a -> b
$ si -> [Stream pi] -> Stream a -> Simulation [Stream a]
forall s p a.
PriorityQueueStrategy s p =>
s -> [Stream p] -> Stream a -> Simulation [Stream a]
splitStreamPrioritising si
si (((Stream pi, Processor a b) -> Stream pi)
-> [(Stream pi, Processor a b)] -> [Stream pi]
forall a b. (a -> b) -> [a] -> [b]
map (Stream pi, Processor a b) -> Stream pi
forall a b. (a, b) -> a
fst [(Stream pi, Processor a b)]
ps) Stream a
xs
let results :: [Stream b]
results = (((Stream a, (Stream pi, Processor a b)) -> Stream b)
-> [(Stream a, (Stream pi, Processor a b))] -> [Stream b])
-> [(Stream a, (Stream pi, Processor a b))]
-> ((Stream a, (Stream pi, Processor a b)) -> Stream b)
-> [Stream b]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((Stream a, (Stream pi, Processor a b)) -> Stream b)
-> [(Stream a, (Stream pi, Processor a b))] -> [Stream b]
forall a b. (a -> b) -> [a] -> [b]
map ([Stream a]
-> [(Stream pi, Processor a b)]
-> [(Stream a, (Stream pi, Processor a b))]
forall a b. [a] -> [b] -> [(a, b)]
zip [Stream a]
input [(Stream pi, Processor a b)]
ps) (((Stream a, (Stream pi, Processor a b)) -> Stream b)
-> [Stream b])
-> ((Stream a, (Stream pi, Processor a b)) -> Stream b)
-> [Stream b]
forall a b. (a -> b) -> a -> b
$ \(Stream a
input, (Stream pi
_, Processor a b
p)) ->
Processor a b -> Stream a -> Stream b
forall a b. Processor a b -> Stream a -> Stream b
runProcessor Processor a b
p Stream a
input
output :: Stream b
output = so -> [Stream b] -> Stream b
forall s a. EnqueueStrategy s => s -> [Stream a] -> Stream a
concatQueuedStreams so
so [Stream b]
results
Stream b -> Process (b, Stream b)
forall a. Stream a -> Process (a, Stream a)
runStream Stream b
output
processorPrioritisingInputOutputParallel :: (PriorityQueueStrategy si pi,
PriorityQueueStrategy so po)
=> si
-> so
-> [(Stream pi, Processor a (po, b))]
-> Processor a b
processorPrioritisingInputOutputParallel :: si -> so -> [(Stream pi, Processor a (po, b))] -> Processor a b
processorPrioritisingInputOutputParallel si
si so
so [(Stream pi, Processor a (po, b))]
ps =
(Stream a -> Stream b) -> Processor a b
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream a -> Stream b) -> Processor a b)
-> (Stream a -> Stream b) -> Processor a b
forall a b. (a -> b) -> a -> b
$ \Stream a
xs ->
Process (b, Stream b) -> Stream b
forall a. Process (a, Stream a) -> Stream a
Cons (Process (b, Stream b) -> Stream b)
-> Process (b, Stream b) -> Stream b
forall a b. (a -> b) -> a -> b
$
do [Stream a]
input <- Simulation [Stream a] -> Process [Stream a]
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation [Stream a] -> Process [Stream a])
-> Simulation [Stream a] -> Process [Stream a]
forall a b. (a -> b) -> a -> b
$ si -> [Stream pi] -> Stream a -> Simulation [Stream a]
forall s p a.
PriorityQueueStrategy s p =>
s -> [Stream p] -> Stream a -> Simulation [Stream a]
splitStreamPrioritising si
si (((Stream pi, Processor a (po, b)) -> Stream pi)
-> [(Stream pi, Processor a (po, b))] -> [Stream pi]
forall a b. (a -> b) -> [a] -> [b]
map (Stream pi, Processor a (po, b)) -> Stream pi
forall a b. (a, b) -> a
fst [(Stream pi, Processor a (po, b))]
ps) Stream a
xs
let results :: [Stream (po, b)]
results = (((Stream a, (Stream pi, Processor a (po, b))) -> Stream (po, b))
-> [(Stream a, (Stream pi, Processor a (po, b)))]
-> [Stream (po, b)])
-> [(Stream a, (Stream pi, Processor a (po, b)))]
-> ((Stream a, (Stream pi, Processor a (po, b))) -> Stream (po, b))
-> [Stream (po, b)]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((Stream a, (Stream pi, Processor a (po, b))) -> Stream (po, b))
-> [(Stream a, (Stream pi, Processor a (po, b)))]
-> [Stream (po, b)]
forall a b. (a -> b) -> [a] -> [b]
map ([Stream a]
-> [(Stream pi, Processor a (po, b))]
-> [(Stream a, (Stream pi, Processor a (po, b)))]
forall a b. [a] -> [b] -> [(a, b)]
zip [Stream a]
input [(Stream pi, Processor a (po, b))]
ps) (((Stream a, (Stream pi, Processor a (po, b))) -> Stream (po, b))
-> [Stream (po, b)])
-> ((Stream a, (Stream pi, Processor a (po, b))) -> Stream (po, b))
-> [Stream (po, b)]
forall a b. (a -> b) -> a -> b
$ \(Stream a
input, (Stream pi
_, Processor a (po, b)
p)) ->
Processor a (po, b) -> Stream a -> Stream (po, b)
forall a b. Processor a b -> Stream a -> Stream b
runProcessor Processor a (po, b)
p Stream a
input
output :: Stream b
output = so -> [Stream (po, b)] -> Stream b
forall s p a.
PriorityQueueStrategy s p =>
s -> [Stream (p, a)] -> Stream a
concatPriorityStreams so
so [Stream (po, b)]
results
Stream b -> Process (b, Stream b)
forall a. Stream a -> Process (a, Stream a)
runStream Stream b
output
processorParallel :: [Processor a b] -> Processor a b
processorParallel :: [Processor a b] -> Processor a b
processorParallel = FCFS -> FCFS -> [Processor a b] -> Processor a b
forall si so a b.
(EnqueueStrategy si, EnqueueStrategy so) =>
si -> so -> [Processor a b] -> Processor a b
processorQueuedParallel FCFS
FCFS FCFS
FCFS
processorSeq :: [Processor a a] -> Processor a a
processorSeq :: [Processor a a] -> Processor a a
processorSeq [] = Processor a a
forall b c. Processor b c
emptyProcessor
processorSeq [Processor a a
p] = Processor a a
p
processorSeq (Processor a a
p : [Processor a a]
ps) = Processor a a
p Processor a a -> Processor a a -> Processor a a
forall k (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> Processor a a
forall a. Processor a a
prefetchProcessor Processor a a -> Processor a a -> Processor a a
forall k (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> [Processor a a] -> Processor a a
forall a. [Processor a a] -> Processor a a
processorSeq [Processor a a]
ps
bufferProcessor :: (Stream a -> Process ())
-> Stream b
-> Processor a b
bufferProcessor :: (Stream a -> Process ()) -> Stream b -> Processor a b
bufferProcessor Stream a -> Process ()
consume Stream b
output =
(Stream a -> Stream b) -> Processor a b
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream a -> Stream b) -> Processor a b)
-> (Stream a -> Stream b) -> Processor a b
forall a b. (a -> b) -> a -> b
$ \Stream a
xs ->
Process (b, Stream b) -> Stream b
forall a. Process (a, Stream a) -> Stream a
Cons (Process (b, Stream b) -> Stream b)
-> Process (b, Stream b) -> Stream b
forall a b. (a -> b) -> a -> b
$
do Process () -> Process ()
spawnProcess (Stream a -> Process ()
consume Stream a
xs)
Stream b -> Process (b, Stream b)
forall a. Stream a -> Process (a, Stream a)
runStream Stream b
output
bufferProcessorLoop :: (Stream a -> Stream c -> Process ())
-> Stream d
-> Processor d (Either e b)
-> Processor e c
-> Processor a b
bufferProcessorLoop :: (Stream a -> Stream c -> Process ())
-> Stream d
-> Processor d (Either e b)
-> Processor e c
-> Processor a b
bufferProcessorLoop Stream a -> Stream c -> Process ()
consume Stream d
preoutput Processor d (Either e b)
cond Processor e c
body =
(Stream a -> Stream b) -> Processor a b
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream a -> Stream b) -> Processor a b)
-> (Stream a -> Stream b) -> Processor a b
forall a b. (a -> b) -> a -> b
$ \Stream a
xs ->
Process (b, Stream b) -> Stream b
forall a. Process (a, Stream a) -> Stream a
Cons (Process (b, Stream b) -> Stream b)
-> Process (b, Stream b) -> Stream b
forall a b. (a -> b) -> a -> b
$
do (Stream e
reverted, Stream b
output) <-
Simulation (Stream e, Stream b) -> Process (Stream e, Stream b)
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation (Stream e, Stream b) -> Process (Stream e, Stream b))
-> Simulation (Stream e, Stream b) -> Process (Stream e, Stream b)
forall a b. (a -> b) -> a -> b
$
Stream (Either e b) -> Simulation (Stream e, Stream b)
forall a b. Stream (Either a b) -> Simulation (Stream a, Stream b)
partitionEitherStream (Stream (Either e b) -> Simulation (Stream e, Stream b))
-> Stream (Either e b) -> Simulation (Stream e, Stream b)
forall a b. (a -> b) -> a -> b
$
Processor d (Either e b) -> Stream d -> Stream (Either e b)
forall a b. Processor a b -> Stream a -> Stream b
runProcessor Processor d (Either e b)
cond Stream d
preoutput
Process () -> Process ()
spawnProcess
(Stream a -> Stream c -> Process ()
consume Stream a
xs (Stream c -> Process ()) -> Stream c -> Process ()
forall a b. (a -> b) -> a -> b
$ Processor e c -> Stream e -> Stream c
forall a b. Processor a b -> Stream a -> Stream b
runProcessor Processor e c
body Stream e
reverted)
Stream b -> Process (b, Stream b)
forall a. Stream a -> Process (a, Stream a)
runStream Stream b
output
queueProcessor :: (a -> Process ())
-> Process b
-> Processor a b
queueProcessor :: (a -> Process ()) -> Process b -> Processor a b
queueProcessor a -> Process ()
enqueue Process b
dequeue =
(Stream a -> Process ()) -> Stream b -> Processor a b
forall a b. (Stream a -> Process ()) -> Stream b -> Processor a b
bufferProcessor
((a -> Process ()) -> Stream a -> Process ()
forall a. (a -> Process ()) -> Stream a -> Process ()
consumeStream a -> Process ()
enqueue)
(Process b -> Stream b
forall a. Process a -> Stream a
repeatProcess Process b
dequeue)
queueProcessorLoopMerging :: (Stream a -> Stream d -> Stream e)
-> (e -> Process ())
-> Process c
-> Processor c (Either f b)
-> Processor f d
-> Processor a b
queueProcessorLoopMerging :: (Stream a -> Stream d -> Stream e)
-> (e -> Process ())
-> Process c
-> Processor c (Either f b)
-> Processor f d
-> Processor a b
queueProcessorLoopMerging Stream a -> Stream d -> Stream e
merge e -> Process ()
enqueue Process c
dequeue =
(Stream a -> Stream d -> Process ())
-> Stream c
-> Processor c (Either f b)
-> Processor f d
-> Processor a b
forall a c d e b.
(Stream a -> Stream c -> Process ())
-> Stream d
-> Processor d (Either e b)
-> Processor e c
-> Processor a b
bufferProcessorLoop
(\Stream a
bs Stream d
cs ->
(e -> Process ()) -> Stream e -> Process ()
forall a. (a -> Process ()) -> Stream a -> Process ()
consumeStream e -> Process ()
enqueue (Stream e -> Process ()) -> Stream e -> Process ()
forall a b. (a -> b) -> a -> b
$
Stream a -> Stream d -> Stream e
merge Stream a
bs Stream d
cs)
(Process c -> Stream c
forall a. Process a -> Stream a
repeatProcess Process c
dequeue)
queueProcessorLoopSeq :: (a -> Process ())
-> Process c
-> Processor c (Either e b)
-> Processor e a
-> Processor a b
queueProcessorLoopSeq :: (a -> Process ())
-> Process c
-> Processor c (Either e b)
-> Processor e a
-> Processor a b
queueProcessorLoopSeq =
(Stream a -> Stream a -> Stream a)
-> (a -> Process ())
-> Process c
-> Processor c (Either e b)
-> Processor e a
-> Processor a b
forall a d e c f b.
(Stream a -> Stream d -> Stream e)
-> (e -> Process ())
-> Process c
-> Processor c (Either f b)
-> Processor f d
-> Processor a b
queueProcessorLoopMerging Stream a -> Stream a -> Stream a
forall a. Stream a -> Stream a -> Stream a
mergeStreams
queueProcessorLoopParallel :: (a -> Process ())
-> Process c
-> Processor c (Either e b)
-> Processor e a
-> Processor a b
queueProcessorLoopParallel :: (a -> Process ())
-> Process c
-> Processor c (Either e b)
-> Processor e a
-> Processor a b
queueProcessorLoopParallel a -> Process ()
enqueue Process c
dequeue =
(Stream a -> Stream a -> Process ())
-> Stream c
-> Processor c (Either e b)
-> Processor e a
-> Processor a b
forall a c d e b.
(Stream a -> Stream c -> Process ())
-> Stream d
-> Processor d (Either e b)
-> Processor e c
-> Processor a b
bufferProcessorLoop
(\Stream a
bs Stream a
cs ->
do Process () -> Process ()
spawnProcess (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
(a -> Process ()) -> Stream a -> Process ()
forall a. (a -> Process ()) -> Stream a -> Process ()
consumeStream a -> Process ()
enqueue Stream a
bs
Process () -> Process ()
spawnProcess (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
(a -> Process ()) -> Stream a -> Process ()
forall a. (a -> Process ()) -> Stream a -> Process ()
consumeStream a -> Process ()
enqueue Stream a
cs)
(Process c -> Stream c
forall a. Process a -> Stream a
repeatProcess Process c
dequeue)
prefetchProcessor :: Processor a a
prefetchProcessor :: Processor a a
prefetchProcessor = (Stream a -> Stream a) -> Processor a a
forall a b. (Stream a -> Stream b) -> Processor a b
Processor Stream a -> Stream a
forall a. Stream a -> Stream a
prefetchStream
channelProcessor :: Channel a b -> Processor a b
channelProcessor :: Channel a b -> Processor a b
channelProcessor Channel a b
f =
(Stream a -> Stream b) -> Processor a b
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream a -> Stream b) -> Processor a b)
-> (Stream a -> Stream b) -> Processor a b
forall a b. (a -> b) -> a -> b
$ \Stream a
xs ->
Process (b, Stream b) -> Stream b
forall a. Process (a, Stream a) -> Stream a
Cons (Process (b, Stream b) -> Stream b)
-> Process (b, Stream b) -> Stream b
forall a b. (a -> b) -> a -> b
$
do let composite :: Composite (Stream b)
composite =
do Signal a
sa <- Stream a -> Composite (Signal a)
forall a. Stream a -> Composite (Signal a)
streamSignal Stream a
xs
Signal b
sb <- Channel a b -> Signal a -> Composite (Signal b)
forall a b. Channel a b -> Signal a -> Composite (Signal b)
runChannel Channel a b
f Signal a
sa
Signal b -> Composite (Stream b)
forall a. Signal a -> Composite (Stream a)
signalStream Signal b
sb
(Stream b
ys, DisposableEvent
h) <- Event (Stream b, DisposableEvent)
-> Process (Stream b, DisposableEvent)
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event (Stream b, DisposableEvent)
-> Process (Stream b, DisposableEvent))
-> Event (Stream b, DisposableEvent)
-> Process (Stream b, DisposableEvent)
forall a b. (a -> b) -> a -> b
$
Composite (Stream b)
-> DisposableEvent -> Event (Stream b, DisposableEvent)
forall a.
Composite a -> DisposableEvent -> Event (a, DisposableEvent)
runComposite Composite (Stream b)
composite DisposableEvent
forall a. Monoid a => a
mempty
Event () -> Process ()
whenCancellingProcess (Event () -> Process ()) -> Event () -> Process ()
forall a b. (a -> b) -> a -> b
$
DisposableEvent -> Event ()
disposeEvent DisposableEvent
h
Stream b -> Process (b, Stream b)
forall a. Stream a -> Process (a, Stream a)
runStream Stream b
ys
processorChannel :: Processor a b -> Channel a b
processorChannel :: Processor a b -> Channel a b
processorChannel (Processor Stream a -> Stream b
f) =
(Signal a -> Composite (Signal b)) -> Channel a b
forall a b. (Signal a -> Composite (Signal b)) -> Channel a b
Channel ((Signal a -> Composite (Signal b)) -> Channel a b)
-> (Signal a -> Composite (Signal b)) -> Channel a b
forall a b. (a -> b) -> a -> b
$ \Signal a
sa ->
do Stream a
xs <- Signal a -> Composite (Stream a)
forall a. Signal a -> Composite (Stream a)
signalStream Signal a
sa
let ys :: Stream b
ys = Stream a -> Stream b
f Stream a
xs
Stream b -> Composite (Signal b)
forall a. Stream a -> Composite (Signal a)
streamSignal Stream b
ys
queuedChannelProcessor :: (b -> Event ())
-> Process b
-> Channel a b
-> Processor a b
queuedChannelProcessor :: (b -> Event ()) -> Process b -> Channel a b -> Processor a b
queuedChannelProcessor b -> Event ()
enqueue Process b
dequeue Channel a b
f =
(Stream a -> Stream b) -> Processor a b
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream a -> Stream b) -> Processor a b)
-> (Stream a -> Stream b) -> Processor a b
forall a b. (a -> b) -> a -> b
$ \Stream a
xs ->
Process (b, Stream b) -> Stream b
forall a. Process (a, Stream a) -> Stream a
Cons (Process (b, Stream b) -> Stream b)
-> Process (b, Stream b) -> Stream b
forall a b. (a -> b) -> a -> b
$
do let composite :: Composite (Stream b)
composite =
do Signal a
sa <- Stream a -> Composite (Signal a)
forall a. Stream a -> Composite (Signal a)
streamSignal Stream a
xs
Signal b
sb <- Channel a b -> Signal a -> Composite (Signal b)
forall a b. Channel a b -> Signal a -> Composite (Signal b)
runChannel Channel a b
f Signal a
sa
(b -> Event ()) -> Process b -> Signal b -> Composite (Stream b)
forall a.
(a -> Event ()) -> Process a -> Signal a -> Composite (Stream a)
queuedSignalStream b -> Event ()
enqueue Process b
dequeue Signal b
sb
(Stream b
ys, DisposableEvent
h) <- Event (Stream b, DisposableEvent)
-> Process (Stream b, DisposableEvent)
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event (Stream b, DisposableEvent)
-> Process (Stream b, DisposableEvent))
-> Event (Stream b, DisposableEvent)
-> Process (Stream b, DisposableEvent)
forall a b. (a -> b) -> a -> b
$
Composite (Stream b)
-> DisposableEvent -> Event (Stream b, DisposableEvent)
forall a.
Composite a -> DisposableEvent -> Event (a, DisposableEvent)
runComposite Composite (Stream b)
composite DisposableEvent
forall a. Monoid a => a
mempty
Event () -> Process ()
whenCancellingProcess (Event () -> Process ()) -> Event () -> Process ()
forall a b. (a -> b) -> a -> b
$
DisposableEvent -> Event ()
disposeEvent DisposableEvent
h
Stream b -> Process (b, Stream b)
forall a. Stream a -> Process (a, Stream a)
runStream Stream b
ys
queuedProcessorChannel :: (a -> Event ())
-> (Process a)
-> Processor a b
-> Channel a b
queuedProcessorChannel :: (a -> Event ()) -> Process a -> Processor a b -> Channel a b
queuedProcessorChannel a -> Event ()
enqueue Process a
dequeue (Processor Stream a -> Stream b
f) =
(Signal a -> Composite (Signal b)) -> Channel a b
forall a b. (Signal a -> Composite (Signal b)) -> Channel a b
Channel ((Signal a -> Composite (Signal b)) -> Channel a b)
-> (Signal a -> Composite (Signal b)) -> Channel a b
forall a b. (a -> b) -> a -> b
$ \Signal a
sa ->
do Stream a
xs <- (a -> Event ()) -> Process a -> Signal a -> Composite (Stream a)
forall a.
(a -> Event ()) -> Process a -> Signal a -> Composite (Stream a)
queuedSignalStream a -> Event ()
enqueue Process a
dequeue Signal a
sa
let ys :: Stream b
ys = Stream a -> Stream b
f Stream a
xs
Stream b -> Composite (Signal b)
forall a. Stream a -> Composite (Signal a)
streamSignal Stream b
ys
arrivalProcessor :: Processor a (Arrival a)
arrivalProcessor :: Processor a (Arrival a)
arrivalProcessor = (Stream a -> Stream (Arrival a)) -> Processor a (Arrival a)
forall a b. (Stream a -> Stream b) -> Processor a b
Processor Stream a -> Stream (Arrival a)
forall a. Stream a -> Stream (Arrival a)
arrivalStream
delayProcessor :: a -> Processor a a
delayProcessor :: a -> Processor a a
delayProcessor a
a0 = (Stream a -> Stream a) -> Processor a a
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream a -> Stream a) -> Processor a a)
-> (Stream a -> Stream a) -> Processor a a
forall a b. (a -> b) -> a -> b
$ a -> Stream a -> Stream a
forall a. a -> Stream a -> Stream a
delayStream a
a0
joinProcessor :: Process (Processor a b) -> Processor a b
joinProcessor :: Process (Processor a b) -> Processor a b
joinProcessor Process (Processor a b)
m =
(Stream a -> Stream b) -> Processor a b
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream a -> Stream b) -> Processor a b)
-> (Stream a -> Stream b) -> Processor a b
forall a b. (a -> b) -> a -> b
$ \Stream a
xs ->
Process (b, Stream b) -> Stream b
forall a. Process (a, Stream a) -> Stream a
Cons (Process (b, Stream b) -> Stream b)
-> Process (b, Stream b) -> Stream b
forall a b. (a -> b) -> a -> b
$
do Processor Stream a -> Stream b
f <- Process (Processor a b)
m
Stream b -> Process (b, Stream b)
forall a. Stream a -> Process (a, Stream a)
runStream (Stream b -> Process (b, Stream b))
-> Stream b -> Process (b, Stream b)
forall a b. (a -> b) -> a -> b
$ Stream a -> Stream b
f Stream a
xs
failoverProcessor :: [Processor a b] -> Processor a b
failoverProcessor :: [Processor a b] -> Processor a b
failoverProcessor [Processor a b]
ps =
(Stream a -> Stream b) -> Processor a b
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream a -> Stream b) -> Processor a b)
-> (Stream a -> Stream b) -> Processor a b
forall a b. (a -> b) -> a -> b
$ \Stream a
xs -> [Stream b] -> Stream b
forall a. [Stream a] -> Stream a
failoverStream [Processor a b -> Stream a -> Stream b
forall a b. Processor a b -> Stream a -> Stream b
runProcessor Processor a b
p Stream a
xs | Processor a b
p <- [Processor a b]
ps]
traceProcessor :: Maybe String
-> Maybe String
-> Processor a b
-> Processor a b
traceProcessor :: Maybe String -> Maybe String -> Processor a b -> Processor a b
traceProcessor Maybe String
request Maybe String
response (Processor Stream a -> Stream b
f) =
(Stream a -> Stream b) -> Processor a b
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream a -> Stream b) -> Processor a b)
-> (Stream a -> Stream b) -> Processor a b
forall a b. (a -> b) -> a -> b
$ Maybe String -> Maybe String -> Stream b -> Stream b
forall a. Maybe String -> Maybe String -> Stream a -> Stream a
traceStream Maybe String
request Maybe String
response (Stream b -> Stream b)
-> (Stream a -> Stream b) -> Stream a -> Stream b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream a -> Stream b
f