module Network.AWS.S3.StreamingUpload
(
streamUpload
, UploadLocation(..)
, concurrentUpload
, abortAllUploads
, module Network.AWS.S3.CreateMultipartUpload
, module Network.AWS.S3.CompleteMultipartUpload
, minimumChunkSize
) where
import Network.AWS (HasEnv(..),
LogLevel(..), MonadAWS,
getFileSize,
hashedBody, send,
toBody)
import Control.Monad.Trans.AWS (AWSConstraint)
import Network.AWS.Data.Crypto (Digest, SHA256,
hashFinalize, hashInit,
hashUpdate)
import Network.AWS.S3.AbortMultipartUpload
import Network.AWS.S3.CompleteMultipartUpload
import Network.AWS.S3.CreateMultipartUpload
import Network.AWS.S3.ListMultipartUploads
import Network.AWS.S3.Types (BucketName, cmuParts, completedMultipartUpload,
completedPart, muKey,
muUploadId)
import Network.AWS.S3.UploadPart
import Control.Applicative
import Control.Category ((>>>))
import Control.Monad (forM_, when, (>=>))
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Morph (lift)
import Control.Monad.Reader.Class (local)
import Control.Monad.Trans.Resource (MonadBaseControl,
MonadResource)
import Data.Conduit (Sink, await)
import Data.Conduit.List (sourceList)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.ByteString.Builder (stringUtf8)
import System.IO.MMap (mmapFileByteString)
import Control.DeepSeq (rnf)
import qualified Data.DList as D
import Data.List (unfoldr)
import Data.List.NonEmpty (nonEmpty)
import Control.Lens (set, view)
import Control.Lens.Operators
import Control.Monad.Catch (onException)
import Text.Printf (printf)
import Control.Concurrent (newQSem, signalQSem,
waitQSem)
import Control.Concurrent.Async.Lifted (forConcurrently)
import System.Mem (performGC)
import Network.HTTP.Client (defaultManagerSettings,
managerConnCount,
newManager)
import Network.HTTP.Client.Internal (mMaxConns)
-- | Size of a single upload chunk, measured in bytes (it is compared against
-- sums of 'BS.length' values and against file sizes in this module).
type ChunkSize = Int
-- | Number of part uploads allowed to run at the same time in
-- 'concurrentUpload'; used there as the semaphore count and, when it exceeds
-- the current manager limit, as the connection count of a new manager.
type NumThreads = Int
-- | Smallest chunk size, in bytes, that the upload functions in this module
-- will use: 6 MiB. Requested chunk sizes below this value are raised to it.
minimumChunkSize :: ChunkSize
minimumChunkSize = 6 * mebibyte
  where
    mebibyte = 1024 * 1024
-- | A conduit 'Sink' that uploads the incoming stream of 'ByteString's as a
-- multipart upload.
--
-- The upload is created by sending the supplied 'CreateMultipartUpload'
-- request. Incoming chunks are buffered (and hashed incrementally) for as long
-- as the buffered size stays at or below the chunk size; the chunk that would
-- push the buffer over the limit is appended and the whole buffer is sent as
-- one part, so a part may exceed the chunk size by up to one incoming chunk.
-- When the stream ends, any remaining buffered data is sent as the final part
-- and the upload is completed with the collected parts.
--
-- Fails (via 'fail') when the create request or any part upload does not
-- return status 200. If an exception escapes the upload loop, an
-- 'abortMultipartUpload' request is sent for this upload before the exception
-- propagates.
streamUpload :: (MonadResource m, AWSConstraint r m, MonadAWS m)
             => Maybe ChunkSize
             -- ^ Requested chunk size in bytes; 'Nothing' or any value below
             -- 'minimumChunkSize' results in 'minimumChunkSize'.
             -> CreateMultipartUpload
             -- ^ Request used to create the upload; its bucket and key are
             -- reused for every part and for completion.
             -> Sink ByteString m CompleteMultipartUploadResponse
