{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Mismi.S3.Stream (
    sizeRecursively
  , read
  , list
  , liftAddressAndPrefix
  , listRecursively
  , liftAddress
  ) where

import qualified Control.Exception as CE
import           Control.Lens ((.~), (^.), to)
import           Control.Monad.IO.Class (liftIO)
import           Control.Monad.Reader (ask)
import           Control.Monad.Trans.Resource (ResourceT)
import qualified Control.Retry as Retry

import qualified Data.ByteString as BS
import           Data.Conduit (ConduitT, (.|))
import qualified Data.Conduit as Conduit
import qualified Data.Conduit.List as DC
import           Data.IORef (IORef)
import qualified Data.IORef as IORef
import           Data.Maybe (maybeToList)

import           Mismi.Amazonka (Env, send, paginate)
import           Mismi.Control (AWS, throwOrRetry, handle404)
import           Mismi.S3.Core.Data
import           Mismi.S3.Internal (f', (+/), bytesRange)

import qualified Network.AWS as A
import           Network.AWS.Data.Body (RsBody (..))
import           Network.AWS.S3 (BucketName (..))
import           Network.AWS.S3 (ListObjectsResponse)
import           Network.AWS.S3 (ObjectKey (..))
import qualified Network.AWS.S3 as A

import           P


-- | Turn one page of a listing into sized addresses: every object in the
-- response is paired with its size and addressed under the given bucket.
takeObjectSizes :: Bucket -> ListObjectsResponse -> [Sized Address]
takeObjectSizes b lors =
  fmap sized (lors ^. A.lorsContents)
  where
    sized o =
      let
        ObjectKey k =
          o ^. A.oKey
      in
        Sized (objectBytes o) (Address b (Key k))

    -- We shouldn't need this fromIntegral but amazonka incorrectly uses
    -- an Int instead of Int64 for sizes, we don't want to propagate this
    -- mistake.
    --
    -- See https://github.com/brendanhay/amazonka/issues/320
    objectBytes o =
      Bytes (fromIntegral (o ^. A.oSize))

-- | Stream the address and size of every object returned by a paginated
-- listing of the bucket, using the address key as the listing prefix.
sizeRecursively :: Address -> ConduitT () (Sized Address) AWS ()
sizeRecursively (Address b (Key k)) =
  paginate request .| DC.mapFoldable (takeObjectSizes b)
  where
    request =
      A.listObjects (BucketName (unBucket b))
        & A.loPrefix .~ Just k

-- | Wrap a byte source so that the length of every chunk passing through is
-- added to the counter before the chunk is handed on downstream unchanged.
countBytes ::
      IORef Int64
   -> ConduitT () BS.ByteString (ResourceT IO) ()
   -> ConduitT () BS.ByteString (ResourceT IO) ()
countBytes ref src =
  src .| Conduit.awaitForever tally
  where
    -- Bump the counter first, then forward the chunk.
    tally bs = do
      liftIO $ IORef.modifyIORef' ref (+ fromIntegral (BS.length bs))
      Conduit.yield bs

-- | Request the object at the address with a @Range@ built by
-- @bytesRange start end@ and return the response body as a byte source.
readRange ::
     Int
  -> Int
  -> Address
  -> AWS (ConduitT () BS.ByteString (ResourceT IO) ())
readRange start end a =
  let
    request =
      f' A.getObject a
        & A.goRange .~ Just (bytesRange start end)
  in
    (^. A.gorsBody . to _streamBody) <$> send request

-- | Stream the object from the offset currently stored in the counter up to
-- @end@, counting the bytes that pass through.
--
-- If the stream throws, 'throwOrRetry' decides whether to rethrow or carry
-- on; when it returns, the read starts again from the counter's new value,
-- so bytes already yielded are not requested a second time.
--
-- Only failures raised while the body is being streamed reach the handler;
-- the range request itself is issued before the handler is installed.
--
-- NOTE(review): 'read' passes the object's content length as @end@; whether
-- @bytesRange@ treats its upper bound as inclusive is not visible here --
-- confirm.
readRetry ::
     Env
  -> Retry.RetryStatus
  -> IORef Int64
  -> Int
  -> Address
  -> ConduitT () BS.ByteString (ResourceT IO) ()
readRetry env status0 startRef end a = do
  offset <- liftIO $ fromIntegral <$> IORef.readIORef startRef
  body <- A.runAWS env (readRange offset end a)
  countBytes startRef body `Conduit.catchC` resume
  where
    resume :: CE.SomeException -> ConduitT () BS.ByteString (ResourceT IO) ()
    resume err = do
      status <- liftIO (throwOrRetry 5 err status0)
      readRetry env status startRef end a

--newResumableSource :: Source m a -> m () -> ResumableSource m a
--newResumableSource (Conduit.ConduitM source) final =
--  Conduit.ResumableSource (source Conduit.Done) final


-- | Open the object at the address as a streaming source of its bytes.
--
-- The object's length is looked up first with a @HEAD@ request:
--
--   * 'Nothing' is returned when 'handle404' yields nothing for that request
--     or the response carries no content length,
--
--   * an empty source is returned for a zero-length object, without any
--     @GET@ being issued,
--
--   * otherwise the source streams the object through 'readRetry', which
--     resumes from the bytes already delivered if the stream fails.
--
-- WARNING : The returned source must be consumed within the @AWS@ monad.
-- Failure to do so can result in run time errors (recv on a bad file
-- descriptor) when the @MonadResource@ cleans up the socket.
read :: Address -> AWS (Maybe (ConduitT () BS.ByteString (ResourceT IO) ()))
read a = do
  env <- ask
  startRef <- liftIO $ IORef.newIORef 0
  mend <- fmap toEnd . handle404 . send $ f' A.headObject a

  let
    source 0 =
      mempty
    source end =
      readRetry env Retry.defaultRetryStatus startRef end a

  pure (source <$> mend)
  where
    -- The content length is converted via 'Bytes' exactly as before.
    -- NOTE(review): the detour through 'Bytes' looks redundant -- confirm its
    -- Integral instance is a plain newtype derivation before removing it.
    toEnd response =
      fromIntegral . Bytes . fromIntegral <$> (response >>= (^. A.horsContentLength))

-- | Stream the addresses found by a paginated listing that uses @'/'@ as the
-- delimiter: both the objects and the common prefixes of each page are
-- emitted (see 'liftAddressAndPrefix').
--
-- The listing prefix is the address key passed through @(+/)@.
list :: Address -> ConduitT () Address AWS ()
list a@(Address (Bucket b) (Key k)) =
  paginate request .| liftAddressAndPrefix a
  where
    request =
      A.listObjects (BucketName b)
        & A.loPrefix .~ Just ((+/) k)
        & A.loDelimiter .~ Just '/'

-- | Convert each page of a listing into addresses: first one address per
-- object in the page, then one per common prefix that carries a prefix
-- value. Every emitted address is the given address with its key replaced.
--
-- Common prefixes are handled one at a time, so an entry without a prefix
-- value is skipped on its own and does not affect the other entries.
liftAddressAndPrefix :: Address -> ConduitT ListObjectsResponse Address AWS ()
liftAddressAndPrefix a =
  DC.mapFoldable $ \r ->
    objects r <> prefixes r
  where
    withKey t =
      a { key = Key t }

    objects r =
      [ withKey t
      | o <- r ^. A.lorsContents
      , let ObjectKey t = o ^. A.oKey
      ]

    -- 'maybeToList' drops only the entry whose prefix is missing.
    prefixes r =
      [ withKey p
      | cp <- r ^. A.lorsCommonPrefixes
      , p <- maybeToList (cp ^. A.cpPrefix)
      ]

-- | Stream the address of every object returned by a paginated listing of
-- the bucket, using the address key as the listing prefix and no delimiter.
listRecursively :: Address -> ConduitT () Address AWS ()
listRecursively a@(Address (Bucket bn) (Key k)) =
  let
    request =
      A.listObjects (BucketName bn)
        & A.loPrefix .~ Just k
  in
    paginate request .| liftAddress a

-- | Convert each page of a listing into one address per object, each being
-- the given address with its key replaced by the object's key.
liftAddress :: Address -> ConduitT ListObjectsResponse Address AWS ()
liftAddress a =
  DC.mapFoldable $ \r ->
    [ a { key = Key t }
    | o <- r ^. A.lorsContents
    , let ObjectKey t = o ^. A.oKey
    ]