Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

[KIP-848]: Trigger metadata refresh when partition count increases#5190

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Open
PratRanj07 wants to merge8 commits intomaster
base:master
Choose a base branch
Loading
fromdev_kip848_MetadataRefreshBug

Conversation

@PratRanj07
Copy link
Contributor

Currently, in rd_kafka_cgrp_consumer_assignment_with_metadata we only check whether the metadata for the topic is present in cache.
This PR changes the logic to also verify that the partitions listed in the assignment match the current consumer assignment.
Added a test case to cover this new partition assignment matching logic.

@PratRanj07PratRanj07 requested a review froma team as acode ownerSeptember 11, 2025 06:46
CopilotAI review requested due to automatic review settingsSeptember 11, 2025 06:46
@confluent-cla-assistant

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Copy link

CopilotAI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Pull Request Overview

This PR implements KIP-848 partition metadata refresh functionality to trigger metadata refresh when partition count increases. The change modifies the consumer group assignment logic to verify that partitions listed in assignments match the current cached metadata, and triggers a refresh when partitions are missing.

  • Updates consumer assignment validation to check for partition existence in cached metadata
  • Adds debug logging when assigned partitions are not found in metadata cache
  • Includes comprehensive test coverage for the new partition assignment matching logic

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Show a summary per file
FileDescription
src/rdkafka_cgrp.cCore implementation: adds partition existence validation and logging for metadata refresh triggers
tests/0154-metadata_refresh.cNew test file implementing integration test for KIP-848 functionality
tests/test.cTest registration: adds declaration and entry for the new metadata refresh test
tests/CMakeLists.txtBuild system: includes new test file in CMake configuration
win32/tests/tests.vcxprojBuild system: includes new test file in Visual Studio project

Tip: Customize your code reviews with copilot-instructions.md.Create the file orlearn how to get started.

test_conf_set(conf,"group.id",group);
test_conf_set(conf,"auto.offset.reset","earliest");
test_conf_set(conf,"debug","cgrp, protocol");
rd_kafka_conf_set_log_cb(conf,test_metadata_log_cb);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

The log callback parameter is hardcoded totest_metadata_log_cb instead of using thelog_cb parameter passed to the function. This makes thelog_cb parameter unused and could lead to incorrect behavior.

Suggested change
rd_kafka_conf_set_log_cb(conf,test_metadata_log_cb);
rd_kafka_conf_set_log_cb(conf,log_cb);

Copilot uses AI. Check for mistakes.
}

