-- | Lifted "Control.Concurrent".
--
-- For functions that spawn threads, the order of preference for their usage is
-- recommended as follows:
--
-- 1) High level functions from "Effectful.Concurrent.Async" such as
--    'Effectful.Concurrent.Async.withAsync',
--    'Effectful.Concurrent.Async.concurrently' or
--    'Effectful.Concurrent.Async.mapConcurrently'.
--
-- 2) Low level functions from "Effectful.Concurrent.Async" such as
--    'Effectful.Concurrent.Async.async'.
--
-- 3) Low level functions from "Effectful.Concurrent" such as 'forkIO'.
module Effectful.Concurrent
  ( -- * Effect
    Concurrent

    -- ** Handlers
  , runConcurrent

    -- * Basic concurrency operations
  , myThreadId
  , forkIO
  , forkFinally
  , forkIOWithUnmask
  , killThread
  , throwTo

    -- ** Threads with affinity
  , forkOn
  , forkOnWithUnmask
  , getNumCapabilities
  , setNumCapabilities
  , getNumProcessors
  , threadCapability

    -- * Scheduling
  , yield

    -- ** Waiting
  , threadDelay
  , threadWaitRead
  , threadWaitWrite
  , threadWaitReadSTM
  , threadWaitWriteSTM

    -- * Bound threads
  , forkOS
  , forkOSWithUnmask
  , isCurrentThreadBound
  , runInBoundThread
  , runInUnboundThread

    -- * Weak references to ThreadIds
  , mkWeakThreadId

    -- * Re-exports
  , C.rtsSupportsBoundThreads
  ) where

import Control.Concurrent qualified as C
import Control.Exception (Exception, SomeException)
import Data.Bifunctor (second)
import GHC.Conc qualified as GHC
import System.Mem.Weak (Weak)
import System.Posix.Types (Fd)
import UnliftIO.STM (STM)

import Effectful
import Effectful.Concurrent.Effect
import Effectful.Dispatch.Static
import Effectful.Dispatch.Static.Primitive
import Effectful.Dispatch.Static.Unsafe

----------------------------------------
-- Basic concurrency operations

-- | Lifted 'C.myThreadId'.
myThreadId :: Concurrent :> es => Eff es C.ThreadId
myThreadId :: forall (es :: [Effect]). (Concurrent :> es) => Eff es ThreadId
myThreadId = IO ThreadId -> Eff es ThreadId
forall a (es :: [Effect]). IO a -> Eff es a
unsafeEff_ IO ThreadId
C.myThreadId

-- | Lifted 'C.forkIO'.
forkIO :: Concurrent :> es => Eff es () -> Eff es C.ThreadId
forkIO :: forall (es :: [Effect]).
(Concurrent :> es) =>
Eff es () -> Eff es ThreadId
forkIO Eff es ()
k = (Env es -> IO ThreadId) -> Eff es ThreadId
forall (es :: [Effect]) a. (Env es -> IO a) -> Eff es a
unsafeEff ((Env es -> IO ThreadId) -> Eff es ThreadId)
-> (Env es -> IO ThreadId) -> Eff es ThreadId
forall a b. (a -> b) -> a -> b
$ \Env es
es -> do
  Env es
esF <- Env es -> IO (Env es)
forall (es :: [Effect]). Env es -> IO (Env es)
cloneEnv Env es
es
  IO () -> IO ThreadId
C.forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ Eff es () -> Env es -> IO ()
forall (es :: [Effect]) a. Eff es a -> Env es -> IO a
unEff Eff es ()
k Env es
esF

-- | Lifted 'C.forkFinally'.
forkFinally
  :: Concurrent :> es
  => Eff es a
  -> (Either SomeException a -> Eff es ())
  -> Eff es C.ThreadId
