Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitee2bb61

Browse files
HypeMCOskarStark
authored andcommitted
[Messenger] Add bury_on_reject option to Beanstalkd bridge
1 parent1160b37 commitee2bb61

File tree

5 files changed

+99
-11
lines changed

5 files changed

+99
-11
lines changed

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
6.4
5+
---
6+
7+
* Add`bury_on_reject` option to bury failed messages instead of deleting them
8+
49
5.2.0
510
-----
611

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1717
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver;
1818
useSymfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
19+
useSymfony\Component\Messenger\Envelope;
1920
useSymfony\Component\Messenger\Exception\MessageDecodingFailedException;
21+
useSymfony\Component\Messenger\Stamp\SentForRetryStamp;
2022
useSymfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2123
useSymfony\Component\Messenger\Transport\Serialization\Serializer;
2224
useSymfony\Component\SerializerasSerializerComponent;
@@ -78,6 +80,34 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
7880
$receiver->get();
7981
}
8082

83+
/**
84+
* @dataProvider provideRejectCases
85+
*/
86+
publicfunctiontestReject(array$stamps,bool$forceDelete)
87+
{
88+
$serializer =$this->createSerializer();
89+
90+
$id ='some id';
91+
92+
$connection =$this->createMock(Connection::class);
93+
$connection->expects($this->once())->method('reject')->with($id,$forceDelete);
94+
95+
$envelope = (newEnvelope(newDummyMessage('Oy')))->with(newBeanstalkdReceivedStamp($id,'foo bar'));
96+
foreach ($stampsas$stamp) {
97+
$envelope =$envelope->with($stamp);
98+
}
99+
100+
$receiver =newBeanstalkdReceiver($connection,$serializer);
101+
$receiver->reject($envelope);
102+
}
103+
104+
publicstaticfunctionprovideRejectCases():iterable
105+
{
106+
yield'No stamp' => [[],false];
107+
yield'With sent for retry true' => [[newSentForRetryStamp(true)],true];
108+
yield'With sent for retry false' => [[newSentForRetryStamp(false)],false];
109+
}
110+
81111
privatefunctioncreateBeanstalkdEnvelope():array
82112
{
83113
return [

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public function testFromDsn()
4747
$this->assertSame('default',$configuration['tube_name']);
4848
$this->assertSame(0,$configuration['timeout']);
4949
$this->assertSame(90,$configuration['ttr']);
50+
$this->assertFalse($configuration['bury_on_reject']);
5051

5152
$this->assertEquals(
5253
$connection =newConnection([], Pheanstalk::create('foobar',15555)),
@@ -58,22 +59,32 @@ public function testFromDsn()
5859
$this->assertSame('default',$configuration['tube_name']);
5960
$this->assertSame(0,$configuration['timeout']);
6061
$this->assertSame(90,$configuration['ttr']);
62+
$this->assertFalse($configuration['bury_on_reject']);
6163
$this->assertSame('default',$connection->getTube());
6264
}
6365

6466
publicfunctiontestFromDsnWithOptions()
6567
{
6668
$this->assertEquals(
67-
$connection = Connection::fromDsn('beanstalkd://localhost', ['tube_name' =>'foo','timeout' =>10,'ttr' =>5000]),
68-
Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000')
69+
$connectionWithOptions = Connection::fromDsn('beanstalkd://localhost', ['tube_name' =>'foo','timeout' =>10,'ttr' =>5000,'bury_on_reject' =>true]),
70+
$connectionWithQuery =Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true')
6971
);
7072

71-
$configuration =$connection->getConfiguration();
73+
$configuration =$connectionWithOptions->getConfiguration();
7274

7375
$this->assertSame('foo',$configuration['tube_name']);
7476
$this->assertSame(10,$configuration['timeout']);
7577
$this->assertSame(5000,$configuration['ttr']);
76-
$this->assertSame('foo',$connection->getTube());
78+
$this->assertTrue($configuration['bury_on_reject']);
79+
$this->assertSame('foo',$connectionWithOptions->getTube());
80+
81+
$configuration =$connectionWithQuery->getConfiguration();
82+
83+
$this->assertSame('foo',$configuration['tube_name']);
84+
$this->assertSame(10,$configuration['timeout']);
85+
$this->assertSame(5000,$configuration['ttr']);
86+
$this->assertTrue($configuration['bury_on_reject']);
87+
$this->assertSame('foo',$connectionWithOptions->getTube());
7788
}
7889

7990
publicfunctiontestFromDsnOptionsArrayWinsOverOptionsFromDsn()
@@ -82,18 +93,20 @@ public function testFromDsnOptionsArrayWinsOverOptionsFromDsn()
8293
'tube_name' =>'bar',
8394
'timeout' =>20,
8495
'ttr' =>6000,
96+
'bury_on_reject' =>false,
8597
];
8698

8799
$this->assertEquals(
88100
$connection =newConnection($options, Pheanstalk::create('localhost',11333)),
89-
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000',$options)
101+
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true',$options)
90102
);
91103

92104
$configuration =$connection->getConfiguration();
93105

94106
$this->assertSame($options['tube_name'],$configuration['tube_name']);
95107
$this->assertSame($options['timeout'],$configuration['timeout']);
96108
$this->assertSame($options['ttr'],$configuration['ttr']);
109+
$this->assertSame($options['bury_on_reject'],$configuration['bury_on_reject']);
97110
$this->assertSame($options['tube_name'],$connection->getTube());
98111
}
99112

@@ -199,7 +212,12 @@ public function testAckWhenABeanstalkdExceptionOccurs()
199212
$connection->ack((string)$id);
200213
}
201214

202-
publicfunctiontestReject()
215+
/**
216+
* @testWith [false, false]
217+
* [false, true]
218+
* [true, true]
219+
*/
220+
publicfunctiontestReject(bool$buryOnReject,bool$forceDelete)
203221
{
204222
$id =123456;
205223

@@ -209,7 +227,22 @@ public function testReject()
209227
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
210228
$client->expects($this->once())->method('delete')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id));
211229

212-
$connection =newConnection(['tube_name' =>$tube],$client);
230+
$connection =newConnection(['tube_name' =>$tube,'bury_on_reject' =>$buryOnReject],$client);
231+
232+
$connection->reject((string)$id,$forceDelete);
233+
}
234+
235+
publicfunctiontestRejectWithBury()
236+
{
237+
$id =123456;
238+
239+
$tube ='baz';
240+
241+
$client =$this->createMock(PheanstalkInterface::class);
242+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
243+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId$jobId):bool =>$jobId->getId() ===$id));
244+
245+
$connection =newConnection(['tube_name' =>$tube,'bury_on_reject' =>true],$client);
213246

214247
$connection->reject((string)$id);
215248
}

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
useSymfony\Component\Messenger\Envelope;
1515
useSymfony\Component\Messenger\Exception\LogicException;
1616
useSymfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
useSymfony\Component\Messenger\Stamp\SentForRetryStamp;
1718
useSymfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1819
useSymfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1920
useSymfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -62,7 +63,10 @@ public function ack(Envelope $envelope): void
6263

6364
publicfunctionreject(Envelope$envelope):void
6465
{
65-
$this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId());
66+
$this->connection->reject(
67+
$this->findBeanstalkdReceivedStamp($envelope)->getId(),
68+
$envelope->last(SentForRetryStamp::class)?->isSent ??false,
69+
);
6670
}
6771

6872
publicfunctiongetMessageCount():int

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class Connection
3232
'tube_name' => PheanstalkInterface::DEFAULT_TUBE,
3333
'timeout' =>0,
3434
'ttr' =>90,
35+
'bury_on_reject' =>false,
3536
];
3637

3738
/**
@@ -40,12 +41,14 @@ class Connection
4041
* * tube_name: name of the tube
4142
* * timeout: message reservation timeout (in seconds)
4243
* * ttr: the message time to run before it is put back in the ready queue (in seconds)
44+
* * bury_on_reject: bury rejected messages instead of deleting them
4345
*/
4446
privatearray$configuration;
4547
privatePheanstalkInterface$client;
4648
privatestring$tube;
4749
privateint$timeout;
4850
privateint$ttr;
51+
privatebool$buryOnReject;
4952

5053
publicfunction__construct(array$configuration,PheanstalkInterface$client)
5154
{
@@ -54,6 +57,7 @@ public function __construct(array $configuration, PheanstalkInterface $client)
5457
$this->tube =$this->configuration['tube_name'];
5558
$this->timeout =$this->configuration['timeout'];
5659
$this->ttr =$this->configuration['ttr'];
60+
$this->buryOnReject =$this->configuration['bury_on_reject'];
5761
}
5862

5963
publicstaticfunctionfromDsn(#[\SensitiveParameter]string$dsn,array$options = []):self
@@ -73,7 +77,15 @@ public static function fromDsn(#[\SensitiveParameter] string $dsn, array $option
7377
}
7478

7579
$configuration = [];
76-
$configuration +=$options +$query +self::DEFAULT_OPTIONS;
80+
foreach (self::DEFAULT_OPTIONSas$k =>$v) {
81+
$value =$options[$k] ??$query[$k] ??$v;
82+
83+
$configuration[$k] =match (\gettype($v)) {
84+
'integer' =>filter_var($value, \FILTER_VALIDATE_INT),
85+
'boolean' =>filter_var($value, \FILTER_VALIDATE_BOOL),
86+
default =>$value,
87+
};
88+
}
7789

7890
// check for extra keys in options
7991
$optionsExtraKeys =array_diff(array_keys($options),array_keys(self::DEFAULT_OPTIONS));
@@ -170,10 +182,14 @@ public function ack(string $id): void
170182
}
171183
}
172184

173-
publicfunctionreject(string$id):void
185+
publicfunctionreject(string$id,bool$forceDelete =false):void
174186
{
175187
try {
176-
$this->client->useTube($this->tube)->delete(newJobId((int)$id));
188+
if (!$forceDelete &&$this->buryOnReject) {
189+
$this->client->useTube($this->tube)->bury(newJobId((int)$id));
190+
}else {
191+
$this->client->useTube($this->tube)->delete(newJobId((int)$id));
192+
}
177193
}catch (Exception$exception) {
178194
thrownewTransportException($exception->getMessage(),0,$exception);
179195
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp