{- |

CMQ, a UDP-based inherently asynchronous message queue to orchestrate messages, 
events and processes in the cloud. It trades guarantees, consistency mechanisms, 
(shared) state and transactions for robustness, scalability and performance.
CMQ fares especially well in modern Layer 2 switches in data center networks, 
as well as in the presence of errors. 
A 'Message' is pushed to the queue together with a queue identifier ('Cmq') 
and a 'KEY' that specifies the recipient. Messages can be pushed in logarithmic 
time and the next message can be retrieved in constant time. 

This implementation is based on

* J. Fritsch, C. Walker, /CMQ - A lightweight, asynchronous high-performance messaging queue for the cloud (2012)/.

-}

module System.CMQ (
                   -- * The queue identifier (Token)
                   Cmq
                   -- * IPv4 address 
                   {-|
                    Use 'read' @\"192.0.2.1\"@ :: 'IPv4', for example. Also, @\"192.0.2.1\"@ can be used as literal with OverloadedStrings.
                   -}
                   , IPv4
                   -- * Destination identifier (KEY)
                   , KEY
                   -- * Construction
                   , newRq
                   -- * Insertion (Push Message)
                   , cwPush
                   -- * Query (Pop a Message)
                   , cwPop
) where

import Data.Time.Clock.POSIX
import Data.Serialize as S
import qualified Data.PSQueue as PSQ
import qualified Data.Map as Map
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import Control.Monad.State
import Control.Monad.Reader
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import qualified Data.ByteString.Char8 as B
import Data.IP

--PSQ = ( host , ID ) POSIXTime
--where the tuple is k and POSIXTime is p

--Map is ( host , ID ) [messages]
--where the tuple is the k and [messages] is a list of messages

--ID is an integer that is reserved for future use e.g. as unique process identifier

--qthresh = 1440 (MTU minus some overhead)
--qthresh = 512 (most common UDP packet size e.g. DNS)

type KEY = ( IPv4 , Integer ) -- ^ The 'KEY' identifies the message destination in the format 'IPv4' address, Integer. The integer is reserved for future use e.g. as unique process identifier 
-- | Shared priority search queue that maps each destination 'KEY' to a
-- 'POSIXTime' priority (the time stored when the key was inserted).
type TPSQ = TVar (PSQ.PSQ KEY POSIXTime)
-- | Shared map from destination 'KEY' to the list of messages currently
-- held for that destination.
type TMap a = TVar (Map.Map KEY [a]) 

-- | General purpose finite queue.
--
-- The value returned by 'newRq' bundles the queue configuration with the
-- shared state used by 'cwPush' and 'cwPop'.
data Cmq a = Cmq
   { qthresh :: Int       -- ^ Threshold compared against the encoded size (in bytes) of the pending messages of one destination.
   , tdelay  :: Rational  -- ^ Maximum age of a pending queue in milliseconds.
   , cwpsq   :: TPSQ      -- ^ Destinations with pending messages, prioritised by insertion time.
   , cwmap   :: TMap a    -- ^ Pending outgoing messages per destination.
   , cwchan  :: TChan a   -- ^ Channel holding received messages until they are popped.
   }

-- | Reader action that projects the size threshold ('qthresh') out of the queue token.
getQthresh :: Reader (Cmq a) Int
getQthresh = asks qthresh

-- | Reader action that projects the maximum queue age ('tdelay') out of the queue token.
getDelay :: Reader (Cmq a) Rational
getDelay = asks tdelay

-- | Reader action that projects the shared message map ('cwmap') out of the queue token.
getTMap :: Reader (Cmq a) (TMap a)
getTMap = asks cwmap

-- | Reader action that projects the shared priority queue ('cwpsq') out of the queue token.
getTPsq :: Reader (Cmq a) TPSQ
getTPsq = asks cwpsq

-- | Reader action that projects the receive channel ('cwchan') out of the queue token.
getTChan :: Reader (Cmq a) (TChan a)
getTChan = asks cwchan

-- | Builds and returns a new instance of Cmq.
--
-- Besides allocating the shared state, two background threads are forked:
-- one that flushes queues once they are older than the configured age, and
-- one that receives datagrams from the socket and feeds the channel read by
-- 'cwPop'.
--
-- @
--  (token) <- newRq soc 512 200
-- @

