module Control.Distributed.Process.Registry
(
KeyType(..)
, Key(..)
, Keyable
, Registry(..)
, start
, run
, addName
, addProperty
, registerName
, registerValue
, giveAwayName
, RegisterKeyReply(..)
, unregisterName
, UnregisterKeyReply(..)
, lookupName
, lookupProperty
, registeredNames
, foldNames
, SearchHandle()
, member
, queryNames
, findByProperty
, findByPropertyValue
, monitor
, monitorName
, monitorProp
, unmonitor
, await
, awaitTimeout
, AwaitResult(..)
, KeyUpdateEventMask(..)
, KeyUpdateEvent(..)
, RegKeyMonitorRef
, RegistryKeyMonitorNotification(RegistryKeyMonitorNotification)
) where
import Control.Distributed.Process hiding (call, monitor, unmonitor, mask)
import qualified Control.Distributed.Process.UnsafePrimitives as Unsafe (send)
import qualified Control.Distributed.Process as P (monitor)
import Control.Distributed.Process.Serializable
import Control.Distributed.Process.Extras hiding (monitor, wrapMessage)
import qualified Control.Distributed.Process.Extras as PL
( monitor
)
import Control.Distributed.Process.ManagedProcess
( call
, cast
, handleInfo
, reply
, continue
, input
, defaultProcess
, prioritised
, InitResult(..)
, ProcessAction
, ProcessReply
, ProcessDefinition(..)
, PrioritisedProcessDefinition(..)
, DispatchPriority
, CallRef
)
import qualified Control.Distributed.Process.ManagedProcess as MP
( pserve
)
import Control.Distributed.Process.ManagedProcess.Server
( handleCallIf
, handleCallFrom
, handleCallFromIf
, handleCast
)
import Control.Distributed.Process.ManagedProcess.Server.Priority
( prioritiseInfo_
, setPriority
)
import Control.Distributed.Process.ManagedProcess.Server.Restricted
( RestrictedProcess
, Result
, getState
)
import qualified Control.Distributed.Process.ManagedProcess.Server.Restricted as Restricted
( handleCall
, reply
)
import Control.Distributed.Process.Extras.Time
import Control.Monad (forM_, void)
import Data.Accessor
( Accessor
, accessor
, (^:)
, (^=)
, (^.)
)
import Data.Binary
import Data.Foldable (Foldable)
import qualified Data.Foldable as Foldable
import Data.Maybe (fromJust, isJust)
import Data.Hashable
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as Map
import Control.Distributed.Process.Extras.Internal.Containers.MultiMap (MultiMap)
import qualified Control.Distributed.Process.Extras.Internal.Containers.MultiMap as MultiMap
import Data.HashSet (HashSet)
import qualified Data.HashSet as Set
import Data.Typeable (Typeable)
import GHC.Generics
-- | Describes how a key is used by the registry.
--
-- The 'Binary' and 'Hashable' instances are derived generically, so the
-- constructor order is part of the encoding.
data KeyType =
    KeyTypeAlias    -- ^ the key names a process (used by 'registerName')
  | KeyTypeProperty -- ^ the key refers to a per-process property (used by 'addProperty')
  deriving (Typeable, Generic, Show, Eq)
instance Binary KeyType where
instance Hashable KeyType where
-- | A registry key: a user supplied identity, tagged with its 'KeyType' and
-- an optional scope.
data Key a =
    Key
    { keyIdentity :: !a                 -- ^ the user supplied key value
    , keyType     :: !KeyType           -- ^ whether this is a name or a property
    , keyScope    :: !(Maybe ProcessId) -- ^ the process the key relates to, if any;
                                        --   requests in this module fill it in with
                                        --   an owner or the calling process
    }
  deriving (Typeable, Generic, Show, Eq)
instance (Serializable a) => Binary (Key a) where
instance (Hashable a) => Hashable (Key a) where
-- | Constraint synonym for types that may be used as registry keys: they must
-- be showable, comparable, hashable and serializable.
class (Show a, Eq a, Hashable a, Serializable a) => Keyable a
-- Blanket instance: every type satisfying the superclasses is 'Keyable'.
instance (Show a, Eq a, Hashable a, Serializable a) => Keyable a
-- | A phantom typed handle to a registry server. The type parameters record
-- the key and property value types; the only data held is the server's
-- 'ProcessId'.
data Registry k v = Registry { registryPid :: ProcessId }
  deriving (Typeable, Generic, Show, Eq)
instance (Keyable k, Serializable v) => Binary (Registry k v) where

-- | A registry handle always resolves to the pid of its server.
instance Resolvable (Registry k v) where
  resolve registry = return (Just (registryPid registry))

-- | Linking to a registry handle links to the server process.
instance Linkable (Registry k v) where
  linkTo registry = link (registryPid registry)
-- | Request sent by 'lookupName' to find the process registered under a key.
data LookupKeyReq k = LookupKeyReq !(Key k)
  deriving (Typeable, Generic)
instance (Serializable k) => Binary (LookupKeyReq k) where
-- | Request sent by 'lookupProperty' to fetch the value of a scoped property.
data LookupPropReq k = PropReq (Key k)
  deriving (Typeable, Generic)
instance (Serializable k) => Binary (LookupPropReq k) where
-- | Reply to a 'LookupPropReq'.
data LookupPropReply =
    PropFound !Message -- ^ the property value, wrapped as an untyped 'Message'
  | PropNotFound       -- ^ no property is stored under the requested key
  deriving (Typeable, Generic)
instance Binary LookupPropReply where
-- | Exit reason used by 'lookupProperty' when a property value was found but
-- could not be unwrapped at the type the caller expects.
data InvalidPropertyType = InvalidPropertyType
  deriving (Typeable, Generic, Show, Eq)
instance Binary InvalidPropertyType where
-- | Request sent by 'registeredNames' asking for all names registered to the
-- given process.
data RegNamesReq = RegNamesReq !ProcessId
  deriving (Typeable, Generic)
