Flink DDL
DDL commands
CREATE Catalog
Hive catalog
This creates an Iceberg catalog named hive_catalog that can be configured using 'catalog-type'='hive', which loads tables from a Hive metastore:
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);
The following properties can be set if using the Hive catalog:
- uri: The Hive metastore's thrift URI. (Required)
- clients: The Hive metastore client pool size; the default value is 2. (Optional)
- warehouse: The Hive warehouse location. Users should specify this path if they neither set hive-conf-dir to a location containing a hive-site.xml configuration file nor add a correct hive-site.xml to the classpath.
- hive-conf-dir: Path to a directory containing a hive-site.xml configuration file which will be used to provide custom Hive configuration values. If both hive-conf-dir and warehouse are set when creating the Iceberg catalog, the warehouse value overwrites hive.metastore.warehouse.dir from <hive-conf-dir>/hive-site.xml (or from the hive-site.xml on the classpath). See the sketch after this list.
- hadoop-conf-dir: Path to a directory containing core-site.xml and hdfs-site.xml configuration files which will be used to provide custom Hadoop configuration values.
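For example, a minimal sketch of a catalog that reads Hive settings from a configuration directory; the /etc/hive/conf path is an assumption and should point at a directory containing your hive-site.xml:

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'hive-conf-dir'='/etc/hive/conf'
);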
Hadoop catalog
Iceberg also supports a directory-based catalog in HDFS that can be configured using 'catalog-type'='hadoop':
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nn:8020/warehouse/path',
  'property-version'='1'
);
The following properties can be set if using the Hadoop catalog:
- warehouse: The HDFS directory to store metadata files and data files. (Required)

Execute the SQL command USE CATALOG hadoop_catalog to set the current catalog.
REST catalog
This creates an Iceberg catalog named rest_catalog that can be configured using 'catalog-type'='rest', which loads tables from a REST catalog:
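The uri value below is a placeholder for your REST server endpoint:

CREATE CATALOG rest_catalog WITH (
  'type'='iceberg',
  'catalog-type'='rest',
  'uri'='https://localhost/'
);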
The following properties can be set if using the REST catalog:
- uri: The URL to the REST catalog. (Required)
- credential: A credential to exchange for a token in the OAuth2 client credentials flow; see the sketch after this list. (Optional)
- token: A token which will be used to interact with the server. (Optional)
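A hedged sketch of a catalog that authenticates through the client credentials flow; the endpoint is a placeholder, and the credential is assumed to be a client ID and secret joined by a colon:

CREATE CATALOG rest_catalog WITH (
  'type'='iceberg',
  'catalog-type'='rest',
  'uri'='https://rest-catalog.example.com/',
  'credential'='<client_id>:<client_secret>'
);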
Custom catalog
Flink also supports loading a custom Iceberg Catalog implementation by specifying the catalog-impl property:
CREATE CATALOG my_catalog WITH (
  'type'='iceberg',
  'catalog-impl'='com.my.custom.CatalogImpl',
  'my-additional-catalog-config'='my-value'
);
Create through YAML config
Catalogs can be registered in sql-client-defaults.yaml before starting the SQL client.
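A minimal sketch of such a registration, assuming a Hadoop catalog; the catalog name and warehouse path are placeholders:

catalogs:
  - name: my_catalog
    type: iceberg
    catalog-type: hadoop
    warehouse: hdfs://nn:8020/warehouse/path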
Create through SQL Files
The Flink SQL Client supports the -i startup option to execute an initialization SQL file that sets up the environment when the SQL Client starts.
-- define available catalogs
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);

USE CATALOG hive_catalog;
Use the -i <init.sql> option to initialize the SQL Client session:
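For example, assuming the initialization file above is saved as init.sql (both paths below are placeholders):

/path/to/bin/sql-client.sh -i /path/to/init.sql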
CREATE DATABASE
By default, Iceberg will use the default database in Flink. Use the following example to create a separate database to avoid creating tables under the default database:
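-- the database name iceberg_db is illustrative
CREATE DATABASE iceberg_db;
USE iceberg_db;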
CREATE TABLE
CREATE TABLE `hive_catalog`.`default`.`sample` (
  id BIGINT COMMENT 'unique id',
  data STRING NOT NULL
) WITH ('format-version'='2');
Table create commands support the commonly used Flink create clauses, including:

- PARTITIONED BY (column1, column2, ...) to configure partitioning; Flink does not yet support hidden partitioning.
- COMMENT 'table document' to set a table description.
- WITH ('key'='value', ...) to set table configuration which will be stored in Iceberg table properties.

Currently, it does not support computed columns, watermark definitions, etc.
PRIMARY KEY
A primary key constraint can be declared on a column or a set of columns, which must be unique and must not contain nulls. It is required for UPSERT mode.
CREATE TABLE `hive_catalog`.`default`.`sample` (
  id BIGINT COMMENT 'unique id',
  data STRING NOT NULL,
  PRIMARY KEY(`id`) NOT ENFORCED
) WITH ('format-version'='2');
PARTITIONED BY
To create a partitioned table, use PARTITIONED BY:
CREATE TABLE `hive_catalog`.`default`.`sample` (
  id BIGINT COMMENT 'unique id',
  data STRING NOT NULL
) PARTITIONED BY (data) WITH ('format-version'='2');
Iceberg supports hidden partitioning, but Flink doesn't support partitioning by a function on columns, so there is no way to declare hidden partitions through Flink DDL.
CREATE TABLE LIKE
To create a table with the same schema, partitioning, and table properties as another table, use CREATE TABLE LIKE.
CREATE TABLE `hive_catalog`.`default`.`sample` (
  id BIGINT COMMENT 'unique id',
  data STRING
);

CREATE TABLE `hive_catalog`.`default`.`sample_like` LIKE `hive_catalog`.`default`.`sample`;
For more details, refer to the Flink CREATE TABLE documentation.
ALTER TABLE
Iceberg only supports altering table properties:
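-- 'write.format.default' is an illustrative property
ALTER TABLE `hive_catalog`.`default`.`sample` SET ('write.format.default'='avro');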
ALTER TABLE .. RENAME TO
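To rename a table, run:

ALTER TABLE `hive_catalog`.`default`.`sample` RENAME TO `hive_catalog`.`default`.`new_sample`;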
DROP TABLE
To delete a table, run:
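DROP TABLE `hive_catalog`.`default`.`sample`;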