{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Haxl.Core.DataSource
(
DataSource(..)
, DataSourceName(..)
, Request
, BlockedFetch(..)
, PerformFetch(..)
, SchedulerHint(..)
, FailureClassification(..)
, ResultVar(..)
, mkResultVar
, putFailure
, putResult
, putResultFromChildThread
, putSuccess
, putResultWithStats
, putResultWithStatsFromChildThread
, asyncFetch, asyncFetchWithDispatch
, asyncFetchAcquireRelease
, backgroundFetchSeq, backgroundFetchPar
, backgroundFetchAcquireRelease
, backgroundFetchAcquireReleaseMVar
, stubFetch
, syncFetch
, except
, setError
) where
import Control.Exception
import Control.Monad
import Data.Hashable
import Data.Text (Text)
import Data.Kind (Type)
import Data.Typeable
import Haxl.Core.Exception
import Haxl.Core.Flags
import Haxl.Core.ShowP
import Haxl.Core.StateStore
import Haxl.Core.Stats
import GHC.Conc ( newStablePtrPrimMVar
, PrimMVar)
import Control.Concurrent ( threadCapability
, forkOn
, myThreadId )
import Control.Concurrent.MVar
import Foreign.StablePtr
-- | The class of data sources, indexed by the user-environment type @u@
-- and the request type @req@.
--
-- Only 'fetch' is mandatory; the other two methods have defaults.
class (DataSourceName req, StateKey req, ShowP req) => DataSource u req where

  -- | Produce the fetching strategy for this data source.
  fetch
    :: State req
      -- ^ The data source's state.
    -> Flags
      -- ^ Flags passed through to the data source.
    -> u
      -- ^ The user environment.
    -> PerformFetch req
      -- ^ How the blocked requests are to be fetched; see 'PerformFetch'.

  -- | Scheduling hint for requests of this data source.
  -- Defaults to 'TryToBatch', ignoring the environment.
  schedulerHint :: u -> SchedulerHint req
  schedulerHint _ = TryToBatch

  -- | Classify an exception raised for a request.
  -- Defaults to 'StandardFailure' for every input.
  classifyFailure :: u -> req a -> SomeException -> FailureClassification
  classifyFailure _ _ _ = StandardFailure
-- | Request types that can report a textual name for their data source.
class DataSourceName (req :: Type -> Type) where
  -- | The name of the data source. The 'Proxy' argument only selects the
  -- instance; no request value is required.
  dataSourceName :: Proxy req -> Text
-- | Constraint synonym bundling the instances demanded of a request type
-- @req@ applied to a result type @a@: the request must support 'Eq',
-- 'Hashable', 'Typeable' and 'Show', and the result must support 'Show'.
type Request req a =
  ( Eq (req a)
  , Hashable (req a)
  , Typeable (req a)
  , Show (req a)
  , Show a
  )
-- | Hint returned by 'schedulerHint'. The @req@ parameter is phantom: it
-- only ties the hint to a request type.
data SchedulerHint (req :: Type -> Type)
  = TryToBatch
    -- ^ The default hint returned by 'schedulerHint'.
  | SubmitImmediately
    -- ^ NOTE(review): presumably asks the scheduler to submit requests
    -- without waiting to batch them; the consumer is not in this module,
    -- so confirm there.
-- | Result of 'classifyFailure'.
data FailureClassification
  = StandardFailure
    -- ^ The default classification returned by 'classifyFailure'.
  | IgnoredForStatsFailure
    -- ^ NOTE(review): by its name, a failure that stats reporting should
    -- ignore; the consumer is not in this module, so confirm there.
-- | The strategy a data source uses to satisfy a batch of blocked
-- requests. Each constructor wraps a function that receives the batch.
data PerformFetch req
  = SyncFetch ([BlockedFetch req] -> IO ())
    -- ^ A handler given only the batch (built here by 'syncFetch' and
    -- 'stubFetch').
  | AsyncFetch ([BlockedFetch req] -> IO () -> IO ())
    -- ^ A handler additionally given an 'IO' action; the helpers in this
    -- module ('asyncFetch' and friends) run that action between submitting
    -- the requests and waiting for their results.
  | BackgroundFetch ([BlockedFetch req] -> IO ())
    -- ^ A handler given only the batch; the helpers in this module
    -- ('backgroundFetchSeq' and friends) fork a thread to do the work and
    -- deliver results from that thread.
-- | A request paired with the 'ResultVar' its outcome must be written to.
-- The result type @a@ is existentially hidden, so a list of
-- 'BlockedFetch' values may mix requests with different result types.
data BlockedFetch r = forall a. BlockedFetch (r a) (ResultVar a)
-- | A write-only sink for the outcome of a request.
--
-- The wrapped callback takes the outcome (exception or value), a 'Bool'
-- that is 'True' only when written through the @...FromChildThread@
-- functions of this module, and optional 'DataSourceStats'.
newtype ResultVar a =
  ResultVar (Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-- | Wrap a result callback as a 'ResultVar'. This is exactly the
-- 'ResultVar' constructor exposed as a function.
mkResultVar
  :: (Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
  -> ResultVar a
mkResultVar = ResultVar
-- | Write a failure to a 'ResultVar'. The exception is wrapped with
-- 'except' and delivered through 'putResult'.
putFailure :: (Exception e) => ResultVar a -> e -> IO ()
putFailure r = putResult r . except
-- | Write a successful value to a 'ResultVar'. The value is wrapped in
-- 'Right' and delivered through 'putResult'.
putSuccess :: ResultVar a -> a -> IO ()
putSuccess r = putResult r . Right
-- | Write an outcome to a 'ResultVar': invokes the wrapped callback with
-- the child-thread flag set to 'False' and no stats.
putResult :: ResultVar a -> Either SomeException a -> IO ()
putResult (ResultVar io) res = io res False Nothing
-- | Like 'putResult', but also hands the given 'DataSourceStats' to the
-- callback. The child-thread flag is 'False'.
putResultWithStats
  :: ResultVar a -> Either SomeException a -> DataSourceStats -> IO ()
putResultWithStats (ResultVar io) res st = io res False (Just st)
-- | Like 'putResultWithStats', but invokes the callback with the
-- child-thread flag set to 'True'.
putResultWithStatsFromChildThread
  :: ResultVar a -> Either SomeException a -> DataSourceStats -> IO ()
putResultWithStatsFromChildThread (ResultVar io) res st =
  io res True (Just st)
-- | Like 'putResult', but invokes the callback with the child-thread flag
-- set to 'True' (and no stats).
--
-- NOTE(review): how the flag is interpreted is decided by whoever built
-- the 'ResultVar'; that code is not in this module.
putResultFromChildThread :: ResultVar a -> Either SomeException a -> IO ()
putResultFromChildThread (ResultVar io) res = io res True Nothing
-- | Fail a blocked request: build an exception from the request with the
-- supplied function and write it to the request's 'ResultVar' via
-- 'putFailure'.
setError :: (Exception e) => (forall a. r a -> e) -> BlockedFetch r -> IO ()
setError e (BlockedFetch req m) = putFailure m (e req)
-- | Turn any exception into a failed outcome: the exception is converted
-- with 'toException' and wrapped in 'Left'.
except :: (Exception e) => e -> Either SomeException a
except = Left . toException
-- | A 'fetch' implementation that fails every request.
--
-- The result is a 'SyncFetch' that applies 'setError' with the supplied
-- exception builder to each blocked request. The state, flags and user
-- environment are ignored.
stubFetch
  :: (Exception e) => (forall a. r a -> e)
  -> State r -> Flags -> u -> PerformFetch r
stubFetch e _state _flags _si = SyncFetch $ mapM_ (setError e)
-- | Build an 'AsyncFetch' on top of a service that has separate
-- \"dispatch\" and \"wait\" steps.
--
-- Within the service scope the steps run in this order: enqueue every
-- request, dispatch, run the caller-supplied action, wait, then run the
-- per-request completion actions (each of which writes its 'ResultVar').
-- The state, flags and user environment are ignored.
asyncFetchWithDispatch
  :: ((service -> IO ()) -> IO ())
    -- ^ Runs an action with access to the service.
  -> (service -> IO ())
    -- ^ Dispatch step, run after all requests are enqueued.
  -> (service -> IO ())
    -- ^ Wait step, run after the caller-supplied action.
  -> (forall a. service -> request a -> IO (IO (Either SomeException a)))
    -- ^ Enqueue one request; the returned action yields its outcome.
  -> State request
  -> Flags
  -> u
  -> PerformFetch request

-- | Simpler variants of 'asyncFetchWithDispatch' taking a single service
-- step. 'asyncFetch' yields an 'AsyncFetch' and runs that step as the
-- \"wait\" after the caller-supplied action; 'syncFetch' yields a
-- 'SyncFetch' and runs that step as the \"dispatch\" right after
-- enqueueing.
asyncFetch, syncFetch
  :: ((service -> IO ()) -> IO ())
    -- ^ Runs an action with access to the service.
  -> (service -> IO ())
    -- ^ The single service step (see above).
  -> (forall a. service -> request a -> IO (IO (Either SomeException a)))
    -- ^ Enqueue one request; the returned action yields its outcome.
  -> State request
  -> Flags
  -> u
  -> PerformFetch request

asyncFetchWithDispatch
  withService dispatch wait enqueue _state _flags _si =
  AsyncFetch $ \requests inner -> withService $ \service -> do
    getResults <- mapM (submitFetch service enqueue) requests
    dispatch service
    inner
    wait service
    sequence_ getResults
-- Type signature: declared jointly with 'syncFetch' earlier in this module.
--
-- Within the service scope: enqueue every request, run the caller-supplied
-- action, wait on the service, then run the per-request completion actions
-- (each writes its 'ResultVar'). State, flags and environment are ignored.
asyncFetch withService wait enqueue _state _flags _si =
  AsyncFetch $ \requests inner -> withService $ \service -> do
    getResults <- mapM (submitFetch service enqueue) requests
    inner
    wait service
    sequence_ getResults
-- Type signature: declared jointly with 'asyncFetch' earlier in this module.
--
-- Within the service scope: enqueue every request, dispatch, then run the
-- per-request completion actions (each writes its 'ResultVar').
-- State, flags and environment are ignored.
syncFetch withService dispatch enqueue _state _flags _si =
  SyncFetch $ \requests -> withService $ \service -> do
    getResults <- mapM (submitFetch service enqueue) requests
    dispatch service
    sequence_ getResults
-- | Build a 'BackgroundFetch' from a plain per-request 'IO' function.
--
-- 'backgroundFetchSeq' forks one thread per batch and runs the requests
-- one after another in it; 'backgroundFetchPar' forks one thread per
-- request. Threads are forked with 'forkOn' onto the capability of the
-- calling thread. The state, flags and user environment are ignored.
backgroundFetchSeq, backgroundFetchPar
  :: (forall a. request a -> IO (Either SomeException a))
    -- ^ Performs a single request.
  -> State request
  -> Flags
  -> u
  -> PerformFetch request

backgroundFetchSeq run _state _flags _si =
  BackgroundFetch $ \requests -> do
    (cap, _) <- threadCapability =<< myThreadId
    -- Fork under 'mask' so the handler is installed before the forked
    -- thread can receive an asynchronous exception; 'restore' unmasks
    -- only around the actual fetching.
    mask $ \restore -> void $ forkOn cap $ do
      -- On any exception, fail every request of the batch.
      let rethrow = rethrowFromBg requests
      restore (mapM_ runOne requests) `catch` rethrow
  where
    -- Perform one request and deliver its outcome from this thread.
    runOne (BlockedFetch request result) = do
      res <- run request
      putResultFromBg result res
-- Type signature: declared jointly with 'backgroundFetchSeq' earlier in
-- this module.
--
-- Forks one thread per request, each pinned (via 'forkOn') to the
-- capability of the calling thread. State, flags and environment are
-- ignored.
backgroundFetchPar run _state _flags _si =
  BackgroundFetch $ \requests -> do
    (cap, _) <- threadCapability =<< myThreadId
    mapM_ (runOneInThread cap) requests
  where
    -- Fork under 'mask' so the handler is installed before the forked
    -- thread can receive an asynchronous exception; 'restore' unmasks
    -- only around the actual fetching.
    runOneInThread cap request =
      mask $ \restore -> void $ forkOn cap $ do
        -- On any exception, fail just this one request.
        let rethrow = rethrowFromBg [request]
        restore (runOne request) `catch` rethrow

    -- Perform one request and deliver its outcome from this thread.
    runOne (BlockedFetch request result) = do
      res <- run request
      putResultFromBg result res
-- | Build an 'AsyncFetch' around a service with explicit acquire and
-- release steps.
--
-- The service is acquired with asynchronous exceptions masked. If
-- acquisition fails, the caller-supplied action is still run and the
-- failure is then rethrown. Otherwise the requests are enqueued and
-- dispatched, the caller-supplied action runs, the service is waited on
-- and the per-request completion actions run; the service is released
-- afterwards via 'finally'. The state, flags and user environment are
-- ignored.
asyncFetchAcquireRelease
  :: IO service
    -- ^ Acquire the service (run masked).
  -> (service -> IO ())
    -- ^ Release the service; always run once acquisition succeeded.
  -> (service -> IO ())
    -- ^ Dispatch step, run after all requests are enqueued.
  -> (service -> IO ())
    -- ^ Wait step, run after the caller-supplied action.
  -> (forall a. service -> request a -> IO (IO (Either SomeException a)))
    -- ^ Enqueue one request; the returned action yields its outcome.
  -> State request
  -> Flags
  -> u
  -> PerformFetch request
asyncFetchAcquireRelease
  acquire release dispatch wait enqueue _state _flags _si =
  AsyncFetch $ \requests inner -> mask $ \restore -> do
    r1 <- tryWithRethrow acquire
    case r1 of
      -- Acquisition failed: the caller's action must still run.
      Left err -> do restore inner; throwIO (err :: SomeException)
      Right service ->
        flip finally (release service) $ restore $ do
          -- Capture enqueue/dispatch failures so that 'inner' runs
          -- regardless; the failure is rethrown right after it.
          r2 <- tryWithRethrow $ do
            getResults <- mapM (submitFetch service enqueue) requests
            dispatch service
            return getResults
          -- NOTE(review): 'inner' is not guarded here; if it throws, the
          -- enclosing 'finally' still releases the service but 'r2' is
          -- never inspected.
          inner
          case r2 of
            Left err -> throwIO (err :: SomeException)
            Right getResults -> do wait service; sequence_ getResults
-- | Enqueue one blocked request on a service.
--
-- Runs the enqueue function immediately and returns a completion action;
-- running that action obtains the outcome and writes it to the request's
-- 'ResultVar' with 'putResult'.
submitFetch
  :: service
  -> (forall a. service -> request a -> IO (IO (Either SomeException a)))
  -> BlockedFetch request
  -> IO (IO ())
submitFetch service fetchFn (BlockedFetch request result)
  = (putResult result =<<) <$> fetchFn service request
-- | Deliver an outcome from a background thread: writes it with
-- 'putResultFromChildThread' and then resets the calling thread's
-- allocation counter to 0.
--
-- NOTE(review): the reset presumably prevents allocation already
-- reported with this result from being counted again on a later write
-- from the same thread; confirm against the stats accounting.
putResultFromBg :: ResultVar a -> Either SomeException a -> IO ()
putResultFromBg result r = do
  putResultFromChildThread result r
  setAllocationCounter 0
-- | Exception handler for background fetch threads.
--
-- Writes the exception as the outcome of every given request (via
-- 'putResultFromBg') and then passes it to 'rethrowAsyncExceptions'.
rethrowFromBg :: [BlockedFetch req] -> SomeException -> IO ()
rethrowFromBg requests e = do
  mapM_ failOne requests
  rethrowAsyncExceptions e
  where
    -- Fail a single request with the exception being handled.
    failOne (BlockedFetch _ result) = putResultFromBg result (Left e)
-- | Build a 'BackgroundFetch' around a service with explicit acquire and
-- release steps, where completion is signalled through an 'MVar'.
--
-- On the calling thread, with asynchronous exceptions masked: acquire the
-- service, enqueue every request (unmasked), and dispatch, passing the
-- calling thread's capability and a fresh empty 'MVar'. If enqueueing or
-- dispatching throws, the service is released and the exception
-- propagates. A thread is then forked on the same capability; it blocks
-- on the 'MVar', runs the processing step and the per-request completion
-- actions, and finally releases the service. Exceptions in either of
-- those last two stages are handled by 'rethrowFromBg' for the whole
-- batch. The state, flags and user environment are ignored.
backgroundFetchAcquireReleaseMVar
  :: IO service
    -- ^ Acquire the service (run masked).
  -> (service -> IO ())
    -- ^ Release the service.
  -> (service -> Int -> MVar () -> IO ())
    -- ^ Dispatch; receives the capability number and the 'MVar' that the
    -- background thread will wait on.
  -> (service -> IO ())
    -- ^ Processing step, run in the background thread once the 'MVar' is
    -- filled.
  -> (forall a. service -> request a -> IO (IO (Either SomeException a)))
    -- ^ Enqueue one request; the returned action yields its outcome.
  -> State request
  -> Flags
  -> u
  -> PerformFetch request
backgroundFetchAcquireReleaseMVar
  acquire release dispatch process enqueue _state _flags _si =
  BackgroundFetch $ \requests -> do
    mvar <- newEmptyMVar
    mask $ \restore -> do
      (cap, _) <- threadCapability =<< myThreadId
      service <- acquire
      getResults <- (do
        results <- restore $ mapM (submit service) requests
        -- Dispatch runs masked.
        dispatch service cap mvar
        return (sequence_ results)) `onException` release service
      -- Background thread: wait for the dispatch to signal completion.
      void $ forkOn cap $ do
        takeMVar mvar
        let rethrow = rethrowFromBg requests
        finally
          (restore (process service >> getResults) `catch` rethrow)
          -- The service is released from the background thread.
          (release service `catch` rethrow)
  where
    -- Enqueue one request; its completion action delivers the outcome
    -- with 'putResultFromBg'.
    submit service (BlockedFetch request result) =
      (putResultFromBg result =<<) <$> enqueue service request
-- | Variant of 'backgroundFetchAcquireReleaseMVar' whose dispatch step
-- receives a @'StablePtr' 'PrimMVar'@ instead of the 'MVar' itself.
--
-- The stable pointer is created with 'newStablePtrPrimMVar' from the
-- completion 'MVar' just before dispatch is invoked. This function never
-- frees it.
--
-- NOTE(review): presumably the dispatch target (e.g. foreign code that
-- signals the 'MVar') takes ownership of the pointer; confirm with
-- callers.
backgroundFetchAcquireRelease
  :: IO service
    -- ^ Acquire the service.
  -> (service -> IO ())
    -- ^ Release the service.
  -> (service -> Int -> StablePtr PrimMVar -> IO ())
    -- ^ Dispatch; receives the capability number and the stable pointer.
  -> (service -> IO ())
    -- ^ Processing step, run in the background thread.
  -> (forall a. service -> request a -> IO (IO (Either SomeException a)))
    -- ^ Enqueue one request; the returned action yields its outcome.
  -> State request
  -> Flags
  -> u
  -> PerformFetch request
backgroundFetchAcquireRelease a r dispatch =
  backgroundFetchAcquireReleaseMVar a r dispatchWithStablePtr
  where
    -- Adapt the 'MVar'-based dispatch interface to the stable-pointer one.
    dispatchWithStablePtr s c mvar = do
      sp <- newStablePtrPrimMVar mvar
      dispatch s c sp