Copyright | Copyright (c) 2009-2013, David Sorokin <david.sorokin@gmail.com> |
---|---|
License | BSD3 |
Maintainer | David Sorokin <david.sorokin@gmail.com> |
Stability | experimental |
Safe Haskell | Safe-Inferred |
Language | Haskell98 |
Tested with: GHC 7.6.3
The processor of simulation data.
- newtype Processor a b = Processor { runProcessor :: Stream a -> Stream b }
- simpleProcessor :: (a -> Process b) -> Processor a b
- statefulProcessor :: s -> ((s, a) -> Process (s, b)) -> Processor a b
- processorUsingId :: ProcessId -> Processor a b -> Processor a b
- bufferProcessor :: (Stream a -> Process ()) -> Stream b -> Processor a b
- bufferProcessorLoop :: (Stream a -> Stream c -> Process ()) -> Stream d -> Processor d (Either c b) -> Processor a b
- queueProcessor :: (a -> Process ()) -> Process b -> Processor a b
- queueProcessorLoopMerging :: (Stream a -> Stream d -> Stream e) -> (e -> Process ()) -> Process c -> Processor c (Either d b) -> Processor a b
- queueProcessorLoopSeq :: (a -> Process ()) -> Process c -> Processor c (Either a b) -> Processor a b
- queueProcessorLoopParallel :: (a -> Process ()) -> Process c -> Processor c (Either a b) -> Processor a b
- processorParallel :: [Processor a b] -> Processor a b
- processorQueuedParallel :: (EnqueueStrategy si qi, EnqueueStrategy so qo) => si -> so -> [Processor a b] -> Processor a b
- processorPrioritisingOutputParallel :: (EnqueueStrategy si qi, PriorityQueueStrategy so qo po) => si -> so -> [Processor a (po, b)] -> Processor a b
- processorPrioritisingInputParallel :: (PriorityQueueStrategy si qi pi, EnqueueStrategy so qo) => si -> so -> [(Stream pi, Processor a b)] -> Processor a b
- processorPrioritisingInputOutputParallel :: (PriorityQueueStrategy si qi pi, PriorityQueueStrategy so qo po) => si -> so -> [(Stream pi, Processor a (po, b))] -> Processor a b
Processor Type
Represents a processor of simulation data.
Processor | |
runProcessor :: Stream a -> Stream b | |
Creating Simple Processor
simpleProcessor :: (a -> Process b) -> Processor a b
Create a simple processor from the specified handling function, which runs a discontinuous process on each input value to produce the corresponding output.
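The following is a minimal sketch of the idea, not the library's implementation. It assumes a toy model in which IO stands in for Process and a stream is an endless chain of actions. The helpers streamFrom and takeStream are hypothetical names introduced only for the demonstration.

```haskell
-- Toy stand-ins (assumptions): IO plays the role of Process, and a
-- stream is an endless chain of actions yielding a value and the rest.
newtype Stream a = Cons { runStream :: IO (a, Stream a) }

newtype Processor a b = Processor { runProcessor :: Stream a -> Stream b }

-- Apply an effectful handler to every item of a stream.
mapStreamM :: (a -> IO b) -> Stream a -> Stream b
mapStreamM f (Cons s) = Cons $ do
  (a, rest) <- s
  b <- f a
  return (b, mapStreamM f rest)

-- Toy counterpart of simpleProcessor: run the handler once per input.
simpleProcessor :: (a -> IO b) -> Processor a b
simpleProcessor = Processor . mapStreamM

-- Hypothetical helper: the endless stream n, n + 1, n + 2, ...
streamFrom :: Int -> Stream Int
streamFrom n = Cons $ return (n, streamFrom (n + 1))

-- Hypothetical helper: collect the first n items of a stream.
takeStream :: Int -> Stream a -> IO [a]
takeStream n (Cons s)
  | n <= 0    = return []
  | otherwise = do
      (a, rest) <- s
      (a :) <$> takeStream (n - 1) rest

-- Double each of the first three inputs 1, 2, 3.
demo :: IO [Int]
demo = takeStream 3 (runProcessor (simpleProcessor (return . (* 2))) (streamFrom 1))

main :: IO ()
main = demo >>= print  -- prints [2,4,6]
```

In the real module the handler runs in the Process computation, so it may hold the simulation process for some time before yielding its output.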
statefulProcessor :: s -> ((s, a) -> Process (s, b)) -> Processor a b
Like simpleProcessor, but creates a processor that has a state: every new iteration receives the current state and returns the updated one together with the output.
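As a rough illustration of how the state can be threaded through the iterations, here is a sketch under the same toy assumptions (IO in place of Process; streamFrom and takeStream are hypothetical helpers). It is not taken from the library source.

```haskell
-- Toy stand-ins (assumptions): IO plays the role of Process.
newtype Stream a = Cons { runStream :: IO (a, Stream a) }

newtype Processor a b = Processor { runProcessor :: Stream a -> Stream b }

-- Toy counterpart of statefulProcessor: the handler receives the
-- current state with each input and returns the next state with the output.
statefulProcessor :: s -> ((s, a) -> IO (s, b)) -> Processor a b
statefulProcessor s0 f = Processor (go s0)
  where
    go s (Cons xs) = Cons $ do
      (a, rest) <- xs
      (s', b) <- f (s, a)
      return (b, go s' rest)

-- Hypothetical helper: the endless stream n, n + 1, n + 2, ...
streamFrom :: Int -> Stream Int
streamFrom n = Cons $ return (n, streamFrom (n + 1))

-- Hypothetical helper: collect the first n items of a stream.
takeStream :: Int -> Stream a -> IO [a]
takeStream n (Cons s)
  | n <= 0    = return []
  | otherwise = do
      (a, rest) <- s
      (a :) <$> takeStream (n - 1) rest

-- A running sum: the state is the total seen so far.
runningSum :: Processor Int Int
runningSum = statefulProcessor 0 $ \(acc, x) ->
  let acc' = acc + x in return (acc', acc')

demo :: IO [Int]
demo = takeStream 4 (runProcessor runningSum (streamFrom 1))

main :: IO ()
main = demo >>= print  -- prints [1,3,6,10]
```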
Specifying Identifier
processorUsingId :: ProcessId -> Processor a b -> Processor a b
Create a processor that will use the specified process identifier. This can be useful for referring to the underlying Process computation, which can be passivated, interrupted, canceled and so on. See also the processUsingId function for more details.
Buffer Processor
bufferProcessor
:: (Stream a -> Process ()) | a separate process to consume the input |
-> Stream b | the resulting stream of data |
-> Processor a b |
Create a buffer processor. The process given as the first argument consumes the input stream, while the stream given as the second argument, usually produced by some other process, is returned as the output. This kind of processor is very useful for modeling queues.
bufferProcessorLoop
:: (Stream a -> Stream c -> Process ()) | consume two streams: the input values of type a and the values of type c returned by the loop |
-> Stream d | the stream of data that may become results |
-> Processor d (Either c b) | process and then decide which values of type c should be processed again |
-> Processor a b |
Like bufferProcessor, but allows creating a loop in which some items can be returned for processing again. It is very useful for modeling processors with queues and loop-backs.
Processing Queues
queueProcessor
:: (a -> Process ()) | enqueue the input item, waiting while the queue is full if required, so that no items are left hanging |
-> Process b | dequeue an output item |
-> Processor a b | the buffering processor |
Return a processor with which we can model a queue.
Although the function doesn't refer to a queue directly, its main use case is precisely queue processing. The first argument should be the enqueueing operation, while the second argument should be the opposite dequeueing operation.
The reason is as follows. There are many possible ways to model queues, and there is no sense in enumerating them all with a separate function for each case. We can just use combinators to define exactly what we need.
For example, the queue can lose input items when it is full, or the input process can suspend while the queue is full, or we can use priorities for enqueueing, storing and dequeueing the items in different combinations. There are so many use cases!
The hope is that this function, along with the other similar functions from this module, is sufficient to cover the most important cases. If it is not, you can use the more generic function bufferProcessor, on which this function is based. If need be, you can even write your own function from scratch; it is actually quite easy.
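To make the relationship between the two functions concrete, here is a hedged sketch in a toy model: IO stands in for Process, a forked thread stands in for the separate consuming process, and a single MVar plays a one-slot queue whose putMVar blocks while the slot is full. The helpers consumeStream, repeatProcess, streamFrom and takeStream are written out here for the toy model; whether the library composes them in exactly this way is an assumption.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Toy stand-ins (assumptions): IO plays the role of Process.
newtype Stream a = Cons { runStream :: IO (a, Stream a) }

newtype Processor a b = Processor { runProcessor :: Stream a -> Stream b }

-- Feed every item of a stream to a consumer, one after another.
consumeStream :: (a -> IO ()) -> Stream a -> IO ()
consumeStream f (Cons s) = do
  (a, rest) <- s
  f a
  consumeStream f rest

-- Build a stream by repeating one action.
repeatProcess :: IO a -> Stream a
repeatProcess m = Cons $ do
  a <- m
  return (a, repeatProcess m)

-- Toy bufferProcessor: start the consumer in its own thread when the
-- output is first demanded, then serve the independent output stream.
bufferProcessor :: (Stream a -> IO ()) -> Stream b -> Processor a b
bufferProcessor consume output = Processor $ \xs -> Cons $ do
  _ <- forkIO (consume xs)
  runStream output

-- Toy queueProcessor: enqueue every input, dequeue every output.
queueProcessor :: (a -> IO ()) -> IO b -> Processor a b
queueProcessor enqueue dequeue =
  bufferProcessor (consumeStream enqueue) (repeatProcess dequeue)

-- Hypothetical helper: the endless stream n, n + 1, n + 2, ...
streamFrom :: Int -> Stream Int
streamFrom n = Cons $ return (n, streamFrom (n + 1))

-- Hypothetical helper: collect the first n items of a stream.
takeStream :: Int -> Stream a -> IO [a]
takeStream n (Cons s)
  | n <= 0    = return []
  | otherwise = do
      (a, rest) <- s
      (a :) <$> takeStream (n - 1) rest

-- A one-slot queue: the producer waits in putMVar while the slot is full.
demo :: IO [Int]
demo = do
  slot <- newEmptyMVar
  let queue = queueProcessor (putMVar slot) (takeMVar slot)
  takeStream 4 (runProcessor queue (streamFrom 1))

main :: IO ()
main = demo >>= print  -- prints [1,2,3,4]
```

Swapping the MVar for another structure changes the queue discipline without touching queueProcessor itself, which is the point of passing the enqueue and dequeue operations as arguments.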
queueProcessorLoopMerging
:: (Stream a -> Stream d -> Stream e) | merge two streams: the input values of type a and the values of type d returned by the loop |
-> (e -> Process ()) | enqueue the input item, waiting while the queue is full if required, so that no items are left hanging |
-> Process c | dequeue an item for further processing |
-> Processor c (Either d b) | process and then decide which values of type d should be processed again |
-> Processor a b | the buffering processor |
Like queueProcessor, creates a queue processor, but allows creating a loop in which some items can be returned and added to the queue again. It also allows specifying how the two input streams of data are merged.
queueProcessorLoopSeq
:: (a -> Process ()) | enqueue the input item, waiting while the queue is full if required, so that no items are left hanging |
-> Process c | dequeue an item for further processing |
-> Processor c (Either a b) | process and then decide which values of type a should be processed again |
-> Processor a b | the buffering processor |
Like queueProcessorLoopMerging, creates a queue processor and allows creating a loop in which some items can be returned and added to the queue again. However, it merges the two input streams of data sequentially: one stream that comes from the external source and another stream of data returned by the loop. The first stream has priority over the second one.
queueProcessorLoopParallel
:: (a -> Process ()) | enqueue the input item, waiting while the queue is full if required, so that no items are left hanging |
-> Process c | dequeue an item for further processing |
-> Processor c (Either a b) | process and then decide which values of type a should be processed again |
-> Processor a b | the buffering processor |
Like queueProcessorLoopMerging, creates a queue processor and allows creating a loop in which some items can be returned and added to the queue again. However, it runs two simultaneous processes to enqueue the input streams of data: one stream that comes from the external source and another stream of data returned by the loop.
Parallelizing Processors
processorParallel :: [Processor a b] -> Processor a b
Launches the processors in parallel, consuming the same input stream and producing a combined output stream. This version applies the FCFS strategy to both input and output, which suits most use cases.
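The sketch below illustrates the general shape of such a combinator in a toy model and is not the library's implementation. It assumes that each input item is handed to exactly one of the processors, that IO stands in for Process, and that forked threads stand in for parallel simulation processes. The name sharedView and the other helpers are hypothetical.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
  (MVar, modifyMVar, newEmptyMVar, newMVar, putMVar, takeMVar)
import Control.Monad (forM_)
import Data.List (nub)

-- Toy stand-ins (assumptions): IO plays the role of Process.
newtype Stream a = Cons { runStream :: IO (a, Stream a) }

newtype Processor a b = Processor { runProcessor :: Stream a -> Stream b }

mapStreamM :: (a -> IO b) -> Stream a -> Stream b
mapStreamM f (Cons s) = Cons $ do
  (a, rest) <- s
  b <- f a
  return (b, mapStreamM f rest)

consumeStream :: (a -> IO ()) -> Stream a -> IO ()
consumeStream f (Cons s) = do
  (a, rest) <- s
  f a
  consumeStream f rest

repeatProcess :: IO a -> Stream a
repeatProcess m = Cons $ do
  a <- m
  return (a, repeatProcess m)

-- A view of a shared stream: every pull atomically takes the next item,
-- so an item is delivered to exactly one of the competing readers.
sharedView :: MVar (Stream a) -> Stream a
sharedView ref = Cons $ do
  a <- modifyMVar ref $ \(Cons s) -> do
    (x, rest) <- s
    return (rest, x)
  return (a, sharedView ref)

-- Toy processorParallel: each processor reads from the shared input in
-- its own thread; results are emitted as they become available.
processorParallel :: [Processor a b] -> Processor a b
processorParallel ps = Processor $ \xs -> Cons $ do
  input  <- newMVar xs
  output <- newEmptyMVar
  forM_ ps $ \p ->
    forkIO (consumeStream (putMVar output) (runProcessor p (sharedView input)))
  runStream (repeatProcess (takeMVar output))

streamFrom :: Int -> Stream Int
streamFrom n = Cons $ return (n, streamFrom (n + 1))

takeStream :: Int -> Stream a -> IO [a]
takeStream n (Cons s)
  | n <= 0    = return []
  | otherwise = do
      (a, rest) <- s
      (a :) <$> takeStream (n - 1) rest

-- Two identical workers share one input; the order of results may vary.
demo :: IO [Int]
demo = do
  let worker = Processor (mapStreamM (return . (* 10)))
  takeStream 6 (runProcessor (processorParallel [worker, worker]) (streamFrom 1))

main :: IO ()
main = do
  r <- demo
  -- Six distinct results, each a multiple of ten, in scheduling order.
  print (length r, all ((== 0) . (`mod` 10)) r, nub r == r)
```

Because the threads race, the sketch checks only properties that hold for every interleaving rather than a fixed output order.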
processorQueuedParallel
:: (EnqueueStrategy si qi, EnqueueStrategy so qo) | |
=> si | the strategy applied for enqueuing the input data |
-> so | the strategy applied for enqueuing the output data |
-> [Processor a b] | the processors to parallelize |
-> Processor a b | the parallelized processor |
Launches the specified processors in parallel consuming the same input stream and producing a combined output stream.
If you don't know which enqueue strategies to apply, you will probably need FCFS for both parameters, or the function processorParallel, which does exactly that.
processorPrioritisingOutputParallel
:: (EnqueueStrategy si qi, PriorityQueueStrategy so qo po) | |
=> si | the strategy applied for enqueuing the input data |
-> so | the strategy applied for enqueuing the output data |
-> [Processor a (po, b)] | the processors to parallelize |
-> Processor a b | the parallelized processor |
Launches the specified processors in parallel using priorities for combining the output.
processorPrioritisingInputParallel
:: (PriorityQueueStrategy si qi pi, EnqueueStrategy so qo) | |
=> si | the strategy applied for enqueuing the input data |
-> so | the strategy applied for enqueuing the output data |
-> [(Stream pi, Processor a b)] | the streams of input priorities and the processors to parallelize |
-> Processor a b | the parallelized processor |
Launches the specified processors in parallel using priorities for consuming the input.
processorPrioritisingInputOutputParallel
:: (PriorityQueueStrategy si qi pi, PriorityQueueStrategy so qo po) | |
=> si | the strategy applied for enqueuing the input data |
-> so | the strategy applied for enqueuing the output data |
-> [(Stream pi, Processor a (po, b))] | the streams of input priorities and the processors to parallelize |
-> Processor a b | the parallelized processor |
Launches the specified processors in parallel using priorities for consuming the input and combining the output.