Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit102bf8e

Browse files
committed
Added ability to distinguish retry and delay actions so that different "x-dead-letter-exchange" exchange name will be used in different scenarios
1 parent75e71e3 commit102bf8e

File tree

5 files changed

+149
-53
lines changed

5 files changed

+149
-53
lines changed

‎src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php‎

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
useSymfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
2121
useSymfony\Component\Messenger\Envelope;
2222
useSymfony\Component\Messenger\Stamp\DelayStamp;
23+
useSymfony\Component\Messenger\Stamp\ReceivedStamp;
2324
useSymfony\Component\Messenger\Stamp\RedeliveryStamp;
2425
useSymfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
2526
useSymfony\Component\Messenger\Transport\Serialization\Serializer;
@@ -136,6 +137,45 @@ public function testRetryAndDelay()
136137
$receiver->ack($envelope);
137138
}
138139

140+
publicfunctiontestRetryAffectsOnlyOriginalQueue()
141+
{
142+
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), [
143+
'exchange' => [
144+
'name' =>'messages_topic',
145+
'type' =>'topic',
146+
'default_publish_routing_key' =>'topic_routing_key',
147+
],
148+
'queues' => [
149+
'A' => ['binding_keys' => ['topic_routing_key']],
150+
'B' => ['binding_keys' => ['topic_routing_key']],
151+
],
152+
]);
153+
$connection->setup();
154+
$connection->purgeQueues();
155+
156+
$serializer =$this->createSerializer();
157+
$sender =newAmqpSender($connection,$serializer);
158+
$receiver =newAmqpReceiver($connection,$serializer);
159+
160+
// initial delivery: should receive in both queues
161+
$sender->send(newEnvelope(newDummyMessage('Payload')));
162+
163+
$receivedEnvelopes =$this->receiveWithQueueName($receiver);
164+
$this->assertCount(2,$receivedEnvelopes);
165+
$this->assertArrayHasKey('A',$receivedEnvelopes);
166+
$this->assertArrayHasKey('B',$receivedEnvelopes);
167+
168+
// redelivery: should receive in only "A" queue
169+
$retryEnvelope =$receivedEnvelopes['A']
170+
->with(newDelayStamp(10))
171+
->with(newRedeliveryStamp(1));
172+
$sender->send($retryEnvelope);
173+
174+
$redelivedEnvelopes =$this->receiveWithQueueName($receiver);
175+
$this->assertCount(1,$redelivedEnvelopes);
176+
$this->assertArrayHasKey('A',$redelivedEnvelopes);
177+
}
178+
139179
publicfunctiontestItReceivesSignals()
140180
{
141181
$serializer =$this->createSerializer();
@@ -248,4 +288,19 @@ private function receiveEnvelopes(ReceiverInterface $receiver, int $timeout): ar
248288

249289
return$envelopes;
250290
}
291+
292+
privatefunctionreceiveWithQueueName(AmqpReceiver$receiver)
293+
{
294+
// let RabbitMQ receive messages
295+
usleep(100 *1000);// 100ms
296+
297+
$receivedEnvelopes = [];
298+
foreach ($receiver->get()as$envelope) {
299+
$queueName =$envelope->last(AmqpReceivedStamp::class)->getQueueName();
300+
$receivedEnvelopes[$queueName] =$envelope;
301+
$receiver->ack($envelope);
302+
}
303+
304+
return$receivedEnvelopes;
305+
}
251306
}

‎src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php‎

Lines changed: 54 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -413,36 +413,26 @@ public function testAutoSetupWithDelayDeclaresExchangeQueuesAndDelay()
413413

