# postgres-kafka-iceberg-pipeline
Example pipeline to stream data changes from an RDBMS to Apache Iceberg tables, using two approaches:
- Manually processing the Kafka messages via Spark
- Using the Kafka Sink Connector for Iceberg from the `iceberg-kafka-connect` repo
A recorded demo is available in `demo.mp4`.
```
├── docker-compose.yaml -> Compose file to launch postgres, kafka and spark containers
├── kafka
│   ├── config
│   │   ├── connect-file-sink.properties    -> File sink connector
│   │   ├── connect-iceberg-sink.properties -> Iceberg Sink connector
│   │   ├── connect-postgres-source.json    -> Postgres source connector
│   │   └── connect-standalone.properties   -> Standalone kafka connect config
│   └── plugins
│       ├── debezium-connector-postgres -> Debezium connector jars should be downloaded to this folder
│       └── iceberg-kafka-connect       -> Iceberg kafka connect jars should be downloaded to this folder
├── postgres
│   ├── postgresql.conf -> Config with logical replication enabled
│   └── scripts
│       ├── manual
│       └── seed -> SQL scripts that will be run the first time the db is created, i.e. when the data directory is empty
└── spark
    ├── .ivy -> Kafka and iceberg dependency jars will be downloaded to this folder
    └── scripts
        ├── consumer.py             -> Pyspark script to consume kafka messages and stream to console and iceberg sinks
        └── print_iceberg_tables.py -> Pyspark script to query the tables created by Spark/Iceberg Sink Kafka connector
```
Moreover, these folders will be created and mounted so that docker containers can write back to the host file system.
```
├── data -> Data from containers will be mounted to this path. Configured in docker-compose file.
│   ├── kafka
│   │   └── out
│   │       ├── cdc.commerce.sink.txt -> Output of Kafka File sink connector
│   │       └── iceberg/warehouse     -> Storage location used by Kafka Iceberg sink for each table data + metadata
│   └── spark
│       └── out
│           ├── iceberg/warehouse -> Storage location used by Spark Iceberg sink for each table data + metadata
│           └── spark/checkpoint  -> Checkpoint location used by Spark structured streaming
```
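The Spark-based approach is driven by `spark/scripts/consumer.py` listed above. The actual script lives in the repo; the sketch below only illustrates the general shape of such a consumer, assuming a Structured Streaming job that reads a Debezium topic, keeps the `before`/`after`/`op` envelope fields, and writes to both a console sink and an Iceberg table. The catalog name (`demo`), broker address, topic name, warehouse/checkpoint paths and table schema are illustrative assumptions, not values taken from the repo.

```python
# Minimal sketch of a CDC consumer in the spirit of spark/scripts/consumer.py.
# All names below (catalog "demo", broker "kafka:9092", topic, paths) are illustrative assumptions.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

spark = (
    SparkSession.builder.appName("cdc-consumer")
    # Hypothetical Hadoop catalog pointing at the mounted Iceberg warehouse folder.
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.demo.type", "hadoop")
    .config("spark.sql.catalog.demo.warehouse", "/out/iceberg/warehouse")
    .getOrCreate()
)

# Debezium publishes one topic per table, e.g. cdc.commerce.account (assumed topic name).
raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "cdc.commerce.account")
    .option("startingOffsets", "earliest")
    .load()
)

# Keep only the Debezium envelope fields shown later in the console output (before/after/op),
# assuming the connector emits plain JSON payloads without a schema wrapper.
envelope = StructType([
    StructField("before", StringType()),
    StructField("after", StringType()),
    StructField("op", StringType()),
])
events = (
    raw.select(from_json(col("value").cast("string"), envelope).alias("payload"))
    .select("payload.before", "payload.after", "payload.op")
)

# Target Iceberg table for the raw change events.
spark.sql(
    "CREATE TABLE IF NOT EXISTS demo.commerce.account "
    "(before STRING, after STRING, op STRING) USING iceberg"
)

# Stream to a console sink for debugging and append into the Iceberg table.
events.writeStream.format("console").option("truncate", "false").start()
(
    events.writeStream.format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "/out/spark/checkpoint/account")  # illustrative path
    .toTable("demo.commerce.account")
)

spark.streams.awaitAnyTermination()
```

A job like this also needs the Kafka source and Iceberg runtime on the classpath, which is what the `--packages` flag in the later `spark-submit` commands provides.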
Download the connectors and copy them to the `kafka/plugins` folder that will be mounted to the docker container.

```sh
TMP_FOLDER=/tmp/dbz-pg-connector
mkdir -p $TMP_FOLDER
# See https://debezium.io/documentation/reference/stable/install.html to get the download link.
wget -qO- https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.6.0.Final/debezium-connector-postgres-2.6.0.Final-plugin.tar.gz | tar -xvz -C $TMP_FOLDER
cp $TMP_FOLDER/debezium-connector-postgres/*.jar $PWD/kafka/plugins/debezium-connector-postgres && rm -rf $TMP_FOLDER

TMP_FOLDER=/tmp/iceberg-kafka-connect
mkdir -p $TMP_FOLDER
# See https://github.com/tabular-io/iceberg-kafka-connect/releases to get the download link.
wget -qO- https://github.com/tabular-io/iceberg-kafka-connect/releases/download/v0.6.15/iceberg-kafka-connect-runtime-0.6.15.zip | tar -xvz -C $TMP_FOLDER
cp $TMP_FOLDER/iceberg-kafka-connect-runtime-0.6.15/lib/*.jar $PWD/kafka/plugins/iceberg-kafka-connect && rm -rf $TMP_FOLDER

# Download hadoop dependencies that are not included
wget -P $PWD/kafka/plugins/iceberg-kafka-connect https://repo1.maven.org/maven2/org/apache/commons/commons-configuration2/2.0/commons-configuration2-2.0.jar
wget -P $PWD/kafka/plugins/iceberg-kafka-connect https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-auth/3.3.6/hadoop-auth-3.3.6.jar
```
Run Docker Compose. This will:

- Start the Postgres database and run the scripts under `postgres/scripts/seed`.
- Start Kafka in KRaft mode. The plugins from the previous step will be loaded.
- Start a Spark container that will run the pyspark script from `spark/scripts/consumer.py`. This container will also be used to visualize the created Iceberg tables.
```sh
docker compose up
```
If we are running for the first time, the Spark container will need to download the required dependencies. We need to wait for the downloads to complete before proceeding. We can monitor stdout to see whether the downloads have completed. Alternatively, we can query whether the Spark streaming app is already running.
```sh
docker container exec kafka-spark curl -s http://localhost:4040/api/v1/applications | jq
```
Response
[ {"id":"local-1713191253855","name":"cdc-consumer","attempts": [ {"startTime":"2024-04-15T14:27:32.908GMT","endTime":"1969-12-31T23:59:59.999GMT","lastUpdated":"2024-04-15T14:27:32.908GMT","duration":455827,"sparkUser":"root","completed":false,"appSparkVersion":"3.5.1","startTimeEpoch":1713191252908,"endTimeEpoch":-1,"lastUpdatedEpoch":1713191252908 } ] }]Create and start the kafka connectors for postgres source and iceberg sink in detached mode.
Create and start the kafka connectors for the postgres source and iceberg sink in detached mode.

```sh
docker container exec -d kafka-standalone \
  /opt/kafka/bin/connect-standalone.sh \
  /opt/kafka/config-cdc/connect-standalone.properties \
  /opt/kafka/config-cdc/connect-postgres-source.json \
  /opt/kafka/config-cdc/connect-iceberg-sink.json
```

Note: If we face issues with the iceberg sink connector, we can use the file sink connector to debug.
```sh
docker container exec -d kafka-standalone \
  /opt/kafka/bin/connect-standalone.sh \
  /opt/kafka/config-cdc/connect-standalone.properties \
  /opt/kafka/config-cdc/connect-postgres-source.json \
  /opt/kafka/config-cdc/connect-file-sink.properties
```

We can monitor the console to see if the connectors are ready. Alternatively, we can also query the status of the connectors.
```sh
curl -s localhost:8083/connectors/dbz-pg-source/status | jq
curl -s localhost:8083/connectors/iceberg-sink/status | jq
```
Responses
{"name":"dbz-pg-source","connector": {"state":"RUNNING","worker_id":"172.21.0.2:8083" },"tasks": [ {"id":0,"state":"RUNNING","worker_id":"172.21.0.2:8083" } ],"type":"source"}{"name":"iceberg-sink","connector": {"state":"RUNNING","worker_id":"172.21.0.2:8083" },"tasks": [ {"id":0,"state":"RUNNING","worker_id":"172.21.0.2:8083" } ],"type":"sink"}Check the Iceberg tables from the sinks.
- Check the Iceberg tables created by Spark.
  ```sh
  docker container exec kafka-spark \
    /opt/spark/bin/spark-submit \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0 \
    --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/.ivy -Divy.home=/.ivy" \
    /scripts/print_iceberg_tables.py spark
  ```
  Output:

  ```
  +------+-----------------------------------------------------------------------+---+
  |before|after                                                                  |op |
  +------+-----------------------------------------------------------------------+---+
  |NULL  |{"user_id":1,"email":"alice@example.com","created_at":1713192083639740}|r  |
  |NULL  |{"user_id":2,"email":"bob@example.com","created_at":1713192083639740}  |r  |
  |NULL  |{"user_id":3,"email":"carol@example.com","created_at":1713192083639740}|r  |
  +------+-----------------------------------------------------------------------+---+

  +------+----------------------------------------------------------------------------------------+---+
  |before|after                                                                                   |op |
  +------+----------------------------------------------------------------------------------------+---+
  |NULL  |{"product_id":1,"product_name":"Live Edge Dining Table","created_at":1713192083641523}  |r  |
  |NULL  |{"product_id":2,"product_name":"Simple Teak Dining Chair","created_at":1713192083641523}|r  |
  +------+----------------------------------------------------------------------------------------+---+
  ```

  The `op` column is the Debezium operation type; `r` marks rows read during the initial snapshot, while later inserts, updates and deletes appear as `c`, `u` and `d`.

- Check the Iceberg tables created by the Iceberg sink Kafka connector.
  ```sh
  docker container exec kafka-spark \
    /opt/spark/bin/spark-submit \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0 \
    --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/.ivy -Divy.home=/.ivy" \
    /scripts/print_iceberg_tables.py kafka
  ```
  Output:

  ```
  +-------+-----------------+----------------+------------------------------------------------------------------------+
  |user_id|email            |created_at      |_cdc                                                                    |
  +-------+-----------------+----------------+------------------------------------------------------------------------+
  |1      |alice@example.com|1713192083639740|{I, 2024-04-15 14:43:04.407, 0, commerce.account, commerce.account, {1}}|
  |2      |bob@example.com  |1713192083639740|{I, 2024-04-15 14:43:04.411, 1, commerce.account, commerce.account, {2}}|
  |3      |carol@example.com|1713192083639740|{I, 2024-04-15 14:43:04.411, 2, commerce.account, commerce.account, {3}}|
  +-------+-----------------+----------------+------------------------------------------------------------------------+

  +----------+------------------------+----------------+------------------------------------------------------------------------+
  |product_id|product_name            |created_at      |_cdc                                                                    |
  +----------+------------------------+----------------+------------------------------------------------------------------------+
  |1         |Live Edge Dining Table  |1713192083641523|{I, 2024-04-15 14:43:04.417, 0, commerce.product, commerce.product, {1}}|
  |2         |Simple Teak Dining Chair|1713192083641523|{I, 2024-04-15 14:43:04.417, 1, commerce.product, commerce.product, {2}}|
  +----------+------------------------+----------------+------------------------------------------------------------------------+
  ```

- We can also run the pyspark shell to debug or run further analysis; a few example queries are sketched after this list.
  ```sh
  docker container exec -it kafka-spark /opt/spark/bin/pyspark \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0 \
    --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/.ivy -Divy.home=/.ivy"
  ```
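For example, the shell can read the Iceberg tables directly by path. The warehouse locations and the `commerce/account` table sub-path below are assumptions about how the `data/*/out/iceberg/warehouse` folders are mounted into the container, not values from the repo.

```python
# Illustrative queries for the pyspark shell above. Adjust the paths to the real mount points;
# the namespace/table layout (commerce/account) is assumed from the outputs shown earlier.
SPARK_WAREHOUSE = "/path/to/spark/out/iceberg/warehouse"
KAFKA_WAREHOUSE = "/path/to/kafka/out/iceberg/warehouse"

# Table written by the Spark consumer: raw before/after/op envelope.
account_cdc = spark.read.format("iceberg").load(f"{SPARK_WAREHOUSE}/commerce/account")
account_cdc.groupBy("op").count().show()

# Table written by the Iceberg sink connector: flattened columns plus the _cdc metadata struct.
account = spark.read.format("iceberg").load(f"{KAFKA_WAREHOUSE}/commerce/account")
account.select("user_id", "email", "created_at").show(truncate=False)
```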
Run the CUD operations on Postgres, then repeat steps 5-6 to see the changes propagate to the Iceberg tables.
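The changes can be issued with any SQL client; the snippet below is a hypothetical example using `psycopg2` against the `commerce.account` table seen above. The connection parameters are assumptions and should be taken from `docker-compose.yaml`.

```python
# Hypothetical CUD example (not part of the repo). Connection parameters are placeholders;
# use the values configured in docker-compose.yaml.
import psycopg2

conn = psycopg2.connect(
    host="localhost", port=5432, dbname="postgres", user="postgres", password="postgres"
)
conn.autocommit = True

with conn.cursor() as cur:
    # Create: emits a Debezium event with op = c (created_at is assumed to have a default)
    cur.execute(
        "INSERT INTO commerce.account (user_id, email) VALUES (%s, %s)",
        (4, "dave@example.com"),
    )
    # Update: emits op = u
    cur.execute(
        "UPDATE commerce.account SET email = %s WHERE user_id = %s",
        ("dave@example.org", 4),
    )
    # Delete: emits op = d
    cur.execute("DELETE FROM commerce.account WHERE user_id = %s", (4,))

conn.close()
```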
```sh
# Remove everything under ./data except the .gitkeep placeholders
find ./data ! -name .gitkeep -delete
```

```sh
# List the kafka topics
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
```
Other useful commands for the Kafka Connect REST API:

```sh
# Health check
curl localhost:8083
# Check plugins
curl localhost:8083/connector-plugins
# Check connectors - configs, status, topics
curl localhost:8083/connectors
curl localhost:8083/connectors/dbz-pg-source | jq
curl localhost:8083/connectors/dbz-pg-source/status | jq
curl localhost:8083/connectors/dbz-pg-source/topics | jq
# Pause, resume, restart, stop or delete connectors
curl -X PUT localhost:8083/connectors/dbz-pg-source/pause
curl -X PUT localhost:8083/connectors/dbz-pg-source/resume
curl -X POST localhost:8083/connectors/dbz-pg-source/restart
curl -X PUT localhost:8083/connectors/dbz-pg-source/stop
curl -X DELETE localhost:8083/connectors/dbz-pg-source
```