-- | Internal helpers for packing columns into datasets and into structure
-- columns, together with the type classes that drive the packing.
module Spark.Core.Internal.FunctionsInternals(
DynColPackable,
StaticColPackable2,
NameTuple(..),
TupleEquivalence(..),
asCol,
pack1,
pack,
pack',
struct',
struct,
) where
import Control.Arrow
import qualified Data.Vector as V
import qualified Data.Text as T
import Data.List(sort, nub)
import Formatting
import Spark.Core.Internal.ColumnStructures
import Spark.Core.Internal.ColumnFunctions
import Spark.Core.Internal.DatasetFunctions
import Spark.Core.Internal.DatasetStructures
import Spark.Core.Internal.Utilities
import Spark.Core.Internal.TypesGenerics
import Spark.Core.Internal.TypesFunctions
import Spark.Core.Internal.TypesStructures
import Spark.Core.Internal.OpStructures
import Spark.Core.StructuresInternal
import Spark.Core.Try
-- | Values that can be turned into a 'DynColumn', i.e. a column whose
-- construction may fail.
class DynColPackable a where
  -- | Converts the value into a 'DynColumn'.
  _packAsColumn
    :: a
    -> DynColumn
-- | Values of type @a@ that can be turned into a typed column holding values
-- of type @b@. The functional dependency states that the input type @a@
-- determines the reference type @ref@.
class StaticColPackable2 ref a b | a -> ref where
  -- | Converts the value into a typed 'Column'.
  _staticPackAsColumn2
    :: a
    -> Column ref b
-- | A list of field names tagged with the (phantom) type @to@ they describe.
data NameTuple to
  = NameTuple [String]
-- | Relates a type @to@ to a tuple type @tup@ (determined by @to@) and
-- supplies the field names used when a structure of type @to@ is built from
-- a tuple of columns.
class TupleEquivalence to tup | to -> tup where
  -- | The names of the fields, in tuple order.
  tupleFieldNames
    :: NameTuple to
-- | Views a dataset as a column that refers to the dataset itself.
--
-- The column is built with an empty field path and with the dataset's node
-- type, cast through 'unsafeCastType'.
asCol :: (HasCallStack) => Dataset a -> Column a a
asCol ds = iEmptyCol ds (unsafeCastType (nodeType ds)) rootPath
  where
    -- An empty path: no field is selected.
    rootPath = FieldPath V.empty
-- | Turns a single column into a dataset.
--
-- The resulting node is a structured transform built from the column's
-- operation, typed with the column's type, and its only parent is the
-- (untyped) dataset the column originates from.
pack1 :: (HasCallStack) => Column ref a -> Dataset a
pack1 c = structuredNode `parents` [untyped (colOrigin c)]
  where
    structuredNode =
      emptyDataset (NodeStructuredTransform (colOp c)) (colType c)
-- | Packs any dynamically packable value into a 'DataFrame'.
--
-- The value is first converted to a 'DynColumn'; if that succeeds the
-- column is turned into a dataset with 'pack1'.
pack' :: (DynColPackable a) => a -> DataFrame
pack' = fmap pack1 . _packAsColumn
-- | Packs a statically packable value into a typed dataset.
--
-- The value is converted to a typed column, which is then turned into a
-- dataset with 'pack1'.
pack :: forall ref a b. (StaticColPackable2 ref a b, HasCallStack) => a -> Dataset b
pack z = pack1 typedCol
  where
    -- The signature pins down @ref@ and @b@ from the enclosing signature.
    typedCol :: ColumnData ref b
    typedCol = _staticPackAsColumn2 z
-- | Combines a list of dynamic columns into a single structure column.
--
-- Fails if any of the input columns is itself a failure, or if the
-- structure cannot be built from them. Each field is named after the field
-- name of the column it comes from.
struct' :: (HasCallStack) => [DynColumn] -> DynColumn
struct' cols = sequence cols >>= _buildStruct . fmap labelled
  where
    -- Pairs a column with its own field name.
    labelled col = (colFieldName col, col)
-- | Combines a statically packable value (for example a tuple of columns)
-- into a single typed column.
struct :: forall ref a b. (StaticColPackable2 ref a b, HasCallStack) => a -> Column ref b
struct packable = _staticPackAsColumn2 packable
-- | A list of packable values is packed as a structure of the packed
-- elements.
instance forall x. (DynColPackable x) => DynColPackable [x] where
  _packAsColumn xs = struct' (map _packAsColumn xs)
-- | A dynamic column is already in packed form and is returned unchanged.
instance DynColPackable DynColumn where
  _packAsColumn dynCol = dynCol
-- | A typed column is packed by dropping its type information and wrapping
-- the result as a successful dynamic column.
instance forall ref a. DynColPackable (Column ref a) where
  _packAsColumn col = pure (iUntypedColData col)
-- | A pair of packable values is packed as a structure with two fields.
instance forall z1 z2. (DynColPackable z1, DynColPackable z2) => DynColPackable (z1, z2) where
  _packAsColumn (c1, c2) = struct' [firstCol, secondCol]
    where
      firstCol = _packAsColumn c1
      secondCol = _packAsColumn c2
-- | A typed column is already in packed form and is returned unchanged.
instance forall ref a. StaticColPackable2 ref (Column ref a) a where
  _staticPackAsColumn2 col = col
-- | A pair is equivalent to itself; its fields are named @_1@ and @_2@.
instance forall a1 a2. TupleEquivalence (a1, a2) (a1, a2) where
  tupleFieldNames =
    NameTuple
      [ "_1"
      , "_2"
      ]
-- | Packs a pair of statically packable values into a structure column of
-- type @b@, where @b@ is declared equivalent to the pair @(a1, a2)@.
--
-- Each component is packed, stripped of its type information, and the
-- structure is then built with the field names associated with @b@.
instance forall ref b a1 a2 z1 z2.
  ( SQLTypeable b
  , TupleEquivalence b (a1, a2)
  , StaticColPackable2 ref z1 a1
  , StaticColPackable2 ref z2 a2
  ) => StaticColPackable2 ref (z1, z2) b where
  _staticPackAsColumn2 (c1, c2) = _unsafeBuildStruct untypedCols fieldNames
    where
      fieldNames = tupleFieldNames :: NameTuple b
      -- The annotations select the element type of each component.
      untypedCols =
        [ iUntypedColData (_staticPackAsColumn2 c1 :: Column ref a1)
        , iUntypedColData (_staticPackAsColumn2 c2 :: Column ref a2)
        ]
-- | Packs a triple of statically packable values into a structure column of
-- type @b@, where @b@ is declared equivalent to the triple @(a1, a2, a3)@.
--
-- Each component is packed, stripped of its type information, and the
-- structure is then built with the field names associated with @b@.
instance forall ref b a1 a2 a3 z1 z2 z3.
  ( SQLTypeable b
  , TupleEquivalence b (a1, a2, a3)
  , StaticColPackable2 ref z1 a1
  , StaticColPackable2 ref z2 a2
  , StaticColPackable2 ref z3 a3
  ) => StaticColPackable2 ref (z1, z2, z3) b where
  _staticPackAsColumn2 (c1, c2, c3) = _unsafeBuildStruct untypedCols fieldNames
    where
      fieldNames = tupleFieldNames :: NameTuple b
      -- The annotations select the element type of each component.
      untypedCols =
        [ iUntypedColData (_staticPackAsColumn2 c1 :: Column ref a1)
        , iUntypedColData (_staticPackAsColumn2 c2 :: Column ref a2)
        , iUntypedColData (_staticPackAsColumn2 c3 :: Column ref a3)
        ]
-- | Builds a typed structure column from untyped columns and a matching
-- list of field names.
--
-- This is unsafe: it calls 'failure' when the number of columns and the
-- number of names differ, and it forces the result of '_buildStruct' with
-- 'forceRight'. Nothing here checks that the columns match the type @x@.
_unsafeBuildStruct :: (HasCallStack, SQLTypeable x) =>
  [UntypedColumnData] -> NameTuple x -> Column ref x
_unsafeBuildStruct cols (NameTuple names)
  | length cols /= length names =
      failure $ sformat ("The number of columns and names differs:"%sh%" and "%sh) cols names
  | otherwise =
      -- This record update leaves the value unchanged, but it is what lets
      -- the result have a type other than 'UntypedColumnData'.
      -- NOTE(review): presumably relies on @ref@ and @x@ being phantom
      -- parameters of the column record -- confirm against its definition.
      untypedStruct { _cOp = _cOp untypedStruct }
  where
    fieldNames = unsafeFieldName . T.pack <$> names
    untypedStruct = forceRight (_buildStruct (zip fieldNames cols))
-- | Assembles named untyped columns into a single untyped structure column.
--
-- Fails when the list is empty, when a field name is repeated, or when
-- '_columnOrigin' does not report exactly one origin for the columns.
-- On success, the result takes its origin from the first column, and its
-- referring path is the text @struct(name1,name2,...)@.
_buildStruct :: [(FieldName, UntypedColumnData)] -> Try UntypedColumnData
_buildStruct [] = tryError "You cannot build an empty structure"
_buildStruct namedCols@((_, firstCol) : _) =
  case _columnOrigin (snd <$> namedCols) of
    [_] | uniqueNames ->
      pure structCol
    origins | uniqueNames ->
      tryError $ sformat ("Too many distinct origins: "%sh) origins
    _ ->
      tryError $ sformat ("Duplicate field names when building the struct: "%sh) (sort fieldNames)
  where
    fieldNames = fst <$> namedCols
    -- Names are unique when removing duplicates does not shorten the list.
    uniqueNames = length fieldNames == length (nub fieldNames)
    -- One transform per field, built from each column's operation.
    structOp = ColStruct $ V.fromList
      [TransformField fname (colOp col) | (fname, col) <- namedCols]
    -- One struct field per column, built from each column's type.
    structType = StructType $ V.fromList
      [StructField fname (unSQLType (colType col)) | (fname, col) <- namedCols]
    structName =
      "struct(" <> T.intercalate "," (unFieldName <$> fieldNames) <> ")"
    structCol = ColumnData
      { _cOrigin = _cOrigin firstCol
      , _cType = StrictType $ Struct structType
      , _cOp = structOp
      , _cReferingPath = Just $ unsafeFieldName structName
      }
-- | Returns one origin dataset per distinct origin among the given columns.
--
-- Columns are grouped by the node id of their origin, and the origin of the
-- first column of each group is returned.
_columnOrigin :: [UntypedColumnData] -> [UntypedDataset]
_columnOrigin cols =
  -- Matching on the first element replaces the partial 'head'. A group with
  -- no columns would be skipped instead of raising an exception.
  -- NOTE(review): groups are presumably never empty -- confirm against
  -- myGroupBy'.
  [colOrigin representative | (_, representative : _) <- byOrigin]
  where
    byOrigin = myGroupBy' (nodeId . colOrigin) cols