{-# LANGUAGE OverloadedStrings #-}
module Network.ZRE.ZMQ (zreRouter, zreDealer) where

import Control.Monad
import Control.Concurrent.STM
import Control.Monad.IO.Class
import Data.ByteString (ByteString)
import qualified System.ZMQ4.Monadic as ZMQ
import qualified Data.ByteString.Char8 as B
import qualified Data.List.NonEmpty as NE
import Data.Time.Clock.POSIX

import Data.ZRE
import System.ZMQ4.Endpoint

-- | Run a ZeroMQ DEALER socket that forwards queued commands to one peer.
--
-- Connects to @endpoint@ and then loops forever: each 'ZRECmd' read from
-- @peerQ@ is combined with the next sequence number via 'newZRE', encoded
-- with 'encodeZRE' and sent as a single multipart message. The send loop has
-- no exit condition, which is why the result type is fully polymorphic.
zreDealer :: MonadIO m
          => Endpoint        -- ^ endpoint of the peer to connect to
          -> ByteString      -- ^ our UUID; prefixed with @'1'@ to form the socket identity
          -> TBQueue ZRECmd  -- ^ queue of commands waiting to be sent to the peer
          -> m a
zreDealer endpoint ourUUID peerQ = ZMQ.runZMQ $ do
  dealer <- ZMQ.socket ZMQ.Dealer
  ZMQ.setLinger (ZMQ.restrict (1 :: Int)) dealer
  -- The sender MAY set a high-water mark (HWM) of, for example, 100 messages
  -- per second (if the timeout period is 30 seconds, this means a HWM of
  -- 3,000 messages).
  ZMQ.setSendHighWM (ZMQ.restrict (30 * 100 :: Int)) dealer
  ZMQ.setSendTimeout (ZMQ.restrict (0 :: Int)) dealer
  -- Prepend '1' to our UUID to build the socket identity; per the original
  -- author's note the protocol requires this prefix.
  ZMQ.setIdentity (ZMQ.restrict (B.cons '1' ourUUID)) dealer
  ZMQ.connect dealer (B.unpack (pEndpoint endpoint))
  sendFrom dealer 1 -- sequence number must start with 1
  where
    -- Block until a command is queued, send it tagged with the current
    -- sequence number, then continue with the next number.
    sendFrom sock seqNum = do
      cmd <- liftIO (atomically (readTBQueue peerQ))
      -- liftIO $ print "Sending" >> print (newZRE seqNum cmd)
      -- NOTE(review): NE.fromList is partial; this presumably relies on
      -- encodeZRE never returning an empty frame list -- confirm.
      ZMQ.sendMulti sock (NE.fromList (encodeZRE (newZRE seqNum cmd)))
      sendFrom sock (seqNum + 1)

-- | Run a ZeroMQ ROUTER socket and hand every parsed message to a callback.
--
-- Binds to @endpoint@ and loops forever. Each received multipart message is
-- parsed with 'parseZRE'. A parse failure is printed and the message is
-- dropped; a successfully parsed message gets its 'msgTime' set to the time
-- read right after receipt and is passed to @handler@, whose result is
-- discarded.
zreRouter :: MonadIO m
          => Endpoint           -- ^ endpoint to bind the router socket to
          -> (ZREMsg -> IO a1)  -- ^ callback invoked for every parsed message
          -> m a
zreRouter endpoint handler = ZMQ.runZMQ $ do
  router <- ZMQ.socket ZMQ.Router
  ZMQ.bind router (B.unpack (pEndpoint endpoint))
  forever $ do
    frames <- ZMQ.receiveMulti router
    -- The timestamp is taken before parsing, so it is read for malformed
    -- messages as well.
    receivedAt <- liftIO getCurrentTime
    case parseZRE frames of
      Left err ->
        liftIO (print ("Malformed message received: " ++ err))
      Right msg ->
        liftIO (void (handler (msg { msgTime = Just receivedAt })))