{-# LANGUAGE DeriveAnyClass #-}
module Data.Binary.IO
(
ReaderError (..)
, Reader
, newReader
, newReaderWith
, Writer
, newWriter
, newWriterWith
, newPipe
, Duplex (..)
, newDuplex
, newDuplexWith
, CanGet (..)
, read
, isEmpty
, CanPut (..)
, write
)
where
import Prelude hiding (read)
import Control.Concurrent.MVar (MVar, modifyMVar, newMVar)
import qualified Control.Exception as Exception
import Control.Monad (join, unless)
import Data.Bifunctor (bimap)
import qualified Data.Binary as Binary
import qualified Data.Binary.Get as Binary.Get
import Data.Binary.IO.Internal.AwaitNotify (newAwaitNotify, runAwait, runNotify)
import qualified Data.Binary.Put as Binary.Put
import qualified Data.ByteString as ByteString.Strict
import qualified Data.ByteString.Lazy as ByteString
import Data.ByteString.Lazy.Internal (ByteString (Chunk, Empty))
import Data.IORef (atomicModifyIORef', mkWeakIORef, newIORef)
import qualified Deque.Strict as Deque
import System.IO (Handle, hSetBinaryMode)
import System.IO.Unsafe (unsafeInterleaveIO)
import System.Mem.Weak (deRefWeak)
-- | Exception describing a failed 'Binary.Get.Get' operation. It is built from
-- the failure triple returned by 'Binary.Get.runGetOrFail' together with the
-- input the operation was run against.
data ReaderError = ReaderGetError
  { readerErrorRemaining :: !ByteString.ByteString
    -- ^ Remaining input as reported by 'Binary.Get.runGetOrFail'
  , readerErrorOffset :: !Binary.Get.ByteOffset
    -- ^ Byte offset as reported by 'Binary.Get.runGetOrFail'
  , readerErrorInput :: !ByteString.ByteString
    -- ^ Input that was handed to the failed operation
  , readerErrorMessage :: !String
    -- ^ Error message as reported by 'Binary.Get.runGetOrFail'
  }
  deriving (Show)

-- | Uses the default method implementations of 'Exception.Exception'.
instance Exception.Exception ReaderError
-- | Immutable read position: wraps the lazy byte stream that has not been
-- consumed yet. Running a getter yields a new 'StationaryReader' for the rest
-- of the stream instead of modifying this one.
newtype StationaryReader = StationaryReader ByteString.ByteString
-- | Run a getter against the stream held by the given 'StationaryReader'.
--
-- On success the result is paired with a 'StationaryReader' over the input
-- that is left. On failure a 'ReaderError' is returned that also records the
-- stream the getter was started on.
runStationaryReader
  :: StationaryReader
  -> Binary.Get.Get a
  -> Either ReaderError (StationaryReader, a)
runStationaryReader (StationaryReader input) decoder =
  case Binary.Get.runGetOrFail decoder input of
    Left (unconsumed, position, message) ->
      Left
        ReaderGetError
          { readerErrorRemaining = unconsumed
          , readerErrorOffset = position
          , readerErrorInput = input
          , readerErrorMessage = message
          }
    Right (rest, _, result) ->
      Right (StationaryReader rest, result)
-- | Switch the handle to binary mode and wrap its lazily read contents in a
-- 'StationaryReader'.
newStationaryReader :: Handle -> IO StationaryReader
newStationaryReader source =
  hSetBinaryMode source True
    >> fmap StationaryReader (ByteString.hGetContents source)
-- | Build a 'StationaryReader' over the lazy stream that 'mkStream' assembles
-- from the given chunk-producing action.
newStationaryReaderWith :: IO ByteString.Strict.ByteString -> IO StationaryReader
newStationaryReaderWith =
  fmap StationaryReader . mkStream
-- | Source of binary input. The current 'StationaryReader' position is kept in
-- an 'MVar' and replaced after every successful read.
newtype Reader = Reader (MVar StationaryReader)
-- | Run a getter against the reader's current position.
--
-- On success the stored position is replaced with the advanced one and the
-- decoded value is returned. On failure the 'ReaderError' is thrown from
-- inside 'modifyMVar'.
runReader :: Reader -> Binary.Get a -> IO a
runReader (Reader stateVar) decoder =
  modifyMVar stateVar step
  where
    step current =
      case runStationaryReader current decoder of
        Left failure -> Exception.throwIO failure
        Right advanced -> pure advanced
-- | Create a 'Reader' that consumes the contents of the given handle.
newReader :: Handle -> IO Reader
newReader handle =
  fmap Reader (newMVar =<< newStationaryReader handle)
-- | Create a 'Reader' that obtains its input chunk by chunk from the given
-- action.
newReaderWith :: IO ByteString.Strict.ByteString -> IO Reader
newReaderWith get =
  Reader <$> (newStationaryReaderWith get >>= newMVar)
-- | Sink for binary output: wraps the action that receives each strict chunk
-- of encoded bytes.
newtype Writer = Writer (ByteString.Strict.ByteString -> IO ())
-- | Serialise the given 'Binary.Put' into a single strict chunk and hand it to
-- the writer's underlying action.
runWriter :: Writer -> Binary.Put -> IO ()
runWriter (Writer send) =
  send . ByteString.toStrict . Binary.Put.runPut
-- | Create a 'Writer' that outputs every chunk to the given handle using
-- 'ByteString.Strict.hPut'.
newWriter :: Handle -> Writer
newWriter =
  Writer . ByteString.Strict.hPut
-- | Create a 'Writer' that passes every chunk to the given action.
newWriterWith :: (ByteString.Strict.ByteString -> IO ()) -> Writer
newWriterWith push =
  Writer push
-- | Create a connected 'Reader' and 'Writer' pair: chunks given to the writer
-- are queued and handed to the reader in the order they were written.
newPipe :: IO (Reader, Writer)
newPipe = do
  queueRef <- newIORef mempty

  -- The reading side only goes through this weak reference; the strong
  -- reference to the queue is captured by the writing side alone.
  weakQueueRef <- mkWeakIORef queueRef (pure ())

  (await, notify) <- newAwaitNotify

  let
    -- Produce the next chunk. Once the weak reference can no longer be
    -- dereferenced, an empty chunk is produced, which 'mkStream' treats as
    -- the end of the stream.
    pull =
      deRefWeak weakQueueRef >>= maybe (pure ByteString.Strict.empty) dequeue

    -- Atomically decide what to do next, then run that decision.
    dequeue liveRef =
      join (atomicModifyIORef' liveRef popOrWait)

    -- Take the front chunk if there is one; otherwise wait for a
    -- notification and try again.
    popOrWait pending =
      case Deque.uncons pending of
        Nothing -> (pending, runAwait await >> pull)
        Just (chunk, rest) -> (rest, pure chunk)

    -- Append a chunk and notify the reading side. Empty chunks are dropped
    -- (presumably because an empty chunk marks the end of the stream).
    push chunk
      | ByteString.Strict.null chunk = pure ()
      | otherwise = do
          atomicModifyIORef' queueRef (\pending -> (Deque.snoc chunk pending, ()))
          runNotify notify

  reader <- newReaderWith pull
  pure (reader, newWriterWith push)
-- | Bundles a 'Writer' and a 'Reader' into one value that can be used for
-- both output and input.
data Duplex = Duplex
  { duplexWriter :: !Writer
    -- ^ Writing half
  , duplexReader :: !Reader
    -- ^ Reading half
  }
-- | Create a 'Duplex' whose reader and writer both operate on the given
-- handle.
newDuplex :: Handle -> IO Duplex
newDuplex handle = do
  reader <- newReader handle
  pure (Duplex (newWriter handle) reader)
-- | Create a 'Duplex' from an action that supplies input chunks and an action
-- that consumes output chunks.
newDuplexWith
  :: IO ByteString.Strict.ByteString
  -> (ByteString.Strict.ByteString -> IO ())
  -> IO Duplex
newDuplexWith get push = do
  reader <- newReaderWith get
  pure (Duplex (newWriterWith push) reader)
-- | Sources that a 'Binary.Get' operation can be run against.
class CanGet r where
  -- | Run the getter against the source and return the decoded value.
  runGet :: r -> Binary.Get a -> IO a
-- | Delegates to 'runReader'.
instance CanGet Reader where
  runGet reader getter = runReader reader getter
-- | Reads through the 'duplexReader' half.
instance CanGet Duplex where
  runGet duplex = runGet (duplexReader duplex)
-- | Decode a value from the source using the 'Binary.get' method of its
-- 'Binary.Binary' instance.
read :: (CanGet r, Binary.Binary a) => r -> IO a
read =
  flip runGet Binary.get
-- | Run 'Binary.Get.isEmpty' against the source to find out whether any input
-- is left.
isEmpty
  :: CanGet r
  => r
  -> IO Bool
isEmpty =
  flip runGet Binary.Get.isEmpty
-- | Sinks that the output of a 'Binary.Put' operation can be written to.
class CanPut w where
  -- | Serialise the put operation and write the result to the sink.
  runPut :: w -> Binary.Put -> IO ()
-- | Serialises into one strict chunk and writes it to the handle with
-- 'ByteString.Strict.hPut'.
instance CanPut Handle where
  runPut handle =
    ByteString.Strict.hPut handle . ByteString.toStrict . Binary.Put.runPut
-- | Delegates to 'runWriter'.
instance CanPut Writer where
  runPut writer putter = runWriter writer putter
-- | Writes through the 'duplexWriter' half.
instance CanPut Duplex where
  runPut duplex = runPut (duplexWriter duplex)
-- | Encode a value with the 'Binary.put' method of its 'Binary.Binary'
-- instance and write it to the sink.
write :: (CanPut w, Binary.Binary a) => w -> a -> IO ()
write sink =
  runPut sink . Binary.put
-- | Assemble a lazy 'ByteString.ByteString' from an action that yields one
-- strict chunk per call.
--
-- The action is run on demand via 'unsafeInterleaveIO', i.e. only when the
-- corresponding part of the stream is evaluated. An empty chunk ends the
-- stream; after that the action is not run again.
mkStream :: IO ByteString.Strict.ByteString -> IO ByteString.ByteString
mkStream get =
  deferred
  where
    -- Rest of the stream, fetched only once it is demanded.
    deferred = unsafeInterleaveIO fetch

    fetch = get >>= extend

    extend chunk
      | ByteString.Strict.null chunk = pure Empty
      | otherwise = fmap (Chunk chunk) deferred