-- | Internal implementation of the @io-streams@ library, intended for library
-- writers
--
-- Library users should use the interface provided by "System.IO.Streams"

{-# LANGUAGE BangPatterns               #-}
{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE FlexibleInstances          #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings          #-}
{-# LANGUAGE TypeSynonymInstances       #-}

module System.IO.Streams.Internal
  ( -- * Types
    SP(..)
  , StreamPair

    -- * About pushback
    -- $pushback

    -- * Input and output streams
  , InputStream(..)
  , OutputStream(..)

    -- * Primitive stream operations
  , read
  , unRead
  , peek
  , write
  , atEOF

    -- * Building streams
  , makeInputStream
  , makeOutputStream
  , appendInputStream
  , concatInputStreams

    -- * Connecting streams
  , connect
  , connectTo
  , supply
  , supplyTo

    -- * Thread safety
  , lockingInputStream
  , lockingOutputStream

    -- * Utility streams
  , nullInput
  , nullOutput

    -- * Generator monad
  , Generator
  , fromGenerator
  , yield

    -- * Consumer monad
  , Consumer
  , fromConsumer
  , await
  ) where

------------------------------------------------------------------------------
import           Control.Applicative      (Applicative (..), (<$>))
import           Control.Concurrent       (newMVar, withMVar)
import           Control.Exception        (throwIO)
import           Control.Monad            (when)
import           Control.Monad.IO.Class   (MonadIO (..))
import           Data.ByteString.Char8    (ByteString)
import qualified Data.ByteString.Char8    as S
import qualified Data.ByteString.Internal as S
import qualified Data.ByteString.Unsafe   as S
import           Data.IORef               (newIORef, readIORef, writeIORef)
import           Data.Maybe               (isNothing)
import           Data.Typeable            (Typeable)
import           Data.Word                (Word8)
import           Foreign.Marshal.Utils    (copyBytes)
import           Foreign.Ptr              (castPtr)
import qualified GHC.IO.Buffer            as H
import qualified GHC.IO.BufferedIO        as H
import qualified GHC.IO.Device            as H
import           GHC.IO.Exception         (unsupportedOperation)
import           Prelude                  hiding (read)


------------------------------------------------------------------------------
-- | A strict pair type.
data SP a b = SP !a !b
  deriving (Typeable)


------------------------------------------------------------------------------
-- | An 'InputStream' generates values of type @c@ in the 'IO' monad.
--
--  Two primitive operations are defined on 'InputStream':
--
-- * @'read' :: 'InputStream' c -> 'IO' ('Maybe' c)@ reads a value from the stream,
-- where \"end of stream\" is signaled by 'read' returning 'Nothing'.
--
-- * @'unRead' :: c -> 'InputStream' c -> 'IO' ()@ \"pushes back\" a value to the
-- stream.
--
-- It is intended that 'InputStream's obey the following law:
--
-- @'unRead' c stream >> 'read' stream === 'return' ('Just' c)@
--
data InputStream a = InputStream {
      _read   :: IO (Maybe a)
    , _unRead :: a -> IO ()
    } deriving (Typeable)


------------------------------------------------------------------------------
-- | An 'OutputStream' consumes values of type @c@ in the 'IO' monad.
-- The only primitive operation defined on 'OutputStream' is:
--
-- * @'write' :: 'Maybe' c -> 'OutputStream' c -> 'IO' ()@
--
-- Values of type @c@ are written in an 'OutputStream' by wrapping them in
-- 'Just', and the end of the stream is indicated by supplying 'Nothing'.
--
-- If you supply a value after a 'Nothing', the behavior is defined by the
-- implementer of the given 'OutputStream'. (All 'OutputStream' definitions in
-- this library will simply discard the extra input.)
--
data OutputStream a = OutputStream {
      _write :: Maybe a -> IO ()
    } deriving (Typeable)


------------------------------------------------------------------------------
-- | Reads one value from an 'InputStream'.
--
-- Returns either a value wrapped in a 'Just', or 'Nothing' if the end of the
-- stream is reached.
read :: InputStream a -> IO (Maybe a)
read = _read
{-# INLINE read #-}


------------------------------------------------------------------------------
-- | Feeds a value to an 'OutputStream'. Values of type @c@ are written in an
-- 'OutputStream' by wrapping them in 'Just', and the end of the stream is
-- indicated by supplying 'Nothing'.
--
write :: Maybe a -> OutputStream a -> IO ()
write = flip _write
{-# INLINE write #-}


------------------------------------------------------------------------------
-- | Observes the first value from an 'InputStream' without consuming it.
--
-- Returns 'Nothing' if the 'InputStream' is empty. 'peek' satisfies the
-- following law:
--
-- @
-- Streams.'peek' stream >> Streams.'read' stream === Streams.'read' stream
-- @
peek :: InputStream a -> IO (Maybe a)
peek s = do
    x <- read s
    maybe (return $! ()) (_unRead s) x
    return x


------------------------------------------------------------------------------
-- | Pushes a value back onto an input stream. 'read' and 'unRead' should
-- satisfy the following law, with the possible exception of side effects:
--
-- @
-- Streams.'unRead' c stream >> Streams.'read' stream === 'return' ('Just' c)
-- @
--
-- Note that this could be used to add values back to the stream that were not
-- originally drawn from the stream.
unRead :: a -> InputStream a -> IO ()
unRead = flip _unRead


------------------------------------------------------------------------------
-- | Connects an 'InputStream' and 'OutputStream', supplying values from the
-- 'InputStream' to the 'OutputStream', and propagating the end-of-stream
-- message from the 'InputStream' through to the 'OutputStream'.
--
-- The connection ends when the 'InputStream' yields a 'Nothing'.
connect :: InputStream a -> OutputStream a -> IO ()
connect p q = loop
  where
    loop = do
        m <- read p
        maybe (write Nothing q)
              (const $ write m q >> loop)
              m
{-# INLINE connect #-}


------------------------------------------------------------------------------
-- | The 'connectTo' function is just @'flip' 'connect'@.
--
-- Useful for writing expressions like @fromList [1,2,3] >>= connectTo foo@.
--
connectTo :: OutputStream a -> InputStream a -> IO ()
connectTo = flip connect
{-# INLINE connectTo #-}


------------------------------------------------------------------------------
-- | Connects an 'InputStream' to an 'OutputStream' without passing the
-- end-of-stream notification through to the 'OutputStream'.
--
-- Use this to supply an 'OutputStream' with multiple 'InputStream's and use
-- 'connect' for the final 'InputStream' to finalize the 'OutputStream', like
-- so:
--
-- @
-- do Streams.'supply'  input1 output
--    Streams.'supply'  input2 output
--    Streams.'connect' input3 output
-- @
--
supply :: InputStream a -> OutputStream a -> IO ()
supply p q = loop
  where
    loop = do
        m <- read p
        maybe (return $! ())
              (const $ write m q >> loop)
              m
{-# INLINE supply #-}


------------------------------------------------------------------------------
-- | 'supply' with the arguments flipped.
supplyTo :: OutputStream a -> InputStream a -> IO ()
supplyTo = flip supply
{-# INLINE supplyTo #-}


------------------------------------------------------------------------------
-- | Creates an 'InputStream' from a value-producing action.
--
-- (@makeInputStream m@) calls the action @m@ each time you request a value
-- from the 'InputStream'. The given action is extended with the default
-- pushback mechanism (see "System.IO.Streams.Internal#pushback").
makeInputStream :: IO (Maybe a) -> IO (InputStream a)
makeInputStream m = do
    doneRef <- newIORef False
    pbRef   <- newIORef []
    return $! InputStream (grab doneRef pbRef) (pb pbRef)
  where
    grab doneRef pbRef = do
        l <- readIORef pbRef
        case l of
          []     -> do done <- readIORef doneRef
                       if done
                         then return Nothing
                         else do
                             x <- m
                             when (isNothing x) $ writeIORef doneRef True
                             return x
          (x:xs) -> writeIORef pbRef xs >> (return $! Just x)

    pb ref x = readIORef ref >>= \xs -> writeIORef ref (x:xs)
{-# INLINE makeInputStream #-}


------------------------------------------------------------------------------
-- | Creates an 'OutputStream' from a value-consuming action.
--
-- (@makeOutputStream f@) runs the computation @f@ on each value fed to it.
--
-- Since version 1.2.0.0, 'makeOutputStream' also ensures that output streams
-- no longer receive data once EOF is received (i.e. you can now assume that
-- makeOutputStream will feed your function @Nothing@ at most once.)
makeOutputStream :: (Maybe a -> IO ()) -> IO (OutputStream a)
makeOutputStream func = (OutputStream . go) <$> newIORef False
  where
    go closedRef !m = do
        closed <- readIORef closedRef
        if closed
          then return $! ()
          else do
            when (isNothing m) $ writeIORef closedRef True
            func m
{-# INLINE makeOutputStream #-}


------------------------------------------------------------------------------
-- | Converts an 'InputStream' into a thread-safe 'InputStream', at a slight
-- performance penalty.
--
-- For performance reasons, this library provides non-thread-safe streams by
-- default. Use the @locking@ functions to convert these streams into slightly
-- slower, but thread-safe, equivalents.
lockingInputStream :: InputStream a -> IO (InputStream a)
lockingInputStream s = do
    mv <- newMVar $! ()
    return $! InputStream (grab mv) (pb mv)

  where
    grab mv = withMVar mv $ const $ read s
    pb mv x = withMVar mv $ const $ unRead x s
{-# INLINE lockingInputStream #-}


------------------------------------------------------------------------------
-- | Converts an 'OutputStream' into a thread-safe 'OutputStream', at a slight
-- performance penalty.
--
-- For performance reasons, this library provides non-thread-safe streams by
-- default. Use the @locking@ functions to convert these streams into slightly
-- slower, but thread-safe, equivalents.
lockingOutputStream :: OutputStream a -> IO (OutputStream a)
lockingOutputStream s = do
    mv <- newMVar $! ()
    makeOutputStream $ f mv

  where
    f mv x = withMVar mv $ const $ write x s
{-# INLINE lockingOutputStream #-}


------------------------------------------------------------------------------
-- | An empty 'InputStream' that yields 'Nothing' immediately.
nullInput :: IO (InputStream a)
nullInput = makeInputStream $ return Nothing


------------------------------------------------------------------------------
-- | An empty 'OutputStream' that discards any input fed to it.
nullOutput :: IO (OutputStream a)
nullOutput = makeOutputStream $ const $ return $! ()


------------------------------------------------------------------------------
-- | 'appendInputStream' concatenates two 'InputStream's, analogous to ('++')
-- for lists.
--
-- The second 'InputStream' continues where the first 'InputStream' ends.
--
-- Note: values pushed back to 'appendInputStream' are not propagated to either
-- wrapped 'InputStream'.
appendInputStream :: InputStream a -> InputStream a -> IO (InputStream a)
appendInputStream s1 s2 = concatInputStreams [s1, s2]


------------------------------------------------------------------------------
-- | 'concatInputStreams' concatenates a list of 'InputStream's, analogous to
-- ('++') for lists.
--
-- Subsequent 'InputStream's continue where the previous one 'InputStream'
-- ends.
--
-- Note: values pushed back to the 'InputStream' returned by
-- 'concatInputStreams' are not propagated to any of the source
-- 'InputStream's.
concatInputStreams :: [InputStream a] -> IO (InputStream a)
concatInputStreams inputStreams = do
    ref <- newIORef inputStreams
    makeInputStream $! run ref

  where
    run ref = go
      where
        go = do
            streams <- readIORef ref
            case streams of
              []       -> return Nothing
              (s:rest) -> do
                  next <- read s
                  case next of
                    Nothing -> writeIORef ref rest >> go
                    Just _  -> return next


------------------------------------------------------------------------------
-- | Checks if an 'InputStream' is at end-of-stream.
atEOF :: InputStream a -> IO Bool
atEOF s = read s >>= maybe (return True) (\k -> unRead k s >> return False)


------------------------------------------------------------------------------
-- $pushback
-- #pushback#
--
-- Users can push a value back into an input stream using the 'unRead'
-- function. Usually this will use the default pushback mechanism which
-- provides a buffer for the stream. Some stream transformers, like
-- 'takeBytes', produce streams that send pushed-back values back to the
-- streams that they wrap. A function like 'System.IO.Streams.Combinators.map'
-- cannot do this because the types don't match up:
--
-- @
-- 'System.IO.Streams.Combinators.map' :: (a -> b) -> 'InputStream' a -> 'IO' ('InputStream' b)
-- @
--
-- A function will usually document if its pushback behaviour differs from the
-- default. No matter what the case, input streams should obey the following
-- law:
--
-- @
-- Streams.'unRead' c stream >> Streams.'read' stream === 'return' ('Just' c)
-- @


                 --------------------------------------------
                 -- Typeclass instances for Handle support --
                 --------------------------------------------

------------------------------------------------------------------------------
bUFSIZ :: Int
bUFSIZ = 32752


------------------------------------------------------------------------------
unsupported :: IO a
unsupported = throwIO unsupportedOperation


------------------------------------------------------------------------------
bufferToBS :: H.Buffer Word8 -> ByteString
bufferToBS buf = S.copy $! S.fromForeignPtr raw l sz
  where
    raw  = H.bufRaw buf
    l    = H.bufL buf
    r    = H.bufR buf
    sz   = r - l


------------------------------------------------------------------------------
instance H.RawIO (InputStream ByteString) where
    read is ptr n = read is >>= maybe (return 0) f
      where
        f s = S.unsafeUseAsCStringLen s $ \(cstr, l) -> do
                  let c = min n l
                  copyBytes ptr (castPtr cstr) c
                  return $! c

    readNonBlocking  _ _ _ = unsupported
    write            _ _ _ = unsupported
    writeNonBlocking _ _ _ = unsupported


------------------------------------------------------------------------------
instance H.RawIO (OutputStream ByteString) where
    read _ _ _             = unsupported
    readNonBlocking _ _ _  = unsupported
    write os ptr n         = S.packCStringLen (castPtr ptr, n) >>=
                             flip write os . Just
    writeNonBlocking _ _ _ = unsupported


------------------------------------------------------------------------------
-- | Internal convenience synonym for a pair of input\/output streams.
type StreamPair a = SP (InputStream a) (OutputStream a)

instance H.RawIO (StreamPair ByteString) where
    read (SP is _) ptr n   = H.read is ptr n
    readNonBlocking  _ _ _ = unsupported
    write (SP _ os) ptr n  = H.write os ptr n
    writeNonBlocking _ _ _ = unsupported


------------------------------------------------------------------------------
instance H.BufferedIO (OutputStream ByteString) where
    newBuffer !_ bs            = H.newByteBuffer bUFSIZ bs
    fillReadBuffer !_ _        = unsupported
    fillReadBuffer0 !_ _       = unsupported

    flushWriteBuffer !os !buf  = do
        write (Just $! bufferToBS buf) os
        emptyWriteBuffer buf

    flushWriteBuffer0 !os !buf = do
        let s = bufferToBS buf
        let l = S.length s
        write (Just s) os
        buf' <- emptyWriteBuffer buf
        return $! (l, buf')


------------------------------------------------------------------------------
instance H.BufferedIO (InputStream ByteString) where
    newBuffer !_ !bs        = H.newByteBuffer bUFSIZ bs
    fillReadBuffer !is !buf = H.readBuf is buf
    fillReadBuffer0 _ _    = unsupported
    flushWriteBuffer _ _   = unsupported
    flushWriteBuffer0 _ _  = unsupported


------------------------------------------------------------------------------
instance H.BufferedIO (StreamPair ByteString) where
    newBuffer !_ bs              = H.newByteBuffer bUFSIZ bs
    fillReadBuffer (SP is _)     = H.fillReadBuffer is
    fillReadBuffer0 _ _          = unsupported
    flushWriteBuffer (SP _ !os)  = H.flushWriteBuffer os
    flushWriteBuffer0 (SP _ !os) = H.flushWriteBuffer0 os


------------------------------------------------------------------------------
instance H.IODevice (OutputStream ByteString) where
  ready _ _ _ = return True
  close       = write Nothing
  devType _   = return H.Stream


------------------------------------------------------------------------------
instance H.IODevice (InputStream ByteString) where
  ready _ _ _ = return True
  close _     = return $! ()
  devType _   = return H.Stream


------------------------------------------------------------------------------
instance H.IODevice (StreamPair ByteString) where
  ready _ _ _     = return True
  close (SP _ os) = write Nothing os
  devType _       = return H.Stream


------------------------------------------------------------------------------
emptyWriteBuffer :: H.Buffer Word8
                 -> IO (H.Buffer Word8)
emptyWriteBuffer buf
    = return buf { H.bufL=0, H.bufR=0, H.bufState = H.WriteBuffer }


------------------------------------------------------------------------------
-- | A 'Generator' is a coroutine monad that can be used to define complex
-- 'InputStream's. You can cause a value of type @Just r@ to appear when the
-- 'InputStream' is read by calling 'yield':
--
-- @
-- g :: 'Generator' Int ()
-- g = do
--     Streams.'yield' 1
--     Streams.'yield' 2
--     Streams.'yield' 3
-- @
--
-- A 'Generator' can be turned into an 'InputStream' by calling
-- 'fromGenerator':
--
-- @
-- m :: 'IO' ['Int']
-- m = Streams.'fromGenerator' g >>= Streams.'System.IO.Streams.toList'     \-\- value returned is [1,2,3]
-- @
--
-- You can perform IO by calling 'liftIO', and turn a 'Generator' into an
-- 'InputStream' with 'fromGenerator'.
--
-- As a general rule, you should not acquire resources that need to be freed
-- from a 'Generator', because there is no guarantee the coroutine continuation
-- will ever be called, nor can you catch an exception from within a
-- 'Generator'.
newtype Generator r a = Generator {
      unG :: IO (Either (SP r (Generator r a)) a)
    } deriving (Typeable)


------------------------------------------------------------------------------
generatorBind :: Generator r a -> (a -> Generator r b) -> Generator r b
generatorBind (Generator m) f = Generator (m >>= either step value)
  where
    step (SP v r) = return $! Left $! SP v (generatorBind r f)
    value = unG .  f
{-# INLINE generatorBind #-}


------------------------------------------------------------------------------
instance Monad (Generator r) where
   return = Generator . return . Right
   (>>=)  = generatorBind


------------------------------------------------------------------------------
instance MonadIO (Generator r) where
    liftIO = Generator . (Right `fmap`)


------------------------------------------------------------------------------
instance Functor (Generator r) where
    fmap f (Generator m) = Generator $ m >>= either step value
      where
        step (SP v m') = return $! Left $! SP v (fmap f m')
        value v        = return $! Right $! f v


------------------------------------------------------------------------------
instance Applicative (Generator r) where
    pure = Generator . return . Right

    m <*> n = do
        f <- m
        v <- n
        return $! f v


------------------------------------------------------------------------------
-- | Calling @'yield' x@ causes the value @'Just' x@ to appear on the input
-- when this generator is converted to an 'InputStream'. The rest of the
-- computation after the call to 'yield' is resumed later when the
-- 'InputStream' is 'read' again.
yield :: r -> Generator r ()
yield x = Generator $! return $! Left $! SP x (return $! ())


------------------------------------------------------------------------------
-- | Turns a 'Generator' into an 'InputStream'.
fromGenerator :: Generator r a -> IO (InputStream r)
fromGenerator (Generator m) = do
    ref <- newIORef m
    makeInputStream $! go ref
  where
    go ref = readIORef ref >>= (\n -> n >>= either step finish)
      where
        step (SP v gen) = do
            writeIORef ref $! unG gen
            return $! Just v

        finish _ = return Nothing


------------------------------------------------------------------------------
newtype Consumer c a = Consumer {
      unC :: IO (Either (Maybe c -> Consumer c a) a)
    } deriving (Typeable)


------------------------------------------------------------------------------
instance Monad (Consumer c) where
    return = Consumer . return . Right

    (Consumer m) >>= f = Consumer $ m >>= either step value
      where
        step g  = return $! Left $! (>>= f) . g
        value v = unC $ f v


------------------------------------------------------------------------------
instance MonadIO (Consumer c) where
    liftIO = Consumer . fmap Right


------------------------------------------------------------------------------
instance Functor (Consumer r) where
    fmap f (Consumer m) = Consumer (m >>= either step value)
      where
        step g = return $! Left $! (fmap f) . g
        value v = return $! Right $! f v


------------------------------------------------------------------------------
instance Applicative (Consumer r) where
    pure = return

    m <*> n = do
        f <- m
        v <- n
        return $! f v


------------------------------------------------------------------------------
await :: Consumer r (Maybe r)
await = Consumer $ return (Left return)


------------------------------------------------------------------------------
fromConsumer :: Consumer r a -> IO (OutputStream r)
fromConsumer c0 = newIORef c0 >>= makeOutputStream . go
  where
    go ref mb = do
        c  <- readIORef ref
        c' <- unC c >>= either step (const $! return c)
        writeIORef ref c'
      where
        force c = do e <- unC c
                     return $! Consumer $! return e
        step g  = force $! g mb