Confluent's Apache Kafka Golang client
confluentinc/confluent-kafka-go
confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform.
Features:
- High performance - confluent-kafka-go is a lightweight wrapper around librdkafka, a finely tuned C client.
- Reliability - There are a lot of details to get right when writing an Apache Kafka client. We get them right in one place (librdkafka) and leverage this work across all of our clients (also confluent-kafka-python and confluent-kafka-dotnet).
- Supported - Commercial support is offered by Confluent.
- Future proof - Confluent, founded by the original creator/co-creator of Kafka, is building a streaming platform with Apache Kafka at its core. It's high priority for us that client features keep pace with core Apache Kafka and components of the Confluent Platform.
The Golang bindings provide a high-level Producer and Consumer with support for the balanced consumer groups of Apache Kafka 0.9 and above.
See the API documentation for more information.
For a step-by-step guide on using the client see Getting Started with Apache Kafka and Golang.
High-level balanced consumer
package main

import (
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}

	err = c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)
	if err != nil {
		panic(err)
	}

	// A signal handler or similar could be used to set this to false to break the loop.
	run := true

	for run {
		msg, err := c.ReadMessage(time.Second)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else if !err.(kafka.Error).IsTimeout() {
			// The client will automatically try to recover from all errors.
			// Timeout is not considered an error because it is raised by
			// ReadMessage in absence of messages.
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}

	c.Close()
}
Producer
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// Delivery report handler for produced messages
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// Produce messages to topic (asynchronously)
	topic := "myTopic"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}

	// Wait for message deliveries before shutting down
	p.Flush(15 * 1000)
}
More elaborate examples are available in the examples directory, including how to configure the Go client for use with Confluent Cloud.
Supports Go 1.17+ and librdkafka 2.12.0+.
You can use Go Modules to install confluent-kafka-go.
Import the kafka package from GitHub in your code:
import "github.com/confluentinc/confluent-kafka-go/v2/kafka"
Build your project:
go build ./...
If you are building for Alpine Linux (musl), -tags musl must be specified.
go build -tags musl ./...
A dependency on the latest stable version of confluent-kafka-go should be automatically added to your go.mod file.
Manual install:
go get -u github.com/confluentinc/confluent-kafka-go/v2/kafka
Golang import:
import "github.com/confluentinc/confluent-kafka-go/v2/kafka"
Prebuilt librdkafka binaries are included with the Go client and librdkafka does not need to be installed separately on the build or target system. The following platforms are supported by the prebuilt librdkafka binaries:
- Mac OSX x64 and arm64
- glibc-based Linux x64 and arm64 (e.g., RedHat, Debian, CentOS, Ubuntu, etc) - without GSSAPI/Kerberos support
- musl-based Linux amd64 and arm64 (Alpine) - without GSSAPI/Kerberos support
- Windows amd64 - without GSSAPI/Kerberos support
When building your application for Alpine Linux (musl libc) you must pass -tags musl to go get, go build, etc.
CGO_ENABLED must NOT be set to 0 since the Go client is based on the C library librdkafka.
If GSSAPI/Kerberos authentication support is required you will need to install librdkafka separately, see the Installing librdkafka chapter below, and then build your Go application with -tags dynamic.
If the bundled librdkafka build is not supported on your platform, or you need a librdkafka with GSSAPI/Kerberos support, you must install librdkafka manually on the build and target system using one of the following alternatives:
- For Debian and Ubuntu based distros, install librdkafka-dev from the standard repositories or using Confluent's Deb repository.
- For Redhat based distros, install librdkafka-devel using Confluent's YUM repository.
- For MacOS X, install librdkafka from Homebrew. You may also need to brew install pkg-config if you don't already have it: brew install librdkafka pkg-config.
- For Alpine: apk add librdkafka-dev pkgconf
- For Windows: there are no official/supported packages, but static builds are included for Windows/x64. Installing from source is needed only for GSSAPI/Kerberos support.
- For source builds, see instructions below.
Build from source:
git clone https://github.com/confluentinc/librdkafka.git
cd librdkafka
./configure
make
sudo make install

After installing librdkafka you will need to build your Go application with -tags dynamic.
Note: If you use the master branch of the Go client, then you need to use the master branch of librdkafka.
confluent-kafka-go requires librdkafka v1.9.0 or later.
Since we are using cgo, Go builds a dynamically linked library even when using the prebuilt, statically-compiled librdkafka as described in the librdkafka chapter.
For glibc based systems, if the system where the client is being compiled is different from the target system, especially when the target system is older, a glibc version error can occur when trying to run the compiled client.
Unfortunately, building a statically linked binary doesn't solve the problem, since there is no way to have truly static builds using glibc. This is because there are some functions in glibc, like getaddrinfo, which need the shared version of the library even when the code is compiled statically.
One way around this is to either use a container/VM to build the binary, or install an older version of glibc on the system where the client is being compiled.
The other way is using musl to create truly static builds for Linux. To do this, install it for your system.
Static compilation command, meant to be used alongside the prebuilt librdkafka bundle:
CC=/path/to/musl-gcc go build --ldflags '-linkmode external -extldflags "-static"' -tags musl

This client supports FIPS 140-3 compliance for Schema Registry operations when using Go 1.24.3 or newer.
The Schema Registry Go client can operate in FIPS 140-3 compliant mode using Go's native FIPS support:
Build with FIPS support:
GOFIPS140=inprocess go build -o myapp
Run in FIPS mode:
GODEBUG=fips140=only ./myapp
When running with GODEBUG=fips140=only, the application will use only FIPS 140-3 validated cryptographic implementations for all TLS connections to Schema Registry. The application will panic immediately if any non-FIPS-approved cryptographic operation is attempted.
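Because the run-time mode is selected through GODEBUG, a deployment can quietly start without it. The following is a minimal sketch of a startup check, using only the standard library, that warns when the environment does not request fips140=only. The helper name fipsSetting is hypothetical and not part of this client. The check only looks at the environment variable, so it will miss GODEBUG settings supplied in other ways (for example through go.mod or //go:debug directives); on Go 1.24 and newer, the standard crypto/fips140 package offers an Enabled() function that may be a better fit.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// fipsSetting is a hypothetical helper (not part of confluent-kafka-go).
// It extracts the fips140 value from a GODEBUG-style string made of
// comma-separated key=value pairs and returns "" when the key is absent.
// If the key appears more than once, this sketch reports the last occurrence.
func fipsSetting(godebug string) string {
	value := ""
	for _, kv := range strings.Split(godebug, ",") {
		k, v, ok := strings.Cut(strings.TrimSpace(kv), "=")
		if ok && k == "fips140" {
			value = v
		}
	}
	return value
}

func main() {
	// Warn early instead of discovering the missing setting in production.
	if got := fipsSetting(os.Getenv("GODEBUG")); got != "only" {
		fmt.Fprintf(os.Stderr, "warning: GODEBUG fips140=%q, expected \"only\"\n", got)
	}
}
```

Whether to warn or to exit at this point is a deployment decision; the sketch only reports.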
The recommended API strand is the Function-Based one; the Channel-Based one is documented in examples/legacy.
Messages, errors and events are polled through the consumer.Poll() function.
It maps directly to the underlying librdkafka functionality.
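A poll loop usually comes down to a type switch over whatever the poll call returned. The sketch below shows that dispatch pattern with the standard library only, so it runs without a broker. The types event, message and pollError are hypothetical stand-ins, not the client's own types; in the real client the corresponding values returned by Poll() are *kafka.Message and kafka.Error, and a nil event means nothing arrived before the timeout.

```go
package main

import (
	"errors"
	"fmt"
)

// event is a hypothetical stand-in for the client's event interface.
type event interface{}

// message is a hypothetical stand-in for a consumed message.
type message struct {
	Topic string
	Value []byte
}

// pollError is a hypothetical stand-in for an error event.
type pollError struct {
	err   error
	fatal bool
}

// handle classifies one polled event and reports whether the loop should continue.
func handle(ev event) (string, bool) {
	switch e := ev.(type) {
	case nil:
		// Nothing arrived within the poll timeout.
		return "idle", true
	case *message:
		return fmt.Sprintf("message on %s: %s", e.Topic, e.Value), true
	case pollError:
		// Stop only on errors flagged as fatal in this sketch.
		return fmt.Sprintf("error: %v", e.err), !e.fatal
	default:
		return fmt.Sprintf("ignored %T", e), true
	}
}

func main() {
	// A canned sequence standing in for successive poll results.
	events := []event{
		nil,
		&message{Topic: "myTopic", Value: []byte("hello")},
		pollError{err: errors.New("broker down"), fatal: true},
		&message{Topic: "myTopic", Value: []byte("never reached")},
	}
	for _, ev := range events {
		desc, keepGoing := handle(ev)
		fmt.Println(desc)
		if !keepGoing {
			break
		}
	}
}
```

Keeping the classification in a small function like handle makes the loop body easy to unit test without a running cluster.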
The application calls producer.Produce() to produce messages. Delivery reports are emitted on the producer.Events() channel or on a specified private channel.
Warnings
Produce() is a non-blocking call; if the internal librdkafka queue is full, the call will fail and can be retried.
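One way to handle the queue-full case is a bounded retry with backoff, which gives the client time to drain its queue. The following is a minimal sketch using only the standard library: produceWithRetry and errQueueFull are hypothetical names, and the produce argument stands in for a closure around producer.Produce(). In the real client the queue-full condition would be detected by inspecting the returned error (the kafka package defines the error code ErrQueueFull) rather than by comparing against a sentinel.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errQueueFull is a hypothetical stand-in for the client's queue-full error.
var errQueueFull = errors.New("local queue full")

// produceWithRetry calls produce until it succeeds, fails with an error other
// than queue-full, or the attempts run out. attempts must be at least 1.
// The backoff doubles after every queue-full failure.
func produceWithRetry(produce func() error, attempts int, backoff time.Duration) error {
	var err error
	for i := 0; i < attempts; i++ {
		err = produce()
		if !errors.Is(err, errQueueFull) {
			return err // nil on success, or an error that retrying will not fix
		}
		time.Sleep(backoff)
		backoff *= 2
	}
	return fmt.Errorf("gave up after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	// Fake producer: the queue is full for the first two calls.
	produce := func() error {
		calls++
		if calls <= 2 {
			return errQueueFull
		}
		return nil
	}
	err := produceWithRetry(produce, 5, time.Millisecond)
	fmt.Println(calls, err)
}
```

The retry is bounded on purpose: an unbounded loop would turn a non-blocking call into one that can block forever when the broker is unreachable.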
KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use by confluent-kafka-go. confluent-kafka-go has no affiliation with and is not endorsed by The Apache Software Foundation.
See kafka/README
Contributions to the code, examples, documentation, etc., are very much appreciated.
Make your changes, run gofmt, tests, etc., push your branch, create a PR, and sign the CLA.
For a step-by-step guide on using the Golang client with Confluent Cloud see Getting Started with Apache Kafka and Golang on Confluent Developer.