Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf35b5ad

Browse files
committed
[Messenger] implementation of messenger:consume, which processes messages concurrently
1 parent82acd7a commitf35b5ad

File tree

10 files changed

+263
-10
lines changed

10 files changed

+263
-10
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2158,7 +2158,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
21582158

21592159
if ($busId ===$config['default_bus']) {
21602160
$container->setAlias('messenger.default_bus',$busId)->setPublic(true);
2161-
$container->setAlias(MessageBusInterface::class,$busId);
2161+
$container->setAlias(MessageBusInterface::class,$busId)->setPublic(true);
21622162
}else {
21632163
$container->registerAliasForArgument($busId, MessageBusInterface::class);
21642164
}

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
useSymfony\Component\Messenger\Middleware\SendMessageMiddleware;
3434
useSymfony\Component\Messenger\Middleware\TraceableMiddleware;
3535
useSymfony\Component\Messenger\Middleware\ValidationMiddleware;
36+
useSymfony\Component\Messenger\ParallelMessageBus;
3637
useSymfony\Component\Messenger\Retry\MultiplierRetryStrategy;
3738
useSymfony\Component\Messenger\RoutableMessageBus;
3839
useSymfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory;
@@ -54,6 +55,7 @@
5455
abstract_arg('per message senders map'),
5556
abstract_arg('senders service locator'),
5657
])
58+
5759
->set('messenger.middleware.send_message', SendMessageMiddleware::class)
5860
->abstract()
5961
->args([
@@ -134,6 +136,15 @@
134136
])
135137
->tag('messenger.transport_factory')
136138

139+
->set('parallel_bus', ParallelMessageBus::class)
140+
->args([
141+
[],
142+
param('kernel.environment'),
143+
param('kernel.debug'),
144+
param('kernel.project_dir'),
145+
])
146+
->tag('messenger.bus')
147+
137148
->set('messenger.transport.in_memory.factory', InMemoryTransportFactory::class)
138149
->tag('messenger.transport_factory')
139150
->tag('kernel.reset', ['method' =>'reset'])

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ protected function configure(): void
8585
newInputOption('queues',null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY,'Limit receivers to only consume from the specified queues'),
8686
newInputOption('no-reset',null, InputOption::VALUE_NONE,'Do not reset container services after each message'),
8787
newInputOption('all',null, InputOption::VALUE_NONE,'Consume messages from all receivers'),
88+
newInputOption('parallel-limit','p', InputOption::VALUE_REQUIRED,'The number of concurrent processes',10),
8889
])
8990
->setHelp(<<<'EOF'
9091
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
@@ -250,6 +251,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
250251
$options['queues'] =$queues;
251252
}
252253

