{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
module Database.EventStore.Internal.Control
(
EventStore
, runEventStore
, getSettings
, freshUUID
, Pub(..)
, Sub(..)
, Hub
, Subscribe
, Publish
, asPub
, asSub
, asHub
, Bus
, newBus
, busStop
, Message
, toMsg
, fromMsg
, busProcessedEverything
, publish
, publishWith
, subscribe
, stopBus
, publisher
, monitorIncrPkgCount
, monitorIncrConnectionDrop
, monitorAddDataTransmitted
, monitorIncrForceReconnect
, monitorIncrHeartbeatTimeouts
, module Database.EventStore.Internal.Settings
) where
#if __GLASGOW_HASKELL__ > 710
import Control.Monad.Fail
#endif
import Data.Typeable
#if __GLASGOW_HASKELL__ < 802
import Data.Typeable.Internal
#else
import GHC.Fingerprint
#endif
import Control.Monad.Reader
import Data.UUID
import Data.UUID.V4
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
data Env =
Env { Env -> LoggerRef
__logRef :: !LoggerRef
, Env -> Settings
__settings :: !Settings
, Env -> Bus
__bus :: !Bus
, Env -> MonitoringBackend
__monitor :: !MonitoringBackend
}
newtype EventStore a =
EventStore { forall a. EventStore a -> ReaderT Env IO a
unEventStore :: ReaderT Env IO a }
deriving ( forall a b. a -> EventStore b -> EventStore a
forall a b. (a -> b) -> EventStore a -> EventStore b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: forall a b. a -> EventStore b -> EventStore a
$c<$ :: forall a b. a -> EventStore b -> EventStore a
fmap :: forall a b. (a -> b) -> EventStore a -> EventStore b
$cfmap :: forall a b. (a -> b) -> EventStore a -> EventStore b
Functor
, Functor EventStore
forall a. a -> EventStore a
forall a b. EventStore a -> EventStore b -> EventStore a
forall a b. EventStore a -> EventStore b -> EventStore b
forall a b. EventStore (a -> b) -> EventStore a -> EventStore b
forall a b c.
(a -> b -> c) -> EventStore a -> EventStore b -> EventStore c
forall (f :: * -> *).
Functor f
-> (forall a. a -> f a)
-> (forall a b. f (a -> b) -> f a -> f b)
-> (forall a b c. (a -> b -> c) -> f a -> f b -> f c)
-> (forall a b. f a -> f b -> f b)
-> (forall a b. f a -> f b -> f a)
-> Applicative f
<* :: forall a b. EventStore a -> EventStore b -> EventStore a
$c<* :: forall a b. EventStore a -> EventStore b -> EventStore a
*> :: forall a b. EventStore a -> EventStore b -> EventStore b
$c*> :: forall a b. EventStore a -> EventStore b -> EventStore b
liftA2 :: forall a b c.
(a -> b -> c) -> EventStore a -> EventStore b -> EventStore c
$cliftA2 :: forall a b c.
(a -> b -> c) -> EventStore a -> EventStore b -> EventStore c
<*> :: forall a b. EventStore (a -> b) -> EventStore a -> EventStore b
$c<*> :: forall a b. EventStore (a -> b) -> EventStore a -> EventStore b
pure :: forall a. a -> EventStore a
$cpure :: forall a. a -> EventStore a
Applicative
, Applicative EventStore
forall a. a -> EventStore a
forall a b. EventStore a -> EventStore b -> EventStore b
forall a b. EventStore a -> (a -> EventStore b) -> EventStore b
forall (m :: * -> *).
Applicative m
-> (forall a b. m a -> (a -> m b) -> m b)
-> (forall a b. m a -> m b -> m b)
-> (forall a. a -> m a)
-> Monad m
return :: forall a. a -> EventStore a
$creturn :: forall a. a -> EventStore a
>> :: forall a b. EventStore a -> EventStore b -> EventStore b
$c>> :: forall a b. EventStore a -> EventStore b -> EventStore b
>>= :: forall a b. EventStore a -> (a -> EventStore b) -> EventStore b
$c>>= :: forall a b. EventStore a -> (a -> EventStore b) -> EventStore b
Monad
#if __GLASGOW_HASKELL__ > 710
, Monad EventStore
forall a. String -> EventStore a
forall (m :: * -> *).
Monad m -> (forall a. String -> m a) -> MonadFail m
fail :: forall a. String -> EventStore a
$cfail :: forall a. String -> EventStore a
MonadFail
#endif
, Monad EventStore
forall e a. Exception e => e -> EventStore a
forall (m :: * -> *).
Monad m -> (forall e a. Exception e => e -> m a) -> MonadThrow m
throwM :: forall e a. Exception e => e -> EventStore a
$cthrowM :: forall e a. Exception e => e -> EventStore a
MonadThrow
, MonadThrow EventStore
forall e a.
Exception e =>
EventStore a -> (e -> EventStore a) -> EventStore a
forall (m :: * -> *).
MonadThrow m
-> (forall e a. Exception e => m a -> (e -> m a) -> m a)
-> MonadCatch m
catch :: forall e a.
Exception e =>
EventStore a -> (e -> EventStore a) -> EventStore a
$ccatch :: forall e a.
Exception e =>
EventStore a -> (e -> EventStore a) -> EventStore a
MonadCatch
, Monad EventStore
forall a. IO a -> EventStore a
forall (m :: * -> *).
Monad m -> (forall a. IO a -> m a) -> MonadIO m
liftIO :: forall a. IO a -> EventStore a
$cliftIO :: forall a. IO a -> EventStore a
MonadIO
, Monad EventStore
forall a. (a -> EventStore a) -> EventStore a
forall (m :: * -> *).
Monad m -> (forall a. (a -> m a) -> m a) -> MonadFix m
mfix :: forall a. (a -> EventStore a) -> EventStore a
$cmfix :: forall a. (a -> EventStore a) -> EventStore a
MonadFix
)
getEnv :: EventStore Env
getEnv :: EventStore Env
getEnv = forall a. ReaderT Env IO a -> EventStore a
EventStore forall r (m :: * -> *). MonadReader r m => m r
ask
getSettings :: EventStore Settings
getSettings :: EventStore Settings
getSettings = Env -> Settings
__settings forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> EventStore Env
getEnv
freshUUID :: MonadIO m => m UUID
freshUUID :: forall (m :: * -> *). MonadIO m => m UUID
freshUUID = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UUID
nextRandom
publisher :: EventStore Publish
publisher :: EventStore Publish
publisher = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall p. Pub p => p -> Publish
asPub forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> Bus
__bus) EventStore Env
getEnv
stopBus :: EventStore ()
stopBus :: EventStore ()
stopBus = forall (m :: * -> *). MonadIO m => Bus -> m ()
busStop forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> Bus
__bus forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< EventStore Env
getEnv
instance MonadBase IO EventStore where
liftBase :: forall a. IO a -> EventStore a
liftBase IO α
m = forall a. ReaderT Env IO a -> EventStore a
EventStore forall a b. (a -> b) -> a -> b
$ forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase IO α
m
instance MonadBaseControl IO EventStore where
type StM EventStore a = a
liftBaseWith :: forall a. (RunInBase EventStore IO -> IO a) -> EventStore a
liftBaseWith RunInBase EventStore IO -> IO a
run = forall a. ReaderT Env IO a -> EventStore a
EventStore forall a b. (a -> b) -> a -> b
$ do
Env
env <- forall r (m :: * -> *). MonadReader r m => m r
ask
a
s <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ RunInBase EventStore IO -> IO a
run (\EventStore a
m -> forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (forall a. EventStore a -> ReaderT Env IO a
unEventStore EventStore a
m) Env
env)
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM a
s
restoreM :: forall a. StM EventStore a -> EventStore a
restoreM = forall (m :: * -> *) a. Monad m => a -> m a
return
instance MonadLogger EventStore where
monadLoggerLog :: forall msg.
ToLogStr msg =>
Loc -> LogSource -> LogLevel -> msg -> EventStore ()
monadLoggerLog Loc
loc LogSource
src LogLevel
lvl msg
msg = do
LoggerRef
loggerRef <- Env -> LoggerRef
__logRef forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> EventStore Env
getEnv
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ LoggerRef -> Loc -> LogSource -> LogLevel -> LogStr -> IO ()
loggerCallback LoggerRef
loggerRef Loc
loc LogSource
src LogLevel
lvl (forall msg. ToLogStr msg => msg -> LogStr
toLogStr msg
msg)
instance MonadLoggerIO EventStore where
askLoggerIO :: EventStore (Loc -> LogSource -> LogLevel -> LogStr -> IO ())
askLoggerIO = do
LoggerRef
loggerRef <- Env -> LoggerRef
__logRef forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> EventStore Env
getEnv
forall (m :: * -> *) a. Monad m => a -> m a
return (LoggerRef -> Loc -> LogSource -> LogLevel -> LogStr -> IO ()
loggerCallback LoggerRef
loggerRef)
runEventStore :: LoggerRef -> Settings -> Bus -> EventStore a -> IO a
runEventStore :: forall a. LoggerRef -> Settings -> Bus -> EventStore a -> IO a
runEventStore LoggerRef
ref Settings
setts Bus
bus (EventStore ReaderT Env IO a
action) =
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT ReaderT Env IO a
action (LoggerRef -> Settings -> Bus -> MonitoringBackend -> Env
Env LoggerRef
ref Settings
setts Bus
bus (Bus -> MonitoringBackend
_monitoring Bus
bus))
data Message where
Message :: Typeable a => a -> Message
deriving Typeable
instance Show Message where
show :: Message -> String
show (Message a
a) = String
"Message: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show (forall a. Typeable a => a -> TypeRep
typeOf a
a)
toMsg :: Typeable a => a -> Message
toMsg :: forall a. Typeable a => a -> Message
toMsg = forall a. Typeable a => a -> Message
Message
fromMsg :: Typeable a => Message -> Maybe a
fromMsg :: forall a. Typeable a => Message -> Maybe a
fromMsg (Message a
a) = forall a b. (Typeable a, Typeable b) => a -> Maybe b
cast a
a
class Pub p where
publishSTM :: Typeable a => p -> a -> STM Bool
publish :: Typeable a => a -> EventStore ()
publish :: forall a. Typeable a => a -> EventStore ()
publish a
a = do
Bus
bus <- Env -> Bus
__bus forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> EventStore Env
getEnv
forall p a (m :: * -> *).
(Pub p, Typeable a, MonadIO m) =>
p -> a -> m ()
publishWith Bus
bus a
a
publishWith :: (Pub p, Typeable a, MonadIO m) => p -> a -> m ()
publishWith :: forall p a (m :: * -> *).
(Pub p, Typeable a, MonadIO m) =>
p -> a -> m ()
publishWith p
p a
a = forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
Bool
_ <- forall p a. (Pub p, Typeable a) => p -> a -> STM Bool
publishSTM p
p a
a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
class Sub s where
subscribeEventHandler :: s -> EventHandler -> IO ()
subscribe :: (Sub s, Typeable a)
=> s
-> (a -> EventStore ())
-> IO ()
subscribe :: forall s a.
(Sub s, Typeable a) =>
s -> (a -> EventStore ()) -> IO ()
subscribe s
s a -> EventStore ()
k = forall s. Sub s => s -> EventHandler -> IO ()
subscribeEventHandler s
s (forall a.
Typeable a =>
Proxy a -> (a -> EventStore ()) -> EventHandler
EventHandler forall {k} (t :: k). Proxy t
Proxy a -> EventStore ()
k)
data Publish = forall p. Pub p => Publish p
instance Pub Publish where
publishSTM :: forall a. Typeable a => Publish -> a -> STM Bool
publishSTM (Publish p
p) a
a = forall p a. (Pub p, Typeable a) => p -> a -> STM Bool
publishSTM p
p a
a
data Subscribe = forall p. Sub p => Subscribe p
instance Sub Subscribe where
subscribeEventHandler :: Subscribe -> EventHandler -> IO ()
subscribeEventHandler (Subscribe p
p) EventHandler
a = forall s. Sub s => s -> EventHandler -> IO ()
subscribeEventHandler p
p EventHandler
a
data Hub = forall h. (Sub h, Pub h) => Hub h
instance Sub Hub where
subscribeEventHandler :: Hub -> EventHandler -> IO ()
subscribeEventHandler (Hub h
h) = forall s. Sub s => s -> EventHandler -> IO ()
subscribeEventHandler h
h
instance Pub Hub where
publishSTM :: forall a. Typeable a => Hub -> a -> STM Bool
publishSTM (Hub h
h) = forall p a. (Pub p, Typeable a) => p -> a -> STM Bool
publishSTM h
h
asSub :: Sub s => s -> Subscribe
asSub :: forall s. Sub s => s -> Subscribe
asSub = forall s. Sub s => s -> Subscribe
Subscribe
asPub :: Pub p => p -> Publish
asPub :: forall p. Pub p => p -> Publish
asPub = forall p. Pub p => p -> Publish
Publish
asHub :: (Sub h, Pub h) => h -> Hub
asHub :: forall h. (Sub h, Pub h) => h -> Hub
asHub = forall h. (Sub h, Pub h) => h -> Hub
Hub
data Type = Type TypeRep Fingerprint
instance Show Type where
show :: Type -> String
show (Type TypeRep
rep Fingerprint
_) = String
"type " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show TypeRep
rep
instance Eq Type where
Type TypeRep
_ Fingerprint
a == :: Type -> Type -> Bool
== Type TypeRep
_ Fingerprint
b = Fingerprint
a forall a. Eq a => a -> a -> Bool
== Fingerprint
b
instance Ord Type where
compare :: Type -> Type -> Ordering
compare (Type TypeRep
_ Fingerprint
a) (Type TypeRep
_ Fingerprint
b) = forall a. Ord a => a -> a -> Ordering
compare Fingerprint
a Fingerprint
b
instance Hashable Type where
hashWithSalt :: Int -> Type -> Int
hashWithSalt Int
s (Type TypeRep
_ (Fingerprint Word64
b Word64
l)) = forall a. Hashable a => Int -> a -> Int
hashWithSalt Int
s (Word64
b, Word64
l)
data GetType
= forall a. Typeable a => FromTypeable a
| forall prx a. Typeable a => FromProxy (prx a)
getFingerprint :: TypeRep -> Fingerprint
#if __GLASGOW_HASKELL__ == 708
getFingerprint (TypeRep fp _ _) = fp
#else
getFingerprint :: TypeRep -> Fingerprint
getFingerprint = TypeRep -> Fingerprint
typeRepFingerprint
#endif
getType :: GetType -> Type
getType :: GetType -> Type
getType GetType
op = TypeRep -> Fingerprint -> Type
Type TypeRep
t (TypeRep -> Fingerprint
getFingerprint TypeRep
t)
where
t :: TypeRep
t = case GetType
op of
FromTypeable a
a -> forall a. Typeable a => a -> TypeRep
typeOf a
a
FromProxy prx a
prx -> forall {k} (proxy :: k -> *) (a :: k).
Typeable a =>
proxy a -> TypeRep
typeRep prx a
prx
type EventHandlers = HashMap Type (Seq EventHandler)
propagate :: Typeable a => a -> Seq EventHandler -> EventStore ()
propagate :: forall a. Typeable a => a -> Seq EventHandler -> EventStore ()
propagate a
a = forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ forall a b. (a -> b) -> a -> b
$ \(EventHandler Proxy a
_ a -> EventStore ()
k) -> do
let Just a
b = forall a b. (Typeable a, Typeable b) => a -> Maybe b
cast a
a
tpe :: TypeRep
tpe = forall a. Typeable a => a -> TypeRep
typeOf a
b
Either SomeException ()
outcome <- forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
tryAny forall a b. (a -> b) -> a -> b
$ a -> EventStore ()
k a
b
case Either SomeException ()
outcome of
Right ()
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Left SomeException
e -> $(logError) [i|Exception when propagating #{tpe}: #{e}.|]
data EventHandler where
EventHandler :: Typeable a
=> Proxy a
-> (a -> EventStore ())
-> EventHandler
instance Show EventHandler where
show :: EventHandler -> String
show (EventHandler Proxy a
prx a -> EventStore ()
_) = String
"Handle " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show (forall {k} (proxy :: k -> *) (a :: k).
Typeable a =>
proxy a -> TypeRep
typeRep Proxy a
prx)
data Bus =
Bus { Bus -> LoggerRef
_busLoggerRef :: LoggerRef
, Bus -> Settings
_busSettings :: Settings
, Bus -> IORef EventHandlers
_busEventHandlers :: IORef EventHandlers
, Bus -> TBMQueue Message
_busQueue :: TBMQueue Message
, Bus -> Async ()
_workerAsync :: Async ()
, Bus -> MonitoringBackend
_monitoring :: MonitoringBackend
}
busStop :: MonadIO m => Bus -> m ()
busStop :: forall (m :: * -> *). MonadIO m => Bus -> m ()
busStop Bus{Async ()
IORef EventHandlers
TBMQueue Message
LoggerRef
MonitoringBackend
Settings
_monitoring :: MonitoringBackend
_workerAsync :: Async ()
_busQueue :: TBMQueue Message
_busEventHandlers :: IORef EventHandlers
_busSettings :: Settings
_busLoggerRef :: LoggerRef
_workerAsync :: Bus -> Async ()
_busQueue :: Bus -> TBMQueue Message
_busEventHandlers :: Bus -> IORef EventHandlers
_busSettings :: Bus -> Settings
_busLoggerRef :: Bus -> LoggerRef
_monitoring :: Bus -> MonitoringBackend
..} = forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TBMQueue a -> STM ()
closeTBMQueue TBMQueue Message
_busQueue
busProcessedEverything :: Bus -> IO ()
busProcessedEverything :: Bus -> IO ()
busProcessedEverything Bus{Async ()
IORef EventHandlers
TBMQueue Message
LoggerRef
MonitoringBackend
Settings
_monitoring :: MonitoringBackend
_workerAsync :: Async ()
_busQueue :: TBMQueue Message
_busEventHandlers :: IORef EventHandlers
_busSettings :: Settings
_busLoggerRef :: LoggerRef
_workerAsync :: Bus -> Async ()
_busQueue :: Bus -> TBMQueue Message
_busEventHandlers :: Bus -> IORef EventHandlers
_busSettings :: Bus -> Settings
_busLoggerRef :: Bus -> LoggerRef
_monitoring :: Bus -> MonitoringBackend
..} = forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m a
wait Async ()
_workerAsync
messageType :: Type
messageType :: Type
messageType = GetType -> Type
getType (forall (prx :: * -> *) a. Typeable a => prx a -> GetType
FromProxy (forall {k} (t :: k). Proxy t
Proxy :: Proxy Message))
newBus :: LoggerRef -> Settings -> IO Bus
newBus :: LoggerRef -> Settings -> IO Bus
newBus LoggerRef
ref Settings
setts = do
Bus
bus <- forall (m :: * -> *) a. MonadFix m => (a -> m a) -> m a
mfix forall a b. (a -> b) -> a -> b
$ \Bus
b -> do
LoggerRef
-> Settings
-> IORef EventHandlers
-> TBMQueue Message
-> Async ()
-> MonitoringBackend
-> Bus
Bus LoggerRef
ref Settings
setts forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) a. MonadBase IO m => a -> m (IORef a)
newIORef forall a. Monoid a => a
mempty
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> forall a. Int -> IO (TBMQueue a)
newTBMQueueIO Int
500
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
async (Bus -> IO ()
worker Bus
b)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> forall (f :: * -> *) a. Applicative f => a -> f a
pure (Settings -> MonitoringBackend
s_monitoring Settings
setts)
forall (m :: * -> *) a. Monad m => a -> m a
return Bus
bus
worker :: Bus -> IO ()
worker :: Bus -> IO ()
worker self :: Bus
self@Bus{Async ()
IORef EventHandlers
TBMQueue Message
LoggerRef
MonitoringBackend
Settings
_monitoring :: MonitoringBackend
_workerAsync :: Async ()
_busQueue :: TBMQueue Message
_busEventHandlers :: IORef EventHandlers
_busSettings :: Settings
_busLoggerRef :: LoggerRef
_workerAsync :: Bus -> Async ()
_busQueue :: Bus -> TBMQueue Message
_busEventHandlers :: Bus -> IORef EventHandlers
_busSettings :: Bus -> Settings
_busLoggerRef :: Bus -> LoggerRef
_monitoring :: Bus -> MonitoringBackend
..} = IO ()
loop
where
handleMsg :: Message -> IO ()
handleMsg (Message a
a) = do
EventHandlers
callbacks <- forall (m :: * -> *) a. MonadBase IO m => IORef a -> m a
readIORef IORef EventHandlers
_busEventHandlers
forall a. Typeable a => Bus -> EventHandlers -> a -> IO ()
publishing Bus
self EventHandlers
callbacks a
a
IO ()
loop
loop :: IO ()
loop = forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ Message -> IO ()
handleMsg forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue Message
_busQueue)
instance Sub Bus where
subscribeEventHandler :: Bus -> EventHandler -> IO ()
subscribeEventHandler Bus{Async ()
IORef EventHandlers
TBMQueue Message
LoggerRef
MonitoringBackend
Settings
_monitoring :: MonitoringBackend
_workerAsync :: Async ()
_busQueue :: TBMQueue Message
_busEventHandlers :: IORef EventHandlers
_busSettings :: Settings
_busLoggerRef :: LoggerRef
_workerAsync :: Bus -> Async ()
_busQueue :: Bus -> TBMQueue Message
_busEventHandlers :: Bus -> IORef EventHandlers
_busSettings :: Bus -> Settings
_busLoggerRef :: Bus -> LoggerRef
_monitoring :: Bus -> MonitoringBackend
..} hdl :: EventHandler
hdl@(EventHandler Proxy a
prx a -> EventStore ()
_) =
forall (m :: * -> *) a b.
MonadBase IO m =>
IORef a -> (a -> (a, b)) -> m b
atomicModifyIORef' IORef EventHandlers
_busEventHandlers EventHandlers -> (EventHandlers, ())
update
where
update :: EventHandlers -> (EventHandlers, ())
update :: EventHandlers -> (EventHandlers, ())
update EventHandlers
callbacks =
let tpe :: Type
tpe = GetType -> Type
getType (forall (prx :: * -> *) a. Typeable a => prx a -> GetType
FromProxy Proxy a
prx)
next :: ContainerKey EventHandlers -> EventHandlers -> EventHandlers
next = forall map.
IsMap map =>
(Maybe (MapValue map) -> Maybe (MapValue map))
-> ContainerKey map -> map -> map
alterMap forall a b. (a -> b) -> a -> b
$ \Maybe (MapValue EventHandlers)
input ->
case Maybe (MapValue EventHandlers)
input of
Maybe (MapValue EventHandlers)
Nothing -> forall a. a -> Maybe a
Just (forall seq. MonoPointed seq => Element seq -> seq
singleton EventHandler
hdl)
Just MapValue EventHandlers
hs -> forall a. a -> Maybe a
Just (forall seq. SemiSequence seq => seq -> Element seq -> seq
snoc MapValue EventHandlers
hs EventHandler
hdl) in
(ContainerKey EventHandlers -> EventHandlers -> EventHandlers
next Type
tpe EventHandlers
callbacks, ())
instance Pub Bus where
publishSTM :: forall a. Typeable a => Bus -> a -> STM Bool
publishSTM Bus{Async ()
IORef EventHandlers
TBMQueue Message
LoggerRef
MonitoringBackend
Settings
_monitoring :: MonitoringBackend
_workerAsync :: Async ()
_busQueue :: TBMQueue Message
_busEventHandlers :: IORef EventHandlers
_busSettings :: Settings
_busLoggerRef :: LoggerRef
_workerAsync :: Bus -> Async ()
_busQueue :: Bus -> TBMQueue Message
_busEventHandlers :: Bus -> IORef EventHandlers
_busSettings :: Bus -> Settings
_busLoggerRef :: Bus -> LoggerRef
_monitoring :: Bus -> MonitoringBackend
..} a
a = do
Bool
closed <- forall a. TBMQueue a -> STM Bool
isClosedTBMQueue TBMQueue Message
_busQueue
forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue Message
_busQueue (forall a. Typeable a => a -> Message
toMsg a
a)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Bool -> Bool
not Bool
closed
publishing :: Typeable a => Bus -> EventHandlers -> a -> IO ()
publishing :: forall a. Typeable a => Bus -> EventHandlers -> a -> IO ()
publishing self :: Bus
self@Bus{Async ()
IORef EventHandlers
TBMQueue Message
LoggerRef
MonitoringBackend
Settings
_monitoring :: MonitoringBackend
_workerAsync :: Async ()
_busQueue :: TBMQueue Message
_busEventHandlers :: IORef EventHandlers
_busSettings :: Settings
_busLoggerRef :: LoggerRef
_workerAsync :: Bus -> Async ()
_busQueue :: Bus -> TBMQueue Message
_busEventHandlers :: Bus -> IORef EventHandlers
_busSettings :: Bus -> Settings
_busLoggerRef :: Bus -> LoggerRef
_monitoring :: Bus -> MonitoringBackend
..} EventHandlers
callbacks a
a = do
let tpe :: Type
tpe = GetType -> Type
getType (forall a. Typeable a => a -> GetType
FromTypeable a
a)
forall a. LoggerRef -> Settings -> Bus -> EventStore a -> IO a
runEventStore LoggerRef
_busLoggerRef Settings
_busSettings Bus
self forall a b. (a -> b) -> a -> b
$ do
$(logDebug) [i|Publishing message #{tpe}.|]
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (forall a. Typeable a => a -> Seq EventHandler -> EventStore ()
propagate a
a) (forall map.
IsMap map =>
ContainerKey map -> map -> Maybe (MapValue map)
lookup Type
tpe EventHandlers
callbacks)
$(logDebug) [i|Message #{tpe} propagated.|]
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Type
tpe forall a. Eq a => a -> a -> Bool
== Type
messageType) forall a b. (a -> b) -> a -> b
$
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (forall a. Typeable a => a -> Seq EventHandler -> EventStore ()
propagate (forall a. Typeable a => a -> Message
toMsg a
a)) (forall map.
IsMap map =>
ContainerKey map -> map -> Maybe (MapValue map)
lookup Type
messageType EventHandlers
callbacks)
monitorIncrPkgCount :: EventStore ()
monitorIncrPkgCount :: EventStore ()
monitorIncrPkgCount = do
Env{LoggerRef
MonitoringBackend
Settings
Bus
__monitor :: MonitoringBackend
__bus :: Bus
__settings :: Settings
__logRef :: LoggerRef
__monitor :: Env -> MonitoringBackend
__bus :: Env -> Bus
__settings :: Env -> Settings
__logRef :: Env -> LoggerRef
..} <- EventStore Env
getEnv
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ MonitoringBackend -> IO ()
monitoringBackendIncrPkgCount MonitoringBackend
__monitor
monitorIncrConnectionDrop :: EventStore ()
monitorIncrConnectionDrop :: EventStore ()
monitorIncrConnectionDrop = do
Env{LoggerRef
MonitoringBackend
Settings
Bus
__monitor :: MonitoringBackend
__bus :: Bus
__settings :: Settings
__logRef :: LoggerRef
__monitor :: Env -> MonitoringBackend
__bus :: Env -> Bus
__settings :: Env -> Settings
__logRef :: Env -> LoggerRef
..} <- EventStore Env
getEnv
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ MonitoringBackend -> IO ()
monitoringBackendIncrConnectionDrop MonitoringBackend
__monitor
monitorAddDataTransmitted :: Int -> EventStore ()
monitorAddDataTransmitted :: Int -> EventStore ()
monitorAddDataTransmitted Int
siz = do
Env{LoggerRef
MonitoringBackend
Settings
Bus
__monitor :: MonitoringBackend
__bus :: Bus
__settings :: Settings
__logRef :: LoggerRef
__monitor :: Env -> MonitoringBackend
__bus :: Env -> Bus
__settings :: Env -> Settings
__logRef :: Env -> LoggerRef
..} <- EventStore Env
getEnv
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ MonitoringBackend -> Int -> IO ()
monitoringBackendAddDataTransmitted MonitoringBackend
__monitor Int
siz
monitorIncrForceReconnect :: EventStore ()
monitorIncrForceReconnect :: EventStore ()
monitorIncrForceReconnect = do
Env{LoggerRef
MonitoringBackend
Settings
Bus
__monitor :: MonitoringBackend
__bus :: Bus
__settings :: Settings
__logRef :: LoggerRef
__monitor :: Env -> MonitoringBackend
__bus :: Env -> Bus
__settings :: Env -> Settings
__logRef :: Env -> LoggerRef
..} <- EventStore Env
getEnv
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ MonitoringBackend -> IO ()
monitoringBackendIncrForceReconnect MonitoringBackend
__monitor
monitorIncrHeartbeatTimeouts :: EventStore ()
monitorIncrHeartbeatTimeouts :: EventStore ()
monitorIncrHeartbeatTimeouts = do
Env{LoggerRef
MonitoringBackend
Settings
Bus
__monitor :: MonitoringBackend
__bus :: Bus
__settings :: Settings
__logRef :: LoggerRef
__monitor :: Env -> MonitoringBackend
__bus :: Env -> Bus
__settings :: Env -> Settings
__logRef :: Env -> LoggerRef
..} <- EventStore Env
getEnv
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ MonitoringBackend -> IO ()
monitoringBackendIncrHeartbeatTimeouts MonitoringBackend
__monitor