    1 -- | Launch- and Dispatch messages to processes.
    2 --
    3 -- A pool has an 'Input' for 'Multiplexed' messages,
    4 -- and dispatches incoming messges to concurrent
    5 -- processes using user defined @'MessageBox'es@.
    6 --
    7 -- The pool starts and stops the processes and
    8 -- creates the message boxes.
    9 --
   10 -- The user supplied 'PoolWorkerCallback' 
   11 -- usually runs a loop that @'receive's@ messages
   12 -- from the 'MessageBox' created by the pool for that worker.
   13 --
   14 -- When a worker process dies, e.g. because the 
   15 -- 'PoolWorkerCallback' returns, the pool
   16 -- process will also 'cancel' the process (just to make sure...)
   17 -- and cleanup the internal 'Broker'.
   18 module RIO.ProcessPool.Pool
   19   ( Pool (..),
   20     spawnPool,
   21     PoolWorkerCallback (..),
   22     removePoolWorkerMessage,
   23   )
   24 where
   26 import RIO
   27 import RIO.ProcessPool.Broker
   28   ( BrokerConfig (MkBrokerConfig),
   29     BrokerResult,
   30     Multiplexed (Dispatch),
   31     ResourceUpdate (KeepResource, RemoveResource),
   32     spawnBroker,
   33   )
   34 import UnliftIO.MessageBox.Class
   35   ( IsInput (deliver),
   36     IsMessageBox (Input, newInput),
   37     IsMessageBoxArg (MessageBox, newMessageBox),
   38   )
   40 -- | Start a 'Pool'.
   41 --
   42 -- Start a process that receives messages sent to the
   43 -- 'poolInput' and dispatches them to the 'Input' of
   44 -- __pool member__ processes. If necessary the
   45 -- pool worker processes are started.
   46 --
   47 -- Each pool worker process is started using 'async' and
   48 -- executes the 'PoolWorkerCallback'.
   49 --
   50 -- When the callback returns, the process will exit.
   51 --
   52 -- Internally the pool uses the 'async' function to wrap
   53 -- the callback.
   54 --
   55 -- When a 'Multiplixed' 'Dispatch' message is received with
   56 -- a @Nothing@ then the worker is @'cancel'led@ and the
   57 -- worker is removed from the map.
   58 --
   59 -- Such a message is automatically sent after the 'PoolWorkerCallback'
   60 -- has returned, even when an exception was thrown. See
   61 -- 'finally'.
   62 spawnPool ::
   63   forall k w poolBox workerBox m.
   64   ( IsMessageBoxArg poolBox,
   65     IsMessageBoxArg workerBox,
   66     Ord k,
   67     Display k,
   68     HasLogFunc m
   69   ) =>
   70   poolBox ->
   71   workerBox ->
   72   PoolWorkerCallback workerBox w k m ->
   73   RIO
   74     m
   75     ( Either
   76         SomeException
   77         (Pool poolBox k w)
   78     )
   79 spawnPool poolBox workerBoxArg poolMemberImpl = do
   80   brInRef <- newEmptyMVar
   81   let brCfg =
   82         MkBrokerConfig
   83           id
   84           dispatchToWorker
   85           (spawnWorker workerBoxArg brInRef poolMemberImpl)
   86           removeWorker
   87   spawnBroker poolBox brCfg
   88     >>= traverse
   89       ( \(brIn, brA) -> do
   90           putMVar brInRef brIn
   91           return MkPool {poolInput = brIn, poolAsync = brA}
   92       )
   94 -- | This message will 'cancel' the worker
   95 -- with the given key.
   96 -- If the 'PoolWorkerCallback' wants to do cleanup
   97 -- it should use 'finally' or 'onException'.
   98 removePoolWorkerMessage :: k -> Multiplexed k (Maybe w)
   99 removePoolWorkerMessage !k = Dispatch k Nothing
  101 -- | The function that processes a
  102 -- 'MessageBox' of a worker for a specific /key/.
  103 newtype PoolWorkerCallback workerBox w k m = MkPoolWorkerCallback
  104   { runPoolWorkerCallback :: k -> MessageBox workerBox w -> RIO m ()
  105   }
  107 -- | A record containing the message box 'Input' of the
  108 -- 'Broker' and the 'Async' value required to 'cancel'
  109 -- the pools broker process.
  110 data Pool poolBox k w = MkPool
  111   { -- | Message sent to this input are dispatched to workers.
  112     -- If the message is an 'Initialize' message, a new 'async'
  113     -- process will be started.
  114     -- If the message value is 'Nothing', the processes is killed.
  115     poolInput :: !(Input (MessageBox poolBox) (Multiplexed k (Maybe w))),
  116     -- | The async of the internal 'Broker'.
  117     poolAsync :: !(Async BrokerResult)
  118   }
  120 -- | Internal data structure containing a workers
  121 -- message 'Input' and 'Async' value for cancellation.
  122 data PoolWorker workerBox w = MkPoolWorker
  123   { poolWorkerIn :: !(Input (MessageBox workerBox) w),
  124     poolWorkerAsync :: !(Async ())
  125   }
  127 dispatchToWorker ::
  128   (HasLogFunc m, IsInput (Input (MessageBox b)), Display k) =>
  129   k ->
  130   Maybe w ->
  131   PoolWorker b w ->
  132   RIO m (ResourceUpdate (PoolWorker b w))
  133 dispatchToWorker k pMsg pm =
  134   case pMsg of
  135     Just w -> helper w
  136     Nothing -> return (RemoveResource Nothing)
  137   where
  138     helper msg = do
  139       ok <- deliver (poolWorkerIn pm) msg
  140       if not ok
  141         then do
  142           logError ("failed to deliver message to pool worker: " <> display k)
  143           return (RemoveResource Nothing)
  144         else return KeepResource
  146 spawnWorker ::
  147   forall k w poolBoxIn workerBox m.
  148   ( IsMessageBoxArg workerBox,
  149     HasLogFunc m,
  150     IsInput poolBoxIn,
  151     Display k
  152   ) =>
  153   workerBox ->
  154   MVar (poolBoxIn (Multiplexed k (Maybe w))) ->
  155   PoolWorkerCallback workerBox w k m ->
  156   k ->
  157   Maybe (Maybe w) ->
  158   RIO m (PoolWorker workerBox w)
  159 spawnWorker workerBox brInRef pmCb this _mw = do
  160   inputRef <- newEmptyMVar
  161   a <- async (go inputRef `finally` enqueueCleanup)
  162   boxInM <- takeMVar inputRef
  163   case boxInM of
  164     Nothing -> do
  165       cancel a
  166       throwIO (stringException "failed to spawnWorker")
  167     Just boxIn ->
  168       return MkPoolWorker {poolWorkerIn = boxIn, poolWorkerAsync = a}
  169   where
  170     go inputRef = do
  171       (b, boxIn) <-
  172         withException
  173           ( do
  174               b <- newMessageBox workerBox
  175               boxIn <- newInput b
  176               return (b, boxIn)
  177           )
  178           (\(ex :: SomeException) -> do
  179               logError
  180                 ( "failed to create the message box for the new pool worker: "
  181                     <> display this
  182                     <> " exception caught: "
  183                     <> display ex
  184                 )
  185               putMVar inputRef Nothing
  186           )
  187       putMVar inputRef (Just boxIn)
  188       runPoolWorkerCallback pmCb this b
  189     enqueueCleanup =
  190       tryReadMVar brInRef
  191         >>= traverse_
  192           ( \brIn ->
  193               void (deliver brIn (removePoolWorkerMessage this))
  194           )
  196 removeWorker ::  
  197   k ->
  198   PoolWorker workerBox w ->
  199   RIO m ()
  200 removeWorker _k =
  201   void . cancel . poolWorkerAsync