{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
module Data.Avro.Internal.Container
where
import Control.Monad (when)
import qualified Data.Aeson as Aeson
import Data.Avro.Codec (Codec (..), Decompress)
import qualified Data.Avro.Codec as Codec
import Data.Avro.Encoding.ToAvro (toAvro)
import Data.Avro.Internal.EncodeRaw (encodeRaw)
import Data.Avro.Schema.Schema (Schema)
import qualified Data.Avro.Schema.Schema as Schema
import Data.Binary.Get (Get)
import qualified Data.Binary.Get as Get
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.ByteString.Builder (Builder, lazyByteString, toLazyByteString)
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BLC
import Data.Either (isRight)
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import Data.Int (Int32, Int64)
import Data.List (foldl', unfoldr)
import qualified Data.Map.Strict as Map
import Data.Text (Text)
import System.Random.TF.Init (initTFGen)
import System.Random.TF.Instances (randoms)
import qualified Data.Avro.Internal.Get as AGet
-- | Information read from the beginning of an Avro object container.
data ContainerHeader = ContainerHeader
  { syncBytes       :: BL.ByteString
    -- ^ Synchronisation marker read right after the container metadata;
    -- every block's trailer is compared against it.
  , decompress      :: forall a. Decompress a
    -- ^ Decompression routine taken from the codec named in the metadata.
  , containedSchema :: Schema
    -- ^ Schema decoded from the @avro.schema@ metadata entry.
  }
-- | Length, in bytes, of a container synchronisation marker.
--
-- Polymorphic so it can be used directly wherever an 'Int' or an
-- 'Data.Int.Int64' byte count is expected.
nrSyncBytes :: Integral sb => sb
nrSyncBytes = 16
{-# INLINE nrSyncBytes #-}
-- | Generate a fresh synchronisation marker of 'nrSyncBytes' random bytes.
--
-- The bytes are drawn from a newly initialised TF generator.
newSyncBytes :: IO BL.ByteString
newSyncBytes = do
  gen <- initTFGen
  pure (BL.pack (take nrSyncBytes (randoms gen)))
-- | Parse the header of an Avro object container.
--
-- Reads, in order: the 4-byte magic number, the metadata map and the
-- synchronisation marker. The codec is then resolved from the @avro.codec@
-- metadata entry and the schema is decoded from the @avro.schema@ entry.
--
-- Fails (in 'Get') when the magic number is wrong, when there is no
-- @avro.schema@ entry, or when the schema JSON cannot be decoded.
getContainerHeader :: Get ContainerHeader
getContainerHeader = do
  magic <- Get.getByteString magicLength
  when (BL.fromStrict magic /= expectedMagic) $
    fail "Invalid magic number at start of container."
  metadata <- getMeta
  marker   <- BL.fromStrict <$> Get.getByteString nrSyncBytes
  codec    <- parseCodec (Map.lookup "avro.codec" metadata)
  schema   <- maybe (fail "Invalid container object: no schema.")
                    decodeSchema
                    (Map.lookup "avro.schema" metadata)
  return ContainerHeader
    { syncBytes       = marker
    , decompress      = Codec.codecDecompress codec
    , containedSchema = schema
    }
  where
    -- Number of bytes occupied by the magic number.
    magicLength :: Int
    magicLength = 4

    -- The ASCII characters "Obj" followed by the byte 1.
    expectedMagic :: BL.ByteString
    expectedMagic = BLC.pack "Obj" <> BL.pack [1]

    -- Decode the JSON-encoded schema, turning a decoding error into a
    -- parser failure.
    decodeSchema :: BL.ByteString -> Get Schema
    decodeSchema raw =
      either (\e -> fail ("Can not decode container schema: " <> e))
             return
             (Aeson.eitherDecode' raw)
-- | Read the container metadata: a block-encoded sequence of
-- (string key, bytes value) pairs collected into a 'Map.Map'.
getMeta :: Get (Map.Map Text BL.ByteString)
getMeta = Map.fromList <$> AGet.decodeBlocks entry
  where
    -- A single metadata entry: the key followed by its raw value.
    entry :: Get (Text, BL.ByteString)
    entry = do
      key   <- AGet.getString
      value <- AGet.getBytesLazy
      pure (key, value)
-- | Read a container as a list of raw blocks, without decoding the values
-- inside them.
--
-- Returns 'Left' when the container header cannot be parsed. Otherwise
-- returns the contained schema together with one entry per block: either
-- the block's object count and its decompressed payload, or an error
-- message. An error entry is always the last element of the list.
decodeRawBlocks :: BL.ByteString -> Either String (Schema, [Either String (Int, BL.ByteString)])
decodeRawBlocks input =
  case Get.runGetOrFail getContainerHeader input of
    Left (_, _, err) -> Left err
    Right (remaining, _, header@ContainerHeader{containedSchema = schema}) ->
      Right (schema, rawBlocks header remaining)
  where
    -- Drive the incremental block decoder over the bytes that follow the
    -- header, collecting every outcome into a list.
    rawBlocks header bytes =
      foldrBlocks
        (\blk acc -> Right blk : acc)
        (\err -> [Left err])
        []
        bytes
        (decodeRawBlocksIncremental header)
-- | State of an incremental, chunk-fed block decoder.
data Blocks a
  = Block a (Blocks a)
    -- ^ A decoded item, followed by the decoder state for the rest.
  | More (ByteString -> Blocks a)
    -- ^ More input is required; apply the continuation to the next chunk.
    -- 'foldrBlocks' passes an empty chunk once the input is exhausted.
  | Error String ByteString
    -- ^ Decoding failed with the given message; carries the input that
    -- had not been consumed.
  | Done ByteString
    -- ^ Decoding finished; carries leftover input.
  deriving (Functor)
-- | Right-fold over the items produced by a 'Blocks' decoder while feeding
-- it the chunks of a lazy 'BL.ByteString'.
--
-- * the first argument combines a decoded item with the folded remainder;
-- * the second argument turns a decoding error message into a result;
-- * the third argument is the result used when the decoder is 'Done'.
--
-- Once all chunks have been handed over, any further request for input is
-- answered with an empty chunk.
foldrBlocks :: (a -> b -> b) -> (String -> b) -> b -> BL.ByteString -> Blocks a -> b
foldrBlocks onBlock onError onDone input = feed (BL.toChunks input)
  where
    feed pending (Block item next)   = onBlock item (feed pending next)
    feed [] (More resume)            = feed [] (resume "")
    feed (chunk : later) (More resume) = feed later (resume chunk)
    feed _ (Error msg _)             = onError msg
    feed _ (Done _)                  = onDone
-- | Incrementally decode the raw blocks that follow a container header.
--
-- The result starts out as 'More'. Each produced 'Block' carries the
-- block's object count and its decompressed payload.
--
-- Feeding an empty chunk while no block is in progress yields 'Done';
-- feeding an empty chunk in the middle of a block signals end of input to
-- the underlying decoder.
decodeRawBlocksIncremental :: ContainerHeader -> Blocks (Int, BL.ByteString)
decodeRawBlocksIncremental ContainerHeader{..} = awaitBlock
  where
    -- A decoder positioned at the start of a block.
    freshDecoder = Get.runGetIncremental rawBlock

    -- Idle state: no partially decoded block is pending.
    awaitBlock = More $ \chunk ->
      if B.null chunk
        then Done ""
        else step (Get.pushChunk freshDecoder chunk)

    -- Translate the state of the underlying decoder into a 'Blocks' state.
    step decoder = case decoder of
      Get.Done leftover _ !blk
        | B.null leftover -> Block blk awaitBlock
          -- Unconsumed input belongs to the next block: restart on it.
        | otherwise       -> Block blk (step (Get.pushChunk freshDecoder leftover))
      Get.Fail leftover _ msg ->
        Error msg leftover
      Get.Partial{} -> More $ \chunk ->
        if B.null chunk
          then step (Get.pushEndOfInput decoder)
          else step (Get.pushChunk decoder chunk)

    -- One block: object count, payload length, (compressed) payload and
    -- the trailing synchronisation marker.
    rawBlock = do
      nrObj      <- AGet.getLong >>= AGet.sFromIntegral
      nrBytes    <- AGet.getLong
      compressed <- Get.getLazyByteString nrBytes
      bytes      <- either fail pure
                      (decompress compressed Get.getRemainingLazyByteString)
      trailer    <- Get.getLazyByteString nrSyncBytes
      when (trailer /= syncBytes) $
        fail "Invalid marker, does not match sync bytes."
      pure (nrObj, bytes)
-- | Like 'extractContainerValues', but pairs every decoded value with the
-- exact bytes it was decoded from.
extractContainerValuesBytes :: forall a schema.
     (Schema -> Either String schema)
  -> (schema -> Get a)
  -> BL.ByteString
  -> Either String (Schema, [Either String (a, BL.ByteString)])
extractContainerValuesBytes deconflict f =
  extractContainerValues deconflict valueWithBytes
  where
    -- Run the value decoder under 'Get.lookAhead' to learn how many bytes
    -- it consumes, then read exactly that many bytes for real.
    valueWithBytes sch = do
      start <- Get.bytesRead
      (val, end) <- Get.lookAhead $ do
        v   <- f sch
        pos <- Get.bytesRead
        pure (v, pos)
      raw <- Get.getLazyByteString (end - start)
      pure (val, raw)
-- | Decode every value stored in a container.
--
-- The first argument converts the schema found in the container into the
-- schema representation used for reading; the second builds a value decoder
-- from it.
--
-- Returns 'Left' when the header cannot be parsed or the schema conversion
-- fails. Otherwise returns the contained schema and a list with one entry
-- per decoding attempt: the value, or an error message.
extractContainerValues :: forall a schema.
     (Schema -> Either String schema)
  -> (schema -> Get a)
  -> BL.ByteString
  -> Either String (Schema, [Either String a])
extractContainerValues deconflict f bs =
  case Get.runGetOrFail getContainerHeader bs of
    Left (_, _, err) -> Left err
    Right (rest, _, header) -> do
      (schema, blocks) <- extractContainerValuesIncremental deconflict f header
      -- Concatenate the per-block results; a block-level error becomes a
      -- final 'Left' entry.
      pure (schema, foldrBlocks (++) (\err -> [Left err]) [] rest blocks)
-- | Incremental variant of 'extractContainerValues' that starts from an
-- already parsed 'ContainerHeader'.
--
-- Returns 'Left' when the schema conversion fails. Otherwise returns the
-- contained schema and a 'Blocks' decoder whose items are the decoding
-- results of all values of one block.
extractContainerValuesIncremental
  :: (Schema -> Either String schema)
  -> (schema -> Get a)
  -> ContainerHeader
  -> Either String (Schema, Blocks [Either String a])
extractContainerValuesIncremental deconflict getValue header@ContainerHeader{containedSchema = writerSchema} = do
  readSchema <- deconflict writerSchema
  pure (writerSchema, decodeBlock readSchema <$> decodeRawBlocksIncremental header)
  where
    -- Decode as many values as the block header announced.
    decodeBlock readSchema (nrObj, bytes) =
      snd (consumeN (fromIntegral nrObj) (decodeOne readSchema) bytes)

    -- Decode a single value, returning the unconsumed input alongside the
    -- outcome.
    decodeOne readSchema bytes =
      case Get.runGetOrFail (getValue readSchema) bytes of
        Left (rest, _, err)    -> (rest, Left err)
        Right (rest, _, value) -> (rest, Right value)
-- | Pack already-encoded values into a container, using a freshly generated
-- synchronisation marker. Each inner list becomes one block.
packContainerValues :: Codec -> Schema -> [[BL.ByteString]] -> IO BL.ByteString
packContainerValues codec sch values =
  (\marker -> packContainerValuesWithSync codec sch marker values) <$> newSyncBytes
-- | Pack already-encoded values into a container using the supplied
-- synchronisation marker. Each inner list becomes one block.
packContainerValuesWithSync :: Codec -> Schema -> BL.ByteString -> [[BL.ByteString]] -> BL.ByteString
packContainerValuesWithSync =
  -- The values are already encoded, so the schema is ignored and the bytes
  -- are emitted as they are.
  packContainerValuesWithSync' (const lazyByteString)
{-# INLINABLE packContainerValuesWithSync #-}
-- | Pack values into a container using the supplied value encoder and
-- synchronisation marker.
--
-- The output is the container header followed by one block per inner list.
-- A block consists of the object count, the size in bytes of the
-- (compressed) payload, the payload itself and the synchronisation marker.
--
-- The object count is written as a 64-bit value, matching how blocks are
-- read back (via @getLong@) and avoiding silent truncation for blocks with
-- more than @2^31 - 1@ objects. For counts that fit in 32 bits the emitted
-- bytes are the same as with a 32-bit encoding.
packContainerValuesWithSync' ::
     (Schema -> a -> Builder)
  -> Codec
  -> Schema
  -> BL.ByteString
  -> [[a]]
  -> BL.ByteString
packContainerValuesWithSync' encode codec sch syncBytes values =
  toLazyByteString $
    containerHeaderWithSync codec sch syncBytes <> foldMap putBlock values
  where
    putBlock ys =
      let nrObj    = length ys
          -- Encode all values of the block first, then compress them as a
          -- single unit.
          theBytes = codecCompress codec $ toLazyByteString $ foldMap (encode sch) ys
          nrBytes  = BL.length theBytes
      in encodeRaw @Int64 (fromIntegral nrObj)
           <> encodeRaw nrBytes
           <> lazyByteString theBytes
           <> lazyByteString syncBytes
-- | Pack pre-assembled blocks into a container, using a freshly generated
-- synchronisation marker. Every block is given as its object count and its
-- uncompressed payload.
packContainerBlocks :: Codec -> Schema -> [(Int, BL.ByteString)] -> IO BL.ByteString
packContainerBlocks codec sch blocks =
  (\marker -> packContainerBlocksWithSync codec sch marker blocks) <$> newSyncBytes
-- | Pack pre-assembled blocks into a container using the supplied
-- synchronisation marker.
--
-- Every input block is given as its object count and its uncompressed
-- payload. The payload is compressed with the codec and written as: object
-- count, compressed size in bytes, compressed payload, synchronisation
-- marker.
--
-- The object count is written as a 64-bit value, matching how blocks are
-- read back (via @getLong@) and avoiding silent truncation of counts above
-- @2^31 - 1@. For counts that fit in 32 bits the emitted bytes are the same
-- as with a 32-bit encoding.
packContainerBlocksWithSync :: Codec -> Schema -> BL.ByteString -> [(Int, BL.ByteString)] -> BL.ByteString
packContainerBlocksWithSync codec sch syncBytes blocks =
  toLazyByteString $
    containerHeaderWithSync codec sch syncBytes <> foldMap putBlock blocks
  where
    putBlock (nrObj, bytes) =
      let compressed = codecCompress codec bytes
      in encodeRaw @Int64 (fromIntegral nrObj)
           <> encodeRaw (BL.length compressed)
           <> lazyByteString compressed
           <> lazyByteString syncBytes
-- | Build the header of a container: the magic number, the metadata map
-- (holding the JSON-encoded schema and the codec name) and the
-- synchronisation marker.
containerHeaderWithSync :: Codec -> Schema -> BL.ByteString -> Builder
containerHeaderWithSync codec sch syncBytes =
  mconcat
    [ lazyByteString magic
    , toAvro (Schema.Map Schema.Bytes') metadata
    , lazyByteString syncBytes
    ]
  where
    -- The ASCII characters "Obj" followed by the byte 1.
    magic :: BL.ByteString
    magic = "Obj" <> BL.pack [1]

    -- Metadata entries, encoded as an Avro map of bytes.
    metadata :: HashMap Text BL.ByteString
    metadata =
      HashMap.fromList
        [ ("avro.schema", Aeson.encode sch)
        , ("avro.codec", BL.fromStrict (codecName codec))
        ]
-- | Apply a state-threading step function a given number of times,
-- collecting the produced results in order.
--
-- Returns the final state together with the list of results. A count of
-- zero, or a negative count, performs no steps and returns the initial
-- state unchanged. Treating negative counts as "nothing to do" keeps the
-- function terminating even when the count comes from malformed input.
--
-- The results are produced lazily.
consumeN :: Int64 -> (a -> (a, b)) -> a -> (a, [b])
consumeN n f a
  | n <= 0    = (a, [])
  | otherwise =
      let (a', b) = f a
          (r, bs) = consumeN (n - 1) f a'
      in (r, b : bs)
{-# INLINE consumeN #-}
-- | Resolve the codec named by the @avro.codec@ metadata entry.
--
-- A missing entry and the name @null@ both select the null codec;
-- @deflate@ selects the deflate codec.
--
-- NOTE: any other name is reported through 'error' (an exception), not as
-- a failure in @m@.
parseCodec :: Monad m => Maybe BL.ByteString -> m Codec
parseCodec = \case
  Nothing        -> pure Codec.nullCodec
  Just "null"    -> pure Codec.nullCodec
  Just "deflate" -> pure Codec.deflateCodec
  Just other     -> error $ "Unrecognized codec: " <> BLC.unpack other
-- | Like 'takeWhile', but also keeps the first element that does not
-- satisfy the predicate.
--
-- Every element is emitted before the predicate is applied to it, so the
-- function works on infinite lists.
takeWhileInclusive :: (a -> Bool) -> [a] -> [a]
takeWhileInclusive p = foldr keep []
  where
    keep x rest = x : (if p x then rest else [])
{-# INLINE takeWhileInclusive #-}