{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Mismi.S3.Stream (
sizeRecursively
, read
, list
, liftAddressAndPrefix
, listRecursively
, liftAddress
) where
import qualified Control.Exception as CE
import Control.Lens ((.~), (^.), to)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ask)
import Control.Monad.Trans.Resource (ResourceT)
import qualified Control.Retry as Retry
import qualified Data.ByteString as BS
import Data.Conduit (ConduitT, (.|))
import qualified Data.Conduit as Conduit
import qualified Data.Conduit.List as DC
import Data.IORef (IORef)
import qualified Data.IORef as IORef
import Data.Maybe (maybeToList)
import Mismi.Amazonka (Env, send, paginate)
import Mismi.Control (AWS, throwOrRetry, handle404)
import Mismi.S3.Core.Data
import Mismi.S3.Internal (f', (+/), bytesRange)
import qualified Network.AWS as A
import Network.AWS.Data.Body (RsBody (..))
import Network.AWS.S3 (BucketName (..))
import Network.AWS.S3 (ListObjectsResponse)
import Network.AWS.S3 (ObjectKey (..))
import qualified Network.AWS.S3 as A
import P
-- | Turn one page of a ListObjects response into sized addresses.
--
-- Every entry in the page's contents becomes an 'Address' inside bucket
-- @b@ (keyed by the object's key), paired with the object's reported size
-- wrapped in 'Bytes'.
takeObjectSizes :: Bucket -> ListObjectsResponse -> [Sized Address]
takeObjectSizes b lors =
  fmap sized (lors ^. A.lorsContents)
  where
    -- Build the sized address for a single listed object.
    sized o =
      let
        ObjectKey k =
          o ^. A.oKey
      in
        Sized (Bytes . fromIntegral $ o ^. A.oSize) (Address b (Key k))
-- | Stream every object listed under the given address along with its size.
--
-- Runs a paginated ListObjects request that uses the address key as the
-- prefix (no delimiter is set on the request) and flattens each response
-- page with 'takeObjectSizes'.
sizeRecursively :: Address -> ConduitT () (Sized Address) AWS ()
sizeRecursively (Address b (Key k)) =
  paginate request .| DC.mapFoldable (takeObjectSizes b)
  where
    request =
      A.listObjects (BucketName (unBucket b))
        & A.loPrefix .~ Just k
-- | Forward every chunk produced by @src@ unchanged while accumulating the
-- total number of bytes seen into @ref@.
--
-- The counter is bumped (strictly) before the chunk is yielded downstream.
countBytes ::
     IORef Int64
  -> ConduitT () BS.ByteString (ResourceT IO) ()
  -> ConduitT () BS.ByteString (ResourceT IO) ()
countBytes ref src =
  src .| Conduit.awaitForever tally
  where
    -- Record the chunk's length, then pass the chunk along untouched.
    tally chunk = do
      liftIO $ IORef.modifyIORef' ref (+ fromIntegral (BS.length chunk))
      Conduit.yield chunk
-- | Send a GetObject request for @a@ restricted to the range described by
-- @bytesRange start end@ and return the response body as a byte stream.
--
-- NOTE(review): whether @end@ is treated as inclusive or exclusive is
-- decided by 'bytesRange', which is defined elsewhere - confirm there.
readRange ::
     Int
  -> Int
  -> Address
  -> AWS (ConduitT () BS.ByteString (ResourceT IO) ())
readRange start end a =
  let
    request =
      f' A.getObject a
        & A.goRange .~ Just (bytesRange start end)
  in
    fmap (\response -> response ^. A.gorsBody . to _streamBody) (send request)
-- | Stream the object at @a@ starting from the offset currently stored in
-- @startRef@, re-requesting the remaining range if the stream fails.
--
-- Steps, in order:
--
--   1. read the current offset from @startRef@;
--   2. run 'readRange' against @env@ for that offset up to @end@;
--   3. stream the body through 'countBytes', which advances @startRef@ by
--      the bytes passed along;
--   4. if streaming throws, hand the exception and the current retry status
--      to 'throwOrRetry' and recurse with the status it returns, so the
--      next request starts at the updated offset.
--
-- NOTE(review): 'throwOrRetry' is defined elsewhere; presumably it rethrows
-- once the limit passed to it (5) is exhausted - confirm in Mismi.Control.
-- The initial 'readRange' call sits outside the handler, so an exception
-- raised while issuing the request itself is not routed through it.
readRetry ::
     Env
  -> Retry.RetryStatus
  -> IORef Int64
  -> Int
  -> Address
  -> ConduitT () BS.ByteString (ResourceT IO) ()
readRetry env status0 startRef end a = do
  offset <- liftIO (fmap fromIntegral (IORef.readIORef startRef))
  body <- A.runAWS env (readRange offset end a)
  Conduit.catchC (countBytes startRef body) retry
  where
    -- Decide whether to retry; on retry, resume from the recorded offset.
    retry :: CE.SomeException -> ConduitT () BS.ByteString (ResourceT IO) ()
    retry err = do
      status <- liftIO $ throwOrRetry 5 err status0
      readRetry env status startRef end a
-- | Open a retrying byte stream over the object at the given address.
--
-- A HeadObject request (wrapped in 'handle404') is issued first to obtain
-- the content length:
--
--   * no response, or a response without a content length, gives 'Nothing';
--   * a length of zero gives an empty source without issuing a GetObject;
--   * otherwise the result is a source backed by 'readRetry'.
--
-- The offset counter used by 'readRetry' is allocated when the returned
-- source is run rather than when 'read' is called, so every run of the
-- source starts from byte 0 instead of sharing a counter left behind by an
-- earlier run.
--
-- NOTE(review): 'handle404' is defined elsewhere; presumably it turns a 404
-- into 'Nothing' - confirm in Mismi.Control.
read :: Address -> AWS (Maybe (ConduitT () BS.ByteString (ResourceT IO) ()))
read a = do
  env <- ask
  mhead <- handle404 . send . f' A.headObject $ a
  let
    -- The length is still routed through 'Bytes' before being narrowed, so
    -- the numeric conversions are the same as before.
    mend =
      fmap (fromIntegral . Bytes . fromIntegral) (mhead >>= (^. A.horsContentLength))
  case mend of
    Nothing ->
      pure Nothing
    Just 0 ->
      pure $ Just mempty
    Just end ->
      pure . Just $ do
        -- Fresh offset per run of the source.
        startRef <- liftIO $ IORef.newIORef 0
        readRetry env Retry.defaultRetryStatus startRef end a
-- | Stream the addresses found directly under the given address.
--
-- Runs a paginated ListObjects request whose prefix is the address key
-- passed through '(+/)' and whose delimiter is @'/'@, then converts each
-- response page with 'liftAddressAndPrefix' (object keys and common
-- prefixes).
list :: Address -> ConduitT () Address AWS ()
list a@(Address (Bucket b) (Key k)) =
  paginate request .| liftAddressAndPrefix a
  where
    request =
      A.listObjects (BucketName b)
        & A.loPrefix .~ Just ((+/) k)
        & A.loDelimiter .~ Just '/'
-- | Convert each ListObjects response page into addresses: first one per
-- object in the page's contents, then one per common prefix that actually
-- carries a prefix value.
--
-- Every produced address is @a@ with its key replaced, so the bucket is
-- always taken from @a@.
--
-- Common prefixes are mapped one by one and concatenated. Traversing them
-- in the list monad instead (@join (forM prefixes f)@) would return an
-- empty list as soon as a single entry had no prefix, discarding every
-- other common prefix on the page.
liftAddressAndPrefix :: Address -> ConduitT ListObjectsResponse Address AWS ()
liftAddressAndPrefix a =
  DC.mapFoldable $ \r ->
    fmap objectAddress (r ^. A.lorsContents)
      <> join (fmap prefixAddresses (r ^. A.lorsCommonPrefixes))
  where
    -- Address of a listed object.
    objectAddress o =
      let
        ObjectKey t =
          o ^. A.oKey
      in
        a { key = Key t }

    -- Zero or one address for a common prefix, depending on whether the
    -- entry has a prefix value.
    prefixAddresses cp =
      maybeToList . fmap (\p -> a { key = Key p }) $ cp ^. A.cpPrefix
-- | Stream the address of every object listed under the given address.
--
-- Runs a paginated ListObjects request with the address key as the prefix
-- (no delimiter is set) and converts each response page with 'liftAddress'.
listRecursively :: Address -> ConduitT () Address AWS ()
listRecursively a@(Address (Bucket bn) (Key k)) =
  let
    request =
      A.listObjects (BucketName bn)
        & A.loPrefix .~ Just k
  in
    paginate request .| liftAddress a
-- | Convert each ListObjects response page into one address per object in
-- the page's contents.
--
-- Each address is @a@ with its key replaced by the object's key, so the
-- bucket is always taken from @a@.
liftAddress :: Address -> ConduitT ListObjectsResponse Address AWS ()
liftAddress a =
  DC.mapFoldable $ \r ->
    fmap toAddress (r ^. A.lorsContents)
  where
    toAddress o =
      let
        ObjectKey t =
          o ^. A.oKey
      in
        a { key = Key t }