{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE GeneralisedNewtypeDeriving #-}
module Database.Franz.Internal.Reader where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.State.Strict
import Control.Monad.Trans.Maybe
import Data.Hashable (Hashable)
import Data.Serialize
import Database.Franz.Internal.Protocol
import qualified Data.ByteString.Char8 as B
import qualified Data.HashMap.Strict as HM
import qualified Data.IntMap.Strict as IM
import qualified Data.Vector.Unboxed as U
import qualified Data.Vector as V
import Data.Maybe (isJust)
import GHC.Clock (getMonotonicTime)
import System.Directory
import System.FilePath
import System.IO
import qualified System.FSNotify as FS
data StreamStatus = CaughtUp | Outdated | Gone deriving StreamStatus -> StreamStatus -> Bool
(StreamStatus -> StreamStatus -> Bool)
-> (StreamStatus -> StreamStatus -> Bool) -> Eq StreamStatus
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: StreamStatus -> StreamStatus -> Bool
$c/= :: StreamStatus -> StreamStatus -> Bool
== :: StreamStatus -> StreamStatus -> Bool
$c== :: StreamStatus -> StreamStatus -> Bool
Eq
data Stream = Stream
{ Stream -> FilePath
streamPath :: FilePath
, Stream -> TVar (IntMap Int)
vOffsets :: !(TVar (IM.IntMap Int))
, Stream -> Vector IndexName
indexNames :: !(V.Vector IndexName)
, Stream -> HashMap IndexName (TVar (IntMap Int))
indices :: !(HM.HashMap IndexName (TVar (IM.IntMap Int)))
, Stream -> TVar Int
vCount :: !(TVar Int)
, Stream -> TVar StreamStatus
vStatus :: !(TVar StreamStatus)
, Stream -> ThreadId
followThread :: !ThreadId
, Stream -> Handle
indexHandle :: !Handle
, Stream -> Handle
payloadHandle :: !Handle
, Stream -> TVar Activity
vActivity :: !(TVar Activity)
}
type Activity = Either Double Int
addActivity :: Stream -> STM ()
addActivity :: Stream -> STM ()
addActivity Stream
str = TVar Activity -> (Activity -> Activity) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (Stream -> TVar Activity
vActivity Stream
str) ((Activity -> Activity) -> STM ())
-> (Activity -> Activity) -> STM ()
forall a b. (a -> b) -> a -> b
$ \case
Left Double
_ -> Int -> Activity
forall a b. b -> Either a b
Right Int
0
Right Int
n -> Int -> Activity
forall a b. b -> Either a b
Right (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
removeActivity :: Stream -> IO ()
removeActivity :: Stream -> IO ()
removeActivity Stream
str = do
Double
now <- IO Double
getMonotonicTime
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Activity -> (Activity -> Activity) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (Stream -> TVar Activity
vActivity Stream
str) ((Activity -> Activity) -> STM ())
-> (Activity -> Activity) -> STM ()
forall a b. (a -> b) -> a -> b
$ \case
Left Double
_ -> Double -> Activity
forall a b. a -> Either a b
Left Double
now
Right Int
n
| Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 -> Double -> Activity
forall a b. a -> Either a b
Left Double
now
| Bool
otherwise -> Int -> Activity
forall a b. b -> Either a b
Right (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)
closeStream :: Stream -> IO ()
closeStream :: Stream -> IO ()
closeStream Stream{FilePath
Handle
ThreadId
TVar Int
TVar Activity
TVar (IntMap Int)
TVar StreamStatus
HashMap IndexName (TVar (IntMap Int))
Vector IndexName
vActivity :: TVar Activity
payloadHandle :: Handle
indexHandle :: Handle
followThread :: ThreadId
vStatus :: TVar StreamStatus
vCount :: TVar Int
indices :: HashMap IndexName (TVar (IntMap Int))
indexNames :: Vector IndexName
vOffsets :: TVar (IntMap Int)
streamPath :: FilePath
vActivity :: Stream -> TVar Activity
payloadHandle :: Stream -> Handle
indexHandle :: Stream -> Handle
followThread :: Stream -> ThreadId
vStatus :: Stream -> TVar StreamStatus
vCount :: Stream -> TVar Int
indices :: Stream -> HashMap IndexName (TVar (IntMap Int))
indexNames :: Stream -> Vector IndexName
vOffsets :: Stream -> TVar (IntMap Int)
streamPath :: Stream -> FilePath
..} = do
ThreadId -> IO ()
killThread ThreadId
followThread
Handle -> IO ()
hClose Handle
payloadHandle
Handle -> IO ()
hClose Handle
indexHandle
createStream :: FS.WatchManager -> FilePath -> IO Stream
createStream :: WatchManager -> FilePath -> IO Stream
createStream WatchManager
man FilePath
path = do
let offsetPath :: FilePath
offsetPath = FilePath
path FilePath -> FilePath -> FilePath
</> FilePath
"offsets"
let payloadPath :: FilePath
payloadPath = FilePath
path FilePath -> FilePath -> FilePath
</> FilePath
"payloads"
Bool
exist <- FilePath -> IO Bool
doesFileExist FilePath
offsetPath
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
exist (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ FranzException -> IO ()
forall e a. Exception e => e -> IO a
throwIO (FranzException -> IO ()) -> FranzException -> IO ()
forall a b. (a -> b) -> a -> b
$ FilePath -> FranzException
StreamNotFound FilePath
offsetPath
IndexName
initialOffsetsBS <- FilePath -> IO IndexName
B.readFile FilePath
offsetPath
Handle
payloadHandle <- FilePath -> IOMode -> IO Handle
openBinaryFile FilePath
payloadPath IOMode
ReadMode
Vector IndexName
indexNames <- [IndexName] -> Vector IndexName
forall a. [a] -> Vector a
V.fromList ([IndexName] -> Vector IndexName)
-> (IndexName -> [IndexName]) -> IndexName -> Vector IndexName
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IndexName -> [IndexName]
B.lines (IndexName -> Vector IndexName)
-> IO IndexName -> IO (Vector IndexName)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> FilePath -> IO IndexName
B.readFile (FilePath
path FilePath -> FilePath -> FilePath
</> FilePath
"indices")
let icount :: Int
icount = Int
1 Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Vector IndexName -> Int
forall a. Vector a -> Int
V.length Vector IndexName
indexNames
let count :: Int
count = IndexName -> Int
B.length IndexName
initialOffsetsBS Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` (Int
8 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
icount)
let getI :: Get Int
getI = Int64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Int) -> Get Int64 -> Get Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Get Int64
getInt64le
Vector (Vector Int)
initialIndices <- (FilePath -> IO (Vector (Vector Int)))
-> (Vector (Vector Int) -> IO (Vector (Vector Int)))
-> Either FilePath (Vector (Vector Int))
-> IO (Vector (Vector Int))
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (FranzException -> IO (Vector (Vector Int))
forall e a. Exception e => e -> IO a
throwIO (FranzException -> IO (Vector (Vector Int)))
-> (FilePath -> FranzException)
-> FilePath
-> IO (Vector (Vector Int))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FilePath -> FranzException
InternalError) Vector (Vector Int) -> IO (Vector (Vector Int))
forall (f :: * -> *) a. Applicative f => a -> f a
pure
(Either FilePath (Vector (Vector Int)) -> IO (Vector (Vector Int)))
-> Either FilePath (Vector (Vector Int))
-> IO (Vector (Vector Int))
forall a b. (a -> b) -> a -> b
$ Get (Vector (Vector Int))
-> IndexName -> Either FilePath (Vector (Vector Int))
forall a. Get a -> IndexName -> Either FilePath a
runGet (Int -> Get (Vector Int) -> Get (Vector (Vector Int))
forall (m :: * -> *) a. Monad m => Int -> m a -> m (Vector a)
V.replicateM Int
count (Get (Vector Int) -> Get (Vector (Vector Int)))
-> Get (Vector Int) -> Get (Vector (Vector Int))
forall a b. (a -> b) -> a -> b
$ Int -> Get Int -> Get (Vector Int)
forall (m :: * -> *) a.
(Monad m, Unbox a) =>
Int -> m a -> m (Vector a)
U.replicateM Int
icount Get Int
getI) IndexName
initialOffsetsBS
let initialOffsets :: IntMap Int
initialOffsets = [(Int, Int)] -> IntMap Int
forall a. [(Int, a)] -> IntMap a
IM.fromList ([(Int, Int)] -> IntMap Int) -> [(Int, Int)] -> IntMap Int
forall a b. (a -> b) -> a -> b
$ Vector (Int, Int) -> [(Int, Int)]
forall a. Vector a -> [a]
V.toList
(Vector (Int, Int) -> [(Int, Int)])
-> Vector (Int, Int) -> [(Int, Int)]
forall a b. (a -> b) -> a -> b
$ Vector Int -> Vector Int -> Vector (Int, Int)
forall a b. Vector a -> Vector b -> Vector (a, b)
V.zip (Int -> Int -> Vector Int
forall a. Num a => a -> Int -> Vector a
V.enumFromN Int
0 Int
count) (Vector Int -> Vector (Int, Int))
-> Vector Int -> Vector (Int, Int)
forall a b. (a -> b) -> a -> b
$ (Vector Int -> Int) -> Vector (Vector Int) -> Vector Int
forall a b. (a -> b) -> Vector a -> Vector b
V.map Vector Int -> Int
forall a. Unbox a => Vector a -> a
U.head Vector (Vector Int)
initialIndices
TVar (IntMap Int)
vOffsets <- IntMap Int -> IO (TVar (IntMap Int))
forall a. a -> IO (TVar a)
newTVarIO (IntMap Int -> IO (TVar (IntMap Int)))
-> IntMap Int -> IO (TVar (IntMap Int))
forall a b. (a -> b) -> a -> b
$! IntMap Int
initialOffsets
TVar StreamStatus
vStatus <- StreamStatus -> IO (TVar StreamStatus)
forall a. a -> IO (TVar a)
newTVarIO StreamStatus
Outdated
TVar Int
vCount <- Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO (Int -> IO (TVar Int)) -> Int -> IO (TVar Int)
forall a b. (a -> b) -> a -> b
$! IntMap Int -> Int
forall a. IntMap a -> Int
IM.size IntMap Int
initialOffsets
IO ()
_ <- WatchManager -> FilePath -> ActionPredicate -> Action -> IO (IO ())
FS.watchDir WatchManager
man FilePath
path (\case
FS.Modified FilePath
p UTCTime
_ Bool
_ | FilePath
p FilePath -> FilePath -> Bool
forall a. Eq a => a -> a -> Bool
== FilePath
offsetPath -> Bool
True
FS.Removed FilePath
p UTCTime
_ Bool
_ | FilePath
p FilePath -> FilePath -> Bool
forall a. Eq a => a -> a -> Bool
== FilePath
offsetPath -> Bool
True
Event
_ -> Bool
False)
(Action -> IO (IO ())) -> Action -> IO (IO ())
forall a b. (a -> b) -> a -> b
$ \case
FS.Modified FilePath
_ UTCTime
_ Bool
_ -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar StreamStatus -> StreamStatus -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar StreamStatus
vStatus StreamStatus
Outdated
FS.Removed FilePath
_ UTCTime
_ Bool
_ -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar StreamStatus -> StreamStatus -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar StreamStatus
vStatus StreamStatus
Gone
Event
_ -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
[TVar (IntMap Int)]
vIndices <- [Int] -> (Int -> IO (TVar (IntMap Int))) -> IO [TVar (IntMap Int)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Int
1..Vector IndexName -> Int
forall a. Vector a -> Int
V.length Vector IndexName
indexNames] ((Int -> IO (TVar (IntMap Int))) -> IO [TVar (IntMap Int)])
-> (Int -> IO (TVar (IntMap Int))) -> IO [TVar (IntMap Int)]
forall a b. (a -> b) -> a -> b
$ \Int
i -> IntMap Int -> IO (TVar (IntMap Int))
forall a. a -> IO (TVar a)
newTVarIO
(IntMap Int -> IO (TVar (IntMap Int)))
-> IntMap Int -> IO (TVar (IntMap Int))
forall a b. (a -> b) -> a -> b
$ [(Int, Int)] -> IntMap Int
forall a. [(Int, a)] -> IntMap a
IM.fromList ([(Int, Int)] -> IntMap Int) -> [(Int, Int)] -> IntMap Int
forall a b. (a -> b) -> a -> b
$ Vector (Int, Int) -> [(Int, Int)]
forall a. Vector a -> [a]
V.toList (Vector (Int, Int) -> [(Int, Int)])
-> Vector (Int, Int) -> [(Int, Int)]
forall a b. (a -> b) -> a -> b
$ Vector Int -> Vector Int -> Vector (Int, Int)
forall a b. Vector a -> Vector b -> Vector (a, b)
V.zip ((Vector Int -> Int) -> Vector (Vector Int) -> Vector Int
forall a b. (a -> b) -> Vector a -> Vector b
V.map (Vector Int -> Int -> Int
forall a. Unbox a => Vector a -> Int -> a
U.! Int
i) Vector (Vector Int)
initialIndices) (Int -> Int -> Vector Int
forall a. Num a => a -> Int -> Vector a
V.enumFromN Int
0 Int
count)
Handle
indexHandle <- FilePath -> IOMode -> IO Handle
openFile FilePath
offsetPath IOMode
ReadMode
let final :: Either SomeException () -> IO ()
final :: Either SomeException () -> IO ()
final (Left SomeException
exc) | Just AsyncException
ThreadKilled <- SomeException -> Maybe AsyncException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
exc = () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
final (Left SomeException
exc) = [FilePath] -> IO ()
logFollower [FilePath
path, FilePath
"terminated with", SomeException -> FilePath
forall a. Show a => a -> FilePath
show SomeException
exc]
final (Right ()
_) = [FilePath] -> IO ()
logFollower [FilePath
path, FilePath
"has been removed"]
ThreadId
followThread <- (IO () -> (Either SomeException () -> IO ()) -> IO ThreadId)
-> (Either SomeException () -> IO ()) -> IO () -> IO ThreadId
forall a b c. (a -> b -> c) -> b -> a -> c
flip IO () -> (Either SomeException () -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally Either SomeException () -> IO ()
final (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
Maybe ((Int, Int), IntMap Int)
-> (((Int, Int), IntMap Int) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (IntMap Int -> Maybe ((Int, Int), IntMap Int)
forall a. IntMap a -> Maybe ((Int, a), IntMap a)
IM.maxViewWithKey IntMap Int
initialOffsets) ((((Int, Int), IntMap Int) -> IO ()) -> IO ())
-> (((Int, Int), IntMap Int) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \((Int
i, Int
_), IntMap Int
_) ->
Handle -> SeekMode -> Integer -> IO ()
hSeek Handle
indexHandle SeekMode
AbsoluteSeek (Integer -> IO ()) -> Integer -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Integer
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Integer) -> Int -> Integer
forall a b. (a -> b) -> a -> b
$ Int -> Int
forall a. Enum a => a -> a
succ Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
icount Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
8
(IO () -> IO ()) -> IO ()
forall a. (a -> a) -> a
fix ((IO () -> IO ()) -> IO ()) -> (IO () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \IO ()
self -> do
IndexName
bs <- Handle -> Int -> IO IndexName
B.hGet Handle
indexHandle (Int
8 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
icount)
if IndexName -> Bool
B.null IndexName
bs
then do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar StreamStatus -> (StreamStatus -> StreamStatus) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar StreamStatus
vStatus ((StreamStatus -> StreamStatus) -> STM ())
-> (StreamStatus -> StreamStatus) -> STM ()
forall a b. (a -> b) -> a -> b
$ \case
StreamStatus
Outdated -> StreamStatus
CaughtUp
StreamStatus
CaughtUp -> StreamStatus
CaughtUp
StreamStatus
Gone -> StreamStatus
Gone
Bool
continue <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ TVar StreamStatus -> STM StreamStatus
forall a. TVar a -> STM a
readTVar TVar StreamStatus
vStatus STM StreamStatus -> (StreamStatus -> STM Bool) -> STM Bool
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
StreamStatus
CaughtUp -> STM Bool
forall a. STM a
retry
StreamStatus
Outdated -> Bool -> STM Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True
StreamStatus
Gone -> Bool -> STM Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
continue IO ()
self
else do
Int
ofs : [Int]
indices <- (FilePath -> IO [Int])
-> ([Int] -> IO [Int]) -> Either FilePath [Int] -> IO [Int]
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (FranzException -> IO [Int]
forall e a. Exception e => e -> IO a
throwIO (FranzException -> IO [Int])
-> (FilePath -> FranzException) -> FilePath -> IO [Int]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FilePath -> FranzException
InternalError) [Int] -> IO [Int]
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either FilePath [Int] -> IO [Int])
-> Either FilePath [Int] -> IO [Int]
forall a b. (a -> b) -> a -> b
$ Get [Int] -> IndexName -> Either FilePath [Int]
forall a. Get a -> IndexName -> Either FilePath a
runGet (Int -> Get Int -> Get [Int]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
icount Get Int
getI) IndexName
bs
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Int
i <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
vCount
TVar (IntMap Int) -> (IntMap Int -> IntMap Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (IntMap Int)
vOffsets ((IntMap Int -> IntMap Int) -> STM ())
-> (IntMap Int -> IntMap Int) -> STM ()
forall a b. (a -> b) -> a -> b
$ Int -> Int -> IntMap Int -> IntMap Int
forall a. Int -> a -> IntMap a -> IntMap a
IM.insert Int
i Int
ofs
[(TVar (IntMap Int), Int)]
-> ((TVar (IntMap Int), Int) -> STM ()) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([TVar (IntMap Int)] -> [Int] -> [(TVar (IntMap Int), Int)]
forall a b. [a] -> [b] -> [(a, b)]
zip [TVar (IntMap Int)]
vIndices [Int]
indices) (((TVar (IntMap Int), Int) -> STM ()) -> STM ())
-> ((TVar (IntMap Int), Int) -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \(TVar (IntMap Int)
v, Int
x) -> TVar (IntMap Int) -> (IntMap Int -> IntMap Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (IntMap Int)
v ((IntMap Int -> IntMap Int) -> STM ())
-> (IntMap Int -> IntMap Int) -> STM ()
forall a b. (a -> b) -> a -> b
$ Int -> Int -> IntMap Int -> IntMap Int
forall a. Int -> a -> IntMap a -> IntMap a
IM.insert (Int -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
x) Int
i
TVar Int -> Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Int
vCount (Int -> STM ()) -> Int -> STM ()
forall a b. (a -> b) -> a -> b
$! Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
IO ()
self
let indices :: HashMap IndexName (TVar (IntMap Int))
indices = [(IndexName, TVar (IntMap Int))]
-> HashMap IndexName (TVar (IntMap Int))
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
HM.fromList ([(IndexName, TVar (IntMap Int))]
-> HashMap IndexName (TVar (IntMap Int)))
-> [(IndexName, TVar (IntMap Int))]
-> HashMap IndexName (TVar (IntMap Int))
forall a b. (a -> b) -> a -> b
$ [IndexName]
-> [TVar (IntMap Int)] -> [(IndexName, TVar (IntMap Int))]
forall a b. [a] -> [b] -> [(a, b)]
zip (Vector IndexName -> [IndexName]
forall a. Vector a -> [a]
V.toList Vector IndexName
indexNames) [TVar (IntMap Int)]
vIndices
TVar Activity
vActivity <- IO Double
getMonotonicTime IO Double -> (Double -> IO (TVar Activity)) -> IO (TVar Activity)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Activity -> IO (TVar Activity)
forall a. a -> IO (TVar a)
newTVarIO (Activity -> IO (TVar Activity))
-> (Double -> Activity) -> Double -> IO (TVar Activity)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Double -> Activity
forall a b. a -> Either a b
Left
Stream -> IO Stream
forall (m :: * -> *) a. Monad m => a -> m a
return Stream :: FilePath
-> TVar (IntMap Int)
-> Vector IndexName
-> HashMap IndexName (TVar (IntMap Int))
-> TVar Int
-> TVar StreamStatus
-> ThreadId
-> Handle
-> Handle
-> TVar Activity
-> Stream
Stream{ streamPath :: FilePath
streamPath = FilePath
path, Handle
ThreadId
TVar Int
TVar Activity
TVar (IntMap Int)
TVar StreamStatus
HashMap IndexName (TVar (IntMap Int))
Vector IndexName
vActivity :: TVar Activity
indices :: HashMap IndexName (TVar (IntMap Int))
followThread :: ThreadId
indexHandle :: Handle
vCount :: TVar Int
vStatus :: TVar StreamStatus
vOffsets :: TVar (IntMap Int)
indexNames :: Vector IndexName
payloadHandle :: Handle
vActivity :: TVar Activity
payloadHandle :: Handle
indexHandle :: Handle
followThread :: ThreadId
vStatus :: TVar StreamStatus
vCount :: TVar Int
indices :: HashMap IndexName (TVar (IntMap Int))
indexNames :: Vector IndexName
vOffsets :: TVar (IntMap Int)
..}
where
logFollower :: [FilePath] -> IO ()
logFollower = Handle -> FilePath -> IO ()
hPutStrLn Handle
stderr (FilePath -> IO ())
-> ([FilePath] -> FilePath) -> [FilePath] -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [FilePath] -> FilePath
unwords ([FilePath] -> FilePath)
-> ([FilePath] -> [FilePath]) -> [FilePath] -> FilePath
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (:) FilePath
"[follower]"
type QueryResult = ((Int, Int)
, (Int, Int))
range :: Int
-> Int
-> RequestType
-> IM.IntMap Int
-> (Bool, QueryResult)
range :: Int -> Int -> RequestType -> IntMap Int -> (Bool, QueryResult)
range Int
begin Int
end RequestType
rt IntMap Int
allOffsets = case RequestType
rt of
RequestType
AllItems -> (Bool
ready, ((Int, Int)
firstItem, (Int, Int)
-> (((Int, Int), IntMap Int) -> (Int, Int))
-> Maybe ((Int, Int), IntMap Int)
-> (Int, Int)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (Int, Int)
firstItem ((Int, Int), IntMap Int) -> (Int, Int)
forall a b. (a, b) -> a
fst (Maybe ((Int, Int), IntMap Int) -> (Int, Int))
-> Maybe ((Int, Int), IntMap Int) -> (Int, Int)
forall a b. (a -> b) -> a -> b
$ IntMap Int -> Maybe ((Int, Int), IntMap Int)
forall a. IntMap a -> Maybe ((Int, a), IntMap a)
IM.maxViewWithKey IntMap Int
body))
RequestType
LastItem -> case IntMap Int -> Maybe ((Int, Int), IntMap Int)
forall a. IntMap a -> Maybe ((Int, a), IntMap a)
IM.maxViewWithKey IntMap Int
body of
Maybe ((Int, Int), IntMap Int)
Nothing -> (Bool
False, ((Int, Int)
zero, (Int, Int)
zero))
Just ((Int, Int)
ofs', IntMap Int
r) -> case IntMap Int -> Maybe ((Int, Int), IntMap Int)
forall a. IntMap a -> Maybe ((Int, a), IntMap a)
IM.maxViewWithKey (IntMap Int -> IntMap Int -> IntMap Int
forall a. IntMap a -> IntMap a -> IntMap a
IM.union IntMap Int
left IntMap Int
r) of
Just ((Int, Int)
ofs, IntMap Int
_) -> (Bool
ready, ((Int, Int)
ofs, (Int, Int)
ofs'))
Maybe ((Int, Int), IntMap Int)
Nothing -> (Bool
ready, ((Int, Int)
zero, (Int, Int)
ofs'))
where
zero :: (Int, Int)
zero = (-Int
1, Int
0)
ready :: Bool
ready = Maybe Int -> Bool
forall a. Maybe a -> Bool
isJust Maybe Int
lastItem Bool -> Bool -> Bool
|| Bool -> Bool
not (IntMap Int -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null IntMap Int
cont)
(IntMap Int
wing, Maybe Int
lastItem, IntMap Int
cont) = Int -> IntMap Int -> (IntMap Int, Maybe Int, IntMap Int)
forall a. Int -> IntMap a -> (IntMap a, Maybe a, IntMap a)
IM.splitLookup Int
end IntMap Int
allOffsets
(IntMap Int
left, IntMap Int
body) = Int -> IntMap Int -> (IntMap Int, IntMap Int)
forall a. Int -> IntMap a -> (IntMap a, IntMap a)
splitR Int
begin (IntMap Int -> (IntMap Int, IntMap Int))
-> IntMap Int -> (IntMap Int, IntMap Int)
forall a b. (a -> b) -> a -> b
$ (IntMap Int -> IntMap Int)
-> (Int -> IntMap Int -> IntMap Int)
-> Maybe Int
-> IntMap Int
-> IntMap Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe IntMap Int -> IntMap Int
forall a. a -> a
id (Int -> Int -> IntMap Int -> IntMap Int
forall a. Int -> a -> IntMap a -> IntMap a
IM.insert Int
end) Maybe Int
lastItem IntMap Int
wing
firstItem :: (Int, Int)
firstItem = (Int, Int)
-> (((Int, Int), IntMap Int) -> (Int, Int))
-> Maybe ((Int, Int), IntMap Int)
-> (Int, Int)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (Int, Int)
zero ((Int, Int), IntMap Int) -> (Int, Int)
forall a b. (a, b) -> a
fst (Maybe ((Int, Int), IntMap Int) -> (Int, Int))
-> Maybe ((Int, Int), IntMap Int) -> (Int, Int)
forall a b. (a -> b) -> a -> b
$ IntMap Int -> Maybe ((Int, Int), IntMap Int)
forall a. IntMap a -> Maybe ((Int, a), IntMap a)
IM.maxViewWithKey IntMap Int
left
splitR :: Int -> IM.IntMap a -> (IM.IntMap a, IM.IntMap a)
splitR :: Int -> IntMap a -> (IntMap a, IntMap a)
splitR Int
i IntMap a
m = let (IntMap a
l, Maybe a
p, IntMap a
r) = Int -> IntMap a -> (IntMap a, Maybe a, IntMap a)
forall a. Int -> IntMap a -> (IntMap a, Maybe a, IntMap a)
IM.splitLookup Int
i IntMap a
m in (IntMap a
l, (IntMap a -> IntMap a)
-> (a -> IntMap a -> IntMap a) -> Maybe a -> IntMap a -> IntMap a
forall b a. b -> (a -> b) -> Maybe a -> b
maybe IntMap a -> IntMap a
forall a. a -> a
id (Int -> a -> IntMap a -> IntMap a
forall a. Int -> a -> IntMap a -> IntMap a
IM.insert Int
i) Maybe a
p IntMap a
r)
data FranzReader = FranzReader
{ FranzReader -> WatchManager
watchManager :: FS.WatchManager
, FranzReader
-> TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams :: TVar (HM.HashMap FranzDirectory (HM.HashMap StreamName Stream))
}
data ReaperState = ReaperState
{
ReaperState -> Int
prunedStreams :: !Int
, ReaperState -> Int
totalStreams :: !Int
}
reaper :: Double
-> Double
-> FranzReader -> IO ()
reaper :: Double -> Double -> FranzReader -> IO ()
reaper Double
int Double
life FranzReader{TVar (HashMap FranzDirectory (HashMap StreamName Stream))
WatchManager
vStreams :: TVar (HashMap FranzDirectory (HashMap StreamName Stream))
watchManager :: WatchManager
vStreams :: FranzReader
-> TVar (HashMap FranzDirectory (HashMap StreamName Stream))
watchManager :: FranzReader -> WatchManager
..} = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Double
now <- IO Double
getMonotonicTime
let shouldPrune :: Either Double b -> Bool
shouldPrune (Left Double
t) = Double
now Double -> Double -> Double
forall a. Num a => a -> a -> a
- Double
t Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
>= Double
life
shouldPrune Either Double b
_ = Bool
False
tryPrune :: FranzDirectory -> StreamName -> STM (Maybe Stream)
tryPrune :: FranzDirectory -> StreamName -> STM (Maybe Stream)
tryPrune FranzDirectory
mPath StreamName
sPath = MaybeT STM Stream -> STM (Maybe Stream)
forall (m :: * -> *) a. MaybeT m a -> m (Maybe a)
runMaybeT (MaybeT STM Stream -> STM (Maybe Stream))
-> MaybeT STM Stream -> STM (Maybe Stream)
forall a b. (a -> b) -> a -> b
$ do
HashMap FranzDirectory (HashMap StreamName Stream)
currentAllStreams <- STM (HashMap FranzDirectory (HashMap StreamName Stream))
-> MaybeT STM (HashMap FranzDirectory (HashMap StreamName Stream))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (STM (HashMap FranzDirectory (HashMap StreamName Stream))
-> MaybeT STM (HashMap FranzDirectory (HashMap StreamName Stream)))
-> STM (HashMap FranzDirectory (HashMap StreamName Stream))
-> MaybeT STM (HashMap FranzDirectory (HashMap StreamName Stream))
forall a b. (a -> b) -> a -> b
$ TVar (HashMap FranzDirectory (HashMap StreamName Stream))
-> STM (HashMap FranzDirectory (HashMap StreamName Stream))
forall a. TVar a -> STM a
readTVar TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams
HashMap StreamName Stream
currentStreams <- STM (Maybe (HashMap StreamName Stream))
-> MaybeT STM (HashMap StreamName Stream)
forall (m :: * -> *) a. m (Maybe a) -> MaybeT m a
MaybeT (STM (Maybe (HashMap StreamName Stream))
-> MaybeT STM (HashMap StreamName Stream))
-> (Maybe (HashMap StreamName Stream)
-> STM (Maybe (HashMap StreamName Stream)))
-> Maybe (HashMap StreamName Stream)
-> MaybeT STM (HashMap StreamName Stream)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe (HashMap StreamName Stream)
-> STM (Maybe (HashMap StreamName Stream))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (HashMap StreamName Stream)
-> MaybeT STM (HashMap StreamName Stream))
-> Maybe (HashMap StreamName Stream)
-> MaybeT STM (HashMap StreamName Stream)
forall a b. (a -> b) -> a -> b
$ FranzDirectory
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> Maybe (HashMap StreamName Stream)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup FranzDirectory
mPath HashMap FranzDirectory (HashMap StreamName Stream)
currentAllStreams
Stream
currentStream <- STM (Maybe Stream) -> MaybeT STM Stream
forall (m :: * -> *) a. m (Maybe a) -> MaybeT m a
MaybeT (STM (Maybe Stream) -> MaybeT STM Stream)
-> (Maybe Stream -> STM (Maybe Stream))
-> Maybe Stream
-> MaybeT STM Stream
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe Stream -> STM (Maybe Stream)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe Stream -> MaybeT STM Stream)
-> Maybe Stream -> MaybeT STM Stream
forall a b. (a -> b) -> a -> b
$ StreamName -> HashMap StreamName Stream -> Maybe Stream
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup StreamName
sPath HashMap StreamName Stream
currentStreams
Activity
currentAct <- STM Activity -> MaybeT STM Activity
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (STM Activity -> MaybeT STM Activity)
-> STM Activity -> MaybeT STM Activity
forall a b. (a -> b) -> a -> b
$ TVar Activity -> STM Activity
forall a. TVar a -> STM a
readTVar (Stream -> TVar Activity
vActivity Stream
currentStream)
Bool -> MaybeT STM ()
forall (f :: * -> *). Alternative f => Bool -> f ()
guard (Bool -> MaybeT STM ()) -> Bool -> MaybeT STM ()
forall a b. (a -> b) -> a -> b
$ Activity -> Bool
forall b. Either Double b -> Bool
shouldPrune Activity
currentAct
let newStreams :: HashMap StreamName Stream
newStreams = StreamName
-> HashMap StreamName Stream -> HashMap StreamName Stream
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> HashMap k v
HM.delete StreamName
sPath HashMap StreamName Stream
currentStreams
STM () -> MaybeT STM ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (STM () -> MaybeT STM ())
-> (HashMap FranzDirectory (HashMap StreamName Stream) -> STM ())
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> MaybeT STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TVar (HashMap FranzDirectory (HashMap StreamName Stream))
-> HashMap FranzDirectory (HashMap StreamName Stream) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams (HashMap FranzDirectory (HashMap StreamName Stream)
-> MaybeT STM ())
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> MaybeT STM ()
forall a b. (a -> b) -> a -> b
$ if HashMap StreamName Stream -> Bool
forall k v. HashMap k v -> Bool
HM.null HashMap StreamName Stream
newStreams
then FranzDirectory
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> HashMap FranzDirectory (HashMap StreamName Stream)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> HashMap k v
HM.delete FranzDirectory
mPath HashMap FranzDirectory (HashMap StreamName Stream)
currentAllStreams
else FranzDirectory
-> HashMap StreamName Stream
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> HashMap FranzDirectory (HashMap StreamName Stream)
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
HM.insert FranzDirectory
mPath HashMap StreamName Stream
newStreams HashMap FranzDirectory (HashMap StreamName Stream)
currentAllStreams
Stream -> MaybeT STM Stream
forall (f :: * -> *) a. Applicative f => a -> f a
pure Stream
currentStream
HashMap FranzDirectory (HashMap StreamName Stream)
allStreams <- TVar (HashMap FranzDirectory (HashMap StreamName Stream))
-> IO (HashMap FranzDirectory (HashMap StreamName Stream))
forall a. TVar a -> IO a
readTVarIO TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams
ReaperState
stats <- (StateT ReaperState IO () -> ReaperState -> IO ReaperState)
-> ReaperState -> StateT ReaperState IO () -> IO ReaperState
forall a b c. (a -> b -> c) -> b -> a -> c
flip StateT ReaperState IO () -> ReaperState -> IO ReaperState
forall (m :: * -> *) s a. Monad m => StateT s m a -> s -> m s
execStateT (Int -> Int -> ReaperState
ReaperState Int
0 Int
0) (StateT ReaperState IO () -> IO ReaperState)
-> StateT ReaperState IO () -> IO ReaperState
forall a b. (a -> b) -> a -> b
$
[(FranzDirectory, HashMap StreamName Stream)]
-> ((FranzDirectory, HashMap StreamName Stream)
-> StateT ReaperState IO ())
-> StateT ReaperState IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (HashMap FranzDirectory (HashMap StreamName Stream)
-> [(FranzDirectory, HashMap StreamName Stream)]
forall k v. HashMap k v -> [(k, v)]
HM.toList HashMap FranzDirectory (HashMap StreamName Stream)
allStreams) (((FranzDirectory, HashMap StreamName Stream)
-> StateT ReaperState IO ())
-> StateT ReaperState IO ())
-> ((FranzDirectory, HashMap StreamName Stream)
-> StateT ReaperState IO ())
-> StateT ReaperState IO ()
forall a b. (a -> b) -> a -> b
$ \(FranzDirectory
mPath, HashMap StreamName Stream
streams) ->
[(StreamName, Stream)]
-> ((StreamName, Stream) -> StateT ReaperState IO ())
-> StateT ReaperState IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (HashMap StreamName Stream -> [(StreamName, Stream)]
forall k v. HashMap k v -> [(k, v)]
HM.toList HashMap StreamName Stream
streams) (((StreamName, Stream) -> StateT ReaperState IO ())
-> StateT ReaperState IO ())
-> ((StreamName, Stream) -> StateT ReaperState IO ())
-> StateT ReaperState IO ()
forall a b. (a -> b) -> a -> b
$ \(StreamName
sPath, Stream
stream) -> do
(ReaperState -> ReaperState) -> StateT ReaperState IO ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((ReaperState -> ReaperState) -> StateT ReaperState IO ())
-> (ReaperState -> ReaperState) -> StateT ReaperState IO ()
forall a b. (a -> b) -> a -> b
$ \ReaperState
s -> ReaperState
s { totalStreams :: Int
totalStreams = ReaperState -> Int
totalStreams ReaperState
s Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 }
Activity
snapAct <- IO Activity -> StateT ReaperState IO Activity
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO Activity -> StateT ReaperState IO Activity)
-> IO Activity -> StateT ReaperState IO Activity
forall a b. (a -> b) -> a -> b
$ TVar Activity -> IO Activity
forall a. TVar a -> IO a
readTVarIO (Stream -> TVar Activity
vActivity Stream
stream)
Bool -> StateT ReaperState IO () -> StateT ReaperState IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Activity -> Bool
forall b. Either Double b -> Bool
shouldPrune Activity
snapAct) (StateT ReaperState IO () -> StateT ReaperState IO ())
-> StateT ReaperState IO () -> StateT ReaperState IO ()
forall a b. (a -> b) -> a -> b
$ do
Maybe Stream
deletedStream'm <- IO (Maybe Stream) -> StateT ReaperState IO (Maybe Stream)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO (Maybe Stream) -> StateT ReaperState IO (Maybe Stream))
-> (STM (Maybe Stream) -> IO (Maybe Stream))
-> STM (Maybe Stream)
-> StateT ReaperState IO (Maybe Stream)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (Maybe Stream) -> IO (Maybe Stream)
forall a. STM a -> IO a
atomically (STM (Maybe Stream) -> StateT ReaperState IO (Maybe Stream))
-> STM (Maybe Stream) -> StateT ReaperState IO (Maybe Stream)
forall a b. (a -> b) -> a -> b
$ FranzDirectory -> StreamName -> STM (Maybe Stream)
tryPrune FranzDirectory
mPath StreamName
sPath
Maybe Stream
-> (Stream -> StateT ReaperState IO ()) -> StateT ReaperState IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe Stream
deletedStream'm ((Stream -> StateT ReaperState IO ()) -> StateT ReaperState IO ())
-> (Stream -> StateT ReaperState IO ()) -> StateT ReaperState IO ()
forall a b. (a -> b) -> a -> b
$ \Stream
prunedStream -> do
IO () -> StateT ReaperState IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO () -> StateT ReaperState IO ())
-> IO () -> StateT ReaperState IO ()
forall a b. (a -> b) -> a -> b
$ Stream -> IO ()
closeStream Stream
prunedStream
(ReaperState -> ReaperState) -> StateT ReaperState IO ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((ReaperState -> ReaperState) -> StateT ReaperState IO ())
-> (ReaperState -> ReaperState) -> StateT ReaperState IO ()
forall a b. (a -> b) -> a -> b
$ \ReaperState
s -> ReaperState
s { prunedStreams :: Int
prunedStreams = ReaperState -> Int
prunedStreams ReaperState
s Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 }
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ReaperState -> Int
prunedStreams ReaperState
stats Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Handle -> FilePath -> IO ()
hPutStrLn Handle
stderr (FilePath -> IO ()) -> FilePath -> IO ()
forall a b. (a -> b) -> a -> b
$ [FilePath] -> FilePath
unwords
[ FilePath
"[reaper] closed"
, Int -> FilePath
forall a. Show a => a -> FilePath
show (ReaperState -> Int
prunedStreams ReaperState
stats)
, FilePath
"out of"
, Int -> FilePath
forall a. Show a => a -> FilePath
show (ReaperState -> Int
totalStreams ReaperState
stats)
]
Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Double -> Int
forall a b. (RealFrac a, Integral b) => a -> b
floor (Double -> Int) -> Double -> Int
forall a b. (a -> b) -> a -> b
$ Double
int Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1e6
withFranzReader :: (FranzReader -> IO ()) -> IO ()
withFranzReader :: (FranzReader -> IO ()) -> IO ()
withFranzReader = IO FranzReader
-> (FranzReader -> IO ()) -> (FranzReader -> IO ()) -> IO ()
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket IO FranzReader
newFranzReader FranzReader -> IO ()
closeFranzReader
newFranzReader :: IO FranzReader
newFranzReader :: IO FranzReader
newFranzReader = do
TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams <- HashMap FranzDirectory (HashMap StreamName Stream)
-> IO (TVar (HashMap FranzDirectory (HashMap StreamName Stream)))
forall a. a -> IO (TVar a)
newTVarIO HashMap FranzDirectory (HashMap StreamName Stream)
forall k v. HashMap k v
HM.empty
WatchManager
watchManager <- IO WatchManager
FS.startManager
FranzReader -> IO FranzReader
forall (f :: * -> *) a. Applicative f => a -> f a
pure FranzReader :: WatchManager
-> TVar (HashMap FranzDirectory (HashMap StreamName Stream))
-> FranzReader
FranzReader{TVar (HashMap FranzDirectory (HashMap StreamName Stream))
WatchManager
watchManager :: WatchManager
vStreams :: TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams :: TVar (HashMap FranzDirectory (HashMap StreamName Stream))
watchManager :: WatchManager
..}
closeFranzReader :: FranzReader -> IO ()
closeFranzReader :: FranzReader -> IO ()
closeFranzReader FranzReader{TVar (HashMap FranzDirectory (HashMap StreamName Stream))
WatchManager
vStreams :: TVar (HashMap FranzDirectory (HashMap StreamName Stream))
watchManager :: WatchManager
vStreams :: FranzReader
-> TVar (HashMap FranzDirectory (HashMap StreamName Stream))
watchManager :: FranzReader -> WatchManager
..} = do
WatchManager -> IO ()
FS.stopManager WatchManager
watchManager
TVar (HashMap FranzDirectory (HashMap StreamName Stream))
-> IO (HashMap FranzDirectory (HashMap StreamName Stream))
forall a. TVar a -> IO a
readTVarIO TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams IO (HashMap FranzDirectory (HashMap StreamName Stream))
-> (HashMap FranzDirectory (HashMap StreamName Stream) -> IO ())
-> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (HashMap StreamName Stream -> IO ())
-> HashMap FranzDirectory (HashMap StreamName Stream) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ((Stream -> IO ()) -> HashMap StreamName Stream -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Stream -> IO ()
closeStream)
newtype FranzPrefix = FranzPrefix { FranzPrefix -> FilePath
unFranzPrefix :: FilePath } deriving (FranzPrefix -> FranzPrefix -> Bool
(FranzPrefix -> FranzPrefix -> Bool)
-> (FranzPrefix -> FranzPrefix -> Bool) -> Eq FranzPrefix
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: FranzPrefix -> FranzPrefix -> Bool
$c/= :: FranzPrefix -> FranzPrefix -> Bool
== :: FranzPrefix -> FranzPrefix -> Bool
$c== :: FranzPrefix -> FranzPrefix -> Bool
Eq, Eq FranzPrefix
Eq FranzPrefix
-> (Int -> FranzPrefix -> Int)
-> (FranzPrefix -> Int)
-> Hashable FranzPrefix
Int -> FranzPrefix -> Int
FranzPrefix -> Int
forall a. Eq a -> (Int -> a -> Int) -> (a -> Int) -> Hashable a
hash :: FranzPrefix -> Int
$chash :: FranzPrefix -> Int
hashWithSalt :: Int -> FranzPrefix -> Int
$chashWithSalt :: Int -> FranzPrefix -> Int
$cp1Hashable :: Eq FranzPrefix
Hashable)
newtype FranzDirectory = FranzDirectory FilePath deriving (FranzDirectory -> FranzDirectory -> Bool
(FranzDirectory -> FranzDirectory -> Bool)
-> (FranzDirectory -> FranzDirectory -> Bool) -> Eq FranzDirectory
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: FranzDirectory -> FranzDirectory -> Bool
$c/= :: FranzDirectory -> FranzDirectory -> Bool
== :: FranzDirectory -> FranzDirectory -> Bool
$c== :: FranzDirectory -> FranzDirectory -> Bool
Eq, Eq FranzDirectory
Eq FranzDirectory
-> (Int -> FranzDirectory -> Int)
-> (FranzDirectory -> Int)
-> Hashable FranzDirectory
Int -> FranzDirectory -> Int
FranzDirectory -> Int
forall a. Eq a -> (Int -> a -> Int) -> (a -> Int) -> Hashable a
hash :: FranzDirectory -> Int
$chash :: FranzDirectory -> Int
hashWithSalt :: Int -> FranzDirectory -> Int
$chashWithSalt :: Int -> FranzDirectory -> Int
$cp1Hashable :: Eq FranzDirectory
Hashable)
getFranzDirectory :: FranzPrefix -> FranzDirectory -> FilePath
getFranzDirectory :: FranzPrefix -> FranzDirectory -> FilePath
getFranzDirectory (FranzPrefix FilePath
prefix) (FranzDirectory FilePath
dir) = FilePath
prefix FilePath -> FilePath -> FilePath
</> FilePath
dir
getFranzStreamPath :: FranzPrefix -> FranzDirectory -> StreamName -> FilePath
getFranzStreamPath :: FranzPrefix -> FranzDirectory -> StreamName -> FilePath
getFranzStreamPath FranzPrefix
prefix FranzDirectory
dir StreamName
name
= FranzPrefix -> FranzDirectory -> FilePath
getFranzDirectory FranzPrefix
prefix FranzDirectory
dir FilePath -> FilePath -> FilePath
</> StreamName -> FilePath
streamNameToPath StreamName
name
handleQuery :: FranzPrefix
-> FranzReader
-> FranzDirectory
-> Query
-> (FranzException -> IO r)
-> (Stream -> STM (Bool, QueryResult) -> IO r) -> IO r
handleQuery :: FranzPrefix
-> FranzReader
-> FranzDirectory
-> Query
-> (FranzException -> IO r)
-> (Stream -> STM (Bool, QueryResult) -> IO r)
-> IO r
handleQuery FranzPrefix
prefix FranzReader{TVar (HashMap FranzDirectory (HashMap StreamName Stream))
WatchManager
vStreams :: TVar (HashMap FranzDirectory (HashMap StreamName Stream))
watchManager :: WatchManager
vStreams :: FranzReader
-> TVar (HashMap FranzDirectory (HashMap StreamName Stream))
watchManager :: FranzReader -> WatchManager
..} FranzDirectory
dir (Query StreamName
name ItemRef
begin_ ItemRef
end_ RequestType
rt) FranzException -> IO r
onError Stream -> STM (Bool, QueryResult) -> IO r
cont
= IO (Either FranzException Stream)
-> (Either FranzException Stream -> IO ())
-> (Either FranzException Stream -> IO r)
-> IO r
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (IO Stream -> IO (Either FranzException Stream)
forall e a. Exception e => IO a -> IO (Either e a)
try IO Stream
acquire) ((FranzException -> IO ())
-> (Stream -> IO ()) -> Either FranzException Stream -> IO ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either FranzException -> IO ()
forall a. Monoid a => a
mempty Stream -> IO ()
removeActivity) ((Either FranzException Stream -> IO r) -> IO r)
-> (Either FranzException Stream -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \case
Left FranzException
err -> FranzException -> IO r
onError FranzException
err
Right stream :: Stream
stream@Stream{FilePath
Handle
ThreadId
TVar Int
TVar Activity
TVar (IntMap Int)
TVar StreamStatus
HashMap IndexName (TVar (IntMap Int))
Vector IndexName
vActivity :: TVar Activity
payloadHandle :: Handle
indexHandle :: Handle
followThread :: ThreadId
vStatus :: TVar StreamStatus
vCount :: TVar Int
indices :: HashMap IndexName (TVar (IntMap Int))
indexNames :: Vector IndexName
vOffsets :: TVar (IntMap Int)
streamPath :: FilePath
vActivity :: Stream -> TVar Activity
payloadHandle :: Stream -> Handle
indexHandle :: Stream -> Handle
followThread :: Stream -> ThreadId
vStatus :: Stream -> TVar StreamStatus
vCount :: Stream -> TVar Int
indices :: Stream -> HashMap IndexName (TVar (IntMap Int))
indexNames :: Stream -> Vector IndexName
vOffsets :: Stream -> TVar (IntMap Int)
streamPath :: Stream -> FilePath
..} -> Stream -> STM (Bool, QueryResult) -> IO r
cont Stream
stream (STM (Bool, QueryResult) -> IO r)
-> STM (Bool, QueryResult) -> IO r
forall a b. (a -> b) -> a -> b
$ do
TVar StreamStatus -> STM StreamStatus
forall a. TVar a -> STM a
readTVar TVar StreamStatus
vStatus STM StreamStatus -> (StreamStatus -> STM ()) -> STM ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
StreamStatus
Outdated -> STM ()
forall a. STM a
retry
StreamStatus
CaughtUp -> () -> STM ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
StreamStatus
Gone -> FranzException -> STM ()
forall e a. Exception e => e -> STM a
throwSTM (FranzException -> STM ()) -> FranzException -> STM ()
forall a b. (a -> b) -> a -> b
$ FilePath -> FranzException
StreamNotFound FilePath
streamPath
IntMap Int
allOffsets <- TVar (IntMap Int) -> STM (IntMap Int)
forall a. TVar a -> STM a
readTVar TVar (IntMap Int)
vOffsets
let finalOffset :: Int
finalOffset = case IntMap Int -> Maybe ((Int, Int), IntMap Int)
forall a. IntMap a -> Maybe ((Int, a), IntMap a)
IM.maxViewWithKey IntMap Int
allOffsets of
Just ((Int
k, Int
_), IntMap Int
_) -> Int
k Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
Maybe ((Int, Int), IntMap Int)
Nothing -> Int
0
let rotate :: Int -> Int
rotate Int
i
| Int
i Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
0 = Int
finalOffset Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
i
| Bool
otherwise = Int
i
Int
begin <- case ItemRef
begin_ of
BySeqNum Int
i -> Int -> STM Int
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> STM Int) -> Int -> STM Int
forall a b. (a -> b) -> a -> b
$ Int -> Int
rotate Int
i
ByIndex IndexName
index Int
val -> case IndexName
-> HashMap IndexName (TVar (IntMap Int))
-> Maybe (TVar (IntMap Int))
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup IndexName
index HashMap IndexName (TVar (IntMap Int))
indices of
Maybe (TVar (IntMap Int))
Nothing -> FranzException -> STM Int
forall e a. Exception e => e -> STM a
throwSTM (FranzException -> STM Int) -> FranzException -> STM Int
forall a b. (a -> b) -> a -> b
$ IndexName -> [IndexName] -> FranzException
IndexNotFound IndexName
index ([IndexName] -> FranzException) -> [IndexName] -> FranzException
forall a b. (a -> b) -> a -> b
$ HashMap IndexName (TVar (IntMap Int)) -> [IndexName]
forall k v. HashMap k v -> [k]
HM.keys HashMap IndexName (TVar (IntMap Int))
indices
Just TVar (IntMap Int)
v -> do
IntMap Int
m <- TVar (IntMap Int) -> STM (IntMap Int)
forall a. TVar a -> STM a
readTVar TVar (IntMap Int)
v
let (IntMap Int
_, IntMap Int
wing) = Int -> IntMap Int -> (IntMap Int, IntMap Int)
forall a. Int -> IntMap a -> (IntMap a, IntMap a)
splitR Int
val IntMap Int
m
Int -> STM Int
forall (m :: * -> *) a. Monad m => a -> m a
return (Int -> STM Int) -> Int -> STM Int
forall a b. (a -> b) -> a -> b
$! Int -> ((Int, IntMap Int) -> Int) -> Maybe (Int, IntMap Int) -> Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int
forall a. Bounded a => a
maxBound (Int, IntMap Int) -> Int
forall a b. (a, b) -> a
fst (Maybe (Int, IntMap Int) -> Int) -> Maybe (Int, IntMap Int) -> Int
forall a b. (a -> b) -> a -> b
$ IntMap Int -> Maybe (Int, IntMap Int)
forall a. IntMap a -> Maybe (a, IntMap a)
IM.minView IntMap Int
wing
Int
end <- case ItemRef
end_ of
BySeqNum Int
i -> Int -> STM Int
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> STM Int) -> Int -> STM Int
forall a b. (a -> b) -> a -> b
$ Int -> Int
rotate Int
i
ByIndex IndexName
index Int
val -> case IndexName
-> HashMap IndexName (TVar (IntMap Int))
-> Maybe (TVar (IntMap Int))
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup IndexName
index HashMap IndexName (TVar (IntMap Int))
indices of
Maybe (TVar (IntMap Int))
Nothing -> FranzException -> STM Int
forall e a. Exception e => e -> STM a
throwSTM (FranzException -> STM Int) -> FranzException -> STM Int
forall a b. (a -> b) -> a -> b
$ IndexName -> [IndexName] -> FranzException
IndexNotFound IndexName
index ([IndexName] -> FranzException) -> [IndexName] -> FranzException
forall a b. (a -> b) -> a -> b
$ HashMap IndexName (TVar (IntMap Int)) -> [IndexName]
forall k v. HashMap k v -> [k]
HM.keys HashMap IndexName (TVar (IntMap Int))
indices
Just TVar (IntMap Int)
v -> do
IntMap Int
m <- TVar (IntMap Int) -> STM (IntMap Int)
forall a. TVar a -> STM a
readTVar TVar (IntMap Int)
v
let (IntMap Int
body, Maybe Int
lastItem, IntMap Int
_) = Int -> IntMap Int -> (IntMap Int, Maybe Int, IntMap Int)
forall a. Int -> IntMap a -> (IntMap a, Maybe a, IntMap a)
IM.splitLookup Int
val IntMap Int
m
let body' :: IntMap Int
body' = (IntMap Int -> IntMap Int)
-> (Int -> IntMap Int -> IntMap Int)
-> Maybe Int
-> IntMap Int
-> IntMap Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe IntMap Int -> IntMap Int
forall a. a -> a
id (Int -> Int -> IntMap Int -> IntMap Int
forall a. Int -> a -> IntMap a -> IntMap a
IM.insert Int
val) Maybe Int
lastItem IntMap Int
body
Int -> STM Int
forall (m :: * -> *) a. Monad m => a -> m a
return (Int -> STM Int) -> Int -> STM Int
forall a b. (a -> b) -> a -> b
$! Int -> ((Int, IntMap Int) -> Int) -> Maybe (Int, IntMap Int) -> Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int
forall a. Bounded a => a
minBound (Int, IntMap Int) -> Int
forall a b. (a, b) -> a
fst (Maybe (Int, IntMap Int) -> Int) -> Maybe (Int, IntMap Int) -> Int
forall a b. (a -> b) -> a -> b
$ IntMap Int -> Maybe (Int, IntMap Int)
forall a. IntMap a -> Maybe (a, IntMap a)
IM.maxView IntMap Int
body'
(Bool, QueryResult) -> STM (Bool, QueryResult)
forall (m :: * -> *) a. Monad m => a -> m a
return ((Bool, QueryResult) -> STM (Bool, QueryResult))
-> (Bool, QueryResult) -> STM (Bool, QueryResult)
forall a b. (a -> b) -> a -> b
$! Int -> Int -> RequestType -> IntMap Int -> (Bool, QueryResult)
range Int
begin Int
end RequestType
rt IntMap Int
allOffsets
where
acquire :: IO Stream
acquire = IO (IO Stream) -> IO Stream
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO Stream) -> IO Stream) -> IO (IO Stream) -> IO Stream
forall a b. (a -> b) -> a -> b
$ STM (IO Stream) -> IO (IO Stream)
forall a. STM a -> IO a
atomically (STM (IO Stream) -> IO (IO Stream))
-> STM (IO Stream) -> IO (IO Stream)
forall a b. (a -> b) -> a -> b
$ do
HashMap FranzDirectory (HashMap StreamName Stream)
allStreams <- TVar (HashMap FranzDirectory (HashMap StreamName Stream))
-> STM (HashMap FranzDirectory (HashMap StreamName Stream))
forall a. TVar a -> STM a
readTVar TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams
let !streams :: HashMap StreamName Stream
streams = HashMap StreamName Stream
-> (HashMap StreamName Stream -> HashMap StreamName Stream)
-> Maybe (HashMap StreamName Stream)
-> HashMap StreamName Stream
forall b a. b -> (a -> b) -> Maybe a -> b
maybe HashMap StreamName Stream
forall a. Monoid a => a
mempty HashMap StreamName Stream -> HashMap StreamName Stream
forall a. a -> a
id (Maybe (HashMap StreamName Stream) -> HashMap StreamName Stream)
-> Maybe (HashMap StreamName Stream) -> HashMap StreamName Stream
forall a b. (a -> b) -> a -> b
$ FranzDirectory
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> Maybe (HashMap StreamName Stream)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup FranzDirectory
dir HashMap FranzDirectory (HashMap StreamName Stream)
allStreams
case StreamName -> HashMap StreamName Stream -> Maybe Stream
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup StreamName
name HashMap StreamName Stream
streams of
Maybe Stream
Nothing -> IO Stream -> STM (IO Stream)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (IO Stream -> STM (IO Stream)) -> IO Stream -> STM (IO Stream)
forall a b. (a -> b) -> a -> b
$ IO Stream
-> (Stream -> IO ()) -> (Stream -> IO Stream) -> IO Stream
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracketOnError
(WatchManager -> FilePath -> IO Stream
createStream WatchManager
watchManager (FilePath -> IO Stream) -> FilePath -> IO Stream
forall a b. (a -> b) -> a -> b
$ FranzPrefix -> FranzDirectory -> StreamName -> FilePath
getFranzStreamPath FranzPrefix
prefix FranzDirectory
dir StreamName
name)
Stream -> IO ()
closeStream ((Stream -> IO Stream) -> IO Stream)
-> (Stream -> IO Stream) -> IO Stream
forall a b. (a -> b) -> a -> b
$ \Stream
s -> STM Stream -> IO Stream
forall a. STM a -> IO a
atomically (STM Stream -> IO Stream) -> STM Stream -> IO Stream
forall a b. (a -> b) -> a -> b
$ do
Stream -> STM ()
addActivity Stream
s
TVar (HashMap FranzDirectory (HashMap StreamName Stream))
-> (HashMap FranzDirectory (HashMap StreamName Stream)
-> HashMap FranzDirectory (HashMap StreamName Stream))
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams ((HashMap FranzDirectory (HashMap StreamName Stream)
-> HashMap FranzDirectory (HashMap StreamName Stream))
-> STM ())
-> (HashMap FranzDirectory (HashMap StreamName Stream)
-> HashMap FranzDirectory (HashMap StreamName Stream))
-> STM ()
forall a b. (a -> b) -> a -> b
$ FranzDirectory
-> HashMap StreamName Stream
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> HashMap FranzDirectory (HashMap StreamName Stream)
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
HM.insert FranzDirectory
dir (HashMap StreamName Stream
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> HashMap FranzDirectory (HashMap StreamName Stream))
-> HashMap StreamName Stream
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> HashMap FranzDirectory (HashMap StreamName Stream)
forall a b. (a -> b) -> a -> b
$ StreamName
-> Stream -> HashMap StreamName Stream -> HashMap StreamName Stream
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
HM.insert StreamName
name Stream
s HashMap StreamName Stream
streams
Stream -> STM Stream
forall (f :: * -> *) a. Applicative f => a -> f a
pure Stream
s
Just Stream
s -> do
Stream -> STM ()
addActivity Stream
s
IO Stream -> STM (IO Stream)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Stream -> IO Stream
forall (f :: * -> *) a. Applicative f => a -> f a
pure Stream
s)