-- |
-- Module     : Simulation.Aivika.Processor
-- Copyright  : Copyright (c) 2009-2013, David Sorokin <david.sorokin@gmail.com>
-- License    : BSD3
-- Maintainer : David Sorokin <david.sorokin@gmail.com>
-- Stability  : experimental
-- Tested with: GHC 7.6.3
--
-- The processor of simulation data.
--
module Simulation.Aivika.Processor
       (-- * Processor Type
        Processor(..),
        -- * Creating Simple Processor
        simpleProcessor,
        statefulProcessor,
        -- * Specifying Identifier
        processorUsingId,
        -- * Buffer Processor
        bufferProcessor,
        bufferProcessorLoop,
        -- * Processing Queues
        queueProcessor,
        queueProcessorLoopMerging,
        queueProcessorLoopSeq,
        queueProcessorLoopParallel,
        -- * Parallelizing Processors
        processorParallel,
        processorQueuedParallel,
        processorPrioritisingOutputParallel,
        processorPrioritisingInputParallel,
        processorPrioritisingInputOutputParallel) where

import qualified Control.Category as C
import Control.Arrow

import Simulation.Aivika.Simulation
import Simulation.Aivika.Dynamics
import Simulation.Aivika.Event
import Simulation.Aivika.Cont
import Simulation.Aivika.Process
import Simulation.Aivika.Stream
import Simulation.Aivika.QueueStrategy

-- | Represents a processor of simulation data: a transformation of
-- an input stream into an output stream.
newtype Processor a b =
  Processor { runProcessor :: Stream a -> Stream b
              -- ^ Run the processor.
            }

-- Processors compose like the stream functions they wrap.
instance C.Category Processor where

  id = Processor id

  Processor outer . Processor inner =
    Processor $ \xs -> outer (inner xs)

-- The implementation is based on the article
-- "A New Notation for Arrows" by Ross Paterson,
-- although my streams are different and they
-- already depend on the Process monad,
-- while pure streams were considered in the
-- mentioned article.
instance Arrow Processor where

  arr = Processor . mapStream

  -- 'first' and 'second' are the special cases of '***' in which
  -- one of the two components is passed through unchanged.
  first p = p *** C.id

  second p = C.id *** p

  -- Unzip the stream of pairs, transform every component stream
  -- with its own processor and zip the results back together.
  Processor f *** Processor g =
    Processor $ \xys ->
    Cons $
    liftSimulation (unzipStream xys) >>= \(xs, ys) ->
    runStream $ zipStreamSeq (f xs) (g ys)

-- N.B.
-- Very probably, Processor is not ArrowLoop,
-- which would be natural as Process is not MonadFix,
-- for the discontinuous process is irreversible
-- and the time flows in one direction only.
--
-- -- The implementation is based on article
-- -- A New Notation for Arrows by Ross Paterson,
-- -- although my streams are different and they
-- -- already depend on the Process monad,
-- -- while the pure streams were considered in the
-- -- mentioned article.
-- instance ArrowLoop Processor where
--
--   loop (Processor f) =
--     Processor $ \xs ->
--     Cons $
--     do Cons zs <- liftSimulation $
--                   simulationLoop (\(xs, ys) ->
--                                    unzipStream $ f $ zipStreamSeq xs ys) xs
--        zs
--
-- simulationLoop :: ((b, d) -> Simulation (c, d)) -> b -> Simulation c
-- simulationLoop f b =
--   mdo (c, d) <- f (b, d)
--       return c

-- The implementation is based on the article
-- "A New Notation for Arrows" by Ross Paterson,
-- although my streams are different and they
-- already depend on the Process monad,
-- while pure streams were considered in the
-- mentioned article.
instance ArrowChoice Processor where

  -- The input is memoized because it is consumed twice: once by
  -- 'leftStream' to feed the wrapped processor and once by
  -- 'replaceLeftStream' to build the resulting stream.
  left (Processor f) =
    Processor $ \xs ->
    Cons $
    do memoized <- liftSimulation $ memoStream xs
       runStream $ replaceLeftStream memoized (f $ leftStream memoized)

  -- The mirror image of 'left' for the right alternative.
  right (Processor f) =
    Processor $ \xs ->
    Cons $
    do memoized <- liftSimulation $ memoStream xs
       runStream $ replaceRightStream memoized (f $ rightStream memoized)

