Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit70dcc96

Browse files
committed
[Messenger] setup target exchange for delay
1 parent3d43ea0 commit70dcc96

File tree

2 files changed

+44
-48
lines changed

2 files changed

+44
-48
lines changed

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php‎

Lines changed: 40 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
*/
2424
class ConnectionTestextends TestCase
2525
{
26+
privateconstDEFAULT_EXCHANGE_NAME ='messages';
27+
2628
/**
2729
* @expectedException \InvalidArgumentException
2830
* @expectedExceptionMessage The given AMQP DSN "amqp://:" is invalid.
@@ -40,9 +42,9 @@ public function testItCanBeConstructedWithDefaults()
4042
'port' =>5672,
4143
'vhost' =>'/',
4244
], [
43-
'name' =>'messages',
45+
'name' =>self::DEFAULT_EXCHANGE_NAME,
4446
], [
45-
'messages' => [],
47+
self::DEFAULT_EXCHANGE_NAME => [],
4648
]),
4749
Connection::fromDsn('amqp://')
4850
);
@@ -196,7 +198,7 @@ public function testItUsesANormalConnectionByDefault()
196198
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
197199
$amqpConnection->expects($this->once())->method('connect');
198200

199-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [],$factory);
201+
$connection = Connection::fromDsn('amqp://localhost', [],$factory);
200202
$connection->publish('body');
201203
}
202204

@@ -213,7 +215,7 @@ public function testItAllowsToUseAPersistentConnection()
213215
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
214216
$amqpConnection->expects($this->once())->method('pconnect');
215217

216-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?persistent=true', [],$factory);
218+
$connection = Connection::fromDsn('amqp://localhost?persistent=true', [],$factory);
217219
$connection->publish('body');
218220
}
219221

@@ -226,13 +228,12 @@ public function testItSetupsTheConnectionWithDefaults()
226228
$amqpExchange =$this->createMock(\AMQPExchange::class)
227229
);
228230

229-
$amqpExchange->method('getName')->willReturn('exchange_name');
230231
$amqpExchange->expects($this->once())->method('declareExchange');
231232
$amqpExchange->expects($this->once())->method('publish')->with('body',null,AMQP_NOPARAM, ['headers' => []]);
232233
$amqpQueue->expects($this->once())->method('declareQueue');
233-
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name',null);
234+
$amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME,null);
234235

235-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [],$factory);
236+
$connection = Connection::fromDsn('amqp://localhost', [],$factory);
236237
$connection->publish('body');
237238
}
238239

@@ -250,21 +251,20 @@ public function testItSetupsTheConnection()
250251
$factory->method('createExchange')->willReturn($amqpExchange);
251252
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0,$amqpQueue1));
252253

253-
$amqpExchange->method('getName')->willReturn('exchange_name');
254254
$amqpExchange->expects($this->once())->method('declareExchange');
255255
$amqpExchange->expects($this->once())->method('publish')->with('body','routing_key',AMQP_NOPARAM, ['headers' => []]);
256256
$amqpQueue0->expects($this->once())->method('declareQueue');
257257
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
258-
['exchange_name','binding_key0'],
259-
['exchange_name','binding_key1']
258+
[self::DEFAULT_EXCHANGE_NAME,'binding_key0'],
259+
[self::DEFAULT_EXCHANGE_NAME,'binding_key1']
260260
);
261261
$amqpQueue1->expects($this->once())->method('declareQueue');
262262
$amqpQueue1->expects($this->exactly(2))->method('bind')->withConsecutive(
263-
['exchange_name','binding_key2'],
264-
['exchange_name','binding_key3']
263+
[self::DEFAULT_EXCHANGE_NAME,'binding_key2'],
264+
[self::DEFAULT_EXCHANGE_NAME,'binding_key3']
265265
);
266266

267-
$dsn ='amqp://localhost/%2f/messages?'.
267+
$dsn ='amqp://localhost?'.
268268
'exchange[default_publish_routing_key]=routing_key&'.
269269
'queues[queue0][binding_keys][0]=binding_key0&'.
270270
'queues[queue0][binding_keys][1]=binding_key1&'.
@@ -284,18 +284,17 @@ public function testItCanDisableTheSetup()
284284
$amqpExchange =$this->createMock(\AMQPExchange::class)
285285
);
286286

287-
$amqpExchange->method('getName')->willReturn('exchange_name');
288287
$amqpExchange->expects($this->never())->method('declareExchange');
289288
$amqpQueue->expects($this->never())->method('declareQueue');
290289
$amqpQueue->expects($this->never())->method('bind');
291290

292-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' =>'false'],$factory);
291+
$connection = Connection::fromDsn('amqp://localhost', ['auto_setup' =>'false'],$factory);
293292
$connection->publish('body');
294293

295-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' =>false],$factory);
294+
$connection = Connection::fromDsn('amqp://localhost', ['auto_setup' =>false],$factory);
296295
$connection->publish('body');
297296

298-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?auto_setup=false', [],$factory);
297+
$connection = Connection::fromDsn('amqp://localhost?auto_setup=false', [],$factory);
299298
$connection->publish('body');
300299
}
301300

@@ -312,9 +311,9 @@ public function testSetChannelPrefetchWhenSetup()
312311
$amqpChannel->expects($this->exactly(2))->method('isConnected')->willReturn(true);
313312

314313
$amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2);
315-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?prefetch_count=2', [],$factory);
314+
$connection = Connection::fromDsn('amqp://localhost?prefetch_count=2', [],$factory);
316315
$connection->setup();
317-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['prefetch_count' =>2],$factory);
316+
$connection = Connection::fromDsn('amqp://localhost', ['prefetch_count' =>2],$factory);
318317
$connection->setup();
319318
}
320319

@@ -329,21 +328,20 @@ public function testItDelaysTheMessage()
329328
$factory->method('createChannel')->willReturn($amqpChannel);
330329
$factory->method('createQueue')->willReturn($delayQueue);
331330
$factory->method('createExchange')->will($this->onConsecutiveCalls(
332-
$delayExchange =$this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
333-
$amqpExchange =$this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
331+
$amqpExchange =$this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
332+
$delayExchange =$this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
334333
));
335334

336-
$amqpExchange->expects($this->once())->method('setName')->with('messages');
337-
$amqpExchange->method('getName')->willReturn('messages');
335+
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
336+
$amqpExchange->expects($this->once())->method('declareExchange');
338337

339338
$delayExchange->expects($this->once())->method('setName')->with('delay');
340339
$delayExchange->expects($this->once())->method('declareExchange');
341-
$delayExchange->method('getName')->willReturn('delay');
342340

343341
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__5000');
344342
$delayQueue->expects($this->once())->method('setArguments')->with([
345343
'x-message-ttl' =>5000,
346-
'x-dead-letter-exchange' =>'messages',
344+
'x-dead-letter-exchange' =>self::DEFAULT_EXCHANGE_NAME,
347345
'x-dead-letter-routing-key' =>'',
348346
]);
349347

@@ -352,7 +350,7 @@ public function testItDelaysTheMessage()
352350

353351
$delayExchange->expects($this->once())->method('publish')->with('{}','delay_messages__5000',AMQP_NOPARAM, ['headers' => ['x-some-headers' =>'foo']]);
354352

355-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [],$factory);
353+
$connection = Connection::fromDsn('amqp://localhost', [],$factory);
356354
$connection->publish('{}', ['x-some-headers' =>'foo'],5000);
357355
}
358356

@@ -367,29 +365,28 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
367365
$factory->method('createChannel')->willReturn($amqpChannel);
368366
$factory->method('createQueue')->willReturn($delayQueue);
369367
$factory->method('createExchange')->will($this->onConsecutiveCalls(
370-
$delayExchange =$this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
371-
$amqpExchange =$this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
368+
$amqpExchange =$this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
369+
$delayExchange =$this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
372370
));
373371

374-
$amqpExchange->expects($this->once())->method('setName')->with('messages');
375-
$amqpExchange->method('getName')->willReturn('messages');
372+
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
373+
$amqpExchange->expects($this->once())->method('declareExchange');
376374

377375
$delayExchange->expects($this->once())->method('setName')->with('delay');
378376
$delayExchange->expects($this->once())->method('declareExchange');
379-
$delayExchange->method('getName')->willReturn('delay');
380377

381378
$connectionOptions = [
382379
'retry' => [
383380
'dead_routing_key' =>'my_dead_routing_key',
384381
],
385382
];
386383

387-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages',$connectionOptions,$factory);
384+
$connection = Connection::fromDsn('amqp://localhost',$connectionOptions,$factory);
388385

389386
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__120000');
390387
$delayQueue->expects($this->once())->method('setArguments')->with([
391388
'x-message-ttl' =>120000,
392-
'x-dead-letter-exchange' =>'messages',
389+
'x-dead-letter-exchange' =>self::DEFAULT_EXCHANGE_NAME,
393390
'x-dead-letter-routing-key' =>'',
394391
]);
395392

@@ -417,7 +414,7 @@ public function testObfuscatePasswordInDsn()
417414
new \AMQPConnectionException('Oups.')
418415
);
419416

420-
$connection = Connection::fromDsn('amqp://user:secretpassword@localhost/%2f/messages', [],$factory);
417+
$connection = Connection::fromDsn('amqp://user:secretpassword@localhost', [],$factory);
421418
$connection->channel();
422419
}
423420

@@ -432,7 +429,7 @@ public function testItCanPublishWithTheDefaultRoutingKey()
432429

433430
$amqpExchange->expects($this->once())->method('publish')->with('body','routing_key');
434431

435-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=routing_key', [],$factory);
432+
$connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=routing_key', [],$factory);
436433
$connection->publish('body');
437434
}
438435

@@ -447,7 +444,7 @@ public function testItCanPublishWithASuppliedRoutingKey()
447444

448445
$amqpExchange->expects($this->once())->method('publish')->with('body','routing_key');
449446

450-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [],$factory);
447+
$connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=default_routing_key', [],$factory);
451448
$connection->publish('body', [],0,newAmqpStamp('routing_key'));
452449
}
453450

@@ -462,29 +459,28 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
462459
$factory->method('createChannel')->willReturn($amqpChannel);
463460
$factory->method('createQueue')->willReturn($delayQueue);
464461
$factory->method('createExchange')->will($this->onConsecutiveCalls(
465-
$delayExchange =$this->createMock(\AMQPExchange::class),
466-
$amqpExchange =$this->createMock(\AMQPExchange::class)
462+
$amqpExchange =$this->createMock(\AMQPExchange::class),
463+
$delayExchange =$this->createMock(\AMQPExchange::class)
467464
));
468465

469-
$amqpExchange->expects($this->once())->method('setName')->with('messages');
470-
$amqpExchange->method('getName')->willReturn('messages');
466+
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
467+
$amqpExchange->expects($this->once())->method('declareExchange');
471468

472469
$delayExchange->expects($this->once())->method('setName')->with('delay');
473470
$delayExchange->expects($this->once())->method('declareExchange');
474-
$delayExchange->method('getName')->willReturn('delay');
475471

476472
$connectionOptions = [
477473
'retry' => [
478474
'dead_routing_key' =>'my_dead_routing_key',
479475
],
480476
];
481477

482-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages',$connectionOptions,$factory);
478+
$connection = Connection::fromDsn('amqp://localhost',$connectionOptions,$factory);
483479

484480
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages_routing_key_120000');
485481
$delayQueue->expects($this->once())->method('setArguments')->with([
486482
'x-message-ttl' =>120000,
487-
'x-dead-letter-exchange' =>'messages',
483+
'x-dead-letter-exchange' =>self::DEFAULT_EXCHANGE_NAME,
488484
'x-dead-letter-routing-key' =>'routing_key',
489485
]);
490486
$delayQueue->expects($this->once())->method('setArgument')->with(
@@ -515,7 +511,7 @@ public function testItCanPublishWithCustomFlagsAndAttributes()
515511
['delivery_mode' =>2,'headers' => ['type' => DummyMessage::class]]
516512
);
517513

518-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [],$factory);
514+
$connection = Connection::fromDsn('amqp://localhost', [],$factory);
519515
$connection->publish('body', ['type' => DummyMessage::class],0,newAmqpStamp('routing_key',AMQP_IMMEDIATE, ['delivery_mode' =>2]));
520516
}
521517
}

‎src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php‎

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -246,12 +246,12 @@ private function setupDelay(int $delay, ?string $routingKey)
246246
$this->clear();
247247
}
248248

249-
$exchange =$this->getDelayExchange();
250-
$exchange->declareExchange();
249+
$this->exchange()->declareExchange();// setup normal exchange for delay queue to DLX messages to
250+
$this->getDelayExchange()->declareExchange();
251251

252252
$queue =$this->createDelayQueue($delay,$routingKey);
253253
$queue->declareQueue();
254-
$queue->bind($exchange->getName(),$this->getRoutingKeyForDelay($delay,$routingKey));
254+
$queue->bind($this->connectionOptions['delay']['exchange_name'],$this->getRoutingKeyForDelay($delay,$routingKey));
255255
}
256256

257257
privatefunctiongetDelayExchange():\AMQPExchange
@@ -352,7 +352,7 @@ public function setup(): void
352352
foreach ($this->queuesOptionsas$queueName =>$queueConfig) {
353353
$this->queue($queueName)->declareQueue();
354354
foreach ($queueConfig['binding_keys'] ?? [null]as$bindingKey) {
355-
$this->queue($queueName)->bind($this->exchange()->getName(),$bindingKey);
355+
$this->queue($queueName)->bind($this->exchangeOptions['name'],$bindingKey);
356356
}
357357
}
358358
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp