Safe Haskell | None |
---|---|
Language | Haskell2010 |
Synopsis
- data Queue d :: Effect
- data QueueResult d
- resultToMaybe :: QueueResult d -> Maybe d
- interpretQueueTBM :: forall d r. Members [Resource, Race, Embed IO] r => Int -> InterpreterFor (Queue d) r
- interpretQueueTB :: forall d r. Members [Race, Embed IO] r => Natural -> InterpreterFor (Queue d) r
- interpretQueueListReadOnlyAtomic :: forall d r. Member (Embed IO) r => [d] -> InterpreterFor (Queue d) r
- interpretQueueListReadOnlyAtomicWith :: forall d r. Member (AtomicState [d]) r => InterpreterFor (Queue d) r
- interpretQueueListReadOnlyState :: forall d r. Member (Embed IO) r => [d] -> InterpreterFor (Queue d) r
- interpretQueueListReadOnlyStateWith :: forall d r. Member (State [d]) r => InterpreterFor (Queue d) r
- loop :: Member (Queue d) r => (d -> Sem r ()) -> Sem r ()
- loopOr :: Member (Queue d) r => Sem r Bool -> (d -> Sem r Bool) -> Sem r ()
- data Sync d :: Effect
- type ScopedSync res a = Scoped (SyncResources res) (Sync a)
- interpretSync :: forall d r. Members [Race, Embed IO] r => InterpreterFor (Sync d) r
- interpretSyncAs :: forall d r. Members [Race, Embed IO] r => d -> InterpreterFor (Sync d) r
- withSync :: forall d res r. Member (Scoped (SyncResources res) (Sync d)) r => InterpreterFor (Sync d) r
- interpretScopedSync :: forall d r. Members [Resource, Race, Embed IO] r => InterpreterFor (Scoped (SyncResources (MVar d)) (Sync d)) r
- interpretScopedSyncAs :: forall d r. Members [Resource, Race, Embed IO] r => d -> InterpreterFor (Scoped (SyncResources (MVar d)) (Sync d)) r
- data Race :: Effect
- race :: forall a b r. Member Race r => Sem r a -> Sem r b -> Sem r (Either a b)
- race_ :: Member Race r => Sem r a -> Sem r a -> Sem r a
- timeout :: forall a b u r. TimeUnit u => Member Race r => Sem r a -> u -> Sem r b -> Sem r (Either a b)
- timeout_ :: TimeUnit u => Member Race r => Sem r a -> u -> Sem r a -> Sem r a
- timeoutAs :: TimeUnit u => Member Race r => a -> u -> Sem r b -> Sem r (Either a b)
- timeoutAs_ :: TimeUnit u => Member Race r => a -> u -> Sem r a -> Sem r a
- timeoutU :: TimeUnit u => Member Race r => u -> Sem r () -> Sem r ()
- timeoutMaybe :: TimeUnit u => Member Race r => u -> Sem r a -> Sem r (Maybe a)
- retrying :: forall e w u t d r a. TimeUnit w => TimeUnit u => Members [Race, Time t d] r => w -> u -> Sem r (Either e a) -> Sem r (Maybe a)
- retryingWithError :: forall e w u t d r a. TimeUnit w => TimeUnit u => Members [Race, Time t d, Embed IO] r => w -> u -> Sem r (Either e a) -> Sem r (Maybe (Either e a))
- interpretRace :: Member (Final IO) r => InterpreterFor Race r
- data Interrupt :: Effect
- interpretInterrupt :: Members [Critical, Race, Async, Embed IO] r => InterpreterFor Interrupt r
- data Events (token :: Type) (e :: Type) :: Effect
- publish :: forall token e r. MemberWithError (Events token e) r => e -> Sem r ()
- consume :: forall e r. MemberWithError (Consume e) r => Sem r e
- subscribe :: forall e token r. Member (Scoped (EventToken token) (Consume e)) r => InterpreterFor (Consume e) r
- subscribeWhile :: forall e token r. Member (EventConsumer token e) r => (e -> Sem r Bool) -> Sem r ()
- subscribeLoop :: forall e token r. Member (EventConsumer token e) r => (e -> Sem r ()) -> Sem r ()
- data EventToken token
- type EventChan e = EventToken (OutChan e)
- type ChanEvents e = Events (OutChan e) e
- type EventConsumer token e = Scoped (EventToken token) (Consume e)
- type ChanConsumer e = Scoped (EventChan e) (Consume e)
- interpretEventsChan :: forall e r. Members [Resource, Race, Async, Embed IO] r => InterpretersFor [Events (OutChan e) e, ChanConsumer e] r
- data Critical :: Effect
- interpretCritical :: Member (Final IO) r => InterpreterFor Critical r
- interpretCriticalNull :: InterpreterFor Critical r
- interpretAtomic :: forall a r. Member (Embed IO) r => a -> InterpreterFor (AtomicState a) r
- withAsyncBlock :: Members [Resource, Async] r => Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a
- withAsync :: Members [Resource, Race, Async] r => Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a
- withAsync_ :: Members [Resource, Race, Async] r => Sem r b -> Sem r a -> Sem r a
Introduction
This library provides an assortment of tools for concurrency-related tasks: queues, MVars, racing, signal handling, event channels and exception handling.
Queues
data Queue d :: Effect Source #
Abstracts queues like TBQueue
.
For documentation on the constructors, see the module Polysemy.Conc.Effect.Queue.
import Polysemy.Conc (Queue, QueueResult)
import Polysemy.Conc.Effect.Queue as Queue

