{-# LANGUAGE FlexibleContexts, MultiParamTypeClasses, OverloadedStrings,
ScopedTypeVariables, LambdaCase #-}
module Streamly.Csv
(
decode
, decodeWith
, decodeWithErrors
, CsvParseException (..)
, chunkStream
, decodeByName
, decodeByNameWith
, decodeByNameWithErrors
, encode
, encodeDefault
, encodeWith
, encodeByName
, encodeByNameDefault
, encodeByNameWith
, FromRecord (..)
, FromNamedRecord (..)
, ToRecord (..)
, ToNamedRecord (..)
, DefaultOrdered (..)
, HasHeader (..)
, Header
, header
, Name
, DecodeOptions(..)
, defaultDecodeOptions
, EncodeOptions(..)
, defaultEncodeOptions
) where
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Streamly
import qualified Streamly.Prelude as S
import Data.Csv (DecodeOptions(..), DefaultOrdered(..),
EncodeOptions(..), FromNamedRecord(..),
FromRecord(..), Header, Name,
ToNamedRecord(..), ToRecord(..),
defaultDecodeOptions,
defaultEncodeOptions, encIncludeHeader,
header)
import Data.Csv.Incremental (HasHeader(..), HeaderParser(..),
Parser(..))
import qualified Data.Csv.Incremental as CI
import System.IO (Handle)
import Control.Exception (Exception(..))
import Control.Monad.Catch (MonadThrow(..))
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Word (Word8)
import Data.Bifunctor (first)
import Data.Maybe (fromMaybe)
import Data.String (IsString(..))
import Data.Typeable (Typeable)
-- | Decode a chunked CSV byte stream into a stream of records, using
-- 'defaultDecodeOptions'.
--
-- This is 'decodeWith' with the default options filled in, so decoding
-- failures are raised in the stream's monad instead of being returned.
decode :: (IsStream t, MonadAsync m, FromRecord a)
       => HasHeader           -- ^ header indicator, passed through to 'decodeWith'
       -> t m BS.ByteString   -- ^ raw CSV input, split into chunks
       -> t m a
decode hasHeader input = decodeWith defaultDecodeOptions hasHeader input
-- | Decode a chunked CSV byte stream into a stream of records with explicit
-- 'DecodeOptions'.
--
-- Runs 'decodeWithErrors' and then unwraps every element with 'getValues',
-- so the first @Left@ produced by the decoder is thrown in @m@.
decodeWith :: (IsStream t, MonadAsync m, FromRecord a)
           => DecodeOptions
           -> HasHeader
           -> t m BS.ByteString
           -> t m a
decodeWith opts hdr = getValues . decodeWithErrors opts hdr
-- | Decode a chunked CSV byte stream, keeping per-record outcomes as
-- 'Either' values instead of throwing on them.
--
-- Builds cassava's incremental parser with 'CI.decodeWith' and drives it over
-- the input with 'runParser'.
decodeWithErrors :: (IsStream t, Monad m, FromRecord a, MonadThrow m)
                 => DecodeOptions
                 -> HasHeader
                 -> t m BS.ByteString
                 -> t m (Either CsvParseException a)
decodeWithErrors opts hdr = runParser (CI.decodeWith opts hdr)
-- | Drive a cassava incremental 'Parser' over a stream of input chunks.
--
-- Each chunk is handed to the parser's current continuation; the records the
-- parser reports after that chunk are emitted downstream, with @Left@
-- messages wrapped in 'CsvParseException'.  If the parser enters its 'Fail'
-- state the error is thrown with 'throwM'.
--
-- The incremental parser interprets an empty chunk as \"end of input\", so:
--
-- * empty chunks coming from the caller are dropped (they would otherwise
--   terminate parsing prematurely), and
-- * exactly one empty chunk is appended after the caller's stream so the
--   parser can flush a final record that lacks a trailing newline and report
--   any failure it was still holding back.
runParser :: forall t a m. (IsStream t, Monad m, MonadThrow m)
          => Parser a -> t m BS.ByteString -> t m (Either CsvParseException a)
runParser parser chunked =
    S.concatMap fst $ S.scanlM' step (S.nil, const parser) feed
  where
    -- The leading empty chunk only serves to unwrap the initial parser
    -- through @const parser@; the trailing one signals end of input.
    feed :: t m BS.ByteString
    feed = S.cons BS.empty (S.filter (not . BS.null) chunked)
             `serial` S.yield BS.empty

    -- State: (records produced by the latest chunk, continuation to feed next).
    step :: (t m (Either CsvParseException a), BS.ByteString -> Parser a)
         -> BS.ByteString
         -> m (t m (Either CsvParseException a), BS.ByteString -> Parser a)
    step (_, k) chunk =
      case k chunk of
        Fail _ err -> throwM (CsvParseException err)
        Many es k' -> return (withEach es, k')
        -- 'Done' can only follow the terminating empty chunk, so the
        -- continuation kept here is never invoked again.
        Done es    -> return (withEach es, k)

    withEach :: [Either String a] -> t m (Either CsvParseException a)
    withEach = S.fromList . map (first CsvParseException)
-- | Stream the contents of a 'Handle' as strict 'BS.ByteString' chunks.
--
-- Every element is the result of one @'BS.hGetSome' h chunkSize@ call.  The
-- stream ends at the first empty read, and that empty chunk is not emitted.
--
-- The end-of-input check is applied once around the whole producer; wrapping
-- it inside the recursion would stack one extra 'S.takeWhile' layer per chunk
-- read.
chunkStream :: (IsStream t, MonadAsync m) => Handle -> Int -> t m BS.ByteString
chunkStream h chunkSize = S.takeWhile (not . BS.null) chunks
  where
    -- Unbounded sequence of reads; the enclosing 'S.takeWhile' cuts it off.
    chunks = liftIO (BS.hGetSome h chunkSize) `S.consM` chunks
-- | Decode a chunked CSV byte stream into named records, using
-- 'defaultDecodeOptions'.
--
-- This is 'decodeByNameWith' with the default options filled in; decoding
-- failures are raised in @m@.
decodeByName :: (MonadAsync m, FromNamedRecord a)
             => SerialT m BS.ByteString
             -> SerialT m a
decodeByName input = decodeByNameWith defaultDecodeOptions input
-- | Decode a chunked CSV byte stream into named records with explicit
-- 'DecodeOptions'.
--
-- Runs 'decodeByNameWithErrors' and unwraps every element with 'getValues',
-- so the first @Left@ produced by the decoder is thrown in @m@.
decodeByNameWith :: (MonadAsync m, FromNamedRecord a)
                 => DecodeOptions
                 -> SerialT m BS.ByteString
                 -> SerialT m a
decodeByNameWith opts = getValues . decodeByNameWithErrors opts
-- | Decode a chunked CSV byte stream into named records, keeping per-record
-- outcomes as 'Either' values.
--
-- Works in two phases: chunks are first fed to cassava's header parser until
-- the header is complete, then the resulting record parser is run over the
-- remaining chunks with 'runParser'.
--
-- Throws a 'CsvParseException' (via 'throwM') when the header parser fails or
-- when the input stream ends before the header has been fully read.
decodeByNameWithErrors :: forall m a. (Monad m, MonadThrow m, FromNamedRecord a)
                       => DecodeOptions
                       -> SerialT m BS.ByteString
                       -> SerialT m (Either CsvParseException a)
decodeByNameWithErrors opts chunked = do
  (recordParser, rest) <-
    S.yieldM $ awaitHeader (CI.decodeByNameWith opts) chunked
  runParser recordParser rest
  where
    -- Feed chunks to the header parser until it finishes, returning the
    -- record parser together with the chunks that were not consumed.
    awaitHeader :: HeaderParser (Parser a)
                -> SerialT m BS.ByteString
                -> m (Parser a, SerialT m BS.ByteString)
    awaitHeader (FailH _ err)         _      = throwM (CsvParseException err)
    awaitHeader (DoneH _ records)     chunks = return (records, chunks)
    awaitHeader hp@(PartialH feedMore) chunks = S.uncons chunks >>= \case
      Nothing -> throwM $ CsvParseException "Unexpected end of input stream"
      Just (chunk, rest)
        -- An empty chunk would be read by the parser as end of input, so it
        -- is skipped rather than forwarded.
        | BS.null chunk -> awaitHeader hp rest
        | otherwise     -> awaitHeader (feedMore chunk) rest
-- | Encode a stream of records as CSV byte chunks, using
-- 'defaultEncodeOptions'.
--
-- This is 'encodeWith' with the default options filled in; the optional
-- 'Header' is passed through unchanged.
encode :: (IsStream t, ToRecord a, Monad m)
       => Maybe Header
       -> t m a
       -> t m BS.ByteString
encode mhdr rows = encodeWith defaultEncodeOptions mhdr rows
-- | Encode a stream of records as CSV byte chunks, preceded by the header
-- given by the record type's 'DefaultOrdered' instance.
--
-- 'headerOrder' is applied to @undefined :: a@, which acts purely as a type
-- witness.
-- NOTE(review): this relies on the instance not forcing its argument.
encodeDefault :: forall a t m. (IsStream t, ToRecord a, DefaultOrdered a, Monad m)
              => t m a
              -> t m BS.ByteString
encodeDefault = encode (Just defaultHeader)
  where
    defaultHeader = headerOrder (undefined :: a)
-- | Encode a stream of records as CSV byte chunks with explicit
-- 'EncodeOptions'.
--
-- Every record is rendered on its own and the resulting strict chunks are
-- flattened into the output stream.  When a 'Header' is supplied it is
-- rendered the same way as a record and placed ahead of all rows.
encodeWith :: (IsStream t, ToRecord a, Monad m)
           => EncodeOptions
           -> Maybe Header
           -> t m a
           -> t m BS.ByteString
encodeWith opts mhdr rows =
    S.concatMap S.fromList (prependHeader (S.map render rows))
  where
    -- Render one record (or the header) to the strict chunks of its encoding.
    render :: (ToRecord v) => v -> [BS.ByteString]
    render = BSL.toChunks . CI.encodeWith opts . CI.encodeRecord

    prependHeader = case mhdr of
      Nothing  -> id
      Just hdr -> S.cons (render hdr)
-- | Encode a stream of named records as CSV byte chunks, taking the column
-- order from the record type's 'DefaultOrdered' instance.
--
-- 'headerOrder' is applied to @undefined :: a@, which acts purely as a type
-- witness.
-- NOTE(review): this relies on the instance not forcing its argument.
encodeByNameDefault :: forall a t m. (IsStream t, DefaultOrdered a, ToNamedRecord a, Monad m)
                    => t m a
                    -> t m BS.ByteString
encodeByNameDefault = encodeByName defaultHeader
  where
    defaultHeader = headerOrder (undefined :: a)
-- | Encode a stream of named records as CSV byte chunks, using
-- 'defaultEncodeOptions' and the given column order.
--
-- This is 'encodeByNameWith' with the default options filled in.
encodeByName :: (IsStream t, ToNamedRecord a, Monad m)
             => Header
             -> t m a
             -> t m BS.ByteString
encodeByName hdr rows = encodeByNameWith defaultEncodeOptions hdr rows
-- | Encode a stream of named records as CSV byte chunks with explicit
-- 'EncodeOptions' and column order.
--
-- Rows are rendered one at a time with a copy of the options in which
-- 'encIncludeHeader' is switched off.  When the caller's options have
-- 'encIncludeHeader' set, the header itself is rendered once as a plain
-- record and placed ahead of all rows; otherwise no header is emitted.
encodeByNameWith :: (IsStream t, ToNamedRecord a, Monad m)
                 => EncodeOptions
                 -> Header
                 -> t m a
                 -> t m BS.ByteString
encodeByNameWith opts hdr rows =
    S.concatMap S.fromList (prependHeader (S.map renderRow rows))
  where
    -- Options used for every individual render; presumably the header flag is
    -- cleared so the per-row encoder does not repeat the header.
    rowOpts = opts { encIncludeHeader = False }

    renderRow =
      BSL.toChunks . CI.encodeByNameWith rowOpts hdr . CI.encodeNamedRecord

    headerChunks = BSL.toChunks (CI.encodeWith rowOpts (CI.encodeRecord hdr))

    prependHeader =
      if encIncludeHeader opts
        then S.cons headerChunks
        else id
-- | Unwrap a stream of 'Either' values: every @Right@ payload is passed
-- downstream, while a @Left@ is thrown in @m@ with 'throwM'.
getValues :: (IsStream t, MonadAsync m, Exception e)
          => t m (Either e a)
          -> t m a
getValues = S.mapM $ \case
  Left err    -> throwM err
  Right value -> return value
-- | Error produced while parsing or decoding CSV input; wraps the textual
-- message describing the problem.
newtype CsvParseException = CsvParseException String
  deriving (Show, Eq, Typeable)

-- | A string literal can be used directly as a 'CsvParseException'.
instance IsString CsvParseException where
  fromString msg = CsvParseException msg

-- | The displayed form prefixes the wrapped message with a fixed label.
instance Exception CsvParseException where
  displayException (CsvParseException msg) =
    concat ["Error parsing csv: ", msg]