Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit278ff6f

Browse files
committed
[Messenger][Amqp-messenger] Support content encoding and compression
1 parent4d09837 commit278ff6f

File tree

8 files changed

+180
-1
lines changed

8 files changed

+180
-1
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\Bridge\Amqp\Compressor;
13+
14+
useSymfony\Component\Messenger\Exception\InvalidArgumentException;
15+
16+
class CompressorFactory
17+
{
18+
publicstaticfunctioncreateCompressor(string$mimeContentEncoding)
19+
{
20+
if ('gzip' ===$mimeContentEncoding) {
21+
returnnewGzip();
22+
}elseif ('deflate') {
23+
returnnewDeflate();
24+
}
25+
26+
thrownewInvalidArgumentException(sprintf('The MIME content encoding of the message cannot be decompressed "%s".',$mimeContentEncoding));
27+
}
28+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\Bridge\Amqp\Compressor;
13+
14+
interface CompressorInterface
15+
{
16+
publicfunctioncompress(mixed$data):string;
17+
18+
publicfunctiondecompress(mixed$data):mixed;
19+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\Bridge\Amqp\Compressor;
13+
14+
class Deflateimplements CompressorInterface
15+
{
16+
publicfunctioncompress(mixed$data):string
17+
{
18+
returngzdeflate($data);
19+
}
20+
21+
publicfunctiondecompress(mixed$data):mixed
22+
{
23+
$decompressData =gzinflate($data);
24+
if (false ===$decompressData) {
25+
return$data;
26+
}
27+
28+
return$decompressData;
29+
}
30+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\Bridge\Amqp\Compressor;
13+
14+
class Gzipimplements CompressorInterface
15+
{
16+
publicfunctioncompress(mixed$data):string
17+
{
18+
returngzencode($data);
19+
}
20+
21+
publicfunctiondecompress(mixed$data):mixed
22+
{
23+
$decompressData =gzdecode($data);
24+
if (false ===$decompressData) {
25+
return$data;
26+
}
27+
28+
return$decompressData;
29+
}
30+
}

‎src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,40 @@ public function testItReturnsTheDecodedMessageToTheHandler()
4646
$this->assertEquals(newDummyMessage('Hi'),$actualEnvelopes[0]->getMessage());
4747
}
4848

49+
publicfunctiontestItReturnsTheGzipDecompressMessageToTheHandler()
50+
{
51+
$serializer =newSerializer(
52+
newSerializerComponent\Serializer([newObjectNormalizer()], ['json' =>newJsonEncoder()])
53+
);
54+
55+
$amqpEnvelope =$this->createAMQPEnvelopeWithContentEncodingGzip();
56+
$connection =$this->createMock(Connection::class);
57+
$connection->method('getQueueNames')->willReturn(['queueName']);
58+
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
59+
60+
$receiver =newAmqpReceiver($connection,$serializer);
61+
$actualEnvelopes =iterator_to_array($receiver->get());
62+
$this->assertCount(1,$actualEnvelopes);
63+
$this->assertEquals(newDummyMessage('Hi'),$actualEnvelopes[0]->getMessage());
64+
}
65+
66+
publicfunctiontestItReturnsTheDeflateDecompressMessageToTheHandler()
67+
{
68+
$serializer =newSerializer(
69+
newSerializerComponent\Serializer([newObjectNormalizer()], ['json' =>newJsonEncoder()])
70+
);
71+
72+
$amqpEnvelope =$this->createAMQPEnvelopeWithContentEncodingGzip();
73+
$connection =$this->createMock(Connection::class);
74+
$connection->method('getQueueNames')->willReturn(['queueName']);
75+
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
76+
77+
$receiver =newAmqpReceiver($connection,$serializer);
78+
$actualEnvelopes =iterator_to_array($receiver->get());
79+
$this->assertCount(1,$actualEnvelopes);
80+
$this->assertEquals(newDummyMessage('Hi'),$actualEnvelopes[0]->getMessage());
81+
}
82+
4983
publicfunctiontestItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
5084
{
5185
$this->expectException(TransportException::class);
@@ -84,4 +118,28 @@ private function createAMQPEnvelope(): \AMQPEnvelope
84118

85119
return$envelope;
86120
}
121+
122+
privatefunctioncreateAMQPEnvelopeWithContentEncodingGzip():\AMQPEnvelope
123+
{
124+
$envelope =$this->createMock(\AMQPEnvelope::class);
125+
$envelope->method('getBody')->willReturn(gzencode('{"message": "Hi"}'));
126+
$envelope->method('getContentEncoding')->willReturn('gzip');
127+
$envelope->method('getHeaders')->willReturn([
128+
'type' => DummyMessage::class,
129+
]);
130+
131+
return$envelope;
132+
}
133+
134+
privatefunctioncreateAMQPEnvelopeWithContentEncodingDeflate():\AMQPEnvelope
135+
{
136+
$envelope =$this->createMock(\AMQPEnvelope::class);
137+
$envelope->method('getBody')->willReturn(gzdeflate('{"message": "Hi"}'));
138+
$envelope->method('getContentEncoding')->willReturn('gzip');
139+
$envelope->method('getHeaders')->willReturn([
140+
'type' => DummyMessage::class,
141+
]);
142+
143+
return$envelope;
144+
}
87145
}

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespaceSymfony\Component\Messenger\Bridge\Amqp\Transport;
1313

14+
useSymfony\Component\Messenger\Bridge\Amqp\Compressor\CompressorFactory;
1415
useSymfony\Component\Messenger\Envelope;
1516
useSymfony\Component\Messenger\Exception\LogicException;
1617
useSymfony\Component\Messenger\Exception\MessageDecodingFailedException;
@@ -63,6 +64,12 @@ private function getEnvelope(string $queueName): iterable
6364
$body =$amqpEnvelope->getBody();
6465

6566
try {
67+
$contentEncoding =$amqpEnvelope->getContentEncoding();
68+
if ($contentEncoding) {
69+
$compressor = CompressorFactory::createCompressor($contentEncoding);
70+
$body =$compressor->decompress($body);
71+
}
72+
6673
$envelope =$this->serializer->decode([
6774
'body' =>false ===$body ?'' :$body,// workaround https://github.com/pdezwart/php-amqp/issues/351
6875
'headers' =>$amqpEnvelope->getHeaders(),

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespaceSymfony\Component\Messenger\Bridge\Amqp\Transport;
1313

14+
useSymfony\Component\Messenger\Bridge\Amqp\Compressor\CompressorFactory;
1415
useSymfony\Component\Messenger\Exception\InvalidArgumentException;
1516
useSymfony\Component\Messenger\Exception\LogicException;
1617

@@ -332,6 +333,11 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
332333
$attributes['delivery_mode'] =$attributes['delivery_mode'] ??2;
333334
$attributes['timestamp'] =$attributes['timestamp'] ??time();
334335

336+
if (isset($attributes['content_encoding'])) {
337+
$compressor = CompressorFactory::createCompressor($attributes['content_encoding']);
338+
$body =$compressor->compress($body);
339+
}
340+
335341
$exchange->publish(
336342
$body,
337343
$routingKey,

‎src/Symfony/Component/Messenger/Bridge/Amqp/composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
"symfony/event-dispatcher":"^5.4|^6.0",
2525
"symfony/process":"^5.4|^6.0",
2626
"symfony/property-access":"^5.4|^6.0",
27-
"symfony/serializer":"^5.4|^6.0"
27+
"symfony/serializer":"^5.4|^6.0",
28+
"ext-zlib":"*"
2829
},
2930
"autoload": {
3031
"psr-4": {"Symfony\\Component\\Messenger\\Bridge\\Amqp\\":"" },

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp