DbSink is a Kafka sink connector that provides a sink implementation for streaming changes emitted by Debezium.

Features

1. 100% compatible with Debezium: able to process data change events, schema change events, and transaction events without requiring any additional transform such as 'ExtractNewRecordState'.

2. Supports both transaction-based and table-based parallel applying.

3. Supports applying across a variety of database dialects. Currently only Oracle to Postgres and MySQL to Postgres are supported; more database dialects are under development and will be released in the near future.

4. Executes insert, update, and delete based on the operation in the Debezium event record, without requiring additional configuration such as 'insert mode'.

5. Supports automatically switching to upsert mode to apply insert events when duplicate-key errors occur.

6. Supports most data types and is able to choose the right way to process a source event's data type based on the definition of the target column.

For example, the MySQL time data type can be inserted into either the time data type or the interval data type in the Postgres database.

7. Supports configuration of target table and column names. By default, the connector uses the source table and column names as the target ones.

Connector Configuration Properties

connector.class

To use this connector, specify the name of the connector class in the connector.class configuration property.

"connector.class": "io.dbsink.connector.sink.DbSinkConnector"

JDBC Group

jdbc.username

JDBC connection user.

  • Type: String
  • Default: null
  • Importance: high

jdbc.password

JDBC connection password.

  • Type: String
  • Default: null
  • Importance: high

jdbc.url

JDBC connection URL. For example: jdbc:postgresql://localhost:5432/migration, jdbc:mysql://localhost/db_name

  • Type: String
  • Default: null
  • Importance: high

jdbc.driver.class

JDBC driver class. For example: org.postgresql.Driver, com.mysql.cj.jdbc.Driver, com.mysql.jdbc.Driver

  • Type: String
  • Default: null
  • Importance: low

jdbc.retries.max

The maximum number of retries for JDBC calls when an SQLException happens. The value must be a positive integer.

  • Type: int
  • Default: 5
  • Importance: low

jdbc.backoff.ms

The backoff time in milliseconds between JDBC retries.

  • Type: long
  • Default: 3000
  • Importance: low
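
Putting the JDBC group together, a minimal sketch for a Postgres target might look like this (host, database name, and credentials are placeholders):

    "jdbc.url": "jdbc:postgresql://localhost:5432/migration",
    "jdbc.username": "user",
    "jdbc.password": "secret",
    "jdbc.driver.class": "org.postgresql.Driver",
    "jdbc.retries.max": 5,
    "jdbc.backoff.ms": 3000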

Applier Group

applier.parallel.max

The maximum number of applier threads.

  • Type: int
  • Default: CPU cores * 1.4
  • Importance: high

applier.transaction.enabled

Whether to apply events transactionally. Requires the Debezium configuration property provide.transaction.metadata to be true and all incoming events to share the same topic name (use transforms); see the sketch below.

  • Type: boolean
  • Default: false
  • Importance: high
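
A minimal sketch of the matching Debezium source-side settings (the same ByLogicalTableRouter transform as in the Best Practices template below; the topic name all_topics is only an example):

    "provide.transaction.metadata": true,
    "transforms": "Reroute",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "(.*)",
    "transforms.Reroute.topic.replacement": "all_topics"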

applier.transaction.buffer.size

Specifies how many transactions to cache in the buffer.

  • Type: int
  • Default: 50
  • Importance: high

applier.worker.buffer.size

Specifies how many transactions an applier worker can cache in its buffer.

  • Type: int
  • Default: 100
  • Importance: high
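
Taken together, a sketch of the transaction-applying settings (values are illustrative; larger buffer sizes consume more heap memory):

    "applier.parallel.max": 50,
    "applier.transaction.enabled": "true",
    "applier.transaction.buffer.size": 50,
    "applier.worker.buffer.size": 100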

table.naming.strategy

Specifies the fully-qualified class name of a TableNamingStrategy implementation that the connector uses to resolve table names from incoming events. DefaultTableNamingStrategy, LowCaseTableNamingStrategy, and UpperCaseTableNamingStrategy are available.

  • Type: class
  • Default: io.dbsink.connector.sink.naming.DefaultTableNamingStrategy
  • Importance: high

column.naming.strategy

Specifies the fully-qualified class name of a ColumnNamingStrategy implementation that the connector uses to resolve column names from incoming events. DefaultColumnNamingStrategy, LowCaseColumnNamingStrategy, and UpperCaseColumnNamingStrategy are available.

  • Type: class
  • Default: io.dbsink.connector.sink.naming.DefaultColumnNamingStrategy
  • Importance: high
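
For example, to fold all resolved identifiers to lower case, both strategies can be swapped for their LowCase variants (assuming they live in the same io.dbsink.connector.sink.naming package as the defaults):

    "table.naming.strategy": "io.dbsink.connector.sink.naming.LowCaseTableNamingStrategy",
    "column.naming.strategy": "io.dbsink.connector.sink.naming.LowCaseColumnNamingStrategy"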

Build guidance

jdk version: >=11

maven: >=3.0

mvn clean package -Dmaven.test.skip=true

Note: dependencies such as JDBC drivers are not packaged into the jar; you need to copy them manually into the Kafka Connect plugin path (the directory configured via plugin.path in the Connect worker properties).

Data type mapping

MySQL to Postgres

| MySQL | Postgres | Description |
| --- | --- | --- |
| float(n) | float(n) | float in Postgres is a standard type while MySQL's is not, so errors are possible |
| float(n,p) | float(n) or decimal(n,p) | No fully equivalent data type, so errors are possible |
| double(n,p) | double precision or decimal(n,p) | No fully equivalent data type, so errors are possible |
| double | double precision | double in Postgres is a standard type while MySQL's is not, so errors are possible |
| decimal(n,p) | decimal(n,p) | |
| bigint | bigint | |
| mediumint | int | |
| int | int | |
| smallint | smallint | |
| tinyint | smallint | |
| timestamp(n) | timestamp(n) with time zone | Ensure the MySQL and Postgres time zones are the same, in which case there is no error |
| datetime(n) | timestamp(n) without time zone | |
| time(n) | time(n) or interval day to second | Values in 0-23:59:59.999999 map to time(n); otherwise to interval day to second |
| year | smallint or interval year | |
| bit(n) | bit(m), m >= n | |
| bit(1) | bit(1) or bool | |
| tinyint(1) | bool | |
| binary(n) | bytea | |
| blob | bytea | |
| tinyblob | bytea | |
| mediumblob | bytea | |
| longblob | bytea | |
| char(n) | char(n) | |
| varchar(n) | varchar(n) | |
| tinytext | text | |
| text | text | |
| mediumtext | text | |
| json | json | |

Oracle to Postgres

| Oracle | Postgres | Description |
| --- | --- | --- |
| number(n,p) | numeric(n,p) | |
| binary_float | float | |
| binary_double | double precision | |
| integer | int | |
| char(n) | char(n) | |
| nvarchar2(n) | varchar(n) | |
| varchar2(n) | varchar(n) | |
| clob | text | |
| nclob | text | |
| blob | bytea | |
| raw | bytea | |
| long raw | bytea | |
| long | bytea | |
| date | timestamp(0) without time zone | |
| timestamp(n) | timestamp(n) without time zone | Fractional-second digits beyond 6 are truncated |
| timestamp(n) with time zone | timestamp(n) with time zone | Fractional-second digits beyond 6 are truncated |
| timestamp(n) with local time zone | timestamp(n) with time zone | Fractional-second digits beyond 6 are truncated |
| interval year to month | interval year to month | |
| interval day to second | interval day to second | |

Best Practices

Here are some examples that introduce how to use the connector and its application scenarios.

OLTP business system online migration

A trading system is preparing to migrate from a MySQL database to a Postgres database. To verify whether the Postgres database can meet the business needs, real-time streaming replication must be established between MySQL and Postgres, and the system's read-only traffic switched to the Postgres database to verify that the business system runs normally. The following steps implement this.

step1

You need to convert the table schema in MySQL to Postgres, either manually or with third-party tools. For the mapping of data types, refer to [MySQL to Postgres](#mysql-to-postgres).

Example:

MySQL:

create database shop;
create table shop.user(id int auto_increment primary key, name varchar(50), create_time timestamp(6));

Postgres:

create schema shop;
create table shop.user(id serial primary key, name varchar(50), create_time timestamp(6));

Note: a database in MySQL is converted to a schema in Postgres.

step2

You need to download ZooKeeper, Kafka, and a JRE (>= 11) to create a running environment for the Debezium connector and the DbSink connector. There are many documents on this online, so we won't go into detail here.

step3

Write the configuration JSON for the Debezium connector. A template is provided here.

{    "name": "mysql_debezium_source_connector",     "config": {    "connector.class": "io.debezium.connector.mysql.MySqlConnector",    "database.allowPublicKeyRetrieval": true,    "database.user": "wangwei",    "database.server.id": "1000",    "tasks.max": 1,    "database.include.list": "shop",    "provide.transaction.metadata": true,    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",    "database.port": 3307,    "tombstones.on.delete": false,    "topic.prefix": "olap_migration",    "schema.history.internal.kafka.topic": "olap_migration_schema_history",    "database.hostname": "192.168.142.129",    "database.password": "19961028",    "snapshot.mode": "initial",    "snapshot.max.threads": 10,    "heartbeat.interval.ms": 10000,    "transforms":"Reroute",     "transforms.Reroute.type":"io.debezium.transforms.ByLogicalTableRouter",    "transforms.Reroute.topic.regex":"(.*)",    "transforms.Reroute.topic.replacement":"all_topics"    }}

Here are some special configuration items to note.

1. snapshot.mode

initial means capture both a snapshot and incremental data.

2. provide.transaction.metadata

In this case, we need transaction metadata from the source side in order to apply data transactionally, so this configuration item must be set to true.

3. transforms

Here we configure the 'ByLogicalTableRouter' transform to route all source record topics to the same topic ('all_topics'). By default, the topic of a record produced by Debezium is derived from the table identifier, so events from different tables go to different topics. Since only records within the same topic are ordered, the order of records consumed by the sink connector could otherwise differ from the order produced by the source connector.

After writing the JSON, send a POST request to the Kafka Connect REST API (by default, http://localhost:8083/connectors) to create the connector.

step4

Write the configuration JSON for the DbSink connector. A template is provided here.

{    "name": "DbSinkConnector",    "config": {        "connector.class": "io.dbsink.connector.sink.DbSinkConnector",        "jdbc.password": "wangwei123",        "jdbc.username": "wangwei",        "tasks.max": 1,        "topics": "all_topics",        "jdbc.url": "jdbc:postgresql://localhost:5432/migration",        "database.dialect.name": "PostgreSqlDialect",        "jdbc.driver.class": "org.postgresql.Driver",        "jdbc.retries.max": 5,        "jdbc.backoff.ms": 6000,        "applier.parallel.max": 50,        "applier.transaction.enabled": "true",        "applier.transaction.buffer.size": 10000,        "applier.worker.buffer.size": 100,        "table.naming.strategy": "io.dbsink.connector.sink.naming.DefaultTableNamingStrategy",        "column.naming.strategy": "io.dbsink.connector.sink.naming.DefaultColumnNamingStrategy"    }}

Here are some special configuration items to note.

1. applier.transaction.enabled

It specifies whether to apply in a transactional manner; in this case it is true.

2. applier.transaction.buffer.size

It specifies the maximum number of cached transactions; the larger this value, the more heap memory is consumed.

3. applier.worker.buffer.size

It specifies the size of the buffer in each applier worker, i.e. the maximum number of transactions an applier worker can hold.

4. applier.parallel.max

It specifies the number of threads that apply in parallel.

5. table.naming.strategy

It specifies how to resolve the table name from source records; three strategies are provided:

1. DefaultTableNamingStrategy

"Table1" --> "Table1"

2. LowCaseTableNamingStrategy

"Table1" --> "table1"

3. UpperCaseTableNamingStrategy

"table1" --> "TABLE1"

According to step1, DefaultTableNamingStrategy is used.

6. column.naming.strategy

It specifies how to resolve the column name from source records; three strategies are provided:

1. DefaultColumnNamingStrategy

"Col1" --> "Col1"

2. LowCaseColumnNamingStrategy

"COL1" --> "col1"

3. UpperCaseColumnNamingStrategy

"col1" --> "COL1"

According to step1, DefaultColumnNamingStrategy is used.

After writing the JSON, send a POST request to the Kafka Connect REST API (by default, http://localhost:8083/connectors) to create the connector.
