Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3de3e4e

Browse files
committed
feature#30913 [Messenger] Uses anAmqpStamp to provide flags and attributes (sroze)
This PR was merged into the 4.3-dev branch.Discussion----------[Messenger] Uses an `AmqpStamp` to provide flags and attributes| Q | A| ------------- | ---| Branch? | master| Bug fix? | no| New feature? | yes| BC breaks? | no| Deprecations? | no| Tests pass? | yes| Fixed tickets |#28885| License | MIT| Doc PR | øUsing the `AmqpStamp` you can configure the flags and any attribute (such as `delivery_mode`).Commits-------56fa574 Uses an `AmqpStamp` to provide flags and attributes
2 parents8a62892 +56fa574 commit3de3e4e

File tree

8 files changed

+113
-50
lines changed

8 files changed

+113
-50
lines changed

‎src/Symfony/Component/Messenger/CHANGELOG.md‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ CHANGELOG
1818
changed: a required 3rd`SerializerInterface` argument was added.
1919
* Added a new`SyncTransport` along with`ForceCallHandlersStamp` to
2020
explicitly handle messages synchronously.
21-
* Added`AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
21+
* Added`AmqpStamp` allowing to provide a routing key, flags and attributes on message publishing.
2222
*[BC BREAK] Removed publishing with a`routing_key` option from queue configuration, for
23-
AMQP. Use exchange`default_publish_routing_key` or`AmqpRoutingKeyStamp` instead.
23+
AMQP. Use exchange`default_publish_routing_key` or`AmqpStamp` instead.
2424
*[BC BREAK] Changed the`queue` option in the AMQP transport DSN to be`queues[name]`. You can
2525
therefore name the queue but also configure`binding_keys`,`flags` and`arguments`.
2626
*[BC BREAK] The methods`get`,`ack`,`nack` and`queue` of the AMQP`Connection`

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpRoutingKeyStampTest.php‎

Lines changed: 0 additions & 24 deletions
This file was deleted.

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php‎

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
usePHPUnit\Framework\TestCase;
1515
useSymfony\Component\Messenger\Envelope;
1616
useSymfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17-
useSymfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
1817
useSymfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
18+
useSymfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
1919
useSymfony\Component\Messenger\Transport\AmqpExt\Connection;
2020
useSymfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2121

@@ -41,14 +41,14 @@ public function testItSendsTheEncodedMessage()
4141

4242
publicfunctiontestItSendsTheEncodedMessageUsingARoutingKey()
4343
{
44-
$envelope = (newEnvelope(newDummyMessage('Oy')))->with(newAmqpRoutingKeyStamp('rk'));
44+
$envelope = (newEnvelope(newDummyMessage('Oy')))->with($stamp =newAmqpStamp('rk'));
4545
$encoded = ['body' =>'...','headers' => ['type' => DummyMessage::class]];
4646

4747
$serializer =$this->createMock(SerializerInterface::class);
4848
$serializer->method('encode')->with($envelope)->willReturn($encoded);
4949

5050
$connection =$this->createMock(Connection::class);
51-
$connection->expects($this->once())->method('publish')->with($encoded['body'],$encoded['headers'],0,'rk');
51+
$connection->expects($this->once())->method('publish')->with($encoded['body'],$encoded['headers'],0,$stamp);
5252

5353
$sender =newAmqpSender($connection,$serializer);
5454
$sender->send($envelope);
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\Tests\Transport\AmqpExt;
13+
14+
usePHPUnit\Framework\TestCase;
15+
useSymfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
16+
17+
/**
18+
* @requires extension amqp
19+
*/
20+
class AmqpStampTestextends TestCase
21+
{
22+
publicfunctiontestRoutingKeyOnly()
23+
{
24+
$stamp =newAmqpStamp('routing_key');
25+
$this->assertSame('routing_key',$stamp->getRoutingKey());
26+
$this->assertSame(AMQP_NOPARAM,$stamp->getFlags());
27+
$this->assertSame([],$stamp->getAttributes());
28+
}
29+
30+
publicfunctiontestFlagsAndAttributes()
31+
{
32+
$stamp =newAmqpStamp(null,AMQP_DURABLE, ['delivery_mode' =>'unknown']);
33+
$this->assertNull($stamp->getRoutingKey());
34+
$this->assertSame(AMQP_DURABLE,$stamp->getFlags());
35+
$this->assertSame(['delivery_mode' =>'unknown'],$stamp->getAttributes());
36+
}
37+
}

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php‎

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313

1414
usePHPUnit\Framework\TestCase;
1515
useSymfony\Component\Messenger\Exception\InvalidArgumentException;
16+
useSymfony\Component\Messenger\Tests\Fixtures\DummyMessage;
1617
useSymfony\Component\Messenger\Transport\AmqpExt\AmqpFactory;
18+
useSymfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
1719
useSymfony\Component\Messenger\Transport\AmqpExt\Connection;
1820

1921
/**
@@ -430,7 +432,7 @@ public function testItCanPublishWithASuppliedRoutingKey()
430432
$amqpExchange->expects($this->once())->method('publish')->with('body','routing_key');
431433

432434
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [],$factory);
433-
$connection->publish('body', [],0,'routing_key');
435+
$connection->publish('body', [],0,newAmqpStamp('routing_key'));
434436
}
435437

436438
publicfunctiontestItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument()
@@ -477,7 +479,27 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
477479
$delayQueue->expects($this->once())->method('bind')->with('delay','delay_120000');
478480

479481
$delayExchange->expects($this->once())->method('publish')->with('{}','delay_120000',AMQP_NOPARAM, ['headers' => []]);
480-
$connection->publish('{}', [],120000,'routing_key');
482+
$connection->publish('{}', [],120000,newAmqpStamp('routing_key'));
483+
}
484+
485+
publicfunctiontestItCanPublishWithCustomFlagsAndAttributes()
486+
{
487+
$factory =newTestAmqpFactory(
488+
$amqpConnection =$this->createMock(\AMQPConnection::class),
489+
$amqpChannel =$this->createMock(\AMQPChannel::class),
490+
$amqpQueue =$this->createMock(\AMQPQueue::class),
491+
$amqpExchange =$this->createMock(\AMQPExchange::class)
492+
);
493+
494+
$amqpExchange->expects($this->once())->method('publish')->with(
495+
'body',
496+
'routing_key',
497+
AMQP_IMMEDIATE,
498+
['delivery_mode' =>2,'headers' => ['type' => DummyMessage::class]]
499+
);
500+
501+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [],$factory);
502+
$connection->publish('body', ['type' => DummyMessage::class],0,newAmqpStamp('routing_key',AMQP_IMMEDIATE, ['delivery_mode' =>2]));
481503
}
482504
}
483505

‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php‎

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,12 @@ public function send(Envelope $envelope): Envelope
5151
}
5252

5353
try {
54-
/** @var $routingKeyStamp AmqpRoutingKeyStamp */
55-
$routingKeyStamp =$envelope->last(AmqpRoutingKeyStamp::class);
56-
$routingKey =$routingKeyStamp ?$routingKeyStamp->getRoutingKey() :null;
57-
58-
$this->connection->publish($encodedMessage['body'],$encodedMessage['headers'] ?? [],$delay,$routingKey);
54+
$this->connection->publish(
55+
$encodedMessage['body'],
56+
$encodedMessage['headers'] ?? [],
57+
$delay,
58+
$envelope->last(AmqpStamp::class)
59+
);
5960
}catch (\AMQPException$e) {
6061
thrownewTransportException($e->getMessage(),0,$e);
6162
}

‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpRoutingKeyStamp.php‎renamed to ‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpStamp.php‎

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,35 @@
1515

1616
/**
1717
* @author Guillaume Gammelin <ggammelin@gmail.com>
18+
* @author Samuel Roze <samuel.roze@gmail.com>
1819
*
1920
* @experimental in 4.3
2021
*/
21-
finalclassAmqpRoutingKeyStampimplements StampInterface
22+
finalclassAmqpStampimplements StampInterface
2223
{
2324
private$routingKey;
25+
private$flags;
26+
private$attributes;
2427

25-
publicfunction__construct(string$routingKey)
28+
publicfunction__construct(string$routingKey =null,int$flags =AMQP_NOPARAM,array$attributes = [])
2629
{
2730
$this->routingKey =$routingKey;
31+
$this->flags =$flags;
32+
$this->attributes =$attributes;
2833
}
2934

30-
publicfunctiongetRoutingKey():string
35+
publicfunctiongetRoutingKey():?string
3136
{
3237
return$this->routingKey;
3338
}
39+
40+
publicfunctiongetFlags():int
41+
{
42+
return$this->flags;
43+
}
44+
45+
publicfunctiongetAttributes():array
46+
{
47+
return$this->attributes;
48+
}
3449
}

‎src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php‎

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,10 @@ private static function normalizeQueueArguments(array $arguments): array
171171
*
172172
* @throws \AMQPException
173173
*/
174-
publicfunctionpublish(string$body,array$headers = [],int$delay =0,string$routingKey =null):void
174+
publicfunctionpublish(string$body,array$headers = [],int$delay =0,AmqpStamp$amqpStamp =null):void
175175
{
176176
if (0 !==$delay) {
177-
$this->publishWithDelay($body,$headers,$delay,$routingKey);
177+
$this->publishWithDelay($body,$headers,$delay,$amqpStamp);
178178

179179
return;
180180
}
@@ -183,13 +183,14 @@ public function publish(string $body, array $headers = [], int $delay = 0, strin
183183
$this->setup();
184184
}
185185

186-
$this->exchange()->publish(
186+
$this->publishOnExchange(
187+
$this->exchange(),
187188
$body,
188-
$routingKey ??$this->getDefaultPublishRoutingKey(),
189-
AMQP_NOPARAM,
189+
(null !==$amqpStamp ?$amqpStamp->getRoutingKey() :null) ??$this->getDefaultPublishRoutingKey(),
190190
[
191191
'headers' =>$headers,
192-
]
192+
],
193+
$amqpStamp
193194
);
194195
}
195196

@@ -206,19 +207,30 @@ public function countMessagesInQueues(): int
206207
/**
207208
* @throws \AMQPException
208209
*/
209-
privatefunctionpublishWithDelay(string$body,array$headers,int$delay,?string$exchangeRoutingKey)
210+
privatefunctionpublishWithDelay(string$body,array$headers,int$delay,AmqpStamp$amqpStamp =null)
210211
{
211212
if ($this->shouldSetup()) {
212-
$this->setupDelay($delay,$exchangeRoutingKey);
213+
$this->setupDelay($delay,null !==$amqpStamp ?$amqpStamp->getRoutingKey() :null);
213214
}
214215

215-
$this->getDelayExchange()->publish(
216+
$this->publishOnExchange(
217+
$this->getDelayExchange(),
216218
$body,
217219
$this->getRoutingKeyForDelay($delay),
218-
AMQP_NOPARAM,
219220
[
220221
'headers' =>$headers,
221-
]
222+
],
223+
$amqpStamp
224+
);
225+
}
226+
227+
privatefunctionpublishOnExchange(\AMQPExchange$exchange,string$body,string$routingKey =null,array$attributes = [],AmqpStamp$amqpStamp =null)
228+
{
229+
$exchange->publish(
230+
$body,
231+
$routingKey,
232+
$amqpStamp ?$amqpStamp->getFlags() :AMQP_NOPARAM,
233+
array_merge($amqpStamp ?$amqpStamp->getAttributes() : [],$attributes)
222234
);
223235
}
224236

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp