-- | Constructors for 'Config' values ('newConfig', 'newConfigIO',
-- 'newConfigWithDedicatedManager'), setters for individual 'Config'
-- fields, and setters for individual 'ConsumeParameters' fields.
module Network.Nakadi.Config
( newConfig
, newConfigIO
, newConfigWithDedicatedManager
, setHttpManager
, setRequestModifier
, setDeserializationFailureCallback
, setStreamConnectCallback
, setHttpErrorCallback
, setLogFunc
, setRetryPolicy
, setMaxUncommittedEvents
, setBatchLimit
, setStreamLimit
, setBatchFlushTimeout
, setStreamTimeout
, setStreamKeepAliveLimit
, setFlowId
-- NOTE(review): 'defaultConsumeParameters' is not defined in the visible
-- part of this module; presumably it is re-exported from one of the
-- imported modules -- confirm.
, defaultConsumeParameters
) where
import Network.Nakadi.Internal.Prelude
import Control.Lens
import Control.Retry
import Network.HTTP.Client (Manager,
ManagerSettings)
import Network.HTTP.Client.TLS (newTlsManagerWith)
import Network.Nakadi.Internal.Config
import Network.Nakadi.Internal.HttpBackendIO
import qualified Network.Nakadi.Internal.Lenses as L
import Network.Nakadi.Internal.Types
-- | Retry policy installed by 'newConfig': 'fullJitterBackoff' applied
-- to @2@, combined (via '<>') with 'limitRetries' applied to @5@.
--
-- NOTE(review): the @retry@ package documents the argument of
-- 'fullJitterBackoff' as a base delay in microseconds, so @2@ would
-- mean a base delay of 2 microseconds -- confirm that this is intended.
defaultRetryPolicy :: MonadIO m => RetryPolicyM m
defaultRetryPolicy = jitteredBackoff <> retryCap
  where
    jitteredBackoff = fullJitterBackoff 2
    retryCap        = limitRetries 5
-- | Build a 'Config' from an HTTP backend and a request template.
--
-- The given request is stored as the request template and the given
-- backend as the HTTP backend. The request modifier is 'pure', the
-- retry policy is 'defaultRetryPolicy', and every remaining field
-- (consume parameters, manager, callbacks, log function, flow ID) is
-- initialised to 'Nothing'.
newConfig
  :: Monad b
  => HttpBackend b -- ^ HTTP backend stored in the configuration
  -> Request       -- ^ Request template
  -> Config b      -- ^ Resulting configuration
newConfig backend template = Config
  { -- Fields populated from the arguments or with fixed defaults.
    _http                           = backend
  , _requestTemplate                = template
  , _requestModifier                = pure
  , _retryPolicy                    = defaultRetryPolicy
    -- Fields that start out unset.
  , _consumeParameters              = Nothing
  , _manager                        = Nothing
  , _deserializationFailureCallback = Nothing
  , _streamConnectCallback          = Nothing
  , _httpErrorCallback              = Nothing
  , _logFunc                        = Nothing
  , _flowId                         = Nothing
  }
-- | Build a 'ConfigIO' from a request template, using 'httpBackendIO'
-- as the HTTP backend passed to 'newConfig'.
newConfigIO
  :: Request  -- ^ Request template
  -> ConfigIO -- ^ Resulting configuration
newConfigIO template = newConfig httpBackendIO template
-- | Create a new manager from the given 'ManagerSettings' with
-- 'newTlsManagerWith' and return a configuration, built by 'newConfig'
-- with 'httpBackendIO', that has this manager set via 'setHttpManager'.
newConfigWithDedicatedManager
  :: (MonadIO b, MonadMask b, MonadIO m)
  => ManagerSettings -- ^ Settings used to create the manager
  -> Request         -- ^ Request template
  -> m (Config b)    -- ^ Resulting configuration
newConfigWithDedicatedManager mngrSettings request = do
  dedicatedManager <- newTlsManagerWith mngrSettings
  let baseConfig = newConfig httpBackendIO request
  pure (setHttpManager dedicatedManager baseConfig)
-- | Store the given 'Manager' (wrapped in 'Just') in the manager field
-- of the configuration.
setHttpManager
  :: Manager
  -> Config m
  -> Config m
setHttpManager mngr config = config & L.manager ?~ mngr
-- | Replace the request modifier stored in the configuration with the
-- given function.
setRequestModifier
  :: (Request -> m Request)
  -> Config m
  -> Config m
setRequestModifier modifier config = config & L.requestModifier .~ modifier
-- | Store the given callback (wrapped in 'Just') in the
-- deserialization failure callback field of the configuration.
setDeserializationFailureCallback
  :: (ByteString -> Text -> m ())
  -> Config m
  -> Config m
setDeserializationFailureCallback callback config =
  config & L.deserializationFailureCallback ?~ callback
-- | Store the given 'StreamConnectCallback' (wrapped in 'Just') in the
-- stream connect callback field of the configuration.
setStreamConnectCallback
  :: StreamConnectCallback m
  -> Config m
  -> Config m
setStreamConnectCallback callback config =
  config & L.streamConnectCallback ?~ callback
-- | Store the given 'HttpErrorCallback' (wrapped in 'Just') in the
-- HTTP error callback field of the configuration.
setHttpErrorCallback
  :: HttpErrorCallback m
  -> Config m
  -> Config m
setHttpErrorCallback callback config =
  config & L.httpErrorCallback ?~ callback
-- | Store the given 'LogFunc' (wrapped in 'Just') in the log function
-- field of the configuration.
setLogFunc
  :: LogFunc m
  -> Config m
  -> Config m
setLogFunc logger config = config & L.logFunc ?~ logger
-- | Replace the retry policy stored in the configuration.
--
-- The policy is a @'RetryPolicyM' 'IO'@ regardless of the
-- configuration's type parameter @m@.
setRetryPolicy
  :: RetryPolicyM IO
  -> Config m
  -> Config m
setRetryPolicy policy config = config & L.retryPolicy .~ policy
-- | Store the given 'FlowId' (wrapped in 'Just') in the flow ID field
-- of the configuration.
setFlowId
  :: FlowId
  -> Config m
  -> Config m
setFlowId fid config = config & L.flowId ?~ fid
-- | Set the @maxUncommittedEvents@ field of the given
-- 'ConsumeParameters' to 'Just' the provided value.
setMaxUncommittedEvents :: Int32 -> ConsumeParameters -> ConsumeParameters
setMaxUncommittedEvents n = L.maxUncommittedEvents ?~ n
-- | Set the @batchLimit@ field of the given 'ConsumeParameters' to
-- 'Just' the provided value.
setBatchLimit :: Int32 -> ConsumeParameters -> ConsumeParameters
setBatchLimit n = L.batchLimit ?~ n
-- | Set the @streamLimit@ field of the given 'ConsumeParameters' to
-- 'Just' the provided value.
setStreamLimit :: Int32 -> ConsumeParameters -> ConsumeParameters
setStreamLimit n = L.streamLimit ?~ n
-- | Set the @batchFlushTimeout@ field of the given 'ConsumeParameters'
-- to 'Just' the provided value.
setBatchFlushTimeout :: Int32 -> ConsumeParameters -> ConsumeParameters
setBatchFlushTimeout n = L.batchFlushTimeout ?~ n
-- | Set the @streamTimeout@ field of the given 'ConsumeParameters' to
-- 'Just' the provided value.
setStreamTimeout :: Int32 -> ConsumeParameters -> ConsumeParameters
setStreamTimeout n = L.streamTimeout ?~ n
-- | Set the @streamKeepAliveLimit@ field of the given
-- 'ConsumeParameters' to 'Just' the provided value.
setStreamKeepAliveLimit :: Int32 -> ConsumeParameters -> ConsumeParameters
setStreamKeepAliveLimit n = L.streamKeepAliveLimit ?~ n