streamly-0.8.2: Dataflow programming and declarative concurrency
Copyright: (c) 2020 Composewell Technologies
License: BSD3-3-Clause
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Internal.Data.Array.Foreign.Mut.Type

Description

Mutable arrays and file system files are quite similar: both can grow, and their contents are mutable. Therefore, both have similar APIs, and we strive to keep the two APIs consistent. Ideally, you should be able to replace one with the other with only small changes to the code.


Type

We could put a Storable constraint in the Array type so that the constraint is automatically available to any function that pattern matches on the Array type. However, this has a huge performance cost, so we do not do it. A GHC improvement that removes this cost is worth investigating.

data Array a

An unboxed, pinned mutable array. An array is created with a given length and capacity. Length is the number of valid elements in the array. Capacity is the maximum number of elements that the array can be expanded to without having to reallocate the memory.

The elements in the array can be mutated in place without changing the reference (constructor). However, the length of the array cannot be mutated in place: a new array reference is generated when the length changes. When the length is increased (up to the maximum reserved capacity of the array), the array is not reallocated and the new reference uses the same underlying memory as the old one.
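To make the length/capacity distinction concrete, here is a toy model in plain base Haskell. It is not streamly's implementation: ToyArray, newToy, snocMayToy, writeToy and toListToy are hypothetical names, and the model ignores alignment and reallocation. Appending within capacity returns a new reference with a larger length that shares the same buffer, so an in-place write through one reference is visible through the other.

```haskell
import Foreign.ForeignPtr (ForeignPtr, mallocForeignPtrArray, withForeignPtr)
import Foreign.Storable (Storable, peekElemOff, pokeElemOff)

-- Hypothetical toy model, not streamly's Array: a buffer plus a length
-- (number of valid elements) and a capacity (reserved elements).
data ToyArray a = ToyArray
  { toyBuf :: ForeignPtr a
  , toyLen :: Int
  , toyCap :: Int
  }

-- Allocate an empty array with room for 'cap' elements.
newToy :: Storable a => Int -> IO (ToyArray a)
newToy cap = do
  fp <- mallocForeignPtrArray cap
  pure (ToyArray fp 0 cap)

-- Append without reallocating. The result is a new reference with a
-- larger length that shares the old buffer; Nothing when full.
snocMayToy :: Storable a => ToyArray a -> a -> IO (Maybe (ToyArray a))
snocMayToy arr x
  | toyLen arr >= toyCap arr = pure Nothing
  | otherwise = do
      withForeignPtr (toyBuf arr) $ \p -> pokeElemOff p (toyLen arr) x
      pure (Just (arr { toyLen = toyLen arr + 1 }))

-- In-place write: mutates the shared buffer, no new reference.
writeToy :: Storable a => ToyArray a -> Int -> a -> IO ()
writeToy arr i x = withForeignPtr (toyBuf arr) $ \p -> pokeElemOff p i x

-- Read the valid elements (indices 0 .. length - 1).
toListToy :: Storable a => ToyArray a -> IO [a]
toListToy arr = withForeignPtr (toyBuf arr) $ \p ->
  mapM (peekElemOff p) [0 .. toyLen arr - 1]
```

If a1 is obtained by one append and a2 by a second append on a1, a write at index 0 through a2 is also seen through a1, because both references point to the same buffer; only their lengths differ.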

Several routines in this module allow the programmer to control the capacity of the array. The programmer can control the trade-off between memory usage and performance impact due to reallocations when growing or shrinking the array.

Constructors

Array 

Fields

Instances

NFData (Array a)

Defined in Streamly.Internal.Data.Array.Foreign.Mut.Type

Methods

rnf :: Array a -> ()

Constructing and Writing

Construction

Uninitialized Arrays

newArray :: forall m a. (MonadIO m, Storable a) => Int -> m (Array a)

Allocates an empty array that can hold count items. The memory of the array is uninitialized and the allocation is aligned as per the Storable instance of the type.

Pre-release

newArrayAligned :: (MonadIO m, Storable a) => Int -> Int -> m (Array a)

Like newArrayWith but using an allocator that aligns the memory to the given alignment (the first argument).

Internal

newArrayAlignedUnmanaged :: forall m a. (MonadIO m, Storable a) => Int -> Int -> m (Array a)

Like newArrayWith but using an allocator that allocates unmanaged pinned memory. The memory will never be freed by GHC. This could be useful in allocate-once global data structures. Use carefully, as incorrect use can lead to a memory leak.

Internal

newArrayWith :: forall m a. (MonadIO m, Storable a) => (Int -> Int -> m (ArrayContents, Ptr a)) -> Int -> Int -> m (Array a)

newArrayWith allocator alignment count allocates a new array of zero length and with a capacity to hold count elements, using allocator size alignment as the memory allocator function.

Alignment must be greater than or equal to machine word size and a power of 2.
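The alignment rule above can be checked, and an offset rounded up to an alignment, with simple bit arithmetic. This is a hedged sketch rather than streamly's allocator: validAlignment and alignUp are hypothetical helpers, and the machine word size is assumed to be the size of a Word.

```haskell
import Data.Bits (complement, popCount, (.&.))
import Foreign.Storable (sizeOf)

-- Machine word size in bytes, taken here as the size of a Word.
wordSize :: Int
wordSize = sizeOf (0 :: Word)

-- The rule stated above: at least one machine word, and a power of 2
-- (a positive power of 2 has exactly one bit set).
validAlignment :: Int -> Bool
validAlignment a = a >= wordSize && popCount a == 1

-- Round n up to the next multiple of a power-of-2 alignment by adding
-- (align - 1) and clearing the low bits.
alignUp :: Int -> Int -> Int
alignUp align n = (n + align - 1) .&. complement (align - 1)
```

For pointers rather than offsets, base's Foreign.Ptr.alignPtr performs the equivalent rounding.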

Pre-release

Initialized Arrays

withNewArrayUnsafe :: (MonadIO m, Storable a) => Int -> (Ptr a -> m ()) -> m (Array a)

Allocate an Array of the given size and run an IO action passing the array start pointer.

Internal

From streams

data ArrayUnsafe a

Constructors

ArrayUnsafe !ArrayContents !(Ptr a) !(Ptr a) 

writeNWithUnsafe :: forall m a. (MonadIO m, Storable a) => (Int -> m (Array a)) -> Int -> Fold m a (Array a)

Like writeNUnsafe but takes an array allocator function alloc as an argument; alloc size allocates a new array that can hold size elements.

>>> writeNWithUnsafe alloc n = Array.appendNUnsafe (alloc n) n

Pre-release

writeNWith :: forall m a. (MonadIO m, Storable a) => (Int -> m (Array a)) -> Int -> Fold m a (Array a)

writeNWith alloc n folds a maximum of n elements into an array allocated using the alloc function.

>>> writeNWith alloc n = Fold.take n (Array.writeNWithUnsafe alloc n)
>>> writeNWith alloc n = Array.appendN (alloc n) n

writeNUnsafe :: forall m a. (MonadIO m, Storable a) => Int -> Fold m a (Array a)

Like writeN but does not check the array bounds when writing. The fold driver must not call the step function more than n times; otherwise, it will corrupt memory and crash. This function exists mainly because any conditional in the step function blocks fusion, causing a 10x performance slowdown.

>>> writeNUnsafe = Array.writeNWithUnsafe Array.newArray

Since: 0.7.0

writeN :: forall m a. (MonadIO m, Storable a) => Int -> Fold m a (Array a)

writeN n folds a maximum of n elements from the input stream to an Array.

>>> writeN = Array.writeNWith Array.newArray
>>> writeN n = Fold.take n (Array.writeNUnsafe n)
>>> writeN n = Array.appendN (Array.newArray n) n

Since: 0.7.0

writeNAligned :: forall m a. (MonadIO m, Storable a) => Int -> Int -> Fold m a (Array a)

writeNAligned align n folds a maximum of n elements from the input stream to an Array whose starting address is aligned to align bytes.

>>> writeNAligned align = Array.writeNWith (Array.newArrayAligned align)
>>> writeNAligned align n = Array.appendN (Array.newArrayAligned align n) n

Pre-release

writeNAlignedUnmanaged :: forall m a. (MonadIO m, Storable a) => Int -> Int -> Fold m a (Array a)

writeNAlignedUnmanaged align n folds a maximum of n elements from the input stream to an Array whose starting address is aligned to align bytes and is allocated using unmanaged memory (never freed). This could be useful to allocate memory that we need to allocate only once in the lifetime of the program.

>>> f = Array.newArrayAlignedUnmanaged
>>> writeNAlignedUnmanaged a = Array.writeNWith (f a)
>>> writeNAlignedUnmanaged a n = Array.appendN (f a n) n

Pre-release

writeWith :: forall m a. (MonadIO m, Storable a) => Int -> Fold m a (Array a)

writeWith minCount folds the whole input to a single array. The array starts at a size big enough to hold minCount elements, and the size is doubled every time the array needs to be grown.

Caution! Do not use this on infinite streams.

>>> f n = Array.appendWith (* 2) (Array.newArray n)
>>> writeWith n = Fold.rmapM Array.rightSize (f n)
>>> writeWith n = Fold.rmapM Array.fromArrayStreamK (Array.writeChunks n)

Pre-release

write :: forall m a. (MonadIO m, Storable a) => Fold m a (Array a)

Fold the whole input to a single array.

Same as writeWith using an initial array size of arrayChunkBytes bytes rounded up to the element size.

Caution! Do not use this on infinite streams.

Since: 0.7.0

From containers

fromForeignPtrUnsafe :: ForeignPtr a -> Ptr a -> Ptr a -> Array a

fromForeignPtrUnsafe foreignPtr end bound creates an Array that starts at the memory pointed to by foreignPtr, where end is the first unused address and bound is the first address beyond the allocated memory.

Unsafe: Make sure that foreignPtr <= end <= bound and that (end - foreignPtr) is an integral multiple of the element size. Only ForeignPtrs of the PlainPtr type are supported.

Pre-release

fromListN :: (MonadIO m, Storable a) => Int -> [a] -> m (Array a)

Create an Array from the first N elements of a list. The array is allocated to size N; if the list terminates before N elements, the array holds fewer than N elements.

Since: 0.7.0

fromList :: (MonadIO m, Storable a) => [a] -> m (Array a)

Create an Array from a list. The list must be of finite size.

Since: 0.7.0

fromStreamDN :: forall m a. (MonadIO m, Storable a) => Int -> Stream m a -> m (Array a)

Use the writeN fold instead.

>>> fromStreamDN n = StreamD.fold (Array.writeN n)

fromStreamD :: (MonadIO m, Storable a) => Stream m a -> m (Array a)

We could take the approach of doubling the memory allocation on each overflow. This would result in more or less the same amount of copying as in the chunking approach. However, if we have to shrink in the end then it may result in an extra copy of the entire data.

>>> fromStreamD = StreamD.fold Array.write

Random writes

putIndex :: (MonadIO m, Storable a) => Int -> a -> Array a -> m ()

O(1) Write the given element at the given index in the array. Performs in-place mutation of the array.

>>> putIndex ix val arr = Array.modifyIndex ix (const (val, ())) arr
>>> f = Array.putIndices
>>> putIndex ix val arr = Stream.fold (f arr) (Stream.fromPure (ix, val))

Pre-release

putIndexUnsafe :: forall m a. (MonadIO m, Storable a) => Int -> a -> Array a -> m ()

Write the given element to the given index of the array. Does not check if the index is out of bounds of the array.

Pre-release

putIndices :: forall m a. (MonadIO m, Storable a) => Array a -> Fold m (Int, a) ()

Write an input stream of (index, value) pairs to an array. Throws an error if any index is out of bounds.

Pre-release

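modifyIndexUnsafe :: forall m a b. (MonadIO m, Storable a) => Int -> (a -> (a, b)) -> Array a -> m b

Modify a given index of an array using a modifier function, without checking whether the index is within the bounds of the array. The modifier returns the new element along with a result of type b, which is returned by the action.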

Pre-release

modifyIndex :: forall m a b. (MonadIO m, Storable a) => Int -> (a -> (a, b)) -> Array a -> m b

Modify a given index of an array using a modifier function. The modifier returns the new element along with a result of type b, which is returned by the action.

Pre-release
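The modifier passed to modifyIndex returns both the new element and a result that the action returns. The sketch below models this over a raw Foreign buffer; Buf, newBuf, modifyIndexSketch, putIndexSketch, putIndicesSketch and toListBuf are hypothetical names, not streamly APIs. It also expresses putIndex and putIndices in terms of the modify operation, as the equations above do.

```haskell
import Foreign.ForeignPtr (ForeignPtr, mallocForeignPtrArray, withForeignPtr)
import Foreign.Storable (Storable, peekElemOff, pokeElemOff)

-- Hypothetical stand-in for an array: a buffer and its length.
data Buf a = Buf (ForeignPtr a) Int

-- Allocate n elements, all initialised to x.
newBuf :: Storable a => Int -> a -> IO (Buf a)
newBuf n x = do
  fp <- mallocForeignPtrArray n
  withForeignPtr fp $ \p -> mapM_ (\i -> pokeElemOff p i x) [0 .. n - 1]
  pure (Buf fp n)

-- Bounds-checked modify: the modifier returns the new element and an
-- extra result of type b, which the action returns.
modifyIndexSketch :: Storable a => Int -> (a -> (a, b)) -> Buf a -> IO b
modifyIndexSketch i f (Buf fp n)
  | i < 0 || i >= n = ioError (userError ("index out of bounds: " ++ show i))
  | otherwise = withForeignPtr fp $ \p -> do
      old <- peekElemOff p i
      let (new, r) = f old
      pokeElemOff p i new
      pure r

-- putIndex in terms of modify, using putIndex's argument order.
putIndexSketch :: Storable a => Int -> a -> Buf a -> IO ()
putIndexSketch i x = modifyIndexSketch i (const (x, ()))

-- putIndices as a loop over (index, value) pairs.
putIndicesSketch :: Storable a => Buf a -> [(Int, a)] -> IO ()
putIndicesSketch buf = mapM_ (\(i, x) -> putIndexSketch i x buf)

-- Read all elements back as a list.
toListBuf :: Storable a => Buf a -> IO [a]
toListBuf (Buf fp n) = withForeignPtr fp $ \p -> mapM (peekElemOff p) [0 .. n - 1]
```

Returning the old value, as in modifyIndexSketch 1 (\v -> (v + 5, v)), turns the modifier into a read-modify-write that reports what was replaced.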

modifyIndices :: forall m a. (MonadIO m, Storable a) => (a -> a) -> Array a -> Fold m Int ()

Modify the elements at the array indices supplied by the input stream, using the given modifier function.

Pre-release

modify :: forall m a. (MonadIO m, Storable a) => (a -> a) -> Array a -> m ()

Modify each element of an array using the supplied modifier function.

Pre-release

swapIndices :: forall m a. (MonadIO m, Storable a) => Int -> Int -> Array a -> m ()

Swap the elements at two indices.

Pre-release

unsafeSwapIndices :: forall m a. (MonadIO m, Storable a) => Int -> Int -> Array a -> m ()

Swap the elements at two indices without validating the indices.

Unsafe: This could result in memory corruption if indices are not valid.

Pre-release

Growing and Shrinking

Appending elements

snocWith :: forall m a. (MonadIO m, Storable a) => (Int -> Int) -> Array a -> a -> m (Array a)

snocWith sizer arr elem mutates arr to append elem. The length of the array increases by 1.

If there is no reserved space available in arr it is reallocated to a size in bytes determined by the sizer oldSizeBytes function, where oldSizeBytes is the original size of the array in bytes.

If the new array size is more than largeObjectThreshold we automatically round it up to blockSize.

Note that the returned array may be a mutated version of the original array.

Pre-release
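A simplified model of snocWith: append in place while there is spare capacity, otherwise allocate a larger buffer using the sizer and copy the old contents. Assumptions: Grow, snocWithSketch and appendWithSketch are hypothetical; sizes are counted in elements rather than bytes; and the large-object rounding to blockSize is omitted.

```haskell
import Control.Monad (foldM)
import Foreign.ForeignPtr (ForeignPtr, mallocForeignPtrArray, withForeignPtr)
import Foreign.Marshal.Array (copyArray)
import Foreign.Storable (Storable, peekElemOff, pokeElemOff)

-- Hypothetical growable array: buffer, length and capacity, all counted
-- in elements (streamly's sizer works on bytes).
data Grow a = Grow (ForeignPtr a) Int Int

newGrow :: Storable a => Int -> IO (Grow a)
newGrow cap = do
  let cap' = max 1 cap
  fp <- mallocForeignPtrArray cap'
  pure (Grow fp 0 cap')

-- Append one element: write in place if there is spare capacity,
-- otherwise reallocate to 'sizer oldCap' (at least oldCap + 1), copy
-- the existing elements, and retry. Block-size rounding is omitted.
snocWithSketch :: Storable a => (Int -> Int) -> Grow a -> a -> IO (Grow a)
snocWithSketch sizer (Grow fp len cap) x
  | len < cap = do
      withForeignPtr fp $ \p -> pokeElemOff p len x
      pure (Grow fp (len + 1) cap)
  | otherwise = do
      let cap' = max (cap + 1) (sizer cap)
      fp' <- mallocForeignPtrArray cap'
      withForeignPtr fp $ \src ->
        withForeignPtr fp' $ \dst -> copyArray dst src len
      snocWithSketch sizer (Grow fp' len cap') x

-- appendWith-style fold: snoc every element of a list in turn.
appendWithSketch :: Storable a => (Int -> Int) -> Grow a -> [a] -> IO (Grow a)
appendWithSketch sizer = foldM (snocWithSketch sizer)

capacityOf :: Grow a -> Int
capacityOf (Grow _ _ cap) = cap

toListGrow :: Storable a => Grow a -> IO [a]
toListGrow (Grow fp len _) =
  withForeignPtr fp $ \p -> mapM (peekElemOff p) [0 .. len - 1]
```

Starting from a capacity of 2 and appending five elements, the doubling sizer (* 2) ends with a capacity of 8 and a linear sizer (+ 2) ends with a capacity of 6, both after two reallocations. As more elements are appended, the linear sizer needs increasingly more reallocations, which is the memory-versus-copying trade-off described for snoc and snocLinear.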

snoc :: forall m a. (MonadIO m, Storable a) => Array a -> a -> m (Array a)

The array is mutated to append an additional element to it. If there is no reserved space available in the array then it is reallocated to double the original size.

This is useful to reduce allocations when appending an unknown number of elements.

Note that the returned array may be a mutated version of the original array.

>>> snoc = Array.snocWith (* 2)

Performs O(n * log n) copies to grow, but is liberal with memory allocation.

Pre-release

snocLinear :: forall m a. (MonadIO m, Storable a) => Array a -> a -> m (Array a)

The array is mutated to append an additional element to it. If there is no reserved space available in the array then it is reallocated to grow it by arrayChunkBytes rounded up to blockSize when the size becomes more than largeObjectThreshold.

Note that the returned array may be a mutated version of the original array.

Performs O(n^2) copies to grow but is thrifty on memory.

Pre-release

snocMay :: forall m a. (MonadIO m, Storable a) => Array a -> a -> m (Maybe (Array a))

Like snoc but does not reallocate when the pre-allocated array capacity becomes full; instead, it returns Nothing.

Internal

snocUnsafe :: forall m a. (MonadIO m, Storable a) => Array a -> a -> m (Array a)

Really unsafe: appends the element to the array without checking for space. If the array does not have enough space to append the element, this may cause silent data corruption or, if you are lucky, a segfault.

Internal

Appending streams

appendNUnsafe :: forall m a. (MonadIO m, Storable a) => m (Array a) -> Int -> Fold m a (Array a)

Append up to n input items to the supplied array.

Unsafe: Do not drive the fold beyond n elements; doing so leads to memory corruption or a segfault.

Any free space left in the array after appending n elements is lost.

Internal

appendN :: forall m a. (MonadIO m, Storable a) => m (Array a) -> Int -> Fold m a (Array a)

Append n elements to an existing array. Any free space left in the array after appending n elements is lost.

>>> appendN initial n = Fold.take n (Array.appendNUnsafe initial n)

Pre-release

appendWith :: forall m a. (MonadIO m, Storable a) => (Int -> Int) -> m (Array a) -> Fold m a (Array a)

appendWith realloc action mutates the array generated by action to append the input stream. If there is no reserved space available in the array it is reallocated to a size in bytes determined by realloc oldSize, where oldSize is the current size of the array in bytes.

Note that the returned array may be a mutated version of the original array.

>>> appendWith sizer = Fold.foldlM' (Array.snocWith sizer)

Pre-release

append :: forall m a. (MonadIO m, Storable a) => m (Array a) -> Fold m a (Array a)

append action mutates the array generated by action to append the input stream. If there is no reserved space available in the array it is reallocated to double the size.

Note that the returned array may be a mutated version of the original array.

>>> append = Array.appendWith (* 2)

Pre-release

Eliminating and Reading

To streams

data ReadUState a

Constructors

ReadUState !ArrayContents !(Ptr a) !(Ptr a) 

read :: forall m a. (MonadIO m, Storable a) => Unfold m (Array a) a

Unfold an array into a stream.

Since: 0.7.0

readRev :: forall m a. (MonadIO m, Storable a) => Unfold m (Array a) a

Unfold an array into a stream in reverse order.

Pre-release

To containers

toStreamD :: forall m a. (MonadIO m, Storable a) => Array a -> Stream m a

Use the read unfold instead.

toStreamD = D.unfold read

We can try this if the unfold has any performance issues.

toStreamDRev :: forall m a. (MonadIO m, Storable a) => Array a -> Stream m a

Use the readRev unfold instead.

toStreamDRev = D.unfold readRev

We can try this if the unfold has any performance issues.

toStreamK :: forall m a. (MonadIO m, Storable a) => Array a -> Stream m a

toStreamKRev :: forall m a. (MonadIO m, Storable a) => Array a -> Stream m a

toList :: forall m a. (MonadIO m, Storable a) => Array a -> m [a]

Convert an Array into a list.

Since: 0.7.0

producer :: forall m a. (MonadIO m, Storable a) => Producer m (Array a) a

Resumable unfold of an array.

Random reads

getIndex :: (MonadIO m, Storable a) => Int -> Array a -> m a

O(1) Lookup the element at the given index. Index starts from 0.

getIndexUnsafe :: forall m a. (MonadIO m, Storable a) => Int -> Array a -> m a

Return the element at the specified index without checking the bounds.

Unsafe because it does not check the bounds of the array.

getIndicesD :: (Monad m, Storable a) => (forall b. IO b -> m b) -> Stream m Int -> Unfold m (Array a) a

Given a stream that generates array indices, read the elements at those indices from the supplied Array. An error is thrown if an index is out of bounds.

Pre-release

getIndexRev :: (MonadIO m, Storable a) => Int -> Array a -> m a

O(1) Lookup the element at the given index from the end of the array. Index starts from 0.

Slightly faster than computing the forward index and using getIndex.
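getIndexRev i reads the element at position length - 1 - i. A hedged sketch over a raw buffer (getIndexRevSketch is a hypothetical name) shows the index arithmetic and the bounds check; it does not reproduce streamly's pointer-based implementation.

```haskell
import Foreign.Marshal.Array (withArrayLen)
import Foreign.Ptr (Ptr)
import Foreign.Storable (Storable, peekElemOff)

-- Reverse lookup over a buffer of 'len' elements: reverse index 0 is
-- the last element, so position len - 1 - i is read.
getIndexRevSketch :: Storable a => Int -> Ptr a -> Int -> IO a
getIndexRevSketch len p i
  | i < 0 || i >= len =
      ioError (userError ("getIndexRevSketch: index out of bounds: " ++ show i))
  | otherwise = peekElemOff p (len - 1 - i)
```

For example, withArrayLen [10, 20, 30 :: Int] (\len p -> mapM (getIndexRevSketch len p) [0, 1, 2]) yields [30, 20, 10].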

Memory Management

blockSize :: Int

The page or block size used by the GHC allocator. The allocator allocates at least one block and then serves smaller allocations from within a block.

arrayChunkBytes :: Int

The default chunk size by which the array creation routines increase the size of the array when the array is grown linearly.

allocBytesToElemCount :: Storable a => a -> Int -> Int

Given a Storable type (unused first arg) and real allocation size (including overhead), return how many elements of that type will completely fit in it, returns at least 1.

realloc :: forall m a. (MonadIO m, Storable a) => Int -> Array a -> m (Array a)

realloc newCapacity array reallocates the array to the specified capacity in bytes.

If the new size is less than the original array size, the array gets truncated. If the new size is not a multiple of the array element size, it is rounded down to a multiple of the element size. If the new size is more than largeObjectThreshold, it is rounded up to a multiple of the block size (4K).
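The size adjustments described for realloc can be written as a small pure function. Assumptions: blockSizeA is the 4K block size mentioned above, and largeObjectThresholdA is taken to be 8/10 of a block (3276 bytes), mirroring GHC's RTS large-object rule; streamly's actual constant may differ. roundCapacity is a hypothetical helper, and it does not re-round to an element multiple after the block-size step, so the result stays element-aligned only when the element size divides 4096.

```haskell
-- Assumed constants: the 4K block size mentioned above and a large
-- object threshold of 8/10 of a block (GHC's RTS rule); streamly's
-- actual values may differ.
blockSizeA :: Int
blockSizeA = 4096

largeObjectThresholdA :: Int
largeObjectThresholdA = (blockSizeA * 8) `div` 10

-- Adjust a requested capacity in bytes as realloc's description says:
-- round down to whole elements, then round large sizes up to a
-- multiple of the block size.
roundCapacity :: Int -> Int -> Int
roundCapacity elemSize requested
  | bytes > largeObjectThresholdA = roundUpTo blockSizeA bytes
  | otherwise = bytes
  where
    bytes = (requested `div` elemSize) * elemSize
    roundUpTo b n = ((n + b - 1) `div` b) * b
```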

resize :: forall m a. (MonadIO m, Storable a) => Int -> Array a -> m (Array a)

resize newCapacity array changes the total capacity of the array so that it is enough to hold the specified number of elements. Nothing is done if the specified capacity is less than the length of the array.

If the capacity is more than largeObjectThreshold then it is rounded up to the block size (4K).

Pre-release

resizeExp :: forall m a. (MonadIO m, Storable a) => Int -> Array a -> m (Array a)

Like resize but if the capacity is more than largeObjectThreshold then it is rounded up to the closest power of 2.

Pre-release

rightSize :: forall m a. (MonadIO m, Storable a) => Array a -> m (Array a)

Resize the allocated memory to drop any reserved free space at the end of the array and reallocate it to reduce wastage.

Up to 25% wastage is allowed to avoid reallocations. If the capacity is more than largeObjectThreshold then free space up to the blockSize is retained.

Pre-release

Size

length :: forall a. Storable a => Array a -> Int

O(1) Get the length of the array i.e. the number of elements in the array.

Note that byteLength is less expensive than this operation, as length involves a costly division operation.

Since: 0.7.0

byteLength :: Array a -> Int

O(1) Get the byte length of the array.

Since: 0.7.0

byteCapacity :: Array a -> Int

Get the total capacity of an array in bytes. An array may have space reserved beyond the current used length of the array.

Pre-release

bytesFree :: Array a -> Int

The remaining capacity in the array, in bytes, for appending more elements without reallocation.

Pre-release

In-place Mutation Algorithms

reverse :: forall m a. (MonadIO m, Storable a) => Array a -> m ()

You may not need to reverse an array because you can consume it in reverse using readRev. To reverse large arrays, you can read in reverse and write to another array. However, in-place reverse can be useful to take advantage of cache locality and when you do not want to allocate additional memory.

Pre-release
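In-place reversal walks two indices toward each other and swaps the elements they point at, so no second buffer is needed. The sketch works on a raw Foreign buffer; swapSketch and reverseSketch are hypothetical names standing in for unsafeSwapIndices and reverse, not their actual implementations.

```haskell
import Foreign.Marshal.Array (peekArray, withArrayLen)
import Foreign.Ptr (Ptr)
import Foreign.Storable (Storable, peekElemOff, pokeElemOff)

-- Swap two elements of a buffer without bounds checks, like an
-- unchecked swapIndices.
swapSketch :: Storable a => Ptr a -> Int -> Int -> IO ()
swapSketch p i j = do
  x <- peekElemOff p i
  y <- peekElemOff p j
  pokeElemOff p i y
  pokeElemOff p j x

-- In-place reverse of 'len' elements: indices move toward each other
-- and stop when they meet, so no extra buffer is allocated.
reverseSketch :: Storable a => Ptr a -> Int -> IO ()
reverseSketch p len = go 0 (len - 1)
  where
    go i j
      | i >= j = pure ()
      | otherwise = swapSketch p i j >> go (i + 1) (j - 1)
```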

permute :: Array a -> m Bool

Generate the next permutation of the sequence, returns False if this is the last permutation.

Unimplemented

partitionBy :: forall m a. (MonadIO m, Storable a) => (a -> Bool) -> Array a -> m (Array a, Array a)

Partition an array into two parts using a partitioning predicate. The first part retains values where the predicate is False and the second part retains values where the predicate is True.

Pre-release
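One way to partition in place is a two-index scan, sketched below over a raw buffer. This is an assumed approach (partitionSketch is hypothetical), not necessarily what partitionBy does: it returns the split index, where partitionBy returns the two parts as arrays, and it does not preserve the relative order of elements.

```haskell
import Foreign.Marshal.Array (peekArray, withArrayLen)
import Foreign.Ptr (Ptr)
import Foreign.Storable (Storable, peekElemOff, pokeElemOff)

-- Two-index in-place partition of a buffer of 'len' elements.
-- Invariant: [0, i) holds False elements, (j, len - 1] holds True ones.
-- Returns i, the number of False elements (the split point).
partitionSketch :: Storable a => (a -> Bool) -> Ptr a -> Int -> IO Int
partitionSketch f p len = go 0 (len - 1)
  where
    go i j
      | i > j = pure i
      | otherwise = do
          x <- peekElemOff p i
          if not (f x)
            then go (i + 1) j
            else do
              -- Move the True element to the back and re-examine slot i.
              y <- peekElemOff p j
              pokeElemOff p i y
              pokeElemOff p j x
              go i (j - 1)
```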

shuffleBy :: (a -> a -> m Bool) -> Array a -> Array a -> m ()

Shuffle corresponding elements from two arrays using a shuffle function. If the shuffle function returns False, do nothing; otherwise, swap the elements. This can be used in a bottom-up fold to shuffle or reorder the elements.

Unimplemented

divideBy :: Int -> (Array a -> m (Array a, Array a)) -> Array a -> m ()

divideBy level partition array performs a top down hierarchical recursive partitioning fold of items in the container using the given function as the partition function. Level indicates the level in the tree where the fold would stop.

This performs a quick sort if the partition function is 'partitionBy (< pivot)'.

Unimplemented

mergeBy :: Int -> (Array a -> Array a -> m ()) -> Array a -> m ()

mergeBy level merge array performs a pairwise bottom up fold recursively merging the pairs using the supplied merge function. Level indicates the level in the tree where the fold would stop.

This performs a random shuffle if the merge function is random. If we stop at level 0 and repeatedly apply the function then we can do a bubble sort.

Unimplemented
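The bottom-up pairwise fold described for mergeBy can be sketched on lists: start with singleton runs and merge adjacent pairs until one run remains. With an ordered merge, this yields a bottom-up merge sort. Since mergeBy is unimplemented, this is purely illustrative: mergePairs, bottomUp and mergeOrd are hypothetical, the level cutoff is omitted, and lists stand in for in-place array slices.

```haskell
-- Merge adjacent pairs of runs; an odd run out is carried over as is.
mergePairs :: ([a] -> [a] -> [a]) -> [[a]] -> [[a]]
mergePairs merge (x : y : rest) = merge x y : mergePairs merge rest
mergePairs _ runs = runs

-- Bottom-up pairwise fold: start from singleton runs and merge pairs
-- level by level until a single run remains.
bottomUp :: ([a] -> [a] -> [a]) -> [a] -> [a]
bottomUp merge = go . map (: [])
  where
    go [] = []
    go [r] = r
    go runs = go (mergePairs merge runs)

-- An ordering merge; with it, bottomUp is a bottom-up merge sort.
mergeOrd :: Ord a => [a] -> [a] -> [a]
mergeOrd xs [] = xs
mergeOrd [] ys = ys
mergeOrd (x : xs) (y : ys)
  | x <= y = x : mergeOrd xs (y : ys)
  | otherwise = y : mergeOrd (x : xs) ys
```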

Casting

cast :: forall a b. Storable b => Array a -> Maybe (Array b)

Cast an array having elements of type a into an array having elements of type b. The byte length of the array should be a multiple of the size of the target element; otherwise, Nothing is returned.

Pre-release
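Whether cast succeeds depends only on byte arithmetic: the byte length must be a whole number of target elements. The same division is why length costs more than byteLength. The helpers below, lengthFromBytes and castLength, are hypothetical and take a dummy value only for its Storable size.

```haskell
import Data.Word (Word32, Word8)
import Foreign.Storable (Storable, sizeOf)

-- Element count from a byte length; the dummy value is used only for
-- its size. This division is why length costs more than byteLength.
lengthFromBytes :: Storable b => b -> Int -> Int
lengthFromBytes dummy bytes = bytes `div` sizeOf dummy

-- Length after a cast to element type b: Just when the byte length is
-- a whole number of b elements, Nothing otherwise (mirroring cast).
castLength :: Storable b => b -> Int -> Maybe Int
castLength dummy bytes
  | bytes `mod` size == 0 = Just (bytes `div` size)
  | otherwise = Nothing
  where
    size = sizeOf dummy
```

A 12-byte array can be viewed as three Word32 elements, while a 10-byte array cannot, although it can always be viewed as Word8, which is why asBytes needs no Maybe.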

castUnsafe :: Array a -> Array b

Cast an array having elements of type a into an array having elements of type b. The array size must be a multiple of the size of type b; otherwise, accessing the last element of the array may result in a crash or a random value.

Pre-release

asBytes :: Array a -> Array Word8

Cast an Array a into an Array Word8.

Pre-release

asPtrUnsafe :: MonadIO m => Array a -> (Ptr a -> m b) -> m b

Use an Array a as Ptr a.

Unsafe

Pre-release

Folding

foldl' :: (MonadIO m, Storable a) => (b -> a -> b) -> b -> Array a -> m b

Strict left fold of an array.

foldr :: (MonadIO m, Storable a) => (a -> b -> b) -> b -> Array a -> m b

Right fold of an array.

cmp :: MonadIO m => Array a -> Array a -> m Bool

Check whether two arrays are equal.

Pre-release

Arrays of arrays

Operations dealing with multiple arrays, streams of arrays or multidimensional array representations.

Construct from streams

arraysOf :: forall m a. (MonadIO m, Storable a) => Int -> Stream m a -> Stream m (Array a)

arraysOf n stream groups the input stream into a stream of arrays of size n.

arraysOf n = StreamD.foldMany (Array.writeN n)

Pre-release
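On lists, the grouping performed by arraysOf corresponds to splitting the input into chunks of n elements, where the last chunk may be shorter because writeN folds at most n elements. chunksOfSketch is a hypothetical list analogue, not the streamly function.

```haskell
-- List analogue of arraysOf: consecutive chunks of n elements; the
-- final chunk holds whatever remains.
chunksOfSketch :: Int -> [a] -> [[a]]
chunksOfSketch n xs
  | n <= 0 = error "chunksOfSketch: chunk size must be positive"
  | null xs = []
  | otherwise = let (h, t) = splitAt n xs in h : chunksOfSketch n t
```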

arrayStreamKFromStreamD :: forall m a. (MonadIO m, Storable a) => Stream m a -> m (Stream m (Array a))

Buffer the stream into arrays in memory.

writeChunks :: (MonadIO m, Storable a) => Int -> Fold m a (Stream n (Array a))

Buffer a stream into a stream of arrays.

>>> writeChunks n = Fold.many (Array.writeN n) Fold.toStreamK

Breaking an array into an array stream can be useful to consume a large array sequentially such that the memory of the array is released incrementally.

See also: arrayStreamKFromStreamD.

Unimplemented

Eliminate to streams

flattenArrays :: forall m a. (MonadIO m, Storable a) => Stream m (Array a) -> Stream m a

Use the "read" unfold instead.

flattenArrays = unfoldMany read

We can try this if there are any fusion issues in the unfold.

flattenArraysRev :: forall m a. (MonadIO m, Storable a) => Stream m (Array a) -> Stream m a

Use the "readRev" unfold instead.

flattenArraysRev = unfoldMany readRev

We can try this if there are any fusion issues in the unfold.

fromArrayStreamK :: (Storable a, MonadIO m) => Stream m (Array a) -> m (Array a)

Convert an array stream to an array. Note that this requires peak memory that is double the size of the array stream.

Construct from arrays

getSliceUnsafe :: forall a. Storable a => Int -> Int -> Array a -> Array a

Arguments: the first Int is the from index and the second Int is the length of the slice.

O(1) Slice an array in constant time.

Unsafe: The bounds of the slice are not checked.

Unsafe

Pre-release

getSlice :: forall a. Storable a => Int -> Int -> Array a -> Array a

Arguments: the first Int is the from index and the second Int is the length of the slice.

O(1) Slice an array in constant time. Throws an error if the slice extends out of the array bounds.

Pre-release

splitAt :: forall a. Storable a => Int -> Array a -> (Array a, Array a)

Create two slices of an array without copying the original array. The specified index i is the first index of the second slice.

Since: 0.7.0
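Slicing in O(1) works because a slice is only a new view (start and length) over the same memory. A hedged model with a hypothetical Slice record: getSliceSketch performs the bounds check that getSlice describes, and splitAtSketch builds the two slices so that index i is the first index of the second slice. Whether streamly's splitAt checks or clamps out-of-range indices is not stated here; this sketch simply reuses the bounds check.

```haskell
-- Hypothetical slice model: an offset and a length into a shared
-- buffer. Slicing builds a new view and copies nothing.
data Slice = Slice { sliceOff :: Int, sliceLen :: Int }
  deriving (Eq, Show)

-- Bounds-checked slice relative to an existing slice, like getSlice.
getSliceSketch :: Int -> Int -> Slice -> Slice
getSliceSketch from len (Slice off total)
  | from < 0 || len < 0 || from + len > total =
      error "getSliceSketch: slice out of bounds"
  | otherwise = Slice (off + from) len

-- Index i becomes the first index of the second slice.
splitAtSketch :: Int -> Slice -> (Slice, Slice)
splitAtSketch i s = (getSliceSketch 0 i s, getSliceSketch i (sliceLen s - i) s)
```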

breakOn :: MonadIO m => Word8 -> Array Word8 -> m (Array Word8, Maybe (Array Word8))

Break the array at the given separator byte. The separator byte is dropped.
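A list analogue of breakOn, under the assumption that the array is split at the first occurrence of the separator and that Nothing means no separator was found; the description above only states that the separator is dropped. breakOnSketch is hypothetical.

```haskell
import Data.Word (Word8)

-- Split at the first separator and drop it. Nothing means the
-- separator did not occur (an assumption about breakOn's result).
breakOnSketch :: Word8 -> [Word8] -> ([Word8], Maybe [Word8])
breakOnSketch sep xs = case break (== sep) xs of
  (before, []) -> (before, Nothing)
  (before, _ : rest) -> (before, Just rest)
```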

Appending arrays

spliceCopy :: (MonadIO m, Storable a) => Array a -> Array a -> m (Array a)

Copy two arrays into a newly allocated array.

spliceWith :: forall m a. (MonadIO m, Storable a) => (Int -> Int -> Int) -> Array a -> Array a -> m (Array a)

spliceWith sizer dst src mutates dst to append src. If there is no reserved space available in dst, it is reallocated to a size determined by the sizer dstBytes srcBytes function, where dstBytes is the size of the first array and srcBytes is the size of the second array, in bytes.

Note that the returned array may be a mutated version of first array.

Pre-release

splice :: (MonadIO m, Storable a) => Array a -> Array a -> m (Array a)

The first array is mutated to append the second array. If there is no reserved space available in the first array a new allocation of exact required size is done.

Note that the returned array may be a mutated version of first array.

>>> splice = Array.spliceWith (+)

Pre-release

spliceExp :: (MonadIO m, Storable a) => Array a -> Array a -> m (Array a)

Like splice but the growth of the array is exponential. Whenever a new allocation is required, the previous array size is at least doubled.

This is useful to reduce allocations when folding many arrays together.

Note that the returned array may be a mutated version of first array.

>>> spliceExp = Array.spliceWith (\l1 l2 -> max (l1 * 2) (l1 + l2))

Pre-release
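The benefit of spliceExp's sizer can be seen by counting reallocations when many arrays are spliced onto one destination. This pure simulation is a sketch: spliceStep, reallocCount and the state triple are hypothetical, dstBytes is taken to be the used length of the destination, and the destination starts empty with no reserved space.

```haskell
import Data.List (foldl')

-- (used bytes, capacity bytes, number of reallocations)
type SpliceState = (Int, Int, Int)

-- Append one array of srcBytes; reallocate to 'sizer dstBytes srcBytes'
-- bytes only when the free space is insufficient.
spliceStep :: (Int -> Int -> Int) -> SpliceState -> Int -> SpliceState
spliceStep sizer (used, cap, n) srcBytes
  | used + srcBytes <= cap = (used + srcBytes, cap, n)
  | otherwise = (used + srcBytes, sizer used srcBytes, n + 1)

-- The sizers from the splice and spliceExp equations above.
exactSizer, expSizer :: Int -> Int -> Int
exactSizer = (+)
expSizer l1 l2 = max (l1 * 2) (l1 + l2)

-- Reallocations needed to splice k arrays of s bytes each onto an
-- initially empty destination.
reallocCount :: (Int -> Int -> Int) -> Int -> Int -> Int
reallocCount sizer k s = n
  where
    (_, _, n) = foldl' (spliceStep sizer) (0, 0, 0) (replicate k s)
```

Splicing 100 arrays of 10 bytes each reallocates 100 times with the exact sizer but only 8 times with the exponential one, at the cost of up to roughly half the final allocation being spare.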

Utilities

memcpy :: Ptr Word8 -> Ptr Word8 -> Int -> IO ()