module Network.MQTT.Broker
( Broker (brokerAuthenticator)
, newBroker
, publishUpstream
, publishDownstream
, withSession
, getUptime
, getSessions
, getSubscriptions
, lookupSession
) where
import Control.Concurrent.MVar
import Control.Concurrent.PrioritySemaphore
import Control.Exception
import Control.Monad
import Data.Int
import qualified Data.IntMap.Strict as IM
import qualified Data.IntSet as IS
import qualified Data.Map.Strict as M
import Data.Maybe
import System.Clock
import qualified System.Log.Logger as Log
import Network.MQTT.Broker.Authentication
import Network.MQTT.Broker.Internal
import qualified Network.MQTT.Broker.RetainedMessages as RM
import qualified Network.MQTT.Broker.Session as Session
import qualified Network.MQTT.Broker.SessionStatistics as SS
import Network.MQTT.Message
import qualified Network.MQTT.Trie as R
-- | Create a fresh broker around the given authenticator.
--
-- The creation time is recorded as the seconds component of the 'Realtime'
-- clock, a new retained message store is allocated, and the mutable broker
-- state starts out with session identifier 0 and empty subscription and
-- session collections.
newBroker :: auth -> IO (Broker auth)
newBroker authenticator = do
  createdAt <- fmap sec (getTime Realtime)
  retainedStore <- RM.new
  stateVar <- newMVar initialState
  pure Broker
    { brokerCreatedAt = createdAt
    , brokerAuthenticator = authenticator
    , brokerRetainedStore = retainedStore
    , brokerState = stateVar
    }
  where
    -- State of a broker that has not seen any session yet.
    initialState = BrokerState
      { brokerMaxSessionIdentifier = SessionIdentifier 0
      , brokerSubscriptions = mempty
      , brokerSessions = mempty
      , brokerSessionsByPrincipals = mempty
      }
-- | Authenticate a connection request and, on success, run a handler with
--   the session belonging to the authenticated principal and client.
--
-- Outcomes:
--
-- * The authenticator throws an 'AuthenticationException': the reject
--   handler is called with 'ServerUnavailable'.
-- * The authenticator yields no principal identifier, or no session could
--   be obtained for it: the reject handler is called with 'NotAuthorized'.
-- * Otherwise the accept handler runs under the session's semaphore (via
--   'exclusively') while a 'Connection' describing the request is stored in
--   the session's connection 'MVar'; the 'MVar' is emptied again afterwards.
--
-- Once the handler has finished (normally or by exception) the session is
-- terminated if the request asked for a clean session and reset otherwise.
withSession :: forall auth. (Authenticator auth) => Broker auth -> ConnectionRequest -> (RejectReason -> IO ()) -> (Session auth -> SessionPresent -> IO ()) -> IO ()
withSession broker request sessionRejectHandler sessionAcceptHandler = do
  authResult <- try (authenticate (brokerAuthenticator broker) request)
    :: IO (Either (AuthenticationException auth) (Maybe PrincipalIdentifier))
  case authResult of
    Left _ ->
      sessionRejectHandler ServerUnavailable
    Right Nothing ->
      sessionRejectHandler NotAuthorized
    Right (Just principalIdentifier) ->
      bracket (acquire principalIdentifier) release serve
  where
    -- Look up (or create) the session for this principal / client pair.
    acquire principalIdentifier =
      getSession broker (principalIdentifier, requestClientIdentifier request)

    -- Cleanup that runs after the handler, whether or not it failed.
    release Nothing = pure ()
    release (Just (session, _))
      | requestCleanSession request = Session.terminate session
      | otherwise = Session.reset session

    -- Run the accept handler with the connection registered on the session.
    serve Nothing = sessionRejectHandler NotAuthorized
    serve (Just (session, sessionPresent)) =
      exclusively (sessionSemaphore session) $ do
        now <- sec <$> getTime Realtime
        let connection = Connection
              { connectionCreatedAt = now
              , connectionCleanSession = requestCleanSession request
              , connectionSecure = requestSecure request
              , connectionWebSocket = isJust (requestHttp request)
              , connectionRemoteAddress = requestRemoteAddress request
              }
        bracket_
          (putMVar (sessionConnection session) connection)
          (void (takeMVar (sessionConnection session)))
          (sessionAcceptHandler session sessionPresent)
-- | Find a session by its identifier in the broker's current state.
--
-- Yields 'Nothing' when no session is registered under that identifier.
lookupSession :: SessionIdentifier -> Broker auth -> IO (Maybe (Session auth))
lookupSession (SessionIdentifier sid) broker =
  withMVar (brokerState broker) (pure . IM.lookup sid . brokerSessions)
-- | Fetch the session for a principal / client identifier pair, creating
--   one if none exists yet.
--
-- The whole operation runs inside 'modifyMVar' on the broker state.
--
-- * An existing session is returned together with @SessionPresent True@ and
--   the state is left unchanged.
-- * If the pair maps to a session identifier with no session behind it, a
--   warning is logged and a new session is created.
-- * A newly created session gets the next session identifier, is registered
--   in both session indices and is returned with @SessionPresent False@.
-- * If the authenticator knows no principal for the identifier, the state
--   is left unchanged and 'Nothing' is returned.
getSession :: Authenticator auth => Broker auth -> (PrincipalIdentifier, ClientIdentifier) -> IO (Maybe (Session auth, SessionPresent))
getSession broker pcid@(pid, cid) =
  modifyMVar (brokerState broker) $ \st ->
    case M.lookup pcid (brokerSessionsByPrincipals st) of
      Nothing ->
        createSession st
      Just (SessionIdentifier sid) ->
        case IM.lookup sid (brokerSessions st) of
          Nothing -> do
            Log.warningM "Broker.getSession" $ "Illegal state: Found orphanded session id " ++ show sid ++ "."
            createSession st
          Just session ->
            pure (st, Just (session, SessionPresent True))
  where
    -- Build a new session for the principal and register it in the state.
    createSession st = do
      lookedUp <- getPrincipal (brokerAuthenticator broker) pid
      case lookedUp of
        Nothing ->
          pure (st, Nothing)
        Just principal -> do
          createdAt <- sec <$> getTime Realtime
          sem <- newPrioritySemaphore
          subscriptionsVar <- newMVar R.empty
          queueVar <- newMVar (emptyServerQueue (fromIntegral (quotaMaxPacketIdentifiers (principalQuota principal))))
          queuePendingVar <- newEmptyMVar
          connectionVar <- newEmptyMVar
          principalVar <- newMVar principal
          statistics <- SS.new
          let SessionIdentifier lastId = brokerMaxSessionIdentifier st
              nextId = lastId + 1
              freshSession = Session
                { sessionBroker = broker
                , sessionIdentifier = SessionIdentifier nextId
                , sessionClientIdentifier = cid
                , sessionPrincipalIdentifier = pid
                , sessionCreatedAt = createdAt
                , sessionConnection = connectionVar
                , sessionPrincipal = principalVar
                , sessionSemaphore = sem
                , sessionSubscriptions = subscriptionsVar
                , sessionQueue = queueVar
                , sessionQueuePending = queuePendingVar
                , sessionStatistics = statistics
                }
              updatedState = st
                { brokerMaxSessionIdentifier = SessionIdentifier nextId
                , brokerSessions = IM.insert nextId freshSession (brokerSessions st)
                , brokerSessionsByPrincipals = M.insert pcid (SessionIdentifier nextId) (brokerSessionsByPrincipals st)
                }
          Log.infoM "Broker.createSession" $
            concat ["Creating new session with id ", show nextId, " for ", show pid, "."]
          pure (updatedState, Just (freshSession, SessionPresent False))
-- | Number of seconds that have passed since the broker was created.
--
-- Computed as the difference between the current seconds component of the
-- 'Realtime' clock and the value stored in 'brokerCreatedAt', which
-- 'newBroker' takes from the same clock.
getUptime :: Broker auth -> IO Int64
getUptime broker = do
  now <- sec <$> getTime Realtime
  -- Subtract the creation timestamp; without the operator, @now@ would be
  -- applied as a function and the expression would not type-check.
  pure $ now - brokerCreatedAt broker
-- | Snapshot of all sessions currently registered with the broker, keyed by
--   the integer value of their session identifier.
getSessions :: Broker auth -> IO (IM.IntMap (Session auth))
getSessions broker = do
  st <- readMVar (brokerState broker)
  pure (brokerSessions st)
-- | Snapshot of the broker's subscription trie as stored in the broker
--   state at the time of the call.
getSubscriptions :: Broker auth -> IO (R.Trie IS.IntSet)
getSubscriptions broker = do
  st <- readMVar (brokerState broker)
  pure (brokerSubscriptions st)