{-# Language BangPatterns,CPP #-}
module Network.Mom.Stompl.Client.State (
         msgContent, numeric, ms,
         Connection(..), mkConnection,
         connected, getVersion, 
         EHandler,
         getEH, 
         Copt(..),
         oHeartBeat, oMaxRecv,
         oAuth, oCliId, oStomp, oTmo, oTLS,oEH,
         Transaction(..),
         Topt(..), hasTopt, tmo,
         TxState(..),
         Receipt,
         Message(..), mkMessage, MsgId(..),
         Subscription(..), mkSub,
         logSend, logReceive,
         addCon, rmCon, getCon, withCon, updCon,
         addSub, addDest, getSub, getDest,
         rmSub, rmDest,
         mkTrn,
         addTx,  getTx, rmTx, rmThisTx,
         getCurTx,
         updTxState,
         txPendingAck, txReceipts,
         addAck, rmAck, addRec, rmRec,
         forceRmRec,
         checkReceipt) 
where

  import qualified Network.Mom.Stompl.Client.Factory  as Fac

  import qualified Network.Mom.Stompl.Frame as F
  import           Network.Mom.Stompl.Client.Exception

  import           Control.Concurrent 
  import           Control.Exception (throwIO)

  import           System.IO.Unsafe

  import           Data.List (find)
  import           Data.Char (isDigit)
  import           Data.Time.Clock

  import           Data.Conduit.Network.TLS (TLSClientConfig, 
                                             tlsClientConfig,
                                             tlsClientUseTLS)

  import qualified Data.ByteString.Char8 as B

  import           Codec.MIME.Type as Mime (Type) 

  ------------------------------------------------------------------------
  -- | Returns the content of the message in the format 
  --   produced by an in-bound converter
  ------------------------------------------------------------------------
  msgContent :: Message a -> a
  msgContent :: Message a -> a
msgContent = Message a -> a
forall a. Message a -> a
msgCont

  ------------------------------------------------------------------------
  -- some helpers
  ------------------------------------------------------------------------
  numeric :: String -> Bool
  numeric :: String -> Bool
numeric = (Char -> Bool) -> String -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
all Char -> Bool
isDigit

  ------------------------------------------------------------------------
  -- strict deletes
  ------------------------------------------------------------------------
  delete' :: Eq a => a -> [a] -> [a]
  delete' :: a -> [a] -> [a]
delete' = (a -> a -> Bool) -> a -> [a] -> [a]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
deleteBy' a -> a -> Bool
forall a. Eq a => a -> a -> Bool
(==) 

  deleteBy' :: (a -> a -> Bool) -> a -> [a] -> [a]
  deleteBy' :: (a -> a -> Bool) -> a -> [a] -> [a]
deleteBy' a -> a -> Bool
_ a
_ [] = []
  deleteBy' a -> a -> Bool
f a
p (a
x:[a]
xs) | a -> a -> Bool
f a
p a
x     = [a]
xs
                       | Bool
otherwise = let !xs' :: [a]
xs' = (a -> a -> Bool) -> a -> [a] -> [a]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
deleteBy' a -> a -> Bool
f a
p [a]
xs
                                      in  a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
xs'

  ------------------------------------------------------------------------
  -- convert milliseconds to microseconds 
  ------------------------------------------------------------------------
  ms :: Int -> Int
  ms :: Int -> Int
ms Int
u = Int
1000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
u

  ------------------------------------------------------------------------
  -- compare two tuples by the fst
  ------------------------------------------------------------------------
  eq :: Eq a => (a, b) -> (a, b) -> Bool
  eq :: (a, b) -> (a, b) -> Bool
eq (a, b)
x (a, b)
y = (a, b) -> a
forall a b. (a, b) -> a
fst (a, b)
x a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== (a, b) -> a
forall a b. (a, b) -> a
fst (a, b)
y

  -- | Just a nicer word for 'Rec'
  type Receipt = Fac.Rec

  ------------------------------------------------------------------------
  -- | Action executed when an Error Frame is received;
  --   the typical use case is logging the text of the Error Frame.
  ------------------------------------------------------------------------
  type EHandler = Fac.Con -> F.Frame -> IO ()

  ------------------------------------------------------------------------
  -- Connection
  ------------------------------------------------------------------------
  data Connection = Connection {
                      Connection -> Con
conId      :: Fac.Con,        -- Con handle
                      Connection -> String
conAddr    :: String,         -- the broker's IP address
                      Connection -> Int
conPort    :: Int,            -- the broker's port
                      Connection -> Int
conMax     :: Int,            -- max receive
                      Connection -> String
conUsr     :: String,         -- user
                      Connection -> String
conPwd     :: String,         -- passcode
                      Connection -> String
conCli     :: String,         -- client-id
                      Connection -> String
conSrv     :: String,         -- server description
                      Connection -> String
conSes     :: String,         -- session identifier 
                      Connection -> [Version]
conVers    :: [F.Version],    -- accepted versions
                      Connection -> Version
conBeat    :: F.Heart,        -- the heart beat
                      Connection -> Chan Frame
conChn     :: Chan F.Frame,   -- sender channel
                      Connection -> Bool
conBrk     :: Bool,
                      Connection -> ThreadId
conOwner   :: ThreadId,       -- thread that created 
                                                    -- the connection
                      Connection -> Maybe EHandler
conEH      :: Maybe EHandler, -- Error Handler
                      Connection -> UTCTime
conHisBeat :: UTCTime,        -- broker's next beat
                      Connection -> UTCTime
conMyBeat  :: UTCTime,        -- our next beat
                      Connection -> Int
conWait    :: Int,            -- wait for receipt
                                                    -- before terminating
                      Connection -> Int
conWaitE   :: Int,            -- wait on error handling
                      Connection -> [SubEntry]
conSubs    :: [SubEntry],     -- subscriptions
                      Connection -> [DestEntry]
conDests   :: [DestEntry],    -- destinations
                      Connection -> [ThreadEntry]
conThrds   :: [ThreadEntry],  -- threads with transactions
                      Connection -> [Receipt]
conRecs    :: [Receipt],      -- expected receipts 
                      Connection -> [MsgId]
conAcks    :: [MsgId]}        -- expected acks 

  instance Eq Connection where
    Connection
c1 == :: Connection -> Connection -> Bool
== Connection
c2 = Connection -> Con
conId Connection
c1 Con -> Con -> Bool
forall a. Eq a => a -> a -> Bool
== Connection -> Con
conId Connection
c2

  -------------------------------------------------------------------------
  -- Make a connection. Quite ugly.
  -------------------------------------------------------------------------
  mkConnection :: Fac.Con -> String -> Int         -> 
                             Int    -> String      -> String   -> 
                             String -> [F.Version] -> F.Heart  -> 
                             Chan F.Frame          -> ThreadId -> 
                             UTCTime               -> [Copt]   -> Connection
  mkConnection :: Con
-> String
-> Int
-> Int
-> String
-> String
-> String
-> [Version]
-> Version
-> Chan Frame
-> ThreadId
-> UTCTime
-> [Copt]
-> Connection
mkConnection Con
cid String
host Int
port Int
mx String
usr String
pwd String
ci [Version]
vs Version
hs Chan Frame
chn ThreadId
myself UTCTime
t [Copt]
os = 
    Con
-> String
-> Int
-> Int
-> String
-> String
-> String
-> String
-> String
-> [Version]
-> Version
-> Chan Frame
-> Bool
-> ThreadId
-> Maybe EHandler
-> UTCTime
-> UTCTime
-> Int
-> Int
-> [SubEntry]
-> [DestEntry]
-> [ThreadEntry]
-> [Receipt]
-> [MsgId]
-> Connection
Connection Con
cid String
host Int
port Int
mx String
usr String
pwd String
ci String
"" String
"" [Version]
vs Version
hs Chan Frame
chn Bool
False 
                   ThreadId
myself ([Copt] -> Maybe EHandler
oEH [Copt]
os) UTCTime
t UTCTime
t 
                          ([Copt] -> Int
oWaitBroker [Copt]
os) 
                          ([Copt] -> Int
oWaitError  [Copt]
os) [] [] [] [] []

  -------------------------------------------------------------------------
  -- | Options passed to a connection
  -------------------------------------------------------------------------
  data Copt = 
    -- | Tells the connection to wait /n/ milliseconds for the 'Receipt' 
    --   sent with 'F.Disconnect' at the end of the session.
    --   The /Stomp/ protocol advises to request a receipt 
    --   and to wait for it before actually closing the 
    --   socket. Many brokers, however, do not 
    --   implement this feature (or implement it inappropriately,
    --   closing the connection immediately after having sent
    --   the receipt).
    --   'withConnection', for this reason, ignores 
    --   the receipt by default and simply closes the socket
    --   after having sent the 'F.Disconnect' frame.
    --   If your broker shows a correct behaviour, 
    --   it is advisable to use this option.
    OWaitBroker Int |

    -- | Wait /n/ milliseconds 
    --   after the connection has been closed by the broker
    --   to give the library some time to process
    --   the error message (if one has been sent).
    --   
    OWaitError  Int |

    -- | The maximum size of TCP/IP packets.
    --   This option is currently ignored.
    --   Instead, 'Data.Conduit.Network' defines
    --   the packet size (currently hard-wired 4KB).
    --   The maximum message size is 1024 times this value,
    --   /i.e./ 4MB.
    OMaxRecv    Int |

    -- | This option defines the client\'s bid
    --   for negotiating heartbeats providing 
    --   an accepted lower and upper bound
    --   expessed as milliseconds between heartbeats.
    --   By default, no heart beats are sent or accepted
    OHeartBeat  (F.Heart) |

    -- | Authentication: user and password
    OAuth String String |

    -- | Identification: specifies the JMS Client ID for persistent connections
    OClientId String |

    -- | With this option set, "connect" will use 
    --   a "STOMP" frame instead of a "CONNECT" frame
    OStomp |

    -- | Connection timeout in milliseconds;
    --   if the broker does not respond to a connect request
    --   within this time frame, a 'ConnectException' is thrown.
    --   If the value is <= 0, the program will wait forever.
    OTmo Int |

    -- | 'TLSClientConfig'
    --        (see 'Data.Conduit.Network.TLS' for details)
    --   for TLS connections.
    --   If the option is not given, 
    --      a plain TCP/IP connection is used.
    OTLS TLSClientConfig |

    -- | Action to handle Error frames;
    --   if the option is not given,
    --   an exception is raised on arrival of an error frame.
    --   If it is given, one should also pass a value
    --   for OWaitError to give the error handler time
    --   to execute.
    OEH EHandler

  instance Show Copt where
    show :: Copt -> String
show (OWaitBroker Int
i) = String
"OWaitBroker" String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
i
    show (OWaitError  Int
i) = String
"OWaitError " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
i
    show (OMaxRecv    Int
i) = String
"OMaxRecv "   String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
i
    show (OHeartBeat  Version
h) = String
"OHeartBeat " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Version -> String
forall a. Show a => a -> String
show Version
h
    show (OAuth     String
u String
p) = String
"OAuth " String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
u String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"/" String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
p
    show (OClientId   String
u) = String
"OClientId "  String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
u
    show  Copt
OStomp         = String
"OStomp"  
    show (OTmo        Int
i) = String
"OTmo"        String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
i
    show (OTLS        TLSClientConfig
c) = String
"OTLS      "  String -> ShowS
forall a. [a] -> [a] -> [a]
++ Bool -> String
forall a. Show a => a -> String
show (TLSClientConfig -> Bool
tlsClientUseTLS TLSClientConfig
c)
    show (OEH         EHandler
_) = String
"OEHandler "  

  instance Eq Copt where
    (OWaitBroker Int
i1) == :: Copt -> Copt -> Bool
== (OWaitBroker Int
i2) = Int
i1 Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
i2
    (OWaitError  Int
i1) == (OWaitError  Int
i2) = Int
i1 Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
i2
    (OMaxRecv    Int
i1) == (OMaxRecv    Int
i2) = Int
i1 Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
i2 
    (OHeartBeat  Version
h1) == (OHeartBeat  Version
h2) = Version
h1 Version -> Version -> Bool
forall a. Eq a => a -> a -> Bool
== Version
h2
    (OAuth    String
u1 String
p1) == (OAuth    String
u2 String
p2) = String
u1 String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
u2 Bool -> Bool -> Bool
&& String
p1 String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
p2
    (OClientId   String
u1) == (OClientId   String
u2) = String
u1 String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
u2
    (OTmo        Int
i1) == (OTmo        Int
i2) = Int
i1 Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
i2
    (OTLS        TLSClientConfig
c1) == (OTLS        TLSClientConfig
c2) = TLSClientConfig -> Bool
tlsClientUseTLS TLSClientConfig
c1 Bool -> Bool -> Bool
&& 
                                           TLSClientConfig -> Bool
tlsClientUseTLS TLSClientConfig
c2
    (OEH          EHandler
_) == (OEH          EHandler
_) = Bool
True
    Copt
OStomp           == Copt
OStomp           = Bool
True
    Copt
_                == Copt
_                = Bool
False

  ------------------------------------------------------------------------
  -- Same constructor
  ------------------------------------------------------------------------
  is :: Copt -> Copt -> Bool
  is :: Copt -> Copt -> Bool
is (OWaitBroker Int
_) (OWaitBroker Int
_) = Bool
True
  is (OWaitError  Int
_) (OWaitError  Int
_) = Bool
True
  is (OMaxRecv    Int
_) (OMaxRecv    Int
_) = Bool
True
  is (OHeartBeat  Version
_) (OHeartBeat  Version
_) = Bool
True
  is (OAuth     String
_ String
_) (OAuth     String
_ String
_) = Bool
True
  is (OClientId   String
_) (OClientId  String
_)  = Bool
True
  is (Copt
OStomp       ) (Copt
OStomp      )  = Bool
True
  is (OTmo Int
_       ) (OTmo Int
_      )  = Bool
True
  is (OTLS      TLSClientConfig
_  ) (OTLS      TLSClientConfig
_ )  = Bool
True
  is (OEH       EHandler
_  ) (OEH       EHandler
_ )  = Bool
True
  is Copt
_               Copt
_               = Bool
False

  noWait :: Int
  noWait :: Int
noWait = Int
0

  stdRecv :: Int
  stdRecv :: Int
stdRecv = Int
1024

  noBeat :: F.Heart
  noBeat :: Version
noBeat = (Int
0,Int
0)

  noAuth :: (String, String)
  noAuth :: (String, String)
noAuth = (String
"",String
"")

  noCliId :: String
  noCliId :: String
noCliId = String
""

  oWaitBroker :: [Copt] -> Int
  oWaitBroker :: [Copt] -> Int
oWaitBroker [Copt]
os = case (Copt -> Bool) -> [Copt] -> Maybe Copt
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (Copt -> Copt -> Bool
is (Copt -> Copt -> Bool) -> Copt -> Copt -> Bool
forall a b. (a -> b) -> a -> b
$ Int -> Copt
OWaitBroker Int
0) [Copt]
os of
                     Just (OWaitBroker Int
d) -> Int
d
                     Maybe Copt
_   -> Int
noWait

  oWaitError  :: [Copt] -> Int
  oWaitError :: [Copt] -> Int
oWaitError  [Copt]
os = case (Copt -> Bool) -> [Copt] -> Maybe Copt
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (Copt -> Copt -> Bool
is (Copt -> Copt -> Bool) -> Copt -> Copt -> Bool
forall a b. (a -> b) -> a -> b
$ Int -> Copt
OWaitError  Int
0) [Copt]
os of
                     Just (OWaitError Int
d) -> Int
d
                     Maybe Copt
_   -> Int
noWait

  oMaxRecv :: [Copt] -> Int
  oMaxRecv :: [Copt] -> Int
oMaxRecv [Copt]
os = case (Copt -> Bool) -> [Copt] -> Maybe Copt
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (Copt -> Copt -> Bool
is (Copt -> Copt -> Bool) -> Copt -> Copt -> Bool
forall a b. (a -> b) -> a -> b
$ Int -> Copt
OMaxRecv Int
0) [Copt]
os of
                  Just (OMaxRecv Int
i) -> Int
i
                  Maybe Copt
_ -> Int
stdRecv

  oHeartBeat :: [Copt] -> F.Heart
  oHeartBeat :: [Copt] -> Version
oHeartBeat [Copt]
os = case (Copt -> Bool) -> [Copt] -> Maybe Copt
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (Copt -> Copt -> Bool
is (Copt -> Copt -> Bool) -> Copt -> Copt -> Bool
forall a b. (a -> b) -> a -> b
$ Version -> Copt
OHeartBeat (Int
0,Int
0)) [Copt]
os of
                    Just (OHeartBeat Version
b) -> Version
b
                    Maybe Copt
_ -> Version
noBeat

  oAuth :: [Copt] -> (String, String)
  oAuth :: [Copt] -> (String, String)
oAuth [Copt]
os = case (Copt -> Bool) -> [Copt] -> Maybe Copt
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (Copt -> Copt -> Bool
is (Copt -> Copt -> Bool) -> Copt -> Copt -> Bool
forall a b. (a -> b) -> a -> b
$ String -> String -> Copt
OAuth String
"" String
"") [Copt]
os of
               Just (OAuth String
u String
p) -> (String
u, String
p)
               Maybe Copt
_   -> (String, String)
noAuth

  oCliId :: [Copt] -> String
  oCliId :: [Copt] -> String
oCliId [Copt]
os = case (Copt -> Bool) -> [Copt] -> Maybe Copt
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (Copt -> Copt -> Bool
is (Copt -> Copt -> Bool) -> Copt -> Copt -> Bool
forall a b. (a -> b) -> a -> b
$ String -> Copt
OClientId String
"") [Copt]
os of
               Just (OClientId String
i) -> String
i
               Maybe Copt
_   -> String
noCliId

  oStomp :: [Copt] -> Bool
  oStomp :: [Copt] -> Bool
oStomp [Copt]
os = case (Copt -> Bool) -> [Copt] -> Maybe Copt
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (Copt -> Copt -> Bool
is Copt
OStomp) [Copt]
os of
                Just Copt
_  -> Bool
True
                Maybe Copt
Nothing -> Bool
False

  oTmo :: [Copt] -> Int
  oTmo :: [Copt] -> Int
oTmo [Copt]
os = case (Copt -> Bool) -> [Copt] -> Maybe Copt
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (Copt -> Copt -> Bool
is (Copt -> Copt -> Bool) -> Copt -> Copt -> Bool
forall a b. (a -> b) -> a -> b
$ Int -> Copt
OTmo Int
0) [Copt]
os of
              Just (OTmo Int
i) -> Int
i
              Maybe Copt
_             -> Int
0

  oTLS :: String -> Int -> [Copt] -> TLSClientConfig
  oTLS :: String -> Int -> [Copt] -> TLSClientConfig
oTLS String
h Int
p [Copt]
os = case (Copt -> Bool) -> [Copt] -> Maybe Copt
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (Copt -> Copt -> Bool
is (Copt -> Copt -> Bool) -> Copt -> Copt -> Bool
forall a b. (a -> b) -> a -> b
$ TLSClientConfig -> Copt
OTLS TLSClientConfig
dcfg) [Copt]
os of
                     Just (OTLS TLSClientConfig
cfg) -> TLSClientConfig
cfg
                     Maybe Copt
_               -> TLSClientConfig
dcfg
    where dcfg :: TLSClientConfig
dcfg = (Int -> ByteString -> TLSClientConfig
tlsClientConfig Int
p (ByteString -> TLSClientConfig) -> ByteString -> TLSClientConfig
forall a b. (a -> b) -> a -> b
$ String -> ByteString
B.pack String
h){tlsClientUseTLS :: Bool
tlsClientUseTLS=Bool
False}

  oEH :: [Copt] -> Maybe EHandler
  oEH :: [Copt] -> Maybe EHandler
oEH [Copt]
os = case (Copt -> Bool) -> [Copt] -> Maybe Copt
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (Copt -> Copt -> Bool
is (Copt -> Copt -> Bool) -> Copt -> Copt -> Bool
forall a b. (a -> b) -> a -> b
$ EHandler -> Copt
OEH EHandler
forall (m :: * -> *) p p. Monad m => p -> p -> m ()
deh) [Copt]
os of
             Just (OEH EHandler
eh) -> EHandler -> Maybe EHandler
forall a. a -> Maybe a
Just EHandler
eh
             Maybe Copt
_             -> Maybe EHandler
forall a. Maybe a
Nothing
    where deh :: p -> p -> m ()
deh p
_ p
_ = () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

  findCon :: Fac.Con -> [Connection] -> Maybe Connection
  findCon :: Con -> [Connection] -> Maybe Connection
findCon Con
cid = (Connection -> Bool) -> [Connection] -> Maybe Connection
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (\Connection
c -> Connection -> Con
conId Connection
c Con -> Con -> Bool
forall a. Eq a => a -> a -> Bool
== Con
cid)

  addAckToCon :: MsgId -> Connection -> Connection
  addAckToCon :: MsgId -> Connection -> Connection
addAckToCon MsgId
mid Connection
c = Connection
c {conAcks :: [MsgId]
conAcks = MsgId
mid MsgId -> [MsgId] -> [MsgId]
forall a. a -> [a] -> [a]
: Connection -> [MsgId]
conAcks Connection
c} 

  rmAckFromCon :: MsgId -> Connection -> Connection
  rmAckFromCon :: MsgId -> Connection -> Connection
rmAckFromCon MsgId
mid Connection
c = Connection
c {conAcks :: [MsgId]
conAcks = MsgId -> [MsgId] -> [MsgId]
forall a. Eq a => a -> [a] -> [a]
delete' MsgId
mid ([MsgId] -> [MsgId]) -> [MsgId] -> [MsgId]
forall a b. (a -> b) -> a -> b
$ Connection -> [MsgId]
conAcks Connection
c}

  addRecToCon :: Receipt -> Connection -> Connection
  addRecToCon :: Receipt -> Connection -> Connection
addRecToCon Receipt
r Connection
c = Connection
c {conRecs :: [Receipt]
conRecs = Receipt
r Receipt -> [Receipt] -> [Receipt]
forall a. a -> [a] -> [a]
: Connection -> [Receipt]
conRecs Connection
c}

  rmRecFromCon :: Receipt -> Connection -> Connection
  rmRecFromCon :: Receipt -> Connection -> Connection
rmRecFromCon Receipt
r Connection
c = Connection
c {conRecs :: [Receipt]
conRecs = Receipt -> [Receipt] -> [Receipt]
forall a. Eq a => a -> [a] -> [a]
delete' Receipt
r ([Receipt] -> [Receipt]) -> [Receipt] -> [Receipt]
forall a b. (a -> b) -> a -> b
$ Connection -> [Receipt]
conRecs Connection
c}

  checkReceiptCon :: Receipt -> Connection -> Bool
  checkReceiptCon :: Receipt -> Connection -> Bool
checkReceiptCon Receipt
r Connection
c = case (Receipt -> Bool) -> [Receipt] -> Maybe Receipt
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (Receipt -> Receipt -> Bool
forall a. Eq a => a -> a -> Bool
== Receipt
r) ([Receipt] -> Maybe Receipt) -> [Receipt] -> Maybe Receipt
forall a b. (a -> b) -> a -> b
$ Connection -> [Receipt]
conRecs Connection
c of
                         Maybe Receipt
Nothing -> Bool
True
                         Just Receipt
_  -> Bool
False

  ---------------------------------------------------------------------
  -- Connection interfaces
  ---------------------------------------------------------------------
  connected :: Connection -> Bool
  connected :: Connection -> Bool
connected = Connection -> Bool
conBrk 

  getVersion :: Connection -> F.Version
  getVersion :: Connection -> Version
getVersion Connection
c = if [Version] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (Connection -> [Version]
conVers Connection
c) 
                   then Version
defVersion
                   else [Version] -> Version
forall a. [a] -> a
head ([Version] -> Version) -> [Version] -> Version
forall a b. (a -> b) -> a -> b
$ Connection -> [Version]
conVers Connection
c

  getEH :: Connection -> Maybe EHandler
  getEH :: Connection -> Maybe EHandler
getEH = Connection -> Maybe EHandler
conEH 

  ------------------------------------------------------------------------
  -- Sub, Dest and Thread Entry
  ------------------------------------------------------------------------
  type SubEntry    = (Fac.Sub, Chan F.Frame)
  type DestEntry   = (String, Chan F.Frame)
  type ThreadEntry = (ThreadId, [Transaction])

  addSubToCon :: SubEntry -> Connection -> Connection
  addSubToCon :: SubEntry -> Connection -> Connection
addSubToCon SubEntry
s Connection
c = Connection
c {conSubs :: [SubEntry]
conSubs = SubEntry
s SubEntry -> [SubEntry] -> [SubEntry]
forall a. a -> [a] -> [a]
: Connection -> [SubEntry]
conSubs Connection
c}

  getSub :: Fac.Sub -> Connection -> Maybe (Chan F.Frame)
  getSub :: Sub -> Connection -> Maybe (Chan Frame)
getSub Sub
sid Connection
c = Sub -> [SubEntry] -> Maybe (Chan Frame)
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup Sub
sid (Connection -> [SubEntry]
conSubs Connection
c)

  rmSubFromCon :: SubEntry -> Connection -> Connection
  rmSubFromCon :: SubEntry -> Connection -> Connection
rmSubFromCon SubEntry
s Connection
c = Connection
c {conSubs :: [SubEntry]
conSubs = [SubEntry]
ss} 
    where !ss :: [SubEntry]
ss = (SubEntry -> SubEntry -> Bool)
-> SubEntry -> [SubEntry] -> [SubEntry]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
deleteBy' SubEntry -> SubEntry -> Bool
forall a b. Eq a => (a, b) -> (a, b) -> Bool
eq SubEntry
s (Connection -> [SubEntry]
conSubs Connection
c)

  addDestToCon :: DestEntry -> Connection -> Connection
  addDestToCon :: DestEntry -> Connection -> Connection
addDestToCon DestEntry
d Connection
c = Connection
c {conDests :: [DestEntry]
conDests = DestEntry
d DestEntry -> [DestEntry] -> [DestEntry]
forall a. a -> [a] -> [a]
: Connection -> [DestEntry]
conDests Connection
c}

  getDest :: String -> Connection -> Maybe (Chan F.Frame)
  getDest :: String -> Connection -> Maybe (Chan Frame)
getDest String
dst Connection
c = String -> [DestEntry] -> Maybe (Chan Frame)
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup String
dst (Connection -> [DestEntry]
conDests Connection
c)

  rmDestFromCon :: DestEntry -> Connection -> Connection
  rmDestFromCon :: DestEntry -> Connection -> Connection
rmDestFromCon DestEntry
d Connection
c = Connection
c {conDests :: [DestEntry]
conDests = [DestEntry]
ds}
    where !ds :: [DestEntry]
ds = (DestEntry -> DestEntry -> Bool)
-> DestEntry -> [DestEntry] -> [DestEntry]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
deleteBy' DestEntry -> DestEntry -> Bool
forall a b. Eq a => (a, b) -> (a, b) -> Bool
eq DestEntry
d (Connection -> [DestEntry]
conDests Connection
c)

  setHisTime :: UTCTime -> Connection -> Connection
  setHisTime :: UTCTime -> Connection -> Connection
setHisTime UTCTime
t Connection
c = Connection
c {conHisBeat :: UTCTime
conHisBeat = UTCTime
t}

  setMyTime  :: UTCTime -> Connection -> Connection
  setMyTime :: UTCTime -> Connection -> Connection
setMyTime UTCTime
t Connection
c = Connection
c {conMyBeat :: UTCTime
conMyBeat = UTCTime
t}

  _updCon :: Connection -> [Connection] -> [Connection]
  _updCon :: Connection -> [Connection] -> [Connection]
_updCon Connection
c [Connection]
cs = let !c' :: [Connection]
c' = Connection -> [Connection] -> [Connection]
forall a. Eq a => a -> [a] -> [a]
delete' Connection
c [Connection]
cs in Connection
cConnection -> [Connection] -> [Connection]
forall a. a -> [a] -> [a]
:[Connection]
c'
  
  ------------------------------------------------------------------------
  -- Transaction 
  ------------------------------------------------------------------------
  data Transaction = Trn {
                       Transaction -> Tx
txId      :: Fac.Tx,
                       Transaction -> TxState
txState   :: TxState,
                       Transaction -> Int
txTmo     :: Int,
                       Transaction -> Bool
txAbrtAck :: Bool,
                       Transaction -> Bool
txAbrtRc  :: Bool,
                       Transaction -> [MsgId]
txAcks    :: [MsgId],
                       Transaction -> [Receipt]
txRecs    :: [Receipt]
                     }

  instance Eq Transaction where
    Transaction
t1 == :: Transaction -> Transaction -> Bool
== Transaction
t2 = Transaction -> Tx
txId Transaction
t1 Tx -> Tx -> Bool
forall a. Eq a => a -> a -> Bool
== Transaction -> Tx
txId Transaction
t2

  findTx :: Fac.Tx -> [Transaction] -> Maybe Transaction
  findTx :: Tx -> [Transaction] -> Maybe Transaction
findTx Tx
tx = (Transaction -> Bool) -> [Transaction] -> Maybe Transaction
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (\Transaction
x -> Transaction -> Tx
txId Transaction
x Tx -> Tx -> Bool
forall a. Eq a => a -> a -> Bool
== Tx
tx) 

  mkTrn :: Fac.Tx -> [Topt] -> Transaction
  mkTrn :: Tx -> [Topt] -> Transaction
mkTrn Tx
tx [Topt]
os = Trn :: Tx
-> TxState
-> Int
-> Bool
-> Bool
-> [MsgId]
-> [Receipt]
-> Transaction
Trn {
                  txId :: Tx
txId      = Tx
tx,
                  txState :: TxState
txState   = TxState
TxStarted,
                  txTmo :: Int
txTmo     = [Topt] -> Int
tmo [Topt]
os,
                  txAbrtAck :: Bool
txAbrtAck = Topt -> [Topt] -> Bool
hasTopt Topt
OAbortMissingAcks [Topt]
os,
                  txAbrtRc :: Bool
txAbrtRc  = Topt -> [Topt] -> Bool
hasTopt Topt
OWithReceipts     [Topt]
os,
                  txAcks :: [MsgId]
txAcks    = [],
                  txRecs :: [Receipt]
txRecs    = []}

  ------------------------------------------------------------------------
  -- | Options passed to a transaction.
  ------------------------------------------------------------------------
  data Topt = 
            -- | The timeout in milliseconds (not microseconds!)
            --   to wait for /pending receipts/.
            --   If receipts are pending, when the transaction
            --   is ready to terminate,
            --   and no timeout or a timeout /<= 0/ is given, 
            --   and the option 'OWithReceipts' 
            --   was passed to 'withTransaction',
            --   the transaction will be aborted with 'TxException';
            --   otherwise it will wait until all pending
            --   ineractions with the broker have terminated
            --   or the timeout has expired - whatever comes first.
            --   If the timeout expires first, 'TxException' is raised. 
            OTimeout Int 
            -- | This option has two effects:
            --   1) Internal interactions of the transaction
            --      with the broker will request receipts;
            --   2) before ending the transaction,
            --      the library will check for receipts
            --      that have not yet been confirmed by the broker
            --      (including receipts requested by user calls
            --       such as /writeQ/ or /ackWith/).
            --
            --   If receipts are pending, when the transaction
            --   is ready to terminate and 'OTimeout' with
            --   a value /> 0/ is given, the transaction will
            --   wait for pending receipts; otherwise
            --   the transaction will be aborted with 'TxException'.
            --   Note that it usually does not make sense to use
            --   this option without 'OTimeout',
            --   since, in all probability, there will be receipts 
            --   that have not yet been confirmed 
            --   when the transaction terminates.
            | OWithReceipts 
            -- | If a message has been received from a 
            --   queue with 'OMode' option other 
            --   than 'F.Auto' and this message has not yet been
            --   acknowledged when the transaction is ready
            --   to terminate, the /ack/ is /missing/.
            --   With this option, the transaction 
            --   will not commit with missing /acks/,
            --   but abort and raise 'TxException'.
            | OAbortMissingAcks 
    deriving (Topt -> Topt -> Bool
(Topt -> Topt -> Bool) -> (Topt -> Topt -> Bool) -> Eq Topt
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Topt -> Topt -> Bool
$c/= :: Topt -> Topt -> Bool
== :: Topt -> Topt -> Bool
$c== :: Topt -> Topt -> Bool
Eq, Int -> Topt -> ShowS
[Topt] -> ShowS
Topt -> String
(Int -> Topt -> ShowS)
-> (Topt -> String) -> ([Topt] -> ShowS) -> Show Topt
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Topt] -> ShowS
$cshowList :: [Topt] -> ShowS
show :: Topt -> String
$cshow :: Topt -> String
showsPrec :: Int -> Topt -> ShowS
$cshowsPrec :: Int -> Topt -> ShowS
Show)

  hasTopt :: Topt -> [Topt] -> Bool
  hasTopt :: Topt -> [Topt] -> Bool
hasTopt Topt
o [Topt]
os = Topt
o Topt -> [Topt] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [Topt]
os 

  tmo :: [Topt] -> Int
  tmo :: [Topt] -> Int
tmo [Topt]
os = case (Topt -> Bool) -> [Topt] -> Maybe Topt
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find Topt -> Bool
isTimeout [Topt]
os of
             Just (OTimeout Int
i) -> Int
i
             Maybe Topt
_                 -> Int
0
    where isTimeout :: Topt -> Bool
isTimeout Topt
o = case Topt
o of
                          OTimeout Int
_ -> Bool
True
                          Topt
_          -> Bool
False

  data TxState = TxStarted | TxEnded
    deriving (TxState -> TxState -> Bool
(TxState -> TxState -> Bool)
-> (TxState -> TxState -> Bool) -> Eq TxState
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: TxState -> TxState -> Bool
$c/= :: TxState -> TxState -> Bool
== :: TxState -> TxState -> Bool
$c== :: TxState -> TxState -> Bool
Eq, Int -> TxState -> ShowS
[TxState] -> ShowS
TxState -> String
(Int -> TxState -> ShowS)
-> (TxState -> String) -> ([TxState] -> ShowS) -> Show TxState
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [TxState] -> ShowS
$cshowList :: [TxState] -> ShowS
show :: TxState -> String
$cshow :: TxState -> String
showsPrec :: Int -> TxState -> ShowS
$cshowsPrec :: Int -> TxState -> ShowS
Show)

  setTxState :: TxState -> Transaction -> Transaction
  setTxState :: TxState -> Transaction -> Transaction
