- Notifications
You must be signed in to change notification settings - Fork78
A gem providing cross-platform Kafka producer and consumer support
License
buoyant-data/hermann
Folders and files
Name | Name | Last commit message | Last commit date | |
---|---|---|---|---|
Repository files navigation
A Ruby gem implementing a Kafka Publisher and Consumer
On MRI (C-based Ruby), this library wraps thelibrdkafkalibrary which is implemented in C.
On JRuby this librarydeclares jardependenciesinside the.gemspec
to express dependencies on the Java-based Kafka libraryprovided by the Kafka project. Tools likejbundler will handle thesedeclarations correctly.
Usage is modelled on thekafka-rb gem and is fairlystraightforward.
- Kafka 0.8 is supported.
- Ruby 1.9.3, 2.1.1 and JRuby are tested against
Discover Kafka brokers through zookeeper. Looks at/brokers/ids
in Zookeeper to find the list of brokers.
require'hermann/producer'require'hermann/discovery/zookeeper'broker_ids_array=Hermann::Discovery::Zookeeper.new('localhost:2181').get_brokersproducer=Hermann::Producer.new('topic',broker_ids_array)promise=producer.push('hello world')# send message to kafkapromise.value# forces the Concurrent::Promise to finish excuting (#value!)promise.state# the state of the promise
require'hermann/producer'broker_ids_array=Hermann::Discovery::Zookeeper.new('localhost:2181').get_brokersp=Hermann::Producer.new('topic',broker_ids_array)# arguments topic, list of brokersf=p.push('hello world from mri')f.statep.tick_reactorf.state
Messages can be consumed by calling the consume method and passing a block to handle the yielded messages. The consume method blocks, so take care to handle that functionality appropriately (i.e. use Concurrent::Promise, Thread, etc).
require'hermann'require'hermann/consumer'require'hermann_jars'topic='topic'new_topic='other_topic'the_consumer=Hermann::Consumer.new(topic,zookeepers:"localhost:2181",group_id:"group1")the_consumer.consume(new_topic)do |msg|# can change topic with optional argument to .consumeputs"Recv:#{msg}"end
MRI currently has no zookeeper / client group support.
require'hermann'require'hermann/consumer'topic='topic'new_topic='other_topic'the_consumer=Hermann::Consumer.new(topic,brokers:"localhost:9092",partition:1)the_consumer.consume(new_topic)do |msg,key,offset|# can change topic with optional argument to .consumeputs"Recv:#{msg}, key:#{key}, offset:#{offset}"end
Topic and cluster metadata may be retrieved in the MRI version by querying the Kafka brokers.
require'hermann'require'hermann/discovery/metadata'c=Hermann::Discovery::Metadata.new("localhost:9092")topic=c.topic("topic")putstopic.partitions.firstconsumers=topic.partitions.mapdo |partition|partition.consumerend
First time (from a clean repository):bundle install && bundle exec rake
Thereafter:bundle exec rake spec
To run the integration tests:
- startup your own instance of zookeeper/kafka
rspec spec/integration/producer_spec.rb
- Gemfile
- remove jruby-kafka
- add
gem "hermann"
bundle install
- Jarfile
- removed unecessary jars from your Jarfile (i.e. kafka, log4j)
- jar dependencies are automatically included with Hermann
jbundle install
- Test out one of the Producer/Consumer examples above
About
A gem providing cross-platform Kafka producer and consumer support