-- |
-- Module     : Simulation.Aivika.Distributed.Optimistic.Internal.AcknowledgementMessageQueue
-- Copyright  : Copyright (c) 2015-2018, David Sorokin <david.sorokin@gmail.com>
-- License    : BSD3
-- Maintainer : David Sorokin <david.sorokin@gmail.com>
-- Stability  : experimental
-- Tested with: GHC 7.10.3
--
-- This module defines an acknowledegment message queue.
--
module Simulation.Aivika.Distributed.Optimistic.Internal.AcknowledgementMessageQueue
       (AcknowledgementMessageQueue,
        newAcknowledgementMessageQueue,
        acknowledgementMessageQueueSize,
        enqueueAcknowledgementMessage,
        reduceAcknowledgementMessages,
        filterAcknowledgementMessages) where

import Data.Maybe
import Data.List
import Data.IORef

import Control.Monad
import Control.Monad.Trans

import Simulation.Aivika.Vector
import Simulation.Aivika.Trans.Comp
import Simulation.Aivika.Trans.Simulation
import Simulation.Aivika.Trans.Dynamics
import Simulation.Aivika.Trans.Event
import Simulation.Aivika.Trans.Signal
import Simulation.Aivika.Trans.Internal.Types

import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.DIO
import Simulation.Aivika.Distributed.Optimistic.Internal.IO
import Simulation.Aivika.Distributed.Optimistic.DIO

-- | Specifies the acknowledgement message queue.
data AcknowledgementMessageQueue =
  AcknowledgementMessageQueue { AcknowledgementMessageQueue -> Vector AcknowledgementMessage
acknowledgementMessages :: Vector AcknowledgementMessage
                                -- ^ the acknowedgement messages
                              }

-- | Create a new acknowledgement message queue.
newAcknowledgementMessageQueue :: DIO AcknowledgementMessageQueue
newAcknowledgementMessageQueue :: DIO AcknowledgementMessageQueue
newAcknowledgementMessageQueue =
  do Vector AcknowledgementMessage
ms <- IO (Vector AcknowledgementMessage)
-> DIO (Vector AcknowledgementMessage)
forall a. IO a -> DIO a
forall (m :: * -> *) a. MonadIOUnsafe m => IO a -> m a
liftIOUnsafe IO (Vector AcknowledgementMessage)
forall a. IO (Vector a)
newVector
     AcknowledgementMessageQueue -> DIO AcknowledgementMessageQueue
forall a. a -> DIO a
forall (m :: * -> *) a. Monad m => a -> m a
return AcknowledgementMessageQueue { acknowledgementMessages :: Vector AcknowledgementMessage
acknowledgementMessages = Vector AcknowledgementMessage
ms }

-- | Return the acknowledgement message queue size.
acknowledgementMessageQueueSize :: AcknowledgementMessageQueue -> IO Int
{-# INLINE acknowledgementMessageQueueSize #-}
acknowledgementMessageQueueSize :: AcknowledgementMessageQueue -> IO Int
acknowledgementMessageQueueSize = Vector AcknowledgementMessage -> IO Int
forall a. Vector a -> IO Int
vectorCount (Vector AcknowledgementMessage -> IO Int)
-> (AcknowledgementMessageQueue -> Vector AcknowledgementMessage)
-> AcknowledgementMessageQueue
-> IO Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. AcknowledgementMessageQueue -> Vector AcknowledgementMessage
acknowledgementMessages

-- | Return a complement.
complement :: Int -> Int
complement :: Int -> Int
complement Int
x = - Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1

-- | Enqueue a new acknowledement message ignoring the duplicated messages.
enqueueAcknowledgementMessage :: AcknowledgementMessageQueue -> AcknowledgementMessage -> IO ()
enqueueAcknowledgementMessage :: AcknowledgementMessageQueue -> AcknowledgementMessage -> IO ()
enqueueAcknowledgementMessage AcknowledgementMessageQueue
q AcknowledgementMessage
m =
  do Int
i <- AcknowledgementMessageQueue -> AcknowledgementMessage -> IO Int
lookupAcknowledgementMessageIndex AcknowledgementMessageQueue
q AcknowledgementMessage
m
     Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
i Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
       do -- insert the message at the specified index
          let i' :: Int
i' = Int -> Int
complement Int
i
          Vector AcknowledgementMessage
-> Int -> AcknowledgementMessage -> IO ()
forall a. Vector a -> Int -> a -> IO ()
vectorInsert (AcknowledgementMessageQueue -> Vector AcknowledgementMessage
acknowledgementMessages AcknowledgementMessageQueue
q) Int
i' AcknowledgementMessage
m

-- | Search for the message index.
lookupAcknowledgementMessageIndex' :: AcknowledgementMessageQueue -> AcknowledgementMessage -> Int -> Int -> IO Int
lookupAcknowledgementMessageIndex' :: AcknowledgementMessageQueue
-> AcknowledgementMessage -> Int -> Int -> IO Int
lookupAcknowledgementMessageIndex' AcknowledgementMessageQueue
q AcknowledgementMessage
m Int
left Int
right =
  if Int
left Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
right
  then Int -> IO Int
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Int -> IO Int) -> Int -> IO Int
forall a b. (a -> b) -> a -> b
$ Int -> Int
complement Int
left
  else  
    do let index :: Int