newRq :: Serialize a => Socket -- ^ Socket does not need to be in connected state. 
         -> Int         -- ^ Maximum Queue length in bytes.
         -> Rational    -- ^ Maximum Queue age in ms.
         -> IO (Cmq a)  -- ^ Token returned to identify the Queue.
newRq s maxBytes maxAgeMs = do
      dueQueue <- newTVarIO PSQ.empty
      pending  <- newTVarIO Map.empty
      inbox    <- newTChanIO
      let cmq = Cmq maxBytes maxAgeMs dueQueue pending inbox
      -- Timer thread: transmits queues that exceeded their maximum age.
      _ <- forkIO (loopMyQ s cmq dueQueue pending)
      -- Receiver thread: moves incoming messages into the channel.
      _ <- forkIO (loadTChan s inbox)
      return cmq

-- | Receive loop: waits for the next decoded batch of messages on the socket
-- and forks a thread that writes the batch to the channel. Never returns.
--
-- NOTE(review): every batch is written from its own thread, so the relative
-- order of messages from different datagrams is presumably not guaranteed.
loadTChan :: Serialize a => Socket -> TChan a -> IO ()
loadTChan s t = forever $
      receiveMessage s >>= \(batch, _) -> forkIO (write2TChan batch t)
 
-- | Try to append a message to the pending messages of a destination.
--
-- The encoded size of the pending list plus the encoded size of the new
-- message is compared with the queue threshold ('getQthresh'). The message
-- is appended only when that total stays below the threshold.
--
-- Returns the threshold minus the total size. A result @<= 0@ means the
-- message was NOT appended and the caller has to deal with it ('cwPush'
-- then calls 'transMit').
appendMsg :: Serialize a => a -> KEY -> Cmq a -> TMap a -> IO Int
appendMsg newmsgs key cmq m =
      atomically $ do
         mT <- readTVar m
         let limit = runReader getQthresh cmq
         case Map.lookup key mT of
              -- The key was removed between the caller's lookup and this
              -- transaction (e.g. flushed by the timer thread). Nothing can
              -- be appended; report "not appended" instead of silently
              -- dropping the message. The caller's flush path ('transMit')
              -- starts a fresh queue when no messages are pending.
              Nothing -> return 0
              Just pending -> do
                 let used = B.length (S.encode pending) + B.length (S.encode newmsgs)
                 when (used < limit) $
                    writeTVar m (Map.insert key (pending ++ [newmsgs]) mT)
                 return (limit - used)

-- | Start a new queue for a destination: in one transaction, store the
-- current time as the key's priority and a singleton message list as its
-- pending messages. Existing entries for the key are overwritten.
insertSglton :: a -> KEY -> TPSQ -> TMap a -> IO ()
insertSglton newmsgs key q m = do
      time <- getPOSIXTime
      -- The transaction body is indented deeper than the enclosing block so
      -- that it parses without the NondecreasingIndentation extension.
      atomically $ do
         qT <- readTVar q
         mT <- readTVar m
         writeTVar q (PSQ.insert key time qT)
         writeTVar m (Map.insert key [newmsgs] mT)

