Real-time Data Warehouse with Apache Flink & Apache Kafka & Apache Hudi


Real-time Data Warehouse using: Flink & Kafka | Flink & Hudi | Spark & Delta | Flink & Hudi & E-commerce

[demo_overview diagram]

Getting the setup up and running

docker compose build

docker compose up -d

Check that everything is really up and running:

docker compose ps

You should be able to access the Flink Web UI (http://localhost:8081), as well as Kibana (http://localhost:5601).
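If you prefer checking from the command line, Flink's REST API exposes a cluster overview on the same port as the Web UI. A minimal sketch (assumes curl is available on the host; the service names are the ones used elsewhere in this walkthrough):

# cluster overview: task managers, available slots, running jobs
curl -s http://localhost:8081/overview

# tail the logs of an individual service if something looks unhealthy
docker compose logs -f kafka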

Postgres

Start the Postgres client to have a look at the source tables and run some DML statements later:

docker compose exec postgres env PGOPTIONS="--search_path=claims" bash -c 'psql -U $POSTGRES_USER postgres'

What tables are we dealing with?

SELECT * FROM information_schema.tables WHERE table_schema = 'claims';
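To peek at the actual rows before any change data capture is involved, a couple of plain queries against the source tables work as well (a small sketch; the column names follow the schema used later in this walkthrough):

SELECT COUNT(*) FROM accident_claims;
SELECT COUNT(*) FROM members;

SELECT claim_id, claim_total, claim_status FROM accident_claims LIMIT 5;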

Debezium

Start the Debezium Postgres connectors using the configuration provided in the register-postgres.json and register-postgres-members.json files:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-members.json

Check that the connector is running:

curl http://localhost:8083/connectors/claims-connector/status # | jq
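To inspect both connectors at once, the Kafka Connect REST API can also list everything that is registered. A minimal sketch (jq on the host is optional; the ?expand=status form is available on recent Kafka Connect versions):

# list all registered connectors
curl -s http://localhost:8083/connectors | jq

# status of every connector in one call
curl -s 'http://localhost:8083/connectors?expand=status' | jq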

The first time it connects to a Postgres server, Debezium takes a consistent snapshot of all database schemas; so, you should see that the pre-existing records in the accident_claims table have already been pushed into your Kafka topic:

docker compose exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic pg_claims.claims.accident_claims

ℹ️ Have a quick read about the structure of these events in the Debezium documentation.

Is it working?

In the tab you used to start the Postgres client, you can now run some DML statements to see that the changes are propagated all the way to your Kafka topic:

INSERT INTO accident_claims (claim_total, claim_total_receipt, claim_currency, member_id, accident_date, accident_type, accident_detail, claim_date, claim_status)
VALUES (500, 'PharetraMagnaVestibulum.tiff', 'AUD', 321, '2020-08-01 06:43:03', 'Collision', 'Blue Ringed Octopus', '2020-08-10 09:39:31', 'INITIAL');

UPDATE accident_claims SET claim_total_receipt = 'CorrectReceipt.pdf' WHERE claim_id = 1001;

DELETE FROM accident_claims WHERE claim_id = 1001;

In the output of your Kafka console consumer, you should now see three consecutive events with op values equal to c (an insert event), u (an update event) and d (a delete event).
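If the raw events are too noisy, you can narrow the consumer output down to just the op codes. A rough sketch that filters on the host with grep (assumes the Debezium connector's default compact JSON output):

docker compose exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --topic pg_claims.claims.accident_claims \
  | grep -o '"op":"[cud]"'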

Flink connectors

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/
https://flink-packages.org/categories/connectors
https://github.com/knaufk/flink-faker/
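As a concrete example of the last link, a flink-faker table can stand in for real data while experimenting with queries. This is a hedged sketch: it assumes the flink-faker jar is on the SQL client's classpath (it is not part of the default setup here), and the field expressions use Datafaker syntax:

CREATE TABLE fake_members (
    first_name STRING,
    last_name  STRING,
    ts         TIMESTAMP(3)
) WITH (
    'connector' = 'faker',
    'fields.first_name.expression' = '#{Name.firstName}',
    'fields.last_name.expression'  = '#{Name.lastName}',
    'fields.ts.expression'         = '#{date.past ''15'',''SECONDS''}'
);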

Datasource ingestion

Start the Flink SQL Client:

docker compose exec sql-client ./sql-client.sh

OR

docker compose exec sql-client ./sql-client-submit.sh

Test the Hudi connector with a throwaway table:

CREATE TABLE t1 (
  uuid VARCHAR(20), -- you can use 'PRIMARY KEY NOT ENFORCED' syntax to mark the field as record key
  name VARCHAR(10),
  age  INT,
  ts   TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = '/data/t1',
  'write.tasks' = '1',                              -- default is 4, requires more resources
  'compaction.tasks' = '1',                         -- default is 10, requires more resources
  'table.type' = 'COPY_ON_WRITE',                   -- default is COPY_ON_WRITE; set to 'MERGE_ON_READ' to create a MERGE_ON_READ table
  'read.tasks' = '1',                               -- default is 4, requires more resources
  'read.streaming.enabled' = 'true',                -- this option enables streaming read
  'read.streaming.start-commit' = '20210712134429', -- specifies the start commit instant time
  'read.streaming.check-interval' = '4'             -- specifies the check interval for finding new source commits, default 60s
);

-- insert data using values
INSERT INTO t1 VALUES
  ('id1', 'Danny',   23, TIMESTAMP '1970-01-01 00:00:01', 'par1'),
  ('id2', 'Stephen', 33, TIMESTAMP '1970-01-01 00:00:02', 'par1'),
  ('id3', 'Julian',  53, TIMESTAMP '1970-01-01 00:00:03', 'par2'),
  ('id4', 'Fabian',  31, TIMESTAMP '1970-01-01 00:00:04', 'par2'),
  ('id5', 'Sophia',  18, TIMESTAMP '1970-01-01 00:00:05', 'par3'),
  ('id6', 'Emma',    20, TIMESTAMP '1970-01-01 00:00:06', 'par3'),
  ('id7', 'Bob',     44, TIMESTAMP '1970-01-01 00:00:07', 'par4'),
  ('id8', 'Han',     56, TIMESTAMP '1970-01-01 00:00:08', 'par4');

SELECT * FROM t1;
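A quick way to confirm that streaming read is working: leave the SELECT above running in one SQL client session and insert one more row from a second session; the running query should pick it up within the configured check interval. A small sketch with made-up values:

-- run from a second SQL client session while SELECT * FROM t1 is still running
INSERT INTO t1 VALUES
  ('id9', 'Nina', 29, TIMESTAMP '1970-01-01 00:00:09', 'par4');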

Register a Postgres catalog, so you can access the metadata of the external tables over JDBC:

CREATE CATALOG datasource WITH (
    'type' = 'jdbc',
    'property-version' = '1',
    'base-url' = 'jdbc:postgresql://postgres:5432/',
    'default-database' = 'postgres',
    'username' = 'postgres',
    'password' = 'postgres'
);

CREATE DATABASE IF NOT EXISTS datasource;

CREATE TABLE datasource.accident_claims WITH (
    'connector' = 'kafka',
    'topic' = 'pg_claims.claims.accident_claims',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'accident_claims-consumer-group',
    'format' = 'debezium-json',
    'scan.startup.mode' = 'earliest-offset'
) LIKE datasource.postgres.`claims.accident_claims` (EXCLUDING ALL);

OR generate data with the datagen connector:

CREATE TABLE datasource.accident_claims (
    claim_id            BIGINT,
    claim_total         DOUBLE,
    claim_total_receipt VARCHAR(50),
    claim_currency      VARCHAR(3),
    member_id           INT,
    accident_date       DATE,
    accident_type       VARCHAR(20),
    accident_detail     VARCHAR(20),
    claim_date          DATE,
    claim_status        VARCHAR(10),
    ts_created          TIMESTAMP(3),
    ts_updated          TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100'
);

and the members table:

CREATE TABLE datasource.members WITH (
    'connector' = 'kafka',
    'topic' = 'pg_claims.claims.members',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'members-consumer-group',
    'format' = 'debezium-json',
    'scan.startup.mode' = 'earliest-offset'
) LIKE datasource.postgres.`claims.members` (EXCLUDING OPTIONS);

OR generate data with the datagen connector:

CREATE TABLE datasource.members (
    id                BIGINT,
    first_name        VARCHAR(50),
    last_name         VARCHAR(50),
    address           VARCHAR(50),
    address_city      VARCHAR(10),
    address_country   VARCHAR(10),
    insurance_company VARCHAR(25),
    insurance_number  VARCHAR(50),
    ts_created        TIMESTAMP(3),
    ts_updated        TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100'
);

Check data:

SELECT * FROM datasource.accident_claims;
SELECT * FROM datasource.members;

DWD

Create a database in DWD layer:

CREATE DATABASE IF NOT EXISTS dwd;

CREATE TABLE dwd.accident_claims (
    claim_id            BIGINT,
    claim_total         DOUBLE,
    claim_total_receipt VARCHAR(50),
    claim_currency      VARCHAR(3),
    member_id           INT,
    accident_date       DATE,
    accident_type       VARCHAR(20),
    accident_detail     VARCHAR(20),
    claim_date          DATE,
    claim_status        VARCHAR(10),
    ts_created          TIMESTAMP(3),
    ts_updated          TIMESTAMP(3),
    ds                  DATE,
    PRIMARY KEY (claim_id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
    'connector' = 'hudi',
    'path' = '/data/dwd/accident_claims',
    'table.type' = 'MERGE_ON_READ',
    'read.streaming.enabled' = 'true',
    'write.batch.size' = '1',
    'write.tasks' = '1',
    'compaction.tasks' = '1',
    'compaction.delta_seconds' = '60',
    'write.precombine.field' = 'ts_updated',
    'read.tasks' = '1',
    'read.streaming.check-interval' = '5',
    'read.streaming.start-commit' = '20210712134429',
    'index.bootstrap.enabled' = 'true'
);

CREATE TABLE dwd.members (
    id                BIGINT,
    first_name        VARCHAR(50),
    last_name         VARCHAR(50),
    address           VARCHAR(50),
    address_city      VARCHAR(10),
    address_country   VARCHAR(10),
    insurance_company VARCHAR(25),
    insurance_number  VARCHAR(50),
    ts_created        TIMESTAMP(3),
    ts_updated        TIMESTAMP(3),
    ds                DATE,
    PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
    'connector' = 'hudi',
    'path' = '/data/dwd/members',
    'table.type' = 'MERGE_ON_READ',
    'read.streaming.enabled' = 'true',
    'write.batch.size' = '1',
    'write.tasks' = '1',
    'compaction.tasks' = '1',
    'compaction.delta_seconds' = '60',
    'write.precombine.field' = 'ts_updated',
    'read.tasks' = '1',
    'read.streaming.check-interval' = '5',
    'read.streaming.start-commit' = '20210712134429',
    'index.bootstrap.enabled' = 'true'
);

and submit a continuous query to the Flink cluster that will write the data from the datasource tables into the DWD tables:

INSERT INTO dwd.accident_claims
SELECT claim_id,
       claim_total,
       claim_total_receipt,
       claim_currency,
       member_id,
       CAST(accident_date AS DATE),
       accident_type,
       accident_detail,
       CAST(claim_date AS DATE),
       claim_status,
       CAST(ts_created AS TIMESTAMP),
       CAST(ts_updated AS TIMESTAMP),
       claim_date --CAST(SUBSTRING(claim_date, 0, 9) AS DATE)
FROM datasource.accident_claims;

INSERT INTO dwd.members
SELECT id,
       first_name,
       last_name,
       address,
       address_city,
       address_country,
       insurance_company,
       insurance_number,
       CAST(ts_created AS TIMESTAMP),
       CAST(ts_updated AS TIMESTAMP),
       ts_created --CAST(SUBSTRING(ts_created, 0, 9) AS DATE)
FROM datasource.members;

Check data:

SELECT * FROM dwd.accident_claims;
SELECT * FROM dwd.members;

DWB

Create a database in DWB layer:

CREATE DATABASE IF NOT EXISTS dwb;

CREATE TABLE dwb.accident_claims (
    claim_id            BIGINT,
    claim_total         DOUBLE,
    claim_total_receipt VARCHAR(50),
    claim_currency      VARCHAR(3),
    member_id           INT,
    accident_date       DATE,
    accident_type       VARCHAR(20),
    accident_detail     VARCHAR(20),
    claim_date          DATE,
    claim_status        VARCHAR(10),
    ts_created          TIMESTAMP(3),
    ts_updated          TIMESTAMP(3),
    ds                  DATE,
    PRIMARY KEY (claim_id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
    'connector' = 'hudi',
    'path' = '/data/dwb/accident_claims',
    'table.type' = 'MERGE_ON_READ',
    'read.streaming.enabled' = 'true',
    'write.batch.size' = '1',
    'write.tasks' = '1',
    'compaction.tasks' = '1',
    'compaction.delta_seconds' = '60',
    'write.precombine.field' = 'ts_updated',
    'read.tasks' = '1',
    'read.streaming.check-interval' = '5',
    'read.streaming.start-commit' = '20210712134429',
    'index.bootstrap.enabled' = 'true'
);

CREATE TABLE dwb.members (
    id                BIGINT,
    first_name        VARCHAR(50),
    last_name         VARCHAR(50),
    address           VARCHAR(50),
    address_city      VARCHAR(10),
    address_country   VARCHAR(10),
    insurance_company VARCHAR(25),
    insurance_number  VARCHAR(50),
    ts_created        TIMESTAMP(3),
    ts_updated        TIMESTAMP(3),
    ds                DATE,
    PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
    'connector' = 'hudi',
    'path' = '/data/dwb/members',
    'table.type' = 'MERGE_ON_READ',
    'read.streaming.enabled' = 'true',
    'write.batch.size' = '1',
    'write.tasks' = '1',
    'compaction.tasks' = '1',
    'compaction.delta_seconds' = '60',
    'write.precombine.field' = 'ts_updated',
    'read.tasks' = '1',
    'read.streaming.check-interval' = '5',
    'read.streaming.start-commit' = '20210712134429',
    'index.bootstrap.enabled' = 'true'
);

INSERT INTO dwb.accident_claims
SELECT claim_id,
       claim_total,
       claim_total_receipt,
       claim_currency,
       member_id,
       accident_date,
       accident_type,
       accident_detail,
       claim_date,
       claim_status,
       ts_created,
       ts_updated,
       ds
FROM dwd.accident_claims;

INSERT INTO dwb.members
SELECT id,
       first_name,
       last_name,
       address,
       address_city,
       address_country,
       insurance_company,
       insurance_number,
       ts_created,
       ts_updated,
       ds
FROM dwd.members;

Check data:

SELECT * FROM dwb.accident_claims;
SELECT * FROM dwb.members;

DWS

Create a database in DWS layer:

CREATE DATABASE IF NOT EXISTS dws;

CREATE TABLE dws.insurance_costs (
    es_key            STRING PRIMARY KEY NOT ENFORCED,
    insurance_company STRING,
    accident_detail   STRING,
    accident_agg_cost DOUBLE
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://elasticsearch:9200',
    'index' = 'agg_insurance_costs'
);

and submit a continuous query to the Flink cluster that will write the aggregated insurance costs per insurance_company, bucketed by accident_detail (or, which animals are causing the most harm in terms of costs):

INSERT INTO dws.insurance_costs
SELECT UPPER(SUBSTRING(m.insurance_company, 0, 4) || '_' || SUBSTRING(ac.accident_detail, 0, 4)) es_key,
       m.insurance_company,
       ac.accident_detail,
       SUM(ac.claim_total) member_total
FROM dwb.accident_claims ac
JOIN dwb.members m ON ac.member_id = m.id
WHERE ac.claim_status <> 'DENIED'
GROUP BY m.insurance_company, ac.accident_detail;
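Before wiring up Kibana, you can confirm that documents are landing in the target index directly against Elasticsearch. A minimal sketch (it assumes port 9200 of the Elasticsearch container is published to the host, which may not be the case in every compose setup):

# show a few aggregated documents from the index written by the query above
curl -s 'http://localhost:9200/agg_insurance_costs/_search?pretty&size=5'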

Finally, create a simple dashboard in Kibana.