414414
publicfunctiontestItDelaysTheMessage()
415415
{
416-
$amqpConnection =$this->createMock(\AMQPConnection::class);
417-
$amqpChannel =$this->createMock(\AMQPChannel::class);
418-
419-
$factory =$this->createMock(AmqpFactory::class);
420-
$factory->method('createConnection')->willReturn($amqpConnection);
421-
$factory->method('createChannel')->willReturn($amqpChannel);
422-
$factory->method('createQueue')->will($this->onConsecutiveCalls(
423-
$this->createMock(\AMQPQueue::class),
424-
$delayQueue =$this->createMock(\AMQPQueue::class)
425-
));
426-
$factory->method('createExchange')->will($this->onConsecutiveCalls(
427-
$this->createMock(\AMQPExchange::class),
428-
$delayExchange =$this->createMock(\AMQPExchange::class)
429-
));
416+
$delayExchange =$this->createMock(\AMQPExchange::class);
417+
$delayExchange->expects($this->once())
418+
->method('publish')
419+
->with('{}','delay_messages__delay_5000',AMQP_NOPARAM, ['headers' => ['x-some-headers' =>'foo'],'delivery_mode' =>2]);
420+
$connection =$this->createDelayOrRetryConnection($delayExchange,self::DEFAULT_EXCHANGE_NAME,'delay_messages__delay_5000');
430421

431-
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__5000');
432-
$delayQueue->expects($this->once())->method('setArguments')->with([
433-
'x-message-ttl' =>5000,
434-
'x-expires' =>5000 +10000,
435-
'x-dead-letter-exchange' =>self::DEFAULT_EXCHANGE_NAME,
436-
'x-dead-letter-routing-key' =>'',
437-
]);
438-
439-
$delayQueue->expects($this->once())->method('declareQueue');
440-
$delayQueue->expects($this->once())->method('bind')->with('delays','delay_messages__5000');
422+
$connection->publish('{}', ['x-some-headers' =>'foo'],5000);
423+
}
441424

442-
$delayExchange->expects($this->once())->method('publish')->with('{}','delay_messages__5000',AMQP_NOPARAM, ['headers' => ['x-some-headers' =>'foo'],'delivery_mode' =>2]);
425+
publicfunctiontestItRetriesTheMessage()
426+
{
427+
$delayExchange =$this->createMock(\AMQPExchange::class);
428+
$delayExchange->expects($this->once())
429+
->method('publish')
430+
->with('{}','delay_messages__retry_5000',AMQP_NOPARAM);
431+
$connection =$this->createDelayOrRetryConnection($delayExchange,'','delay_messages__retry_5000');
443432

444-
$connection = Connection::fromDsn('amqp://localhost', [],$factory);
445-
$connection->publish('{}', ['x-some-headers' =>'foo'],5000);
433+
$amqpEnvelope =$this->createMock(\AMQPEnvelope::class);
434+
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope,null,'');
435+
$connection->publish('{}', [],5000,$amqpStamp);
446436
}
447437

448438
publicfunctiontestItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
@@ -470,7 +460,7 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
470460

471461
$connection = Connection::fromDsn('amqp://localhost',$connectionOptions,$factory);
472462

473-
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000');
463+
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__delay_120000');
474464
$delayQueue->expects($this->once())->method('setArguments')->with([
475465
'x-message-ttl' =>120000,
476466
'x-expires' =>120000 +10000,
@@ -479,9 +469,9 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
479469
]);
480470

481471
$delayQueue->expects($this->once())->method('declareQueue');
482-
$delayQueue->expects($this->once())->method('bind')->with('delays','delay_messages__120000');
472+
$delayQueue->expects($this->once())->method('bind')->with('delays','delay_messages__delay_120000');
483473

484-
$delayExchange->expects($this->once())->method('publish')->with('{}','delay_messages__120000',AMQP_NOPARAM, ['headers' => [],'delivery_mode' =>2]);
474+
$delayExchange->expects($this->once())->method('publish')->with('{}','delay_messages__delay_120000',AMQP_NOPARAM, ['headers' => [],'delivery_mode' =>2]);
485475
$connection->publish('{}', [],120000);
486476
}
487477

@@ -589,7 +579,7 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
589579

590580
$connection = Connection::fromDsn('amqp://localhost',$connectionOptions,$factory);
591581

