Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork9.7k
[Messenger] Worker events + global retry functionality#30557
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Conversation
weaverryan commentedMar 15, 2019
I've now incorporated generic retry abilities from#27008. I think more details need to be considered, like#30558 (transport stamp) and how it will affect things. But, generally speaking, what big issues do people see? I'm pushing much more control into the |
src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
| */ | ||
| class Connection | ||
| { | ||
| publicconstATTEMPT_COUNT_HEADER_NAME ='symfony-messenger-attempts'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
AMQP usesx-death when a message is dead-letter-ed. Maybe we can reuse the same? (not sure if it conflicts though, would need to try).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Hmm. I don't know much about this, but I don't think it'll work. Each time we retry, we're re-using the headers from the existing message. This has a nice effect that thex-death header (which is an array) will have 0 items, then 1 item, then 2 items, etc - it'll increase with each "death". The "newest" death apparently (i've just tested) always becomes the0 key - the others are "pushed back". If we try to insert an entry intox-death, it just looks like there was a "previous" death, and Rabbit pushes a new item onto the 0 index and our entry is pushed back to 0.
Itseems like we shouldn't be setting values ontox-death.
src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
weaverryan commentedMar 18, 2019 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
Thank you@sroze for the review! I was working concurrently with your review, so I pushed a bunch of changes as you were commenting. I would love another look. I've replied to a few of your comments with the changes I've made. Specifically, the I also made the retry stuff configurable on a transport-by-transport basis using a service. Also, question: should we ever redeliver via Thanks! |
| * | ||
| * @throws TransportException If there is an issue communicating with the transport | ||
| */ | ||
| publicfunctionretryCurrentMessage(int$delay):void; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
These all rely on being called from inside thereceive() loop, because that keeps temporary "state" about which message is currently being handled. That's a key change here, which simplifies a lot, but which I want to make sure won't cause issues.
weaverryan commentedMar 18, 2019
This is ready for review! This represents a big change in how we handle the transports, so I really appreciate review! |
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
| $this->dispatchEvent( | ||
| WorkerMessageFailedEvent::class, | ||
| newWorkerMessageFailedEvent($envelope,$this->receiverName,$throwable,$shouldRequeue) | ||
| ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
The events are currently dispatched before ack/reject/retry on the queue. That's subjective, and either order could, in theory, cause a situation where one fails and so the other doesn't run (e.g. some listener throws an exception, so the retry never happens, or, if we reverse, the retry fails due to a network connection, then the event is never dispatched). Not sure if we need to be thinking about this level of failure. Catching exceptions makes things harder to debug/know about when they go wrong.
src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
| thrownewRuntimeException(sprintf('Bus "%s" does not exist.',$busName)); | ||
| } | ||
| if (!$this->retryStrategyLocator->has($receiverName)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Shouldn't we decorate the receiver instead of having this logic in the command & worker then? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
We probably could, butshould we? The code reads really clearly insideWorker, and my thought is sort of that we're setting out the "core" logic that (unless you really want to) everyone gets. At this point, it includes a lot - event dispatching, retry logic (and a stamp being added for this).
So I guess I would say: someone needs to sell hard on the idea of making it a decorator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Or, to say it differently:
- Do wereally need this?
- If we do want it, could we only move some parts out (e.g. logging decorator, or event listener, event decorator) and leave others
Uh oh!
There was an error while loading.Please reload this page.
| $AMQPEnvelope =$this->connection->get(); | ||
| if (null ===$AMQPEnvelope) { | ||
| try { | ||
| $this->currentAmqpEnvelope =$this->connection->get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I'm not fond of the idea of forcing the receivers to have such local state at all. Can't you have another stamp (non-serialisable), AMQP-specific, that contains this\AMQPMessage? You'd call itAmqpStamp and that's it, you can just get it from theEnvelope, no need of these*currentMessage methods anymore, just "normal" methods taking theEnvelope as an argument.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Indeed, I've gone back-and-forth on this. Your transport-specific stamp is a nice idea. Itwould leak this information out to userland, like middleware or event listeners. What do you think about that?
Also, in the latest commit, on retry, I re-send the Envelope for normal encoding/decoding. This means (and is by design) that if a new Envelope was created with new stamps, those stamps will be serialized & sent. It would mean that this AMQP-specific stampwould be serialized & sent. You mentioned "non-serialisable"... do you basically mean: give it a "sleep" method so that if/when we serialize it, it'll just be an empty object (i.e. it won't cause an error).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Side note: if we did this and made methods likereject() require theEnvelope, we wouldn't be able to handle theMessageDecodingFailedException in theWorker - catching that and rejecting would need to remain the responsibility of each transport, because there wouldn't be anyEnvelope that theWorker could send back to theReceiver::reject() method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
It would leak this information out to userland, like middleware or event listeners. What do you think about that?
That's a good thing IMHO. This allows users to go deep into customising it.
This means (and is by design) that if a new Envelope was created with new stamps, those stamps will be serialized & sent.
Indeed, hence my (non-serializable) comment. We need a way to make them non-transportable, it's definitely a valid use-case. Thesleep method works but is only about theserialize method... I'd imagine aNonSerializableStampInterface actually.
if we did this and made methods like reject() require the Envelope, we wouldn't be able to handle the MessageDecodingFailedException in the Worker
Technically speaking we could actually create an empty envelope as part of the exception. It would make sense. But for the sake of this pull-request, let's keep it the responsibility of the transport :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I've made the change to use a stamp (AmqpReceivedStamp) and it's really, really nice - good suggestion. However, I also found out that serializing that stamp works just fine. TheAMQPEnvelope inside it is just a simple object that doesn't cause any problems.
So, should we still add thisNonSerializableStampInterface? Serializing theAMQPEnvelope doesn't cause any problems, as we're always looking for$envelope->last(AMQPEnvelop::class). Also, to implement this, each serializer would actually need to create anew envelope with theNonSerializableStampInterface filtered out. Totally doable, but is this something we actually need to add?
src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
weaverryan commentedMar 19, 2019
One last update: "retrying" is no longer a special situation - we basically just call |
weaverryan commentedMar 22, 2019
Ready to go again! Last commits guarantee that redeliveries are only sent back to the same transport. |
Nyholm commentedMar 22, 2019
There are still quite a few mentions of "queue" when we really mean "transport". I think we should try to be more technically correct. |
sroze left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Fabulous. Last thing is the event name I think.
@fabpot can you update your review? 🙏
src/Symfony/Component/Messenger/Event/WorkerMessageHandlingEvent.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
weaverryan commentedMar 23, 2019
Last changes look good to me. Thank you! |
fabpot commentedMar 23, 2019
@weaverryan Can you squash so that I can merge? Thank you. |
Co-authored-by: Samuel ROZE <samuel.roze@gmail.com>
weaverryan commentedMar 23, 2019
Squashed! |
fabpot commentedMar 23, 2019
Thank you@weaverryan. |
… (weaverryan)This PR was merged into the 4.3-dev branch.Discussion----------[Messenger] Worker events + global retry functionality| Q | A| ------------- | ---| Branch? | master| Bug fix? | no| New feature? | yes| BC breaks? | yes, on Messenger only| Deprecations? | no| Tests pass? | NEEDED| Fixed tickets |#29132,#27008,#27215 and part of#30540| License | MIT| Doc PR | TODOThis is an alternative to#29132 and#27008. There are several big things:1) The `messenger:consume` does not die if a handler has an error2) Events are dispatched before, after and on error a message being handled3) Logic is moved out of Amqp and into the Worker so that we can have some consistent features, like error handling.4) A generic retry system was added, which works with Amqp and future transports should support. It will work out of the box for users. Retrying works by putting the received `Envelope` back into the bus, but with the `ReceivedStamp` removed. The retry functionality has an integration test for AMQP.5) Added a new `MessageDecodingFailedException` that transport Serializers should throw if `decode()` fails. It allows us to reject a message in this situation, as allowing it to fail but remain on the queue causes it to be retried forever.6) A new `DelayStamp` was added, which is the first of (later) more stamps for configuring the transport layer (see#30558).BC breaks are documented in the CHANGELOG.Thanks!Commits-------a989384 Adding global retry support, events & more to messenger transport
…nnect() does not work (sroze)This PR was merged into the 4.3-dev branch.Discussion----------[Messenger] Ensure an exception is thrown when the AMQP connect() does not work| Q | A| ------------- | ---| Branch? | master| Bug fix? | yes| New feature? | no| BC breaks? | no| Deprecations? | no| Tests pass? | yes| Fixed tickets |#30557| License | MIT| Doc PR | øThis `connectionCredentials` instance escaped the renaming in#30557.Commits-------46b9476 Ensure an exception is thrown when the AMQP connect() does not work
…d (weaverryan)This PR was merged into the 4.3-dev branch.Discussion----------Dispatching two events when a message is sent & handled| Q | A| ------------- | ---| Branch? | master| Bug fix? | no| New feature? | yes| BC breaks? | no| Deprecations? | no| Tests pass? | yes| Fixed tickets | none| License | MIT| Doc PR | TODOAlternative to#30646. This uses a more generic system, so you could do anything when a message is sent. The main use-case is when a message is dispatched by a 3rd party.I didn't try to add *exhaustive* events everywhere: I added an event for a very specific use-case:When a message is dispatched by a 3rd party, being able to add stamps (e.g. `DelayStamp` or a future `AmqpRoutingKeyStamp` before the message is sent. Example:```phpclass MailerMessageSendToTransportEventSubscriber implements EventSubscriberInterface{ public function onSendMessage(SendMessageToTransportsEvent $event) { $envelope = $event->getEnvelope(); if (!$envelope->getMessage() instanceof SomeMailerMessage) { return; } $event->setEnvelope($envelope->with(new AmpqRoutingKeyStamp('mailer-route'))); } public static function getSubscribedEvents() { return [SendMessageToTransportsEvent::class => 'onSendMessage']; }}```Along with#30557, we will now have the following events, regarding async messages:* Event when a message is sent to transports (this PR)* Event when a message is received from transport, but before handling it* Event when a message is received from transport and after handling itCommits-------a7ad1b4 Dispatching two events when a message is sent & handled
This PR was merged into the 4.3-dev branch.Discussion----------[Messenger] Add a Doctrine transport| Q | A| ------------- | ---| Branch? | master| Bug fix? | no| New feature? | yes| BC breaks? | no| Deprecations? | no| Tests pass? | yes| Fixed tickets || License | MIT| Doc PR |symfony/symfony-docs#10616| DoctrineBundle PR |doctrine/DoctrineBundle#868As discussed with@sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component.Actually `AMQP` is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database).# How it worksThe code is splitted betwwen this PR and the one on the DoctrineBundle :doctrine/DoctrineBundle#868## ConfigurationTo configure a Doctrine transport the dsn MUST have the format `doctrine://<entity_manager_name>` where `<entity_manager_name>` is the name of the entity manager (usually `default`)```yml # config/packages/messenger.yaml framework: messenger: transports: my_transport: "doctrine://default?queue=important"```## Table schemaDispatched messages are stored into a database table with the following schema:| Column | Type | Options | Description ||--------------|----------|--------------------------|-------------------------------------------------------------------|| id | bigint | AUTO_INCREMENT, NOT NULL | Primary key || body | text | NOT NULL | Body of the message || headers | text | NOT NULL | Headers of the message || queue | varchar(32) | NOT NULL | Headers of the message || created_at | datetime | NOT NULL | When the message was inserted onto the table. (automatically set) || available_at | datetime | NOT NULL | When the message is available to be handled || delivered_at | datetime | NULL | When the message was delivered to a worker |## Message dispatchingWhen dispatching a message a new row is inserted into the table. See `Symfony\Component\Messenger\Transport\Doctrine::publish`## Message consumingThe message is retrieved by the `Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver`. It calls the `Symfony\Component\Messenger\Transport\Doctrine::get` method to get the next message to handle.### Getting the next message* Start a transaction* Lock the table to get the first message to handle (The lock is done with the `SELECT ... FOR UPDATE` query)* Update the message in database to update the delivered_at columns* Commit the transaction### Handling the messageThe retrieved message is then passed to the handler. If the message is correctly handled the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::ack` which delete the message from the table.If an error occured the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::nack` method which update the message to set the delivered_at column to `null`.## Message requeueingIt may happen that a message is stuck in `delivered` state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages the `DoctrineReceiver` call the `Symfony\Component\Messenger\Transport\Doctrine::requeueMessages`. This method update all the message with a `delivered_at` not null since more than the "redeliver timeout" (default to 3600 seconds)# TODO- [x] Add tests- [x] Create DOC PR- [x] PR on doctrine-bundle for transport factory- [x] Add a `available_at` column- [x] Add a `queue` column- [x] Implement the retry functionnality : See#30557- [x] Rebase after#29476Commits-------88d008c [Messenger] Add a Doctrine transport
Uh oh!
There was an error while loading.Please reload this page.
This is an alternative to#29132 and#27008. There are several big things:
messenger:consumedoes not die if a handler has an errorIt will work out of the box for users. Retrying works by putting the received
Envelopeback into the bus, but with theReceivedStampremoved. The retry functionality has an integration test for AMQP.MessageDecodingFailedExceptionthat transport Serializers should throw ifdecode()fails. It allows us to reject a message in this situation, as allowing it to fail but remain on the queue causes it to be retried forever.DelayStampwas added, which is the first of (later) more stamps for configuring the transport layer (see[Messenger][RFC] Generic TransportConfig stamp #30558).BC breaks are documented in the CHANGELOG.
Thanks!