
Spark DDL

To use Iceberg in Spark, first configure Spark catalogs. Iceberg uses Apache Spark's DataSourceV2 API for data source and catalog implementations.
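The examples below use a catalog named prod. A minimal sketch of such a catalog configuration, assuming a Hive metastore (the catalog name and metastore choice are assumptions; see the Spark catalogs page for the full set of options):

spark.sql.catalog.prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.prod.type = hive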

CREATE TABLE

Spark 3 can create tables in any Iceberg catalog with the clause USING iceberg:

CREATE TABLE prod.db.sample (
    id bigint NOT NULL COMMENT 'unique id',
    data string)
USING iceberg;

Iceberg will convert the column types in Spark to the corresponding Iceberg types. Please check the section on type compatibility when creating tables for details.
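As a hedged illustration of this conversion (prod.db.type_sample is a hypothetical table; Spark's timestamp carries time zone semantics, so Iceberg stores it as timestamptz):

CREATE TABLE prod.db.type_sample (
    id bigint,              -- stored as Iceberg long
    name string,            -- stored as Iceberg string
    created_at timestamp)   -- stored as Iceberg timestamptz
USING iceberg;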

Table create commands, including CTAS and RTAS, support the full range of Spark create clauses, including:

  • PARTITIONED BY (partition-expressions) to configure partitioning
  • LOCATION '(fully-qualified-uri)' to set the table location
  • COMMENT 'table documentation' to set a table description
  • TBLPROPERTIES ('key'='value', ...) to set table configuration

Create commands may also set the default format with the USING clause. This is only supported for SparkCatalog because Spark handles the USING clause differently for the built-in catalog.
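A hedged sketch of both routes, assuming a SparkCatalog named prod; the write.format.default table property achieves the same in any catalog:

-- in SparkCatalog, the USING clause sets the default file format
CREATE TABLE prod.db.sample_avro (
    id bigint,
    data string)
USING avro;

-- portable alternative via a table property
CREATE TABLE prod.db.sample_orc (
    id bigint,
    data string)
USING iceberg
TBLPROPERTIES ('write.format.default'='orc');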

CREATE TABLE ... LIKE ... syntax is not supported.

PARTITIONED BY

To create a partitioned table, use PARTITIONED BY:

CREATE TABLE prod.db.sample (
    id bigint,
    data string,
    category string)
USING iceberg
PARTITIONED BY (category);

The PARTITIONED BY clause supports transform expressions to create hidden partitions.

CREATE TABLE prod.db.sample (
    id bigint,
    data string,
    category string,
    ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category);

Supported transformations are:

  • year(ts): partition by year
  • month(ts): partition by month
  • day(ts) or date(ts): equivalent to dateint partitioning
  • hour(ts) or date_hour(ts): equivalent to dateint and hour partitioning
  • bucket(N, col): partition by hashed value mod N buckets
  • truncate(L, col): partition by value truncated to L
    • Strings are truncated to the given length
    • Integers and longs truncate to bins: truncate(10, i) produces partitions 0, 10, 20, 30, ...

Note: The old syntax of years(ts), months(ts), days(ts), and hours(ts) is also supported for compatibility.

CREATE TABLE ... AS SELECT

Iceberg supports CTAS as an atomic operation when using a SparkCatalog. CTAS is supported, but is not atomic when using SparkSessionCatalog.

CREATE TABLE prod.db.sample
USING iceberg
AS SELECT ...

The newly created table won't inherit the partition spec and table properties from the source table in SELECT; use PARTITIONED BY and TBLPROPERTIES in CTAS to declare a partition spec and table properties for the new table.

CREATE TABLE prod.db.sample
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES ('key'='value')
AS SELECT ...

REPLACE TABLE ... AS SELECT

Iceberg supports RTAS as an atomic operation when using a SparkCatalog. RTAS is supported, but is not atomic when using SparkSessionCatalog.

Atomic table replacement creates a new snapshot with the results of the SELECT query, but keeps table history.

REPLACE TABLE prod.db.sample
USING iceberg
AS SELECT ...

REPLACE TABLE prod.db.sample
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES ('key'='value')
AS SELECT ...

CREATE OR REPLACE TABLE prod.db.sample
USING iceberg
AS SELECT ...

The schema and partition spec will be replaced if changed. To avoid modifying the table's schema and partitioning, use INSERT OVERWRITE instead of REPLACE TABLE. The new table properties in the REPLACE TABLE command will be merged with any existing table properties. Existing table properties are updated if changed; otherwise they are preserved.
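As a hedged sketch of the merge behavior, assuming the table already has the hypothetical properties 'key1'='old' and 'key2'='v2' set:

-- hypothetical existing properties: 'key1'='old', 'key2'='v2'
REPLACE TABLE prod.db.sample
USING iceberg
TBLPROPERTIES ('key1'='new', 'key3'='v3')
AS SELECT ...
-- resulting properties: 'key1'='new' (updated), 'key2'='v2' (preserved), 'key3'='v3' (added)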

DROP TABLE

The drop table behavior changed in 0.14.

Prior to 0.14, running DROP TABLE would remove the table from the catalog and delete the table contents as well.

From 0.14 onwards, DROP TABLE only removes the table from the catalog. To delete the table contents, use DROP TABLE PURGE.

DROP TABLE

To drop the table from the catalog, run:

DROP TABLE prod.db.sample;

DROP TABLE PURGE

To drop the table from the catalog and delete the table's contents, run:

DROP TABLE prod.db.sample PURGE;

ALTER TABLE

Iceberg has full ALTER TABLE support in Spark 3, including:

  • Renaming a table
  • Setting or removing table properties
  • Adding, deleting, and renaming columns
  • Adding, deleting, and renaming nested fields
  • Reordering top-level columns and nested struct fields
  • Widening the type of int, float, and decimal fields
  • Making required columns optional

In addition, SQL extensions can be used to add support for partition evolution and setting a table's write order.

ALTER TABLE ... RENAME TO

ALTER TABLE prod.db.sample RENAME TO prod.db.new_name;

ALTER TABLE ... SET TBLPROPERTIES

ALTER TABLE prod.db.sample SET TBLPROPERTIES ('read.split.target-size'='268435456');

Iceberg uses table properties to control table behavior. For a list of available properties, see Table configuration.

UNSET is used to remove properties:

ALTER TABLE prod.db.sample UNSET TBLPROPERTIES ('read.split.target-size');

SET TBLPROPERTIES can also be used to set the table comment (description):

ALTER TABLE prod.db.sample SET TBLPROPERTIES ('comment'='A table comment.');

ALTER TABLE ... ADD COLUMN

To add a column to Iceberg, use the ADD COLUMNS clause with ALTER TABLE:

ALTER TABLE prod.db.sample ADD COLUMNS (new_column string comment 'new_column docs');

Multiple columns can be added at the same time, separated by commas.
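For example (new_column1 and new_column2 are hypothetical):

ALTER TABLE prod.db.sample
ADD COLUMNS (new_column1 string COMMENT 'docs', new_column2 bigint);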

Nested columns should be identified using the full column name:

-- create a struct column
ALTER TABLE prod.db.sample
ADD COLUMN point struct<x: double, y: double>;

-- add a field to the struct
ALTER TABLE prod.db.sample
ADD COLUMN point.z double;

-- create a nested array column of struct
ALTER TABLE prod.db.sample
ADD COLUMN points array<struct<x: double, y: double>>;

-- add a field to the struct within an array. Using keyword 'element' to access the array's element column.
ALTER TABLE prod.db.sample
ADD COLUMN points.element.z double;

-- create a map column of struct key and struct value
ALTER TABLE prod.db.sample
ADD COLUMN points map<struct<x: int>, struct<a: int>>;

-- add a field to the value struct in a map. Using keyword 'value' to access the map's value column.
ALTER TABLE prod.db.sample
ADD COLUMN points.value.b int;

Note: Altering a map 'key' column by adding columns is not allowed. Only map values can be updated.

Add columns in any position by adding FIRST or AFTER clauses:

ALTER TABLE prod.db.sample ADD COLUMN new_column bigint AFTER other_column;

ALTER TABLE prod.db.sample ADD COLUMN nested.new_column bigint FIRST;

ALTER TABLE ... RENAME COLUMN

Iceberg allows any field to be renamed. To rename a field, use RENAME COLUMN:

ALTER TABLE prod.db.sample RENAME COLUMN data TO payload;
ALTER TABLE prod.db.sample RENAME COLUMN location.lat TO latitude;

Note that nested rename commands only rename the leaf field. The above command renames location.lat to location.latitude.

ALTER TABLE ... ALTER COLUMN

Alter column is used to widen types, make a field optional, set comments, and reorder fields.

Iceberg allows updating column types if the update is safe. Safe updates are:

  • int to bigint
  • float to double
  • decimal(P,S) to decimal(P2,S) when P2 > P (scale cannot change)

ALTER TABLE prod.db.sample ALTER COLUMN measurement TYPE double;
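Decimal precision can be widened the same way; a minimal sketch, assuming a hypothetical column price declared as decimal(10,2):

ALTER TABLE prod.db.sample ALTER COLUMN price TYPE decimal(20,2);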

To add or remove columns from a struct, use ADD COLUMN or DROP COLUMN with a nested column name.

Column comments can also be updated using ALTER COLUMN:

ALTER TABLE prod.db.sample ALTER COLUMN measurement TYPE double COMMENT 'unit is bytes per second';
ALTER TABLE prod.db.sample ALTER COLUMN measurement COMMENT 'unit is kilobytes per second';

Iceberg allows reordering top-level columns or columns in a struct using FIRST and AFTER clauses:

ALTER TABLE prod.db.sample ALTER COLUMN col FIRST;

ALTER TABLE prod.db.sample ALTER COLUMN nested.col AFTER other_col;

Nullability for a non-nullable column can be changed using DROP NOT NULL:

ALTER TABLE prod.db.sample ALTER COLUMN id DROP NOT NULL;

Info

It is not possible to change a nullable column to a non-nullable column with SET NOT NULL because Iceberg doesn't know whether there is existing data with null values.

Info

ALTER COLUMN is not used to update struct types. Use ADD COLUMN and DROP COLUMN to add or remove struct fields.

ALTER TABLE ... DROP COLUMN

To drop columns, use ALTER TABLE ... DROP COLUMN:

ALTER TABLE prod.db.sample DROP COLUMN id;
ALTER TABLE prod.db.sample DROP COLUMN point.z;

ALTER TABLE SQL extensions

These commands are available in Spark 3 when using Iceberg SQL extensions.

ALTER TABLE ... ADD PARTITION FIELD

Iceberg supports adding new partition fields to a spec using ADD PARTITION FIELD:

ALTER TABLE prod.db.sample ADD PARTITION FIELD catalog; -- identity transform

Partition transforms are also supported:

ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id);
ALTER TABLE prod.db.sample ADD PARTITION FIELD truncate(4, data);
ALTER TABLE prod.db.sample ADD PARTITION FIELD year(ts);

-- use optional AS keyword to specify a custom name for the partition field
ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id) AS shard;

Adding a partition field is a metadata operation and does not change any of the existing table data. New data will be written with the new partitioning, but existing data will remain in the old partition layout. Old data files will have null values for the new partition fields in metadata tables.
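As a hedged illustration, querying the files metadata table after adding a field (shard here is the hypothetical field added above) would show null for files written under the old spec:

-- old files carry null for the newly added partition field
SELECT file_path, partition FROM prod.db.sample.files;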

Dynamic partition overwrite behavior will change when the table's partitioning changes because dynamic overwrite replaces partitions implicitly. To overwrite explicitly, use the new DataFrameWriterV2 API.

Note

To migrate from daily to hourly partitioning with transforms, it is not necessary to drop the daily partition field. Keeping the field ensures existing metadata table queries continue to work.
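A hedged sketch of that migration, assuming the table already has a days(ts) partition field:

-- keep the existing daily field; add hourly partitioning for new data
ALTER TABLE prod.db.sample ADD PARTITION FIELD hour(ts);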

Danger

Dynamic partition overwrite behavior will change when partitioning changes. For example, if you partition by days and move to partitioning by hours, overwrites will overwrite hourly partitions but will no longer overwrite daily partitions.

ALTER TABLE ... DROP PARTITION FIELD

Partition fields can be removed using DROP PARTITION FIELD:

ALTER TABLE prod.db.sample DROP PARTITION FIELD catalog;
ALTER TABLE prod.db.sample DROP PARTITION FIELD bucket(16, id);
ALTER TABLE prod.db.sample DROP PARTITION FIELD truncate(4, data);
ALTER TABLE prod.db.sample DROP PARTITION FIELD year(ts);
ALTER TABLE prod.db.sample DROP PARTITION FIELD shard;

Note that although the partition field is removed, the column will still exist in the table schema.

Dropping a partition field is a metadata operation and does not change any of the existing table data. New data will be written with the new partitioning, but existing data will remain in the old partition layout.

Danger

Dynamic partition overwrite behavior will change when partitioning changes. For example, if you partition by days and move to partitioning by hours, overwrites will overwrite hourly partitions but will no longer overwrite daily partitions.

Danger

Be careful when dropping a partition field because it will change the schema of metadata tables, like files, and may cause metadata queries to fail or produce different results.

ALTER TABLE ... REPLACE PARTITION FIELD

A partition field can be replaced by a new partition field in a single metadata update by using REPLACE PARTITION FIELD:

ALTER TABLE prod.db.sample REPLACE PARTITION FIELD ts_day WITH day(ts);

-- use optional AS keyword to specify a custom name for the new partition field
ALTER TABLE prod.db.sample REPLACE PARTITION FIELD ts_day WITH day(ts) AS day_of_ts;

ALTER TABLE ... WRITE ORDERED BY

Iceberg tables can be configured with a sort order that is used to automatically sort data that is written to the table in some engines. For example, MERGE INTO in Spark will use the table ordering.

To set the write order for a table, use WRITE ORDERED BY:

ALTER TABLE prod.db.sample WRITE ORDERED BY category, id

-- use optional ASC/DESC keyword to specify sort order of each field (default ASC)
ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC, id DESC

-- use optional NULLS FIRST/NULLS LAST keyword to specify null order of each field (default FIRST)
ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST

Info

Table write order does not guarantee data order for queries. It only affects how data is written to the table.

WRITE ORDERED BY sets a global ordering where rows are ordered across tasks, like using ORDER BY in an INSERT command:

INSERT INTO prod.db.sample
SELECT id, data, category, ts FROM another_table
ORDER BY ts, category

To order within each task, not across tasks, use LOCALLY ORDERED BY:

ALTER TABLE prod.db.sample WRITE LOCALLY ORDERED BY category, id

To unset the sort order of the table, use UNORDERED:

ALTER TABLE prod.db.sample WRITE UNORDERED

ALTER TABLE ... WRITE DISTRIBUTED BY PARTITION

WRITE DISTRIBUTED BY PARTITION will request that each partition is handled by one writer; the default implementation is hash distribution.

ALTER TABLE prod.db.sample WRITE DISTRIBUTED BY PARTITION

DISTRIBUTED BY PARTITION and LOCALLY ORDERED BY may be used together to distribute by partition and locally order rows within each task.

ALTER TABLE prod.db.sample WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id

ALTER TABLE ... SET IDENTIFIER FIELDS

Iceberg supports setting identifier fields on a table using SET IDENTIFIER FIELDS. A Spark table can support Flink SQL upsert operations if it has identifier fields.

ALTER TABLE prod.db.sample SET IDENTIFIER FIELDS id -- single column
ALTER TABLE prod.db.sample SET IDENTIFIER FIELDS id, data -- multiple columns

Identifier fields must be NOT NULL columns when they are created or added. A later SET IDENTIFIER FIELDS statement overwrites the previous setting.
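A minimal sketch, assuming a hypothetical new table: declare the column NOT NULL up front, since a nullable column cannot later be changed with SET NOT NULL (see the note under ALTER COLUMN above):

CREATE TABLE prod.db.events (
    id bigint NOT NULL,   -- identifier fields must be NOT NULL
    data string)
USING iceberg;

ALTER TABLE prod.db.events SET IDENTIFIER FIELDS id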

ALTER TABLE ... DROP IDENTIFIER FIELDS

Identifier fields can be removed using DROP IDENTIFIER FIELDS:

ALTER TABLE prod.db.sample DROP IDENTIFIER FIELDS id -- single column
ALTER TABLE prod.db.sample DROP IDENTIFIER FIELDS id, data -- multiple columns

Note that although the identifier fields are removed, the columns will still exist in the table schema.

Branching and Tagging DDL

ALTER TABLE ... CREATE BRANCH

Branches can be created via the CREATE BRANCH statement with the following options:

  • Do not fail if the branch already exists with IF NOT EXISTS
  • Update the branch if it already exists with CREATE OR REPLACE
  • Create a branch at a specific snapshot
  • Create a branch with a specified retention period

-- CREATE audit-branch at current snapshot with default retention.
ALTER TABLE prod.db.sample CREATE BRANCH `audit-branch`

-- CREATE audit-branch at current snapshot with default retention if it doesn't exist.
ALTER TABLE prod.db.sample CREATE BRANCH IF NOT EXISTS `audit-branch`

-- CREATE audit-branch at current snapshot with default retention or REPLACE it if it already exists.
ALTER TABLE prod.db.sample CREATE OR REPLACE BRANCH `audit-branch`

-- CREATE audit-branch at snapshot 1234 with default retention.
ALTER TABLE prod.db.sample CREATE BRANCH `audit-branch` AS OF VERSION 1234

-- CREATE audit-branch at snapshot 1234, retain the branch for 30 days, and keep the latest 3 snapshots and 2 days' worth of snapshots on the branch.
ALTER TABLE prod.db.sample CREATE BRANCH `audit-branch` AS OF VERSION 1234 RETAIN 30 DAYS WITH SNAPSHOT RETENTION 3 SNAPSHOTS 2 DAYS

ALTER TABLE ... CREATE TAG

Tags can be created via the CREATE TAG statement with the following options:

  • Do not fail if the tag already exists with IF NOT EXISTS
  • Update the tag if it already exists with CREATE OR REPLACE
  • Create a tag at a specific snapshot
  • Create a tag with a specified retention period

-- CREATE historical-tag at current snapshot with default retention.
ALTER TABLE prod.db.sample CREATE TAG `historical-tag`

-- CREATE historical-tag at current snapshot with default retention if it doesn't exist.
ALTER TABLE prod.db.sample CREATE TAG IF NOT EXISTS `historical-tag`

-- CREATE historical-tag at current snapshot with default retention or REPLACE it if it already exists.
ALTER TABLE prod.db.sample CREATE OR REPLACE TAG `historical-tag`

-- CREATE historical-tag at snapshot 1234 with default retention.
ALTER TABLE prod.db.sample CREATE TAG `historical-tag` AS OF VERSION 1234

-- CREATE historical-tag at snapshot 1234 and retain it for 1 year.
ALTER TABLE prod.db.sample CREATE TAG `historical-tag` AS OF VERSION 1234 RETAIN 365 DAYS

ALTER TABLE ... REPLACE BRANCH

The snapshot which a branch references can be updated via the REPLACE BRANCH statement. Retention can also be updated in this statement.

-- REPLACE audit-branch to reference snapshot 4567 and update the retention to 60 days.
ALTER TABLE prod.db.sample REPLACE BRANCH `audit-branch` AS OF VERSION 4567 RETAIN 60 DAYS

ALTER TABLE ... REPLACE TAG

The snapshot which a tag references can be updated via the REPLACE TAG statement. Retention can also be updated in this statement.

-- REPLACE historical-tag to reference snapshot 4567 and update the retention to 60 days.
ALTER TABLE prod.db.sample REPLACE TAG `historical-tag` AS OF VERSION 4567 RETAIN 60 DAYS

ALTER TABLE ... DROP BRANCH

Branches can be removed via the DROP BRANCH statement:

ALTER TABLE prod.db.sample DROP BRANCH `audit-branch`

ALTER TABLE ... DROP TAG

Tags can be removed via the DROP TAG statement:

ALTER TABLE prod.db.sample DROP TAG `historical-tag`

Iceberg views in Spark

Iceberg views are a common representation of a SQL view that aims to be interpreted across multiple query engines. This section covers how to create and manage views in Spark using Spark 3.4 and above (earlier versions of Spark are not supported).

Note

All the SQL examples in this section follow the official Spark SQL syntax.

Creating a view

Create a simple view without any comments or properties:

CREATE VIEW <viewName> AS SELECT * FROM <tableName>

Using IF NOT EXISTS prevents the SQL statement from failing in case the view already exists:

CREATE VIEW IF NOT EXISTS <viewName> AS SELECT * FROM <tableName>

Create a view with a comment, including aliased and commented columns that are different from the source table:

CREATE VIEW <viewName> (ID COMMENT 'Unique ID', ZIP COMMENT 'Zipcode')
COMMENT 'View Comment'
AS SELECT id, zip FROM <tableName>

Creating a view with properties

Create a view with properties using TBLPROPERTIES:

CREATE VIEW <viewName>
TBLPROPERTIES ('key1'='val1', 'key2'='val2')
AS SELECT * FROM <tableName>

Display view properties:

SHOW TBLPROPERTIES <viewName>

Dropping a view

Drop an existing view:

DROP VIEW <viewName>

Using IF EXISTS prevents the SQL statement from failing if the view does not exist:

DROP VIEW IF EXISTS <viewName>

Replacing a view

Update a view's schema, its properties, or the underlying SQL statement using CREATE OR REPLACE:

CREATE OR REPLACE VIEW <viewName> (updated_id COMMENT 'updated ID')
TBLPROPERTIES ('key1'='new_val1')
AS SELECT id FROM <tableName>

Setting and removing view properties

Set the properties of an existing view using ALTER VIEW ... SET TBLPROPERTIES:

ALTER VIEW <viewName> SET TBLPROPERTIES ('key1'='val1', 'key2'='val2')

Remove the properties from an existing view using ALTER VIEW ... UNSET TBLPROPERTIES:

ALTER VIEW <viewName> UNSET TBLPROPERTIES ('key1', 'key2')

Showing available views

List all views in the currently set namespace (via USE <namespace>):

SHOW VIEWS

List all available views in the defined catalog and/or namespace using one of the variations below:

SHOW VIEWS IN <catalog>
SHOW VIEWS IN <namespace>
SHOW VIEWS IN <catalog>.<namespace>

Showing the CREATE statement of a view

Show the CREATE statement of a view:

SHOW CREATE TABLE <viewName>

Displaying view details

Display additional view details using DESCRIBE:

DESCRIBE [EXTENDED] <viewName>