forkFinally :: forall (es :: [Effect]) a.
(Concurrent :> es) =>
Eff es a
-> (Either SomeException a -> Eff es ()) -> Eff es ThreadId
forkFinally Eff es a
k Either SomeException a -> Eff es ()
cleanup = (Env es -> IO ThreadId) -> Eff es ThreadId
forall (es :: [Effect]) a. (Env es -> IO a) -> Eff es a
unsafeEff ((Env es -> IO ThreadId) -> Eff es ThreadId)
-> (Env es -> IO ThreadId) -> Eff es ThreadId
forall a b. (a -> b) -> a -> b
$ \Env es
es -> do
  Env es
esF <- Env es -> IO (Env es)
forall (es :: [Effect]). Env es -> IO (Env es)
cloneEnv Env es
es
  IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
C.forkFinally (Eff es a -> Env es -> IO a
forall (es :: [Effect]) a. Eff es a -> Env es -> IO a
unEff Eff es a
k Env es
esF) ((Eff es () -> Env es -> IO ()
forall (es :: [Effect]) a. Eff es a -> Env es -> IO a
`unEff` Env es
esF) (Eff es () -> IO ())
-> (Either SomeException a -> Eff es ())
-> Either SomeException a
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Either SomeException a -> Eff es ()
cleanup)

-- | Lifted 'C.forkIOWithUnmask'.
forkIOWithUnmask
  :: Concurrent :> es
  => ((forall a. Eff es a -> Eff es a) -> Eff es ())
  -> Eff es C.ThreadId
forkIOWithUnmask :: forall (es :: [Effect]).
(Concurrent :> es) =>
((forall a. Eff es a -> Eff es a) -> Eff es ()) -> Eff es ThreadId
forkIOWithUnmask = (((forall c. IO c -> IO c) -> IO ()) -> IO ThreadId)
-> ((forall a. Eff es a -> Eff es a) -> Eff es ())
-> Eff es ThreadId
forall a (es :: [Effect]).
(((forall c. IO c -> IO c) -> IO a) -> IO ThreadId)
-> ((forall c. Eff es c -> Eff es c) -> Eff es a)
-> Eff es ThreadId
liftForkWithUnmask ((forall c. IO c -> IO c) -> IO ()) -> IO ThreadId
C.forkIOWithUnmask

-- | Lifted 'C.killThread'.
killThread :: Concurrent :> es => C.ThreadId -> Eff es ()
killThread :: forall (es :: [Effect]).
(Concurrent :> es) =>
ThreadId -> Eff es ()
killThread = IO () -> Eff es ()
forall a (es :: [Effect]). IO a -> Eff es a
unsafeEff_ (IO () -> Eff es ())
-> (ThreadId -> IO ()) -> ThreadId -> Eff es ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ThreadId -> IO ()
C.killThread

-- | Lifted 'C.throwTo'.
throwTo :: (Concurrent :> es, Exception e) => C.ThreadId -> e -> Eff es ()
throwTo :: forall (es :: [Effect]) e.
(Concurrent :> es, Exception e) =>
ThreadId -> e -> Eff es ()
throwTo ThreadId
tid = IO () -> Eff es ()
forall a (es :: [Effect]). IO a -> Eff es a
unsafeEff_ (IO () -> Eff es ()) -> (e -> IO ()) -> e -> Eff es ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ThreadId -> e -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
C.throwTo ThreadId
tid

----------------------------------------
-- Threads with affinity

-- | Lifted 'C.forkOn'.
forkOn :: Concurrent :> es => Int -> Eff es () -> Eff es C.ThreadId
forkOn :: forall (es :: [Effect]).
(Concurrent :> es) =>
Int -> Eff es () -> Eff es ThreadId
forkOn Int
n Eff es ()
k = (Env es -> IO ThreadId) -> Eff es ThreadId
forall (es :: [Effect]) a. (Env es -> IO a) -> Eff es a
unsafeEff ((Env es -> IO ThreadId) -> Eff es ThreadId)
-> (Env es -> IO ThreadId) -> Eff es ThreadId
forall a b. (a -> b) -> a -> b
$ \Env es
es -> do
  Env es
