Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork9.6k
[Messenger] implementation ofmessenger:consume
, which processes messages concurrently#53964
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
base:7.4
Are you sure you want to change the base?
Conversation
ea02a2e
to8cf4f65
CompareUh oh!
There was an error while loading.Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
quick review, I like it
src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
8b8e52c
to66200db
CompareUh oh!
There was an error while loading.Please reload this page.
c1d082a
to89b22b9
CompareUh oh!
There was an error while loading.Please reload this page.
src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
89b22b9
to885c278
CompareThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Nicely done! Just a few questions/comments.
src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
8fac51d
tof35b5ad
CompareUh oh!
There was an error while loading.Please reload this page.
src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php OutdatedShow resolvedHide resolved
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
e6b8863
to5d6bae6
Comparef976458
to52e9176
Compare52e9176
to522a316
Compare@@ -282,4 +329,37 @@ public function getMetadata(): WorkerMetadata | |||
{ | |||
return $this->metadata; | |||
} | |||
public function handleFutures(string $transportName, $parallelProcessLimit): void |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
$parallelProcessLimit
seems unused
IndraGunawan commentedOct 4, 2024 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
@IndraGunawan I'd love to, but I can't decide that on my own 😅 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
when conflicts will be resolved
It's relevant. BatchHandler will handle processing data synchronously. If I understood well, do you suggest it miss async capabilities? Somehow updating BatchHandler to make it async aware or declaring another BatchAsyncHandler trait could allow processing batch messaging with async mechanisms. |
Uh oh!
There was an error while loading.Please reload this page.
The purpose of this PR is to enable concurrent handling of Symfony Messenger messages. This PR utilizes the amphp/parallel library to achieve this. During the worker configuration, it is possible to define how many processes should be batched in parallel before flushing, meaning accessing the result, acknowledging, or initiating executions in case of errors.
In the child process, a container is cached. Therefore, in each child process, it will be possible to inject services and make requests, etc.
In this initial use case, the decision was made not to reuse the parent connection, such as the Doctrine connection. This is because it could currently be inconvenient for users, as they would need to modify their handlers, which could be cumbersome.
Even without reusing the parent connection, there is a performance gain since operations are performed concurrently for x number of processes. Therefore, even if there is a handler with blocking code, it does not prevent other child processes from proceeding.
for example, in the case we have 2 handlers and one of the handlers has a 4-second pause, and we have, for instance, 40 messages processed concurrently with a batch size of 10 using the ParallelBus, it's approximately 6 times faster between handling the first message and the last message (This observation was made during a test conducted in our development environment)
In order to dispatch messages concurrently, it is necessary to first consider the ParallelMessageBus:
! It works for async mode - It doesn't work with BatchHandler Trait !
Async mode:
You can specify how many processes will be executed in parallel.
It will define how many processes will be launched - threaded to then complete them one by one and get the return of the message processing
By default: 10
TODO:
This feature is being developed by@coopTilleuls and@TradersPost and it has been designed with@jwage and@dunglas