never executed always true always false
1 {-# LANGUAGE Strict #-}
2
3 -- | A broker extracts a /key/ value from incoming messages
4 -- and creates, keeps and destroys a /resource/ for each key.
5 --
6 -- The demultiplexed messages and their resources are passed to
7 -- a custom 'MessageHandler'.
8 --
9 -- The user provides a 'Demultiplexer', a pure function that
10 -- returns a key for the resource associated
11 -- to the message and potentially changes the
12 -- message.
13 --
14 -- The demultiplexer may also return a value indicating that
15 -- a new resource must be created, or that a message
16 -- shall be ignored.
17 --
18 -- The broker is run in a separate process using 'async'.
19 -- The usual way to stop a broker is to 'cancel' it.
20 --
21 -- When cancelling a broker, the resource cleanup
22 -- actions for all resources will be called with
23 -- async exceptions masked.
24 --
25 -- In order to prevent the resource map filling up with
26 -- /dead/ resources, the user of this module has to ensure
27 -- that whenever a resource is not required anymore, a message
28 -- will be sent to the broker, that will cause the 'MessageHandler'
29 -- to be executed for the resource, which will in turn
30 -- return 'RemoveResource'.
31 module RIO.ProcessPool.Broker
32 ( spawnBroker,
33 BrokerConfig (..),
34 BrokerResult (..),
35 ResourceCreator,
36 Demultiplexer,
37 ResourceCleaner,
38 MessageHandler,
39 Multiplexed (..),
40 ResourceUpdate (..),
41 )
42 where
43
44 import qualified Data.Map.Strict as Map
45 import RIO
46 import UnliftIO.MessageBox
47 ( IsMessageBox (Input, newInput, receive),
48 IsMessageBoxArg (MessageBox, newMessageBox),
49 )
50 import Control.Concurrent.Async(AsyncCancelled)
51
-- | Create a new 'MessageBox' from the given argument and start a
-- broker process that reads from it.
--
-- On success the result is a 'Right' holding the 'Input' of the new
-- message box together with the 'Async' handle of the broker loop;
-- that handle is what is needed to stop the broker.
--
-- The message box and its input are created in a helper thread that
-- is awaited right away. Synchronous exceptions thrown during that
-- setup, as well as any exception that terminates the helper thread,
-- are returned as 'Left'.
--
-- The broker loop itself is forked with asynchronous exceptions
-- masked and is handed the function to unmask them.
--
-- * @k@ is the /key/ for the resource associated to an incoming
--   message
-- * @w'@ is the type of incoming messages.
-- * @w@ is the type of the demultiplexed messages.
-- * @a@ specifies the resource type.
-- * @m@ is the base monad
spawnBroker ::
  forall brokerBoxArg k w' w a m.
  ( HasLogFunc m,
    Ord k,
    Display k,
    IsMessageBoxArg brokerBoxArg
  ) =>
  brokerBoxArg ->
  BrokerConfig k w' w a m ->
  RIO
    m
    ( Either
        SomeException
        ( Input (MessageBox brokerBoxArg) w',
          Async BrokerResult
        )
    )
spawnBroker brokerBoxArg config = do
  setupThread <-
    async $
      tryAny
        ( newMessageBox brokerBoxArg >>= \box ->
            newInput box >>= \boxInput ->
              return (box, boxInput)
        )
        >>= \case
          Left setupError ->
            return (Left setupError)
          Right (box, boxInput) -> do
            -- Fork the loop masked; it unmasks selectively itself.
            loopThread <-
              mask_
                ( asyncWithUnmask
                    ( \unmaskInner ->
                        brokerLoop unmaskInner box config Map.empty
                    )
                )
            return (Right (boxInput, loopThread))
  -- Collapse "helper thread died" and "setup failed" into one 'Left'.
  setupOutcome <- waitCatch setupThread
  return (join setupOutcome)
97
-- | The value produced by the 'Async' that 'spawnBroker' returns.
--
-- Its current purpose is to make code easier to read: instead of
-- some @Async ()@ that could be anything, there is
-- @Async BrokerResult@.
data BrokerResult
  = MkBrokerResult
  deriving stock (Show, Eq)
106
-- | The set of user supplied callbacks that 'spawnBroker' needs
-- to run a broker.
--
-- * @k@ is the /key/ for the resource associated to an incoming
--   message
-- * @w'@ is the type of incoming messages.
-- * @w@ is the type of the demultiplexed messages.
-- * @a@ specifies the resource type.
-- * @m@ is the base monad
data BrokerConfig k w' w a m = MkBrokerConfig
  { -- | Applied to every incoming message to obtain the key and
    -- the action to take.
    demultiplexer :: !(Demultiplexer w' k w),
    -- | Invoked with a demultiplexed message and the resource
    -- stored for its key.
    messageDispatcher :: !(MessageHandler k w a m),
    -- | Invoked to create the resource for a key that has no
    -- resource yet.
    resourceCreator :: !(ResourceCreator k w a m),
    -- | Invoked when a resource is removed from the broker.
    resourceCleaner :: !(ResourceCleaner k a m)
  }
121
-- | User supplied pure callback that turns an incoming message
-- into a 'Multiplexed' value, i.e. the resource key plus the
-- action the broker shall take.
--
-- Synchronous exceptions thrown while evaluating this function are
-- caught by the broker: the incoming message is dropped and the
-- broker continues with its state unchanged.
--
-- * @k@ is the /key/ for the resource associated to an incoming
--   message
-- * @w'@ is the type of incoming messages.
-- * @w@ is the type of the demultiplexed messages.
type Demultiplexer w' k w = w' -> Multiplexed k w
132
-- | User supplied callback that processes a demultiplexed message
-- together with the resource stored for its key, and reports via
-- 'ResourceUpdate' what shall happen to that resource.
--
-- The broker runs this callback with asynchronous exceptions
-- unmasked. Synchronous exceptions thrown from it are caught and
-- lead to the immediate cleanup and removal of the resource, while
-- the broker continues.
--
-- * Type @k@ is the /key/ for the resource associated to an incoming
--   message
-- * Type @w@ is the type of incoming, demultiplexed, messages.
-- * Type @a@ specifies the resource type.
-- * Type @m@ is the base monad
type MessageHandler k w a m = k -> w -> a -> RIO m (ResourceUpdate a)
144
-- | The verdict of a 'MessageHandler' about the resource it was
-- called with.
data ResourceUpdate a
  = -- | The resource is still required and stays as it is.
    KeepResource
  | -- | The resource is still required, but the stored value must
    -- be replaced by the given one.
    UpdateResource a
  | -- | The resource is obsolete and can be removed from the
    -- broker.
    -- The broker calls the 'ResourceCleaner' on the given value,
    -- or on the currently stored value if none is given.
    RemoveResource !(Maybe a)
157
-- | The action that the broker has to take for an incoming message,
-- as decided by the 'Demultiplexer'.
--
-- * @k@ is the /key/ for the resource associated to an incoming
--   message
-- * @w@ is the type of the demultiplexed messages.
data Multiplexed k w
  = -- | The message is an initialization message that requires the
    -- creation of a new resource for the given key.
    -- If a message is included, it is dispatched to the
    -- 'MessageHandler' right after the resource was created.
    -- If a resource for the key already exists, an error is logged
    -- and nothing else happens.
    Initialize k !(Maybe w)
  | -- | Dispatch a message using the existing resource for the key.
    -- If no resource exists for the key, the message is dropped
    -- and a warning is logged.
    Dispatch k w
172
173 -- deriving stock (Show)
174
-- | User supplied callback that creates and initializes the
-- resource for a key, given the optional message that came with
-- the 'Initialize' request.
--
-- The broker runs this callback with asynchronous exceptions
-- unmasked. Synchronous exceptions thrown from it are caught and
-- logged; no resource is stored in that case and the broker
-- continues.
--
-- * @k@ is the /key/ for the resource associated to an incoming
--   message
-- * @w@ is the type of the demultiplexed messages.
-- * @a@ specifies the resource type.
-- * @m@ is the monad of the returned action.
type ResourceCreator k w a m = k -> Maybe w -> RIO m a
185
-- | User supplied callback that releases a resource.
--
-- It is called _with exceptions masked_ when the 'MessageHandler'
-- returns 'RemoveResource' or throws, and for every remaining
-- resource when the broker loop terminates.
--
-- Synchronous exceptions thrown from this function are caught;
-- they do not prevent the removal of the resource and the broker
-- continues.
--
-- * @k@ is the /key/ for the resource associated to an incoming
--   message
-- * @a@ specifies the resource type.
-- * @m@ is the monad of the returned action.
type ResourceCleaner k a m = k -> a -> RIO m ()
197
-- | The state threaded through the broker loop: the resources that
-- currently exist, indexed by their key.
type BrokerState k a = Map k a
199
{-# NOINLINE brokerLoop #-}
-- | The main loop of the broker process.
--
-- Each iteration receives one message (with asynchronous exceptions
-- unmasked via the given function), handles it through 'onIncoming'
-- and continues with the resulting state.
--
-- * If receiving yields 'Nothing', all resources are cleaned up and
--   the loop ends with 'MkBrokerResult'.
-- * If handling a message throws a synchronous exception, a warning
--   is logged and the loop continues with the previous state.
-- * Any other exception escaping the receive-and-handle step is
--   logged, all resources of the current state are cleaned up and
--   the exception is propagated by 'withException'.
brokerLoop ::
  ( HasLogFunc m,
    Ord k,
    Display k,
    IsMessageBox msgBox
  ) =>
  (forall x. RIO m x -> RIO m x) ->
  msgBox w' ->
  BrokerConfig k w' w a m ->
  BrokerState k a ->
  RIO m BrokerResult
brokerLoop unmask brokerBox config brokerState = do
  received <-
    withException
      ( do
          mIncoming <- unmask (receive brokerBox)
          traverse
            ( \incoming ->
                tryAny (onIncoming unmask config brokerState incoming)
            )
            mIncoming
      )
      ( \(loopError :: SomeException) -> do
          case fromException loopError :: Maybe AsyncCancelled of
            Just _ ->
              logDebug "broker loop: cancelled"
            Nothing ->
              logError
                ( "broker loop: exception while \
                  \receiving and dispatching messages: "
                    <> display loopError
                )
          cleanupAllResources config brokerState
      )
  case received of
    Nothing -> do
      logError "broker loop: failed to receive next message"
      cleanupAllResources config brokerState
      return MkBrokerResult
    Just (Left handlerError) -> do
      logWarn
        ( "broker loop: Handling the last message\
          \ caused an exception:"
            <> display handlerError
        )
      -- Keep the state from before the failed message.
      brokerLoop unmask brokerBox config brokerState
    Just (Right nextState) ->
      brokerLoop unmask brokerBox config nextState
250
{-# NOINLINE onIncoming #-}
-- | Handle a single incoming message: apply the 'demultiplexer'
-- of the configuration and forward the result to 'onInitialize'
-- or 'onDispatch'. Returns the updated broker state.
onIncoming ::
  (Ord k, HasLogFunc m, Display k) =>
  (forall x. RIO m x -> RIO m x) ->
  BrokerConfig k w' w a m ->
  BrokerState k a ->
  w' ->
  RIO m (BrokerState k a)
onIncoming unmask config brokerState incoming =
  route (demultiplexer config incoming)
  where
    route (Initialize key mMsg) =
      onInitialize unmask key config brokerState mMsg
    route (Dispatch key msg) =
      onDispatch unmask key msg config brokerState
265
-- | Handle an 'Initialize' request for the given key.
--
-- * If a resource for the key already exists, an error is logged
--   and the state is returned unchanged.
-- * Otherwise the 'resourceCreator' is run (unmasked). If it throws
--   a synchronous exception, the error is logged and the state is
--   returned unchanged.
-- * On success the new resource is stored. If the request carried
--   a message, it is dispatched via 'onDispatch'; should that raise
--   an exception, the new resource is handed to the
--   'resourceCleaner' before the exception propagates.
onInitialize ::
  (Ord k, HasLogFunc m, Display k) =>
  (forall x. RIO m x -> RIO m x) ->
  k ->
  BrokerConfig k w' w a m ->
  BrokerState k a ->
  Maybe w ->
  RIO m (BrokerState k a)
onInitialize unmask k config brokerState mw
  | Map.member k brokerState = do
      logError
        ( "cannot initialize a new worker, a worker with that ID exists: "
            <> display k
        )
      return brokerState
  | otherwise = do
      creation <- tryAny (unmask (resourceCreator config k mw))
      case creation of
        Left err -> do
          logError
            ( "the resource creator for worker "
                <> display k
                <> " threw an exception: "
                <> display err
            )
          return brokerState
        Right res -> do
          let brokerState1 = Map.insert k res brokerState
          case mw of
            Nothing ->
              return brokerState1
            Just w ->
              -- The caller still holds the state without the new
              -- resource, so it has to be cleaned up here.
              onDispatch unmask k w config brokerState1
                `onException` ( do
                                  logError
                                    ( "exception while dispatching the "
                                        <> "post-initialization message for worker: "
                                        <> display k
                                    )
                                  resourceCleaner config k res
                              )
311
-- | Handle a 'Dispatch' request: pass the message and the resource
-- stored for the key to the 'messageDispatcher' (run unmasked).
--
-- * If no resource exists for the key, a warning is logged and the
--   state is returned unchanged.
-- * If the dispatcher throws a synchronous exception, the error is
--   logged and the resource is cleaned up and removed.
-- * Otherwise the returned 'ResourceUpdate' decides whether the
--   resource is kept, replaced, or cleaned up and removed.
onDispatch ::
  (Ord k, HasLogFunc m, Display k) =>
  (forall x. RIO m x -> RIO m x) ->
  k ->
  w ->
  BrokerConfig k w' w a m ->
  BrokerState k a ->
  RIO m (BrokerState k a)
onDispatch unmask k w config brokerState =
  case Map.lookup k brokerState of
    Nothing -> do
      logWarn
        ( "cannot dispatch message, worker not found: "
            <> display k
        )
      return brokerState
    Just res -> do
      outcome <- tryAny (unmask (messageDispatcher config k w res))
      case outcome of
        Left err -> do
          logError
            ( "the message dispatcher callback for worker "
                <> display k
                <> " threw: "
                <> display err
            )
          cleanupResource k config brokerState
        Right KeepResource ->
          return brokerState
        Right (UpdateResource newRes) ->
          return (Map.insert k newRes brokerState)
        Right (RemoveResource Nothing) ->
          cleanupResource k config brokerState
        Right (RemoveResource (Just newRes)) ->
          -- Store the final value first so that the cleaner sees it.
          cleanupResource k config (Map.insert k newRes brokerState)
362
-- | Run the 'resourceCleaner' for every resource in the given state.
--
-- A synchronous exception thrown by the cleaner for one resource is
-- logged and does not prevent the cleanup of the remaining ones.
cleanupAllResources ::
  (HasLogFunc m, Display k) =>
  BrokerConfig k w' w a m ->
  BrokerState k a ->
  RIO m ()
cleanupAllResources config brokerState =
  traverse_
    ( uncurry
        (tryResourceCleaner config)
    )
    (Map.assocs brokerState)

-- | Run the 'resourceCleaner' for the resource stored under the
-- given key, if there is one, and return the state without that key.
--
-- The key is removed even if the cleaner throws a synchronous
-- exception.
cleanupResource ::
  (HasLogFunc m, Ord k, Display k) =>
  k ->
  BrokerConfig k w' w a m ->
  Map k a ->
  RIO m (Map k a)
cleanupResource k config brokerState = do
  traverse_ (tryResourceCleaner config k) (Map.lookup k brokerState)
  return (Map.delete k brokerState)

-- | Run the 'resourceCleaner' for a single resource, best effort.
--
-- Synchronous exceptions thrown by the cleaner are caught so that
-- the caller can carry on. They are logged as a warning instead of
-- being discarded, so a failing cleaner does not go unnoticed.
tryResourceCleaner ::
  (HasLogFunc m, Display k) =>
  BrokerConfig k w' w a m ->
  k ->
  a ->
  RIO m ()
tryResourceCleaner config k res =
  tryAny (resourceCleaner config k res)
    >>= either
      ( \err ->
          logWarn
            ( "the resource cleaner for worker "
                <> display k
                <> " threw an exception: "
                <> display err
            )
      )
      return