ballerina-platform/module-ballerinax-kafka
This library provides an implementation to interact with Kafka Brokers via Kafka Consumer and Kafka Producer clients.
Apache Kafka is an open-source distributed event streaming platform used for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
This library supports Kafka 1.x.x, 2.x.x and 3.x.x versions.
A Kafka producer is a Kafka client that publishes records to the Kafka cluster. The producer is thread-safe, and sharing a single producer instance across threads will generally be faster than having multiple instances. When working with a Kafka producer, the first thing to do is to initialize the producer. For the producer to execute successfully, an active Kafka broker should be available.
The code snippet given below initializes a producer with the basic configuration.
```ballerina
import ballerinax/kafka;

kafka:ProducerConfiguration producerConfiguration = {
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3
};
kafka:Producer kafkaProducer = check new (kafka:DEFAULT_URL, producerConfiguration);
```
A Kafka consumer is a subscriber responsible for reading records from one or more topics and one or more partitions of a topic. When working with a Kafka consumer, the first thing to do is to initialize the consumer. For the consumer to execute successfully, an active Kafka broker should be available.
The code snippet given below initializes a consumer with the basic configuration.
```ballerina
kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "group-id",        // Unique string that identifies the consumer
    offsetReset: "earliest",    // Offset reset strategy if no initial offset
    topics: ["kafka-topic"]
};
kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, consumerConfiguration);
```
The Kafka consumer can be used as a listener to a set of topics without the need to manually poll the messages.
You can use the `kafka:Caller` to manually commit the offsets of the messages that are read by the service. The following code snippet shows how to initialize and define the listener and how to commit the offsets manually.
```ballerina
import ballerina/log;
import ballerinax/kafka;

kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "group-id",
    topics: ["kafka-topic-1"],
    pollingInterval: 1,
    autoCommit: false
};

listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration);

service on kafkaListener {
    remote function onConsumerRecord(kafka:Caller caller, kafka:BytesConsumerRecord[] records) {
        // Processes the records.
        // ...
        // Commits the offsets manually.
        kafka:Error? commitResult = caller->commit();
        if commitResult is kafka:Error {
            log:printError("Error occurred while committing the offsets for the consumer", 'error = commitResult);
        }
    }
}
```
Serialization is the process of converting data into a stream of bytes that is used for transmission. Kafka stores and transmits these byte arrays in its queue. Deserialization does the opposite of serialization, in which byte arrays are converted into the desired data type.
Currently, this library only supports the `byte array` data type for both the keys and values. The following code snippets show how to produce and read a message from Kafka.
```ballerina
string message = "Hello World, Ballerina";
string key = "my-key";
// Converts the message and key to a byte array.
check kafkaProducer->send({
    topic: "test-kafka-topic",
    key: key.toBytes(),
    value: message.toBytes()
});
```
```ballerina
kafka:BytesConsumerRecord[] records = check kafkaConsumer->poll(1);

foreach var kafkaRecord in records {
    byte[] messageContent = kafkaRecord.value;
    // Tries to generate the string value from the byte array.
    string result = check string:fromBytes(messageContent);
    io:println("The result is: ", result);
}
```
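Putting the snippets above together, a minimal end-to-end program might look like the following sketch. It assumes a broker is reachable at `kafka:DEFAULT_URL` (`localhost:9092`) and that the `test-kafka-topic` topic exists; the group ID and poll timeout are illustrative values.

```ballerina
import ballerina/io;
import ballerinax/kafka;

public function main() returns error? {
    // Produces a message to the `test-kafka-topic` topic.
    kafka:Producer kafkaProducer = check new (kafka:DEFAULT_URL);
    check kafkaProducer->send({
        topic: "test-kafka-topic",
        value: "Hello World, Ballerina".toBytes()
    });
    check kafkaProducer->close();

    // Consumes messages from the same topic, starting from the earliest offset.
    kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, {
        groupId: "example-group",
        offsetReset: "earliest",
        topics: ["test-kafka-topic"]
    });
    kafka:BytesConsumerRecord[] records = check kafkaConsumer->poll(10);
    foreach var kafkaRecord in records {
        io:println(check string:fromBytes(kafkaRecord.value));
    }
    check kafkaConsumer->close();
}
```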
In Kafka, records are grouped into smaller units called partitions. These can be processed independently without compromising the correctness of the results and lay the foundation for parallel processing. This can be achieved by using multiple consumers within the same group, each reading and processing data from a subset of topic partitions and running in a single thread.
Topic partitions can be assigned to consumers automatically, or you can assign them manually.
The following code snippet joins a consumer to the `consumer-group` consumer group and manually assigns it a topic partition.
```ballerina
kafka:ConsumerConfiguration consumerConfiguration = {
    // `groupId` determines the consumer group.
    groupId: "consumer-group",
    pollingInterval: 1,
    autoCommit: false
};
kafka:Consumer kafkaConsumer = check new (kafka:DEFAULT_URL, consumerConfiguration);

// Creates a topic partition.
kafka:TopicPartition topicPartition = {
    topic: "kafka-topic-1",
    partition: 1
};

// Passes the topic partitions to the `assign` function as an array.
check kafkaConsumer->assign([topicPartition]);
```
The following examples show how to use the Ballerina `kafka` connector to produce and consume messages using an Apache Kafka message broker.
- Order manager: A simple order management system that uses Kafka to process orders.
- Word count calculator: A word count calculator that reads messages from a Kafka topic and counts the occurrences of each word.
- Twitter filter: A Twitter filter that reads tweets from a Kafka topic and filters them based on certain criteria.
- Stock trading analyzer: This example demonstrates a simulated stock trading system built using Kafka and Ballerina.
- Banking transaction processor: A banking transaction processor that processes banking transactions using Kafka. It illustrates how banking transactions can be published and consumed in real time, while also integrating with Confluent Schema Registry to manage message schemas between the producer and consumer.
The Issues and Projects tabs are disabled for this repository as it is part of the Ballerina Standard Library. To report bugs, request new features, start new discussions, view project boards, etc., go to the Ballerina Standard Library parent repository.
This repository only contains the source code for the library.
- Download and install Java SE Development Kit (JDK) version 21.
- Download and install Docker. This is required to run the tests.
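If you want to try the snippets in this README against a live broker at `localhost:9092`, one convenient option (an assumption for local experimentation, not part of this repository's tooling) is to start a single-node Kafka broker with Docker:

```shell
# Starts a single-node Kafka broker listening on localhost:9092.
# The image tag is illustrative; pick a version matching your needs.
docker run -d --name kafka-broker -p 9092:9092 apache/kafka:3.7.0

# Stops and removes the broker when done.
docker stop kafka-broker && docker rm kafka-broker
```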
Execute the commands below to build from the source.
To build the library:
```
./gradlew clean build
```

To run the tests:
```
./gradlew clean test
```

To build the library without the tests:
```
./gradlew clean build -x test
```

To debug the library implementation:
```
./gradlew clean build -Pdebug=<port>
```

To debug the library with the Ballerina language:
```
./gradlew clean build -PbalJavaDebug=<port>
```

To publish the ZIP artifact to the local `.m2` repository:
```
./gradlew clean build publishToMavenLocal
```

To publish the generated artifacts to the local Ballerina Central repository:
```
./gradlew clean build -PpublishToLocalCentral=true
```

To publish the generated artifacts to the Ballerina Central repository:
```
./gradlew clean build -PpublishToCentral=true
```
As an open source project, Ballerina welcomes contributions from the community.
For more information, go to the contribution guidelines.
All the contributors are encouraged to read the Ballerina Code of Conduct.
- For more information, go to the `kafka` library.
- For example demonstrations of the usage, go to Ballerina By Examples.
- Chat live with us via our Discord server.
- Post all technical questions on Stack Overflow with the #ballerina tag.