592-
$delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000');
582+
$delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_delay_120000');
593583
$delayQueue->expects($this->once())->method('setArguments')->with([
594584
'x-message-ttl' =>120000,
595585
'x-expires' =>120000 +10000,
@@ -598,9 +588,9 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
598588
]);
599589

600590
$delayQueue->expects($this->once())->method('declareQueue');
601-
$delayQueue->expects($this->once())->method('bind')->with('delays','delay_messages_routing_key_120000');
591+
$delayQueue->expects($this->once())->method('bind')->with('delays','delay_messages_routing_key_delay_120000');
602592

603-
$delayExchange->expects($this->once())->method('publish')->with('{}','delay_messages_routing_key_120000',AMQP_NOPARAM, ['headers' => [],'delivery_mode' =>2]);
593+
$delayExchange->expects($this->once())->method('publish')->with('{}','delay_messages_routing_key_delay_120000',AMQP_NOPARAM, ['headers' => [],'delivery_mode' =>2]);
604594
$connection->publish('{}', [],120000,newAmqpStamp('routing_key'));
605595
}
606596

@@ -623,6 +613,37 @@ public function testItCanPublishWithCustomFlagsAndAttributes()
623613
$connection = Connection::fromDsn('amqp://localhost', [],$factory);
624614
$connection->publish('body', ['type' => DummyMessage::class],0,newAmqpStamp('routing_key',AMQP_IMMEDIATE, ['delivery_mode' =>2]));
625615
}
616+
617+
privatefunctioncreateDelayOrRetryConnection(\AMQPExchange$delayExchange,string$deadLetterExchangeName,string$delayQueueName):Connection
618+
{
619+
$amqpConnection =$this->createMock(\AMQPConnection::class);
620+
$amqpChannel =$this->createMock(\AMQPChannel::class);
621+
622+
$factory =$this->createMock(AmqpFactory::class);
623+
$factory->method('createConnection')->willReturn($amqpConnection);
624+
$factory->method('createChannel')->willReturn($amqpChannel);
625+
$factory->method('createQueue')->will($this->onConsecutiveCalls(
626+
$this->createMock(\AMQPQueue::class),
627+
$delayQueue =$this->createMock(\AMQPQueue::class)
628+
));
629+
$factory->method('createExchange')->will($this->onConsecutiveCalls(
630+
$this->createMock(\AMQPExchange::class),
631+
$delayExchange
632+
));
633+
634+
$delayQueue->expects($this->once())->method('setName')->with($delayQueueName);
635+
$delayQueue->expects($this->once())->method('setArguments')->with([
636+
'x-message-ttl' =>5000,
637+
'x-expires' =>5000 +10000,
638+
'x-dead-letter-exchange' =>$deadLetterExchangeName,
639+
'x-dead-letter-routing-key' =>'',
640+
]);
641+
642+
$delayQueue->expects($this->once())->method('declareQueue');
643+
$delayQueue->expects($this->once())->method('bind')->with('delays',$delayQueueName);
644+
645+
return Connection::fromDsn('amqp://localhost', [],$factory);
646+
}
626647
}
627648

628649
class TestAmqpFactoryextends AmqpFactory

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php‎

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
useSymfony\Component\Messenger\Envelope;
1515
useSymfony\Component\Messenger\Exception\TransportException;
1616
useSymfony\Component\Messenger\Stamp\DelayStamp;
17+
useSymfony\Component\Messenger\Stamp\RedeliveryStamp;
1718
useSymfony\Component\Messenger\Transport\Sender\SenderInterface;
1819
useSymfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1920
useSymfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -58,7 +59,11 @@ public function send(Envelope $envelope): Envelope
5859

5960
$amqpReceivedStamp =$envelope->last(AmqpReceivedStamp::class);
6061
if ($amqpReceivedStampinstanceof AmqpReceivedStamp) {
61-
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpReceivedStamp->getAmqpEnvelope(),$amqpStamp);
62+
$amqpStamp = AmqpStamp::createFromAmqpEnvelope(
63+
$amqpReceivedStamp->getAmqpEnvelope(),
64+
$amqpStamp,
65+
$envelope->last(RedeliveryStamp::class) ?$amqpReceivedStamp->getQueueName() :null
66+
);
6267
}
6368

6469
try {

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php‎

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespaceSymfony\Component\Messenger\Bridge\Amqp\Transport;
1313

1414
useSymfony\Component\Messenger\Stamp\NonSendableStampInterface;
15+
useSymfony\Component\Messenger\Stamp\RedeliveryStamp;
1516

1617
/**
1718
* @author Guillaume Gammelin <ggammelin@gmail.com>
@@ -22,6 +23,7 @@ final class AmqpStamp implements NonSendableStampInterface
2223
private$routingKey;
2324
private$flags;
2425
private$attributes;
26+
private$isEnvelopeRedelivered =false;
2527

2628
publicfunction__construct(string$routingKey =null,int$flags =AMQP_NOPARAM,array$attributes = [])
2729
{
@@ -45,7 +47,12 @@ public function getAttributes(): array
4547
return$this->attributes;
4648
}
4749

48-
publicstaticfunctioncreateFromAmqpEnvelope(\AMQPEnvelope$amqpEnvelope,self$previousStamp =null):self
50+
publicfunctionisEnvelopeRedelivered():bool
51+
{
52+
return$this->isEnvelopeRedelivered;
53+
}
54+
55+
publicstaticfunctioncreateFromAmqpEnvelope(\AMQPEnvelope$amqpEnvelope,self$previousStamp =null,string$redeliveryRoutingKey =null):self
4956
{
5057
$attr =$previousStamp->attributes ?? [];
5158

@@ -62,7 +69,14 @@ public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self
6269
$attr['type'] =$attr['type'] ??$amqpEnvelope->getType();
6370
$attr['reply_to'] =$attr['reply_to'] ??$amqpEnvelope->getReplyTo();
6471

65-
returnnewself($previousStamp->routingKey ??$amqpEnvelope->getRoutingKey(),$previousStamp->flags ??AMQP_NOPARAM,$attr);
72+
if (is_null($redeliveryRoutingKey)) {
73+
$stamp =newself($previousStamp->routingKey ??$amqpEnvelope->getRoutingKey(),$previousStamp->flags ??AMQP_NOPARAM,$attr);
74+
}else {
75+
$stamp =newself($redeliveryRoutingKey,$previousStamp->flags ??AMQP_NOPARAM,$attr);
76+
$stamp->isEnvelopeRedelivered =true;
77+
}
78+
79+
return$stamp;
6680
}
6781

6882
publicstaticfunctioncreateWithAttributes(array$attributes,self$previousStamp =null):self

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php‎

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
107107
$this->connectionOptions =array_replace_recursive([
108108
'delay' => [
109109
'exchange_name' =>'delays',
110-
'queue_name_pattern' =>'delay_%exchange_name%_%routing_key%_%delay%',
110+
'queue_name_pattern' =>'delay_%exchange_name%_%routing_key%_%action%_%delay%',
111111
],
112112
],$connectionOptions);
113113
$this->exchangeOptions =$exchangeOptions;
@@ -140,7 +140,7 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
140140
* * flags: Exchange flags (Default: AMQP_DURABLE)
141141
* * arguments: Extra arguments
142142
* * delay:
143-
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
143+
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%action%_%delay%")
144144
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delays")
145145
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
146146
* * prefetch_count: set channel prefetch count
@@ -300,13 +300,14 @@ public function countMessagesInQueues(): int
300300
privatefunctionpublishWithDelay(string$body,array$headers,int$delay,AmqpStamp$amqpStamp =null)
301301
{
302302
$routingKey =$this->getRoutingKeyForMessage($amqpStamp);
303+
$isRetryAttempt =$amqpStamp ?$amqpStamp->isEnvelopeRedelivered() :false;
303304

304-
$this->setupDelay($delay,$routingKey);
305+
$this->setupDelay($delay,$routingKey,$isRetryAttempt);
305306

306307
$this->publishOnExchange(
307308
$this->getDelayExchange(),
308309
$body,
309-
$this->getRoutingKeyForDelay($delay,$routingKey),
310+
$this->getRoutingKeyForDelay($delay,$routingKey,$isRetryAttempt),
310311
$headers,
311312
$amqpStamp
312313
);
@@ -326,15 +327,15 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
326327
);
327328
}
328329

329-
privatefunctionsetupDelay(int$delay, ?string$routingKey)
330+
privatefunctionsetupDelay(int$delay, ?string$routingKey,bool$isRetryAttempt)
330331
{
331332
if ($this->shouldSetup()) {
332333
$this->setup();// setup delay exchange and normal exchange for delay queue to DLX messages to
333334
}
334335

335-
$queue =$this->createDelayQueue($delay,$routingKey);
336+
$queue =$this->createDelayQueue($delay,$routingKey,$isRetryAttempt);
336337
$queue->declareQueue();// the delay queue always need to be declared because the name is dynamic and cannot be declared in advance
337-
$queue->bind($this->connectionOptions['delay']['exchange_name'],$this->getRoutingKeyForDelay($delay,$routingKey));
338+
$queue->bind($this->connectionOptions['delay']['exchange_name'],$this->getRoutingKeyForDelay($delay,$routingKey,$isRetryAttempt));
338339
}
339340

340341
privatefunctiongetDelayExchange():\AMQPExchange
@@ -358,21 +359,19 @@ private function getDelayExchange(): \AMQPExchange
358359
* which is the original exchange, resulting on it being put back into
359360
* the original queue.
360361
*/
361-
privatefunctioncreateDelayQueue(int$delay, ?string$routingKey):\AMQPQueue
362+
privatefunctioncreateDelayQueue(int$delay, ?string$routingKey,bool$isRetryAttempt):\AMQPQueue
362363
{
363364
$queue =$this->amqpFactory->createQueue($this->channel());
364-
$queue->setName(str_replace(
365-
['%delay%','%exchange_name%','%routing_key%'],
366-
[$delay,$this->exchangeOptions['name'],$routingKey ??''],
367-
$this->connectionOptions['delay']['queue_name_pattern']
368-
));
365+
$queue->setName($this->getRoutingKeyForDelay($delay,$routingKey,$isRetryAttempt));
369366
$queue->setFlags(AMQP_DURABLE);
370367
$queue->setArguments([
371368
'x-message-ttl' =>$delay,
372369
// delete the delay queue 10 seconds after the message expires
373370
// publishing another message redeclares the queue which renews the lease
374371
'x-expires' =>$delay +10000,
375-
'x-dead-letter-exchange' =>$this->exchangeOptions['name'],
372+
// message should be broadcasted to all consumers during delay, but to only one queue during retry
373+
// empty name is default direct exchange
374+
'x-dead-letter-exchange' =>$isRetryAttempt ?'' :$this->exchangeOptions['name'],
376375
// after being released from to DLX, make sure the original routing key will be used
377376
// we must use an empty string instead of null for the argument to be picked up
378377
'x-dead-letter-routing-key' =>$routingKey ??'',
@@ -381,11 +380,13 @@ private function createDelayQueue(int $delay, ?string $routingKey): \AMQPQueue
381380
return$queue;
382381
}
383382

384-
privatefunctiongetRoutingKeyForDelay(int$delay, ?string$finalRoutingKey):string
383+
privatefunctiongetRoutingKeyForDelay(int$delay, ?string$finalRoutingKey,bool$isRetryAttempt):string
385384
{
385+
$action =$isRetryAttempt ?'retry' :'delay';
386+
386387
returnstr_replace(
387-
['%delay%','%exchange_name%','%routing_key%'],
388-
[$delay,$this->exchangeOptions['name'],$finalRoutingKey ??''],
388+
['%delay%','%exchange_name%','%routing_key%','%action%'],
389+
[$delay,$this->exchangeOptions['name'],$finalRoutingKey ??'',$action],
389390
$this->connectionOptions['delay']['queue_name_pattern']
390391
);
391392
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp