{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.ZRE.Peer (
newPeerFromBeacon
, newPeerFromHello
, newPeerFromEndpoint
, makePeer
, destroyPeer
, msgPeer
, msgPeerUUID
, msgAll
, msgGroup
, joinGroup
, joinGroups
, leaveGroup
, leaveGroups
, lookupPeer
, updatePeer
, updateLastHeard
, printPeer
, printAll
, msgAllJoin
, msgAllLeave
, shoutGroup
, shoutGroupMulti
, whisperPeerUUID
) where
import Control.Monad
import Control.Monad.IO.Class
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import qualified Data.Map as M
import qualified Data.Set as Set
import qualified Data.ByteString.Char8 as B
import Data.ByteString (ByteString)
import Data.Time.Clock
import Data.UUID
import Data.ZRE()
import System.ZMQ4.Endpoint
import Data.ZRE
import Network.ZRE.Types hiding (Shout, Whisper)
import Network.ZRE.Utils
import Network.ZRE.ZMQ (zreDealer)
-- | Render a one-line, space-separated summary of a peer.
--
-- Fields appear in this order: the literal tag @Peer@, name, endpoint,
-- UUID, sequence number, group sequence number, groups and the time the
-- peer was last heard from.
printPeer :: Peer -> ByteString
printPeer p =
  B.unwords
    [ "Peer"
    , bshow (peerName p)
    , pEndpoint (peerEndpoint p)
    , toASCIIBytes (peerUUID p)
    , bshow (peerSeq p)
    , bshow (peerGroupSeq p)
    , bshow (peerGroups p)
    , bshow (peerLastHeard p)
    ]
-- | Create a peer record and register it in the shared state.
--
-- Inside the transaction this:
--
--   * allocates the peer's outgoing command queue (capacity 100) and puts
--     our own 'Hello' on it as the first command,
--   * stores the new peer under @uuid@ in 'zrePeers',
--   * emits 'New', and additionally 'Ready' when a name is known,
--   * joins the peer to every group in @groups@.
--
-- Returns the peer variable together with two actions that have not been
-- started yet: the dealer for the peer's endpoint and the 'pinger' loop.
--
-- NOTE(review): @groupSeq@ only reaches the record through 'joinGroups',
-- so with an empty group set 'peerGroupSeq' stays at 0 -- confirm that
-- this is intended.
newPeer :: MonadIO m
        => TVar ZREState
        -> Endpoint
        -> UUID
        -> Set.Set Group
        -> GroupSeq
        -> Maybe Name
        -> Headers
        -> UTCTime
        -> STM (TVar Peer, Maybe (m a), Maybe (IO b))
newPeer s endpoint uuid groups groupSeq mname headers t = do
  st <- readTVar s

  -- The first thing the dealer will send to the peer is our own HELLO.
  outbox <- newTBQueue 100
  writeTBQueue outbox $
    Hello (zreEndpoint st) (zreGroups st) (zreGroupSeq st) (zreName st) (zreHeaders st)

  np <- newTVar $ Peer
    { peerEndpoint  = endpoint
    , peerUUID      = uuid
    , peerSeq       = 1
    , peerGroups    = groups
    , peerGroupSeq  = 0
    , peerName      = mname
    , peerHeaders   = headers
    , peerAsync     = Nothing
    , peerAsyncPing = Nothing
    , peerQueue     = outbox
    , peerLastHeard = t
    }

  modifyTVar s $ \x -> x { zrePeers = M.insert uuid np (zrePeers x) }

  emit s $ New uuid mname groups headers endpoint
  -- Ready is only announced once the peer's name is known.
  forM_ mname $ \name ->
    emit s $ Ready uuid name groups headers endpoint

  joinGroups s np groups groupSeq

  return
    ( np
    , Just (zreDealer endpoint (uuidByteString (zreUUID st)) outbox)
    , Just (pinger s np)
    )
-- | Create a peer from a beacon, i.e. from an address and port only.
--
-- The peer starts with no groups, group sequence 0, no name and empty
-- headers; its endpoint is the TCP endpoint built from @addr@ and @port@.
newPeerFromBeacon :: MonadIO m
                  => Address
                  -> Port
                  -> UTCTime
                  -> UUID
                  -> TVar ZREState
                  -> STM (TVar Peer, Maybe (m a), Maybe (IO b))
newPeerFromBeacon addr port t uuid s =
  emitdbg s "New peer from beacon"
    >> newPeer s (newTCPEndpoint addr port) uuid Set.empty 0 Nothing M.empty t
-- | Create a peer from a received 'Hello' command, taking endpoint,
-- groups, group sequence, name and headers from the message.
--
-- Calls 'error' with @"not a hello message"@ for any other command.
newPeerFromHello :: MonadIO m
                 => ZRECmd
                 -> UTCTime
                 -> UUID
                 -> TVar ZREState
                 -> STM (TVar Peer, Maybe (m a), Maybe (IO b))
newPeerFromHello cmd t uuid s =
  case cmd of
    Hello endpoint groups groupSeq name headers -> do
      emitdbg s "New peer from hello"
      newPeer s endpoint uuid groups groupSeq (Just name) headers t
    _ -> error "not a hello message"
-- | Create a peer for an already known endpoint.
--
-- The peer starts with no groups, group sequence 0, no name and empty
-- headers.
newPeerFromEndpoint :: MonadIO m
                    => Endpoint
                    -> UTCTime
                    -> UUID
                    -> TVar ZREState
                    -> STM (TVar Peer, Maybe (m a), Maybe (IO b))
newPeerFromEndpoint endpoint t uuid s =
  emitdbg s "New peer from endpoint"
    >> newPeer s endpoint uuid Set.empty 0 Nothing M.empty t
-- | Return the peer registered under @uuid@, creating it with the given
-- constructor when it is not known yet.
--
-- The constructor receives the current time, the UUID and the state, and
-- runs in the same transaction as the lookup. When it hands back both
-- background actions they are started with 'async' and their handles are
-- stored in 'peerAsync' and 'peerAsyncPing'. For an already known peer
-- nothing is started.
makePeer :: TVar ZREState
         -> UUID
         -> (UTCTime
             -> UUID
             -> TVar ZREState
             -> STM (TVar Peer, Maybe (IO ()), Maybe (IO ())))
         -> IO (TVar Peer)
makePeer s uuid newPeerFn = do
  now <- getCurrentTime
  (peer, mDealer, mPinger) <- atomically $
    lookupPeer s uuid
      >>= maybe (newPeerFn now uuid s) (\known -> return (known, Nothing, Nothing))
  case (mDealer, mPinger) of
    (Just deal, Just ping) -> do
      dealerThread <- async deal
      pingThread <- async ping
      -- Record both handles so that the threads can be cancelled later.
      atomically $ updatePeer peer $ \x ->
        x { peerAsync = Just dealerThread, peerAsyncPing = Just pingThread }
    _ -> return ()
  return peer
-- | Remove the peer with the given UUID, if it is known.
--
-- In one transaction the peer is deleted from 'zrePeers', taken out of
-- all of its groups and a 'Quit' event is emitted. Afterwards the
-- threads recorded in 'peerAsync' and 'peerAsyncPing' are cancelled.
-- Does nothing for an unknown UUID.
--
-- 'pinger' calls this function, so one of the cancelled handles may
-- belong to the calling thread itself.
destroyPeer :: TVar ZREState -> UUID -> IO ()
destroyPeer s uuid = do
  workers <- atomically $ lookupPeer s uuid >>= maybe (return []) teardown
  forM_ workers $ maybe (return ()) cancel
  where
    -- Unregister the peer and hand back its (optional) thread handles.
    teardown peerVar = do
      p <- readTVar peerVar
      modifyTVar s $ \st -> st { zrePeers = M.delete uuid (zrePeers st) }
      leaveGroups s peerVar (peerGroups p) (peerGroupSeq p)
      emit s $ Quit uuid (peerName p)
      return [peerAsync p, peerAsyncPing p]
-- | Liveness loop for a single peer; never returns.
--
-- Each round compares the time since the peer was last heard from with
-- the configured periods:
--
--   * longer than 'zreDeadPeriod': log it and destroy the peer,
--   * longer than 'zreQuietPeriod': log it, queue a 'Ping' for the peer
--     and sleep for 'zreQuietPingRate',
--   * otherwise: sleep for what is left of the quiet period.
pinger :: TVar ZREState -> TVar Peer -> IO b
pinger s peer = forever $ do
  p <- atomically $ readTVar peer
  cfg <- atomically $ zreCfg <$> readTVar s
  now <- getCurrentTime
  let silence = diffUTCTime now (peerLastHeard p)
      deadAfter = realToFrac (zreDeadPeriod cfg) :: NominalDiffTime
      quietAfter = realToFrac (zreQuietPeriod cfg) :: NominalDiffTime
  case () of
    _ | silence > deadAfter -> do
          atomically $ emitdbg s $
            B.unwords ["Peer over deadPeriod, destroying", bshow p]
          destroyPeer s (peerUUID p)
      | silence > quietAfter -> do
          atomically $ emitdbg s $
            B.unwords ["Peer over quietPeriod, sending hugz", bshow p]
          atomically $ writeTBQueue (peerQueue p) Ping
          threadDelay $ sec (zreQuietPingRate cfg)
      | otherwise ->
          -- Wake up again when the quiet period would run out.
          threadDelay $ sec (quietAfter - silence)
-- | Look up a peer by UUID in 'zrePeers'; 'Nothing' when it is unknown.
lookupPeer :: TVar ZREState -> UUID -> STM (Maybe (TVar Peer))
lookupPeer s uuid = M.lookup uuid . zrePeers <$> readTVar s
-- | Apply a pure update to the peer stored in the variable.
--
-- Uses the strict 'modifyTVar'' so the updated record is evaluated to
-- weak head normal form before it is written back. With the lazy variant
-- every call would leave another unevaluated update stacked inside the
-- variable until somebody reads the peer.
updatePeer :: TVar Peer -> (Peer -> Peer) -> STM ()
updatePeer peer fn = modifyTVar' peer fn
-- | Set the time at which the peer was last heard from.
updateLastHeard :: TVar Peer -> UTCTime -> STM ()
updateLastHeard peer val = updatePeer peer stamp
  where
    stamp p = p { peerLastHeard = val }
-- | Record that the peer joined a group.
--
-- Adds the group to the peer's own group set, stores the new group
-- sequence number, emits 'GroupJoin' and registers the peer under the
-- group in 'zrePeerGroups', creating the group entry if needed.
joinGroup :: TVar ZREState -> TVar Peer -> Group -> GroupSeq -> STM ()
joinGroup s peer group groupSeq = do
  updatePeer peer $ \x ->
    x { peerGroups = Set.insert group (peerGroups x), peerGroupSeq = groupSeq }
  uid <- peerUUID <$> readTVar peer
  emit s $ GroupJoin uid group
  modifyTVar s $ \st ->
    st { zrePeerGroups = M.alter (Just . enrol uid) group (zrePeerGroups st) }
  where
    -- Start a fresh member map for a new group, or extend the existing one.
    enrol uid = maybe (M.singleton uid peer) (M.insert uid peer)
-- | Run 'joinGroup' for every group in the set, passing the same group
-- sequence number each time.
joinGroups :: TVar ZREState -> TVar Peer -> Set.Set Group -> GroupSeq -> STM ()
joinGroups s peer groups groupSeq =
  forM_ (Set.toList groups) $ \g -> joinGroup s peer g groupSeq
-- | Record that the peer left a group.
--
-- Removes the group from the peer's own group set, stores the new group
-- sequence number, emits 'GroupLeave' and removes the peer from the
-- group's entry in 'zrePeerGroups'. A group left without members is
-- dropped from the map; an unknown group is left untouched.
leaveGroup :: TVar ZREState -> TVar Peer -> Group -> GroupSeq -> STM ()
leaveGroup s peer group groupSeq = do
  updatePeer peer $ \x ->
    x { peerGroups = Set.delete group (peerGroups x), peerGroupSeq = groupSeq }
  uid <- peerUUID <$> readTVar peer
  emit s $ GroupLeave uid group
  modifyTVar s $ \st ->
    st { zrePeerGroups = M.update (withoutMember uid) group (zrePeerGroups st) }
  where
    -- Returning Nothing makes M.update delete the group entry altogether.
    withoutMember uid members
      | M.null remaining = Nothing
      | otherwise = Just remaining
      where
        remaining = M.delete uid members
-- | Run 'leaveGroup' for every group in the set, passing the same group
-- sequence number each time.
leaveGroups :: TVar ZREState -> TVar Peer -> Set.Set Group -> GroupSeq -> STM ()
leaveGroups s peer groups groupSeq =
  forM_ (Set.toList groups) $ \g -> leaveGroup s peer g groupSeq
-- | Put a command on the peer's outgoing queue.
msgPeer :: TVar Peer -> ZRECmd -> STM ()
msgPeer peer msg = readTVar peer >>= \p -> writeTBQueue (peerQueue p) msg
-- | Queue a command for the peer with the given UUID.
--
-- Does nothing when no such peer is registered.
msgPeerUUID :: TVar ZREState -> UUID -> ZRECmd -> STM ()
msgPeerUUID s uuid msg = do
  target <- M.lookup uuid . zrePeers <$> readTVar s
  forM_ target $ \peer -> msgPeer peer msg
-- | Queue a command for every registered peer.
msgAll :: TVar ZREState -> ZRECmd -> STM ()
msgAll s msg = do
  peers <- zrePeers <$> readTVar s
  forM_ (M.elems peers) $ \peer -> msgPeer peer msg
-- | Queue a command for every peer registered under the given group in
-- 'zrePeerGroups'.
--
-- Does nothing when the group has no entry.
msgGroup :: TVar ZREState -> Group -> ZRECmd -> STM ()
msgGroup s groupname msg = do
  members <- M.lookup groupname . zrePeerGroups <$> readTVar s
  forM_ (maybe [] M.elems members) $ \peer -> msgPeer peer msg
-- | Send a single-frame 'Shout' to all peers of the group.
shoutGroup :: TVar ZREState -> Group -> ByteString -> STM ()
shoutGroup s group msg = msgGroup s group (Shout group (msg : []))
-- | Send a 'Shout' carrying the given content to all peers of the group.
shoutGroupMulti :: TVar ZREState -> Group -> Content -> STM ()
shoutGroupMulti s group mmsg = msgGroup s group shout
  where
    shout = Shout group mmsg
-- | Queue a 'Join' for the given group and sequence number for every
-- registered peer.
msgAllJoin :: TVar ZREState -> Group -> GroupSeq -> STM ()
msgAllJoin s group = msgAll s . Join group
-- | Queue a 'Leave' for the given group and sequence number for every
-- registered peer.
msgAllLeave :: TVar ZREState -> Group -> GroupSeq -> STM ()
msgAllLeave s group = msgAll s . Leave group
-- | Send a single-frame 'Whisper' to the peer with the given UUID.
whisperPeerUUID :: TVar ZREState -> UUID -> ByteString -> STM ()
whisperPeerUUID s uuid msg = msgPeerUUID s uuid (Whisper (msg : []))
-- | Print one 'printPeer' line to stdout for every peer in the map.
--
-- Each peer is read in its own transaction.
printPeers :: M.Map k (TVar Peer) -> IO ()
printPeers x =
  forM_ (M.elems x) $ \pt ->
    atomically (readTVar pt) >>= B.putStrLn . printPeer
-- | Print a @group NAME ->@ header line followed by the group's peers.
printGroup :: (Group, M.Map k (TVar Peer)) -> IO ()
printGroup (g, v) = do
  B.putStrLn (B.unwords ["group", unGroup g, "->"])
  printPeers v
-- | Print all registered peers, then every group with its members.
--
-- Works on a single snapshot of the state.
printAll :: TVar ZREState -> IO ()
printAll s = do
  st <- atomically (readTVar s)
  printPeers (zrePeers st)
  forM_ (M.toList (zrePeerGroups st)) printGroup