16
16
use Pheanstalk \Contract \PheanstalkSubscriberInterface ;
17
17
use Pheanstalk \Exception ;
18
18
use Pheanstalk \Exception \ClientException ;
19
+ use Pheanstalk \Exception \ConnectionException ;
19
20
use Pheanstalk \Exception \DeadlineSoonException ;
20
21
use Pheanstalk \Exception \ServerException ;
21
22
use Pheanstalk \Pheanstalk ;
@@ -131,6 +132,7 @@ public function testItThrowsAnExceptionIfAnExtraOptionIsDefinedInDSN()
131
132
public function testGet ()
132
133
{
133
134
$ id ='1234 ' ;
135
+ $ id2 ='1235 ' ;
134
136
$ beanstalkdEnvelope = [
135
137
'body ' =>'foo ' ,
136
138
'headers ' =>'bar ' ,
@@ -140,13 +142,52 @@ public function testGet()
140
142
$ timeout =44 ;
141
143
142
144
$ tubeList =new TubeList ($ tubeName =new TubeName ($ tube ),$ tubeNameDefault =new TubeName ('default ' ));
143
- $ job =new Job (new JobId ($ id ),json_encode ($ beanstalkdEnvelope ));
144
145
145
146
$ client =$ this ->createMock (PheanstalkInterface::class);
146
147
$ client ->expects ($ this ->once ())->method ('watch ' )->with ($ tubeName )->willReturn (2 );
147
148
$ client ->expects ($ this ->once ())->method ('listTubesWatched ' )->willReturn ($ tubeList );
148
149
$ client ->expects ($ this ->once ())->method ('ignore ' )->with ($ tubeNameDefault )->willReturn (1 );
149
- $ client ->expects ($ this ->once ())->method ('reserveWithTimeout ' )->with ($ timeout )->willReturn ($ job );
150
+ $ client ->expects ($ this ->exactly (2 ))->method ('reserveWithTimeout ' )->with ($ timeout )->willReturnOnConsecutiveCalls (
151
+ new Job (new JobId ($ id ),json_encode ($ beanstalkdEnvelope )),
152
+ new Job (new JobId ($ id2 ),json_encode ($ beanstalkdEnvelope )),
153
+ );
154
+
155
+ $ connection =new Connection (['tube_name ' =>$ tube ,'timeout ' =>$ timeout ],$ client );
156
+
157
+ $ envelope =$ connection ->get ();
158
+
159
+ $ this ->assertSame ($ id ,$ envelope ['id ' ]);
160
+ $ this ->assertSame ($ beanstalkdEnvelope ['body ' ],$ envelope ['body ' ]);
161
+ $ this ->assertSame ($ beanstalkdEnvelope ['headers ' ],$ envelope ['headers ' ]);
162
+
163
+ $ envelope =$ connection ->get ();
164
+
165
+ $ this ->assertSame ($ id2 ,$ envelope ['id ' ]);
166
+ $ this ->assertSame ($ beanstalkdEnvelope ['body ' ],$ envelope ['body ' ]);
167
+ $ this ->assertSame ($ beanstalkdEnvelope ['headers ' ],$ envelope ['headers ' ]);
168
+ }
169
+
170
+ public function testGetOnReconnect ()
171
+ {
172
+ $ id ='1234 ' ;
173
+ $ beanstalkdEnvelope = [
174
+ 'body ' =>'foo ' ,
175
+ 'headers ' =>'bar ' ,
176
+ ];
177
+
178
+ $ tube ='baz ' ;
179
+ $ timeout =44 ;
180
+
181
+ $ tubeList =new TubeList ($ tubeName =new TubeName ($ tube ),$ tubeNameDefault =new TubeName ('default ' ));
182
+
183
+ $ client =$ this ->createMock (PheanstalkInterface::class);
184
+ $ client ->expects ($ this ->exactly (2 ))->method ('watch ' )->with ($ tubeName )->willReturn (2 );
185
+ $ client ->expects ($ this ->exactly (2 ))->method ('listTubesWatched ' )->willReturn ($ tubeList );
186
+ $ client ->expects ($ this ->exactly (2 ))->method ('ignore ' )->with ($ tubeNameDefault )->willReturn (1 );
187
+ $ client ->expects ($ this ->exactly (2 ))->method ('reserveWithTimeout ' )->with ($ timeout )->willReturnOnConsecutiveCalls (
188
+ $ this ->throwException (new ConnectionException ('123 ' ,'foobar ' )),
189
+ new Job (new JobId ($ id ),json_encode ($ beanstalkdEnvelope )),
190
+ );
150
191
151
192
$ connection =new Connection (['tube_name ' =>$ tube ,'timeout ' =>$ timeout ],$ client );
152
193
@@ -370,10 +411,11 @@ public function testSend()
370
411
$ expectedDelay =$ delay /1000 ;
371
412
372
413
$ id ='110 ' ;
414
+ $ id2 ='111 ' ;
373
415
374
416
$ client =$ this ->createMock (PheanstalkInterface::class);
375
417
$ client ->expects ($ this ->once ())->method ('useTube ' )->with (new TubeName ($ tube ));
376
- $ client ->expects ($ this ->once ( ))->method ('put ' )->with (
418
+ $ client ->expects ($ this ->exactly ( 2 ))->method ('put ' )->with (
377
419
$ this ->callback (function (string $ data )use ($ body ,$ headers ):bool {
378
420
$ expectedMessage =json_encode ([
379
421
'body ' =>$ body ,
@@ -385,7 +427,51 @@ public function testSend()
385
427
1024 ,
386
428
$ expectedDelay ,
387
429
90
388
- )->willReturn (new Job (new JobId ($ id ),'foobar ' ));
430
+ )->willReturnOnConsecutiveCalls (
431
+ new Job (new JobId ($ id ),'foobar ' ),
432
+ new Job (new JobId ($ id2 ),'foobar ' ),
433
+ );
434
+
435
+ $ connection =new Connection (['tube_name ' =>$ tube ],$ client );
436
+
437
+ $ returnedId =$ connection ->send ($ body ,$ headers ,$ delay );
438
+
439
+ $ this ->assertSame ($ id ,$ returnedId );
440
+
441
+ $ returnedId =$ connection ->send ($ body ,$ headers ,$ delay );
442
+
443
+ $ this ->assertSame ($ id2 ,$ returnedId );
444
+ }
445
+
446
+ public function testSendOnReconnect ()
447
+ {
448
+ $ tube ='xyz ' ;
449
+
450
+ $ body ='foo ' ;
451
+ $ headers = ['test ' =>'bar ' ];
452
+ $ delay =1000 ;
453
+ $ expectedDelay =$ delay /1000 ;
454
+
455
+ $ id ='110 ' ;
456
+
457
+ $ client =$ this ->createMock (PheanstalkInterface::class);
458
+ $ client ->expects ($ this ->exactly (2 ))->method ('useTube ' )->with (new TubeName ($ tube ));
459
+ $ client ->expects ($ this ->exactly (2 ))->method ('put ' )->with (
460
+ $ this ->callback (function (string $ data )use ($ body ,$ headers ):bool {
461
+ $ expectedMessage =json_encode ([
462
+ 'body ' =>$ body ,
463
+ 'headers ' =>$ headers ,
464
+ ]);
465
+
466
+ return $ expectedMessage ===$ data ;
467
+ }),
468
+ 1024 ,
469
+ $ expectedDelay ,
470
+ 90
471
+ )->willReturnOnConsecutiveCalls (
472
+ $ this ->throwException (new ConnectionException ('123 ' ,'foobar ' )),
473
+ new Job (new JobId ($ id ),'foobar ' ),
474
+ );
389
475
390
476
$ connection =new Connection (['tube_name ' =>$ tube ],$ client );
391
477
@@ -520,4 +606,5 @@ public function testSendWithRoundedDelay()
520
606
521
607
interface PheanstalkInterfaceextends PheanstalkPublisherInterface, PheanstalkSubscriberInterface, PheanstalkManagerInterface
522
608
{
609
+ public function disconnect ():void ;
523
610
}