This repository was archived by the owner on Jan 25, 2023. It is now read-only.

[Unmaintained] Karafka Sidekiq backend for background messages processing


karafka/sidekiq-backend


Deprecation notice

This backend was designed to compensate for the lack of multi-threading in Karafka. Karafka 2.0 is multi-threaded.

It is no longer needed and is no longer maintained.

About


Karafka Sidekiq Backend provides support for consuming (processing) received Kafka messages inside of Sidekiq workers.

Installation

Add this to your Gemfile:

```ruby
gem 'karafka-sidekiq-backend'
```

and create a file called application_worker.rb inside of your app/workers directory that looks like this:

```ruby
class ApplicationWorker < Karafka::BaseWorker
end
```

and you are ready to go. Karafka Sidekiq Backend integrates with Karafka automatically.

Note: You can name your application worker base class however you want. The only requirement is direct inheritance from the Karafka::BaseWorker class.

Usage

If you want to process messages with Sidekiq backend, you need to tell this to Karafka.

To do so, you can either configure that in a configuration block:

```ruby
class App < Karafka::App
  setup do |config|
    config.backend = :sidekiq
    # Other config options...
  end
end
```

or on a per topic level:

```ruby
App.routes.draw do
  consumer_group :videos_consumer do
    topic :binary_video_details do
      backend :sidekiq
      consumer Videos::DetailsConsumer
      worker Workers::DetailsWorker
      interchanger Interchangers::MyCustomInterchanger.new
    end
  end
end
```

You don't need to do anything beyond that. Karafka will know that you want to run your consumer's #consume method in a background job.

Configuration

There are two options you can set inside of the topic block:

| Option | Value type | Description |
|---|---|---|
| worker | Class | Name of a worker class used to schedule the perform code |
| interchanger | Instance | Instance of an interchanger class used to pass the incoming data to Sidekiq |

Workers

By default, Karafka will build a worker corresponding to each of your consumers (so you will have a pair: a consumer and a worker). All of them will inherit from ApplicationWorker and will share all its settings.

To run Sidekiq you should have a sidekiq.yml file in the config folder. An example sidekiq.yml file will be generated to config/sidekiq.yml.example once you run bundle exec karafka install.

However, if you want to use a raw Sidekiq worker (without any additional Karafka magic), or you want to use Sidekiq Pro (or any other queuing engine that has the same API as Sidekiq), you can assign your own custom worker:

```ruby
topic :incoming_messages do
  consumer MessagesConsumer
  worker MyCustomWorker
end
```

Note that even then, you need to specify a consumer that will schedule a background task.

Custom workers need to provide a #perform_async method that accepts two arguments:

- topic_id - the id of the topic from which a given message batch comes
- params_batch - all the params that came from Kafka plus additional metadata. This data format may change if you use custom interchangers; otherwise it will be an instance of Karafka::Params::ParamsBatch.

Note: If you use custom interchangers, keep in mind that the params inside a params batch might be in either of two states when passed to #perform_async: parsed or unparsed. This means that if you use custom interchangers and/or custom workers, you might want to look into Karafka's sources to see exactly how this works.
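The #perform_async contract above can be sketched without Sidekiq itself. The class and job names below are hypothetical, and the Redis push that a real Sidekiq worker would perform is stubbed out with an in-memory list so only the interface is shown:

```ruby
# Hypothetical custom worker illustrating the #perform_async contract that
# Karafka Sidekiq Backend expects from a worker class. A real worker would
# `include Sidekiq::Worker`; here scheduling is stubbed with an in-memory list.
class MyCustomWorker
  # topic_id     - id of the topic the message batch came from
  # params_batch - batch payload (a Karafka::Params::ParamsBatch, or whatever
  #                your interchanger's #encode produced)
  def self.perform_async(topic_id, params_batch)
    # A real Sidekiq worker would enqueue a job in Redis here; we just record it.
    jobs << { topic_id: topic_id, params_batch: params_batch }
  end

  def self.jobs
    @jobs ||= []
  end
end

MyCustomWorker.perform_async(:incoming_messages, [{ 'value' => 'hello' }])
```

Any object responding to perform_async with this two-argument signature can be plugged in via the worker routing option.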

Interchangers

Custom interchangers target issues with non-standard (binary, etc.) data that we want to store when we do #perform_async. This data might be corrupted when fetched in a worker (see this issue). With custom interchangers, you can encode/compress data before it is passed to scheduling and decode/decompress it when it gets into the worker.

To use an interchanger for a topic, declare it inside your routes like this:

```ruby
App.routes.draw do
  consumer_group :videos_consumer do
    topic :binary_video_details do
      consumer Videos::DetailsConsumer
      interchanger Interchangers::MyCustomInterchanger.new
    end
  end
end
```

Each custom interchanger should define #encode to encode params before they get stored in Redis, and #decode to convert the params back to hash format, as shown below:

```ruby
class Base64Interchanger < ::Karafka::Interchanger
  def encode(params_batch)
    Base64.encode64(Marshal.dump(super))
  end

  def decode(params_string)
    Marshal.load(Base64.decode64(super))
  end
end
```

Warning: if you decide to use slow interchangers, they might significantly slow down Karafka.
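The encode/decode round trip this kind of interchanger performs can be shown standalone, without the Karafka superclass (the sample params_batch below is made up): Marshal serializes the Ruby objects, and Base64 makes the result safe to store in Redis as plain text, so binary payloads survive the trip to the worker intact.

```ruby
require 'base64'

# Hypothetical params batch containing a binary payload that could be
# corrupted if stored as-is.
params_batch = [{ 'topic' => 'binary_video_details', 'value' => "\x00\xFFraw".b }]

# What #encode does: serialize, then Base64-encode into a plain ASCII string.
encoded = Base64.encode64(Marshal.dump(params_batch))

# What #decode does in the worker: reverse both steps to get the objects back.
decoded = Marshal.load(Base64.decode64(encoded))
```

This is the cost the warning above refers to: every batch pays for a Marshal dump/load and a Base64 pass on both the scheduling and the worker side.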


Note on contributions

First, thank you for considering contributing to the Karafka ecosystem! It's people like you that make the open source community such a great place!

Each pull request must pass all the RSpec specs, integration tests and meet our quality requirements.

Fork it, make your changes, and wait for the GitHub Actions results.
