module Network.Nakadi.Subscriptions.Cursors
( subscriptionCursorCommit'
, subscriptionCursorCommit
, subscriptionCursors
, subscriptionCursorsReset
) where
import Network.Nakadi.Internal.Prelude
import Data.Aeson
import qualified Control.Exception.Safe as Safe
import Control.Lens
import qualified Data.HashMap.Lazy as HashMap
import Network.Nakadi.Internal.Conversions
import Network.Nakadi.Internal.Http
import Network.Nakadi.Internal.Lenses (HasNakadiSubscriptionCursor)
import qualified Network.Nakadi.Internal.Lenses as L
-- | Request path of the cursors resource of a subscription, i.e.
-- @\/subscriptions\/SUBSCRIPTION-ID\/cursors@.
path :: SubscriptionId -> ByteString
path subscriptionId = "/subscriptions/" <> idSegment <> "/cursors"
  where
    -- Subscription ID rendered as the middle path segment.
    idSegment = subscriptionIdToByteString subscriptionId
-- | @POST@ to @\/subscriptions\/SUBSCRIPTION-ID\/cursors@. Low-level
-- cursor commit: the cursors are sent as the JSON request body and the
-- stream ID is transmitted in the @X-Nakadi-StreamId@ header.
--
-- 'status204' is handed to 'httpJsonNoBody' as the primary status,
-- while a 200 response is associated with 'errorCursorAlreadyCommitted'.
subscriptionCursorCommit'
  :: MonadNakadi b m
  => SubscriptionId           -- ^ Subscription whose cursors are committed
  -> StreamId                 -- ^ Stream ID sent in the request header
  -> SubscriptionCursorCommit -- ^ Cursors to commit (JSON request body)
  -> m ()
subscriptionCursorCommit' subscriptionId streamId cursors =
  httpJsonNoBody status204 statusHandlers prepareRequest
  where
    -- Non-primary response statuses and the errors they are mapped to.
    statusHandlers = [(ok200, errorCursorAlreadyCommitted)]

    -- Value of the X-Nakadi-StreamId header.
    streamIdHeader = encodeUtf8 (unStreamId streamId)

    -- Request transformer; the setters are composed in the same order
    -- as they are applied (right to left): path, body, header, method.
    prepareRequest =
      setRequestMethod "POST"
        . addRequestHeader "X-Nakadi-StreamId" streamIdHeader
        . setRequestBodyJSON cursors
        . setRequestPath (path subscriptionId)
-- | High-level cursor commit. Extracts the subscription cursor from
-- each given value, commits them through 'subscriptionCursorCommit''
-- using the subscription ID and stream ID of the provided event
-- stream, and treats a 'CursorAlreadyCommitted' exception as success.
-- Any other exception is not caught here.
subscriptionCursorCommit
  :: (MonadNakadi b m, MonadCatch m, HasNakadiSubscriptionCursor a)
  => SubscriptionEventStream -- ^ Supplies the subscription ID and stream ID
  -> [a]                     -- ^ Values carrying the cursors to commit
  -> m ()
subscriptionCursorCommit SubscriptionEventStream { .. } as =
  Safe.catchJust alreadyCommitted commit (\_ -> return ())
  where
    -- The actual commit request.
    commit = subscriptionCursorCommit' _subscriptionId _streamId payload

    -- Commit payload built from the cursor of every input value.
    payload = SubscriptionCursorCommit (map (^. L.subscriptionCursor) as)

    -- Selects only the "cursor already committed" exception for handling.
    alreadyCommitted (CursorAlreadyCommitted _) = Just ()
    alreadyCommitted _                          = Nothing
-- | @GET@ to @\/subscriptions\/SUBSCRIPTION-ID\/cursors@. Retrieves the
-- cursors of the given subscription by decoding the JSON body of a
-- 200 response.
subscriptionCursors
  :: MonadNakadi b m
  => SubscriptionId          -- ^ Subscription to query
  -> m [SubscriptionCursor]
subscriptionCursors subscriptionId = httpJsonBody ok200 [] prepareRequest
  where
    -- NOTE(review): the body is decoded directly as a list, whereas
    -- 'subscriptionCursorsReset' wraps its cursors in an "items" object.
    -- Confirm that the server really answers with a bare JSON array.
    prepareRequest =
      setRequestMethod "GET" . setRequestPath (path subscriptionId)
-- | @PATCH@ to @\/subscriptions\/SUBSCRIPTION-ID\/cursors@. Resets the
-- cursors of the given subscription to the provided ones, which are
-- sent as a JSON object of the form @{"items": [...]}@.
--
-- 'status204' is handed to 'httpJsonNoBody' as the primary status; a 404
-- is associated with 'errorSubscriptionNotFound' and a 409 with
-- 'errorCursorResetInProgress'.
subscriptionCursorsReset
  :: MonadNakadi b m
  => SubscriptionId                  -- ^ Subscription whose cursors are reset
  -> [SubscriptionCursorWithoutToken] -- ^ Cursors to reset to
  -> m ()
subscriptionCursorsReset subscriptionId cursors =
  httpJsonNoBody status204 statusHandlers prepareRequest
  where
    -- Non-primary response statuses and the errors they are mapped to.
    statusHandlers =
      [ (status404, errorSubscriptionNotFound)
      , (status409, errorCursorResetInProgress)
      ]

    -- Request body: a single-key object holding the cursors.
    -- NOTE(review): constructing 'Object' from a HashMap presumably ties
    -- this to aeson < 2 -- confirm before upgrading aeson.
    body = Object (HashMap.singleton "items" (toJSON cursors))

    prepareRequest =
      setRequestMethod "PATCH"
        . setRequestPath (path subscriptionId)
        . setRequestBodyJSON body