An introduction to Kafka outlining basic setup in Java


desainis/kafka-java-basics


Starting Kafka with Java (Maven)

  • Add the Kafka dependencies to your pom.xml as below:

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>

  • This project also makes use of slf4j. Fetch the latest version at mvnrepository.

Kafka Producers

  • Writing a basic producer in Java. See ProducerDemo.java for further details.

    String bootstrapServers = "localhost:9092";

    // create Producer properties
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // create the producer
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    // create a producer record
    ProducerRecord<String, String> record = new ProducerRecord<>("first_topic", "hello world");

    // send the data - asynchronous
    producer.send(record);

    // flush data
    producer.flush();

    // flush and close producer
    producer.close();

Kafka Producers with Callback

  • Send data with a callback function. See ProducerWithCallback.java for further details.

    for (int i = 0; i < 10; i++) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("first_topic", "hello world " + Integer.toString(i));

        // send the data - asynchronous
        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                // executes every time a record is sent successfully or an exception is thrown
                if (e == null) {
                    // record was successfully sent
                    logger.info("Received new metadata.\n" +
                            "Topic: " + recordMetadata.topic() + "\n" +
                            "Partition: " + recordMetadata.partition() + "\n" +
                            "Offset: " + recordMetadata.offset() + "\n" +
                            "Timestamp: " + recordMetadata.timestamp());
                } else {
                    logger.error("Error while producing", e);
                }
            }
        });
    }

Kafka Producers with Keys

  • A producer with key-value pairs. By providing a key, we guarantee that records with the same key go to the same partition, as long as the number of partitions in the topic does not change. See ProducerDemoKeys.java for further details.

    for (int i = 0; i < 10; i++) {
        String topic = "first_topic";
        String value = "hello world " + Integer.toString(i);
        String key = "id_" + Integer.toString(i);

        logger.info("Key: " + key); // log the key

        // By providing a key we guarantee that the same key goes to the same partition
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        // send the data - asynchronous
        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    // record was successfully sent
                    logger.info("Received new metadata.\n" +
                            "Topic: " + recordMetadata.topic() + "\n" +
                            "Partition: " + recordMetadata.partition() + "\n" +
                            "Offset: " + recordMetadata.offset() + "\n" +
                            "Timestamp: " + recordMetadata.timestamp());
                } else {
                    logger.error("Error while producing", e);
                }
            }
        }).get(); // Bad practice: get() blocks until the send completes, so the call becomes synchronous.
                  // It also throws ExecutionException and InterruptedException, which the caller must handle.
    }
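The key-to-partition guarantee above can be illustrated with a small JDK-only sketch. It assumes the usual "hash the key bytes, then take the remainder by the partition count" scheme. Kafka's default partitioner uses a murmur2 hash of the serialized key; `Arrays.hashCode` is swapped in here purely so the example runs without the Kafka client, which means the partition numbers it prints will not match what a real cluster assigns. `KeyPartitionSketch` and `partitionFor` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: Kafka's default partitioner hashes the serialized key with
// murmur2. Arrays.hashCode is a stand-in so this sketch needs nothing but the JDK.
final class KeyPartitionSketch {

    // Map a key to a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the remainder is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            String key = "id_" + i;
            // The same key always yields the same partition for a fixed partition
            // count, but the mapping may differ once the count changes.
            System.out.println(key + " -> partition " + partitionFor(key, 3)
                    + " (3 partitions), partition " + partitionFor(key, 4)
                    + " (4 partitions)");
        }
    }
}
```

Because the partition is derived from the key and the partition count together, adding partitions to an existing topic can move a key to a different partition.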

Kafka Consumers

  • A simple Kafka consumer.

    Logger logger = LoggerFactory.getLogger(ConsumerDemo.class.getName());

    String bootStrapServers = "localhost:9092";
    String groupId = "my-fourth-application";
    String topic = "first_topic";

    // create consumer configs
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest/latest/none

    // create consumer
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

    // subscribe consumer to our topic(s)
    consumer.subscribe(Collections.singleton(topic));
    // consumer.subscribe(Arrays.asList("first_topic", "second_topic")) to subscribe to multiple topics

    // poll for new data
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record : records) {
            logger.info("Key: " + record.key() + ", Value: " + record.value());
            logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
        }
    }
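The `earliest/latest/none` comment on `auto.offset.reset` is easier to follow with a small model of the decision. The sketch below is a simplified, JDK-only illustration and not the client's actual code: it assumes the setting only matters when the group has no committed offset for the partition, and it ignores the out-of-range case. `OffsetResetSketch` and `startingOffset` are hypothetical names.

```java
import java.util.OptionalLong;

// Simplified model of where a consumer starts reading a partition.
// Not the Kafka client's implementation; names are hypothetical.
final class OffsetResetSketch {

    static long startingOffset(OptionalLong committed, long logStart, long logEnd, String policy) {
        // A committed offset for the group always wins; the policy is only a fallback.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (policy) {
            case "earliest":
                return logStart; // read from the oldest available record
            case "latest":
                return logEnd;   // read only records produced from now on
            case "none":
                throw new IllegalStateException("no committed offset and auto.offset.reset=none");
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(OptionalLong.empty(), 0L, 100L, "earliest"));
        System.out.println(startingOffset(OptionalLong.empty(), 0L, 100L, "latest"));
        System.out.println(startingOffset(OptionalLong.of(42L), 0L, 100L, "latest"));
    }
}
```

This is why re-running a consumer with the same group id usually does not replay old messages even with `earliest`: once offsets are committed, the policy no longer applies.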

Kafka Consumers with Groups

  • Assign a group to a Kafka consumer.

    String bootStrapServers = "localhost:9092";
    String groupId = "my-fourth-application";
    String topic = "first_topic";

    // create consumer configs
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

Kafka Consumers with Thread(s)

  • If you're not familiar with threads, this may be a little off-putting. In general you want a threaded solution in production.

    public static void main(String[] args) {
        new ConsumerDemoWithThread().run();
    }

    private ConsumerDemoWithThread() {
    }

    private void run() {
        Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class.getName());

        String bootStrapServers = "localhost:9092";
        String groupId = "my-sixth-application";
        String topic = "first_topic";

        // latch for dealing with concurrency
        CountDownLatch latch = new CountDownLatch(1);

        // create the consumer runnable
        logger.info("Creating the consumer thread");
        Runnable myConsumerRunnable = new ConsumerRunnable(latch, bootStrapServers, groupId, topic);

        // Start the thread
        Thread myThread = new Thread(myConsumerRunnable);
        myThread.start();

        // add a shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Caught shutdown hook");
            ((ConsumerRunnable) myConsumerRunnable).shutdown();
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            logger.info("Application has exited");
        }));

        try {
            latch.await();
        } catch (InterruptedException e) {
            logger.error("Application got interrupted", e);
        } finally {
            logger.info("Application is closing");
        }
    }

    public class ConsumerRunnable implements Runnable {

        private CountDownLatch latch;
        private KafkaConsumer<String, String> consumer;
        private Logger logger = LoggerFactory.getLogger(ConsumerRunnable.class.getName());

        public ConsumerRunnable(CountDownLatch latch, String bootStrapServers, String groupId, String topic) {
            this.latch = latch;

            // create consumer configs
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest/latest/none

            // create consumer: assign to the field, since a local declaration here
            // would shadow it and leave the field null when run() is called
            consumer = new KafkaConsumer<>(properties);

            // subscribe consumer to our topic(s); poll() fails without a subscription
            consumer.subscribe(Collections.singleton(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                    for (ConsumerRecord<String, String> record : records) {
                        logger.info("Key: " + record.key() + ", Value: " + record.value());
                        logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
                    }
                }
            } catch (WakeupException e) {
                logger.info("Received shutdown signal!");
            } finally {
                consumer.close();
                // tell our main code we're done with the consumer
                latch.countDown();
            }
        }

        public void shutdown() {
            // special method to interrupt consumer.poll(); it makes poll() throw a WakeupException
            consumer.wakeup();
        }
    }
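The latch handshake in the threaded consumer can be tried without a broker. The following is a minimal JDK-only sketch of the same pattern, under stated assumptions: a stop flag stands in for `consumer.wakeup()`, a short sleep stands in for `consumer.poll()`, and a boolean stands in for `consumer.close()`. `LatchShutdownSketch`, `Worker`, and `runAndStop` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// JDK-only stand-in for the consumer thread. No Kafka calls are made here.
final class LatchShutdownSketch {

    static final class Worker implements Runnable {
        private final CountDownLatch latch;
        private final AtomicBoolean stopRequested = new AtomicBoolean(false);
        private final AtomicBoolean cleanedUp = new AtomicBoolean(false);

        Worker(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                while (!stopRequested.get()) {
                    Thread.sleep(10); // stand-in for consumer.poll(...)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                cleanedUp.set(true); // stand-in for consumer.close()
                latch.countDown();   // tell the waiting thread we are done
            }
        }

        void shutdown() {
            stopRequested.set(true); // stand-in for consumer.wakeup()
        }

        boolean cleanedUp() {
            return cleanedUp.get();
        }
    }

    // Starts a worker, requests shutdown, and waits until its cleanup has finished.
    static boolean runAndStop() {
        CountDownLatch latch = new CountDownLatch(1);
        Worker worker = new Worker(latch);
        new Thread(worker).start();
        worker.shutdown();
        try {
            return latch.await(5, TimeUnit.SECONDS) && worker.cleanedUp();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("clean shutdown: " + runAndStop());
    }
}
```

Counting the latch down in `finally` is the important part: the waiting thread is released only after cleanup has run, whichever way the loop exits.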

Kafka Consumers using Assign and Seek

  • Use assign and seek carefully. They are generally used to replay messages or fetch a specific message.

    // create consumer
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

    // assign and seek are used to replay data (or fetch a specific message)

    // assign
    TopicPartition partitionToReadFrom = new TopicPartition(topic, 0);
    long offsetToReadFrom = 15L;
    consumer.assign(Arrays.asList(partitionToReadFrom));

    // seek
    consumer.seek(partitionToReadFrom, offsetToReadFrom);

    int numberOfMessagesToRead = 5;
    boolean keepOnReading = true;
    int numberOfMessagesReadSoFar = 0;

    // poll for new data
    while (keepOnReading) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record : records) {
            numberOfMessagesReadSoFar += 1;
            logger.info("Key: " + record.key() + ", Value: " + record.value());
            logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
            if (numberOfMessagesReadSoFar >= numberOfMessagesToRead) {
                keepOnReading = false; // exit the while loop
                break;                 // exit the for loop
            }
        }
    }

Enabling an Idempotent Producer

  • Set producer properties as below:

    properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
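Idempotence depends on a few related producer settings: `acks` must be `all`, `retries` must be greater than zero, and `max.in.flight.requests.per.connection` must be at most 5. The client is expected to choose compatible defaults when these are left unset, so the sketch below only makes them explicit. It uses the raw config names instead of the `ProducerConfig` constants so that it runs with the JDK alone. `IdempotentProducerSettings` and `withIdempotence` are hypothetical names.

```java
import java.util.Properties;

// Hypothetical helper that spells out the settings idempotence relies on.
final class IdempotentProducerSettings {

    // Returns a copy of base with the idempotence-related settings added.
    static Properties withIdempotence(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all"); // wait for all in-sync replicas
        props.setProperty("retries", Integer.toString(Integer.MAX_VALUE));
        props.setProperty("max.in.flight.requests.per.connection", "5");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9092");
        withIdempotence(base).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Setting a conflicting value, for example `acks=1` together with `enable.idempotence=true`, is rejected by the producer at construction time, so being explicit can surface mistakes early.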

Enable Compression

  • Enable compression by setting compression.type. Experiment with different methods!

    // Enable compression, your network will thank you
    properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // experiment with different methods

Enable Producer Batching

  • By default, produced messages are sent as soon as they are created (linger.ms defaults to 0 in this client version). Set linger.ms and batch.size to control this flow.

    // Enable batch sizing
    properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20"); // wait 20ms before sending messages so they can be batched
    properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32KB batch size
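How `linger.ms` and `batch.size` interact can be shown with a toy model: a batch goes out when it is full or when it has waited for the linger time, whichever happens first. The sketch below is a simplified JDK-only simulation with an explicit clock and is not the Kafka client's actual accumulator. `BatchingSketch`, `append`, and `maybeSend` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of producer batching. Not the Kafka client's implementation.
final class BatchingSketch {
    private final int batchSizeBytes;
    private final long lingerMs;
    private final List<Integer> sentBatchSizes = new ArrayList<>();
    private int pendingBytes = 0;
    private long batchOpenedAtMs = -1;

    BatchingSketch(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    // Add a record of the given size at the given simulated time.
    void append(int recordBytes, long nowMs) {
        maybeSend(nowMs); // the linger time may have expired before this record arrived
        if (pendingBytes > 0 && pendingBytes + recordBytes > batchSizeBytes) {
            send(); // the record does not fit: send the open batch first
        }
        if (pendingBytes == 0) {
            batchOpenedAtMs = nowMs; // this record opens a new batch
        }
        pendingBytes += recordBytes;
    }

    // Send the open batch if it has waited at least lingerMs.
    void maybeSend(long nowMs) {
        if (pendingBytes > 0 && nowMs - batchOpenedAtMs >= lingerMs) {
            send();
        }
    }

    private void send() {
        sentBatchSizes.add(pendingBytes);
        pendingBytes = 0;
    }

    List<Integer> sentBatchSizes() {
        return sentBatchSizes;
    }

    public static void main(String[] args) {
        BatchingSketch producer = new BatchingSketch(32 * 1024, 20);
        producer.append(1000, 0);
        producer.append(1000, 5);
        producer.append(1000, 10);
        producer.maybeSend(25); // linger time has passed, so the three records go out together
        System.out.println("batches sent: " + producer.sentBatchSizes());
    }
}
```

With a linger time of 0 the model sends every record on its own, which corresponds to the default behaviour described above. A larger linger time trades a little latency for fewer, larger requests.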

Using Kafka to consume live tweets

  • See TwitterProducer.java for further details.

    logger.info("Setup");

    /** Set up your blocking queues: Be sure to size these properly based on expected TPS of your stream */
    BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);

    // create a twitter client
    Client client = createTwitterClient(msgQueue);

    // Attempts to establish a connection.
    client.connect();

    // create a kafka producer
    KafkaProducer<String, String> producer = createKafkaProducer();

    // loop to send tweets to kafka
    // on a different thread, or multiple different threads....
    while (!client.isDone()) {
        String msg = null;
        try {
            msg = msgQueue.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
            client.stop();
        }
        if (msg != null) {
            logger.info(msg);
            producer.send(new ProducerRecord<>("twitter_tweets", null, msg), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        logger.error("Something bad happened", e);
                    }
                }
            });
        }
    }
    logger.info("End of application");

Credits to Stephane. Check out his awesome course on Udemy!
