{-# LANGUAGE QuasiQuotes #-}
module MessageDb.Functions
( WithConnection,
BatchSize (..),
Condition (..),
ConsumerGroup (..),
Correlation (..),
StreamVersion (..),
ExpectedVersion (..),
ExpectedVersionViolation (..),
parseExpectedVersionViolation,
lookupById,
lookupByPosition,
writeMessage,
getStreamMessages,
getCategoryMessages,
getLastStreamMessage,
streamVersion,
)
where
import Control.Exception (Exception, handle, throwIO)
import qualified Data.Aeson as Aeson
import qualified Data.ByteString.Char8 as Char8
import Data.Coerce (Coercible, coerce)
import Data.Maybe (listToMaybe)
import Data.String (IsString)
import Data.Text (Text)
import qualified Data.Time as Time
import qualified Data.UUID as UUID
import qualified Database.PostgreSQL.Simple as Postgres
import qualified Database.PostgreSQL.Simple.FromField as FromField
import Database.PostgreSQL.Simple.FromRow (RowParser, field, fieldWith)
import Database.PostgreSQL.Simple.SqlQQ (sql)
import MessageDb.Message (Message (Message))
import qualified MessageDb.Message as Message
import MessageDb.StreamName (CategoryName, StreamName (StreamName), categoryNameToText, streamNameToText)
import MessageDb.Units (NumberOfMessages)
import Numeric.Natural (Natural)
type WithConnection = forall records. (Postgres.Connection -> IO records) -> IO records
data ConsumerGroup = ConsumerGroup
{ ConsumerGroup -> Natural
consumerGroupMember :: Natural
, ConsumerGroup -> Natural
consumerGroupSize :: Natural
}
deriving (Int -> ConsumerGroup -> ShowS
[ConsumerGroup] -> ShowS
ConsumerGroup -> String
(Int -> ConsumerGroup -> ShowS)
-> (ConsumerGroup -> String)
-> ([ConsumerGroup] -> ShowS)
-> Show ConsumerGroup
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ConsumerGroup] -> ShowS
$cshowList :: [ConsumerGroup] -> ShowS
show :: ConsumerGroup -> String
$cshow :: ConsumerGroup -> String
showsPrec :: Int -> ConsumerGroup -> ShowS
$cshowsPrec :: Int -> ConsumerGroup -> ShowS
Show, ConsumerGroup -> ConsumerGroup -> Bool
(ConsumerGroup -> ConsumerGroup -> Bool)
-> (ConsumerGroup -> ConsumerGroup -> Bool) -> Eq ConsumerGroup
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ConsumerGroup -> ConsumerGroup -> Bool
$c/= :: ConsumerGroup -> ConsumerGroup -> Bool
== :: ConsumerGroup -> ConsumerGroup -> Bool
$c== :: ConsumerGroup -> ConsumerGroup -> Bool
Eq)
newtype Condition = Condition
{ Condition -> Text
conditionToText :: Text
}
deriving (Condition -> Condition -> Bool
(Condition -> Condition -> Bool)
-> (Condition -> Condition -> Bool) -> Eq Condition
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Condition -> Condition -> Bool
$c/= :: Condition -> Condition -> Bool
== :: Condition -> Condition -> Bool
$c== :: Condition -> Condition -> Bool
Eq, Eq Condition
Eq Condition
-> (Condition -> Condition -> Ordering)
-> (Condition -> Condition -> Bool)
-> (Condition -> Condition -> Bool)
-> (Condition -> Condition -> Bool)
-> (Condition -> Condition -> Bool)
-> (Condition -> Condition -> Condition)
-> (Condition -> Condition -> Condition)
-> Ord Condition
Condition -> Condition -> Bool
Condition -> Condition -> Ordering
Condition -> Condition -> Condition
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: Condition -> Condition -> Condition
$cmin :: Condition -> Condition -> Condition
max :: Condition -> Condition -> Condition
$cmax :: Condition -> Condition -> Condition
>= :: Condition -> Condition -> Bool
$c>= :: Condition -> Condition -> Bool
> :: Condition -> Condition -> Bool
$c> :: Condition -> Condition -> Bool
<= :: Condition -> Condition -> Bool
$c<= :: Condition -> Condition -> Bool
< :: Condition -> Condition -> Bool
$c< :: Condition -> Condition -> Bool
compare :: Condition -> Condition -> Ordering
$ccompare :: Condition -> Condition -> Ordering
$cp1Ord :: Eq Condition
Ord, [Condition] -> Encoding
[Condition] -> Value
Condition -> Encoding
Condition -> Value
(Condition -> Value)
-> (Condition -> Encoding)
-> ([Condition] -> Value)
-> ([Condition] -> Encoding)
-> ToJSON Condition
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> ToJSON a
toEncodingList :: [Condition] -> Encoding
$ctoEncodingList :: [Condition] -> Encoding
toJSONList :: [Condition] -> Value
$ctoJSONList :: [Condition] -> Value
toEncoding :: Condition -> Encoding
$ctoEncoding :: Condition -> Encoding
toJSON :: Condition -> Value
$ctoJSON :: Condition -> Value
Aeson.ToJSON, Value -> Parser [Condition]
Value -> Parser Condition
(Value -> Parser Condition)
-> (Value -> Parser [Condition]) -> FromJSON Condition
forall a.
(Value -> Parser a) -> (Value -> Parser [a]) -> FromJSON a
parseJSONList :: Value -> Parser [Condition]
$cparseJSONList :: Value -> Parser [Condition]
parseJSON :: Value -> Parser Condition
$cparseJSON :: Value -> Parser Condition
Aeson.FromJSON, String -> Condition
(String -> Condition) -> IsString Condition
forall a. (String -> a) -> IsString a
fromString :: String -> Condition
$cfromString :: String -> Condition
IsString)
deriving (Int -> Condition -> ShowS
[Condition] -> ShowS
Condition -> String
(Int -> Condition -> ShowS)
-> (Condition -> String)
-> ([Condition] -> ShowS)
-> Show Condition
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Condition] -> ShowS
$cshowList :: [Condition] -> ShowS
show :: Condition -> String
$cshow :: Condition -> String
showsPrec :: Int -> Condition -> ShowS
$cshowsPrec :: Int -> Condition -> ShowS
Show) via Text
newtype Correlation = Correlation
{ Correlation -> Text
correlationToText :: Text
}
deriving (Correlation -> Correlation -> Bool
(Correlation -> Correlation -> Bool)
-> (Correlation -> Correlation -> Bool) -> Eq Correlation
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Correlation -> Correlation -> Bool
$c/= :: Correlation -> Correlation -> Bool
== :: Correlation -> Correlation -> Bool
$c== :: Correlation -> Correlation -> Bool
Eq, Eq Correlation
Eq Correlation
-> (Correlation -> Correlation -> Ordering)
-> (Correlation -> Correlation -> Bool)
-> (Correlation -> Correlation -> Bool)
-> (Correlation -> Correlation -> Bool)
-> (Correlation -> Correlation -> Bool)
-> (Correlation -> Correlation -> Correlation)
-> (Correlation -> Correlation -> Correlation)
-> Ord Correlation
Correlation -> Correlation -> Bool
Correlation -> Correlation -> Ordering
Correlation -> Correlation -> Correlation
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: Correlation -> Correlation -> Correlation
$cmin :: Correlation -> Correlation -> Correlation
max :: Correlation -> Correlation -> Correlation
$cmax :: Correlation -> Correlation -> Correlation
>= :: Correlation -> Correlation -> Bool
$c>= :: Correlation -> Correlation -> Bool
> :: Correlation -> Correlation -> Bool
$c> :: Correlation -> Correlation -> Bool
<= :: Correlation -> Correlation -> Bool
$c<= :: Correlation -> Correlation -> Bool
< :: Correlation -> Correlation -> Bool
$c< :: Correlation -> Correlation -> Bool
compare :: Correlation -> Correlation -> Ordering
$ccompare :: Correlation -> Correlation -> Ordering
$cp1Ord :: Eq Correlation
Ord, [Correlation] -> Encoding
[Correlation] -> Value
Correlation -> Encoding
Correlation -> Value
(Correlation -> Value)
-> (Correlation -> Encoding)
-> ([Correlation] -> Value)
-> ([Correlation] -> Encoding)
-> ToJSON Correlation
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> ToJSON a
toEncodingList :: [Correlation] -> Encoding
$ctoEncodingList :: [Correlation] -> Encoding
toJSONList :: [Correlation] -> Value
$ctoJSONList :: [Correlation] -> Value
toEncoding :: Correlation -> Encoding
$ctoEncoding :: Correlation -> Encoding
toJSON :: Correlation -> Value
$ctoJSON :: Correlation -> Value
Aeson.ToJSON, Value -> Parser [Correlation]
Value -> Parser Correlation
(Value -> Parser Correlation)
-> (Value -> Parser [Correlation]) -> FromJSON Correlation
forall a.
(Value -> Parser a) -> (Value -> Parser [a]) -> FromJSON a
parseJSONList :: Value -> Parser [Correlation]
$cparseJSONList :: Value -> Parser [Correlation]
parseJSON :: Value -> Parser Correlation
$cparseJSON :: Value -> Parser Correlation
Aeson.FromJSON, String -> Correlation
(String -> Correlation) -> IsString Correlation
forall a. (String -> a) -> IsString a
fromString :: String -> Correlation
$cfromString :: String -> Correlation
IsString)
deriving (Int -> Correlation -> ShowS
[Correlation] -> ShowS
Correlation -> String
(Int -> Correlation -> ShowS)
-> (Correlation -> String)
-> ([Correlation] -> ShowS)
-> Show Correlation
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Correlation] -> ShowS
$cshowList :: [Correlation] -> ShowS
show :: Correlation -> String
$cshow :: Correlation -> String
showsPrec :: Int -> Correlation -> ShowS
$cshowsPrec :: Int -> Correlation -> ShowS
Show) via Text
data StreamVersion
= DoesNotExist
| DoesExist Message.StreamPosition
deriving (Int -> StreamVersion -> ShowS
[StreamVersion] -> ShowS
StreamVersion -> String
(Int -> StreamVersion -> ShowS)
-> (StreamVersion -> String)
-> ([StreamVersion] -> ShowS)
-> Show StreamVersion
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [StreamVersion] -> ShowS
$cshowList :: [StreamVersion] -> ShowS
show :: StreamVersion -> String
$cshow :: StreamVersion -> String
showsPrec :: Int -> StreamVersion -> ShowS
$cshowsPrec :: Int -> StreamVersion -> ShowS
Show, StreamVersion -> StreamVersion -> Bool
(StreamVersion -> StreamVersion -> Bool)
-> (StreamVersion -> StreamVersion -> Bool) -> Eq StreamVersion
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: StreamVersion -> StreamVersion -> Bool
$c/= :: StreamVersion -> StreamVersion -> Bool
== :: StreamVersion -> StreamVersion -> Bool
$c== :: StreamVersion -> StreamVersion -> Bool
Eq, Eq StreamVersion
Eq StreamVersion
-> (StreamVersion -> StreamVersion -> Ordering)
-> (StreamVersion -> StreamVersion -> Bool)
-> (StreamVersion -> StreamVersion -> Bool)
-> (StreamVersion -> StreamVersion -> Bool)
-> (StreamVersion -> StreamVersion -> Bool)
-> (StreamVersion -> StreamVersion -> StreamVersion)
-> (StreamVersion -> StreamVersion -> StreamVersion)
-> Ord StreamVersion
StreamVersion -> StreamVersion -> Bool
StreamVersion -> StreamVersion -> Ordering
StreamVersion -> StreamVersion -> StreamVersion
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: StreamVersion -> StreamVersion -> StreamVersion
$cmin :: StreamVersion -> StreamVersion -> StreamVersion
max :: StreamVersion -> StreamVersion -> StreamVersion
$cmax :: StreamVersion -> StreamVersion -> StreamVersion
>= :: StreamVersion -> StreamVersion -> Bool
$c>= :: StreamVersion -> StreamVersion -> Bool
> :: StreamVersion -> StreamVersion -> Bool
$c> :: StreamVersion -> StreamVersion -> Bool
<= :: StreamVersion -> StreamVersion -> Bool
$c<= :: StreamVersion -> StreamVersion -> Bool
< :: StreamVersion -> StreamVersion -> Bool
$c< :: StreamVersion -> StreamVersion -> Bool
compare :: StreamVersion -> StreamVersion -> Ordering
$ccompare :: StreamVersion -> StreamVersion -> Ordering
$cp1Ord :: Eq StreamVersion
Ord)
newtype ExpectedVersion = ExpectedVersion
{ ExpectedVersion -> StreamVersion
expectedToStreamVersion :: StreamVersion
}
deriving (Int -> ExpectedVersion -> ShowS
[ExpectedVersion] -> ShowS
ExpectedVersion -> String
(Int -> ExpectedVersion -> ShowS)
-> (ExpectedVersion -> String)
-> ([ExpectedVersion] -> ShowS)
-> Show ExpectedVersion
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ExpectedVersion] -> ShowS
$cshowList :: [ExpectedVersion] -> ShowS
show :: ExpectedVersion -> String
$cshow :: ExpectedVersion -> String
showsPrec :: Int -> ExpectedVersion -> ShowS
$cshowsPrec :: Int -> ExpectedVersion -> ShowS
Show, ExpectedVersion -> ExpectedVersion -> Bool
(ExpectedVersion -> ExpectedVersion -> Bool)
-> (ExpectedVersion -> ExpectedVersion -> Bool)
-> Eq ExpectedVersion
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ExpectedVersion -> ExpectedVersion -> Bool
$c/= :: ExpectedVersion -> ExpectedVersion -> Bool
== :: ExpectedVersion -> ExpectedVersion -> Bool
$c== :: ExpectedVersion -> ExpectedVersion -> Bool
Eq, Eq ExpectedVersion
Eq ExpectedVersion
-> (ExpectedVersion -> ExpectedVersion -> Ordering)
-> (ExpectedVersion -> ExpectedVersion -> Bool)
-> (ExpectedVersion -> ExpectedVersion -> Bool)
-> (ExpectedVersion -> ExpectedVersion -> Bool)
-> (ExpectedVersion -> ExpectedVersion -> Bool)
-> (ExpectedVersion -> ExpectedVersion -> ExpectedVersion)
-> (ExpectedVersion -> ExpectedVersion -> ExpectedVersion)
-> Ord ExpectedVersion
ExpectedVersion -> ExpectedVersion -> Bool
ExpectedVersion -> ExpectedVersion -> Ordering
ExpectedVersion -> ExpectedVersion -> ExpectedVersion
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: ExpectedVersion -> ExpectedVersion -> ExpectedVersion
$cmin :: ExpectedVersion -> ExpectedVersion -> ExpectedVersion
max :: ExpectedVersion -> ExpectedVersion -> ExpectedVersion
$cmax :: ExpectedVersion -> ExpectedVersion -> ExpectedVersion
>= :: ExpectedVersion -> ExpectedVersion -> Bool
$c>= :: ExpectedVersion -> ExpectedVersion -> Bool
> :: ExpectedVersion -> ExpectedVersion -> Bool
$c> :: ExpectedVersion -> ExpectedVersion -> Bool
<= :: ExpectedVersion -> ExpectedVersion -> Bool
$c<= :: ExpectedVersion -> ExpectedVersion -> Bool
< :: ExpectedVersion -> ExpectedVersion -> Bool
$c< :: ExpectedVersion -> ExpectedVersion -> Bool
compare :: ExpectedVersion -> ExpectedVersion -> Ordering
$ccompare :: ExpectedVersion -> ExpectedVersion -> Ordering
$cp1Ord :: Eq ExpectedVersion
Ord)
versionToInteger :: Coercible a StreamVersion => a -> Integer
versionToInteger :: a -> Integer
versionToInteger a
version =
case a -> StreamVersion
coerce a
version of
StreamVersion
DoesNotExist -> -Integer
1
DoesExist StreamPosition
position -> StreamPosition -> Integer
forall a. Integral a => a -> Integer
toInteger StreamPosition
position
newtype ExpectedVersionViolation = ExpectedVersionViolation
{ ExpectedVersionViolation -> SqlError
expectedVersionViolationToSqlError :: Postgres.SqlError
}
deriving (Int -> ExpectedVersionViolation -> ShowS
[ExpectedVersionViolation] -> ShowS
ExpectedVersionViolation -> String
(Int -> ExpectedVersionViolation -> ShowS)
-> (ExpectedVersionViolation -> String)
-> ([ExpectedVersionViolation] -> ShowS)
-> Show ExpectedVersionViolation
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ExpectedVersionViolation] -> ShowS
$cshowList :: [ExpectedVersionViolation] -> ShowS
show :: ExpectedVersionViolation -> String
$cshow :: ExpectedVersionViolation -> String
showsPrec :: Int -> ExpectedVersionViolation -> ShowS
$cshowsPrec :: Int -> ExpectedVersionViolation -> ShowS
Show, ExpectedVersionViolation -> ExpectedVersionViolation -> Bool
(ExpectedVersionViolation -> ExpectedVersionViolation -> Bool)
-> (ExpectedVersionViolation -> ExpectedVersionViolation -> Bool)
-> Eq ExpectedVersionViolation
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ExpectedVersionViolation -> ExpectedVersionViolation -> Bool
$c/= :: ExpectedVersionViolation -> ExpectedVersionViolation -> Bool
== :: ExpectedVersionViolation -> ExpectedVersionViolation -> Bool
$c== :: ExpectedVersionViolation -> ExpectedVersionViolation -> Bool
Eq)
instance Exception ExpectedVersionViolation
parseExpectedVersionViolation :: Postgres.SqlError -> Maybe ExpectedVersionViolation
parseExpectedVersionViolation :: SqlError -> Maybe ExpectedVersionViolation
parseExpectedVersionViolation sqlError :: SqlError
sqlError@Postgres.SqlError{ByteString
ExecStatus
sqlState :: SqlError -> ByteString
sqlExecStatus :: SqlError -> ExecStatus
sqlErrorMsg :: SqlError -> ByteString
sqlErrorDetail :: SqlError -> ByteString
sqlErrorHint :: SqlError -> ByteString
sqlErrorHint :: ByteString
sqlErrorDetail :: ByteString
sqlErrorMsg :: ByteString
sqlExecStatus :: ExecStatus
sqlState :: ByteString
..} =
let seemsLikeTheRightErrorMessages :: Bool
seemsLikeTheRightErrorMessages =
ByteString
"Wrong expected version:" ByteString -> ByteString -> Bool
`Char8.isPrefixOf` ByteString
sqlErrorMsg
isTheCorrectErrorState :: Bool
isTheCorrectErrorState =
ByteString
sqlState ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString
"P0001"
isTheCorrectExecStatus :: Bool
isTheCorrectExecStatus =
ExecStatus
sqlExecStatus ExecStatus -> ExecStatus -> Bool
forall a. Eq a => a -> a -> Bool
== ExecStatus
Postgres.FatalError
isProbablyTheRightError :: Bool
isProbablyTheRightError =
Bool
seemsLikeTheRightErrorMessages
Bool -> Bool -> Bool
&& Bool
isTheCorrectErrorState
Bool -> Bool -> Bool
&& Bool
isTheCorrectExecStatus
in if Bool
isProbablyTheRightError
then ExpectedVersionViolation -> Maybe ExpectedVersionViolation
forall a. a -> Maybe a
Just (ExpectedVersionViolation -> Maybe ExpectedVersionViolation)
-> ExpectedVersionViolation -> Maybe ExpectedVersionViolation
forall a b. (a -> b) -> a -> b
$ SqlError -> ExpectedVersionViolation
ExpectedVersionViolation SqlError
sqlError
else Maybe ExpectedVersionViolation
forall a. Maybe a
Nothing
data BatchSize
= FixedSize NumberOfMessages
| Unlimited
deriving (Int -> BatchSize -> ShowS
[BatchSize] -> ShowS
BatchSize -> String
(Int -> BatchSize -> ShowS)
-> (BatchSize -> String)
-> ([BatchSize] -> ShowS)
-> Show BatchSize
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [BatchSize] -> ShowS
$cshowList :: [BatchSize] -> ShowS
show :: BatchSize -> String
$cshow :: BatchSize -> String
showsPrec :: Int -> BatchSize -> ShowS
$cshowsPrec :: Int -> BatchSize -> ShowS
Show, BatchSize -> BatchSize -> Bool
(BatchSize -> BatchSize -> Bool)
-> (BatchSize -> BatchSize -> Bool) -> Eq BatchSize
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: BatchSize -> BatchSize -> Bool
$c/= :: BatchSize -> BatchSize -> Bool
== :: BatchSize -> BatchSize -> Bool
$c== :: BatchSize -> BatchSize -> Bool
Eq)
batchSizeToInteger :: BatchSize -> Integer
batchSizeToInteger :: BatchSize -> Integer
batchSizeToInteger BatchSize
batchSize =
case BatchSize
batchSize of
FixedSize NumberOfMessages
size -> NumberOfMessages -> Integer
forall a. Integral a => a -> Integer
toInteger NumberOfMessages
size
BatchSize
Unlimited -> -Integer
1
createdAtField :: RowParser Message.CreatedAt
createdAtField :: RowParser CreatedAt
createdAtField =
UTCTime -> CreatedAt
Message.CreatedAt (UTCTime -> CreatedAt)
-> (LocalTime -> UTCTime) -> LocalTime -> CreatedAt
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TimeZone -> LocalTime -> UTCTime
Time.localTimeToUTC TimeZone
Time.utc (LocalTime -> CreatedAt)
-> RowParser LocalTime -> RowParser CreatedAt
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RowParser LocalTime
forall a. FromField a => RowParser a
field
streamPositionField :: RowParser Message.StreamPosition
streamPositionField :: RowParser StreamPosition
streamPositionField = do
FieldParser StreamPosition -> RowParser StreamPosition
forall a. FieldParser a -> RowParser a
fieldWith (FieldParser StreamPosition -> RowParser StreamPosition)
-> FieldParser StreamPosition -> RowParser StreamPosition
forall a b. (a -> b) -> a -> b
$ \Field
f Maybe ByteString
mdata -> do
Integer
integer <- FieldParser Integer
forall a. FromField a => FieldParser a
FromField.fromField Field
f Maybe ByteString
mdata
if Integer
integer Integer -> Integer -> Bool
forall a. Ord a => a -> a -> Bool
>= Integer
0
then StreamPosition -> Conversion StreamPosition
forall (f :: * -> *) a. Applicative f => a -> f a
pure (StreamPosition -> Conversion StreamPosition)
-> (Natural -> StreamPosition)
-> Natural
-> Conversion StreamPosition
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Natural -> StreamPosition
Message.StreamPosition (Natural -> Conversion StreamPosition)
-> Natural -> Conversion StreamPosition
forall a b. (a -> b) -> a -> b
$ Integer -> Natural
forall a. Num a => Integer -> a
fromInteger Integer
integer
else (String -> Maybe Oid -> String -> String -> String -> ResultError)
-> Field -> String -> Conversion StreamPosition
forall a err.
(Typeable a, Exception err) =>
(String -> Maybe Oid -> String -> String -> String -> err)
-> Field -> String -> Conversion a
FromField.returnError String -> Maybe Oid -> String -> String -> String -> ResultError
FromField.Incompatible Field
f String
"Stream position is negative"
fromTable :: RowParser Message
fromTable :: RowParser Message
fromTable = do
MessageId
messageId <- (UUID -> MessageId) -> RowParser UUID -> RowParser MessageId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap UUID -> MessageId
Message.MessageId RowParser UUID
forall a. FromField a => RowParser a
field
StreamName
messageStream <- (Text -> StreamName) -> RowParser Text -> RowParser StreamName
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Text -> StreamName
StreamName RowParser Text
forall a. FromField a => RowParser a
field
MessageType
messageType <- (Text -> MessageType) -> RowParser Text -> RowParser MessageType
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Text -> MessageType
Message.MessageType RowParser Text
forall a. FromField a => RowParser a
field
StreamPosition
messageStreamPosition <- RowParser StreamPosition
streamPositionField
GlobalPosition
messageGlobalPosition <- (Integer -> GlobalPosition)
-> RowParser Integer -> RowParser GlobalPosition
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Integer -> GlobalPosition
Message.GlobalPosition RowParser Integer
forall a. FromField a => RowParser a
field
Payload
messagePayload <- Payload -> (Value -> Payload) -> Maybe Value -> Payload
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Payload
Message.nullPayload Value -> Payload
Message.Payload (Maybe Value -> Payload)
-> RowParser (Maybe Value) -> RowParser Payload
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RowParser (Maybe Value)
forall a. FromField a => RowParser a
field
Metadata
messageMetadata <- Metadata -> (Value -> Metadata) -> Maybe Value -> Metadata
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Metadata
Message.nullMetadata Value -> Metadata
Message.Metadata (Maybe Value -> Metadata)
-> RowParser (Maybe Value) -> RowParser Metadata
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RowParser (Maybe Value)
forall a. FromField a => RowParser a
field
CreatedAt
messageCreatedAt <- RowParser CreatedAt
createdAtField
Message -> RowParser Message
forall (f :: * -> *) a. Applicative f => a -> f a
pure Message :: MessageId
-> StreamName
-> MessageType
-> StreamPosition
-> GlobalPosition
-> CreatedAt
-> Payload
-> Metadata
-> Message
Message{StreamName
Metadata
Payload
CreatedAt
GlobalPosition
StreamPosition
MessageType
MessageId
messageMetadata :: Metadata
messagePayload :: Payload
messageCreatedAt :: CreatedAt
messageGlobalPosition :: GlobalPosition
messageStreamPosition :: StreamPosition
messageType :: MessageType
messageStream :: StreamName
messageId :: MessageId
messageCreatedAt :: CreatedAt
messageMetadata :: Metadata
messagePayload :: Payload
messageGlobalPosition :: GlobalPosition
messageStreamPosition :: StreamPosition
messageType :: MessageType
messageStream :: StreamName
messageId :: MessageId
..}
fromFunction :: RowParser Message
fromFunction :: RowParser Message
fromFunction = do
MessageId
messageId <- FieldParser MessageId -> RowParser MessageId
forall a. FieldParser a -> RowParser a
fieldWith (FieldParser MessageId -> RowParser MessageId)
-> FieldParser MessageId -> RowParser MessageId
forall a b. (a -> b) -> a -> b
$ \Field
f Maybe ByteString
mdata -> do
Text
text <- FieldParser Text
forall a. FromField a => FieldParser a
FromField.fromField Field
f Maybe ByteString
mdata
case Text -> Maybe UUID
UUID.fromText Text
text of
Maybe UUID
Nothing -> (String -> Maybe Oid -> String -> String -> String -> ResultError)
-> Field -> String -> Conversion MessageId
forall a err.
(Typeable a, Exception err) =>
(String -> Maybe Oid -> String -> String -> String -> err)
-> Field -> String -> Conversion a
FromField.returnError String -> Maybe Oid -> String -> String -> String -> ResultError
FromField.Incompatible Field
f String
"Invalid UUID"
Just UUID
uuid -> MessageId -> Conversion MessageId
forall (f :: * -> *) a. Applicative f => a -> f a
pure (MessageId -> Conversion MessageId)
-> MessageId -> Conversion MessageId
forall a b. (a -> b) -> a -> b
$ UUID -> MessageId
Message.MessageId UUID
uuid
StreamName
messageStream <- (Text -> StreamName) -> RowParser Text -> RowParser StreamName
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Text -> StreamName
StreamName RowParser Text
forall a. FromField a => RowParser a
field
MessageType
messageType <- (Text -> MessageType) -> RowParser Text -> RowParser MessageType
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Text -> MessageType
Message.MessageType RowParser Text
forall a. FromField a => RowParser a
field
StreamPosition
messageStreamPosition <- RowParser StreamPosition
streamPositionField
GlobalPosition
messageGlobalPosition <- (Integer -> GlobalPosition)
-> RowParser Integer -> RowParser GlobalPosition
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Integer -> GlobalPosition
Message.GlobalPosition RowParser Integer
forall a. FromField a => RowParser a
field
Payload
messagePayload <- do
Maybe ByteString
maybeByteString <- RowParser (Maybe ByteString)
forall a. FromField a => RowParser a
field
Payload -> RowParser Payload
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Payload -> RowParser Payload) -> Payload -> RowParser Payload
forall a b. (a -> b) -> a -> b
$
Payload -> (Value -> Payload) -> Maybe Value -> Payload
forall b a. b -> (a -> b) -> Maybe a -> b
maybe
Payload
Message.nullPayload
Value -> Payload
Message.Payload
(ByteString -> Maybe Value
forall a. FromJSON a => ByteString -> Maybe a
Aeson.decodeStrict (ByteString -> Maybe Value) -> Maybe ByteString -> Maybe Value
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Maybe ByteString
maybeByteString)
Metadata
messageMetadata <- do
Maybe ByteString
maybeByteString <- RowParser (Maybe ByteString)
forall a. FromField a => RowParser a
field
Metadata -> RowParser Metadata
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Metadata -> RowParser Metadata) -> Metadata -> RowParser Metadata
forall a b. (a -> b) -> a -> b
$
Metadata -> (Value -> Metadata) -> Maybe Value -> Metadata
forall b a. b -> (a -> b) -> Maybe a -> b
maybe
Metadata
Message.nullMetadata
Value -> Metadata
Message.Metadata
(ByteString -> Maybe Value
forall a. FromJSON a => ByteString -> Maybe a
Aeson.decodeStrict (ByteString -> Maybe Value) -> Maybe ByteString -> Maybe Value
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Maybe ByteString
maybeByteString)
CreatedAt
messageCreatedAt <- RowParser CreatedAt
createdAtField
Message -> RowParser Message
forall (f :: * -> *) a. Applicative f => a -> f a
pure Message :: MessageId
-> StreamName
-> MessageType
-> StreamPosition
-> GlobalPosition
-> CreatedAt
-> Payload
-> Metadata
-> Message
Message{StreamName
Metadata
Payload
CreatedAt
GlobalPosition
StreamPosition
MessageType
MessageId
messageCreatedAt :: CreatedAt
messageMetadata :: Metadata
messagePayload :: Payload
messageGlobalPosition :: GlobalPosition
messageStreamPosition :: StreamPosition
messageType :: MessageType
messageStream :: StreamName
messageId :: MessageId
messageMetadata :: Metadata
messagePayload :: Payload
messageCreatedAt :: CreatedAt
messageGlobalPosition :: GlobalPosition
messageStreamPosition :: StreamPosition
messageType :: MessageType
messageStream :: StreamName
messageId :: MessageId
..}
lookupById :: Postgres.Connection -> Message.MessageId -> IO (Maybe Message)
lookupById :: Connection -> MessageId -> IO (Maybe Message)
lookupById Connection
connection MessageId
messageId = do
let query :: Query
query =
[sql|
SELECT
id
,stream_name
,type
,position
,global_position
,data
,metadata
,time
FROM message_store.messages
WHERE id = ?;
|]
[Message]
messages <- RowParser Message
-> Connection -> Query -> Only UUID -> IO [Message]
forall q r.
ToRow q =>
RowParser r -> Connection -> Query -> q -> IO [r]
Postgres.queryWith RowParser Message
fromTable Connection
connection Query
query (UUID -> Only UUID
forall a. a -> Only a
Postgres.Only (UUID -> Only UUID) -> UUID -> Only UUID
forall a b. (a -> b) -> a -> b
$ MessageId -> UUID
Message.messageIdToUUID MessageId
messageId)
Maybe Message -> IO (Maybe Message)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe Message -> IO (Maybe Message))
-> Maybe Message -> IO (Maybe Message)
forall a b. (a -> b) -> a -> b
$ [Message] -> Maybe Message
forall a. [a] -> Maybe a
listToMaybe [Message]
messages
lookupByPosition :: Postgres.Connection -> Message.GlobalPosition -> IO (Maybe Message)
lookupByPosition :: Connection -> GlobalPosition -> IO (Maybe Message)
lookupByPosition Connection
connection GlobalPosition
position = do
let query :: Query
query =
[sql|
SELECT
id
,stream_name
,type
,position
,global_position
,data
,metadata
,time
FROM message_store.messages
WHERE global_position = ?;
|]
[Message]
messages <- RowParser Message
-> Connection -> Query -> Only Integer -> IO [Message]
forall q r.
ToRow q =>
RowParser r -> Connection -> Query -> q -> IO [r]
Postgres.queryWith RowParser Message
fromTable Connection
connection Query
query (Integer -> Only Integer
forall a. a -> Only a
Postgres.Only (Integer -> Only Integer) -> Integer -> Only Integer
forall a b. (a -> b) -> a -> b
$ GlobalPosition -> Integer
Message.globalPositionToInteger GlobalPosition
position)
Maybe Message -> IO (Maybe Message)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe Message -> IO (Maybe Message))
-> Maybe Message -> IO (Maybe Message)
forall a b. (a -> b) -> a -> b
$ [Message] -> Maybe Message
forall a. [a] -> Maybe a
listToMaybe [Message]
messages
writeMessage ::
( Aeson.ToJSON payload
, Aeson.ToJSON metadata
) =>
Postgres.Connection ->
StreamName ->
Message.MessageType ->
payload ->
Maybe metadata ->
Maybe ExpectedVersion ->
IO (Message.MessageId, Message.StreamPosition)
writeMessage :: Connection
-> StreamName
-> MessageType
-> payload
-> Maybe metadata
-> Maybe ExpectedVersion
-> IO (MessageId, StreamPosition)
writeMessage Connection
connection StreamName
streamName MessageType
messageType payload
payload Maybe metadata
metadata Maybe ExpectedVersion
expectedVersion = do
MessageId
messageId <- IO MessageId
Message.newMessageId
let query :: Query
query =
[sql|
SELECT message_store.write_message (
id => ?
,stream_name => ?
,type => ?
,data => ?
,metadata => ?
,expected_version => ?
);
|]
params :: (Text, Text, Text, Value, Maybe Value, Maybe Integer)
params =
( UUID -> Text
UUID.toText (UUID -> Text) -> UUID -> Text
forall a b. (a -> b) -> a -> b
$ MessageId -> UUID
Message.messageIdToUUID MessageId
messageId
, StreamName -> Text
streamNameToText StreamName
streamName
, MessageType -> Text
Message.messageTypeToText MessageType
messageType
, payload -> Value
forall a. ToJSON a => a -> Value
Aeson.toJSON payload
payload
, (metadata -> Value) -> Maybe metadata -> Maybe Value
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap metadata -> Value
forall a. ToJSON a => a -> Value
Aeson.toJSON Maybe metadata
metadata
, (ExpectedVersion -> Integer)
-> Maybe ExpectedVersion -> Maybe Integer
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ExpectedVersion -> Integer
forall a. Coercible a StreamVersion => a -> Integer
versionToInteger Maybe ExpectedVersion
expectedVersion
)
handleSqlError :: SqlError -> IO a
handleSqlError SqlError
sqlError =
case SqlError -> Maybe ExpectedVersionViolation
parseExpectedVersionViolation SqlError
sqlError of
Maybe ExpectedVersionViolation
Nothing ->
SqlError -> IO a
forall e a. Exception e => e -> IO a
throwIO SqlError
sqlError
Just ExpectedVersionViolation
expectedVersionViolation ->
ExpectedVersionViolation -> IO a
forall e a. Exception e => e -> IO a
throwIO ExpectedVersionViolation
expectedVersionViolation
[Postgres.Only Integer
position] <-
(SqlError -> IO [Only Integer])
-> IO [Only Integer] -> IO [Only Integer]
forall e a. Exception e => (e -> IO a) -> IO a -> IO a
handle SqlError -> IO [Only Integer]
forall a. SqlError -> IO a
handleSqlError (IO [Only Integer] -> IO [Only Integer])
-> IO [Only Integer] -> IO [Only Integer]
forall a b. (a -> b) -> a -> b
$
Connection
-> Query
-> (Text, Text, Text, Value, Maybe Value, Maybe Integer)
-> IO [Only Integer]
forall q r.
(ToRow q, FromRow r) =>
Connection -> Query -> q -> IO [r]
Postgres.query Connection
connection Query
query (Text, Text, Text, Value, Maybe Value, Maybe Integer)
params
(MessageId, StreamPosition) -> IO (MessageId, StreamPosition)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (MessageId
messageId, Integer -> StreamPosition
forall a. Num a => Integer -> a
fromInteger Integer
position)
getStreamMessages ::
Postgres.Connection ->
StreamName ->
Maybe Message.StreamPosition ->
Maybe BatchSize ->
Maybe Condition ->
IO [Message]
getStreamMessages :: Connection
-> StreamName
-> Maybe StreamPosition
-> Maybe BatchSize
-> Maybe Condition
-> IO [Message]
getStreamMessages Connection
connection StreamName
streamName Maybe StreamPosition
position Maybe BatchSize
batchSize Maybe Condition
condition =
let query :: Query
query =
[sql|
SELECT
id
,stream_name
,type
,position
,global_position
,data
,metadata
,time
FROM message_store.get_stream_messages (
stream_name => ?
,"position" => ?
,batch_size => ?
,condition => ?
);
|]
params :: (Text, Integer, Integer, Maybe Text)
params =
( StreamName -> Text
streamNameToText StreamName
streamName
, Integer
-> (StreamPosition -> Integer) -> Maybe StreamPosition -> Integer
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Integer
0 StreamPosition -> Integer
forall a. Integral a => a -> Integer
toInteger Maybe StreamPosition
position
, Integer -> (BatchSize -> Integer) -> Maybe BatchSize -> Integer
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Integer
1000 BatchSize -> Integer
batchSizeToInteger Maybe BatchSize
batchSize
, (Condition -> Text) -> Maybe Condition -> Maybe Text
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Condition -> Text
conditionToText Maybe Condition
condition
)
in RowParser Message
-> Connection
-> Query
-> (Text, Integer, Integer, Maybe Text)
-> IO [Message]
forall q r.
ToRow q =>
RowParser r -> Connection -> Query -> q -> IO [r]
Postgres.queryWith RowParser Message
fromFunction Connection
connection Query
query (Text, Integer, Integer, Maybe Text)
params
getCategoryMessages ::
Postgres.Connection ->
CategoryName ->
Maybe Message.GlobalPosition ->
Maybe BatchSize ->
Maybe Correlation ->
Maybe ConsumerGroup ->
Maybe Condition ->
IO [Message]
getCategoryMessages :: Connection
-> CategoryName
-> Maybe GlobalPosition
-> Maybe BatchSize
-> Maybe Correlation
-> Maybe ConsumerGroup
-> Maybe Condition
-> IO [Message]
getCategoryMessages Connection
connection CategoryName
category Maybe GlobalPosition
position Maybe BatchSize
batchSize Maybe Correlation
correlation Maybe ConsumerGroup
consumerGroup Maybe Condition
condition =
let query :: Query
query =
[sql|
SELECT
id
,stream_name
,type
,position
,global_position
,data
,metadata
,time
FROM message_store.get_category_messages (
category => ?
,"position" => ?
,batch_size => ?
,correlation => ?
,consumer_group_member => ?
,consumer_group_size => ?
,condition => ?
);
|]
params :: (Text, Integer, Integer, Maybe Text, Maybe Integer, Maybe Integer,
Maybe Text)
params =
( CategoryName -> Text
categoryNameToText CategoryName
category
, Integer
-> (GlobalPosition -> Integer) -> Maybe GlobalPosition -> Integer
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Integer
0 GlobalPosition -> Integer
Message.globalPositionToInteger Maybe GlobalPosition
position
, Integer -> (BatchSize -> Integer) -> Maybe BatchSize -> Integer
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Integer
1000 BatchSize -> Integer
batchSizeToInteger Maybe BatchSize
batchSize
, (Correlation -> Text) -> Maybe Correlation -> Maybe Text
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Correlation -> Text
correlationToText Maybe Correlation
correlation
, (ConsumerGroup -> Integer) -> Maybe ConsumerGroup -> Maybe Integer
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Natural -> Integer
forall a. Integral a => a -> Integer
toInteger (Natural -> Integer)
-> (ConsumerGroup -> Natural) -> ConsumerGroup -> Integer
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConsumerGroup -> Natural
consumerGroupMember) Maybe ConsumerGroup
consumerGroup
, (ConsumerGroup -> Integer) -> Maybe ConsumerGroup -> Maybe Integer
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Natural -> Integer
forall a. Integral a => a -> Integer
toInteger (Natural -> Integer)
-> (ConsumerGroup -> Natural) -> ConsumerGroup -> Integer
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConsumerGroup -> Natural
consumerGroupSize) Maybe ConsumerGroup
consumerGroup
, (Condition -> Text) -> Maybe Condition -> Maybe Text
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Condition -> Text
conditionToText Maybe Condition
condition
)
in RowParser Message
-> Connection
-> Query
-> (Text, Integer, Integer, Maybe Text, Maybe Integer,
Maybe Integer, Maybe Text)
-> IO [Message]
forall q r.
ToRow q =>
RowParser r -> Connection -> Query -> q -> IO [r]
Postgres.queryWith RowParser Message
fromFunction Connection
connection Query
query (Text, Integer, Integer, Maybe Text, Maybe Integer, Maybe Integer,
Maybe Text)
params
getLastStreamMessage :: Postgres.Connection -> StreamName -> IO (Maybe Message)
getLastStreamMessage :: Connection -> StreamName -> IO (Maybe Message)
getLastStreamMessage Connection
connection StreamName
streamName =
let query :: Query
query =
[sql|
SELECT
id
,stream_name
,type
,position
,global_position
,data
,metadata
,time
FROM message_store.get_last_stream_message (
stream_name => ?
);
|]
params :: Only Text
params =
Text -> Only Text
forall a. a -> Only a
Postgres.Only (StreamName -> Text
streamNameToText StreamName
streamName)
in [Message] -> Maybe Message
forall a. [a] -> Maybe a
listToMaybe ([Message] -> Maybe Message) -> IO [Message] -> IO (Maybe Message)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RowParser Message
-> Connection -> Query -> Only Text -> IO [Message]
forall q r.
ToRow q =>
RowParser r -> Connection -> Query -> q -> IO [r]
Postgres.queryWith RowParser Message
fromFunction Connection
connection Query
query Only Text
params
streamVersion :: Postgres.Connection -> StreamName -> IO (Maybe Message.StreamPosition)
streamVersion :: Connection -> StreamName -> IO (Maybe StreamPosition)
streamVersion Connection
connection StreamName
streamName = do
let query :: Query
query =
[sql|
SELECT message_store.stream_version (
stream_name => ?
);
|]
params :: Only Text
params =
Text -> Only Text
forall a. a -> Only a
Postgres.Only (StreamName -> Text
streamNameToText StreamName
streamName)
[Only (Maybe Integer)]
result <- Connection -> Query -> Only Text -> IO [Only (Maybe Integer)]
forall q r.
(ToRow q, FromRow r) =>
Connection -> Query -> q -> IO [r]
Postgres.query Connection
connection Query
query Only Text
params
Maybe StreamPosition -> IO (Maybe StreamPosition)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe StreamPosition -> IO (Maybe StreamPosition))
-> Maybe StreamPosition -> IO (Maybe StreamPosition)
forall a b. (a -> b) -> a -> b
$ case [Only (Maybe Integer)]
result of
[Postgres.Only (Just Integer
position)] -> StreamPosition -> Maybe StreamPosition
forall a. a -> Maybe a
Just (StreamPosition -> Maybe StreamPosition)
-> StreamPosition -> Maybe StreamPosition
forall a b. (a -> b) -> a -> b
$ Integer -> StreamPosition
forall a. Num a => Integer -> a
fromInteger Integer
position
[Only (Maybe Integer)]
_ -> Maybe StreamPosition
forall a. Maybe a
Nothing