module Hasql.Pool
(
Pool,
acquire,
use,
release,
UsageError (..),
)
where
import qualified Data.Text.Encoding as Text
import qualified Data.Text.Encoding.Error as Text
import qualified Data.UUID.V4 as Uuid
import Hasql.Connection (Connection)
import qualified Hasql.Connection as Connection
import qualified Hasql.Pool.Config.Config as Config
import Hasql.Pool.Observation
import Hasql.Pool.Prelude
import qualified Hasql.Session as Session
data Entry = Entry
{ Entry -> Connection
entryConnection :: Connection,
Entry -> Word64
entryCreationTimeNSec :: Word64,
Entry -> Word64
entryUseTimeNSec :: Word64,
Entry -> UUID
entryId :: UUID
}
entryIsAged :: Word64 -> Word64 -> Entry -> Bool
entryIsAged :: Word64 -> Word64 -> Entry -> Bool
entryIsAged Word64
maxLifetime Word64
now Entry {Word64
Connection
UUID
entryConnection :: Entry -> Connection
entryCreationTimeNSec :: Entry -> Word64
entryUseTimeNSec :: Entry -> Word64
entryId :: Entry -> UUID
entryConnection :: Connection
entryCreationTimeNSec :: Word64
entryUseTimeNSec :: Word64
entryId :: UUID
..} =
Word64
now Word64 -> Word64 -> Bool
forall a. Ord a => a -> a -> Bool
> Word64
entryCreationTimeNSec Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
+ Word64
maxLifetime
entryIsIdle :: Word64 -> Word64 -> Entry -> Bool
entryIsIdle :: Word64 -> Word64 -> Entry -> Bool
entryIsIdle Word64
maxIdletime Word64
now Entry {Word64
Connection
UUID
entryConnection :: Entry -> Connection
entryCreationTimeNSec :: Entry -> Word64
entryUseTimeNSec :: Entry -> Word64
entryId :: Entry -> UUID
entryConnection :: Connection
entryCreationTimeNSec :: Word64
entryUseTimeNSec :: Word64
entryId :: UUID
..} =
Word64
now Word64 -> Word64 -> Bool
forall a. Ord a => a -> a -> Bool
> Word64
entryUseTimeNSec Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
+ Word64
maxIdletime
data Pool = Pool
{
Pool -> Int
poolSize :: Int,
Pool -> IO Settings
poolFetchConnectionSettings :: IO Connection.Settings,
Pool -> Int
poolAcquisitionTimeout :: Int,
Pool -> Word64
poolMaxLifetime :: Word64,
Pool -> Word64
poolMaxIdletime :: Word64,
Pool -> TQueue Entry
poolConnectionQueue :: TQueue Entry,
Pool -> TVar Int
poolCapacity :: TVar Int,
Pool -> TVar (TVar Bool)
poolReuseVar :: TVar (TVar Bool),
Pool -> IORef ()
poolReaperRef :: IORef (),
Pool -> Observation -> IO ()
poolObserver :: Observation -> IO ()
}
acquire :: Config.Config -> IO Pool
acquire :: Config -> IO Pool
acquire Config
config = do
TQueue Entry
connectionQueue <- IO (TQueue Entry)
forall a. IO (TQueue a)
newTQueueIO
TVar Int
capVar <- Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO (Config -> Int
Config.size Config
config)
TVar (TVar Bool)
reuseVar <- TVar Bool -> IO (TVar (TVar Bool))
forall a. a -> IO (TVar a)
newTVarIO (TVar Bool -> IO (TVar (TVar Bool)))
-> IO (TVar Bool) -> IO (TVar (TVar Bool))
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Bool -> IO (TVar Bool)
forall a. a -> IO (TVar a)
newTVarIO Bool
True
IORef ()
reaperRef <- () -> IO (IORef ())
forall a. a -> IO (IORef a)
newIORef ()
ThreadId
managerTid <- ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forkIOWithUnmask (((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId)
-> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
unmask -> IO () -> IO ()
forall a. IO a -> IO a
unmask (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Int -> IO ()
threadDelay Int
1000000
Word64
now <- IO Word64
getMonotonicTimeNSec
IO (IO ()) -> IO ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO ()) -> IO ())
-> (STM (IO ()) -> IO (IO ())) -> STM (IO ()) -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. STM (IO ()) -> IO (IO ())
forall a. STM a -> IO a
atomically (STM (IO ()) -> IO ()) -> STM (IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ do
[Entry]
entries <- TQueue Entry -> STM [Entry]
forall a. TQueue a -> STM [a]
flushTQueue TQueue Entry
connectionQueue
let ([Entry]
agedEntries, [Entry]
unagedEntries) = (Entry -> Bool) -> [Entry] -> ([Entry], [Entry])
forall a. (a -> Bool) -> [a] -> ([a], [a])
partition (Word64 -> Word64 -> Entry -> Bool
entryIsAged Word64
agingTimeoutNanos Word64
now) [Entry]
entries
([Entry]
idleEntries, [Entry]
liveEntries) = (Entry -> Bool) -> [Entry] -> ([Entry], [Entry])
forall a. (a -> Bool) -> [a] -> ([a], [a])
partition (Word64 -> Word64 -> Entry -> Bool
entryIsIdle Word64
agingTimeoutNanos Word64
now) [Entry]
unagedEntries
(Entry -> STM ()) -> [Entry] -> STM ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (TQueue Entry -> Entry -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue Entry
connectionQueue) [Entry]
liveEntries
IO () -> STM (IO ())
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (IO () -> STM (IO ())) -> IO () -> STM (IO ())
forall a b. (a -> b) -> a -> b
$ do
[Entry] -> (Entry -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Entry]
agedEntries ((Entry -> IO ()) -> IO ()) -> (Entry -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Entry
entry -> do
Connection -> IO ()
Connection.release (Entry -> Connection
entryConnection Entry
entry)
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
capVar Int -> Int
forall a. Enum a => a -> a
succ
(Config -> Observation -> IO ()
Config.observationHandler Config
config) (UUID -> ConnectionStatus -> Observation
ConnectionObservation (Entry -> UUID
entryId Entry
entry) (ConnectionTerminationReason -> ConnectionStatus
TerminatedConnectionStatus ConnectionTerminationReason
AgingConnectionTerminationReason))
[Entry] -> (Entry -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Entry]
idleEntries ((Entry -> IO ()) -> IO ()) -> (Entry -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Entry
entry -> do
Connection -> IO ()
Connection.release (Entry -> Connection
entryConnection Entry
entry)
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
capVar Int -> Int
forall a. Enum a => a -> a
succ
(Config -> Observation -> IO ()
Config.observationHandler Config
config) (UUID -> ConnectionStatus -> Observation
ConnectionObservation (Entry -> UUID
entryId Entry
entry) (ConnectionTerminationReason -> ConnectionStatus
TerminatedConnectionStatus ConnectionTerminationReason
IdlenessConnectionTerminationReason))
IO (Weak (IORef ())) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Weak (IORef ())) -> IO ())
-> (IO () -> IO (Weak (IORef ()))) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. IORef () -> IO () -> IO (Weak (IORef ()))
forall a. IORef a -> IO () -> IO (Weak (IORef a))
mkWeakIORef IORef ()
reaperRef (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
ThreadId -> IO ()
killThread ThreadId
managerTid
Pool -> IO Pool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Pool -> IO Pool) -> Pool -> IO Pool
forall a b. (a -> b) -> a -> b
$ Int
-> IO Settings
-> Int
-> Word64
-> Word64
-> TQueue Entry
-> TVar Int
-> TVar (TVar Bool)
-> IORef ()
-> (Observation -> IO ())
-> Pool
Pool (Config -> Int
Config.size Config
config) (Config -> IO Settings
Config.connectionSettingsProvider Config
config) Int
acqTimeoutMicros Word64
agingTimeoutNanos Word64
maxIdletimeNanos TQueue Entry
connectionQueue TVar Int
capVar TVar (TVar Bool)
reuseVar IORef ()
reaperRef (Config -> Observation -> IO ()
Config.observationHandler Config
config)
where
acqTimeoutMicros :: Int
acqTimeoutMicros =
Int -> Int -> Int
forall a. Integral a => a -> a -> a
div (Integer -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (DiffTime -> Integer
diffTimeToPicoseconds (Config -> DiffTime
Config.acquisitionTimeout Config
config))) Int
1_000_000
agingTimeoutNanos :: Word64
agingTimeoutNanos =
Word64 -> Word64 -> Word64
forall a. Integral a => a -> a -> a
div (Integer -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (DiffTime -> Integer
diffTimeToPicoseconds (Config -> DiffTime
Config.agingTimeout Config
config))) Word64
1_000
maxIdletimeNanos :: Word64
maxIdletimeNanos =
Word64 -> Word64 -> Word64
forall a. Integral a => a -> a -> a
div (Integer -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (DiffTime -> Integer
diffTimeToPicoseconds (Config -> DiffTime
Config.idlenessTimeout Config
config))) Word64
1_000
release :: Pool -> IO ()
release :: Pool -> IO ()
release Pool {Int
IO Settings
Word64
TVar Int
TVar (TVar Bool)
IORef ()
TQueue Entry
Observation -> IO ()
poolSize :: Pool -> Int
poolFetchConnectionSettings :: Pool -> IO Settings
poolAcquisitionTimeout :: Pool -> Int
poolMaxLifetime :: Pool -> Word64
poolMaxIdletime :: Pool -> Word64
poolConnectionQueue :: Pool -> TQueue Entry
poolCapacity :: Pool -> TVar Int
poolReuseVar :: Pool -> TVar (TVar Bool)
poolReaperRef :: Pool -> IORef ()
poolObserver :: Pool -> Observation -> IO ()
poolSize :: Int
poolFetchConnectionSettings :: IO Settings
poolAcquisitionTimeout :: Int
poolMaxLifetime :: Word64
poolMaxIdletime :: Word64
poolConnectionQueue :: TQueue Entry
poolCapacity :: TVar Int
poolReuseVar :: TVar (TVar Bool)
poolReaperRef :: IORef ()
poolObserver :: Observation -> IO ()
..} =
IO (IO ()) -> IO ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO ()) -> IO ())
-> (STM (IO ()) -> IO (IO ())) -> STM (IO ()) -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. STM (IO ()) -> IO (IO ())
forall a. STM a -> IO a
atomically (STM (IO ()) -> IO ()) -> STM (IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ do
TVar Bool
prevReuse <- TVar (TVar Bool) -> STM (TVar Bool)
forall a. TVar a -> STM a
readTVar TVar (TVar Bool)
poolReuseVar
TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Bool
prevReuse Bool
False
TVar Bool
newReuse <- Bool -> STM (TVar Bool)
forall a. a -> STM (TVar a)
newTVar Bool
True
TVar (TVar Bool) -> TVar Bool -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (TVar Bool)
poolReuseVar TVar Bool
newReuse
[Entry]
entries <- TQueue Entry -> STM [Entry]
forall a. TQueue a -> STM [a]
flushTQueue TQueue Entry
poolConnectionQueue
IO () -> STM (IO ())
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (IO () -> STM (IO ())) -> IO () -> STM (IO ())
forall a b. (a -> b) -> a -> b
$ [Entry] -> (Entry -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Entry]
entries ((Entry -> IO ()) -> IO ()) -> (Entry -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Entry
entry -> do
Connection -> IO ()
Connection.release (Entry -> Connection
entryConnection Entry
entry)
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
poolCapacity Int -> Int
forall a. Enum a => a -> a
succ
Observation -> IO ()
poolObserver (UUID -> ConnectionStatus -> Observation
ConnectionObservation (Entry -> UUID
entryId Entry
entry) (ConnectionTerminationReason -> ConnectionStatus
TerminatedConnectionStatus ConnectionTerminationReason
ReleaseConnectionTerminationReason))
use :: Pool -> Session.Session a -> IO (Either UsageError a)
use :: forall a. Pool -> Session a -> IO (Either UsageError a)
use Pool {Int
IO Settings
Word64
TVar Int
TVar (TVar Bool)
IORef ()
TQueue Entry
Observation -> IO ()
poolSize :: Pool -> Int
poolFetchConnectionSettings :: Pool -> IO Settings
poolAcquisitionTimeout :: Pool -> Int
poolMaxLifetime :: Pool -> Word64
poolMaxIdletime :: Pool -> Word64
poolConnectionQueue :: Pool -> TQueue Entry
poolCapacity :: Pool -> TVar Int
poolReuseVar :: Pool -> TVar (TVar Bool)
poolReaperRef :: Pool -> IORef ()
poolObserver :: Pool -> Observation -> IO ()
poolSize :: Int
poolFetchConnectionSettings :: IO Settings
poolAcquisitionTimeout :: Int
poolMaxLifetime :: Word64
poolMaxIdletime :: Word64
poolConnectionQueue :: TQueue Entry
poolCapacity :: TVar Int
poolReuseVar :: TVar (TVar Bool)
poolReaperRef :: IORef ()
poolObserver :: Observation -> IO ()
..} Session a
sess = do
STM Bool
timeout <- do
TVar Bool
delay <- Int -> IO (TVar Bool)
registerDelay Int
poolAcquisitionTimeout
STM Bool -> IO (STM Bool)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (STM Bool -> IO (STM Bool)) -> STM Bool -> IO (STM Bool)
forall a b. (a -> b) -> a -> b
$ TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
delay
IO (IO (Either UsageError a)) -> IO (Either UsageError a)
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO (Either UsageError a)) -> IO (Either UsageError a))
-> (STM (IO (Either UsageError a))
-> IO (IO (Either UsageError a)))
-> STM (IO (Either UsageError a))
-> IO (Either UsageError a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. STM (IO (Either UsageError a)) -> IO (IO (Either UsageError a))
forall a. STM a -> IO a
atomically (STM (IO (Either UsageError a)) -> IO (Either UsageError a))
-> STM (IO (Either UsageError a)) -> IO (Either UsageError a)
forall a b. (a -> b) -> a -> b
$ do
TVar Bool
reuseVar <- TVar (TVar Bool) -> STM (TVar Bool)
forall a. TVar a -> STM a
readTVar TVar (TVar Bool)
poolReuseVar
[STM (IO (Either UsageError a))] -> STM (IO (Either UsageError a))
forall (t :: * -> *) (f :: * -> *) a.
(Foldable t, Alternative f) =>
t (f a) -> f a
asum
[ TQueue Entry -> STM Entry
forall a. TQueue a -> STM a
readTQueue TQueue Entry
poolConnectionQueue STM Entry
-> (Entry -> IO (Either UsageError a))
-> STM (IO (Either UsageError a))
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> TVar Bool -> Entry -> IO (Either UsageError a)
onConn TVar Bool
reuseVar,
do
Int
capVal <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
poolCapacity
if Int
capVal Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
then do
TVar Int -> Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Int
poolCapacity (Int -> STM ()) -> Int -> STM ()
forall a b. (a -> b) -> a -> b
$! Int -> Int
forall a. Enum a => a -> a
pred Int
capVal
IO (Either UsageError a) -> STM (IO (Either UsageError a))
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (IO (Either UsageError a) -> STM (IO (Either UsageError a)))
-> IO (Either UsageError a) -> STM (IO (Either UsageError a))
forall a b. (a -> b) -> a -> b
$ TVar Bool -> IO (Either UsageError a)
onNewConn TVar Bool
reuseVar
else STM (IO (Either UsageError a))
forall a. STM a
retry,
do
Bool
timedOut <- STM Bool
timeout
if Bool
timedOut
then IO (Either UsageError a) -> STM (IO (Either UsageError a))
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (IO (Either UsageError a) -> STM (IO (Either UsageError a)))
-> (UsageError -> IO (Either UsageError a))
-> UsageError
-> STM (IO (Either UsageError a))
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. Either UsageError a -> IO (Either UsageError a)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either UsageError a -> IO (Either UsageError a))
-> (UsageError -> Either UsageError a)
-> UsageError
-> IO (Either UsageError a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. UsageError -> Either UsageError a
forall a b. a -> Either a b
Left (UsageError -> STM (IO (Either UsageError a)))
-> UsageError -> STM (IO (Either UsageError a))
forall a b. (a -> b) -> a -> b
$ UsageError
AcquisitionTimeoutUsageError
else STM (IO (Either UsageError a))
forall a. STM a
retry
]
where
onNewConn :: TVar Bool -> IO (Either UsageError a)
onNewConn TVar Bool
reuseVar = do
Settings
settings <- IO Settings
poolFetchConnectionSettings
Word64
now <- IO Word64
getMonotonicTimeNSec
UUID
id <- IO UUID
Uuid.nextRandom
Observation -> IO ()
poolObserver (UUID -> ConnectionStatus -> Observation
ConnectionObservation UUID
id ConnectionStatus
ConnectingConnectionStatus)
Either ConnectionError Connection
connRes <- Settings -> IO (Either ConnectionError Connection)
Connection.acquire Settings
settings
case Either ConnectionError Connection
connRes of
Left ConnectionError
connErr -> do
Observation -> IO ()
poolObserver (UUID -> ConnectionStatus -> Observation
ConnectionObservation UUID
id (ConnectionTerminationReason -> ConnectionStatus
TerminatedConnectionStatus (Maybe Text -> ConnectionTerminationReason
NetworkErrorConnectionTerminationReason ((Settings -> Text) -> ConnectionError -> Maybe Text
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (OnDecodeError -> Settings -> Text
Text.decodeUtf8With OnDecodeError
Text.lenientDecode) ConnectionError
connErr))))
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
poolCapacity Int -> Int
forall a. Enum a => a -> a
succ
Either UsageError a -> IO (Either UsageError a)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either UsageError a -> IO (Either UsageError a))
-> Either UsageError a -> IO (Either UsageError a)
forall a b. (a -> b) -> a -> b
$ UsageError -> Either UsageError a
forall a b. a -> Either a b
Left (UsageError -> Either UsageError a)
-> UsageError -> Either UsageError a
forall a b. (a -> b) -> a -> b
$ ConnectionError -> UsageError
ConnectionUsageError ConnectionError
connErr
Right Connection
entry -> do
Observation -> IO ()
poolObserver (UUID -> ConnectionStatus -> Observation
ConnectionObservation UUID
id ConnectionStatus
ReadyForUseConnectionStatus)
TVar Bool -> Entry -> IO (Either UsageError a)
onLiveConn TVar Bool
reuseVar (Connection -> Word64 -> Word64 -> UUID -> Entry
Entry Connection
entry Word64
now Word64
now UUID
id)
onConn :: TVar Bool -> Entry -> IO (Either UsageError a)
onConn TVar Bool
reuseVar Entry
entry = do
Word64
now <- IO Word64
getMonotonicTimeNSec
if Word64 -> Word64 -> Entry -> Bool
entryIsAged Word64
poolMaxLifetime Word64
now Entry
entry
then do
Connection -> IO ()
Connection.release (Entry -> Connection
entryConnection Entry
entry)
Observation -> IO ()
poolObserver (UUID -> ConnectionStatus -> Observation
ConnectionObservation (Entry -> UUID
entryId Entry
entry) (ConnectionTerminationReason -> ConnectionStatus
TerminatedConnectionStatus ConnectionTerminationReason
AgingConnectionTerminationReason))
TVar Bool -> IO (Either UsageError a)
onNewConn TVar Bool
reuseVar
else
if Word64 -> Word64 -> Entry -> Bool
entryIsIdle Word64
poolMaxIdletime Word64
now Entry
entry
then do
Connection -> IO ()
Connection.release (Entry -> Connection
entryConnection Entry
entry)
Observation -> IO ()
poolObserver (UUID -> ConnectionStatus -> Observation
ConnectionObservation (Entry -> UUID
entryId Entry
entry) (ConnectionTerminationReason -> ConnectionStatus
TerminatedConnectionStatus ConnectionTerminationReason
IdlenessConnectionTerminationReason))
TVar Bool -> IO (Either UsageError a)
onNewConn TVar Bool
reuseVar
else do
TVar Bool -> Entry -> IO (Either UsageError a)
onLiveConn TVar Bool
reuseVar Entry
entry {entryUseTimeNSec = now}
onLiveConn :: TVar Bool -> Entry -> IO (Either UsageError a)
onLiveConn TVar Bool
reuseVar Entry
entry = do
Observation -> IO ()
poolObserver (UUID -> ConnectionStatus -> Observation
ConnectionObservation (Entry -> UUID
entryId Entry
entry) ConnectionStatus
InUseConnectionStatus)
Either SomeException (Either QueryError a)
sessRes <- forall e a. Exception e => IO a -> IO (Either e a)
try @SomeException (Session a -> Connection -> IO (Either QueryError a)
forall a. Session a -> Connection -> IO (Either QueryError a)
Session.run Session a
sess (Entry -> Connection
entryConnection Entry
entry))
case Either SomeException (Either QueryError a)
sessRes of
Left SomeException
exc -> do
IO ()
returnConn
SomeException -> IO (Either UsageError a)
forall e a. Exception e => e -> IO a
throwIO SomeException
exc
Right (Left QueryError
err) -> case QueryError
err of
Session.QueryError Settings
_ [Text]
_ (Session.ClientError ConnectionError
details) -> do
Connection -> IO ()
Connection.release (Entry -> Connection
entryConnection Entry
entry)
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
poolCapacity Int -> Int
forall a. Enum a => a -> a
succ
Observation -> IO ()
poolObserver (UUID -> ConnectionStatus -> Observation
ConnectionObservation (Entry -> UUID
entryId Entry
entry) (ConnectionTerminationReason -> ConnectionStatus
TerminatedConnectionStatus (Maybe Text -> ConnectionTerminationReason
NetworkErrorConnectionTerminationReason ((Settings -> Text) -> ConnectionError -> Maybe Text
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (OnDecodeError -> Settings -> Text
Text.decodeUtf8With OnDecodeError
Text.lenientDecode) ConnectionError
details))))
Either UsageError a -> IO (Either UsageError a)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either UsageError a -> IO (Either UsageError a))
-> Either UsageError a -> IO (Either UsageError a)
forall a b. (a -> b) -> a -> b
$ UsageError -> Either UsageError a
forall a b. a -> Either a b
Left (UsageError -> Either UsageError a)
-> UsageError -> Either UsageError a
forall a b. (a -> b) -> a -> b
$ QueryError -> UsageError
SessionUsageError QueryError
err
QueryError
_ -> do
IO ()
returnConn
Observation -> IO ()
poolObserver (UUID -> ConnectionStatus -> Observation
ConnectionObservation (Entry -> UUID
entryId Entry
entry) ConnectionStatus
ReadyForUseConnectionStatus)
Either UsageError a -> IO (Either UsageError a)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either UsageError a -> IO (Either UsageError a))
-> Either UsageError a -> IO (Either UsageError a)
forall a b. (a -> b) -> a -> b
$ UsageError -> Either UsageError a
forall a b. a -> Either a b
Left (UsageError -> Either UsageError a)
-> UsageError -> Either UsageError a
forall a b. (a -> b) -> a -> b
$ QueryError -> UsageError
SessionUsageError QueryError
err
Right (Right a
res) -> do
IO ()
returnConn
Observation -> IO ()
poolObserver (UUID -> ConnectionStatus -> Observation
ConnectionObservation (Entry -> UUID
entryId Entry
entry) ConnectionStatus
ReadyForUseConnectionStatus)
Either UsageError a -> IO (Either UsageError a)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either UsageError a -> IO (Either UsageError a))
-> Either UsageError a -> IO (Either UsageError a)
forall a b. (a -> b) -> a -> b
$ a -> Either UsageError a
forall a b. b -> Either a b
Right a
res
where
returnConn :: IO ()
returnConn =
IO (IO ()) -> IO ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO ()) -> IO ())
-> (STM (IO ()) -> IO (IO ())) -> STM (IO ()) -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. STM (IO ()) -> IO (IO ())
forall a. STM a -> IO a
atomically (STM (IO ()) -> IO ()) -> STM (IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Bool
reuse <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
reuseVar
if Bool
reuse
then TQueue Entry -> Entry -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue Entry
poolConnectionQueue Entry
entry STM () -> IO () -> STM (IO ())
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
else IO () -> STM (IO ())
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (IO () -> STM (IO ())) -> IO () -> STM (IO ())
forall a b. (a -> b) -> a -> b
$ do
Connection -> IO ()
Connection.release (Entry -> Connection
entryConnection Entry
entry)
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
poolCapacity Int -> Int
forall a. Enum a => a -> a
succ
Observation -> IO ()
poolObserver (UUID -> ConnectionStatus -> Observation
ConnectionObservation (Entry -> UUID
entryId Entry
entry) (ConnectionTerminationReason -> ConnectionStatus
TerminatedConnectionStatus ConnectionTerminationReason
ReleaseConnectionTerminationReason))
data UsageError
=
ConnectionUsageError Connection.ConnectionError
|
SessionUsageError Session.QueryError
|
AcquisitionTimeoutUsageError
deriving (Int -> UsageError -> ShowS
[UsageError] -> ShowS
UsageError -> String
(Int -> UsageError -> ShowS)
-> (UsageError -> String)
-> ([UsageError] -> ShowS)
-> Show UsageError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> UsageError -> ShowS
showsPrec :: Int -> UsageError -> ShowS
$cshow :: UsageError -> String
show :: UsageError -> String
$cshowList :: [UsageError] -> ShowS
showList :: [UsageError] -> ShowS
Show, UsageError -> UsageError -> Bool
(UsageError -> UsageError -> Bool)
-> (UsageError -> UsageError -> Bool) -> Eq UsageError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: UsageError -> UsageError -> Bool
== :: UsageError -> UsageError -> Bool
$c/= :: UsageError -> UsageError -> Bool
/= :: UsageError -> UsageError -> Bool
Eq)
instance Exception UsageError