Safe Haskell | None |
Language | Haskell2010 |
Fast and robust message queues for concurrent processes.
Processes of an application can exchange messages using Message Boxes.
This library is meant to be a wrapper around a
well tested and benchmarked subset of unagi-chan
for applications using unliftio.
In addition to the basic functionality, i.e. _Message Boxes_, there is a very little bit of type level magic dust in UnliftIO.MessageBox.Command that helps to write code that sends a message and expects the receiving process to send a reply.
This module re-exports most of the library.
- class IsInput input where
- deliver :: MonadUnliftIO m => input a -> a -> m Bool
- deliver_ :: MonadUnliftIO m => input a -> a -> m ()
- class IsInput (Input box) => IsMessageBox box where
- type Input box :: Type -> Type
- receive :: MonadUnliftIO m => box a -> m (Maybe a)
- tryReceive :: MonadUnliftIO m => box a -> m (Future a)
- receiveAfter :: MonadUnliftIO m => box a -> Int -> m (Maybe a)
- newInput :: MonadUnliftIO m => box a -> m (Input box a)
- class (IsMessageBox (MessageBox argument), IsInput (Input (MessageBox argument))) => IsMessageBoxArg argument where
- type MessageBox argument :: Type -> Type
- getConfiguredMessageLimit :: argument -> Maybe Int
- newMessageBox :: MonadUnliftIO m => argument -> m (MessageBox argument a)
- handleMessage :: (MonadUnliftIO m, IsMessageBox box) => box message -> (message -> m b) -> m (Maybe b)
- data WaitingInput a = WaitingInput !Int !(BlockingInput a)
- data WaitingBox a = WaitingBox WaitingBoxLimit (BlockingBox a)
- data WaitingBoxLimit = WaitingBoxLimit !(Maybe Int) !Int !MessageLimit
- newtype NonBlockingInput a = NonBlockingInput (BlockingInput a)
- data NonBlockingBox a
- newtype NonBlockingBoxLimit = NonBlockingBoxLimit MessageLimit
- data BlockingInput a
- data BlockingBox a
- newtype BlockingBoxLimit = BlockingBoxLimit MessageLimit
- data MessageLimit
- messageLimitToInt :: MessageLimit -> Int
- data BlockingUnlimited = BlockingUnlimited
- data UnlimitedBoxInput a
- data UnlimitedBox a
- newtype CatchAllInput i a = CatchAllInput (i a)
- newtype CatchAllBox box a = CatchAllBox (box a)
- newtype CatchAllArg cfg = CatchAllArg cfg
- data AsyncReply r
- newtype DuplicateReply = DuplicateReply CallId
- data CommandError where
- data ReplyBox a
- data Message apiTag where
- Blocking :: Show (Command apiTag ('Return result)) => Command apiTag ('Return result) -> ReplyBox result -> Message apiTag
- NonBlocking :: Show (Command apiTag 'FireAndForget) => Command apiTag 'FireAndForget -> Message apiTag
- data ReturnType where
- FireAndForget :: ReturnType
- Return :: Type -> ReturnType
- data family Command apiTag :: ReturnType -> Type
- cast :: (MonadUnliftIO m, IsInput o, Show (Command apiTag 'FireAndForget)) => o (Message apiTag) -> Command apiTag 'FireAndForget -> m Bool
- call :: (HasCallIdCounter env, MonadReader env m, MonadUnliftIO m, IsInput input, Show (Command apiTag ('Return result))) => input (Message apiTag) -> Command apiTag ('Return result) -> Int -> m (Either CommandError result)
- replyTo :: MonadUnliftIO m => ReplyBox a -> a -> m ()
- delegateCall :: (MonadUnliftIO m, IsInput o, Show (Command apiTag ('Return r))) => o (Message apiTag) -> Command apiTag ('Return r) -> ReplyBox r -> m Bool
- callAsync :: (HasCallIdCounter env, MonadReader env m, MonadUnliftIO m, IsInput o, Show (Command apiTag ('Return result))) => o (Message apiTag) -> Command apiTag ('Return result) -> m (Maybe (AsyncReply result))
- waitForReply :: MonadUnliftIO m => Int -> AsyncReply result -> m (Either CommandError result)
- tryTakeReply :: MonadUnliftIO m => AsyncReply result -> m (Maybe (Either CommandError result))
- class HasCallIdCounter env where
- getCallIdCounter :: env -> CounterVar CallId
- newtype CallId = MkCallId Int
- newCallIdCounter :: MonadIO m => m (CounterVar CallId)
- takeNext :: (MonadReader env m, HasCallIdCounter env, MonadUnliftIO m) => m CallId
- class HasCounterVar a env | env -> a where
- getCounterVar :: env -> CounterVar a
- data CounterVar a
- fresh :: forall a env m. (MonadReader env m, MonadIO m, HasCounterVar a env, Coercible a Int) => m a
- incrementAndGet :: forall a m. (MonadIO m, Coercible a Int) => CounterVar a -> m a
- newCounterVar :: forall a m. MonadIO m => m (CounterVar a)
- newtype Future a = Future (IO (Maybe a))
- tryNow :: MonadUnliftIO m => Future a -> m (Maybe a)
- awaitFuture :: MonadUnliftIO m => Future b -> m b
class IsInput input where Source #
A type class for input types. A common interface for delivering messages.
Minimal complete definition
deliver :: MonadUnliftIO m => input a -> a -> m Bool Source #
Send a message. Take whatever time it takes. Depending on the implementation, this might be a non-blocking operation. Return whether the operation was successful.
NOTE: False
may sporadically be returned, especially
when there is a lot of load, so please make sure to
build your application in such a way, that it
anticipates failure.
deliver_ :: MonadUnliftIO m => input a -> a -> m () Source #
class IsInput (Input box) => IsMessageBox box where Source #
A type class for msgBox types. A common interface for receiving messages.
Minimal complete definition
receive :: MonadUnliftIO m => box a -> m (Maybe a) Source #
Receive a message. Take whatever time it takes.
Return Just
the value or Nothing
when an error occurred.
NOTE: Nothing may sporadically be returned, especially when there is a lot of load, so please make sure to build your application in such a way, that it anticipates failure.
tryReceive :: MonadUnliftIO m => box a -> m (Future a) Source #
Return a Future
that can be used to wait for the
arrival of the next message.
NOTE: Each future value represents the next slot in the queue
so one future corresponds to exactly that message (should it arrive)
and if that future value is dropped, that message will be lost!
:: MonadUnliftIO m | |
=> box a | Message box |
-> Int | Time in microseconds to wait until the action is invoked. |
-> m (Maybe a) |
Wait for an incoming message or return Nothing.
The default implementation uses tryReceive
to get a Future
on which awaitFuture
inside a timeout
is called.
Instances might override this with more performant implementations especially non-blocking Unagi channel based implementation.
NOTE: Nothing may sporadically be returned, especially when there is a lot of load, so please make sure to build your application in such a way, that it anticipates failure.
newInput :: MonadUnliftIO m => box a -> m (Input box a) Source #
Create a new input
that enqueues messages,
which are received by the box
class (IsMessageBox (MessageBox argument), IsInput (Input (MessageBox argument))) => IsMessageBoxArg argument where Source #
Types that configure and allow the creation of a MessageBox
Create IsMessageBox
instances from a parameter.
Types that determine MessageBox
For a limited message box this might be the limit of the message queue.
Associated Types
type MessageBox argument :: Type -> Type Source #
The message box that can be created from the message box argument
getConfiguredMessageLimit :: argument -> Maybe Int Source #
Return a message limit.
NOTE: This method was added for unit tests. Although the method is totally valid, it might not be super useful in production code. Also note that the naming follows the rule: Reserve short names for entities that are used often.
newMessageBox :: MonadUnliftIO m => argument -> m (MessageBox argument a) Source #
Create a new msgBox
according to the argument
This is required to receive a message.
NOTE: Only one process may receive on an msgBox.
handleMessage :: (MonadUnliftIO m, IsMessageBox box) => box message -> (message -> m b) -> m (Maybe b) Source #
Receive a message and apply a function to it.
data WaitingInput a Source #
An input for a BlockingBox
that will block
for not much more than the given timeout when
the message box is full.
WaitingInput !Int !(BlockingInput a) |
IsInput WaitingInput Source # | |
Defined in UnliftIO.MessageBox.Limited Methods deliver :: MonadUnliftIO m => WaitingInput a -> a -> m Bool Source # deliver_ :: MonadUnliftIO m => WaitingInput a -> a -> m () Source # |
data WaitingBox a Source #
A BlockingBox
and a WaitingBoxLimit
the IsMessageBox
WaitingBox WaitingBoxLimit (BlockingBox a) |
IsMessageBox WaitingBox Source # | |
Defined in UnliftIO.MessageBox.Limited Methods receive :: MonadUnliftIO m => WaitingBox a -> m (Maybe a) Source # tryReceive :: MonadUnliftIO m => WaitingBox a -> m (Future a) Source # receiveAfter :: MonadUnliftIO m => WaitingBox a -> Int -> m (Maybe a) Source # newInput :: MonadUnliftIO m => WaitingBox a -> m (Input WaitingBox a) Source # | |
type Input WaitingBox Source # | |
Defined in UnliftIO.MessageBox.Limited |
data WaitingBoxLimit Source #
An IsMessageBoxArg
instance wrapping the BlockingBox
with independently configurable timeouts for receive
and deliver
WaitingBoxLimit !(Maybe Int) !Int !MessageLimit |
newtype NonBlockingInput a Source #
A wrapper around BlockingInput
with a non-blocking IsInput
will enqueue the message or return False
if the message box already contains more messages than
its limit allows.
NonBlockingInput (BlockingInput a) |
IsInput NonBlockingInput Source # | |
Defined in UnliftIO.MessageBox.Limited Methods deliver :: MonadUnliftIO m => NonBlockingInput a -> a -> m Bool Source # deliver_ :: MonadUnliftIO m => NonBlockingInput a -> a -> m () Source # |
data NonBlockingBox a Source #
A BlockingBox
wrapper for non-blocking IsMessageBox
The difference to the BlockingBox
instance is that deliver
immediately returns if the message box limit is surpassed.
IsMessageBox NonBlockingBox Source # | |
Defined in UnliftIO.MessageBox.Limited Methods receive :: MonadUnliftIO m => NonBlockingBox a -> m (Maybe a) Source # tryReceive :: MonadUnliftIO m => NonBlockingBox a -> m (Future a) Source # receiveAfter :: MonadUnliftIO m => NonBlockingBox a -> Int -> m (Maybe a) Source # newInput :: MonadUnliftIO m => NonBlockingBox a -> m (Input NonBlockingBox a) Source # | |
type Input NonBlockingBox Source # | |
Defined in UnliftIO.MessageBox.Limited |
newtype NonBlockingBoxLimit Source #
A BlockingBoxLimit
wrapper for non-blocking IsMessageBoxArg
NonBlockingBoxLimit MessageLimit |
data BlockingInput a Source #
A message queue into which messages can be enqueued by,
e.g. tryToDeliver
Messages can be received from a BlockingBox
The Input
is the counterpart of a BlockingBox
IsInput BlockingInput Source # | A blocking instance that invokes |
Defined in UnliftIO.MessageBox.Limited Methods deliver :: MonadUnliftIO m => BlockingInput a -> a -> m Bool Source # deliver_ :: MonadUnliftIO m => BlockingInput a -> a -> m () Source # |
data BlockingBox a Source #
A message queue out of which messages can be received.
This is the counterpart of Input
. Can be used for reading
Messages can be received by receive
or tryReceive
IsMessageBox BlockingBox Source # | A blocking instance that invokes |
Defined in UnliftIO.MessageBox.Limited Methods receive :: MonadUnliftIO m => BlockingBox a -> m (Maybe a) Source # tryReceive :: MonadUnliftIO m => BlockingBox a -> m (Future a) Source # receiveAfter :: MonadUnliftIO m => BlockingBox a -> Int -> m (Maybe a) Source # newInput :: MonadUnliftIO m => BlockingBox a -> m (Input BlockingBox a) Source # | |
type Input BlockingBox Source # | |
Defined in UnliftIO.MessageBox.Limited |
newtype BlockingBoxLimit Source #
Contains the (vague) limit of messages that a BlockingBox
can buffer, i.e. that deliver
can put into a BlockingInput
of a BlockingBox
BlockingBoxLimit MessageLimit |
data MessageLimit Source #
Message Limit
The message limit must be a reasonably small positive integer that is also a power of two. This stems from the fact that Unagi is used under the hood.
The limit is a lower bound.
messageLimitToInt :: MessageLimit -> Int Source #
Convert a MessageLimit
to the Int representation.
data BlockingUnlimited Source #
The (empty) configuration for creating
UnlimitedBoxes using the IsMessageBoxArg
BlockingUnlimited |
Show BlockingUnlimited Source # | |
Defined in UnliftIO.MessageBox.Unlimited Methods showsPrec :: Int -> BlockingUnlimited -> ShowS # show :: BlockingUnlimited -> String # showList :: [BlockingUnlimited] -> ShowS # | |
IsMessageBoxArg BlockingUnlimited Source # | |
Defined in UnliftIO.MessageBox.Unlimited Associated Types type MessageBox BlockingUnlimited :: Type -> Type Source # Methods getConfiguredMessageLimit :: BlockingUnlimited -> Maybe Int Source # newMessageBox :: MonadUnliftIO m => BlockingUnlimited -> m (MessageBox BlockingUnlimited a) Source # | |
type MessageBox BlockingUnlimited Source # | |
Defined in UnliftIO.MessageBox.Unlimited |
data UnlimitedBoxInput a Source #
A message queue into which messages can be enqueued by,
e.g. deliver
Messages can be received from an UnlimitedBox
The UnlimitedBoxInput
is the counterpart of an UnlimitedBox
IsInput UnlimitedBoxInput Source # | A blocking instance that invokes |
Defined in UnliftIO.MessageBox.Unlimited Methods deliver :: MonadUnliftIO m => UnlimitedBoxInput a -> a -> m Bool Source # deliver_ :: MonadUnliftIO m => UnlimitedBoxInput a -> a -> m () Source # |
data UnlimitedBox a Source #
A message queue out of which messages can
be received.
This is the counterpart of Input
. Can be
used for reading messages.
Messages can be received by receive
or tryReceive
IsMessageBox UnlimitedBox Source # | A blocking instance that invokes |
Defined in UnliftIO.MessageBox.Unlimited Methods receive :: MonadUnliftIO m => UnlimitedBox a -> m (Maybe a) Source # tryReceive :: MonadUnliftIO m => UnlimitedBox a -> m (Future a) Source # receiveAfter :: MonadUnliftIO m => UnlimitedBox a -> Int -> m (Maybe a) Source # newInput :: MonadUnliftIO m => UnlimitedBox a -> m (Input UnlimitedBox a) Source # | |
type Input UnlimitedBox Source # | |
Defined in UnliftIO.MessageBox.Unlimited |
newtype CatchAllInput i a Source #
A wrapper around values that are instances
of IsInput
CatchAllInput (i a) |
IsInput i => IsInput (CatchAllInput i) Source # | |
Defined in UnliftIO.MessageBox.CatchAll Methods deliver :: MonadUnliftIO m => CatchAllInput i a -> a -> m Bool Source # deliver_ :: MonadUnliftIO m => CatchAllInput i a -> a -> m () Source # |
newtype CatchAllBox box a Source #
A wrapper around values that are instances
of IsMessageBox
The Input
type will be wrapped using
CatchAllBox (box a) |
IsMessageBox box => IsMessageBox (CatchAllBox box) Source # | |
Defined in UnliftIO.MessageBox.CatchAll Methods receive :: MonadUnliftIO m => CatchAllBox box a -> m (Maybe a) Source # tryReceive :: MonadUnliftIO m => CatchAllBox box a -> m (Future a) Source # receiveAfter :: MonadUnliftIO m => CatchAllBox box a -> Int -> m (Maybe a) Source # newInput :: MonadUnliftIO m => CatchAllBox box a -> m (Input (CatchAllBox box) a) Source # | |
type Input (CatchAllBox box) Source # | |
Defined in UnliftIO.MessageBox.CatchAll |
newtype CatchAllArg cfg Source #
A wrapper around values that are instances
of IsMessageBoxArg
. The factory wraps
the result of the delegated newMessageBox
invocation into a CatchAllBox
CatchAllArg cfg |
data AsyncReply r Source #
The result of callAsync
Use waitForReply
or tryTakeReply
Typeable r => Show (AsyncReply r) Source # | |
Defined in UnliftIO.MessageBox.Command Methods showsPrec :: Int -> AsyncReply r -> ShowS # show :: AsyncReply r -> String # showList :: [AsyncReply r] -> ShowS # |
newtype DuplicateReply Source #
DuplicateReply CallId |
Eq DuplicateReply Source # | |
Defined in UnliftIO.MessageBox.Command Methods (==) :: DuplicateReply -> DuplicateReply -> Bool # (/=) :: DuplicateReply -> DuplicateReply -> Bool # | |
Show DuplicateReply Source # | |
Defined in UnliftIO.MessageBox.Command Methods showsPrec :: Int -> DuplicateReply -> ShowS # show :: DuplicateReply -> String # showList :: [DuplicateReply] -> ShowS # | |
Exception DuplicateReply Source # | |
Defined in UnliftIO.MessageBox.Command Methods toException :: DuplicateReply -> SomeException # |
data CommandError where Source #
The failures that the receiver of a Return
, i.e. a Blocking
can communicate to the caller, in order to indicate that
processing a request did not or will not lead to the result the
caller is blocked waiting for.
CouldNotEnqueueCommand :: !CallId -> CommandError | Failed to enqueue a |
BlockingCommandFailure :: !CallId -> CommandError | The request has failed for reasons. |
BlockingCommandTimedOut :: !CallId -> CommandError | Timeout waiting for the result. |
Eq CommandError Source # | |
Defined in UnliftIO.MessageBox.Command | |
Show CommandError Source # | |
Defined in UnliftIO.MessageBox.Command Methods showsPrec :: Int -> CommandError -> ShowS # show :: CommandError -> String # showList :: [CommandError] -> ShowS # |
This is like Input
, it can be used
by the receiver of a Blocking
to either send a reply using reply
or to fail/abort the request using sendRequestError
data Message apiTag where Source #
A message valid for some user defined apiTag
The apiTag
tag (phantom-) type defines the
messages allowed here, declared by the instance of
for apiTag
Blocking :: Show (Command apiTag ('Return result)) => Command apiTag ('Return result) -> ReplyBox result -> Message apiTag | Wraps a Such a message can formed by using A |
NonBlocking :: Show (Command apiTag 'FireAndForget) => Command apiTag 'FireAndForget -> Message apiTag | If the The smart constructor |
data ReturnType where Source #
Indicates if a Command
requires the
receiver to send a reply or not.
FireAndForget :: ReturnType | Indicates that a Values of a |
Return :: Type -> ReturnType | Indicates that a Values of a |
data family Command apiTag :: ReturnType -> Type Source #
This family allows to encode imperative commands.
The clauses of a Command
define the commands that
a process should execute.
Every clause may specify an individual ReturnType
that declares if and what response is valid for a message.
For example:
type LampId = Int data instance Command LightControl r where GetLamps :: Command LightControl (Return [LampId]) SwitchOn :: LampId -> Command LightControl FireAndForget data LightControl -- the phantom type
The type index of the Command family is the uninhabited
The second type parameter indicates if a message requires the receiver to send a reply back to the blocked and waiting sender, or if no reply is necessary.
cast :: (MonadUnliftIO m, IsInput o, Show (Command apiTag 'FireAndForget)) => o (Message apiTag) -> Command apiTag 'FireAndForget -> m Bool Source #
Enqueue a NonBlocking
into an Input
This is just for symmetry to call
, this is
equivalent to: input -> MessageBox.tryToDeliver input . NonBlocking
call :: (HasCallIdCounter env, MonadReader env m, MonadUnliftIO m, IsInput input, Show (Command apiTag ('Return result))) => input (Message apiTag) -> Command apiTag ('Return result) -> Int -> m (Either CommandError result) Source #
Enqueue a Blocking
into an IsInput
and wait for the
If message delivery
failed, return Left
If no reply was given by the receiving process (using replyTo
) within
a given duration, return Left
Important: The given timeout starts after deliver
has returned,
if deliver
blocks and delays, call
might take longer than the
specified timeout.
The receiving process can either delegate the call using
or reply to the call by using: replyTo
replyTo :: MonadUnliftIO m => ReplyBox a -> a -> m () Source #
delegateCall :: (MonadUnliftIO m, IsInput o, Show (Command apiTag ('Return r))) => o (Message apiTag) -> Command apiTag ('Return r) -> ReplyBox r -> m Bool Source #
callAsync :: (HasCallIdCounter env, MonadReader env m, MonadUnliftIO m, IsInput o, Show (Command apiTag ('Return result))) => o (Message apiTag) -> Command apiTag ('Return result) -> m (Maybe (AsyncReply result)) Source #
:: MonadUnliftIO m | |
=> Int | The time in microseconds to wait
before returning |
-> AsyncReply result | |
-> m (Either CommandError result) |
tryTakeReply :: MonadUnliftIO m => AsyncReply result -> m (Maybe (Either CommandError result)) Source #
class HasCallIdCounter env where Source #
Class of environment records containing a CounterVar
for CallId
getCallIdCounter :: env -> CounterVar CallId Source #
HasCallIdCounter (CounterVar CallId) Source # | |
Defined in UnliftIO.MessageBox.Util.CallId Methods getCallIdCounter :: CounterVar CallId -> CounterVar CallId Source # |
An identifier value for every command sent by call
Eq CallId Source # | |
Ord CallId Source # | |
Show CallId Source # | |
HasCallIdCounter (CounterVar CallId) Source # | |
Defined in UnliftIO.MessageBox.Util.CallId Methods getCallIdCounter :: CounterVar CallId -> CounterVar CallId Source # |
newCallIdCounter :: MonadIO m => m (CounterVar CallId) Source #
Create a new CallId
takeNext :: (MonadReader env m, HasCallIdCounter env, MonadUnliftIO m) => m CallId Source #
Increment and get a new CallId
class HasCounterVar a env | env -> a where Source #
A type class for MonadReader
getCounterVar :: env -> CounterVar a Source #
HasCounterVar (t :: k) (CounterVar t) Source # | |
Defined in UnliftIO.MessageBox.Util.Fresh Methods getCounterVar :: CounterVar t -> CounterVar t Source # |
data CounterVar a Source #
An AtomicCounter
HasCounterVar (t :: k) (CounterVar t) Source # | |
Defined in UnliftIO.MessageBox.Util.Fresh Methods getCounterVar :: CounterVar t -> CounterVar t Source # | |
HasCallIdCounter (CounterVar CallId) Source # | |
Defined in UnliftIO.MessageBox.Util.CallId Methods getCallIdCounter :: CounterVar CallId -> CounterVar CallId Source # |
fresh :: forall a env m. (MonadReader env m, MonadIO m, HasCounterVar a env, Coercible a Int) => m a Source #
A threadsafe atomic a
Atomically increment and get the value of the Counter
for type a
that must be present in the env
incrementAndGet :: forall a m. (MonadIO m, Coercible a Int) => CounterVar a -> m a Source #
Atomically increment and get the value of the Counter
for type a
that must be present in the env
newCounterVar :: forall a m. MonadIO m => m (CounterVar a) Source #
Create a new CounterVar
starting at 0
A wrapper around an IO action that returns a value in the future.
awaitFuture :: MonadUnliftIO m => Future b -> m b Source #
Poll a Future until the value is present.