Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

[Messenger] Worker events + global retry functionality#30557

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
fabpot merged 1 commit intosymfony:masterfromweaverryan:worker-events
Mar 23, 2019

Conversation

@weaverryan
Copy link
Member

@weaverryanweaverryan commentedMar 13, 2019
edited
Loading

QA
Branch?master
Bug fix?no
New feature?yes
BC breaks?yes, on Messenger only
Deprecations?no
Tests pass?NEEDED
Fixed tickets#29132,#27008,#27215 and part of#30540
LicenseMIT
Doc PRTODO

This is an alternative to#29132 and#27008. There are several big things:

  1. Themessenger:consume does not die if a handler has an error
  2. Events are dispatched before, after and on error a message being handled
  3. Logic is moved out of Amqp and into the Worker so that we can have some consistent features, like error handling.
  4. A generic retry system was added, which works with Amqp and future transports should support.
    It will work out of the box for users. Retrying works by putting the receivedEnvelope back into the bus, but with theReceivedStamp removed. The retry functionality has an integration test for AMQP.
  5. Added a newMessageDecodingFailedException that transport Serializers should throw ifdecode() fails. It allows us to reject a message in this situation, as allowing it to fail but remain on the queue causes it to be retried forever.
  6. A newDelayStamp was added, which is the first of (later) more stamps for configuring the transport layer (see[Messenger][RFC] Generic TransportConfig stamp #30558).

BC breaks are documented in the CHANGELOG.

Thanks!

ro0NL, dunglas, yceruto, vincenttouzet, Koc, ip512, OskarStark, soyuka, and wissem reacted with thumbs up emojisroze, Nyholm, ogizanagi, soyuka, and bigfoot90 reacted with heart emoji
@weaverryanweaverryan mentioned this pull requestMar 13, 2019
36 tasks
@nicolas-grekasnicolas-grekas added this to thenext milestoneMar 14, 2019
@weaverryanweaverryan changed the title[WIP] Events for Messenger worker and not failing[WIP][Messenger] Worker events + global retry functionalityMar 15, 2019
@weaverryan
Copy link
MemberAuthor

I've now incorporated generic retry abilities from#27008. I think more details need to be considered, like#30558 (transport stamp) and how it will affect things.

But, generally speaking, what big issues do people see? I'm pushing much more control into theWorker, and making the transport a bit more agnostic to sending/receiving/acking/nacking/retrying.

*/
class Connection
{
publicconstATTEMPT_COUNT_HEADER_NAME ='symfony-messenger-attempts';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

AMQP usesx-death when a message is dead-letter-ed. Maybe we can reuse the same? (not sure if it conflicts though, would need to try).

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Hmm. I don't know much about this, but I don't think it'll work. Each time we retry, we're re-using the headers from the existing message. This has a nice effect that thex-death header (which is an array) will have 0 items, then 1 item, then 2 items, etc - it'll increase with each "death". The "newest" death apparently (i've just tested) always becomes the0 key - the others are "pushed back". If we try to insert an entry intox-death, it just looks like there was a "previous" death, and Rabbit pushes a new item onto the 0 index and our entry is pushed back to 0.

Itseems like we shouldn't be setting values ontox-death.

sroze reacted with thumbs up emoji
@weaverryan
Copy link
MemberAuthor

weaverryan commentedMar 18, 2019
edited
Loading

Thank you@sroze for the review! I was working concurrently with your review, so I pushed a bunch of changes as you were commenting. I would love another look. I've replied to a few of your comments with the changes I've made. Specifically, theQueuedMetadata thing is now gone, replaced with a Stamp. Also (and this is important), I've given theAmqpReceiver some temporary state. The issue is that theAMQPEnvelope is needed to do things likeack,nack, etc. And because these methods are now called byWorker, I previously was passing this "message" back out to the Worker... which was unfortunate, because this is really an internal detail to each transport. I fixed that by tracking the currentAMQPEnvelope being handled insideAmqpReceiver, which allows us to drop passing theAMQPEnvelope object around.

I also made the retry stuff configurable on a transport-by-transport basis using a service.

Also, question: should we ever redeliver vianack() onto the same queue? Like, if, for some reason, redelivery onto the DLX fails, should wenack() retry? Or is that just madness - trying to gracefully fail when something has gone terribly wrong.

Thanks!

*
* @throws TransportException If there is an issue communicating with the transport
*/
publicfunctionretryCurrentMessage(int$delay):void;
Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

These all rely on being called from inside thereceive() loop, because that keeps temporary "state" about which message is currently being handled. That's a key change here, which simplifies a lot, but which I want to make sure won't cause issues.

@weaverryan
Copy link
MemberAuthor

This is ready for review! This represents a big change in how we handle the transports, so I really appreciate review!

$this->dispatchEvent(
WorkerMessageFailedEvent::class,
newWorkerMessageFailedEvent($envelope,$this->receiverName,$throwable,$shouldRequeue)
);
Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

The events are currently dispatched before ack/reject/retry on the queue. That's subjective, and either order could, in theory, cause a situation where one fails and so the other doesn't run (e.g. some listener throws an exception, so the retry never happens, or, if we reverse, the retry fails due to a network connection, then the event is never dispatched). Not sure if we need to be thinking about this level of failure. Catching exceptions makes things harder to debug/know about when they go wrong.

thrownewRuntimeException(sprintf('Bus "%s" does not exist.',$busName));
}

if (!$this->retryStrategyLocator->has($receiverName)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Shouldn't we decorate the receiver instead of having this logic in the command & worker then? 🤔

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

We probably could, butshould we? The code reads really clearly insideWorker, and my thought is sort of that we're setting out the "core" logic that (unless you really want to) everyone gets. At this point, it includes a lot - event dispatching, retry logic (and a stamp being added for this).

So I guess I would say: someone needs to sell hard on the idea of making it a decorator.

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Or, to say it differently:

  • Do wereally need this?
  • If we do want it, could we only move some parts out (e.g. logging decorator, or event listener, event decorator) and leave others

$AMQPEnvelope =$this->connection->get();
if (null ===$AMQPEnvelope) {
try {
$this->currentAmqpEnvelope =$this->connection->get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I'm not fond of the idea of forcing the receivers to have such local state at all. Can't you have another stamp (non-serialisable), AMQP-specific, that contains this\AMQPMessage? You'd call itAmqpStamp and that's it, you can just get it from theEnvelope, no need of these*currentMessage methods anymore, just "normal" methods taking theEnvelope as an argument.

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Indeed, I've gone back-and-forth on this. Your transport-specific stamp is a nice idea. Itwould leak this information out to userland, like middleware or event listeners. What do you think about that?

Also, in the latest commit, on retry, I re-send the Envelope for normal encoding/decoding. This means (and is by design) that if a new Envelope was created with new stamps, those stamps will be serialized & sent. It would mean that this AMQP-specific stampwould be serialized & sent. You mentioned "non-serialisable"... do you basically mean: give it a "sleep" method so that if/when we serialize it, it'll just be an empty object (i.e. it won't cause an error).

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Side note: if we did this and made methods likereject() require theEnvelope, we wouldn't be able to handle theMessageDecodingFailedException in theWorker - catching that and rejecting would need to remain the responsibility of each transport, because there wouldn't be anyEnvelope that theWorker could send back to theReceiver::reject() method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

It would leak this information out to userland, like middleware or event listeners. What do you think about that?

That's a good thing IMHO. This allows users to go deep into customising it.

This means (and is by design) that if a new Envelope was created with new stamps, those stamps will be serialized & sent.

Indeed, hence my (non-serializable) comment. We need a way to make them non-transportable, it's definitely a valid use-case. Thesleep method works but is only about theserialize method... I'd imagine aNonSerializableStampInterface actually.

if we did this and made methods like reject() require the Envelope, we wouldn't be able to handle the MessageDecodingFailedException in the Worker

Technically speaking we could actually create an empty envelope as part of the exception. It would make sense. But for the sake of this pull-request, let's keep it the responsibility of the transport :)

weaverryan reacted with thumbs up emoji
Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I've made the change to use a stamp (AmqpReceivedStamp) and it's really, really nice - good suggestion. However, I also found out that serializing that stamp works just fine. TheAMQPEnvelope inside it is just a simple object that doesn't cause any problems.

So, should we still add thisNonSerializableStampInterface? Serializing theAMQPEnvelope doesn't cause any problems, as we're always looking for$envelope->last(AMQPEnvelop::class). Also, to implement this, each serializer would actually need to create anew envelope with theNonSerializableStampInterface filtered out. Totally doable, but is this something we actually need to add?

@weaverryan
Copy link
MemberAuthor

One last update: "retrying" is no longer a special situation - we basically just callSenderInterface::send() with whatever the delay is, then "ack" the old message. That isbasically what we were doing before, but the difference is that we're now re-serializing the Envelope on re-send, instead of sending a duplicate of the original message. That allows us to add stamps to the Envelope, which isreally powerful because we can, for example, manage theRetryCountStamp in a way where the transport doesn't need to doanything with tracking how many retries have happened.

@weaverryan
Copy link
MemberAuthor

Ready to go again!

Last commits guarantee that redeliveries are only sent back to the same transport.

@Nyholm
Copy link
Member

There are still quite a few mentions of "queue" when we really mean "transport". I think we should try to be more technically correct.

Copy link
Contributor

@srozesroze left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Fabulous. Last thing is the event name I think.

@fabpot can you update your review? 🙏

@weaverryan
Copy link
MemberAuthor

Last changes look good to me. Thank you!

@fabpot
Copy link
Member

@weaverryan Can you squash so that I can merge? Thank you.

Co-authored-by: Samuel ROZE <samuel.roze@gmail.com>
@weaverryan
Copy link
MemberAuthor

Squashed!

@fabpot
Copy link
Member

Thank you@weaverryan.

@fabpotfabpot merged commita989384 intosymfony:masterMar 23, 2019
fabpot added a commit that referenced this pull requestMar 23, 2019
… (weaverryan)This PR was merged into the 4.3-dev branch.Discussion----------[Messenger] Worker events + global retry functionality| Q             | A| ------------- | ---| Branch?       | master| Bug fix?      | no| New feature?  | yes| BC breaks?    | yes, on Messenger only| Deprecations? | no| Tests pass?   | NEEDED| Fixed tickets |#29132,#27008,#27215 and part of#30540| License       | MIT| Doc PR        | TODOThis is an alternative to#29132 and#27008. There are several big things:1) The `messenger:consume` does not die if a handler has an error2) Events are dispatched before, after and on error a message being handled3) Logic is moved out of Amqp and into the Worker so that we can have some consistent features, like error handling.4) A generic retry system was added, which works with Amqp and future transports should support. It will work out of the box for users. Retrying works by putting the received `Envelope` back into the bus, but with the `ReceivedStamp` removed. The retry functionality has an integration test for AMQP.5) Added a new `MessageDecodingFailedException` that transport Serializers should throw if `decode()` fails. It allows us to reject a message in this situation, as allowing it to fail but remain on the queue causes it to be retried forever.6) A new `DelayStamp` was added, which is the first of (later) more stamps for configuring the transport layer (see#30558).BC breaks are documented in the CHANGELOG.Thanks!Commits-------a989384 Adding global retry support, events & more to messenger transport
@weaverryanweaverryan deleted the worker-events branchMarch 23, 2019 14:02
sroze added a commit that referenced this pull requestMar 23, 2019
…nnect() does not work (sroze)This PR was merged into the 4.3-dev branch.Discussion----------[Messenger] Ensure an exception is thrown when the AMQP connect() does not work| Q             | A| ------------- | ---| Branch?       | master| Bug fix?      | yes| New feature?  | no| BC breaks?    | no| Deprecations? | no| Tests pass?   | yes| Fixed tickets |#30557| License       | MIT| Doc PR        | øThis `connectionCredentials` instance escaped the renaming in#30557.Commits-------46b9476 Ensure an exception is thrown when the AMQP connect() does not work
fabpot added a commit that referenced this pull requestMar 23, 2019
…d (weaverryan)This PR was merged into the 4.3-dev branch.Discussion----------Dispatching two events when a message is sent & handled| Q             | A| ------------- | ---| Branch?       | master| Bug fix?      | no| New feature?  | yes| BC breaks?    | no| Deprecations? | no| Tests pass?   | yes| Fixed tickets | none| License       | MIT| Doc PR        | TODOAlternative to#30646. This uses a more generic system, so you could do anything when a message is sent. The main use-case is when a message is dispatched by a 3rd party.I didn't try to add *exhaustive* events everywhere: I added an event for a very specific use-case:When a message is dispatched by a 3rd party, being able to add stamps (e.g. `DelayStamp` or a future `AmqpRoutingKeyStamp` before the message is sent. Example:```phpclass MailerMessageSendToTransportEventSubscriber implements EventSubscriberInterface{    public function onSendMessage(SendMessageToTransportsEvent $event)    {        $envelope = $event->getEnvelope();        if (!$envelope->getMessage() instanceof SomeMailerMessage) {            return;        }        $event->setEnvelope($envelope->with(new AmpqRoutingKeyStamp('mailer-route')));    }    public static function getSubscribedEvents()    {        return [SendMessageToTransportsEvent::class => 'onSendMessage'];    }}```Along with#30557, we will now have the following events, regarding async messages:* Event when a message is sent to transports (this PR)* Event when a message is received from transport, but before handling it* Event when a message is received from transport and after handling itCommits-------a7ad1b4 Dispatching two events when a message is sent & handled
sroze added a commit that referenced this pull requestMar 31, 2019
This PR was merged into the 4.3-dev branch.Discussion----------[Messenger] Add a Doctrine transport| Q             | A| ------------- | ---| Branch?       | master| Bug fix?      | no| New feature?  | yes| BC breaks?    | no| Deprecations? | no| Tests pass?   | yes| Fixed tickets || License       | MIT| Doc PR        |symfony/symfony-docs#10616| DoctrineBundle PR |doctrine/DoctrineBundle#868As discussed with@sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component.Actually `AMQP` is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database).# How it worksThe code is splitted betwwen this PR and the one on the DoctrineBundle :doctrine/DoctrineBundle#868## ConfigurationTo configure a Doctrine transport the dsn MUST have the format `doctrine://<entity_manager_name>` where `<entity_manager_name>` is the name of the entity manager (usually `default`)```yml        # config/packages/messenger.yaml        framework:            messenger:                transports:                    my_transport: "doctrine://default?queue=important"```## Table schemaDispatched messages are stored into a database table with the following schema:| Column       | Type     | Options                  | Description                                                       ||--------------|----------|--------------------------|-------------------------------------------------------------------|| id           | bigint   | AUTO_INCREMENT, NOT NULL | Primary key                                                       || body         | text     | NOT NULL                 | Body of the message                                               || headers      | text     | NOT NULL                 | Headers of the message                                            || queue      | varchar(32)     | NOT NULL                 | Headers of the message                                            || created_at   | datetime | NOT NULL                 | When the message was inserted onto the table. (automatically set) || available_at       | datetime   | NOT NULL                 | When the message is available to be handled                      || delivered_at | datetime | NULL                     | When the message was delivered to a worker                        |## Message dispatchingWhen dispatching a message a new row is inserted into the table. See `Symfony\Component\Messenger\Transport\Doctrine::publish`## Message consumingThe message is retrieved by the `Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver`. It calls the `Symfony\Component\Messenger\Transport\Doctrine::get` method to get the next message to handle.### Getting the next message* Start a transaction* Lock the table to get the first message to handle (The lock is done with the `SELECT ... FOR UPDATE` query)* Update the message in database to update the delivered_at columns* Commit the transaction### Handling the messageThe retrieved message is then passed to the handler. If the message is correctly handled the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::ack` which delete the message from the table.If an error occured the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::nack` method which update the message to set the delivered_at column to `null`.## Message requeueingIt may happen that a message is stuck in `delivered` state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages the `DoctrineReceiver` call the `Symfony\Component\Messenger\Transport\Doctrine::requeueMessages`. This method update all the message with a  `delivered_at` not null since more than the "redeliver timeout" (default to 3600 seconds)# TODO- [x] Add tests- [x] Create DOC PR- [x] PR on doctrine-bundle for transport factory- [x] Add a `available_at` column- [x] Add a `queue` column- [x] Implement the retry functionnality : See#30557- [x] Rebase after#29476Commits-------88d008c [Messenger] Add a Doctrine transport
@nicolas-grekasnicolas-grekas modified the milestones:next,4.3Apr 30, 2019
@fabpotfabpot mentioned this pull requestMay 9, 2019
Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment

Reviewers

@javiereguiluzjaviereguiluzjaviereguiluz left review comments

@chalasrchalasrchalasr left review comments

@fabpotfabpotfabpot approved these changes

@nicolas-grekasnicolas-grekasnicolas-grekas approved these changes

@NyholmNyholmNyholm approved these changes

+2 more reviewers

@srozesrozesroze requested changes

@GuikingoneGuikingoneGuikingone left review comments

Reviewers whose approvals may not affect merge requirements

Assignees

No one assigned

Projects

None yet

Milestone

4.3

Development

Successfully merging this pull request may close these issues.

9 participants

@weaverryan@javiereguiluz@fabpot@Nyholm@nicolas-grekas@sroze@chalasr@Guikingone@carsonbot

[8]ページ先頭

©2009-2025 Movatter.jp