-- |
-- Module      : Streamly.Internal.Data.Array.Mut.Stream
-- Copyright   : (c) 2019 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Combinators to efficiently manipulate streams of mutable arrays.
--
module Streamly.Internal.Data.Array.Mut.Stream
    (
    -- * Generation
      chunksOf

    -- * Compaction
    , packArraysChunksOf
    , SpliceState (..)
    , lpackArraysChunksOf
    , compact
    , compactLE
    , compactEQ
    , compactGE
    )
where

#include "inline.hs"
#include "ArrayMacros.h"

import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad (when)
import Data.Bifunctor (first)
import Data.Proxy (Proxy(..))
import Streamly.Internal.Data.Unboxed (Unbox, sizeOf)
import Streamly.Internal.Data.Array.Mut.Type (MutArray(..))
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Parser (ParseError)
import Streamly.Internal.Data.Stream.StreamD.Type (Stream)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..))

import qualified Streamly.Internal.Data.Array.Mut.Type as MArray
import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Parser.ParserD as ParserD

-- | @chunksOf n stream@ groups the elements in the input stream into arrays of
-- @n@ elements each.
--
-- Same as the following but may be more efficient:
--
-- > chunksOf n = Stream.foldMany (MArray.writeN n)
--
-- /Pre-release/
{-# INLINE chunksOf #-}
chunksOf :: (MonadIO m, Unbox a)
    => Int -> Stream m a -> Stream m (MutArray a)
-- Thin wrapper: the work is done by the implementation in the module
-- imported qualified as MArray.
chunksOf = MArray.chunksOf

-------------------------------------------------------------------------------
-- Compact
-------------------------------------------------------------------------------

-- | State of the array-coalescing state machine driven by
-- 'packArraysChunksOf'. @s@ is the state of the upstream stream and @arr@ is
-- the type of the arrays being coalesced.
data SpliceState s arr
    = SpliceInitial s
      -- ^ Nothing is buffered; the next array is pulled from upstream.
    | SpliceBuffering s arr
      -- ^ An array is held back so that following arrays can be appended to it.
    | SpliceYielding arr (SpliceState s arr)
      -- ^ Emit the array, then continue in the given state.
    | SpliceFinish
      -- ^ Upstream is exhausted; the stream stops.

-- XXX This can be removed once compactLEFold/compactLE are implemented.
--
-- | This mutates the first array (if it has space) to append values from the
-- second one. This would work for immutable arrays as well because an
-- immutable array never has space so a new array is allocated instead of
-- mutating it.
--
-- Coalesce adjacent arrays in the incoming stream to form bigger arrays of a
-- maximum specified size. Note that if a single array is bigger than the
-- specified size we do not split it to fit. When coalescing multiple arrays,
-- if the combined size would exceed the specified size we do not coalesce;
-- therefore the actual array size may be less than the specified chunk size.
--
-- @since 0.7.0
{-# INLINE_NORMAL packArraysChunksOf #-}
packArraysChunksOf :: (MonadIO m, Unbox a)
    => Int -> D.Stream m (MutArray a) -> D.Stream m (MutArray a)
packArraysChunksOf n (D.Stream step state) =
    D.Stream step' (SpliceInitial state)

    where

    -- The limit @n@ is compared against 'MArray.byteLength' and
    -- 'MArray.byteCapacity', i.e. it is treated as a size in bytes.

    {-# INLINE_LATE step' #-}
    -- Nothing buffered yet: validate the limit, then pull one array.
    step' gst (SpliceInitial st) = do
        when (n <= 0) $
            -- XXX we can pass the module string from the higher level API
            error $ "Streamly.Internal.Data.Array.Mut.Stream.packArraysChunksOf: "
                 ++ "the size of arrays [" ++ show n
                 ++ "] must be a natural number"
        r <- step gst st
        case r of
            D.Yield arr s -> return $
                let len = MArray.byteLength arr
                 in if len >= n
                    -- Already big enough (or oversized): pass it through
                    -- unchanged, we never split arrays.
                    then D.Skip (SpliceYielding arr (SpliceInitial s))
                    -- Too small: hold it back and try to grow it.
                    else D.Skip (SpliceBuffering s arr)
            D.Skip s -> return $ D.Skip (SpliceInitial s)
            D.Stop -> return D.Stop

    -- An array is buffered: try to append the next upstream array to it.
    step' gst (SpliceBuffering st buf) = do
        r <- step gst st
        case r of
            D.Yield arr s -> do
                let len = MArray.byteLength buf + MArray.byteLength arr
                if len > n
                -- Appending would exceed the limit: emit the buffer as it is
                -- and start buffering the new array instead.
                then return $
                    D.Skip (SpliceYielding buf (SpliceBuffering s arr))
                else do
                    -- Grow the buffer to the full limit once, so that later
                    -- appends into the same buffer need no reallocation.
                    buf' <- if MArray.byteCapacity buf < n
                            then liftIO $ MArray.realloc n buf
                            else return buf
                    buf'' <- MArray.splice buf' arr
                    return $ D.Skip (SpliceBuffering s buf'')
            D.Skip s -> return $ D.Skip (SpliceBuffering s buf)
            -- Upstream ended: flush whatever is buffered, then finish.
            D.Stop -> return $ D.Skip (SpliceYielding buf SpliceFinish)

    step' _ SpliceFinish = return D.Stop

    -- Emit the pending array and continue in the state recorded with it.
    step' _ (SpliceYielding arr next) = return $ D.Yield arr next

-- XXX Remove this once compactLEFold is implemented
-- lpackArraysChunksOf = Fold.many compactLEFold
--
{-# INLINE_NORMAL lpackArraysChunksOf #-}
lpackArraysChunksOf :: (MonadIO m, Unbox a)
    => Int -> Fold m (MutArray a) () -> Fold m (MutArray a) ()
lpackArraysChunksOf n (Fold step1 initial1 extract1) =
    Fold step initial extract

    where

    -- The fold state pairs an optional pending (not yet forwarded) array with
    -- the state of the downstream fold. The limit @n@ is compared against
    -- 'MArray.byteLength', i.e. it is treated as a size in bytes.

    -- Validate the limit, then start the downstream fold with no pending
    -- array.
    initial = do
        when (n <= 0) $
            -- XXX we can pass the module string from the higher level API
            error $ "Streamly.Internal.Data.Array.Mut.Stream.lpackArraysChunksOf: "
                 ++ "the size of arrays [" ++ show n
                 ++ "] must be a natural number"

        r <- initial1
        return $ first (Tuple' Nothing) r

    -- On finalization, forward any pending array before extracting the
    -- downstream fold.
    extract (Tuple' Nothing r1) = extract1 r1
    extract (Tuple' (Just buf) r1) = do
        r <- step1 r1 buf
        case r of
            FL.Partial rr -> extract1 rr
            FL.Done _ -> return ()

    -- No pending array: forward the input if it is large enough, otherwise
    -- keep it as the pending buffer.
    step (Tuple' Nothing r1) arr =
            let len = MArray.byteLength arr
             in if len >= n
                then do
                    r <- step1 r1 arr
                    case r of
                        FL.Done _ -> return $ FL.Done ()
                        FL.Partial s -> do
                            -- The downstream fold is finalized and started
                            -- afresh after every forwarded array.
                            extract1 s
                            res <- initial1
                            return $ first (Tuple' Nothing) res
                else return $ FL.Partial $ Tuple' (Just arr) r1

    -- A pending array exists: always append the input to it (growing the
    -- buffer when needed), then forward the result once it reaches the limit.
    -- Unlike 'packArraysChunksOf', the combined array may exceed @n@.
    step (Tuple' (Just buf) r1) arr = do
            let len = MArray.byteLength buf + MArray.byteLength arr
            buf' <- if MArray.byteCapacity buf < len
                    then liftIO $ MArray.realloc (max n len) buf
                    else return buf
            buf'' <- MArray.splice buf' arr

            -- XXX this is common in both the equations of step
            if len >= n
            then do
                r <- step1 r1 buf''
                case r of
                    FL.Done _ -> return $ FL.Done ()
                    FL.Partial s -> do
                        extract1 s
                        res <- initial1
                        return $ first (Tuple' Nothing) res
            else return $ FL.Partial $ Tuple' (Just buf'') r1

-- XXX Same as compactLE, to be removed once that is implemented.
--
-- | Coalesce adjacent arrays in incoming stream to form bigger arrays of a
-- maximum specified size in bytes.
--
-- /Internal/
{-# INLINE compact #-}
compact :: (MonadIO m, Unbox a)
    => Int -> Stream m (MutArray a) -> Stream m (MutArray a)
-- Currently just an alias of 'packArraysChunksOf'.
compact = packArraysChunksOf

-- | Coalesce adjacent arrays in the incoming stream to form bigger arrays of a
-- maximum specified size. Note that if a single array is bigger than the
-- specified size we do not split it to fit. When coalescing multiple arrays,
-- if the combined size would exceed the specified size we do not coalesce;
-- therefore the actual array size may be less than the specified chunk size.
--
-- /Internal/
{-# INLINE_NORMAL compactLEParserD #-}
compactLEParserD ::
       forall m a. (MonadIO m, Unbox a)
    => Int -> ParserD.Parser (MutArray a) m (MutArray a)
compactLEParserD n = ParserD.Parser step initial extract

    where

    -- The limit is given as @n@ and scaled by SIZE_OF(a) (macro from
    -- ArrayMacros.h, presumably the byte size of one element) so that it can
    -- be compared with the byte lengths used below.
    nBytes = n * SIZE_OF(a)

    -- Reject non-positive limits; otherwise start with nothing buffered.
    initial =
        return
            $ if n <= 0
              then error
                       $ functionPath
                       ++ ": the size of arrays ["
                       ++ show n ++ "] must be a natural number"
              else ParserD.IPartial Nothing

    -- Nothing buffered: an array that already reaches the limit is the
    -- result as is, a smaller one becomes the buffer.
    step Nothing arr =
        return
            $ let len = MArray.byteLength arr
               in if len >= nBytes
                  then ParserD.Done 0 arr
                  else ParserD.Partial 0 (Just arr)
    -- Something buffered: append the input only if the result stays within
    -- the limit.
    step (Just buf) arr =
        let len = MArray.byteLength buf + MArray.byteLength arr
         in if len > nBytes
            -- Finish with the buffer alone. NOTE(review): the count 1
            -- presumably tells the driver that the current input was not
            -- consumed -- confirm against the ParserD.Step documentation.
            then return $ ParserD.Done 1 buf
            else do
                -- Grow the buffer to the full limit once so that further
                -- appends into it need no reallocation.
                buf1 <-
                    if MArray.byteCapacity buf < nBytes
                    then liftIO $ MArray.realloc nBytes buf
                    else return buf
                buf2 <- MArray.splice buf1 arr
                return $ ParserD.Partial 0 (Just buf2)

    -- End of input: return whatever is buffered, or the nil array if
    -- nothing was received.
    extract Nothing = return $ ParserD.Done 0 MArray.nil
    extract (Just buf) = return $ ParserD.Done 0 buf

    functionPath =
        "Streamly.Internal.Data.Array.Mut.Stream.compactLEParserD"

-- | Coalesce adjacent arrays in incoming stream to form bigger arrays of a
-- minimum specified size. Note that if all the arrays in the stream together
-- are smaller than the specified size the resulting array will be smaller than
-- the specified size. When we coalesce multiple arrays if the size would exceed
-- the specified size we stop coalescing further.
--
-- /Internal/
{-# INLINE_NORMAL compactGEFold #-}
-- @compactGEFold n@ splices consecutive input arrays into one buffer and
-- terminates as soon as the accumulated buffer holds at least @n@ elements
-- (i.e. @n * SIZE_OF(a)@ bytes). If the input ends earlier, whatever has been
-- accumulated so far is returned, or 'MArray.nil' when nothing was consumed.
--
-- Evaluating the initial step calls 'error' when @n@ is negative.
compactGEFold ::
       forall m a. (MonadIO m, Unbox a)
    => Int -> FL.Fold m (MutArray a) (MutArray a)
compactGEFold n = Fold step initial extract

    where

    -- The requested minimum size converted from elements to bytes. All size
    -- comparisons below are done in bytes.
    nBytes = n * SIZE_OF(a)

    initial =
        return
            $ if n < 0
              then error
                       $ functionPath
                       ++ ": the size of arrays ["
                       ++ show n ++ "] must be a natural number"
              else FL.Partial Nothing

    -- First array: if it is already big enough, emit it as is without any
    -- copying, otherwise keep it as the accumulation buffer.
    step Nothing arr =
        return
            $ let len = MArray.byteLength arr
               in if len >= nBytes
                  then FL.Done arr
                  else FL.Partial (Just arr)
    -- Subsequent arrays: grow the buffer if required and append.
    step (Just buf) arr = do
        let len = MArray.byteLength buf + MArray.byteLength arr
        buf1 <-
            if MArray.byteCapacity buf < len
            then liftIO $ MArray.realloc (max len nBytes) buf
            else return buf
        buf2 <- MArray.splice buf1 arr
        -- 'len' is a byte count, so it must be compared with the byte
        -- threshold 'nBytes' and not with the element count 'n'; comparing
        -- with 'n' would terminate too early for elements wider than a byte.
        if len >= nBytes
        then return $ FL.Done buf2
        else return $ FL.Partial (Just buf2)

    -- End of input: return what we have, possibly shorter than requested.
    extract Nothing = return MArray.nil
    extract (Just buf) = return buf

    functionPath =
        "Streamly.Internal.Data.Array.Mut.Stream.compactGEFold"

-- | Coalesce adjacent arrays in the incoming stream to form bigger arrays of
-- a maximum specified size in bytes.
--
-- The input stream is repeatedly parsed with 'compactLEParserD'; each output
-- element is either the compacted array or the 'ParseError' reported by the
-- parser.
--
-- /Internal/
compactLE :: (MonadIO m, Unbox a) =>
    Int -> Stream m (MutArray a) -> Stream m (Either ParseError (MutArray a))
compactLE n = D.parseManyD $ compactLEParserD n

-- | Like 'compactLE' but generates arrays exactly equal to the specified
-- size, except for the last array in the stream which could be shorter.
--
-- This function is a placeholder: evaluating its result calls 'error' with a
-- message naming this function.
--
-- /Unimplemented/
{-# INLINE compactEQ #-}
compactEQ :: -- (MonadIO m, Unbox a) =>
    Int -> Stream m (MutArray a) -> Stream m (MutArray a)
compactEQ _n _xs =
    error "Streamly.Internal.Data.Array.Mut.Stream.compactEQ: unimplemented"
    -- IsStream.fromStreamD $ D.foldMany (compactEQFold n) (IsStream.toStreamD xs)

-- | Like 'compactLE' but generates arrays of size greater than or equal to
-- the specified size, except for the last array in the stream which could be
-- shorter.
--
-- Implemented by running 'compactGEFold' repeatedly over the input stream.
--
-- /Internal/
{-# INLINE compactGE #-}
compactGE ::
       (MonadIO m, Unbox a)
    => Int -> Stream m (MutArray a) -> Stream m (MutArray a)
compactGE n = D.foldMany $ compactGEFold n