instance ArrowZero Processor where

  -- Ignores the input and produces 'emptyStream'.
  zeroArrow = Processor $ const emptyStream

instance ArrowPlus Processor where

  -- Split the input into two streams, feed one to each processor
  -- and merge their outputs with 'mergeStreams'.
  (Processor f) <+> (Processor g) =
    Processor $ \xs ->
    Cons $
    do branches <- liftSimulation $ splitStream 2 xs
       -- Match the list explicitly instead of binding the refutable
       -- pattern @[xs1, xs2]@ in the do-block: such a binding silently
       -- relies on 'fail' and requires a MonadFail instance for Process
       -- on modern compilers.
       case branches of
         [xs1, xs2] ->
           runStream $ mergeStreams (f xs1) (g xs2)
         _ ->
           error $
           "Simulation.Aivika.Processor.(<+>): " ++
           "splitStream 2 did not return exactly two streams"

-- These instances are meaningless:
--
-- instance SimulationLift (Processor a) where
--   liftSimulation = Processor . mapStreamM . const . liftSimulation
--
-- instance DynamicsLift (Processor a) where
--   liftDynamics = Processor . mapStreamM . const . liftDynamics
--
-- instance EventLift (Processor a) where
--   liftEvent = Processor . mapStreamM . const . liftEvent
--
-- instance ProcessLift (Processor a) where
--   liftProcess = Processor . mapStreamM . const    -- data first!

-- | Create a simple processor by the specified handling function
-- that runs the discontinuous process for each input value to get the output.
simpleProcessor :: (a -> Process b) -> Processor a b
simpleProcessor = Processor . mapStreamM

-- | Like 'simpleProcessor' but allows creating a processor that has a state
-- which is passed in to every new iteration.
statefulProcessor :: s
                     -- ^ the initial state
                     -> ((s, a) -> Process (s, b))
                     -- ^ take the current state and an input value,
                     -- return the next state and an output value
                     -> Processor a b
statefulProcessor initial f =
  Processor $ \xs -> Cons $ step initial xs
  where
    -- Take the next input value, run the handler with the current
    -- state and continue lazily with the updated state.
    step state stream =
      do (a, rest) <- runStream stream
         (state', b) <- f (state, a)
         return (b, Cons $ step state' rest)

-- | Create a processor that will use the specified process identifier.
-- It can be useful to refer to the underlying 'Process' computation which
-- can be passivated, interrupted, canceled and so on. See also the
-- 'processUsingId' function for more details.
processorUsingId :: ProcessId -> Processor a b -> Processor a b
processorUsingId pid (Processor f) =
  -- Rewrap the head computation of the output stream with 'processUsingId'.
  Processor $ \xs -> Cons $ processUsingId pid $ runStream (f xs)

-- | Launches the specified processors in parallel consuming the same input
-- stream and producing a combined output stream.
--
-- If you don't know which enqueue strategies to apply, then
-- you will probably need 'FCFS' for both parameters, or
-- the function 'processorParallel' that does exactly this.
processorQueuedParallel :: (EnqueueStrategy si qi,
                            EnqueueStrategy so qo)
                           => si
                           -- ^ the strategy applied for enqueuing the input data
                           -> so
                           -- ^ the strategy applied for enqueuing the output data
                           -> [Processor a b]
                           -- ^ the processors to parallelize
                           -> Processor a b
                           -- ^ the parallelized processor
processorQueuedParallel si so ps =
  Processor $ \xs ->
  Cons $
  do -- one input stream per processor
     inputs <- liftSimulation $ splitStreamQueuing si (length ps) xs
     -- run every processor on its own share of the input
     -- and combine the outputs
     runStream $ concatQueuedStreams so (zipWith runProcessor ps inputs)

-- | Launches the specified processors in parallel using priorities for
-- combining the output.
processorPrioritisingOutputParallel :: (EnqueueStrategy si qi,
                                        PriorityQueueStrategy so qo po)
                                       => si
                                       -- ^ the strategy applied for enqueuing the input data
                                       -> so
                                       -- ^ the strategy applied for enqueuing the output data
                                       -> [Processor a (po, b)]
                                       -- ^ the processors to parallelize
                                       -> Processor a b
                                       -- ^ the parallelized processor
processorPrioritisingOutputParallel si so ps =
  Processor $ \xs ->
  Cons $
  do -- one input stream per processor
     inputs <- liftSimulation $ splitStreamQueuing si (length ps) xs
     -- every processor yields its output paired with a priority
     runStream $ concatPriorityStreams so (zipWith runProcessor ps inputs)

-- | Launches the specified processors in parallel using priorities for
-- consuming the input.
processorPrioritisingInputParallel :: (PriorityQueueStrategy si qi pi,
                                       EnqueueStrategy so qo)
                                      => si
                                      -- ^ the strategy applied for enqueuing the input data
                                      -> so
                                      -- ^ the strategy applied for enqueuing the output data
                                      -> [(Stream pi, Processor a b)]
                                      -- ^ the streams of input priorities and the processors
                                      -- to parallelize
                                      -> Processor a b
                                      -- ^ the parallelized processor
processorPrioritisingInputParallel si so ps =
  Processor $ \xs ->
  Cons $
  do -- the priority streams drive how the input is split
     inputs <- liftSimulation $ splitStreamPrioritising si (map fst ps) xs
     -- run every processor on its own share of the input
     -- and combine the outputs
     runStream $
       concatQueuedStreams so (zipWith (runProcessor . snd) ps inputs)

-- | Launches the specified processors in parallel using priorities for consuming
-- the input and combining the output.
processorPrioritisingInputOutputParallel :: (PriorityQueueStrategy si qi pi,
                                             PriorityQueueStrategy so qo po)
                                            => si
                                            -- ^ the strategy applied for enqueuing the input data
                                            -> so
                                            -- ^ the strategy applied for enqueuing the output data
                                            -> [(Stream pi, Processor a (po, b))]
                                            -- ^ the streams of input priorities and the processors
                                            -- to parallelize
                                            -> Processor a b
                                            -- ^ the parallelized processor
processorPrioritisingInputOutputParallel si so ps =
  Processor $ \xs ->
  Cons $
  do -- the priority streams drive how the input is split
     inputs <- liftSimulation $ splitStreamPrioritising si (map fst ps) xs
     -- every processor yields its output paired with a priority
     runStream $
       concatPriorityStreams so (zipWith (runProcessor . snd) ps inputs)

-- | Launches the processors in parallel consuming the same input stream and producing
-- a combined output stream. This version applies the 'FCFS' strategy both for input
-- and output, which suits most use cases.
processorParallel :: [Processor a b] -> Processor a b
processorParallel = processorQueuedParallel FCFS FCFS

-- | Create a buffer processor, where the process from the first argument
-- consumes the input stream, while the stream passed in as the second argument,
-- usually produced by some other process, is returned as the output.
-- This kind of processor is very useful for modeling queues.
bufferProcessor :: (Stream a -> Process ())
                   -- ^ a separate process to consume the input
                   -> Stream b
                   -- ^ the resulting stream of data
                   -> Processor a b
bufferProcessor consume output =
  Processor $ \xs ->
  Cons $
  -- start the consumer as a separate process, then serve the output
  spawnProcess CancelTogether (consume xs) >> runStream output

-- | Like 'bufferProcessor' but allows creating a loop when some items
-- can be returned for processing them again. It is very useful for
-- modeling processors with queues and loop-backs.
bufferProcessorLoop :: (Stream a -> Stream c -> Process ())
                       -- ^ consume two streams: the input values of type @a@
                       -- and the values of type @c@ returned by the loop
                       -> Stream d
                       -- ^ the stream of data that may become results
                       -> Processor d (Either c b)
                       -- ^ process and then decide what values of type @c@
                       -- should be processed again
                       -> Processor a b
bufferProcessorLoop consume preoutput decide =
  Processor $ \xs ->
  Cons $
  do -- 'Left' values go back into the loop, 'Right' values are results
     (looped, results) <-
       liftSimulation $
       partitionEitherStream (runProcessor decide preoutput)
     -- the consumer receives both the external input and the loop-back
     spawnProcess CancelTogether $ consume xs looped
     runStream results

-- | Return a processor with the help of which we can model a queue.
--
-- Although the function doesn't refer to the queue directly, its main use case
-- is exactly the processing of a queue. The first argument should be the enqueueing
-- operation, while the second argument should be the opposite dequeueing operation.
--
-- The reason is as follows. There are many possible combinations of how queues
-- can be modeled. There is no sense in enumerating all of them by creating a separate
-- function for each case. We can just use combinators to define exactly what we need.
--
-- So, the queue can lose the input items if the queue is full, or the input process
-- can suspend while the queue is full, or we can use priorities for enqueueing,
-- storing and dequeueing the items in different combinations. There are so many use
-- cases!
--
-- There is a hope that this function along with other similar functions from this
-- module is sufficient to cover the most important cases. Even if it is not sufficient
-- then you can use the more generic function 'bufferProcessor' which this function is
-- based on. In case of need, you can even write your own function from scratch. It is
-- quite easy actually.
queueProcessor :: (a -> Process ())
                  -- ^ enqueue the input item and wait
                  -- while the queue is full if required
                  -- so that there are no hanging items
                  -> Process b
                  -- ^ dequeue an output item
                  -> Processor a b
                  -- ^ the buffering processor
queueProcessor enqueue dequeue = bufferProcessor consumer producer
  where
    -- enqueue every item of the input stream
    consumer = consumeStream enqueue
    -- the output stream is the dequeueing operation repeated
    producer = repeatProcess dequeue

-- | Like 'queueProcessor' creates a queue processor but allows creating
-- a loop when some items can be returned and added to the queue again.
-- Also it allows specifying how two input streams of data can be merged.
queueProcessorLoopMerging :: (Stream a -> Stream d -> Stream e)
                             -- ^ merge two streams: the input values of type @a@
                             -- and the values of type @d@ returned by the loop
                             -> (e -> Process ())
                             -- ^ enqueue the input item and wait
                             -- while the queue is full if required
                             -- so that there are no hanging items
                             -> Process c
                             -- ^ dequeue an item for the further processing
                             -> Processor c (Either d b)
                             -- ^ process and then decide what values of type @d@
                             -- should be processed again
                             -> Processor a b
                             -- ^ the buffering processor
queueProcessorLoopMerging merge enqueue dequeue =
  bufferProcessorLoop enqueueMerged (repeatProcess dequeue)
  where
    -- merge the external input with the loop-back and enqueue the result
    enqueueMerged inputs looped = consumeStream enqueue (merge inputs looped)

-- | Like 'queueProcessorLoopMerging' creates a queue processor and allows
-- creating a loop when some items can be returned and added to the queue again.
-- Only it sequentially merges two input streams of data: one stream
-- that comes from the external source and another stream of data returned
-- by the loop. The first stream has a priority over the second one.
queueProcessorLoopSeq :: (a -> Process ())
                         -- ^ enqueue the input item and wait
                         -- while the queue is full if required
                         -- so that there are no hanging items
                         -> Process c
                         -- ^ dequeue an item for the further processing
                         -> Processor c (Either a b)
                         -- ^ process and then decide what values of type @a@
                         -- should be processed again
                         -> Processor a b
                         -- ^ the buffering processor
queueProcessorLoopSeq =
  -- the two streams are combined with 'mergeStreams'
  queueProcessorLoopMerging mergeStreams

-- | Like 'queueProcessorLoopMerging' creates a queue processor and allows
-- creating a loop when some items can be returned and added to the queue again.
-- Only it runs two simultaneous processes to enqueue the input streams of data:
-- one stream that comes from the external source and another stream of data returned
-- by the loop.
queueProcessorLoopParallel :: (a -> Process ())
                              -- ^ enqueue the input item and wait
                              -- while the queue is full if required
                              -- so that there are no hanging items
                              -> Process c
                              -- ^ dequeue an item for the further processing
                              -> Processor c (Either a b)
                              -- ^ process and then decide what values of type @a@
                              -- should be processed again
                              -> Processor a b
                              -- ^ the buffering processor
queueProcessorLoopParallel enqueue dequeue =
  bufferProcessorLoop enqueueBoth (repeatProcess dequeue)
  where
    -- spawn a separate process that enqueues every item of the stream
    spawnEnqueuing stream =
      spawnProcess CancelTogether (consumeStream enqueue stream)
    -- the external input first, then the loop-back, each in its own process
    enqueueBoth inputs looped =
      spawnEnqueuing inputs >> spawnEnqueuing looped