{-# LANGUAGE CPP                 #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}

#include "inline.hs"

-- |
-- Module      : Streamly.Internal.Memory.ArrayStream
-- Copyright   : (c) 2019 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Combinators to efficiently manipulate streams of arrays.
--
module Streamly.Internal.Memory.ArrayStream
    (
    -- * Creation
      arraysOf

    -- * Flattening to elements
    , concat
    , concatRev
    , interpose
    , interposeSuffix
    , intercalateSuffix

    -- * Transformation
    , splitOn
    , splitOnSuffix
    , compact -- compact

    -- * Elimination
    , toArray
    )
where

import Control.Monad.IO.Class (MonadIO(..))
-- import Data.Functor.Identity (Identity)
import Data.Word (Word8)
import Foreign.ForeignPtr (withForeignPtr)
import Foreign.Ptr (minusPtr, plusPtr, castPtr)
import Foreign.Storable (Storable(..))
import Prelude hiding (length, null, last, map, (!!), read, concat)

import Streamly.Internal.Memory.Array.Types (Array(..), length)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamK.Type (IsStream)

import qualified Streamly.Internal.Memory.Array as A
import qualified Streamly.Internal.Memory.Array.Types as A
import qualified Streamly.Internal.Prelude as S
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.Prelude as P

-- XXX efficiently compare two streams of arrays. Two streams can have chunks
-- of different sizes, we can handle that in the stream comparison abstraction.
-- This could be useful e.g. to fast compare whether two files differ.

-- | Convert a stream of arrays into a stream of their elements.
--
-- Same as the following but more efficient:
--
-- > concat = S.concatMap A.read
--
-- @since 0.7.0
{-# INLINE concat #-}
concat :: (IsStream t, MonadIO m, Storable a) => t m (Array a) -> t m a
-- concat m = D.fromStreamD $ A.flattenArrays (D.toStreamD m)
-- concat m = D.fromStreamD $ D.concatMap A.toStreamD (D.toStreamD m)
concat :: t m (Array a) -> t m a
concat t m (Array a)
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Unfold m (Array a) a -> Stream m (Array a) -> Stream m a
forall (m :: * -> *) a b.
Monad m =>
Unfold m a b -> Stream m a -> Stream m b
D.concatMapU Unfold m (Array a) a
forall (m :: * -> *) a.
(Monad m, Storable a) =>
Unfold m (Array a) a
A.read (t m (Array a) -> Stream m (Array a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m (Array a)
m)

-- XXX should we have a reverseArrays API to reverse the stream of arrays
-- instead?
--
-- | Convert a stream of arrays into a stream of their elements reversing the
-- contents of each array before flattening.
--
-- @since 0.7.0
{-# INLINE concatRev #-}
concatRev :: (IsStream t, MonadIO m, Storable a) => t m (Array a) -> t m a
concatRev :: t m (Array a) -> t m a
concatRev t m (Array a)
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Stream m (Array a) -> Stream m a
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Stream m (Array a) -> Stream m a
A.flattenArraysRev (t m (Array a) -> Stream m (Array a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m (Array a)
m)

-- | Flatten a stream of arrays after inserting the given element between
-- arrays.
--
-- /Internal/
{-# INLINE interpose #-}
interpose :: (MonadIO m, IsStream t, Storable a) => a -> t m (Array a) -> t m a
interpose :: a -> t m (Array a) -> t m a
interpose a
x = a -> Unfold m (Array a) a -> t m (Array a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) c b.
(IsStream t, Monad m) =>
c -> Unfold m b c -> t m b -> t m c
S.interpose a
x Unfold m (Array a) a
forall (m :: * -> *) a.
(Monad m, Storable a) =>
Unfold m (Array a) a
A.read

{-# INLINE intercalateSuffix #-}
intercalateSuffix :: (MonadIO m, IsStream t, Storable a)
    => Array a -> t m (Array a) -> t m a
intercalateSuffix :: Array a -> t m (Array a) -> t m a
intercalateSuffix Array a
arr = Array a -> Unfold m (Array a) a -> t m (Array a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b c.
(IsStream t, Monad m) =>
b -> Unfold m b c -> t m b -> t m c
S.intercalateSuffix Array a
arr Unfold m (Array a) a
forall (m :: * -> *) a.
(Monad m, Storable a) =>
Unfold m (Array a) a
A.read

-- | Flatten a stream of arrays appending the given element after each
-- array.
--
-- @since 0.7.0
{-# INLINE interposeSuffix #-}
interposeSuffix :: (MonadIO m, IsStream t, Storable a)
    => a -> t m (Array a) -> t m a
-- interposeSuffix x = D.fromStreamD . A.unlines x . D.toStreamD
interposeSuffix :: a -> t m (Array a) -> t m a
interposeSuffix a
x = a -> Unfold m (Array a) a -> t m (Array a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) c b.
(IsStream t, Monad m) =>
c -> Unfold m b c -> t m b -> t m c
S.interposeSuffix a
x Unfold m (Array a) a
forall (m :: * -> *) a.
(Monad m, Storable a) =>
Unfold m (Array a) a
A.read

-- | Split a stream of arrays on a given separator byte, dropping the separator
-- and coalescing all the arrays between two separators into a single array.
--
-- @since 0.7.0
{-# INLINE splitOn #-}
splitOn
    :: (IsStream t, MonadIO m)
    => Word8
    -> t m (Array Word8)
    -> t m (Array Word8)
splitOn :: Word8 -> t m (Array Word8) -> t m (Array Word8)
splitOn Word8
byte t m (Array Word8)
s =
    Stream m (Array Word8) -> t m (Array Word8)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m (Array Word8) -> t m (Array Word8))
-> Stream m (Array Word8) -> t m (Array Word8)
forall a b. (a -> b) -> a -> b
$ (Array Word8 -> m (Array Word8, Maybe (Array Word8)))
-> (Array Word8 -> Array Word8 -> m (Array Word8))
-> Stream m (Array Word8)
-> Stream m (Array Word8)
forall (m :: * -> *) (f :: * -> *) a.
Monad m =>
(f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
D.splitInnerBy (Word8 -> Array Word8 -> m (Array Word8, Maybe (Array Word8))
forall (m :: * -> *).
MonadIO m =>
Word8 -> Array Word8 -> m (Array Word8, Maybe (Array Word8))
A.breakOn Word8
byte) Array Word8 -> Array Word8 -> m (Array Word8)
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Array a -> Array a -> m (Array a)
A.spliceTwo (Stream m (Array Word8) -> Stream m (Array Word8))
-> Stream m (Array Word8) -> Stream m (Array Word8)
forall a b. (a -> b) -> a -> b
$ t m (Array Word8) -> Stream m (Array Word8)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m (Array Word8)
s

{-# INLINE splitOnSuffix #-}
splitOnSuffix
    :: (IsStream t, MonadIO m)
    => Word8
    -> t m (Array Word8)
    -> t m (Array Word8)
-- splitOn byte s = D.fromStreamD $ A.splitOn byte $ D.toStreamD s
splitOnSuffix :: Word8 -> t m (Array Word8) -> t m (Array Word8)
splitOnSuffix Word8
byte t m (Array Word8)
s =
    Stream m (Array Word8) -> t m (Array Word8)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m (Array Word8) -> t m (Array Word8))
-> Stream m (Array Word8) -> t m (Array Word8)
forall a b. (a -> b) -> a -> b
$ (Array Word8 -> m (Array Word8, Maybe (Array Word8)))
-> (Array Word8 -> Array Word8 -> m (Array Word8))
-> Stream m (Array Word8)
-> Stream m (Array Word8)
forall (m :: * -> *) (f :: * -> *) a.
(Monad m, Eq (f a), Monoid (f a)) =>
(f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
D.splitInnerBySuffix (Word8 -> Array Word8 -> m (Array Word8, Maybe (Array Word8))
forall (m :: * -> *).
MonadIO m =>
Word8 -> Array Word8 -> m (Array Word8, Maybe (Array Word8))
A.breakOn Word8
byte) Array Word8 -> Array Word8 -> m (Array Word8)
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Array a -> Array a -> m (Array a)
A.spliceTwo (Stream m (Array Word8) -> Stream m (Array Word8))
-> Stream m (Array Word8) -> Stream m (Array Word8)
forall a b. (a -> b) -> a -> b
$ t m (Array Word8) -> Stream m (Array Word8)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m (Array Word8)
s

-- | Coalesce adjacent arrays in incoming stream to form bigger arrays of a
-- maximum specified size in bytes.
--
-- @since 0.7.0
{-# INLINE compact #-}
compact :: (MonadIO m, Storable a)
    => Int -> SerialT m (Array a) -> SerialT m (Array a)
compact :: Int -> SerialT m (Array a) -> SerialT m (Array a)
compact Int
n SerialT m (Array a)
xs = Stream m (Array a) -> SerialT m (Array a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m (Array a) -> SerialT m (Array a))
-> Stream m (Array a) -> SerialT m (Array a)
forall a b. (a -> b) -> a -> b
$ Int -> Stream m (Array a) -> Stream m (Array a)
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Stream m (Array a) -> Stream m (Array a)
A.packArraysChunksOf Int
n (SerialT m (Array a) -> Stream m (Array a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD SerialT m (Array a)
xs)

-- | @arraysOf n stream@ groups the elements in the input stream into arrays of
-- @n@ elements each.
--
-- Same as the following but more efficient:
--
-- > arraysOf n = S.chunksOf n (A.writeN n)
--
-- @since 0.7.0
{-# INLINE arraysOf #-}
arraysOf :: (IsStream t, MonadIO m, Storable a)
    => Int -> t m a -> t m (Array a)
arraysOf :: Int -> t m a -> t m (Array a)
arraysOf Int
n t m a
str =
    Stream m (Array a) -> t m (Array a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m (Array a) -> t m (Array a))
-> Stream m (Array a) -> t m (Array a)
forall a b. (a -> b) -> a -> b
$ Int -> Stream m a -> Stream m (Array a)
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Stream m a -> Stream m (Array a)
A.fromStreamDArraysOf Int
n (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
str)

-- XXX Both of these implementations of splicing seem to perform equally well.
-- We need to perform benchmarks over a range of sizes though.

-- CAUTION! length must more than equal to lengths of all the arrays in the
-- stream.
{-# INLINE spliceArraysLenUnsafe #-}
spliceArraysLenUnsafe :: (MonadIO m, Storable a)
    => Int -> SerialT m (Array a) -> m (Array a)
spliceArraysLenUnsafe :: Int -> SerialT m (Array a) -> m (Array a)
spliceArraysLenUnsafe Int
len SerialT m (Array a)
buffered = do
    Array a
arr <- IO (Array a) -> m (Array a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Array a) -> m (Array a)) -> IO (Array a) -> m (Array a)
forall a b. (a -> b) -> a -> b
$ Int -> IO (Array a)
forall a. Storable a => Int -> IO (Array a)
A.newArray Int
len
    Ptr a
end <- (Ptr a -> Array a -> m (Ptr a))
-> Ptr a -> SerialT m (Array a) -> m (Ptr a)
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> b -> SerialT m a -> m b
S.foldlM' Ptr a -> Array a -> m (Ptr a)
forall (m :: * -> *) a a b.
MonadIO m =>
Ptr a -> Array a -> m (Ptr b)
writeArr (Array a -> Ptr a
forall a. Array a -> Ptr a
aEnd Array a
arr) SerialT m (Array a)
buffered
    Array a -> m (Array a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Array a -> m (Array a)) -> Array a -> m (Array a)
forall a b. (a -> b) -> a -> b
$ Array a
arr {aEnd :: Ptr a
aEnd = Ptr a
end}

    where

    writeArr :: Ptr a -> Array a -> m (Ptr b)
writeArr Ptr a
dst Array{Ptr a
ForeignPtr a
aBound :: forall a. Array a -> Ptr a
aStart :: forall a. Array a -> ForeignPtr a
aBound :: Ptr a
aEnd :: Ptr a
aStart :: ForeignPtr a
aEnd :: forall a. Array a -> Ptr a
..} =
        IO (Ptr b) -> m (Ptr b)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Ptr b) -> m (Ptr b)) -> IO (Ptr b) -> m (Ptr b)
forall a b. (a -> b) -> a -> b
$ ForeignPtr a -> (Ptr a -> IO (Ptr b)) -> IO (Ptr b)
forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr ForeignPtr a
aStart ((Ptr a -> IO (Ptr b)) -> IO (Ptr b))
-> (Ptr a -> IO (Ptr b)) -> IO (Ptr b)
forall a b. (a -> b) -> a -> b
$ \Ptr a
src -> do
                        let count :: Int
count = Ptr a
aEnd Ptr a -> Ptr a -> Int
forall a b. Ptr a -> Ptr b -> Int
`minusPtr` Ptr a
src
                        Ptr Word8 -> Ptr Word8 -> Int -> IO ()
A.memcpy (Ptr a -> Ptr Word8
forall a b. Ptr a -> Ptr b
castPtr Ptr a
dst) (Ptr a -> Ptr Word8
forall a b. Ptr a -> Ptr b
castPtr Ptr a
src) Int
count
                        Ptr b -> IO (Ptr b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Ptr b -> IO (Ptr b)) -> Ptr b -> IO (Ptr b)
forall a b. (a -> b) -> a -> b
$ Ptr a
dst Ptr a -> Int -> Ptr b
forall a b. Ptr a -> Int -> Ptr b
`plusPtr` Int
count

{-# INLINE _spliceArraysBuffered #-}
_spliceArraysBuffered :: (MonadIO m, Storable a)
    => SerialT m (Array a) -> m (Array a)
_spliceArraysBuffered :: SerialT m (Array a) -> m (Array a)
_spliceArraysBuffered SerialT m (Array a)
s = do
    SerialT m (Array a)
buffered <- (Array a -> SerialT m (Array a) -> SerialT m (Array a))
-> SerialT m (Array a)
-> SerialT m (Array a)
-> m (SerialT m (Array a))
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a b.
(Monad m, IsStream t) =>
(a -> b -> b) -> b -> t m a -> m b
P.foldr Array a -> SerialT m (Array a) -> SerialT m (Array a)
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
S.cons SerialT m (Array a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
S.nil SerialT m (Array a)
s
    Int
len <- SerialT m Int -> m Int
forall (m :: * -> *) a. (Monad m, Num a) => SerialT m a -> m a
S.sum ((Array a -> Int) -> SerialT m (Array a) -> SerialT m Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
S.map Array a -> Int
forall a. Storable a => Array a -> Int
length SerialT m (Array a)
buffered)
    Int -> SerialT m (Array a) -> m (Array a)
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> SerialT m (Array a) -> m (Array a)
spliceArraysLenUnsafe Int
len SerialT m (Array a)
s

{-# INLINE spliceArraysRealloced #-}
spliceArraysRealloced :: forall m a. (MonadIO m, Storable a)
    => SerialT m (Array a) -> m (Array a)
spliceArraysRealloced :: SerialT m (Array a) -> m (Array a)
spliceArraysRealloced SerialT m (Array a)
s = do
    Array a
idst <- IO (Array a) -> m (Array a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Array a) -> m (Array a)) -> IO (Array a) -> m (Array a)
forall a b. (a -> b) -> a -> b
$ Int -> IO (Array a)
forall a. Storable a => Int -> IO (Array a)
A.newArray (a -> Int -> Int
forall a. Storable a => a -> Int -> Int
A.bytesToElemCount (a
forall a. HasCallStack => a
undefined :: a)
                                (Int -> Int
A.mkChunkSizeKB Int
4))

    Array a
arr <- (Array a -> Array a -> m (Array a))
-> Array a -> SerialT m (Array a) -> m (Array a)
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> b -> SerialT m a -> m b
S.foldlM' Array a -> Array a -> m (Array a)
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Array a -> Array a -> m (Array a)
A.spliceWithDoubling Array a
idst SerialT m (Array a)
s
    IO (Array a) -> m (Array a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Array a) -> m (Array a)) -> IO (Array a) -> m (Array a)
forall a b. (a -> b) -> a -> b
$ Array a -> IO (Array a)
forall a. Storable a => Array a -> IO (Array a)
A.shrinkToFit Array a
arr

-- | Given a stream of arrays, splice them all together to generate a single
-- array. The stream must be /finite/.
--
-- @since 0.7.0
{-# INLINE toArray #-}
toArray :: (MonadIO m, Storable a) => SerialT m (Array a) -> m (Array a)
toArray :: SerialT m (Array a) -> m (Array a)
toArray = SerialT m (Array a) -> m (Array a)
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
SerialT m (Array a) -> m (Array a)
spliceArraysRealloced
-- spliceArrays = _spliceArraysBuffered

-- exponentially increasing sizes of the chunks upto the max limit.
-- XXX this will be easier to implement with parsers/terminating folds
-- With this we should be able to reduce the number of chunks/allocations.
-- The reallocation/copy based toArray can also be implemented using this.
--
{-
{-# INLINE toArraysInRange #-}
toArraysInRange :: (IsStream t, MonadIO m, Storable a)
    => Int -> Int -> Fold m (Array a) b -> Fold m a b
toArraysInRange low high (Fold step initial extract) =
-}

{-
-- | Fold the input to a pure buffered stream (List) of arrays.
{-# INLINE _toArraysOf #-}
_toArraysOf :: (MonadIO m, Storable a)
    => Int -> Fold m a (SerialT Identity (Array a))
_toArraysOf n = FL.lchunksOf n (A.writeNF n) FL.toStream
-}