Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit017420b

Browse files
committed
feature#36094 [AmazonSqsMessenger] Use AsyncAws to handle SQS communication (jderusse)
This PR was squashed before being merged into the 5.1-dev branch.Discussion----------[AmazonSqsMessenger] Use AsyncAws to handle SQS communication| Q | A| ------------- | ---| Branch? | master| Bug fix? | no| New feature? | yes| Deprecations? | no| Tickets | /| License | MIT| Doc PR | /Similar to#35992 this PR use AsyncAws to handle Sqs messages sent/receiveIt move complexity of authentication/streaming outside Symfony while keeping HttpClient integration.Commits-------7c4888e [AmazonSqsMessenger] Use AsyncAws to handle SQS communication
2 parents87a5701 +7c4888e commit017420b

File tree

8 files changed

+174
-317
lines changed

8 files changed

+174
-317
lines changed

‎composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
"amphp/http-client":"^4.2",
106106
"amphp/http-tunnel":"^1.0",
107107
"async-aws/ses":"^1.0",
108+
"async-aws/sqs":"^1.0",
108109
"cache/integration-tests":"dev-master",
109110
"doctrine/annotations":"~1.0",
110111
"doctrine/cache":"~1.6",

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespaceSymfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
1313

14+
useAsyncAws\Sqs\SqsClient;
1415
usePHPUnit\Framework\TestCase;
1516
useSymfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
1617
useSymfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
@@ -39,7 +40,7 @@ private function execute(string $dsn): void
3940
{
4041
$connection = Connection::fromDsn($dsn, []);
4142
$connection->setup();
42-
$this->clearSqs($connection);
43+
$this->clearSqs($dsn);
4344

4445
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
4546
$this->assertSame(1,$connection->getMessageCount());
@@ -53,15 +54,12 @@ private function execute(string $dsn): void
5354
$this->assertEquals(['type' => DummyMessage::class],$encoded['headers']);
5455
}
5556

56-
privatefunctionclearSqs(Connection$connection):void
57+
privatefunctionclearSqs(string$dsn):void
5758
{
58-
$wait =0;
59-
while ($wait++ <50) {
60-
if (null ===$message =$connection->get()) {
61-
usleep(5000);
62-
continue;
63-
}
64-
$connection->delete($message['id']);
65-
}
59+
$url =parse_url($dsn);
60+
$client =newSqsClient(['endpoint' =>"http://{$url['host']}:{$url['port']}"]);
61+
$client->purgeQueue([
62+
'QueueUrl' =>$client->getQueueUrl(['QueueName' =>ltrim($url['path'],'/')])->getQueueUrl(),
63+
]);
6664
}
6765
}

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php

Lines changed: 54 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,15 @@
1111

1212
namespaceSymfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
1313

14+
useAsyncAws\Core\Exception\Http\HttpException;
15+
useAsyncAws\Core\Test\ResultMockFactory;
16+
useAsyncAws\Sqs\Result\GetQueueUrlResult;
17+
useAsyncAws\Sqs\Result\ReceiveMessageResult;
18+
useAsyncAws\Sqs\SqsClient;
19+
useAsyncAws\Sqs\ValueObject\Message;
1420
usePHPUnit\Framework\TestCase;
1521
useSymfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
16-
useSymfony\Component\Messenger\Exception\TransportException;
1722
useSymfony\Contracts\HttpClient\HttpClientInterface;
18-
useSymfony\Contracts\HttpClient\ResponseInterface;
1923

2024
class ConnectionTestextends TestCase
2125
{
@@ -31,7 +35,7 @@ public function testFromDsn()
3135
{
3236
$httpClient =$this->getMockBuilder(HttpClientInterface::class)->getMock();
3337
$this->assertEquals(
34-
newConnection(['endpoint' =>'https://sqs.eu-west-1.amazonaws.com','queue_name' =>'queue'],$httpClient),
38+
newConnection(['queue_name' =>'queue'],newSqsClient(['region' =>'eu-west-1','accessKeyId' =>null,'accessKeySecret' =>null],null,$httpClient)),
3539
Connection::fromDsn('sqs://default/queue', [],$httpClient)
3640
);
3741
}
@@ -40,16 +44,16 @@ public function testFromDsnWithRegion()
4044
{
4145
$httpClient =$this->getMockBuilder(HttpClientInterface::class)->getMock();
4246
$this->assertEquals(
43-
newConnection(['endpoint' =>'https://sqs.us-east-1.amazonaws.com','queue_name' =>'queue','region' =>'us-east-1'],$httpClient),
44-
Connection::fromDsn('sqs://default/queue?region=us-east-1', [],$httpClient)
47+
newConnection(['queue_name' =>'queue'],newSqsClient(['region' =>'us-west-2','accessKeyId' =>null,'accessKeySecret' =>null],null,$httpClient)),
48+
Connection::fromDsn('sqs://default/queue?region=us-west-2', [],$httpClient)
4549
);
4650
}
4751

4852
publicfunctiontestFromDsnWithCustomEndpoint()
4953
{
5054
$httpClient =$this->getMockBuilder(HttpClientInterface::class)->getMock();
5155
$this->assertEquals(
52-
newConnection(['endpoint' =>'https://localhost','queue_name' =>'queue'],$httpClient),
56+
newConnection(['queue_name' =>'queue'],newSqsClient(['region' =>'eu-west-1','endpoint' =>'https://localhost','accessKeyId' =>null,'accessKeySecret' =>null],null,$httpClient)),
5357
Connection::fromDsn('sqs://localhost/queue', [],$httpClient)
5458
);
5559
}
@@ -58,7 +62,7 @@ public function testFromDsnWithCustomEndpointAndPort()
5862
{
5963
$httpClient =$this->getMockBuilder(HttpClientInterface::class)->getMock();
6064
$this->assertEquals(
61-
newConnection(['endpoint' =>'https://localhost:1234','queue_name' =>'queue'],$httpClient),
65+
newConnection(['queue_name' =>'queue'],newSqsClient(['region' =>'eu-west-1','endpoint' =>'https://localhost:1234','accessKeyId' =>null,'accessKeySecret' =>null],null,$httpClient)),
6266
Connection::fromDsn('sqs://localhost:1234/queue', [],$httpClient)
6367
);
6468
}
@@ -67,7 +71,7 @@ public function testFromDsnWithOptions()
6771
{
6872
$httpClient =$this->getMockBuilder(HttpClientInterface::class)->getMock();
6973
$this->assertEquals(
70-
newConnection(['endpoint' =>'https://sqs.eu-west-1.amazonaws.com','account' =>'213','queue_name' =>'queue','buffer_size' =>1,'wait_time' =>5,'auto_setup' =>false],$httpClient),
74+
newConnection(['account' =>'213','queue_name' =>'queue','buffer_size' =>1,'wait_time' =>5,'auto_setup' =>false],newSqsClient(['region' =>'eu-west-1','accessKeyId' =>null,'accessKeySecret' =>null],null,$httpClient)),
7175
Connection::fromDsn('sqs://default/213/queue', ['buffer_size' =>1,'wait_time' =>5,'auto_setup' =>false],$httpClient)
7276
);
7377
}
@@ -76,153 +80,63 @@ public function testFromDsnWithQueryOptions()
7680
{
7781
$httpClient =$this->getMockBuilder(HttpClientInterface::class)->getMock();
7882
$this->assertEquals(
79-
newConnection(['endpoint' =>'https://sqs.eu-west-1.amazonaws.com','account' =>'213','queue_name' =>'queue','buffer_size' =>1,'wait_time' =>5,'auto_setup' =>false],$httpClient),
83+
newConnection(['account' =>'213','queue_name' =>'queue','buffer_size' =>1,'wait_time' =>5,'auto_setup' =>false],newSqsClient(['region' =>'eu-west-1','accessKeyId' =>null,'accessKeySecret' =>null],null,$httpClient)),
8084
Connection::fromDsn('sqs://default/213/queue?buffer_size=1&wait_time=5&auto_setup=0', [],$httpClient)
8185
);
8286
}
8387

84-
privatefunctionhandleGetQueueUrl(int$index,$mock):string
85-
{
86-
$response =$this->getMockBuilder(ResponseInterface::class)->getMock();
87-
88-
$mock->expects($this->at($index))->method('request')
89-
->with('POST','https://localhost', ['body' => ['Action' =>'GetQueueUrl','QueueName' =>'queue']])
90-
->willReturn($response);
91-
$response->expects($this->once())->method('getStatusCode')->willReturn(200);
92-
$response->expects($this->once())->method('getContent')->willReturn('<GetQueueUrlResponse>
93-
<GetQueueUrlResult>
94-
<QueueUrl>https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue</QueueUrl>
95-
</GetQueueUrlResult>
96-
<ResponseMetadata>
97-
<RequestId>470a6f13-2ed9-4181-ad8a-2fdea142988e</RequestId>
98-
</ResponseMetadata>
99-
</GetQueueUrlResponse>');
100-
101-
return'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue';
102-
}
103-
10488
publicfunctiontestKeepGettingPendingMessages()
10589
{
106-
$httpClient =$this->getMockBuilder(HttpClientInterface::class)->getMock();
107-
$response =$this->getMockBuilder(ResponseInterface::class)->getMock();
108-
109-
$queueUrl =$this->handleGetQueueUrl(0,$httpClient);
110-
111-
$httpClient->expects($this->at(1))->method('request')
112-
->with('POST',$queueUrl, ['body' => ['Action' =>'ReceiveMessage','VisibilityTimeout' =>null,'MaxNumberOfMessages' =>9,'WaitTimeSeconds' =>20,'MessageAttributeName.1' =>'All']])
113-
->willReturn($response);
114-
$response->expects($this->once())->method('getContent')->willReturn('<ReceiveMessageResponse>
115-
<ReceiveMessageResult>
116-
<Message>
117-
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
118-
<ReceiptHandle>
119-
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
120-
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
121-
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
122-
</ReceiptHandle>
123-
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
124-
<Body>{"body":"this is a test","headers":{}}</Body>
125-
<Attribute>
126-
<Name>SenderId</Name>
127-
<Value>195004372649</Value>
128-
</Attribute>
129-
<Attribute>
130-
<Name>SentTimestamp</Name>
131-
<Value>1238099229000</Value>
132-
</Attribute>
133-
<Attribute>
134-
<Name>ApproximateReceiveCount</Name>
135-
<Value>5</Value>
136-
</Attribute>
137-
<Attribute>
138-
<Name>ApproximateFirstReceiveTimestamp</Name>
139-
<Value>1250700979248</Value>
140-
</Attribute>
141-
</Message>
142-
<Message>
143-
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
144-
<ReceiptHandle>
145-
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
146-
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
147-
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
148-
</ReceiptHandle>
149-
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
150-
<Body>{"body":"this is a test","headers":{}}</Body>
151-
<Attribute>
152-
<Name>SenderId</Name>
153-
<Value>195004372649</Value>
154-
</Attribute>
155-
<Attribute>
156-
<Name>SentTimestamp</Name>
157-
<Value>1238099229000</Value>
158-
</Attribute>
159-
<Attribute>
160-
<Name>ApproximateReceiveCount</Name>
161-
<Value>5</Value>
162-
</Attribute>
163-
<Attribute>
164-
<Name>ApproximateFirstReceiveTimestamp</Name>
165-
<Value>1250700979248</Value>
166-
</Attribute>
167-
</Message>
168-
<Message>
169-
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
170-
<ReceiptHandle>
171-
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
172-
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
173-
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
174-
</ReceiptHandle>
175-
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
176-
<Body>{"body":"this is a test","headers":{}}</Body>
177-
<Attribute>
178-
<Name>SenderId</Name>
179-
<Value>195004372649</Value>
180-
</Attribute>
181-
<Attribute>
182-
<Name>SentTimestamp</Name>
183-
<Value>1238099229000</Value>
184-
</Attribute>
185-
<Attribute>
186-
<Name>ApproximateReceiveCount</Name>
187-
<Value>5</Value>
188-
</Attribute>
189-
<Attribute>
190-
<Name>ApproximateFirstReceiveTimestamp</Name>
191-
<Value>1250700979248</Value>
192-
</Attribute>
193-
</Message>
194-
</ReceiveMessageResult>
195-
<ResponseMetadata>
196-
<RequestId>b6633655-283d-45b4-aee4-4e84e0ae6afa</RequestId>
197-
</ResponseMetadata>
198-
</ReceiveMessageResponse>');
199-
200-
$connection = Connection::fromDsn('sqs://localhost/queue', ['auto_setup' =>false],$httpClient);
90+
$client =$this->createMock(SqsClient::class);
91+
$client->expects($this->any())
92+
->method('getQueueUrl')
93+
->with(['QueueName' =>'queue','QueueOwnerAWSAccountId' =>123])
94+
->willReturn(ResultMockFactory::create(GetQueueUrlResult::class, ['QueueUrl' =>'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue']));
95+
$client->expects($this->at(1))
96+
->method('receiveMessage')
97+
->with([
98+
'QueueUrl' =>'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
99+
'MaxNumberOfMessages' =>9,
100+
'WaitTimeSeconds' =>20,
101+
'MessageAttributeNames' => ['All'],
102+
'VisibilityTimeout' =>null,
103+
])
104+
->willReturn(ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [
105+
newMessage(['MessageId' =>1,'Body' =>'this is a test']),
106+
newMessage(['MessageId' =>2,'Body' =>'this is a test']),
107+
newMessage(['MessageId' =>3,'Body' =>'this is a test']),
108+
]]));
109+
$client->expects($this->at(2))
110+
->method('receiveMessage')
111+
->with([
112+
'QueueUrl' =>'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
113+
'MaxNumberOfMessages' =>9,
114+
'WaitTimeSeconds' =>20,
115+
'MessageAttributeNames' => ['All'],
116+
'VisibilityTimeout' =>null,
117+
])
118+
->willReturn(ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [
119+
]]));
120+
121+
$connection =newConnection(['queue_name' =>'queue','account' =>123,'auto_setup' =>false],$client);
201122
$this->assertNotNull($connection->get());
202123
$this->assertNotNull($connection->get());
203124
$this->assertNotNull($connection->get());
125+
$this->assertNull($connection->get());
204126
}
205127