instance Binary RegNamesReq where
-- | Request sent by 'unregisterName' to remove a name from the registry.
data UnregisterKeyReq k = UnregisterKeyReq !(Key k)
  deriving (Typeable, Generic)
instance (Serializable k) => Binary (UnregisterKeyReq k) where
-- | The result of an un-registration attempt.
data UnregisterKeyReply =
    UnregisterOk          -- ^ the name was removed
  | UnregisterInvalidKey  -- ^ the name is registered to a different process
  | UnregisterKeyNotFound -- ^ the name is not registered at all
  deriving (Typeable, Generic, Eq, Show)
instance Binary UnregisterKeyReply where
-- | The kinds of key event a monitor may subscribe to. A monitor's mask is an
-- optional list of these values (see 'monitor').
data KeyUpdateEventMask =
    OnKeyRegistered      -- ^ matches 'KeyRegistered'
  | OnKeyUnregistered    -- ^ matches 'KeyUnregistered'
  | OnKeyOwnershipChange -- ^ matches 'KeyOwnerDied' and 'KeyOwnerChanged'
  | OnKeyLeaseExpiry     -- ^ matches 'KeyLeaseExpired' (no code visible in this
                         --   module emits that event)
  deriving (Typeable, Generic, Eq, Show)
instance Binary KeyUpdateEventMask where
instance Hashable KeyUpdateEventMask where
-- | An opaque reference identifying one registry key monitor. It pairs the
-- pid of the monitoring process with a server allocated counter value.
newtype RegKeyMonitorRef =
  RegKeyMonitorRef { unRef :: (ProcessId, Integer) }
  deriving (Typeable, Generic, Eq, Show)
instance Binary RegKeyMonitorRef where
instance Hashable RegKeyMonitorRef where

-- | A monitor reference resolves to the process that created the monitor.
instance Resolvable RegKeyMonitorRef where
  resolve mRef = return $ Just (fst (unRef mRef))
-- | An event concerning a monitored key, delivered inside a
-- 'RegistryKeyMonitorNotification'.
data KeyUpdateEvent =
    KeyRegistered
    {
      owner :: !ProcessId -- ^ the process the key is now registered to
    }
  | KeyUnregistered       -- ^ the key was removed from the registry
  | KeyLeaseExpired       -- ^ not emitted by any code visible in this module
  | KeyOwnerDied
    {
      diedReason :: !DiedReason -- ^ why the owning process died
    }
  | KeyOwnerChanged
    {
      previousOwner :: !ProcessId -- ^ the process that gave the key away
    , newOwner      :: !ProcessId -- ^ the process that now holds the key
    }
  deriving (Typeable, Generic, Eq, Show)
instance Binary KeyUpdateEvent where
-- | The message delivered to a monitoring process when an event occurs on a
-- key it monitors. The fields are, in order: the key identity, the reference
-- of the monitor that fired, the event, and the pid of the registry server
-- that sent the notification.
data RegistryKeyMonitorNotification k =
  RegistryKeyMonitorNotification !k !RegKeyMonitorRef !KeyUpdateEvent !ProcessId
  deriving (Typeable, Generic)
instance (Keyable k) => Binary (RegistryKeyMonitorNotification k) where
deriving instance (Keyable k) => Eq (RegistryKeyMonitorNotification k)
deriving instance (Keyable k) => Show (RegistryKeyMonitorNotification k)
-- | Request to register a key. It is sent on its own for names and paired
-- with a value for properties (see 'registerName' and 'addProperty').
data RegisterKeyReq k = RegisterKeyReq !(Key k)
  deriving (Typeable, Generic)
instance (Serializable k) => Binary (RegisterKeyReq k) where
-- | The result of a registration attempt.
data RegisterKeyReply =
    RegisteredOk      -- ^ the key was registered (or already belonged to the requested owner)
  | AlreadyRegistered -- ^ the name is held by a different process
  deriving (Typeable, Generic, Eq, Show)
instance Binary RegisterKeyReply where
-- | Cast sent by 'giveAwayName': the pid of the intended new owner together
-- with the key being handed over (scoped to the current owner).
data GiveAwayName k = GiveAwayName !ProcessId !(Key k)
  deriving (Typeable, Generic)
instance (Keyable k) => Binary (GiveAwayName k) where
deriving instance (Keyable k) => Eq (GiveAwayName k)
deriving instance (Keyable k) => Show (GiveAwayName k)
-- | Request sent by 'monitor': the key to watch and an optional list of event
-- kinds the caller is interested in.
data MonitorReq k = MonitorReq !(Key k) !(Maybe [KeyUpdateEventMask])
  deriving (Typeable, Generic)
instance (Keyable k) => Binary (MonitorReq k) where
-- | Request sent by 'unmonitor' to cancel the monitor with the given reference.
data UnmonitorReq = UnmonitorReq !RegKeyMonitorRef
  deriving (Typeable, Generic)
instance Binary UnmonitorReq where
-- | The result of an 'await' or 'awaitTimeout' operation.
data AwaitResult k =
    RegisteredName !ProcessId !k  -- ^ the name was registered to the given process
  | ServerUnreachable !DiedReason -- ^ the registry server died while we were waiting
  | AwaitTimeout                  -- ^ the timeout elapsed before the name was registered
  deriving (Typeable, Generic, Eq, Show)
instance (Keyable k) => Binary (AwaitResult k) where
-- | Server side record of a single key monitor: the reference handed back to
-- the monitoring process plus the (optional) event mask it asked for.
data KMRef = KMRef { ref  :: !RegKeyMonitorRef
                   , mask :: !(Maybe [KeyUpdateEventMask])
                   }
  deriving (Typeable, Generic, Show)

-- | Hashing considers only the monitor reference, so that it agrees with the
-- 'Eq' instance: values that compare equal must hash to the same value, which
-- a generically derived instance (hashing the mask as well) does not guarantee.
instance Hashable KMRef where
  hashWithSalt salt = hashWithSalt salt . ref

-- | Two monitor records are equal when they carry the same reference; the
-- event mask is deliberately ignored.
instance Eq KMRef where
  (KMRef a _) == (KMRef b _) = a == b
-- | The registry server's internal state.
data State k v =
  State
  {
    _names          :: !(HashMap k ProcessId)      -- ^ registered names and the process owning each
  , _properties     :: !(HashMap (ProcessId, k) v) -- ^ property values, keyed by owning process and key
  , _monitors       :: !(MultiMap k KMRef)         -- ^ active key monitors, grouped by key identity
  , _registeredPids :: !(HashSet ProcessId)        -- ^ name owners the server has already monitored
  , _listeningPids  :: !(HashSet ProcessId)        -- ^ monitoring processes the server has already monitored
  , _monitorIdCount :: !Integer                    -- ^ counter used to build 'RegKeyMonitorRef' values
  }
  deriving (Typeable, Generic)
-- | Selects which view of the server state a direct query should return:
-- the name table, the property keys per process, or the property
-- key/value pairs per process (see 'handleQuery').
data QueryDirect = QueryDirectNames | QueryDirectProperties | QueryDirectValues
  deriving (Typeable, Generic)
instance Binary QueryDirect where
-- | Wrapper used to hand a 'HashMap' snapshot of server state to a querying
-- process. The server sends it with the unsafe @send@ primitive (see
-- 'handleQuery').
data SHashMap k v = SHashMap [(k, v)] (HashMap k v)
  deriving (Typeable, Generic)

-- | A shim instance that exists only to satisfy the 'Serializable'
-- constraint: both methods raise an error if they are ever evaluated.
-- NOTE(review): presumably this relies on the unsafe send never serialising
-- the payload for a local recipient - confirm before querying across nodes.
instance (Keyable k, Serializable v) =>
         Binary (SHashMap k v) where
  put = error "AttemptedToUseBinaryShim"
  get = error "AttemptedToUseBinaryShim"
-- | An opaque, read-only view over a snapshot of registry data, handed to the
-- callback given to 'queryNames'.
newtype SearchHandle k v = RS { getRS :: HashMap k v }
  deriving (Typeable)

-- | Maps over the values held in the snapshot.
instance (Keyable k) => Functor (SearchHandle k) where
  fmap f = RS . Map.map f . getRS

-- | Folds over the values held in the snapshot.
instance (Keyable k) => Foldable (SearchHandle k) where
  foldr step seed (RS m) = Foldable.foldr step seed m
-- | Spawns a registry server on the local node and returns a handle to it.
start :: forall k v. (Keyable k, Serializable v)
      => Process (Registry k v)
start = do
  -- 'run' ignores its argument, which exists only to fix the key and value
  -- types, so passing 'undefined' is safe here.
  pid <- spawnLocal $ run (undefined :: Registry k v)
  return (Registry pid)
-- | Runs the registry server loop in the calling process. The argument is
-- never inspected; it only determines the key and value types of the server.
run :: forall k v. (Keyable k, Serializable v)
    => Registry k v
    -> Process ()
run _ = MP.pserve () initialise serverDefinition
  where
    -- start from an empty state, with no idle timeout
    initialise _ = return $ InitOk emptyState Infinity

    emptyState :: State k v
    emptyState =
      State { _names          = Map.empty
            , _properties     = Map.empty
            , _monitors       = MultiMap.empty
            , _registeredPids = Set.empty
            , _listeningPids  = Set.empty
            , _monitorIdCount = (1 :: Integer)
            }
-- | Registers the calling process under the given name.
addName :: forall k v. (Keyable k)
        => Registry k v
        -> k
        -> Process RegisterKeyReply
addName s n = do
  self <- getSelfPid
  registerName s n self
-- | Asks the registry to transfer a name owned by the calling process to
-- another process. This is an asynchronous cast, so no result is reported.
giveAwayName :: forall k v . (Keyable k)
             => Registry k v
             -> k
             -> ProcessId
             -> Process ()
giveAwayName s n p =
    getSelfPid >>= \us -> cast s (GiveAwayName p (ownedBy us))
  where
    -- the key is scoped to the caller, who must be the current owner
    ownedBy pid = Key { keyIdentity = n
                      , keyType     = KeyTypeAlias
                      , keyScope    = Just pid
                      }
-- | Registers a property value under the given key. The request carries no
-- scope, so the server attributes the property to the calling process.
addProperty :: (Keyable k, Serializable v)
            => Registry k v -> k -> v -> Process RegisterKeyReply
addProperty s k v = call s (RegisterKeyReq unscoped, v)
  where
    unscoped = Key { keyIdentity = k
                   , keyType     = KeyTypeProperty
                   , keyScope    = Nothing
                   }
-- | Registers the given process under the given name.
registerName :: forall k v . (Keyable k)
             => Registry k v -> k -> ProcessId -> Process RegisterKeyReply
registerName s n p = call s (RegisterKeyReq alias)
  where
    alias = Key { keyIdentity = n
                , keyType     = KeyTypeAlias
                , keyScope    = Just p
                }
-- | Registers a property value on behalf of another (resolvable) process.
-- The pattern bind fails in the 'Process' monad if the target cannot be
-- resolved to a pid.
registerValue :: (Resolvable b, Keyable k, Serializable v)
              => Registry k v -> b -> k -> v -> Process RegisterKeyReply
registerValue s t n v = do
    Just p <- resolve t
    call s (RegisterKeyReq (propertyOf p), v)
  where
    propertyOf pid = Key { keyIdentity = n
                         , keyType     = KeyTypeProperty
                         , keyScope    = Just pid
                         }
-- | Removes a name registered to the calling process.
unregisterName :: forall k v . (Keyable k)
               => Registry k v
               -> k
               -> Process UnregisterKeyReply
