Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite1bf105

Browse files
committed
[Messenger] Add bury_on_reject option to Beanstalkd bridge
1 parent95a5d96 commite1bf105

File tree

6 files changed

+165
-14
lines changed

6 files changed

+165
-14
lines changed

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* Add`BeanstalkdPriorityStamp` option to allow setting the message priority
8+
* Add`bury_on_reject` option to bury failed messages instead of deleting them
89

910
7.2
1011
---

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313

1414
usePHPUnit\Framework\TestCase;
1515
useSymfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdPriorityStamp;
1617
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1718
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver;
1819
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
1920
useSymfony\Component\Messenger\Envelope;
2021
useSymfony\Component\Messenger\Exception\MessageDecodingFailedException;
22+
useSymfony\Component\Messenger\Stamp\SentForRetryStamp;
2123
useSymfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2224
useSymfony\Component\Messenger\Transport\Serialization\Serializer;
2325
useSymfony\Component\SerializerasSerializerComponent;
@@ -73,12 +75,42 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
7375
$beanstalkdEnvelope =$this->createBeanstalkdEnvelope();
7476
$connection =$this->createMock(Connection::class);
7577
$connection->expects($this->once())->method('get')->willReturn($beanstalkdEnvelope);
76-
$connection->expects($this->once())->method('reject');
78+
$connection->expects($this->once())->method('getMessagePriority')->with($beanstalkdEnvelope['id'])->willReturn(2);
79+
$connection->expects($this->once())->method('reject')->with($beanstalkdEnvelope['id'],2);
7780

7881
$receiver =newBeanstalkdReceiver($connection,$serializer);
7982
$receiver->get();
8083
}
8184

85+
/**
86+
* @dataProvider provideRejectCases
87+
*/
88+
publicfunctiontestReject(array$stamps, ?int$priority,bool$forceDelete)
89+
{
90+
$serializer =$this->createSerializer();
91+
92+
$id ='some id';
93+
94+
$connection =$this->createMock(Connection::class);
95+
$connection->expects($this->once())->method('reject')->with($id,$priority,$forceDelete);
96+
97+
$envelope = (newEnvelope(newDummyMessage('Oy')))->with(newBeanstalkdReceivedStamp($id,'foo bar'));
98+
foreach ($stampsas$stamp) {
99+
$envelope =$envelope->with($stamp);
100+
}
101+
102+
$receiver =newBeanstalkdReceiver($connection,$serializer);
103+
$receiver->reject($envelope);
104+
}
105+
106+
publicstaticfunctionprovideRejectCases():iterable
107+
{
108+
yield'No stamp' => [[],null,false];
109+
yield'With sent for retry true' => [[newSentForRetryStamp(true)],null,true];
110+
yield'With sent for retry true and priority' => [[newBeanstalkdPriorityStamp(2),newSentForRetryStamp(true)],2,true];
111+
yield'With sent for retry false' => [[newSentForRetryStamp(false)],null,false];
112+
}
113+
82114
publicfunctiontestKeepalive()
83115
{
84116
$serializer =$this->createSerializer();

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

Lines changed: 90 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public function testFromDsn()
4747
$this->assertSame('default',$configuration['tube_name']);
4848
$this->assertSame(0,$configuration['timeout']);
4949
$this->assertSame(90,$configuration['ttr']);
50+
$this->assertFalse($configuration['bury_on_reject']);
5051

5152
$this->assertEquals(
5253
$connection =newConnection([], Pheanstalk::create('foobar',15555)),
@@ -58,22 +59,32 @@ public function testFromDsn()
5859
$this->assertSame('default',$configuration['tube_name']);
5960
$this->assertSame(0,$configuration['timeout']);
6061
$this->assertSame(90,$configuration['ttr']);
62+
$this->assertFalse($configuration['bury_on_reject']);
6163
$this->assertSame('default',$connection->getTube());
6264
}
6365

6466
publicfunctiontestFromDsnWithOptions()
6567
{
6668
$this->assertEquals(
67-
$connection = Connection::fromDsn('beanstalkd://localhost', ['tube_name' =>'foo','timeout' =>10,'ttr' =>5000]),
68-
Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000')
69+
$connectionWithOptions = Connection::fromDsn('beanstalkd://localhost', ['tube_name' =>'foo','timeout' =>10,'ttr' =>5000,'bury_on_reject' =>true]),
70+
$connectionWithQuery =Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true')
6971
);
7072

71-
$configuration =$connection->getConfiguration();
73+
$configuration =$connectionWithOptions->getConfiguration();
7274

7375
$this->assertSame('foo',$configuration['tube_name']);
7476
$this->assertSame(10,$configuration['timeout']);
7577
$this->assertSame(5000,$configuration['ttr']);
76-
$this->assertSame('foo',$connection->getTube());
78+
$this->assertTrue($configuration['bury_on_reject']);
79+
$this->assertSame('foo',$connectionWithOptions->getTube());
80+
81+
$configuration =$connectionWithQuery->getConfiguration();
82+
83+
$this->assertSame('foo',$configuration['tube_name']);
84+
$this->assertSame(10,$configuration['timeout']);
85+
$this->assertSame(5000,$configuration['ttr']);
86+
$this->assertTrue($configuration['bury_on_reject']);
87+
$this->assertSame('foo',$connectionWithOptions->getTube());
7788
}
7889

7990
publicfunctiontestFromDsnOptionsArrayWinsOverOptionsFromDsn()
@@ -82,18 +93,20 @@ public function testFromDsnOptionsArrayWinsOverOptionsFromDsn()
8293
'tube_name' =>'bar',
8394
'timeout' =>20,
8495
'ttr' =>6000,
96+
'bury_on_reject' =>false,
8597
];
8698

8799
$this->assertEquals(
88100
$connection =newConnection($options, Pheanstalk::create('localhost',11333)),
89-
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000',$options)
101+
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true',$options)
90102
);
91103

92104
$configuration =$connection->getConfiguration();
93105

94106
$this->assertSame($options['tube_name'],$configuration['tube_name']);
95107
$this->assertSame($options['timeout'],$configuration['timeout']);
96108
$this->assertSame($options['ttr'],$configuration['ttr']);
109+
$this->assertSame($options['bury_on_reject'],$configuration['bury_on_reject']);
97110
$this->assertSame($options['tube_name'],$connection->getTube());
98111
}
99112

@@ -199,7 +212,12 @@ public function testAckWhenABeanstalkdExceptionOccurs()
199212
$connection->ack((string)$id);
200213
}
201214

202-
publicfunctiontestReject()
215+
/**
216+
* @testWith [false, false]
217+
* [false, true]
218+
* [true, true]
219+
*/
220+
publicfunctiontestReject(bool$buryOnReject,bool$forceDelete)
203221
{
204222
$id =123456;
205223

@@ -209,11 +227,42 @@ public function testReject()
209227
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
210228
$client->expects($this->once())->method('delete')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id));
211229

212-
$connection =newConnection(['tube_name' =>$tube],$client);
230+
$connection =newConnection(['tube_name' =>$tube,'bury_on_reject' =>$buryOnReject],$client);
231+
232+
$connection->reject((string)$id,null,$forceDelete);
233+
}
234+
235+
publicfunctiontestRejectWithBury()
236+
{
237+
$id =123456;
238+
239+
$tube ='baz';
240+
241+
$client =$this->createMock(PheanstalkInterface::class);
242+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
243+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id),1024);
244+
245+
$connection =newConnection(['tube_name' =>$tube,'bury_on_reject' =>true],$client);
213246

214247
$connection->reject((string)$id);
215248
}
216249

250+
publicfunctiontestRejectWithBuryAndPriority()
251+
{
252+
$id =123456;
253+
$priority =2;
254+
255+
$tube ='baz';
256+
257+
$client =$this->createMock(PheanstalkInterface::class);
258+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
259+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id),$priority);
260+
261+
$connection =newConnection(['tube_name' =>$tube,'bury_on_reject' =>true],$client);
262+
263+
$connection->reject((string)$id,$priority);
264+
}
265+
217266
publicfunctiontestRejectWhenABeanstalkdExceptionOccurs()
218267
{
219268
$id =123456;
@@ -263,6 +312,40 @@ public function testMessageCountWhenABeanstalkdExceptionOccurs()
263312
$connection->getMessageCount();
264313
}
265314

315+
publicfunctiontestMessagePriority()
316+
{
317+
$id =123456;
318+
$priority =51;
319+
320+
$tube ='baz';
321+
322+
$response =newArrayResponse('OK', ['pri' =>$priority]);
323+
324+
$client =$this->createMock(PheanstalkInterface::class);
325+
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id))->willReturn($response);
326+
327+
$connection =newConnection(['tube_name' =>$tube],$client);
328+
329+
$this->assertSame($priority,$connection->getMessagePriority((string)$id));
330+
}
331+
332+
publicfunctiontestMessagePriorityWhenABeanstalkdExceptionOccurs()
333+
{
334+
$id =123456;
335+
336+
$tube ='baz1234';
337+
338+
$exception =newClientException('foobar error');
339+
340+
$client =$this->createMock(PheanstalkInterface::class);
341+
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id))->willThrowException($exception);
342+
343+
$connection =newConnection(['tube_name' =>$tube],$client);
344+
345+
$this->expectExceptionObject(newTransportException($exception->getMessage(),0,$exception));
346+
$connection->getMessagePriority((string)$id);
347+
}
348+
266349
publicfunctiontestSend()
267350
{
268351
$tube ='xyz';

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
useSymfony\Component\Messenger\Envelope;
1515
useSymfony\Component\Messenger\Exception\LogicException;
1616
useSymfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
useSymfony\Component\Messenger\Stamp\SentForRetryStamp;
1718
useSymfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1819
useSymfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1920
useSymfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -47,7 +48,10 @@ public function get(): iterable
4748
'headers' =>$beanstalkdEnvelope['headers'],
4849
]);
4950
}catch (MessageDecodingFailedException$exception) {
50-
$this->connection->reject($beanstalkdEnvelope['id']);
51+
$this->connection->reject(
52+
$beanstalkdEnvelope['id'],
53+
$this->connection->getMessagePriority($beanstalkdEnvelope['id']),
54+
);
5155

5256
throw$exception;
5357
}
@@ -62,7 +66,11 @@ public function ack(Envelope $envelope): void
6266

6367
publicfunctionreject(Envelope$envelope):void
6468
{
65-
$this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId());
69+
$this->connection->reject(
70+
$this->findBeanstalkdReceivedStamp($envelope)->getId(),
71+
$envelope->last(BeanstalkdPriorityStamp::class)?->priority,
72+
$envelope->last(SentForRetryStamp::class)?->isSent ??false,
73+
);
6674
}
6775

6876
publicfunctionkeepalive(Envelope$envelope, ?int$seconds =null):void

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@ class Connection
3232
'tube_name' => PheanstalkInterface::DEFAULT_TUBE,
3333
'timeout' =>0,
3434
'ttr' =>90,
35+
'bury_on_reject' =>false,
3536
];
3637

3738
privatestring$tube;
3839
privateint$timeout;
3940
privateint$ttr;
41+
privatebool$buryOnReject;
4042

4143
/**
4244
* Constructor.
@@ -46,6 +48,7 @@ class Connection
4648
* * tube_name: name of the tube
4749
* * timeout: message reservation timeout (in seconds)
4850
* * ttr: the message time to run before it is put back in the ready queue (in seconds)
51+
* * bury_on_reject: bury rejected messages instead of deleting them
4952
*/
5053
publicfunction__construct(
5154
privatearray$configuration,
@@ -55,6 +58,7 @@ public function __construct(
5558
$this->tube =$this->configuration['tube_name'];
5659
$this->timeout =$this->configuration['timeout'];
5760
$this->ttr =$this->configuration['ttr'];
61+
$this->buryOnReject =$this->configuration['bury_on_reject'];
5862
}
5963

6064
publicstaticfunctionfromDsn(#[\SensitiveParameter]string$dsn,array$options = []):self
@@ -74,7 +78,15 @@ public static function fromDsn(#[\SensitiveParameter] string $dsn, array $option
7478
}
7579

7680
$configuration = [];
77-
$configuration +=$options +$query +self::DEFAULT_OPTIONS;
81+
foreach (self::DEFAULT_OPTIONSas$k =>$v) {
82+
$value =$options[$k] ??$query[$k] ??$v;
83+
84+
$configuration[$k] =match (\gettype($v)) {
85+
'integer' =>filter_var($value, \FILTER_VALIDATE_INT),
86+
'boolean' =>filter_var($value, \FILTER_VALIDATE_BOOL),
87+
default =>$value,
88+
};
89+
}
7890

7991
// check for extra keys in options
8092
$optionsExtraKeys =array_diff(array_keys($options),array_keys(self::DEFAULT_OPTIONS));
@@ -172,10 +184,14 @@ public function ack(string $id): void
172184
}
173185
}
174186

175-
publicfunctionreject(string$id):void
187+
publicfunctionreject(string$id, ?int$priority =null,bool$forceDelete =false):void
176188
{
177189
try {
178-
$this->client->useTube($this->tube)->delete(newJobId((int)$id));
190+
if (!$forceDelete &&$this->buryOnReject) {
191+
$this->client->useTube($this->tube)->bury(newJobId((int)$id),$priority ?? PheanstalkInterface::DEFAULT_PRIORITY);
192+
}else {
193+
$this->client->useTube($this->tube)->delete(newJobId((int)$id));
194+
}
179195
}catch (Exception$exception) {
180196
thrownewTransportException($exception->getMessage(),0,$exception);
181197
}
@@ -201,4 +217,15 @@ public function getMessageCount(): int
201217

202218
return (int)$tubeStats['current-jobs-ready'];
203219
}
220+
221+
publicfunctiongetMessagePriority(string$id):int
222+
{
223+
try {
224+
$jobStats =$this->client->statsJob(newJobId((int)$id));
225+
}catch (Exception$exception) {
226+
thrownewTransportException($exception->getMessage(),0,$exception);
227+
}
228+
229+
return (int)$jobStats['pri'];
230+
}
204231
}

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"require": {
1515
"php":">=8.2",
1616
"pda/pheanstalk":"^4.0",
17-
"symfony/messenger":"^7.2"
17+
"symfony/messenger":"^7.3"
1818
},
1919
"require-dev": {
2020
"symfony/property-access":"^6.4|^7.0",

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp