{-# LANGUAGE OverloadedStrings #-}
module Network.ZRE.ZMQ (zreRouter, zreDealer) where
import Control.Monad
import Control.Concurrent.STM
import Control.Monad.IO.Class
import Data.ByteString (ByteString)
import qualified System.ZMQ4.Monadic as ZMQ
import qualified Data.ByteString.Char8 as B
import qualified Data.List.NonEmpty as NE
import Data.Time.Clock.POSIX
import Data.ZRE
import System.ZMQ4.Endpoint
zreDealer :: Control.Monad.IO.Class.MonadIO m
=> Endpoint
-> ByteString
-> TBQueue ZRECmd
-> m a
zreDealer :: forall (m :: * -> *) a.
MonadIO m =>
Endpoint -> ByteString -> TBQueue ZRECmd -> m a
zreDealer Endpoint
endpoint ByteString
ourUUID TBQueue ZRECmd
peerQ = forall (m :: * -> *) a. MonadIO m => (forall z. ZMQ z a) -> m a
ZMQ.runZMQ forall a b. (a -> b) -> a -> b
$ do
Socket z Dealer
d <- forall t z. SocketType t => t -> ZMQ z (Socket z t)
ZMQ.socket Dealer
ZMQ.Dealer
forall i z t.
Integral i =>
Restricted (Nneg1, Int32) i -> Socket z t -> ZMQ z ()
ZMQ.setLinger (forall r v. Restriction r v => v -> Restricted r v
ZMQ.restrict (Int
1 :: Int)) Socket z Dealer
d
forall i z t.
Integral i =>
Restricted (N0, Int32) i -> Socket z t -> ZMQ z ()
ZMQ.setSendHighWM (forall r v. Restriction r v => v -> Restricted r v
ZMQ.restrict forall a b. (a -> b) -> a -> b
$ (Int
30 forall a. Num a => a -> a -> a
* Int
100 :: Int)) Socket z Dealer
d
forall i z t.
Integral i =>
Restricted (Nneg1, Int32) i -> Socket z t -> ZMQ z ()
ZMQ.setSendTimeout (forall r v. Restriction r v => v -> Restricted r v
ZMQ.restrict (Int
0 :: Int)) Socket z Dealer
d
forall z t.
Restricted (N1, N254) ByteString -> Socket z t -> ZMQ z ()
ZMQ.setIdentity (forall r v. Restriction r v => v -> Restricted r v
ZMQ.restrict forall a b. (a -> b) -> a -> b
$ Char -> ByteString -> ByteString
B.cons Char
'1' ByteString
ourUUID) Socket z Dealer
d
forall z t. Socket z t -> String -> ZMQ z ()
ZMQ.connect Socket z Dealer
d forall a b. (a -> b) -> a -> b
$ ByteString -> String
B.unpack forall a b. (a -> b) -> a -> b
$ Endpoint -> ByteString
pEndpoint Endpoint
endpoint
forall {t} {z} {b}. Sender t => Socket z t -> Int -> ZMQ z b
loop Socket z Dealer
d Int
1
where loop :: Socket z t -> Int -> ZMQ z b
loop Socket z t
d Int
x = do
ZRECmd
cmd <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TBQueue a -> STM a
readTBQueue TBQueue ZRECmd
peerQ
forall t z.
Sender t =>
Socket z t -> NonEmpty ByteString -> ZMQ z ()
ZMQ.sendMulti Socket z t
d forall a b. (a -> b) -> a -> b
$ (forall a. [a] -> NonEmpty a
NE.fromList forall a b. (a -> b) -> a -> b
$ ZREMsg -> [ByteString]
encodeZRE forall a b. (a -> b) -> a -> b
$ Int -> ZRECmd -> ZREMsg
newZRE Int
x ZRECmd
cmd :: NE.NonEmpty ByteString)
Socket z t -> Int -> ZMQ z b
loop Socket z t
d (Int
xforall a. Num a => a -> a -> a
+Int
1)
zreRouter :: Control.Monad.IO.Class.MonadIO m
=> Endpoint
-> (ZREMsg -> IO a1)
-> m a
zreRouter :: forall (m :: * -> *) a1 a.
MonadIO m =>
Endpoint -> (ZREMsg -> IO a1) -> m a
zreRouter Endpoint
endpoint ZREMsg -> IO a1
handler = forall (m :: * -> *) a. MonadIO m => (forall z. ZMQ z a) -> m a
ZMQ.runZMQ forall a b. (a -> b) -> a -> b
$ do
Socket z Router
sock <- forall t z. SocketType t => t -> ZMQ z (Socket z t)
ZMQ.socket Router
ZMQ.Router
forall z t. Socket z t -> String -> ZMQ z ()
ZMQ.bind Socket z Router
sock forall a b. (a -> b) -> a -> b
$ ByteString -> String
B.unpack forall a b. (a -> b) -> a -> b
$ Endpoint -> ByteString
pEndpoint Endpoint
endpoint
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
[ByteString]
input <- forall t z. Receiver t => Socket z t -> ZMQ z [ByteString]
ZMQ.receiveMulti Socket z Router
sock
UTCTime
now <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ IO UTCTime
getCurrentTime
case [ByteString] -> Either String ZREMsg
parseZRE [ByteString]
input of
Left String
err -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Show a => a -> IO ()
print forall a b. (a -> b) -> a -> b
$ String
"Malformed message received: " forall a. [a] -> [a] -> [a]
++ String
err
Right ZREMsg
msg -> do
let updateTime :: ZREMsg -> ZREMsg
updateTime = \ZREMsg
x -> ZREMsg
x { msgTime :: Maybe UTCTime
msgTime = forall a. a -> Maybe a
Just UTCTime
now }
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ ZREMsg -> IO a1
handler (ZREMsg -> ZREMsg
updateTime ZREMsg
msg)
forall (m :: * -> *) a. Monad m => a -> m a
return ()