module Network.Nakadi.EventTypes.CursorsLag
( cursorsLag'
, cursorsLagR'
, cursorsLag
, cursorsLagR
) where
import Network.Nakadi.Internal.Prelude
import Control.Arrow
import Control.Lens
import qualified Data.Map.Strict as Map
import Network.Nakadi.Internal.Http
import qualified Network.Nakadi.Internal.Lenses as L
import Network.Nakadi.Internal.Util
path :: EventTypeName -> ByteString
path eventTypeName =
"/event-types/"
<> encodeUtf8 (unEventTypeName eventTypeName)
<> "/cursors-lag"
cursorsLag' ::
MonadNakadi m
=> Config
-> EventTypeName
-> [Cursor]
-> m [Partition]
cursorsLag' config eventTypeName cursors =
httpJsonBody config ok200 []
(setRequestMethod "POST"
. setRequestPath (path eventTypeName)
. setRequestBodyJSON cursors)
cursorsLagR' ::
MonadNakadiEnv r m
=> EventTypeName
-> [Cursor]
-> m [Partition]
cursorsLagR' eventTypeName cursors = do
config <- asks (view L.nakadiConfig)
cursorsLag' config eventTypeName cursors
cursorsLag ::
MonadNakadi m
=> Config
-> EventTypeName
-> Map PartitionName CursorOffset
-> m (Map PartitionName Int64)
cursorsLag config eventTypeName cursorsMap = do
partitionStats <- cursorsLag' config eventTypeName cursors
return $ partitionStats & map ((view L.partition &&& view L.unconsumedEvents) >>> sequenceSnd)
& catMaybes
& Map.fromList
where cursors = map (uncurry Cursor) (Map.toList cursorsMap)
cursorsLagR ::
MonadNakadiEnv r m
=> EventTypeName
-> Map PartitionName CursorOffset
-> m (Map PartitionName Int64)
cursorsLagR eventTypeName cursorsMap = do
config <- asks (view L.nakadiConfig)
cursorsLag config eventTypeName cursorsMap