{-# LANGUAGE QuasiQuotes #-}

-- | Provides access to the functions described at http://docs.eventide-project.org/user-guide/message-db/server-functions.html
module MessageDb.Functions
  ( WithConnection
  , BatchSize (..)
  , Condition (..)
  , ConsumerGroup (..)
  , Correlation (..)
  , StreamVersion (..)
  , ExpectedVersion (..)
  , ExpectedVersionViolation (..)
  , parseExpectedVersionViolation
  , lookupById
  , lookupByPosition
  , writeMessageWithId
  , writeMessage
  , getStreamMessages
  , getCategoryMessages
  , getLastStreamMessage
  , streamVersion
  )
where

import Control.Exception (Exception, handle, throwIO)
import qualified Data.Aeson as Aeson
import qualified Data.ByteString.Char8 as Char8
import Data.Coerce (Coercible, coerce)
import Data.Maybe (listToMaybe)
import Data.String (IsString)
import Data.Text (Text)
import qualified Data.Time as Time
import qualified Data.UUID as UUID
import qualified Database.PostgreSQL.Simple as Postgres
import qualified Database.PostgreSQL.Simple.FromField as FromField
import Database.PostgreSQL.Simple.FromRow (RowParser, field, fieldWith)
import Database.PostgreSQL.Simple.SqlQQ (sql)
import MessageDb.Message (Message (Message))
import qualified MessageDb.Message as Message
import MessageDb.StreamName (Category, StreamName (StreamName), categoryToText, streamNameToText)
import MessageDb.Units (NumberOfMessages)
import Numeric.Natural (Natural)


-- | A connection provider: given an action that needs a 'Postgres.Connection',
-- runs it and returns the action's result. The rank-2 @forall@ lets the same
-- provider be reused for actions with different result types.
--
-- NOTE(review): how the connection is obtained (pool, single handle, ...) is
-- up to the caller supplying the value; nothing in this module constrains it.
type WithConnection = forall records. (Postgres.Connection -> IO records) -> IO records


-- | Describes a consumer's place within a consumer group.
--
-- NOTE(review): presumably maps onto message-db's @consumer_group_member@ and
-- @consumer_group_size@ parameters — confirm against the category reader that
-- uses this type.
data ConsumerGroup = ConsumerGroup
  { consumerGroupMember :: Natural
  -- ^ Which member of the group this consumer is.
  , consumerGroupSize :: Natural
  -- ^ Total number of members in the group.
  }
  deriving (Show, Eq)


-- | A textual condition wrapped in a newtype so it cannot be confused with
-- other 'Text' arguments.
--
-- 'Show' is derived via 'Text', so a value is shown as the bare quoted text
-- rather than with the constructor and record syntax.
--
-- NOTE(review): presumably the SQL @condition@ argument accepted by the
-- message-db read functions — confirm against the readers in this module.
newtype Condition = Condition
  { conditionToText :: Text
  -- ^ Unwrap the underlying 'Text'.
  }
  deriving (Eq, Ord, Aeson.ToJSON, Aeson.FromJSON, IsString)
  deriving (Show) via Text


-- | A textual correlation value wrapped in a newtype so it cannot be confused
-- with other 'Text' arguments.
--
-- 'Show' is derived via 'Text', so a value is shown as the bare quoted text
-- rather than with the constructor and record syntax.
--
-- NOTE(review): presumably the @correlation@ category filter accepted by
-- message-db's category reader — confirm against the readers in this module.
newtype Correlation = Correlation
  { correlationToText :: Text
  -- ^ Unwrap the underlying 'Text'.
  }
  deriving (Eq, Ord, Aeson.ToJSON, Aeson.FromJSON, IsString)
  deriving (Show) via Text


-- | The version of a stream: either the stream does not exist, or it exists
-- and is at the given position.
--
-- The derived 'Ord' instance follows constructor order, so 'DoesNotExist'
-- sorts before every 'DoesExist' value.
data StreamVersion
  = DoesNotExist
  | DoesExist Message.StreamPosition
  deriving (Show, Eq, Ord)


-- | The 'StreamVersion' a writer expects a stream to be at. Kept as a separate
-- newtype so an expectation cannot be mistaken for an observed version.
newtype ExpectedVersion = ExpectedVersion
  { expectedToStreamVersion :: StreamVersion
  -- ^ Unwrap the expected 'StreamVersion'.
  }
  deriving (Show, Eq, Ord)


-- | Convert a 'StreamVersion', or any newtype coercible to one such as
-- 'ExpectedVersion', to its integer encoding: @-1@ when the stream does not
-- exist, otherwise the stream position.
versionToInteger :: Coercible a StreamVersion => a -> Integer
versionToInteger version =
  -- The constructor patterns fix the coercion target to 'StreamVersion'.
  case coerce version of
    DoesNotExist -> -1
    DoesExist position -> toInteger position


-- | Exception wrapper around the 'Postgres.SqlError' that signalled a failed
-- expected-version check. Values are produced by
-- 'parseExpectedVersionViolation'.
newtype ExpectedVersionViolation = ExpectedVersionViolation
  { expectedVersionViolationToSqlError :: Postgres.SqlError
  -- ^ The original SQL error, kept for diagnostics.
  }
  deriving (Show, Eq)


instance Exception ExpectedVersionViolation


-- | Recognise an expected-version failure among arbitrary SQL errors.
--
-- Returns 'Just' only when all three hold: the message starts with
-- @"Wrong expected version:"@, the SQL state is @"P0001"@, and the exec status
-- is 'Postgres.FatalError'. Any other error yields 'Nothing'.
--
-- This is a heuristic on the error's fields (hence "probably" in the local
-- names); the wrapped error is returned unchanged.
parseExpectedVersionViolation :: Postgres.SqlError -> Maybe ExpectedVersionViolation
parseExpectedVersionViolation sqlError@Postgres.SqlError{..} =
  {- Example error of what we are looking for

        SqlError
          { sqlState = "P0001"
          , sqlExecStatus = FatalError
          , sqlErrorMsg = "Wrong expected version: 4 (Stream: AqZHVQR4-Pn85sUkra3, Stream Version: 10)"
          , sqlErrorDetail = ""
          , sqlErrorHint = ""
          }

  -}

  let seemsLikeTheRightErrorMessages =
        "Wrong expected version:" `Char8.isPrefixOf` sqlErrorMsg

      isTheCorrectErrorState =
        sqlState == "P0001"

      isTheCorrectExecStatus =
        sqlExecStatus == Postgres.FatalError

      isProbablyTheRightError =
        seemsLikeTheRightErrorMessages
          && isTheCorrectErrorState
          && isTheCorrectExecStatus
   in if isProbablyTheRightError
        then Just $ ExpectedVersionViolation sqlError
        else Nothing


-- | How many messages a read may return: a fixed number, or no limit.
data BatchSize
  = FixedSize NumberOfMessages
  | Unlimited
  deriving (Show, Eq)


-- | Integer encoding of a 'BatchSize': the size itself for 'FixedSize', and
-- @-1@ for 'Unlimited'.
--
-- NOTE(review): @-1@ is presumably message-db's "no limit" value for
-- @batch_size@ — confirm against the server-function documentation.
batchSizeToInteger :: BatchSize -> Integer
batchSizeToInteger batchSize =
  case batchSize of
    FixedSize size -> toInteger size
    Unlimited -> -1


-- | Parse the next column as a 'Message.CreatedAt'.
--
-- The column is read as a zone-less 'Time.LocalTime' and interpreted as UTC.
createdAtField :: RowParser Message.CreatedAt
createdAtField =
  Message.CreatedAt . Time.localTimeToUTC Time.utc <$> field


-- | Parse the next column as a 'Message.StreamPosition'.
--
-- The column is read as an 'Integer'. A negative value fails the conversion
-- with an 'FromField.Incompatible' error instead of being converted.
streamPositionField :: RowParser Message.StreamPosition
streamPositionField =
  fieldWith $ \f mdata -> do
    integer <- FromField.fromField f mdata
    -- 'Message.StreamPosition' wraps a 'Natural', and 'fromInteger' on a
    -- negative number would throw, so the sign is checked before converting.
    if integer >= 0
      then pure . Message.StreamPosition $ fromInteger integer
      else FromField.returnError FromField.Incompatible f "Stream position is negative"


