Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit453a19d

Browse files
committed
[Messenger] Add SQS transport
1 parent40d04ec commit453a19d

19 files changed

+1260
-2
lines changed

‎.travis.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ env:
2121
-SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
2222
-MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
2323
-MESSENGER_REDIS_DSN=redis://127.0.0.1:7001/messages
24+
-MESSENGER_SQS_DSN=sqs://localhost:9494/messages?sslmode=disable
2425

2526
matrix:
2627
include:
@@ -61,6 +62,11 @@ before_install:
6162
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4
6263
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
6364
65+
-|
66+
# Start Sqs server
67+
docker pull feathj/fake-sqs
68+
docker run -d -p 9494:9494 --name sqs --net host feathj/fake-sqs
69+
6470
-|
6571
# General configuration
6672
set -e

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@
8282
<tagname="kernel.reset"method="reset" />
8383
</service>
8484

85+
<serviceid="messenger.transport.sqs.factory"class="Symfony\Component\Messenger\Transport\SqsExt\SqsTransportFactory">
86+
<tagname="messenger.transport_factory" />
87+
</service>
88+
8589
<!-- retry-->
8690
<serviceid="messenger.retry_strategy_locator">
8791
<tagname="container.service_locator" />

‎src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ CHANGELOG
1313
* Deprecated passing a`ContainerInterface` instance as first argument of the`ConsumeMessagesCommand` constructor,
1414
pass a`RoutableMessageBus` instance instead.
1515
* Added support for auto trimming of Redis streams.
16+
* Added AWS SQS transport.
1617