setTxState TxState
st Transaction
t = Transaction
t {txState :: TxState
txState = TxState
st}

  addAckToTx :: MsgId -> Transaction -> Transaction
  addAckToTx :: MsgId -> Transaction -> Transaction
addAckToTx MsgId
mid Transaction
t = Transaction
t {txAcks :: [MsgId]
txAcks = MsgId
mid MsgId -> [MsgId] -> [MsgId]
forall a. a -> [a] -> [a]
: Transaction -> [MsgId]
txAcks Transaction
t}

  rmAckFromTx :: MsgId -> Transaction -> Transaction
  rmAckFromTx :: MsgId -> Transaction -> Transaction
rmAckFromTx MsgId
mid Transaction
t = Transaction
t {txAcks :: [MsgId]
txAcks = MsgId -> [MsgId] -> [MsgId]
forall a. Eq a => a -> [a] -> [a]
delete' MsgId
mid ([MsgId] -> [MsgId]) -> [MsgId] -> [MsgId]
forall a b. (a -> b) -> a -> b
$ Transaction -> [MsgId]
txAcks Transaction
t}

  addRecToTx :: Receipt -> Transaction -> Transaction
  addRecToTx :: Receipt -> Transaction -> Transaction
addRecToTx Receipt
r Transaction
t = Transaction
t {txRecs :: [Receipt]
txRecs = Receipt
r Receipt -> [Receipt] -> [Receipt]
forall a. a -> [a] -> [a]
: Transaction -> [Receipt]
txRecs Transaction
t}

  rmRecFromTx :: Receipt -> Transaction -> Transaction
  rmRecFromTx :: Receipt -> Transaction -> Transaction
