FsKafka is an F#-friendly wrapper for `Confluent.Kafka`, with minimal dependencies or additional abstractions (but see the related repos listed below).

FsKafka wraps `Confluent.Kafka` to provide efficient batched Kafka Producer and Consumer configurations with basic logging instrumentation. It depends on `Confluent.Kafka [1.9.2]` and `librdkafka.redist [1.9.2]` (pinned to ensure we use a tested pairing), `Serilog` (but no specific Serilog sinks, i.e. you configure it to emit to `NLog` etc.) and `Newtonsoft.Json` (used internally to parse Broker-provided statistics for logging purposes).
FsKafka is delivered as a NuGet package targeting `netstandard2.0` and F# >= 4.5.

```sh
dotnet add package FsKafka
```

or, for `paket`:

```sh
paket add FsKafka
```
- See the Propulsion repo for extended Producers and Consumers.
- See the Jet `dotnet new` templates repo's `proProjector` template (in `-k` mode) for example producer logic using the `BatchedProducer`, and the `proConsumer` template for examples of using the `BatchedConsumer` from `FsKafka`, alongside the extended modes in `Propulsion`. (A minimal `BatchedProducer` sketch also follows this list.)
- See the Equinox QuickStart for examples of using this library to project to Kafka from `Equinox.Cosmos` and/or `Equinox.EventStore`.
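For a flavour of the batched producer mentioned above, here is a minimal sketch; the exact `BatchedProducer.Create` and `ProduceBatch` member shapes shown are assumptions on my part, so treat the `proProjector` template as the authoritative example:

```fsharp
// Sketch only: produce a batch of key/value messages in a single call.
// BatchedProducer.Create / ProduceBatch shapes are assumed; see the proProjector template for working code.
#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka
open System

let log = Serilog.LoggerConfiguration().CreateLogger()
let batching = Batching.Linger (TimeSpan.FromMilliseconds 10.)
let producerConfig = KafkaProducerConfig.Create("MyClientId", "kafka:9092", Acks.All, batching)
let producer = BatchedProducer.Create(log, producerConfig, "MyTopic")
let batch = [| for i in 1 .. 10 -> Guid.NewGuid().ToString(), sprintf "Message %d" i |]
let results = producer.ProduceBatch(batch) |> Async.RunSynchronously
```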
Contributions of all sizes are warmly welcomed; see our contribution guide.

The best place to start, sample-wise, is the `dotnet new` templates stored in a dedicated repo. The templates are the best way to see how to consume the library; the instructions below are intended mainly for people looking to make changes.
NB the tests rely on a `TEST_KAFKA_BROKER` environment variable pointing to a Broker that has been configured to auto-create ephemeral Kafka Topics as required by the tests (each test run writes to a guid-named topic):

```sh
export TEST_KAFKA_BROKER="<server>:9092"
dotnet build build.proj -v n
```
The `BatchedConsumer` implementation tries to give clear feedback as to when reading is not keeping up, for diagnostic purposes. As of #32, such messages are tagged with the type `FsKafka.Core.InFlightMessageCounter`, and as such can be silenced by including the following in one's `LoggerConfiguration()`:

```fsharp
.MinimumLevel.Override(FsKafka.Core.Constants.messageCounterSourceContext, Serilog.Events.LogEventLevel.Warning)
```
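For example, a logger wired up along the following lines keeps those counter messages at `Warning` level and above (a minimal sketch; the `.WriteTo.Console()` sink assumes the `Serilog.Sinks.Console` package, which FsKafka itself does not pull in):

```fsharp
// Sketch: suppress FsKafka's in-flight message counter diagnostics below Warning level.
// Assumes Serilog.Sinks.Console is referenced separately to provide the console sink.
#r "nuget:FsKafka"
#r "nuget:Serilog.Sinks.Console"

let log =
    Serilog.LoggerConfiguration()
        .MinimumLevel.Information()
        .MinimumLevel.Override(FsKafka.Core.Constants.messageCounterSourceContext, Serilog.Events.LogEventLevel.Warning)
        .WriteTo.Console()
        .CreateLogger()
```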
This code results from building out an end-to-end, batteries-included set of libraries and templates as part of the Equinox project. Equinox places some key constraints on all components and dependencies:

- batteries-included examples of end-to-end functionality within the Equinox remit; samples should have clean, consistent wiring
- pick a well-established base library; try not to add new concepts
- low dependencies, so it can work in lots of contexts without egregiously forcing you to upgrade things
- aim to add any resilience features as patches to upstream repos
- thorough test coverage: integration coverage for core wrapped functionality, unit tests for any non-trivial logic in the wrapper library
#r"nuget:FsKafka"openConfluent.KafkaopenFsKafkaletlog= Serilog.LoggerConfiguration().CreateLogger()letbatching= Batching.Linger(System.TimeSpan.FromMilliseconds10.)letproducerConfig= KafkaProducerConfig.Create("MyClientId","kafka:9092", Acks.All, batching)letproducer= KafkaProducer.Create(log, producerConfig,"MyTopic")letkey= Guid.NewGuid().ToString()letdeliveryResult= producer.ProduceAsync(key,"Hello World!")|> Async.RunSynchronously
#r"nuget:FsKafka"openConfluent.KafkaopenFsKafkaletlog= Serilog.LoggerConfiguration().CreateLogger()lethandler(messages:ConsumeResult<string,string>[])=async{for min messagesdo printfn"Received:%s" m.Message.Value}letcfg= KafkaConsumerConfig.Create("MyClientId","kafka:9092",["MyTopic"],"MyGroupId", AutoOffsetReset.Earliest)async{use consumer= BatchedConsumer.Start(log, cfg, handler)return! consumer.AwaitShutdown()}|> Async.RunSynchronously
#r"nuget:FsKafka"openConfluent.KafkaopenFsKafkaletlog= Serilog.LoggerConfiguration().CreateLogger()lethandler(messages:ConsumeResult<string,string>[])=async{for min messagesdo printfn"Received:%s" m.Message.Value}letcfg= KafkaConsumerConfig.Create("MyClientId","kafka:9092",["MyTopic"],"MyGroupId", AutoOffsetReset.Earliest)async{use consumer= BatchedConsumer.Start(log, cfg, handler)use _= KafkaMonitor(log).Start(consumer.Inner, cfg.Inner.GroupId)return! consumer.AwaitShutdown()}|> Async.RunSynchronously
#r"nuget:FsKafka"openConfluent.KafkaopenFsKafkaletlog= Serilog.LoggerConfiguration().CreateLogger()lethandler(messages:ConsumeResult<string,string>[])=async{for min messagesdo printfn"Received:%s" m.Message.Value}letconfig topic= KafkaConsumerConfig.Create("MyClientId","kafka:9092",[topic],"MyGroupId", AutoOffsetReset.Earliest)letcfg1,cfg2= config"MyTopicA", config"MyTopicB"async{use consumer1= BatchedConsumer.Start(log, cfg1, handler)use consumer2= BatchedConsumer.Start(log, cfg2, handler)use _= KafkaMonitor(log).Start(consumer1.Inner, cfg1.Inner.GroupId)use _= KafkaMonitor(log).Start(consumer2.Inner, cfg2.Inner.GroupId)return! Async.Parallel[consumer1.AwaitWithStopOnCancellation(); consumer2.AwaitWithStopOnCancellation()]}|> Async.RunSynchronously