esF <- Env es -> IO (Env es)
forall (es :: [Effect]). Env es -> IO (Env es)
cloneEnv Env es
es
  Int -> IO () -> IO ThreadId
C.forkOn Int
n (Eff es () -> Env es -> IO ()
forall (es :: [Effect]) a. Eff es a -> Env es -> IO a
unEff Eff es ()
k Env es
esF)

-- | Lifted 'C.forkOnWithUnmask'.
forkOnWithUnmask
  :: Concurrent :> es
  => Int
  -> ((forall a. Eff es a -> Eff es a) -> Eff es ())
  -> Eff es C.ThreadId
forkOnWithUnmask :: forall (es :: [Effect]).
(Concurrent :> es) =>
Int
-> ((forall a. Eff es a -> Eff es a) -> Eff es ())
-> Eff es ThreadId
forkOnWithUnmask Int
n = (((forall c. IO c -> IO c) -> IO ()) -> IO ThreadId)
-> ((forall a. Eff es a -> Eff es a) -> Eff es ())
-> Eff es ThreadId
forall a (es :: [Effect]).
(((forall c. IO c -> IO c) -> IO a) -> IO ThreadId)
-> ((forall c. Eff es c -> Eff es c) -> Eff es a)
-> Eff es ThreadId
liftForkWithUnmask (Int -> ((forall c. IO c -> IO c) -> IO ()) -> IO ThreadId
C.forkOnWithUnmask Int
n)

-- | Lifted 'C.getNumCapabilities'.
getNumCapabilities :: Concurrent :> es => Eff es Int
getNumCapabilities :: forall (es :: [Effect]). (Concurrent :> es) => Eff es Int
getNumCapabilities = IO Int -> Eff es Int
forall a (es :: [Effect]). IO a -> Eff es a
unsafeEff_ IO Int
C.getNumCapabilities

-- | Lifted 'C.setNumCapabilities'.
setNumCapabilities :: Concurrent :> es => Int -> Eff es ()
setNumCapabilities :: forall (es :: [Effect]). (Concurrent :> es) => Int -> Eff es ()
setNumCapabilities = IO () -> Eff es ()
forall a (es :: [Effect]). IO a -> Eff es a
unsafeEff_ (IO () -> Eff es ()) -> (Int -> IO ()) -> Int -> Eff es ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> IO ()
C.setNumCapabilities

-- | Lifted 'GHC.getNumProcessors'.
getNumProcessors :: Concurrent :> es => Eff es Int
getNumProcessors :: forall (es :: [Effect]). (Concurrent :> es) => Eff es Int
getNumProcessors = IO Int -> Eff es Int
forall a (es :: [Effect]). IO a -> Eff es a
unsafeEff_ IO Int
GHC.getNumProcessors

-- | Lifted 'C.threadCapability'.
threadCapability :: Concurrent :> es => C.ThreadId -> Eff es (Int, Bool)
threadCapability :: forall (es :: [Effect]).
(Concurrent :> es) =>
ThreadId -> Eff es (Int, Bool)
threadCapability = IO (Int, Bool) -> Eff es (Int, Bool)
forall a (es :: [Effect]). IO a -> Eff es a
unsafeEff_ (IO (Int, Bool) -> Eff es (Int, Bool))
-> (ThreadId -> IO (Int, Bool)) -> ThreadId -> Eff es (Int, Bool)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ThreadId -> IO (Int, Bool)
C.threadCapability

----------------------------------------
-- Scheduling

-- | Lifted 'C.yield'.
yield :: Concurrent :> es => Eff es ()
yield :: forall (es :: [Effect]). (Concurrent :> es) => Eff es ()
yield = IO () -> Eff es ()
forall a (es :: [Effect]). IO a -> Eff es a
unsafeEff_ IO ()
C.yield

----------------------------------------
-- Waiting

