module Network.IMAP.RequestWatcher (requestWatcher) where
import Network.IMAP.Types
import Network.IMAP.Parsers
import Data.Maybe (isJust, fromJust)
import Network.Connection
import Data.Attoparsec.ByteString
import qualified Data.Attoparsec.ByteString as AP
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BSC
import qualified Data.List as L
import qualified Data.Text as T
import qualified Data.STM.RollingQueue as RQ
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Concurrent (killThread)
import Control.Monad.STM
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Exception (SomeException)
import qualified Control.Monad.Catch as C
import Control.Monad.Catch (MonadCatch)
import Control.Monad (when)
import Data.Foldable (forM_)
import System.Log.Logger (errorM)
requestWatcher :: (MonadIO m, Universe m, MonadCatch m) => IMAPConnection -> m ()
requestWatcher conn = flip C.catch (handleExceptions conn) $ do
parseResult <- getParsedChunk (rawConnection . imapState $ conn) (AP.parse parseReply)
reactToReply conn parseResult
requestWatcher conn
reactToReply :: (MonadIO m) =>
IMAPConnection ->
ParseResult ->
m ()
reactToReply conn parsedReply = do
let state = imapState conn
requests <- liftIO . atomically $ do
newReqs <- getOutstandingReqs $ responseRequests state
knownReqs <- readTVar $ outstandingReqs state
return $ knownReqs ++ newReqs
pendingReqs <- case parsedReply of
Left err -> dispatchError requests err
Right reply -> do
updateConnState conn reply
case reply of
Tagged t -> dispatchTagged requests t
Untagged u -> dispatchUntagged conn requests u
liftIO . atomically $ writeTVar (outstandingReqs state) pendingReqs
shouldIDie conn
updateConnState :: (MonadIO m) => IMAPConnection -> CommandResult -> m ()
updateConnState conn command = do
let connState = connectionState conn
case command of
Untagged u -> case u of
OKResult _ -> liftIO . atomically $ writeTVar connState Connected
Bye -> liftIO . atomically $ writeTVar connState Disconnected
_ -> return ()
_ -> return ()
shouldIDie :: (MonadIO m) => IMAPConnection -> m ()
shouldIDie conn = liftIO $ do
threadId <- atomically . readTVar . serverWatcherThread . imapState $ conn
connState <- atomically . readTVar $ connectionState conn
when (isDisconnected connState && isJust threadId) $
killThread $ fromJust threadId
dispatchError :: (MonadIO m) => [ResponseRequest] ->
ErrorMessage -> m [ResponseRequest]
dispatchError requests errorMessage = do
case requests of
req:reqs -> do
let errorResponse = TaggedResult {
commandId="noid"
, resultState=BAD
, resultRest=errorMessage
}
liftIO . atomically $ writeTQueue (responseQueue req) $ Tagged errorResponse
return reqs
_ -> do
liftIO $ errorM "dispatchError" "Cannot dispatch a parse error \
\without an outstanding request"
return []
dispatchTagged :: (MonadIO m) => [ResponseRequest] ->
TaggedResult -> m [ResponseRequest]
dispatchTagged requests response = do
let reqId = commandId response
let pendingRequest = L.find (\r -> respRequestId r == reqId) requests
liftIO $ case pendingRequest of
Just req -> atomically $ writeTQueue (responseQueue req) $ Tagged response
Nothing -> return ()
return $ if isJust pendingRequest
then filter (/= fromJust pendingRequest) requests
else requests
dispatchUntagged :: (MonadIO m) => IMAPConnection ->
[ResponseRequest] ->
UntaggedResult ->
m [ResponseRequest]
dispatchUntagged conn requests response = do
if null requests
then liftIO . atomically $ RQ.write (untaggedQueue conn) response
else liftIO . atomically $ do
let responseQ = responseQueue . head $ requests
writeTQueue responseQ $ Untagged response
return requests
getOutstandingReqs :: TQueue ResponseRequest ->
STM [ResponseRequest]
getOutstandingReqs reqsQueue = do
isEmpty <- isEmptyTQueue reqsQueue
if isEmpty
then return []
else do
req <- readTQueue reqsQueue
next <- getOutstandingReqs reqsQueue
return (req:next)
omitOneLine :: BSC.ByteString -> BSC.ByteString
omitOneLine bytes = if BSC.length withLF > 0 then BSC.tail withLF else withLF
where withLF = BSC.dropWhile (/= '\n') bytes
parseChunk :: (BSC.ByteString -> Result ParseResult) ->
BSC.ByteString ->
((Maybe ParseResult, Maybe (BSC.ByteString -> Result ParseResult)), BSC.ByteString)
parseChunk parser chunk =
case parser chunk of
Fail left _ msg -> ((Just $ assembleParseError msg chunk, Nothing), omitOneLine left)
Partial continuation -> ((Nothing, Just continuation), BS.empty)
Done left result -> ((Just result, Nothing), left)
assembleParseError :: String -> BSC.ByteString -> ParseResult
assembleParseError parserError chunk = Left $ T.concat [
"Parse failed with error: '", packedError, "' while reading an input chunk: '",
decodedChunk, "'.\n\nThis should never happen and is a library error. ",
"To open an issue please go to https://github.com/mkawalec/imap/issues"]
where packedError = T.pack parserError
decodedChunk = T.pack $ show chunk
getParsedChunk :: (MonadIO m, Universe m) => Connection ->
(BSC.ByteString -> Result ParseResult) ->
m ParseResult
getParsedChunk conn parser = do
(parsed, cont) <- connectionGetChunk'' conn $ parseChunk parser
case cont of
Just continuation -> getParsedChunk conn continuation
Nothing -> return . fromJust $ parsed
handleExceptions :: (MonadIO m) => IMAPConnection ->
SomeException ->
m ()
handleExceptions conn e = do
let state = imapState conn
threadId <- liftIO . atomically $ do
writeTVar (connectionState conn) Disconnected
let actualThreadId = readTVar $ serverWatcherThread state
writeTVar (serverWatcherThread state) Nothing
actualThreadId
requests <- liftIO . atomically $ do
newReqs <- getOutstandingReqs $ responseRequests state
knownReqs <- readTVar $ outstandingReqs state
return $ knownReqs ++ newReqs
let reply = TaggedResult {
commandId = "noid",
resultState = BAD,
resultRest = T.append "Exception caught " (T.pack . show $ e)
}
liftIO . atomically $ mapM_ (sendResponse reply) requests
forM_ threadId $ liftIO . killThread
sendResponse :: TaggedResult -> ResponseRequest -> STM ()
sendResponse response request = writeTQueue (responseQueue request) $ Tagged response