module Network.Nakadi.Internal.Http
( module Network.HTTP.Simple
, module Network.HTTP.Types.Status
, module Network.Nakadi.Internal.Types
, httpJsonBody
, httpJsonNoBody
, httpJsonBodyStream
, httpBuildRequest
, conduitDecode
, includeFlowId
, errorClientNotAuthenticated
, errorUnprocessableEntity
, errorAccessForbidden
, errorTooManyRequests
, errorBadRequest
, errorSubscriptionNotFound
, errorCursorAlreadyCommitted
, errorCursorResetInProgress
, errorEventTypeNotFound
, errorSubscriptionExistsAlready
, errorBatchPartiallySubmitted
, errorBatchNotSubmitted
, setRequestQueryParameters
) where
import Network.Nakadi.Internal.Prelude
import Conduit hiding (throwM)
import Control.Arrow
import Control.Lens
import Control.Monad (void)
import Data.Aeson
import qualified Data.ByteString.Lazy as ByteString.Lazy
import qualified Data.ByteString.Lazy as LB
import qualified Data.Text as Text
import Network.HTTP.Client (BodyReader,
HttpException (..),
HttpExceptionContent (..),
Manager, checkResponse,
responseBody, responseStatus)
import Network.HTTP.Simple hiding (Proxy)
import Network.HTTP.Types
import Network.HTTP.Types.Status
import qualified Network.Nakadi.Internal.Lenses as L
import Network.Nakadi.Internal.Types
import Network.Nakadi.Internal.Util
conduitDecode
:: forall a b m
. (FromJSON a, MonadNakadi b m)
=> Config b
-> ConduitM ByteString a m ()
conduitDecode config = awaitForever $ \ a -> case eitherDecodeStrict' a of
Right v -> yield v
Left err -> lift . nakadiLiftBase $ callback a (Text.pack err)
where
callback bs err = case config ^. L.deserializationFailureCallback of
Nothing -> throwM $ DeserializationFailure bs err
Just cb -> cb bs err
checkNakadiResponse :: Request -> Response BodyReader -> IO ()
checkNakadiResponse request response =
when (statusCode (responseStatus response) `div` 100 == 5)
$ throwIO
$ HttpExceptionRequest request (StatusCodeException (void response) mempty)
httpBuildRequest
:: MonadNakadi b m
=> (Request -> Request)
-> m Request
httpBuildRequest requestDef = do
config <- nakadiAsk
let request = requestDef (config ^. L.requestTemplate)
& \req -> req { checkResponse = checkNakadiResponse }
modifyRequest (config ^. L.requestModifier) request
modifyRequest
:: MonadNakadi b m => (Request -> b Request) -> Request -> m Request
modifyRequest rm request = tryAny (nakadiLiftBase (rm request)) >>= \case
Right modifiedRequest -> return modifiedRequest
Left exn -> throwIO $ RequestModificationException exn
httpExecRequest
:: MonadNakadi b m
=> (Request -> Request)
-> m (Response ByteString.Lazy.ByteString)
httpExecRequest requestDef = do
config <- nakadiAsk
req <- httpBuildRequest requestDef
nakadiLiftBase $ nakadiHttpLbs config req (config^.L.manager)
nakadiHttpLbs :: Config b
-> Request
-> Maybe Manager
-> b (Response LB.ByteString)
nakadiHttpLbs config req maybeMngr =
(config^.L.http.L.httpLbs) config req maybeMngr
httpExecRequestWithStatus
:: MonadNakadi b m
=> (Request -> Request)
-> m (Response ByteString.Lazy.ByteString, Status)
httpExecRequestWithStatus requestDef =
(identity &&& getResponseStatus) <$> httpExecRequest requestDef
httpJsonBody
:: (MonadNakadi b m, FromJSON a)
=> Status
-> [(Status, ByteString.Lazy.ByteString -> m NakadiException)]
-> (Request -> Request)
-> m a
httpJsonBody successStatus exceptionMap requestDef = do
(response, status) <- httpExecRequestWithStatus requestDef
if status == successStatus
then decodeThrow (getResponseBody response)
else case lookup status exceptionMap' of
Just mkExn -> mkExn (getResponseBody response) >>= throwM
Nothing -> throwM (UnexpectedResponse (void response))
where exceptionMap' = exceptionMap ++ defaultExceptionMap
httpJsonNoBody
:: MonadNakadi b m
=> Status
-> [(Status, ByteString.Lazy.ByteString -> m NakadiException)]
-> (Request -> Request)
-> m ()
httpJsonNoBody successStatus exceptionMap requestDef = do
(response, status) <- httpExecRequestWithStatus requestDef
unless (status == successStatus) $ case lookup status exceptionMap' of
Just mkExn -> mkExn (getResponseBody response) >>= throwIO
Nothing -> throwIO (UnexpectedResponse (void response))
where exceptionMap' = exceptionMap ++ defaultExceptionMap
nakadiHttpResponseOpen :: Config b
-> Request
-> Maybe Manager
-> b (Response (ConduitM () ByteString b ()))
nakadiHttpResponseOpen config req maybeMngr =
(config^.L.http.L.httpResponseOpen) config req maybeMngr
nakadiHttpResponseClose :: Config b -> Response () -> b ()
nakadiHttpResponseClose config rsp =
(config^.L.http.L.httpResponseClose) rsp
httpJsonBodyStream
:: forall b m r
. (MonadNakadi b m, MonadMask m)
=> Status
-> [(Status, ByteString.Lazy.ByteString -> m NakadiException)]
-> (Request -> Request)
-> (Response (ConduitM () ByteString m ()) -> m r)
-> m r
httpJsonBodyStream successStatus exceptionMap requestDef handler = do
config <- nakadiAsk
request <- httpBuildRequest requestDef
bracket (nakadiLiftBase $ nakadiHttpResponseOpen config request (config^.L.manager))
(nakadiLiftBase . nakadiHttpResponseClose config . void)
$ \response -> wrappedHandler config response
where wrappedHandler :: Config b -> Response (ConduitM () ByteString b ()) -> m r
wrappedHandler config response = do
let response_ = void response
status = responseStatus response_
bodySource = responseBody responseLifted
responseLifted = fmap (transPipe nakadiLiftBase) response
if status == successStatus
then do connectCallback config response_
handler responseLifted
else case lookup status exceptionMap' of
Just mkExn ->
conduitDrainToLazyByteString bodySource >>= mkExn >>= throwM
Nothing -> throwM (UnexpectedResponse response_)
exceptionMap' = exceptionMap ++ defaultExceptionMap
connectCallback config response =
nakadiLiftBase $ case config^.L.streamConnectCallback of
Just cb -> cb response
Nothing -> pure ()
setRequestQueryParameters :: [(ByteString, ByteString)] -> Request -> Request
setRequestQueryParameters parameters = setRequestQueryString parameters'
where parameters' = map (fmap Just) parameters
includeFlowId
:: Config b
-> Request
-> Request
includeFlowId config =
case config^.L.flowId of
Just flowId -> setRequestHeader "X-Flow-Id" [encodeUtf8 (unFlowId flowId)]
Nothing -> identity
defaultExceptionMap
:: MonadThrow m => [(Status, ByteString.Lazy.ByteString -> m NakadiException)]
defaultExceptionMap =
[ (status401, errorClientNotAuthenticated)
, (status403, errorAccessForbidden)
, (status400, errorBadRequest)
, (status422, errorUnprocessableEntity)
, (status409, errorConflict)
]
errorClientNotAuthenticated
:: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorClientNotAuthenticated s = ClientNotAuthenticated <$> decodeThrow s
errorConflict :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorConflict s = Conflict <$> decodeThrow s
errorUnprocessableEntity
:: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorUnprocessableEntity s = UnprocessableEntity <$> decodeThrow s
errorAccessForbidden
:: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorAccessForbidden s = AccessForbidden <$> decodeThrow s
errorTooManyRequests
:: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorTooManyRequests s = TooManyRequests <$> decodeThrow s
errorBadRequest
:: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorBadRequest s = BadRequest <$> decodeThrow s
errorSubscriptionNotFound
:: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorSubscriptionNotFound s = SubscriptionNotFound <$> decodeThrow s
errorCursorAlreadyCommitted
:: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorCursorAlreadyCommitted s = CursorAlreadyCommitted <$> decodeThrow s
errorCursorResetInProgress
:: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorCursorResetInProgress s = CursorResetInProgress <$> decodeThrow s
errorEventTypeNotFound
:: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorEventTypeNotFound s = EventTypeNotFound <$> decodeThrow s
errorSubscriptionExistsAlready
:: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorSubscriptionExistsAlready s = SubscriptionExistsAlready <$> decodeThrow s
errorBatchPartiallySubmitted
:: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorBatchPartiallySubmitted s = BatchPartiallySubmitted <$> decodeThrow s
errorBatchNotSubmitted
:: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorBatchNotSubmitted s = BatchNotSubmitted <$> decodeThrow s