module Data.Streaming.Network
(
ServerSettings
, ClientSettings
, HostPreference
, Message (..)
, AppData
#if !WINDOWS
, ServerSettingsUnix
, ClientSettingsUnix
, AppDataUnix
#endif
, serverSettingsTCP
, serverSettingsTCPSocket
, clientSettingsTCP
, serverSettingsUDP
, clientSettingsUDP
#if !WINDOWS
, serverSettingsUnix
, clientSettingsUnix
#endif
, message
, HasPort (..)
, HasAfterBind (..)
, HasReadWrite (..)
#if !WINDOWS
, HasPath (..)
#endif
, setPort
, setHost
, setAddrFamily
, setAfterBind
, setNeedLocalAddr
#if !WINDOWS
, setPath
#endif
, getPort
, getHost
, getAddrFamily
, getAfterBind
, getNeedLocalAddr
#if !WINDOWS
, getPath
#endif
, appRead
, appWrite
, appSockAddr
, appLocalAddr
, appCloseConnection
, bindPortGen
, bindRandomPortGen
, getSocketGen
, getSocketFamilyGen
, acceptSafe
, unassignedPorts
, getUnassignedPort
, bindPortTCP
, bindRandomPortTCP
, getSocketTCP
, getSocketFamilyTCP
, safeRecv
, runTCPServer
, runTCPClient
, ConnectionHandle (..)
, runTCPServerWithHandle
, bindPortUDP
, bindRandomPortUDP
, getSocketUDP
#if !WINDOWS
, bindPath
, getSocketUnix
, runUnixServer
, runUnixClient
#endif
) where
import qualified Network.Socket as NS
import Data.Streaming.Network.Internal
import Control.Concurrent (threadDelay)
import Control.Exception (IOException, try, SomeException, throwIO, bracketOnError)
import Network.Socket (Socket, AddrInfo, SocketType)
import Network.Socket.ByteString (recv, sendAll)
import System.IO.Error (isDoesNotExistError)
import qualified Data.ByteString.Char8 as S8
import qualified Control.Exception as E
import Data.ByteString (ByteString)
import System.Directory (removeFile)
import Data.Functor.Constant (Constant (Constant), getConstant)
import Data.Functor.Identity (Identity (Identity), runIdentity)
import Control.Concurrent (forkIO)
import Control.Monad (forever)
import Data.IORef (IORef, newIORef, atomicModifyIORef)
import Data.Array.Unboxed ((!), UArray, bounds, listArray)
import System.IO.Unsafe (unsafePerformIO)
import System.Random (randomRIO)
#if WINDOWS
import Control.Concurrent.MVar (putMVar, takeMVar, newEmptyMVar)
#endif
getSocketFamilyGen :: SocketType -> String -> Int -> NS.Family -> IO (Socket, AddrInfo)
getSocketFamilyGen sockettype host' port' af = do
let hints = NS.defaultHints {
NS.addrFlags = [NS.AI_ADDRCONFIG]
, NS.addrSocketType = sockettype
, NS.addrFamily = af
}
(addr:_) <- NS.getAddrInfo (Just hints) (Just host') (Just $ show port')
sock <- NS.socket (NS.addrFamily addr) (NS.addrSocketType addr)
(NS.addrProtocol addr)
return (sock, addr)
getSocketGen :: SocketType -> String -> Int -> IO (Socket, AddrInfo)
getSocketGen sockettype host port = getSocketFamilyGen sockettype host port NS.AF_UNSPEC
bindPortGen :: SocketType -> Int -> HostPreference -> IO Socket
bindPortGen sockettype p s = do
let hints = NS.defaultHints
{ NS.addrFlags = [ NS.AI_PASSIVE
, NS.AI_NUMERICSERV
, NS.AI_NUMERICHOST
]
, NS.addrSocketType = sockettype
}
host =
case s of
Host s' -> Just s'
_ -> Nothing
port = Just . show $ p
addrs <- NS.getAddrInfo (Just hints) host port
let addrs4 = filter (\x -> NS.addrFamily x /= NS.AF_INET6) addrs
addrs6 = filter (\x -> NS.addrFamily x == NS.AF_INET6) addrs
addrs' =
case s of
HostIPv4 -> addrs4 ++ addrs6
HostIPv4Only -> addrs4
HostIPv6 -> addrs6 ++ addrs4
HostIPv6Only -> addrs6
_ -> addrs
tryAddrs (addr1:rest@(_:_)) =
E.catch
(theBody addr1)
(\(_ :: IOException) -> tryAddrs rest)
tryAddrs (addr1:[]) = theBody addr1
tryAddrs _ = error "bindPort: addrs is empty"
sockOpts =
case sockettype of
NS.Datagram -> [(NS.ReuseAddr,1)]
_ -> [(NS.NoDelay,1), (NS.ReuseAddr,1)]
theBody addr =
bracketOnError
(NS.socket (NS.addrFamily addr) (NS.addrSocketType addr) (NS.addrProtocol addr))
NS.sClose
(\sock -> do
mapM_ (\(opt,v) -> NS.setSocketOption sock opt v) sockOpts
NS.bindSocket sock (NS.addrAddress addr)
return sock
)
tryAddrs addrs'
bindRandomPortGen :: SocketType -> HostPreference -> IO (Int, Socket)
bindRandomPortGen sockettype s =
loop 30
where
loop cnt = do
port <- getUnassignedPort
esocket <- try $ bindPortGen sockettype port s
case esocket :: Either IOException Socket of
Left e
| cnt <= 1 -> error $ concat
[ "Data.Streaming.Network.bindRandomPortGen: Could not get port. Last attempted: "
, show port
, ". Exception was: "
, show e
]
| otherwise -> do
skipUnassigned 50
loop $! cnt 1
Right socket -> return (port, socket)
unassignedPortsList :: [Int]
unassignedPortsList = concat
[ [43124..44320]
, [28120..29166]
, [45967..46997]
, [28241..29117]
, [40001..40840]
, [29170..29998]
, [38866..39680]
, [43442..44122]
, [41122..41793]
, [35358..36000]
]
unassignedPorts :: UArray Int Int
unassignedPorts = listArray (unassignedPortsMin, unassignedPortsMax) unassignedPortsList
unassignedPortsMin, unassignedPortsMax :: Int
unassignedPortsMin = 0
unassignedPortsMax = length unassignedPortsList 1
nextUnusedPort :: IORef Int
nextUnusedPort = unsafePerformIO
$ randomRIO (unassignedPortsMin, unassignedPortsMax) >>= newIORef
getUnassignedPort :: IO Int
getUnassignedPort = do
port <- atomicModifyIORef nextUnusedPort go
return $! port
where
go i
| i > unassignedPortsMax = (succ unassignedPortsMin, unassignedPorts ! unassignedPortsMin)
| otherwise = (succ i, unassignedPorts ! i)
skipUnassigned :: Int -> IO ()
skipUnassigned i = do
!() <- atomicModifyIORef nextUnusedPort $ \j ->
let k = i + j `mod` unassignedPortsMax
in k `seq` (k, ())
return ()
getSocketUDP :: String -> Int -> IO (Socket, AddrInfo)
getSocketUDP = getSocketGen NS.Datagram
bindPortUDP :: Int -> HostPreference -> IO Socket
bindPortUDP = bindPortGen NS.Datagram
bindRandomPortUDP :: HostPreference -> IO (Int, Socket)
bindRandomPortUDP = bindRandomPortGen NS.Datagram
#if !WINDOWS
getSocketUnix :: FilePath -> IO Socket
getSocketUnix path = do
sock <- NS.socket NS.AF_UNIX NS.Stream 0
ee <- try' $ NS.connect sock (NS.SockAddrUnix path)
case ee of
Left e -> NS.sClose sock >> throwIO e
Right () -> return sock
where
try' :: IO a -> IO (Either SomeException a)
try' = try
bindPath :: FilePath -> IO Socket
bindPath path = do
sock <- bracketOnError
(NS.socket NS.AF_UNIX NS.Stream 0)
NS.sClose
(\sock -> do
removeFileSafe path
NS.bindSocket sock (NS.SockAddrUnix path)
return sock)
NS.listen sock (max 2048 NS.maxListenQueue)
return sock
removeFileSafe :: FilePath -> IO ()
removeFileSafe path =
removeFile path `E.catch` handleExists
where
handleExists e
| isDoesNotExistError e = return ()
| otherwise = throwIO e
serverSettingsUnix
:: FilePath
-> ServerSettingsUnix
serverSettingsUnix path = ServerSettingsUnix
{ serverPath = path
, serverAfterBindUnix = const $ return ()
}
clientSettingsUnix
:: FilePath
-> ClientSettingsUnix
clientSettingsUnix path = ClientSettingsUnix
{ clientPath = path
}
#endif
#if defined(__GLASGOW_HASKELL__) && WINDOWS
#define SOCKET_ACCEPT_RECV_WORKAROUND
#endif
safeRecv :: Socket -> Int -> IO ByteString
#ifndef SOCKET_ACCEPT_RECV_WORKAROUND
safeRecv = recv
#else
safeRecv s buf = do
var <- newEmptyMVar
forkIO $ recv s buf `E.catch` (\(_::IOException) -> return S8.empty) >>= putMVar var
takeMVar var
#endif
serverSettingsUDP
:: Int
-> HostPreference
-> ServerSettings
serverSettingsUDP = serverSettingsTCP
serverSettingsTCP
:: Int
-> HostPreference
-> ServerSettings
serverSettingsTCP port host = ServerSettings
{ serverPort = port
, serverHost = host
, serverSocket = Nothing
, serverAfterBind = const $ return ()
, serverNeedLocalAddr = False
}
serverSettingsTCPSocket :: Socket -> ServerSettings
serverSettingsTCPSocket lsocket = ServerSettings
{ serverPort = 0
, serverHost = HostAny
, serverSocket = Just lsocket
, serverAfterBind = const $ return ()
, serverNeedLocalAddr = False
}
clientSettingsUDP
:: Int
-> ByteString
-> ClientSettings
clientSettingsUDP = clientSettingsTCP
clientSettingsTCP
:: Int
-> ByteString
-> ClientSettings
clientSettingsTCP port host = ClientSettings
{ clientPort = port
, clientHost = host
, clientAddrFamily = NS.AF_UNSPEC
}
getSocketFamilyTCP :: ByteString -> Int -> NS.Family -> IO (NS.Socket, NS.SockAddr)
getSocketFamilyTCP host' port' addrFamily = do
(sock, addr) <- getSocketFamilyGen NS.Stream (S8.unpack host') port' addrFamily
ee <- try' $ NS.connect sock (NS.addrAddress addr)
case ee of
Left e -> NS.sClose sock >> throwIO e
Right () -> return (sock, NS.addrAddress addr)
where
try' :: IO a -> IO (Either SomeException a)
try' = try
getSocketTCP :: ByteString -> Int -> IO (NS.Socket, NS.SockAddr)
getSocketTCP host port = getSocketFamilyTCP host port NS.AF_UNSPEC
bindPortTCP :: Int -> HostPreference -> IO Socket
bindPortTCP p s = do
sock <- bindPortGen NS.Stream p s
NS.listen sock (max 2048 NS.maxListenQueue)
return sock
bindRandomPortTCP :: HostPreference -> IO (Int, Socket)
bindRandomPortTCP s = do
(port, sock) <- bindRandomPortGen NS.Stream s
NS.listen sock (max 2048 NS.maxListenQueue)
return (port, sock)
acceptSafe :: Socket -> IO (Socket, NS.SockAddr)
acceptSafe socket =
#ifndef SOCKET_ACCEPT_RECV_WORKAROUND
loop
#else
do var <- newEmptyMVar
forkIO $ loop >>= putMVar var
takeMVar var
#endif
where
loop =
NS.accept socket `E.catch` \(_ :: IOException) -> do
threadDelay 1000000
loop
message :: ByteString -> NS.SockAddr -> Message
message = Message
class HasPort a where
portLens :: Functor f => (Int -> f Int) -> a -> f a
instance HasPort ServerSettings where
portLens f ss = fmap (\p -> ss { serverPort = p }) (f (serverPort ss))
instance HasPort ClientSettings where
portLens f ss = fmap (\p -> ss { clientPort = p }) (f (clientPort ss))
getPort :: HasPort a => a -> Int
getPort = getConstant . portLens Constant
setPort :: HasPort a => Int -> a -> a
setPort p = runIdentity . portLens (const (Identity p))
setHostPref :: HostPreference -> ServerSettings -> ServerSettings
setHostPref hp ss = ss { serverHost = hp }
getHostPref :: ServerSettings -> HostPreference
getHostPref = serverHost
setHost :: ByteString -> ClientSettings -> ClientSettings
setHost hp ss = ss { clientHost = hp }
getHost :: ClientSettings -> ByteString
getHost = clientHost
setAddrFamily :: NS.Family -> ClientSettings -> ClientSettings
setAddrFamily af cs = cs { clientAddrFamily = af }
getAddrFamily :: ClientSettings -> NS.Family
getAddrFamily = clientAddrFamily
#if !WINDOWS
class HasPath a where
pathLens :: Functor f => (FilePath -> f FilePath) -> a -> f a
instance HasPath ServerSettingsUnix where
pathLens f ss = fmap (\p -> ss { serverPath = p }) (f (serverPath ss))
instance HasPath ClientSettingsUnix where
pathLens f ss = fmap (\p -> ss { clientPath = p }) (f (clientPath ss))
getPath :: HasPath a => a -> FilePath
getPath = getConstant . pathLens Constant
setPath :: HasPath a => FilePath -> a -> a
setPath p = runIdentity . pathLens (const (Identity p))
#endif
setNeedLocalAddr :: Bool -> ServerSettings -> ServerSettings
setNeedLocalAddr x y = y { serverNeedLocalAddr = x }
getNeedLocalAddr :: ServerSettings -> Bool
getNeedLocalAddr = serverNeedLocalAddr
class HasAfterBind a where
afterBindLens :: Functor f => ((Socket -> IO ()) -> f (Socket -> IO ())) -> a -> f a
instance HasAfterBind ServerSettings where
afterBindLens f ss = fmap (\p -> ss { serverAfterBind = p }) (f (serverAfterBind ss))
#if !WINDOWS
instance HasAfterBind ServerSettingsUnix where
afterBindLens f ss = fmap (\p -> ss { serverAfterBindUnix = p }) (f (serverAfterBindUnix ss))
#endif
getAfterBind :: HasAfterBind a => a -> (Socket -> IO ())
getAfterBind = getConstant . afterBindLens Constant
setAfterBind :: HasAfterBind a => (Socket -> IO ()) -> a -> a
setAfterBind p = runIdentity . afterBindLens (const (Identity p))
type ConnectionHandle = Socket -> NS.SockAddr -> Maybe NS.SockAddr -> IO ()
runTCPServerWithHandle :: ServerSettings -> ConnectionHandle -> IO ()
runTCPServerWithHandle (ServerSettings port host msocket afterBind needLocalAddr) handle =
case msocket of
Nothing -> E.bracket (bindPortTCP port host) NS.sClose inner
Just lsocket -> inner lsocket
where
inner lsocket = afterBind lsocket >> forever (serve lsocket)
serve lsocket = E.bracketOnError
(acceptSafe lsocket)
(\(socket, _) -> NS.sClose socket)
$ \(socket, addr) -> do
mlocal <- if needLocalAddr
then fmap Just $ NS.getSocketName socket
else return Nothing
_ <- E.mask $ \restore -> forkIO
$ restore (handle socket addr mlocal)
`E.finally` NS.sClose socket
return ()
runTCPServer :: ServerSettings -> (AppData -> IO ()) -> IO ()
runTCPServer settings app = runTCPServerWithHandle settings app'
where app' socket addr mlocal =
let ad = AppData
{ appRead' = safeRecv socket 4096
, appWrite' = sendAll socket
, appSockAddr' = addr
, appLocalAddr' = mlocal
, appCloseConnection' = NS.sClose socket
}
in
app ad
runTCPClient :: ClientSettings -> (AppData -> IO a) -> IO a
runTCPClient (ClientSettings port host addrFamily) app = E.bracket
(getSocketFamilyTCP host port addrFamily)
(NS.sClose . fst)
(\(s, address) -> app AppData
{ appRead' = safeRecv s 4096
, appWrite' = sendAll s
, appSockAddr' = address
, appLocalAddr' = Nothing
, appCloseConnection' = NS.sClose s
})
appLocalAddr :: AppData -> Maybe NS.SockAddr
appLocalAddr = appLocalAddr'
appSockAddr :: AppData -> NS.SockAddr
appSockAddr = appSockAddr'
appCloseConnection :: AppData -> IO ()
appCloseConnection = appCloseConnection'
class HasReadWrite a where
readLens :: Functor f => (IO ByteString -> f (IO ByteString)) -> a -> f a
writeLens :: Functor f => ((ByteString -> IO ()) -> f (ByteString -> IO ())) -> a -> f a
instance HasReadWrite AppData where
readLens f a = fmap (\x -> a { appRead' = x }) (f (appRead' a))
writeLens f a = fmap (\x -> a { appWrite' = x }) (f (appWrite' a))
#if !WINDOWS
instance HasReadWrite AppDataUnix where
readLens f a = fmap (\x -> a { appReadUnix = x }) (f (appReadUnix a))
writeLens f a = fmap (\x -> a { appWriteUnix = x }) (f (appWriteUnix a))
#endif
appRead :: HasReadWrite a => a -> IO ByteString
appRead = getConstant . readLens Constant
appWrite :: HasReadWrite a => a -> ByteString -> IO ()
appWrite = getConstant . writeLens Constant
#if !WINDOWS
runUnixServer :: ServerSettingsUnix -> (AppDataUnix -> IO ()) -> IO ()
runUnixServer (ServerSettingsUnix path afterBind) app = E.bracket
(bindPath path)
NS.sClose
(\socket -> do
afterBind socket
forever $ serve socket)
where
serve lsocket = E.bracketOnError
(acceptSafe lsocket)
(\(socket, _) -> NS.sClose socket)
$ \(socket, _) -> do
let ad = AppDataUnix
{ appReadUnix = safeRecv socket 4096
, appWriteUnix = sendAll socket
}
_ <- E.mask $ \restore -> forkIO
$ restore (app ad)
`E.finally` NS.sClose socket
return ()
runUnixClient :: ClientSettingsUnix -> (AppDataUnix -> IO a) -> IO a
runUnixClient (ClientSettingsUnix path) app = E.bracket
(getSocketUnix path)
NS.sClose
(\sock -> app AppDataUnix
{ appReadUnix = safeRecv sock 4096
, appWriteUnix = sendAll sock
})
#endif