Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Flink DDL

DDL commands🔗

CREATE Catalog🔗

Hive catalog🔗

This creates an Iceberg catalog namedhive_catalog that can be configured using'catalog-type'='hive', which loads tables from Hive metastore:

CREATECATALOGhive_catalogWITH('type'='iceberg','catalog-type'='hive','uri'='thrift://localhost:9083','clients'='5','property-version'='1','warehouse'='hdfs://nn:8020/warehouse/path');

The following properties can be set if using the Hive catalog:

  • uri: The Hive metastore's thrift URI. (Required)
  • clients: The Hive metastore client pool size, default value is 2. (Optional)
  • warehouse: The Hive warehouse location, users should specify this path if neither set thehive-conf-dir to specify a location containing ahive-site.xml configuration file nor add a correcthive-site.xml to classpath.
  • hive-conf-dir: Path to a directory containing ahive-site.xml configuration file which will be used to provide custom Hive configuration values. The value ofhive.metastore.warehouse.dir from<hive-conf-dir>/hive-site.xml (or hive configure file from classpath) will be overwritten with thewarehouse value if setting bothhive-conf-dir andwarehouse when creating iceberg catalog.
  • hadoop-conf-dir: Path to a directory containingcore-site.xml andhdfs-site.xml configuration files which will be used to provide custom Hadoop configuration values.

Hadoop catalog🔗

Iceberg also supports a directory-based catalog in HDFS that can be configured using'catalog-type'='hadoop':

CREATECATALOGhadoop_catalogWITH('type'='iceberg','catalog-type'='hadoop','warehouse'='hdfs://nn:8020/warehouse/path','property-version'='1');

The following properties can be set if using the Hadoop catalog:

  • warehouse: The HDFS directory to store metadata files and data files. (Required)

Execute the sql commandUSE CATALOG hadoop_catalog to set the current catalog.

REST catalog🔗

This creates an iceberg catalog namedrest_catalog that can be configured using'catalog-type'='rest', which loads tables from a REST catalog:

CREATECATALOGrest_catalogWITH('type'='iceberg','catalog-type'='rest','uri'='https://localhost/');

The following properties can be set if using the REST catalog:

  • uri: The URL to the REST Catalog (Required)
  • credential: A credential to exchange for a token in the OAuth2 client credentials flow (Optional)
  • token: A token which will be used to interact with the server (Optional)

Custom catalog🔗

Flink also supports loading a custom IcebergCatalog implementation by specifying thecatalog-impl property:

CREATECATALOGmy_catalogWITH('type'='iceberg','catalog-impl'='com.my.custom.CatalogImpl','my-additional-catalog-config'='my-value');

Create through YAML config🔗

Catalogs can be registered insql-client-defaults.yaml before starting the SQL client.

catalogs:-name:my_catalogtype:icebergcatalog-type:hadoopwarehouse:hdfs://nn:8020/warehouse/path

Create through SQL Files🔗

The Flink SQL Client supports the-i startup option to execute an initialization SQL file to set up environment when starting up the SQL Client.

-- define available catalogsCREATECATALOGhive_catalogWITH('type'='iceberg','catalog-type'='hive','uri'='thrift://localhost:9083','warehouse'='hdfs://nn:8020/warehouse/path');USECATALOGhive_catalog;

Using-i <init.sql> option to initialize SQL Client session:

/path/to/bin/sql-client.sh-i/path/to/init.sql

CREATE DATABASE🔗

By default, Iceberg will use thedefault database in Flink. Using the following example to create a separate database in order to avoid creating tables under thedefault database:

CREATEDATABASEiceberg_db;USEiceberg_db;

CREATE TABLE🔗

CREATETABLE`hive_catalog`.`default`.`sample`(idBIGINTCOMMENT'unique id',dataSTRINGNOTNULL)WITH('format-version'='2');

Table create commands support the commonly usedFlink create clauses including:

  • PARTITION BY (column1, column2, ...) to configure partitioning, Flink does not yet support hidden partitioning.
  • COMMENT 'table document' to set a table description.
  • WITH ('key'='value', ...) to settable configuration which will be stored in Iceberg table properties.

Currently, it does not support computed column and watermark definition etc.

PRIMARY KEY🔗

Primary key constraint can be declared for a column or a set of columns, which must be unique and do not contain null.It's required forUPSERT mode.

CREATETABLE`hive_catalog`.`default`.`sample`(idBIGINTCOMMENT'unique id',dataSTRINGNOTNULL,PRIMARYKEY(`id`)NOTENFORCED)WITH('format-version'='2');

PARTITIONED BY🔗

To create a partition table, usePARTITIONED BY:

CREATETABLE`hive_catalog`.`default`.`sample`(idBIGINTCOMMENT'unique id',dataSTRINGNOTNULL)PARTITIONEDBY(data)WITH('format-version'='2');

Iceberg supports hidden partitioning but Flink doesn't support partitioning by a function on columns. There is no way to support hidden partitions in the Flink DDL.

CREATE TABLE LIKE🔗

To create a table with the same schema, partitioning, and table properties as another table, useCREATE TABLE LIKE.

CREATETABLE`hive_catalog`.`default`.`sample`(idBIGINTCOMMENT'unique id',dataSTRING);CREATETABLE`hive_catalog`.`default`.`sample_like`LIKE`hive_catalog`.`default`.`sample`;

For more details, refer to theFlinkCREATE TABLE documentation.

ALTER TABLE🔗

Iceberg only support altering table properties:

ALTERTABLE`hive_catalog`.`default`.`sample`SET('write.format.default'='avro');

ALTER TABLE .. RENAME TO🔗

ALTERTABLE`hive_catalog`.`default`.`sample`RENAMETO`hive_catalog`.`default`.`new_sample`;

DROP TABLE🔗

To delete a table, run:

DROPTABLE`hive_catalog`.`default`.`sample`;

[8]ページ先頭

©2009-2025 Movatter.jp