Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Fluent Kafka Streams Test with Java

License

NotificationsYou must be signed in to change notification settings

bakdata/fluent-kafka-streams-tests

Repository files navigation

Build StatusSonarcloud statusCode coverageMaven

Fluent Kafka Streams Tests

Write clean and concise tests for your Kafka Streams application.

You can find ablog post on medium with some examples and detailed explanations of how Fluent Kafka Streams Tests work.

Getting Started

You can add Fluent Kafka Streams Tests via Maven Central.

Gradle

compilegroup:'com.bakdata.fluent-kafka-streams-tests',name:'fluent-kafka-streams-tests-junit5',version:'2.1.0'

Maven

<dependency>    <groupId>com.bakdata.fluent-kafka-streams-tests</groupId>    <artifactId>fluent-kafka-streams-tests-junit5</artifactId>    <version>2.1.0</version></dependency>

There is also a junit4 version and one without any dependencies to a specific testing framework.

For other build tools or versions, refer to theoverview of sonatype.

Using it to Write Tests

Here are two example tests which show you how to use Fluent Kafka Streams Tests.

Word Count Test

Assume you have a Word Count Kafka Streams application, calledWordCount, and want to test it correctly.First, start by creating a new test class with your application.

classWordCountTest {privatefinalWordCountapp =newWordCount();}

Then, set up theTestTopology.

classWordCountTest {privatefinalWordCountapp =newWordCount();@RegisterExtensionfinalTestTopologyExtension<Object,String>testTopology =newTestTopologyExtension<>(this.app::getTopology,this.app.getKafkaProperties());}

TheTestTopology takes care of all the inputs, processing, and outputs of you application.For it to do that, you need to register it as an extension (JUnit5), so certain setup/teardown methods are called.The constructor expects a topology factory (for a fresh topology in each test) that creates the topology under test.

Additionally, the properties of theKafkaClient need to be specified.Broker and application-id must be present (Kafka testutil limitation), but are ignored.Most importantly, if the application expects default serde for key and value, these must be present in the properties orexplicitly specified withwithDefaultKeySerde(Serde serde) and/orwithDefaultValueSerde(Serde serde).

To test your appliction, you can simply write a JUnit test.

classWordCountTest {privatefinalWordCountapp =newWordCount();@RegisterExtensionfinalTestTopologyExtension<Object,String>testTopology =newTestTopologyExtension<>(this.app::getTopology,this.app.getKafkaProperties());@TestvoidshouldAggregateSameWordStream() {this.testTopology.input()            .add("cat")            .add("dog")            .add("cat");this.testTopology.streamOutput().withSerde(Serdes.String(),Serdes.Long())            .expectNextRecord().hasKey("cat").hasValue(1L)            .expectNextRecord().hasKey("dog").hasValue(1L)            .expectNextRecord().hasKey("cat").hasValue(2L)            .expectNoMoreRecord();    }}

See the tests for thejunit4 andframework agnostic setup.

TheTestTopology has a method.input() to retrieve the input topic (or.input(String topic)) if more than one input topic is present).You can simply add values to your input stream by calling.add(V value) or.add(K key, V value).

To get the output,TestTopology provides two methods:.streamOutput() and.tableOutput().They behave just like the input with regard to the number of output topics.Using the stream version simulates Kafka's stream-semantics, meaning that a key can be present many times in an output stream, whereas the table-semantics only output the newest value of each key.

To check the output records, you can call.expectNextRecord() and then chain.hasKey(K key),.hasKeySatisfying(Consumer<K> requirements),.hasValue(V value) or.hasValueSatisfying(Consumer<V> requirements) to this call.Note that calling.expectNextRecord() by itself without chaining at least one of the.has* methods will not check for the existence of a next record!

Once you expect no further records, call.expectNoMoreRecord() to indicate the end of the output stream.

Using Other Test Frameworks to Check Output

We intentionally kept the API for output checking slim, because there are many tools out there which focus on doing exactly that.TheTestOutput class implements theIterable interface, so you can use your favorite tool to test iterables.

Here is an example usingAssertJ.

@TestvoidshouldReturnCorrectIteratorTable() {this.testTopology.input()        .add("cat")        .add("dog")        .add("bird");assertThat(this.testTopology.tableOutput().withSerde(Serdes.String(),Serdes.Long()))        .extracting(ProducerRecord::key)        .containsAll(List.of("cat","dog","bird"));}

There is also an API to consume a record's key or value in order to embed another assertion framework into our API.

@TestvoidshouldReturnCorrectIteratorTable() {this.testTopology.input()        .add("cat");this.testTopology.streamOutput().withSerde(Serdes.String(),Serdes.Long())            .expectNextRecord()            .hasKeySatisfying(key ->assertThat(key).isEqualTo("cat"))            .hasValueSatisfying(value ->assertThat(value).isEqualTo(1L))            .expectNoMoreRecord();}

Alternatively, you can convert the output toList for use with your assertion framework. Here is an example of this withAssertJ.

@TestvoidshouldConvertStreamOutputToList(){this.testTopology.input()        .add("cat")        .add("dog")        .add("bird");finalList<ProducerRecord<String,Long>>outputs =this.testTopology.streamOutput()        .withSerde(Serdes.String(),Serdes.Long())        .toList();assertThat(outputs)        .extracting(ProducerRecord::key)        .containsExactly("cat","dog","bird");assertThat(outputs)        .extracting(ProducerRecord::value)        .containsExactly(1L,1L,1L);}

More Examples

You can find many more testsinthis repository's test code.

Development

If you want to contribute to this project, you can simply clone the repository and build it via Gradle.All dependencies should be included in the Gradle files, there are no external prerequisites.

> git clone git@github.com:bakdata/fluent-kafka-streams-tests.git>cd fluent-kafka-streams-tests&& ./gradlew build

Please note, that we havecode styles for Java.They are basically the Google style guide, with some small modifications.

Contributing

We are happy if you want to contribute to this project.If you find any bugs or have suggestions for improvements, please open an issue.We are also happy to accept your PRs.Just open an issue beforehand and let us know what you want to do and why.

License

This project is licensed under the MIT license.Have a look at theLICENSE for more details.


[8]ページ先頭

©2009-2025 Movatter.jp