-- Common plumbing shared by the Kinesis client: the 'KinesisKit' settings
-- bundle and its lenses, a helper for running a single Kinesis request in
-- 'IO', and conduit sources that enumerate the shards of a stream.
module Aws.Kinesis.Client.Common
( KinesisKit(..)
, kkConfiguration
, kkKinesisConfiguration
, kkManager
, runKinesis
, streamShardSource
, streamOpenShardSource
, shardIsOpen
) where
import qualified Aws
import qualified Aws.Core as Aws
import qualified Aws.Kinesis as Kin
import Control.Lens
import Control.Monad
import Control.Monad.Trans
import Control.Monad.Trans.Resource
import Data.Conduit
import qualified Data.Conduit.List as CL
import qualified Network.HTTP.Conduit as HC
import Prelude.Unicode
-- | The bundle of settings that 'runKinesis' hands to 'Aws.pureAws' when it
-- issues a request. All fields are strict.
data KinesisKit = KinesisKit
  { _kkConfiguration ∷ !Aws.Configuration
    -- ^ General AWS configuration (first argument to 'Aws.pureAws').
  , _kkKinesisConfiguration ∷ !(Kin.KinesisConfiguration Aws.NormalQuery)
    -- ^ Kinesis service configuration (second argument to 'Aws.pureAws').
  , _kkManager ∷ !HC.Manager
    -- ^ HTTP connection manager (third argument to 'Aws.pureAws').
  }
-- | Lens onto the 'Aws.Configuration' held in a 'KinesisKit'.
kkConfiguration ∷ Lens' KinesisKit Aws.Configuration
kkConfiguration = lens _kkConfiguration replace
  where
    -- Setter half of the lens: swap in a new AWS configuration.
    replace kit newCfg = kit { _kkConfiguration = newCfg }
-- | Lens onto the Kinesis service configuration held in a 'KinesisKit'.
kkKinesisConfiguration ∷ Lens' KinesisKit (Kin.KinesisConfiguration Aws.NormalQuery)
kkKinesisConfiguration = lens _kkKinesisConfiguration replace
  where
    -- Setter half of the lens: swap in a new Kinesis configuration.
    replace kit newCfg = kit { _kkKinesisConfiguration = newCfg }
-- | Lens onto the HTTP connection manager held in a 'KinesisKit'.
kkManager ∷ Lens' KinesisKit HC.Manager
kkManager = lens _kkManager replace
  where
    -- Setter half of the lens: swap in a new connection manager.
    replace kit newManager = kit { _kkManager = newManager }
-- | Issue a single Kinesis request in 'IO' with the settings stored in a
-- 'KinesisKit'. The request is executed by 'Aws.pureAws' inside its own
-- 'runResourceT' scope.
runKinesis
  ∷ ( Aws.ServiceConfiguration req ~ Kin.KinesisConfiguration
    , Aws.Transaction req resp
    )
  ⇒ KinesisKit -- ^ settings used to perform the request
  → req -- ^ the request to perform
  → IO resp
runKinesis KinesisKit
  { _kkConfiguration = awsCfg
  , _kkKinesisConfiguration = kinesisCfg
  , _kkManager = manager
  } =
  runResourceT ∘ Aws.pureAws awsCfg kinesisCfg manager
-- | Decide whether a shard is open: it is when the second component of its
-- sequence number range is 'Nothing'.
--
-- NOTE(review): the second component is presumably the ending sequence
-- number of the shard; confirm against the definition of 'Kin.Shard'.
shardIsOpen
  ∷ Kin.Shard
  → Bool
shardIsOpen shard =
  case Kin.shardSequenceNumberRange shard ^. _2 of
    Nothing → True
    Just _ → False
-- | Turn a stream of \"start after this shard\" cursors into the shards of
-- the named stream.
--
-- For every cursor received from upstream, one @DescribeStream@ request is
-- made (with no limit) and every shard in the response is yielded. When
-- 'Aws.nextIteratedRequest' reports a follow-up request that carries a start
-- shard ID, that ID is pushed back with 'leftover' so that the surrounding
-- 'awaitForever' loop fetches the next page.
fetchShardsConduit
  ∷ MonadIO m
  ⇒ KinesisKit
  → Kin.StreamName
  → Conduit (Maybe Kin.ShardId) m Kin.Shard
fetchShardsConduit kit streamName = awaitForever fetchPage
  where
    -- Fetch one page of shards, starting after the given shard (if any).
    fetchPage startAfter = do
      let request = Kin.DescribeStream
            { Kin.describeStreamExclusiveStartShardId = startAfter
            , Kin.describeStreamLimit = Nothing
            , Kin.describeStreamStreamName = streamName
            }
      response@(Kin.DescribeStreamResponse description) ←
        liftIO $ runKinesis kit request
      mapM_ yield $ Kin.streamDescriptionShards description
      -- The cursor for the next page, if the response says there is one.
      let followUp =
            Aws.nextIteratedRequest request response
              >>= Kin.describeStreamExclusiveStartShardId
      case followUp of
        Nothing → return ()
        Just nextStart → leftover $ Just nextStart
-- | A source of all shards of the named stream. It seeds
-- 'fetchShardsConduit' with a single 'Nothing' cursor, i.e. a first
-- request that has no exclusive start shard ID.
streamShardSource
  ∷ MonadIO m
  ⇒ KinesisKit
  → Kin.StreamName
  → Source m Kin.Shard
streamShardSource kit streamName = initialCursor $= shardPager
  where
    -- A single empty cursor: the first request has no start shard.
    initialCursor = CL.sourceList [Nothing]
    shardPager = fetchShardsConduit kit streamName
-- | Like 'streamShardSource', but only emits the shards for which
-- 'shardIsOpen' holds.
streamOpenShardSource
  ∷ MonadIO m
  ⇒ KinesisKit
  → Kin.StreamName
  → Source m Kin.Shard
streamOpenShardSource kit streamName =
  mapOutputMaybe keepIfOpen $ streamShardSource kit streamName
  where
    -- Pass open shards through and drop everything else.
    keepIfOpen shard
      | shardIsOpen shard = Just shard
      | otherwise = Nothing