{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilyDependencies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
module Network.Ethereum.Contract.Event.MultiFilter
(
MultiFilter(..)
, minStartBlock
, minEndBlock
, modifyMultiFilter
, multiEvent
, multiEventMany
, multiEventNoFilter
, multiEventManyNoFilter
, Handlers
, Handler(..)
, Rec(..)
) where
import Control.Concurrent (threadDelay)
import Control.Monad (forM, void, when)
import Control.Monad.IO.Class (MonadIO (..))
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Reader (ReaderT (..))
import Data.List (sortOn)
import Data.Machine (MachineT, asParts,
autoM, await,
construct, final,
repeatedly, runT,
unfoldPlan, (~>))
import Data.Machine.Plan (PlanT, stop, yield)
import Data.Maybe (catMaybes, fromJust,
listToMaybe)
import Data.Tagged (Tagged (..))
import Data.Vinyl (Rec ((:&), RNil),
RecApplicative)
import Data.Vinyl.CoRec (CoRec (..), Field,
FoldRec, Handler (H),
Handlers, coRecToRec,
firstField, match,
onField)
import Data.Vinyl.Functor (Compose (..),
Identity (..))
#if MIN_VERSION_vinyl(0,10,0)
import Data.Vinyl (RPureConstrained)
#else
import Data.Proxy (Proxy (..))
import Data.Vinyl.TypeLevel (AllAllSat)
#endif
import Data.Solidity.Event (DecodeEvent (..))
import qualified Network.Ethereum.Api.Eth as Eth
import Network.Ethereum.Api.Types (Change (..),
DefaultBlock (..),
Filter (..), Quantity)
import Network.JsonRpc.TinyClient (JsonRpc(..))
import Network.Ethereum.Contract.Event.Common
-- | A heterogeneous list of event filters, indexed by the type-level list
-- @es@ of the event types the individual filters select.
data MultiFilter (es :: [*]) where
  -- | The empty collection of filters.
  NilFilters :: MultiFilter '[]
  -- | Prepend a filter for events of type @e@ to an existing collection.
  (:?) :: Filter e -> MultiFilter es -> MultiFilter (e ': es)

infixr 5 :?
-- | The minimum, over every filter in the collection, of the third 'Filter'
-- field (the one bound as the end block here).
--
-- An empty collection yields 'Pending'.
-- NOTE(review): this presumes 'Pending' is the greatest 'DefaultBlock' under
-- its 'Ord' instance, so that it acts as the identity for 'min' - confirm.
minEndBlock
  :: MultiFilter es
  -> DefaultBlock
minEndBlock NilFilters             = Pending
minEndBlock (Filter _ _ e _ :? fs) = e `min` minEndBlock fs
-- | The minimum, over every filter in the collection, of the second 'Filter'
-- field (the one bound as the start block here).
--
-- An empty collection yields 'Pending'.
-- NOTE(review): this presumes 'Pending' is the greatest 'DefaultBlock' under
-- its 'Ord' instance, so that it acts as the identity for 'min' - confirm.
minStartBlock
  :: MultiFilter es
  -> DefaultBlock
minStartBlock NilFilters             = Pending
minStartBlock (Filter _ s _ _ :? fs) = s `min` minStartBlock fs
-- | Apply the same transformation to every filter in the collection.
--
-- The transformation must work for any event type, hence the rank-2 type:
-- it may only touch the parts of a 'Filter' that do not depend on @e@.
modifyMultiFilter
  :: (forall e. Filter e -> Filter e)
  -> MultiFilter es
  -> MultiFilter es
modifyMultiFilter _ NilFilters = NilFilters
modifyMultiFilter h (f :? fs)  = h f :? modifyMultiFilter h fs
-- | Run the handlers over the events selected by the given filters.
--
-- This is 'multiEventMany' with a window size of @0@.
multiEvent
  :: ( PollFilters es
     , QueryAllLogs es
     , MapHandlers m es (WithChange es)
#if MIN_VERSION_vinyl(0,10,0)
     , RPureConstrained HasLogIndex (WithChange es)
#else
     , AllAllSat '[HasLogIndex] (WithChange es)
#endif
     , RecApplicative (WithChange es)
     , JsonRpc m
     )
  => MultiFilter es
  -> Handlers es (ReaderT Change m EventAction)
  -> m ()
multiEvent fltrs = multiEventMany fltrs 0
-- | State threaded through the windowed log replay.
data MultiFilterStreamState es =
  MultiFilterStreamState { mfssCurrentBlock       :: Quantity
                           -- ^ first block of the next window to query
                         , mfssInitialMultiFilter :: MultiFilter es
                           -- ^ the filters as originally supplied by the caller
                         , mfssWindowSize         :: Integer
                           -- ^ number of blocks added to the current block
                           -- to obtain the upper bound of a window
                         }
-- | Run the handlers over the events selected by the given filters, first
-- replaying logs in windows and then, if appropriate, polling.
--
-- The 'Integer' argument is the window size stored in 'mfssWindowSize'.
--
-- Steps:
--
--   1. Resolve the smallest start block of the filters to a block number.
--   2. Replay logs from that block through 'playMultiLogs', feeding them to
--      the handlers with 'reduceMultiEventStream'.
--   3. If the replay produced no result, start polling from the start block.
--      Otherwise start polling from the block after the last processed one,
--      but only when the last handler action was not 'TerminateEvent' and
--      the last processed block is still below the smallest end block.
multiEventMany
  :: ( PollFilters es
     , QueryAllLogs es
     , MapHandlers m es (WithChange es)
#if MIN_VERSION_vinyl(0,10,0)
     , RPureConstrained HasLogIndex (WithChange es)
#else
     , AllAllSat '[HasLogIndex] (WithChange es)
#endif
     , RecApplicative (WithChange es)
     , JsonRpc m
     )
  => MultiFilter es
  -> Integer
  -> Handlers es (ReaderT Change m EventAction)
  -> m ()
multiEventMany fltrs window handlers = do
    start <- mkBlockNumber $ minStartBlock fltrs
    let initState =
          MultiFilterStreamState { mfssCurrentBlock = start
                                 , mfssInitialMultiFilter = fltrs
                                 , mfssWindowSize = window
                                 }
    mLastProcessedFilterState <- reduceMultiEventStream (playMultiLogs initState) handlers
    case mLastProcessedFilterState of
      -- Nothing was processed during the replay: poll from the very start.
      Nothing -> startPolling (modifyMultiFilter (\f -> f {filterFromBlock = BlockWithNumber start}) fltrs)
      Just (act, lastBlock) -> do
        end <- mkBlockNumber . minEndBlock $ fltrs
        -- Keep going only if no handler asked to terminate and the replay
        -- has not already reached the end block.
        when (act /= TerminateEvent && lastBlock < end) $
          let pollingFromBlock = lastBlock + 1
          in startPolling (modifyMultiFilter (\f -> f {filterFromBlock = BlockWithNumber pollingFromBlock}) fltrs)
  where
    -- Open the filters, then feed whatever 'pollMultiFilter' produces to the
    -- handlers; the final reduction result is discarded.
    startPolling fltrs' = do
      fIds <- openMultiFilter fltrs'
      let pollTo = minEndBlock fltrs'
      void $ reduceMultiEventStream (pollMultiFilter fIds pollTo) handlers
-- | Produce a stream of 'MultiFilter's, one per block window, starting at
-- 'mfssCurrentBlock' of the given state and stopping once the current block
-- exceeds the (resolved) smallest end block of the initial filters.
multiFilterStream :: JsonRpc m
                  => MultiFilterStreamState es
                  -> MachineT m k (MultiFilter es)
multiFilterStream initialPlan =
  unfoldPlan initialPlan $ \s -> do
    -- The end block is always resolved from the initial filters.
    end <- lift . mkBlockNumber . minEndBlock . mfssInitialMultiFilter $ initialPlan
    filterPlan end s
  where
    -- Emit the filters restricted to the current window, then continue with
    -- the window that starts right after it.
    filterPlan :: JsonRpc m
               => Quantity
               -> MultiFilterStreamState es
               -> PlanT k (MultiFilter es) m (MultiFilterStreamState es)
    filterPlan end initialState@MultiFilterStreamState{..} = do
      if mfssCurrentBlock > end
        then stop
        else do
          -- Upper bound of this window, clamped to the end block.
          let to' = min end $ mfssCurrentBlock + fromInteger mfssWindowSize
              h :: forall e. Filter e -> Filter e
              h f = f { filterFromBlock = BlockWithNumber mfssCurrentBlock
                      , filterToBlock = BlockWithNumber to'
                      }
          yield (modifyMultiFilter h mfssInitialMultiFilter)
          filterPlan end initialState { mfssCurrentBlock = to' + 1 }
-- | Embed a 'Field' over @ts@ into a 'Field' over the larger list @t ': ts@.
--
-- The value is converted to a record of optional fields with 'coRecToRec',
-- an empty slot for @t@ is put in front, and 'firstField' picks the first
-- populated field of the widened record.
--
-- NOTE(review): 'fromJust' is expected to be safe here because 'coRecToRec'
-- should populate the slot of the value held by the input 'Field', so
-- 'firstField' has something to find - confirm against the vinyl version
-- in use.
weakenCoRec
  :: ( RecApplicative ts
     , FoldRec (t ': ts) (t ': ts)
     )
  => Field ts
  -> Field (t ': ts)
weakenCoRec = fromJust . firstField . (Compose Nothing :&) . coRecToRec
-- | Wrap every event type of a type-level list in 'FilterChange'.
--
-- The family is injective: the resulting list determines the input list.
type family WithChange (es :: [*]) = (es' :: [*]) | es' -> es where
  WithChange '[] = '[]
  WithChange (e : es) = FilterChange e : WithChange es
-- | Event-type lists for which the logs of every filter in a 'MultiFilter'
-- can be queried and returned as one list of 'Field's.
class QueryAllLogs (es :: [*]) where
  -- | Query the logs of every filter in the collection.
  queryAllLogs :: JsonRpc m => MultiFilter es -> m [Field (WithChange es)]
-- | No filters, no logs.
instance QueryAllLogs '[] where
  queryAllLogs NilFilters = pure []
-- | Query the head filter, then the remaining ones.
--
-- The logs of the head filter are fetched with 'Eth.getLogs' and decoded
-- with 'mkFilterChanges'. The result lists the head filter's changes first,
-- followed by the changes of the remaining filters widened with
-- 'weakenCoRec' so that both parts share one 'Field' type.
instance forall e i ni es.
  ( DecodeEvent i ni e
  , QueryAllLogs es
  , RecApplicative (WithChange es)
  , FoldRec (FilterChange e : WithChange es) (WithChange es)
  ) => QueryAllLogs (e:es) where

  queryAllLogs (f :? fs) = do
    changes <- Eth.getLogs f
    -- The event type cannot be inferred from the raw changes, so it is
    -- passed explicitly as the third type argument.
    filterChanges <- liftIO . mkFilterChanges @_ @_ @e $ changes
    filterChanges' <- queryAllLogs fs
    pure $ map (CoRec . Identity) filterChanges <> map weakenCoRec filterChanges'
-- | Values from which an optional pair of 'Quantity's can be extracted for
-- ordering purposes (see the 'FilterChange' instance for what the pair holds).
class HasLogIndex a where
  getLogIndex :: a -> Maybe (Quantity, Quantity)
-- | The pair is @(block number, log index)@ of the raw change; the result is
-- 'Nothing' when either of the two is missing.
instance HasLogIndex (FilterChange e) where
  getLogIndex FilterChange{..} =
    (,) <$> changeBlockNumber filterChangeRawChange <*> changeLogIndex filterChangeRawChange
-- | Sort changes by the key returned by 'getLogIndex'.
--
-- 'sortOn' is used with the @Maybe (Quantity, Quantity)@ key directly, so
-- changes without a key ('Nothing') sort before all others.
sortChanges
#if MIN_VERSION_vinyl(0,10,0)
  :: ( RPureConstrained HasLogIndex es
#else
  :: ( AllAllSat '[HasLogIndex] es
#endif
     , RecApplicative es
     )
  => [Field es]
  -> [Field es]
sortChanges changes =
#if MIN_VERSION_vinyl(0,10,0)
  let sorterProj change = onField @HasLogIndex getLogIndex change
#else
  let sorterProj change = onField (Proxy @'[HasLogIndex]) getLogIndex change
#endif
  in sortOn sorterProj changes
-- | Convert a record of handlers over the event types @es@ into a record of
-- handlers over @es'@ that run in @m@ and return an optional
-- @(action, block number)@ pair.
class MapHandlers m es es' where
  mapHandlers
    :: Handlers es (ReaderT Change m EventAction)
    -> Handlers es' (m (Maybe (EventAction, Quantity)))
-- | No handlers to convert.
instance Monad m => MapHandlers m '[] '[] where
  mapHandlers RNil = RNil
-- | Convert the head handler and recurse on the rest.
--
-- The converted handler takes a 'FilterChange', runs the original handler on
-- the decoded event with the raw 'Change' as reader environment, and pairs
-- the resulting action with the change's block number. The result is
-- 'Nothing' when the raw change has no block number; the handler is still
-- run in that case.
instance
  ( Monad m
  , MapHandlers m es es'
  ) => MapHandlers m (e : es) (FilterChange e : es') where

  mapHandlers (H f :& fs) =
    let f' FilterChange{..} = do
          act <- runReaderT (f filterChangeEvent) filterChangeRawChange
          return ((,) act <$> changeBlockNumber filterChangeRawChange)
    in H f' :& mapHandlers fs
-- | Feed a stream of change batches to the handlers and return the last
-- @(action, block number)@ pair that was produced, if any.
--
-- Pipeline:
--
--   * every batch is dispatched to the matching handlers ('processChanges');
--   * the per-batch results are flattened into single elements ('asParts');
--   * elements are passed on up to and including the first one whose action
--     is 'TerminateEvent', after which the stream stops ('runWhile');
--   * only the last element is kept ('final').
--
-- The result is 'Nothing' when the pipeline produced no element at all.
reduceMultiEventStream
  :: ( Monad m
     , MapHandlers m es (WithChange es)
     )
  => MachineT m k [Field (WithChange es)]
  -> Handlers es (ReaderT Change m EventAction)
  -> m (Maybe (EventAction, Quantity))
reduceMultiEventStream filterChanges handlers = fmap listToMaybe . runT $
       filterChanges
    ~> autoM (processChanges handlers)
    ~> asParts
    ~> runWhile (\(act, _) -> act /= TerminateEvent)
    ~> final
  where
    -- Pass values through while the predicate holds; the first value that
    -- fails the predicate is still emitted before stopping.
    runWhile p = repeatedly $ do
      v <- await
      if p v
        then yield v
        else yield v >> stop
    -- Run the matching handler on every change of a batch, in order, and
    -- drop the results for which the handler produced 'Nothing'.
    processChanges handlers' changes = fmap catMaybes $
        forM changes $ \fc -> match fc (mapHandlers handlers')
-- | Stream the logs selected by the filters, one batch per block window.
--
-- 'multiFilterStream' produces the windowed filters, 'queryAllLogs' fetches
-- the logs for each of them, and every resulting batch is ordered with
-- 'sortChanges'.
playMultiLogs
  :: forall es k m.
     ( QueryAllLogs es
#if MIN_VERSION_vinyl(0,10,0)
     , RPureConstrained HasLogIndex (WithChange es)
#else
     , AllAllSat '[HasLogIndex] (WithChange es)
#endif
     , RecApplicative (WithChange es)
     , JsonRpc m
     )
  => MultiFilterStreamState es
  -> MachineT m k [Field (WithChange es)]
playMultiLogs s = fmap sortChanges $
     multiFilterStream s
  ~> autoM queryAllLogs
-- | A heterogeneous list of 'Quantity' values, each tagged with the event
-- type it belongs to.
-- NOTE(review): judging by the name and by 'openMultiFilter', these are the
-- identifiers of opened filters - confirm.
data TaggedFilterIds (es :: [*]) where
  TaggedFilterNil :: TaggedFilterIds '[]
  TaggedFilterCons :: Tagged e Quantity -> TaggedFilterIds es -> TaggedFilterIds (e : es)
-- | Operations that act on a whole set of filters at once, one filter per
-- event type in @es@.
class PollFilters (es :: [*]) where
  -- | Turn every filter of the 'MultiFilter' into a tagged filter id.
  openMultiFilter
    :: JsonRpc m
    => MultiFilter es
    -> m (TaggedFilterIds es)

  -- | Collect the changes reported for every filter id, as one list of
  -- co-record fields over all event types.
  checkMultiFilter
    :: JsonRpc m
    => TaggedFilterIds es
    -> m [Field (WithChange es)]

  -- | Release every filter id in the list.
  closeMultiFilter
    :: JsonRpc m
    => TaggedFilterIds es
    -> m ()
-- | Base case: with no event types there is nothing to open, check or close,
-- so every method ignores its argument and returns the empty result.
instance PollFilters '[] where
  openMultiFilter  = const (pure TaggedFilterNil)
  checkMultiFilter = const (pure [])
  closeMultiFilter = const (pure ())
-- | Inductive case: handle the filter for the head event type @e@ and then
-- recurse into the filters for the remaining event types @es@.
instance forall e i ni es.
  ( DecodeEvent i ni e
  , PollFilters es
  , RecApplicative (WithChange es)
  , FoldRec (FilterChange e : WithChange es) (WithChange es)
  ) => PollFilters (e:es) where

  -- Create the head filter first ('Eth.newFilter'), then the rest, and pair
  -- the resulting id with the event type it belongs to.
  openMultiFilter (headFilter :? restFilters) =
    TaggedFilterCons . Tagged
      <$> Eth.newFilter headFilter
      <*> openMultiFilter restFilters

  -- Fetch and decode the changes of the head filter, fetch the changes of the
  -- remaining filters, and return them as one list: head changes first, each
  -- injected into the larger co-record.
  checkMultiFilter (TaggedFilterCons (Tagged headId) restIds) = do
    rawChanges  <- Eth.getFilterChanges headId
    headChanges <- liftIO (mkFilterChanges @_ @_ @e rawChanges)
    restChanges <- checkMultiFilter @es restIds
    pure (fmap (CoRec . Identity) headChanges ++ fmap weakenCoRec restChanges)

  -- Uninstall the head filter (its Bool result is discarded), then the rest.
  closeMultiFilter (TaggedFilterCons (Tagged headId) restIds) =
    Eth.uninstallFilter headId *> closeMultiFilter restIds
-- | Repeatedly poll a set of already opened filters and emit their changes.
--
-- On every iteration the current block number is fetched with
-- 'Eth.blockNumber'.  While it has not passed the given end block, the
-- machine waits one second, collects the changes of all filters, sorts them
-- with 'sortChanges' and yields the batch.  Once the current block is past
-- the end block, the filters are closed and the machine stops.
pollMultiFilter
  :: ( PollFilters es
     , RecApplicative (WithChange es)
#if MIN_VERSION_vinyl(0,10,0)
     , RPureConstrained HasLogIndex (WithChange es)
#else
     , AllAllSat '[HasLogIndex] (WithChange es)
#endif
     , JsonRpc m
     )
  => TaggedFilterIds es
  -> DefaultBlock
  -> MachineT m k [Field (WithChange es)]
pollMultiFilter filterIds = construct . pollLoop filterIds
  where
    pollLoop (fIds :: TaggedFilterIds es) endBlock = do
      currentBlock <- lift Eth.blockNumber
      if BlockWithNumber currentBlock > endBlock
        then do
          -- Past the end block: release the filters before halting.
          lift (closeMultiFilter fIds)
          stop
        else do
          -- threadDelay takes microseconds, so this is a one second pause.
          liftIO (threadDelay 1000000)
          batch <- lift (sortChanges <$> checkMultiFilter fIds)
          yield batch
          pollLoop fIds endBlock
-- | Run the handlers over the events matched by a multi-filter.
--
-- This is 'multiEventManyNoFilter' with a window size of @0@.
multiEventNoFilter
  :: ( QueryAllLogs es
     , MapHandlers m es (WithChange es)
#if MIN_VERSION_vinyl(0,10,0)
     , RPureConstrained HasLogIndex (WithChange es)
#else
     , AllAllSat '[HasLogIndex] (WithChange es)
#endif
     , RecApplicative (WithChange es)
     , JsonRpc m
     )
  => MultiFilter es
  -> Handlers es (ReaderT Change m EventAction)
  -> m ()
multiEventNoFilter fltrs handlers =
  multiEventManyNoFilter fltrs 0 handlers
-- | Run the handlers over the events matched by a multi-filter, in two phases.
--
-- 1. Starting at the block derived from 'minStartBlock', logs are replayed
--    with 'playMultiLogs' using the supplied window size and reduced with
--    'reduceMultiEventStream'.
--
-- 2. Depending on the outcome of the replay, new logs are followed with
--    'playNewMultiLogs' (window size @0@):
--
--    * if the replay returned 'Nothing', following begins again at the start
--      block;
--    * if it returned an action and a last block, following begins at the
--      block after that one, but only when the action is not
--      'TerminateEvent' and the last block is still before the block
--      derived from 'minEndBlock'.
multiEventManyNoFilter
  :: ( QueryAllLogs es
     , MapHandlers m es (WithChange es)
#if MIN_VERSION_vinyl(0,10,0)
     , RPureConstrained HasLogIndex (WithChange es)
#else
     , AllAllSat '[HasLogIndex] (WithChange es)
#endif
     , RecApplicative (WithChange es)
     , JsonRpc m
     )
  => MultiFilter es
  -> Integer
  -> Handlers es (ReaderT Change m EventAction)
  -> m ()
multiEventManyNoFilter fltrs window handlers = do
  start <- mkBlockNumber (minStartBlock fltrs)
  replayOutcome <-
    reduceMultiEventStream (playMultiLogs (streamStateAt start window)) handlers
  case replayOutcome of
    Nothing ->
      followNewLogsFrom start
    Just (act, lastBlock) -> do
      end <- mkBlockNumber (minEndBlock fltrs)
      when (act /= TerminateEvent && lastBlock < end) $
        followNewLogsFrom (lastBlock + 1)
  where
    -- Stream state over the caller's filters, positioned at the given block.
    streamStateAt block size =
      MultiFilterStreamState
        { mfssCurrentBlock       = block
        , mfssInitialMultiFilter = fltrs
        , mfssWindowSize         = size
        }

    -- Follow newly produced logs from the given block; the final reducer
    -- result is discarded.
    followNewLogsFrom block =
      void $
        reduceMultiEventStream (playNewMultiLogs (streamStateAt block 0)) handlers
-- | Produce a stream of multi-filters that track the chain as it advances.
--
-- The end block is computed once, from the initial multi-filter of the seed
-- state.  On each step, if the state's current block is already past that end
-- block the stream stops.  Otherwise 'pollTillBlockProgress' is called with
-- the current block, a copy of the initial multi-filter restricted to the
-- range from the current block to the returned block is yielded, and the
-- state moves on to the block after the returned one.
--
-- NOTE(review): 'filterToBlock' is set to the block returned by
-- 'pollTillBlockProgress' without being clamped to the end block, so the
-- last emitted range may extend past it -- confirm whether that is intended.
newMultiFilterStream
  :: JsonRpc m
  => MultiFilterStreamState es
  -> MachineT m k (MultiFilter es)
newMultiFilterStream seed = unfoldPlan seed (filterPlan endBlock)
  where
    endBlock = minEndBlock (mfssInitialMultiFilter seed)

    filterPlan
      :: JsonRpc m
      => DefaultBlock
      -> MultiFilterStreamState es
      -> PlanT k (MultiFilter es) m (MultiFilterStreamState es)
    filterPlan end st
      | BlockWithNumber fromBlock > end = stop
      | otherwise = do
          newestBlock <- lift (pollTillBlockProgress fromBlock)
          let retarget :: forall e. Filter e -> Filter e
              retarget f =
                f { filterFromBlock = BlockWithNumber fromBlock
                  , filterToBlock   = BlockWithNumber newestBlock
                  }
          yield (modifyMultiFilter retarget (mfssInitialMultiFilter st))
          filterPlan end (st { mfssCurrentBlock = newestBlock + 1 })
      where
        fromBlock = mfssCurrentBlock st
-- | Stream batches of decoded log changes for newly produced blocks.
--
-- Every 'MultiFilter' produced by 'newMultiFilterStream' (seeded with the
-- given stream state) is fed to 'queryAllLogs'; each resulting batch is then
-- passed through 'sortChanges' before it is emitted downstream.
playNewMultiLogs
  :: forall es k m.
     ( QueryAllLogs es
#if MIN_VERSION_vinyl(0,10,0)
     , RPureConstrained HasLogIndex (WithChange es)
#else
     , AllAllSat '[HasLogIndex] (WithChange es)
#endif
     , RecApplicative (WithChange es)
     , JsonRpc m
     )
  => MultiFilterStreamState es
  -> MachineT m k [Field (WithChange es)]
playNewMultiLogs streamState =
  sortChanges <$> (newMultiFilterStream streamState ~> autoM queryAllLogs)