Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork9.7k
[Messenger] Add a new Messenger component#24411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Conversation
| */ | ||
| publicfunctionhandle($message) | ||
| { | ||
| call_user_func($this->callableForNextMiddleware(0),$message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
return missing
everzet left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I'd work on naming a bit, Sam :)
Otherwise - 👍
| */ | ||
| private$producerForMessageResolver; | ||
| publicfunction__construct(ProducerForMessageResolverInterface$producerForMessageResolver) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
MessageSenderResolverInterface
| /** | ||
| * @author Samuel Roze <samuel.roze@gmail.com> | ||
| */ | ||
| class SendMessageToProducersMiddlewareimplements MessageBusMiddlewareInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
MessageSendingMiddleware
| { | ||
| if ($messageinstanceof ConsumedMessage) { | ||
| $message =$message->getMessage(); | ||
| }elseif (!empty($producers =$this->producerForMessageResolver->getProducersForMessage($message))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
getProducersForMessage() =>getSender()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
As I guess we'll renameProducer toSender I see where you are going. But you replace plural by singular here and that's an interesting place for me to explain why it's plural.
At the beginning it was returningProducerInterface|null and I had a collection of producer that was dispatching to a set of producers to handle this plural. But there's one use-case that didn't work:
When I dispatch a message "Foo" in the message bus
Then I want the message to be sent to the producer "FooProducer"
And I want the message to be handled by the handler
i.e. I want to send the message tosomethingand I want to call the local handler. This can be useful for audit purposes, duplication, etc... To make this use case happening,you just need to have to use anull handler. (See thenote in `README.md)
Therefore, I'd saygetProducersForMessage() =>getSenders()
| * | ||
| * @author Samuel Roze <samuel.roze@gmail.com> | ||
| */ | ||
| class CollectionOfMessageHandlers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
MessageHandlers orMessageHandlerCollection for consistency with other names
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Let's go forMessageHandlerCollection.
| * @author Samuel Roze <samuel.roze@gmail.com> | ||
| * @author Matthias Noback <matthiasnoback@gmail.com> | ||
| */ | ||
| class MessageBusimplements MessageBusInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
MiddleawareMessageBus :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
if we took that the interface is calledMessageBusMiddlewareInterface then it should beMessageBusMiddleware which is more accurate
| /** | ||
| * @author Samuel Roze <samuel.roze@gmail.com> | ||
| */ | ||
| interface MessageConsumerInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
ConsumerInterface
| /** | ||
| * @author Samuel Roze <samuel.roze@gmail.com> | ||
| */ | ||
| class MessageHandlerResolverimplements MessageHandlerResolverInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
MappedHandlerResolver
| /** | ||
| * @author Samuel Roze <samuel.roze@gmail.com> | ||
| */ | ||
| interface MessageHandlerResolverInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
HandlerResolverInterface
| /** | ||
| * @author Samuel Roze <samuel.roze@gmail.com> | ||
| */ | ||
| interface MessageProducerInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
ProducerInterface
| /** | ||
| * @author Samuel Roze <samuel.roze@gmail.com> | ||
| */ | ||
| class CallMessageHandlerMiddlewareimplements MessageBusMiddlewareInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
HandlerMiddleware
| protectedfunctionexecute(InputInterface$input,OutputInterface$output) | ||
| { | ||
| /** @var ContainerInterface $container */ | ||
| $container =$this->getApplication()->getKernel()->getContainer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Can we use service locators instead of container?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
@Koc the point here is that the consumer - i.e. the service name - can be given as an argument. Not sure how would the service locator plays with that (as it needs to know the dependencies when constructed AFAIK)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Require tags on consumers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Correct, we can require the receivers (note the name change) to be tagged, so we can use service locators 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Actually, I don't see a real point of using service locators in here. It forces us to create a new compiler pass and have another set of tags... while just using the container (as in many other commands in the FrameworkBundle) is the simplest option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
It would be nice to promote the good practice of not injecting the container in our own code. A service locator + autoconfiguration looks better to me, especially because you have aReceiverInterface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Auto-configuration would be nice if we also define a generic folder (such assrc/MessageReceiver). Without, it feels a bit too much work for not much benefits, as I explained in theprevious comment.But obviously, if the general feedback is that we need this, we can add it (even later hehe :))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Fetching the receiver from the container has one big drawback: it must be public. I would say go for an autoconfigured tag + scoped receiver locator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Fair enough, updated to use service locators 👍
| * | ||
| * @param object $message | ||
| * | ||
| * @return mixed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Normally a command bus should return void, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
And that's the big point here: that's not "just" a command bus but it is a message bus. This can also be used for queries in a CQRS-type architecture.
Note that if you want to enforce such limitation for commands, you can very easily create your own instance of theMessageBus and have a special "noop returns" (or "EnforceCommandsDoNotReturnAnything") middleware that would force returnnull for example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Ok, thank for explanations.
| */ | ||
| interface MessageConsumerInterface | ||
| { | ||
| publicfunctionconsume():\Generator; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
If I understand this method creates a collection of messages? So you don't consume messages but you create them, right? Perhaps this is not the good wording. Plus, it is strange to use a verb for a method which builds something it is better to use a word likemessages()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I'm really not found of themessages() suggestion but agree the naming can be better. What aboutFetcherInterface that has afetch(): \Generator method? Because these things are fetching messages from various sources...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
OrFetchedMessage::messages() :) butFetcher::fetch() is more clear than the previous proposal.
| * | ||
| * @param object $message | ||
| */ | ||
| publicfunctionproduce($message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
: void ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I'm not sure we should enforce this for now and leave the door open for results. i.e. I know that some brokers like Google Pub/Sub can return things like the "message ID" that is quite useful for logging/debugging purposes sometimes - so I can expect to see people logging these.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Or one can pass this directly to the e.g frontend, for it to check periodically if this is already processed. and you don't need to predefine the message id and store it to application db.
| 3.`CallMessageHandlerMiddleware` (call the registered handle) | ||
| ```php | ||
| $result = $this->get('message_bus')->dispatch(new MyMessage(/* ... */)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I think the bus handles message: dispatch -> handle
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I'm not against this change; waiting to see more opinions on this :)
| return$this->messageToProducerMapping['*']; | ||
| } | ||
| returnarray(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
[]
| $handlerResolver->replaceArgument(0,$this->findHandlers($container)); | ||
| } | ||
| privatefunctionfindHandlers(ContainerBuilder$container) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
missing return type hint
| * | ||
| * @author Samuel Roze <samuel.roze@gmail.com> | ||
| */ | ||
| interface ExceptionInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
is there any value to add an empty interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
That's a good question; but it's a constant in every Symfony Component so I didn't want to start questioning this one 😉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Using this interface, allows you to catch all the exceptions that come from this component.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
ok thanks!
mvrhov commentedDec 1, 2017
Now that 4.0 is out the door, can we get some more input from the symfony core team. |
| /** | ||
| * {@inheritdoc} | ||
| */ | ||
| publicfunctiongetProducersForMessage($message):array |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
could this method be refactored as follows?
return$this->messageToProducerMapping[get_class($message)] ??$this->messageToProducerMapping['*'] ??array();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Good point 👍
| private$messageBusService; | ||
| private$middlewareTag; | ||
| publicfunction__construct( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
In Symfony, the arguments are always put on the same line, no matter how long they are.
| { | ||
| $handlersByMessage =array(); | ||
| foreach ($container->findTaggedServiceIds($this->handlerTag,true)as$serviceId =>$tags) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Can't we use aSplPriorityQueue here instead of manually dealing with priorities? We use it here for example:
symfony/src/Symfony/Component/DependencyInjection/Compiler/DecoratorServicePass.php
Line 28 inbf4b09f
| $definitions =new \SplPriorityQueue(); |
sroze left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I'll rename the following things in this pull-request, following a number of valid comments regarding the names.
- Producer -> Sender
- Consumer -> Receiver
| protectedfunctionexecute(InputInterface$input,OutputInterface$output) | ||
| { | ||
| /** @var ContainerInterface $container */ | ||
| $container =$this->getApplication()->getKernel()->getContainer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
@Koc the point here is that the consumer - i.e. the service name - can be given as an argument. Not sure how would the service locator plays with that (as it needs to know the dependencies when constructed AFAIK)
1202153 toba18c84CompareKoc commentedDec 2, 2017
@sroze can we tag all consumers with tag |
mvrhov commentedDec 2, 2017
|
Koc commentedDec 2, 2017
It depends on accepted terminology. Maybe even without dot separatorm like already proposed in this PR |
sroze commentedDec 3, 2017
@Koc do you mean the message handlers or message receivers? Following the reviews, I hadto rename them. Check out theconcepts documentation to clarify. |
mvrhov commentedDec 23, 2017 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
I've converted a small project from SimpleBus to Messaging component.
I still cannot get rid of the SimpleBus as this is missing the EventRecorder and a EventBus. |
3bd29c4 toac80529Compare| { | ||
| returnarray_map(function ($handler)use ($message) { | ||
| return$handler($message); | ||
| },$this->handlers); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
wouldn't anarray_walk be better ? Don't see why we would need to fetch the return values of each handlers..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Asdiscussed here I think we should have all we need to get the result of handlers, to support queries. Therefore,array_map makes sense here :)
ro0NL left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
And a 👍 from me :)
| <!-- Bus--> | ||
| <serviceid="message_bus"class="Symfony\Component\Message\MessageBus"public="true"> | ||
| <argumenttype="collection" /><!-- Middlewares--> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
can it be typetagged? :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Yep, I think so 👍
| class SendMessageMiddlewareimplements MiddlewareInterface | ||
| { | ||
| /** | ||
| * @var SenderLocatorInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
obvious IMHO
| */ | ||
| publicfunctionhandle($message,callable$next) | ||
| { | ||
| $this->logger->debug('Starting processing message',array( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
... message "'.get_class($message).'"'? (as for logs below).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I don't think we should have that as part of the log message (as it's in the context) but happy to change if more people believe it should be the case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
it would make identifying messages a bit easier (i.e. in the web profiler log panel, which would boldify the message class :))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Only if the message isStarting processing {class} message and we addclass as context I believe. Which... makes sense actually 👍
| try { | ||
| $result =$next($message); | ||
| }catch (\Throwable$e) { | ||
| $this->logger->warning('Something went wrong while processing message',array( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
An exception occured while ...?
| try { | ||
| $method =$reflection->getMethod('__invoke'); | ||
| }catch (\ReflectionException$e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
just usehasMethod?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
getMethod has to be called nonetheless. As we need the access to the method parameters. Do we really need to call 2 methods instead of just catching the exception in those rare cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Do we really need to call 2 methods instead of just catching the exception
For readability / less LoC, i'd say yes. No real issue i guess.. just looks like an unneeded micro-optim.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Note that we should also support having thehandles attribute on the tag. In case a handler is handling multiple commands/queries and/or the typehint is not present, we need to be able to specify the handled message.
| */ | ||
| publicfunction__construct(array$middlewares =array()) | ||
| { | ||
| $this->middlewares =$middlewares; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
to leveragetagged (which impliesiterable type), what about;
$this->middlewares =is_array($middlewares) ?array_values($middlewares) :iterator_to_array($middlewares,false);
or find a lazy way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Yup, good point. Why would youarray_values if array?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
to re-index middlewares, ensuringcallableForNextMiddleware implem always works. (needs anint $index type btw :))
| */ | ||
| private$serializer; | ||
| publicfunction__construct(SerializerInterface$serializer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
, $format = 'json?
| * | ||
| * @param object $message | ||
| * | ||
| * @return array |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
What aboutTransport\EncodedMessage?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
What would be the value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
type info :) i.e. not having to rely on such a comment;
* The most common keys of the encoded array are:* - `body` (string) - the message body* - `headers` (string<string>) - a key/value pair of headerscommon means required?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
But much less flexibility for use-cases we don't know about yet. That's for the same reason that the serializer's context is an array.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I've been back and forth on this.
Serializers are never used in Symfony (message component) code. They are only used by queues to transform a PHP object to array to json. Which means that we should not really care. =)
Except for theSymfonySerialization that ships with the component, any implementation ofEncoderInterface andDecoderInterface will only be used inside the concept of a "my-queue-bridge". Which leads to the question, do we need these interfaces?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Serializers are never used in Symfony (message component) code. They are only used by queues to transform a PHP object to array to json.
I've read it this way:
Encoders are never used in Symfony (message component) code. They are only used byadapters to transform a PHP object toa PHP array. Adapters will then transform thePHP array to their own transport layer (can be JSON, can be whatever...)
Which means that we should not really care. =)
We, as users, that's correct, we shouldn't care at all about these technical details when dispatching messages to the bus.
Any implementation of EncoderInterface and DecoderInterface will only be used inside the concept of a "my-queue-bridge".
I don't think so, that's why I've put these interfaces within the component. There are multiple use-cases that are not adapter-specific but might provider encoder/decoders:
- When dispatching messages, it's pretty useful to be able to trace them. To get things like OpenTracing or Zipkin to work well, all we need is "just" proper headers on the messages (via AMQP, HTTP, whatever). So a
symfony-message-zipkinpackage could offer encoder/decoders decorators to populate and read these headers, regardless of the transport. - It might be useful for some users/companies to add their own routing specific keys (in body, header or anything else) within the message before it being sent to either of the senders (which could be different adapters). Such encoder/decoder would help them here.
Which leads to the question, do we need these interfaces?
The reasons given above expresses my answer which is: I believe we need them. Also, it reduces the duplication within the various adapters and reduces the overhead of creating one.
| #### Same bus received and sender | ||
| To allow us to receive and send messages on the same bus and prevent a loop, the message bus is equipped with the | ||
| `WrappedIntoReceivedMessage`received. It will wraps the received messages into `ReceivedMessage` objects and the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
WrapIntoReceivedMessage +It will wrap the..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Yep, good point 👍
| */ | ||
| interface ReceiverInterface | ||
| { | ||
| publicfunctionreceive():\Generator; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
iterable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
This has been done then reverted to\Generator for the reasons described inthis comment.
| /** | ||
| * @author Samuel Roze <samuel.roze@gmail.com> | ||
| */ | ||
| class HandlerLocatorimplements HandlerLocatorInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
@sroze: HandlerLocator should be able to lazy load the handlers. Now the question is, can I change this class to have ServiceLocator injected into the constructor, or should a new class be created and set as default in framework bundle.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
+1 for a PSR-11 implem. (IMHO fits core/component)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
| { | ||
| $this | ||
| ->setDefinition(array( | ||
| newInputArgument('consumer', InputArgument::REQUIRED,'Name of the consumer'), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
We renamed "consumer" to "receiver": this needs to reflect the change as well.
| protectedfunctionexecute(InputInterface$input,OutputInterface$output) | ||
| { | ||
| /** @var ContainerInterface $container */ | ||
| $container =$this->getApplication()->getKernel()->getContainer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Correct, we can require the receivers (note the name change) to be tagged, so we can use service locators 👍
| <!-- Bus--> | ||
| <serviceid="message_bus"class="Symfony\Component\Message\MessageBus"public="true"> | ||
| <argumenttype="collection" /><!-- Middlewares--> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Yep, I think so 👍
| */ | ||
| publicfunctionhandle($message,callable$next) | ||
| { | ||
| $this->logger->debug('Starting processing message',array( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Only if the message isStarting processing {class} message and we addclass as context I believe. Which... makes sense actually 👍
| /** | ||
| * @author Samuel Roze <samuel.roze@gmail.com> | ||
| */ | ||
| class HandlerLocatorimplements HandlerLocatorInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
21ae05b to2824e9aCompareogizanagi commentedMar 20, 2018 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
sroze commentedMar 20, 2018
After maaany discussions, especially with@fabpot and@nicolas-grekas, the component is going to be called « Messenger ». |
| $handlersLocatorMapping =array(); | ||
| foreach ($handlersByMessageas$message =>$handler) { | ||
| $handlersLocatorMapping['handles.'.$message] =$handler; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Isn't this one supposed to behandler just like in ContainerHandlerLocator.php line 33?
I'm curious why the test didn't catch this
pborreli commentedMar 21, 2018
@sroze so now you have to rename your conference name at Symfony Live Paris 😄 |
| protectedfunctiondispatchMessage($message) | ||
| { | ||
| if (!$this->container->has('message_bus')) { | ||
| thrownew \LogicException('The message bus is not enabled in your application. Try running "composer require symfony/message".'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Need to be renamed as well.
| $container->addCompilerPass(newResettableServicePass()); | ||
| $container->addCompilerPass(newTestServiceContainerWeakRefPass(), PassConfig::TYPE_BEFORE_REMOVING, -32); | ||
| $container->addCompilerPass(newTestServiceContainerRealRefPass(), PassConfig::TYPE_AFTER_REMOVING); | ||
| $this->addCompilerPassIfExists($container, MessagePass::class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
To be renamed toMessengerPass
| <!-- Bus--> | ||
| <serviceid="message_bus"class="Symfony\Component\Messenger\MessageBus"public="true"> | ||
| <argumenttype="tagged"tag="message_middleware" /><!-- Middlewares--> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
To keep explicit, I'll therefore rename tomessage_bus_middleware.
| ), | ||
| ), | ||
| ), | ||
| 'message' =>array( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
messenger
| interface SenderLocatorInterface | ||
| { | ||
| /** | ||
| * Gets the producer (if applicable) for the given message object. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
s/producer/sender
| try { | ||
| $result =$next($message); | ||
| }catch (\Throwable$e) { | ||
| $this->logger->warning('An exception occurred while processing message {class}',array( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Let's keep the same language:s/processing/handling
| */ | ||
| publicfunctionhandle($message,callable$next) | ||
| { | ||
| $this->logger->debug('Starting processing message {class}',array( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Let's keep the same language: s/processing/handling
| throw$e; | ||
| } | ||
| $this->logger->debug('Finished processing message {class}',array( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Let's keep the same language: s/processing/handling
| /** | ||
| * @author Samuel Roze <samuel.roze@gmail.com> | ||
| */ | ||
| class MessagePassimplements CompilerPassInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Should beMessengerPass
| $parameter =$parameters[0]; | ||
| if (null ===$parameter->getClass()) { | ||
| thrownewRuntimeException(sprintf('The parameter of `__invoke` function of service "%s" must type hint the Message class it handles.',$serviceId)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Message =>message
sroze commentedMar 21, 2018
@pborreli I know 😅 |
| <tagname="console.command"command="debug:event-dispatcher" /> | ||
| </service> | ||
| <serviceid="console.command.messenger_consume_message"class="Symfony\Bundle\FrameworkBundle\Command\MessengerConsumeMessagesCommand"> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
s\messenger_consume_message\messenger_consume_messages
GwendolenLynch left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
So much awesome! Thank you@sroze 🍾
| </service> | ||
| <serviceid="data_collector.messenger"class="Symfony\Bundle\FrameworkBundle\DataCollector\MessengerDataCollector"> | ||
| <tagname="data_collector"template="@WebProfiler/Collector/messages.html.twig"id="messenger"priority="100" /> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
s\id="messenger"\id="messages"
this breaks the profiler :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Changed sir' 👍
sroze commentedMar 21, 2018
I have squashed all the commits and added tests for what I believe is really important for now. I'd argue the bits that aren't covered by tests will be covered as long as we keep the policy that every change in this experimental component has to be covered by tests. Therefore, I'd say it's ready to be merged :) |
fabpot commentedMar 22, 2018
@sroze Can you add a sentence about the fact that the new component is experimental in the README file? |
sroze commentedMar 22, 2018 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
@fabpot just added a message and a link to the documentation 👍 (kept two commits so it's super easy to revert the experimental one before 4.2) |
fabpot commentedMar 23, 2018
Thank you@sroze. |
mcfedr commentedMar 26, 2018 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
This looks really interesting, I look forward to trying it out, I have been developing/using a library that solves a similar problem, but in a slightly different (probably less abstract/extensible) way, so you might interested to have a look,https://github.com/mcfedr/queue-manager-bundle, its been used in production on a number of sites that handle ~50 million messages/month - its less of a by-the-book solution, but thought might interesting for comparision |
| * | ||
| * @param object $message The message to dispatch | ||
| * | ||
| * @final |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
There is an annotation@final but method is not final itself
This PR was squashed before being merged into the 4.1-dev branch (closes#26632).Discussion----------[Messenger] Add AMQP adapter| Q | A| ------------- | ---| Branch? | master| Bug fix? | no| New feature? | yes| BC breaks? | no| Deprecations? | no| Tests pass? | ø| License | MIT- [x] Depends on the Messenger component#24411- [x] Add tests once we are all happy about the structure---In order to give a great DX for simple needs such as sending messages through an AMQP broker such as RabbitMq, we should ship an AMQP adapter for the Messenger component within Symfony Core. It should be as simple as this proposal. We don't need to handle more specific use-cases nor brokers as other adapters such as the [enqueue adapter](https://github.com/sroze/enqueue-bridge) can also be used.Configuring the adapter is as simple as the following configuration:```yaml# config/packages/messenger_adapters.yamlframework: messenger: adapter: "%env(MESSENGER_DSN)%"```With the given `.env` for example:```MESSENGER_DSN=amqp://guest:guest@localhost:5672/%2f/messages```Keep in mind that after having configured the adapter, developers have to route their messages to the given adapter.```yaml# config/packages/messenger_routes.yamlframework: messenger: routing:producer). 'App\Message\Command\CreateNumber': messenger.default_sender```---Additionally, multiple adapters can be created and messages routed to these ones.```yaml# config/packages/messenger_routes.yamlframework: messenger: adapters: commands: "amqp://guest:guest@localhost:5672/%2f/commands" maintenance: "amqp://guest:guest@localhost:5672/%2f/maintenance" routing:producer). 'App\Message\Command\CreateNumber': messenger.commands_sender 'App\Message\Command\MaintenanceSpecificCommand': messenger.maintenance_sender```Commits-------798c230 [Messenger] Add AMQP adapter
Uh oh!
There was an error while loading.Please reload this page.
As discussed in#24326. This PR is to help going forward with the discussions of having a Message component.
Resources
sroze/symfony-demo:message-component-demosroze/enqueue-bridge(to be moved assymfony/enqueue-bridgeI guess)2. Demo:In
sroze/symfony-demo:message-component-demo-with-enqueuesroze/swarrot-bridge(to be moved assymfony/swarrot-bridgeI guess)2. Demo:In
sroze/symfony-demo:message-component-demo-with-swarrotsroze/message-http-adapter2. Demo:In
sroze/symfony-demo:message-component-demo-with-http-adapterImportant points
I guess that this would replace#23842 嬓.
Changes from the proposals
Based on the comments, a few changes have been made from the proposal.
MessageProducers have been renamed toMessageSendersMessageConsumers have been renamed toMessageReceivers