{-# LANGUAGE TupleSections #-}
module Data.Avro.Schema.Deconflict
  ( deconflict
  ) where

import           Control.Applicative     ((<|>))
import           Data.Avro.Schema.Schema as S
import qualified Data.Foldable           as Foldable
import           Data.HashMap.Strict     (HashMap)
import qualified Data.HashMap.Strict     as HashMap
import           Data.List               (find)
import           Data.List.NonEmpty      (NonEmpty (..))
import qualified Data.List.NonEmpty      as NE
import qualified Data.Map                as M
import           Data.Maybe              (isNothing)
import           Data.Semigroup          ((<>))
import qualified Data.Set                as Set
import           Data.Text               (Text)
import qualified Data.Text               as Text
import qualified Data.Text.Encoding      as Text
import           Data.Vector             (Vector)
import qualified Data.Vector             as V

import           Data.Avro.Schema.ReadSchema (FieldStatus (..), ReadField, ReadSchema)
import qualified Data.Avro.Schema.ReadSchema as Read

import Debug.Trace

-- | @deconflict writer reader@ will produce a schema that can decode
-- with the writer's schema into the form specified by the reader's schema.
--
-- Schema resolution rules are described by the specification: <https://avro.apache.org/docs/current/spec.html#Schema+Resolution>
--
-- Resolution proceeds as follows:
--
-- * Equal schemas are converted directly with 'Read.fromSchema'.
-- * Numeric writers may be promoted: @int@ to @long@, @float@ or @double@;
--   @long@ to @float@ or @double@; @float@ to @double@.
-- * @bytes@ and @string@ are interchangeable; the reader's logical type is kept.
-- * Arrays and maps are resolved by resolving their element schemas.
-- * Enums, fixed and records must have the same name on both sides (fixed
--   additionally the same size) and are then resolved member-wise.
-- * For two unions, every writer branch must be found (by type name) among
--   the reader branches.
-- * A non-union writer is matched (by type name) against the branches of a
--   reader union.
--
-- Any other combination yields 'Left' with a message describing both schemas.
deconflict :: Schema -> Schema -> Either String ReadSchema
deconflict writerSchema readerSchema
  | writerSchema == readerSchema = pure (Read.fromSchema readerSchema)
deconflict S.Null S.Null             = pure Read.Null
deconflict S.Boolean S.Boolean       = pure Read.Boolean

deconflict (S.Int _) (S.Int r)       = pure (Read.Int r)
deconflict (S.Int _) (S.Long r)      = pure (Read.Long Read.LongFromInt r)
deconflict (S.Int _) S.Float         = pure (Read.Float Read.FloatFromInt)
deconflict (S.Int _) S.Double        = pure (Read.Double Read.DoubleFromInt)

deconflict (S.Long _) (S.Long r)     = pure (Read.Long Read.ReadLong r)
deconflict (S.Long _) S.Float        = pure (Read.Float Read.FloatFromLong)
deconflict (S.Long _) S.Double       = pure (Read.Double Read.DoubleFromLong)

deconflict S.Float S.Float           = pure (Read.Float Read.ReadFloat)
deconflict S.Float S.Double          = pure (Read.Double Read.DoubleFromFloat)

deconflict S.Double S.Double         = pure (Read.Double Read.ReadDouble)

deconflict (S.Bytes _) (S.Bytes r)   = pure (Read.Bytes r)
deconflict (S.Bytes _) (S.String r)  = pure (Read.String r)

deconflict (S.String _) (S.String r) = pure (Read.String r)
deconflict (S.String _) (S.Bytes r)  = pure (Read.Bytes r)

deconflict (S.Array w) (S.Array r)   = Read.Array <$> deconflict w r

deconflict (S.Map w) (S.Map r)       = Read.Map <$> deconflict w r

-- Enums: names must agree and every reader symbol must occur among the
-- writer's symbols. The result keeps the writer's symbol table.
-- NOTE(review): the Avro specification states the requirement in terms of the
-- writer's symbol being present in the reader's enum; confirm that checking
-- reader symbols against writer symbols is the intended direction.
deconflict w@S.Enum{} r@S.Enum{}
  | name w == name r && symbols w `contains` symbols r = pure Read.Enum
    { Read.name    = name r
    , Read.aliases = aliases w <> aliases r
    , Read.doc     = doc r
    , Read.symbols = symbols w
    }

-- Fixed: names and sizes must agree; the reader's logical type is kept.
deconflict w@S.Fixed{} r@S.Fixed{}
  | name w == name r && size w == size r = pure Read.Fixed
    { Read.name         = name r
    , Read.aliases      = aliases w <> aliases r
    , Read.size         = size w
    , Read.logicalTypeF = logicalTypeF r
    }

-- Records: names must agree; fields are resolved by 'deconflictFields'.
deconflict w@S.Record{} r@S.Record{}
  | name w == name r = do
    fields' <- deconflictFields (fields w) (fields r)
    pure Read.Record
      { Read.name    = name r
      , Read.aliases = aliases w <> aliases r
      , Read.doc     = doc r
      , Read.fields  = fields'
      }

-- Union writer, union reader: each writer branch is paired with the index of
-- the reader branch that has the same type name.
deconflict (S.Union ws) (S.Union rs) =
  Read.Union <$> V.mapM resolveBranch ws
  where
    resolveBranch :: Schema -> Either String (Int, ReadSchema)
    resolveBranch w =
      case findTypeV w rs of
        Nothing      -> Left (missingBranch w)
        Just (i, r') -> (i,) <$> deconflict w r'

    missingBranch :: Schema -> String
    missingBranch x =
      "Incorrect payload: union " <> (show . Foldable.toList $ typeName <$> rs) <> " does not contain schema " <> Text.unpack (typeName x)

-- Non-union writer, union reader: pick the reader branch with the same type
-- name and remember its index.
deconflict nonUnion (S.Union rs)
  | Just (ix, y) <- findTypeV nonUnion rs =
    Read.FreeUnion ix <$> deconflict nonUnion y

-- Everything else (including failed guards above) cannot be resolved.
deconflict a b = Left $ "Can not resolve differing writer and reader schemas: " <> show (a, b)

-- | @contains container elts@ is 'True' when every element of @elts@ also
-- occurs in @container@. An empty @elts@ is trivially contained.
contains :: V.Vector Text -> V.Vector Text -> Bool
contains container elts = V.all (`V.elem` container) elts

-- | Resolve the fields of a writer record against the fields of a reader
-- record.
--
-- For each field:
--
--  1) If it exists in both schemas, deconflict it.
--  2) If it's only in the reader schema and has a default, mark it defaulted.
--  3) If it's only in the reader schema and has no default, fail.
--  4) If it's only in the writer schema, mark it ignored.
--
-- The result contains one entry per writer field, in writer order, followed
-- by the defaulted reader-only fields, in reader order. The first failure
-- encountered in that order is returned as 'Left'.
deconflictFields :: [Field] -> [Field] -> Either String [ReadField]
deconflictFields writerFields readerFields =
  (<>) <$> traverse deconflictField writerFields
       <*> traverse (uncurry confirmDefaulted) readerOnlyFields
  where
    -- Reader fields paired with their position in the reader record.
    indexedReaderFields :: [(Int, Field)]
    indexedReaderFields = zip [0..] readerFields

    -- Writer fields paired with their position; bound once so the list is
    -- not rebuilt for every reader field examined below.
    indexedWriterFields :: [(Int, Field)]
    indexedWriterFields = zip [0..] writerFields

    -- Reader fields that have no counterpart (by name or alias) in the writer.
    readerOnlyFields :: [(Int, Field)]
    readerOnlyFields =
      [ rf
      | rf@(_, f) <- indexedReaderFields
      , isNothing (findField f indexedWriterFields)
      ]

    -- A reader-only field is acceptable only when it carries a default.
    confirmDefaulted :: Int -> Field -> Either String ReadField
    confirmDefaulted ix f =
      case fldDefault f of
        Just def -> pure $ Read.fromField (Defaulted ix def) f
        Nothing  -> Left $ "No default found for deconflicted field " <> Text.unpack (fldName f)

    -- A writer field is either resolved against its reader counterpart
    -- (taking the reader's default) or marked as ignored.
    deconflictField :: Field -> Either String ReadField
    deconflictField writerField =
      case findField writerField indexedReaderFields of
        Just (ix, readerField) -> do
          t <- deconflict (fldType writerField) (fldType readerField)
          pure $ (Read.fromField (AsIs ix) writerField)
            { Read.fldType    = t
            , Read.fldDefault = fldDefault readerField
            }
        Nothing ->
          pure $ (Read.fromField Ignored writerField) { Read.fldDefault = Nothing }

-- | @findField w rs@ looks up the counterpart of field @w@ among the indexed
-- fields @rs@.
--
-- An exact name match is preferred. Failing that, the first candidate whose
-- name or aliases overlap with the name or aliases of @w@ is returned.
-- 'Nothing' means no candidate matched either way.
findField :: Field -> [(Int, Field)] -> Maybe (Int, Field)
findField w rs = byName <|> byAliases
  where
    -- First candidate with exactly the same field name.
    byName :: Maybe (Int, Field)
    byName = find ((== fldName w) . fldName . snd) rs

    -- First candidate sharing at least one name or alias with @w@.
    byAliases :: Maybe (Int, Field)
    byAliases = find (sharesName . snd) rs

    -- The name of a field together with all of its aliases.
    allNames :: Field -> Set.Set Text
    allNames fld = Set.fromList (fldName fld : fldAliases fld)

    wantedNames :: Set.Set Text
    wantedNames = allNames w

    sharesName :: Field -> Bool
    sharesName = not . Set.null . Set.intersection wantedNames . allNames

-- | @findTypeV schema schemas@ returns the index and value of the first
-- element of @schemas@ whose 'typeName' equals that of @schema@, or
-- 'Nothing' when there is no such element.
findTypeV :: Schema -> Vector Schema -> Maybe (Int, Schema)
findTypeV schema schemas =
  withSchema <$> V.findIndex ((tn ==) . typeName) schemas
  where
    tn :: Text
    tn = typeName schema

    -- 'V.unsafeIndex' cannot go out of bounds here: the index was produced
    -- by 'V.findIndex' on the very same vector.
    withSchema :: Int -> (Int, Schema)
    withSchema i = (i, V.unsafeIndex schemas i)