prog :: Member (Queue Int) r => Sem r (QueueResult Int)
prog = do
  Queue.write 5
  Queue.write 10
  Queue.read >>= \case
    QueueResult.Success i -> fmap (i +) <$> Queue.read
    r -> pure r
Instances
type DefiningModule Queue Source # | |
Defined in Polysemy.Conc.Effect.Queue |
data QueueResult d Source #
Encodes failure reasons for queues.
For documentation on the constructors, see the module Polysemy.Conc.Data.QueueResult.
import qualified Polysemy.Conc.Data.QueueResult as QueueResult
Instances
resultToMaybe :: QueueResult d -> Maybe d Source #
Turn a Success
into Just
.
Interpreters
interpretQueueListReadOnlyAtomic :: forall d r. Member (Embed IO) r => [d] -> InterpreterFor (Queue d) r Source #
Variant of interpretQueueListReadOnlyAtomicWith
that interprets the AtomicState
.
interpretQueueListReadOnlyAtomicWith :: forall d r. Member (AtomicState [d]) r => InterpreterFor (Queue d) r Source #
Reinterpret Queue
as AtomicState
with a list that cannot be written to.
Useful for testing.
interpretQueueListReadOnlyState :: forall d r. Member (Embed IO) r => [d] -> InterpreterFor (Queue d) r Source #
Variant of interpretQueueListReadOnlyStateWith that interprets the State.
interpretQueueListReadOnlyStateWith :: forall d r. Member (State [d]) r => InterpreterFor (Queue d) r Source #
Combinators
loop :: Member (Queue d) r => (d -> Sem r ()) -> Sem r () Source #
Read from a Queue
repeatedly until it is closed.
When an element is received, call action
and recurse.
MVars
data Sync d :: Effect Source #
Abstracts an MVar
.
For documentation on the constructors, see the module Polysemy.Conc.Effect.Sync.
import Polysemy.Conc (Sync)
import qualified Polysemy.Conc.Effect.Sync as Sync

prog :: Member (Sync Int) r => Sem r Int
prog = do
  Sync.putTry 5
  Sync.takeBlock
Instances
type DefiningModule Sync Source # | |
Defined in Polysemy.Conc.Effect.Sync |
type ScopedSync res a = Scoped (SyncResources res) (Sync a) Source #
Interpreters
interpretSync :: forall d r. Members [Race, Embed IO] r => InterpreterFor (Sync d) r Source #
interpretSyncAs :: forall d r. Members [Race, Embed IO] r => d -> InterpreterFor (Sync d) r Source #
withSync :: forall d res r. Member (Scoped (SyncResources res) (Sync d)) r => InterpreterFor (Sync d) r Source #
Run an action with a locally scoped Sync
variable.
interpretScopedSync :: forall d r. Members [Resource, Race, Embed IO] r => InterpreterFor (Scoped (SyncResources (MVar d)) (Sync d)) r Source #
interpretScopedSyncAs :: forall d r. Members [Resource, Race, Embed IO] r => d -> InterpreterFor (Scoped (SyncResources (MVar d)) (Sync d)) r Source #
Racing
prog =
  Polysemy.Conc.race (httpRequest "hackage.haskell.org") (readFile "/path/to/file") >>= \case
    Left _ -> putStrLn "hackage was faster"
    Right _ -> putStrLn "file was faster"
