Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

[WIP][Messenger] Multiple queue support + prioritization#30699

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Closed

Conversation

@weaverryan
Copy link
Member

QA
Branch?master
Bug fix?no
New feature?yes
BC breaks?WIP
Deprecations?no
Tests pass?not yet
Fixed ticketsPart of#30558
LicenseMIT
Doc PRTODO

Messenger is currently missing an idea of "prioritizing" messages on the queue. Inspired by Laravel, this adds the ability to add different messages to different "queues" and then process those queues in order.

First, put your messages into whatever "queues" you want, which sort of act like "categories":

$bus->dispatch(newEnvelope($message),newQueueNameStamp('low')));$bus->dispatch(newEnvelope($message),newQueueNameStamp('high')));

And when consuming, say which queues you want to consume and in which order:

./bin/console messenger:consume -vv --queues=high,low

All messages will be processed fromhigh before going tolow. I chose this approach, versus just putting everything in one queue with a priority (also possible) because it allows you to create multiple workers, each which work on only 1 (or several) queues. For example, you could have 5 workers working on thehigh queue and one working onlow.

TODO:

  • Tests
  • Implementation in Worker & Receiver is a mess

vudaltsov and yceruto reacted with thumbs up emoji
/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class QueueNameStamp implements StampInterface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

What about defining it asfinal? 🤔

@nicolas-grekas
Copy link
Member

nicolas-grekas commentedMar 26, 2019
edited
Loading

Does the definition of the "queue" belong to the place that calls dispatch (ie user code)
or does it belong to the routing layer?
Shouldn't this be handled by using virtual transports instead? Why "queue" btw?

@weaverryan
Copy link
MemberAuthor

Does the definition of the "queue" belong to the place that calls dispatch (ie user code)
or does it belong to the routing layer?

That's a fair question. The message->transport routing is a natural fit for config, because you're using keys that are only defined in config - e.g. "send to theamqp transport -amqp is defined in config, so it fits nicely to "route" itfrom config (it would look a little funny if you were to mention this string in a controller or service). But, with queues, these aren't defined anywhere - you can just "invent them" at the time you dispatch - and whatever transport the message goes to will handle that. But, I'm not absolutely convinced one way or another.

Shouldn't this be handled by using virtual transports instead?

This is certainly a possibility - a "virtual" transport that basically "adds the queue name metadata to the message and forwards to the real transport".

Overall, it's a matter of:
A) Configuration at dispatch time via stamp
vs
B) Configuration in config via routing

The first gives nice, clear control per message. But it's harder to apply a stamp automatically to all classes that implement some interface or to messages dispatched by a 3rd party (you'd need an event listener). The second would require a new concept (virtual transports) and puts more config into YAML... which I don't love ;).

Why "queue" btw?

I'm not an AMQP expert, but AMQP has queues, and that's literally what you're doing here: putting each message into a different AMQP queue, then consuming on a queue-by-queue basis. For DoctrineTransport, it's actually just a column on the table (queue). But using the "standard" name from AMQP as the standard name/concept I think makes sense.

@nicolas-grekas
Copy link
Member

The second would require a new concept (virtual transports)

This PR is already a new concept - the stamp + the "queue" concept.
A virtual transport is less of a new concept as its ... a transport :)
But the real question is: where belongs the responsibility of deciding the queue/transport?
I don't think the dispatch point is the correct place - exactly same as deciding the transport doesn't belong there.
There is senderLocator - that's where this routing belongs, no need for a new concept here - a variation of the existing one fits to me.

@nicolas-grekasnicolas-grekas added this to thenext milestoneMar 26, 2019
@sroze
Copy link
Contributor

But, with queues, these aren't defined anywhere - you can just "invent them" at the time you dispatch

That's for AMQP-only. If you use Google Pub/Sub, Amazon SQS and so on, I believe you need to define them upfront.

where belongs the responsibility of deciding the queue/transport?
I don't think the dispatch point is the correct place

I agree this this.

I don't believe we need any new concept for this. Not even a "virtual transport". Why can't they just be to different transports, namedhigh_priority_amqp andlow_priority_amqp? The "only" thing we need to add is the ability for the Worker to consume messages from multiple transports at a time.

@weaverryan
Copy link
MemberAuthor

I think@sroze is right. I'm closing this PR in favor of#30708, which I can further enhance to allow for handling multiple transports inside a single Worker.

@weaverryanweaverryan deleted the queue-name-stamp branchMarch 27, 2019 12:32
fabpot added a commit that referenced this pull requestMar 30, 2019
…ker with prioritized transports (weaverryan)This PR was squashed before being merged into the 4.3-dev branch (closes#30708).Discussion----------[Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports| Q             | A| ------------- | ---| Branch?       | master| Bug fix?      | no| New feature?  | yes| BC breaks?    | no| Deprecations? | no| Tests pass?   | yes| Fixed tickets | Helps with#30699| License       | MIT| Doc PR        | TODOHighlights:* `messenger:consume` can now consume messages from multiple transports with priority ❗️```bin/console messenger:consume amqp_high amqp_medium amqp_low```* How long you want to sleep before checking more messages is now an option to `messenger:consume`* `ReceiverInterface::receive()` is replaced with `ReceiverInterface::get()`* Logic for looping & sleeping is moved into `Worker`Commits-------e800bd5 [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports
@nicolas-grekasnicolas-grekas modified the milestones:next,4.3Apr 30, 2019
Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment

Reviewers

@srozesrozeAwaiting requested review from sroze

1 more reviewer

@GuikingoneGuikingoneGuikingone left review comments

Reviewers whose approvals may not affect merge requirements

Assignees

No one assigned

Projects

None yet

Milestone

4.3

Development

Successfully merging this pull request may close these issues.

5 participants

@weaverryan@nicolas-grekas@sroze@Guikingone@carsonbot

[8]ページ先頭

©2009-2025 Movatter.jp