never executed always true always false
    1 -- | Thread safe queues for message passing
    2 -- between many concurrent processes.
    3 --
    4 -- This message box is __UNLIMITED__.
    5 --
    6 -- Good single producer/single consumer performance
    7 --
    8 -- If you are sure that the producer(s) send messages
    9 -- at a lower rate than the rate at which the consumer
   10 -- consumes messages, use this module.
   11 --
   12 -- Otherwise use the more conservative
   13 -- "UnliftIO.MessageBox.Limited" module.
   14 module UnliftIO.MessageBox.Unlimited
   15   ( BlockingUnlimited (..),
   16     UnlimitedBox (),
   17     UnlimitedBoxInput (),
   18   )
   19 where
   20 
   21 -- import qualified Control.Concurrent.Chan.Unagi.NoBlocking as Unagi
   22 import qualified Control.Concurrent.Chan.Unagi as Unagi
   23 import Data.Functor (($>))
   24 import UnliftIO.MessageBox.Util.Future (Future (..))
   25 import qualified UnliftIO.MessageBox.Class as Class
   26 import UnliftIO
   27   ( MonadIO (liftIO),
   28     MonadUnliftIO,
   29   )
   30 
   31 -- | A message queue out of which messages can
   32 --   be 'receive'd.
   33 --
   34 -- This is the counterpart of 'UnlimitedBoxInput'. Can be
   35 -- used for reading messages.
   36 --
   37 -- Messages can be received by 'receive' or 'tryReceive'.
   38 data UnlimitedBox a
   39   = MkUnlimitedBox
   40       !(Unagi.InChan a) -- write end; 'newInput' wraps it in an 'UnlimitedBoxInput'
   41       !(Unagi.OutChan a) -- read end; used by 'receive' and 'tryReceive'
   42 
   43 -- | A message queue into which messages can be enqueued by,
   44 --   e.g. 'deliver'.
   45 --   Messages can be received from an 'UnlimitedBox'.
   46 --
   47 --   The 'UnlimitedBoxInput' is the counterpart of an 'UnlimitedBox'.
   48 newtype UnlimitedBoxInput a = MkUnlimitedBoxInput (Unagi.InChan a)
   49 
   50 -- | The (empty) configuration for creating 'UnlimitedBox'es using the
   51 -- 'Class.IsMessageBoxArg' methods; its only constructor has no fields.
   52 data BlockingUnlimited = BlockingUnlimited
   53 
   54 instance Show BlockingUnlimited where
   55   showsPrec _prec _cfg rest = "Unlimited" ++ rest -- fixed text; precedence and value are ignored
   56 
   57 instance Class.IsMessageBoxArg BlockingUnlimited where
   58   type MessageBox BlockingUnlimited = UnlimitedBox
   59   getConfiguredMessageLimit _ = Nothing -- no limit is configured for this box type
   60   {-# INLINE newMessageBox #-}
   61   newMessageBox BlockingUnlimited = create -- the configuration has no fields to consult
   62 
   63 -- | Forwards to this module's 'receive', 'tryReceive' and 'newInput'; 'receive' always wraps its result in 'Just'.
   64 instance Class.IsMessageBox UnlimitedBox where
   65   type Input UnlimitedBox = UnlimitedBoxInput
   66   {-# INLINE receive #-}
   67   receive !box = fmap Just (receive box)
   68   {-# INLINE tryReceive #-}
   69   tryReceive !box = tryReceive box
   70   {-# INLINE newInput #-}
   71   newInput !box = newInput box
   72 
   73 -- | Forwards to this module's 'deliver' and always answers 'True' afterwards.
   74 instance Class.IsInput UnlimitedBoxInput where
   75   {-# INLINE deliver #-}
   76   deliver !input !msg = True <$ deliver input msg
   77 
   78 
   79 -- | Allocate a fresh 'UnlimitedBox'.
   80 --
   81 -- Both ends of a newly created Unagi channel are stored in
   82 -- the box; use 'newInput' to obtain an 'UnlimitedBoxInput'
   83 -- that can be passed to communication partners.
   84 {-# INLINE create #-}
   85 create :: MonadUnliftIO m => m (UnlimitedBox a)
   86 create = liftIO $ do
   87   (!writeEnd, !readEnd) <- Unagi.newChan
   88   pure $! MkUnlimitedBox writeEnd readEnd
   89 
   90 -- | Wait until a message is available in the given 'UnlimitedBox' and return it.
   91 {-# INLINE receive #-}
   92 receive :: MonadUnliftIO m => UnlimitedBox a -> m a
   93 receive (MkUnlimitedBox _ !readEnd) =
   94   -- Disabled alternative (presumably for the commented-out NoBlocking import): Unagi.readChan IO.yield s
   95   liftIO $ Unagi.readChan readEnd
   96 
   97 -- | Try to receive a message from an 'UnlimitedBox'. The result is a 'Future'
   98 -- wrapping Unagi's @tryRead@ on the element obtained from @tryReadChan@.
   99 {-# INLINE tryReceive #-}
  100 tryReceive :: MonadUnliftIO m => UnlimitedBox a -> m (Future a)
  101 tryReceive (MkUnlimitedBox _ !readEnd) = liftIO $
  102   Unagi.tryReadChan readEnd >>= \(!element, _blockingRead) ->
  103     pure (Future (Unagi.tryRead element))
  104 
  105 -- | Build the 'UnlimitedBoxInput' for the given 'UnlimitedBox' by
  106 -- wrapping the box's write end; the box itself is left untouched.
  107 {-# INLINE newInput #-}
  108 newInput :: MonadUnliftIO m => UnlimitedBox a -> m (UnlimitedBoxInput a)
  109 newInput (MkUnlimitedBox !writeEnd _) = pure $! MkUnlimitedBoxInput writeEnd
  110 
  111 -- | Write a message to an 'UnlimitedBoxInput' so that the
  112 -- reader of the corresponding 'UnlimitedBox' can receive it.
  113 -- The message is forced to weak head normal form first.
  114 {-# INLINE deliver #-}
  115 deliver :: MonadUnliftIO m => UnlimitedBoxInput a -> a -> m ()
  116 deliver (MkUnlimitedBoxInput !writeEnd) !msg =
  117   liftIO (Unagi.writeChan writeEnd msg)
  117   liftIO $ Unagi.writeChan s a