unregisterName s n =
    getSelfPid >>= call s . UnregisterKeyReq . ownedBy
  where
    ownedBy pid = Key { keyIdentity = n
                      , keyType     = KeyTypeAlias
                      , keyScope    = Just pid
                      }
-- | Looks up the process registered under the given name, if any.
lookupName :: forall k v . (Keyable k)
           => Registry k v
           -> k
           -> Process (Maybe ProcessId)
lookupName s n = call s (LookupKeyReq unscopedAlias)
  where
    unscopedAlias = Key { keyIdentity = n
                        , keyType     = KeyTypeAlias
                        , keyScope    = Nothing
                        }
-- | Looks up the value of a property registered by the calling process.
--
-- Returns 'Nothing' when no such property exists. If a value is found but
-- cannot be unwrapped at the expected type, the caller dies with
-- 'InvalidPropertyType'.
lookupProperty :: (Keyable k, Serializable v)
               => Registry k v
               -> k
               -> Process (Maybe v)
lookupProperty s n = do
  us  <- getSelfPid
  res <- call s $ PropReq (Key n KeyTypeProperty (Just us))
  case res of
    PropNotFound  -> return Nothing
    PropFound msg ->
      unwrapMessage msg >>= maybe (die InvalidPropertyType) (return . Just)
-- | Lists every name currently registered to the given process.
registeredNames :: forall k v . (Keyable k)
                => Registry k v
                -> ProcessId
                -> Process [k]
registeredNames s = call s . RegNamesReq
-- | Monitors changes to the given name, without supplying an event mask.
monitorName :: forall k v. (Keyable k)
            => Registry k v -> k -> Process RegKeyMonitorRef
monitorName svr name = monitor svr (Key name KeyTypeAlias Nothing) Nothing
-- | Monitors changes to a property belonging to the given process, without
-- supplying an event mask.
monitorProp :: forall k v. (Keyable k)
            => Registry k v -> k -> ProcessId -> Process RegKeyMonitorRef
monitorProp svr key pid =
  monitor svr (Key key KeyTypeProperty (Just pid)) Nothing
-- | Low level monitor operation: asks the registry to notify the caller about
-- events on the given key, optionally restricted by an event mask. The
-- returned reference identifies the monitor in subsequent notifications.
monitor :: forall k v. (Keyable k)
        => Registry k v
        -> Key k
        -> Maybe [KeyUpdateEventMask]
        -> Process RegKeyMonitorRef
monitor svr key' mask' = call svr request
  where
    request = MonitorReq key' mask'
-- | Cancels the registry key monitor identified by the given reference.
unmonitor :: forall k v. (Keyable k)
          => Registry k v
          -> RegKeyMonitorRef
          -> Process ()
unmonitor s mRef = call s (UnmonitorReq mRef)
-- | Blocks until the given name is registered or the registry server dies.
-- Equivalent to 'awaitTimeout' with an infinite delay.
await :: forall k v. (Keyable k)
      => Registry k v
      -> k
      -> Process (AwaitResult k)
await a = awaitTimeout a Infinity
-- | Waits, for at most the given delay, until the given name is registered.
--
-- The caller monitors both the registry server process and the name (for
-- 'OnKeyRegistered' events only) and then waits for whichever notification
-- arrives first. Dies with @"InvalidAddressable"@ if the registry handle
-- cannot be resolved.
--
-- NOTE(review): neither the process monitor nor the registry key monitor is
-- removed before returning, so both outlive this call.
awaitTimeout :: forall k v. (Keyable k)
             => Registry k v
             -> Delay
             -> k
             -> Process (AwaitResult k)
awaitTimeout a d k = do
    p         <- resolve a >>= maybe (die "InvalidAddressable") return
    Just mRef <- PL.monitor p
    kRef      <- monitor a (Key k KeyTypeAlias Nothing) (Just [OnKeyRegistered])
    outcome   <- waitFor d [ matchIf (registeredUnder kRef) announce
                           , matchIf (serverDown mRef) unreachable
                           ]
    return $ maybe AwaitTimeout id outcome
  where
    -- an infinite delay blocks; anything else is a bounded receive
    waitFor Infinity  ms = fmap Just (receiveWait ms)
    waitFor (Delay t) ms = receiveTimeout (asTimeout t) ms
    waitFor NoDelay   ms = receiveTimeout 0 ms

    -- only a registration event, for our monitor and our key, is of interest
    registeredUnder kr (RegistryKeyMonitorNotification mk kr' ev _) =
      isRegistration ev && kr' == kr && mk == k

    announce (RegistryKeyMonitorNotification _ _ (KeyRegistered pid) _) =
      return $ RegisteredName pid k

    serverDown mr (ProcessMonitorNotification mr' _ _) = mr' == mr

    unreachable (ProcessMonitorNotification _ _ dr) =
      return $ ServerUnreachable dr

    isRegistration (KeyRegistered _) = True
    isRegistration _                 = False
-- | Finds every process that has registered a property under the given key.
--
-- The server is monitored for the duration of the query; if it dies before
-- answering, the caller dies with @"DisconnectedFromServer"@.
--
-- Only the server's reply and the server's monitor notification are
-- received. Unrelated messages are left in the caller's mailbox rather than
-- being consumed and treated as a disconnection.
findByProperty :: forall k v. (Keyable k)
               => Registry k v
               -> k
               -> Process [ProcessId]
findByProperty r key = do
    let pid = registryPid r
    self <- getSelfPid
    withMonitor pid $ do
      cast r $ (self, QueryDirectProperties)
      answer <- receiveWait [
          match (\(SHashMap _ m :: SHashMap ProcessId [k]) -> return $ Just m)
        , matchIf (\(ProcessMonitorNotification _ p _) -> p == pid)
                  (\_ -> return Nothing)
        ]
      case answer of
        Nothing -> die "DisconnectedFromServer"
        Just m  -> return $ Map.foldlWithKey' matchKey [] m
  where
    -- keep the owner when the requested key is among its property keys
    matchKey ps p ks
      | key `elem` ks = p:ps
      | otherwise     = ps
-- | Finds every process that has registered the given value under the given
-- property key.
--
-- The server is monitored for the duration of the query; if it dies before
-- answering, the caller dies with @"DisconnectedFromServer"@.
--
-- Only the server's reply and the server's monitor notification are
-- received. Unrelated messages are left in the caller's mailbox rather than
-- being consumed and treated as a disconnection.
findByPropertyValue :: (Keyable k, Serializable v, Eq v)
                    => Registry k v
                    -> k
                    -> v
                    -> Process [ProcessId]
findByPropertyValue r key val = do
    let pid = registryPid r
    self <- getSelfPid
    withMonitor pid $ do
      cast r $ (self, QueryDirectValues)
      answer <- receiveWait [
          match (\(SHashMap _ m :: SHashMap ProcessId [(k, v)]) -> return $ Just m)
        , matchIf (\(ProcessMonitorNotification _ p _) -> p == pid)
                  (\_ -> return Nothing)
        ]
      case answer of
        Nothing -> die "DisconnectedFromServer"
        Just m  -> return $ Map.foldlWithKey' matchKey [] m
  where
    -- keep the owner when it holds exactly this key/value pair
    matchKey ps p ks
      | (key, val) `elem` ks = p:ps
      | otherwise            = ps
-- | Monadic left fold over a snapshot of all registered names, executed in
-- the calling process. The snapshot is requested with a cast and the caller
-- then blocks until the reply arrives.
foldNames :: forall b k v . Keyable k
          => Registry k v
          -> b
          -> (b -> (k, ProcessId) -> Process b)
          -> Process b
foldNames pid acc fn = do
    getSelfPid >>= \self -> cast pid (self, QueryDirectNames)
    snapshot <- expect
    Foldable.foldlM fn acc (entries snapshot)
  where
    entries :: SHashMap k ProcessId -> [(k, ProcessId)]
    entries (SHashMap _ m) = Map.toList m
-- | Runs a query, in the calling process, over a snapshot of all registered
-- names. The snapshot is requested with a cast and the caller then blocks
-- until the reply arrives.
queryNames :: forall b k v . Keyable k
           => Registry k v
           -> (SearchHandle k ProcessId -> Process b)
           -> Process b
queryNames pid fn = do
    getSelfPid >>= \self -> cast pid (self, QueryDirectNames)
    snapshot <- expect
    fn (toHandle snapshot)
  where
    toHandle :: SHashMap k ProcessId -> SearchHandle k ProcessId
    toHandle (SHashMap _ m) = RS m
-- | Tests whether the given key is present in a search handle.
member :: (Keyable k, Serializable v)
       => k
       -> SearchHandle k v
       -> Bool
member k (RS entries) = Map.member k entries
-- | The prioritised server definition: the plain 'processDefinition' plus a
-- rule giving process monitor notifications a priority of 100.
serverDefinition :: forall k v. (Keyable k, Serializable v)
                 => PrioritisedProcessDefinition (State k v)
serverDefinition = processDefinition `prioritised` regPriorities
  where
    regPriorities :: [DispatchPriority (State k v)]
    regPriorities = [ prioritiseInfo_ monitorSignalPriority ]

    monitorSignalPriority (ProcessMonitorNotification _ _ _) = setPriority 100
-- | The registry's message dispatch table. Each guarded handler inspects the
-- 'Key' carried by the request to decide whether it applies, so a request
-- whose key type or scope does not satisfy any guard is not handled here.
processDefinition :: forall k v. (Keyable k, Serializable v)
                  => ProcessDefinition (State k v)
processDefinition =
  defaultProcess
  {
    apiHandlers =
       [
         -- name registration: requires an explicit owner in the key scope
         handleCallIf
              (input ((\(RegisterKeyReq (Key{..} :: Key k)) ->
                        keyType == KeyTypeAlias && (isJust keyScope))))
              handleRegisterName
         -- property registration on behalf of an explicitly given process
       , handleCallIf
              (input ((\((RegisterKeyReq (Key{..} :: Key k)), _ :: v) ->
                        keyType == KeyTypeProperty && (isJust keyScope))))
              handleRegisterProperty
         -- property registration without a scope: the caller becomes the owner
       , handleCallFromIf
              (input ((\((RegisterKeyReq (Key{..} :: Key k)), _ :: v) ->
                        keyType == KeyTypeProperty && (not $ isJust keyScope))))
              handleRegisterPropertyCR
       , handleCast handleGiveAwayName
         -- name lookup: the key scope is not consulted
       , handleCallIf
              (input ((\(LookupKeyReq (Key{..} :: Key k)) ->
                        keyType == KeyTypeAlias)))
              (\state (LookupKeyReq key') -> reply (findName key' state) state)
         -- property lookup: requires the owning process in the key scope
       , handleCallIf
              (input ((\(PropReq (Key{..} :: Key k)) ->
                        keyType == KeyTypeProperty && (isJust keyScope))))
              handleLookupProperty
         -- name removal: requires the (claimed) owner in the key scope
       , handleCallIf
              (input ((\(UnregisterKeyReq (Key{..} :: Key k)) ->
                        keyType == KeyTypeAlias && (isJust keyScope))))
              handleUnregisterName
       , handleCallFrom handleMonitorReq
       , handleCallFrom handleUnmonitorReq
       , Restricted.handleCall handleRegNamesLookup
       , handleCast handleQuery
       ]
    -- process monitor notifications drive cleanup of dead owners and listeners
  , infoHandlers = [handleInfo handleMonitorSignal]
  } :: ProcessDefinition (State k v)
-- | Answers a direct query by sending a snapshot of the requested view of the
-- server state straight to the querying process. The snapshot is wrapped in
-- an 'SHashMap' and sent with the unsafe send primitive.
handleQuery :: forall k v. (Keyable k, Serializable v)
            => State k v
            -> (ProcessId, QueryDirect)
            -> Process (ProcessAction (State k v))
handleQuery st (pid, qd) = respond qd >> continue st
  where
    respond QueryDirectNames      = Unsafe.send pid (SHashMap [] (st ^. names))
    respond QueryDirectProperties = Unsafe.send pid (SHashMap [] keysByOwner)
    respond QueryDirectValues     = Unsafe.send pid (SHashMap [] entriesByOwner)

    -- regroup the (owner, key) indexed property table by owner
    keysByOwner    = Map.foldlWithKey' addKey   Map.empty (st ^. properties)
    entriesByOwner = Map.foldlWithKey' addEntry Map.empty (st ^. properties)

    -- insertWith applies (++) to the new singleton first, i.e. it prepends
    addKey   acc (p, k) _ = Map.insertWith (++) p [k] acc
    addEntry acc (p, k) v = Map.insertWith (++) p [(k, v)] acc
-- | Registers a name for the process given in the key's scope.
--
-- An unregistered name is claimed: the owner is monitored (once), the name's
-- subscribers are told and 'RegisteredOk' is returned. Re-registering a name
-- for its current owner is a no-op that also yields 'RegisteredOk'. A name
-- held by another process yields 'AlreadyRegistered'.
handleRegisterName :: forall k v. (Keyable k, Serializable v)
                   => State k v
                   -> RegisterKeyReq k
                   -> Process (ProcessReply RegisterKeyReply (State k v))
handleRegisterName state (RegisterKeyReq Key{..}) =
    case Map.lookup keyIdentity (state ^. names) of
      Just holder
        | holder == claimant -> reply RegisteredOk state
        | otherwise          -> reply AlreadyRegistered state
      Nothing -> do
        tracked <- ensureMonitored claimant (state ^. registeredPids)
        notifySubscribers keyIdentity state (KeyRegistered claimant)
        reply RegisteredOk (claim tracked state)
  where
    -- the dispatch guard only admits requests whose scope is set
    claimant = fromJust keyScope

    claim tracked st = ( (names ^: Map.insert keyIdentity claimant)
                       . (registeredPids ^= tracked)
                       $ st )
-- | Registers a property whose key carries no scope: the owner is taken to be
-- the process that made the call, resolved from the call reference.
handleRegisterPropertyCR :: forall k v. (Keyable k, Serializable v)
                         => State k v
                         -> CallRef (RegisterKeyReply)
                         -> (RegisterKeyReq k, v)
                         -> Process (ProcessReply RegisterKeyReply (State k v))
handleRegisterPropertyCR st cr req =
  resolve cr >>= \caller -> doRegisterProperty (fromJust caller) st req
-- | Registers a property for the process named in the key's scope. The
-- dispatch guard only routes requests here when that scope is set.
handleRegisterProperty :: forall k v. (Keyable k, Serializable v)
                       => State k v
                       -> (RegisterKeyReq k, v)
                       -> Process (ProcessReply RegisterKeyReply (State k v))
handleRegisterProperty state req@(RegisterKeyReq key, _) =
  doRegisterProperty (fromJust (keyScope key)) state req
-- | Stores a property value for the given owning process, notifies the key's
-- subscribers with 'KeyRegistered' and replies 'RegisteredOk'. An existing
-- value under the same (owner, key) pair is overwritten.
--
-- The owner is monitored through 'ensureMonitored', as name registration
-- does, so a process registering many properties is monitored only once
-- rather than once per registration.
doRegisterProperty :: forall k v. (Keyable k, Serializable v)
                       => ProcessId
                       -> State k v
                       -> (RegisterKeyReq k, v)
                       -> Process (ProcessReply RegisterKeyReply (State k v))
doRegisterProperty scope state ((RegisterKeyReq Key{..}), v) = do
  refs' <- ensureMonitored scope (state ^. registeredPids)
  notifySubscribers keyIdentity state (KeyRegistered scope)
  reply RegisteredOk $ ( (properties ^: Map.insert (scope, keyIdentity) v)
                       . (registeredPids ^= refs')
                       $ state )
-- | Looks up the property stored for the (scope, key) pair in the request and
-- replies with the wrapped value, or 'PropNotFound' when there is none.
handleLookupProperty :: forall k v. (Keyable k, Serializable v)
                     => State k v
                     -> LookupPropReq k
                     -> Process (ProcessReply LookupPropReply (State k v))
handleLookupProperty state (PropReq Key{..}) =
    maybe (reply PropNotFound state) found
          (Map.lookup scopedKey (state ^. properties))
  where
    -- the dispatch guard only admits requests whose scope is set
    scopedKey = (fromJust keyScope, keyIdentity)
    found p   = reply (PropFound (wrapMessage p)) state
-- | Removes a name, provided the process in the key's scope is its owner.
--
-- On success the name's subscribers receive 'KeyUnregistered', after which
-- the name and every monitor registered for it are dropped from the state.
handleUnregisterName :: forall k v. (Keyable k, Serializable v)
                     => State k v
                     -> UnregisterKeyReq k
                     -> Process (ProcessReply UnregisterKeyReply (State k v))
handleUnregisterName state (UnregisterKeyReq Key{..}) =
    case Map.lookup keyIdentity (state ^. names) of
      Nothing -> reply UnregisterKeyNotFound state
      Just holder
        | holder /= fromJust keyScope -> reply UnregisterInvalidKey state
        | otherwise -> do
            notifySubscribers keyIdentity state KeyUnregistered
            reply UnregisterOk (forget state)
  where
    forget st = ( (names ^: Map.delete keyIdentity)
                . (monitors ^: MultiMap.filterWithKey (\k' _ -> k' /= keyIdentity))
                $ st )
-- | Transfers ownership of a name to another process.
--
-- The request is ignored unless the name is registered and the process in
-- the key's scope is its current owner. A request carrying no scope is
-- ignored as well, rather than crashing the server.
--
-- The new owner is monitored (via 'ensureMonitored') before the transfer, so
-- that the name is cleaned up if that process later dies. Subscribers of the
-- name receive 'KeyOwnerChanged'.
handleGiveAwayName :: forall k v. (Keyable k, Serializable v)
                   => State k v
                   -> GiveAwayName k
                   -> Process (ProcessAction (State k v))
handleGiveAwayName state (GiveAwayName newPid Key{..}) =
    case Map.lookup keyIdentity (state ^. names) of
      Just pid | keyScope == Just pid -> giveAway pid
      _                               -> continue state
  where
    giveAway pid = do
      refs' <- ensureMonitored newPid (state ^. registeredPids)
      notifySubscribers keyIdentity state (KeyOwnerChanged pid newPid)
      continue $ ( (names ^: Map.insert keyIdentity newPid)
                 . (registeredPids ^= refs')
                 $ state )
-- | Installs a key monitor for the calling process and replies with its
-- reference.
--
-- The reference pairs the caller's pid with the incremented monitor counter.
-- The caller is itself monitored (once), and if the key is already present
-- an initial 'KeyRegistered' notification may be sent before the reply.
handleMonitorReq :: forall k v. (Keyable k, Serializable v)
                 => State k v
                 -> CallRef RegKeyMonitorRef
                 -> MonitorReq k
                 -> Process (ProcessReply RegKeyMonitorRef (State k v))
handleMonitorReq state cRef (MonitorReq Key{..} mask') = do
    let mRefId = (state ^. monitorIdCount) + 1
    Just caller <- resolve cRef
    let mRef  = RegKeyMonitorRef (caller, mRefId)
    let kmRef = KMRef mRef mask'
    let refs = state ^. listeningPids
    refs' <- ensureMonitored caller refs
    fireEventForPreRegisteredKey state keyIdentity keyScope kmRef
    reply mRef $ ( (monitors ^: MultiMap.insert keyIdentity kmRef)
                 . (listeningPids ^= refs')
                 . (monitorIdCount ^= mRefId)
                 $ state
                 )
  where
    -- Tell a new subscriber about a key that is already registered.
    fireEventForPreRegisteredKey st kId kScope KMRef{..} = do
      -- NOTE(review): an absent mask becomes an empty list here, so a name
      -- monitor created without a mask gets no initial event, although
      -- notifySubscribers treats an absent mask as "all events" - confirm
      -- this is intended.
      let evMask = maybe [] id mask
      case (keyType, elem OnKeyRegistered evMask) of
        (KeyTypeAlias, True) -> do
          let found = Map.lookup kId (st ^. names)
          fireEvent found kId ref
        -- properties fire regardless of the mask
        (KeyTypeProperty, _) -> do
          -- this runs in the server, so an unscoped key falls back to the
          -- registry's own pid
          self <- getSelfPid
          let scope = maybe self id kScope
          let found = Map.lookup (scope, kId) (st ^. properties)
          case found of
            Nothing -> return ()
            Just _  -> fireEvent (Just scope) kId ref
        _ -> return ()

    -- Send a 'KeyRegistered' notification to the monitor's owner, if the key
    -- has an owner.
    fireEvent fnd kId' ref' = do
      case fnd of
        Nothing -> return ()
        Just p  -> do
          us <- getSelfPid
          sendTo ref' $ (RegistryKeyMonitorNotification kId'
                           ref'
                           (KeyRegistered p)
                           us)
-- | Removes the key monitor identified by the given reference.
--
-- The monitoring process is dropped from the set of listening pids only when
-- none of its other monitors remain. Otherwise it stays in the set, so that
-- its remaining monitors are still cleaned up when it dies.
handleUnmonitorReq :: forall k v. (Keyable k, Serializable v)
                   => State k v
                   -> CallRef ()
                   -> UnmonitorReq
                   -> Process (ProcessReply () (State k v))
handleUnmonitorReq state _cRef (UnmonitorReq ref') = do
    let pid            = fst $ unRef ref'
        remaining      = MultiMap.filter ((/= ref') . ref) (state ^. monitors)
        stillListening = any ((== pid) . fst . unRef . ref . snd)
                             (MultiMap.toList remaining)
        dropIfIdle
          | stillListening = id
          | otherwise      = Set.delete pid
    reply () $ ( (monitors ^= remaining)
               . (listeningPids ^: dropIfIdle)
               $ state
               )
-- | Replies with every name currently registered to the given process.
handleRegNamesLookup :: forall k v. (Keyable k, Serializable v)
                     => RegNamesReq
                     -> RestrictedProcess (State k v) (Result [k])
handleRegNamesLookup (RegNamesReq p) =
    getState >>= \state -> Restricted.reply (ownedNames (state ^. names))
  where
    ownedNames table = Map.foldlWithKey' collect [] table

    collect found name holder
      | holder == p = name : found
      | otherwise   = found
-- | Handles the death of a process known to the registry.
--
-- Any key monitors owned by the dead process are removed first, so that no
-- notification is addressed to it. Subscribers of the names and properties
-- it owned are then notified, and those names and properties are dropped.
--
-- The state passed on is the one with the dead process's subscriptions
-- already removed, so its monitors and its entry in the listening set do not
-- linger after its death.
handleMonitorSignal :: forall k v. (Keyable k, Serializable v)
                    => State k v
                    -> ProcessMonitorNotification
                    -> Process (ProcessAction (State k v))
handleMonitorSignal state (ProcessMonitorNotification _ pid reason) =
  do let state' = removeActiveSubscriptions pid state
     (deadNames, deadProps) <- notifyListeners state' pid reason
     continue $ ( (names ^: (`Map.difference` deadNames))
                . (properties ^: (`Map.difference` deadProps))
                $ state')
  where
    -- drop every monitor created by the given process, if it had any
    removeActiveSubscriptions p s =
      case (Set.member p (s ^. listeningPids)) of
        False -> s
        True  -> ( (listeningPids ^: Set.delete p)
                 . (monitors ^: MultiMap.filter ((/= p) . fst . unRef . ref))
                 $ s)

    -- notify subscribers of everything the dead process owned and return
    -- the names and properties that must now be removed
    notifyListeners :: State k v
                    -> ProcessId
                    -> DiedReason
                    -> Process (HashMap k ProcessId, HashMap (ProcessId, k) v)
    notifyListeners st pid' dr = do
      let diedNames = Map.filter (== pid') (st ^. names)
      let diedProps = Map.filterWithKey (\(p, _) _ -> p == pid')
                                        (st ^. properties)
      let nameSubs  = MultiMap.filterWithKey (\k _ -> Map.member k diedNames)
                                             (st ^. monitors)
      let propSubs  = MultiMap.filterWithKey (\k _ -> Map.member (pid', k) diedProps)
                                             (st ^. monitors)
      forM_ (MultiMap.toList nameSubs) $ \(kIdent, KMRef{..}) -> do
        let kEvDied = KeyOwnerDied { diedReason = dr }
        let mRef    = RegistryKeyMonitorNotification kIdent ref
        us <- getSelfPid
        case mask of
          -- no mask: the subscriber wants everything
          Nothing    -> sendTo ref (mRef kEvDied us)
          Just mask' -> do
            case (elem OnKeyOwnershipChange mask') of
              True  -> sendTo ref (mRef kEvDied us)
              False -> do
                -- the death is reported as an un-registration to subscribers
                -- that asked for those events only
                if (elem OnKeyUnregistered mask')
                  then sendTo ref (mRef KeyUnregistered us)
                  else return ()
      forM_ (MultiMap.toList propSubs) (notifyPropSubscribers dr)
      return (diedNames, diedProps)

    -- NOTE(review): unlike the name case above, a property subscriber with
    -- no mask, or with a mask containing neither ownership-change nor
    -- unregistered events, is still sent 'KeyUnregistered' - confirm this
    -- asymmetry is intended.
    notifyPropSubscribers dr' (kIdent, KMRef{..}) = do
      let died  = maybe False (elem OnKeyOwnershipChange) mask
      let event = case died of
                    True  -> KeyOwnerDied { diedReason = dr' }
                    False -> KeyUnregistered
      getSelfPid >>= sendTo ref . RegistryKeyMonitorNotification kIdent ref event
-- | Monitors the given process unless it is already in the supplied set, and
-- returns the set with the process included.
ensureMonitored :: ProcessId -> HashSet ProcessId -> Process (HashSet ProcessId)
ensureMonitored pid refs
  | Set.member pid refs = return refs
  | otherwise           = P.monitor pid >> return (Set.insert pid refs)
-- | Sends the given event to every monitor of the given key whose mask
-- admits it. A monitor with no mask receives every event.
notifySubscribers :: forall k v. (Keyable k, Serializable v)
                  => k
                  -> State k v
                  -> KeyUpdateEvent
                  -> Process ()
notifySubscribers k st ev =
    forM_ interested $ \KMRef{..} ->
      getSelfPid >>= sendTo ref . RegistryKeyMonitorNotification k ref ev
  where
    interested = [ km | (_, km) <- MultiMap.toList forKey, wants km ]
    forKey     = MultiMap.filterWithKey (\k' _ -> k' == k) (st ^. monitors)
    wants km   = maybe True (elem (maskFor ev)) (mask km)
-- | Maps an event to the mask value a monitor must hold in order to receive
-- it. Both ownership events share 'OnKeyOwnershipChange'.
maskFor :: KeyUpdateEvent -> KeyUpdateEventMask
maskFor ev = case ev of
  KeyRegistered{}   -> OnKeyRegistered
  KeyUnregistered   -> OnKeyUnregistered
  KeyOwnerDied{}    -> OnKeyOwnershipChange
  KeyOwnerChanged{} -> OnKeyOwnershipChange
  KeyLeaseExpired   -> OnKeyLeaseExpiry
-- | Looks up the process registered under the key's identity. The key's
-- type and scope are not consulted.
findName :: forall k v. (Keyable k, Serializable v)
         => Key k
         -> State k v
         -> Maybe ProcessId
findName key state = Map.lookup (keyIdentity key) (state ^. names)
-- | Accessor for the table of registered names.
names :: forall k v. Accessor (State k v) (HashMap k ProcessId)
names = accessor _names putNames
  where
    putNames updated st = st { _names = updated }
-- | Accessor for the table of per-process properties.
properties :: forall k v. Accessor (State k v) (HashMap (ProcessId, k) v)
properties = accessor _properties putProperties
  where
    putProperties updated st = st { _properties = updated }
-- | Accessor for the active key monitors.
monitors :: forall k v. Accessor (State k v) (MultiMap k KMRef)
monitors = accessor _monitors putMonitors
  where
    putMonitors updated st = st { _monitors = updated }
-- | Accessor for the set of key owners the server has already monitored.
registeredPids :: forall k v. Accessor (State k v) (HashSet ProcessId)
registeredPids = accessor _registeredPids putRegisteredPids
  where
    putRegisteredPids updated st = st { _registeredPids = updated }
-- | Accessor for the set of monitoring processes the server has already
-- monitored.
listeningPids :: forall k v. Accessor (State k v) (HashSet ProcessId)
listeningPids = accessor _listeningPids putListeningPids
  where
    putListeningPids updated st = st { _listeningPids = updated }
-- | Accessor for the counter used to allocate monitor references.
monitorIdCount :: forall k v. Accessor (State k v) Integer
monitorIdCount = accessor _monitorIdCount putMonitorIdCount
  where
    putMonitorIdCount updated st = st { _monitorIdCount = updated }