Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit222bede

Browse files
committed
[Messenger] Add BeanstalkdPriorityStamp to Beanstalkd bridge
1 parent4d6d36c commit222bede

File tree

8 files changed

+162
-21
lines changed

8 files changed

+162
-21
lines changed

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* Add`bury_on_reject` option to bury failed messages instead of deleting them
8+
* Add`BeanstalkdPriorityStamp` option to allow setting the message priority
89

910
5.2.0
1011
-----

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
usePHPUnit\Framework\TestCase;
1515
useSymfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdPriorityStamp;
1617
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1718
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver;
1819
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
@@ -74,7 +75,8 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
7475
$beanstalkdEnvelope =$this->createBeanstalkdEnvelope();
7576
$connection =$this->createMock(Connection::class);
7677
$connection->expects($this->once())->method('get')->willReturn($beanstalkdEnvelope);
77-
$connection->expects($this->once())->method('reject');
78+
$connection->expects($this->once())->method('getMessagePriority')->with($beanstalkdEnvelope['id'])->willReturn(2);
79+
$connection->expects($this->once())->method('reject')->with($beanstalkdEnvelope['id'],2);
7880

7981
$receiver =newBeanstalkdReceiver($connection,$serializer);
8082
$receiver->get();
@@ -83,14 +85,14 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
8385
/**
8486
* @dataProvider provideRejectCases
8587
*/
86-
publicfunctiontestReject(array$stamps,bool$forceDelete)
88+
publicfunctiontestReject(array$stamps,?int$priority,bool$forceDelete)
8789
{
8890
$serializer =$this->createSerializer();
8991

9092
$id ='some id';
9193

9294
$connection =$this->createMock(Connection::class);
93-
$connection->expects($this->once())->method('reject')->with($id,$forceDelete);
95+
$connection->expects($this->once())->method('reject')->with($id,$priority,$forceDelete);
9496

9597
$envelope = (newEnvelope(newDummyMessage('Oy')))->with(newBeanstalkdReceivedStamp($id,'foo bar'));
9698
foreach ($stampsas$stamp) {
@@ -103,9 +105,10 @@ public function testReject(array $stamps, bool $forceDelete)
103105

104106
publicstaticfunctionprovideRejectCases():iterable
105107
{
106-
yield'No stamp' => [[],false];
107-
yield'With sent for retry true' => [[newSentForRetryStamp(true)],true];
108-
yield'With sent for retry false' => [[newSentForRetryStamp(false)],false];
108+
yield'No stamp' => [[],null,false];
109+
yield'With sent for retry true' => [[newSentForRetryStamp(true)],null,true];
110+
yield'With sent for retry true and priority' => [[newBeanstalkdPriorityStamp(2),newSentForRetryStamp(true)],2,true];
111+
yield'With sent for retry false' => [[newSentForRetryStamp(false)],null,false];
109112
}
110113

111114
privatefunctioncreateBeanstalkdEnvelope():array

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdSenderTest.php

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
usePHPUnit\Framework\TestCase;
1515
useSymfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdPriorityStamp;
1617
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdSender;
1718
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
1819
useSymfony\Component\Messenger\Envelope;
@@ -27,7 +28,7 @@ public function testSend()
2728
$encoded = ['body' =>'...','headers' => ['type' => DummyMessage::class]];
2829

2930
$connection =$this->createMock(Connection::class);
30-
$connection->expects($this->once())->method('send')->with($encoded['body'],$encoded['headers'],0);
31+
$connection->expects($this->once())->method('send')->with($encoded['body'],$encoded['headers'],0,null);
3132

3233
$serializer =$this->createMock(SerializerInterface::class);
3334
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
@@ -42,7 +43,22 @@ public function testSendWithDelay()
4243
$encoded = ['body' =>'...','headers' => ['type' => DummyMessage::class]];
4344

4445
$connection =$this->createMock(Connection::class);
45-
$connection->expects($this->once())->method('send')->with($encoded['body'],$encoded['headers'],500);
46+
$connection->expects($this->once())->method('send')->with($encoded['body'],$encoded['headers'],500,null);
47+
48+
$serializer =$this->createMock(SerializerInterface::class);
49+
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
50+
51+
$sender =newBeanstalkdSender($connection,$serializer);
52+
$sender->send($envelope);
53+
}
54+
55+
publicfunctiontestSendWithPriority()
56+
{
57+
$envelope = (newEnvelope(newDummyMessage('Oy')))->with(newBeanstalkdPriorityStamp(2));
58+
$encoded = ['body' =>'...','headers' => ['type' => DummyMessage::class]];
59+
60+
$connection =$this->createMock(Connection::class);
61+
$connection->expects($this->once())->method('send')->with($encoded['body'],$encoded['headers'],0,2);
4662

4763
$serializer =$this->createMock(SerializerInterface::class);
4864
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ public function testReject(bool $buryOnReject, bool $forceDelete)
229229

230230
$connection =newConnection(['tube_name' =>$tube,'bury_on_reject' =>$buryOnReject],$client);
231231

232-
$connection->reject((string)$id,$forceDelete);
232+
$connection->reject((string)$id,null,$forceDelete);
233233
}
234234

235235
publicfunctiontestRejectWithBury()
@@ -240,13 +240,29 @@ public function testRejectWithBury()
240240

241241
$client =$this->createMock(PheanstalkInterface::class);
242242
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
243-
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id));
243+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id),1024);
244244

245245
$connection =newConnection(['tube_name' =>$tube,'bury_on_reject' =>true],$client);
246246

247247
$connection->reject((string)$id);
248248
}
249249

250+
publicfunctiontestRejectWithBuryAndPriority()
251+
{
252+
$id =123456;
253+
$priority =2;
254+
255+
$tube ='baz';
256+
257+
$client =$this->createMock(PheanstalkInterface::class);
258+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
259+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id),$priority);
260+
261+
$connection =newConnection(['tube_name' =>$tube,'bury_on_reject' =>true],$client);
262+
263+
$connection->reject((string)$id,$priority);
264+
}
265+
250266
publicfunctiontestRejectWhenABeanstalkdExceptionOccurs()
251267
{
252268
$id =123456;
@@ -296,6 +312,40 @@ public function testMessageCountWhenABeanstalkdExceptionOccurs()
296312
$connection->getMessageCount();
297313
}
298314

315+
publicfunctiontestMessagePriority()
316+
{
317+
$id =123456;
318+
$priority =51;
319+
320+
$tube ='baz';
321+
322+
$response =newArrayResponse('OK', ['pri' =>$priority]);
323+
324+
$client =$this->createMock(PheanstalkInterface::class);
325+
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id))->willReturn($response);
326+
327+
$connection =newConnection(['tube_name' =>$tube],$client);
328+
329+
$this->assertSame($priority,$connection->getMessagePriority((string)$id));
330+
}
331+
332+
publicfunctiontestMessagePriorityWhenABeanstalkdExceptionOccurs()
333+
{
334+
$id =123456;
335+
336+
$tube ='baz1234';
337+
338+
$exception =newClientException('foobar error');
339+
340+
$client =$this->createMock(PheanstalkInterface::class);
341+
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id))->willThrowException($exception);
342+
343+
$connection =newConnection(['tube_name' =>$tube],$client);
344+
345+
$this->expectExceptionObject(newTransportException($exception->getMessage(),0,$exception));
346+
$connection->getMessagePriority((string)$id);
347+
}
348+
299349
publicfunctiontestSend()
300350
{
301351
$tube ='xyz';
@@ -330,6 +380,41 @@ public function testSend()
330380
$this->assertSame($id, (int)$returnedId);
331381
}
332382