254+
$options['parallel-limit'] =$input->getOption('parallel-limit');
255+
253256
try {
254257
$this->worker->run($options);
255258
}finally {
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger;
13+
14+
useAmp\Cache\LocalCache;
15+
useAmp\Cancellation;
16+
useAmp\Parallel\Worker\Task;
17+
useAmp\Sync\Channel;
18+
useApp\Kernel;
19+
usePsr\Container\ContainerInterface;
20+
useSymfony\Component\Dotenv\Dotenv;
21+
useSymfony\Component\Messenger\Exception\LogicException;
22+
useSymfony\Component\Messenger\Stamp\AckStamp;
23+
24+
class DispatchTaskimplements Task
25+
{
26+
privatestatic ?LocalCache$cache =null;
27+
28+
publicfunction__construct(privateEnvelope$envelope,privatearray$stamps,privatereadonlystring$env,privatereadonlybool$isDebug,privatereadonlystring$projectDir)
29+
{
30+
}
31+
32+
publicfunctionrun(Channel$channel,Cancellation$cancellation):mixed
33+
{
34+
$container =$this->getContainer();
35+
$envelope =$this->dispatch($container,$channel);
36+
37+
return$envelope->withoutStampsOfType(AckStamp::class);
38+
}
39+
40+
privatefunctiondispatch(ContainerInterface$container,$channel)
41+
{
42+
if (!$container->has(MessageBusInterface::class)) {
43+
thrownewLogicException(sprintf("%s can't be found.", MessageBusInterface::class));
44+
}
45+
46+
$messageBus =$container->get(MessageBusInterface::class);
47+
48+
return$messageBus->dispatch($this->envelope,$this->stamps);
49+
}
50+
51+
privatefunctiongetContainer()
52+
{
53+
$cache =self::$cache ??=newLocalCache();
54+
$container =$cache->get('cache-container');
55+
56+
// if not in cache, create container
57+
if (!$container) {
58+
if (!method_exists(Dotenv::class,'bootEnv')) {
59+
thrownew \LogicException(sprintf("Method bootEnv de\"%s\" doesn't exist", Dotenv::class));
60+
}
61+
62+
(newDotenv())->bootEnv($this->projectDir.'/.env');
63+
64+
if (!class_exists(Kernel::class) && !isset($_ENV['KERNEL_CLASS'])) {
65+
thrownew \LogicException('You must set the KERNEL_CLASS environment variable to the fully-qualified class name of your Kernel in .env or have "%s" class.', Kernel::class);
66+
}elseif (class_exists(Kernel::class)) {
67+
$kernel =newKernel($this->env,$this->isDebug);
68+
}else {
69+
$kernel =new$_ENV['KERNEL_CLASS']($this->env,$this->isDebug);
70+
}
71+
72+
$kernel->boot();
73+
74+
$container =$kernel->getContainer();
75+
$cache->set('cache-container',$container);
76+
}
77+
78+
return$container;
79+
}
80+
}

‎src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
useSymfony\Component\Messenger\Handler\HandlerDescriptor;
2121
useSymfony\Component\Messenger\Handler\HandlersLocatorInterface;
2222
useSymfony\Component\Messenger\Stamp\AckStamp;
23+
useSymfony\Component\Messenger\Stamp\BusNameStamp;
2324
useSymfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
2425
useSymfony\Component\Messenger\Stamp\HandledStamp;
2526
useSymfony\Component\Messenger\Stamp\HandlerArgumentsStamp;
@@ -32,6 +33,8 @@ class HandleMessageMiddleware implements MiddlewareInterface
3233
{
3334
use LoggerAwareTrait;
3435

36+
privateconstPARALLEL_BUS ='parallel_bus';
37+
3538
publicfunction__construct(
3639
privateHandlersLocatorInterface$handlersLocator,
3740
privatebool$allowNoHandlers =false,
@@ -64,6 +67,10 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
6467

6568
/** @var AckStamp $ackStamp */
6669
if ($batchHandler &&$ackStamp =$envelope->last(AckStamp::class)) {
70+
if ($envelope->last(BusNameStamp::class) &&self::PARALLEL_BUS ===$envelope->last(BusNameStamp::class)->getBusName()) {
71+
thrownewHandlerFailedException($envelope, [newLogicException("Parallel bus can't be used for batch messages")]);
72+
}
73+
6774
$ack =newAcknowledger(get_debug_type($batchHandler),staticfunction (?\Throwable$e =null,$result =null)use ($envelope,$ackStamp,$handlerDescriptor) {
6875
if (null !==$e) {
6976
$e =newHandlerFailedException($envelope, [$handlerDescriptor->getName() =>$e]);
@@ -75,7 +82,6 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
7582
});
7683

7784
$result =$this->callHandler($handler,$message,$ack,$envelope->last(HandlerArgumentsStamp::class));
78-
7985
if (!\is_int($result) ||0 >$result) {
8086
thrownewLogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".',\is_int($result) ?$result :get_debug_type($result),get_debug_type($batchHandler)));
8187
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger;
13+
14+
useAmp\Parallel\Worker\ContextWorkerPool;
15+
useSymfony\Component\Messenger\Stamp\FutureStamp;
16+
17+
usefunctionAmp\async;
18+
usefunctionAmp\Parallel\Worker\workerPool;
19+
20+
class ParallelMessageBusimplements MessageBusInterface
21+
{
22+
publicstatic ?ContextWorkerPool$worker =null;
23+
24+
publicfunction__construct(privatearray$something,privatereadonlystring$env,privatereadonlystring$debug,privatereadonlystring$projectdir)
25+
{
26+
}
27+
28+
publicfunctiondispatch(object$message,array$stamps = []):Envelope
29+
{
30+
$worker = (self::$worker ??=workerPool());
31+
32+
$envelope = Envelope::wrap($message,$stamps);
33+
$task =newDispatchTask($envelope,$stamps,$this->env,$this->debug,$this->projectdir);
34+
35+
$future =async(function ()use ($worker,$task) {
36+
return$worker->submit($task);
37+
});
38+
39+
return$envelope->with(newFutureStamp($future));
40+
}
41+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\Stamp;
13+
14+
useAmp\Future;
15+
16+
readonlyclass FutureStampimplements StampInterface
17+
{
18+
publicfunction__construct(privateFuture$future)
19+
{
20+
}
21+
22+
publicfunctiongetFuture():Future
23+
{
24+
return$this->future;
25+
}
26+
}

‎src/Symfony/Component/Messenger/TraceableMessageBus.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ public function dispatch(object $message, array $stamps = []): Envelope
4141

4242
throw$e;
4343
}finally {
44-
$this->dispatchedMessages[] =$context + ['stamps_after_dispatch' =>array_merge([], ...array_values($envelope->all()))];
44+
if ($envelopeinstanceof Envelope) {
45+
$this->dispatchedMessages[] =$context + ['stamps_after_dispatch' =>array_merge([], ...array_values($envelope->all()))];
46+
}
4547
}
4648
}
4749

@@ -102,4 +104,9 @@ private function getCaller(): array
102104
'line' =>$line,
103105
];
104106
}
107+
108+
publicfunctiongetMessageBus():MessageBusInterface
109+
{
110+
return$this->decoratedBus;
111+
}
105112
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp