{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.ZRE.Peer (
    newPeerFromBeacon
  , newPeerFromHello
  , newPeerFromEndpoint
  , makePeer
  , destroyPeer
  , msgPeer
  , msgPeerUUID
  , msgAll
  , msgGroup
  , joinGroup
  , joinGroups
  , leaveGroup
  , leaveGroups
  , lookupPeer
  , updatePeer
  , updateLastHeard
  , printPeer
  , printAll
  , msgAllJoin
  , msgAllLeave
  , shoutGroup
  , shoutGroupMulti
  , whisperPeerUUID
  ) where

import Control.Monad
import Control.Monad.IO.Class
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import qualified Data.Map as M
import qualified Data.Set as Set
import qualified Data.ByteString.Char8 as B
import Data.ByteString (ByteString)
import Data.Time.Clock
import Data.UUID
import Data.ZRE()
import System.ZMQ4.Endpoint
import Data.ZRE
import Network.ZRE.Types hiding (Shout, Whisper)
import Network.ZRE.Utils
import Network.ZRE.ZMQ (zreDealer)

-- | Render a peer as a single space-separated line: the literal @Peer@,
-- followed by the peer's name, endpoint, UUID, message sequence number,
-- group sequence number, groups and last-heard timestamp.
printPeer :: Peer -> ByteString
printPeer Peer{..} = B.intercalate " "
  [ "Peer"
  , bshow peerName
  , pEndpoint peerEndpoint
  , toASCIIBytes peerUUID
  , bshow peerSeq
  , bshow peerGroupSeq
  , bshow peerGroups
  , bshow peerLastHeard
  ]

-- | Create a new peer record and register it in the shared state.
--
-- Inside the transaction this:
--
-- * creates the peer's outgoing command queue (bounded to 100 entries) and
--   queues a @Hello@ built from our own endpoint, groups, group sequence,
--   name and headers;
-- * stores the new peer under @uuid@ in 'zrePeers';
-- * emits a @New@ event, plus a @Ready@ event when the peer's name is known;
-- * joins the peer to @groups@ via 'joinGroups'.
--
-- The result carries the peer's 'TVar' together with two actions that are
-- /not/ started here: the dealer for the peer's endpoint and the 'pinger'
-- loop. Both are always returned as 'Just'.
newPeer :: MonadIO m
        => TVar ZREState
        -> Endpoint
        -> UUID
        -> Set.Set Group
        -> GroupSeq
        -> Maybe Name
        -> Headers
        -> UTCTime
        -> STM (TVar Peer, Maybe (m a), Maybe (IO b))
newPeer s endpoint uuid groups groupSeq mname headers t = do
  st <- readTVar s
  peerQ <- newTBQueue 100
  -- The first message the peer receives from us is our own Hello.
  writeTBQueue peerQ $
    Hello (zreEndpoint st) (zreGroups st) (zreGroupSeq st) (zreName st) (zreHeaders st)

  let p = Peer
        { peerEndpoint  = endpoint
        , peerUUID      = uuid
        , peerSeq       = 1
        , peerGroups    = groups
        -- starts at 0; 'joinGroups' below sets it to @groupSeq@ for each
        -- group it processes
        , peerGroupSeq  = 0
        , peerName      = mname
        , peerHeaders   = headers
        , peerAsync     = Nothing
        , peerAsyncPing = Nothing
        , peerQueue     = peerQ
        , peerLastHeard = t
        }
  np <- newTVar p

  modifyTVar' s $ \x -> x { zrePeers = M.insert uuid np (zrePeers x) }

  emit s $ New uuid mname groups headers endpoint
  -- Ready is only emitted once the peer's name is known.
  forM_ mname $ \name -> emit s $ Ready uuid name groups headers endpoint

  joinGroups s np groups groupSeq

  return
    ( np
    , Just $ zreDealer endpoint (uuidByteString $ zreUUID st) peerQ
    , Just $ pinger s np
    )

-- | Create a peer discovered through a beacon.
--
-- Only the address and port are known at this point, so the peer is created
-- via 'newPeer' with a TCP endpoint built from them, no groups, group
-- sequence 0, no name and empty headers.
newPeerFromBeacon :: MonadIO m
                  => Address
                  -> Port
                  -> UTCTime
                  -> UUID
                  -> TVar ZREState
                  -> STM (TVar Peer, Maybe (m a), Maybe (IO b))
newPeerFromBeacon addr port t uuid s = do
  emitdbg s "New peer from beacon"
  let endpoint = newTCPEndpoint addr port
  newPeer s endpoint uuid Set.empty 0 Nothing M.empty t

-- | Create a peer from a received @Hello@ command, taking endpoint, groups,
-- group sequence, name and headers from the message.
--
-- Any other command is a programming error and results in a call to 'error'
-- with the message @"not a hello message"@.
newPeerFromHello :: MonadIO m
                 => ZRECmd
                 -> UTCTime
                 -> UUID
                 -> TVar ZREState
                 -> STM (TVar Peer, Maybe (m a), Maybe (IO b))
newPeerFromHello (Hello endpoint groups groupSeq name headers) t uuid s = do
  emitdbg s "New peer from hello"
  newPeer s endpoint uuid groups groupSeq (Just name) headers t
newPeerFromHello _ _ _ _ = error "not a hello message"

-- | Create a peer for which only the endpoint is known.
--
-- The peer is created via 'newPeer' with no groups, group sequence 0,
-- no name and empty headers.
newPeerFromEndpoint :: MonadIO m
                    => Endpoint
                    -> UTCTime
                    -> UUID
                    -> TVar ZREState
                    -> STM (TVar Peer, Maybe (m a), Maybe (IO b))
newPeerFromEndpoint endpoint t uuid s = do
  emitdbg s "New peer from endpoint"
  newPeer s endpoint uuid Set.empty 0 Nothing M.empty t

-- | Return the peer registered under @uuid@, creating it when missing.
--
-- The lookup and the creation (through the supplied @newPeerFn@, called with
-- the current time, the UUID and the state) happen in a single transaction.
-- When a new peer was created and both of its actions were returned, they
-- are started with 'async' and their handles are stored in the peer's
-- 'peerAsync' and 'peerAsyncPing' fields. An already known peer is returned
-- unchanged and nothing is started.
makePeer :: TVar ZREState
            -> UUID
            -> (UTCTime
                -> UUID
                -> TVar ZREState
                -> STM (TVar Peer, Maybe (IO ()), Maybe (IO ())))
            -> IO (TVar Peer)
makePeer s uuid newPeerFn = do
  t <- getCurrentTime
  res <- atomically $ do
    existing <- M.lookup uuid . zrePeers <$> readTVar s
    case existing of
      Just peer -> return (peer, Nothing, Nothing)
      Nothing   -> newPeerFn t uuid s

  case res of
    -- fixme: clumsy
    (peer, Just deal, Just ping) -> do
      a <- async deal
      b <- async ping
      -- record both handles in one update so 'destroyPeer' can cancel them
      atomically $ updatePeer peer $ \x ->
        x { peerAsync = Just a, peerAsyncPing = Just b }
      return peer
    (peer, _, _) -> return peer

-- | Remove the peer registered under @uuid@ and stop its worker threads.
--
-- In one transaction the peer is deleted from 'zrePeers', removed from all
-- of its groups via 'leaveGroups', and a @Quit@ event is emitted. Afterwards
-- the asyncs stored in 'peerAsync' and 'peerAsyncPing' (if any) are
-- cancelled. Does nothing when no such peer is registered.
destroyPeer :: TVar ZREState -> UUID -> IO ()
destroyPeer s uuid = do
  asyncs <- atomically $ do
    mpt <- lookupPeer s uuid
    case mpt of
      Nothing -> return []
      Just peer -> do
        Peer{..} <- readTVar peer
        modifyTVar' s $ \x -> x { zrePeers = M.delete uuid (zrePeers x) }
        leaveGroups s peer peerGroups peerGroupSeq
        emit s $ Quit uuid peerName
        return [peerAsync, peerAsyncPing]

  -- this is called from pinger so no more code is executed after this point
  mapM_ (mapM_ cancel) asyncs

-- | Liveness loop for a single peer; never returns on its own.
--
-- Each iteration compares the time elapsed since 'peerLastHeard' with the
-- configured periods:
--
-- * longer than 'zreDeadPeriod': log a debug message and call 'destroyPeer'
--   (which cancels this loop's async when it is stored in the peer);
-- * longer than 'zreQuietPeriod': log a debug message, queue a @Ping@ for the
--   peer and sleep for 'zreQuietPingRate';
-- * otherwise: sleep for the remainder of the quiet period.
--
-- Durations are converted for 'threadDelay' with 'sec' (presumably seconds
-- to microseconds -- confirm against "Network.ZRE.Utils").
pinger :: TVar ZREState -> TVar Peer -> IO b
pinger s peer = forever $ do
  p@Peer{..} <- readTVarIO peer
  cfg <- zreCfg <$> readTVarIO s
  now <- getCurrentTime

  let tdiff, deadPeriod, quietPeriod :: NominalDiffTime
      tdiff       = diffUTCTime now peerLastHeard
      deadPeriod  = realToFrac (zreDeadPeriod cfg)
      quietPeriod = realToFrac (zreQuietPeriod cfg)

  if tdiff > deadPeriod
    then do
      atomically $ emitdbg s $
        B.unwords ["Peer over deadPeriod, destroying", bshow p]
      destroyPeer s peerUUID
    else if tdiff > quietPeriod
      then do
        atomically $ emitdbg s $
          B.unwords ["Peer over quietPeriod, sending hugz", bshow p]
        atomically $ writeTBQueue peerQueue Ping
        threadDelay $ sec (zreQuietPingRate cfg)
      else
        threadDelay $ sec $ quietPeriod - tdiff

-- | Look up the peer registered under @uuid@ in 'zrePeers'.
-- Returns 'Nothing' when no such peer is known.
lookupPeer :: TVar ZREState -> UUID -> STM (Maybe (TVar Peer))
lookupPeer s uuid = M.lookup uuid . zrePeers <$> readTVar s

-- | Apply @fn@ to the peer stored in the 'TVar'.
--
-- Uses the strict 'modifyTVar'' so that repeated updates do not accumulate
-- as a chain of unevaluated record updates inside the variable.
updatePeer :: TVar Peer -> (Peer -> Peer) -> STM ()
updatePeer peer fn = modifyTVar' peer fn

--updatePeerUUID :: TVar ZREState -> UUID -> (Peer -> Peer) -> STM ()
--updatePeerUUID s uuid fn = do
--  st <- readTVar s
--  case M.lookup uuid $ zrePeers st of
--    Nothing -> return ()
--    (Just peer) -> updatePeer peer fn

-- | Set the peer's 'peerLastHeard' timestamp to @val@.
updateLastHeard :: TVar Peer -> UTCTime -> STM ()
updateLastHeard peer val = updatePeer peer $ \x -> x { peerLastHeard = val }

-- | Join @peer@ to @group@ and update its group sequence number to
-- @groupSeq@.
--
-- The group is added to the peer's own 'peerGroups', a @GroupJoin@ event is
-- emitted (also when the peer was already a member), and the peer is
-- recorded under the group in 'zrePeerGroups', creating the group entry
-- when it does not exist yet.
joinGroup :: TVar ZREState -> TVar Peer -> Group -> GroupSeq -> STM ()
joinGroup s peer group groupSeq = do
  updatePeer peer $ \x -> x
    { peerGroups   = Set.insert group (peerGroups x)
    , peerGroupSeq = groupSeq
    }
  uuid <- peerUUID <$> readTVar peer
  emit s $ GroupJoin uuid group
  -- 'M.union' is left-biased, so the new entry replaces any existing one
  -- stored for the same UUID.
  modifyTVar' s $ \x -> x
    { zrePeerGroups =
        M.insertWith M.union group (M.singleton uuid peer) (zrePeerGroups x)
    }

-- | Join @peer@ to every group in @groups@ (see 'joinGroup'), using the same
-- @groupSeq@ for each of them. Does nothing for an empty set.
joinGroups :: TVar ZREState -> TVar Peer -> Set.Set Group -> GroupSeq -> STM ()
joinGroups s peer groups groupSeq =
  forM_ groups $ \g -> joinGroup s peer g groupSeq

-- | Remove @peer@ from @group@ and update its group sequence number to
-- @groupSeq@.
--
-- The group is dropped from the peer's own 'peerGroups', a @GroupLeave@
-- event is emitted (also when the peer was not a member), and the peer is
-- removed from the group's entry in 'zrePeerGroups'. A group left without
-- any peers is deleted from the map altogether.
leaveGroup :: TVar ZREState -> TVar Peer -> Group -> GroupSeq -> STM ()
leaveGroup s peer group groupSeq = do
  updatePeer peer $ \x -> x
    { peerGroups   = Set.delete group (peerGroups x)
    , peerGroupSeq = groupSeq
    }
  uuid <- peerUUID <$> readTVar peer
  emit s $ GroupLeave uuid group
  modifyTVar' s $ \x -> x
    { zrePeerGroups =
        M.update (dropIfEmpty . M.delete uuid) group (zrePeerGroups x)
    }
  where
    -- Returning Nothing makes 'M.update' delete the group's entry.
    dropIfEmpty members
      | M.null members = Nothing
      | otherwise      = Just members

-- | Remove @peer@ from every group in @groups@ (see 'leaveGroup'), using the
-- same @groupSeq@ for each of them. Does nothing for an empty set.
leaveGroups :: TVar ZREState -> TVar Peer -> Set.Set Group -> GroupSeq -> STM ()
leaveGroups s peer groups groupSeq =
  forM_ groups $ \g -> leaveGroup s peer g groupSeq

-- | Queue @msg@ on the peer's outgoing command queue.
--
-- The queue is bounded, so the enclosing transaction retries while the
-- queue is full.
msgPeer :: TVar Peer -> ZRECmd -> STM ()
msgPeer peer msg = do
  queue <- peerQueue <$> readTVar peer
  writeTBQueue queue msg

-- | Queue @msg@ for the peer registered under @uuid@.
-- Silently does nothing when no such peer is known.
msgPeerUUID :: TVar ZREState -> UUID -> ZRECmd -> STM ()
msgPeerUUID s uuid msg = lookupPeer s uuid >>= mapM_ (`msgPeer` msg)

-- | Queue @msg@ for every peer currently registered in 'zrePeers'.
msgAll :: TVar ZREState -> ZRECmd -> STM ()
msgAll s msg = do
  peers <- zrePeers <$> readTVar s
  forM_ peers $ \peer -> msgPeer peer msg

-- | Queue @msg@ for every peer recorded under @groupname@ in
-- 'zrePeerGroups'. Silently does nothing when the group is unknown.
msgGroup :: TVar ZREState -> Group -> ZRECmd -> STM ()
msgGroup s groupname msg = do
  st <- readTVar s
  -- XXX: should report no such group error?
  forM_ (M.lookup groupname $ zrePeerGroups st) $ mapM_ (`msgPeer` msg)

-- | Send a single-frame @Shout@ carrying @msg@ to all peers of @group@
-- (see 'msgGroup').
shoutGroup :: TVar ZREState -> Group -> ByteString -> STM ()
shoutGroup s group msg = msgGroup s group $ Shout group [msg]

-- | Send a @Shout@ carrying the multi-frame content @mmsg@ to all peers of
-- @group@ (see 'msgGroup').
shoutGroupMulti :: TVar ZREState -> Group -> Content -> STM ()
shoutGroupMulti s group mmsg = msgGroup s group $ Shout group mmsg

-- | Queue a @Join@ command for @group@ with sequence number @sq@ for every
-- known peer (see 'msgAll').
msgAllJoin :: TVar ZREState -> Group -> GroupSeq -> STM ()
msgAllJoin s group sq = msgAll s $ Join group sq

-- | Queue a @Leave@ command for @group@ with sequence number @sq@ for every
-- known peer (see 'msgAll').
msgAllLeave :: TVar ZREState -> Group -> GroupSeq -> STM ()
msgAllLeave s group sq = msgAll s $ Leave group sq

-- | Send a single-frame @Whisper@ carrying @msg@ to the peer registered
-- under @uuid@ (see 'msgPeerUUID'); does nothing when the peer is unknown.
whisperPeerUUID :: TVar ZREState -> UUID -> ByteString -> STM ()
whisperPeerUUID s uuid msg = msgPeerUUID s uuid $ Whisper [msg]

-- | Print every peer in the map to stdout, one 'printPeer' line each, in
-- the map's key order. Each peer is read in its own transaction.
printPeers :: M.Map k (TVar Peer) -> IO ()
printPeers = mapM_ $ \pt -> readTVarIO pt >>= B.putStrLn . printPeer

-- | Print a @group NAME ->@ header line followed by the group's peers
-- (see 'printPeers').
printGroup :: (Group, M.Map k (TVar Peer)) -> IO ()
printGroup (g, v) = do
  B.putStrLn $ B.intercalate " " ["group", unGroup g, "->"]
  printPeers v

-- | Print all known peers, followed by every group together with its peers,
-- to stdout. Works from a single snapshot of the state; the individual
-- peers are read afterwards, each in its own transaction.
printAll :: TVar ZREState -> IO ()
printAll s = do
  st <- readTVarIO s
  printPeers $ zrePeers st
  mapM_ printGroup . M.toList $ zrePeerGroups st