When the first thunk finishes, the other will be killed.
Abstract the concept of running two programs concurrently, aborting the other when one terminates.
Timeout
is a simpler variant, where one thread just sleeps for a given interval.
Instances
type DefiningModule Race Source # | |
Defined in Polysemy.Conc.Effect.Race |
race :: forall a b r. Member Race r => Sem r a -> Sem r b -> Sem r (Either a b) Source #
Run both programs concurrently, returning the result of the faster one.
timeout :: forall a b u r. TimeUnit u => Member Race r => Sem r a -> u -> Sem r b -> Sem r (Either a b) Source #
Run the fallback action if the given program doesn't finish within the specified interval.
timeoutAs :: TimeUnit u => Member Race r => a -> u -> Sem r b -> Sem r (Either a b) Source #
Version of timeout
that takes a pure fallback value.
timeoutU :: TimeUnit u => Member Race r => u -> Sem r () -> Sem r () Source #
Specialization of timeout
for unit actions.
timeoutMaybe :: TimeUnit u => Member Race r => u -> Sem r a -> Sem r (Maybe a) Source #
Variant of timeout
that returns Maybe
.
:: forall e w u t d r a. TimeUnit w | |
=> TimeUnit u | |
=> Members [Race, Time t d] r | |
=> w | The timeout after which the attempt is abandoned. |
-> u | The waiting interval between two tries. |
-> Sem r (Either e a) | |
-> Sem r (Maybe a) |
Run an action repeatedly until it returns Right or the timeout has been exceeded.
:: forall e w u t d r a. TimeUnit w | |
=> TimeUnit u | |
=> Members [Race, Time t d, Embed IO] r | |
=> w | The timeout after which the attempt is abandoned. |
-> u | The waiting interval between two tries. |
-> Sem r (Either e a) | |
-> Sem r (Maybe (Either e a)) |
Run an action repeatedly until it returns Right or the timeout has been exceeded.
If the action failed at least once, the last error will be returned in case of timeout.
Interpreters
interpretRace :: Member (Final IO) r => InterpreterFor Race r Source #
Signal Handling
data Interrupt :: Effect Source #
The interrupt handler effect allows three kinds of interaction for interrupt signals:
- Execute a callback when a signal is received
- Block a thread until a signal is received
- Kill a thread when a signal is received
For documentation on the constructors, see the module Polysemy.Conc.Effect.Interrupt.
import qualified Polysemy.Conc.Effect.Interrupt as Interrupt

prog = do
  Interrupt.register "task 1" (putStrLn "interrupted")
  Interrupt.killOnQuit $ forever do
    doSomeWork
