{-# language FlexibleContexts #-}
{-# language OverloadedStrings #-}
{-# language ScopedTypeVariables #-}
module Mu.GraphQL.Subscription.Protocol where
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad (forM_)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Data.Aeson ((.:), (.:?), (.=))
import qualified Data.Aeson as A
import Data.Conduit
import qualified Data.HashMap.Strict as HM
import qualified Data.Text as T
import Language.GraphQL.AST
import qualified ListT as L
import Network.WebSockets
import qualified StmContainers.Map as M
import qualified Mu.GraphQL.Quasi.LostParser as P
import Mu.GraphQL.Query.Parse
protocol :: ( Maybe T.Text -> VariableMapC -> [Definition]
-> ConduitT A.Value Void IO ()
-> IO () )
-> Connection -> IO ()
protocol :: (Maybe Text
-> VariableMapC
-> [Definition]
-> ConduitT Value Void IO ()
-> IO ())
-> Connection -> IO ()
protocol Maybe Text
-> VariableMapC
-> [Definition]
-> ConduitT Value Void IO ()
-> IO ()
f Connection
conn = IO ()
start
where
start :: IO ()
start = do
Maybe ClientMessage
msg <- Connection -> IO (Maybe ClientMessage)
forall a. FromJSON a => Connection -> IO (Maybe a)
receiveJSON Connection
conn
case Maybe ClientMessage
msg of
Just (GQLConnectionInit Maybe Value
_)
-> do
Connection -> ServerMessage -> IO ()
forall a. ToJSON a => Connection -> a -> IO ()
sendJSON Connection
conn ServerMessage
GQLConnectionAck
Map Text (Async ())
vars <- IO (Map Text (Async ()))
forall key value. IO (Map key value)
M.newIO
IO Any -> (Async Any -> IO ()) -> IO ()
forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync IO Any
forall b. IO b
keepAlive ((Async Any -> IO ()) -> IO ()) -> (Async Any -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Async Any
ka ->
Async Any -> Map Text (Async ()) -> IO ()
forall a. Async a -> Map Text (Async ()) -> IO ()
listen Async Any
ka Map Text (Async ())
vars
Maybe ClientMessage
_ -> IO ()
start
keepAlive :: IO b
keepAlive = do
Connection -> ServerMessage -> IO ()
forall a. ToJSON a => Connection -> a -> IO ()
sendJSON Connection
conn ServerMessage
GQLKeepAlive
Int -> IO ()
threadDelay Int
1000000
IO b
keepAlive
listen :: Async a -> Map Text (Async ()) -> IO ()
listen Async a
ka Map Text (Async ())
vars = do
Maybe ClientMessage
msg <- Connection -> IO (Maybe ClientMessage)
forall a. FromJSON a => Connection -> IO (Maybe a)
receiveJSON Connection
conn
case Maybe ClientMessage
msg of
Just (GQLStart Text
i Text
q VariableMapC
v Maybe Text
o)
-> IO () -> (Async () -> IO ()) -> IO ()
forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync (Text -> Text -> VariableMapC -> Maybe Text -> IO ()
handle Text
i Text
q VariableMapC
v Maybe Text
o IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> STM () -> IO ()
forall a. STM a -> IO a
atomically (Text -> Map Text (Async ()) -> STM ()
forall key value.
(Eq key, Hashable key) =>
key -> Map key value -> STM ()
M.delete Text
i Map Text (Async ())
vars)) ((Async () -> IO ()) -> IO ()) -> (Async () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Async ()
t -> do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ Async () -> Text -> Map Text (Async ()) -> STM ()
forall key value.
(Eq key, Hashable key) =>
value -> key -> Map key value -> STM ()
M.insert Async ()
t Text
i Map Text (Async ())
vars
Async a -> Map Text (Async ()) -> IO ()
listen Async a
ka Map Text (Async ())
vars
Just (GQLStop Text
i)
-> do Maybe (Async ())
r <- STM (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. STM a -> IO a
atomically (STM (Maybe (Async ())) -> IO (Maybe (Async ())))
-> STM (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a b. (a -> b) -> a -> b
$ Text -> Map Text (Async ()) -> STM (Maybe (Async ()))
forall key value.
(Eq key, Hashable key) =>
key -> Map key value -> STM (Maybe value)
M.lookup Text
i Map Text (Async ())
vars
case Maybe (Async ())
r of
Maybe (Async ())
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just Async ()
a -> do Async () -> IO ()
forall a. Async a -> IO ()
cancel Async ()
a
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ Text -> Map Text (Async ()) -> STM ()
forall key value.
(Eq key, Hashable key) =>
key -> Map key value -> STM ()
M.delete Text
i Map Text (Async ())
vars
Async a -> Map Text (Async ()) -> IO ()
listen Async a
ka Map Text (Async ())
vars
Just ClientMessage
GQLTerminate
-> do Async a -> Map Text (Async ()) -> IO ()
forall a a a. Async a -> Map a (Async a) -> IO ()
cancelAll Async a
ka Map Text (Async ())
vars
Connection -> Text -> IO ()
forall a. WebSocketsData a => Connection -> a -> IO ()
sendClose Connection
conn (Text
"GraphQL session terminated" :: T.Text)
Maybe ClientMessage
_ -> Async a -> Map Text (Async ()) -> IO ()
listen Async a
ka Map Text (Async ())
vars
handle :: Text -> Text -> VariableMapC -> Maybe Text -> IO ()
handle Text
i Text
q VariableMapC
v Maybe Text
o
= case Text -> Either Text [Definition]
P.parseDoc Text
q of
Left Text
err -> Connection -> ServerMessage -> IO ()
forall a. ToJSON a => Connection -> a -> IO ()
sendJSON Connection
conn (Text -> Value -> ServerMessage
GQLError Text
i (Text -> Value
forall a. ToJSON a => a -> Value
A.toJSON Text
err))
Right [Definition]
d -> do
Maybe Text
-> VariableMapC
-> [Definition]
-> ConduitT Value Void IO ()
-> IO ()
f Maybe Text
o VariableMapC
v [Definition]
d (Text -> ConduitT Value Void IO ()
forall (m :: * -> *) o. MonadIO m => Text -> ConduitT Value o m ()
cndt Text
i)
Connection -> ServerMessage -> IO ()
forall a. ToJSON a => Connection -> a -> IO ()
sendJSON Connection
conn (Text -> ServerMessage
GQLComplete Text
i)
cndt :: Text -> ConduitT Value o m ()
cndt Text
i = do
Maybe Value
msg <- ConduitT Value o m (Maybe Value)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
await
case Maybe Value
msg of
Maybe Value
Nothing -> () -> ConduitT Value o m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just Value
v -> do IO () -> ConduitT Value o m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT Value o m ()) -> IO () -> ConduitT Value o m ()
forall a b. (a -> b) -> a -> b
$ Connection -> ServerMessage -> IO ()
forall a. ToJSON a => Connection -> a -> IO ()
sendJSON Connection
conn (Text -> Value -> ServerMessage
GQLData Text
i Value
v)
Text -> ConduitT Value o m ()
cndt Text
i
cancelAll :: Async a -> Map a (Async a) -> IO ()
cancelAll Async a
ka Map a (Async a)
vars
= do Async a -> IO ()
forall a. Async a -> IO ()
cancel Async a
ka
[(a, Async a)]
vs <- STM [(a, Async a)] -> IO [(a, Async a)]
forall a. STM a -> IO a
atomically (STM [(a, Async a)] -> IO [(a, Async a)])
-> STM [(a, Async a)] -> IO [(a, Async a)]
forall a b. (a -> b) -> a -> b
$ ListT STM (a, Async a) -> STM [(a, Async a)]
forall (m :: * -> *) a. Monad m => ListT m a -> m [a]
L.toList (ListT STM (a, Async a) -> STM [(a, Async a)])
-> ListT STM (a, Async a) -> STM [(a, Async a)]
forall a b. (a -> b) -> a -> b
$ Map a (Async a) -> ListT STM (a, Async a)
forall key value. Map key value -> ListT STM (key, value)
M.listT Map a (Async a)
vars
[Async a] -> (Async a -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (((a, Async a) -> Async a) -> [(a, Async a)] -> [Async a]
forall a b. (a -> b) -> [a] -> [b]
map (a, Async a) -> Async a
forall a b. (a, b) -> b
snd [(a, Async a)]
vs) Async a -> IO ()
forall a. Async a -> IO ()
cancel
receiveJSON :: A.FromJSON a => Connection -> IO (Maybe a)
receiveJSON :: Connection -> IO (Maybe a)
receiveJSON Connection
conn = do
ByteString
d <- Connection -> IO ByteString
forall a. WebSocketsData a => Connection -> IO a
receiveData Connection
conn
Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe a -> IO (Maybe a)) -> Maybe a -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$ ByteString -> Maybe a
forall a. FromJSON a => ByteString -> Maybe a
A.decode ByteString
d
sendJSON :: A.ToJSON a => Connection -> a -> IO ()
sendJSON :: Connection -> a -> IO ()
sendJSON Connection
conn a
v
= Connection -> ByteString -> IO ()
forall a. WebSocketsData a => Connection -> a -> IO ()
sendTextData Connection
conn (a -> ByteString
forall a. ToJSON a => a -> ByteString
A.encode a
v)
data ClientMessage
= GQLConnectionInit { ClientMessage -> Maybe Value
initPayload :: Maybe A.Value }
| GQLStart { ClientMessage -> Text
clientMsgId :: T.Text
, ClientMessage -> Text
query :: T.Text
, ClientMessage -> VariableMapC
variables :: VariableMapC
, ClientMessage -> Maybe Text
operationName :: Maybe T.Text}
| GQLStop { clientMsgId :: T.Text }
| GQLTerminate
deriving Int -> ClientMessage -> ShowS
[ClientMessage] -> ShowS
ClientMessage -> String
(Int -> ClientMessage -> ShowS)
-> (ClientMessage -> String)
-> ([ClientMessage] -> ShowS)
-> Show ClientMessage
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ClientMessage] -> ShowS
$cshowList :: [ClientMessage] -> ShowS
show :: ClientMessage -> String
$cshow :: ClientMessage -> String
showsPrec :: Int -> ClientMessage -> ShowS
$cshowsPrec :: Int -> ClientMessage -> ShowS
Show
data ServerMessage
= GQLConnectionError { ServerMessage -> Maybe Value
errorPayload :: Maybe A.Value }
| GQLConnectionAck
| GQLData { ServerMessage -> Text
serverMsgId :: T.Text
, ServerMessage -> Value
payload :: A.Value }
| GQLError { serverMsgId :: T.Text
, payload :: A.Value }
| GQLComplete { serverMsgId :: T.Text}
| GQLKeepAlive
deriving Int -> ServerMessage -> ShowS
[ServerMessage] -> ShowS
ServerMessage -> String
(Int -> ServerMessage -> ShowS)
-> (ServerMessage -> String)
-> ([ServerMessage] -> ShowS)
-> Show ServerMessage
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ServerMessage] -> ShowS
$cshowList :: [ServerMessage] -> ShowS
show :: ServerMessage -> String
$cshow :: ServerMessage -> String
showsPrec :: Int -> ServerMessage -> ShowS
$cshowsPrec :: Int -> ServerMessage -> ShowS
Show
instance A.FromJSON ClientMessage where
parseJSON :: Value -> Parser ClientMessage
parseJSON = String
-> (Object -> Parser ClientMessage)
-> Value
-> Parser ClientMessage
forall a. String -> (Object -> Parser a) -> Value -> Parser a
A.withObject String
"ClientMessage" ((Object -> Parser ClientMessage) -> Value -> Parser ClientMessage)
-> (Object -> Parser ClientMessage)
-> Value
-> Parser ClientMessage
forall a b. (a -> b) -> a -> b
$ \Object
v -> do
String
ty :: String <- Object
v Object -> Text -> Parser String
forall a. FromJSON a => Object -> Text -> Parser a
.: Text
"type"
case String
ty of
String
"connection_init"
-> Maybe Value -> ClientMessage
GQLConnectionInit (Maybe Value -> ClientMessage)
-> Parser (Maybe Value) -> Parser ClientMessage
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
v Object -> Text -> Parser (Maybe Value)
forall a. FromJSON a => Object -> Text -> Parser (Maybe a)
.:? Text
"payload"
String
"start"
-> do Text
i <- Object
v Object -> Text -> Parser Text
forall a. FromJSON a => Object -> Text -> Parser a
.: Text
"id"
(Text
q,VariableMapC
vrs,Maybe Text
opN) <- Object
v Object -> Text -> Parser Value
forall a. FromJSON a => Object -> Text -> Parser a
.: Text
"payload" Parser Value
-> (Value -> Parser (Text, VariableMapC, Maybe Text))
-> Parser (Text, VariableMapC, Maybe Text)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Value -> Parser (Text, VariableMapC, Maybe Text)
parsePayload
ClientMessage -> Parser ClientMessage
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ClientMessage -> Parser ClientMessage)
-> ClientMessage -> Parser ClientMessage
forall a b. (a -> b) -> a -> b
$ Text -> Text -> VariableMapC -> Maybe Text -> ClientMessage
GQLStart Text
i Text
q VariableMapC
vrs Maybe Text
opN
String
"stop"
-> Text -> ClientMessage
GQLStop (Text -> ClientMessage) -> Parser Text -> Parser ClientMessage
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
v Object -> Text -> Parser Text
forall a. FromJSON a => Object -> Text -> Parser a
.: Text
"id"
String
"terminate"
-> ClientMessage -> Parser ClientMessage
forall (f :: * -> *) a. Applicative f => a -> f a
pure ClientMessage
GQLTerminate
String
_ -> Parser ClientMessage
forall (f :: * -> *) a. Alternative f => f a
empty
where
parsePayload :: Value -> Parser (Text, VariableMapC, Maybe Text)
parsePayload = String
-> (Object -> Parser (Text, VariableMapC, Maybe Text))
-> Value
-> Parser (Text, VariableMapC, Maybe Text)
forall a. String -> (Object -> Parser a) -> Value -> Parser a
A.withObject String
"ClientMessage/GQL_START" ((Object -> Parser (Text, VariableMapC, Maybe Text))
-> Value -> Parser (Text, VariableMapC, Maybe Text))
-> (Object -> Parser (Text, VariableMapC, Maybe Text))
-> Value
-> Parser (Text, VariableMapC, Maybe Text)
forall a b. (a -> b) -> a -> b
$
\Object
v -> (,,) (Text
-> VariableMapC -> Maybe Text -> (Text, VariableMapC, Maybe Text))
-> Parser Text
-> Parser
(VariableMapC -> Maybe Text -> (Text, VariableMapC, Maybe Text))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
v Object -> Text -> Parser Text
forall a. FromJSON a => Object -> Text -> Parser a
.: Text
"query"
Parser
(VariableMapC -> Maybe Text -> (Text, VariableMapC, Maybe Text))
-> Parser VariableMapC
-> Parser (Maybe Text -> (Text, VariableMapC, Maybe Text))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> (Object
v Object -> Text -> Parser VariableMapC
forall a. FromJSON a => Object -> Text -> Parser a
.: Text
"variables" Parser VariableMapC -> Parser VariableMapC -> Parser VariableMapC
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> VariableMapC -> Parser VariableMapC
forall (f :: * -> *) a. Applicative f => a -> f a
pure VariableMapC
forall k v. HashMap k v
HM.empty)
Parser (Maybe Text -> (Text, VariableMapC, Maybe Text))
-> Parser (Maybe Text) -> Parser (Text, VariableMapC, Maybe Text)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
v Object -> Text -> Parser (Maybe Text)
forall a. FromJSON a => Object -> Text -> Parser (Maybe a)
.:? Text
"operationName"
theType :: (A.KeyValue kv) => T.Text -> kv
theType :: Text -> kv
theType Text
t = Text
"type" Text -> Text -> kv
forall kv v. (KeyValue kv, ToJSON v) => Text -> v -> kv
.= Text
t
instance A.ToJSON ServerMessage where
toJSON :: ServerMessage -> Value
toJSON (GQLConnectionError Maybe Value
e)
= [Pair] -> Value
A.object [Text -> Pair
forall kv. KeyValue kv => Text -> kv
theType Text
"connection_error", Text
"payload" Text -> Maybe Value -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Text -> v -> kv
.= Maybe Value
e]
toJSON ServerMessage
GQLConnectionAck
= [Pair] -> Value
A.object [Text -> Pair
forall kv. KeyValue kv => Text -> kv
theType Text
"connection_ack"]
toJSON (GQLData Text
i Value
p)
= [Pair] -> Value
A.object [Text -> Pair
forall kv. KeyValue kv => Text -> kv
theType Text
"data", Text
"id" Text -> Text -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Text -> v -> kv
.= Text
i, Text
"payload" Text -> Value -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Text -> v -> kv
.= Value
p]
toJSON (GQLError Text
i Value
p)
= [Pair] -> Value
A.object [Text -> Pair
forall kv. KeyValue kv => Text -> kv
theType Text
"error", Text
"id" Text -> Text -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Text -> v -> kv
.= Text
i, Text
"payload" Text -> Value -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Text -> v -> kv
.= Value
p]
toJSON (GQLComplete Text
i)
= [Pair] -> Value
A.object [Text -> Pair
forall kv. KeyValue kv => Text -> kv
theType Text
"complete", Text
"id" Text -> Text -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Text -> v -> kv
.= Text
i]
toJSON ServerMessage
GQLKeepAlive
= [Pair] -> Value
A.object [Text -> Pair
forall kv. KeyValue kv => Text -> kv
theType Text
"ka"]