Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork9.7k
[Messenger][AmqpExt] Add a retry mechanism for AMQP messages#27008
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Conversation
| $queue->setName(str_replace('%attempt%',$attemptNumber,$name)); | ||
| // $queue->setFlags(AMQP_DURABLE); | ||
| $queue->setArguments(array( | ||
| 'x-message-ttl' =>30000,// 30 seconds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
This TTL should be configurable per attempt number.
| $exchange =$this->amqpFactory->createExchange($this->channel()); | ||
| $exchange->setName($retryConfiguration['name'] ??'retry'); | ||
| $exchange->setType(AMQP_EX_TYPE_DIRECT); | ||
| // $exchange->setFlags(AMQP_DURABLE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
To be removed.
| { | ||
| $queue =$this->amqpFactory->createQueue($this->channel()); | ||
| $queue->setName(str_replace('%attempt%',$attemptNumber,$name)); | ||
| // $queue->setFlags(AMQP_DURABLE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
To be removed.
| returnfalse; | ||
| } | ||
| $routingKey ='dead'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Should be a configurabledead_routing_key
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
dead is a very strong word for humans. Could we useexpired instead? Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Well, that’s the term used by AMQP unfortunately. Maybetoo_many_attempts if really you want to change? 🤔
| $queue =$this->amqpFactory->createQueue($this->channel()); | ||
| $queue->setName($retryConfiguration['dead_queue']); | ||
| $queue->declareQueue(); | ||
| $queue->bind($exchange->getName(),'dead'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Should be using thedead_routing_key configuration.
lyrixx left a comment• edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
IMHO, this really need to be tested against a real implementation of AMQP (RabbitMQ).
I read very quickly the code and it seems this does not work well. I guess I should test it (by hand) to ensure work, but it looks like many case are not supported:
- retrying a message that come from an exchange in a fan out mode => the message will be dispatched is all queue that are bound. It should not
- retrying a message that come from an exchange in an header mode => the message will not be dispatched at all.
Then, I would be nice to be able to configure the type of retry strategy (linear / backoff), and offset (the 3/4 first iteration may be too close, and so we want to skip it) and of course the delay
Another things: all header should be casted to string.
From PHP, with the PECL, it's not really possible to create a typed header (in AMQP you can tell if the header is a string or an int). So it's better to usestring everywhere to avoid edge case. I some version of rabbitmq, we hit this issue. The exchange were created with "string" type. But the message where created with "int" type. So there were not matches. Really hard to find bugs ;)
Edited: I forgot to say, you can grad some code from#23315 where the retry mechanisme is really advanced / configurable. But the more I read code about Messenger, The more I think we should reconsider#23315. How I see things: I should remove everything about worker, keep only AMQP stuff, and update the Messenger to use AMQP. (ping@fabpot)
| $queue =$this->amqpFactory->createQueue($this->channel()); | ||
| $queue->setName(str_replace('%attempt%',$attemptNumber,$retryConfiguration['queue_name_pattern'] ??'retry_queue_%attempt%')); | ||
| $queue->setArguments(array( | ||
| 'x-message-ttl' =>$retryConfiguration['ttls'][$attemptNumber -1] ??30000,// 30 seconds by default |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
why did you add as in the key name ?
sroze commentedApr 23, 2018
That's the setup I used to test this retry mechanism and it works well. I'd love you to try and see really in which case it doesn't work :)
If you believe you can't make the AMQP adapter much better by doing so, it is probably an interesting option (lots of work on your side though 😄). Though, there is a point in keeping the AMQP adapter simple and not having it "fully blown featured". Obviously, there is the maintenance cost point. But also, most people will use the default auto-setuped queues, right? I also believe that everybody willing to configure the queues and exchanges very precisely will very rarely be fully satisfied by auto-configuration (regardless of the amount of effort we put into it) and end up configuring their AMQP bits manually, that's what everybody has been doing so far at the end, and this retry mechanism PR allows you to configure the routing key pattern without auto-configuring it, so this use-case would work. |
kbond commentedApr 26, 2018
Question about retries in general: If a consumed message is handled by multiple handlers but only one throws an exception - how do you handle this? It probably isn't desirable to retry the message on handlers that were successful. |
nicolas-grekas commentedMay 4, 2018
Rebase needed after#27129 |
sroze commentedMay 9, 2018
That's a good question. You are right, it might not be desirable to retry on successful handlers but I believe that's the only (without a crazy amount of code) way to make it simple. On the other hand, I'd argue that something "critical" (a command in the CQRS point-of-view) will only have one handler. The things that will have multiple handlers are basically event subscribers and will be projections (or similar) and there should be no problem with having them running multiple times. |
ca47435 todfdb38cComparesroze commentedMay 9, 2018
PR rebased. |
sroze commentedMay 9, 2018
I suggest we go ahead with this simple (optional) retry implementation for 4.1 and replace it with the one in the AMQP component later when introduced. |
ogizanagi left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Not much to say. Looks good to me.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
| } | ||
| $maximumAttempts =$retryConfiguration['attempts'] ??3; | ||
| $routingKey =str_replace('%attempt%',$attemptNumber,$retryConfiguration['routing_key_pattern'] ??'attempt_%attempt%'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Could we normalize all the options values before using it, in the constructor for instance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Yep, it's a good idea.
kbond commentedMay 9, 2018 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
@sroze another potential problem could be an exception being thrown at a point where retrying the message would cause additional problems. What about an optional mechanism to have the subscriber tell the message bus/receiver to retry the message? This could also tell the bus to retry just this specific handler. // in your handler/subscribertry {// ... some logic that throws an exception}catch (\Throwable$e) {thrownewRetryException(__CLASS__,$e);// or instead of __CLASS__ we can get the class from the exception's trace?} This is probably a discussion for another place as it is a more general retry - I can open an issue if we want to discuss further. |
sroze commentedMay 9, 2018
I'd say so. Though a very good point; I'd be great if you can open an issue, describe the issue and, cherry on top, if you have an idea on how to implement, propose something :) |
624ae8d to4ae654bComparesroze commentedMay 10, 2018
I don't think that fabbot's PoV is relevant here :) |
sroze commentedMay 10, 2018
Documentation PR is here:symfony/symfony-docs/pull/9756 |
| private$amqpRetryExchange; | ||
| /** | ||
| * Available options: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
With so many options, it's IMO better to use a configuration class. Then the code self-explaining and more explicit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
But it would mean that we have something creating the configuration object from the DSNs. I'd say that keeping these options as they are now is good and that we don't need more code to maintain just for this configuration details.
ogizanagiMay 11, 2018 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Yes, I think it's fine like this (but not ideal, true). Otherwise we'll need something similar as theSecurityFactoryInterface in order to make it extendable to any transport with similar use-cases.
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Add some tests and remove the intrication between the receiver and the connectionConfigure each TTL individually and the dead routing keyThe `ttls` array is 0-indexedUpdate the retry based on feedback (`ttls` -> `ttl`, add options to documentation and normalises the default values)Catches failed retries and forward other messages' attributes
sroze commentedAug 12, 2018
This pull-request is welcoming another round of review 💚 |
Koc commentedAug 12, 2018
@sroze how can we programatically requeue message with specified delay from handler? For example for cases, when we cann't process this message now. |
sroze commentedAug 24, 2018
In the given implementation we can't. I believe that this is out of scope for now anyway and that it will be handled with the AMQP component (#27140) which will provide more features such as this one 👍 |
fabpot commentedOct 10, 2018
@sroze Can we talk about this one this week IRL? |
ragboyjr commentedDec 9, 2018
@sroze do you think it would be possible to implement this in away so that it would standardize the retrying logic across the various transports? |
weaverryan commentedMar 15, 2019
I've brought the AMQP retry logic from Sam and put it into#30557 - but also fitting it into a system where retrying has been made generic, so it can be implemented/included in future transports. I would LOVE review on that - I'm running full speed to get the features Messenger needs/deserves, but I need queue experts to help me avoid pitfalls. |
lyrixx commentedMar 16, 2019
IMHO this one could be closed in favor or#30557 |
fabpot commentedMar 17, 2019
Closing in favor of#30557 |
… (weaverryan)This PR was merged into the 4.3-dev branch.Discussion----------[Messenger] Worker events + global retry functionality| Q | A| ------------- | ---| Branch? | master| Bug fix? | no| New feature? | yes| BC breaks? | yes, on Messenger only| Deprecations? | no| Tests pass? | NEEDED| Fixed tickets |#29132,#27008,#27215 and part of#30540| License | MIT| Doc PR | TODOThis is an alternative to#29132 and#27008. There are several big things:1) The `messenger:consume` does not die if a handler has an error2) Events are dispatched before, after and on error a message being handled3) Logic is moved out of Amqp and into the Worker so that we can have some consistent features, like error handling.4) A generic retry system was added, which works with Amqp and future transports should support. It will work out of the box for users. Retrying works by putting the received `Envelope` back into the bus, but with the `ReceivedStamp` removed. The retry functionality has an integration test for AMQP.5) Added a new `MessageDecodingFailedException` that transport Serializers should throw if `decode()` fails. It allows us to reject a message in this situation, as allowing it to fail but remain on the queue causes it to be retried forever.6) A new `DelayStamp` was added, which is the first of (later) more stamps for configuring the transport layer (see#30558).BC breaks are documented in the CHANGELOG.Thanks!Commits-------a989384 Adding global retry support, events & more to messenger transport
…ndler (keulinho, sroze)This PR was merged into the 4.3-dev branch.Discussion----------[Messenger] Ensure message is handled only once per handlerAdd check to ensure that a message is only handled once per handlerAdd try...catch to run all handlers before throwing exception| Q | A| ------------- | ---| Branch? | master| Bug fix? | no| New feature? | yes| BC breaks? | no| Deprecations? |no| Tests pass? | yes| Fixed tickets |#27215| License | MIT| Doc PR | TodoThis would make error handling and retrying of messages much more easier. As statet here#27008 (comment) there is currently no way to retry a for all failed handlers if there are mutliple handlers and just some throw an exception.Also if an Exception in an handler occurs the execution chain is disrupted and the other handlers are never invoked.With this change it is easily possible to create an userland middleware that catches the `ChainedHandlerFailedException` and does some custom retry logic. If you ensure that the `HandledStamps` on the `Envelope` are preserved the message will be handled just by the failed handlersCommits-------2e5e910 Rename exception, add change log and a few other thingse6e4cde Ensure message is handled only once per handler
Uh oh!
There was an error while loading.Please reload this page.
One of the most asked features (and required to have an enterprise-grade management of errors). I chose the approach of an AmqpExt-only retry mechanism to ensure we have a great user experience with this one at least. There are discussions on-going (#26945) to have more generic things like that across adapters.
So what's the point?
When processing some messages, some error might happen. It might be that one of our 3rd party is down, that something is wrong with the last deployment, etc... All of these things will throw exceptions in the message handlers. The way of tackling this problem is very well described in@odolbeau'sSwarrot Retry provider real-life example.
How to configure it
Same than for the rest, this can be configured both via the DSN or the
optionswhen creating the adapter. Here is a bunch of working DSNs with their corresponding effect: