Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Node.js bindings for librdkafka

License

NotificationsYou must be signed in to change notification settings

playson-dev/node-rdkafka

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Copyright (c) 2016 Blizzard Entertainment.

https://github.com/blizzard/node-rdkafka

Build Status

npm version

Looking for Collaborators!

I am looking foryour help to make this project even better! If you're interested, checkthis out

Overview

Thenode-rdkafka library is a high-performance NodeJS client forApache Kafka that wraps the nativelibrdkafka library. All the complexity of balancing writes across partitions and managing (possibly ever-changing) brokers should be encapsulated in the library.

This library currently useslibrdkafka version2.5.3.

Reference Docs

To view the reference docs for the current version, gohere

Contributing

For guidelines on contributing please seeCONTRIBUTING.md

Code of Conduct

Play nice; Play fair.

Requirements

  • Apache Kafka >=0.9
  • Node.js >=16
  • Linux/Mac
  • Windows?! See below
  • OpenSSL

Mac OS High Sierra / Mojave

OpenSSL has been upgraded in High Sierra and homebrew does not overwrite default system libraries. That means when building node-rdkafka, because you are using openssl, you need to tell the linker where to find it:

export CPPFLAGS=-I/usr/local/opt/openssl/includeexport LDFLAGS=-L/usr/local/opt/openssl/lib

Then you can runnpm install on your application to get it to build correctly.

NOTE: From thelibrdkafka docs

WARNING: Due to a bug in Apache Kafka 0.9.0.x, the ApiVersionRequest (as sent by the client when connecting to the broker) will be silently ignored by the broker causing the request to time out after 10 seconds. This causes client-broker connections to stall for 10 seconds during connection-setup before librdkafka falls back on thebroker.version.fallback protocol features. The workaround is to explicitly configureapi.version.request tofalse on clients communicating with <=0.9.0.x brokers.

Alpine

Using Alpine Linux? Check out thedocs.

Windows

Windows buildis not compiled fromlibrdkafka source but it is rather linked against the appropriate version ofNuGet librdkafka.redist static binary that gets downloaded fromhttps://globalcdn.nuget.org/packages/librdkafka.redist.2.5.3.nupkg during installation. This download link can be changed using the environment variableNODE_RDKAFKA_NUGET_BASE_URL that defaults tohttps://globalcdn.nuget.org/packages/ when it's no set.

Requirements:

Note: Istill do not recommend usingnode-rdkafka in production on Windows. This feature was in high demand and is provided to help develop, but we do not test against Windows, and windows support may lag behind Linux/Mac support because those platforms are the ones used to develop this library. Contributors are welcome if any Windows issues are found :)

Tests

This project includes two types of unit tests in this project:

  • end-to-end integration tests
  • unit tests

You can run both types of tests by usingMakefile. Doing so callsmocha in your locally installednode_modules directory.

  • Before you run the tests, be sure to init and update the submodules:
    1. git submodule init
    2. git submodule update
  • To run the unit tests, you can runmake lint ormake test.
  • To run the integration tests, you must have a running Kafka installation available. By default, the test tries to connect tolocalhost:9092; however, you can supply theKAFKA_HOST environment variable to override this default behavior. Runmake e2e.

Usage

You can install thenode-rdkafka module like any other module:

npm install node-rdkafka

To use the module, you mustrequire it.

constKafka=require('node-rdkafka');

Configuration

You can pass many configuration options tolibrdkafka. A full list can be found inlibrdkafka'sConfiguration.md

Configuration keys that have the suffix_cb are designated as callbacks. Someof these keys are informational and you can choose to opt-in (for example,dr_cb). Others are callbacks designed toreturn a value, such aspartitioner_cb.

Not all of these options are supported.The library will throw an error if the value you send in is invalid.

The library currently supports the following callbacks:

  • partitioner_cb
  • dr_cb ordr_msg_cb
  • event_cb
  • rebalance_cb (seeRebalancing)
  • offset_commit_cb (seeCommits)

Librdkafka Methods

This library includes two utility functions for detecting the status of your installation. Please try to include these when making issue reports where applicable.

You can get the features supported by your compile oflibrdkafka by reading the variable "features" on the root of thenode-rdkafka object.

constKafka=require('node-rdkafka');console.log(Kafka.features);// #=> [ 'gzip', 'snappy', 'ssl', 'sasl', 'regex', 'lz4' ]

You can also get the version oflibrdkafka

constKafka=require('node-rdkafka');console.log(Kafka.librdkafkaVersion);// #=> 2.5.3

Sending Messages

AProducer sends messages to Kafka. TheProducer constructor takes a configuration object, as shown in the following example:

constproducer=newKafka.Producer({'metadata.broker.list':'kafka-host1:9092,kafka-host2:9092'});

AProducer requires onlymetadata.broker.list (the Kafka brokers) to be created. The values in this list are separated by commas. For other configuration options, see theConfiguration.md file described previously.

The following example illustrates a list with severallibrdkafka options set.

constproducer=newKafka.Producer({'client.id':'kafka','metadata.broker.list':'localhost:9092','compression.codec':'gzip','retry.backoff.ms':200,'message.send.max.retries':10,'socket.keepalive.enable':true,'queue.buffering.max.messages':100000,'queue.buffering.max.ms':1000,'batch.num.messages':1000000,'dr_cb':true});

Stream API

You can easily use theProducer as a writable stream immediately after creation (as shown in the following example):

// Our producer with its Kafka brokers// This call returns a new writable stream to our topic 'topic-name'conststream=Kafka.Producer.createWriteStream({'metadata.broker.list':'kafka-host1:9092,kafka-host2:9092'},{},{topic:'topic-name'});// Writes a message to the streamconstqueuedSuccess=stream.write(Buffer.from('Awesome message'));if(queuedSuccess){console.log('We queued our message!');}else{// Note that this only tells us if the stream's queue is full,// it does NOT tell us if the message got to Kafka!  See below...console.log('Too many messages in our queue already');}// NOTE: MAKE SURE TO LISTEN TO THIS IF YOU WANT THE STREAM TO BE DURABLE// Otherwise, any error will bubble up as an uncaught exception.stream.on('error',(err)=>{// Here's where we'll know if something went wrong sending to Kafkaconsole.error('Error in our kafka stream');console.error(err);})

If you do not want your code to crash when an error happens, ensure you have anerror listener on the stream. Most errors are not necessarily fatal, but the ones that are will immediately destroy the stream. If you useautoClose, the stream will close itself at the first sign of a problem.

Standard API

The Standard API is more performant, particularly when handling high volumes of messages.However, it requires more manual setup to use. The following example illustrates its use:

constproducer=newKafka.Producer({'metadata.broker.list':'localhost:9092','dr_cb':true});// Connect to the broker manuallyproducer.connect();// Wait for the ready event before proceedingproducer.on('ready',()=>{try{producer.produce(// Topic to send the message to'topic',// optionally we can manually specify a partition for the message// this defaults to -1 - which will use librdkafka's default partitioner (consistent random for keyed messages, random for unkeyed messages)null,// Message to send. Must be a bufferBuffer.from('Awesome message'),// for keyed messages, we also specify the key - note that this field is optional'Stormwind',// you can send a timestamp here. If your broker version supports it,// it will get added. Otherwise, we default to 0Date.now(),// you can send an opaque token here, which gets passed along// to your delivery reports);}catch(err){console.error('A problem occurred when sending our message');console.error(err);}});// Any errors we encounter, including connection errorsproducer.on('event.error',(err)=>{console.error('Error from producer');console.error(err);})// We must either call .poll() manually after sending messages// or set the producer to poll on an interval (.setPollInterval).// Without this, we do not get delivery events and the queue// will eventually fill up.producer.setPollInterval(100);

To see the configuration options available to you, see theConfiguration section.

Methods
MethodDescription
producer.connect()Connects to the broker.

Theconnect() method emits theready event when it connects successfully. If it does not, the error will be passed through the callback.
producer.disconnect()Disconnects from the broker.

Thedisconnect() method emits thedisconnected event when it has disconnected. If it does not, the error will be passed through the callback.
producer.poll()Polls the producer for delivery reports or other events to be transmitted via the emitter.

In order to get the events inlibrdkafka's queue to emit, you must call this regularly.
producer.setPollInterval(interval)Polls the producer on this interval, handling disconnections and reconnection. Set it to 0 to turn it off.
producer.produce(topic, partition, msg, key, timestamp, opaque)Sends a message.

