module Colog.Concurrent
(
withBackgroundLogger
, defCapacity
, BackgroundWorker
, backgroundWorkerWrite
, killBackgroundLogger
, mkCapacity
, forkBackgroundLogger
, convertToLogAction
, mkBackgroundThread
, runInBackgroundThread
) where
import Control.Applicative (many, (<|>), some)
import Control.Concurrent (forkFinally, killThread)
import Control.Concurrent.STM (STM, atomically, check, newTVarIO, readTVar, writeTVar)
import Control.Concurrent.STM.TBQueue (newTBQueueIO, readTBQueue, writeTBQueue)
import Control.Exception (bracket, finally, mask_)
import Control.Monad (forever, join)
import Control.Monad.IO.Class (MonadIO (..))
import Data.Foldable (for_)
import Numeric.Natural (Natural)
import Colog.Concurrent.Internal (BackgroundWorker (..), Capacity (..), mkCapacity)
import Colog.Core.Action (LogAction (..))
-- | Run an action with a logger whose messages are written by a
-- background thread.
--
-- A worker is started with 'forkBackgroundLogger' before the continuation
-- runs and is stopped with 'killBackgroundLogger' once the continuation
-- finishes, whether it returns normally or throws ('bracket' guarantees
-- the release step).
withBackgroundLogger
    :: MonadIO m
    => Capacity                   -- ^ Queue capacity (and batch limit) passed to the worker
    -> LogAction IO msg           -- ^ Action the background thread uses to write each message
    -> IO ()                      -- ^ Flush action, passed on to 'forkBackgroundLogger'
    -> (LogAction m msg -> IO a)  -- ^ Continuation that receives the queue-backed logger
    -> IO a
withBackgroundLogger cap logger flush action =
    bracket startWorker killBackgroundLogger runWith
  where
    -- Acquire: spawn the background worker thread.
    startWorker = forkBackgroundLogger cap logger flush

    -- Use: hand the continuation a 'LogAction' that enqueues to the worker.
    runWith worker = action (convertToLogAction worker)
-- | Default capacity: a queue bound of 4096 and a batch limit of 32.
--
-- The first field is used as the size of the bounded queue and the second
-- as the per-batch read limit by 'forkBackgroundLogger'.
defCapacity :: Capacity
defCapacity = Capacity queueBound (Just batchLimit)
  where
    queueBound :: Natural
    queueBound = 4096

    batchLimit :: Natural
    batchLimit = 32
-- | Stop a background worker and wait until it has finished shutting down.
--
-- The worker thread is killed first; the call then blocks (via STM
-- 'check') until the worker's liveness flag has been set to 'False'.
-- Workers created in this module clear that flag only after their
-- shutdown handler has run.
killBackgroundLogger :: BackgroundWorker msg -> IO ()
killBackgroundLogger bl = do
  killThread (backgroundWorkerThreadId bl)
  atomically $ do
    stillAlive <- readTVar (backgroundWorkerIsAlive bl)
    -- Retry the transaction until the worker reports that it is done.
    check (not stillAlive)
-- | Start a background thread that writes queued messages with the given
-- 'LogAction'.
--
-- Arguments, in order:
--
-- * the 'Capacity': the bound of the message queue and an optional limit
--   on how many messages are taken from the queue in one batch;
-- * the action used to write a single message;
-- * a flush action, run after every batch.
--
-- The worker loop blocks until at least one message is available, takes a
-- batch, writes every message of the batch and then flushes. Each message
-- is written under 'mask_'.
--
-- When the thread terminates for any reason, the messages still in the
-- queue are written and flushed, and only then is the liveness flag set
-- to 'False' (even if that final write fails).
--
-- The returned 'BackgroundWorker' carries the thread id, the STM action
-- that enqueues a message, and the liveness flag.
forkBackgroundLogger :: Capacity -> LogAction IO msg -> IO () -> IO (BackgroundWorker msg)
forkBackgroundLogger (Capacity cap lim) logAction flush = do
  queue <- newTBQueueIO cap
  isAlive <- newTVarIO True
  tid <- forkFinally
    (forever $ atomically (fetch (readTBQueue queue)) >>= writeBatch)
    (\_ ->
       -- 'many' never blocks: it takes whatever is left (possibly nothing).
       (atomically (many (readTBQueue queue)) >>= writeBatch)
         `finally` atomically (writeTVar isAlive False))
  pure $ BackgroundWorker tid (writeTBQueue queue) isAlive
  where
    -- Write every message of a batch, then flush once.
    writeBatch msgs = do
      for_ msgs (mask_ . unLogAction logAction)
      flush

    -- Blocking batch read: unbounded ('some') without a limit, otherwise
    -- capped by 'someN'.
    fetch :: STM a -> STM [a]
    fetch = maybe some someN lim

    -- Read at least one and at most @n@ elements.
    --
    -- The first read is mandatory, so the transaction retries (blocks)
    -- while the queue is empty. A limit of zero is treated as one:
    -- returning an empty batch without blocking would turn the worker
    -- loop into a busy loop that never drains the queue.
    someN :: Natural -> STM a -> STM [a]
    someN n f = (:) <$> f <*> upTo (max 1 n - 1)
      where
        -- Take up to @k@ further elements, stopping at the first failed read.
        upTo 0 = pure []
        upTo k = ((:) <$> f <*> upTo (k - 1)) <|> pure []
-- | Turn a 'BackgroundWorker' into a 'LogAction' usable in any 'MonadIO'.
--
-- Logging a message runs the worker's write action in a single STM
-- transaction; the message is handed to the worker rather than being
-- written by the calling thread.
convertToLogAction :: MonadIO m => BackgroundWorker msg -> LogAction m msg
convertToLogAction logger =
  LogAction (liftIO . atomically . backgroundWorkerWrite logger)
-- | Start a background thread that executes queued 'IO' actions one at a
-- time.
--
-- Only the queue bound of the 'Capacity' is used; the batch limit is
-- ignored. When the thread terminates, the actions still in the queue are
-- run in order, after which the liveness flag is set to 'False' (even if
-- one of those actions throws).
mkBackgroundThread :: Capacity -> IO (BackgroundWorker (IO ()))
mkBackgroundThread (Capacity cap _lim) = do
  queue <- newTBQueueIO cap
  isAlive <- newTVarIO True
  let -- Block until an action is available, then execute it.
      runNext = join (atomically (readTBQueue queue))
      -- Take everything left in the queue without blocking and run it.
      drainRemaining = do
        pending <- atomically (many (readTBQueue queue))
        sequence_ pending
      markDead = atomically (writeTVar isAlive False)
  tid <- forkFinally
    (forever runNext)
    (\_ -> drainRemaining `finally` markDead)
  pure (BackgroundWorker tid (writeTBQueue queue) isAlive)
-- | Wrap a 'LogAction' so that it is executed by a background worker.
--
-- Logging a message does not run the wrapped action in the calling
-- thread; instead the action applied to the message is passed to the
-- worker's write action as an 'IO' value.
runInBackgroundThread :: BackgroundWorker (IO ()) -> LogAction IO msg -> LogAction IO msg
runInBackgroundThread bt logAction = LogAction enqueue
  where
    -- Hand the (not yet executed) write of @msg@ to the worker.
    enqueue msg =
      atomically (backgroundWorkerWrite bt (unLogAction logAction msg))