Copyright | (c) 2014-2015, Peter Trško |
---|---|
License | BSD3 |
Maintainer | peter.trsko@gmail.com |
Stability | unstable |
Portability | CPP, NoImplicitPrelude |
Safe Haskell | None |
Language | Haskell2010 |
Connection pools for TCP clients and UNIX Socket clients, later is not supported on Windows.
This package is built on top of
resource-pool and
streaming-commons
packages. The later allows us to use
conduit-extra package for
implementing TCP and UNIX Sockets clients. Package conduit-extra defines
appSource
and appSink
based on abstractions from streaming-commons
package and they can be therefore reused. Difference between using
conduit-extra or streaming-commons is that instead of using
runTCPClient
(or its lifted variant runGeneralTCPClient
from
conduit-extra) one would use withTcpClientConnection
, and instead of
runUnixClient
it would be withUnixClientConnection
. There is also more
generic function named withConnection
, which takes either ConnectionPool
instance.
- data family ConnectionPool :: k -> *
- data ResourcePoolParams
- numberOfResourcesPerStripe :: Functor f => (Int -> f Int) -> ResourcePoolParams -> f ResourcePoolParams
- numberOfStripes :: Functor f => (Int -> f Int) -> ResourcePoolParams -> f ResourcePoolParams
- resourceIdleTimeout :: Functor f => (NominalDiffTime -> f NominalDiffTime) -> ResourcePoolParams -> f ResourcePoolParams
- validateResourcePoolParams :: ResourcePoolParams -> Either String ResourcePoolParams
- data TcpClient
- data ClientSettings :: *
- data AppData :: *
- createTcpClientPool :: ResourcePoolParams -> ClientSettings -> IO (ConnectionPool TcpClient)
- withTcpClientConnection :: (MonadBaseControl io m, io ~ IO) => ConnectionPool TcpClient -> (AppData -> m r) -> m r
- tryWithTcpClientConnection :: (MonadBaseControl io m, io ~ IO) => ConnectionPool TcpClient -> (AppData -> m r) -> m (Maybe r)
- destroyAllTcpClientConnections :: ConnectionPool TcpClient -> IO ()
- data UnixClient
- data ClientSettingsUnix :: *
- data AppDataUnix :: *
- createUnixClientPool :: ResourcePoolParams -> ClientSettingsUnix -> IO (ConnectionPool UnixClient)
- withUnixClientConnection :: (MonadBaseControl io m, io ~ IO) => ConnectionPool UnixClient -> (AppDataUnix -> m r) -> m r
- tryWithUnixClientConnection :: (MonadBaseControl io m, io ~ IO) => ConnectionPool UnixClient -> (AppDataUnix -> m r) -> m (Maybe r)
- destroyAllUnixClientConnections :: ConnectionPool UnixClient -> IO ()
- class ConnectionPoolFor protocol where
- type HandlerData protocol
- withConnection :: MonadBaseControl IO m => ConnectionPool protocol -> (HandlerData protocol -> m r) -> m r
- tryWithConnection :: MonadBaseControl IO m => ConnectionPool protocol -> (HandlerData protocol -> m r) -> m (Maybe r)
- destroyAllConnections :: ConnectionPool protocol -> IO ()
TCP Client Example
Here is a simple example that demonstrates how TCP client can be created and how connection pool behaves.
{-# LANGUAGE OverloadedStrings #-} module Main (main) where import Control.Concurrent ( forkIO , newEmptyMVar , putMVar , readMVar , threadDelay ) import Control.Monad (void, mapM_) import System.Environment (getArgs) import Control.Lens ((.~), (&)) import Data.ConnectionPool (createTcpClientPool
,numberOfResourcesPerStripe
,numberOfStripes
,withTcpClientConnection
) import Data.Default.Class (Default(def)) import Data.Streaming.Network (appWrite
,clientSettingsTCP
) main :: IO () main = do [port, numStripes, numPerStripe] <- getArgs pool <-createTcpClientPool
(poolParams numStripes numPerStripe) (clientSettingsTCP
(read port) "127.0.0.1") thread1 <- newEmptyMVar thread2 <- newEmptyMVar void . forkIO .withTcpClientConnection
pool $ \appData -> do threadDelay 1000appWrite
appData "1: I'm alive!\n" putMVar thread1 () void . forkIO .withTcpClientConnection
pool $ \appData -> doappWrite
appData "2: I'm alive!\n" putMVar thread2 () mapM_ readMVar [thread1, thread2] where poolParams m n =def
&numberOfStripes
.~ read m &numberOfResourcesPerStripe
.~ read n
To test it we can use socat
or some netcat
like application. Our test
will require two terminals, in one we will execute socat
as a server
listenting on UNIX socket and in the other one we execute above example.
Simple TCP server listening on port 8001
that prints what it receives to
stdout:
$ socat TCP4-LISTEN:8001,bind=127.0.0.1,fork -
The fork
parameter in the above example is important, otherwise socat
would terminate when client closes its connection.
If we run above example as:
$ runghc tcp-example.hs 8001 1 1
We can see that socat
received following text:
1: I'm alive! 2: I'm alive!
But if we increment number of stripes or number of connections (resources) per stripe, then we will get:
2: I'm alive! 1: I'm alive!
The reason for this is that we use threadDelay 100
in the first executed
thread. So when we have only one stripe and one connection per stripe, then
we have only one connection in the pool. Therefore when the first thread
executes and acquires a connection, then all the other threads (the other
one in above example) will block. If we have more then one connection
available in our pool, then the first thread acquires connection, blocks on
threadDelay
call, but the other thread also acquires connection and prints
its output while the first thread is still blocked on threadDelay
. This
example demonstrates how connection pool behaves if it reached its capacity
and when it has enough free resources.
Unix Client Example
Here is a simple example that demonstrates how UNIX Sockets client can be created and how connection pool behaves.
{-# LANGUAGE OverloadedStrings #-} module Main (main) where import Control.Concurrent ( forkIO , newEmptyMVar , putMVar , readMVar , threadDelay ) import Control.Monad (void, mapM_) import System.Environment (getArgs) import Control.Lens ((.~), (&)) import Data.ConnectionPool (createUnixClientPool
,numberOfResourcesPerStripe
,numberOfStripes
,withUnixClientConnection
) import Data.Default.Class (Default(def)) import Data.Streaming.Network (appWrite
,clientSettingsUnix
) main :: IO () main = do [socket, numStripes, numPerStripe] <- getArgs pool <-createUnixClientPool
(poolParams numStripes numPerStripe) (clientSettingsUnix
socket) thread1 <- newEmptyMVar thread2 <- newEmptyMVar void . forkIO .withUnixClientConnection
pool $ \appData -> do threadDelay 100appWrite
appData "1: I'm alive!\n" putMVar thread1 () void . forkIO .withUnixClientConnection
pool $ \appData -> doappWrite
appData "2: I'm alive!\n" putMVar thread2 () mapM_ readMVar [thread1, thread2] where poolParams m n =def
&numberOfStripes
.~ read m &numberOfResourcesPerStripe
.~ read n
Above example is very similar to our TCP Client Example and most notably the
implementation of two client threads is the same. Testing it is very similar
to testing TCP Client Example, but we would use different command for
socat
and for executing the example.
Simple UNIX socket server that prints what it receives to stdout:
$ socat UNIX-LISTEN:test.sock,fork -
Parameter fork
has the same importance as when we used it in the command
for running TCP server.
We can execute UNIX Sockets Example using:
$ runghc unix-sockets-example.hs test.sock 1 1
Result of the test will be the same in case of using one stripe and one connection per stripe, and when we increase total number connections, to what we had with the TCP Client Example.
Connection Pool
For each supported protocol we have a ConnectionPool
data family instance
that is tagged with supported protocol. Currently it can be either
TcpClient
or UnixClient
. This way we are able to use same core
implementation for both and only need to deviate from common code where
necessary.
Under the hood we use Socket
to represent connections and
that limits possible implementations of ConnectionPool
instances to
protocols supported by network
package.
Those interested in details should look in to Data.ConnectionPool.Internal.ConnectionPool and Data.ConnectionPool.Internal.ConnectionPoolFamily modules.
data family ConnectionPool :: k -> * Source
Family of connection pools parametrised by transport protocol.
Definition changed version 0.2 to be kind polymorphic (only on GHC >= 7.10) and became part of stable API by being moved in to Data.ConnectionPool.Family module.
HasConnectionPool HandlerParams Socket () (ConnectionPool * UnixClient) Source | Since version 0.2. |
HasConnectionPool HandlerParams Socket SockAddr (ConnectionPool * TcpClient) Source | Since version 0.2. |
Show (ConnectionPool * TcpClient) | |
Show (ConnectionPool * UnixClient) | |
Generic (ConnectionPool * TcpClient) | |
Generic (ConnectionPool * UnixClient) | |
data ConnectionPool * TcpClient = TcpConnectionPool (ConnectionPool HandlerParams Socket SockAddr) Source | Connection pool for TCP clients. Definition changed in version 0.1.3 and 0.2.
Instances for |
data ConnectionPool * UnixClient = UnixConnectionPool (ConnectionPool HandlerParams Socket ()) Source | Connection pool for UNIX Socket clients. Definition changed in version 0.1.3 and 0.2.
Instances for |
type Rep (ConnectionPool * TcpClient) | |
type Rep (ConnectionPool * UnixClient) |
Constructing Connection Pool
For each protocol we provide separate function that creates ConnectionPool
instance. For TCP clients it's createTcpClientPool
and for UNIX Socket
clients it's createUnixClientPool
(not available on Windows).
In each case two kinds of values need to be provided as parameters to such functions:
- Parameters of underlying resource pool like how to organize stripes and parameters for algorithm that handles resource releasing, etc.
- Transport protocol parameters like IP address, port, UNIX Socket file, and similar.
To simplify things we provide ResourcePoolParams
data type that is
accepted by concrete constructors of ConnectionPool
instances and it wraps
all common connection pool parameters. And for protocol specific settings
this package reuses data types from streaming-commons library.
As a result, of the above, type signature of function that creates
connection pool for some protocol named MyProtocol
could look like:
createMyProtocolPool ::ResourcePoolParams
-> MyProtocolParams ->IO
(ConnectionPool
MyProtocol)
To further simplify things this package defines default value for
ResourcePoolParams
using Default
type class that has
only one method named def
. Instance of this class is
declared using minimal possible values of each parameter required by
underlying resource pool. In example, to specify connection pool with 2
stripes with 8 connections in each stripe, but keeping connection idle
timeout on its default value, we can simply use:
def
&numberOfStripes
.~ 2 &numberOfResourcesPerStripe
.~ 8
Where functions &
and .~
are defined by
lens package.
data ResourcePoolParams Source
Parameters of resource pool that describe things like its internal
structure. See createPool
for details.
Instance for Generic
introduced in version 0.2.
Lenses
For details on how to use leses as these see lens package where you might find a good starting point documentation for you.
numberOfResourcesPerStripe :: Functor f => (Int -> f Int) -> ResourcePoolParams -> f ResourcePoolParams Source
Lens for accessing maximum number of resources to keep open per stripe. The smallest acceptable value is 1 (default).
numberOfStripes :: Functor f => (Int -> f Int) -> ResourcePoolParams -> f ResourcePoolParams Source
Lens for accessing stripe count. The number of distinct sub-pools to maintain. The smallest acceptable value is 1 (default).
resourceIdleTimeout :: Functor f => (NominalDiffTime -> f NominalDiffTime) -> ResourcePoolParams -> f ResourcePoolParams Source
Lens for accessing amount of time for which an unused resource is kept open. The smallest acceptable value is 0.5 seconds (default).
Validation
Sometimes one needs to validate parameters as early as possible, e.g. while parsing command line options.
Usage example:
validateResourcePoolParams
$ someParams &resourceIdleTimeout
.~ 1 &numberOfResourcesPerStripe
.~ 16
Most usually one would use def
instead of someParams
.
Functions &
and .~
are defined in
lens package.
validateResourcePoolParams Source
:: ResourcePoolParams | Parameters to validate. |
-> Either String ResourcePoolParams | Either error message or the same value of |
Check if all parameters for underlying resource pool are valid:
Number of connection sub-pools. Keeping it set tonumberOfStripes
>= 11
is good for most applications.
Maximum number of connections in each stripe. Totally there can benumberOfResourcesPerStripe
>= 1
open connections simultaneously.numberOfStripes
*numberOfResourcesPerStripe
Property specified for how long connection will be kept alive after it is released by back to the pool before it is automatically closed. Value is in seconds.resourceIdleTimeout
>= 0.5
For more details see createPool
.
Since version 0.1.1.0.
TCP Client Connection Pool
Type tag used to specialize connection pool for TCP clients.
Instance for Generic
introduced in version 0.2.
Generic TcpClient Source | |
ConnectionPoolFor * TcpClient Source | Defined using:
Since version 0.2. |
HasConnectionPool HandlerParams Socket SockAddr (ConnectionPool * TcpClient) Source | Since version 0.2. |
Show (ConnectionPool * TcpClient) Source | |
Generic (ConnectionPool * TcpClient) Source | |
type Rep TcpClient Source | |
data ConnectionPool * TcpClient = TcpConnectionPool (ConnectionPool HandlerParams Socket SockAddr) Source | Connection pool for TCP clients. Definition changed in version 0.1.3 and 0.2.
Instances for |
type HandlerData * TcpClient = AppData Source | |
type Rep (ConnectionPool * TcpClient) Source |
data ClientSettings :: *
Settings for a TCP client, specifying how to connect to the server.
HasPort ClientSettings | |
HasReadBufferSize ClientSettings | Since 0.1.13 |
createTcpClientPool :: ResourcePoolParams -> ClientSettings -> IO (ConnectionPool TcpClient) Source
Create connection pool for TCP clients.
withTcpClientConnection :: (MonadBaseControl io m, io ~ IO) => ConnectionPool TcpClient -> (AppData -> m r) -> m r Source
Temporarily take a TCP connection from a pool, run client with it, and
return it to the pool afterwards. For details how connections are allocated
see withResource
.
tryWithTcpClientConnection :: (MonadBaseControl io m, io ~ IO) => ConnectionPool TcpClient -> (AppData -> m r) -> m (Maybe r) Source
Similar to withConnection
, but only performs action if a TCP connection
could be taken from the pool without blocking. Otherwise,
tryWithResource
returns immediately with Nothing
(ie. the action
function is not called). Conversely, if a connection can be acquired from
the pool without blocking, the action is performed and it's result is
returned, wrapped in a Just
.
Since version 0.2.
destroyAllTcpClientConnections :: ConnectionPool TcpClient -> IO () Source
Destroy all TCP connections that might be still open in a connection pool. This is useful when one needs to release all resources at once and not to wait for idle timeout to be reached.
For more details see destroyAllResources
.
Since version 0.1.1.0.
UNIX Client Connection Pool
data UnixClient Source
Type tag used to specialize connection pool for UNIX Socket clients.
Instance for Generic
introduced in version 0.2.
Generic UnixClient Source | |
ConnectionPoolFor * UnixClient Source | Defined using:
Since version 0.2. |
HasConnectionPool HandlerParams Socket () (ConnectionPool * UnixClient) Source | Since version 0.2. |
Show (ConnectionPool * UnixClient) Source | |
Generic (ConnectionPool * UnixClient) Source | |
type Rep UnixClient Source | |
data ConnectionPool * UnixClient = UnixConnectionPool (ConnectionPool HandlerParams Socket ()) Source | Connection pool for UNIX Socket clients. Definition changed in version 0.1.3 and 0.2.
Instances for |
type HandlerData * UnixClient = AppDataUnix Source | |
type Rep (ConnectionPool * UnixClient) Source |
data ClientSettingsUnix :: *
Settings for a Unix domain sockets client.
HasPath ClientSettingsUnix | |
HasReadBufferSize ClientSettingsUnix | Since 0.1.14 |
createUnixClientPool :: ResourcePoolParams -> ClientSettingsUnix -> IO (ConnectionPool UnixClient) Source
Create connection pool for UNIX Sockets clients.
withUnixClientConnection :: (MonadBaseControl io m, io ~ IO) => ConnectionPool UnixClient -> (AppDataUnix -> m r) -> m r Source
Temporarily take a UNIX Sockets connection from a pool, run client with
it, and return it to the pool afterwards. For details how connections are
allocated see withResource
.
tryWithUnixClientConnection :: (MonadBaseControl io m, io ~ IO) => ConnectionPool UnixClient -> (AppDataUnix -> m r) -> m (Maybe r) Source
Similar to withConnection
, but only performs action if a UNIX Sockets
connection could be taken from the pool without blocking. Otherwise,
tryWithResource
returns immediately with Nothing
(ie. the action
function is not called). Conversely, if a connection can be acquired from
the pool without blocking, the action is performed and it's result is
returned, wrapped in a Just
.
Since version 0.2.
destroyAllUnixClientConnections :: ConnectionPool UnixClient -> IO () Source
Destroy all UNIX Sockets connections that might be still open in a connection pool. This is useful when one needs to release all resources at once and not to wait for idle timeout to be reached.
For more details see destroyAllResources
.
Since version 0.1.1.0.
Polymorphic Interface
Since version 0.2.
class ConnectionPoolFor protocol where Source
Type class for common connection pool operations. It intentionally doesn't handle connection pool creation, which is best left to dedicated smart constructors.
Since version 0.2.
type HandlerData protocol Source
Data passed to individual connection handler.
withConnection :: MonadBaseControl IO m => ConnectionPool protocol -> (HandlerData protocol -> m r) -> m r Source
Temporarily take a connection from a pool, run handler with it, and return it to the pool afterwards.
Since version 0.2.
tryWithConnection :: MonadBaseControl IO m => ConnectionPool protocol -> (HandlerData protocol -> m r) -> m (Maybe r) Source
Similar to withConnection
, but only performs action if a connection
could be taken from the pool without blocking. Otherwise,
tryWithResource
returns immediately with Nothing
(ie. the action
function is not called). Conversely, if a connection can be acquired
from the pool without blocking, the action is performed and it's result
is returned, wrapped in a Just
.
Since version 0.2.
destroyAllConnections :: ConnectionPool protocol -> IO () Source
Destroy all connections that might be still open in a connection pool. This is useful when one needs to release all resources at once and not to wait for idle timeout to be reached.
Since version 0.2.
ConnectionPoolFor * TcpClient Source | Defined using:
Since version 0.2. |
ConnectionPoolFor * UnixClient Source | Defined using:
Since version 0.2. |