{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RecordWildCards #-}
#include "inline.hs"
module Streamly.Internal.Network.Socket
(
SockSpec (..)
, handleWithM
, handleWith
, accept
, connections
, read
, readWithBufferOf
, readChunksWithBufferOf
, readChunks
, toChunksWithBufferOf
, toChunks
, toBytes
, write
, writeWithBufferOf
, fromChunks
, fromBytesWithBufferOf
, fromBytes
, writeChunk
, writeChunks
, writeChunksWithBufferOf
, writeStrings
)
where
import Control.Concurrent (threadWaitWrite, rtsSupportsBoundThreads)
import Control.Exception (bracketOnError)
import Control.Monad.Catch (MonadCatch, finally, MonadMask)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad (when)
import Data.Word (Word8)
import Foreign.ForeignPtr (withForeignPtr)
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Ptr (minusPtr, plusPtr, Ptr, castPtr)
import Foreign.Storable (Storable(..))
import GHC.ForeignPtr (mallocPlainForeignPtrBytes)
import Network.Socket
       (Socket, SocketOption(..), Family(..), SockAddr(..),
        ProtocolNumber, withSocketsDo, SocketType(..), socket, bind,
        setSocketOption, sendBuf, recvBuf)
#if MIN_VERSION_network(3,1,0)
import Network.Socket (withFdSocket)
#else
import Network.Socket (fdSocket)
#endif
import Prelude hiding (read)
import qualified Network.Socket as Net
import Streamly (MonadAsync)
import Streamly.Internal.Data.Unfold.Types (Unfold(..))
import Streamly.Internal.Memory.Array.Types (Array(..), lpackArraysChunksOf)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamK.Type (IsStream, mkStream)
import Streamly.Data.Fold (Fold)
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Internal.Data.Fold.Types as FL
import qualified Streamly.Internal.Data.Unfold as UF
import qualified Streamly.Internal.Memory.Array as IA
import qualified Streamly.Memory.Array as A
import qualified Streamly.Internal.Memory.ArrayStream as AS
import qualified Streamly.Internal.Memory.Array.Types as A
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
-- | @handleWithM f sk@ runs the action @f@ on the socket @sk@ and closes the
-- socket with 'Net.close' afterwards, whether @f@ returns normally or throws
-- (the close is installed via 'finally').
{-# INLINE handleWithM #-}
handleWithM :: (MonadMask m, MonadIO m) => (Socket -> m ()) -> Socket -> m ()
handleWithM f sk = finally (f sk) (liftIO (Net.close sk))
-- | Like 'handleWithM' but for streams: @handleWith sk f@ produces the stream
-- @f sk@ and attaches a 'Net.close' of @sk@ as the stream's finalizer using
-- 'S.finally'.
{-# INLINE handleWith #-}
handleWith :: (IsStream t, MonadCatch m, MonadIO m)
    => Socket -> (Socket -> t m a) -> t m a
handleWith sk f = S.finally (liftIO $ Net.close sk) (f sk)
-- | Specification of a socket to be created: the arguments passed to
-- 'socket' plus the options applied to it with 'setSocketOption'.
data SockSpec = SockSpec
    {
      sockFamily :: !Family                 -- ^ address family given to 'socket'
    , sockType   :: !SocketType             -- ^ socket type given to 'socket'
    , sockProto  :: !ProtocolNumber         -- ^ protocol number given to 'socket'
    , sockOpts   :: ![(SocketOption, Int)]  -- ^ (option, value) pairs to set
    }
-- | Create a listening socket.
--
-- @initListener listenQLen spec addr@ creates a socket described by @spec@,
-- applies every option in 'sockOpts', binds it to @addr@ and starts listening
-- with a backlog of @listenQLen@.
--
-- If setting an option, binding or listening throws, the freshly created
-- socket is closed before the exception propagates, so a failed setup does
-- not leak the descriptor.
initListener :: Int -> SockSpec -> SockAddr -> IO Socket
initListener listenQLen SockSpec{..} addr =
    withSocketsDo $
        -- bracketOnError runs the release action only when the body fails;
        -- on success the open socket is handed back to the caller.
        bracketOnError (socket sockFamily sockType sockProto) Net.close $
            \sock -> do
                mapM_ (\(opt, val) -> setSocketOption sock opt val) sockOpts
                bind sock addr
                Net.listen sock listenQLen
                return sock
-- | Unfold a @(listen queue length, socket spec, address)@ triple into a
-- stream of accepted @(connected socket, peer address)@ pairs.
--
-- The listening socket is created once, when the seed is injected, and is
-- then used as the unfold state; every step blocks in 'Net.accept' and yields
-- its result. The unfold never stops on its own.
{-# INLINE listenTuples #-}
listenTuples :: MonadIO m
    => Unfold m (Int, SockSpec, SockAddr) (Socket, SockAddr)
listenTuples = Unfold step inject

    where

    inject (listenQLen, spec, addr) =
        liftIO $ initListener listenQLen spec addr

    step listener = do
        r <- liftIO $ Net.accept listener
        return $ D.Yield r listener
-- | Unfold a @(listen queue length, socket spec, address)@ triple into a
-- stream of connected sockets. Same as 'listenTuples' with the peer address
-- dropped.
{-# INLINE accept #-}
accept :: MonadIO m => Unfold m (Int, SockSpec, SockAddr) Socket
accept = UF.map fst listenTuples
-- | Stream of accepted @(connected socket, peer address)@ pairs for the given
-- listen queue length, socket spec and address.
--
-- The listening socket is created lazily on the first step (state 'Nothing')
-- and reused for every later step (state 'Just'). Each step blocks in
-- 'Net.accept'; the stream never ends on its own.
{-# INLINE recvConnectionTuplesWith #-}
recvConnectionTuplesWith :: MonadAsync m
    => Int -> SockSpec -> SockAddr -> SerialT m (Socket, SockAddr)
recvConnectionTuplesWith tcpListenQ spec addr = S.unfoldrM step Nothing

    where

    -- First step: no listener yet, so create one and then accept on it.
    step Nothing = do
        listener <- liftIO $ initListener tcpListenQ spec addr
        acceptOn listener
    step (Just listener) = acceptOn listener

    -- Accept one connection and keep the listener as the next state.
    acceptOn listener = do
        r <- liftIO $ Net.accept listener
        return $ Just (r, Just listener)
-- | Stream of connected sockets accepted on a listener created from the given
-- listen queue length, socket spec and address. Same as
-- 'recvConnectionTuplesWith' with the peer address dropped.
{-# INLINE connections #-}
connections :: MonadAsync m => Int -> SockSpec -> SockAddr -> SerialT m Socket
connections tcpListenQ spec addr =
    fst <$> recvConnectionTuplesWith tcpListenQ spec addr
-- | @readArrayUptoWith f size h@ allocates a fresh buffer of @size@ bytes,
-- calls @f h ptr size@ to fill it, and wraps the buffer in an 'Array'.
--
-- The value @n@ returned by @f@ is taken as the number of bytes written: the
-- array's end pointer is @ptr + n@ while its capacity bound is @ptr + size@.
-- The buffer is not shrunk when @n < size@.
{-# INLINABLE readArrayUptoWith #-}
readArrayUptoWith
    :: (h -> Ptr Word8 -> Int -> IO Int)
    -> Int
    -> h
    -> IO (Array Word8)
readArrayUptoWith f size h = do
    ptr <- mallocPlainForeignPtrBytes size
    withForeignPtr ptr $ \p -> do
        n <- f h p size
        let v = Array
                { aStart = ptr
                , aEnd   = p `plusPtr` n
                , aBound = p `plusPtr` size
                }
        return v
-- | @readArrayOf size sock@ performs a single 'recvBuf' on @sock@ into a new
-- buffer of @size@ bytes and returns the received bytes as an 'Array'.
{-# INLINABLE readArrayOf #-}
readArrayOf :: Int -> Socket -> IO (Array Word8)
readArrayOf = readArrayUptoWith recvBuf
-- | @waitWhen0 n sock@: when @n@ is 0 and the RTS supports bound threads,
-- block with 'threadWaitWrite' until the socket's file descriptor is
-- writable. For any other @n@ this is a no-op.
--
-- The way the descriptor is obtained depends on the @network@ version, hence
-- the CPP branches.
waitWhen0 :: Int -> Socket -> IO ()
waitWhen0 0 s = when rtsSupportsBoundThreads $
#if MIN_VERSION_network(3,1,0)
    withFdSocket s $ \fd -> threadWaitWrite $ fromIntegral fd
#elif MIN_VERSION_network(3,0,0)
    fdSocket s >>= threadWaitWrite . fromIntegral
#else
    let fd = fdSocket s in threadWaitWrite $ fromIntegral fd
#endif
waitWhen0 _ _ = return ()
-- | @sendAll sock ptr len@ sends @len@ bytes starting at @ptr@ over @sock@,
-- calling 'sendBuf' repeatedly until nothing remains. A non-positive @len@
-- sends nothing.
sendAll :: Socket -> Ptr Word8 -> Int -> IO ()
sendAll _ _ len | len <= 0 = return ()
sendAll s p len = do
    sent <- sendBuf s p len
    -- If nothing could be sent, wait for the socket to become writable
    -- before retrying (see 'waitWhen0').
    waitWhen0 sent s
    -- Continue with whatever part of the buffer is still unsent.
    when (sent >= 0) $ sendAll s (p `plusPtr` sent) (len - sent)
-- | @writeArrayWith f h arr@ passes the raw bytes of @arr@ to the writer @f@
-- as @f h ptr byteCount@. An array with no elements results in no call to
-- @f@.
{-# INLINABLE writeArrayWith #-}
writeArrayWith :: Storable a
    => (h -> Ptr Word8 -> Int -> IO ())
    -> h
    -> Array a
    -> IO ()
writeArrayWith _ _ arr | A.length arr == 0 = return ()
writeArrayWith f h Array{..} = withForeignPtr aStart $ \p ->
    f h (castPtr p) aLen

    where

    -- Number of bytes between the start and end pointers of the array. The
    -- raw start pointer is used only for pointer arithmetic here.
    aLen =
        let p = unsafeForeignPtrToPtr aStart
        in aEnd `minusPtr` p
-- | Write the bytes of an 'Array' to a socket, using 'sendAll' so that the
-- whole array is sent.
{-# INLINABLE writeChunk #-}
writeChunk :: Storable a => Socket -> Array a -> IO ()
writeChunk = writeArrayWith sendAll
-- | @_readChunksUptoWith f size h@ builds a stream by calling @f size h@
-- repeatedly. Every non-empty array returned is yielded; the stream stops as
-- soon as @f@ returns an array of length 0.
{-# INLINABLE _readChunksUptoWith #-}
_readChunksUptoWith :: (IsStream t, MonadIO m)
    => (Int -> h -> IO (Array Word8))
    -> Int -> h -> t m (Array Word8)
_readChunksUptoWith f size h = go

    where

    -- Continuation-style stream: @yld@ emits an element together with the
    -- rest of the stream, @stp@ ends the stream.
    go = mkStream $ \_ yld _ stp -> do
        arr <- liftIO $ f size h
        if A.length arr == 0
        then stp
        else yld arr go
-- | @toChunksWithBufferOf size sock@ streams the data received on @sock@ as
-- arrays, reading with a buffer of @size@ bytes per step (see 'readArrayOf').
-- The stream stops at the first read that returns an empty array.
{-# INLINE_NORMAL toChunksWithBufferOf #-}
toChunksWithBufferOf :: (IsStream t, MonadIO m)
    => Int -> Socket -> t m (Array Word8)
toChunksWithBufferOf size h = D.fromStreamD (D.Stream step ())

    where

    {-# INLINE_LATE step #-}
    step _ _ = do
        arr <- liftIO $ readArrayOf size h
        return $
            case A.length arr of
                0 -> D.Stop
                _ -> D.Yield arr ()
-- | Stream the data received on a socket as arrays, using
-- 'A.defaultChunkSize' as the read buffer size.
{-# INLINE toChunks #-}
toChunks :: (IsStream t, MonadIO m) => Socket -> t m (Array Word8)
toChunks = toChunksWithBufferOf A.defaultChunkSize
-- | Unfold a @(buffer size, socket)@ pair into a stream of arrays received
-- from the socket. Each step reads with a buffer of the given size (see
-- 'readArrayOf'); the unfold stops at the first read that returns an empty
-- array.
{-# INLINE_NORMAL readChunksWithBufferOf #-}
readChunksWithBufferOf :: MonadIO m => Unfold m (Int, Socket) (Array Word8)
readChunksWithBufferOf = Unfold step return

    where

    {-# INLINE_LATE step #-}
    step (size, h) = do
        arr <- liftIO $ readArrayOf size h
        return $
            case A.length arr of
                0 -> D.Stop
                _ -> D.Yield arr (size, h)
-- | Unfold a socket into a stream of received arrays; this is
-- 'readChunksWithBufferOf' with the buffer size fixed to
-- 'A.defaultChunkSize'.
{-# INLINE readChunks #-}
readChunks :: MonadIO m => Unfold m Socket (Array Word8)
readChunks = UF.supplyFirst readChunksWithBufferOf A.defaultChunkSize
-- | Stream the data received on a socket as individual bytes, by flattening
-- the arrays produced by 'toChunks'.
{-# INLINE toBytes #-}
toBytes :: (IsStream t, MonadIO m) => Socket -> t m Word8
toBytes = AS.concat . toChunks
-- | Unfold a @(buffer size, socket)@ pair into a stream of bytes, by
-- flattening the arrays produced by 'readChunksWithBufferOf'.
{-# INLINE readWithBufferOf #-}
readWithBufferOf :: MonadIO m => Unfold m (Int, Socket) Word8
readWithBufferOf = UF.concat readChunksWithBufferOf A.read
-- | Unfold a socket into a stream of bytes; this is 'readWithBufferOf' with
-- the buffer size fixed to 'A.defaultChunkSize'.
{-# INLINE read #-}
read :: MonadIO m => Unfold m Socket Word8
read = UF.supplyFirst readWithBufferOf A.defaultChunkSize
-- | Write a stream of arrays to a socket, one 'writeChunk' call per array.
{-# INLINE fromChunks #-}
fromChunks :: (MonadIO m, Storable a)
    => Socket -> SerialT m (Array a) -> m ()
fromChunks h = S.mapM_ (liftIO . writeChunk h)
-- | A 'Fold' that writes every input array to the socket with 'writeChunk'.
{-# INLINE writeChunks #-}
writeChunks :: (MonadIO m, Storable a) => Socket -> Fold m (Array a) ()
writeChunks h = FL.drainBy (liftIO . writeChunk h)
-- | @writeChunksWithBufferOf n sock@ is 'writeChunks' with the input arrays
-- first passed through 'lpackArraysChunksOf' with size parameter @n@.
{-# INLINE writeChunksWithBufferOf #-}
writeChunksWithBufferOf :: (MonadIO m, Storable a)
    => Int -> Socket -> Fold m (Array a) ()
writeChunksWithBufferOf n h = lpackArraysChunksOf n (writeChunks h)
-- | @writeStrings encode sock@ is a 'Fold' that writes each input 'String' to
-- the socket: the string is turned into a stream of characters, converted to
-- bytes with @encode@, collected into one array and written with
-- 'writeChunks'.
{-# INLINE writeStrings #-}
writeStrings :: MonadIO m
    => (SerialT m Char -> SerialT m Word8) -> Socket -> Fold m String ()
writeStrings encode h =
    FL.lmapM (IA.fromStream . encode . S.fromList) (writeChunks h)
-- | @fromBytesWithBufferOf n sock stream@ writes a byte stream to the socket.
-- The bytes are grouped into arrays with @AS.arraysOf n@ and each array is
-- written via 'fromChunks'.
{-# INLINE fromBytesWithBufferOf #-}
fromBytesWithBufferOf :: MonadIO m => Int -> Socket -> SerialT m Word8 -> m ()
fromBytesWithBufferOf n h m = fromChunks h $ AS.arraysOf n m
-- | @writeWithBufferOf n sock@ is a 'Fold' that writes a byte stream to the
-- socket. Input bytes are collected in groups of @n@ into arrays
-- (@FL.lchunksOf n@ with @A.writeNUnsafe n@) and each array is written with
-- 'writeChunks'.
{-# INLINE writeWithBufferOf #-}
writeWithBufferOf :: MonadIO m => Int -> Socket -> Fold m Word8 ()
writeWithBufferOf n h = FL.lchunksOf n (A.writeNUnsafe n) (writeChunks h)
-- | Write a byte stream to a socket; this is 'fromBytesWithBufferOf' with the
-- buffer size fixed to 'A.defaultChunkSize'.
{-# INLINE fromBytes #-}
fromBytes :: MonadIO m => Socket -> SerialT m Word8 -> m ()
fromBytes = fromBytesWithBufferOf A.defaultChunkSize
-- | A 'Fold' that writes a byte stream to a socket; this is
-- 'writeWithBufferOf' with the buffer size fixed to 'A.defaultChunkSize'.
{-# INLINE write #-}
write :: MonadIO m => Socket -> Fold m Word8 ()
write = writeWithBufferOf A.defaultChunkSize