never executed always true always false
    1 -- | Thread safe queues for uni directional message passing
    2 -- between threads.
    3 --
    4 -- This message box has an upper limit, that means that
    5 -- sometimes delivery either fails or is blocked until
    6 -- the receiving thread has consumed more messages.
    7 --
    8 -- Use this module if the producer(s) outperform the consumer,
    9 -- but you want the extra safety that the queue blocks the
   10 -- 'Input' after a certain message limit is reached.
   11 --
-- If you are sure that the producers fire at a slower rate
-- than the rate at which the consumer consumes messages, use this
-- module.
   15 module UnliftIO.MessageBox.Limited
   16   ( MessageLimit (..),
   17     messageLimitToInt,
   18     BlockingBoxLimit (..),
   19     BlockingBox (),
   20     BlockingInput (),
   21     NonBlockingBoxLimit (..),
   22     NonBlockingBox (),
   23     NonBlockingInput (..),
   24     WaitingBoxLimit (..),
   25     WaitingBox (..),
   26     WaitingInput (..),
   27   )
   28 where
   29 
   30 import qualified Control.Concurrent.Chan.Unagi.Bounded as Unagi
   31 import Control.Monad (unless)
   32 import Data.Functor (($>))
   33 import Data.Maybe (fromMaybe)
   34 import UnliftIO.MessageBox.Util.Future (Future (..))
   35 import qualified UnliftIO.MessageBox.Class as Class
   36 import UnliftIO
   37   ( MonadIO (liftIO),
   38     MonadUnliftIO,
   39     timeout,
   40   )
   41 import UnliftIO.Concurrent (threadDelay)
   42 
   43 -- | Message Limit
   44 --
   45 -- The message limit must be a reasonable small positive integer
   46 -- that is also a power of two. This stems from the fact that
   47 -- Unagi is used under the hood.
   48 --
   49 -- The limit is a lower bound.
   50 data MessageLimit
   51   = MessageLimit_1
   52   | MessageLimit_2
   53   | MessageLimit_4
   54   | MessageLimit_8
   55   | MessageLimit_16
   56   | MessageLimit_32
   57   | MessageLimit_64
   58   | MessageLimit_128
   59   | MessageLimit_256
   60   | MessageLimit_512
   61   | MessageLimit_1024
   62   | MessageLimit_2048
   63   | MessageLimit_4096
   64   deriving stock
   65     (Eq, Ord, Show, Bounded, Enum)
   66 
   67 -- | Convert a 'MessageLimit' to the
   68 -- 'Int' representation.
   69 {-# INLINE messageLimitToInt #-}
   70 messageLimitToInt :: MessageLimit -> Int
   71 messageLimitToInt =
   72   \case
   73     MessageLimit_1 -> 1
   74     MessageLimit_2 -> 2
   75     MessageLimit_4 -> 4
   76     MessageLimit_8 -> 8
   77     MessageLimit_16 -> 16
   78     MessageLimit_32 -> 32
   79     MessageLimit_64 -> 64
   80     MessageLimit_128 -> 128
   81     MessageLimit_256 -> 256
   82     MessageLimit_512 -> 512
   83     MessageLimit_1024 -> 1024
   84     MessageLimit_2048 -> 2048
   85     MessageLimit_4096 -> 4096
   86 
   87 -- * 'Class.IsMessageBoxArg' instances
   88 
   89 -- ** Blocking
   90 
   91 -- | Contains the (vague) limit of messages that a 'BlockingBox'
   92 -- can buffer, i.e. that 'deliver' can put into a 'BlockingInput'
   93 -- of a 'BlockingBox'.
   94 newtype BlockingBoxLimit = BlockingBoxLimit MessageLimit
   95   deriving stock (Eq, Ord)
   96 
   97 instance Show BlockingBoxLimit where
   98   showsPrec _ (BlockingBoxLimit !l) =
   99     showString "Blocking" . showsPrec 9 (messageLimitToInt l)
  100 
  101 -- | A message queue out of which messages can by 'receive'd.
  102 --
  103 -- This is the counter part of 'Input'. Can be used for reading
  104 -- messages.
  105 --
  106 -- Messages can be received by 'receive' or 'tryReceive'.
  107 data BlockingBox a
  108   = MkBlockingBox
  109       !(Unagi.InChan a)
  110       !(Unagi.OutChan a)
  111 
  112 -- | A message queue into which messages can be enqued by,
  113 --   e.g. 'tryToDeliver'.
  114 --   Messages can be received from an 'BlockingBox`.
  115 --
  116 --   The 'Input' is the counter part of a 'BlockingBox'.
  117 newtype BlockingInput a = MkBlockingInput (Unagi.InChan a)
  118 
  119 instance Class.IsMessageBoxArg BlockingBoxLimit where
  120   type MessageBox BlockingBoxLimit = BlockingBox
  121   {-# INLINE newMessageBox #-}
  122   newMessageBox (BlockingBoxLimit !limit) = create limit
  123   getConfiguredMessageLimit (BlockingBoxLimit !limit) =
  124     Just (messageLimitToInt limit)
  125 
  126 -- | A blocking instance that invokes 'receive'.
  127 instance Class.IsMessageBox BlockingBox where
  128   type Input BlockingBox = BlockingInput
  129 
  130   {-# INLINE receive #-}
  131   receive !i = Just <$> receive i
  132   {-# INLINE tryReceive #-}
  133   tryReceive !i = tryReceive i
  134   {-# INLINE newInput #-}
  135   newInput !i = newInput i
  136   receiveAfter (MkBlockingBox _ !s) !rto =
  137     do
  138       (!promise, !blocker) <- liftIO (Unagi.tryReadChan s)
  139       liftIO (Unagi.tryRead promise)
  140         >>= maybe
  141           (timeout rto (liftIO blocker))
  142           (return . Just)
  143 
  144 -- | A blocking instance that invokes 'deliver'.
  145 instance Class.IsInput BlockingInput where
  146   {-# INLINE deliver #-}
  147   deliver !o !a = deliver o a $> True
  148 
  149 --  ** A wrapper around 'BlockingBox' for Non-Blocking Input (NBI)
  150 
  151 -- | A 'BlockingBoxLimit' wrapper for non-blocking 'Class.IsMessageBoxArg' instances.
  152 newtype NonBlockingBoxLimit = NonBlockingBoxLimit MessageLimit
  153   deriving stock (Eq, Ord)
  154 
  155 instance Show NonBlockingBoxLimit where
  156   showsPrec _ (NonBlockingBoxLimit !l) =
  157     showString "NonBlocking" . showsPrec 9 (messageLimitToInt l)
  158 
  159 instance Class.IsMessageBoxArg NonBlockingBoxLimit where
  160   type MessageBox NonBlockingBoxLimit = NonBlockingBox
  161   {-# INLINE newMessageBox #-}
  162   newMessageBox (NonBlockingBoxLimit !l) =
  163     NonBlockingBox <$> Class.newMessageBox (BlockingBoxLimit l)
  164   getConfiguredMessageLimit (NonBlockingBoxLimit !limit) =
  165     Just (messageLimitToInt limit)
  166 
  167 -- | A 'BlockingBox' wrapper for non-blocking 'Class.IsMessageBox' instances.
  168 --
  169 -- The difference to the 'BlockingBox' instance is that 'Class.deliver'
  170 -- immediately returns if the message box limit is surpassed.
  171 newtype NonBlockingBox a = NonBlockingBox (BlockingBox a)
  172 
  173 instance Class.IsMessageBox NonBlockingBox where
  174   type Input NonBlockingBox = NonBlockingInput
  175   {-# INLINE receive #-}
  176   receive (NonBlockingBox !i) = Just <$> receive i
  177   {-# INLINE tryReceive #-}
  178   tryReceive (NonBlockingBox !i) = tryReceive i
  179   {-# INLINE receiveAfter #-}
  180   receiveAfter (NonBlockingBox !b) !rto =
  181     Class.receiveAfter b rto
  182   {-# INLINE newInput #-}
  183   newInput (NonBlockingBox !i) = NonBlockingInput <$> newInput i
  184 
  185 -- | A wrapper around 'BlockingInput' with a non-blocking 'Class.IsInput' instance.
  186 --
  187 -- 'deliver' will enqueue the message or return 'False' immediately,
  188 -- if the message box already contains more messages than
  189 -- it's limit allows.
  190 newtype NonBlockingInput a = NonBlockingInput (BlockingInput a)
  191 
  192 instance Class.IsInput NonBlockingInput where
  193   {-# INLINE deliver #-}
  194   deliver (NonBlockingInput !o) !a = do
  195     !res <- tryToDeliver o a
  196     unless res (threadDelay 10)
  197     return res
  198 
  199 --  ** 'BlockingBox' Wrapper with Timeout
  200 
  201 -- | A 'Class.IsMessageBoxArg' instance wrapping the 'BlockingBox'
  202 --  with independently configurable timeouts for 'receive' and 'deliver'.
  203 data WaitingBoxLimit
  204   = WaitingBoxLimit
  205       !(Maybe Int)
  206       !Int
  207       !MessageLimit
  208   deriving stock (Eq, Ord)
  209 
  210 instance Show WaitingBoxLimit where
  211   showsPrec _ (WaitingBoxLimit !t0 !t1 !l) =
  212     showString "Waiting_"
  213       . ( case t0 of
  214             Nothing -> id
  215             Just !t -> showsPrec 9 t . showChar '_'
  216         )
  217       . showsPrec 9 t1
  218       . showChar '_'
  219       . showsPrec 9 (messageLimitToInt l)
  220 
  221 instance Class.IsMessageBoxArg WaitingBoxLimit where
  222   type MessageBox WaitingBoxLimit = WaitingBox
  223   {-# INLINE newMessageBox #-}
  224   newMessageBox l@(WaitingBoxLimit _ _ !c) =
  225     WaitingBox l <$> Class.newMessageBox (BlockingBoxLimit c)
  226   getConfiguredMessageLimit (WaitingBoxLimit _ _ !limit) =
  227     Just (messageLimitToInt limit)
  228 
  229 -- | A 'BlockingBox' an a 'WaitingBoxLimit' for
  230 -- the 'Class.IsMessageBox' instance.
  231 data WaitingBox a
  232   = WaitingBox WaitingBoxLimit (BlockingBox a)
  233 
  234 instance Class.IsMessageBox WaitingBox where
  235   type Input WaitingBox = WaitingInput
  236   {-# INLINE receive #-}
  237   receive (WaitingBox (WaitingBoxLimit (Just !rto) _ _) (MkBlockingBox _ !s)) =
  238     liftIO $ do
  239       (!promise, !blocker) <- Unagi.tryReadChan s
  240       Unagi.tryRead promise
  241         >>= maybe
  242           (timeout rto blocker)
  243           (return . Just)
  244   receive (WaitingBox !_ !m) =
  245     Class.receive m
  246   {-# INLINE receiveAfter #-}
  247   receiveAfter (WaitingBox _ !b) !rto =
  248     Class.receiveAfter b rto
  249   {-# INLINE tryReceive #-}
  250   tryReceive (WaitingBox _ !m) = tryReceive m
  251   {-# INLINE newInput #-}
  252   newInput (WaitingBox (WaitingBoxLimit _ !dto _) !m) =
  253     WaitingInput dto <$> newInput m
  254 
  255 -- | An input for a 'BlockingBox' that will block
  256 -- for not much more than the given timeout when
  257 -- the message box is full.
  258 data WaitingInput a
  259   = WaitingInput
  260       !Int
  261       !(BlockingInput a)
  262 
  263 instance Class.IsInput WaitingInput where
  264   {-# INLINE deliver #-}
  265   deliver (WaitingInput !t !o) !a = tryToDeliverAndWait t o a
  266 
  267 -- Internal Functions
  268 
  269 {-# INLINE create #-}
  270 create :: MonadUnliftIO m => MessageLimit -> m (BlockingBox a)
  271 create !limit = do
  272   (!inChan, !outChan) <- liftIO (Unagi.newChan (messageLimitToInt limit))
  273   return $! MkBlockingBox inChan outChan
  274 
  275 {-# INLINE receive #-}
  276 receive :: MonadUnliftIO m => BlockingBox a -> m a
  277 receive (MkBlockingBox _ !s) =
  278   liftIO (Unagi.readChan s)
  279 
  280 -- | Return a 'Future' for the next value that will be received.
  281 {-# INLINE tryReceive #-}
  282 tryReceive :: MonadUnliftIO m => BlockingBox a -> m (Future a)
  283 tryReceive (MkBlockingBox _ !s) = liftIO $ do
  284   (!promise, _) <- Unagi.tryReadChan s
  285   return (Future (Unagi.tryRead promise))
  286 
  287 {-# INLINE newInput #-}
  288 newInput :: MonadUnliftIO m => BlockingBox a -> m (BlockingInput a)
  289 newInput (MkBlockingBox !s _) = return $! MkBlockingInput s
  290 
  291 {-# INLINE deliver #-}
  292 deliver :: MonadUnliftIO m => BlockingInput a -> a -> m ()
  293 deliver (MkBlockingInput !s) !a =
  294   liftIO $ Unagi.writeChan s a
  295 
  296 -- | Try to put a message into the 'BlockingInput'
  297 -- of a 'MessageBox', such that the process
  298 -- reading the 'MessageBox' receives the message.
  299 --
  300 -- If the 'MessageBox' is full return False.
  301 {-# INLINE tryToDeliver #-}
  302 tryToDeliver :: MonadUnliftIO m => BlockingInput a -> a -> m Bool
  303 tryToDeliver (MkBlockingInput !s) !a =
  304   liftIO $ Unagi.tryWriteChan s a
  305 
  306 -- | Send a message by putting it into the 'BlockingInput'
  307 -- of a 'MessageBox', such that the process
  308 -- reading the 'MessageBox' receives the message.
  309 --
  310 -- Return False if the
  311 -- 'MessageBox' has been closed or is full.
  312 --
  313 -- This assumes that the queue is likely empty, and
  314 -- tries 'tryToDeliver' first before wasting any
  315 -- precious cpu cycles entering 'timeout'.
  316 tryToDeliverAndWait ::
  317   MonadUnliftIO m =>
  318   Int ->
  319   BlockingInput a ->
  320   a ->
  321   m Bool
  322 tryToDeliverAndWait !t !o !a =
  323   -- Benchmarks have shown great improvements
  324   -- when calling tryToDeliver once before doing
  325   -- deliver in a System.Timeout.timeout;
  326   --
  327   -- We even tried calling 'tryToDeliver' more than once,
  328   -- but that did not lead to convinving improvements.
  329   --
  330   -- Benachmarks have also shown, that sending pessimistically
  331   -- (i.e. avoiding `tryToDeliver`) does not improve performance,
  332   -- even when the message queue is congested
  333   --
  334   -- See benchmark results:
  335   -- `benchmark-results/optimistic-vs-pessimistic.html`
  336   tryToDeliver o a >>= \case
  337     True -> return True
  338     False ->
  339       fromMaybe False <$> timeout t (deliver o a $> True)