{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE TemplateHaskell #-} -- | Common test functionality module Eventful.TestHelpers ( Counter (..) , CounterProjection , counterProjection , CounterAggregate , counterAggregate , CounterEvent (..) , CounterCommand (..) , EventStoreRunner (..) , GlobalStreamEventStoreRunner (..) , eventStoreSpec , globalStreamEventStoreSpec , VersionedProjectionCacheRunner (..) , versionedProjectionCacheSpec , GlobalStreamProjectionCacheRunner (..) , globalStreamProjectionCacheSpec , Text , module X ) where import Control.Monad as X import Control.Monad.IO.Class as X import Control.Monad.Logger as X import Data.Aeson import Data.Aeson.Casing import Data.Aeson.TH import Data.Text (Text) import Test.Hspec import Eventful -- | Example Projection/Aggregate newtype Counter = Counter { unCounter :: Int } deriving (Eq, Show, FromJSON, ToJSON) data CounterEvent = Added { _counterEventAmount :: Int } | CounterFailedOutOfBounds deriving (Eq, Show) type CounterProjection = Projection Counter CounterEvent counterProjection :: CounterProjection counterProjection = Projection (Counter 0) (\(Counter k) (Added x) -> Counter (k + x)) counterGlobalProjection :: Projection Counter (VersionedStreamEvent CounterEvent) counterGlobalProjection = Projection (Counter 0) (\(Counter k) (StreamEvent _ _ (Added x)) -> Counter (k + x)) data CounterCommand = Increment { _counterCommandAmount :: Int } | Decrement { _counterCommandAmount :: Int } deriving (Eq, Show) type CounterAggregate = Aggregate Counter CounterEvent CounterCommand counterAggregate :: CounterAggregate counterAggregate = Aggregate counterCommand counterProjection counterCommand :: Counter -> CounterCommand -> [CounterEvent] counterCommand (Counter k) (Increment n) = if k + n <= 100 then [Added n] else [CounterFailedOutOfBounds] counterCommand (Counter k) (Decrement n) = if k - n >= 0 then [Added (-n)] else [CounterFailedOutOfBounds] deriveJSON (aesonPrefix camelCase) ''CounterEvent deriveJSON (aesonPrefix camelCase) ''CounterCommand -- Test harness for stores newtype EventStoreRunner m = EventStoreRunner (forall a. (EventStoreWriter m CounterEvent -> VersionedEventStoreReader m CounterEvent -> m a) -> IO a) eventStoreSpec :: (Monad m) => EventStoreRunner m -> Spec eventStoreSpec (EventStoreRunner withStore) = do let withStoreExampleEvents action = withStore $ \writer reader -> do _ <- insertExampleEvents writer action writer reader context "when a few events are inserted" $ do let sampleEvents = [Added 1, Added 4, Added (-3), Added 5] withStore' action = withStore $ \writer reader -> do _ <- storeEvents writer NoStream nil sampleEvents action writer reader it "should return events" $ do events' <- withStore' $ \_ reader -> getEvents reader (allEvents nil) (streamEventEvent <$> events') `shouldBe` sampleEvents it "should return correct event versions" $ do events <- withStore' $ \_ reader -> getEvents reader (allEvents nil) (streamEventPosition <$> events) `shouldBe` [0, 1, 2, 3] it "should return correct events with queries" $ do (firstEvents, middleEvents, laterEvents, maxEvents) <- withStore' $ \_ reader -> (,,,) <$> getEvents reader (eventsUntil nil 1) <*> getEvents reader (eventsStartingAtUntil nil 1 2) <*> getEvents reader (eventsStartingAt nil 2) <*> getEvents reader (eventsStartingAtTakeLimit nil 0 2) (streamEventEvent <$> firstEvents) `shouldBe` take 2 sampleEvents (streamEventEvent <$> middleEvents) `shouldBe` take 2 (drop 1 sampleEvents) (streamEventEvent <$> laterEvents) `shouldBe` drop 2 sampleEvents (streamEventEvent <$> maxEvents) `shouldBe` take 2 sampleEvents it "should return the latest projection" $ do projection <- withStore' $ \_ reader -> getLatestStreamProjection reader (versionedStreamProjection nil counterProjection) streamProjectionState projection `shouldBe` Counter 7 streamProjectionPosition projection `shouldBe` 3 streamProjectionKey projection `shouldBe` nil it "should return the latest projection with some starting StreamProjection" $ do projection <- withStore' $ \_ reader -> do initialEvents <- getEvents reader (eventsUntil nil 1) let initialProjection = latestProjection counterProjection (streamEventEvent <$> initialEvents) getLatestStreamProjection reader (StreamProjection nil 1 counterProjection initialProjection) streamProjectionState projection `shouldBe` Counter 7 streamProjectionPosition projection `shouldBe` 3 streamProjectionKey projection `shouldBe` nil context "when events from multiple UUIDs are inserted" $ do it "should have the correct events for each aggregate" $ do (events1, events2) <- withStoreExampleEvents $ \_ reader -> (,) <$> getEvents reader (allEvents uuid1) <*> getEvents reader (allEvents uuid2) (streamEventEvent <$> events1) `shouldBe` Added <$> [1, 4] (streamEventEvent <$> events2) `shouldBe` Added <$> [2, 3, 5] (streamEventKey <$> events1) `shouldBe` [uuid1, uuid1] (streamEventKey <$> events2) `shouldBe` [uuid2, uuid2, uuid2] (streamEventPosition <$> events1) `shouldBe` [0, 1] (streamEventPosition <$> events2) `shouldBe` [0, 1, 2] it "should return correct event versions" $ do (events1, events2) <- withStoreExampleEvents $ \_ reader -> (,) <$> getEvents reader (allEvents uuid1) <*> getEvents reader (allEvents uuid2) streamEventEvent <$> events1 `shouldBe` [Added 1, Added 4] streamEventEvent <$> events2 `shouldBe` [Added 2, Added 3, Added 5] it "should return correct events with queries" $ do (firstEvents, middleEvents, laterEvents, maxEvents) <- withStoreExampleEvents $ \_ reader -> (,,,) <$> getEvents reader (eventsUntil uuid1 1) <*> getEvents reader (eventsStartingAtUntil uuid2 1 2) <*> getEvents reader (eventsStartingAt uuid2 2) <*> getEvents reader (eventsStartingAtTakeLimit uuid1 1 1) (streamEventEvent <$> firstEvents) `shouldBe` [Added 1, Added 4] (streamEventEvent <$> middleEvents) `shouldBe` [Added 3, Added 5] (streamEventEvent <$> laterEvents) `shouldBe` [Added 5] (streamEventEvent <$> maxEvents) `shouldBe` [Added 4] it "should produce the correct projections" $ do (proj1, proj2) <- withStoreExampleEvents $ \_ reader -> (,) <$> getLatestStreamProjection reader (versionedStreamProjection uuid1 counterProjection) <*> getLatestStreamProjection reader (versionedStreamProjection uuid2 counterProjection) (streamProjectionState proj1, streamProjectionPosition proj1) `shouldBe` (Counter 5, 1) (streamProjectionState proj2, streamProjectionPosition proj2) `shouldBe` (Counter 10, 2) describe "can handle event storage errors" $ do it "rejects some writes when event store isn't created" $ do (err1, err2) <- withStore $ \writer _ -> (,) <$> storeEvents writer StreamExists nil [Added 1] <*> storeEvents writer (ExactVersion 0) nil [Added 1] err1 `shouldBe` Just (EventStreamNotAtExpectedVersion (-1)) err2 `shouldBe` Just (EventStreamNotAtExpectedVersion (-1)) it "should be able to store events starting with an empty stream" $ do withStore (\writer _ -> storeEvents writer NoStream nil [Added 1]) `shouldReturn` Nothing it "should reject storing events sometimes with a stream" $ do (err1, err2, err3) <- withStore $ \writer _ -> (,,) <$> storeEvents writer NoStream nil [Added 1] <*> storeEvents writer NoStream nil [Added 1] <*> storeEvents writer (ExactVersion 1) nil [Added 1] err1 `shouldBe` Nothing err2 `shouldBe` Just (EventStreamNotAtExpectedVersion 0) err3 `shouldBe` Just (EventStreamNotAtExpectedVersion 0) it "should accepts storing events sometimes with a stream" $ do errors <- withStore $ \writer _ -> sequence [ storeEvents writer NoStream nil [Added 1] , storeEvents writer AnyVersion nil [Added 1] , storeEvents writer (ExactVersion 1) nil [Added 1] , storeEvents writer StreamExists nil [Added 1] ] errors `shouldBe` [Nothing, Nothing, Nothing, Nothing] newtype GlobalStreamEventStoreRunner m = GlobalStreamEventStoreRunner (forall a. (EventStoreWriter m CounterEvent -> GlobalEventStoreReader m CounterEvent -> m a) -> IO a) globalStreamEventStoreSpec :: (Monad m) => GlobalStreamEventStoreRunner m -> Spec globalStreamEventStoreSpec (GlobalStreamEventStoreRunner withStore) = do context "when the event store is empty" $ do it "shouldn't have any events" $ do events <- withStore (\_ globalReader -> getEvents globalReader (allEvents ())) length events `shouldBe` 0 context "when events from multiple UUIDs are inserted" $ do it "should have the correct events in global order" $ do events <- withStore $ \writer globalReader -> do insertExampleEvents writer getEvents globalReader (allEvents ()) (streamEventEvent . streamEventEvent <$> events) `shouldBe` Added <$> [1..5] (streamEventKey . streamEventEvent <$> events) `shouldBe` [uuid1, uuid2, uuid2, uuid1, uuid2] (streamEventPosition . streamEventEvent <$> events) `shouldBe` [0, 0, 1, 1, 2] (streamEventPosition <$> events) `shouldBe` [1..5] it "should work with global projections" $ do (proj1, proj2) <- withStore $ \writer globalReader -> do insertExampleEvents writer p1 <- getLatestStreamProjection globalReader (globalStreamProjection counterGlobalProjection) _ <- storeEvents writer AnyVersion uuid1 [Added 10, Added 20] p2 <- getLatestStreamProjection globalReader p1 return (p1, p2) streamProjectionPosition proj1 `shouldBe` 5 streamProjectionPosition proj2 `shouldBe` 7 it "should handle queries" $ do (firstEvents, middleEvents, laterEvents, maxEvents) <- withStore $ \writer globalReader -> do insertExampleEvents writer (,,,) <$> getEvents globalReader (eventsUntil () 2) <*> getEvents globalReader (eventsStartingAtUntil () 2 3) <*> getEvents globalReader (eventsStartingAt () 3) <*> getEvents globalReader (eventsStartingAtTakeLimit () 2 3) (streamEventEvent . streamEventEvent <$> firstEvents) `shouldBe` Added <$> [1..2] (streamEventPosition <$> firstEvents) `shouldBe` [1..2] (streamEventEvent . streamEventEvent <$> middleEvents) `shouldBe` Added <$> [2..3] (streamEventPosition <$> middleEvents) `shouldBe` [2..3] (streamEventEvent . streamEventEvent <$> laterEvents) `shouldBe` Added <$> [3..5] (streamEventPosition <$> laterEvents) `shouldBe` [3..5] (streamEventEvent . streamEventEvent <$> maxEvents) `shouldBe` Added <$> [2..4] (streamEventPosition <$> maxEvents) `shouldBe` [2..4] insertExampleEvents :: (Monad m) => EventStoreWriter m CounterEvent -> m () insertExampleEvents store = do void $ storeEvents store NoStream uuid1 [Added 1] void $ storeEvents store NoStream uuid2 [Added 2, Added 3] void $ storeEvents store (ExactVersion 0) uuid1 [Added 4] void $ storeEvents store (ExactVersion 1) uuid2 [Added 5] uuid1 :: UUID uuid1 = uuidFromInteger 1 uuid2 :: UUID uuid2 = uuidFromInteger 2 newtype VersionedProjectionCacheRunner m = VersionedProjectionCacheRunner (forall a. ( EventStoreWriter m CounterEvent -> VersionedEventStoreReader m CounterEvent -> VersionedProjectionCache Counter m -> m a) -> IO a ) versionedProjectionCacheSpec :: (Monad m) => VersionedProjectionCacheRunner m -> Spec versionedProjectionCacheSpec (VersionedProjectionCacheRunner withStoreAndCache) = do context "when the store is empty" $ do it "should be able to store and load simple projections" $ do snapshot <- withStoreAndCache $ \_ _ cache -> do storeProjectionSnapshot cache nil 4 (Counter 100) loadProjectionSnapshot cache nil snapshot `shouldBe` Just (4, Counter 100) context "when the store has some events in one stream" $ do it "should load from a stream of events" $ do snapshot <- withStoreAndCache $ \writer reader cache -> do _ <- storeEvents writer AnyVersion nil [Added 1, Added 2] getLatestVersionedProjectionWithCache reader cache (versionedStreamProjection nil counterProjection) streamProjectionPosition snapshot `shouldBe` 1 streamProjectionState snapshot `shouldBe` Counter 3 it "should work with updateProjectionCache" $ do snapshot <- withStoreAndCache $ \writer reader cache -> do _ <- storeEvents writer AnyVersion nil [Added 1, Added 2, Added 3] updateProjectionCache reader cache (versionedStreamProjection nil counterProjection) getLatestVersionedProjectionWithCache reader cache (versionedStreamProjection nil counterProjection) streamProjectionKey snapshot `shouldBe` nil streamProjectionPosition snapshot `shouldBe` 2 streamProjectionState snapshot `shouldBe` Counter 6 newtype GlobalStreamProjectionCacheRunner m = GlobalStreamProjectionCacheRunner (forall a. ( EventStoreWriter m CounterEvent -> GlobalEventStoreReader m CounterEvent -> GlobalStreamProjectionCache Text Counter m -> m a ) -> IO a) globalStreamProjectionCacheSpec :: (Monad m) => GlobalStreamProjectionCacheRunner m -> Spec globalStreamProjectionCacheSpec (GlobalStreamProjectionCacheRunner withStoreAndCache) = do context "when the store is empty" $ do it "should be able to store and load simple projections" $ do snapshot <- withStoreAndCache $ \_ _ cache -> do storeProjectionSnapshot cache "key" 4 (Counter 100) loadProjectionSnapshot cache "key" snapshot `shouldBe` Just (4, Counter 100) context "when the store has some events in one stream" $ do it "should load from a global stream of events" $ do snapshot <- withStoreAndCache $ \writer globalReader cache -> do _ <- storeEvents writer AnyVersion nil [Added 1, Added 2] getLatestGlobalProjectionWithCache globalReader cache (globalStreamProjection counterGlobalProjection) "key" streamProjectionPosition snapshot `shouldBe` 2 streamProjectionState snapshot `shouldBe` Counter 3 it "should work with updateGlobalProjectionCache" $ do snapshot <- withStoreAndCache $ \writer globalReader cache -> do _ <- storeEvents writer AnyVersion nil [Added 1, Added 2, Added 3] updateGlobalProjectionCache globalReader cache (globalStreamProjection counterGlobalProjection) "key" getLatestGlobalProjectionWithCache globalReader cache (globalStreamProjection counterGlobalProjection) "key" streamProjectionPosition snapshot `shouldBe` 3 streamProjectionState snapshot `shouldBe` Counter 6 context "when events from multiple UUIDs are inserted" $ do it "should have the correct cached projection value" $ do snapshot <- withStoreAndCache $ \writer globalReader cache -> do insertExampleEvents writer updateGlobalProjectionCache globalReader cache (globalStreamProjection counterGlobalProjection) "key" getLatestGlobalProjectionWithCache globalReader cache (globalStreamProjection counterGlobalProjection) "key" streamProjectionPosition snapshot `shouldBe` 5 streamProjectionState snapshot `shouldBe` Counter 15