1718
4.3.0
1819
-----
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\Tests\Transport\SqsExt;
13+
14+
usePHPUnit\Framework\TestCase;
15+
useSymfony\Component\Messenger\Exception\TransportException;
16+
useSymfony\Component\Messenger\Transport\SqsExt\Connection;
17+
useSymfony\Contracts\HttpClient\HttpClientInterface;
18+
useSymfony\Contracts\HttpClient\ResponseInterface;
19+
20+
class ConnectionTestextends TestCase
21+
{
22+
publicfunctiontestFromInvalidDsn()
23+
{
24+
$this->expectException(\InvalidArgumentException::class);
25+
$this->expectExceptionMessage('The given SQS DSN "sqs://" is invalid.');
26+
27+
Connection::fromDsn('sqs://');
28+
}
29+
30+
publicfunctiontestFromDsn()
31+
{
32+
$httpClient =$this->getMockBuilder(HttpClientInterface::class)->getMock();
33+
$this->assertEquals(
34+
newConnection(['endpoint' =>'https://localhost','queue_name' =>'queue'],$httpClient),
35+
Connection::fromDsn('sqs://localhost/queue', [],$httpClient)
36+
);
37+
}
38+
39+
publicfunctiontestFromDsnWithOptions()
40+
{
41+
$httpClient =$this->getMockBuilder(HttpClientInterface::class)->getMock();
42+
$this->assertEquals(
43+
newConnection(['endpoint' =>'https://localhost','account' =>'213','queue_name' =>'queue','buffer_size' =>1,'wait_time' =>5,'auto_setup' =>false],$httpClient),
44+
Connection::fromDsn('sqs://localhost/213/queue', ['buffer_size' =>1,'wait_time' =>5,'auto_setup' =>false],$httpClient)
45+
);
46+
}
47+
48+
publicfunctiontestFromDsnWithQueryOptions()
49+
{
50+
$httpClient =$this->getMockBuilder(HttpClientInterface::class)->getMock();
51+
$this->assertEquals(
52+
newConnection(['endpoint' =>'https://localhost','account' =>'213','queue_name' =>'queue','buffer_size' =>1,'wait_time' =>5,'auto_setup' =>false],$httpClient),
53+
Connection::fromDsn('sqs://localhost/213/queue?buffer_size=1&wait_time=5&auto_setup=0', [],$httpClient)
54+
);
55+
}
56+
57+
privatefunctionhandleGetQueueUrl(int$index,$mock):string
58+
{
59+
$response =$this->getMockBuilder(ResponseInterface::class)->getMock();
60+
61+
$mock->expects($this->at($index))->method('request')
62+
->with('POST','https://localhost', ['body' => ['Action' =>'GetQueueUrl','QueueName' =>'queue']])
63+
->willReturn($response);
64+
$response->expects($this->once())->method('getStatusCode')->willReturn(200);
65+
$response->expects($this->once())->method('getContent')->willReturn('<GetQueueUrlResponse>
66+
<GetQueueUrlResult>
67+
<QueueUrl>https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue</QueueUrl>
68+
</GetQueueUrlResult>
69+
<ResponseMetadata>
70+
<RequestId>470a6f13-2ed9-4181-ad8a-2fdea142988e</RequestId>
71+
</ResponseMetadata>
72+
</GetQueueUrlResponse>');
73+
74+
return'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue';
75+
}
76+
77+
publicfunctiontestKeepGettingPendingMessages()
78+
{
79+
$httpClient =$this->getMockBuilder(HttpClientInterface::class)->getMock();
80+
$response =$this->getMockBuilder(ResponseInterface::class)->getMock();
81+
82+
$queueUrl =$this->handleGetQueueUrl(0,$httpClient);
83+
84+
$httpClient->expects($this->at(1))->method('request')
85+
->with('POST',$queueUrl, ['body' => ['Action' =>'ReceiveMessage','VisibilityTimeout' =>null,'MaxNumberOfMessages' =>9,'WaitTimeSeconds' =>20]])
86+
->willReturn($response);
87+
$response->expects($this->once())->method('getContent')->willReturn('<ReceiveMessageResponse>
88+
<ReceiveMessageResult>
89+
<Message>
90+
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
91+
<ReceiptHandle>
92+
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
93+
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
94+
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
95+
</ReceiptHandle>
96+
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
97+
<Body>{"body":"this is a test","headers":{}}</Body>
98+
<Attribute>
99+
<Name>SenderId</Name>
100+
<Value>195004372649</Value>
101+
</Attribute>
102+
<Attribute>
103+
<Name>SentTimestamp</Name>
104+
<Value>1238099229000</Value>
105+
</Attribute>
106+
<Attribute>
107+
<Name>ApproximateReceiveCount</Name>
108+
<Value>5</Value>
109+
</Attribute>
110+
<Attribute>
111+
<Name>ApproximateFirstReceiveTimestamp</Name>
112+
<Value>1250700979248</Value>
113+
</Attribute>
114+
</Message>
115+
<Message>
116+
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
117+
<ReceiptHandle>
118+
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
119+
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
120+
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
121+
</ReceiptHandle>
122+
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
123+
<Body>{"body":"this is a test","headers":{}}</Body>
124+
<Attribute>
125+
<Name>SenderId</Name>
126+
<Value>195004372649</Value>
127+
</Attribute>
128+
<Attribute>
129+
<Name>SentTimestamp</Name>
130+
<Value>1238099229000</Value>
131+
</Attribute>
132+
<Attribute>
133+
<Name>ApproximateReceiveCount</Name>
134+
<Value>5</Value>
135+
</Attribute>
136+
<Attribute>
137+
<Name>ApproximateFirstReceiveTimestamp</Name>
138+
<Value>1250700979248</Value>
139+
</Attribute>
140+
</Message>
141+
<Message>
142+
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
143+
<ReceiptHandle>
144+
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
145+
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
146+
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
147+
</ReceiptHandle>
148+
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
149+
<Body>{"body":"this is a test","headers":{}}</Body>
150+
<Attribute>
151+
<Name>SenderId</Name>
152+
<Value>195004372649</Value>
153+
</Attribute>
154+
<Attribute>
155+
<Name>SentTimestamp</Name>
156+
<Value>1238099229000</Value>
157+
</Attribute>
158+
<Attribute>
159+
<Name>ApproximateReceiveCount</Name>
160+
<Value>5</Value>
161+
</Attribute>
162+
<Attribute>
163+
<Name>ApproximateFirstReceiveTimestamp</Name>
164+
<Value>1250700979248</Value>
165+
</Attribute>
166+
</Message>
167+
</ReceiveMessageResult>
168+
<ResponseMetadata>
169+
<RequestId>b6633655-283d-45b4-aee4-4e84e0ae6afa</RequestId>
170+
</ResponseMetadata>
171+
</ReceiveMessageResponse>');
172+
173+
$connection = Connection::fromDsn('sqs://localhost/queue', ['auto_setup' =>false],$httpClient);
174+
$this->assertNotNull($connection->get());
175+
$this->assertNotNull($connection->get());
176+
$this->assertNotNull($connection->get());
177+
}
178+
179+
publicfunctiontestUnexpectedSqsError()
180+
{
181+
$this->expectException(TransportException::class);
182+
$this->expectExceptionMessage('SQS error happens');
183+
184+
$httpClient =$this->getMockBuilder(HttpClientInterface::class)->getMock();
185+
$response =$this->getMockBuilder(ResponseInterface::class)->getMock();
186+
187+
$httpClient->expects($this->once())->method('request')->willReturn($response);
188+
$response->expects($this->once())->method('getStatusCode')->willReturn(400);
189+
$response->expects($this->once())->method('getContent')->willReturn('<ErrorResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
190+
<Error>
191+
<Type>Sender</Type>
192+
<Code>boom</Code>
193+
<Message>SQS error happens</Message>
194+
<Detail/>
195+
</Error>
196+
<RequestId>30441e49-5246-5231-9c87-4bd704b81ce9</RequestId>
197+
</ErrorResponse>');
198+
$connection = Connection::fromDsn('sqs://localhost/queue', [],$httpClient);
199+
$connection->get();
200+
}
201+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\Tests\Transport\SqsExt;
13+
14+
usePHPUnit\Framework\TestCase;
15+
useSymfony\Component\Messenger\Tests\Fixtures\DummyMessage;
16+
useSymfony\Component\Messenger\Transport\SqsExt\Connection;
17+
18+
class SqsExtIntegrationTestextends TestCase
19+
{
20+
private$connection;
21+
22+
protectedfunctionsetUp()
23+
{
24+
if (!getenv('MESSENGER_SQS_DSN')) {
25+
$this->markTestSkipped('The "MESSENGER_SQS_DSN" environment variable is required.');
26+
}
27+
28+
$this->connection = Connection::fromDsn(getenv('MESSENGER_SQS_DSN'), []);
29+
$this->connection->setup();
30+
$this->clearSqs();
31+
}
32+
33+
publicfunctiontestConnectionSendAndGet()
34+
{
35+
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
36+
$wait =0;
37+
while ((null ===$encoded =$this->connection->get()) &&$wait++ <200) {
38+
usleep(5000);
39+
}
40+
41+
$this->assertEquals('{"message": "Hi"}',$encoded['body']);
42+
$this->assertEquals(['type' => DummyMessage::class],$encoded['headers']);
43+
}
44+
45+
publicfunctiontestGetTheFirstAvailableMessage()
46+
{
47+
$this->connection->send('{"message": "Hi1"}', ['type' => DummyMessage::class]);
48+
$this->connection->send('{"message": "Hi2"}', ['type' => DummyMessage::class]);
49+
$wait =0;
50+
while ((null ===$encoded1 =$this->connection->get()) &&$wait++ <50) {
51+
usleep(5000);
52+
}
53+
$this->assertEquals(['type' => DummyMessage::class],$encoded1['headers']);
54+
55+
$wait =0;
56+
while ((null ===$encoded2 =$this->connection->get()) &&$wait++ <50) {
57+
usleep(5000);
58+
}
59+
$this->assertEquals(['type' => DummyMessage::class],$encoded2['headers']);
60+
61+
$messages = [$encoded2['body'],$encoded2['body']];
62+
sort($messages);
63+
$this->assertEquals(['{"message": "Hi1"}','{"message": "Hi2"}'],$messages);
64+
}
65+
66+
privatefunctionclearSqs()
67+
{
68+
$this->connection->requeuePrefetchedMessages();
69+
$wait =0;
70+
while ($wait++ <50) {
71+
if (null ===$message =$this->connection->get()) {
72+
usleep(5000);
73+
continue;
74+
}
75+
$this->connection->delete($message['id']);
76+
}
77+
}
78+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\Tests\Transport\SqsExt;
13+
14+
usePHPUnit\Framework\TestCase;
15+
useSymfony\Component\Messenger\Exception\MessageDecodingFailedException;
16+
useSymfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17+
useSymfony\Component\Messenger\Transport\Serialization\PhpSerializer;
18+
useSymfony\Component\Messenger\Transport\Serialization\Serializer;
19+
useSymfony\Component\Messenger\Transport\SqsExt\Connection;
20+
useSymfony\Component\Messenger\Transport\SqsExt\SqsReceiver;
21+
useSymfony\Component\SerializerasSerializerComponent;
22+
useSymfony\Component\Serializer\Encoder\JsonEncoder;
23+
useSymfony\Component\Serializer\Normalizer\ObjectNormalizer;
24+
25+
class SqsReceiverTestextends TestCase
26+
{
27+
publicfunctiontestItReturnsTheDecodedMessageToTheHandler()
28+
{
29+
$serializer =$this->createSerializer();
30+
31+
$sqsEnvelop =$this->createSqsEnvelope();
32+
$connection =$this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
33+
$connection->method('get')->willReturn($sqsEnvelop);
34+
35+
$receiver =newSqsReceiver($connection,$serializer);
36+
$actualEnvelopes =iterator_to_array($receiver->get());
37+
$this->assertCount(1,$actualEnvelopes);
38+
$this->assertEquals(newDummyMessage('Hi'),$actualEnvelopes[0]->getMessage());
39+
}
40+
41+
publicfunctiontestItRejectTheMessageIfThereIsAMessageDecodingFailedException()
42+
{
43+
$this->expectException(MessageDecodingFailedException::class);
44+
45+
$serializer =$this->createMock(PhpSerializer::class);
46+
$serializer->method('decode')->willThrowException(newMessageDecodingFailedException());
47+
48+
$sqsEnvelop =$this->createSqsEnvelope();
49+
$connection =$this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
50+
$connection->method('get')->willReturn($sqsEnvelop);
51+
$connection->expects($this->once())->method('delete');
52+
53+
$receiver =newSqsReceiver($connection,$serializer);
54+
iterator_to_array($receiver->get());
55+
}
56+
57+
privatefunctioncreateSqsEnvelope()
58+
{
59+
return [
60+
'id' =>1,
61+
'body' =>'{"message": "Hi"}',
62+
'headers' => [
63+
'type' => DummyMessage::class,
64+
],
65+
];
66+
}
67+
68+
privatefunctioncreateSerializer():Serializer
69+
{
70+
$serializer =newSerializer(
71+
newSerializerComponent\Serializer([newObjectNormalizer()], ['json' =>newJsonEncoder()])
72+
);
73+
74+
return$serializer;
75+
}
76+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp