{-# LANGUAGE CPP, OverloadedStrings, RecordWildCards, ScopedTypeVariables, FlexibleContexts, MultiWayIf #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Network.Riak.Connection.Internal
(
Network.Riak.Connection.Internal.connect
, disconnect
, setClientID
, defaultClient
, makeClientID
, exchange
, exchangeMaybe
, exchange_
, pipeline
, pipelineMaybe
, pipeline_
, sendRequest
, recvResponse
, recvMaybeResponse
, recvResponse_
) where
import Control.Concurrent.Async (async, waitBoth)
import Control.Exception (Exception, IOException, throwIO, bracketOnError)
import Control.Monad (forM_, replicateM)
import Data.Binary.Get (Get, Decoder(..), getWord32be, runGetIncremental)
import Data.Binary.Put (Put, putWord32be, runPut, putLazyByteString)
import Data.ByteString (ByteString)
import Data.IORef (newIORef, readIORef, writeIORef)
import Network.Riak.Connection.NoPush (setNoPush)
import Network.Riak.Debug as Debug
import Network.Riak.Lens
import Network.Riak.Tag (getTag, putTag)
import Network.Riak.Types.Internal hiding (MessageTag(..))
import Network.Socket as Socket
import Numeric (showHex)
import Data.ProtoLens (buildMessage)
import System.Random (randomIO)
import qualified Control.Exception as E
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Lazy.Char8 as L
import qualified Data.Riak.Proto as Proto
import qualified Network.Riak.Types.Internal as T
import qualified Network.Socket.ByteString as B
import qualified Network.Socket.ByteString.Lazy as L
defaultClient :: Client
defaultClient :: Client
defaultClient = Client :: HostName -> HostName -> ClientID -> Client
Client {
host :: HostName
host = HostName
"127.0.0.1"
, port :: HostName
port = HostName
"8087"
, clientID :: ClientID
clientID = ClientID
B.empty
}
setClientID :: Connection -> ClientID -> IO ()
setClientID :: Connection -> ClientID -> IO ()
setClientID Connection
conn ClientID
i = do
Connection -> RpbSetClientIdReq -> IO ()
forall req. Request req => Connection -> req -> IO ()
sendRequest Connection
conn (RpbSetClientIdReq -> IO ()) -> RpbSetClientIdReq -> IO ()
forall a b. (a -> b) -> a -> b
$ (RpbSetClientIdReq
forall msg. Message msg => msg
Proto.defMessage RpbSetClientIdReq
-> (RpbSetClientIdReq -> RpbSetClientIdReq) -> RpbSetClientIdReq
forall a b. a -> (a -> b) -> b
& LensLike' Identity RpbSetClientIdReq ClientID
forall (f :: * -> *) s a.
(Functor f, HasField s "clientId" a) =>
LensLike' f s a
Proto.clientId LensLike' Identity RpbSetClientIdReq ClientID
-> ClientID -> RpbSetClientIdReq -> RpbSetClientIdReq
forall s a. Setter s a -> a -> s -> s
.~ ClientID
i :: Proto.RpbSetClientIdReq)
Connection -> MessageTag -> IO ()
recvResponse_ Connection
conn MessageTag
T.SetClientIDResponse
makeClientID :: IO ClientID
makeClientID :: IO ClientID
makeClientID = do
Int
r <- IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => m a
randomIO :: IO Int
ClientID -> IO ClientID
forall (m :: * -> *) a. Monad m => a -> m a
return (ClientID -> IO ClientID)
-> (HostName -> ClientID) -> HostName -> IO ClientID
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ClientID -> ClientID -> ClientID
B.append ClientID
"hs_" (ClientID -> ClientID)
-> (HostName -> ClientID) -> HostName -> ClientID
forall b c a. (b -> c) -> (a -> b) -> a -> c
. HostName -> ClientID
B8.pack (HostName -> ClientID)
-> (HostName -> HostName) -> HostName -> ClientID
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> HostName -> HostName
forall a. (Integral a, Show a) => a -> HostName -> HostName
showHex (Int -> Int
forall a. Num a => a -> a
abs Int
r) (HostName -> IO ClientID) -> HostName -> IO ClientID
forall a b. (a -> b) -> a -> b
$ HostName
""
addClientID :: Client -> IO Client
addClientID :: Client -> IO Client
addClientID Client
client
| ClientID -> Bool
B.null (Client -> ClientID
clientID Client
client) = do
ClientID
i <- IO ClientID
makeClientID
Client -> IO Client
forall (m :: * -> *) a. Monad m => a -> m a
return Client
client { clientID :: ClientID
clientID = ClientID
i }
| Bool
otherwise = Client -> IO Client
forall (m :: * -> *) a. Monad m => a -> m a
return Client
client
connect :: Client -> IO Connection
connect :: Client -> IO Connection
connect Client
cli0 = do
client :: Client
client@Client{HostName
ClientID
clientID :: ClientID
port :: HostName
host :: HostName
clientID :: Client -> ClientID
port :: Client -> HostName
host :: Client -> HostName
..} <- Client -> IO Client
addClientID Client
cli0
let hints :: AddrInfo
hints = AddrInfo
defaultHints {
addrFlags :: [AddrInfoFlag]
addrFlags = [AddrInfoFlag
AI_ADDRCONFIG]
, addrSocketType :: SocketType
addrSocketType = SocketType
Stream
}
HostName -> HostName -> IO ()
debug HostName
"connect" (HostName -> IO ()) -> HostName -> IO ()
forall a b. (a -> b) -> a -> b
$ HostName
"server " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ HostName
host HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ HostName
":" HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ HostName
port HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ HostName
", client ID " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++
ClientID -> HostName
B8.unpack ClientID
clientID
[AddrInfo]
ais <- Maybe AddrInfo -> Maybe HostName -> Maybe HostName -> IO [AddrInfo]
getAddrInfo (AddrInfo -> Maybe AddrInfo
forall a. a -> Maybe a
Just AddrInfo
hints) (HostName -> Maybe HostName
forall a. a -> Maybe a
Just HostName
host) (HostName -> Maybe HostName
forall a. a -> Maybe a
Just HostName
port)
let ai :: AddrInfo
ai = case [AddrInfo]
ais of
(AddrInfo
a:[AddrInfo]
_) -> AddrInfo
a
[AddrInfo]
_ -> HostName -> HostName -> AddrInfo
forall a. HostName -> HostName -> a
moduleError HostName
"connect" (HostName -> AddrInfo) -> HostName -> AddrInfo
forall a b. (a -> b) -> a -> b
$
HostName
"could not look up server " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ HostName
host HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ HostName
":" HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ HostName
port
HostName -> IO Connection -> IO Connection
forall a. HostName -> IO a -> IO a
onIOException HostName
"connect" (IO Connection -> IO Connection) -> IO Connection -> IO Connection
forall a b. (a -> b) -> a -> b
$
IO Socket
-> (Socket -> IO ()) -> (Socket -> IO Connection) -> IO Connection
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracketOnError
(Family -> SocketType -> ProtocolNumber -> IO Socket
socket (AddrInfo -> Family
addrFamily AddrInfo
ai) (AddrInfo -> SocketType
addrSocketType AddrInfo
ai) (AddrInfo -> ProtocolNumber
addrProtocol AddrInfo
ai))
Socket -> IO ()
close ((Socket -> IO Connection) -> IO Connection)
-> (Socket -> IO Connection) -> IO Connection
forall a b. (a -> b) -> a -> b
$
\Socket
sock -> do
Socket -> SockAddr -> IO ()
Socket.connect Socket
sock (AddrInfo -> SockAddr
addrAddress AddrInfo
ai)
IORef ClientID
buf <- ClientID -> IO (IORef ClientID)
forall a. a -> IO (IORef a)
newIORef ClientID
B.empty
let conn :: Connection
conn = Socket -> Client -> IORef ClientID -> Connection
Connection Socket
sock Client
client IORef ClientID
buf
Connection -> ClientID -> IO ()
setClientID Connection
conn ClientID
clientID
Connection -> IO Connection
forall (m :: * -> *) a. Monad m => a -> m a
return Connection
conn
disconnect :: Connection -> IO ()
disconnect :: Connection -> IO ()
disconnect Connection{IORef ClientID
Socket
Client
connBuffer :: Connection -> IORef ClientID
connClient :: Connection -> Client
connSock :: Connection -> Socket
connBuffer :: IORef ClientID
connClient :: Client
connSock :: Socket
..} = HostName -> IO () -> IO ()
forall a. HostName -> IO a -> IO a
onIOException HostName
"disconnect" (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
HostName -> HostName -> IO ()
debug HostName
"disconnect" (HostName -> IO ()) -> HostName -> IO ()
forall a b. (a -> b) -> a -> b
$ HostName
"server " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ Client -> HostName
host Client
connClient HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ HostName
":" HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ Client -> HostName
port Client
connClient HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++
HostName
", client ID " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ ClientID -> HostName
B8.unpack (Client -> ClientID
clientID Client
connClient)
Socket -> IO ()
close Socket
connSock
IORef ClientID -> ClientID -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef ClientID
connBuffer ClientID
B.empty
recvBufferSize :: Integral a => a
recvBufferSize :: a
recvBufferSize = a
16384
{-# INLINE recvBufferSize #-}
recvExactly :: Connection -> Int -> IO ByteString
recvExactly :: Connection -> Int -> IO ClientID
recvExactly Connection{IORef ClientID
Socket
Client
connBuffer :: IORef ClientID
connClient :: Client
connSock :: Socket
connBuffer :: Connection -> IORef ClientID
connClient :: Connection -> Client
connSock :: Connection -> Socket
..} Int
n0
| Int
n0 Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 = ClientID -> IO ClientID
forall (m :: * -> *) a. Monad m => a -> m a
return ClientID
B.empty
| Bool
otherwise = do
ClientID
bs <- IORef ClientID -> IO ClientID
forall a. IORef a -> IO a
readIORef IORef ClientID
connBuffer
let (ClientID
h,ClientID
t) = Int -> ClientID -> (ClientID, ClientID)
B.splitAt Int
n0 ClientID
bs
len :: Int
len = ClientID -> Int
B.length ClientID
h
if Int
len Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
n0
then IORef ClientID -> ClientID -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef ClientID
connBuffer ClientID
t IO () -> IO ClientID -> IO ClientID
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ClientID -> IO ClientID
forall (m :: * -> *) a. Monad m => a -> m a
return ClientID
h
else [ClientID] -> Int -> IO ClientID
go [ClientID
h] (Int
n0Int -> Int -> Int
forall a. Num a => a -> a -> a
-Int
len)
where
maxInt :: Int
maxInt = Int -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int
forall a. Bounded a => a
maxBound :: Int)
go :: [ClientID] -> Int -> IO ClientID
go (ClientID
s:[ClientID]
acc) Int
n
| Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
0 = do
let (ClientID
h,ClientID
t) = Int -> ClientID -> (ClientID, ClientID)
B.splitAt (ClientID -> Int
B.length ClientID
s Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
n) ClientID
s
IORef ClientID -> ClientID -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef ClientID
connBuffer (ClientID -> IO ()) -> ClientID -> IO ()
forall a b. (a -> b) -> a -> b
$! ClientID
t
ClientID -> IO ClientID
forall (m :: * -> *) a. Monad m => a -> m a
return (ClientID -> IO ClientID) -> ClientID -> IO ClientID
forall a b. (a -> b) -> a -> b
$ [ClientID] -> ClientID
B.concat ([ClientID] -> [ClientID]
forall a. [a] -> [a]
reverse (ClientID
hClientID -> [ClientID] -> [ClientID]
forall a. a -> [a] -> [a]
:[ClientID]
acc))
go [ClientID]
acc Int
n
| Int
n Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 = do
IORef ClientID -> ClientID -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef ClientID
connBuffer ClientID
B.empty
ClientID -> IO ClientID
forall (m :: * -> *) a. Monad m => a -> m a
return (ClientID -> IO ClientID) -> ClientID -> IO ClientID
forall a b. (a -> b) -> a -> b
$ [ClientID] -> ClientID
B.concat ([ClientID] -> [ClientID]
forall a. [a] -> [a]
reverse [ClientID]
acc)
| Bool
otherwise = do
let n' :: Int
n' = Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
forall a. Integral a => a
recvBufferSize (Int -> Int) -> Int -> Int
forall a b. (a -> b) -> a -> b
$ Int -> Int -> Int
forall a. Ord a => a -> a -> a
min Int
n Int
maxInt
ClientID
bs <- Socket -> Int -> IO ClientID
B.recv Socket
connSock (Int -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
n')
let len :: Int
len = ClientID -> Int
B.length ClientID
bs
if Int
len Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
then HostName -> HostName -> IO ClientID
forall a. HostName -> HostName -> a
moduleError HostName
"recvExactly" HostName
"short read from network"
else [ClientID] -> Int -> IO ClientID
go (ClientID
bsClientID -> [ClientID] -> [ClientID]
forall a. a -> [a] -> [a]
:[ClientID]
acc) (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
len)
recvGet :: Connection -> Get a -> IO a
recvGet :: Connection -> Get a -> IO a
recvGet Connection{IORef ClientID
Socket
Client
connBuffer :: IORef ClientID
connClient :: Client
connSock :: Socket
connBuffer :: Connection -> IORef ClientID
connClient :: Connection -> Client
connSock :: Connection -> Socket
..} Get a
get = do
let refill :: IO (Maybe ByteString)
refill :: IO (Maybe ClientID)
refill = do
ClientID
bs <- Socket -> Int -> IO ClientID
B.recv Socket
connSock Int
forall a. Integral a => a
recvBufferSize
if ClientID -> Bool
B.null ClientID
bs
then Socket -> ShutdownCmd -> IO ()
shutdown Socket
connSock ShutdownCmd
ShutdownReceive IO () -> IO (Maybe ClientID) -> IO (Maybe ClientID)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe ClientID -> IO (Maybe ClientID)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ClientID
forall a. Maybe a
Nothing
else Maybe ClientID -> IO (Maybe ClientID)
forall (m :: * -> *) a. Monad m => a -> m a
return (ClientID -> Maybe ClientID
forall a. a -> Maybe a
Just ClientID
bs)
step :: Decoder b -> IO b
step (Fail ClientID
_ ByteOffset
_ HostName
err) = HostName -> HostName -> IO b
forall a. HostName -> HostName -> a
moduleError HostName
"recvGet" HostName
err
step (Done ClientID
bs ByteOffset
_ b
r) = IORef ClientID -> ClientID -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef ClientID
connBuffer ClientID
bs IO () -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> b -> IO b
forall (m :: * -> *) a. Monad m => a -> m a
return b
r
step (Partial Maybe ClientID -> Decoder b
k) = (Decoder b -> IO b
step (Decoder b -> IO b)
-> (Maybe ClientID -> Decoder b) -> Maybe ClientID -> IO b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe ClientID -> Decoder b
k) (Maybe ClientID -> IO b) -> IO (Maybe ClientID) -> IO b
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO (Maybe ClientID)
refill
Maybe ClientID
mbs <- do
ClientID
buf <- IORef ClientID -> IO ClientID
forall a. IORef a -> IO a
readIORef IORef ClientID
connBuffer
if ClientID -> Bool
B.null ClientID
buf
then IO (Maybe ClientID)
refill
else Maybe ClientID -> IO (Maybe ClientID)
forall (m :: * -> *) a. Monad m => a -> m a
return (ClientID -> Maybe ClientID
forall a. a -> Maybe a
Just ClientID
buf)
case Maybe ClientID
mbs of
Just ClientID
bs ->
case Get a -> Decoder a
forall a. Get a -> Decoder a
runGetIncremental Get a
get of
Fail ClientID
_ ByteOffset
_ HostName
err -> HostName -> HostName -> IO a
forall a. HostName -> HostName -> a
moduleError HostName
"recvGet" HostName
err
Done ClientID
bs' ByteOffset
_ a
r -> IORef ClientID -> ClientID -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef ClientID
connBuffer ClientID
bs' IO () -> IO a -> IO a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
r
Partial Maybe ClientID -> Decoder a
k -> Decoder a -> IO a
forall b. Decoder b -> IO b
step (Maybe ClientID -> Decoder a
k (ClientID -> Maybe ClientID
forall a. a -> Maybe a
Just ClientID
bs))
Maybe ClientID
Nothing -> HostName -> HostName -> IO a
forall a. HostName -> HostName -> a
moduleError HostName
"recvGet" HostName
"socket closed"
recvGetN :: Proto.Message a => Connection -> Int -> IO a
recvGetN :: Connection -> Int -> IO a
recvGetN Connection
conn Int
n = do
ClientID
bs <- Connection -> Int -> IO ClientID
recvExactly Connection
conn Int
n
case ClientID -> Either HostName a
forall msg. Message msg => ClientID -> Either HostName msg
Proto.decodeMessage ClientID
bs of
Left HostName
err -> HostName -> HostName -> IO a
forall a. HostName -> HostName -> a
moduleError HostName
"recvGetN" HostName
err
Right a
r -> a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
r
putRequest :: (Request req) => req -> Put
putRequest :: req -> Put
putRequest req
req = do
Word32 -> Put
putWord32be (ByteOffset -> Word32
forall a b. (Integral a, Num b) => a -> b
fromIntegral (ByteOffset
1 ByteOffset -> ByteOffset -> ByteOffset
forall a. Num a => a -> a -> a
+ ByteString -> ByteOffset
L.length ByteString
bytes))
MessageTag -> Put
putTag (req -> MessageTag
forall msg. Tagged msg => msg -> MessageTag
messageTag req
req)
ByteString -> Put
putLazyByteString ByteString
bytes
where
bytes :: L.ByteString
bytes :: ByteString
bytes = Builder -> ByteString
BB.toLazyByteString (req -> Builder
forall msg. Message msg => msg -> Builder
buildMessage req
req)
instance Exception Proto.RpbErrorResp
throwError :: Proto.RpbErrorResp -> IO a
throwError :: RpbErrorResp -> IO a
throwError = RpbErrorResp -> IO a
forall e a. Exception e => e -> IO a
throwIO
getResponse :: Response a => Connection -> Int -> a -> T.MessageTag -> IO a
getResponse :: Connection -> Int -> a -> MessageTag -> IO a
getResponse Connection
conn Int
len a
_ MessageTag
expected = do
MessageTag
tag <- Connection -> Get MessageTag -> IO MessageTag
forall a. Connection -> Get a -> IO a
recvGet Connection
conn Get MessageTag
getTag
if | MessageTag
tag MessageTag -> MessageTag -> Bool
forall a. Eq a => a -> a -> Bool
== MessageTag
expected -> Connection -> Int -> IO a
forall a. Message a => Connection -> Int -> IO a
recvGetN Connection
conn (Int
lenInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1)
| MessageTag
tag MessageTag -> MessageTag -> Bool
forall a. Eq a => a -> a -> Bool
== MessageTag
T.ErrorResponse -> RpbErrorResp -> IO a
forall a. RpbErrorResp -> IO a
throwError (RpbErrorResp -> IO a) -> IO RpbErrorResp -> IO a
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Connection -> Int -> IO RpbErrorResp
forall a. Message a => Connection -> Int -> IO a
recvGetN Connection
conn (Int
lenInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1)
| Bool
otherwise ->
HostName -> HostName -> IO a
forall a. HostName -> HostName -> a
moduleError HostName
"getResponse" (HostName -> IO a) -> HostName -> IO a
forall a b. (a -> b) -> a -> b
$ HostName
"received unexpected response: expected " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++
MessageTag -> HostName
forall a. Show a => a -> HostName
show MessageTag
expected HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ HostName
", received " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ MessageTag -> HostName
forall a. Show a => a -> HostName
show MessageTag
tag
exchange :: Exchange req resp => Connection -> req -> IO resp
exchange :: Connection -> req -> IO resp
exchange conn :: Connection
conn@Connection{IORef ClientID
Socket
Client
connBuffer :: IORef ClientID
connClient :: Client
connSock :: Socket
connBuffer :: Connection -> IORef ClientID
connClient :: Connection -> Client
connSock :: Connection -> Socket
..} req
req = do
HostName -> HostName -> IO ()
debug HostName
"exchange" (HostName -> IO ()) -> HostName -> IO ()
forall a b. (a -> b) -> a -> b
$ HostName
">>> " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ req -> HostName
forall a. (Show a, Tagged a) => a -> HostName
showM req
req
HostName -> IO resp -> IO resp
forall a. HostName -> IO a -> IO a
onIOException (HostName
"exchange " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ MessageTag -> HostName
forall a. Show a => a -> HostName
show (req -> MessageTag
forall msg. Tagged msg => msg -> MessageTag
messageTag req
req)) (IO resp -> IO resp) -> IO resp -> IO resp
forall a b. (a -> b) -> a -> b
$ do
Connection -> req -> IO ()
forall req. Request req => Connection -> req -> IO ()
sendRequest Connection
conn req
req
Connection -> IO resp
forall a. Response a => Connection -> IO a
recvResponse Connection
conn
exchangeMaybe :: Exchange req resp => Connection -> req -> IO (Maybe resp)
exchangeMaybe :: Connection -> req -> IO (Maybe resp)
exchangeMaybe conn :: Connection
conn@Connection{IORef ClientID
Socket
Client
connBuffer :: IORef ClientID
connClient :: Client
connSock :: Socket
connBuffer :: Connection -> IORef ClientID
connClient :: Connection -> Client
connSock :: Connection -> Socket
..} req
req = do
HostName -> HostName -> IO ()
debug HostName
"exchangeMaybe" (HostName -> IO ()) -> HostName -> IO ()
forall a b. (a -> b) -> a -> b
$ HostName
">>> " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ req -> HostName
forall a. (Show a, Tagged a) => a -> HostName
showM req
req
HostName -> IO (Maybe resp) -> IO (Maybe resp)
forall a. HostName -> IO a -> IO a
onIOException (HostName
"exchangeMaybe " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ MessageTag -> HostName
forall a. Show a => a -> HostName
show (req -> MessageTag
forall msg. Tagged msg => msg -> MessageTag
messageTag req
req)) (IO (Maybe resp) -> IO (Maybe resp))
-> IO (Maybe resp) -> IO (Maybe resp)
forall a b. (a -> b) -> a -> b
$ do
Connection -> req -> IO ()
forall req. Request req => Connection -> req -> IO ()
sendRequest Connection
conn req
req
Connection -> IO (Maybe resp)
forall a. Response a => Connection -> IO (Maybe a)
recvMaybeResponse Connection
conn
exchange_ :: Request req => Connection -> req -> IO ()
exchange_ :: Connection -> req -> IO ()
exchange_ Connection
conn req
req = do
HostName -> HostName -> IO ()
debug HostName
"exchange_" (HostName -> IO ()) -> HostName -> IO ()
forall a b. (a -> b) -> a -> b
$ HostName
">>> " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ req -> HostName
forall a. (Show a, Tagged a) => a -> HostName
showM req
req
HostName -> IO () -> IO ()
forall a. HostName -> IO a -> IO a
onIOException (HostName
"exchange_ " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ MessageTag -> HostName
forall a. Show a => a -> HostName
show (req -> MessageTag
forall msg. Tagged msg => msg -> MessageTag
messageTag req
req)) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Connection -> req -> IO ()
forall req. Request req => Connection -> req -> IO ()
sendRequest Connection
conn req
req
Connection -> MessageTag -> IO ()
recvResponse_ Connection
conn (req -> MessageTag
forall msg. Request msg => msg -> MessageTag
expectedResponse req
req)
sendAll :: Socket -> L.ByteString -> IO ()
sendAll :: Socket -> ByteString -> IO ()
sendAll Socket
sock ByteString
bs = do
Socket -> Bool -> IO ()
setNoPush Socket
sock Bool
True
Socket -> ByteString -> IO ()
L.sendAll Socket
sock ByteString
bs
Socket -> Bool -> IO ()
setNoPush Socket
sock Bool
False
sendRequest :: (Request req) => Connection -> req -> IO ()
sendRequest :: Connection -> req -> IO ()
sendRequest Connection{IORef ClientID
Socket
Client
connBuffer :: IORef ClientID
connClient :: Client
connSock :: Socket
connBuffer :: Connection -> IORef ClientID
connClient :: Connection -> Client
connSock :: Connection -> Socket
..} = Socket -> ByteString -> IO ()
sendAll Socket
connSock (ByteString -> IO ()) -> (req -> ByteString) -> req -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Put -> ByteString
runPut (Put -> ByteString) -> (req -> Put) -> req -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. req -> Put
forall req. Request req => req -> Put
putRequest
recvResponse :: (Response a) => Connection -> IO a
recvResponse :: Connection -> IO a
recvResponse Connection
conn = (a -> HostName) -> IO a -> IO a
forall a. (a -> HostName) -> IO a -> IO a
debugRecv a -> HostName
forall a. (Show a, Tagged a) => a -> HostName
showM (IO a -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ a -> IO a
forall b. Response b => b -> IO b
go a
forall a. HasCallStack => a
undefined where
go :: Response b => b -> IO b
go :: b -> IO b
go b
dummy = do
Int
len <- Word32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word32 -> Int) -> IO Word32 -> IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
`fmap` Connection -> Get Word32 -> IO Word32
forall a. Connection -> Get a -> IO a
recvGet Connection
conn Get Word32
getWord32be
Connection -> Int -> b -> MessageTag -> IO b
forall a.
Response a =>
Connection -> Int -> a -> MessageTag -> IO a
getResponse Connection
conn Int
len b
dummy (b -> MessageTag
forall msg. Tagged msg => msg -> MessageTag
messageTag b
dummy)
recvResponse_ :: Connection -> T.MessageTag -> IO ()
recvResponse_ :: Connection -> MessageTag -> IO ()
recvResponse_ Connection
conn MessageTag
expected = (() -> HostName) -> IO () -> IO ()
forall a. (a -> HostName) -> IO a -> IO a
debugRecv () -> HostName
forall a. Show a => a -> HostName
show (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Int
len <- Word32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word32 -> Int) -> IO Word32 -> IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
`fmap` Connection -> Get Word32 -> IO Word32
forall a. Connection -> Get a -> IO a
recvGet Connection
conn Get Word32
getWord32be
HostName -> Connection -> MessageTag -> Int -> () -> IO ()
forall a. HostName -> Connection -> MessageTag -> Int -> a -> IO a
recvCorrectTag HostName
"recvResponse_" Connection
conn MessageTag
expected (Int
lenInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1) ()
recvMaybeResponse :: (Response a) => Connection -> IO (Maybe a)
recvMaybeResponse :: Connection -> IO (Maybe a)
recvMaybeResponse Connection
conn = (Maybe a -> HostName) -> IO (Maybe a) -> IO (Maybe a)
forall a. (a -> HostName) -> IO a -> IO a
debugRecv (HostName -> (a -> HostName) -> Maybe a -> HostName
forall b a. b -> (a -> b) -> Maybe a -> b
maybe HostName
"Nothing" ((HostName
"Just " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++) (HostName -> HostName) -> (a -> HostName) -> a -> HostName
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> HostName
forall a. (Show a, Tagged a) => a -> HostName
showM)) (IO (Maybe a) -> IO (Maybe a)) -> IO (Maybe a) -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$
a -> IO (Maybe a)
forall b. Response b => b -> IO (Maybe b)
go a
forall a. HasCallStack => a
undefined where
go :: Response b => b -> IO (Maybe b)
go :: b -> IO (Maybe b)
go b
dummy = do
Int
len <- Word32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word32 -> Int) -> IO Word32 -> IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
`fmap` Connection -> Get Word32 -> IO Word32
forall a. Connection -> Get a -> IO a
recvGet Connection
conn Get Word32
getWord32be
let tag :: MessageTag
tag = b -> MessageTag
forall msg. Tagged msg => msg -> MessageTag
messageTag b
dummy
if Int
len Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
1
then HostName
-> Connection -> MessageTag -> Int -> Maybe b -> IO (Maybe b)
forall a. HostName -> Connection -> MessageTag -> Int -> a -> IO a
recvCorrectTag HostName
"recvMaybeResponse" Connection
conn MessageTag
tag Int
1 Maybe b
forall a. Maybe a
Nothing
else b -> Maybe b
forall a. a -> Maybe a
Just (b -> Maybe b) -> IO b -> IO (Maybe b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
`fmap` Connection -> Int -> b -> MessageTag -> IO b
forall a.
Response a =>
Connection -> Int -> a -> MessageTag -> IO a
getResponse Connection
conn Int
len b
dummy MessageTag
tag
recvCorrectTag :: String -> Connection -> T.MessageTag -> Int -> a -> IO a
recvCorrectTag :: HostName -> Connection -> MessageTag -> Int -> a -> IO a
recvCorrectTag HostName
func Connection
conn MessageTag
expected Int
len a
v = do
MessageTag
tag <- Connection -> Get MessageTag -> IO MessageTag
forall a. Connection -> Get a -> IO a
recvGet Connection
conn Get MessageTag
getTag
if | MessageTag
tag MessageTag -> MessageTag -> Bool
forall a. Eq a => a -> a -> Bool
== MessageTag
expected -> Connection -> Int -> IO ClientID
recvExactly Connection
conn (Int
lenInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1) IO ClientID -> IO a -> IO a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
v
| MessageTag
tag MessageTag -> MessageTag -> Bool
forall a. Eq a => a -> a -> Bool
== MessageTag
T.ErrorResponse -> RpbErrorResp -> IO a
forall a. RpbErrorResp -> IO a
throwError (RpbErrorResp -> IO a) -> IO RpbErrorResp -> IO a
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Connection -> Int -> IO RpbErrorResp
forall a. Message a => Connection -> Int -> IO a
recvGetN Connection
conn Int
len
| Bool
otherwise -> HostName -> HostName -> IO a
forall a. HostName -> HostName -> a
moduleError HostName
func (HostName -> IO a) -> HostName -> IO a
forall a b. (a -> b) -> a -> b
$
HostName
"received unexpected response: expected " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++
MessageTag -> HostName
forall a. Show a => a -> HostName
show MessageTag
expected HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ HostName
", received " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ MessageTag -> HostName
forall a. Show a => a -> HostName
show MessageTag
tag
debugRecv :: (a -> String) -> IO a -> IO a
#ifdef DEBUG
debugRecv :: (a -> HostName) -> IO a -> IO a
debugRecv a -> HostName
f IO a
act = do
a
r <- IO a
act
HostName -> HostName -> IO ()
debug HostName
"recv" (HostName -> IO ()) -> HostName -> IO ()
forall a b. (a -> b) -> a -> b
$ HostName
"<<< " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ a -> HostName
f a
r
a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
r
#else
debugRecv _ act = act
{-# INLINE debugRecv #-}
#endif
pipe :: (Request req) =>
(Connection -> IO resp) -> Connection -> [req] -> IO [resp]
pipe :: (Connection -> IO resp) -> Connection -> [req] -> IO [resp]
pipe Connection -> IO resp
_ Connection
_ [] = [resp] -> IO [resp]
forall (m :: * -> *) a. Monad m => a -> m a
return []
pipe Connection -> IO resp
receive conn :: Connection
conn@Connection{IORef ClientID
Socket
Client
connBuffer :: IORef ClientID
connClient :: Client
connSock :: Socket
connBuffer :: Connection -> IORef ClientID
connClient :: Connection -> Client
connSock :: Connection -> Socket
..} [req]
reqs = do
let numReqs :: Int
numReqs = [req] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [req]
reqs
let tag :: HostName
tag = MessageTag -> HostName
forall a. Show a => a -> HostName
show (req -> MessageTag
forall msg. Tagged msg => msg -> MessageTag
messageTag ([req] -> req
forall a. [a] -> a
head [req]
reqs))
if Int
Debug.level Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
1
then [req] -> (req -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [req]
reqs ((req -> IO ()) -> IO ()) -> (req -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \req
req -> HostName -> HostName -> IO ()
debug HostName
"pipe" (HostName -> IO ()) -> HostName -> IO ()
forall a b. (a -> b) -> a -> b
$ HostName
">>> " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ req -> HostName
forall a. (Show a, Tagged a) => a -> HostName
showM req
req
else HostName -> HostName -> IO ()
debug HostName
"pipe" (HostName -> IO ()) -> HostName -> IO ()
forall a b. (a -> b) -> a -> b
$ HostName
">>> " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ Int -> HostName
forall a. Show a => a -> HostName
show Int
numReqs HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ HostName
"x " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ HostName
tag
Async [resp]
receiveResps <- IO [resp] -> IO (Async [resp])
forall a. IO a -> IO (Async a)
async (IO [resp] -> IO (Async [resp]))
-> (IO resp -> IO [resp]) -> IO resp -> IO (Async [resp])
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> IO resp -> IO [resp]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
numReqs (IO resp -> IO (Async [resp])) -> IO resp -> IO (Async [resp])
forall a b. (a -> b) -> a -> b
$ Connection -> IO resp
receive Connection
conn
Async ()
sendReqs <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ()))
-> ([req] -> IO ()) -> [req] -> IO (Async ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Socket -> ByteString -> IO ()
sendAll Socket
connSock (ByteString -> IO ()) -> ([req] -> ByteString) -> [req] -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Put -> ByteString
runPut (Put -> ByteString) -> ([req] -> Put) -> [req] -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (req -> Put) -> [req] -> Put
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ req -> Put
forall req. Request req => req -> Put
putRequest ([req] -> IO (Async ())) -> [req] -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ [req]
reqs
(()
_, [resp]
resps) <- HostName -> IO ((), [resp]) -> IO ((), [resp])
forall a. HostName -> IO a -> IO a
onIOException (HostName
"pipe " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ HostName
tag) (IO ((), [resp]) -> IO ((), [resp]))
-> IO ((), [resp]) -> IO ((), [resp])
forall a b. (a -> b) -> a -> b
$
Async () -> Async [resp] -> IO ((), [resp])
forall a b. Async a -> Async b -> IO (a, b)
waitBoth Async ()
sendReqs Async [resp]
receiveResps
[resp] -> IO [resp]
forall (m :: * -> *) a. Monad m => a -> m a
return [resp]
resps
pipeline :: (Exchange req resp) => Connection -> [req] -> IO [resp]
pipeline :: Connection -> [req] -> IO [resp]
pipeline = (Connection -> IO resp) -> Connection -> [req] -> IO [resp]
forall req resp.
Request req =>
(Connection -> IO resp) -> Connection -> [req] -> IO [resp]
pipe Connection -> IO resp
forall a. Response a => Connection -> IO a
recvResponse
pipelineMaybe :: (Exchange req resp) => Connection -> [req] -> IO [Maybe resp]
pipelineMaybe :: Connection -> [req] -> IO [Maybe resp]
pipelineMaybe = (Connection -> IO (Maybe resp))
-> Connection -> [req] -> IO [Maybe resp]
forall req resp.
Request req =>
(Connection -> IO resp) -> Connection -> [req] -> IO [resp]
pipe Connection -> IO (Maybe resp)
forall a. Response a => Connection -> IO (Maybe a)
recvMaybeResponse
pipeline_ :: (Request req) => Connection -> [req] -> IO ()
pipeline_ :: Connection -> [req] -> IO ()
pipeline_ Connection
_ [] = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
pipeline_ conn :: Connection
conn@Connection{IORef ClientID
Socket
Client
connBuffer :: IORef ClientID
connClient :: Client
connSock :: Socket
connBuffer :: Connection -> IORef ClientID
connClient :: Connection -> Client
connSock :: Connection -> Socket
..} [req]
reqs = do
Async ()
receiveResps <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$
[req] -> (req -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [req]
reqs (Connection -> MessageTag -> IO ()
recvResponse_ Connection
conn (MessageTag -> IO ()) -> (req -> MessageTag) -> req -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. req -> MessageTag
forall msg. Request msg => msg -> MessageTag
expectedResponse)
if Int
Debug.level Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
1
then [req] -> (req -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [req]
reqs ((req -> IO ()) -> IO ()) -> (req -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \req
req -> HostName -> HostName -> IO ()
debug HostName
"pipe" (HostName -> IO ()) -> HostName -> IO ()
forall a b. (a -> b) -> a -> b
$ HostName
">>> " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ req -> HostName
forall a. (Show a, Tagged a) => a -> HostName
showM req
req
else HostName -> HostName -> IO ()
debug HostName
"pipe" (HostName -> IO ()) -> HostName -> IO ()
forall a b. (a -> b) -> a -> b
$ HostName
">>> " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ Int -> HostName
forall a. Show a => a -> HostName
show ([req] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [req]
reqs) HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ HostName
"x " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++
MessageTag -> HostName
forall a. Show a => a -> HostName
show (req -> MessageTag
forall msg. Tagged msg => msg -> MessageTag
messageTag ([req] -> req
forall a. [a] -> a
head [req]
reqs))
Async ()
sendReqs <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ()))
-> ([req] -> IO ()) -> [req] -> IO (Async ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Socket -> ByteString -> IO ()
sendAll Socket
connSock (ByteString -> IO ()) -> ([req] -> ByteString) -> [req] -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Put -> ByteString
runPut (Put -> ByteString) -> ([req] -> Put) -> [req] -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (req -> Put) -> [req] -> Put
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ req -> Put
forall req. Request req => req -> Put
putRequest ([req] -> IO (Async ())) -> [req] -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ [req]
reqs
((), ())
_ <- HostName -> IO ((), ()) -> IO ((), ())
forall a. HostName -> IO a -> IO a
onIOException HostName
"pipeline_" (IO ((), ()) -> IO ((), ())) -> IO ((), ()) -> IO ((), ())
forall a b. (a -> b) -> a -> b
$ Async () -> Async () -> IO ((), ())
forall a b. Async a -> Async b -> IO (a, b)
waitBoth Async ()
sendReqs Async ()
receiveResps
() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
onIOException :: String -> IO a -> IO a
onIOException :: HostName -> IO a -> IO a
onIOException HostName
func IO a
act =
IO a
act IO a -> (IOException -> IO a) -> IO a
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`E.catch` \(IOException
e::IOException) -> do
let s :: HostName
s = IOException -> HostName
forall a. Show a => a -> HostName
show IOException
e
HostName -> HostName -> IO ()
debug HostName
func (HostName -> IO ()) -> HostName -> IO ()
forall a b. (a -> b) -> a -> b
$ HostName
"caught IO exception: " HostName -> HostName -> HostName
forall a. [a] -> [a] -> [a]
++ HostName
s
HostName -> HostName -> IO a
forall a. HostName -> HostName -> a
moduleError HostName
func HostName
s
moduleError :: String -> String -> a
moduleError :: HostName -> HostName -> a
moduleError = HostName -> HostName -> HostName -> a
forall a. HostName -> HostName -> HostName -> a
netError HostName
"Network.Riak.Connection.Internal"