{-# LANGUAGE OverloadedStrings #-}
module Hasql.Streams (
CursorStreamFold,
cursorStreamQuery,
) where
import Hasql.Statement
import Hasql.CursorTransactionIO as CursorTransactionIO
type CursorStreamFold s a r =
(forall x.
(x -> CursorTransactionIO s (Maybe (a, x))) ->
CursorTransactionIO s x ->
r
)
cursorStreamQuery :: forall params s a r.
Statement params [a] -> params -> CursorStreamFold s a r -> r
cursorStreamQuery :: Statement params [a] -> params -> CursorStreamFold s a r -> r
cursorStreamQuery Statement params [a]
stmt params
params CursorStreamFold s a r
foldStream = (StreamState s a
-> CursorTransactionIO s (Maybe (a, StreamState s a)))
-> CursorTransactionIO s (StreamState s a) -> r
CursorStreamFold s a r
foldStream StreamState s a
-> CursorTransactionIO s (Maybe (a, StreamState s a))
step CursorTransactionIO s (StreamState s a)
init
where
init :: CursorTransactionIO s (StreamState s a)
init :: CursorTransactionIO s (StreamState s a)
init = do
Cursor s [a]
cursor <- params
-> Statement params [a] -> CursorTransactionIO s (Cursor s [a])
forall params result s.
params
-> Statement params result
-> CursorTransactionIO s (Cursor s result)
declareCursorFor params
params Statement params [a]
stmt
StreamState s a -> CursorTransactionIO s (StreamState s a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Cursor s [a]
cursor, [])
step :: StreamState s a -> CursorTransactionIO s (Maybe (a, StreamState s a))
step :: StreamState s a
-> CursorTransactionIO s (Maybe (a, StreamState s a))
step (Cursor s [a]
cursor, [a]
batch) = case [a]
batch of
(a
a:[a]
batch') -> Maybe (a, StreamState s a)
-> CursorTransactionIO s (Maybe (a, StreamState s a))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (a, StreamState s a)
-> CursorTransactionIO s (Maybe (a, StreamState s a)))
-> Maybe (a, StreamState s a)
-> CursorTransactionIO s (Maybe (a, StreamState s a))
forall a b. (a -> b) -> a -> b
$ (a, StreamState s a) -> Maybe (a, StreamState s a)
forall a. a -> Maybe a
Just (a
a, (Cursor s [a]
cursor, [a]
batch'))
[] -> do
[a]
batch' <- Cursor s [a] -> CursorTransactionIO s [a]
forall s a. Cursor s a -> CursorTransactionIO s a
fetchWithCursor Cursor s [a]
cursor
if [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
Prelude.null [a]
batch'
then Maybe (a, StreamState s a)
-> CursorTransactionIO s (Maybe (a, StreamState s a))
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (a, StreamState s a)
forall a. Maybe a
Nothing
else StreamState s a
-> CursorTransactionIO s (Maybe (a, StreamState s a))
step (Cursor s [a]
cursor, [a]
batch')
type StreamState s a = (Cursor s [a], [a])