rmRecFromTx Receipt
r Transaction
t = Transaction
t {txRecs :: [Receipt]
txRecs = Receipt -> [Receipt] -> [Receipt]
forall a. Eq a => a -> [a] -> [a]
delete' Receipt
r ([Receipt] -> [Receipt]) -> [Receipt] -> [Receipt]
forall a b. (a -> b) -> a -> b
$ Transaction -> [Receipt]
txRecs Transaction
t}

  checkReceiptTx :: Receipt -> Transaction -> Bool
  checkReceiptTx :: Receipt -> Transaction -> Bool
checkReceiptTx Receipt
r = Receipt -> [Receipt] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
notElem Receipt
r ([Receipt] -> Bool)
-> (Transaction -> [Receipt]) -> Transaction -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Transaction -> [Receipt]
txRecs 

  txPendingAck :: Transaction -> Bool
  txPendingAck :: Transaction -> Bool
txPendingAck Transaction
t = Transaction -> Bool
txAbrtAck Transaction
t Bool -> Bool -> Bool
&& Bool -> Bool
not ([MsgId] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null ([MsgId] -> Bool) -> [MsgId] -> Bool
forall a b. (a -> b) -> a -> b
$ Transaction -> [MsgId]
txAcks Transaction
t)

  txReceipts :: Transaction -> Bool
  txReceipts :: Transaction -> Bool
txReceipts Transaction
t = Transaction -> Bool
txAbrtRc Transaction
t Bool -> Bool -> Bool
&& Bool -> Bool
not ([Receipt] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null ([Receipt] -> Bool) -> [Receipt] -> Bool
forall a b. (a -> b) -> a -> b
$ Transaction -> [Receipt]
txRecs Transaction
t) 

  ------------------------------------------------------------------------
  -- State 
  ------------------------------------------------------------------------
  {-# NOINLINE con #-}
  con :: MVar [Connection]
  con :: MVar [Connection]
con = IO (MVar [Connection]) -> MVar [Connection]
forall a. IO a -> a
unsafePerformIO (IO (MVar [Connection]) -> MVar [Connection])
-> IO (MVar [Connection]) -> MVar [Connection]
forall a b. (a -> b) -> a -> b
$ [Connection] -> IO (MVar [Connection])
forall a. a -> IO (MVar a)
newMVar []
 
  ------------------------------------------------------------------------
  -- Add connection to state
  ------------------------------------------------------------------------
  addCon :: Connection -> IO ()
  addCon :: Connection -> IO ()
addCon Connection
c = MVar [Connection] -> ([Connection] -> IO [Connection]) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar [Connection]
con (([Connection] -> IO [Connection]) -> IO ())
-> ([Connection] -> IO [Connection]) -> IO ()
forall a b. (a -> b) -> a -> b
$ \[Connection]
cs -> [Connection] -> IO [Connection]
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
cConnection -> [Connection] -> [Connection]
forall a. a -> [a] -> [a]
:[Connection]
cs)

  ------------------------------------------------------------------------
  -- get connection from state
  ------------------------------------------------------------------------
  getCon :: Fac.Con -> IO Connection
  getCon :: Con -> IO Connection
getCon Con
cid = Con -> (Connection -> IO (Connection, Connection)) -> IO Connection
forall a. Con -> (Connection -> IO (Connection, a)) -> IO a
withCon Con
cid ((Connection -> IO (Connection, Connection)) -> IO Connection)
-> (Connection -> IO (Connection, Connection)) -> IO Connection
forall a b. (a -> b) -> a -> b
$ \Connection
c -> (Connection, Connection) -> IO (Connection, Connection)
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c, Connection
c) 

  ------------------------------------------------------------------------
  -- update connection 
  ------------------------------------------------------------------------
  updCon :: Fac.Con -> Connection -> IO ()
  updCon :: Con -> Connection -> IO ()
