{-# LANGUAGE FlexibleContexts #-}
--------------------------------------------------------------------------------
-- |
-- Module    :  EventSource.Store.Iterator
-- Copyright :  (C) 2018 Yorick Laupa
-- License   :  (see the file LICENSE)
-- Maintainer:  Yorick Laupa <yo.eight@gmail.com>
-- Stability :  experimental
-- Portability: non-portable
--
--------------------------------------------------------------------------------
module EventSource.Store.Internal.Iterator where

--------------------------------------------------------------------------------
import Data.Int (Int32)

--------------------------------------------------------------------------------
import Control.Concurrent.Async.Lifted (Async, wait)
import Control.Monad.Base (MonadBase, liftBase)
import Data.IORef.Lifted (IORef, atomicModifyIORef')

--------------------------------------------------------------------------------
import EventSource.Types

--------------------------------------------------------------------------------
-- | Represents batch information needed to read a stream.
data Batch' a =
  Batch' { batchFrom :: !a
         , batchSize :: !Int32
         }

--------------------------------------------------------------------------------
-- | Starts a 'Batch' from a given point. The batch size is set to default,
--   which is 500.
startFrom :: a -> Batch' a
startFrom from = Batch' from 500

--------------------------------------------------------------------------------
data IteratorOverState a
  = IteratorOverAvailable (Slice' a)
  | IteratorOverClosed

--------------------------------------------------------------------------------
data IteratorOverAction a
  = IteratorOverEvent SavedEvent
  | IteratorOverNextBatch a
  | IteratorOverEndOfStream

--------------------------------------------------------------------------------
iterateOver :: MonadBase IO m
            => IORef (IteratorOverState a)
            -> (Batch' a -> m (Async (ReadStatus (Slice' a))))
            -> m (Maybe SavedEvent)
iterateOver ref puller = go
  where
    go = do
      action <- atomicModifyIORef' ref $ \st ->
        case st of
          IteratorOverAvailable slice ->
            case sliceEvents slice of
              e:es ->
                let nextSlice = slice { sliceEvents = es }
                    nxtSt = IteratorOverAvailable nextSlice in
                (nxtSt, IteratorOverEvent e)
              [] | sliceEndOfStream slice
                   -> (IteratorOverClosed, IteratorOverEndOfStream)
                 | otherwise
                   -> let resp = IteratorOverNextBatch $
                                 sliceNext slice in
                     (st, resp)
          IteratorOverClosed -> (st, IteratorOverEndOfStream)

      case action of
        IteratorOverEvent e -> return $ Just e
        IteratorOverEndOfStream -> return Nothing
        IteratorOverNextBatch num -> do
          w <- puller (startFrom num)
          res <- liftBase $ wait w
          case res of
            ReadFailure _ -> do
              atomicModifyIORef' ref $ \_ -> (IteratorOverClosed, ())
              return Nothing
            ReadSuccess slice -> do
              let nxtSt = IteratorOverAvailable slice
              atomicModifyIORef' ref $ \_ -> (nxtSt, ())
              go