Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit75f1f0e

Browse files
committed
[Messenger] expire delay queue and fix auto_setup logic
1 parent950306a commit75f1f0e

File tree

3 files changed

+47
-41
lines changed

3 files changed

+47
-41
lines changed

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php‎

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ public function testSetChannelPrefetchWhenSetup()
308308
);
309309

310310
// makes sure the channel looks connected, so it's not re-created
311-
$amqpChannel->expects($this->exactly(2))->method('isConnected')->willReturn(true);
311+
$amqpChannel->expects($this->any())->method('isConnected')->willReturn(true);
312312

313313
$amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2);
314314
$connection = Connection::fromDsn('amqp://localhost?prefetch_count=2', [],$factory);
@@ -321,26 +321,31 @@ public function testItDelaysTheMessage()
321321
{
322322
$amqpConnection =$this->createMock(\AMQPConnection::class);
323323
$amqpChannel =$this->createMock(\AMQPChannel::class);
324-
$delayQueue =$this->createMock(\AMQPQueue::class);
325324

326325
$factory =$this->createMock(AmqpFactory::class);
327326
$factory->method('createConnection')->willReturn($amqpConnection);
328327
$factory->method('createChannel')->willReturn($amqpChannel);
329-
$factory->method('createQueue')->willReturn($delayQueue);
328+
$factory->method('createQueue')->will($this->onConsecutiveCalls(
329+
$amqpQueue =$this->createMock(\AMQPQueue::class),
330+
$delayQueue =$this->createMock(\AMQPQueue::class)
331+
));
330332
$factory->method('createExchange')->will($this->onConsecutiveCalls(
331333
$amqpExchange =$this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
332334
$delayExchange =$this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
333335
));
334336

335337
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
336338
$amqpExchange->expects($this->once())->method('declareExchange');
339+
$amqpQueue->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
340+
$amqpQueue->expects($this->once())->method('declareQueue');
337341

338342
$delayExchange->expects($this->once())->method('setName')->with('delay');
339343
$delayExchange->expects($this->once())->method('declareExchange');
340344

341-
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__5000');
345+
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__5000');
342346
$delayQueue->expects($this->once())->method('setArguments')->with([
343347
'x-message-ttl' =>5000,
348+
'x-expires' =>5000 +10000,
344349
'x-dead-letter-exchange' =>self::DEFAULT_EXCHANGE_NAME,
345350
'x-dead-letter-routing-key' =>'',
346351
]);
@@ -383,9 +388,10 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
383388

384389
$connection = Connection::fromDsn('amqp://localhost',$connectionOptions,$factory);
385390

386-
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__120000');
391+
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000');
387392
$delayQueue->expects($this->once())->method('setArguments')->with([
388393
'x-message-ttl' =>120000,
394+
'x-expires' =>120000 +10000,
389395
'x-dead-letter-exchange' =>self::DEFAULT_EXCHANGE_NAME,
390396
'x-dead-letter-routing-key' =>'',
391397
]);
@@ -492,9 +498,10 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
492498

493499
$connection = Connection::fromDsn('amqp://localhost',$connectionOptions,$factory);
494500

495-
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages_routing_key_120000');
501+
$delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000');
496502
$delayQueue->expects($this->once())->method('setArguments')->with([
497503
'x-message-ttl' =>120000,
504+
'x-expires' =>120000 +10000,
498505
'x-dead-letter-exchange' =>self::DEFAULT_EXCHANGE_NAME,
499506
'x-dead-letter-routing-key' =>'routing_key',
500507
]);

‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php‎

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,7 @@ public function send(Envelope $envelope): Envelope
4545

4646
/** @var DelayStamp|null $delayStamp */
4747
$delayStamp =$envelope->last(DelayStamp::class);
48-
$delay =0;
49-
if (null !==$delayStamp) {
50-
$delay =$delayStamp->getDelay();
51-
}
48+
$delay =$delayStamp ?$delayStamp->getDelay() :0;
5249

5350
$amqpStamp =$envelope->last(AmqpStamp::class);
5451
if (isset($encodedMessage['headers']['Content-Type'])) {

‎src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php‎

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,8 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
6262
{
6363
$this->connectionOptions =array_replace_recursive([
6464
'delay' => [
65-
'routing_key_pattern' =>'delay_%exchange_name%_%routing_key%_%delay%',
6665
'exchange_name' =>'delay',
67-
'queue_name_pattern' =>'delay_queue_%exchange_name%_%routing_key%_%delay%',
66+
'queue_name_pattern' =>'delay_%exchange_name%_%routing_key%_%delay%',
6867
],
6968
],$connectionOptions);
7069
$this->exchangeOptions =$exchangeOptions;
@@ -93,9 +92,8 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
9392
* * flags: Exchange flags (Default: AMQP_DURABLE)
9493
* * arguments: Extra arguments
9594
* * delay:
96-
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%exchange_name%_%routing_key%_%delay%")
97-
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%exchange_name%_%routing_key%_%delay%")
98-
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay")
95+
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
96+
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delay")
9997
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
10098
* * prefetch_count: set channel prefetch count
10199
*/
@@ -171,20 +169,20 @@ private static function normalizeQueueArguments(array $arguments): array
171169
}
172170

173171
/**
174-
* @param int $delay The delay in milliseconds
175-
*
176172
* @throws \AMQPException
177173
*/
178-
publicfunctionpublish(string$body,array$headers = [],int$delay =0,AmqpStamp$amqpStamp =null):void
174+
publicfunctionpublish(string$body,array$headers = [],int$delayInMs =0,AmqpStamp$amqpStamp =null):void
179175
{
180-
if (0 !==$delay) {
181-
$this->publishWithDelay($body,$headers,$delay,$amqpStamp);
176+
$this->clearWhenDisconnected();
177+
178+
if (0 !==$delayInMs) {
179+
$this->publishWithDelay($body,$headers,$delayInMs,$amqpStamp);
182180

183181
return;
184182
}
185183

186184
if ($this->shouldSetup()) {
187-
$this->setup();
185+
$this->setupExchangeAndQueues();
188186
}
189187

190188
$this->publishOnExchange(
@@ -213,9 +211,7 @@ private function publishWithDelay(string $body, array $headers, int $delay, Amqp
213211
{
214212
$routingKey =$this->getRoutingKeyForMessage($amqpStamp);
215213

216-
if ($this->shouldSetup()) {
217-
$this->setupDelay($delay,$routingKey);
218-
}
214+
$this->setupDelay($delay,$routingKey);
219215

220216
$this->publishOnExchange(
221217
$this->getDelayExchange(),
@@ -241,15 +237,12 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
241237

242238
privatefunctionsetupDelay(int$delay, ?string$routingKey)
243239
{
244-
if (!$this->channel()->isConnected()) {
245-
$this->clear();
240+
if ($this->shouldSetup()) {
241+
$this->setup();// setup delay exchange and normal exchange for delay queue to DLX messages to
246242
}
247243

248-
$this->exchange()->declareExchange();// setup normal exchange for delay queue to DLX messages to
249-
$this->getDelayExchange()->declareExchange();
250-
251244
$queue =$this->createDelayQueue($delay,$routingKey);
252-
$queue->declareQueue();
245+
$queue->declareQueue();// the delay queue always need to be declared because the name is dynamic and cannot be declared in advance
253246
$queue->bind($this->connectionOptions['delay']['exchange_name'],$this->getRoutingKeyForDelay($delay,$routingKey));
254247
}
255248

@@ -283,6 +276,9 @@ private function createDelayQueue(int $delay, ?string $routingKey)
283276
));
284277
$queue->setArguments([
285278
'x-message-ttl' =>$delay,
279+
// delete the delay queue 10 seconds after the message expires
280+
// publishing another message redeclares the queue which renews the lease
281+
'x-expires' =>$delay +10000,
286282
'x-dead-letter-exchange' =>$this->exchangeOptions['name'],
287283
// after being released from to DLX, make sure the original routing key will be used
288284
// we must use an empty string instead of null for the argument to be picked up
@@ -297,7 +293,7 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st
297293
returnstr_replace(
298294
['%delay%','%exchange_name%','%routing_key%'],
299295
[$delay,$this->exchangeOptions['name'],$finalRoutingKey ??''],
300-
$this->connectionOptions['delay']['routing_key_pattern']
296+
$this->connectionOptions['delay']['queue_name_pattern']
301297
);
302298
}
303299

@@ -308,8 +304,10 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st
308304
*/
309305
publicfunctionget(string$queueName): ?\AMQPEnvelope
310306
{
307+
$this->clearWhenDisconnected();
308+
311309
if ($this->shouldSetup()) {
312-
$this->setup();
310+
$this->setupExchangeAndQueues();
313311
}
314312

315313
try {
@@ -319,7 +317,7 @@ public function get(string $queueName): ?\AMQPEnvelope
319317
}catch (\AMQPQueueException$e) {
320318
if (404 ===$e->getCode() &&$this->shouldSetup()) {
321319
// If we get a 404 for the queue, it means we need to setup the exchange & queue.
322-
$this->setup();
320+
$this->setupExchangeAndQueues();
323321

324322
return$this->get();
325323
}
@@ -342,10 +340,12 @@ public function nack(\AMQPEnvelope $message, string $queueName, int $flags = AMQ
342340

343341
publicfunctionsetup():void
344342
{
345-
if (!$this->channel()->isConnected()) {
346-
$this->clear();
347-
}
343+
$this->setupExchangeAndQueues();
344+
$this->getDelayExchange()->declareExchange();
345+
}
348346

347+
privatefunctionsetupExchangeAndQueues():void
348+
{
349349
$this->exchange()->declareExchange();
350350

351351
foreach ($this->queuesOptionsas$queueName =>$queueConfig) {
@@ -424,12 +424,14 @@ public function exchange(): \AMQPExchange
424424
return$this->amqpExchange;
425425
}
426426

427-
privatefunctionclear():void
427+
privatefunctionclearWhenDisconnected():void
428428
{
429-
$this->amqpChannel =null;
430-
$this->amqpQueues = [];
431-
$this->amqpExchange =null;
432-
$this->amqpDelayExchange =null;
429+
if (!$this->channel()->isConnected()) {
430+
$this->amqpChannel =null;
431+
$this->amqpQueues = [];
432+
$this->amqpExchange =null;
433+
$this->amqpDelayExchange =null;
434+
}
433435
}
434436

435437
privatefunctionshouldSetup():bool

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp