kafka

Version: v0.4.49
Published: Aug 21, 2025 | License: MIT | Imports: 71 | Imported by: 3,830

Repository

github.com/segmentio/kafka-go

README

kafka-go

Motivations

We rely on both Go and Kafka a lot at Segment. Unfortunately, the state of the Go client libraries for Kafka at the time of this writing was not ideal. The available options were:

  • sarama, which is by far the most popular but is quite difficult to work with. It is poorly documented, the API exposes low level concepts of the Kafka protocol, and it doesn't support recent Go features like contexts. It also passes all values as pointers which causes large numbers of dynamic memory allocations, more frequent garbage collections, and higher memory usage.

  • confluent-kafka-go is a cgo based wrapper around librdkafka, which means it introduces a dependency to a C library on all Go code that uses the package. It has much better documentation than sarama but still lacks support for Go contexts.

  • goka is a more recent Kafka client for Go which focuses on a specific usage pattern. It provides abstractions for using Kafka as a message passing bus between services rather than an ordered log of events, but this is not the typical use case of Kafka for us at Segment. The package also depends on sarama for all interactions with Kafka.

This is where kafka-go comes into play. It provides both low and high level APIs for interacting with Kafka, mirroring concepts and implementing interfaces of the Go standard library to make it easy to use and integrate with existing software.

Note:

In order to better align with our newly adopted Code of Conduct, the kafka-go project has renamed our default branch to main. For the full details of our Code Of Conduct see this document.

Kafka versions

kafka-go is currently tested with Kafka versions 0.10.1.0 to 2.7.1. While it should also be compatible with later versions, newer features available in the Kafka API may not yet be implemented in the client.

Go versions

kafka-go requires Go version 1.15 or later.

Connection

The Conn type is the core of the kafka-go package. It wraps around a raw network connection to expose a low-level API to a Kafka server.

Here are some examples showing typical use of a connection object:

    // to produce messages
    topic := "my-topic"
    partition := 0

    conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
    if err != nil {
        log.Fatal("failed to dial leader:", err)
    }

    conn.SetWriteDeadline(time.Now().Add(10*time.Second))
    _, err = conn.WriteMessages(
        kafka.Message{Value: []byte("one!")},
        kafka.Message{Value: []byte("two!")},
        kafka.Message{Value: []byte("three!")},
    )
    if err != nil {
        log.Fatal("failed to write messages:", err)
    }

    if err := conn.Close(); err != nil {
        log.Fatal("failed to close writer:", err)
    }

    // to consume messages
    topic := "my-topic"
    partition := 0

    conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
    if err != nil {
        log.Fatal("failed to dial leader:", err)
    }

    conn.SetReadDeadline(time.Now().Add(10*time.Second))
    batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

    b := make([]byte, 10e3) // 10KB max per message
    for {
        n, err := batch.Read(b)
        if err != nil {
            break
        }
        fmt.Println(string(b[:n]))
    }

    if err := batch.Close(); err != nil {
        log.Fatal("failed to close batch:", err)
    }

    if err := conn.Close(); err != nil {
        log.Fatal("failed to close connection:", err)
    }

To Create Topics

By default kafka has the auto.create.topics.enable='true' (KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE='true' in the bitnami/kafka kafka docker image). If this value is set to 'true' then topics will be created as a side effect of kafka.DialLeader like so:

    // to create topics when auto.create.topics.enable='true'
    conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0)
    if err != nil {
        panic(err.Error())
    }

If auto.create.topics.enable='false' then you will need to create topics explicitly like so:

    // to create topics when auto.create.topics.enable='false'
    topic := "my-topic"

    conn, err := kafka.Dial("tcp", "localhost:9092")
    if err != nil {
        panic(err.Error())
    }
    defer conn.Close()

    controller, err := conn.Controller()
    if err != nil {
        panic(err.Error())
    }
    var controllerConn *kafka.Conn
    controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
    if err != nil {
        panic(err.Error())
    }
    defer controllerConn.Close()

    topicConfigs := []kafka.TopicConfig{
        {
            Topic:             topic,
            NumPartitions:     1,
            ReplicationFactor: 1,
        },
    }

    err = controllerConn.CreateTopics(topicConfigs...)
    if err != nil {
        panic(err.Error())
    }

To Connect To Leader Via a Non-leader Connection

    // to connect to the kafka leader via an existing non-leader connection rather than using DialLeader
    conn, err := kafka.Dial("tcp", "localhost:9092")
    if err != nil {
        panic(err.Error())
    }
    defer conn.Close()

    controller, err := conn.Controller()
    if err != nil {
        panic(err.Error())
    }
    var connLeader *kafka.Conn
    connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
    if err != nil {
        panic(err.Error())
    }
    defer connLeader.Close()

To list topics

    conn, err := kafka.Dial("tcp", "localhost:9092")
    if err != nil {
        panic(err.Error())
    }
    defer conn.Close()

    partitions, err := conn.ReadPartitions()
    if err != nil {
        panic(err.Error())
    }

    m := map[string]struct{}{}

    for _, p := range partitions {
        m[p.Topic] = struct{}{}
    }
    for k := range m {
        fmt.Println(k)
    }

Because it is low level, the Conn type turns out to be a great building block for higher level abstractions, like the Reader for example.

Reader

A Reader is another concept exposed by the kafka-go package, which intends to make it simpler to implement the typical use case of consuming from a single topic-partition pair. A Reader also automatically handles reconnections and offset management, and exposes an API that supports asynchronous cancellations and timeouts using Go contexts.

Note that it is important to call Close() on a Reader when a process exits. The kafka server needs a graceful disconnect to stop it from continuing to attempt to send messages to the connected clients. The given example will not call Close() if the process is terminated with SIGINT (ctrl-c at the shell) or SIGTERM (as docker stop or a kubernetes restart does). This can result in a delay when a new reader on the same topic connects (e.g. new process started or new container running). Use a signal.Notify handler to close the reader on process shutdown.

    // make a new reader that consumes from topic-A, partition 0, at offset 42
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{"localhost:9092", "localhost:9093", "localhost:9094"},
        Topic:     "topic-A",
        Partition: 0,
        MaxBytes:  10e6, // 10MB
    })
    r.SetOffset(42)

    for {
        m, err := r.ReadMessage(context.Background())
        if err != nil {
            break
        }
        fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
    }

    if err := r.Close(); err != nil {
        log.Fatal("failed to close reader:", err)
    }

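
The shutdown advice above (close the reader when SIGINT or SIGTERM arrives) can be sketched with the standard library alone. This is only an illustration under stated assumptions: fakeReader is a hypothetical stand-in for a *kafka.Reader (anything with a Close() error method fits), and closeOnSignal and simulateShutdown are names invented for this example, not part of kafka-go.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"os/signal"
	"syscall"
)

// fakeReader is a hypothetical stand-in for *kafka.Reader; only Close matters here.
type fakeReader struct{ closed bool }

func (r *fakeReader) Close() error {
	r.closed = true
	return nil
}

// closeOnSignal blocks until a signal arrives on sigs, then closes c.
func closeOnSignal(c io.Closer, sigs <-chan os.Signal) error {
	<-sigs
	return c.Close()
}

// simulateShutdown wires the pieces together and injects a SIGTERM by hand
// so that the sketch terminates on its own. It reports whether Close ran.
func simulateShutdown() bool {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigs)

	r := &fakeReader{}
	sigs <- syscall.SIGTERM // stands in for ctrl-c or a container stop
	if err := closeOnSignal(r, sigs); err != nil {
		return false
	}
	return r.closed
}

func main() {
	fmt.Println("reader closed:", simulateShutdown())
}
```

In a real program the read loop would typically run in its own goroutine, or use a cancellable context, while the main goroutine waits for the signal and then calls Close on the reader.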
Consumer Groups

kafka-go also supports Kafka consumer groups including broker managed offsets. To enable consumer groups, simply specify the GroupID in the ReaderConfig.

ReadMessage automatically commits offsets when using consumer groups.

    // make a new reader that consumes from topic-A
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  []string{"localhost:9092", "localhost:9093", "localhost:9094"},
        GroupID:  "consumer-group-id",
        Topic:    "topic-A",
        MaxBytes: 10e6, // 10MB
    })

    for {
        m, err := r.ReadMessage(context.Background())
        if err != nil {
            break
        }
        fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
    }

    if err := r.Close(); err != nil {
        log.Fatal("failed to close reader:", err)
    }

There are a number of limitations when using consumer groups:

  • (*Reader).SetOffset will return an error when GroupID is set
  • (*Reader).Offset will always return -1 when GroupID is set
  • (*Reader).Lag will always return -1 when GroupID is set
  • (*Reader).ReadLag will return an error when GroupID is set
  • (*Reader).Stats will return a partition of -1 when GroupID is set

Explicit Commits

kafka-go also supports explicit commits. Instead of calling ReadMessage, call FetchMessage followed by CommitMessages.

    ctx := context.Background()
    for {
        m, err := r.FetchMessage(ctx)
        if err != nil {
            break
        }
        fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
        if err := r.CommitMessages(ctx, m); err != nil {
            log.Fatal("failed to commit messages:", err)
        }
    }

When committing messages in consumer groups, the message with the highest offset for a given topic/partition determines the value of the committed offset for that partition. For example, if messages at offset 1, 2, and 3 of a single partition were retrieved by call to FetchMessage, calling CommitMessages with message offset 3 will also result in committing the messages at offsets 1 and 2 for that partition.
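
The "highest offset wins" rule described above can be illustrated with a small standalone sketch. The message and topicPartition types and the highestOffsets function are hypothetical names made up for this example (message only loosely mirrors the fields of kafka.Message), and the code makes no claim about how kafka-go tracks commits internally.

```go
package main

import "fmt"

// message carries only the fields this sketch needs (hypothetical type).
type message struct {
	Topic     string
	Partition int
	Offset    int64
}

// topicPartition identifies one partition of one topic.
type topicPartition struct {
	Topic     string
	Partition int
}

// highestOffsets returns, for each topic/partition seen in msgs, the highest
// message offset. Committing that one message covers every lower offset of
// the same partition.
func highestOffsets(msgs []message) map[topicPartition]int64 {
	out := make(map[topicPartition]int64)
	for _, m := range msgs {
		tp := topicPartition{m.Topic, m.Partition}
		if cur, ok := out[tp]; !ok || m.Offset > cur {
			out[tp] = m.Offset
		}
	}
	return out
}

func main() {
	msgs := []message{
		{"topic-A", 0, 1},
		{"topic-A", 0, 3},
		{"topic-A", 0, 2},
		{"topic-A", 1, 7},
	}
	for tp, off := range highestOffsets(msgs) {
		fmt.Printf("%s/%d: highest offset %d\n", tp.Topic, tp.Partition, off)
	}
}
```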

Managing Commits

By default, CommitMessages will synchronously commit offsets to Kafka. For improved performance, you can instead periodically commit offsets to Kafka by setting CommitInterval on the ReaderConfig.

    // make a new reader that consumes from topic-A
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:        []string{"localhost:9092", "localhost:9093", "localhost:9094"},
        GroupID:        "consumer-group-id",
        Topic:          "topic-A",
        MaxBytes:       10e6, // 10MB
        CommitInterval: time.Second, // flushes commits to Kafka every second
    })

Writer

To produce messages to Kafka, a program may use the low-level Conn API, but the package also provides a higher level Writer type which is more appropriate to use in most cases as it provides additional features:

  • Automatic retries and reconnections on errors.
  • Configurable distribution of messages across available partitions.
  • Synchronous or asynchronous writes of messages to Kafka.
  • Asynchronous cancellation using contexts.
  • Flushing of pending messages on close to support graceful shutdowns.
  • Creation of a missing topic before publishing a message. Note! it was the default behaviour up to the version v0.4.30.

    // make a writer that produces to topic-A, using the least-bytes distribution
    w := &kafka.Writer{
        Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
        Topic:    "topic-A",
        Balancer: &kafka.LeastBytes{},
    }

    err := w.WriteMessages(context.Background(),
        kafka.Message{
            Key:   []byte("Key-A"),
            Value: []byte("Hello World!"),
        },
        kafka.Message{
            Key:   []byte("Key-B"),
            Value: []byte("One!"),
        },
        kafka.Message{
            Key:   []byte("Key-C"),
            Value: []byte("Two!"),
        },
    )
    if err != nil {
        log.Fatal("failed to write messages:", err)
    }

    if err := w.Close(); err != nil {
        log.Fatal("failed to close writer:", err)
    }

Missing topic creation before publication

    // Make a writer that publishes messages to topic-A.
    // The topic will be created if it is missing.
    w := &kafka.Writer{
        Addr:                   kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
        Topic:                  "topic-A",
        AllowAutoTopicCreation: true,
    }

    messages := []kafka.Message{
        {
            Key:   []byte("Key-A"),
            Value: []byte("Hello World!"),
        },
        {
            Key:   []byte("Key-B"),
            Value: []byte("One!"),
        },
        {
            Key:   []byte("Key-C"),
            Value: []byte("Two!"),
        },
    }

    var err error
    const retries = 3
    for i := 0; i < retries; i++ {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

        // attempt to create topic prior to publishing the message
        err = w.WriteMessages(ctx, messages...)
        cancel() // release the per-attempt context now instead of deferring inside the loop
        if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
            time.Sleep(time.Millisecond * 250)
            continue
        }

        if err != nil {
            log.Fatalf("unexpected error %v", err)
        }
        break
    }

    if err := w.Close(); err != nil {
        log.Fatal("failed to close writer:", err)
    }

Writing to multiple topics

Normally, the WriterConfig.Topic is used to initialize a single-topic writer. By excluding that particular configuration, you are given the ability to define the topic on a per-message basis by setting Message.Topic.

    w := &kafka.Writer{
        Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
        // NOTE: When Topic is not defined here, each Message must define it instead.
        Balancer: &kafka.LeastBytes{},
    }

    err := w.WriteMessages(context.Background(),
        // NOTE: Each Message has Topic defined, otherwise an error is returned.
        kafka.Message{
            Topic: "topic-A",
            Key:   []byte("Key-A"),
            Value: []byte("Hello World!"),
        },
        kafka.Message{
            Topic: "topic-B",
            Key:   []byte("Key-B"),
            Value: []byte("One!"),
        },
        kafka.Message{
            Topic: "topic-C",
            Key:   []byte("Key-C"),
            Value: []byte("Two!"),
        },
    )
    if err != nil {
        log.Fatal("failed to write messages:", err)
    }

    if err := w.Close(); err != nil {
        log.Fatal("failed to close writer:", err)
    }

NOTE: These 2 patterns are mutually exclusive. If you set Writer.Topic, you must not also explicitly define Message.Topic on the messages you are writing. The opposite applies when you do not define a topic for the writer. The Writer will return an error if it detects this ambiguity.
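
The mutual-exclusion rule in the note above can be expressed as a small validation function. This is a sketch of the rule as stated, not the Writer's actual implementation: resolveTopic and both error values are hypothetical names introduced for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical error values for this sketch; kafka-go reports its own errors.
var (
	errTopicConflict = errors.New("topic set on both the writer and the message")
	errTopicMissing  = errors.New("topic set on neither the writer nor the message")
)

// resolveTopic returns the topic a message would be written to, given the
// writer-level topic and the message-level topic. Exactly one must be set.
func resolveTopic(writerTopic, messageTopic string) (string, error) {
	switch {
	case writerTopic != "" && messageTopic != "":
		return "", errTopicConflict
	case writerTopic == "" && messageTopic == "":
		return "", errTopicMissing
	case writerTopic != "":
		return writerTopic, nil
	default:
		return messageTopic, nil
	}
}

func main() {
	for _, c := range [][2]string{{"topic-A", ""}, {"", "topic-B"}, {"topic-A", "topic-B"}, {"", ""}} {
		topic, err := resolveTopic(c[0], c[1])
		fmt.Printf("writer=%q message=%q -> topic=%q err=%v\n", c[0], c[1], topic, err)
	}
}
```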

Compatibility with other clients
Sarama

If you're switching from Sarama and need/want to use the same algorithm for message partitioning, you can either use the kafka.Hash balancer or the kafka.ReferenceHash balancer:

  • kafka.Hash = sarama.NewHashPartitioner
  • kafka.ReferenceHash = sarama.NewReferenceHashPartitioner

The kafka.Hash and kafka.ReferenceHash balancers would route messages to the same partitions that the two aforementioned Sarama partitioners would route them to.

    w := &kafka.Writer{
        Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
        Topic:    "topic-A",
        Balancer: &kafka.Hash{},
    }

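
To give a feel for what "the same algorithm for message partitioning" means, here is a minimal key-hash partitioner sketch. It assumes the scheme is FNV-1a over the key, converted to a signed 32-bit value, reduced modulo the partition count, with a negative result negated. That assumption and the name fnvPartition are illustrative only; consult the balancer sources before relying on exact partition numbers.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// fnvPartition maps a key to a partition index in [0, numPartitions).
// Assumed scheme: FNV-1a hash, cast to int32, modulo, then absolute value.
func fnvPartition(key []byte, numPartitions int) int {
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	p := int32(h.Sum32()) % int32(numPartitions)
	if p < 0 {
		p = -p
	}
	return int(p)
}

func main() {
	for _, key := range []string{"Key-A", "Key-B", "Key-C"} {
		fmt.Printf("%s -> partition %d of 6\n", key, fnvPartition([]byte(key), 6))
	}
}
```

The property that matters for compatibility is determinism: the same key and partition count always yield the same partition, so two clients using the same scheme agree on placement.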
librdkafka and confluent-kafka-go

Use the kafka.CRC32Balancer balancer to get the same behaviour as librdkafka's default consistent_random partition strategy.

    w := &kafka.Writer{
        Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
        Topic:    "topic-A",
        Balancer: kafka.CRC32Balancer{},
    }

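
As a rough sketch of CRC-based partitioning for keyed messages, the following assumes the strategy amounts to the IEEE CRC-32 of the key modulo the partition count. The function name crc32Partition is hypothetical, and the handling of empty keys (the "random" half of consistent_random) is deliberately left out.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// crc32Partition maps a non-empty key to a partition index using the IEEE
// CRC-32 checksum modulo the number of partitions (assumed scheme).
func crc32Partition(key []byte, numPartitions int) int {
	return int(crc32.ChecksumIEEE(key) % uint32(numPartitions))
}

func main() {
	for _, key := range []string{"Key-A", "Key-B", "Key-C"} {
		fmt.Printf("%s -> partition %d of 4\n", key, crc32Partition([]byte(key), 4))
	}
}
```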
Java

Use the kafka.Murmur2Balancer balancer to get the same behaviour as the canonical Java client's default partitioner. Note: the Java class allows you to directly specify the partition which is not permitted.

    w := &kafka.Writer{
        Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
        Topic:    "topic-A",
        Balancer: kafka.Murmur2Balancer{},
    }

Compression

Compression can be enabled on the Writer by setting the Compression field:

    w := &kafka.Writer{
        Addr:        kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
        Topic:       "topic-A",
        Compression: kafka.Snappy,
    }

The Reader will determine if the consumed messages are compressed by examining the message attributes. However, the package(s) for all expected codecs must be imported so that they get loaded correctly.

Note: in versions prior to 0.4 programs had to import compression packages to install codecs and support reading compressed messages from kafka. This is no longer the case and imports of the compression packages are now no-ops.

TLS Support

For a bare bones Conn type or in the Reader/Writer configs you can specify a dialer option for TLS support. If the TLS field is nil, it will not connect with TLS. Note: Connecting to a Kafka cluster with TLS enabled without configuring TLS on the Conn/Reader/Writer can manifest in opaque io.ErrUnexpectedEOF errors.

Connection

    dialer := &kafka.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
        TLS:       &tls.Config{...tls config...},
    }

    conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")

Reader

    dialer := &kafka.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
        TLS:       &tls.Config{...tls config...},
    }

    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
        GroupID: "consumer-group-id",
        Topic:   "topic-A",
        Dialer:  dialer,
    })

Writer

Direct Writer creation

    w := kafka.Writer{
        Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
        Topic:    "topic-A",
        Balancer: &kafka.Hash{},
        Transport: &kafka.Transport{
            TLS: &tls.Config{},
        },
    }

Using kafka.NewWriter

    dialer := &kafka.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
        TLS:       &tls.Config{...tls config...},
    }

    w := kafka.NewWriter(kafka.WriterConfig{
        Brokers:  []string{"localhost:9092", "localhost:9093", "localhost:9094"},
        Topic:    "topic-A",
        Balancer: &kafka.Hash{},
        Dialer:   dialer,
    })

Note that kafka.NewWriter and kafka.WriterConfig are deprecated and will be removed in a future release.

SASL Support

You can specify an option on the Dialer to use SASL authentication. The Dialer can be used directly to open a Conn or it can be passed to a Reader or Writer via their respective configs. If the SASLMechanism field is nil, it will not authenticate with SASL.

SASL Authentication Types

Plain

    mechanism := plain.Mechanism{
        Username: "username",
        Password: "password",
    }

SCRAM

    mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
    if err != nil {
        panic(err)
    }

Connection

    mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
    if err != nil {
        panic(err)
    }

    dialer := &kafka.Dialer{
        Timeout:       10 * time.Second,
        DualStack:     true,
        SASLMechanism: mechanism,
    }

    conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")

Reader

    mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
    if err != nil {
        panic(err)
    }

    dialer := &kafka.Dialer{
        Timeout:       10 * time.Second,
        DualStack:     true,
        SASLMechanism: mechanism,
    }

    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
        GroupID: "consumer-group-id",
        Topic:   "topic-A",
        Dialer:  dialer,
    })

Writer

    mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
    if err != nil {
        panic(err)
    }

    // Transports are responsible for managing connection pools and other resources,
    // it's generally best to create a few of these and share them across your
    // application.
    sharedTransport := &kafka.Transport{
        SASL: mechanism,
    }

    w := kafka.Writer{
        Addr:      kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
        Topic:     "topic-A",
        Balancer:  &kafka.Hash{},
        Transport: sharedTransport,
    }

Client

    mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
    if err != nil {
        panic(err)
    }

    // Transports are responsible for managing connection pools and other resources,
    // it's generally best to create a few of these and share them across your
    // application.
    sharedTransport := &kafka.Transport{
        SASL: mechanism,
    }

    client := &kafka.Client{
        Addr:      kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
        Timeout:   10 * time.Second,
        Transport: sharedTransport,
    }

Reading all messages within a time range

    startTime := time.Now().Add(-time.Hour)
    endTime := time.Now()
    batchSize := int(10e6) // 10MB

    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{"localhost:9092", "localhost:9093", "localhost:9094"},
        Topic:     "my-topic1",
        Partition: 0,
        MaxBytes:  batchSize,
    })

    r.SetOffsetAt(context.Background(), startTime)

    for {
        m, err := r.ReadMessage(context.Background())
        if err != nil {
            break
        }
        if m.Time.After(endTime) {
            break
        }
        // TODO: process message
        fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
    }

    if err := r.Close(); err != nil {
        log.Fatal("failed to close reader:", err)
    }

Logging

For visibility into the operations of the Reader/Writer types, configure a logger on creation.

Reader

    func logf(msg string, a ...interface{}) {
        fmt.Printf(msg, a...)
        fmt.Println()
    }

    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:     []string{"localhost:9092", "localhost:9093", "localhost:9094"},
        Topic:       "my-topic1",
        Partition:   0,
        Logger:      kafka.LoggerFunc(logf),
        ErrorLogger: kafka.LoggerFunc(logf),
    })

Writer

    func logf(msg string, a ...interface{}) {
        fmt.Printf(msg, a...)
        fmt.Println()
    }

    w := &kafka.Writer{
        Addr:        kafka.TCP("localhost:9092"),
        Topic:       "topic",
        Logger:      kafka.LoggerFunc(logf),
        ErrorLogger: kafka.LoggerFunc(logf),
    }
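
The kafka.LoggerFunc(logf) conversions above follow the familiar function-adapter pattern, where a plain function is turned into a value that satisfies a logging interface. The sketch below reproduces that pattern with local types only: logger, loggerFunc and captureLogs are hypothetical names for this example and are assumed, not guaranteed, to resemble the package's own definitions.

```go
package main

import (
	"fmt"
	"strings"
)

// logger is a Printf-style logging interface (local to this sketch).
type logger interface {
	Printf(format string, args ...interface{})
}

// loggerFunc adapts a plain function to the logger interface.
type loggerFunc func(format string, args ...interface{})

// Printf calls the underlying function, which makes loggerFunc a logger.
func (f loggerFunc) Printf(format string, args ...interface{}) { f(format, args...) }

// captureLogs hands emit a logger that appends each formatted line to a
// buffer, and returns everything that was logged.
func captureLogs(emit func(l logger)) string {
	var sb strings.Builder
	l := loggerFunc(func(format string, args ...interface{}) {
		fmt.Fprintf(&sb, format, args...)
		sb.WriteByte('\n')
	})
	emit(l)
	return sb.String()
}

func main() {
	out := captureLogs(func(l logger) {
		l.Printf("committed offset %d for partition %d", 42, 0)
	})
	fmt.Print(out)
}
```

Because the adapter is just a function value, the same approach can route messages to the standard log package, a structured logger, or a test buffer as shown here.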

Testing

Subtle behavior changes in later Kafka versions have caused some historical tests to break. If you are running against Kafka 2.3.1 or later, exporting the KAFKA_SKIP_NETTEST=1 environment variable will skip those tests.

Run Kafka locally in docker

    docker-compose up -d

Run tests

    KAFKA_VERSION=2.3.1 \
      KAFKA_SKIP_NETTEST=1 \
      go test -race ./...

(or) to clean up the cached test results and run tests:

    go clean -cache && make test

Documentation

Constants

    const (
        SeekStart    = 0 // Seek relative to the first offset available in the partition.
        SeekAbsolute = 1 // Seek to an absolute offset.
        SeekEnd      = 2 // Seek relative to the last offset available in the partition.
        SeekCurrent  = 3 // Seek relative to the current offset.

        // This flag may be combined to any of the SeekAbsolute and SeekCurrent
        // constants to skip the bound check that the connection would do otherwise.
        // Programs can use this flag to avoid making a metadata request to the kafka
        // broker to read the current first and last offsets of the partition.
        SeekDontCheck = 1 << 30
    )

    const (
        LastOffset  int64 = -1 // The most recent offset available for a partition.
        FirstOffset int64 = -2 // The least recent offset available for a partition.
    )
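
Since SeekDontCheck occupies a single high bit, it can be OR-ed onto a seek mode and separated again with bit masking. The sketch below redeclares the constant values locally (lower-cased, so it compiles without the package); splitWhence is a hypothetical helper showing how such a combined value decomposes, not a function of kafka-go.

```go
package main

import "fmt"

// Local copies of the documented values, so the sketch is self-contained.
const (
	seekStart     = 0
	seekAbsolute  = 1
	seekEnd       = 2
	seekCurrent   = 3
	seekDontCheck = 1 << 30
)

// splitWhence separates the base seek mode from the seekDontCheck flag.
func splitWhence(whence int) (mode int, skipCheck bool) {
	return whence &^ seekDontCheck, whence&seekDontCheck != 0
}

func main() {
	for _, w := range []int{seekStart, seekAbsolute | seekDontCheck, seekEnd, seekCurrent | seekDontCheck} {
		mode, skip := splitWhence(w)
		fmt.Printf("whence=%d -> mode=%d skipBoundCheck=%v\n", w, mode, skip)
	}
}
```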

Variables

    var (
        // DefaultClientID is the default value used as ClientID of kafka
        // connections.
        DefaultClientID string
    )

    var DefaultDialer = &Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
    }

DefaultDialer is the default dialer used when none is specified.

    var ErrGenerationEnded = errors.New("consumer group generation has ended")

ErrGenerationEnded is returned by the context.Context issued by the Generation's Start function when the context has been closed.

    var ErrGroupClosed = errors.New("consumer group is closed")

ErrGroupClosed is returned by ConsumerGroup.Next when the group has already been closed.

Functions

func Marshal (added in v0.4.0)

    func Marshal(v interface{}) ([]byte, error)

Marshal encodes v into a binary representation of the value in the kafka data format.

If v is, or contains, struct types, the kafka struct tags are interpreted and may contain one of these values:

    nullable  valid on bytes and strings, encodes as a nullable value
    compact   valid on strings, encodes as a compact string

The kafka struct tags should not contain min and max versions. If you need to encode types based on specific versions of kafka APIs, use the Version type instead.

func ReadAll (added in v0.4.0)

    func ReadAll(b Bytes) ([]byte, error)

ReadAll reads b into a byte slice.

func TCP (added in v0.4.0)

    func TCP(address ...string) net.Addr

TCP constructs an address with the network set to "tcp".
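
To make the idea of a variadic constructor returning a net.Addr concrete, here is a standalone sketch. The tcpAddrs type and newTCP function are hypothetical, and joining the addresses with commas in String is an assumption made for this example; the value actually returned by TCP may be represented differently.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// tcpAddrs is a hypothetical net.Addr that holds one or more "host:port" strings.
type tcpAddrs []string

// Network reports the network name, always "tcp" for this type.
func (a tcpAddrs) Network() string { return "tcp" }

// String joins the addresses with commas (an assumption of this sketch).
func (a tcpAddrs) String() string { return strings.Join(a, ",") }

// newTCP builds a net.Addr from any number of broker addresses.
func newTCP(addresses ...string) net.Addr { return tcpAddrs(addresses) }

func main() {
	addr := newTCP("localhost:9092", "localhost:9093", "localhost:9094")
	fmt.Println(addr.Network(), addr.String())
}
```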

func Unmarshal (added in v0.4.0)

    func Unmarshal(b []byte, v interface{}) error

Unmarshal decodes a binary representation from b into v.

See Marshal for details.

Types

type ACLDescription (added in v0.4.43)

    type ACLDescription struct {
        Principal      string
        Host           string
        Operation      ACLOperationType
        PermissionType ACLPermissionType
    }

type ACLEntry (added in v0.4.29)

    type ACLEntry struct {
        ResourceType        ResourceType
        ResourceName        string
        ResourcePatternType PatternType
        Principal           string
        Host                string
        Operation           ACLOperationType
        PermissionType      ACLPermissionType
    }

type ACLFilter (added in v0.4.43)

    type ACLFilter struct {
        ResourceTypeFilter ResourceType
        ResourceNameFilter string
        // ResourcePatternTypeFilter was added in v1 and is not available prior to that.
        ResourcePatternTypeFilter PatternType
        PrincipalFilter           string
        HostFilter                string
        Operation                 ACLOperationType
        PermissionType            ACLPermissionType
    }

type ACLOperationType (added in v0.4.29)

    type ACLOperationType int8

    const (
        ACLOperationTypeUnknown         ACLOperationType = 0
        ACLOperationTypeAny             ACLOperationType = 1
        ACLOperationTypeAll             ACLOperationType = 2
        ACLOperationTypeRead            ACLOperationType = 3
        ACLOperationTypeWrite           ACLOperationType = 4
        ACLOperationTypeCreate          ACLOperationType = 5
        ACLOperationTypeDelete          ACLOperationType = 6
        ACLOperationTypeAlter           ACLOperationType = 7
        ACLOperationTypeDescribe        ACLOperationType = 8
        ACLOperationTypeClusterAction   ACLOperationType = 9
        ACLOperationTypeDescribeConfigs ACLOperationType = 10
        ACLOperationTypeAlterConfigs    ACLOperationType = 11
        ACLOperationTypeIdempotentWrite ACLOperationType = 12
    )

func (ACLOperationType) MarshalText (added in v0.4.45)

    func (aot ACLOperationType) MarshalText() ([]byte, error)

MarshalText transforms an ACLOperationType into its string representation.

func (ACLOperationType) String (added in v0.4.45)

    func (aot ACLOperationType) String() string

func (*ACLOperationType) UnmarshalText (added in v0.4.45)

    func (aot *ACLOperationType) UnmarshalText(text []byte) error

UnmarshalText takes a string representation of the operation type and converts it to an ACLOperationType.

type ACLPermissionType (added in v0.4.29)

    type ACLPermissionType int8

    const (
        ACLPermissionTypeUnknown ACLPermissionType = 0
        ACLPermissionTypeAny     ACLPermissionType = 1
        ACLPermissionTypeDeny    ACLPermissionType = 2
        ACLPermissionTypeAllow   ACLPermissionType = 3
    )

func (ACLPermissionType) MarshalText (added in v0.4.45)

    func (apt ACLPermissionType) MarshalText() ([]byte, error)

MarshalText transforms an ACLPermissionType into its string representation.

func (ACLPermissionType) String (added in v0.4.45)

    func (apt ACLPermissionType) String() string

func (*ACLPermissionType) UnmarshalText (added in v0.4.45)

    func (apt *ACLPermissionType) UnmarshalText(text []byte) error

UnmarshalText takes a string representation of the permission type and converts it to an ACLPermissionType.
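
The MarshalText/UnmarshalText pair on these int8 types follows the standard encoding.TextMarshaler and encoding.TextUnmarshaler interfaces. The sketch below shows the round trip on a local type; the permission type, its constants, and in particular the string names ("Unknown", "Any", "Deny", "Allow") are assumptions chosen for illustration and may not match the strings kafka-go produces.

```go
package main

import (
	"encoding"
	"fmt"
)

// permission is a hypothetical int8 enum mirroring the shape of ACLPermissionType.
type permission int8

const (
	permissionUnknown permission = 0
	permissionAny     permission = 1
	permissionDeny    permission = 2
	permissionAllow   permission = 3
)

// permissionNames holds the assumed text form of each value.
var permissionNames = map[permission]string{
	permissionUnknown: "Unknown",
	permissionAny:     "Any",
	permissionDeny:    "Deny",
	permissionAllow:   "Allow",
}

func (p permission) String() string {
	if s, ok := permissionNames[p]; ok {
		return s
	}
	return "Unknown"
}

// MarshalText implements encoding.TextMarshaler.
func (p permission) MarshalText() ([]byte, error) { return []byte(p.String()), nil }

// UnmarshalText implements encoding.TextUnmarshaler.
func (p *permission) UnmarshalText(text []byte) error {
	for value, name := range permissionNames {
		if name == string(text) {
			*p = value
			return nil
		}
	}
	return fmt.Errorf("unknown permission type %q", text)
}

// Compile-time checks that both interfaces are satisfied.
var (
	_ encoding.TextMarshaler   = permission(0)
	_ encoding.TextUnmarshaler = (*permission)(nil)
)

// roundTrip marshals p to text and parses it back.
func roundTrip(p permission) (permission, error) {
	text, err := p.MarshalText()
	if err != nil {
		return 0, err
	}
	var out permission
	err = out.UnmarshalText(text)
	return out, err
}

func main() {
	got, err := roundTrip(permissionAllow)
	fmt.Println(got, err)
}
```

Implementing these two interfaces is what lets such a type appear as readable text in JSON, YAML, or flag values instead of a bare number.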

type ACLResource (added in v0.4.43)

    type ACLResource struct {
        ResourceType ResourceType
        ResourceName string
        PatternType  PatternType
        ACLs         []ACLDescription
    }

type AddOffsetsToTxnRequest (added in v0.4.20)

    type AddOffsetsToTxnRequest struct {
        // Address of the kafka broker to send the request to.
        Addr net.Addr

        // The transactional id key
        TransactionalID string

        // The Producer ID (PID) for the current producer session;
        // received from an InitProducerID request.
        ProducerID int

        // The epoch associated with the current producer session for the given PID
        ProducerEpoch int

        // The unique group identifier.
        GroupID string
    }

AddOffsetsToTxnRequest is the request structure for the AddOffsetsToTxn function.

type AddOffsetsToTxnResponse (added in v0.4.20)

    type AddOffsetsToTxnResponse struct {
        // The amount of time that the broker throttled the request.
        Throttle time.Duration

        // An error that may have occurred when attempting to add the offsets
        // to a transaction.
        //
        // The errors contain the kafka error code. Programs may use the standard
        // errors.Is function to test the error against kafka error codes.
        Error error
    }

AddOffsetsToTxnResponse is the response structure for the AddOffsetsToTxn function.
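
The field comment above says response errors carry a kafka error code that can be tested with errors.Is. The mechanism can be sketched with a local numeric error type. Everything named here (kafkaError, the two constants, wrapCode, hasCode) is hypothetical and stands in for the package's own error values; the numeric codes are used purely as illustrative values.

```go
package main

import (
	"errors"
	"fmt"
)

// kafkaError is a hypothetical numeric error code that implements error.
type kafkaError int

func (e kafkaError) Error() string { return fmt.Sprintf("kafka error code %d", int(e)) }

// Illustrative codes for this sketch only.
const (
	leaderNotAvailable    kafkaError = 5
	notLeaderForPartition kafkaError = 6
)

// wrapCode simulates a response error that wraps an error code with context.
func wrapCode(code kafkaError) error {
	return fmt.Errorf("add offsets to txn: %w", code)
}

// hasCode reports whether err is, or wraps, the given error code.
func hasCode(err error, code kafkaError) bool {
	return errors.Is(err, code)
}

func main() {
	err := wrapCode(leaderNotAvailable)
	fmt.Println(err)
	fmt.Println("leader not available:", hasCode(err, leaderNotAvailable))
	fmt.Println("not leader for partition:", hasCode(err, notLeaderForPartition))
}
```

Because errors.Is walks the wrap chain and compares comparable values with ==, the check keeps working even when intermediate layers add context to the error.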

type AddPartitionToTxn (added in v0.4.19)

    type AddPartitionToTxn struct {
        // Partition is the ID of a partition to add to the transaction.
        Partition int
    }

AddPartitionToTxn represents a partition to be added to a transaction.

type AddPartitionToTxnPartition (added in v0.4.19)

    type AddPartitionToTxnPartition struct {
        // The ID of the partition.
        Partition int

        // An error that may have occurred when attempting to add the partition
        // to a transaction.
        //
        // The errors contain the kafka error code. Programs may use the standard
        // errors.Is function to test the error against kafka error codes.
        Error error
    }

AddPartitionToTxnPartition represents the state of a single partition in response to adding to a transaction.

type AddPartitionsToTxnRequest (added in v0.4.19)

    type AddPartitionsToTxnRequest struct {
        // Address of the kafka broker to send the request to.
        Addr net.Addr

        // The transactional id key
        TransactionalID string

        // The Producer ID (PID) for the current producer session;
        // received from an InitProducerID request.
        ProducerID int

        // The epoch associated with the current producer session for the given PID
        ProducerEpoch int

        // Mappings of topic names to lists of partitions.
        Topics map[string][]AddPartitionToTxn
    }

AddPartitionsToTxnRequest is the request structure for the AddPartitionsToTxn function.

type AddPartitionsToTxnResponse (added in v0.4.21)

    type AddPartitionsToTxnResponse struct {
        // The amount of time that the broker throttled the request.
        Throttle time.Duration

        // Mappings of topic names to partitions being added to a transaction.
        Topics map[string][]AddPartitionToTxnPartition
    }

AddPartitionsToTxnResponse is the response structure for the AddPartitionsToTxn function.

type AlterClientQuotaEntity (added in v0.4.41)

    type AlterClientQuotaEntity struct {
        // The quota entity type.
        EntityType string

        // The name of the quota entity, or null if the default.
        EntityName string
    }

type AlterClientQuotaEntry (added in v0.4.41)

    type AlterClientQuotaEntry struct {
        // The quota entities to alter.
        Entities []AlterClientQuotaEntity

        // An individual quota configuration entry to alter.
        Ops []AlterClientQuotaOps
    }

type AlterClientQuotaOps (added in v0.4.41)

    type AlterClientQuotaOps struct {
        // The quota configuration key.
        Key string

        // The quota configuration value to set, otherwise ignored if the value is to be removed.
        Value float64

        // Whether the quota configuration value should be removed, otherwise set.
        Remove bool
    }

type AlterClientQuotaResponseQuotas (added in v0.4.41)

    type AlterClientQuotaResponseQuotas struct {
        // Error is set to a non-nil value including the code and message if a top-level
        // error was encountered when doing the update.
        Error error

        // The altered quota entities.
        Entities []AlterClientQuotaEntity
    }

type AlterClientQuotasRequest (added in v0.4.41)

    type AlterClientQuotasRequest struct {
        // Address of the kafka broker to send the request to.
        Addr net.Addr

        // List of client quotas entries to alter.
        Entries []AlterClientQuotaEntry

        // Whether the alteration should be validated, but not performed.
        ValidateOnly bool
    }

AlterClientQuotasRequest represents a request sent to a kafka broker to alter client quotas.

type AlterClientQuotasResponse (added in v0.4.41)

    type AlterClientQuotasResponse struct {
        // The amount of time that the broker throttled the request.
        Throttle time.Duration

        // List of altered client quotas responses.
        Entries []AlterClientQuotaResponseQuotas
    }

AlterClientQuotasResponse represents a response from a kafka broker to an alter client quotas request.

type AlterConfigRequestConfig (added in v0.4.9)

    type AlterConfigRequestConfig struct {
        // Configuration key name
        Name string

        // The value to set for the configuration key.
        Value string
    }

type AlterConfigRequestResource (added in v0.4.9)

    type AlterConfigRequestResource struct {
        // Resource Type
        ResourceType ResourceType

        // Resource Name
        ResourceName string

        // Configs is a list of configuration updates.
        Configs []AlterConfigRequestConfig
    }

type AlterConfigsRequest (added in v0.4.9)

    type AlterConfigsRequest struct {
        // Address of the kafka broker to send the request to.
        Addr net.Addr

        // List of resources to update.
        Resources []AlterConfigRequestResource

        // When set to true, topics are not created but the configuration is
        // validated as if they were.
        ValidateOnly bool
    }

AlterConfigsRequest represents a request sent to a kafka broker to alter configs.

type AlterConfigsResponse (added in v0.4.9)

    type AlterConfigsResponse struct {
        // Duration for which the request was throttled due to a quota violation.
        Throttle time.Duration

        // Mapping of topic names to errors that occurred while attempting to create
        // the topics.
        //
        // The errors contain the kafka error code. Programs may use the standard
        // errors.Is function to test the error against kafka error codes.
        Errors map[AlterConfigsResponseResource]error
    }

AlterConfigsResponse represents a response from a kafka broker to an alter config request.

type AlterConfigsResponseResource (added in v0.4.9)

    type AlterConfigsResponseResource struct {
        Type int8
        Name string
    }

AlterConfigsResponseResource helps map errors to specific resources in an alter config response.

type AlterPartitionReassignmentsRequest added in v0.4.9

type AlterPartitionReassignmentsRequest struct {
    // Address of the kafka broker to send the request to.
    Addr net.Addr

    // Topic is the name of the topic to alter partitions in. Keep this field
    // empty and use Topic in AlterPartitionReassignmentsRequestAssignment to
    // reassign to multiple topics.
    Topic string

    // Assignments is the list of partition reassignments to submit to the API.
    Assignments []AlterPartitionReassignmentsRequestAssignment

    // Timeout is the amount of time to wait for the request to complete.
    Timeout time.Duration
}

AlterPartitionReassignmentsRequest is a request to the AlterPartitionReassignments API.

type AlterPartitionReassignmentsRequestAssignment added in v0.4.9

type AlterPartitionReassignmentsRequestAssignment struct {
    // Topic is the name of the topic to alter partitions in. If empty, the
    // value of Topic in AlterPartitionReassignmentsRequest is used.
    Topic string

    // PartitionID is the ID of the partition to make the reassignments in.
    PartitionID int

    // BrokerIDs is a slice of brokers to set the partition replicas to, or
    // null to cancel a pending reassignment for this partition.
    BrokerIDs []int
}

AlterPartitionReassignmentsRequestAssignment contains the requested reassignments for a single partition.

type AlterPartitionReassignmentsResponse added in v0.4.9

type AlterPartitionReassignmentsResponse struct {
    // Error is set to a non-nil value including the code and message if a top-level
    // error was encountered when doing the update.
    Error error

    // PartitionResults contains the specific results for each partition.
    PartitionResults []AlterPartitionReassignmentsResponsePartitionResult
}

AlterPartitionReassignmentsResponse is a response from the AlterPartitionReassignments API.

type AlterPartitionReassignmentsResponsePartitionResult added in v0.4.9

type AlterPartitionReassignmentsResponsePartitionResult struct {
    // Topic is the topic name.
    Topic string

    // PartitionID is the ID of the partition that was altered.
    PartitionID int

    // Error is set to a non-nil value including the code and message if an error was encountered
    // during the update for this partition.
    Error error
}

AlterPartitionReassignmentsResponsePartitionResult contains the detailed result of doing reassignments for a single partition.

type AlterUserScramCredentialsRequest added in v0.4.43

type AlterUserScramCredentialsRequest struct {
    // Address of the kafka broker to send the request to.
    Addr net.Addr

    // List of credentials to delete.
    Deletions []UserScramCredentialsDeletion

    // List of credentials to upsert.
    Upsertions []UserScramCredentialsUpsertion
}

AlterUserScramCredentialsRequest represents a request sent to a kafka broker to alter user scram credentials.

type AlterUserScramCredentialsResponse added in v0.4.43

type AlterUserScramCredentialsResponse struct {
    // The amount of time that the broker throttled the request.
    Throttle time.Duration

    // List of altered user scram credentials.
    Results []AlterUserScramCredentialsResponseUser
}

AlterUserScramCredentialsResponse represents a response from a kafka broker to an alter user credentials request.

type AlterUserScramCredentialsResponseUser added in v0.4.43

type AlterUserScramCredentialsResponseUser struct {
    User  string
    Error error
}

type ApiVersion added in v0.2.3

type ApiVersion struct {
    ApiKey     int16
    MinVersion int16
    MaxVersion int16
}

func (ApiVersion) Format added in v0.3.5

func (v ApiVersion) Format(w fmt.State, r rune)

type ApiVersionsRequest added in v0.4.9

type ApiVersionsRequest struct {
    // Address of the kafka broker to send the request to.
    Addr net.Addr
}

ApiVersionsRequest is a request to the ApiVersions API.

type ApiVersionsResponse added in v0.4.9

type ApiVersionsResponse struct {
    // Error is set to a non-nil value if an error was encountered.
    Error error

    // ApiKeys contains the specific details of each supported API.
    ApiKeys []ApiVersionsResponseApiKey
}

ApiVersionsResponse is a response from the ApiVersions API.

type ApiVersionsResponseApiKey added in v0.4.9

type ApiVersionsResponseApiKey struct {
    // ApiKey is the ID of the API.
    ApiKey int

    // ApiName is a human-friendly description of the API.
    ApiName string

    // MinVersion is the minimum API version supported by the broker.
    MinVersion int

    // MaxVersion is the maximum API version supported by the broker.
    MaxVersion int
}

ApiVersionsResponseApiKey includes the details of which versions are supported for a single API.
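A program that needs a particular protocol version can check it against the advertised range before sending a request. The following is a minimal sketch, not part of the package: apiKeyRange is a local stand-in that mirrors the fields of ApiVersionsResponseApiKey so the snippet compiles on its own, supportsVersion is a hypothetical helper, and the version numbers in main are illustrative only.

```go
package main

import "fmt"

// apiKeyRange is a local stand-in mirroring the documented fields of
// ApiVersionsResponseApiKey (assumption: used here only so the example
// is self-contained).
type apiKeyRange struct {
	ApiKey     int
	ApiName    string
	MinVersion int
	MaxVersion int
}

// supportsVersion (hypothetical helper) reports whether the given version of
// the given API key falls inside the range advertised by the broker.
// Unknown API keys are reported as unsupported.
func supportsVersion(keys []apiKeyRange, apiKey, version int) bool {
	for _, k := range keys {
		if k.ApiKey == apiKey {
			return version >= k.MinVersion && version <= k.MaxVersion
		}
	}
	return false
}

func main() {
	// Illustrative values, not taken from a real broker.
	keys := []apiKeyRange{
		{ApiKey: 0, ApiName: "Produce", MinVersion: 0, MaxVersion: 8},
		{ApiKey: 1, ApiName: "Fetch", MinVersion: 0, MaxVersion: 11},
	}
	fmt.Println(supportsVersion(keys, 1, 11))
	fmt.Println(supportsVersion(keys, 1, 12))
}
```

With the real package, the same check would iterate over the ApiKeys field of an ApiVersionsResponse.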

type Balancer

type Balancer interface {
    // Balance receives a message and a set of available partitions and
    // returns the partition number that the message should be routed to.
    //
    // An application should refrain from using a balancer to manage multiple
    // sets of partitions (from different topics for example), use one balancer
    // instance for each partition set, so the balancer can detect when the
    // partitions change and assume that the kafka topic has been rebalanced.
    Balance(msg Message, partitions ...int) (partition int)
}

The Balancer interface provides an abstraction of the message distribution logic used by Writer instances to route messages to the partitions available on a kafka cluster.

Balancers must be safe to use concurrently from multiple goroutines.

type BalancerFunc

type BalancerFunc func(Message, ...int) int

BalancerFunc is an implementation of the Balancer interface that makes it possible to use regular functions to distribute messages across partitions.

func (BalancerFunc) Balance

func (f BalancerFunc) Balance(msg Message, partitions ...int) int

Balance calls f, satisfies the Balancer interface.

type Batch

type Batch struct {
    // contains filtered or unexported fields
}

A Batch is an iterator over a sequence of messages fetched from a kafka server.

Batches are created by calling (*Conn).ReadBatch. They hold an internal lock on the connection, which is released when the batch is closed. Failing to call a batch's Close method will likely result in a deadlock when trying to use the connection.

Batches are safe to use concurrently from multiple goroutines.

func (*Batch) Close

func (batch *Batch) Close() error

Close closes the batch, releasing the connection lock and returning an error if reading the batch failed for any reason.

func (*Batch) Err added in v0.2.5

func (batch *Batch) Err() error

Err returns a non-nil error if the batch is broken. This is the same error that would be returned by Read, ReadMessage or Close (except in the case of io.EOF which is never returned by Close).

This method is useful when building retry mechanisms for (*Conn).ReadBatch: the program can check whether the batch carried an error before attempting to read the first message.

Note that checking errors on a batch is optional. Calling Read or ReadMessage is always valid and can be used to either read a message or an error in cases where that's convenient.

func (*Batch) HighWaterMark

func (batch *Batch) HighWaterMark() int64

HighWaterMark returns the current highest watermark in a partition.

func (*Batch) Offset

func (batch *Batch) Offset() int64

Offset returns the offset of the next message in the batch.

func (*Batch) Partition added in v0.4.9

func (batch *Batch) Partition() int

Partition returns the batch partition.

func (*Batch) Read

func (batch *Batch) Read(b []byte) (int, error)

Read reads the value of the next message from the batch into b, returning the number of bytes read, or an error if the next message couldn't be read.

If an error is returned the batch cannot be used anymore and calling Read again will keep returning that error. All errors except io.EOF (indicating that the program consumed all messages from the batch) are also returned by Close.

The method fails with io.ErrShortBuffer if the buffer passed as argument is too small to hold the message value.

func (*Batch) ReadMessage

func (batch *Batch) ReadMessage() (Message, error)

ReadMessage reads and returns the next message from the batch.

Because this method allocates memory buffers for the message key and value it is less memory-efficient than Read, but has the advantage of never failing with io.ErrShortBuffer.

func (*Batch) Throttle

func (batch *Batch) Throttle() time.Duration

Throttle gives the throttling duration applied by the kafka server on the connection.

type Broker

type Broker struct {
    Host string
    Port int
    ID   int
    Rack string
}

Broker represents a kafka broker in a kafka cluster.

type BrokerResolver added in v0.4.5

type BrokerResolver interface {
    // Returns the IP addresses of the broker passed as argument.
    LookupBrokerIPAddr(ctx context.Context, broker Broker) ([]net.IPAddr, error)
}

BrokerResolver is an interface implemented by types that translate host names into a network address.

This resolver is not intended to be a general purpose interface. Instead, it is tailored to the particular needs of the kafka protocol, with the goal being to provide a flexible mechanism for extending broker name resolution while retaining context that is specific to interacting with a kafka cluster.

Resolvers must be safe to use from multiple goroutines.

func NewBrokerResolver added in v0.4.5

func NewBrokerResolver(r *net.Resolver) BrokerResolver

NewBrokerResolver constructs a Resolver from r.

If r is nil, net.DefaultResolver is used instead.

type Bytes added in v0.4.0

type Bytes = protocol.Bytes

Bytes is an interface representing a sequence of bytes. This abstraction makes it possible for programs to inject data into produce requests without having to load it into an intermediary buffer, or read record keys and values from a fetch response directly from internal buffers.

Bytes are not safe to use concurrently from multiple goroutines.

func NewBytes added in v0.4.0

func NewBytes(b []byte) Bytes

NewBytes constructs a Bytes value from a byte slice.

If b is nil, nil is returned.

type CRC32Balancer added in v0.3.1

type CRC32Balancer struct {
    Consistent bool
    // contains filtered or unexported fields
}

CRC32Balancer is a Balancer that uses the CRC32 hash function to determine which partition to route messages to. This ensures that messages with the same key are routed to the same partition. This balancer is compatible with the built-in hash partitioners in librdkafka and the language bindings that are built on top of it, including the github.com/confluentinc/confluent-kafka-go Go package.

With the Consistent field false (default), this partitioner is equivalent to the "consistent_random" setting in librdkafka. When Consistent is true, this partitioner is equivalent to the "consistent" setting. The latter will hash empty or nil keys into the same partition.

Unless you are absolutely certain that all your messages will have keys, it's best to leave the Consistent flag off. Otherwise, you run the risk of creating a very hot partition.

func (CRC32Balancer) Balance added in v0.3.1

func (b CRC32Balancer) Balance(msg Message, partitions ...int) (partition int)
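The hot-partition warning follows from how a checksum-based partitioner treats missing keys. The sketch below is an illustration of the general idea (checksum of the key modulo the number of partitions), assuming the IEEE CRC32 polynomial; it is not the package's implementation, and crc32Partition is a hypothetical helper. Because the checksum of an empty input is 0, every nil or empty key lands on the first partition in the list.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// crc32Partition (hypothetical) maps a key to one of the given partitions
// using the IEEE CRC32 checksum. It assumes len(partitions) > 0.
func crc32Partition(key []byte, partitions ...int) int {
	idx := crc32.ChecksumIEEE(key) % uint32(len(partitions))
	return partitions[idx]
}

func main() {
	partitions := []int{0, 1, 2, 3}

	// The same key always maps to the same partition.
	a := crc32Partition([]byte("user-42"), partitions...)
	b := crc32Partition([]byte("user-42"), partitions...)
	fmt.Println(a == b)

	// nil and empty keys both checksum to 0, so they share one partition.
	fmt.Println(crc32Partition(nil, partitions...), crc32Partition([]byte{}, partitions...))
}
```

This is the behavior to expect when all keyless messages are hashed consistently, which is why the documentation recommends leaving Consistent off unless every message carries a key.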

type Client added in v0.2.5

type Client struct {
    // Address of the kafka cluster (or specific broker) that the client will be
    // sending requests to.
    //
    // This field is optional, the address may be provided in each request
    // instead. The request address takes precedence if both were specified.
    Addr net.Addr

    // Time limit for requests sent by this client.
    //
    // If zero, no timeout is applied.
    Timeout time.Duration

    // A transport used to communicate with the kafka brokers.
    //
    // If nil, DefaultTransport is used.
    Transport RoundTripper
}

Client is a high-level API to interact with kafka brokers.

All methods of the Client type accept a context as first argument, which may be used to asynchronously cancel the requests.

Clients are safe to use concurrently from multiple goroutines, as long as their configuration is not changed after first use.

func (*Client) AddOffsetsToTxn added in v0.4.20

AddOffsetsToTxn sends an add offsets to txn request to a kafka broker and returns the response.

func (*Client) AddPartitionsToTxn added in v0.4.19

AddPartitionsToTxn sends an add partitions to txn request to a kafka broker and returns the response.

func (*Client) AlterClientQuotas added in v0.4.41

AlterClientQuotas sends a client quotas alteration request to a kafka broker and returns the response.

func (*Client) AlterConfigs added in v0.4.9

AlterConfigs sends a config altering request to a kafka broker and returns the response.

func (*Client) AlterPartitionReassignments added in v0.4.9

func (*Client) AlterUserScramCredentials added in v0.4.43

AlterUserScramCredentials sends a user scram credentials alteration request to a kafka broker and returns the response.

func (*Client) ApiVersions added in v0.4.9

func (*Client) ConsumerOffsets added in v0.2.5

func (c *Client) ConsumerOffsets(ctx context.Context, tg TopicAndGroup) (map[int]int64, error)

ConsumerOffsets returns a map[int]int64 of partition to committed offset for a consumer group id and topic.

DEPRECATED: this method will be removed in version 1.0, programs should migrate to use kafka.(*Client).OffsetFetch instead.

func (*Client) CreateACLs added in v0.4.29

CreateACLs sends an ACL creation request to a kafka broker and returns the response.

func (*Client) CreatePartitions added in v0.4.9

CreatePartitions sends a partitions creation request to a kafka broker and returns the response.

func (*Client) CreateTopics added in v0.4.0

CreateTopics sends a topic creation request to a kafka broker and returns the response.

func (*Client) DeleteACLs added in v0.4.43

DeleteACLs sends an ACL deletion request to a kafka broker and returns the response.

func (*Client) DeleteGroups added in v0.4.40

DeleteGroups sends a delete groups request and returns the response. The request is sent to the group coordinator of the first group of the request. All deleted groups must be managed by the same group coordinator.

func (*Client) DeleteTopics added in v0.4.0

DeleteTopics sends a topic deletion request to a kafka broker and returns the response.

func (*Client) DescribeACLs added in v0.4.43

func (*Client) DescribeClientQuotas added in v0.4.41

DescribeClientQuotas sends a describe client quotas request to a kafka broker and returns the response.

func (*Client) DescribeConfigs added in v0.4.9

DescribeConfigs sends a describe configs request to a kafka broker and returns the response.

func (*Client) DescribeGroups added in v0.4.9

DescribeGroups calls the Kafka DescribeGroups API to get information about one or more consumer groups. See https://kafka.apache.org/protocol#The_Messages_DescribeGroups for more information.

func (*Client) DescribeUserScramCredentials added in v0.4.43

DescribeUserScramCredentials sends a user scram credentials describe request to a kafka broker and returns the response.

func (*Client) ElectLeaders added in v0.4.9

func (*Client) EndTxn added in v0.4.19

func (c *Client) EndTxn(ctx context.Context, req *EndTxnRequest) (*EndTxnResponse, error)

EndTxn sends an EndTxn request to a kafka broker and returns its response.

func (*Client) Fetch added in v0.4.0

func (c *Client) Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse, error)

Fetch sends a fetch request to a kafka broker and returns the response.

If the broker returned an invalid response with no topics, an error wrapping protocol.ErrNoTopic is returned.

If the broker returned an invalid response with no partitions, an error wrapping ErrNoPartitions is returned.

func (*Client) FindCoordinator added in v0.4.10

FindCoordinator sends a findCoordinator request to a kafka broker and returns the response.

func (*Client) Heartbeat added in v0.4.19

Heartbeat sends a heartbeat request to a kafka broker and returns the response.

func (*Client) IncrementalAlterConfigs added in v0.4.9

func (*Client) InitProducerID added in v0.4.11

InitProducerID sends an initProducerId request to a kafka broker and returns the response.

func (*Client) JoinGroup added in v0.4.33

JoinGroup sends a join group request to the coordinator and returns the response.

func (*Client) LeaveGroup added in v0.4.33

func (*Client) ListGroups added in v0.4.9

func (*Client) ListOffsets added in v0.4.0

ListOffsets sends an offset request to a kafka broker and returns the response.

func (*Client) ListPartitionReassignments added in v0.4.45

func (*Client) Metadata added in v0.4.0

Metadata sends a metadata request to a kafka broker and returns the response.

func (*Client) OffsetCommit added in v0.4.19

OffsetCommit sends an offset commit request to a kafka broker and returns the response.

func (*Client) OffsetDelete added in v0.4.36

OffsetDelete sends a delete offset request to a kafka broker and returns the response.

func (*Client) OffsetFetch added in v0.4.0

OffsetFetch sends an offset fetch request to a kafka broker and returns the response.

func (*Client) Produce added in v0.4.0

func (c *Client) Produce(ctx context.Context, req *ProduceRequest) (*ProduceResponse, error)

Produce sends a produce request to a kafka broker and returns the response.

If the request contained no records, an error wrapping protocol.ErrNoRecord is returned.

When the request is configured with RequiredAcks=none, both the response and the error will be nil on success.

func (*Client) RawProduce added in v0.4.47

func (c *Client) RawProduce(ctx context.Context, req *RawProduceRequest) (*ProduceResponse, error)

RawProduce sends a raw produce request to a kafka broker and returns the response.

If the request contained no records, an error wrapping protocol.ErrNoRecord is returned.

When the request is configured with RequiredAcks=none, both the response and the error will be nil on success.

func (*Client) SyncGroup added in v0.4.33

SyncGroup sends a sync group request to the coordinator and returns the response.

func (*Client) TxnOffsetCommit added in v0.4.20

TxnOffsetCommit sends a txn offset commit request to a kafka broker and returns the response.

type CompressionCodec

type CompressionCodec = compress.Codec

type ConfigEntry

type ConfigEntry struct {
    ConfigName  string
    ConfigValue string
}

type ConfigOperation added in v0.4.9

type ConfigOperation int8

const (
    ConfigOperationSet      ConfigOperation = 0
    ConfigOperationDelete   ConfigOperation = 1
    ConfigOperationAppend   ConfigOperation = 2
    ConfigOperationSubtract ConfigOperation = 3
)

type Conn

type Conn struct {
    // contains filtered or unexported fields
}

Conn represents a connection to a kafka broker.

Instances of Conn are safe to use concurrently from multiple goroutines.

func Dial

func Dial(network string, address string) (*Conn, error)

Dial is a convenience wrapper for DefaultDialer.Dial.

func DialContext

func DialContext(ctx context.Context, network string, address string) (*Conn, error)

DialContext is a convenience wrapper for DefaultDialer.DialContext.

func DialLeader

func DialLeader(ctx context.Context, network string, address string, topic string, partition int) (*Conn, error)

DialLeader is a convenience wrapper for DefaultDialer.DialLeader.

func DialPartition added in v0.2.1

func DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error)

DialPartition is a convenience wrapper for DefaultDialer.DialPartition.

func NewConn

func NewConn(conn net.Conn, topic string, partition int) *Conn

NewConn returns a new kafka connection for the given topic and partition.

func NewConnWith

func NewConnWith(conn net.Conn, config ConnConfig) *Conn

NewConnWith returns a new kafka connection configured with config. The offset is initialized to FirstOffset.

func (*Conn) ApiVersions added in v0.2.3

func (c *Conn) ApiVersions() ([]ApiVersion, error)

func (*Conn) Broker added in v0.4.18

func (c *Conn) Broker() Broker

Broker returns a Broker value representing the kafka broker that this connection was established to.

func (*Conn) Brokers added in v0.2.3

func (c *Conn) Brokers() ([]Broker, error)

Brokers retrieves the broker list from the Kafka metadata.

func (*Conn) Close

func (c *Conn) Close() error

Close closes the kafka connection.

func (*Conn) Controller added in v0.2.3

func (c *Conn) Controller() (broker Broker, err error)

Controller asks kafka for the current controller and returns it as a Broker value.

func (*Conn) CreateTopics

func (c *Conn) CreateTopics(topics ...TopicConfig) error

CreateTopics creates one topic per provided configuration with idempotent operational semantics. In other words, if CreateTopics is invoked with a configuration for an existing topic, it will have no effect.

func (*Conn) DeleteTopics

func (c *Conn) DeleteTopics(topics ...string) error

DeleteTopics deletes the specified topics.

func (*Conn) LocalAddr

func (c *Conn) LocalAddr() net.Addr

LocalAddr returns the local network address.

func (*Conn) Offset

func (c *Conn) Offset() (offset int64, whence int)

Offset returns the current offset of the connection as a pair of integers, where the first one is an offset value and the second one indicates how to interpret it.

See Seek for more details about the offset and whence values.

func (*Conn) Read

func (c *Conn) Read(b []byte) (int, error)

Read reads the message at the current offset from the connection, advancing the offset on success so the next call to a read method will produce the next message. The method returns the number of bytes read, or an error if something went wrong.

While it is safe to call Read concurrently from multiple goroutines, it may be hard for the program to predict the results: the connection offset will be read and written by multiple goroutines, so they could read duplicates, or messages may be seen by only some of the goroutines.

The method fails with io.ErrShortBuffer if the buffer passed as argument is too small to hold the message value.

This method is provided to satisfy the net.Conn interface but is much less efficient than using the more general purpose ReadBatch method.

func (*Conn) ReadBatch

func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch

ReadBatch reads a batch of messages from the kafka server. The method always returns a non-nil Batch value. If an error occurred, either sending the fetch request or reading the response, the error will be made available by the returned value of the batch's Close method.

While it is safe to call ReadBatch concurrently from multiple goroutines, it may be hard for the program to predict the results: the connection offset will be read and written by multiple goroutines, so they could read duplicates, or messages may be seen by only some of the goroutines.

A program doesn't specify the number of messages it wants from a batch, but gives the minimum and maximum number of bytes that it wants to receive from the kafka server.

func (*Conn) ReadBatchWith added in v0.2.3

func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch

ReadBatchWith in every way is similar to ReadBatch. ReadBatch is configured with the default values in ReadBatchConfig except for minBytes and maxBytes.

func (*Conn) ReadFirstOffset

func (c *Conn) ReadFirstOffset() (int64, error)

ReadFirstOffset returns the first offset available on the connection.

func (*Conn) ReadLastOffset

func (c *Conn) ReadLastOffset() (int64, error)

ReadLastOffset returns the last offset available on the connection.

func (*Conn) ReadMessage

func (c *Conn) ReadMessage(maxBytes int) (Message, error)

ReadMessage reads the message at the current offset from the connection, advancing the offset on success so the next call to a read method will produce the next message.

Because this method allocates memory buffers for the message key and value it is less memory-efficient than Read, but has the advantage of never failing with io.ErrShortBuffer.

While it is safe to call ReadMessage concurrently from multiple goroutines, it may be hard for the program to predict the results: the connection offset will be read and written by multiple goroutines, so they could read duplicates, or messages may be seen by only some of the goroutines.

This method is provided for convenience purposes but is much less efficient than using the more general purpose ReadBatch method.

func (*Conn) ReadOffset

func (c *Conn) ReadOffset(t time.Time) (int64, error)

ReadOffset returns the offset of the first message with a timestamp equal to or greater than t.

func (*Conn) ReadOffsets

func (c *Conn) ReadOffsets() (first, last int64, err error)

ReadOffsets returns the absolute first and last offsets of the topic used by the connection.

func (*Conn) ReadPartitions

func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error)

ReadPartitions returns the list of available partitions for the given list of topics.

If the method is called with no topic, it uses the topic configured on the connection. If there are none, the method fetches all partitions of the kafka cluster.

func (*Conn) RemoteAddr

func (c *Conn) RemoteAddr() net.Addr

RemoteAddr returns the remote network address.

func (*Conn) Seek

func (c *Conn) Seek(offset int64, whence int) (int64, error)

Seek sets the offset for the next read or write operation according to whence, which should be one of SeekStart, SeekAbsolute, SeekEnd, or SeekCurrent. When seeking relative to the end, the offset is subtracted from the current offset. Note that for historical reasons, these do not align with the usual whence constants as in lseek(2) or os.Seek. The method returns the new absolute offset of the connection.

func (*Conn) SetDeadline

func (c *Conn) SetDeadline(t time.Time) error

SetDeadline sets the read and write deadlines associated with the connection. It is equivalent to calling both SetReadDeadline and SetWriteDeadline.

A deadline is an absolute time after which I/O operations fail with a timeout (see type Error) instead of blocking. The deadline applies to all future and pending I/O, not just the immediately following call to Read or Write. After a deadline has been exceeded, the connection may be closed if it was found to be in an unrecoverable state.

A zero value for t means I/O operations will not time out.

func (*Conn) SetReadDeadline

func (c *Conn) SetReadDeadline(t time.Time) error

SetReadDeadline sets the deadline for future Read calls and any currently-blocked Read call. A zero value for t means Read will not time out.

func (*Conn) SetRequiredAcks

func (c *Conn) SetRequiredAcks(n int) error

SetRequiredAcks sets the number of acknowledgements from replicas that the connection requests when producing messages.

func (*Conn) SetWriteDeadline

func (c *Conn) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the deadline for future Write calls and any currently-blocked Write call. Even if write times out, it may return n > 0, indicating that some of the data was successfully written. A zero value for t means Write will not time out.

func (*Conn) Write

func (c *Conn) Write(b []byte) (int, error)

Write writes a message to the kafka broker that this connection was established to. The method returns the number of bytes written, or an error if something went wrong.

The operation either succeeds or fails, it never partially writes the message.

This method is exposed to satisfy the net.Conn interface but is less efficient than the more general purpose WriteMessages method.

func (*Conn) WriteCompressedMessages added in v0.2.1

func (c *Conn) WriteCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, err error)

WriteCompressedMessages writes a batch of messages to the connection's topic and partition, returning the number of bytes written. The write is an atomic operation, it either fully succeeds or fails.

If the compression codec is not nil, the messages will be compressed.

func (*Conn) WriteCompressedMessagesAt added in v0.2.3

func (c *Conn) WriteCompressedMessagesAt(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error)

WriteCompressedMessagesAt writes a batch of messages to the connection's topic and partition, returning the number of bytes written, partition and offset numbers and timestamp assigned by the kafka broker to the message set. The write is an atomic operation, it either fully succeeds or fails.

If the compression codec is not nil, the messages will be compressed.

func (*Conn) WriteMessages

func (c *Conn) WriteMessages(msgs ...Message) (int, error)

WriteMessages writes a batch of messages to the connection's topic and partition, returning the number of bytes written. The write is an atomic operation, it either fully succeeds or fails.

type ConnConfig

type ConnConfig struct {
    ClientID  string
    Topic     string
    Partition int
    Broker    int
    Rack      string

    // The transactional id to use for transactional delivery. Idempotent
    // delivery should be enabled if transactional id is configured.
    // For more details look at transactional.id description here: http://kafka.apache.org/documentation.html#producerconfigs
    // Empty string means that this connection can't be transactional.
    TransactionalID string
}

ConnConfig is a configuration object used to create new instances of Conn.

type ConsumerGroup added in v0.3.1

type ConsumerGroup struct {
    // contains filtered or unexported fields
}

ConsumerGroup models a Kafka consumer group. A caller doesn't interact with the group directly. Rather, they interact with a Generation. Every time a member enters or exits the group, it results in a new Generation. The Generation is where partition assignments and offset management occur. Callers will use Next to get a handle to the Generation.

func NewConsumerGroup added in v0.3.1

func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error)

NewConsumerGroup creates a new ConsumerGroup. It returns an error if the provided configuration is invalid. It does not attempt to connect to the Kafka cluster. That happens asynchronously, and any errors will be reported by Next.

func (*ConsumerGroup) Close added in v0.3.1

func (cg *ConsumerGroup) Close() error

Close terminates the current generation by causing this member to leave and releases all local resources used to participate in the consumer group. Close will also end the current generation if it is still active.

func (*ConsumerGroup) Next added in v0.3.1

Next waits for the next consumer group generation. There will never be two active generations. Next will never return a new generation until the previous one has completed.

If there are errors setting up the next generation, they will be surfaced here.

If the ConsumerGroup has been closed, then Next will return ErrGroupClosed.

type ConsumerGroupConfig added in v0.3.1

type ConsumerGroupConfig struct {
    // ID is the consumer group ID.  It must not be empty.
    ID string

    // The list of broker addresses used to connect to the kafka cluster.  It
    // must not be empty.
    Brokers []string

    // A dialer used to open connections to the kafka server. This field is
    // optional, if nil, the default dialer is used instead.
    Dialer *Dialer

    // Topics is the list of topics that will be consumed by this group.  It
    // will usually have a single value, but it is permitted to have multiple
    // for more complex use cases.
    Topics []string

    // GroupBalancers is the priority-ordered list of client-side consumer group
    // balancing strategies that will be offered to the coordinator.  The first
    // strategy that all group members support will be chosen by the leader.
    //
    // Default: [Range, RoundRobin]
    GroupBalancers []GroupBalancer

    // HeartbeatInterval sets the optional frequency at which the reader sends the consumer
    // group heartbeat update.
    //
    // Default: 3s
    HeartbeatInterval time.Duration

    // PartitionWatchInterval indicates how often a reader checks for partition changes.
    // If a reader sees a partition change (such as a partition add) it will rebalance the group
    // picking up new partitions.
    //
    // Default: 5s
    PartitionWatchInterval time.Duration

    // WatchPartitionChanges is used to inform kafka-go that a consumer group should be
    // polling the brokers and rebalancing if any partition changes happen to the topic.
    WatchPartitionChanges bool

    // SessionTimeout optionally sets the length of time that may pass without a heartbeat
    // before the coordinator considers the consumer dead and initiates a rebalance.
    //
    // Default: 30s
    SessionTimeout time.Duration

    // RebalanceTimeout optionally sets the length of time the coordinator will wait
    // for members to join as part of a rebalance.  For kafka servers under higher
    // load, it may be useful to set this value higher.
    //
    // Default: 30s
    RebalanceTimeout time.Duration

    // JoinGroupBackoff optionally sets the length of time to wait before re-joining
    // the consumer group after an error.
    //
    // Default: 5s
    JoinGroupBackoff time.Duration

    // RetentionTime optionally sets the length of time the consumer group will
    // be saved by the broker.  -1 will disable the setting and leave the
    // retention up to the broker's offsets.retention.minutes property.  By
    // default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >=
    // 2.0.
    //
    // Default: -1
    RetentionTime time.Duration

    // StartOffset determines from whence the consumer group should begin
    // consuming when it finds a partition without a committed offset.  If
    // non-zero, it must be set to one of FirstOffset or LastOffset.
    //
    // Default: FirstOffset
    StartOffset int64

    // If not nil, specifies a logger used to report internal changes within the
    // reader.
    Logger Logger

    // ErrorLogger is the logger used to report errors. If nil, the reader falls
    // back to using Logger instead.
    ErrorLogger Logger

    // Timeout is the network timeout used when communicating with the consumer
    // group coordinator.  This value should not be too small since errors
    // communicating with the broker will generally cause a consumer group
    // rebalance, and it's undesirable that a transient network error introduce
    // that overhead.  Similarly, it should not be too large or the consumer
    // group may be slow to respond to the coordinator failing over to another
    // broker.
    //
    // Default: 5s
    Timeout time.Duration

    // contains filtered or unexported fields
}

ConsumerGroupConfig is a configuration object used to create new instances of ConsumerGroup.

func (*ConsumerGroupConfig) Validate added in v0.3.1

func (config *ConsumerGroupConfig) Validate() error

Validate method validates ConsumerGroupConfig properties and sets relevant defaults.

type CoordinatorKeyType added in v0.4.10

type CoordinatorKeyType int8

CoordinatorKeyType is used to specify the type of coordinator to look for.

const (
	// CoordinatorKeyTypeConsumer type is used when looking for a Group coordinator.
	CoordinatorKeyTypeConsumer CoordinatorKeyType = 0

	// CoordinatorKeyTypeTransaction type is used when looking for a Transaction coordinator.
	CoordinatorKeyTypeTransaction CoordinatorKeyType = 1
)

type CreateACLsRequest added in v0.4.29

type CreateACLsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// List of ACL to create.
	ACLs []ACLEntry
}

CreateACLsRequest represents a request sent to a kafka broker to add new ACLs.

type CreateACLsResponse added in v0.4.29

type CreateACLsResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// List of errors that occurred while attempting to create
	// the ACLs.
	//
	// The errors contain the kafka error code. Programs may use the standard
	// errors.Is function to test the error against kafka error codes.
	Errors []error
}

CreateACLsResponse represents a response from a kafka broker to an ACL creation request.

type CreatePartitionsRequest added in v0.4.9

type CreatePartitionsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// List of topics to create and their configuration.
	Topics []TopicPartitionsConfig

	// When set to true, topics are not created but the configuration is
	// validated as if they were.
	ValidateOnly bool
}

CreatePartitionsRequest represents a request sent to a kafka broker to create and update topic partitions.

type CreatePartitionsResponse added in v0.4.9

type CreatePartitionsResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Mapping of topic names to errors that occurred while attempting to create
	// the topics.
	//
	// The errors contain the kafka error code. Programs may use the standard
	// errors.Is function to test the error against kafka error codes.
	Errors map[string]error
}

CreatePartitionsResponse represents a response from a kafka broker to a partition creation request.

type CreateTopicsRequest added in v0.4.0

type CreateTopicsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// List of topics to create and their configuration.
	Topics []TopicConfig

	// When set to true, topics are not created but the configuration is
	// validated as if they were.
	//
	// This field will be ignored if the kafka broker did not support the
	// CreateTopics API in version 1 or above.
	ValidateOnly bool
}

CreateTopicsRequest represents a request sent to a kafka broker to create new topics.

type CreateTopicsResponse added in v0.4.0

type CreateTopicsResponse struct {
	// The amount of time that the broker throttled the request.
	//
	// This field will be zero if the kafka broker did not support the
	// CreateTopics API in version 2 or above.
	Throttle time.Duration

	// Mapping of topic names to errors that occurred while attempting to create
	// the topics.
	//
	// The errors contain the kafka error code. Programs may use the standard
	// errors.Is function to test the error against kafka error codes.
	Errors map[string]error
}

CreateTopicsResponse represents a response from a kafka broker to a topic creation request.

type DeleteACLsFilter added in v0.4.43

type DeleteACLsFilter struct {
	ResourceTypeFilter        ResourceType
	ResourceNameFilter        string
	ResourcePatternTypeFilter PatternType
	PrincipalFilter           string
	HostFilter                string
	Operation                 ACLOperationType
	PermissionType            ACLPermissionType
}

type DeleteACLsMatchingACLs added in v0.4.43

type DeleteACLsMatchingACLs struct {
	Error               error
	ResourceType        ResourceType
	ResourceName        string
	ResourcePatternType PatternType
	Principal           string
	Host                string
	Operation           ACLOperationType
	PermissionType      ACLPermissionType
}

type DeleteACLsRequest added in v0.4.43

type DeleteACLsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// List of ACL filters to use for deletion.
	Filters []DeleteACLsFilter
}

DeleteACLsRequest represents a request sent to a kafka broker to delete ACLs.

type DeleteACLsResponse added in v0.4.43

type DeleteACLsResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// List of the results from the deletion request.
	Results []DeleteACLsResult
}

DeleteACLsResponse represents a response from a kafka broker to an ACL deletion request.

type DeleteACLsResult added in v0.4.43

type DeleteACLsResult struct {
	Error        error
	MatchingACLs []DeleteACLsMatchingACLs
}

type DeleteGroupsRequest added in v0.4.40

type DeleteGroupsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// Identifiers of groups to delete.
	GroupIDs []string
}

DeleteGroupsRequest represents a request sent to a kafka broker to delete consumer groups.

type DeleteGroupsResponse added in v0.4.40

type DeleteGroupsResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Mapping of group ids to errors that occurred while attempting to delete those groups.
	//
	// The errors contain the kafka error code. Programs may use the standard
	// errors.Is function to test the error against kafka error codes.
	Errors map[string]error
}

DeleteGroupsResponse represents a response from a kafka broker to a consumer group deletion request.

type DeleteTopicsRequest added in v0.4.0

type DeleteTopicsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// Names of topics to delete.
	Topics []string
}

DeleteTopicsRequest represents a request sent to a kafka broker to delete topics.

type DeleteTopicsResponse added in v0.4.0

type DeleteTopicsResponse struct {
	// The amount of time that the broker throttled the request.
	//
	// This field will be zero if the kafka broker did not support the
	// DeleteTopics API in version 1 or above.
	Throttle time.Duration

	// Mapping of topic names to errors that occurred while attempting to delete
	// the topics.
	//
	// The errors contain the kafka error code. Programs may use the standard
	// errors.Is function to test the error against kafka error codes.
	Errors map[string]error
}

DeleteTopicsResponse represents a response from a kafka broker to a topic deletion request.

type DescribeACLsRequest added in v0.4.43

type DescribeACLsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// Filter to filter ACLs on.
	Filter ACLFilter
}

DescribeACLsRequest represents a request sent to a kafka broker to describe existing ACLs.

type DescribeACLsResponse added in v0.4.43

type DescribeACLsResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Error that occurred while attempting to describe
	// the ACLs.
	Error error

	// ACL resources returned from the describe request.
	Resources []ACLResource
}

DescribeACLsResponse represents a response from a kafka broker to an ACL describe request.

type DescribeClientQuotasEntity added in v0.4.41

type DescribeClientQuotasEntity struct {
	// The quota entity type.
	EntityType string

	// The name of the quota entity, or null if the default.
	EntityName string
}

type DescribeClientQuotasRequest added in v0.4.41

type DescribeClientQuotasRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// List of quota components to describe.
	Components []DescribeClientQuotasRequestComponent

	// Whether the match is strict, i.e. should exclude entities with
	// unspecified entity types.
	Strict bool
}

DescribeClientQuotasRequest represents a request sent to a kafka broker to describe client quotas.

type DescribeClientQuotasRequestComponent added in v0.4.41

type DescribeClientQuotasRequestComponent struct {
	// The entity type that the filter component applies to.
	EntityType string

	// How to match the entity (0 = exact name, 1 = default name,
	// 2 = any specified name).
	MatchType int8

	// The string to match against, or null if unused for the match type.
	Match string
}

type DescribeClientQuotasResponse added in v0.4.41

type DescribeClientQuotasResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Error is set to a non-nil value including the code and message if a top-level
	// error was encountered when doing the update.
	Error error

	// List of describe client quota responses.
	Entries []DescribeClientQuotasResponseQuotas
}

DescribeClientQuotasResponse represents a response from a kafka broker to a describe client quota request.

type DescribeClientQuotasResponseQuotas added in v0.4.41

type DescribeClientQuotasResponseQuotas struct {
	// List of client quota entities and their descriptions.
	Entities []DescribeClientQuotasEntity

	// The client quota configuration values.
	Values []DescribeClientQuotasValue
}

type DescribeClientQuotasValue added in v0.4.41

type DescribeClientQuotasValue struct {
	// The quota configuration key.
	Key string

	// The quota configuration value.
	Value float64
}

type DescribeConfigRequestResource added in v0.4.9

type DescribeConfigRequestResource struct {
	// Resource Type
	ResourceType ResourceType

	// Resource Name
	ResourceName string

	// ConfigNames is a list of configurations to update.
	ConfigNames []string
}

type DescribeConfigResponseConfigEntry added in v0.4.9

type DescribeConfigResponseConfigEntry struct {
	ConfigName  string
	ConfigValue string
	ReadOnly    bool

	// Ignored if API version is greater than v0
	IsDefault bool

	// Ignored if API version is less than v1
	ConfigSource int8

	IsSensitive bool

	// Ignored if API version is less than v1
	ConfigSynonyms []DescribeConfigResponseConfigSynonym

	// Ignored if API version is less than v3
	ConfigType int8

	// Ignored if API version is less than v3
	ConfigDocumentation string
}

DescribeConfigResponseConfigEntry.

type DescribeConfigResponseConfigSynonym added in v0.4.9

type DescribeConfigResponseConfigSynonym struct {
	// Ignored if API version is less than v1
	ConfigName string

	// Ignored if API version is less than v1
	ConfigValue string

	// Ignored if API version is less than v1
	ConfigSource int8
}

DescribeConfigResponseConfigSynonym.

type DescribeConfigResponseResource added in v0.4.9

type DescribeConfigResponseResource struct {
	// Resource Type
	ResourceType int8

	// Resource Name
	ResourceName string

	// Error
	Error error

	// ConfigEntries
	ConfigEntries []DescribeConfigResponseConfigEntry
}

DescribeConfigResponseResource.

type DescribeConfigsRequest added in v0.4.9

type DescribeConfigsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// List of resources to get details for.
	Resources []DescribeConfigRequestResource

	// Ignored if API version is less than v1
	IncludeSynonyms bool

	// Ignored if API version is less than v3
	IncludeDocumentation bool
}

DescribeConfigsRequest represents a request sent to a kafka broker to describe configs.

type DescribeConfigsResponse added in v0.4.9

type DescribeConfigsResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Resources
	Resources []DescribeConfigResponseResource
}

DescribeConfigsResponse represents a response from a kafka broker to a describe config request.

type DescribeGroupsRequest added in v0.4.9

type DescribeGroupsRequest struct {
	// Addr is the address of the kafka broker to send the request to.
	Addr net.Addr

	// GroupIDs is a slice of groups to get details for.
	GroupIDs []string
}

DescribeGroupsRequest is a request to the DescribeGroups API.

type DescribeGroupsResponse added in v0.4.9

type DescribeGroupsResponse struct {
	// Groups is a slice of details for the requested groups.
	Groups []DescribeGroupsResponseGroup
}

DescribeGroupsResponse is a response from the DescribeGroups API.

type DescribeGroupsResponseAssignments added in v0.4.9

type DescribeGroupsResponseAssignments struct {
	// Version is the version of the assignments data.
	Version int

	// Topics contains the details of the partition assignments for each topic.
	Topics []GroupMemberTopic

	// UserData is the user data for the member.
	UserData []byte
}

DescribeGroupsResponseAssignments stores the topic partition assignment data for a group member.

type DescribeGroupsResponseGroup added in v0.4.9

type DescribeGroupsResponseGroup struct {
	// Error is set to a non-nil value if there was an error fetching the details
	// for this group.
	Error error

	// GroupID is the ID of the group.
	GroupID string

	// GroupState is a description of the group state.
	GroupState string

	// Members contains details about each member of the group.
	Members []DescribeGroupsResponseMember
}

DescribeGroupsResponseGroup contains the response details for a single group.

type DescribeGroupsResponseMember added in v0.4.9

type DescribeGroupsResponseMember struct {
	// MemberID is the ID of the group member.
	MemberID string

	// ClientID is the ID of the client that the group member is using.
	ClientID string

	// ClientHost is the host of the client that the group member is connecting from.
	ClientHost string

	// MemberMetadata contains metadata about this group member.
	MemberMetadata DescribeGroupsResponseMemberMetadata

	// MemberAssignments contains the topic partitions that this member is assigned to.
	MemberAssignments DescribeGroupsResponseAssignments
}

DescribeGroupsResponseMember represents the membership information for a single group member.

type DescribeGroupsResponseMemberMetadata added in v0.4.9

type DescribeGroupsResponseMemberMetadata struct {
	// Version is the version of the metadata.
	Version int

	// Topics is the list of topics that the member is assigned to.
	Topics []string

	// UserData is the user data for the member.
	UserData []byte

	// OwnedPartitions contains the partitions owned by this group member; only set if
	// consumers are using a cooperative rebalancing assignor protocol.
	OwnedPartitions []DescribeGroupsResponseMemberMetadataOwnedPartition
}

DescribeGroupsResponseMemberMetadata stores metadata associated with a group member.

type DescribeGroupsResponseMemberMetadataOwnedPartition added in v0.4.21

type DescribeGroupsResponseMemberMetadataOwnedPartition struct {
	// Topic is the name of the topic.
	Topic string

	// Partitions is the partitions that are owned by the group in the topic.
	Partitions []int
}

type DescribeUserScramCredentialsCredentialInfo added in v0.4.43

type DescribeUserScramCredentialsCredentialInfo struct {
	Mechanism  ScramMechanism
	Iterations int
}

type DescribeUserScramCredentialsRequest added in v0.4.43

type DescribeUserScramCredentialsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// List of Scram users to describe
	Users []UserScramCredentialsUser
}

DescribeUserScramCredentialsRequest represents a request sent to a kafka broker to describe user scram credentials.

type DescribeUserScramCredentialsResponse added in v0.4.43

type DescribeUserScramCredentialsResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Top level error that occurred while attempting to describe
	// the user scram credentials.
	//
	// The errors contain the kafka error code. Programs may use the standard
	// errors.Is function to test the error against kafka error codes.
	Error error

	// List of described user scram credentials.
	Results []DescribeUserScramCredentialsResponseResult
}

DescribeUserScramCredentialsResponse represents a response from a kafka broker to a describe user credentials request.

type DescribeUserScramCredentialsResponseResult added in v0.4.43

type DescribeUserScramCredentialsResponseResult struct {
	User            string
	CredentialInfos []DescribeUserScramCredentialsCredentialInfo
	Error           error
}

type Dialer

type Dialer struct {
	// Unique identifier for client connections established by this Dialer.
	ClientID string

	// Optionally specifies the function that the dialer uses to establish
	// network connections. If nil, net.(*Dialer).DialContext is used instead.
	//
	// When DialFunc is set, LocalAddr, DualStack, FallbackDelay, and KeepAlive
	// are ignored.
	DialFunc func(ctx context.Context, network string, address string) (net.Conn, error)

	// Timeout is the maximum amount of time a dial will wait for a connect to
	// complete. If Deadline is also set, it may fail earlier.
	//
	// The default is no timeout.
	//
	// When dialing a name with multiple IP addresses, the timeout may be
	// divided between them.
	//
	// With or without a timeout, the operating system may impose its own
	// earlier timeout. For instance, TCP timeouts are often around 3 minutes.
	Timeout time.Duration

	// Deadline is the absolute point in time after which dials will fail.
	// If Timeout is set, it may fail earlier.
	// Zero means no deadline, or dependent on the operating system as with the
	// Timeout option.
	Deadline time.Time

	// LocalAddr is the local address to use when dialing an address.
	// The address must be of a compatible type for the network being dialed.
	// If nil, a local address is automatically chosen.
	LocalAddr net.Addr

	// DualStack enables RFC 6555-compliant "Happy Eyeballs" dialing when the
	// network is "tcp" and the destination is a host name with both IPv4 and
	// IPv6 addresses. This allows a client to tolerate networks where one
	// address family is silently broken.
	DualStack bool

	// FallbackDelay specifies the length of time to wait before spawning a
	// fallback connection, when DualStack is enabled.
	// If zero, a default delay of 300ms is used.
	FallbackDelay time.Duration

	// KeepAlive specifies the keep-alive period for an active network
	// connection.
	// If zero, keep-alives are not enabled. Network protocols that do not
	// support keep-alives ignore this field.
	KeepAlive time.Duration

	// Resolver optionally gives a hook to convert the broker address into an
	// alternate host or IP address which is useful for custom service discovery.
	// If a custom resolver returns any possible hosts, the first one will be
	// used and the original discarded. If a port number is included with the
	// resolved host, it will only be used if a port number was not previously
	// specified. If no port is specified or resolved, the default of 9092 will be
	// used.
	Resolver Resolver

	// TLS enables Dialer to open secure connections.  If nil, standard net.Conn
	// will be used.
	TLS *tls.Config

	// SASLMechanism configures the Dialer to use SASL authentication.  If nil,
	// no authentication will be performed.
	SASLMechanism sasl.Mechanism

	// The transactional id to use for transactional delivery. Idempotent
	// delivery should be enabled if transactional id is configured.
	// For more details look at transactional.id description here: http://kafka.apache.org/documentation.html#producerconfigs
	// Empty string means that the connection will be non-transactional.
	TransactionalID string
}

The Dialer type mirrors the net.Dialer API but is designed to open kafka connections instead of raw network connections.
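The port-selection rule in the Resolver field comment can be sketched as follows. This is an illustration of the documented rule only, not the library's implementation: applyResolved and splitOptionalPort are hypothetical helpers, and the sketch assumes plain "host" or "host:port" inputs.

```go
package main

import (
	"fmt"
	"net"
)

// defaultPort is the fallback port named in the Resolver documentation.
const defaultPort = "9092"

// splitOptionalPort splits "host:port". When the address carries no port,
// the whole input is treated as the host and the port is empty.
func splitOptionalPort(addr string) (host, port string) {
	h, p, err := net.SplitHostPort(addr)
	if err != nil {
		return addr, ""
	}
	return h, p
}

// applyResolved (hypothetical helper) combines the originally configured
// address with the first host returned by a resolver:
//   - the resolved host replaces the original host;
//   - the resolved port is used only if the original had no port;
//   - if neither has a port, 9092 is used.
func applyResolved(original, resolved string) string {
	_, origPort := splitOptionalPort(original)
	host, resolvedPort := splitOptionalPort(resolved)

	port := origPort
	if port == "" {
		port = resolvedPort
	}
	if port == "" {
		port = defaultPort
	}
	return net.JoinHostPort(host, port)
}

func main() {
	fmt.Println(applyResolved("kafka:9093", "10.0.0.1:9999")) // original port wins
	fmt.Println(applyResolved("kafka", "10.0.0.1:9999"))      // resolved port is used
	fmt.Println(applyResolved("kafka", "10.0.0.1"))           // default port is used
}
```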

func (*Dialer) Dial

func (d *Dialer) Dial(network string, address string) (*Conn, error)

Dial connects to the address on the named network.

func (*Dialer) DialContext

func (d *Dialer) DialContext(ctx context.Context, network string, address string) (*Conn, error)

DialContext connects to the address on the named network using the provided context.

The provided Context must be non-nil. If the context expires before the connection is complete, an error is returned. Once successfully connected, any expiration of the context will not affect the connection.

When using TCP, and the host in the address parameter resolves to multiple network addresses, any dial timeout (from d.Timeout or ctx) is spread over each consecutive dial, such that each is given an appropriate fraction of the time to connect. For example, if a host has 4 IP addresses and the timeout is 1 minute, the connect to each single address will be given 15 seconds to complete before trying the next one.

func (*Dialer) DialLeader

func (d *Dialer) DialLeader(ctx context.Context, network string, address string, topic string, partition int) (*Conn, error)

DialLeader opens a connection to the leader of the partition for a given topic.

The address given to the DialLeader method may not be the one that the connection will end up being established to, because the dialer will look up the partition leader for the topic and return a connection to that server. The original address is only used as a mechanism to discover the configuration of the kafka cluster that we're connecting to.

func (*Dialer) DialPartition added in v0.2.1

func (d *Dialer) DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error)

DialPartition opens a connection to the leader of the partition specified by the partition descriptor. It's strongly advised to use a partition descriptor returned by LookupPartition or LookupPartitions.

func (*Dialer) LookupLeader

func (d *Dialer) LookupLeader(ctx context.Context, network string, address string, topic string, partition int) (Broker, error)

LookupLeader searches for the kafka broker that is the leader of the partition for a given topic, returning a Broker value representing it.

func (*Dialer) LookupPartition added in v0.2.1

func (d *Dialer) LookupPartition(ctx context.Context, network string, address string, topic string, partition int) (Partition, error)

LookupPartition searches for the description of the specified partition ID.

func (*Dialer) LookupPartitions

func (d *Dialer) LookupPartitions(ctx context.Context, network string, address string, topic string) ([]Partition, error)

LookupPartitions returns the list of partitions that exist for the given topic.

type DurationStats

type DurationStats struct {
	Avg   time.Duration `metric:"avg" type:"gauge"`
	Min   time.Duration `metric:"min" type:"gauge"`
	Max   time.Duration `metric:"max" type:"gauge"`
	Count int64         `metric:"count" type:"counter"`
	Sum   time.Duration `metric:"sum" type:"counter"`
}

DurationStats is a data structure that carries a summary of observed duration values.

type ElectLeadersRequest added in v0.4.9

type ElectLeadersRequest struct {
	// Addr is the address of the kafka broker to send the request to.
	Addr net.Addr

	// Topic is the name of the topic to do the leader elections in.
	Topic string

	// Partitions is the list of partitions to run leader elections for.
	Partitions []int

	// Timeout is the amount of time to wait for the election to run.
	Timeout time.Duration
}

ElectLeadersRequest is a request to the ElectLeaders API.

type ElectLeadersResponse added in v0.4.9

type ElectLeadersResponse struct {
	// Error is set to a non-nil value if a top-level error occurred.
	Error error

	// PartitionResults contains the results for each partition leader election.
	PartitionResults []ElectLeadersResponsePartitionResult
}

ElectLeadersResponse is a response from the ElectLeaders API.

type ElectLeadersResponsePartitionResult added in v0.4.9

type ElectLeadersResponsePartitionResult struct {
	// Partition is the ID of the partition.
	Partition int

	// Error is set to a non-nil value if an error occurred electing leaders
	// for this partition.
	Error error
}

ElectLeadersResponsePartitionResult contains the response details for a single partition.

type EndTxnRequest added in v0.4.19

type EndTxnRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// The transactional id key.
	TransactionalID string

	// The Producer ID (PID) for the current producer session
	ProducerID int

	// The epoch associated with the current producer session for the given PID
	ProducerEpoch int

	// Committed should be set to true if the transaction was committed, false otherwise.
	Committed bool
}

EndTxnRequest represents a request sent to a kafka broker to end a transaction.

type EndTxnResponse added in v0.4.19

type EndTxnResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Error is non-nil if an error occurred and contains the kafka error code.
	// Programs may use the standard errors.Is function to test the error
	// against kafka error codes.
	Error error
}

EndTxnResponse represents a response from a kafka broker to an end transaction request.

type Error

type Error int

Error represents the different error codes that may be returned by kafka.
https://kafka.apache.org/protocol#protocol_error_codes

const (
	Unknown                            Error = -1
	OffsetOutOfRange                   Error = 1
	InvalidMessage                     Error = 2
	UnknownTopicOrPartition            Error = 3
	InvalidMessageSize                 Error = 4
	LeaderNotAvailable                 Error = 5
	NotLeaderForPartition              Error = 6
	RequestTimedOut                    Error = 7
	BrokerNotAvailable                 Error = 8
	ReplicaNotAvailable                Error = 9
	MessageSizeTooLarge                Error = 10
	StaleControllerEpoch               Error = 11
	OffsetMetadataTooLarge             Error = 12
	NetworkException                   Error = 13
	GroupLoadInProgress                Error = 14
	GroupCoordinatorNotAvailable       Error = 15
	NotCoordinatorForGroup             Error = 16
	InvalidTopic                       Error = 17
	RecordListTooLarge                 Error = 18
	NotEnoughReplicas                  Error = 19
	NotEnoughReplicasAfterAppend       Error = 20
	InvalidRequiredAcks                Error = 21
	IllegalGeneration                  Error = 22
	InconsistentGroupProtocol          Error = 23
	InvalidGroupId                     Error = 24
	UnknownMemberId                    Error = 25
	InvalidSessionTimeout              Error = 26
	RebalanceInProgress                Error = 27
	InvalidCommitOffsetSize            Error = 28
	TopicAuthorizationFailed           Error = 29
	GroupAuthorizationFailed           Error = 30
	ClusterAuthorizationFailed         Error = 31
	InvalidTimestamp                   Error = 32
	UnsupportedSASLMechanism           Error = 33
	IllegalSASLState                   Error = 34
	UnsupportedVersion                 Error = 35
	TopicAlreadyExists                 Error = 36
	InvalidPartitionNumber             Error = 37
	InvalidReplicationFactor           Error = 38
	InvalidReplicaAssignment           Error = 39
	InvalidConfiguration               Error = 40
	NotController                      Error = 41
	InvalidRequest                     Error = 42
	UnsupportedForMessageFormat        Error = 43
	PolicyViolation                    Error = 44
	OutOfOrderSequenceNumber           Error = 45
	DuplicateSequenceNumber            Error = 46
	InvalidProducerEpoch               Error = 47
	InvalidTransactionState            Error = 48
	InvalidProducerIDMapping           Error = 49
	InvalidTransactionTimeout          Error = 50
	ConcurrentTransactions             Error = 51
	TransactionCoordinatorFenced       Error = 52
	TransactionalIDAuthorizationFailed Error = 53
	SecurityDisabled                   Error = 54
	BrokerAuthorizationFailed          Error = 55
	KafkaStorageError                  Error = 56
	LogDirNotFound                     Error = 57
	SASLAuthenticationFailed           Error = 58
	UnknownProducerId                  Error = 59
	ReassignmentInProgress             Error = 60
	DelegationTokenAuthDisabled        Error = 61
	DelegationTokenNotFound            Error = 62
	DelegationTokenOwnerMismatch       Error = 63
	DelegationTokenRequestNotAllowed   Error = 64
	DelegationTokenAuthorizationFailed Error = 65
	DelegationTokenExpired             Error = 66
	InvalidPrincipalType               Error = 67
	NonEmptyGroup                      Error = 68
	GroupIdNotFound                    Error = 69
	FetchSessionIDNotFound             Error = 70
	InvalidFetchSessionEpoch           Error = 71
	ListenerNotFound                   Error = 72
	TopicDeletionDisabled              Error = 73
	FencedLeaderEpoch                  Error = 74
	UnknownLeaderEpoch                 Error = 75
	UnsupportedCompressionType         Error = 76
	StaleBrokerEpoch                   Error = 77
	OffsetNotAvailable                 Error = 78
	MemberIDRequired                   Error = 79
	PreferredLeaderNotAvailable        Error = 80
	GroupMaxSizeReached                Error = 81
	FencedInstanceID                   Error = 82
	EligibleLeadersNotAvailable        Error = 83
	ElectionNotNeeded                  Error = 84
	NoReassignmentInProgress           Error = 85
	GroupSubscribedToTopic             Error = 86
	InvalidRecord                      Error = 87
	UnstableOffsetCommit               Error = 88
	ThrottlingQuotaExceeded            Error = 89
	ProducerFenced                     Error = 90
	ResourceNotFound                   Error = 91
	DuplicateResource                  Error = 92
	UnacceptableCredential             Error = 93
	InconsistentVoterSet               Error = 94
	InvalidUpdateVersion               Error = 95
	FeatureUpdateFailed                Error = 96
	PrincipalDeserializationFailure    Error = 97
	SnapshotNotFound                   Error = 98
	PositionOutOfRange                 Error = 99
	UnknownTopicID                     Error = 100
	DuplicateBrokerRegistration        Error = 101
	BrokerIDNotRegistered              Error = 102
	InconsistentTopicID                Error = 103
	InconsistentClusterID              Error = 104
	TransactionalIDNotFound            Error = 105
	FetchSessionTopicIDError           Error = 106
)

func (Error) Description

func (e Error) Description() string

Description returns a human readable description of the cause of the error.

func (Error) Error

func (e Error) Error() string

Error satisfies the error interface.

func (Error) Temporary

func (e Error) Temporary() bool

Temporary returns true if the operation that generated the error may succeed if retried at a later time. Kafka error documentation specifies these as "retriable".
https://kafka.apache.org/protocol#protocol_error_codes

func (Error) Timeout

func (e Error) Timeout() bool

Timeout returns true if the error was due to a timeout.

func (Error) Title

func (e Error) Title() string

Title returns a human readable title for the error.

type FetchRequest added in v0.4.0

type FetchRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// Topic, partition, and offset to retrieve records from. The offset may be
	// one of the special FirstOffset or LastOffset constants, in which case the
	// request will automatically discover the first or last offset of the
	// partition and submit the request for these.
	Topic     string
	Partition int
	Offset    int64

	// Size and time limits of the response returned by the broker.
	MinBytes int64
	MaxBytes int64
	MaxWait  time.Duration

	// The isolation level for the request.
	//
	// Defaults to ReadUncommitted.
	//
	// This field requires the kafka broker to support the Fetch API in version
	// 4 or above (otherwise the value is ignored).
	IsolationLevel IsolationLevel
}

FetchRequest represents a request sent to a kafka broker to retrieve records from a topic partition.

type FetchResponse added in v0.4.0

type FetchResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// The topic and partition that the response came for (will match the values
	// in the request).
	Topic     string
	Partition int

	// Information about the topic partition layout returned from the broker.
	//
	// LastStableOffset requires the kafka broker to support the Fetch API in
	// version 4 or above (otherwise the value is zero).
	//
	// LogStartOffset requires the kafka broker to support the Fetch API in
	// version 5 or above (otherwise the value is zero).
	HighWatermark    int64
	LastStableOffset int64
	LogStartOffset   int64

	// An error that may have occurred while attempting to fetch the records.
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker. Programs may use the standard errors.Is
	// function to test the error against kafka error codes.
	Error error

	// The set of records returned in the response.
	//
	// The program is expected to call the RecordSet's Close method when it
	// has finished reading the records.
	//
	// Note that kafka may return record batches that start at an offset before
	// the one that was requested. It is the program's responsibility to skip
	// the offsets that it is not interested in.
	Records RecordReader
}

FetchResponse represents a response from a kafka broker to a fetch request.
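The note on Records says a batch may begin before the requested offset and that the program must skip the extra records. A minimal sketch of that filtering step follows. The record type and skipBefore function are hypothetical and stand in for whatever the program reads from the response; the sketch assumes records arrive in ascending offset order.

```go
package main

import "fmt"

// record is a hypothetical, simplified view of a fetched record.
type record struct {
	Offset int64
	Value  string
}

// skipBefore (hypothetical helper) drops records whose offset is lower than
// the offset that was requested. It assumes batch is sorted by offset.
func skipBefore(batch []record, requested int64) []record {
	for i, r := range batch {
		if r.Offset >= requested {
			return batch[i:]
		}
	}
	return nil
}

func main() {
	// The broker returned a batch starting at offset 40 although 42 was requested.
	batch := []record{{40, "a"}, {41, "b"}, {42, "c"}, {43, "d"}}
	for _, r := range skipBefore(batch, 42) {
		fmt.Println(r.Offset, r.Value)
	}
}
```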

type FindCoordinatorRequest added in v0.4.10

type FindCoordinatorRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// The coordinator key.
	Key string

	// The coordinator key type. (Group, transaction, etc.)
	KeyType CoordinatorKeyType
}

FindCoordinatorRequest is the request structure for the FindCoordinator function.

type FindCoordinatorResponse added in v0.4.10

type FindCoordinatorResponse struct {
	// The Transaction/Group Coordinator details
	Coordinator *FindCoordinatorResponseCoordinator

	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// An error that may have occurred while attempting to retrieve Coordinator
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker.
	Error error
}

FindCoordinatorResponse is the response structure for the FindCoordinator function.

type FindCoordinatorResponseCoordinator added in v0.4.10

type FindCoordinatorResponseCoordinator struct {
	// NodeID holds the broker id.
	NodeID int

	// Host of the broker
	Host string

	// Port on which broker accepts requests
	Port int
}

FindCoordinatorResponseCoordinator contains details about the found coordinator.

typeGenerationadded inv0.3.1

type Generation struct {// ID is the generation ID as assigned by the consumer group coordinator.IDint32// GroupID is the name of the consumer group.GroupIDstring// MemberID is the ID assigned to this consumer by the consumer group// coordinator.MemberIDstring// Assignments is the initial state of this Generation.  The partition// assignments are grouped by topic.Assignments map[string][]PartitionAssignment// contains filtered or unexported fields}

Generation represents a single consumer group generation. The generationcarries the topic+partition assignments for the given. It also providesfacilities for committing offsets and for running functions whose lifecyclesare bound to the generation.

func (*Generation)CommitOffsetsadded inv0.3.1

func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error

CommitOffsets commits the provided topic+partition+offset combos to the consumer group coordinator. This can be used to reset the consumer to explicit offsets.

Example (OverwriteOffsets)
group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
	ID:      "my-group",
	Brokers: []string{"kafka:9092"},
	Topics:  []string{"my-topic"},
})
if err != nil {
	fmt.Printf("error creating consumer group: %+v\n", err)
	os.Exit(1)
}
defer group.Close()

gen, err := group.Next(context.TODO())
if err != nil {
	fmt.Printf("error getting next generation: %+v\n", err)
	os.Exit(1)
}
err = gen.CommitOffsets(map[string]map[int]int64{
	"my-topic": {
		0: 123,
		1: 456,
		3: 789,
	},
})
if err != nil {
	fmt.Printf("error committing offsets: %+v\n", err)
	os.Exit(1)
}
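As a rough sketch of how the nested `map[string]map[int]int64` argument might be assembled, the helper below collects the next offset to read for every topic and partition that a program has processed. The `message` type and `nextOffsets` function are hypothetical and exist only for this illustration. The sketch assumes that the value to commit is the offset following the last processed message, which is the convention the ConsumerGroupParallelReaders example in this document uses (`offset + 1`).

```go
package main

import "fmt"

// message is a hypothetical stand-in carrying only the fields needed here.
type message struct {
	Topic     string
	Partition int
	Offset    int64
}

// nextOffsets returns, per topic and partition, the offset that follows the
// highest processed offset. The result has the same shape as the argument
// of CommitOffsets: topic => partition => offset.
func nextOffsets(processed []message) map[string]map[int]int64 {
	out := make(map[string]map[int]int64)
	for _, m := range processed {
		if out[m.Topic] == nil {
			out[m.Topic] = make(map[int]int64)
		}
		if next := m.Offset + 1; next > out[m.Topic][m.Partition] {
			out[m.Topic][m.Partition] = next
		}
	}
	return out
}

func main() {
	processed := []message{
		{Topic: "my-topic", Partition: 0, Offset: 122},
		{Topic: "my-topic", Partition: 0, Offset: 123},
		{Topic: "my-topic", Partition: 1, Offset: 456},
	}
	// The resulting map could be passed to CommitOffsets as is.
	fmt.Println(nextOffsets(processed))
}
```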

func (*Generation)Startadded inv0.3.1

func (g *Generation) Start(fn func(ctx context.Context))

Start launches the provided function in a goroutine and adds accounting such that when the function exits, it stops the current generation (if not already in the process of doing so).

The provided function MUST support cancellation via the ctx argument and exit in a timely manner once the ctx is complete. When the context is cancelled, the context's Err() method will return ErrGenerationEnded.

When closing out a generation, the consumer group will wait for all functions launched by Start to exit before the group can move on and join the next generation. If the function does not exit promptly, it will stop forward progress for this consumer and potentially cause consumer group membership churn.

Example (ConsumerGroupParallelReaders)
group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
	ID:      "my-group",
	Brokers: []string{"kafka:9092"},
	Topics:  []string{"my-topic"},
})
if err != nil {
	fmt.Printf("error creating consumer group: %+v\n", err)
	os.Exit(1)
}
defer group.Close()

for {
	gen, err := group.Next(context.TODO())
	if err != nil {
		break
	}

	assignments := gen.Assignments["my-topic"]
	for _, assignment := range assignments {
		partition, offset := assignment.ID, assignment.Offset
		gen.Start(func(ctx context.Context) {
			// create reader for this partition.
			reader := kafka.NewReader(kafka.ReaderConfig{
				Brokers:   []string{"127.0.0.1:9092"},
				Topic:     "my-topic",
				Partition: partition,
			})
			defer reader.Close()

			// seek to the last committed offset for this partition.
			reader.SetOffset(offset)
			for {
				msg, err := reader.ReadMessage(ctx)
				if err != nil {
					if errors.Is(err, kafka.ErrGenerationEnded) {
						// generation has ended.  commit offsets.  in a real app,
						// offsets would be committed periodically.
						gen.CommitOffsets(map[string]map[int]int64{"my-topic": {partition: offset + 1}})
						return
					}

					fmt.Printf("error reading message: %+v\n", err)
					return
				}

				fmt.Printf("received message %s/%d/%d : %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
				offset = msg.Offset
			}
		})
	}
}

typeGroupBalanceradded inv0.2.0

type GroupBalancer interface {
	// ProtocolName of the GroupBalancer
	ProtocolName() string

	// UserData provides the GroupBalancer an opportunity to embed custom
	// UserData into the metadata.
	//
	// Will be used by JoinGroup to begin the consumer group handshake.
	//
	// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-JoinGroupRequest
	UserData() ([]byte, error)

	// AssignGroups returns which members will be consuming
	// which topic partitions
	AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments
}

GroupBalancer encapsulates the client side rebalancing logic.

typeGroupMemberadded inv0.2.0

type GroupMember struct {// ID is the unique ID for this member as taken from the JoinGroup response.IDstring// Topics is a list of topics that this member is consuming.Topics []string// UserData contains any information that the GroupBalancer sent to the// consumer group coordinator.UserData []byte}

GroupMember describes a single participant in a consumer group.

typeGroupMemberAssignmentsadded inv0.2.0

type GroupMemberAssignments map[string]map[string][]int

GroupMemberAssignments holds MemberID => topic => partitions.

typeGroupMemberTopicadded inv0.4.9

type GroupMemberTopic struct {// Topic is the name of the topic.Topicstring// Partitions is a slice of partition IDs that this member is assigned to in the topic.Partitions []int}

GroupMemberTopic is a mapping from a topic to a list of partitions in the topic. It is used to represent the topic partitions that have been assigned to a group member.

typeGroupProtocoladded inv0.4.33

type GroupProtocol struct {// The protocol name.Namestring// The protocol metadata.MetadataGroupProtocolSubscription}

GroupProtocol represents a consumer group protocol.

typeGroupProtocolAssignmentadded inv0.4.33

type GroupProtocolAssignment struct {
	// The topics and partitions assigned to the group member.
	AssignedPartitions map[string][]int

	// UserData for the assignment.
	UserData []byte
}

GroupProtocolAssignment represents an assignment of topics and partitions for a group member.

typeGroupProtocolSubscriptionadded inv0.4.33

type GroupProtocolSubscription struct {
	// The Topics to subscribe to.
	Topics []string

	// UserData associated with the subscription for the given protocol
	UserData []byte

	// Partitions owned by this consumer.
	OwnedPartitions map[string][]int
}

typeHash

type Hash struct {
	Hasher hash.Hash32
	// contains filtered or unexported fields
}

Hash is a Balancer that uses the provided hash function to determine which partition to route messages to. This ensures that messages with the same key are routed to the same partition.

The logic to calculate the partition is:

hasher.Sum32() % len(partitions) => partition

By default, Hash uses the FNV-1a algorithm. This is the same algorithm used by the Sarama Producer and ensures that messages produced by kafka-go will be delivered to the same partitions that the Sarama producer would deliver them to.
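The formula above can be tried out with the standard library's FNV-1a implementation. This is a sketch of the documented formula only: the `pick` function is hypothetical, and the package's own arithmetic (for example how it treats the hash as a signed or unsigned value) may differ, so the partition chosen here is not guaranteed to match the one chosen by Hash.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// pick applies "hasher.Sum32() % len(partitions)" using FNV-1a.
// Callers must pass at least one partition.
func pick(key []byte, partitions ...int) int {
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	return partitions[h.Sum32()%uint32(len(partitions))]
}

func main() {
	partitions := []int{0, 1, 2}
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		// Equal keys always land on the same partition.
		fmt.Printf("%s -> partition %d\n", key, pick([]byte(key), partitions...))
	}
}
```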

func (*Hash)Balance

func (h *Hash) Balance(msg Message, partitions ...int) int

typeHeaderadded inv0.2.3

type Header = protocol.Header

Header is a key/value pair type representing headers set on records.

typeHeartbeatRequestadded inv0.4.19

type HeartbeatRequest struct {// Address of the kafka broker to send the request to.Addrnet.Addr// GroupID is the ID of the group.GroupIDstring// GenerationID is the current generation for the group.GenerationIDint32// MemberID is the ID of the group member.MemberIDstring// GroupInstanceID is a unique identifier for the consumer.GroupInstanceIDstring}

HeartbeatRequest represents a heartbeat sent to kafka to indicate consumer liveness.

typeHeartbeatResponseadded inv0.4.19

type HeartbeatResponse struct {// Error is set to non-nil if an error occurred.Errorerror// The amount of time that the broker throttled the request.//// This field will be zero if the kafka broker did not support the// Heartbeat API in version 1 or above.Throttletime.Duration}

HeartbeatResponse represents a response from a heartbeat request.

typeIncrementalAlterConfigsRequestadded inv0.4.9

type IncrementalAlterConfigsRequest struct {// Addr is the address of the kafka broker to send the request to.Addrnet.Addr// Resources contains the list of resources to update configs for.Resources []IncrementalAlterConfigsRequestResource// ValidateOnly indicates whether Kafka should validate the changes without actually// applying them.ValidateOnlybool}

IncrementalAlterConfigsRequest is a request to the IncrementalAlterConfigs API.

typeIncrementalAlterConfigsRequestConfigadded inv0.4.9

type IncrementalAlterConfigsRequestConfig struct {// Name is the name of the config.Namestring// Value is the value to set for this config.Valuestring// ConfigOperation indicates how this config should be updated (e.g., add, delete, etc.).ConfigOperationConfigOperation}

IncrementalAlterConfigsRequestConfig describes a single config key/value pair that should be altered.

typeIncrementalAlterConfigsRequestResourceadded inv0.4.9

type IncrementalAlterConfigsRequestResource struct {// ResourceType is the type of resource to update.ResourceTypeResourceType// ResourceName is the name of the resource to update (i.e., topic name or broker ID).ResourceNamestring// Configs contains the list of config key/values to update.Configs []IncrementalAlterConfigsRequestConfig}

IncrementalAlterConfigsRequestResource contains the details of a single resource type whose configs should be altered.

typeIncrementalAlterConfigsResponseadded inv0.4.9

type IncrementalAlterConfigsResponse struct {// Resources contains details of each resource config that was updated.Resources []IncrementalAlterConfigsResponseResource}

IncrementalAlterConfigsResponse is a response from the IncrementalAlterConfigs API.

typeIncrementalAlterConfigsResponseResourceadded inv0.4.9

type IncrementalAlterConfigsResponseResource struct {// Error is set to a non-nil value if an error occurred while updating this specific// config.Errorerror// ResourceType is the type of resource that was updated.ResourceTypeResourceType// ResourceName is the name of the resource that was updated.ResourceNamestring}

IncrementalAlterConfigsResponseResource contains the response details for a single resource whose configs were updated.

typeInitProducerIDRequestadded inv0.4.11

type InitProducerIDRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// The transactional id key.
	TransactionalID string

	// Time after which a transaction should time out
	TransactionTimeoutMs int

	// The Producer ID (PID).
	// This is used to disambiguate requests if a transactional id is reused following its expiration.
	// Only supported in version >=3 of the request, will be ignored otherwise.
	ProducerID int

	// The producer's current epoch.
	// This will be checked against the producer epoch on the broker,
	// and the request will return an error if they do not match.
	// Only supported in version >=3 of the request, will be ignored otherwise.
	ProducerEpoch int
}

InitProducerIDRequest is the request structure for the InitProducerId function.

typeInitProducerIDResponseadded inv0.4.11

type InitProducerIDResponse struct {
	// The producer session details
	Producer *ProducerSession

	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// An error that may have occurred while attempting to retrieve initProducerId
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker.
	Error error
}

InitProducerIDResponse is the response structure for the InitProducerId function.

typeIsolationLeveladded inv0.2.3

type IsolationLevel int8

const (
	ReadUncommitted IsolationLevel = 0
	ReadCommitted   IsolationLevel = 1
)

typeJoinGroupRequestadded inv0.4.33

type JoinGroupRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// GroupID of the group to join.
	GroupID string

	// The duration after which the coordinator considers the consumer dead
	// if it has not received a heartbeat.
	SessionTimeout time.Duration

	// The duration the coordinator will wait for each member to rejoin when rebalancing the group.
	RebalanceTimeout time.Duration

	// The ID assigned by the group coordinator.
	MemberID string

	// The unique identifier for the consumer instance.
	GroupInstanceID string

	// The name for the class of protocols implemented by the group being joined.
	ProtocolType string

	// The list of protocols the member supports.
	Protocols []GroupProtocol
}

JoinGroupRequest is the request structure for the JoinGroup function.

typeJoinGroupResponseadded inv0.4.33

type JoinGroupResponse struct {
	// An error that may have occurred when attempting to join the group.
	//
	// The errors contain the kafka error code. Programs may use the standard
	// errors.Is function to test the error against kafka error codes.
	Error error

	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// The generation ID of the group.
	GenerationID int

	// The group protocol selected by the coordinator.
	ProtocolName string

	// The group protocol type.
	ProtocolType string

	// The leader of the group.
	LeaderID string

	// The group member ID.
	MemberID string

	// The members of the group.
	Members []JoinGroupResponseMember
}

JoinGroupResponse is the response structure for the JoinGroup function.

typeJoinGroupResponseMemberadded inv0.4.33

type JoinGroupResponseMember struct {
	// The group member ID.
	ID string

	// The unique identifier of the consumer instance.
	GroupInstanceID string

	// The group member metadata.
	Metadata GroupProtocolSubscription
}

JoinGroupResponseMember represents a group member in a response to a JoinGroup request.

typeLeastBytes

type LeastBytes struct {// contains filtered or unexported fields}

LeastBytes is a Balancer implementation that routes messages to the partition that has received the least amount of data.

Note that no coordination is done between multiple producers; good balancing relies on each producer that uses a LeastBytes balancer producing well-balanced messages.
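The routing idea can be sketched as follows. The `leastBytes` type below is a hypothetical, simplified stand-in written for this illustration: it keeps a per-partition byte counter local to one producer and always picks the partition with the smallest counter. It does not reproduce the internals of the package's LeastBytes type.

```go
package main

import "fmt"

// leastBytes tracks how many bytes this producer has routed to each
// partition. The counters are local, so other producers are not seen.
type leastBytes struct {
	written map[int]int
}

// balance returns the partition with the fewest recorded bytes (the first
// one wins on ties) and charges the message size to it.
// Callers must pass at least one partition.
func (lb *leastBytes) balance(size int, partitions ...int) int {
	if lb.written == nil {
		lb.written = make(map[int]int)
	}
	best := partitions[0]
	for _, p := range partitions[1:] {
		if lb.written[p] < lb.written[best] {
			best = p
		}
	}
	lb.written[best] += size
	return best
}

func main() {
	lb := &leastBytes{}
	for _, size := range []int{100, 10, 10, 5} {
		fmt.Printf("%d bytes -> partition %d\n", size, lb.balance(size, 0, 1, 2))
	}
}
```

Because the counters live in a single producer, two producers that each send skewed traffic can still overload the same partition, which is the limitation the note above describes.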

func (*LeastBytes)Balance

func (lb *LeastBytes) Balance(msg Message, partitions ...int) int

Balance satisfies the Balancer interface.

typeLeaveGroupRequestadded inv0.4.33

type LeaveGroupRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// GroupID of the group to leave.
	GroupID string

	// List of leaving member identities.
	Members []LeaveGroupRequestMember
}

LeaveGroupRequest is the request structure for the LeaveGroup function.

typeLeaveGroupRequestMemberadded inv0.4.33

type LeaveGroupRequestMember struct {// The member ID to remove from the group.IDstring// The group instance ID to remove from the group.GroupInstanceIDstring}

LeaveGroupRequestMember represents the identity of a member leaving a group.

typeLeaveGroupResponseadded inv0.4.33

type LeaveGroupResponse struct {// An error that may have occurred when attempting to leave the group.//// The errors contain the kafka error code. Programs may use the standard// errors.Is function to test the error against kafka error codes.Errorerror// The amount of time that the broker throttled the request.Throttletime.Duration// List of leaving member responses.Members []LeaveGroupResponseMember}

LeaveGroupResponse is the response structure for the LeaveGroup function.

typeLeaveGroupResponseMemberadded inv0.4.33

type LeaveGroupResponseMember struct {
	// The member ID of the member leaving the group.
	ID string

	// The group instance ID to remove from the group.
	GroupInstanceID string

	// An error that may have occurred when attempting to remove the member from the group.
	//
	// The errors contain the kafka error code. Programs may use the standard
	// errors.Is function to test the error against kafka error codes.
	Error error
}

LeaveGroupResponseMember represents a member leaving the group.

typeListGroupsRequestadded inv0.4.9

type ListGroupsRequest struct {// Addr is the address of the kafka broker to send the request to.Addrnet.Addr}

ListGroupsRequest is a request to the ListGroups API.

typeListGroupsResponseadded inv0.4.9

type ListGroupsResponse struct {// Error is set to a non-nil value if a top-level error occurred while fetching// groups.Errorerror// Groups contains the list of groups.Groups []ListGroupsResponseGroup}

ListGroupsResponse is a response from the ListGroups API.

typeListGroupsResponseGroupadded inv0.4.9

type ListGroupsResponseGroup struct {// GroupID is the ID of the group.GroupIDstring// Coordinator is the ID of the coordinator broker for the group.Coordinatorint// The group protocol type (eg "consumer", "connect")ProtocolTypestring}

ListGroupsResponseGroup contains the response details for a single group.

typeListOffsetsRequestadded inv0.4.0

type ListOffsetsRequest struct {// Address of the kafka broker to send the request to.Addrnet.Addr// A mapping of topic names to list of partitions that the program wishes to// get the offsets for.Topics map[string][]OffsetRequest// The isolation level for the request.//// Defaults to ReadUncommitted.//// This field requires the kafka broker to support the ListOffsets API in// version 2 or above (otherwise the value is ignored).IsolationLevelIsolationLevel}

ListOffsetsRequest represents a request sent to a kafka broker to list the offsets of topic partitions.

typeListOffsetsResponseadded inv0.4.0

type ListOffsetsResponse struct {// The amount of time that the broker throttled the request.Throttletime.Duration// Mappings of topics names to partition offsets, there will be one entry// for each topic in the request.Topics map[string][]PartitionOffsets}

ListOffsetsResponse represents a response from a kafka broker to an offset listing request.

typeListPartitionReassignmentsRequestadded inv0.4.45

type ListPartitionReassignmentsRequest struct {// Address of the kafka broker to send the request to.Addrnet.Addr// Topics we want reassignments for, mapped by their name, or nil to list everything.Topics map[string]ListPartitionReassignmentsRequestTopic// Timeout is the amount of time to wait for the request to complete.Timeouttime.Duration}

ListPartitionReassignmentsRequest is a request to the ListPartitionReassignments API.

typeListPartitionReassignmentsRequestTopicadded inv0.4.45

type ListPartitionReassignmentsRequestTopic struct {// The partitions to list partition reassignments for.PartitionIndexes []int}

ListPartitionReassignmentsRequestTopic contains the requested partitions for a single topic.

typeListPartitionReassignmentsResponseadded inv0.4.45

type ListPartitionReassignmentsResponse struct {// Error is set to a non-nil value including the code and message if a top-level// error was encountered.Errorerror// Topics contains results for each topic, mapped by their name.Topics map[string]ListPartitionReassignmentsResponseTopic}

ListPartitionReassignmentsResponse is a response from the ListPartitionReassignments API.

typeListPartitionReassignmentsResponsePartitionadded inv0.4.45

type ListPartitionReassignmentsResponsePartition struct {// PartitionIndex contains index of the partition.PartitionIndexint// Replicas contains the current replica set.Replicas []int// AddingReplicas contains the set of replicas we are currently adding.AddingReplicas []int// RemovingReplicas contains the set of replicas we are currently removing.RemovingReplicas []int}

ListPartitionReassignmentsResponsePartition contains the detailed result of ongoing reassignments for a single partition.

typeListPartitionReassignmentsResponseTopicadded inv0.4.45

type ListPartitionReassignmentsResponseTopic struct {// Partitions contains result for topic partitions.Partitions []ListPartitionReassignmentsResponsePartition}

ListPartitionReassignmentsResponseTopic contains the detailed result of ongoing reassignments for a topic.

typeLoggeradded inv0.3.4

type Logger interface {Printf(string, ...interface{})}

Logger interface API for log.Logger.

typeLoggerFuncadded inv0.3.4

type LoggerFunc func(string, ...interface{})

LoggerFunc is a bridge between Logger and any third party logger. Usage:

l := NewLogger() // some logger
r := kafka.NewReader(kafka.ReaderConfig{
	Logger:      kafka.LoggerFunc(l.Infof),
	ErrorLogger: kafka.LoggerFunc(l.Errorf),
})
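The bridge works because a function type can carry methods. The sketch below re-declares `Logger` and `LoggerFunc` locally, copying the declarations shown in this document, so that it runs without the package. The body of `Printf` (forwarding to the wrapped function) is an assumption about how such an adapter is typically written, and `capture` is a hypothetical helper added for the demonstration.

```go
package main

import (
	"fmt"
	"log"
	"os"
)

// Logger and LoggerFunc are local copies of the declarations in this
// document, so the example does not depend on the kafka package.
type Logger interface {
	Printf(string, ...interface{})
}

type LoggerFunc func(string, ...interface{})

// Printf forwards the call to the wrapped function, which is what lets any
// printf-style function satisfy Logger.
func (f LoggerFunc) Printf(msg string, args ...interface{}) { f(msg, args...) }

// capture returns a LoggerFunc that appends formatted lines to dst.
func capture(dst *[]string) LoggerFunc {
	return func(format string, args ...interface{}) {
		*dst = append(*dst, fmt.Sprintf(format, args...))
	}
}

func main() {
	// Adapt a method of the standard library logger.
	std := log.New(os.Stdout, "kafka: ", 0)
	var l Logger = LoggerFunc(std.Printf)
	l.Printf("connected to %s", "kafka:9092")

	// Adapt a closure that collects log lines in memory.
	var lines []string
	l = capture(&lines)
	l.Printf("read %d messages", 3)
	fmt.Println(lines)
}
```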

func (LoggerFunc)Printfadded inv0.3.4

func (f LoggerFunc) Printf(msg string, args ...interface{})

typeMessage

type Message struct {
	// Topic indicates which topic this message was consumed from via Reader.
	//
	// When being used with Writer, this can be used to configure the topic if
	// not already specified on the writer itself.
	Topic string

	// Partition is read-only and MUST NOT be set when writing messages
	Partition     int
	Offset        int64
	HighWaterMark int64
	Key           []byte
	Value         []byte
	Headers       []Header

	// This field is used to hold arbitrary data you wish to include, so it
	// will be available when the message is handled by the Writer's
	// `Completion` method. This lets the application perform any
	// post-processing on each message.
	WriterData interface{}

	// If not set at the creation, Time will be automatically set when
	// writing the message.
	Time time.Time
}

Message is a data structure representing kafka messages.

typeMessageTooLargeErroradded inv0.2.5

type MessageTooLargeError struct {
	Message   Message
	Remaining []Message
}

MessageTooLargeError is returned when a message is too large to fit within the allowed size.

func (MessageTooLargeError)Erroradded inv0.2.5

func (MessageTooLargeError)Unwrapadded inv0.4.48

func (e MessageTooLargeError) Unwrap() error

typeMetadataRequestadded inv0.4.0

type MetadataRequest struct {// Address of the kafka broker to send the request to.Addrnet.Addr// The list of topics to retrieve metadata for.Topics []string}

MetadataRequest represents a request sent to a kafka broker to retrieve its cluster metadata.

typeMetadataResponseadded inv0.4.0

type MetadataResponse struct {// The amount of time that the broker throttled the request.Throttletime.Duration// Name of the kafka cluster that client retrieved metadata from.ClusterIDstring// The broker which is currently the controller for the cluster.ControllerBroker// The list of brokers registered to the cluster.Brokers []Broker// The list of topics available on the cluster.Topics []Topic}

MetadataResponse represents a response from a kafka broker to a metadata request.

typeMurmur2Balanceradded inv0.3.1

type Murmur2Balancer struct {
	Consistent bool
	// contains filtered or unexported fields
}

Murmur2Balancer is a Balancer that uses the Murmur2 hash function to determine which partition to route messages to. This ensures that messages with the same key are routed to the same partition. This balancer is compatible with the partitioner used by the Java library and by librdkafka's "murmur2" and "murmur2_random" partitioners.

With the Consistent field false (default), this partitioner is equivalent to the "murmur2_random" setting in librdkafka. When Consistent is true, this partitioner is equivalent to the "murmur2" setting. The latter will hash nil keys into the same partition. Empty, non-nil keys are always hashed to the same partition regardless of configuration.

Unless you are absolutely certain that all your messages will have keys, it's best to leave the Consistent flag off. Otherwise, you run the risk of creating a very hot partition.

Note that the librdkafka documentation states that the "murmur2_random" partitioner is functionally equivalent to the default Java partitioner. That's because the Java partitioner will use a round robin balancer instead of random on nil keys. We choose librdkafka's implementation because it arguably has a larger install base.

func (Murmur2Balancer)Balanceadded inv0.3.1

func (b Murmur2Balancer) Balance(msg Message, partitions ...int) (partition int)

typeOffsetCommitadded inv0.4.19

type OffsetCommit struct {
	Partition int
	Offset    int64
	Metadata  string
}

OffsetCommit represents the commit of an offset to a partition.

The extra metadata is opaque to the kafka protocol; it is intended to hold information like an identifier for the process that committed the offset, or the time at which the commit was made.

typeOffsetCommitPartitionadded inv0.4.19

type OffsetCommitPartition struct {// ID of the partition.Partitionint// An error that may have occurred while attempting to publish consumer// group offsets for this partition.//// The error contains both the kafka error code, and an error message// returned by the kafka broker. Programs may use the standard errors.Is// function to test the error against kafka error codes.Errorerror}

OffsetCommitPartition represents the state of a single partition in responses to committing offsets.

typeOffsetCommitRequestadded inv0.4.19

type OffsetCommitRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// ID of the consumer group to publish the offsets for.
	GroupID string

	// ID of the consumer group generation.
	GenerationID int

	// ID of the group member submitting the offsets.
	MemberID string

	// ID of the group instance.
	InstanceID string

	// Set of topic partitions to publish the offsets for.
	//
	// Note that offset commits need to be submitted to the broker acting as the
	// group coordinator. This will be automatically resolved by the transport.
	Topics map[string][]OffsetCommit
}

OffsetCommitRequest represents a request sent to a kafka broker to commit offsets for a partition.

typeOffsetCommitResponseadded inv0.4.19

type OffsetCommitResponse struct {// The amount of time that the broker throttled the request.Throttletime.Duration// Set of topic partitions that the kafka broker has accepted offset commits// for.Topics map[string][]OffsetCommitPartition}

OffsetCommitResponse represents a response from a kafka broker to an offset commit request.

typeOffsetDeleteadded inv0.4.36

type OffsetDelete struct {
	Topic     string
	Partition int
}

OffsetDelete deletes the offset for a consumer group on a particular topic for a particular partition.

typeOffsetDeletePartitionadded inv0.4.36

type OffsetDeletePartition struct {// ID of the partition.Partitionint// An error that may have occurred while attempting to delete an offset for// this partition.Errorerror}

OffsetDeletePartition represents the status of a partition in response to deleting offsets.

typeOffsetDeleteRequestadded inv0.4.36

type OffsetDeleteRequest struct {// Address of the kafka broker to send the request to.Addrnet.Addr// ID of the consumer group to delete the offsets for.GroupIDstring// Set of topic partitions to delete offsets for.Topics map[string][]int}

OffsetDeleteRequest represents a request sent to a kafka broker to delete the offsets for a partition on a given topic associated with a consumer group.

typeOffsetDeleteResponseadded inv0.4.36

type OffsetDeleteResponse struct {// An error that may have occurred while attempting to delete an offsetErrorerror// The amount of time that the broker throttled the request.Throttletime.Duration// Set of topic partitions that the kafka broker has additional info (error?)// for.Topics map[string][]OffsetDeletePartition}

OffsetDeleteResponse represents a response from a kafka broker to a delete offset request.

typeOffsetFetchPartitionadded inv0.4.0

type OffsetFetchPartition struct {// ID of the partition.Partitionint// Last committed offsets on the partition when the request was served by// the kafka broker.CommittedOffsetint64// Consumer group metadata for this partition.Metadatastring// An error that may have occurred while attempting to retrieve consumer// group offsets for this partition.//// The error contains both the kafka error code, and an error message// returned by the kafka broker. Programs may use the standard errors.Is// function to test the error against kafka error codes.Errorerror}

OffsetFetchPartition represents the state of a single partition in a consumer group.

typeOffsetFetchRequestadded inv0.4.0

type OffsetFetchRequest struct {// Address of the kafka broker to send the request to.Addrnet.Addr// ID of the consumer group to retrieve the offsets for.GroupIDstring// Set of topic partitions to retrieve the offsets for.Topics map[string][]int}

OffsetFetchRequest represents a request sent to a kafka broker to read the currently committed offsets of topic partitions.

typeOffsetFetchResponseadded inv0.4.0

type OffsetFetchResponse struct {// The amount of time that the broker throttled the request.Throttletime.Duration// Set of topic partitions that the kafka broker has returned offsets for.Topics map[string][]OffsetFetchPartition// An error that may have occurred while attempting to retrieve consumer// group offsets.//// The error contains both the kafka error code, and an error message// returned by the kafka broker. Programs may use the standard errors.Is// function to test the error against kafka error codes.Errorerror}

OffsetFetchResponse represents a response from a kafka broker to an offset fetch request.

typeOffsetRequestadded inv0.4.0

type OffsetRequest struct {
	Partition int
	Timestamp int64
}

OffsetRequest represents a request to retrieve a single partition offset.

funcFirstOffsetOfadded inv0.4.0

func FirstOffsetOf(partition int) OffsetRequest

FirstOffsetOf constructs an OffsetRequest which asks for the first offset of the partition given as argument.

funcLastOffsetOfadded inv0.4.0

func LastOffsetOf(partition int) OffsetRequest

LastOffsetOf constructs an OffsetRequest which asks for the last offset of the partition given as argument.

funcTimeOffsetOfadded inv0.4.0

func TimeOffsetOf(partition int, at time.Time) OffsetRequest

TimeOffsetOf constructs an OffsetRequest which asks for a partition offset at a given time.

typePartition

type Partition struct {// Name of the topic that the partition belongs to, and its index in the// topic.TopicstringIDint// Leader, replicas, and ISR for the partition.//// When no physical host is known to be running a broker, the Host and Port// fields will be set to the zero values. The logical broker ID is always// set to the value known to the kafka cluster, even if the broker is not// currently backed by a physical host.LeaderBrokerReplicas []BrokerIsr      []Broker// Available only with metadata API level >= 6:OfflineReplicas []Broker// An error that may have occurred while attempting to read the partition// metadata.//// The error contains both the kafka error code, and an error message// returned by the kafka broker. Programs may use the standard errors.Is// function to test the error against kafka error codes.Errorerror}

Partition carries the metadata associated with a kafka partition.

funcLookupPartitionadded inv0.2.1

func LookupPartition(ctx context.Context, network string, address string, topic string, partition int) (Partition, error)

LookupPartition is a convenience wrapper for DefaultDialer.LookupPartition.

funcLookupPartitionsadded inv0.2.1

func LookupPartitions(ctx context.Context, network string, address string, topic string) ([]Partition, error)

LookupPartitions is a convenience wrapper for DefaultDialer.LookupPartitions.

typePartitionAssignmentadded inv0.3.1

type PartitionAssignment struct {
	// ID is the partition ID.
	ID int

	// Offset is the initial offset at which this assignment begins.  It will
	// either be an absolute offset if one has previously been committed for
	// the consumer group or a relative offset such as FirstOffset when this
	// is the first time the partition has been assigned to a member of the
	// group.
	Offset int64
}

PartitionAssignment represents the starting state of a partition that has been assigned to a consumer.
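The distinction between an absolute and a relative starting offset can be sketched as below. The sentinel values (-2 for the first offset and -1 for the last offset) are an assumption borrowed from the Kafka protocol's timestamp conventions, and the constant names and the `resolve` function are hypothetical; programs would normally pass the assigned offset along unchanged to whatever component understands the relative values.

```go
package main

import "fmt"

// Assumed sentinel values for relative offsets (not taken from this document).
const (
	lastOffset  int64 = -1
	firstOffset int64 = -2
)

// resolve turns an assigned offset into a concrete position, given the
// first and last offsets currently available in the partition.
func resolve(assigned, first, last int64) int64 {
	switch assigned {
	case firstOffset:
		return first
	case lastOffset:
		return last
	default:
		// A previously committed, absolute offset.
		return assigned
	}
}

func main() {
	fmt.Println(resolve(firstOffset, 10, 50)) // no commit yet: start at the first offset
	fmt.Println(resolve(42, 10, 50))          // resume from the committed offset
}
```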

typePartitionOffsetsadded inv0.4.0

type PartitionOffsets struct {
	Partition   int
	FirstOffset int64
	LastOffset  int64
	Offsets     map[int64]time.Time
	Error       error
}

PartitionOffsets carries information about offsets available in a topic partition.

typePatternTypeadded inv0.4.29

type PatternType int8

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java

const (
	// PatternTypeUnknown represents any PatternType which this client cannot
	// understand.
	PatternTypeUnknown PatternType = 0
	// PatternTypeAny matches any resource pattern type.
	PatternTypeAny PatternType = 1
	// PatternTypeMatch performs pattern matching.
	PatternTypeMatch PatternType = 2
	// PatternTypeLiteral represents a literal name.
	// A literal name defines the full name of a resource, e.g. topic with name
	// 'foo', or group with name 'bob'.
	PatternTypeLiteral PatternType = 3
	// PatternTypePrefixed represents a prefixed name.
	// A prefixed name defines a prefix for a resource, e.g. topics with names
	// that start with 'foo'.
	PatternTypePrefixed PatternType = 4
)

func (PatternType)MarshalTextadded inv0.4.45

func (pt PatternType) MarshalText() ([]byte, error)

func (PatternType)Stringadded inv0.4.45

func (pt PatternType) String() string

func (*PatternType)UnmarshalTextadded inv0.4.45

func (pt *PatternType) UnmarshalText(text []byte) error

typeProduceRequestadded inv0.4.0

type ProduceRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// The topic to produce the records to.
	Topic string

	// The partition to produce the records to.
	Partition int

	// The level of required acknowledgements to ask the kafka broker for.
	RequiredAcks RequiredAcks

	// The message format version used when encoding the records.
	//
	// By default, the client automatically determines which version should be
	// used based on the version of the Produce API supported by the server.
	MessageVersion int

	// An optional transaction id when producing to the kafka broker is part of
	// a transaction.
	TransactionalID string

	// The sequence of records to produce to the topic partition.
	Records RecordReader

	// An optional compression algorithm to apply to the batch of records sent
	// to the kafka broker.
	Compression Compression
}

ProduceRequest represents a request sent to a kafka broker to produce records to a topic partition.

type ProduceResponse added in v0.4.0

type ProduceResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// An error that may have occurred while attempting to produce the records.
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker. Programs may use the standard errors.Is
	// function to test the error against kafka error codes.
	Error error

	// Offset of the first record that was written to the topic partition.
	//
	// This field will be zero if the kafka broker did not support Produce API
	// version 3 or above.
	BaseOffset int64

	// Time at which the broker wrote the records to the topic partition.
	//
	// This field will be zero if the kafka broker did not support Produce API
	// version 2 or above.
	LogAppendTime time.Time

	// First offset in the topic partition that the records were written to.
	//
	// This field will be zero if the kafka broker did not support Produce
	// API version 5 or above (or if the first offset is zero).
	LogStartOffset int64

	// If errors occurred writing specific records, they will be reported in
	// this map.
	//
	// This field will always be empty if the kafka broker did not support the
	// Produce API in version 8 or above.
	RecordErrors map[int]error
}

ProduceResponse represents a response from a kafka broker to a produce request.

type ProducerSession added in v0.4.11

type ProducerSession struct {
	// The Producer ID (PID) for the current producer session
	ProducerID int

	// The epoch associated with the current producer session for the given PID
	ProducerEpoch int
}

ProducerSession contains useful information about the producer session from the broker's response.

type RackAffinityGroupBalancer added in v0.3.6

type RackAffinityGroupBalancer struct {
	// Rack is the name of the rack where this consumer is running.  It will be
	// communicated to the consumer group leader via the UserData so that
	// assignments can be made with affinity to the partition leader.
	Rack string
}

RackAffinityGroupBalancer makes a best effort to pair up consumers with partitions whose leader is in the same rack. This strategy can have performance benefits by minimizing round trip latency between the consumer and the broker. In environments where network traffic across racks incurs charges (such as cross AZ data transfer in AWS), this strategy is also a cost optimization measure because it keeps network traffic within the local rack where possible.

The primary objective is to spread partitions evenly across consumers with a secondary focus on maximizing the number of partitions where the leader and the consumer are in the same rack. For best affinity, it's recommended to have a balanced spread of consumers and partition leaders across racks.

This balancer requires Kafka version 0.10.0.0 or later. Earlier versions do not return the brokers' racks in the metadata request.

func (RackAffinityGroupBalancer) AssignGroups added in v0.3.6

func (r RackAffinityGroupBalancer) AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments

func (RackAffinityGroupBalancer) ProtocolName added in v0.3.6

func (r RackAffinityGroupBalancer) ProtocolName() string

func (RackAffinityGroupBalancer) UserData added in v0.3.6

func (r RackAffinityGroupBalancer) UserData() ([]byte, error)

type RangeGroupBalancer added in v0.2.0

type RangeGroupBalancer struct{}

RangeGroupBalancer groups consumers by partition.

Example: 5 partitions, 2 consumers

C0: [0, 1, 2]
C1: [3, 4]

Example: 6 partitions, 3 consumers

C0: [0, 1]
C1: [2, 3]
C2: [4, 5]
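The two examples above can be reproduced with a small standalone sketch. It is not the library's implementation: `rangeAssign` is a hypothetical helper that assumes partitions are numbered from 0 and that any remainder goes to the first consumers, which is consistent with the assignments shown.

```go
package main

import "fmt"

// rangeAssign splits partitions 0..numPartitions-1 into contiguous ranges,
// one per consumer. The first (numPartitions % numConsumers) consumers each
// receive one extra partition.
func rangeAssign(numPartitions, numConsumers int) [][]int {
	if numConsumers <= 0 {
		return nil
	}
	out := make([][]int, numConsumers)
	base := numPartitions / numConsumers
	extra := numPartitions % numConsumers
	next := 0
	for c := 0; c < numConsumers; c++ {
		n := base
		if c < extra {
			n++
		}
		for i := 0; i < n; i++ {
			out[c] = append(out[c], next)
			next++
		}
	}
	return out
}

func main() {
	fmt.Println(rangeAssign(5, 2)) // [[0 1 2] [3 4]]
	fmt.Println(rangeAssign(6, 3)) // [[0 1] [2 3] [4 5]]
}
```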

func (RangeGroupBalancer) AssignGroups added in v0.2.0

func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments

func (RangeGroupBalancer) ProtocolName added in v0.2.0

func (r RangeGroupBalancer) ProtocolName() string

func (RangeGroupBalancer) UserData added in v0.2.0

func (r RangeGroupBalancer) UserData() ([]byte, error)

type RawProduceRequest added in v0.4.47

type RawProduceRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// The topic to produce the records to.
	Topic string

	// The partition to produce the records to.
	Partition int

	// The level of required acknowledgements to ask the kafka broker for.
	RequiredAcks RequiredAcks

	// The message format version used when encoding the records.
	//
	// By default, the client automatically determines which version should be
	// used based on the version of the Produce API supported by the server.
	MessageVersion int

	// An optional transaction id when producing to the kafka broker is part of
	// a transaction.
	TransactionalID string

	// The sequence of records to produce to the topic partition.
	RawRecords protocol.RawRecordSet
}

RawProduceRequest represents a request sent to a kafka broker to produce records to a topic partition. The request contains a pre-encoded/raw record set.

type ReadBatchConfig added in v0.2.3

type ReadBatchConfig struct {
	// MinBytes indicates to the broker the minimum batch size that the consumer
	// will accept. Setting a high minimum when consuming from a low-volume topic
	// may result in delayed delivery when the broker does not have enough data to
	// satisfy the defined minimum.
	MinBytes int

	// MaxBytes indicates to the broker the maximum batch size that the consumer
	// will accept. The broker will truncate a message to satisfy this maximum, so
	// choose a value that is high enough for your largest message size.
	MaxBytes int

	// IsolationLevel controls the visibility of transactional records.
	// ReadUncommitted makes all records visible. With ReadCommitted only
	// non-transactional and committed records are visible.
	IsolationLevel IsolationLevel

	// MaxWait is the amount of time the broker waits to hit the min/max byte
	// targets.  This setting is independent of any network-level timeouts or
	// deadlines.
	//
	// For backward compatibility, when this field is left zero, kafka-go will
	// infer the max wait from the connection's read deadline.
	MaxWait time.Duration
}

ReadBatchConfig is a configuration object used for reading batches of messages.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader provides a high-level API for consuming messages from kafka.

A Reader automatically manages reconnections to a kafka server, and blocking methods have context support for asynchronous cancellations.

Note that it is important to call `Close()` on a `Reader` when a process exits. The kafka server needs a graceful disconnect to stop it from continuing to attempt to send messages to the connected clients. The given example will not call `Close()` if the process is terminated with SIGINT (ctrl-c at the shell) or SIGTERM (as docker stop or a kubernetes restart does). This can result in a delay when a new reader on the same topic connects (e.g. new process started or new container running). Use a `signal.Notify` handler to close the reader on process shutdown.
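A minimal sketch of that shutdown pattern follows, using only the standard library. It uses `signal.NotifyContext` (a context-based wrapper around `signal.Notify`, available since Go 1.16) rather than a hand-written handler. `fakeReader` and `closeOnDone` are hypothetical names introduced for illustration; in a real program the value being closed would be the `*Reader`.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// closeOnDone blocks until done is closed, then closes c and returns the
// result of Close.
func closeOnDone(done <-chan struct{}, c io.Closer) error {
	<-done
	return c.Close()
}

// fakeReader is a hypothetical stand-in for a kafka reader; only Close
// matters for this sketch.
type fakeReader struct{ closed bool }

func (f *fakeReader) Close() error {
	f.closed = true
	return nil
}

func main() {
	// ctx is cancelled when SIGINT or SIGTERM is delivered to the process.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// The timeout exists only so that this demo terminates by itself;
	// a long-running service would omit it.
	ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
	defer cancel()

	r := &fakeReader{}
	if err := closeOnDone(ctx.Done(), r); err != nil {
		fmt.Println("close failed:", err)
		return
	}
	fmt.Println("reader closed:", r.closed)
}
```

In a service, the same context would typically also be passed to the blocking read calls so that they return promptly once a signal arrives.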

func NewReader

func NewReader(config ReaderConfig) *Reader

NewReader creates and returns a new Reader configured with config. The offset is initialized to FirstOffset.

Example (RackAffinity)

ExampleNewReader_rackAffinity shows how the RackAffinityGroupBalancer can be used to pair up consumers with brokers in the same AWS availability zone. This code assumes that each broker's rack is configured to be the name of the AZ in which it is running.

package main

import (
	"context"
	"encoding/json"
	"io"
	"net/http"
	"os"
	"strings"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

// ExampleNewReader_rackAffinity shows how the RackAffinityGroupBalancer can be
// used to pair up consumers with brokers in the same AWS availability zone.
// This code assumes that each broker's rack is configured to be the name of the
// AZ in which it is running.
func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"kafka:9092"},
		GroupID: "my-group",
		Topic:   "my-topic",
		GroupBalancers: []kafka.GroupBalancer{
			kafka.RackAffinityGroupBalancer{Rack: findRack()},
			kafka.RangeGroupBalancer{},
		},
	})

	r.ReadMessage(context.Background())

	r.Close()
}

// findRack is the basic rack resolver strategy for use in AWS.  It supports
//   - ECS with the task metadata endpoint enabled (returns the container
//     instance's availability zone)
//   - Linux EC2 (returns the instance's availability zone)
func findRack() string {
	switch whereAmI() {
	case "ecs":
		return ecsAvailabilityZone()
	case "ec2":
		return ec2AvailabilityZone()
	}
	return ""
}

const ecsContainerMetadataURI = "ECS_CONTAINER_METADATA_URI"

// whereAmI determines which strategy the rack resolver should use.
func whereAmI() string {
	// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint.html
	if os.Getenv(ecsContainerMetadataURI) != "" {
		return "ecs"
	}
	// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/identify_ec2_instances.html
	for _, path := range [...]string{
		"/sys/devices/virtual/dmi/id/product_uuid",
		"/sys/hypervisor/uuid",
	} {
		b, err := os.ReadFile(path)
		if err != nil {
			continue
		}
		s := string(b)
		switch {
		case strings.HasPrefix(s, "EC2"), strings.HasPrefix(s, "ec2"):
			return "ec2"
		}
	}
	return "somewhere"
}

// ecsAvailabilityZone queries the task endpoint for the metadata URI that ECS
// injects into the ECS_CONTAINER_METADATA_URI variable in order to retrieve
// the availability zone where the task is running.
func ecsAvailabilityZone() string {
	client := http.Client{
		Timeout: time.Second,
		Transport: &http.Transport{
			DisableCompression: true,
			DisableKeepAlives:  true,
		},
	}
	r, err := client.Get(os.Getenv(ecsContainerMetadataURI) + "/task")
	if err != nil {
		return ""
	}
	defer r.Body.Close()

	var md struct {
		AvailabilityZone string
	}
	if err := json.NewDecoder(r.Body).Decode(&md); err != nil {
		return ""
	}
	return md.AvailabilityZone
}

// ec2AvailabilityZone queries the metadata endpoint to discover the
// availability zone where this code is running.  We avoid calling this function
// unless we know we're in EC2.  Otherwise, in other environments, we would need
// to wait for the request to 169.254.169.254 to timeout before proceeding.
func ec2AvailabilityZone() string {
	client := http.Client{
		Timeout: time.Second,
		Transport: &http.Transport{
			DisableCompression: true,
			DisableKeepAlives:  true,
		},
	}
	r, err := client.Get("http://169.254.169.254/latest/meta-data/placement/availability-zone")
	if err != nil {
		return ""
	}
	defer r.Body.Close()

	b, err := io.ReadAll(r.Body)
	if err != nil {
		return ""
	}
	return string(b)
}

func (*Reader) Close

func (r *Reader) Close() error

Close closes the stream, preventing the program from reading any more messages from it.

func (*Reader) CommitMessages

func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error

CommitMessages commits the list of messages passed as argument. The program may pass a context to asynchronously cancel the commit operation when it was configured to be blocking.

Because kafka consumer groups track a single offset per partition, the highest message offset passed to CommitMessages will cause all previous messages to be committed. Applications need to account for these Kafka limitations when committing messages, and maintain message ordering if they need strong delivery guarantees. This property makes it valid to pass only the last message seen to CommitMessages in order to move the offset of the topic/partition it belonged to forward, effectively committing all previous messages in the partition.
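To illustrate the "single offset per partition" property, the sketch below reduces a batch of messages to the highest offset seen for each topic/partition pair. The `message` and `topicPartition` types and the `highestOffsets` helper are hypothetical and exist only for this example; `message` mirrors just the Topic, Partition and Offset fields of a message.

```go
package main

import "fmt"

// message is a hypothetical, trimmed-down message carrying only the fields
// that matter for offset tracking.
type message struct {
	Topic     string
	Partition int
	Offset    int64
}

// topicPartition identifies one partition of one topic.
type topicPartition struct {
	Topic     string
	Partition int
}

// highestOffsets returns, for every topic/partition present in msgs, the
// largest offset seen. Committing that one offset implicitly covers all
// earlier messages of the same partition.
func highestOffsets(msgs []message) map[topicPartition]int64 {
	out := make(map[topicPartition]int64)
	for _, m := range msgs {
		k := topicPartition{Topic: m.Topic, Partition: m.Partition}
		if cur, ok := out[k]; !ok || m.Offset > cur {
			out[k] = m.Offset
		}
	}
	return out
}

func main() {
	msgs := []message{
		{"events", 0, 5},
		{"events", 0, 7},
		{"events", 0, 6},
		{"events", 1, 2},
	}
	for tp, off := range highestOffsets(msgs) {
		fmt.Printf("%s[%d]: commit through offset %d\n", tp.Topic, tp.Partition, off)
	}
}
```

Note that committing offset 7 on partition 0 also marks offsets 5 and 6 as committed, which is why out-of-order processing needs extra care when strong delivery guarantees are required.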

func (*Reader) Config

func (r *Reader) Config() ReaderConfig

Config returns the reader's configuration.

func (*Reader) FetchMessage

func (r *Reader) FetchMessage(ctx context.Context) (Message, error)

FetchMessage reads and returns the next message from r. The method call blocks until a message becomes available, or an error occurs. The program may also specify a context to asynchronously cancel the blocking operation.

The method returns io.EOF to indicate that the reader has been closed.

FetchMessage does not commit offsets automatically when using consumer groups. Use CommitMessages to commit the offset.

func (*Reader) Lag

func (r *Reader) Lag() int64

Lag returns the lag of the last message returned by ReadMessage, or -1 if r is backed by a consumer group.

func (*Reader) Offset

func (r *Reader) Offset() int64

Offset returns the current absolute offset of the reader, or -1 if r is backed by a consumer group.

func (*Reader) ReadLag

func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error)

ReadLag returns the current lag of the reader by fetching the last offset of the topic and partition and computing the difference between that value and the offset of the last message returned by ReadMessage.

This method is intended to be used in cases where a program may be unable to call ReadMessage to update the value returned by Lag, but still needs to get an up to date estimation of how far behind the reader is. For example when the consumer is not ready to process the next message.

The function returns a lag of zero when the reader's current offset is negative.
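The arithmetic described above can be written down as a tiny sketch. `computeLag` is a hypothetical helper, not the library's code, and it takes both offsets as plain arguments instead of querying a broker.

```go
package main

import "fmt"

// computeLag returns the distance between the last offset of a partition and
// the reader's current offset. A negative current offset yields a lag of
// zero, as described for ReadLag.
func computeLag(lastOffset, currentOffset int64) int64 {
	if currentOffset < 0 {
		return 0
	}
	return lastOffset - currentOffset
}

func main() {
	fmt.Println(computeLag(120, 100)) // 20
	fmt.Println(computeLag(120, -1))  // 0
}
```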

func (*Reader) ReadMessage

func (r *Reader) ReadMessage(ctx context.Context) (Message, error)

ReadMessage reads and returns the next message from r. The method call blocks until a message becomes available, or an error occurs. The program may also specify a context to asynchronously cancel the blocking operation.

The method returns io.EOF to indicate that the reader has been closed.

If consumer groups are used, ReadMessage will automatically commit the offset when called. Note that this could result in an offset being committed before the message is fully processed.

If more fine-grained control of when offsets are committed is required, it is recommended to use FetchMessage with CommitMessages instead.
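The fetch, process, then commit ordering can be sketched without a broker. Everything below is an assumption made for illustration: `message`, `source`, `memSource`, `consume` and `demo` are hypothetical names, and `source` only mirrors the shape of the FetchMessage and CommitMessages methods documented here. The point is the ordering: a message is committed only after its handler succeeds.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// message is a hypothetical, trimmed-down message.
type message struct {
	Offset int64
	Value  string
}

// source mirrors the shape of the two reader methods used by the loop.
type source interface {
	FetchMessage(ctx context.Context) (message, error)
	CommitMessages(ctx context.Context, msgs ...message) error
}

// consume fetches a message, runs handle on it, and commits it only when
// handle succeeds. It stops cleanly when the source reports io.EOF.
func consume(ctx context.Context, src source, handle func(message) error) error {
	for {
		m, err := src.FetchMessage(ctx)
		if errors.Is(err, io.EOF) {
			return nil // the source has been closed
		}
		if err != nil {
			return err
		}
		if err := handle(m); err != nil {
			return err // not committed, so the message can be read again
		}
		if err := src.CommitMessages(ctx, m); err != nil {
			return err
		}
	}
}

// memSource is an in-memory fake used to exercise consume.
type memSource struct {
	msgs      []message
	next      int
	committed int64
}

func (s *memSource) FetchMessage(ctx context.Context) (message, error) {
	if s.next >= len(s.msgs) {
		return message{}, io.EOF
	}
	m := s.msgs[s.next]
	s.next++
	return m, nil
}

func (s *memSource) CommitMessages(ctx context.Context, msgs ...message) error {
	for _, m := range msgs {
		if m.Offset > s.committed {
			s.committed = m.Offset
		}
	}
	return nil
}

// demo runs consume over three fake messages and reports how many were
// processed and the highest committed offset.
func demo() (processed int, committed int64, err error) {
	src := &memSource{
		msgs:      []message{{0, "a"}, {1, "b"}, {2, "c"}},
		committed: -1,
	}
	err = consume(context.Background(), src, func(message) error {
		processed++
		return nil
	})
	return processed, src.committed, err
}

func main() {
	processed, committed, err := demo()
	fmt.Println(processed, committed, err) // 3 2 <nil>
}
```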

func (*Reader) SetOffset

func (r *Reader) SetOffset(offset int64) error

SetOffset changes the offset from which the next batch of messages will be read. The method fails with io.ErrClosedPipe if the reader has already been closed.

From version 0.2.0, FirstOffset and LastOffset can be used to indicate the first or last available offset in the partition. Please note while -1 and -2 were accepted to indicate the first or last offset in previous versions, the meanings of the numbers were swapped in 0.2.0 to match the meanings in other libraries and the Kafka protocol specification.

func (*Reader) SetOffsetAt added in v0.2.3

func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error

SetOffsetAt changes the offset from which the next batch of messages will be read given the timestamp t.

The method fails if it is unable to connect to the partition leader, if it is unable to read the offset for the given timestamp, or if the reader has been closed.

func (*Reader) Stats

func (r *Reader) Stats() ReaderStats

Stats returns a snapshot of the reader stats since the last time the method was called, or since the reader was created if it is called for the first time.

A typical use of this method is to spawn a goroutine that will periodically call Stats on a kafka reader and report the metrics to a stats collection system.
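One way to read "since the last time the method was called" is that counter values are deltas which reset on every snapshot. The sketch below shows that semantic with a hypothetical `counter` type built on `sync/atomic`; it is an illustration of the idea, not the library's internals.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// counter is a hypothetical metric that accumulates observations between
// snapshots.
type counter struct{ n int64 }

// observe adds delta to the counter.
func (c *counter) observe(delta int64) { atomic.AddInt64(&c.n, delta) }

// snapshot returns the value accumulated since the previous snapshot and
// resets the counter to zero in a single atomic step.
func (c *counter) snapshot() int64 { return atomic.SwapInt64(&c.n, 0) }

func main() {
	var messages counter
	messages.observe(3)
	fmt.Println(messages.snapshot()) // 3
	messages.observe(2)
	fmt.Println(messages.snapshot()) // 2
	fmt.Println(messages.snapshot()) // 0
}
```

Under this reading, a reporting goroutine that calls Stats on a fixed interval can forward counter values directly as per-interval increments.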

type ReaderConfig

type ReaderConfig struct {
	// The list of broker addresses used to connect to the kafka cluster.
	Brokers []string

	// GroupID holds the optional consumer group id.  If GroupID is specified, then
	// Partition should NOT be specified e.g. 0
	GroupID string

	// GroupTopics allows specifying multiple topics, but can only be used in
	// combination with GroupID, as it is a consumer-group feature. As such, if
	// GroupID is set, then either Topic or GroupTopics must be defined.
	GroupTopics []string

	// The topic to read messages from.
	Topic string

	// Partition to read messages from.  Either Partition or GroupID may
	// be assigned, but not both.
	Partition int

	// A dialer used to open connections to the kafka server. This field is
	// optional, if nil, the default dialer is used instead.
	Dialer *Dialer

	// The capacity of the internal message queue, defaults to 100 if none is
	// set.
	QueueCapacity int

	// MinBytes indicates to the broker the minimum batch size that the consumer
	// will accept. Setting a high minimum when consuming from a low-volume topic
	// may result in delayed delivery when the broker does not have enough data to
	// satisfy the defined minimum.
	//
	// Default: 1
	MinBytes int

	// MaxBytes indicates to the broker the maximum batch size that the consumer
	// will accept. The broker will truncate a message to satisfy this maximum, so
	// choose a value that is high enough for your largest message size.
	//
	// Default: 1MB
	MaxBytes int

	// Maximum amount of time to wait for new data to come when fetching batches
	// of messages from kafka.
	//
	// Default: 10s
	MaxWait time.Duration

	// ReadBatchTimeout is the amount of time to wait to fetch a message from a
	// kafka message batch.
	//
	// Default: 10s
	ReadBatchTimeout time.Duration

	// ReadLagInterval sets the frequency at which the reader lag is updated.
	// Setting this field to a negative value disables lag reporting.
	ReadLagInterval time.Duration

	// GroupBalancers is the priority-ordered list of client-side consumer group
	// balancing strategies that will be offered to the coordinator.  The first
	// strategy that all group members support will be chosen by the leader.
	//
	// Default: [Range, RoundRobin]
	//
	// Only used when GroupID is set
	GroupBalancers []GroupBalancer

	// HeartbeatInterval sets the optional frequency at which the reader sends the consumer
	// group heartbeat update.
	//
	// Default: 3s
	//
	// Only used when GroupID is set
	HeartbeatInterval time.Duration

	// CommitInterval indicates the interval at which offsets are committed to
	// the broker.  If 0, commits will be handled synchronously.
	//
	// Default: 0
	//
	// Only used when GroupID is set
	CommitInterval time.Duration

	// PartitionWatchInterval indicates how often a reader checks for partition changes.
	// If a reader sees a partition change (such as a partition add) it will rebalance the group
	// picking up new partitions.
	//
	// Default: 5s
	//
	// Only used when GroupID is set and WatchPartitionChanges is set.
	PartitionWatchInterval time.Duration

	// WatchPartitionChanges is used to inform kafka-go that a consumer group should be
	// polling the brokers and rebalancing if any partition changes happen to the topic.
	WatchPartitionChanges bool

	// SessionTimeout optionally sets the length of time that may pass without a heartbeat
	// before the coordinator considers the consumer dead and initiates a rebalance.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	SessionTimeout time.Duration

	// RebalanceTimeout optionally sets the length of time the coordinator will wait
	// for members to join as part of a rebalance.  For kafka servers under higher
	// load, it may be useful to set this value higher.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	RebalanceTimeout time.Duration

	// JoinGroupBackoff optionally sets the length of time to wait between re-joining
	// the consumer group after an error.
	//
	// Default: 5s
	JoinGroupBackoff time.Duration

	// RetentionTime optionally sets the length of time the consumer group will be saved
	// by the broker. -1 will disable the setting and leave the
	// retention up to the broker's offsets.retention.minutes property. By
	// default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >= 2.0.
	//
	// Default: -1
	//
	// Only used when GroupID is set
	RetentionTime time.Duration

	// StartOffset determines from whence the consumer group should begin
	// consuming when it finds a partition without a committed offset.  If
	// non-zero, it must be set to one of FirstOffset or LastOffset.
	//
	// Default: FirstOffset
	//
	// Only used when GroupID is set
	StartOffset int64

	// ReadBackoffMin optionally sets the smallest amount of time the reader will wait before
	// polling for new messages
	//
	// Default: 100ms
	ReadBackoffMin time.Duration

	// ReadBackoffMax optionally sets the maximum amount of time the reader will wait before
	// polling for new messages
	//
	// Default: 1s
	ReadBackoffMax time.Duration

	// If not nil, specifies a logger used to report internal changes within the
	// reader.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the reader falls
	// back to using Logger instead.
	ErrorLogger Logger

	// IsolationLevel controls the visibility of transactional records.
	// ReadUncommitted makes all records visible. With ReadCommitted only
	// non-transactional and committed records are visible.
	IsolationLevel IsolationLevel

	// Limit of how many attempts to connect will be made before returning the error.
	//
	// The default is to try 3 times.
	MaxAttempts int

	// OffsetOutOfRangeError indicates that the reader should return an error in
	// the event of an OffsetOutOfRange error, rather than retrying indefinitely.
	// This flag is being added to retain backwards-compatibility, so it will be
	// removed in a future version of kafka-go.
	OffsetOutOfRangeError bool
}

ReaderConfig is a configuration object used to create new instances of Reader.

func (*ReaderConfig) Validate added in v0.2.3

func (config *ReaderConfig) Validate() error

Validate method validates ReaderConfig properties.

type ReaderStats

type ReaderStats struct {
	Dials      int64 `metric:"kafka.reader.dial.count"      type:"counter"`
	Fetches    int64 `metric:"kafka.reader.fetch.count"     type:"counter"`
	Messages   int64 `metric:"kafka.reader.message.count"   type:"counter"`
	Bytes      int64 `metric:"kafka.reader.message.bytes"   type:"counter"`
	Rebalances int64 `metric:"kafka.reader.rebalance.count" type:"counter"`
	Timeouts   int64 `metric:"kafka.reader.timeout.count"   type:"counter"`
	Errors     int64 `metric:"kafka.reader.error.count"     type:"counter"`

	DialTime   DurationStats `metric:"kafka.reader.dial.seconds"`
	ReadTime   DurationStats `metric:"kafka.reader.read.seconds"`
	WaitTime   DurationStats `metric:"kafka.reader.wait.seconds"`
	FetchSize  SummaryStats  `metric:"kafka.reader.fetch.size"`
	FetchBytes SummaryStats  `metric:"kafka.reader.fetch.bytes"`

	Offset        int64         `metric:"kafka.reader.offset"          type:"gauge"`
	Lag           int64         `metric:"kafka.reader.lag"             type:"gauge"`
	MinBytes      int64         `metric:"kafka.reader.fetch_bytes.min" type:"gauge"`
	MaxBytes      int64         `metric:"kafka.reader.fetch_bytes.max" type:"gauge"`
	MaxWait       time.Duration `metric:"kafka.reader.fetch_wait.max"  type:"gauge"`
	QueueLength   int64         `metric:"kafka.reader.queue.length"    type:"gauge"`
	QueueCapacity int64         `metric:"kafka.reader.queue.capacity"  type:"gauge"`

	ClientID  string `tag:"client_id"`
	Topic     string `tag:"topic"`
	Partition string `tag:"partition"`

	// The original `Fetches` field had a typo where the metric name was called
	// "kafak..." instead of "kafka...", in order to offer time to fix monitors
	// that may be relying on this mistake we are temporarily introducing this
	// field.
	DeprecatedFetchesWithTypo int64 `metric:"kafak.reader.fetch.count" type:"counter"`
}

ReaderStats is a data structure returned by a call to Reader.Stats that exposes details about the behavior of the reader.

type Record added in v0.4.0

type Record = protocol.Record

Record is an interface representing a single kafka record.

Record values are not safe to use concurrently from multiple goroutines.

type RecordReader added in v0.4.0

type RecordReader = protocol.RecordReader

RecordReader is an interface representing a sequence of records. Record sets are used in both produce and fetch requests to represent the sequence of records that are sent to or received from kafka brokers.

RecordReader values are not safe to use concurrently from multiple goroutines.

func NewRecordReader added in v0.4.0

func NewRecordReader(records ...Record) RecordReader

NewRecordReader constructs a RecordReader which exposes the sequence of records passed as arguments.

type ReferenceHash added in v0.4.32

type ReferenceHash struct {
	Hasher hash.Hash32
	// contains filtered or unexported fields
}

ReferenceHash is a Balancer that uses the provided hash function to determine which partition to route messages to. This ensures that messages with the same key are routed to the same partition.

The logic to calculate the partition is:

(int32(hasher.Sum32()) & 0x7fffffff) % len(partitions) => partition

By default, ReferenceHash uses the FNV-1a algorithm. This is the same algorithm as the Sarama NewReferenceHashPartitioner and ensures that messages produced by kafka-go will be delivered to the same partitions that the Sarama producer would deliver them to.
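The formula above can be tried out with the standard library's FNV-1a implementation. This is a sketch under stated assumptions rather than the package's code: `referencePartition` is a hypothetical helper, and it assumes partitions are numbered 0 to numPartitions-1 so that the computed index is itself the partition.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// referencePartition applies the documented formula to key:
// (int32(sum) & 0x7fffffff) % numPartitions.
// Masking with 0x7fffffff clears the sign bit so the result is never negative.
func referencePartition(key []byte, numPartitions int) int {
	if numPartitions <= 0 {
		return -1 // no partition can be chosen
	}
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	idx := (int32(h.Sum32()) & 0x7fffffff) % int32(numPartitions)
	return int(idx)
}

func main() {
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> partition %d\n", key, referencePartition([]byte(key), 6))
	}
}
```

The same key always maps to the same partition as long as the partition count does not change.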

func (*ReferenceHash) Balance added in v0.4.32

func (h *ReferenceHash) Balance(msg Message, partitions ...int) int

type ReplicaAssignment

type ReplicaAssignment struct {
	Partition int

	// The list of brokers where the partition should be allocated. There must
	// be as many entries in this list as there are replicas of the partition.
	// The first entry represents the broker that will be the preferred leader
	// for the partition.
	//
	// This field changed in 0.4 from `int` to `[]int`. It was invalid to pass
	// a single integer as this is supposed to be a list. While this introduces
	// a breaking change, it probably never worked before.
	Replicas []int
}

type Request added in v0.4.0

type Request = protocol.Message

Request is an interface implemented by types that represent messages sent from kafka clients to brokers.

type RequiredAcks added in v0.4.1

type RequiredAcks int

const (
	RequireNone RequiredAcks = 0
	RequireOne  RequiredAcks = 1
	RequireAll  RequiredAcks = -1
)

func (RequiredAcks) MarshalText added in v0.4.21

func (acks RequiredAcks) MarshalText() ([]byte, error)

func (RequiredAcks) String added in v0.4.1

func (acks RequiredAcks) String() string

func (*RequiredAcks) UnmarshalText added in v0.4.21

func (acks *RequiredAcks) UnmarshalText(b []byte) error

type Resolver

type Resolver interface {
	// LookupHost looks up the given host using the local resolver.
	// It returns a slice of that host's addresses.
	LookupHost(ctx context.Context, host string) (addrs []string, err error)
}

The Resolver interface is used as an abstraction to provide service discovery of the hosts of a kafka cluster.

type ResourceType added in v0.4.9

type ResourceType int8

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java

const (
	ResourceTypeUnknown ResourceType = 0
	ResourceTypeAny     ResourceType = 1
	ResourceTypeTopic   ResourceType = 2
	ResourceTypeGroup   ResourceType = 3
	// See https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L36
	ResourceTypeBroker          ResourceType = 4
	ResourceTypeCluster         ResourceType = 4
	ResourceTypeTransactionalID ResourceType = 5
	ResourceTypeDelegationToken ResourceType = 6
)

func (ResourceType) MarshalText added in v0.4.45

func (rt ResourceType) MarshalText() ([]byte, error)

func (ResourceType) String added in v0.4.45

func (rt ResourceType) String() string

func (*ResourceType) UnmarshalText added in v0.4.45

func (rt *ResourceType) UnmarshalText(text []byte) error

type Response added in v0.4.0

type Response = protocol.Message

Response is an interface implemented by types that represent messages sent from kafka brokers in response to client requests.

type RoundRobin

type RoundRobin struct {
	ChunkSize int
	// contains filtered or unexported fields
}

RoundRobin is a Balancer implementation that equally distributes messages across all available partitions. It can take an optional chunk size to send ChunkSize messages to the same partition before moving to the next partition. This can be used to improve batch sizes.
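The effect of a chunk size can be seen in a small standalone sketch. `chunkedRoundRobin` and its `pick` method are hypothetical and do not reproduce the package's implementation; the sketch is not safe for concurrent use and assumes a non-empty partition list.

```go
package main

import "fmt"

// chunkedRoundRobin is a hypothetical balancer that sends chunkSize
// consecutive messages to one partition before moving to the next.
type chunkedRoundRobin struct {
	chunkSize int
	sent      int // number of messages routed so far
}

// pick returns the partition for the next message. A chunk size below 1 is
// treated as 1, which gives plain round-robin. partitions must be non-empty.
func (c *chunkedRoundRobin) pick(partitions []int) int {
	size := c.chunkSize
	if size < 1 {
		size = 1
	}
	idx := (c.sent / size) % len(partitions)
	c.sent++
	return partitions[idx]
}

func main() {
	rr := &chunkedRoundRobin{chunkSize: 2}
	partitions := []int{0, 1, 2}
	for i := 0; i < 8; i++ {
		fmt.Print(rr.pick(partitions), " ")
	}
	fmt.Println() // prints: 0 0 1 1 2 2 0 0
}
```

Larger chunks keep consecutive messages on one partition, which tends to produce fuller batches at the cost of a slightly less even short-term spread.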

func (*RoundRobin) Balance

func (rr *RoundRobin) Balance(msg Message, partitions ...int) int

Balance satisfies the Balancer interface.

type RoundRobinGroupBalancer added in v0.2.0

type RoundRobinGroupBalancer struct{}

RoundRobinGroupBalancer divides partitions evenly among consumers.

Example: 5 partitions, 2 consumers

C0: [0, 2, 4]
C1: [1, 3]

Example: 6 partitions, 3 consumers

C0: [0, 3]
C1: [1, 4]
C2: [2, 5]
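The assignments above follow from dealing partitions out one at a time. The sketch below reproduces both examples; `roundRobinAssign` is a hypothetical helper that assumes partitions are numbered from 0, and it is not the library's implementation.

```go
package main

import "fmt"

// roundRobinAssign deals partitions 0..numPartitions-1 to consumers in turn:
// partition p goes to consumer p % numConsumers.
func roundRobinAssign(numPartitions, numConsumers int) [][]int {
	if numConsumers <= 0 {
		return nil
	}
	out := make([][]int, numConsumers)
	for p := 0; p < numPartitions; p++ {
		c := p % numConsumers
		out[c] = append(out[c], p)
	}
	return out
}

func main() {
	fmt.Println(roundRobinAssign(5, 2)) // [[0 2 4] [1 3]]
	fmt.Println(roundRobinAssign(6, 3)) // [[0 3] [1 4] [2 5]]
}
```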

func (RoundRobinGroupBalancer) AssignGroups added in v0.2.0

func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments

func (RoundRobinGroupBalancer) ProtocolName added in v0.2.0

func (r RoundRobinGroupBalancer) ProtocolName() string

func (RoundRobinGroupBalancer) UserData added in v0.2.0

func (r RoundRobinGroupBalancer) UserData() ([]byte, error)

type RoundTripper added in v0.4.0

type RoundTripper interface {
	// RoundTrip sends a request to a kafka broker and returns the response that
	// was received, or a non-nil error.
	//
	// The context passed as first argument can be used to asynchronously abort
	// the call if needed.
	RoundTrip(context.Context, net.Addr, Request) (Response, error)
}

RoundTripper is an interface implemented by types which support interacting with kafka brokers.

var DefaultTransport RoundTripper = &Transport{
	Dial: (&net.Dialer{
		Timeout:   3 * time.Second,
		DualStack: true,
	}).DialContext,
}

DefaultTransport is the default transport used by kafka clients in this package.

type ScramMechanism added in v0.4.43

type ScramMechanism int8

const (
	ScramMechanismUnknown ScramMechanism = iota // 0
	ScramMechanismSha256                        // 1
	ScramMechanismSha512                        // 2
)

type SummaryStats

type SummaryStats struct {
	Avg   int64 `metric:"avg" type:"gauge"`
	Min   int64 `metric:"min" type:"gauge"`
	Max   int64 `metric:"max" type:"gauge"`
	Count int64 `metric:"count" type:"counter"`
	Sum   int64 `metric:"sum" type:"counter"`
}

SummaryStats is a data structure that carries a summary of observed values.

type SyncGroupRequest added in v0.4.33

type SyncGroupRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// GroupID of the group to sync.
	GroupID string

	// The generation of the group.
	GenerationID int

	// The member ID assigned by the group.
	MemberID string

	// The unique identifier for the consumer instance.
	GroupInstanceID string

	// The name for the class of protocols implemented by the group being joined.
	ProtocolType string

	// The group protocol name.
	ProtocolName string

	// The group member assignments.
	Assignments []SyncGroupRequestAssignment
}

SyncGroupRequest is the request structure for the SyncGroup function.

type SyncGroupRequestAssignment added in v0.4.33

type SyncGroupRequestAssignment struct {
	// The ID of the member to assign.
	MemberID string

	// The member assignment.
	Assignment GroupProtocolAssignment
}

SyncGroupRequestAssignment represents an assignment for a group member.

type SyncGroupResponse added in v0.4.33

type SyncGroupResponse struct {
	// An error that may have occurred when attempting to sync the group.
	//
	// The errors contain the kafka error code. Programs may use the standard
	// errors.Is function to test the error against kafka error codes.
	Error error

	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// The group protocol type.
	ProtocolType string

	// The group protocol name.
	ProtocolName string

	// The member assignment.
	Assignment GroupProtocolAssignment
}

SyncGroupResponse is the response structure for the SyncGroup function.

type Topic added in v0.4.0

type Topic struct {
	// Name of the topic.
	Name string

	// True if the topic is internal.
	Internal bool

	// The list of partitions currently available on this topic.
	Partitions []Partition

	// An error that may have occurred while attempting to read the topic
	// metadata.
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker. Programs may use the standard errors.Is
	// function to test the error against kafka error codes.
	Error error
}

Topic represents a topic in a kafka cluster.

type TopicAndGroup added in v0.2.5

type TopicAndGroup struct {
	Topic   string
	GroupId string
}

TopicAndGroup pairs a consumer group with a topic. Because both are strings, a dedicated type is defined for clarity when passing them to the Client as a function argument.

N.B. TopicAndGroup is currently experimental! Therefore, it is subject to change, including breaking changes between MINOR and PATCH releases.

DEPRECATED: this type will be removed in version 1.0, programs should migrate to use kafka.(*Client).OffsetFetch instead.

type TopicConfig

type TopicConfig struct {
	// Topic name
	Topic string

	// NumPartitions created. -1 indicates unset.
	NumPartitions int

	// ReplicationFactor for the topic. -1 indicates unset.
	ReplicationFactor int

	// ReplicaAssignments among kafka brokers for this topic partitions. If this
	// is set num_partitions and replication_factor must be unset.
	ReplicaAssignments []ReplicaAssignment

	// ConfigEntries holds topic level configuration for topic to be set.
	ConfigEntries []ConfigEntry
}

type TopicPartitionAssignment added in v0.4.9

type TopicPartitionAssignment struct {
	// Broker IDs
	BrokerIDs []int32
}

type TopicPartitionsConfig added in v0.4.9

type TopicPartitionsConfig struct {
	// Topic name
	Name string

	// Topic partition's count.
	Count int32

	// TopicPartitionAssignments among kafka brokers for this topic partitions.
	TopicPartitionAssignments []TopicPartitionAssignment
}

type Transport added in v0.4.0

type Transport struct {
    // A function used to establish connections to the kafka cluster.
    Dial func(context.Context, string, string) (net.Conn, error)

    // Time limit set for establishing connections to the kafka cluster. This
    // limit includes all round trips done to establish the connections (TLS
    // handshake, SASL negotiation, etc...).
    //
    // Defaults to 5s.
    DialTimeout time.Duration

    // Maximum amount of time that connections will remain open and unused.
    // The transport will manage to automatically close connections that have
    // been idle for too long, and re-open them on demand when the transport is
    // used again.
    //
    // Defaults to 30s.
    IdleTimeout time.Duration

    // TTL for the metadata cached by this transport. Note that the value
    // configured here is an upper bound, the transport randomizes the TTLs to
    // avoid getting into states where multiple clients end up synchronized and
    // cause bursts of requests to the kafka broker.
    //
    // Defaults to 6s.
    MetadataTTL time.Duration

    // Topic names for the metadata cached by this transport. If this field is left blank,
    // metadata information of all topics in the cluster will be retrieved.
    MetadataTopics []string

    // Unique identifier that the transport communicates to the brokers when it
    // sends requests.
    ClientID string

    // An optional configuration for TLS connections established by this
    // transport.
    //
    // If the Server
    TLS *tls.Config

    // SASL configures the Transport to use SASL authentication.
    SASL sasl.Mechanism

    // An optional resolver used to translate broker host names into network
    // addresses.
    //
    // The resolver will be called for every request (not every connection),
    // making it possible to implement ACL policies by validating that the
    // program is allowed to connect to the kafka broker. This also means that
    // the resolver should probably provide a caching layer to avoid storming
    // the service discovery backend with requests.
    //
    // When set, the Dial function is not responsible for performing name
    // resolution, and is always called with a pre-resolved address.
    Resolver BrokerResolver

    // The background context used to control goroutines started internally by
    // the transport.
    //
    // If nil, context.Background() is used instead.
    Context context.Context
    // contains filtered or unexported fields
}

Transport is an implementation of the RoundTripper interface.

Transport values manage a pool of connections and automatically discover the cluster's layout to route requests to the appropriate brokers.

Transport values are safe to use concurrently from multiple goroutines.

Note: The intent is for the Transport to become the underlying layer of the kafka.Reader and kafka.Writer types.

func (*Transport) CloseIdleConnections added in v0.4.0

func (t *Transport) CloseIdleConnections()

CloseIdleConnections closes all idle connections immediately, and marks all connections that are in use to be closed when they become idle again.

func (*Transport) RoundTrip added in v0.4.0

func (t *Transport) RoundTrip(ctx context.Context, addr net.Addr, req Request) (Response, error)

RoundTrip sends a request to a kafka cluster and returns the response, or an error if no responses were received.

Message types are available in sub-packages of the protocol package. Each kafka API is implemented in a different sub-package. For example, the request and response types for the Fetch API are available in the protocol/fetch package.

The type of the response message will match the type of the request. For example, if RoundTrip was called with a *fetch.Request as argument, the value returned will be of type *fetch.Response. It is safe for the program to do a type assertion after checking that no error was returned.

This example illustrates the way this method is expected to be used:

r, err := transport.RoundTrip(ctx, addr, &fetch.Request{ ... })
if err != nil {
    ...
} else {
    res := r.(*fetch.Response)
    ...
}

The transport automatically selects the highest version of the API that is supported by both the kafka-go package and the kafka broker. The negotiation happens transparently once when connections are established.

This API was introduced in version 0.4 as a way to leverage the lower-level features of the kafka protocol, but also provide a more efficient way of managing connections to kafka brokers.

type TxnOffsetCommit added in v0.4.20

type TxnOffsetCommit struct {
    Partition int
    Offset    int64
    Metadata  string
}

TxnOffsetCommit represents the commit of an offset to a partition within a transaction.

The extra metadata is opaque to the kafka protocol. It is intended to hold information like an identifier for the process that committed the offset, or the time at which the commit was made.

type TxnOffsetCommitPartition added in v0.4.20

type TxnOffsetCommitPartition struct {
    // ID of the partition.
    Partition int

    // An error that may have occurred while attempting to publish consumer
    // group offsets for this partition.
    //
    // The error contains both the kafka error code, and an error message
    // returned by the kafka broker. Programs may use the standard errors.Is
    // function to test the error against kafka error codes.
    Error error
}

TxnOffsetCommitPartition represents the state of a single partition in responses to committing offsets within a transaction.

type TxnOffsetCommitRequest added in v0.4.20

type TxnOffsetCommitRequest struct {
    // Address of the kafka broker to send the request to.
    Addr net.Addr

    // The transactional id key.
    TransactionalID string

    // ID of the consumer group to publish the offsets for.
    GroupID string

    // The Producer ID (PID) for the current producer session;
    // received from an InitProducerID request.
    ProducerID int

    // The epoch associated with the current producer session for the given PID
    ProducerEpoch int

    // GenerationID is the current generation for the group.
    GenerationID int

    // ID of the group member submitting the offsets.
    MemberID string

    // GroupInstanceID is a unique identifier for the consumer.
    GroupInstanceID string

    // Set of topic partitions to publish the offsets for.
    //
    // Note that offset commits need to be submitted to the broker acting as the
    // group coordinator. This will be automatically resolved by the transport.
    Topics map[string][]TxnOffsetCommit
}

TxnOffsetCommitRequest represents a request sent to a kafka broker to commit offsets for a partition within a transaction.

type TxnOffsetCommitResponse added in v0.4.20

type TxnOffsetCommitResponse struct {
    // The amount of time that the broker throttled the request.
    Throttle time.Duration

    // Set of topic partitions that the kafka broker has accepted offset commits
    // for.
    Topics map[string][]TxnOffsetCommitPartition
}

TxnOffsetCommitResponse represents a response from a kafka broker to an offset commit request within a transaction.

type UserScramCredentialsDeletion added in v0.4.43

type UserScramCredentialsDeletion struct {
    Name      string
    Mechanism ScramMechanism
}

type UserScramCredentialsUpsertion added in v0.4.43

type UserScramCredentialsUpsertion struct {
    Name           string
    Mechanism      ScramMechanism
    Iterations     int
    Salt           []byte
    SaltedPassword []byte
}

type UserScramCredentialsUser added in v0.4.43

type UserScramCredentialsUser struct {
    Name string
}

type Version added in v0.4.0

type Version int16

Version represents a version number for kafka APIs.

func (Version) Marshal added in v0.4.0

func (n Version) Marshal(v interface{}) ([]byte, error)

Marshal is like the top-level Marshal function, but will only encode struct fields for which n falls within the min and max versions specified on the struct tag.

func (Version) Unmarshal added in v0.4.0

func (n Version) Unmarshal(b []byte, v interface{}) error

Unmarshal is like the top-level Unmarshal function, but will only decode struct fields for which n falls within the min and max versions specified on the struct tag.
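
The version gating described for Marshal and Unmarshal can be illustrated with a small, self-contained sketch. It does not call kafka-go. The request type, the `kafka:"min=vN,max=vM"` tag syntax, and the helpers parseRange and fieldsForVersion are all assumptions made for illustration; the sketch only reports which fields would be selected for a given version n.

```go
package main

import (
	"fmt"
	"reflect"
	"strconv"
	"strings"
)

// request is a hypothetical message type. The tag syntax is an assumption
// made for this sketch, not a statement about kafka-go's tag format.
type request struct {
	Topic   string `kafka:"min=v0,max=v3"`
	Leader  int32  `kafka:"min=v1,max=v3"`
	Cluster string `kafka:"min=v2,max=v3"`
}

// parseRange extracts the inclusive version range from a tag value.
func parseRange(tag string) (lo, hi int, err error) {
	for _, part := range strings.Split(tag, ",") {
		switch {
		case strings.HasPrefix(part, "min=v"):
			lo, err = strconv.Atoi(strings.TrimPrefix(part, "min=v"))
		case strings.HasPrefix(part, "max=v"):
			hi, err = strconv.Atoi(strings.TrimPrefix(part, "max=v"))
		}
		if err != nil {
			return 0, 0, err
		}
	}
	return lo, hi, nil
}

// fieldsForVersion returns the names of the struct fields whose version
// range contains n, in declaration order.
func fieldsForVersion(v interface{}, n int) []string {
	t := reflect.TypeOf(v)
	var names []string
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		lo, hi, err := parseRange(f.Tag.Get("kafka"))
		if err != nil {
			continue // skip fields with malformed tags
		}
		if n >= lo && n <= hi {
			names = append(names, f.Name)
		}
	}
	return names
}

func main() {
	for n := 0; n <= 2; n++ {
		fmt.Println(n, fieldsForVersion(request{}, n))
	}
}
```

A real encoder would write or read the selected fields rather than list their names; the selection step is the part this sketch is meant to show.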

type WriteErrors added in v0.4.1

type WriteErrors []error

WriteErrors is returned by kafka.(*Writer).WriteMessages when the writer is not configured to write messages asynchronously. WriteErrors values contain a list of errors where each entry matches the position of a message in the WriteMessages call. The program can determine the status of each message by looping over the error:

switch err := w.WriteMessages(ctx, msgs...).(type) {
case nil:
case kafka.WriteErrors:
    for i := range msgs {
        if err[i] != nil {
            // handle the error writing msgs[i]
            ...
        }
    }
default:
    // handle other errors
    ...
}

func (WriteErrors) Count added in v0.4.1

func (err WriteErrors) Count() int

Count counts the number of non-nil errors in err.

func (WriteErrors) Error added in v0.4.1

func (err WriteErrors) Error() string

type Writer

type Writer struct {
    // Address of the kafka cluster that this writer is configured to send
    // messages to.
    //
    // This field is required, attempting to write messages to a writer with a
    // nil address will error.
    Addr net.Addr

    // Topic is the name of the topic that the writer will produce messages to.
    //
    // Setting this field or not is a mutually exclusive option. If you set Topic
    // here, you must not set Topic for any produced Message. Otherwise, if you do
    // not set Topic, every Message must have Topic specified.
    Topic string

    // The balancer used to distribute messages across partitions.
    //
    // The default is to use a round-robin distribution.
    Balancer Balancer

    // Limit on how many attempts will be made to deliver a message.
    //
    // The default is to try at most 10 times.
    MaxAttempts int

    // WriteBackoffMin optionally sets the smallest amount of time the writer waits before
    // it attempts to write a batch of messages
    //
    // Default: 100ms
    WriteBackoffMin time.Duration

    // WriteBackoffMax optionally sets the maximum amount of time the writer waits before
    // it attempts to write a batch of messages
    //
    // Default: 1s
    WriteBackoffMax time.Duration

    // Limit on how many messages will be buffered before being sent to a
    // partition.
    //
    // The default is to use a target batch size of 100 messages.
    BatchSize int

    // Limit the maximum size of a request in bytes before being sent to
    // a partition.
    //
    // The default is to use a kafka default value of 1048576.
    BatchBytes int64

    // Time limit on how often incomplete message batches will be flushed to
    // kafka.
    //
    // The default is to flush at least every second.
    BatchTimeout time.Duration

    // Timeout for read operations performed by the Writer.
    //
    // Defaults to 10 seconds.
    ReadTimeout time.Duration

    // Timeout for write operation performed by the Writer.
    //
    // Defaults to 10 seconds.
    WriteTimeout time.Duration

    // Number of acknowledges from partition replicas required before receiving
    // a response to a produce request, the following values are supported:
    //
    //  RequireNone (0)  fire-and-forget, do not wait for acknowledgements from the
    //  RequireOne  (1)  wait for the leader to acknowledge the writes
    //  RequireAll  (-1) wait for the full ISR to acknowledge the writes
    //
    // Defaults to RequireNone.
    RequiredAcks RequiredAcks

    // Setting this flag to true causes the WriteMessages method to never block.
    // It also means that errors are ignored since the caller will not receive
    // the returned value. Use this only if you don't care about guarantees of
    // whether the messages were written to kafka.
    //
    // Defaults to false.
    Async bool

    // An optional function called when the writer succeeds or fails the
    // delivery of messages to a kafka partition. When writing the messages
    // fails, the `err` parameter will be non-nil.
    //
    // The messages that the Completion function is called with have their
    // topic, partition, offset, and time set based on the Produce responses
    // received from kafka. All messages passed to a call to the function have
    // been written to the same partition. The keys and values of messages are
    // referencing the original byte slices carried by messages in the calls to
    // WriteMessages.
    //
    // The function is called from goroutines started by the writer. Calls to
    // Close will block on the Completion function calls. When the Writer is
    // not writing asynchronously, the WriteMessages call will also block on
    // Completion function, which is a useful guarantee if the byte slices
    // for the message keys and values are intended to be reused after the
    // WriteMessages call returned.
    //
    // If a completion function panics, the program terminates because the
    // panic is not recovered by the writer and bubbles up to the top of the
    // goroutine's call stack.
    Completion func(messages []Message, err error)

    // Compression set the compression codec to be used to compress messages.
    Compression Compression

    // If not nil, specifies a logger used to report internal changes within the
    // writer.
    Logger Logger

    // ErrorLogger is the logger used to report errors. If nil, the writer falls
    // back to using Logger instead.
    ErrorLogger Logger

    // A transport used to send messages to kafka clusters.
    //
    // If nil, DefaultTransport is used.
    Transport RoundTripper

    // AllowAutoTopicCreation notifies writer to create topic if missing.
    AllowAutoTopicCreation bool
    // contains filtered or unexported fields
}

The Writer type provides the implementation of a producer of kafka messages that automatically distributes messages across partitions of a single topic using a configurable balancing policy.

Writers manage the dispatch of messages across partitions of the topic they are configured to write to using a Balancer, and aggregate batches to optimize the writes to kafka.

Writers may be configured to be used synchronously or asynchronously. When used synchronously, calls to WriteMessages block until the messages have been written to kafka. In this mode, the program should inspect the error returned by the function and test if it is an instance of kafka.WriteErrors in order to identify which messages have succeeded or failed, for example:

// Construct a synchronous writer (the default mode).
w := &kafka.Writer{
    Addr:         kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
    Topic:        "topic-A",
    RequiredAcks: kafka.RequireAll,
}

...

// Passing a context can prevent the operation from blocking indefinitely.
switch err := w.WriteMessages(ctx, msgs...).(type) {
case nil:
case kafka.WriteErrors:
    for i := range msgs {
        if err[i] != nil {
            // handle the error writing msgs[i]
            ...
        }
    }
default:
    // handle other errors
    ...
}

In asynchronous mode, the program may configure a completion handler on the writer to receive notifications of messages being written to kafka:

w := &kafka.Writer{
    Addr:         kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
    Topic:        "topic-A",
    RequiredAcks: kafka.RequireAll,
    Async:        true, // make the writer asynchronous
    Completion: func(messages []kafka.Message, err error) {
        ...
    },
}

...

// Because the writer is asynchronous, there is no need for the context to
// be cancelled, the call will never block.
if err := w.WriteMessages(context.Background(), msgs...); err != nil {
    // Only validation errors would be reported in this case.
    ...
}

Methods of Writer are safe to use concurrently from multiple goroutines; however, the writer configuration should not be modified after first use.

Example

w := &kafka.Writer{
    Addr:  kafka.TCP("localhost:9092"),
    Topic: "Topic-1",
}

w.WriteMessages(context.Background(),
    kafka.Message{
        Key:   []byte("Key-A"),
        Value: []byte("Hello World!"),
    },
)

w.Close()

func NewWriter

func NewWriter(config WriterConfig) *Writer

NewWriter creates and returns a new Writer configured with config.

DEPRECATED: Writer values can be instantiated and configured directly. This function is retained for backward compatibility and will be removed in version 1.0.

func (*Writer) Close

func (w *Writer) Close() error

Close flushes pending writes, and waits for all writes to complete before returning. Calling Close also prevents new writes from being submitted to the writer; further calls to WriteMessages and the like will fail with io.ErrClosedPipe.

func (*Writer) Stats

func (w *Writer) Stats() WriterStats

Stats returns a snapshot of the writer stats since the last time the method was called, or since the writer was created if it is called for the first time.

A typical use of this method is to spawn a goroutine that will periodically call Stats on a kafka writer and report the metrics to a stats collection system.
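
The reporting pattern described above can be sketched without a broker. In the example below, source is a stand-in for a writer and stats for its snapshot type; both, along with the report helper and the emit callback, are assumptions of this sketch. The Stats method resets its counter on every call to mimic the "since the last time the method was called" behavior.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// stats is a stand-in for the snapshot type returned by a writer.
type stats struct {
	Messages int64
}

// source is a stand-in for a writer that counts messages.
type source struct {
	messages int64
}

// record adds n to the running message counter.
func (s *source) record(n int64) { atomic.AddInt64(&s.messages, n) }

// Stats returns the count accumulated since the previous call and resets it.
func (s *source) Stats() stats {
	return stats{Messages: atomic.SwapInt64(&s.messages, 0)}
}

// report polls Stats at the given interval until done is closed, handing
// each snapshot to emit.
func report(s *source, interval time.Duration, done <-chan struct{}, emit func(stats)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			emit(s.Stats())
		case <-done:
			return
		}
	}
}

func main() {
	src := &source{}
	done := make(chan struct{})
	finished := make(chan struct{})

	go func() {
		defer close(finished)
		report(src, 10*time.Millisecond, done, func(st stats) {
			fmt.Println("messages since last report:", st.Messages)
		})
	}()

	src.record(3)
	time.Sleep(35 * time.Millisecond)
	close(done)
	<-finished // wait for the reporter goroutine to exit
}
```

Because each snapshot covers only the interval since the previous call, counter values can be forwarded to a metrics system as increments without further bookkeeping.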

func (*Writer) WriteMessages

func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error

WriteMessages writes a batch of messages to the kafka topic configured on this writer.

Unless the writer was configured to write messages asynchronously, the method blocks until all messages have been written, or until the maximum number of attempts was reached.

When sending synchronously and the writer's batch size is configured to be greater than 1, this method blocks until either a full batch can be assembled or the batch timeout is reached. The batch size and timeouts are evaluated per partition, so the choice of Balancer can also influence the flushing behavior. For example, the Hash balancer will require on average N * batch size messages to trigger a flush where N is the number of partitions. The best way to achieve good batching behavior is to share one Writer amongst multiple goroutines.
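
To make the per-partition arithmetic concrete, here is a deliberately simplified model; it is not the writer's implementation. It assumes messages are spread round-robin over the partitions, ignores the batch timeout, and counts how many messages must be written before any single partition accumulates a full batch. The function name messagesUntilFirstFlush is hypothetical.

```go
package main

import "fmt"

// messagesUntilFirstFlush models per-partition batching: messages are
// assigned to partitions round-robin, and a partition flushes as soon as
// its buffer holds batchSize messages. It returns how many messages have
// been written in total when the first flush happens, or 0 for invalid
// arguments. The batch timeout is left out of this model.
func messagesUntilFirstFlush(partitions, batchSize int) int {
	if partitions < 1 || batchSize < 1 {
		return 0
	}
	buffered := make([]int, partitions)
	for written := 1; ; written++ {
		p := (written - 1) % partitions
		buffered[p]++
		if buffered[p] == batchSize {
			return written
		}
	}
}

func main() {
	for _, n := range []int{1, 3, 12} {
		fmt.Printf("partitions=%d batchSize=100: first flush after %d messages\n",
			n, messagesUntilFirstFlush(n, 100))
	}
}
```

In this model the count grows linearly with the number of partitions, which is why a single goroutine writing a few messages at a time will usually be released by the batch timeout rather than by a full batch.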

When the method returns an error, it may be of type kafka.WriteErrors to allow the caller to determine the status of each message.

The context passed as first argument may also be used to asynchronously cancel the operation. Note that in this case there are no guarantees made on whether messages were written to kafka. They might also still be written after this method has already returned, therefore it is important to not modify byte slices of passed messages if WriteMessages returned early due to a canceled context. The program should assume that the whole batch failed and re-write the messages later (which could then cause duplicates).

type WriterConfig

type WriterConfig struct {
    // The list of brokers used to discover the partitions available on the
    // kafka cluster.
    //
    // This field is required, attempting to create a writer with an empty list
    // of brokers will panic.
    Brokers []string

    // The topic that the writer will produce messages to.
    //
    // If provided, this will be used to set the topic for all produced messages.
    // If not provided, each Message must specify a topic for itself. This must be
    // mutually exclusive, otherwise the Writer will return an error.
    Topic string

    // The dialer used by the writer to establish connections to the kafka
    // cluster.
    //
    // If nil, the default dialer is used instead.
    Dialer *Dialer

    // The balancer used to distribute messages across partitions.
    //
    // The default is to use a round-robin distribution.
    Balancer Balancer

    // Limit on how many attempts will be made to deliver a message.
    //
    // The default is to try at most 10 times.
    MaxAttempts int

    // DEPRECATED: in versions prior to 0.4, the writer used channels internally
    // to dispatch messages to partitions. This has been replaced by an in-memory
    // aggregation of batches which uses shared state instead of message passing,
    // making this option unnecessary.
    QueueCapacity int

    // Limit on how many messages will be buffered before being sent to a
    // partition.
    //
    // The default is to use a target batch size of 100 messages.
    BatchSize int

    // Limit the maximum size of a request in bytes before being sent to
    // a partition.
    //
    // The default is to use a kafka default value of 1048576.
    BatchBytes int

    // Time limit on how often incomplete message batches will be flushed to
    // kafka.
    //
    // The default is to flush at least every second.
    BatchTimeout time.Duration

    // Timeout for read operations performed by the Writer.
    //
    // Defaults to 10 seconds.
    ReadTimeout time.Duration

    // Timeout for write operation performed by the Writer.
    //
    // Defaults to 10 seconds.
    WriteTimeout time.Duration

    // DEPRECATED: in versions prior to 0.4, the writer used to maintain a cache
    // of the topic layout. With the change to use a transport to manage connections,
    // the responsibility of syncing the cluster layout has been delegated to the
    // transport.
    RebalanceInterval time.Duration

    // DEPRECATED: in versions prior to 0.4, the writer used to manage connections
    // to the kafka cluster directly. With the change to use a transport to manage
    // connections, the writer has no connections to manage directly anymore.
    IdleConnTimeout time.Duration

    // Number of acknowledges from partition replicas required before receiving
    // a response to a produce request. The default is -1, which means to wait for
    // all replicas, and a value above 0 is required to indicate how many replicas
    // should acknowledge a message to be considered successful.
    RequiredAcks int

    // Setting this flag to true causes the WriteMessages method to never block.
    // It also means that errors are ignored since the caller will not receive
    // the returned value. Use this only if you don't care about guarantees of
    // whether the messages were written to kafka.
    Async bool

    // CompressionCodec set the codec to be used to compress Kafka messages.
    CompressionCodec

    // If not nil, specifies a logger used to report internal changes within the
    // writer.
    Logger Logger

    // ErrorLogger is the logger used to report errors. If nil, the writer falls
    // back to using Logger instead.
    ErrorLogger Logger
}

WriterConfig is a configuration type used to create new instances of Writer.

DEPRECATED: writer values should be configured directly by assigning their exported fields. This type is kept for backward compatibility, and will be removed in version 1.0.

func (*WriterConfig) Validate added in v0.2.3

func (config *WriterConfig) Validate() error

Validate method validates WriterConfig properties.

type WriterStats

type WriterStats struct {
    Writes   int64 `metric:"kafka.writer.write.count"     type:"counter"`
    Messages int64 `metric:"kafka.writer.message.count"   type:"counter"`
    Bytes    int64 `metric:"kafka.writer.message.bytes"   type:"counter"`
    Errors   int64 `metric:"kafka.writer.error.count"     type:"counter"`

    BatchTime      DurationStats `metric:"kafka.writer.batch.seconds"`
    BatchQueueTime DurationStats `metric:"kafka.writer.batch.queue.seconds"`
    WriteTime      DurationStats `metric:"kafka.writer.write.seconds"`
    WaitTime       DurationStats `metric:"kafka.writer.wait.seconds"`
    Retries        int64         `metric:"kafka.writer.retries.count" type:"counter"`
    BatchSize      SummaryStats  `metric:"kafka.writer.batch.size"`
    BatchBytes     SummaryStats  `metric:"kafka.writer.batch.bytes"`

    MaxAttempts     int64         `metric:"kafka.writer.attempts.max"  type:"gauge"`
    WriteBackoffMin time.Duration `metric:"kafka.writer.backoff.min"   type:"gauge"`
    WriteBackoffMax time.Duration `metric:"kafka.writer.backoff.max"   type:"gauge"`
    MaxBatchSize    int64         `metric:"kafka.writer.batch.max"     type:"gauge"`
    BatchTimeout    time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"`
    ReadTimeout     time.Duration `metric:"kafka.writer.read.timeout"  type:"gauge"`
    WriteTimeout    time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"`
    RequiredAcks    int64         `metric:"kafka.writer.acks.required" type:"gauge"`
    Async           bool          `metric:"kafka.writer.async"         type:"gauge"`

    Topic string `tag:"topic"`

    // DEPRECATED: these fields will only be reported for backward compatibility
    // if the Writer was constructed with NewWriter.
    Dials    int64         `metric:"kafka.writer.dial.count" type:"counter"`
    DialTime DurationStats `metric:"kafka.writer.dial.seconds"`

    // DEPRECATED: these fields were meaningful prior to kafka-go 0.4, changes
    // to the internal implementation and the introduction of the transport type
    // made them unnecessary.
    //
    // The values will be zero but are left for backward compatibility to avoid
    // breaking programs that used these fields.
    Rebalances        int64
    RebalanceInterval time.Duration
    QueueLength       int64
    QueueCapacity     int64
    ClientID          string
}

WriterStats is a data structure returned by a call to Writer.Stats that exposes details about the behavior of the writer.

Directories

Path Synopsis
zstd
Package zstd implements Zstandard compression.
Package gzip does nothing, it's kept for backward compatibility to avoid breaking the majority of programs that imported it to install the compression codec, which is now always included.
Package lz4 does nothing, it's kept for backward compatibility to avoid breaking the majority of programs that imported it to install the compression codec, which is now always included.
Package snappy does nothing, it's kept for backward compatibility to avoid breaking the majority of programs that imported it to install the compression codec, which is now always included.
Package topics is an experimental package that provides additional tooling around Kafka Topics.
Package zstd does nothing, it's kept for backward compatibility to avoid breaking the majority of programs that imported it to install the compression codec, which is now always included.
