@@ -413,36 +413,26 @@ public function testAutoSetupWithDelayDeclaresExchangeQueuesAndDelay()
413413
414414public function testItDelaysTheMessage ()
415415 {
416- $ amqpConnection =$ this ->createMock (\AMQPConnection::class);
417- $ amqpChannel =$ this ->createMock (\AMQPChannel::class);
418-
419- $ factory =$ this ->createMock (AmqpFactory::class);
420- $ factory ->method ('createConnection ' )->willReturn ($ amqpConnection );
421- $ factory ->method ('createChannel ' )->willReturn ($ amqpChannel );
422- $ factory ->method ('createQueue ' )->will ($ this ->onConsecutiveCalls (
423- $ this ->createMock (\AMQPQueue::class),
424- $ delayQueue =$ this ->createMock (\AMQPQueue::class)
425- ));
426- $ factory ->method ('createExchange ' )->will ($ this ->onConsecutiveCalls (
427- $ this ->createMock (\AMQPExchange::class),
428- $ delayExchange =$ this ->createMock (\AMQPExchange::class)
429- ));
416+ $ delayExchange =$ this ->createMock (\AMQPExchange::class);
417+ $ delayExchange ->expects ($ this ->once ())
418+ ->method ('publish ' )
419+ ->with ('{} ' ,'delay_messages__delay_5000 ' ,AMQP_NOPARAM , ['headers ' => ['x-some-headers ' =>'foo ' ],'delivery_mode ' =>2 ]);
420+ $ connection =$ this ->createDelayOrRetryConnection ($ delayExchange ,self ::DEFAULT_EXCHANGE_NAME ,'delay_messages__delay_5000 ' );
430421
431- $ delayQueue ->expects ($ this ->once ())->method ('setName ' )->with ('delay_messages__5000 ' );
432- $ delayQueue ->expects ($ this ->once ())->method ('setArguments ' )->with ([
433- 'x-message-ttl ' =>5000 ,
434- 'x-expires ' =>5000 +10000 ,
435- 'x-dead-letter-exchange ' =>self ::DEFAULT_EXCHANGE_NAME ,
436- 'x-dead-letter-routing-key ' =>'' ,
437- ]);
438-
439- $ delayQueue ->expects ($ this ->once ())->method ('declareQueue ' );
440- $ delayQueue ->expects ($ this ->once ())->method ('bind ' )->with ('delays ' ,'delay_messages__5000 ' );
422+ $ connection ->publish ('{} ' , ['x-some-headers ' =>'foo ' ],5000 );
423+ }
441424
442- $ delayExchange ->expects ($ this ->once ())->method ('publish ' )->with ('{} ' ,'delay_messages__5000 ' ,AMQP_NOPARAM , ['headers ' => ['x-some-headers ' =>'foo ' ],'delivery_mode ' =>2 ]);
425+ public function testItRetriesTheMessage ()
426+ {
427+ $ delayExchange =$ this ->createMock (\AMQPExchange::class);
428+ $ delayExchange ->expects ($ this ->once ())
429+ ->method ('publish ' )
430+ ->with ('{} ' ,'delay_messages__retry_5000 ' ,AMQP_NOPARAM );
431+ $ connection =$ this ->createDelayOrRetryConnection ($ delayExchange ,'' ,'delay_messages__retry_5000 ' );
443432
444- $ connection = Connection::fromDsn ('amqp://localhost ' , [],$ factory );
445- $ connection ->publish ('{} ' , ['x-some-headers ' =>'foo ' ],5000 );
433+ $ amqpEnvelope =$ this ->createMock (\AMQPEnvelope::class);
434+ $ amqpStamp = AmqpStamp::createFromAmqpEnvelope ($ amqpEnvelope ,null ,'' );
435+ $ connection ->publish ('{} ' , [],5000 ,$ amqpStamp );
446436 }
447437
448438public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs ()
@@ -470,7 +460,7 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
470460
471461$ connection = Connection::fromDsn ('amqp://localhost ' ,$ connectionOptions ,$ factory );
472462
473- $ delayQueue ->expects ($ this ->once ())->method ('setName ' )->with ('delay_messages__120000 ' );
463+ $ delayQueue ->expects ($ this ->once ())->method ('setName ' )->with ('delay_messages__delay_120000 ' );
474464$ delayQueue ->expects ($ this ->once ())->method ('setArguments ' )->with ([
475465'x-message-ttl ' =>120000 ,
476466'x-expires ' =>120000 +10000 ,
@@ -479,9 +469,9 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
479469 ]);
480470
481471$ delayQueue ->expects ($ this ->once ())->method ('declareQueue ' );
482- $ delayQueue ->expects ($ this ->once ())->method ('bind ' )->with ('delays ' ,'delay_messages__120000 ' );
472+ $ delayQueue ->expects ($ this ->once ())->method ('bind ' )->with ('delays ' ,'delay_messages__delay_120000 ' );
483473
484- $ delayExchange ->expects ($ this ->once ())->method ('publish ' )->with ('{} ' ,'delay_messages__120000 ' ,AMQP_NOPARAM , ['headers ' => [],'delivery_mode ' =>2 ]);
474+ $ delayExchange ->expects ($ this ->once ())->method ('publish ' )->with ('{} ' ,'delay_messages__delay_120000 ' ,AMQP_NOPARAM , ['headers ' => [],'delivery_mode ' =>2 ]);
485475$ connection ->publish ('{} ' , [],120000 );
486476 }
487477
@@ -589,7 +579,7 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
589579
590580$ connection = Connection::fromDsn ('amqp://localhost ' ,$ connectionOptions ,$ factory );
591581
592- $ delayQueue ->expects ($ this ->once ())->method ('setName ' )->with ('delay_messages_routing_key_120000 ' );
582+ $ delayQueue ->expects ($ this ->once ())->method ('setName ' )->with ('delay_messages_routing_key_delay_120000 ' );
593583$ delayQueue ->expects ($ this ->once ())->method ('setArguments ' )->with ([
594584'x-message-ttl ' =>120000 ,
595585'x-expires ' =>120000 +10000 ,
@@ -598,9 +588,9 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
598588 ]);
599589
600590$ delayQueue ->expects ($ this ->once ())->method ('declareQueue ' );
601- $ delayQueue ->expects ($ this ->once ())->method ('bind ' )->with ('delays ' ,'delay_messages_routing_key_120000 ' );
591+ $ delayQueue ->expects ($ this ->once ())->method ('bind ' )->with ('delays ' ,'delay_messages_routing_key_delay_120000 ' );
602592
603- $ delayExchange ->expects ($ this ->once ())->method ('publish ' )->with ('{} ' ,'delay_messages_routing_key_120000 ' ,AMQP_NOPARAM , ['headers ' => [],'delivery_mode ' =>2 ]);
593+ $ delayExchange ->expects ($ this ->once ())->method ('publish ' )->with ('{} ' ,'delay_messages_routing_key_delay_120000 ' ,AMQP_NOPARAM , ['headers ' => [],'delivery_mode ' =>2 ]);
604594$ connection ->publish ('{} ' , [],120000 ,new AmqpStamp ('routing_key ' ));
605595 }
606596
@@ -623,6 +613,37 @@ public function testItCanPublishWithCustomFlagsAndAttributes()
623613$ connection = Connection::fromDsn ('amqp://localhost ' , [],$ factory );
624614$ connection ->publish ('body ' , ['type ' => DummyMessage::class],0 ,new AmqpStamp ('routing_key ' ,AMQP_IMMEDIATE , ['delivery_mode ' =>2 ]));
625615 }
616+
617+ private function createDelayOrRetryConnection (\AMQPExchange $ delayExchange ,string $ deadLetterExchangeName ,string $ delayQueueName ):Connection
618+ {
619+ $ amqpConnection =$ this ->createMock (\AMQPConnection::class);
620+ $ amqpChannel =$ this ->createMock (\AMQPChannel::class);
621+
622+ $ factory =$ this ->createMock (AmqpFactory::class);
623+ $ factory ->method ('createConnection ' )->willReturn ($ amqpConnection );
624+ $ factory ->method ('createChannel ' )->willReturn ($ amqpChannel );
625+ $ factory ->method ('createQueue ' )->will ($ this ->onConsecutiveCalls (
626+ $ this ->createMock (\AMQPQueue::class),
627+ $ delayQueue =$ this ->createMock (\AMQPQueue::class)
628+ ));
629+ $ factory ->method ('createExchange ' )->will ($ this ->onConsecutiveCalls (
630+ $ this ->createMock (\AMQPExchange::class),
631+ $ delayExchange
632+ ));
633+
634+ $ delayQueue ->expects ($ this ->once ())->method ('setName ' )->with ($ delayQueueName );
635+ $ delayQueue ->expects ($ this ->once ())->method ('setArguments ' )->with ([
636+ 'x-message-ttl ' =>5000 ,
637+ 'x-expires ' =>5000 +10000 ,
638+ 'x-dead-letter-exchange ' =>$ deadLetterExchangeName ,
639+ 'x-dead-letter-routing-key ' =>'' ,
640+ ]);
641+
642+ $ delayQueue ->expects ($ this ->once ())->method ('declareQueue ' );
643+ $ delayQueue ->expects ($ this ->once ())->method ('bind ' )->with ('delays ' ,$ delayQueueName );
644+
645+ return Connection::fromDsn ('amqp://localhost ' , [],$ factory );
646+ }
626647}
627648
628649class TestAmqpFactoryextends AmqpFactory