Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork9.7k
[Messenger] Add a redis transport#28681
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Conversation
ragboyjr commentedOct 2, 2018
@soyuka This isn't necessarily directed at you, but I think just re-queing failed items or items that have been left in processing too long isn't a very safe way to go about this. In my experience, when a message/task fails, it almost always fails again when re-run, and you end up just having 6 failed attempts instead of the one. And in some cases, if your system is connecting to external systems, it could end up causing unwanted duplicate data. Not saying that retrying isn't a useful feature, but I think it should be something that should at least have the option to be turned off or maybe put into a failed jobs log or table. |
| * Takes last element (tail) of the list and add it to the processing queue (head - blocking) | ||
| * Also sets a key with TTL that will be checked by the `doCheck` method. | ||
| */ | ||
| public function waitAndGet(string $queue, int $processingTtl = 10000, int $blockingTimeout = 1000): ?array |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
I actually made my own redis receiver for my company and found out that you can't use a blocking timeout over 60 seconds. It looks like redis ext has a bug with blocking timeouts around 60s and will eventually throw an exception with bad data sent over the network. I've stress tested this type of redis queue processor with a blocking timeout and never had any issues with 30s.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Cool, are you suggesting that we should add some kind of test to ensure that the ttl is < 60 seconds? Do you set the\Redis::OPT_READ_TIMEOUT to-1?
Oh I see it's configurable in your bundle, I propose that we keep this value configurable but with a guard so that it doesn't exceed 60s.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Y, I'm not entirely sure where that threshold lies. Is it 55? 56? it's like right around 60 seconds when it starts to happen, YMMV. So not sure what's the best way to warn the user via an exception other than give them a verbal warning when reading the docs.
ragboyjr commentedOct 2, 2018
https://github.com/krakphp/symfony-messenger-redis Here is the redis bundle I made and I admittedly copied several aspects of the redis part between thehttps://github.com/krakphp/job and your bundle. |
soyuka commentedOct 2, 2018
Definitely! I see that in your bundle you're using one queue per connection, therefore you are storing different kinds of messages on the same queue. Isn't this an issue while debugging or maintaining these lists? Do you have an opinion about this? |
ragboyjr commentedOct 2, 2018
@soyuka Well no because it's more like one queue per transport. That would be an example configuration. This definitely allows multiple types of messages for a single queue/transport, but that just depends on how you want to scale out your transports. Some not so frequent messages can go on the main queue, but maybe a heavy traffic message might go on it's own so it can have a dedicated consumer and not get blocked. |
9d1eed3 to7bf5a42Comparesoyuka commentedOct 3, 2018
I've decided to follow@sroze implementation for Amqp which states that:
|
| $connectionCredentials = array( | ||
| 'host' => $parsedUrl['host'] ?? '127.0.0.1', | ||
| 'port' => $parsedUrl['port'] ?? 6379, | ||
| ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Maybe we could also support redis db here as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Connect doesn't seem to support the db though:https://github.com/phpredis/phpredis#connect-open
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
@soyuka may be Im wrong, but phpRedis can deal with databases. Look here how snc_redis do this:
https://github.com/snc/SncRedisBundle/blob/863a063114c68e62fea811c127f093a1ecd5ba9a/Factory/PhpredisClientFactory.php#L73
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Database selecting in phpRedis dochttps://github.com/phpredis/phpredis#select
| */ | ||
| interface RejectMessageExceptionInterface extends \Throwable | ||
| { | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Would make sense, especially if we want to switch from one transport to another without changing the code.
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Taking this path, it'd actually make sense to me to add aConnectionInterface but things may be really different from one transport to another.
| throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn)); | ||
| } | ||
| $queue = isset($parsedUrl['path']) ? trim($parsedUrl['path'], '/') : 'messages'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Curious if it'd be better to put the queue apart of the DSN vs the options? Personally I prefer to use one DSN configured in the .env, and then re use it across multiple transports while changing the queue parameter in options.
Maybe we could support both? allow queue in the query param, but allow it to be overridden via the options?
ragboyjr commentedOct 3, 2018
@soyuka Extension looks great! Thanks for the hard work! |
ragboyjr commentedOct 3, 2018
@soyuka in regards to Requeing the message by default and Rejecting if it implements a RejectMessageExceptionInterface I think it would make more sense to default to Reject, and requeue if they implement a RequeueMessageExceptionInterface; however, that's my opinion for reasons as stated previously in this thread. If we end up sticking with defaulting to Requeue, do you think it would make sense to add a middleware/option in the messenger config to basically wrap all exceptions into a RejectMessageExceptionInterface? |
soyuka commentedOct 8, 2018
Looks a bit too magic to me. I definitely get your concerns about requeuing though, and I'd be in favor of having a |
ragboyjr commentedDec 9, 2018
@soyuka with the release of redis streams in redis 5.0, i'm wondering if we should target that data structure for a more robust implementation. We also could possibly implement two different redis connections one for streams and one with lists. |
weaverryan commentedMar 23, 2019
soyuka commentedMar 25, 2019 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
Will definitely work on this thanks for the heads up, any idea if the related documentation is going to be updated? /edit: I've looked at the PR, messenger is improving that's nice! I've an issue with the new interface though, because my Connection doesn't uses a Enveloppe for now (in |
weaverryan commentedMar 25, 2019
@soyuka Awesome!
Can you tell me more about this? Or you can ping me on Slack. Do you mean the methods like |
sroze commentedMar 27, 2019
I have no idea what these streams are all about, what is the awesomeness of them in this use-case? When you say that "most things is handled by redis itself" that sounds appealing... but what do you mean exactly? What are the use-cases/logic we wouldn't have to implement anymore?
|
soyuka commentedMar 27, 2019
I just looked at theredis introduction and it seems to have more features compared to using a normal list. This was the complexity I was talking about, not the one from the php implementation.
Everyone's thought is appreciated! I wasn't even aware that redis had implemented this feature, so thanks for the discovery and I'll definitely take a look at your implementation! |
ragboyjr commentedMar 27, 2019
My only concern with using streams (other than the additional complexity), is that forcing the use of the ext-redis extension can be an issue under heavier workloads. We were getting intermittent errors and failures on redis consumer with sf messenger and after some investigation, it looks like they are just related to the redis-ext and those errors don't occur on predis. phpredis/phpredis#831 After we switched, those intermittent errors all went away..., However, predis doesn't seem actively maintained, It's a good library and seems to be more reliable than phpredis, but nrk/predis hasn't touched that repo in forever. Makes me think that a reliable fork or a new maintainer might be necessary for that library. |
soyuka commentedMar 27, 2019
@ragboyjr if we want this to get merged into symfony we have to use the official |
alexander-schranz commentedMar 27, 2019 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
Lets say you have 3 Systems. System A is sending messages and System B and C want to receive this messages. When you want that B and C receive both all messages without redis streams you need to implement that logic yourself or sending them into 2 lists or something (correct me if I'm wrong). With redis streams you can just create 2 groups. one for system b and one for system c and redis will give you the messages which you did not receive yet. I personally found it really simple adding a new messages to a stream can be done with: $this->redis->xAdd('STREAMNAME','*', ['content' =>json_encode($encodedMessage)]); Reading the message can be done with: $messages =$this->redis->xReadGroup('GROUPB','CONSUMER1', [$this->stream =>0],1,45); Redis automatically sets the read messages into a pending state for this group so if you example have for the same group multiple consumer they will not get the same message again. And after you did successfully process the message you mark it in your group as acked: $this->redis->xAck('STREAMNAME','GROUPB', [$messageId]); So if you later need a System D also receiving all messages you just create in redis a GroupD and so system D is also starting to receive all messages. So streams will shine if you have multiple systems receiving messages. If you just have 2 Systems a Sender and a Receiver there will be no differents. But if you have multiple receivers Redis will handle you which group/system did receive a message and which one doesn't. The following is also a interesting blog post about streams:https://brandur.org/redis-streams.
I think as the streams feature is more build for this usecase less complex because of its features and automatic mechanisms. But maybe I have to less knowledge about the current implementation and maybe it makes sense to have 2 redis transports one using the redis list's and one using redis streams. |
sroze commentedMar 28, 2019
Indeed, if there is this logic of parking the messages until they are acknowledged, this sounds very interesting because the less we have to deal with these distributed system issues, the better 🙃 |
soyuka commentedMar 28, 2019
@alexander-schranz many thanks about this detailed explanation, it helped me to understand better why redis streams may be good for this use case! I'll definitely look into that and use them in this transport implementation! |
ragboyjr commentedMar 28, 2019
@soyuka I think the SF Lock and SF Cache component are able to utilize both redis libraries, i'd imagine if we did a redis transport using the list strategy, we should be able to provide some sort of compat layer to using either of the redis libraries. I'd be willing to help if needed, but as someone who has used a redis transport with messenger at a high volume on both ext-redis and predis, I wouldn't feel comfortable only allowing ext-redis. |
weaverryan commentedApr 3, 2019
Ping! Do we have some direction/motivation/time to finish this? We're after feature freeze, though there is some wiggle room probably because this component is experimental. |
soyuka commentedApr 3, 2019
To me, time is an issue, maybe this week-end but definitely not sooner. Also, the streams must be tested they may improve the transport a lot, which means more work. |
weaverryan commentedApr 4, 2019
I hear that :). Good candidate maybe for the hackathon this weekend, if not by you - by someone else? We could check to see if anyone has the expertise & is interested. Cheers! |
soyuka commentedApr 4, 2019
I'd love to work on this this week end but as I'm the Api Platform referent I'm not sure that I'll be able too :p. |
alexander-schranz commentedApr 4, 2019
@weaverryan@soyuka I will have a look at the current state at the |
weaverryan commentedApr 4, 2019
Added the hackathon star - thank you@alexander-schranz! |
alexander-schranz commentedApr 6, 2019 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
Analysed the current state and after also having a look at the new ReceiverInterface and playing around about it my concept will be the following: GET: Will use The sender will still be simple by calling the #EU-FOSSA |
weaverryan commentedApr 6, 2019
I don't know about the Redis streams part, but I can confirm the rest of the diagram is perfect: you're acking/nacking with the correct logic. |
sroze commentedApr 6, 2019
@alexander-schranz great. Fancy updating this pull-request (or creating another one)? |
alexander-schranz commentedApr 6, 2019
@sroze If I can I will update this PR so we have all discussion about it here. |
Nyholm left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Cool. I like this. I will test this later.
| return null; | ||
| } | ||
| $key = md5($value['body']); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
sha1 is quicker. I think we should use that one instead.
| */ | ||
| public function ack($message) | ||
| { | ||
| $key = md5($message['body']); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
sha1
| /** | ||
| * Reject the message: we acknowledge it, means we remove it form the queues. | ||
| * | ||
| * @TODO: log something? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
This should be removed or addressed.
| */ | ||
| public function requeue($message) | ||
| { | ||
| $key = md5($message['body']); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
sha1
| } | ||
| /** | ||
| * Add item at the tail of list. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
This comment is wrong. We are doing a queue and not a stack. =)
We are adding items to thehead of the list.
Usinglpush is correct.
| $pending = $this->connection->lRange($processingQueue, 0, -1); | ||
| foreach ($pending as $temp) { | ||
| $key = md5($temp['body']); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
sha1
soyuka commentedApr 6, 2019
Please feel free to update it! |
alexander-schranz commentedApr 6, 2019
soyuka commentedApr 6, 2019
Closing this one then! |
…ander-schranz)This PR was merged into the 4.3-dev branch.Discussion----------[Messenger] Add a redis stream transport| Q | A| ------------- | ---| Branch? | master| Bug fix? | no| New feature? | yes| BC breaks? | no| Deprecations? | no| Tests pass? | Yes| Fixed tickets |#28681| License | MIT| Doc PR |symfony/symfony-docs#11341As discussed in#28681 this will refractor@soyuka implementation of redis using the redis stream features so we don't need to handle parking the messages ourself and redis is doing it for us.Some interesting links about streams: -https://redis.io/topics/streams-intro -https://brandur.org/redis-streams```+-----------R| GET | -> XREADGROUP+-----------+ | | handleMessage V+-----------+ No| failed? |---------------------------++-----------+ | | | | Yes | V |+-----------+ No || retry? |---------------------------++-----------+ | | | | Yes | V V+-----------R +-----------R| REJECT | -> XDEL | ACK | -> XACK+-----------+ +-----------+```**GET**: Will use `XREADGROUP` to read the one message from the stream**REJECT**: Reject will just remove the message with `XDEL` from the stream as adding it back to the stream is handled by symfony worker itself**ACK**: Will use the `XACK` Method to ack the message for the specific groupThe sender will still be simple by calling the `XADD` redis function.#EU-FOSSACommits-------ff0b855 Refractor redis transport using redis streams7162d2e Implement redis transport
Hi!
This patch adds a redis transport to the Messenger component by using the official php redis-ext (https://github.com/phpredis/phpredis). It's a port ofhttps://github.com/soyuka/symfony-messenger-redis to be included directly in symfony.
How it works
Relevant discussion:https://twitter.com/jderusse/status/980768426116485122
The sender uses a
ListwithRPUSH(add value to the tail of the list).The receiver uses
BRPOPLPUSHwhich reads the last element of the list and adds in to the head of another list (queue_processing). If no elements are present it'll block the connection until a new element shows up or the timeout is reached. When timeout is reached it works like a "ping" of some sort (calls$handler(null)).On every iteration, we will check the
queue_processinglist. For every items in this queue we have a correspondingkeyin redis with a giventtl.If the key has expired, the item is
LREM(removed) fromqueue_processingand put back in the origin queue to be processed again. This workaround helps to avoid loosing messages.Difference with AmqpExt
I'm proposing this feature as a Work In Progress because I'm not sure how I should handle the queue.
Indeed, in AmqpExt the queue is tight to the Connection whereas here, a queue should be linked to a message.
It can work by using the same queue for different messages but I'd not advise to do this because it's messy if you need to maintain your queues afterwards (for example if you want to remove only the queue for message
Xyou should just remove queueX).What do you think? I can make the queue resilient inside the
Connectionclass so that the code is closer toAMQPExt\Connection(means opening 1 connection per messages) or keep the queue inside theReceiver/Sender(one connection only).When we find an agreement about this I'll add some tests and remove my WIP flag.
Thanks!