DEV Community
Oryan Moshe

Posted on • Originally published at in.dev

     

Debezium Custom Converters - TimestampConverter

Debezium Custom Converters

Creating custom converters using Debezium's new SPI to override value conversions

Introduction

Background about the TimestampConverter

Hey, my name is Oryan Moshe, and I started my own community for senior developers in Israel named in.dev.

I'm also a Lead Architect at Rivery. We are a fully managed data integration platform, and part of my job here is developing the ability to stream changes straight from our clients' databases to our platform, using change data capture (CDC).

The last time I coded in Java was 7 years ago, so if you have any suggestions to improve the code shown here please feel free to comment below!

You can find the converter right here:
https://github.com/oryanmoshe/debezium-timestamp-converter/

CDC — Change Data Capture

Before we talk about Debezium, we have to talk about CDC.

CDC is a way for us to get the changes happening in the database (as opposed to the actual data).

This means we can actually get every state that every record has been through in the database.

CDC is useful for a number of cases:

  • Compiling a log of record changes
  • Undoing (or reverting) a change
  • Tracking record deletion (which is not simply a matter of using SELECT)
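To make the difference between "changes" and "actual data" concrete, here is a minimal sketch of a change log for a single record. It is a simplified model written for this post, not Debezium's actual event format; the `ChangeLogSketch` and `Change` names and the sample values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ChangeLogSketch {

    /** One change to a single record: its value before and after (null when absent). */
    static final class Change {
        final String before;
        final String after;

        Change(String before, String after) {
            this.before = before;
            this.after = after;
        }

        /** A deletion has a "before" value but no "after" value. */
        boolean isDelete() {
            return before != null && after == null;
        }
    }

    /** Lists every state the record has been through, in order. */
    static List<String> history(List<Change> changes) {
        List<String> states = new ArrayList<>();
        for (Change change : changes) {
            states.add(change.isDelete() ? "<deleted>" : change.after);
        }
        return states;
    }

    /** Hypothetical sample: a record is created, updated and finally deleted. */
    static List<Change> sample() {
        return Arrays.asList(
                new Change(null, "status=new"),
                new Change("status=new", "status=paid"),
                new Change("status=paid", null));
    }

    public static void main(String[] args) {
        System.out.println(history(sample()));
    }
}
```

A plain SELECT after these three changes would simply return nothing for this record, while the change log still shows both earlier states and the deletion itself.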

What is Debezium Anyway?

Debezium is an open source platform, maintained by Red Hat, that allows developers to implement CDC into a Kafka infrastructure.

Debezium implements CDC through a set of Kafka Connect source connectors, one per database type. Currently there's support for MySQL, PostgreSQL, Microsoft SQL Server, MongoDB, and even some limited support for Oracle.

What are converters, and why would we need a custom one?

All messages produced by Debezium are processed before entering the designated topic.

This ensures that all fields of a given type (defined by the schema) behave the same.

In other words, all DATE fields on all of the databases will be transformed into the same format. This is, by default, "Days since epoch".

But this behavior isn't always wanted, especially in this temporal example.

For our particular use case we need all temporal fields to be in the same format, whether the type is DATE, DATETIME, DATETIME2, TIME or TIMESTAMP.

The format we chose was yyyy-MM-dd'T'HH:mm:ss.SSS'Z'.
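As a quick illustration of the gap between the default representation and the one we wanted, here is a minimal sketch that turns a "Days since epoch" number into the target format. It uses plain `java.time` and is not part of the converter; the `EpochDayExample` class and `formatEpochDay` method are names made up for this example.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

final class EpochDayExample {

    // The target format described above, written with a lowercase calendar year.
    private static final DateTimeFormatter TARGET =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

    /** Interprets the number as days since 1970-01-01 and formats midnight of that day. */
    static String formatEpochDay(long daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch).atStartOfDay().format(TARGET);
    }

    public static void main(String[] args) {
        // 18262 days after 1970-01-01 is 2020-01-01.
        System.out.println(formatEpochDay(18262)); // prints 2020-01-01T00:00:00.000Z
    }
}
```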


Creating a custom converter

Here's an explanation for each step needed to create our TimestampConverter.

The basics of custom converters

To allow such behavior, the Debezium SPI (Service Provider Interface) was added to Debezium Version 1.1.

This allows developers to make their own converters with Java, by creating a class that implements the io.debezium.spi.converter.CustomConverter interface.

The First Gotcha

What we didn't know when we started developing this converter is that once we registered a custom converter for a temporal column, Debezium's behavior became inconsistent. Sometimes it passes a DATE column as "Days since epoch", as expected, but sometimes it passes it as a string matching the date format of the database it came from.

This meant we had to cover all of our bases, both numeric values (say, "Days since epoch") and every date format databases can produce (yyyy-MM-dd, dd/MM/yyyy, yyyy-MMM-dd, etc.).

Things got a bit complicated on the logic front, but let's not get into this right now.
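For a rough idea of what covering both cases involves, here is a minimal sketch for DATE values only. It is not the converter's actual logic: the `RawDateNormalizer` class, the `toEpochMillis` method and the list of candidate patterns are assumptions made for illustration, and it treats any purely numeric value as "Days since epoch".

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

final class RawDateNormalizer {

    // Candidate string formats, tried in order (an illustrative, incomplete list).
    private static final List<DateTimeFormatter> CANDIDATES = Arrays.asList(
            DateTimeFormatter.ofPattern("uuuu-MM-dd", Locale.ENGLISH),
            DateTimeFormatter.ofPattern("dd/MM/uuuu", Locale.ENGLISH),
            DateTimeFormatter.ofPattern("uuuu-MMM-dd", Locale.ENGLISH));

    /** Converts a raw DATE value (number of days or formatted string) to epoch milliseconds in UTC. */
    static long toEpochMillis(String raw) {
        String value = raw.trim();

        // Numeric input: assume "Days since epoch".
        if (value.matches("-?\\d+")) {
            return toMillis(LocalDate.ofEpochDay(Long.parseLong(value)));
        }

        // String input: use the first pattern that parses.
        for (DateTimeFormatter formatter : CANDIDATES) {
            try {
                return toMillis(LocalDate.parse(value, formatter));
            } catch (DateTimeParseException ignored) {
                // Not this format, try the next one.
            }
        }
        throw new IllegalArgumentException("Unrecognised date value: " + raw);
    }

    private static long toMillis(LocalDate date) {
        return date.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
    }
}
```

With this sketch, `toEpochMillis("18262")`, `toEpochMillis("2020-01-01")`, `toEpochMillis("01/01/2020")` and `toEpochMillis("2020-Jan-01")` all return the same instant, which is the point: whatever form the value arrives in, the formatting step downstream only has to deal with one representation.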

What's needed for our custom converter to work

Each converter has to implement at least two methods to be harnessed by Debezium:

configure

This method runs when the connector is initialised. It accepts one argument:

props
An object of type java.util.Properties, containing all of the properties we passed to our converter instance.

converterFor

This method runs once for each column defined in our schema, and its job is to define (a.k.a "register") the converter for each. It accepts two arguments:

column
An object of type io.debezium.spi.converter.RelationalColumn, containing the definition of the column we're currently handling, including its name, type, size, table, etc.

registration
An object of type io.debezium.spi.converter.CustomConverter.ConverterRegistration, an internal definition that has one method: register.

Using the configure method

As stated above, we use the configure method to pass properties into our converter. This is important because we can use the same converter for multiple connectors, and change its behavior according to these properties.

For our TimestampConverter we wanted to pass four properties:

  • debug – Indicates whether to print debug messages. Defaults to false.
  • format.date – The format to convert all columns of type DATE. Defaults to yyyy-MM-dd.
  • format.time – The format to convert all columns of type TIME. Defaults to HH:mm:ss.SSS.
  • format.datetime – The format to convert all other temporal columns. Defaults to yyyy-MM-dd'T'HH:mm:ss.SSS'Z'.

All of these properties are optional and have default values associated with them.

To support them we defined each of them as a class property with the default value. Inside the configure method we assigned them with the passed value:

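    import java.text.SimpleDateFormat;
    import java.util.Properties;
    import java.util.TimeZone;

    import org.apache.kafka.connect.data.SchemaBuilder;

    import io.debezium.spi.converter.CustomConverter;
    import io.debezium.spi.converter.RelationalColumn;

    public class TimestampConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

        // Defaults used when a property is not set on the connector.
        // Lowercase "yyyy" is the calendar year; uppercase "YYYY" is the week year in SimpleDateFormat.
        public static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
        public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd";
        public static final String DEFAULT_TIME_FORMAT = "HH:mm:ss.SSS";

        public String strDatetimeFormat, strDateFormat, strTimeFormat;
        public Boolean debug;

        private SimpleDateFormat simpleDatetimeFormatter, simpleDateFormatter, simpleTimeFormatter;

        @Override
        public void configure(Properties props) {
            // Each property falls back to its default when it is not configured.
            this.strDatetimeFormat = props.getProperty("format.datetime", DEFAULT_DATETIME_FORMAT);
            this.simpleDatetimeFormatter = new SimpleDateFormat(this.strDatetimeFormat);

            this.strDateFormat = props.getProperty("format.date", DEFAULT_DATE_FORMAT);
            this.simpleDateFormatter = new SimpleDateFormat(this.strDateFormat);

            this.strTimeFormat = props.getProperty("format.time", DEFAULT_TIME_FORMAT);
            this.simpleTimeFormatter = new SimpleDateFormat(this.strTimeFormat);

            this.debug = props.getProperty("debug", "false").equals("true");

            // Datetime and time values are rendered in UTC.
            this.simpleDatetimeFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
            this.simpleTimeFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        }

        // converterFor(...) is covered in the next section.
    }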
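The fallback behavior relies entirely on `Properties.getProperty(key, defaultValue)`. Here is a small standalone sketch of that mechanism, with no Debezium involved; the `ConverterPropsSketch` class and its `propsOf` helper are made up for illustration.

```java
import java.util.Properties;

final class ConverterPropsSketch {

    static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd";

    /** Builds a Properties object from alternating key/value strings. */
    static Properties propsOf(String... keyValuePairs) {
        Properties props = new Properties();
        for (int i = 0; i + 1 < keyValuePairs.length; i += 2) {
            props.setProperty(keyValuePairs[i], keyValuePairs[i + 1]);
        }
        return props;
    }

    /** Returns the configured date format, or the default when the key is absent. */
    static String dateFormat(Properties props) {
        return props.getProperty("format.date", DEFAULT_DATE_FORMAT);
    }

    /** Only the exact string "true" switches debugging on. */
    static boolean debug(Properties props) {
        return props.getProperty("debug", "false").equals("true");
    }

    public static void main(String[] args) {
        System.out.println(dateFormat(propsOf()));                            // prints yyyy-MM-dd
        System.out.println(dateFormat(propsOf("format.date", "dd/MM/yyyy"))); // prints dd/MM/yyyy
        System.out.println(debug(propsOf("debug", "true")));                  // prints true
    }
}
```

Because every key has a default, a connector that sets none of the properties still gets a fully configured converter.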

Using the converterFor method

Now it's time for the big moment. Each column must be converted to its respective format.

First of all, we have to understand the type of the column we're currently handling. This is determined using column.typeName().

If the type is any of the temporal types (defined as a class constant) we handle it accordingly. If it's not, we do nothing, and Debezium will take control.

To tell Debezium to convert a specific column to something else, we need to use the registration passed to us and call register on it, providing a schema (create one of type string and make it optional) and a converter.

The converter is just a function, or in our case a lambda, that receives an Object (the source value) and returns a value matching the schema we provided. In our case, we needed to return a String (or null, because we made it optional).

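    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        // Only handle the temporal types we support; Debezium takes care of every other column.
        if (SUPPORTED_DATA_TYPES.stream().anyMatch(s -> s.toLowerCase().equals(column.typeName().toLowerCase()))) {
            boolean isTime = "time".equals(column.typeName().toLowerCase());

            // Register the schema (defined elsewhere in the class) together with the conversion lambda.
            registration.register(datetimeSchema, rawValue -> {
                if (rawValue == null)
                    return rawValue;

                // getMillis (not shown here) turns the raw value into epoch milliseconds.
                Long millis = getMillis(rawValue.toString(), isTime);
                Instant instant = Instant.ofEpochMilli(millis);
                Date dateObject = Date.from(instant);

                // Pick the formatter that matches the column type.
                switch (column.typeName().toLowerCase()) {
                    case "time":
                        return simpleTimeFormatter.format(dateObject);
                    case "date":
                        return simpleDateFormatter.format(dateObject);
                    default:
                        return simpleDatetimeFormatter.format(dateObject);
                }
            });
        }
    }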

In this code snippet, look at the two crucial parts we have mentioned before: the call to registration.register, and the return statements.

Using a Custom Converter with Debezium

Installation

Installation in our Debezium cluster is straightforward. We just need to add the .jar file of the converter to the connector we want to use it in.

The Second Gotcha

Notice I said "... to the connector we want ...". This is something Debezium didn't make clear in the documentation: we need to add this converter to every connector we want to use it in.

Let's say the base folder for connectors is /kafka/connect. Inside it we'll find folders like debezium-connector-mysql or debezium-connector-postgres.

We need to add the converter's .jar file to each of those folders if we intend to use it there.

Configuration

After adding the .jar file to our connector, we can configure our connectors to use it!

To do so all we need to do is add the following keys to our existing configuration:

    "converters": "timestampConverter",
    "timestampConverter.type": "oryanmoshe.kafka.connect.util.TimestampConverter"

If we need to customize the formats of specific data types, we can use these additional configuration keys:

    "timestampConverter.format.time": "HH:mm:ss.SSS",
    "timestampConverter.format.date": "yyyy-MM-dd",
    "timestampConverter.format.datetime": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
    "timestampConverter.debug": "false"
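One thing to watch when writing your own patterns: they are passed to `SimpleDateFormat`, where the case of the year letter matters. Lowercase `y` is the calendar year, while uppercase `Y` is the week year, and the two disagree around New Year. Here is a minimal sketch of the difference; the `PatternLetterCheck` class is made up for illustration, and the locale is pinned to `Locale.US` because week-year rules depend on the locale.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

final class PatternLetterCheck {

    /** Formats the given instant in UTC with the given SimpleDateFormat pattern. */
    static String format(String pattern, long epochMillis) {
        SimpleDateFormat formatter = new SimpleDateFormat(pattern, Locale.US);
        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        return formatter.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        long dec30 = 1577664000000L; // 2019-12-30T00:00:00Z

        System.out.println(format("yyyy-MM-dd", dec30)); // prints 2019-12-30
        System.out.println(format("YYYY-MM-dd", dec30)); // prints 2020-12-30
    }
}
```

In the US locale, December 30, 2019 already belongs to the first week of 2020, so the week-year pattern reports a date almost a year off. Sticking to lowercase `yyyy` avoids this.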

Conclusions

The addition of an SPI to Debezium brought a lot to the table in terms of custom converters.

This allows us to get a tailor-made CDC connector, with the data streaming into our Kafka cluster in exactly the format we want.

I didn't include the actual logic that converts the values from their raw format into epoch time (this part lives in the getMillis method).

But I have published the TimestampConverter as open source, so anyone can read the code there, use the converter in an application (either as a .jar file found in the releases section or as a dependency found in the packages section), or contribute to its development!

Feel free to suggest contributions to this converter, and share with me what kind of converters you created using the new Debezium SPI, and which ones you wish were made!

Links

To read more about Debezium custom converters, visit the official documentation:
https://debezium.io/documentation/reference/1.1/development/converters.html

Link to the repository of my TimestampConverter:
https://github.com/oryanmoshe/debezium-timestamp-converter

Top comments (1)

Sagnaic

Hi, I have added the JAR file to the plugin.path folder (the base folder for connectors), where the other Debezium JAR files were copied previously.

Starting the Connect distributed process results in an error when I include the timestamp converter JAR file.

github.com/oryanmoshe/debezium-tim...
