vendor/shopware/core/Framework/MessageQueue/Api/ConsumeMessagesController.php line 98

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\MessageQueue\Api;
  3. use Shopware\Core\Framework\Log\Package;
  4. use Shopware\Core\Framework\MessageQueue\Subscriber\CountHandledMessagesListener;
  5. use Shopware\Core\Framework\MessageQueue\Subscriber\EarlyReturnMessagesListener;
  6. use Shopware\Core\Framework\Routing\Annotation\RouteScope;
  7. use Shopware\Core\Framework\Routing\Annotation\Since;
  8. use Shopware\Core\Framework\Util\MemorySizeCalculator;
  9. use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
  10. use Symfony\Component\DependencyInjection\ServiceLocator;
  11. use Symfony\Component\EventDispatcher\EventDispatcher;
  12. use Symfony\Component\HttpFoundation\JsonResponse;
  13. use Symfony\Component\HttpFoundation\Request;
  14. use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
  15. use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
  16. use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;
  17. use Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener;
  18. use Symfony\Component\Messenger\MessageBusInterface;
  19. use Symfony\Component\Messenger\Worker;
  20. use Symfony\Component\Routing\Annotation\Route;
  21. /**
  22.  * @Route(defaults={"_routeScope"={"api"}})
  23.  */
  24. #[Package('system-settings')]
  25. class ConsumeMessagesController extends AbstractController
  26. {
  27.     /**
  28.      * @var ServiceLocator
  29.      */
  30.     private $receiverLocator;
  31.     /**
  32.      * @var MessageBusInterface
  33.      */
  34.     private $bus;
  35.     /**
  36.      * @var int
  37.      */
  38.     private $pollInterval;
  39.     /**
  40.      * @var StopWorkerOnRestartSignalListener
  41.      */
  42.     private $stopWorkerOnRestartSignalListener;
  43.     /**
  44.      * @var StopWorkerOnSigtermSignalListener
  45.      */
  46.     private $stopWorkerOnSigtermSignalListener;
  47.     /**
  48.      * @var DispatchPcntlSignalListener
  49.      */
  50.     private $dispatchPcntlSignalListener;
  51.     /**
  52.      * @var EarlyReturnMessagesListener
  53.      */
  54.     private $earlyReturnListener;
  55.     private string $defaultTransportName;
  56.     private string $memoryLimit;
  57.     /**
  58.      * @internal
  59.      */
  60.     public function __construct(
  61.         ServiceLocator $receiverLocator,
  62.         MessageBusInterface $bus,
  63.         int $pollInterval,
  64.         StopWorkerOnRestartSignalListener $stopWorkerOnRestartSignalListener,
  65.         StopWorkerOnSigtermSignalListener $stopWorkerOnSigtermSignalListener,
  66.         DispatchPcntlSignalListener $dispatchPcntlSignalListener,
  67.         EarlyReturnMessagesListener $earlyReturnListener,
  68.         string $defaultTransportName,
  69.         string $memoryLimit
  70.     ) {
  71.         $this->receiverLocator $receiverLocator;
  72.         $this->bus $bus;
  73.         $this->pollInterval $pollInterval;
  74.         $this->stopWorkerOnRestartSignalListener $stopWorkerOnRestartSignalListener;
  75.         $this->stopWorkerOnSigtermSignalListener $stopWorkerOnSigtermSignalListener;
  76.         $this->dispatchPcntlSignalListener $dispatchPcntlSignalListener;
  77.         $this->earlyReturnListener $earlyReturnListener;
  78.         $this->defaultTransportName $defaultTransportName;
  79.         $this->memoryLimit $memoryLimit;
  80.     }
  81.     /**
  82.      * @Since("6.0.0.0")
  83.      * @Route("/api/_action/message-queue/consume", name="api.action.message-queue.consume", methods={"POST"})
  84.      */
  85.     public function consumeMessages(Request $request): JsonResponse
  86.     {
  87.         $receiverName $request->get('receiver');
  88.         if (!$receiverName || !$this->receiverLocator->has($receiverName)) {
  89.             throw new \RuntimeException('No receiver name provided.');
  90.         }
  91.         $receiver $this->receiverLocator->get($receiverName);
  92.         $workerDispatcher = new EventDispatcher();
  93.         $listener = new CountHandledMessagesListener($this->pollInterval);
  94.         $workerDispatcher->addSubscriber($listener);
  95.         $workerDispatcher->addSubscriber($this->stopWorkerOnRestartSignalListener);
  96.         $workerDispatcher->addSubscriber($this->stopWorkerOnSigtermSignalListener);
  97.         $workerDispatcher->addSubscriber($this->dispatchPcntlSignalListener);
  98.         $workerDispatcher->addSubscriber($this->earlyReturnListener);
  99.         if ($this->memoryLimit !== '-1') {
  100.             $workerDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener(
  101.                 MemorySizeCalculator::convertToBytes($this->memoryLimit)
  102.             ));
  103.         }
  104.         $worker = new Worker([$this->defaultTransportName => $receiver], $this->bus$workerDispatcher);
  105.         $worker->run(['sleep' => 50]);
  106.         return new JsonResponse(['handledMessages' => $listener->getHandledMessages()]);
  107.     }
  108. }