Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

[Messenger] Adding support for record messages#27844

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Closed
Nyholm wants to merge1 commit intosymfony:masterfromNyholm:record-messages

Conversation

@Nyholm
Copy link
Member

@NyholmNyholm commentedJul 4, 2018
edited
Loading

QA
Branch?master
Bug fix?no
New feature?yes
BC breaks?no
Deprecations?no
Tests pass?yes
Fixed tickets
LicenseMIT
Doc PRsymfony/symfony-docs#10015

With inspiration fromthis blog post, I created a message recorder. This is needed when you handle messages in the same doctrine transaction. Ie, you have aCommand and theCommandHandler dispatchesEvents. Those events should be handled in a new doctrine transaction.

Scenario

If the command handler creates a User with uuid X and dispatchesUserCreated event. If they are handled in the same transaction then this code will find nothing:

$this->em->getRepository(User::class)->findOneBy(['uuid'=>X]);

Example configuration:

framework:messenger:default_bus:messenger.bus.commandbuses:messenger.bus.command:middleware:# Make sure recorded events are handled by the event bus. You may also configure the message recorder with a second parameter.                    -app.messenger.middleware.record_messages                    -doctrine_transaction_middleware:['default']# -- more busesservices:app.messenger.middleware.record_messages:parent:messenger.middleware.record_messagesarguments:['@messenger.bus.event']

