Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc66a791

Browse files
committed
[Messenger] Add BeanstalkdPriorityStamp to Beanstalkd bridge
1 parent8c73cb3 commitc66a791

File tree

9 files changed

+164
-23
lines changed

9 files changed

+164
-23
lines changed

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
CHANGELOG
22
=========
33

4-
6.4
4+
7.1
55
---
66

77
* Add`bury_on_reject` option to bury failed messages instead of deleting them
8+
* Add`BeanstalkdPriorityStamp` option to allow setting the message priority
89

910
5.2.0
1011
-----

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
usePHPUnit\Framework\TestCase;
1515
useSymfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdPriorityStamp;
1617
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1718
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver;
1819
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
@@ -74,7 +75,8 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
7475
$beanstalkdEnvelope =$this->createBeanstalkdEnvelope();
7576
$connection =$this->createMock(Connection::class);
7677
$connection->expects($this->once())->method('get')->willReturn($beanstalkdEnvelope);
77-
$connection->expects($this->once())->method('reject');
78+
$connection->expects($this->once())->method('getMessagePriority')->with($beanstalkdEnvelope['id'])->willReturn(2);
79+
$connection->expects($this->once())->method('reject')->with($beanstalkdEnvelope['id'],2);
7880

7981
$receiver =newBeanstalkdReceiver($connection,$serializer);
8082
$receiver->get();
@@ -83,14 +85,14 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
8385
/**
8486
* @dataProvider provideRejectCases
8587
*/
86-
publicfunctiontestReject(array$stamps,bool$forceDelete)
88+
publicfunctiontestReject(array$stamps,?int$priority,bool$forceDelete)
8789
{
8890
$serializer =$this->createSerializer();
8991

9092
$id ='some id';
9193

9294
$connection =$this->createMock(Connection::class);
93-
$connection->expects($this->once())->method('reject')->with($id,$forceDelete);
95+
$connection->expects($this->once())->method('reject')->with($id,$priority,$forceDelete);
9496

9597
$envelope = (newEnvelope(newDummyMessage('Oy')))->with(newBeanstalkdReceivedStamp($id,'foo bar'));
9698
foreach ($stampsas$stamp) {
@@ -103,9 +105,10 @@ public function testReject(array $stamps, bool $forceDelete)
103105

104106
publicstaticfunctionprovideRejectCases():iterable
105107
{
106-
yield'No stamp' => [[],false];
107-
yield'With sent for retry true' => [[newSentForRetryStamp(true)],true];
108-
yield'With sent for retry false' => [[newSentForRetryStamp(false)],false];
108+
yield'No stamp' => [[],null,false];
109+
yield'With sent for retry true' => [[newSentForRetryStamp(true)],null,true];
110+
yield'With sent for retry true and priority' => [[newBeanstalkdPriorityStamp(2),newSentForRetryStamp(true)],2,true];
111+
yield'With sent for retry false' => [[newSentForRetryStamp(false)],null,false];
109112
}
110113

111114
privatefunctioncreateBeanstalkdEnvelope():array

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdSenderTest.php

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
usePHPUnit\Framework\TestCase;
1515
useSymfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdPriorityStamp;
1617
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdSender;
1718
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
1819
useSymfony\Component\Messenger\Envelope;
@@ -27,7 +28,7 @@ public function testSend()
2728
$encoded = ['body' =>'...','headers' => ['type' => DummyMessage::class]];
2829

2930
$connection =$this->createMock(Connection::class);
30-
$connection->expects($this->once())->method('send')->with($encoded['body'],$encoded['headers'],0);
31+
$connection->expects($this->once())->method('send')->with($encoded['body'],$encoded['headers'],0,null);
3132

3233
$serializer =$this->createMock(SerializerInterface::class);
3334
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
@@ -42,7 +43,22 @@ public function testSendWithDelay()
4243
$encoded = ['body' =>'...','headers' => ['type' => DummyMessage::class]];
4344

4445
$connection =$this->createMock(Connection::class);
45-
$connection->expects($this->once())->method('send')->with($encoded['body'],$encoded['headers'],500);
46+
$connection->expects($this->once())->method('send')->with($encoded['body'],$encoded['headers'],500,null);
47+
48+
$serializer =$this->createMock(SerializerInterface::class);
49+
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
50+
51+
$sender =newBeanstalkdSender($connection,$serializer);
52+
$sender->send($envelope);
53+
}
54+
55+
publicfunctiontestSendWithPriority()
56+
{
57+
$envelope = (newEnvelope(newDummyMessage('Oy')))->with(newBeanstalkdPriorityStamp(2));
58+
$encoded = ['body' =>'...','headers' => ['type' => DummyMessage::class]];
59+
60+
$connection =$this->createMock(Connection::class);
61+
$connection->expects($this->once())->method('send')->with($encoded['body'],$encoded['headers'],0,2);
4662

4763
$serializer =$this->createMock(SerializerInterface::class);
4864
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ public function testReject(bool $buryOnReject, bool $forceDelete)
229229

230230
$connection =newConnection(['tube_name' =>$tube,'bury_on_reject' =>$buryOnReject],$client);
231231

232-
$connection->reject((string)$id,$forceDelete);
232+
$connection->reject((string)$id,null,$forceDelete);
233233
}
234234

235235
publicfunctiontestRejectWithBury()
@@ -240,13 +240,29 @@ public function testRejectWithBury()
240240

241241
$client =$this->createMock(PheanstalkInterface::class);
242242
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
243-
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id));
243+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id),1024);
244244

245245
$connection =newConnection(['tube_name' =>$tube,'bury_on_reject' =>true],$client);
246246

247247
$connection->reject((string)$id);
248248
}
249249

250+
publicfunctiontestRejectWithBuryAndPriority()
251+
{
252+
$id =123456;
253+
$priority =2;
254+
255+
$tube ='baz';
256+
257+
$client =$this->createMock(PheanstalkInterface::class);
258+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
259+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id),$priority);
260+
261+
$connection =newConnection(['tube_name' =>$tube,'bury_on_reject' =>true],$client);
262+
263+
$connection->reject((string)$id,$priority);
264+
}
265+
250266
publicfunctiontestRejectWhenABeanstalkdExceptionOccurs()
251267
{
252268
$id =123456;
@@ -296,6 +312,40 @@ public function testMessageCountWhenABeanstalkdExceptionOccurs()
296312
$connection->getMessageCount();
297313
}
298314

315+
publicfunctiontestMessagePriority()
316+
{
317+
$id =123456;
318+
$priority =51;
319+
320+
$tube ='baz';
321+
322+
$response =newArrayResponse('OK', ['pri' =>$priority]);
323+
324+
$client =$this->createMock(PheanstalkInterface::class);
325+
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id))->willReturn($response);
326+
327+
$connection =newConnection(['tube_name' =>$tube],$client);
328+
329+
$this->assertSame($priority,$connection->getMessagePriority((string)$id));
330+
}
331+
332+
publicfunctiontestMessagePriorityWhenABeanstalkdExceptionOccurs()
333+
{
334+
$id =123456;
335+
336+
$tube ='baz1234';
337+
338+
$exception =newClientException('foobar error');
339+
340+
$client =$this->createMock(PheanstalkInterface::class);
341+
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id))->willThrowException($exception);
342+
343+
$connection =newConnection(['tube_name' =>$tube],$client);
344+
345+
$this->expectExceptionObject(newTransportException($exception->getMessage(),0,$exception));
346+
$connection->getMessagePriority((string)$id);
347+
}
348+
299349
publicfunctiontestSend()
300350
{
301351
$tube ='xyz';
@@ -330,6 +380,41 @@ public function testSend()
330380
$this->assertSame($id, (int)$returnedId);
331381
}
332382

