support LZ4 compression #152


Open

flier wants to merge 2 commits into kafka-rust:master from flier:lz4

Conversation

@flier (Contributor) commented Apr 13, 2017 (edited):

I'm still working on the LZ4 compression support; it's not a finalized design yet. Please help review the code and give your comments.

Kafka uses an incomplete implementation of the LZ4 frame format. I'm not sure whether we need a more complete implementation or should just follow what Kafka did.

Besides, is there any documentation about how to generate test data and write integration tests?

@flier (Contributor, Author) commented Apr 14, 2017 (edited):

Based on KAFKA-1493, Kafka 0.8/0.9 use a wrong LZ4 frame header checksum (it includes the magic bytes). To follow KIP-57 - Interoperable LZ4 Framing, we must use the ApiVersions API to find out whether the broker supports the correct checksum (Kafka 0.10 or later), and encode/decode the LZ4 frame header based on that.

```
DEBUG:kafka::client: fetch_api_versions: got API versions from 10.6.5.140:9092: ApiVersionsResponse { header: HeaderResponse { correlation: 2 }, error_code: 0, api_versions: [
    ApiVersion { api_key: 0, min_version: 0, max_version: 2 }, // Produce
    ApiVersion { api_key: 1, min_version: 0, max_version: 3 }, // Fetch
    ApiVersion { api_key: 2, min_version: 0, max_version: 1 },
    ...
```
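The gist of the detection is something like the following minimal sketch (the enum and function names here are mine for illustration, not code from this branch):

```rust
/// How the LZ4 frame-header checksum is computed.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Lz4FrameMode {
    /// Kafka 0.8/0.9: the header checksum wrongly includes the magic bytes.
    KafkaLegacy,
    /// Kafka 0.10+: spec-compliant checksum over the frame descriptor only.
    Interoperable,
}

/// A broker that answers ApiVersionsRequest at all is 0.10 or newer
/// (older brokers close the connection instead), so a successful
/// response is enough to select the interoperable framing.
fn select_lz4_mode(broker_answered_api_versions: bool) -> Lz4FrameMode {
    if broker_answered_api_versions {
        Lz4FrameMode::Interoperable
    } else {
        Lz4FrameMode::KafkaLegacy
    }
}
```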

@flier force-pushed the lz4 branch 2 times, most recently from 63d2e06 to a494ebd (April 14, 2017 09:11)
@xitep (Collaborator) commented Apr 18, 2017 (edited):

  • cool! 👍
  • unfortunately, I don't have enough time to go through it :/ at least I wanted to provide quick feedback regarding your questions:
    • I usually capture the protocol into a file like demonstrated here; it's kind of hacky, but it works. However, having to capture the protocol is not what we generally want.
    • @dead10ck has managed to integrate Kafka and Docker for us. Writing integration tests against a running (dockerized) Kafka instance is surely preferred over capturing the protocol. I'm sorry we don't have much documentation about it yet; however, I think looking at tests/run-all-tests, tests/docker-compose.yml, and tests/test_kafka.rs will give you all the necessary bits.
    • yes, I think we can just "follow what Kafka did". We can improve upon it at any time (later).

@dead10ck (Collaborator) left a comment:

Thanks for the PR! I can see this probably took you a lot of time to do. It is greatly appreciated!

However, there are some things that kind of bother me about this PR:

  1. I do not have much experience working with LZ4, but keeping in mind the issues you described with Kafka using the wrong frame header in older versions, is it at all possible to avoid having all this low-level LZ4 code in this crate? I feel like having this code here will significantly increase the burden on the maintainers and volunteer contributors, who may or may not have the expertise to deal with modifying and debugging the LZ4 code.
  2. If at all possible, could we avoid having the API versions in every request struct, and consequently needing to pass them around in various methods? I think this significantly clutters up the lib's API.
  3. Please write some integration tests using the new run-all-tests script described in the README.

```rust
Ok(conn) => {
    let req = protocol::ApiVersionsRequest::new(correlation,
                                                &self.config.client_id);
    match __send_request(conn, req) {
```
@dead10ck (Collaborator):

Can you please store this in an intermediate variable, rather than nesting a match inside all the error handling?

@flier (Contributor, Author):

Switched to a method chain instead of a match tree.
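Roughly the shape of that change, as a standalone sketch (Conn, Response, and both helpers are hypothetical stand-ins, not the actual kafka-rust types):

```rust
use std::io;

struct Conn;     // hypothetical stand-in for KafkaConnection
struct Response; // hypothetical stand-in for ApiVersionsResponse

fn send_request(_conn: &mut Conn) -> io::Result<()> {
    Ok(()) // ... serialize and write the request ...
}

fn read_response(_conn: &mut Conn) -> io::Result<Response> {
    Ok(Response) // ... read and parse the response ...
}

// The nested-match shape the review comment is about:
fn round_trip_nested(conn: &mut Conn) -> io::Result<Response> {
    match send_request(conn) {
        Ok(()) => match read_response(conn) {
            Ok(resp) => Ok(resp),
            Err(e) => Err(e),
        },
        Err(e) => Err(e),
    }
}

// The equivalent method chain, with error handling in one place:
fn round_trip_chained(conn: &mut Conn) -> io::Result<Response> {
    send_request(conn).and_then(|_| read_response(conn))
}
```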

```rust
                conn.host(),
                err);

conn.close();
```
@dead10ck (Collaborator):

I'm not sure if it's appropriate to close the connection because of one failed request.

@flier (Contributor, Author):

In fact, Kafka 0.8 will close the connection immediately when it receives an unknown ApiVersionsRequest; we just mark the connection as closed for the connection pool.

@dead10ck (Collaborator):

Oh, I see. Interesting!

```diff
@@ -24,19 +27,22 @@ pub struct ProduceRequest<'a, 'b> {
     pub timeout: i32,
     pub topic_partitions: Vec<TopicPartitionProduceRequest<'b>>,
     pub compression: Compression,
+    pub api_version: Rc<ApiVersion>,
```
@dead10ck (Collaborator):

Two questions:

  1. Why are these ApiVersions being added to these request structs? If I am not mistaken, the API version goes into the request header, not the request itself, so I'm not sure it makes sense to have the API version stored inside every request object.
  2. Why are they all Rc? This adds significant overhead on every access. ApiVersion is just a simple struct, so even copying it is quite cheap.

@flier (Contributor, Author) replied Apr 28, 2017 (edited):

  1. Yes, there is an api_version field in the request header, but kafka-rust hard-codes it to 0. So we have to add a real ApiVersion to decide whether the server uses the wrong frame checksum (0.8/0.9) or not (0.10 and later).
  2. All the Rc usages have been removed.
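For illustration, a plain Copy struct along these lines is cheap to pass by value, so no Rc is needed (a sketch of the idea, not necessarily the exact definition in the branch):

```rust
// ApiVersion mirrors one entry of an ApiVersionsResponse: three i16
// fields, so deriving Copy and passing it by value is cheaper than an
// Rc, which adds a heap allocation plus refcount traffic on clone.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ApiVersion {
    pub api_key: i16,
    pub min_version: i16,
    pub max_version: i16,
}
```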

```rust
c if c == Compression::LZ4 as i8 => {
    use std::io::Read;
    let mut v = Vec::new();
    try!(try!(lz4::Lz4Reader::new(pmsg.value, false)).read_to_end(&mut v));
```
@dead10ck (Collaborator):

Using the new ? operator might make this a little easier to read.

@flier (Contributor, Author):

I used try! because the old code uses it. If you think requiring Rust 1.13 or later is fine, I would like to use ? in the future.
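For comparison, the same lines rewritten with ? (identical behavior, assuming the enclosing function returns a Result whose error type converts from both failure cases):

```rust
use std::io::Read;

let mut v = Vec::new();
// `?` unwraps each Ok or returns the error to the caller early,
// flattening the nested try!(try!(...)) from the diff above.
lz4::Lz4Reader::new(pmsg.value, false)?.read_to_end(&mut v)?;
```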

```diff
@@ -0,0 +1,502 @@
+use std::mem;
```
@dead10ck (Collaborator):

Whoa, this is a lot of low-level code. Did you write this all yourself? If not, where did it come from? Do we need to have all this code, or is there a crate we can use?

@flier (Contributor, Author):

Sure, I wrote it myself.

In fact, I have extracted it into an isolated crate, lz4frame. I'm not sure whether you'd prefer to depend on an external crate or to embed the code.

lz4-rs has similar LZ4 frame support via liblz4, but it doesn't support the wrong frame checksum used in Kafka 0.8.

@dead10ck (Collaborator):

Yeah, it doesn't feel appropriate to have all this LZ4 code inside a crate for Kafka, and I think it could significantly increase the burden on maintainers and future contributors who may not have the expertise to mess with the code for an LZ4 implementation. If a library doesn't exist that suits the needs of kafka-rust, then I think it's more appropriate to wait for one that does than to (please excuse the bluntness, because I really do appreciate your efforts!) have a bunch of hand-written, scantily-tested, -documented, and -commented code to handle a compatibility layer.

To be honest, in the absence of a quality crate that fits our needs, I'd rather just use a crate that uses the right frame checksum and notify users that it will only work for new versions of Kafka.

@xitep what are your thoughts?

```diff
@@ -371,6 +387,7 @@ impl KafkaConnection {
             id: id,
             host: host.to_owned(),
             stream: stream,
+            closed: false,
```
@dead10ck (Collaborator):

A TcpStream is not guaranteed to be open, so I'm not sure it's correct to assume it is here.

@flier (Contributor, Author):

It's just a marker for the connection pool, for when we find that the server has closed the connection. If you'd like to trace the whole lifecycle of a connection, I could add a proper state machine for it.
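A minimal sketch of what that marker amounts to (the method names are hypothetical; the real diff differs in detail):

```rust
use std::net::TcpStream;

pub struct KafkaConnection {
    host: String,
    stream: TcpStream,
    // Set once we notice the broker has dropped the connection (e.g. a
    // 0.8 broker receiving an unknown ApiVersionsRequest), so the pool
    // can discard this entry instead of handing it out again.
    closed: bool,
}

impl KafkaConnection {
    fn mark_closed(&mut self) {
        self.closed = true;
        // The TcpStream itself is dropped when the pool evicts us.
    }

    fn is_usable(&self) -> bool {
        !self.closed
    }
}
```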

@dead10ck (Collaborator):

Eh, actually on some more thought, it's probably ok if it turns out that the connection was closed. Further down the chain, it would just return an I/O error I think.

```rust
pub fn new(correlation_id: i32, client_id: &'a str) -> ApiVersionsRequest<'a> {
    ApiVersionsRequest {
        header: HeaderRequest::new(API_KEY_API_VERSIONS,
                                   API_VERSION,
```
@dead10ck (Collaborator):

According to the Kafka protocol docs, the ApiVersionsRequest is only supported by 0.10.0.0 and up. So if you're just hard-coding API_VERSION here, which is in turn hard-coded to the Kafka 0.8 API version, then won't this request fail every time?

@flier (Contributor, Author):

Kafka uses the api_version per request, which means different requests may have different api_versions on different server versions.

For example, the Produce API uses

v0, v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later)

and the Offset API uses

v1 (supported in 0.10.1.0 and later)

So hard-coding it will be fine for 0.10 or later, and it will always fail on Kafka 0.8.
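A sketch of how a client could negotiate the per-request version from the broker's advertised ranges (a hypothetical helper, not part of this PR; api_key 0 is Produce and 1 is Fetch, as in the debug log above):

```rust
#[derive(Debug, Clone, Copy)]
struct ApiVersion {
    api_key: i16,
    min_version: i16,
    max_version: i16,
}

/// Pick the highest version of `api_key` that both the client and the
/// broker support, from the broker's ApiVersionsResponse entries.
/// Returns None when the key is unknown or the ranges don't overlap.
fn pick_version(broker: &[ApiVersion], api_key: i16, client_max: i16) -> Option<i16> {
    broker.iter().find(|v| v.api_key == api_key).and_then(|v| {
        let version = client_max.min(v.max_version);
        if version >= v.min_version {
            Some(version)
        } else {
            None
        }
    })
}
```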

@dead10ck (Collaborator):

Oh, I see; I misunderstood how the API versioning worked. I just realized that every request type has its own individual version, which starts at 0, which will work out here by happenstance. This kind of makes me think that we should refactor the request code in general.

@xitep I think this presents an interesting problem. I was thinking we should make a ticket to update all of our request/response structs with the additional fields from new API versions, but if we do that, how do we deserialize responses from older APIs that don't have those fields? We could use Options, but that's a very clunky option, I think. We could have separate structs like ProduceResponseV0, ProduceResponseV1, etc., but that's also kind of clunky. Man, maintaining compatibility across all the versions is such a pain 😰
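For what it's worth, one way the versioned structs could be made less clunky is an enum over versioned payloads, so the Option-typed fields live behind accessors (purely a sketch; the fields, including the throttle time, are assumptions for illustration):

```rust
// Illustrative fields only, not the full Kafka protocol.
struct ProduceResponseV0 {
    topics: Vec<String>,
}

struct ProduceResponseV1 {
    topics: Vec<String>,
    throttle_time_ms: i32, // assumed v1-only field, for illustration
}

// One public type that dispatches on the wire version when
// deserializing, instead of Option-typed fields on a single struct.
enum ProduceResponse {
    V0(ProduceResponseV0),
    V1(ProduceResponseV1),
}

impl ProduceResponse {
    fn throttle_time_ms(&self) -> Option<i32> {
        match self {
            ProduceResponse::V0(_) => None,
            ProduceResponse::V1(r) => Some(r.throttle_time_ms),
        }
    }
}
```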

@dead10ck (Collaborator) left a comment:

Please rebase on top of master instead of doing a 3-way merge. It produces a cleaner git tree :)

@flier force-pushed the lz4 branch 2 times, most recently from 91ab83a to 71caa27 (April 28, 2017 06:25)
@flier force-pushed the lz4 branch 2 times, most recently from aa0ba8e to 2e99bb7 (May 3, 2017 04:23)
@flier (Contributor, Author):

While debugging the integration tests, I found the consumer always receives old messages when polling; I have to drain the pending messages before producing a new one. Is this correct, or am I using the wrong API?

@dead10ck (Collaborator):

@flier hmm, are you running the run-all-tests script that's mentioned in the README? If you just run them via cargo, by default it will run the tests in parallel, which can cause issues, since there is only one topic for the integration tests. The script will take care of invoking cargo correctly to run the tests serially.

@flier (Contributor, Author) commented May 4, 2017 (edited):

@dead10ck interesting, I ran the integration tests with the -j 1 option, but it doesn't seem to work :(

```sh
$ cargo test -j 1 --features integration_tests
```

anyway, run-all-tests passes all the tests, thanks to RUST_TEST_THREADS:

```sh
# need to run tests serially to avoid the tests stepping on each others' toes
export RUST_TEST_THREADS=1
```

@vorner commented Feb 10, 2018 (edited):

Hello,

I'm getting an Unsupported compression error when trying to use this crate on the consumer side, and I guess this is because the messages are compressed with LZ4 (I'll have to check with the author of the producer to make sure).

I haven't gone through the code or the whole discussion, but is there a chance this will be woken up and implemented? Or at least in a minimal way on the consumer side? It's a bigger problem there than on the producer side. Is there any way I can help?


@gklijs:

It's actually more likely configured on the broker, and quite mainstream. Are you still planning to merge it eventually?

@dead10ck (Collaborator) commented Aug 1, 2020 (edited):

Unfortunately, I have not had much time to work on this crate the last few years, and there haven't been any active maintainers, so this has stalled. I could probably help out in the way of reviewing changes and doing releases, but not much more than that.

It's been a while since this discussion took place, and since I looked very deeply at the code, but after taking another peruse through both discussion and code, I think what I said before is still blocking this PR. Having LZ4 code in this crate seems inappropriate. I would of course love to see this merged, but I think to make that happen, it should be changed to use an external crate.

I can't remember all the details, but it looked like there was an issue using an external crate because of supporting Kafka 0.8. If this is still the case, I think it would make more sense to just drop support for 0.8 than to vendor and modify LZ4 compression code so that it remains compatible.

@gklijs:

Using a separate crate makes a lot of sense; that's also how they did it for KafkaJS (but with an npm module there). I fear this project will slowly die. As much as I would like a pure-Rust Kafka client, it's a lot of work to keep it on par with the Java client. It's hard even for the C/C++ client, and they have (a few?) people from Confluent working on it nearly full-time.

@dead10ck (Collaborator):

Personally, I came to the same conclusion after working on it for a little bit a few years ago. It's just a very big API, there's a lot more than just the Consumer/Producer stuff. I think to even hope to get something even close in terms of API coverage that is reliable and production quality, there would need to be people working on it full time.


Reviewers
@dead10ck (requested changes)

Assignees
No one assigned

Labels
None yet

Projects
None yet

Milestone
No milestone

5 participants
@flier, @xitep, @dead10ck, @vorner, @gklijs
