@@ -62,9 +62,8 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
6262 {
6363$ this ->connectionOptions =array_replace_recursive ([
6464'delay ' => [
65- 'routing_key_pattern ' =>'delay_%exchange_name%_%routing_key%_%delay% ' ,
6665'exchange_name ' =>'delay ' ,
67- 'queue_name_pattern ' =>'delay_queue_ %exchange_name%_%routing_key%_%delay% ' ,
66+ 'queue_name_pattern ' =>'delay_ %exchange_name%_%routing_key%_%delay% ' ,
6867 ],
6968 ],$ connectionOptions );
7069$ this ->exchangeOptions =$ exchangeOptions ;
@@ -93,9 +92,8 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
9392 * * flags: Exchange flags (Default: AMQP_DURABLE)
9493 * * arguments: Extra arguments
9594 * * delay:
96- * * routing_key_pattern: The pattern of the routing key (Default: "delay_%exchange_name%_%routing_key%_%delay%")
97- * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%exchange_name%_%routing_key%_%delay%")
98- * * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay")
95+ * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
96+ * * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delay")
9997 * * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
10098 * * prefetch_count: set channel prefetch count
10199 */
@@ -171,20 +169,20 @@ private static function normalizeQueueArguments(array $arguments): array
171169 }
172170
173171/**
174- * @param int $delay The delay in milliseconds
175- *
176172 * @throws \AMQPException
177173 */
178- public function publish (string $ body ,array $ headers = [],int $ delay =0 ,AmqpStamp $ amqpStamp =null ):void
174+ public function publish (string $ body ,array $ headers = [],int $ delayInMs =0 ,AmqpStamp $ amqpStamp =null ):void
179175 {
180- if (0 !==$ delay ) {
181- $ this ->publishWithDelay ($ body ,$ headers ,$ delay ,$ amqpStamp );
176+ $ this ->clearWhenDisconnected ();
177+
178+ if (0 !==$ delayInMs ) {
179+ $ this ->publishWithDelay ($ body ,$ headers ,$ delayInMs ,$ amqpStamp );
182180
183181return ;
184182 }
185183
186184if ($ this ->shouldSetup ()) {
187- $ this ->setup ();
185+ $ this ->setupExchangeAndQueues ();
188186 }
189187
190188$ this ->publishOnExchange (
@@ -213,9 +211,7 @@ private function publishWithDelay(string $body, array $headers, int $delay, Amqp
213211 {
214212$ routingKey =$ this ->getRoutingKeyForMessage ($ amqpStamp );
215213
216- if ($ this ->shouldSetup ()) {
217- $ this ->setupDelay ($ delay ,$ routingKey );
218- }
214+ $ this ->setupDelay ($ delay ,$ routingKey );
219215
220216$ this ->publishOnExchange (
221217$ this ->getDelayExchange (),
@@ -241,15 +237,12 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
241237
242238private function setupDelay (int $ delay , ?string $ routingKey )
243239 {
244- if (! $ this ->channel ()-> isConnected ()) {
245- $ this ->clear ();
240+ if ($ this ->shouldSetup ()) {
241+ $ this ->setup ();// setup delay exchange and normal exchange for delay queue to DLX messages to
246242 }
247243
248- $ this ->exchange ()->declareExchange ();// setup normal exchange for delay queue to DLX messages to
249- $ this ->getDelayExchange ()->declareExchange ();
250-
251244$ queue =$ this ->createDelayQueue ($ delay ,$ routingKey );
252- $ queue ->declareQueue ();
245+ $ queue ->declareQueue ();// the delay queue always need to be declared because the name is dynamic and cannot be declared in advance
253246$ queue ->bind ($ this ->connectionOptions ['delay ' ]['exchange_name ' ],$ this ->getRoutingKeyForDelay ($ delay ,$ routingKey ));
254247 }
255248
@@ -283,6 +276,9 @@ private function createDelayQueue(int $delay, ?string $routingKey)
283276 ));
284277$ queue ->setArguments ([
285278'x-message-ttl ' =>$ delay ,
279+ // delete the delay queue 10 seconds after the message expires
280+ // publishing another message redeclares the queue which renews the lease
281+ 'x-expires ' =>$ delay +10000 ,
286282'x-dead-letter-exchange ' =>$ this ->exchangeOptions ['name ' ],
287283// after being released from to DLX, make sure the original routing key will be used
288284// we must use an empty string instead of null for the argument to be picked up
@@ -297,7 +293,7 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st
297293return str_replace (
298294 ['%delay% ' ,'%exchange_name% ' ,'%routing_key% ' ],
299295 [$ delay ,$ this ->exchangeOptions ['name ' ],$ finalRoutingKey ??'' ],
300- $ this ->connectionOptions ['delay ' ]['routing_key_pattern ' ]
296+ $ this ->connectionOptions ['delay ' ]['queue_name_pattern ' ]
301297 );
302298 }
303299
@@ -308,8 +304,10 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st
308304 */
309305public function get (string $ queueName ): ?\AMQPEnvelope
310306 {
307+ $ this ->clearWhenDisconnected ();
308+
311309if ($ this ->shouldSetup ()) {
312- $ this ->setup ();
310+ $ this ->setupExchangeAndQueues ();
313311 }
314312
315313try {
@@ -319,7 +317,7 @@ public function get(string $queueName): ?\AMQPEnvelope
319317 }catch (\AMQPQueueException $ e ) {
320318if (404 ===$ e ->getCode () &&$ this ->shouldSetup ()) {
321319// If we get a 404 for the queue, it means we need to setup the exchange & queue.
322- $ this ->setup ();
320+ $ this ->setupExchangeAndQueues ();
323321
324322return $ this ->get ();
325323 }
@@ -342,10 +340,12 @@ public function nack(\AMQPEnvelope $message, string $queueName, int $flags = AMQ
342340
343341public function setup ():void
344342 {
345- if (! $ this ->channel ()-> isConnected ()) {
346- $ this ->clear ();
347- }
343+ $ this ->setupExchangeAndQueues ();
344+ $ this ->getDelayExchange ()-> declareExchange ();
345+ }
348346
347+ private function setupExchangeAndQueues ():void
348+ {
349349$ this ->exchange ()->declareExchange ();
350350
351351foreach ($ this ->queuesOptions as $ queueName =>$ queueConfig ) {
@@ -424,12 +424,14 @@ public function exchange(): \AMQPExchange
424424return $ this ->amqpExchange ;
425425 }
426426
427- private function clear ():void
427+ private function clearWhenDisconnected ():void
428428 {
429- $ this ->amqpChannel =null ;
430- $ this ->amqpQueues = [];
431- $ this ->amqpExchange =null ;
432- $ this ->amqpDelayExchange =null ;
429+ if (!$ this ->channel ()->isConnected ()) {
430+ $ this ->amqpChannel =null ;
431+ $ this ->amqpQueues = [];
432+ $ this ->amqpExchange =null ;
433+ $ this ->amqpDelayExchange =null ;
434+ }
433435 }
434436
435437private function shouldSetup ():bool