{-# language AllowAmbiguousTypes   #-}
{-# language DataKinds             #-}
{-# language DeriveFunctor         #-}
{-# language FlexibleContexts      #-}
{-# language FlexibleInstances     #-}
{-# language GADTs                 #-}
{-# language LambdaCase            #-}
{-# language MultiParamTypeClasses #-}
{-# language OverloadedStrings     #-}
{-# language PolyKinds             #-}
{-# language ScopedTypeVariables   #-}
{-# language TypeApplications      #-}
{-# language TypeFamilies          #-}
{-# language TypeOperators         #-}
{-# language UndecidableInstances  #-}
{-# OPTIONS_GHC -fprint-explicit-kinds #-}
-- | Client for gRPC services defined using Mu 'Service'
module Mu.GRpc.Client.Internal where

import           Control.Concurrent.Async
import           Control.Concurrent.STM        (atomically)
import           Control.Concurrent.STM.TMChan
import           Control.Concurrent.STM.TMVar
import           Control.Exception (throwIO)
import           Control.Monad.IO.Class
import           Data.Avro
import qualified Data.ByteString.Char8         as BS
import           Data.Conduit
import qualified Data.Conduit.Combinators      as C
import           Data.Conduit.TMChan
import           Data.Kind
import           Data.Text                     as T
import           GHC.TypeLits
import           Monitor.Tracing
import           Monitor.Tracing.Zipkin
import           Network.GRPC.Client           (CompressMode (..), IncomingEvent (..),
                                                OutgoingEvent (..), RawReply, StreamDone (..))
import           Network.GRPC.Client.Helpers
import           Network.GRPC.HTTP2.Encoding   (GRPCInput, GRPCOutput)
import           Network.HTTP2                 (ErrorCode)
import           Network.HTTP2.Client          (ClientError, ClientIO, TooMuchConcurrency,
                                                runExceptT, ExceptT)

import           Mu.Adapter.ProtoBuf.Via
import           Mu.GRpc.Avro
import           Mu.GRpc.Bridge
import           Mu.Rpc
import           Mu.Schema

-- | Initialize a connection to a gRPC server.
setupGrpcClient' :: MonadIO m
                 => GrpcClientConfig -> m (Either ClientError GrpcClient)
setupGrpcClient' :: GrpcClientConfig -> m (Either ClientError GrpcClient)
setupGrpcClient' = IO (Either ClientError GrpcClient)
-> m (Either ClientError GrpcClient)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either ClientError GrpcClient)
 -> m (Either ClientError GrpcClient))
-> (GrpcClientConfig -> IO (Either ClientError GrpcClient))
-> GrpcClientConfig
-> m (Either ClientError GrpcClient)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT ClientError IO GrpcClient
-> IO (Either ClientError GrpcClient)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT ClientError IO GrpcClient
 -> IO (Either ClientError GrpcClient))
-> (GrpcClientConfig -> ExceptT ClientError IO GrpcClient)
-> GrpcClientConfig
-> IO (Either ClientError GrpcClient)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. GrpcClientConfig -> ExceptT ClientError IO GrpcClient
setupGrpcClient

-- | Initialize a connection to a gRPC server
--   and pass information about distributed tracing.
setupGrpcClientZipkin
  :: (MonadIO m, MonadTrace m)
  => GrpcClientConfig -> T.Text -> m (Either ClientError GrpcClient)
setupGrpcClientZipkin :: GrpcClientConfig -> Text -> m (Either ClientError GrpcClient)
setupGrpcClientZipkin GrpcClientConfig
cfg Text
spanName
  = Text
-> (Maybe B3 -> m (Either ClientError GrpcClient))
-> m (Either ClientError GrpcClient)
forall (m :: * -> *) a.
MonadTrace m =>
Text -> (Maybe B3 -> m a) -> m a
clientSpan Text
spanName ((Maybe B3 -> m (Either ClientError GrpcClient))
 -> m (Either ClientError GrpcClient))
-> (Maybe B3 -> m (Either ClientError GrpcClient))
-> m (Either ClientError GrpcClient)
forall a b. (a -> b) -> a -> b
$ \case
      Maybe B3
Nothing   -> GrpcClientConfig -> m (Either ClientError GrpcClient)
forall (m :: * -> *).
MonadIO m =>
GrpcClientConfig -> m (Either ClientError GrpcClient)
setupGrpcClient' GrpcClientConfig
cfg
      (Just B3
b3) -> GrpcClientConfig -> m (Either ClientError GrpcClient)
forall (m :: * -> *).
MonadIO m =>
GrpcClientConfig -> m (Either ClientError GrpcClient)
setupGrpcClient' GrpcClientConfig
cfg {
                      _grpcClientConfigHeaders :: [(ByteString, ByteString)]
_grpcClientConfigHeaders = (ByteString
"b3", B3 -> ByteString
b3ToHeaderValue B3
b3)
                                                 (ByteString, ByteString)
-> [(ByteString, ByteString)] -> [(ByteString, ByteString)]
forall a. a -> [a] -> [a]
: GrpcClientConfig -> [(ByteString, ByteString)]
_grpcClientConfigHeaders GrpcClientConfig
cfg
                   }

class GRpcServiceMethodCall (p :: GRpcMessageProtocol)
                            (pkg :: snm) (s :: snm)
                            (m :: Method snm mnm anm (TypeRef snm)) h where
  gRpcServiceMethodCall :: Proxy p -> Proxy pkg -> Proxy s -> Proxy m -> GrpcClient -> h
instance ( KnownName serviceName, KnownName pkg, KnownName mname
         , GRpcMethodCall p ('Method mname margs mret) h, MkRPC p )
         => GRpcServiceMethodCall p pkg serviceName ('Method mname margs mret) h where
  gRpcServiceMethodCall :: Proxy @GRpcMessageProtocol p
-> Proxy @Symbol pkg
-> Proxy @Symbol serviceName
-> Proxy
     @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
     ('Method
        @* @Symbol @Symbol @Symbol @(TypeRef Symbol) mname margs mret)
-> GrpcClient
-> h
gRpcServiceMethodCall Proxy @GRpcMessageProtocol p
pro Proxy @Symbol pkg
_ Proxy @Symbol serviceName
_ = RPCTy p
-> Proxy
     @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
     ('Method
        @* @Symbol @Symbol @Symbol @(TypeRef Symbol) mname margs mret)
-> GrpcClient
-> h
forall (p :: GRpcMessageProtocol)
       (method :: Method @* Symbol Symbol Symbol (TypeRef Symbol)) h.
GRpcMethodCall p method h =>
RPCTy p
-> Proxy @(Method @* Symbol Symbol Symbol (TypeRef Symbol)) method
-> GrpcClient
-> h
gRpcMethodCall @p RPCTy p
rpc
    where pkgName :: ByteString
pkgName = String -> ByteString
BS.pack (Proxy @Symbol pkg -> String
forall k (a :: k) (proxy :: k -> *).
KnownName @k a =>
proxy a -> String
nameVal (Proxy @Symbol pkg
forall k (t :: k). Proxy @k t
Proxy @pkg))
          svrName :: ByteString
svrName = String -> ByteString
BS.pack (Proxy @Symbol serviceName -> String
forall k (a :: k) (proxy :: k -> *).
KnownName @k a =>
proxy a -> String
nameVal (Proxy @Symbol serviceName
forall k (t :: k). Proxy @k t
Proxy @serviceName))
          metName :: ByteString
metName = String -> ByteString
BS.pack (Proxy @Symbol mname -> String
forall k (a :: k) (proxy :: k -> *).
KnownName @k a =>
proxy a -> String
nameVal (Proxy @Symbol mname
forall k (t :: k). Proxy @k t
Proxy @mname))
          rpc :: RPCTy p
rpc = Proxy @GRpcMessageProtocol p
-> ByteString -> ByteString -> ByteString -> RPCTy p
forall (p :: GRpcMessageProtocol).
MkRPC p =>
Proxy @GRpcMessageProtocol p
-> ByteString -> ByteString -> ByteString -> RPCTy p
mkRPC Proxy @GRpcMessageProtocol p
pro ByteString
pkgName ByteString
svrName ByteString
metName

data GRpcReply a
  = GRpcTooMuchConcurrency TooMuchConcurrency
  | GRpcErrorCode ErrorCode
  | GRpcErrorString String
  | GRpcClientError ClientError
  | GRpcOk a
  deriving (Int -> GRpcReply a -> ShowS
[GRpcReply a] -> ShowS
GRpcReply a -> String
(Int -> GRpcReply a -> ShowS)
-> (GRpcReply a -> String)
-> ([GRpcReply a] -> ShowS)
-> Show (GRpcReply a)
forall a. Show a => Int -> GRpcReply a -> ShowS
forall a. Show a => [GRpcReply a] -> ShowS
forall a. Show a => GRpcReply a -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [GRpcReply a] -> ShowS
$cshowList :: forall a. Show a => [GRpcReply a] -> ShowS
show :: GRpcReply a -> String
$cshow :: forall a. Show a => GRpcReply a -> String
showsPrec :: Int -> GRpcReply a -> ShowS
$cshowsPrec :: forall a. Show a => Int -> GRpcReply a -> ShowS
Show, a -> GRpcReply b -> GRpcReply a
(a -> b) -> GRpcReply a -> GRpcReply b
(forall a b. (a -> b) -> GRpcReply a -> GRpcReply b)
-> (forall a b. a -> GRpcReply b -> GRpcReply a)
-> Functor GRpcReply
forall a b. a -> GRpcReply b -> GRpcReply a
forall a b. (a -> b) -> GRpcReply a -> GRpcReply b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> GRpcReply b -> GRpcReply a
$c<$ :: forall a b. a -> GRpcReply b -> GRpcReply a
fmap :: (a -> b) -> GRpcReply a -> GRpcReply b
$cfmap :: forall a b. (a -> b) -> GRpcReply a -> GRpcReply b
Functor)

buildGRpcReply1 :: Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 :: Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Left TooMuchConcurrency
tmc)                      = TooMuchConcurrency -> GRpcReply a
forall a. TooMuchConcurrency -> GRpcReply a
GRpcTooMuchConcurrency TooMuchConcurrency
tmc
buildGRpcReply1 (Right (Left ErrorCode
ec))               = ErrorCode -> GRpcReply a
forall a. ErrorCode -> GRpcReply a
GRpcErrorCode ErrorCode
ec
buildGRpcReply1 (Right (Right (CIHeaderList
_, Maybe CIHeaderList
_, Left String
es))) = String -> GRpcReply a
forall a. String -> GRpcReply a
GRpcErrorString String
es
buildGRpcReply1 (Right (Right (CIHeaderList
_, Maybe CIHeaderList
_, Right a
r))) = a -> GRpcReply a
forall a. a -> GRpcReply a
GRpcOk a
r

buildGRpcReply2 :: Either TooMuchConcurrency (r, RawReply a) -> GRpcReply a
buildGRpcReply2 :: Either TooMuchConcurrency (r, RawReply a) -> GRpcReply a
buildGRpcReply2 (Left TooMuchConcurrency
tmc)                         = TooMuchConcurrency -> GRpcReply a
forall a. TooMuchConcurrency -> GRpcReply a
GRpcTooMuchConcurrency TooMuchConcurrency
tmc
buildGRpcReply2 (Right (r
_, Left ErrorCode
ec))               = ErrorCode -> GRpcReply a
forall a. ErrorCode -> GRpcReply a
GRpcErrorCode ErrorCode
ec
buildGRpcReply2 (Right (r
_, Right (CIHeaderList
_, Maybe CIHeaderList
_, Left String
es))) = String -> GRpcReply a
forall a. String -> GRpcReply a
GRpcErrorString String
es
buildGRpcReply2 (Right (r
_, Right (CIHeaderList
_, Maybe CIHeaderList
_, Right a
r))) = a -> GRpcReply a
forall a. a -> GRpcReply a
GRpcOk a
r

buildGRpcReply3 :: Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 :: Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 (Left TooMuchConcurrency
tmc) = TooMuchConcurrency -> GRpcReply ()
forall a. TooMuchConcurrency -> GRpcReply a
GRpcTooMuchConcurrency TooMuchConcurrency
tmc
buildGRpcReply3 (Right v
_)  = () -> GRpcReply ()
forall a. a -> GRpcReply a
GRpcOk ()

simplifyResponse :: ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse :: ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse ClientIO (GRpcReply a)
reply = do
  Either ClientError (GRpcReply a)
r <- ClientIO (GRpcReply a) -> IO (Either ClientError (GRpcReply a))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT ClientIO (GRpcReply a)
reply
  GRpcReply a -> IO (GRpcReply a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (GRpcReply a -> IO (GRpcReply a))
-> GRpcReply a -> IO (GRpcReply a)
forall a b. (a -> b) -> a -> b
$ case Either ClientError (GRpcReply a)
r of
    Left ClientError
e  -> ClientError -> GRpcReply a
forall a. ClientError -> GRpcReply a
GRpcClientError ClientError
e
    Right GRpcReply a
v -> GRpcReply a
v

-- These type classes allow us to abstract over
-- the choice of message protocol (PB or Avro)

class GRPCInput (RPCTy p) (GRpcIWTy p ref r)
      => GRpcInputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef snm) (r :: Type) where
  type GRpcIWTy p ref r :: Type
  buildGRpcIWTy :: Proxy p -> Proxy ref -> r -> GRpcIWTy p ref r

instance ToProtoBufTypeRef ref r
         => GRpcInputWrapper 'MsgProtoBuf ref r where
  type GRpcIWTy 'MsgProtoBuf ref r = ViaToProtoBufTypeRef ref r
  buildGRpcIWTy :: Proxy @GRpcMessageProtocol 'MsgProtoBuf
-> Proxy @(TypeRef snm) ref
-> r
-> GRpcIWTy @snm 'MsgProtoBuf ref r
buildGRpcIWTy Proxy @GRpcMessageProtocol 'MsgProtoBuf
_ Proxy @(TypeRef snm) ref
_ = r -> GRpcIWTy @snm 'MsgProtoBuf ref r
forall snm (ref :: TypeRef snm) t.
t -> ViaToProtoBufTypeRef @snm ref t
ViaToProtoBufTypeRef

instance forall (sch :: Schema') (sty :: Symbol) (r :: Type).
         ( ToSchema sch sty r
         , ToAvro (WithSchema sch sty r)
         , HasAvroSchema (WithSchema sch sty r) )
         => GRpcInputWrapper 'MsgAvro ('SchemaRef sch sty) r where
  type GRpcIWTy 'MsgAvro ('SchemaRef sch sty) r = ViaToAvroTypeRef ('SchemaRef sch sty) r
  buildGRpcIWTy :: Proxy @GRpcMessageProtocol 'MsgAvro
-> Proxy @(TypeRef snm) ('SchemaRef @Symbol @Symbol @snm sch sty)
-> r
-> GRpcIWTy
     @snm 'MsgAvro ('SchemaRef @Symbol @Symbol @snm sch sty) r
buildGRpcIWTy Proxy @GRpcMessageProtocol 'MsgAvro
_ Proxy @(TypeRef snm) ('SchemaRef @Symbol @Symbol @snm sch sty)
_ = r
-> GRpcIWTy
     @snm 'MsgAvro ('SchemaRef @Symbol @Symbol @snm sch sty) r
forall snm (ref :: TypeRef snm) t. t -> ViaToAvroTypeRef @snm ref t
ViaToAvroTypeRef

class GRPCOutput (RPCTy p) (GRpcOWTy p ref r)
      => GRpcOutputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef snm) (r :: Type) where
  type GRpcOWTy p ref r :: Type
  unGRpcOWTy :: Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r

instance FromProtoBufTypeRef ref r
         => GRpcOutputWrapper 'MsgProtoBuf ref r where
  type GRpcOWTy 'MsgProtoBuf ref r = ViaFromProtoBufTypeRef ref r
  unGRpcOWTy :: Proxy @GRpcMessageProtocol 'MsgProtoBuf
-> Proxy @(TypeRef snm) ref
-> GRpcOWTy @snm 'MsgProtoBuf ref r
-> r
unGRpcOWTy Proxy @GRpcMessageProtocol 'MsgProtoBuf
_ Proxy @(TypeRef snm) ref
_ = GRpcOWTy @snm 'MsgProtoBuf ref r -> r
forall snm (ref :: TypeRef snm) t.
ViaFromProtoBufTypeRef @snm ref t -> t
unViaFromProtoBufTypeRef

instance forall (sch :: Schema') (sty :: Symbol) (r :: Type).
         ( FromSchema sch sty r
         , FromAvro (WithSchema sch sty r)
         , HasAvroSchema (WithSchema sch sty r) )
         => GRpcOutputWrapper 'MsgAvro ('SchemaRef sch sty) r where
  type GRpcOWTy 'MsgAvro ('SchemaRef sch sty) r = ViaFromAvroTypeRef ('SchemaRef sch sty) r
  unGRpcOWTy :: Proxy @GRpcMessageProtocol 'MsgAvro
-> Proxy @(TypeRef snm) ('SchemaRef @Symbol @Symbol @snm sch sty)
-> GRpcOWTy
     @snm 'MsgAvro ('SchemaRef @Symbol @Symbol @snm sch sty) r
-> r
unGRpcOWTy Proxy @GRpcMessageProtocol 'MsgAvro
_ Proxy @(TypeRef snm) ('SchemaRef @Symbol @Symbol @snm sch sty)
_ = GRpcOWTy @snm 'MsgAvro ('SchemaRef @Symbol @Symbol @snm sch sty) r
-> r
forall snm (ref :: TypeRef snm) t.
ViaFromAvroTypeRef @snm ref t -> t
unViaFromAvroTypeRef

-- -----------------------------
-- IMPLEMENTATION OF THE METHODS
-- -----------------------------

class GRpcMethodCall (p :: GRpcMessageProtocol) (method :: Method') h where
  gRpcMethodCall :: RPCTy p -> Proxy method -> GrpcClient -> h

instance ( KnownName name
         , GRPCInput (RPCTy p) (), GRPCOutput (RPCTy p) ()
         , handler ~ IO (GRpcReply ()) )
         => GRpcMethodCall p ('Method name '[ ] 'RetNothing) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
     ('Method
        @*
        @Symbol
        @Symbol
        @Symbol
        @(TypeRef Symbol)
        name
        ('[] @(Argument @* Symbol Symbol (TypeRef Symbol)))
        ('RetNothing @* @Symbol @(TypeRef Symbol)))
-> GrpcClient
-> handler
gRpcMethodCall RPCTy p
rpc Proxy
  @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
  ('Method
     @*
     @Symbol
     @Symbol
     @Symbol
     @(TypeRef Symbol)
     name
     ('[] @(Argument @* Symbol Symbol (TypeRef Symbol)))
     ('RetNothing @* @Symbol @(TypeRef Symbol)))
_ GrpcClient
client
    = ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
      Either TooMuchConcurrency (RawReply ()) -> GRpcReply ()
forall a. Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Either TooMuchConcurrency (RawReply ()) -> GRpcReply ())
-> ExceptT ClientError IO (Either TooMuchConcurrency (RawReply ()))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
      RPCTy p
-> GrpcClient
-> ()
-> ExceptT ClientError IO (Either TooMuchConcurrency (RawReply ()))
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> i
-> ClientIO (Either TooMuchConcurrency (RawReply o))
rawUnary RPCTy p
rpc GrpcClient
client ()

instance ( KnownName name
         , GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r
         , handler ~ IO (GRpcReply r) )
         => GRpcMethodCall p ('Method name '[ ] ('RetSingle rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
     ('Method
        @*
        @Symbol
        @Symbol
        @Symbol
        @(TypeRef Symbol)
        name
        ('[] @(Argument @* Symbol Symbol (TypeRef Symbol)))
        ('RetSingle @* @(TypeRef Symbol) @Symbol rref))
-> GrpcClient
-> handler
gRpcMethodCall RPCTy p
rpc Proxy
  @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
  ('Method
     @*
     @Symbol
     @Symbol
     @Symbol
     @(TypeRef Symbol)
     name
     ('[] @(Argument @* Symbol Symbol (TypeRef Symbol)))
     ('RetSingle @* @(TypeRef Symbol) @Symbol rref))
_ GrpcClient
client
    = (GRpcReply (GRpcOWTy @Symbol p rref r) -> GRpcReply r)
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> IO (GRpcReply r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((GRpcOWTy @Symbol p rref r -> r)
-> GRpcReply (GRpcOWTy @Symbol p rref r) -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) rref -> GRpcOWTy @Symbol p rref r -> r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> GRpcOWTy @snm p ref r -> r
unGRpcOWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) rref
forall k (t :: k). Proxy @k t
Proxy @rref))) (IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> IO (GRpcReply r))
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> IO (GRpcReply r)
forall a b. (a -> b) -> a -> b
$
      ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
 -> IO (GRpcReply (GRpcOWTy @Symbol p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall a b. (a -> b) -> a -> b
$
      Either TooMuchConcurrency (RawReply (GRpcOWTy @Symbol p rref r))
-> GRpcReply (GRpcOWTy @Symbol p rref r)
forall a. Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Either TooMuchConcurrency (RawReply (GRpcOWTy @Symbol p rref r))
 -> GRpcReply (GRpcOWTy @Symbol p rref r))
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency (RawReply (GRpcOWTy @Symbol p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
      RPCTy p
-> GrpcClient
-> ()
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency (RawReply (GRpcOWTy @Symbol p rref r)))
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> i
-> ClientIO (Either TooMuchConcurrency (RawReply o))
rawUnary @_ @() @(GRpcOWTy p rref r) RPCTy p
rpc GrpcClient
client ()

instance ( KnownName name
         , GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r
         , handler ~ IO (ConduitT () (GRpcReply r) IO ()) )
         => GRpcMethodCall p ('Method name '[ ] ('RetStream rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
     ('Method
        @*
        @Symbol
        @Symbol
        @Symbol
        @(TypeRef Symbol)
        name
        ('[] @(Argument @* Symbol Symbol (TypeRef Symbol)))
        ('RetStream @* @(TypeRef Symbol) @Symbol rref))
-> GrpcClient
-> handler
gRpcMethodCall RPCTy p
rpc Proxy
  @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
  ('Method
     @*
     @Symbol
     @Symbol
     @Symbol
     @(TypeRef Symbol)
     name
     ('[] @(Argument @* Symbol Symbol (TypeRef Symbol)))
     ('RetStream @* @(TypeRef Symbol) @Symbol rref))
_ GrpcClient
client
    = do -- Create a new TMChan
         TMChan r
chan <- IO (TMChan r)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan r)
         TMVar (GRpcReply ())
var  <- IO (TMVar (GRpcReply ()))
forall a. IO (TMVar a)
newEmptyTMVarIO  -- if full, this means an error
         -- Start executing the client in another thread
         Async ()
_ <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ do
            GRpcReply ()
v <- ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
                 Either
  TooMuchConcurrency
  ((), [(ByteString, ByteString)], [(ByteString, ByteString)])
-> GRpcReply ()
forall v. Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 (Either
   TooMuchConcurrency
   ((), [(ByteString, ByteString)], [(ByteString, ByteString)])
 -> GRpcReply ())
-> ExceptT
     ClientError
     IO
     (Either
        TooMuchConcurrency
        ((), [(ByteString, ByteString)], [(ByteString, ByteString)]))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
                 RPCTy p
-> GrpcClient
-> ()
-> ()
-> (()
    -> [(ByteString, ByteString)]
    -> GRpcOWTy @Symbol p rref r
    -> ClientIO ())
-> ExceptT
     ClientError
     IO
     (Either
        TooMuchConcurrency
        ((), [(ByteString, ByteString)], [(ByteString, ByteString)]))
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> i
-> (a -> [(ByteString, ByteString)] -> o -> ClientIO a)
-> ClientIO
     (Either
        TooMuchConcurrency
        (a, [(ByteString, ByteString)], [(ByteString, ByteString)]))
rawStreamServer @_ @() @(GRpcOWTy p rref r)
                                 RPCTy p
rpc GrpcClient
client () ()
                                 (\()
_ [(ByteString, ByteString)]
_ GRpcOWTy @Symbol p rref r
newVal -> IO () -> ClientIO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ClientIO ()) -> IO () -> ClientIO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                                   -- on the first iteration, say that everything is OK
                                   Bool
_ <- TMVar (GRpcReply ()) -> GRpcReply () -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (GRpcReply ())
var (() -> GRpcReply ()
forall a. a -> GRpcReply a
GRpcOk ())
                                   TMChan r -> r -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan r
chan (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) rref -> GRpcOWTy @Symbol p rref r -> r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> GRpcOWTy @snm p ref r -> r
unGRpcOWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) rref
forall k (t :: k). Proxy @k t
Proxy @rref) GRpcOWTy @Symbol p rref r
newVal))
            case GRpcReply ()
v of
              GRpcOk () -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan r -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan r
chan
              GRpcReply ()
_         -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> GRpcReply () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (GRpcReply ())
var GRpcReply ()
v
         -- This conduit feeds information to the other thread
         let go :: ConduitT () (GRpcReply r) IO ()
go = do GRpcReply ()
firstResult <- IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ()))
-> IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ STM (GRpcReply ()) -> IO (GRpcReply ())
forall a. STM a -> IO a
atomically (STM (GRpcReply ()) -> IO (GRpcReply ()))
-> STM (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> STM (GRpcReply ())
forall a. TMVar a -> STM a
takeTMVar TMVar (GRpcReply ())
var
                     case GRpcReply ()
firstResult of
                       GRpcOk ()
_ -> -- no error, everything is fine
                         TMChan r -> ConduitT () r IO ()
forall (m :: * -> *) a. MonadIO m => TMChan a -> ConduitT () a m ()
sourceTMChan TMChan r
chan ConduitT () r IO ()
-> ConduitM r (GRpcReply r) IO ()
-> ConduitT () (GRpcReply r) IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (r -> GRpcReply r) -> ConduitM r (GRpcReply r) IO ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map r -> GRpcReply r
forall a. a -> GRpcReply a
GRpcOk
                       GRpcReply ()
e -> GRpcReply r -> ConduitT () (GRpcReply r) IO ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (GRpcReply r -> ConduitT () (GRpcReply r) IO ())
-> GRpcReply r -> ConduitT () (GRpcReply r) IO ()
forall a b. (a -> b) -> a -> b
$ (\()
_ -> String -> r
forall a. HasCallStack => String -> a
error String
"this should never happen") (() -> r) -> GRpcReply () -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> GRpcReply ()
e
         ConduitT () (GRpcReply r) IO ()
-> IO (ConduitT () (GRpcReply r) IO ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ConduitT () (GRpcReply r) IO ()
go

instance ( KnownName name
         , GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ()
         , handler ~ (v -> IO (GRpcReply ())) )
         => GRpcMethodCall p ('Method name '[ 'ArgSingle aname vref ]
                                      'RetNothing) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
     ('Method
        @*
        @Symbol
        @Symbol
        @Symbol
        @(TypeRef Symbol)
        name
        ((':)
           @(Argument @* Symbol Symbol (TypeRef Symbol))
           ('ArgSingle @* @Symbol @(TypeRef Symbol) @Symbol aname vref)
           ('[] @(Argument @* Symbol Symbol (TypeRef Symbol))))
        ('RetNothing @* @Symbol @(TypeRef Symbol)))
-> GrpcClient
-> handler
gRpcMethodCall RPCTy p
rpc Proxy
  @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
  ('Method
     @*
     @Symbol
     @Symbol
     @Symbol
     @(TypeRef Symbol)
     name
     ((':)
        @(Argument @* Symbol Symbol (TypeRef Symbol))
        ('ArgSingle @* @Symbol @(TypeRef Symbol) @Symbol aname vref)
        ('[] @(Argument @* Symbol Symbol (TypeRef Symbol))))
     ('RetNothing @* @Symbol @(TypeRef Symbol)))
_ GrpcClient
client v
x
    = ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
      Either TooMuchConcurrency (RawReply ()) -> GRpcReply ()
forall a. Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Either TooMuchConcurrency (RawReply ()) -> GRpcReply ())
-> ExceptT ClientError IO (Either TooMuchConcurrency (RawReply ()))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
      RPCTy p
-> GrpcClient
-> GRpcIWTy @Symbol p vref v
-> ExceptT ClientError IO (Either TooMuchConcurrency (RawReply ()))
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> i
-> ClientIO (Either TooMuchConcurrency (RawReply o))
rawUnary @_ @(GRpcIWTy p vref v) @() RPCTy p
rpc GrpcClient
client (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) vref -> v -> GRpcIWTy @Symbol p vref v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> r -> GRpcIWTy @snm p ref r
buildGRpcIWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) vref
forall k (t :: k). Proxy @k t
Proxy @vref) v
x)

instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (v -> IO (GRpcReply r)) )
         => GRpcMethodCall p ('Method name '[ 'ArgSingle aname vref ]
                                      ('RetSingle rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
     ('Method
        @*
        @Symbol
        @Symbol
        @Symbol
        @(TypeRef Symbol)
        name
        ((':)
           @(Argument @* Symbol Symbol (TypeRef Symbol))
           ('ArgSingle @* @Symbol @(TypeRef Symbol) @Symbol aname vref)
           ('[] @(Argument @* Symbol Symbol (TypeRef Symbol))))
        ('RetSingle @* @(TypeRef Symbol) @Symbol rref))
-> GrpcClient
-> handler
gRpcMethodCall RPCTy p
rpc Proxy
  @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
  ('Method
     @*
     @Symbol
     @Symbol
     @Symbol
     @(TypeRef Symbol)
     name
     ((':)
        @(Argument @* Symbol Symbol (TypeRef Symbol))
        ('ArgSingle @* @Symbol @(TypeRef Symbol) @Symbol aname vref)
        ('[] @(Argument @* Symbol Symbol (TypeRef Symbol))))
     ('RetSingle @* @(TypeRef Symbol) @Symbol rref))
_ GrpcClient
client v
x
    = (GRpcReply (GRpcOWTy @Symbol p rref r) -> GRpcReply r)
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> IO (GRpcReply r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((GRpcOWTy @Symbol p rref r -> r)
-> GRpcReply (GRpcOWTy @Symbol p rref r) -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) rref -> GRpcOWTy @Symbol p rref r -> r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> GRpcOWTy @snm p ref r -> r
unGRpcOWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) rref
forall k (t :: k). Proxy @k t
Proxy @rref))) (IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> IO (GRpcReply r))
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> IO (GRpcReply r)
forall a b. (a -> b) -> a -> b
$
      ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
 -> IO (GRpcReply (GRpcOWTy @Symbol p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall a b. (a -> b) -> a -> b
$
      Either TooMuchConcurrency (RawReply (GRpcOWTy @Symbol p rref r))
-> GRpcReply (GRpcOWTy @Symbol p rref r)
forall a. Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Either TooMuchConcurrency (RawReply (GRpcOWTy @Symbol p rref r))
 -> GRpcReply (GRpcOWTy @Symbol p rref r))
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency (RawReply (GRpcOWTy @Symbol p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
      RPCTy p
-> GrpcClient
-> GRpcIWTy @Symbol p vref v
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency (RawReply (GRpcOWTy @Symbol p rref r)))
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> i
-> ClientIO (Either TooMuchConcurrency (RawReply o))
rawUnary @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
               RPCTy p
rpc GrpcClient
client (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) vref -> v -> GRpcIWTy @Symbol p vref v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> r -> GRpcIWTy @snm p ref r
buildGRpcIWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) vref
forall k (t :: k). Proxy @k t
Proxy @vref) v
x)

instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (v -> IO (ConduitT () (GRpcReply r) IO ())) )
         => GRpcMethodCall p ('Method name '[ 'ArgSingle aname vref ]
                                      ('RetStream rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
     ('Method
        @*
        @Symbol
        @Symbol
        @Symbol
        @(TypeRef Symbol)
        name
        ((':)
           @(Argument @* Symbol Symbol (TypeRef Symbol))
           ('ArgSingle @* @Symbol @(TypeRef Symbol) @Symbol aname vref)
           ('[] @(Argument @* Symbol Symbol (TypeRef Symbol))))
        ('RetStream @* @(TypeRef Symbol) @Symbol rref))
-> GrpcClient
-> handler
gRpcMethodCall RPCTy p
rpc Proxy
  @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
  ('Method
     @*
     @Symbol
     @Symbol
     @Symbol
     @(TypeRef Symbol)
     name
     ((':)
        @(Argument @* Symbol Symbol (TypeRef Symbol))
        ('ArgSingle @* @Symbol @(TypeRef Symbol) @Symbol aname vref)
        ('[] @(Argument @* Symbol Symbol (TypeRef Symbol))))
     ('RetStream @* @(TypeRef Symbol) @Symbol rref))
_ GrpcClient
client v
x
    = do -- Create a new TMChan
         TMChan r
chan <- IO (TMChan r)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan r)
         TMVar (GRpcReply ())
var  <- IO (TMVar (GRpcReply ()))
forall a. IO (TMVar a)
newEmptyTMVarIO  -- if full, this means an error
         -- Start executing the client in another thread
         Async ()
_ <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ do
            GRpcReply ()
v <- ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
                 Either
  TooMuchConcurrency
  ((), [(ByteString, ByteString)], [(ByteString, ByteString)])
-> GRpcReply ()
forall v. Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 (Either
   TooMuchConcurrency
   ((), [(ByteString, ByteString)], [(ByteString, ByteString)])
 -> GRpcReply ())
-> ExceptT
     ClientError
     IO
     (Either
        TooMuchConcurrency
        ((), [(ByteString, ByteString)], [(ByteString, ByteString)]))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
                 RPCTy p
-> GrpcClient
-> ()
-> GRpcIWTy @Symbol p vref v
-> (()
    -> [(ByteString, ByteString)]
    -> GRpcOWTy @Symbol p rref r
    -> ClientIO ())
-> ExceptT
     ClientError
     IO
     (Either
        TooMuchConcurrency
        ((), [(ByteString, ByteString)], [(ByteString, ByteString)]))
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> i
-> (a -> [(ByteString, ByteString)] -> o -> ClientIO a)
-> ClientIO
     (Either
        TooMuchConcurrency
        (a, [(ByteString, ByteString)], [(ByteString, ByteString)]))
rawStreamServer @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                                 RPCTy p
rpc GrpcClient
client () (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) vref -> v -> GRpcIWTy @Symbol p vref v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> r -> GRpcIWTy @snm p ref r
buildGRpcIWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) vref
forall k (t :: k). Proxy @k t
Proxy @vref) v
x)
                                 (\()
_ [(ByteString, ByteString)]
_ GRpcOWTy @Symbol p rref r
newVal -> IO () -> ClientIO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ClientIO ()) -> IO () -> ClientIO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                                   -- on the first iteration, say that everything is OK
                                   Bool
_ <- TMVar (GRpcReply ()) -> GRpcReply () -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (GRpcReply ())
var (() -> GRpcReply ()
forall a. a -> GRpcReply a
GRpcOk ())
                                   TMChan r -> r -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan r
chan (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) rref -> GRpcOWTy @Symbol p rref r -> r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> GRpcOWTy @snm p ref r -> r
unGRpcOWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) rref
forall k (t :: k). Proxy @k t
Proxy @rref) GRpcOWTy @Symbol p rref r
newVal))
            case GRpcReply ()
v of
              GRpcOk () -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan r -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan r
chan
              GRpcReply ()
_         -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> GRpcReply () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (GRpcReply ())
var GRpcReply ()
v
         -- This conduit feeds information to the other thread
         let go :: ConduitT () (GRpcReply r) IO ()
go = do GRpcReply ()
firstResult <- IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ()))
-> IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ STM (GRpcReply ()) -> IO (GRpcReply ())
forall a. STM a -> IO a
atomically (STM (GRpcReply ()) -> IO (GRpcReply ()))
-> STM (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> STM (GRpcReply ())
forall a. TMVar a -> STM a
takeTMVar TMVar (GRpcReply ())
var
                     case GRpcReply ()
firstResult of
                       GRpcOk ()
_ -> -- no error, everything is fine
                         TMChan r -> ConduitT () r IO ()
forall (m :: * -> *) a. MonadIO m => TMChan a -> ConduitT () a m ()
sourceTMChan TMChan r
chan ConduitT () r IO ()
-> ConduitM r (GRpcReply r) IO ()
-> ConduitT () (GRpcReply r) IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (r -> GRpcReply r) -> ConduitM r (GRpcReply r) IO ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map r -> GRpcReply r
forall a. a -> GRpcReply a
GRpcOk
                       GRpcReply ()
e -> GRpcReply r -> ConduitT () (GRpcReply r) IO ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (GRpcReply r -> ConduitT () (GRpcReply r) IO ())
-> GRpcReply r -> ConduitT () (GRpcReply r) IO ()
forall a b. (a -> b) -> a -> b
$ (\()
_ -> String -> r
forall a. HasCallStack => String -> a
error String
"this should never happen") (() -> r) -> GRpcReply () -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> GRpcReply ()
e
         ConduitT () (GRpcReply r) IO ()
-> IO (ConduitT () (GRpcReply r) IO ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ConduitT () (GRpcReply r) IO ()
go

instance ( KnownName name
         , GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ()
         , handler ~ (CompressMode -> IO (ConduitT v Void IO (GRpcReply ()))) )
         => GRpcMethodCall p ('Method name '[ 'ArgStream aname vref ]
                                      'RetNothing) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
     ('Method
        @*
        @Symbol
        @Symbol
        @Symbol
        @(TypeRef Symbol)
        name
        ((':)
           @(Argument @* Symbol Symbol (TypeRef Symbol))
           ('ArgStream @* @Symbol @(TypeRef Symbol) @Symbol aname vref)
           ('[] @(Argument @* Symbol Symbol (TypeRef Symbol))))
        ('RetNothing @* @Symbol @(TypeRef Symbol)))
-> GrpcClient
-> handler
gRpcMethodCall RPCTy p
rpc Proxy
  @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
  ('Method
     @*
     @Symbol
     @Symbol
     @Symbol
     @(TypeRef Symbol)
     name
     ((':)
        @(Argument @* Symbol Symbol (TypeRef Symbol))
        ('ArgStream @* @Symbol @(TypeRef Symbol) @Symbol aname vref)
        ('[] @(Argument @* Symbol Symbol (TypeRef Symbol))))
     ('RetNothing @* @Symbol @(TypeRef Symbol)))
_ GrpcClient
client CompressMode
compress
    = do -- Create a new TMChan
         TMChan v
chan <- IO (TMChan v)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan v)
         -- Start executing the client in another thread
         Async (GRpcReply ())
promise <- IO (GRpcReply ()) -> IO (Async (GRpcReply ()))
forall a. IO a -> IO (Async a)
async (IO (GRpcReply ()) -> IO (Async (GRpcReply ())))
-> IO (GRpcReply ()) -> IO (Async (GRpcReply ()))
forall a b. (a -> b) -> a -> b
$
            ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
            Either TooMuchConcurrency ((), RawReply ()) -> GRpcReply ()
forall r a.
Either TooMuchConcurrency (r, RawReply a) -> GRpcReply a
buildGRpcReply2 (Either TooMuchConcurrency ((), RawReply ()) -> GRpcReply ())
-> ExceptT
     ClientError IO (Either TooMuchConcurrency ((), RawReply ()))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
            RPCTy p
-> GrpcClient
-> ()
-> (()
    -> ClientIO
         ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v)))
-> ExceptT
     ClientError IO (Either TooMuchConcurrency ((), RawReply ()))
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> (a -> ClientIO (a, Either StreamDone (CompressMode, i)))
-> ClientIO (Either TooMuchConcurrency (a, RawReply o))
rawStreamClient @_ @(GRpcIWTy p vref v) @() RPCTy p
rpc GrpcClient
client ()
                            (\()
_ -> do Maybe v
nextVal <- IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe v) -> ExceptT ClientError IO (Maybe v))
-> IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ STM (Maybe v) -> IO (Maybe v)
forall a. STM a -> IO a
atomically (STM (Maybe v) -> IO (Maybe v)) -> STM (Maybe v) -> IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ TMChan v -> STM (Maybe v)
forall a. TMChan a -> STM (Maybe a)
readTMChan TMChan v
chan
                                      case Maybe v
nextVal of
                                        Maybe v
Nothing -> ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v))
-> ClientIO
     ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v))
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), StreamDone
-> Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v)
forall a b. a -> Either a b
Left StreamDone
StreamDone)
                                        Just v
v  -> ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v))
-> ClientIO
     ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v))
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), (CompressMode, GRpcIWTy @Symbol p vref v)
-> Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v)
forall a b. b -> Either a b
Right (CompressMode
compress, Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) vref -> v -> GRpcIWTy @Symbol p vref v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> r -> GRpcIWTy @snm p ref r
buildGRpcIWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) vref
forall k (t :: k). Proxy @k t
Proxy @vref) v
v)))
         ConduitT v Void IO (GRpcReply ())
-> IO (ConduitT v Void IO (GRpcReply ()))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TMChan v
-> Async (GRpcReply ()) -> ConduitT v Void IO (GRpcReply ())
forall (m :: * -> *) a b o.
MonadIO m =>
TMChan a -> Async b -> ConduitT a o m b
conduitFromChannel TMChan v
chan Async (GRpcReply ())
promise)

instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (CompressMode -> IO (ConduitT v Void IO (GRpcReply r))) )
         => GRpcMethodCall p ('Method name '[ 'ArgStream aname vref ]
                                      ('RetSingle rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
     ('Method
        @*
        @Symbol
        @Symbol
        @Symbol
        @(TypeRef Symbol)
        name
        ((':)
           @(Argument @* Symbol Symbol (TypeRef Symbol))
           ('ArgStream @* @Symbol @(TypeRef Symbol) @Symbol aname vref)
           ('[] @(Argument @* Symbol Symbol (TypeRef Symbol))))
        ('RetSingle @* @(TypeRef Symbol) @Symbol rref))
-> GrpcClient
-> handler
gRpcMethodCall RPCTy p
rpc Proxy
  @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
  ('Method
     @*
     @Symbol
     @Symbol
     @Symbol
     @(TypeRef Symbol)
     name
     ((':)
        @(Argument @* Symbol Symbol (TypeRef Symbol))
        ('ArgStream @* @Symbol @(TypeRef Symbol) @Symbol aname vref)
        ('[] @(Argument @* Symbol Symbol (TypeRef Symbol))))
     ('RetSingle @* @(TypeRef Symbol) @Symbol rref))
_ GrpcClient
client CompressMode
compress
    = do -- Create a new TMChan
         TMChan v
chan <- IO (TMChan v)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan v)
         -- Start executing the client in another thread
         Async (GRpcReply r)
promise <- IO (GRpcReply r) -> IO (Async (GRpcReply r))
forall a. IO a -> IO (Async a)
async (IO (GRpcReply r) -> IO (Async (GRpcReply r)))
-> IO (GRpcReply r) -> IO (Async (GRpcReply r))
forall a b. (a -> b) -> a -> b
$
            (GRpcReply (GRpcOWTy @Symbol p rref r) -> GRpcReply r)
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> IO (GRpcReply r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((GRpcOWTy @Symbol p rref r -> r)
-> GRpcReply (GRpcOWTy @Symbol p rref r) -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) rref -> GRpcOWTy @Symbol p rref r -> r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> GRpcOWTy @snm p ref r -> r
unGRpcOWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) rref
forall k (t :: k). Proxy @k t
Proxy @rref))) (IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> IO (GRpcReply r))
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r)) -> IO (GRpcReply r)
forall a b. (a -> b) -> a -> b
$
            ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
 -> IO (GRpcReply (GRpcOWTy @Symbol p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
-> IO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall a b. (a -> b) -> a -> b
$
            Either
  TooMuchConcurrency ((), RawReply (GRpcOWTy @Symbol p rref r))
-> GRpcReply (GRpcOWTy @Symbol p rref r)
forall r a.
Either TooMuchConcurrency (r, RawReply a) -> GRpcReply a
buildGRpcReply2 (Either
   TooMuchConcurrency ((), RawReply (GRpcOWTy @Symbol p rref r))
 -> GRpcReply (GRpcOWTy @Symbol p rref r))
-> ExceptT
     ClientError
     IO
     (Either
        TooMuchConcurrency ((), RawReply (GRpcOWTy @Symbol p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy @Symbol p rref r))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
            RPCTy p
-> GrpcClient
-> ()
-> (()
    -> ClientIO
         ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v)))
-> ExceptT
     ClientError
     IO
     (Either
        TooMuchConcurrency ((), RawReply (GRpcOWTy @Symbol p rref r)))
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> (a -> ClientIO (a, Either StreamDone (CompressMode, i)))
-> ClientIO (Either TooMuchConcurrency (a, RawReply o))
rawStreamClient @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r) RPCTy p
rpc GrpcClient
client ()
                            (\()
_ -> do Maybe v
nextVal <- IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe v) -> ExceptT ClientError IO (Maybe v))
-> IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ STM (Maybe v) -> IO (Maybe v)
forall a. STM a -> IO a
atomically (STM (Maybe v) -> IO (Maybe v)) -> STM (Maybe v) -> IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ TMChan v -> STM (Maybe v)
forall a. TMChan a -> STM (Maybe a)
readTMChan TMChan v
chan
                                      case Maybe v
nextVal of
                                        Maybe v
Nothing -> ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v))
-> ClientIO
     ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v))
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), StreamDone
-> Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v)
forall a b. a -> Either a b
Left StreamDone
StreamDone)
                                        Just v
v  -> ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v))
-> ClientIO
     ((), Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v))
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), (CompressMode, GRpcIWTy @Symbol p vref v)
-> Either StreamDone (CompressMode, GRpcIWTy @Symbol p vref v)
forall a b. b -> Either a b
Right (CompressMode
compress, Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) vref -> v -> GRpcIWTy @Symbol p vref v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> r -> GRpcIWTy @snm p ref r
buildGRpcIWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) vref
forall k (t :: k). Proxy @k t
Proxy @vref) v
v)))
         ConduitT v Void IO (GRpcReply r)
-> IO (ConduitT v Void IO (GRpcReply r))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TMChan v -> Async (GRpcReply r) -> ConduitT v Void IO (GRpcReply r)
forall (m :: * -> *) a b o.
MonadIO m =>
TMChan a -> Async b -> ConduitT a o m b
conduitFromChannel TMChan v
chan Async (GRpcReply r)
promise)

conduitFromChannel :: MonadIO m => TMChan a -> Async b -> ConduitT a o m b
conduitFromChannel :: TMChan a -> Async b -> ConduitT a o m b
conduitFromChannel TMChan a
chan Async b
promise = ConduitT a o m b
go
  where go :: ConduitT a o m b
go = do Maybe a
x <- ConduitT a o m (Maybe a)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
await
                case Maybe a
x of
                  Just a
v  -> do IO () -> ConduitT a o m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT a o m ()) -> IO () -> ConduitT a o m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan a -> a -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan a
chan a
v
                                ConduitT a o m b
go
                  Maybe a
Nothing -> do IO () -> ConduitT a o m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT a o m ()) -> IO () -> ConduitT a o m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan a -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan a
chan
                                IO b -> ConduitT a o m b
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO b -> ConduitT a o m b) -> IO b -> ConduitT a o m b
forall a b. (a -> b) -> a -> b
$ Async b -> IO b
forall a. Async a -> IO a
wait Async b
promise

instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (CompressMode -> IO (ConduitT v Void IO (), ConduitT () r IO (GRpcReply ()))))
         => GRpcMethodCall p ('Method name '[ 'ArgStream aname vref ]
                                      ('RetStream rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy
     @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
     ('Method
        @*
        @Symbol
        @Symbol
        @Symbol
        @(TypeRef Symbol)
        name
        ((':)
           @(Argument @* Symbol Symbol (TypeRef Symbol))
           ('ArgStream @* @Symbol @(TypeRef Symbol) @Symbol aname vref)
           ('[] @(Argument @* Symbol Symbol (TypeRef Symbol))))
        ('RetStream @* @(TypeRef Symbol) @Symbol rref))
-> GrpcClient
-> handler
gRpcMethodCall RPCTy p
rpc Proxy
  @(Method @* Symbol Symbol Symbol (TypeRef Symbol))
  ('Method
     @*
     @Symbol
     @Symbol
     @Symbol
     @(TypeRef Symbol)
     name
     ((':)
        @(Argument @* Symbol Symbol (TypeRef Symbol))
        ('ArgStream @* @Symbol @(TypeRef Symbol) @Symbol aname vref)
        ('[] @(Argument @* Symbol Symbol (TypeRef Symbol))))
     ('RetStream @* @(TypeRef Symbol) @Symbol rref))
_ GrpcClient
client CompressMode
compress
    = do TMChan r
serverChan <- IO (TMChan r)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan r)
         TMChan v
clientChan <- IO (TMChan v)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan v)
         TMVar (GRpcReply ())
finalReply <- IO (TMVar (GRpcReply ()))
forall a. IO (TMVar a)
newEmptyTMVarIO :: IO (TMVar (GRpcReply ()))
         -- Start executing the client in another thread
         -- TODO: Is there anything that makes sure that this thread doesn't keep running forever?
         Async ()
_ <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ do
            GRpcReply ()
v <- ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
                 Either TooMuchConcurrency ((), ()) -> GRpcReply ()
forall v. Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 (Either TooMuchConcurrency ((), ()) -> GRpcReply ())
-> ExceptT ClientError IO (Either TooMuchConcurrency ((), ()))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
                 RPCTy p
-> GrpcClient
-> ()
-> (()
    -> IncomingEvent (GRpcOWTy @Symbol p rref r) () -> ClientIO ())
-> ()
-> (()
    -> ClientIO ((), OutgoingEvent (GRpcIWTy @Symbol p vref v) ()))
-> ExceptT ClientError IO (Either TooMuchConcurrency ((), ()))
forall r i o a b.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> (a -> IncomingEvent o a -> ClientIO a)
-> b
-> (b -> ClientIO (b, OutgoingEvent i b))
-> ClientIO (Either TooMuchConcurrency (a, b))
rawGeneralStream
                   @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                   RPCTy p
rpc GrpcClient
client
                   () (TMChan r
-> ()
-> IncomingEvent (GRpcOWTy @Symbol p rref r) ()
-> ClientIO ()
incomingEventConsumer TMChan r
serverChan)
                   () (TMChan v
-> ()
-> ClientIO ((), OutgoingEvent (GRpcIWTy @Symbol p vref v) ())
outgoingEventProducer TMChan v
clientChan)
            IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> GRpcReply () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (GRpcReply ())
finalReply GRpcReply ()
v
         let clientConduit :: ConduitT v Void IO ()
clientConduit = do
               TMChan v -> ConduitT v Void IO ()
forall (m :: * -> *) a z.
MonadIO m =>
TMChan a -> ConduitT a z m ()
sinkTMChan TMChan v
clientChan
               IO () -> ConduitT v Void IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT v Void IO ())
-> (TMChan v -> IO ()) -> TMChan v -> ConduitT v Void IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> (TMChan v -> STM ()) -> TMChan v -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TMChan v -> STM ()
forall a. TMChan a -> STM ()
closeTMChan (TMChan v -> ConduitT v Void IO ())
-> TMChan v -> ConduitT v Void IO ()
forall a b. (a -> b) -> a -> b
$ TMChan v
clientChan
             serverConduit :: ConduitT () r IO (GRpcReply ())
serverConduit = do
               TMChan r -> ConduitT () r IO ()
forall (m :: * -> *) a. MonadIO m => TMChan a -> ConduitT () a m ()
sourceTMChan TMChan r
serverChan
               IO (GRpcReply ()) -> ConduitT () r IO (GRpcReply ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (GRpcReply ()) -> ConduitT () r IO (GRpcReply ()))
-> (TMVar (GRpcReply ()) -> IO (GRpcReply ()))
-> TMVar (GRpcReply ())
-> ConduitT () r IO (GRpcReply ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (GRpcReply ()) -> IO (GRpcReply ())
forall a. STM a -> IO a
atomically (STM (GRpcReply ()) -> IO (GRpcReply ()))
-> (TMVar (GRpcReply ()) -> STM (GRpcReply ()))
-> TMVar (GRpcReply ())
-> IO (GRpcReply ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TMVar (GRpcReply ()) -> STM (GRpcReply ())
forall a. TMVar a -> STM a
readTMVar (TMVar (GRpcReply ()) -> ConduitT () r IO (GRpcReply ()))
-> TMVar (GRpcReply ()) -> ConduitT () r IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ())
finalReply
         (ConduitT v Void IO (), ConduitT () r IO (GRpcReply ()))
-> IO (ConduitT v Void IO (), ConduitT () r IO (GRpcReply ()))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ConduitT v Void IO ()
clientConduit, ConduitT () r IO (GRpcReply ())
serverConduit)
         where
           incomingEventConsumer :: TMChan r -> () -> IncomingEvent (GRpcOWTy p rref r) () -> ExceptT ClientError IO ()
           incomingEventConsumer :: TMChan r
-> ()
-> IncomingEvent (GRpcOWTy @Symbol p rref r) ()
-> ClientIO ()
incomingEventConsumer TMChan r
serverChan ()
_ IncomingEvent (GRpcOWTy @Symbol p rref r) ()
ievent =
             case IncomingEvent (GRpcOWTy @Symbol p rref r) ()
ievent of
               RecvMessage GRpcOWTy @Symbol p rref r
o -> do
                 IO () -> ClientIO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ClientIO ()) -> IO () -> ClientIO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan r -> r -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan r
serverChan (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) rref -> GRpcOWTy @Symbol p rref r -> r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> GRpcOWTy @snm p ref r -> r
unGRpcOWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) rref
forall k (t :: k). Proxy @k t
Proxy @rref) GRpcOWTy @Symbol p rref r
o)
               Invalid SomeException
e -> IO () -> ClientIO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ClientIO ()) -> IO () -> ClientIO ()
forall a b. (a -> b) -> a -> b
$ do
                 STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan r -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan r
serverChan
                 SomeException -> IO ()
forall e a. Exception e => e -> IO a
throwIO SomeException
e
               Trailers [(ByteString, ByteString)]
_ ->
                 -- TODO: Read the trailers and use them to make the 'finalReply'
                 IO () -> ClientIO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ClientIO ()) -> IO () -> ClientIO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan r -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan r
serverChan
               Headers [(ByteString, ByteString)]
_ ->
                 -- TODO: Read the headers and use them to make the 'finalReply'
                 () -> ClientIO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

           outgoingEventProducer :: TMChan v -> () -> ExceptT ClientError IO ((), OutgoingEvent (GRpcIWTy p vref v) ())
           outgoingEventProducer :: TMChan v
-> ()
-> ClientIO ((), OutgoingEvent (GRpcIWTy @Symbol p vref v) ())
outgoingEventProducer TMChan v
clientChan ()
_ = do
             Maybe v
nextVal <- IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe v) -> ExceptT ClientError IO (Maybe v))
-> IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ STM (Maybe v) -> IO (Maybe v)
forall a. STM a -> IO a
atomically (STM (Maybe v) -> IO (Maybe v)) -> STM (Maybe v) -> IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ TMChan v -> STM (Maybe v)
forall a. TMChan a -> STM (Maybe a)
readTMChan TMChan v
clientChan
             case Maybe v
nextVal of
               Maybe v
Nothing -> ((), OutgoingEvent (GRpcIWTy @Symbol p vref v) ())
-> ClientIO ((), OutgoingEvent (GRpcIWTy @Symbol p vref v) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), OutgoingEvent (GRpcIWTy @Symbol p vref v) ()
forall i b. OutgoingEvent i b
Finalize)
               Just v
v  -> ((), OutgoingEvent (GRpcIWTy @Symbol p vref v) ())
-> ClientIO ((), OutgoingEvent (GRpcIWTy @Symbol p vref v) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), CompressMode
-> GRpcIWTy @Symbol p vref v
-> OutgoingEvent (GRpcIWTy @Symbol p vref v) ()
forall i b. CompressMode -> i -> OutgoingEvent i b
SendMessage CompressMode
compress (Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef Symbol) vref -> v -> GRpcIWTy @Symbol p vref v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper @snm p ref r =>
Proxy @GRpcMessageProtocol p
-> Proxy @(TypeRef snm) ref -> r -> GRpcIWTy @snm p ref r
buildGRpcIWTy (Proxy @GRpcMessageProtocol p
forall k (t :: k). Proxy @k t
Proxy @p) (Proxy @(TypeRef Symbol) vref
forall k (t :: k). Proxy @k t
Proxy @vref) v
v))