-- | Lifted 'C.threadDelay'.
threadDelay :: Concurrent :> es => Int -> Eff es ()
threadDelay :: forall (es :: [Effect]). (Concurrent :> es) => Int -> Eff es ()
threadDelay = IO () -> Eff es ()
forall a (es :: [Effect]). IO a -> Eff es a
unsafeEff_ (IO () -> Eff es ()) -> (Int -> IO ()) -> Int -> Eff es ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> IO ()
C.threadDelay

-- | Lifted 'C.threadWaitRead'.
threadWaitRead :: Concurrent :> es => Fd -> Eff es ()
threadWaitRead :: forall (es :: [Effect]). (Concurrent :> es) => Fd -> Eff es ()
threadWaitRead = IO () -> Eff es ()
forall a (es :: [Effect]). IO a -> Eff es a
unsafeEff_ (IO () -> Eff es ()) -> (Fd -> IO ()) -> Fd -> Eff es ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Fd -> IO ()
C.threadWaitRead

-- | Lifted 'C.threadWaitWrite'.
threadWaitWrite :: Concurrent :> es => Fd -> Eff es ()
threadWaitWrite :: forall (es :: [Effect]). (Concurrent :> es) => Fd -> Eff es ()
threadWaitWrite = IO () -> Eff es ()
forall a (es :: [Effect]). IO a -> Eff es a
unsafeEff_ (IO () -> Eff es ()) -> (Fd -> IO ()) -> Fd -> Eff es ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Fd -> IO ()
C.threadWaitWrite

-- | Lifted 'C.threadWaitReadSTM'.
threadWaitReadSTM :: Concurrent :> es => Fd -> Eff es (STM (), Eff es ())
threadWaitReadSTM :: forall (es :: [Effect]).
(Concurrent :> es) =>
Fd -> Eff es (STM (), Eff es ())
threadWaitReadSTM Fd
fd = IO (STM (), Eff es ()) -> Eff es (STM (), Eff es ())
forall a (es :: [Effect]). IO a -> Eff es a
unsafeEff_ (IO (STM (), Eff es ()) -> Eff es (STM (), Eff es ()))
-> IO (STM (), Eff es ()) -> Eff es (STM (), Eff es ())
forall a b. (a -> b) -> a -> b
$ do
  (IO () -> Eff es ()) -> (STM (), IO ()) -> (STM (), Eff es ())
forall b c a. (b -> c) -> (a, b) -> (a, c)
forall (p :: Type -> Type -> Type) b c a.
Bifunctor p =>
(b -> c) -> p a b -> p a c
second IO () -> Eff es ()
forall a (es :: [Effect]). IO a -> Eff es a
unsafeEff_ ((STM (), IO ()) -> (STM (), Eff es ()))
-> IO (STM (), IO ()) -> IO (STM (), Eff es ())
forall (f :: Type -> Type) a b. Functor f => (a -> b) -> f a -> f b
<$> Fd -> IO (STM (), IO ())
C.threadWaitReadSTM Fd
fd

-- | Lifted 'C.threadWaitWriteSTM'.
threadWaitWriteSTM :: Concurrent :> es => Fd -> Eff es (STM (), Eff es ())
threadWaitWriteSTM :: forall (es :: [Effect]).
(Concurrent :> es) =>
Fd -> Eff es (STM (), Eff es ())
threadWaitWriteSTM Fd
fd = IO (STM (), Eff es ()) -> Eff es (STM (), Eff es ())
forall a (es :: [Effect]). IO a -> Eff es a
unsafeEff_ (IO (STM (), Eff es ()) -> Eff es (STM (), Eff es ()))
-> IO (STM (), Eff es ()) -> Eff es (STM (), Eff es ())
forall a b. (a -> b) -> a -> b
$ do
  (IO () -> Eff es ()) -> (STM (), IO ()) -> (STM (), Eff es ())