Theproduce() method throws when produce would return an error. Ordinarily, this is just if the queue is full.
producer.flush(timeout, callback)Flush the librdkafka internal queue, sending all messages. Default timeout is 500ms
producer.initTransactions(timeout, callback)Initializes the transactional producer.
producer.beginTransaction(callback)Starts a new transaction.
producer.sendOffsetsToTransaction(offsets, consumer, timeout, callback)Sends consumed topic-partition-offsets to the broker, which will get committed along with the transaction.
producer.abortTransaction(timeout, callback)Aborts the ongoing transaction.
producer.commitTransaction(timeout, callback)Commits the ongoing transaction.
Events

Some configuration properties that end in_cb indicate that an event should be generated for that option. You can either:

  • provide a value oftrue and react to the event
  • provide a callback function directly

The following example illustrates an event:

constproducer=newKafka.Producer({'client.id':'my-client',// Specifies an identifier to use to help trace activity in Kafka'metadata.broker.list':'localhost:9092',// Connect to a Kafka instance on localhost'dr_cb':true// Specifies that we want a delivery-report event to be generated});// Poll for events every 100 msproducer.setPollInterval(100);producer.on('delivery-report',(err,report)=>{// Report of delivery statistics here://console.log(report);});

The following table describes types of events.

EventDescription
disconnectedThedisconnected event is emitted when the broker has disconnected.

This event is emitted only when.disconnect is called. The wrapper will always try to reconnect otherwise.
readyTheready event is emitted when theProducer is ready to send messages.
eventTheevent event is emitted whenlibrdkafka reports an event (if you opted in via theevent_cb option).
event.logTheevent.log event is emitted when logging events come in (if you opted into logging via theevent_cb option).

You will need to set a value fordebug if you want to send information.
event.statsTheevent.stats event is emitted whenlibrdkafka reports stats (if you opted in by setting thestatistics.interval.ms to a non-zero value).
event.errorTheevent.error event is emitted whenlibrdkafka reports an error
event.throttleTheevent.throttle event emitted whenlibrdkafka reports throttling.
delivery-reportThedelivery-report event is emitted when a delivery report has been found via polling.

To use this event, you must setrequest.required.acks to1 or-1 in topic configuration anddr_cb (ordr_msg_cb if you want the report to contain the message payload) totrue in theProducer constructor options.

Higher Level Producer

The higher level producer is a variant of the producer which can propagate callbacks to you upon message delivery.

constproducer=newKafka.HighLevelProducer({'metadata.broker.list':'localhost:9092',});

This will enrich the produce call so it will have a callback to tell you when the message has been delivered. You lose the ability to specify opaque tokens.

producer.produce(topicName,null,Buffer.from('alliance4ever'),null,Date.now(),(err,offset)=>{// The offset if our acknowledgement level allows us to receive delivery offsetsconsole.log(offset);});

Additionally you can add serializers to modify the value of a produce for a key or value before it is sent over to Kafka.

producer.setValueSerializer((value)=>{returnBuffer.from(JSON.stringify(value));});

Otherwise the behavior of the class should be exactly the same.

Kafka.KafkaConsumer

To read messages from Kafka, you use aKafkaConsumer. You instantiate aKafkaConsumer object as follows:

constconsumer=newKafka.KafkaConsumer({'group.id':'kafka','metadata.broker.list':'localhost:9092',},{});

The first parameter is the global config, while the second parameter is the topic config that gets applied to all subscribed topics. To view a list of all supported configuration properties, see theConfiguration.md file described previously. Look for theC and* keys.

Thegroup.id andmetadata.broker.list properties are required for a consumer.

Rebalancing

Rebalancing is managed internally bylibrdkafka by default. If you would like to override this functionality, you may provide your own logic as a rebalance callback.

constconsumer=newKafka.KafkaConsumer({'group.id':'kafka','metadata.broker.list':'localhost:9092','rebalance_cb':(err,assignment)=>{if(err.code===Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS){// Note: this can throw when you are disconnected. Take care and wrap it in// a try catch if that matters to youthis.assign(assignment);}elseif(err.code==Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS){// Same as abovethis.unassign();}else{// We had a real errorconsole.error(err);}}})

this is bound to theKafkaConsumer you have created. By specifying arebalance_cb you can also listen to therebalance event as an emitted event. This event is not emitted when using the internallibrdkafka rebalancer.

Commits

When you commit innode-rdkafka, the standard way is to queue the commit request up with the nextlibrdkafka request to the broker. When doing this, there isn't a way to know the result of the commit. Luckily there is another callback you can listen to to get this information

constconsumer=newKafka.KafkaConsumer({'group.id':'kafka','metadata.broker.list':'localhost:9092','offset_commit_cb':(err,topicPartitions)=>{if(err){// There was an error committingconsole.error(err);}else{// Commit went through. Let's log the topic partitionsconsole.log(topicPartitions);}}})

this is bound to theKafkaConsumer you have created. By specifying anoffset_commit_cb you can also listen to theoffset.commit event as an emitted event. It receives an error and the list of topic partitions as argument. This is not emitted unless opted in.

Message Structure

Messages that are returned by theKafkaConsumer have the following structure.

{value:Buffer.from('hi'),// message contents as a Buffersize:2,// size of the message, in bytestopic:'librdtesting-01',// topic the message comes fromoffset:1337,// offset the message was read frompartition:1,// partition the message was onkey:'someKey',// key of the message if presenttimestamp:1510325354780// timestamp of message creation}

Stream API

The stream API is the easiest way to consume messages. The following example illustrates the use of the stream API:

// Read from the librdtesting-01 topic... note that this creates a new stream on each call!conststream=KafkaConsumer.createReadStream(globalConfig,topicConfig,{topics:['librdtesting-01']});stream.on('data',(message)=>{console.log('Got message');console.log(message.value.toString());});

You can also get theconsumer from the streamConsumer, for using consumer methods. The following example illustrates that:

stream.consumer.commit();// Commits all locally stored offsets

Standard API

You can also use the Standard API and manage callbacks and events yourself. You can choose different modes for consuming messages:

  • Flowing mode. This mode flows all of the messages it can read by maintaining an infinite loop in the event loop. It only stops when it detects the consumer has issued theunsubscribe ordisconnect method.
  • Non-flowing mode. This mode reads a single message from Kafka at a time manually.

The following example illustrates flowing mode:

// Flowing modeconsumer.connect();consumer.on('ready',()=>{consumer.subscribe(['librdtesting-01']);// Consume from the librdtesting-01 topic. This is what determines// the mode we are running in. By not specifying a callback (or specifying// only a callback) we get messages as soon as they are available.consumer.consume();}).on('data',(data)=>{// Output the actual message contentsconsole.log(data.value.toString());});

The following example illustrates non-flowing mode:

// Non-flowing modeconsumer.connect();consumer.on('ready',()=>{// Subscribe to the librdtesting-01 topic// This makes subsequent consumes read from that topic.consumer.subscribe(['librdtesting-01']);// Read one message every 1000 millisecondssetInterval(()=>{consumer.consume(1);},1000);}).on('data',(data)=>{console.log('Message found!  Contents below.');console.log(data.value.toString());});

The following table lists important methods for this API.

MethodDescription
consumer.connect()Connects to the broker.

Theconnect() emits the eventready when it has successfully connected. If it does not, the error will be passed through the callback.
consumer.disconnect()Disconnects from the broker.

Thedisconnect() method emitsdisconnected when it has disconnected. If it does not, the error will be passed through the callback.
consumer.subscribe(topics)Subscribes to an array of topics.
consumer.unsubscribe()Unsubscribes from the currently subscribed topics.

You cannot subscribe to different topics without calling theunsubscribe() method first.
consumer.consume(cb)Gets messages from the existing subscription as quickly as possible. Ifcb is specified, invokescb(err, message).

This method keeps a background thread running to do the work. Note that the number of threads in nodejs process is limited byUV_THREADPOOL_SIZE (default value is 4) and using up all of them blocks other parts of the application that need threads. If you need multiple consumers then consider increasingUV_THREADPOOL_SIZE or usingconsumer.consume(number, cb) instead.
consumer.consume(number, cb)Getsnumber of messages from the existing subscription. Ifcb is specified, invokescb(err, message).
consumer.commit()Commits all locally stored offsets
consumer.commit(topicPartition)Commits offsets specified by the topic partition
consumer.commitMessage(message)Commits the offsets specified by the message

The following table lists events for this API.

EventDescription
dataWhen using the Standard API consumed messages are emitted in this event.
partition.eofWhen using Standard API and the configuration optionenable.partition.eof is set,partition.eof events are emitted in this event. The event containstopic,partition andoffset properties.
warningThe event is emitted in case ofUNKNOWN_TOPIC_OR_PART orTOPIC_AUTHORIZATION_FAILED errors when consuming inFlowing mode. Since the consumer will continue working if the error is still happening, the warning event should reappear after the next metadata refresh. To control the metadata refresh rate settopic.metadata.refresh.interval.ms property. Once you resolve the error, you can manually callgetMetadata to speed up consumer recovery.
rebalanceTherebalance event is emitted when the consumer group is rebalanced.

This event is only emitted if therebalance_cb configuration is set to a function or set totrue
disconnectedThedisconnected event is emitted when the broker disconnects.

This event is only emitted when.disconnect is called. The wrapper will always try to reconnect otherwise.
readyTheready event is emitted when theConsumer is ready to read messages.
eventTheevent event is emitted whenlibrdkafka reports an event (if you opted in via theevent_cb option).
event.logTheevent.log event is emitted when logging events occur (if you opted in for logging via theevent_cb option).

You will need to set a value fordebug if you want information to send.
event.statsTheevent.stats event is emitted whenlibrdkafka reports stats (if you opted in by setting thestatistics.interval.ms to a non-zero value).
event.errorTheevent.error event is emitted whenlibrdkafka reports an error
event.throttleTheevent.throttle event is emitted whenlibrdkafka reports throttling.

Reading current offsets from the broker for a topic

Some times you find yourself in the situation where you need to know the latest (and earliest) offset for one of your topics. Connected producers and consumers both allow you to query for these throughqueryWaterMarkOffsets like follows:

consttimeout=5000,partition=0;consumer.queryWatermarkOffsets('my-topic',partition,timeout,(err,offsets)=>{consthigh=offsets.highOffset;constlow=offsets.lowOffset;});producer.queryWatermarkOffsets('my-topic',partition,timeout,(err,offsets)=>{consthigh=offsets.highOffset;constlow=offsets.lowOffset;});Anerrorwillbereturnediftheclientwasnotconnectedortherequesttimedoutwithinthespecifiedinterval.

Metadata

BothKafka.Producer andKafka.KafkaConsumer include agetMetadata method to retrieve metadata from Kafka.

Getting metadata on any connection returns the following data structure:

{orig_broker_id:1,orig_broker_name:"broker_name",brokers:[{id:1,host:'localhost',port:40}],topics:[{name:'awesome-topic',partitions:[{id:1,leader:20,replicas:[1,2],isrs:[1,2]}]}]}

The following example illustrates how to use thegetMetadata method.

When fetching metadata for a specific topic, if a topic reference does not exist, one is created using the default config.Please see the documentation onClient.getMetadata if you want to set configuration parameters, e.g.acks, on a topic to produce messages to.

constopts={topic:'librdtesting-01',timeout:10000};producer.getMetadata(opts,(err,metadata)=>{if(err){console.error('Error getting metadata');console.error(err);}else{console.log('Got metadata');console.log(metadata);}});

Admin Client

node-rdkafka now supports the admin client for creating, deleting, and scaling out topics. Thelibrdkafka APIs also support altering configuration of topics and broker, but that is not currently implemented.

To create an Admin client, you can do as follows:

constKafka=require('node-rdkafka');constclient=Kafka.AdminClient.create({'client.id':'kafka-admin','metadata.broker.list':'broker01'});

This will instantiate theAdminClient, which will allow the calling of the admin methods.

client.createTopic({topic:topicName,num_partitions:1,replication_factor:1},(err)=>{// Done!});

All of the admin api methods can have an optional timeout as their penultimate parameter.

The following table lists important methods for this API.

MethodDescription
client.disconnect()Destroy the admin client, making it invalid for further use.
client.createTopic(topic, timeout, cb)Create a topic on the broker with the given configuration. See JS doc for more on structure of the topic object
client.deleteTopic(topicName, timeout, cb)Delete a topic of the given name
client.createPartitions(topicName, desiredPartitions, timeout, cb)Create partitions until the topic has the desired number of partitions.

Check the tests for an example of how to use this API!

About

Node.js bindings for librdkafka

Resources

License

Contributing

Stars

Watchers

Forks

Packages

No packages published

Languages

  • JavaScript51.4%
  • C++46.3%
  • Other2.3%

[8]ページ先頭

©2009-2025 Movatter.jp