{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE GeneralisedNewtypeDeriving #-}
module Database.Franz.Internal.Reader where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.State.Strict
import Control.Monad.Trans.Maybe
import Data.Hashable (Hashable)
import Data.Serialize
import Database.Franz.Internal.Protocol
import qualified Data.ByteString.Char8 as B
import qualified Data.HashMap.Strict as HM
import qualified Data.IntMap.Strict as IM
import qualified Data.Vector.Unboxed as U
import qualified Data.Vector as V
import Data.Maybe (isJust)
import GHC.Clock (getMonotonicTime)
import System.Directory
import System.FilePath
import System.IO
import qualified System.FSNotify as FS

data StreamStatus = CaughtUp | Outdated | Gone deriving StreamStatus -> StreamStatus -> Bool
(StreamStatus -> StreamStatus -> Bool)
-> (StreamStatus -> StreamStatus -> Bool) -> Eq StreamStatus
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: StreamStatus -> StreamStatus -> Bool
$c/= :: StreamStatus -> StreamStatus -> Bool
== :: StreamStatus -> StreamStatus -> Bool
$c== :: StreamStatus -> StreamStatus -> Bool
Eq

data Stream = Stream
  { Stream -> FilePath
streamPath :: FilePath
  , Stream -> TVar (IntMap Int)
vOffsets :: !(TVar (IM.IntMap Int))
  , Stream -> Vector IndexName
indexNames :: !(V.Vector IndexName)
  , Stream -> HashMap IndexName (TVar (IntMap Int))
indices :: !(HM.HashMap IndexName (TVar (IM.IntMap Int)))
  , Stream -> TVar Int
vCount :: !(TVar Int)
  , Stream -> TVar StreamStatus
vStatus :: !(TVar StreamStatus)
  , Stream -> ThreadId
followThread :: !ThreadId
  , Stream -> Handle
indexHandle :: !Handle
  , Stream -> Handle
payloadHandle :: !Handle
  , Stream -> TVar Activity
vActivity :: !(TVar Activity)
  }

type Activity = Either Double Int

addActivity :: Stream -> STM ()
addActivity :: Stream -> STM ()
addActivity Stream
str = TVar Activity -> (Activity -> Activity) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (Stream -> TVar Activity
vActivity Stream
str) ((Activity -> Activity) -> STM ())
-> (Activity -> Activity) -> STM ()
forall a b. (a -> b) -> a -> b
$ \case
  Left Double
_ -> Int -> Activity
forall a b. b -> Either a b
Right Int
0
  Right Int
n -> Int -> Activity
forall a b. b -> Either a b
Right (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)

removeActivity :: Stream -> IO ()
removeActivity :: Stream -> IO ()
removeActivity Stream
str = do
  Double
now <- IO Double
getMonotonicTime
  STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Activity -> (Activity -> Activity) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (Stream -> TVar Activity
vActivity Stream
str) ((Activity -> Activity) -> STM ())
-> (Activity -> Activity) -> STM ()
forall a b. (a -> b) -> a -> b
$ \case
    Left Double
_ -> Double -> Activity
forall a b. a -> Either a b
Left Double
now
    Right Int
n
      | Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 -> Double -> Activity
forall a b. a -> Either a b
Left Double
now
      | Bool
otherwise -> Int -> Activity
forall a b. b -> Either a b
Right (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)

closeStream :: Stream -> IO ()
closeStream :: Stream -> IO ()
closeStream Stream{FilePath
Handle
ThreadId
TVar Int
TVar Activity
TVar (IntMap Int)
TVar StreamStatus
HashMap IndexName (TVar (IntMap Int))
Vector IndexName
vActivity :: TVar Activity
payloadHandle :: Handle
indexHandle :: Handle
followThread :: ThreadId
vStatus :: TVar StreamStatus
vCount :: TVar Int
indices :: HashMap IndexName (TVar (IntMap Int))
indexNames :: Vector IndexName
vOffsets :: TVar (IntMap Int)
streamPath :: FilePath
vActivity :: Stream -> TVar Activity
payloadHandle :: Stream -> Handle
indexHandle :: Stream -> Handle
followThread :: Stream -> ThreadId
vStatus :: Stream -> TVar StreamStatus
vCount :: Stream -> TVar Int
indices :: Stream -> HashMap IndexName (TVar (IntMap Int))
indexNames :: Stream -> Vector IndexName
vOffsets :: Stream -> TVar (IntMap Int)
streamPath :: Stream -> FilePath
..} = do
  ThreadId -> IO ()
killThread ThreadId
followThread
  Handle -> IO ()
hClose Handle
payloadHandle
  Handle -> IO ()
hClose Handle
indexHandle

createStream :: FS.WatchManager -> FilePath -> IO Stream
createStream :: WatchManager -> FilePath -> IO Stream
createStream WatchManager
man FilePath
path = do
  let offsetPath :: FilePath
offsetPath = FilePath
path FilePath -> FilePath -> FilePath
</> FilePath
"offsets"
  let payloadPath :: FilePath
payloadPath = FilePath
path FilePath -> FilePath -> FilePath
</> FilePath
"payloads"
  Bool
exist <- FilePath -> IO Bool
doesFileExist FilePath
offsetPath
  Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
exist (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ FranzException -> IO ()
forall e a. Exception e => e -> IO a
throwIO (FranzException -> IO ()) -> FranzException -> IO ()
forall a b. (a -> b) -> a -> b
$ FilePath -> FranzException
StreamNotFound FilePath
offsetPath
  IndexName
initialOffsetsBS <- FilePath -> IO IndexName
B.readFile FilePath
offsetPath
  Handle
payloadHandle <- FilePath -> IOMode -> IO Handle
openBinaryFile FilePath
payloadPath IOMode
ReadMode
  Vector IndexName
indexNames <- [IndexName] -> Vector IndexName
forall a. [a] -> Vector a
V.fromList ([IndexName] -> Vector IndexName)
-> (IndexName -> [IndexName]) -> IndexName -> Vector IndexName
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IndexName -> [IndexName]
B.lines (IndexName -> Vector IndexName)
-> IO IndexName -> IO (Vector IndexName)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> FilePath -> IO IndexName
B.readFile (FilePath
path FilePath -> FilePath -> FilePath
</> FilePath
"indices")
  let icount :: Int
icount = Int
1 Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Vector IndexName -> Int
forall a. Vector a -> Int
V.length Vector IndexName
indexNames
  let count :: Int
count = IndexName -> Int
B.length IndexName
initialOffsetsBS Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` (Int
8 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
icount)
  let getI :: Get Int
getI = Int64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Int) -> Get Int64 -> Get Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Get Int64
getInt64le
  Vector (Vector Int)
initialIndices <- (FilePath -> IO (Vector (Vector Int)))
-> (Vector (Vector Int) -> IO (Vector (Vector Int)))
-> Either FilePath (Vector (Vector Int))
-> IO (Vector (Vector Int))
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (FranzException -> IO (Vector (Vector Int))
forall e a. Exception e => e -> IO a
throwIO (FranzException -> IO (Vector (Vector Int)))
-> (FilePath -> FranzException)
-> FilePath
-> IO (Vector (Vector Int))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FilePath -> FranzException
InternalError) Vector (Vector Int) -> IO (Vector (Vector Int))
forall (f :: * -> *) a. Applicative f => a -> f a
pure
    (Either FilePath (Vector (Vector Int)) -> IO (Vector (Vector Int)))
-> Either FilePath (Vector (Vector Int))
-> IO (Vector (Vector Int))
forall a b. (a -> b) -> a -> b
$ Get (Vector (Vector Int))
-> IndexName -> Either FilePath (Vector (Vector Int))
forall a. Get a -> IndexName -> Either FilePath a
runGet (Int -> Get (Vector Int) -> Get (Vector (Vector Int))
forall (m :: * -> *) a. Monad m => Int -> m a -> m (Vector a)
V.replicateM Int
count (Get (Vector Int) -> Get (Vector (Vector Int)))
-> Get (Vector Int) -> Get (Vector (Vector Int))
forall a b. (a -> b) -> a -> b
$ Int -> Get Int -> Get (Vector Int)
forall (m :: * -> *) a.
(Monad m, Unbox a) =>
Int -> m a -> m (Vector a)
U.replicateM Int
icount Get Int
getI) IndexName
initialOffsetsBS
  let initialOffsets :: IntMap Int
initialOffsets = [(Int, Int)] -> IntMap Int
forall a. [(Int, a)] -> IntMap a
IM.fromList ([(Int, Int)] -> IntMap Int) -> [(Int, Int)] -> IntMap Int
forall a b. (a -> b) -> a -> b
$ Vector (Int, Int) -> [(Int, Int)]
forall a. Vector a -> [a]
V.toList
        (Vector (Int, Int) -> [(Int, Int)])
-> Vector (Int, Int) -> [(Int, Int)]
forall a b. (a -> b) -> a -> b
$ Vector Int -> Vector Int -> Vector (Int, Int)
forall a b. Vector a -> Vector b -> Vector (a, b)
V.zip (Int -> Int -> Vector Int
forall a. Num a => a -> Int -> Vector a
V.enumFromN Int
0 Int
count) (Vector Int -> Vector (Int, Int))
-> Vector Int -> Vector (Int, Int)
forall a b. (a -> b) -> a -> b
$ (Vector Int -> Int) -> Vector (Vector Int) -> Vector Int
forall a b. (a -> b) -> Vector a -> Vector b
V.map Vector Int -> Int
forall a. Unbox a => Vector a -> a
U.head Vector (Vector Int)
initialIndices
  TVar (IntMap Int)
vOffsets <- IntMap Int -> IO (TVar (IntMap Int))
forall a. a -> IO (TVar a)
newTVarIO (IntMap Int -> IO (TVar (IntMap Int)))
-> IntMap Int -> IO (TVar (IntMap Int))
forall a b. (a -> b) -> a -> b
$! IntMap Int
initialOffsets
  TVar StreamStatus
vStatus <- StreamStatus -> IO (TVar StreamStatus)
forall a. a -> IO (TVar a)
newTVarIO StreamStatus
Outdated
  TVar Int
vCount <- Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO (Int -> IO (TVar Int)) -> Int -> IO (TVar Int)
forall a b. (a -> b) -> a -> b
$! IntMap Int -> Int
forall a. IntMap a -> Int
IM.size IntMap Int
initialOffsets
  IO ()
_ <- WatchManager -> FilePath -> ActionPredicate -> Action -> IO (IO ())
FS.watchDir WatchManager
man FilePath
path (\case
    FS.Modified FilePath
p UTCTime
_ Bool
_ | FilePath
p FilePath -> FilePath -> Bool
forall a. Eq a => a -> a -> Bool
== FilePath
offsetPath -> Bool
True
    FS.Removed FilePath
p UTCTime
_ Bool
_ | FilePath
p FilePath -> FilePath -> Bool
forall a. Eq a => a -> a -> Bool
== FilePath
offsetPath -> Bool
True
    Event
_ -> Bool
False)
    (Action -> IO (IO ())) -> Action -> IO (IO ())
forall a b. (a -> b) -> a -> b
$ \case
      FS.Modified FilePath
_ UTCTime
_ Bool
_ -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar StreamStatus -> StreamStatus -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar StreamStatus
vStatus StreamStatus
Outdated
      FS.Removed FilePath
_ UTCTime
_ Bool
_ -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar StreamStatus -> StreamStatus -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar StreamStatus
vStatus StreamStatus
Gone
      Event
_ -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

  [TVar (IntMap Int)]
vIndices <- [Int] -> (Int -> IO (TVar (IntMap Int))) -> IO [TVar (IntMap Int)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Int
1..Vector IndexName -> Int
forall a. Vector a -> Int
V.length Vector IndexName
indexNames] ((Int -> IO (TVar (IntMap Int))) -> IO [TVar (IntMap Int)])
-> (Int -> IO (TVar (IntMap Int))) -> IO [TVar (IntMap Int)]
forall a b. (a -> b) -> a -> b
$ \Int
i -> IntMap Int -> IO (TVar (IntMap Int))
forall a. a -> IO (TVar a)
newTVarIO
    (IntMap Int -> IO (TVar (IntMap Int)))
-> IntMap Int -> IO (TVar (IntMap Int))
forall a b. (a -> b) -> a -> b
$ [(Int, Int)] -> IntMap Int
forall a. [(Int, a)] -> IntMap a
IM.fromList ([(Int, Int)] -> IntMap Int) -> [(Int, Int)] -> IntMap Int
forall a b. (a -> b) -> a -> b
$ Vector (Int, Int) -> [(Int, Int)]
forall a. Vector a -> [a]
V.toList (Vector (Int, Int) -> [(Int, Int)])
-> Vector (Int, Int) -> [(Int, Int)]
forall a b. (a -> b) -> a -> b
$ Vector Int -> Vector Int -> Vector (Int, Int)
forall a b. Vector a -> Vector b -> Vector (a, b)
V.zip ((Vector Int -> Int) -> Vector (Vector Int) -> Vector Int
forall a b. (a -> b) -> Vector a -> Vector b
V.map (Vector Int -> Int -> Int
forall a. Unbox a => Vector a -> Int -> a
U.! Int
i) Vector (Vector Int)
initialIndices) (Int -> Int -> Vector Int
forall a. Num a => a -> Int -> Vector a
V.enumFromN Int
0 Int
count)

  Handle
indexHandle <- FilePath -> IOMode -> IO Handle
openFile FilePath
offsetPath IOMode
ReadMode

  let final :: Either SomeException () -> IO ()
      final :: Either SomeException () -> IO ()
final (Left SomeException
exc) | Just AsyncException
ThreadKilled <- SomeException -> Maybe AsyncException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
exc = () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
      final (Left SomeException
exc) = [FilePath] -> IO ()
logFollower [FilePath
path, FilePath
"terminated with", SomeException -> FilePath
forall a. Show a => a -> FilePath
show SomeException
exc]
      final (Right ()
_) = [FilePath] -> IO ()
logFollower [FilePath
path, FilePath
"has been removed"]

  -- TODO broadcast an exception if it exits?
  ThreadId
followThread <- (IO () -> (Either SomeException () -> IO ()) -> IO ThreadId)
-> (Either SomeException () -> IO ()) -> IO () -> IO ThreadId
forall a b c. (a -> b -> c) -> b -> a -> c
flip IO () -> (Either SomeException () -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally Either SomeException () -> IO ()
final (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    Maybe ((Int, Int), IntMap Int)
-> (((Int, Int), IntMap Int) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (IntMap Int -> Maybe ((Int, Int), IntMap Int)
forall a. IntMap a -> Maybe ((Int, a), IntMap a)
IM.maxViewWithKey IntMap Int
initialOffsets) ((((Int, Int), IntMap Int) -> IO ()) -> IO ())
-> (((Int, Int), IntMap Int) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \((Int
i, Int
_), IntMap Int
_) ->
      Handle -> SeekMode -> Integer -> IO ()
hSeek Handle
indexHandle SeekMode
AbsoluteSeek (Integer -> IO ()) -> Integer -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Integer
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Integer) -> Int -> Integer
forall a b. (a -> b) -> a -> b
$ Int -> Int
forall a. Enum a => a -> a
succ Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
icount Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
8
    (IO () -> IO ()) -> IO ()
forall a. (a -> a) -> a
fix ((IO () -> IO ()) -> IO ()) -> (IO () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \IO ()
self -> do
      IndexName
bs <- Handle -> Int -> IO IndexName
B.hGet Handle
indexHandle (Int
8 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
icount)
      if IndexName -> Bool
B.null IndexName
bs
        then do
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar StreamStatus -> (StreamStatus -> StreamStatus) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar StreamStatus
vStatus ((StreamStatus -> StreamStatus) -> STM ())
-> (StreamStatus -> StreamStatus) -> STM ()
forall a b. (a -> b) -> a -> b
$ \case
            StreamStatus
Outdated -> StreamStatus
CaughtUp
            StreamStatus
CaughtUp -> StreamStatus
CaughtUp
            StreamStatus
Gone -> StreamStatus
Gone
          Bool
continue <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ TVar StreamStatus -> STM StreamStatus
forall a. TVar a -> STM a
readTVar TVar StreamStatus
vStatus STM StreamStatus -> (StreamStatus -> STM Bool) -> STM Bool
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
            StreamStatus
CaughtUp -> STM Bool
forall a. STM a
retry
            StreamStatus
Outdated -> Bool -> STM Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True
            StreamStatus
Gone -> Bool -> STM Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False
          Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
continue IO ()
self
        else do
          Int
ofs : [Int]
indices <- (FilePath -> IO [Int])
-> ([Int] -> IO [Int]) -> Either FilePath [Int] -> IO [Int]
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (FranzException -> IO [Int]
forall e a. Exception e => e -> IO a
throwIO (FranzException -> IO [Int])
-> (FilePath -> FranzException) -> FilePath -> IO [Int]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FilePath -> FranzException
InternalError) [Int] -> IO [Int]
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either FilePath [Int] -> IO [Int])
-> Either FilePath [Int] -> IO [Int]
forall a b. (a -> b) -> a -> b
$ Get [Int] -> IndexName -> Either FilePath [Int]
forall a. Get a -> IndexName -> Either FilePath a
runGet (Int -> Get Int -> Get [Int]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
icount Get Int
getI) IndexName
bs
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            Int
i <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
vCount
            TVar (IntMap Int) -> (IntMap Int -> IntMap Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (IntMap Int)
vOffsets ((IntMap Int -> IntMap Int) -> STM ())
-> (IntMap Int -> IntMap Int) -> STM ()
forall a b. (a -> b) -> a -> b
$ Int -> Int -> IntMap Int -> IntMap Int
forall a. Int -> a -> IntMap a -> IntMap a
IM.insert Int
i Int
ofs
            [(TVar (IntMap Int), Int)]
-> ((TVar (IntMap Int), Int) -> STM ()) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([TVar (IntMap Int)] -> [Int] -> [(TVar (IntMap Int), Int)]
forall a b. [a] -> [b] -> [(a, b)]
zip [TVar (IntMap Int)]
vIndices [Int]
indices) (((TVar (IntMap Int), Int) -> STM ()) -> STM ())
-> ((TVar (IntMap Int), Int) -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \(TVar (IntMap Int)
v, Int
x) -> TVar (IntMap Int) -> (IntMap Int -> IntMap Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (IntMap Int)
v ((IntMap Int -> IntMap Int) -> STM ())
-> (IntMap Int -> IntMap Int) -> STM ()
forall a b. (a -> b) -> a -> b
$ Int -> Int -> IntMap Int -> IntMap Int
forall a. Int -> a -> IntMap a -> IntMap a
IM.insert (Int -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
x) Int
i
            TVar Int -> Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Int
vCount (Int -> STM ()) -> Int -> STM ()
forall a b. (a -> b) -> a -> b
$! Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
          IO ()
self

  let indices :: HashMap IndexName (TVar (IntMap Int))
indices = [(IndexName, TVar (IntMap Int))]
-> HashMap IndexName (TVar (IntMap Int))
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
HM.fromList ([(IndexName, TVar (IntMap Int))]
 -> HashMap IndexName (TVar (IntMap Int)))
-> [(IndexName, TVar (IntMap Int))]
-> HashMap IndexName (TVar (IntMap Int))
forall a b. (a -> b) -> a -> b
$ [IndexName]
-> [TVar (IntMap Int)] -> [(IndexName, TVar (IntMap Int))]
forall a b. [a] -> [b] -> [(a, b)]
zip (Vector IndexName -> [IndexName]
forall a. Vector a -> [a]
V.toList Vector IndexName
indexNames) [TVar (IntMap Int)]
vIndices

  TVar Activity
vActivity <- IO Double
getMonotonicTime IO Double -> (Double -> IO (TVar Activity)) -> IO (TVar Activity)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Activity -> IO (TVar Activity)
forall a. a -> IO (TVar a)
newTVarIO (Activity -> IO (TVar Activity))
-> (Double -> Activity) -> Double -> IO (TVar Activity)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Double -> Activity
forall a b. a -> Either a b
Left
  Stream -> IO Stream
forall (m :: * -> *) a. Monad m => a -> m a
return Stream :: FilePath
-> TVar (IntMap Int)
-> Vector IndexName
-> HashMap IndexName (TVar (IntMap Int))
-> TVar Int
-> TVar StreamStatus
-> ThreadId
-> Handle
-> Handle
-> TVar Activity
-> Stream
Stream{ streamPath :: FilePath
streamPath = FilePath
path, Handle
ThreadId
TVar Int
TVar Activity
TVar (IntMap Int)
TVar StreamStatus
HashMap IndexName (TVar (IntMap Int))
Vector IndexName
vActivity :: TVar Activity
indices :: HashMap IndexName (TVar (IntMap Int))
followThread :: ThreadId
indexHandle :: Handle
vCount :: TVar Int
vStatus :: TVar StreamStatus
vOffsets :: TVar (IntMap Int)
indexNames :: Vector IndexName
payloadHandle :: Handle
vActivity :: TVar Activity
payloadHandle :: Handle
indexHandle :: Handle
followThread :: ThreadId
vStatus :: TVar StreamStatus
vCount :: TVar Int
indices :: HashMap IndexName (TVar (IntMap Int))
indexNames :: Vector IndexName
vOffsets :: TVar (IntMap Int)
..}
  where
    logFollower :: [FilePath] -> IO ()
logFollower = Handle -> FilePath -> IO ()
hPutStrLn Handle
stderr (FilePath -> IO ())
-> ([FilePath] -> FilePath) -> [FilePath] -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [FilePath] -> FilePath
unwords ([FilePath] -> FilePath)
-> ([FilePath] -> [FilePath]) -> [FilePath] -> FilePath
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (:) FilePath
"[follower]"

type QueryResult = ((Int, Int) -- starting SeqNo, byte offset
    , (Int, Int)) -- ending SeqNo, byte offset

range :: Int -- ^ from
  -> Int -- ^ to
  -> RequestType
  -> IM.IntMap Int -- ^ offsets
  -> (Bool, QueryResult)
range :: Int -> Int -> RequestType -> IntMap Int -> (Bool, QueryResult)
range Int
begin Int
end RequestType
rt IntMap Int
allOffsets = case RequestType
rt of
    RequestType
AllItems -> (Bool
ready, ((Int, Int)
firstItem, (Int, Int)
-> (((Int, Int), IntMap Int) -> (Int, Int))
-> Maybe ((Int, Int), IntMap Int)
-> (Int, Int)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (Int, Int)
firstItem ((Int, Int), IntMap Int) -> (Int, Int)
forall a b. (a, b) -> a
fst (Maybe ((Int, Int), IntMap Int) -> (Int, Int))
-> Maybe ((Int, Int), IntMap Int) -> (Int, Int)
forall a b. (a -> b) -> a -> b
$ IntMap Int -> Maybe ((Int, Int), IntMap Int)
forall a. IntMap a -> Maybe ((Int, a), IntMap a)
IM.maxViewWithKey IntMap Int
body))
    RequestType
LastItem -> case IntMap Int -> Maybe ((Int, Int), IntMap Int)
forall a. IntMap a -> Maybe ((Int, a), IntMap a)
IM.maxViewWithKey IntMap Int
body of
      Maybe ((Int, Int), IntMap Int)
Nothing -> (Bool
False, ((Int, Int)
zero, (Int, Int)
zero))
      Just ((Int, Int)
ofs', IntMap Int
r) -> case IntMap Int -> Maybe ((Int, Int), IntMap Int)
forall a. IntMap a -> Maybe ((Int, a), IntMap a)
IM.maxViewWithKey (IntMap Int -> IntMap Int -> IntMap Int
forall a. IntMap a -> IntMap a -> IntMap a
IM.union IntMap Int
left IntMap Int
r) of
        Just ((Int, Int)
ofs, IntMap Int
_) -> (Bool
ready, ((Int, Int)
ofs, (Int, Int)
ofs'))
        Maybe ((Int, Int), IntMap Int)
Nothing -> (Bool
ready, ((Int, Int)
zero, (Int, Int)
ofs'))
  where
    zero :: (Int, Int)
zero = (-Int
1, Int
0)
    ready :: Bool
ready = Maybe Int -> Bool
forall a. Maybe a -> Bool
isJust Maybe Int
lastItem Bool -> Bool -> Bool
|| Bool -> Bool
not (IntMap Int -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null IntMap Int
cont)
    (IntMap Int
wing, Maybe Int
lastItem, IntMap Int
cont) = Int -> IntMap Int -> (IntMap Int, Maybe Int, IntMap Int)
forall a. Int -> IntMap a -> (IntMap a, Maybe a, IntMap a)
IM.splitLookup Int
end IntMap Int
allOffsets
    (IntMap Int
left, IntMap Int
body) = Int -> IntMap Int -> (IntMap Int, IntMap Int)
forall a. Int -> IntMap a -> (IntMap a, IntMap a)
splitR Int
begin (IntMap Int -> (IntMap Int, IntMap Int))
-> IntMap Int -> (IntMap Int, IntMap Int)
forall a b. (a -> b) -> a -> b
$ (IntMap Int -> IntMap Int)
-> (Int -> IntMap Int -> IntMap Int)
-> Maybe Int
-> IntMap Int
-> IntMap Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe IntMap Int -> IntMap Int
forall a. a -> a
id (Int -> Int -> IntMap Int -> IntMap Int
forall a. Int -> a -> IntMap a -> IntMap a
IM.insert Int
end) Maybe Int
lastItem IntMap Int
wing
    firstItem :: (Int, Int)
firstItem = (Int, Int)
-> (((Int, Int), IntMap Int) -> (Int, Int))
-> Maybe ((Int, Int), IntMap Int)
-> (Int, Int)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (Int, Int)
zero ((Int, Int), IntMap Int) -> (Int, Int)
forall a b. (a, b) -> a
fst (Maybe ((Int, Int), IntMap Int) -> (Int, Int))
-> Maybe ((Int, Int), IntMap Int) -> (Int, Int)
forall a b. (a -> b) -> a -> b
$ IntMap Int -> Maybe ((Int, Int), IntMap Int)
forall a. IntMap a -> Maybe ((Int, a), IntMap a)
IM.maxViewWithKey IntMap Int
left

splitR :: Int -> IM.IntMap a -> (IM.IntMap a, IM.IntMap a)
splitR :: Int -> IntMap a -> (IntMap a, IntMap a)
splitR Int
i IntMap a
m = let (IntMap a
l, Maybe a
p, IntMap a
r) = Int -> IntMap a -> (IntMap a, Maybe a, IntMap a)
forall a. Int -> IntMap a -> (IntMap a, Maybe a, IntMap a)
IM.splitLookup Int
i IntMap a
m in (IntMap a
l, (IntMap a -> IntMap a)
-> (a -> IntMap a -> IntMap a) -> Maybe a -> IntMap a -> IntMap a
forall b a. b -> (a -> b) -> Maybe a -> b
maybe IntMap a -> IntMap a
forall a. a -> a
id (Int -> a -> IntMap a -> IntMap a
forall a. Int -> a -> IntMap a -> IntMap a
IM.insert Int
i) Maybe a
p IntMap a
r)

data FranzReader = FranzReader
  { FranzReader -> WatchManager
watchManager :: FS.WatchManager
  , FranzReader
-> TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams :: TVar (HM.HashMap FranzDirectory (HM.HashMap StreamName Stream))
  }

data ReaperState = ReaperState
    { -- | How many streams we pruned.
      ReaperState -> Int
prunedStreams :: !Int
      -- | How many streams we saw in total.
    , ReaperState -> Int
totalStreams :: !Int
    }

reaper :: Double -- interval
  -> Double -- lifetime
  -> FranzReader -> IO ()
reaper :: Double -> Double -> FranzReader -> IO ()
reaper Double
int Double
life FranzReader{TVar (HashMap FranzDirectory (HashMap StreamName Stream))
WatchManager
vStreams :: TVar (HashMap FranzDirectory (HashMap StreamName Stream))
watchManager :: WatchManager
vStreams :: FranzReader
-> TVar (HashMap FranzDirectory (HashMap StreamName Stream))
watchManager :: FranzReader -> WatchManager
..} = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
  Double
now <- IO Double
getMonotonicTime
  -- Check if stream's activity indicates that we should prune it.
  let shouldPrune :: Either Double b -> Bool
shouldPrune (Left Double
t) = Double
now Double -> Double -> Double
forall a. Num a => a -> a -> a
- Double
t Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
>= Double
life
      shouldPrune Either Double b
_ = Bool
False

      -- Try prunning stream at given filepath. Checks if stream
      -- should really be pruned first.
      tryPrune :: FranzDirectory -> StreamName -> STM (Maybe Stream)
      tryPrune :: FranzDirectory -> StreamName -> STM (Maybe Stream)
tryPrune FranzDirectory
mPath StreamName
sPath = MaybeT STM Stream -> STM (Maybe Stream)
forall (m :: * -> *) a. MaybeT m a -> m (Maybe a)
runMaybeT (MaybeT STM Stream -> STM (Maybe Stream))
-> MaybeT STM Stream -> STM (Maybe Stream)
forall a b. (a -> b) -> a -> b
$ do
        HashMap FranzDirectory (HashMap StreamName Stream)
currentAllStreams <- STM (HashMap FranzDirectory (HashMap StreamName Stream))
-> MaybeT STM (HashMap FranzDirectory (HashMap StreamName Stream))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (STM (HashMap FranzDirectory (HashMap StreamName Stream))
 -> MaybeT STM (HashMap FranzDirectory (HashMap StreamName Stream)))
-> STM (HashMap FranzDirectory (HashMap StreamName Stream))
-> MaybeT STM (HashMap FranzDirectory (HashMap StreamName Stream))
forall a b. (a -> b) -> a -> b
$ TVar (HashMap FranzDirectory (HashMap StreamName Stream))
-> STM (HashMap FranzDirectory (HashMap StreamName Stream))
forall a. TVar a -> STM a
readTVar TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams
        HashMap StreamName Stream
currentStreams <- STM (Maybe (HashMap StreamName Stream))
-> MaybeT STM (HashMap StreamName Stream)
forall (m :: * -> *) a. m (Maybe a) -> MaybeT m a
MaybeT (STM (Maybe (HashMap StreamName Stream))
 -> MaybeT STM (HashMap StreamName Stream))
-> (Maybe (HashMap StreamName Stream)
    -> STM (Maybe (HashMap StreamName Stream)))
-> Maybe (HashMap StreamName Stream)
-> MaybeT STM (HashMap StreamName Stream)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe (HashMap StreamName Stream)
-> STM (Maybe (HashMap StreamName Stream))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (HashMap StreamName Stream)
 -> MaybeT STM (HashMap StreamName Stream))
-> Maybe (HashMap StreamName Stream)
-> MaybeT STM (HashMap StreamName Stream)
forall a b. (a -> b) -> a -> b
$ FranzDirectory
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> Maybe (HashMap StreamName Stream)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup FranzDirectory
mPath HashMap FranzDirectory (HashMap StreamName Stream)
currentAllStreams
        Stream
currentStream <- STM (Maybe Stream) -> MaybeT STM Stream
forall (m :: * -> *) a. m (Maybe a) -> MaybeT m a
MaybeT (STM (Maybe Stream) -> MaybeT STM Stream)
-> (Maybe Stream -> STM (Maybe Stream))
-> Maybe Stream
-> MaybeT STM Stream
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe Stream -> STM (Maybe Stream)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe Stream -> MaybeT STM Stream)
-> Maybe Stream -> MaybeT STM Stream
forall a b. (a -> b) -> a -> b
$ StreamName -> HashMap StreamName Stream -> Maybe Stream
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup StreamName
sPath HashMap StreamName Stream
currentStreams
        Activity
currentAct <- STM Activity -> MaybeT STM Activity
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (STM Activity -> MaybeT STM Activity)
-> STM Activity -> MaybeT STM Activity
forall a b. (a -> b) -> a -> b
$ TVar Activity -> STM Activity
forall a. TVar a -> STM a
readTVar (Stream -> TVar Activity
vActivity Stream
currentStream)
        Bool -> MaybeT STM ()
forall (f :: * -> *). Alternative f => Bool -> f ()
guard (Bool -> MaybeT STM ()) -> Bool -> MaybeT STM ()
forall a b. (a -> b) -> a -> b
$ Activity -> Bool
forall b. Either Double b -> Bool
shouldPrune Activity
currentAct
        let newStreams :: HashMap StreamName Stream
newStreams = StreamName
-> HashMap StreamName Stream -> HashMap StreamName Stream
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> HashMap k v
HM.delete StreamName
sPath HashMap StreamName Stream
currentStreams
        STM () -> MaybeT STM ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (STM () -> MaybeT STM ())
-> (HashMap FranzDirectory (HashMap StreamName Stream) -> STM ())
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> MaybeT STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TVar (HashMap FranzDirectory (HashMap StreamName Stream))
-> HashMap FranzDirectory (HashMap StreamName Stream) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams (HashMap FranzDirectory (HashMap StreamName Stream)
 -> MaybeT STM ())
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> MaybeT STM ()
forall a b. (a -> b) -> a -> b
$ if HashMap StreamName Stream -> Bool
forall k v. HashMap k v -> Bool
HM.null HashMap StreamName Stream
newStreams
          -- Stream we're deleting was the
          -- last one around.
          then FranzDirectory
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> HashMap FranzDirectory (HashMap StreamName Stream)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> HashMap k v
HM.delete FranzDirectory
mPath HashMap FranzDirectory (HashMap StreamName Stream)
currentAllStreams
          -- Still have some other streams left for this mount path.
          -- Keep those only.
          else FranzDirectory
-> HashMap StreamName Stream
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> HashMap FranzDirectory (HashMap StreamName Stream)
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
HM.insert FranzDirectory
mPath HashMap StreamName Stream
newStreams HashMap FranzDirectory (HashMap StreamName Stream)
currentAllStreams
        Stream -> MaybeT STM Stream
forall (f :: * -> *) a. Applicative f => a -> f a
pure Stream
currentStream

  -- Take a snapshots of all streams.
  HashMap FranzDirectory (HashMap StreamName Stream)
allStreams <- TVar (HashMap FranzDirectory (HashMap StreamName Stream))
-> IO (HashMap FranzDirectory (HashMap StreamName Stream))
forall a. TVar a -> IO a
readTVarIO TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams
  -- Traverse the snapshot, looking for streams that currently seem
  -- out of date. If we find an out-of-date stream, take outer lock
  -- too, check again, delete it if necessary. Close stream promptly
  -- after deletion.
  --
  -- We could first traverse the whole snapshot, gather potential
  -- streams for deletion, take lock and delete all these streams from
  -- the map. However, on an assumption that we normally reaps streams
  -- and much lower rate than we use them and in favour of locking at
  -- as small time intervals as possible, we simply traverse the
  -- snapshot and if we find an out of date stream in the snapshot, we
  -- take the lock on the whole stream data structure and delete the
  -- stream. We also check that we aren't leaving an empty map entry
  -- behind and if we are, we delete it straight away. This stops us
  -- from having to traverse the whole map later to clean things up as
  -- well as ensuring that we never leave empty values in the map for
  -- others to see.
  --
  -- While doing all this, keep track of how many streams we saw and
  -- how many we have closed which is used later to report statistics
  -- to the user: this is in contrast to doing linear-time traversals
  -- just to get counts of things we've already traversed.
  ReaperState
stats <- (StateT ReaperState IO () -> ReaperState -> IO ReaperState)
-> ReaperState -> StateT ReaperState IO () -> IO ReaperState
forall a b c. (a -> b -> c) -> b -> a -> c
flip StateT ReaperState IO () -> ReaperState -> IO ReaperState
forall (m :: * -> *) s a. Monad m => StateT s m a -> s -> m s
execStateT (Int -> Int -> ReaperState
ReaperState Int
0 Int
0) (StateT ReaperState IO () -> IO ReaperState)
-> StateT ReaperState IO () -> IO ReaperState
forall a b. (a -> b) -> a -> b
$
    [(FranzDirectory, HashMap StreamName Stream)]
-> ((FranzDirectory, HashMap StreamName Stream)
    -> StateT ReaperState IO ())
-> StateT ReaperState IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (HashMap FranzDirectory (HashMap StreamName Stream)
-> [(FranzDirectory, HashMap StreamName Stream)]
forall k v. HashMap k v -> [(k, v)]
HM.toList HashMap FranzDirectory (HashMap StreamName Stream)
allStreams) (((FranzDirectory, HashMap StreamName Stream)
  -> StateT ReaperState IO ())
 -> StateT ReaperState IO ())
-> ((FranzDirectory, HashMap StreamName Stream)
    -> StateT ReaperState IO ())
-> StateT ReaperState IO ()
forall a b. (a -> b) -> a -> b
$ \(FranzDirectory
mPath, HashMap StreamName Stream
streams) ->
      [(StreamName, Stream)]
-> ((StreamName, Stream) -> StateT ReaperState IO ())
-> StateT ReaperState IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (HashMap StreamName Stream -> [(StreamName, Stream)]
forall k v. HashMap k v -> [(k, v)]
HM.toList HashMap StreamName Stream
streams) (((StreamName, Stream) -> StateT ReaperState IO ())
 -> StateT ReaperState IO ())
-> ((StreamName, Stream) -> StateT ReaperState IO ())
-> StateT ReaperState IO ()
forall a b. (a -> b) -> a -> b
$ \(StreamName
sPath, Stream
stream) -> do
        (ReaperState -> ReaperState) -> StateT ReaperState IO ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((ReaperState -> ReaperState) -> StateT ReaperState IO ())
-> (ReaperState -> ReaperState) -> StateT ReaperState IO ()
forall a b. (a -> b) -> a -> b
$ \ReaperState
s -> ReaperState
s { totalStreams :: Int
totalStreams = ReaperState -> Int
totalStreams ReaperState
s Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 }
        Activity
snapAct <- IO Activity -> StateT ReaperState IO Activity
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO Activity -> StateT ReaperState IO Activity)
-> IO Activity -> StateT ReaperState IO Activity
forall a b. (a -> b) -> a -> b
$ TVar Activity -> IO Activity
forall a. TVar a -> IO a
readTVarIO (Stream -> TVar Activity
vActivity Stream
stream)
        Bool -> StateT ReaperState IO () -> StateT ReaperState IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Activity -> Bool
forall b. Either Double b -> Bool
shouldPrune Activity
snapAct) (StateT ReaperState IO () -> StateT ReaperState IO ())
-> StateT ReaperState IO () -> StateT ReaperState IO ()
forall a b. (a -> b) -> a -> b
$ do
          -- The stream indicates that it's old and should be pruned. Take
          -- a lock on the outer map, check again inside a transaction and
          -- amend the maps as necessary.
          Maybe Stream
deletedStream'm <- IO (Maybe Stream) -> StateT ReaperState IO (Maybe Stream)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO (Maybe Stream) -> StateT ReaperState IO (Maybe Stream))
-> (STM (Maybe Stream) -> IO (Maybe Stream))
-> STM (Maybe Stream)
-> StateT ReaperState IO (Maybe Stream)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (Maybe Stream) -> IO (Maybe Stream)
forall a. STM a -> IO a
atomically (STM (Maybe Stream) -> StateT ReaperState IO (Maybe Stream))
-> STM (Maybe Stream) -> StateT ReaperState IO (Maybe Stream)
forall a b. (a -> b) -> a -> b
$ FranzDirectory -> StreamName -> STM (Maybe Stream)
tryPrune FranzDirectory
mPath StreamName
sPath
          Maybe Stream
-> (Stream -> StateT ReaperState IO ()) -> StateT ReaperState IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe Stream
deletedStream'm ((Stream -> StateT ReaperState IO ()) -> StateT ReaperState IO ())
-> (Stream -> StateT ReaperState IO ()) -> StateT ReaperState IO ()
forall a b. (a -> b) -> a -> b
$ \Stream
prunedStream -> do
            IO () -> StateT ReaperState IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO () -> StateT ReaperState IO ())
-> IO () -> StateT ReaperState IO ()
forall a b. (a -> b) -> a -> b
$ Stream -> IO ()
closeStream Stream
prunedStream
            (ReaperState -> ReaperState) -> StateT ReaperState IO ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((ReaperState -> ReaperState) -> StateT ReaperState IO ())
-> (ReaperState -> ReaperState) -> StateT ReaperState IO ()
forall a b. (a -> b) -> a -> b
$ \ReaperState
s -> ReaperState
s { prunedStreams :: Int
prunedStreams = ReaperState -> Int
prunedStreams ReaperState
s Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 }

  Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ReaperState -> Int
prunedStreams ReaperState
stats Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Handle -> FilePath -> IO ()
hPutStrLn Handle
stderr (FilePath -> IO ()) -> FilePath -> IO ()
forall a b. (a -> b) -> a -> b
$ [FilePath] -> FilePath
unwords
    [ FilePath
"[reaper] closed"
    , Int -> FilePath
forall a. Show a => a -> FilePath
show (ReaperState -> Int
prunedStreams ReaperState
stats)
    , FilePath
"out of"
    , Int -> FilePath
forall a. Show a => a -> FilePath
show (ReaperState -> Int
totalStreams ReaperState
stats)
    ]

  Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Double -> Int
forall a b. (RealFrac a, Integral b) => a -> b
floor (Double -> Int) -> Double -> Int
forall a b. (a -> b) -> a -> b
$ Double
int Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1e6

withFranzReader :: (FranzReader -> IO ()) -> IO ()
withFranzReader :: (FranzReader -> IO ()) -> IO ()
withFranzReader = IO FranzReader
-> (FranzReader -> IO ()) -> (FranzReader -> IO ()) -> IO ()
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket IO FranzReader
newFranzReader FranzReader -> IO ()
closeFranzReader

newFranzReader :: IO FranzReader
newFranzReader :: IO FranzReader
newFranzReader = do
  TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams <- HashMap FranzDirectory (HashMap StreamName Stream)
-> IO (TVar (HashMap FranzDirectory (HashMap StreamName Stream)))
forall a. a -> IO (TVar a)
newTVarIO HashMap FranzDirectory (HashMap StreamName Stream)
forall k v. HashMap k v
HM.empty
  WatchManager
watchManager <- IO WatchManager
FS.startManager
  FranzReader -> IO FranzReader
forall (f :: * -> *) a. Applicative f => a -> f a
pure FranzReader :: WatchManager
-> TVar (HashMap FranzDirectory (HashMap StreamName Stream))
-> FranzReader
FranzReader{TVar (HashMap FranzDirectory (HashMap StreamName Stream))
WatchManager
watchManager :: WatchManager
vStreams :: TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams :: TVar (HashMap FranzDirectory (HashMap StreamName Stream))
watchManager :: WatchManager
..}

closeFranzReader :: FranzReader -> IO ()
closeFranzReader :: FranzReader -> IO ()
closeFranzReader FranzReader{TVar (HashMap FranzDirectory (HashMap StreamName Stream))
WatchManager
vStreams :: TVar (HashMap FranzDirectory (HashMap StreamName Stream))
watchManager :: WatchManager
vStreams :: FranzReader
-> TVar (HashMap FranzDirectory (HashMap StreamName Stream))
watchManager :: FranzReader -> WatchManager
..} = do
  WatchManager -> IO ()
FS.stopManager WatchManager
watchManager
  TVar (HashMap FranzDirectory (HashMap StreamName Stream))
-> IO (HashMap FranzDirectory (HashMap StreamName Stream))
forall a. TVar a -> IO a
readTVarIO TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams IO (HashMap FranzDirectory (HashMap StreamName Stream))
-> (HashMap FranzDirectory (HashMap StreamName Stream) -> IO ())
-> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (HashMap StreamName Stream -> IO ())
-> HashMap FranzDirectory (HashMap StreamName Stream) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ((Stream -> IO ()) -> HashMap StreamName Stream -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Stream -> IO ()
closeStream)

-- | Globally-configured path which contains franz directories.
newtype FranzPrefix = FranzPrefix { FranzPrefix -> FilePath
unFranzPrefix :: FilePath } deriving (FranzPrefix -> FranzPrefix -> Bool
(FranzPrefix -> FranzPrefix -> Bool)
-> (FranzPrefix -> FranzPrefix -> Bool) -> Eq FranzPrefix
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: FranzPrefix -> FranzPrefix -> Bool
$c/= :: FranzPrefix -> FranzPrefix -> Bool
== :: FranzPrefix -> FranzPrefix -> Bool
$c== :: FranzPrefix -> FranzPrefix -> Bool
Eq, Eq FranzPrefix
Eq FranzPrefix
-> (Int -> FranzPrefix -> Int)
-> (FranzPrefix -> Int)
-> Hashable FranzPrefix
Int -> FranzPrefix -> Int
FranzPrefix -> Int
forall a. Eq a -> (Int -> a -> Int) -> (a -> Int) -> Hashable a
hash :: FranzPrefix -> Int
$chash :: FranzPrefix -> Int
hashWithSalt :: Int -> FranzPrefix -> Int
$chashWithSalt :: Int -> FranzPrefix -> Int
$cp1Hashable :: Eq FranzPrefix
Hashable)

-- | Directory which contains franz streams.
-- Values of this type serve two purposes:
--
-- * Arbitrary prefix so that clients don't have to specify the full path 
newtype FranzDirectory = FranzDirectory FilePath deriving (FranzDirectory -> FranzDirectory -> Bool
(FranzDirectory -> FranzDirectory -> Bool)
-> (FranzDirectory -> FranzDirectory -> Bool) -> Eq FranzDirectory
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: FranzDirectory -> FranzDirectory -> Bool
$c/= :: FranzDirectory -> FranzDirectory -> Bool
== :: FranzDirectory -> FranzDirectory -> Bool
$c== :: FranzDirectory -> FranzDirectory -> Bool
Eq, Eq FranzDirectory
Eq FranzDirectory
-> (Int -> FranzDirectory -> Int)
-> (FranzDirectory -> Int)
-> Hashable FranzDirectory
Int -> FranzDirectory -> Int
FranzDirectory -> Int
forall a. Eq a -> (Int -> a -> Int) -> (a -> Int) -> Hashable a
hash :: FranzDirectory -> Int
$chash :: FranzDirectory -> Int
hashWithSalt :: Int -> FranzDirectory -> Int
$chashWithSalt :: Int -> FranzDirectory -> Int
$cp1Hashable :: Eq FranzDirectory
Hashable)

getFranzDirectory :: FranzPrefix -> FranzDirectory -> FilePath
getFranzDirectory :: FranzPrefix -> FranzDirectory -> FilePath
getFranzDirectory (FranzPrefix FilePath
prefix) (FranzDirectory FilePath
dir) = FilePath
prefix FilePath -> FilePath -> FilePath
</> FilePath
dir

getFranzStreamPath :: FranzPrefix -> FranzDirectory -> StreamName -> FilePath
getFranzStreamPath :: FranzPrefix -> FranzDirectory -> StreamName -> FilePath
getFranzStreamPath FranzPrefix
prefix FranzDirectory
dir StreamName
name
  = FranzPrefix -> FranzDirectory -> FilePath
getFranzDirectory FranzPrefix
prefix FranzDirectory
dir FilePath -> FilePath -> FilePath
</> StreamName -> FilePath
streamNameToPath StreamName
name

handleQuery :: FranzPrefix
  -> FranzReader
  -> FranzDirectory
  -> Query
  -> (FranzException -> IO r)
  -> (Stream -> STM (Bool, QueryResult) -> IO r) -> IO r
handleQuery :: FranzPrefix
-> FranzReader
-> FranzDirectory
-> Query
-> (FranzException -> IO r)
-> (Stream -> STM (Bool, QueryResult) -> IO r)
-> IO r
handleQuery FranzPrefix
prefix FranzReader{TVar (HashMap FranzDirectory (HashMap StreamName Stream))
WatchManager
vStreams :: TVar (HashMap FranzDirectory (HashMap StreamName Stream))
watchManager :: WatchManager
vStreams :: FranzReader
-> TVar (HashMap FranzDirectory (HashMap StreamName Stream))
watchManager :: FranzReader -> WatchManager
..} FranzDirectory
dir (Query StreamName
name ItemRef
begin_ ItemRef
end_ RequestType
rt) FranzException -> IO r
onError Stream -> STM (Bool, QueryResult) -> IO r
cont
  = IO (Either FranzException Stream)
-> (Either FranzException Stream -> IO ())
-> (Either FranzException Stream -> IO r)
-> IO r
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (IO Stream -> IO (Either FranzException Stream)
forall e a. Exception e => IO a -> IO (Either e a)
try IO Stream
acquire) ((FranzException -> IO ())
-> (Stream -> IO ()) -> Either FranzException Stream -> IO ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either FranzException -> IO ()
forall a. Monoid a => a
mempty Stream -> IO ()
removeActivity) ((Either FranzException Stream -> IO r) -> IO r)
-> (Either FranzException Stream -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \case
  Left FranzException
err -> FranzException -> IO r
onError FranzException
err
  Right stream :: Stream
stream@Stream{FilePath
Handle
ThreadId
TVar Int
TVar Activity
TVar (IntMap Int)
TVar StreamStatus
HashMap IndexName (TVar (IntMap Int))
Vector IndexName
vActivity :: TVar Activity
payloadHandle :: Handle
indexHandle :: Handle
followThread :: ThreadId
vStatus :: TVar StreamStatus
vCount :: TVar Int
indices :: HashMap IndexName (TVar (IntMap Int))
indexNames :: Vector IndexName
vOffsets :: TVar (IntMap Int)
streamPath :: FilePath
vActivity :: Stream -> TVar Activity
payloadHandle :: Stream -> Handle
indexHandle :: Stream -> Handle
followThread :: Stream -> ThreadId
vStatus :: Stream -> TVar StreamStatus
vCount :: Stream -> TVar Int
indices :: Stream -> HashMap IndexName (TVar (IntMap Int))
indexNames :: Stream -> Vector IndexName
vOffsets :: Stream -> TVar (IntMap Int)
streamPath :: Stream -> FilePath
..} -> Stream -> STM (Bool, QueryResult) -> IO r
cont Stream
stream (STM (Bool, QueryResult) -> IO r)
-> STM (Bool, QueryResult) -> IO r
forall a b. (a -> b) -> a -> b
$ do
    TVar StreamStatus -> STM StreamStatus
forall a. TVar a -> STM a
readTVar TVar StreamStatus
vStatus STM StreamStatus -> (StreamStatus -> STM ()) -> STM ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        StreamStatus
Outdated -> STM ()
forall a. STM a
retry
        StreamStatus
CaughtUp -> () -> STM ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        StreamStatus
Gone -> FranzException -> STM ()
forall e a. Exception e => e -> STM a
throwSTM (FranzException -> STM ()) -> FranzException -> STM ()
forall a b. (a -> b) -> a -> b
$ FilePath -> FranzException
StreamNotFound FilePath
streamPath
    IntMap Int
allOffsets <- TVar (IntMap Int) -> STM (IntMap Int)
forall a. TVar a -> STM a
readTVar TVar (IntMap Int)
vOffsets
    let finalOffset :: Int
finalOffset = case IntMap Int -> Maybe ((Int, Int), IntMap Int)
forall a. IntMap a -> Maybe ((Int, a), IntMap a)
IM.maxViewWithKey IntMap Int
allOffsets of
          Just ((Int
k, Int
_), IntMap Int
_) -> Int
k Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
          Maybe ((Int, Int), IntMap Int)
Nothing -> Int
0
    let rotate :: Int -> Int
rotate Int
i
          | Int
i Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
0 = Int
finalOffset Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
i
          | Bool
otherwise = Int
i
    Int
begin <- case ItemRef
begin_ of
      BySeqNum Int
i -> Int -> STM Int
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> STM Int) -> Int -> STM Int
forall a b. (a -> b) -> a -> b
$ Int -> Int
rotate Int
i
      ByIndex IndexName
index Int
val -> case IndexName
-> HashMap IndexName (TVar (IntMap Int))
-> Maybe (TVar (IntMap Int))
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup IndexName
index HashMap IndexName (TVar (IntMap Int))
indices of
        Maybe (TVar (IntMap Int))
Nothing -> FranzException -> STM Int
forall e a. Exception e => e -> STM a
throwSTM (FranzException -> STM Int) -> FranzException -> STM Int
forall a b. (a -> b) -> a -> b
$ IndexName -> [IndexName] -> FranzException
IndexNotFound IndexName
index ([IndexName] -> FranzException) -> [IndexName] -> FranzException
forall a b. (a -> b) -> a -> b
$ HashMap IndexName (TVar (IntMap Int)) -> [IndexName]
forall k v. HashMap k v -> [k]
HM.keys HashMap IndexName (TVar (IntMap Int))
indices
        Just TVar (IntMap Int)
v -> do
          IntMap Int
m <- TVar (IntMap Int) -> STM (IntMap Int)
forall a. TVar a -> STM a
readTVar TVar (IntMap Int)
v
          let (IntMap Int
_, IntMap Int
wing) = Int -> IntMap Int -> (IntMap Int, IntMap Int)
forall a. Int -> IntMap a -> (IntMap a, IntMap a)
splitR Int
val IntMap Int
m
          Int -> STM Int
forall (m :: * -> *) a. Monad m => a -> m a
return (Int -> STM Int) -> Int -> STM Int
forall a b. (a -> b) -> a -> b
$! Int -> ((Int, IntMap Int) -> Int) -> Maybe (Int, IntMap Int) -> Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int
forall a. Bounded a => a
maxBound (Int, IntMap Int) -> Int
forall a b. (a, b) -> a
fst (Maybe (Int, IntMap Int) -> Int) -> Maybe (Int, IntMap Int) -> Int
forall a b. (a -> b) -> a -> b
$ IntMap Int -> Maybe (Int, IntMap Int)
forall a. IntMap a -> Maybe (a, IntMap a)
IM.minView IntMap Int
wing
    Int
end <- case ItemRef
end_ of
      BySeqNum Int
i -> Int -> STM Int
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> STM Int) -> Int -> STM Int
forall a b. (a -> b) -> a -> b
$ Int -> Int
rotate Int
i
      ByIndex IndexName
index Int
val -> case IndexName
-> HashMap IndexName (TVar (IntMap Int))
-> Maybe (TVar (IntMap Int))
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup IndexName
index HashMap IndexName (TVar (IntMap Int))
indices of
        Maybe (TVar (IntMap Int))
Nothing -> FranzException -> STM Int
forall e a. Exception e => e -> STM a
throwSTM (FranzException -> STM Int) -> FranzException -> STM Int
forall a b. (a -> b) -> a -> b
$ IndexName -> [IndexName] -> FranzException
IndexNotFound IndexName
index ([IndexName] -> FranzException) -> [IndexName] -> FranzException
forall a b. (a -> b) -> a -> b
$ HashMap IndexName (TVar (IntMap Int)) -> [IndexName]
forall k v. HashMap k v -> [k]
HM.keys HashMap IndexName (TVar (IntMap Int))
indices
        Just TVar (IntMap Int)
v -> do
          IntMap Int
m <- TVar (IntMap Int) -> STM (IntMap Int)
forall a. TVar a -> STM a
readTVar TVar (IntMap Int)
v
          let (IntMap Int
body, Maybe Int
lastItem, IntMap Int
_) = Int -> IntMap Int -> (IntMap Int, Maybe Int, IntMap Int)
forall a. Int -> IntMap a -> (IntMap a, Maybe a, IntMap a)
IM.splitLookup Int
val IntMap Int
m
          let body' :: IntMap Int
body' = (IntMap Int -> IntMap Int)
-> (Int -> IntMap Int -> IntMap Int)
-> Maybe Int
-> IntMap Int
-> IntMap Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe IntMap Int -> IntMap Int
forall a. a -> a
id (Int -> Int -> IntMap Int -> IntMap Int
forall a. Int -> a -> IntMap a -> IntMap a
IM.insert Int
val) Maybe Int
lastItem IntMap Int
body
          Int -> STM Int
forall (m :: * -> *) a. Monad m => a -> m a
return (Int -> STM Int) -> Int -> STM Int
forall a b. (a -> b) -> a -> b
$! Int -> ((Int, IntMap Int) -> Int) -> Maybe (Int, IntMap Int) -> Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int
forall a. Bounded a => a
minBound (Int, IntMap Int) -> Int
forall a b. (a, b) -> a
fst (Maybe (Int, IntMap Int) -> Int) -> Maybe (Int, IntMap Int) -> Int
forall a b. (a -> b) -> a -> b
$ IntMap Int -> Maybe (Int, IntMap Int)
forall a. IntMap a -> Maybe (a, IntMap a)
IM.maxView IntMap Int
body'
    (Bool, QueryResult) -> STM (Bool, QueryResult)
forall (m :: * -> *) a. Monad m => a -> m a
return ((Bool, QueryResult) -> STM (Bool, QueryResult))
-> (Bool, QueryResult) -> STM (Bool, QueryResult)
forall a b. (a -> b) -> a -> b
$! Int -> Int -> RequestType -> IntMap Int -> (Bool, QueryResult)
range Int
begin Int
end RequestType
rt IntMap Int
allOffsets
  where
    acquire :: IO Stream
acquire = IO (IO Stream) -> IO Stream
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO Stream) -> IO Stream) -> IO (IO Stream) -> IO Stream
forall a b. (a -> b) -> a -> b
$ STM (IO Stream) -> IO (IO Stream)
forall a. STM a -> IO a
atomically (STM (IO Stream) -> IO (IO Stream))
-> STM (IO Stream) -> IO (IO Stream)
forall a b. (a -> b) -> a -> b
$ do
      HashMap FranzDirectory (HashMap StreamName Stream)
allStreams <- TVar (HashMap FranzDirectory (HashMap StreamName Stream))
-> STM (HashMap FranzDirectory (HashMap StreamName Stream))
forall a. TVar a -> STM a
readTVar TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams
      let !streams :: HashMap StreamName Stream
streams = HashMap StreamName Stream
-> (HashMap StreamName Stream -> HashMap StreamName Stream)
-> Maybe (HashMap StreamName Stream)
-> HashMap StreamName Stream
forall b a. b -> (a -> b) -> Maybe a -> b
maybe HashMap StreamName Stream
forall a. Monoid a => a
mempty HashMap StreamName Stream -> HashMap StreamName Stream
forall a. a -> a
id (Maybe (HashMap StreamName Stream) -> HashMap StreamName Stream)
-> Maybe (HashMap StreamName Stream) -> HashMap StreamName Stream
forall a b. (a -> b) -> a -> b
$ FranzDirectory
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> Maybe (HashMap StreamName Stream)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup FranzDirectory
dir HashMap FranzDirectory (HashMap StreamName Stream)
allStreams
      case StreamName -> HashMap StreamName Stream -> Maybe Stream
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup StreamName
name HashMap StreamName Stream
streams of
        Maybe Stream
Nothing -> IO Stream -> STM (IO Stream)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (IO Stream -> STM (IO Stream)) -> IO Stream -> STM (IO Stream)
forall a b. (a -> b) -> a -> b
$ IO Stream
-> (Stream -> IO ()) -> (Stream -> IO Stream) -> IO Stream
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracketOnError
          (WatchManager -> FilePath -> IO Stream
createStream WatchManager
watchManager (FilePath -> IO Stream) -> FilePath -> IO Stream
forall a b. (a -> b) -> a -> b
$ FranzPrefix -> FranzDirectory -> StreamName -> FilePath
getFranzStreamPath FranzPrefix
prefix FranzDirectory
dir StreamName
name)
          Stream -> IO ()
closeStream ((Stream -> IO Stream) -> IO Stream)
-> (Stream -> IO Stream) -> IO Stream
forall a b. (a -> b) -> a -> b
$ \Stream
s -> STM Stream -> IO Stream
forall a. STM a -> IO a
atomically (STM Stream -> IO Stream) -> STM Stream -> IO Stream
forall a b. (a -> b) -> a -> b
$ do
            Stream -> STM ()
addActivity Stream
s
            TVar (HashMap FranzDirectory (HashMap StreamName Stream))
-> (HashMap FranzDirectory (HashMap StreamName Stream)
    -> HashMap FranzDirectory (HashMap StreamName Stream))
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams ((HashMap FranzDirectory (HashMap StreamName Stream)
  -> HashMap FranzDirectory (HashMap StreamName Stream))
 -> STM ())
-> (HashMap FranzDirectory (HashMap StreamName Stream)
    -> HashMap FranzDirectory (HashMap StreamName Stream))
-> STM ()
forall a b. (a -> b) -> a -> b
$ FranzDirectory
-> HashMap StreamName Stream
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> HashMap FranzDirectory (HashMap StreamName Stream)
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
HM.insert FranzDirectory
dir (HashMap StreamName Stream
 -> HashMap FranzDirectory (HashMap StreamName Stream)
 -> HashMap FranzDirectory (HashMap StreamName Stream))
-> HashMap StreamName Stream
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> HashMap FranzDirectory (HashMap StreamName Stream)
forall a b. (a -> b) -> a -> b
$ StreamName
-> Stream -> HashMap StreamName Stream -> HashMap StreamName Stream
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
HM.insert StreamName
name Stream
s HashMap StreamName Stream
streams
            Stream -> STM Stream
forall (f :: * -> *) a. Applicative f => a -> f a
pure Stream
s
        Just Stream
s -> do
          Stream -> STM ()
addActivity Stream
s
          IO Stream -> STM (IO Stream)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Stream -> IO Stream
forall (f :: * -> *) a. Applicative f => a -> f a
pure Stream
s)