module Mpv.SocketQueues where

import Control.Concurrent.STM.TBMQueue (TBMQueue)
import qualified Data.Aeson as Aeson
import Data.Aeson (Value)
import qualified Data.ByteString as ByteString
import Exon (exon)
import Network.Socket (Socket)
import qualified Network.Socket.ByteString as Socket
import Polysemy.Conc (withAsync_)
import qualified Polysemy.Conc.Data.QueueResult as QueueResult
import Polysemy.Conc.Interpreter.Queue.TBM (interpretQueueTBMWith)
import qualified Polysemy.Conc.Queue as Queue
import qualified Polysemy.Log as Log

import Mpv.Data.MpvResources (
  InMessage (InMessage, InMessageError),
  MpvResources (MpvResources),
  OutMessage (OutMessage),
  )

messageLines :: ByteString -> ([ByteString], ByteString)
messageLines :: ByteString -> ([ByteString], ByteString)
messageLines =
  (ByteString -> [ByteString])
-> (ByteString, ByteString) -> ([ByteString], ByteString)
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first ((ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (Bool -> Bool
not (Bool -> Bool) -> (ByteString -> Bool) -> ByteString -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Bool
ByteString.null) ([ByteString] -> [ByteString])
-> (ByteString -> [ByteString]) -> ByteString -> [ByteString]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word8 -> ByteString -> [ByteString]
ByteString.split Word8
10) ((ByteString, ByteString) -> ([ByteString], ByteString))
-> (ByteString -> (ByteString, ByteString))
-> ByteString
-> ([ByteString], ByteString)
forall b c a. (b -> c) -> (a -> b) -> a -> c
.
  (Word8 -> Bool) -> ByteString -> (ByteString, ByteString)
ByteString.spanEnd (Word8 -> Word8 -> Bool
forall a. Eq a => a -> a -> Bool
/= Word8
10)

parseInMessage :: ByteString -> InMessage Value
parseInMessage :: ByteString -> InMessage Value
parseInMessage =
  ByteString -> Either String Value
forall a. FromJSON a => ByteString -> Either String a
Aeson.eitherDecodeStrict' (ByteString -> Either String Value)
-> (Either String Value -> InMessage Value)
-> ByteString
-> InMessage Value
forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> \case
    Right Value
v -> Value -> InMessage Value
forall fmt. fmt -> InMessage fmt
InMessage Value
v
    Left String
err -> Text -> InMessage Value
forall fmt. Text -> InMessage fmt
InMessageError (String -> Text
forall a. ToText a => a -> Text
toText String
err)

publishAsInMessage ::
  Member (Queue (InMessage Value)) r =>
  ByteString ->
  Sem r ()
publishAsInMessage :: forall (r :: [(* -> *) -> * -> *]).
Member (Queue (InMessage Value)) r =>
ByteString -> Sem r ()
publishAsInMessage =
  InMessage Value -> Sem r ()
forall d (r :: [(* -> *) -> * -> *]).
MemberWithError (Queue d) r =>
d -> Sem r ()
Queue.write (InMessage Value -> Sem r ())
-> (ByteString -> InMessage Value) -> ByteString -> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> InMessage Value
parseInMessage

concatMessages ::
  ByteString ->
  [ByteString] ->
  ByteString ->
  (ByteString, [ByteString])
concatMessages :: ByteString
-> [ByteString] -> ByteString -> (ByteString, [ByteString])
concatMessages ByteString
"" [ByteString]
complete ByteString
extra =
  (ByteString
extra, [ByteString]
complete)
concatMessages ByteString
buf (ByteString
rest : [ByteString]
complete) ByteString
extra =
  (ByteString
extra, (ByteString
buf ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
rest ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
: [ByteString]
complete))
concatMessages ByteString
buf [] ByteString
extra =
  (ByteString
buf ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
extra, [])

accumulateMessages ::
  ByteString ->
  ByteString ->
  (ByteString, [ByteString])
accumulateMessages :: ByteString -> ByteString -> (ByteString, [ByteString])
accumulateMessages ByteString
buf =
  ([ByteString] -> ByteString -> (ByteString, [ByteString]))
-> ([ByteString], ByteString) -> (ByteString, [ByteString])
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry (ByteString
-> [ByteString] -> ByteString -> (ByteString, [ByteString])
concatMessages ByteString
buf) (([ByteString], ByteString) -> (ByteString, [ByteString]))
-> (ByteString -> ([ByteString], ByteString))
-> ByteString
-> (ByteString, [ByteString])
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ([ByteString], ByteString)
messageLines

readQueue ::
   r .
  Members [Queue (InMessage Value), Embed IO] r =>
  Socket ->
  Sem r ()
readQueue :: forall (r :: [(* -> *) -> * -> *]).
Members '[Queue (InMessage Value), Embed IO] r =>
Socket -> Sem r ()
readQueue Socket
socket =
  ByteString -> Sem r ()
spin ByteString
""
  where
    spin :: ByteString -> Sem r ()
spin ByteString
buf = do
      IO (ByteString, [ByteString])
-> Sem r (Either Text (ByteString, [ByteString]))
forall (r :: [(* -> *) -> * -> *]) a.
Member (Embed IO) r =>
IO a -> Sem r (Either Text a)
tryAny (ByteString -> ByteString -> (ByteString, [ByteString])
accumulateMessages ByteString
buf (ByteString -> (ByteString, [ByteString]))
-> IO ByteString -> IO (ByteString, [ByteString])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Socket -> Int -> IO ByteString
Socket.recv Socket
socket Int
4096) Sem r (Either Text (ByteString, [ByteString]))
-> (Either Text (ByteString, [ByteString]) -> Sem r ()) -> Sem r ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Right (ByteString
newBuf, [ByteString]
complete) -> do
          (ByteString -> Sem r ()) -> [ByteString] -> Sem r ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ ByteString -> Sem r ()
forall (r :: [(* -> *) -> * -> *]).
Member (Queue (InMessage Value)) r =>
ByteString -> Sem r ()
publishAsInMessage [ByteString]
complete
          ByteString -> Sem r ()
spin ByteString
newBuf
        Left Text
_ ->
          Sem r ()
forall (f :: * -> *). Applicative f => f ()
unit

writeQueue ::
   r .
  Members [Queue (OutMessage Value), Log, Embed IO] r =>
  Socket ->
  Sem r ()
writeQueue :: forall (r :: [(* -> *) -> * -> *]).
Members '[Queue (OutMessage Value), Log, Embed IO] r =>
Socket -> Sem r ()
writeQueue Socket
socket =
  Sem r (QueueResult (OutMessage Value))
forall d (r :: [(* -> *) -> * -> *]).
MemberWithError (Queue d) r =>
Sem r (QueueResult d)
Queue.read Sem r (QueueResult (OutMessage Value))
-> (QueueResult (OutMessage Value) -> Sem r ()) -> Sem r ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    QueueResult.Success (OutMessage Value
msg) ->
      IO () -> Sem r (Either Text ())
forall (r :: [(* -> *) -> * -> *]) a.
Member (Embed IO) r =>
IO a -> Sem r (Either Text a)
tryAny (Socket -> ByteString -> IO ()
Socket.sendAll Socket
socket (ByteString -> ByteString
forall l s. LazyStrict l s => l -> s
toStrict (Value -> ByteString
forall a. ToJSON a => a -> ByteString
Aeson.encode Value
msg) ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
"\n")) Sem r (Either Text ()) -> (Either Text () -> Sem r ()) -> Sem r ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Right () ->
          Socket -> Sem r ()
forall (r :: [(* -> *) -> * -> *]).
Members '[Queue (OutMessage Value), Log, Embed IO] r =>
Socket -> Sem r ()
writeQueue Socket
socket
        Left Text
err ->
          Text -> Sem r ()
forall (r :: [(* -> *) -> * -> *]).
(HasCallStack, Member Log r) =>
Text -> Sem r ()
Log.debug [exon|mpv write socket terminated: #{err}|]
    QueueResult (OutMessage Value)
_ ->
      Sem r ()
forall (f :: * -> *). Applicative f => f ()
unit

interpretQueues ::
  Members [Resource, Race, Embed IO] r =>
  TBMQueue (OutMessage fmt) ->
  TBMQueue (InMessage fmt) ->
  InterpretersFor [Queue (InMessage fmt), Queue (OutMessage fmt)] r
interpretQueues :: forall (r :: [(* -> *) -> * -> *]) fmt.
Members '[Resource, Race, Embed IO] r =>
TBMQueue (OutMessage fmt)
-> TBMQueue (InMessage fmt)
-> InterpretersFor
     '[Queue (InMessage fmt), Queue (OutMessage fmt)] r
interpretQueues TBMQueue (OutMessage fmt)
outQ TBMQueue (InMessage fmt)
inQ =
  TBMQueue (OutMessage fmt)
-> InterpreterFor (Queue (OutMessage fmt)) r
forall d (r :: [(* -> *) -> * -> *]).
Members '[Race, Embed IO] r =>
TBMQueue d -> InterpreterFor (Queue d) r
interpretQueueTBMWith TBMQueue (OutMessage fmt)
outQ (Sem (Queue (OutMessage fmt) : r) a -> Sem r a)
-> (Sem (Queue (InMessage fmt) : Queue (OutMessage fmt) : r) a
    -> Sem (Queue (OutMessage fmt) : r) a)
-> Sem (Queue (InMessage fmt) : Queue (OutMessage fmt) : r) a
-> Sem r a
forall b c a. (b -> c) -> (a -> b) -> a -> c
.
  TBMQueue (InMessage fmt)
-> InterpreterFor
     (Queue (InMessage fmt)) (Queue (OutMessage fmt) : r)
forall d (r :: [(* -> *) -> * -> *]).
Members '[Race, Embed IO] r =>
TBMQueue d -> InterpreterFor (Queue d) r
interpretQueueTBMWith TBMQueue (InMessage fmt)
inQ

withSocketQueues ::
  Members [Resource, Async, Race, Log, Embed IO] r =>
  MpvResources Value ->
  InterpretersFor [Queue (InMessage Value), Queue (OutMessage Value)] r
withSocketQueues :: forall (r :: [(* -> *) -> * -> *]).
Members '[Resource, Async, Race, Log, Embed IO] r =>
MpvResources Value
-> InterpretersFor
     '[Queue (InMessage Value), Queue (OutMessage Value)] r
withSocketQueues (MpvResources Socket
socket TBMQueue (OutMessage Value)
outQ TBMQueue (InMessage Value)
inQ TVar (Requests Value)
_) =
  TBMQueue (OutMessage Value)
-> TBMQueue (InMessage Value)
-> InterpretersFor
     '[Queue (InMessage Value), Queue (OutMessage Value)] r
forall (r :: [(* -> *) -> * -> *]) fmt.
Members '[Resource, Race, Embed IO] r =>
TBMQueue (OutMessage fmt)
-> TBMQueue (InMessage fmt)
-> InterpretersFor
     '[Queue (InMessage fmt), Queue (OutMessage fmt)] r
interpretQueues TBMQueue (OutMessage Value)
outQ TBMQueue (InMessage Value)
inQ (Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) a
 -> Sem r a)
-> (Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) a
    -> Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) a)
-> Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) a
-> Sem r a
forall b c a. (b -> c) -> (a -> b) -> a -> c
.
  Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) ()
-> Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) a
-> Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) a
forall (r :: [(* -> *) -> * -> *]) b a.
Members '[Resource, Race, Async] r =>
Sem r b -> Sem r a -> Sem r a
withAsync_ (Socket
-> Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) ()
forall (r :: [(* -> *) -> * -> *]).
Members '[Queue (InMessage Value), Embed IO] r =>
Socket -> Sem r ()
readQueue Socket
socket) (Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) a
 -> Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) a)
-> (Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) a
    -> Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) a)
-> Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) a
-> Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) a
forall b c a. (b -> c) -> (a -> b) -> a -> c
.
  Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) ()
-> Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) a
-> Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) a
forall (r :: [(* -> *) -> * -> *]) b a.
Members '[Resource, Race, Async] r =>
Sem r b -> Sem r a -> Sem r a
withAsync_ (Socket
-> Sem (Queue (InMessage Value) : Queue (OutMessage Value) : r) ()
forall (r :: [(* -> *) -> * -> *]).
Members '[Queue (OutMessage Value), Log, Embed IO] r =>
Socket -> Sem r ()
writeQueue Socket
socket)