module Streamly.Internal.Data.Time.Clock
(
Clock(..)
, getTime
, asyncClock
, readClock
, Timer
, timer
, resetTimer
, extendTimer
, shortenTimer
, readTimer
, waitTimer
)
where
import Control.Concurrent (threadDelay, ThreadId)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad (forever, when, void)
import Streamly.Internal.Data.Time.Clock.Type (Clock(..), getTime)
import Streamly.Internal.Data.Time.Units
(MicroSecond64(..), fromAbsTime, addToAbsTime, toRelTime)
import Streamly.Internal.Control.ForkIO (forkIOManaged)
import qualified Streamly.Internal.Data.IORef.Unboxed as Unboxed
-- | Read the current time from @clock@ and overwrite the contents of
-- @timeVar@ with it, expressed as a 'MicroSecond64' value.
{-# INLINE updateTimeVar #-}
updateTimeVar :: Clock -> Unboxed.IORef MicroSecond64 -> IO ()
updateTimeVar clock timeVar = do
    t <- fromAbsTime <$> getTime clock
    -- The previous contents are irrelevant; 'const' simply replaces them.
    Unboxed.modifyIORef' timeVar (const t)
-- | Sleep for roughly @precision@ seconds and then refresh @timeVar@ with the
-- current reading of @clock@ (see 'updateTimeVar').
--
-- The sleep duration is clamped: it is never shorter than 1000 microseconds
-- and never longer than @maxBound :: Int@ microseconds.
{-# INLINE updateWithDelay #-}
updateWithDelay :: RealFrac a =>
    Clock -> a -> Unboxed.IORef MicroSecond64 -> IO ()
updateWithDelay clock precision timeVar = do
    threadDelay (delayTime precision)
    updateTimeVar clock timeVar

    where

    -- Convert a duration in seconds to the microsecond count expected by
    -- 'threadDelay', clamping it to the range [1000, maxBound :: Int].
    {-# INLINE delayTime #-}
    delayTime g
        | g' >= fromIntegral (maxBound :: Int) = maxBound
        | g' < 1000 = 1000
        | otherwise = round g'

        where

        -- The requested delay scaled from seconds to microseconds.
        g' = g * 10 ^ (6 :: Int)
-- | @asyncClock clock g@ creates a mutable time variable, initialises it with
-- the current reading of @clock@, and forks a thread (via 'forkIOManaged')
-- that keeps refreshing the variable forever, sleeping about @g@ seconds
-- between updates (see 'updateWithDelay' for the clamping applied to @g@).
--
-- Returns the 'ThreadId' of the updater thread together with the time
-- variable; use 'readClock' to read the latest stored time.
asyncClock :: Clock -> Double -> IO (ThreadId, Unboxed.IORef MicroSecond64)
asyncClock clock g = do
    timeVar <- Unboxed.newIORef 0
    -- Populate the variable once up front so that it holds a real reading
    -- before the updater thread has had a chance to run.
    updateTimeVar clock timeVar
    tid <- forkIOManaged $ forever (updateWithDelay clock g timeVar)
    return (tid, timeVar)
-- | Read the time currently stored in the variable of a clock handle as
-- returned by 'asyncClock'. The 'ThreadId' component is ignored.
{-# INLINE readClock #-}
readClock :: (ThreadId, Unboxed.IORef MicroSecond64) -> IO MicroSecond64
readClock (_, timeVar) = Unboxed.readIORef timeVar
-- | Handle to a timer created by 'timer'. As constructed there, the fields
-- are, in order: the 'ThreadId' of the forked thread that polls for expiry,
-- the 'MVar' that the thread fills when the timer expires (taken by
-- 'waitTimer'), and the action that re-arms the expiry time (run by
-- 'resetTimer').
data Timer = Timer ThreadId (MVar ()) (IO ())
-- | Set the expiry time stored in @timeVar@ to the current reading of
-- @clock@ plus @period@.
{-# INLINE resetTimerExpiry #-}
resetTimerExpiry ::
    Clock -> MicroSecond64 -> Unboxed.IORef MicroSecond64 -> IO ()
resetTimerExpiry clock period timeVar = do
    t <- getTime clock
    let t1 = addToAbsTime t (toRelTime period)
    -- Overwrite whatever expiry was stored before.
    Unboxed.modifyIORef' timeVar (const (fromAbsTime t1))
-- | Perform one polling step of a timer: sleep for roughly @precision@
-- seconds, then compare the current reading of @clock@ with the expiry time
-- stored in @timeVar@. If the expiry has been reached, signal @mvar@ and run
-- the @reset@ action.
--
-- The sleep duration is clamped: it is never shorter than 1000 microseconds
-- and never longer than @maxBound :: Int@ microseconds.
{-# INLINE processTimerTick #-}
processTimerTick :: RealFrac a =>
    Clock -> a -> Unboxed.IORef MicroSecond64 -> MVar () -> IO () -> IO ()
processTimerTick clock precision timeVar mvar reset = do
    threadDelay (delayTime precision)
    t <- fromAbsTime <$> getTime clock
    expiry <- Unboxed.readIORef timeVar
    when (t >= expiry) $ do
        -- 'tryPutMVar' does not block: if the MVar is already full the
        -- signal is dropped and the result is deliberately discarded.
        void $ tryPutMVar mvar ()
        reset

    where

    -- Convert a duration in seconds to the microsecond count expected by
    -- 'threadDelay', clamping it to the range [1000, maxBound :: Int].
    {-# INLINE delayTime #-}
    delayTime g
        | g' >= fromIntegral (maxBound :: Int) = maxBound
        | g' < 1000 = 1000
        | otherwise = round g'

        where

        -- The requested delay scaled from seconds to microseconds.
        g' = g * 10 ^ (6 :: Int)
-- | @timer clock g period@ creates a 'Timer'.
--
-- * @clock@ is the clock used to read the current time.
-- * @g@ is the polling interval in seconds; the forked thread sleeps about
--   this long between expiry checks (see 'processTimerTick' for clamping).
-- * @period@ is the timer period in seconds; it is scaled by @1e6@ and
--   rounded to a whole 'MicroSecond64' count.
--
-- The expiry is armed once before the polling thread is forked with
-- 'forkIOManaged'. Each time the expiry is reached the thread signals the
-- timer's 'MVar' and re-arms the expiry for another @period@.
timer :: Clock -> Double -> Double -> IO Timer
timer clock g period = do
    mvar <- newEmptyMVar
    timeVar <- Unboxed.newIORef 0
    let p = round (period * 1e6) :: Int
        p1 = fromIntegral p :: MicroSecond64
        reset = resetTimerExpiry clock p1 timeVar
        process = processTimerTick clock g timeVar mvar reset
    -- Arm the timer before starting the poller so that the first tick
    -- compares against a real expiry rather than the initial 0.
    reset
    tid <- forkIOManaged $ forever process
    return $ Timer tid mvar reset
-- | Block until the timer's 'MVar' has been filled, then empty it and return.
{-# INLINE waitTimer #-}
waitTimer :: Timer -> IO ()
waitTimer (Timer _ mvar _) = takeMVar mvar
-- | Run the reset action stored in the 'Timer'. For timers built by 'timer'
-- this re-arms the expiry to one full period from the current time.
{-# INLINE resetTimer #-}
resetTimer :: Timer -> IO ()
resetTimer (Timer _ _ reset) = reset
-- | /Unimplemented/. Placeholder in the exported API; evaluating it raises
-- an error.
{-# INLINE extendTimer #-}
extendTimer :: Timer -> Double -> IO ()
extendTimer = error "extendTimer: not implemented"
-- | /Unimplemented/. Placeholder in the exported API; evaluating it raises
-- an error.
{-# INLINE shortenTimer #-}
shortenTimer :: Timer -> Double -> IO ()
shortenTimer = error "shortenTimer: not implemented"
-- | /Unimplemented/. Placeholder in the exported API; evaluating it raises
-- an error.
{-# INLINE readTimer #-}
readTimer :: Timer -> IO Double
readTimer = error "readTimer: not implemented"