Kafka Serialization Playground


fillmore-labs/kafka-sensors


Purpose

This source demonstrates how to process a stream of sensor data using Kafka Streams.

The sensors produce a stream of records, including sensor ID, a timestamp and the current state (on or off). The desired result is a stream of records enriched with the duration the sensor has been in this state.

Example

For example, a stream

Table 1. Sensor Data

Name      Timestamp             State
Sensor 1  1984-01-22T15:45:00Z  off
Sensor 1  1984-01-22T15:45:10Z  off
Sensor 1  1984-01-22T15:45:30Z  on
Sensor 1  1984-01-22T15:46:30Z  off

should produce

Table 2. Enriched Data

Name      Timestamp             State  Duration
Sensor 1  1984-01-22T15:45:00Z  off    10s
Sensor 1  1984-01-22T15:45:00Z  off    30s
Sensor 1  1984-01-22T15:45:30Z  on     60s

This tells us that “Sensor 1” was “off” from 15:45:00 for 30 seconds and “on” from 15:45:30 for 60 seconds.

Note that the second “off” reading produced an intermediate result.

Design decisions

Duplicate readings of the same state generate intermediate results, and delayed readings (timestamps preceding previously seen values) are treated as errors.

These are deliberate choices and can easily be changed.
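The rules above can be sketched in a few lines of Java. This is a minimal illustration, not the repository's actual code: the names `DurationSketch`, `Reading`, `Enriched`, `Tracked` and `process` are hypothetical, and a plain `HashMap` stands in for the state store. It assumes Java 16 or later for records.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class DurationSketch {
    enum State { ON, OFF }

    // Hypothetical data model, assumed for this sketch only.
    record Reading(String id, Instant time, State state) {}
    record Enriched(String id, Instant since, State state, Duration duration) {}
    // Per-sensor memory: when the current state began and the latest timestamp seen.
    record Tracked(Instant since, Instant lastSeen, State state) {}

    private final Map<String, Tracked> store = new HashMap<>();

    Optional<Enriched> process(Reading r) {
        Tracked old = store.get(r.id());
        if (old == null) {
            // First reading for this sensor: remember it, nothing to emit yet.
            store.put(r.id(), new Tracked(r.time(), r.time(), r.state()));
            return Optional.empty();
        }
        if (r.time().isBefore(old.lastSeen())) {
            // Delayed readings are treated as errors.
            throw new IllegalArgumentException("Delayed reading for " + r.id());
        }
        Duration duration = Duration.between(old.since(), r.time());
        if (r.state() == old.state()) {
            // Duplicate state: keep the start time, emit an intermediate result.
            store.put(r.id(), new Tracked(old.since(), r.time(), old.state()));
        } else {
            // State change: the new state starts at this reading.
            store.put(r.id(), new Tracked(r.time(), r.time(), r.state()));
        }
        return Optional.of(new Enriched(r.id(), old.since(), old.state(), duration));
    }

    public static void main(String[] args) {
        var logic = new DurationSketch();
        var readings = List.of(
            new Reading("Sensor 1", Instant.parse("1984-01-22T15:45:00Z"), State.OFF),
            new Reading("Sensor 1", Instant.parse("1984-01-22T15:45:10Z"), State.OFF),
            new Reading("Sensor 1", Instant.parse("1984-01-22T15:45:30Z"), State.ON),
            new Reading("Sensor 1", Instant.parse("1984-01-22T15:46:30Z"), State.OFF));
        for (Reading r : readings) {
            logic.process(r).ifPresent(e -> System.out.printf(
                "%s %s %s %ds%n", e.id(), e.since(), e.state(), e.duration().getSeconds()));
        }
    }
}
```

Replaying the four readings of Table 1 yields three enriched records with durations of 10, 30 and 60 seconds, matching Table 2. Changing either design decision means touching only the duplicate-state branch or the delayed-reading check.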

Implementation of Business Logic

Care has been taken to keep the business logic independent of implementation details like serialization formats.

The data model is in the model directory, the business logic in logic.

The tests test the topology with nine different formats: Protocol Buffers, JSON, Apache Avro, the Confluent variants of these three, XML, Apache Thrift and Amazon Ion. Different, random combinations of input, result, and state store formats are tested.

While this abstraction might not be necessary in practice, it demonstrates two important design considerations:

  • The business logic should only depend on a data model, not capabilities of the serialization mechanism.

We can use Duration::between, a simple call that is easy to understand and test, instead of cluttering our logic with conversions and unnecessary, error-prone calculations.

  • The choice of (de-)serializers should depend on the requirements, not on what is just at hand.

While internal processing pipelines tend to (but don’t have to) use one serialization mechanism, it is perfectly valid and a good design decision to use different mechanisms for parts interfacing with external components.

Since the business logic is independent of the serialization mechanism, changing it is simple and usually does not require retesting.

By refactoring the business logic to depend only on an abstract store, we speed up testing by a factor of seven (bazel test //src/test/java/com/fillmore_labs/kafka/sensors/logic:all vs. bazel test //src/test/java/com/fillmore_labs/kafka/sensors/topology:all), which demonstrates a potential for improvement in development speed and testability.
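To illustrate why an abstract store makes tests faster, here is a small sketch in which the logic sees only a two-method interface and the test supplies an in-memory map instead of a running topology. The names `StateStore`, `MapStore` and `ChangeDetector` are hypothetical and not taken from the repository; in production one would assume a thin adapter around the Kafka Streams state store behind the same interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class AbstractStoreSketch {
    /** Hypothetical minimal store abstraction; not the repository's actual interface. */
    interface StateStore<K, V> {
        Optional<V> get(K key);
        void put(K key, V value);
    }

    /** In-memory implementation, enough for unit tests without Kafka. */
    static final class MapStore<K, V> implements StateStore<K, V> {
        private final Map<K, V> map = new HashMap<>();

        @Override
        public Optional<V> get(K key) {
            return Optional.ofNullable(map.get(key));
        }

        @Override
        public void put(K key, V value) {
            map.put(key, value);
        }
    }

    /** Example logic that depends only on the abstraction above. */
    static final class ChangeDetector {
        private final StateStore<String, String> store;

        ChangeDetector(StateStore<String, String> store) {
            this.store = store;
        }

        /** Returns true if the sensor had a stored state and it differs from the new one. */
        boolean hasChanged(String sensorId, String state) {
            boolean changed = store.get(sensorId).map(old -> !old.equals(state)).orElse(false);
            store.put(sensorId, state);
            return changed;
        }
    }

    public static void main(String[] args) {
        var detector = new ChangeDetector(new MapStore<>());
        System.out.println(detector.hasChanged("Sensor 1", "off")); // first reading, no change
        System.out.println(detector.hasChanged("Sensor 1", "off")); // same state again
        System.out.println(detector.hasChanged("Sensor 1", "on"));  // state flipped
    }
}
```

Because `ChangeDetector` never references a Kafka class, its tests need neither an embedded broker nor serializers, which is the kind of saving the logic-versus-topology comparison points at.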

Running

Prerequisites

You need Bazelisk installed; see also Installing Bazel using Bazelisk.

macOS

Using Homebrew, enter

brew install bazelisk

Windows

Using Chocolatey, enter

choco install bazelisk

Enable developer mode:

  1. Open Windows settings

  2. Go to “Update & security”, then “For developers”

  3. Under “Developer Mode” section enable “Install apps from any source, including loose files”.

or run with administrator privileges.

Tests

To run all tests, use

bazel test //src/test/...

To run only the tests of a single package, use

bazel test //src/test/java/com/fillmore_labs/kafka/sensors/topology:all

The tests run with an embedded Kafka and, when necessary, a mock schema registry.

Main App

The main app needs Kafka running at localhost, port 9092 (see application.yaml). A script starts it:

scripts/kafka-server.sh

When Kafka has finished starting, create the topics in a different terminal:

scripts/kafka-topics.sh

Now start the main app:

bazel run //:kafka-sensors

Open another terminal to watch the results:

scripts/kafka-consume.sh

Publish sensor values:

scripts/kafka-produce.sh

Benchmark

Run the JMH microbenchmarks with

bazel run //:benchmark

Compare deserialization of two formats:

bazel run //:benchmark -- -p "format=proto,thrift" "Bench\\.deserialize"

Generate a flame graph for detailed analysis:

bazel run //:benchmark -- -p "format=proto" "Bench\\.deserialize" \
  -prof "async:output=flamegraph;direction=forward"

open "$(bazel info bazel-bin)/src/main/java/com/fillmore_labs/kafka/sensors/benchmark/benchmark.runfiles/com_fillmore_labs_kafka_sensors/com.fillmore_labs.kafka.sensors.benchmark.Bench.deserialize-AverageTime-format-proto/flame-cpu-forward.html"

Run the latest image on your Kubernetes cluster:

kubectl run serialization-benchmark --image=fillmorelabs/serialization-benchmark \
  --attach --rm --restart=Never -- -p "format=proto,json,json-iso" "Bench\\.serialize"

Notes

Mapping

As noted in Implementation of Business Logic, the business logic is independent of the serialization, in the spirit of hexagonal architecture. This of course requires some mapping, for which we mostly use MapStruct. This imposes some limitations on data model naming conventions. MapStruct uses a fixed and quite inflexible accessor naming strategy, so you can’t really decide that Protocol Buffers should have one convention but Immutables another. Especially for Immutables we are forced to use the JavaBeans-style naming convention, although this is not a JEE application.