383+
publicfunctiontestSendWithPriority()
384+
{
385+
$tube ='xyz';
386+
387+
$body ='foo';
388+
$headers = ['test' =>'bar'];
389+
$delay =1000;
390+
$priority =2;
391+
$expectedDelay =$delay /1000;
392+
393+
$id =110;
394+
395+
$client =$this->createMock(PheanstalkInterface::class);
396+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
397+
$client->expects($this->once())->method('put')->with(
398+
$this->callback(function (string$data)use ($body,$headers):bool {
399+
$expectedMessage =json_encode([
400+
'body' =>$body,
401+
'headers' =>$headers,
402+
]);
403+
404+
return$expectedMessage ===$data;
405+
}),
406+
$priority,
407+
$expectedDelay,
408+
90
409+
)->willReturn(newJob($id,'foobar'));
410+
411+
$connection =newConnection(['tube_name' =>$tube],$client);
412+
413+
$returnedId =$connection->send($body,$headers,$delay,$priority);
414+
415+
$this->assertSame($id, (int)$returnedId);
416+
}
417+
333418
publicfunctiontestSendWhenABeanstalkdExceptionOccurs()
334419
{
335420
$tube ='xyz';
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\Bridge\Beanstalkd\Transport;
13+
14+
useSymfony\Component\Messenger\Stamp\StampInterface;
15+
16+
finalclass BeanstalkdPriorityStampimplements StampInterface
17+
{
18+
publicfunction__construct(
19+
publicreadonlyint$priority,
20+
) {
21+
}
22+
}

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public function get(): iterable
4848
'headers' =>$beanstalkdEnvelope['headers'],
4949
]);
5050
}catch (MessageDecodingFailedException$exception) {
51-
$this->connection->reject($beanstalkdEnvelope['id']);
51+
$this->connection->reject($beanstalkdEnvelope['id'],$this->connection->getMessagePriority($beanstalkdEnvelope['id']));
5252

5353
throw$exception;
5454
}
@@ -65,6 +65,7 @@ public function reject(Envelope $envelope): void
6565
{
6666
$this->connection->reject(
6767
$this->findBeanstalkdReceivedStamp($envelope)->getId(),
68+
$envelope->last(BeanstalkdPriorityStamp::class)?->priority,
6869
$envelope->last(SentForRetryStamp::class)?->isSent ??false,
6970
);
7071
}

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdSender.php

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,12 @@ public function send(Envelope $envelope): Envelope
3535
{
3636
$encodedMessage =$this->serializer->encode($envelope);
3737

38-
/** @var DelayStamp|null $delayStamp */
39-
$delayStamp =$envelope->last(DelayStamp::class);
40-
$delayInMs =null !==$delayStamp ?$delayStamp->getDelay() :0;
41-
42-
$this->connection->send($encodedMessage['body'],$encodedMessage['headers'] ?? [],$delayInMs);
38+
$this->connection->send(
39+
$encodedMessage['body'],
40+
$encodedMessage['headers'] ?? [],
41+
$envelope->last(DelayStamp::class)?->getDelay() ??0,
42+
$envelope->last(BeanstalkdPriorityStamp::class)?->priority,
43+
);
4344

4445
return$envelope;
4546
}

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,12 @@ public function getTube(): string
116116
}
117117

118118
/**
119-
* @param int $delay The delay in milliseconds
119+
* @param int $delay The delay in milliseconds
120+
* @param ?int $priority The priority by which the message will be reserved
120121
*
121122
* @return string The inserted id
122123
*/
123-
publicfunctionsend(string$body,array$headers,int$delay =0):string
124+
publicfunctionsend(string$body,array$headers,int$delay =0,int$priority =null):string
124125
{
125126
$message =json_encode([
126127
'body' =>$body,
@@ -134,7 +135,7 @@ public function send(string $body, array $headers, int $delay = 0): string
134135
try {
135136
$job =$this->client->useTube($this->tube)->put(
136137
$message,
137-
PheanstalkInterface::DEFAULT_PRIORITY,
138+
$priority ??PheanstalkInterface::DEFAULT_PRIORITY,
138139
$delay /1000,
139140
$this->ttr
140141
);
@@ -182,11 +183,11 @@ public function ack(string $id): void
182183
}
183184
}
184185

185-
publicfunctionreject(string$id,bool$forceDelete =false):void
186+
publicfunctionreject(string$id,int$priority =null,bool$forceDelete =false):void
186187
{
187188
try {
188189
if (!$forceDelete &&$this->buryOnReject) {
189-
$this->client->useTube($this->tube)->bury(newJobId((int)$id));
190+
$this->client->useTube($this->tube)->bury(newJobId((int)$id),$priority ?? PheanstalkInterface::DEFAULT_PRIORITY);
190191
}else {
191192
$this->client->useTube($this->tube)->delete(newJobId((int)$id));
192193
}
@@ -205,4 +206,15 @@ public function getMessageCount(): int
205206

206207
return (int)$tubeStats['current-jobs-ready'];
207208
}
209+
210+
publicfunctiongetMessagePriority(string$id):int
211+
{
212+
try {
213+
$jobStats =$this->client->statsJob(newJobId((int)$id));
214+
}catch (Exception$exception) {
215+
thrownewTransportException($exception->getMessage(),0,$exception);
216+
}
217+
218+
return (int)$jobStats['pri'];
219+
}
208220
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp