module Colog.Concurrent
(
withBackgroundLogger
, defCapacity
, BackgroundWorker
, backgroundWorkerWrite
, killBackgroundLogger
, forkBackgroundLogger
, convertToLogAction
, mkBackgroundThread
, runInBackgroundThread
) where
import Control.Applicative (many)
import Control.Concurrent (forkFinally, killThread)
import Control.Concurrent.STM (atomically, check, newTVarIO, readTVar, writeTVar)
import Control.Concurrent.STM.TBQueue (newTBQueueIO, readTBQueue, writeTBQueue)
import Control.Exception (bracket, finally)
import Control.Monad (forever, join)
import Control.Monad.IO.Class (MonadIO (..))
import Data.Foldable (for_)
import Colog.Concurrent.Internal (BackgroundWorker (..), Capacity (..))
import Colog.Core.Action (LogAction (..))
withBackgroundLogger
:: MonadIO m
=> Capacity
-> LogAction IO msg
-> (LogAction m msg -> IO a)
-> IO a
withBackgroundLogger :: Capacity -> LogAction IO msg -> (LogAction m msg -> IO a) -> IO a
withBackgroundLogger cap :: Capacity
cap logger :: LogAction IO msg
logger action :: LogAction m msg -> IO a
action =
IO (BackgroundWorker msg)
-> (BackgroundWorker msg -> IO ())
-> (BackgroundWorker msg -> IO a)
-> IO a
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (Capacity -> LogAction IO msg -> IO (BackgroundWorker msg)
forall msg.
Capacity -> LogAction IO msg -> IO (BackgroundWorker msg)
forkBackgroundLogger Capacity
cap LogAction IO msg
logger)
BackgroundWorker msg -> IO ()
forall msg. BackgroundWorker msg -> IO ()
killBackgroundLogger
(LogAction m msg -> IO a
action (LogAction m msg -> IO a)
-> (BackgroundWorker msg -> LogAction m msg)
-> BackgroundWorker msg
-> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BackgroundWorker msg -> LogAction m msg
forall (m :: * -> *) msg.
MonadIO m =>
BackgroundWorker msg -> LogAction m msg
convertToLogAction)
defCapacity :: Capacity
defCapacity :: Capacity
defCapacity = Natural -> Capacity
Capacity 4096
killBackgroundLogger :: BackgroundWorker msg -> IO ()
killBackgroundLogger :: BackgroundWorker msg -> IO ()
killBackgroundLogger bl :: BackgroundWorker msg
bl = do
ThreadId -> IO ()
killThread (BackgroundWorker msg -> ThreadId
forall msg. BackgroundWorker msg -> ThreadId
backgroundWorkerThreadId BackgroundWorker msg
bl)
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar (BackgroundWorker msg -> TVar Bool
forall msg. BackgroundWorker msg -> TVar Bool
backgroundWorkerIsAlive BackgroundWorker msg
bl) STM Bool -> (Bool -> STM ()) -> STM ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Bool -> STM ()
check (Bool -> STM ()) -> (Bool -> Bool) -> Bool -> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Bool -> Bool
not
forkBackgroundLogger :: Capacity -> LogAction IO msg -> IO (BackgroundWorker msg)
forkBackgroundLogger :: Capacity -> LogAction IO msg -> IO (BackgroundWorker msg)
forkBackgroundLogger (Capacity cap :: Natural
cap) logAction :: LogAction IO msg
logAction = do
TBQueue msg
queue <- Natural -> IO (TBQueue msg)
forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
cap
TVar Bool
isAlive <- Bool -> IO (TVar Bool)
forall a. a -> IO (TVar a)
newTVarIO Bool
True
ThreadId
tid <- IO Any -> (Either SomeException Any -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally
(IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO Any) -> IO () -> IO Any
forall a b. (a -> b) -> a -> b
$ do
msg
msg <- STM msg -> IO msg
forall a. STM a -> IO a
atomically (STM msg -> IO msg) -> STM msg -> IO msg
forall a b. (a -> b) -> a -> b
$ TBQueue msg -> STM msg
forall a. TBQueue a -> STM a
readTBQueue TBQueue msg
queue
LogAction IO msg -> msg -> IO ()
forall (m :: * -> *) msg. LogAction m msg -> msg -> m ()
unLogAction LogAction IO msg
logAction msg
msg)
(\_ ->
(do [msg]
msgs <- STM [msg] -> IO [msg]
forall a. STM a -> IO a
atomically (STM [msg] -> IO [msg]) -> STM [msg] -> IO [msg]
forall a b. (a -> b) -> a -> b
$ STM msg -> STM [msg]
forall (f :: * -> *) a. Alternative f => f a -> f [a]
many (STM msg -> STM [msg]) -> STM msg -> STM [msg]
forall a b. (a -> b) -> a -> b
$ TBQueue msg -> STM msg
forall a. TBQueue a -> STM a
readTBQueue TBQueue msg
queue
[msg] -> (msg -> IO ()) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [msg]
msgs ((msg -> IO ()) -> IO ()) -> (msg -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ LogAction IO msg -> msg -> IO ()
forall (m :: * -> *) msg. LogAction m msg -> msg -> m ()
unLogAction LogAction IO msg
logAction)
IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`finally` STM () -> IO ()
forall a. STM a -> IO a
atomically (TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Bool
isAlive Bool
False))
BackgroundWorker msg -> IO (BackgroundWorker msg)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (BackgroundWorker msg -> IO (BackgroundWorker msg))
-> BackgroundWorker msg -> IO (BackgroundWorker msg)
forall a b. (a -> b) -> a -> b
$ ThreadId -> (msg -> STM ()) -> TVar Bool -> BackgroundWorker msg
forall msg.
ThreadId -> (msg -> STM ()) -> TVar Bool -> BackgroundWorker msg
BackgroundWorker ThreadId
tid (TBQueue msg -> msg -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue msg
queue) TVar Bool
isAlive
convertToLogAction :: MonadIO m => BackgroundWorker msg -> LogAction m msg
convertToLogAction :: BackgroundWorker msg -> LogAction m msg
convertToLogAction logger :: BackgroundWorker msg
logger = (msg -> m ()) -> LogAction m msg
forall (m :: * -> *) msg. (msg -> m ()) -> LogAction m msg
LogAction ((msg -> m ()) -> LogAction m msg)
-> (msg -> m ()) -> LogAction m msg
forall a b. (a -> b) -> a -> b
$ \msg :: msg
msg ->
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ BackgroundWorker msg -> msg -> STM ()
forall msg. BackgroundWorker msg -> msg -> STM ()
backgroundWorkerWrite BackgroundWorker msg
logger msg
msg
mkBackgroundThread :: Capacity -> IO (BackgroundWorker (IO ()))
mkBackgroundThread :: Capacity -> IO (BackgroundWorker (IO ()))
mkBackgroundThread (Capacity cap :: Natural
cap) = do
TBQueue (IO ())
queue <- Natural -> IO (TBQueue (IO ()))
forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
cap
TVar Bool
isAlive <- Bool -> IO (TVar Bool)
forall a. a -> IO (TVar a)
newTVarIO Bool
True
ThreadId
tid <- IO Any -> (Either SomeException Any -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally
(IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO Any) -> IO () -> IO Any
forall a b. (a -> b) -> a -> b
$ IO (IO ()) -> IO ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO ()) -> IO ()) -> IO (IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ STM (IO ()) -> IO (IO ())
forall a. STM a -> IO a
atomically (STM (IO ()) -> IO (IO ())) -> STM (IO ()) -> IO (IO ())
forall a b. (a -> b) -> a -> b
$ TBQueue (IO ()) -> STM (IO ())
forall a. TBQueue a -> STM a
readTBQueue TBQueue (IO ())
queue)
(\_ ->
([IO ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_ ([IO ()] -> IO ()) -> IO [IO ()] -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< STM [IO ()] -> IO [IO ()]
forall a. STM a -> IO a
atomically (STM (IO ()) -> STM [IO ()]
forall (f :: * -> *) a. Alternative f => f a -> f [a]
many (STM (IO ()) -> STM [IO ()]) -> STM (IO ()) -> STM [IO ()]
forall a b. (a -> b) -> a -> b
$ TBQueue (IO ()) -> STM (IO ())
forall a. TBQueue a -> STM a
readTBQueue TBQueue (IO ())
queue))
IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`finally` STM () -> IO ()
forall a. STM a -> IO a
atomically (TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Bool
isAlive Bool
False))
BackgroundWorker (IO ()) -> IO (BackgroundWorker (IO ()))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (BackgroundWorker (IO ()) -> IO (BackgroundWorker (IO ())))
-> BackgroundWorker (IO ()) -> IO (BackgroundWorker (IO ()))
forall a b. (a -> b) -> a -> b
$ ThreadId
-> (IO () -> STM ()) -> TVar Bool -> BackgroundWorker (IO ())
forall msg.
ThreadId -> (msg -> STM ()) -> TVar Bool -> BackgroundWorker msg
BackgroundWorker ThreadId
tid (TBQueue (IO ()) -> IO () -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue (IO ())
queue) TVar Bool
isAlive
runInBackgroundThread :: BackgroundWorker (IO ()) -> LogAction IO msg -> LogAction IO msg
runInBackgroundThread :: BackgroundWorker (IO ()) -> LogAction IO msg -> LogAction IO msg
runInBackgroundThread bt :: BackgroundWorker (IO ())
bt logAction :: LogAction IO msg
logAction = (msg -> IO ()) -> LogAction IO msg
forall (m :: * -> *) msg. (msg -> m ()) -> LogAction m msg
LogAction ((msg -> IO ()) -> LogAction IO msg)
-> (msg -> IO ()) -> LogAction IO msg
forall a b. (a -> b) -> a -> b
$ \msg :: msg
msg ->
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ BackgroundWorker (IO ()) -> IO () -> STM ()
forall msg. BackgroundWorker msg -> msg -> STM ()
backgroundWorkerWrite BackgroundWorker (IO ())
bt (IO () -> STM ()) -> IO () -> STM ()
forall a b. (a -> b) -> a -> b
$ LogAction IO msg -> msg -> IO ()
forall (m :: * -> *) msg. LogAction m msg -> msg -> m ()
unLogAction LogAction IO msg
logAction msg
msg