{-# LANGUAGE CPP #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE OverloadedStrings #-}
module Test.Inbox (
Inbox
, newInbox
, putInbox
, takeInbox
, takeInbox'
, Filter(Filter)
, equalTo
, predicate
, expectEmpty
, expectEmpty'
)
where
import qualified Control.Category as Cat
import Control.Arrow (Arrow(..), first)
import Data.IORef (newIORef, readIORef, atomicModifyIORef, IORef)
import Control.Monad.IO.Class (liftIO, MonadIO)
import qualified Data.Text as T
import Data.ErrorOr
import Control.Concurrent
import Control.Concurrent.Async
import Data.Maybe (isJust)
import Control.Monad (unless)
import Data.Foldable (sequenceA_)
import Control.Monad (forM_)
import Control.Exception
import Data.Time
#if __GLASGOW_HASKELL__ < 880
import Data.Semigroup
#endif
-- | A mutable mailbox used by tests.
--
-- It wraps a single 'IORef' that holds both the pending messages and the
-- observers currently waiting for new ones. The constructor is not exported;
-- create values with @newInbox@.
newtype Inbox a
  = Inbox (IORef (MessagesAndObservers a))
-- | The state stored inside an inbox.
data MessagesAndObservers a = MessagesAndObservers
  { -- | Pending messages; new messages are consed onto the front, so the
    -- newest message comes first.
    messages :: ![a]
    -- | Wake-up handles of the readers currently waiting for a message.
  , observers :: !Observers
  }
-- | Wake-up handle of one waiting reader; it is signalled by putting @()@
-- into the 'MVar'.
type Observer = MVar ()
-- | All readers currently waiting on an inbox.
type Observers = [Observer]
-- | Create an inbox with no pending messages and no waiting observers.
newInbox :: IO (Inbox a)
newInbox = Inbox <$> newIORef emptyState
  where
    emptyState = MessagesAndObservers {messages = [], observers = []}
-- | Add a message to the inbox and wake every reader that is waiting on it.
--
-- The state update and the signalling run under 'mask_', so an asynchronous
-- exception cannot arrive between storing the message and notifying the
-- observers.
putInbox :: forall m a . MonadIO m => Inbox a -> a -> m ()
putInbox (Inbox ref) newmsg =
  liftIO . mask_ $ do
    waiting <- atomicModifyIORef ref enqueue
    -- Signal each observer that was registered at the time of the update.
    mapM_ (`putMVar` ()) waiting
  where
    -- Store the message at the front, clear the observer list and hand the
    -- previously registered observers back to the caller.
    enqueue :: MessagesAndObservers a -> (MessagesAndObservers a, [MVar ()])
    enqueue MessagesAndObservers {messages = msgs, observers = waiting} =
      (MessagesAndObservers {messages = newmsg : msgs, observers = []}, waiting)
-- | Take the first message accepted by the filter, waiting for it if needed.
--
-- This is @takeInbox'@ with a timeout of 3 seconds.
takeInbox :: (MonadIO m, Show a) => Inbox a -> Filter a b -> m b
takeInbox = takeInbox' defaultTimeoutSeconds
  where
    defaultTimeoutSeconds :: Float
    defaultTimeoutSeconds = 3
-- | Like @takeInbox@, but with an explicit timeout given in seconds.
--
-- The inbox is scanned, oldest message first, for a message the filter
-- accepts. On a hit the message is removed from the inbox, every registered
-- observer is signalled, and the value produced by the filter is returned.
--
-- Without a hit this call registers its own observer and waits until it is
-- signalled or the timeout expires. After a signal it retries with whatever
-- time is left. On timeout it calls 'error' with the filter label and the
-- shown inbox contents.
takeInbox' ::
  forall m a b.
  (MonadIO m, Show a) =>
#if __GLASGOW_HASKELL__ >= 880
#endif
  Float ->
  Inbox a ->
  Filter a b ->
  m b
takeInbox' sec inbox@(Inbox ref) flt@(Filter label matcher) = do
  wakeUp <- liftIO newEmptyMVar
  found <- liftIO . mask_ $ do
    outcome <- atomicModifyIORef ref (checkInbox wakeUp)
    -- On a hit, wake the other waiting readers in registration order
    -- (the list is kept newest first, hence the reverse).
    forM_ (fst <$> outcome) (mapM_ (`putMVar` ()) . reverse)
    return (snd <$> outcome)
  case found of
    Just msg -> return msg
    Nothing -> do
      started <- liftIO getCurrentTime
      -- threadDelay takes microseconds.
      res <-
        liftIO $
          race
            (threadDelay (round (sec * 10 ^ (6 :: Int))))
            (readMVar wakeUp)
      case res of
        Right () -> do
          now <- liftIO getCurrentTime
          -- diffUTCTime a b is a - b, so the later instant goes first. The
          -- reversed order gives a negative duration, which would lengthen
          -- the remaining timeout on every retry instead of shortening it.
          let elapsed :: NominalDiffTime
              elapsed = diffUTCTime now started
          takeInbox' (sec - realToFrac elapsed) inbox flt
        Left () -> do
          leftovers <- liftIO (messages <$> readIORef ref)
          error
            ( T.unpack $
                "Timed out waiting for `"
                  <> label
                  <> "`. Contents: "
                  <> T.pack (show leftovers)
            )
  where
    -- Pure state transition run inside atomicModifyIORef. With no match the
    -- caller's observer is registered; with a match the message is removed,
    -- the observer list is cleared and the old observers are returned so
    -- that they can be signalled.
    checkInbox :: Observer -> MessagesAndObservers a -> (MessagesAndObservers a, Maybe (Observers, b))
    checkInbox me MessagesAndObservers {messages = msgs, observers = waiting} =
      -- Messages are stored newest first; search them oldest first.
      case pick matcher (reverse msgs) of
        (_, Nothing) ->
          (MessagesAndObservers {messages = msgs, observers = me : waiting}, Nothing)
        (oldestFirst, Just hit) ->
          ( MessagesAndObservers {messages = reverse oldestFirst, observers = []}
          , Just (waiting, hit)
          )

    -- Remove the first element the function accepts, returning the remaining
    -- elements in their original order together with the extracted value.
    pick :: (c -> Maybe d) -> [c] -> ([c], Maybe d)
    pick _ [] = ([], Nothing)
    pick g (x : rest) =
      case g x of
        Just hit -> (rest, Just hit)
        Nothing ->
          let (kept, res) = pick g rest
           in (x : kept, res)
-- | A named message matcher. The 'T.Text' is a label used in failure
-- messages; the function returns 'Just' a result for a message it accepts
-- and 'Nothing' for one it rejects.
data Filter a b = Filter T.Text (a -> Maybe b)
-- | Filters compose like partial functions: the composite accepts a message
-- only when both stages accept.
instance Cat.Category Filter where
  -- The identity filter accepts every message unchanged.
  id = Filter "id" Just
  -- The inner (right-hand) filter runs first; its label also comes first in
  -- the combined label.
  (.) (Filter outerName outer) (Filter innerName inner) =
    Filter
      (innerName <> ">>>" <> outerName)
      (\msg -> inner msg >>= outer)
-- | Lets plain functions be used as filters and filters be applied to the
-- first component of a pair.
instance Arrow Filter where
  -- A lifted function accepts every input.
  arr g = Filter "arr" (Just . g)
  -- Match on the first component only; the second is passed through
  -- unchanged when the match succeeds.
  first (Filter name g) =
    Filter
      (name <> "*")
      (\(x, passthrough) -> (,passthrough) <$> g x)
-- | A filter that accepts exactly the messages equal to the given value.
-- Its label is the 'show' rendering of that value.
equalTo :: (Eq a, Show a) => a -> Filter a ()
equalTo expected = Filter (T.pack (show expected)) matches
  where
    matches actual
      | expected == actual = Just ()
      | otherwise = Nothing
-- | Build a filter from a label and a boolean test. An accepted message is
-- returned unchanged.
predicate :: T.Text
          -> (a -> Bool)
          -> Filter a a
predicate name p = Filter name keepIfAccepted
  where
    keepIfAccepted msg
      | p msg = Just msg
      | otherwise = Nothing
-- | Check that the inbox holds no messages.
--
-- Returns a successful 'ErrorOr' when the inbox is empty. Otherwise returns
-- one error per remaining message (its 'show' rendering), all tagged with
-- @"Unconsumed messages"@. The inbox itself is not modified.
expectEmpty :: Show a => Inbox a -> IO (ErrorOr ())
expectEmpty (Inbox ref) = do
  leftovers <- messages <$> readIORef ref
  pure $
    if null leftovers
      then pure ()
      else
        tag
          "Unconsumed messages"
          (sequenceA_ (map (err . T.pack . show) leftovers))
-- | Check that the inbox holds no message accepted by the given filter.
--
-- Does nothing when no pending message matches. Otherwise builds one error
-- per matching message (its 'show' rendering), tags them with
-- @"There are msgs matching "@ followed by the filter label, and converts
-- the result into an 'IO' action with 'toE'. The inbox is not modified.
expectEmpty' :: (Show a, MonadIO m) => Inbox a -> Filter a b -> m ()
expectEmpty' (Inbox ref) (Filter label matcher) = do
  pending <- liftIO (messages <$> readIORef ref)
  let offending = filter (isJust . matcher) pending
  unless (null offending) $ do
    let failures :: [ErrorOr ()]
        failures = map (err . T.pack . show) offending
        report :: ErrorOr ()
        report = tag ("There are msgs matching " <> label) (sequenceA_ failures)
    liftIO (toE report)