{-# LINE 1 "src/Nanomsg.hsc" #-}
{-# LANGUAGE ForeignFunctionInterface, DeriveDataTypeable #-}
module Nanomsg
(
Pair(..)
, Req(..)
, Rep(..)
, Pub(..)
, Sub(..)
, Surveyor(..)
, Respondent(..)
, Push(..)
, Pull(..)
, Bus(..)
, Socket
, Endpoint
, NNException
, SocketType
, Sender
, Receiver
, socket
, withSocket
, bind
, connect
, send
, recv
, recv'
, subscribe
, unsubscribe
, shutdown
, close
, term
, linger
, setLinger
, sndBuf
, setSndBuf
, rcvBuf
, setRcvBuf
, reconnectInterval
, setReconnectInterval
, reconnectIntervalMax
, setReconnectIntervalMax
, sndPrio
, setSndPrio
, ipv4Only
, setIpv4Only
, requestResendInterval
, setRequestResendInterval
, surveyorDeadline
, setSurveyorDeadline
, tcpNoDelay
, setTcpNoDelay
) where
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as C
import qualified Data.ByteString.Unsafe as U
import Foreign (peek, poke, alloca)
import Foreign.Ptr
import Foreign.C.Types
import Foreign.C.String
import Foreign.Storable (sizeOf)
import Control.Applicative ( (<$>) )
import Control.Exception.Base (bracket)
import Control.Exception (Exception, throwIO)
import Data.Typeable (Typeable)
import Control.Monad (void)
import Text.Printf (printf)
import Control.Concurrent (threadWaitRead, threadWaitWrite)
import System.Posix.Types (Fd(..))
data Pair = Pair
data Req = Req
data Rep = Rep
data Pub = Pub
data Sub = Sub
data Surveyor = Surveyor
data Respondent = Respondent
data Push = Push
data Pull = Pull
data Bus = Bus
data Endpoint = Endpoint CInt
deriving (Eq, Show)
data Socket a = Socket a CInt
deriving (Eq, Show)
class SocketType a where
socketType :: a -> CInt
instance SocketType Pair where
socketType Pair = 16
{-# LINE 193 "src/Nanomsg.hsc" #-}
instance SocketType Req where
socketType Req = 48
{-# LINE 196 "src/Nanomsg.hsc" #-}
instance SocketType Rep where
socketType Rep = 49
{-# LINE 199 "src/Nanomsg.hsc" #-}
instance SocketType Pub where
socketType Pub = 32
{-# LINE 202 "src/Nanomsg.hsc" #-}
instance SocketType Sub where
socketType Sub = 33
{-# LINE 205 "src/Nanomsg.hsc" #-}
instance SocketType Surveyor where
socketType Surveyor = 98
{-# LINE 208 "src/Nanomsg.hsc" #-}
instance SocketType Respondent where
socketType Respondent = 99
{-# LINE 211 "src/Nanomsg.hsc" #-}
instance SocketType Push where
socketType Push = 80
{-# LINE 214 "src/Nanomsg.hsc" #-}
instance SocketType Pull where
socketType Pull = 81
{-# LINE 217 "src/Nanomsg.hsc" #-}
instance SocketType Bus where
socketType Bus = 112
{-# LINE 220 "src/Nanomsg.hsc" #-}
class (SocketType a) => Sender a
instance Sender Pair
instance Sender Req
instance Sender Rep
instance Sender Pub
instance Sender Surveyor
instance Sender Respondent
instance Sender Push
instance Sender Bus
class (SocketType a) => Receiver a
instance Receiver Pair
instance Receiver Req
instance Receiver Rep
instance Receiver Sub
instance Receiver Surveyor
instance Receiver Respondent
instance Receiver Pull
instance Receiver Bus
data NNException = NNException String
deriving (Eq, Show, Typeable)
instance Exception NNException
mkErrorString :: String -> IO String
mkErrorString loc = do
errNo <- c_nn_errno
errCString <- c_nn_strerror errNo
errString <- peekCString errCString
return $ printf "nanomsg-haskell error at %s. Errno %d: %s" loc (fromIntegral errNo :: Int) errString
throwErrno :: String -> IO a
throwErrno loc = do
s <- mkErrorString loc
throwIO $ NNException s
throwErrnoIf :: (a -> Bool) -> String -> IO a -> IO a
throwErrnoIf p loc action = do
res <- action
if p res then throwErrno loc else return res
throwErrnoIf_ :: (a -> Bool) -> String -> IO a -> IO ()
throwErrnoIf_ p loc action = void $ throwErrnoIf p loc action
throwErrnoIfMinus1 :: (Eq a, Num a) => String -> IO a -> IO a
throwErrnoIfMinus1 = throwErrnoIf (== -1)
throwErrnoIfMinus1_ :: (Eq a, Num a) => String -> IO a -> IO ()
throwErrnoIfMinus1_ = throwErrnoIf_ (== -1)
throwErrnoIfRetry :: (a -> Bool) -> String -> IO a -> IO a
throwErrnoIfRetry p loc f = do
res <- f
if p res
then do
err <- c_nn_errno
if err == (11) || err == (4)
{-# LINE 289 "src/Nanomsg.hsc" #-}
then throwErrnoIfRetry p loc f
else throwErrno loc
else return res
throwErrnoIfRetry_ :: (a -> Bool) -> String -> IO a -> IO ()
throwErrnoIfRetry_ p loc f = void $ throwErrnoIfRetry p loc f
throwErrnoIfMinus1Retry_ :: (Eq a, Num a) => String -> IO a -> IO ()
throwErrnoIfMinus1Retry_ = throwErrnoIfRetry_ (== -1)
throwErrnoIfRetryMayBlock :: (a -> Bool) -> String -> IO a -> IO b -> IO a
throwErrnoIfRetryMayBlock p loc f on_block = do
res <- f
if p res
then do
err <- c_nn_errno
if err `elem` [ (11), (4), (11) ]
{-# LINE 311 "src/Nanomsg.hsc" #-}
then do
void on_block
throwErrnoIfRetryMayBlock p loc f on_block
else throwErrno loc
else return res
throwErrnoIfRetryMayBlock_ :: (a -> Bool) -> String -> IO a -> IO b -> IO ()
throwErrnoIfRetryMayBlock_ p loc f on_block = void $ throwErrnoIfRetryMayBlock p loc f on_block
throwErrnoIfMinus1RetryMayBlock :: (Eq a, Num a) => String -> IO a -> IO b -> IO a
throwErrnoIfMinus1RetryMayBlock = throwErrnoIfRetryMayBlock (== -1)
throwErrnoIfMinus1RetryMayBlock_ :: (Eq a, Num a) => String -> IO a -> IO b -> IO ()
throwErrnoIfMinus1RetryMayBlock_ = throwErrnoIfRetryMayBlock_ (== -1)
foreign import ccall safe "nn.h nn_socket"
c_nn_socket :: CInt -> CInt -> IO CInt
foreign import ccall safe "nn.h nn_bind"
c_nn_bind :: CInt -> CString -> IO CInt
foreign import ccall safe "nn.h nn_connect"
c_nn_connect :: CInt -> CString -> IO CInt
foreign import ccall safe "nn.h nn_shutdown"
c_nn_shutdown :: CInt -> CInt -> IO CInt
foreign import ccall safe "nn.h nn_send"
c_nn_send :: CInt -> CString -> CInt -> CInt -> IO CInt
foreign import ccall safe "nn.h nn_recv"
c_nn_recv :: CInt -> Ptr CString -> CInt -> CInt -> IO CInt
foreign import ccall safe "nn.h nn_freemsg"
c_nn_freemsg :: Ptr CChar -> IO CInt
foreign import ccall safe "nn.h nn_close"
c_nn_close :: CInt -> IO CInt
foreign import ccall safe "nn.h nn_term"
c_nn_term :: IO ()
foreign import ccall safe "nn.h nn_setsockopt"
c_nn_setsockopt :: CInt -> CInt -> CInt -> Ptr a -> CInt -> IO CInt
foreign import ccall safe "nn.h nn_getsockopt"
c_nn_getsockopt :: CInt -> CInt -> CInt -> Ptr a -> Ptr CInt -> IO CInt
foreign import ccall safe "nn.h nn_strerror"
c_nn_strerror :: CInt -> IO CString
foreign import ccall safe "nn.h nn_errno"
c_nn_errno :: IO CInt
socket :: (SocketType a) => a -> IO (Socket a)
socket t = do
sid <- throwErrnoIfMinus1 "socket" $ c_nn_socket (1) (socketType t)
{-# LINE 404 "src/Nanomsg.hsc" #-}
return $ Socket t sid
withSocket :: (SocketType a) => a -> (Socket a -> IO b) -> IO b
withSocket t = bracket (socket t) close
bind :: Socket a -> String -> IO Endpoint
bind (Socket _ sid) addr =
withCString addr $ \adr -> do
epid <- throwErrnoIfMinus1 "bind" $ c_nn_bind sid adr
return $ Endpoint epid
connect :: Socket a -> String -> IO Endpoint
connect (Socket _ sid) addr =
withCString addr $ \adr -> do
epid <- throwErrnoIfMinus1 "connect" $ c_nn_connect sid adr
return $ Endpoint epid
shutdown :: Socket a -> Endpoint -> IO ()
shutdown (Socket _ sid) (Endpoint eid) =
throwErrnoIfMinus1_ "shutdown" $ c_nn_shutdown sid eid
send :: Sender a => Socket a -> ByteString -> IO ()
send (Socket t sid) string =
U.unsafeUseAsCStringLen string $ \(ptr, len) ->
throwErrnoIfMinus1RetryMayBlock_
"send"
(c_nn_send sid ptr (fromIntegral len) (1))
{-# LINE 470 "src/Nanomsg.hsc" #-}
(getOptionFd (Socket t sid) (10) >>= threadWaitWrite)
{-# LINE 471 "src/Nanomsg.hsc" #-}
recv :: Receiver a => Socket a -> IO ByteString
recv (Socket t sid) =
alloca $ \ptr -> do
len <- throwErrnoIfMinus1RetryMayBlock
"recv"
(c_nn_recv sid ptr (4294967295) (1))
{-# LINE 479 "src/Nanomsg.hsc" #-}
(getOptionFd (Socket t sid) (11) >>= threadWaitRead)
{-# LINE 480 "src/Nanomsg.hsc" #-}
buf <- peek ptr
str <- C.packCStringLen (buf, fromIntegral len)
throwErrnoIfMinus1_ "recv freeing message buffer" $ c_nn_freemsg buf
return str
recv' :: Receiver a => Socket a -> IO (Maybe ByteString)
recv' (Socket _ sid) =
alloca $ \ptr -> do
len <- c_nn_recv sid ptr (4294967295) (1)
{-# LINE 490 "src/Nanomsg.hsc" #-}
if len >= 0
then do
buf <- peek ptr
str <- C.packCStringLen (buf, fromIntegral len)
throwErrnoIfMinus1_ "recv' freeing message buffer" $ c_nn_freemsg buf
return $ Just str
else do
errno <- c_nn_errno
if errno == (11) || errno == (4)
{-# LINE 499 "src/Nanomsg.hsc" #-}
then return Nothing
else throwErrno "recv'"
subscribe :: Socket Sub -> ByteString -> IO ()
subscribe (Socket t sid) string =
setOption (Socket t sid) (socketType t) (1) (StringOption string)
{-# LINE 506 "src/Nanomsg.hsc" #-}
unsubscribe :: Socket Sub -> ByteString -> IO ()
unsubscribe (Socket t sid) string =
setOption (Socket t sid) (socketType t) (2) (StringOption string)
{-# LINE 511 "src/Nanomsg.hsc" #-}
close :: Socket a -> IO ()
close (Socket _ sid) =
throwErrnoIfMinus1Retry_ "close" $ c_nn_close sid
term :: IO ()
term = c_nn_term
data SocketOption = IntOption Int | StringOption ByteString
deriving (Show)
setOption :: Socket a -> CInt -> CInt -> SocketOption -> IO ()
setOption (Socket _ sid) level option (IntOption val) =
alloca $ \ptr -> do
poke ptr (fromIntegral val :: CInt)
let cintSize = fromIntegral $ sizeOf (fromIntegral val :: CInt) :: CInt
throwErrnoIfMinus1_ "setOption (int)" $ c_nn_setsockopt sid level option ptr cintSize
setOption (Socket _ sid) level option (StringOption str) =
throwErrnoIfMinus1_ "setOption (string)" <$> U.unsafeUseAsCStringLen str $
\(ptr, len) -> c_nn_setsockopt sid level option ptr (fromIntegral len)
getOption :: Socket a -> CInt -> CInt -> IO CInt
getOption (Socket _ sid) level option =
alloca $ \ptr ->
alloca $ \sizePtr -> do
let a = 1 :: CInt
let cintSize = fromIntegral $ sizeOf a
poke sizePtr cintSize
throwErrnoIfMinus1_ "getOption" $ c_nn_getsockopt sid level option (ptr :: Ptr CInt) sizePtr
value <- peek ptr
size <- peek sizePtr
if cintSize /= size then throwErrno "getOption: output size not as expected" else return value
getOptionFd :: Socket a -> CInt -> IO Fd
getOptionFd (Socket _ sid) option =
alloca $ \ptr ->
alloca $ \sizePtr -> do
let a = 1 :: Fd
let fdSize = fromIntegral $ sizeOf a
poke sizePtr fdSize
throwErrnoIfMinus1_ "getOptionFd" $ c_nn_getsockopt sid (0) option (ptr :: Ptr Fd) sizePtr
{-# LINE 567 "src/Nanomsg.hsc" #-}
value <- peek ptr
size <- peek sizePtr
if fdSize /= size then throwErrno "getOptionFd: output size not as expected" else return value
linger :: Socket a -> IO Int
linger s =
fromIntegral <$> getOption s (0) (1)
{-# LINE 578 "src/Nanomsg.hsc" #-}
setLinger :: Socket a -> Int -> IO ()
setLinger s val =
setOption s (0) (1) (IntOption val)
{-# LINE 586 "src/Nanomsg.hsc" #-}
sndBuf :: Socket a -> IO Int
sndBuf s =
fromIntegral <$> getOption s (0) (2)
{-# LINE 595 "src/Nanomsg.hsc" #-}
setSndBuf :: Socket a -> Int -> IO ()
setSndBuf s val =
setOption s (0) (2) (IntOption val)
{-# LINE 604 "src/Nanomsg.hsc" #-}
rcvBuf :: Socket a -> IO Int
rcvBuf s =
fromIntegral <$> getOption s (0) (3)
{-# LINE 613 "src/Nanomsg.hsc" #-}
setRcvBuf :: Socket a -> Int -> IO ()
setRcvBuf s val =
setOption s (0) (3) (IntOption val)
{-# LINE 622 "src/Nanomsg.hsc" #-}
reconnectInterval :: Socket a -> IO Int
reconnectInterval s =
fromIntegral <$> getOption s (0) (6)
{-# LINE 639 "src/Nanomsg.hsc" #-}
setReconnectInterval :: Socket a -> Int -> IO ()
setReconnectInterval s val =
setOption s (0) (6) (IntOption val)
{-# LINE 651 "src/Nanomsg.hsc" #-}
reconnectIntervalMax :: Socket a -> IO Int
reconnectIntervalMax s =
fromIntegral <$> getOption s (0) (7)
{-# LINE 664 "src/Nanomsg.hsc" #-}
setReconnectIntervalMax :: Socket a -> Int -> IO ()
setReconnectIntervalMax s val =
setOption s (0) (7) (IntOption val)
{-# LINE 677 "src/Nanomsg.hsc" #-}
sndPrio :: Socket a -> IO Int
sndPrio s =
fromIntegral <$> getOption s (0) (8)
{-# LINE 688 "src/Nanomsg.hsc" #-}
setSndPrio :: Socket a -> Int -> IO ()
setSndPrio s val =
setOption s (0) (8) (IntOption val)
{-# LINE 699 "src/Nanomsg.hsc" #-}
ipv4Only :: Socket a -> IO Int
ipv4Only s =
fromIntegral <$> getOption s (0) (14)
{-# LINE 707 "src/Nanomsg.hsc" #-}
setIpv4Only :: Socket a -> Int -> IO ()
setIpv4Only s val =
setOption s (0) (14) (IntOption val)
{-# LINE 715 "src/Nanomsg.hsc" #-}
requestResendInterval :: Socket Req -> IO Int
requestResendInterval s =
fromIntegral <$> getOption s (48) (1)
{-# LINE 724 "src/Nanomsg.hsc" #-}
setRequestResendInterval :: Socket Req -> Int -> IO ()
setRequestResendInterval s val =
setOption s (48) (1) (IntOption val)
{-# LINE 733 "src/Nanomsg.hsc" #-}
surveyorDeadline :: Socket Surveyor -> IO Int
surveyorDeadline s =
fromIntegral <$> getOption s (98) (1)
{-# LINE 738 "src/Nanomsg.hsc" #-}
setSurveyorDeadline :: Socket Surveyor -> Int -> IO ()
setSurveyorDeadline s val =
setOption s (98) (1) (IntOption val)
{-# LINE 743 "src/Nanomsg.hsc" #-}
tcpNoDelay :: Socket a -> IO Int
tcpNoDelay s =
fromIntegral <$> getOption s (-3) (1)
{-# LINE 750 "src/Nanomsg.hsc" #-}
setTcpNoDelay :: Socket a -> Int -> IO ()
setTcpNoDelay s val =
setOption s (-3) (1) (IntOption val)
{-# LINE 757 "src/Nanomsg.hsc" #-}