{-# Language CPP #-}
-------------------------------------------------------------------------------
-- |
-- Module     : Network/Mom/Stompl/Client/Queue.hs
-- Copyright  : (c) Tobias Schoofs
-- License    : LGPL 
-- Stability  : experimental
-- Portability: portable
--
-- The Stomp Protocol specifies message-oriented interoperability.
-- Applications connect to a message broker to send (publish)
-- or receive (subscribe) messages through queues. 
-- Interoperating applications do not know 
-- the location or internal structure of each other.
-- They only see interfaces, /i.e./ the messages
-- published and subscribed through the broker.
-- 
-- The Stompl Client library implements
-- a Stomp client using abstractions like
-- 'Connection', 'Transaction' and
-- queues in terms of 'Reader' and 'Writer'.
-------------------------------------------------------------------------------
module Network.Mom.Stompl.Client.Queue (
                   -- * Connections
                   -- $stomp_con
                   withConnection, 
                   Fac.Con, 
                   F.Heart,
                   Copt(..),
                   EHandler,
                   -- * Queues
                   -- $stomp_queues
                   Reader, Writer, 
                   newReader, destroyReader, newWriter, destroyWriter,
                   withReader, withWriter, withPair, ReaderDesc, WriterDesc,
                   Qopt(..), F.AckMode(..), 
                   InBound, OutBound, 
                   readQ, 
                   writeQ, writeQWith,
                   writeAdHoc, writeAdHocWith,
                   -- * Messages
                   Message, 
                   msgContent, msgRaw, 
                   msgType, msgLen, msgHdrs,
                   -- * Receipts
                   -- $stomp_receipts
                   Fac.Rec(..), Receipt,
                   waitReceipt,
                   -- * Transactions
                   -- $stomp_trans
                   Tx,
                   withTransaction,
                   Topt(..), abort, 
                   -- * Acknowledgement
                   -- $stomp_acks
                   ack, ackWith, nack, nackWith,
#ifdef TEST
                   frmToMsg, msgAck,
#endif
                   -- * Exceptions
                   module Network.Mom.Stompl.Client.Exception
                   -- * Complete Example
                   -- $stomp_sample
                   )

where

  import           Network.Mom.Stompl.Client.Stream
  import           Network.Mom.Stompl.Client.Factory as Fac
  import           Network.Mom.Stompl.Client.State

  import qualified Network.Mom.Stompl.Frame as F
  import           Network.Mom.Stompl.Client.Exception

  import qualified Data.ByteString.Char8 as B
  import qualified Data.ByteString.UTF8  as U
  import           Data.List (find)
  import           Data.Time.Clock
  import           Data.Maybe (isNothing)
  import           Data.Conduit.Network (AppData)
  import           Data.Conduit.Network.TLS

  import           Control.Concurrent 
  import           Control.Applicative ((<$>))
  import           Control.Monad (when, unless, forever)
  import           Control.Exception (bracket, finally, catches, onException,
                                      AsyncException(..), Handler(..),
                                      throwIO, SomeException)

  import           Codec.MIME.Type as Mime (Type)
  import           System.Timeout (timeout)

  {- $stomp_con

     The Stomp protocol is connection-oriented and
     usually implemented on top of /TCP/\//IP/.
     The client initialises the connection
     by sending a /connect/ message which is answered
     by the broker by confirming or rejecting the connection.
     The connection is authenticated by user and passcode.
     The authentication mechanism, however, 
     varies among brokers.

     During the connection phase,
     the protocol version and a heartbeat 
     that defines the frequency of /alive/ messages
     exchanged between broker and client
     are negotiated.

     The connection remains active until either the client
     disconnects voluntarily or the broker disconnects
     in consequence of a protocol error.

     The details of the connection, including 
     protocol version and heartbeats are handled
     internally by the Stompl Client library.
  -}

  {- $stomp_queues

     Stomp program interoperability is based on queues.
     Queues are communication channels of arbitrary size
     that may be written by any client 
     currently connected to the broker.
     Messages in the queue are stored in /FIFO/ order.
     The process of adding messages to the queue is called /send/.
     In order to read from a queue, a client has to /subscribe/ to it.
     After having subscribed to a queue, 
     a client will receive all message sent to it.
     Brokers may implement additional selection criteria
     by means of /selectors/ that are expressed in some
     query language, such as /SQL/ or /XPath/.

     There are two different flavours of queues
     distinguished by their communication pattern:
     Queues are either one-to-one channels,
     this is, a message published in this queue
     is sent to exactly one subscriber
     and then removed from it; 
     or queues may be one-to-many,
     /i.e./, a message published in this queue
     is sent to all current subscribers of the queue.
     This type of queues is sometimes called /topic/.
     Which pattern is supported 
     and how patterns are controlled, depends on the broker. 

     From the perspective of the Stomp protocol,
     the content of messages in a queue
     has no format.
     The Protocol describes only those aspects of messages
     that are related to their handling;
     this can be seen as a /syntactic/ level of interoperability.
     Introducing meaning to message contents
     is entirely left to applications.
     Message- or service-oriented frameworks,
     usually, define formats and encodings 
     to describe messages and higher-level
     communication patterns built on top of them,
     to add more /syntactic/ formalism or
     to raise interoperability
     to a /semantic/ or even /pragmatic/ level.

     The Stompl library stresses the importance
     of adding meaning to the message content
     by adding types to queues. 
     From the perspective of the client Haskell program,
     a queue is a communication channel
     that allows sending and receiving messages of a given type.
     This adds type-safety to Stompl queues, which,
     otherwise, would just return plain bytes.
     It is, on the other hand, always possible
     to ignore this feature by declaring queues
     as '()' or 'B.ByteString'.
     In the first case, the /raw/ bytestring
     may be read from the 'Message';
     in the second case, the contents of the 'Message'
     will be a 'B.ByteString'.

     In the Stompl library, queues 
     are unidirectional communication channels
     either for reading or writing.
     This is captured by implementing queues
     with two different data types,
     a 'Reader' and a 'Writer'.
     On creating a queue a set of parameters can be defined
     to control the behaviour of the queue.
  -}

  {- $stomp_receipts

     Receipts are identifiers unique during the lifetime
     of an application; receipts can be added to all kinds of
     messages sent to the broker.
     The broker, in its turn, uses receipts to acknowledge received messages.
     Receipts, hence, are useful to make a session more reliable.
     When the broker has confirmed the receipt of a frame sent to it,
     the client application can be sure that it has arrived.
     What kind of additional guarantees are made,
     /e.g./ that the frame is saved to disk or has already been sent
     to the subscriber(s), depends on the broker.

     Receipts are handled internally by the library.
     The application, however, decides where receipts should
     be requested, /i.e./ on subcribing to a queue,
     on sending a message, on sending /acks/ and on
     starting and ending transactions.
     On sending messages, 
     receipt handling can be made explict.
     The function 'writeQWith'
     requests a receipt for the message it sends
     and returns it to the caller.
     The application can then, later,
     explicitly wait for the receipt, using 'waitReceipt'.
     Otherwise, receipt handling remains
     inivisible in the application code.
  -}

  {- $stomp_trans

     Transactions are units of interactions with
     a Stomp broker, including sending messages to queues
     and acknowledging the receipt of messages.
     All messages sent during a transaction
     are buffered in the broker.
     Only when the application terminates the transaction
     with /commit/ the messages will be eventually processed.
     If an error occurs during the transaction,
     it can be /aborted/ by the client. 
     Transactions, in consequence, can be used
     to ensure atomicity,
     /i.e./ either all single steps are performed or 
     no step is performed.

     In the Stompl Client library, transactions
     are sequences of Stompl actions, 
     queue operations as well as nested transactions,
     that are committed at the end or aborted,
     whenever an error condition becomes true.
     Error conditions are uncaught exceptions
     and conditions defined by options passed
     to the transaction, for example
     that all receipts requested during the transaction,
     have been confirmed by the broker.

     To enforce atomicity, 
     threads are not allowed to share transactions.
  -}

  {- $stomp_acks

     Acknowledgements are used by the client to confirm the receipt
     of a message. The Stomp protocol foresees three different
     acknowledgement modes, defined when the client subscribes to a queues.
     A subscription may use 
     /auto mode/, /i.e./ a message is considered acknowledged
     when it has been sent to the subscriber;
     /client mode/, /i.e./ a message is considered acknowledged
     only when an /ack/ message has been sent back from the client.
     Note that client mode is cumulative, that means, 
     the broker will consider all messages acknowledged 
     that have been sent
     from the previous ack up to the acknowledged message;
     or /client-individual mode/, /i.e./ non-cumulative
     client mode.
     
     A message may also be /negatively acknowledged/ (/nack/). 
     How the broker handles a /nack/, however,
     is not further specified by the Stomp protocol.
     Some brokers respond with an error frame and
     close the connection immediately after.
     This may cause a surprising cascade of exceptions!
  -}

  {- $stomp_sample

     > import Network.Mom.Stompl.Client.Queue
     >
     > import System.Environment (getArgs)
     > import Network.Socket (withSocketsDo)
     > import Control.Monad (forever)
     > import Control.Concurrent (threadDelay)
     > import qualified Data.ByteString.UTF8  as U
     > import Data.Char(toUpper)
     > import Codec.MIME.Type (nullType)
     >
     > main :: IO ()
     > main = do
     >   os <- getArgs
     >   case os of
     >     [q] -> withSocketsDo $ ping q
     >     _   -> putStrLn "I need a queue name!"
     >            -- error handling...
     > 
     > data Ping = Ping | Pong
     >   deriving (Show)
     >
     > strToPing :: String -> IO Ping
     > strToPing s = case map toUpper s of
     >                 "PING" -> return Ping
     >                 "PONG" -> return Pong
     >                 _      -> convertError $ "Not a Ping: '" ++ s ++ "'"
     >
     > ping :: String -> IO ()
     > ping qn = 
     >   withConnection "localhost" 61613 [] [] $ \c -> do
     >     let iconv _ _ _ = strToPing . U.toString
     >     let oconv       = return    . U.fromString . show
     >     withReader   c "Q-IN"  qn [] [] iconv $ \inQ ->
     >       withWriter c "Q-OUT" qn [] [] oconv $ \outQ -> do
     >         writeQ outQ nullType [] Pong
     >         listen inQ outQ
     >
     > listen  :: Reader Ping -> Writer Ping -> IO ()
     > listen iQ oQ = forever $ do
     >   eiM <- try $ readQ iQ 
     >   case eiM of
     >     Left  e -> do
     >       putStrLn $ "Error: " ++ show e
     >       -- error handling ...
     >     Right m -> do
     >       let p = case msgContent m of
     >                 Ping -> Pong
     >                 Pong -> Ping
     >       print p
     >       writeQ oQ nullType [] p
     >       threadDelay 10000
  -}

  -- The versions, we support
  vers :: [F.Version]
  vers :: [Version]
vers = [(Int
1,Int
0), (Int
1,Int
1), (Int
1,Int
2)]

  ------------------------------------------------------------------------
  -- | Initialises a connection and executes an 'IO' action.
  --   The connection lifetime is the scope of this action.
  --   The connection handle, 'Con', that is passed to the action
  --   should not be returned from 'withConnection'.
  --   Connections, however, can be shared among threads.
  --   In this case, the programmer has to take care
  --   not to terminate the action before all other threads
  --   working on the connection have finished.
  --
  --   Since 'Connection' is a heavy data type,
  --   you should try to reduce the number of connections
  --   to the same broker within the same process - 
  --   there is ideally only one connection per broker
  --   in one process.
  --
  --   Paramter:
  --
  --   * 'String': The broker's hostname or IP-address
  --
  --   * 'Int': The broker's port
  --
  --   * 'Copt': Control options passed to the connection
  --             (including user/password)
  --
  --   * 'Header': List of additional, broker-specific headers
  --
  --   * ('Con' -> 'IO' a): The action to execute.
  --                        The action receives the connection handle
  --                        and returns a value of type /a/ 
  --                        in the 'IO' monad.
  --
  -- 'withConnection' returns the result of the action passed into it.
  --
  -- 'withConnection' will always disconnect from the broker 
  -- when the action has terminated, even if an exception is raised.
  --
  -- Example:
  --
  -- > withConnection "localhost" 61613 [] [] $ \c -> do
  --
  -- This would connect to a broker listening to the loopback interface,
  -- port number 61613.
  -- The action is defined after the /hanging do/.
  --
  -- Internally, connections use concurrent threads;
  -- errors are communicated by throwing exceptions
  -- to the owner of the connection, where
  -- the owner is the thread that created the connection
  -- calling 'withConnection'.
  -- It is therefore advisable to start different connections
  -- in different threads, so that each thread will receive
  -- only exceptions related to the connection it has opened.
  ------------------------------------------------------------------------
  withConnection :: String -> Int -> [Copt] -> [F.Header] ->
                    (Con -> IO a) -> IO a
  withConnection :: String -> Int -> [Copt] -> [Header] -> (Con -> IO a) -> IO a
withConnection String
host Int
port [Copt]
os [Header]
hs Con -> IO a
act = do
    let beat :: Version
beat  = [Copt] -> Version
oHeartBeat [Copt]
os
    let mx :: Int
mx    = [Copt] -> Int
oMaxRecv   [Copt]
os
    let (String
u,String
p) = [Copt] -> Header
oAuth      [Copt]
os
    let (String
ci)  = [Copt] -> String
oCliId     [Copt]
os
    let tm :: Int
tm    = [Copt] -> Int
oTmo       [Copt]
os
    let cfg :: TLSClientConfig
cfg   = String -> Int -> [Copt] -> TLSClientConfig
oTLS       String
host Int
port [Copt]
os
    let t :: FrameType
t | [Copt] -> Bool
oStomp [Copt]
os = FrameType
F.Stomp
          | Bool
otherwise = FrameType
F.Connect
    TLSClientConfig -> (AppData -> IO a) -> IO a
forall (m :: * -> *) a.
MonadUnliftIO m =>
TLSClientConfig -> (AppData -> m a) -> m a
runTLSClient TLSClientConfig
cfg ((AppData -> IO a) -> IO a) -> (AppData -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \AppData
ad -> do
      Con
cid <- IO Con
mkUniqueConId  -- connection id
      ThreadId
me  <- IO ThreadId
myThreadId     -- connection owner
      UTCTime
now <- IO UTCTime
getCurrentTime -- heartbeat
      Chan Frame
ch  <- IO (Chan Frame)
forall a. IO (Chan a)
newChan        -- sender
      IO Connection
-> (Connection -> IO ()) -> (Connection -> IO a) -> IO a
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (do Connection -> IO ()
addCon (Connection -> IO ()) -> Connection -> IO ()
forall a b. (a -> b) -> a -> b
$ (Con
-> String
-> Int
-> Int
-> String
-> String
-> String
-> [Version]
-> Version
-> Chan Frame
-> ThreadId
-> UTCTime
-> [Copt]
-> Connection
mkConnection Con
cid String
host Int
port 
                                             Int
mx   String
u String
p String
ci 
                                             [Version]
vers Version
beat Chan Frame
ch 
                                             ThreadId
me UTCTime
now [Copt]
os)
                  Con -> IO Connection
getCon Con
cid)
              (\Connection
_ -> Con -> IO ()
rmCon Con
cid) ((Connection -> IO a) -> IO a) -> (Connection -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \Connection
c -> 
        Connection -> AppData -> Chan Frame -> IO a -> IO a
forall a. Connection -> AppData -> Chan Frame -> IO a -> IO a
withSender Connection
c AppData
ad Chan Frame
ch (IO a -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ do
          MVar ()
w <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
          Connection -> AppData -> Con -> MVar () -> IO a -> IO a
forall a. Connection -> AppData -> Con -> MVar () -> IO a -> IO a
withListener Connection
c AppData
ad Con
cid MVar ()
w (IO a -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ do
            FrameType
-> [Version] -> Version -> [Header] -> Connection -> IO ()
connectBroker FrameType
t [Version]
vers Version
beat [Header]
hs Connection
c
            Con -> Int -> Version -> MVar () -> (Con -> IO a) -> IO a
forall a. Con -> Int -> Version -> MVar () -> (Con -> IO a) -> IO a
whenConnected Con
cid Int
tm Version
beat MVar ()
w Con -> IO a
act 

  ---------------------------------------------------------------------
  -- Start the sender threat
  ---------------------------------------------------------------------
  withSender :: Connection -> AppData -> Chan F.Frame -> IO a -> IO a
  withSender :: Connection -> AppData -> Chan Frame -> IO a -> IO a
withSender Connection
c AppData
ad Chan Frame
ch = Connection -> String -> IO () -> IO a -> IO a
forall r. Connection -> String -> IO () -> IO r -> IO r
withThread Connection
c String
"sender" (AppData -> Chan Frame -> IO ()
sender AppData
ad Chan Frame
ch) 

  ---------------------------------------------------------------------
  -- Start the listener threat
  ---------------------------------------------------------------------
  withListener :: Connection -> AppData -> Con -> MVar () -> IO a -> IO a
  withListener :: Connection -> AppData -> Con -> MVar () -> IO a -> IO a
withListener Connection
c AppData
ad Con
cid MVar ()
m = Connection -> String -> IO () -> IO a -> IO a
forall r. Connection -> String -> IO () -> IO r -> IO r
withThread Connection
c String
"listener" (AppData -> Con -> MVar () -> IO ()
listen AppData
ad Con
cid MVar ()
m) 

  -----------------------------------------------------------------------
  -- Connection listener
  -----------------------------------------------------------------------
  listen :: AppData -> Con -> MVar () -> IO ()
  listen :: AppData -> Con -> MVar () -> IO ()
listen AppData
ad Con
cid MVar ()
w = do 
    Connection
c   <- Con -> IO Connection
getCon Con
cid
    Chan Frame
ch  <- IO (Chan Frame)
forall a. IO (Chan a)
newChan
    Connection -> String -> IO () -> IO () -> IO ()
forall r. Connection -> String -> IO () -> IO r -> IO r
withThread Connection
c String
"receiver" (AppData -> Chan Frame -> EH -> IO ()
receiver AppData
ad Chan Frame
ch (Connection -> EH
throwToOwner Connection
c)) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
      IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        Frame
f <- Chan Frame -> IO Frame
forall a. Chan a -> IO a
readChan Chan Frame
ch
        Con -> IO ()
logReceive Con
cid
        case Frame -> FrameType
F.typeOf Frame
f of
          FrameType
F.Connected -> Con -> Frame -> MVar () -> IO ()
handleConnected Con
cid Frame
f MVar ()
w
          FrameType
F.Message   -> Con -> Frame -> IO ()
handleMessage   Con
cid Frame
f
          FrameType
F.Error     -> Con -> Frame -> IO ()
handleError     Con
cid Frame
f
          FrameType
F.Receipt   -> Con -> Frame -> IO ()
handleReceipt   Con
cid Frame
f
          FrameType
F.HeartBeat -> Con -> Frame -> IO ()
handleBeat      Con
cid Frame
f
          FrameType
_           -> Connection -> EH
throwToOwner Connection
c EH -> EH
forall a b. (a -> b) -> a -> b
$ 
                             String -> StomplException
ProtocolException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$ String
"Unexpected Frame: " String -> String -> String
forall a. [a] -> [a] -> [a]
++
                             FrameType -> String
forall a. Show a => a -> String
show (Frame -> FrameType
F.typeOf Frame
f)

  ---------------------------------------------------------------------
  -- Send the Connect frame
  ---------------------------------------------------------------------
  connectBroker :: F.FrameType -> 
                   [F.Version] -> F.Heart -> [F.Header] ->
                   Connection -> IO ()
  connectBroker :: FrameType
-> [Version] -> Version -> [Header] -> Connection -> IO ()
connectBroker FrameType
t [Version]
vs Version
beat [Header]
hs Connection
c = 
    let mk :: [Header] -> Either String Frame
mk = case FrameType
t of
               FrameType
F.Connect -> [Header] -> Either String Frame
F.mkConFrame
               FrameType
F.Stomp   -> [Header] -> Either String Frame
F.mkStmpFrame
               FrameType
_         -> String -> [Header] -> Either String Frame
forall a. HasCallStack => String -> a
error String
"Ouch: Unknown Connect-type"
     in case ([Header] -> Either String Frame)
-> String
-> String
-> String
-> String
-> [Version]
-> Version
-> [Header]
-> Either String Frame
mkConF [Header] -> Either String Frame
mk
                (Connection -> String
conAddr Connection
c) 
                (Connection -> String
conUsr  Connection
c) (Connection -> String
conPwd Connection
c) 
                (Connection -> String
conCli  Connection
c) [Version]
vs Version
beat [Header]
hs of
          Left String
e  -> EH
forall e a. Exception e => e -> IO a
throwIO (String -> StomplException
ConnectException String
e)
          Right Frame
f -> Chan Frame -> Frame -> IO ()
forall a. Chan a -> a -> IO ()
writeChan (Connection -> Chan Frame
conChn Connection
c) Frame
f 

  ---------------------------------------------------------------------
  -- Waiting for the Connected frame,
  -- starting the user application and
  -- disconnecting after
  ---------------------------------------------------------------------
  whenConnected :: Con -> Int -> F.Heart -> MVar () -> (Con -> IO a) -> IO a
  whenConnected :: Con -> Int -> Version -> MVar () -> (Con -> IO a) -> IO a
whenConnected Con
cid Int
tm Version
bt MVar ()
m Con -> IO a
act = do
    Maybe ()
mbT <- if Int
tm Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 then () -> Maybe ()
forall a. a -> Maybe a
Just (() -> Maybe ()) -> IO () -> IO (Maybe ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
m 
                      else Int -> IO () -> IO (Maybe ())
forall a. Int -> IO a -> IO (Maybe a)
timeout (Int -> Int
ms Int
tm) (MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
m)
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe () -> Bool
forall a. Maybe a -> Bool
isNothing Maybe ()
mbT) (EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ConnectException String
"Timeout expired!")
    Connection
c <- Con -> IO Connection
getCon Con
cid
    if Connection -> Int
period Connection
c Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0 
      then if Connection -> Int
period Connection
c Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Version -> Int
forall a b. (a, b) -> a
fst Version
bt
           then StomplException -> IO a
forall e a. Exception e => e -> IO a
throwIO (String -> StomplException
ConnectException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$ String
"Beat frequency too high: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ 
                                            Int -> String
forall a. Show a => a -> String
show (Connection -> Int
period Connection
c))
           else Connection -> String -> IO () -> IO a -> IO a
forall r. Connection -> String -> IO () -> IO r -> IO r
withThread Connection
c String
"heartbeat" (Con -> Int -> IO ()
heartBeat Con
cid (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection -> Int
period Connection
c) (IO a -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ Connection -> IO a
go Connection
c
      else Connection -> IO a
go Connection
c
    where  go :: Connection -> IO a
go Connection
c = do
             MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
m () 
             Con -> Connection -> IO ()
updCon Con
cid Connection
c {conBrk :: Bool
conBrk = Bool
True} 
             IO a -> IO () -> IO a
forall a b. IO a -> IO b -> IO a
finally (Con -> IO a
act Con
cid) (do
               Connection
c' <- Con -> IO Connection
getCon Con
cid
               Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Connection -> Int
conWait Connection
c' Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do -- wait for receipt
                 Rec
rc <- IO Rec
mkUniqueRecc 
                 Connection -> String -> IO ()
disconnect Connection
c' (Rec -> String
forall a. Show a => a -> String
show Rec
rc)
                 Con -> Rec -> IO ()
addRec Con
cid Rec
rc
                 Con -> Rec -> Int -> IO ()
waitCon Con
cid Rec
rc (Connection -> Int
conWait Connection
c') IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`onException` Con -> Rec -> IO ()
rmRec Con
cid Rec
rc
               -- if we not have waited for a receipt,
               --    wait now for error handling to terminate
               Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Connection -> Int
conWait Connection
c' Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 Bool -> Bool -> Bool
&& Connection -> Int
conWaitE Connection
c' Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
                 Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Int
ms (Connection -> Int
conWaitE Connection
c'))
           period :: Connection -> Int
period = Version -> Int
forall a b. (a, b) -> b
snd (Version -> Int) -> (Connection -> Version) -> Connection -> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> Version
conBeat

  ---------------------------------------------------------------------
  -- disconnect from the broker
  ---------------------------------------------------------------------
  disconnect :: Connection -> String -> IO ()
  disconnect :: Connection -> String -> IO ()
disconnect Connection
c String
r
    | Connection -> Bool
conBrk Connection
c  = case String -> Either String Frame
mkDiscF String
r of
                    Left  String
e -> EH
forall e a. Exception e => e -> IO a
throwIO (String -> StomplException
ConnectException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
                                   String
"Cannot create Disconnect Frame: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
e)
                    Right Frame
f -> Chan Frame -> Frame -> IO ()
forall a. Chan a -> a -> IO ()
writeChan (Connection -> Chan Frame
conChn Connection
c) Frame
f 
    | Bool
otherwise = EH
forall e a. Exception e => e -> IO a
throwIO (String -> StomplException
ConnectException String
"Not connected")

  ------------------------------------------------------------------------
  -- wait for receipt on disconnect
  ------------------------------------------------------------------------
  waitCon :: Con -> Receipt -> Int -> IO ()
  waitCon :: Con -> Rec -> Int -> IO ()
waitCon Con
cid Rec
rc Int
delay = do
    Connection
c <- Con -> IO Connection
getCon Con
cid
    case (Rec -> Bool) -> [Rec] -> Maybe Rec
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (Rec -> Rec -> Bool
forall a. Eq a => a -> a -> Bool
==Rec
rc) ([Rec] -> Maybe Rec) -> [Rec] -> Maybe Rec
forall a b. (a -> b) -> a -> b
$ Connection -> [Rec]
conRecs Connection
c of
      Maybe Rec
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
      Just Rec
_  -> 
        if Int
delay Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 
          then EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ConnectException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$ 
                            String
"No receipt on disconnect (" String -> String -> String
forall a. [a] -> [a] -> [a]
++ 
                            Con -> String
forall a. Show a => a -> String
show Con
cid String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
")."
          else do Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Int
ms Int
1
                  Con -> Rec -> Int -> IO ()
waitCon Con
cid Rec
rc (Int
delay Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)

  -----------------------------------------------------------------------
  -- Starting the internal worker threads:
  -- - the new thread lives as long as the user action lives
  -- - if the new thread terminates early, the owner is signalled
  -- - if the new thread receives a synchronous exception,
  --      the owner is signalled
  -----------------------------------------------------------------------
  withThread :: Connection -> String -> IO () -> IO r -> IO r
  withThread :: Connection -> String -> IO () -> IO r -> IO r
withThread Connection
c String
nm IO ()
th IO r
act = do
    MVar ()
m   <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    MVar Bool
x   <- Bool -> IO (MVar Bool)
forall a. a -> IO (MVar a)
newMVar Bool
False
    ThreadId
tid <- IO () -> IO ThreadId
forkIO (MVar Bool -> IO ()
theThread MVar Bool
x IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`finally` MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
m ())
    (IO r
act IO r -> (r -> IO r) -> IO r
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= MVar Bool -> r -> IO r
forall b. MVar Bool -> b -> IO b
signalOk MVar Bool
x) IO r -> IO () -> IO r
forall a b. IO a -> IO b -> IO a
`finally` (ThreadId -> IO ()
killThread ThreadId
tid IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
m)
    where theThread :: MVar Bool -> IO ()
theThread MVar Bool
x = (IO ()
th IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MVar Bool -> IO ()
signalEnd MVar Bool
x) IO () -> [Handler ()] -> IO ()
forall a. IO a -> [Handler a] -> IO a
`catches` [Handler ()]
hndls
          hndls :: [Handler ()]
hndls = [(AsyncException -> IO ()) -> Handler ()
forall a e. Exception e => (e -> IO a) -> Handler a
Handler (\AsyncException
e -> case AsyncException
e of
                                    AsyncException
ThreadKilled -> AsyncException -> IO ()
forall e a. Exception e => e -> IO a
throwIO AsyncException
e
                                    AsyncException
_            -> AsyncException -> IO ()
forall e a. Exception e => e -> IO a
throwIO AsyncException
e),
                   (SomeException -> IO ()) -> Handler ()
forall a e. Exception e => (e -> IO a) -> Handler a
Handler (\SomeException
e -> Connection -> EH
throwToOwner Connection
c EH -> EH
forall a b. (a -> b) -> a -> b
$ 
                                    String -> StomplException
WorkerException (
                                      String
nm String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" terminated: " String -> String -> String
forall a. [a] -> [a] -> [a]
++
                                      SomeException -> String
forall a. Show a => a -> String
show (SomeException
e::SomeException)))
                  ]
          signalEnd :: MVar Bool -> IO ()
signalEnd MVar Bool
x = do Bool
t <- MVar Bool -> IO Bool
forall a. MVar a -> IO a
readMVar MVar Bool
x
                           Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
t (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection -> EH
throwToOwner Connection
c EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
WorkerException (
                                                         String
nm String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" terminated")
          signalOk :: MVar Bool -> b -> IO b
signalOk MVar Bool
x b
r = MVar Bool -> (Bool -> IO (Bool, b)) -> IO b
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar Bool
x ((Bool -> IO (Bool, b)) -> IO b) -> (Bool -> IO (Bool, b)) -> IO b
forall a b. (a -> b) -> a -> b
$ \Bool
_ -> (Bool, b) -> IO (Bool, b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
True,b
r)

  -----------------------------------------------------------------------
  -- Throw to owner
  -- important: give the owner some time to react
  --            before continuing and - probably - 
  --            causing another exception
  -----------------------------------------------------------------------
  throwToOwner :: Connection -> StomplException -> IO ()
  throwToOwner :: Connection -> EH
throwToOwner Connection
c = ThreadId -> EH
forall e. Exception e => ThreadId -> e -> IO ()
throwTo (Connection -> ThreadId
conOwner Connection
c)
    
  ------------------------------------------------------------------------
  -- | A Queue for sending messages.
  ------------------------------------------------------------------------
  data Writer a = SendQ {
                   Writer a -> Con
wCon  :: Con,
                   Writer a -> String
wDest :: String,
                   Writer a -> String
wName :: String,
                   Writer a -> Bool
wRec  :: Bool,
                   Writer a -> Bool
wWait :: Bool,
                   Writer a -> Bool
wCntl :: Bool,
                   Writer a -> Bool
wTx   :: Bool,
                   Writer a -> OutBound a
wTo   :: OutBound a}

  ------------------------------------------------------------------------
  -- | A Queue for receiving messages
  ------------------------------------------------------------------------
  data Reader a = RecvQ {
                   Reader a -> Con
rCon  :: Con,
                   Reader a -> Sub
rSub  :: Sub,
                   Reader a -> String
rDest :: String,
                   Reader a -> String
rName :: String,
                   Reader a -> AckMode
rMode :: F.AckMode,
                   Reader a -> Bool
rAuto :: Bool, -- library creates Ack
                   Reader a -> Bool
rRec  :: Bool,
                   Reader a -> InBound a
rFrom :: InBound a}

  instance Eq (Reader a) where
    Reader a
q1 == :: Reader a -> Reader a -> Bool
== Reader a
q2 = Reader a -> String
forall a. Reader a -> String
rName Reader a
q1 String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== Reader a -> String
forall a. Reader a -> String
rName Reader a
q2

  instance Eq (Writer a) where
    Writer a
q1 == :: Writer a -> Writer a -> Bool
== Writer a
q2 = Writer a -> String
forall a. Writer a -> String
wName Writer a
q1 String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== Writer a -> String
forall a. Writer a -> String
wName Writer a
q2

  ------------------------------------------------------------------------
  -- | Options that may be passed 
  --   to 'newReader' and 'newWriter' and their variants.
  ------------------------------------------------------------------------
  data Qopt = 
            -- | A queue created with 'OWithReceipt' will request a receipt
            --   on all interactions with the broker.
            --   The handling of receipts is usually not visible to applications, 
            --   but may be made visible in the case of sending messages
            --   using 'writeQWith'.
            --   'writeQWith' return the receipt identifier
            --   and the application can later invoke 'waitReceipt'
            --   to explicitly wait for the broker confirming this receipt.
            --
            --   A 'Reader' created with 'OWithReceipt' will 
            --   issue a request for receipt when subscribing to a Stomp queue.
            OWithReceipt 
            -- | A queue created with 'OWaitReceipt' will wait for the receipt
            --   before returning from a call that has issued a request for receipt.
            --   This implies that the current thread will yield the processor.
            --   'writeQ' will internally create a request for receipt and 
            --   wait for the broker to confirm the receipt before returning.
            --   Note that, for 'newReader', there is no difference between
            --   'OWaitReceipt' and 'OWithReceipt'. Either option will cause
            --   the thread to preempt until the receipt is confirmed.
            --
            --   On writing a message, this is not always the preferred
            --   method. You may want to fire and forget - and check 
            --   for the confirmation of the receipt only later.
            --   In this case, you will create the
            --   'Writer' with 'OWithReceipt' only and, later, after having
            --   sent a message with 'writeQWith', wait for the receipt using
            --   'waitReceipt'. Note that 'OWaitReceipt' without 'OWithReceipt'
            --   has no meaning with 'writeQ' and 'writeQWith'. 
            --   If you want to request a receipt with a message
            --   and wait for the broker to confirm it, you have to use 
            --   both options.
            --
            --   It is good practice to use /timeout/ with all calls
            --   that may wait for receipts, 
            --   /i.e./ 'newReader' and 'withReader' 
            --   with options 'OWithReceipt' or 'OWaitReceipt',
            --   or 'writeQ' and 'writeQWith' with options 'OWaitReceipt',
            --   or 'ackWith' and 'nackWith'.
            | OWaitReceipt 
            -- | The option defines the 'F.AckMode' of the queue,
            --   which is relevant for 'Reader' only.
            --   'F.AckMode' is one of: 
            --   'F.Auto', 'F.Client', 'F.ClientIndi'.
            --
            --   If 'OMode' is not given, 'F.Auto' is assumed as default.
            --
            --   For more details, see 'F.AckMode'.
            | OMode F.AckMode  
            -- | Expression often used by Ren&#x00e9; Artois.
            --   Furthermore, if 'OMode' is either
            --   'F.Client' or 'F.ClientIndi', then 
            --   this option forces 'readQ' to send an acknowledgement
            --   automatically when a message has been read from the queue. 
            | OAck
            -- | A queue created with 'OForceTx' will throw 
            --   'QueueException' when used outside a 'Transaction'.
            | OForceTx
            -- | Do not automatically add a content-length header
            | ONoContentLen 
    deriving (Int -> Qopt -> String -> String
[Qopt] -> String -> String
Qopt -> String
(Int -> Qopt -> String -> String)
-> (Qopt -> String) -> ([Qopt] -> String -> String) -> Show Qopt
forall a.
(Int -> a -> String -> String)
-> (a -> String) -> ([a] -> String -> String) -> Show a
showList :: [Qopt] -> String -> String
$cshowList :: [Qopt] -> String -> String
show :: Qopt -> String
$cshow :: Qopt -> String
showsPrec :: Int -> Qopt -> String -> String
$cshowsPrec :: Int -> Qopt -> String -> String
Show, ReadPrec [Qopt]
ReadPrec Qopt
Int -> ReadS Qopt
ReadS [Qopt]
(Int -> ReadS Qopt)
-> ReadS [Qopt] -> ReadPrec Qopt -> ReadPrec [Qopt] -> Read Qopt
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
readListPrec :: ReadPrec [Qopt]
$creadListPrec :: ReadPrec [Qopt]
readPrec :: ReadPrec Qopt
$creadPrec :: ReadPrec Qopt
readList :: ReadS [Qopt]
$creadList :: ReadS [Qopt]
readsPrec :: Int -> ReadS Qopt
$creadsPrec :: Int -> ReadS Qopt
Read, Qopt -> Qopt -> Bool
(Qopt -> Qopt -> Bool) -> (Qopt -> Qopt -> Bool) -> Eq Qopt
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Qopt -> Qopt -> Bool
$c/= :: Qopt -> Qopt -> Bool
== :: Qopt -> Qopt -> Bool
$c== :: Qopt -> Qopt -> Bool
Eq) 

  ------------------------------------------------------------------------
  -- Option is element of option list
  ------------------------------------------------------------------------
  hasQopt :: Qopt -> [Qopt] -> Bool
  hasQopt :: Qopt -> [Qopt] -> Bool
hasQopt Qopt
o [Qopt]
os = Qopt
o Qopt -> [Qopt] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [Qopt]
os

  ------------------------------------------------------------------------
  -- What ackMode ('F.Auto' by default)
  ------------------------------------------------------------------------
  ackMode :: [Qopt] -> F.AckMode
  ackMode :: [Qopt] -> AckMode
ackMode [Qopt]
os = case (Qopt -> Bool) -> [Qopt] -> Maybe Qopt
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find Qopt -> Bool
isMode [Qopt]
os of
                 Just (OMode AckMode
x) -> AckMode
x
                 Maybe Qopt
_              -> AckMode
F.Auto
    where isMode :: Qopt -> Bool
isMode Qopt
x = case Qopt
x of
                       OMode AckMode
_ -> Bool
True
                       Qopt
_       -> Bool
False

  ------------------------------------------------------------------------
  -- | Converters are user-defined actions passed to 
  --   'newReader' ('InBound') and
  --   'newWriter' ('OutBound')
  --   that convert a 'B.ByteString' to a value of type /a/ ('InBound') or
  --                a value of type /a/ to 'B.ByteString' ('OutBound'). 
  --   Converters are, hence, similar to /put/ and /get/ in the /Binary/
  --   monad. 
  --
  --   The reason for using explicit, user-defined converters 
  --   instead of /Binary/ /encode/ and /decode/
  --   is that the conversion with queues
  --   may be much more complex, involving reading configurations 
  --   or other 'IO' actions.
  --   Furthermore, we have to distinguish between data types and 
  --   there binary encoding when sent over the network.
  --   This distinction is made by /MIME/ types.
  --   Two applications may send the same data type,
  --   but one encodes this type as \"text/plain\",
  --   the other as \"text/xml\".
  --   'InBound' conversions have to consider the /MIME/ type
  --   and, hence, need more input parameters than provided by /decode/.
  --   /encode/ and /decode/, however,
  --   can be used internally by user-defined converters.
  --
  --   The parameters expected by an 'InBound' converter are:
  --
  --     * the /MIME/ type of the content
  --
  --     * the content size 
  --
  --     * the list of 'F.Header' coming with the message
  --
  --     * the content encoded as 'B.ByteString'.
  --
  --   A simple in-bound converter for plain strings is for instance:
  --
  --   > let iconv _ _ _ = return . unpack
  ------------------------------------------------------------------------
  type InBound  a = Mime.Type -> Int -> [F.Header] -> B.ByteString -> IO a
  ------------------------------------------------------------------------
  -- | Out-bound converters are much simpler.
  --   Since the application developer knows,
  --   which encoding to use, the /MIME/ type is not needed.
  --   The converter receives only the value of type /a/
  --   and converts it into a 'B.ByteString'.
  --   A simple example to create an out-bound converter 
  --   for plain strings could be:
  --
  --   > let oconv = return . pack
  ------------------------------------------------------------------------
  type OutBound a = a -> IO B.ByteString

  ------------------------------------------------------------------------
  -- | Creates a 'Reader' with the lifetime of the connection 'Con'.
  --   Creating a receiving queue involves interaction with the broker;
  --   this may result in preempting the calling thread, 
  --   depending on the options ['Qopt'].
  --   
  --   Parameters:
  --
  --   * The connection handle 'Con'
  --
  --   * A queue name that should be unique in your application.
  --     The queue name is used only for debugging. 
  --
  --   * The Stomp destination, /i.e./ the name of the queue
  --     as it is known to the broker and other applications.
  --
  --   * A list of options ('Qopt').
  --
  --   * A list of headers ('F.Header'), 
  --     which will be passed to the broker.
  --     A header may be, for instance,
  --     a selector that restricts the subscription to this queue,
  --     such that only messages with certain attributes 
  --     (/i.e./ specific headers) are sent to the subscribing client.
  --     Selectors are broker-specific and typically expressed
  --     in some kind of query language such as SQL or XPath.
  --
  --   * An in-bound converter.
  --
  --   A usage example to create a 'Reader'
  --   with 'Connection' /c/ and the in-bound converter
  --   /iconv/ would be:
  --
  --   > q <- newReader c "TestQ" "/queue/test" [] [] iconv
  --
  --   A call to 'newReader' may result in preemption when
  --   one of the options 'OWaitReceipt' or 'OWithReceipt' are given;
  --   an example for such a call 
  --   with /tmo/ an 'Int' value representing a /timeout/
  --   in microseconds and 
  --   the result /mbQ/ of type 'Maybe' is:
  --
  --   > mbQ <- timeout tmo $ newReader c "TestQ" "/queue/test" [OWaitReceipt] [] oconv
  --   > case mbQ of
  --   >   Nothing -> -- handle error
  --   >   Just q  -> do -- ...
  --
  --   A newReader stores data in the connection /c/.
  --   If the lifetime of a reader is shorter than that of its connection
  --   it should call 'destroyReader' to avoid memory leaks.
  --   In such cases, it is usually preferable to use 'withReader'.
  ------------------------------------------------------------------------
  newReader :: Con -> String -> String -> [Qopt] -> [F.Header] -> 
               InBound a -> IO (Reader a)
  newReader :: Con
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound a
-> IO (Reader a)
newReader Con
cid String
qn String
dst [Qopt]
os [Header]
hs InBound a
conv = do
    Connection
c <- Con -> IO Connection
getCon Con
cid
    if Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ Connection -> Bool
connected Connection
c
      then StomplException -> IO (Reader a)
forall e a. Exception e => e -> IO a
throwIO (StomplException -> IO (Reader a))
-> StomplException -> IO (Reader a)
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ConnectException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$ 
                 String
"Not connected (" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Con -> String
forall a. Show a => a -> String
show Con
cid String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
")"
      else Con
-> Connection
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound a
-> IO (Reader a)
forall a.
Con
-> Connection
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound a
-> IO (Reader a)
newRecvQ Con
cid Connection
c String
qn String
dst [Qopt]
os [Header]
hs InBound a
conv

  ------------------------------------------------------------------------
  -- | Removes all references to the reader from the connection. 
  ------------------------------------------------------------------------
  destroyReader :: Reader a -> IO ()
  destroyReader :: Reader a -> IO ()
destroyReader = Reader a -> IO ()
forall a. Reader a -> IO ()
unsub 
  
  ------------------------------------------------------------------------
  -- | Creates a 'Writer' with the lifetime of the connection 'Con'.
  --   Creating a sending queue does not involve interaction with the broker
  --   and will not preempt the calling thread.
  --   
  --   A sending queue may be created like in the following
  --   code fragment, where /oconv/ is 
  --   an already defined out-bound converter:
  --
  --   > q <- newWriter c "TestQ" "/queue/test" [] [] oconv
  --
  -- Currently no references to the writer are stored 
  -- in the connection. It is advisable, however, 
  -- to use 'withWriter' instead of 'newWriter'
  -- whenever the lifetime of a writer is shorter than that
  -- of the connection. 
  -- In cases where this is not possible,
  -- you should use 'destroyWriter' 
  -- when the writer is not needed anymore.
  -- Currently 'destroyWriter' does nothing,
  -- but this may change in the future.
  ------------------------------------------------------------------------
  newWriter :: Con -> String -> String -> [Qopt] -> [F.Header] -> 
               OutBound a -> IO (Writer a)
  newWriter :: Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound a
-> IO (Writer a)
newWriter Con
cid String
qn String
dst [Qopt]
os [Header]
_ OutBound a
conv = do
    Connection
c <- Con -> IO Connection
getCon Con
cid
    if Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ Connection -> Bool
connected Connection
c
      then StomplException -> IO (Writer a)
forall e a. Exception e => e -> IO a
throwIO (StomplException -> IO (Writer a))
-> StomplException -> IO (Writer a)
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ConnectException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$ 
                 String
"Not connected (" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Con -> String
forall a. Show a => a -> String
show Con
cid String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
")"
      else Con -> String -> String -> [Qopt] -> OutBound a -> IO (Writer a)
forall a.
Con -> String -> String -> [Qopt] -> OutBound a -> IO (Writer a)
newSendQ Con
cid String
qn String
dst [Qopt]
os OutBound a
conv

  ------------------------------------------------------------------------
  -- | Does nothing, but should be used with 'newWriter'.
  ------------------------------------------------------------------------
  destroyWriter :: Writer a -> IO ()
  destroyWriter :: Writer a -> IO ()
destroyWriter Writer a
_ = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

  ------------------------------------------------------------------------
  -- | Creates a 'Reader' with limited lifetime. 
  --   The queue will live only in the scope of the action
  --   that is passed as last parameter. 
  --   The function is useful for readers
  --   with a lifetime shorter than that of the connection.
  --   When the action terminates, the client unsubscribes from 
  --   the broker queue - even if an exception is raised.
  --
  --   'withReader' returns the result of the action.
  --   Since the lifetime of the queue is limited to the action,
  --   it should not be returned.
  --   Any operation on a reader created by 'withReader'
  --   outside the action will raise 'QueueException'.
  --
  --   A usage example is: 
  --
  --   > x <- withReader c "TestQ" "/queue/test" [] [] iconv $ \q -> do
  ------------------------------------------------------------------------
  withReader :: Con -> String    -> 
                       String    -> [Qopt] -> [F.Header] -> 
                       InBound i -> (Reader i -> IO r)   -> IO r
  withReader :: Con
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound i
-> (Reader i -> IO r)
-> IO r
withReader Con
cid String
qn String
dst [Qopt]
os [Header]
hs InBound i
conv = 
    IO (Reader i) -> (Reader i -> IO ()) -> (Reader i -> IO r) -> IO r
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (Con
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound i
-> IO (Reader i)
forall a.
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound a
-> IO (Reader a)
newReader Con
cid String
qn String
dst [Qopt]
os [Header]
hs InBound i
conv) Reader i -> IO ()
forall a. Reader a -> IO ()
unsub 

  ------------------------------------------------------------------------
  -- | Creates a 'Writer' with limited lifetime. 
  --   The queue will live only in the scope of the action
  --   that is passed as last parameter. 
  --   The function should be used for writers with a lifetime
  --   shorter than that of the connection.
  --
  --   'withWriter' returns the result of the action.
  --   Since the lifetime of the queue is limited to the action,
  --   it should not be returned.
  --   Any operation on a writer created by 'withWriter'
  --   outside the action will raise a 'QueueException'.
  ------------------------------------------------------------------------
  withWriter :: Con -> String     -> 
                       String     -> [Qopt] -> [F.Header] -> 
                       OutBound o -> (Writer o -> IO r)   -> IO r
  withWriter :: Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound o
-> (Writer o -> IO r)
-> IO r
withWriter Con
cid String
qn String
dst [Qopt]
os [Header]
hs OutBound o
conv Writer o -> IO r
act = 
    Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound o
-> IO (Writer o)
forall a.
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound a
-> IO (Writer a)
newWriter Con
cid String
qn String
dst [Qopt]
os [Header]
hs OutBound o
conv IO (Writer o) -> (Writer o -> IO r) -> IO r
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Writer o -> IO r
act

  ------------------------------------------------------------------------
  -- | Creates a pair of ('Reader' i, 'Writer' o) with limited lifetime. 
  --   The pair will live only in the scope of the action
  --   that is passed as last parameter. 
  --   The function is useful for readers\/writers
  --   used in combination, /e.g./ to emulate a client\/server
  --   kind of communication.
  --
  --   'withPair' returns the result of the action passed in.
  --
  --   The parameters are:
  --
  --   * The connection handle 'Con'
  --
  --   * The name of the pair; 
  --     the reader will be identified by a string
  --                with \"_r\" added to this name,
  --     the writer by a string with \"_w\" added to this name.
  --
  --   * The description of the 'Reader', 'ReaderDesc'
  --
  --   * The description of the 'Writer', 'WriterDesc'
  --
  --   * The application-defined action
  --
  --  The reason for introducing the reader and writer description
  --  is to provide error detection at compile time:
  --  It is this way much more difficult to accidently confuse
  --  the writer's and the reader's parameters (/e.g./ 
  --  passing the writer's 'Qopt's to the reader).
  ------------------------------------------------------------------------
  withPair :: Con -> String  ->  ReaderDesc i -> 
                                 WriterDesc o ->
                                 ((Reader i, Writer o) -> IO r) -> IO r
  withPair :: Con
-> String
-> ReaderDesc i
-> WriterDesc o
-> ((Reader i, Writer o) -> IO r)
-> IO r
withPair Con
cid String
n (String
rq,[Qopt]
ro,[Header]
rh,InBound i
iconv)
                 (String
wq,[Qopt]
wo,[Header]
wh,OutBound o
oconv) (Reader i, Writer o) -> IO r
act = 
    Con
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound i
-> (Reader i -> IO r)
-> IO r
forall i r.
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound i
-> (Reader i -> IO r)
-> IO r
withReader Con
cid (String
n String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"_r") String
rq [Qopt]
ro [Header]
rh InBound i
iconv ((Reader i -> IO r) -> IO r) -> (Reader i -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Reader i
r ->
      Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound o
-> (Writer o -> IO r)
-> IO r
forall o r.
Con
-> String
-> String
-> [Qopt]
-> [Header]
-> OutBound o
-> (Writer o -> IO r)
-> IO r
withWriter Con
cid (String
n String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"_w") String
wq [Qopt]
wo [Header]
wh OutBound o
oconv ((Writer o -> IO r) -> IO r) -> (Writer o -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Writer o
w -> (Reader i, Writer o) -> IO r
act (Reader i
r,Writer o
w)

  ------------------------------------------------------------------------
  -- | The 'Reader' parameters of 'withPair':
  --
  --     * The reader's queue name 
  --
  --     * The reader's 'Qopt's
  --
  --     * The reader's 'Header's
  --
  --     * The reader's (inbound) converter
  ------------------------------------------------------------------------
  type ReaderDesc i = (String, [Qopt], [F.Header], InBound  i)

  ------------------------------------------------------------------------
  -- | The 'Writer' parameters of 'withPair'
  --
  --     * The writer's queue name
  --
  --     * The writer's 'Qopt's
  --
  --     * The writer's 'Header's
  --
  --     * The writer's (outbound) converter
  ------------------------------------------------------------------------
  type WriterDesc o = (String, [Qopt], [F.Header], OutBound o)

  ------------------------------------------------------------------------
  -- Creating a SendQ is plain and simple.
  ------------------------------------------------------------------------
  newSendQ :: Con -> String -> String -> [Qopt] -> 
              OutBound a -> IO (Writer a)
  newSendQ :: Con -> String -> String -> [Qopt] -> OutBound a -> IO (Writer a)
newSendQ Con
cid String
qn String
dst [Qopt]
os OutBound a
conv = 
    Writer a -> IO (Writer a)
forall (m :: * -> *) a. Monad m => a -> m a
return SendQ :: forall a.
Con
-> String
-> String
-> Bool
-> Bool
-> Bool
-> Bool
-> OutBound a
-> Writer a
SendQ {
              wCon :: Con
wCon  = Con
cid,
              wDest :: String
wDest = String
dst,
              wName :: String
wName = String
qn,
              wRec :: Bool
wRec  = Qopt -> [Qopt] -> Bool
hasQopt Qopt
OWithReceipt  [Qopt]
os, 
              wWait :: Bool
wWait = Qopt -> [Qopt] -> Bool
hasQopt Qopt
OWaitReceipt  [Qopt]
os, 
              wTx :: Bool
wTx   = Qopt -> [Qopt] -> Bool
hasQopt Qopt
OForceTx      [Qopt]
os, 
              wCntl :: Bool
wCntl = Qopt -> [Qopt] -> Bool
hasQopt Qopt
ONoContentLen [Qopt]
os,
              wTo :: OutBound a
wTo   = OutBound a
conv}

  ------------------------------------------------------------------------
  -- Creating a ReceivQ, however, involves some more hassle,
  -- in particular 'IO'.
  ------------------------------------------------------------------------
  newRecvQ :: Con        -> Connection -> String -> String -> 
              [Qopt]     -> [F.Header] ->
              InBound a  -> IO (Reader a)
  newRecvQ :: Con
-> Connection
-> String
-> String
-> [Qopt]
-> [Header]
-> InBound a
-> IO (Reader a)
newRecvQ Con
cid Connection
c String
qn String
dst [Qopt]
os [Header]
hs InBound a
conv = do
    let am :: AckMode
am   = [Qopt] -> AckMode
ackMode [Qopt]
os
    let au :: Bool
au   = Qopt -> [Qopt] -> Bool
hasQopt Qopt
OAck [Qopt]
os
    let with :: Bool
with = Qopt -> [Qopt] -> Bool
hasQopt Qopt
OWithReceipt [Qopt]
os Bool -> Bool -> Bool
|| Qopt -> [Qopt] -> Bool
hasQopt Qopt
OWaitReceipt [Qopt]
os
    Sub
sid <- IO Sub
mkUniqueSubId
    Rec
rc  <- if Bool
with then IO Rec
mkUniqueRecc else Rec -> IO Rec
forall (m :: * -> *) a. Monad m => a -> m a
return Rec
NoRec
    Con -> IO ()
logSend Con
cid
    Connection -> Subscription -> String -> [Header] -> IO ()
fSubscribe Connection
c(Sub -> String -> AckMode -> Subscription
mkSub Sub
sid String
dst AckMode
am) (Rec -> String
forall a. Show a => a -> String
show Rec
rc) [Header]
hs
    Chan Frame
ch <- IO (Chan Frame)
forall a. IO (Chan a)
newChan 
    Con -> SubEntry -> IO ()
addSub  Con
cid (Sub
sid, Chan Frame
ch) 
    Con -> DestEntry -> IO ()
addDest Con
cid (String
dst, Chan Frame
ch) 
    let q :: Reader a
q = RecvQ :: forall a.
Con
-> Sub
-> String
-> String
-> AckMode
-> Bool
-> Bool
-> InBound a
-> Reader a
RecvQ {
               rCon :: Con
rCon  = Con
cid,
               rSub :: Sub
rSub  = Sub
sid,
               rDest :: String
rDest = String
dst,
               rName :: String
rName = String
qn,
               rMode :: AckMode
rMode = AckMode
am,
               rAuto :: Bool
rAuto = Bool
au,
               rRec :: Bool
rRec  = Bool
with,
               rFrom :: InBound a
rFrom = InBound a
conv}
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
with (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
waitReceipt Con
cid Rec
rc
    Reader a -> IO (Reader a)
forall (m :: * -> *) a. Monad m => a -> m a
return Reader a
q

  ------------------------------------------------------------------------
  -- Unsubscribe a queue
  ------------------------------------------------------------------------
  unsub :: Reader a -> IO ()
  unsub :: Reader a -> IO ()
unsub Reader a
q = do
    let cid :: Con
cid = Reader a -> Con
forall a. Reader a -> Con
rCon  Reader a
q
    let sid :: Sub
sid = Reader a -> Sub
forall a. Reader a -> Sub
rSub  Reader a
q
    let dst :: String
dst = Reader a -> String
forall a. Reader a -> String
rDest Reader a
q
    Rec
rc <- if Reader a -> Bool
forall a. Reader a -> Bool
rRec Reader a
q then IO Rec
mkUniqueRecc else Rec -> IO Rec
forall (m :: * -> *) a. Monad m => a -> m a
return Rec
NoRec
    Connection
c  <- Con -> IO Connection
getCon Con
cid
    Con -> IO ()
logSend Con
cid
    IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
finally (Connection -> Subscription -> String -> [Header] -> IO ()
fUnsubscribe Connection
c
               (Sub -> String -> AckMode -> Subscription
mkSub Sub
sid String
dst AckMode
F.Client)
               (Rec -> String
forall a. Show a => a -> String
show Rec
rc) [])
            (do Con -> Sub -> IO ()
rmSub  Con
cid Sub
sid
                Con -> String -> IO ()
rmDest Con
cid String
dst)
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Reader a -> Bool
forall a. Reader a -> Bool
rRec Reader a
q) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
waitReceipt Con
cid Rec
rc

  ------------------------------------------------------------------------
  -- | Removes the oldest message from the queue
  --   and returns it as 'Message'.
  --   The message cannot be read from the queue
  --   by another call to 'readQ' within the same connection.
  --   Wether other connections will receive the message too
  --   depends on the broker and the queue patterns it implements.
  --   If the queue is currently empty,
  --   the thread will preempt until a message arrives.
  --
  --   If the queue was created with 
  --   'OMode' other than 'F.Auto' 
  --   and with 'OAck', then an /ack/ 
  --   will be automatically sent to the broker;
  --   if 'OAck' was not set,
  --   the message will be registered as pending /ack/.
  --
  --   Note that, when 'readQ' sends an /ack/ internally,
  --   it will not request a receipt from the broker.
  --   The rationale for this design is simplicity.
  --   If the function expected a receipt, 
  --   it would have to either wait for the receipt
  --   or return it.
  --   In the first case, it would be difficult
  --   for the programmer to distinguish, on a timeout, between
  --   /no message available/ and
  --   /no receipt arrived/.
  --   In the second case, the receipt
  --   would need to be returned.
  --   This would unnecessarily blow up the interface.
  --   If you need the reliability of receipts,
  --   you should create the queue without 'OAck'
  --   and use 'ackWith' to acknowledge 
  --   the message explicitly.
  ------------------------------------------------------------------------
  readQ :: Reader a -> IO (Message a)
  readQ :: Reader a -> IO (Message a)
readQ Reader a
q = do
    Connection
c <- Con -> IO Connection
getCon (Reader a -> Con
forall a. Reader a -> Con
rCon Reader a
q)
    if Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ Connection -> Bool
connected Connection
c
      then StomplException -> IO (Message a)
forall e a. Exception e => e -> IO a
throwIO (StomplException -> IO (Message a))
-> StomplException -> IO (Message a)
forall a b. (a -> b) -> a -> b
$ String -> StomplException
QueueException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$ String
"Not connected: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Con -> String
forall a. Show a => a -> String
show (Reader a -> Con
forall a. Reader a -> Con
rCon Reader a
q)
      else case Sub -> Connection -> Maybe (Chan Frame)
getSub (Reader a -> Sub
forall a. Reader a -> Sub
rSub Reader a
q) Connection
c of
             Maybe (Chan Frame)
Nothing -> StomplException -> IO (Message a)
forall e a. Exception e => e -> IO a
throwIO (StomplException -> IO (Message a))
-> StomplException -> IO (Message a)
forall a b. (a -> b) -> a -> b
$ String -> StomplException
QueueException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$ 
                           String
"Unknown queue " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Reader a -> String
forall a. Reader a -> String
rName Reader a
q
             Just Chan Frame
ch -> do
               Message a
m <- Chan Frame -> IO Frame
forall a. Chan a -> IO a
readChan Chan Frame
ch IO Frame -> (Frame -> IO (Message a)) -> IO (Message a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Reader a -> Frame -> IO (Message a)
forall a. Reader a -> Frame -> IO (Message a)
frmToMsg Reader a
q
               Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Reader a -> AckMode
forall a. Reader a -> AckMode
rMode Reader a
q AckMode -> AckMode -> Bool
forall a. Eq a => a -> a -> Bool
/= AckMode
F.Auto) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
                 if Reader a -> Bool
forall a. Reader a -> Bool
rAuto Reader a
q
                   then Con -> Message a -> IO ()
forall a. Con -> Message a -> IO ()
ack    (Reader a -> Con
forall a. Reader a -> Con
rCon Reader a
q) Message a
m
                   else Con -> MsgId -> IO ()
addAck (Reader a -> Con
forall a. Reader a -> Con
rCon Reader a
q) (Message a -> MsgId
forall a. Message a -> MsgId
msgId Message a
m)
               Message a -> IO (Message a)
forall (m :: * -> *) a. Monad m => a -> m a
return Message a
m

  ------------------------------------------------------------------------
  -- We do not support this, because of GHC ticket 4154:
  -- deadlock on isEmptyChan with concurrent read
  ------------------------------------------------------------------------
  -- isEmptyQ :: Reader a -> IO Bool

  ------------------------------------------------------------------------
  -- | Adds the value /a/ as message at the end of the queue.
  --   The Mime type as well as the headers 
  --   are added to the message.
  --
  --   If the queue was created with the option
  --   'OWithReceipt',
  --   'writeQ' will request a receipt from the broker.
  --   If the queue was additionally created with
  --   'OWaitReceipt',
  --   'writeQ' will preempt until the receipt is confirmed.
  --
  --   The Stomp headers are useful for brokers
  --   that provide selectors on /subscribe/,
  --   see 'newReader' for details.
  --
  --   A usage example for a 'Writer' /q/ of type 'String'
  --   may be (/nullType/ is defined as /text/\//plain/ in Codec.MIME):
  --
  --   > writeQ q nullType [] "hello world!"
  --
  --   For a 'Writer' that was created 
  --   with 'OWithReceipt' and 'OWaitReceipt',
  --   the function should be called with /timeout/:
  --
  --   > mbR <- timeout tmo $ writeQ q nullType [] "hello world!"
  --   > case mbR of
  --   >   Nothing -> -- error handling
  --   >   Just r  -> do -- ...
  ------------------------------------------------------------------------
  writeQ :: Writer a -> Mime.Type -> [F.Header] -> a -> IO ()
  writeQ :: Writer a -> Type -> [Header] -> a -> IO ()
writeQ Writer a
q Type
mime [Header]
hs a
x =
    Writer a -> Type -> [Header] -> a -> IO Rec
forall a. Writer a -> Type -> [Header] -> a -> IO Rec
writeQWith Writer a
q Type
mime [Header]
hs a
x IO Rec -> (Rec -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (\Rec
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())

  ------------------------------------------------------------------------
  -- | This is a variant of 'writeQ'
  --   that overwrites the destination queue defined in the writer queue.
  --   It can be used for /ad hoc/ communication and
  --   for emulations of client/server-like protocols:
  --   the client would pass the name of the queue
  --   where it expects the server response in a header;
  --   the server would send the reply to the queue
  --   indicated in the header using 'writeAdHoc'.
  --   The additional 'String' parameter contains the destination.
  ------------------------------------------------------------------------
  writeAdHoc :: Writer a -> String -> Mime.Type -> [F.Header] -> a -> IO ()
  writeAdHoc :: Writer a -> String -> Type -> [Header] -> a -> IO ()
writeAdHoc Writer a
q String
dest Type
mime [Header]
hs a
x =
    Writer a -> String -> Type -> [Header] -> a -> IO Rec
forall a. Writer a -> String -> Type -> [Header] -> a -> IO Rec
writeGeneric Writer a
q String
dest Type
mime [Header]
hs a
x IO Rec -> (Rec -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (\Rec
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())

  ------------------------------------------------------------------------
  -- | This is a variant of 'writeQ' 
  --   that is particularly useful for queues 
  --   created with 'OWithReceipt', but without 'OWaitReceipt'.
  --   It returns the 'Receipt', so that it can be waited for
  --   later, using 'waitReceipt'.
  --
  --   Note that the behaviour of 'writeQWith', 
  --   besides of returning the receipt, is the same as 'writeQ',
  --   /i.e./, on a queue with 'OWithReceipt' and 'OWaitReceipt'
  --   'writeQWith' will wait for the receipt being confirmed.
  --   In this case, the returned receipt is, in fact, 
  --   of no further use for the application.
  --
  --   The function is used like:
  --
  --   > r <- writeQWith q nullType [] "hello world!"
  ------------------------------------------------------------------------
  writeQWith :: Writer a -> Mime.Type -> [F.Header] -> a -> IO Receipt
  writeQWith :: Writer a -> Type -> [Header] -> a -> IO Rec
writeQWith Writer a
q = Writer a -> String -> Type -> [Header] -> a -> IO Rec
forall a. Writer a -> String -> Type -> [Header] -> a -> IO Rec
writeGeneric Writer a
q (Writer a -> String
forall a. Writer a -> String
wDest Writer a
q) 

  ------------------------------------------------------------------------
  -- | This is a variant of 'writeAdHoc' 
  --   that is particularly useful for queues 
  --   created with 'OWithReceipt', but without 'OWaitReceipt'.
  --   It returns the 'Receipt', so that it can be waited for
  --   later, using 'waitReceipt'.
  --   Please refer to 'writeQWith' for more details.
  ------------------------------------------------------------------------
  writeAdHocWith :: Writer a -> String -> Mime.Type -> [F.Header] -> a -> IO Receipt
  writeAdHocWith :: Writer a -> String -> Type -> [Header] -> a -> IO Rec
writeAdHocWith = Writer a -> String -> Type -> [Header] -> a -> IO Rec
forall a. Writer a -> String -> Type -> [Header] -> a -> IO Rec
writeGeneric 
 
  ------------------------------------------------------------------------
  -- internal work horse
  ------------------------------------------------------------------------
  writeGeneric :: Writer a -> String -> 
                  Mime.Type -> [F.Header] -> a -> IO Receipt
  writeGeneric :: Writer a -> String -> Type -> [Header] -> a -> IO Rec
writeGeneric Writer a
q String
dest Type
mime [Header]
hs a
x = do
    Connection
c <- Con -> IO Connection
getCon (Writer a -> Con
forall a. Writer a -> Con
wCon Writer a
q)
    if Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ Connection -> Bool
connected Connection
c
      then StomplException -> IO Rec
forall e a. Exception e => e -> IO a
throwIO (StomplException -> IO Rec) -> StomplException -> IO Rec
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ConnectException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
                 String
"Not connected (" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Con -> String
forall a. Show a => a -> String
show (Writer a -> Con
forall a. Writer a -> Con
wCon Writer a
q) String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
")"
      else do
        Tx
tx <- Connection -> IO (Maybe Tx)
getCurTx Connection
c IO (Maybe Tx) -> (Maybe Tx -> IO Tx) -> IO Tx
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (\Maybe Tx
mbT -> 
                 case Maybe Tx
mbT of
                   Maybe Tx
Nothing     -> Tx -> IO Tx
forall (m :: * -> *) a. Monad m => a -> m a
return Tx
NoTx
                   Just  Tx
t     -> Tx -> IO Tx
forall (m :: * -> *) a. Monad m => a -> m a
return Tx
t)
        if Tx
tx Tx -> Tx -> Bool
forall a. Eq a => a -> a -> Bool
== Tx
NoTx Bool -> Bool -> Bool
&& Writer a -> Bool
forall a. Writer a -> Bool
wTx Writer a
q
          then StomplException -> IO Rec
forall e a. Exception e => e -> IO a
throwIO (StomplException -> IO Rec) -> StomplException -> IO Rec
forall a b. (a -> b) -> a -> b
$ String -> StomplException
QueueException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
                 String
"Queue '" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Writer a -> String
forall a. Writer a -> String
wName Writer a
q String -> String -> String
forall a. [a] -> [a] -> [a]
++ 
                 String
"' with OForceTx used outside Transaction"
          else do
            let conv :: OutBound a
conv = Writer a -> OutBound a
forall a. Writer a -> OutBound a
wTo Writer a
q
            ByteString
s  <- OutBound a
conv a
x
            Rec
rc <- if Writer a -> Bool
forall a. Writer a -> Bool
wRec Writer a
q then IO Rec
mkUniqueRecc else Rec -> IO Rec
forall (m :: * -> *) a. Monad m => a -> m a
return Rec
NoRec
            let l :: Int
l = if Writer a -> Bool
forall a. Writer a -> Bool
wCntl Writer a
q then -Int
1 else ByteString -> Int
B.length ByteString
s
            let m :: Message a
m = MsgId
-> Sub
-> String
-> String
-> Type
-> Int
-> Tx
-> ByteString
-> a
-> Message a
forall a.
MsgId
-> Sub
-> String
-> String
-> Type
-> Int
-> Tx
-> ByteString
-> a
-> Message a
mkMessage MsgId
NoMsg Sub
NoSub String
dest String
"" Type
mime Int
l Tx
tx ByteString
s a
x
            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Writer a -> Bool
forall a. Writer a -> Bool
wRec Writer a
q) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
addRec (Writer a -> Con
forall a. Writer a -> Con
wCon Writer a
q) Rec
rc 
            Con -> IO ()
logSend (Con -> IO ()) -> Con -> IO ()
forall a b. (a -> b) -> a -> b
$ Writer a -> Con
forall a. Writer a -> Con
wCon Writer a
q
            Connection -> Message a -> String -> [Header] -> IO ()
forall a. Connection -> Message a -> String -> [Header] -> IO ()
fSend Connection
c Message a
m (Rec -> String
forall a. Show a => a -> String
show Rec
rc) [Header]
hs 
            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Writer a -> Bool
forall a. Writer a -> Bool
wRec Writer a
q Bool -> Bool -> Bool
&& Writer a -> Bool
forall a. Writer a -> Bool
wWait Writer a
q) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
waitReceipt (Writer a -> Con
forall a. Writer a -> Con
wCon Writer a
q) Rec
rc 
            Rec -> IO Rec
forall (m :: * -> *) a. Monad m => a -> m a
return Rec
rc

  ------------------------------------------------------------------------
  -- | Acknowledges the arrival of 'Message' to the broker.
  --   It is used with a 'Connection' /c/ and a 'Message' /x/ like:
  --
  --   > ack c x
  ------------------------------------------------------------------------
  ack :: Con -> Message a -> IO ()
  ack :: Con -> Message a -> IO ()
ack Con
cid Message a
msg = do
    Con -> Bool -> Bool -> Message a -> IO ()
forall a. Con -> Bool -> Bool -> Message a -> IO ()
ack'  Con
cid Bool
True Bool
False Message a
msg
    Con -> MsgId -> IO ()
rmAck Con
cid (MsgId -> IO ()) -> MsgId -> IO ()
forall a b. (a -> b) -> a -> b
$ Message a -> MsgId
forall a. Message a -> MsgId
msgId Message a
msg

  ------------------------------------------------------------------------
  -- | Acknowledges the arrival of 'Message' to the broker,
  --   requests a receipt and waits until it is confirmed.
  --   Since it preempts the calling thread,
  --   it is usually used with /timeout/,
  --   for a 'Connection' /c/, a 'Message' /x/ 
  --   and a /timeout/ in microseconds /tmo/ like:
  --
  --   > mbR <- timeout tmo $ ackWith c x   
  --   > case mbR of
  --   >   Nothing -> -- error handling
  --   >   Just _  -> do -- ...
  ------------------------------------------------------------------------
  ackWith :: Con -> Message a -> IO ()
  ackWith :: Con -> Message a -> IO ()
ackWith Con
cid Message a
msg = do
    Con -> Bool -> Bool -> Message a -> IO ()
forall a. Con -> Bool -> Bool -> Message a -> IO ()
ack'  Con
cid Bool
True Bool
True Message a
msg  
    Con -> MsgId -> IO ()
rmAck Con
cid (MsgId -> IO ()) -> MsgId -> IO ()
forall a b. (a -> b) -> a -> b
$ Message a -> MsgId
forall a. Message a -> MsgId
msgId Message a
msg

  ------------------------------------------------------------------------
  -- | Negatively acknowledges the arrival of 'Message' to the broker.
  --   For more details see 'ack'.
  ------------------------------------------------------------------------
  nack :: Con -> Message a -> IO ()
  nack :: Con -> Message a -> IO ()
nack Con
cid Message a
msg = do
    Con -> Bool -> Bool -> Message a -> IO ()
forall a. Con -> Bool -> Bool -> Message a -> IO ()
ack' Con
cid Bool
False Bool
False Message a
msg
    Con -> MsgId -> IO ()
rmAck Con
cid (MsgId -> IO ()) -> MsgId -> IO ()
forall a b. (a -> b) -> a -> b
$ Message a -> MsgId
forall a. Message a -> MsgId
msgId Message a
msg

  ------------------------------------------------------------------------
  -- | Negatively acknowledges the arrival of 'Message' to the broker,
  --   requests a receipt and waits until it is confirmed.
  --   For more details see 'ackWith'.
  ------------------------------------------------------------------------
  nackWith :: Con -> Message a -> IO ()
  nackWith :: Con -> Message a -> IO ()
nackWith Con
cid Message a
msg = do
    Con -> Bool -> Bool -> Message a -> IO ()
forall a. Con -> Bool -> Bool -> Message a -> IO ()
ack' Con
cid Bool
False Bool
True Message a
msg
    Con -> MsgId -> IO ()
rmAck Con
cid (MsgId -> IO ()) -> MsgId -> IO ()
forall a b. (a -> b) -> a -> b
$ Message a -> MsgId
forall a. Message a -> MsgId
msgId Message a
msg

  ------------------------------------------------------------------------
  -- Checks for Transaction,
  -- if a transaction is ongoing,
  -- the TxId is added to the message
  -- and calls ack on the message.
  -- If called with True for "with receipt"
  -- the function creates a receipt and waits for its confirmation. 
  ------------------------------------------------------------------------
  ack' :: Con -> Bool -> Bool -> Message a -> IO ()
  ack' :: Con -> Bool -> Bool -> Message a -> IO ()
ack' Con
cid Bool
ok Bool
with Message a
msg = do
    Connection
c <- Con -> IO Connection
getCon Con
cid
    if Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ Connection -> Bool
connected Connection
c
      then EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ConnectException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$ 
             String
"Not connected (" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Con -> String
forall a. Show a => a -> String
show Con
cid String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
")"
      else if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (String -> String
forall a. Show a => a -> String
show (String -> String) -> String -> String
forall a b. (a -> b) -> a -> b
$ Message a -> String
forall a. Message a -> String
msgAck Message a
msg)
           then EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ProtocolException String
"No ack in message!"
           else do
             Tx
tx <- Connection -> IO (Maybe Tx)
getCurTx Connection
c IO (Maybe Tx) -> (Maybe Tx -> IO Tx) -> IO Tx
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (\Maybe Tx
mbT -> 
                       case Maybe Tx
mbT of
                         Maybe Tx
Nothing -> Tx -> IO Tx
forall (m :: * -> *) a. Monad m => a -> m a
return Tx
NoTx
                         Just Tx
x  -> Tx -> IO Tx
forall (m :: * -> *) a. Monad m => a -> m a
return Tx
x)
             let msg' :: Message a
msg' = Message a
msg {msgTx :: Tx
msgTx = Tx
tx}
             Rec
rc <- if Bool
with then IO Rec
mkUniqueRecc else Rec -> IO Rec
forall (m :: * -> *) a. Monad m => a -> m a
return Rec
NoRec
             Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
with (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
addRec Con
cid Rec
rc
             Con -> IO ()
logSend Con
cid
             if Bool
ok then Connection -> Message a -> String -> IO ()
forall a. Connection -> Message a -> String -> IO ()
fAck  Connection
c Message a
msg' (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ Rec -> String
forall a. Show a => a -> String
show Rec
rc
                   else Connection -> Message a -> String -> IO ()
forall a. Connection -> Message a -> String -> IO ()
fNack Connection
c Message a
msg' (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ Rec -> String
forall a. Show a => a -> String
show Rec
rc
             Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
with (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
waitReceipt Con
cid Rec
rc 

  ------------------------------------------------------------------------
  -- | Starts a transaction and executes the action
  --   in the last parameter.
  --   After the action has finished, 
  --   the transaction will be either committed or aborted
  --   even if an exception has been raised.
  --   Note that, depending on the options,
  --   the way a transaction is terminated may vary,
  --   refer to 'Topt' for details.
  --
  --   Transactions cannot be shared among threads.
  --   Transactions are internally protected against
  --   access from any thread but the one
  --   that has actually started the transaction.
  --
  --   It is /not/ advisable to use 'withTransaction' with /timeout/.
  --   It is preferred to use /timeout/ on the 
  --   the actions executed within this transaction.
  --   Whether and how much time the transaction itself
  --   shall wait for the completion of on-going interactions with the broker,
  --   in particular pending receipts,
  --   shall be controlled
  --   by the 'OTimeout' option.
  --
  --   'withTransaction' returns the result of the action.
  --
  --   The simplest usage example with a 'Connection' /c/ is:
  --
  --   > r <- withTransaction c [] $ \_ -> do
  --
  --   If the transaction shall use receipts and, before terminating, wait 100/ms/
  --   for all receipts to be confirmed by the broker
  --   'withTransaction' is called like:
  --
  --   > eiR <- try $ withTransaction c [OTimeout 100, OWithReceipts] \_ -> do
  --   > case eiR of
  --   >   Left e  -> -- error handling
  --   >   Right x -> do -- ..
  --
  --   Note that 'try' is used to catch any 'StomplException'.
  ------------------------------------------------------------------------
  withTransaction :: Con -> [Topt] -> (Tx -> IO a) -> IO a
  withTransaction :: Con -> [Topt] -> (Tx -> IO a) -> IO a
withTransaction Con
cid [Topt]
os Tx -> IO a
op = do
    Tx
tx <- IO Tx
mkUniqueTxId
    let t :: Transaction
t = Tx -> [Topt] -> Transaction
mkTrn Tx
tx [Topt]
os
    Connection
c <- Con -> IO Connection
getCon Con
cid
    if Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ Connection -> Bool
connected Connection
c
      then StomplException -> IO a
forall e a. Exception e => e -> IO a
throwIO (StomplException -> IO a) -> StomplException -> IO a
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ConnectException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
             String
"Not connected (" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Con -> String
forall a. Show a => a -> String
show Con
cid String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
")"
      else IO a -> IO () -> IO a
forall a b. IO a -> IO b -> IO a
finally (do Transaction -> Con -> IO ()
addTx Transaction
t Con
cid
                       Con -> Connection -> Transaction -> IO ()
startTx Con
cid Connection
c Transaction
t 
                       a
x <- Tx -> IO a
op Tx
tx
                       Tx -> Con -> TxState -> IO ()
updTxState Tx
tx Con
cid TxState
TxEnded
                       a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x)
                   -- if an exception is raised in terminate
                   -- we at least will remove the transaction
                   -- from our state and then reraise 
                   (Con -> Tx -> IO ()
terminateTx Con
cid Tx
tx IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`onException` Tx -> Con -> IO ()
rmThisTx Tx
tx Con
cid)
  
  ------------------------------------------------------------------------
  -- | Waits for the 'Receipt' to be confirmed by the broker.
  --   Since the thread will preempt, the call should be protected
  --   with /timeout/, /e.g./:
  --
  --   > mb_ <- timeout tmo $ waitReceipt c r
  --   > case mb_ of
  --   >  Nothing -> -- error handling
  --   >  Just _  -> do -- ...
  ------------------------------------------------------------------------
  waitReceipt :: Con -> Receipt -> IO ()
  waitReceipt :: Con -> Rec -> IO ()
waitReceipt Con
cid Rec
r =
    case Rec
r of
      Rec
NoRec -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
      Rec
_     -> IO ()
waitForMe 
    where waitForMe :: IO ()
waitForMe = do
            Bool
ok <- Con -> Rec -> IO Bool
checkReceipt Con
cid Rec
r
            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
ok (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
              Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Int
ms Int
1
              IO ()
waitForMe 

  ------------------------------------------------------------------------
  -- | Aborts the transaction immediately by raising 'AppException'.
  --   The string passed in to 'abort' will be added to the 
  --   exception message.
  ------------------------------------------------------------------------
  abort :: String -> IO ()
  abort :: String -> IO ()
abort String
e = EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
AppException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
              String
"Tx aborted by application: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
e

  ------------------------------------------------------------------------
  -- Terminate the transaction appropriately
  -- either committing or aborting
  ------------------------------------------------------------------------
  terminateTx :: Con -> Tx -> IO ()
  terminateTx :: Con -> Tx -> IO ()
terminateTx Con
cid Tx
tx = do
    Connection
c   <- Con -> IO Connection
getCon Con
cid
    Maybe Transaction
mbT <- Tx -> Connection -> IO (Maybe Transaction)
getTx Tx
tx Connection
c
    case Maybe Transaction
mbT of
      Maybe Transaction
Nothing -> String -> IO ()
putStrLn String
"Transaction terminated!" -- return ()
                 -- throwIO $ OuchException $ 
                 --  "Transaction disappeared: " ++ show tx
      Just Transaction
t | Transaction -> TxState
txState Transaction
t TxState -> TxState -> Bool
forall a. Eq a => a -> a -> Bool
/= TxState
TxEnded -> Bool -> Con -> Connection -> Tx -> Transaction -> IO ()
endTx Bool
False Con
cid Connection
c Tx
tx Transaction
t
             | Transaction -> Bool
txReceipts Transaction
t Bool -> Bool -> Bool
|| Transaction -> Bool
txPendingAck Transaction
t -> do 
                Bool
ok <- Tx -> Con -> Int -> IO Bool
waitTx Tx
tx Con
cid (Int -> IO Bool) -> Int -> IO Bool
forall a b. (a -> b) -> a -> b
$ Transaction -> Int
txTmo Transaction
t
                if Bool
ok
                  then Bool -> Con -> Connection -> Tx -> Transaction -> IO ()
endTx Bool
True Con
cid Connection
c Tx
tx Transaction
t
                  else do
                    Bool -> Con -> Connection -> Tx -> Transaction -> IO ()
endTx Bool
False Con
cid Connection
c Tx
tx Transaction
t
                    let m :: String
m = if Transaction -> Bool
txPendingAck Transaction
t then String
"Acks" else String
"Receipts"
                    EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
TxException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
                       String
"Transaction aborted: Missing " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
m
             | Bool
otherwise -> Bool -> Con -> Connection -> Tx -> Transaction -> IO ()
endTx Bool
True Con
cid Connection
c Tx
tx Transaction
t

  -----------------------------------------------------------------------
  -- Send begin frame
  -- We don't wait for the receipt now
  -- we will wait for receipts 
  -- on terminating the transaction anyway
  -----------------------------------------------------------------------
  startTx :: Con -> Connection -> Transaction -> IO ()
  startTx :: Con -> Connection -> Transaction -> IO ()
startTx Con
cid Connection
c Transaction
t = do
    Rec
rc <- if Transaction -> Bool
txAbrtRc Transaction
t then IO Rec
mkUniqueRecc else Rec -> IO Rec
forall (m :: * -> *) a. Monad m => a -> m a
return Rec
NoRec
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Transaction -> Bool
txAbrtRc Transaction
t) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
addRec Con
cid Rec
rc 
    Con -> IO ()
logSend Con
cid
    Connection -> String -> String -> IO ()
fBegin Connection
c (Tx -> String
forall a. Show a => a -> String
show (Tx -> String) -> Tx -> String
forall a b. (a -> b) -> a -> b
$ Transaction -> Tx
txId Transaction
t) (Rec -> String
forall a. Show a => a -> String
show Rec
rc)

  -----------------------------------------------------------------------
  -- Send commit or abort frame
  -- and, if we work with receipts,
  -- wait for the receipt
  -----------------------------------------------------------------------
  endTx :: Bool -> Con -> Connection -> Tx -> Transaction -> IO ()
  endTx :: Bool -> Con -> Connection -> Tx -> Transaction -> IO ()
endTx Bool
x Con
cid Connection
c Tx
tx Transaction
t = do
    let w :: Bool
w = Transaction -> Bool
txAbrtRc Transaction
t Bool -> Bool -> Bool
&& Transaction -> Int
txTmo Transaction
t Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0 
    Rec
rc <- if Bool
w then IO Rec
mkUniqueRecc else Rec -> IO Rec
forall (m :: * -> *) a. Monad m => a -> m a
return Rec
NoRec
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
w (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
addRec Con
cid Rec
rc 
    Con -> IO ()
logSend Con
cid
    if Bool
x then Connection -> String -> String -> IO ()
fCommit Connection
c (Tx -> String
forall a. Show a => a -> String
show Tx
tx) (Rec -> String
forall a. Show a => a -> String
show Rec
rc)
         else Connection -> String -> String -> IO ()
fAbort  Connection
c (Tx -> String
forall a. Show a => a -> String
show Tx
tx) (Rec -> String
forall a. Show a => a -> String
show Rec
rc)
    Maybe ()
mbR <- if Bool
w then Int -> IO () -> IO (Maybe ())
forall a. Int -> IO a -> IO (Maybe a)
timeout (Int -> Int
ms (Int -> Int) -> Int -> Int
forall a b. (a -> b) -> a -> b
$ Transaction -> Int
txTmo Transaction
t) (IO () -> IO (Maybe ())) -> IO () -> IO (Maybe ())
forall a b. (a -> b) -> a -> b
$ Con -> Rec -> IO ()
waitReceipt Con
cid Rec
rc
                else Maybe () -> IO (Maybe ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe () -> IO (Maybe ())) -> Maybe () -> IO (Maybe ())
forall a b. (a -> b) -> a -> b
$ () -> Maybe ()
forall a. a -> Maybe a
Just ()
    Con -> IO ()
rmTx Con
cid
    case Maybe ()
mbR of
      Just ()
_  -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
      Maybe ()
Nothing -> EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
TxException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
                   String
"Transaction in unknown State: " String -> String -> String
forall a. [a] -> [a] -> [a]
++
                   String
"missing receipt for " String -> String -> String
forall a. [a] -> [a] -> [a]
++ (if Bool
x then String
"commit!" 
                                                   else String
"abort!")

  -----------------------------------------------------------------------
  -- Check if there are pending acks,
  -- if so, already wrong,
  -- otherwise wait for receipts
  -----------------------------------------------------------------------
  waitTx :: Tx -> Con -> Int -> IO Bool
  waitTx :: Tx -> Con -> Int -> IO Bool
waitTx Tx
tx Con
cid Int
delay = do
    Connection
c   <- Con -> IO Connection
getCon Con
cid
    Maybe Transaction
mbT <- Tx -> Connection -> IO (Maybe Transaction)
getTx Tx
tx Connection
c
    case Maybe Transaction
mbT of
      Maybe Transaction
Nothing -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
      Just Transaction
t  | Transaction -> Bool
txPendingAck Transaction
t -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
              | Transaction -> Bool
txReceipts   Transaction
t -> 
                  if Int
delay Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 then Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
                    else do
                      Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Int
ms Int
1
                      Tx -> Con -> Int -> IO Bool
waitTx Tx
tx Con
cid (Int
delay Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)
              | Bool
otherwise -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True

  -----------------------------------------------------------------------
  -- Transform a frame into a message
  -- using the queue's application callback
  -----------------------------------------------------------------------
  frmToMsg :: Reader a -> F.Frame -> IO (Message a)
  frmToMsg :: Reader a -> Frame -> IO (Message a)
frmToMsg Reader a
q Frame
f = do
    let b :: ByteString
b = Frame -> ByteString
F.getBody Frame
f
    let conv :: InBound a
conv = Reader a -> InBound a
forall a. Reader a -> InBound a
rFrom Reader a
q
    let sid :: Sub
sid  = if  String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (Frame -> String
F.getSub Frame
f) Bool -> Bool -> Bool
|| Bool -> Bool
not (String -> Bool
numeric (String -> Bool) -> String -> Bool
forall a b. (a -> b) -> a -> b
$ Frame -> String
F.getSub Frame
f)
                 then Sub
NoSub else Int -> Sub
Sub (Int -> Sub) -> Int -> Sub
forall a b. (a -> b) -> a -> b
$ String -> Int
forall a. Read a => String -> a
read (String -> Int) -> String -> Int
forall a b. (a -> b) -> a -> b
$ Frame -> String
F.getSub Frame
f
    a
x <- InBound a
conv (Frame -> Type
F.getMime Frame
f) (Frame -> Int
F.getLength Frame
f) (Frame -> [Header]
F.getHeaders Frame
f) ByteString
b
    let m :: Message a
m = MsgId
-> Sub
-> String
-> String
-> Type
-> Int
-> Tx
-> ByteString
-> a
-> Message a
forall a.
MsgId
-> Sub
-> String
-> String
-> Type
-> Int
-> Tx
-> ByteString
-> a
-> Message a
mkMessage (String -> MsgId
MsgId (String -> MsgId) -> String -> MsgId
forall a b. (a -> b) -> a -> b
$ Frame -> String
F.getId Frame
f) Sub
sid
                      (Frame -> String
F.getDest   Frame
f) 
                      (Frame -> String
F.getMsgAck Frame
f) 
                      (Frame -> Type
F.getMime   Frame
f)
                      (Frame -> Int
F.getLength Frame
f)
                      Tx
NoTx 
                      ByteString
b -- raw bytestring
                      a
x -- converted context
    Message a -> IO (Message a)
forall (m :: * -> *) a. Monad m => a -> m a
return Message a
m {msgHdrs :: [Header]
msgHdrs = Frame -> [Header]
F.getHeaders Frame
f}

  -----------------------------------------------------------------------
  -- Handle Connected Frame
  -----------------------------------------------------------------------
  handleConnected :: Con -> F.Frame -> MVar () -> IO ()
  handleConnected :: Con -> Frame -> MVar () -> IO ()
handleConnected Con
cid Frame
f MVar ()
m = do -- withCon cid $ \c -> 
     Connection
c <- Con -> IO Connection
getCon Con
cid
     if Connection -> Bool
connected Connection
c 
      then Connection -> EH
throwToOwner Connection
c EH -> EH
forall a b. (a -> b) -> a -> b
$
                String -> StomplException
ProtocolException String
"Unexptected Connected frame"
      else Con -> Connection -> IO ()
updCon Con
cid Connection
c {conSrv :: String
conSrv  =  let srv :: SrvDesc
srv = Frame -> SrvDesc
F.getServer Frame
f
                                     in SrvDesc -> String
F.getSrvName SrvDesc
srv String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"/"  String -> String -> String
forall a. [a] -> [a] -> [a]
++
                                        SrvDesc -> String
F.getSrvVer  SrvDesc
srv String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" (" String -> String -> String
forall a. [a] -> [a] -> [a]
++
                                        SrvDesc -> String
F.getSrvCmts SrvDesc
srv String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
")",
                         conBeat :: Version
conBeat =  Frame -> Version
F.getBeat    Frame
f,
                         conVers :: [Version]
conVers = [Frame -> Version
F.getVersion Frame
f],
                         conSes :: String
conSes  =  Frame -> String
F.getSession Frame
f}
           IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
m () 

  -----------------------------------------------------------------------
  -- Handle Message Frame
  -----------------------------------------------------------------------
  handleMessage :: Con -> F.Frame -> IO ()
  handleMessage :: Con -> Frame -> IO ()
handleMessage Con
cid Frame
f = do
    Connection
c <- Con -> IO Connection
getCon Con
cid
    case Connection -> Maybe (Chan Frame)
getCh Connection
c of
      Maybe (Chan Frame)
Nothing -> Connection -> EH
throwToOwner Connection
c EH -> EH
forall a b. (a -> b) -> a -> b
$ 
                 String -> StomplException
ProtocolException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$ String
"Unknown Channel: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Frame -> String
forall a. Show a => a -> String
show Frame
f
      Just Chan Frame
ch -> Chan Frame -> Frame -> IO ()
forall a. Chan a -> a -> IO ()
writeChan Chan Frame
ch Frame
f
    where getCh :: Connection -> Maybe (Chan Frame)
getCh Connection
c = let dst :: String
dst = Frame -> String
F.getDest Frame
f
                        sid :: String
sid = Frame -> String
F.getSub  Frame
f
                    in if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null String
sid
                      then String -> Connection -> Maybe (Chan Frame)
getDest String
dst Connection
c
                      else if String -> Bool
numeric String
sid
                             then Sub -> Connection -> Maybe (Chan Frame)
getSub (Int -> Sub
Sub (Int -> Sub) -> Int -> Sub
forall a b. (a -> b) -> a -> b
$ String -> Int
forall a. Read a => String -> a
read String
sid) Connection
c
                             else Maybe (Chan Frame)
forall a. Maybe a
Nothing 

  -----------------------------------------------------------------------
  -- Handle Error Frame
  -----------------------------------------------------------------------
  handleError :: Con -> F.Frame -> IO ()
  handleError :: Con -> Frame -> IO ()
handleError Con
cid Frame
f = do
    Connection
c <- Con -> IO Connection
getCon Con
cid
    case Connection -> Maybe (Con -> Frame -> IO ())
getEH Connection
c of 
      Just Con -> Frame -> IO ()
eh -> Con -> Frame -> IO ()
eh Con
cid Frame
f
      Maybe (Con -> Frame -> IO ())
Nothing -> let r :: String
r | String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (Frame -> String
F.getReceipt Frame
f) = String
""
                       | Bool
otherwise             = String
" (" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Frame -> String
F.getReceipt Frame
f String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
")"
                     e :: String
e = Frame -> String
F.getMsg Frame
f String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
r String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
": " String -> String -> String
forall a. [a] -> [a] -> [a]
++ ByteString -> String
U.toString (Frame -> ByteString
F.getBody Frame
f)
                  in Connection -> EH
throwToOwner Connection
c (String -> StomplException
BrokerException String
e) 

  -----------------------------------------------------------------------
  -- Handle Receipt Frame
  -----------------------------------------------------------------------
  handleReceipt :: Con -> F.Frame -> IO ()
  handleReceipt :: Con -> Frame -> IO ()
handleReceipt Con
cid Frame
f = do
    Connection
c <- Con -> IO Connection
getCon Con
cid
    case String -> Maybe Rec
parseRec (String -> Maybe Rec) -> String -> Maybe Rec
forall a b. (a -> b) -> a -> b
$ Frame -> String
F.getReceipt Frame
f of
      Just Rec
r  -> Con -> Rec -> IO ()
forceRmRec Con
cid Rec
r 
      Maybe Rec
Nothing -> Connection -> EH
throwToOwner Connection
c EH -> EH
forall a b. (a -> b) -> a -> b
$ 
                   String -> StomplException
ProtocolException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$ String
"Invalid Receipt: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Frame -> String
forall a. Show a => a -> String
show Frame
f

  -----------------------------------------------------------------------
  -- Handle Beat Frame, i.e. ignore them
  -----------------------------------------------------------------------
  handleBeat :: Con -> F.Frame -> IO ()
  handleBeat :: Con -> Frame -> IO ()
handleBeat Con
_ Frame
_ = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return () -- putStrLn "Beat!"

  -----------------------------------------------------------------------
  -- My Beat 
  -----------------------------------------------------------------------
  heartBeat :: Con -> Int -> IO ()
  heartBeat :: Con -> Int -> IO ()
heartBeat Con
cid Int
p = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    UTCTime
now <- IO UTCTime
getCurrentTime
    Connection
c   <- Con -> IO Connection
getCon Con
cid
    let me :: UTCTime
me = Connection -> UTCTime
myMust  Connection
c 
    let he :: UTCTime
he = Connection -> UTCTime
hisMust Connection
c 
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (UTCTime
now UTCTime -> UTCTime -> Bool
forall a. Ord a => a -> a -> Bool
> UTCTime
he) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection -> EH
throwToOwner Connection
c EH -> EH
forall a b. (a -> b) -> a -> b
$ 
                         String -> StomplException
ProtocolException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$ 
                           String
"Missing HeartBeat, last was " String -> String -> String
forall a. [a] -> [a] -> [a]
++
                           NominalDiffTime -> String
forall a. Show a => a -> String
show (UTCTime
now UTCTime -> UTCTime -> NominalDiffTime
`diffUTCTime` UTCTime
he)    String -> String -> String
forall a. [a] -> [a] -> [a]
++ 
                           String
" seconds ago!"
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (UTCTime
now UTCTime -> UTCTime -> Bool
forall a. Ord a => a -> a -> Bool
>= UTCTime
me) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection -> IO ()
fSendBeat Connection
c
    Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Int
ms Int
p

  -----------------------------------------------------------------------
  -- When we should have sent last heartbeat
  -----------------------------------------------------------------------
  myMust :: Connection -> UTCTime
  myMust :: Connection -> UTCTime
myMust Connection
c = let t :: UTCTime
t = Connection -> UTCTime
conMyBeat Connection
c
                 p :: Int
p = Version -> Int
forall a b. (a, b) -> b
snd (Version -> Int) -> Version -> Int
forall a b. (a -> b) -> a -> b
$ Connection -> Version
conBeat Connection
c
             in  UTCTime -> Int -> UTCTime
timeAdd UTCTime
t Int
p

  -----------------------------------------------------------------------
  -- When he should have sent last heartbeat
  -----------------------------------------------------------------------
  hisMust :: Connection -> UTCTime
  hisMust :: Connection -> UTCTime
hisMust Connection
c = let t :: UTCTime
t   = Connection -> UTCTime
conHisBeat Connection
c
                  tol :: Int
tol = Int
4
                  b :: Int
b   = Version -> Int
forall a b. (a, b) -> a
fst (Version -> Int) -> Version -> Int
forall a b. (a -> b) -> a -> b
$ Connection -> Version
conBeat Connection
c
                  p :: Int
p   = Int
tol Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
b
              in  UTCTime -> Int -> UTCTime
timeAdd UTCTime
t Int
p

  -----------------------------------------------------------------------
  -- Adding a period to a point in time
  -----------------------------------------------------------------------
  timeAdd :: UTCTime -> Int -> UTCTime
  timeAdd :: UTCTime -> Int -> UTCTime
timeAdd UTCTime
t Int
p = Int -> NominalDiffTime
ms2nominal Int
p NominalDiffTime -> UTCTime -> UTCTime
`addUTCTime` UTCTime
t

  -----------------------------------------------------------------------
  -- Convert milliseconds to seconds
  -----------------------------------------------------------------------
  ms2nominal :: Int -> NominalDiffTime
  ms2nominal :: Int -> NominalDiffTime
ms2nominal Int
m = Int -> NominalDiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
m NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Fractional a => a -> a -> a
/ (NominalDiffTime
1000::NominalDiffTime)

  ---------------------------------------------------------------------
  -- begin transaction
  ---------------------------------------------------------------------
  fBegin :: Connection -> String -> String -> IO ()
  fBegin :: Connection -> String -> String -> IO ()
fBegin Connection
c String
tx String
receipt = Connection
-> String
-> String
-> [Header]
-> (String -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c String
tx String
receipt [] String -> String -> [Header] -> Either String Frame
mkBeginF

  ---------------------------------------------------------------------
  -- commit transaction
  ---------------------------------------------------------------------
  fCommit :: Connection -> String -> String -> IO ()
  fCommit :: Connection -> String -> String -> IO ()
fCommit Connection
c String
tx String
receipt = Connection
-> String
-> String
-> [Header]
-> (String -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c String
tx String
receipt [] String -> String -> [Header] -> Either String Frame
mkCommitF

  ---------------------------------------------------------------------
  -- abort transaction
  ---------------------------------------------------------------------
  fAbort :: Connection -> String -> String -> IO ()
  fAbort :: Connection -> String -> String -> IO ()
fAbort Connection
c String
tx String
receipt = Connection
-> String
-> String
-> [Header]
-> (String -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c String
tx String
receipt [] String -> String -> [Header] -> Either String Frame
mkAbortF

  ---------------------------------------------------------------------
  -- ack
  ---------------------------------------------------------------------
  fAck :: Connection -> Message a -> String -> IO ()
  fAck :: Connection -> Message a -> String -> IO ()
fAck Connection
c Message a
m String
receipt = Connection
-> Message a
-> String
-> [Header]
-> (Message a -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c Message a
m String
receipt []  (Bool -> Message a -> String -> [Header] -> Either String Frame
forall a.
Bool -> Message a -> String -> [Header] -> Either String Frame
mkAckF Bool
True)

  ---------------------------------------------------------------------
  -- nack
  ---------------------------------------------------------------------
  fNack :: Connection -> Message a -> String -> IO ()
  fNack :: Connection -> Message a -> String -> IO ()
fNack Connection
c Message a
m String
receipt = Connection
-> Message a
-> String
-> [Header]
-> (Message a -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c Message a
m String
receipt [] (Bool -> Message a -> String -> [Header] -> Either String Frame
forall a.
Bool -> Message a -> String -> [Header] -> Either String Frame
mkAckF Bool
False)

  ---------------------------------------------------------------------
  -- subscribe
  ---------------------------------------------------------------------
  fSubscribe :: Connection -> Subscription -> String -> [F.Header] -> IO ()
  fSubscribe :: Connection -> Subscription -> String -> [Header] -> IO ()
fSubscribe Connection
c Subscription
sub String
receipt [Header]
hs = Connection
-> Subscription
-> String
-> [Header]
-> (Subscription -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c Subscription
sub String
receipt [Header]
hs Subscription -> String -> [Header] -> Either String Frame
mkSubF

  ---------------------------------------------------------------------
  -- unsubscribe
  ---------------------------------------------------------------------
  fUnsubscribe :: Connection -> Subscription -> String -> [F.Header] -> IO ()
  fUnsubscribe :: Connection -> Subscription -> String -> [Header] -> IO ()
fUnsubscribe Connection
c Subscription
sub String
receipt [Header]
hs = Connection
-> Subscription
-> String
-> [Header]
-> (Subscription -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c Subscription
sub String
receipt [Header]
hs Subscription -> String -> [Header] -> Either String Frame
mkUnSubF

  ---------------------------------------------------------------------
  -- send
  ---------------------------------------------------------------------
  fSend :: Connection -> Message a -> String -> [F.Header] -> IO ()
  fSend :: Connection -> Message a -> String -> [Header] -> IO ()
fSend Connection
c Message a
msg String
receipt [Header]
hs = Connection
-> Message a
-> String
-> [Header]
-> (Message a -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c Message a
msg String
receipt [Header]
hs Message a -> String -> [Header] -> Either String Frame
forall a. Message a -> String -> [Header] -> Either String Frame
mkSendF

  ---------------------------------------------------------------------
  -- heart beat
  ---------------------------------------------------------------------
  fSendBeat :: Connection -> IO ()
  fSendBeat :: Connection -> IO ()
fSendBeat Connection
c = Connection
-> ()
-> String
-> [Header]
-> (() -> String -> [Header] -> Either String Frame)
-> IO ()
forall a.
Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c () String
"" [] (\()
_ String
_ [Header]
_ -> Frame -> Either String Frame
forall a b. b -> Either a b
Right Frame
F.mkBeat)

  ---------------------------------------------------------------------
  -- generic sendFrame:
  -- takes a connection some data (like subscribe, message, etc.)
  -- some headers, a function that creates a frame or returns an error
  -- creates the frame and sends it
  ---------------------------------------------------------------------
  sendFrame :: Connection -> a -> String -> [F.Header] -> 
               (a -> String -> [F.Header] -> Either String F.Frame) -> IO ()
  sendFrame :: Connection
-> a
-> String
-> [Header]
-> (a -> String -> [Header] -> Either String Frame)
-> IO ()
sendFrame Connection
c a
m String
receipt [Header]
hs a -> String -> [Header] -> Either String Frame
mkF = 
    if Bool -> Bool
not (Connection -> Bool
connected Connection
c) then EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ConnectException String
"Not connected!"
      else case a -> String -> [Header] -> Either String Frame
mkF a
m String
receipt [Header]
hs of
             Left  String
e -> EH
forall e a. Exception e => e -> IO a
throwIO EH -> EH
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ProtocolException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
                          String
"Cannot create Frame: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
e
             Right Frame
f -> 
#ifdef _DEBUG
               do when (not $ F.complies (1,2) f) $
                    putStrLn $ "Frame does not comply with 1.2: " ++ show f 
#endif
                  Chan Frame -> Frame -> IO ()
forall a. Chan a -> a -> IO ()
writeChan (Connection -> Chan Frame
conChn Connection
c) Frame
f
 
  ---------------------------------------------------------------------
  -- frame constructors
  -- this needs review...
  ---------------------------------------------------------------------
  mkReceipt :: String -> [F.Header]
  mkReceipt :: String -> [Header]
mkReceipt String
receipt = if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null String
receipt then [] else [String -> Header
F.mkRecHdr String
receipt]

  mkConF :: ([F.Header] -> Either String F.Frame) ->
            String -> String -> String -> String  -> 
            [F.Version] -> F.Heart -> [F.Header]  -> Either String F.Frame
  mkConF :: ([Header] -> Either String Frame)
-> String
-> String
-> String
-> String
-> [Version]
-> Version
-> [Header]
-> Either String Frame
mkConF [Header] -> Either String Frame
mk String
host String
usr String
pwd String
cli [Version]
vs Version
beat [Header]
hs = 
    let uHdr :: [Header]
uHdr = if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null String
usr then [] else [String -> Header
F.mkLogHdr  String
usr]
        pHdr :: [Header]
pHdr = if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null String
pwd then [] else [String -> Header
F.mkPassHdr String
pwd]
        cHdr :: [Header]
cHdr = if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null String
cli then [] else [String -> Header
F.mkCliIdHdr String
cli]
     in [Header] -> Either String Frame
mk ([Header] -> Either String Frame)
-> [Header] -> Either String Frame
forall a b. (a -> b) -> a -> b
$ [String -> Header
F.mkHostHdr String
host,
              String -> Header
F.mkAcVerHdr (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ [Version] -> String
F.versToVal [Version]
vs, 
              String -> Header
F.mkBeatHdr  (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ Version -> String
F.beatToVal Version
beat] [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++
             [Header]
uHdr [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ [Header]
pHdr [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ [Header]
cHdr [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ [Header]
hs

  ---------------------------------------------------------------------
  -- make Disconnect Frame
  ---------------------------------------------------------------------
  mkDiscF :: String -> Either String F.Frame
  mkDiscF :: String -> Either String Frame
mkDiscF String
receipt =
    [Header] -> Either String Frame
F.mkDisFrame ([Header] -> Either String Frame)
-> [Header] -> Either String Frame
forall a b. (a -> b) -> a -> b
$ String -> [Header]
mkReceipt String
receipt

  ---------------------------------------------------------------------
  -- make Subscribe Frame
  ---------------------------------------------------------------------
  mkSubF :: Subscription -> String -> [F.Header] -> Either String F.Frame
  mkSubF :: Subscription -> String -> [Header] -> Either String Frame
mkSubF Subscription
sub String
receipt [Header]
hs = 
    [Header] -> Either String Frame
F.mkSubFrame ([Header] -> Either String Frame)
-> [Header] -> Either String Frame
forall a b. (a -> b) -> a -> b
$ [String -> Header
F.mkIdHdr   (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ Sub -> String
forall a. Show a => a -> String
show (Sub -> String) -> Sub -> String
forall a b. (a -> b) -> a -> b
$ Subscription -> Sub
subId Subscription
sub,
                    String -> Header
F.mkDestHdr (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ Subscription -> String
subName Subscription
sub,
                    String -> Header
F.mkAckHdr  (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ AckMode -> String
forall a. Show a => a -> String
show (AckMode -> String) -> AckMode -> String
forall a b. (a -> b) -> a -> b
$ Subscription -> AckMode
subMode Subscription
sub] [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ 
                   String -> [Header]
mkReceipt String
receipt [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ [Header]
hs

  ---------------------------------------------------------------------
  -- make Unsubscribe Frame
  ---------------------------------------------------------------------
  mkUnSubF :: Subscription -> String -> [F.Header] -> Either String F.Frame
  mkUnSubF :: Subscription -> String -> [Header] -> Either String Frame
mkUnSubF Subscription
sub String
receipt [Header]
hs =
    let dh :: [Header]
dh = if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (Subscription -> String
subName Subscription
sub) then [] else [String -> Header
F.mkDestHdr (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ Subscription -> String
subName Subscription
sub]
    in  [Header] -> Either String Frame
F.mkUSubFrame ([Header] -> Either String Frame)
-> [Header] -> Either String Frame
forall a b. (a -> b) -> a -> b
$ [String -> Header
F.mkIdHdr (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ Sub -> String
forall a. Show a => a -> String
show (Sub -> String) -> Sub -> String
forall a b. (a -> b) -> a -> b
$ Subscription -> Sub
subId Subscription
sub] [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ [Header]
dh [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ 
                        String -> [Header]
mkReceipt String
receipt [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ [Header]
hs

  ---------------------------------------------------------------------
  -- make Send Frame
  ---------------------------------------------------------------------
  mkSendF :: Message a -> String -> [F.Header] -> Either String F.Frame
  mkSendF :: Message a -> String -> [Header] -> Either String Frame
mkSendF Message a
msg String
receipt [Header]
hs = 
    Frame -> Either String Frame
forall a b. b -> Either a b
Right (Frame -> Either String Frame) -> Frame -> Either String Frame
forall a b. (a -> b) -> a -> b
$ String
-> String
-> String
-> Type
-> Int
-> [Header]
-> ByteString
-> Frame
F.mkSend (Message a -> String
forall a. Message a -> String
msgDest Message a
msg) (Tx -> String
forall a. Show a => a -> String
show (Tx -> String) -> Tx -> String
forall a b. (a -> b) -> a -> b
$ Message a -> Tx
forall a. Message a -> Tx
msgTx Message a
msg)  String
receipt 
                     (Message a -> Type
forall a. Message a -> Type
msgType Message a
msg) (Message a -> Int
forall a. Message a -> Int
msgLen Message a
msg) [Header]
hs -- escape headers! 
                     (Message a -> ByteString
forall a. Message a -> ByteString
msgRaw  Message a
msg) 

  ---------------------------------------------------------------------
  -- make Ack Frame
  ---------------------------------------------------------------------
  mkAckF :: Bool -> Message a -> String -> [F.Header] -> Either String F.Frame
  mkAckF :: Bool -> Message a -> String -> [Header] -> Either String Frame
mkAckF Bool
ok Message a
msg String
receipt [Header]
_ =
    let sh :: [Header]
sh = if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (String -> Bool) -> String -> Bool
forall a b. (a -> b) -> a -> b
$ Sub -> String
forall a. Show a => a -> String
show (Sub -> String) -> Sub -> String
forall a b. (a -> b) -> a -> b
$ Message a -> Sub
forall a. Message a -> Sub
msgSub Message a
msg then [] 
               else [String -> Header
F.mkSubHdr (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ Sub -> String
forall a. Show a => a -> String
show (Sub -> String) -> Sub -> String
forall a b. (a -> b) -> a -> b
$ Message a -> Sub
forall a. Message a -> Sub
msgSub Message a
msg]
        th :: [Header]
th = if String -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (String -> Bool) -> String -> Bool
forall a b. (a -> b) -> a -> b
$ Tx -> String
forall a. Show a => a -> String
show (Tx -> String) -> Tx -> String
forall a b. (a -> b) -> a -> b
$ Message a -> Tx
forall a. Message a -> Tx
msgTx Message a
msg 
               then [] else [String -> Header
F.mkTrnHdr (String -> Header) -> String -> Header
forall a b. (a -> b) -> a -> b
$ Tx -> String
forall a. Show a => a -> String
show (Tx -> String) -> Tx -> String
forall a b. (a -> b) -> a -> b
$ Message a -> Tx
forall a. Message a -> Tx
msgTx Message a
msg]
        rh :: [Header]
rh = String -> [Header]
mkReceipt String
receipt
        mk :: [Header] -> Either String Frame
mk = if Bool
ok then [Header] -> Either String Frame
F.mkAckFrame else [Header] -> Either String Frame
F.mkNackFrame
    in [Header] -> Either String Frame
mk ([Header] -> Either String Frame)
-> [Header] -> Either String Frame
forall a b. (a -> b) -> a -> b
$ String -> Header
F.mkIdHdr (Message a -> String
forall a. Message a -> String
msgAck Message a
msg) Header -> [Header] -> [Header]
forall a. a -> [a] -> [a]
: ([Header]
sh [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ [Header]
rh [Header] -> [Header] -> [Header]
forall a. [a] -> [a] -> [a]
++ [Header]
th)

  ---------------------------------------------------------------------
  -- make Begin Frame
  ---------------------------------------------------------------------
  mkBeginF :: String -> String -> [F.Header] -> Either String F.Frame
  mkBeginF :: String -> String -> [Header] -> Either String Frame
mkBeginF String
tx String
receipt [Header]
_ = 
    [Header] -> Either String Frame
F.mkBgnFrame ([Header] -> Either String Frame)
-> [Header] -> Either String Frame
forall a b. (a -> b) -> a -> b
$ String -> Header
F.mkTrnHdr String
tx Header -> [Header] -> [Header]
forall a. a -> [a] -> [a]
: String -> [Header]
mkReceipt String
receipt

  ---------------------------------------------------------------------
  -- make Commit Frame
  ---------------------------------------------------------------------
  mkCommitF :: String -> String -> [F.Header] -> Either String F.Frame
  mkCommitF :: String -> String -> [Header] -> Either String Frame
mkCommitF String
tx String
receipt [Header]
_ =
    [Header] -> Either String Frame
F.mkCmtFrame ([Header] -> Either String Frame)
-> [Header] -> Either String Frame
forall a b. (a -> b) -> a -> b
$ String -> Header
F.mkTrnHdr String
tx Header -> [Header] -> [Header]
forall a. a -> [a] -> [a]
: String -> [Header]
mkReceipt String
receipt

  ---------------------------------------------------------------------
  -- make Abort Frame
  ---------------------------------------------------------------------
  mkAbortF :: String -> String -> [F.Header] -> Either String F.Frame
  mkAbortF :: String -> String -> [Header] -> Either String Frame
mkAbortF String
tx String
receipt [Header]
_ =
    [Header] -> Either String Frame
F.mkAbrtFrame ([Header] -> Either String Frame)
-> [Header] -> Either String Frame
forall a b. (a -> b) -> a -> b
$ String -> Header
F.mkTrnHdr String
tx Header -> [Header] -> [Header]
forall a. a -> [a] -> [a]
: String -> [Header]
mkReceipt String
receipt