Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

[Messenger] Add a way to no ack message automatically#42873

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Closed
lyrixx wants to merge1 commit intosymfony:5.4fromlyrixx:messenger-batch

Conversation

@lyrixx
Copy link
Member

@lyrixxlyrixx commentedSep 3, 2021
edited
Loading

QA
Branch?5.4
Bug fix?no
New feature?yes
Deprecations?no
TicketsFix#36910
LicenseMIT
Doc PR

This PR add a way to manually controller auto ACK from the handler.
It open the door to buffering.

Here is the code one must write to buffer message, and process the buffer each 10s. No more code is needed!

<?phpnamespaceApp\MessageHandler;useApp\Message\HelloMessage;usePsr\Container\ContainerInterface;usePsr\Log\LoggerInterface;useSymfony\Component\EventDispatcher\EventSubscriberInterface;useSymfony\Component\Messenger\Envelope;useSymfony\Component\Messenger\Event\WorkerRunningEvent;useSymfony\Component\Messenger\Event\WorkerStoppedEvent;useSymfony\Component\Messenger\Handler\ConfigureAutoAckInterface;useSymfony\Component\Messenger\Handler\MessageHandlerInterface;useSymfony\Component\Messenger\Stamp\ReceivedStamp;useSymfony\Component\Messenger\Transport\Receiver\ReceiverInterface;finalclass HelloMessageHandlerimplements MessageHandlerInterface, ConfigureAutoAckInterface, EventSubscriberInterface{private$buffer = [];private$lastFlushedAt =null;publicfunction__construct(privateContainerInterface$receiverLocator,privateLoggerInterface$logger,    ) {$this->lastFlushedAt =time();    }publicfunction__invoke(HelloMessage$message,Envelope$envelope)    {if ($this->isAutoAckDisabled($envelope)) {$this->logger->info('Add message to buffer.');$this->buffer[] =$envelope;$this->flushIfNeeded();return;        }$this->logger->info('process message.');// Do regular processing    }publicfunctionisAutoAckDisabled(Envelope$envelope):bool    {// This handler could be used by many transports.// But only the async one should be manually controlled // bufferisedreturn$envelope->last(ReceivedStamp::class)->getTransportName() ==='async';    }publicfunctionflushIfNeeded()    {$this->logger->info('Flush buffer if needed.');if (time() <$this->lastFlushedAt +10) {return;        }$this->flush()    }publicfunctionflush()    {$this->logger->info('Flush buffer.');$this->lastFlushedAt =time();// Do your custom processing on the buffertry {foreach ($this->bufferas$envelope) {/** @var ReceiverInterface */$receiver =$this->receiverLocator->get($envelope->last(ReceivedStamp::class)->getTransportName());$receiver->ack($envelope);            }        }finally {$this->buffer = [];        }    }publicstaticfunctiongetSubscribedEvents()    {return [            WorkerRunningEvent::class =>'flushIfNeeded',            WorkerStoppedEvent::class =>'flush',        ];    }}
# services.yamlservices:App\MessageHandler\HelloMessageHandler:arguments:$receiverLocator:'@messenger.receiver_locator'

sponsored byarte.tv

Korbeil, kolyadin, nikophil, stumbik, pyrech, damienalexandre, xavierlacot, franek, nitneuk, and chapterjason reacted with thumbs up emojiKorbeil and chapterjason reacted with rocket emojisstok reacted with eyes emoji
@carsonbot
Copy link

Hey!

I think@tyx has recently worked with this code. Maybe they can help review this?

Cheers!

Carsonbot

Copy link
Member

@jderussejderusse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I like this feature!.

Beware in your example, flushing in not an option on consumer stop

-WorkerStoppedEvent::class =>  'flushIfNeeded',+WorkerStoppedEvent::class =>  'forceFlush',

lyrixx reacted with thumbs up emoji
@lyrixx
Copy link
MemberAuthor

Hello @symfony/mergers

The initial issue got 8 👍🏼 and, this PR got 5 👍🏼 and@jderusse liked this feature.

So without feedback, I'm planning to merge this PR by the end of the month, before the feature freeze.
If you are against it, it's time to raise your voice :)

okwinza and franek reacted with thumbs up emoji

@okwinza
Copy link
Contributor

This is indeed a feature I would like to see in 5.4. 👍

Btw,@lyrixx any chance you could also take a look at my#42335 before the feature freeze is upon us? 😅

@lyrixx
Copy link
MemberAuthor

I have rebased the PR (to fix conflict on CHANGELOG.md file)

Copy link
Member

@nicolas-grekasnicolas-grekas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This makes handlers aware of the transport layer, which is something that we refused to do as of now, see eg#42005.
By doing so, this approach is also giving the responsibility of buffering to handlers. I'm wondering if handlers are the right place to do that. I don't think so for now.

Instead, what about adding a middleware that does the buffering, and communicates with the worker via a stamp as done here? Buffering would be enabled/disabled by either setting a limit on the number of messages per batch, or by injecting a buffering decision strategy if we want something super flexible.

BTW, how are we sure that messages are flushed eventually, when we don't know when messages can arrive? Should we inject a delayed message that would trigger the flush if nothing happened before it?

About naming, "DelayedAckStamp" made me think: "delayed? when?". Maybe this should be named "DisableAutoAckStamp"?

@lyrixx
Copy link
MemberAuthor

Instead, what about adding a middleware that does the buffering, and communicates with the worker via a stamp as done here? Buffering would be enabled/disabled by either setting a limit on the number of messages per batch, or by injecting a buffering decision strategy if we want something super flexible.

Hmm, indeed it might be possible. But ATM, I fail to see how the middleware could actually call the "code that process the buffer". Should we introduce a new method (via an interface) on the handler (Like it's done on monolog handler)?

So basically, you want me to move the theConfigurableAutoAckInterface::isAutoAckDisabled() to a middleware instead of the handler itself ? (I'm asking to be sure I fully understand)

BTW, how are we sure that messages are flushed eventually, when we don't know when messages can arrive? Should we inject a delayed message that would trigger the flush if nothing happened before it?

As you can see in the PR body, user need to register also a listener but a timer to flush after a configured time. But this could leave in the middleware

About naming, "DelayedAckStamp" made me think: "delayed? when?". Maybe this should be named "DisableAutoAckStamp"?

I agree 👍🏼


But I see a drawback with your idea: a new middleware implies a new bus because one doesn't want to buffer all message:https://symfony.com/doc/current/messenger.html#middleware

So publishing a message will be more complexe because one will have to choose the right buses when dispatching the message. IMHO, it's error prone.

So you want to move the "handlers [is] aware of the transport layer" to the publisher is aware of the transport layer. Is it really better?

@nicolas-grekas
Copy link
Member

how the middleware could actually call the "code that process the buffer".

can't the middleware call the handler many times in a loop, once per buffered message?
the middleware would do that when the Nth message arrives (aka when the buffer is full).

So basically, you want me to move the the ConfigurableAutoAckInterface::isAutoAckDisabled() to a middleware instead of the handler itself ?

I think we won't need any new interface if we do this. The stamp might be enough.

a new middleware implies a new bus because one doesn't want to buffer all message

we could have a config option that gives the size of the buffer, and if the size is > 1, then we add this middleware? (or we remove it if it's <= 1? or we make it a no-op if we can't skip registering it).

@lyrixx
Copy link
MemberAuthor

can't the middleware call the handler many times in a loop, once per buffered message?
the middleware would do that when the Nth message arrives (aka when the buffer is full).

nope, because you don't know how it's the "last message". You need all messages at once.

I think we won't need any new interface if we do this. The stamp might be enough.

You don't have access to the stamp in the handler. Adding it is what you don't want.

we could have a config option that gives the size of the buffer, and if the size is > 1, then we add this middleware? (or we remove it if it's <= 1? or we make it a no-op if we can't skip registering it).

I don't understand. Middlerware a setup for a busses. not for a transport or handler. See the doc to better understand.

Copy link
Member

@nicolas-grekasnicolas-grekas left a comment
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

From the PR description:

__invoke(HelloMessage $message, Envelope $envelope)

the envelope is not passed as 2nd arg so this doesn't work it would be in this PR :)
to be well thought as we rejected doing so in the past

$envelope->last(ReceivedStamp::class)->getTransportName() === 'async';

hardcoding the name of the transport in the class should be really avoided

I'm putting this on hold while looking for alternative approaches.

@nicolas-grekas
Copy link
Member

Here is another proposal on the topic:#43354

onEXHovia reacted with thumbs up emoji

@nicolas-grekas
Copy link
Member

Closing in favor of#43354, thanks for pushing this@lyrixx!

@lyrixxlyrixx deleted the messenger-batch branchOctober 28, 2021 07:15
fabpot added a commit that referenced this pull requestOct 30, 2021
…las-grekas)This PR was merged into the 5.4 branch.Discussion----------[Messenger] allow processing messages in batches| Q             | A| ------------- | ---| Branch?       | 5.4| Bug fix?      | no| New feature?  | yes| Deprecations? | no| Tickets       |#36910| License       | MIT| Doc PR        | -This replaces#42873 as it proposes an alternative approach to handling messages in batch.`BatchHandlerInterface` says it all: if a handler implements this interface, then it should expect a new `$ack` optional argument to be provided when `__invoke()` is called. When `$ack` is not provided, `__invoke()` is expected to handle the message synchronously as usual. But when `$ack` is provided, `__invoke()` is expected to buffer the message and its `$ack` function, and to return the number of pending messages in the batch.Batch handlers are responsible for deciding when they flush their buffers, calling the `$ack` functions while doing so.Best reviewed [ignoring whitespaces](https://github.com/symfony/symfony/pull/43354/files?w=1).Here is what a batch handler might look like:```phpclass MyBatchHandler implements BatchHandlerInterface{    use BatchHandlerTrait;    public function __invoke(MyMessage $message, Acknowledger $ack = null)    {        return $this->handle($message, $ack);    }    private function process(array $jobs): void    {        foreach ($jobs as [$job, $ack]) {            try {                // [...] compute $result from $job                $ack->ack($result);            } catch (\Throwable $e) {                $ack->nack($e);            }        }    }}```By default, `$jobs` contains the messages to handle, but it can be anything as returned by `BatchHandlerTrait::schedule()` (eg a Symfony HttpClient response derived from the message, a promise, etc.).The size of the batch is controlled by `BatchHandlerTrait::shouldProcess()` (defaults to 10).The transport is acknowledged in batch, *after* the bus returned from dispatching (unlike what is done in#42873). This is especially important when considering transactions since we don't want to ack unless the transaction committed successfully.By default, pending batches are flushed when the worker is idle and when it stops.Commits-------81e52b2 [Messenger] allow processing messages in batches
Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment

Reviewers

@nicolas-grekasnicolas-grekasnicolas-grekas requested changes

@srozesrozeAwaiting requested review from sroze

@jderussejderusseAwaiting requested review from jderusse

@chalasrchalasrAwaiting requested review from chalasr

@TobionTobionAwaiting requested review from Tobion

@xabbuhxabbuhAwaiting requested review from xabbuh

@fabpotfabpotAwaiting requested review from fabpot

+1 more reviewer

@okwinzaokwinzaokwinza left review comments

Reviewers whose approvals may not affect merge requirements

Assignees

No one assigned

Projects

None yet

Milestone

5.4

Development

Successfully merging this pull request may close these issues.

[Messenger] Consume batch of messages (manually, instead of worker), consuming multiple messages at once, BULK message processing at once

5 participants

@lyrixx@carsonbot@okwinza@nicolas-grekas@jderusse

[8]ページ先頭

©2009-2025 Movatter.jp