index = (Int
left Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
right) Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
2
       AcknowledgementMessage
m' <- Vector AcknowledgementMessage -> Int -> IO AcknowledgementMessage
forall a. Vector a -> Int -> IO a
readVector (AcknowledgementMessageQueue -> Vector AcknowledgementMessage
acknowledgementMessages AcknowledgementMessageQueue
q) Int
index
       let t' :: Double
t' = AcknowledgementMessage -> Double
acknowledgementReceiveTime AcknowledgementMessage
m'
           t :: Double
t  = AcknowledgementMessage -> Double
acknowledgementReceiveTime AcknowledgementMessage
m
       if Double
t' Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
> Double
t Bool -> Bool -> Bool
|| (Double
t' Double -> Double -> Bool
forall a. Eq a => a -> a -> Bool
== Double
t Bool -> Bool -> Bool
&& AcknowledgementMessage
m' AcknowledgementMessage -> AcknowledgementMessage -> Bool
forall a. Ord a => a -> a -> Bool
> AcknowledgementMessage
m)
         then AcknowledgementMessageQueue
-> AcknowledgementMessage -> Int -> Int -> IO Int
lookupAcknowledgementMessageIndex' AcknowledgementMessageQueue
q AcknowledgementMessage
m Int
left (Int
index Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)
         else if Double
t' Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
< Double
t Bool -> Bool -> Bool
|| (Double
t' Double -> Double -> Bool
forall a. Eq a => a -> a -> Bool
== Double
t Bool -> Bool -> Bool
&& AcknowledgementMessage
m' AcknowledgementMessage -> AcknowledgementMessage -> Bool
forall a. Ord a => a -> a -> Bool
< AcknowledgementMessage
m)
              then AcknowledgementMessageQueue
-> AcknowledgementMessage -> Int -> Int -> IO Int
lookupAcknowledgementMessageIndex' AcknowledgementMessageQueue
q AcknowledgementMessage
m (Int
index Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) Int
right
              else Int -> IO Int
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Int
index      
 
-- | Search for the message index.
lookupAcknowledgementMessageIndex :: AcknowledgementMessageQueue -> AcknowledgementMessage -> IO Int
lookupAcknowledgementMessageIndex :: AcknowledgementMessageQueue -> AcknowledgementMessage -> IO Int
lookupAcknowledgementMessageIndex AcknowledgementMessageQueue
q AcknowledgementMessage
m =
  do Int
n <- Vector AcknowledgementMessage -> IO Int
forall a. Vector a -> IO Int
vectorCount (AcknowledgementMessageQueue -> Vector AcknowledgementMessage
acknowledgementMessages AcknowledgementMessageQueue
q)
     AcknowledgementMessageQueue
-> AcknowledgementMessage -> Int -> Int -> IO Int
lookupAcknowledgementMessageIndex' AcknowledgementMessageQueue
q AcknowledgementMessage
m Int
0 (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)

