{-# LANGUAGE ScopedTypeVariables #-}
module Pipes.Aws.S3.Upload
(
toS3
, toS3'
, toS3WithManager
, ChunkSize
, defaultChunkSize
, EmptyS3UploadError(..)
, FailedUploadError(..)
, UploadId(..)
) where
import Control.Monad (when)
import Control.Exception (Exception)
import qualified Data.ByteString as BS
import qualified Data.Text as T
import Pipes
import Pipes.Safe
import qualified Pipes.Prelude as PP
import qualified Pipes.ByteString as PBS
import Control.Monad.Trans.Resource
import Network.HTTP.Client
import Network.HTTP.Client.TLS
import qualified Aws
import qualified Aws.S3 as S3
import Pipes.Aws.S3.Types
data EmptyS3UploadError = EmptyS3UploadError Bucket Object
deriving (Show)
instance Exception EmptyS3UploadError
newtype UploadId = UploadId T.Text
deriving (Show, Eq, Ord)
data FailedUploadError = FailedUploadError { failedUploadBucket :: Bucket
, failedUploadObject :: Object
, failedUploadException :: SomeException
, failedUploadId :: UploadId
}
deriving (Show)
instance Exception FailedUploadError
type ChunkSize = Int
defaultChunkSize :: ChunkSize
defaultChunkSize = 10 * 1024 * 1024
type ETag = T.Text
type PartN = Integer
toS3 :: forall m a. (MonadIO m, MonadCatch m)
=> ChunkSize -> Bucket -> Object
-> Producer BS.ByteString m a
-> m a
toS3 chunkSize bucket object consumer = do
cfg <- Aws.baseConfiguration
toS3' cfg Aws.defServiceConfig chunkSize bucket object consumer
toS3' :: forall m a. (MonadIO m, MonadCatch m)
=> Aws.Configuration
-> S3.S3Configuration Aws.NormalQuery
-> ChunkSize -> Bucket -> Object
-> Producer BS.ByteString m a
-> m a
toS3' cfg s3cfg chunkSize bucket object consumer = do
mgr <- liftIO $ newManager tlsManagerSettings
toS3WithManager mgr cfg s3cfg chunkSize bucket object consumer
toS3WithManager :: forall m a. (MonadIO m, MonadCatch m)
=> Manager
-> Aws.Configuration
-> S3.S3Configuration Aws.NormalQuery
-> ChunkSize -> Bucket -> Object
-> Producer BS.ByteString m a
-> m a
toS3WithManager mgr cfg s3cfg chunkSize bucket object consumer = do
let Bucket bucketName = bucket
Object objectName = object
resp1 <- liftIO $ runResourceT
$ Aws.pureAws cfg s3cfg mgr
$ S3.postInitiateMultipartUpload bucketName objectName
let uploadId = S3.imurUploadId resp1
abortUpload err
| Just (EmptyS3UploadError _ _) <- fromException err = throwM err
| otherwise = do
resp <- liftIO $ runResourceT $ Aws.aws cfg s3cfg mgr
$ S3.postAbortMultipartUpload bucketName objectName uploadId
case Aws.responseResult resp of
Left err' -> throwM err'
Right _ -> throwM $ FailedUploadError bucket object err (UploadId uploadId)
handleAll abortUpload $ do
let uploadPart :: (PartN, BS.ByteString) -> m (PartN, ETag)
uploadPart (partN, content) = do
resp <- liftIO $ runResourceT
$ Aws.pureAws cfg s3cfg mgr
$ S3.uploadPart bucketName objectName
partN uploadId (RequestBodyBS content)
return (partN, S3.uprETag resp)
(parts, res) <- PP.toListM' $ PBS.chunksOf' chunkSize consumer
>-> PP.filter (not . BS.null)
>-> enumFromP 1
>-> PP.mapM uploadPart
when (null parts)
$ throwM (EmptyS3UploadError bucket object)
_ <- liftIO $ runResourceT
$ Aws.pureAws cfg s3cfg mgr
$ S3.postCompleteMultipartUpload bucketName objectName uploadId parts
return res
enumFromP :: (Monad m, Enum i) => i -> Pipe a (i, a) m r
enumFromP = go
where
go i = await >>= \x -> yield (i, x) >> go (succ i)