-- |
-- Module      : Streamly.Internal.Data.Time.Clock
-- Copyright   : (c) 2021 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : pre-release
-- Portability : GHC

module Streamly.Internal.Data.Time.Clock
    (
    -- * System clock
      Clock(..)
    , getTime

    -- * Async clock
    , asyncClock
    , readClock

    -- * Adjustable Timer
    , Timer
    , timer
    , resetTimer
    , extendTimer
    , shortenTimer
    , readTimer
    , waitTimer
    )
where

import Control.Concurrent (threadDelay, ThreadId)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad (forever, when, void)
import Streamly.Internal.Data.Time.Clock.Type (Clock(..), getTime)
import Streamly.Internal.Data.Time.Units
    (MicroSecond64(..), fromAbsTime, addToAbsTime, toRelTime)
import Streamly.Internal.Control.ForkIO (forkIOManaged)

import qualified Streamly.Internal.Data.IORef.Unboxed as Unboxed

------------------------------------------------------------------------------
-- Async clock
------------------------------------------------------------------------------

-- Sample the given clock and overwrite the contents of the time variable with
-- the sampled time, converted from absolute time to microseconds.
{-# INLINE updateTimeVar #-}
updateTimeVar :: Clock -> Unboxed.IORef MicroSecond64 -> IO ()
updateTimeVar clock timeVar = do
    t <- fromAbsTime <$> getTime clock
    -- The previous value is irrelevant, hence the 'const'.
    Unboxed.modifyIORef' timeVar (const t)

-- Sleep for @precision@ seconds (clamped, see 'delayTime' below) and then
-- store the current time of the given clock in the time variable.
{-# INLINE updateWithDelay #-}
updateWithDelay :: RealFrac a =>
    Clock -> a -> Unboxed.IORef MicroSecond64 -> IO ()
updateWithDelay clock precision timeVar = do
    threadDelay (delayTime precision)
    updateTimeVar clock timeVar

    where

    -- Convert seconds to the microsecond count expected by 'threadDelay'.
    --
    -- Keep the minimum at least a millisecond to avoid high CPU usage, and
    -- saturate at 'maxBound' so that the conversion to 'Int' cannot overflow.
    -- The guards are ordered so that a value for which every comparison is
    -- False (e.g. a NaN) ends up at the 1 ms floor instead of being passed
    -- to 'round'.
    {-# INLINE delayTime #-}
    delayTime :: RealFrac b => b -> Int
    delayTime g
        | g' >= fromIntegral (maxBound :: Int) = maxBound
        | g' >= 1000 = round g'
        | otherwise = 1000

        where

        -- The requested delay expressed in microseconds.
        g' = g * 10 ^ (6 :: Int)

-- | @asyncClock clock g@ starts a clock thread that updates an IORef with the
-- current time of @clock@ as a 64-bit value in microseconds, every @g@
-- seconds. The IORef can be read asynchronously using 'readClock'.  The thread
-- exits automatically when the reference to the returned 'ThreadId' is lost.
--
-- The IORef is initialized with the current time before the thread is
-- started, so it never exposes the placeholder value to the caller.
--
-- Minimum granularity of clock update is 1 ms. Higher is better for
-- performance.
--
-- CAUTION! This is safe only on a 64-bit machine. On a 32-bit machine a 64-bit
-- unboxed IORef cannot be read consistently without a lock while another
-- thread is writing to it.
asyncClock :: Clock -> Double -> IO (ThreadId, Unboxed.IORef MicroSecond64)
asyncClock clock g = do
    timeVar <- Unboxed.newIORef 0
    updateTimeVar clock timeVar
    tid <- forkIOManaged $ forever (updateWithDelay clock g timeVar)
    return (tid, timeVar)

-- | Read the time last stored by the clock thread created by 'asyncClock', in
-- microseconds. The 'ThreadId' component of the pair is not used by the read.
{-# INLINE readClock #-}
readClock :: (ThreadId, Unboxed.IORef MicroSecond64) -> IO MicroSecond64
readClock (_, timeVar) = Unboxed.readIORef timeVar

------------------------------------------------------------------------------
-- Adjustable Timer
------------------------------------------------------------------------------

-- | Adjustable periodic timer.
data Timer =
    Timer
        ThreadId   -- ^ Thread that generates the ticks, see 'timer'.
        (MVar ())  -- ^ Filled when a tick is due, emptied by 'waitTimer'.
        (IO ())    -- ^ Action that restarts the current period, see
                   -- 'resetTimer'.

-- Set the expiry to current time + timer period.
--
-- The current time is read from the given clock, the period is added to it
-- and the result is stored in the time variable as microseconds.
{-# INLINE resetTimerExpiry #-}
resetTimerExpiry ::
    Clock -> MicroSecond64 -> Unboxed.IORef MicroSecond64 -> IO ()
resetTimerExpiry clock period timeVar = do
    t <- getTime clock
    let t1 = addToAbsTime t (toRelTime period)
    -- The previous expiry is irrelevant, hence the 'const'.
    Unboxed.modifyIORef' timeVar (const (fromAbsTime t1))

-- One iteration of the timer thread: sleep for @precision@ seconds (clamped,
-- see 'delayTime' below), then compare the current time of the clock with the
-- expiry stored in the time variable. If the expiry has been reached, signal a
-- tick through the MVar and run the @reset@ action.
{-# INLINE processTimerTick #-}
processTimerTick :: RealFrac a =>
    Clock -> a -> Unboxed.IORef MicroSecond64 -> MVar () -> IO () -> IO ()
processTimerTick clock precision timeVar mvar reset = do
    threadDelay (delayTime precision)
    t <- fromAbsTime <$> getTime clock
    expiry <- Unboxed.readIORef timeVar
    when (t >= expiry) $ do
        -- non-blocking put so that we can process multiple timers in a
        -- non-blocking manner in future. If the MVar is still full the tick
        -- is dropped.
        void $ tryPutMVar mvar ()
        reset

    where

    -- Convert seconds to the microsecond count expected by 'threadDelay'.
    --
    -- Keep the minimum at least a millisecond to avoid high CPU usage, and
    -- saturate at 'maxBound' so that the conversion to 'Int' cannot overflow.
    -- The guards are ordered so that a value for which every comparison is
    -- False (e.g. a NaN) ends up at the 1 ms floor instead of being passed
    -- to 'round'.
    {-# INLINE delayTime #-}
    delayTime :: RealFrac b => b -> Int
    delayTime g
        | g' >= fromIntegral (maxBound :: Int) = maxBound
        | g' >= 1000 = round g'
        | otherwise = 1000

        where

        -- The requested delay expressed in microseconds.
        g' = g * 10 ^ (6 :: Int)

-- XXX In future we can add a timer in a heap of timers.
--
-- | @timer clockType granularity period@ creates a timer.  The timer produces
-- timer ticks at specified time intervals that can be waited upon using
-- 'waitTimer'.  If the previous tick is not yet processed, the new tick is
-- lost.
--
-- Both @granularity@ and @period@ are in seconds. The period is rounded to
-- the nearest microsecond. The first period starts before the timer thread is
-- forked.
timer :: Clock -> Double -> Double -> IO Timer
timer clock g period = do
    mvar <- newEmptyMVar
    timeVar <- Unboxed.newIORef 0
    let p = round (period * 1e6) :: Int
        p1 = fromIntegral p :: MicroSecond64
        -- Restart the period from the current time.
        reset = resetTimerExpiry clock p1 timeVar
        -- One polling step of the timer thread.
        process = processTimerTick clock g timeVar mvar reset
    reset
    tid <- forkIOManaged $ forever process
    return $ Timer tid mvar reset

-- | Blocking wait for a timer tick.
--
-- Takes the tick out of the timer's MVar, blocking until one is available.
{-# INLINE waitTimer #-}
waitTimer :: Timer -> IO ()
waitTimer (Timer _ mvar _) = takeMVar mvar

-- | Resets the current period.
--
-- Runs the reset action stored in the 'Timer'; for a timer created by 'timer'
-- this sets the expiry to the current time plus the timer period.
{-# INLINE resetTimer #-}
resetTimer :: Timer -> IO ()
resetTimer (Timer _ _ reset) = reset

-- | Elongates the current period by specified amount.
--
-- /Unimplemented/: using this function raises an error.
{-# INLINE extendTimer #-}
extendTimer :: Timer -> Double -> IO ()
extendTimer =
    error "Streamly.Internal.Data.Time.Clock.extendTimer: unimplemented"

-- | Shortens the current period by specified amount.
--
-- /Unimplemented/: using this function raises an error.
{-# INLINE shortenTimer #-}
shortenTimer :: Timer -> Double -> IO ()
shortenTimer =
    error "Streamly.Internal.Data.Time.Clock.shortenTimer: unimplemented"

-- | Show the remaining time in the current time period.
--
-- /Unimplemented/: using this function raises an error.
{-# INLINE readTimer #-}
readTimer :: Timer -> IO Double
readTimer =
    error "Streamly.Internal.Data.Time.Clock.readTimer: unimplemented"