Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

[Messenger] Add a redis transport#28681

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Closed
soyuka wants to merge1 commit intosymfony:masterfromsoyuka:redis-messenger

Conversation

@soyuka
Copy link
Contributor

QA
Branch?master
Bug fix?no
New feature?yes
BC breaks?no
Deprecations?no
Tests pass?not yet added!
Fixed ticketsn/a
LicenseMIT
Doc PRTODO symfony/symfony-docs#...

Hi!

This patch adds a redis transport to the Messenger component by using the official php redis-ext (https://github.com/phpredis/phpredis). It's a port ofhttps://github.com/soyuka/symfony-messenger-redis to be included directly in symfony.

How it works

Relevant discussion:https://twitter.com/jderusse/status/980768426116485122

The sender uses aList withRPUSH (add value to the tail of the list).
The receiver usesBRPOPLPUSH which reads the last element of the list and adds in to the head of another list (queue_processing). If no elements are present it'll block the connection until a new element shows up or the timeout is reached. When timeout is reached it works like a "ping" of some sort (calls$handler(null)).

On every iteration, we will check thequeue_processing list. For every items in this queue we have a correspondingkey in redis with a giventtl.
If the key has expired, the item isLREM (removed) fromqueue_processing and put back in the origin queue to be processed again. This workaround helps to avoid loosing messages.

Difference with AmqpExt

I'm proposing this feature as a Work In Progress because I'm not sure how I should handle the queue.
Indeed, in AmqpExt the queue is tight to the Connection whereas here, a queue should be linked to a message.
It can work by using the same queue for different messages but I'd not advise to do this because it's messy if you need to maintain your queues afterwards (for example if you want to remove only the queue for messageX you should just remove queueX).

What do you think? I can make the queue resilient inside theConnection class so that the code is closer toAMQPExt\Connection (means opening 1 connection per messages) or keep the queue inside theReceiver/Sender (one connection only).

When we find an agreement about this I'll add some tests and remove my WIP flag.

Thanks!

ragboyjr, dunglas, yceruto, and alexander-schranz reacted with thumbs up emoji
@ragboyjr
Copy link
Contributor

@soyuka This isn't necessarily directed at you, but I think just re-queing failed items or items that have been left in processing too long isn't a very safe way to go about this.

In my experience, when a message/task fails, it almost always fails again when re-run, and you end up just having 6 failed attempts instead of the one. And in some cases, if your system is connecting to external systems, it could end up causing unwanted duplicate data.

Not saying that retrying isn't a useful feature, but I think it should be something that should at least have the option to be turned off or maybe put into a failed jobs log or table.

soyuka and chalasr reacted with thumbs up emoji

* Takes last element (tail) of the list and add it to the processing queue (head - blocking)
* Also sets a key with TTL that will be checked by the `doCheck` method.
*/
public function waitAndGet(string $queue, int $processingTtl = 10000, int $blockingTimeout = 1000): ?array
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I actually made my own redis receiver for my company and found out that you can't use a blocking timeout over 60 seconds. It looks like redis ext has a bug with blocking timeouts around 60s and will eventually throw an exception with bad data sent over the network. I've stress tested this type of redis queue processor with a blocking timeout and never had any issues with 30s.

Copy link
ContributorAuthor

@soyukasoyukaOct 2, 2018
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Cool, are you suggesting that we should add some kind of test to ensure that the ttl is < 60 seconds? Do you set the\Redis::OPT_READ_TIMEOUT to-1?

Oh I see it's configurable in your bundle, I propose that we keep this value configurable but with a guard so that it doesn't exceed 60s.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Y, I'm not entirely sure where that threshold lies. Is it 55? 56? it's like right around 60 seconds when it starts to happen, YMMV. So not sure what's the best way to warn the user via an exception other than give them a verbal warning when reading the docs.

soyuka reacted with thumbs up emoji
@ragboyjr
Copy link
Contributor

https://github.com/krakphp/symfony-messenger-redis Here is the redis bundle I made and I admittedly copied several aspects of the redis part between thehttps://github.com/krakphp/job and your bundle.

@soyuka
Copy link
ContributorAuthor

but I think it should be something that should at least have the option to be turned off or maybe put into a failed jobs log or table.

Definitely!

I see that in your bundle you're using one queue per connection, therefore you are storing different kinds of messages on the same queue. Isn't this an issue while debugging or maintaining these lists? Do you have an opinion about this?
Thanks for the early comments!

@ragboyjr
Copy link
Contributor

@soyuka Well no because it's more like one queue per transport.

framework:  messenger:    transports:      main:        dsn: '%env(TRANSPORT_DSN)%'        options:          queue: queue_main      secondary:        dsn: '%env(TRANSPORT_DSN)%'        options:          queue: queue_secondary    routing:      App\MainMessage: main      App\SecondaryMessage: secondary

That would be an example configuration. This definitely allows multiple types of messages for a single queue/transport, but that just depends on how you want to scale out your transports. Some not so frequent messages can go on the main queue, but maybe a heavy traffic message might go on it's own so it can have a dedicated consumer and not get blocked.

soyuka reacted with thumbs up emoji

@soyukasoyukaforce-pushed theredis-messenger branch 2 times, most recently from9d1eed3 to7bf5a42CompareOctober 3, 2018 16:15
@soyukasoyuka changed the title[Messenger] WIP: Add a redis transport[Messenger] Add a redis transportOct 3, 2018
@soyuka
Copy link
ContributorAuthor

@soyuka This isn't necessarily directed at you, but I think just re-queing failed items or items that have been left in processing too long isn't a very safe way to go about this.

I've decided to follow@sroze implementation for Amqp which states that:

If something goes wrong while consuming and handling a message from the Redis broker, there are two choices: rejecting or re-queuing the message.
If the exception that is thrown by the bus while dispatching the message implements this interface (RejectMessageExceptionInterface), the message will be rejected. Otherwise, it will be re-queued.

$connectionCredentials = array(
'host' => $parsedUrl['host'] ?? '127.0.0.1',
'port' => $parsedUrl['port'] ?? 6379,
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Maybe we could also support redis db here as well?

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Connect doesn't seem to support the db though:https://github.com/phpredis/phpredis#connect-open

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

@soyuka may be Im wrong, but phpRedis can deal with databases. Look here how snc_redis do this:
https://github.com/snc/SncRedisBundle/blob/863a063114c68e62fea811c127f093a1ecd5ba9a/Factory/PhpredisClientFactory.php#L73

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Database selecting in phpRedis dochttps://github.com/phpredis/phpredis#select

*/
interface RejectMessageExceptionInterface extends \Throwable
{
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Hmm,@sroze@soyuka Do you think we could make these a common exception instead of per transport?

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Would make sense, especially if we want to switch from one transport to another without changing the code.

}
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

@sroze In#28547 when I mention that the logic is shared across different types of receivers, this is what I meant.

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Taking this path, it'd actually make sense to me to add aConnectionInterface but things may be really different from one transport to another.

throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
}

$queue = isset($parsedUrl['path']) ? trim($parsedUrl['path'], '/') : 'messages';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Curious if it'd be better to put the queue apart of the DSN vs the options? Personally I prefer to use one DSN configured in the .env, and then re use it across multiple transports while changing the queue parameter in options.

Maybe we could support both? allow queue in the query param, but allow it to be overridden via the options?

@ragboyjr
Copy link
Contributor

@soyuka Extension looks great! Thanks for the hard work!

@ragboyjr
Copy link
Contributor

@soyuka in regards to Requeing the message by default and Rejecting if it implements a RejectMessageExceptionInterface

I think it would make more sense to default to Reject, and requeue if they implement a RequeueMessageExceptionInterface; however, that's my opinion for reasons as stated previously in this thread. If we end up sticking with defaulting to Requeue, do you think it would make sense to add a middleware/option in the messenger config to basically wrap all exceptions into a RejectMessageExceptionInterface?

@soyuka
Copy link
ContributorAuthor

do you think it would make sense to add a middleware/option in the messenger config to basically wrap all exceptions into a RejectMessageExceptionInterface?

Looks a bit too magic to me. I definitely get your concerns about requeuing though, and I'd be in favor of having aRequeueMessageExceptionInterface, with the default behavior set to rejecting messages that fail.

@ragboyjr
Copy link
Contributor

@soyuka with the release of redis streams in redis 5.0, i'm wondering if we should target that data structure for a more robust implementation. We also could possibly implement two different redis connections one for streams and one with lists.

soyuka and sroze reacted with thumbs up emoji

@weaverryanweaverryan mentioned this pull requestMar 12, 2019
36 tasks
@weaverryan
Copy link
Member

Ping@soyuka! If you're still willing to work in this, we just merged#30557, which clarifies what a transport needs to do / behave (should simplify the receiver a bit). I would love to see this PR updated!

chalasr reacted with thumbs up emoji

@soyuka
Copy link
ContributorAuthor

soyuka commentedMar 25, 2019
edited
Loading

Will definitely work on this thanks for the heads up, any idea if the related documentation is going to be updated?

/edit: I've looked at the PR, messenger is improving that's nice! I've an issue with the new interface though, because my Connection doesn't uses a Enveloppe for now (inreceive I'm working on a message, not anEnvelope).

@weaverryan
Copy link
Member

@soyuka Awesome!

I've looked at the PR, messenger is improving that's nice! I've an issue with the new interface though, because my Connection doesn't uses a Enveloppe for now (in receive I'm working on a message, not an Envelope).

Can you tell me more about this? Or you can ping me on Slack. Do you mean the methods likeReceiverInterface::ack(Envelope $envelope)? If that's true, it shouldn't be a problem because your receiver is not responsible for calling this anymore - theWorker will call this and it will pass you the Envelope. If you need some sort of "identifier" so you know which message to ack (you will), you should add a stamp to the Envelope before passing it to$handler. Check out the AmqpReceiver. We wrap in a custom stamp (https://github.com/symfony/symfony/blob/master/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php#L73) then use that inack() andreject() to know which message was received, so we can ack/reject it.

@sroze
Copy link
Contributor

groups are awesome if you have multiple systems receiving the same messages

I have no idea what these streams are all about, what is the awesomeness of them in this use-case? When you say that "most things is handled by redis itself" that sounds appealing... but what do you mean exactly? What are the use-cases/logic we wouldn't have to implement anymore?

the only con is that it only works with redis 5 and ext-redis 4.2.

ext-redis 4.2 has been released on the 2018-11-20 so I don't see this as an issue. The 4.3 has also been released and is considered as stable. Though, do you know what's the Redis 5 availability in most of the cloud providers?

soyuka reacted with thumbs up emoji

@soyuka
Copy link
ContributorAuthor

I don't think that streams are complex in case of the php implementation

I just looked at theredis introduction and it seems to have more features compared to using a normal list. This was the complexity I was talking about, not the one from the php implementation.

But I'm nobody so I can't do a decision here, just wanted to mention it here, thought the implemtation could maybe help.

Everyone's thought is appreciated! I wasn't even aware that redis had implemented this feature, so thanks for the discovery and I'll definitely take a look at your implementation!

@ragboyjr
Copy link
Contributor

My only concern with using streams (other than the additional complexity), is that forcing the use of the ext-redis extension can be an issue under heavier workloads.

We were getting intermittent errors and failures on redis consumer with sf messenger and after some investigation, it looks like they are just related to the redis-ext and those errors don't occur on predis.

phpredis/phpredis#831
predis/predis#524 (comment)

After we switched, those intermittent errors all went away..., However, predis doesn't seem actively maintained, It's a good library and seems to be more reliable than phpredis, but nrk/predis hasn't touched that repo in forever. Makes me think that a reliable fork or a new maintainer might be necessary for that library.

@soyuka
Copy link
ContributorAuthor

@ragboyjr if we want this to get merged into symfony we have to use the officialext-redis, we can't introduce a dependency to a third party because of maintenance issues.

chalasr and sroze reacted with thumbs up emoji

@alexander-schranz
Copy link
Contributor

alexander-schranz commentedMar 27, 2019
edited
Loading

I have no idea what these streams are all about, what is the awesomeness of them in this use-case? When you say that "most things is handled by redis itself" that sounds appealing... but what do you mean exactly? What are the use-cases/logic we wouldn't have to implement anymore?

Lets say you have 3 Systems. System A is sending messages and System B and C want to receive this messages. When you want that B and C receive both all messages without redis streams you need to implement that logic yourself or sending them into 2 lists or something (correct me if I'm wrong). With redis streams you can just create 2 groups. one for system b and one for system c and redis will give you the messages which you did not receive yet.

I personally found it really simple adding a new messages to a stream can be done with:

$this->redis->xAdd('STREAMNAME','*', ['content' =>json_encode($encodedMessage)]);

Reading the message can be done with:

$messages =$this->redis->xReadGroup('GROUPB','CONSUMER1', [$this->stream =>0],1,45);

Redis automatically sets the read messages into a pending state for this group so if you example have for the same group multiple consumer they will not get the same message again.

And after you did successfully process the message you mark it in your group as acked:

$this->redis->xAck('STREAMNAME','GROUPB', [$messageId]);

So if you later need a System D also receiving all messages you just create in redis a GroupD and so system D is also starting to receive all messages.

So streams will shine if you have multiple systems receiving messages. If you just have 2 Systems a Sender and a Receiver there will be no differents. But if you have multiple receivers Redis will handle you which group/system did receive a message and which one doesn't.

The following is also a interesting blog post about streams:https://brandur.org/redis-streams.

it seems to have more features compared to using a normal list. This was the complexity I was talking about, not the one from the php implementation.

I think as the streams feature is more build for this usecase less complex because of its features and automatic mechanisms.

But maybe I have to less knowledge about the current implementation and maybe it makes sense to have 2 redis transports one using the redis list's and one using redis streams.

sroze, soyuka, and chirimoya reacted with heart emoji

@sroze
Copy link
Contributor

Indeed, if there is this logic of parking the messages until they are acknowledged, this sounds very interesting because the less we have to deal with these distributed system issues, the better 🙃

soyuka and ragboyjr reacted with thumbs up emoji

@soyuka
Copy link
ContributorAuthor

@alexander-schranz many thanks about this detailed explanation, it helped me to understand better why redis streams may be good for this use case! I'll definitely look into that and use them in this transport implementation!

@ragboyjr
Copy link
Contributor

@soyuka I think the SF Lock and SF Cache component are able to utilize both redis libraries, i'd imagine if we did a redis transport using the list strategy, we should be able to provide some sort of compat layer to using either of the redis libraries.

I'd be willing to help if needed, but as someone who has used a redis transport with messenger at a high volume on both ext-redis and predis, I wouldn't feel comfortable only allowing ext-redis.

@weaverryan
Copy link
Member

Ping! Do we have some direction/motivation/time to finish this? We're after feature freeze, though there is some wiggle room probably because this component is experimental.

@soyuka
Copy link
ContributorAuthor

Ping! Do we have some direction/motivation/time to finish this? We're after feature freeze, though there is some wiggle room probably because this component is experimental.

To me, time is an issue, maybe this week-end but definitely not sooner. Also, the streams must be tested they may improve the transport a lot, which means more work.

@weaverryan
Copy link
Member

To me, time is an issue

I hear that :). Good candidate maybe for the hackathon this weekend, if not by you - by someone else? We could check to see if anyone has the expertise & is interested.

Cheers!

soyuka reacted with thumbs up emoji

@soyuka
Copy link
ContributorAuthor

I'd love to work on this this week end but as I'm the Api Platform referent I'm not sure that I'll be able too :p.

@alexander-schranz
Copy link
Contributor

@weaverryan@soyuka I will have a look at the current state at theEU-FOSSA Hackathon and will then refractor it to use the redis stream functions.

chirimoya reacted with thumbs up emojichirimoya reacted with hooray emojichirimoya reacted with heart emoji

@weaverryan
Copy link
Member

Added the hackathon star - thank you@alexander-schranz!

alexander-schranz reacted with thumbs up emoji

@alexander-schranz
Copy link
Contributor

alexander-schranz commentedApr 6, 2019
edited
Loading

Analysed the current state and after also having a look at the new ReceiverInterface and playing around about it my concept will be the following:

+-----------R|    GET    | -> XREADGROUP+-----------+      |      | handleMessage      V+-----------+  No|  failed?  |---------------------------++-----------+                           |      |                                 |      | Yes                             |      V                                 |+-----------+  No                       ||   retry?  |---------------------------++-----------+                           |      |                                 |      | Yes                             |      V                                 V+-----------R                    +-----------R|   REJECT  | -> XACK            |    ACK    | -> XACK+-----------+                    +-----------+

GET: Will useXREADGROUP to read the one message from the stream
REJECT: Reject will just remove the message withXACK from the stream as adding it back to the stream is handled by symfony worker itself
ACK: Will use theXACK Method to ack the message for the specific group

The sender will still be simple by calling theXADD redis function.

#EU-FOSSA

soyuka reacted with thumbs up emoji

@weaverryan
Copy link
Member

I don't know about the Redis streams part, but I can confirm the rest of the diagram is perfect: you're acking/nacking with the correct logic.

@sroze
Copy link
Contributor

@alexander-schranz great. Fancy updating this pull-request (or creating another one)?

@alexander-schranz
Copy link
Contributor

@sroze If I can I will update this PR so we have all discussion about it here.

soyuka reacted with thumbs up emoji

Copy link
Member

@NyholmNyholm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Cool. I like this. I will test this later.

return null;
}

$key = md5($value['body']);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

sha1 is quicker. I think we should use that one instead.

*/
public function ack($message)
{
$key = md5($message['body']);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

sha1

/**
* Reject the message: we acknowledge it, means we remove it form the queues.
*
* @TODO: log something?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This should be removed or addressed.

*/
public function requeue($message)
{
$key = md5($message['body']);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

sha1

}

/**
* Add item at the tail of list.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This comment is wrong. We are doing a queue and not a stack. =)

We are adding items to thehead of the list.

Usinglpush is correct.

$pending = $this->connection->lRange($processingQueue, 0, -1);

foreach ($pending as $temp) {
$key = md5($temp['body']);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

sha1

@soyuka
Copy link
ContributorAuthor

Please feel free to update it!

Nyholm reacted with heart emoji

@alexander-schranz
Copy link
Contributor

I'm not able to push into@soyuka's fork so I created a new PR#30917
/cc@Nyholm@sroze

@soyuka
Copy link
ContributorAuthor

Closing this one then!

@soyukasoyuka closed thisApr 6, 2019
fabpot added a commit that referenced this pull requestApr 27, 2019
…ander-schranz)This PR was merged into the 4.3-dev branch.Discussion----------[Messenger] Add a redis stream transport| Q             | A| ------------- | ---| Branch?       | master| Bug fix?      | no| New feature?  | yes| BC breaks?    | no| Deprecations? | no| Tests pass?   | Yes| Fixed tickets |#28681| License       | MIT| Doc PR        |symfony/symfony-docs#11341As discussed in#28681 this will refractor@soyuka implementation of redis using the redis stream features so we don't need to handle parking the messages ourself and redis is doing it for us.Some interesting links about streams: -https://redis.io/topics/streams-intro -https://brandur.org/redis-streams```+-----------R|    GET    | -> XREADGROUP+-----------+      |      | handleMessage      V+-----------+  No|  failed?  |---------------------------++-----------+                           |      |                                 |      | Yes                             |      V                                 |+-----------+  No                       ||   retry?  |---------------------------++-----------+                           |      |                                 |      | Yes                             |      V                                 V+-----------R                     +-----------R|   REJECT  | -> XDEL             |    ACK    | -> XACK+-----------+                     +-----------+```**GET**: Will use `XREADGROUP` to read the one  message from the stream**REJECT**: Reject will just remove the message with `XDEL` from the stream as adding it back to the stream is handled by symfony worker itself**ACK**: Will use the `XACK` Method to ack the message for the specific groupThe sender will still be simple by calling the `XADD` redis function.#EU-FOSSACommits-------ff0b855 Refractor redis transport using redis streams7162d2e Implement redis transport
@nicolas-grekasnicolas-grekas modified the milestones:next,4.3Apr 30, 2019
Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment

Reviewers

@NyholmNyholmNyholm requested changes

+2 more reviewers

@ragboyjrragboyjrragboyjr left review comments

@CvekCodingCvekCodingCvekCoding left review comments

Reviewers whose approvals may not affect merge requirements

Assignees

No one assigned

Projects

None yet

Milestone

4.3

Development

Successfully merging this pull request may close these issues.

9 participants

@soyuka@ragboyjr@weaverryan@alexander-schranz@sroze@Nyholm@CvekCoding@nicolas-grekas@carsonbot

[8]ページ先頭

©2009-2025 Movatter.jp