never executed always true always false
1 -- | Launch- and Dispatch messages to processes.
2 --
3 -- A pool has an 'Input' for 'Multiplexed' messages,
4 -- and dispatches incoming messages to concurrent
5 -- processes using user defined @'MessageBox'es@.
6 --
7 -- The pool starts and stops the processes and
8 -- creates the message boxes.
9 --
10 -- The user supplied 'PoolWorkerCallback'
11 -- usually runs a loop that @'receive's@ messages
12 -- from the 'MessageBox' created by the pool for that worker.
13 --
14 -- When a worker process dies, e.g. because the
15 -- 'PoolWorkerCallback' returns, the pool
16 -- process will also 'cancel' the process (just to make sure...)
17 -- and clean up the internal 'Broker'.
18 module RIO.ProcessPool.Pool
19 ( Pool (..),
20 spawnPool,
21 PoolWorkerCallback (..),
22 removePoolWorkerMessage,
23 )
24 where
25
26 import RIO
27 import RIO.ProcessPool.Broker
28 ( BrokerConfig (MkBrokerConfig),
29 BrokerResult,
30 Multiplexed (Dispatch),
31 ResourceUpdate (KeepResource, RemoveResource),
32 spawnBroker,
33 )
34 import UnliftIO.MessageBox.Class
35 ( IsInput (deliver),
36 IsMessageBox (Input, newInput),
37 IsMessageBoxArg (MessageBox, newMessageBox),
38 )
39
-- | Start a 'Pool'.
--
-- A broker process is spawned (see 'spawnBroker') on a message box
-- created from the given @poolBox@ argument. Messages sent to the
-- resulting 'poolInput' are dispatched to the 'Input' of the
-- __pool member__ process responsible for the message key; if
-- necessary that worker process is started first.
--
-- Every pool worker is started with 'async' and executes the
-- 'PoolWorkerCallback'. When the callback returns, the worker
-- process exits.
--
-- When a 'Multiplexed' 'Dispatch' message carrying @Nothing@ is
-- received, the worker is @'cancel'led@ and removed from the pool.
-- Such a message is also sent automatically after the
-- 'PoolWorkerCallback' has returned, even if it threw an exception
-- (see 'finally').
--
-- The result is whatever 'spawnBroker' reported: a @Left@ is passed
-- through unchanged, a @Right@ is wrapped into a 'Pool'.
spawnPool ::
  forall k w poolBox workerBox m.
  ( IsMessageBoxArg poolBox,
    IsMessageBoxArg workerBox,
    Ord k,
    Display k,
    HasLogFunc m
  ) =>
  poolBox ->
  workerBox ->
  PoolWorkerCallback workerBox w k m ->
  RIO
    m
    ( Either
        SomeException
        (Pool poolBox k w)
    )
spawnPool poolBox workerBoxArg poolMemberImpl = do
  -- The workers need the broker's input to announce their own
  -- termination, but that input only exists once the broker has been
  -- spawned. It is therefore handed over through an 'MVar' that is
  -- filled further down.
  brokerInputVar <- newEmptyMVar
  let brokerConfig =
        MkBrokerConfig
          -- NOTE(review): presumably the demultiplexer; messages
          -- arrive already 'Multiplexed' - confirm against 'BrokerConfig'.
          id
          dispatchToWorker
          (spawnWorker workerBoxArg brokerInputVar poolMemberImpl)
          removeWorker
  spawned <- spawnBroker poolBox brokerConfig
  case spawned of
    Left err ->
      pure (Left err)
    Right (brokerInput, brokerAsync) -> do
      putMVar brokerInputVar brokerInput
      pure
        ( Right
            (MkPool {poolInput = brokerInput, poolAsync = brokerAsync})
        )
93
-- | Build the message that asks the pool to remove the worker
-- registered for the given key: a 'Dispatch' carrying 'Nothing'.
--
-- The pool reacts to it by 'cancel'ling that worker. A
-- 'PoolWorkerCallback' that needs to clean up should therefore
-- use 'finally' or 'onException'.
--
-- The key is forced to weak head normal form when the message is
-- evaluated.
removePoolWorkerMessage :: k -> Multiplexed k (Maybe w)
removePoolWorkerMessage key = key `seq` Dispatch key Nothing
100
-- | The user supplied body of a pool worker.
--
-- It is handed the /key/ the worker was started for and the
-- 'MessageBox' that the pool created for this worker.
newtype PoolWorkerCallback workerBox w k m = MkPoolWorkerCallback
  { runPoolWorkerCallback ::
      k ->
      MessageBox workerBox w ->
      RIO m ()
  }
106
-- | Handle to a running pool: the message box 'Input' of the
-- internal 'Broker' together with the 'Async' value needed to
-- 'cancel' the pool's broker process.
data Pool poolBox k w = MkPool
  { poolInput :: !(Input (MessageBox poolBox) (Multiplexed k (Maybe w)))
  -- ^ Messages sent to this input are dispatched to the workers.
  -- If the message is an 'Initialize' message, a new 'async'
  -- process will be started.
  -- If the message value is 'Nothing', the worker process is killed.
  , poolAsync :: !(Async BrokerResult)
  -- ^ The 'Async' of the internal 'Broker'.
  }
119
-- | Internal bookkeeping entry for a single worker.
data PoolWorker workerBox w = MkPoolWorker
  { poolWorkerIn :: !(Input (MessageBox workerBox) w)
  -- ^ Where messages for this worker are delivered.
  , poolWorkerAsync :: !(Async ())
  -- ^ The worker process; used to 'cancel' it.
  }
126
-- | Forward one pool message to the worker it is addressed to.
--
-- * A 'Nothing' payload is the removal request: the worker is not
--   contacted and @'RemoveResource' Nothing@ is returned.
-- * A 'Just' payload is 'deliver'ed to the worker's input. If the
--   delivery is reported as failed, an error mentioning the key is
--   logged and @'RemoveResource' Nothing@ is returned; otherwise the
--   result is 'KeepResource'.
dispatchToWorker ::
  (HasLogFunc m, IsInput (Input (MessageBox b)), Display k) =>
  k ->
  Maybe w ->
  PoolWorker b w ->
  RIO m (ResourceUpdate (PoolWorker b w))
dispatchToWorker _ Nothing _ =
  pure (RemoveResource Nothing)
dispatchToWorker k (Just payload) worker = do
  delivered <- deliver (poolWorkerIn worker) payload
  if delivered
    then pure KeepResource
    else do
      logError ("failed to deliver message to pool worker: " <> display k)
      pure (RemoveResource Nothing)
145
-- | Start the worker process for a key and return its 'PoolWorker'
-- entry.
--
-- The worker runs in its own 'async' thread. That thread creates the
-- worker's message box and its input, hands the input back to the
-- caller through an 'MVar', and then runs the 'PoolWorkerCallback'.
-- Whenever the thread ends, it tries to send a
-- 'removePoolWorkerMessage' for its key to the broker input found in
-- the given 'MVar' (nothing is sent if that 'MVar' is still empty).
--
-- If creating the message box or its input throws, the error is
-- logged, the thread is 'cancel'led and a string exception is thrown
-- to the caller.
--
-- The initial message (last argument) is not used here.
spawnWorker ::
  forall k w poolBoxIn workerBox m.
  ( IsMessageBoxArg workerBox,
    HasLogFunc m,
    IsInput poolBoxIn,
    Display k
  ) =>
  workerBox ->
  MVar (poolBoxIn (Multiplexed k (Maybe w))) ->
  PoolWorkerCallback workerBox w k m ->
  k ->
  Maybe (Maybe w) ->
  RIO m (PoolWorker workerBox w)
spawnWorker workerBox brInRef pmCb this _mw = do
  -- One-shot channel on which the worker thread reports whether the
  -- message box could be created.
  handshake <- newEmptyMVar
  workerAsync <- async (runWorker handshake `finally` notifyBroker)
  created <- takeMVar handshake
  case created of
    Just workerInput ->
      pure
        MkPoolWorker
          { poolWorkerIn = workerInput,
            poolWorkerAsync = workerAsync
          }
    Nothing -> do
      cancel workerAsync
      throwIO (stringException "failed to spawnWorker")
  where
    -- Body of the worker thread.
    runWorker handshake = do
      (box, workerInput) <-
        (newMessageBox workerBox >>= \newBox -> (,) newBox <$> newInput newBox)
          `withException` reportCreationFailure handshake
      putMVar handshake (Just workerInput)
      runPoolWorkerCallback pmCb this box

    -- Log the failure first, then unblock the waiting caller.
    reportCreationFailure handshake (ex :: SomeException) = do
      logError
        ( "failed to create the message box for the new pool worker: "
            <> display this
            <> " exception caught: "
            <> display ex
        )
      putMVar handshake Nothing

    -- Ask the broker to forget this worker, if the broker input is
    -- already known.
    notifyBroker = do
      knownBrokerInput <- tryReadMVar brInRef
      for_ knownBrokerInput $ \brokerInput ->
        void (deliver brokerInput (removePoolWorkerMessage this))
195
-- | Dispose of a worker by 'cancel'ling its 'Async'.
-- The key is not needed for that and is ignored.
removeWorker ::
  k ->
  PoolWorker workerBox w ->
  RIO m ()
removeWorker _key worker =
  cancel (poolWorkerAsync worker)