{-# LANGUAGE CPP #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
#include "inline.hs"
module Streamly.Internal.Memory.ArrayStream
(
arraysOf
, concat
, concatRev
, interpose
, interposeSuffix
, intercalateSuffix
, splitOn
, splitOnSuffix
, compact
, toArray
)
where
import Control.Monad.IO.Class (MonadIO(..))
import Data.Word (Word8)
import Foreign.ForeignPtr (withForeignPtr)
import Foreign.Ptr (minusPtr, plusPtr, castPtr)
import Foreign.Storable (Storable(..))
import Prelude hiding (length, null, last, map, (!!), read, concat)
import Streamly.Internal.Memory.Array.Types (Array(..), length)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamK.Type (IsStream)
import qualified Streamly.Internal.Memory.Array as A
import qualified Streamly.Internal.Memory.Array.Types as A
import qualified Streamly.Internal.Prelude as S
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.Prelude as P
-- | Flatten a stream of arrays into a stream of their elements.
--
-- Each incoming 'Array' is unfolded with 'A.read' and the resulting element
-- streams are concatenated in order via 'D.concatMapU'.
{-# INLINE concat #-}
concat :: (IsStream t, MonadIO m, Storable a) => t m (Array a) -> t m a
concat m = D.fromStreamD $ D.concatMapU A.read (D.toStreamD m)
-- | Flatten a stream of arrays into a stream of elements using
-- 'A.flattenArraysRev'.
--
-- NOTE(review): judging by the name, each array's elements are presumably
-- emitted in reverse order -- confirm against 'A.flattenArraysRev'.
{-# INLINE concatRev #-}
concatRev :: (IsStream t, MonadIO m, Storable a) => t m (Array a) -> t m a
concatRev m = D.fromStreamD $ A.flattenArraysRev (D.toStreamD m)
-- | Flatten a stream of arrays into a stream of elements, handing the element
-- @x@ to 'S.interpose' as the separator and 'A.read' as the unfold used to
-- expand each array.
--
-- NOTE(review): presumably @x@ is emitted between the contents of consecutive
-- arrays (infix position) -- confirm against 'S.interpose'.
{-# INLINE interpose #-}
interpose :: (MonadIO m, IsStream t, Storable a) => a -> t m (Array a) -> t m a
interpose x = S.interpose x A.read
-- | Flatten a stream of arrays into a stream of elements, handing the array
-- @arr@ to 'S.intercalateSuffix' as the separator and 'A.read' as the unfold
-- used to expand both the separator and the stream's arrays.
--
-- NOTE(review): presumably the contents of @arr@ are emitted after every
-- array in the stream (suffix position) -- confirm against
-- 'S.intercalateSuffix'.
{-# INLINE intercalateSuffix #-}
intercalateSuffix :: (MonadIO m, IsStream t, Storable a)
    => Array a -> t m (Array a) -> t m a
intercalateSuffix arr = S.intercalateSuffix arr A.read
-- | Flatten a stream of arrays into a stream of elements, handing the element
-- @x@ to 'S.interposeSuffix' as the separator and 'A.read' as the unfold used
-- to expand each array.
--
-- NOTE(review): presumably @x@ is emitted after the contents of every array
-- (suffix position) -- confirm against 'S.interposeSuffix'.
{-# INLINE interposeSuffix #-}
interposeSuffix :: (MonadIO m, IsStream t, Storable a)
    => a -> t m (Array a) -> t m a
interposeSuffix x = S.interposeSuffix x A.read
-- | Re-chunk a stream of 'Word8' arrays around occurrences of @byte@.
--
-- The work is delegated to 'D.splitInnerBy', which is given:
--
-- * @A.breakOn byte@ to break a single array into a leading piece and an
--   optional remainder, and
-- * 'A.spliceTwo' to join two arrays into one (needed when a segment spans
--   more than one input array).
--
-- NOTE(review): presumably the separator byte is dropped from the output and
-- a trailing segment without a separator is still emitted -- confirm against
-- 'D.splitInnerBy' and 'A.breakOn'.
{-# INLINE splitOn #-}
splitOn
    :: (IsStream t, MonadIO m)
    => Word8
    -> t m (Array Word8)
    -> t m (Array Word8)
splitOn byte s =
    D.fromStreamD $ D.splitInnerBy (A.breakOn byte) A.spliceTwo $ D.toStreamD s
-- | Re-chunk a stream of 'Word8' arrays around occurrences of @byte@, using
-- the suffix-style splitter 'D.splitInnerBySuffix'.
--
-- As with 'splitOn', @A.breakOn byte@ breaks a single array and 'A.spliceTwo'
-- joins pieces of a segment that spans several input arrays.
--
-- NOTE(review): presumably @byte@ is treated as a terminator rather than a
-- separator, so no empty trailing segment is produced when the input ends
-- with @byte@ -- confirm against 'D.splitInnerBySuffix'.
{-# INLINE splitOnSuffix #-}
splitOnSuffix
    :: (IsStream t, MonadIO m)
    => Word8
    -> t m (Array Word8)
    -> t m (Array Word8)
splitOnSuffix byte s =
    D.fromStreamD
        $ D.splitInnerBySuffix (A.breakOn byte) A.spliceTwo
        $ D.toStreamD s
-- | Repack a serial stream of arrays using 'A.packArraysChunksOf' with the
-- size parameter @n@.
--
-- NOTE(review): presumably adjacent arrays are coalesced into larger arrays
-- of at most @n@ (bytes? elements?) -- confirm the unit and the exact bound
-- against 'A.packArraysChunksOf'.
{-# INLINE compact #-}
compact :: (MonadIO m, Storable a)
    => Int -> SerialT m (Array a) -> SerialT m (Array a)
compact n xs = D.fromStreamD $ A.packArraysChunksOf n (D.toStreamD xs)
-- | Group the elements of a stream into arrays via 'A.fromStreamDArraysOf',
-- passing @n@ as the chunk size.
--
-- NOTE(review): presumably every emitted array holds @n@ elements except
-- possibly the last -- confirm against 'A.fromStreamDArraysOf'.
{-# INLINE arraysOf #-}
arraysOf :: (IsStream t, MonadIO m, Storable a)
    => Int -> t m a -> t m (Array a)
arraysOf n str = D.fromStreamD $ A.fromStreamDArraysOf n (D.toStreamD str)
-- | Copy every array of a stream, back to back, into one freshly allocated
-- array created with @A.newArray len@.
--
-- This is /unsafe/: no bounds check is performed while copying. The caller
-- must guarantee that the array allocated for @len@ is large enough to hold
-- the contents of all arrays in the stream; otherwise the copy writes past
-- the end of the allocation.
{-# INLINE spliceArraysLenUnsafe #-}
spliceArraysLenUnsafe :: (MonadIO m, Storable a)
    => Int -> SerialT m (Array a) -> m (Array a)
spliceArraysLenUnsafe len buffered = do
    arr <- liftIO $ A.newArray len
    -- The fold accumulator is the write cursor into the destination; it
    -- starts at the 'aEnd' of the newly allocated array.
    end <- S.foldlM' writeArr (aEnd arr) buffered
    return $ arr {aEnd = end}

    where

    -- Copy one source array to @dst@ and return the advanced write cursor.
    writeArr dst Array{aStart = srcFp, aEnd = srcEnd} =
        liftIO $ withForeignPtr srcFp $ \src -> do
            -- Number of bytes between the source's start and end pointers.
            let count = srcEnd `minusPtr` src
            A.memcpy (castPtr dst) (castPtr src) count
            return $ dst `plusPtr` count
-- | Splice all arrays of a stream into a single array by first buffering the
-- whole stream, summing the lengths of the buffered arrays, and then copying
-- them into one allocation of exactly that length with
-- 'spliceArraysLenUnsafe'.
--
-- The buffered copy -- not the original stream @s@ -- is what gets spliced.
-- Consuming @s@ a second time would repeat its effects, and if the second
-- run produced more data than was measured, the unchecked copy in
-- 'spliceArraysLenUnsafe' would overrun the destination.
{-# INLINE _spliceArraysBuffered #-}
_spliceArraysBuffered :: (MonadIO m, Storable a)
    => SerialT m (Array a) -> m (Array a)
_spliceArraysBuffered s = do
    -- Run the source once, rebuilding it as a stream of the arrays it
    -- produced.
    buffered <- P.foldr S.cons S.nil s
    len <- S.sum (S.map length buffered)
    spliceArraysLenUnsafe len buffered
-- | Splice all arrays of a stream into a single array in one pass.
--
-- Starts from a destination array sized from @A.mkChunkSizeKB 4@ (converted
-- to an element count for type @a@), folds every incoming array into it with
-- 'A.spliceWithDoubling', and finally calls 'A.shrinkToFit' on the result.
--
-- NOTE(review): presumably 'A.spliceWithDoubling' grows the destination by
-- doubling its capacity when it runs out of room -- confirm.
{-# INLINE spliceArraysRealloced #-}
spliceArraysRealloced :: forall m a. (MonadIO m, Storable a)
    => SerialT m (Array a) -> m (Array a)
spliceArraysRealloced s = do
    -- @undefined :: a@ is passed where a value of the element type is
    -- expected; presumably only its type is inspected (TODO confirm).
    let initialElems = A.bytesToElemCount (undefined :: a) (A.mkChunkSizeKB 4)
    idst <- liftIO $ A.newArray initialElems
    arr <- S.foldlM' A.spliceWithDoubling idst s
    liftIO $ A.shrinkToFit arr
-- | Collapse a serial stream of arrays into a single array.
--
-- Currently implemented as 'spliceArraysRealloced'.
{-# INLINE toArray #-}
toArray :: (MonadIO m, Storable a) => SerialT m (Array a) -> m (Array a)
toArray = spliceArraysRealloced