streaming-utils-0.2.0.0: http, attoparsec, pipes and other utilities for the streaming libraries

Safe Haskell: None
Language: Haskell2010

Streaming.Network.TCP

Description

This hyper-minimal module closely follows the corresponding module in Renzo Carbonara's pipes-network package. It is meant to be used together with the Network.Simple.TCP module from Carbonara's network-simple package, which is completely re-exported from this module.

Receiving

fromSocket

Arguments

:: MonadIO m 
=> Socket

Connected socket.

-> Int

Maximum number of bytes to receive and send downstream at once. Renzo recommends using 4096 if you don't have a special purpose.

-> ByteString m () 

Receives bytes from a connected socket with a maximum chunk size. The bytestream ends if the remote peer closes its side of the connection or EOF is received. The implementation is as follows:

fromSocket sock nbytes = loop where
  loop = do
    bs <- liftIO (NSB.recv sock nbytes)
    if B.null bs 
      then return ()
      else Q.chunk bs >> loop
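
As a usage sketch (not part of the library), the stream produced by this loop can be handed to any bytestream fold. The example below assumes a free local port 4004, picked only for illustration, and uses Q.length_ from Data.ByteString.Streaming to count the bytes each client sends before it disconnects.

```haskell
-- Sketch: count the bytes each client sends before closing its side.
-- The port "4004" is an arbitrary choice made for this illustration.
module Main where

import Streaming.Network.TCP (HostPreference (Host), fromSocket, serve)
import qualified Data.ByteString.Streaming as Q

main :: IO ()
main = do
  putStrLn "Byte-counting service on 4004"
  serve (Host "127.0.0.1") "4004" $ \(client, addr) -> do
    -- fromSocket ends when the peer closes, and only then does length_ return
    n <- Q.length_ (fromSocket client 4096)
    putStrLn (show addr ++ " sent " ++ show n ++ " bytes")
```

Connecting with telnet, typing a few lines and closing the connection should make the server print one line for that client.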

Sending

toSocket

Arguments

:: MonadIO m 
=> Socket

Connected socket.

-> ByteString m r 
-> m r 

Connect a stream of bytes to the remote end. The implementation is again very simple:

toSocket sock = loop where
 loop bs = do
   e <- Q.nextChunk bs
   case e of
     Left r -> return r
     Right (b,rest) -> send sock b >> loop rest
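
A minimal sketch of toSocket in use, assuming a free local port 4005 (hypothetical, chosen for illustration): every client receives a fixed two-chunk bytestream, after which the handler returns and the connection is closed by serve.

```haskell
{-# LANGUAGE OverloadedStrings #-}
-- Sketch: send a fixed bytestream to every client, then hang up.
-- The port "4005" is an arbitrary choice made for this illustration.
module Main where

import Streaming.Network.TCP (HostPreference (Host), serve, toSocket)
import qualified Data.ByteString.Streaming as Q

main :: IO ()
main =
  serve (Host "127.0.0.1") "4005" $ \(client, _) ->
    -- each chunk below is passed to send by one iteration of toSocket's loop
    toSocket client (Q.chunk "hello\n" >> Q.chunk "goodbye\n")
```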

Simple demos

Here is a collection of little hello telnet world programs, following Michael Snoyman's post on using network-conduit together with async. I hope the reader will find that they are a bit more intelligible when we think naively of 'byte streams' as ordinary Haskell entities, rather than conduits trapped in a framework. (In fact they're pretty straightforward either way, of course.) The complete source is appended to this module below.

  • serverToUpper - a server on 4000 that sends back input sent e.g. with telnet upper-cased
  • serverDoubler - a server on 4001 that sends back input doubled, Char8 by Char8
  • clientToUpper - a client through which the user interacts directly with the upper-caser
  • clientPipeline - a client that sends material to the uppercasing server and then the doubling server and returns it to the user
  • proxyToUpper - a proxy on 4002 that sends input to the uppercasing server on 4000
  • proxyAuth - a proxy on 4003 that demands authorization before condescending to send user input to the upper-casing server on 4000

The following remarks will require that eight instances of a terminal all be opened; a crude option parser will make the examples usable with one executable:

   $ streaming-network-tcp-examples --help
   Usage: streaming-network-tcp-examples COMMAND
   Available options:
     -h,--help                Show this help text
   Available commands:
     ClientPipeline           
     ClientToUpper            
     ProxyAuth                
     ProxyToUpper             
     ServePipes               
     ServerDouble             
     ServerToUpper

Since most examples use the uppercasing service, which looks like this:

   serverToUpper :: IO ()
   serverToUpper = do
       putStrLn "Opening upper-casing service on 4000"
       serve (Host "127.0.0.1") "4000" $ \(client,_) -> 
         fromSocket client 4096 -- raw bytes are received from a telnet user or the like
         & Q.map toUpper        -- we map them to uppercase
         & toSocket client      -- and send them back

we start it in one terminal:

   term1$ streaming-network-tcp-examples ServerToUpper
   Opening upper-casing service on 4000

then in another terminal we can telnet to it:

   term2$ telnet localhost 4000
   Trying 127.0.0.1...
   Connected to localhost.
   Escape character is '^]'.
   hello -- <-- our input
   HELLO
   ...

or we can scrap telnet and use a dedicated Haskell client. This is a little subtler:

   clientToUpper :: IO ()
   clientToUpper = connect "127.0.0.1" "4000" $ \(socket,_) -> do
     let act1 = toSocket socket Q.stdin           -- we send our stdin to the service
         act2 = Q.stdout (fromSocket socket 4096) -- we read our stdout from the service
     concurrently act1 act2                       -- but we do each on a separate thread
     return ()

Here, we stream standard input to the remote end indefinitely, and we stream news from the remote end to standard output indefinitely. The two open-ended processes are run together with Control.Concurrent.Async.concurrently, so we see:

   term3$ streaming-network-tcp-examples ClientToUpper
   el pueblo unido jamas sera vencido!  -- our input
   EL PUEBLO UNIDO JAMAS SERA VENCIDO!
   el pueblo unido jamas sera vencido!  -- our input
   EL PUEBLO UNIDO JAMAS SERA VENCIDO!
   ...
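
With concurrently, the client returns only once both directions have finished. A possible variant, offered here as an assumption rather than as one of the original demos, uses race_ from the async package so that the client returns as soon as either direction ends. The name clientToUpperRace is hypothetical, and the sketch expects the upper-casing server to be running on 4000.

```haskell
-- Sketch: like clientToUpper, but stop when either direction finishes.
-- clientToUpperRace is a hypothetical name, not part of the demos.
module Main where

import Control.Concurrent.Async (race_)
import Streaming.Network.TCP (connect, fromSocket, toSocket)
import qualified Data.ByteString.Streaming as Q

clientToUpperRace :: IO ()
clientToUpperRace =
  connect "127.0.0.1" "4000" $ \(server, _) ->
    race_ (toSocket server Q.stdin)              -- stdin to the service
          (Q.stdout (fromSocket server 4096))    -- service to stdout

main :: IO ()
main = clientToUpperRace
```

The trade-off is that a reply still in flight when stdin reaches end of input may be dropped, because race_ cancels the reading side.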

To complicate the system of connections, we can also start a second server, which again just makes a trivial alteration in the bytestream, doubling each Char8:

   serverDoubler :: IO ()
   serverDoubler = do 
     putStrLn "Double server available on 4001"
     serve (Host "127.0.0.1") "4001" $ \(socket, remoteAddr) -> 
       fromSocket socket 4096                       -- raw bytes from a client
         & Q.toChunks                               -- are munged ...
         & S.map (B.concatMap (\x -> B.pack [x,x])) -- with standard bytestream materials
         & Q.fromChunks                             -- ...
         & toSocket socket                          -- and sent back

starting it up thus:

    term4$ streaming-network-tcp-examples ServerDouble

we see:

    term5$ telnet localhost 4001
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    hello
    hheelllloo
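
The doubling step itself needs no network. The sketch below applies the same B.concatMap expression to strict ByteStrings and checks that doubling chunk by chunk agrees with doubling the whole input, which is why the server can work on whatever chunks recv happens to deliver. The name doubleBytes is introduced here for illustration.

```haskell
-- Sketch: the per-chunk transformation used by the doubling server.
module Main where

import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC

-- | Duplicate every byte of a strict chunk (hypothetical helper name).
doubleBytes :: B.ByteString -> B.ByteString
doubleBytes = B.concatMap (\x -> B.pack [x, x])

main :: IO ()
main = do
  BC.putStrLn (doubleBytes (BC.pack "hello"))   -- prints hheelllloo
  -- doubling each chunk separately gives the same bytes as doubling the whole
  print (B.concat (map doubleBytes [BC.pack "he", BC.pack "llo"])
           == doubleBytes (BC.pack "hello"))    -- prints True
```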

Now we complicate our use of the async library with a Haskell client that interacts with 4000 and 4001 together:

   clientPipeline :: IO ()
   clientPipeline = do
     putStrLn "We will connect stdin to 4000 and 4001 in succession."
     putStrLn "Input will thus be uppercased and doubled char-by-char.\n"
     connect "127.0.0.1" "4000" $ \(socket1,_) ->
       connect "127.0.0.1" "4001" $ \(socket2,_) ->
         do let act1 = toSocket socket1 Q.stdin
                                                    -- we send our stdin to the uppercaser
                act2 = toSocket socket2 (fromSocket socket1 4096)
                                                    -- we send the results from the uppercaser to the doubler
                act3 = Q.stdout (fromSocket socket2 4096)
                                                    -- we send the doubler's output to stdout
            runConcurrently $ Concurrently act1 *>  -- all this simultaneously
                              Concurrently act2 *>
                              Concurrently act3

Note the use of the Applicative instance for Concurrently from the async library to make the three stream operations simultaneous. Then we see:

   term6$ streaming-network-tcp-examples ClientPipeline
   hello
   HHEELLLLOO
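
To see the Applicative instance for Concurrently in isolation, here is a small sketch that involves no sockets. The three actions and their delays are made up for illustration; the point is that all of them start together and that results are combined in argument order, not in completion order.

```haskell
-- Sketch: combining three actions with the Concurrently Applicative.
module Main where

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Concurrently (..))

main :: IO ()
main = do
  -- the slowest action comes first, yet its result still comes first
  result <- runConcurrently $
    (,,) <$> Concurrently (threadDelay 200000 >> return "first")
         <*> Concurrently (threadDelay 100000 >> return "second")
         <*> Concurrently (return "third")
  print result  -- prints ("first","second","third")
```

In clientPipeline the results are all unit, so *> is enough and nothing needs to be collected.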

The upper-caser is open on 4000 but don't tell the children. The last program does a little manipulation of the bytestream to demand authorization on 4003:

   term7$ streaming-network-tcp-examples ProxyAuth

which then elsewhere permits

   term8$ telnet localhost 4003
   Trying 127.0.0.1...
   Connected to localhost.
   Escape character is '^]'.
   Username: spaceballs
   Password: 12345
   Successfully authenticated.
   hello
   HELLO
   hello!
   HELLO!
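
The line handling behind the username and password prompts can be modelled on strict ByteStrings. The sketch below is an assumption-labelled analogue of the shortLineInput helper in the appended source, not the streaming version itself: it looks for a newline within the first n bytes, strips carriage returns from the line, and returns the remaining input. The name shortLine is hypothetical.

```haskell
-- Sketch: a strict-ByteString model of the proxy's line reading.
module Main where

import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC

-- | Split off the first line, searching at most n bytes for the newline.
--   Byte 10 is '\n' and byte 13 is '\r'.
shortLine :: Int -> B.ByteString -> (B.ByteString, B.ByteString)
shortLine n input = (B.filter (/= 13) line, B.drop 1 (afterLine <> overflow))
  where
    (prefix, overflow) = B.splitAt n input
    (line, afterLine)  = B.break (== 10) prefix

main :: IO ()
main = do
  let (user, rest1) = shortLine 80 (BC.pack "spaceballs\r\n12345\r\nhello\r\n")
      (pass, rest2) = shortLine 80 rest1
  print (user, pass, rest2)  -- prints ("spaceballs","12345","hello\r\n")
```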

Source

-- streaming-network-tcp-examples.hs
{-# LANGUAGE OverloadedStrings #-}  -- needed for the ByteString literals in checkAuth and creds
module Main where

import Streaming
import Streaming.Network.TCP
import qualified Streaming.Prelude as S
import qualified Data.ByteString.Streaming  as Q

import Control.Concurrent.Async      -- cabal install async
import qualified Data.ByteString as B
import Data.ByteString (ByteString)
import Data.Word8 (toUpper, _cr)     -- cabal install word8
import Data.Function ((&))
import Options.Applicative           -- cabal install optparse-applicative
import Control.Applicative
import Control.Monad
import Data.Monoid

serverToUpper :: IO ()
serverToUpper = do
    putStrLn "Opening upper-casing service on 4000"
    serve (Host "127.0.0.1") "4000" $ \(client,_) -> 
      toSocket client $ Q.map toUpper $ fromSocket client 4096 

serverDoubler :: IO ()
serverDoubler = do 
  putStrLn "Double server available on 4001"
  serve (Host "127.0.0.1") "4001" $ \(client, remoteAddr) -> 
    fromSocket client 4096
          & Q.toChunks
          & S.map (B.concatMap (\x -> B.pack [x,x]))
          & Q.fromChunks
          & toSocket client

clientToUpper :: IO ()
clientToUpper = connect "127.0.0.1" "4000" $ \(server,_) -> do
  let act1 = toSocket server Q.stdin  
      act2 = Q.stdout (fromSocket server 4096) 
  concurrently act1 act2 
  return ()

clientPipeline :: IO ()
clientPipeline = do
  putStrLn "We will connect stdin to 4000 and 4001 in succession."
  putStrLn "Input will thus be uppercased and doubled char-by-char.\n"
  connect "127.0.0.1" "4000" $ \(socket1,_) ->
    connect "127.0.0.1" "4001" $ \(socket2,_) ->
      do let act1 = toSocket socket1 (Q.stdin)
             act2 = toSocket socket2 (fromSocket socket1 4096)
             act3 = Q.stdout (fromSocket socket2 4096)
         runConcurrently $ Concurrently act1 *>
                           Concurrently act2 *>
                           Concurrently act3

proxyToUpper :: IO ()
proxyToUpper = 
  serve (Host "127.0.0.1") "4002" $ \(client, _) ->
    connect "127.0.0.1" "4000"    $ \(server, _) -> 
      do let act1 =  toSocket server (fromSocket client 4096)
             act2 =  toSocket client (fromSocket server 4096)
         concurrently act1 act2
         return ()

proxyAuth :: IO ()
proxyAuth = serve (Host "127.0.0.1") "4003" process  
  where
  process (client, _) =
    do from_client <- toSocket client (checkAuth (fromSocket client 4096))
       connect  "127.0.0.1" "4000"  $ \(server,_) ->
         do let pipe_forward = toSocket server from_client 
                pipe_back    = toSocket client (fromSocket server 4096) 
            concurrently pipe_forward pipe_back
            return ()

  checkAuth :: MonadIO m => Q.ByteString m r -> Q.ByteString m (Q.ByteString m r)
  checkAuth p = do 
     Q.chunk "Username: "
     (username,p1) <- lift $ shortLineInput 80 p
     Q.chunk "Password: "
     (password,p2) <- lift $ shortLineInput 80 p1
     if (username, password) `elem` creds
          then Q.chunk "Successfully authenticated.\n"
          else do Q.chunk "Invalid username/password.\n"
                  error "Invalid authentication, please log somewhere..."
     return p2 -- when using `error`
 
  shortLineInput n bs = do
     (bs:>rest) <- Q.toStrict $ Q.break (==10) $ Q.splitAt n bs
     return $ (B.filter (/= _cr) bs, Q.drop 1 $ rest >>= id) 
    
  creds :: [(ByteString, ByteString)]
  creds = [ ("spaceballs", "12345") ]

main :: IO ()
main = join $ execParser (info opts idm) where

  opts :: Parser (IO ())
  opts = helper <*> subparser stuff where 
     stuff = mconcat
      [ command "ClientPipeline" (info (pure clientPipeline) idm)
      , command "ClientToUpper"  (info (pure clientToUpper) idm)
      , command "ProxyAuth"      (info (pure proxyAuth) idm)
      , command "ProxyToUpper"   (info (pure proxyToUpper) idm)
      , command "ServerDouble"   (info (pure serverDoubler) idm)
      , command "ServerToUpper"  (info (pure serverToUpper) idm)
      ]

Re-exports

data Socket

A socket data type. Sockets are not GCed unless they are closed by close.

Instances
Eq Socket 
Instance details

Defined in Network.Socket.Types

Methods

(==) :: Socket -> Socket -> Bool

(/=) :: Socket -> Socket -> Bool

Show Socket 
Instance details

Defined in Network.Socket.Types

type HostName = String

Either a host name e.g., "haskell.org" or a numeric host address string consisting of a dotted decimal IPv4 address or an IPv6 address e.g., "192.168.0.1".

withSocketsDo :: IO a -> IO a

With older versions of the network library (version 2.6.0.2 or earlier) on Windows operating systems, the networking subsystem must be initialised using withSocketsDo before any networking operations can be used, e.g.:

main = withSocketsDo $ do {...}

It is fine to nest calls to withSocketsDo, and to perform networking operations after withSocketsDo has returned.

In newer versions of the network library (version v2.6.1.0 or later) it is only necessary to call withSocketsDo if you are calling the MkSocket constructor directly. However, for compatibility with older versions on Windows, it is good practice to always call withSocketsDo (it's very cheap).

data SockAddr

The existence of a constructor does not necessarily imply that that socket address type is supported on your system: see isSupportedSockAddr.

Instances
Eq SockAddr 
Instance details

Defined in Network.Socket.Types

Ord SockAddr 
Instance details

Defined in Network.Socket.Types

sendMany :: MonadIO m => Socket -> [ByteString] -> m ()

Writes the given list of ByteStrings to the socket.

Note: This uses writev(2) on POSIX.

sendLazy :: MonadIO m => Socket -> ByteString -> m ()

Writes a lazy ByteString to the socket.

Note: This uses writev(2) on POSIX.

send :: MonadIO m => Socket -> ByteString -> m ()

Writes a ByteString to the socket.

Note: On POSIX, calling sendLazy once is much more efficient than repeatedly calling send on strict ByteStrings. Use sendLazy sock . fromChunks if you have more than one strict ByteString to send.
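
The advice in this note can be sketched as follows. The helper name sendChunks is hypothetical; fromChunks is taken from Data.ByteString.Lazy. The main action needs no network and only shows how the strict chunks are combined before the single sendLazy call.

```haskell
-- Sketch: send several strict chunks with one sendLazy call.
module Main where

import Streaming.Network.TCP (Socket, sendLazy)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL

-- | Hypothetical helper following the note: one call instead of many sends.
sendChunks :: Socket -> [B.ByteString] -> IO ()
sendChunks sock = sendLazy sock . BL.fromChunks

main :: IO ()
main =
  -- the lazy ByteString carries the same bytes as the concatenated chunks
  print (BL.fromChunks [BC.pack "ab", BC.pack "cd"]
           == BL.fromStrict (BC.pack "abcd"))  -- prints True
```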

recv :: MonadIO m => Socket -> Int -> m (Maybe ByteString)

Read up to a limited number of bytes from a socket.

Returns Nothing if the remote end closed the connection or end-of-input was reached. The number of returned bytes might be less than the specified limit, but it will never be null.
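
A minimal sketch of a loop built directly on recv, assuming a free local port 4006 (hypothetical). The helper recvAll is introduced for illustration; it keeps everything in memory, so it only suits small inputs, which is the situation fromSocket is meant to avoid.

```haskell
-- Sketch: drain a socket with recv until the peer closes.
-- The port "4006" and the name recvAll are choices made for this illustration.
module Main where

import Streaming.Network.TCP (HostPreference (Host), Socket, recv, serve)
import qualified Data.ByteString as B

-- | Read until recv reports Nothing, then return everything received.
recvAll :: Socket -> IO B.ByteString
recvAll sock = go []
  where
    go acc = do
      mbs <- recv sock 4096
      case mbs of
        Nothing -> return (B.concat (reverse acc))  -- peer closed or end of input
        Just bs -> go (bs : acc)                    -- bs holds at least one byte

main :: IO ()
main =
  serve (Host "127.0.0.1") "4006" $ \(client, _) -> do
    msg <- recvAll client
    putStrLn ("got " ++ show (B.length msg) ++ " bytes")
```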

closeSock :: MonadIO m => Socket -> m ()

Shuts down and closes the Socket, silently ignoring any synchronous exception that might happen.

bindSock

Arguments

:: MonadIO m 
=> HostPreference

Host to bind.

-> ServiceName

Server service port name or number to bind.

-> m (Socket, SockAddr)

Bound socket and address.

Obtain a Socket bound to the given host name and TCP service port.

The obtained Socket should be closed manually using closeSock when it's not needed anymore.

Prefer to use listen if you will be listening on this socket and using it within a limited scope, and would like it to be closed immediately after its usage or in case of exceptions.

Note: The NoDelay, KeepAlive and ReuseAddr options are set on the socket.

connectSock

Arguments

:: MonadIO m 
=> HostName

Server hostname or IP address.

-> ServiceName

Server service port name or number.

-> m (Socket, SockAddr)

Connected socket and server address.

Obtain a Socket connected to the given host and TCP service port.

The obtained Socket should be closed manually using closeSock when it's not needed anymore, otherwise you risk having the connection and socket open for much longer than needed.

Prefer to use connect if you will be using the socket within a limited scope and would like it to be closed immediately after its usage or in case of exceptions.

Note: The NoDelay and KeepAlive options are set on the socket.
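
When connectSock is used instead of connect, closing is the caller's job. One way to arrange that, sketched here with bracket from Control.Exception, is shown below. It assumes the upper-casing demo server is running on port 4000 and prints whatever the first recv returns.

```haskell
-- Sketch: manual socket management with connectSock and closeSock.
-- Assumes a server is listening on 127.0.0.1:4000.
module Main where

import Control.Exception (bracket)
import Streaming.Network.TCP (closeSock, connectSock, recv, send)
import qualified Data.ByteString.Char8 as BC

main :: IO ()
main =
  -- bracket runs closeSock even if the body throws
  bracket (connectSock "127.0.0.1" "4000") (closeSock . fst) $ \(sock, _) -> do
    send sock (BC.pack "hello\n")
    reply <- recv sock 4096   -- Nothing if the server closed first
    print reply
```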

acceptFork

Arguments

:: MonadIO m 
=> Socket

Listening and bound socket.

-> ((Socket, SockAddr) -> IO ())

Computation to run in a different thread once an incoming connection is accepted. Takes the connection socket and remote end address.

-> m ThreadId 

Accept a single incoming connection and use it in a different thread.

The connection socket is shut down and closed when done or in case of exceptions.

accept

Arguments

:: (MonadIO m, MonadMask m) 
=> Socket

Listening and bound socket.

-> ((Socket, SockAddr) -> m r)

Computation to run once an incoming connection is accepted. Takes the connection socket and remote end address.

-> m r 

Accept a single incoming connection and use it.

The connection socket is shut down and closed when done or in case of exceptions.

listen

Arguments

:: (MonadIO m, MonadMask m) 
=> HostPreference

Host to bind.

-> ServiceName

Server service port name or number to bind.

-> ((Socket, SockAddr) -> m r)

Computation taking the listening socket and the address it's bound to.

-> m r 

Bind a TCP listening socket and use it.

The listening socket is closed when done or in case of exceptions.

If you prefer to acquire and close the socket yourself, then use bindSock and closeSock, as well as the listenSock function.

Note: The NoDelay, KeepAlive and ReuseAddr options are set on the socket. The maximum number of incoming queued connections is 2048.

serve

Arguments

:: MonadIO m 
=> HostPreference

Host to bind.

-> ServiceName

Server service port name or number to bind.

-> ((Socket, SockAddr) -> IO ())

Computation to run in a different thread once an incoming connection is accepted. Takes the connection socket and remote end address.

-> m a

This function never returns.

Start a TCP server that accepts incoming connections and handles them concurrently in different threads.

Any acquired sockets are properly shut down and closed when done or in case of exceptions. Exceptions from the threads handling the individual connections won't cause serve to die.

Note: This function performs listen and acceptFork, so don't perform those manually.
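
For comparison, the sketch below spells out by hand roughly what that note describes: a listen block that calls acceptFork in a loop. It is an approximation for illustration, not the library's definition of serve, and it omits any handling of exceptions raised while accepting. The port 4007 is hypothetical, and the handler simply echoes bytes back using fromSocket and toSocket.

```haskell
-- Sketch: an echo server built from listen and acceptFork directly.
-- The port "4007" is an arbitrary choice made for this illustration.
module Main where

import Control.Monad (forever, void)
import Streaming.Network.TCP
  (HostPreference (Host), acceptFork, fromSocket, listen, toSocket)

main :: IO ()
main =
  listen (Host "127.0.0.1") "4007" $ \(lsock, addr) -> do
    putStrLn ("Echo service listening on " ++ show addr)
    -- accept connections forever, handling each one in its own thread
    forever . void . acceptFork lsock $ \(client, _) ->
      toSocket client (fromSocket client 4096)
```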

connect

Arguments

:: (MonadIO m, MonadMask m) 
=> HostName

Server hostname or IP address.

-> ServiceName

Server service port name or number.

-> ((Socket, SockAddr) -> m r)

Computation taking the communication socket and the server address.

-> m r 

Connect to a TCP server and use the connection.

The connection socket is shut down and closed when done or in case of exceptions.

If you prefer to acquire and close the socket yourself, then use connectSock and closeSock.

Note: The NoDelay and KeepAlive options are set on the socket.

data HostPreference

Preferred host to bind.

Constructors

HostAny

Any available host.

HostIPv4

Any available IPv4 host.

HostIPv6

Any available IPv6 host.

Host HostName

An explicit host name.