intmain_0154_metadata_refresh(intargc,char**argv) {
if (!test_consumer_group_protocol_classic())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

The logic is inverted - the test only runs when the classic protocol is NOT being used, but the test appears designed for classic consumer group protocol based on the heartbeat logs it's looking for. This should beif (test_consumer_group_protocol_classic()).

Suggested change
if (!test_consumer_group_protocol_classic())
if (test_consumer_group_protocol_classic())

Copilot uses AI. Check for mistakes.
Copy link
Member

@pranavrthpranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Thanks Pratyush. First round of comments.

Comment on lines 2939 to 2945
for (j=0;j<pcnt;j++) {
if (rkmce->rkmce_mtopic.partitions[j].id==
partition) {
partition_found=rd_true;
break;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Since the partitions are monotonically increasing and cannot be deleted in Kafka, we can just usecnt to know if the partition is present in the metadata or not.

Suggested change
for (j=0;j<pcnt;j++) {
if (rkmce->rkmce_mtopic.partitions[j].id==
partition) {
partition_found=rd_true;
break;
}
}
partition_found= (partition<pcnt);

PratRanj07 reacted with thumbs up emoji
Comment on lines 2966 to 2973
rd_kafka_dbg(
rkcg->rkcg_rk,CGRP,"HEARTBEAT",
"Partition assigned to this consumer is not "
"present in cached metadata for topic id: %s. "
"This may indicate that the topic's partition "
"count has increased and metadata needs to be "
"refreshed. ",
rd_kafka_Uuid_base64str(&request_topic_id));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Debug statement should not be so verbose. I think its sufficient to add something like "Found new partition for the topic %s. Updating metadata.". Add topic name as well instead of just topic id.

PratRanj07 reacted with thumbs up emoji

if (likely(topic_name!=NULL)) {
// If topic name is found and partition exists in metadata
if (likely(topic_name!=NULL)&&partition_found) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Suggested change
if (likely(topic_name!=NULL)&&partition_found) {
if (topic_name&&partition_found) {

PratRanj07 reacted with thumbs up emoji
}
rd_kafka_rdunlock(rk);

if (unlikely(!topic_name)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Suggested change
if (unlikely(!topic_name)) {
if (!topic_name) {

PratRanj07 reacted with thumbs up emoji
returnconsumer;
}

staticvoiddo_test_setup_and_run_metadata_refresh_test(void) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Perform the same test in the older version of the code. Add the same log at the same place there and see if this is failing or not.

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Ran the test, Its failing:
`[0147_consumer_group_consumer_mock/ 14.471s] [ do_test_setup_and_run_metadata_refresh_test:1008 ]
[0147_consumer_group_consumer_mock/ 14.471s] Creating topic rdkafkatest_rnd2504dd53625e8721_cgrp_metadata with 2 partitions
[0147_consumer_group_consumer_mock/ 14.471s] Created kafka instance 0147_consumer_group_consumer_mock#producer-48
[0147_consumer_group_consumer_mock/ 14.471s] Creating topic "rdkafkatest_rnd2504dd53625e8721_cgrp_metadata" (partitions=2, replication_factor=1, timeout=26666)
[0147_consumer_group_consumer_mock/ 14.559s] CreateTopics: duration 88.111ms
[0147_consumer_group_consumer_mock/ 14.559s] Creating consumers
[0147_consumer_group_consumer_mock/ 14.560s] Setting test timeout to 60s * 2.7
[0147_consumer_group_consumer_mock/ 14.560s] Skipping setting forbidden configuration session.timeout.ms for CONSUMER protocol.
[0147_consumer_group_consumer_mock/ 14.560s] Created kafka instance 0147_consumer_group_consumer_mock#consumer-49
[0147_consumer_group_consumer_mock/ 14.560s] Setting test timeout to 60s * 2.7
[0147_consumer_group_consumer_mock/ 14.560s] Skipping setting forbidden configuration session.timeout.ms for CONSUMER protocol.
[0147_consumer_group_consumer_mock/ 14.560s] Created kafka instance 0147_consumer_group_consumer_mock#consumer-50
[0147_consumer_group_consumer_mock/ 14.560s] Created kafka instance 0147_consumer_group_consumer_mock#producer-51
[0147_consumer_group_consumer_mock/ 14.560s] Subscribing to topic rdkafkatest_rnd2504dd53625e8721_cgrp_metadata
[

/ 15.055s] 1 test(s) running: 0147_consumer_group_consumer_mock
[ / 16.061s] 1 test(s) running: 0147_consumer_group_consumer_mock
[0147_consumer_group_consumer_mock/ 16.564s] 0147_consumer_group_consumer_mock#consumer-49: Assignment (1 partition(s)): rdkafkatest_rnd2504dd53625e8721_cgrp_metadata[0]
[ / 17.067s] 1 test(s) running: 0147_consumer_group_consumer_mock
[ / 18.071s] 1 test(s) running: 0147_consumer_group_consumer_mock
[ / 19.077s] 1 test(s) running: 0147_consumer_group_consumer_mock
[ / 20.082s] 1 test(s) running: 0147_consumer_group_consumer_mock
[ / 21.085s] 1 test(s) running: 0147_consumer_group_consumer_mock
[0147_consumer_group_consumer_mock/ 21.582s] 0147_consumer_group_consumer_mock#consumer-50: Assignment (1 partition(s)): rdkafkatest_rnd2504dd53625e8721_cgrp_metadata[1]
[0147_consumer_group_consumer_mock/ 21.582s] Increasing partition count to 4
[0147_consumer_group_consumer_mock/ 21.582s] Creating 4 (total) partitions for topic "rdkafkatest_rnd2504dd53625e8721_cgrp_metadata"
[0147_consumer_group_consumer_mock/ 21.634s] CreatePartitions: duration 51.855ms
[ / 22.089s] 1 test(s) running: 0147_consumer_group_consumer_mock
[ / 23.092s] 1 test(s) running: 0147_consumer_group_consumer_mock
[ / 24.097s] 1 test(s) running: 0147_consumer_group_consumer_mock
[ / 25.102s] 1 test(s) running: 0147_consumer_group_consumer_mock
[ / 26.106s] 1 test(s) running: 0147_consumer_group_consumer_mock
[ / 27.112s] 1 test(s) running: 0147_consumer_group_consumer_mock
[ / 28.115s] 1 test(s) running: 0147_consumer_group_consumer_mock
[ / 29.119s] 1 test(s) running: 0147_consumer_group_consumer_mock
[ / 30.124s] 1 test(s) running: 0147_consumer_group_consumer_mock
[ / 31.126s] 1 test(s) running: 0147_consumer_group_consumer_mock
[0147_consumer_group_consumer_mock/ 31.705s] TEST FAILURE

Test "0147_consumer_group_consumer_mock (do_test_setup_and_run_metadata_refresh_test:1008)" failed at 0147-consumer_group_consumer_mock.c:969:wait_for_metadata_refresh_log() at Wed Sep 17 16:17:35 2025:

Test assertion failed: "seen_metadata_log": Expected metadata refresh log not seen after partition creation and heartbeat`

rd_kafka_rdunlock(rk);

if (unlikely(!topic_name)) {
if (!topic_name) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Why removing this unlikely? It's more likely the partition is found in cache.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Yes but we receive change in assignments very rarely once those are assigned. This is not critical path so there is no need oflikely andunlikely IMO.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

That is should be done in the critical path doesn't mean it shouldn't be done elsewhere


if (likely(topic_name!=NULL)) {
// If topic name is found and partition exists in metadata
if (topic_name!=NULL&&partition_found) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

We can keep the likely here too

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Same

if (!partition_found&&topic_name!=NULL)
rd_kafka_dbg(rkcg->rkcg_rk,CGRP,"HEARTBEAT",
"Found new partition for topic %s. "
"Updating metadata.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Suggested change
"Updating metadata.",
"Updating metadata",

Copy link
Contributor

@emasabemasabSep 17, 2025
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Internal periods are ok, but not at the end of the log message

PratRanj07 reacted with thumbs up emoji

if (!partition_found&&topic_name!=NULL)
rd_kafka_dbg(rkcg->rkcg_rk,CGRP,"HEARTBEAT",
"Found new partition for topic %s. "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Suggested change
"Found new partition for topic%s. "
"Found new partition for topic\"%s\". "

PratRanj07 reacted with thumbs up emoji
Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment

Reviewers

@emasabemasabemasab left review comments

@pranavrthpranavrthpranavrth requested changes

Copilot code reviewCopilotCopilot left review comments

Requested changes must be addressed to merge this pull request.

Assignees

No one assigned

Labels

None yet

Projects

None yet

Milestone

No milestone

Development

Successfully merging this pull request may close these issues.

4 participants

@PratRanj07@emasab@pranavrth

[8]ページ先頭

©2009-2025 Movatter.jp