- Notifications
You must be signed in to change notification settings - Fork17
Minimal F# wrappers for Confluent.Kafka+librdkafka.redist 1.x
License
jet/FsKafka
Folders and files
Name | Name | Last commit message | Last commit date | |
---|---|---|---|---|
Repository files navigation
F# friendly wrapper forConfluent.Kafka
, with minimal dependencies or additional abstractions (but seerelated repos).
FsKafka
wrapsConfluent.Kafka
to provide efficient batched Kafka Producer and Consumer configurations with basic logging instrumentation.Depends onConfluent.Kafka [1.9.2]
,librdkafka.redist [1.9.2]
(pinned to ensure we use a tested pairing),Serilog
(but no specific Serilog sinks, i.e. you configure to emit toNLog
etc) andNewtonsoft.Json
(used internally to parse Broker-provided Statistics for logging purposes).
FsKafka is delivered as aNuget package targetingnetstandard2.0
and F# >= 4.5.
dotnet add package FsKafka
or forpaket
, use:
paket add FsKafka
- Seethe Propulsion repo for extended Producers and Consumers.
- Seethe Jet
dotnet new
templates repo'sproProjector
template (in-k
mode) for example producer logic using theBatchedProducer
and theproConsumer
template for examples of using theBatchedConsumer
fromFsKafka
, alongside the extended modes inPropulsion
. - Seethe Equinox QuickStart for examples of using this library to project to Kafka from
Equinox.Cosmos
and/orEquinox.EventStore
.
Contributions of all sizes are warmly welcomed. Seeour contribution guide
The best place to start, sample-wise is from thedotnet new
templates storedin a dedicated repo.
Thetemplates are the best way to see how to consume it; these instructions are intended mainly for people looking to make changes.
NB The tests are reliant on aTEST_KAFKA_BROKER
environment variable pointing to a Broker that has been configured to auto-create ephemeral Kafka Topics as required by the tests (each test run writes to a guid-named topic)
export TEST_KAFKA_BROKER="<server>:9092"dotnet build build.proj-v n
The
BatchedConsumer
implementation tries to give clear feedback as to when reading is not keeping up, for diagnostic purposes. As of#32, such messages are tagged with the typeFsKafka.Core.InFlightMessageCounter
, and as such can be silenced by including the following in one'sLoggerConfiguration()
:.MinimumLevel.Override(FsKafka.Core.Constants.messageCounterSourceContext, Serilog.Events.LogEventLevel.Warning)
This code results from building out an end-to-end batteries-included set of libraries and templates as part of theEquinox project.
Equinox places some key constraints on all components and dependencies:-
- batteries-included examples of end-to-end functionality within the Equinox remit;samples should have clean consistent wiring
- pick a well-established base library, try not to add new concepts
- low dependencies, so it can work in lots of contexts without egregiously forcing you to upgrade things
- aim to add any resilience features as patches to upstream repos
- thorough test coverage; integration coverage for core wrapped functionality, unit tests for any non-trivial logic in the wrapper library
#r"nuget:FsKafka"openConfluent.KafkaopenFsKafkaletlog= Serilog.LoggerConfiguration().CreateLogger()letbatching= Batching.Linger(System.TimeSpan.FromMilliseconds10.)letproducerConfig= KafkaProducerConfig.Create("MyClientId","kafka:9092", Acks.All, batching)letproducer= KafkaProducer.Create(log, producerConfig,"MyTopic")letkey= Guid.NewGuid().ToString()letdeliveryResult= producer.ProduceAsync(key,"Hello World!")|> Async.RunSynchronously
#r"nuget:FsKafka"openConfluent.KafkaopenFsKafkaletlog= Serilog.LoggerConfiguration().CreateLogger()lethandler(messages:ConsumeResult<string,string>[])=async{for min messagesdo printfn"Received:%s" m.Message.Value}letcfg= KafkaConsumerConfig.Create("MyClientId","kafka:9092",["MyTopic"],"MyGroupId", AutoOffsetReset.Earliest)async{use consumer= BatchedConsumer.Start(log, cfg, handler)return! consumer.AwaitShutdown()}|> Async.RunSynchronously
#r"nuget:FsKafka"openConfluent.KafkaopenFsKafkaletlog= Serilog.LoggerConfiguration().CreateLogger()lethandler(messages:ConsumeResult<string,string>[])=async{for min messagesdo printfn"Received:%s" m.Message.Value}letcfg= KafkaConsumerConfig.Create("MyClientId","kafka:9092",["MyTopic"],"MyGroupId", AutoOffsetReset.Earliest)async{use consumer= BatchedConsumer.Start(log, cfg, handler)use _= KafkaMonitor(log).Start(consumer.Inner, cfg.Inner.GroupId)return! consumer.AwaitShutdown()}|> Async.RunSynchronously
#r"nuget:FsKafka"openConfluent.KafkaopenFsKafkaletlog= Serilog.LoggerConfiguration().CreateLogger()lethandler(messages:ConsumeResult<string,string>[])=async{for min messagesdo printfn"Received:%s" m.Message.Value}letconfig topic= KafkaConsumerConfig.Create("MyClientId","kafka:9092",[topic],"MyGroupId", AutoOffsetReset.Earliest)letcfg1,cfg2= config"MyTopicA", config"MyTopicB"async{use consumer1= BatchedConsumer.Start(log, cfg1, handler)use consumer2= BatchedConsumer.Start(log, cfg2, handler)use _= KafkaMonitor(log).Start(consumer1.Inner, cfg1.Inner.GroupId)use _= KafkaMonitor(log).Start(consumer2.Inner, cfg2.Inner.GroupId)return! Async.Parallel[consumer1.AwaitWithStopOnCancellation(); consumer2.AwaitWithStopOnCancellation()]}|> Async.RunSynchronously
About
Minimal F# wrappers for Confluent.Kafka+librdkafka.redist 1.x
Topics
Resources
License
Security policy
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Packages0
Uh oh!
There was an error while loading.Please reload this page.
Contributors13
Uh oh!
There was an error while loading.Please reload this page.