kbond, ogizanagi, yceruto, arnolanglade, and andreybolonin reacted with thumbs up emoji
foreach (array_values($arguments ??array())as$key =>$argument) {
// Convert argument to references if needed.
if (is_string($argument) &&$argument[0] ==='@') {
$argument =newReference(substr($argument,1));
Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Ping@ogizanagi, This is the best way I found to support references. Im not sure if this is needed if you configureframework.messenger with XML.

foreach (array_values($arguments ??array())as$key =>$argument) {
// Convert argument to references if needed.
if (is_string($argument) &&'@' ===$argument[0]) {
$argument =newReference(substr($argument,1));
Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Ping@ogizanagi, This is the best way I found to support references. Im not sure if this is needed if you configure framework.messenger with XML.

Or should I go the long way of creating a Factory that uses@container as a dependency?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

The idea of factories weren't to replace DI config when more suitable, only to solve most simple needs for configurable middlewares. Therefore, I don't think we should add services references support.
If really desired, we could indeed use dedicated service locators or perhaps let the user wiring it itself by creating the middleware service using DI config for now.

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Hm. I tried using a Factory that depends on@service_container. That does not work since the buses are private.

I also tried injecting all services tagged withmessage_bus into the factory. That did not work because of circular dependencies.

Circular reference detected for service "debug.traced.messenger.bus.command", path: "debug.traced.messenger.bus.command -
debug.traced.messenger.bus.command.inner -> messenger.bus.command.middleware.messenger.middleware.handles_recorded_mess
ages -> messenger.middleware.handles_recorded_messages.factory -> debug.traced.messenger.bus.command".

Im saying that I do not know a better (working) solution that this. Im sorry.

Copy link
Contributor

@ogizanagiogizanagiJul 5, 2018
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Injecting the Symfony service container now belongs to another era 😃
We should register dedicatedservice locators instead. This would happen in theMessengerPass, usingServiceLocatorTagPass::register().

But as I said, we could let the user create the concrete instance of this middleware in userland for now.

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Done. I've updated the configuration example

$this->messageRecorder->eraseMessages();

foreach ($recordedMessagesas$recordedMessage) {
$this->messageBus->dispatch($recordedMessage);
Copy link
Contributor

@ogizanagiogizanagiJul 4, 2018
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

What if a recorded message recorded another one?
Should we clear any recorded message before middleware execution, so it's obvious it's not supported & undesired by design, and no left over message is dispatched when reusing the bus to handle another message? (am I clear? 😅)

Also should an exception while dispatching a recorded message really interrupt everything?

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Interesting.

I think I agree with all your questions. I'll try to make some changes to address them.

*
* @return object[]
*/
publicfunctionrecordedMessages():array;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Why not naming this functionfetch() instead ofrecordedMessages(), as you already name below the methodrecord() (withoutmessages at the end of the name) ?

Same thing foreraseMessages().

ogizanagi reacted with thumbs up emoji
Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I took the names from SimpleBus. But you are correct, your naming seams easier

try {
$this->messageBus->dispatch($recordedMessage);
}catch (\Throwable$exception) {
$exceptions[] =$exception;
Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Note that if one handler to message A throws an exception, then other handlers may not run (depending on the order).

This will just make sure that handlers to message B will run even though a handler to A throws an exception.

Copy link
Member

@kbondkbondJul 5, 2018
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This slightly ties into what I proposed in#27215. I think all handlers should be run even if some fail. Also, I think catching all the exceptions should be done in a middleware on the event bus.

/**
* {@inheritdoc}
*/
publicfunctionerase():void
Copy link
Contributor

@ogizanagiogizanagiJul 5, 2018
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

reset? Would be a legit use-case for theResettableInterface too I guess (for now using thekernel.reset tag manually until#27093 is approved and merged)

Nyholm reacted with thumbs up emoji
}

$exceptions =array();
while(!empty($recordedMessages =$this->messageRecorder->fetch())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

if rather thanwhile?

publicstaticfunctioncreate(array$exceptions):self
{
$message =sprintf(
"One or more handlers for reordered messages threw an exception. Their messages were:\n\n%s",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

s/reordered/recorded

{
return$this->exceptions;
}
} No newline at end of file
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Missing new line

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Fixed

}

if (!empty($exceptions)) {
throw MessageRecorderHandlerException::create($exceptions);
Copy link
Contributor

@ogizanagiogizanagiJul 5, 2018
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

We don't use named construct for exceptions in core ^^

Wild thoughts:
Should it really throw? Make it configurable? Logged aserror? Should theMessageRecorder store the exceptions itself?

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Hm.. I think we should throw.
But making it configurable sounds like a reasonable future update.

<argumenttype="service"id="validator" />
</service>

<serviceid="messenger.middleware.handles_recorded_messages"class="Symfony\Component\Messenger\Middleware\HandlesRecordedMessagesMiddleware"abstract="true">
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Looking at the names of other middlewares, this one maybe should be calledmessenger.middleware.record_messages

Nyholm reacted with thumbs up emoji
publicfunctionfetch():array;

/**
* Erase messages that were recorded since the last call to eraseMessages().

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

As your function is calledreset(), I would have say:

Reset messages that were recorded since the last call to eraseMessages().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Reset messages that were recorded since the last call toreset().

😄

sroze reacted with thumbs up emoji
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

You can actually extend theResettableInterface fromsymfony/contracts now.

BenjaminRbt reacted with thumbs up emoji
*
* @param object $message
*/
publicfunctionrecord($message);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Add: void as function signature (because of interface).

arnolanglade reacted with thumbs up emoji
Copy link
Contributor

@srozesrozeJul 20, 2018
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I don't see any value in adding: void tbh. It might be an issue if some recorders want to return something? (which might be needed in some case?)

sroze
sroze previously requested changesJul 20, 2018
Copy link
Contributor

@srozesroze left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

In this pull-request, there is nothing that actually records the messages, right? 🤔

As far as I understand, the problem you are trying to solve is to wrap messages into some "transaction", so that dispatched message will be handled after the "current" one, right? If I understand it correctly then I believe we should work on the naming to clarify this and we can use the term "transactions", it isn't necessarily database-only.

publicfunctionfetch():array;

/**
* Erase messages that were recorded since the last call to eraseMessages().
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

You can actually extend theResettableInterface fromsymfony/contracts now.

BenjaminRbt reacted with thumbs up emoji
*
* @param object $message
*/
publicfunctionrecord($message);
Copy link
Contributor

@srozesrozeJul 20, 2018
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I don't see any value in adding: void tbh. It might be an issue if some recorders want to return something? (which might be needed in some case?)

namespaceSymfony\Component\Messenger\Exception;

/**
* When handling recorded messaged one or more handlers caused an exception.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

s/one/on

publicfunction__construct(array$exceptions)
{
$message =sprintf(
"One or more handlers for recorded messages threw an exception. Their messages were:\n\n%s",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

If there is only one exception, let's not confuse the user and display it without wrapping it to "one or more".

*
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
*/
class MessageRecorderHandlerExceptionextends \RuntimeExceptionimplements ExceptionInterface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

MessageHandlingException? This has nothing to do with the "recorder" AFAIK


$exceptions =array();
while (!empty($recordedMessages =$this->messageRecorder->fetch())) {
$this->messageRecorder->erase();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

If the use-case is alwaysfetch anderase... I'm pretty sure we can find better thanfetch +erase/reset.

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Im not sure what you mean here.

publicfunctionhandle($message,callable$next)
{
// Make sure the recorder is empty before we begin
$this->messageRecorder->erase();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Why would you? 🤔

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

To avoid any side effects. See#27844 (comment)

@Nyholm
Copy link
MemberAuthor

In this pull-request, there is nothing that actually records the messages, right? 🤔

True. "record" is a word from SimpleBus.

As far as I understand, the problem you are trying to solve is to wrap messages into some "transaction", so that dispatched message will be handled after the "current" one, right? If I understand it correctly then I believe we should work on the naming to clarify this and we can use the term "transactions", it isn't necessarily database-only.

Correct. I want to wrap mycommands in one transaction and handle myevents later. So when used with doctrine transaction middleware, if my event fails, it should not roll back the command.

@gubler
Copy link

I implemented this pull request in an application I'm working on after it was pointed out to me by@ogizanagi in Slack and@sroze asked me to write up a use case.

I am working on App with a Command Bus with doctrine transactions and had an Event Bus. When injecting the Event Bus directly to the Command Handler, any error in the doctrine transaction caused a rollback as it was supposed to, but the Events were still dispatched to the Event Bus.

I copied the contents of this pull request into my app's namespace - just changing the namespaces for everything and it worked flawlessly. I did have to add thereset method to theMessageRecorderInterface because I am on Symfony 4.1 and not usingsymfony/contracts.

Now I inject the message recorder and if my transaction fails, the event is not dispatched. Here is an example Command Handler:

namespaceApp\Domain\CommandHandler\Facility;useApp\Domain\Command\Facility\NewFacilityCommand;useApp\Domain\Event\Facility\FacilityCreatedEvent;useApp\Lib\Messenger\MessageRecorder\MessageRecorderInterface;useApp\Repository\FacilityRepository;/** * Class NewFacilityHandler. */class NewFacilityHandler{/** @var FacilityRepository */protected$repository;/** @var MessageRecorderInterface */protected$recorder;/**     * @param FacilityRepository       $repository     * @param MessageRecorderInterface $recorder     */publicfunction__construct(FacilityRepository$repository,MessageRecorderInterface$recorder)    {$this->repository =$repository;$this->recorder =$recorder;    }/**     * @param NewFacilityCommand $command     */publicfunction__invoke(NewFacilityCommand$command)    {$facility =$this->repository->persist($command);$this->recorder->record(newFacilityCreatedEvent($facility,$command->generator            )        );    }}

I kept all the names the same so when this is (hopefully) merged and added to a tagged release, I can update all of theuse statements and myservices.yaml file and everything should continue to work the same.

I did have one problem until I read the documentation PR that mentioned that the message recorder middleware needed to be declaredbefore the transaction middleware, but once I fixed that, it worked fine.

Here is mymessages.yaml if it helps:

framework:messenger:default_bus:messenger.bus.commandsbuses:messenger.bus.commands:middleware:                    -validation                    -record_messages                    -doctrine_transaction_middleware                    -'App\Domain\BusMiddleware\SecurityMiddleware'messenger.bus.events:middleware:                    -validation                    -allow_no_handler                    -doctrine_transaction_middleware                    -'App\Domain\BusMiddleware\SecurityMiddleware'                    -'App\Domain\EventBusMiddleware\FlashMessageMiddleware'messenger.bus.queries:middleware:                    -validation                    -doctrine_transaction_middleware                    -'App\Domain\BusMiddleware\SecurityMiddleware'

and the relevant parts of myservices.yaml. I had to add themethod argument to the tags formessage.recorder - I assume this is bacause I am using Symfony 4.1 and not usingsymfony/contracts.

# doctrine middleware for command busdoctrine.orm.messenger.middleware_factory.transaction:class:Symfony\Bridge\Doctrine\Messenger\DoctrineTransactionMiddlewareFactoryarguments:['@doctrine']messenger.middleware.doctrine_transaction_middleware:class:Symfony\Bridge\Doctrine\Messenger\DoctrineTransactionMiddlewarefactory:['@doctrine.orm.messenger.middleware_factory.transaction', 'createMiddleware']abstract:truearguments:['default']messenger.middleware.record_messages:class:App\Domain\BusMiddleware\HandlesRecordedMessagesMiddlewaremessenger.recorder:class:App\Lib\Messenger\MessageRecorder\MessageRecordertags:            -name:'kernel.reset'method:'reset'App\Lib\Messenger\MessageRecorder\MessageRecorderInterface:alias:"messenger.recorder"
ogizanagi, Nyholm, sroze, and andreybolonin reacted with thumbs up emoji

$this->messageRecorder->reset();

try {
$next($message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

You need to capture the results as well to return them.

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

You are correct. Thank you. I've updated the PR

* @author Tobias Nyholm <tobias.nyholm@gmail.com>
* @author Matthias Noback <matthiasnoback@gmail.com>
*/
interface MessageRecorderInterfaceextends ResetInterface

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

SimpleBus split this interface into 2 small ones. The first oneContainsRecordedMessages is implement by our models because you need to tell them to give us the events raise during the transaction. The second oneRecordsMessages is used to record event at this end of the transaction. For instance, I use it in my repository when my aggregate is flushed. I guess this two distinct responsibilities: I don't want that my model have a public methodrecord.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

BTW, I prefer the naming of simple bus (declarative vs imperative programming). It is better to describer what happens instead how it happens :)

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I thought about slitting them. But I was not sure. Thank you for commenting!

*
* @return object[]
*/
publicfunctionfetch():array;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This name seems weird to me. Like I said previously my model will implement this interface. For example, a product model. My product will have Product::fetch() and Product::reset(). In term of business that does not have any sense. I think we should useeraseMessage() andrecordedMessage() it more explicit. I think it is a good compromise between technical and business stuff.

parent::__construct($message);
}

publicfunctiongetExceptions():array

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

toArray would be better.

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Hm, Im not sure

}

if (!empty($exceptions)) {
if (1 ===count($exceptions)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Why do you do that? I think it will be better to throw a single type of exception. It will be easier to handle it.

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

TheMessageHandlingException is only for grouped exceptions. If the group only have one element, there is no need for a group.

I think an earlier review suggested it to be this way.

</service>

<serviceid="messenger.middleware.record_messages"class="Symfony\Component\Messenger\Middleware\HandlesRecordedMessagesMiddleware"abstract="true">
<argumenttype="service"id="message_bus" />

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

message_bus? Will you dispatch the events into the same bus than the command?

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This will be overwritten by your configuration. It is just abstract.

@Nyholm
Copy link
MemberAuthor

Thank you for the reviews. I've made some updates.

@chalasr
Copy link
Member

Could you add some tests? Something likehttps://github.com/symfony/symfony/blob/master/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php#L615 would help understanding the wiring of this feature for me.

@Koc
Copy link
Contributor

Koc commentedSep 15, 2018

@Nyholm consider another approach, when aggregate root emits events and they are dispatched after transaction commited. Seehttps://beberlei.de/2013/07/24/doctrine_and_domainevents.html for more details.

With current approach we need emit events manually from command handlers.

@Nyholm
Copy link
MemberAuthor

Nyholm commentedSep 15, 2018 via email

That is a follow up PR :)It is just a new listener and a messageContainer :)
On 15 Sep 2018, at 04:08, Konstantin Myakshin ***@***.***> wrote:@Nyholm consider another approach, when aggregate root emits events and they are dispatched after transaction commited. Seehttps://beberlei.de/2013/07/24/doctrine_and_domainevents.html for more details. With current approach we need emit events manually from command handlers. — You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub, or mute the thread.
Koc reacted with thumbs up emoji

@NyholmNyholmforce-pushed therecord-messages branch 2 times, most recently from57a9089 tof81f82bCompareSeptember 15, 2018 12:06
@Nyholm
Copy link
MemberAuthor

I've squashed and rebased. I'm ready for a final review.

@srozesroze dismissed theirstale reviewSeptember 15, 2018 13:43

Changes made

publicfunctionhandle($message,callable$next)
{
// Make sure the recorder is empty before we begin
$this->messageRecorder->resetRecordedMessages();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Let's imagine the following example:

  1. Controllerdispatches aFirstMessage message.
  2. FirstMessageHandler gets called and it does the following:
    $recorder->record(newSecondMessage);$bus->dispatch(newSaveToSecondaryDatabase);$recorder->record(newThirdMessage);

Thedispatch will call the middleware and the just recorded message will be erased, right?

Koc reacted with thumbs up emoji
Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This is correct IF the current bus and$bus is using the sameMessageRecorder.

I think I should remove this line. Are you okey with that@ogizanagi? It was added byyour suggestion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

It might be fine removing it, but the case described above is a more complex topic then.
For instance, if there is any issue withFirstMessageHandler afterSaveToSecondaryDatabase,SecondMessage
(and possibly those recorded bySaveToSecondaryDatabase) would still have been executed while I would expect it does only ifFirstMessageHandler is fully successful.
Actually, to me recorded messages should be treated if and only if the highestdispatch call is successful. Possibly by using an internal flag inside the middleware? (Thus, we could even keep the reset for safety, only when the flag is off).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Koc reacted with thumbs up emoji
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Which is equivalent totheFinishesHandlingMessageBeforeHandlingNext middleware of SimpleBus. (I prefer SimpleBus' naming in here, it's more explicit)

}

$exceptions =array();
while (!empty($recordedMessages =$this->messageRecorder->getRecordedMessages())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

What's the reasoning ofget +reset instead ofpop?

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I want to make sure nobody runs$this->messageRecorder->resetRecordedMessages(); while I'm handling the recorded messages.

@nicolas-grekas
Copy link
Member

(deps=low tests are red, but they should never be to me.)

@Nyholm
Copy link
MemberAuthor

I rebased and everything is green.

@weaverryan
Copy link
Member

@Nyholm is the documentation PR more-or-less up-to-date with the most recent changes? I'm want to "wrap my head" around this PR - I was starting by reading your docs :)

@Nyholm
Copy link
MemberAuthor

Yes. It’s up to date.

@javiereguiluz
Copy link
Member

@Nyholm is"record" a common/required word in this context? Personally I would always say:"save messages" or"store messages" to refer to something like this but never"record messages".

arnolanglade reacted with thumbs down emoji

@Nyholm
Copy link
MemberAuthor

That is a fair point. "Record" is used from SimpleBus. As far as I know it is made up by them. Using "record" will make it easier for SimpleBus users to migrate (if they would like to).

Im all for using another word but it has to be more clear than using "record". I think that using "Store" or "Save" would cause for confusion since the component support adding things to a queue or "queue like".

Since I'm not "saving" any messages anywhere (just having them in a temporary memory queue) I don't think it is a good word.

@sroze
Copy link
Contributor

sroze commentedSep 22, 2018
edited
Loading

Then, what about this?

$this->bus->dispatch(Envelope::wrap($message)->with(newToBeDispatchedAfterCurrentMessageHandling))

All we need is a middleware that is doing the "locking" (i.e. tactitian term) or "FinishesHandlingMessageBeforeHandlingNext" (SimpleBus) only if the following envelope item is present.

@Nyholm
Copy link
MemberAuthor

With an EventRecorder you can collect events from Doctrine (just as@arnolanglade is doing).

class User {use EventRecorderTrait;// To be created in a following PRpublicfunctionsetUsername($username)   {$this->username =$username;$this->record(newUsernameWasUpdated());  }// ...}

Using the minor modification in my entity as shown above and aDoctrineEventRecorder (Doctrine subscriber) I can make sure that domain events like these eventually ends up on the bus.


You will also have the flexibility to decide which events that should be recorded and which should be handled now.

publicfunctionhandle(FooCommand$command){$this->recorder->record(newBar());// Handle later$this->eventBus->dispatch(newBiz());// Handle now}

This is not possible with a middleware likeFinishesHandlingMessageBeforeHandlingNext.


I know "EventRecorder" and "record" is a new concept to this component. But it is not a new concept for message busses. SimpleBus has successfully been using this for years.

@sroze
Copy link
Contributor

sroze commentedSep 22, 2018
edited
Loading

You will also have the flexibility to decide which events that should be recorded and which should be handled now.

That would exactly be the point of this Envelope itemin this comment, right?

Using the minor modification in my entity as shown above and a DoctrineEventRecorder (Doctrine subscriber) I can make sure that domain events like these eventually ends up on the bus.

But you could do the same with the other option which does not introduce this extra notion of "recorder", isn't it?

@Nyholm
Copy link
MemberAuthor

No, you can’t. (As far as I know). How and when would you dispatch new messages to the bus?
Sure, you can create a middleware called “DispatchMessagesCollectedByDoctrineSubscriber”. That would possibly achieve the same thing but will less abstraction and flexibility.

I don’t know why we are hesitant of introducing this concept to the MessengerComponent. The argument cannot be DX and/or to keep the component simple, because the alternative solutions proposed are neighter simpler or more intuitive.

Btw, I like that we are challenging this, and I don’t think we should have redundant ways of doing the same thing.

@Koc
Copy link
Contributor

Koc commentedSep 22, 2018

How it possible to use thisMessageRecorder for event sourcing and store all triggered events in db insame transaction? But queue them to the bus after transaction is complete.

@Nyholm
Copy link
MemberAuthor

This is how you keep them in different transaction:

# config/packages/messenger.yamlframework:messenger:default_bus:messenger.bus.commandbuses:messenger.bus.command:middleware:                    -messenger.middleware.validation                    -messenger.middleware.handles_recorded_messages# Doctrine transaction must be after handles_recorded_messages middleware                    -doctrine_transaction_middleware:['default']messenger.bus.event:middleware:                    -messenger.middleware.allow_no_handler                    -messenger.middleware.validation

If you change the order fo the middlewares, you can get them in the same middleware:

messenger.bus.command:middleware:                    -messenger.middleware.validation                    -doctrine_transaction_middleware:['default']                    -messenger.middleware.handles_recorded_messages

If you are talking about events from a CommandHandler you would obviously not use this event recorder but the event bus directly. But if you are referring to the events from your entities, then the solution above it for you.

If you want to use both, just define more services for the same classes.

@Koc
Copy link
Contributor

Koc commentedSep 22, 2018
edited
Loading

If you are talking about events from a CommandHandler you would obviously not use this event recorder but the event bus directly.

No, I cann't. Command handing wrapped in transaction and we shouldn't send any events until it commited.

  1. emit events in aggregate root
  2. save aggregate root and emited events in db in same transaction
  3. after transaction commited - send emited events also to bus.

If you want to use both, just define more services for the same classes.

Not sure that I understand what services should be defined twice. Can you elaborate?

@fabpot
Copy link
Member

Closing in favor of#28849

@fabpotfabpot closed thisMar 3, 2019
sroze added a commit that referenced this pull requestMar 19, 2019
…t bus is finished (Nyholm)This PR was merged into the 4.3-dev branch.Discussion----------[Messenger] Support for handling messages after current bus is finished| Q             | A| ------------- | ---| Branch?       | master| Bug fix?      | no| New feature?  | yes| BC breaks?    | no| Deprecations? | no| Tests pass?   | yes| Fixed tickets || License       | MIT| Doc PR        |symfony/symfony-docs#10015This is a replacement for#27844. We achieve the same goals without introducing the new concept of "recorder".```phpclass CreateUserHandler{    private $em;    private $eventBus;    public function __construct(MessageBus $eventBus, EntityManagerInterface $em)    {        $this->eventBus = $eventBus;        $this->em = $em;    }    public function __invoke(CreateUser $command)    {        $user = new User($command->getUuid(), $command->getName(), $command->getEmail());        $this->em->persist($user);        $message = new UserCreatedEvent($command->getUuid();        $this->eventBus->dispatch((new Envelope($message))->with(new DispatchAfterCurrentBus()));    }}```Note that this `DispatchAfterCurrentBusMiddleware` is added automatically as the first middleware.2019-03-13: I updated the PR description.Commits-------903355f Support for handling messages after current bus is finished
@nicolas-grekasnicolas-grekas modified the milestones:next,4.3Apr 30, 2019
weaverryan added a commit to symfony/symfony-docs that referenced this pull requestMay 24, 2019
…rentBusMiddleware (Nyholm, gubler, OskarStark)This PR was submitted for the master branch but it was merged into the 4.3 branch instead (closes#10015).Discussion----------[Messenger] Added documentation about DispatchAfterCurrentBusMiddlewareDocumentation to PR:symfony/symfony#27844![Screenshot 2019-03-17 at 11 43 23](https://user-images.githubusercontent.com/1275206/54489171-edee4880-48a9-11e9-8028-ac501e62dfcb.png)![Screenshot 2019-03-17 at 11 43 34](https://user-images.githubusercontent.com/1275206/54489173-edee4880-48a9-11e9-877f-f749f242b502.png)![Screenshot 2019-03-17 at 11 43 39](https://user-images.githubusercontent.com/1275206/54489172-edee4880-48a9-11e9-84f1-d78e68474c7a.png)Commits-------0d12880 minor updates according to feedbackec3e1d7 updates according to feedback3e99663 Update messenger/message-recorder.rstfe81abc Update messenger/message-recorder.rstc1cef9e Update messenger/message-recorder.rstcfa56f7 Update messenger/message-recorder.rst06ec8a4 Added PHP and XML config797f530 Minor updates30e958c Explain HandleMessageInNewTransaction middleware84b81e3 Added fixesca22c3c a userfff8659 According to feedbackafdddbf Added documentation about message recorder
Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment

Reviewers

@javiereguiluzjaviereguiluzjaviereguiluz left review comments

@kbondkbondkbond left review comments

@chalasrchalasrchalasr left review comments

+4 more reviewers

@srozesrozesroze left review comments

@ogizanagiogizanagiogizanagi left review comments

@arnolangladearnolangladearnolanglade left review comments

@BenjaminRbtBenjaminRbtBenjaminRbt left review comments

Reviewers whose approvals may not affect merge requirements

Assignees

No one assigned

Projects

None yet

Milestone

4.3

Development

Successfully merging this pull request may close these issues.

14 participants

@Nyholm@gubler@chalasr@Koc@nicolas-grekas@weaverryan@javiereguiluz@sroze@fabpot@kbond@ogizanagi@arnolanglade@BenjaminRbt@carsonbot

[8]ページ先頭

©2009-2025 Movatter.jp