Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

[Messenger][AmqpExt] Add a retry mechanism for AMQP messages#27008

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Closed
sroze wants to merge2 commits intosymfony:masterfromsroze:retry-amqp-messages

Conversation

@sroze
Copy link
Contributor

@srozesroze commentedApr 22, 2018
edited
Loading

QA
Branch?master
Bug fix?no
New feature?yes
BC breaks?no
Deprecations?no
Tests pass?yes
Fixed ticketsø
LicenseMIT
Doc PRsymfony/symfony-docs#9756

One of the most asked features (and required to have an enterprise-grade management of errors). I chose the approach of an AmqpExt-only retry mechanism to ensure we have a great user experience with this one at least. There are discussions on-going (#26945) to have more generic things like that across adapters.


So what's the point?

When processing some messages, some error might happen. It might be that one of our 3rd party is down, that something is wrong with the last deployment, etc... All of these things will throw exceptions in the message handlers. The way of tackling this problem is very well described in@odolbeau'sSwarrot Retry provider real-life example.

How to configure it

Same than for the rest, this can be configured both via the DSN or theoptions when creating the adapter. Here is a bunch of working DSNs with their corresponding effect:

  1. Enable 3 retries before having an exception
amqp://guest:guest@localhost:5672/%2f/messages?retry[attempts]=3
  1. Enable 3 retries before message to be queue in a "dead queue" (i.e. messages to be looked at manually later)
amqp://guest:guest@localhost:5672/%2f/messages?retry[attempts]=3&retry[dead_queue]=dead_queue
  1. Enable 3 retries, dead queue and custom settings
amqp://guest:guest@localhost:5672/%2f/messages?retry[attempts]=3&retry[exchange_name]=retry&retry[routing_key_pattern]=retry_attempt_%25attempt%25&retry[queue_name_pattern]=retry_queue_%attempt%&retry[dead_queue]=dead_queue&retry[dead_routing_key]=dead_routing_key
  1. Enable 3 retries, with custom TTLs per attempt. Waits 10 seconds before retrying for the first attempt, 30 seconds for the 2nd attempt and 5 minutes for the 3rd one.
amqp://guest:guest@localhost:5672/%2f/messages?retry[attempts]=3&retry[ttl][0]=10000&retry[ttl][1]=30000&retry[ttl][2]=300000

$queue->setName(str_replace('%attempt%',$attemptNumber,$name));
// $queue->setFlags(AMQP_DURABLE);
$queue->setArguments(array(
'x-message-ttl' =>30000,// 30 seconds
Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This TTL should be configurable per attempt number.

$exchange =$this->amqpFactory->createExchange($this->channel());
$exchange->setName($retryConfiguration['name'] ??'retry');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
// $exchange->setFlags(AMQP_DURABLE);
Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

To be removed.

{
$queue =$this->amqpFactory->createQueue($this->channel());
$queue->setName(str_replace('%attempt%',$attemptNumber,$name));
// $queue->setFlags(AMQP_DURABLE);
Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

To be removed.

returnfalse;
}

$routingKey ='dead';
Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Should be a configurabledead_routing_key

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

dead is a very strong word for humans. Could we useexpired instead? Thanks!

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Well, that’s the term used by AMQP unfortunately. Maybetoo_many_attempts if really you want to change? 🤔

$queue =$this->amqpFactory->createQueue($this->channel());
$queue->setName($retryConfiguration['dead_queue']);
$queue->declareQueue();
$queue->bind($exchange->getName(),'dead');
Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Should be using thedead_routing_key configuration.

@srozesroze added this to the4.1 milestoneApr 23, 2018
Copy link
Member

@lyrixxlyrixx left a comment
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

IMHO, this really need to be tested against a real implementation of AMQP (RabbitMQ).

I read very quickly the code and it seems this does not work well. I guess I should test it (by hand) to ensure work, but it looks like many case are not supported:

  • retrying a message that come from an exchange in a fan out mode => the message will be dispatched is all queue that are bound. It should not
  • retrying a message that come from an exchange in an header mode => the message will not be dispatched at all.

Then, I would be nice to be able to configure the type of retry strategy (linear / backoff), and offset (the 3/4 first iteration may be too close, and so we want to skip it) and of course the delay

Another things: all header should be casted to string.
From PHP, with the PECL, it's not really possible to create a typed header (in AMQP you can tell if the header is a string or an int). So it's better to usestring everywhere to avoid edge case. I some version of rabbitmq, we hit this issue. The exchange were created with "string" type. But the message where created with "int" type. So there were not matches. Really hard to find bugs ;)


Edited: I forgot to say, you can grad some code from#23315 where the retry mechanisme is really advanced / configurable. But the more I read code about Messenger, The more I think we should reconsider#23315. How I see things: I should remove everything about worker, keep only AMQP stuff, and update the Messenger to use AMQP. (ping@fabpot)

$queue =$this->amqpFactory->createQueue($this->channel());
$queue->setName(str_replace('%attempt%',$attemptNumber,$retryConfiguration['queue_name_pattern'] ??'retry_queue_%attempt%'));
$queue->setArguments(array(
'x-message-ttl' =>$retryConfiguration['ttls'][$attemptNumber -1] ??30000,// 30 seconds by default
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

why did you add as in the key name ?

@sroze
Copy link
ContributorAuthor

retrying a message that come from an exchange in a fan out mode => the message will be dispatched is all queue that are bound. It should not

That's the setup I used to test this retry mechanism and it works well. I'd love you to try and see really in which case it doesn't work :)

I think we should reconsider#23315. How I see things: I should remove everything about worker, keep only AMQP stuff, and update the Messenger to use AMQP.

If you believe you can't make the AMQP adapter much better by doing so, it is probably an interesting option (lots of work on your side though 😄).

Though, there is a point in keeping the AMQP adapter simple and not having it "fully blown featured". Obviously, there is the maintenance cost point. But also, most people will use the default auto-setuped queues, right? I also believe that everybody willing to configure the queues and exchanges very precisely will very rarely be fully satisfied by auto-configuration (regardless of the amount of effort we put into it) and end up configuring their AMQP bits manually, that's what everybody has been doing so far at the end, and this retry mechanism PR allows you to configure the routing key pattern without auto-configuring it, so this use-case would work.

@kbond
Copy link
Member

Question about retries in general: If a consumed message is handled by multiple handlers but only one throws an exception - how do you handle this? It probably isn't desirable to retry the message on handlers that were successful.

@nicolas-grekas
Copy link
Member

Rebase needed after#27129

@nicolas-grekasnicolas-grekas changed the base branch frommaster to4.1May 7, 2018 15:06
@sroze
Copy link
ContributorAuthor

If a consumed message is handled by multiple handlers but only one throws an exception - how do you handle this? It probably isn't desirable to retry the message on handlers that were successful.

That's a good question. You are right, it might not be desirable to retry on successful handlers but I believe that's the only (without a crazy amount of code) way to make it simple. On the other hand, I'd argue that something "critical" (a command in the CQRS point-of-view) will only have one handler. The things that will have multiple handlers are basically event subscribers and will be projections (or similar) and there should be no problem with having them running multiple times.

@srozesrozeforce-pushed theretry-amqp-messages branch 2 times, most recently fromca47435 todfdb38cCompareMay 9, 2018 08:27
@sroze
Copy link
ContributorAuthor

PR rebased.

@sroze
Copy link
ContributorAuthor

I suggest we go ahead with this simple (optional) retry implementation for 4.1 and replace it with the one in the AMQP component later when introduced.

ogizanagi
ogizanagi previously approved these changesMay 9, 2018
Copy link
Contributor

@ogizanagiogizanagi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Not much to say. Looks good to me.

}

$maximumAttempts =$retryConfiguration['attempts'] ??3;
$routingKey =str_replace('%attempt%',$attemptNumber,$retryConfiguration['routing_key_pattern'] ??'attempt_%attempt%');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Could we normalize all the options values before using it, in the constructor for instance?

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Yep, it's a good idea.

@kbond
Copy link
Member

kbond commentedMay 9, 2018
edited
Loading

@sroze another potential problem could be an exception being thrown at a point where retrying the message would cause additional problems.

What about an optional mechanism to have the subscriber tell the message bus/receiver to retry the message? This could also tell the bus to retry just this specific handler.

// in your handler/subscribertry {// ... some logic that throws an exception}catch (\Throwable$e) {thrownewRetryException(__CLASS__,$e);// or instead of __CLASS__ we can get the class from the exception's trace?}

This is probably a discussion for another place as it is a more general retry - I can open an issue if we want to discuss further.

@sroze
Copy link
ContributorAuthor

This is probably a discussion for another place as it is a more general retry - I can open an issue if we want to discuss further.

I'd say so. Though a very good point; I'd be great if you can open an issue, describe the issue and, cherry on top, if you have an idea on how to implement, propose something :)

@srozesrozeforce-pushed theretry-amqp-messages branch 2 times, most recently from624ae8d to4ae654bCompareMay 10, 2018 15:13
@sroze
Copy link
ContributorAuthor

I don't think that fabbot's PoV is relevant here :)

@sroze
Copy link
ContributorAuthor

Documentation PR is here:symfony/symfony-docs/pull/9756

private$amqpRetryExchange;

/**
* Available options:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

With so many options, it's IMO better to use a configuration class. Then the code self-explaining and more explicit.

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

But it would mean that we have something creating the configuration object from the DSNs. I'd say that keeping these options as they are now is good and that we don't need more code to maintain just for this configuration details.

Copy link
Contributor

@ogizanagiogizanagiMay 11, 2018
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Yes, I think it's fine like this (but not ideal, true). Otherwise we'll need something similar as theSecurityFactoryInterface in order to make it extendable to any transport with similar use-cases.

@srozesroze added this to thenext milestoneMay 21, 2018
Add some tests and remove the intrication between the receiver and the connectionConfigure each TTL individually and the dead routing keyThe `ttls` array is 0-indexedUpdate the retry based on feedback (`ttls` -> `ttl`, add options to documentation and normalises the default values)Catches failed retries and forward other messages' attributes
@srozesroze changed the base branch from4.1 tomasterAugust 12, 2018 17:32
@sroze
Copy link
ContributorAuthor

This pull-request is welcoming another round of review 💚

@Koc
Copy link
Contributor

Koc commentedAug 12, 2018

@sroze how can we programatically requeue message with specified delay from handler? For example for cases, when we cann't process this message now.

@sroze
Copy link
ContributorAuthor

can we programatically requeue message with specified delay from handler?

In the given implementation we can't. I believe that this is out of scope for now anyway and that it will be handled with the AMQP component (#27140) which will provide more features such as this one 👍

@fabpot
Copy link
Member

@sroze Can we talk about this one this week IRL?

@ragboyjr
Copy link
Contributor

@sroze do you think it would be possible to implement this in away so that it would standardize the retrying logic across the various transports?

@weaverryan
Copy link
Member

I've brought the AMQP retry logic from Sam and put it into#30557 - but also fitting it into a system where retrying has been made generic, so it can be implemented/included in future transports.

I would LOVE review on that - I'm running full speed to get the features Messenger needs/deserves, but I need queue experts to help me avoid pitfalls.

@lyrixx
Copy link
Member

IMHO this one could be closed in favor or#30557

@fabpot
Copy link
Member

Closing in favor of#30557

@fabpotfabpot closed thisMar 17, 2019
fabpot added a commit that referenced this pull requestMar 23, 2019
… (weaverryan)This PR was merged into the 4.3-dev branch.Discussion----------[Messenger] Worker events + global retry functionality| Q             | A| ------------- | ---| Branch?       | master| Bug fix?      | no| New feature?  | yes| BC breaks?    | yes, on Messenger only| Deprecations? | no| Tests pass?   | NEEDED| Fixed tickets |#29132,#27008,#27215 and part of#30540| License       | MIT| Doc PR        | TODOThis is an alternative to#29132 and#27008. There are several big things:1) The `messenger:consume` does not die if a handler has an error2) Events are dispatched before, after and on error a message being handled3) Logic is moved out of Amqp and into the Worker so that we can have some consistent features, like error handling.4) A generic retry system was added, which works with Amqp and future transports should support. It will work out of the box for users. Retrying works by putting the received `Envelope` back into the bus, but with the `ReceivedStamp` removed. The retry functionality has an integration test for AMQP.5) Added a new `MessageDecodingFailedException` that transport Serializers should throw if `decode()` fails. It allows us to reject a message in this situation, as allowing it to fail but remain on the queue causes it to be retried forever.6) A new `DelayStamp` was added, which is the first of (later) more stamps for configuring the transport layer (see#30558).BC breaks are documented in the CHANGELOG.Thanks!Commits-------a989384 Adding global retry support, events & more to messenger transport
fabpot added a commit that referenced this pull requestApr 6, 2019
…ndler (keulinho, sroze)This PR was merged into the 4.3-dev branch.Discussion----------[Messenger] Ensure message is handled only once per handlerAdd check to ensure that a message is only handled once per handlerAdd try...catch to run all handlers before throwing exception| Q             | A| ------------- | ---| Branch?       | master| Bug fix?      | no| New feature?  | yes| BC breaks?    | no| Deprecations? |no| Tests pass?   | yes| Fixed tickets |#27215| License       | MIT| Doc PR        | TodoThis would make error handling and retrying of messages much more easier. As statet  here#27008 (comment) there is currently no way to retry a for all failed handlers if there are mutliple handlers and just some throw an exception.Also if an Exception in an handler occurs the execution chain is disrupted and the other handlers are never invoked.With this change it is easily possible to create an userland middleware that catches the `ChainedHandlerFailedException` and does some custom retry logic. If you ensure that the `HandledStamps` on the `Envelope` are preserved the message will be handled just by the failed handlersCommits-------2e5e910 Rename exception, add change log and a few other thingse6e4cde Ensure message is handled only once per handler
@nicolas-grekasnicolas-grekas modified the milestones:next,4.3Apr 30, 2019
Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment

Reviewers

@javiereguiluzjaviereguiluzjaviereguiluz left review comments

@nicolas-grekasnicolas-grekasnicolas-grekas left review comments

@lyrixxlyrixxlyrixx left review comments

@dunglasdunglasAwaiting requested review from dunglas

@xabbuhxabbuhAwaiting requested review from xabbuh

+2 more reviewers

@TobionTobionTobion left review comments

@ogizanagiogizanagiogizanagi left review comments

Reviewers whose approvals may not affect merge requirements

Assignees

No one assigned

Projects

None yet

Milestone

4.3

Development

Successfully merging this pull request may close these issues.

12 participants

@sroze@kbond@nicolas-grekas@javiereguiluz@weaverryan@Koc@fabpot@ragboyjr@lyrixx@Tobion@ogizanagi@carsonbot

[8]ページ先頭

©2009-2025 Movatter.jp