# kafka-large-message-serde
A Kafka Serde that reads and writes records from and to a blob storage, such as Amazon S3, Azure Blob Storage, and Google Cloud Storage, transparently. Formerly known as kafka-s3-backed-serde.
You can add kafka-large-message-serde via Maven Central.
```gradle
implementation group: 'com.bakdata.kafka', name: 'large-message-serde', version: '2.0.0'
```
```xml
<dependency>
    <groupId>com.bakdata.kafka</groupId>
    <artifactId>large-message-serde</artifactId>
    <version>2.0.0</version>
</dependency>
```
For other build tools or versions, refer to the latest version in MvnRepository.
You can use it from your Kafka Streams application like any other Serde:
```java
final Serde<String> serde = new LargeMessageSerde<>();
serde.configure(Map.of(
        AbstractLargeMessageConfig.BASE_PATH_CONFIG, "s3://my-bucket/",
        LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class
), false);
```
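For illustration, here is a minimal sketch of wiring the configured serde into a Kafka Streams topology; the topic names are placeholders, and `serde` is the instance configured above:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical topology: keys use a plain String serde, while values go
// through the large-message serde, so oversized payloads are offloaded to
// blob storage on write and fetched back transparently on read.
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> input = builder
        .stream("input-topic", Consumed.with(Serdes.String(), serde));
input.to("output-topic", Produced.with(Serdes.String(), serde));
```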
The following configuration options are available:
`large.message.key.serde`
Key serde class to use. All serde configurations are also delegated to this serde.
- Type: class
- Default: `org.apache.kafka.common.serialization.Serdes$ByteArraySerde`
- Importance: high
`large.message.value.serde`
Value serde class to use. All serde configurations are also delegated to this serde.
- Type: class
- Default: `org.apache.kafka.common.serialization.Serdes$ByteArraySerde`
- Importance: high
`large.message.base.path`
Base path to store data. Must include bucket and any prefix that should be used, e.g., `s3://my-bucket/my/prefix/`. Available protocols: `s3`, `abs`, `gs`.
- Type: string
- Default: ""
- Importance: high
`large.message.max.byte.size`
Maximum serialized message size in bytes before messages are stored on blob storage.
- Type: int
- Default: 1000000
- Importance: medium
`large.message.use.headers`
Enable if Kafka message headers should be used to distinguish blob storage backed messages. This is disabled by default for backwards compatibility, but leaving it disabled leads to increased memory usage. It is recommended to enable this option.
- Type: boolean
- Default: false
- Importance: medium
`large.message.accept.no.headers`
Enable if messages read with no headers should be treated as non-backed messages. This allows enabling large message behavior for data that has been serialized using the wrapped serializer.
- Type: boolean
- Default: false
- Importance: medium
`large.message.id.generator`
Class to use for generating unique object IDs. Available generators are: `com.bakdata.kafka.RandomUUIDGenerator`, `com.bakdata.kafka.Sha256HashIdGenerator`, `com.bakdata.kafka.MurmurHashIdGenerator`.
- Type: class
- Default: `com.bakdata.kafka.RandomUUIDGenerator`
- Importance: medium
`large.message.s3.access.key`
AWS access key to use for connecting to S3. Leave empty if AWS credential provider chain or STS Assume Role provider should be used.
- Type: password
- Default: ""
- Importance: low
`large.message.s3.secret.key`
AWS secret key to use for connecting to S3. Leave empty if AWS credential provider chain or STS Assume Role provider should be used.
- Type: password
- Default: ""
- Importance: low
`large.message.s3.sts.role.arn`
AWS STS role ARN to use for connecting to S3. Leave empty if AWS Basic provider or AWS credential provider chain should be used.
- Type: string
- Default: ""
- Importance: low
`large.message.s3.role.external.id`
AWS STS role external ID used when retrieving session credentials under an assumed role. Leave empty if AWS Basic provider or AWS credential provider chain should be used.
- Type: string
- Default: ""
- Importance: low
`large.message.s3.role.session.name`
AWS STS role session name to use when starting a session. Leave empty if AWS Basic provider or AWS credential provider chain should be used.
- Type: string
- Default: ""
- Importance: low
`large.message.s3.jwt.path`
Path to an OIDC token file in JSON format (JWT) used to authenticate before AWS STS role authorisation, e.g. for EKS: `/var/run/secrets/eks.amazonaws.com/serviceaccount/token`.
- Type: string
- Default: ""
- Importance: low
`large.message.s3.region`
S3 region to use. Leave empty if default S3 region should be used.
- Type: string
- Default: ""
- Importance: low
`large.message.s3.endpoint`
Endpoint to use for connection to Amazon S3. Leave empty if default S3 endpoint should be used.
- Type: string
- Default: ""
- Importance: low
`large.message.s3.path.style.access`
Enable path-style access for S3 client.
- Type: boolean
- Default: false
- Importance: low
`large.message.s3.request.checksum.calculation`
AWS request checksum validation mode to use when uploading to S3. Leave empty to use the AWS SDK default.
- Type: string
- Default: "WHEN_SUPPORTED"
- Importance: low
`large.message.abs.connection.string`
Azure connection string for connection to blob storage. Leave empty if Azure credential provider chain should be used.
- Type: password
- Default: ""
- Importance: low
`large.message.gs.key.path`
Google service account key JSON path. Leave empty if the environment variable `GOOGLE_APPLICATION_CREDENTIALS` is set or if you want to use the default service account provided by your compute engine. For more information about authenticating as a service account, please refer to the main documentation.
- Type: string
- Default: ""
- Importance: low
`large.message.compression.type`
The compression type for data stored in blob storage. The default is `none` (i.e. no compression). Valid values are `none`, `gzip`, `snappy`, `lz4` and `zstd`. Note: this option is only available when `large.message.use.headers` is enabled.
- Type: string
- Default: "none"
- Importance: low
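To tie several of these options together, here is a minimal sketch that configures a value serde to offload messages above 100 KB to S3, with headers and gzip compression enabled; the bucket, prefix, and threshold are placeholders, and the string keys are the configuration options documented above:

```java
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import com.bakdata.kafka.LargeMessageSerde;

// Placeholder bucket/prefix and size threshold; adjust for your deployment.
final Serde<String> valueSerde = new LargeMessageSerde<>();
valueSerde.configure(Map.of(
        "large.message.base.path", "s3://my-bucket/my/prefix/",
        "large.message.max.byte.size", 100_000,    // offload messages above ~100 KB
        "large.message.use.headers", true,         // recommended; required for compression
        "large.message.compression.type", "gzip",  // compress payloads stored on S3
        "large.message.value.serde", Serdes.StringSerde.class
), false);
```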
This serde also comes with support for Kafka Connect. You can add kafka-large-message-connect via Maven Central.
```gradle
implementation group: 'com.bakdata.kafka', name: 'large-message-connect', version: '2.0.0'
```
```xml
<dependency>
    <groupId>com.bakdata.kafka</groupId>
    <artifactId>large-message-connect</artifactId>
    <version>2.0.0</version>
</dependency>
```
For other build tools or versions, refer to the latest version in MvnRepository.
To use it with your Kafka Connect jobs, just configure your converter as `com.bakdata.kafka.LargeMessageConverter`.
In addition to the configurations available for the serde (except `large.message.key.serde` and `large.message.value.serde`), you can configure the following:
`large.message.converter`
Converter to use. All converter configurations are also delegated to this converter.
- Type: class
- Default: `org.apache.kafka.connect.converters.ByteArrayConverter`
- Importance: high
For general guidance on how to configure Kafka Connect converters, please have a look at the official documentation.
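For instance, a connector configuration might look like the following sketch; the wrapped `StringConverter` and the bucket path are placeholder choices, and the options are delegated via the standard Kafka Connect `value.converter.` prefix:

```properties
# Excerpt from a hypothetical connector configuration
value.converter=com.bakdata.kafka.LargeMessageConverter
# Serde-style options are passed with the value.converter. prefix
value.converter.large.message.base.path=s3://my-bucket/my/prefix/
value.converter.large.message.use.headers=true
# The converter that handles the actual payload (de)serialization
value.converter.large.message.converter=org.apache.kafka.connect.storage.StringConverter
```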
We also provide a method for cleaning up all files on the blob storage associated with a topic:
```java
final Map<String, Object> properties = ...;
final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(properties);
final LargeMessageStoringClient storer = config.getStorer();
storer.deleteAllFiles("topic");
```
If you want to contribute to this project, you can simply clone the repository and build it via Gradle. All dependencies should be included in the Gradle files; there are no external prerequisites.
```shell
> git clone git@github.com:bakdata/kafka-large-message-serde.git
> cd kafka-large-message-serde && ./gradlew build
```
Please note that we have code styles for Java. They are basically the Google style guide, with some small modifications.
We are happy if you want to contribute to this project. If you find any bugs or have suggestions for improvements, please open an issue. We are also happy to accept your PRs. Just open an issue beforehand and let us know what you want to do and why.
This project is licensed under the MIT license. Have a look at the LICENSE for more details.