Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9f4b8a4

Browse files
HypeMCfabpot
authored andcommitted
[Messenger] Addbury_on_reject option to Beanstalkd bridge
1 parentd4566b2 commit9f4b8a4

File tree

10 files changed

+254
-14
lines changed

10 files changed

+254
-14
lines changed

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* Add`BeanstalkdPriorityStamp` option to allow setting the message priority
8+
* Add`bury_on_reject` option to bury failed messages instead of deleting them
89

910
7.2
1011
---

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313

1414
usePHPUnit\Framework\TestCase;
1515
useSymfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdPriorityStamp;
1617
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1718
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver;
1819
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
1920
useSymfony\Component\Messenger\Envelope;
2021
useSymfony\Component\Messenger\Exception\MessageDecodingFailedException;
22+
useSymfony\Component\Messenger\Stamp\SentForRetryStamp;
2123
useSymfony\Component\Messenger\Stamp\TransportMessageIdStamp;
2224
useSymfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2325
useSymfony\Component\Messenger\Transport\Serialization\Serializer;
@@ -81,12 +83,42 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
8183
$beanstalkdEnvelope =$this->createBeanstalkdEnvelope();
8284
$connection =$this->createMock(Connection::class);
8385
$connection->expects($this->once())->method('get')->willReturn($beanstalkdEnvelope);
84-
$connection->expects($this->once())->method('reject');
86+
$connection->expects($this->once())->method('getMessagePriority')->with($beanstalkdEnvelope['id'])->willReturn(2);
87+
$connection->expects($this->once())->method('reject')->with($beanstalkdEnvelope['id'],2);
8588

8689
$receiver =newBeanstalkdReceiver($connection,$serializer);
8790
$receiver->get();
8891
}
8992

93+
/**
94+
* @dataProvider provideRejectCases
95+
*/
96+
publicfunctiontestReject(array$stamps, ?int$priority,bool$forceDelete)
97+
{
98+
$serializer =$this->createSerializer();
99+
100+
$id ='some id';
101+
102+
$connection =$this->createMock(Connection::class);
103+
$connection->expects($this->once())->method('reject')->with($id,$priority,$forceDelete);
104+
105+
$envelope = (newEnvelope(newDummyMessage('Oy')))->with(newBeanstalkdReceivedStamp($id,'foo bar'));
106+
foreach ($stampsas$stamp) {
107+
$envelope =$envelope->with($stamp);
108+
}
109+
110+
$receiver =newBeanstalkdReceiver($connection,$serializer);
111+
$receiver->reject($envelope);
112+
}
113+
114+
publicstaticfunctionprovideRejectCases():iterable
115+
{
116+
yield'No stamp' => [[],null,false];
117+
yield'With sent for retry true' => [[newSentForRetryStamp(true)],null,true];
118+
yield'With sent for retry true and priority' => [[newBeanstalkdPriorityStamp(2),newSentForRetryStamp(true)],2,true];
119+
yield'With sent for retry false' => [[newSentForRetryStamp(false)],null,false];
120+
}
121+
90122
publicfunctiontestKeepalive()
91123
{
92124
$serializer =$this->createSerializer();

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

Lines changed: 90 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public function testFromDsn()
4747
$this->assertSame('default',$configuration['tube_name']);
4848
$this->assertSame(0,$configuration['timeout']);
4949
$this->assertSame(90,$configuration['ttr']);
50+
$this->assertFalse($configuration['bury_on_reject']);
5051

5152
$this->assertEquals(
5253
$connection =newConnection([], Pheanstalk::create('foobar',15555)),
@@ -58,22 +59,32 @@ public function testFromDsn()
5859
$this->assertSame('default',$configuration['tube_name']);
5960
$this->assertSame(0,$configuration['timeout']);
6061
$this->assertSame(90,$configuration['ttr']);
62+
$this->assertFalse($configuration['bury_on_reject']);
6163
$this->assertSame('default',$connection->getTube());
6264
}
6365

6466
publicfunctiontestFromDsnWithOptions()
6567
{
6668
$this->assertEquals(
67-
$connection = Connection::fromDsn('beanstalkd://localhost', ['tube_name' =>'foo','timeout' =>10,'ttr' =>5000]),
68-
Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000')
69+
$connectionWithOptions = Connection::fromDsn('beanstalkd://localhost', ['tube_name' =>'foo','timeout' =>10,'ttr' =>5000,'bury_on_reject' =>true]),
70+
$connectionWithQuery =Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true')
6971
);
7072

71-
$configuration =$connection->getConfiguration();
73+
$configuration =$connectionWithOptions->getConfiguration();
7274

7375
$this->assertSame('foo',$configuration['tube_name']);
7476
$this->assertSame(10,$configuration['timeout']);
7577
$this->assertSame(5000,$configuration['ttr']);
76-
$this->assertSame('foo',$connection->getTube());
78+
$this->assertTrue($configuration['bury_on_reject']);
79+
$this->assertSame('foo',$connectionWithOptions->getTube());
80+
81+
$configuration =$connectionWithQuery->getConfiguration();
82+
83+
$this->assertSame('foo',$configuration['tube_name']);
84+
$this->assertSame(10,$configuration['timeout']);
85+
$this->assertSame(5000,$configuration['ttr']);
86+
$this->assertTrue($configuration['bury_on_reject']);
87+
$this->assertSame('foo',$connectionWithOptions->getTube());
7788
}
7889

7990
publicfunctiontestFromDsnOptionsArrayWinsOverOptionsFromDsn()
@@ -82,18 +93,20 @@ public function testFromDsnOptionsArrayWinsOverOptionsFromDsn()
8293
'tube_name' =>'bar',
8394
'timeout' =>20,
8495
'ttr' =>6000,
96+
'bury_on_reject' =>false,
8597
];
8698

8799
$this->assertEquals(
88100
$connection =newConnection($options, Pheanstalk::create('localhost',11333)),
89-
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000',$options)
101+
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true',$options)
90102
);
91103

92104
$configuration =$connection->getConfiguration();
93105

94106
$this->assertSame($options['tube_name'],$configuration['tube_name']);
95107
$this->assertSame($options['timeout'],$configuration['timeout']);
96108
$this->assertSame($options['ttr'],$configuration['ttr']);
109+
$this->assertSame($options['bury_on_reject'],$configuration['bury_on_reject']);
97110
$this->assertSame($options['tube_name'],$connection->getTube());
98111
}
99112

@@ -199,7 +212,12 @@ public function testAckWhenABeanstalkdExceptionOccurs()
199212
$connection->ack((string)$id);
200213
}
201214

202-
publicfunctiontestReject()
215+
/**
216+
* @testWith [false, false]
217+
* [false, true]
218+
* [true, true]
219+
*/
220+
publicfunctiontestReject(bool$buryOnReject,bool$forceDelete)
203221
{
204222
$id =123456;
205223

@@ -209,11 +227,42 @@ public function testReject()
209227
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
210228
$client->expects($this->once())->method('delete')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id));
211229

212-
$connection =newConnection(['tube_name' =>$tube],$client);
230+
$connection =newConnection(['tube_name' =>$tube,'bury_on_reject' =>$buryOnReject],$client);
231+
232+
$connection->reject((string)$id,null,$forceDelete);
233+
}
234+
235+
publicfunctiontestRejectWithBury()
236+
{
237+
$id =123456;
238+
239+
$tube ='baz';
240+
241+
$client =$this->createMock(PheanstalkInterface::class);
242+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
243+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id),1024);
244+
245+
$connection =newConnection(['tube_name' =>$tube,'bury_on_reject' =>true],$client);
213246

214247
$connection->reject((string)$id);
215248
}
216249

250+
publicfunctiontestRejectWithBuryAndPriority()
251+
{
252+
$id =123456;
253+
$priority =2;
254+
255+
$tube ='baz';
256+
257+
$client =$this->createMock(PheanstalkInterface::class);
258+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
259+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id),$priority);
260+
261+
$connection =newConnection(['tube_name' =>$tube,'bury_on_reject' =>true],$client);
262+
263+
$connection->reject((string)$id,$priority);
264+
}
265+
217266
publicfunctiontestRejectWhenABeanstalkdExceptionOccurs()
218267
{
219268
$id =123456;
@@ -263,6 +312,40 @@ public function testMessageCountWhenABeanstalkdExceptionOccurs()
263312
$connection->getMessageCount();
264313
}
265314

315+
publicfunctiontestMessagePriority()
316+
{
317+
$id =123456;
318+
$priority =51;
319+
320+
$tube ='baz';
321+
322+
$response =newArrayResponse('OK', ['pri' =>$priority]);
323+
324+
$client =$this->createMock(PheanstalkInterface::class);
325+
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id))->willReturn($response);
326+
327+
$connection =newConnection(['tube_name' =>$tube],$client);
328+
329+
$this->assertSame($priority,$connection->getMessagePriority((string)$id));
330+
}
331+
332+
publicfunctiontestMessagePriorityWhenABeanstalkdExceptionOccurs()
333+
{
334+
$id =123456;
335+
336+
$tube ='baz1234';
337+
338+
$exception =newClientException('foobar error');
339+
340+
$client =$this->createMock(PheanstalkInterface::class);
341+
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id))->willThrowException($exception);
342+
343+
$connection =newConnection(['tube_name' =>$tube],$client);
344+
345+
$this->expectExceptionObject(newTransportException($exception->getMessage(),0,$exception));
346+
$connection->getMessagePriority((string)$id);
347+
}
348+
266349
publicfunctiontestSend()
267350
{
268351
$tube ='xyz';

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
useSymfony\Component\Messenger\Envelope;
1515
useSymfony\Component\Messenger\Exception\LogicException;
1616
useSymfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
useSymfony\Component\Messenger\Stamp\SentForRetryStamp;
1718
useSymfony\Component\Messenger\Stamp\TransportMessageIdStamp;
1819
useSymfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1920
useSymfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
@@ -48,7 +49,10 @@ public function get(): iterable
4849
'headers' =>$beanstalkdEnvelope['headers'],
4950
]);
5051
}catch (MessageDecodingFailedException$exception) {
51-
$this->connection->reject($beanstalkdEnvelope['id']);
52+
$this->connection->reject(
53+
$beanstalkdEnvelope['id'],
54+
$this->connection->getMessagePriority($beanstalkdEnvelope['id']),
55+
);
5256

5357
throw$exception;
5458
}
@@ -68,7 +72,11 @@ public function ack(Envelope $envelope): void
6872

6973
publicfunctionreject(Envelope$envelope):void
7074
{
71-
$this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId());
75+
$this->connection->reject(
76+
$this->findBeanstalkdReceivedStamp($envelope)->getId(),
77+
$envelope->last(BeanstalkdPriorityStamp::class)?->priority,
78+
$envelope->last(SentForRetryStamp::class)?->isSent ??false,
79+
);
7280
}
7381

7482
publicfunctionkeepalive(Envelope$envelope, ?int$seconds =null):void

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@ class Connection
3232
'tube_name' => PheanstalkInterface::DEFAULT_TUBE,
3333
'timeout' =>0,
3434
'ttr' =>90,
35+
'bury_on_reject' =>false,
3536
];
3637

3738
privatestring$tube;
3839
privateint$timeout;
3940
privateint$ttr;
41+
privatebool$buryOnReject;
4042

4143
/**
4244
* Constructor.
@@ -46,6 +48,7 @@ class Connection
4648
* * tube_name: name of the tube
4749
* * timeout: message reservation timeout (in seconds)
4850
* * ttr: the message time to run before it is put back in the ready queue (in seconds)
51+
* * bury_on_reject: bury rejected messages instead of deleting them
4952
*/
5053
publicfunction__construct(
5154
privatearray$configuration,
@@ -55,6 +58,7 @@ public function __construct(
5558
$this->tube =$this->configuration['tube_name'];
5659
$this->timeout =$this->configuration['timeout'];
5760
$this->ttr =$this->configuration['ttr'];
61+
$this->buryOnReject =$this->configuration['bury_on_reject'];
5862
}
5963

6064
publicstaticfunctionfromDsn(#[\SensitiveParameter]string$dsn,array$options = []):self
@@ -74,7 +78,15 @@ public static function fromDsn(#[\SensitiveParameter] string $dsn, array $option
7478
}
7579

7680
$configuration = [];
77-
$configuration +=$options +$query +self::DEFAULT_OPTIONS;
81+
foreach (self::DEFAULT_OPTIONSas$k =>$v) {
82+
$value =$options[$k] ??$query[$k] ??$v;
83+
84+
$configuration[$k] =match (\gettype($v)) {
85+
'integer' =>filter_var($value, \FILTER_VALIDATE_INT),
86+
'boolean' =>filter_var($value, \FILTER_VALIDATE_BOOL),
87+
default =>$value,
88+
};
89+
}
7890

7991
// check for extra keys in options
8092
$optionsExtraKeys =array_diff(array_keys($options),array_keys(self::DEFAULT_OPTIONS));
@@ -172,10 +184,14 @@ public function ack(string $id): void
172184
}
173185
}
174186

175-
publicfunctionreject(string$id):void
187+
publicfunctionreject(string$id, ?int$priority =null,bool$forceDelete =false):void
176188
{
177189
try {
178-
$this->client->useTube($this->tube)->delete(newJobId((int)$id));
190+
if (!$forceDelete &&$this->buryOnReject) {
191+
$this->client->useTube($this->tube)->bury(newJobId((int)$id),$priority ?? PheanstalkInterface::DEFAULT_PRIORITY);
192+
}else {
193+
$this->client->useTube($this->tube)->delete(newJobId((int)$id));
194+
}
179195
}catch (Exception$exception) {
180196
thrownewTransportException($exception->getMessage(),0,$exception);
181197
}
@@ -201,4 +217,15 @@ public function getMessageCount(): int
201217

202218
return (int)$tubeStats['current-jobs-ready'];
203219
}
220+
221+
publicfunctiongetMessagePriority(string$id):int
222+
{
223+
try {
224+
$jobStats =$this->client->statsJob(newJobId((int)$id));
225+
}catch (Exception$exception) {
226+
thrownewTransportException($exception->getMessage(),0,$exception);
227+
}
228+
229+
return (int)$jobStats['pri'];
230+
}
204231
}

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"require": {
1515
"php":">=8.2",
1616
"pda/pheanstalk":"^4.0",
17-
"symfony/messenger":"^7.2"
17+
"symfony/messenger":"^7.3"
1818
},
1919
"require-dev": {
2020
"symfony/property-access":"^6.4|^7.0",

‎src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
7.3
5+
---
6+
7+
* Add`SentForRetryStamp` that identifies whether a failed message was sent for retry
8+
49
7.2
510
---
611

‎src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
useSymfony\Component\Messenger\Retry\RetryStrategyInterface;
2626
useSymfony\Component\Messenger\Stamp\DelayStamp;
2727
useSymfony\Component\Messenger\Stamp\RedeliveryStamp;
28+
useSymfony\Component\Messenger\Stamp\SentForRetryStamp;
2829
useSymfony\Component\Messenger\Stamp\StampInterface;
2930
useSymfony\Component\Messenger\Stamp\TransportMessageIdStamp;
3031
useSymfony\Component\Messenger\Transport\Sender\SenderInterface;
@@ -82,6 +83,8 @@ public function onMessageFailed(WorkerMessageFailedEvent $event): void
8283
}else {
8384
$this->logger?->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"',$context + ['retryCount' =>$retryCount,'error' =>$throwable->getMessage(),'exception' =>$throwable]);
8485
}
86+
87+
$event->addStamps(newSentForRetryStamp($shouldRetry));
8588
}
8689

8790
/**

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp