{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE DeriveGeneric              #-}
{-# LANGUAGE StandaloneDeriving         #-}
{-# LANGUAGE ExistentialQuantification  #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE PatternGuards              #-}
{-# LANGUAGE RecordWildCards            #-}
{-# LANGUAGE FlexibleInstances          #-}
{-# LANGUAGE UndecidableInstances       #-}
{-# LANGUAGE MultiParamTypeClasses      #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Distributed.Process.Registry
-- Copyright   :  (c) Tim Watson 2012 - 2013
-- License     :  BSD3 (see the file LICENSE)
--
-- Maintainer  :  Tim Watson <watson.timothy@gmail.com>
-- Stability   :  experimental
-- Portability :  non-portable (requires concurrency)
--
-- The module provides an extended process registry, offering slightly altered
-- semantics to the built in @register@ and @unregister@ primitives and a richer
-- set of features:
--
-- * Associate (unique) keys with a process /or/ (unique key per-process) values
-- * Use any 'Keyable' algebraic data type as keys
-- * Query for process with matching keys / values / properties
-- * Atomically /give away/ names
-- * Forcibly re-allocate names to/from a third party
--
-- [Subscribing To Registry Events]
--
-- It is possible to monitor a registry for changes and be informed whenever
-- changes take place. All subscriptions are /key based/, which means that
-- you can subscribe to name or property changes for any process, so that any
-- property changes matching the key you've subscribed to will trigger a
-- notification (i.e., regardless of the process to which the property belongs).
--
-- The different types of event are defined by the 'KeyUpdateEvent' type.
--
-- Processes subscribe to registry events using @monitorName@ or its counterpart
-- @monitorProp@. If the operation succeeds, this will evaluate to an
-- opaque /reference/ that can be used when subsequently handling incoming
-- notifications, which will be delivered to the subscriber's mailbox as
-- @RegistryKeyMonitorNotification keyIdentity opaqueRef event@, where @event@
-- has the type 'KeyUpdateEvent'.
--
-- Subscribers can filter the types of event they receive by using the lower
-- level @monitor@ function (defined in /this/ module - not the one defined
-- in distributed-process' @Primitives@) and passing a list of filtering
-- 'KeyUpdateEventMask'. Without these filters in place, a monitor event will
-- be fired for /every/ pertinent change.
--
-----------------------------------------------------------------------------
module Control.Distributed.Process.Registry
  ( -- * Registry Keys
    KeyType(..)
  , Key(..)
  , Keyable
    -- * Defining / Starting A Registry
  , Registry(..)
  , start
  , run
    -- * Registration / Unregistration
  , addName
  , addProperty
  , registerName
  , registerValue
  , giveAwayName
  , RegisterKeyReply(..)
  , unregisterName
  , UnregisterKeyReply(..)
    -- * Queries / Lookups
  , lookupName
  , lookupProperty
  , registeredNames
  , foldNames
  , SearchHandle()
  , member
  , queryNames
  , findByProperty
  , findByPropertyValue
    -- * Monitoring / Waiting
  , monitor
  , monitorName
  , monitorProp
  , unmonitor
  , await
  , awaitTimeout
  , AwaitResult(..)
  , KeyUpdateEventMask(..)
  , KeyUpdateEvent(..)
  , RegKeyMonitorRef
  , RegistryKeyMonitorNotification(RegistryKeyMonitorNotification)
  ) where

{- DESIGN NOTES
This registry is a single process, parameterised by the types of key and
property value it can manage. It is, of course, possible to start multiple
registries and inter-connect them via registration, with one another.

The /Service/ API is intended to be a declarative layer in which you define
the managed processes that make up your services, and each /Service Component/
is registered and supervised appropriately for you, with the correct restart
strategies and start order calculated and so on. The registry is not only a
service locator, but provides the /wait for these dependencies to start first/
bit of the puzzle.

At some point, we'd like to offer a shared memory based registry, created on
behalf of a particular subsystem (i.e., some service or service group) and
passed implicitly using a reader monad or some such. This would allow multiple
processes to interact with the registry using STM (or perhaps a simple RWLock)
and could facilitate reduced contention.

Even for the singleton-process based registry (i.e., this one) we /might/ also
be better off separating the monitoring (or at least the notifications) from
the registration/mapping parts into separate processes.
-}

import Control.Distributed.Process hiding (call, monitor, unmonitor, mask)
import qualified Control.Distributed.Process.UnsafePrimitives as Unsafe (send)
import qualified Control.Distributed.Process as P (monitor)
import Control.Distributed.Process.Serializable
import Control.Distributed.Process.Extras hiding (monitor, wrapMessage)
import qualified Control.Distributed.Process.Extras as PL
  ( monitor
  )
import Control.Distributed.Process.ManagedProcess
  ( call
  , cast
  , handleInfo
  , reply
  , continue
  , input
  , defaultProcess
  , prioritised
  , InitResult(..)
  , ProcessAction
  , ProcessReply
  , ProcessDefinition(..)
  , PrioritisedProcessDefinition(..)
  , DispatchPriority
  , CallRef
  )
import qualified Control.Distributed.Process.ManagedProcess as MP
  ( pserve
  )
import Control.Distributed.Process.ManagedProcess.Server
  ( handleCallIf
  , handleCallFrom
  , handleCallFromIf
  , handleCast
  )
import Control.Distributed.Process.ManagedProcess.Server.Priority
  ( prioritiseInfo_
  , setPriority
  )
import Control.Distributed.Process.ManagedProcess.Server.Restricted
  ( RestrictedProcess
  , Result
  , getState
  )
import qualified Control.Distributed.Process.ManagedProcess.Server.Restricted as Restricted
  ( handleCall
  , reply
  )
import Control.Distributed.Process.Extras.Time
import Control.Monad (forM_, void)
import Data.Accessor
  ( Accessor
  , accessor
  , (^:)
  , (^=)
  , (^.)
  )
import Data.Binary
import Data.Foldable (Foldable)
import qualified Data.Foldable as Foldable
import Data.Maybe (fromJust, isJust)
import Data.Hashable
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as Map
import Control.Distributed.Process.Extras.Internal.Containers.MultiMap (MultiMap)
import qualified Control.Distributed.Process.Extras.Internal.Containers.MultiMap as MultiMap
import Data.HashSet (HashSet)
import qualified Data.HashSet as Set
import Data.Typeable (Typeable)

import GHC.Generics

--------------------------------------------------------------------------------
-- Types                                                                      --
--------------------------------------------------------------------------------

-- | Describes how a key will be used - for storing names or properties.
data KeyType =
    KeyTypeAlias    -- ^ the key will refer to a name (i.e., named process)
  | KeyTypeProperty -- ^ the key will refer to a (per-process) property
  deriving (Typeable, Generic, Show, Eq)
-- Both instances have empty bodies, so they rely entirely on the class
-- defaults (the type derives 'Generic' for this purpose).
instance Binary KeyType where
instance Hashable KeyType where

-- | A registered key. Keys can be mapped to names or (process-local) properties
-- in the registry. The 'keyIdentity' holds the key's value (e.g., a string or
-- similar simple data type, which must provide a 'Keyable' instance), whilst
-- the 'keyType' and 'keyScope' describe the key's intended use and ownership.
--
-- All three fields are strict. Within this module 'keyScope' is @Just pid@
-- when a request concerns a specific process (e.g., the owner of a name being
-- registered) and @Nothing@ when no process is specified (e.g., name lookups).
data Key a =
    Key
    { keyIdentity :: !a
    , keyType     :: !KeyType
    , keyScope    :: !(Maybe ProcessId)
    }
  deriving (Typeable, Generic, Show, Eq)
-- Empty instance bodies: both instances use the class default methods.
instance (Serializable a) => Binary (Key a) where
instance (Hashable a) => Hashable (Key a) where

-- | The 'Keyable' class describes types that can be used as registry keys.
-- The constraints ensure that the key can be stored and compared appropriately.
--
-- The class has no methods of its own; it is shorthand for its superclasses.
class (Show a, Eq a, Hashable a, Serializable a) => Keyable a
-- Catch-all instance: any type satisfying the superclass constraints is
-- automatically 'Keyable', so users never need to write an instance by hand.
instance (Show a, Eq a, Hashable a, Serializable a) => Keyable a

-- | A phantom type, used to parameterise registry startup
-- with the required key and value types.
--
-- Neither @k@ nor @v@ appears in the single field; the value is just a
-- typed handle around the 'ProcessId' of the registry server.
data Registry k v = Registry { registryPid :: ProcessId }
  deriving (Typeable, Generic, Show, Eq)
instance (Keyable k, Serializable v) => Binary (Registry k v) where

-- A registry handle always resolves, to the wrapped server pid.
instance Resolvable (Registry k v) where
  resolve = return . Just . registryPid

-- Linking to a registry handle links to the wrapped server pid.
instance Linkable (Registry k v) where
  linkTo = link . registryPid

-- Internal/Private Request/Response Types

-- Request sent by 'lookupName' to find the process registered under a
-- (name) key.
data LookupKeyReq k = LookupKeyReq !(Key k)
  deriving (Typeable, Generic)
instance (Serializable k) => Binary (LookupKeyReq k) where

-- Request sent by 'lookupProperty' to fetch the value of a property key.
-- NB: unlike the other request types, the 'Key' field here is not strict.
data LookupPropReq k = PropReq (Key k)
  deriving (Typeable, Generic)
instance (Serializable k) => Binary (LookupPropReq k) where

-- Reply to a 'LookupPropReq'. A found value is carried as a wrapped
-- 'Message', which the client ('lookupProperty') unwraps to the type it
-- expects.
data LookupPropReply =
    PropFound !Message
  | PropNotFound
  deriving (Typeable, Generic)
instance Binary LookupPropReply where

-- Exit reason used by 'lookupProperty' when the property value returned by
-- the server cannot be unwrapped at the type the caller asked for.
data InvalidPropertyType = InvalidPropertyType
  deriving (Typeable, Generic, Show, Eq)
instance Binary InvalidPropertyType where

-- Request sent by 'registeredNames', carrying the 'ProcessId' that the
-- caller supplied.
-- NOTE(review): presumably the server answers with the names registered
-- against that pid - confirm against the server-side handler.
data RegNamesReq = RegNamesReq !ProcessId
  deriving (Typeable, Generic)
instance Binary RegNamesReq where

-- Request sent by 'unregisterName' to remove a name; the key's scope carries
-- the pid of the calling process.
data UnregisterKeyReq k = UnregisterKeyReq !(Key k)
  deriving (Typeable, Generic)
instance (Serializable k) => Binary (UnregisterKeyReq k) where

-- | The result of an un-registration attempt.
--
-- The server replies with 'UnregisterInvalidKey' when the name is registered
-- to a process other than the one named in the request.
data UnregisterKeyReply =
    UnregisterOk -- ^ The given key was successfully unregistered
  | UnregisterInvalidKey -- ^ The given key was invalid and could not be unregistered
  | UnregisterKeyNotFound -- ^ The given key was not found (i.e., was not registered)
  deriving (Typeable, Generic, Eq, Show)
instance Binary UnregisterKeyReply where

-- Types used in (setting up and interacting with) key monitors

-- | Used to describe a subset of monitoring events to listen for.
-- A list of these masks can be passed to 'monitor' to filter notifications.
data KeyUpdateEventMask =
    OnKeyRegistered      -- ^ receive an event when a key is registered
  | OnKeyUnregistered    -- ^ receive an event when a key is unregistered
  | OnKeyOwnershipChange -- ^ receive an event when a key's owner changes
  | OnKeyLeaseExpiry     -- ^ receive an event when a key's lease expires
  deriving (Typeable, Generic, Eq, Show)
-- Empty instance bodies: both instances use the class default methods.
instance Binary KeyUpdateEventMask where
instance Hashable KeyUpdateEventMask where

-- | An opaque reference used for matching monitoring events. See
-- 'RegistryKeyMonitorNotification' for more details.
--
-- Internally this is a pair of a 'ProcessId' and an 'Integer'.
-- NOTE(review): presumably the pid is the subscriber and the integer a
-- server-side counter value - confirm against the monitor request handler.
newtype RegKeyMonitorRef =
  RegKeyMonitorRef { unRef :: (ProcessId, Integer) }
  deriving (Typeable, Generic, Eq, Show)
instance Binary RegKeyMonitorRef where
instance Hashable RegKeyMonitorRef where

-- A monitor reference resolves to the 'ProcessId' held in its first component.
instance Resolvable RegKeyMonitorRef where
  resolve = return . Just . fst . unRef

-- | Provides information about a key monitoring event.
--
-- NB: the record fields are partial, since each exists only on one
-- constructor; prefer pattern matching over using them as selectors.
data KeyUpdateEvent =
    -- | The key was registered; carries the owning process.
    KeyRegistered
    {
      owner :: !ProcessId
    }
    -- | The key was unregistered.
  | KeyUnregistered
    -- | The key's lease expired.
  | KeyLeaseExpired
    -- | The key's owner died; carries the reason.
  | KeyOwnerDied
    {
      diedReason :: !DiedReason
    }
    -- | Ownership of the key moved from one process to another.
  | KeyOwnerChanged
    {
      previousOwner :: !ProcessId
    , newOwner      :: !ProcessId
    }
  deriving (Typeable, Generic, Eq, Show)
instance Binary KeyUpdateEvent where

-- | This message is delivered to processes which are monitoring a
-- registry key. The opaque monitor reference will match (i.e., be equal
-- to) the reference returned from the @monitor@ function, whilst the
-- 'KeyUpdateEvent' describes the change that took place.
--
-- The fields are, in order: the key identity, the monitor reference, the
-- event, and a 'ProcessId'.
-- NOTE(review): the meaning of the trailing 'ProcessId' is not visible in
-- this part of the module (e.g., 'awaitTimeout' ignores it) - confirm against
-- the code that sends these notifications.
data RegistryKeyMonitorNotification k =
  RegistryKeyMonitorNotification !k !RegKeyMonitorRef !KeyUpdateEvent !ProcessId
  deriving (Typeable, Generic)
instance (Keyable k) => Binary (RegistryKeyMonitorNotification k) where
deriving instance (Keyable k) => Eq (RegistryKeyMonitorNotification k)
deriving instance (Keyable k) => Show (RegistryKeyMonitorNotification k)

-- Request to register a key. Sent on its own for names ('registerName') and
-- paired with a value, as @(RegisterKeyReq k, v)@, for properties
-- ('addProperty', 'registerValue').
data RegisterKeyReq k = RegisterKeyReq !(Key k)
  deriving (Typeable, Generic)
instance (Serializable k) => Binary (RegisterKeyReq k) where

-- | The (return) value of an attempted registration.
--
-- Re-registering a name for the process that already owns it yields
-- 'RegisteredOk'; 'AlreadyRegistered' means a different process owns it.
data RegisterKeyReply =
    RegisteredOk      -- ^ The given key was registered successfully
  | AlreadyRegistered -- ^ The key was already registered
  deriving (Typeable, Generic, Eq, Show)
instance Binary RegisterKeyReply where

-- | A cast message used to atomically give a name/key away to another process.
-- The 'ProcessId' is the new owner; the key's scope carries the pid of the
-- process giving the name away (see 'giveAwayName').
data GiveAwayName k = GiveAwayName !ProcessId !(Key k)
  deriving (Typeable, Generic)
instance (Keyable k) => Binary (GiveAwayName k) where
deriving instance (Keyable k) => Eq (GiveAwayName k)
deriving instance (Keyable k) => Show (GiveAwayName k)

-- Request sent by 'monitor' to subscribe to events for a key. The optional
-- mask list restricts which events are of interest; the monitor helpers in
-- this module pass @Nothing@ when no filtering is wanted.
data MonitorReq k = MonitorReq !(Key k) !(Maybe [KeyUpdateEventMask])
  deriving (Typeable, Generic)
instance (Keyable k) => Binary (MonitorReq k) where

-- Request sent by 'unmonitor' to remove the monitor identified by the given
-- reference.
data UnmonitorReq = UnmonitorReq !RegKeyMonitorRef
  deriving (Typeable, Generic)
instance Binary UnmonitorReq where

-- | The result of an @await@ operation.
--
-- 'RegisteredName' carries the owning process and the key that was awaited.
data AwaitResult k =
    RegisteredName     !ProcessId !k   -- ^ The name was registered
  | ServerUnreachable  !DiedReason     -- ^ The server was unreachable (or died)
  | AwaitTimeout                       -- ^ The operation timed out
  deriving (Typeable, Generic, Eq, Show)
instance (Keyable k) => Binary (AwaitResult k) where

-- Server state

-- On the server, a monitor reference consists of the actual
-- 'RegKeyMonitorRef' (which we can 'sendTo' /and/ which the client
-- matches on), plus an optional list of event masks.
data KMRef = KMRef { ref  :: !RegKeyMonitorRef
                   , mask :: !(Maybe [KeyUpdateEventMask])
                     -- use Nothing to monitor every event
                   }
  deriving (Typeable, Generic, Show)
-- Hash on 'ref' alone, so that hashing agrees with the 'Eq' instance below:
-- two values that compare equal (same 'ref', any 'mask') must hash equally,
-- otherwise hash-based containers may treat them as distinct entries.
instance Hashable KMRef where
  hashWithSalt salt = hashWithSalt salt . ref
-- instance Binary KMRef where
-- Equality deliberately ignores the event mask; only the reference counts.
instance Eq KMRef where
  (KMRef a _) == (KMRef b _) = a == b

-- The registry server's state. All fields are strict.
data State k v =
  State
  {
    -- registered names, mapping each key to the process that owns it
    _names          :: !(HashMap k ProcessId)
    -- property values, indexed by (owning process, key)
  , _properties     :: !(HashMap (ProcessId, k) v)
    -- key monitors (subscriptions), indexed by key
  , _monitors       :: !(MultiMap k KMRef)
    -- pids tracked via @ensureMonitored@ when names are registered
  , _registeredPids :: !(HashSet ProcessId)
    -- NOTE(review): presumably the pids of subscribed (monitoring) clients;
    -- not used in this part of the module - confirm
  , _listeningPids  :: !(HashSet ProcessId)
    -- starts at 1; NOTE(review): presumably the counter from which
    -- 'RegKeyMonitorRef' integers are drawn - confirm
  , _monitorIdCount :: !Integer
  }
  deriving (Typeable, Generic)

-- Types used in /direct/ queries

-- TODO: enforce QueryDirect's usage over only local channels

-- Selects which view of the server's state a direct query should return:
-- the name map, the property keys held per process, or the property
-- (key, value) pairs held per process (see @handleQuery@).
data QueryDirect = QueryDirectNames | QueryDirectProperties | QueryDirectValues
  deriving (Typeable, Generic)
instance Binary QueryDirect where

-- NB: SHashMap is basically a shim, allowing us to copy a
-- pointer to our HashMap directly to the querying process'
-- mailbox with no serialisation or even deepseq evaluation
-- required. We disallow remote queries (i.e., from other nodes)
-- and thus the Binary instance below is never used (though it's
-- required by the type system) and will in fact generate errors if
-- you attempt to use it at runtime.
--
-- The list field is always empty where this module constructs the shim
-- (see @handleQuery@); only the 'HashMap' field carries data.
data SHashMap k v = SHashMap [(k, v)] (HashMap k v)
  deriving (Typeable, Generic)

-- Deliberately unusable instance: both methods raise an error if evaluated.
instance (Keyable k, Serializable v) =>
         Binary (SHashMap k v) where
  put = error "AttemptedToUseBinaryShim"
  get = error "AttemptedToUseBinaryShim"
{- a real instance could look something like this:

  put (SHashMap _ hmap) = put (toList hmap)
  get = do
    hm <- get :: Get [(k, v)]
    return $ SHashMap [] (fromList hm)
-}

-- | A read-only view over a registry's contents, handed to query functions
-- such as 'queryNames'. The constructor is not exported; use 'member' or the
-- 'Functor' / 'Foldable' instances to inspect it.
newtype SearchHandle k v = RS { getRS :: HashMap k v }
  deriving (Typeable)

-- Maps over the values, leaving the keys untouched.
instance (Keyable k) => Functor (SearchHandle k) where
  fmap f (RS m) = RS $ Map.map f m
-- Folds over the values of the underlying map.
instance (Keyable k) => Foldable (SearchHandle k) where
  foldr f acc = Foldable.foldr f acc . getRS
-- TODO: add a Traversable instance (Functor is already provided above)

--------------------------------------------------------------------------------
-- Starting / Running A Registry                                              --
--------------------------------------------------------------------------------

-- | Spawn a registry server as a local process and evaluate to a typed
-- handle for it. The key and value types are fixed by the result type.
start :: forall k v. (Keyable k, Serializable v)
      => Process (Registry k v)
start = do
  -- 'run' ignores its argument; it is only there to pin down @k@ and @v@
  pid <- spawnLocal $ run (undefined :: Registry k v)
  return $ Registry pid

-- | Run the registry server loop in the calling process. The argument is
-- never inspected; it serves only to select the key and value types.
run :: forall k v. (Keyable k, Serializable v)
    => Registry k v
    -> Process ()
run _ = MP.pserve () initialise serverDefinition
  where
    -- start from an empty registry, with no server timeout
    initialise _ = return $ InitOk emptyState Infinity

    emptyState :: State k v
    emptyState =
      State { _names          = Map.empty
            , _properties     = Map.empty
            , _monitors       = MultiMap.empty
            , _registeredPids = Set.empty
            , _listeningPids  = Set.empty
            , _monitorIdCount = 1
            }

--------------------------------------------------------------------------------
-- Client Facing API                                                          --
--------------------------------------------------------------------------------

-- -- | Sends a message to the process, or processes, corresponding to @key@.
-- -- If Key belongs to a unique object (name or aggregated counter), this
-- -- function will send a message to the corresponding process, or fail if there
-- -- is no such process. If Key is for a non-unique object type (counter or
-- -- property), Msg will be send to all processes that have such an object.
-- --
-- dispatch svr ky msg = undefined
--   -- TODO: do a local-lookup and then sendTo the target

-- | Register the given (unique) key as a name for the calling process.
-- Equivalent to 'registerName' applied to the caller's own 'ProcessId'.
addName :: forall k v. (Keyable k)
        => Registry k v
        -> k
        -> Process RegisterKeyReply
addName s n = do
  self <- getSelfPid
  registerName s n self

-- | Atomically transfer a (registered) name to another process. Has no effect
-- if the name is not registered to the calling process!
--
-- This is an asynchronous cast: no reply is awaited from the registry.
giveAwayName :: forall k v . (Keyable k)
             => Registry k v
             -> k
             -> ProcessId
             -> Process ()
giveAwayName s n p =
  getSelfPid >>= \self ->
    cast s (GiveAwayName p Key { keyIdentity = n
                               , keyType     = KeyTypeAlias
                               , keyScope    = Just self
                               })

-- | Associate the given (non-unique) property with the current process.
-- If the property already exists, it will be overwritten with the new value.
--
-- The key is sent without a scope; the server attributes the property to
-- whichever process made the call.
addProperty :: (Keyable k, Serializable v)
            => Registry k v -> k -> v -> Process RegisterKeyReply
addProperty s k v = call s (RegisterKeyReq unscoped, v)
  where
    unscoped = Key { keyIdentity = k
                   , keyType     = KeyTypeProperty
                   , keyScope    = Nothing
                   }

-- | Register the given process under the supplied (unique) name.
-- Evaluates to 'AlreadyRegistered' if a different process holds the name.
registerName :: forall k v . (Keyable k)
             => Registry k v -> k -> ProcessId -> Process RegisterKeyReply
registerName s n p = call s (RegisterKeyReq nameKey)
  where
    nameKey = Key { keyIdentity = n
                  , keyType     = KeyTypeAlias
                  , keyScope    = Just p
                  }

-- | Register an item at the given address and associate it with a value.
-- If the property already exists, it will be overwritten with the new value.
--
-- The address must resolve to a 'ProcessId'; if it does not, the pattern
-- match on the resolved value fails in the 'Process' monad.
registerValue :: (Resolvable b, Keyable k, Serializable v)
              => Registry k v -> b -> k -> v -> Process RegisterKeyReply
registerValue s t n v = do
  Just target <- resolve t
  let propKey = Key { keyIdentity = n
                    , keyType     = KeyTypeProperty
                    , keyScope    = Just target
                    }
  call s (RegisterKeyReq propKey, v)

-- | Un-register a (unique) name for the calling process. The request is
-- scoped to the caller, so names owned by other processes are not removed.
unregisterName :: forall k v . (Keyable k)
               => Registry k v
               -> k
               -> Process UnregisterKeyReply
unregisterName s n =
  getSelfPid >>= \us ->
    call s (UnregisterKeyReq (Key n KeyTypeAlias (Just us)))

-- | Find the process registered under the supplied name. Evaluates to
-- @Nothing@ if the key is not registered.
lookupName :: forall k v . (Keyable k)
           => Registry k v
           -> k
           -> Process (Maybe ProcessId)
lookupName s n = call s (LookupKeyReq unscopedName)
  where
    unscopedName = Key { keyIdentity = n
                       , keyType     = KeyTypeAlias
                       , keyScope    = Nothing
                       }

-- | Lookup the value of a named property for the calling process. Evaluates to
-- @Nothing@ if the property (key) is not registered. If the assignment to a
-- value of type @v@ does not correspond to the type of properties stored by
-- the registry, the calling process will exit with the reason set to
-- @InvalidPropertyType@.
lookupProperty :: (Keyable k, Serializable v)
               => Registry k v
               -> k
               -> Process (Maybe v)
lookupProperty s n = do
    self  <- getSelfPid
    found <- call s $ PropReq (Key n KeyTypeProperty (Just self))
    case found of
      PropNotFound      -> return Nothing
      PropFound wrapped -> unwrapMessage wrapped >>= checked
  where
    -- a value that cannot be unwrapped at the requested type is fatal
    checked = maybe (die InvalidPropertyType) (return . Just)

-- | Ask the registry for a list of registered keys, passing along the given
-- 'ProcessId'.
-- NOTE(review): presumably the result is the names registered to that
-- process - confirm against the server-side handler.
registeredNames :: forall k v . (Keyable k)
                => Registry k v
                -> ProcessId
                -> Process [k]
registeredNames s = call s . RegNamesReq

-- | Monitor changes to the supplied name. No event mask is applied, so
-- every pertinent event is reported.
--
monitorName :: forall k v. (Keyable k)
            => Registry k v -> k -> Process RegKeyMonitorRef
monitorName svr name = monitor svr (Key name KeyTypeAlias Nothing) Nothing

-- | Monitor changes to the supplied (property) key, scoped to the given
-- process. No event mask is applied.
--
monitorProp :: forall k v. (Keyable k)
            => Registry k v -> k -> ProcessId -> Process RegKeyMonitorRef
monitorProp svr key pid =
  monitor svr (Key key KeyTypeProperty (Just pid)) Nothing

-- | Low level monitor operation. For the given key, set up a monitor
-- filtered by any 'KeyUpdateEventMask' entries that are supplied. Passing
-- @Nothing@ requests every event. Evaluates to the reference that will
-- accompany each notification.
monitor :: forall k v. (Keyable k)
        => Registry k v
        -> Key k
        -> Maybe [KeyUpdateEventMask]
        -> Process RegKeyMonitorRef
monitor svr key' = call svr . MonitorReq key'

-- | Remove a previously set monitor. This is a synchronous call to the
-- registry.
--
unmonitor :: forall k v. (Keyable k)
          => Registry k v
          -> RegKeyMonitorRef
          -> Process ()
unmonitor s mRef = call s (UnmonitorReq mRef)

-- | Await registration of a given key. This function will subsequently
-- block the evaluating process until the key is registered and a registration
-- event is dispatched to the caller's mailbox. Equivalent to 'awaitTimeout'
-- with an infinite delay.
--
await :: forall k v. (Keyable k)
      => Registry k v
      -> k
      -> Process (AwaitResult k)
await a = awaitTimeout a Infinity

-- | Await registration of a given key, but give up and return @AwaitTimeout@
-- if registration does not take place within the specified time period (@delay@).
--
-- The registry process itself is monitored for the duration of the wait, so
-- that its death is reported as 'ServerUnreachable'. If the registry handle
-- cannot be resolved, the caller exits with @"InvalidAddressable"@.
--
-- NOTE(review): neither the process monitor nor the key monitor set up here
-- is removed before returning.
awaitTimeout :: forall k v. (Keyable k)
             => Registry k v
             -> Delay
             -> k
             -> Process (AwaitResult k)
awaitTimeout a d k = do
    server    <- resolve a >>= maybe (die "InvalidAddressable") return
    Just mRef <- PL.monitor server
    kRef      <- monitor a (Key k KeyTypeAlias Nothing) (Just [OnKeyRegistered])
    let handlers = [onRegistered kRef, onServerDown mRef]
    outcome <- case d of
                 Infinity -> fmap Just (receiveWait handlers)
                 Delay t  -> receiveTimeout (asTimeout t) handlers
                 NoDelay  -> receiveTimeout 0 handlers
    return $ maybe AwaitTimeout id outcome
  where
    -- a registration event for /our/ key, delivered via /our/ key monitor
    onRegistered kr =
      matchIf (\(RegistryKeyMonitorNotification mk' kRef' ev' _) ->
                  isRegistration ev' && kRef' == kr && mk' == k)
              (\(RegistryKeyMonitorNotification _ _ (KeyRegistered pid) _) ->
                  return $ RegisteredName pid k)

    -- the registry died (or could not be reached) whilst we were waiting
    onServerDown mr =
      matchIf (\(ProcessMonitorNotification mRef' _ _) -> mRef' == mr)
              (\(ProcessMonitorNotification _ _ dr) ->
                  return $ ServerUnreachable dr)

    isRegistration (KeyRegistered _) = True
    isRegistration _                 = False

-- Local (non-serialised) shared data access. See note [sharing] below.

-- | Find all processes that hold a property with the given key. The lookup
-- is evaluated in the calling process, over a (non-serialised) copy of the
-- registry's property index.
--
-- The registry is monitored while waiting for its answer; if it dies first,
-- the caller exits with the reason @"DisconnectedFromServer"@. Unrelated
-- messages already in the caller's mailbox are left untouched, rather than
-- being consumed and mistaken for a disconnection.
findByProperty :: forall k v. (Keyable k)
               => Registry k v
               -> k
               -> Process [ProcessId]
findByProperty r key = do
    let pid = registryPid r
    self <- getSelfPid
    withMonitor pid $ do
      cast r $ (self, QueryDirectProperties)
      -- wait only for the registry's answer or for news of its death
      answer <- receiveWait [
          match (\(SHashMap _ m :: SHashMap ProcessId [k]) -> return $ Just m)
        , matchIf (\(ProcessMonitorNotification _ p _) -> p == pid)
                  (\_ -> return Nothing)
        ]
      case answer of
        Nothing -> die "DisconnectedFromServer"
        Just m  -> return $ Map.foldlWithKey' matchKey [] m
  where
    -- collect every pid whose key list mentions the key we are looking for
    matchKey ps p ks
      | key `elem` ks = p:ps
      | otherwise     = ps

-- | Find all processes that hold a property with the given key /and/ value.
-- The lookup is evaluated in the calling process, over a (non-serialised)
-- copy of the registry's property data.
--
-- The registry is monitored while waiting for its answer; if it dies first,
-- the caller exits with the reason @"DisconnectedFromServer"@. Unrelated
-- messages already in the caller's mailbox are left untouched, rather than
-- being consumed and mistaken for a disconnection.
findByPropertyValue :: (Keyable k, Serializable v, Eq v)
                    => Registry k v
                    -> k
                    -> v
                    -> Process [ProcessId]
findByPropertyValue r key val = do
    let pid = registryPid r
    self <- getSelfPid
    withMonitor pid $ do
      cast r $ (self, QueryDirectValues)
      -- wait only for the registry's answer or for news of its death
      answer <- receiveWait [
          match (\(SHashMap _ m :: SHashMap ProcessId [(k, v)]) -> return $ Just m)
        , matchIf (\(ProcessMonitorNotification _ p _) -> p == pid)
                  (\_ -> return Nothing)
        ]
      case answer of
        Nothing -> die "DisconnectedFromServer"
        Just m  -> return $ Map.foldlWithKey' matchKey [] m
  where
    -- collect every pid that holds exactly this (key, value) pair
    matchKey ps p ks
      | (key, val) `elem` ks = p:ps
      | otherwise            = ps

-- TODO: move to UnsafePrimitives over a passed {Send|Receive}Port here, and
-- avoid interfering with the caller's mailbox.

-- | Monadic left fold over all registered names/keys. The fold takes place
-- in the evaluating process.
--
-- The registry is monitored while we wait for its answer; if it dies (or
-- cannot be reached) first, the calling process exits with the reason
-- @"DisconnectedFromServer"@ instead of blocking forever.
foldNames :: forall b k v . Keyable k
          => Registry k v
          -> b
          -> (b -> (k, ProcessId) -> Process b)
          -> Process b
foldNames pid acc fn = do
  self <- getSelfPid
  let server = registryPid pid
  -- Although we incur the cost of scanning our mailbox here (which we could
  -- avoid by spawning an intermediary perhaps), the message is delivered to
  -- us without any copying or serialisation overheads.
  answer <- withMonitor server $ do
    cast pid $ (self, QueryDirectNames)
    receiveWait [
        match (\(SHashMap _ m :: SHashMap k ProcessId) -> return $ Just m)
      , matchIf (\(ProcessMonitorNotification _ p _) -> p == server)
                (\_ -> return Nothing)
      ]
  case answer of
    Nothing -> die "DisconnectedFromServer"
    Just m  -> Foldable.foldlM fn acc (Map.toList m)

-- | Evaluate a query on a 'SearchHandle', in the calling process. The handle
-- wraps the registry's name map, which is delivered without serialisation.
--
-- The registry is monitored while we wait for its answer; if it dies (or
-- cannot be reached) first, the calling process exits with the reason
-- @"DisconnectedFromServer"@ instead of blocking forever.
queryNames :: forall b k v . Keyable k
       => Registry k v
       -> (SearchHandle k ProcessId -> Process b)
       -> Process b
queryNames pid fn = do
  self <- getSelfPid
  let server = registryPid pid
  answer <- withMonitor server $ do
    cast pid $ (self, QueryDirectNames)
    receiveWait [
        match (\(SHashMap _ m :: SHashMap k ProcessId) -> return $ Just m)
      , matchIf (\(ProcessMonitorNotification _ p _) -> p == server)
                (\_ -> return Nothing)
      ]
  case answer of
    Nothing -> die "DisconnectedFromServer"
    Just m  -> fn (RS m)

-- | Check whether the supplied key is present in a 'SearchHandle'. This is a
-- pure test, evaluated in the calling process.
member :: (Keyable k, Serializable v)
       => k
       -> SearchHandle k v
       -> Bool
member k (RS contents) = Map.member k contents

-- note [sharing]:
-- We use the base library's UnsafePrimitives for these fold/query operations,
-- to pass a pointer to our internal HashMaps for read-only operations. There
-- is a potential cost to the caller if their mailbox is full - we should move
-- to using unsafe channels for this at some point.
--

--------------------------------------------------------------------------------
-- Server Process                                                             --
--------------------------------------------------------------------------------

-- The registry's process definition, wrapped with message priorities so
-- that process monitor notifications are dispatched at priority 100.
serverDefinition :: forall k v. (Keyable k, Serializable v)
                 => PrioritisedProcessDefinition (State k v)
serverDefinition = prioritised processDefinition [monitorSignals]
  where
    monitorSignals :: DispatchPriority (State k v)
    monitorSignals =
      prioritiseInfo_ (\(ProcessMonitorNotification _ _ _) -> setPriority 100)

-- The registry server's handler table. Requests are dispatched on their
-- message type and, for the conditional handlers, on the 'keyType' and
-- 'keyScope' of the key they carry. Process monitor notifications are
-- handled as info messages.
processDefinition :: forall k v. (Keyable k, Serializable v)
                  => ProcessDefinition (State k v)
processDefinition =
  defaultProcess
  {
    apiHandlers =
       [
         -- name registration: an alias key with an explicit owner
         handleCallIf
              (input ((\(RegisterKeyReq (Key{..} :: Key k)) ->
                        keyType == KeyTypeAlias && (isJust keyScope))))
              handleRegisterName
         -- property registration on behalf of an explicitly named process
       , handleCallIf
              (input ((\((RegisterKeyReq (Key{..} :: Key k)), _ :: v) ->
                        keyType == KeyTypeProperty && (isJust keyScope))))
              handleRegisterProperty
         -- property registration with no scope: the owner is taken from
         -- the caller's reference
       , handleCallFromIf
              (input ((\((RegisterKeyReq (Key{..} :: Key k)), _ :: v) ->
                        keyType == KeyTypeProperty && (not $ isJust keyScope))))
              handleRegisterPropertyCR
       , handleCast handleGiveAwayName
         -- name lookup (the key's scope is not examined)
       , handleCallIf
              (input ((\(LookupKeyReq (Key{..} :: Key k)) ->
                        keyType == KeyTypeAlias)))
              (\state (LookupKeyReq key') -> reply (findName key' state) state)
         -- property lookup for an explicitly named process
       , handleCallIf
              (input ((\(PropReq (Key{..} :: Key k)) ->
                        keyType == KeyTypeProperty && (isJust keyScope))))
              handleLookupProperty
         -- name un-registration, scoped to the requesting process
       , handleCallIf
              (input ((\(UnregisterKeyReq (Key{..} :: Key k)) ->
                        keyType == KeyTypeAlias && (isJust keyScope))))
              handleUnregisterName
       , handleCallFrom handleMonitorReq
       , handleCallFrom handleUnmonitorReq
       , Restricted.handleCall handleRegNamesLookup
         -- direct (non-serialised) queries, see 'QueryDirect'
       , handleCast handleQuery
       ]
  , infoHandlers = [handleInfo handleMonitorSignal]
  } :: ProcessDefinition (State k v)

-- Answer a direct query by sending the requested view of our state straight
-- to the client's mailbox, using the unsafe (non-serialising) send. The
-- server state is left unchanged.
handleQuery :: forall k v. (Keyable k, Serializable v)
            => State k v
            -> (ProcessId, QueryDirect)
            -> Process (ProcessAction (State k v))
handleQuery st@State{} (client, query) = respond query >> continue st
  where
    respond QueryDirectNames      = Unsafe.send client $ SHashMap [] (st ^. names)
    respond QueryDirectProperties = Unsafe.send client $ SHashMap [] keysByPid
    respond QueryDirectValues     = Unsafe.send client $ SHashMap [] pairsByPid

    -- Given that we must fold over the property map to build these
    -- per-process indexes anyway, sharing the result buys us little for now;
    -- that should change once the registry moves to shared memory.

    keysByPid  = Map.foldlWithKey' collectKey  Map.empty (st ^. properties)
    pairsByPid = Map.foldlWithKey' collectPair Map.empty (st ^. properties)

    -- prepend the key to whatever has been collected for this pid so far
    collectKey  acc (p, k) _ = Map.insertWith (++) p [k] acc

    -- prepend the (key, value) pair to this pid's entries
    collectPair acc (p, k) v = Map.insertWith (++) p [(k, v)] acc

-- Register a name for the process given in the key's scope.
--
-- * unclaimed name: monitor the new owner, notify subscribers and record it
-- * name already held by the same process: reply 'RegisteredOk', no change
-- * name held by another process: reply 'AlreadyRegistered', no change
--
-- The dispatch predicate guarantees that the key's scope is present.
handleRegisterName :: forall k v. (Keyable k, Serializable v)
                   => State k v
                   -> RegisterKeyReq k
                   -> Process (ProcessReply RegisterKeyReply (State k v))
handleRegisterName state (RegisterKeyReq Key{..}) =
    maybe registerNew checkOwner $ Map.lookup keyIdentity (state ^. names)
  where
    claimant = fromJust keyScope

    registerNew = do
      tracked <- ensureMonitored claimant (state ^. registeredPids)
      notifySubscribers keyIdentity state (KeyRegistered claimant)
      let state' = (names ^: Map.insert keyIdentity claimant)
                 . (registeredPids ^= tracked)
                 $ state
      reply RegisteredOk state'

    checkOwner holder
      | holder == claimant = reply RegisteredOk      state
      | otherwise          = reply AlreadyRegistered state

-- Register a property whose key carries no scope: the owning process is
-- obtained by resolving the caller's reference.
handleRegisterPropertyCR :: forall k v. (Keyable k, Serializable v)
                         => State k v
                         -> CallRef (RegisterKeyReply)
                         -> (RegisterKeyReq k, v)
                         -> Process (ProcessReply RegisterKeyReply (State k v))
handleRegisterPropertyCR st cr req =
  resolve cr >>= \caller -> doRegisterProperty (fromJust caller) st req

-- Register a property for the process named in the key's scope. The dispatch
-- predicate guarantees that the scope is present.
handleRegisterProperty :: forall k v. (Keyable k, Serializable v)
                       => State k v
                       -> (RegisterKeyReq k, v)
                       -> Process (ProcessReply RegisterKeyReply (State k v))
handleRegisterProperty state req@(RegisterKeyReq key', _) =
  doRegisterProperty (fromJust (keyScope key')) state req

-- Store (or overwrite) a property value for the given process: monitor the
-- process, notify the key's subscribers, then record the value under
-- @(process, key)@. Always replies 'RegisteredOk'.
--
-- NOTE(review): a fresh monitor is set up on every call, unlike name
-- registration, which goes through @ensureMonitored@.
doRegisterProperty :: forall k v. (Keyable k, Serializable v)
                       => ProcessId
                       -> State k v
                       -> (RegisterKeyReq k, v)
                       -> Process (ProcessReply RegisterKeyReply (State k v))
doRegisterProperty scope state (RegisterKeyReq Key{..}, v) = do
  _ <- P.monitor scope
  notifySubscribers keyIdentity state (KeyRegistered scope)
  let state' = (properties ^: Map.insert (scope, keyIdentity) v) state
  reply RegisteredOk state'

-- | Handle a property lookup for the @(scope, key)@ pair carried by the
-- request.
--
-- Replies with 'PropFound' (the value wrapped via 'wrapMessage') when an entry
-- exists in 'properties', and 'PropNotFound' otherwise. A request that carries
-- no scope cannot match any entry and is therefore answered with
-- 'PropNotFound' instead of raising an error in the server. The state is
-- never modified.
handleLookupProperty :: forall k v. (Keyable k, Serializable v)
                     => State k v
                     -> LookupPropReq k
                     -> Process (ProcessReply LookupPropReply (State k v))
handleLookupProperty state (PropReq Key{..}) =
  case keyScope >>= lookupFor of
    Nothing -> reply PropNotFound state
    Just p  -> reply (PropFound (wrapMessage p)) state
  where
    -- properties are keyed on the owning process together with the identity
    lookupFor scope = Map.lookup (scope, keyIdentity) (state ^. properties)

-- | Handle a request to remove a registered name.
--
-- * Unknown name: replies 'UnregisterKeyNotFound'.
--
-- * Name owned by a process other than the one in the key's scope (or the
--   request carries no scope at all): replies 'UnregisterInvalidKey'.
--
-- * Otherwise subscribers are sent 'KeyUnregistered', the name is deleted,
--   every monitor registered against that key is dropped, and the reply is
--   'UnregisterOk'.
handleUnregisterName :: forall k v. (Keyable k, Serializable v)
                     => State k v
                     -> UnregisterKeyReq k
                     -> Process (ProcessReply UnregisterKeyReply (State k v))
handleUnregisterName state (UnregisterKeyReq Key{..}) =
  case Map.lookup keyIdentity (state ^. names) of
    Nothing -> reply UnregisterKeyNotFound state
    Just pid
        -- comparing against the (optional) scope directly means a missing
        -- scope is treated as "not the owner" rather than crashing the server
      | Just pid /= keyScope -> reply UnregisterInvalidKey state
      | otherwise            -> do
          -- notify using the /old/ state, whilst the monitors still exist
          notifySubscribers keyIdentity state KeyUnregistered
          let state' = ( (names ^: Map.delete keyIdentity)
                       . (monitors ^: MultiMap.filterWithKey (\k' _ -> k' /= keyIdentity))
                       $ state)
          reply UnregisterOk state'

-- | Handle a request to transfer ownership of a name to another process.
--
-- The transfer only happens when the name is registered /and/ its current
-- owner is the process given in the key's scope; in every other case
-- (unknown name, different owner, no scope supplied) the state is left
-- untouched.
--
-- On a successful transfer the new owner is monitored (via 'ensureMonitored',
-- as 'handleRegisterName' does for a fresh registration) so that the name is
-- cleaned up should the new owner die, and subscribers for the key are sent a
-- 'KeyOwnerChanged' event.
handleGiveAwayName :: forall k v. (Keyable k, Serializable v)
                   => State k v
                   -> GiveAwayName k
                   -> Process (ProcessAction (State k v))
handleGiveAwayName state (GiveAwayName newPid Key{..}) =
  case Map.lookup keyIdentity (state ^. names) of
    Just owner
      | Just owner == keyScope -> giveAway owner
    _                          -> continue state
  where
    giveAway owner = do
      -- without this the registry would never learn of the new owner's death
      refs' <- ensureMonitored newPid (state ^. registeredPids)
      notifySubscribers keyIdentity state (KeyOwnerChanged owner newPid)
      continue $ ( (names ^: Map.insert keyIdentity newPid)
                 . (registeredPids ^= refs')
                 $ state )

-- | Handle a request to monitor a key.
--
-- A new 'RegKeyMonitorRef' is built from the caller (resolved from the
-- 'CallRef') and the next monitor id, the caller is monitored via
-- 'ensureMonitored', and the subscription is stored against the key. The new
-- reference is the reply.
--
-- Before replying, an initial 'KeyRegistered' notification is sent if the key
-- is already present:
--
-- * for aliases, only when the subscription's mask explicitly contains
--   'OnKeyRegistered' (a subscription without a mask gets no initial event);
--
-- * for properties, regardless of the mask, using the key's scope or - when
--   no scope was given - the registry's own process id.
handleMonitorReq :: forall k v. (Keyable k, Serializable v)
                 => State k v
                 -> CallRef RegKeyMonitorRef
                 -> MonitorReq k
                 -> Process (ProcessReply RegKeyMonitorRef (State k v))
handleMonitorReq state cRef (MonitorReq Key{..} mask') = do
  Just caller <- resolve cRef
  let nextId = (state ^. monitorIdCount) + 1
      mRef   = RegKeyMonitorRef (caller, nextId)
      kmRef  = KMRef mRef mask'
  listeners' <- ensureMonitored caller (state ^. listeningPids)
  announceIfPresent kmRef
  reply mRef $ ( (monitors ^: MultiMap.insert keyIdentity kmRef)
               . (listeningPids ^= listeners')
               . (monitorIdCount ^= nextId)
               $ state
               )
  where
    -- send the initial event for a key that was registered before the
    -- subscription was made
    announceIfPresent KMRef{..} =
      case keyType of
        KeyTypeAlias
          | elem OnKeyRegistered (maybe [] id mask) ->
              announce ref (Map.lookup keyIdentity (state ^. names))
        KeyTypeProperty -> do
          self <- getSelfPid
          let scope = maybe self id keyScope
          case Map.lookup (scope, keyIdentity) (state ^. properties) of
            Nothing -> return () -- TODO: logging or some such!?
            Just _  -> announce ref (Just scope)
        _ -> return ()

    -- deliver 'KeyRegistered' to the subscriber, if there is an owner
    announce target owner =
      case owner of
        Nothing -> return ()
        Just p  -> do
          us <- getSelfPid
          sendTo target $ RegistryKeyMonitorNotification keyIdentity
                                                         target
                                                         (KeyRegistered p)
                                                         us

-- | Handle a request to cancel a key monitor.
--
-- Every subscription holding the given reference is removed from 'monitors'.
-- The owning process (the first component of the reference) is only removed
-- from 'listeningPids' once it holds no other subscriptions: the monitor
-- signal handler consults that set to decide whether a dead process still has
-- subscriptions to clean up, so dropping the pid too early would leave any
-- remaining subscriptions behind after the process dies.
handleUnmonitorReq :: forall k v. (Keyable k, Serializable v)
                 => State k v
                 -> CallRef ()
                 -> UnmonitorReq
                 -> Process (ProcessReply () (State k v))
handleUnmonitorReq state _cRef (UnmonitorReq ref') = do
  let pid       = fst $ unRef ref'
      remaining = MultiMap.filter ((/= ref') . ref) (state ^. monitors)
      -- subscriptions (other than the one being cancelled) held by this pid
      othersHeld = MultiMap.toList
                 $ MultiMap.filter ((== pid) . fst . unRef . ref) remaining
      forgetIfIdle = case othersHeld of
                       [] -> Set.delete pid
                       _  -> id
  reply () $ ( (monitors ^= remaining)
             . (listeningPids ^: forgetIfIdle)
             $ state
             )

-- | Reply with every name in the 'names' table that is currently held by the
-- process given in the request. The state is only read, never modified.
handleRegNamesLookup :: forall k v. (Keyable k, Serializable v)
                     => RegNamesReq
                     -> RestrictedProcess (State k v) (Result [k])
handleRegNamesLookup (RegNamesReq p) =
  getState >>= \st ->
    Restricted.reply $ Map.foldlWithKey' collect [] (st ^. names)
  where
    -- keep a name only when its owner is the requested process
    collect found name owner
      | p == owner = name : found
      | otherwise  = found

-- | Handle the death (or disconnection) of a monitored process.
--
-- 1. Any subscriptions held /by/ the dead process are dropped, so that it is
--    neither notified below nor kept around as a subscriber.
--
-- 2. Subscribers to names and properties owned by the dead process are
--    notified.
--
-- 3. Those names and properties are removed, and the process is forgotten in
--    'registeredPids' (the monitor has fired, so a later registration by the
--    same pid has to establish a new one).
--
-- The new state is built from the state produced in step 1, so the removal of
-- the dead process' subscriptions is retained.
handleMonitorSignal :: forall k v. (Keyable k, Serializable v)
                    => State k v
                    -> ProcessMonitorNotification
                    -> Process (ProcessAction (State k v))
handleMonitorSignal state (ProcessMonitorNotification _ pid reason) =
  do let state' = removeActiveSubscriptions pid state
     (deadNames, deadProps) <- notifyListeners state' pid reason
     continue $ ( (names ^: (\ns -> Map.difference ns deadNames))
                . (properties ^: (\ps -> Map.difference ps deadProps))
                . (registeredPids ^: Set.delete pid)
                $ state')
  where
    -- drop the dead process from the set of listeners, along with every
    -- subscription whose reference belongs to it
    removeActiveSubscriptions p s =
      case (Set.member p (s ^. listeningPids)) of
        False -> s
        True  -> ( (listeningPids ^: Set.delete p)
                   -- delete any monitors this (now dead) process held
                 . (monitors ^: MultiMap.filter ((/= p) . fst . unRef . ref))
                 $ s)

    -- notify subscribers of the names/properties owned by the dead process
    -- and return those names/properties so the caller can remove them
    notifyListeners :: State k v
                    -> ProcessId
                    -> DiedReason
                    -> Process (HashMap k ProcessId, HashMap (ProcessId, k) v)
    notifyListeners st pid' dr = do
      let diedNames = Map.filter (== pid') (st ^. names)
      let diedProps = Map.filterWithKey (\(p, _) _ -> p == pid')
                                        (st ^. properties)
      let nameSubs  = MultiMap.filterWithKey (\k _ -> Map.member k diedNames)
                                             (st ^. monitors)
      let propSubs  = MultiMap.filterWithKey (\k _ -> Map.member (pid', k) diedProps)
                                             (st ^. monitors)
      forM_ (MultiMap.toList nameSubs) $ \(kIdent, KMRef{..}) -> do
        let kEvDied = KeyOwnerDied { diedReason = dr }
        let mRef    = RegistryKeyMonitorNotification kIdent ref
        us <- getSelfPid
        case mask of
          -- no mask: the subscriber is told that the owner died
          Nothing    -> sendTo ref (mRef kEvDied us)
          Just mask' -> do
            case (elem OnKeyOwnershipChange mask') of
              True  -> sendTo ref (mRef kEvDied us)
              False -> do
                -- fall back to a plain "unregistered" event if that is all
                -- the subscriber asked for
                if (elem OnKeyUnregistered mask')
                  then sendTo ref (mRef KeyUnregistered us)
                  else return ()
      forM_ (MultiMap.toList propSubs) (notifyPropSubscribers dr)
      return (diedNames, diedProps)

    -- NOTE(review): a subscription without a mask receives 'KeyUnregistered'
    -- here, whereas for names (above) it receives 'KeyOwnerDied'; an event is
    -- also sent whatever the mask contains - confirm this is intended.
    notifyPropSubscribers dr' (kIdent, KMRef{..}) = do
      let died  = maybe False (elem OnKeyOwnershipChange) mask
      let event = case died of
                    True  -> KeyOwnerDied { diedReason = dr' }
                    False -> KeyUnregistered
      getSelfPid >>= sendTo ref . RegistryKeyMonitorNotification kIdent ref event

-- | Monitor the given process unless it is already a member of the supplied
-- set, and return the set with the process included.
ensureMonitored :: ProcessId -> HashSet ProcessId -> Process (HashSet ProcessId)
ensureMonitored pid refs
  | Set.member pid refs = return refs
  | otherwise           = do
      _ <- P.monitor pid
      return $ Set.insert pid refs

-- | Send a 'RegistryKeyMonitorNotification' carrying the given event to every
-- subscription stored against the key.
--
-- A subscription without a mask receives every event; one with a mask only
-- receives the event when the mask contains the event's 'maskFor' value.
notifySubscribers :: forall k v. (Keyable k, Serializable v)
                  => k
                  -> State k v
                  -> KeyUpdateEvent
                  -> Process ()
notifySubscribers k st ev =
  forM_ (MultiMap.toList subscribers) $ \(_, KMRef{..}) ->
    case maybe True (elem wanted) mask of
      False -> return ()
      True  -> do
        us <- getSelfPid
        sendTo ref (RegistryKeyMonitorNotification k ref ev us)
  where
    -- the mask entry a subscriber must hold to be told about this event
    wanted      = maskFor ev
    subscribers = MultiMap.filterWithKey (\k' _ -> k' == k) (st ^. monitors)

--------------------------------------------------------------------------------
-- Utilities / Accessors                                                      --
--------------------------------------------------------------------------------

-- | The event mask entry that corresponds to a given update event. Both
-- 'KeyOwnerDied' and 'KeyOwnerChanged' map to 'OnKeyOwnershipChange'.
maskFor :: KeyUpdateEvent -> KeyUpdateEventMask
maskFor ev =
  case ev of
    KeyRegistered{}   -> OnKeyRegistered
    KeyUnregistered   -> OnKeyUnregistered
    KeyOwnerDied{}    -> OnKeyOwnershipChange
    KeyOwnerChanged{} -> OnKeyOwnershipChange
    KeyLeaseExpired   -> OnKeyLeaseExpiry

-- | Look up the process currently registered under the key's identity, if
-- any. Only the identity is consulted; the key's scope and type are ignored.
findName :: forall k v. (Keyable k, Serializable v)
         => Key k
         -> State k v
         -> Maybe ProcessId
findName key st = Map.lookup (keyIdentity key) (st ^. names)

-- | Accessor for the '_names' field of 'State'.
names :: forall k v. Accessor (State k v) (HashMap k ProcessId)
names = accessor _names setNames
  where
    setNames new st = st { _names = new }

-- | Accessor for the '_properties' field of 'State'.
properties :: forall k v. Accessor (State k v) (HashMap (ProcessId, k) v)
properties = accessor _properties setProperties
  where
    setProperties new st = st { _properties = new }

-- | Accessor for the '_monitors' field of 'State'.
monitors :: forall k v. Accessor (State k v) (MultiMap k KMRef)
monitors = accessor _monitors setMonitors
  where
    setMonitors new st = st { _monitors = new }

-- | Accessor for the '_registeredPids' field of 'State'.
registeredPids :: forall k v. Accessor (State k v) (HashSet ProcessId)
registeredPids = accessor _registeredPids setRegisteredPids
  where
    setRegisteredPids new st = st { _registeredPids = new }

-- | Accessor for the '_listeningPids' field of 'State'.
listeningPids :: forall k v. Accessor (State k v) (HashSet ProcessId)
listeningPids = accessor _listeningPids setListeningPids
  where
    setListeningPids new st = st { _listeningPids = new }

-- | Accessor for the '_monitorIdCount' field of 'State'.
monitorIdCount :: forall k v. Accessor (State k v) Integer
monitorIdCount = accessor _monitorIdCount setMonitorIdCount
  where
    setMonitorIdCount new st = st { _monitorIdCount = new }