Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

A real-time change data capture (CDC) pipeline using Debezium, Kafka, and PostgreSQL to stream database changes and send alerts via Telegram. The project also includes a PostgreSQL trigger-based audit log that captures detailed change context such as username, IP address, and client application providing both real-time monitoring and forensic audit

License

NotificationsYou must be signed in to change notification settings

zalihat/real-time-CDC-pipeline-with-postgres-kafka-telegram

Repository files navigation

Areal-time change data capture (CDC) pipeline usingDebezium,Kafka, andPostgreSQL to stream database changes and send alerts viaTelegram.
The project also includes aPostgreSQL trigger-based audit log that captures detailed change context such as username, IP address, and client application providing both real-time monitoring and forensic auditing capabilities.


🏗️ Architecture Overview

Architecture Flow

⚙️ Tech Stack

ComponentDescription
PostgreSQLMain source database
DebeziumCDC connector capturing row-level changes
KafkaEvent streaming backbone
Schema Registry & Control CenterSchema management & Kafka monitoring
PostgreSQL TriggerLocal audit logging
Telegram BotNotification endpoint
DockerService containerization

✨ Features

  • Captures and streamsupdate anddelete events in real-time
  • Sends formattedalerts to Telegram via bot API
  • Uses aPostgres trigger to maintain an internalaudit trail with:
    • Username
    • Client application name
    • IP address
    • Before/after state
    • JSON diff of changes

🚀 Setup & Installation

1. Clone the project

git clone https://github.com/zalihat/real-time-CDC-pipeline-with-postgres-kafka-telegram.git
cd real-time-CDC-pipeline-with-postgres-kafka-telegram

2. Build Kafka Connect image

docker buildx build --platform linux/amd64 -t kafka-connect:latest ./kafka-connect --load

3. Create a Telegram bot

  • In Telegram, search for @BotFather

  • Send /newbot → follow instructions to get your bot token

  • Get your chat ID: Open your bot in Telegram and send any message

  • Visit:https://api.telegram.org/bot<YOUR_BOT_TOKEN>/getUpdates

    Find "chat":{"id":123456789} — that’s your CHAT_ID

  • Create a .env file with the following

TELEGRAM_TOKEN = "<your telegram token>"TELEGRAM_CHAT_ID = "<your chat id>"

4. Start services

docker compose up -d --build

This starts the following containers:

  • PostgreSQL

    → Accessible on port 5432

    → You can connect using a client such as DBeaver with:

Host: localhostPort: 5432User: postgresPassword: postgres
  • Kafka Broker & Zookeeper

  • Schema Registry

  • Kafka Connect

  • Telegram consumer that send notifications

  • Control Center — provides a web UI to monitor Kafka, topics, and connectors→ View it athttp://localhost:8084

Create a Test Table

CREATE TABLE public.sales_test (  id SERIAL PRIMARY KEY,  customer_name TEXT,  item TEXT,  amount NUMERIC(10,2),  updated_at TIMESTAMPTZ DEFAULT NOW());-- Include full row in WAL for updates/deletesALTER TABLE public.sales_test REPLICA IDENTITY FULL;

🔗 Configure Debezium Source Connector

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \  localhost:8083/connectors/ -d '@./connectors/source.json'

Create the Audit Schema

CREATE SCHEMA IF NOT EXISTS audit;CREATE TABLE IF NOT EXISTS audit.data_changes (  audit_id     bigserial PRIMARY KEY,  table_name   text NOT NULL,  record_pk    text,  op_type      text NOT NULL,  changed_by   text,  app_name     text,  client_ip    inet,  changed_at   timestamptz DEFAULT now(),  before_row   jsonb,  after_row    jsonb,  diff         jsonb);

Create the Audit Function and Trigger

CREATE OR REPLACE FUNCTION audit.log_row_change() RETURNS trigger AS $$DECLARE  pk_val text;  before_json jsonb;  after_json jsonb;  computed_diff jsonb := '{}'::jsonb;BEGIN  IF TG_OP = 'DELETE' THEN    before_json := row_to_json(OLD)::jsonb;    after_json := NULL;    pk_val := OLD.id::text;  ELSIF TG_OP = 'INSERT' THEN    before_json := NULL;    after_json := row_to_json(NEW)::jsonb;    pk_val := NEW.id::text;  ELSE    before_json := row_to_json(OLD)::jsonb;    after_json := row_to_json(NEW)::jsonb;    pk_val := NEW.id::text;  END IF;  IF before_json IS NOT NULL AND after_json IS NOT NULL THEN    computed_diff := (      SELECT jsonb_object_agg(k, jsonb_build_object('old', before_json->k, 'new', after_json->k))      FROM (        SELECT jsonb_object_keys(before_json) AS k        UNION        SELECT jsonb_object_keys(after_json) AS k      ) s      WHERE (before_json->s.k) IS DISTINCT FROM (after_json->s.k)    );  END IF;  INSERT INTO audit.data_changes(    table_name, record_pk, op_type, changed_by, app_name, client_ip, before_row, after_row, diff  )  VALUES (    TG_TABLE_NAME,    pk_val,    TG_OP,    current_user,    current_setting('application_name', true),    inet_client_addr(),    before_json,    after_json,    computed_diff  );  RETURN NEW;END;$$ LANGUAGE plpgsql;

Create Trigger

CREATE TRIGGER trg_audit_sales_testAFTER INSERT OR UPDATE OR DELETEON public.sales_testFOR EACH ROW EXECUTE FUNCTION audit.log_row_change();

Test the CDC and Audit Log

INSERT INTO public.sales_test (customer_name, item, amount)VALUES ('Alice', 'Laptop', 1200.00);UPDATE public.sales_testSET amount = 1800.00WHERE customer_name = 'Alice';SELECT * FROM audit.data_changes;

Sample Kafka topic output

Telegram Notification

🧹 Shutdown

After testing:

docker compose down

About

A real-time change data capture (CDC) pipeline using Debezium, Kafka, and PostgreSQL to stream database changes and send alerts via Telegram. The project also includes a PostgreSQL trigger-based audit log that captures detailed change context such as username, IP address, and client application providing both real-time monitoring and forensic audit

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

[8]ページ先頭

©2009-2025 Movatter.jp