{-# LANGUAGE BangPatterns, CPP, OverloadedStrings, ScopedTypeVariables, LambdaCase #-}
module Network.AMQP.Internal where
import Paths_amqp(version)
import Data.Version(showVersion)
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Data.Binary
import Data.Binary.Get
import Data.Binary.Put as BPut
import Data.Int (Int64)
import Data.Maybe
import Data.Text (Text)
import Network.Socket (PortNumber, withSocketsDo)
import System.IO (hPutStrLn, stderr)
import qualified Control.Exception as CE
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL
import qualified Data.Map as M
import qualified Data.Foldable as F
import qualified Data.IntMap as IM
import qualified Data.IntSet as IntSet
import qualified Data.Sequence as Seq
import qualified Data.Text as T
import qualified Data.Text.Encoding as E
import qualified Network.Connection as Conn
import Network.AMQP.Protocol
import Network.AMQP.Types
import Network.AMQP.Helpers
import Network.AMQP.Generated
import Network.AMQP.ChannelAllocator
data AckType = BasicAck | BasicNack deriving Int -> AckType -> ShowS
[AckType] -> ShowS
AckType -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [AckType] -> ShowS
$cshowList :: [AckType] -> ShowS
show :: AckType -> [Char]
$cshow :: AckType -> [Char]
showsPrec :: Int -> AckType -> ShowS
$cshowsPrec :: Int -> AckType -> ShowS
Show
data DeliveryMode = Persistent
| NonPersistent
deriving (DeliveryMode -> DeliveryMode -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: DeliveryMode -> DeliveryMode -> Bool
$c/= :: DeliveryMode -> DeliveryMode -> Bool
== :: DeliveryMode -> DeliveryMode -> Bool
$c== :: DeliveryMode -> DeliveryMode -> Bool
Eq, Eq DeliveryMode
DeliveryMode -> DeliveryMode -> Bool
DeliveryMode -> DeliveryMode -> Ordering
DeliveryMode -> DeliveryMode -> DeliveryMode
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: DeliveryMode -> DeliveryMode -> DeliveryMode
$cmin :: DeliveryMode -> DeliveryMode -> DeliveryMode
max :: DeliveryMode -> DeliveryMode -> DeliveryMode
$cmax :: DeliveryMode -> DeliveryMode -> DeliveryMode
>= :: DeliveryMode -> DeliveryMode -> Bool
$c>= :: DeliveryMode -> DeliveryMode -> Bool
> :: DeliveryMode -> DeliveryMode -> Bool
$c> :: DeliveryMode -> DeliveryMode -> Bool
<= :: DeliveryMode -> DeliveryMode -> Bool
$c<= :: DeliveryMode -> DeliveryMode -> Bool
< :: DeliveryMode -> DeliveryMode -> Bool
$c< :: DeliveryMode -> DeliveryMode -> Bool
compare :: DeliveryMode -> DeliveryMode -> Ordering
$ccompare :: DeliveryMode -> DeliveryMode -> Ordering
Ord, ReadPrec [DeliveryMode]
ReadPrec DeliveryMode
Int -> ReadS DeliveryMode
ReadS [DeliveryMode]
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
readListPrec :: ReadPrec [DeliveryMode]
$creadListPrec :: ReadPrec [DeliveryMode]
readPrec :: ReadPrec DeliveryMode
$creadPrec :: ReadPrec DeliveryMode
readList :: ReadS [DeliveryMode]
$creadList :: ReadS [DeliveryMode]
readsPrec :: Int -> ReadS DeliveryMode
$creadsPrec :: Int -> ReadS DeliveryMode
Read, Int -> DeliveryMode -> ShowS
[DeliveryMode] -> ShowS
DeliveryMode -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [DeliveryMode] -> ShowS
$cshowList :: [DeliveryMode] -> ShowS
show :: DeliveryMode -> [Char]
$cshow :: DeliveryMode -> [Char]
showsPrec :: Int -> DeliveryMode -> ShowS
$cshowsPrec :: Int -> DeliveryMode -> ShowS
Show)
deliveryModeToInt :: DeliveryMode -> Octet
deliveryModeToInt :: DeliveryMode -> Word8
deliveryModeToInt DeliveryMode
NonPersistent = Word8
1
deliveryModeToInt DeliveryMode
Persistent = Word8
2
intToDeliveryMode :: Octet -> DeliveryMode
intToDeliveryMode :: Word8 -> DeliveryMode
intToDeliveryMode Word8
1 = DeliveryMode
NonPersistent
intToDeliveryMode Word8
2 = DeliveryMode
Persistent
intToDeliveryMode Word8
n = forall a. HasCallStack => [Char] -> a
error ([Char]
"Unknown delivery mode int: " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show Word8
n)
data Message = Message {
Message -> ByteString
msgBody :: BL.ByteString,
Message -> Maybe DeliveryMode
msgDeliveryMode :: Maybe DeliveryMode,
Message -> Maybe LongLongInt
msgTimestamp :: Maybe Timestamp,
Message -> Maybe Text
msgID :: Maybe Text,
Message -> Maybe Text
msgType :: Maybe Text,
Message -> Maybe Text
msgUserID :: Maybe Text,
Message -> Maybe Text
msgApplicationID :: Maybe Text,
Message -> Maybe Text
msgClusterID :: Maybe Text,
Message -> Maybe Text
msgContentType :: Maybe Text,
Message -> Maybe Text
msgContentEncoding :: Maybe Text,
Message -> Maybe Text
msgReplyTo :: Maybe Text,
Message -> Maybe Word8
msgPriority :: Maybe Octet,
Message -> Maybe Text
msgCorrelationID :: Maybe Text,
Message -> Maybe Text
msgExpiration :: Maybe Text,
:: Maybe FieldTable
} deriving (Message -> Message -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Message -> Message -> Bool
$c/= :: Message -> Message -> Bool
== :: Message -> Message -> Bool
$c== :: Message -> Message -> Bool
Eq, Eq Message
Message -> Message -> Bool
Message -> Message -> Ordering
Message -> Message -> Message
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: Message -> Message -> Message
$cmin :: Message -> Message -> Message
max :: Message -> Message -> Message
$cmax :: Message -> Message -> Message
>= :: Message -> Message -> Bool
$c>= :: Message -> Message -> Bool
> :: Message -> Message -> Bool
$c> :: Message -> Message -> Bool
<= :: Message -> Message -> Bool
$c<= :: Message -> Message -> Bool
< :: Message -> Message -> Bool
$c< :: Message -> Message -> Bool
compare :: Message -> Message -> Ordering
$ccompare :: Message -> Message -> Ordering
Ord, ReadPrec [Message]
ReadPrec Message
Int -> ReadS Message
ReadS [Message]
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
readListPrec :: ReadPrec [Message]
$creadListPrec :: ReadPrec [Message]
readPrec :: ReadPrec Message
$creadPrec :: ReadPrec Message
readList :: ReadS [Message]
$creadList :: ReadS [Message]
readsPrec :: Int -> ReadS Message
$creadsPrec :: Int -> ReadS Message
Read, Int -> Message -> ShowS
[Message] -> ShowS
Message -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [Message] -> ShowS
$cshowList :: [Message] -> ShowS
show :: Message -> [Char]
$cshow :: Message -> [Char]
showsPrec :: Int -> Message -> ShowS
$cshowsPrec :: Int -> Message -> ShowS
Show)
data Envelope = Envelope {
Envelope -> LongLongInt
envDeliveryTag :: LongLongInt,
Envelope -> Bool
envRedelivered :: Bool,
Envelope -> Text
envExchangeName :: Text,
Envelope -> Text
envRoutingKey :: Text,
Envelope -> Channel
envChannel :: Channel
}
data PublishError = PublishError {
PublishError -> ReturnReplyCode
errReplyCode :: ReturnReplyCode,
PublishError -> Maybe Text
errExchange :: Maybe Text,
PublishError -> Text
errRoutingKey :: Text
} deriving (PublishError -> PublishError -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: PublishError -> PublishError -> Bool
$c/= :: PublishError -> PublishError -> Bool
== :: PublishError -> PublishError -> Bool
$c== :: PublishError -> PublishError -> Bool
Eq, ReadPrec [PublishError]
ReadPrec PublishError
Int -> ReadS PublishError
ReadS [PublishError]
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
readListPrec :: ReadPrec [PublishError]
$creadListPrec :: ReadPrec [PublishError]
readPrec :: ReadPrec PublishError
$creadPrec :: ReadPrec PublishError
readList :: ReadS [PublishError]
$creadList :: ReadS [PublishError]
readsPrec :: Int -> ReadS PublishError
$creadsPrec :: Int -> ReadS PublishError
Read, Int -> PublishError -> ShowS
[PublishError] -> ShowS
PublishError -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [PublishError] -> ShowS
$cshowList :: [PublishError] -> ShowS
show :: PublishError -> [Char]
$cshow :: PublishError -> [Char]
showsPrec :: Int -> PublishError -> ShowS
$cshowsPrec :: Int -> PublishError -> ShowS
Show)
data ReturnReplyCode = Unroutable Text
| NoConsumers Text
| NotFound Text
deriving (ReturnReplyCode -> ReturnReplyCode -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ReturnReplyCode -> ReturnReplyCode -> Bool
$c/= :: ReturnReplyCode -> ReturnReplyCode -> Bool
== :: ReturnReplyCode -> ReturnReplyCode -> Bool
$c== :: ReturnReplyCode -> ReturnReplyCode -> Bool
Eq, ReadPrec [ReturnReplyCode]
ReadPrec ReturnReplyCode
Int -> ReadS ReturnReplyCode
ReadS [ReturnReplyCode]
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
readListPrec :: ReadPrec [ReturnReplyCode]
$creadListPrec :: ReadPrec [ReturnReplyCode]
readPrec :: ReadPrec ReturnReplyCode
$creadPrec :: ReadPrec ReturnReplyCode
readList :: ReadS [ReturnReplyCode]
$creadList :: ReadS [ReturnReplyCode]
readsPrec :: Int -> ReadS ReturnReplyCode
$creadsPrec :: Int -> ReadS ReturnReplyCode
Read, Int -> ReturnReplyCode -> ShowS
[ReturnReplyCode] -> ShowS
ReturnReplyCode -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [ReturnReplyCode] -> ShowS
$cshowList :: [ReturnReplyCode] -> ShowS
show :: ReturnReplyCode -> [Char]
$cshow :: ReturnReplyCode -> [Char]
showsPrec :: Int -> ReturnReplyCode -> ShowS
$cshowsPrec :: Int -> ReturnReplyCode -> ShowS
Show)
data Assembly = SimpleMethod MethodPayload
| ContentMethod MethodPayload ContentHeaderProperties BL.ByteString
deriving Int -> Assembly -> ShowS
[Assembly] -> ShowS
Assembly -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [Assembly] -> ShowS
$cshowList :: [Assembly] -> ShowS
show :: Assembly -> [Char]
$cshow :: Assembly -> [Char]
showsPrec :: Int -> Assembly -> ShowS
$cshowsPrec :: Int -> Assembly -> ShowS
Show
readAssembly :: Chan FramePayload -> IO Assembly
readAssembly :: Chan FramePayload -> IO Assembly
readAssembly Chan FramePayload
chan = do
FramePayload
m <- forall a. Chan a -> IO a
readChan Chan FramePayload
chan
case FramePayload
m of
MethodPayload MethodPayload
p ->
if FramePayload -> Bool
hasContent FramePayload
m
then do
(ContentHeaderProperties
props, ByteString
msg) <- Chan FramePayload -> IO (ContentHeaderProperties, ByteString)
collectContent Chan FramePayload
chan
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ MethodPayload -> ContentHeaderProperties -> ByteString -> Assembly
ContentMethod MethodPayload
p ContentHeaderProperties
props ByteString
msg
else do
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ MethodPayload -> Assembly
SimpleMethod MethodPayload
p
FramePayload
x -> forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"didn't expect frame: " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show FramePayload
x
collectContent :: Chan FramePayload -> IO (ContentHeaderProperties, BL.ByteString)
collectContent :: Chan FramePayload -> IO (ContentHeaderProperties, ByteString)
collectContent Chan FramePayload
chan = do
(ContentHeaderPayload Word16
_ Word16
_ LongLongInt
bodySize ContentHeaderProperties
props) <- forall a. Chan a -> IO a
readChan Chan FramePayload
chan
[ByteString]
content <- ByteOffset -> IO [ByteString]
collect forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral LongLongInt
bodySize
forall (m :: * -> *) a. Monad m => a -> m a
return (ContentHeaderProperties
props, [ByteString] -> ByteString
BL.concat [ByteString]
content)
where
collect :: ByteOffset -> IO [ByteString]
collect ByteOffset
x | ByteOffset
x forall a. Ord a => a -> a -> Bool
<= ByteOffset
0 = forall (m :: * -> *) a. Monad m => a -> m a
return []
collect ByteOffset
x = do
(ContentBodyPayload ByteString
payload) <- forall a. Chan a -> IO a
readChan Chan FramePayload
chan
[ByteString]
r <- ByteOffset -> IO [ByteString]
collect (ByteOffset
x forall a. Num a => a -> a -> a
- ByteString -> ByteOffset
BL.length ByteString
payload)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ ByteString
payload forall a. a -> [a] -> [a]
: [ByteString]
r
data Connection = Connection {
Connection -> Connection
connHandle :: Conn.Connection,
Connection -> ChannelAllocator
connChanAllocator :: ChannelAllocator,
Connection -> MVar (IntMap (Channel, ThreadId))
connChannels :: MVar (IM.IntMap (Channel, ThreadId)),
Connection -> Int
connMaxFrameSize :: Int,
Connection -> MVar (Maybe (CloseType, [Char]))
connClosed :: MVar (Maybe (CloseType, String)),
Connection -> MVar ()
connClosedLock :: MVar (),
Connection -> MVar ()
connWriteLock :: MVar (),
Connection -> MVar [IO ()]
connClosedHandlers :: MVar [IO ()],
Connection -> MVar [(Text -> IO (), IO ())]
connBlockedHandlers :: MVar [(Text -> IO (), IO ())],
Connection -> MVar ByteOffset
connLastReceived :: MVar Int64,
Connection -> MVar ByteOffset
connLastSent :: MVar Int64,
Connection -> FieldTable
connServerProperties :: FieldTable,
Connection -> MVar (Maybe ThreadId)
connThread :: MVar (Maybe ThreadId)
}
data ConnectionOpts = ConnectionOpts {
ConnectionOpts -> [([Char], PortNumber)]
coServers :: ![(String, PortNumber)],
ConnectionOpts -> Text
coVHost :: !Text,
ConnectionOpts -> [SASLMechanism]
coAuth :: ![SASLMechanism],
ConnectionOpts -> Maybe Word32
coMaxFrameSize :: !(Maybe Word32),
ConnectionOpts -> Maybe Word16
coHeartbeatDelay :: !(Maybe Word16),
ConnectionOpts -> Maybe Word16
coMaxChannel :: !(Maybe Word16),
ConnectionOpts -> Maybe TLSSettings
coTLSSettings :: Maybe TLSSettings,
ConnectionOpts -> Maybe Text
coName :: !(Maybe Text)
}
data TLSSettings =
TLSTrusted
| TLSUntrusted
| TLSCustom Conn.TLSSettings
connectionTLSSettings :: TLSSettings -> Maybe Conn.TLSSettings
connectionTLSSettings :: TLSSettings -> Maybe TLSSettings
connectionTLSSettings TLSSettings
tlsSettings =
forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ case TLSSettings
tlsSettings of
TLSSettings
TLSTrusted -> Bool -> Bool -> Bool -> TLSSettings
Conn.TLSSettingsSimple Bool
False Bool
False Bool
False
TLSSettings
TLSUntrusted -> Bool -> Bool -> Bool -> TLSSettings
Conn.TLSSettingsSimple Bool
True Bool
False Bool
False
TLSCustom TLSSettings
x -> TLSSettings
x
data SASLMechanism = SASLMechanism {
SASLMechanism -> Text
saslName :: !Text,
SASLMechanism -> ByteString
saslInitialResponse :: !BS.ByteString,
SASLMechanism -> Maybe (ByteString -> IO ByteString)
saslChallengeFunc :: !(Maybe (BS.ByteString -> IO BS.ByteString))
}
connectionReceiver :: Connection -> IO ()
connectionReceiver :: Connection -> IO ()
connectionReceiver Connection
conn = do
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch (do
Frame Word16
chanID FramePayload
payload <- Connection -> IO Frame
readFrame (Connection -> Connection
connHandle Connection
conn)
Connection -> IO ()
updateLastReceived Connection
conn
forall {a}. (Integral a, Show a) => a -> FramePayload -> IO ()
forwardToChannel Word16
chanID FramePayload
payload
)
(\(IOError
e :: CE.IOException) -> IO ThreadId
myThreadId forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Connection -> CloseType -> SomeException -> ThreadId -> IO ()
killConnection Connection
conn CloseType
Abnormal (forall e. Exception e => e -> SomeException
CE.toException IOError
e))
Connection -> IO ()
connectionReceiver Connection
conn
where
closedByUserEx :: AMQPException
closedByUserEx = CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
Normal [Char]
"closed by user"
forwardToChannel :: a -> FramePayload -> IO ()
forwardToChannel a
0 (MethodPayload MethodPayload
Connection_close_ok) =
IO ThreadId
myThreadId forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Connection -> CloseType -> SomeException -> ThreadId -> IO ()
killConnection Connection
conn CloseType
Normal (forall e. Exception e => e -> SomeException
CE.toException AMQPException
closedByUserEx)
forwardToChannel a
0 (MethodPayload (Connection_close Word16
_ (ShortString Text
errorMsg) Word16
_ Word16
_)) = do
Connection -> Frame -> IO ()
writeFrame (Connection -> Connection
connHandle Connection
conn) forall a b. (a -> b) -> a -> b
$ Word16 -> FramePayload -> Frame
Frame Word16
0 forall a b. (a -> b) -> a -> b
$ MethodPayload -> FramePayload
MethodPayload MethodPayload
Connection_close_ok
IO ThreadId
myThreadId forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Connection -> CloseType -> SomeException -> ThreadId -> IO ()
killConnection Connection
conn CloseType
Abnormal (forall e. Exception e => e -> SomeException
CE.toException forall b c a. (b -> c) -> (a -> b) -> a -> c
. CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
Abnormal forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> [Char]
T.unpack forall a b. (a -> b) -> a -> b
$ Text
errorMsg)
forwardToChannel a
0 FramePayload
HeartbeatPayload = forall (m :: * -> *) a. Monad m => a -> m a
return ()
forwardToChannel a
0 (MethodPayload (Connection_blocked ShortString
reason)) = ShortString -> IO ()
handleBlocked ShortString
reason
forwardToChannel a
0 (MethodPayload MethodPayload
Connection_unblocked) = IO ()
handleUnblocked
forwardToChannel a
0 FramePayload
payload = Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr forall a b. (a -> b) -> a -> b
$ [Char]
"Got unexpected msg on channel zero: " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show FramePayload
payload
forwardToChannel a
chanID FramePayload
payload = do
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Connection -> MVar (IntMap (Channel, ThreadId))
connChannels Connection
conn) forall a b. (a -> b) -> a -> b
$ \IntMap (Channel, ThreadId)
cs -> do
case forall a. Int -> IntMap a -> Maybe a
IM.lookup (forall a b. (Integral a, Num b) => a -> b
fromIntegral a
chanID) IntMap (Channel, ThreadId)
cs of
Just (Channel, ThreadId)
c -> forall a. Chan a -> a -> IO ()
writeChan (Channel -> Chan FramePayload
inQueue forall a b. (a -> b) -> a -> b
$ forall a b. (a, b) -> a
fst (Channel, ThreadId)
c) FramePayload
payload
Maybe (Channel, ThreadId)
Nothing -> Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr forall a b. (a -> b) -> a -> b
$ [Char]
"ERROR: channel not open " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show a
chanID
handleBlocked :: ShortString -> IO ()
handleBlocked (ShortString Text
reason) = do
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Connection -> MVar [(Text -> IO (), IO ())]
connBlockedHandlers Connection
conn) forall a b. (a -> b) -> a -> b
$ \[(Text -> IO (), IO ())]
listeners ->
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(Text -> IO (), IO ())]
listeners forall a b. (a -> b) -> a -> b
$ \(Text -> IO ()
l, IO ()
_) -> forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch (Text -> IO ()
l Text
reason) forall a b. (a -> b) -> a -> b
$ \(SomeException
ex :: CE.SomeException) ->
Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr forall a b. (a -> b) -> a -> b
$ [Char]
"connection blocked listener threw exception: "forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show SomeException
ex
handleUnblocked :: IO ()
handleUnblocked = do
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Connection -> MVar [(Text -> IO (), IO ())]
connBlockedHandlers Connection
conn) forall a b. (a -> b) -> a -> b
$ \[(Text -> IO (), IO ())]
listeners ->
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(Text -> IO (), IO ())]
listeners forall a b. (a -> b) -> a -> b
$ \(Text -> IO ()
_, IO ()
l) -> forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch IO ()
l forall a b. (a -> b) -> a -> b
$ \(SomeException
ex :: CE.SomeException) ->
Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr forall a b. (a -> b) -> a -> b
$ [Char]
"connection unblocked listener threw exception: "forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show SomeException
ex
openConnection'' :: ConnectionOpts -> IO Connection
openConnection'' :: ConnectionOpts -> IO Connection
openConnection'' ConnectionOpts
connOpts = forall a. IO a -> IO a
withSocketsDo forall a b. (a -> b) -> a -> b
$ do
Connection
handle <- [SomeException] -> [([Char], PortNumber)] -> IO Connection
connect [] forall a b. (a -> b) -> a -> b
$ ConnectionOpts -> [([Char], PortNumber)]
coServers ConnectionOpts
connOpts
(Word32
maxFrameSize, Word16
maxChannel, Maybe Word16
heartbeatTimeout, FieldTable
serverProps) <- forall e a. Exception e => (e -> IO a) -> IO a -> IO a
CE.handle (\(IOError
_ :: CE.IOException) -> forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
Abnormal [Char]
"Handshake failed. Please check the RabbitMQ logs for more information") forall a b. (a -> b) -> a -> b
$ do
Connection -> ByteString -> IO ()
Conn.connectionPut Connection
handle forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString -> ByteString
BS.append ([Char] -> ByteString
BC.pack [Char]
"AMQP")
([Word8] -> ByteString
BS.pack [
Word8
1
, Word8
1
, Word8
0
, Word8
9
])
Frame Word16
0 (MethodPayload (Connection_start Word8
_ Word8
_ FieldTable
serverProps (LongString ByteString
serverMechanisms) LongString
_)) <- Connection -> IO Frame
readFrame Connection
handle
SASLMechanism
selectedSASL <- Connection -> ByteString -> IO SASLMechanism
selectSASLMechanism Connection
handle ByteString
serverMechanisms
Connection -> Frame -> IO ()
writeFrame Connection
handle forall a b. (a -> b) -> a -> b
$ SASLMechanism -> Frame
start_ok SASLMechanism
selectedSASL
Frame Word16
0 (MethodPayload (Connection_tune Word16
channel_max Word32
frame_max Word16
sendHeartbeat)) <- Connection -> SASLMechanism -> IO Frame
handleSecureUntilTune Connection
handle SASLMechanism
selectedSASL
let maxFrameSize :: Word32
maxFrameSize = forall a. Ord a => a -> Maybe a -> a
chooseMin Word32
frame_max forall a b. (a -> b) -> a -> b
$ ConnectionOpts -> Maybe Word32
coMaxFrameSize ConnectionOpts
connOpts
finalHeartbeatSec :: Word16
finalHeartbeatSec = forall a. a -> Maybe a -> a
fromMaybe Word16
sendHeartbeat (ConnectionOpts -> Maybe Word16
coHeartbeatDelay ConnectionOpts
connOpts)
heartbeatTimeout :: Maybe Word16
heartbeatTimeout = forall (m :: * -> *) a. MonadPlus m => (a -> Bool) -> m a -> m a
mfilter (forall a. Eq a => a -> a -> Bool
/=Word16
0) (forall a. a -> Maybe a
Just Word16
finalHeartbeatSec)
fixChanNum :: a -> a
fixChanNum a
x = if a
x forall a. Eq a => a -> a -> Bool
== a
0 then a
65535 else a
x
maxChannel :: Word16
maxChannel = forall a. Ord a => a -> Maybe a -> a
chooseMin (forall {a}. (Eq a, Num a) => a -> a
fixChanNum Word16
channel_max) forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall {a}. (Eq a, Num a) => a -> a
fixChanNum forall a b. (a -> b) -> a -> b
$ ConnectionOpts -> Maybe Word16
coMaxChannel ConnectionOpts
connOpts
Connection -> Frame -> IO ()
writeFrame Connection
handle (Word16 -> FramePayload -> Frame
Frame Word16
0 (MethodPayload -> FramePayload
MethodPayload
(Word16 -> Word32 -> Word16 -> MethodPayload
Connection_tune_ok Word16
maxChannel Word32
maxFrameSize Word16
finalHeartbeatSec)
))
Connection -> Frame -> IO ()
writeFrame Connection
handle Frame
open
Frame Word16
0 (MethodPayload (Connection_open_ok ShortString
_)) <- Connection -> IO Frame
readFrame Connection
handle
forall (m :: * -> *) a. Monad m => a -> m a
return (Word32
maxFrameSize, Word16
maxChannel, Maybe Word16
heartbeatTimeout, FieldTable
serverProps)
MVar (IntMap (Channel, ThreadId))
cChannels <- forall a. a -> IO (MVar a)
newMVar forall a. IntMap a
IM.empty
MVar (Maybe (CloseType, [Char]))
cClosed <- forall a. a -> IO (MVar a)
newMVar forall a. Maybe a
Nothing
ChannelAllocator
cChanAllocator <- Int -> IO ChannelAllocator
newChannelAllocator forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral Word16
maxChannel
Int
_ <- ChannelAllocator -> IO Int
allocateChannel ChannelAllocator
cChanAllocator
MVar ()
writeLock <- forall a. a -> IO (MVar a)
newMVar ()
MVar ()
ccl <- forall a. IO (MVar a)
newEmptyMVar
MVar [IO ()]
cClosedHandlers <- forall a. a -> IO (MVar a)
newMVar []
MVar [(Text -> IO (), IO ())]
cBlockedHandlers <- forall a. a -> IO (MVar a)
newMVar []
MVar ByteOffset
cLastReceived <- IO ByteOffset
getTimestamp forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. a -> IO (MVar a)
newMVar
MVar ByteOffset
cLastSent <- IO ByteOffset
getTimestamp forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. a -> IO (MVar a)
newMVar
MVar (Maybe ThreadId)
cThread <- forall a. a -> IO (MVar a)
newMVar forall a. Maybe a
Nothing
let conn :: Connection
conn = Connection
-> ChannelAllocator
-> MVar (IntMap (Channel, ThreadId))
-> Int
-> MVar (Maybe (CloseType, [Char]))
-> MVar ()
-> MVar ()
-> MVar [IO ()]
-> MVar [(Text -> IO (), IO ())]
-> MVar ByteOffset
-> MVar ByteOffset
-> FieldTable
-> MVar (Maybe ThreadId)
-> Connection
Connection Connection
handle ChannelAllocator
cChanAllocator MVar (IntMap (Channel, ThreadId))
cChannels (forall a b. (Integral a, Num b) => a -> b
fromIntegral Word32
maxFrameSize) MVar (Maybe (CloseType, [Char]))
cClosed MVar ()
ccl MVar ()
writeLock MVar [IO ()]
cClosedHandlers MVar [(Text -> IO (), IO ())]
cBlockedHandlers MVar ByteOffset
cLastReceived MVar ByteOffset
cLastSent FieldTable
serverProps MVar (Maybe ThreadId)
cThread
ThreadId
connThreadId <- forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally' (Connection -> IO ()
connectionReceiver Connection
conn) forall a b. (a -> b) -> a -> b
$ \Either SomeException ()
res -> do
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch (Connection -> IO ()
Conn.connectionClose Connection
handle) (\(SomeException
_ :: CE.SomeException) -> forall (m :: * -> *) a. Monad m => a -> m a
return ())
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar (Maybe (CloseType, [Char]))
cClosed forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. a -> Maybe a
Just forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. a -> Maybe a -> a
fromMaybe (CloseType
Abnormal, [Char]
"unknown reason")
let finaliser :: ChanThreadKilledException
finaliser = SomeException -> ChanThreadKilledException
ChanThreadKilledException forall a b. (a -> b) -> a -> b
$ case Either SomeException ()
res of
Left SomeException
ex -> SomeException
ex
Right ()
_ -> forall e. Exception e => e -> SomeException
CE.toException AsyncException
CE.ThreadKilled
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar (IntMap (Channel, ThreadId))
cChannels forall a b. (a -> b) -> a -> b
$ \IntMap (Channel, ThreadId)
x -> do
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (forall a b c. (a -> b -> c) -> b -> a -> c
flip forall e. Exception e => ThreadId -> e -> IO ()
CE.throwTo ChanThreadKilledException
finaliser forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a, b) -> b
snd) forall a b. (a -> b) -> a -> b
$ forall a. IntMap a -> [a]
IM.elems IntMap (Channel, ThreadId)
x
forall (m :: * -> *) a. Monad m => a -> m a
return forall a. IntMap a
IM.empty
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> a -> IO Bool
tryPutMVar MVar ()
ccl ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar [IO ()]
cClosedHandlers forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_
case Maybe Word16
heartbeatTimeout of
Maybe Word16
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just Word16
timeout -> do
ThreadId
heartbeatThread <- Connection -> Int -> ThreadId -> IO ThreadId
watchHeartbeats Connection
conn (forall a b. (Integral a, Num b) => a -> b
fromIntegral Word16
timeout) ThreadId
connThreadId
Connection -> Bool -> IO () -> IO ()
addConnectionClosedHandler Connection
conn Bool
True (ThreadId -> IO ()
killThread ThreadId
heartbeatThread)
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar (Maybe ThreadId)
cThread forall a b. (a -> b) -> a -> b
$ \Maybe ThreadId
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just ThreadId
connThreadId
forall (m :: * -> *) a. Monad m => a -> m a
return Connection
conn
where
connect :: [SomeException] -> [([Char], PortNumber)] -> IO Connection
connect [SomeException]
excs (([Char]
host, PortNumber
port) : [([Char], PortNumber)]
rest) = do
ConnectionContext
ctx <- IO ConnectionContext
Conn.initConnectionContext
Either SomeException Connection
result <- forall e a. Exception e => IO a -> IO (Either e a)
CE.try forall a b. (a -> b) -> a -> b
$ ConnectionContext -> ConnectionParams -> IO Connection
Conn.connectTo ConnectionContext
ctx forall a b. (a -> b) -> a -> b
$ Conn.ConnectionParams
{ connectionHostname :: [Char]
Conn.connectionHostname = [Char]
host
, connectionPort :: PortNumber
Conn.connectionPort = PortNumber
port
, connectionUseSecure :: Maybe TLSSettings
Conn.connectionUseSecure = Maybe TLSSettings
tlsSettings
, connectionUseSocks :: Maybe ProxySettings
Conn.connectionUseSocks = forall a. Maybe a
Nothing
}
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (\(SomeException
ex :: CE.SomeException) -> [SomeException] -> [([Char], PortNumber)] -> IO Connection
connect (SomeException
exforall a. a -> [a] -> [a]
:[SomeException]
excs) [([Char], PortNumber)]
rest)
forall (m :: * -> *) a. Monad m => a -> m a
return
Either SomeException Connection
result
connect [SomeException]
excs [] = forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
Abnormal forall a b. (a -> b) -> a -> b
$ [Char]
"Could not connect to any of the provided brokers: " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show (forall a b. [a] -> [b] -> [(a, b)]
zip (ConnectionOpts -> [([Char], PortNumber)]
coServers ConnectionOpts
connOpts) (forall a. [a] -> [a]
reverse [SomeException]
excs))
tlsSettings :: Maybe TLSSettings
tlsSettings = forall b a. b -> (a -> b) -> Maybe a -> b
maybe forall a. Maybe a
Nothing TLSSettings -> Maybe TLSSettings
connectionTLSSettings (ConnectionOpts -> Maybe TLSSettings
coTLSSettings ConnectionOpts
connOpts)
selectSASLMechanism :: Connection -> ByteString -> IO SASLMechanism
selectSASLMechanism Connection
handle ByteString
serverMechanisms =
let serverSaslList :: [Text]
serverSaslList = (Char -> Bool) -> Text -> [Text]
T.split (forall a. Eq a => a -> a -> Bool
== Char
' ') forall a b. (a -> b) -> a -> b
$ ByteString -> Text
E.decodeUtf8 ByteString
serverMechanisms
clientMechanisms :: [SASLMechanism]
clientMechanisms = ConnectionOpts -> [SASLMechanism]
coAuth ConnectionOpts
connOpts
clientSaslList :: [Text]
clientSaslList = forall a b. (a -> b) -> [a] -> [b]
map SASLMechanism -> Text
saslName [SASLMechanism]
clientMechanisms
maybeSasl :: Maybe SASLMechanism
maybeSasl = forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
F.find (\(SASLMechanism Text
name ByteString
_ Maybe (ByteString -> IO ByteString)
_) -> forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
elem Text
name [Text]
serverSaslList) [SASLMechanism]
clientMechanisms
in forall {b}. Maybe b -> Connection -> [Char] -> IO b
abortIfNothing Maybe SASLMechanism
maybeSasl Connection
handle
([Char]
"None of the provided SASL mechanisms "forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show [Text]
clientSaslListforall a. [a] -> [a] -> [a]
++[Char]
" is supported by the server "forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show [Text]
serverSaslListforall a. [a] -> [a] -> [a]
++[Char]
".")
start_ok :: SASLMechanism -> Frame
start_ok SASLMechanism
sasl = Word16 -> FramePayload -> Frame
Frame Word16
0 forall a b. (a -> b) -> a -> b
$ MethodPayload -> FramePayload
MethodPayload forall a b. (a -> b) -> a -> b
$ FieldTable
-> ShortString -> LongString -> ShortString -> MethodPayload
Connection_start_ok
FieldTable
clientProperties
(Text -> ShortString
ShortString forall a b. (a -> b) -> a -> b
$ SASLMechanism -> Text
saslName SASLMechanism
sasl)
(ByteString -> LongString
LongString forall a b. (a -> b) -> a -> b
$ SASLMechanism -> ByteString
saslInitialResponse SASLMechanism
sasl)
(Text -> ShortString
ShortString Text
"en_US")
where
clientProperties :: FieldTable
clientProperties = Map Text FieldValue -> FieldTable
FieldTable forall a b. (a -> b) -> a -> b
$ forall k a. Ord k => [(k, a)] -> Map k a
M.fromList forall a b. (a -> b) -> a -> b
$ [
(Text
"platform", ByteString -> FieldValue
FVString ByteString
"Haskell"),
(Text
"version" , ByteString -> FieldValue
FVString forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
E.encodeUtf8 forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Char] -> Text
T.pack forall a b. (a -> b) -> a -> b
$ Version -> [Char]
showVersion Version
version),
(Text
"capabilities", FieldTable -> FieldValue
FVFieldTable FieldTable
clientCapabilities)
] forall a. [a] -> [a] -> [a]
++ forall b a. b -> (a -> b) -> Maybe a -> b
maybe [] (\Text
x -> [(Text
"connection_name", ByteString -> FieldValue
FVString forall a b. (a -> b) -> a -> b
$ Text -> ByteString
E.encodeUtf8 Text
x)]) (ConnectionOpts -> Maybe Text
coName ConnectionOpts
connOpts)
clientCapabilities :: FieldTable
clientCapabilities = Map Text FieldValue -> FieldTable
FieldTable forall a b. (a -> b) -> a -> b
$ forall k a. Ord k => [(k, a)] -> Map k a
M.fromList [
(Text
"consumer_cancel_notify", Bool -> FieldValue
FVBool Bool
True),
(Text
"connection.blocked", Bool -> FieldValue
FVBool Bool
True)
]
handleSecureUntilTune :: Connection -> SASLMechanism -> IO Frame
handleSecureUntilTune Connection
handle SASLMechanism
sasl = do
Frame
tuneOrSecure <- Connection -> IO Frame
readFrame Connection
handle
case Frame
tuneOrSecure of
Frame Word16
0 (MethodPayload (Connection_secure (LongString ByteString
challenge))) -> do
ByteString -> IO ByteString
processChallenge <- forall {b}. Maybe b -> Connection -> [Char] -> IO b
abortIfNothing (SASLMechanism -> Maybe (ByteString -> IO ByteString)
saslChallengeFunc SASLMechanism
sasl)
Connection
handle forall a b. (a -> b) -> a -> b
$ [Char]
"The server provided a challenge, but the selected SASL mechanism "forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show (SASLMechanism -> Text
saslName SASLMechanism
sasl)forall a. [a] -> [a] -> [a]
++[Char]
" is not equipped with a challenge processing function."
ByteString
challengeResponse <- ByteString -> IO ByteString
processChallenge ByteString
challenge
Connection -> Frame -> IO ()
writeFrame Connection
handle (Word16 -> FramePayload -> Frame
Frame Word16
0 (MethodPayload -> FramePayload
MethodPayload (LongString -> MethodPayload
Connection_secure_ok (ByteString -> LongString
LongString ByteString
challengeResponse))))
Connection -> SASLMechanism -> IO Frame
handleSecureUntilTune Connection
handle SASLMechanism
sasl
tune :: Frame
tune@(Frame Word16
0 (MethodPayload Connection_tune{})) -> forall (m :: * -> *) a. Monad m => a -> m a
return Frame
tune
Frame
x -> forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"handleSecureUntilTune fail. received message: "forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show Frame
x
open :: Frame
open = Word16 -> FramePayload -> Frame
Frame Word16
0 forall a b. (a -> b) -> a -> b
$ MethodPayload -> FramePayload
MethodPayload forall a b. (a -> b) -> a -> b
$ ShortString -> ShortString -> Bool -> MethodPayload
Connection_open
(Text -> ShortString
ShortString forall a b. (a -> b) -> a -> b
$ ConnectionOpts -> Text
coVHost ConnectionOpts
connOpts)
(Text -> ShortString
ShortString forall a b. (a -> b) -> a -> b
$ [Char] -> Text
T.pack [Char]
"")
Bool
True
abortHandshake :: Connection -> [Char] -> IO b
abortHandshake Connection
handle [Char]
msg = do
Connection -> IO ()
Conn.connectionClose Connection
handle
forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
Abnormal [Char]
msg
abortIfNothing :: Maybe b -> Connection -> [Char] -> IO b
abortIfNothing Maybe b
m Connection
handle [Char]
msg = case Maybe b
m of
Maybe b
Nothing -> forall {b}. Connection -> [Char] -> IO b
abortHandshake Connection
handle [Char]
msg
Just b
a -> forall (m :: * -> *) a. Monad m => a -> m a
return b
a
watchHeartbeats :: Connection -> Int -> ThreadId -> IO ThreadId
watchHeartbeats :: Connection -> Int -> ThreadId -> IO ThreadId
watchHeartbeats Connection
conn Int
timeout ThreadId
connThread = Int -> IO () -> IO ThreadId
scheduleAtFixedRate Int
rate forall a b. (a -> b) -> a -> b
$ do
IO ()
checkSendTimeout
IO ()
checkReceiveTimeout
where
rate :: Int
rate = Int
timeout forall a. Num a => a -> a -> a
* Int
1000 forall a. Num a => a -> a -> a
* Int
250
receiveTimeout :: ByteOffset
receiveTimeout = forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
rate forall a. Num a => a -> a -> a
* ByteOffset
4 forall a. Num a => a -> a -> a
* ByteOffset
2
sendTimeout :: ByteOffset
sendTimeout = forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
rate forall a. Num a => a -> a -> a
* ByteOffset
2
skippedBeatEx :: AMQPException
skippedBeatEx = CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
Abnormal [Char]
"killed connection after missing 2 consecutive heartbeats"
checkReceiveTimeout :: IO ()
checkReceiveTimeout = MVar ByteOffset -> ByteOffset -> IO () -> IO ()
doCheck (Connection -> MVar ByteOffset
connLastReceived Connection
conn) ByteOffset
receiveTimeout forall a b. (a -> b) -> a -> b
$
Connection -> CloseType -> SomeException -> ThreadId -> IO ()
killConnection Connection
conn CloseType
Abnormal (forall e. Exception e => e -> SomeException
CE.toException AMQPException
skippedBeatEx) ThreadId
connThread
checkSendTimeout :: IO ()
checkSendTimeout = MVar ByteOffset -> ByteOffset -> IO () -> IO ()
doCheck (Connection -> MVar ByteOffset
connLastSent Connection
conn) ByteOffset
sendTimeout forall a b. (a -> b) -> a -> b
$
Connection -> Frame -> IO ()
writeFrame (Connection -> Connection
connHandle Connection
conn) (Word16 -> FramePayload -> Frame
Frame Word16
0 FramePayload
HeartbeatPayload)
doCheck :: MVar ByteOffset -> ByteOffset -> IO () -> IO ()
doCheck MVar ByteOffset
var ByteOffset
timeout_µs IO ()
action = forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar ByteOffset
var forall a b. (a -> b) -> a -> b
$ \ByteOffset
lastFrameTime -> do
ByteOffset
time <- IO ByteOffset
getTimestamp
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ByteOffset
time forall a. Ord a => a -> a -> Bool
>= ByteOffset
lastFrameTime forall a. Num a => a -> a -> a
+ ByteOffset
timeout_µs) IO ()
action
updateLastSent :: Connection -> IO ()
updateLastSent :: Connection -> IO ()
updateLastSent Connection
conn = forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Connection -> MVar ByteOffset
connLastSent Connection
conn) (forall a b. a -> b -> a
const IO ByteOffset
getTimestamp)
updateLastReceived :: Connection -> IO ()
updateLastReceived :: Connection -> IO ()
updateLastReceived Connection
conn = forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Connection -> MVar ByteOffset
connLastReceived Connection
conn) (forall a b. a -> b -> a
const IO ByteOffset
getTimestamp)
killConnection :: Connection -> CloseType -> CE.SomeException -> ThreadId -> IO ()
killConnection :: Connection -> CloseType -> SomeException -> ThreadId -> IO ()
killConnection Connection
conn CloseType
closeType SomeException
ex ThreadId
connThreadId = do
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Connection -> MVar (Maybe (CloseType, [Char]))
connClosed Connection
conn) forall a b. (a -> b) -> a -> b
$ forall a b. a -> b -> a
const forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just (CloseType
closeType, forall a. Show a => a -> [Char]
show SomeException
ex)
forall e. Exception e => ThreadId -> e -> IO ()
throwTo ThreadId
connThreadId SomeException
ex
closeConnection :: Connection -> IO ()
closeConnection :: Connection -> IO ()
closeConnection Connection
c = do
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch (
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Connection -> MVar ()
connWriteLock Connection
c) forall a b. (a -> b) -> a -> b
$ \()
_ -> Connection -> Frame -> IO ()
writeFrame (Connection -> Connection
connHandle Connection
c) forall a b. (a -> b) -> a -> b
$ Word16 -> FramePayload -> Frame
Frame Word16
0 forall a b. (a -> b) -> a -> b
$ MethodPayload -> FramePayload
MethodPayload forall a b. (a -> b) -> a -> b
$ Word16 -> ShortString -> Word16 -> Word16 -> MethodPayload
Connection_close
Word16
0
(Text -> ShortString
ShortString Text
"")
Word16
0
Word16
0
)
(\ (IOError
e :: CE.IOException) -> do
Just ThreadId
thrID <- forall a. MVar a -> IO a
readMVar (Connection -> MVar (Maybe ThreadId)
connThread Connection
c)
Connection -> CloseType -> SomeException -> ThreadId -> IO ()
killConnection Connection
c CloseType
Abnormal (forall e. Exception e => e -> SomeException
CE.toException IOError
e) ThreadId
thrID
)
forall a. MVar a -> IO a
readMVar forall a b. (a -> b) -> a -> b
$ Connection -> MVar ()
connClosedLock Connection
c
forall (m :: * -> *) a. Monad m => a -> m a
return ()
getServerProperties :: Connection -> IO FieldTable
getServerProperties :: Connection -> IO FieldTable
getServerProperties = forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> FieldTable
connServerProperties
addConnectionClosedHandler :: Connection -> Bool -> IO () -> IO ()
addConnectionClosedHandler :: Connection -> Bool -> IO () -> IO ()
addConnectionClosedHandler Connection
conn Bool
ifClosed IO ()
handler = do
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Connection -> MVar (Maybe (CloseType, [Char]))
connClosed Connection
conn) forall a b. (a -> b) -> a -> b
$ \case
Just (CloseType, [Char])
_ | Bool
ifClosed -> IO ()
handler
Maybe (CloseType, [Char])
_ -> forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Connection -> MVar [IO ()]
connClosedHandlers Connection
conn) forall a b. (a -> b) -> a -> b
$ \[IO ()]
old -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ IO ()
handlerforall a. a -> [a] -> [a]
:[IO ()]
old
addConnectionBlockedHandler :: Connection -> (Text -> IO ()) -> IO () -> IO ()
addConnectionBlockedHandler :: Connection -> (Text -> IO ()) -> IO () -> IO ()
addConnectionBlockedHandler Connection
conn Text -> IO ()
blockedHandler IO ()
unblockedHandler =
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Connection -> MVar [(Text -> IO (), IO ())]
connBlockedHandlers Connection
conn) forall a b. (a -> b) -> a -> b
$ \[(Text -> IO (), IO ())]
old -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ (Text -> IO ()
blockedHandler, IO ()
unblockedHandler)forall a. a -> [a] -> [a]
:[(Text -> IO (), IO ())]
old
readFrame :: Conn.Connection -> IO Frame
readFrame :: Connection -> IO Frame
readFrame Connection
handle = do
ByteString
strictDat <- Connection -> Int -> IO ByteString
connectionGetExact Connection
handle Int
7
let dat :: ByteString
dat = ByteString -> ByteString
toLazy ByteString
strictDat
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ByteString -> Bool
BL.null ByteString
dat) forall a b. (a -> b) -> a -> b
$ forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"connection not open"
let len :: Int
len = forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ ByteString -> Word32
peekFrameSize ByteString
dat
ByteString
strictDat' <- Connection -> Int -> IO ByteString
connectionGetExact Connection
handle (Int
lenforall a. Num a => a -> a -> a
+Int
1)
let dat' :: ByteString
dat' = ByteString -> ByteString
toLazy ByteString
strictDat'
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ByteString -> Bool
BL.null ByteString
dat') forall a b. (a -> b) -> a -> b
$ forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"connection not open"
#if MIN_VERSION_binary(0, 7, 0)
let ret :: Either
(ByteString, ByteOffset, [Char]) (ByteString, ByteOffset, Frame)
ret = forall a.
Get a
-> ByteString
-> Either
(ByteString, ByteOffset, [Char]) (ByteString, ByteOffset, a)
runGetOrFail forall t. Binary t => Get t
get (ByteString -> ByteString -> ByteString
BL.append ByteString
dat ByteString
dat')
case Either
(ByteString, ByteOffset, [Char]) (ByteString, ByteOffset, Frame)
ret of
Left (ByteString
_, ByteOffset
_, [Char]
errMsg) -> forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"readFrame fail: " forall a. [a] -> [a] -> [a]
++ [Char]
errMsg
Right (ByteString
_, ByteOffset
consumedBytes, Frame
_) | ByteOffset
consumedBytes forall a. Eq a => a -> a -> Bool
/= forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int
lenforall a. Num a => a -> a -> a
+Int
8) ->
forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"readFrame: parser should read " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show (Int
lenforall a. Num a => a -> a -> a
+Int
8) forall a. [a] -> [a] -> [a]
++ [Char]
" bytes; but read " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show ByteOffset
consumedBytes
Right (ByteString
_, ByteOffset
_, Frame
frame) -> forall (m :: * -> *) a. Monad m => a -> m a
return Frame
frame
#else
let (frame, _, consumedBytes) = runGetState get (BL.append dat dat') 0
if consumedBytes /= fromIntegral (len+8)
then error $ "readFrameSock: parser should read "++show (len+8)++" bytes; but read "++show consumedBytes
else return ()
return frame
#endif
connectionGetExact :: Conn.Connection -> Int -> IO BS.ByteString
connectionGetExact :: Connection -> Int -> IO ByteString
connectionGetExact Connection
conn Int
x = ByteString -> Int -> IO ByteString
loop ByteString
BS.empty Int
0
where loop :: ByteString -> Int -> IO ByteString
loop ByteString
bs Int
y
| Int
y forall a. Eq a => a -> a -> Bool
== Int
x = forall (m :: * -> *) a. Monad m => a -> m a
return ByteString
bs
| Bool
otherwise = do
ByteString
next <- Connection -> Int -> IO ByteString
Conn.connectionGet Connection
conn (Int
x forall a. Num a => a -> a -> a
- Int
y)
ByteString -> Int -> IO ByteString
loop (ByteString -> ByteString -> ByteString
BS.append ByteString
bs ByteString
next) (Int
y forall a. Num a => a -> a -> a
+ ByteString -> Int
BS.length ByteString
next)
writeFrame :: Conn.Connection -> Frame -> IO ()
writeFrame :: Connection -> Frame -> IO ()
writeFrame Connection
handle Frame
f = do
Connection -> ByteString -> IO ()
Conn.connectionPut Connection
handle forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
toStrict forall b c a. (b -> c) -> (a -> b) -> a -> c
. Put -> ByteString
runPut forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall t. Binary t => t -> Put
put forall a b. (a -> b) -> a -> b
$ Frame
f
data Channel = Channel {
Channel -> Connection
connection :: Connection,
Channel -> Chan FramePayload
inQueue :: Chan FramePayload,
Channel -> MVar (Seq (MVar Assembly))
outstandingResponses :: MVar (Seq.Seq (MVar Assembly)),
Channel -> Word16
channelID :: Word16,
Channel -> MVar Int
lastConsumerTag :: MVar Int,
Channel -> MVar Int
nextPublishSeqNum :: MVar Int,
Channel -> TVar IntSet
unconfirmedSet :: TVar IntSet.IntSet,
Channel -> TVar IntSet
ackedSet :: TVar IntSet.IntSet,
Channel -> TVar IntSet
nackedSet :: TVar IntSet.IntSet,
Channel -> Lock
chanActive :: Lock,
Channel -> MVar (Maybe (CloseType, [Char]))
chanClosed :: MVar (Maybe (CloseType, String)),
Channel
-> MVar (Map Text ((Message, Envelope) -> IO (), Text -> IO ()))
consumers :: MVar (M.Map Text ((Message, Envelope) -> IO (),
ConsumerTag -> IO ())),
Channel -> MVar [(Message, PublishError) -> IO ()]
returnListeners :: MVar [(Message, PublishError) -> IO ()],
Channel -> MVar [(LongLongInt, Bool, AckType) -> IO ()]
confirmListeners :: MVar [(Word64, Bool, AckType) -> IO ()],
Channel -> MVar [SomeException -> IO ()]
chanExceptionHandlers :: MVar [CE.SomeException -> IO ()]
}
data ChanThreadKilledException = ChanThreadKilledException { ChanThreadKilledException -> SomeException
cause :: CE.SomeException }
deriving (Int -> ChanThreadKilledException -> ShowS
[ChanThreadKilledException] -> ShowS
ChanThreadKilledException -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [ChanThreadKilledException] -> ShowS
$cshowList :: [ChanThreadKilledException] -> ShowS
show :: ChanThreadKilledException -> [Char]
$cshow :: ChanThreadKilledException -> [Char]
showsPrec :: Int -> ChanThreadKilledException -> ShowS
$cshowsPrec :: Int -> ChanThreadKilledException -> ShowS
Show)
instance CE.Exception ChanThreadKilledException
unwrapChanThreadKilledException :: CE.SomeException -> CE.SomeException
unwrapChanThreadKilledException :: SomeException -> SomeException
unwrapChanThreadKilledException SomeException
e = forall b a. b -> (a -> b) -> Maybe a -> b
maybe SomeException
e ChanThreadKilledException -> SomeException
cause forall a b. (a -> b) -> a -> b
$ forall e. Exception e => SomeException -> Maybe e
CE.fromException SomeException
e
msgFromContentHeaderProperties :: ContentHeaderProperties -> BL.ByteString -> Message
(CHBasic Maybe ShortString
content_type Maybe ShortString
content_encoding Maybe FieldTable
headers Maybe Word8
delivery_mode Maybe Word8
priority Maybe ShortString
correlation_id Maybe ShortString
reply_to Maybe ShortString
expiration Maybe ShortString
message_id Maybe LongLongInt
timestamp Maybe ShortString
message_type Maybe ShortString
user_id Maybe ShortString
application_id Maybe ShortString
cluster_id) ByteString
body =
let msgId :: Maybe Text
msgId = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
message_id
contentType :: Maybe Text
contentType = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
content_type
contentEncoding :: Maybe Text
contentEncoding = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
content_encoding
replyTo :: Maybe Text
replyTo = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
reply_to
correlationID :: Maybe Text
correlationID = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
correlation_id
messageType :: Maybe Text
messageType = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
message_type
userId :: Maybe Text
userId = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
user_id
applicationId :: Maybe Text
applicationId = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
application_id
clusterId :: Maybe Text
clusterId = Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
cluster_id
in ByteString
-> Maybe DeliveryMode
-> Maybe LongLongInt
-> Maybe Text
-> Maybe Text
-> Maybe Text
-> Maybe Text
-> Maybe Text
-> Maybe Text
-> Maybe Text
-> Maybe Text
-> Maybe Word8
-> Maybe Text
-> Maybe Text
-> Maybe FieldTable
-> Message
Message ByteString
body (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Word8 -> DeliveryMode
intToDeliveryMode Maybe Word8
delivery_mode) Maybe LongLongInt
timestamp Maybe Text
msgId Maybe Text
messageType Maybe Text
userId Maybe Text
applicationId Maybe Text
clusterId Maybe Text
contentType Maybe Text
contentEncoding Maybe Text
replyTo Maybe Word8
priority Maybe Text
correlationID (Maybe ShortString -> Maybe Text
fromShortString Maybe ShortString
expiration) Maybe FieldTable
headers
where
fromShortString :: Maybe ShortString -> Maybe Text
fromShortString (Just (ShortString Text
s)) = forall a. a -> Maybe a
Just Text
s
fromShortString Maybe ShortString
_ = forall a. Maybe a
Nothing
msgFromContentHeaderProperties ContentHeaderProperties
c ByteString
_ = forall a. HasCallStack => [Char] -> a
error ([Char]
"Unknown content header properties: " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show ContentHeaderProperties
c)
channelReceiver :: Channel -> IO ()
channelReceiver :: Channel -> IO ()
channelReceiver Channel
chan = do
Assembly
p <- Chan FramePayload -> IO Assembly
readAssembly forall a b. (a -> b) -> a -> b
$ Channel -> Chan FramePayload
inQueue Channel
chan
if Assembly -> Bool
isResponse Assembly
p
then do
IO ()
action <- forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (Channel -> MVar (Seq (MVar Assembly))
outstandingResponses Channel
chan) forall a b. (a -> b) -> a -> b
$ \Seq (MVar Assembly)
val -> do
case forall a. Seq a -> ViewL a
Seq.viewl Seq (MVar Assembly)
val of
MVar Assembly
x Seq.:< Seq (MVar Assembly)
rest -> do
forall (m :: * -> *) a. Monad m => a -> m a
return (Seq (MVar Assembly)
rest, forall a. MVar a -> a -> IO ()
putMVar MVar Assembly
x Assembly
p)
ViewL (MVar Assembly)
Seq.EmptyL -> do
forall (m :: * -> *) a. Monad m => a -> m a
return (Seq (MVar Assembly)
val, forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"got response, but have no corresponding request")
IO ()
action
else Assembly -> IO ()
handleAsync Assembly
p
Channel -> IO ()
channelReceiver Channel
chan
where
isResponse :: Assembly -> Bool
isResponse :: Assembly -> Bool
isResponse (ContentMethod Basic_deliver{} ContentHeaderProperties
_ ByteString
_) = Bool
False
isResponse (ContentMethod Basic_return{} ContentHeaderProperties
_ ByteString
_) = Bool
False
isResponse (SimpleMethod (Channel_flow Bool
_)) = Bool
False
isResponse (SimpleMethod Channel_close{}) = Bool
False
isResponse (SimpleMethod (Basic_ack LongLongInt
_ Bool
_)) = Bool
False
isResponse (SimpleMethod Basic_nack{}) = Bool
False
isResponse (SimpleMethod (Basic_cancel ShortString
_ Bool
_)) = Bool
False
isResponse Assembly
_ = Bool
True
handleAsync :: Assembly -> IO ()
handleAsync (ContentMethod (Basic_deliver (ShortString Text
consumerTag) LongLongInt
deliveryTag Bool
redelivered (ShortString Text
exchange)
(ShortString Text
routingKey))
ContentHeaderProperties
properties ByteString
body) =
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Channel
-> MVar (Map Text ((Message, Envelope) -> IO (), Text -> IO ()))
consumers Channel
chan) (\Map Text ((Message, Envelope) -> IO (), Text -> IO ())
s -> do
case forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup Text
consumerTag Map Text ((Message, Envelope) -> IO (), Text -> IO ())
s of
Just ((Message, Envelope) -> IO ()
subscriber, Text -> IO ()
_) -> do
let msg :: Message
msg = ContentHeaderProperties -> ByteString -> Message
msgFromContentHeaderProperties ContentHeaderProperties
properties ByteString
body
let env :: Envelope
env = Envelope {envDeliveryTag :: LongLongInt
envDeliveryTag = LongLongInt
deliveryTag, envRedelivered :: Bool
envRedelivered = Bool
redelivered,
envExchangeName :: Text
envExchangeName = Text
exchange, envRoutingKey :: Text
envRoutingKey = Text
routingKey, envChannel :: Channel
envChannel = Channel
chan}
forall a. IO a -> [Handler a] -> IO a
CE.catches ((Message, Envelope) -> IO ()
subscriber (Message
msg, Envelope
env)) [
forall a e. Exception e => (e -> IO a) -> Handler a
CE.Handler (\(ChanThreadKilledException
e::ChanThreadKilledException) -> forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ ChanThreadKilledException -> SomeException
cause ChanThreadKilledException
e),
forall a e. Exception e => (e -> IO a) -> Handler a
CE.Handler (\(SomeException
e::CE.SomeException) -> Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr forall a b. (a -> b) -> a -> b
$ [Char]
"AMQP callback threw exception: " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show SomeException
e)
]
Maybe ((Message, Envelope) -> IO (), Text -> IO ())
Nothing ->
forall (m :: * -> *) a. Monad m => a -> m a
return ()
)
handleAsync (SimpleMethod (Channel_close Word16
_ (ShortString Text
errorMsg) Word16
_ Word16
_)) = do
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch (Channel -> Assembly -> IO ()
writeAssembly' Channel
chan (MethodPayload -> Assembly
SimpleMethod MethodPayload
Channel_close_ok))
(\ (IOError
_ :: CE.IOException) ->
forall (m :: * -> *) a. Monad m => a -> m a
return ()
)
Channel -> CloseType -> Text -> IO ()
closeChannel' Channel
chan CloseType
Abnormal Text
errorMsg
IO ThreadId
myThreadId forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a b c. (a -> b -> c) -> b -> a -> c
flip forall e. Exception e => ThreadId -> e -> IO ()
CE.throwTo (CloseType -> [Char] -> AMQPException
ChannelClosedException CloseType
Abnormal forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> [Char]
T.unpack forall a b. (a -> b) -> a -> b
$ Text
errorMsg)
handleAsync (SimpleMethod (Channel_flow Bool
active)) = do
if Bool
active
then Lock -> IO ()
openLock forall a b. (a -> b) -> a -> b
$ Channel -> Lock
chanActive Channel
chan
else Lock -> IO ()
closeLock forall a b. (a -> b) -> a -> b
$ Channel -> Lock
chanActive Channel
chan
forall (m :: * -> *) a. Monad m => a -> m a
return ()
handleAsync (ContentMethod basicReturn :: MethodPayload
basicReturn@Basic_return{} ContentHeaderProperties
props ByteString
body) = do
let msg :: Message
msg = ContentHeaderProperties -> ByteString -> Message
msgFromContentHeaderProperties ContentHeaderProperties
props ByteString
body
pubError :: PublishError
pubError = MethodPayload -> PublishError
basicReturnToPublishError MethodPayload
basicReturn
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Channel -> MVar [(Message, PublishError) -> IO ()]
returnListeners Channel
chan) forall a b. (a -> b) -> a -> b
$ \[(Message, PublishError) -> IO ()]
listeners ->
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(Message, PublishError) -> IO ()]
listeners forall a b. (a -> b) -> a -> b
$ \(Message, PublishError) -> IO ()
l -> forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch ((Message, PublishError) -> IO ()
l (Message
msg, PublishError
pubError)) forall a b. (a -> b) -> a -> b
$ \(SomeException
ex :: CE.SomeException) ->
Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr forall a b. (a -> b) -> a -> b
$ [Char]
"return listener on channel ["forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show (Channel -> Word16
channelID Channel
chan)forall a. [a] -> [a] -> [a]
++[Char]
"] handling error ["forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show PublishError
pubErrorforall a. [a] -> [a] -> [a]
++[Char]
"] threw exception: "forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show SomeException
ex
handleAsync (SimpleMethod (Basic_ack LongLongInt
deliveryTag Bool
multiple)) = LongLongInt -> Bool -> AckType -> IO ()
handleConfirm LongLongInt
deliveryTag Bool
multiple AckType
BasicAck
handleAsync (SimpleMethod (Basic_nack LongLongInt
deliveryTag Bool
multiple Bool
_)) = LongLongInt -> Bool -> AckType -> IO ()
handleConfirm LongLongInt
deliveryTag Bool
multiple AckType
BasicNack
handleAsync (SimpleMethod (Basic_cancel ShortString
consumerTag Bool
_)) = ShortString -> IO ()
handleCancel ShortString
consumerTag
handleAsync Assembly
m = forall a. HasCallStack => [Char] -> a
error ([Char]
"Unknown method: " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show Assembly
m)
handleConfirm :: LongLongInt -> Bool -> AckType -> IO ()
handleConfirm LongLongInt
deliveryTag Bool
multiple AckType
k = do
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Channel -> MVar [(LongLongInt, Bool, AckType) -> IO ()]
confirmListeners Channel
chan) forall a b. (a -> b) -> a -> b
$ \[(LongLongInt, Bool, AckType) -> IO ()]
listeners ->
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(LongLongInt, Bool, AckType) -> IO ()]
listeners forall a b. (a -> b) -> a -> b
$ \(LongLongInt, Bool, AckType) -> IO ()
l -> forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch ((LongLongInt, Bool, AckType) -> IO ()
l (LongLongInt
deliveryTag, Bool
multiple, AckType
k)) forall a b. (a -> b) -> a -> b
$ \(SomeException
ex :: CE.SomeException) ->
Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr forall a b. (a -> b) -> a -> b
$ [Char]
"confirm listener on channel ["forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show (Channel -> Word16
channelID Channel
chan)forall a. [a] -> [a] -> [a]
++[Char]
"] handling method "forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show AckType
kforall a. [a] -> [a] -> [a]
++[Char]
" threw exception: "forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show SomeException
ex
let seqNum :: Int
seqNum = forall a b. (Integral a, Num b) => a -> b
fromIntegral LongLongInt
deliveryTag
let targetSet :: TVar IntSet
targetSet = case AckType
k of
AckType
BasicAck -> Channel -> TVar IntSet
ackedSet Channel
chan
AckType
BasicNack -> Channel -> TVar IntSet
nackedSet Channel
chan
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
IntSet
unconfSet <- forall a. TVar a -> STM a
readTVar (Channel -> TVar IntSet
unconfirmedSet Channel
chan)
let (IntSet -> IntSet
merge, IntSet
pending) = if Bool
multiple
then (IntSet -> IntSet -> IntSet
IntSet.union IntSet
confs, IntSet
pending')
else (Int -> IntSet -> IntSet
IntSet.insert Int
seqNum, Int -> IntSet -> IntSet
IntSet.delete Int
seqNum IntSet
unconfSet)
where
confs :: IntSet
confs = forall a b. (a, b) -> a
fst (IntSet, IntSet)
parts
pending' :: IntSet
pending' = forall a b. (a, b) -> b
snd (IntSet, IntSet)
parts
parts :: (IntSet, IntSet)
parts = (Int -> Bool) -> IntSet -> (IntSet, IntSet)
IntSet.partition (\Int
n -> Int
n forall a. Ord a => a -> a -> Bool
<= Int
seqNum) IntSet
unconfSet
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar IntSet
targetSet (\IntSet
ts -> IntSet -> IntSet
merge IntSet
ts)
forall a. TVar a -> a -> STM ()
writeTVar (Channel -> TVar IntSet
unconfirmedSet Channel
chan) IntSet
pending
handleCancel :: ShortString -> IO ()
handleCancel (ShortString Text
consumerTag) =
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Channel
-> MVar (Map Text ((Message, Envelope) -> IO (), Text -> IO ()))
consumers Channel
chan) (\Map Text ((Message, Envelope) -> IO (), Text -> IO ())
s -> do
case forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup Text
consumerTag Map Text ((Message, Envelope) -> IO (), Text -> IO ())
s of
Just ((Message, Envelope) -> IO ()
_, Text -> IO ()
cancelCB) ->
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch (Text -> IO ()
cancelCB Text
consumerTag) forall a b. (a -> b) -> a -> b
$ \(SomeException
ex :: CE.SomeException) ->
Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr forall a b. (a -> b) -> a -> b
$ [Char]
"consumer cancellation listener "forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show Text
consumerTagforall a. [a] -> [a] -> [a]
++[Char]
" on channel ["forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show (Channel -> Word16
channelID Channel
chan)forall a. [a] -> [a] -> [a]
++[Char]
"] threw exception: "forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show SomeException
ex
Maybe ((Message, Envelope) -> IO (), Text -> IO ())
Nothing ->
forall (m :: * -> *) a. Monad m => a -> m a
return ()
)
basicReturnToPublishError :: MethodPayload -> PublishError
basicReturnToPublishError (Basic_return Word16
code (ShortString Text
errText) (ShortString Text
exchange) (ShortString Text
routingKey)) =
let replyError :: ReturnReplyCode
replyError = case Word16
code of
Word16
312 -> Text -> ReturnReplyCode
Unroutable Text
errText
Word16
313 -> Text -> ReturnReplyCode
NoConsumers Text
errText
Word16
404 -> Text -> ReturnReplyCode
NotFound Text
errText
Word16
num -> forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"unexpected return error code: " forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show Word16
num
pubError :: PublishError
pubError = ReturnReplyCode -> Maybe Text -> Text -> PublishError
PublishError ReturnReplyCode
replyError (forall a. a -> Maybe a
Just Text
exchange) Text
routingKey
in PublishError
pubError
basicReturnToPublishError MethodPayload
x = forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"basicReturnToPublishError fail: "forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show MethodPayload
x
addReturnListener :: Channel -> ((Message, PublishError) -> IO ()) -> IO ()
addReturnListener :: Channel -> ((Message, PublishError) -> IO ()) -> IO ()
addReturnListener Channel
chan (Message, PublishError) -> IO ()
listener = do
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Channel -> MVar [(Message, PublishError) -> IO ()]
returnListeners Channel
chan) forall a b. (a -> b) -> a -> b
$ \[(Message, PublishError) -> IO ()]
listeners -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ (Message, PublishError) -> IO ()
listenerforall a. a -> [a] -> [a]
:[(Message, PublishError) -> IO ()]
listeners
addChannelExceptionHandler :: Channel -> (CE.SomeException -> IO ()) -> IO ()
addChannelExceptionHandler :: Channel -> (SomeException -> IO ()) -> IO ()
addChannelExceptionHandler Channel
chan SomeException -> IO ()
handler = do
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Channel -> MVar [SomeException -> IO ()]
chanExceptionHandlers Channel
chan) forall a b. (a -> b) -> a -> b
$ \[SomeException -> IO ()]
handlers -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ SomeException -> IO ()
handlerforall a. a -> [a] -> [a]
:[SomeException -> IO ()]
handlers
isNormalChannelClose :: CE.SomeException -> Bool
isNormalChannelClose :: SomeException -> Bool
isNormalChannelClose SomeException
e = case forall e. Exception e => SomeException -> Maybe e
CE.fromException SomeException
e :: Maybe AMQPException of
Just (ChannelClosedException CloseType
Normal [Char]
_) -> Bool
True
Just (ConnectionClosedException CloseType
Normal [Char]
_) -> Bool
True
Maybe AMQPException
_ -> Bool
False
closeChannel' :: Channel -> CloseType -> Text -> IO ()
closeChannel' :: Channel -> CloseType -> Text -> IO ()
closeChannel' Channel
c CloseType
closeType Text
reason = do
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Channel -> MVar (Maybe (CloseType, [Char]))
chanClosed Channel
c) forall a b. (a -> b) -> a -> b
$ \Maybe (CloseType, [Char])
x -> do
if forall a. Maybe a -> Bool
isNothing Maybe (CloseType, [Char])
x
then do
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Connection -> MVar (IntMap (Channel, ThreadId))
connChannels forall a b. (a -> b) -> a -> b
$ Channel -> Connection
connection Channel
c) forall a b. (a -> b) -> a -> b
$ \IntMap (Channel, ThreadId)
old -> do
Bool
ret <- ChannelAllocator -> Int -> IO Bool
freeChannel (Connection -> ChannelAllocator
connChanAllocator forall a b. (a -> b) -> a -> b
$ Channel -> Connection
connection Channel
c) forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ Channel -> Word16
channelID Channel
c
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
ret forall a b. (a -> b) -> a -> b
$ Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr [Char]
"closeChannel error: channel already freed"
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. Int -> IntMap a -> IntMap a
IM.delete (forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ Channel -> Word16
channelID Channel
c) IntMap (Channel, ThreadId)
old
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ Lock -> IO Bool
killLock forall a b. (a -> b) -> a -> b
$ Channel -> Lock
chanActive Channel
c
forall a. MVar (Seq (MVar a)) -> IO ()
killOutstandingResponses forall a b. (a -> b) -> a -> b
$ Channel -> MVar (Seq (MVar Assembly))
outstandingResponses Channel
c
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just (CloseType
closeType, Text -> [Char]
T.unpack Text
reason)
else forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (CloseType, [Char])
x
where
killOutstandingResponses :: MVar (Seq.Seq (MVar a)) -> IO ()
killOutstandingResponses :: forall a. MVar (Seq (MVar a)) -> IO ()
killOutstandingResponses MVar (Seq (MVar a))
outResps = do
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar (Seq (MVar a))
outResps forall a b. (a -> b) -> a -> b
$ \Seq (MVar a)
val -> do
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
F.mapM_ (\MVar a
x -> forall a. MVar a -> a -> IO Bool
tryPutMVar MVar a
x forall a b. (a -> b) -> a -> b
$ forall a. HasCallStack => [Char] -> a
error [Char]
"channel closed") Seq (MVar a)
val
forall (m :: * -> *) a. Monad m => a -> m a
return forall a. HasCallStack => a
undefined
openChannel :: Connection -> IO Channel
openChannel :: Connection -> IO Channel
openChannel Connection
c = do
Chan FramePayload
newInQueue <- forall a. IO (Chan a)
newChan
MVar (Seq (MVar Assembly))
outRes <- forall a. a -> IO (MVar a)
newMVar forall a. Seq a
Seq.empty
MVar Int
lastConsTag <- forall a. a -> IO (MVar a)
newMVar Int
0
Lock
ca <- IO Lock
newLock
MVar (Maybe (CloseType, [Char]))
closed <- forall a. a -> IO (MVar a)
newMVar forall a. Maybe a
Nothing
MVar (Map Text ((Message, Envelope) -> IO (), Text -> IO ()))
conss <- forall a. a -> IO (MVar a)
newMVar forall k a. Map k a
M.empty
MVar [(Message, PublishError) -> IO ()]
retListeners <- forall a. a -> IO (MVar a)
newMVar []
TVar IntSet
aSet <- forall a. a -> IO (TVar a)
newTVarIO IntSet
IntSet.empty
TVar IntSet
nSet <- forall a. a -> IO (TVar a)
newTVarIO IntSet
IntSet.empty
MVar Int
nxtSeq <- forall a. a -> IO (MVar a)
newMVar Int
0
TVar IntSet
unconfSet <- forall a. a -> IO (TVar a)
newTVarIO IntSet
IntSet.empty
MVar [(LongLongInt, Bool, AckType) -> IO ()]
cnfListeners <- forall a. a -> IO (MVar a)
newMVar []
MVar [SomeException -> IO ()]
handlers <- forall a. a -> IO (MVar a)
newMVar []
Channel
newChannel <- forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (Connection -> MVar (IntMap (Channel, ThreadId))
connChannels Connection
c) forall a b. (a -> b) -> a -> b
$ \IntMap (Channel, ThreadId)
mp -> do
Int
newChannelID <- ChannelAllocator -> IO Int
allocateChannel (Connection -> ChannelAllocator
connChanAllocator Connection
c)
let newChannel :: Channel
newChannel = Connection
-> Chan FramePayload
-> MVar (Seq (MVar Assembly))
-> Word16
-> MVar Int
-> MVar Int
-> TVar IntSet
-> TVar IntSet
-> TVar IntSet
-> Lock
-> MVar (Maybe (CloseType, [Char]))
-> MVar (Map Text ((Message, Envelope) -> IO (), Text -> IO ()))
-> MVar [(Message, PublishError) -> IO ()]
-> MVar [(LongLongInt, Bool, AckType) -> IO ()]
-> MVar [SomeException -> IO ()]
-> Channel
Channel Connection
c Chan FramePayload
newInQueue MVar (Seq (MVar Assembly))
outRes (forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
newChannelID) MVar Int
lastConsTag MVar Int
nxtSeq TVar IntSet
unconfSet TVar IntSet
aSet TVar IntSet
nSet Lock
ca MVar (Maybe (CloseType, [Char]))
closed MVar (Map Text ((Message, Envelope) -> IO (), Text -> IO ()))
conss MVar [(Message, PublishError) -> IO ()]
retListeners MVar [(LongLongInt, Bool, AckType) -> IO ()]
cnfListeners MVar [SomeException -> IO ()]
handlers
ThreadId
thrID <- forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally' (Channel -> IO ()
channelReceiver Channel
newChannel) forall a b. (a -> b) -> a -> b
$ \Either SomeException ()
res -> do
Channel -> CloseType -> Text -> IO ()
closeChannel' Channel
newChannel CloseType
Normal Text
"closed"
case Either SomeException ()
res of
Right ()
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Left SomeException
ex -> do
let unwrappedExc :: SomeException
unwrappedExc = SomeException -> SomeException
unwrapChanThreadKilledException SomeException
ex
[SomeException -> IO ()]
handlers' <- forall a. MVar a -> IO a
readMVar MVar [SomeException -> IO ()]
handlers
case (forall (t :: * -> *) a. Foldable t => t a -> Bool
null [SomeException -> IO ()]
handlers', SomeException -> Maybe [Char]
fromAbnormalChannelClose SomeException
unwrappedExc) of
(Bool
True, Just [Char]
reason) -> Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr forall a b. (a -> b) -> a -> b
$ [Char]
"unhandled AMQP channel exception (chanId="forall a. [a] -> [a] -> [a]
++forall a. Show a => a -> [Char]
show Int
newChannelIDforall a. [a] -> [a] -> [a]
++[Char]
"): "forall a. [a] -> [a] -> [a]
++[Char]
reason
(Bool, Maybe [Char])
_ -> forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (forall a b. (a -> b) -> a -> b
$ SomeException
unwrappedExc) [SomeException -> IO ()]
handlers'
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall a. Int -> IntMap a -> Bool
IM.member Int
newChannelID IntMap (Channel, ThreadId)
mp) forall a b. (a -> b) -> a -> b
$ forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"openChannel fail: channel already open"
forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. Int -> a -> IntMap a -> IntMap a
IM.insert Int
newChannelID (Channel
newChannel, ThreadId
thrID) IntMap (Channel, ThreadId)
mp, Channel
newChannel)
SimpleMethod (Channel_open_ok LongString
_) <- Channel -> Assembly -> IO Assembly
request Channel
newChannel forall a b. (a -> b) -> a -> b
$ MethodPayload -> Assembly
SimpleMethod forall a b. (a -> b) -> a -> b
$ ShortString -> MethodPayload
Channel_open (Text -> ShortString
ShortString Text
"")
forall (m :: * -> *) a. Monad m => a -> m a
return Channel
newChannel
where
fromAbnormalChannelClose :: CE.SomeException -> Maybe String
fromAbnormalChannelClose :: SomeException -> Maybe [Char]
fromAbnormalChannelClose SomeException
exc =
case forall e. Exception e => SomeException -> Maybe e
CE.fromException SomeException
exc :: Maybe AMQPException of
Just (ConnectionClosedException CloseType
_ [Char]
_) -> forall a. Maybe a
Nothing
Just (ChannelClosedException CloseType
Normal [Char]
_) -> forall a. Maybe a
Nothing
Just (ChannelClosedException CloseType
Abnormal [Char]
reason) -> forall a. a -> Maybe a
Just [Char]
reason
Just (AllChannelsAllocatedException Int
_) -> forall a. a -> Maybe a
Just [Char]
"all channels allocated"
Maybe AMQPException
Nothing -> forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ forall a. Show a => a -> [Char]
show SomeException
exc
closeChannel :: Channel -> IO ()
closeChannel :: Channel -> IO ()
closeChannel Channel
c = do
SimpleMethod MethodPayload
Channel_close_ok <- Channel -> Assembly -> IO Assembly
request Channel
c forall a b. (a -> b) -> a -> b
$ MethodPayload -> Assembly
SimpleMethod forall a b. (a -> b) -> a -> b
$ Word16 -> ShortString -> Word16 -> Word16 -> MethodPayload
Channel_close Word16
0 (Text -> ShortString
ShortString Text
"") Word16
0 Word16
0
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Connection -> MVar (IntMap (Channel, ThreadId))
connChannels forall a b. (a -> b) -> a -> b
$ Channel -> Connection
connection Channel
c) forall a b. (a -> b) -> a -> b
$ \IntMap (Channel, ThreadId)
chans -> do
case forall a. Int -> IntMap a -> Maybe a
IM.lookup (forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ Channel -> Word16
channelID Channel
c) IntMap (Channel, ThreadId)
chans of
Just (Channel
_, ThreadId
thrID) -> forall e. Exception e => ThreadId -> e -> IO ()
throwTo ThreadId
thrID forall a b. (a -> b) -> a -> b
$ CloseType -> [Char] -> AMQPException
ChannelClosedException CloseType
Normal [Char]
"closeChannel was called"
Maybe (Channel, ThreadId)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
writeFrames :: Channel -> [FramePayload] -> IO ()
writeFrames :: Channel -> [FramePayload] -> IO ()
writeFrames Channel
chan [FramePayload]
payloads = do
let conn :: Connection
conn = Channel -> Connection
connection Channel
chan
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Connection -> MVar (IntMap (Channel, ThreadId))
connChannels Connection
conn) forall a b. (a -> b) -> a -> b
$ \IntMap (Channel, ThreadId)
chans ->
if forall a. Int -> IntMap a -> Bool
IM.member (forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ Channel -> Word16
channelID Channel
chan) IntMap (Channel, ThreadId)
chans
then forall e a. Exception e => IO a -> (e -> IO a) -> IO a
CE.catch
(do
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Connection -> MVar ()
connWriteLock Connection
conn) forall a b. (a -> b) -> a -> b
$ \()
_ ->
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\FramePayload
payload -> Connection -> Frame -> IO ()
writeFrame (Connection -> Connection
connHandle Connection
conn) (Word16 -> FramePayload -> Frame
Frame (Channel -> Word16
channelID Channel
chan) FramePayload
payload)) [FramePayload]
payloads
Connection -> IO ()
updateLastSent Connection
conn)
(\(IOError
_ :: CE.IOException) -> do
forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"connection not open"
)
else do
forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"channel not open"
writeAssembly' :: Channel -> Assembly -> IO ()
writeAssembly' :: Channel -> Assembly -> IO ()
writeAssembly' Channel
chan (ContentMethod MethodPayload
m ContentHeaderProperties
properties ByteString
msg) = do
Lock -> IO ()
waitLock forall a b. (a -> b) -> a -> b
$ Channel -> Lock
chanActive Channel
chan
let !toWrite :: [FramePayload]
toWrite = [
MethodPayload -> FramePayload
MethodPayload MethodPayload
m,
Word16
-> Word16 -> LongLongInt -> ContentHeaderProperties -> FramePayload
ContentHeaderPayload
(ContentHeaderProperties -> Word16
getClassIDOf ContentHeaderProperties
properties)
Word16
0
(forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ ByteString -> ByteOffset
BL.length ByteString
msg)
ContentHeaderProperties
properties] forall a. [a] -> [a] -> [a]
++
(if ByteString -> ByteOffset
BL.length ByteString
msg forall a. Ord a => a -> a -> Bool
> ByteOffset
0
then do
forall a b. (a -> b) -> [a] -> [b]
map ByteString -> FramePayload
ContentBodyPayload
(ByteString -> ByteOffset -> [ByteString]
splitLen ByteString
msg forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral (Connection -> Int
connMaxFrameSize forall a b. (a -> b) -> a -> b
$ Channel -> Connection
connection Channel
chan) forall a. Num a => a -> a -> a
- ByteOffset
8)
else []
)
Channel -> [FramePayload] -> IO ()
writeFrames Channel
chan [FramePayload]
toWrite
where
splitLen :: ByteString -> ByteOffset -> [ByteString]
splitLen ByteString
str ByteOffset
len | ByteString -> ByteOffset
BL.length ByteString
str forall a. Ord a => a -> a -> Bool
> ByteOffset
len = ByteOffset -> ByteString -> ByteString
BL.take ByteOffset
len ByteString
str forall a. a -> [a] -> [a]
: ByteString -> ByteOffset -> [ByteString]
splitLen (ByteOffset -> ByteString -> ByteString
BL.drop ByteOffset
len ByteString
str) ByteOffset
len
splitLen ByteString
str ByteOffset
_ = [ByteString
str]
writeAssembly' Channel
chan (SimpleMethod MethodPayload
m) = Channel -> [FramePayload] -> IO ()
writeFrames Channel
chan [MethodPayload -> FramePayload
MethodPayload MethodPayload
m]
writeAssembly :: Channel -> Assembly -> IO ()
writeAssembly :: Channel -> Assembly -> IO ()
writeAssembly Channel
chan Assembly
m =
forall a. IO a -> [Handler a] -> IO a
CE.catches
(Channel -> Assembly -> IO ()
writeAssembly' Channel
chan Assembly
m)
[forall a e. Exception e => (e -> IO a) -> Handler a
CE.Handler (\ (AMQPException
_ :: AMQPException) -> forall a. Channel -> IO a
throwMostRelevantAMQPException Channel
chan),
forall a e. Exception e => (e -> IO a) -> Handler a
CE.Handler (\ (ErrorCall
_ :: CE.ErrorCall) -> forall a. Channel -> IO a
throwMostRelevantAMQPException Channel
chan),
forall a e. Exception e => (e -> IO a) -> Handler a
CE.Handler (\ (IOError
_ :: CE.IOException) -> forall a. Channel -> IO a
throwMostRelevantAMQPException Channel
chan)]
request :: Channel -> Assembly -> IO Assembly
request :: Channel -> Assembly -> IO Assembly
request Channel
chan Assembly
m = do
MVar Assembly
res <- forall a. IO (MVar a)
newEmptyMVar
forall a. IO a -> [Handler a] -> IO a
CE.catches (do
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar (Channel -> MVar (Maybe (CloseType, [Char]))
chanClosed Channel
chan) forall a b. (a -> b) -> a -> b
$ \Maybe (CloseType, [Char])
cc -> do
if forall a. Maybe a -> Bool
isNothing Maybe (CloseType, [Char])
cc
then do
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (Channel -> MVar (Seq (MVar Assembly))
outstandingResponses Channel
chan) forall a b. (a -> b) -> a -> b
$ \Seq (MVar Assembly)
val -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$! Seq (MVar Assembly)
val forall a. Seq a -> a -> Seq a
Seq.|> MVar Assembly
res
Channel -> Assembly -> IO ()
writeAssembly' Channel
chan Assembly
m
else forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"closed"
!Assembly
r <- forall a. MVar a -> IO a
takeMVar MVar Assembly
res
forall (m :: * -> *) a. Monad m => a -> m a
return Assembly
r
)
[forall a e. Exception e => (e -> IO a) -> Handler a
CE.Handler (\ (AMQPException
_ :: AMQPException) -> forall a. Channel -> IO a
throwMostRelevantAMQPException Channel
chan),
forall a e. Exception e => (e -> IO a) -> Handler a
CE.Handler (\ (ErrorCall
_ :: CE.ErrorCall) -> forall a. Channel -> IO a
throwMostRelevantAMQPException Channel
chan),
forall a e. Exception e => (e -> IO a) -> Handler a
CE.Handler (\ (IOError
_ :: CE.IOException) -> forall a. Channel -> IO a
throwMostRelevantAMQPException Channel
chan)]
throwMostRelevantAMQPException :: Channel -> IO a
throwMostRelevantAMQPException :: forall a. Channel -> IO a
throwMostRelevantAMQPException Channel
chan = do
Maybe (CloseType, [Char])
cc <- forall a. MVar a -> IO a
readMVar forall a b. (a -> b) -> a -> b
$ Connection -> MVar (Maybe (CloseType, [Char]))
connClosed forall a b. (a -> b) -> a -> b
$ Channel -> Connection
connection Channel
chan
case Maybe (CloseType, [Char])
cc of
Just (CloseType
closeType, [Char]
r) -> forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
closeType [Char]
r
Maybe (CloseType, [Char])
Nothing -> do
Maybe (CloseType, [Char])
chc <- forall a. MVar a -> IO a
readMVar forall a b. (a -> b) -> a -> b
$ Channel -> MVar (Maybe (CloseType, [Char]))
chanClosed Channel
chan
case Maybe (CloseType, [Char])
chc of
Just (CloseType
ct, [Char]
r) -> forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ CloseType -> [Char] -> AMQPException
ChannelClosedException CloseType
ct [Char]
r
Maybe (CloseType, [Char])
Nothing -> forall e a. Exception e => e -> IO a
CE.throwIO forall a b. (a -> b) -> a -> b
$ CloseType -> [Char] -> AMQPException
ConnectionClosedException CloseType
Abnormal [Char]
"unknown reason"
waitForAllConfirms :: Channel -> STM (IntSet.IntSet, IntSet.IntSet)
waitForAllConfirms :: Channel -> STM (IntSet, IntSet)
waitForAllConfirms Channel
chan = do
IntSet
pending <- forall a. TVar a -> STM a
readTVar forall a b. (a -> b) -> a -> b
$ Channel -> TVar IntSet
unconfirmedSet Channel
chan
Bool -> STM ()
check (IntSet -> Bool
IntSet.null IntSet
pending)
(,) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. TVar a -> a -> STM a
swapTVar (Channel -> TVar IntSet
ackedSet Channel
chan) IntSet
IntSet.empty
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> forall a. TVar a -> a -> STM a
swapTVar (Channel -> TVar IntSet
nackedSet Channel
chan) IntSet
IntSet.empty