{-# Language CPP #-}
module Network.Mom.Stompl.Client.Queue (
withConnection,
Fac.Con,
F.Heart,
Copt(..),
EHandler,
Reader, Writer,
newReader, destroyReader, newWriter, destroyWriter,
withReader, withWriter, withPair, ReaderDesc, WriterDesc,
Qopt(..), F.AckMode(..),
InBound, OutBound,
readQ,
writeQ, writeQWith,
writeAdHoc, writeAdHocWith,
Message,
msgContent, msgRaw,
msgType, msgLen, msgHdrs,
Fac.Rec(..), Receipt,
waitReceipt,
Tx,
withTransaction,
Topt(..), abort,
ack, ackWith, nack, nackWith,
#ifdef TEST
frmToMsg, msgAck,
#endif
module Network.Mom.Stompl.Client.Exception
)
where
import Network.Mom.Stompl.Client.Stream
import Network.Mom.Stompl.Client.Factory as Fac
import Network.Mom.Stompl.Client.State
import qualified Network.Mom.Stompl.Frame as F
import Network.Mom.Stompl.Client.Exception
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.UTF8 as U
import Data.List (find)
import Data.Time.Clock
import Data.Maybe (isNothing)
import Data.Conduit.Network (AppData)
import Data.Conduit.Network.TLS
import Control.Concurrent
import Control.Applicative ((<$>))
import Control.Monad (when, unless, forever)
import Control.Exception (bracket, finally, catches, onException,
AsyncException(..), Handler(..),
throwIO, SomeException)
import Codec.MIME.Type as Mime (Type)
import System.Timeout (timeout)
vers :: [F.Version]
vers :: [Version]
vers = [(Int
1,Int
0), (Int
1,Int
1), (Int
1,Int
2)]
withConnection :: String -> Int -> [Copt] -> [F.Header] ->
(Con -> IO a) -> IO a
withConnection :: String -> Int -> [Copt] -> [Header] -> (Con -> IO a) -> IO a
withConnection String
host Int
port [Copt]
os [Header]
hs Con -> IO a
act = do
let beat :: Version
beat = [Copt] -> Version
oHeartBeat [Copt]
os
let mx :: Int
mx = [Copt] -> Int
oMaxRecv [Copt]
os
let (String
u,String
p) = [Copt] -> Header
oAuth [Copt]
os
let (String
ci) = [Copt] -> String
oCliId [Copt]
os
let tm :: Int
tm = [Copt] -> Int
oTmo [Copt]
os
let cfg :: TLSClientConfig
cfg = String -> Int -> [Copt] -> TLSClientConfig
oTLS String
host Int
port [Copt]
os
let t :: FrameType
t | [Copt] -> Bool
oStomp [Copt]
os = FrameType
F.Stomp
| Bool
otherwise = FrameType
F.Connect
TLSClientConfig -> (AppData -> IO a) -> IO a
forall (m :: * -> *) a.
MonadUnliftIO m =>
TLSClientConfig -> (AppData -> m a) -> m a
runTLSClient TLSClientConfig
cfg ((AppData -> IO a) -> IO a) -> (AppData -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \AppData
ad -> do
Con
cid <- IO Con
mkUniqueConId
ThreadId
me <- IO ThreadId
myThreadId
UTCTime
now <- IO UTCTime
getCurrentTime
Chan Frame
ch <- IO (Chan Frame)
forall a. IO (Chan a)
newChan
IO Connection
-> (Connection -> IO ()) -> (Connection -> IO a) -> IO a
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (do Connection -> IO ()
addCon (Connection -> IO ()) -> Connection -> IO ()
forall a b. (a -> b) -> a -> b
$ (Con
-> String
-> Int
-> Int
-> String
-> String
-> String
-> [Version]
-> Version
-> Chan Frame
-> ThreadId
-> UTCTime
-> [Copt]
-> Connection
mkConnection Con
cid String
host Int
port
Int
mx String
u String
p String
ci
[Version]
vers Version
beat Chan Frame
ch
ThreadId
me UTCTime
now [Copt]
os)
Con -> IO Connection
getCon Con
cid)
(\Connection
_ -> Con -> IO ()
rmCon Con
cid) ((Connection -> IO a) -> IO a) -> (Connection -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \Connection
c ->
Connection -> AppData -> Chan Frame -> IO a -> IO a
forall a. Connection -> AppData -> Chan Frame -> IO a -> IO a
withSender Connection
c AppData
ad Chan Frame
ch (IO a -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ do
MVar ()
w <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
Connection -> AppData -> Con -> MVar () -> IO a -> IO a
forall a. Connection -> AppData -> Con -> MVar () -> IO a -> IO a
withListener Connection
c AppData
ad Con
cid MVar ()
w (IO a -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ do
FrameType
-> [Version] -> Version -> [Header] -> Connection -> IO ()
connectBroker FrameType
t [Version]
vers Version
beat [Header]
hs Connection
c
Con -> Int -> Version -> MVar () -> (Con -> IO a) -> IO a
forall a. Con -> Int -> Version -> MVar () -> (Con -> IO a) -> IO a
whenConnected Con
cid Int
tm Version
beat MVar ()
w Con -> IO a
act
withSender :: Connection -> AppData -> Chan F.Frame -> IO a -> IO a
withSender :: Connection -> AppData -> Chan Frame -> IO a -> IO a
withSender Connection
c AppData
ad Chan Frame
ch = Connection -> String -> IO () -> IO a -> IO a
forall r. Connection -> String -> IO () -> IO r -> IO r
withThread Connection
c String
"sender" (AppData -> Chan Frame -> IO ()
sender AppData
ad Chan Frame
ch)
withListener :: Connection -> AppData -> Con -> MVar () -> IO a -> IO a
withListener :: Connection -> AppData -> Con -> MVar () -> IO a -> IO a
withListener Connection
c AppData
ad Con
cid MVar ()
m = Connection -> String -> IO () -> IO a -> IO a
forall r. Connection -> String -> IO () -> IO r -> IO r
withThread Connection
c String
"listener" (AppData -> Con -> MVar () -> IO ()
listen AppData
ad Con
cid MVar ()
m)
listen :: AppData -> Con -> MVar () -> IO ()
listen :: AppData -> Con -> MVar () -> IO ()
listen AppData
ad Con
cid MVar ()
w = do
Connection
c <- Con -> IO Connection
getCon Con
cid
Chan Frame
ch <- IO (Chan Frame)
forall a. IO (Chan a)
newChan
Connection -> String -> IO () -> IO () -> IO ()
forall r. Connection -> String -> IO () -> IO r -> IO r
withThread Connection
c String
"receiver" (AppData -> Chan Frame -> EH -> IO ()
receiver AppData
ad Chan Frame
ch (Connection -> EH
throwToOwner Connection
c)) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Frame
f <- Chan Frame -> IO Frame
forall a. Chan a -> IO a
readChan Chan Frame
ch
Con -> IO ()
logReceive Con
cid
case Frame -> FrameType
F.typeOf Frame
f of
FrameType
F.Connected -> Con -> Frame -> MVar () -> IO ()
handleConnected Con
cid Frame
f MVar ()
w
FrameType
F.Message -> Con -> Frame -> IO ()
handleMessage Con
cid Frame
f
FrameType
F.Error -> Con -> Frame -> IO ()
handleError Con
cid Frame
f
FrameType
F.Receipt -> Con -> Frame -> IO ()
handleReceipt Con
cid Frame
f
FrameType
F.HeartBeat -> Con -> Frame -> IO ()
handleBeat Con
cid Frame
f
FrameType
_ -> Connection -> EH
throwToOwner Connection
c EH -> EH
forall a b. (a -> b) -> a -> b
$
String -> StomplException
ProtocolException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$ String
"Unexpected Frame: " String -> String -> String
forall a. [a] -> [a] -> [a]
++
FrameType -> String
forall a. Show a => a -> String
show (Frame -> FrameType
F.typeOf Frame
f)
connectBroker :: F.FrameType ->
[F.Version] -> F.Heart -> [F.Header] ->
Connection -> IO ()
connectBroker :: FrameType
-> [Version] -> Version -> [Header] -> Connection -> IO ()
connectBroker FrameType
t [Version]
vs Version
beat [Header]
hs Connection
c =
let mk :: [Header] -> Either String Frame
mk = case FrameType
t of
FrameType
F.Connect -> [Header] -> Either String Frame
F.mkConFrame
FrameType
F.Stomp -> [Header] -> Either String Frame
F.mkStmpFrame
FrameType
_ -> String -> [Header] -> Either String Frame
forall a. HasCallStack => String -> a
error String
"Ouch: Unknown Connect-type"
in case ([Header] -> Either String Frame)
-> String
-> String
-> String
-> String
-> [Version]
-> Version
-> [Header]
-> Either String Frame
mkConF [Header] -> Either String Frame
mk
(Connection -> String
conAddr Connection
c)
(Connection -> String
conUsr Connection
c) (Connection -> String
conPwd Connection
c)
(Connection -> String
conCli Connection
c) [Version]
vs Version
beat [Header]
hs of
Left String
e -> EH
forall e a. Exception e => e -> IO a
throwIO (String -> StomplException
ConnectException String
e)
Right Frame
f -> Chan Frame -> Frame -> IO ()
forall a. Chan a -> a -> IO ()
writeChan (Connection -> Chan Frame
conChn Connection
c) Frame
f
whenConnected :: Con -> Int -> F.Heart -> MVar () -> (Con -> IO a) -> IO a
whenConnected :: Con -> Int -> Version -> MVar () -> (Con -> IO a) -> IO a
whenConnected Con
cid Int
tm Version
bt MVar ()
m Con -> IO a
act = do
Maybe ()
mbT <- if Int
tm Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 then () -> Maybe ()
forall a. a -> Maybe a
Just (() -> Maybe ()) -> IO () -> IO (Maybe ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
m
else Int -> IO () -> IO (Maybe ())
forall a. Int -> IO a -> IO (Maybe a)
timeout (Int -> Int
ms Int
tm) (MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
m)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe () -> Bool
forall a. Maybe a -> Bool
isNothing Maybe ()
mbT) (EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ConnectException String
"Timeout expired!")
Connection
c <- Con -> IO Connection
getCon Con
cid
if Connection -> Int
period Connection
c Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
then if Connection -> Int
period Connection
c Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Version -> Int
forall a b. (a, b) -> a
fst Version
bt
then StomplException -> IO a
forall e a. Exception e => e -> IO a
throwIO (String -> StomplException
ConnectException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$ String
"Beat frequency too high: " String -> String -> String
forall a. [a] -> [a] -> [a]
++
Int -> String
forall a. Show a => a -> String
show (Connection -> Int
period Connection
c))
else Connection -> String -> IO () -> IO a -> IO a
forall r. Connection -> String -> IO () -> IO r -> IO r
withThread Connection
c String
"heartbeat" (Con -> Int -> IO ()
heartBeat Con
cid (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection -> Int
period Connection
c) (IO a -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ Connection -> IO a
go Connection
c
else Connection -> IO a
go Connection
c
where go :: Connection -> IO a
go Connection
c = do
MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
m ()
Con -> Connection -> IO ()
updCon Con
cid Connection
c {conBrk :: Bool
conBrk = Bool
True}
IO a -> IO () -> IO a
forall a b. IO a -> IO b -> IO a
finally (Con -> IO a
act Con
cid) (do
Connection
c' <- Con -> IO Connection
getCon Con
cid
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Connection -> Int
conWait Connection
c' Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Rec
rc <- IO Rec
mkUniqueRecc
Connection -> String -> IO ()
disconnect Connection
c' (Rec -> String
forall a. Show a => a -> String
show Rec
rc)
Con -> Rec -> IO ()
addRec Con
cid Rec
rc
Con -> Rec -> Int -> IO ()
waitCon Con
cid Rec
rc (Connection -> Int
conWait Connection
c') IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`onException` Con -> Rec -> IO ()
rmRec Con
cid Rec
rc
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Connection -> Int
conWait Connection
c' Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 Bool -> Bool -> Bool
&& Connection -> Int
conWaitE Connection
c' Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Int
ms (Connection -> Int
conWaitE Connection
c'))
period :: Connection -> Int
period = Version -> Int
forall a b. (a, b) -> b
snd (Version -> Int) -> (Connection -> Version) -> Connection -> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> Version
conBeat
disconnect :: Connection -> String -> IO ()
disconnect :: Connection -> String -> IO ()
disconnect Connection
c String
r
| Connection -> Bool
conBrk Connection
c = case String -> Either String Frame
mkDiscF String
r of
Left String
e -> EH
forall e a. Exception e => e -> IO a
throwIO (String -> StomplException
ConnectException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
String
"Cannot create Disconnect Frame: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
e)
Right Frame
f -> Chan Frame -> Frame -> IO ()
forall a. Chan a -> a -> IO ()
writeChan (Connection -> Chan Frame
conChn Connection
c) Frame
f
| Bool
otherwise = EH
forall e a. Exception e => e -> IO a
throwIO (String -> StomplException
ConnectException String
"Not connected")
waitCon :: Con -> Receipt -> Int -> IO ()
waitCon :: Con -> Rec -> Int -> IO ()
waitCon Con
cid Rec
rc Int
delay = do
Connection
c <- Con -> IO Connection
getCon Con
cid
case (Rec -> Bool) -> [Rec] -> Maybe Rec
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (Rec -> Rec -> Bool
forall a. Eq a => a -> a -> Bool
==Rec
rc) ([Rec] -> Maybe Rec) -> [Rec] -> Maybe Rec
forall a b. (a -> b) -> a -> b
$ Connection -> [Rec]
conRecs Connection
c of
Maybe Rec
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just Rec
_ ->
if Int
delay Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0
then EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ConnectException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
String
"No receipt on disconnect (" String -> String -> String
forall a. [a] -> [a] -> [a]
++
Con -> String
forall a. Show a => a -> String
show Con
cid String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
")."
else do Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Int
ms Int
1
Con -> Rec -> Int -> IO ()
waitCon Con
cid Rec
rc (Int
delay Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)
withThread :: Connection -> String -> IO () -> IO r -> IO r
withThread :: Connection -> String -> IO () -> IO r -> IO r
withThread Connection
c String
nm IO ()
th IO r
act = do
MVar ()
m <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
MVar Bool
x <- Bool -> IO (MVar Bool)
forall a. a -> IO (MVar a)
newMVar Bool
False
ThreadId
tid <- IO () -> IO ThreadId
forkIO (MVar Bool -> IO ()
theThread MVar Bool
x IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`finally` MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
m ())
(IO r
act IO r -> (r -> IO r) -> IO r
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= MVar Bool -> r -> IO r
forall b. MVar Bool -> b -> IO b
signalOk MVar Bool
x) IO r -> IO () -> IO r
forall a b. IO a -> IO b -> IO a
`finally` (ThreadId -> IO ()
killThread ThreadId
tid IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
m)
where theThread :: MVar Bool -> IO ()
theThread MVar Bool
x = (IO ()
th IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MVar Bool -> IO ()
signalEnd MVar Bool
x) IO () -> [Handler ()] -> IO ()
forall a. IO a -> [Handler a] -> IO a
`catches` [Handler ()]
hndls
hndls :: [Handler ()]
hndls = [(AsyncException -> IO ()) -> Handler ()
forall a e. Exception e => (e -> IO a) -> Handler a
Handler (\AsyncException
e -> case AsyncException
e of
AsyncException
ThreadKilled -> AsyncException -> IO ()
forall e a. Exception e => e -> IO a
throwIO AsyncException
e
AsyncException
_ -> AsyncException -> IO ()
forall e a. Exception e => e -> IO a
throwIO AsyncException
e),
(SomeException -> IO ()) -> Handler ()
forall a e. Exception e => (e -> IO a) -> Handler a
Handler (\SomeException
e -> Connection -> EH
throwToOwner Connection
c EH -> EH
forall a b. (a -> b) -> a -> b
$
String -> StomplException
WorkerException (
String
nm String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" terminated: " String -> String -> String
forall a. [a] -> [a] -> [a]
++
SomeException -> String
forall a. Show a => a -> String
show (SomeException
e::SomeException)))
]
signalEnd :: MVar Bool -> IO ()
signalEnd MVar Bool
x = do Bool
t <- MVar Bool -> IO Bool
forall a. MVar a -> IO a
readMVar MVar Bool
x
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
t (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection -> EH
throwToOwner Connection
c EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
WorkerException (
String
nm String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" terminated")
signalOk :: MVar Bool -> b -> IO b
signalOk MVar Bool
x b
r = MVar Bool -> (Bool -> IO (Bool, b)) -> IO b
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar Bool
x ((Bool -> IO (Bool, b)) -> IO b) -> (Bool -> IO (Bool, b)) -> IO b
forall a b. (a -> b) -> a -> b
$ \Bool
_ -> (Bool, b) -> IO (Bool, b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
True,b
r)
throwToOwner :: Connection -> StomplException -> IO ()
throwToOwner :: Connection -> EH
throwToOwner Connection
c = ThreadId -> EH
forall e. Exception e => ThreadId -> e -> IO ()
throwTo (Connection -> ThreadId
conOwner Connection
c)
data Writer a = SendQ {
Writer a -> Con
wCon :: Con,
Writer a -> String
wDest :: String,
Writer a -> String
wName :: String,
Writer a -> Bool
wRec :: Bool,
Writer a -> Bool
wWait :: Bool,
Writer a -> Bool
wCntl :: Bool,
Writer a -> Bool
wTx :: Bool,
Writer a -> OutBound a
wTo :: OutBound a}
data Reader a = RecvQ {
Reader a -> Con
rCon :: Con,
Reader a -> Sub
rSub :: Sub,
Reader a -> String
rDest :: String,
Reader a -> String
rName :: String,
Reader a -> AckMode
rMode :: F.AckMode,
Reader a -> Bool
rAuto :: Bool,
Reader a -> Bool
rRec :: Bool,
Reader a -> InBound a
rFrom :: InBound a}
instance Eq (Reader a) where
Reader a
q1 == :: Reader a -> Reader a -> Bool
== Reader a
q2 = Reader a -> String
forall a. Reader a -> String
rName Reader a
q1 String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== Reader a -> String
forall a. Reader a -> String
rName Reader a
q2
instance Eq (Writer a) where
Writer a
q1 == :: Writer a -> Writer a -> Bool
== Writer a
q2 = Writer a -> String
forall a. Writer a -> String
wName Writer a
q1 String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== Writer a -> String
forall a. Writer a -> String
wName Writer a
q2
data Qopt =
OWithReceipt
| OWaitReceipt
| OMode F.AckMode
| OAck
| OForceTx
| ONoContentLen
deriving (Int -> Qopt -> String -> String
[Qopt] -> String -> String
Qopt -> String
(Int -> Qopt -> String -> String)
-> (Qopt -> String) -> ([Qopt] -> String -> String) -> Show Qopt
forall a.
(Int -> a -> String -> String)
-> (a -> String) -> ([a] -> String -> String) -> Show a
showList :: [Qopt] -> String -> String
$cshowList :: [Qopt] -> String -> String
show :: Qopt -> String
$cshow :: Qopt -> String
showsPrec :: Int -> Qopt -> String -> String
$cshowsPrec :: Int -> Qopt -> String -> String
Show, ReadPrec [Qopt]
ReadPrec Qopt
Int -> ReadS Qopt
ReadS [Qopt]
(Int -> ReadS Qopt)
-> ReadS [Qopt] -> ReadPrec Qopt -> ReadPrec [Qopt] -> Read Qopt
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
readListPrec :: ReadPrec [Qopt]
$creadListPrec :: ReadPrec [Qopt]
readPrec :: ReadPrec Qopt
$creadPrec :: ReadPrec Qopt
readList :: ReadS [Qopt]
$creadList :: ReadS [Qopt]
readsPrec :: Int -> ReadS Qopt
$creadsPrec :: Int -> ReadS Qopt
Read, Qopt -> Qopt -> Bool
(Qopt -> Qopt -> Bool) -> (Qopt -> Qopt -> Bool) -> Eq Qopt
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Qopt -> Qopt -> Bool
$c/= :: Qopt -> Qopt -> Bool
== :: Qopt -> Qopt -> Bool
$c== :: Qopt -> Qopt -> Bool
Eq)
hasQopt :: Qopt -> [Qopt] -> Bool
hasQopt :: Qopt -> [Qopt] -> Bool
hasQopt Qopt
o [Qopt]
os = Qopt
o Qopt -> [Qopt] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [Qopt]
os
ackMode :: [Qopt] -> F.AckMode
ackMode :: [Qopt] -> AckMode
ackMode [Qopt]
os = case (Qopt -> Bool) -> [Qopt] -> Maybe Qopt
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find Qopt -> Bool
isMode [Qopt]
os of
Just (OMode AckMode
x) -> AckMode
x
Maybe Qopt
_ -> AckMode
F.Auto
where isMode :: Qopt -> Bool
isMode Qopt
x = case Qopt
x of
OMode AckMode
_ -> Bool
True
Qopt
_ -> Bool
False
type InBound a = Mime.Type -> Int -> [F.Header] -> B.ByteString -> IO a
type OutBound a = a -> IO B.ByteString
newReader :: Con -> String -> String -> [Qopt] -> [F.Header] ->
InBound a -> IO (Reader a)
newReader :: Con
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound a
-> IO (Reader a)
newReader Con
cid String
qn String
dst [Qopt]
os [Header]
hs InBound a
conv = do
Connection
c <- Con -> IO Connection
getCon Con
cid
if Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ Connection -> Bool
connected Connection
c
then StomplException -> IO (Reader a)
forall e a. Exception e => e -> IO a
throwIO (StomplException -> IO (Reader a))
-> StomplException -> IO (Reader a)
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ConnectException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
String
"Not connected (" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Con -> String
forall a. Show a => a -> String
show Con
cid String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
")"
else Con
-> Connection
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound a
-> IO (Reader a)
forall a.
Con
-> Connection
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound a
-> IO (Reader a)
newRecvQ Con
cid Connection
c String
qn String
dst [Qopt]
os [Header]
hs InBound a
conv
destroyReader :: Reader a -> IO ()
destroyReader :: Reader a -> IO ()
destroyReader = Reader a -> IO ()
forall a. Reader a -> IO ()
unsub
newWriter :: Con -> String -> String -> [Qopt] -> [F.Header] ->
OutBound a -> IO (Writer a)
newWriter :: Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound a
-> IO (Writer a)
newWriter Con
cid String
qn String
dst [Qopt]
os [Header]
_ OutBound a
conv = do
Connection
c <- Con -> IO Connection
getCon Con
cid
if Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ Connection -> Bool
connected Connection
c
then StomplException -> IO (Writer a)
forall e a. Exception e => e -> IO a
throwIO (StomplException -> IO (Writer a))
-> StomplException -> IO (Writer a)
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ConnectException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
String
"Not connected (" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Con -> String
forall a. Show a => a -> String
show Con
cid String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
")"
else Con -> String -> String -> [Qopt] -> OutBound a -> IO (Writer a)
forall a.
Con -> String -> String -> [Qopt] -> OutBound a -> IO (Writer a)
newSendQ Con
cid String
qn String
dst [Qopt]
os OutBound a
conv
destroyWriter :: Writer a -> IO ()
destroyWriter :: Writer a -> IO ()
destroyWriter Writer a
_ = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
withReader :: Con -> String ->
String -> [Qopt] -> [F.Header] ->
InBound i -> (Reader i -> IO r) -> IO r
withReader :: Con
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound i
-> (Reader i -> IO r)
-> IO r
withReader Con
cid String
qn String
dst [Qopt]
os [Header]
hs InBound i
conv =
IO (Reader i) -> (Reader i -> IO ()) -> (Reader i -> IO r) -> IO r
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (Con
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound i
-> IO (Reader i)
forall a.
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound a
-> IO (Reader a)
newReader Con
cid String
qn String
dst [Qopt]
os [Header]
hs InBound i
conv) Reader i -> IO ()
forall a. Reader a -> IO ()
unsub
withWriter :: Con -> String ->
String -> [Qopt] -> [F.Header] ->
OutBound o -> (Writer o -> IO r) -> IO r
withWriter :: Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound o
-> (Writer o -> IO r)
-> IO r
withWriter Con
cid String
qn String
dst [Qopt]
os [Header]
hs OutBound o
conv Writer o -> IO r
act =
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound o
-> IO (Writer o)
forall a.
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound a
-> IO (Writer a)
newWriter Con
cid String
qn String
dst [Qopt]
os [Header]
hs OutBound o
conv IO (Writer o) -> (Writer o -> IO r) -> IO r
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Writer o -> IO r
act
withPair :: Con -> String -> ReaderDesc i ->
WriterDesc o ->
((Reader i, Writer o) -> IO r) -> IO r
withPair :: Con
-> String
-> ReaderDesc i
-> WriterDesc o
-> ((Reader i, Writer o) -> IO r)
-> IO r
withPair Con
cid String
n (String
rq,[Qopt]
ro,[Header]
rh,InBound i
iconv)
(String
wq,[Qopt]
wo,[Header]
wh,OutBound o
oconv) (Reader i, Writer o) -> IO r
act =
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound i
-> (Reader i -> IO r)
-> IO r
forall i r.
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound i
-> (Reader i -> IO r)
-> IO r
withReader Con
cid (String
n String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"_r") String
rq [Qopt]
ro [Header]
rh InBound i
iconv ((Reader i -> IO r) -> IO r) -> (Reader i -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Reader i
r ->
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound o
-> (Writer o -> IO r)
-> IO r
forall o r.
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound o
-> (Writer o -> IO r)
-> IO r
withWriter Con
cid (String
n String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"_w") String
wq [Qopt]
wo [Header]
wh OutBound o
oconv ((Writer o -> IO r) -> IO r) -> (Writer o -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Writer o
w -> (Reader i, Writer o) -> IO r
act (Reader i
r,Writer o
w)
type ReaderDesc i = (String, [Qopt], [F.Header], InBound i)
type WriterDesc o = (String, [Qopt], [F.Header], OutBound o)
newSendQ :: Con -> String -> String -> [Qopt] ->
OutBound a -> IO (Writer a)
newSendQ :: Con -> String -> String -> [Qopt] -> OutBound a -> IO (Writer a)
newSendQ Con
cid String
qn String
dst [Qopt]
os OutBound a
conv =
Writer a -> IO (Writer a)
forall (m :: * -> *) a. Monad m => a -> m a
return SendQ :: forall a.
Con
-> String
-> String
-> Bool
-> Bool
-> Bool
-> Bool
-> OutBound a
-> Writer a
SendQ {
wCon :: Con
wCon = Con
cid,
wDest :: String
wDest = String
dst,
wName :: String
wName = String
qn,
wRec :: Bool
wRec = Qopt -> [Qopt] -> Bool
hasQopt Qopt
OWithReceipt [Qopt]
os,
wWait :: Bool
wWait = Qopt -> [Qopt] -> Bool
hasQopt Qopt
OWaitReceipt [Qopt]
os,
wTx :: Bool
wTx = Qopt -> [Qopt] -> Bool
hasQopt Qopt
OForceTx [Qopt]
os,
wCntl :: Bool
wCntl = Qopt -> [Qopt] -> Bool
hasQopt Qopt
ONoContentLen [Qopt]
os,
wTo :: OutBound a
wTo = OutBound a
conv}
newRecvQ :: Con -> Connection -> String -> String ->
[Qopt] -> [F.Header] ->
InBound a -> IO (Reader a)
newRecvQ :: Con
-> Connection
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound a
-> IO (Reader a)
newRecvQ Con
cid Connection
c String
qn String
dst [Qopt]
os [Header]
hs InBound a
conv = do
let am :: AckMode
am = [Qopt] -> AckMode
ackMode [Qopt]
os
let au :: Bool
au = Qopt -> [Qopt] -> Bool
hasQopt Qopt
OAck [Qopt]
os
let with :: Bool
with = Qopt -> [Qopt] -> Bool
hasQopt Qopt
OWithReceipt [Qopt]
os Bool -> Bool -> Bool
|| Qopt -> [Qopt] -> Bool
hasQopt Qopt
OWaitReceipt [Qopt]
os
Sub
sid <- IO Sub
mkUniqueSubId
Rec
rc <- if Bool
with then IO Rec
mkUniqueRecc else Rec -> IO Rec
forall (m :: * -> *) a. Monad m => a -> m a
return Rec
NoRec
Con -> IO ()
logSend Con
cid
Connection -> Subscription -> String -> [Header] -> IO ()
fSubscribe Connection
c(Sub -> String -> AckMode -> Subscription
mkSub Sub
sid String
dst AckMode
am) (Rec -> String
forall a. Show a => a -> String
show Rec
rc) [Header]
hs
Chan Frame
ch <- IO (Chan Frame)
forall a. IO (Chan a)
newChan
Con -> SubEntry -> IO ()
addSub Con
cid (Sub
sid, Chan Frame
ch)
Con -> DestEntry -> IO ()
addDest Con
cid (String
dst, Chan Frame
ch)
let q :: Reader a
q = RecvQ :: forall a.
Con
-> Sub
-> String
-> String
-> AckMode
-> Bool
-> Bool
-> InBound a
-> Reader a
RecvQ {
rCon :: Con
rCon = Con
cid,
rSub :: Sub
rSub = Sub
sid,
rDest :: String
rDest = String
dst,
rName :: String
rName = String
qn,
rMode :: AckMode
rMode = AckMode
am,
rAuto :: Bool
rAuto = Bool
au,
rRec :: Bool
rRec = Bool
with,
rFrom :: InBound a
rFrom = InBound a
conv}
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
with (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
waitReceipt Con
cid Rec
rc
Reader a -> IO (Reader a)
forall (m :: * -> *) a. Monad m => a -> m a
return Reader a
q
unsub :: Reader a -> IO ()
unsub :: Reader a -> IO ()
unsub Reader a
q = do
let cid :: Con
cid = Reader a -> Con
forall a. Reader a -> Con
rCon Reader a
q
let sid :: Sub
sid = Reader a -> Sub
forall a. Reader a -> Sub
rSub Reader a
q
let dst :: String
dst = Reader a -> String
forall a. Reader a -> String
rDest Reader a
q
Rec
rc <- if Reader a -> Bool
forall a. Reader a -> Bool
rRec Reader a
q then IO Rec
mkUniqueRecc else Rec -> IO Rec
forall (m :: * -> *) a. Monad m => a -> m a
return Rec
NoRec
Connection
c <- Con -> IO Connection
getCon Con
cid
Con -> IO ()
logSend Con
cid
IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
finally (Connection -> Subscription -> String -> [Header] -> IO ()
fUnsubscribe Connection
c
(Sub -> String -> AckMode -> Subscription
mkSub Sub
sid String
dst AckMode
F.Client)
(Rec -> String
forall a. Show a => a -> String
show Rec
rc) [])
(do Con -> Sub -> IO ()
rmSub Con
cid Sub
sid
Con -> String -> IO ()
rmDest Con
cid String
dst)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Reader a -> Bool
forall a. Reader a -> Bool
rRec Reader a
q) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
waitReceipt Con
cid Rec
rc
readQ :: Reader a -> IO (Message a)
readQ :: Reader a -> IO (Message a)
readQ Reader a
q = do
Connection
c <- Con -> IO Connection
getCon (Reader a -> Con
forall a. Reader a -> Con
rCon Reader a
q)
if Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ Connection -> Bool
connected Connection
c
then StomplException -> IO (Message a)
forall e a. Exception e => e -> IO a
throwIO (StomplException -> IO (Message a))
-> StomplException -> IO (Message a)
forall a b. (a -> b) -> a -> b
$ String -> StomplException
QueueException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$ String
"Not connected: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Con -> String
forall a. Show a => a -> String
show (Reader a -> Con
forall a. Reader a -> Con
rCon Reader a
q)
else case Sub -> Connection -> Maybe (Chan Frame)
getSub (Reader a -> Sub
forall a. Reader a -> Sub
rSub Reader a
q) Connection
c of
Maybe (Chan Frame)
Nothing -> StomplException -> IO (Message a)
forall e a. Exception e => e -> IO a
throwIO (StomplException -> IO (Message a))
-> StomplException -> IO (Message a)
forall a b. (a -> b) -> a -> b
$ String -> StomplException
QueueException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
String
"Unknown queue " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Reader a -> String
forall a. Reader a -> String
rName Reader a
q
Just Chan Frame
ch -> do
Message a
m <- Chan Frame -> IO Frame
forall a. Chan a -> IO a
readChan Chan Frame
ch IO Frame -> (Frame -> IO (Message a)) -> IO (Message a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Reader a -> Frame -> IO (Message a)
forall a. Reader a -> Frame -> IO (Message a)
frmToMsg Reader a
q
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Reader a -> AckMode
forall a. Reader a -> AckMode
rMode Reader a
q AckMode -> AckMode -> Bool
forall a. Eq a => a -> a -> Bool
/= AckMode
F.Auto) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
if Reader a -> Bool
forall a. Reader a -> Bool
rAuto Reader a
q
then Con -> Message a -> IO ()
forall a. Con -> Message a -> IO ()
ack (Reader a -> Con
forall a. Reader a -> Con
rCon Reader a
q) Message a
m
else Con -> MsgId -> IO ()
addAck (Reader a -> Con
forall a. Reader a -> Con
rCon Reader a
q) (Message a -> MsgId
forall a. Message a -> MsgId
msgId Message a
m)
Message a -> IO (Message a)
forall (m :: * -> *) a. Monad m => a -> m a
return Message a
m
writeQ :: Writer a -> Mime.Type -> [F.Header] -> a -> IO ()
writeQ :: Writer a -> Type -> [Header] -> a -> IO ()
writeQ Writer a
q Type
mime [Header]
hs a
x =
Writer a -> Type -> [Header] -> a -> IO Rec
forall a. Writer a -> Type -> [Header] -> a -> IO Rec
writeQWith Writer a
q Type
mime [Header]
hs a
x IO Rec -> (Rec -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (\Rec
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
writeAdHoc :: Writer a -> String -> Mime.Type -> [F.Header] -> a -> IO ()
writeAdHoc :: Writer a -> String -> Type -> [Header] -> a -> IO ()
writeAdHoc Writer a
q String
dest Type
mime [Header]
hs a
x =
Writer a -> String -> Type -> [Header] -> a -> IO Rec
forall a. Writer a -> String -> Type -> [Header] -> a -> IO Rec
writeGeneric Writer a
q String
dest Type
mime [Header]
hs a
x IO Rec -> (Rec -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (\Rec
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
writeQWith :: Writer a -> Mime.Type -> [F.Header] -> a -> IO Receipt
writeQWith :: Writer a -> Type -> [Header] -> a -> IO Rec
writeQWith Writer a
q = Writer a -> String -> Type -> [Header] -> a -> IO Rec
forall a. Writer a -> String -> Type -> [Header] -> a -> IO Rec
writeGeneric Writer a
q (Writer a -> String
forall a. Writer a -> String
wDest Writer a
q)
writeAdHocWith :: Writer a -> String -> Mime.Type -> [F.Header] -> a -> IO Receipt
writeAdHocWith :: Writer a -> String -> Type -> [Header] -> a -> IO Rec
writeAdHocWith = Writer a -> String -> Type -> [Header] -> a -> IO Rec
forall a. Writer a -> String -> Type -> [Header] -> a -> IO Rec
writeGeneric
writeGeneric :: Writer a -> String ->
Mime.Type -> [F.Header] -> a -> IO Receipt
writeGeneric :: Writer a -> String -> Type -> [Header] -> a -> IO Rec
writeGeneric Writer a
q String
dest Type
mime [Header]
hs a
x = do
Connection
c <- Con -> IO Connection
getCon (Writer a -> Con
forall a. Writer a -> Con
wCon Writer a
q)
if Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ Connection -> Bool
connected Connection
c
then StomplException -> IO Rec
forall e a. Exception e => e -> IO a
throwIO (StomplException -> IO Rec) -> StomplException -> IO Rec
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ConnectException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
String
"Not connected (" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Con -> String
forall a. Show a => a -> String
show (Writer a -> Con
forall a. Writer a -> Con
wCon Writer a
q) String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
")"
else do
Tx
tx <- Connection -> IO (Maybe Tx)
getCurTx Connection
c IO (Maybe Tx) -> (Maybe Tx -> IO Tx) -> IO Tx
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (\Maybe Tx
mbT ->
case Maybe Tx
mbT of
Maybe Tx
Nothing -> Tx -> IO Tx
forall (m :: * -> *) a. Monad m => a -> m a
return Tx
NoTx
Just Tx
t -> Tx -> IO Tx
forall (m :: * -> *) a. Monad m => a -> m a
return Tx
t)
if Tx
tx Tx -> Tx -> Bool
forall a. Eq a => a -> a -> Bool
== Tx
NoTx Bool -> Bool -> Bool
&& Writer a -> Bool
forall a. Writer a -> Bool
wTx Writer a
q
then StomplException -> IO Rec
forall e a. Exception e => e -> IO a
throwIO (StomplException -> IO Rec) -> StomplException -> IO Rec
forall a b. (a -> b) -> a -> b
$ String -> StomplException
QueueException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
String
"Queue '" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Writer a -> String
forall a. Writer a -> String
wName Writer a
q String -> String -> String
forall a. [a] -> [a] -> [a]
++
String
"' with OForceTx used outside Transaction"
else do
let conv :: OutBound a
conv = Writer a -> OutBound a
forall a. Writer a -> OutBound a
wTo Writer a
q
ByteString
s <- OutBound a
conv a
x
Rec
rc <- if Writer a -> Bool
forall a. Writer a -> Bool
wRec Writer a
q then IO Rec
mkUniqueRecc else Rec -> IO Rec
forall (m :: * -> *) a. Monad m => a -> m a
return Rec
NoRec
let l :: Int
l = if Writer a -> Bool
forall a. Writer a -> Bool
wCntl Writer a
q then -Int
1 else ByteString -> Int
B.length ByteString
s
let m :: Message a
m = MsgId
-> Sub
-> String
-> String
-> Type
-> Int
-> Tx
-> ByteString
-> a
-> Message a
forall a.
MsgId
-> Sub
-> String
-> String
-> Type
-> Int
-> Tx
-> ByteString
-> a
-> Message a
mkMessage MsgId
NoMsg Sub
NoSub String
dest String
"" Type
mime Int
l Tx
tx ByteString
s a
x
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Writer a -> Bool
forall a. Writer a -> Bool
wRec Writer a
q) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
addRec (Writer a -> Con
forall a. Writer a -> Con
wCon Writer a
q) Rec
rc
Con -> IO ()
logSend (Con -> IO ()) -> Con -> IO ()
forall a b. (a -> b) -> a -> b
$ Writer a -> Con
forall a. Writer a -> Con
wCon Writer a
q
Connection -> Message a -> String -> [Header] -> IO ()
forall a. Connection -> Message a -> String -> [Header] -> IO ()
fSend Connection
c Message a
m (Rec -> String
forall a. Show a => a -> String
show Rec
rc) [Header]
hs
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Writer a -> Bool
forall a. Writer a -> Bool
wRec Writer a
q Bool -> Bool -> Bool
&& Writer a -> Bool
forall a. Writer a -> Bool
wWait Writer a
q) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
waitReceipt (Writer a -> Con
forall a. Writer a -> Con
wCon Writer a
q) Rec
rc
Rec -> IO Rec
forall (m :: * -> *) a. Monad m => a -> m a
return Rec
rc
ack :: Con -> Message a -> IO ()
ack :: Con -> Message a -> IO ()
ack Con
cid Message a
msg = do
Con -> Bool -> Bool -> Message a -> IO ()
forall a. Con -> Bool -> Bool -> Message a -> IO ()
ack' Con
cid Bool
True Bool
False Message a
msg
Con -> MsgId -> IO ()
rmAck Con
cid (MsgId -> IO ()) -> MsgId -> IO ()
forall a b. (a -> b) -> a -> b
$ Message a -> MsgId
forall a. Message a -> MsgId
msgId Message a
msg
ackWith :: Con -> Message a -> IO ()
ackWith :: Con -> Message a -> IO ()
ackWith Con
cid Message a
msg = do
Con -> Bool -> Bool -> Message a -> IO ()
forall a. Con -> Bool -> Bool -> Message a -> IO ()
ack' Con
cid Bool
True Bool
True Message a
msg
Con -> MsgId -> IO ()
rmAck Con
cid (MsgId -> IO ()) -> MsgId -> IO ()
forall a b. (a -> b) -> a -> b
$ Message a -> MsgId
forall a. Message a -> MsgId
msgId Message a
msg
nack :: Con -> Message a -> IO ()
nack :: Con -> Message a -> IO ()
nack Con
cid Message a
msg = do
Con -> Bool -> Bool -> Message a -> IO ()
forall a. Con -> Bool -> Bool -> Message a -> IO ()
ack' Con
cid Bool
False Bool
False Message a
msg
Con -> MsgId -> IO ()
rmAck Con
cid (MsgId -> IO ()) -> MsgId -> IO ()
forall a b. (a -> b) -> a -> b
$ Message a -> MsgId
forall a. Message a -> MsgId
msgId Message a
msg
nackWith :: Con -> Message a -> IO ()
nackWith :: Con -> Message a -> IO ()
nackWith Con
cid Message a
msg = do
Con -> Bool -> Bool -> Message a -> IO ()
forall a. Con -> Bool -> Bool -> Message a -> IO ()
ack' Con
cid Bool
False Bool
True Message a
msg
Con -> MsgId -> IO ()
rmAck Con
cid (MsgId -> IO ()) -> MsgId -> IO ()
forall a b. (a -> b) -> a -> b
$ Message a -> MsgId
forall a. Message a -> MsgId
msgId Message a
msg
ack' :: Con -> Bool -> Bool -> Message a -> IO ()
ack' :: Con -> Bool -> Bool -> Message a -> IO ()
ack' Con
cid Bool
ok Bool
with Message a
msg = do
Connection
c <- Con -> IO Connection
getCon Con
cid
if Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ Connection -> Bool
connected Connection
c
then EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ConnectException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
String
"Not connected (" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Con -> String
forall a. Show a => a -> String
show Con
cid String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
")"
else if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (String -> String
forall a. Show a => a -> String
show (String -> String) -> String -> String
forall a b. (a -> b) -> a -> b
$ Message a -> String
forall a. Message a -> String
msgAck Message a
msg)
then EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ProtocolException String
"No ack in message!"
else do
Tx
tx <- Connection -> IO (Maybe Tx)
getCurTx Connection
c IO (Maybe Tx) -> (Maybe Tx -> IO Tx) -> IO Tx
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (\Maybe Tx
mbT ->
case Maybe Tx
mbT of
Maybe Tx
Nothing -> Tx -> IO Tx
forall (m :: * -> *) a. Monad m => a -> m a
return Tx
NoTx
Just Tx
x -> Tx -> IO Tx
forall (m :: * -> *) a. Monad m => a -> m a
return Tx
x)
let msg' :: Message a
msg' = Message a
msg {msgTx :: Tx
msgTx = Tx
tx}
Rec
rc <- if Bool
with then IO Rec
mkUniqueRecc else Rec -> IO Rec
forall (m :: * -> *) a. Monad m => a -> m a
return Rec
NoRec
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
with (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
addRec Con
cid Rec
rc
Con -> IO ()
logSend Con
cid
if Bool
ok then Connection -> Message a -> String -> IO ()
forall a. Connection -> Message a -> String -> IO ()
fAck Connection
c Message a
msg' (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ Rec -> String
forall a. Show a => a -> String
show Rec
rc
else Connection -> Message a -> String -> IO ()
forall a. Connection -> Message a -> String -> IO ()
fNack Connection
c Message a
msg' (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ Rec -> String
forall a. Show a => a -> String
show Rec
rc
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
with (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
waitReceipt Con
cid Rec
rc
withTransaction :: Con -> [Topt] -> (Tx -> IO a) -> IO a
withTransaction :: Con -> [Topt] -> (Tx -> IO a) -> IO a
withTransaction Con
cid [Topt]
os Tx -> IO a
op = do
Tx
tx <- IO Tx
mkUniqueTxId
let t :: Transaction
t = Tx -> [Topt] -> Transaction
mkTrn Tx
tx [Topt]
os
Connection
c <- Con -> IO Connection
getCon Con
cid
if Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ Connection -> Bool
connected Connection
c
then StomplException -> IO a
forall e a. Exception e => e -> IO a
throwIO (StomplException -> IO a) -> StomplException -> IO a
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ConnectException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
String
"Not connected (" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Con -> String
forall a. Show a => a -> String
show Con
cid String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
")"
else IO a -> IO () -> IO a
forall a b. IO a -> IO b -> IO a
finally (do Transaction -> Con -> IO ()
addTx Transaction
t Con
cid
Con -> Connection -> Transaction -> IO ()
startTx Con
cid Connection
c Transaction
t
a
x <- Tx -> IO a
op Tx
tx
Tx -> Con -> TxState -> IO ()
updTxState Tx
tx Con
cid TxState
TxEnded
a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x)
(Con -> Tx -> IO ()
terminateTx Con
cid Tx
tx IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`onException` Tx -> Con -> IO ()
rmThisTx Tx
tx Con
cid)
waitReceipt :: Con -> Receipt -> IO ()
waitReceipt :: Con -> Rec -> IO ()
waitReceipt Con
cid Rec
r =
case Rec
r of
Rec
NoRec -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Rec
_ -> IO ()
waitForMe
where waitForMe :: IO ()
waitForMe = do
Bool
ok <- Con -> Rec -> IO Bool
checkReceipt Con
cid Rec
r
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
ok (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Int
ms Int
1
IO ()
waitForMe
abort :: String -> IO ()
abort :: String -> IO ()
abort String
e = EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
AppException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
String
"Tx aborted by application: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
e
terminateTx :: Con -> Tx -> IO ()
terminateTx :: Con -> Tx -> IO ()
terminateTx Con
cid Tx
tx = do
Connection
c <- Con -> IO Connection
getCon Con
cid
Maybe Transaction
mbT <- Tx -> Connection -> IO (Maybe Transaction)
getTx Tx
tx Connection
c
case Maybe Transaction
mbT of
Maybe Transaction
Nothing -> String -> IO ()
putStrLn String
"Transaction terminated!"
Just Transaction
t | Transaction -> TxState
txState Transaction
t TxState -> TxState -> Bool
forall a. Eq a => a -> a -> Bool
/= TxState
TxEnded -> Bool -> Con -> Connection -> Tx -> Transaction -> IO ()
endTx Bool
False Con
cid Connection
c Tx
tx Transaction
t
| Transaction -> Bool
txReceipts Transaction
t Bool -> Bool -> Bool
|| Transaction -> Bool
txPendingAck Transaction
t -> do
Bool
ok <- Tx -> Con -> Int -> IO Bool
waitTx Tx
tx Con
cid (Int -> IO Bool) -> Int -> IO Bool
forall a b. (a -> b) -> a -> b
$ Transaction -> Int
txTmo Transaction
t
if Bool
ok
then Bool -> Con -> Connection -> Tx -> Transaction -> IO ()
endTx Bool
True Con
cid Connection
c Tx
tx Transaction
t
else do
Bool -> Con -> Connection -> Tx -> Transaction -> IO ()
endTx Bool
False Con
cid Connection
c Tx
tx Transaction
t
let m :: String
m = if Transaction -> Bool
txPendingAck Transaction
t then String
"Acks" else String
"Receipts"
EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
TxException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
String
"Transaction aborted: Missing " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
m
| Bool
otherwise -> Bool -> Con -> Connection -> Tx -> Transaction -> IO ()
endTx Bool
True Con
cid Connection
c Tx
tx Transaction
t
startTx :: Con -> Connection -> Transaction -> IO ()
startTx :: Con -> Connection -> Transaction -> IO ()
startTx Con
cid Connection
c Transaction
t = do
Rec
rc <- if Transaction -> Bool
txAbrtRc Transaction
t then IO Rec
mkUniqueRecc else Rec -> IO Rec
forall (m :: * -> *) a. Monad m => a -> m a
return Rec
NoRec
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Transaction -> Bool
txAbrtRc Transaction
t) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
addRec Con
cid Rec
rc
Con -> IO ()
logSend Con
cid
Connection -> String -> String -> IO ()
fBegin Connection
c (Tx -> String
forall a. Show a => a -> String
show (Tx -> String) -> Tx -> String
forall a b. (a -> b) -> a -> b
$ Transaction -> Tx
txId Transaction
t) (Rec -> String
forall a. Show a => a -> String
show Rec
rc)
endTx :: Bool -> Con -> Connection -> Tx -> Transaction -> IO ()
endTx :: Bool -> Con -> Connection -> Tx -> Transaction -> IO ()
endTx Bool
x Con
cid Connection
c Tx
tx Transaction
t = do
let w :: Bool
w = Transaction -> Bool
txAbrtRc Transaction
t Bool -> Bool -> Bool
&& Transaction -> Int
txTmo Transaction
t Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
Rec
rc <- if Bool
w then IO Rec
mkUniqueRecc else Rec -> IO Rec
forall (m :: * -> *) a. Monad m => a -> m a
return Rec
NoRec
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
w (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
addRec Con
cid Rec
rc
Con -> IO ()
logSend Con
cid
if Bool
x then Connection -> String -> String -> IO ()
fCommit Connection
c (Tx -> String
forall a. Show a => a -> String
show Tx
tx) (Rec -> String
forall a. Show a => a -> String
show Rec
rc)
else Connection -> String -> String -> IO ()
fAbort Connection
c (Tx -> String
forall a. Show a => a -> String
show Tx
tx) (Rec -> String
forall a. Show a => a -> String
show Rec
rc)
Maybe ()
mbR <- if Bool
w then Int -> IO () -> IO (Maybe ())
forall a. Int -> IO a -> IO (Maybe a)
timeout (Int -> Int
ms (Int -> Int) -> Int -> Int
forall a b. (a -> b) -> a -> b
$ Transaction -> Int
txTmo Transaction
t) (IO () -> IO (Maybe ())) -> IO () -> IO (Maybe ())
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
waitReceipt Con
cid Rec
rc
else Maybe () -> IO (Maybe ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe () -> IO (Maybe ())) -> Maybe () -> IO (Maybe ())
forall a b. (a -> b) -> a -> b
$ () -> Maybe ()
forall a. a -> Maybe a
Just ()
Con -> IO ()
rmTx Con
cid
case Maybe ()
mbR of
Just ()
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Maybe ()
Nothing -> EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
TxException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
String
"Transaction in unknown State: " String -> String -> String
forall a. [a] -> [a] -> [a]
++
String
"missing receipt for " String -> String -> String
forall a. [a] -> [a] -> [a]
++ (if Bool
x then String
"commit!"
else String
"abort!")
waitTx :: Tx -> Con -> Int -> IO Bool
waitTx :: Tx -> Con -> Int -> IO Bool
waitTx Tx
tx Con
cid Int
delay = do
Connection
c <- Con -> IO Connection
getCon Con
cid
Maybe Transaction
mbT <- Tx -> Connection -> IO (Maybe Transaction)
getTx Tx
tx Connection
c
case Maybe Transaction
mbT of
Maybe Transaction
Nothing -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Just Transaction
t | Transaction -> Bool
txPendingAck Transaction
t -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
| Transaction -> Bool
txReceipts Transaction
t ->
if Int
delay Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 then Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
else do
Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Int
ms Int
1
Tx -> Con -> Int -> IO Bool
waitTx Tx
tx Con
cid (Int
delay Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)
| Bool
otherwise -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
frmToMsg :: Reader a -> F.Frame -> IO (Message a)
frmToMsg :: Reader a -> Frame -> IO (Message a)
frmToMsg Reader a
q Frame
f = do
let b :: ByteString
b = Frame -> ByteString
F.getBody Frame
f
let conv :: InBound a
conv = Reader a -> InBound a
forall a. Reader a -> InBound a
rFrom Reader a
q
let sid :: Sub
sid = if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (Frame -> String
F.getSub Frame
f) Bool -> Bool -> Bool
|| Bool -> Bool
not (String -> Bool
numeric (String -> Bool) -> String -> Bool
forall a b. (a -> b) -> a -> b
$ Frame -> String
F.getSub Frame
f)
then Sub
NoSub else Int -> Sub
Sub (Int -> Sub) -> Int -> Sub
forall a b. (a -> b) -> a -> b
$ String -> Int
forall a. Read a => String -> a
read (String -> Int) -> String -> Int
forall a b. (a -> b) -> a -> b
$ Frame -> String
F.getSub Frame
f
a
x <- InBound a
conv (Frame -> Type
F.getMime Frame
f) (Frame -> Int
F.getLength Frame
f) (Frame -> [Header]
F.getHeaders Frame
f) ByteString
b
let m :: Message a
m = MsgId
-> Sub
-> String
-> String
-> Type
-> Int
-> Tx
-> ByteString
-> a
-> Message a
forall a.
MsgId
-> Sub
-> String
-> String
-> Type
-> Int
-> Tx
-> ByteString
-> a
-> Message a
mkMessage (String -> MsgId
MsgId (String -> MsgId) -> String -> MsgId
forall a b. (a -> b) -> a -> b
$ Frame -> String
F.getId Frame
f) Sub
sid
(Frame -> String
F.getDest Frame
f)
(Frame -> String
F.getMsgAck Frame
f)
(Frame -> Type
F.getMime Frame
f)
(Frame -> Int
F.getLength Frame
f)
Tx
NoTx
ByteString
b
a
x
Message a -> IO (Message a)
forall (m :: * -> *) a. Monad m => a -> m a
return Message a
m {msgHdrs :: [Header]
msgHdrs = Frame -> [Header]
F.getHeaders Frame
f}
handleConnected :: Con -> F.Frame -> MVar () -> IO ()
handleConnected :: Con -> Frame -> MVar () -> IO ()
handleConnected Con
cid Frame
f MVar ()
m = do
Connection
c <- Con -> IO Connection
getCon Con
cid
if Connection -> Bool
connected Connection
c
then Connection -> EH
throwToOwner Connection
c EH -> EH
forall a b. (a -> b) -> a -> b
$
String -> StomplException
ProtocolException String
"Unexptected Connected frame"
else Con -> Connection -> IO ()
updCon Con
cid Connection
c {conSrv :: String
conSrv = let srv :: SrvDesc
srv = Frame -> SrvDesc
F.getServer Frame
f
in SrvDesc -> String
F.getSrvName SrvDesc
srv String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"/" String -> String -> String
forall a. [a] -> [a] -> [a]
++
SrvDesc -> String
F.getSrvVer SrvDesc
srv String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" (" String -> String -> String
forall a. [a] -> [a] -> [a]
++
SrvDesc -> String
F.getSrvCmts SrvDesc
srv String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
")",
conBeat :: Version
conBeat = Frame -> Version
F.getBeat Frame
f,
conVers :: [Version]
conVers = [Frame -> Version
F.getVersion Frame
f],
conSes :: String
conSes = Frame -> String
F.getSession Frame
f}
IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
m ()
handleMessage :: Con -> F.Frame -> IO ()
handleMessage :: Con -> Frame -> IO ()
handleMessage Con
cid Frame
f = do
Connection
c <- Con -> IO Connection
getCon Con
cid
case Connection -> Maybe (Chan Frame)
getCh Connection
c of
Maybe (Chan Frame)
Nothing -> Connection -> EH
throwToOwner Connection
c EH -> EH
forall a b. (a -> b) -> a -> b
$
String -> StomplException
ProtocolException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$ String
"Unknown Channel: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Frame -> String
forall a. Show a => a -> String
show Frame
f
Just Chan Frame
ch -> Chan Frame -> Frame -> IO ()
forall a. Chan a -> a -> IO ()
writeChan Chan Frame
ch Frame
f
where getCh :: Connection -> Maybe (Chan Frame)
getCh Connection
c = let dst :: String
dst = Frame -> String
F.getDest Frame
f
sid :: String
sid = Frame -> String
F.getSub Frame
f
in if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null String
sid
then String -> Connection -> Maybe (Chan Frame)
getDest String
dst Connection
c
else if String -> Bool
numeric String
sid
then Sub -> Connection -> Maybe (Chan Frame)
getSub (Int -> Sub
Sub (Int -> Sub) -> Int -> Sub
forall a b. (a -> b) -> a -> b
$ String -> Int
forall a. Read a => String -> a
read String
sid) Connection
c
else Maybe (Chan Frame)
forall a. Maybe a
Nothing
handleError :: Con -> F.Frame -> IO ()
handleError :: Con -> Frame -> IO ()
handleError Con
cid Frame
f = do
Connection
c <- Con -> IO Connection
getCon Con
cid
case Connection -> Maybe (Con -> Frame -> IO ())
getEH Connection
c of
Just Con -> Frame -> IO ()
eh -> Con -> Frame -> IO ()
eh Con
cid Frame
f
Maybe (Con -> Frame -> IO ())
Nothing -> let r :: String
r | String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (Frame -> String
F.getReceipt Frame
f) = String
""
| Bool
otherwise = String
" (" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Frame -> String
F.getReceipt Frame
f String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
")"
e :: String
e = Frame -> String
F.getMsg Frame
f String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
r String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
": " String -> String -> String
forall a. [a] -> [a] -> [a]
++ ByteString -> String
U.toString (Frame -> ByteString
F.getBody Frame
f)
in Connection -> EH
throwToOwner Connection
c (String -> StomplException
BrokerException String
e)
handleReceipt :: Con -> F.Frame -> IO ()
handleReceipt :: Con -> Frame -> IO ()
handleReceipt Con
cid Frame
f = do
Connection
c <- Con -> IO Connection
getCon Con
cid
case String -> Maybe Rec
parseRec (String -> Maybe Rec) -> String -> Maybe Rec
forall a b. (a -> b) -> a -> b
$ Frame -> String
F.getReceipt Frame
f of
Just Rec
r -> Con -> Rec -> IO ()
forceRmRec Con
cid Rec
r
Maybe Rec
Nothing -> Connection -> EH
throwToOwner Connection
c EH -> EH
forall a b. (a -> b) -> a -> b
$
String -> StomplException
ProtocolException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$ String
"Invalid Receipt: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Frame -> String
forall a. Show a => a -> String
show Frame
f
handleBeat :: Con -> F.Frame -> IO ()
handleBeat :: Con -> Frame -> IO ()
handleBeat Con
_ Frame
_ = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
heartBeat :: Con -> Int -> IO ()
heartBeat :: Con -> Int -> IO ()
heartBeat Con
cid Int
p = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
UTCTime
now <- IO UTCTime
getCurrentTime
Connection
c <- Con -> IO Connection
getCon Con
cid
let me :: UTCTime
me = Connection -> UTCTime
myMust Connection
c
let he :: UTCTime
he = Connection -> UTCTime
hisMust Connection
c
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (UTCTime
now UTCTime -> UTCTime -> Bool
forall a. Ord a => a -> a -> Bool
> UTCTime
he) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection -> EH
throwToOwner Connection
c EH -> EH
forall a b. (a -> b) -> a -> b
$
String -> StomplException
ProtocolException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
String
"Missing HeartBeat, last was " String -> String -> String
forall a. [a] -> [a] -> [a]
++
NominalDiffTime -> String
forall a. Show a => a -> String
show (UTCTime
now UTCTime -> UTCTime -> NominalDiffTime
`diffUTCTime` UTCTime
he) String -> String -> String
forall a. [a] -> [a] -> [a]
++
String
" seconds ago!"
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (UTCTime
now UTCTime -> UTCTime -> Bool
forall a. Ord a => a -> a -> Bool
>= UTCTime
me) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection -> IO ()
fSendBeat Connection
c
Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Int
ms Int
p
myMust :: Connection -> UTCTime
myMust :: Connection -> UTCTime
myMust Connection
c = let t :: UTCTime
t = Connection -> UTCTime
conMyBeat Connection
c
p :: Int
p = Version -> Int
forall a b. (a, b) -> b
snd (Version -> Int) -> Version -> Int
forall a b. (a -> b) -> a -> b
$ Connection -> Version
conBeat Connection
c
in UTCTime -> Int -> UTCTime
timeAdd UTCTime
t Int
p
hisMust :: Connection -> UTCTime
hisMust :: Connection -> UTCTime
hisMust Connection
c = let t :: UTCTime
t = Connection -> UTCTime
conHisBeat Connection
c
tol :: Int
tol = Int
4
b :: Int
b = Version -> Int
forall a b. (a, b) -> a
fst (Version -> Int) -> Version -> Int
forall a b. (a -> b) -> a -> b
$ Connection -> Version
conBeat Connection
c
p :: Int
p = Int
tol Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
b
in UTCTime -> Int -> UTCTime
timeAdd UTCTime
t Int
p
timeAdd :: UTCTime -> Int -> UTCTime
timeAdd :: UTCTime -> Int -> UTCTime
timeAdd UTCTime
t Int
p = Int -> NominalDiffTime
ms2nominal Int
p NominalDiffTime -> UTCTime -> UTCTime
`addUTCTime` UTCTime
t
ms2nominal :: Int -> NominalDiffTime
ms2nominal :: Int -> NominalDiffTime
ms2nominal Int
m = Int -> NominalDiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
m NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Fractional a => a -> a -> a
/ (NominalDiffTime
1000::NominalDiffTime)
fBegin :: Connection -> String -> String -> IO ()
fBegin :: Connection -> String -> String -> IO ()
fBegin Connection
c String
tx String
receipt = Connection
-> String
-> String
-> [Header]
-> (String -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c String
tx String
receipt [] String -> String -> [Header] -> Either String Frame
mkBeginF
fCommit :: Connection -> String -> String -> IO ()
fCommit :: Connection -> String -> String -> IO ()
fCommit Connection
c String
tx String
receipt = Connection
-> String
-> String
-> [Header]
-> (String -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c String
tx String
receipt [] String -> String -> [Header] -> Either String Frame
mkCommitF
fAbort :: Connection -> String -> String -> IO ()
fAbort :: Connection -> String -> String -> IO ()
fAbort Connection
c String
tx String
receipt = Connection
-> String
-> String
-> [Header]
-> (String -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c String
tx String
receipt [] String -> String -> [Header] -> Either String Frame
mkAbortF
fAck :: Connection -> Message a -> String -> IO ()
fAck :: Connection -> Message a -> String -> IO ()
fAck Connection
c Message a
m String
receipt = Connection
-> Message a
-> String
-> [Header]
-> (Message a -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c Message a
m String
receipt [] (Bool -> Message a -> String -> [Header] -> Either String Frame
forall a.
Bool -> Message a -> String -> [Header] -> Either String Frame
mkAckF Bool
True)
fNack :: Connection -> Message a -> String -> IO ()
fNack :: Connection -> Message a -> String -> IO ()
fNack Connection
c Message a
m String
receipt = Connection
-> Message a
-> String
-> [Header]
-> (Message a -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c Message a
m String
receipt [] (Bool -> Message a -> String -> [Header] -> Either String Frame
forall a.
Bool -> Message a -> String -> [Header] -> Either String Frame
mkAckF Bool
False)
fSubscribe :: Connection -> Subscription -> String -> [F.Header] -> IO ()
fSubscribe :: Connection -> Subscription -> String -> [Header] -> IO ()
fSubscribe Connection
c Subscription
sub String
receipt [Header]
hs = Connection
-> Subscription
-> String
-> [Header]
-> (Subscription -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c Subscription
sub String
receipt [Header]
hs Subscription -> String -> [Header] -> Either String Frame
mkSubF
fUnsubscribe :: Connection -> Subscription -> String -> [F.Header] -> IO ()
fUnsubscribe :: Connection -> Subscription -> String -> [Header] -> IO ()
fUnsubscribe Connection
c Subscription
sub String
receipt [Header]
hs = Connection
-> Subscription
-> String
-> [Header]
-> (Subscription -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c Subscription
sub String
receipt [Header]
hs Subscription -> String -> [Header] -> Either String Frame
mkUnSubF
fSend :: Connection -> Message a -> String -> [F.Header] -> IO ()
fSend :: Connection -> Message a -> String -> [Header] -> IO ()
fSend Connection
c Message a
msg String
receipt [Header]
hs = Connection
-> Message a
-> String
-> [Header]
-> (Message a -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c Message a
msg String
receipt [Header]
hs Message a -> String -> [Header] -> Either String Frame
forall a. Message a -> String -> [Header] -> Either String Frame
mkSendF
fSendBeat :: Connection -> IO ()
fSendBeat :: Connection -> IO ()
fSendBeat Connection
c = Connection
-> ()
-> String
-> [Header]
-> (() -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c () String
"" [] (\()
_ String
_ [Header]
_ -> Frame -> Either String Frame
forall a b. b -> Either a b
Right Frame
F.mkBeat)
sendFrame :: Connection -> a -> String -> [F.Header] ->
(a -> String -> [F.Header] -> Either String F.Frame) -> IO ()
sendFrame :: Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c a
m String
receipt [Header]
hs a -> String -> [Header] -> Either String Frame
mkF =
if Bool -> Bool
not (Connection -> Bool
connected Connection
c) then EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ConnectException String
"Not connected!"
else case a -> String -> [Header] -> Either String Frame
mkF a
m String
receipt [Header]
hs of
Left String
e -> EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ProtocolException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
String
"Cannot create Frame: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
e
Right Frame
f ->
#ifdef _DEBUG
do when (not $ F.complies (1,2) f) $
putStrLn $ "Frame does not comply with 1.2: " ++ show f
#endif
Chan Frame -> Frame -> IO ()
forall a. Chan a -> a -> IO ()
writeChan (Connection -> Chan Frame
conChn Connection
c) Frame
f
mkReceipt :: String -> [F.Header]
mkReceipt :: String -> [Header]
mkReceipt String
receipt = if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null String
receipt then [] else [String -> Header
F.mkRecHdr String
receipt]
mkConF :: ([F.Header] -> Either String F.Frame) ->
String -> String -> String -> String ->
[F.Version] -> F.Heart -> [F.Header] -> Either String F.Frame
mkConF :: ([Header] -> Either String Frame)
-> String
-> String
-> String
-> String
-> [Version]
-> Version
-> [Header]
-> Either String Frame
mkConF [Header] -> Either String Frame
mk String
host String
usr String
pwd String
cli [Version]
vs Version
beat [Header]
hs =
let uHdr :: [Header]
uHdr = if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null String
usr then [] else [String -> Header
F.mkLogHdr String
usr]
pHdr :: [Header]
pHdr = if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null String
pwd then [] else [String -> Header
F.mkPassHdr String
pwd]
cHdr :: [Header]
cHdr = if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null String
cli then [] else [String -> Header
F.mkCliIdHdr String
cli]
in [Header] -> Either String Frame
mk ([Header] -> Either String Frame)
-> [Header] -> Either String Frame
forall a b. (a -> b) -> a -> b
$ [String -> Header
F.mkHostHdr String
host,
String -> Header
F.mkAcVerHdr (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ [Version] -> String
F.versToVal [Version]
vs,
String -> Header
F.mkBeatHdr (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ Version -> String
F.beatToVal Version
beat] [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++
[Header]
uHdr [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ [Header]
pHdr [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ [Header]
cHdr [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ [Header]
hs
mkDiscF :: String -> Either String F.Frame
mkDiscF :: String -> Either String Frame
mkDiscF String
receipt =
[Header] -> Either String Frame
F.mkDisFrame ([Header] -> Either String Frame)
-> [Header] -> Either String Frame
forall a b. (a -> b) -> a -> b
$ String -> [Header]
mkReceipt String
receipt
mkSubF :: Subscription -> String -> [F.Header] -> Either String F.Frame
mkSubF :: Subscription -> String -> [Header] -> Either String Frame
mkSubF Subscription
sub String
receipt [Header]
hs =
[Header] -> Either String Frame
F.mkSubFrame ([Header] -> Either String Frame)
-> [Header] -> Either String Frame
forall a b. (a -> b) -> a -> b
$ [String -> Header
F.mkIdHdr (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ Sub -> String
forall a. Show a => a -> String
show (Sub -> String) -> Sub -> String
forall a b. (a -> b) -> a -> b
$ Subscription -> Sub
subId Subscription
sub,
String -> Header
F.mkDestHdr (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ Subscription -> String
subName Subscription
sub,
String -> Header
F.mkAckHdr (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ AckMode -> String
forall a. Show a => a -> String
show (AckMode -> String) -> AckMode -> String
forall a b. (a -> b) -> a -> b
$ Subscription -> AckMode
subMode Subscription
sub] [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++
String -> [Header]
mkReceipt String
receipt [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ [Header]
hs
mkUnSubF :: Subscription -> String -> [F.Header] -> Either String F.Frame
mkUnSubF :: Subscription -> String -> [Header] -> Either String Frame
mkUnSubF Subscription
sub String
receipt [Header]
hs =
let dh :: [Header]
dh = if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (Subscription -> String
subName Subscription
sub) then [] else [String -> Header
F.mkDestHdr (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ Subscription -> String
subName Subscription
sub]
in [Header] -> Either String Frame
F.mkUSubFrame ([Header] -> Either String Frame)
-> [Header] -> Either String Frame
forall a b. (a -> b) -> a -> b
$ [String -> Header
F.mkIdHdr (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ Sub -> String
forall a. Show a => a -> String
show (Sub -> String) -> Sub -> String
forall a b. (a -> b) -> a -> b
$ Subscription -> Sub
subId Subscription
sub] [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ [Header]
dh [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++
String -> [Header]
mkReceipt String
receipt [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ [Header]
hs
mkSendF :: Message a -> String -> [F.Header] -> Either String F.Frame
mkSendF :: Message a -> String -> [Header] -> Either String Frame
mkSendF Message a
msg String
receipt [Header]
hs =
Frame -> Either String Frame
forall a b. b -> Either a b
Right (Frame -> Either String Frame) -> Frame -> Either String Frame
forall a b. (a -> b) -> a -> b
$ String
-> String
-> String
-> Type
-> Int
-> [Header]
-> ByteString
-> Frame
F.mkSend (Message a -> String
forall a. Message a -> String
msgDest Message a
msg) (Tx -> String
forall a. Show a => a -> String
show (Tx -> String) -> Tx -> String
forall a b. (a -> b) -> a -> b
$ Message a -> Tx
forall a. Message a -> Tx
msgTx Message a
msg) String
receipt
(Message a -> Type
forall a. Message a -> Type
msgType Message a
msg) (Message a -> Int
forall a. Message a -> Int
msgLen Message a
msg) [Header]
hs
(Message a -> ByteString
forall a. Message a -> ByteString
msgRaw Message a
msg)
mkAckF :: Bool -> Message a -> String -> [F.Header] -> Either String F.Frame
mkAckF :: Bool -> Message a -> String -> [Header] -> Either String Frame
mkAckF Bool
ok Message a
msg String
receipt [Header]
_ =
let sh :: [Header]
sh = if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (String -> Bool) -> String -> Bool
forall a b. (a -> b) -> a -> b
$ Sub -> String
forall a. Show a => a -> String
show (Sub -> String) -> Sub -> String
forall a b. (a -> b) -> a -> b
$ Message a -> Sub
forall a. Message a -> Sub
msgSub Message a
msg then []
else [String -> Header
F.mkSubHdr (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ Sub -> String
forall a. Show a => a -> String
show (Sub -> String) -> Sub -> String
forall a b. (a -> b) -> a -> b
$ Message a -> Sub
forall a. Message a -> Sub
msgSub Message a
msg]
th :: [Header]
th = if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (String -> Bool) -> String -> Bool
forall a b. (a -> b) -> a -> b
$ Tx -> String
forall a. Show a => a -> String
show (Tx -> String) -> Tx -> String
forall a b. (a -> b) -> a -> b
$ Message a -> Tx
forall a. Message a -> Tx
msgTx Message a
msg
then [] else [String -> Header
F.mkTrnHdr (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ Tx -> String
forall a. Show a => a -> String
show (Tx -> String) -> Tx -> String
forall a b. (a -> b) -> a -> b
$ Message a -> Tx
forall a. Message a -> Tx
msgTx Message a
msg]
rh :: [Header]
rh = String -> [Header]
mkReceipt String
receipt
mk :: [Header] -> Either String Frame
mk = if Bool
ok then [Header] -> Either String Frame
F.mkAckFrame else [Header] -> Either String Frame
F.mkNackFrame
in [Header] -> Either String Frame
mk ([Header] -> Either String Frame)
-> [Header] -> Either String Frame
forall a b. (a -> b) -> a -> b
$ String -> Header
F.mkIdHdr (Message a -> String
forall a. Message a -> String
msgAck Message a
msg) Header -> [Header] -> [Header]
forall a. a -> [a] -> [a]
: ([Header]
sh [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ [Header]
rh [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ [Header]
th)
mkBeginF :: String -> String -> [F.Header] -> Either String F.Frame
mkBeginF :: String -> String -> [Header] -> Either String Frame
mkBeginF String
tx String
receipt [Header]
_ =
[Header] -> Either String Frame
F.mkBgnFrame ([Header] -> Either String Frame)
-> [Header] -> Either String Frame
forall a b. (a -> b) -> a -> b
$ String -> Header
F.mkTrnHdr String
tx Header -> [Header] -> [Header]
forall a. a -> [a] -> [a]
: String -> [Header]
mkReceipt String
receipt
mkCommitF :: String -> String -> [F.Header] -> Either String F.Frame
mkCommitF :: String -> String -> [Header] -> Either String Frame
mkCommitF String
tx String
receipt [Header]
_ =
[Header] -> Either String Frame
F.mkCmtFrame ([Header] -> Either String Frame)
-> [Header] -> Either String Frame
forall a b. (a -> b) -> a -> b
$ String -> Header
F.mkTrnHdr String
tx Header -> [Header] -> [Header]
forall a. a -> [a] -> [a]
: String -> [Header]
mkReceipt String
receipt
mkAbortF :: String -> String -> [F.Header] -> Either String F.Frame
mkAbortF :: String -> String -> [Header] -> Either String Frame
mkAbortF String
tx String
receipt [Header]
_ =
[Header] -> Either String Frame
F.mkAbrtFrame ([Header] -> Either String Frame)
-> [Header] -> Either String Frame
forall a b. (a -> b) -> a -> b
$ String -> Header
F.mkTrnHdr String
tx Header -> [Header] -> [Header]
forall a. a -> [a] -> [a]
: String -> [Header]
mkReceipt String
receipt