# jeqo/tracing-kafka-apps
Evaluation of Kafka client configurations via distributed tracing.
Requirements:

- JDK 11
- Docker Engine, Docker Compose
- wrk (HTTP benchmarking tool)
Components:

- Producers: `http-event-producer`, `http-metadata-producer`
- Stream processors: `stream-processor-joiner`
- Consumers: `console-consumer`
Events and Metadata messages are sent to input Kafka topics. A Streams application stores Metadata messages as a table representation and joins Events with Metadata messages, then sends the joined result to another Kafka topic. A Consumer polls from the final Kafka topic and processes the joined messages.
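The join step can be pictured as a table lookup: metadata records are materialized into a keyed table, and each event is enriched with the matching entry. A simplified in-memory model of that join (no Kafka Streams dependency; keys and payloads are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class JoinModel {
    public static void main(String[] args) {
        // metadata topic materialized as a keyed table (KTable-like)
        Map<String, String> metadataTable = new HashMap<>();
        metadataTable.put("A", "meta-for-A");

        // each incoming event is joined against the table by key
        String eventKey = "A";
        String eventValue = "event-payload";
        String joined = eventValue + "+" + metadataTable.get(eventKey);

        System.out.println(joined); // the enriched record sent downstream
    }
}
```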
```shell
make           # builds containers, starts compose, and creates topics
# after it completes:
make perf-test # starts the HTTP testing tool (wrk) to create load
```
- Event producer: synchronous send.
- Console consumer: auto-commit.
```java
public class EventPublisher {
  // ...
  void publish() throws Exception {
    var record = new ProducerRecord<>(topic, "A", "A");
    kafkaProducer.send(record).get();
  }
}
```
We can see that the HTTP response is blocked until the `kafkaProducer.send()` operation completes.
As soon as the record is stored in the Kafka topic, consumption starts, even before the HTTP response is returned.
The consumer polls and consumes as soon as possible.
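Auto-commit means the consumer itself never calls commit; offsets are committed in the background at a fixed interval. A minimal sketch of what such a consumer configuration might look like (plain string keys, no Kafka dependency; the broker address and group id are placeholders):

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    static Properties autoCommitConfig() {
        var config = new Properties();
        config.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        config.put("group.id", "console-consumer");        // placeholder group id
        // auto-commit: offsets are committed in the background,
        // every auto.commit.interval.ms (5 s is the Kafka default)
        config.put("enable.auto.commit", "true");
        config.put("auto.commit.interval.ms", "5000");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(autoCommitConfig().getProperty("enable.auto.commit"));
    }
}
```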
- Event producer: async send.
- Console consumer: auto-commit.
```java
public class EventPublisher {
  // ...
  void publish() throws Exception {
    var record = new ProducerRecord<>(topic, "A", "A");
    kafkaProducer.send(record);
  }
}
```
Instead of waiting for an acknowledgement from the Kafka broker, the producer does not block, and the HTTP response returns as soon as the async send completes.
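The difference between the two scenarios comes down to whether the caller waits on the `Future` returned by `send()`. A stand-in sketch (no Kafka dependency; a `CompletableFuture` with a simulated 100 ms broker round-trip plays the role of the acknowledgement) illustrating why the sync variant blocks:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class SendModes {
    // stand-in for kafkaProducer.send(): completes after a simulated broker round-trip
    static CompletableFuture<String> send() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100); // simulated broker latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "ack";
        });
    }

    public static void main(String[] args) throws Exception {
        long t0 = System.nanoTime();
        send().get(); // sync: block until the "broker" acknowledges
        long syncMs = (System.nanoTime() - t0) / 1_000_000;

        long t1 = System.nanoTime();
        send(); // async: fire and return immediately
        long asyncMs = (System.nanoTime() - t1) / 1_000_000;

        System.out.println(syncMs >= 100 && asyncMs < 100);
    }
}
```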
The consumer polls and consumes as soon as possible.
- Event producer: async, batched send.
- Console consumer: auto-commit.
```java
public class EventPublisher {
  public EventPublisher(Tracing tracing, Config config) {
    var producerConfig = new Properties();
    // ...
    producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, 100000);
    producerConfig.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
    // ...
  }

  void publish() throws Exception {
    var record = new ProducerRecord<>(topic, "A", "A");
    kafkaProducer.send(record);
  }
}
```
Every message will be buffered until a batch of 100 KB (`batch.size`) is filled, or 1 second passes (`linger.ms`).
Depending on where your message falls within the batch, your transaction can take up to a second to send the message.
We only execute one round-trip to the broker (depending on `acks` and `min.insync.replicas`) per batch.
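The flush condition can be modeled simply: send the batch when it reaches `batch.size` bytes, or when `linger.ms` elapses, whichever comes first. A simplified sketch of that rule, using the values from the scenario above:

```java
public class BatchFlushModel {
    // simplified model: a batch is sent when it reaches batch.size bytes
    // or when linger.ms elapses, whichever happens first
    static boolean shouldFlush(int batchBytes, long elapsedMs,
                               int batchSize, long lingerMs) {
        return batchBytes >= batchSize || elapsedMs >= lingerMs;
    }

    public static void main(String[] args) {
        int batchSize = 100_000; // batch.size from the scenario above
        long lingerMs = 1_000;   // linger.ms from the scenario above

        System.out.println(shouldFlush(100_000, 50, batchSize, lingerMs));   // full batch flushes early
        System.out.println(shouldFlush(10_000, 1_000, batchSize, lingerMs)); // timeout flushes a partial batch
        System.out.println(shouldFlush(10_000, 50, batchSize, lingerMs));    // neither threshold hit: keep buffering
    }
}
```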
- Event producer: sync send.
- Console consumer: commit per record.
```java
public class ConsoleConsumer implements Runnable {

  private void printRecord(Consumer<String, String> consumer,
                           ConsumerRecord<String, String> record) {
    // processing
    consumer.commitSync(Map.of(
        new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset())));
    // ...
  }

  @Override
  public void run() {
    try (Consumer<String, String> tracingConsumer =
             kafkaTracing.consumer(new KafkaConsumer<>(config))) {
      tracingConsumer.subscribe(topics);
      while (running.get()) {
        var records = tracingConsumer.poll(Duration.ofSeconds(1));
        records.forEach(r -> this.printRecord(tracingConsumer, r));
      }
    }
    // ...
  }
}
```
If we commit per record, it is much harder for the consumer to keep up with the producer's pace. In this trace, it took almost a second to consume the record after it was produced.
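A common middle ground is committing once per poll batch instead of once per record, which cuts the number of synchronous commit round-trips by the batch size. A back-of-the-envelope comparison (the poll and record counts are made up for illustration):

```java
public class CommitCost {
    public static void main(String[] args) {
        int polls = 10;           // illustrative number of poll() iterations
        int recordsPerPoll = 500; // illustrative records returned per poll()

        // per-record: one synchronous commit round-trip for every record
        int perRecordCommits = polls * recordsPerPoll;

        // per-batch: one synchronous commit after each poll()
        int perBatchCommits = polls;

        System.out.println(perRecordCommits + " vs " + perBatchCommits);
    }
}
```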