Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb766607

Browse files
committed
[Messenger] Fix integration with newer version of Pheanstalk
1 parent195f1f1 commitb766607

File tree

2 files changed

+134
-26
lines changed

2 files changed

+134
-26
lines changed

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
usePheanstalk\Contract\PheanstalkSubscriberInterface;
1717
usePheanstalk\Exception;
1818
usePheanstalk\Exception\ClientException;
19+
usePheanstalk\Exception\ConnectionException;
1920
usePheanstalk\Exception\DeadlineSoonException;
2021
usePheanstalk\Exception\ServerException;
2122
usePheanstalk\Pheanstalk;
@@ -131,6 +132,7 @@ public function testItThrowsAnExceptionIfAnExtraOptionIsDefinedInDSN()
131132
publicfunctiontestGet()
132133
{
133134
$id ='1234';
135+
$id2 ='1235';
134136
$beanstalkdEnvelope = [
135137
'body' =>'foo',
136138
'headers' =>'bar',
@@ -140,13 +142,52 @@ public function testGet()
140142
$timeout =44;
141143

142144
$tubeList =newTubeList($tubeName =newTubeName($tube),$tubeNameDefault =newTubeName('default'));
143-
$job =newJob(newJobId($id),json_encode($beanstalkdEnvelope));
144145

145146
$client =$this->createMock(PheanstalkInterface::class);
146147
$client->expects($this->once())->method('watch')->with($tubeName)->willReturn(2);
147148
$client->expects($this->once())->method('listTubesWatched')->willReturn($tubeList);
148149
$client->expects($this->once())->method('ignore')->with($tubeNameDefault)->willReturn(1);
149-
$client->expects($this->once())->method('reserveWithTimeout')->with($timeout)->willReturn($job);
150+
$client->expects($this->exactly(2))->method('reserveWithTimeout')->with($timeout)->willReturnOnConsecutiveCalls(
151+
newJob(newJobId($id),json_encode($beanstalkdEnvelope)),
152+
newJob(newJobId($id2),json_encode($beanstalkdEnvelope)),
153+
);
154+
155+
$connection =newConnection(['tube_name' =>$tube,'timeout' =>$timeout],$client);
156+
157+
$envelope =$connection->get();
158+
159+
$this->assertSame($id,$envelope['id']);
160+
$this->assertSame($beanstalkdEnvelope['body'],$envelope['body']);
161+
$this->assertSame($beanstalkdEnvelope['headers'],$envelope['headers']);
162+
163+
$envelope =$connection->get();
164+
165+
$this->assertSame($id2,$envelope['id']);
166+
$this->assertSame($beanstalkdEnvelope['body'],$envelope['body']);
167+
$this->assertSame($beanstalkdEnvelope['headers'],$envelope['headers']);
168+
}
169+
170+
publicfunctiontestGetOnReconnect()
171+
{
172+
$id ='1234';
173+
$beanstalkdEnvelope = [
174+
'body' =>'foo',
175+
'headers' =>'bar',
176+
];
177+
178+
$tube ='baz';
179+
$timeout =44;
180+
181+
$tubeList =newTubeList($tubeName =newTubeName($tube),$tubeNameDefault =newTubeName('default'));
182+
183+
$client =$this->createMock(PheanstalkInterface::class);
184+
$client->expects($this->exactly(2))->method('watch')->with($tubeName)->willReturn(2);
185+
$client->expects($this->exactly(2))->method('listTubesWatched')->willReturn($tubeList);
186+
$client->expects($this->exactly(2))->method('ignore')->with($tubeNameDefault)->willReturn(1);
187+
$client->expects($this->exactly(2))->method('reserveWithTimeout')->with($timeout)->willReturnOnConsecutiveCalls(
188+
$this->throwException(newConnectionException('123','foobar')),
189+
newJob(newJobId($id),json_encode($beanstalkdEnvelope)),
190+
);
150191

151192
$connection =newConnection(['tube_name' =>$tube,'timeout' =>$timeout],$client);
152193

@@ -370,10 +411,11 @@ public function testSend()
370411
$expectedDelay =$delay /1000;
371412

372413
$id ='110';
414+
$id2 ='111';
373415

374416
$client =$this->createMock(PheanstalkInterface::class);
375417
$client->expects($this->once())->method('useTube')->with(newTubeName($tube));
376-
$client->expects($this->once())->method('put')->with(
418+
$client->expects($this->exactly(2))->method('put')->with(
377419
$this->callback(function (string$data)use ($body,$headers):bool {
378420
$expectedMessage =json_encode([
379421
'body' =>$body,
@@ -385,7 +427,51 @@ public function testSend()
385427
1024,
386428
$expectedDelay,
387429
90
388-
)->willReturn(newJob(newJobId($id),'foobar'));
430+
)->willReturnOnConsecutiveCalls(
431+
newJob(newJobId($id),'foobar'),
432+
newJob(newJobId($id2),'foobar'),
433+
);
434+
435+
$connection =newConnection(['tube_name' =>$tube],$client);
436+
437+
$returnedId =$connection->send($body,$headers,$delay);
438+
439+
$this->assertSame($id,$returnedId);
440+
441+
$returnedId =$connection->send($body,$headers,$delay);
442+
443+
$this->assertSame($id2,$returnedId);
444+
}
445+
446+
publicfunctiontestSendOnReconnect()
447+
{
448+
$tube ='xyz';
449+
450+
$body ='foo';
451+
$headers = ['test' =>'bar'];
452+
$delay =1000;
453+
$expectedDelay =$delay /1000;
454+
455+
$id ='110';
456+
457+
$client =$this->createMock(PheanstalkInterface::class);
458+
$client->expects($this->exactly(2))->method('useTube')->with(newTubeName($tube));
459+
$client->expects($this->exactly(2))->method('put')->with(
460+
$this->callback(function (string$data)use ($body,$headers):bool {
461+
$expectedMessage =json_encode([
462+
'body' =>$body,
463+
'headers' =>$headers,
464+
]);
465+
466+
return$expectedMessage ===$data;
467+
}),
468+
1024,
469+
$expectedDelay,
470+
90
471+
)->willReturnOnConsecutiveCalls(
472+
$this->throwException(newConnectionException('123','foobar')),
473+
newJob(newJobId($id),'foobar'),
474+
);
389475

390476
$connection =newConnection(['tube_name' =>$tube],$client);
391477

@@ -520,4 +606,5 @@ public function testSendWithRoundedDelay()
520606

521607
interface PheanstalkInterfaceextends PheanstalkPublisherInterface, PheanstalkSubscriberInterface, PheanstalkManagerInterface
522608
{
609+
publicfunctiondisconnect():void;
523610
}

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
usePheanstalk\Exception;
1919
usePheanstalk\Exception\ConnectionException;
2020
usePheanstalk\Pheanstalk;
21-
usePheanstalk\Values\JobasPheanstalkJob;
2221
usePheanstalk\Values\JobId;
2322
usePheanstalk\Values\TubeName;
2423
useSymfony\Component\Messenger\Exception\InvalidArgumentException;
@@ -45,6 +44,9 @@ class Connection
4544
privateint$ttr;
4645
privatebool$buryOnReject;
4746

47+
privatebool$usingTube =false;
48+
privatebool$watchingTube =false;
49+
4850
/**
4951
* Constructor.
5052
*
@@ -139,7 +141,7 @@ public function send(string $body, array $headers, int $delay = 0, ?int $priorit
139141
}
140142

141143
return$this->withReconnect(function ()use ($message,$delay,$priority) {
142-
$this->client->useTube($this->tube);
144+
$this->useTube();
143145
$job =$this->client->put(
144146
$message,
145147
$priority ?? PheanstalkPublisherInterface::DEFAULT_PRIORITY,
@@ -153,7 +155,11 @@ public function send(string $body, array $headers, int $delay = 0, ?int $priorit
153155

154156
publicfunctionget(): ?array
155157
{
156-
$job =$this->getFromTube();
158+
$job =$this->withReconnect(function () {
159+
$this->watchTube();
160+
161+
return$this->client->reserveWithTimeout($this->timeout);
162+
});
157163

158164
if (null ===$job) {
159165
returnnull;
@@ -174,33 +180,18 @@ public function get(): ?array
174180
];
175181
}
176182

177-
privatefunctiongetFromTube(): ?PheanstalkJob
178-
{
179-
return$this->withReconnect(function () {
180-
if ($this->client->watch($this->tube) >1) {
181-
foreach ($this->client->listTubesWatched()as$tube) {
182-
if ((string)$tube !== (string)$this->tube) {
183-
$this->client->ignore($tube);
184-
}
185-
}
186-
}
187-
188-
return$this->client->reserveWithTimeout($this->timeout);
189-
});
190-
}
191-
192183
publicfunctionack(string$id):void
193184
{
194185
$this->withReconnect(function ()use ($id) {
195-
$this->client->useTube($this->tube);
186+
$this->useTube();
196187
$this->client->delete(newJobId($id));
197188
});
198189
}
199190

200191
publicfunctionreject(string$id, ?int$priority =null,bool$forceDelete =false):void
201192
{
202193
$this->withReconnect(function ()use ($id,$priority,$forceDelete) {
203-
$this->client->useTube($this->tube);
194+
$this->useTube();
204195

205196
if (!$forceDelete &&$this->buryOnReject) {
206197
$this->client->bury(newJobId($id),$priority ?? PheanstalkPublisherInterface::DEFAULT_PRIORITY);
@@ -213,15 +204,15 @@ public function reject(string $id, ?int $priority = null, bool $forceDelete = fa
213204
publicfunctionkeepalive(string$id):void
214205
{
215206
$this->withReconnect(function ()use ($id) {
216-
$this->client->useTube($this->tube);
207+
$this->useTube();
217208
$this->client->touch(newJobId($id));
218209
});
219210
}
220211

221212
publicfunctiongetMessageCount():int
222213
{
223214
return$this->withReconnect(function () {
224-
$this->client->useTube($this->tube);
215+
$this->useTube();
225216
$tubeStats =$this->client->statsTube($this->tube);
226217

227218
return$tubeStats->currentJobsReady;
@@ -237,6 +228,33 @@ public function getMessagePriority(string $id): int
237228
});
238229
}
239230

231+
privatefunctionuseTube():void
232+
{
233+
if ($this->usingTube) {
234+
return;
235+
}
236+
237+
$this->client->useTube($this->tube);
238+
$this->usingTube =true;
239+
}
240+
241+
privatefunctionwatchTube():void
242+
{
243+
if ($this->watchingTube) {
244+
return;
245+
}
246+
247+
if ($this->client->watch($this->tube) >1) {
248+
foreach ($this->client->listTubesWatched()as$tube) {
249+
if ((string)$tube !== (string)$this->tube) {
250+
$this->client->ignore($tube);
251+
}
252+
}
253+
}
254+
255+
$this->watchingTube =true;
256+
}
257+
240258
privatefunctionwithReconnect(callable$command):mixed
241259
{
242260
try {
@@ -245,6 +263,9 @@ private function withReconnect(callable $command): mixed
245263
}catch (ConnectionException) {
246264
$this->client->disconnect();
247265

266+
$this->usingTube =false;
267+
$this->watchingTube =false;
268+
248269
return$command();
249270
}
250271
}catch (Exception$exception) {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp