module Network.MQTT.Broker.Session
( publish
, subscribe
, unsubscribe
, disconnect
, terminate
, getSubscriptions
, getConnection
, getPrincipal
, getFreePacketIdentifiers
, reset
, notePending
, waitPending
, publishMessage
, publishMessages
, enqueuePingResponse
, enqueueMessage
, enqueueSubscribeAcknowledged
, enqueueUnsubscribeAcknowledged
, dequeue
, processPublish
, processPublishRelease
, processPublishReceived
, processPublishComplete
, processPublishAcknowledged
, Session (..)
, SessionIdentifier (..)
, Connection (..)
) where
import Control.Concurrent.MVar
import Control.Concurrent.PrioritySemaphore
import Control.Monad
import Data.Bool
import Data.Functor.Identity
import qualified Data.IntMap as IM
import qualified Data.IntSet as IS
import Data.Maybe
import Data.Monoid
import qualified Data.Sequence as Seq
import Network.MQTT.Broker.Authentication hiding (getPrincipal)
import Network.MQTT.Broker.Internal
import qualified Network.MQTT.Broker.RetainedMessages as RM
import qualified Network.MQTT.Broker.SessionStatistics as SS
import Network.MQTT.Message
import qualified Network.MQTT.Trie as R
-- | Publish a message on behalf of the session's principal.
--
--   The message is only forwarded upstream if its topic matches the
--   principal's publish permissions; otherwise it is counted as a dropped
--   publication. If the message carries the retain flag it is additionally
--   stored in the broker's retained store, provided the topic also matches
--   the principal's retain permissions.
publish :: Session auth -> Message -> IO ()
publish session msg = do
  principal <- readMVar (sessionPrincipal session)
  if R.matchTopic topic (principalPublishPermissions principal)
    then do
      -- Retention accounting only applies to messages that actually asked
      -- for retention. A message without the retain flag is neither an
      -- accepted nor a dropped retention.
      when retain $
        if R.matchTopic topic (principalRetainPermissions principal)
          then do
            RM.store msg (brokerRetainedStore broker)
            SS.accountRetentionsAccepted stats 1
          else
            SS.accountRetentionsDropped stats 1
      publishUpstream broker msg
      SS.accountPublicationsAccepted stats 1
    else
      SS.accountPublicationsDropped stats 1
  where
    stats         = sessionStatistics session
    broker        = sessionBroker session
    topic         = msgTopic msg
    Retain retain = msgRetain msg
-- | Handle a subscribe request for a list of filters with requested QoS.
--
--   Each filter is checked against the principal's subscribe permissions.
--   Permitted filters are merged into the session's subscription tree
--   (keeping the maximum QoS per filter) and the session identifier is
--   registered for them in the broker's subscription tree. A subscribe
--   acknowledgement carrying one entry per requested filter (@Nothing@ for
--   denied ones) is enqueued, and retained messages matching the accepted
--   filters are handed to 'publishMessages'.
subscribe :: Session auth -> PacketIdentifier -> [(Filter, QoS)] -> IO ()
subscribe session pid filters = do
  principal <- readMVar (sessionPrincipal session)
  checkedFilters <- mapM (checkPermission principal) filters
  let subscribeFilters = mapMaybe (\(filtr, mqos) -> (filtr,) <$> mqos) checkedFilters
      qosTree          = R.insertFoldable subscribeFilters R.empty
      sidTree          = R.map (const $ IS.singleton sid) qosTree
      countAccepted    = length subscribeFilters
      countDenied      = length filters - countAccepted
  SS.accountSubscriptionsAccepted stats $ fromIntegral countAccepted
  SS.accountSubscriptionsDenied   stats $ fromIntegral countDenied
  -- Evaluate the tree before taking the broker lock so that work is not
  -- done while the lock is held.
  qosTree `seq` do
    modifyMVarMasked_ (brokerState broker) $ \bst -> do
      modifyMVarMasked_
        (sessionSubscriptions session)
        (pure . R.unionWith max qosTree)
      pure $ bst { brokerSubscriptions = R.unionWith IS.union (brokerSubscriptions bst) sidTree }
    enqueueSubscribeAcknowledged session pid (fmap snd checkedFilters)
    -- Only filters that were actually accepted may trigger delivery of
    -- retained messages; a denied filter must not cause any retrieval.
    forM_ subscribeFilters $ \(filtr, _qos) ->
      publishMessages session =<< RM.retrieve filtr (brokerRetainedStore broker)
  where
    SessionIdentifier sid = sessionIdentifier session
    broker                = sessionBroker session
    stats                 = sessionStatistics session
    -- Pair the filter with @Just qos@ when permitted, @Nothing@ otherwise.
    checkPermission principal (filtr, qos) = do
      let isPermitted = R.matchFilter filtr (principalSubscribePermissions principal)
      pure (filtr, if isPermitted then Just qos else Nothing)
-- | Handle an unsubscribe request for a list of filters.
--
--   The filters are removed from the session's subscription tree and the
--   session identifier is deleted from the corresponding entries of the
--   broker's subscription tree. Afterwards an unsubscribe acknowledgement
--   is enqueued for the given packet identifier.
unsubscribe :: Session auth -> PacketIdentifier -> [Filter] -> IO ()
unsubscribe session pid filters =
  -- Evaluate the removal tree before the broker state is locked.
  removals `seq` do
    modifyMVarMasked_ (brokerState (sessionBroker session)) $ \bst -> do
      modifyMVarMasked_ (sessionSubscriptions session) $ \subs ->
        pure (R.differenceWith dropEntry subs removals)
      pure (bst { brokerSubscriptions =
                    R.differenceWith dropSession (brokerSubscriptions bst) removals })
    enqueueUnsubscribeAcknowledged session pid
  where
    SessionIdentifier sid = sessionIdentifier session
    -- One entry per filter, each tagged with this session's identifier.
    removals = R.insertFoldable (fmap (,Identity sid) filters) R.empty
    -- Session side: a matched entry is dropped regardless of its value.
    dropEntry _ _ = Nothing
    -- Broker side: only this session's identifier is taken out of the set.
    dropSession sids (Identity i) = Just (IS.delete i sids)
-- | Disconnect the session by running a no-op action under the session's
--   semaphore via 'exclusively'.
--
--   NOTE(review): presumably acquiring the semaphore preempts whichever
--   handler currently holds it - confirm against
--   "Control.Concurrent.PrioritySemaphore".
disconnect :: Session auth -> IO ()
disconnect session = exclusively semaphore noop
  where
    semaphore = sessionSemaphore session
    noop      = pure ()
-- | Reset the session's queue by applying 'resetQueue' to it.
--
--   The new queue is forced to weak head normal form before it is written
--   back into the 'MVar'.
reset :: Session auth -> IO ()
reset session =
  modifyMVar_ (sessionQueue session) $ \queue ->
    let queue' = resetQueue queue
     in queue' `seq` pure queue'
-- | Enqueue a ping response and signal that output is pending.
--
--   Unlike the other enqueue operations in this module, the packet is put
--   at the /front/ of the output buffer, i.e. ahead of everything that is
--   already waiting there.
enqueuePingResponse :: Session auth -> IO ()
enqueuePingResponse session = do
  modifyMVar_ (sessionQueue session) $ \queue ->
    let queue' = queue { outputBuffer = ServerPingResponse Seq.<| outputBuffer queue }
     in queue' `seq` pure queue'
  notePending session
-- | Append a subscribe acknowledgement (packet identifier plus one
--   @Maybe QoS@ per requested filter) to the end of the output buffer and
--   signal that output is pending.
enqueueSubscribeAcknowledged :: Session auth -> PacketIdentifier -> [Maybe QoS] -> IO ()
enqueueSubscribeAcknowledged session pid mqoss = do
  modifyMVar_ (sessionQueue session) $ \queue ->
    let queue' = queue { outputBuffer = outputBuffer queue Seq.|> ack }
     in queue' `seq` pure queue'
  notePending session
  where
    ack = ServerSubscribeAcknowledged pid mqoss
-- | Append an unsubscribe acknowledgement for the given packet identifier
--   to the end of the output buffer and signal that output is pending.
enqueueUnsubscribeAcknowledged :: Session auth -> PacketIdentifier -> IO ()
enqueueUnsubscribeAcknowledged session pid = do
  modifyMVar_ (sessionQueue session) $ \queue ->
    let queue' = queue { outputBuffer = outputBuffer queue Seq.|> ack }
     in queue' `seq` pure queue'
  notePending session
  where
    ack = ServerUnsubscribeAcknowledged pid
-- | Take the next batch of packets to transmit out of the session queue.
--
--   The queue is first run through 'normalizeQueue'. Then, in order of
--   preference:
--
--   1. a non-empty output buffer is returned and cleared;
--   2. otherwise waiting QoS 0 messages are returned wrapped as
--      'ServerPublish' packets and their queue is cleared;
--   3. otherwise the pending flag is cleared and an empty sequence is
--      returned.
dequeue :: Session auth -> IO (Seq.Seq ServerPacket)
dequeue session =
  modifyMVar (sessionQueue session) (serve . normalizeQueue)
  where
    serve q
      | not (Seq.null buffered) = pure (q { outputBuffer = mempty }, buffered)
      | not (Seq.null qos0)     = pure (q { queueQoS0 = mempty }, fmap asPublish qos0)
      | otherwise               = clearPending >> pure (q, mempty)
      where
        buffered = outputBuffer q
        qos0     = queueQoS0 q

    -- NOTE(review): QoS 0 PUBLISH packets carry no packet identifier in
    -- MQTT, so the identifier used here is presumably a placeholder that
    -- the serializer ignores. It may have been meant as a negative
    -- sentinel - confirm before relying on the value.
    asPublish = ServerPublish (PacketIdentifier 1) (Duplicate False)

    -- Empty the pending flag without blocking if it is already empty.
    clearPending :: IO ()
    clearPending = void (tryTakeMVar (sessionQueuePending session))
-- | Process a publish packet received from the client.
--
--   * QoS 0: the message is published right away.
--   * QoS 1: the message is published and a publish acknowledgement is
--     appended to the output buffer.
--   * QoS 2: the message is stored under its packet identifier in
--     @notReleased@ (it is not published yet) and a publish-received
--     packet is appended to the output buffer.
--
--   The duplicate flag is ignored.
processPublish :: Session auth -> PacketIdentifier -> Duplicate -> Message -> IO ()
processPublish session pid@(PacketIdentifier p) _dup msg =
  case msgQoS msg of
    QoS0 -> publish session msg
    QoS1 -> do
      publish session msg
      respond $ \q -> q
        { outputBuffer = outputBuffer q Seq.|> ServerPublishAcknowledged pid }
    QoS2 ->
      respond $ \q -> q
        { outputBuffer = outputBuffer q Seq.|> ServerPublishReceived pid
        , notReleased  = IM.insert p msg (notReleased q)
        }
  where
    -- Apply an update to the queue (forcing the result) and wake the sender.
    respond update = do
      modifyMVar_ (sessionQueue session) (\q -> pure $! update q)
      notePending session
-- | Process a publish acknowledgement received from the client.
--
--   The message is removed from @notAcknowledged@. The packet identifier
--   is put back at the front of the free identifiers only if it actually
--   was present in @notAcknowledged@. The sender is notified afterwards.
processPublishAcknowledged :: Session auth -> PacketIdentifier -> IO ()
processPublishAcknowledged session (PacketIdentifier pid) = do
  modifyMVar_ (sessionQueue session) $ \q ->
    let unacked = notAcknowledged q
        freePids
          | IM.member pid unacked = PacketIdentifier pid Seq.<| queuePids q
          | otherwise             = queuePids q
     in pure $! q
          { queuePids       = freePids
          , notAcknowledged = IM.delete pid unacked
          }
  notePending session
-- | Process a publish-received packet from the client.
--
--   The packet identifier is removed from @notReceived@, recorded in
--   @notComplete@, and a publish-release packet for it is appended to the
--   output buffer. The sender is notified afterwards.
processPublishReceived :: Session auth -> PacketIdentifier -> IO ()
processPublishReceived session (PacketIdentifier pid) = do
  modifyMVar_ (sessionQueue session) (\q -> pure $! advance q)
  notePending session
  where
    release = ServerPublishRelease (PacketIdentifier pid)
    advance q = q
      { notReceived  = IM.delete pid (notReceived q)
      , notComplete  = IS.insert pid (notComplete q)
      , outputBuffer = outputBuffer q Seq.|> release
      }
-- | Process a publish-release packet from the client.
--
--   If a message is stored under the packet identifier in @notReleased@,
--   it is published, removed from @notReleased@, and a publish-complete
--   packet is appended to the output buffer. If no message is stored, the
--   queue is left unchanged. The sender is notified in either case.
--
--   NOTE(review): 'publish' runs while the session queue 'MVar' is held;
--   verify that it can never end up enqueueing into this same session's
--   queue, which would block.
processPublishRelease :: Session auth -> PacketIdentifier -> IO ()
processPublishRelease session (PacketIdentifier pid) = do
  modifyMVar_ (sessionQueue session) $ \q ->
    maybe (pure q) (release q) (IM.lookup pid (notReleased q))
  notePending session
  where
    complete = ServerPublishComplete (PacketIdentifier pid)
    release q msg = do
      publish session msg
      pure $! q
        { notReleased  = IM.delete pid (notReleased q)
        , outputBuffer = outputBuffer q Seq.|> complete
        }
-- | Process a publish-complete packet from the client.
--
--   The packet identifier is removed from @notComplete@. It is returned to
--   the front of the free identifiers only if it actually was present in
--   @notComplete@, mirroring 'processPublishAcknowledged'. Without this
--   guard a duplicate or unsolicited publish-complete would put the same
--   identifier into the free pool more than once.
--
--   The sender is notified afterwards: although nothing is enqueued here,
--   a freed identifier changes what 'normalizeQueue' can hand out.
processPublishComplete :: Session auth -> PacketIdentifier -> IO ()
processPublishComplete session (PacketIdentifier pid) = do
  modifyMVar_ (sessionQueue session) $ \q -> pure $! q
    { queuePids   = bool (queuePids q)
                         (PacketIdentifier pid Seq.<| queuePids q)
                         (IS.member pid (notComplete q))
    , notComplete = IS.delete pid (notComplete q)
    }
  notePending session
-- | Read the session's current subscription tree.
getSubscriptions :: Session auth -> IO (R.Trie QoS)
getSubscriptions = readMVar . sessionSubscriptions
-- | Read the session's connection without blocking; yields @Nothing@ when
--   the connection 'MVar' is currently empty.
getConnection :: Session auth -> IO (Maybe Connection)
getConnection = tryReadMVar . sessionConnection
-- | Read the principal currently associated with the session.
getPrincipal :: Session auth -> IO Principal
getPrincipal = readMVar . sessionPrincipal
-- | Read the sequence of packet identifiers stored in the @queuePids@
--   field of the session's queue.
getFreePacketIdentifiers :: Session auth -> IO (Seq.Seq PacketIdentifier)
getFreePacketIdentifiers session = do
  queue <- readMVar (sessionQueue session)
  pure (queuePids queue)
-- | Rebuild the output buffer from the in-flight state of the queue.
--
--   Whatever was in the output buffer is discarded. The new buffer holds,
--   in this order:
--
--   1. a publish-release for every identifier in @notComplete@,
--   2. a publish with the duplicate flag set for every entry of
--      @notReceived@,
--   3. a publish with the duplicate flag set for every entry of
--      @notAcknowledged@.
--
--   All other fields of the queue are left untouched.
resetQueue :: ServerQueue -> ServerQueue
resetQueue q = q
  { outputBuffer = releases <> republish (notReceived q) <> republish (notAcknowledged q)
  }
  where
    releases =
      IS.foldl
        (\acc pid -> acc Seq.|> ServerPublishRelease (PacketIdentifier pid))
        mempty
        (notComplete q)
    republish pending =
      IM.foldlWithKey
        (\acc pid msg -> acc Seq.|> ServerPublish (PacketIdentifier pid) (Duplicate True) msg)
        mempty
        pending
-- | Move as many waiting QoS 1 and QoS 2 messages as there are free packet
--   identifiers into the output buffer.
--
--   QoS 2 messages are served first, then QoS 1 messages get whatever
--   identifiers remain. Every message moved is paired with an identifier
--   taken from the front of @queuePids@, appended to the output buffer as
--   a 'ServerPublish' without the duplicate flag, and recorded under its
--   identifier in @notReceived@ (QoS 2) or @notAcknowledged@ (QoS 1).
normalizeQueue :: ServerQueue -> ServerQueue
normalizeQueue = takeQoS1 . takeQoS2
  where
    -- Pair waiting messages with free identifiers, as many as both allow.
    -- Yields the pairs, the identifiers left over, and the messages that
    -- still have to wait.
    allocate q waiting = (Seq.zip usedPids sendable, sparePids, stillWaiting)
      where
        count                    = min (Seq.length (queuePids q)) (Seq.length waiting)
        (usedPids, sparePids)    = Seq.splitAt count (queuePids q)
        (sendable, stillWaiting) = Seq.splitAt count waiting

    toPublish (pid, msg) = ServerPublish pid (Duplicate False) msg

    remember (PacketIdentifier pid, msg) = IM.insert pid msg

    takeQoS1 q
      | Seq.null (queueQoS1 q) = q
      | otherwise = q
          { outputBuffer    = outputBuffer q <> fmap toPublish assigned
          , queuePids       = sparePids
          , queueQoS1       = stillWaiting
          , notAcknowledged = foldr remember (notAcknowledged q) assigned
          }
      where
        (assigned, sparePids, stillWaiting) = allocate q (queueQoS1 q)

    takeQoS2 q
      | Seq.null (queueQoS2 q) = q
      | otherwise = q
          { outputBuffer = outputBuffer q <> fmap toPublish assigned
          , queuePids    = sparePids
          , queueQoS2    = stillWaiting
          , notReceived  = foldr remember (notReceived q) assigned
          }
      where
        (assigned, sparePids, stillWaiting) = allocate q (queueQoS2 q)