{-# LANGUAGE FlexibleContexts #-}
module EventSource.Store.Internal.Iterator where
import Data.Int (Int32)
import Control.Concurrent.Async.Lifted (Async, wait)
import Control.Monad.Base (MonadBase, liftBase)
import Data.IORef.Lifted (IORef, atomicModifyIORef')
import EventSource.Types
-- | A read request handed to the puller function of 'iterateOver'.
--
-- Both fields are strict.
data Batch' a = Batch'
  { batchFrom :: !a
    -- ^ Position the read starts from.
  , batchSize :: !Int32
    -- ^ Size attached to the request; presumably the maximum number of
    --   events fetched per read (TODO confirm against the puller).
  }
-- | Build a 'Batch'' that begins at the given position and uses the
--   fixed batch size of 500.
startFrom :: a -> Batch' a
startFrom from =
  Batch'
    { batchFrom = from
    , batchSize = 500
    }
-- | State kept in the 'IORef' that 'iterateOver' reads and updates.
data IteratorOverState a
  = IteratorOverAvailable (Slice' a)
    -- ^ A slice is loaded; 'iterateOver' hands out its events one at a
    --   time and stores the remainder back.
  | IteratorOverClosed
    -- ^ Terminal state: 'iterateOver' returns 'Nothing' from here on. It is
    --   entered at end of stream or after a failed read.
-- | What 'iterateOver' has to do next, as decided inside its atomic
--   state update.
data IteratorOverAction a
  = IteratorOverEvent SavedEvent
    -- ^ Return this event to the caller.
  | IteratorOverNextBatch a
    -- ^ The loaded slice is exhausted but the stream is not finished;
    --   read another batch starting from this position.
  | IteratorOverEndOfStream
    -- ^ Nothing left to return.
-- | Produce the next event of the iteration tracked by the given 'IORef'.
--
-- * While the loaded slice still has events, the head event is removed
--   from the stored slice and returned as @Just@.
-- * When the slice is empty and flagged as end of stream, the state becomes
--   'IteratorOverClosed' and 'Nothing' is returned.
-- * When the slice is empty but not at end of stream, a new batch starting
--   at 'sliceNext' is requested through the puller and awaited. On
--   'ReadSuccess' the new slice is stored and the lookup is retried; on
--   'ReadFailure' the state becomes 'IteratorOverClosed' and 'Nothing' is
--   returned (the failure payload is discarded, so callers cannot tell a
--   failed read from a normal end of stream).
-- * Once closed, every call returns 'Nothing'.
--
-- NOTE(review): the batch is fetched outside the atomic state update and
-- the state is left untouched while waiting, so two concurrent callers
-- could both request the same batch. Confirm that an iterator is only
-- driven from one thread at a time.
iterateOver :: MonadBase IO m
            => IORef (IteratorOverState a)
            -> (Batch' a -> m (Async (ReadStatus (Slice' a))))
            -> m (Maybe SavedEvent)
iterateOver ref puller = loop
  where
    -- Decide atomically what to do, then act on that decision.
    loop = atomicModifyIORef' ref step >>= dispatch

    -- Pure transition: new state paired with the action to perform.
    step current@(IteratorOverAvailable slice) =
      case sliceEvents slice of
        event : remaining ->
          ( IteratorOverAvailable (slice { sliceEvents = remaining })
          , IteratorOverEvent event
          )
        []
          | sliceEndOfStream slice ->
              (IteratorOverClosed, IteratorOverEndOfStream)
          | otherwise ->
              -- Keep the exhausted slice in place until the next batch
              -- has actually arrived.
              (current, IteratorOverNextBatch (sliceNext slice))
    step current@IteratorOverClosed =
      (current, IteratorOverEndOfStream)

    -- Carry out the action chosen by 'step'.
    dispatch (IteratorOverEvent event) = return (Just event)
    dispatch IteratorOverEndOfStream   = return Nothing
    dispatch (IteratorOverNextBatch position) = do
      pending <- puller (startFrom position)
      outcome <- liftBase (wait pending)
      case outcome of
        ReadFailure _ -> do
          overwrite IteratorOverClosed
          return Nothing
        ReadSuccess fresh -> do
          overwrite (IteratorOverAvailable fresh)
          loop

    -- Replace the stored state unconditionally.
    overwrite newState = atomicModifyIORef' ref (const (newState, ()))