{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.ZRE (
runZre
, runZreCfg
, runZreEnvConfig
, runZreParse
, readZ
, writeZ
, unReadZ
, defaultConf
, API(..)
, Event(..)
, ZRE
, Z.Group
, Z.mkGroup
, Z.unGroup
, zjoin
, zleave
, zshout
, zshout'
, zwhisper
, zdebug
, znodebug
, zquit
, zfail
, zrecv
, pEndpoint
, toASCIIBytes
, getApiQueue
, getEventQueue
, module Network.ZRE.Lib
) where
import Prelude hiding (putStrLn, take)
import Control.Monad hiding (join)
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception (SomeException)
import qualified Control.Exception.Lifted
import Data.ByteString (ByteString)
import Data.UUID
import Data.UUID.V1
import Data.Maybe
import qualified Data.Set as S
import qualified Data.Map as M
import qualified Data.ByteString.Char8 as B
import qualified Data.ZRE as Z
import Network.ZRE.Beacon
import Network.ZRE.Config
import Network.ZRE.Lib
import Network.ZRE.Options
import Network.ZRE.Peer
import Network.ZRE.Types
import Network.ZRE.Utils
import Network.ZRE.ZMQ
import Network.ZGossip
import System.ZMQ4.Endpoint
import Options.Applicative
getIfaces :: [ByteString]
-> IO [(ByteString, ByteString, ByteString)]
getIfaces :: [ByteString] -> IO [(ByteString, ByteString, ByteString)]
getIfaces [ByteString]
ifcs = do
case [ByteString]
ifcs of
[] -> do
Maybe (ByteString, ByteString)
dr <- IO (Maybe (ByteString, ByteString))
getDefRoute
case Maybe (ByteString, ByteString)
dr of
Maybe (ByteString, ByteString)
Nothing -> forall b. ByteString -> IO b
exitFail ByteString
"Unable to get default route"
Just (ByteString
_route, ByteString
iface) -> do
(ByteString, ByteString, ByteString)
i <- ByteString -> IO (ByteString, ByteString, ByteString)
getIfaceReport ByteString
iface
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ [(ByteString, ByteString, ByteString)
i]
[ByteString]
x -> do
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [ByteString]
x ByteString -> IO (ByteString, ByteString, ByteString)
getIfaceReport
runIface :: Show a
=> TVar ZREState
-> Int
-> (ByteString, ByteString, a)
-> IO ()
runIface :: forall a.
Show a =>
TVar ZREState -> GroupSeq -> (ByteString, ByteString, a) -> IO ()
runIface TVar ZREState
s GroupSeq
port (ByteString
iface, ByteString
ipv4, a
ipv6) = do
Async ()
r <- forall a. IO a -> IO (Async a)
async forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a1 a.
MonadIO m =>
Endpoint -> (ZREMsg -> IO a1) -> m a
zreRouter (ByteString -> GroupSeq -> Endpoint
newTCPEndpoint ByteString
ipv4 GroupSeq
port) (TVar ZREState -> ZREMsg -> IO ()
inbox TVar ZREState
s)
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar ZREState
s forall a b. (a -> b) -> a -> b
$ \ZREState
x ->
ZREState
x { zreIfaces :: Map ByteString [Async ()]
zreIfaces = forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert ByteString
iface [Async ()
r] (ZREState -> Map ByteString [Async ()]
zreIfaces ZREState
x) }
runZre :: ZRE a -> IO ()
runZre :: forall a. ZRE a -> IO ()
runZre ZRE a
app = forall extra a. Parser extra -> (extra -> ZRE a) -> IO ()
runZreParse (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (forall a b. a -> b -> a
const ZRE a
app)
runZreParse :: Parser extra -> (extra -> ZRE a) -> IO ()
runZreParse :: forall extra a. Parser extra -> (extra -> ZRE a) -> IO ()
runZreParse Parser extra
parseExtra extra -> ZRE a
app = do
ZRECfg
cfgIni <- String -> IO ZRECfg
envZRECfg String
"zre"
(ZRECfg
cfgOpts, extra
extras) <- forall a. ParserInfo a -> IO a
execParser ParserInfo (ZRECfg, extra)
opts
forall a. ZRECfg -> ZRE a -> IO ()
runZreCfg (ZRECfg -> ZRECfg -> ZRECfg
overrideNonDefault ZRECfg
cfgIni ZRECfg
cfgOpts) (extra -> ZRE a
app extra
extras)
where
opts :: ParserInfo (ZRECfg, extra)
opts = forall a. Parser a -> InfoMod a -> ParserInfo a
info (((,) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Parser ZRECfg
parseOptions forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Parser extra
parseExtra) forall (f :: * -> *) a b. Applicative f => f a -> f (a -> b) -> f b
<**> forall a. Parser (a -> a)
helper)
( forall a. InfoMod a
fullDesc
forall a. Semigroup a => a -> a -> a
<> forall a. String -> InfoMod a
progDesc String
"ZRE"
forall a. Semigroup a => a -> a -> a
<> forall a. String -> InfoMod a
header String
"zre tools" )
runZreEnvConfig :: ZRE a -> IO ()
runZreEnvConfig :: forall a. ZRE a -> IO ()
runZreEnvConfig ZRE a
app = do
ZRECfg
cfgIni <- String -> IO ZRECfg
envZRECfg String
"zre"
forall a. ZRECfg -> ZRE a -> IO ()
runZreCfg ZRECfg
cfgIni ZRE a
app
runZreCfg :: ZRECfg -> ZRE a -> IO ()
runZreCfg :: forall a. ZRECfg -> ZRE a -> IO ()
runZreCfg cfg :: ZRECfg
cfg@ZRECfg{Bool
Float
[ByteString]
Maybe Endpoint
ByteString
Endpoint
zreDbg :: ZRECfg -> Bool
zreZGossip :: ZRECfg -> Maybe Endpoint
zreMCast :: ZRECfg -> Endpoint
zreInterfaces :: ZRECfg -> [ByteString]
zreBeaconPeriod :: ZRECfg -> Float
zreDeadPeriod :: ZRECfg -> Float
zreQuietPingRate :: ZRECfg -> Float
zreQuietPeriod :: ZRECfg -> Float
zreNamed :: ZRECfg -> ByteString
zreDbg :: Bool
zreZGossip :: Maybe Endpoint
zreMCast :: Endpoint
zreInterfaces :: [ByteString]
zreBeaconPeriod :: Float
zreDeadPeriod :: Float
zreQuietPingRate :: Float
zreQuietPeriod :: Float
zreNamed :: ByteString
..} ZRE a
app = do
[(ByteString, ByteString, ByteString)]
ifcs <- [ByteString] -> IO [(ByteString, ByteString, ByteString)]
getIfaces [ByteString]
zreInterfaces
UUID
u <- forall (m :: * -> *) b a.
Monad m =>
m b -> (a -> m b) -> m (Maybe a) -> m b
maybeM (forall b. ByteString -> IO b
exitFail ByteString
"Unable to get UUID") forall (m :: * -> *) a. Monad m => a -> m a
return IO (Maybe UUID)
nextUUID
let uuid :: ByteString
uuid = UUID -> ByteString
uuidByteString UUID
u
case [(ByteString, ByteString, ByteString)]
ifcs of
[] -> forall b. ByteString -> IO b
exitFail ByteString
"No interfaces found"
ifaces :: [(ByteString, ByteString, ByteString)]
ifaces@((ByteString
_ifcname, ByteString
ipv4, ByteString
_ipv6):[(ByteString, ByteString, ByteString)]
_) -> do
GroupSeq
zrePort <- ByteString -> IO GroupSeq
randPort ByteString
ipv4
let zreEndpoint :: Endpoint
zreEndpoint = ByteString -> GroupSeq -> Endpoint
newTCPEndpoint ByteString
ipv4 GroupSeq
zrePort
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
zreDbg forall a b. (a -> b) -> a -> b
$ ByteString -> IO ()
B.putStrLn forall a b. (a -> b) -> a -> b
$ ByteString
"Starting with " forall a. Semigroup a => a -> a -> a
<> (forall a. Show a => a -> ByteString
bshow Endpoint
zreEndpoint)
ByteString
zreName <- ByteString -> IO ByteString
getName ByteString
zreNamed
TBQueue Event
inQ <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. Natural -> STM (TBQueue a)
newTBQueue Natural
1000000
TBQueue API
outQ <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. Natural -> STM (TBQueue a)
newTBQueue Natural
1000000
TVar ZREState
s <- ByteString
-> Endpoint
-> UUID
-> TBQueue Event
-> TBQueue API
-> Bool
-> ZRECfg
-> IO (TVar ZREState)
newZREState ByteString
zreName Endpoint
zreEndpoint UUID
u TBQueue Event
inQ TBQueue API
outQ Bool
zreDbg ZRECfg
cfg
case Maybe Endpoint
zreZGossip of
Maybe Endpoint
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just Endpoint
end -> forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. IO a -> IO (Async a)
async forall a b. (a -> b) -> a -> b
$ ByteString -> Endpoint -> Endpoint -> (ZGSMsg -> IO ()) -> IO ()
zgossipClient ByteString
uuid Endpoint
end Endpoint
zreEndpoint (TBQueue API -> ZGSMsg -> IO ()
zgossipZRE TBQueue API
outQ)
(AddrInfo
mCastAddr:[AddrInfo]
_) <- Endpoint -> IO [AddrInfo]
toAddrInfo Endpoint
zreMCast
Async ()
_beaconAsync <- forall a. IO a -> IO (Async a)
async forall a b. (a -> b) -> a -> b
$ Float -> AddrInfo -> ByteString -> GroupSeq -> IO ()
beacon Float
zreBeaconPeriod AddrInfo
mCastAddr ByteString
uuid GroupSeq
zrePort
Async Any
_beaconRecvAsync <- forall a. IO a -> IO (Async a)
async forall a b. (a -> b) -> a -> b
$ forall b. TVar ZREState -> Endpoint -> IO b
beaconRecv TVar ZREState
s Endpoint
zreMCast
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (forall a.
Show a =>
TVar ZREState -> GroupSeq -> (ByteString, ByteString, a) -> IO ()
runIface TVar ZREState
s GroupSeq
zrePort) [(ByteString, ByteString, ByteString)]
ifaces
Async ()
apiAsync <- forall a. IO a -> IO (Async a)
async forall a b. (a -> b) -> a -> b
$ TVar ZREState -> IO ()
api TVar ZREState
s
GroupSeq -> IO ()
threadDelay GroupSeq
500000
Async a
_userAppAsync <- forall a. IO a -> IO (Async a)
async
forall a b. (a -> b) -> a -> b
$ forall a. ZRE a -> TBQueue Event -> TBQueue API -> IO a
runZ
(ZRE a
app
forall (m :: * -> *) e a.
(MonadBaseControl IO m, Exception e) =>
m a -> (e -> m a) -> m a
`Control.Exception.Lifted.catch`
(\SomeException
e -> do let err :: String
err = forall a. Show a => a -> String
show (SomeException
e :: SomeException) in forall a. String -> ZRE a
zfail String
err)
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> m b -> m a
`Control.Exception.Lifted.finally`
ZRE ()
zquit
)
TBQueue Event
inQ
TBQueue API
outQ
forall a. Async a -> IO a
wait Async ()
apiAsync
forall (m :: * -> *) a. Monad m => a -> m a
return ()
api :: TVar ZREState -> IO ()
api :: TVar ZREState -> IO ()
api TVar ZREState
s = do
API
a <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> STM a
readTVar TVar ZREState
s forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. TBQueue a -> STM a
readTBQueue forall b c a. (b -> c) -> (a -> b) -> a -> c
. ZREState -> TBQueue API
zreOut
TVar ZREState -> API -> IO ()
handleApi TVar ZREState
s API
a
case API
a of
API
DoQuit -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
API
_ -> TVar ZREState -> IO ()
api TVar ZREState
s
handleApi :: TVar ZREState -> API -> IO ()
handleApi :: TVar ZREState -> API -> IO ()
handleApi TVar ZREState
s API
act = do
case API
act of
DoJoin Group
group -> forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
STM ()
incGroupSeq
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar ZREState
s forall a b. (a -> b) -> a -> b
$ \ZREState
x -> ZREState
x { zreGroups :: Groups
zreGroups = forall a. Ord a => a -> Set a -> Set a
S.insert Group
group (ZREState -> Groups
zreGroups ZREState
x) }
ZREState
st <- forall a. TVar a -> STM a
readTVar TVar ZREState
s
TVar ZREState -> Group -> GroupSeq -> STM ()
msgAllJoin TVar ZREState
s Group
group (ZREState -> GroupSeq
zreGroupSeq ZREState
st)
DoLeave Group
group -> forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
STM ()
incGroupSeq
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar ZREState
s forall a b. (a -> b) -> a -> b
$ \ZREState
x -> ZREState
x { zreGroups :: Groups
zreGroups = forall a. Ord a => a -> Set a -> Set a
S.delete Group
group (ZREState -> Groups
zreGroups ZREState
x) }
ZREState
st <- forall a. TVar a -> STM a
readTVar TVar ZREState
s
TVar ZREState -> Group -> GroupSeq -> STM ()
msgAllLeave TVar ZREState
s Group
group (ZREState -> GroupSeq
zreGroupSeq ZREState
st)
DoShout Group
group ByteString
msg -> forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ TVar ZREState -> Group -> ByteString -> STM ()
shoutGroup TVar ZREState
s Group
group ByteString
msg
DoShoutMulti Group
group [ByteString]
mmsg -> forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ TVar ZREState -> Group -> [ByteString] -> STM ()
shoutGroupMulti TVar ZREState
s Group
group [ByteString]
mmsg
DoWhisper UUID
uuid ByteString
msg -> forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ TVar ZREState -> UUID -> ByteString -> STM ()
whisperPeerUUID TVar ZREState
s UUID
uuid ByteString
msg
DoDiscover UUID
uuid Endpoint
endpoint -> do
Maybe (TVar Peer)
mp <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ TVar ZREState -> UUID -> STM (Maybe (TVar Peer))
lookupPeer TVar ZREState
s UUID
uuid
case Maybe (TVar Peer)
mp of
Just TVar Peer
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Maybe (TVar Peer)
Nothing -> do
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ TVar ZREState
-> UUID
-> (UTCTime
-> UUID
-> TVar ZREState
-> STM (TVar Peer, Maybe (IO ()), Maybe (IO ())))
-> IO (TVar Peer)
makePeer TVar ZREState
s UUID
uuid forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
MonadIO m =>
Endpoint
-> UTCTime
-> UUID
-> TVar ZREState
-> STM (TVar Peer, Maybe (m a), Maybe (IO b))
newPeerFromEndpoint Endpoint
endpoint
DoDebug Bool
bool -> forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar ZREState
s forall a b. (a -> b) -> a -> b
$ \ZREState
x -> ZREState
x { zreDebug :: Bool
zreDebug = Bool
bool }
API
DoQuit -> do
let chk :: IO Bool
chk = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
ZREState
s' <- forall a. TVar a -> STM a
readTVar TVar ZREState
s
[Bool]
pqs <- forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM (forall k a. Map k a -> [(k, a)]
M.toList forall a b. (a -> b) -> a -> b
$ ZREState -> Peers
zrePeers ZREState
s') forall a b. (a -> b) -> a -> b
$ \(UUID
_, TVar Peer
tp) -> forall a. TVar a -> STM a
readTVar TVar Peer
tp forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. TBQueue a -> STM Bool
isEmptyTBQueue forall b c a. (b -> c) -> (a -> b) -> a -> c
. Peer -> TBQueue ZRECmd
peerQueue
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *). Foldable t => t Bool -> Bool
and [Bool]
pqs
let loop :: IO ()
loop = do
Bool
res <- IO Bool
chk
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
res forall a b. (a -> b) -> a -> b
$ GroupSeq -> IO ()
threadDelay (forall a. RealFrac a => a -> GroupSeq
sec (Float
0.1 :: Float)) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
loop
IO ()
loop
where
incGroupSeq :: STM ()
incGroupSeq = forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar ZREState
s forall a b. (a -> b) -> a -> b
$ \ZREState
x -> ZREState
x { zreGroupSeq :: GroupSeq
zreGroupSeq = (ZREState -> GroupSeq
zreGroupSeq ZREState
x) forall a. Num a => a -> a -> a
+ GroupSeq
1 }
inbox :: TVar ZREState -> Z.ZREMsg -> IO ()
inbox :: TVar ZREState -> ZREMsg -> IO ()
inbox TVar ZREState
s msg :: ZREMsg
msg@Z.ZREMsg{GroupSeq
Maybe UTCTime
Maybe UUID
ZRECmd
msgCmd :: ZREMsg -> ZRECmd
msgTime :: ZREMsg -> Maybe UTCTime
msgSeq :: ZREMsg -> GroupSeq
msgFrom :: ZREMsg -> Maybe UUID
msgCmd :: ZRECmd
msgTime :: Maybe UTCTime
msgSeq :: GroupSeq
msgFrom :: Maybe UUID
..} = do
let uuid :: UUID
uuid = forall a. HasCallStack => Maybe a -> a
fromJust Maybe UUID
msgFrom
Maybe (TVar Peer)
mpt <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ TVar ZREState -> UUID -> STM (Maybe (TVar Peer))
lookupPeer TVar ZREState
s UUID
uuid
case Maybe (TVar Peer)
mpt of
Maybe (TVar Peer)
Nothing -> do
case ZRECmd
msgCmd of
h :: ZRECmd
h@(Z.Hello Endpoint
_endpoint Groups
_groups GroupSeq
_groupSeq ByteString
_name Headers
_headers) -> do
TVar Peer
peer <- TVar ZREState
-> UUID
-> (UTCTime
-> UUID
-> TVar ZREState
-> STM (TVar Peer, Maybe (IO ()), Maybe (IO ())))
-> IO (TVar Peer)
makePeer TVar ZREState
s UUID
uuid forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
MonadIO m =>
ZRECmd
-> UTCTime
-> UUID
-> TVar ZREState
-> STM (TVar Peer, Maybe (m a), Maybe (IO b))
newPeerFromHello ZRECmd
h
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ TVar Peer -> (Peer -> Peer) -> STM ()
updatePeer TVar Peer
peer forall a b. (a -> b) -> a -> b
$ \Peer
x -> Peer
x { peerSeq :: GroupSeq
peerSeq = (Peer -> GroupSeq
peerSeq Peer
x) forall a. Num a => a -> a -> a
+ GroupSeq
1 }
ZRECmd
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
(Just TVar Peer
peer) -> do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ TVar Peer -> UTCTime -> STM ()
updateLastHeard TVar Peer
peer forall a b. (a -> b) -> a -> b
$ forall a. HasCallStack => Maybe a -> a
fromJust Maybe UTCTime
msgTime
Peer
p <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> STM a
readTVar TVar Peer
peer
case Peer -> GroupSeq
peerSeq Peer
p forall a. Eq a => a -> a -> Bool
== GroupSeq
msgSeq of
Bool
True -> do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ TVar Peer -> (Peer -> Peer) -> STM ()
updatePeer TVar Peer
peer forall a b. (a -> b) -> a -> b
$ \Peer
x -> Peer
x { peerSeq :: GroupSeq
peerSeq = (Peer -> GroupSeq
peerSeq Peer
x) forall a. Num a => a -> a -> a
+ GroupSeq
1 }
TVar ZREState -> ZREMsg -> TVar Peer -> IO ()
handleCmd TVar ZREState
s ZREMsg
msg TVar Peer
peer
Bool
_ -> do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ TVar ZREState -> ByteString -> STM ()
emitdbg TVar ZREState
s ByteString
"sequence mismatch, recreating peer"
UUID -> ZRECmd -> IO ()
recreatePeer (Peer -> UUID
peerUUID Peer
p) ZRECmd
msgCmd
where
recreatePeer :: UUID -> ZRECmd -> IO ()
recreatePeer UUID
uuid h :: ZRECmd
h@(Z.Hello Endpoint
_ Groups
_ GroupSeq
_ ByteString
_ Headers
_) = do
TVar ZREState -> UUID -> IO ()
destroyPeer TVar ZREState
s UUID
uuid
TVar Peer
peer <- TVar ZREState
-> UUID
-> (UTCTime
-> UUID
-> TVar ZREState
-> STM (TVar Peer, Maybe (IO ()), Maybe (IO ())))
-> IO (TVar Peer)
makePeer TVar ZREState
s UUID
uuid forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
MonadIO m =>
ZRECmd
-> UTCTime
-> UUID
-> TVar ZREState
-> STM (TVar Peer, Maybe (m a), Maybe (IO b))
newPeerFromHello ZRECmd
h
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ TVar Peer -> (Peer -> Peer) -> STM ()
updatePeer TVar Peer
peer forall a b. (a -> b) -> a -> b
$ \Peer
x -> Peer
x { peerSeq :: GroupSeq
peerSeq = (Peer -> GroupSeq
peerSeq Peer
x) forall a. Num a => a -> a -> a
+ GroupSeq
1 }
recreatePeer UUID
uuid ZRECmd
_ = TVar ZREState -> UUID -> IO ()
destroyPeer TVar ZREState
s UUID
uuid
handleCmd :: TVar ZREState -> Z.ZREMsg -> TVar Peer -> IO ()
handleCmd :: TVar ZREState -> ZREMsg -> TVar Peer -> IO ()
handleCmd TVar ZREState
s Z.ZREMsg{msgFrom :: ZREMsg -> Maybe UUID
msgFrom=(Just UUID
from), msgTime :: ZREMsg -> Maybe UTCTime
msgTime=(Just UTCTime
time), msgCmd :: ZREMsg -> ZRECmd
msgCmd=ZRECmd
cmd} TVar Peer
peer = do
case ZRECmd
cmd of
(Z.Whisper [ByteString]
content) -> forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
TVar ZREState -> Event -> STM ()
emit TVar ZREState
s forall a b. (a -> b) -> a -> b
$ UUID -> [ByteString] -> UTCTime -> Event
Whisper UUID
from [ByteString]
content UTCTime
time
TVar ZREState -> ByteString -> STM ()
emitdbg TVar ZREState
s forall a b. (a -> b) -> a -> b
$ ByteString -> [ByteString] -> ByteString
B.intercalate ByteString
" " [ByteString
"whisper", [ByteString] -> ByteString
B.concat [ByteString]
content]
Z.Shout Group
group [ByteString]
content -> forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
TVar ZREState -> Event -> STM ()
emit TVar ZREState
s forall a b. (a -> b) -> a -> b
$ UUID -> Group -> [ByteString] -> UTCTime -> Event
Shout UUID
from Group
group [ByteString]
content UTCTime
time
TVar ZREState -> ByteString -> STM ()
emitdbg TVar ZREState
s forall a b. (a -> b) -> a -> b
$ ByteString -> [ByteString] -> ByteString
B.intercalate ByteString
" " [ByteString
"shout for group", Group -> ByteString
Z.unGroup Group
group, ByteString
">", [ByteString] -> ByteString
B.concat [ByteString]
content]
Z.Join Group
group GroupSeq
groupSeq -> forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
TVar ZREState -> TVar Peer -> Group -> GroupSeq -> STM ()
joinGroup TVar ZREState
s TVar Peer
peer Group
group GroupSeq
groupSeq
TVar ZREState -> ByteString -> STM ()
emitdbg TVar ZREState
s forall a b. (a -> b) -> a -> b
$ ByteString -> [ByteString] -> ByteString
B.intercalate ByteString
" " [ByteString
"join", Group -> ByteString
Z.unGroup Group
group, forall a. Show a => a -> ByteString
bshow GroupSeq
groupSeq]
Z.Leave Group
group GroupSeq
groupSeq -> forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
TVar ZREState -> TVar Peer -> Group -> GroupSeq -> STM ()
leaveGroup TVar ZREState
s TVar Peer
peer Group
group GroupSeq
groupSeq
TVar ZREState -> ByteString -> STM ()
emitdbg TVar ZREState
s forall a b. (a -> b) -> a -> b
$ ByteString -> [ByteString] -> ByteString
B.intercalate ByteString
" " [ByteString
"leave", Group -> ByteString
Z.unGroup Group
group, forall a. Show a => a -> ByteString
bshow GroupSeq
groupSeq]
ZRECmd
Z.Ping -> forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
TVar Peer -> ZRECmd -> STM ()
msgPeer TVar Peer
peer ZRECmd
Z.PingOk
Peer
p <- forall a. TVar a -> STM a
readTVar TVar Peer
peer
TVar ZREState -> ByteString -> STM ()
emitdbg TVar ZREState
s forall a b. (a -> b) -> a -> b
$ [ByteString] -> ByteString
B.unwords [ByteString
"sending pings to ", forall a. Show a => a -> ByteString
bshow Peer
p]
ZRECmd
Z.PingOk -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Z.Hello Endpoint
endpoint Groups
groups GroupSeq
groupSeq ByteString
name Headers
headers -> do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
TVar ZREState -> TVar Peer -> Groups -> GroupSeq -> STM ()
joinGroups TVar ZREState
s TVar Peer
peer Groups
groups GroupSeq
groupSeq
TVar Peer -> (Peer -> Peer) -> STM ()
updatePeer TVar Peer
peer forall a b. (a -> b) -> a -> b
$ \Peer
x -> Peer
x {
peerName :: Maybe ByteString
peerName = forall a. a -> Maybe a
Just ByteString
name
, peerHeaders :: Headers
peerHeaders = Headers
headers
}
Peer
p <- forall a. TVar a -> STM a
readTVar TVar Peer
peer
TVar ZREState -> Event -> STM ()
emit TVar ZREState
s forall a b. (a -> b) -> a -> b
$ UUID -> ByteString -> Groups -> Headers -> Endpoint -> Event
Ready (Peer -> UUID
peerUUID Peer
p) ByteString
name Groups
groups Headers
headers Endpoint
endpoint
TVar ZREState -> ByteString -> STM ()
emitdbg TVar ZREState
s forall a b. (a -> b) -> a -> b
$ ByteString
"update peer"
forall (m :: * -> *) a. Monad m => a -> m a
return ()
handleCmd TVar ZREState
_ ZREMsg
_ TVar Peer
_ = forall (m :: * -> *) a. Monad m => a -> m a
return ()