{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE OverloadedStrings #-}
module Haxl.Core.Fetch
( dataFetch
, dataFetchWithShow
, uncachedRequest
, cacheResult
, cacheResultWithShow
, cacheRequest
, performFetches
, performRequestStore
, ShowReq
) where
import Control.Concurrent.STM
import Control.Exception as Exception
import Control.Monad
import Data.Either
import Data.Hashable
import Data.IORef
import Data.Int
import Data.List
import Data.Monoid
import Data.Typeable
import Data.Text (Text)
import qualified Data.Text as Text
import Text.Printf
#ifdef EVENTLOG
import Debug.Trace (traceEventIO)
#endif
#ifdef PROFILING
import GHC.Stack
#endif

import Haxl.Core.DataSource
import Haxl.Core.DataCache as DataCache
import Haxl.Core.Exception
import Haxl.Core.Flags
import Haxl.Core.Monad
import Haxl.Core.Profile
import Haxl.Core.RequestStore
import Haxl.Core.ShowP
import Haxl.Core.Stats
import Haxl.Core.StateStore
import Haxl.Core.Util
-- | Possible outcomes of looking a request up in the data cache
-- (produced by 'cachedWithInsert').
data CacheResult u a
  -- | The request was not in the cache. Carries the 'ResultVar' through
  -- which the result is to be delivered, and the fresh 'IVar' that was
  -- inserted into the cache for this request.
  = Uncached
      (ResultVar a)
      {-# UNPACK #-} !(IVar u a)

  -- | The request is in the cache but its 'IVar' is still empty.
  | CachedNotFetched
      {-# UNPACK #-} !(IVar u a)

  -- | The request is in the cache and its result is already available.
  | Cached (ResultVal a)
-- | A pair of rendering functions: the first renders a request of type
-- @r a@, the second renders a result of type @a@.
type ShowReq r a = (r a -> String, a -> String)
-- | Look a request up in the data cache, registering it when absent.
--
-- * On a miss, a fresh 'IVar' is created, stored in the cache with
--   @insertFn@, and returned as 'Uncached' together with a 'ResultVar'
--   built by 'stdResultVar'.
--
-- * On a hit whose 'IVar' is still empty, the result is
--   'CachedNotFetched'.
--
-- * On a hit whose 'IVar' is full, the stored result is returned as
--   'Cached'; under @ifTrace flags 3@ a line naming the request (rendered
--   with @showFn@) is printed first.
cachedWithInsert
  :: forall r a u .
     Typeable (r a)
  => (r a -> String)
  -> (r a -> IVar u a -> DataCache (IVar u) -> DataCache (IVar u))
  -> Env u -> r a -> IO (CacheResult u a)
cachedWithInsert showFn insertFn Env{..} req = do
  cache <- readIORef cacheRef
  case DataCache.lookup req cache of
    Nothing -> do
      -- Cache miss: allocate the IVar and publish it in the cache before
      -- handing it back to the caller.
      fresh <- newIVar
      let !rvar = stdResultVar fresh completions
      writeIORef cacheRef $! insertFn req fresh cache
      return (Uncached rvar fresh)
    Just ivar@(IVar ref) -> do
      contents <- readIORef ref
      case contents of
        IVarEmpty _ -> return (CachedNotFetched ivar)
        IVarFull result -> do
          let label = case result of
                ThrowIO _ -> "Cached error: "
                ThrowHaxl _ -> "Cached error: "
                Ok _ -> "Cached request: "
          ifTrace flags 3 $ putStrLn (label ++ showFn req)
          return (Cached result)
-- | Build the 'ResultVar' used for ordinary fetches. When a result is
-- delivered it is not written to the 'IVar' directly; instead a
-- 'CompleteReq' holding the result, the target 'IVar' and an allocation
-- figure is pushed onto the front of the @completions@ list.
--
-- The allocation figure is the current allocation counter when the
-- callback reports that it runs in a child thread, and 0 otherwise.
stdResultVar :: IVar u a -> TVar [CompleteReq u] -> ResultVar a
stdResultVar ivar completions = mkResultVar deliver
  where
    deliver result isChildThread = do
      allocs <- childAllocs isChildThread
      atomically $
        modifyTVar completions (CompleteReq result ivar allocs :)

    childAllocs True = getAllocationCounter
    childAllocs False = return 0
{-# INLINE stdResultVar #-}
-- | Record a 'FetchCall' (the rendered request plus the current call
-- stack) in the stats of the environment. This only does anything in a
-- profiling build, and then only under @ifReport (flags env) 5@; in other
-- builds it is a no-op.
--
-- The stats reference is updated atomically: the fetch statistics
-- wrappers in this module also write to it with an atomic modify from
-- result callbacks, so a plain read-modify-write here could lose one of
-- the two updates.
logFetch :: Env u -> (r a -> String) -> r a -> IO ()
#ifdef PROFILING
logFetch env showFn req = do
  ifReport (flags env) 5 $ do
    stack <- currentCallStack
    atomicModifyIORef' (statsRef env) $ \(Stats s) ->
      (Stats (FetchCall (showFn req) stack : s), ())
#else
logFetch _ _ _ = return ()
#endif
-- | Perform a request through the data cache, rendering the request with
-- its 'Show' instance and storing it with 'DataCache.insert'. All the
-- work is done by 'dataFetchWithInsert'.
dataFetch :: (DataSource u r, Request r a) => r a -> GenHaxl u a
dataFetch req = dataFetchWithInsert show DataCache.insert req
-- | Like 'dataFetch', but for request types without the instances that
-- 'Request' demands: the caller supplies the functions used to render
-- the request and its result, and the cache entry is stored with
-- 'DataCache.insertWithShow'.
dataFetchWithShow
  :: (DataSource u r, Eq (r a), Hashable (r a), Typeable (r a))
  => ShowReq r a
  -> r a -> GenHaxl u a
dataFetchWithShow (showReq, showRes) =
  let insertFn = DataCache.insertWithShow showReq showRes
  in dataFetchWithInsert showReq insertFn
-- | Shared implementation of 'dataFetch' and 'dataFetchWithShow'.
--
-- The request is looked up with 'cachedWithInsert':
--
-- * a cached result finishes immediately;
--
-- * a request that is cached but not yet fetched blocks on its 'IVar';
--
-- * a new request is logged with 'logFetch' and then, depending on the
--   'schedulerHint' of the data source, either submitted straight away
--   via 'performFetches' or added to the request store for batching.
--   Either way the computation then blocks on the new 'IVar'.
dataFetchWithInsert
  :: forall u r a
   . (DataSource u r, Eq (r a), Hashable (r a), Typeable (r a))
  => (r a -> String)
  -> (r a -> IVar u a -> DataCache (IVar u) -> DataCache (IVar u))
  -> r a
  -> GenHaxl u a
dataFetchWithInsert showFn insertFn req =
  GenHaxl $ \env@Env{..} -> do
    lookupResult <- cachedWithInsert showFn insertFn env req
    ifProfiling flags $ addProfileFetch env req
    case lookupResult of
      Cached r -> done r
      CachedNotFetched ivar -> blockOn ivar
      Uncached rvar ivar -> do
        logFetch env showFn req
        let blocked = BlockedFetch req rvar
        case schedulerHint userEnv :: SchedulerHint r of
          SubmitImmediately -> do
            (_, ios) <- performFetches 0 env [BlockedFetches [blocked]]
            -- An immediately-submitted fetch must not leave anything to
            -- wait for afterwards.
            unless (null ios) $
              error "bad data source:SubmitImmediately but returns FutureFetch"
          TryToBatch ->
            modifyIORef' reqStoreRef (addRequest blocked)
        blockOn ivar
  where
    -- Suspend the computation until the given IVar is filled.
    blockOn ivar = return $ Blocked ivar (Cont (getIVar ivar))
-- | Issue a request without consulting or populating the data cache.
--
-- When the @recording@ flag is non-zero the request is routed through
-- 'dataFetch' instead (and so does go through the cache). Otherwise a
-- fresh 'IVar' is created, the request is added to the request store,
-- and the computation blocks on that 'IVar'.
uncachedRequest :: (DataSource u r, Request r a) => r a -> GenHaxl u a
uncachedRequest req = do
  recordingLevel <- env (recording . flags)
  if recordingLevel == 0
    then bypassCache
    else dataFetch req
  where
    bypassCache = GenHaxl $ \Env{..} -> do
      ivar <- newIVar
      let !rvar = stdResultVar ivar completions
      modifyIORef' reqStoreRef (addRequest (BlockedFetch req rvar))
      return (Blocked ivar (Cont (getIVar ivar)))
-- | Run an 'IO' action at most once per request key, remembering its
-- outcome in the data cache. The request is rendered with its 'Show'
-- instance and stored with 'DataCache.insert'; the work is done by
-- 'cacheResultWithInsert'.
cacheResult :: Request r a => r a -> IO a -> GenHaxl u a
cacheResult req io = cacheResultWithInsert show DataCache.insert req io
-- | Like 'cacheResult', but the caller supplies the functions used to
-- render the request and its result, and the cache entry is stored with
-- 'DataCache.insertWithShow'.
cacheResultWithShow
  :: (Eq (r a), Hashable (r a), Typeable (r a))
  => ShowReq r a -> r a -> IO a -> GenHaxl u a
cacheResultWithShow (showReq, showRes) =
  let insertFn = DataCache.insertWithShow showReq showRes
  in cacheResultWithInsert showReq insertFn
-- | Shared implementation of 'cacheResult' and 'cacheResultWithShow'.
--
-- If the request is already in the cache with a full 'IVar', the stored
-- result is returned and the 'IO' action is not run. If it is absent,
-- the action is run under 'Exception.try'; asynchronous exceptions are
-- rethrown, and any other outcome (value or exception) is stored in a
-- full 'IVar' inserted with @insertFn@ and then returned.
--
-- A cache entry whose 'IVar' is still empty is reported as a
-- 'DataSourceError': this function never leaves an empty 'IVar' behind
-- itself, so such an entry must have come from elsewhere.
cacheResultWithInsert
  :: Typeable (r a)
  => (r a -> String)
  -> (r a -> IVar u a -> DataCache (IVar u) -> DataCache (IVar u)) -> r a
  -> IO a -> GenHaxl u a
cacheResultWithInsert showFn insertFn req val = GenHaxl $ \env -> do
  let !ref = cacheRef env
  cache <- readIORef ref
  case DataCache.lookup req cache of
    Just (IVar cell) -> do
      contents <- readIORef cell
      case contents of
        IVarFull r -> done r
        IVarEmpty _ -> corruptCache
    Nothing -> do
      outcome <- Exception.try val
      -- Only synchronous exceptions may be cached.
      either rethrowAsyncExceptions (const (return ())) outcome
      let result = eitherToResultThrowIO outcome
      full <- newFullIVar result
      writeIORef ref $! insertFn req full cache
      done result
  where
    corruptCache = raise (DataSourceError message)

    message = Text.concat
      [ Text.pack (showFn req)
      , " has a corrupted cache value: these requests are meant to"
      , " return immediately without an intermediate value. Either"
      , " the cache was updated incorrectly, or you're calling"
      , " cacheResult on a query that involves a blocking fetch."
      ]
-- | Insert a request together with its outcome (a value or an exception)
-- directly into the data cache. If the request is already present, a
-- 'DataSourceError' is raised and the cache is left unchanged.
cacheRequest
  :: Request req a => req a -> Either SomeException a -> GenHaxl u ()
cacheRequest request result = GenHaxl $ \env -> do
  let ref = cacheRef env
  cache <- readIORef ref
  case DataCache.lookup request cache of
    Just _ ->
      raise (DataSourceError "cacheRequest: request is already in the cache")
    Nothing -> do
      full <- newFullIVar (eitherToResult result)
      writeIORef ref $! DataCache.insert request full cache
      return (Done ())
-- | Run 'performFetches' on everything currently held in a
-- 'RequestStore'. The arguments and result are those of
-- 'performFetches'.
performRequestStore
  :: forall u. Int -> Env u -> RequestStore u -> IO (Int, [IO ()])
performRequestStore n env = performFetches n env . contents
-- | Issue a round of batched fetches, one batch per data source.
--
-- The 'Int' argument is the index given to the first batch; batches are
-- numbered consecutively from it and the returned 'Int' is the index
-- following the last one. The returned list holds the wait actions
-- produced by 'scheduleFetches'.
--
-- Each batch is paired with the 'PerformFetch' of its data source,
-- wrapped (innermost first) in 'wrapFetchInCatch', 'wrapFetchInTrace'
-- and, when the @report@ flag is at least 2, 'wrapFetchInStats'. A batch
-- whose data source has no state in the environment has every request
-- failed with a 'DataSourceError' instead.
--
-- A summary of the round is printed under @ifTrace f 1@ and the
-- individual requests under @ifTrace f 3@.
performFetches
  :: forall u. Int -> Env u -> [BlockedFetches u] -> IO (Int, [IO ()])
performFetches n env@Env{flags=f, statsRef=sref} jobs = do
  let !n' = n + length jobs

  roundStart <- getTimestamp

  let
    roundstats =
      [ (dataSourceName (Proxy :: Proxy r), length reqs)
      | BlockedFetches (reqs :: [BlockedFetch r]) <- jobs ]

    summary :: String
    summary = intercalate ", "
      [ printf "%d %s" num (Text.unpack name) | (name, num) <- roundstats ]

  ifTrace f 1 $ printf "Batch data fetch (%s)\n" summary

  ifTrace f 3 $
    forM_ jobs $ \(BlockedFetches batch) ->
      forM_ batch $ \(BlockedFetch request _) -> putStrLn (showp request)

  let
    applyFetch idx (BlockedFetches (reqs :: [BlockedFetch r])) =
      case stateGet (states env) of
        Nothing ->
          return (FetchToDo reqs (SyncFetch (mapM_ (setError notInitialized))))
          where
            notInitialized :: ShowP req => req a -> DataSourceError
            notInitialized req =
              DataSourceError $ "data source not initialized: " <> dsName
                <> ": "
                <> Text.pack (showp req)
        Just dsState -> do
          let
            withStats
              | report f >= 2 = wrapFetchInStats sref dsName batchSize
              | otherwise = id
          return
            $ FetchToDo reqs
            $ withStats
            $ wrapFetchInTrace idx batchSize dsName
            $ wrapFetchInCatch reqs
            $ fetch dsState f (userEnv env)
      where
        dsName = dataSourceName (Proxy :: Proxy r)
        batchSize = length reqs

  fetches <- zipWithM applyFetch [n..] jobs

  waits <- scheduleFetches fetches

  roundEnd <- getTimestamp
  let roundtime = fromIntegral (roundEnd - roundStart) / 1000000 :: Double

  ifTrace f 1 $
    printf "Batch data fetch done (%.2fs)\n" roundtime

  return (n', waits)
-- | A batch of blocked requests paired with the 'PerformFetch' that will
-- service them. The request type @req@ does not appear in the result
-- type, so batches for different request types can share one list.
data FetchToDo where
  FetchToDo :: [BlockedFetch req] -> PerformFetch req -> FetchToDo
-- | Guard a 'PerformFetch' against exceptions thrown by the data source.
--
-- If the wrapped fetch throws, asynchronous exceptions are rethrown;
-- any other exception is delivered as the result of every request in
-- the batch given as the first argument. For a 'FutureFetch' the wait
-- action returned after such a failure does nothing.
wrapFetchInCatch :: [BlockedFetch req] -> PerformFetch req -> PerformFetch req
wrapFetchInCatch reqs perform =
  case perform of
    SyncFetch run ->
      SyncFetch $ \batch -> Exception.handle failAll (run batch)
    AsyncFetch run ->
      AsyncFetch $ \batch inner -> Exception.handle failAll (run batch inner)
    FutureFetch submit ->
      FutureFetch $ \batch ->
        Exception.handle (\e -> failAll e >> return (return ())) (submit batch)
    BackgroundFetch run ->
      BackgroundFetch $ \batch -> Exception.handle failAll (run batch)
  where
    -- Fail every request of the original batch with the given exception.
    failAll :: SomeException -> IO ()
    failAll e = do
      rethrowAsyncExceptions e
      forM_ reqs $ \(BlockedFetch _ rvar) ->
        putResult rvar (except e)
-- | Instrument a 'PerformFetch' so that each run prepends a 'FetchStats'
-- record to the given stats reference.
--
-- * 'SyncFetch': time and allocation of the whole fetch.
--
-- * 'AsyncFetch': time and allocation of the fetch minus those of the
--   inner computation it is handed.
--
-- * 'FutureFetch': submit and wait phases added together; the record is
--   written when the wait action finishes.
--
-- * 'BackgroundFetch': one record per request, written when that
--   request receives its result, with batch size 1 and no allocation
--   figure.
--
-- For the first three kinds, failures are counted by wrapping the
-- 'ResultVar' of each request (see 'addFailureCount').
wrapFetchInStats
  :: IORef Stats
  -> Text
  -> Int
  -> PerformFetch req
  -> PerformFetch req
wrapFetchInStats !statsRef dataSource batchSize perform =
  case perform of
    SyncFetch f ->
      SyncFetch $ \reqs -> do
        (failRef, counted) <- countingFailures reqs
        (start, elapsed, alloc, _) <- statsForIO (f counted)
        failures <- readIORef failRef
        updateFetchStats start elapsed alloc batchSize failures

    AsyncFetch f ->
      AsyncFetch $ \reqs inner -> do
        innerStats <- newIORef (0, 0)
        (failRef, counted) <- countingFailures reqs
        let timedInner = do
              (_, innerElapsed, innerSpace, _) <- statsForIO inner
              writeIORef innerStats (innerElapsed, innerSpace)
        (start, totalTime, totalAlloc, _) <-
          statsForIO (f counted timedInner)
        (innerTime, innerAlloc) <- readIORef innerStats
        failures <- readIORef failRef
        updateFetchStats start (totalTime - innerTime)
          (totalAlloc - innerAlloc) batchSize failures

    FutureFetch submit ->
      FutureFetch $ \reqs -> do
        (failRef, counted) <- countingFailures reqs
        (start, submitTime, submitAlloc, wait) <- statsForIO (submit counted)
        return $ do
          (_, waitTime, waitAlloc, _) <- statsForIO wait
          failures <- readIORef failRef
          updateFetchStats start (submitTime + waitTime)
            (submitAlloc + waitAlloc) batchSize failures

    BackgroundFetch io ->
      BackgroundFetch $ \reqs -> do
        startTime <- getTimestamp
        io (map (addTimer startTime) reqs)
  where
    -- Allocate a failure counter and attach it to every request.
    countingFailures :: [BlockedFetch r] -> IO (IORef Int, [BlockedFetch r])
    countingFailures reqs = do
      failRef <- newIORef 0
      return (failRef, map (addFailureCount failRef) reqs)

    -- Run an action, returning its start time, duration, the drop in
    -- the allocation counter across it, and its result.
    statsForIO :: IO a -> IO (Timestamp, Microseconds, Int64, a)
    statsForIO io = do
      prevAlloc <- getAllocationCounter
      (start, elapsed, a) <- time io
      postAlloc <- getAllocationCounter
      return (start, elapsed, fromIntegral (prevAlloc - postAlloc), a)

    -- Make a request record its own stats entry when it is completed.
    addTimer start (BlockedFetch req (ResultVar fn)) =
      BlockedFetch req $ ResultVar $ \result isChildThread -> do
        finish <- getTimestamp
        let failed = if isLeft result then 1 else 0
        updateFetchStats start (finish - start) 0 1 failed
        fn result isChildThread

    updateFetchStats
      :: Timestamp -> Microseconds -> Int64 -> Int -> Int -> IO ()
    updateFetchStats start duration space batch failures = do
      let this = FetchStats { fetchDataSource = dataSource
                            , fetchBatchSize = batch
                            , fetchStart = start
                            , fetchDuration = duration
                            , fetchSpace = space
                            , fetchFailures = failures }
      atomicModifyIORef' statsRef $ \(Stats fs) -> (Stats (this : fs), ())

    -- Bump the counter whenever a request is completed with a Left.
    addFailureCount :: IORef Int -> BlockedFetch r -> BlockedFetch r
    addFailureCount ref (BlockedFetch req (ResultVar fn)) =
      BlockedFetch req $ ResultVar $ \result isChildThread -> do
        when (isLeft result) $ atomicModifyIORef' ref (\r -> (r+1,()))
        fn result isChildThread
-- | Emit eventlog markers around a fetch. This is the identity unless
-- the module is built with the eventlog option.
--
-- The arguments are the index of the fetch, the number of requests in
-- the batch, the data source name, and the fetch to wrap.
--
-- * 'SyncFetch': a START event before the fetch and a STOP event after.
--
-- * 'AsyncFetch': the same around the whole fetch, with the inner
--   computation bracketed by STOP and START so that it falls outside
--   the marked interval.
--
-- * 'FutureFetch' and 'BackgroundFetch' are returned unchanged: their
--   work continues after the submit call returns, so bracketing that
--   call would not mark the fetch itself.
wrapFetchInTrace
  :: Int
  -> Int
  -> Text
  -> PerformFetch req
  -> PerformFetch req
#ifdef EVENTLOG
wrapFetchInTrace i n dsName f =
  case f of
    -- The fetch takes the batch as an argument, so the bracket has to
    -- go around the action obtained after applying it.
    SyncFetch io -> SyncFetch (wrapF "Sync" . io)
    AsyncFetch fio ->
      AsyncFetch $ \reqs inner ->
        wrapF "Async" (fio reqs (unwrapF "Async" inner))
    FutureFetch _ -> f
    BackgroundFetch _ -> f
  where
    d = Text.unpack dsName
    wrapF :: String -> IO a -> IO a
    wrapF ty = bracket_ (traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
                        (traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
    unwrapF :: String -> IO a -> IO a
    unwrapF ty = bracket_ (traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
                          (traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
#else
wrapFetchInTrace _ _ _ f = f
#endif
-- | Run an action and return the timestamp taken just before it, the
-- difference between the timestamps taken after and before it, and its
-- result.
time :: IO a -> IO (Timestamp,Microseconds,a)
time action = do
  started <- getTimestamp
  result <- action
  finished <- getTimestamp
  let elapsed = finished - started
  return (started, elapsed, result)
-- | Start a round of fetches and return the wait actions of the
-- 'FutureFetch' batches.
--
-- Order of execution: every 'BackgroundFetch' is run first, then every
-- 'FutureFetch' is submitted, and finally the 'AsyncFetch' batches are
-- composed around an inner action that runs all 'SyncFetch' batches in
-- sequence.
scheduleFetches :: [FetchToDo] -> IO [IO ()]
scheduleFetches fetches = do
  sequence_ backgroundFetches
  waits <- sequence futureFetches
  compose asyncFetches (sequence_ syncFetches)
  return waits
  where
    backgroundFetches :: [IO ()]
    backgroundFetches =
      [ run reqs | FetchToDo reqs (BackgroundFetch run) <- fetches ]

    futureFetches :: [IO (IO ())]
    futureFetches =
      [ submit reqs | FetchToDo reqs (FutureFetch submit) <- fetches ]

    asyncFetches :: [IO () -> IO ()]
    asyncFetches =
      [ run reqs | FetchToDo reqs (AsyncFetch run) <- fetches ]

    syncFetches :: [IO ()]
    syncFetches =
      [ run reqs | FetchToDo reqs (SyncFetch run) <- fetches ]