[Messenger] Add receiving of old pending messages (redis) #35384

Merged: fabpot merged 1 commit into symfony:master from toooni:claim_abandoned_messages on Feb 8, 2020
Conversation

toooni (Contributor) commented Jan 19, 2020

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| Tickets       |
| License       | MIT
| Doc PR        | symfony/symfony-docs#12976

This PR makes it possible for the Redis transport to pick up abandoned messages from stopped or idle consumers by using `XPENDING` and `XCLAIM`.
Ideally, claiming pending messages would be handled by a separate command. Since the Messenger component's commands are fixed, we need to set a `claimTimeout` instead. The `claimTimeout` defines how long an idle message is left alone before it is claimed by the current consumer. It must be higher than the longest time any message takes to handle, or else the message will be handled twice.
With this solution, the remarks in symfony/symfony-docs#11869 (review) about not being able to use the hostname as the consumer name become obsolete. I would even recommend the hostname as the consumer name.

Questions

  • Which value should we use as the default `claimTimeout`?
  • How should the `claimTimeout` be configured?
  • Feature or Bugfix?

I will create a docs PR and a PR for the other branches as soon as the questions are resolved.

toooni requested a review from sroze as a code owner on January 19, 2020 08:32
toooni changed the title from "Use xpending and xclaim to retrieve abandoned messages" to "[Messenger] Redis Transport: Use XPENDING and XCLAIM to retrieve abandoned messages" on Jan 19, 2020
chalasr added this to the 4.4 milestone on Jan 19, 2020
chalasr (Member) commented:

Thanks for the PR. I'll find time to give my thoughts asap.
ping @alexander-schranz also

alexander-schranz (Contributor) commented Jan 21, 2020

As discussed with you on Slack, @toooni, this should be moved into a function call inside the following `if` block:

    if ($this->couldHavePendingMessages) {
        $messageId = '0'; // will receive consumers pending messages
    }

so e.g.:

    if ($this->couldHavePendingMessages) {
        if ($this->isRandomConsumer) {
            $this->claimOldMessages(); // this function does the xpending and xclaim
            // the logic after this doesn't need to change as it will read the correct messages
        }
        $messageId = '0'; // will receive consumers pending/claimed messages
    }

As your flag requires a unique consumer name, you should generate it in the constructor of the connection, here:

    $this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];

e.g.:

    if ($configuration['randomconsumer'] === 'true') {
        $this->consumer .= '_'.uniqid();
        $this->isRandomConsumer = true;
    }

Currently you will still end up in a race condition: as long as both processes use the same consumer name, both will claim the same messages.

`couldHavePendingMessages` should be `true` at the start, and old messages should only be claimed then, not in every loop. So your PR should not touch that flag once you move the logic into `claimOldMessages()` as described above.

The whole logic should be behind a flag, e.g. `randomConsumer=true`, as it is a BC break in how messages are consumed: currently, with one consumer, you can be 100% sure that messages are handled in the order they were created, and this can no longer be guaranteed. As I wrote, it is bad practice to spam Redis with a new consumer on every connection, so it is not what I recommend, but I understand that for some people it is annoying to set a unique consumer name themselves. If you want to start multiple processes on the same server, my recommendation is still to use something like the following instead of a random consumer that claims other consumers' messages:

    CONSUMER_NAME=consumer1 bin/console messenger:consume ...
    CONSUMER_NAME=consumer2 bin/console messenger:consume ...
    CONSUMER_NAME=consumer3 bin/console messenger:consume ...

And for Kubernetes, a StatefulSet should be used. This implementation is something I would only use if you have no control over the env with which the consumers are started and you want to have multiple consumers.
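A stable consumer name can come either from an explicit environment variable, as recommended in the comment above, or from the hostname, as suggested in the PR description. The following is only a minimal sketch of that fallback; `resolveConsumerName()` is a hypothetical helper and not part of the Messenger component.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: prefers an explicitly configured consumer name and
 * falls back to the hostname when none is given.
 */
function resolveConsumerName(?string $configured, string $hostname): string
{
    $configured = null === $configured ? '' : trim($configured);

    return '' !== $configured ? $configured : $hostname;
}

// CONSUMER_NAME is the variable name used in the comment above.
$envValue = getenv('CONSUMER_NAME');
$consumerName = resolveConsumerName(
    false === $envValue ? null : $envValue,
    (string) gethostname()
);

echo $consumerName, PHP_EOL;
```

Note that the hostname alone is only unique as long as a single consumer process runs per host; several processes on one server would still need distinct names.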

toooni (Contributor, Author) commented Jan 21, 2020

@alexander-schranz I don't think so. With the PR the flow looks like the following (It's not everything we talked about):

  • `couldHavePendingMessages` is `false` by default
  • `XPENDING` checks for pending messages
  • if there is a pending message for the current consumer, `couldHavePendingMessages` is set to `true`
  • if there is a pending message from another consumer whose idle time is higher than the `claimTimeout`, the message gets claimed (the `XPENDING` check on the next run will return this message again, now for the current consumer, which then sets `couldHavePendingMessages` to `true`)
  • depending on `couldHavePendingMessages`, the ID `0` or `>` is used to get a pending message or the next message in the queue
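The flow in the list above can be modelled without a Redis server. This is a rough sketch, not the PR's code: `planNextRead()` is a hypothetical function, and the entries are assumed to be shaped like the rows of the extended `XPENDING` reply, reduced to `[id, owner, idleMs]`.

```php
<?php
declare(strict_types=1);

/**
 * Redis-free model of one get() call as described in the list above.
 * Hypothetical helper: entries mimic XPENDING rows as [id, owner, idleMs].
 *
 * @return array{0: string, 1: string[]} the ID to read with and the IDs to claim
 */
function planNextRead(array $pendingEntries, string $consumer, int $claimTimeoutMs): array
{
    $couldHavePendingMessages = false; // false by default
    $idsToClaim = [];

    foreach ($pendingEntries as [$id, $owner, $idleMs]) {
        if ($owner === $consumer) {
            $couldHavePendingMessages = true;
        } elseif ($idleMs > $claimTimeoutMs) {
            $idsToClaim[] = $id; // a real transport would issue XCLAIM for these
        }
    }

    return [$couldHavePendingMessages ? '0' : '>', $idsToClaim];
}

// First run: the message still belongs to a consumer that no longer exists.
[$firstId, $firstClaims] = planNextRead([['1-0', 'old-host', 7200000]], 'new-host', 3600000);

// Next run: after the claim, XPENDING reports the message for the current consumer.
[$secondId, $secondClaims] = planNextRead([['1-0', 'new-host', 0]], 'new-host', 3600000);
```

The two calls show why a claimed message is only read on the following run: the first call claims it while still reading with `>`, and the second call switches to `0` because the message now belongs to the current consumer.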

This PR currently has nothing to do with random consumer names. I intentionally didn't include this in the PR because it's not directly related to this. The benefit of this PR is that no messages are being lost - even if you are using random consumer names or for example the hostname as the consumer name (or you are just switching to a lower number of consumers). Scaling consumers up or down without this feature is not really possible.

Running `XPENDING` on every `get()` call isn't very good for performance, I know. But it makes the whole transport safe against losing messages, whatever you do (like the delay implementation).
The best solution would be for `XPENDING` with `XCLAIM` to be called regularly by a second command, but unfortunately this isn't possible.
Skipping `XPENDING` on some `get()` calls and using a timeout or counter to run it only periodically does not work, because the consume command may be run with low `time-limit`, `memory-limit` or `limit` options.

alexander-schranz (Contributor) commented Jan 22, 2020

@toooni currently a message is never lost, because on restart `couldHavePendingMessages` is `true` and the consumer will read all messages which are not acked. So this statement is not true when the transport is used the correct way.

Using the implementation you provided without a random consumer name could end up in the same error you have in your issue: if 2 consume commands start at the same time, there is a race condition between `xpending` and `xclaim`. Both processes would have the same consumer name, so they would process the same messages, and `xclaim` doesn't do anything in this case because the old messages belong to the same consumer name. So the only thing that changes is that we stop reading crashed pending messages immediately after the crash, which makes the exception a little less common because it happens later, at the time-limit. But it can still happen there if the processes start at the same time, so for me this is not a fix for that issue.

That said, if we really want to implement it this way, we should move the `xpending`/`xclaim` part into a function and not call it on every `get`. I would say on the first call of `get`, and then when the time is reached or a pending message could be old enough to be claimed.

alexander-schranz (Contributor) commented Jan 22, 2020

I tested a little bit and think we should implement it this way then, @toooni:

We only claim messages if there are no pending messages, so we don't change the old behaviour.
So on first start it will behave like before: it first reads its own pending messages and finishes them. After that, `couldHavePendingMessages` is `false` and it will claim messages of other consumers. With the `claimInterval` we avoid calling this on every `get`, and a high `claimInterval` also makes it possible to disable the behaviour.

    private $couldHavePendingMessages = true;
    private $lastClaim = 0;
    private $claimInterval = 1000; // not sure in which interval we want to check for claimable messages, maybe every 10 seconds? parameter should be configurable: ?claim-interval=

    public function get()
    {
        // ...
        // claim only once the interval since the last claim has elapsed
        if (!$this->couldHavePendingMessages && ($this->lastClaim + $this->claimInterval) <= time()) {
            $this->claimOldPendingMessages();
        }

        $messageId = '>'; // will receive new messages
        if ($this->couldHavePendingMessages) {
            $messageId = '0'; // will receive pending messages
        }
        // ...
    }

    private function claimOldPendingMessages()
    {
        try {
            // This could soon be optimized with https://github.com/antirez/redis/issues/5212 or
            // https://github.com/antirez/redis/issues/6256
            $pendingMessages = $this->connection->xpending($this->stream, $this->group, '-', '+', 3);
        } catch (\RedisException $e) {
            throw new TransportException($e->getMessage(), 0, $e);
        }

        $claimableIds = [];
        foreach ($pendingMessages as $pendingMessage) {
            if ($pendingMessage[1] === $this->consumer) {
                $this->couldHavePendingMessages = true;

                continue;
            }

            if ($pendingMessage[2] < (time() - $this->redeliveryTime)) {
                $claimableIds[] = $pendingMessage[0];
            }
        }

        if (\count($claimableIds) > 0) {
            try {
                $this->connection->xclaim(
                    $this->stream,
                    $this->group,
                    $this->consumer,
                    $this->claimTimeout,
                    $claimableIds,
                    ['JUSTID']
                );

                $this->couldHavePendingMessages = true;
            } catch (\RedisException $e) {
                throw new TransportException($e->getMessage(), 0, $e);
            }
        }

        $this->lastClaim = time();
    }
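The interval check itself can be isolated from Redis and tested with an explicit clock. The sketch below is an illustration only: `ClaimScheduler` is a hypothetical class, and it treats a claim as due once the last claim time plus the interval is less than or equal to the current time.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: tells the caller whether a claim check is due.
 * The current time is passed in explicitly so the behaviour can be tested.
 */
final class ClaimScheduler
{
    /** @var int */
    private $intervalSeconds;

    /** @var int */
    private $lastClaim = 0;

    public function __construct(int $intervalSeconds)
    {
        $this->intervalSeconds = $intervalSeconds;
    }

    public function isDue(int $now): bool
    {
        return $this->lastClaim + $this->intervalSeconds <= $now;
    }

    public function markClaimed(int $now): void
    {
        $this->lastClaim = $now;
    }
}

$scheduler = new ClaimScheduler(10);
$results = [];
foreach ([100, 105, 110, 111] as $now) {
    $due = $scheduler->isDue($now);
    if ($due) {
        $scheduler->markClaimed($now); // the transport would run XPENDING/XCLAIM here
    }
    $results[] = $due;
}
```

With a 10-second interval, only the calls at 100 and 110 trigger a claim check; the calls at 105 and 111 are skipped.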

With that claimInterval it will be possible to disable the behaviour if somebody is using a

toooni (Contributor, Author) commented:

Just for traceability:

> currently never a message is lost because on restart the couldHavePendingMessages is true and will read all messages which are not acked so this statement is not true when using it the correct way.

The messages are lost if there won't be a consumer with the same name again. This isn't only an issue if you want to use random names but also if you want to scale your workers up and down.

alexander-schranz (Contributor) commented:

@toooni

> The messages are lost if there won't be a consumer with the same name again. This isn't only an issue if you want to use random names but also if you want to scale your workers up and down.

Oh yeah, you are totally correct about that use case 👍 Some little changes, then it looks good to me.

alexander-schranz (Contributor) left a comment:

@toooni thanks for your time and patience. The code looks good from my side; I need to test it in the evening.

alexander-schranz (Contributor) commented Jan 22, 2020

@toooni as the `xpending` response doesn't seem to look the same for me, I needed to change how the message timestamp is read. I also needed to disable the `claimIdleTime` for the integration test, as Symfony's ClockMock has no effect on the Redis min-idle-time used by `xclaim`.

    diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
    index 537c3fba68..04a61e0e9c 100644
    --- a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
    +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
    @@ -32,6 +32,7 @@ class RedisExtIntegrationTest extends TestCase

             $this->redis = new \Redis();
             $this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
    +        $this->connection->disableClaimIdleTime(true);
             $this->connection->cleanup();
             $this->connection->setup();
         }
    @@ -104,4 +105,36 @@ class RedisExtIntegrationTest extends TestCase
             $this->assertEquals($body, $encoded['body']);
             $this->assertEquals($headers, $encoded['headers']);
         }
    +
    +    public function testConnectionGetClaimMessages()
    +    {
    +        $body = '{"message": "Hi"}';
    +        $headers = ['type' => DummyMessage::class];
    +
    +        $this->connection->add($body, $headers);
    +
    +        // Read message with other consumer
    +        $messages = $this->redis->xreadgroup(
    +            $this->connection->getGroup(),
    +            'other-consumer2',
    +            [$this->connection->getStream() => '>'],
    +            1
    +        );
    +
    +        // Queue will not have any messages yet
    +        $this->assertNull($this->connection->get());
    +
    +        // Wait for next claim check
    +        sleep(1001);
    +
    +        // Should still be empty
    +        $this->assertNull($this->connection->get());
    +
    +        // Wait for redelivery timeout
    +        sleep(2600);
    +
    +        $encoded = $this->connection->get();
    +        $this->assertEquals($body, $encoded['body']);
    +        $this->assertEquals($headers, $encoded['headers']);
    +    }
     }
    diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
    index fea1bb8cbf..959e6b6a8f 100644
    --- a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
    +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
    @@ -48,6 +48,7 @@ class Connection
         private $redeliverTimeout;
         private $nextClaim = 0;
         private $claimInterval;
    +    private $disabledClaimIdleTime = false;
         private $couldHavePendingMessages = true;

         public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
    @@ -74,7 +75,7 @@ class Connection
             $this->queue = $this->stream.'__queue';
             $this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
             $this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
    -        $this->redeliverTimeout = ($configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']) * 1000;
    +        $this->redeliverTimeout = $configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout'];
             $this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
         }
    @@ -167,6 +168,7 @@ class Connection
             }

             $claimableIds = [];
    +
             foreach ($pendingMessages as $pendingMessage) {
                 if ($pendingMessage[1] === $this->consumer) {
                     $this->couldHavePendingMessages = true;
    @@ -174,18 +176,26 @@ class Connection
                     return;
                 }
    -            if ($pendingMessage[2] < (time() - $this->redeliverTimeout)) {
    +            $messageTimestampInSeconds = (int) (intval($pendingMessage[0]) / 1000);
    +            if ($messageTimestampInSeconds < (time() - $this->redeliverTimeout)) {
                     $claimableIds[] = $pendingMessage[0];
                 }
             }

             if (\count($claimableIds) > 0) {
    +            $claimIdleTime = ($this->redeliverTimeout * 1000);
    +
    +            if ($this->disabledClaimIdleTime) {
    +                // For tests it needed to disable the idle timeout
    +                $claimIdleTime = 0;
    +            }
    +
                 try {
                     $this->connection->xclaim(
                         $this->stream,
                         $this->group,
                         $this->consumer,
    -                    $this->redeliverTimeout,
    +                    $claimIdleTime,
                         $claimableIds,
                         ['JUSTID']
                     );
    @@ -388,9 +398,24 @@ class Connection
             return (int) (microtime(true) * 1000);
         }
    +    public function disableClaimIdleTime(): void
    +    {
    +        $this->disabledClaimIdleTime = true;
    +    }
    +
         public function cleanup(): void
         {
             $this->connection->del($this->stream);
             $this->connection->del($this->queue);
         }
    +
    +    public function getStream(): string
    +    {
    +        return $this->stream;
    +    }
    +
    +    public function getGroup(): string
    +    {
    +        return $this->group;
    +    }
     }
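The diff reads the message timestamp from the stream entry ID. Auto-generated Redis stream IDs have the form `<milliseconds>-<sequence>`, so the creation time can be recovered from the part before the dash. The following is a small sketch of that idea; `streamIdToSeconds()` and `isOlderThan()` are hypothetical helper names, and the code assumes a 64-bit PHP build so that millisecond timestamps fit into an integer.

```php
<?php
declare(strict_types=1);

/**
 * Extracts the creation time (in seconds) from a Redis stream entry ID.
 * Auto-generated IDs have the form "<milliseconds>-<sequence>".
 */
function streamIdToSeconds(string $id): int
{
    $milliseconds = (int) explode('-', $id, 2)[0];

    return intdiv($milliseconds, 1000);
}

/**
 * Hypothetical check mirroring the comparison in the diff: is the entry
 * older than the redeliver timeout, measured from its creation time?
 */
function isOlderThan(string $id, int $redeliverTimeoutSeconds, int $now): bool
{
    return streamIdToSeconds($id) < $now - $redeliverTimeoutSeconds;
}

$seconds = streamIdToSeconds('1579600000123-0');
$old = isOlderThan('1579600000123-0', 3600, 1579603601);
$fresh = isOlderThan('1579600000123-0', 3600, 1579603600);
```

Keep in mind that the ID encodes when the entry was added to the stream, not how long it has been idle; the idle time is what Redis itself compares against the min-idle-time argument of `XCLAIM`.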

toooni (Contributor, Author) commented:

@chalasr @alexander-schranz I've updated the `claim_interval` default value to 1 minute. It doesn't make sense to have it lower if we have a default `redeliver_timeout` of 1 hour.
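As an illustration of these two defaults, the options could be passed as query parameters on the transport DSN. This is a sketch under assumptions: the host and stream name are made up, and the units (seconds for `redeliver_timeout`, milliseconds for `claim_interval`) are how I understand the released documentation rather than something stated in this thread.

```php
<?php
declare(strict_types=1);

// Assumed units: redeliver_timeout in seconds, claim_interval in milliseconds.
$options = [
    'redeliver_timeout' => 3600, // 1 hour
    'claim_interval' => 60000,   // 1 minute
];

// Hypothetical host and stream name.
$dsn = 'redis://localhost:6379/messages?'.http_build_query($options, '', '&');

echo $dsn, PHP_EOL;
```

Keeping `claim_interval` well below `redeliver_timeout` means an abandoned message is picked up soon after it becomes claimable, without checking on every `get()` call.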

chalasr (Member) commented Feb 2, 2020

> they might want to set their own values for these options.

Which would mean that they are using this change as a new feature, since they would need to update their code. Upgrading to patch versions should not involve code changes.

Sorry to insist, but if we are not confident that the default values are good enough for everyone as a bugfix, then this PR should target master.
Inlining the default values for 3.4 and exposing these options as a new feature on master (and document it on master as well) is the right tradeoff IMHO.

toooni (Contributor, Author) commented:

I will change the PR to target the `master` branch. But I don't think it makes sense to introduce these changes to 3.4 if there is no way to change the default values.

toooni changed the base branch from 4.4 to master on February 3, 2020 15:25
toooni changed the title from "[Messenger] Fix receiving of old pending messages (redis)" to "[Messenger] Add receiving of old pending messages (redis)" on Feb 3, 2020

toooni (Contributor, Author) commented:

@chalasr The PR now targets the master branch 👍

chalasr modified the milestones: 4.4, next on Feb 4, 2020
chalasr added the Feature label and removed the Bug label on Feb 4, 2020

toooni (Contributor, Author) commented:

@chalasr I've rebased again, but Travis CI is still failing because of another `master` branch issue.

fabpot (Member) commented:

Thank you @toooni.

fabpot added a commit that referenced this pull request on Feb 8, 2020:

…is) (toooni)

This PR was merged into the 5.1-dev branch.

Discussion
----------

[Messenger] Add receiving of old pending messages (redis)

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| Tickets       |
| License       | MIT
| Doc PR        | symfony/symfony-docs#12976

This PR makes it possible for the redis transport to get abandoned messages from not running/idle consumers by using `XPENDING` and `XCLAIM`.
Usually it would be best to let the claiming of pending messages be handled by a separate command. Since the messenger component's commands are fixed, we do need to set a `claimTimeout`. The `claimTimeout` defines how long an idle message should be left alone until it will get claimed by the current consumer (Must be a value higher than the longest running handling time of a message or else the message will be handled twice).
Using this solution makes the remarks (symfony/symfony-docs#11869 (review)) regarding not being able to use the hostname as consumer name obsolete. I would even recommend the hostname as the consumer name.

**Questions**
- [x] Which value should we use as default `claimTimeout`?
- [x] How should the `claimTimeout` be configured?
- [x] Feature or Bugfix?

I will create a docs PR and a PR for the other branches as soon as the questions are resolved.

Commits
-------

9cb6fdf Implemted receiving of old pending messages

fabpot merged commit 9cb6fdf into symfony:master on Feb 8, 2020

chalasr added a commit that referenced this pull request on Feb 11, 2020:

This PR was merged into the 5.1-dev branch.

Discussion
----------

[Messenger][Redis] Add missing changelog entry

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | no
| Deprecations? | no
| Tickets       | -
| License       | MIT
| Doc PR        | -

Missed in #35384

Commits
-------

08fb0c4 [Messenger][Redis] Add missing changelog entry

nicolas-grekas modified the milestones: next, 5.1 on May 4, 2020
fabpot mentioned this pull request on May 5, 2020
toooni deleted the claim_abandoned_messages branch on July 3, 2020 13:46
cesurapp (Contributor) commented:

Is this feature implemented for Symfony 4.4?

alexander-schranz (Contributor) commented Aug 13, 2020

Hello @appaydin,

no, this is part of 5.1, as you can see in the Release 5.1 reference linked above. There is also another way to find out which release contains a merged pull request: on GitHub, click on the merge commit (look for the merge symbol) and then on the commit ID, in this case 9cb6fdf:

[Screenshot 2020-08-13 at 10:04:43]

At the top of the commit page, expanding the "..." shows which versions contain the commit:

[Screenshot 2020-08-13 at 10:06:34]

cesurapp (Contributor) commented:

> Hello @appaydin,
>
> no this is part of 5.1 as you see linked above in the Release 5.1. But there is also another way to find out in which release a merged pull request is by clicking in github on the merge commit in this case 9cb6fdf:
>
> And then you see there at the top when you expand the ... in which versions the commit is:

Thanks. It is a pity that it is not available for 4.4.

stof (Member) commented:

New features are never backported to patch releases.

cesurapp (Contributor) commented:

> New features are never backported to patch releases.

In my opinion this should not be limited to the new version, because I am getting this error right now. I have implemented a workaround by increasing the `startsecs` and `startretries` values in Supervisor.

alexander-schranz (Contributor) commented:

@appaydin this is a feature for scaling down consumers. As Redis recommends, you should always reuse consumer names and make sure that each consumer name is unique. Don't use random consumer names; Redis does not recommend that, as it bloats Redis' memory usage. As long as you don't need to scale down consumers, you should not need this feature of receiving other consumers' messages.

wouterj added a commit to symfony/symfony-docs that referenced this pull request on Oct 23, 2020:

…ooni)

This PR was submitted for the master branch but it was squashed and merged into the 5.1 branch instead.

Discussion
----------

Added redeliver_timeout and claim_interval options

Adds new optional config parameters introduced by symfony/symfony#35384

Commits
-------

8fe20c3 Added redeliver_timeout and claim_interval options
Reviewers: alexander-schranz (approved these changes), chalasr (approved these changes), sroze (awaiting requested review)
Milestone: 5.1
8 participants: @toooni, @chalasr, @alexander-schranz, @fabpot, @cesurapp, @stof, @nicolas-grekas, @carsonbot