streamUpload mcs cmu = do
  logger <- lift $ view envLogger
  let logStr :: MonadIO m => String -> m ()
      logStr = liftIO . logger Debug . stringUtf8
      chunkSize = maybe minimumChunkSize (max minimumChunkSize) mcs
  cmur <- lift $ send cmu
  when (cmur ^. cmursResponseStatus /= 200) $
    fail "Failed to create upload"
  logStr "\n**** Created upload\n"
  -- The pattern below is lazy: a response without an upload id only raises an
  -- error once upId is first forced.
  let Just upId = cmur ^. cmursUploadId
      bucket    = cmu ^. cmuBucket
      key       = cmu ^. cmuKey

      -- Loop state: buffered chunks, buffered byte count, running SHA256
      -- context for the buffer, current part number, parts completed so far.
      go !bss !bufsize !ctx !partnum !completed = Data.Conduit.await >>= \mbs -> case mbs of
        Just bs
          | l <- BS.length bs
          , bufsize + l <= chunkSize ->
              go (D.snoc bss bs) (bufsize + l) (hashUpdate ctx bs) partnum completed
          | otherwise -> do
              -- The part that is sent includes the chunk just received, so
              -- report that full size rather than the size before it arrived.
              let partSize = bufsize + BS.length bs
              rs <- lift $ partUploader partnum partSize
                                        (hashFinalize $ hashUpdate ctx bs)
                                        (D.snoc bss bs)
              logStr $ printf "\n**** Uploaded part %d size %d\n" partnum partSize
              let part = completedPart partnum <$> (rs ^. uprsETag)
#if MIN_VERSION_amazonka_s3(1,4,1)
                  !_ = rnf part
#endif
              liftIO performGC
              go D.empty 0 hashInit (partnum+1) . D.snoc completed $! part
        Nothing -> lift $ do
          prts <- if bufsize > 0
            then do
              rs <- partUploader partnum bufsize (hashFinalize ctx) bss
              logStr $ printf "\n**** Uploaded (final) part %d size %d\n" partnum bufsize
              let allParts = D.toList $ D.snoc completed $ completedPart partnum <$> (rs ^. uprsETag)
              pure $ nonEmpty =<< sequence allParts
            else do
              logStr $ printf "\n**** No final data to upload\n"
              pure $ nonEmpty =<< sequence (D.toList completed)
          send $ completeMultipartUpload bucket key upId
                   & cMultipartUpload ?~ set cmuParts prts completedMultipartUpload

      -- Send the buffered chunks as part number pnum, using the precomputed
      -- digest and total size for the request body.
      partUploader :: MonadAWS m => Int -> Int -> Digest SHA256 -> D.DList ByteString -> m UploadPartResponse
      partUploader pnum size digest =
        D.toList
        >>> sourceList
        >>> hashedBody digest (fromIntegral size)
        >>> toBody
        >>> uploadPart bucket key pnum upId
        >>> send
        >=> checkUpload

      -- Reject any part response whose status is not 200.
      checkUpload :: (Monad m) => UploadPartResponse -> m UploadPartResponse
      checkUpload upr = do
        when (upr ^. uprsResponseStatus /= 200) $ fail "Failed to upload piece"
        return upr

  go D.empty 0 hashInit 1 D.empty `onException` lift (send (abortMultipartUpload bucket key upId))
-- | Where 'concurrentUpload' reads the data to upload from.
data UploadLocation
  = FP FilePath
    -- ^ A file on disk; it is sized with 'getFileSize' and each part is
    -- memory-mapped from it.
  | BS ByteString
    -- ^ A strict 'ByteString' already in memory; it is split into chunks.
-- | Upload an in-memory 'ByteString' or a file as a multipart upload, sending
-- the parts concurrently.
--
-- The upload is created by sending the supplied 'CreateMultipartUpload'
-- request; the data is split into chunks, every chunk is sent as one part
-- (at most @nThreads@ at a time, gated by a semaphore), and the upload is then
-- completed with the collected parts.
--
-- Fails (via 'fail') when the create request does not return status 200. If an
-- exception escapes the upload phase, an 'abortMultipartUpload' request is
-- sent for this upload before the exception propagates.
concurrentUpload :: (AWSConstraint r m, MonadAWS m, MonadBaseControl IO m)
                 => Maybe ChunkSize
                 -- ^ Requested chunk size in bytes; 'Nothing' or any value
                 -- below 'minimumChunkSize' results in 'minimumChunkSize'.
                 -- The size is enlarged when it would give 10000 or more
                 -- chunks.
                 -> Maybe NumThreads
                 -- ^ Number of simultaneous part uploads (at least 1);
                 -- 'Nothing' uses the connection limit of the current manager.
                 -> UploadLocation
                 -- ^ Source of the data.
                 -> CreateMultipartUpload
                 -- ^ Request used to create the upload; its bucket and key are
                 -- reused for every part and for completion.
                 -> m CompleteMultipartUploadResponse
