{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeFamilies #-}
module Database.EventStore.Internal.Operation.Transaction
( transactionStart
, transactionWrite
, transactionCommit
) where
import Data.Int
import Data.Maybe
import Data.ProtocolBuffers
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Operation.Transaction.Message
import Database.EventStore.Internal.Operation.Write.Common
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Types
-- | Builds the operation that asks the server to open a transaction.
--
-- The request is assembled from the stream name, the expected version
-- (converted with 'expVersionInt64') and the @s_requireMaster@ setting, then
-- sent together with the optional credentials. A successful reply yields the
-- transaction id carried by the response; the three timeout results trigger
-- a 'retry'; every other result is handed to its matching failure helper.
transactionStart
    :: Settings
    -> Text
    -> ExpectedVersion
    -> Maybe Credentials
    -> Operation Int64
transactionStart Settings{..} stream expected creds = construct $ do
    let request = newStart stream (expVersionInt64 expected) s_requireMaster
    reply <- send transactionStartCmd transactionStartCompletedCmd creds request
    case getField (_result reply) of
        OP_SUCCESS                -> yield (getField (_transId reply))
        -- Timeouts: issue the request again.
        OP_PREPARE_TIMEOUT        -> retry
        OP_COMMIT_TIMEOUT         -> retry
        OP_FORWARD_TIMEOUT        -> retry
        -- Remaining results are reported through the failure helpers.
        OP_WRONG_EXPECTED_VERSION -> wrongVersion stream expected
        OP_STREAM_DELETED         -> streamDeleted (StreamName stream)
        OP_INVALID_TRANSACTION    -> invalidTransaction
        OP_ACCESS_DENIED          -> accessDenied (StreamName stream)
-- | Builds the operation that sends a batch of events within a transaction.
--
-- Each 'Event' is first converted with 'eventToNewEvent'; the converted
-- events, the transaction id and the @s_requireMaster@ setting make up the
-- write request, which is sent with the optional credentials. A successful
-- reply yields @()@; the three timeout results trigger a 'retry'; every
-- other result is handed to its matching failure helper, which receives the
-- stream name and expected version where it needs them.
transactionWrite
    :: Settings
    -> Text
    -> ExpectedVersion
    -> Int64
    -> [Event]
    -> Maybe Credentials
    -> Operation ()
transactionWrite Settings{..} stream expected transId events creds =
    construct $ do
        converted <- traverse eventToNewEvent events
        let request = newWrite transId converted s_requireMaster
        reply <- send transactionWriteCmd transactionWriteCompletedCmd creds
                      request
        case getField (_wwResult reply) of
            OP_SUCCESS                -> yield ()
            -- Timeouts: issue the request again.
            OP_PREPARE_TIMEOUT        -> retry
            OP_COMMIT_TIMEOUT         -> retry
            OP_FORWARD_TIMEOUT        -> retry
            -- Remaining results are reported through the failure helpers.
            OP_WRONG_EXPECTED_VERSION -> wrongVersion stream expected
            OP_STREAM_DELETED         -> streamDeleted (StreamName stream)
            OP_INVALID_TRANSACTION    -> invalidTransaction
            OP_ACCESS_DENIED          -> accessDenied (StreamName stream)
-- | Builds the operation that commits a transaction.
--
-- The commit request is built from the transaction id and the
-- @s_requireMaster@ setting and sent with the optional credentials. On
-- success the operation yields a 'WriteResult' made of the response's last
-- number and a 'Position' (commit position first, prepare position second);
-- a position missing from the response is replaced by @-1@. The three
-- timeout results trigger a 'retry'; every other result is handed to its
-- matching failure helper.
transactionCommit
    :: Settings
    -> Text
    -> ExpectedVersion
    -> Int64
    -> Maybe Credentials
    -> Operation WriteResult
transactionCommit Settings{..} stream expected transId creds = construct $ do
    let request = newCommit transId s_requireMaster
    reply <- send transactionCommitCmd transactionCommitCompletedCmd creds
                  request
    let position =
            Position
                (fromMaybe (-1) (getField (_commitPosition reply)))
                (fromMaybe (-1) (getField (_preparePosition reply)))
        outcome = WriteResult (getField (_lastNumber reply)) position
    case getField (_ccResult reply) of
        OP_SUCCESS                -> yield outcome
        -- Timeouts: issue the request again.
        OP_PREPARE_TIMEOUT        -> retry
        OP_COMMIT_TIMEOUT         -> retry
        OP_FORWARD_TIMEOUT        -> retry
        -- Remaining results are reported through the failure helpers.
        OP_WRONG_EXPECTED_VERSION -> wrongVersion stream expected
        OP_STREAM_DELETED         -> streamDeleted (StreamName stream)
        OP_INVALID_TRANSACTION    -> invalidTransaction
        OP_ACCESS_DENIED          -> accessDenied (StreamName stream)