-- | /O(log n)/. Push a message to the queue.
--
-- A destination without a pending queue gets a fresh one holding just this
-- message. Otherwise the message is appended; when it does not fit below the
-- size threshold, the pending messages are transmitted and a new queue is
-- started with this message.
--
-- @
--  cwPush soc (\"192.168.35.69\", 0) (\"ping\" :: String) token 
-- @

cwPush :: Serialize a => Socket -> KEY -> a -> Cmq a -> IO ()
cwPush s key newmsgs cmq = do
     now <- getPOSIXTime
     -- NOTE(review): this lookup and the action chosen below run in separate
     -- transactions, so the queue may change in between.
     queued <- atomically (readTVar dueQueue)
     case PSQ.lookup key queued of
          Just _  -> do
             room <- appendMsg newmsgs key cmq pending
             when (room <= 0) $ transMit s now key newmsgs dueQueue pending
          Nothing -> insertSglton newmsgs key dueQueue pending
  where
     pending  = runReader getTMap cmq
     dueQueue = runReader getTPsq cmq

-- | Send the whole byte string as UDP payload to the given host and port.
-- The host is a textual address that is converted with 'inet_addr'.
sendq :: Socket -> B.ByteString -> String -> PortNumber -> IO ()
sendq s datastring host port =
     inet_addr host >>= sendAllTo s datastring . SockAddrInet port

-- | Flush a destination and restart its queue with a new message.
--
-- In one transaction the key's priority is reset to the given time and its
-- pending list is replaced by the singleton new message. The messages that
-- were pending before (if any) are then encoded and sent to the key's
-- address on port 4711; the send happens after the transaction committed.
transMit :: Serialize a => Socket -> POSIXTime -> KEY -> a -> TPSQ -> TMap a -> IO ()
transMit s time key newmsgs q m = join . atomically $ do
     pendingMap <- readTVar m
     dueQueue   <- readTVar q
     writeTVar q (PSQ.insert key time (PSQ.delete key dueQueue))
     writeTVar m (Map.insert key [newmsgs] (Map.delete key pendingMap))
     -- Hand the I/O back to the caller as an action; it must not run in STM.
     return (maybe (return ()) flush (Map.lookup key pendingMap))
  where
     flush batch = sendq s (S.encode batch) (show (fst key)) 4711

-- | Flush a destination and drop its queue.
--
-- In one transaction the key is removed from both the priority queue and the
-- message map. The messages that were pending for the key are then encoded
-- and sent to the key's address on port 4711. When nothing was pending for
-- the key, nothing is sent (previously this case hit a partial pattern match
-- and raised an exception in the calling thread).
transMit2 :: Serialize a => Socket -> KEY -> TPSQ -> TMap a -> IO ()
transMit2 s key q m = do
     loopAction2 <- atomically $ do
                       mT <- readTVar m
                       qT <- readTVar q
                       writeTVar q (PSQ.delete key qT)
                       writeTVar m (Map.delete key mT)
                       -- Return the send as an action so that the network
                       -- I/O runs only after the transaction committed.
                       return $ case Map.lookup key mT of
                                     Nothing -> return ()
                                     Just messages -> sendq s (S.encode messages) (show (fst key)) 4711
     loopAction2

-- | Timer loop that flushes queues by age. Never returns.
--
-- Each round blocks (via STM 'retry') until the priority queue is non-empty,
-- takes the binding with the smallest time, and transmits that key's
-- messages when the current time is past that time plus the configured
-- delay (milliseconds, converted to seconds). It then pauses briefly before
-- the next round.
loopMyQ :: Serialize a => Socket -> Cmq a -> TPSQ -> TMap a -> IO ()
loopMyQ s cmq q m = forever $ do

      oldest <- atomically $ do
                   queued <- readTVar q
                   maybe retry return (PSQ.findMin queued)

      let maxAge  = fromRational (runReader getDelay cmq / 1000)
          duetime = PSQ.prio oldest + maxAge
      now <- getPOSIXTime
      when (now > duetime) (transMit2 s (PSQ.key oldest) q m)
      threadDelay 20 --this may need to be adjusted manually

-- | Write every message of the list to the channel, in list order, using one
-- transaction per message.
write2TChan :: [a] -> TChan a -> IO ()
write2TChan msg mtch = mapM_ (atomically . writeTChan mtch) msg

-- | /O(1)/. A message is popped off CMQ. The next value is read from the queue.
-- Returns 'Nothing' when no message has arrived yet; the call never blocks.
-- Use for example
--
-- @
--   msg <- cwPop token :: IO (Maybe String)
-- @
--
--  or with ScopedTypeVariables
--
-- @
--   (msg :: Maybe String) <- cwPop token
-- @

cwPop :: Cmq a -> IO (Maybe a)
cwPop cmq = atomically $ do
        let mtch = runReader getTChan cmq
        -- The emptiness check and the read share one transaction, so another
        -- consumer cannot take the message between the two steps and leave
        -- this call blocked in readTChan.
        noMessages <- isEmptyTChan mtch
        if noMessages
           then return Nothing
           else fmap Just (readTChan mtch)

-- | Block until a datagram that decodes successfully arrives on the socket
-- and return the decoded value together with the sender's address.
--
-- Datagrams that fail to decode are skipped and the next one is awaited;
-- previously such a datagram caused a pattern-match failure in the calling
-- thread.
--
-- NOTE(review): at most 512 bytes are read per datagram, so senders that use
-- a larger size threshold presumably produce payloads that cannot be decoded
-- here - confirm against the configured threshold.
receiveMessage :: (Serialize a) => Socket -> IO (a, SockAddr)
receiveMessage s  = do
        (msg, remoteSockAddr) <- recvFrom s 512 --influence on performance and best value still under investigation
        case S.decode msg of
             Left _ -> receiveMessage s
             Right res -> return (res, remoteSockAddr)