-- | Reduce the acknowledgement messages till the specified time.
reduceAcknowledgementMessages :: AcknowledgementMessageQueue -> Double -> IO ()
reduceAcknowledgementMessages :: AcknowledgementMessageQueue -> Double -> IO ()
reduceAcknowledgementMessages AcknowledgementMessageQueue
q Double
t =
  do Int
count <- Vector AcknowledgementMessage -> IO Int
forall a. Vector a -> IO Int
vectorCount (AcknowledgementMessageQueue -> Vector AcknowledgementMessage
acknowledgementMessages AcknowledgementMessageQueue
q)
     Int
len   <- Int -> Int -> IO Int
loop Int
count Int
0
     Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
len Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
       Vector AcknowledgementMessage -> Int -> Int -> IO ()
forall a. Vector a -> Int -> Int -> IO ()
vectorDeleteRange (AcknowledgementMessageQueue -> Vector AcknowledgementMessage
acknowledgementMessages AcknowledgementMessageQueue
q) Int
0 Int
len
       where
         loop :: Int -> Int -> IO Int
loop Int
n Int
i
           | Int
i Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
n    = Int -> IO Int
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Int
i
           | Bool
otherwise = do AcknowledgementMessage
m <- Vector AcknowledgementMessage -> Int -> IO AcknowledgementMessage
forall a. Vector a -> Int -> IO a
readVector (AcknowledgementMessageQueue -> Vector AcknowledgementMessage
acknowledgementMessages AcknowledgementMessageQueue
q) Int
i
                            if AcknowledgementMessage -> Double
acknowledgementReceiveTime AcknowledgementMessage
m Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
< Double
t
                              then Int -> Int -> IO Int
loop Int
n (Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
                              else Int -> IO Int
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Int
i

-- | Filter the acknowledgement messages using the specified predicate.
filterAcknowledgementMessages :: (AcknowledgementMessage -> Bool) -> AcknowledgementMessageQueue -> IO [AcknowledgementMessage]
filterAcknowledgementMessages :: (AcknowledgementMessage -> Bool)
-> AcknowledgementMessageQueue -> IO [AcknowledgementMessage]
filterAcknowledgementMessages AcknowledgementMessage -> Bool
pred AcknowledgementMessageQueue
q =
  do Int
count <- Vector AcknowledgementMessage -> IO Int
forall a. Vector a -> IO Int
vectorCount (AcknowledgementMessageQueue -> Vector AcknowledgementMessage
acknowledgementMessages AcknowledgementMessageQueue
q)
     Int
-> Int -> [AcknowledgementMessage] -> IO [AcknowledgementMessage]
loop Int
count Int
0 []
       where
         loop :: Int
-> Int -> [AcknowledgementMessage] -> IO [AcknowledgementMessage]
loop Int
n Int
i [AcknowledgementMessage]
acc
           | Int
i Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
n    = [AcknowledgementMessage] -> IO [AcknowledgementMessage]
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([AcknowledgementMessage] -> [AcknowledgementMessage]
forall a. [a] -> [a]
reverse [AcknowledgementMessage]
acc)
           | Bool
otherwise = do AcknowledgementMessage
m <- Vector AcknowledgementMessage -> Int -> IO AcknowledgementMessage
forall a. Vector a -> Int -> IO a
readVector (AcknowledgementMessageQueue -> Vector AcknowledgementMessage
acknowledgementMessages AcknowledgementMessageQueue
q) Int
i
                            if AcknowledgementMessage -> Bool
pred AcknowledgementMessage
m
                              then Int
-> Int -> [AcknowledgementMessage] -> IO [AcknowledgementMessage]
loop Int
n (Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) (AcknowledgementMessage
m AcknowledgementMessage
-> [AcknowledgementMessage] -> [AcknowledgementMessage]
forall a. a -> [a] -> [a]
: [AcknowledgementMessage]
acc)
                              else Int
-> Int -> [AcknowledgementMessage] -> IO [AcknowledgementMessage]
loop Int
n (Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) [AcknowledgementMessage]
acc