# kafka-java-basics

An introduction to Kafka outlining basic setup in Java.
- Add the Kafka dependencies to your `pom.xml` as below. Fetch the latest version at [mvnrepository](https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients):
```xml
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>
```
- This project also makes use of slf4j for logging. Fetch the latest version at mvnrepository; a sketch of the dependency entry is below.
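A `pom.xml` entry along these lines should work. Note that the `slf4j-simple` binding and the version shown are illustrative assumptions, not pinned by this repo:

```xml
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<!-- binding and version are assumptions; fetch the latest at mvnrepository -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.30</version>
</dependency>
```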
- Writing a basic producer in Java. See `ProducerDemo.java` for further details.
```java
String bootstrapServers = "localhost:9092";

// create Producer properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// create the producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

ProducerRecord<String, String> record = new ProducerRecord<String, String>("first_topic", "hello world");

// send the data - asynchronous
producer.send(record);

// flush data
producer.flush();

// flush and close producer
producer.close();
```
- Send data with a callback function. See `ProducerWithCallback.java` for further details.
```java
for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record =
            new ProducerRecord<String, String>("first_topic", "hello world " + Integer.toString(i));

    // send the data - asynchronous
    producer.send(record, new Callback() {
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e == null) {
                // record was successfully sent
                logger.info("Received new metadata.\n" +
                        "Topic: " + recordMetadata.topic() + "\n" +
                        "Partition: " + recordMetadata.partition() + "\n" +
                        "Offset: " + recordMetadata.offset() + "\n" +
                        "Timestamp: " + recordMetadata.timestamp());
            } else {
                logger.error("Error while producing", e);
            }
        }
    });
}
```
- A producer with key-value pairs. By providing a key, we guarantee that records with the same key go to the same partition (see the sketch after the snippet below). See `ProducerDemoKeys.java` for further details.
```java
for (int i = 0; i < 10; i++) {
    String topic = "first_topic";
    String value = "hello world " + Integer.toString(i);
    String key = "id_" + Integer.toString(i);

    logger.info("Key: " + key); // log the key

    // By providing a key we guarantee that the same key goes to the same partition
    ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);

    // send the data - asynchronous
    producer.send(record, new Callback() {
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e == null) {
                // record was successfully sent
                logger.info("Received new metadata.\n" +
                        "Topic: " + recordMetadata.topic() + "\n" +
                        "Partition: " + recordMetadata.partition() + "\n" +
                        "Offset: " + recordMetadata.offset() + "\n" +
                        "Timestamp: " + recordMetadata.timestamp());
            } else {
                logger.error("Error while producing", e);
            }
        }
    }).get(); // Bad practice, we just made the call synchronous.
}
```
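Why does the same key always land on the same partition? Kafka's default partitioner hashes the key bytes (murmur2) and takes the result modulo the number of partitions. Here is a minimal sketch of that idea; `Utils` is Kafka-internal API used purely for illustration, and the 3-partition count is an assumption, not something this repo configures:

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class PartitionForKey {
    public static void main(String[] args) {
        int numPartitions = 3; // assumption: first_topic was created with 3 partitions

        for (int i = 0; i < 10; i++) {
            String key = "id_" + i;
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);

            // Same formula the default partitioner applies, so a given key
            // deterministically maps to one partition
            int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            System.out.println(key + " -> partition " + partition);
        }
    }
}
```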
- A simple Kafka consumer. See `ConsumerDemo.java` for further details.
```java
Logger logger = LoggerFactory.getLogger(ConsumerDemo.class.getName());

String bootStrapServers = "localhost:9092";
String groupId = "my-fourth-application";
String topic = "first_topic";

// create consumer configs
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest/latest/none

// create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

// subscribe consumer to our topic(s)
consumer.subscribe(Collections.singleton(topic));
// consumer.subscribe(Arrays.asList("first_topic", "second_topic")) to subscribe to multiple topics

// poll for new data
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        logger.info("Key: " + record.key() + ", Value: " + record.value());
        logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
    }
}
```
- Assign a group to a Kafka consumer. Consumers sharing a `group.id` split a topic's partitions between them; run several instances to see partitions get rebalanced.
```java
String bootStrapServers = "localhost:9092";
String groupId = "my-fourth-application";
String topic = "first_topic";

// create consumer configs
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
```
- If you're not familiar with threads, this may be a little off-putting. In general, you want a threaded solution in production. See `ConsumerDemoWithThread.java` for further details.
```java
public static void main(String[] args) {
    new ConsumerDemoWithThread().run();
}

private ConsumerDemoWithThread() {
}

private void run() {
    Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class.getName());

    String bootStrapServers = "localhost:9092";
    String groupId = "my-sixth-application";
    String topic = "first_topic";

    // latch for dealing with concurrency
    CountDownLatch latch = new CountDownLatch(1);

    // create the consumer runnable
    logger.info("Creating the consumer thread");
    Runnable myConsumerRunnable = new ConsumerRunnable(latch, bootStrapServers, groupId, topic);

    // Start the thread
    Thread myThread = new Thread(myConsumerRunnable);
    myThread.start();

    // add a shutdown hook
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        logger.info("Caught shutdown hook");
        ((ConsumerRunnable) myConsumerRunnable).shutdown();
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.info("Application has exited");
    }));

    try {
        latch.await();
    } catch (InterruptedException e) {
        logger.error("Application got interrupted", e);
    } finally {
        logger.info("Application is closing");
    }
}

public class ConsumerRunnable implements Runnable {

    private CountDownLatch latch;
    private KafkaConsumer<String, String> consumer;
    private Logger logger = LoggerFactory.getLogger(ConsumerRunnable.class.getName());

    public ConsumerRunnable(CountDownLatch latch, String bootStrapServers, String groupId, String topic) {
        this.latch = latch;

        // create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest/latest/none

        // create consumer (assign to the field, not a local, so run() and shutdown() can see it)
        consumer = new KafkaConsumer<String, String>(properties);

        // subscribe consumer to our topic(s)
        consumer.subscribe(Collections.singleton(topic));
    }

    @Override
    public void run() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("Key: " + record.key() + ", Value: " + record.value());
                    logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
                }
            }
        } catch (WakeupException e) {
            logger.info("Received shutdown signal!");
        } finally {
            consumer.close();
            // tell our main code we're done with the consumer
            latch.countDown();
        }
    }

    public void shutdown() {
        consumer.wakeup(); // special method to interrupt consumer.poll()
    }
}
```
- Use `assign` and `seek` carefully. They're generally used to replay messages or fetch a specific message.
```java
// create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

// assign and seek are used to replay data (or fetch a specific message)

// assign
TopicPartition partitionToReadFrom = new TopicPartition(topic, 0);
long offsetToReadFrom = 15L;
consumer.assign(Arrays.asList(partitionToReadFrom));

// seek
consumer.seek(partitionToReadFrom, offsetToReadFrom);

int numberOfMessagesToRead = 5;
boolean keepOnReading = true;
int numberOfMessagesReadSoFar = 0;

// poll for new data
while (keepOnReading) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        numberOfMessagesReadSoFar += 1;
        logger.info("Key: " + record.key() + ", Value: " + record.value());
        logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
        if (numberOfMessagesReadSoFar >= numberOfMessagesToRead) {
            keepOnReading = false;
            break;
        }
    }
}
```
- Make the producer idempotent by setting the property below (a sketch of companion settings follows the snippet):
```java
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
```
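Idempotence is usually paired with a few related settings. A minimal sketch of those companions, assuming a recent kafka-clients version; these exact values are not taken from this repo:

```java
// Hedged sketch: settings that commonly accompany enable.idempotence
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas to acknowledge
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)); // retry transient failures
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // safe with idempotence on Kafka >= 1.0
```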
- Enable compression by setting `compression.type`. Experiment with different methods!
```java
// Enable compression, your network will thank you
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // experiment with different methods
```
- By default, produced messages are sent as soon as they are created. Set `linger.ms` and `batch.size` to control this flow.
```java
// Enable batching
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20"); // wait 20ms before sending messages so they can be batched
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32KB batch size
```
- A producer that streams tweets into Kafka. See `TwitterProducer.java` for further details.
logger.info("Setup");/** Set up your blocking queues: Be sure to size these properly based on expected TPS of your stream */BlockingQueue<String>msgQueue =newLinkedBlockingQueue<String>(1000);// create a twitter clientClientclient =createTwitterClient(msgQueue);// Attempts to establish a connection.client.connect();// create a kafka producerKafkaProducer<String,String>producer =createKafkaProducer();// loop to send tweets to kafka// on a different thread, or multiple different threads....while (!client.isDone()) {Stringmsg =null;try {msg =msgQueue.poll(5,TimeUnit.SECONDS); }catch (InterruptedExceptione) {e.printStackTrace();client.stop(); }if (msg !=null) {logger.info(msg);producer.send(newProducerRecord<>("twitter_tweets",null,msg),newCallback() {@OverridepublicvoidonCompletion(RecordMetadatarecordMetadata,Exceptione) {if (e !=null) {logger.error("Something bad happened",e); } } }); } }logger.info("End of application");
Credits to Stephane. Check out his awesome course on Udemy!