concurrentUpload mcs mnt ud cmu = do
  cmur <- send cmu
  when (cmur ^. cmursResponseStatus /= 200) $
    fail "Failed to create upload"
  logger <- view envLogger
  let logStr :: MonadIO m => String -> m ()
      logStr = liftIO . logger Info . stringUtf8
  -- The pattern below is lazy: a response without an upload id only raises an
  -- error once upId is first forced.
  let Just upId = cmur ^. cmursUploadId
      bucket    = cmu ^. cmuBucket
      key       = cmu ^. cmuKey

      -- Chunk size for a payload of len bytes: the requested size (never below
      -- the minimum), or len `div` 9999 when that size would give 10000 or
      -- more chunks.
      calcChunkSize :: Int -> Int
      calcChunkSize len =
        let chunkSize' = maybe minimumChunkSize (max minimumChunkSize) mcs
        in if len `div` chunkSize' >= 10000 then len `div` 9999 else chunkSize'
  mgr <- view envManager
  let mConnCount = mMaxConns mgr
      nThreads   = maybe mConnCount (max 1) mnt
      -- When more threads are requested than the current manager allows
      -- connections, run the action with a new manager sized for nThreads.
      -- NOTE(review): the new manager is built from plain
      -- defaultManagerSettings; confirm it supports the transport that the
      -- environment manager was configured for.
      exec run = if maybe False (> mConnCount) mnt
        then do
          mgr' <- liftIO $ newManager defaultManagerSettings{managerConnCount = nThreads}
          local (envManager .~ mgr') run
        else run
  exec $ flip onException (send (abortMultipartUpload bucket key upId)) $ do
    sem <- liftIO $ newQSem nThreads
    umrs <- case ud of
      BS bs ->
        let chunkSize = calcChunkSize $ BS.length bs
        in forConcurrently (zip [1..] $ chunksOf chunkSize bs) $ \(partnum, b) -> do
             liftIO $ waitQSem sem
             logStr $ "Starting part: " ++ show partnum
             umr <- send . uploadPart bucket key partnum upId . toBody $ b
             logStr $ "Finished part: " ++ show partnum
             liftIO $ signalQSem sem
             pure $ completedPart partnum <$> (umr ^. uprsETag)
      FP fp -> do
        fsize <- liftIO $ getFileSize fp
        let chunkSize   = calcChunkSize $ fromIntegral fsize
            (count,lst) = divMod (fromIntegral fsize) chunkSize
            -- One (part number, byte offset, size) triple per part: count
            -- full-size chunks followed by the remainder lst.
            -- NOTE(review): when the file size is an exact multiple of
            -- chunkSize the last triple has size 0; confirm a zero-length
            -- final part is acceptable.
            params = [ (partnum, chunkSize * offset, size)
                     | partnum <- [1..]
                     | offset  <- [0..count]
                     | size    <- (chunkSize <$ [0..count - 1]) ++ [lst]
                     ]
        forConcurrently params $ \(partnum,off,size) -> do
          liftIO $ waitQSem sem
          b <- liftIO $ mmapFileByteString fp (Just (fromIntegral off,size))
          umr <- send . uploadPart bucket key partnum upId . toBody $ b
          liftIO $ signalQSem sem
          pure $ completedPart partnum <$> (umr ^. uprsETag)
    let prts = nonEmpty =<< sequence umrs
    send $ completeMultipartUpload bucket key upId
             & cMultipartUpload ?~ set cmuParts prts completedMultipartUpload
-- | Abort every multipart upload returned by a single 'listMultipartUploads'
-- request for the given bucket.
--
-- Listed uploads that lack a key or an upload id are skipped. The responses
-- to the abort requests are discarded.
abortAllUploads :: (MonadAWS m) => BucketName -> m ()
abortAllUploads bucket = do
  listing <- send (listMultipartUploads bucket)
  forM_ (listing ^. lmursUploads) $ \upload ->
    case (upload ^. muKey, upload ^. muUploadId) of
      (Just key, Just uid) -> send (abortMultipartUpload bucket key uid) >> pure ()
      _                    -> pure ()
-- | Apply the second function and wrap the result in 'Just' when the
-- predicate holds for the input; otherwise give 'Nothing'.
justWhen :: (a -> Bool) -> (a -> b) -> a -> Maybe b
justWhen holds convert x
  | holds x   = Just (convert x)
  | otherwise = Nothing
-- | The complement of 'justWhen': give 'Nothing' when the predicate holds for
-- the input, otherwise apply the second function and wrap the result in 'Just'.
nothingWhen :: (a -> Bool) -> (a -> b) -> a -> Maybe b
nothingWhen excluded convert = justWhen (\x -> not (excluded x)) convert
-- | Split a strict 'BS.ByteString' into consecutive pieces produced by
-- repeatedly applying 'BS.splitAt' with the given size, stopping once the
-- remainder is empty. An empty input yields an empty list.
chunksOf :: Int -> BS.ByteString -> [BS.ByteString]
chunksOf size = unfoldr nextChunk
  where
    -- Produce the next (chunk, remainder) pair, or stop on an empty remainder.
    nextChunk = nothingWhen BS.null (BS.splitAt size)