Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up

Minimal F# wrappers for Confluent.Kafka+librdkafka.redist 1.x

License

NotificationsYou must be signed in to change notification settings

jet/FsKafka

Repository files navigation

F# friendly wrapper forConfluent.Kafka, with minimal dependencies or additional abstractions (but seerelated repos).

FsKafka wrapsConfluent.Kafka to provide efficient batched Kafka Producer and Consumer configurations with basic logging instrumentation.Depends onConfluent.Kafka [1.9.2],librdkafka.redist [1.9.2] (pinned to ensure we use a tested pairing),Serilog (but no specific Serilog sinks, i.e. you configure to emit toNLog etc) andNewtonsoft.Json (used internally to parse Broker-provided Statistics for logging purposes).

Usage

FsKafka is delivered as aNuget package targetingnetstandard2.0 and F# >= 4.5.

dotnet add package FsKafka

or forpaket, use:

paket add FsKafka

Related repos

  • Seethe Propulsion repo for extended Producers and Consumers.
  • Seethe Jetdotnet new templates repo'sproProjector template (in-k mode) for example producer logic using theBatchedProducer and theproConsumer template for examples of using theBatchedConsumer fromFsKafka, alongside the extended modes inPropulsion.
  • Seethe Equinox QuickStart for examples of using this library to project to Kafka fromEquinox.Cosmos and/orEquinox.EventStore.

CONTRIBUTING

Contributions of all sizes are warmly welcomed. Seeour contribution guide

TEMPLATES

The best place to start, sample-wise is from thedotnet new templates storedin a dedicated repo.

BUILDING

Thetemplates are the best way to see how to consume it; these instructions are intended mainly for people looking to make changes.

NB The tests are reliant on aTEST_KAFKA_BROKER environment variable pointing to a Broker that has been configured to auto-create ephemeral Kafka Topics as required by the tests (each test run writes to a guid-named topic)

build, including tests on netcoreapp3.1

export TEST_KAFKA_BROKER="<server>:9092"dotnet build build.proj-v n

FAQ

How do I get rid of all ~~~~thebreaking off polling ...resuming polling spam?

  • TheBatchedConsumer implementation tries to give clear feedback as to when reading is not keeping up, for diagnostic purposes. As of#32, such messages are tagged with the typeFsKafka.Core.InFlightMessageCounter, and as such can be silenced by including the following in one'sLoggerConfiguration():

    .MinimumLevel.Override(FsKafka.Core.Constants.messageCounterSourceContext, Serilog.Events.LogEventLevel.Warning)

What is this, why does it exist, where did it come from, is anyone using it ?

This code results from building out an end-to-end batteries-included set of libraries and templates as part of theEquinox project.

Equinox places some key constraints on all components and dependencies:-

  • batteries-included examples of end-to-end functionality within the Equinox remit;samples should have clean consistent wiring
  • pick a well-established base library, try not to add new concepts
  • low dependencies, so it can work in lots of contexts without egregiously forcing you to upgrade things
  • aim to add any resilience features as patches to upstream repos
  • thorough test coverage; integration coverage for core wrapped functionality, unit tests for any non-trivial logic in the wrapper library

Minimal producer example

#r"nuget:FsKafka"openConfluent.KafkaopenFsKafkaletlog= Serilog.LoggerConfiguration().CreateLogger()letbatching= Batching.Linger(System.TimeSpan.FromMilliseconds10.)letproducerConfig= KafkaProducerConfig.Create("MyClientId","kafka:9092", Acks.All, batching)letproducer= KafkaProducer.Create(log, producerConfig,"MyTopic")letkey= Guid.NewGuid().ToString()letdeliveryResult= producer.ProduceAsync(key,"Hello World!")|> Async.RunSynchronously

Minimal batched consumer example

#r"nuget:FsKafka"openConfluent.KafkaopenFsKafkaletlog= Serilog.LoggerConfiguration().CreateLogger()lethandler(messages:ConsumeResult<string,string>[])=async{for min messagesdo        printfn"Received:%s" m.Message.Value}letcfg= KafkaConsumerConfig.Create("MyClientId","kafka:9092",["MyTopic"],"MyGroupId", AutoOffsetReset.Earliest)async{use consumer= BatchedConsumer.Start(log, cfg, handler)return! consumer.AwaitShutdown()}|> Async.RunSynchronously

Minimal batched consumer example with monitor

#r"nuget:FsKafka"openConfluent.KafkaopenFsKafkaletlog= Serilog.LoggerConfiguration().CreateLogger()lethandler(messages:ConsumeResult<string,string>[])=async{for min messagesdo        printfn"Received:%s" m.Message.Value}letcfg= KafkaConsumerConfig.Create("MyClientId","kafka:9092",["MyTopic"],"MyGroupId", AutoOffsetReset.Earliest)async{use consumer= BatchedConsumer.Start(log, cfg, handler)use _= KafkaMonitor(log).Start(consumer.Inner, cfg.Inner.GroupId)return! consumer.AwaitShutdown()}|> Async.RunSynchronously

Running (and awaiting) a pair of consumers until either throws

#r"nuget:FsKafka"openConfluent.KafkaopenFsKafkaletlog= Serilog.LoggerConfiguration().CreateLogger()lethandler(messages:ConsumeResult<string,string>[])=async{for min messagesdo        printfn"Received:%s" m.Message.Value}letconfig topic= KafkaConsumerConfig.Create("MyClientId","kafka:9092",[topic],"MyGroupId", AutoOffsetReset.Earliest)letcfg1,cfg2= config"MyTopicA", config"MyTopicB"async{use consumer1= BatchedConsumer.Start(log, cfg1, handler)use consumer2= BatchedConsumer.Start(log, cfg2, handler)use _= KafkaMonitor(log).Start(consumer1.Inner, cfg1.Inner.GroupId)use _= KafkaMonitor(log).Start(consumer2.Inner, cfg2.Inner.GroupId)return! Async.Parallel[consumer1.AwaitWithStopOnCancellation(); consumer2.AwaitWithStopOnCancellation()]}|> Async.RunSynchronously

[8]ページ先頭

©2009-2025 Movatter.jp