vendor/symfony/messenger/Middleware/DispatchAfterCurrentBusMiddleware.php line 68

Open in your IDE?
  1. <?php
  2. /*
  3.  * This file is part of the Symfony package.
  4.  *
  5.  * (c) Fabien Potencier <fabien@symfony.com>
  6.  *
  7.  * For the full copyright and license information, please view the LICENSE
  8.  * file that was distributed with this source code.
  9.  */
  10. namespace Symfony\Component\Messenger\Middleware;
  11. use Symfony\Component\Messenger\Envelope;
  12. use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException;
  13. use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
  14. /**
  15.  * Allow to configure messages to be handled after the current bus is finished.
  16.  *
  17.  * I.e, messages dispatched from a handler with a DispatchAfterCurrentBus stamp
  18.  * will actually be handled once the current message being dispatched is fully
  19.  * handled.
  20.  *
  21.  * For instance, using this middleware before the DoctrineTransactionMiddleware
  22.  * means sub-dispatched messages with a DispatchAfterCurrentBus stamp would be
  23.  * handled after the Doctrine transaction has been committed.
  24.  *
  25.  * @author Tobias Nyholm <tobias.nyholm@gmail.com>
  26.  */
  27. class DispatchAfterCurrentBusMiddleware implements MiddlewareInterface
  28. {
  29.     /**
  30.      * @var QueuedEnvelope[] A queue of messages and next middleware
  31.      */
  32.     private $queue = [];
  33.     /**
  34.      * @var bool this property is used to signal if we are inside a the first/root call to
  35.      *           MessageBusInterface::dispatch() or if dispatch has been called inside a message handler
  36.      */
  37.     private $isRootDispatchCallRunning false;
  38.     public function handle(Envelope $envelopeStackInterface $stack): Envelope
  39.     {
  40.         if (null !== $envelope->last(DispatchAfterCurrentBusStamp::class)) {
  41.             if ($this->isRootDispatchCallRunning) {
  42.                 $this->queue[] = new QueuedEnvelope($envelope$stack);
  43.                 return $envelope;
  44.             }
  45.             $envelope $envelope->withoutAll(DispatchAfterCurrentBusStamp::class);
  46.         }
  47.         if ($this->isRootDispatchCallRunning) {
  48.             /*
  49.              * A call to MessageBusInterface::dispatch() was made from inside the main bus handling,
  50.              * but the message does not have the stamp. So, process it like normal.
  51.              */
  52.             return $stack->next()->handle($envelope$stack);
  53.         }
  54.         // First time we get here, mark as inside a "root dispatch" call:
  55.         $this->isRootDispatchCallRunning true;
  56.         try {
  57.             // Execute the whole middleware stack & message handling for main dispatch:
  58.             $returnedEnvelope $stack->next()->handle($envelope$stack);
  59.         } catch (\Throwable $exception) {
  60.             /*
  61.              * Whenever an exception occurs while handling a message that has
  62.              * queued other messages, we drop the queued ones.
  63.              * This is intentional since the queued commands were likely dependent
  64.              * on the preceding command.
  65.              */
  66.             $this->queue = [];
  67.             $this->isRootDispatchCallRunning false;
  68.             throw $exception;
  69.         }
  70.         // "Root dispatch" call is finished, dispatch stored messages.
  71.         $exceptions = [];
  72.         while (null !== $queueItem array_shift($this->queue)) {
  73.             // Save how many messages are left in queue before handling the message
  74.             $queueLengthBefore \count($this->queue);
  75.             try {
  76.                 // Execute the stored messages
  77.                 $queueItem->getStack()->next()->handle($queueItem->getEnvelope(), $queueItem->getStack());
  78.             } catch (\Exception $exception) {
  79.                 // Gather all exceptions
  80.                 $exceptions[] = $exception;
  81.                 // Restore queue to previous state
  82.                 $this->queue \array_slice($this->queue0$queueLengthBefore);
  83.             }
  84.         }
  85.         $this->isRootDispatchCallRunning false;
  86.         if (\count($exceptions) > 0) {
  87.             throw new DelayedMessageHandlingException($exceptions);
  88.         }
  89.         return $returnedEnvelope;
  90.     }
  91. }
  92. /**
  93.  * @internal
  94.  */
  95. final class QueuedEnvelope
  96. {
  97.     /** @var Envelope */
  98.     private $envelope;
  99.     /** @var StackInterface */
  100.     private $stack;
  101.     public function __construct(Envelope $envelopeStackInterface $stack)
  102.     {
  103.         $this->envelope $envelope->withoutAll(DispatchAfterCurrentBusStamp::class);
  104.         $this->stack $stack;
  105.     }
  106.     public function getEnvelope(): Envelope
  107.     {
  108.         return $this->envelope;
  109.     }
  110.     public function getStack(): StackInterface
  111.     {
  112.         return $this->stack;
  113.     }
  114. }