206128
publicfunctiontestUnexpectedSqsError()
207129
{
208-
$this->expectException(TransportException::class);
130+
$this->expectException(HttpException::class);
209131
$this->expectExceptionMessage('SQS error happens');
210132

211-
$httpClient =$this->getMockBuilder(HttpClientInterface::class)->getMock();
212-
$response =$this->getMockBuilder(ResponseInterface::class)->getMock();
213-
214-
$httpClient->expects($this->once())->method('request')->willReturn($response);
215-
$response->expects($this->once())->method('getStatusCode')->willReturn(400);
216-
$response->expects($this->once())->method('getContent')->willReturn('<ErrorResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
217-
<Error>
218-
<Type>Sender</Type>
219-
<Code>boom</Code>
220-
<Message>SQS error happens</Message>
221-
<Detail/>
222-
</Error>
223-
<RequestId>30441e49-5246-5231-9c87-4bd704b81ce9</RequestId>
224-
</ErrorResponse>');
225-
$connection = Connection::fromDsn('sqs://localhost/queue', [],$httpClient);
133+
$client =$this->createMock(SqsClient::class);
134+
$client->expects($this->any())
135+
->method('getQueueUrl')
136+
->with(['QueueName' =>'queue','QueueOwnerAWSAccountId' =>123])
137+
->willReturn(ResultMockFactory::createFailing(GetQueueUrlResult::class,400,'SQS error happens'));
138+
139+
$connection =newConnection(['queue_name' =>'queue','account' =>123,'auto_setup' =>false],$client);
226140
$connection->get();
227141
}
228142
}

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespaceSymfony\Component\Messenger\Bridge\AmazonSqs\Transport;
1313

