{-# LANGUAGE TupleSections #-}
module Data.Avro.Schema.Deconflict
  ( deconflict
  ) where

import           Control.Applicative     ((<|>))
import           Data.Avro.Schema.Schema as S
import qualified Data.Foldable           as Foldable
import           Data.HashMap.Strict     (HashMap)
import qualified Data.HashMap.Strict     as HashMap
import           Data.List               (find)
import           Data.List.NonEmpty      (NonEmpty (..))
import qualified Data.List.NonEmpty      as NE
import qualified Data.Map                as M
import           Data.Maybe              (isNothing)
import           Data.Semigroup          ((<>))
import qualified Data.Set                as Set
import           Data.Text               (Text)
import qualified Data.Text               as Text
import qualified Data.Text.Encoding      as Text
import           Data.Vector             (Vector)
import qualified Data.Vector             as V

import           Data.Avro.Schema.ReadSchema (FieldStatus (..), ReadField, ReadSchema)
import qualified Data.Avro.Schema.ReadSchema as Read

import Debug.Trace

-- | @deconflict writer reader@ will produce a schema that can decode
-- with the writer's schema into the form specified by the reader's schema.
--
-- Schema resolution rules are described by the specification: <https://avro.apache.org/docs/current/spec.html#Schema+Resolution>
deconflict :: Schema -> Schema -> Either String ReadSchema
deconflict :: Schema -> Schema -> Either String ReadSchema
deconflict Schema
writerSchema Schema
readerSchema | Schema
writerSchema forall a. Eq a => a -> a -> Bool
== Schema
readerSchema = forall (f :: * -> *) a. Applicative f => a -> f a
pure (Schema -> ReadSchema
Read.fromSchema Schema
readerSchema)
deconflict Schema
S.Null Schema
S.Null             = forall (f :: * -> *) a. Applicative f => a -> f a
pure ReadSchema
Read.Null
deconflict Schema
S.Boolean Schema
S.Boolean       = forall (f :: * -> *) a. Applicative f => a -> f a
pure ReadSchema
Read.Boolean

deconflict (S.Int Maybe LogicalTypeInt
_) (S.Int Maybe LogicalTypeInt
r)       = forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe LogicalTypeInt -> ReadSchema
Read.Int Maybe LogicalTypeInt
r)
deconflict (S.Int Maybe LogicalTypeInt
_) (S.Long Maybe LogicalTypeLong
r)      = forall (f :: * -> *) a. Applicative f => a -> f a
pure (ReadLong -> Maybe LogicalTypeLong -> ReadSchema
Read.Long ReadLong
Read.LongFromInt Maybe LogicalTypeLong
r)
deconflict (S.Int Maybe LogicalTypeInt
_) Schema
S.Float         = forall (f :: * -> *) a. Applicative f => a -> f a
pure (ReadFloat -> ReadSchema
Read.Float ReadFloat
Read.FloatFromInt)
deconflict (S.Int Maybe LogicalTypeInt
_) Schema
S.Double        = forall (f :: * -> *) a. Applicative f => a -> f a
pure (ReadDouble -> ReadSchema
Read.Double ReadDouble
Read.DoubleFromInt)

deconflict (S.Long Maybe LogicalTypeLong
_) (S.Long Maybe LogicalTypeLong
r)     = forall (f :: * -> *) a. Applicative f => a -> f a
pure (ReadLong -> Maybe LogicalTypeLong -> ReadSchema
Read.Long ReadLong
Read.ReadLong Maybe LogicalTypeLong
r)
deconflict (S.Long Maybe LogicalTypeLong
_) Schema
S.Float        = forall (f :: * -> *) a. Applicative f => a -> f a
pure (ReadFloat -> ReadSchema
Read.Float ReadFloat
Read.FloatFromLong)
deconflict (S.Long Maybe LogicalTypeLong
_) Schema
S.Double       = forall (f :: * -> *) a. Applicative f => a -> f a
pure (ReadDouble -> ReadSchema
Read.Double ReadDouble
Read.DoubleFromLong)

deconflict Schema
S.Float Schema
S.Float           = forall (f :: * -> *) a. Applicative f => a -> f a
pure (ReadFloat -> ReadSchema
Read.Float ReadFloat
Read.ReadFloat)
deconflict Schema
S.Float Schema
S.Double          = forall (f :: * -> *) a. Applicative f => a -> f a
pure (ReadDouble -> ReadSchema
Read.Double ReadDouble
Read.DoubleFromFloat)

deconflict Schema
S.Double Schema
S.Double         = forall (f :: * -> *) a. Applicative f => a -> f a
pure (ReadDouble -> ReadSchema
Read.Double ReadDouble
Read.ReadDouble)

deconflict (S.Bytes Maybe LogicalTypeBytes
_) (S.Bytes Maybe LogicalTypeBytes
r)   = forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe LogicalTypeBytes -> ReadSchema
Read.Bytes Maybe LogicalTypeBytes
r)
deconflict (S.Bytes Maybe LogicalTypeBytes
_) (S.String Maybe LogicalTypeString
r)  = forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe LogicalTypeString -> ReadSchema
Read.String Maybe LogicalTypeString
r)

deconflict (S.String Maybe LogicalTypeString
_) (S.String Maybe LogicalTypeString
r) = forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe LogicalTypeString -> ReadSchema
Read.String Maybe LogicalTypeString
r)
deconflict (S.String Maybe LogicalTypeString
_) (S.Bytes Maybe LogicalTypeBytes
r)  = forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe LogicalTypeBytes -> ReadSchema
Read.Bytes Maybe LogicalTypeBytes
r)

deconflict (S.Array Schema
w) (S.Array Schema
r)   = ReadSchema -> ReadSchema
Read.Array forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Schema -> Schema -> Either String ReadSchema
deconflict Schema
w Schema
r

deconflict (S.Map Schema
w) (S.Map Schema
r)       = ReadSchema -> ReadSchema
Read.Map forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Schema -> Schema -> Either String ReadSchema
deconflict Schema
w Schema
r

deconflict w :: Schema
w@S.Enum{} r :: Schema
r@S.Enum{}
  | Schema -> TypeName
name Schema
w forall a. Eq a => a -> a -> Bool
== Schema -> TypeName
name Schema
r Bool -> Bool -> Bool
&& Schema -> Vector Text
symbols Schema
w Vector Text -> Vector Text -> Bool
`contains` Schema -> Vector Text
symbols Schema
r = forall (f :: * -> *) a. Applicative f => a -> f a
pure Read.Enum
    { name :: TypeName
Read.name    = Schema -> TypeName
name Schema
r
    , aliases :: [TypeName]
Read.aliases = Schema -> [TypeName]
aliases Schema
w forall a. Semigroup a => a -> a -> a
<> Schema -> [TypeName]
aliases Schema
r
    , doc :: Maybe Text
Read.doc     = Schema -> Maybe Text
doc Schema
r
    , symbols :: Vector Text
Read.symbols = Schema -> Vector Text
symbols Schema
w
    }

deconflict w :: Schema
w@S.Fixed {} r :: Schema
r@S.Fixed {}
  | Schema -> TypeName
name Schema
w forall a. Eq a => a -> a -> Bool
== Schema -> TypeName
name Schema
r Bool -> Bool -> Bool
&& Schema -> Int
size Schema
w forall a. Eq a => a -> a -> Bool
== Schema -> Int
size Schema
r = forall (f :: * -> *) a. Applicative f => a -> f a
pure Read.Fixed
    { name :: TypeName
Read.name    = Schema -> TypeName
name Schema
r
    , aliases :: [TypeName]
Read.aliases = Schema -> [TypeName]
aliases Schema
w forall a. Semigroup a => a -> a -> a
<> Schema -> [TypeName]
aliases Schema
r
    , size :: Int
Read.size    = Schema -> Int
size Schema
w
    , logicalTypeF :: Maybe LogicalTypeFixed
Read.logicalTypeF = Schema -> Maybe LogicalTypeFixed
logicalTypeF Schema
r
    }

deconflict w :: Schema
w@S.Record {} r :: Schema
r@S.Record {}
  | Schema -> TypeName
name Schema
w forall a. Eq a => a -> a -> Bool
== Schema -> TypeName
name Schema
r Bool -> Bool -> Bool
|| Schema -> TypeName
name Schema
w forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` Schema -> [TypeName]
aliases Schema
r = do
    [ReadField]
fields' <- [Field] -> [Field] -> Either String [ReadField]
deconflictFields (Schema -> [Field]
fields Schema
w) (Schema -> [Field]
fields Schema
r)
    forall (f :: * -> *) a. Applicative f => a -> f a
pure Read.Record
      { name :: TypeName
Read.name    = Schema -> TypeName
name Schema
r
      , aliases :: [TypeName]
Read.aliases = Schema -> [TypeName]
aliases Schema
w forall a. Semigroup a => a -> a -> a
<> Schema -> [TypeName]
aliases Schema
r
      , doc :: Maybe Text
Read.doc     = Schema -> Maybe Text
doc Schema
r
      , fields :: [ReadField]
Read.fields  = [ReadField]
fields'
      }

deconflict (S.Union Vector Schema
ws) (S.Union Vector Schema
rs) =
  let
    err :: Schema -> String
err Schema
x = String
"Incorrect payload: union " forall a. Semigroup a => a -> a -> a
<> (forall a. Show a => a -> String
show forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: * -> *) a. Foldable t => t a -> [a]
Foldable.toList forall a b. (a -> b) -> a -> b
$ Schema -> Text
typeName forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Vector Schema
rs) forall a. Semigroup a => a -> a -> a
<> String
" does not contain schema " forall a. Semigroup a => a -> a -> a
<> Text -> String
Text.unpack (Schema -> Text
typeName Schema
x)
  in Vector (Int, ReadSchema) -> ReadSchema
Read.Union forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Vector a -> m (Vector b)
V.mapM (\Schema
w -> forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ Schema -> String
err Schema
w) (\(Int
i, Schema
r') -> (Int
i,) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Schema -> Schema -> Either String ReadSchema
deconflict Schema
w Schema
r') (Schema -> Vector Schema -> Maybe (Int, Schema)
findTypeV Schema
w Vector Schema
rs)) Vector Schema
ws

deconflict Schema
nonUnion (S.Union Vector Schema
rs)
  | Just (Int
ix, Schema
y) <- Schema -> Vector Schema -> Maybe (Int, Schema)
findTypeV Schema
nonUnion Vector Schema
rs =
    Int -> ReadSchema -> ReadSchema
Read.FreeUnion Int
ix forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Schema -> Schema -> Either String ReadSchema
deconflict Schema
nonUnion Schema
y

deconflict Schema
a Schema
b = forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ String
"Can not resolve differing writer and reader schemas: " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> String
show (Schema
a, Schema
b)

contains :: V.Vector Text -> V.Vector Text -> Bool
contains :: Vector Text -> Vector Text -> Bool
contains Vector Text
container Vector Text
elts =
  forall (t :: * -> *). Foldable t => t Bool -> Bool
and [Text
e forall a. Eq a => a -> Vector a -> Bool
`V.elem` Vector Text
container | Text
e <- forall a. Vector a -> [a]
V.toList Vector Text
elts]

-- For each field:
--  1) If it exists in both schemas, deconflict it
--  2) If it's only in the reader schema and has a default, mark it defaulted.
--  2) If it's only in the reader schema and has no default, fail.
--  3) If it's only in the writer schema, mark it ignored.
deconflictFields :: [Field] -> [Field] -> Either String [ReadField]
deconflictFields :: [Field] -> [Field] -> Either String [ReadField]
deconflictFields [Field]
writerFields [Field]
readerFields =
  forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
sequence forall a b. (a -> b) -> a -> b
$ (Field -> Either String ReadField
deconflictField forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Field]
writerFields) forall a. Semigroup a => a -> a -> a
<> [Either String ReadField]
defaultedFields
  where
    indexedReaderFields :: [(Int, Field)]
indexedReaderFields = forall a b. [a] -> [b] -> [(a, b)]
zip [Int
0..] [Field]
readerFields
    defaultedFields :: [Either String ReadField]
defaultedFields = [forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry Int -> Field -> Either String ReadField
confirmDefaulted (Int, Field)
f | (Int, Field)
f <- [(Int, Field)]
indexedReaderFields, forall a. Maybe a -> Bool
isNothing (Field -> [(Int, Field)] -> Maybe (Int, Field)
findField (forall a b. (a, b) -> b
snd (Int, Field)
f) (forall a b. [a] -> [b] -> [(a, b)]
zip [Int
0..] [Field]
writerFields))]

    confirmDefaulted :: Int -> Field -> Either String ReadField
    confirmDefaulted :: Int -> Field -> Either String ReadField
confirmDefaulted Int
ix Field
f
      | Just DefaultValue
def <- Field -> Maybe DefaultValue
fldDefault Field
f = forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ FieldStatus -> Field -> ReadField
Read.fromField (Int -> DefaultValue -> FieldStatus
Defaulted Int
ix DefaultValue
def) Field
f
      | Bool
otherwise = forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ String
"No default found for deconflicted field " forall a. Semigroup a => a -> a -> a
<> Text -> String
Text.unpack (Field -> Text
fldName Field
f)

    deconflictField :: Field -> Either String ReadField
    deconflictField :: Field -> Either String ReadField
deconflictField Field
writerField
      | Just (Int
ix, Field
readerField) <- Field -> [(Int, Field)] -> Maybe (Int, Field)
findField Field
writerField [(Int, Field)]
indexedReaderFields = do
        ReadSchema
t <- Schema -> Schema -> Either String ReadSchema
deconflict (Field -> Schema
fldType Field
writerField) (Field -> Schema
fldType Field
readerField)
        forall (f :: * -> *) a. Applicative f => a -> f a
pure (FieldStatus -> Field -> ReadField
Read.fromField (Int -> FieldStatus
AsIs Int
ix) Field
writerField) { fldType :: ReadSchema
Read.fldType = ReadSchema
t, fldDefault :: Maybe DefaultValue
Read.fldDefault = Field -> Maybe DefaultValue
fldDefault Field
readerField}
      | Bool
otherwise =
        forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ (FieldStatus -> Field -> ReadField
Read.fromField FieldStatus
Ignored Field
writerField) { fldDefault :: Maybe DefaultValue
Read.fldDefault = forall a. Maybe a
Nothing }

findField :: Field -> [(Int, Field)] -> Maybe (Int, Field)
findField :: Field -> [(Int, Field)] -> Maybe (Int, Field)
findField Field
w [(Int, Field)]
rs =
  let
    byName :: Maybe (Int, Field)
byName = forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (\(Int, Field)
x -> Field -> Text
fldName (forall a b. (a, b) -> b
snd (Int, Field)
x) forall a. Eq a => a -> a -> Bool
== Field -> Text
fldName Field
w) [(Int, Field)]
rs
    allNames :: Field -> Set Text
allNames Field
fld = forall a. Ord a => [a] -> Set a
Set.fromList (Field -> Text
fldName Field
fld forall a. a -> [a] -> [a]
: Field -> [Text]
fldAliases Field
fld)
    fNames :: Set Text
fNames = Field -> Set Text
allNames Field
w
    sameField :: Field -> Bool
sameField = Bool -> Bool
not forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Set a -> Bool
Set.null forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Ord a => Set a -> Set a -> Set a
Set.intersection Set Text
fNames forall b c a. (b -> c) -> (a -> b) -> a -> c
. Field -> Set Text
allNames
    byAliases :: Maybe (Int, Field)
byAliases = forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (Field -> Bool
sameField forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a, b) -> b
snd) [(Int, Field)]
rs
  in Maybe (Int, Field)
byName forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Maybe (Int, Field)
byAliases

findTypeV :: Schema -> Vector Schema -> Maybe (Int, Schema)
findTypeV :: Schema -> Vector Schema -> Maybe (Int, Schema)
findTypeV Schema
schema Vector Schema
schemas =
  let tn :: Text
tn = Schema -> Text
typeName Schema
schema
      allNames :: Schema -> [Text]
allNames Schema
typ =
        Schema -> Text
typeName Schema
typ forall a. a -> [a] -> [a]
: forall a b. (a -> b) -> [a] -> [b]
map TypeName -> Text
renderFullname (Schema -> [TypeName]
typeAliases Schema
typ)
  in ((,) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. a -> a
id forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> forall a. Vector a -> Int -> a
V.unsafeIndex Vector Schema
schemas) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
        forall a. (a -> Bool) -> Vector a -> Maybe Int
V.findIndex ((Text
tn forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem`) forall b c a. (b -> c) -> (a -> b) -> a -> c
. Schema -> [Text]
allNames) Vector Schema
schemas