module Network.Nakadi.Subscriptions.Events
( subscriptionProcessConduit
, subscriptionProcess
) where
import Network.Nakadi.Internal.Prelude
import Conduit hiding (throwM)
import Data.Aeson
import Data.Void
import Network.HTTP.Client (responseBody)
import Network.HTTP.Simple
import Network.HTTP.Types
import Network.Nakadi.Internal.Config
import Network.Nakadi.Internal.Conversions
import Network.Nakadi.Internal.Http
import qualified Network.Nakadi.Internal.Lenses as L
import Network.Nakadi.Subscriptions.Cursors
subscriptionProcess
:: ( MonadNakadi b m
, MonadMask m
, FromJSON a )
=> Maybe ConsumeParameters
-> SubscriptionId
-> (SubscriptionEventStreamBatch a -> m ())
-> m ()
subscriptionProcess maybeConsumeParameters subscriptionId processor =
subscriptionProcessConduit maybeConsumeParameters subscriptionId conduit
where conduit = iterMC processor
subscriptionProcessConduit
:: ( MonadNakadi b m
, MonadMask m
, FromJSON a
, L.HasNakadiSubscriptionCursor c )
=> Maybe ConsumeParameters
-> SubscriptionId
-> ConduitM (SubscriptionEventStreamBatch a) c m ()
-> m ()
subscriptionProcessConduit maybeConsumeParameters subscriptionId processor = do
config <- nakadiAsk
let consumeParams = fromMaybe defaultConsumeParameters maybeConsumeParameters
queryParams = buildSubscriptionConsumeQueryParameters consumeParams
httpJsonBodyStream ok200 [(status404, errorSubscriptionNotFound)]
(includeFlowId config
. setRequestPath path
. setRequestQueryParameters queryParams) $
handler config
where buildSubscriptionEventStream response =
case listToMaybe (getResponseHeader "X-Nakadi-StreamId" response) of
Just streamId ->
pure SubscriptionEventStream
{ _streamId = StreamId (decodeUtf8 streamId)
, _subscriptionId = subscriptionId }
Nothing ->
throwM StreamIdMissing
path = "/subscriptions/"
<> subscriptionIdToByteString subscriptionId
<> "/events"
handler config response = do
eventStream <- buildSubscriptionEventStream response
runConduit $
responseBody response
.| linesUnboundedAsciiC
.| conduitDecode config
.| processor
.| subscriptionSink eventStream
subscriptionSink ::
(MonadNakadi b m, L.HasNakadiSubscriptionCursor a)
=> SubscriptionEventStream
-> ConduitM a Void m ()
subscriptionSink eventStream =
awaitForever $ lift . subscriptionCursorCommit eventStream . (: [])