This utility class can be used to read one specific message from a Kafka topic, given its partition number and offset.
importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.common.TopicPartition;importjava.time.Duration;importjava.util.Collections;importjava.util.Properties;publicclassKafkaPickerimplementsAutoCloseable{privatestaticfinalDurationPOLL_TIMEOUT=Duration.ofSeconds(1);privateStringtopicName;privateKafkaConsumer<Object,Object>consumer;publicKafkaPicker(StringtopicName,Propertiesproperties){this.topicName=topicName;consumer=newKafkaConsumer<>(properties);}publicObjectpick(Longoffset,Integerpartition){TopicPartitiontopicPartition=newTopicPartition(topicName,partition);consumer.assign(Collections.singletonList(topicPartition));consumer.seek(topicPartition,offset);ConsumerRecords<Object,Object>records=consumer.poll(POLL_TIMEOUT);returnrecords.iterator().next().value();}publicvoidclose(){consumer.close();}}
In the example below, you will need aProperties
object to connect to Kafka with at least three mandatory properties:broker. bootstrap.servers
,key.deserializer
andvalue.deserializer
.
I this example, thespring.json.trusted.packages
property is used by the Spring JsonDeserializer.
As we will be picking specific messages and not be reading in batch, it is recommended do setmax.poll.records
to 1, so unnecessary messages will not be read.
Propertiesproperties=newProperties();properties.put("bootstrap.servers","kafka:9092");properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer","org.springframework.kafka.support.serializer.JsonDeserializer");properties.put("spring.json.trusted.packages","*");properties.put("max.poll.records","1");try(KafkaPickerkafkaPicker=newKafkaPicker("mytopic",properties)){System.out.println("msg = "+kafkaPicker.pick(4L,0));System.out.println("msg = "+kafkaPicker.pick(0L,0));}
Top comments(2)
Subscribe

Artem Ptushkin•
Java/Kotlin clean code developer; Consumer-Driven contracts expert; Spring expert
- LocationUtrecht, NL
- WorkDevelopment experience engineer at IptiQ
- Joined
This helped me, thank you
For further actions, you may consider blocking this person and/orreporting abuse