[Messenger] allow processing messages in batches #43354
lyrixx commented Oct 6, 2021
Could you add a full example?
nicolas-grekas commented Oct 15, 2021 • edited
This PR is ready, tests included 🎉
fabpot commented Oct 19, 2021
@lyrixx I think you were very interested in this feature :)
chalasr commented Oct 19, 2021
This is missing a CHANGELOG entry.
nicolas-grekas commented Oct 19, 2021
CHANGELOG added.
chalasr left a comment
LGTM
yched commented Oct 19, 2021
I'm super interested in this feature as well, and the PR looks great. I'm just wondering whether it will allow for "grab whatever pending messages exist and process them in a batch (N max, but don't wait for more if there are fewer than N)"? Will the --time-limit worker option trigger a batch flush?
nicolas-grekas commented Oct 22, 2021
PR updated to inform handlers whether flushing is called while the worker is idle (in which case the handler can decide not to flush) or while the worker is stopping.
lyrixx left a comment
I left some comments, but I have not finished reviewing.
    $result = null !== $ack ? 0 : null;
    $this->jobs[] = [$this->schedule($message), $ack ?? $ack = new Acknowledger(get_debug_type($this))];
    if (null !== $result && !$this->shouldFlush()) {
        return \count($this->jobs);
    }
    $this->flush(true);
    return $result ?? $ack->getResult();
lyrixx Oct 23, 2021 • edited by nicolas-grekas
Something like this is more readable:
    if ($ack === null) {
        $ack = new Acknowledger(get_debug_type($this));
        $this->jobs[] = [$this->schedule($message), $ack];
        $this->flush(true);
        return $ack->getResult();
    }
    $this->jobs[] = [$this->schedule($message), $ack];
    if (!$this->shouldFlush()) {
        return \count($this->jobs);
    }
    $this->flush(true);
    return 0;
BTW, the code when `$ack === null` seems broken: you create a new `Acknowledger`, so the `$ack` argument in it is null, so it's an empty callable, so it never really acks on the transport?
I think this part can be simplified
nicolas-grekas Oct 25, 2021 • edited
Patch applied, thanks.
This logic is the answer to your question:
> as soon as I implement the interface, the handler is forced to be used in batch mode?
When `$ack` is `null`, `HandleMessageMiddleware` will not add the `NoAutoAckStamp`, and the worker will ack as usual. The batch handler is thus compatible with non-batch mode.
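To make the two modes concrete, here is a minimal sketch in plain PHP. `DualModeHandler` and its closure-based ack are hypothetical stand-ins, not the Symfony classes from this PR; the sketch only assumes the behaviour described in this thread: without `$ack` the message is handled synchronously, with `$ack` it is buffered until the batch is full.

```php
<?php

// Hypothetical stand-in for a batch handler; NOT the Symfony implementation.
final class DualModeHandler
{
    /** Buffered [message, ack] pairs waiting to be processed. */
    private array $jobs = [];
    /** Messages processed so far, exposed for inspection only. */
    public array $processed = [];
    private int $batchSize;

    public function __construct(int $batchSize = 3)
    {
        $this->batchSize = $batchSize;
    }

    /**
     * Without $ack: handle synchronously and return the result.
     * With $ack: buffer the message and return the number of pending jobs.
     */
    public function __invoke(string $message, ?callable $ack = null)
    {
        if (null === $ack) {
            // Non-batch mode: use a local ack that captures the result.
            $result = null;
            $this->jobs[] = [$message, static function ($r) use (&$result): void { $result = $r; }];
            $this->flush();

            return $result;
        }

        $this->jobs[] = [$message, $ack];
        if (\count($this->jobs) < $this->batchSize) {
            return \count($this->jobs);
        }
        $this->flush();

        return 0;
    }

    public function flush(): void
    {
        $jobs = $this->jobs;
        $this->jobs = [];
        foreach ($jobs as [$message, $ack]) {
            $this->processed[] = $message;
            $ack(strtoupper($message)); // the "result" is just the upper-cased message
        }
    }
}

$handler = new DualModeHandler(2);
$syncResult = $handler('a');               // no $ack: processed immediately

$acked = [];
$ack = static function ($r) use (&$acked): void { $acked[] = $r; };
$pendingAfterFirst = $handler('b', $ack);  // buffered
$pendingAfterSecond = $handler('c', $ack); // batch full: flushed
```

The same handler object serves both call styles, which is the swappability argument made in this thread: a caller that never passes `$ack` does not need to know that batching exists.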
lyrixx commented Oct 23, 2021
One question: as soon as I implement the interface, the handler is forced to be used in batch mode?
lyrixx left a comment
This is nice, but I left some comments on the public API.
Then, we could merge it, and I'll have to play with it on a real project to see how it behaves.
Thanks for the hard work.
    $ack = new Acknowledger(get_debug_type($this));
    $this->jobs[] = [$this->schedule($message), $ack];
    $this->flush(true);
I still want to simplify this PR and I think something could be done here.
This trait is named `BatchHandlerTrait`, so its purpose is to handle batches of messages. I understand that you want to preserve the contract in the `HandlerInterface`. But this very method must not allow passing `$ack = null`.
So basically, if one wants to use the handler in a batch way and also in a non-batch way (which is not possible BTW), it does not make sense to call `handle()` when `$ack === null` (in `$handler::__invoke()`).
So this trait must throw an exception when `$ack` is null. And it's up to the developer to check whether `$ack` is null or not, and if it's null they must not call this method.
Finally, all cases where `$ack === null` could be removed (here, and in `Acknowledger`).
- the code will be simpler, and easier to read / lean
- the contract of `HandlerInterface::__invoke($message)` (where `$ack = null`) is preserved
- I'm happy
And by doing so, we can rename `handle()` (which is very generic and not really intuitive) to `schedule()` (see my comment on the current `schedule` method).
I really like the following code / API:
    class Handler implements BatchHandlerInterface
    {
        public function __invoke(Message $message, $ack)
        {
            if ($ack) {
                return $this->schedule(new Foobar($message), $ack);
            }
            // sync processing
        }
    }
Don't you?
I don't agree with throwing. The handler should be usable in non-batch mode per LSP and throwing would break that. The current design makes it a no brainer: implement `process()` and no need to care about this concern. Your proposal would require ppl to write extra logic if they want to follow LSP. But LSP is not an option to me.
> I don't agree with throwing. The handler should be usable in non-batch mode per LSP and throwing would break that.
Yes, I totally agree with you on that point.
> The current design makes it a no brainer
I disagree with you. You may think it's easy, but people read code under the hood, and ATM the code is not simple.
> Your proposal would require ppl to write extra logic
Yes, one more line of code. Explicit is better than implicit.
> But LSP is not an option to me.
I don't understand why LSP may be broken. Can you elaborate?
nicolas-grekas Oct 28, 2021 • edited
The boilerplate to implement both modes is in the trait. That's how it should be. Not doing so means requiring every consumer of the trait to add the same boring logic. That'd defeat the purpose of the trait and, more critically, break LSP whenever ppl don't implement the "else" in your snippet. Since we know ppl are lazy, or make mistakes, this is a recipe for more bugs.
About LSP, the principle is not about PHP interfaces but about abstractions that should be swappable. If an implementation fails to implement non-batch mode, e.g. by throwing, then it's not swappable and LSP is broken.
nicolas-grekas commented Oct 28, 2021
@lyrixx thanks for the review, I should have addressed your comments.
fabpot commented Oct 30, 2021
Thank you @nicolas-grekas.
laticrete-pimcore commented May 12, 2025
How does nack'ing work with
This replaces #42873 as it proposes an alternative approach to handling messages in batch.
`BatchHandlerInterface` says it all: if a handler implements this interface, then it should expect a new `$ack` optional argument to be provided when `__invoke()` is called. When `$ack` is not provided, `__invoke()` is expected to handle the message synchronously as usual. But when `$ack` is provided, `__invoke()` is expected to buffer the message and its `$ack` function, and to return the number of pending messages in the batch.
Batch handlers are responsible for deciding when they flush their buffers, calling the `$ack` functions while doing so.
Best reviewed ignoring whitespaces.
Here is what a batch handler might look like:
The size of the batch is controlled by `BatchHandlerTrait::shouldFlush()` (defaults to 10).
The transport is acknowledged in batch, after the bus returned from dispatching (unlike what is done in #42873). This is especially important when considering transactions since we don't want to ack unless the transaction committed successfully.
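As an illustration of the batch-size hook, the sketch below shows how a handler could lower the threshold by overriding `shouldFlush()`. `SketchBatchTrait` and `SmallBatchHandler` are hypothetical stand-ins written for this example, not Symfony's `BatchHandlerTrait`; only the default of 10 and the idea of acknowledging after processing are taken from the description above.

```php
<?php

// Hypothetical stand-in for the buffering logic; NOT Symfony's BatchHandlerTrait.
trait SketchBatchTrait
{
    /** Buffered [message, ack] pairs. */
    private array $jobs = [];

    /** Buffers a job; returns the pending count, or 0 once the batch was flushed. */
    public function handle(string $message, callable $ack): int
    {
        $this->jobs[] = [$message, $ack];
        if (!$this->shouldFlush()) {
            return \count($this->jobs);
        }
        $this->flush();

        return 0;
    }

    public function flush(): void
    {
        $jobs = $this->jobs;
        $this->jobs = [];
        if ([] !== $jobs) {
            $this->process($jobs);
        }
    }

    // Default threshold, mirroring the default of 10 mentioned above.
    private function shouldFlush(): bool
    {
        return 10 <= \count($this->jobs);
    }

    abstract protected function process(array $jobs): void;
}

final class SmallBatchHandler
{
    use SketchBatchTrait;

    /** Messages whose work is done, exposed for inspection only. */
    public array $committed = [];

    // A method defined in the class takes precedence over the trait's version:
    // flush every 2 messages instead of every 10.
    private function shouldFlush(): bool
    {
        return 2 <= \count($this->jobs);
    }

    protected function process(array $jobs): void
    {
        // Do all the work first (think: commit a transaction) ...
        foreach ($jobs as [$message, $ack]) {
            $this->committed[] = $message;
        }
        // ... and only then acknowledge each message.
        foreach ($jobs as [$message, $ack]) {
            $ack($message);
        }
    }
}

$acked = [];
$ack = static function (string $m) use (&$acked): void { $acked[] = $m; };

$handler = new SmallBatchHandler();
$first = $handler->handle('m1', $ack);  // buffered, nothing acked yet
$ackedAfterFirst = $acked;
$second = $handler->handle('m2', $ack); // threshold reached: processed, then acked
```

Splitting `process()` into a work loop followed by an ack loop is one way to keep acknowledgements behind the point where the work is known to have succeeded.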
By default, pending batches are flushed when the worker is idle and when it stops.
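The idle/stop distinction could look roughly like the sketch below. `IdleAwareHandler` is a hypothetical stand-in, and the meaning given to the boolean `$force` argument (false while the worker is idle, true while it is stopping) is an assumption made for this example; the snippets in this conversation only show `flush(true)` being called.

```php
<?php

// Hypothetical stand-in; the semantics of $force are an assumption of this sketch.
final class IdleAwareHandler
{
    /** Messages waiting to be flushed. */
    private array $jobs = [];
    /** One entry per flushed batch, exposed for inspection only. */
    public array $flushed = [];
    private int $minIdleBatch;

    public function __construct(int $minIdleBatch = 2)
    {
        $this->minIdleBatch = $minIdleBatch;
    }

    public function buffer(string $message): int
    {
        $this->jobs[] = $message;

        return \count($this->jobs);
    }

    /**
     * $force = false: the worker is idle, so the handler may decide to keep waiting.
     * $force = true: the worker is stopping, so every pending job is flushed.
     */
    public function flush(bool $force): void
    {
        if ([] === $this->jobs) {
            return;
        }
        if (!$force && \count($this->jobs) < $this->minIdleBatch) {
            return; // idle flush skipped: not enough pending jobs yet
        }
        $this->flushed[] = $this->jobs;
        $this->jobs = [];
    }
}

$handler = new IdleAwareHandler(2);
$handler->buffer('m1');

$handler->flush(false);                 // idle with one pending job: skipped
$batchesAfterIdle = \count($handler->flushed);

$handler->flush(true);                  // stopping: flushed regardless of size
$batchesAfterStop = $handler->flushed;
```

A handler that always flushes on idle would simply ignore the threshold check, which corresponds to the default behaviour described above.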