{-# options_haddock prune #-}
-- |Description: Streaming Implementation, Internal
module Polysemy.Http.Http where

import qualified Data.ByteString as ByteString
import Polysemy.Resource (Resource, bracket)

import Polysemy.Http.Data.HttpError (HttpError)
import Polysemy.Http.Data.Request (Request)
import Polysemy.Http.Data.Response (Response (Response))
import Polysemy.Http.Data.StreamChunk (StreamChunk (StreamChunk))
import qualified Polysemy.Http.Data.StreamEvent as StreamEvent
import Polysemy.Http.Data.StreamEvent (StreamEvent)
import qualified Polysemy.Http.Effect.Http as Http
import Polysemy.Http.Effect.Http (Http)

-- Pull chunks from the body of an already-acquired response until an empty
-- chunk is read, reporting each stage to the @process@ callback.
--
-- Arguments: the event callback, the response whose body is consumed, and the
-- handle that is passed along with every 'StreamEvent.Chunk' and with the
-- final 'StreamEvent.Result'.
--
-- A 'Left' returned by 'Http.consumeChunk' is rethrown through the
-- 'Error' 'HttpError' effect by 'fromEither'.
streamLoop ::
  Members [Http c, Error HttpError] r =>
  (forall x . StreamEvent o c h x -> Sem r x) ->
  Response c ->
  h ->
  Sem r o
streamLoop process response@(Response _ body _ _) handle =
  spin
  where
    -- One iteration: read the next chunk, then dispatch on it.
    spin =
      handleChunk =<< fromEither =<< Http.consumeChunk body
    -- An empty chunk ends the loop; the callback's answer to
    -- 'StreamEvent.Result' becomes the value of the whole loop.
    handleChunk (ByteString.null -> True) =
      process (StreamEvent.Result response handle)
    -- The bang forces the chunk before it is handed to the callback; then
    -- the loop continues.
    handleChunk !chunk = do
      process (StreamEvent.Chunk handle (StreamChunk chunk))
      spin

-- Run the streaming loop for a response inside a 'bracket', so that the
-- handle obtained from the callback's 'StreamEvent.Acquire' answer is always
-- passed back to it in a 'StreamEvent.Release' event.
--
-- Arguments: the event callback and the response to stream. The result is
-- whatever 'streamLoop' returns.
streamHandler ::
  forall o r c h .
  Members [Http c, Error HttpError, Resource] r =>
  (forall x . StreamEvent o c h x -> Sem r x) ->
  Response c ->
  Sem r o
streamHandler process response =
  bracket acquire release (streamLoop process response)
  where
    -- Ask the callback to create the handle for this response.
    acquire =
      process (StreamEvent.Acquire response)
    -- Give the handle back to the callback for cleanup.
    release handle =
      process (StreamEvent.Release handle)

-- |Initiate a request and stream the response, calling @process@ after connecting, for every chunk, after closing the
-- connection, and for the return value.
-- 'StreamEvent' is used to indicate the stage of the request cycle.
--
-- @
-- handle ::
--   StreamEvent Double (IO ByteString) Int a ->
--   Sem r a
-- handle = \\case
--   StreamEvent.Acquire (Response status body headers cookies) ->
--     pure 1
--   StreamEvent.Chunk handle (StreamChunk c) ->
--     pure ()
--   StreamEvent.Result (Response status body headers cookies) handle ->
--     pure 5.5
--   StreamEvent.Release handle ->
--     pure ()
-- @
-- >>> runInterpreters $ streamResponse (Http.get "host.com" "path/to/file") handle
-- 5.5
streamResponse ::
  Members [Http c, Error HttpError, Resource] r =>
  -- | The request to initiate.
  Request ->
  -- | Callback invoked for each 'StreamEvent' of the request cycle.
  (forall x . StreamEvent o c h x -> Sem r x) ->
  -- | The value the callback produced for 'StreamEvent.Result'.
  Sem r o
streamResponse request process =
  -- 'Http.stream' yields one layer of 'Either' and the local 'runError'
  -- another; 'join' merges them and 'fromEither' rethrows a 'Left' in the
  -- caller's 'Error' 'HttpError' effect.
  fromEither . join =<< Http.stream request handler
  where
    -- The callback lives in @r@, so it is 'raise'd under the local 'Error'
    -- effect that 'runError' interprets for 'streamHandler'.
    handler =
      runError . streamHandler (raise . process)