-- | Row parser for messages selected directly from the messages table, as done
-- by 'lookupById' and 'lookupByPosition'.
--
-- Columns are consumed in this order: id, stream name, type, stream position,
-- global position, data, metadata, time. The id is read with the column's own
-- 'FromField.FromField' instance for UUIDs, and data/metadata are read as
-- optional JSON values; a SQL @NULL@ becomes 'Message.nullPayload' or
-- 'Message.nullMetadata' respectively.
fromTable :: RowParser Message
fromTable = do
  messageId <- fmap Message.MessageId field
  messageStream <- fmap StreamName field
  messageType <- fmap Message.MessageType field
  messageStreamPosition <- streamPositionField
  messageGlobalPosition <- fmap Message.GlobalPosition field
  messagePayload <- maybe Message.nullPayload Message.Payload <$> field
  messageMetadata <- maybe Message.nullMetadata Message.Metadata <$> field
  messageCreatedAt <- createdAtField

  -- The bindings above are named after the record fields so the wildcard can
  -- assemble the 'Message'.
  pure Message{..}


-- | Row parser for messages whose id and JSON columns arrive as text.
--
-- Columns are consumed in the same order as 'fromTable' (id, stream name,
-- type, stream position, global position, data, metadata, time), but:
--
-- * the id is read as 'Text' and parsed with 'UUID.fromText'; text that is not
--   a UUID fails the conversion with an "Invalid UUID" error;
-- * data and metadata are read as optional strict bytestrings and decoded with
--   'Aeson.decodeStrict'. Both a SQL @NULL@ and bytes that fail to decode fall
--   back to 'Message.nullPayload' / 'Message.nullMetadata'.
--
-- NOTE(review): judging by the name this is meant for rows returned by the
-- message-db server functions — confirm against the callers.
fromFunction :: RowParser Message
fromFunction = do
  messageId <- fieldWith $ \f mdata -> do
    text <- FromField.fromField f mdata
    case UUID.fromText text of
      Nothing -> FromField.returnError FromField.Incompatible f "Invalid UUID"
      Just uuid -> pure $ Message.MessageId uuid

  messageStream <- fmap StreamName field
  messageType <- fmap Message.MessageType field
  messageStreamPosition <- streamPositionField
  messageGlobalPosition <- fmap Message.GlobalPosition field

  messagePayload <- do
    maybeByteString <- field
    pure $
      maybe
        Message.nullPayload
        Message.Payload
        -- '=<<' in 'Maybe': a missing column and a failed decode both give Nothing.
        (Aeson.decodeStrict =<< maybeByteString)

  messageMetadata <- do
    maybeByteString <- field
    pure $
      maybe
        Message.nullMetadata
        Message.Metadata
        (Aeson.decodeStrict =<< maybeByteString)

  messageCreatedAt <- createdAtField

  -- The bindings above are named after the record fields so the wildcard can
  -- assemble the 'Message'.
  pure Message{..}


-- | Look up a single message by its id.
--
-- Queries @message_store.messages@ directly and parses rows with 'fromTable'.
-- Returns 'Nothing' when no row matches; if several rows were returned, only
-- the first is kept.
lookupById :: Postgres.Connection -> Message.MessageId -> IO (Maybe Message)
lookupById connection messageId = do
  -- Column order must match what 'fromTable' consumes.
  let query =
        [sql|
          SELECT 
            id
            ,stream_name
            ,type
            ,position
            ,global_position
            ,data
            ,metadata
            ,time
          FROM message_store.messages
          WHERE id = ?;
        |]

  messages <- Postgres.queryWith fromTable connection query (Postgres.Only $ Message.messageIdToUUID messageId)

  pure $ listToMaybe messages


-- | Look up a single message by its global position.
--
-- Queries @message_store.messages@ directly and parses rows with 'fromTable'.
-- Returns 'Nothing' when no row matches; if several rows were returned, only
-- the first is kept.
lookupByPosition :: Postgres.Connection -> Message.GlobalPosition -> IO (Maybe Message)
lookupByPosition connection position = do
  -- Column order must match what 'fromTable' consumes.
  let query =
        [sql|
          SELECT 
            id
            ,stream_name
            ,type
            ,position
            ,global_position
            ,data
            ,metadata
            ,time
          FROM message_store.messages
          WHERE global_position = ?;
        |]

  messages <- Postgres.queryWith fromTable connection query (Postgres.Only $ Message.globalPositionToInteger position)

  pure $ listToMaybe messages


-- | Write a message with a caller-supplied message id by calling
-- @message_store.write_message@.
--
-- The payload and the optional metadata are serialised with 'Aeson.toJSON'.
-- A missing metadata or expected version is passed to the function as SQL
-- @NULL@.
--
-- Returns the stream position reported by the database function.
--
-- Throws 'ExpectedVersionViolation' when the SQL error raised by the call is
-- recognised by 'parseExpectedVersionViolation'; any other SQL error is
-- rethrown unchanged.
writeMessageWithId ::
  ( Aeson.ToJSON payload
  , Aeson.ToJSON metadata
  ) =>
  Postgres.Connection ->
  Message.MessageId ->
  StreamName ->
  Message.MessageType ->
  payload ->
  Maybe metadata ->
  Maybe ExpectedVersion ->
  IO Message.StreamPosition
writeMessageWithId connection messageId streamName messageType payload metadata expectedVersion = do
  let query =
        [sql|
          SELECT message_store.write_message (
            id => ?
            ,stream_name => ?
            ,type => ?
            ,data => ?
            ,metadata => ?
            ,expected_version => ?
          );
        |]
      params =
        ( UUID.toText $ Message.messageIdToUUID messageId
        , streamNameToText streamName
        , Message.messageTypeToText messageType
        , Aeson.toJSON payload
        , fmap Aeson.toJSON metadata
        , fmap versionToInteger expectedVersion
        )

      -- Translate a recognised expected-version failure into its dedicated
      -- exception; rethrow every other SQL error as-is.
      handleSqlError sqlError =
        maybe (throwIO sqlError) throwIO (parseExpectedVersionViolation sqlError)

  -- The pattern expects exactly one result row; any other shape fails the
  -- pattern match in 'IO'.
  [Postgres.Only position] <-
    handle handleSqlError $
      Postgres.query connection query params

  pure (fromInteger position)


-- | Write a JSON-formatted message to a named stream, optionally specifying JSON-formatted metadata and an expected version number.
--
-- A fresh message id is generated with 'Message.newMessageId' and the write
-- is delegated to 'writeMessageWithId'. Returns the generated id together
-- with the stream position of the written message.
writeMessage ::
  ( Aeson.ToJSON payload
  , Aeson.ToJSON metadata
  ) =>
  Postgres.Connection ->
  StreamName ->
  Message.MessageType ->
  payload ->
  Maybe metadata ->
  Maybe ExpectedVersion ->
  IO (Message.MessageId, Message.StreamPosition)
writeMessage connection streamName messageType payload metadata expectedVersion = do
  messageId <- Message.newMessageId
  position <-
    writeMessageWithId
      connection
      messageId
      streamName
      messageType
      payload
      metadata
      expectedVersion
  pure (messageId, position)


-- | Retrieve messages from a single stream, optionally specifying the starting position, the number of messages to retrieve, and an additional condition that will be appended to the SQL command's WHERE clause.
--
-- Calls @message_store.get_stream_messages@. When omitted, the starting
-- position defaults to @0@ and the batch size to @1000@; a missing condition
-- is passed as SQL @NULL@.
getStreamMessages ::
  Postgres.Connection ->
  StreamName ->
  Maybe Message.StreamPosition ->
  Maybe BatchSize ->
  Maybe Condition ->
  IO [Message]
getStreamMessages connection streamName position batchSize condition =
  let query =
        [sql|
          SELECT 
            id
            ,stream_name
            ,type
            ,position
            ,global_position
            ,data
            ,metadata
            ,time
          FROM message_store.get_stream_messages (
            stream_name => ?
            ,"position" => ?
            ,batch_size => ?
            ,condition => ?
          );
        |]
      params =
        ( streamNameToText streamName
        , maybe 0 toInteger position
        , maybe 1000 batchSizeToInteger batchSize
        , fmap conditionToText condition
        )
   in Postgres.queryWith fromFunction connection query params


-- | Retrieve messages from a category of streams, optionally specifying the starting position, the number of messages to retrieve, the correlation category for Pub/Sub, consumer group parameters, and an additional condition that will be appended to the SQL command's WHERE clause.
--
-- Calls @message_store.get_category_messages@. When omitted, the starting
-- global position defaults to @0@ and the batch size to @1000@. A missing
-- correlation or condition is passed as SQL @NULL@. Both consumer group
-- parameters are taken from the same 'ConsumerGroup' value, so they are
-- either both supplied or both @NULL@.
getCategoryMessages ::
  Postgres.Connection ->
  Category ->
  Maybe Message.GlobalPosition ->
  Maybe BatchSize ->
  Maybe Correlation ->
  Maybe ConsumerGroup ->
  Maybe Condition ->
  IO [Message]
getCategoryMessages connection category position batchSize correlation consumerGroup condition =
  let query =
        [sql|
          SELECT 
            id
            ,stream_name
            ,type
            ,position
            ,global_position
            ,data
            ,metadata
            ,time
          FROM message_store.get_category_messages (
            category => ?
            ,"position" => ?
            ,batch_size => ?
            ,correlation => ?
            ,consumer_group_member => ?
            ,consumer_group_size => ?
            ,condition => ?
          );
        |]
      params =
        ( categoryToText category
        , maybe 0 Message.globalPositionToInteger position
        , maybe 1000 batchSizeToInteger batchSize
        , fmap correlationToText correlation
        , fmap (toInteger . consumerGroupMember) consumerGroup
        , fmap (toInteger . consumerGroupSize) consumerGroup
        , fmap conditionToText condition
        )
   in Postgres.queryWith fromFunction connection query params


-- | Row from the messages table that corresponds to the highest position number in the stream.
--
-- Calls @message_store.get_last_stream_message@ and returns the first row of
-- the result, or 'Nothing' when the function yields no rows.
getLastStreamMessage :: Postgres.Connection -> StreamName -> IO (Maybe Message)
getLastStreamMessage connection streamName =
  let query =
        [sql|
          SELECT 
            id
            ,stream_name
            ,type
            ,position
            ,global_position
            ,data
            ,metadata
            ,time
          FROM message_store.get_last_stream_message (
            stream_name => ?
          );
        |]
      params = Postgres.Only (streamNameToText streamName)
   in listToMaybe <$> Postgres.queryWith fromFunction connection query params


-- | Highest position number in the stream.
--
-- Calls @message_store.stream_version@. Returns 'Nothing' unless the result
-- is exactly one row holding a non-@NULL@ value.
streamVersion :: Postgres.Connection -> StreamName -> IO (Maybe Message.StreamPosition)
streamVersion connection streamName = do
  let query =
        [sql|
          SELECT message_store.stream_version (
            stream_name => ?
          );
        |]
      params = Postgres.Only (streamNameToText streamName)

  result <- Postgres.query connection query params

  pure $ case result of
    [Postgres.Only (Just position)] -> Just $ fromInteger position
    _ -> Nothing