Instances
type DefiningModule Interrupt Source # | |
Defined in Polysemy.Conc.Effect.Interrupt |
Interpreters
interpretInterrupt :: Members [Critical, Race, Async, Embed IO] r => InterpreterFor Interrupt r Source #
Interpret Interrupt
by installing a signal handler.
Event Channels
data Events (token :: Type) (e :: Type) :: Effect Source #
An event publisher that can be consumed from multiple threads.
Instances
type DefiningModule Events Source # | |
Defined in Polysemy.Conc.Effect.Events |
subscribe :: forall e token r. Member (Scoped (EventToken token) (Consume e)) r => InterpreterFor (Consume e) r Source #
Create a new scope for Events
, causing the nested program to get its own copy of the event stream.
To be used with interpretEventsChan
.
subscribeWhile :: forall e token r. Member (EventConsumer token e) r => (e -> Sem r Bool) -> Sem r () Source #
Pull repeatedly from the Events
channel, passing the event to the supplied callback.
Stop when the action returns False
.
subscribeLoop :: forall e token r. Member (EventConsumer token e) r => (e -> Sem r ()) -> Sem r () Source #
Pull repeatedly from the Events
channel, passing the event to the supplied callback.
data EventToken token Source #
Instances
Eq token => Eq (EventToken token) Source # | |
Defined in Polysemy.Conc.Effect.Events (==) :: EventToken token -> EventToken token -> Bool # (/=) :: EventToken token -> EventToken token -> Bool # | |
Show token => Show (EventToken token) Source # | |
Defined in Polysemy.Conc.Effect.Events showsPrec :: Int -> EventToken token -> ShowS # show :: EventToken token -> String # showList :: [EventToken token] -> ShowS # | |
Generic (EventToken token) Source # | |
Defined in Polysemy.Conc.Effect.Events type Rep (EventToken token) :: Type -> Type # from :: EventToken token -> Rep (EventToken token) x # to :: Rep (EventToken token) x -> EventToken token # | |
type Rep (EventToken token) Source # | |
Defined in Polysemy.Conc.Effect.Events type Rep (EventToken token) = D1 ('MetaData "EventToken" "Polysemy.Conc.Effect.Events" "polysemy-conc-0.3.0.0-inplace" 'True) (C1 ('MetaCons "EventToken" 'PrefixI 'True) (S1 ('MetaSel ('Just "unEventToken") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 token))) |
type EventChan e = EventToken (OutChan e) Source #
Convenience alias for the default EventToken
that uses an OutChan
.
type ChanEvents e = Events (OutChan e) e Source #
type EventConsumer token e = Scoped (EventToken token) (Consume e) Source #
Convenience alias for the consumer effect.
type ChanConsumer e = Scoped (EventChan e) (Consume e) Source #
Convenience alias for the consumer effect using the default implementation.
Interpreters
interpretEventsChan :: forall e r. Members [Resource, Race, Async, Embed IO] r => InterpretersFor [Events (OutChan e) e, ChanConsumer e] r Source #
Interpret Events
and Consume
together by connecting them to the two ends of an unagi channel.
Consume
is only interpreted in a Scoped
manner, ensuring that a new duplicate of the channel is created so that
all consumers see all events (from the moment they are connected).
This should be used in conjunction with subscribe
:
interpretEventsChan do
  async $ subscribe do
    putStrLn =<< consume
  publish "hello"
Whenever subscribe
creates a new scope, this interpreter calls dupChan
and passes the
duplicate to interpretConsumeChan
.
Exceptions
data Critical :: Effect Source #
An effect that catches exceptions.
Provides the exact functionality of fromExceptionSem
, but pushes the dependency on Final IO
to the
interpreter, and makes it optional.
Instances
type DefiningModule Critical Source # | |
Defined in Polysemy.Conc.Effect.Critical |
Interpreters
interpretCritical :: Member (Final IO) r => InterpreterFor Critical r Source #
interpretCriticalNull :: InterpreterFor Critical r Source #
Interpret Critical
by doing nothing.
Other Combinators
interpretAtomic :: forall a r. Member (Embed IO) r => a -> InterpreterFor (AtomicState a) r Source #
Convenience wrapper around runAtomicStateTVar
that creates a new TVar
.
withAsyncBlock :: Members [Resource, Async] r => Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a Source #
Run the first action asynchronously while the second action executes, then cancel the first action. Passes the handle into the action to allow it to await its result.
When cancelling, this variant will wait indefinitely for the thread to be gone.
withAsync :: Members [Resource, Race, Async] r => Sem r b -> (Async (Maybe b) -> Sem r a) -> Sem r a Source #
Run the first action asynchronously while the second action executes, then cancel the first action. Passes the handle into the action to allow it to await its result.
When cancelling, this variant will wait for 500ms for the thread to be gone.
withAsync_ :: Members [Resource, Race, Async] r => Sem r b -> Sem r a -> Sem r a Source #
Run the first action asynchronously while the second action executes, then cancel the first action. Discards the handle, expecting the async action to either terminate or be cancelled.
When cancelling, this variant will wait for 500ms for the thread to be gone.