{-# language AllowAmbiguousTypes #-}
{-# language DataKinds #-}
{-# language DeriveFunctor #-}
{-# language FlexibleContexts #-}
{-# language FlexibleInstances #-}
{-# language GADTs #-}
{-# language LambdaCase #-}
{-# language MultiParamTypeClasses #-}
{-# language OverloadedStrings #-}
{-# language PolyKinds #-}
{-# language ScopedTypeVariables #-}
{-# language TypeApplications #-}
{-# language TypeFamilies #-}
{-# language TypeOperators #-}
{-# language UndecidableInstances #-}
{-# OPTIONS_GHC -fprint-explicit-kinds #-}
module Mu.GRpc.Client.Internal where
import Control.Concurrent.Async
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMChan
import Control.Concurrent.STM.TMVar
import Control.Exception (throwIO)
import Control.Monad.IO.Class
import Data.Avro
import qualified Data.ByteString.Char8 as BS
import Data.Conduit
import qualified Data.Conduit.Combinators as C
import Data.Conduit.TMChan
import Data.Kind
import Data.Text as T
import GHC.TypeLits
import Monitor.Tracing
import Monitor.Tracing.Zipkin
import Network.GRPC.Client (CompressMode (..), IncomingEvent (..),
OutgoingEvent (..), RawReply, StreamDone (..))
import Network.GRPC.Client.Helpers
import Network.GRPC.HTTP2.Encoding (GRPCInput, GRPCOutput)
import Network.HTTP2 (ErrorCode)
import Network.HTTP2.Client (ClientError, ClientIO, TooMuchConcurrency,
runExceptT, ExceptT)
import Mu.Adapter.ProtoBuf.Via
import Mu.GRpc.Avro
import Mu.GRpc.Bridge
import Mu.Rpc
import Mu.Schema
-- | Run 'setupGrpcClient' for the given configuration inside any 'MonadIO',
--   returning the 'ClientError' it may fail with as a 'Left' value instead
--   of leaving it in the underlying 'ExceptT' layer.
setupGrpcClient' :: MonadIO m
                 => GrpcClientConfig -> m (Either ClientError GrpcClient)
setupGrpcClient' cfg = liftIO (runExceptT (setupGrpcClient cfg))
-- | Variant of 'setupGrpcClient'' which runs inside a 'clientSpan' with the
--   given span name. If the span hands over a 'B3' value, a @b3@ header
--   built with 'b3ToHeaderValue' is prepended to the headers already present
--   in the configuration; otherwise the configuration is used unchanged.
setupGrpcClientZipkin
  :: (MonadIO m, MonadTrace m)
  => GrpcClientConfig -> T.Text -> m (Either ClientError GrpcClient)
setupGrpcClientZipkin cfg spanName
  = clientSpan spanName (setupGrpcClient' . maybe cfg addB3Header)
  where
    -- Configuration extended with the tracing header for this span.
    addB3Header b3
      = cfg { _grpcClientConfigHeaders
                = ("b3", b3ToHeaderValue b3) : _grpcClientConfigHeaders cfg }
-- | Builds a client-side handler of type @h@ for the method @m@ of service
--   @s@ in package @pkg@, using message protocol @p@ over a 'GrpcClient'.
--   All the type-level information is passed via 'Proxy' arguments.
class GRpcServiceMethodCall (p :: GRpcMessageProtocol)
                            (pkg :: snm) (s :: snm)
                            (m :: Method snm mnm anm (TypeRef snm)) h where
  gRpcServiceMethodCall
    :: Proxy p -> Proxy pkg -> Proxy s -> Proxy m
    -> GrpcClient -> h
-- | Resolves the package, service and method names to 'BS.ByteString's,
--   builds the protocol-specific RPC descriptor with 'mkRPC', and hands it
--   over to 'gRpcMethodCall' to obtain the actual handler.
instance ( KnownName serviceName, KnownName pkg, KnownName mname
         , GRpcMethodCall p ('Method mname margs mret) h, MkRPC p )
         => GRpcServiceMethodCall p pkg serviceName ('Method mname margs mret) h where
  gRpcServiceMethodCall protocol _ _
    = gRpcMethodCall @p $
        mkRPC protocol
              (BS.pack (nameVal (Proxy @pkg)))
              (BS.pack (nameVal (Proxy @serviceName)))
              (BS.pack (nameVal (Proxy @mname)))
-- | Outcome of a gRPC call: either one of several kinds of failure,
--   or a successful result of type @a@.
data GRpcReply a
  = GRpcTooMuchConcurrency TooMuchConcurrency
    -- ^ The call returned a 'TooMuchConcurrency' value instead of a reply.
  | GRpcErrorCode ErrorCode
    -- ^ The raw reply consisted of an HTTP/2 'ErrorCode'.
  | GRpcErrorString String
    -- ^ The raw reply carried an error message in place of its payload.
  | GRpcClientError ClientError
    -- ^ The client computation itself ended with a 'ClientError'.
  | GRpcOk a
    -- ^ The call succeeded with the given value.
  deriving (Show, Functor)
-- | Flatten the nested result of a unary call into a 'GRpcReply'.
--   The two header lists which accompany the payload are discarded.
buildGRpcReply1 :: Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 = either GRpcTooMuchConcurrency fromRaw
  where
    -- Outer layer of the raw reply: HTTP/2 error code or actual response.
    fromRaw raw = either GRpcErrorCode fromResponse raw
    -- Inner layer: only the third component (error string or value) matters.
    fromResponse (_, _, payload) = either GRpcErrorString GRpcOk payload
-- | Same as 'buildGRpcReply1', for results in which the raw reply comes
--   paired with an additional value of type @r@. That extra value and the
--   header lists are discarded.
buildGRpcReply2 :: Either TooMuchConcurrency (r, RawReply a) -> GRpcReply a
buildGRpcReply2 = either GRpcTooMuchConcurrency (fromRaw . snd)
  where
    -- Outer layer of the raw reply: HTTP/2 error code or actual response.
    fromRaw raw = either GRpcErrorCode fromResponse raw
    -- Inner layer: only the third component (error string or value) matters.
    fromResponse (_, _, payload) = either GRpcErrorString GRpcOk payload
-- | Turn a result whose successful payload is irrelevant into a
--   'GRpcReply': a 'Left' becomes 'GRpcTooMuchConcurrency', and any
--   'Right' becomes @'GRpcOk' ()@.
buildGRpcReply3 :: Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 = either GRpcTooMuchConcurrency (const (GRpcOk ()))
-- | Run a 'ClientIO' computation in plain 'IO', folding a 'ClientError'
--   raised in the 'ExceptT' layer into the reply as 'GRpcClientError'.
simplifyResponse :: ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse reply = either GRpcClientError id <$> runExceptT reply
-- | Describes how a value of type @r@, described by the type reference
--   @ref@, is wrapped into a type which has a 'GRPCInput' instance for
--   the protocol @p@.
class GRPCInput (RPCTy p) (GRpcIWTy p ref r)
      => GRpcInputWrapper (p :: GRpcMessageProtocol)
                          (ref :: TypeRef snm)
                          (r :: Type) where
  -- | Wrapper type sent over the wire.
  type GRpcIWTy p ref r :: Type
  -- | Wrap a value so that it can be used as input of a call.
  buildGRpcIWTy :: Proxy p -> Proxy ref -> r -> GRpcIWTy p ref r
-- | Protocol Buffers inputs are wrapped in 'ViaToProtoBufTypeRef'.
instance ToProtoBufTypeRef ref r
         => GRpcInputWrapper 'MsgProtoBuf ref r where
  type GRpcIWTy 'MsgProtoBuf ref r = ViaToProtoBufTypeRef ref r
  buildGRpcIWTy _ _ value = ViaToProtoBufTypeRef value
-- | Avro inputs referring to a schema type are wrapped in
--   'ViaToAvroTypeRef'.
instance forall (sch :: Schema') (sty :: Symbol) (r :: Type).
         ( ToSchema sch sty r
         , ToAvro (WithSchema sch sty r)
         , HasAvroSchema (WithSchema sch sty r) )
         => GRpcInputWrapper 'MsgAvro ('SchemaRef sch sty) r where
  type GRpcIWTy 'MsgAvro ('SchemaRef sch sty) r
    = ViaToAvroTypeRef ('SchemaRef sch sty) r
  buildGRpcIWTy _ _ value = ViaToAvroTypeRef value
-- | Describes how a value of type @r@, described by the type reference
--   @ref@, is recovered from a type which has a 'GRPCOutput' instance for
--   the protocol @p@.
class GRPCOutput (RPCTy p) (GRpcOWTy p ref r)
      => GRpcOutputWrapper (p :: GRpcMessageProtocol)
                           (ref :: TypeRef snm)
                           (r :: Type) where
  -- | Wrapper type received over the wire.
  type GRpcOWTy p ref r :: Type
  -- | Unwrap a value obtained as output of a call.
  unGRpcOWTy :: Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r
-- | Protocol Buffers outputs arrive wrapped in 'ViaFromProtoBufTypeRef'.
instance FromProtoBufTypeRef ref r
         => GRpcOutputWrapper 'MsgProtoBuf ref r where
  type GRpcOWTy 'MsgProtoBuf ref r = ViaFromProtoBufTypeRef ref r
  unGRpcOWTy _ _ wrapped = unViaFromProtoBufTypeRef wrapped
-- | Avro outputs referring to a schema type arrive wrapped in
--   'ViaFromAvroTypeRef'.
instance forall (sch :: Schema') (sty :: Symbol) (r :: Type).
         ( FromSchema sch sty r
         , FromAvro (WithSchema sch sty r)
         , HasAvroSchema (WithSchema sch sty r) )
         => GRpcOutputWrapper 'MsgAvro ('SchemaRef sch sty) r where
  type GRpcOWTy 'MsgAvro ('SchemaRef sch sty) r
    = ViaFromAvroTypeRef ('SchemaRef sch sty) r
  unGRpcOWTy _ _ wrapped = unViaFromAvroTypeRef wrapped
-- | Builds the client-side handler @h@ for a single @method@, given the
--   protocol-specific RPC descriptor and the 'GrpcClient' to run it on.
--   The shape of @h@ is fixed by each instance according to the arguments
--   and return kind of the method.
class GRpcMethodCall (p :: GRpcMessageProtocol) (method :: Method') h where
  gRpcMethodCall
    :: RPCTy p -> Proxy method -> GrpcClient -> h
-- | Method without arguments and without result: a unary call sending
--   @()@ and expecting @()@ back.
instance ( KnownName name
         , GRPCInput (RPCTy p) (), GRPCOutput (RPCTy p) ()
         , handler ~ IO (GRpcReply ()) )
         => GRpcMethodCall p ('Method name '[ ] 'RetNothing) handler where
  gRpcMethodCall rpc _ client
    = simplifyResponse (fmap buildGRpcReply1 (rawUnary rpc client ()))
-- | Method without arguments which returns a single value: a unary call
--   sending @()@, whose successful result is unwrapped with 'unGRpcOWTy'.
instance ( KnownName name
         , GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r
         , handler ~ IO (GRpcReply r) )
         => GRpcMethodCall p ('Method name '[ ] ('RetSingle rref)) handler where
  gRpcMethodCall rpc _ client
    = do reply <- simplifyResponse
                    (buildGRpcReply1 <$>
                       rawUnary @_ @() @(GRpcOWTy p rref r) rpc client ())
         -- Strip the protocol-specific wrapper from a successful result.
         pure (unGRpcOWTy (Proxy @p) (Proxy @rref) <$> reply)
-- | Method without arguments which returns a stream of values.
--
--   The call itself runs in a separate thread started with 'async'; every
--   message received from the server is unwrapped with 'unGRpcOWTy' and
--   pushed as a 'GRpcOk' element into a 'TMChan', which the returned
--   conduit drains.
--
--   Once the call finishes, a failure (if any) is pushed as a final element
--   and the channel is /always/ closed. Hence the conduit terminates:
--
--   * immediately, if the server finished successfully without sending any
--     message (previously the consumer would block waiting for a first
--     result which never arrived);
--   * after yielding the failure, if the call failed either before or
--     after some messages had been received (previously a failure after
--     the first message left the channel open forever and was never
--     reported).
instance ( KnownName name
         , GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r
         , handler ~ IO (ConduitT () (GRpcReply r) IO ()) )
         => GRpcMethodCall p ('Method name '[ ] ('RetStream rref)) handler where
  gRpcMethodCall rpc _ client
    = do -- Channel carrying both the streamed values and a final failure
         chan <- newTMChanIO :: IO (TMChan (GRpcReply r))
         -- Start executing the client in another thread
         _ <- async $ do
           outcome <- simplifyResponse $
                      buildGRpcReply3 <$>
                      rawStreamServer @_ @() @(GRpcOWTy p rref r)
                                      rpc client () ()
                                      (\_ _ newVal -> liftIO $ atomically $
                                         writeTMChan chan $
                                           GRpcOk (unGRpcOWTy (Proxy @p) (Proxy @rref) newVal))
           atomically $ do
             -- Report how the call ended; a clean end needs no extra element.
             -- The match is spelled out because the failure has to be
             -- rebuilt at the element type of the stream.
             case outcome of
               GRpcOk ()                  -> pure ()
               GRpcTooMuchConcurrency tmc -> writeTMChan chan (GRpcTooMuchConcurrency tmc)
               GRpcErrorCode ec           -> writeTMChan chan (GRpcErrorCode ec)
               GRpcErrorString es         -> writeTMChan chan (GRpcErrorString es)
               GRpcClientError ce         -> writeTMChan chan (GRpcClientError ce)
             -- Closing unconditionally is what lets the consumer finish.
             closeTMChan chan
         -- The conduit ends as soon as the channel is closed and drained
         pure (sourceTMChan chan)
-- | Method with a single argument and without result: the argument is
--   wrapped with 'buildGRpcIWTy' and sent in a unary call expecting @()@.
instance ( KnownName name
         , GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ()
         , handler ~ (v -> IO (GRpcReply ())) )
         => GRpcMethodCall p ('Method name '[ 'ArgSingle aname vref ]
                                      'RetNothing) handler where
  gRpcMethodCall rpc _ client x
    = simplifyResponse
        (fmap buildGRpcReply1
              (rawUnary @_ @(GRpcIWTy p vref v) @()
                        rpc client
                        (buildGRpcIWTy (Proxy @p) (Proxy @vref) x)))
-- | Method with a single argument which returns a single value: the
--   argument is wrapped with 'buildGRpcIWTy', sent in a unary call, and a
--   successful result is unwrapped with 'unGRpcOWTy'.
instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (v -> IO (GRpcReply r)) )
         => GRpcMethodCall p ('Method name '[ 'ArgSingle aname vref ]
                                      ('RetSingle rref)) handler where
  gRpcMethodCall rpc _ client x
    = do reply <- simplifyResponse
                    (buildGRpcReply1 <$>
                       rawUnary @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                                rpc client
                                (buildGRpcIWTy (Proxy @p) (Proxy @vref) x))
         -- Strip the protocol-specific wrapper from a successful result.
         pure (unGRpcOWTy (Proxy @p) (Proxy @rref) <$> reply)
-- | Client call for a method that takes a single argument and returns a
--   stream of results.
--
--   The handler sends @x@ to the server from a background thread and
--   immediately returns a conduit source. Running that source first waits for
--   the status of the call:
--
--   * if the call is healthy (a first message arrived, or the call finished
--     successfully), every received message is yielded wrapped in 'GRpcOk';
--   * if the call failed before any message arrived, the failure is yielded
--     as the single element of the stream.
--
--   A failure reported after the first message has been received simply ends
--   the stream; that error is not yielded to the consumer.
instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (v -> IO (ConduitT () (GRpcReply r) IO ())) )
         => GRpcMethodCall p ('Method name '[ 'ArgSingle aname vref ]
                                      ('RetStream rref)) handler where
  gRpcMethodCall rpc _ client x
    = do -- Decoded server messages are handed to the consumer through this channel.
         chan <- newTMChanIO :: IO (TMChan r)
         -- Status of the call as seen by the consumer: 'GRpcOk' once the first
         -- message arrives or the call finishes cleanly, the error otherwise.
         var <- newEmptyTMVarIO
         -- Run the actual gRPC call in another thread.
         _ <- async $ do
            v <- simplifyResponse $
                 buildGRpcReply3 <$>
                 rawStreamServer @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                                 rpc client () (buildGRpcIWTy (Proxy @p) (Proxy @vref) x)
                                 (\_ _ newVal -> liftIO $ atomically $ do
                                   -- On the first message, report that everything is OK;
                                   -- later messages find the variable already handled.
                                   _ <- tryPutTMVar var (GRpcOk ())
                                   writeTMChan chan (unGRpcOWTy (Proxy @p) (Proxy @rref) newVal))
            -- Whatever the outcome, publish it and close the channel in one
            -- transaction. 'tryPutTMVar' never blocks, so the worker cannot get
            -- stuck when the status was already stored, and an empty successful
            -- stream still unblocks the consumer. Closing the channel on every
            -- path guarantees that 'sourceTMChan' below terminates.
            liftIO $ atomically $ do
              _ <- tryPutTMVar var v
              closeTMChan chan
         -- This conduit exposes the results produced by the other thread.
         let go :: ConduitT () (GRpcReply r) IO ()
             go = do firstResult <- liftIO $ atomically $ takeTMVar var
                     case firstResult of
                       GRpcOk _ -> -- no error, stream everything that was received
                         sourceTMChan chan .| C.map GRpcOk
                       -- An error reply carries no payload, so the mapped
                       -- function is not expected to be applied.
                       e -> yield $ (\_ -> error "this should never happen") <$> e
         pure go
-- | Client call for a method that consumes a stream of arguments and returns
--   nothing.
--
--   The handler yields a conduit sink. Every value fed to the sink is queued
--   for a background thread that forwards it to the server using the given
--   'CompressMode'; when the sink's input ends, 'conduitFromChannel' closes
--   the queue and waits for the final reply of the call.
instance ( KnownName name
         , GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ()
         , handler ~ (CompressMode -> IO (ConduitT v Void IO (GRpcReply ()))) )
         => GRpcMethodCall p ('Method name '[ 'ArgStream aname vref ]
                                      'RetNothing) handler where
  gRpcMethodCall rpc _ client compress = do
    -- Values written by the sink, waiting to be sent.
    outbox <- newTMChanIO :: IO (TMChan v)
    let -- Wrap a user-level value into its wire representation.
        wrap :: v -> GRpcIWTy p vref v
        wrap = buildGRpcIWTy (Proxy @p) (Proxy @vref)
        -- Produce the next outgoing event: the next queued value, or
        -- 'StreamDone' once the queue has been closed and drained.
        nextOutgoing :: () -> ClientIO ((), Either StreamDone (CompressMode, GRpcIWTy p vref v))
        nextOutgoing _ = do
          queued <- liftIO (atomically (readTMChan outbox))
          pure ((), maybe (Left StreamDone) (\msg -> Right (compress, wrap msg)) queued)
        call = rawStreamClient @_ @(GRpcIWTy p vref v) @() rpc client () nextOutgoing
    -- The call itself runs in another thread; its result is the final reply.
    reply <- async (simplifyResponse (buildGRpcReply2 <$> call))
    pure (conduitFromChannel outbox reply)
-- | Client call for a method that consumes a stream of arguments and returns
--   a single result.
--
--   The handler yields a conduit sink. Every value fed to the sink is queued
--   for a background thread that forwards it to the server using the given
--   'CompressMode'; when the sink's input ends, 'conduitFromChannel' closes
--   the queue and waits for the reply, which is unwrapped from its wire
--   representation before being returned.
instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (CompressMode -> IO (ConduitT v Void IO (GRpcReply r))) )
         => GRpcMethodCall p ('Method name '[ 'ArgStream aname vref ]
                                      ('RetSingle rref)) handler where
  gRpcMethodCall rpc _ client compress = do
    -- Values written by the sink, waiting to be sent.
    outbox <- newTMChanIO :: IO (TMChan v)
    let -- Wrap a user-level argument into its wire representation.
        wrap :: v -> GRpcIWTy p vref v
        wrap = buildGRpcIWTy (Proxy @p) (Proxy @vref)
        -- Recover the user-level result from its wire representation.
        unwrap :: GRpcOWTy p rref r -> r
        unwrap = unGRpcOWTy (Proxy @p) (Proxy @rref)
        -- Produce the next outgoing event: the next queued value, or
        -- 'StreamDone' once the queue has been closed and drained.
        nextOutgoing :: () -> ClientIO ((), Either StreamDone (CompressMode, GRpcIWTy p vref v))
        nextOutgoing _ = do
          queued <- liftIO (atomically (readTMChan outbox))
          pure ((), maybe (Left StreamDone) (\msg -> Right (compress, wrap msg)) queued)
        call = rawStreamClient @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                               rpc client () nextOutgoing
    -- The call itself runs in another thread; its result is the final reply.
    reply <- async $ do
      rawReply <- simplifyResponse (buildGRpcReply2 <$> call)
      pure (unwrap <$> rawReply)
    pure (conduitFromChannel outbox reply)
-- | Build a sink that copies every incoming value into the given channel.
--
--   When upstream is exhausted the channel is closed and the sink waits for
--   the given 'Async', returning its result.
conduitFromChannel :: MonadIO m => TMChan a -> Async b -> ConduitT a o m b
conduitFromChannel chan promise = await >>= maybe finish forward
  where
    -- Queue one value and keep consuming.
    forward item = do
      liftIO (atomically (writeTMChan chan item))
      conduitFromChannel chan promise
    -- No more input: close the channel, then wait for the background result.
    finish = do
      liftIO (atomically (closeTMChan chan))
      liftIO (wait promise)
-- | Client call for a bidirectional streaming method.
--
--   The handler returns a pair of conduits:
--
--   * a sink: values fed to it are queued and sent to the server with the
--     given 'CompressMode'; when its input ends the queue is closed, which
--     makes the producer emit 'Finalize';
--   * a source: yields every message received from the server and, once the
--     incoming stream is over, returns the final reply of the call.
--
--   The gRPC call itself runs in a background thread.
instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (CompressMode -> IO (ConduitT v Void IO (), ConduitT () r IO (GRpcReply ()))))
         => GRpcMethodCall p ('Method name '[ 'ArgStream aname vref ]
                                      ('RetStream rref)) handler where
  gRpcMethodCall rpc _ client compress
    = do -- Messages received from the server, waiting for the source conduit.
         serverChan <- newTMChanIO :: IO (TMChan r)
         -- Messages written by the sink conduit, waiting to be sent.
         clientChan <- newTMChanIO :: IO (TMChan v)
         -- Filled exactly once, when the call is over.
         finalReply <- newEmptyTMVarIO :: IO (TMVar (GRpcReply ()))
         -- Start executing the call in another thread.
         _ <- async $ do
           v <- simplifyResponse $
                buildGRpcReply3 <$>
                rawGeneralStream
                  @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                  rpc client
                  () (incomingEventConsumer serverChan)
                  () (outgoingEventProducer clientChan)
           -- Close the incoming channel together with publishing the reply.
           -- The event consumer only closes it on 'Trailers' or 'Invalid'; if
           -- the call fails without delivering either, the source conduit
           -- would otherwise wait on the channel forever and never return the
           -- reply. Closing again after 'Trailers' is expected to be harmless
           -- (see the stm-chans documentation).
           liftIO $ atomically $ do
             closeTMChan serverChan
             putTMVar finalReply v
         let -- Sink: forward everything to the outgoing queue, then close it.
             clientConduit :: ConduitT v Void IO ()
             clientConduit = do
               sinkTMChan clientChan
               liftIO . atomically . closeTMChan $ clientChan
             -- Source: drain the incoming queue, then report how the call ended.
             serverConduit :: ConduitT () r IO (GRpcReply ())
             serverConduit = do
               sourceTMChan serverChan
               liftIO . atomically . readTMVar $ finalReply
         pure (clientConduit, serverConduit)
    where
      -- React to one event coming from the server.
      incomingEventConsumer :: TMChan r -> () -> IncomingEvent (GRpcOWTy p rref r) () -> ExceptT ClientError IO ()
      incomingEventConsumer serverChan _ ievent =
        case ievent of
          -- A message: unwrap it and queue it for the source conduit.
          RecvMessage o -> do
            liftIO $ atomically $ writeTMChan serverChan (unGRpcOWTy (Proxy @p) (Proxy @rref) o)
          -- A broken stream: end the source and rethrow the exception.
          Invalid e -> liftIO $ do
            atomically $ closeTMChan serverChan
            throwIO e
          -- Trailers mark the end of the incoming stream.
          Trailers _ ->
            liftIO $ atomically $ closeTMChan serverChan
          -- Headers carry nothing this client uses.
          Headers _ ->
            pure ()

      -- Produce the next event to send to the server: the next queued
      -- message, or 'Finalize' once the queue has been closed and drained.
      outgoingEventProducer :: TMChan v -> () -> ExceptT ClientError IO ((), OutgoingEvent (GRpcIWTy p vref v) ())
      outgoingEventProducer clientChan _ = do
        nextVal <- liftIO $ atomically $ readTMChan clientChan
        case nextVal of
          Nothing -> pure ((), Finalize)
          Just v  -> pure ((), SendMessage compress (buildGRpcIWTy (Proxy @p) (Proxy @vref) v))