Flink Connector🔗
Apache Flink supports creating Iceberg tables directly in Flink SQL, without creating an explicit Flink catalog. That means you can create an Iceberg table simply by specifying the `'connector'='iceberg'` table option in Flink SQL, similar to the usage in the Flink official document.
In Flink, the SQL `CREATE TABLE test (..) WITH ('connector'='iceberg', ...)` will create a Flink table in the current Flink catalog (`GenericInMemoryCatalog` by default). The Flink table is just a mapping to the underlying Iceberg table; the Iceberg table is not maintained in the current Flink catalog itself.
To create a table in Flink SQL using the syntax `CREATE TABLE test (..) WITH ('connector'='iceberg', ...)`, the Flink Iceberg connector provides the following table properties:
- `connector`: Use the constant `iceberg`.
- `catalog-name`: User-specified catalog name. Required, because the connector has no default value for it.
- `catalog-type`: `hive` or `hadoop` for built-in catalogs (defaults to `hive`), or left unset for custom catalog implementations using `catalog-impl`.
- `catalog-impl`: The fully-qualified class name of a custom catalog implementation. Must be set if `catalog-type` is unset. See also custom catalog for more details.
- `catalog-database`: The Iceberg database name in the backend catalog; defaults to the current Flink database name.
- `catalog-table`: The Iceberg table name in the backend catalog; defaults to the table name in the Flink `CREATE TABLE` statement (see the sketch after this list).
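As a minimal sketch of the defaulting behavior: if `catalog-database` and `catalog-table` are omitted, the backing Iceberg table is resolved from the current Flink database name and the Flink table name. The URI and warehouse values below are the same placeholders used in the examples that follow; `sample` is a hypothetical table name.

```sql
-- catalog-database and catalog-table are omitted, so this Flink table maps
-- to the Iceberg table "<current Flink database>.sample" in hive_prod.
CREATE TABLE sample (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);
```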
Table managed in Hive catalog🔗
Before executing the following SQL, please make sure you've configured the Flink SQL client correctly according to the quick start documentation.
The following SQL will create a Flink table in the current Flink catalog, which maps to the Iceberg table `default_database.flink_table` managed in the Iceberg catalog.
```sql
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);
```
If you want to create a Flink table mapping to a different Iceberg table managed in the Hive catalog (such as `hive_db.hive_iceberg_table` in Hive), you can create the Flink table as follows:
```sql
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'catalog-database'='hive_db',
    'catalog-table'='hive_iceberg_table',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);
```
Info
The underlying catalog database (`hive_db` in the above example) will be created automatically if it does not exist when writing records into the Flink table.
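For instance, a write like the following (a minimal sketch reusing the `flink_table` definition above, with the same sample rows as the complete example below) is enough to trigger creation of `hive_db` in the backend catalog:

```sql
-- Writing through the Flink table auto-creates hive_db in the backend
-- catalog if it does not already exist.
INSERT INTO flink_table VALUES (1, 'AAA'), (2, 'BBB');
```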
Table managed in Hadoop catalog🔗
The following SQL will create a Flink table in the current Flink catalog, which maps to the Iceberg table `default_database.flink_table` managed in the Hadoop catalog.
```sql
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hadoop_prod',
    'catalog-type'='hadoop',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);
```
Table managed in custom catalog🔗
The following SQL will create a Flink table in the current Flink catalog, which maps to the Iceberg table `default_database.flink_table` managed in a custom catalog of type `com.my.custom.CatalogImpl`.
```sql
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='custom_prod',
    'catalog-impl'='com.my.custom.CatalogImpl',
    -- More table properties for the customized catalog
    'my-additional-catalog-config'='my-value',
    ...
);
```
Please check sections under the Integrations tab for all custom catalogs.
A complete example🔗
Take the Hive catalog as an example:
```sql
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'uri'='thrift://localhost:9083',
    'warehouse'='file:///path/to/warehouse'
);

INSERT INTO flink_table VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC');

SET execution.result-mode=tableau;

SELECT * FROM flink_table;

+----+------+
| id | data |
+----+------+
|  1 | AAA  |
|  2 | BBB  |
|  3 | CCC  |
+----+------+
3 rows in set
```
For more details, please refer to the Iceberg Flink documentation.