DEV Community
Eduardo Issao Ito

Reading one specific message from a Kafka topic

This utility class can be used to read one specific message from a Kafka topic, given its partition number and offset.

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class KafkaPicker implements AutoCloseable {

        private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);

        private final String topicName;
        private final KafkaConsumer<Object, Object> consumer;

        public KafkaPicker(String topicName, Properties properties) {
            this.topicName = topicName;
            consumer = new KafkaConsumer<>(properties);
        }

        public Object pick(Long offset, Integer partition) {
            // Assign the partition manually (no consumer group subscription),
            // then position the consumer at the requested offset.
            TopicPartition topicPartition = new TopicPartition(topicName, partition);
            consumer.assign(Collections.singletonList(topicPartition));
            consumer.seek(topicPartition, offset);

            // The first record returned by the poll is the one at that offset.
            ConsumerRecords<Object, Object> records = consumer.poll(POLL_TIMEOUT);
            return records.iterator().next().value();
        }

        @Override
        public void close() {
            consumer.close();
        }
    }
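One caveat with `pick`: if the poll times out before the record arrives, `records` is empty and `records.iterator().next()` throws `NoSuchElementException`. A minimal sketch of one way to guard against that is to poll repeatedly until a deadline passes. So that it runs without a broker, the sketch is written against a plain `Supplier<List<T>>` standing in for the call to `consumer.poll(...)`; the names `PollUntil` and `firstOrEmpty` are hypothetical and not part of Kafka or of the class above.

```java
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class PollUntil {

    /**
     * Calls the supplier repeatedly until it yields a non-empty list or
     * maxWait has elapsed. Returns the first element of the first non-empty
     * list, or Optional.empty() on timeout.
     *
     * The supplier is an assumption: it stands in for a blocking call such
     * as consumer.poll(timeout). A supplier that returns immediately makes
     * this loop spin until the deadline.
     */
    public static <T> Optional<T> firstOrEmpty(Supplier<List<T>> poll, Duration maxWait) {
        long deadline = System.nanoTime() + maxWait.toNanos();
        do {
            List<T> batch = poll.get();
            if (!batch.isEmpty()) {
                // Elements are expected to be non-null (e.g. ConsumerRecord objects).
                return Optional.of(batch.get(0));
            }
            // Compare via subtraction so the check stays valid if nanoTime wraps.
        } while (deadline - System.nanoTime() > 0);
        return Optional.empty();
    }
}
```

Inside `KafkaPicker`, the supplier could be something like `() -> consumer.poll(POLL_TIMEOUT).records(topicPartition)`, which returns the list of records for the assigned partition. The caller then decides what an empty result means instead of getting an exception.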

In the example below, you will need a Properties object to connect to Kafka, with at least three mandatory properties: bootstrap.servers, key.deserializer and value.deserializer.

In this example, the spring.json.trusted.packages property is used by the Spring JsonDeserializer.

Since we are picking specific messages rather than reading in batches, it is recommended to set max.poll.records to 1, so that each poll returns only the requested message instead of a batch.

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "kafka:9092");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.springframework.kafka.support.serializer.JsonDeserializer");
    properties.put("spring.json.trusted.packages", "*");
    properties.put("max.poll.records", "1");

    try (KafkaPicker kafkaPicker = new KafkaPicker("mytopic", properties)) {
        System.out.println("msg = " + kafkaPicker.pick(4L, 0));
        System.out.println("msg = " + kafkaPicker.pick(0L, 0));
    }
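A missing mandatory property only shows up as an exception when the consumer is constructed, so it can be handy to check the configuration up front. The sketch below is one possible helper that uses only `java.util`; the class name `PickerConfig` and the method `missingKeys` are hypothetical, and the list of required keys simply mirrors the three properties named earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class PickerConfig {

    // The three properties the article names as mandatory.
    private static final List<String> REQUIRED = List.of(
            "bootstrap.servers", "key.deserializer", "value.deserializer");

    /** Returns the required keys that are absent or blank, in declaration order. */
    public static List<String> missingKeys(Properties properties) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            // getProperty returns null when the key is absent (or not a String value).
            String value = properties.getProperty(key);
            if (value == null || value.isBlank()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

Calling `PickerConfig.missingKeys(properties)` before `new KafkaPicker(...)` gives a readable list of what is missing, which can then be logged or turned into an error message.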

Top comments (2)

Artem Ptushkin

This helped me, thank you

ali rezvani

Hi, can you give me this code or a GitHub link?
