never executed always true always false
-- | Thread safe queues for unidirectional message passing
-- between threads.
--
-- This message box has an upper limit, which means that
-- sometimes delivery either fails or is blocked until
-- the receiving thread has consumed more messages.
--
-- Use this module if the producer(s) outperform the consumer,
-- but you want the extra safety that the queue blocks the
-- 'Input' after a certain message limit is reached.
--
-- If you are sure that the producers fire at a slower rate
-- than the rate at which the consumer consumes messages, you
-- probably do not need the limit that this module enforces.
15 module UnliftIO.MessageBox.Limited
16 ( MessageLimit (..),
17 messageLimitToInt,
18 BlockingBoxLimit (..),
19 BlockingBox (),
20 BlockingInput (),
21 NonBlockingBoxLimit (..),
22 NonBlockingBox (),
23 NonBlockingInput (..),
24 WaitingBoxLimit (..),
25 WaitingBox (..),
26 WaitingInput (..),
27 )
28 where
29
30 import qualified Control.Concurrent.Chan.Unagi.Bounded as Unagi
31 import Control.Monad (unless)
32 import Data.Functor (($>))
33 import Data.Maybe (fromMaybe)
34 import UnliftIO.MessageBox.Util.Future (Future (..))
35 import qualified UnliftIO.MessageBox.Class as Class
36 import UnliftIO
37 ( MonadIO (liftIO),
38 MonadUnliftIO,
39 timeout,
40 )
41 import UnliftIO.Concurrent (threadDelay)
42
-- | The capacities that a limited message box can be created with.
--
-- Only reasonably small, positive powers of two are offered,
-- because Unagi is used under the hood. Use 'messageLimitToInt'
-- to obtain the number a constructor stands for.
--
-- The limit is a lower bound.
data MessageLimit
  = MessageLimit_1
  | MessageLimit_2
  | MessageLimit_4
  | MessageLimit_8
  | MessageLimit_16
  | MessageLimit_32
  | MessageLimit_64
  | MessageLimit_128
  | MessageLimit_256
  | MessageLimit_512
  | MessageLimit_1024
  | MessageLimit_2048
  | MessageLimit_4096
  deriving stock (Eq, Ord, Show, Bounded, Enum)
66
-- | Translate a 'MessageLimit' into the 'Int' that its
-- constructor name spells out, e.g. 'MessageLimit_16' becomes @16@.
{-# INLINE messageLimitToInt #-}
messageLimitToInt :: MessageLimit -> Int
messageLimitToInt MessageLimit_1 = 1
messageLimitToInt MessageLimit_2 = 2
messageLimitToInt MessageLimit_4 = 4
messageLimitToInt MessageLimit_8 = 8
messageLimitToInt MessageLimit_16 = 16
messageLimitToInt MessageLimit_32 = 32
messageLimitToInt MessageLimit_64 = 64
messageLimitToInt MessageLimit_128 = 128
messageLimitToInt MessageLimit_256 = 256
messageLimitToInt MessageLimit_512 = 512
messageLimitToInt MessageLimit_1024 = 1024
messageLimitToInt MessageLimit_2048 = 2048
messageLimitToInt MessageLimit_4096 = 4096
86
87 -- * 'Class.IsMessageBoxArg' instances
88
89 -- ** Blocking
90
-- | The (vague) number of messages that a 'BlockingBox' can
-- buffer, i.e. how many messages 'deliver' can put into a
-- 'BlockingInput' of that 'BlockingBox'.
newtype BlockingBoxLimit
  = BlockingBoxLimit MessageLimit
  deriving stock (Eq, Ord)
96
-- | Renders as @Blocking@ directly followed by the numeric limit.
instance Show BlockingBoxLimit where
  showsPrec _ (BlockingBoxLimit !limit) = showString "Blocking" . capacity
    where
      capacity = showsPrec 9 (messageLimitToInt limit)
100
-- | A message queue out of which messages can be received.
--
-- This is the counterpart of the 'BlockingInput' and is used
-- for reading messages, via 'receive' or 'tryReceive'.
--
-- Both ends of the underlying Unagi channel are kept: the
-- write end is handed out by 'newInput', the read end is
-- consumed by the receive functions.
data BlockingBox a
  = MkBlockingBox
      !(Unagi.InChan a)
      !(Unagi.OutChan a)
111
-- | A message queue into which messages can be enqueued, e.g.
-- by 'tryToDeliver'. The messages are received from the
-- corresponding 'BlockingBox'.
--
-- The 'BlockingInput' is the counterpart of a 'BlockingBox'.
newtype BlockingInput a
  = MkBlockingInput (Unagi.InChan a)
118
-- | A 'BlockingBoxLimit' creates a 'BlockingBox' with the
-- wrapped capacity and reports that capacity as its limit.
instance Class.IsMessageBoxArg BlockingBoxLimit where
  type MessageBox BlockingBoxLimit = BlockingBox

  {-# INLINE newMessageBox #-}
  newMessageBox (BlockingBoxLimit !capacity) =
    create capacity

  getConfiguredMessageLimit (BlockingBoxLimit !capacity) =
    Just $ messageLimitToInt capacity
125
-- | A blocking instance built on the module-local 'receive',
-- 'tryReceive' and 'newInput' functions.
instance Class.IsMessageBox BlockingBox where
  type Input BlockingBox = BlockingInput

  -- The local 'receive' always yields a message, hence always 'Just'.
  {-# INLINE receive #-}
  receive !box = fmap Just (receive box)

  {-# INLINE tryReceive #-}
  tryReceive !box = tryReceive box

  {-# INLINE newInput #-}
  newInput !box = newInput box

  -- Reserve the next element of the channel. If it is already
  -- available return it right away, otherwise block on it for
  -- at most the given timeout (as interpreted by 'timeout').
  --
  -- NOTE(review): when the timeout fires, the element reserved by
  -- 'Unagi.tryReadChan' is never read by this call; confirm in the
  -- Unagi documentation what happens to a message that arrives
  -- for it later.
  receiveAfter (MkBlockingBox _ !outChan) !micros = do
    (!element, !waitForElement) <- liftIO (Unagi.tryReadChan outChan)
    liftIO (Unagi.tryRead element) >>= \case
      Just msg -> pure (Just msg)
      Nothing -> timeout micros (liftIO waitForElement)
143
-- | A blocking instance: it runs the module-local 'deliver'
-- and reports 'True' once that has returned.
instance Class.IsInput BlockingInput where
  {-# INLINE deliver #-}
  deliver !input !msg = True <$ deliver input msg
148
149 -- ** A wrapper around 'BlockingBox' for Non-Blocking Input (NBI)
150
-- | A 'MessageLimit' wrapper selecting the non-blocking
-- 'Class.IsMessageBoxArg' instance, i.e. the one that creates
-- a 'NonBlockingBox'.
newtype NonBlockingBoxLimit
  = NonBlockingBoxLimit MessageLimit
  deriving stock (Eq, Ord)
154
-- | Renders as @NonBlocking@ directly followed by the numeric limit.
instance Show NonBlockingBoxLimit where
  showsPrec _ (NonBlockingBoxLimit !limit) = showString "NonBlocking" . capacity
    where
      capacity = showsPrec 9 (messageLimitToInt limit)
158
-- | Creates a 'BlockingBox' of the same capacity and wraps it
-- into a 'NonBlockingBox'.
instance Class.IsMessageBoxArg NonBlockingBoxLimit where
  type MessageBox NonBlockingBoxLimit = NonBlockingBox

  {-# INLINE newMessageBox #-}
  newMessageBox (NonBlockingBoxLimit !capacity) =
    fmap NonBlockingBox (Class.newMessageBox (BlockingBoxLimit capacity))

  getConfiguredMessageLimit (NonBlockingBoxLimit !capacity) =
    Just $ messageLimitToInt capacity
166
-- | A 'BlockingBox' wrapper with a non-blocking
-- 'Class.IsMessageBox' instance.
--
-- In contrast to a plain 'BlockingBox', 'Class.deliver' on its
-- input does not block when the message box limit is surpassed.
newtype NonBlockingBox a
  = NonBlockingBox (BlockingBox a)
172
-- | Receiving is forwarded to the wrapped 'BlockingBox'; only
-- the inputs handed out by 'newInput' differ, they are
-- 'NonBlockingInput's.
instance Class.IsMessageBox NonBlockingBox where
  type Input NonBlockingBox = NonBlockingInput

  {-# INLINE receive #-}
  receive (NonBlockingBox !box) = fmap Just (receive box)

  {-# INLINE tryReceive #-}
  tryReceive (NonBlockingBox !box) = tryReceive box

  {-# INLINE receiveAfter #-}
  receiveAfter (NonBlockingBox !box) !micros =
    Class.receiveAfter box micros

  {-# INLINE newInput #-}
  newInput (NonBlockingBox !box) = fmap NonBlockingInput (newInput box)
184
-- | A wrapper around 'BlockingInput' with a non-blocking
-- 'Class.IsInput' instance.
--
-- 'deliver' either enqueues the message or, if the message box
-- already contains more messages than its limit allows, returns
-- 'False' without waiting for room (the instance only pauses
-- briefly before reporting the failure).
newtype NonBlockingInput a
  = NonBlockingInput (BlockingInput a)
191
-- | Makes a single 'tryToDeliver' attempt and returns its
-- result. A failed attempt is followed by a @threadDelay 10@
-- before 'False' is returned.
instance Class.IsInput NonBlockingInput where
  {-# INLINE deliver #-}
  deliver (NonBlockingInput !input) !msg = do
    !accepted <- tryToDeliver input msg
    accepted <$ unless accepted (threadDelay 10)
198
199 -- ** 'BlockingBox' Wrapper with Timeout
200
-- | A 'Class.IsMessageBoxArg' instance wrapping the 'BlockingBox'
-- with independently configurable timeouts for 'receive' and
-- 'deliver'.
--
-- The fields are, in order:
--
-- 1. the optional timeout used by 'receive'; with 'Nothing',
--    'receive' behaves like that of the wrapped 'BlockingBox',
-- 2. the timeout handed to every 'WaitingInput' for 'deliver',
-- 3. the 'MessageLimit' of the underlying 'BlockingBox'.
data WaitingBoxLimit
  = WaitingBoxLimit
      !(Maybe Int)
      !Int
      !MessageLimit
  deriving stock (Eq, Ord)
209
-- | Renders as @Waiting_@ followed by the underscore separated
-- fields; the first field is left out when it is 'Nothing'.
instance Show WaitingBoxLimit where
  showsPrec _ (WaitingBoxLimit !receiveTimeout !deliverTimeout !limit) =
    showString "Waiting_" . receivePart . deliverPart . limitPart
    where
      receivePart =
        case receiveTimeout of
          Just !micros -> showsPrec 9 micros . showChar '_'
          Nothing -> id
      deliverPart = showsPrec 9 deliverTimeout . showChar '_'
      limitPart = showsPrec 9 (messageLimitToInt limit)
220
-- | Creates a 'BlockingBox' with the configured capacity and
-- pairs it with the complete 'WaitingBoxLimit', so that the
-- timeouts remain available to the 'WaitingBox'.
instance Class.IsMessageBoxArg WaitingBoxLimit where
  type MessageBox WaitingBoxLimit = WaitingBox

  {-# INLINE newMessageBox #-}
  newMessageBox config@(WaitingBoxLimit _ _ !capacity) =
    fmap (WaitingBox config) (Class.newMessageBox (BlockingBoxLimit capacity))

  getConfiguredMessageLimit (WaitingBoxLimit _ _ !capacity) =
    Just $ messageLimitToInt capacity
228
-- | A 'BlockingBox' together with the 'WaitingBoxLimit' it was
-- created from; the 'Class.IsMessageBox' instance reads the
-- timeouts from the latter.
data WaitingBox a
  = WaitingBox
      WaitingBoxLimit
      (BlockingBox a)
233
-- | Receives from the wrapped 'BlockingBox', applying the
-- receive timeout of the 'WaitingBoxLimit' if one is configured.
instance Class.IsMessageBox WaitingBox where
  type Input WaitingBox = WaitingInput

  -- With a configured receive timeout this is exactly the
  -- 'Class.receiveAfter' of the wrapped box (take an already
  -- available element, otherwise wait no longer than the
  -- timeout); without one it is the plain blocking receive.
  {-# INLINE receive #-}
  receive (WaitingBox !config !box) =
    case config of
      WaitingBoxLimit (Just !micros) _ _ -> Class.receiveAfter box micros
      _ -> Class.receive box

  {-# INLINE receiveAfter #-}
  receiveAfter (WaitingBox _ !box) !micros =
    Class.receiveAfter box micros

  {-# INLINE tryReceive #-}
  tryReceive (WaitingBox _ !box) = tryReceive box

  -- Every input carries the deliver timeout of the configuration.
  {-# INLINE newInput #-}
  newInput (WaitingBox (WaitingBoxLimit _ !deliverTimeout _) !box) =
    fmap (WaitingInput deliverTimeout) (newInput box)
254
-- | An input for a 'BlockingBox' that will block for not much
-- more than the given timeout when the message box is full.
--
-- The first field is that timeout, the second one the wrapped
-- 'BlockingInput'.
data WaitingInput a
  = WaitingInput
      !Int
      !(BlockingInput a)
262
-- | Delivers through 'tryToDeliverAndWait', using the timeout
-- stored in the 'WaitingInput'.
instance Class.IsInput WaitingInput where
  {-# INLINE deliver #-}
  deliver (WaitingInput !micros !input) !msg =
    tryToDeliverAndWait micros input msg
266
267 -- Internal Functions
268
-- | Create a 'BlockingBox' backed by a new Unagi channel whose
-- size is the 'Int' value of the given 'MessageLimit'.
{-# INLINE create #-}
create :: MonadUnliftIO m => MessageLimit -> m (BlockingBox a)
create !capacity =
  liftIO (Unagi.newChan (messageLimitToInt capacity))
    >>= \(!writeEnd, !readEnd) -> pure $! MkBlockingBox writeEnd readEnd
274
-- | Read the next message from the read end of the
-- 'BlockingBox' using 'Unagi.readChan'.
{-# INLINE receive #-}
receive :: MonadUnliftIO m => BlockingBox a -> m a
receive (MkBlockingBox _ !readEnd) =
  liftIO $ Unagi.readChan readEnd
279
-- | Return a 'Future' for the next value that will be received.
--
-- The 'Future' wraps 'Unagi.tryRead' on the element obtained
-- from 'Unagi.tryReadChan'; the blocking action that
-- 'Unagi.tryReadChan' also returns is not used.
{-# INLINE tryReceive #-}
tryReceive :: MonadUnliftIO m => BlockingBox a -> m (Future a)
tryReceive (MkBlockingBox _ !readEnd) =
  liftIO $
    Unagi.tryReadChan readEnd
      >>= \(!element, _) -> pure (Future (Unagi.tryRead element))
286
-- | Wrap the write end of the 'BlockingBox' in a 'BlockingInput'.
{-# INLINE newInput #-}
newInput :: MonadUnliftIO m => BlockingBox a -> m (BlockingInput a)
newInput (MkBlockingBox !writeEnd _) =
  pure $! MkBlockingInput writeEnd
290
-- | Write a message to the 'BlockingInput' using
-- 'Unagi.writeChan'.
{-# INLINE deliver #-}
deliver :: MonadUnliftIO m => BlockingInput a -> a -> m ()
deliver (MkBlockingInput !writeEnd) !msg =
  liftIO (Unagi.writeChan writeEnd msg)
295
-- | Try to put a message into the 'BlockingInput' of a
-- 'MessageBox', such that the process reading the 'MessageBox'
-- receives the message.
--
-- The result is that of 'Unagi.tryWriteChan': 'False' is
-- returned if the 'MessageBox' is full.
{-# INLINE tryToDeliver #-}
tryToDeliver :: MonadUnliftIO m => BlockingInput a -> a -> m Bool
tryToDeliver (MkBlockingInput !writeEnd) !msg =
  liftIO (Unagi.tryWriteChan writeEnd msg)
305
-- | Send a message by putting it into the 'BlockingInput' of a
-- 'MessageBox', such that the process reading the 'MessageBox'
-- receives the message.
--
-- Returns 'True' if either the initial 'tryToDeliver' or the
-- subsequent 'deliver' (run inside 'timeout') completed, and
-- 'False' if the timeout expired first.
--
-- This assumes that the queue is likely not full, and tries
-- 'tryToDeliver' first before spending any cpu cycles on
-- entering 'timeout'.
--
-- NOTE(review): Unagi's bounded @writeChan@ documents special
-- behaviour when an asynchronous exception interrupts a blocked
-- write; confirm whether a 'False' result can coincide with the
-- message still being enqueued.
tryToDeliverAndWait ::
  MonadUnliftIO m =>
  Int ->
  BlockingInput a ->
  a ->
  m Bool
tryToDeliverAndWait !micros !input !msg = do
  -- Benchmarks have shown great improvements when calling
  -- 'tryToDeliver' once before doing 'deliver' in a
  -- System.Timeout.timeout.
  --
  -- Calling 'tryToDeliver' more than once was tried as well,
  -- but that did not lead to convincing improvements.
  --
  -- Benchmarks have also shown that sending pessimistically
  -- (i.e. avoiding 'tryToDeliver') does not improve performance,
  -- even when the message queue is congested.
  --
  -- See benchmark results:
  -- `benchmark-results/optimistic-vs-pessimistic.html`
  deliveredAtOnce <- tryToDeliver input msg
  if deliveredAtOnce
    then pure True
    else fromMaybe False <$> timeout micros (deliver input msg $> True)