{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
module Network.XMPP.Concurrent
( Thread
, XmppThreadT
, runThreaded
, readChanS
, writeChanS
, withNewThread
, loop
, waitFor
) where
import Control.Concurrent
import Control.Monad.State
import Control.Monad.Reader
import Network.XMPP.Stream
import Network.XMPP.Types
import Network.XMPP.Utils
import Network.XMPP.XML
import UnliftIO.Async (Async, async)
import UnliftIO (TChan, MonadUnliftIO, atomically, newTChan,
writeTChan, readTChan, dupTChan)
import System.IO
data Thread e = Thread
{ Thread e -> TChan (Either XmppError (SomeStanza e))
tInCh :: TChan (Either XmppError (SomeStanza e))
, Thread e -> TChan (SomeStanza ())
tOutCh :: TChan (SomeStanza ())
}
type XmppThreadT m a e = ReaderT (Thread e) m a
instance MonadIO m => XmppSendable (ReaderT (Thread e) m) (Stanza t 'Outgoing ()) where
xmppSend :: Stanza t 'Outgoing () -> ReaderT (Thread e) m ()
xmppSend = SomeStanza () -> ReaderT (Thread e) m ()
forall (m :: * -> *) e.
MonadIO m =>
SomeStanza () -> XmppThreadT m () e
writeChanS (SomeStanza () -> ReaderT (Thread e) m ())
-> (Stanza t 'Outgoing () -> SomeStanza ())
-> Stanza t 'Outgoing ()
-> ReaderT (Thread e) m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stanza t 'Outgoing () -> SomeStanza ()
forall e (a :: StanzaType) (p :: StanzaPurpose).
Stanza a p e -> SomeStanza e
SomeStanza
runThreaded
:: (FromXML e, MonadIO m, MonadUnliftIO m)
=> XmppThreadT m () e
-> XmppMonad m ()
runThreaded :: XmppThreadT m () e -> XmppMonad m ()
runThreaded XmppThreadT m () e
action = do
(TChan (Either XmppError (SomeStanza e))
in', TChan (SomeStanza ())
out') <- STM
(TChan (Either XmppError (SomeStanza e)), TChan (SomeStanza ()))
-> XmppMonad
m (TChan (Either XmppError (SomeStanza e)), TChan (SomeStanza ()))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM
(TChan (Either XmppError (SomeStanza e)), TChan (SomeStanza ()))
-> XmppMonad
m (TChan (Either XmppError (SomeStanza e)), TChan (SomeStanza ())))
-> STM
(TChan (Either XmppError (SomeStanza e)), TChan (SomeStanza ()))
-> XmppMonad
m (TChan (Either XmppError (SomeStanza e)), TChan (SomeStanza ()))
forall a b. (a -> b) -> a -> b
$ (,) (TChan (Either XmppError (SomeStanza e))
-> TChan (SomeStanza ())
-> (TChan (Either XmppError (SomeStanza e)),
TChan (SomeStanza ())))
-> STM (TChan (Either XmppError (SomeStanza e)))
-> STM
(TChan (SomeStanza ())
-> (TChan (Either XmppError (SomeStanza e)),
TChan (SomeStanza ())))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM (TChan (Either XmppError (SomeStanza e)))
forall a. STM (TChan a)
newTChan STM
(TChan (SomeStanza ())
-> (TChan (Either XmppError (SomeStanza e)),
TChan (SomeStanza ())))
-> STM (TChan (SomeStanza ()))
-> STM
(TChan (Either XmppError (SomeStanza e)), TChan (SomeStanza ()))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM (TChan (SomeStanza ()))
forall a. STM (TChan a)
newTChan
s :: Stream
s@Stream{Int
[Token]
Handle
lexemes :: Stream -> [Token]
idx :: Stream -> Int
handle :: Stream -> Handle
lexemes :: [Token]
idx :: Int
handle :: Handle
..} <- XmppMonad m Stream
forall s (m :: * -> *). MonadState s m => m s
get
XmppMonad m (Async ()) -> XmppMonad m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (XmppMonad m (Async ()) -> XmppMonad m ())
-> XmppMonad m (Async ()) -> XmppMonad m ()
forall a b. (a -> b) -> a -> b
$ m (Async ()) -> XmppMonad m (Async ())
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m (Async ()) -> XmppMonad m (Async ()))
-> m (Async ()) -> XmppMonad m (Async ())
forall a b. (a -> b) -> a -> b
$
m () -> m (Async ())
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (XmppThreadT m () e -> Thread e -> m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT XmppThreadT m () e
action (Thread e -> m ()) -> Thread e -> m ()
forall a b. (a -> b) -> a -> b
$ TChan (Either XmppError (SomeStanza e))
-> TChan (SomeStanza ()) -> Thread e
forall e.
TChan (Either XmppError (SomeStanza e))
-> TChan (SomeStanza ()) -> Thread e
Thread TChan (Either XmppError (SomeStanza e))
in' TChan (SomeStanza ())
out') m (Async ()) -> m (Async ()) -> m (Async ())
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>>
m () -> m (Async ())
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (m (Async ((), Stream)) -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m (Async ((), Stream)) -> m ()) -> m (Async ((), Stream)) -> m ()
forall a b. (a -> b) -> a -> b
$ m ((), Stream) -> m (Async ((), Stream))
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (m ((), Stream) -> m (Async ((), Stream)))
-> m ((), Stream) -> m (Async ((), Stream))
forall a b. (a -> b) -> a -> b
$ Stream -> XmppMonad m () -> m ((), Stream)
forall (m :: * -> *) a.
MonadIO m =>
Stream -> XmppMonad m a -> m (a, Stream)
runXmppMonad' Stream
s (XmppMonad m () -> m ((), Stream))
-> XmppMonad m () -> m ((), Stream)
forall a b. (a -> b) -> a -> b
$ TChan (SomeStanza ()) -> XmppMonad m ()
forall (m :: * -> *) e.
MonadIO m =>
TChan (SomeStanza e) -> XmppMonad m ()
loopWrite TChan (SomeStanza ())
out') m (Async ()) -> m (Async ()) -> m (Async ())
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>>
m () -> m (Async ())
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (Handle -> m ()
forall (m :: * -> *). MonadIO m => Handle -> m ()
connPersist Handle
handle)
TChan (Either XmppError (SomeStanza e)) -> XmppMonad m ()
forall (m :: * -> *) e.
(FromXML e, MonadIO m) =>
TChan (Either XmppError (SomeStanza e)) -> XmppMonad m ()
loopRead TChan (Either XmppError (SomeStanza e))
in'
where
loopRead :: TChan (Either XmppError (SomeStanza e)) -> XmppMonad m ()
loopRead TChan (Either XmppError (SomeStanza e))
in' = do
Either XmppError (SomeStanza e)
msg <- XmppMonad m (Either XmppError (SomeStanza e))
forall e (m :: * -> *).
(FromXML e, MonadIO m) =>
XmppMonad m (Either XmppError (SomeStanza e))
parseM
STM () -> XmppMonad m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> XmppMonad m ()) -> STM () -> XmppMonad m ()
forall a b. (a -> b) -> a -> b
$ TChan (Either XmppError (SomeStanza e))
-> Either XmppError (SomeStanza e) -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan (Either XmppError (SomeStanza e))
in' Either XmppError (SomeStanza e)
msg
case Either XmppError (SomeStanza e)
msg of
Left XmppError
StreamClosedError -> () -> XmppMonad m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Left XmppError
RanOutOfInput -> () -> XmppMonad m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Either XmppError (SomeStanza e)
_ -> TChan (Either XmppError (SomeStanza e)) -> XmppMonad m ()
loopRead TChan (Either XmppError (SomeStanza e))
in'
loopWrite :: MonadIO m => TChan (SomeStanza e) -> XmppMonad m ()
loopWrite :: TChan (SomeStanza e) -> XmppMonad m ()
loopWrite TChan (SomeStanza e)
out'= do
IO (SomeStanza e) -> XmppMonad m (SomeStanza e)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (STM (SomeStanza e) -> IO (SomeStanza e)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM (SomeStanza e) -> IO (SomeStanza e))
-> STM (SomeStanza e) -> IO (SomeStanza e)
forall a b. (a -> b) -> a -> b
$ TChan (SomeStanza e) -> STM (SomeStanza e)
forall a. TChan a -> STM a
readTChan TChan (SomeStanza e)
out') XmppMonad m (SomeStanza e)
-> (SomeStanza e -> XmppMonad m ()) -> XmppMonad m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
SomeStanza stnz :: Stanza a p e
stnz@MkMessage { mPurpose :: forall (p :: StanzaPurpose) ext. Stanza 'Message p ext -> Sing p
mPurpose = Sing p
SOutgoing } -> Stanza a p e -> XmppMonad m ()
forall (t :: * -> *) a. (XmppSendable t a, Monad t) => a -> t ()
xmppSend Stanza a p e
stnz
SomeStanza stnz :: Stanza a p e
stnz@MkPresence { pPurpose :: forall (p :: StanzaPurpose) ext. Stanza 'Presence p ext -> Sing p
pPurpose = Sing p
SOutgoing } -> Stanza a p e -> XmppMonad m ()
forall (t :: * -> *) a. (XmppSendable t a, Monad t) => a -> t ()
xmppSend Stanza a p e
stnz
SomeStanza stnz :: Stanza a p e
stnz@MkIQ { iqPurpose :: forall (p :: StanzaPurpose) ext. Stanza 'IQ p ext -> Sing p
iqPurpose = Sing p
SOutgoing } -> Stanza a p e -> XmppMonad m ()
forall (t :: * -> *) a. (XmppSendable t a, Monad t) => a -> t ()
xmppSend Stanza a p e
stnz
SomeStanza e
_ -> () -> XmppMonad m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
TChan (SomeStanza e) -> XmppMonad m ()
forall (m :: * -> *) e.
MonadIO m =>
TChan (SomeStanza e) -> XmppMonad m ()
loopWrite TChan (SomeStanza e)
out'
readChanS :: MonadIO m => XmppThreadT m (Either XmppError (SomeStanza e)) e
readChanS :: XmppThreadT m (Either XmppError (SomeStanza e)) e
readChanS = (Thread e -> TChan (Either XmppError (SomeStanza e)))
-> ReaderT (Thread e) m (TChan (Either XmppError (SomeStanza e)))
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Thread e -> TChan (Either XmppError (SomeStanza e))
forall e. Thread e -> TChan (Either XmppError (SomeStanza e))
tInCh ReaderT (Thread e) m (TChan (Either XmppError (SomeStanza e)))
-> (TChan (Either XmppError (SomeStanza e))
-> XmppThreadT m (Either XmppError (SomeStanza e)) e)
-> XmppThreadT m (Either XmppError (SomeStanza e)) e
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO (Either XmppError (SomeStanza e))
-> XmppThreadT m (Either XmppError (SomeStanza e)) e
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either XmppError (SomeStanza e))
-> XmppThreadT m (Either XmppError (SomeStanza e)) e)
-> (TChan (Either XmppError (SomeStanza e))
-> IO (Either XmppError (SomeStanza e)))
-> TChan (Either XmppError (SomeStanza e))
-> XmppThreadT m (Either XmppError (SomeStanza e)) e
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (Either XmppError (SomeStanza e))
-> IO (Either XmppError (SomeStanza e))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM (Either XmppError (SomeStanza e))
-> IO (Either XmppError (SomeStanza e)))
-> (TChan (Either XmppError (SomeStanza e))
-> STM (Either XmppError (SomeStanza e)))
-> TChan (Either XmppError (SomeStanza e))
-> IO (Either XmppError (SomeStanza e))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TChan (Either XmppError (SomeStanza e))
-> STM (Either XmppError (SomeStanza e))
forall a. TChan a -> STM a
readTChan
writeChanS :: MonadIO m => SomeStanza () -> XmppThreadT m () e
writeChanS :: SomeStanza () -> XmppThreadT m () e
writeChanS SomeStanza ()
a = XmppThreadT m () e -> XmppThreadT m () e
forall (f :: * -> *) a. Functor f => f a -> f ()
void (XmppThreadT m () e -> XmppThreadT m () e)
-> XmppThreadT m () e -> XmppThreadT m () e
forall a b. (a -> b) -> a -> b
$ (Thread e -> TChan (SomeStanza ()))
-> ReaderT (Thread e) m (TChan (SomeStanza ()))
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Thread e -> TChan (SomeStanza ())
forall e. Thread e -> TChan (SomeStanza ())
tOutCh ReaderT (Thread e) m (TChan (SomeStanza ()))
-> (TChan (SomeStanza ()) -> XmppThreadT m () e)
-> XmppThreadT m () e
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO () -> XmppThreadT m () e
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> XmppThreadT m () e)
-> (TChan (SomeStanza ()) -> IO ())
-> TChan (SomeStanza ())
-> XmppThreadT m () e
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ())
-> (TChan (SomeStanza ()) -> STM ())
-> TChan (SomeStanza ())
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (TChan (SomeStanza ()) -> SomeStanza () -> STM ())
-> SomeStanza () -> TChan (SomeStanza ()) -> STM ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip TChan (SomeStanza ()) -> SomeStanza () -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan SomeStanza ()
a
withNewThread
:: (MonadIO m, MonadUnliftIO m)
=> XmppThreadT m () e
-> XmppThreadT m (Async ()) e
withNewThread :: XmppThreadT m () e -> XmppThreadT m (Async ()) e
withNewThread XmppThreadT m () e
a = do
TChan (Either XmppError (SomeStanza e))
newin <- (Thread e -> TChan (Either XmppError (SomeStanza e)))
-> ReaderT (Thread e) m (TChan (Either XmppError (SomeStanza e)))
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Thread e -> TChan (Either XmppError (SomeStanza e))
forall e. Thread e -> TChan (Either XmppError (SomeStanza e))
tInCh ReaderT (Thread e) m (TChan (Either XmppError (SomeStanza e)))
-> (TChan (Either XmppError (SomeStanza e))
-> ReaderT (Thread e) m (TChan (Either XmppError (SomeStanza e))))
-> ReaderT (Thread e) m (TChan (Either XmppError (SomeStanza e)))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO (TChan (Either XmppError (SomeStanza e)))
-> ReaderT (Thread e) m (TChan (Either XmppError (SomeStanza e)))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TChan (Either XmppError (SomeStanza e)))
-> ReaderT (Thread e) m (TChan (Either XmppError (SomeStanza e))))
-> (TChan (Either XmppError (SomeStanza e))
-> IO (TChan (Either XmppError (SomeStanza e))))
-> TChan (Either XmppError (SomeStanza e))
-> ReaderT (Thread e) m (TChan (Either XmppError (SomeStanza e)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (TChan (Either XmppError (SomeStanza e)))
-> IO (TChan (Either XmppError (SomeStanza e)))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM (TChan (Either XmppError (SomeStanza e)))
-> IO (TChan (Either XmppError (SomeStanza e))))
-> (TChan (Either XmppError (SomeStanza e))
-> STM (TChan (Either XmppError (SomeStanza e))))
-> TChan (Either XmppError (SomeStanza e))
-> IO (TChan (Either XmppError (SomeStanza e)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TChan (Either XmppError (SomeStanza e))
-> STM (TChan (Either XmppError (SomeStanza e)))
forall a. TChan a -> STM (TChan a)
dupTChan
(Thread e -> TChan (SomeStanza ()))
-> ReaderT (Thread e) m (TChan (SomeStanza ()))
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Thread e -> TChan (SomeStanza ())
forall e. Thread e -> TChan (SomeStanza ())
tOutCh ReaderT (Thread e) m (TChan (SomeStanza ()))
-> (TChan (SomeStanza ()) -> XmppThreadT m (Async ()) e)
-> XmppThreadT m (Async ()) e
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= m (Async ()) -> XmppThreadT m (Async ()) e
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m (Async ()) -> XmppThreadT m (Async ()) e)
-> (TChan (SomeStanza ()) -> m (Async ()))
-> TChan (SomeStanza ())
-> XmppThreadT m (Async ()) e
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m () -> m (Async ())
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (m () -> m (Async ()))
-> (TChan (SomeStanza ()) -> m ())
-> TChan (SomeStanza ())
-> m (Async ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. XmppThreadT m () e -> Thread e -> m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT XmppThreadT m () e
a (Thread e -> m ())
-> (TChan (SomeStanza ()) -> Thread e)
-> TChan (SomeStanza ())
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TChan (Either XmppError (SomeStanza e))
-> TChan (SomeStanza ()) -> Thread e
forall e.
TChan (Either XmppError (SomeStanza e))
-> TChan (SomeStanza ()) -> Thread e
Thread TChan (Either XmppError (SomeStanza e))
newin
loop :: MonadIO m => XmppThreadT m () e -> XmppThreadT m () e
loop :: XmppThreadT m () e -> XmppThreadT m () e
loop XmppThreadT m () e
a = XmppThreadT m () e
a XmppThreadT m () e -> XmppThreadT m () e -> XmppThreadT m () e
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> XmppThreadT m () e -> XmppThreadT m () e
forall (m :: * -> *) e.
MonadIO m =>
XmppThreadT m () e -> XmppThreadT m () e
loop XmppThreadT m () e
a
waitFor
:: MonadIO m
=> (Either XmppError (SomeStanza e) -> Bool)
-> XmppThreadT m (Either XmppError (SomeStanza e)) e
waitFor :: (Either XmppError (SomeStanza e) -> Bool)
-> XmppThreadT m (Either XmppError (SomeStanza e)) e
waitFor Either XmppError (SomeStanza e) -> Bool
f = do
Either XmppError (SomeStanza e)
s <- XmppThreadT m (Either XmppError (SomeStanza e)) e
forall (m :: * -> *) e.
MonadIO m =>
XmppThreadT m (Either XmppError (SomeStanza e)) e
readChanS
if Either XmppError (SomeStanza e) -> Bool
f Either XmppError (SomeStanza e)
s then Either XmppError (SomeStanza e)
-> XmppThreadT m (Either XmppError (SomeStanza e)) e
forall (m :: * -> *) a. Monad m => a -> m a
return Either XmppError (SomeStanza e)
s else (Either XmppError (SomeStanza e) -> Bool)
-> XmppThreadT m (Either XmppError (SomeStanza e)) e
forall (m :: * -> *) e.
MonadIO m =>
(Either XmppError (SomeStanza e) -> Bool)
-> XmppThreadT m (Either XmppError (SomeStanza e)) e
waitFor Either XmppError (SomeStanza e) -> Bool
f
connPersist :: MonadIO m => Handle -> m ()
connPersist :: Handle -> m ()
connPersist Handle
h = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Handle -> String -> IO ()
hPutStr Handle
h String
" "
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
debugIO String
"<space added>"
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
30000000
Handle -> m ()
forall (m :: * -> *). MonadIO m => Handle -> m ()
connPersist Handle
h