{-# LANGUAGE FlexibleContexts #-}
module Control.Concurrent.NQE.Publisher
( Subscriber
, PublisherMessage(..)
, Publisher
, withSubscription
, subscribe
, unsubscribe
, withPublisher
, publisher
, publisherProcess
, publish
, publishSTM
) where
import Control.Concurrent.NQE.Process
import Control.Concurrent.Unique
import Control.Monad
import Control.Monad.Reader
import Data.Function
import Data.Hashable
import Data.List
import UnliftIO
data Subscriber msg = Subscriber (Listen msg) Unique
instance Eq (Subscriber msg) where
== :: Subscriber msg -> Subscriber msg -> Bool
(==) = Unique -> Unique -> Bool
forall a. Eq a => a -> a -> Bool
(==) (Unique -> Unique -> Bool)
-> (Subscriber msg -> Unique)
-> Subscriber msg
-> Subscriber msg
-> Bool
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on` Subscriber msg -> Unique
forall {msg}. Subscriber msg -> Unique
f
where
f :: Subscriber msg -> Unique
f (Subscriber Listen msg
_ Unique
u) = Unique
u
instance Hashable (Subscriber msg) where
hashWithSalt :: Int -> Subscriber msg -> Int
hashWithSalt Int
i (Subscriber Listen msg
_ Unique
u) = Int -> Unique -> Int
forall a. Hashable a => Int -> a -> Int
hashWithSalt Int
i Unique
u
data PublisherMessage msg
= Subscribe !(Listen msg) !(Listen (Subscriber msg))
| Unsubscribe !(Subscriber msg)
| Event msg
type Publisher msg = Process (PublisherMessage msg)
publish :: MonadIO m => msg -> Publisher msg -> m ()
publish :: forall (m :: * -> *) msg. MonadIO m => msg -> Publisher msg -> m ()
publish = PublisherMessage msg -> Process (PublisherMessage msg) -> m ()
forall (m :: * -> *) (mbox :: * -> *) msg.
(MonadIO m, OutChan mbox) =>
msg -> mbox msg -> m ()
send (PublisherMessage msg -> Process (PublisherMessage msg) -> m ())
-> (msg -> PublisherMessage msg)
-> msg
-> Process (PublisherMessage msg)
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. msg -> PublisherMessage msg
forall msg. msg -> PublisherMessage msg
Event
publishSTM :: msg -> Publisher msg -> STM ()
publishSTM :: forall msg. msg -> Publisher msg -> STM ()
publishSTM = PublisherMessage msg -> Process (PublisherMessage msg) -> STM ()
forall msg. msg -> Process msg -> STM ()
forall (mbox :: * -> *) msg.
OutChan mbox =>
msg -> mbox msg -> STM ()
sendSTM (PublisherMessage msg -> Process (PublisherMessage msg) -> STM ())
-> (msg -> PublisherMessage msg)
-> msg
-> Process (PublisherMessage msg)
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. msg -> PublisherMessage msg
forall msg. msg -> PublisherMessage msg
Event
withSubscription ::
MonadUnliftIO m => Publisher msg -> (Inbox msg -> m a) -> m a
withSubscription :: forall (m :: * -> *) msg a.
MonadUnliftIO m =>
Publisher msg -> (Inbox msg -> m a) -> m a
withSubscription Publisher msg
pub Inbox msg -> m a
f = do
Inbox msg
inbox <- m (Inbox msg)
forall (m :: * -> *) msg. MonadIO m => m (Inbox msg)
newInbox
let sub :: m (Subscriber msg)
sub = Publisher msg -> Listen msg -> m (Subscriber msg)
forall (m :: * -> *) msg.
MonadIO m =>
Publisher msg -> Listen msg -> m (Subscriber msg)
subscribe Publisher msg
pub (msg -> Inbox msg -> STM ()
forall msg. msg -> Inbox msg -> STM ()
forall (mbox :: * -> *) msg.
OutChan mbox =>
msg -> mbox msg -> STM ()
`sendSTM` Inbox msg
inbox)
unsub :: Subscriber msg -> m ()
unsub = Publisher msg -> Subscriber msg -> m ()
forall (m :: * -> *) msg.
MonadIO m =>
Publisher msg -> Subscriber msg -> m ()
unsubscribe Publisher msg
pub
m (Subscriber msg)
-> (Subscriber msg -> m ()) -> (Subscriber msg -> m a) -> m a
forall (m :: * -> *) a b c.
MonadUnliftIO m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket m (Subscriber msg)
sub Subscriber msg -> m ()
unsub ((Subscriber msg -> m a) -> m a) -> (Subscriber msg -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Subscriber msg
_ -> Inbox msg -> m a
f Inbox msg
inbox
subscribe :: MonadIO m => Publisher msg -> Listen msg -> m (Subscriber msg)
subscribe :: forall (m :: * -> *) msg.
MonadIO m =>
Publisher msg -> Listen msg -> m (Subscriber msg)
subscribe Publisher msg
pub Listen msg
sub = Listen msg -> Listen (Subscriber msg) -> PublisherMessage msg
forall msg.
Listen msg -> Listen (Subscriber msg) -> PublisherMessage msg
Subscribe Listen msg
sub (Listen (Subscriber msg) -> PublisherMessage msg)
-> Publisher msg -> m (Subscriber msg)
forall (m :: * -> *) (mbox :: * -> *) response request.
(MonadIO m, OutChan mbox) =>
(Listen response -> request) -> mbox request -> m response
`query` Publisher msg
pub
unsubscribe :: MonadIO m => Publisher msg -> Subscriber msg -> m ()
unsubscribe :: forall (m :: * -> *) msg.
MonadIO m =>
Publisher msg -> Subscriber msg -> m ()
unsubscribe Publisher msg
pub Subscriber msg
sub = Subscriber msg -> PublisherMessage msg
forall msg. Subscriber msg -> PublisherMessage msg
Unsubscribe Subscriber msg
sub PublisherMessage msg -> Publisher msg -> m ()
forall (m :: * -> *) (mbox :: * -> *) msg.
(MonadIO m, OutChan mbox) =>
msg -> mbox msg -> m ()
`send` Publisher msg
pub
withPublisher :: MonadUnliftIO m => (Publisher msg -> m a) -> m a
withPublisher :: forall (m :: * -> *) msg a.
MonadUnliftIO m =>
(Publisher msg -> m a) -> m a
withPublisher = (Inbox (PublisherMessage msg) -> m ())
-> (Process (PublisherMessage msg) -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
(Inbox msg -> m ()) -> (Process msg -> m a) -> m a
withProcess Inbox (PublisherMessage msg) -> m ()
forall (m :: * -> *) msg.
MonadUnliftIO m =>
Inbox (PublisherMessage msg) -> m ()
publisherProcess
publisher :: MonadUnliftIO m => m (Publisher msg)
publisher :: forall (m :: * -> *) msg. MonadUnliftIO m => m (Publisher msg)
publisher = (Inbox (PublisherMessage msg) -> m ())
-> m (Process (PublisherMessage msg))
forall (m :: * -> *) msg.
MonadUnliftIO m =>
(Inbox msg -> m ()) -> m (Process msg)
process Inbox (PublisherMessage msg) -> m ()
forall (m :: * -> *) msg.
MonadUnliftIO m =>
Inbox (PublisherMessage msg) -> m ()
publisherProcess
publisherProcess :: MonadUnliftIO m => Inbox (PublisherMessage msg) -> m ()
publisherProcess :: forall (m :: * -> *) msg.
MonadUnliftIO m =>
Inbox (PublisherMessage msg) -> m ()
publisherProcess Inbox (PublisherMessage msg)
inbox = [Subscriber msg] -> m (TVar [Subscriber msg])
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO [] m (TVar [Subscriber msg])
-> (TVar [Subscriber msg] -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ReaderT (TVar [Subscriber msg]) m ()
-> TVar [Subscriber msg] -> m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT ReaderT (TVar [Subscriber msg]) m ()
forall {b}. ReaderT (TVar [Subscriber msg]) m b
go
where
go :: ReaderT (TVar [Subscriber msg]) m b
go = ReaderT (TVar [Subscriber msg]) m ()
-> ReaderT (TVar [Subscriber msg]) m b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (ReaderT (TVar [Subscriber msg]) m ()
-> ReaderT (TVar [Subscriber msg]) m b)
-> ReaderT (TVar [Subscriber msg]) m ()
-> ReaderT (TVar [Subscriber msg]) m b
forall a b. (a -> b) -> a -> b
$ Inbox (PublisherMessage msg)
-> ReaderT (TVar [Subscriber msg]) m (PublisherMessage msg)
forall (mbox :: * -> *) (m :: * -> *) msg.
(InChan mbox, MonadIO m) =>
mbox msg -> m msg
receive Inbox (PublisherMessage msg)
inbox ReaderT (TVar [Subscriber msg]) m (PublisherMessage msg)
-> (PublisherMessage msg -> ReaderT (TVar [Subscriber msg]) m ())
-> ReaderT (TVar [Subscriber msg]) m ()
forall a b.
ReaderT (TVar [Subscriber msg]) m a
-> (a -> ReaderT (TVar [Subscriber msg]) m b)
-> ReaderT (TVar [Subscriber msg]) m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= PublisherMessage msg -> ReaderT (TVar [Subscriber msg]) m ()
forall (m :: * -> *) msg.
(MonadIO m, MonadReader (TVar [Subscriber msg]) m) =>
PublisherMessage msg -> m ()
publisherMessage
publisherMessage ::
(MonadIO m, MonadReader (TVar [Subscriber msg]) m)
=> PublisherMessage msg
-> m ()
publisherMessage :: forall (m :: * -> *) msg.
(MonadIO m, MonadReader (TVar [Subscriber msg]) m) =>
PublisherMessage msg -> m ()
publisherMessage (Subscribe Listen msg
sub Listen (Subscriber msg)
r) =
m (TVar [Subscriber msg])
forall r (m :: * -> *). MonadReader r m => m r
ask m (TVar [Subscriber msg])
-> (TVar [Subscriber msg] -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \TVar [Subscriber msg]
box -> do
Unique
u <- IO Unique -> m Unique
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO Unique
newUnique
let s :: Subscriber msg
s = Listen msg -> Unique -> Subscriber msg
forall msg. Listen msg -> Unique -> Subscriber msg
Subscriber Listen msg
sub Unique
u
STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ()) -> STM () -> m ()
forall a b. (a -> b) -> a -> b
$ do
TVar [Subscriber msg]
-> ([Subscriber msg] -> [Subscriber msg]) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar [Subscriber msg]
box ([Subscriber msg] -> [Subscriber msg] -> [Subscriber msg]
forall a. Eq a => [a] -> [a] -> [a]
`union` [Subscriber msg
s])
Listen (Subscriber msg)
r Subscriber msg
s
publisherMessage (Unsubscribe Subscriber msg
sub) =
m (TVar [Subscriber msg])
forall r (m :: * -> *). MonadReader r m => m r
ask m (TVar [Subscriber msg])
-> (TVar [Subscriber msg] -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \TVar [Subscriber msg]
box -> STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TVar [Subscriber msg]
-> ([Subscriber msg] -> [Subscriber msg]) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar [Subscriber msg]
box (Subscriber msg -> [Subscriber msg] -> [Subscriber msg]
forall a. Eq a => a -> [a] -> [a]
delete Subscriber msg
sub))
publisherMessage (Event msg
event) =
m (TVar [Subscriber msg])
forall r (m :: * -> *). MonadReader r m => m r
ask m (TVar [Subscriber msg])
-> (TVar [Subscriber msg] -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \TVar [Subscriber msg]
box ->
STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ()) -> STM () -> m ()
forall a b. (a -> b) -> a -> b
$
TVar [Subscriber msg] -> STM [Subscriber msg]
forall a. TVar a -> STM a
readTVar TVar [Subscriber msg]
box STM [Subscriber msg] -> ([Subscriber msg] -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \[Subscriber msg]
subs ->
[Subscriber msg] -> Listen (Subscriber msg) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Subscriber msg]
subs (Listen (Subscriber msg) -> STM ())
-> Listen (Subscriber msg) -> STM ()
forall a b. (a -> b) -> a -> b
$ \(Subscriber Listen msg
sub Unique
_) -> Listen msg
sub msg
event