updCon Con
cid Connection
c = Con -> (Connection -> IO (Connection, ())) -> IO ()
forall a. Con -> (Connection -> IO (Connection, a)) -> IO a
withCon Con
cid ((Connection -> IO (Connection, ())) -> IO ())
-> (Connection -> IO (Connection, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Connection
_ -> (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c, ()) 

  ------------------------------------------------------------------------
  -- remove connection from state
  ------------------------------------------------------------------------
  rmCon :: Fac.Con -> IO ()
  rmCon :: Con -> IO ()
rmCon Con
cid = MVar [Connection] -> ([Connection] -> IO [Connection]) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar [Connection]
con (([Connection] -> IO [Connection]) -> IO ())
-> ([Connection] -> IO [Connection]) -> IO ()
forall a b. (a -> b) -> a -> b
$ \[Connection]
cs -> 
    case Con -> [Connection] -> Maybe Connection
findCon Con
cid [Connection]
cs of
      Maybe Connection
Nothing -> [Connection] -> IO [Connection]
forall (m :: * -> *) a. Monad m => a -> m a
return [Connection]
cs
      Just Connection
c  -> [Connection] -> IO [Connection]
forall (m :: * -> *) a. Monad m => a -> m a
return ([Connection] -> IO [Connection])
-> [Connection] -> IO [Connection]
forall a b. (a -> b) -> a -> b
$ Connection -> [Connection] -> [Connection]
forall a. Eq a => a -> [a] -> [a]
delete' Connection
c [Connection]
cs

  ------------------------------------------------------------------------
  -- Apply an action that may change a connection to the state
  ------------------------------------------------------------------------
  withCon :: Fac.Con -> (Connection -> IO (Connection, a)) -> IO a
  withCon :: Con -> (Connection -> IO (Connection, a)) -> IO a
withCon Con
cid Connection -> IO (Connection, a)
op = MVar [Connection] -> ([Connection] -> IO ([Connection], a)) -> IO a
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar [Connection]
con (\[Connection]
cs -> 
     case Con -> [Connection] -> Maybe Connection
findCon Con
cid [Connection]
cs of
       Maybe Connection
Nothing   -> 
         StomplException -> IO ([Connection], a)
forall e a. Exception e => e -> IO a
throwIO (StomplException -> IO ([Connection], a))
-> StomplException -> IO ([Connection], a)
forall a b. (a -> b) -> a -> b
$ String -> StomplException
ConnectException (String -> StomplException) -> String -> StomplException
forall a b. (a -> b) -> a -> b
$
                 String
"No such Connection: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Con -> String
forall a. Show a => a -> String
show Con
cid
       Just Connection
c    -> do
         (Connection
c', a
x) <- Connection -> IO (Connection, a)
op Connection
c
         let cs' :: [Connection]
cs' = Connection -> [Connection] -> [Connection]
_updCon Connection
c' [Connection]
cs
         ([Connection], a) -> IO ([Connection], a)
forall (m :: * -> *) a. Monad m => a -> m a
return ([Connection]
cs', a
x))

  ------------------------------------------------------------------------
  -- Log heart-beats
  ------------------------------------------------------------------------
  logTime :: Fac.Con -> (UTCTime -> Connection -> Connection) -> IO ()
  logTime :: Con -> (UTCTime -> Connection -> Connection) -> IO ()
logTime Con
cid UTCTime -> Connection -> Connection
f = 
    IO UTCTime
getCurrentTime IO UTCTime -> (UTCTime -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \UTCTime
t -> Con -> (Connection -> IO (Connection, ())) -> IO ()
forall a. Con -> (Connection -> IO (Connection, a)) -> IO a
withCon Con
cid (\Connection
c -> (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (UTCTime -> Connection -> Connection
f UTCTime
t Connection
c, ()))

  logSend :: Fac.Con -> IO ()
  logSend :: Con -> IO ()
logSend Con
cid = Con -> (UTCTime -> Connection -> Connection) -> IO ()
logTime Con
cid UTCTime -> Connection -> Connection
setMyTime

  logReceive :: Fac.Con -> IO ()
  logReceive :: Con -> IO ()
logReceive Con
cid = Con -> (UTCTime -> Connection -> Connection) -> IO ()
logTime Con
cid UTCTime -> Connection -> Connection
setHisTime

  ------------------------------------------------------------------------
  -- add and remove sub and dest
  ------------------------------------------------------------------------
  addSub :: Fac.Con -> SubEntry -> IO ()
  addSub :: Con -> SubEntry -> IO ()
addSub Con
cid SubEntry
s  = Con -> (Connection -> IO (Connection, ())) -> IO ()
forall a. Con -> (Connection -> IO (Connection, a)) -> IO a
withCon Con
cid ((Connection -> IO (Connection, ())) -> IO ())
-> (Connection -> IO (Connection, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Connection
c -> (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (SubEntry -> Connection -> Connection
addSubToCon SubEntry
s Connection
c, ())

  addDest :: Fac.Con -> DestEntry -> IO ()
  addDest :: Con -> DestEntry -> IO ()
addDest Con
cid DestEntry
d = Con -> (Connection -> IO (Connection, ())) -> IO ()
forall a. Con -> (Connection -> IO (Connection, a)) -> IO a
withCon Con
cid ((Connection -> IO (Connection, ())) -> IO ())
-> (Connection -> IO (Connection, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Connection
c -> (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (DestEntry -> Connection -> Connection
addDestToCon DestEntry
d Connection
c, ())

  rmSub :: Fac.Con -> Fac.Sub -> IO ()
  rmSub :: Con -> Sub -> IO ()
rmSub Con
cid Sub
sid = Con -> (Connection -> IO (Connection, ())) -> IO ()
forall a. Con -> (Connection -> IO (Connection, a)) -> IO a
withCon Con
cid Connection -> IO (Connection, ())
forall (m :: * -> *). Monad m => Connection -> m (Connection, ())
rm
    where rm :: Connection -> m (Connection, ())
rm Connection
c = case Sub -> Connection -> Maybe (Chan Frame)
getSub Sub
sid Connection
c of 
                   Maybe (Chan Frame)
Nothing -> (Connection, ()) -> m (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c, ())
                   Just Chan Frame
ch -> (Connection, ()) -> m (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (SubEntry -> Connection -> Connection
rmSubFromCon (Sub
sid, Chan Frame
ch) Connection
c, ())

  rmDest :: Fac.Con -> String -> IO ()
  rmDest :: Con -> String -> IO ()
rmDest Con
cid String
dst = Con -> (Connection -> IO (Connection, ())) -> IO ()
forall a. Con -> (Connection -> IO (Connection, a)) -> IO a
withCon Con
cid Connection -> IO (Connection, ())
forall (m :: * -> *). Monad m => Connection -> m (Connection, ())
rm
    where rm :: Connection -> m (Connection, ())
rm Connection
c = case String -> Connection -> Maybe (Chan Frame)
getDest String
dst Connection
c of 
                   Maybe (Chan Frame)
Nothing -> (Connection, ()) -> m (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c, ())
                   Just Chan Frame
ch -> (Connection, ()) -> m (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (DestEntry -> Connection -> Connection
rmDestFromCon (String
dst, Chan Frame
ch) Connection
c, ())

  ------------------------------------------------------------------------
  -- add transaction to connection
  -- Note: transactions are kept per threadId
  ------------------------------------------------------------------------
  addTx :: Transaction -> Fac.Con -> IO ()
  addTx :: Transaction -> Con -> IO ()
addTx Transaction
t Con
cid = Con -> (Connection -> IO (Connection, ())) -> IO ()
forall a. Con -> (Connection -> IO (Connection, a)) -> IO a
withCon Con
cid ((Connection -> IO (Connection, ())) -> IO ())
-> (Connection -> IO (Connection, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Connection
c -> do
    ThreadId
tid <- IO ThreadId
myThreadId
    case ThreadId -> [ThreadEntry] -> Maybe [Transaction]
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup ThreadId
tid (Connection -> [ThreadEntry]
conThrds Connection
c) of
      Maybe [Transaction]
Nothing -> 
        (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c {conThrds :: [ThreadEntry]
conThrds = [(ThreadId
tid, [Transaction
t])]}, ())
      Just [Transaction]
ts -> 
        (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c {conThrds :: [ThreadEntry]
conThrds = Transaction
-> ThreadId -> [ThreadEntry] -> [Transaction] -> [ThreadEntry]
forall a a. Eq a => a -> a -> [(a, [a])] -> [a] -> [(a, [a])]
addTx2Thrds Transaction
t ThreadId
tid (Connection -> [ThreadEntry]
conThrds Connection
c) [Transaction]
ts}, ())
    where addTx2Thrds :: a -> a -> [(a, [a])] -> [a] -> [(a, [a])]
addTx2Thrds a
tx a
tid [(a, [a])]
ts [a]
trns = 
            (a
tid, a
tx a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
trns) (a, [a]) -> [(a, [a])] -> [(a, [a])]
forall a. a -> [a] -> [a]
: ((a, [a]) -> (a, [a]) -> Bool)
-> (a, [a]) -> [(a, [a])] -> [(a, [a])]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
deleteBy' (a, [a]) -> (a, [a]) -> Bool
forall a b. Eq a => (a, b) -> (a, b) -> Bool
eq (a
tid, [a]
trns) [(a, [a])]
ts

  ------------------------------------------------------------------------
  -- get transaction from connection
  -- Note: transactions are kept per threadId
  ------------------------------------------------------------------------
  getTx :: Fac.Tx -> Connection -> IO (Maybe Transaction)
  getTx :: Tx -> Connection -> IO (Maybe Transaction)
getTx Tx
tx Connection
c = do
    ThreadId
tid <- IO ThreadId
myThreadId
    case ThreadId -> [ThreadEntry] -> Maybe [Transaction]
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup ThreadId
tid (Connection -> [ThreadEntry]
conThrds Connection
c) of
      Maybe [Transaction]
Nothing -> Maybe Transaction -> IO (Maybe Transaction)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Transaction
forall a. Maybe a
Nothing
      Just [Transaction]
ts -> Maybe Transaction -> IO (Maybe Transaction)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe Transaction -> IO (Maybe Transaction))
-> Maybe Transaction -> IO (Maybe Transaction)
forall a b. (a -> b) -> a -> b
$ Tx -> [Transaction] -> Maybe Transaction
findTx Tx
tx [Transaction]
ts

  ------------------------------------------------------------------------
  -- apply an action that may change a transaction to the state
  ------------------------------------------------------------------------
  updTx :: Fac.Tx -> Fac.Con -> (Transaction -> Transaction) -> IO ()
  updTx :: Tx -> Con -> (Transaction -> Transaction) -> IO ()
updTx Tx
tx Con
cid Transaction -> Transaction
f = Con -> (Connection -> IO (Connection, ())) -> IO ()
forall a. Con -> (Connection -> IO (Connection, a)) -> IO a
withCon Con
cid ((Connection -> IO (Connection, ())) -> IO ())
-> (Connection -> IO (Connection, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Connection
c -> do
    ThreadId
tid <- IO ThreadId
myThreadId
    case ThreadId -> [ThreadEntry] -> Maybe [Transaction]
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup ThreadId
tid (Connection -> [ThreadEntry]
conThrds Connection
c) of
      Maybe [Transaction]
Nothing -> (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c, ())
      Just [Transaction]
ts -> 
        case Tx -> [Transaction] -> Maybe Transaction
findTx Tx
tx [Transaction]
ts of
          Maybe Transaction
Nothing -> (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c, ())
          Just Transaction
t  -> 
            let t' :: Transaction
t' = Transaction -> Transaction
f Transaction
t
            in  (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c {conThrds :: [ThreadEntry]
conThrds = 
                             Transaction
-> ThreadId -> [ThreadEntry] -> [Transaction] -> [ThreadEntry]
forall a a.
(Eq a, Eq a) =>
a -> a -> [(a, [a])] -> [a] -> [(a, [a])]
updTxInThrds Transaction
t' ThreadId
tid (Connection -> [ThreadEntry]
conThrds Connection
c) [Transaction]
ts}, 
                        ())
    where updTxInThrds :: a -> a -> [(a, [a])] -> [a] -> [(a, [a])]
updTxInThrds a
t a
tid [(a, [a])]
ts [a]
trns =
            let !trns' :: [a]
trns' = a -> [a] -> [a]
forall a. Eq a => a -> [a] -> [a]
delete' a
t [a]
trns
                !ts' :: [(a, [a])]
ts'   = ((a, [a]) -> (a, [a]) -> Bool)
-> (a, [a]) -> [(a, [a])] -> [(a, [a])]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
deleteBy' (a, [a]) -> (a, [a]) -> Bool
forall a b. Eq a => (a, b) -> (a, b) -> Bool
eq (a
tid, [a]
trns) [(a, [a])]
ts
             in (a
tid, a
t a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
trns') (a, [a]) -> [(a, [a])] -> [(a, [a])]
forall a. a -> [a] -> [a]
: [(a, [a])]
ts'

  ------------------------------------------------------------------------
  -- update transaction state
  ------------------------------------------------------------------------
  updTxState :: Fac.Tx -> Fac.Con -> TxState -> IO ()
  updTxState :: Tx -> Con -> TxState -> IO ()
updTxState Tx
tx Con
cid TxState
st = Tx -> Con -> (Transaction -> Transaction) -> IO ()
updTx Tx
tx Con
cid (TxState -> Transaction -> Transaction
setTxState TxState
st)

  ------------------------------------------------------------------------
  -- get current transaction for thread
  ------------------------------------------------------------------------
  getCurTx :: Connection -> IO (Maybe Fac.Tx)
  getCurTx :: Connection -> IO (Maybe Tx)
getCurTx Connection
c = do
    ThreadId
tid <- IO ThreadId
myThreadId
    case ThreadId -> [ThreadEntry] -> Maybe [Transaction]
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup ThreadId
tid (Connection -> [ThreadEntry]
conThrds Connection
c) of
      Maybe [Transaction]
Nothing -> Maybe Tx -> IO (Maybe Tx)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Tx
forall a. Maybe a
Nothing
      Just [Transaction]
ts -> if [Transaction] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Transaction]
ts then Maybe Tx -> IO (Maybe Tx)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Tx
forall a. Maybe a
Nothing
                   else Maybe Tx -> IO (Maybe Tx)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe Tx -> IO (Maybe Tx)) -> Maybe Tx -> IO (Maybe Tx)
forall a b. (a -> b) -> a -> b
$ Tx -> Maybe Tx
forall a. a -> Maybe a
Just (Tx -> Maybe Tx) -> Tx -> Maybe Tx
forall a b. (a -> b) -> a -> b
$ (Transaction -> Tx
txId (Transaction -> Tx)
-> ([Transaction] -> Transaction) -> [Transaction] -> Tx
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Transaction] -> Transaction
forall a. [a] -> a
head) [Transaction]
ts

  ------------------------------------------------------------------------
  -- apply a change of the current transaction 
  -- or the connection (if there is no transaction) to the state
  ------------------------------------------------------------------------
  updCurTx :: (Transaction -> Transaction) ->
              (Connection  -> Connection)  ->
              Connection -> IO (Connection, ())
  updCurTx :: (Transaction -> Transaction)
-> (Connection -> Connection) -> Connection -> IO (Connection, ())
updCurTx Transaction -> Transaction
onTx Connection -> Connection
onCon Connection
c = do
    ThreadId
tid  <- IO ThreadId
myThreadId
    case ThreadId -> [ThreadEntry] -> Maybe [Transaction]
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup ThreadId
tid ([ThreadEntry] -> Maybe [Transaction])
-> [ThreadEntry] -> Maybe [Transaction]
forall a b. (a -> b) -> a -> b
$ Connection -> [ThreadEntry]
conThrds Connection
c of
      Maybe [Transaction]
Nothing -> (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection -> Connection
onCon Connection
c, ())
      Just [Transaction]
ts -> if [Transaction] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Transaction]
ts 
                   then (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection -> Connection
onCon Connection
c, ())
                   else do
                      let t :: Transaction
t   = [Transaction] -> Transaction
forall a. [a] -> a
head [Transaction]
ts
                      let t' :: Transaction
t'  = Transaction -> Transaction
onTx Transaction
t 
                      let ts' :: [Transaction]
ts' = Transaction
t' Transaction -> [Transaction] -> [Transaction]
forall a. a -> [a] -> [a]
: [Transaction] -> [Transaction]
forall a. [a] -> [a]
tail [Transaction]
ts
                      let c' :: Connection
c'  = Connection
c {conThrds :: [ThreadEntry]
conThrds = 
                                      (ThreadId
tid, [Transaction]
ts') ThreadEntry -> [ThreadEntry] -> [ThreadEntry]
forall a. a -> [a] -> [a]
: 
                                         (ThreadEntry -> ThreadEntry -> Bool)
-> ThreadEntry -> [ThreadEntry] -> [ThreadEntry]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
deleteBy' ThreadEntry -> ThreadEntry -> Bool
forall a b. Eq a => (a, b) -> (a, b) -> Bool
eq (ThreadId
tid, [Transaction]
ts) (Connection -> [ThreadEntry]
conThrds Connection
c)}
                      (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c', ())

  ------------------------------------------------------------------------
  -- add a pending ack either to the current transaction
  -- or - if there is no transaction - to the connection
  ------------------------------------------------------------------------
  addAck :: Fac.Con -> MsgId -> IO ()
  addAck :: Con -> MsgId -> IO ()
addAck Con
cid MsgId
mid = do
    let toTx :: Transaction -> Transaction
toTx  = MsgId -> Transaction -> Transaction
addAckToTx  MsgId
mid
    let toCon :: Connection -> Connection
toCon = MsgId -> Connection -> Connection
addAckToCon MsgId
mid
    Con -> (Connection -> IO (Connection, ())) -> IO ()
forall a. Con -> (Connection -> IO (Connection, a)) -> IO a
withCon Con
cid ((Connection -> IO (Connection, ())) -> IO ())
-> (Connection -> IO (Connection, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ (Transaction -> Transaction)
-> (Connection -> Connection) -> Connection -> IO (Connection, ())
updCurTx Transaction -> Transaction
toTx Connection -> Connection
toCon

  ------------------------------------------------------------------------
  -- remove a pending ack either from the current transaction
  -- or - if there is no transaction - from the connection
  ------------------------------------------------------------------------
  rmAck :: Fac.Con -> MsgId -> IO ()
  rmAck :: Con -> MsgId -> IO ()
rmAck Con
cid MsgId
mid = do
    let fromTx :: Transaction -> Transaction
fromTx  = MsgId -> Transaction -> Transaction
rmAckFromTx  MsgId
mid
    let fromCon :: Connection -> Connection
fromCon = MsgId -> Connection -> Connection
rmAckFromCon MsgId
mid
    Con -> (Connection -> IO (Connection, ())) -> IO ()
forall a. Con -> (Connection -> IO (Connection, a)) -> IO a
withCon Con
cid ((Connection -> IO (Connection, ())) -> IO ())
-> (Connection -> IO (Connection, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ (Transaction -> Transaction)
-> (Connection -> Connection) -> Connection -> IO (Connection, ())
updCurTx Transaction -> Transaction
fromTx Connection -> Connection
fromCon

  ------------------------------------------------------------------------
  -- add a pending receipt either to the current transaction
  -- or - if there is no transaction - to the connection
  ------------------------------------------------------------------------
  addRec :: Fac.Con -> Receipt -> IO ()
  addRec :: Con -> Receipt -> IO ()
addRec Con
cid Receipt
r = do
    let toTx :: Transaction -> Transaction
toTx  = Receipt -> Transaction -> Transaction
addRecToTx  Receipt
r
    let toCon :: Connection -> Connection
toCon = Receipt -> Connection -> Connection
addRecToCon Receipt
r
    Con -> (Connection -> IO (Connection, ())) -> IO ()
forall a. Con -> (Connection -> IO (Connection, a)) -> IO a
withCon Con
cid ((Connection -> IO (Connection, ())) -> IO ())
-> (Connection -> IO (Connection, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ (Transaction -> Transaction)
-> (Connection -> Connection) -> Connection -> IO (Connection, ())
updCurTx Transaction -> Transaction
toTx Connection -> Connection
toCon

  ------------------------------------------------------------------------
  -- remove a pending receipt either from the current transaction
  -- or - if there is no transaction - from the connection
  ------------------------------------------------------------------------
  rmRec :: Fac.Con -> Receipt -> IO ()
  rmRec :: Con -> Receipt -> IO ()
rmRec Con
cid Receipt
r = do
    let fromTx :: Transaction -> Transaction
fromTx  = Receipt -> Transaction -> Transaction
rmRecFromTx  Receipt
r
    let fromCon :: Connection -> Connection
fromCon = Receipt -> Connection -> Connection
rmRecFromCon Receipt
r
    Con -> (Connection -> IO (Connection, ())) -> IO ()
forall a. Con -> (Connection -> IO (Connection, a)) -> IO a
withCon Con
cid ((Connection -> IO (Connection, ())) -> IO ())
-> (Connection -> IO (Connection, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ (Transaction -> Transaction)
-> (Connection -> Connection) -> Connection -> IO (Connection, ())
updCurTx Transaction -> Transaction
fromTx Connection -> Connection
fromCon

  ------------------------------------------------------------------------
  -- search for a receipt either in connection or transactions.
  -- this is used by the listener (which is not in the thread list)
  ------------------------------------------------------------------------
  forceRmRec :: Fac.Con -> Receipt -> IO ()
  forceRmRec :: Con -> Receipt -> IO ()
forceRmRec Con
cid Receipt
r = Con -> (Connection -> IO (Connection, ())) -> IO ()
forall a. Con -> (Connection -> IO (Connection, a)) -> IO a
withCon Con
cid Connection -> IO (Connection, ())
forall (m :: * -> *). Monad m => Connection -> m (Connection, ())
doRmRec 
    where doRmRec :: Connection -> m (Connection, ())
doRmRec Connection
c = 
            case (Receipt -> Bool) -> [Receipt] -> Maybe Receipt
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (Receipt -> Receipt -> Bool
forall a. Eq a => a -> a -> Bool
== Receipt
r) ([Receipt] -> Maybe Receipt) -> [Receipt] -> Maybe Receipt
forall a b. (a -> b) -> a -> b
$ Connection -> [Receipt]
conRecs Connection
c of
              Just Receipt
_  -> (Connection, ()) -> m (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Receipt -> Connection -> Connection
rmRecFromCon Receipt
r Connection
c, ())
              Maybe Receipt
Nothing -> 
                let thrds :: [ThreadEntry]
thrds = (ThreadEntry -> ThreadEntry) -> [ThreadEntry] -> [ThreadEntry]
forall a b. (a -> b) -> [a] -> [b]
map ThreadEntry -> ThreadEntry
forall a. (a, [Transaction]) -> (a, [Transaction])
rmRecFromThrd ([ThreadEntry] -> [ThreadEntry]) -> [ThreadEntry] -> [ThreadEntry]
forall a b. (a -> b) -> a -> b
$ Connection -> [ThreadEntry]
conThrds Connection
c
                in  (Connection, ()) -> m (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c {conThrds :: [ThreadEntry]
conThrds = [ThreadEntry]
thrds}, ()) 
          rmRecFromThrd :: (a, [Transaction]) -> (a, [Transaction])
rmRecFromThrd   (a
thrd, [Transaction]
ts) = (a
thrd, (Transaction -> Transaction) -> [Transaction] -> [Transaction]
forall a b. (a -> b) -> [a] -> [b]
map (Receipt -> Transaction -> Transaction
rmRecFromTx Receipt
r) [Transaction]
ts)

  ------------------------------------------------------------------------
  -- check a condition either on the current transaction or
  -- - if there is not transaction - on the connection
  ------------------------------------------------------------------------
  checkCurTx :: (Transaction -> Bool) ->
                (Connection  -> Bool) -> 
                Fac.Con -> IO Bool
  checkCurTx :: (Transaction -> Bool) -> (Connection -> Bool) -> Con -> IO Bool
checkCurTx Transaction -> Bool
onTx Connection -> Bool
onCon Con
cid = do
    Connection
c   <- Con -> IO Connection
getCon Con
cid
    ThreadId
tid <- IO ThreadId
myThreadId
    case ThreadId -> [ThreadEntry] -> Maybe [Transaction]
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup ThreadId
tid ([ThreadEntry] -> Maybe [Transaction])
-> [ThreadEntry] -> Maybe [Transaction]
forall a b. (a -> b) -> a -> b
$ Connection -> [ThreadEntry]
conThrds Connection
c of
      Maybe [Transaction]
Nothing -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Connection -> Bool
onCon Connection
c
      Just [Transaction]
ts -> if [Transaction] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Transaction]
ts then Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Connection -> Bool
onCon Connection
c
                   else Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Transaction -> Bool
onTx (Transaction -> Bool) -> Transaction -> Bool
forall a b. (a -> b) -> a -> b
$ [Transaction] -> Transaction
forall a. [a] -> a
head [Transaction]
ts

  ------------------------------------------------------------------------
  -- check a receipt
  ------------------------------------------------------------------------
  checkReceipt :: Fac.Con -> Receipt -> IO Bool
  checkReceipt :: Con -> Receipt -> IO Bool
checkReceipt Con
cid Receipt
r = do
    let onTx :: Transaction -> Bool
onTx  = Receipt -> Transaction -> Bool
checkReceiptTx Receipt
r
    let onCon :: Connection -> Bool
onCon = Receipt -> Connection -> Bool
checkReceiptCon Receipt
r
    (Transaction -> Bool) -> (Connection -> Bool) -> Con -> IO Bool
checkCurTx Transaction -> Bool
onTx Connection -> Bool
onCon Con
cid

  ------------------------------------------------------------------------
  -- remove a specific transaction
  ------------------------------------------------------------------------
  rmThisTx :: Fac.Tx -> Fac.Con -> IO ()
  rmThisTx :: Tx -> Con -> IO ()
rmThisTx Tx
tx Con
cid = Con -> (Connection -> IO (Connection, ())) -> IO ()
forall a. Con -> (Connection -> IO (Connection, a)) -> IO a
withCon Con
cid ((Connection -> IO (Connection, ())) -> IO ())
-> (Connection -> IO (Connection, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Connection
c -> do
    ThreadId
tid <- IO ThreadId
myThreadId
    case ThreadId -> [ThreadEntry] -> Maybe [Transaction]
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup ThreadId
tid (Connection -> [ThreadEntry]
conThrds Connection
c) of
      Maybe [Transaction]
Nothing -> (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c, ())
      Just [Transaction]
ts -> 
        if [Transaction] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Transaction]
ts 
          then (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c {conThrds :: [ThreadEntry]
conThrds = (ThreadEntry -> ThreadEntry -> Bool)
-> ThreadEntry -> [ThreadEntry] -> [ThreadEntry]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
deleteBy' ThreadEntry -> ThreadEntry -> Bool
forall a b. Eq a => (a, b) -> (a, b) -> Bool
eq (ThreadId
tid, []) (Connection -> [ThreadEntry]
conThrds Connection
c)}, ())
          else 
            case Tx -> [Transaction] -> Maybe Transaction
findTx Tx
tx [Transaction]
ts of
              Maybe Transaction
Nothing -> (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c, ())
              Just Transaction
t  -> do
                let ts' :: [Transaction]
ts' = Transaction -> [Transaction] -> [Transaction]
forall a. Eq a => a -> [a] -> [a]
delete' Transaction
t [Transaction]
ts
                if [Transaction] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Transaction]
ts' 
                  then (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c {conThrds :: [ThreadEntry]
conThrds = 
                                   (ThreadEntry -> ThreadEntry -> Bool)
-> ThreadEntry -> [ThreadEntry] -> [ThreadEntry]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
deleteBy' ThreadEntry -> ThreadEntry -> Bool
forall a b. Eq a => (a, b) -> (a, b) -> Bool
eq (ThreadId
tid, [Transaction]
ts) (Connection -> [ThreadEntry]
conThrds Connection
c)},  ())
                  else (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c {conThrds :: [ThreadEntry]
conThrds = (ThreadId
tid, [Transaction]
ts') ThreadEntry -> [ThreadEntry] -> [ThreadEntry]
forall a. a -> [a] -> [a]
: 
                                   (ThreadEntry -> ThreadEntry -> Bool)
-> ThreadEntry -> [ThreadEntry] -> [ThreadEntry]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
deleteBy' ThreadEntry -> ThreadEntry -> Bool
forall a b. Eq a => (a, b) -> (a, b) -> Bool
eq (ThreadId
tid, [Transaction]
ts) (Connection -> [ThreadEntry]
conThrds Connection
c)}, ())

  ------------------------------------------------------------------------
  -- remove the current transaction
  ------------------------------------------------------------------------
  rmTx :: Fac.Con -> IO ()
  rmTx :: Con -> IO ()
rmTx Con
cid = Con -> (Connection -> IO (Connection, ())) -> IO ()
forall a. Con -> (Connection -> IO (Connection, a)) -> IO a
withCon Con
cid ((Connection -> IO (Connection, ())) -> IO ())
-> (Connection -> IO (Connection, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Connection
c -> do
    ThreadId
tid <- IO ThreadId
myThreadId
    case ThreadId -> [ThreadEntry] -> Maybe [Transaction]
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup ThreadId
tid (Connection -> [ThreadEntry]
conThrds Connection
c) of
      Maybe [Transaction]
Nothing -> (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c, ())
      Just [Transaction]
ts -> 
        if [Transaction] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Transaction]
ts 
          then (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c {conThrds :: [ThreadEntry]
conThrds = (ThreadEntry -> ThreadEntry -> Bool)
-> ThreadEntry -> [ThreadEntry] -> [ThreadEntry]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
deleteBy' ThreadEntry -> ThreadEntry -> Bool
forall a b. Eq a => (a, b) -> (a, b) -> Bool
eq (ThreadId
tid, []) (Connection -> [ThreadEntry]
conThrds Connection
c)}, ())
          else do
            let ts' :: [Transaction]
ts' = [Transaction] -> [Transaction]
forall a. [a] -> [a]
tail [Transaction]
ts
            if [Transaction] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Transaction]
ts' 
              then (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c {conThrds :: [ThreadEntry]
conThrds = 
                               (ThreadEntry -> ThreadEntry -> Bool)
-> ThreadEntry -> [ThreadEntry] -> [ThreadEntry]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
deleteBy' ThreadEntry -> ThreadEntry -> Bool
forall a b. Eq a => (a, b) -> (a, b) -> Bool
eq (ThreadId
tid, [Transaction]
ts) (Connection -> [ThreadEntry]
conThrds Connection
c)},  ())
              else (Connection, ()) -> IO (Connection, ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection
c {conThrds :: [ThreadEntry]
conThrds = (ThreadId
tid, [Transaction]
ts') ThreadEntry -> [ThreadEntry] -> [ThreadEntry]
forall a. a -> [a] -> [a]
: 
                               (ThreadEntry -> ThreadEntry -> Bool)
-> ThreadEntry -> [ThreadEntry] -> [ThreadEntry]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
deleteBy' ThreadEntry -> ThreadEntry -> Bool
forall a b. Eq a => (a, b) -> (a, b) -> Bool
eq (ThreadId
tid, [Transaction]
ts) (Connection -> [ThreadEntry]
conThrds Connection
c)}, ())

  ---------------------------------------------------------------------
  -- Default version, when broker does not send a version
  ---------------------------------------------------------------------
  defVersion :: F.Version
  defVersion :: Version
defVersion = (Int
1,Int
2)

  ---------------------------------------------------------------------
  -- Subscribe abstraction
  ---------------------------------------------------------------------
  data Subscription = Subscription {
                        Subscription -> Sub
subId   :: Fac.Sub,   -- subscribe identifier
                        Subscription -> String
subName :: String,    -- queue name
                        Subscription -> AckMode
subMode :: F.AckMode  -- ack mode
                      }
    deriving (Int -> Subscription -> ShowS
[Subscription] -> ShowS
Subscription -> String
(Int -> Subscription -> ShowS)
-> (Subscription -> String)
-> ([Subscription] -> ShowS)
-> Show Subscription
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Subscription] -> ShowS
$cshowList :: [Subscription] -> ShowS
show :: Subscription -> String
$cshow :: Subscription -> String
showsPrec :: Int -> Subscription -> ShowS
$cshowsPrec :: Int -> Subscription -> ShowS
Show)

  mkSub :: Fac.Sub -> String -> F.AckMode -> Subscription
  mkSub :: Sub -> String -> AckMode -> Subscription
mkSub Sub
sid String
qn AckMode
am = Subscription :: Sub -> String -> AckMode -> Subscription
Subscription {
                      subId :: Sub
subId   = Sub
sid,
                      subName :: String
subName = String
qn,
                      subMode :: AckMode
subMode = AckMode
am}

  ---------------------------------------------------------------------
  -- | Message Identifier
  ---------------------------------------------------------------------
  data MsgId = MsgId String | NoMsg
    deriving (MsgId -> MsgId -> Bool
(MsgId -> MsgId -> Bool) -> (MsgId -> MsgId -> Bool) -> Eq MsgId
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: MsgId -> MsgId -> Bool
$c/= :: MsgId -> MsgId -> Bool
== :: MsgId -> MsgId -> Bool
$c== :: MsgId -> MsgId -> Bool
Eq)

  instance Show MsgId where
    show :: MsgId -> String
show (MsgId String
s) = String
s
    show (MsgId
NoMsg)   = String
""

  ------------------------------------------------------------------------
  -- | Any content received from a queue
  --   is wrapped in a message.
  --   It is, in particular, the return value of /readQ/.
  ------------------------------------------------------------------------
  data Message a = Msg {
                     -- | The message Identifier
                     Message a -> MsgId
msgId   :: MsgId,
                     -- | The subscription
                     Message a -> Sub
msgSub  :: Fac.Sub,
                     -- | The destination
                     Message a -> String
msgDest :: String,
                     -- | The Ack identifier
                     Message a -> String
msgAck  :: String,
                     -- | The Stomp headers
                     --   that came with the message
                     Message a -> [(String, String)]
msgHdrs :: [F.Header],
                     -- | The /MIME/ type of the content
                     Message a -> Type
msgType :: Mime.Type,
                     -- | The length of the 
                     --   encoded content
                     Message a -> Int
msgLen  :: Int,
                     -- | The transaction, in which 
                     --   the message was received
                     Message a -> Tx
msgTx   :: Fac.Tx,
                     -- | The encoded content             
                     Message a -> ByteString
msgRaw  :: B.ByteString,
                     -- | The content             
                     Message a -> a
msgCont :: a}
  
  ---------------------------------------------------------------------
  -- Create a message
  ---------------------------------------------------------------------
  mkMessage :: MsgId -> Fac.Sub -> String -> String ->
               Mime.Type -> Int -> Fac.Tx -> 
               B.ByteString -> a -> Message a
  mkMessage :: MsgId
-> Sub
-> String
-> String
-> Type
-> Int
-> Tx
-> ByteString
-> a
-> Message a
mkMessage MsgId
mid Sub
sub String
dst String
ak Type
typ Int
len Tx
tx ByteString
raw a
cont = Msg :: forall a.
MsgId
-> Sub
-> String
-> String
-> [(String, String)]
-> Type
-> Int
-> Tx
-> ByteString
-> a
-> Message a
Msg {
                                             msgId :: MsgId
msgId   = MsgId
mid,
                                             msgSub :: Sub
msgSub  = Sub
sub,
                                             msgDest :: String
msgDest = String
dst,
                                             msgAck :: String
msgAck  = String
ak,
                                             msgHdrs :: [(String, String)]
msgHdrs = [], 
                                             msgType :: Type
msgType = Type
typ, 
                                             msgLen :: Int
msgLen  = Int
len, 
                                             msgTx :: Tx
msgTx   = Tx
tx,
                                             msgRaw :: ByteString
msgRaw  = ByteString
raw,
                                             msgCont :: a
msgCont = a
cont}