14+
useAsyncAws\Core\Exception\Http\HttpException;
1415
useSymfony\Component\Messenger\Envelope;
1516
useSymfony\Component\Messenger\Exception\LogicException;
1617
useSymfony\Component\Messenger\Exception\MessageDecodingFailedException;
@@ -19,7 +20,6 @@
1920
useSymfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
2021
useSymfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2122
useSymfony\Component\Messenger\Transport\Serialization\SerializerInterface;
22-
useSymfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
2323

2424
/**
2525
* @author Jérémy Derussé <jeremy@derusse.com>
@@ -42,7 +42,7 @@ public function get(): iterable
4242
{
4343
try {
4444
$sqsEnvelope =$this->connection->get();
45-
}catch (HttpExceptionInterface$e) {
45+
}catch (HttpException$e) {
4646
thrownewTransportException($e->getMessage(),0,$e);
4747
}
4848
if (null ===$sqsEnvelope) {
@@ -70,7 +70,7 @@ public function ack(Envelope $envelope): void
7070
{
7171
try {
7272
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
73-
}catch (HttpExceptionInterface$e) {
73+
}catch (HttpException$e) {
7474
thrownewTransportException($e->getMessage(),0,$e);
7575
}
7676
}
@@ -82,7 +82,7 @@ public function reject(Envelope $envelope): void
8282
{
8383
try {
8484
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
85-
}catch (HttpExceptionInterface$e) {
85+
}catch (HttpException$e) {
8686
thrownewTransportException($e->getMessage(),0,$e);
8787
}
8888
}
@@ -94,7 +94,7 @@ public function getMessageCount(): int
9494
{
9595
try {
9696
return$this->connection->getMessageCount();
97-
}catch (HttpExceptionInterface$e) {
97+
}catch (HttpException$e) {
9898
thrownewTransportException($e->getMessage(),0,$e);
9999
}
100100
}

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@
1111

1212
namespaceSymfony\Component\Messenger\Bridge\AmazonSqs\Transport;
1313

14+
useAsyncAws\Core\Exception\Http\HttpException;
1415
useSymfony\Component\Messenger\Envelope;
1516
useSymfony\Component\Messenger\Exception\TransportException;
1617
useSymfony\Component\Messenger\Stamp\DelayStamp;
1718
useSymfony\Component\Messenger\Transport\Sender\SenderInterface;
1819
useSymfony\Component\Messenger\Transport\Serialization\SerializerInterface;
19-
useSymfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
2020

2121
/**
2222
* @author Jérémy Derussé <jeremy@derusse.com>
@@ -61,7 +61,7 @@ public function send(Envelope $envelope): Envelope
6161
$messageGroupId,
6262
$messageDeduplicationId
6363
);
64-
}catch (HttpExceptionInterface$e) {
64+
}catch (HttpException$e) {
6565
thrownewTransportException($e->getMessage(),0,$e);
6666
}
6767

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp