{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE DeriveDataTypeable #-}
module Haxl.Core.Fetch
( dataFetch
, dataFetchWithShow
, uncachedRequest
, cacheResult
, dupableCacheRequest
, cacheResultWithShow
, cacheRequest
, performFetches
, performRequestStore
, ShowReq
) where
import Control.Concurrent.STM
import Control.Exception as Exception
import Control.Monad
import Data.Either
import Data.Hashable
import Data.IORef
import Data.Int
import Data.List
#if __GLASGOW_HASKELL__ < 804
import Data.Monoid
#endif
import Data.Proxy
import Data.Typeable
import Data.Text (Text)
import qualified Data.Text as Text
import Text.Printf
#ifdef PROFILING
import GHC.Stack
#endif
import Haxl.Core.DataSource
import Haxl.Core.DataCache as DataCache
import Haxl.Core.Exception
import Haxl.Core.Flags
import Haxl.Core.Monad
import Haxl.Core.Profile
import Haxl.Core.RequestStore
import Haxl.Core.ShowP
import Haxl.Core.Stats
import Haxl.Core.StateStore
import Haxl.Core.Util
data CacheResult u w a
= Uncached
(ResultVar a)
{-# UNPACK #-} !(IVar u w a)
{-# UNPACK #-} !CallId
| CachedNotFetched
{-# UNPACK #-} !(IVar u w a)
{-# UNPACK #-} !CallId
| Cached (ResultVal a w)
{-# UNPACK #-} !CallId
type ShowReq r a = (r a -> String, a -> String)
cachedWithInsert
:: forall r a u w.
(DataSource u r, Typeable (r a))
=> (r a -> String)
-> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> Env u w -> r a -> IO (CacheResult u w a)
cachedWithInsert showFn insertFn env@Env{..} req = do
let
doFetch = do
ivar <- newIVar
k <- nextCallId env
let !rvar = stdResultVar ivar completions submittedReqsRef flags
(Proxy :: Proxy r)
insertFn req (DataCacheItem ivar k) dataCache
return (Uncached rvar ivar k)
mbRes <- DataCache.lookup req dataCache
case mbRes of
Nothing -> doFetch
Just (DataCacheItem i@IVar{ivarRef = cr} k) -> do
e <- readIORef cr
case e of
IVarEmpty _ -> do
ivar <- withCurrentCCS i
return (CachedNotFetched ivar k)
IVarFull r -> do
ifTrace flags 3 $ putStrLn $ case r of
ThrowIO{} -> "Cached error: " ++ showFn req
ThrowHaxl{} -> "Cached error: " ++ showFn req
Ok{} -> "Cached request: " ++ showFn req
return (Cached r k)
stdResultVar
:: forall r a u w. (DataSourceName r, Typeable r)
=> IVar u w a
-> TVar [CompleteReq u w]
-> IORef ReqCountMap
-> Flags
-> Proxy r
-> ResultVar a
stdResultVar ivar completions ref flags p =
mkResultVar $ \r isChildThread -> do
allocs <- if isChildThread
then
getAllocationCounter
else
return 0
atomicallyOnBlocking
(LogicBug (ReadingCompletionsFailedFetch (dataSourceName p))) $ do
cs <- readTVar completions
writeTVar completions (CompleteReq r ivar allocs : cs)
ifReport flags 1 $
atomicModifyIORef' ref (\m -> (subFromCountMap p 1 m, ()))
{-# INLINE stdResultVar #-}
logFetch :: Env u w -> (r a -> String) -> r a -> CallId -> IO ()
#ifdef PROFILING
logFetch env showFn req fid = do
ifReport (flags env) 5 $ do
stack <- currentCallStack
modifyIORef' (statsRef env) $ \(Stats s) ->
Stats (FetchCall (showFn req) stack fid : s)
#else
logFetch _ _ _ _ = return ()
#endif
dataFetch :: (DataSource u r, Request r a) => r a -> GenHaxl u w a
dataFetch = dataFetchWithInsert show DataCache.insert
dataFetchWithShow
:: (DataSource u r, Eq (r a), Hashable (r a), Typeable (r a))
=> ShowReq r a
-> r a -> GenHaxl u w a
dataFetchWithShow (showReq, showRes) = dataFetchWithInsert showReq
(DataCache.insertWithShow showReq showRes)
dataFetchWithInsert
:: forall u w r a
. (DataSource u r, Eq (r a), Hashable (r a), Typeable (r a))
=> (r a -> String)
-> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> GenHaxl u w a
dataFetchWithInsert showFn insertFn req =
GenHaxl $ \env@Env{..} -> do
res <- cachedWithInsert showFn insertFn env req
case res of
Uncached rvar ivar fid -> do
logFetch env showFn req fid
ifProfiling flags $ addProfileFetch env req fid False
let blockedFetch = BlockedFetch req rvar
let blockedFetchI = BlockedFetchInternal fid
case schedulerHint userEnv :: SchedulerHint r of
SubmitImmediately ->
performFetches env [BlockedFetches [blockedFetch] [blockedFetchI]]
TryToBatch ->
modifyIORef' reqStoreRef $ \bs ->
addRequest blockedFetch blockedFetchI bs
return $ Blocked ivar (Return ivar)
CachedNotFetched ivar fid -> do
ifProfiling flags $ addProfileFetch env req fid True
return $ Blocked ivar (Return ivar)
Cached r fid -> do
ifProfiling flags $ addProfileFetch env req fid True
done r
uncachedRequest
:: forall a u w (r :: * -> *). (DataSource u r, Request r a)
=> r a -> GenHaxl u w a
uncachedRequest req = do
flg <- env flags
subRef <- env submittedReqsRef
if recording flg /= 0
then dataFetch req
else GenHaxl $ \e@Env{..} -> do
ivar <- newIVar
k <- nextCallId e
let !rvar = stdResultVar ivar completions subRef flg (Proxy :: Proxy r)
modifyIORef' reqStoreRef $ \bs ->
addRequest (BlockedFetch req rvar) (BlockedFetchInternal k) bs
return $ Blocked ivar (Return ivar)
cacheResult :: Request r a => r a -> IO a -> GenHaxl u w a
cacheResult = cacheResultWithInsert show DataCache.insert
cacheResultWithShow
:: (Eq (r a), Hashable (r a), Typeable (r a))
=> ShowReq r a -> r a -> IO a -> GenHaxl u w a
cacheResultWithShow (showReq, showRes) = cacheResultWithInsert showReq
(DataCache.insertWithShow showReq showRes)
cacheResultWithInsert
:: Typeable (r a)
=> (r a -> String)
-> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> IO a
-> GenHaxl u w a
cacheResultWithInsert showFn insertFn req val = GenHaxl $ \e@Env{..} -> do
mbRes <- DataCache.lookup req dataCache
case mbRes of
Nothing -> do
eitherResult <- Exception.try val
case eitherResult of
Left e -> rethrowAsyncExceptions e
_ -> return ()
let result = eitherToResultThrowIO eitherResult
ivar <- newFullIVar result
k <- nextCallId e
insertFn req (DataCacheItem ivar k) dataCache
done result
Just (DataCacheItem IVar{ivarRef = cr} _) -> do
e <- readIORef cr
case e of
IVarEmpty _ -> corruptCache
IVarFull r -> done r
where
corruptCache = raise . DataSourceError $ Text.concat
[ Text.pack (showFn req)
, " has a corrupted cache value: these requests are meant to"
, " return immediately without an intermediate value. Either"
, " the cache was updated incorrectly, or you're calling"
, " cacheResult on a query that involves a blocking fetch."
]
cacheRequest
:: Request req a => req a -> Either SomeException a -> GenHaxl u w ()
cacheRequest request result = GenHaxl $ \e@Env{..} -> do
mbRes <- DataCache.lookup request dataCache
case mbRes of
Nothing -> do
cr <- newFullIVar (eitherToResult result)
k <- nextCallId e
DataCache.insert request (DataCacheItem cr k) dataCache
return (Done ())
_other -> raise $
DataSourceError "cacheRequest: request is already in the cache"
dupableCacheRequest
:: Request req a => req a -> Either SomeException a -> GenHaxl u w ()
dupableCacheRequest request result = GenHaxl $ \e@Env{..} -> do
cr <- newFullIVar (eitherToResult result)
k <- nextCallId e
DataCache.insert request (DataCacheItem cr k) dataCache
return (Done ())
performRequestStore
:: forall u w. Env u w -> RequestStore u -> IO ()
performRequestStore env reqStore =
performFetches env (contents reqStore)
performFetches
:: forall u w. Env u w -> [BlockedFetches u] -> IO ()
performFetches env@Env{flags=f, statsRef=sref, statsBatchIdRef=sbref} jobs = do
t0 <- getTimestamp
ifTrace f 3 $
forM_ jobs $ \(BlockedFetches reqs _) ->
forM_ reqs $ \(BlockedFetch r _) -> putStrLn (showp r)
let
applyFetch i bfs@(BlockedFetches (reqs :: [BlockedFetch r]) _) =
case stateGet (states env) of
Nothing ->
return (FetchToDo reqs (SyncFetch (mapM_ (setError e))))
where
e :: ShowP req => req a -> DataSourceError
e req = DataSourceError $ "data source not initialized: " <> dsName
<> ": "
<> Text.pack (showp req)
Just state ->
return $ FetchToDo reqs
$ (if report f >= 2
then wrapFetchInStats
(userEnv env)
sref
sbref
dsName
(length reqs)
bfs
else id)
$ wrapFetchInTrace i (length reqs) dsName
$ wrapFetchInCatch reqs
$ fetch state f (userEnv env)
where
dsName = dataSourceName (Proxy :: Proxy r)
fetches <- zipWithM applyFetch [0..] jobs
scheduleFetches fetches (submittedReqsRef env) (flags env)
t1 <- getTimestamp
let roundtime = fromIntegral (t1 - t0) / 1000000 :: Double
ifTrace f 1 $
printf "Batch data fetch done (%.4fs)\n" (realToFrac roundtime :: Double)
data FetchToDo where
FetchToDo
:: forall (req :: * -> *). (DataSourceName req, Typeable req)
=> [BlockedFetch req] -> PerformFetch req -> FetchToDo
wrapFetchInCatch :: [BlockedFetch req] -> PerformFetch req -> PerformFetch req
wrapFetchInCatch reqs fetch =
case fetch of
SyncFetch f ->
SyncFetch $ \reqs -> f reqs `Exception.catch` handler
AsyncFetch f ->
AsyncFetch $ \reqs io -> f reqs io `Exception.catch` handler
BackgroundFetch f ->
BackgroundFetch $ \reqs -> f reqs `Exception.catch` handler
where
handler :: SomeException -> IO ()
handler e = do
rethrowAsyncExceptions e
mapM_ (forceError e) reqs
forceError e (BlockedFetch _ rvar) =
putResult rvar (except e)
data FailureCount = FailureCount
{ failureCountStandard :: {-# UNPACK #-} !Int
, failureCountIgnored :: {-# UNPACK #-} !Int
}
#if __GLASGOW_HASKELL__ >= 804
instance Semigroup FailureCount where
(<>) = mappend
#endif
instance Monoid FailureCount where
mempty = FailureCount 0 0
mappend (FailureCount s1 i1) (FailureCount s2 i2)
= FailureCount (s1+s2) (i1+i2)
wrapFetchInStats
:: DataSource u req
=> u
-> IORef Stats
-> IORef Int
-> Text
-> Int
-> BlockedFetches u
-> PerformFetch req
-> PerformFetch req
wrapFetchInStats
u
!statsRef
!batchIdRef
dataSource
batchSize
(BlockedFetches _reqs reqsI)
perform = do
case perform of
SyncFetch f ->
SyncFetch $ \reqs -> do
bid <- newBatchId
fail_ref <- newIORef mempty
(t0,t,alloc,_) <- statsForIO (f (map (addFailureCount u fail_ref) reqs))
failures <- readIORef fail_ref
updateFetchStats bid allFids t0 t alloc batchSize failures
AsyncFetch f -> do
AsyncFetch $ \reqs inner -> do
bid <- newBatchId
inner_r <- newIORef (0, 0)
fail_ref <- newIORef mempty
let inner' = do
(_,t,alloc,_) <- statsForIO inner
writeIORef inner_r (t,alloc)
reqs' = map (addFailureCount u fail_ref) reqs
(t0, totalTime, totalAlloc, _) <- statsForIO (f reqs' inner')
(innerTime, innerAlloc) <- readIORef inner_r
failures <- readIORef fail_ref
updateFetchStats bid allFids t0 (totalTime - innerTime)
(totalAlloc - innerAlloc) batchSize failures
BackgroundFetch io -> do
BackgroundFetch $ \reqs -> do
bid <- newBatchId
startTime <- getTimestamp
io (zipWith (addTimer u bid startTime) reqs reqsI)
where
allFids = map (\(BlockedFetchInternal k) -> k) reqsI
newBatchId = atomicModifyIORef' batchIdRef $ \x -> (x+1,x+1)
statsForIO io = do
prevAlloc <- getAllocationCounter
(t0,t,a) <- time io
postAlloc <- getAllocationCounter
return (t0,t, fromIntegral $ prevAlloc - postAlloc, a)
calcFailure _u _r (Right _) = mempty
calcFailure u r (Left e) = case classifyFailure u r e of
StandardFailure -> mempty { failureCountStandard = 1 }
IgnoredForStatsFailure -> mempty { failureCountIgnored = 1 }
addTimer
u
bid
t0
(BlockedFetch req (ResultVar fn))
(BlockedFetchInternal fid) =
BlockedFetch req $ ResultVar $ \result isChildThread -> do
t1 <- getTimestamp
allocs <- if isChildThread then getAllocationCounter else return 0
updateFetchStats bid [fid] t0 (t1 - t0)
(negate allocs)
1
(calcFailure u req result)
fn result isChildThread
updateFetchStats
:: Int
-> [CallId]
-> Timestamp
-> Microseconds
-> Int64
-> Int
-> FailureCount
-> IO ()
updateFetchStats bid fids start time space batch FailureCount{..} = do
let this = FetchStats { fetchDataSource = dataSource
, fetchBatchSize = batch
, fetchStart = start
, fetchDuration = time
, fetchSpace = space
, fetchFailures = failureCountStandard
, fetchIgnoredFailures = failureCountIgnored
, fetchBatchId = bid
, fetchIds = fids }
atomicModifyIORef' statsRef $ \(Stats fs) -> (Stats (this : fs), ())
addFailureCount :: DataSource u r
=> u -> IORef FailureCount -> BlockedFetch r -> BlockedFetch r
addFailureCount u ref (BlockedFetch req (ResultVar fn)) =
BlockedFetch req $ ResultVar $ \result isChildThread -> do
let addFailures r = (r <> calcFailure u req result, ())
when (isLeft result) $ atomicModifyIORef' ref addFailures
fn result isChildThread
wrapFetchInTrace
:: Int
-> Int
-> Text
-> PerformFetch req
-> PerformFetch req
#ifdef EVENTLOG
wrapFetchInTrace i n dsName f =
case f of
SyncFetch io -> SyncFetch (wrapF "Sync" io)
AsyncFetch fio -> AsyncFetch (wrapF "Async" . fio . unwrapF "Async")
where
d = Text.unpack dsName
wrapF :: String -> IO a -> IO a
wrapF ty = bracket_ (traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
(traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
unwrapF :: String -> IO a -> IO a
unwrapF ty = bracket_ (traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
(traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
#else
wrapFetchInTrace _ _ _ f = f
#endif
time :: IO a -> IO (Timestamp,Microseconds,a)
time io = do
t0 <- getTimestamp
a <- io
t1 <- getTimestamp
return (t0, t1 - t0, a)
scheduleFetches :: [FetchToDo] -> IORef ReqCountMap -> Flags -> IO ()
scheduleFetches fetches ref flags = do
ifReport flags 1 $ sequence_
[ atomicModifyIORef' ref $
\m -> (addToCountMap (Proxy :: Proxy r) (length reqs) m, ())
| FetchToDo (reqs :: [BlockedFetch r]) _f <- fetches
]
ifTrace flags 1 $ printf "Batch data fetch round: %s\n" $
intercalate (", "::String) $
map (\(c, n, ds) -> printf "%s %s %d" n ds c) stats
fully_async_fetches
async_fetches sync_fetches
where
fetchName :: forall req . PerformFetch req -> String
fetchName (BackgroundFetch _) = "background"
fetchName (AsyncFetch _) = "async"
fetchName (SyncFetch _) = "sync"
srcName :: forall req . (DataSourceName req) => [BlockedFetch req] -> String
srcName _ = Text.unpack $ dataSourceName (Proxy :: Proxy req)
stats = [(length reqs, fetchName f, srcName reqs)
| FetchToDo reqs f <- fetches]
fully_async_fetches :: IO ()
fully_async_fetches = sequence_
[f reqs | FetchToDo reqs (BackgroundFetch f) <- fetches]
async_fetches :: IO () -> IO ()
async_fetches = compose
[f reqs | FetchToDo reqs (AsyncFetch f) <- fetches]
sync_fetches :: IO ()
sync_fetches = sequence_
[f reqs | FetchToDo reqs (SyncFetch f) <- fetches]
data ReadingCompletionsFailedFetch = ReadingCompletionsFailedFetch Text
deriving Show
instance Exception ReadingCompletionsFailedFetch