forall b c a. (b -> c) -> (a, b) -> (a, c)
forall (p :: Type -> Type -> Type) b c a.
Bifunctor p =>
(b -> c) -> p a b -> p a c
second IO () -> Eff es ()
forall a (es :: [Effect]). IO a -> Eff es a
unsafeEff_ ((STM (), IO ()) -> (STM (), Eff es ()))
-> IO (STM (), IO ()) -> IO (STM (), Eff es ())
forall (f :: Type -> Type) a b. Functor f => (a -> b) -> f a -> f b
<$> Fd -> IO (STM (), IO ())
C.threadWaitWriteSTM Fd
fd

----------------------------------------
-- Bound threads

-- | Lifted 'C.forkOS'.
forkOS :: Concurrent :> es => Eff es () -> Eff es C.ThreadId
forkOS :: forall (es :: [Effect]).
(Concurrent :> es) =>
Eff es () -> Eff es ThreadId
forkOS Eff es ()
k = (Env es -> IO ThreadId) -> Eff es ThreadId
forall (es :: [Effect]) a. (Env es -> IO a) -> Eff es a
unsafeEff ((Env es -> IO ThreadId) -> Eff es ThreadId)
-> (Env es -> IO ThreadId) -> Eff es ThreadId
forall a b. (a -> b) -> a -> b
$ \Env es
es -> do
  Env es
esF <- Env es -> IO (Env es)
forall (es :: [Effect]). Env es -> IO (Env es)
cloneEnv Env es
es
  IO () -> IO ThreadId
