{-# LANGUAGE ScopedTypeVariables #-}
module Control.RateLimit (
generateRateLimitedFunction
, RateLimit(..)
, ResultsCombiner
, dontCombine
, rateLimitInvocation
, rateLimitExecution
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad (void)
import Data.Functor (($>))
import Data.Time.Clock.POSIX (getPOSIXTime)
import Data.Time.Units
data RateLimit a
= PerInvocation a
| PerExecution a
type ResultsCombiner req resp = req -> req -> Maybe (req, resp -> (resp, resp))
dontCombine :: ResultsCombiner a b
dontCombine :: forall a b. ResultsCombiner a b
dontCombine a
_ a
_ = forall a. Maybe a
Nothing
rateLimitInvocation :: TimeUnit t
=> t
-> (req -> IO resp)
-> IO (req -> IO resp)
rateLimitInvocation :: forall t req resp.
TimeUnit t =>
t -> (req -> IO resp) -> IO (req -> IO resp)
rateLimitInvocation t
pertime req -> IO resp
action =
forall req resp t.
TimeUnit t =>
RateLimit t
-> (req -> IO resp)
-> ResultsCombiner req resp
-> IO (req -> IO resp)
generateRateLimitedFunction (forall a. a -> RateLimit a
PerInvocation t
pertime) req -> IO resp
action forall a b. ResultsCombiner a b
dontCombine
rateLimitExecution :: TimeUnit t
=> t
-> (req -> IO resp)
-> IO (req -> IO resp)
rateLimitExecution :: forall t req resp.
TimeUnit t =>
t -> (req -> IO resp) -> IO (req -> IO resp)
rateLimitExecution t
pertime req -> IO resp
action =
forall req resp t.
TimeUnit t =>
RateLimit t
-> (req -> IO resp)
-> ResultsCombiner req resp
-> IO (req -> IO resp)
generateRateLimitedFunction (forall a. a -> RateLimit a
PerExecution t
pertime) req -> IO resp
action forall a b. ResultsCombiner a b
dontCombine
generateRateLimitedFunction :: forall req resp t
. TimeUnit t
=> RateLimit t
-> (req -> IO resp)
-> ResultsCombiner req resp
-> IO (req -> IO resp)
generateRateLimitedFunction :: forall req resp t.
TimeUnit t =>
RateLimit t
-> (req -> IO resp)
-> ResultsCombiner req resp
-> IO (req -> IO resp)
generateRateLimitedFunction RateLimit t
ratelimit req -> IO resp
action ResultsCombiner req resp
combiner = do
TChan (req, MVar resp)
chan <- forall a. STM a -> IO a
atomically forall a. STM (TChan a)
newTChan
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO forall a b. (a -> b) -> a -> b
$ forall a.
Maybe Integer -> Integer -> TChan (req, MVar resp) -> IO a
runner forall a. Maybe a
Nothing Integer
0 TChan (req, MVar resp)
chan
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ TChan (req, MVar resp) -> req -> IO resp
resultFunction TChan (req, MVar resp)
chan
where
currentMicroseconds :: IO Integer
currentMicroseconds :: IO Integer
currentMicroseconds =
forall a. TimeUnit a => a -> Integer
toMicroseconds forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall a b. (Integral a, Num b) => a -> b
fromIntegral :: Int -> Picosecond) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Enum a => a -> Int
fromEnum forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
IO POSIXTime
getPOSIXTime
runner :: Maybe Integer -> Integer -> TChan (req, MVar resp) -> IO a
runner :: forall a.
Maybe Integer -> Integer -> TChan (req, MVar resp) -> IO a
runner Maybe Integer
mLastRun Integer
lastAllowance TChan (req, MVar resp)
chan = do
(req
req, MVar resp
respMV) <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> STM a
readTChan TChan (req, MVar resp)
chan
let baseHandler :: resp -> IO ()
baseHandler resp
resp = forall a. MVar a -> a -> IO ()
putMVar MVar resp
respMV resp
resp
Integer
beforeWait <- IO Integer
currentMicroseconds
let targetPeriod :: Integer
targetPeriod = forall a. TimeUnit a => a -> Integer
toMicroseconds forall a b. (a -> b) -> a -> b
$ RateLimit t -> t
getRate RateLimit t
ratelimit
timeSinceLastRun :: Integer
timeSinceLastRun = case Maybe Integer
mLastRun of
Just Integer
lastRun -> Integer
beforeWait forall a. Num a => a -> a -> a
- Integer
lastRun
Maybe Integer
Nothing -> forall a. Num a => a -> a
negate Integer
targetPeriod
targetDelay :: Integer
targetDelay = Integer
targetPeriod forall a. Num a => a -> a -> a
- Integer
timeSinceLastRun forall a. Num a => a -> a -> a
- Integer
lastAllowance
Integer
nextAllowance <- if Integer
targetDelay forall a. Ord a => a -> a -> Bool
< Integer
0
then forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a. Num a => a -> a
abs Integer
targetDelay
else do
Int -> IO ()
threadDelay forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral Integer
targetDelay
Integer
afterWait <- IO Integer
currentMicroseconds
let slept :: Integer
slept = Integer
afterWait forall a. Num a => a -> a -> a
- Integer
beforeWait
overslept :: Integer
overslept = Integer
slept forall a. Num a => a -> a -> a
- Integer
targetDelay
forall (m :: * -> *) a. Monad m => a -> m a
return Integer
overslept
(req
req', resp -> IO ()
finalHandler) <- TChan (req, MVar resp)
-> req -> (resp -> IO ()) -> IO (req, resp -> IO ())
updateRequestWithFollowers TChan (req, MVar resp)
chan req
req resp -> IO ()
baseHandler
let run :: IO ()
run = req -> IO resp
action req
req' forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= resp -> IO ()
finalHandler
Integer
beforeRun <- IO Integer
currentMicroseconds
if RateLimit t -> Bool
shouldFork RateLimit t
ratelimit
then forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO IO ()
run
else IO ()
run
forall a.
Maybe Integer -> Integer -> TChan (req, MVar resp) -> IO a
runner (forall a. a -> Maybe a
Just Integer
beforeRun) Integer
nextAllowance TChan (req, MVar resp)
chan
updateRequestWithFollowers :: TChan (req, MVar resp)
-> req
-> (resp -> IO ())
-> IO (req, (resp -> IO ()))
updateRequestWithFollowers :: TChan (req, MVar resp)
-> req -> (resp -> IO ()) -> IO (req, resp -> IO ())
updateRequestWithFollowers TChan (req, MVar resp)
chan req
req resp -> IO ()
handler = do
Bool
isEmpty <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> STM Bool
isEmptyTChan TChan (req, MVar resp)
chan
if Bool
isEmpty
then forall (m :: * -> *) a. Monad m => a -> m a
return (req
req, resp -> IO ()
handler)
else do Maybe ((req, resp -> (resp, resp)), MVar resp)
mCombinedAndMV <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
tup :: (req, MVar resp)
tup@(req
next, MVar resp
nextRespMV) <- forall a. TChan a -> STM a
readTChan TChan (req, MVar resp)
chan
case ResultsCombiner req resp
combiner req
req req
next of
Maybe (req, resp -> (resp, resp))
Nothing -> forall a. TChan a -> a -> STM ()
unGetTChan TChan (req, MVar resp)
chan (req, MVar resp)
tup forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> forall a. Maybe a
Nothing
Just (req, resp -> (resp, resp))
combined -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just ((req, resp -> (resp, resp))
combined, MVar resp
nextRespMV)
case Maybe ((req, resp -> (resp, resp)), MVar resp)
mCombinedAndMV of
Maybe ((req, resp -> (resp, resp)), MVar resp)
Nothing ->
forall (m :: * -> *) a. Monad m => a -> m a
return (req
req, resp -> IO ()
handler)
Just ((req
req', resp -> (resp, resp)
splitResponse), MVar resp
nextRespMV) ->
TChan (req, MVar resp)
-> req -> (resp -> IO ()) -> IO (req, resp -> IO ())
updateRequestWithFollowers TChan (req, MVar resp)
chan req
req' forall a b. (a -> b) -> a -> b
$ \resp
resp -> do
let (resp
theirs, resp
mine) = resp -> (resp, resp)
splitResponse resp
resp
forall a. MVar a -> a -> IO ()
putMVar MVar resp
nextRespMV resp
mine
resp -> IO ()
handler resp
theirs
shouldFork :: RateLimit t -> Bool
shouldFork :: RateLimit t -> Bool
shouldFork (PerInvocation t
_) = Bool
True
shouldFork (PerExecution t
_) = Bool
False
getRate :: RateLimit t -> t
getRate :: RateLimit t -> t
getRate (PerInvocation t
x) = t
x
getRate (PerExecution t
x) = t
x
resultFunction :: TChan (req, MVar resp) -> req -> IO resp
resultFunction :: TChan (req, MVar resp) -> req -> IO resp
resultFunction TChan (req, MVar resp)
chan req
req = do
MVar resp
respMV <- forall a. IO (MVar a)
newEmptyMVar
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> a -> STM ()
writeTChan TChan (req, MVar resp)
chan (req
req, MVar resp
respMV)
forall a. MVar a -> IO a
takeMVar MVar resp
respMV