{-# LANGUAGE BangPatterns        #-}
{-# LANGUAGE DeriveFunctor       #-}
{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData          #-}
{-# LANGUAGE TupleSections       #-}
{-# LANGUAGE TypeApplications    #-}

module Data.Avro.Internal.Container
where

import           Control.Monad                (when)
import qualified Data.Aeson                   as Aeson
import           Data.Avro.Codec              (Codec (..), Decompress)
import qualified Data.Avro.Codec              as Codec
import           Data.Avro.Encoding.ToAvro    (toAvro)
import           Data.Avro.Internal.EncodeRaw (encodeRaw)
import           Data.Avro.Schema.Schema      (Schema)
import qualified Data.Avro.Schema.Schema      as Schema
import           Data.Binary.Get              (Get)
import qualified Data.Binary.Get              as Get
import           Data.ByteString              (ByteString)
import qualified Data.ByteString              as B
import           Data.ByteString.Builder      (Builder, lazyByteString, toLazyByteString)
import qualified Data.ByteString.Lazy         as BL
import qualified Data.ByteString.Lazy.Char8   as BLC
import           Data.Either                  (isRight)
import           Data.HashMap.Strict          (HashMap)
import qualified Data.HashMap.Strict          as HashMap
import           Data.Int                     (Int32, Int64)
import           Data.List                    (foldl', unfoldr)
import qualified Data.Map.Strict              as Map
import           Data.Text                    (Text)
import           System.Random.TF.Init        (initTFGen)
import           System.Random.TF.Instances   (randoms)

import qualified Data.Avro.Internal.Get as AGet

-- | Everything read from the header of an Avro object container that is
-- needed to decode the data blocks following it.
data ContainerHeader = ContainerHeader
  { syncBytes       :: BL.ByteString
    -- ^ Sync marker read from the header; every data block must end with it.
  , decompress      :: forall a. Decompress a
    -- ^ Decompressor derived from the codec named in the header metadata.
  , containedSchema :: Schema
    -- ^ Writer schema embedded in the header metadata.
  }

-- | Length, in bytes, of a container sync marker. It is used both when
-- generating a marker and when reading one back from the header or after a
-- data block.
nrSyncBytes :: Integral sb => sb
nrSyncBytes = 16
{-# INLINE nrSyncBytes #-}

-- | Generates a fresh random synchronization marker of 'nrSyncBytes' bytes,
-- for use when encoding an Avro container.
newSyncBytes :: IO BL.ByteString
newSyncBytes = do
  gen <- initTFGen
  pure (BL.pack (take nrSyncBytes (randoms gen)))

-- | Parses the header of an Avro object container.
--
-- The header is read in this order:
--
--   1. the 4-byte magic @\"Obj\\1\"@;
--   2. the metadata map from 'Text' keys to byte-string values;
--   3. the 'nrSyncBytes'-byte sync marker.
--
-- The codec comes from the @avro.codec@ metadata entry (a missing entry means
-- the null codec). The writer schema is decoded from the JSON stored under
-- @avro.schema@.
--
-- Every malformed-header condition is reported through 'fail', including an
-- unrecognised codec name. This makes 'Get.runGetOrFail' return it as a
-- 'Left' instead of throwing an exception.
getContainerHeader :: Get ContainerHeader
getContainerHeader = do
  magic <- getFixed avroMagicSize
  when (BL.fromStrict magic /= avroMagicBytes) $
    fail "Invalid magic number at start of container."
  metadata <- getMeta
  sync     <- BL.fromStrict <$> getFixed nrSyncBytes
  codec    <- getCodec (Map.lookup "avro.codec" metadata)
  schema   <- case Map.lookup "avro.schema" metadata of
    Nothing -> fail "Invalid container object: no schema."
    Just s  -> case Aeson.eitherDecode' s of
      Left e  -> fail ("Can not decode container schema: " <> e)
      Right x -> pure x
  pure ContainerHeader
    { syncBytes       = sync
    , decompress      = Codec.codecDecompress codec
    , containedSchema = schema
    }
  where
    avroMagicSize :: Integral a => a
    avroMagicSize = 4

    avroMagicBytes :: BL.ByteString
    avroMagicBytes = BLC.pack "Obj" <> BL.pack [1]

    getFixed :: Int -> Get ByteString
    getFixed = Get.getByteString

    getMeta :: Get (Map.Map Text BL.ByteString)
    getMeta =
      let keyValue = (,) <$> AGet.getString <*> AGet.getBytesLazy
      in Map.fromList <$> AGet.decodeBlocks keyValue

    -- 'parseCodec' reports unknown names with 'error'. Inside 'Get' that
    -- would escape 'Get.runGetOrFail' as an exception, so unknown names are
    -- rejected here with 'fail' instead.
    -- Keep this list in sync with the names 'parseCodec' accepts.
    getCodec :: Maybe BL.ByteString -> Get Codec
    getCodec (Just name)
      | name `notElem` ["null", "deflate"] =
          fail ("Unrecognized codec: " <> BLC.unpack name)
    getCodec mname = parseCodec mname
keyValue

-- | Reads a container as a list of raw blocks, without decoding the values
-- inside them.
--
-- This is useful for streaming, splitting or merging Avro containers without
-- paying for Avro encoding and decoding.
--
-- Each block is returned as its decompressed 'BL.ByteString' payload, paired
-- with the number of Avro values the block contains.
--
-- The outer 'Left' reports a failure to open the container itself, including
-- problems reading the schema embedded in it. An inner 'Left' reports a
-- failure while reading a block; it is always the last element of the list.
decodeRawBlocks :: BL.ByteString -> Either String (Schema, [Either String (Int, BL.ByteString)])
decodeRawBlocks bs =
  case Get.runGetOrFail getContainerHeader bs of
    Left (_, _, err)        -> Left err
    Right (rest, _, header) -> Right (containedSchema header, rawBlocks header rest)
  where
    -- Drive the incremental block decoder over the bytes after the header.
    rawBlocks header bytes =
      foldrBlocks (\blk acc -> Right blk : acc) (\err -> [Left err]) [] bytes
        (decodeRawBlocksIncremental header)

-- | An incremental stream of decoded items. The stream is driven by feeding
-- it strict 'ByteString' chunks whenever it asks for 'More'.
data Blocks a
  = -- | A decoded item followed by the rest of the stream.
    Block a (Blocks a)
  | -- | More input is needed. Feed the next chunk, or the empty 'ByteString'
    -- to signal end of input.
    More (ByteString -> Blocks a)
  | -- | Decoding failed: the error message and the leftover bytes.
    Error String ByteString
  | -- | The stream finished: the leftover bytes.
    Done ByteString
  deriving (Functor)

-- | Right-folds a 'Blocks' stream, feeding it the chunks of a lazy
-- 'BL.ByteString' on demand. Each chunk is handed over only when the stream
-- asks for 'More', so the input is consumed lazily.
--
-- * @onBlock@ combines each item with the fold of the rest of the stream.
-- * @onError@ produces the result when the stream reports 'Error'.
-- * @onDone@ is the result when the stream reaches 'Done'.
--
-- Once the input is exhausted, every further 'More' receives the empty
-- 'ByteString'. A stream that keeps asking for 'More' after that never
-- terminates.
foldrBlocks :: (a -> b -> b) -> (String -> b) -> b -> BL.ByteString -> Blocks a -> b
foldrBlocks onBlock onError onDone input = feed (BL.toChunks input)
  where
    feed pending blocks = case blocks of
      Block x next -> onBlock x (feed pending next)
      More k       -> case pending of
        []           -> feed [] (k B.empty)
        chunk : more -> feed more (k chunk)
      Error msg _  -> onError msg
      Done _       -> onDone

-- | Incremental decoder for the data blocks that follow a container header.
--
-- Each data block consists of, in order:
--
-- * the object count (long);
-- * the byte size of the payload (long);
-- * the payload, compressed with the container's codec;
-- * the sync marker, which must equal the header's 'syncBytes'.
--
-- Each block is yielded as its object count paired with the decompressed
-- payload. Feeding an empty chunk between blocks ends the stream with 'Done'.
-- Feeding it mid-block signals end of input to the decoder, so a truncated
-- block ends the stream with 'Error'.
--
-- Negative counts or sizes can only come from corrupt input and are rejected.
decodeRawBlocksIncremental :: ContainerHeader -> Blocks (Int, BL.ByteString)
decodeRawBlocksIncremental ContainerHeader{..} = initial
  where
    initialDecoder :: Get.Decoder (Int, BL.ByteString)
    initialDecoder = Get.runGetIncremental getRawBlock

    -- Positioned at a block boundary: empty input here is a clean end.
    initial :: Blocks (Int, BL.ByteString)
    initial = More $ \input ->
      if B.null input
        then Done B.empty
        else go (Get.pushChunk initialDecoder input)

    go :: Get.Decoder (Int, BL.ByteString) -> Blocks (Int, BL.ByteString)
    go decoder = case decoder of
      -- The bang forces the decoded block before it is emitted.
      Get.Done rest _ !block
        | B.null rest -> Block block initial
        -- Leftover bytes belong to the next block: start a fresh decoder.
        | otherwise   -> Block block (go (Get.pushChunk initialDecoder rest))
      Get.Fail rest _ err -> Error err rest
      Get.Partial{} -> More $ \input ->
        if B.null input
          then go (Get.pushEndOfInput decoder)
          else go (Get.pushChunk decoder input)

    getRawBlock :: Get (Int, BL.ByteString)
    getRawBlock = do
      nrObj <- AGet.getLong >>= AGet.sFromIntegral
      when (nrObj < 0) $
        fail ("Invalid block: negative object count " <> show nrObj)
      nrBytes <- AGet.getLong
      when (nrBytes < 0) $
        fail ("Invalid block: negative byte size " <> show nrBytes)
      compressed <- Get.getLazyByteString nrBytes
      bytes <- either fail pure (decompress compressed Get.getRemainingLazyByteString)
      trailer <- Get.getLazyByteString nrSyncBytes
      if trailer /= syncBytes
        then fail "Invalid marker, does not match sync bytes."
        else pure (nrObj, bytes)

-- | Splits a container into its individual Avro values. Each value is
-- returned both decoded and as the exact bytes it was decoded from.
--
-- This is particularly useful when slicing a container into one or more
-- smaller files: keeping each value's original bytes avoids re-encoding it.
extractContainerValuesBytes :: forall a schema.
     (Schema -> Either String schema)
  -> (schema -> Get a)
  -> BL.ByteString
  -> Either String (Schema, [Either String (a, BL.ByteString)])
extractContainerValuesBytes deconflict f =
  extractContainerValues deconflict withRawBytes
  where
    -- Decode one value under 'Get.lookAhead' to find where it ends, then
    -- consume exactly that many bytes so the raw encoding can be returned.
    withRawBytes :: schema -> Get (a, BL.ByteString)
    withRawBytes sch = do
      begin <- Get.bytesRead
      (val, end) <- Get.lookAhead $ do
        v   <- f sch
        pos <- Get.bytesRead
        pure (v, pos)
      raw <- Get.getLazyByteString (end - begin)
      pure (val, raw)

-- | Decodes every value stored in an Avro container.
--
-- @deconflict@ turns the writer schema embedded in the header into whatever
-- @f@ needs to decode one value.
--
-- The outer 'Left' reports a failure to read the header or to deconflict its
-- schema. Inner 'Left's report failures while reading blocks or values.
extractContainerValues :: forall a schema.
     (Schema -> Either String schema)
  -> (schema -> Get a)
  -> BL.ByteString
  -> Either String (Schema, [Either String a])
extractContainerValues deconflict f bs =
  case Get.runGetOrFail getContainerHeader bs of
    Left (_, _, err) -> Left err
    Right (rest, _, header) -> do
      (schema, blocks) <- extractContainerValuesIncremental deconflict f header
      pure (schema, foldrBlocks (++) (\err -> [Left err]) [] rest blocks)

-- | Builds an incremental value decoder for the blocks following an
-- already-parsed container header.
--
-- The header's writer schema is first passed through @deconflict@; a 'Left'
-- there is returned as is. Each raw block then becomes the list of its
-- decoded values.
--
-- Within a block, values are decoded back to back. If one value fails, the
-- offset of the next value is unknown. The block's list therefore ends with
-- that 'Left' instead of decoding garbage from the failure point. This also
-- keeps a corrupt, huge object count from producing a flood of errors.
-- Later blocks are unaffected, because each block is delimited separately.
extractContainerValuesIncremental
  :: (Schema -> Either String schema)
  -> (schema -> Get a)
  -> ContainerHeader
  -> Either String (Schema, Blocks [Either String a])
extractContainerValuesIncremental deconflict getValue containerHeader = do
  let writerSchema = containedSchema containerHeader
  readSchema <- deconflict writerSchema
  let blocks = decodeRawBlocksIncremental containerHeader
  pure (writerSchema, fmap (decodeBlock readSchema) blocks)
  where
    decodeBlock readSchema (nrObj, bytes) =
      takeWhileInclusive isRight . snd $
        consumeN (fromIntegral nrObj) (decodeValue readSchema) bytes

    -- Decode one value; return the unconsumed input alongside the result.
    decodeValue readSchema bytes =
      case Get.runGetOrFail (getValue readSchema) bytes of
        Left  (rest, _, err)   -> (rest, Left err)
        Right (rest, _, value) -> (rest, Right value)

-- | Packs a container from already-encoded Avro values, using a freshly
-- generated sync marker. Each inner list becomes one block. Each
-- bytestring must hold exactly one value serialised to Avro.
packContainerValues :: Codec -> Schema -> [[BL.ByteString]] -> IO BL.ByteString
packContainerValues codec sch values =
  (\sync -> packContainerValuesWithSync codec sch sync values) <$> newSyncBytes

-- | Packs a container from already-encoded Avro values, using the given sync
-- marker. Each inner list becomes one block. Each bytestring must hold
-- exactly one value serialised to Avro; it is copied verbatim.
packContainerValuesWithSync :: Codec -> Schema -> BL.ByteString -> [[BL.ByteString]] -> BL.ByteString
packContainerValuesWithSync = packContainerValuesWithSync' (const lazyByteString)
{-# INLINABLE packContainerValuesWithSync #-}
-- | Packs a container from values, serialising each value with @encode@.
-- The container uses the given codec, schema and sync marker.
--
-- Each inner list becomes one block, written as:
--
-- * the object count (long);
-- * the compressed payload size (long);
-- * the payload, compressed with the codec;
-- * the sync marker.
--
-- The count is written as a 64-bit long, matching the reader's 'AGet.getLong'.
-- The previous 'Int32' conversion silently wrapped for very large blocks; for
-- counts within 'Int32' range the zig-zag varint bytes are identical.
packContainerValuesWithSync' ::
     (Schema -> a -> Builder)
  -> Codec
  -> Schema
  -> BL.ByteString
  -> [[a]]
  -> BL.ByteString
packContainerValuesWithSync' encode codec sch syncBytes values =
  toLazyByteString $
    containerHeaderWithSync codec sch syncBytes <> foldMap putBlock values
  where
    putBlock ys =
      let nrObj    = length ys
          theBytes = codecCompress codec $ toLazyByteString $ foldMap (encode sch) ys
          nrBytes  = BL.length theBytes
      in encodeRaw @Int64 (fromIntegral nrObj)
           <> encodeRaw nrBytes
           <> lazyByteString theBytes
           <> lazyByteString syncBytes

-- | Packs a new container from already-encoded Avro blocks, using a freshly
-- generated sync marker. Each block is a pair of the number of objects it
-- holds and its uncompressed content.
packContainerBlocks :: Codec -> Schema -> [(Int, BL.ByteString)] -> IO BL.ByteString
packContainerBlocks codec sch blocks =
  (\sync -> packContainerBlocksWithSync codec sch sync blocks) <$> newSyncBytes

-- | Packs a new container from already-encoded Avro blocks, using the given
-- sync marker. Each block is a pair of the number of objects it holds and its
-- uncompressed content; the content is compressed with @codec@ before writing.
--
-- The object count is written as a 64-bit long, matching the reader's
-- 'AGet.getLong'. Previously it was truncated to 'Int32'; for counts within
-- 'Int32' range the encoded bytes are unchanged.
packContainerBlocksWithSync :: Codec -> Schema -> BL.ByteString -> [(Int, BL.ByteString)] -> BL.ByteString
packContainerBlocksWithSync codec sch syncBytes blocks =
  toLazyByteString $
    containerHeaderWithSync codec sch syncBytes <> foldMap putBlock blocks
  where
    putBlock (nrObj, bytes) =
      let compressed = codecCompress codec bytes
      in encodeRaw @Int64 (fromIntegral nrObj)
           <> encodeRaw (BL.length compressed)
           <> lazyByteString compressed
           <> lazyByteString syncBytes


-- | Builds an Avro container header for the given schema, consisting of:
--
-- * the magic bytes @\"Obj\\1\"@;
-- * a metadata map (Avro map of bytes) holding the JSON-encoded schema under
--   @avro.schema@ and the codec name under @avro.codec@;
-- * the supplied sync marker.
containerHeaderWithSync :: Codec -> Schema -> BL.ByteString -> Builder
containerHeaderWithSync codec sch syncBytes =
  mconcat
    [ lazyByteString magic
    , toAvro (Schema.Map Schema.Bytes') metadata
    , lazyByteString syncBytes
    ]
  where
    magic :: BL.ByteString
    magic = "Obj" <> BL.pack [1]

    metadata :: HashMap Text BL.ByteString
    metadata = HashMap.fromList
      [ ("avro.schema", Aeson.encode sch)
      , ("avro.codec",  BL.fromStrict (codecName codec))
      ]

-----------------------------------------------------------------

-- | Applies a state-stepping function @n@ times. It returns the final state
-- and the outputs in order. The output list is produced lazily.
--
-- A count of zero or less yields no outputs and leaves the state untouched.
-- Previously only exactly zero terminated, so a negative count (e.g. from a
-- corrupt block header) recursed forever.
consumeN :: Int64 -> (a -> (a, b)) -> a -> (a, [b])
consumeN n f a
  | n <= 0    = (a, [])
  | otherwise =
      let (a', b)  = f a
          (r, bs) = consumeN (n - 1) f a'
      in (r, b : bs)
{-# INLINE consumeN #-}

----------------------------------------------------------------
-- | Maps the @avro.codec@ metadata value to a 'Codec'. A missing entry means
-- the null codec.
--
-- An unrecognised name is reported with 'error' (not through the monad), so
-- callers inside 'Get' that need a recoverable failure should reject unknown
-- names before calling this.
parseCodec :: Monad m => Maybe BL.ByteString -> m Codec
parseCodec mname = case mname of
  Nothing        -> pure Codec.nullCodec
  Just "null"    -> pure Codec.nullCodec
  Just "deflate" -> pure Codec.deflateCodec
  Just other     -> error $ "Unrecognized codec: " <> BLC.unpack other

-- | Like 'takeWhile', but also keeps the first element that fails the
-- predicate. Lazy, so it works on infinite lists.
takeWhileInclusive :: (a -> Bool) -> [a] -> [a]
takeWhileInclusive p = foldr step []
  where
    step x rest = x : if p x then rest else []
{-# INLINE takeWhileInclusive #-}