C.forkOS (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ Eff es () -> Env es -> IO ()
forall (es :: [Effect]) a. Eff es a -> Env es -> IO a
unEff Eff es ()
k Env es
esF

-- | Lifted 'E.forkOSWithUnmask'.
forkOSWithUnmask
  :: Concurrent :> es
  => ((forall a. Eff es a -> Eff es a) -> Eff es ())
  -> Eff es C.ThreadId
forkOSWithUnmask :: forall (es :: [Effect]).
(Concurrent :> es) =>
((forall a. Eff es a -> Eff es a) -> Eff es ()) -> Eff es ThreadId
forkOSWithUnmask = (((forall c. IO c -> IO c) -> IO ()) -> IO ThreadId)
-> ((forall a. Eff es a -> Eff es a) -> Eff es ())
-> Eff es ThreadId
forall a (es :: [Effect]).
(((forall c. IO c -> IO c) -> IO a) -> IO ThreadId)
-> ((forall c. Eff es c -> Eff es c) -> Eff es a)
-> Eff es ThreadId
liftForkWithUnmask ((forall c. IO c -> IO c) -> IO ()) -> IO ThreadId
C.forkOSWithUnmask

-- | Lifted 'C.isCurrentThreadBound'.
isCurrentThreadBound :: Concurrent :> es => Eff es Bool
isCurrentThreadBound :: forall (es :: [Effect]). (Concurrent :> es) => Eff es Bool
isCurrentThreadBound = IO Bool -> Eff es Bool
forall a (es :: [Effect]). IO a -> Eff es a
unsafeEff_ IO Bool
C.isCurrentThreadBound

-- | Lifted 'C.runInBoundThread'.
runInBoundThread :: Concurrent :> es => Eff es a -> Eff es a
runInBoundThread :: forall (es :: [Effect]) a.
(Concurrent :> es) =>
Eff es a -> Eff es a
runInBoundThread Eff es a
k = (Env es -> IO a) -> Eff es a
forall (es :: [Effect]) a. (Env es -> IO a) -> Eff es a
unsafeEff ((Env es -> IO a) -> Eff es a) -> (Env es -> IO a) -> Eff es a
forall a b. (a -> b) -> a -> b
$ \Env es
es -> do
  Env es
esF <- Env es -> IO (Env es)
forall (es :: [Effect]). Env es -> IO (Env es)
cloneEnv Env es
es
  IO a -> IO a
forall c. IO c -> IO c
C.runInBoundThread (IO a -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ Eff es a -> Env es -> IO a
forall (es :: [Effect]) a. Eff es a -> Env es -> IO a
unEff Eff es a
k Env es
esF

-- | Lifted 'C.runInUnboundThread'.
runInUnboundThread :: Concurrent :> es => Eff es a -> Eff es a
runInUnboundThread :: forall (es :: [Effect]) a.
(Concurrent :> es) =>
Eff es a -> Eff es a
runInUnboundThread Eff es a
k = (Env es -> IO a) -> Eff es a
forall (es :: [Effect]) a. (Env es -> IO a) -> Eff es a
unsafeEff ((Env es -> IO a) -> Eff es a) -> (Env es -> IO a) -> Eff es a
forall a b. (a -> b) -> a -> b
$ \Env es
es -> do
  Env es
esF <- Env es -> IO (Env es)
forall (es :: [Effect]). Env es -> IO (Env es)
cloneEnv Env es
es
  IO a -> IO a
forall c. IO c -> IO c
C.runInUnboundThread (IO a -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ Eff es a -> Env es -> IO a
forall (es :: [Effect]) a. Eff es a -> Env es -> IO a
unEff Eff es a
k Env es
esF

----------------------------------------
-- Weak references to ThreadIds

-- | Lifted 'C.mkWeakThreadId'.
mkWeakThreadId :: Concurrent :> es => C.ThreadId -> Eff es (Weak C.ThreadId)
mkWeakThreadId :: forall (es :: [Effect]).
(Concurrent :> es) =>
ThreadId -> Eff es (Weak ThreadId)
mkWeakThreadId = IO (Weak ThreadId) -> Eff es (Weak ThreadId)
forall a (es :: [Effect]). IO a -> Eff es a
unsafeEff_ (IO (Weak ThreadId) -> Eff es (Weak ThreadId))
-> (ThreadId -> IO (Weak ThreadId))
-> ThreadId
-> Eff es (Weak ThreadId)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ThreadId -> IO (Weak ThreadId)
C.mkWeakThreadId

----------------------------------------
-- Helpers

liftForkWithUnmask
  :: (((forall c. IO c -> IO c) -> IO a) -> IO C.ThreadId)
  -> ((forall c. Eff es c -> Eff es c) -> Eff es a)
  -> Eff es C.ThreadId
liftForkWithUnmask :: forall a (es :: [Effect]).
(((forall c. IO c -> IO c) -> IO a) -> IO ThreadId)
-> ((forall c. Eff es c -> Eff es c) -> Eff es a)
-> Eff es ThreadId
liftForkWithUnmask ((forall c. IO c -> IO c) -> IO a) -> IO ThreadId
fork (forall c. Eff es c -> Eff es c) -> Eff es a
action = (Env es -> IO ThreadId) -> Eff es ThreadId
forall (es :: [Effect]) a. (Env es -> IO a) -> Eff es a
unsafeEff ((Env es -> IO ThreadId) -> Eff es ThreadId)
-> (Env es -> IO ThreadId) -> Eff es ThreadId
forall a b. (a -> b) -> a -> b
$ \Env es
es -> do
  Env es
esF <- Env es -> IO (Env es)
forall (es :: [Effect]). Env es -> IO (Env es)
cloneEnv Env es
es
  -- Unmask never runs its argument in a different thread.
  ((forall c. IO c -> IO c) -> IO a) -> IO ThreadId
fork (((forall c. IO c -> IO c) -> IO a) -> IO ThreadId)
-> ((forall c. IO c -> IO c) -> IO a) -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ \forall c. IO c -> IO c
unmask -> Eff es a -> Env es -> IO a
forall (es :: [Effect]) a. Eff es a -> Env es -> IO a
unEff ((forall c. Eff es c -> Eff es c) -> Eff es a
action ((forall c. Eff es c -> Eff es c) -> Eff es a)
-> (forall c. Eff es c -> Eff es c) -> Eff es a
forall a b. (a -> b) -> a -> b
$ (IO c -> IO c) -> Eff es c -> Eff es c
forall a b (es :: [Effect]). (IO a -> IO b) -> Eff es a -> Eff es b
reallyUnsafeLiftMapIO IO c -> IO c
forall c. IO c -> IO c
unmask) Env es
esF