module Mpv.SocketQueues where

import Control.Concurrent.STM.TBMQueue (TBMQueue)
import qualified Data.Aeson as Aeson
import Data.Aeson (Value)
import qualified Data.ByteString as ByteString
import Exon (exon)
import Network.Socket (Socket)
import qualified Network.Socket.ByteString as Socket
import Polysemy.Conc (withAsync_)
import qualified Polysemy.Conc.Data.QueueResult as QueueResult
import Polysemy.Conc.Interpreter.Queue.TBM (interpretQueueTBMWith)
import qualified Polysemy.Conc.Queue as Queue
import qualified Polysemy.Log as Log

import Mpv.Data.MpvResources (
  InMessage (InMessage, InMessageError),
  MpvResources (MpvResources),
  OutMessage (OutMessage),
  )

-- | Split a chunk of bytes at newlines (byte 10).
--
-- The first component holds the non-empty segments that are terminated by a
-- newline; the second component is whatever follows the last newline (the
-- whole input if it contains no newline at all).
messageLines :: ByteString -> ([ByteString], ByteString)
messageLines =
  first (filter (not . ByteString.null) . ByteString.split 10) . ByteString.spanEnd (/= 10)

-- | Decode one line as JSON, turning a decoding failure into
-- 'InMessageError' carrying the decoder's error text.
parseInMessage :: ByteString -> InMessage Value
parseInMessage =
  Aeson.eitherDecodeStrict' >>> \case
    Right v -> InMessage v
    Left err -> InMessageError (toText err)

-- | Parse a line with 'parseInMessage' and write the result to the incoming
-- message queue.
publishAsInMessage ::
  Member (Queue (InMessage Value)) r =>
  ByteString ->
  Sem r ()
publishAsInMessage =
  Queue.write . parseInMessage

-- | Combine a pending buffer with the result of running 'messageLines' on a
-- new chunk, returning the new pending buffer and the complete lines.
--
-- The pending buffer is prepended to the first complete line, or to the
-- remainder if there is no complete line.
--
-- NOTE: because 'messageLines' drops empty segments, this function cannot tell
-- whether the chunk started with a newline, in which case the buffer is a
-- complete message on its own. 'accumulateMessages' therefore no longer goes
-- through this function; it is kept because the module has no export list and
-- so it is part of the public interface.
concatMessages :: ByteString -> [ByteString] -> ByteString -> (ByteString, [ByteString])
concatMessages "" complete extra =
  (extra, complete)
concatMessages buf (rest : complete) extra =
  (extra, buf <> rest : complete)
concatMessages buf [] extra =
  (buf <> extra, [])

-- | Feed a newly received chunk into the pending buffer.
--
-- Returns the new pending buffer (bytes after the last newline) and the list
-- of complete, non-empty lines.
--
-- The buffer is prepended to the chunk /before/ splitting. Splitting the chunk
-- first and gluing the buffer onto the first line afterwards merges two
-- messages whenever the chunk begins with the newline that terminates the
-- buffered message.
accumulateMessages :: ByteString -> ByteString -> (ByteString, [ByteString])
accumulateMessages buf chunk =
  (pending, complete)
  where
    (complete, pending) = messageLines (buf <> chunk)

-- | Read from the socket in chunks of up to 4096 bytes and publish every
-- complete line as an 'InMessage' until the socket fails or is closed.
--
-- The loop ends silently when 'Socket.recv' throws, and also when it returns an
-- empty chunk, which signals that the peer closed the connection. Without the
-- latter check the loop would call 'Socket.recv' again immediately and spin
-- forever.
readQueue ::
  ∀ r .
  Members [Queue (InMessage Value), Embed IO] r =>
  Socket ->
  Sem r ()
readQueue socket =
  spin ""
  where
    spin :: ByteString -> Sem r ()
    spin buf =
      tryAny (Socket.recv socket 4096) >>= \case
        Right chunk
          -- End of stream: any unterminated bytes left in @buf@ are dropped.
          | ByteString.null chunk -> unit
          | otherwise -> do
            let (newBuf, complete) = accumulateMessages buf chunk
            traverse_ publishAsInMessage complete
            spin newBuf
        Left _ ->
          unit

-- | Take messages from the outgoing queue, encode them as JSON followed by a
-- newline and send them over the socket.
--
-- The loop stops when the queue read yields anything other than
-- 'QueueResult.Success', or when sending fails, in which case the error is
-- logged at debug level.
writeQueue ::
  ∀ r .
  Members [Queue (OutMessage Value), Log, Embed IO] r =>
  Socket ->
  Sem r ()
writeQueue socket =
  Queue.read >>= \case
    QueueResult.Success (OutMessage msg) ->
      tryAny (Socket.sendAll socket (toStrict (Aeson.encode msg) <> "\n")) >>= \case
        Right () -> writeQueue socket
        Left err -> Log.debug [exon|mpv write socket terminated: #{err}|]
    _ ->
      unit

-- | Interpret both message queues using the given 'TBMQueue's.
interpretQueues ::
  Members [Resource, Race, Embed IO] r =>
  TBMQueue (OutMessage fmt) ->
  TBMQueue (InMessage fmt) ->
  InterpretersFor [Queue (InMessage fmt), Queue (OutMessage fmt)] r
interpretQueues outQ inQ =
  interpretQueueTBMWith outQ . interpretQueueTBMWith inQ

-- | Interpret both message queues from the queues stored in 'MpvResources' and
-- run 'readQueue' and 'writeQueue' on the resources' socket via 'withAsync_'
-- around the wrapped program.
withSocketQueues ::
  Members [Resource, Async, Race, Log, Embed IO] r =>
  MpvResources Value ->
  InterpretersFor [Queue (InMessage Value), Queue (OutMessage Value)] r
withSocketQueues (MpvResources socket outQ inQ _) =
  interpretQueues outQ inQ .
  withAsync_ (readQueue socket) .
  withAsync_ (writeQueue socket)