{-# LANGUAGE DataKinds #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
module Network.ZRE.Chan (
zreChan
, zreChan'
, zreChanWith
, mapToGroup
, mapToGroup'
) where
import Control.Concurrent.STM (TChan)
import Data.Serialize (Serialize)
import Data.ZRE (Group, KnownGroup, knownToGroup)
import Network.ZRE (ZRE)
import qualified Control.Concurrent.Async.Lifted
import qualified Control.Concurrent.STM
import qualified Control.Monad
import qualified Control.Monad.IO.Class
import qualified Data.Serialize
import qualified Network.ZRE
mapToGroup :: forall fromGroup toGroup from to .
( Serialize from
, Show from
, Serialize to
, KnownGroup fromGroup
, KnownGroup toGroup
)
=> (from -> to)
-> ZRE ()
mapToGroup :: forall (fromGroup :: Symbol) (toGroup :: Symbol) from to.
(Serialize from, Show from, Serialize to, KnownGroup fromGroup,
KnownGroup toGroup) =>
(from -> to) -> ZRE ()
mapToGroup from -> to
fn = forall from to.
(Show from, Serialize from, Serialize to) =>
Group -> Group -> (from -> to) -> ZRE ()
mapToGroup'
(forall (n :: Symbol). KnownGroup n => Group
knownToGroup @fromGroup)
(forall (n :: Symbol). KnownGroup n => Group
knownToGroup @toGroup)
from -> to
fn
mapToGroup' :: (Show from, Serialize from, Serialize to)
=> Group
-> Group
-> (from -> to)
-> ZRE ()
mapToGroup' :: forall from to.
(Show from, Serialize from, Serialize to) =>
Group -> Group -> (from -> to) -> ZRE ()
mapToGroup' Group
fromGroup Group
toGroup from -> to
fn = do
Group -> ZRE ()
Network.ZRE.zjoin Group
fromGroup
Group -> ZRE ()
Network.ZRE.zjoin Group
toGroup
forall decoded.
Group
-> (ByteString -> Either String decoded)
-> (Either String decoded -> ZRE ())
-> ZRE ()
Network.ZRE.zrecvShoutsDecode Group
fromGroup forall a. Serialize a => ByteString -> Either String a
Data.Serialize.decode
forall a b. (a -> b) -> a -> b
$ \(Either String from
mdec :: Either String from) -> do
case Either String from
mdec of
Left String
e -> do
forall a. String -> ZRE a
Network.ZRE.zfail
forall a b. (a -> b) -> a -> b
$ String
"Unable to decode message from "
forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> String
show Group
fromGroup forall a. [a] -> [a] -> [a]
++ String
" error was: " forall a. [a] -> [a] -> [a]
++ String
e
Right from
dec -> do
Group -> ByteString -> ZRE ()
Network.ZRE.zshout Group
toGroup forall a b. (a -> b) -> a -> b
$ forall a. Serialize a => a -> ByteString
Data.Serialize.encode forall a b. (a -> b) -> a -> b
$ from -> to
fn from
dec
zreChan :: forall input output inputGroup outputGroup .
( Serialize input
, Serialize output
, KnownGroup inputGroup
, KnownGroup outputGroup
)
=> IO ( TChan input
, TChan output)
zreChan :: forall input output (inputGroup :: Symbol) (outputGroup :: Symbol).
(Serialize input, Serialize output, KnownGroup inputGroup,
KnownGroup outputGroup) =>
IO (TChan input, TChan output)
zreChan = forall input output.
(Serialize input, Serialize output) =>
Group -> Group -> IO (TChan input, TChan output)
zreChan'
(forall (n :: Symbol). KnownGroup n => Group
knownToGroup @outputGroup)
(forall (n :: Symbol). KnownGroup n => Group
knownToGroup @inputGroup)
zreChan' :: (Serialize input, Serialize output)
=> Group
-> Group
-> IO ( TChan input
, TChan output)
zreChan' :: forall input output.
(Serialize input, Serialize output) =>
Group -> Group -> IO (TChan input, TChan output)
zreChan' = forall input output.
(Serialize input, Serialize output) =>
(ZRE () -> IO ())
-> Group -> Group -> IO (TChan input, TChan output)
zreChanWith forall a. ZRE a -> IO ()
Network.ZRE.runZre
zreChanWith :: (Serialize input, Serialize output)
=> (ZRE () -> IO ())
-> Group
-> Group
-> IO ( TChan input
, TChan output)
zreChanWith :: forall input output.
(Serialize input, Serialize output) =>
(ZRE () -> IO ())
-> Group -> Group -> IO (TChan input, TChan output)
zreChanWith ZRE () -> IO ()
runner Group
outputGroup Group
inputGroup = do
TChan input
chanInput <- forall a. IO (TChan a)
Control.Concurrent.STM.newTChanIO
TChan output
chanOutput <- forall a. IO (TChan a)
Control.Concurrent.STM.newTChanIO
Async ()
_ <- forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
Control.Concurrent.Async.Lifted.async forall a b. (a -> b) -> a -> b
$ ZRE () -> IO ()
runner forall a b. (a -> b) -> a -> b
$ do
Group -> ZRE ()
Network.ZRE.zjoin Group
outputGroup
forall (f :: * -> *) a. Functor f => f a -> f ()
Control.Monad.void
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
Control.Concurrent.Async.Lifted.async
forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Applicative f => f a -> f b
Control.Monad.forever
forall a b. (a -> b) -> a -> b
$ do
input
out <-
forall (m :: * -> *) a. MonadIO m => IO a -> m a
Control.Monad.IO.Class.liftIO
forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
Control.Concurrent.STM.atomically
forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> STM a
Control.Concurrent.STM.readTChan TChan input
chanInput
Group -> ByteString -> ZRE ()
Network.ZRE.zshout Group
outputGroup
forall a b. (a -> b) -> a -> b
$ forall a. Serialize a => a -> ByteString
Data.Serialize.encode input
out
Group -> ZRE ()
Network.ZRE.zjoin Group
inputGroup
forall decoded.
Group
-> (ByteString -> Either String decoded)
-> (Either String decoded -> ZRE ())
-> ZRE ()
Network.ZRE.zrecvShoutsDecode Group
inputGroup forall a. Serialize a => ByteString -> Either String a
Data.Serialize.decode
forall a b. (a -> b) -> a -> b
$ forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either
(\String
e -> forall a. String -> ZRE a
Network.ZRE.zfail
forall a b. (a -> b) -> a -> b
$ String
"zreChan: Unable to decode message from input "
forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> String
show Group
inputGroup
forall a. [a] -> [a] -> [a]
++ String
" error was: "
forall a. [a] -> [a] -> [a]
++ String
e
)
( forall (m :: * -> *) a. MonadIO m => IO a -> m a
Control.Monad.IO.Class.liftIO
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
Control.Concurrent.STM.atomically
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TChan a -> a -> STM ()
Control.Concurrent.STM.writeTChan TChan output
chanOutput
)
forall (m :: * -> *) a. Monad m => a -> m a
return (TChan input
chanInput, TChan output
chanOutput)