383+
publicfunctiontestSendWithPriority()
384+
{
385+
$tube ='xyz';
386+
387+
$body ='foo';
388+
$headers = ['test' =>'bar'];
389+
$delay =1000;
390+
$priority =2;
391+
$expectedDelay =$delay /1000;
392+
393+
$id =110;
394+
395+
$client =$this->createMock(PheanstalkInterface::class);
396+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
397+
$client->expects($this->once())->method('put')->with(
398+
$this->callback(function (string$data)use ($body,$headers):bool {
399+
$expectedMessage =json_encode([
400+
'body' =>$body,
401+
'headers' =>$headers,
402+
]);
403+
404+
return$expectedMessage ===$data;
405+
}),
406+
$priority,
407+
$expectedDelay,
408+
90
409+
)->willReturn(newJob($id,'foobar'));
410+
411+
$connection =newConnection(['tube_name' =>$tube],$client);
412+
413+
$returnedId =$connection->send($body,$headers,$delay,$priority);
414+
415+
$this->assertSame($id, (int)$returnedId);
416+
}
417+
333418
publicfunctiontestSendWhenABeanstalkdExceptionOccurs()
334419
{
335420
$tube ='xyz';
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespaceSymfony\Component\Messenger\Bridge\Beanstalkd\Transport;
13+
14+
useSymfony\Component\Messenger\Stamp\StampInterface;
15+
16+
finalclass BeanstalkdPriorityStampimplements StampInterface
17+
{
18+
publicfunction__construct(
19+
publicreadonlyint$priority,
20+
) {
21+
}
22+
}

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public function get(): iterable
4848
'headers' =>$beanstalkdEnvelope['headers'],
4949
]);
5050
}catch (MessageDecodingFailedException$exception) {
51-
$this->connection->reject($beanstalkdEnvelope['id']);
51+
$this->connection->reject($beanstalkdEnvelope['id'],$this->connection->getMessagePriority($beanstalkdEnvelope['id']));
5252

5353
throw$exception;
5454
}
@@ -65,6 +65,7 @@ public function reject(Envelope $envelope): void
6565
{
6666
$this->connection->reject(
6767
$this->findBeanstalkdReceivedStamp($envelope)->getId(),
68+
$envelope->last(BeanstalkdPriorityStamp::class)?->priority,
6869
$envelope->last(SentForRetryStamp::class)?->isSent ??false,
6970
);
7071
}

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdSender.php

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,12 @@ public function send(Envelope $envelope): Envelope
3535
{
3636
$encodedMessage =$this->serializer->encode($envelope);
3737

38-
/** @var DelayStamp|null $delayStamp */
39-
$delayStamp =$envelope->last(DelayStamp::class);
40-
$delayInMs =null !==$delayStamp ?$delayStamp->getDelay() :0;
41-
42-
$this->connection->send($encodedMessage['body'],$encodedMessage['headers'] ?? [],$delayInMs);
38+
$this->connection->send(
39+
$encodedMessage['body'],
40+
$encodedMessage['headers'] ?? [],
41+
$envelope->last(DelayStamp::class)?->getDelay() ??0,
42+
$envelope->last(BeanstalkdPriorityStamp::class)?->priority,
43+
);
4344

4445
return$envelope;
4546
}

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,12 @@ public function getTube(): string
117117
}
118118

119119
/**
120-
* @param int $delay The delay in milliseconds
120+
* @param int $delay The delay in milliseconds
121+
* @param ?int $priority The priority by which the message will be reserved
121122
*
122123
* @return string The inserted id
123124
*/
124-
publicfunctionsend(string$body,array$headers,int$delay =0):string
125+
publicfunctionsend(string$body,array$headers,int$delay =0, ?int$priority =null):string
125126
{
126127
$message =json_encode([
127128
'body' =>$body,
@@ -135,7 +136,7 @@ public function send(string $body, array $headers, int $delay = 0): string
135136
try {
136137
$job =$this->client->useTube($this->tube)->put(
137138
$message,
138-
PheanstalkInterface::DEFAULT_PRIORITY,
139+
$priority ??PheanstalkInterface::DEFAULT_PRIORITY,
139140
$delay /1000,
140141
$this->ttr
141142
);
@@ -183,11 +184,11 @@ public function ack(string $id): void
183184
}
184185
}
185186

186-
publicfunctionreject(string$id,bool$forceDelete =false):void
187+
publicfunctionreject(string$id,?int$priority =null,bool$forceDelete =false):void
187188
{
188189
try {
189190
if (!$forceDelete &&$this->buryOnReject) {
190-
$this->client->useTube($this->tube)->bury(newJobId((int)$id));
191+
$this->client->useTube($this->tube)->bury(newJobId((int)$id),$priority ?? PheanstalkInterface::DEFAULT_PRIORITY);
191192
}else {
192193
$this->client->useTube($this->tube)->delete(newJobId((int)$id));
193194
}
@@ -207,4 +208,15 @@ public function getMessageCount(): int
207208

208209
return (int)$tubeStats['current-jobs-ready'];
209210
}
211+
212+
publicfunctiongetMessagePriority(string$id):int
213+
{
214+
try {
215+
$jobStats =$this->client->statsJob(newJobId((int)$id));
216+
}catch (Exception$exception) {
217+
thrownewTransportException($exception->getMessage(),0,$exception);
218+
}
219+
220+
return (int)$jobStats['pri'];
221+
}
210222
}

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"require": {
1515
"php":">=8.2",
1616
"pda/pheanstalk":"^4.0",
17-
"symfony/messenger":"^6.4|^7.0"
17+
"symfony/messenger":"^7.1"
1818
},
1919
"require-dev": {
2020
"symfony/property-access":"^6.4|^7.0",

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp