-- | Client-side entry point of the driver: connection handles, functions to
-- send queries and read their responses, plus re-exports of the term and
-- datum types that come from the imported @Database.RethinkDB.*@ modules.
module Database.RethinkDB
    ( Handle
      -- * Connection management
    , defaultPort, newHandle, handleDatabase, close, serverInfo
      -- * Running queries and consuming sequences
    , run, nextChunk, collect
      -- * Lower-level query control (token based)
    , start, continue, stop, wait, nextResult
      -- * Protocol-level types
    , Token, Error(..), Response(..), ChangeNotification(..)
      -- * Datum types and conversion helpers
    , Datum(..), Array, Object, ToDatum(..), FromDatum(..)
    , (.=), (.:), (.:?), object
      -- * Query expressions and related types
    , Exp(..), SomeExp(..)
    , Bound(..), Order(..)
    , Sequence(..)
    , Table, Database, SingleSelection
    , Res, Result, FromResponse
    , ConflictResolutionStrategy(..)
    , emptyOptions
    , lift
    , call1, call2
    , IsDatum, IsObject, IsSequence
    ) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception (bracketOnError)
import Control.Monad
import Data.IORef
import Data.Monoid ((<>))

import qualified Data.Aeson.Types as A
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Sequence (Seq, ViewR(..))
import qualified Data.Sequence as S
import Data.Text (Text)
import qualified Data.Text as T
import Data.Vector (Vector)
import qualified Data.Vector as V
import Network.Socket (Socket)

import Database.RethinkDB.Messages
import Database.RethinkDB.Types
import Database.RethinkDB.Types.Datum
-- | Everything needed to talk to one server connection. Created by
-- 'newHandle' and torn down by 'close'.
data Handle = Handle
    { hSocket :: !(MVar Socket)
      -- ^ The connection socket. Every send in this module happens inside
      -- 'withMVar' on this field, so writers are serialized.
    , hTokenRef :: !(IORef Token)
      -- ^ The next token to hand out. It starts at 1 and is bumped
      -- atomically each time a new query is started.
    , hError :: !(TVar (Maybe Error))
      -- ^ The first receive error recorded by the reader thread, if any.
      -- Once set, 'nextResult' reports it to every caller.
    , hResponses :: !(TVar (Map Token (Seq (Either Error Response))))
      -- ^ Responses that have arrived but were not consumed yet, queued
      -- per query token.
    , hReader :: !ThreadId
      -- ^ The background thread which reads responses from the socket and
      -- files them into 'hResponses'. Killed by 'close'.
    , hDatabase :: !(Exp Database)
      -- ^ The database expression given to 'newHandle'; it is passed to
      -- 'compileTerm' whenever a query is started.
    }
-- | The port number (28015) which callers can pass to 'newHandle' when they
-- have no more specific port to connect to.
defaultPort :: Int
defaultPort = 28015
-- | Connect to the server at the given host and port, perform the handshake
-- (optionally with an auth key) and return a 'Handle' whose queries are
-- compiled against the given database.
--
-- A background thread is forked which reads responses off the socket and
-- queues them per token. The first receive error is stored in the handle and
-- ends that thread: after such an error the handle only ever reports the
-- stored error, so there is nothing left worth reading from the connection.
--
-- If anything throws before the handle has been fully constructed, the
-- socket is closed again instead of being leaked.
newHandle :: Text -> Int -> Maybe Text -> Exp Database -> IO Handle
newHandle host port mbAuth db =
    bracketOnError (createSocket host port) closeSocket $ \sock -> do
        -- The handshake reply is read off the wire but not inspected.
        sendMessage sock (handshakeMessage mbAuth)
        _reply <- recvMessage sock handshakeReplyParser

        err <- newTVarIO Nothing
        responses <- newTVarIO M.empty

        readerThreadId <- forkIO $ readerLoop sock err responses

        Handle
            <$> newMVar sock
            <*> newIORef 1
            <*> pure err
            <*> pure responses
            <*> pure readerThreadId
            <*> pure db
  where
    -- Receive responses until the first receive error, filing each response
    -- under its token.
    readerLoop sock err responses = loop
      where
        -- New items are combined as @new <> old@, so the oldest response of
        -- a token always sits at the right end of its queue.
        enqueue token item = atomically $ modifyTVar' responses $
            M.insertWith mappend token (S.singleton item)

        loop = do
            res <- recvMessage sock responseMessageParser
            case res of
                -- Record the error (first one wins) and stop. Looping on
                -- would only keep hitting the same broken connection.
                Left e -> atomically $ do
                    mbError <- readTVar err
                    case mbError of
                        Nothing -> writeTVar err (Just e)
                        Just _  -> pure ()

                -- The token was readable but the rest of the message was
                -- not: only the query owning that token sees the failure.
                Right (Left (token, msg)) -> do
                    enqueue token (Left $ ProtocolError $ T.pack msg)
                    loop

                Right (Right r) -> do
                    enqueue (responseToken r) (Right r)
                    loop
-- | The database expression which was supplied when the handle was created.
handleDatabase :: Handle -> Exp Database
handleDatabase handle = hDatabase handle
-- | Shut the handle down: close its socket (while holding the socket lock)
-- and then kill the background reader thread.
close :: Handle -> IO ()
close handle =
    withMVar (hSocket handle) closeSocket >> killThread (hReader handle)
-- | Ask the server for information about itself and wait for the answer.
--
-- A fresh token is allocated, a single-element query array containing the
-- number 5 is sent (presumably the protocol's server-info query type; verify
-- against the wire protocol definition), and the response for that token is
-- decoded.
serverInfo :: Handle -> IO (Either Error ServerInfo)
serverInfo handle = do
    token <- atomicModifyIORef' (hTokenRef handle) $ \n -> (n + 1, n)
    withMVar (hSocket handle) (\sock -> sendMessage sock (request token))
    nextResult handle token
  where
    request token = queryMessage token (singleElementArray 5)
-- | Send the expression to the server as a new query and block until its
-- first response has arrived and been decoded.
run :: (FromResponse (Result a)) => Handle -> Exp a -> IO (Res a)
run handle expr = start handle expr >>= nextResult handle
-- | Fetch the chunk which follows the given partial sequence.
--
-- For a 'Partial' sequence a continue request is sent for its token and the
-- next response for that token is decoded. A 'Done' sequence has nothing
-- left to fetch; asking for more is reported as a 'ProtocolError' whose
-- message says so (rather than an empty message).
nextChunk :: (FromResponse (Sequence a))
          => Handle -> Sequence a -> IO (Either Error (Sequence a))
nextChunk _ (Done _) = return $ Left $ ProtocolError
    "nextChunk: the sequence is already complete, there are no more chunks"
nextChunk handle (Partial token _) = do
    continue handle token
    nextResult handle token
-- | Fetch all remaining chunks of a sequence and return every element, in
-- order, as one vector. The first error encountered while fetching aborts
-- the collection and is returned instead.
--
-- Chunks are gathered in a list and concatenated once at the end, so the
-- elements already received are not copied again for every further chunk.
collect :: (FromDatum a) => Handle -> Sequence a -> IO (Either Error (Vector a))
collect handle = go []
  where
    -- @acc@ holds the chunks seen so far, most recent first.
    go acc (Done x) = return $ Right $ V.concat $ reverse (x : acc)
    go acc s@(Partial _ x) = do
        chunk <- nextChunk handle s
        case chunk of
            Left e  -> return $ Left e
            Right r -> go (x : acc) r
-- | Send the expression to the server as a new query and return the token
-- under which its responses will arrive. Does not wait for any response.
--
-- The message is a three-element array: the number 1 (presumably the
-- protocol's "start" query type; verify against the wire protocol
-- definition), the compiled term, and the compiled 'emptyOptions'.
start :: Handle -> Exp a -> IO Token
start handle term = do
    token <- atomicModifyIORef' (hTokenRef handle) $ \n -> (n + 1, n)
    withMVar (hSocket handle) (\sock -> sendMessage sock (queryMessage token query))
    pure token
  where
    query = compileTerm (hDatabase handle) $ do
        compiledTerm <- toTerm term
        compiledOptions <- toTerm emptyOptions
        pure $ A.Array $ V.fromList
            [A.Number 1, compiledTerm, A.toJSON compiledOptions]
