{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}
-- |
-- Module : Database.EventStore.Internal.Subscription.Catchup
-- Copyright : (C) 2017 Yorick Laupa
-- License : (see the file LICENSE)
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
module Database.EventStore.Internal.Subscription.Catchup where

import Control.Monad.Fix

import Database.EventStore.Internal.Callback
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Operation.Catchup
import Database.EventStore.Internal.Operation.Volatile
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Api
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types

data Phase
  = CatchingUp
  | Running SubDetails
  | Closed (Either SomeException SubDropReason)

data CatchupTrack
  = CatchupRegular !(Maybe Int64)
  | CatchupAll !(Maybe Position)

catchupTrack :: CatchupState -> CatchupTrack
catchupTrack RegularCatchup{} = CatchupRegular Nothing
catchupTrack AllCatchup{}     = CatchupAll Nothing

receivedAlready :: CatchupTrack -> ResolvedEvent -> Bool
receivedAlready (CatchupRegular old) e =
  maybe False (resolvedEventOriginalEventNumber e <=) old
receivedAlready (CatchupAll old) e =
  fromMaybe False ((<=) <$> resolvedEventPosition e <*> old)

updateTrack :: ResolvedEvent -> CatchupTrack -> CatchupTrack
updateTrack e (CatchupRegular _) =
  CatchupRegular (Just $ resolvedEventOriginalEventNumber e)
updateTrack e (CatchupAll _) =
  CatchupAll (resolvedEventPosition e)

-- | This kind of subscription specifies a starting point, in the form of an
--   event number or transaction file position. The given function will be
--   called for events from the starting point until the end of the stream, and
--   then for subsequently written events.
--   For example, if a starting point of 50 is specified when a stream has 100
--   events in it, the subscriber can expect to see events 51 through 100, and
--   then any events subsequently written until such time as the subscription is
--   dropped or closed.
data CatchupSubscription =
  CatchupSubscription { _catchupExec   :: Exec
                      , _catchupStream :: StreamName
                      , _catchupPhase  :: TVar Phase
                      , _catchupTrack  :: TVar CatchupTrack
                      , _catchupNext   :: STM (Maybe ResolvedEvent)

instance Subscription CatchupSubscription where
  nextEventMaybeSTM = _catchupNext

  getSubscriptionDetailsSTM s = do
    p <- readTVar (_catchupPhase s)
    case p of
      Running details -> return details
      Closed r        -> throwClosed r
      _               -> retrySTM

  subscriptionStream = _catchupStream

  unsubscribe s = subUnsubscribe (_catchupExec s) s

streamName :: CatchupState -> StreamName
streamName (RegularCatchup stream _) = StreamName stream
streamName _                         = "$all"

streamText :: StreamName -> Text
streamText (StreamName s) = s
streamText _              = ""

newCatchupSubscription :: Exec
                       -> Bool
                       -> Maybe Int32
                       -> Maybe Credentials
                       -> CatchupState
                       -> IO CatchupSubscription
newCatchupSubscription exec tos batch cred state = do
  phaseVar <- newTVarIO CatchingUp
  queue    <- newTQueueIO
  track    <- newTVarIO $ catchupTrack state

  let stream = streamName state
      sub = CatchupSubscription exec stream phaseVar track $ do
        p       <- readTVar phaseVar
        isEmpty <- isEmptyTQueue queue
        if isEmpty
            case p of
              Closed r -> throwClosed r
              _        -> return Nothing
          else Just <$> readTQueue queue

      callback cb (Left e) =
        case fromException e of
          Just opE ->
            case opE of
              StreamNotFound{} -> do
                let op = volatile (streamText stream) tos cred
                publishWith exec (SubmitOperation cb op)
              _ -> atomically $ writeTVar phaseVar (Closed $ Left e)
          _ -> atomically $ writeTVar phaseVar (Closed $ Left e)
      callback _ (Right action) =
        case action of
          Confirmed details -> atomically $ writeTVar phaseVar (Running details)
          Dropped r ->
            atomically $ writeTVar phaseVar (Closed $ Right r)
          Submit e -> atomically $ do
            tracker <- readTVar track
            unless (receivedAlready tracker e) $ do
              writeTVar track (updateTrack e tracker)
              writeTQueue queue e
          ConnectionReset -> do
            tpe <- readTVarIO track
            let newState =
                  case tpe of
                    CatchupRegular old ->
                      case old of
                        Just n -> RegularCatchup (streamText stream) n
                        _      -> state
                    CatchupAll old ->
                      case old of
                        Just p -> AllCatchup p
                        _      -> state

                newOp = catchup (execSettings exec) newState tos batch cred

            newCb <- mfix $ \self -> newCallback (callback self)
            publishWith exec (SubmitOperation newCb newOp)

  cb <- mfix $ \self -> newCallback (callback self)
  let op = catchup (execSettings exec) state tos batch cred
  publishWith exec (SubmitOperation cb op)
  return sub

throwClosed :: Either SomeException SubDropReason -> STM a
throwClosed (Left e)  = throwSTM e
throwClosed (Right r) = throwSTM (SubscriptionClosed $ Just r)

-- | Non blocking version of `waitTillCatchup`.
hasCaughtUp :: CatchupSubscription -> IO Bool
hasCaughtUp sub = atomically $ hasCaughtUpSTM sub

-- | Waits until 'CatchupSubscription' subscription catch-up its stream.
waitTillCatchup :: CatchupSubscription -> IO ()
waitTillCatchup sub = atomically $ unlessM (hasCaughtUpSTM sub) retrySTM

-- | Like 'hasCaughtUp' but lives in 'STM' monad.
hasCaughtUpSTM :: CatchupSubscription -> STM Bool
hasCaughtUpSTM CatchupSubscription{..} = do
  p <- readTVar _catchupPhase
  case p of
    CatchingUp -> return False
    Running{}  -> return True
    Closed tpe ->
      case tpe of
        Left e  -> throwSTM e
        Right r -> throwSTM (SubscriptionClosed $ Just r)