confluent-kafka-dotnet is Confluent's .NET client for Apache Kafka and the Confluent Platform.
Features:
- High performance - confluent-kafka-dotnet is a lightweight wrapper around librdkafka, a finely tuned C client.
- Reliability - There are a lot of details to get right when writing an Apache Kafka client. We get them right in one place (librdkafka) and leverage this work across all of our clients (also confluent-kafka-python and confluent-kafka-go).
- Supported - Commercial support is offered by Confluent.
- Future proof - Confluent, founded by the original creator/co-creator of Kafka, is building a streaming platform with Apache Kafka at its core. It's a high priority for us that client features keep pace with core Apache Kafka and components of the Confluent Platform.
confluent-kafka-dotnet is derived from Andreas Heider's rdkafka-dotnet. We're fans of his work and were very happy to have been able to leverage rdkafka-dotnet as the basis of this client. Thanks Andreas!
confluent-kafka-dotnet is distributed via NuGet. We provide the following packages:
- Confluent.Kafka [netstandard2.0, net462, net6.0, net8.0] - The core client library.
- Confluent.SchemaRegistry.Serdes.Avro [netstandard2.0, net6.0, net8.0] - Provides a serializer and deserializer for working with Avro serialized data with Confluent Schema Registry integration.
- Confluent.SchemaRegistry.Serdes.Protobuf [netstandard2.0, net6.0, net8.0] - Provides a serializer and deserializer for working with Protobuf serialized data with Confluent Schema Registry integration.
- Confluent.SchemaRegistry.Serdes.Json [netstandard2.0, net6.0, net8.0] - Provides a serializer and deserializer for working with JSON serialized data with Confluent Schema Registry integration.
- Confluent.SchemaRegistry [netstandard2.0, net6.0, net8.0] - Confluent Schema Registry client (a dependency of the Confluent.SchemaRegistry.Serdes packages).
- Confluent.SchemaRegistry.Encryption [net6.0, net8.0] - Confluent Schema Registry client-side field-level encryption client (a dependency of the other Confluent.SchemaRegistry.Encryption.* packages).
- Confluent.SchemaRegistry.Encryption.Aws [net6.0, net8.0] - Confluent Schema Registry client-side field-level encryption client for AWS KMS.
- Confluent.SchemaRegistry.Encryption.Azure [net6.0, net8.0] - Confluent Schema Registry client-side field-level encryption client for Azure Key Vault.
- Confluent.SchemaRegistry.Encryption.Gcp [net6.0, net8.0] - Confluent Schema Registry client-side field-level encryption client for Google Cloud KMS.
- Confluent.SchemaRegistry.Encryption.HcVault [net6.0, net8.0] - Confluent Schema Registry client-side field-level encryption client for HashiCorp Vault.
- Confluent.SchemaRegistry.Rules [net6.0, net8.0] - Confluent Schema Registry client-side support for data quality rules (via the Common Expression Language) and schema migration rules (via JSONata).
To install Confluent.Kafka from within Visual Studio, search for Confluent.Kafka in the NuGet Package Manager UI, or run the following command in the Package Manager Console:
```
Install-Package Confluent.Kafka -Version 2.11.0
```

To add a reference to a .NET Core project, execute the following at the command line:

```
dotnet add package -v 2.11.1 Confluent.Kafka
```

Note: Confluent.Kafka depends on the librdkafka.redist package which provides a number of different builds of librdkafka that are compatible with common platforms. If you are on one of these platforms this will all work seamlessly (and you don't need to explicitly reference librdkafka.redist). If you are on a different platform, you may need to build librdkafka manually (or acquire it via other means) and load it using the Library.Load method.
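In that case, a manually built librdkafka can be loaded from an explicit path before any clients are constructed. A minimal sketch (the library path below is hypothetical and platform-specific):

```csharp
using Confluent.Kafka;

class Startup
{
    public static void Init()
    {
        // Load a manually built librdkafka before creating any producers
        // or consumers. The path is only illustrative - substitute the
        // location of your own build.
        Library.Load("/usr/local/lib/librdkafka.so");
    }
}
```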
NuGet packages corresponding to all commits to release branches are available from the following NuGet package source (note: this is not a web URL - you should specify it in the NuGet package manager): https://ci.appveyor.com/nuget/confluent-kafka-dotnet. The version suffix of these NuGet packages matches the AppVeyor build number. You can see which commit a particular build number corresponds to by looking at the AppVeyor build history.
For a step-by-step guide and code samples, see Getting Started with Apache Kafka and .NET on Confluent Developer.

You can also take the free self-paced training course Apache Kafka for .NET Developers on Confluent Developer.

Take a look in the examples directory and at the integration tests for further examples.

For an overview of configuration properties, refer to the librdkafka documentation.
You should use the ProduceAsync method if you would like to wait for the result of your produce requests before proceeding. You might typically want to do this in highly concurrent scenarios, for example in the context of handling web requests. Behind the scenes, the client will manage optimizing communication with the Kafka brokers for you, batching requests as appropriate.
```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class Program
{
    public static async Task Main(string[] args)
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        // If serializers are not specified, default serializers from
        // `Confluent.Kafka.Serializers` will be automatically used where
        // available. Note: by default strings are encoded as UTF8.
        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            try
            {
                var dr = await p.ProduceAsync("test-topic", new Message<Null, string> { Value = "test" });
                Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }
}
```
Note that a server round-trip is slow (3ms at a minimum; actual latency depends on many factors). In highly concurrent scenarios you will achieve high overall throughput out of the producer using the above approach, but there will be a delay on each await call. In stream processing applications, where you would like to process many messages in rapid succession, you would typically use the Produce method instead:
```csharp
using System;
using Confluent.Kafka;

class Program
{
    public static void Main(string[] args)
    {
        var conf = new ProducerConfig { BootstrapServers = "localhost:9092" };

        Action<DeliveryReport<Null, string>> handler = r =>
            Console.WriteLine(!r.Error.IsError
                ? $"Delivered message to {r.TopicPartitionOffset}"
                : $"Delivery Error: {r.Error.Reason}");

        using (var p = new ProducerBuilder<Null, string>(conf).Build())
        {
            for (int i = 0; i < 100; ++i)
            {
                p.Produce("my-topic", new Message<Null, string> { Value = i.ToString() }, handler);
            }

            // wait for up to 10 seconds for any inflight messages to be delivered.
            p.Flush(TimeSpan.FromSeconds(10));
        }
    }
}
```
```csharp
using System;
using System.Threading;
using Confluent.Kafka;

class Program
{
    public static void Main(string[] args)
    {
        var conf = new ConsumerConfig
        {
            GroupId = "test-consumer-group",
            BootstrapServers = "localhost:9092",
            // Note: The AutoOffsetReset property determines the start offset in the event
            // there are not yet any committed offsets for the consumer group for the
            // topic/partitions of interest. By default, offsets are committed
            // automatically, so in this example, consumption will only start from the
            // earliest message in the topic 'my-topic' the first time you run the program.
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe("my-topic");

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) =>
            {
                // Prevent the process from terminating.
                e.Cancel = true;
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occurred: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }
}
```
The Web example demonstrates how to integrate Apache Kafka with a web application, including how to implement IHostedService to realize a long-running consumer poll loop, how to register a producer as a singleton service, and how to bind configuration from an injected IConfiguration instance.
The .NET client has full support for transactions and idempotent message production, allowing you to write horizontally scalable stream processing applications with exactly once semantics. The ExactlyOnce example demonstrates this capability by way of an implementation of the classic "word count" problem, also demonstrating how to use the FASTER Key/Value store (similar to RocksDB) to materialize working state that may be larger than available memory, and incremental rebalancing to avoid stop-the-world rebalancing operations and unnecessary reloading of state when you add or remove processing nodes.
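As a rough illustration of the transactional producer API (the broker address, topic name, and transactional id below are placeholders; see the ExactlyOnce example for a complete treatment):

```csharp
using System;
using Confluent.Kafka;

class TransactionalProduce
{
    public static void Main(string[] args)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092",
            // A stable id identifying this producer instance across restarts,
            // used by the brokers to fence zombie producers.
            TransactionalId = "word-count-demo-1"
        };

        using (var producer = new ProducerBuilder<Null, string>(config).Build())
        {
            producer.InitTransactions(TimeSpan.FromSeconds(10));
            producer.BeginTransaction();
            try
            {
                producer.Produce("output-topic", new Message<Null, string> { Value = "counted" });
                // All messages produced since BeginTransaction become visible
                // to read_committed consumers atomically on commit.
                producer.CommitTransaction();
            }
            catch (Exception)
            {
                producer.AbortTransaction();
                throw;
            }
        }
    }
}
```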
The three "Serdes" packages provide serializers and deserializers for Avro, Protobuf and JSON with Confluent Schema Registry integration. The Confluent.SchemaRegistry NuGet package provides a client for interfacing with Schema Registry's REST API.
Note: All three serialization formats are supported across Confluent Platform. They each make different tradeoffs, and you should use the one that best matches your requirements. Avro is well suited to the streaming data use-case, but the quality and maturity of the non-Java implementations lags that of Java - this is an important consideration. Protobuf and JSON both have great support in .NET.
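For example, a producer can be wired up with an Avro serializer roughly as follows (the Schema Registry URL, topic name, and the Avro-generated User class are illustrative placeholders):

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

class AvroProduce
{
    public static async Task Main(string[] args)
    {
        var schemaRegistryConfig = new SchemaRegistryConfig { Url = "http://localhost:8081" };
        var producerConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
        using (var producer = new ProducerBuilder<string, User>(producerConfig)
            // Registers/looks up the value schema in Schema Registry and
            // prefixes each serialized value with its schema id.
            .SetValueSerializer(new AvroSerializer<User>(schemaRegistry))
            .Build())
        {
            var user = new User { Name = "alice", FavoriteNumber = 42 };
            await producer.ProduceAsync("users", new Message<string, User> { Key = user.Name, Value = user });
        }
    }
}
```

Here, User stands in for a class generated from an Avro schema (e.g. with the avrogen tool).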
Errors delivered to a client's error handler should be considered informational except when the IsFatal flag is set to true, indicating that the client is in an unrecoverable state. Currently, this can only happen on the producer, and only when enable.idempotence has been set to true. In all other scenarios, clients will attempt to recover from all errors automatically.

Although calling most methods on the clients will result in a fatal error if the client is in an unrecoverable state, you should generally only need to explicitly check for fatal errors in your error handler, and handle this scenario there.
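A minimal sketch of an error handler that distinguishes fatal from informational errors (the handler logic is illustrative):

```csharp
using System;
using Confluent.Kafka;

class ErrorHandling
{
    public static void Main(string[] args)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092",
            EnableIdempotence = true
        };

        using (var producer = new ProducerBuilder<Null, string>(config)
            .SetErrorHandler((_, e) =>
            {
                if (e.IsFatal)
                {
                    // The client is in an unrecoverable state - typically the
                    // only remedy is to recreate the producer instance.
                    Console.WriteLine($"FATAL: {e.Reason}");
                }
                else
                {
                    // Informational - the client will recover automatically.
                    Console.WriteLine($"Error: {e.Reason}");
                }
            })
            .Build())
        {
            // ... produce messages as usual ...
        }
    }
}
```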
When using Produce, to determine whether a particular message has been successfully delivered to a cluster, check the Error field of the DeliveryReport during the delivery handler callback.

When using ProduceAsync, any delivery result other than NoError will cause the returned Task to be in the faulted state, with the Task.Exception field set to a ProduceException containing information about the message and error via the DeliveryResult and Error fields. Note: if you await the call, this means a ProduceException will be thrown.

All Consume errors will result in a ConsumeException with further information about the error and context available via the Error and ConsumeResult fields.
There are numerous libraries that expand on the capabilities provided by Confluent.Kafka, or use Confluent.Kafka to integrate with Kafka. For more information, refer to the 3rd Party Libraries page.
For a step-by-step guide on using the .NET client with Confluent Cloud, see Getting Started with Apache Kafka and .NET on Confluent Developer.

You can also refer to the Confluent Cloud example which demonstrates how to configure the .NET client for use with Confluent Cloud.

Instructions on building and testing confluent-kafka-dotnet can be found here.
Copyright (c) 2016-2019 Confluent Inc.; 2015-2016 Andreas Heider
KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use by confluent-kafka-dotnet. confluent-kafka-dotnet has no affiliation with and is not endorsed by The Apache Software Foundation.