module Pipes.RealTime (
timeCat,
timeCatDelayedBy,
relativeTimeCat,
relativeTimeCatDelayedBy,
steadyCat,
poissonCat,
poissonCatConst,
genPoissonCat,
catAtTimes,
catAtRelativeTimes,
) where
import Prelude hiding (dropWhile)
import Pipes
import Pipes.Prelude (chain, dropWhile)
import Control.Concurrent (threadDelay)
import Data.Time.Clock
import Data.Time.Calendar
import System.Random.MWC
import qualified System.Random.MWC.Distributions as MWCDists
relativeTimeCat :: (MonadIO m) => (a -> Double) -> Pipe a a m r
relativeTimeCat toRelTime = do
t0 <- liftIO getCurrentTime
dropWhile (( < 0 ) . toRelTime) >->
chain (\v -> liftIO $ pauseUntil
(doubleToNomDiffTime (toRelTime v) `addUTCTime` t0))
relativeTimeCatDelayedBy :: (MonadIO m) => (a -> Double) -> Double
-> Pipe a a m r
relativeTimeCatDelayedBy toTime delay = relativeTimeCat toTime'
where toTime' = ((+ delay) . toTime)
timeCat :: (MonadIO m) => (a -> UTCTime) -> Pipe a a m r
timeCat toTime = do
t0 <- liftIO getCurrentTime
dropWhile (( < t0 ) . toTime) >->
chain (liftIO . pauseUntil . toTime)
timeCatDelayedBy :: (MonadIO m) => (a -> UTCTime) -> Double -> Pipe a a m r
timeCatDelayedBy toTime delay = do
timeCat $ toTime'
where toTime' = (doubleToNomDiffTime delay `addUTCTime`) . toTime
steadyCat :: (MonadIO m) => Double -> Pipe a a m r
steadyCat rate = do
t0 <- liftIO getCurrentTime
loop t0
where
dtUTC = doubleToNomDiffTime (1/rate)
loop t =
let t' = dtUTC `addUTCTime` t in do
liftIO $ pauseUntil t'
v <- await
yield v
loop t'
poissonCat :: (MonadIO m) => Double -> Pipe a a m r
poissonCat rate = liftIO createSystemRandom >>= \gen ->
genPoissonCat gen rate
poissonCatConst :: (MonadIO m) => Double -> Pipe a a m r
poissonCatConst rate = liftIO create >>= \gen ->
genPoissonCat gen rate
genPoissonCat :: (MonadIO m) => GenIO -> Double -> Pipe a a m r
genPoissonCat gen rate = do
t0 <- liftIO getCurrentTime
loop t0
where
loop t = do
v <- await
dt <- liftIO $ MWCDists.exponential rate gen
let t' = addUTCTime (doubleToNomDiffTime dt) t
liftIO $ pauseUntil t'
yield v
loop t'
catAtTimes :: (MonadIO m) => [UTCTime] -> Pipe a a m r
catAtTimes [] = cat
catAtTimes (t:ts) = do
liftIO $ pauseUntil t
v <- await
yield v
catAtTimes ts
catAtRelativeTimes :: (MonadIO m) => [Double] -> Pipe a a m r
catAtRelativeTimes [] = cat
catAtRelativeTimes ts@(_:_) = liftIO absTimes >>= catAtTimes
where absTimes =
getCurrentTime >>= \t0 ->
return $ map (\d -> doubleToNomDiffTime d `addUTCTime` t0) ts
pauseUntil :: UTCTime -> IO ()
pauseUntil t = do
now <- getCurrentTime
case compare now t of
LT -> threadDelay (truncate (diffUTCTime t now * 1000000))
_ -> return ()
doubleToNomDiffTime :: Double -> NominalDiffTime
doubleToNomDiffTime x =
let d0 = ModifiedJulianDay 0
t0 = UTCTime d0 (picosecondsToDiffTime 0)
t1 = UTCTime d0 (picosecondsToDiffTime $ floor (x/1e-12))
in diffUTCTime t1 t0