-- | Build a JSON array whose only element is the given number.
singleElementArray :: Int -> A.Value
singleElementArray = A.Array . V.singleton . A.Number . fromIntegral
-- | Send the single-element query array @[2]@ for the given token
-- (presumably the protocol's "continue" request; verify against the wire
-- protocol definition). Does not wait for the response.
continue :: Handle -> Token -> IO ()
continue handle token =
    withMVar (hSocket handle) (\sock -> sendMessage sock request)
  where
    request = queryMessage token (singleElementArray 2)
-- | Send the single-element query array @[3]@ for the given token
-- (presumably the protocol's "stop" request; verify against the wire
-- protocol definition). Does not wait for the response.
stop :: Handle -> Token -> IO ()
stop handle token =
    withMVar (hSocket handle) (\sock -> sendMessage sock request)
  where
    request = queryMessage token (singleElementArray 3)
-- | Send the single-element query array @[4]@ for the given token
-- (presumably the protocol's "noreply wait" request; verify against the
-- wire protocol definition). Does not wait for the response.
wait :: Handle -> Token -> IO ()
wait handle token =
    withMVar (hSocket handle) (\sock -> sendMessage sock request)
  where
    request = queryMessage token (singleElementArray 4)
-- | STM action which removes and returns the oldest queued response for the
-- token, retrying (blocking) as long as none is queued.
--
-- New responses are added at the left end of a token's queue, hence the
-- oldest one is taken from the right end.
takeResponse :: Handle -> Token -> STM (Either Error Response)
takeResponse h token = do
    m <- readTVar (hResponses h)
    case M.lookup token m of
        Nothing -> retry
        Just s -> case S.viewr s of
            EmptyR -> retry
            rest :> a -> do
                -- Drop the map entry altogether once its queue is drained.
                modifyTVar' (hResponses h) $ if S.null rest
                    then M.delete token
                    else M.insert token rest
                pure a

-- | Block until a response for the token is available, then remove it from
-- the queue and return it.
responseForToken :: Handle -> Token -> IO (Either Error Response)
responseForToken h token = atomically (takeResponse h token)

-- | Wait for the next response which belongs to the token and decode it.
--
-- Returns 'Left' when the handle has recorded a connection-level error, when
-- the response could not be read or decoded, or when the server answered
-- with a client, compile or runtime error.
--
-- The connection error and the response queue are inspected in one and the
-- same STM transaction. This matters while blocked: the transaction has read
-- both variables, so it is re-run when either changes, and a caller waiting
-- for a response is woken up (and handed the error) if the connection fails
-- in the meantime instead of waiting forever.
nextResult :: (FromResponse a) => Handle -> Token -> IO (Either Error a)
nextResult h token = do
    errorOrResponse <- atomically $ do
        mbError <- readTVar (hError h)
        case mbError of
            Just err -> pure (Left err)
            Nothing  -> takeResponse h token

    case errorOrResponse of
        Left err -> return $ Left err
        Right response -> case responseType response of
            ClientErrorType  -> mkError response ClientError
            CompileErrorType -> mkError response CompileError
            RuntimeErrorType -> mkError response RuntimeError
            _                -> return $ parseMessage parseResponse response Right
-- | Run an aeson parser on a value. A parse failure becomes a
-- 'ProtocolError' carrying the parser's message; a successfully parsed
-- value is handed to the continuation.
parseMessage :: (a -> A.Parser b) -> a -> (b -> Either Error c) -> Either Error c
parseMessage parser value f =
    either (Left . ProtocolError . T.pack) f (A.parseEither parser value)
-- | Turn an error response into a 'Left'.
--
-- The response is expected to carry exactly one result, which is parsed and
-- wrapped with the given error constructor. Any other number of results, or
-- a result which fails to parse, yields a 'ProtocolError' instead.
mkError :: Response -> (Text -> Error) -> IO (Either Error a)
mkError r e = pure outcome
  where
    payload = responseResult r

    outcome
        | [a] <- V.toList payload = parseMessage A.parseJSON a (Left . e)
        | otherwise = Left $ ProtocolError $
            "mkError: Could not parse error" <> T.pack (show payload)