The aim of this tutorial is to capture every change (delete, insert, and update) from a MySQL table and sync it with ClickHouse.
Prerequisites
- MySQL
- ZooKeeper
- Kafka
- Kafka Connect
- ClickHouse
We can set up all of these services with a simple docker-compose file (source).
```yaml
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
    links:
      - kafka
      - mysql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  clickhouse:
    image: clickhouse/clickhouse-server:23.2.4.12
    links:
      - kafka
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
    ports:
      - 8123:8123
      - 9000:9000
```
You can read more about the options of every service in this tutorial.
After saving the YAML file as `docker-compose.yml`, run:
```bash
export DEBEZIUM_VERSION=2.2
docker compose up
```
Now we have a MySQL container which contains a simple database named `inventory`, a Kafka container, a ZooKeeper instance which manages the Kafka cluster, a `connect` instance which adds Kafka Connect capabilities to Kafka, and also a ClickHouse instance. Now we have all the prerequisites.
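As a quick sanity check, we can confirm that all five containers came up (run this from the directory containing the compose file):

```bash
docker compose ps
```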
Deploy Debezium connector
We can interact with Kafka Connect through its REST API.

Base request:
```bash
curl -i -X {Request_Type} -H "Accept: application/json" -H "Content-Type: application/json" localhost:8083/connectors/
```
See current connectors:
```bash
curl -i -X GET -H "Accept: application/json" -H "Content-Type: application/json" localhost:8083/connectors/
```
Delete the `{my-conn}` connector:
```bash
curl -i -X DELETE -H "Accept: application/json" -H "Content-Type: application/json" localhost:8083/connectors/{my-conn}
```
Add a connector:
```bash
curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" localhost:8083/connectors/ -d '{connector-config-as-json}'
```
Config for MySQL Connector
{ "name":"mysql-connector", "config":{ "tasks.max":"1", "connector.class":"io.debezium.connector.mysql.MySqlConnector", "database.hostname":"mysql", "database.port":"3306", "database.user":"root", "database.password":"debezium", "database.include.list":"inventory", "table.include.list":"inventory.orders", "database.server.id":"1", "message.key.columns":"inventory.orders:order_number", "schema.history.internal.kafka.bootstrap.servers":"kafka:9092", "schema.history.internal.kafka.topic":"dbz.inventory.history", "snapshot.mode":"schema_only", "topic.prefix":"dbz.inventory.v2", "transforms":"unwrap", "transforms.unwrap.delete.handling.mode":"rewrite", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState" }}
- `name`: The name of the connector.
- `config`: The connector's configuration.
- `tasks.max`: Only one task should operate at any one time. Because the MySQL connector reads the MySQL server's binlog, using a single connector task ensures proper order and event handling. The Kafka Connect service uses connectors to start one or more tasks that do the work, and it automatically distributes the running tasks across the cluster of Kafka Connect services. If any of the services stop or crash, those tasks will be redistributed to running services.
- `connector.class`: The type of connector, one of these.
- `database.hostname`: The database host, which is the name of the Docker container running the MySQL server (`mysql`). Docker manipulates the network stack within the containers so that each linked container can be resolved via `/etc/hosts` using the container name as the hostname. If MySQL were running on a normal network, you would specify the IP address or resolvable hostname for this value.
- `database.user` & `database.password`: Username and password of a MySQL user with these privileges. For this example, I use the root user and password.
- `database.include.list`: Only changes in the `inventory` database will be detected.
- `topic.prefix`: A unique topic prefix. This name will be used as the prefix for all Kafka topics.
- `schema.history.internal.kafka.bootstrap.servers` & `schema.history.internal.kafka.topic`: The connector will store the history of the database schemas in Kafka using this broker (the same broker to which you are sending events) and topic name. Upon restart, the connector will recover the schemas of the database that existed at the point in time in the binlog when the connector should begin reading.
- `transforms.*`: These transformations are needed to insert data into ClickHouse. More explanation here.
The full reference of configs for the MySQL connector can be found here.
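To register the connector, we can save the JSON above in a file (the filename `mysql-connector.json` here is just an example) and POST it to the Kafka Connect REST API, then use the status endpoint to confirm the connector and its task are `RUNNING`:

```bash
# Register the connector (filename is an example)
curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" \
  localhost:8083/connectors/ -d @mysql-connector.json

# Check that the connector and its task are RUNNING
curl -s localhost:8083/connectors/mysql-connector/status
```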
Consume Messages From Kafka
We want to see a list of topics in our Kafka broker. First, we should open a bash shell inside the Kafka container:
```bash
docker exec -it {kafka-container-name} /bin/bash
```
Then:
```bash
/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list
```
Note that the topic corresponding to our `orders` table in MySQL has the format `{topic.prefix}.{database_name}.{table_name}`. In this example, that turns into `dbz.inventory.v2.inventory.orders`.
To consume all messages from a topic:
```bash
/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbz.inventory.v2.inventory.orders --from-beginning
```
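With the `unwrap` transform and `delete.handling.mode=rewrite`, each message's `payload` carries the flattened row state plus a `__deleted` flag. A consumed message looks roughly like this (the `schema` block is truncated and the values are only illustrative, based on the sample `inventory` data):

```json
{
  "schema": { "...": "..." },
  "payload": {
    "order_number": 10001,
    "order_date": 16816,
    "purchaser": 1001,
    "quantity": 1,
    "product_id": 102,
    "__deleted": "false"
  }
}
```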
Set Up ClickHouse Tables
As mentioned in this article in the ClickHouse docs, we need 3 tables:
- A table with the Kafka engine
- A Materialized View table
- A MergeTree table
Kafka Engine Table
As mentioned in the doc, we should specify the format of the messages arriving from the Kafka topic (one of these). We could use a Kafka Schema Registry, but here we want to parse the JSON directly. So, with the help of the solution provided in this post, we get each message in `JSONAsString` format and then parse it using a Mat. View.
```sql
CREATE TABLE `default`.kafka_orders
(
    `msg_json_str` String
)
ENGINE = Kafka('kafka:9092', 'dbz.inventory.v2.inventory.orders', 'clickhouse', 'JSONAsString')
```
The full doc of the Kafka engine in ClickHouse.
MergeTree Table
As mentioned at the beginning of this article, we want to capture deletes and updates, so we use `ReplacingMergeTree`:
```sql
CREATE TABLE default.stream_orders
(
    `order_number` Int16,
    `order_date` DATE,
    `purchaser` Int16,
    `quantity` Int16,
    `product_id` Int16,
    `__deleted` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY (order_number)
SETTINGS index_granularity = 8192
```
Mat. View
We parse the JSON using JSONExtract functions in ClickHouse.
We should consider that Debezium treats the `DATE` data type as a number of days since `1970-01-01` (source). That's why we use `toDate` in combination with `JSONExtractInt`.
```sql
CREATE MATERIALIZED VIEW default.consumer__orders TO default.stream_orders
(
    `order_number` Int16,
    `order_date` DATE,
    `purchaser` Int16,
    `quantity` Int16,
    `product_id` Int16,
    `__deleted` Nullable(String)
) AS
SELECT
    JSONExtractInt(msg_json_str, 'payload', 'order_number') AS order_number,
    (toDate('1970-01-01') + JSONExtractInt(msg_json_str, 'payload', 'order_date')) AS order_date,
    JSONExtractInt(msg_json_str, 'payload', 'purchaser') AS purchaser,
    JSONExtractInt(msg_json_str, 'payload', 'quantity') AS quantity,
    JSONExtractInt(msg_json_str, 'payload', 'product_id') AS product_id,
    JSONExtractString(msg_json_str, 'payload', '__deleted') AS __deleted
FROM default.kafka_orders
```
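At this point, inserts, updates, and deletes in MySQL's `orders` table should start landing in `stream_orders`. A minimal sanity check:

```sql
SELECT *
FROM default.stream_orders
ORDER BY order_number
LIMIT 10
```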
A View (Optional)
ClickHouse merges the `stream_orders` table on an irregular schedule, so we can't see the latest version of the data at all times. But we can use a view to achieve this:
```sql
CREATE VIEW orders
(
    `order_number` Int16,
    `order_date_` DATE,
    `purchaser` Int16,
    `quantity` Int16,
    `product_id` Int16
) AS
SELECT
    order_number,
    max(order_date) AS order_date_,
    argMax(purchaser, order_date) AS purchaser,
    argMax(quantity, order_date) AS quantity,
    argMax(product_id, order_date) AS product_id
FROM default.stream_orders
WHERE `__deleted` = 'false'
GROUP BY order_number
```
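Querying the view now returns at most one up-to-date row per `order_number`, with deleted rows filtered out:

```sql
SELECT *
FROM orders
ORDER BY order_number
```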
We can also use the `FINAL` modifier instead of `GROUP BY`, but it's not recommended in a production environment.
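For comparison, a sketch of the `FINAL` variant (handy for ad-hoc checks, but, as noted, not recommended for production queries):

```sql
SELECT *
FROM default.stream_orders FINAL
WHERE `__deleted` = 'false'
```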
Troubleshooting
In case of any error, or even a lack of data in the tables, we should check the ClickHouse server logs located at `/var/log/clickhouse-server/clickhouse-server.err.log`.
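For example, we can follow the error log from the host (`{clickhouse-container-name}` is whatever `docker ps` reports for the ClickHouse container):

```bash
docker exec -it {clickhouse-container-name} tail -f /var/log/clickhouse-server/clickhouse-server.err.log
```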