
Debezium Custom Converters
Creating custom converters using Debezium's new SPI to override value conversions
Introduction
Background about the TimestampConverter
Hey, my name is Oryan Moshe, and I started my own community for senior developers in Israel named in.dev.
I'm also a Lead Architect at Rivery. We are a fully managed data integration platform, and part of my job here is developing the ability to stream changes straight from our clients' databases to our platform, using change data capture (CDC).
The last time I coded in Java was 7 years ago, so if you have any suggestions to improve the code shown here please feel free to comment below!
You can find the converter right here:
https://github.com/oryanmoshe/debezium-timestamp-converter/
CDC — Change Data Capture
Before we talk about Debezium, we have to talk about CDC.
CDC is a way for us to get the changes happening in the database (as opposed to the actual data).
This means we can actually get every state that every record has been through in the database.
CDC is useful for a number of cases:
- Compiling a log of record changes
- Undoing (or reverting) a change
- Tracking record deletion (which is not simply a matter of using SELECT)
What is Debezium Anyway?
Debezium is an open source platform, maintained by Red Hat, that allows developers to implement CDC into a Kafka infrastructure.
Debezium performs CDC through the Kafka Connect connectors it provides, one for each supported database. Currently there's support for MySQL, PostgreSQL, Microsoft SQL Server, MongoDB, and even some limited support for Oracle.
What are converters, and why would we need a custom one?
All messages produced by Debezium are processed before entering the designated topic.
This ensures that all fields of a given type (defined by the schema) behave the same.
In other words, all DATE fields on all of the databases will be transformed into the same format. By default, this is "Days since epoch".
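To make "Days since epoch" concrete, here's a tiny sketch of turning such a number back into a calendar date. The class name EpochDayExample is made up for this illustration and only the JDK is used; it isn't part of Debezium or of the converter.

```java
import java.time.LocalDate;

final class EpochDayExample {
    // Interprets the value as a count of days since 1970-01-01
    // and returns the ISO-8601 date (yyyy-MM-dd).
    static String toIsoDate(long daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch).toString();
    }

    public static void main(String[] args) {
        System.out.println(toIsoDate(0));     // 1970-01-01
        System.out.println(toIsoDate(18262)); // 2020-01-01
    }
}
```

So a DATE column holding January 1st, 2020 would show up in the message as the number 18262 rather than as a readable date.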
But this behavior isn't always wanted, especially in this temporal example.
For our particular use case we need all temporal fields to be in the same format, whether the type is DATE, DATETIME, DATETIME2, TIME or TIMESTAMP.
The format we chose was YYYY-MM-dd'T'HH:mm:ss.SSS'Z'.
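A side note on pattern letters, since the converter later feeds these patterns to java.text.SimpleDateFormat: there, lowercase y is the calendar year and uppercase Y is the week year. The two agree for most dates but can differ in the days around New Year. Here's a small sketch that formats the same instant with both. The class name WeekYearPitfall is made up for this illustration, and it pins Locale.US and UTC so the result doesn't depend on the machine it runs on.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

final class WeekYearPitfall {
    // Formats an epoch-millis value with the given pattern, in UTC.
    static String format(String pattern, long epochMillis) {
        SimpleDateFormat formatter = new SimpleDateFormat(pattern, Locale.US);
        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        return formatter.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        long lastDayOf2019 = 1577750400000L; // 2019-12-31T00:00:00Z
        System.out.println(format("yyyy-MM-dd", lastDayOf2019)); // 2019-12-31
        System.out.println(format("YYYY-MM-dd", lastDayOf2019)); // 2020-12-31
    }
}
```

If the calendar year is what you're after, it may be worth passing the lowercase yyyy variant in your own format properties.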
Creating a custom converter
Here's an explanation for each step needed to create our TimestampConverter.
The basics of custom converters
To allow such behavior, an SPI (Service Provider Interface) was added in Debezium version 1.1.
This allows developers to write their own converters in Java, by creating a class that implements the io.debezium.spi.converter.CustomConverter interface.
The First Gotcha
What we didn't know when we started developing this converter is that once we registered a custom converter for a temporal column, Debezium's behavior became inconsistent. Sometimes it would pass a DATE column as "Days since epoch", as expected, but sometimes it would pass it as a string matching the date format of the database it came from.
This meant we had to cover all of our bases, both for numeric values (let's say, "Days since epoch") and for all the date formats databases can produce (YYYY-MM-dd, dd/MM/YYYY, YYYY-MMM-dd, etc.).
Things got a bit complicated on the logic front, but let's not get into this right now.
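Just to give a feel for what "covering all of our bases" can look like, here's a minimal sketch that accepts either a numeric "Days since epoch" value or a date string in one of a few formats. This is not the converter's actual logic: the class LenientDateParser, its method, and the list of patterns are assumptions made for this illustration, and it only deals with plain dates.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Locale;

final class LenientDateParser {
    // Candidate string formats, tried in order (illustrative, not exhaustive).
    private static final List<DateTimeFormatter> FORMATS = List.of(
            DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ENGLISH),
            DateTimeFormatter.ofPattern("dd/MM/yyyy", Locale.ENGLISH),
            DateTimeFormatter.ofPattern("yyyy-MMM-dd", Locale.ENGLISH));

    // Returns the epoch milliseconds of midnight UTC on the given date.
    static long toEpochMillis(Object rawValue) {
        String text = rawValue.toString().trim();

        // Purely numeric input is treated as "days since epoch".
        if (text.matches("-?\\d+")) {
            return startOfDayMillis(LocalDate.ofEpochDay(Long.parseLong(text)));
        }

        // Otherwise try each known string format until one parses.
        for (DateTimeFormatter format : FORMATS) {
            try {
                return startOfDayMillis(LocalDate.parse(text, format));
            } catch (DateTimeParseException ignored) {
                // Not this format, try the next one.
            }
        }
        throw new IllegalArgumentException("Unrecognised date value: " + text);
    }

    private static long startOfDayMillis(LocalDate date) {
        return date.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
    }
}
```

With this sketch, toEpochMillis(18262), toEpochMillis("2020-01-01") and toEpochMillis("01/01/2020") all end up at the same instant, which is the property we need before formatting.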
What's needed for our custom converter to work
Each converter has to implement at least two methods to be harnessed by Debezium:
configure
This method runs when the connector is initialised. It accepts one argument:
- props: An object of type java.util.Properties, containing all of the properties we passed to our converter instance.
converterFor
This method runs once for each column defined in our schema, and its job is to define (a.k.a. "register") the converter for each. It accepts two arguments:
- column: An object of type io.debezium.spi.converter.RelationalColumn, containing the definition of the column we're currently handling, including its name, type, size, table, etc.
- registration: An object of type io.debezium.spi.converter.CustomConverter.ConverterRegistration, an internal definition that has one method: register.
Using the configure method
As stated above, we use the configure method to pass properties into our converter. This is important because we can use the same converter for multiple connectors, and change its behavior according to these properties.
For our TimestampConverter we wanted to pass four properties:
- debug: Indicates whether to print debug messages. Defaults to false.
- format.date: The format to convert all columns of type DATE to. Defaults to YYYY-MM-dd.
- format.time: The format to convert all columns of type TIME to. Defaults to HH:mm:ss.SSS.
- format.datetime: The format to convert all other temporal columns to. Defaults to YYYY-MM-dd'T'HH:mm:ss.SSS'Z'.
All of these properties are optional and have default values associated with them.
To support them, we defined a default value for each as a class constant. Inside the configure method we assign each field the passed value, falling back to the default:
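import java.text.SimpleDateFormat;
import java.util.Properties;
import java.util.TimeZone;

import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;

public class TimestampConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    // Defaults used when a property is not passed to the converter.
    // In SimpleDateFormat, "yyyy" is the calendar year and "YYYY" is the week year.
    public static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    public static final String DEFAULT_DATE_FORMAT = "YYYY-MM-dd";
    public static final String DEFAULT_TIME_FORMAT = "HH:mm:ss.SSS";

    public String strDatetimeFormat, strDateFormat, strTimeFormat;
    public Boolean debug;

    private SimpleDateFormat simpleDatetimeFormatter, simpleDateFormatter, simpleTimeFormatter;

    @Override
    public void configure(Properties props) {
        // Read each format, falling back to its default, and build a formatter for it.
        this.strDatetimeFormat = props.getProperty("format.datetime", DEFAULT_DATETIME_FORMAT);
        this.simpleDatetimeFormatter = new SimpleDateFormat(this.strDatetimeFormat);

        this.strDateFormat = props.getProperty("format.date", DEFAULT_DATE_FORMAT);
        this.simpleDateFormatter = new SimpleDateFormat(this.strDateFormat);

        this.strTimeFormat = props.getProperty("format.time", DEFAULT_TIME_FORMAT);
        this.simpleTimeFormatter = new SimpleDateFormat(this.strTimeFormat);

        this.debug = props.getProperty("debug", "false").equals("true");

        // Datetime and time values are formatted in UTC;
        // the date formatter keeps the JVM's default time zone.
        this.simpleDatetimeFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        this.simpleTimeFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
    }
}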
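Notice that configure reads keys like format.date with no prefix, while in the connector configuration each option is written with the converter's name in front of it. Here's a rough sketch of that mapping, assuming the converter is registered under the name timestampConverter. The class ConverterPropsSketch and its method are made up to illustrate the idea; this is not Debezium's internal code.

```java
import java.util.Properties;

final class ConverterPropsSketch {
    // Collects the options that belong to one converter and strips
    // the "<converterName>." prefix from their keys.
    static Properties propsFor(String converterName, Properties connectorConfig) {
        String prefix = converterName + ".";
        Properties result = new Properties();
        for (String key : connectorConfig.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.setProperty(key.substring(prefix.length()),
                        connectorConfig.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("converters", "timestampConverter");
        config.setProperty("timestampConverter.format.date", "yyyy-MM-dd");

        Properties props = propsFor("timestampConverter", config);
        System.out.println(props.getProperty("format.date"));                 // yyyy-MM-dd
        System.out.println(props.getProperty("format.time", "HH:mm:ss.SSS")); // HH:mm:ss.SSS (default)
    }
}
```

This is also why the two-argument getProperty matters: any option that was left out of the connector configuration simply falls back to its default.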
Using the converterFor method
Now it's time for the big moment. Each column must be converted to its respective format.
First of all, we have to understand the type of the column we're currently handling. This is determined using column.typeName.
If the type is any of the temporal types (defined as a class constant) we handle it accordingly. If it's not, we do nothing, and Debezium will take control.
To tell Debezium to convert a specific column to something else, we use the registration passed to us and call register on it, providing a schema (we create one of type string and make it optional) and a converter.
The converter is just a function, or in our case a lambda, that receives an Object (the source value) and returns a value matching the schema we provided. In our case, we needed to return a String (or null, because we made the schema optional).
@Override
public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
    // Only handle temporal columns; anything else is left to Debezium.
    if (SUPPORTED_DATA_TYPES.stream().anyMatch(s -> s.toLowerCase().equals(column.typeName().toLowerCase()))) {
        boolean isTime = "time".equals(column.typeName().toLowerCase());

        // datetimeSchema is the optional string schema described above.
        registration.register(datetimeSchema, rawValue -> {
            if (rawValue == null)
                return rawValue;

            // Normalise the raw value (numeric or string) to epoch milliseconds.
            Long millis = getMillis(rawValue.toString(), isTime);
            Instant instant = Instant.ofEpochMilli(millis);
            Date dateObject = Date.from(instant);

            // Pick the formatter that matches the column type.
            switch (column.typeName().toLowerCase()) {
                case "time":
                    return simpleTimeFormatter.format(dateObject);
                case "date":
                    return simpleDateFormatter.format(dateObject);
                default:
                    return simpleDatetimeFormatter.format(dateObject);
            }
        });
    }
}
In this code snippet, look at the two crucial parts we mentioned before: the call to registration.register, and the return statements.
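If you'd like to play with the shape of that lambda without a running Debezium, here's a JDK-only sketch of the same idea. Everything in it is a stand-in for illustration: the class ConverterLambdaSketch, the contents of SUPPORTED_DATA_TYPES (the real list isn't shown in this post), and the simplifying assumption that the raw value is already epoch milliseconds, so there is no getMillis step.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.TimeZone;
import java.util.function.Function;

final class ConverterLambdaSketch {
    // Assumed contents, based on the temporal types named earlier in the post.
    static final List<String> SUPPORTED_DATA_TYPES =
            List.of("date", "time", "datetime", "datetime2", "timestamp");

    static boolean isSupported(String typeName) {
        return SUPPORTED_DATA_TYPES.stream().anyMatch(t -> t.equalsIgnoreCase(typeName));
    }

    // Builds the per-column conversion function: Object in, String (or null) out.
    // SimpleDateFormat is not thread-safe, so each function gets its own instance.
    static Function<Object, Object> converterFor(String typeName) {
        String pattern;
        switch (typeName.toLowerCase(Locale.ROOT)) {
            case "time":
                pattern = "HH:mm:ss.SSS";
                break;
            case "date":
                pattern = "yyyy-MM-dd";
                break;
            default:
                pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
        }
        SimpleDateFormat formatter = new SimpleDateFormat(pattern, Locale.ROOT);
        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));

        return rawValue -> {
            if (rawValue == null) {
                return null; // allowed because the schema is optional
            }
            // Simplifying assumption: the raw value is epoch milliseconds.
            long millis = Long.parseLong(rawValue.toString());
            return formatter.format(new Date(millis));
        };
    }
}
```

For example, converterFor("DATETIME").apply(1577836800000L) gives "2020-01-01T00:00:00.000Z", and applying any of these functions to null gives null back.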
Using a Custom Converter with Debezium
Installation
Installation in our Debezium cluster is straightforward. We just need to add the .jar file of the converter to the connector we want to use it in.
The Second Gotcha
Notice I said "... to the connector we want ...". This is something Debezium didn't make clear in the documentation: we need to add this converter to every connector we want to use it in.
Let's say the base folder for connectors is /kafka/connect. Inside it we'll find folders like debezium-connector-mysql or debezium-connector-postgres.
We need to add the converter .jar file to each of those folders if we intend to use it there.
Configuration
After adding the .jar file to our connector, we can configure our connectors to use it!
To do so, all we need to do is add the following keys to our existing configuration:
"converters": "timestampConverter",
"timestampConverter.type": "oryanmoshe.kafka.connect.util.TimestampConverter"
If we need to customize the formats of specific data types, we can use these additional configuration keys:
"timestampConverter.format.time": "HH:mm:ss.SSS",
"timestampConverter.format.date": "YYYY-MM-dd",
"timestampConverter.format.datetime": "YYYY-MM-dd'T'HH:mm:ss.SSS'Z'",
"timestampConverter.debug": "false"
Conclusions
The addition of an SPI to Debezium brought a lot to the table in terms of custom converters.
This allows us to get a tailor-fit CDC connector, with the data streaming into our Kafka cluster exactly in the format we want.
I didn't include the actual logic that converts the values from their raw format into epoch time (this part is contained in the getMillis method).
But I have published the TimestampConverter as open source, so anyone can read the code there, use the converter in an application (be it as a .jar file found in the releases section, or as a dependency found in the packages section), or contribute to its development!
Feel free to suggest contributions to this converter, and share with me what kind of converters you created using the new Debezium SPI, and which ones you wish were made!
Links
To read more about Debezium custom converters, visit the official documentation:
https://debezium.io/documentation/reference/1.1/development/converters.html
Link to the repository of my TimestampConverter:
https://github.com/oryanmoshe/debezium-timestamp-converter