Mohammad Arab Anvari
Apply CDC From MySQL to ClickHouse in a Local Environment

The aim of this tutorial is to capture every change (insert, update, and delete) from a MySQL table and sync it to ClickHouse.

Prerequisites

  • MySQL
  • Zookeeper
  • Kafka
  • Kafka Connect
  • ClickHouse

We can set up all of these services with a simple docker-compose file (Source).

```yaml
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
    links:
      - kafka
      - mysql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  clickhouse:
    image: clickhouse/clickhouse-server:23.2.4.12
    links:
      - kafka
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
    ports:
      - 8123:8123
      - 9000:9000
```

You can read more about the options of every service in this tutorial.
After saving the YAML file as docker-compose.yml:

```bash
export DEBEZIUM_VERSION=2.2
docker compose up
```
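
If the stack comes up cleanly, all five services (zookeeper, kafka, mysql, connect, clickhouse) should be running. As a quick sanity check:

```bash
# List the services started by the compose file and their state
docker compose ps
```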

Now we have a MySQL container which contains a simple database named inventory, a Kafka container, a Zookeeper container which manages the Kafka cluster, a connect instance which adds Kafka Connect capabilities to Kafka, and a ClickHouse instance. All prerequisites are in place.

Architecture overview: Debezium captures MySQL changes and streams them through Kafka into ClickHouse (Image Source).

Deploy the Debezium Connector

We can interact with Kafka Connect via its REST API.
The base request shape:

```bash
curl -i -X {Request_Type} -H "Accept: application/json" -H "Content-Type: application/json" localhost:8083/connectors/
```

List current connectors:

```bash
curl -i -X GET -H "Accept: application/json" -H "Content-Type: application/json" localhost:8083/connectors/
```

Delete the {my-conn} connector:

```bash
curl -i -X DELETE -H "Accept: application/json" -H "Content-Type: application/json" localhost:8083/connectors/{my-conn}
```

Add a connector:

```bash
curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" localhost:8083/connectors/ -d '{connector-config-as-json}'
```

Config for MySQL Connector

```json
{
  "name": "mysql-connector",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "debezium",
    "database.include.list": "inventory",
    "table.include.list": "inventory.orders",
    "database.server.id": "1",
    "message.key.columns": "inventory.orders:order_number",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "dbz.inventory.history",
    "snapshot.mode": "schema_only",
    "topic.prefix": "dbz.inventory.v2",
    "transforms": "unwrap",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}
```
  • name: The name of the connector.
  • config: The connector's configuration.
  • tasks.max: Only one task should operate at any one time. Because the MySQL connector reads the MySQL server's binlog, using a single connector task ensures proper order and event handling. The Kafka Connect service uses connectors to start one or more tasks that do the work, and it automatically distributes the running tasks across the cluster of Kafka Connect services. If any of the services stop or crash, those tasks will be redistributed to running services.
  • connector.class: The type of connector, one of these.
  • database.hostname: The database host, which is the name of the Docker container running the MySQL server (mysql). Docker manipulates the network stack within the containers so that each linked container can be resolved with /etc/hosts using the container name for the hostname. If MySQL were running on a normal network, you would specify the IP address or resolvable hostname for this value.
  • database.user & database.password: Username and password of a MySQL user with these privileges. For this example, I use the root user and password.
  • database.include.list: Only changes in the inventory database will be detected.
  • topic.prefix: A unique topic prefix. This name will be used as the prefix for all Kafka topics.
  • schema.history.internal.kafka.bootstrap.servers & schema.history.internal.kafka.topic: The connector will store the history of the database schemas in Kafka using this broker (the same broker to which you are sending events) and topic name. Upon restart, the connector will recover the schemas of the database that existed at the point in time in the binlog when the connector should begin reading.
  • transforms.*: These transformations are needed to insert data into ClickHouse. More explanation here.

The full configuration reference for the MySQL connector can be found here.
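
Putting the pieces together: assuming the JSON above is saved as mysql-connector.json (a file name chosen here for illustration), we can register the connector and then verify it is running:

```bash
# Register the connector from the config file
curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" \
  localhost:8083/connectors/ -d @mysql-connector.json

# Check that the connector and its task are in the RUNNING state
curl -s localhost:8083/connectors/mysql-connector/status
```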

Consume Messages From Kafka

We want to see the list of topics in our Kafka broker. First, open a shell inside the Kafka container:

```bash
docker exec -it {kafka-container-name} /bin/bash
```

Then:

```bash
/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list
```

Note that the topic corresponding to our orders table in MySQL has the format {topic.prefix}.{database_name}.{table_name}; in this example, that is dbz.inventory.v2.inventory.orders.
To consume all messages from a topic:

```bash
/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbz.inventory.v2.inventory.orders --from-beginning
```
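
For orientation, a message on this topic (with the unwrap transform applied) looks roughly like the following. The payload values are illustrative, based on the sample inventory data, and the schema section is trimmed:

```json
{
  "schema": { "...": "trimmed for brevity" },
  "payload": {
    "order_number": 10001,
    "order_date": 16816,
    "purchaser": 1001,
    "quantity": 1,
    "product_id": 102,
    "__deleted": "false"
  }
}
```

Note that order_date arrives as a day offset rather than a date string; the materialized view below accounts for this.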

Set Up ClickHouse Tables

As mentioned in this article in the ClickHouse docs, we need three tables:

  • A table with the Kafka engine
  • A materialized view table
  • A MergeTree table

Kafka Engine Table

As mentioned in the doc, we should specify the format of the messages arriving from the Kafka topic (one of these). We could use the Kafka Schema Registry, but here we want to parse the JSON directly, so following the solution provided in this post we read each message in JSONAsString format and then parse it using a materialized view.

```sql
CREATE TABLE `default`.kafka_orders
(
    `msg_json_str` String
)
ENGINE = Kafka('kafka:9092', 'dbz.inventory.v2.inventory.orders', 'clickhouse', 'JSONAsString')
```

The full documentation of the Kafka engine in ClickHouse is here.

MergeTree Table

As mentioned at the beginning of this article, we want to capture deletes and updates, so we use ReplacingMergeTree:

```sql
CREATE TABLE default.stream_orders
(
    `order_number` Int16,
    `order_date` Date,
    `purchaser` Int16,
    `quantity` Int16,
    `product_id` Int16,
    `__deleted` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY (order_number)
SETTINGS index_granularity = 8192
```
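
ReplacingMergeTree only collapses rows sharing the same ORDER BY key (order_number) when background merges run, so a plain SELECT can return several versions of the same order. For an ad-hoc spot check (assuming some data has already arrived; the order number is a sample value), FINAL forces deduplication at query time:

```sql
-- FINAL deduplicates at read time, keeping the last inserted version
SELECT *
FROM default.stream_orders FINAL
WHERE order_number = 10001
```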

Mat. View

We parse the JSON using the JSONExtract functions in ClickHouse.
Keep in mind that Debezium serializes the DATE data type as the number of days since 1970-01-01 (Source). That is why toDate is combined with JSONExtractInt below.

```sql
CREATE MATERIALIZED VIEW default.consumer__orders TO default.stream_orders
(
    `order_number` Int16,
    `order_date` Date,
    `purchaser` Int16,
    `quantity` Int16,
    `product_id` Int16,
    `__deleted` Nullable(String)
)
AS SELECT
    JSONExtractInt(msg_json_str, 'payload', 'order_number') AS order_number,
    toDate('1970-01-01') + JSONExtractInt(msg_json_str, 'payload', 'order_date') AS order_date,
    JSONExtractInt(msg_json_str, 'payload', 'purchaser') AS purchaser,
    JSONExtractInt(msg_json_str, 'payload', 'quantity') AS quantity,
    JSONExtractInt(msg_json_str, 'payload', 'product_id') AS product_id,
    JSONExtractString(msg_json_str, 'payload', '__deleted') AS __deleted
FROM default.kafka_orders
```
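
As a quick sanity check of the day-offset conversion (16816 is an assumed sample value):

```sql
SELECT toDate('1970-01-01') + 16816 AS order_date -- returns 2016-01-16
```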

A View (Optional)

ClickHouse merges the stream_orders table on an irregular schedule, so we can't always see only the latest version of each row. But we can use a view to achieve this:

```sql
CREATE VIEW orders
(
    `order_number` Int16,
    `order_date_` Date,
    `purchaser` Int16,
    `quantity` Int16,
    `product_id` Int16
)
AS SELECT
    order_number,
    max(order_date) AS order_date_,
    argMax(purchaser, order_date) AS purchaser,
    argMax(quantity, order_date) AS quantity,
    argMax(product_id, order_date) AS product_id
FROM default.stream_orders
WHERE `__deleted` = 'false'
GROUP BY order_number
```

We could also use the FINAL modifier instead of GROUP BY, but it's not recommended in a production environment.
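
To verify the whole pipeline end to end, we can change data in MySQL and watch it appear in ClickHouse. The commands below are a sketch assuming the service names from the compose file above and the sample inventory data (the inserted values are hypothetical):

```bash
# Insert a new order into MySQL...
docker compose exec mysql mysql -uroot -pdebezium inventory \
  -e "INSERT INTO orders VALUES (10005, '2023-05-01', 1002, 3, 107);"

# ...then confirm it flowed through Kafka into the ClickHouse view
docker compose exec clickhouse clickhouse-client \
  -q "SELECT * FROM default.orders WHERE order_number = 10005"
```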

Troubleshooting

In case of any error, or even a lack of data in the tables, we should check the ClickHouse server logs located at /var/log/clickhouse-server/clickhouse-server.err.log.
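
For example, from the host (the container name is a placeholder, as before):

```bash
docker exec -it {clickhouse-container-name} \
  tail -f /var/log/clickhouse-server/clickhouse-server.err.log
```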
