{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
module Streamly.Streams.SVar
( fromSVar
, fromStreamVar
, toSVar
)
where
import Control.Exception (fromException)
import Control.Monad (when)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef)
import Data.Maybe (isNothing)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup ((<>))
#endif
import System.IO (hPutStrLn, stderr)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import System.Mem (performMajorGC)
import Streamly.Internal.Data.SVar
import Streamly.Streams.StreamK hiding (reverse)
printSVar :: SVar t m a -> String -> IO ()
printSVar sv how = do
svInfo <- dumpSVar sv
hPutStrLn stderr $ "\n" <> how <> "\n" <> svInfo
{-# NOINLINE fromStreamVar #-}
fromStreamVar :: MonadAsync m => SVar Stream m a -> Stream m a
fromStreamVar sv = mkStream $ \st yld sng stp -> do
list <- readOutputQ sv
foldStream st yld sng stp $ processEvents $ reverse list
where
allDone stp = do
when (svarInspectMode sv) $ do
t <- liftIO $ getTime Monotonic
liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
liftIO $ printSVar sv "SVar Done"
stp
{-# INLINE processEvents #-}
processEvents [] = mkStream $ \st yld sng stp -> do
done <- postProcess sv
if done
then allDone stp
else foldStream st yld sng stp $ fromStreamVar sv
processEvents (ev : es) = mkStream $ \st yld sng stp -> do
let rest = processEvents es
case ev of
ChildYield a -> yld a rest
ChildStop tid e -> do
accountThread sv tid
case e of
Nothing -> do
stop <- shouldStop tid
if stop
then liftIO (cleanupSVar sv) >> allDone stp
else foldStream st yld sng stp rest
Just ex ->
case fromException ex of
Just ThreadAbort ->
foldStream st yld sng stp rest
Nothing -> liftIO (cleanupSVar sv) >> throwM ex
shouldStop tid =
case svarStopStyle sv of
StopNone -> return False
StopAny -> return True
StopBy -> do
sid <- liftIO $ readIORef (svarStopBy sv)
return $ if tid == sid then True else False
{-# INLINE fromSVar #-}
fromSVar :: (MonadAsync m, IsStream t) => SVar Stream m a -> t m a
fromSVar sv =
mkStream $ \st yld sng stp -> do
ref <- liftIO $ newIORef ()
_ <- liftIO $ mkWeakIORef ref hook
foldStreamShared st yld sng stp $
fromStream $ fromStreamVar sv{svarRef = Just ref}
where
hook = do
when (svarInspectMode sv) $ do
r <- liftIO $ readIORef (svarStopTime (svarStats sv))
when (isNothing r) $
printSVar sv "SVar Garbage Collected"
cleanupSVar sv
when (svarInspectMode sv) performMajorGC
toSVar :: (IsStream t, MonadAsync m) => SVar Stream m a -> t m a -> m ()
toSVar sv m = toStreamVar sv (toStream m)