module Network.Nakadi.Config where
import Network.Nakadi.Internal.Prelude
import Control.Lens
import Control.Retry
import Network.HTTP.Client (Manager, ManagerSettings,
responseClose, responseOpen)
import Network.HTTP.Client.TLS (newTlsManagerWith,
tlsManagerSettings)
import Network.HTTP.Simple (httpLbs)
import qualified Network.Nakadi.Internal.Lenses as L
import Network.Nakadi.Internal.Types
-- | Retry policy used by 'newConfig'': a full-jitter backoff combined with a
-- limit of five retries.
--
-- NOTE(review): the @retry@ package documents the argument of
-- 'fullJitterBackoff' as a base delay in microseconds, which would make @2@ a
-- very short base delay -- confirm that this is the intended value.
defaultRetryPolicy :: MonadIO m => RetryPolicyM m
defaultRetryPolicy = backoff <> retryCap
  where
    backoff  = fullJitterBackoff 2
    retryCap = limitRetries 5
-- | Build a 'Config' from an explicit HTTP 'Manager', a set of
-- 'ConsumeParameters' and a template 'Request'.
--
-- In the resulting configuration all optional callbacks and the log function
-- are 'Nothing', the request modifier is 'return', the retry policy is
-- 'defaultRetryPolicy' and the HTTP backend is 'defaultHttpBackend'.
newConfig'
  :: (MonadIO m, MonadThrow m)
  => Manager           -- ^ Manager stored in the configuration
  -> ConsumeParameters -- ^ Consume parameters stored in the configuration
  -> Request           -- ^ Request template stored in the configuration
  -> m Config
newConfig' manager consumeParameters request = return config
  where
    config = Config
      { _manager                        = manager
      , _consumeParameters              = consumeParameters
      , _requestTemplate                = request
      , _requestModifier                = return
      , _retryPolicy                    = defaultRetryPolicy
      , _http                           = defaultHttpBackend
      , _logFunc                        = Nothing
      , _deserializationFailureCallback = Nothing
      , _streamConnectCallback          = Nothing
      , _httpErrorCallback              = Nothing
      }
-- | The 'HttpBackend' installed by 'newConfig''. Each field is populated with
-- the imported function of the same name: 'httpLbs' from
-- "Network.HTTP.Simple", and 'responseOpen' and 'responseClose' from
-- "Network.HTTP.Client".
defaultHttpBackend :: HttpBackend
defaultHttpBackend = HttpBackend
  { _httpLbs       = httpLbs
  , _responseOpen  = responseOpen
  , _responseClose = responseClose
  }
-- | Build a 'Config' for the given template 'Request', creating a fresh TLS
-- manager along the way.
--
-- When no 'ManagerSettings' are supplied, 'tlsManagerSettings' is used. The
-- configuration is then assembled by 'newConfig'' with
-- 'defaultConsumeParameters'.
newConfig
  :: (MonadIO m, MonadThrow m)
  => Maybe ManagerSettings -- ^ Optional manager settings
  -> Request               -- ^ Request template
  -> m Config
newConfig mngrSettings request = do
  tlsManager <- newTlsManagerWith settings
  newConfig' tlsManager defaultConsumeParameters request
  where
    -- Fall back to the stock TLS settings if the caller passed 'Nothing'.
    settings = fromMaybe tlsManagerSettings mngrSettings
-- | Replace the request modifier stored in the given 'Config'.
setRequestModifier :: (Request -> IO Request) -> Config -> Config
setRequestModifier modifier config = config & L.requestModifier .~ modifier
-- | Install a deserialization failure callback in the given 'Config',
-- replacing any callback that was set before.
setDeserializationFailureCallback
  :: (ByteString -> Text -> IO ())
  -> Config
  -> Config
setDeserializationFailureCallback callback config =
  config & L.deserializationFailureCallback ?~ callback
-- | Install a stream connect callback in the given 'Config', replacing any
-- callback that was set before.
setStreamConnectCallback :: StreamConnectCallback -> Config -> Config
setStreamConnectCallback callback config =
  config & L.streamConnectCallback ?~ callback
-- | Install an HTTP error callback in the given 'Config', replacing any
-- callback that was set before.
setHttpErrorCallback :: HttpErrorCallback -> Config -> Config
setHttpErrorCallback callback = L.httpErrorCallback ?~ callback
-- | Install a log function in the given 'Config', replacing any log function
-- that was set before.
setLogFunc :: LogFunc -> Config -> Config
setLogFunc logger config = config & L.logFunc ?~ logger
-- | Replace the retry policy stored in the given 'Config'.
setRetryPolicy :: RetryPolicyM IO -> Config -> Config
setRetryPolicy policy config = config & L.retryPolicy .~ policy
-- | Replace the 'HttpBackend' stored in the given 'Config'.
setHttpBackend :: HttpBackend -> Config -> Config
setHttpBackend backend config = config & L.http .~ backend
-- | 'ConsumeParameters' with every field left unset ('Nothing'). Individual
-- fields can be filled in with the @set*@ functions of this module.
defaultConsumeParameters :: ConsumeParameters
defaultConsumeParameters =
  ConsumeParameters
    { _maxUncommittedEvents = Nothing
    , _batchLimit           = Nothing
    , _streamLimit          = Nothing
    , _batchFlushTimeout    = Nothing
    , _streamTimeout        = Nothing
    , _streamKeepAliveLimit = Nothing
    , _flowId               = Nothing
    }
-- | Set the @maxUncommittedEvents@ field of the given 'ConsumeParameters' to
-- the supplied value.
setMaxUncommittedEvents :: Int32 -> ConsumeParameters -> ConsumeParameters
setMaxUncommittedEvents limit = L.maxUncommittedEvents ?~ limit
-- | Set the @batchLimit@ field of the given 'ConsumeParameters' to the
-- supplied value.
setBatchLimit :: Int32 -> ConsumeParameters -> ConsumeParameters
setBatchLimit limit = L.batchLimit ?~ limit
-- | Set the @streamLimit@ field of the given 'ConsumeParameters' to the
-- supplied value.
setStreamLimit :: Int32 -> ConsumeParameters -> ConsumeParameters
setStreamLimit limit = L.streamLimit ?~ limit
-- | Set the @batchFlushTimeout@ field of the given 'ConsumeParameters' to the
-- supplied value.
setBatchFlushTimeout :: Int32 -> ConsumeParameters -> ConsumeParameters
setBatchFlushTimeout timeout = L.batchFlushTimeout ?~ timeout
-- | Set the @streamTimeout@ field of the given 'ConsumeParameters' to the
-- supplied value.
setStreamTimeout :: Int32 -> ConsumeParameters -> ConsumeParameters
setStreamTimeout timeout = L.streamTimeout ?~ timeout
-- | Set the @streamKeepAliveLimit@ field of the given 'ConsumeParameters' to
-- the supplied value.
setStreamKeepAliveLimit :: Int32 -> ConsumeParameters -> ConsumeParameters
setStreamKeepAliveLimit limit = L.streamKeepAliveLimit ?~ limit
-- | Set the @flowId@ field of the given 'ConsumeParameters' to the supplied
-- value.
setFlowId :: Text -> ConsumeParameters -> ConsumeParameters
setFlowId flowId params = params & L.flowId ?~ flowId