{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module Network.Riak.Cluster
( Cluster(..)
, InClusterError(..)
, connectToCluster
, inCluster
, Riak.create
, Riak.defaultClient
) where
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMVar
import Control.Exception
import Control.Exception.Enclosed
import Control.Monad.Base (liftBase)
import Control.Monad.Catch (MonadThrow (..))
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Typeable
import Network.Riak (Connection)
import qualified Network.Riak as Riak
import qualified Network.Riak.Connection.Pool as Riak
import System.Random.Mersenne.Pure64
-- | A set of Riak connection pools treated as one logical cluster.
--
-- 'inCluster' picks a random starting pool for every call and falls back
-- to the remaining pools when an attempt fails.
data Cluster = Cluster
  { clusterPools :: [Riak.Pool]
    -- ^ Connection pools that 'inCluster' tries, in a randomly rotated order.
  , clusterGen :: TMVar PureMT
    -- ^ Shared random generator state used to choose the starting pool.
  }
-- | Raised by 'inCluster' when no pool could complete the action.
--
-- Carries the exceptions collected from the individual attempts. 'inCluster'
-- prepends each new failure, so the most recent failure comes first.
data InClusterError = InClusterError [SomeException]
  deriving (Show, Typeable)
-- | No methods are overridden: the class defaults are used, which is enough
-- for 'inCluster' to raise the error with 'throwM'.
instance Exception InClusterError
-- | Build a 'Cluster' from a list of Riak client configurations.
--
-- One connection pool is created per client, then the pools are wrapped
-- together with a fresh random generator by 'connectToClusterWithPools'.
--
-- An empty client list yields a cluster with no pools; 'inCluster' on such
-- a cluster fails immediately with @InClusterError []@.
connectToCluster :: [Riak.Client] -> IO Cluster
connectToCluster clients = do
  -- NOTE(review): the literals 1, 10 and 20 are passed as
  -- Int / NominalDiffTime / Int. Presumably stripe count, idle timeout in
  -- seconds and max connections per stripe -- confirm against 'Riak.create'.
  pools <- mapM (\client -> Riak.create client 1 10 20) clients
  connectToClusterWithPools pools
-- | Wrap already-created connection pools into a 'Cluster'.
--
-- Obtains a new generator from 'newPureMT' and stores it in a 'TMVar' so
-- that every 'inCluster' call on this cluster advances the same generator
-- state.
connectToClusterWithPools :: [Riak.Pool] -> IO Cluster
connectToClusterWithPools pools = do
  gen <- newPureMT
  -- 'newTMVarIO' is the IO counterpart of @atomically . newTMVar@.
  genVar <- newTMVarIO gen
  return (Cluster pools genVar)
-- | Run an action against some node of the cluster, failing over to the
-- other nodes when an attempt raises an exception.
--
-- A random starting pool is chosen on every call. The action is then tried
-- on each pool in rotated order until one attempt succeeds. The action may
-- therefore run more than once, because an exception raised by the action
-- itself also triggers the next attempt.
--
-- Throws 'InClusterError' (via 'throwM') when every pool has failed, or at
-- once when the cluster has no pools. The collected exceptions are ordered
-- most recent failure first.
inCluster :: (MonadThrow m, MonadBaseControl IO m)
          => Cluster -> (Connection -> m a) -> m a
inCluster Cluster{clusterPools=pools, clusterGen=tMT} f = do
  -- Draw one random Int and store the advanced generator back, all in a
  -- single STM transaction so concurrent callers never see the TMVar empty.
  rnd <- liftBase $ atomically $ do
    mt <- takeTMVar tMT
    let (i, mt') = randomInt mt
    putTMVar tMT mt'
    return i
  -- Guard the empty case to avoid dividing by zero. 'mod' with a positive
  -- divisor is non-negative, so n is a valid index even for a negative rnd.
  let n = if null pools then 0 else rnd `mod` length pools
      -- Start from the n-th pool and wrap around to the ones before it.
      pools' = rotateL n pools
  go pools' []
  where
    -- Every pool has been tried: report all collected failures.
    go [] errors = throwM (InClusterError errors)
    -- Try the next pool; on failure remember the exception and move on.
    go (p:ps) es =
      catchAny (Riak.withConnectionM p f) (\e -> go ps (e:es))
-- | Rotate a list to the left by @i@ positions.
--
-- The first @i@ elements are moved to the end. For @i <= 0@ or
-- @i >= length xs@ the list is returned unchanged, because 'splitAt' then
-- leaves one of the two halves empty.
--
-- > rotateL 1 [1, 2, 3] == [2, 3, 1]
rotateL :: Int -> [a] -> [a]
rotateL i xs = right ++ left
  where
    (left, right) = splitAt i xs