module Network.Nakadi.EventTypes.Events
( eventsProcessConduit
, eventsProcess
, eventsPublish
) where
import Network.Nakadi.Internal.Prelude
import Conduit
import Data.Aeson
import qualified Data.ByteString.Lazy as ByteString.Lazy
import Data.Void
import Network.HTTP.Client (responseBody)
import Network.Nakadi.Internal.Config
import Network.Nakadi.Internal.Http
path :: EventTypeName -> ByteString
path eventTypeName =
"/event-types/"
<> encodeUtf8 (unEventTypeName eventTypeName)
<> "/events"
eventsProcess
:: ( MonadNakadi b m
, FromJSON a )
=> Maybe ConsumeParameters
-> EventTypeName
-> Maybe [Cursor]
-> (EventStreamBatch a -> m r)
-> m r
eventsProcess maybeConsumeParameters eventTypeName maybeCursors processor =
eventsProcess maybeConsumeParameters eventTypeName maybeCursors processor
eventsProcessConduit
:: ( MonadNakadi b m
, MonadMask m
, FromJSON a )
=> Maybe ConsumeParameters
-> EventTypeName
-> Maybe [Cursor]
-> ConduitM (EventStreamBatch a) Void m r
-> m r
eventsProcessConduit maybeConsumeParameters eventTypeName maybeCursors consumer = do
config <- nakadiAsk
let consumeParams = fromMaybe defaultConsumeParameters maybeConsumeParameters
queryParams = buildConsumeQueryParameters consumeParams
httpJsonBodyStream ok200 [ (status429, errorTooManyRequests)
, (status429, errorEventTypeNotFound) ]
(setRequestPath (path eventTypeName)
. includeFlowId config
. setRequestQueryParameters queryParams
. addCursors) $
handler config
where addCursors = case maybeCursors of
Just cursors -> let cursors' = ByteString.Lazy.toStrict (encode cursors)
in addRequestHeader "X-Nakadi-Cursors" cursors'
Nothing -> identity
handler config response =
responseBody response
.| linesUnboundedAsciiC
.| conduitDecode config
$$ consumer
eventsPublish
:: (MonadNakadi b m, ToJSON a)
=> EventTypeName
-> [a]
-> m ()
eventsPublish eventTypeName eventBatch = do
config <- nakadiAsk
httpJsonNoBody status200
[ (Status 207 "Multi-Status", errorBatchPartiallySubmitted)
, (status422, errorBatchNotSubmitted) ] $
(setRequestMethod "POST"
. includeFlowId config
. setRequestPath (path eventTypeName)
. setRequestBodyJSON eventBatch)