-- |
-- Module      : Streamly.Internal.Data.Stream.StreamD.Exception
-- Copyright   : (c) 2020 Composewell Technologies and Contributors
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.StreamD.Exception
    (
      gbracket_
    , gbracket
    , before
    , after_
    , after
    , bracket_
    , bracket'
    , onException
    , finally_
    , finally
    , ghandle
    , handle
    , retry
    )
where

#include "inline.hs"

import Control.Exception (Exception, SomeException, mask_)
import Control.Monad.Catch (MonadCatch)
import Data.Map.Strict (Map)
import GHC.Exts (inline)
import Streamly.Internal.Control.Concurrent (MonadRunInIO, MonadAsync, withRunInIO)
import Streamly.Internal.Data.IOFinalizer
    (newIOFinalizer, runIOFinalizer, clearingIOFinalizer)

import qualified Control.Monad.Catch as MC
import qualified Data.Map.Strict as Map

import Streamly.Internal.Data.Stream.StreamD.Type

-- Internal stepper state for 'gbracket_'.
--
-- @s1@ is the stream being run under the try wrapper, @s2@ is the stream
-- produced by the exception handler and @v@ is the value returned by the
-- allocation action.
data GbracketState s1 s2 v
    -- Initial state: the allocation action has not been run yet.
    = GBracketInit
    -- Normal operation: the generated stream together with the allocated
    -- value, which is later handed to the cleanup or exception handler.
    | GBracketNormal s1 v
    -- An exception was reported: the remaining output comes from the
    -- stream returned by the exception handler.
    | GBracketException s2

-- | Like 'gbracket' but with the following differences:
--
-- * alloc action @m c@ runs with async exceptions enabled
-- * cleanup action @c -> m d@ won't run if the stream is garbage collected
--   after partial evaluation.
-- * does not require a 'MonadAsync' constraint.
--
-- The cleanup action runs only when the generated stream stops normally. If
-- the @try@ wrapper reports an exception, the rest of the output is the
-- stream returned by the exception handler; the cleanup action is not invoked
-- on that path, so any cleanup or rethrowing is up to the handler.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
--
{-# INLINE_NORMAL gbracket_ #-}
gbracket_
    :: Monad m
    => m c                                  -- ^ before
    -> (forall s. m s -> m (Either e s))    -- ^ try (exception handling)
    -> (c -> m d)                           -- ^ after, on normal stop
    -> (c -> e -> Stream m b -> Stream m b) -- ^ on exception
    -> (c -> Stream m b)                    -- ^ stream generator
    -> Stream m b
gbracket_ bef exc aft fexc fnormal =
    Stream step GBracketInit

    where

    {-# INLINE_LATE step #-}
    -- First step: run the allocation action and build the stream from its
    -- result. The result is also kept in the state for 'aft' and 'fexc'.
    step _ GBracketInit = do
        r <- bef
        return $ Skip $ GBracketNormal (fnormal r) r

    -- Normal operation: every step of the generated stream is run through
    -- the try wrapper 'exc'.
    step gst (GBracketNormal (UnStream step1 st) v) = do
        res <- exc $ step1 gst st
        case res of
            Right r -> case r of
                Yield x s ->
                    return $ Yield x (GBracketNormal (Stream step1 s) v)
                Skip s -> return $ Skip (GBracketNormal (Stream step1 s) v)
                Stop -> aft v >> return Stop
            -- XXX Do not handle async exceptions, just rethrow them.
            Left e ->
                -- The handler receives the stream at the state in which the
                -- failing step was attempted.
                return $ Skip (GBracketException (fexc v e (UnStream step1 st)))

    -- After an exception: run the handler's stream as is, without the try
    -- wrapper and without calling 'aft' when it stops.
    step gst (GBracketException (UnStream step1 st)) = do
        res <- step1 gst st
        case res of
            Yield x s -> return $ Yield x (GBracketException (Stream step1 s))
            Skip s    -> return $ Skip (GBracketException (Stream step1 s))
            Stop      -> return Stop

-- Internal stepper state for 'gbracket'.
--
-- Same shape as 'GbracketState', except that the normal state additionally
-- carries @wref@, the finalizer handle created during the initial step.
data GbracketIOState s1 s2 v wref
    -- Initial state: the allocation action has not been run yet.
    = GBracketIOInit
    -- Normal operation: the generated stream, the allocated value and the
    -- finalizer handle.
    | GBracketIONormal s1 v wref
    -- An exception was reported: the remaining output comes from the
    -- stream returned by the exception handler.
    | GBracketIOException s2

-- | Run the alloc action @m c@ with async exceptions disabled but keeping
-- blocking operations interruptible (see 'Control.Exception.mask').  Use the
-- output @c@ as input to @c -> Stream m b@ to generate an output stream. When
-- generating the stream use the supplied @try@ operation @forall s. m s -> m
-- (Either e s)@ to catch synchronous exceptions. If an exception occurs run
-- the exception handler @c -> e -> Stream m b -> m (Stream m b)@. Note that
-- 'gbracket' does not rethrow the exception, it has to be done by the
-- exception handler if desired.
--
-- There are two cleanup actions: @c -> m d1@ runs when the stream ends
-- normally, and @c -> m d2@ is registered as a finalizer for the case where
-- the stream gets garbage collected after a partial lazy evaluation. On a
-- normal stop or on an exception the finalizer is cleared while the
-- corresponding action (normal cleanup or exception handler) runs.  See
-- 'bracket' for the semantics of the cleanup action.
--
-- 'gbracket' can express all other exception handling combinators.
--
-- /Inhibits stream fusion/
--
-- /Pre-release/
{-# INLINE_NORMAL gbracket #-}
gbracket
    :: MonadRunInIO m
    => m c -- ^ before
    -> (forall s. m s -> m (Either e s)) -- ^ try (exception handling)
    -> (c -> m d1) -- ^ on normal stop
    -> (c -> m d2) -- ^ on GC without normal stop or exception
    -> (c -> e -> Stream m b -> m (Stream m b)) -- ^ on exception
    -> (c -> Stream m b) -- ^ stream generator
    -> Stream m b
gbracket bef exc aft gc fexc fnormal =
    Stream step GBracketIOInit

    where

    -- If the stream is never evaluated the "aft" action will never be
    -- called. For that to occur we will need the user of this API to pass a
    -- weak pointer to us.
    {-# INLINE_LATE step #-}
    step _ GBracketIOInit = do
        -- We mask asynchronous exceptions to make the execution
        -- of 'bef' and the registration of the 'gc' finalizer atomic.
        -- A similar thing is done in the resourcet package: https://git.io/JvKV3
        -- Tutorial: https://markkarpov.com/tutorial/exceptions.html
        (r, ref) <- withRunInIO $ \run -> mask_ $ run $ do
            r <- bef
            ref <- newIOFinalizer (gc r)
            return (r, ref)
        return $ Skip $ GBracketIONormal (fnormal r) r ref

    -- Normal operation: every step of the generated stream is run through
    -- the try wrapper 'exc'.
    step gst (GBracketIONormal (UnStream step1 st) v ref) = do
        res <- exc $ step1 gst st
        case res of
            Right r -> case r of
                Yield x s ->
                    return $ Yield x (GBracketIONormal (Stream step1 s) v ref)
                Skip s ->
                    return $ Skip (GBracketIONormal (Stream step1 s) v ref)
                Stop ->
                    clearingIOFinalizer ref (aft v) >> return Stop
            -- XXX Do not handle async exceptions, just rethrow them.
            Left e -> do
                -- Clearing of finalizer and running of exception handler must
                -- be atomic wrt async exceptions. Otherwise if we have cleared
                -- the finalizer and have not run the exception handler then we
                -- may leak the resource.
                stream <-
                    clearingIOFinalizer ref (fexc v e (UnStream step1 st))
                return $ Skip (GBracketIOException stream)

    -- After an exception: run the handler's stream as is, without the try
    -- wrapper; neither cleanup action is invoked from this state.
    step gst (GBracketIOException (UnStream step1 st)) = do
        res <- step1 gst st
        case res of
            Yield x s ->
                return $ Yield x (GBracketIOException (Stream step1 s))
            Skip s    -> return $ Skip (GBracketIOException (Stream step1 s))
            Stop      -> return Stop

-- | See 'Streamly.Internal.Data.Stream.IsStream.before'.
--
-- Runs the action once, on the first step of the resulting stream, and then
-- behaves like the input stream. The result of the action is discarded.
--
{-# INLINE_NORMAL before #-}
before :: Monad m => m b -> Stream m a -> Stream m a
before action (Stream step state) = Stream step' Nothing

    where

    {-# INLINE_LATE step' #-}
    -- 'Nothing' marks that the action has not been run yet.
    step' _ Nothing = action >> return (Skip (Just state))

    -- 'Just' wraps the state of the underlying stream once the action is done.
    step' gst (Just st) = do
        res <- step gst st
        case res of
            Yield x s -> return $ Yield x (Just s)
            Skip s    -> return $ Skip (Just s)
            Stop      -> return Stop

-- | See 'Streamly.Internal.Data.Stream.IsStream.after_'.
--
-- Runs the action when the input stream reports 'Stop'. The action is not
-- run if the stream is abandoned before reaching the end. The result of the
-- action is discarded.
--
{-# INLINE_NORMAL after_ #-}
after_ :: Monad m => m b -> Stream m a -> Stream m a
after_ action (Stream step state) = Stream step' state

    where

    {-# INLINE_LATE step' #-}
    -- Pass every step through unchanged; only 'Stop' triggers the action.
    step' gst st = do
        res <- step gst st
        case res of
            Yield x s -> return $ Yield x s
            Skip s    -> return $ Skip s
            Stop      -> action >> return Stop

-- | See 'Streamly.Internal.Data.Stream.IsStream.after'.
--
-- On the first step the action is registered as an 'IOFinalizer'; when the
-- input stream reports 'Stop' the finalizer is run explicitly. The finalizer
-- is presumably also what runs the action if the stream is garbage collected
-- before it ends (see "Streamly.Internal.Data.IOFinalizer").
--
{-# INLINE_NORMAL after #-}
after :: MonadRunInIO m
    => m b -> Stream m a -> Stream m a
after action (Stream step state) = Stream step' Nothing

    where

    {-# INLINE_LATE step' #-}
    -- 'Nothing' marks that the finalizer has not been created yet.
    step' _ Nothing = do
        ref <- newIOFinalizer action
        return $ Skip $ Just (state, ref)

    -- Carry the finalizer alongside the state of the underlying stream.
    step' gst (Just (st, ref)) = do
        res <- step gst st
        case res of
            Yield x s -> return $ Yield x (Just (s, ref))
            Skip s    -> return $ Skip (Just (s, ref))
            Stop      -> do
                runIOFinalizer ref
                return Stop

-- XXX For high performance error checks in busy streams we may need another
-- Error constructor in step.
--
-- | See 'Streamly.Internal.Data.Stream.IsStream.onException'.
--
-- Implemented via 'gbracket_' with no-op allocation and cleanup actions. Any
-- exception caught by 'MC.try' (as 'SomeException') replaces the rest of the
-- stream with an effect that runs the action and then rethrows the exception.
--
{-# INLINE_NORMAL onException #-}
onException :: MonadCatch m => m b -> Stream m a -> Stream m a
onException action str =
    gbracket_
        (return ())
        (inline MC.try)
        return
        -- The allocated value and the remaining stream are both ignored.
        (\_ (e :: MC.SomeException) _ -> nilM (action >> MC.throwM e))
        (const str)

-- | Alternate (custom) implementation of 'onException'.
--
-- Instead of going through 'gbracket_', every step of the source stream is
-- wrapped with 'MC.onException' so that @action@ runs if that step throws.
--
{-# INLINE_NORMAL _onException #-}
_onException :: MonadCatch m => m b -> Stream m a -> Stream m a
_onException action (Stream step0 state0) = Stream guardedStep state0

    where

    {-# INLINE_LATE guardedStep #-}
    guardedStep gst cur =
        (step0 gst cur `MC.onException` action) >>= \outcome ->
            -- The step result is rebuilt constructor by constructor; the
            -- state type is unchanged.
            return $ case outcome of
                Yield x next -> Yield x next
                Skip next -> Skip next
                Stop -> Stop

-- | See 'Streamly.Internal.Data.Stream.IsStream.bracket_'.
--
-- Delegates to 'gbracket_': @bef@ is the @before@ action, @aft@ is the
-- normal-stop action, and on any exception @aft@ is run on the value produced
-- by @bef@ before the exception is rethrown. The stream generator is the
-- remaining (third) argument, passed straight through to 'gbracket_'.
--
{-# INLINE_NORMAL bracket_ #-}
bracket_ :: MonadCatch m
    => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracket_ bef aft = gbracket_ bef (inline MC.try) aft releaseThenRethrow

    where

    -- The stream argument supplied by 'gbracket_' is ignored.
    releaseThenRethrow res (err :: SomeException) _ =
        nilM (aft res >> MC.throwM err)

-- | See 'Streamly.Internal.Data.Stream.IsStream.bracket'.
--
-- Delegates to 'gbracket'. @bef@ is the initial action; @aft@ and @gc@ are
-- forwarded as 'gbracket''s two cleanup actions, in that order. When an
-- exception is caught, @exc@ is run on the value produced by @bef@ and the
-- stream is replaced by one that rethrows the exception.
--
-- NOTE(review): going by the argument names, @aft@ is presumably the
-- normal-stop cleanup and @gc@ the garbage-collection cleanup - confirm
-- against 'gbracket'.
--
{-# INLINE_NORMAL bracket' #-}
bracket' :: (MonadAsync m, MonadCatch m) =>
       m b
    -> (b -> m c)
    -> (b -> m d)
    -> (b -> m e)
    -> (b -> Stream m a)
    -> Stream m a
bracket' bef aft exc gc = gbracket bef (inline MC.try) aft gc onError

    where

    -- The stream argument supplied by 'gbracket' is ignored.
    onError res (err :: SomeException) _ =
        exc res >> return (nilM (MC.throwM err))

-- | State of the '_bracket' stream: 'BracketInit' is the initial state,
-- 'BracketRun' carries the stream being run (@s@) together with the value
-- produced by the @before@ action (@v@).
data BracketState s v = BracketInit | BracketRun s v

-- | Alternate (custom) implementation of 'bracket'.
--
-- On the first step @bef@ is run and the stream @bet x@ is built from its
-- result @x@. Each subsequent step of that stream is run under 'MC.try':
--
-- * if the step throws, @aft x@ is run and the exception is rethrown;
-- * if the stream stops, @aft x@ is run and the result stream stops;
-- * otherwise the element (or skip) is passed through.
--
{-# INLINE_NORMAL _bracket #-}
_bracket :: MonadCatch m
    => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
_bracket bef aft bet = Stream step' BracketInit

    where

    {-# INLINE_LATE step' #-}
    step' _ BracketInit = bef >>= \x -> return (Skip (BracketRun (bet x) x))

    -- NOTE: It is important to use UnStream instead of the Stream pattern
    -- here, otherwise we get huge perf degradation, see note in concatMap.
    step' gst (BracketRun (UnStream step state) v) = do
        -- res <- step gst state `MC.onException` aft v
        res <- inline MC.try $ step gst state
        case res of
            -- 'MC.throwM' does not return, so no result needs to follow it.
            Left (e :: SomeException) -> aft v >> MC.throwM e
            Right r -> case r of
                Yield x s -> return $ Yield x (BracketRun (Stream step s) v)
                Skip s    -> return $ Skip (BracketRun (Stream step s) v)
                Stop      -> aft v >> return Stop

-- | See 'Streamly.Internal.Data.Stream.IsStream.finally_'.
--
-- A 'bracket_' with a no-op acquire step: @action@ is used as the release
-- action and @xs@ as the stream, both ignoring the unit resource.
--
{-# INLINE finally_ #-}
finally_ :: MonadCatch m => m b -> Stream m a -> Stream m a
finally_ action xs = bracket_ (return ()) (\_ -> action) (\_ -> xs)

-- | See 'Streamly.Internal.Data.Stream.IsStream.finally'.
--
-- finally action xs = after action $ onException action xs
--
-- A 'bracket'' with a no-op acquire step in which the same @action@ is
-- supplied for all three cleanup arguments.
--
{-# INLINE finally #-}
finally :: (MonadAsync m, MonadCatch m) => m b -> Stream m a -> Stream m a
finally action xs =
    bracket' (return ()) cleanup cleanup cleanup (\_ -> xs)

    where

    -- Ignores the unit resource.
    cleanup _ = action

-- | See 'Streamly.Internal.Data.Stream.IsStream.ghandle'.
--
-- Built on 'gbracket_' with no-op @before@ and normal-stop actions. When an
-- exception of type @e@ is caught, the handler @f@ receives the exception
-- and the stream that 'gbracket_' supplies to its exception callback.
--
{-# INLINE_NORMAL ghandle #-}
ghandle :: (MonadCatch m, Exception e)
    => (e -> Stream m a -> Stream m a) -> Stream m a -> Stream m a
ghandle f str =
    gbracket_ (return ()) (inline MC.try) return (\_ -> f) (\_ -> str)

-- | See 'Streamly.Internal.Data.Stream.IsStream.handle'.
--
-- Built on 'gbracket_' with no-op @before@ and normal-stop actions. When an
-- exception of type @e@ is caught, the stream @f e@ is handed back to
-- 'gbracket_' as the result of its exception callback.
--
{-# INLINE_NORMAL handle #-}
handle :: (MonadCatch m, Exception e)
    => (e -> Stream m a) -> Stream m a -> Stream m a
handle f str =
    gbracket_ (return ()) (inline MC.try) return recover (const str)

    where

    -- Only the exception is used; the unit resource and the stream argument
    -- supplied by 'gbracket_' are ignored.
    recover _ err _ = f err

-- | Alternate (custom) implementation of 'handle'.
--
-- The state is @Left s@ while the source stream is running and @Right strm@
-- once an exception of type @e@ has been caught and the handler stream
-- @f e@ has taken over. Steps of the handler stream are not run under
-- 'MC.try'.
--
{-# INLINE_NORMAL _handle #-}
_handle :: (MonadCatch m, Exception e)
    => (e -> Stream m a) -> Stream m a -> Stream m a
_handle f (Stream step state) = Stream step' (Left state)

    where

    {-# INLINE_LATE step' #-}
    step' gst (Left cur) = do
        outcome <- inline MC.try $ step gst cur
        case outcome of
            Left err -> return $ Skip $ Right (f err)
            Right (Yield x next) -> return $ Yield x (Left next)
            Right (Skip next) -> return $ Skip (Left next)
            Right Stop -> return Stop

    step' gst (Right (UnStream hstep cur)) = do
        outcome <- hstep gst cur
        return $ case outcome of
            Yield x next -> Yield x (Right (Stream hstep next))
            Skip next -> Skip (Right (Stream hstep next))
            Stop -> Stop

-- | State of the 'retry' stream: 'RetryWithMap' carries the retry-count map
-- and the state of the source stream, 'RetryDefault' carries the stream
-- produced by the default handler.
data RetryState emap s1 s2
    = RetryWithMap emap s1
    | RetryDefault s2

-- | See 'Streamly.Internal.Data.Stream.IsStream.retry'
--
-- Each step of the source stream is run under 'MC.try'. When a step throws
-- an exception @e@:
--
-- * if the map holds a count greater than zero for @e@, the count is
--   decremented and the same source state is stepped again;
-- * otherwise the stream switches to @defaultHandler e@, whose steps are run
--   without any further exception handling.
--
{-# INLINE_NORMAL retry #-}
retry
    :: forall e m a. (Exception e, Ord e, MonadCatch m)
    => Map e Int
       -- ^ map from exception to retry count
    -> (e -> Stream m a)
       -- ^ default handler for those exceptions that are not in the map
    -> Stream m a
    -> Stream m a
retry emap0 defaultHandler (Stream step0 state0) =
    Stream step (RetryWithMap emap0 state0)

    where

    {-# INLINE_LATE step #-}
    step gst (RetryWithMap emap st) = do
        eres <- MC.try $ step0 gst st
        case eres of
            Left e -> handler e emap st
            Right (Yield x st1) -> return $ Yield x (RetryWithMap emap st1)
            Right (Skip st1) -> return $ Skip (RetryWithMap emap st1)
            Right Stop -> return Stop
    step gst (RetryDefault (UnStream step1 state1)) =
        step1 gst state1 >>= \res ->
            return $ case res of
                Yield x st1 -> Yield x (RetryDefault (Stream step1 st1))
                Skip st1 -> Skip (RetryDefault (Stream step1 st1))
                Stop -> Stop

    -- Decide the next state after exception @e@ was thrown from state @st@.
    {-# INLINE handler #-}
    handler e emap st =
        return $ Skip $
            case Map.lookup e emap of
                Just n | n > 0 -> RetryWithMap (Map.insert e (n - 1) emap) st
                -- Retries exhausted, or exception not present in the map.
                _ -> RetryDefault (defaultHandler e)