
confluentinc/confluent-kafka-dotnet

 
 


Confluent's .NET Client for Apache Kafka™


confluent-kafka-dotnet is Confluent's .NET client for Apache Kafka and the Confluent Platform.

Features:

  • High performance - confluent-kafka-dotnet is a lightweight wrapper around librdkafka, a finely tuned C client.

  • Reliability - There are a lot of details to get right when writing an Apache Kafka client. We get them right in one place (librdkafka) and leverage this work across all of our clients (also confluent-kafka-python and confluent-kafka-go).

  • Supported - Commercial support is offered by Confluent.

  • Future proof - Confluent, founded by the original creator/co-creator of Kafka, is building a streaming platform with Apache Kafka at its core. It's high priority for us that client features keep pace with core Apache Kafka and components of the Confluent Platform.

confluent-kafka-dotnet is derived from Andreas Heider's rdkafka-dotnet. We're fans of his work and were very happy to have been able to leverage rdkafka-dotnet as the basis of this client. Thanks Andreas!

Referencing

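confluent-kafka-dotnet is distributed via NuGet. The main packages are:

  • Confluent.Kafka - The core client library.

  • Confluent.SchemaRegistry.Serdes.Avro - Avro serializer and deserializer with Confluent Schema Registry integration.

  • Confluent.SchemaRegistry.Serdes.Protobuf - Protobuf serializer and deserializer with Confluent Schema Registry integration.

  • Confluent.SchemaRegistry.Serdes.Json - JSON serializer and deserializer with Confluent Schema Registry integration.

  • Confluent.SchemaRegistry - Confluent Schema Registry client.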

To install Confluent.Kafka from within Visual Studio, search for Confluent.Kafka in the NuGet Package Manager UI, or run the following command in the Package Manager Console:

Install-Package Confluent.Kafka -Version 2.11.0

To add a reference to a dotnet core project, execute the following at the command line:

dotnet add package -v 2.11.1 Confluent.Kafka

Note: Confluent.Kafka depends on the librdkafka.redist package, which provides a number of different builds of librdkafka that are compatible with common platforms. If you are on one of these platforms this will all work seamlessly (and you don't need to explicitly reference librdkafka.redist). If you are on a different platform, you may need to build librdkafka manually (or acquire it via other means) and load it using the Library.Load method.
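As a rough illustration of the Library.Load route, the sketch below loads a manually built librdkafka before any client is created. The path is a hypothetical placeholder; substitute the location of your own build.

```csharp
using System;
using Confluent.Kafka;

class Program
{
    public static void Main(string[] args)
    {
        // Hypothetical path: point this at your own librdkafka build.
        const string librdkafkaPath = "/usr/local/lib/librdkafka.so";

        // Call Library.Load before constructing any producer or consumer;
        // otherwise the bundled library is loaded automatically on first use.
        bool loaded = Library.Load(librdkafkaPath);

        Console.WriteLine($"Loaded by this call: {loaded}");
        Console.WriteLine($"librdkafka version: {Library.VersionString}");
    }
}
```

Library.Load returns false if librdkafka had already been loaded, so it is worth checking the result during startup.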

Branch builds

NuGet packages corresponding to all commits to release branches are available from the following NuGet package source (note: this is not a web URL; you should specify it in the NuGet package manager): https://ci.appveyor.com/nuget/confluent-kafka-dotnet. The version suffix of these NuGet packages matches the AppVeyor build number. You can see which commit a particular build number corresponds to by looking at the AppVeyor build history.

Usage

For a step-by-step guide and code samples, see Getting Started with Apache Kafka and .NET on Confluent Developer.

You can also take the free self-paced training course Apache Kafka for .NET Developers on Confluent Developer.

Take a look in the examples directory and at the integration tests for further examples.

For an overview of configuration properties, refer to the librdkafka documentation.

Basic Producer Examples

You should use the ProduceAsync method if you would like to wait for the result of your produce requests before proceeding. You might typically want to do this in highly concurrent scenarios, for example in the context of handling web requests. Behind the scenes, the client will manage optimizing communication with the Kafka brokers for you, batching requests as appropriate.

    using System;
    using System.Threading.Tasks;
    using Confluent.Kafka;

    class Program
    {
        public static async Task Main(string[] args)
        {
            var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

            // If serializers are not specified, default serializers from
            // `Confluent.Kafka.Serializers` will be automatically used where
            // available. Note: by default strings are encoded as UTF8.
            using (var p = new ProducerBuilder<Null, string>(config).Build())
            {
                try
                {
                    var dr = await p.ProduceAsync("test-topic", new Message<Null, string> { Value = "test" });
                    Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
                }
                catch (ProduceException<Null, string> e)
                {
                    Console.WriteLine($"Delivery failed: {e.Error.Reason}");
                }
            }
        }
    }

Note that a server round-trip is slow (3ms at a minimum; actual latency depends on many factors). In highly concurrent scenarios you will achieve high overall throughput out of the producer using the above approach, but there will be a delay on each await call. In stream processing applications, where you would like to process many messages in rapid succession, you would typically use the Produce method instead:

    using System;
    using Confluent.Kafka;

    class Program
    {
        public static void Main(string[] args)
        {
            var conf = new ProducerConfig { BootstrapServers = "localhost:9092" };

            Action<DeliveryReport<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}");

            using (var p = new ProducerBuilder<Null, string>(conf).Build())
            {
                for (int i = 0; i < 100; ++i)
                {
                    p.Produce("my-topic", new Message<Null, string> { Value = i.ToString() }, handler);
                }

                // wait for up to 10 seconds for any inflight messages to be delivered.
                p.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }

Basic Consumer Example

    using System;
    using System.Threading;
    using Confluent.Kafka;

    class Program
    {
        public static void Main(string[] args)
        {
            var conf = new ConsumerConfig
            {
                GroupId = "test-consumer-group",
                BootstrapServers = "localhost:9092",
                // Note: The AutoOffsetReset property determines the start offset in the event
                // there are not yet any committed offsets for the consumer group for the
                // topic/partitions of interest. By default, offsets are committed
                // automatically, so in this example, consumption will only start from the
                // earliest message in the topic 'my-topic' the first time you run the program.
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe("my-topic");

                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) =>
                {
                    // Prevent the process from terminating.
                    e.Cancel = true;
                    cts.Cancel();
                };

                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = c.Consume(cts.Token);
                            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occurred: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Ensure the consumer leaves the group cleanly and final offsets are committed.
                    c.Close();
                }
            }
        }
    }

IHostedService and Web Application Integration

The Web example demonstrates how to integrate Apache Kafka with a web application, including how to implement IHostedService to realize a long running consumer poll loop, how to register a producer as a singleton service, and how to bind configuration from an injected IConfiguration instance.
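The consumer poll loop part of that pattern might look roughly like the sketch below. It is not the code from the Web example: the class name ConsumerLoopService, the topic and the group id are hypothetical, the configuration is hard-coded rather than bound from IConfiguration, and it assumes the Microsoft.Extensions.Hosting package is referenced.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

// Hypothetical hosted service; BackgroundService implements IHostedService.
public class ConsumerLoopService : BackgroundService
{
    private readonly IConsumer<Ignore, string> consumer;

    public ConsumerLoopService()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "web-example-group",   // hypothetical group id
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        consumer = new ConsumerBuilder<Ignore, string>(config).Build();
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume blocks, so run the loop off the host's startup path.
        return Task.Run(() => ConsumeLoop(stoppingToken));
    }

    private void ConsumeLoop(CancellationToken stoppingToken)
    {
        consumer.Subscribe("my-topic");
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var cr = consumer.Consume(stoppingToken);
                    Console.WriteLine($"Consumed '{cr.Value}' at '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                    if (e.Error.IsFatal) break;
                }
            }
        }
        catch (OperationCanceledException)
        {
            // The host is shutting down.
        }
        finally
        {
            // Leave the group cleanly and commit final offsets.
            consumer.Close();
        }
    }

    public override void Dispose()
    {
        consumer.Dispose();
        base.Dispose();
    }
}
```

A service like this would typically be registered with services.AddHostedService<ConsumerLoopService>() during application startup.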

Exactly Once Processing

The .NET Client has full support for transactions and idempotent message production, allowing you to write horizontally scalable stream processing applications with exactly once semantics. The ExactlyOnce example demonstrates this capability by way of an implementation of the classic "word count" problem, also demonstrating how to use the FASTER Key/Value store (similar to RocksDb) to materialize working state that may be larger than available memory, and incremental rebalancing to avoid stop-the-world rebalancing operations and unnecessary reloading of state when you add or remove processing nodes.
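For orientation, a minimal sketch of the producer side of a transaction follows. It is far simpler than the ExactlyOnce example: it only writes two messages atomically and does not cover consuming, state stores or rebalancing. The transactional id, topic and timeouts are illustrative assumptions.

```csharp
using System;
using Confluent.Kafka;

class Program
{
    public static void Main(string[] args)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092",
            // Hypothetical id; it should be stable and unique per producer instance.
            TransactionalId = "example-transactional-id"
        };

        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            // Register the transactional id with the cluster before first use.
            p.InitTransactions(TimeSpan.FromSeconds(30));

            p.BeginTransaction();
            try
            {
                p.Produce("my-topic", new Message<Null, string> { Value = "first" });
                p.Produce("my-topic", new Message<Null, string> { Value = "second" });

                // Both messages become visible to read_committed consumers together.
                p.CommitTransaction(TimeSpan.FromSeconds(30));
            }
            catch (KafkaException e)
            {
                // Simplification: a fuller implementation would distinguish
                // retriable, abortable and fatal errors before aborting.
                Console.WriteLine($"Transaction failed: {e.Error.Reason}");
                p.AbortTransaction(TimeSpan.FromSeconds(30));
            }
        }
    }
}
```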

Schema Registry Integration

The three "Serdes" packages provide serializers and deserializers for Avro, Protobuf and JSON with Confluent Schema Registry integration. The Confluent.SchemaRegistry NuGet package provides a client for interfacing with Schema Registry's REST API.

Note: All three serialization formats are supported across Confluent Platform. They each make different tradeoffs, and you should use the one that best matches your requirements. Avro is well suited to the streaming data use-case, but the quality and maturity of the non-Java implementations lag those of the Java implementation, which is an important consideration. Protobuf and JSON both have great support in .NET.
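As a hedged sketch of how these pieces fit together, the program below produces a JSON-serialized value using the Confluent.SchemaRegistry.Serdes.Json package. The User type, the topic name and the Schema Registry address are assumptions made for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// Hypothetical message type used only for this illustration.
public class User
{
    public string Name { get; set; }
    public int FavoriteNumber { get; set; }
}

class Program
{
    public static async Task Main(string[] args)
    {
        var producerConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };

        // Assumed local Schema Registry address.
        var schemaRegistryConfig = new SchemaRegistryConfig { Url = "localhost:8081" };

        using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
        using (var producer = new ProducerBuilder<string, User>(producerConfig)
            // The serializer registers and looks up schemas through the client.
            .SetValueSerializer(new JsonSerializer<User>(schemaRegistry))
            .Build())
        {
            var dr = await producer.ProduceAsync("users", new Message<string, User>
            {
                Key = "alice",
                Value = new User { Name = "alice", FavoriteNumber = 7 }
            });
            Console.WriteLine($"Delivered to '{dr.TopicPartitionOffset}'");
        }
    }
}
```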

Error Handling

Errors delivered to a client's error handler should be considered informational except when the IsFatal flag is set to true, indicating that the client is in an un-recoverable state. Currently, this can only happen on the producer, and only when enable.idempotence has been set to true. In all other scenarios, clients will attempt to recover from all errors automatically.

Although calling most methods on the clients will result in a fatal error if the client is in an un-recoverable state, you should generally only need to explicitly check for fatal errors in your error handler, and handle this scenario there.
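A minimal sketch of such an error handler on a producer is shown below. What to do on a fatal error is application specific; the logging here is only a placeholder.

```csharp
using System;
using Confluent.Kafka;

class Program
{
    public static void Main(string[] args)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092",
            EnableIdempotence = true
        };

        using (var p = new ProducerBuilder<Null, string>(config)
            .SetErrorHandler((producer, error) =>
            {
                if (error.IsFatal)
                {
                    // The client cannot recover; an application would typically
                    // stop producing and recreate the client or shut down.
                    Console.WriteLine($"Fatal error: {error.Reason}");
                }
                else
                {
                    // Informational only; the client attempts to recover by itself.
                    Console.WriteLine($"Error: {error.Reason}");
                }
            })
            .Build())
        {
            p.Produce("my-topic", new Message<Null, string> { Value = "test" });
            p.Flush(TimeSpan.FromSeconds(10));
        }
    }
}
```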

Producer

When using Produce, to determine whether a particular message has been successfully delivered to a cluster, check the Error field of the DeliveryReport during the delivery handler callback.

When using ProduceAsync, any delivery result other than NoError will cause the returned Task to be in the faulted state, with the Task.Exception property containing a ProduceException that carries information about the message and error via the DeliveryResult and Error fields. Note: if you await the call, this means a ProduceException will be thrown.
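If you do not await the call, the faulted Task can be inspected in a continuation. The sketch below assumes the standard .NET behaviour that Task.Exception is an AggregateException, so the ProduceException is read from its InnerException.

```csharp
using System;
using Confluent.Kafka;

class Program
{
    public static void Main(string[] args)
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            var continuation = p
                .ProduceAsync("my-topic", new Message<Null, string> { Value = "test" })
                .ContinueWith(t =>
                {
                    if (t.IsFaulted)
                    {
                        // Unwrap the ProduceException from the AggregateException.
                        var ex = t.Exception.InnerException as ProduceException<Null, string>;
                        Console.WriteLine(
                            $"Delivery failed: {ex?.Error.Reason} (value: '{ex?.DeliveryResult.Value}')");
                    }
                    else
                    {
                        Console.WriteLine($"Delivered to '{t.Result.TopicPartitionOffset}'");
                    }
                });

            // Block until the delivery outcome is known before disposing the producer.
            continuation.Wait();
        }
    }
}
```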

Consumer

All Consume errors will result in a ConsumeException with further information about the error and context available via the Error and ConsumeResult fields.

3rd Party

There are numerous libraries that expand on the capabilities provided by Confluent.Kafka, or use Confluent.Kafka to integrate with Kafka. For more information, refer to the 3rd Party Libraries page.

Confluent Cloud

For a step-by-step guide on using the .NET client with Confluent Cloud see Getting Started with Apache Kafka and .NET on Confluent Developer.

You can also refer to the Confluent Cloud example which demonstrates how to configure the .NET client for use with Confluent Cloud.
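As a rough idea of what such a configuration involves, the sketch below sets the SASL/SSL properties on a ProducerConfig. It is not the Confluent Cloud example itself, and the environment variable names are hypothetical; use whatever mechanism you prefer to supply the bootstrap servers, API key and API secret.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class Program
{
    public static async Task Main(string[] args)
    {
        var config = new ProducerConfig
        {
            // Hypothetical environment variable names holding cluster details.
            BootstrapServers = Environment.GetEnvironmentVariable("CCLOUD_BOOTSTRAP_SERVERS"),
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = Environment.GetEnvironmentVariable("CCLOUD_API_KEY"),
            SaslPassword = Environment.GetEnvironmentVariable("CCLOUD_API_SECRET")
        };

        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            try
            {
                var dr = await p.ProduceAsync("test-topic", new Message<Null, string> { Value = "test" });
                Console.WriteLine($"Delivered to '{dr.TopicPartitionOffset}'");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }
}
```

The same security properties are available on ConsumerConfig, since both derive from the shared client configuration.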

Developer Notes

Instructions on building and testing confluent-kafka-dotnet can be found here.

Copyright (c) 2016-2019 Confluent Inc., 2015-2016 Andreas Heider

KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use by confluent-kafka-dotnet. confluent-kafka-dotnet has no affiliation with and is not endorsed by The Apache Software Foundation.

