Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite97dbbd

Browse files
[Messenger] allow processing messages in batches
1 parentab7f816 commite97dbbd

File tree

8 files changed

+191
-62
lines changed

8 files changed

+191
-62
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\Handler;
13+
14+
/**
15+
* @author Nicolas Grekas <p@tchwork.com>
16+
*/
17+
interface BatchHandlerInterface
18+
{
19+
/**
20+
* @param \Closure(\Throwable=, mixed=)|null $ack The closure to call in handleBatch() to ack/nack the $message.
21+
* The message should be handled synchronously when null; or the
22+
* closure should be given a throwable if handling failed and the
23+
* result from handling the message.
24+
*
25+
* @return int The number of pending messages in the batch if $ack was provided,
26+
* the result from handling the message otherwise
27+
*/
28+
//public function __invoke(object $message, \Closure $ack = null): int|mixed;
29+
30+
/**
31+
* Handles messages buffered after successive calls to __invoke().
32+
*/
33+
publicfunctionhandleBatch();
34+
}

‎src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php‎

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
finalclass HandlerDescriptor
2020
{
2121
private$handler;
22+
private$name;
23+
private$batchHandler;
2224
private$options;
2325

2426
publicfunction__construct(callable$handler,array$options = [])
@@ -34,7 +36,7 @@ public function getHandler(): callable
3436

3537
publicfunctiongetName():string
3638
{
37-
$name =$this->callableName($this->handler);
39+
$name =$this->name ??$this->callableName($this->handler);
3840
$alias =$this->options['alias'] ??null;
3941

4042
if (null !==$alias) {
@@ -44,37 +46,42 @@ public function getName(): string
4446
return$name;
4547
}
4648

49+
publicfunctiongetBatchHandler(): ?BatchHandlerInterface
50+
{
51+
if (null ===$this->name) {
52+
$this->callableName($this->handler);
53+
}
54+
55+
return$this->batchHandler;
56+
}
57+
4758
publicfunctiongetOption(string$option)
4859
{
4960
return$this->options[$option] ??null;
5061
}
5162

5263
privatefunctioncallableName(callable$handler):string
5364
{
54-
if (\is_array($handler)) {
55-
if (\is_object($handler[0])) {
56-
return\get_class($handler[0]).'::'.$handler[1];
57-
}
58-
59-
return$handler[0].'::'.$handler[1];
65+
if (!$handlerinstanceof \Closure) {
66+
$handler = \Closure::fromCallable($handler);
6067
}
6168

62-
if (\is_string($handler)) {
63-
return$handler;
69+
$r =new \ReflectionFunction($handler);
70+
71+
if (str_contains($r->name,'{closure}')) {
72+
return$this->name ='Closure';
6473
}
6574

66-
if ($handlerinstanceof \Closure) {
67-
$r =new \ReflectionFunction($handler);
68-
if (str_contains($r->name,'{closure}')) {
69-
return'Closure';
70-
}
71-
if ($class =$r->getClosureScopeClass()) {
72-
return$class->name.'::'.$r->name;
73-
}
75+
if (!$handler =$r->getClosureThis()) {
76+
$class =$r->getClosureScopeClass();
77+
78+
return$this->name = ($class ?$class->name.'::' :'').$r->name;
79+
}
7480

75-
return$r->name;
81+
if ($handlerinstanceof BatchHandlerInterface) {
82+
$this->batchHandler =$handler;
7683
}
7784

78-
return\get_class($handler).'::__invoke';
85+
return$this->name =\get_class($handler).'::'.$r->name;
7986
}
8087
}

‎src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php‎

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,13 @@
1515
usePsr\Log\NullLogger;
1616
useSymfony\Component\Messenger\Envelope;
1717
useSymfony\Component\Messenger\Exception\HandlerFailedException;
18+
useSymfony\Component\Messenger\Exception\LogicException;
1819
useSymfony\Component\Messenger\Exception\NoHandlerForMessageException;
1920
useSymfony\Component\Messenger\Handler\HandlerDescriptor;
2021
useSymfony\Component\Messenger\Handler\HandlersLocatorInterface;
2122
useSymfony\Component\Messenger\Stamp\HandledStamp;
23+
useSymfony\Component\Messenger\Stamp\NoAutoAckStamp;
24+
useSymfony\Component\Messenger\Stamp\ReceivedStamp;
2225

2326
/**
2427
* @author Samuel Roze <samuel.roze@gmail.com>
@@ -60,9 +63,44 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
6063

6164
try {
6265
$handler =$handlerDescriptor->getHandler();
63-
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor,$handler($message));
66+
$batchHandler =$handlerDescriptor->getBatchHandler();
67+
$acked =false;
68+
$ack =null;
69+
70+
if ($batchHandler &&$receivedStamp =$envelope->last(ReceivedStamp::class)) {
71+
$ack =function (\Throwable$e =null,mixed$result =null)use ($envelope,$receivedStamp,$handlerDescriptor, &$acked) {
72+
$acked =true;
73+
if (null ===$e) {
74+
$envelope =$envelope->with(HandledStamp::fromDescriptor($handlerDescriptor,$result));
75+
}
76+
$receivedStamp->ack($envelope,$e);
77+
};
78+
}
79+
80+
if (null ===$ack) {
81+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor,$handler($message));
82+
}else {
83+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor,$handler($message,$ack));
84+
85+
if ($acked) {
86+
$ack =null;
87+
}else {
88+
$envelope =$envelope->with(newNoAutoAckStamp());
89+
}
90+
}
91+
6492
$envelope =$envelope->with($handledStamp);
6593
$this->logger->info('Message {class} handled by {handler}',$context + ['handler' =>$handledStamp->getHandlerName()]);
94+
95+
if (null !==$ack) {
96+
if (!\is_int($batchSize =$handledStamp->getResult()) ||0 >=$batchSize) {
97+
thrownewLogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".',\is_int($batchSize) ?$batchSize :get_debug_type($batchSize),get_debut_type(batchHandler)));
98+
}
99+
100+
if ($batchSize >= ($handlerDescriptor->getOption('batch_size') ??0)) {
101+
$batchHandler->handleBatch();
102+
}
103+
}
66104
}catch (\Throwable$e) {
67105
$exceptions[] =$e;
68106
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\Stamp;
13+
14+
/**
15+
* A marker that ack'ing for this message should not be done automatically.
16+
*/
17+
class NoAutoAckStampimplements NonSendableStampInterface
18+
{
19+
}

‎src/Symfony/Component/Messenger/Stamp/ReceivedStamp.php‎

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,25 @@
2626
finalclass ReceivedStampimplements NonSendableStampInterface
2727
{
2828
private$transportName;
29+
private$ack;
2930

30-
publicfunction__construct(string$transportName)
31+
publicfunction__construct(string$transportName,\Closure$ack =null)
3132
{
3233
$this->transportName =$transportName;
34+
$this->ack =$ack;
3335
}
3436

3537
publicfunctiongetTransportName():string
3638
{
3739
return$this->transportName;
3840
}
41+
42+
publicfunctionack(Envelope$envelope,\Throwable$e =null):void
43+
{
44+
if ($this->ack) {
45+
($this->ack)($envelope,$e);
46+
}elseif ($e) {
47+
throw$e;
48+
}
49+
}
3950
}

‎src/Symfony/Component/Messenger/Tests/Handler/HandlersLocatorTest.php‎

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ public function testItYieldsHandlerDescriptors()
2727
DummyMessage::class => [$handler],
2828
]);
2929

30-
$this->assertEquals([newHandlerDescriptor($handler)],iterator_to_array($locator->getHandlers(newEnvelope(newDummyMessage('a')))));
30+
$descriptor =newHandlerDescriptor($handler);
31+
$descriptor->getName();
32+
33+
$this->assertEquals([$descriptor],iterator_to_array($locator->getHandlers(newEnvelope(newDummyMessage('a')))));
3134
}
3235

3336
publicfunctiontestItReturnsOnlyHandlersMatchingTransport()
@@ -43,6 +46,9 @@ public function testItReturnsOnlyHandlersMatchingTransport()
4346
],
4447
]);
4548

49+
$first->getName();
50+
$second->getName();
51+
4652
$this->assertEquals([
4753
$first,
4854
$second,

‎src/Symfony/Component/Messenger/Tests/WorkerTest.php‎

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,24 +48,26 @@ public function testWorkerDispatchTheReceivedMessage()
4848
]);
4949

5050
$bus =$this->createMock(MessageBusInterface::class);
51+
$envelopes = [];
5152

5253
$bus->expects($this->exactly(2))
5354
->method('dispatch')
54-
->withConsecutive(
55-
[newEnvelope($apiMessage, [newReceivedStamp('transport'),newConsumedByWorkerStamp()])],
56-
[newEnvelope($ipaMessage, [newReceivedStamp('transport'),newConsumedByWorkerStamp()])]
57-
)
58-
->willReturnOnConsecutiveCalls(
59-
$this->returnArgument(0),
60-
$this->returnArgument(0)
61-
);
55+
->willReturnCallback(function ($envelope)use (&$envelopes) {
56+
return$envelopes[] =$envelope;
57+
});
6258

6359
$dispatcher =newEventDispatcher();
6460
$dispatcher->addSubscriber(newStopWorkerOnMessageLimitListener(2));
6561

6662
$worker =newWorker(['transport' =>$receiver],$bus,$dispatcher);
6763
$worker->run();
6864

65+
$this->assertSame($apiMessage,$envelopes[0]->getMessage());
66+
$this->assertSame($ipaMessage,$envelopes[1]->getMessage());
67+
$this->assertCount(1,$envelopes[0]->all(ReceivedStamp::class));
68+
$this->assertCount(1,$envelopes[0]->all(ConsumedByWorkerStamp::class));
69+
$this->assertSame('transport',$envelopes[0]->last(ReceivedStamp::class)->getTransportName());
70+
6971
$this->assertSame(2,$receiver->getAcknowledgeCount());
7072
}
7173

‎src/Symfony/Component/Messenger/Worker.php‎

Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -133,45 +133,57 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
133133
return;
134134
}
135135

136-
try {
137-
$envelope =$this->bus->dispatch($envelope->with(newReceivedStamp($transportName),newConsumedByWorkerStamp()));
138-
}catch (\Throwable$throwable) {
139-
$rejectFirst =$throwableinstanceof RejectRedeliveredMessageException;
140-
if ($rejectFirst) {
141-
// redelivered messages are rejected first so that continuous failures in an event listener or while
142-
// publishing for retry does not cause infinite redelivery loops
143-
$receiver->reject($envelope);
144-
}
136+
$acked =false;
137+
$ack =function (Envelope$envelope,\Throwable$e =null)use ($receiver,$transportName, &$acked) {
138+
$acked =true;
139+
140+
if ($e) {
141+
if ($rejectFirst =$einstanceof RejectRedeliveredMessageException) {
142+
// redelivered messages are rejected first so that continuous failures in an event listener or while
143+
// publishing for retry does not cause infinite redelivery loops
144+
$receiver->reject($envelope);
145+
}
145146

146-
if ($throwableinstanceof HandlerFailedException) {
147-
$envelope =$throwable->getEnvelope();
148-
}
147+
if ($einstanceof HandlerFailedException) {
148+
$envelope =$e->getEnvelope();
149+
}
150+
151+
$failedEvent =newWorkerMessageFailedEvent($envelope,$transportName,$e);
152+
$this->dispatchEvent($failedEvent);
153+
$envelope =$failedEvent->getEnvelope();
149154

150-
$failedEvent =newWorkerMessageFailedEvent($envelope,$transportName,$throwable);
151-
$this->dispatchEvent($failedEvent);
152-
$envelope =$failedEvent->getEnvelope();
155+
if (!$rejectFirst) {
156+
$receiver->reject($envelope);
157+
}
153158

154-
if (!$rejectFirst) {
155-
$receiver->reject($envelope);
159+
return;
156160
}
157161

158-
return;
159-
}
162+
$handledEvent =newWorkerMessageHandledEvent($envelope,$transportName);
163+
$this->dispatchEvent($handledEvent);
164+
$envelope =$handledEvent->getEnvelope();
165+
166+
if (null !==$this->logger) {
167+
$message =$envelope->getMessage();
168+
$context = [
169+
'message' =>$message,
170+
'class' =>\get_class($message),
171+
];
172+
$this->logger->info('{class} was handled successfully (acknowledging to transport).',$context);
173+
}
160174

161-
$handledEvent =newWorkerMessageHandledEvent($envelope,$transportName);
162-
$this->dispatchEvent($handledEvent);
163-
$envelope =$handledEvent->getEnvelope();
164-
165-
if (null !==$this->logger) {
166-
$message =$envelope->getMessage();
167-
$context = [
168-
'message' =>$message,
169-
'class' =>\get_class($message),
170-
];
171-
$this->logger->info('{class} was handled successfully (acknowledging to transport).',$context);
172-
}
175+
$receiver->ack($envelope);
176+
};
173177

174-
$receiver->ack($envelope);
178+
try {
179+
$envelope =$this->bus->dispatch($envelope->with(newReceivedStamp($transportName,$ack),newConsumedByWorkerStamp()));
180+
181+
if (!$acked && !$envelope->last(NoAutoAckStamp::class)) {
182+
$ack($envelope);
183+
}
184+
}catch (\Throwable$e) {
185+
$ack($envelope,$e);
186+
}
175187
}
176188

177189
publicfunctionstop():void

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp