module Calamity.Gateway.Shard (
Shard (..),
newShard,
) where
import Calamity.Gateway.DispatchEvents (
CalamityEvent (Dispatch),
DispatchData (Ready),
)
import Calamity.Gateway.Intents (Intents)
import Calamity.Gateway.Types (
ControlMessage (..),
IdentifyData (
IdentifyData,
compress,
intents,
largeThreshold,
presence,
properties,
shard,
token
),
IdentifyProps (IdentifyProps, browser, device),
ReceivedDiscordMessage (
EvtDispatch,
HeartBeatAck,
HeartBeatReq,
Hello,
InvalidSession,
Reconnect
),
ResumeData (ResumeData, seq, sessionID, token),
SentDiscordMessage (HeartBeat, Identify, Resume, StatusUpdate),
Shard (..),
ShardC,
ShardFlowControl (..),
ShardMsg (..),
ShardState (ShardState, wsConn),
StatusUpdateData,
)
import Calamity.Internal.RunIntoIO (bindSemToIO)
import Calamity.Internal.Utils (
debug,
error,
info,
leftToMaybe,
swap,
unlessM,
untilJustFinalIO,
whenJust,
whileMFinalIO,
)
import Calamity.Metrics.Eff (
MetricEff,
modifyGauge,
registerGauge,
)
import Calamity.Types.LogEff (LogEff)
import Calamity.Types.Token (Token, rawToken)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, cancel)
import Control.Concurrent.Chan.Unagi qualified as UC
import Control.Concurrent.STM (STM, atomically, retry)
import Control.Concurrent.STM.TBMQueue (
TBMQueue,
closeTBMQueue,
newTBMQueueIO,
readTBMQueue,
tryWriteTBMQueue,
writeTBMQueue,
)
import Control.Exception (
Exception (fromException),
SomeException,
)
import Control.Exception.Safe qualified as Ex
import Control.Monad (void, when)
import Control.Monad.State.Lazy (runState)
import Data.Aeson qualified as A
import Data.ByteString.Lazy qualified as LBS
import Data.Default.Class (def)
import Data.IORef (newIORef)
import Data.Maybe (fromMaybe)
import Data.Text qualified as T
import DiPolysemy (attr, push)
import Network.Connection qualified as NC
import Network.TLS qualified as NT
import Network.TLS.Extra qualified as NT
import Network.WebSockets (
Connection,
ConnectionException (..),
receiveData,
sendCloseCode,
sendTextData,
)
import Network.WebSockets qualified as NW
import Network.WebSockets.Stream qualified as NW
import Optics
import Optics.State.Operators
import Polysemy (Sem)
import Polysemy qualified as P
import Polysemy.Async qualified as P
import Polysemy.AtomicState qualified as P
import Polysemy.Error qualified as P
import Polysemy.Resource qualified as P
import System.X509 qualified as X509
import TextShow (showt)
import Prelude hiding (error)
runWebsocket ::
P.Members '[LogEff, P.Final IO, P.Embed IO] r =>
T.Text ->
T.Text ->
(Connection -> P.Sem r a) ->
P.Sem r (Maybe a)
runWebsocket :: forall (r :: EffectRow) a.
Members '[LogEff, Final IO, Embed IO] r =>
Text -> Text -> (Connection -> Sem r a) -> Sem r (Maybe a)
runWebsocket Text
host Text
path Connection -> Sem r a
ma = do
Connection -> IO (Maybe a)
inner <- (Connection -> Sem r a) -> Sem r (Connection -> IO (Maybe a))
forall (r :: EffectRow) p a.
Member (Final IO) r =>
(p -> Sem r a) -> Sem r (p -> IO (Maybe a))
bindSemToIO Connection -> Sem r a
ma
let logExc :: p -> Sem r ()
logExc p
e = Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug (Text -> Sem r ()) -> Text -> Sem r ()
forall a b. (a -> b) -> a -> b
$ Text
"runWebsocket raised with " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> (String -> Text
T.pack (String -> Text) -> (p -> String) -> p -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. p -> String
forall a. Show a => a -> String
show (p -> Text) -> p -> Text
forall a b. (a -> b) -> a -> b
$ p
e)
SomeException -> IO (Maybe ())
logExc' <- (SomeException -> Sem r ())
-> Sem r (SomeException -> IO (Maybe ()))
forall (r :: EffectRow) p a.
Member (Final IO) r =>
(p -> Sem r a) -> Sem r (p -> IO (Maybe a))
bindSemToIO SomeException -> Sem r ()
forall {r :: EffectRow} {p}.
(Member LogEff r, Show p) =>
p -> Sem r ()
logExc
let handler :: SomeException -> IO (Maybe a)
handler SomeException
e = do
IO (Maybe ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Maybe ()) -> IO ()) -> IO (Maybe ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ SomeException -> IO (Maybe ())
logExc' SomeException
e
Maybe a -> IO (Maybe a)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe a
forall a. Maybe a
Nothing
IO (Maybe a) -> Sem r (Maybe a)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO (Maybe a) -> Sem r (Maybe a))
-> (IO (Maybe a) -> IO (Maybe a))
-> IO (Maybe a)
-> Sem r (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (SomeException -> IO (Maybe a)) -> IO (Maybe a) -> IO (Maybe a)
forall (m :: * -> *) a.
MonadCatch m =>
(SomeException -> m a) -> m a -> m a
Ex.handleAny SomeException -> IO (Maybe a)
handler (IO (Maybe a) -> Sem r (Maybe a))
-> IO (Maybe a) -> Sem r (Maybe a)
forall a b. (a -> b) -> a -> b
$ do
ConnectionContext
ctx <- IO ConnectionContext
NC.initConnectionContext
CertificateStore
certStore <- IO CertificateStore
X509.getSystemCertificateStore
let clientParams :: ClientParams
clientParams =
(String -> ByteString -> ClientParams
NT.defaultParamsClient (Text -> String
T.unpack Text
host) ByteString
"443")
{ clientSupported :: Supported
NT.clientSupported = Supported
forall a. Default a => a
def {supportedCiphers :: [Cipher]
NT.supportedCiphers = [Cipher]
NT.ciphersuite_default}
, clientShared :: Shared
NT.clientShared =
Shared
forall a. Default a => a
def
{ sharedCAStore :: CertificateStore
NT.sharedCAStore = CertificateStore
certStore
}
}
let tlsSettings :: TLSSettings
tlsSettings = ClientParams -> TLSSettings
NC.TLSSettings ClientParams
clientParams
connParams :: ConnectionParams
connParams = String
-> PortNumber
-> Maybe TLSSettings
-> Maybe ProxySettings
-> ConnectionParams
NC.ConnectionParams (Text -> String
T.unpack Text
host) PortNumber
443 (TLSSettings -> Maybe TLSSettings
forall a. a -> Maybe a
Just TLSSettings
tlsSettings) Maybe ProxySettings
forall a. Maybe a
Nothing
IO Connection
-> (Connection -> IO ())
-> (Connection -> IO (Maybe a))
-> IO (Maybe a)
forall (m :: * -> *) a b c.
MonadMask m =>
m a -> (a -> m b) -> (a -> m c) -> m c
Ex.bracket
(ConnectionContext -> ConnectionParams -> IO Connection
NC.connectTo ConnectionContext
ctx ConnectionParams
connParams)
Connection -> IO ()
NC.connectionClose
( \Connection
conn -> do
Stream
stream <-
IO (Maybe ByteString) -> (Maybe ByteString -> IO ()) -> IO Stream
NW.makeStream
(ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just (ByteString -> Maybe ByteString)
-> IO ByteString -> IO (Maybe ByteString)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO ByteString
NC.connectionGetChunk Connection
conn)
(IO () -> (ByteString -> IO ()) -> Maybe ByteString -> IO ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (Connection -> ByteString -> IO ()
NC.connectionPut Connection
conn (ByteString -> IO ())
-> (ByteString -> ByteString) -> ByteString -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
LBS.toStrict))
Stream
-> String
-> String
-> ConnectionOptions
-> Headers
-> (Connection -> IO (Maybe a))
-> IO (Maybe a)
forall a.
Stream
-> String
-> String
-> ConnectionOptions
-> Headers
-> ClientApp a
-> IO a
NW.runClientWithStream Stream
stream (Text -> String
T.unpack Text
host) (Text -> String
T.unpack Text
path) ConnectionOptions
NW.defaultConnectionOptions [] Connection -> IO (Maybe a)
inner
)
newShardState :: Shard -> ShardState
newShardState :: Shard -> ShardState
newShardState Shard
shard = Shard
-> Maybe Int
-> Maybe (Async (Maybe ()))
-> Bool
-> Maybe Text
-> Maybe Text
-> Maybe Connection
-> ShardState
ShardState Shard
shard Maybe Int
forall a. Maybe a
Nothing Maybe (Async (Maybe ()))
forall a. Maybe a
Nothing Bool
False Maybe Text
forall a. Maybe a
Nothing Maybe Text
forall a. Maybe a
Nothing Maybe Connection
forall a. Maybe a
Nothing
newShard ::
P.Members '[LogEff, MetricEff, P.Embed IO, P.Final IO, P.Async] r =>
T.Text ->
Int ->
Int ->
Token ->
Maybe StatusUpdateData ->
Intents ->
UC.InChan CalamityEvent ->
Sem r (UC.InChan ControlMessage, Async (Maybe ()))
newShard :: forall (r :: EffectRow).
Members '[LogEff, MetricEff, Embed IO, Final IO, Async] r =>
Text
-> Int
-> Int
-> Token
-> Maybe StatusUpdateData
-> Intents
-> InChan CalamityEvent
-> Sem r (InChan ControlMessage, Async (Maybe ()))
newShard Text
gateway Int
id Int
count Token
token Maybe StatusUpdateData
presence Intents
intents InChan CalamityEvent
evtIn = do
(InChan ControlMessage
cmdIn, OutChan ControlMessage
cmdOut) <- IO (InChan ControlMessage, OutChan ControlMessage)
-> Sem r (InChan ControlMessage, OutChan ControlMessage)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed IO (InChan ControlMessage, OutChan ControlMessage)
forall a. IO (InChan a, OutChan a)
UC.newChan
let shard :: Shard
shard = Int
-> Int
-> Text
-> InChan CalamityEvent
-> OutChan ControlMessage
-> Text
-> Maybe StatusUpdateData
-> Intents
-> Shard
Shard Int
id Int
count Text
gateway InChan CalamityEvent
evtIn OutChan ControlMessage
cmdOut (Token -> Text
rawToken Token
token) Maybe StatusUpdateData
presence Intents
intents
IORef ShardState
stateVar <- IO (IORef ShardState) -> Sem r (IORef ShardState)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO (IORef ShardState) -> Sem r (IORef ShardState))
-> (ShardState -> IO (IORef ShardState))
-> ShardState
-> Sem r (IORef ShardState)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ShardState -> IO (IORef ShardState)
forall a. a -> IO (IORef a)
newIORef (ShardState -> Sem r (IORef ShardState))
-> ShardState -> Sem r (IORef ShardState)
forall a b. (a -> b) -> a -> b
$ Shard -> ShardState
newShardState Shard
shard
let runShard :: Sem r ()
runShard = IORef ShardState -> Sem (AtomicState ShardState : r) () -> Sem r ()
forall s (r :: EffectRow) a.
Member (Embed IO) r =>
IORef s -> Sem (AtomicState s : r) a -> Sem r a
P.runAtomicStateIORef IORef ShardState
stateVar Sem (AtomicState ShardState : r) ()
forall (r :: EffectRow). ShardC r => Sem r ()
shardLoop
let action :: Sem r ()
action = Segment -> Sem r () -> Sem r ()
forall level msg (r :: EffectRow) a.
Member (Di level Path msg) r =>
Segment -> Sem r a -> Sem r a
push Segment
"calamity-shard" (Sem r () -> Sem r ())
-> (Sem r () -> Sem r ()) -> Sem r () -> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Key -> Int -> Sem r () -> Sem r ()
forall value level msg (r :: EffectRow) a.
(ToValue value, Member (Di level Path msg) r) =>
Key -> value -> Sem r a -> Sem r a
attr Key
"shard-id" Int
id (Sem r () -> Sem r ()) -> Sem r () -> Sem r ()
forall a b. (a -> b) -> a -> b
$ Sem r ()
runShard
Async (Maybe ())
thread' <- Sem r () -> Sem r (Async (Maybe ()))
forall (r :: EffectRow) a.
Member Async r =>
Sem r a -> Sem r (Async (Maybe a))
P.async Sem r ()
action
(InChan ControlMessage, Async (Maybe ()))
-> Sem r (InChan ControlMessage, Async (Maybe ()))
forall a. a -> Sem r a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InChan ControlMessage
cmdIn, Async (Maybe ())
thread')
sendToWs :: ShardC r => SentDiscordMessage -> Sem r ()
sendToWs :: forall (r :: EffectRow). ShardC r => SentDiscordMessage -> Sem r ()
sendToWs SentDiscordMessage
data' = do
Maybe Connection
wsConn' <- (ShardState -> Maybe Connection) -> Sem r (Maybe Connection)
forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets ShardState -> Maybe Connection
wsConn
case Maybe Connection
wsConn' of
Just Connection
wsConn -> do
let encodedData :: ByteString
encodedData = SentDiscordMessage -> ByteString
forall a. ToJSON a => a -> ByteString
A.encode SentDiscordMessage
data'
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug (Text -> Sem r ()) -> (String -> Text) -> String -> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> Sem r ()) -> String -> Sem r ()
forall a b. (a -> b) -> a -> b
$ String
"sending " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> SentDiscordMessage -> String
forall a. Show a => a -> String
show SentDiscordMessage
data' String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" encoded to " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> ByteString -> String
forall a. Show a => a -> String
show ByteString
encodedData String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" to gateway"
IO () -> Sem r ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO () -> Sem r ())
-> (ByteString -> IO ()) -> ByteString -> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> ByteString -> IO ()
forall a. WebSocketsData a => Connection -> a -> IO ()
sendTextData Connection
wsConn (ByteString -> Sem r ()) -> ByteString -> Sem r ()
forall a b. (a -> b) -> a -> b
$ ByteString
encodedData
Maybe Connection
Nothing -> Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"tried to send to closed WS"
tryWriteTBMQueue' :: TBMQueue a -> a -> STM Bool
tryWriteTBMQueue' :: forall a. TBMQueue a -> a -> STM Bool
tryWriteTBMQueue' TBMQueue a
q a
v = do
Maybe Bool
v' <- TBMQueue a -> a -> STM (Maybe Bool)
forall a. TBMQueue a -> a -> STM (Maybe Bool)
tryWriteTBMQueue TBMQueue a
q a
v
case Maybe Bool
v' of
Just Bool
False -> STM Bool
forall a. STM a
retry
Just Bool
True -> Bool -> STM Bool
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True
Maybe Bool
Nothing -> Bool -> STM Bool
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False
restartUnless :: P.Members '[LogEff, P.Error ShardFlowControl] r => T.Text -> Maybe a -> P.Sem r a
restartUnless :: forall (r :: EffectRow) a.
Members '[LogEff, Error ShardFlowControl] r =>
Text -> Maybe a -> Sem r a
restartUnless Text
_ (Just a
a) = a -> Sem r a
forall a. a -> Sem r a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
a
restartUnless Text
msg Maybe a
Nothing = do
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
error Text
msg
ShardFlowControl -> Sem r a
forall e (r :: EffectRow) a. Member (Error e) r => e -> Sem r a
P.throw ShardFlowControl
ShardFlowRestart
shardLoop :: ShardC r => Sem r ()
shardLoop :: forall (r :: EffectRow). ShardC r => Sem r ()
shardLoop = do
Gauge
activeShards <- Text -> [(Text, Text)] -> Sem r Gauge
forall (r :: EffectRow).
Member MetricEff r =>
Text -> [(Text, Text)] -> Sem r Gauge
registerGauge Text
"active_shards" [(Text, Text)]
forall a. Monoid a => a
mempty
Sem r Double -> Sem r ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Sem r Double -> Sem r ()) -> Sem r Double -> Sem r ()
forall a b. (a -> b) -> a -> b
$ (Double -> Double) -> Gauge -> Sem r Double
forall (r :: EffectRow).
Member MetricEff r =>
(Double -> Double) -> Gauge -> Sem r Double
modifyGauge (Double -> Double -> Double
forall a. Num a => a -> a -> a
+ Double
1) Gauge
activeShards
Sem r () -> Sem r ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void Sem r ()
forall (r :: EffectRow). ShardC r => Sem r ()
outerloop
Sem r Double -> Sem r ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Sem r Double -> Sem r ()) -> Sem r Double -> Sem r ()
forall a b. (a -> b) -> a -> b
$ (Double -> Double) -> Gauge -> Sem r Double
forall (r :: EffectRow).
Member MetricEff r =>
(Double -> Double) -> Gauge -> Sem r Double
modifyGauge (Double -> Double -> Double
forall a. Num a => a -> a -> a
subtract Double
1) Gauge
activeShards
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Shard shut down"
where
controlStream :: Shard -> TBMQueue ShardMsg -> IO ()
controlStream :: Shard -> TBMQueue ShardMsg -> IO ()
controlStream Shard
shard TBMQueue ShardMsg
outqueue = IO ()
inner
where
q :: OutChan ControlMessage
q = Shard
shard Shard
-> Optic' A_Lens NoIx Shard (OutChan ControlMessage)
-> OutChan ControlMessage
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard (OutChan ControlMessage)
#cmdOut
inner :: IO ()
inner = do
ControlMessage
v <- OutChan ControlMessage -> IO ControlMessage
forall a. OutChan a -> IO a
UC.readChan OutChan ControlMessage
q
Bool
r <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ TBMQueue ShardMsg -> ShardMsg -> STM Bool
forall a. TBMQueue a -> a -> STM Bool
tryWriteTBMQueue' TBMQueue ShardMsg
outqueue (ControlMessage -> ShardMsg
Control ControlMessage
v)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
r IO ()
inner
handleWSException :: SomeException -> IO (Either (ControlMessage, Maybe T.Text) a)
handleWSException :: forall a.
SomeException -> IO (Either (ControlMessage, Maybe Text) a)
handleWSException SomeException
e = Either (ControlMessage, Maybe Text) a
-> IO (Either (ControlMessage, Maybe Text) a)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either (ControlMessage, Maybe Text) a
-> IO (Either (ControlMessage, Maybe Text) a))
-> Either (ControlMessage, Maybe Text) a
-> IO (Either (ControlMessage, Maybe Text) a)
forall a b. (a -> b) -> a -> b
$ case SomeException -> Maybe ConnectionException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e of
Just (CloseRequest Word16
code ByteString
_)
| Word16
code Word16 -> [Word16] -> Bool
forall a. Eq a => a -> [a] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [Word16
4004, Word16
4010, Word16
4011, Word16
4012, Word16
4013, Word16
4014] ->
(ControlMessage, Maybe Text)
-> Either (ControlMessage, Maybe Text) a
forall a b. a -> Either a b
Left (ControlMessage
ShutDownShard, Text -> Maybe Text
forall a. a -> Maybe a
Just (Text -> Maybe Text) -> (Word16 -> Text) -> Word16 -> Maybe Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word16 -> Text
forall a. TextShow a => a -> Text
showt (Word16 -> Maybe Text) -> Word16 -> Maybe Text
forall a b. (a -> b) -> a -> b
$ Word16
code)
Maybe ConnectionException
e -> (ControlMessage, Maybe Text)
-> Either (ControlMessage, Maybe Text) a
forall a b. a -> Either a b
Left (ControlMessage
RestartShard, Text -> Maybe Text
forall a. a -> Maybe a
Just (Text -> Maybe Text)
-> (Maybe ConnectionException -> Text)
-> Maybe ConnectionException
-> Maybe Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> Text)
-> (Maybe ConnectionException -> String)
-> Maybe ConnectionException
-> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe ConnectionException -> String
forall a. Show a => a -> String
show (Maybe ConnectionException -> Maybe Text)
-> Maybe ConnectionException -> Maybe Text
forall a b. (a -> b) -> a -> b
$ Maybe ConnectionException
e)
discordStream :: P.Members '[LogEff, MetricEff, P.Embed IO, P.Final IO] r => Connection -> TBMQueue ShardMsg -> Sem r ()
discordStream :: forall (r :: EffectRow).
Members '[LogEff, MetricEff, Embed IO, Final IO] r =>
Connection -> TBMQueue ShardMsg -> Sem r ()
discordStream Connection
ws TBMQueue ShardMsg
outqueue = Sem r ()
inner
where
inner :: Sem r ()
inner = do
Either (ControlMessage, Maybe Text) ByteString
msg <- IO (Either (ControlMessage, Maybe Text) ByteString)
-> Sem r (Either (ControlMessage, Maybe Text) ByteString)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO (Either (ControlMessage, Maybe Text) ByteString)
-> Sem r (Either (ControlMessage, Maybe Text) ByteString))
-> IO (Either (ControlMessage, Maybe Text) ByteString)
-> Sem r (Either (ControlMessage, Maybe Text) ByteString)
forall a b. (a -> b) -> a -> b
$ IO (Either (ControlMessage, Maybe Text) ByteString)
-> (SomeException
-> IO (Either (ControlMessage, Maybe Text) ByteString))
-> IO (Either (ControlMessage, Maybe Text) ByteString)
forall (m :: * -> *) a.
MonadCatch m =>
m a -> (SomeException -> m a) -> m a
Ex.catchAny (ByteString -> Either (ControlMessage, Maybe Text) ByteString
forall a b. b -> Either a b
Right (ByteString -> Either (ControlMessage, Maybe Text) ByteString)
-> IO ByteString
-> IO (Either (ControlMessage, Maybe Text) ByteString)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO ByteString
forall a. WebSocketsData a => Connection -> IO a
receiveData Connection
ws) SomeException
-> IO (Either (ControlMessage, Maybe Text) ByteString)
forall a.
SomeException -> IO (Either (ControlMessage, Maybe Text) a)
handleWSException
case Either (ControlMessage, Maybe Text) ByteString
msg of
Left (ControlMessage
c, Maybe Text
reason) -> do
Maybe Text -> (Text -> Sem r ()) -> Sem r ()
forall (m :: * -> *) a.
Applicative m =>
Maybe a -> (a -> m ()) -> m ()
whenJust Maybe Text
reason (\Text
r -> Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
error (Text -> Sem r ()) -> (String -> Text) -> String -> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> Sem r ()) -> String -> Sem r ()
forall a b. (a -> b) -> a -> b
$ String
"Shard closed with reason: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Text -> String
forall a. Show a => a -> String
show Text
r)
IO () -> Sem r ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO () -> Sem r ()) -> (STM () -> IO ()) -> STM () -> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> Sem r ()) -> STM () -> Sem r ()
forall a b. (a -> b) -> a -> b
$ TBMQueue ShardMsg -> ShardMsg -> STM ()
forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue ShardMsg
outqueue (ControlMessage -> ShardMsg
Control ControlMessage
c)
Right ByteString
msg' -> do
let decoded :: Either String ReceivedDiscordMessage
decoded = ByteString -> Either String ReceivedDiscordMessage
forall a. FromJSON a => ByteString -> Either String a
A.eitherDecode ByteString
msg'
Bool
r <- case Either String ReceivedDiscordMessage
decoded of
Right ReceivedDiscordMessage
a ->
IO Bool -> Sem r Bool
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO Bool -> Sem r Bool)
-> (STM Bool -> IO Bool) -> STM Bool -> Sem r Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> Sem r Bool) -> STM Bool -> Sem r Bool
forall a b. (a -> b) -> a -> b
$ TBMQueue ShardMsg -> ShardMsg -> STM Bool
forall a. TBMQueue a -> a -> STM Bool
tryWriteTBMQueue' TBMQueue ShardMsg
outqueue (ReceivedDiscordMessage -> ShardMsg
Discord ReceivedDiscordMessage
a)
Left String
e -> do
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
error (Text -> Sem r ()) -> (String -> Text) -> String -> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> Sem r ()) -> String -> Sem r ()
forall a b. (a -> b) -> a -> b
$ String
"Failed to decode " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
e String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
": " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> ByteString -> String
forall a. Show a => a -> String
show ByteString
msg'
Bool -> Sem r Bool
forall a. a -> Sem r a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True
Bool -> Sem r () -> Sem r ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
r Sem r ()
inner
outerloop :: ShardC r => Sem r ()
outerloop :: forall (r :: EffectRow). ShardC r => Sem r ()
outerloop = Sem r Bool -> Sem r ()
forall (r :: EffectRow).
Member (Final IO) r =>
Sem r Bool -> Sem r ()
whileMFinalIO (Sem r Bool -> Sem r ()) -> Sem r Bool -> Sem r ()
forall a b. (a -> b) -> a -> b
$ do
Shard
shard :: Shard <- (ShardState -> Shard) -> Sem r Shard
forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState -> Optic' A_Lens NoIx ShardState Shard -> Shard
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx ShardState Shard
#shardS)
let host :: Text
host = Shard
shard Shard -> Optic' A_Lens NoIx Shard Text -> Text
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard Text
#gateway
let host' :: Text
host' = Text -> Maybe Text -> Text
forall a. a -> Maybe a -> a
fromMaybe Text
host (Maybe Text -> Text) -> Maybe Text -> Text
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Maybe Text
T.stripPrefix Text
"wss://" Text
host
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info (Text -> Sem r ()) -> (String -> Text) -> String -> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> Sem r ()) -> String -> Sem r ()
forall a b. (a -> b) -> a -> b
$ String
"starting up shard " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show (Shard -> Int
shardID Shard
shard) String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
" of " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show (Shard -> Int
shardCount Shard
shard)
Maybe ShardFlowControl
innerLoopVal <- Text
-> Text
-> (Connection -> Sem r ShardFlowControl)
-> Sem r (Maybe ShardFlowControl)
forall (r :: EffectRow) a.
Members '[LogEff, Final IO, Embed IO] r =>
Text -> Text -> (Connection -> Sem r a) -> Sem r (Maybe a)
runWebsocket Text
host' Text
"/?v=9&encoding=json" Connection -> Sem r ShardFlowControl
forall (r :: EffectRow).
ShardC r =>
Connection -> Sem r ShardFlowControl
innerloop
case Maybe ShardFlowControl
innerLoopVal of
Just ShardFlowControl
ShardFlowShutDown -> do
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info Text
"Shutting down shard"
Bool -> Sem r Bool
forall a. a -> Sem r a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False
Just ShardFlowControl
ShardFlowRestart -> do
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info Text
"Restaring shard"
Bool -> Sem r Bool
forall a. a -> Sem r a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True
Maybe ShardFlowControl
Nothing -> do
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info Text
"Restarting shard (abnormal reasons?)"
Bool -> Sem r Bool
forall a. a -> Sem r a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True
innerloop :: ShardC r => Connection -> Sem r ShardFlowControl
innerloop :: forall (r :: EffectRow).
ShardC r =>
Connection -> Sem r ShardFlowControl
innerloop Connection
ws = do
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Entering inner loop of shard"
Shard
shard <- (ShardState -> Shard) -> Sem r Shard
forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState -> Optic' A_Lens NoIx ShardState Shard -> Shard
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx ShardState Shard
#shardS)
(ShardState -> ShardState) -> Sem r ()
forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (Optic
A_Lens
NoIx
ShardState
ShardState
(Maybe Connection)
(Maybe Connection)
#wsConn Optic
A_Lens
NoIx
ShardState
ShardState
(Maybe Connection)
(Maybe Connection)
-> Connection -> ShardState -> ShardState
forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a (Maybe b) -> b -> s -> t
?~ Connection
ws)
Maybe Int
seqNum' <- (ShardState -> Maybe Int) -> Sem r (Maybe Int)
forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState
-> Optic' A_Lens NoIx ShardState (Maybe Int) -> Maybe Int
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx ShardState (Maybe Int)
#seqNum)
Maybe Text
sessionID' <- (ShardState -> Maybe Text) -> Sem r (Maybe Text)
forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState
-> Optic' A_Lens NoIx ShardState (Maybe Text) -> Maybe Text
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx ShardState (Maybe Text)
#sessionID)
case (Maybe Int
seqNum', Maybe Text
sessionID') of
(Just Int
n, Just Text
s) -> do
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug (Text -> Sem r ()) -> Text -> Sem r ()
forall a b. (a -> b) -> a -> b
$ Text
"Resuming shard (sessionID: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
s Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
", seq: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack (Int -> String
forall a. Show a => a -> String
show Int
n)
SentDiscordMessage -> Sem r ()
forall (r :: EffectRow). ShardC r => SentDiscordMessage -> Sem r ()
sendToWs
( ResumeData -> SentDiscordMessage
Resume
ResumeData
{ $sel:token:ResumeData :: Text
token = Shard
shard Shard -> Optic' A_Lens NoIx Shard Text -> Text
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard Text
#token
, $sel:sessionID:ResumeData :: Text
sessionID = Text
s
, $sel:seq:ResumeData :: Int
seq = Int
n
}
)
(Maybe Int, Maybe Text)
_noActiveSession -> do
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Identifying shard"
SentDiscordMessage -> Sem r ()
forall (r :: EffectRow). ShardC r => SentDiscordMessage -> Sem r ()
sendToWs
( IdentifyData -> SentDiscordMessage
Identify
IdentifyData
{ $sel:token:IdentifyData :: Text
token = Shard
shard Shard -> Optic' A_Lens NoIx Shard Text -> Text
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard Text
#token
, $sel:properties:IdentifyData :: IdentifyProps
properties =
IdentifyProps
{ $sel:browser:IdentifyProps :: Text
browser = Text
"Calamity: https://github.com/simmsb/calamity"
, $sel:device:IdentifyProps :: Text
device = Text
"Calamity: https://github.com/simmsb/calamity"
}
, $sel:compress:IdentifyData :: Bool
compress = Bool
False
, $sel:largeThreshold:IdentifyData :: Maybe Int
largeThreshold = Maybe Int
forall a. Maybe a
Nothing
, $sel:shard:IdentifyData :: Maybe (Int, Int)
shard =
(Int, Int) -> Maybe (Int, Int)
forall a. a -> Maybe a
Just (Shard
shard Shard -> Optic' A_Lens NoIx Shard Int -> Int
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard Int
#shardID, Shard
shard Shard -> Optic' A_Lens NoIx Shard Int -> Int
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard Int
#shardCount)
, $sel:presence:IdentifyData :: Maybe StatusUpdateData
presence = Shard
shard Shard
-> Optic' A_Lens NoIx Shard (Maybe StatusUpdateData)
-> Maybe StatusUpdateData
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard (Maybe StatusUpdateData)
#initialStatus
, $sel:intents:IdentifyData :: Intents
intents = Shard
shard Shard -> Optic' A_Lens NoIx Shard Intents -> Intents
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard Intents
#intents
}
)
ShardFlowControl
result <-
Sem (Resource : r) ShardFlowControl -> Sem r ShardFlowControl
forall (r :: EffectRow) a.
Member (Final IO) r =>
Sem (Resource : r) a -> Sem r a
P.resourceToIOFinal (Sem (Resource : r) ShardFlowControl -> Sem r ShardFlowControl)
-> Sem (Resource : r) ShardFlowControl -> Sem r ShardFlowControl
forall a b. (a -> b) -> a -> b
$
Sem (Resource : r) (TBMQueue ShardMsg)
-> (TBMQueue ShardMsg -> Sem (Resource : r) ())
-> (TBMQueue ShardMsg -> Sem (Resource : r) ShardFlowControl)
-> Sem (Resource : r) ShardFlowControl
forall (r :: EffectRow) a c b.
Member Resource r =>
Sem r a -> (a -> Sem r c) -> (a -> Sem r b) -> Sem r b
P.bracket
(IO (TBMQueue ShardMsg) -> Sem (Resource : r) (TBMQueue ShardMsg)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO (TBMQueue ShardMsg) -> Sem (Resource : r) (TBMQueue ShardMsg))
-> IO (TBMQueue ShardMsg) -> Sem (Resource : r) (TBMQueue ShardMsg)
forall a b. (a -> b) -> a -> b
$ Int -> IO (TBMQueue ShardMsg)
forall a. Int -> IO (TBMQueue a)
newTBMQueueIO Int
1)
(IO () -> Sem (Resource : r) ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO () -> Sem (Resource : r) ())
-> (TBMQueue ShardMsg -> IO ())
-> TBMQueue ShardMsg
-> Sem (Resource : r) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ())
-> (TBMQueue ShardMsg -> STM ()) -> TBMQueue ShardMsg -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TBMQueue ShardMsg -> STM ()
forall a. TBMQueue a -> STM ()
closeTBMQueue)
( \TBMQueue ShardMsg
q -> do
Text -> Sem (Resource : r) ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"handling events now"
Async (Maybe ())
_controlThread <- Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ()))
forall (r :: EffectRow) a.
Member Async r =>
Sem r a -> Sem r (Async (Maybe a))
P.async (Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ())))
-> (IO () -> Sem (Resource : r) ())
-> IO ()
-> Sem (Resource : r) (Async (Maybe ()))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> Sem (Resource : r) ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO () -> Sem (Resource : r) (Async (Maybe ())))
-> IO () -> Sem (Resource : r) (Async (Maybe ()))
forall a b. (a -> b) -> a -> b
$ Shard -> TBMQueue ShardMsg -> IO ()
controlStream Shard
shard TBMQueue ShardMsg
q
Async (Maybe ())
_discordThread <- Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ()))
forall (r :: EffectRow) a.
Member Async r =>
Sem r a -> Sem r (Async (Maybe a))
P.async (Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ())))
-> Sem (Resource : r) () -> Sem (Resource : r) (Async (Maybe ()))
forall a b. (a -> b) -> a -> b
$ Connection -> TBMQueue ShardMsg -> Sem (Resource : r) ()
forall (r :: EffectRow).
Members '[LogEff, MetricEff, Embed IO, Final IO] r =>
Connection -> TBMQueue ShardMsg -> Sem r ()
discordStream Connection
ws TBMQueue ShardMsg
q
Sem r ShardFlowControl -> Sem (Resource : r) ShardFlowControl
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Sem r a -> Sem (e : r) a
P.raise (Sem r ShardFlowControl -> Sem (Resource : r) ShardFlowControl)
-> (Sem (Error ShardFlowControl : r) () -> Sem r ShardFlowControl)
-> Sem (Error ShardFlowControl : r) ()
-> Sem (Resource : r) ShardFlowControl
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Sem r (Maybe ShardFlowControl) -> Sem r ShardFlowControl
forall (r :: EffectRow) a.
Member (Final IO) r =>
Sem r (Maybe a) -> Sem r a
untilJustFinalIO (Sem r (Maybe ShardFlowControl) -> Sem r ShardFlowControl)
-> (Sem (Error ShardFlowControl : r) ()
-> Sem r (Maybe ShardFlowControl))
-> Sem (Error ShardFlowControl : r) ()
-> Sem r ShardFlowControl
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Either ShardFlowControl () -> Maybe ShardFlowControl
forall e a. Either e a -> Maybe e
leftToMaybe (Either ShardFlowControl () -> Maybe ShardFlowControl)
-> Sem r (Either ShardFlowControl ())
-> Sem r (Maybe ShardFlowControl)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>) (Sem r (Either ShardFlowControl ())
-> Sem r (Maybe ShardFlowControl))
-> (Sem (Error ShardFlowControl : r) ()
-> Sem r (Either ShardFlowControl ()))
-> Sem (Error ShardFlowControl : r) ()
-> Sem r (Maybe ShardFlowControl)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Sem (Error ShardFlowControl : r) ()
-> Sem r (Either ShardFlowControl ())
forall e (r :: EffectRow) a.
Sem (Error e : r) a -> Sem r (Either e a)
P.runError (Sem (Error ShardFlowControl : r) ()
-> Sem (Resource : r) ShardFlowControl)
-> Sem (Error ShardFlowControl : r) ()
-> Sem (Resource : r) ShardFlowControl
forall a b. (a -> b) -> a -> b
$ do
Maybe ShardMsg
msg <- IO (Maybe ShardMsg)
-> Sem (Error ShardFlowControl : r) (Maybe ShardMsg)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO (Maybe ShardMsg)
-> Sem (Error ShardFlowControl : r) (Maybe ShardMsg))
-> (STM (Maybe ShardMsg) -> IO (Maybe ShardMsg))
-> STM (Maybe ShardMsg)
-> Sem (Error ShardFlowControl : r) (Maybe ShardMsg)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (Maybe ShardMsg) -> IO (Maybe ShardMsg)
forall a. STM a -> IO a
atomically (STM (Maybe ShardMsg)
-> Sem (Error ShardFlowControl : r) (Maybe ShardMsg))
-> STM (Maybe ShardMsg)
-> Sem (Error ShardFlowControl : r) (Maybe ShardMsg)
forall a b. (a -> b) -> a -> b
$ TBMQueue ShardMsg -> STM (Maybe ShardMsg)
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue ShardMsg
q
ShardMsg -> Sem (Error ShardFlowControl : r) ()
forall (r :: EffectRow).
(ShardC r, Member (Error ShardFlowControl) r) =>
ShardMsg -> Sem r ()
handleMsg (ShardMsg -> Sem (Error ShardFlowControl : r) ())
-> Sem (Error ShardFlowControl : r) ShardMsg
-> Sem (Error ShardFlowControl : r) ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> Maybe ShardMsg -> Sem (Error ShardFlowControl : r) ShardMsg
forall (r :: EffectRow) a.
Members '[LogEff, Error ShardFlowControl] r =>
Text -> Maybe a -> Sem r a
restartUnless Text
"shard message stream closed by someone other than the sink" Maybe ShardMsg
msg
)
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Exiting inner loop of shard"
(ShardState -> ShardState) -> Sem r ()
forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (Optic
A_Lens
NoIx
ShardState
ShardState
(Maybe Connection)
(Maybe Connection)
#wsConn Optic
A_Lens
NoIx
ShardState
ShardState
(Maybe Connection)
(Maybe Connection)
-> Maybe Connection -> ShardState -> ShardState
forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a b -> b -> s -> t
.~ Maybe Connection
forall a. Maybe a
Nothing)
Sem r ()
forall (r :: EffectRow). ShardC r => Sem r ()
haltHeartBeat
ShardFlowControl -> Sem r ShardFlowControl
forall a. a -> Sem r a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ShardFlowControl
result
handleMsg :: (ShardC r, P.Member (P.Error ShardFlowControl) r) => ShardMsg -> Sem r ()
handleMsg :: forall (r :: EffectRow).
(ShardC r, Member (Error ShardFlowControl) r) =>
ShardMsg -> Sem r ()
handleMsg (Discord ReceivedDiscordMessage
msg) = case ReceivedDiscordMessage
msg of
EvtDispatch Int
sn DispatchData
data' -> do
(ShardState -> ShardState) -> Sem r ()
forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (Optic' A_Lens NoIx ShardState (Maybe Int)
#seqNum Optic' A_Lens NoIx ShardState (Maybe Int)
-> Int -> ShardState -> ShardState
forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a (Maybe b) -> b -> s -> t
?~ Int
sn)
case DispatchData
data' of
Ready ReadyData
rdata' ->
(ShardState -> ShardState) -> Sem r ()
forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (Optic' A_Lens NoIx ShardState (Maybe Text)
#sessionID Optic' A_Lens NoIx ShardState (Maybe Text)
-> Text -> ShardState -> ShardState
forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a (Maybe b) -> b -> s -> t
?~ (ReadyData
rdata' ReadyData -> Optic' A_Lens NoIx ReadyData Text -> Text
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx ReadyData Text
#sessionID))
DispatchData
_NotReady -> () -> Sem r ()
forall a. a -> Sem r a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Shard
shard <- (ShardState -> Shard) -> Sem r Shard
forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState -> Optic' A_Lens NoIx ShardState Shard -> Shard
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx ShardState Shard
#shardS)
IO () -> Sem r ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO () -> Sem r ()) -> IO () -> Sem r ()
forall a b. (a -> b) -> a -> b
$ InChan CalamityEvent -> CalamityEvent -> IO ()
forall a. InChan a -> a -> IO ()
UC.writeChan (Shard
shard Shard
-> Optic' A_Lens NoIx Shard (InChan CalamityEvent)
-> InChan CalamityEvent
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard (InChan CalamityEvent)
#evtIn) (Int -> DispatchData -> CalamityEvent
Dispatch (Shard
shard Shard -> Optic' A_Lens NoIx Shard Int -> Int
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx Shard Int
#shardID) DispatchData
data')
ReceivedDiscordMessage
HeartBeatReq -> do
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Received heartbeat request"
Sem r ()
forall (r :: EffectRow). ShardC r => Sem r ()
sendHeartBeat
ReceivedDiscordMessage
Reconnect -> do
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Being asked to restart by Discord"
ShardFlowControl -> Sem r ()
forall e (r :: EffectRow) a. Member (Error e) r => e -> Sem r a
P.throw ShardFlowControl
ShardFlowRestart
InvalidSession Bool
resumable -> do
if Bool
resumable
then Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info Text
"Received resumable invalid session"
else do
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info Text
"Received non-resumable invalid session, sleeping for 15 seconds then retrying"
(ShardState -> ShardState) -> Sem r ()
forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (Optic' A_Lens NoIx ShardState (Maybe Text)
#sessionID Optic' A_Lens NoIx ShardState (Maybe Text)
-> Maybe Text -> ShardState -> ShardState
forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a b -> b -> s -> t
.~ Maybe Text
forall a. Maybe a
Nothing)
(ShardState -> ShardState) -> Sem r ()
forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (Optic' A_Lens NoIx ShardState (Maybe Int)
#seqNum Optic' A_Lens NoIx ShardState (Maybe Int)
-> Maybe Int -> ShardState -> ShardState
forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a b -> b -> s -> t
.~ Maybe Int
forall a. Maybe a
Nothing)
IO () -> Sem r ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO () -> Sem r ()) -> IO () -> Sem r ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Int
15 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000)
ShardFlowControl -> Sem r ()
forall e (r :: EffectRow) a. Member (Error e) r => e -> Sem r a
P.throw ShardFlowControl
ShardFlowRestart
Hello Int
interval -> do
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
info (Text -> Sem r ()) -> (String -> Text) -> String -> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> Sem r ()) -> String -> Sem r ()
forall a b. (a -> b) -> a -> b
$ String
"Received hello, beginning to heartbeat at an interval of " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
interval String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
"ms"
Int -> Sem r ()
forall (r :: EffectRow). ShardC r => Int -> Sem r ()
startHeartBeatLoop Int
interval
ReceivedDiscordMessage
HeartBeatAck -> do
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Received heartbeat ack"
(ShardState -> ShardState) -> Sem r ()
forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (Optic A_Lens NoIx ShardState ShardState Bool Bool
#hbResponse Optic A_Lens NoIx ShardState ShardState Bool Bool
-> Bool -> ShardState -> ShardState
forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a b -> b -> s -> t
.~ Bool
True)
handleMsg (Control ControlMessage
msg) = case ControlMessage
msg of
SendPresence StatusUpdateData
data' -> do
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug (Text -> Sem r ()) -> (String -> Text) -> String -> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> Sem r ()) -> String -> Sem r ()
forall a b. (a -> b) -> a -> b
$ String
"Sending presence: (" String -> String -> String
forall a. Semigroup a => a -> a -> a
<> StatusUpdateData -> String
forall a. Show a => a -> String
show StatusUpdateData
data' String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
")"
SentDiscordMessage -> Sem r ()
forall (r :: EffectRow). ShardC r => SentDiscordMessage -> Sem r ()
sendToWs (SentDiscordMessage -> Sem r ()) -> SentDiscordMessage -> Sem r ()
forall a b. (a -> b) -> a -> b
$ StatusUpdateData -> SentDiscordMessage
StatusUpdate StatusUpdateData
data'
ControlMessage
RestartShard -> ShardFlowControl -> Sem r ()
forall e (r :: EffectRow) a. Member (Error e) r => e -> Sem r a
P.throw ShardFlowControl
ShardFlowRestart
ControlMessage
ShutDownShard -> ShardFlowControl -> Sem r ()
forall e (r :: EffectRow) a. Member (Error e) r => e -> Sem r a
P.throw ShardFlowControl
ShardFlowShutDown
startHeartBeatLoop :: ShardC r => Int -> Sem r ()
startHeartBeatLoop :: forall (r :: EffectRow). ShardC r => Int -> Sem r ()
startHeartBeatLoop Int
interval = do
Sem r ()
forall (r :: EffectRow). ShardC r => Sem r ()
haltHeartBeat
Async (Maybe ())
thread <- Sem r () -> Sem r (Async (Maybe ()))
forall (r :: EffectRow) a.
Member Async r =>
Sem r a -> Sem r (Async (Maybe a))
P.async (Sem r () -> Sem r (Async (Maybe ())))
-> Sem r () -> Sem r (Async (Maybe ()))
forall a b. (a -> b) -> a -> b
$ Int -> Sem r ()
forall (r :: EffectRow). ShardC r => Int -> Sem r ()
heartBeatLoop Int
interval
(ShardState -> ShardState) -> Sem r ()
forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (Optic
A_Lens
NoIx
ShardState
ShardState
(Maybe (Async (Maybe ())))
(Maybe (Async (Maybe ())))
#hbThread Optic
A_Lens
NoIx
ShardState
ShardState
(Maybe (Async (Maybe ())))
(Maybe (Async (Maybe ())))
-> Async (Maybe ()) -> ShardState -> ShardState
forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a (Maybe b) -> b -> s -> t
?~ Async (Maybe ())
thread)
haltHeartBeat :: ShardC r => Sem r ()
haltHeartBeat :: forall (r :: EffectRow). ShardC r => Sem r ()
haltHeartBeat = do
Maybe (Async (Maybe ()))
thread <- forall s a (r :: EffectRow).
Member (AtomicState s) r =>
(s -> (s, a)) -> Sem r a
P.atomicState @ShardState ((ShardState -> (ShardState, Maybe (Async (Maybe ()))))
-> Sem r (Maybe (Async (Maybe ()))))
-> (State ShardState (Maybe (Async (Maybe ())))
-> ShardState -> (ShardState, Maybe (Async (Maybe ()))))
-> State ShardState (Maybe (Async (Maybe ())))
-> Sem r (Maybe (Async (Maybe ())))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((Maybe (Async (Maybe ())), ShardState)
-> (ShardState, Maybe (Async (Maybe ())))
forall a b. (a, b) -> (b, a)
swap ((Maybe (Async (Maybe ())), ShardState)
-> (ShardState, Maybe (Async (Maybe ()))))
-> (ShardState -> (Maybe (Async (Maybe ())), ShardState))
-> ShardState
-> (ShardState, Maybe (Async (Maybe ())))
forall b c a. (b -> c) -> (a -> b) -> a -> c
.) ((ShardState -> (Maybe (Async (Maybe ())), ShardState))
-> ShardState -> (ShardState, Maybe (Async (Maybe ()))))
-> (State ShardState (Maybe (Async (Maybe ())))
-> ShardState -> (Maybe (Async (Maybe ())), ShardState))
-> State ShardState (Maybe (Async (Maybe ())))
-> ShardState
-> (ShardState, Maybe (Async (Maybe ())))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. State ShardState (Maybe (Async (Maybe ())))
-> ShardState -> (Maybe (Async (Maybe ())), ShardState)
forall s a. State s a -> s -> (a, s)
runState (State ShardState (Maybe (Async (Maybe ())))
-> Sem r (Maybe (Async (Maybe ()))))
-> State ShardState (Maybe (Async (Maybe ())))
-> Sem r (Maybe (Async (Maybe ())))
forall a b. (a -> b) -> a -> b
$ do
Maybe (Async (Maybe ()))
thread <- Optic
A_Lens
NoIx
ShardState
ShardState
(Maybe (Async (Maybe ())))
(Maybe (Async (Maybe ())))
-> State ShardState (Maybe (Async (Maybe ())))
forall k s (m :: * -> *) (is :: IxList) a.
(Is k A_Getter, MonadState s m) =>
Optic' k is s a -> m a
use Optic
A_Lens
NoIx
ShardState
ShardState
(Maybe (Async (Maybe ())))
(Maybe (Async (Maybe ())))
#hbThread
#hbThread .= Nothing
Maybe (Async (Maybe ()))
-> State ShardState (Maybe (Async (Maybe ())))
forall a. a -> StateT ShardState Identity a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (Async (Maybe ()))
thread
case Maybe (Async (Maybe ()))
thread of
Just Async (Maybe ())
t -> do
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"Stopping heartbeat thread"
IO () -> Sem r ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO () -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Async (Maybe ()) -> IO ()
forall a. Async a -> IO ()
cancel Async (Maybe ())
t)
Maybe (Async (Maybe ()))
Nothing -> () -> Sem r ()
forall a. a -> Sem r a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
sendHeartBeat :: ShardC r => Sem r ()
sendHeartBeat :: forall (r :: EffectRow). ShardC r => Sem r ()
sendHeartBeat = do
Maybe Int
sn <- (ShardState -> Maybe Int) -> Sem r (Maybe Int)
forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState
-> Optic' A_Lens NoIx ShardState (Maybe Int) -> Maybe Int
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic' A_Lens NoIx ShardState (Maybe Int)
#seqNum)
Text -> Sem r ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug (Text -> Sem r ()) -> (String -> Text) -> String -> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack (String -> Sem r ()) -> String -> Sem r ()
forall a b. (a -> b) -> a -> b
$ String
"Sending heartbeat (seq: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Maybe Int -> String
forall a. Show a => a -> String
show Maybe Int
sn String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
")"
SentDiscordMessage -> Sem r ()
forall (r :: EffectRow). ShardC r => SentDiscordMessage -> Sem r ()
sendToWs (SentDiscordMessage -> Sem r ()) -> SentDiscordMessage -> Sem r ()
forall a b. (a -> b) -> a -> b
$ Maybe Int -> SentDiscordMessage
HeartBeat Maybe Int
sn
(ShardState -> ShardState) -> Sem r ()
forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
P.atomicModify' (Optic A_Lens NoIx ShardState ShardState Bool Bool
#hbResponse Optic A_Lens NoIx ShardState ShardState Bool Bool
-> Bool -> ShardState -> ShardState
forall k (is :: IxList) s t a b.
Is k A_Setter =>
Optic k is s t a b -> b -> s -> t
.~ Bool
False)
heartBeatLoop :: ShardC r => Int -> Sem r ()
heartBeatLoop :: forall (r :: EffectRow). ShardC r => Int -> Sem r ()
heartBeatLoop Int
interval = Sem r (Maybe ()) -> Sem r ()
forall (r :: EffectRow) a.
Member (Final IO) r =>
Sem r (Maybe a) -> Sem r a
untilJustFinalIO (Sem r (Maybe ()) -> Sem r ())
-> (Sem (Error () : r) () -> Sem r (Maybe ()))
-> Sem (Error () : r) ()
-> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Either () () -> Maybe ()
forall e a. Either e a -> Maybe e
leftToMaybe (Either () () -> Maybe ())
-> Sem r (Either () ()) -> Sem r (Maybe ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>) (Sem r (Either () ()) -> Sem r (Maybe ()))
-> (Sem (Error () : r) () -> Sem r (Either () ()))
-> Sem (Error () : r) ()
-> Sem r (Maybe ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Sem (Error () : r) () -> Sem r (Either () ())
forall e (r :: EffectRow) a.
Sem (Error e : r) a -> Sem r (Either e a)
P.runError (Sem (Error () : r) () -> Sem r ())
-> Sem (Error () : r) () -> Sem r ()
forall a b. (a -> b) -> a -> b
$ do
Sem (Error () : r) ()
forall (r :: EffectRow). ShardC r => Sem r ()
sendHeartBeat
IO () -> Sem (Error () : r) ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO () -> Sem (Error () : r) ())
-> (Int -> IO ()) -> Int -> Sem (Error () : r) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> IO ()
threadDelay (Int -> Sem (Error () : r) ()) -> Int -> Sem (Error () : r) ()
forall a b. (a -> b) -> a -> b
$ Int
interval Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000
Sem (Error () : r) Bool
-> Sem (Error () : r) () -> Sem (Error () : r) ()
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
unlessM ((ShardState -> Bool) -> Sem (Error () : r) Bool
forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState
-> Optic A_Lens NoIx ShardState ShardState Bool Bool -> Bool
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic A_Lens NoIx ShardState ShardState Bool Bool
#hbResponse)) (Sem (Error () : r) () -> Sem (Error () : r) ())
-> Sem (Error () : r) () -> Sem (Error () : r) ()
forall a b. (a -> b) -> a -> b
$ do
Text -> Sem (Error () : r) ()
forall (r :: EffectRow). Member LogEff r => Text -> Sem r ()
debug Text
"No heartbeat response, restarting shard"
Connection
wsConn <- () -> Maybe Connection -> Sem (Error () : r) Connection
forall e (r :: EffectRow) a.
Member (Error e) r =>
e -> Maybe a -> Sem r a
P.note () (Maybe Connection -> Sem (Error () : r) Connection)
-> Sem (Error () : r) (Maybe Connection)
-> Sem (Error () : r) Connection
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< (ShardState -> Maybe Connection)
-> Sem (Error () : r) (Maybe Connection)
forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
P.atomicGets (ShardState
-> Optic
A_Lens
NoIx
ShardState
ShardState
(Maybe Connection)
(Maybe Connection)
-> Maybe Connection
forall k s (is :: IxList) a.
Is k A_Getter =>
s -> Optic' k is s a -> a
^. Optic
A_Lens
NoIx
ShardState
ShardState
(Maybe Connection)
(Maybe Connection)
#wsConn)
IO () -> Sem (Error () : r) ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
P.embed (IO () -> Sem (Error () : r) ()) -> IO () -> Sem (Error () : r) ()
forall a b. (a -> b) -> a -> b
$ Connection -> Word16 -> Text -> IO ()
forall a. WebSocketsData a => Connection -> Word16 -> a -> IO ()
sendCloseCode Connection
wsConn Word16
4000 (Text
"No heartbeat in time" :: T.Text)
() -> Sem (Error () : r) ()
forall e (r :: EffectRow) a. Member (Error e) r => e -> Sem r a
P.throw ()