support LZ4 compression #152
Conversation
flier commented Apr 14, 2017 (edited)
Based on KAFKA-1493, Kafka 0.8/0.9 uses a wrong LZ4 frame header checksum (it includes the magic bytes). To follow KIP-57 - Interoperable LZ4 Framing, we must use the ApiVersions API to find out whether the broker supports the correct checksum (Kafka 0.10 or later), and encode/decode the LZ4 frame header accordingly.
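For context, here is a rough sketch of the incompatibility this PR has to handle. None of these names come from the PR itself, and `xxh32` stands in for any XXH32 implementation (e.g. the `twox-hash` crate):

```rust
// Assumed LZ4 frame layout: 4-byte magic | frame descriptor | header checksum byte.
fn frame_descriptor_checksum<F>(frame: &[u8], descriptor_len: usize,
                                broker_pre_0_10: bool, xxh32: F) -> u8
    where F: Fn(&[u8]) -> u32
{
    let checked = if broker_pre_0_10 {
        // KAFKA-1493: Kafka 0.8/0.9 include the magic bytes in the hash.
        &frame[..4 + descriptor_len]
    } else {
        // LZ4 frame spec / KIP-57: the hash covers the frame descriptor only.
        &frame[4..4 + descriptor_len]
    };
    // Per the LZ4 frame spec, the header checksum is the second byte of XXH32(seed = 0).
    ((xxh32(checked) >> 8) & 0xff) as u8
}
```

So the same frame must be written (and validated) with one of two different checksums depending on the broker version, which is why the broker version has to be discovered first.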
Force-pushed from 63d2e06 to a494ebd

xitep commented Apr 18, 2017 (edited)
Thanks for the PR! I can see this probably took you a lot of time to do. It is greatly appreciated!
However, there are some things that kind of bother me about this PR:
- I do not have much experience working with LZ4, but keeping in mind the issues you described with Kafka using the wrong frame header in older versions, is it at all possible to avoid having all this low-level LZ4 code in this crate? I feel like having this code here will significantly increase the burden on the maintainers and volunteer contributors, who may or may not have the expertise to deal with modifying and debugging the LZ4 code.
- If it's at all possible, could we avoid having the API versions in every request struct, and consequently needing to be passed around in various methods? I think this significantly cluttered up the lib's API.
- Please write some integration tests using the new run-all-tests script described in the README.
src/client/mod.rs Outdated
Ok(conn) => {
    let req = protocol::ApiVersionsRequest::new(correlation,
                                                &self.config.client_id);
    match __send_request(conn, req) {
Can you please store this in an intermediate variable, rather than nesting a match inside all the error handling?
Switched to a method chain instead of a match tree.
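As a small illustration of the refactor (not the PR's actual code), the nested match over each fallible step collapses into a Result chain:

```rust
fn double_even(input: &str) -> Result<i32, String> {
    // Nested-match style:
    // match input.parse::<i32>() {
    //     Ok(n) => match check_even(n) {
    //         Ok(n) => Ok(n * 2),
    //         Err(e) => Err(e),
    //     },
    //     Err(e) => Err(e.to_string()),
    // }
    input
        .parse::<i32>()
        .map_err(|e| e.to_string())
        .and_then(check_even)
        .map(|n| n * 2)
}

fn check_even(n: i32) -> Result<i32, String> {
    if n % 2 == 0 { Ok(n) } else { Err(format!("{} is odd", n)) }
}
```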
src/client/mod.rs Outdated
conn.host(),
err);
conn.close();
I'm not sure if it's appropriate to close the connection because of one failed request.
In fact, Kafka 0.8 closes the connection immediately when it receives the unknown ApiVersionsRequest; we just mark the connection as closed for the connection pool.
Oh, I see. Interesting!
src/protocol/produce.rs Outdated
@@ -24,19 +27,22 @@ pub struct ProduceRequest<'a, 'b> {
    pub timeout: i32,
    pub topic_partitions: Vec<TopicPartitionProduceRequest<'b>>,
    pub compression: Compression,
    pub api_version: Rc<ApiVersion>,
Two questions:
- Why are these ApiVersions being added to these request structs? If I am not mistaken, the API version goes into the request header, not the request itself, so I'm not sure it makes sense to have the API version stored inside every request object.
- Why are they all Rc? This adds significant overhead on every access. ApiVersion is just a simple struct, so even copying it is quite cheap.
- Yes, there is an api_version field in the request header, but kafka-rust hard-codes it to 0. So we have to add a real ApiVersion to decide whether the server uses the wrong frame checksum (0.8/0.9) or not (0.10 or later).
- All the Rcs have been removed.
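For reference, a sketch of the Kafka request header this discussion is about; field names follow the protocol documentation rather than kafka-rust's HeaderRequest type. The api_version lives here, once per request:

```rust
struct RequestHeader<'a> {
    api_key: i16,         // e.g. 0 = Produce, 1 = Fetch, 18 = ApiVersions
    api_version: i16,     // the field kafka-rust currently hard-codes to 0
    correlation_id: i32,  // echoed back by the broker to match responses
    client_id: &'a str,
}
```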
src/protocol/fetch.rs Outdated
c if c == Compression::LZ4 as i8 => {
    use std::io::Read;
    let mut v = Vec::new();
    try!(try!(lz4::Lz4Reader::new(pmsg.value, false)).read_to_end(&mut v));
Using the new ? operator might make this a little easier to read.
I used try! because the old code uses it. If you think requiring Rust 1.13 or later is fine, I would like to use ? in the future.
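A generic sketch (not the PR's code) of the same pattern: the nested try! calls collapse into a single expression with the ? operator, which has been available since Rust 1.13.

```rust
use std::io::{self, Read};

fn read_all<R: Read>(reader: io::Result<R>) -> io::Result<Vec<u8>> {
    let mut v = Vec::new();
    // Old style: try!(try!(reader).read_to_end(&mut v));
    reader?.read_to_end(&mut v)?;
    Ok(v)
}
```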
@@ -0,0 +1,502 @@
use std::mem;
Whoa, this is a lot of low-level code. Did you write this all yourself? If not, where did it come from? Do we need to have all this code, or is there a crate we can use?
Yeah, it doesn't feel appropriate to have all this LZ4 code inside a crate for Kafka, and I think it could significantly increase the burden on maintainers and future contributors who may not have the expertise to mess with the code for an LZ4 implementation. If a library doesn't exist that suits the needs of kafka-rust, then I think it's more appropriate to wait for one that does than to (please excuse the bluntness, because I really do appreciate your efforts!) have a bunch of hand-written, scantily-tested, -documented, and -commented code to handle a compatibility layer.
To be honest, in the absence of a quality crate that fits our needs, I'd rather just use a crate that uses the right frame checksum and notify users that it will only work for new versions of Kafka.
@xitep what are your thoughts?
@@ -371,6 +387,7 @@ impl KafkaConnection {
    id: id,
    host: host.to_owned(),
    stream: stream,
    closed: false,
A TcpStream is not guaranteed to be open, so I'm not sure it's correct to assume it is here.
It's just a marker for the connection pool for when we find that the server has closed the connection. If you'd like to trace the whole lifecycle of the connection, I could add a state machine for it.
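A hypothetical sketch of the idea being described, not the PR's exact code: the pool keeps a `closed` flag so it can discard connections the broker has already dropped instead of handing them out again.

```rust
struct PooledConnection {
    host: String,
    closed: bool,
}

impl PooledConnection {
    fn mark_closed(&mut self) {
        // Called when a request fails because the broker dropped the socket,
        // e.g. Kafka 0.8 closing on an unknown ApiVersionsRequest.
        self.closed = true;
    }

    fn is_usable(&self) -> bool {
        !self.closed
    }
}
```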
Eh, actually on some more thought, it's probably ok if it turns out that the connection was closed. Further down the chain, it would just return an I/O error I think.
pub fn new(correlation_id: i32, client_id: &'a str) -> ApiVersionsRequest<'a> {
    ApiVersionsRequest {
        header: HeaderRequest::new(API_KEY_API_VERSIONS,
                                   API_VERSION,
According to the Kafka protocol docs, the ApiVersionsRequest is only supported by 0.10.0.0 and up. So if you're just hard-coding API_VERSION here, which is in turn hard-coded to the Kafka 0.8 API version, then won't this request fail every time?
Kafka uses the api_version per request, which means different requests may have different api_versions depending on the server version.
For example, the Produce API uses v0, v1 (supported in 0.9.0 or later), and v2 (supported in 0.10.0 or later), and the Offset API uses v1 (supported in 0.10.1.0 and later).
So hard-coding it will be fine for 0.10 or later, and it will always fail on Kafka 0.8.
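One way a client could pick a per-request api_version from the broker's advertised ranges is sketched below; the names are illustrative, not kafka-rust's API. For a Kafka 0.8 broker, the ApiVersions request itself fails, so the caller would fall back to version 0 (the behavior this PR relies on).

```rust
use std::collections::HashMap;

fn pick_version(
    broker_ranges: &HashMap<i16, (i16, i16)>, // api_key -> (min, max) from ApiVersionsResponse
    api_key: i16,
    client_max: i16, // highest version this client can encode for that api_key
) -> Option<i16> {
    broker_ranges.get(&api_key).and_then(|&(min, max)| {
        // Use the highest version both sides understand, if the ranges overlap.
        let v = client_max.min(max);
        if v >= min { Some(v) } else { None }
    })
}
```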
Oh, I see; I misunderstood how the API versioning worked. I just realized that every request type has its own individual version, which starts at 0, which will work out here by happenstance. This kind of makes me think that we should refactor the request code in general.
@xitep I think this presents an interesting problem. I was thinking we should make a ticket to update all of our request/response structs with the additional fields from new API versions, but if we do that, how do we deserialize responses from older APIs that don't have those fields? We could use Options, but that's a very clunky option, I think. We could have separate structs like ProduceResponseV0, ProduceResponseV1, etc., but that's also kind of clunky. Man, maintaining compatibility across all the versions is such a pain 😰
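A hypothetical sketch (not kafka-rust's actual types) of the per-version-structs option: one struct per wire version, wrapped in an enum the decoder selects on, so older responses never need Option-typed fields. throttle_time_ms is the field Kafka added to ProduceResponse in v1.

```rust
pub struct ProduceResponseV0 {
    pub correlation_id: i32,
    // per-topic/partition results omitted for brevity
}

pub struct ProduceResponseV1 {
    pub correlation_id: i32,
    pub throttle_time_ms: i32, // added in v1
}

pub enum ProduceResponse {
    V0(ProduceResponseV0),
    V1(ProduceResponseV1),
}

impl ProduceResponse {
    // Callers that only need the shared fields can stay version-agnostic.
    pub fn correlation_id(&self) -> i32 {
        match *self {
            ProduceResponse::V0(ref r) => r.correlation_id,
            ProduceResponse::V1(ref r) => r.correlation_id,
        }
    }
}
```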
Please rebase on top of master instead of doing a 3-way merge. It produces a cleaner git tree :)
Force-pushed from 91ab83a to 71caa27

Force-pushed from aa0ba8e to 2e99bb7

When debugging the integration tests, I found
@flier hmm, are you running the
flier commented May 4, 2017 (edited)
@dead10ck interesting, I run the integration tests with

$ cargo test -j 1 --features integration_tests

anyway,

# need to run tests serially to avoid the tests stepping on each others' toes
export RUST_TEST_THREADS=1
vorner commented Feb 10, 2018 (edited)
Hello, I'm getting
I haven't gone through the code or the whole discussion, but is there a chance this will be woken up and implemented? Or, at least in a minimal way, on the consumer side? It's a bigger problem there than on the producer. Any way I can help?
gklijs commented Aug 1, 2020
It's actually more likely configured on the broker, and quite mainstream. Are you still planning to merge it eventually?
dead10ck commented Aug 1, 2020 (edited)
Unfortunately, I have not had much time to work on this crate the last few years, and there haven't been any active maintainers, so this has stalled. I could probably help out in the way of reviewing changes and doing releases, but not much more than that.

It's been a while since this discussion took place, and since I looked very deeply at the code, but after taking another peruse through both discussion and code, I think what I said before is still blocking this PR. Having LZ4 code in this crate seems inappropriate. I would of course love to see this merged, but I think to make that happen, it should be changed to use an external crate.

I can't remember all the details, but it looked like there was an issue using an external crate because of supporting Kafka 0.8. If this is still the case, I think it would make more sense to just drop support for 0.8 than to vendor and modify LZ4 compression code so that it remains compatible.
gklijs commented Aug 2, 2020
Using a separate crate makes a lot of sense; that's also how they did it for KafkaJs (but then with an npm module). I fear this project will slowly die. As much as I would like a pure Rust Kafka client, it's a lot of work keeping it on par with the Java client. Even for the C/C++ client it's hard, and they have (a few?) people from Confluent working nearly full-time on it.
Personally, I came to the same conclusion after working on it for a little bit a few years ago. It's just a very big API; there's a lot more than just the Consumer/Producer stuff. I think to even hope to get something close in terms of API coverage that is reliable and production quality, there would need to be people working on it full time.
I'm still working on the LZ4 compression support; it's not a finalized design, so please help review the code and give your comments.
Kafka uses an incomplete implementation of the LZ4 frame format, and I'm not sure whether we need a more complete implementation or should just follow what Kafka did.
Besides, is there any documentation about how to generate test data and write integration tests?