never executed always true always false
1 -- | Thread safe queues for message passing
2 -- between many concurrent processes.
3 --
4 -- This message box is __UNLIMITED__.
5 --
6 -- Good single producer/single consumer performance
7 --
8 -- If you are sure that the producer(s) send messages
9 -- at a lower rate than the rate at which the consumer
10 -- consumes messages, use this module.
11 --
12 -- Otherwise use the more conservative
13 -- "UnliftIO.MessageBox.Limited" module.
14 module UnliftIO.MessageBox.Unlimited
15 ( BlockingUnlimited (..),
16 UnlimitedBox (),
17 UnlimitedBoxInput (),
18 )
19 where
20
21 -- import qualified Control.Concurrent.Chan.Unagi.NoBlocking as Unagi
22 import qualified Control.Concurrent.Chan.Unagi as Unagi
23 import Data.Functor (($>))
24 import UnliftIO.MessageBox.Util.Future (Future (..))
25 import qualified UnliftIO.MessageBox.Class as Class
26 import UnliftIO
27 ( MonadIO (liftIO),
28 MonadUnliftIO,
29 )
30
-- | The receiving end of an unlimited message queue.
--
-- Messages are taken out of the box with 'receive' or 'tryReceive'.
-- The counterpart used for sending is the 'UnlimitedBoxInput', which
-- 'newInput' derives from a box.
data UnlimitedBox a
  = MkUnlimitedBox
      -- Write side of the channel; 'newInput' wraps it in an
      -- 'UnlimitedBoxInput'.
      !(Unagi.InChan a)
      -- Read side of the channel; consumed by 'receive' and
      -- 'tryReceive'.
      !(Unagi.OutChan a)
42
-- | The sending end of an unlimited message queue.
--
-- Messages are enqueued with, e.g., 'deliver' and can be received from
-- an 'UnlimitedBox'.
--
-- The 'UnlimitedBoxInput' is the counterpart of an 'UnlimitedBox'; it
-- wraps the write side ('Unagi.InChan') of the underlying channel.
newtype UnlimitedBoxInput a
  = MkUnlimitedBoxInput (Unagi.InChan a)
49
-- | The (empty) configuration value used to create 'UnlimitedBox'es
-- through the 'Class.IsMessageBoxArg' methods.
data BlockingUnlimited
  = BlockingUnlimited

-- | Always renders as @Unlimited@. Both the precedence and the value
-- itself are ignored (the value is never forced).
instance Show BlockingUnlimited where
  showsPrec _ _ = ("Unlimited" ++)
56
-- | 'BlockingUnlimited' configures message boxes of type 'UnlimitedBox'.
instance Class.IsMessageBoxArg BlockingUnlimited where
  type MessageBox BlockingUnlimited = UnlimitedBox

  -- No message limit is reported for this configuration.
  getConfiguredMessageLimit _ = Nothing

  -- The configuration carries no data; the constructor match only
  -- forces the argument before delegating to 'create'.
  newMessageBox BlockingUnlimited = create
  {-# INLINE newMessageBox #-}
62
-- | Message box operations for 'UnlimitedBox'.
--
-- Every method delegates to the equally named top-level function of
-- this module ('receive', 'tryReceive', 'newInput'); the class methods
-- themselves are only in scope qualified.
instance Class.IsMessageBox UnlimitedBox where
  type Input UnlimitedBox = UnlimitedBoxInput

  -- The top-level 'receive' yields the plain message, so the result is
  -- wrapped in 'Just' here.
  receive !box = fmap Just (receive box)
  {-# INLINE receive #-}

  tryReceive !box = tryReceive box
  {-# INLINE tryReceive #-}

  newInput !box = newInput box
  {-# INLINE newInput #-}
72
-- | Delivery into an 'UnlimitedBoxInput': runs this module's top-level
-- 'deliver' and then always reports 'True'.
instance Class.IsInput UnlimitedBoxInput where
  -- The bang patterns force both the input and the message to weak
  -- head normal form when the method is applied.
  deliver !input !message = deliver input message $> True
  {-# INLINE deliver #-}
77
78
-- | Create a new 'UnlimitedBox' on top of a fresh channel obtained
-- from 'Unagi.newChan'.
--
-- From the resulting box a corresponding 'UnlimitedBoxInput' can be
-- made (see 'newInput') and passed to potential communication
-- partners.
create :: MonadUnliftIO m => m (UnlimitedBox a)
create =
  liftIO Unagi.newChan >>= \(!writeEnd, !readEnd) ->
    -- Build the box strictly so no thunk is handed to the caller.
    pure $! MkUnlimitedBox writeEnd readEnd
{-# INLINE create #-}
89
-- | Wait for and receive the next message of an 'UnlimitedBox' by
-- reading from the read side of its channel with 'Unagi.readChan'.
receive :: MonadUnliftIO m => UnlimitedBox a -> m a
receive (MkUnlimitedBox _ !readEnd) =
  -- Commented-out alternative call shape, kept from the original
  -- (presumably for the disabled NoBlocking import -- TODO confirm):
  -- liftIO (Unagi.readChan IO.yield readEnd)
  liftIO $ Unagi.readChan readEnd
{-# INLINE receive #-}
96
-- | Try to receive a message from an 'UnlimitedBox'.
--
-- The result is a 'Future' wrapping the 'Unagi.tryRead' action of the
-- element handed out by 'Unagi.tryReadChan'. The second component
-- returned by 'Unagi.tryReadChan' is discarded.
--
-- NOTE(review): whether polling the 'Future' yields @Nothing@ while
-- the queue is empty depends on 'Unagi.tryRead' -- confirm against the
-- unagi-chan documentation.
tryReceive :: MonadUnliftIO m => UnlimitedBox a -> m (Future a)
tryReceive (MkUnlimitedBox _ !readEnd) =
  liftIO $
    Unagi.tryReadChan readEnd >>= \(!element, _) ->
      pure (Future (Unagi.tryRead element))
{-# INLINE tryReceive #-}
104
-- | Make an 'UnlimitedBoxInput' that writes into the channel of the
-- given 'UnlimitedBox', i.e. items written to the input are the ones
-- the box receives.
newInput :: MonadUnliftIO m => UnlimitedBox a -> m (UnlimitedBoxInput a)
newInput (MkUnlimitedBox !writeEnd _) =
  pure $! MkUnlimitedBoxInput writeEnd
{-# INLINE newInput #-}
110
-- | Put a message into an 'UnlimitedBoxInput' with 'Unagi.writeChan',
-- such that the process reading the corresponding 'UnlimitedBox'
-- receives it.
--
-- The message is forced to weak head normal form (bang pattern) when
-- the function is applied.
deliver :: MonadUnliftIO m => UnlimitedBoxInput a -> a -> m ()
deliver (MkUnlimitedBoxInput !writeEnd) !message =
  liftIO (Unagi.writeChan writeEnd message)
{-# INLINE deliver #-}
117 liftIO $ Unagi.writeChan s a