module Spark.Core.Internal.ColumnFunctions(
colType,
colOrigin,
colOp,
colFieldName,
iUntypedColData,
iEmptyCol,
unsafeStaticProjection,
dynamicProjection,
untypedCol,
colFromObs,
colFromObs',
castCol,
castCol',
colRef
) where
import qualified Data.Text as T
import qualified Data.Text.Format as TF
import qualified Data.Vector as V
import Data.String(IsString(fromString))
import Data.Text.Lazy(toStrict)
import Data.Maybe(fromMaybe)
import Data.List(find)
import Formatting
import Spark.Core.Internal.ColumnStructures
import Spark.Core.Internal.DatasetFunctions
import Spark.Core.Internal.DatasetStructures
import Spark.Core.Internal.TypesStructures
import Spark.Core.Try
import Spark.Core.StructuresInternal
import Spark.Core.Internal.TypesFunctions
import Spark.Core.Internal.OpStructures
import Spark.Core.Internal.OpFunctions(prettyShowColOp)
import Spark.Core.Internal.AlgebraStructures
import Spark.Core.Internal.Utilities
unsafeStaticProjection :: forall from to. (HasCallStack) =>
SQLType from
-> String
-> StaticColProjection from to
unsafeStaticProjection sqlt field =
let
f = forceRight . fieldPath . T.pack $ field
sqlt' = fromMaybe
(failure $ sformat ("unsafeStaticProjection: Cannot find the field "%sh%" in type "%sh) field sqlt)
(_extractPath sqlt f)
in StaticColProjection (sqlt, f, sqlt')
dynamicProjection :: String -> DynamicColProjection
dynamicProjection txt = case fieldPath (T.pack txt) of
Left msg -> DynamicColProjection $ \_ ->
tryError $ sformat ("dynamicProjection: invalid syntax for path "%shown%": "%shown) txt msg
Right fpath -> _dynamicProjection fpath
colType :: Column ref a -> SQLType a
colType = SQLType . _cType
untypedCol :: Column ref a -> DynColumn
untypedCol = pure . _unsafeCastColData . _dropReference
castCol :: ColumnReference ref -> SQLType a -> DynColumn -> Try (Column ref a)
castCol r sqlt dc =
dc >>= _checkedCastColData sqlt >>= _checkedCastRefColData r
castCol' :: SQLType a -> DynColumn -> Try (Column UnknownReference a)
castCol' = castCol ColumnReference
colOrigin :: Column ref a -> UntypedDataset
colOrigin = _cOrigin
colOp :: Column ref a -> ColOp
colOp = _cOp
colRef :: Column ref a -> ColumnReference ref
colRef _ = ColumnReference
colFromObs :: (HasCallStack) => LocalData a -> Column (LocalData a) a
colFromObs = missing "colFromObs"
colFromObs' :: (HasCallStack) => LocalFrame -> DynColumn
colFromObs' = missing "colFromObs'"
colFieldName :: ColumnData ref a -> FieldName
colFieldName c =
fromMaybe (unsafeFieldName . prettyShowColOp . _cOp $ c)
(_cReferingPath c)
instance forall a to. Projection (Dataset a) (StaticColProjection a to) (Column a to) where
_performProjection = _project
instance forall a. Projection (Dataset a) DynamicColProjection DynColumn where
_performProjection = _projectDyn
instance forall a . Projection (Dataset a) String DynColumn where
_performProjection ds s = _projectDyn ds (_strToDynProj s)
instance Projection DataFrame DynamicColProjection DynColumn where
_performProjection = _projectDF
instance forall a to. Projection DataFrame (StaticColProjection a to) DynColumn where
_performProjection df proj = _projectDF df (_colStaticProjToDynProj proj)
instance Projection DataFrame String DynColumn where
_performProjection df s = _projectDF df (_strToDynProj s)
instance forall ref a to. Projection (Column ref a) (StaticColProjection a to) (Column ref to) where
_performProjection = _projectCol
instance Projection DynColumn DynamicColProjection DynColumn where
_performProjection = _projectDynCol
instance forall a to. Projection DynColumn (StaticColProjection a to) DynColumn where
_performProjection dc proj = _projectDynCol dc (_colStaticProjToDynProj proj)
class StringStuff a where
stuffAsString :: a -> String
instance StringStuff String where
stuffAsString = undefined
instance Projection DynColumn String DynColumn where
_performProjection dc s = _performProjection dc (_strToDynProj s)
_strToDynProj :: String -> DynamicColProjection
_strToDynProj s =
let
fun dt =
case fieldPath (T.pack s) of
Right fp -> _dynProjTry (_dynamicProjection fp) dt
Left msg -> tryError (T.pack msg)
in DynamicColProjection fun
_colStaticProjToDynProj :: forall from to. StaticColProjection from to -> DynamicColProjection
_colStaticProjToDynProj (StaticColProjection (SQLType dtFrom, fp, SQLType dtTo)) =
DynamicColProjection $ \dt ->
if dt /= dtFrom then
tryError $ sformat ("Cannot convert type "%shown%" into type "%shown) dt dtFrom
else pure (fp, dtTo)
iUntypedColData :: Column ref a -> UntypedColumnData
iUntypedColData = _unsafeCastColData . _dropReference
_unsafeCastColData :: Column ref a -> Column ref b
_unsafeCastColData c = c { _cType = _cType c }
_checkedCastColData :: SQLType b -> ColumnData ref a -> Try (ColumnData ref b)
_checkedCastColData sqlt cd =
if (unSQLType sqlt) == (unSQLType (colType cd))
then pure (_unsafeCastColData cd)
else tryError $ sformat ("Cannot cast column "%sh%" to type "%sh) cd sqlt
_checkedCastRefColData :: ColumnReference ref2 -> ColumnData ref a -> Try (ColumnData ref2 a)
_checkedCastRefColData _ cd =
pure $ cd { _cType = _cType cd }
_dynamicProjection :: FieldPath -> DynamicColProjection
_dynamicProjection fpath =
let
fun dt = case _extractPath (SQLType dt) fpath of
Just (SQLType dt') -> pure (fpath, dt')
Nothing ->
tryError $ sformat ("unsafeStaticProjection: Cannot find the field "%shown%" in type "%shown) fpath dt
in DynamicColProjection fun
_projectDyn :: Dataset from -> DynamicColProjection -> DynColumn
_projectDyn ds proj = do
(p, dt) <- _dynProjTry proj (unSQLType . nodeType $ ds)
_emptyDynCol ds dt p
_projectDF :: DataFrame -> DynamicColProjection -> DynColumn
_projectDF df proj = do
node <- df
_projectDyn node proj
_project :: Dataset from -> StaticColProjection from to -> Column from to
_project ds proj = let (_, p, sqlt) = _staticProj proj in
iEmptyCol ds sqlt p
_projectCol :: Column ref from -> StaticColProjection from to -> Column ref to
_projectCol c (StaticColProjection (_, fp, SQLType dt)) =
_projectColData0 c fp dt
_projectColData0 :: ColumnData ref a -> FieldPath -> DataType -> ColumnData ref b
_projectColData0 cd (FieldPath p) dtTo =
case colOp cd of
ColExtraction (FieldPath p') ->
cd { _cOp = ColExtraction (FieldPath (p V.++ p')),
_cType = dtTo}
_ ->
cd { _cOp = ColExtraction (FieldPath p),
_cType = dtTo}
_projectDynColData :: ColumnData ref a -> DynamicColProjection -> DynColumn
_projectDynColData cd proj =
_dynProjTry proj (_cType cd) <&> uncurry (_projectColData0 . _dropReference $ cd)
_projectDynCol :: DynColumn -> DynamicColProjection -> DynColumn
_projectDynCol c proj = do
cd <- c
_projectDynColData cd proj
_extractPath :: SQLType from -> FieldPath -> Maybe (SQLType to)
_extractPath sqlt (FieldPath v) = _extractPath0 sqlt (V.toList v)
_extractPath0 :: SQLType from -> [FieldName] -> Maybe (SQLType to)
_extractPath0 sqlt [] = Just (unsafeCastType sqlt)
_extractPath0 sqlt (field : l) = do
inner <- _extractField sqlt field
_extractPath0 inner l
_extractField :: SQLType from -> FieldName -> Maybe (SQLType to)
_extractField (SQLType (StrictType (Struct (StructType fields)))) f =
let z = find (\x -> structFieldName x == f) fields in
SQLType . structFieldType <$> z
_extractField (SQLType (NullableType (Struct (StructType fields)))) f =
let z = find (\x -> structFieldName x == f) fields in
SQLType . structFieldType <$> z
_extractField _ _ = Nothing
_dropReference :: ColumnData ref a -> ColumnData UnknownReference a
_dropReference c = c {_cOp = _cOp c}
iEmptyCol :: Dataset a -> SQLType b -> FieldPath -> Column a b
iEmptyCol = _emptyColData
_emptyDynCol :: Dataset a -> DataType -> FieldPath -> DynColumn
_emptyDynCol ds dt fp = Right $ _dropReference $ _emptyColData ds (SQLType dt) fp
_emptyColData :: Dataset a -> SQLType b -> FieldPath -> ColumnData a b
_emptyColData ds sqlt path = ColumnData {
_cOrigin = untypedDataset ds,
_cType = unSQLType sqlt,
_cOp = ColExtraction path,
_cReferingPath = Nothing
}
_homoColOp2 :: T.Text -> Column ref x -> Column ref x -> Column ref x
_homoColOp2 opName c1 c2 =
let co = ColFunction opName (V.fromList (colOp <$> [c1, c2]))
in ColumnData {
_cOrigin = _cOrigin c1,
_cType = _cType c1,
_cOp = co,
_cReferingPath = Nothing }
_homoColOp2' :: T.Text -> DynColumn -> DynColumn -> DynColumn
_homoColOp2' opName c1' c2' = do
c1 <- c1'
c2 <- c2'
return $ _homoColOp2 opName c1 c2
instance forall ref a. Show (Column ref a) where
show c =
let
name = case _cReferingPath c of
Just fn -> show' fn
Nothing -> prettyShowColOp . colOp $ c
txt = fromString "{}{{}}->{}" :: TF.Format
fields = T.pack . show . colType $ c
nn = unNodeName . nodeName . _cOrigin $ c
in T.unpack $ toStrict $ TF.format txt (name, fields, nn)
instance forall ref a. CanRename (ColumnData ref a) FieldName where
c @@ fn = c { _cReferingPath = Just fn }
instance forall ref a. CanRename (Column ref a) String where
c @@ str = case fieldName (T.pack str) of
Right fp -> c @@ fp
Left msg ->
failure $ sformat ("Could not make a field path out of string "%shown%" for column "%shown%":"%shown) str c msg
instance CanRename DynColumn FieldName where
(Right cd) @@ fn = Right (cd @@ fn)
x @@ _ = x
instance CanRename DynColumn String where
(Right cd) @@ str = case fieldName (T.pack str) of
Right fp -> Right $ cd @@ fp
Left msg ->
tryError $ sformat ("Could not make a field path out of string "%shown%" for column "%shown%":"%shown) str cd msg
x @@ _ = x
instance forall a. HomoBinaryOp2 a a a where
_liftFun f = BinaryOpFun id id f
instance forall ref a. HomoBinaryOp2 (Column ref a) DynColumn DynColumn where
_liftFun f = BinaryOpFun untypedCol id f
instance forall ref a. HomoBinaryOp2 DynColumn (Column ref a) DynColumn where
_liftFun f = BinaryOpFun id untypedCol f
instance (Num x) => Num (Column ref x) where
(+) = _homoColOp2 "sum"
(*) _ _ = missing "Num (Column x): *"
abs _ = missing "Num (Column x): abs"
signum _ = missing "Num (Column x): signum"
fromInteger _ = missing "Num (Column x): fromInteger"
negate _ = missing "Num (Column x): negate"
instance Num DynColumn where
(+) = _homoColOp2' "sum"
(*) _ _ = missing "Num (DynColumn x): *"
abs _ = missing "Num (DynColumn x): abs"
signum _ = missing "Num (DynColumn x): signum"
fromInteger _ = missing "Num (DynColumn x): fromInteger"
negate _ = missing "Num (DynColumn x): negate"