never executed always true always false
    1 {-# LANGUAGE Strict #-}
    2 
    3 -- | A broker extracts a /key/ value from incoming messages
    4 --  and creates, keeps and destroys a /resource/ for each key.
    5 --
    6 -- The demultiplexed messages and their resources are passed to
    7 -- a custom 'MessageHandler'.
    8 --
    9 -- The user provides a 'Demultiplexer', a pure function that
   10 -- returns a key for the resource associated
   11 -- to the message and potentially changes the
   12 -- message.
   13 --
   14 -- The demultiplexer may also return a value indicating that
   15 -- a new resource must be created, or that a message
   16 -- shall be ignored.
   17 --
   18 -- The broker is run in a separate process using 'async'.
   19 -- The usual way to stop a broker is to 'cancel' it.
   20 --
   21 -- When cancelling a broker, the resource cleanup
   22 -- actions for all resources will be called with
   23 -- async exceptions masked.
   24 --
   25 -- In order to prevent the resource map filling up with
   26 -- /dead/ resources, the user of this module has to ensure
   27 -- that whenever a resource is not required anymore, a message
   28 -- will be sent to the broker, that will cause the 'MessageHandler'
   29 -- to be executed for the resource, which will in turn
   30 -- return 'RemoveResource'.
   31 module RIO.ProcessPool.Broker
   32   ( spawnBroker,
   33     BrokerConfig (..),
   34     BrokerResult (..),
   35     ResourceCreator,
   36     Demultiplexer,
   37     ResourceCleaner,
   38     MessageHandler,
   39     Multiplexed (..),
   40     ResourceUpdate (..),
   41   )
   42 where
   43 
   44 import qualified Data.Map.Strict as Map
   45 import RIO
   46 import UnliftIO.MessageBox
   47   ( IsMessageBox (Input, newInput, receive),
   48     IsMessageBoxArg (MessageBox, newMessageBox),
   49   )
   50 import Control.Concurrent.Async(AsyncCancelled)  
   51 
   52 -- | Spawn a broker with a new 'MessageBox',
   53 --  and return its message 'Input' channel as well as
   54 -- the 'Async' handle of the spawned process, needed to
   55 -- stop the broker process.
   56 --
   57 -- * @k@ is the /key/ for the resource associated to an incoming
   58 --   message
   59 -- * @w'@ is the type of incoming messages.
   60 -- * @w@ is the type of the demultiplexed messages.
   61 -- * @a@ specifies the resource type.
   62 -- * @m@ is the base monad
   63 spawnBroker ::
   64   forall brokerBoxArg k w' w a m.
   65   ( HasLogFunc m,
   66     Ord k,
   67     Display k,
   68     IsMessageBoxArg brokerBoxArg
   69   ) =>
   70   brokerBoxArg ->
   71   BrokerConfig k w' w a m ->
   72   RIO
   73     m
   74     ( Either
   75         SomeException
   76         ( Input (MessageBox brokerBoxArg) w',
   77           Async BrokerResult
   78         )
   79     )
   80 spawnBroker brokerBoxArg config = do
   81   brokerA <- async $ do
   82     mBrokerBox <-
   83       tryAny
   84         ( do
   85             b <- newMessageBox brokerBoxArg
   86             i <- newInput b
   87             return (b, i)
   88         )
   89     case mBrokerBox of
   90       Left er -> return (Left er)
   91       Right (brokerBox, brokerInp) -> do
   92         aInner <- mask_ $
   93           asyncWithUnmask $ \unmaskInner ->
   94             brokerLoop unmaskInner brokerBox config Map.empty
   95         return (Right (brokerInp, aInner))
   96   join <$> waitCatch brokerA
   97 
   98 -- | This is just what the 'Async' returned from
   99 -- 'spawnBroker' returns, it's current purpose is to
  100 -- make code easier to read.
  101 --
  102 -- Instead of some @Async ()@ that could be anything,
  103 -- there is @Async BrokerResult@.
  104 data BrokerResult = MkBrokerResult
  105   deriving stock (Show, Eq)
  106 
  107 -- | The broker configuration, used by 'spawnBroker'.
  108 --
  109 -- * @k@ is the /key/ for the resource associated to an incoming
  110 --   message
  111 -- * @w'@ is the type of incoming messages.
  112 -- * @w@ is the type of the demultiplexed messages.
  113 -- * @a@ specifies the resource type.
  114 -- * @m@ is the base monad
  115 data BrokerConfig k w' w a m = MkBrokerConfig
  116   { demultiplexer :: !(Demultiplexer w' k w),
  117     messageDispatcher :: !(MessageHandler k w a m),
  118     resourceCreator :: !(ResourceCreator k w a m),
  119     resourceCleaner :: !(ResourceCleaner k a m)
  120   }
  121 
  122 -- | User supplied callback to extract the key and the 'Multiplexed'
  123 --  from a message.
  124 --  (Sync-) Exceptions thrown from this function are caught and lead
  125 --  to dropping of the incoming message, while the broker continues.
  126 --
  127 -- * @k@ is the /key/ for the resource associated to an incoming
  128 --   message
  129 -- * @w'@ is the type of incoming messages.
  130 -- * @w@ is the type of the demultiplexed messages.
  131 type Demultiplexer w' k w = w' -> Multiplexed k w
  132 
  133 -- | User supplied callback to use the 'Multiplexed' message and
  134 --  the associated resource.
  135 --  (Sync-) Exceptions thrown from this function are caught and lead
  136 --  to immediate cleanup of the resource but the broker continues.
  137 --
  138 -- * Type @k@ is the /key/ for the resource associated to an incoming
  139 --   message
  140 -- * Type @w@ is the type of incoming, demultiplexed, messages.
  141 -- * Type @a@ specifies the resource type.
  142 -- * Type @m@ is the base monad
  143 type MessageHandler k w a m = k -> w -> a -> RIO m (ResourceUpdate a)
  144 
  145 -- | This value indicates in what state a worker is in after the
  146 --  'MessageHandler' action was executed.
  147 data ResourceUpdate a
  148   = -- | The resources is still required.
  149     KeepResource
  150   | -- | The resource is still required but must be updated.
  151     UpdateResource a
  152   | -- | The resource is obsolete and can
  153     --   be removed from the broker.
  154     --   The broker will call 'ResourceCleaner' either
  155     --   on the current, or an updated resource value.
  156     RemoveResource !(Maybe a)
  157 
  158 -- | The action that the broker has to take for in incoming message.
  159 --
  160 -- * @k@ is the /key/ for the resource associated to an incoming
  161 --   message
  162 -- * @w@ is the type of the demultiplexed messages.
  163 data Multiplexed k w
  164   = -- | The message is an initialization message, that requires the
  165     --   creation of a new resouce for the given key.
  166     --   When the resource is created, then /maybe/ additionally
  167     --   a message will also be dispatched.
  168     Initialize k !(Maybe w)
  169   | -- | Dispatch a message using an existing resource.
  170     -- Silently ignore if no resource for the key exists.
  171     Dispatch k w
  172 
  173 -- deriving stock (Show)
  174 
  175 -- | User supplied callback to create and initialize a resource.
  176 --  (Sync-) Exceptions thrown from this function are caught,
  177 --  and the broker continues.
  178 --
  179 -- * @k@ is the /key/ for the resource associated to an incoming
  180 --   message
  181 -- * @w@ is the type of the demultiplexed messages.
  182 -- * @a@ specifies the resource type.
  183 -- * @m@ is the monad of the returned action.
  184 type ResourceCreator k w a m = k -> Maybe w -> RIO m a
  185 
  186 -- | User supplied callback called _with exceptions masked_
  187 -- when the 'MessageHandler' returns 'RemoveResource'
  188 -- (Sync-) Exceptions thrown from this function are caught,
  189 -- and do not prevent the removal of the resource, also the
  190 -- broker continues.
  191 --
  192 -- * @k@ is the /key/ for the resource associated to an incoming
  193 --   message
  194 -- * @a@ specifies the resource type.
  195 -- * @m@ is the monad of the returned action.
  196 type ResourceCleaner k a m = k -> a -> RIO m ()
  197 
  198 type BrokerState k a = Map k a
  199 
  200 {-# NOINLINE brokerLoop #-}
  201 brokerLoop ::
  202   ( HasLogFunc m,
  203     Ord k,
  204     Display k,
  205     IsMessageBox msgBox
  206   ) =>
  207   (forall x. RIO m x -> RIO m x) ->
  208   msgBox w' ->
  209   BrokerConfig k w' w a m ->
  210   BrokerState k a ->
  211   RIO m BrokerResult
  212 brokerLoop unmask brokerBox config brokerState =
  213   withException
  214     ( unmask (receive brokerBox)
  215         >>= traverse (tryAny . onIncoming unmask config brokerState)
  216     )
  217     ( \(ex :: SomeException) -> do      
  218         case fromException ex of
  219           Just (_cancelled :: AsyncCancelled) ->
  220             logDebug "broker loop: cancelled"
  221           _ ->
  222             logError
  223               ( "broker loop: exception while \
  224                 \receiving and dispatching messages: "
  225                   <> display ex
  226               )
  227         cleanupAllResources config brokerState
  228     )
  229     >>= maybe
  230       ( do
  231           logError "broker loop: failed to receive next message"
  232           cleanupAllResources config brokerState
  233           return MkBrokerResult
  234       )
  235       ( \res -> do
  236           next <-
  237             either
  238               ( \err -> do
  239                   logWarn
  240                     ( "broker loop: Handling the last message\
  241                       \ caused an exception:"
  242                         <> display err
  243                     )
  244                   return brokerState
  245               )
  246               return
  247               res
  248           brokerLoop unmask brokerBox config next
  249       )
  250 
  251 {-# NOINLINE onIncoming #-}
  252 onIncoming ::
  253   (Ord k, HasLogFunc m, Display k) =>
  254   (forall x. RIO m x -> RIO m x) ->
  255   BrokerConfig k w' w a m ->
  256   BrokerState k a ->
  257   w' ->
  258   RIO m (BrokerState k a)
  259 onIncoming unmask config brokerState w' =
  260   case demultiplexer config w' of
  261     Initialize k mw ->
  262       onInitialize unmask k config brokerState mw
  263     Dispatch k w ->
  264       onDispatch unmask k w config brokerState
  265 
  266 onInitialize ::
  267   (Ord k, HasLogFunc m, Display k) =>
  268   (forall x. RIO m x -> RIO m x) ->
  269   k ->
  270   BrokerConfig k w' w a m ->
  271   BrokerState k a ->
  272   Maybe w ->
  273   RIO m (BrokerState k a)
  274 onInitialize unmask k config brokerState mw =
  275   case Map.lookup k brokerState of
  276     Just _ -> do
  277       logError
  278         ( "cannot initialize a new worker, a worker with that ID exists: "
  279             <> display k
  280         )
  281       return brokerState
  282     Nothing ->
  283       tryAny (unmask (resourceCreator config k mw))
  284         >>= either
  285           ( \err -> do
  286               logError
  287                 ( "the resource creator for worker "
  288                     <> display k
  289                     <> " threw an exception: "
  290                     <> display err
  291                 )
  292               return brokerState
  293           )
  294           ( \res ->
  295               let brokerState1 = Map.insert k res brokerState
  296                in case mw of
  297                     Nothing ->
  298                       return brokerState1
  299                     Just w ->
  300                       onException
  301                         (onDispatch unmask k w config brokerState1)
  302                         ( do
  303                             logError
  304                               ( "exception while dispatching the "
  305                                   <> "post-initialization message for worker: "
  306                                   <> display k
  307                               )
  308                             resourceCleaner config k res
  309                         )
  310           )
  311 
  312 onDispatch ::
  313   (Ord k, HasLogFunc m, Display k) =>
  314   (forall x. RIO m x -> RIO m x) ->
  315   k ->
  316   w ->
  317   BrokerConfig k w' w a m ->
  318   BrokerState k a ->
  319   RIO m (BrokerState k a)
  320 onDispatch unmask k w config brokerState =
  321   maybe notFound dispatch (Map.lookup k brokerState)
  322   where
  323     notFound = do
  324       logWarn
  325         ( "cannot dispatch message, worker not found: "
  326             <> display k
  327         )
  328       return brokerState
  329     dispatch res =
  330       tryAny (unmask (messageDispatcher config k w res))
  331         >>= either
  332           ( \err -> do
  333 
  334               logError
  335                 ( "the message dispatcher callback for worker "
  336                     <> display k
  337                     <> " threw: "
  338                     <> display err
  339                 )
  340               cleanupResource
  341                 k
  342                 config
  343                 brokerState
  344           )
  345           ( \case
  346               KeepResource ->
  347                 return brokerState
  348               UpdateResource newRes ->
  349                 return (Map.insert k newRes brokerState)
  350               RemoveResource mNewRes ->
  351                 cleanupResource
  352                   k
  353                   config
  354                   ( maybe
  355                       brokerState
  356                       ( \newRes ->
  357                           Map.insert k newRes brokerState
  358                       )
  359                       mNewRes
  360                   )
  361           )
  362 
  363 cleanupAllResources ::
  364   BrokerConfig k w' w a m ->
  365   BrokerState k a ->
  366   RIO m ()
  367 cleanupAllResources config brokerState =
  368   traverse_
  369     ( uncurry
  370         (tryResourceCleaner config)
  371     )
  372     (Map.assocs brokerState)
  373 
  374 cleanupResource ::
  375   (Ord k) =>
  376   k ->
  377   BrokerConfig k w' w a m ->
  378   Map k a ->
  379   RIO m (Map k a)
  380 cleanupResource k config brokerState = do
  381   traverse_ (tryResourceCleaner config k) (Map.lookup k brokerState)
  382   return (Map.delete k brokerState)
  383 
  384 tryResourceCleaner ::
  385   BrokerConfig k w' w a m ->
  386   k ->
  387   a ->
  388   RIO m ()
  389 tryResourceCleaner config k res = do
  390   void $ tryAny (resourceCleaner config k res)