waiyan1612/postgres-kafka-iceberg-pipeline


Example pipeline to stream data changes from an RDBMS to Apache Iceberg tables, using two approaches:

  1. Manually processing the Kafka messages via Spark (a minimal sketch follows below)
  2. Using the Kafka Sink Connector for Iceberg from the iceberg-kafka-connect repo
demo.mp4
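
Before the setup, a rough sketch of what approach 1 boils down to may help. The snippet below is illustrative only, not the repo's spark/scripts/consumer.py: the broker address, topic, catalog, table and paths are assumed placeholders, and it must be submitted with the same --packages shown in the Setup Guide.

    # Illustrative sketch of approach 1 -- NOT the repo's consumer.py.
    # Assumed placeholders: broker "kafka:9092", topic "cdc.commerce.account",
    # catalog "demo", warehouse/checkpoint paths. Submit with the spark-sql-kafka
    # and iceberg-spark-runtime packages listed in the Setup Guide.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = (
        SparkSession.builder.appName("cdc-consumer-sketch")
        # Register an Iceberg catalog backed by a local Hadoop warehouse directory.
        .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.demo.type", "hadoop")
        .config("spark.sql.catalog.demo.warehouse", "/out/iceberg/warehouse")
        .getOrCreate()
    )

    # Stream the Debezium change events published by the Postgres source connector.
    raw = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("subscribe", "cdc.commerce.account")
        .option("startingOffsets", "earliest")
        .load()
    )

    # Keep the payload as a plain string here; consumer.py parses it into
    # before/after/op columns before writing.
    events = raw.select(col("value").cast("string").alias("value"))

    # Append the events into an Iceberg table; table and checkpoint names are assumptions.
    query = (
        events.writeStream.format("iceberg")
        .outputMode("append")
        .option("checkpointLocation", "/out/spark/checkpoint/account")
        .toTable("demo.cdc.account")
    )
    query.awaitTermination()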

Contents

├── docker-compose.yaml                        -> Compose file to launch postgres, kafka and spark containers
├── kafka
│   ├── config
│   │   ├── connect-file-sink.properties       -> File sink connector
│   │   ├── connect-iceberg-sink.properties    -> Iceberg sink connector
│   │   ├── connect-postgres-source.json       -> Postgres source connector
│   │   └── connect-standalone.properties      -> Standalone Kafka Connect config
│   └── plugins
│       ├── debezium-connector-postgres        -> Debezium connector jars should be downloaded to this folder
│       └── iceberg-kafka-connect              -> Iceberg Kafka Connect jars should be downloaded to this folder
├── postgres
│   ├── postgresql.conf                        -> Config with logical replication enabled
│   └── scripts
│       ├── manual
│       └── seed                               -> SQL scripts that will be run the first time the db is created, i.e. when the data directory is empty
└── spark
    ├── .ivy                                   -> Kafka and Iceberg dependency jars will be downloaded to this folder
    └── scripts
        ├── consumer.py                        -> PySpark script to consume Kafka messages and stream to the console and Iceberg sinks
        └── print_iceberg_tables.py            -> PySpark script to query the tables created by the Spark / Iceberg Sink Kafka connectors

Moreover, these folders will be created and mounted so that docker containers can write back to the host file system.

├── data                                       -> Data from containers will be mounted to this path. Configured in the docker-compose file.
│   ├── kafka
│   │   └── out
│   │       ├── cdc.commerce.sink.txt          -> Output of the Kafka file sink connector
│   │       └── iceberg/warehouse              -> Storage location used by the Kafka Iceberg sink for each table's data + metadata
│   └── spark
│       └── out
│           ├── iceberg/warehouse              -> Storage location used by the Spark Iceberg sink for each table's data + metadata
│           └── spark/checkpoint               -> Checkpoint location used by Spark structured streaming

Setup Guide

  1. Download the connectors and copy them to the kafka/plugins folder that will be mounted to the Docker container.

    TMP_FOLDER=/tmp/dbz-pg-connector
    mkdir -p $TMP_FOLDER
    # See https://debezium.io/documentation/reference/stable/install.html to get the download link.
    wget -qO- https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.6.0.Final/debezium-connector-postgres-2.6.0.Final-plugin.tar.gz | tar -xvz -C $TMP_FOLDER
    cp $TMP_FOLDER/debezium-connector-postgres/*.jar $PWD/kafka/plugins/debezium-connector-postgres && rm -rf $TMP_FOLDER

    TMP_FOLDER=/tmp/iceberg-kafka-connect
    mkdir -p $TMP_FOLDER
    # See https://github.com/tabular-io/iceberg-kafka-connect/releases to get the download link.
    wget -qO- https://github.com/tabular-io/iceberg-kafka-connect/releases/download/v0.6.15/iceberg-kafka-connect-runtime-0.6.15.zip | tar -xvz -C $TMP_FOLDER
    cp $TMP_FOLDER/iceberg-kafka-connect-runtime-0.6.15/lib/*.jar $PWD/kafka/plugins/iceberg-kafka-connect && rm -rf $TMP_FOLDER

    # Download hadoop dependencies that are not included
    wget -P $PWD/kafka/plugins/iceberg-kafka-connect https://repo1.maven.org/maven2/org/apache/commons/commons-configuration2/2.0/commons-configuration2-2.0.jar
    wget -P $PWD/kafka/plugins/iceberg-kafka-connect https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-auth/3.3.6/hadoop-auth-3.3.6.jar
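
    Optionally, a quick check that the jars actually landed in the plugin folders can save a confusing startup later. The snippet below is a hypothetical helper, not part of the repo.

    # Hypothetical sanity check (not part of the repo): confirm the connector jars
    # were copied into the mounted plugin folders. Run from the repository root.
    from pathlib import Path

    plugin_dirs = [
        "kafka/plugins/debezium-connector-postgres",
        "kafka/plugins/iceberg-kafka-connect",
    ]

    for d in plugin_dirs:
        jars = sorted(Path(d).glob("*.jar"))
        print(f"{d}: {len(jars)} jar(s)")
        if not jars:
            raise SystemExit(f"No jars found in {d} -- re-run the download step above")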
  2. Run Docker compose. This will:

    1. Start the Postgres database and also run the scripts under postgres/scripts/seed.
    2. Start Kafka in KRaft mode. The plugins from the previous step will be loaded.
    3. Start a Spark container that will run the PySpark script from spark/scripts/consumer.py. This container will also be used to visualize the created Iceberg tables.
    docker compose up
  3. If we are running for the first time, the Spark container will need to download the required dependencies. We need to wait for the downloads to complete before we can proceed. We can monitor stdout to see whether the downloads have completed. Alternatively, we can query the Spark REST API to check whether the streaming app is already running.

    docker container exec kafka-spark curl -s http://localhost:4040/api/v1/applications | jq

    Response

    [
      {
        "id": "local-1713191253855",
        "name": "cdc-consumer",
        "attempts": [
          {
            "startTime": "2024-04-15T14:27:32.908GMT",
            "endTime": "1969-12-31T23:59:59.999GMT",
            "lastUpdated": "2024-04-15T14:27:32.908GMT",
            "duration": 455827,
            "sparkUser": "root",
            "completed": false,
            "appSparkVersion": "3.5.1",
            "startTimeEpoch": 1713191252908,
            "endTimeEpoch": -1,
            "lastUpdatedEpoch": 1713191252908
          }
        ]
      }
    ]
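
    The same check can be scripted. The sketch below is a hypothetical helper (not part of the repo) that polls the Spark REST API until the application list is non-empty, assuming port 4040 is reachable from wherever it runs (e.g. inside the kafka-spark container).

    # Hypothetical readiness check: poll the Spark REST API until the streaming
    # application appears. Assumes http://localhost:4040 is reachable from here.
    import json
    import time
    import urllib.request

    URL = "http://localhost:4040/api/v1/applications"

    for _ in range(60):                      # wait up to ~5 minutes
        try:
            with urllib.request.urlopen(URL, timeout=5) as resp:
                apps = json.load(resp)
            if apps:
                print("Spark app is up:", apps[0]["name"])
                break
        except OSError:
            pass                             # UI not up yet; keep waiting
        time.sleep(5)
    else:
        raise SystemExit("Spark streaming app did not start in time")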
  4. Create and start the Kafka connectors for the Postgres source and Iceberg sink in detached mode.

    docker container exec -d kafka-standalone \
      /opt/kafka/bin/connect-standalone.sh \
      /opt/kafka/config-cdc/connect-standalone.properties \
      /opt/kafka/config-cdc/connect-postgres-source.json \
      /opt/kafka/config-cdc/connect-iceberg-sink.json

    Note: If we run into issues with the Iceberg sink connector, we can use the file sink connector to debug.

    docker container exec -d kafka-standalone \
      /opt/kafka/bin/connect-standalone.sh \
      /opt/kafka/config-cdc/connect-standalone.properties \
      /opt/kafka/config-cdc/connect-postgres-source.json \
      /opt/kafka/config-cdc/connect-file-sink.properties
  5. We can monitor the console to see whether the connectors are ready. Alternatively, we can query the Kafka Connect REST API for the status of the connectors.

    curl -s localhost:8083/connectors/dbz-pg-source/status | jq
    curl -s localhost:8083/connectors/iceberg-sink/status | jq

    Responses

    {"name":"dbz-pg-source","connector": {"state":"RUNNING","worker_id":"172.21.0.2:8083"  },"tasks": [    {"id":0,"state":"RUNNING","worker_id":"172.21.0.2:8083"    }  ],"type":"source"}
    {"name":"iceberg-sink","connector": {"state":"RUNNING","worker_id":"172.21.0.2:8083"  },"tasks": [    {"id":0,"state":"RUNNING","worker_id":"172.21.0.2:8083"    }  ],"type":"sink"}
  6. Check the Iceberg tables from the sinks.

    1. Check the iceberg tables created by Spark.
      docker container exec kafka-spark \
        /opt/spark/bin/spark-submit \
        --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0 \
        --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/.ivy -Divy.home=/.ivy" \
        /scripts/print_iceberg_tables.py spark
      Output
      +------+-----------------------------------------------------------------------+---+
      |before|after                                                                  |op |
      +------+-----------------------------------------------------------------------+---+
      |NULL  |{"user_id":1,"email":"alice@example.com","created_at":1713192083639740}|r  |
      |NULL  |{"user_id":2,"email":"bob@example.com","created_at":1713192083639740}  |r  |
      |NULL  |{"user_id":3,"email":"carol@example.com","created_at":1713192083639740}|r  |
      +------+-----------------------------------------------------------------------+---+

      +------+----------------------------------------------------------------------------------------+---+
      |before|after                                                                                   |op |
      +------+----------------------------------------------------------------------------------------+---+
      |NULL  |{"product_id":1,"product_name":"Live Edge Dining Table","created_at":1713192083641523}  |r  |
      |NULL  |{"product_id":2,"product_name":"Simple Teak Dining Chair","created_at":1713192083641523}|r  |
      +------+----------------------------------------------------------------------------------------+---+
    2. Check the Iceberg tables created by the Iceberg Sink Kafka connector.
      docker container exec kafka-spark \
        /opt/spark/bin/spark-submit \
        --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0 \
        --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/.ivy -Divy.home=/.ivy" \
        /scripts/print_iceberg_tables.py kafka
      Output
      +-------+-----------------+----------------+-------------------------------------------------------------------------+
      |user_id|email            |created_at      |_cdc                                                                     |
      +-------+-----------------+----------------+-------------------------------------------------------------------------+
      |1      |alice@example.com|1713192083639740|{I, 2024-04-15 14:43:04.407, 0, commerce.account, commerce.account, {1}} |
      |2      |bob@example.com  |1713192083639740|{I, 2024-04-15 14:43:04.411, 1, commerce.account, commerce.account, {2}} |
      |3      |carol@example.com|1713192083639740|{I, 2024-04-15 14:43:04.411, 2, commerce.account, commerce.account, {3}} |
      +-------+-----------------+----------------+-------------------------------------------------------------------------+

      +----------+------------------------+----------------+-------------------------------------------------------------------------+
      |product_id|product_name            |created_at      |_cdc                                                                     |
      +----------+------------------------+----------------+-------------------------------------------------------------------------+
      |1         |Live Edge Dining Table  |1713192083641523|{I, 2024-04-15 14:43:04.417, 0, commerce.product, commerce.product, {1}} |
      |2         |Simple Teak Dining Chair|1713192083641523|{I, 2024-04-15 14:43:04.417, 1, commerce.product, commerce.product, {2}} |
      +----------+------------------------+----------------+-------------------------------------------------------------------------+
    3. We can also run the PySpark shell to debug or run further analysis.
      docker container exec -it kafka-spark /opt/spark/bin/pyspark \
        --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0 \
        --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/.ivy -Divy.home=/.ivy"
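
      Inside that shell, Iceberg tables can also be read directly by path. The path below is an assumption (on the host the Spark warehouse is under data/spark/out/iceberg/warehouse; the mount point inside the container may differ), and the op column comes from the Debezium payload written by consumer.py.

      # Example commands for the pyspark shell above. The table path is an assumption;
      # point it at one table directory under the mounted iceberg/warehouse folder.
      df = spark.read.format("iceberg").load("/out/iceberg/warehouse/cdc/account")

      df.printSchema()
      df.show(truncate=False)

      # Ad-hoc analysis, e.g. count change events per operation type.
      df.groupBy("op").count().show()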
  7. Run the CUD operations on Postgres, then repeat steps 5-6 to see the changes flow through.
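
    As an illustration of such a round-trip, the hypothetical snippet below inserts, updates and deletes a row in commerce.account from the host. Every connection setting is an assumption (take the real values from docker-compose.yaml), as is the assumption that user_id and created_at have column defaults; alternatively, run equivalent SQL directly via psql in the postgres container.

    # Hypothetical CUD round-trip against commerce.account -- not part of the repo.
    # Connection settings are assumptions; take the real ones from docker-compose.yaml.
    # Assumes user_id and created_at have column defaults.
    import psycopg2  # pip install psycopg2-binary

    conn = psycopg2.connect(
        host="localhost", port=5432, dbname="postgres", user="postgres", password="postgres"
    )
    conn.autocommit = True
    with conn.cursor() as cur:
        cur.execute("INSERT INTO commerce.account (email) VALUES ('dave@example.com')")
        cur.execute("UPDATE commerce.account SET email = 'dave@example.org' WHERE email = 'dave@example.com'")
        cur.execute("DELETE FROM commerce.account WHERE email = 'dave@example.org'")
    conn.close()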

Helpful Commands for Debugging

One-liner to wipe data directory while keeping .gitkeep files

find ./data ! -name .gitkeep -delete

List kafka topics

/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

Kafka Connect Endpoints

# Health check
curl localhost:8083

# Check plugins
curl localhost:8083/connector-plugins

# Check connectors - configs, status, topics
curl localhost:8083/connectors
curl localhost:8083/connectors/dbz-pg-source | jq
curl localhost:8083/connectors/dbz-pg-source/status | jq
curl localhost:8083/connectors/dbz-pg-source/topics | jq

# Pause, resume, restart, stop or delete connectors
curl -X PUT localhost:8083/connectors/dbz-pg-source/pause
curl -X PUT localhost:8083/connectors/dbz-pg-source/resume
curl -X POST localhost:8083/connectors/dbz-pg-source/restart
curl -X PUT localhost:8083/connectors/dbz-pg-source/stop
curl -X DELETE localhost:8083/connectors/dbz-pg-source
