Spark Procedures🔗

To use Iceberg in Spark, first configure Spark catalogs. Stored procedures are only available when using Iceberg SQL extensions in Spark 3.

Usage🔗

Procedures can be used from any configured Iceberg catalog with CALL. All procedures are in the namespace system.

CALL supports passing arguments by name (recommended) or by position. Mixing positional and named arguments is not supported.

Named arguments🔗

All procedure arguments are named. When passing arguments by name, arguments can be in any order and any optional argument can be omitted.

CALL catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1);

Positional arguments🔗

When passing arguments by position, only the ending arguments may be omitted if they are optional.

CALL catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n);
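
For example, the rollback_to_snapshot procedure described below can be invoked either way; this is a sketch assuming a catalog named catalog_name and a table db.sample with snapshot ID 1:

-- named arguments: order does not matter and optional arguments may be omitted
CALL catalog_name.system.rollback_to_snapshot(snapshot_id => 1, table => 'db.sample');
-- positional arguments: values must follow the declared argument order
CALL catalog_name.system.rollback_to_snapshot('db.sample', 1);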

Snapshot management🔗

rollback_to_snapshot🔗

Roll back a table to a specific snapshot ID.

To roll back to a specific time, use rollback_to_timestamp.

Info

This procedure invalidates all cached Spark plans that reference the affected table.

Usage🔗

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | ✔️ | string | Name of the table to update |
| snapshot_id | ✔️ | long | Snapshot ID to rollback to |

Output🔗

| Output Name | Type | Description |
|---|---|---|
| previous_snapshot_id | long | The current snapshot ID before the rollback |
| current_snapshot_id | long | The new current snapshot ID |

Example🔗

Roll back table db.sample to snapshot ID 1:

CALL catalog_name.system.rollback_to_snapshot('db.sample', 1);

rollback_to_timestamp🔗

Roll back a table to the snapshot that was current at some time.

Info

This procedure invalidates all cached Spark plans that reference the affected table.

Usage🔗

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | ✔️ | string | Name of the table to update |
| timestamp | ✔️ | timestamp | A timestamp to rollback to |

Output🔗

| Output Name | Type | Description |
|---|---|---|
| previous_snapshot_id | long | The current snapshot ID before the rollback |
| current_snapshot_id | long | The new current snapshot ID |

Example🔗

Roll back db.sample to a specific day and time:

CALL catalog_name.system.rollback_to_timestamp('db.sample', TIMESTAMP '2021-06-30 00:00:00.000');

set_current_snapshot🔗

Sets the current snapshot ID for a table.

Unlike rollback, the snapshot is not required to be an ancestor of the current table state.

Info

This procedure invalidates all cached Spark plans that reference the affected table.

Usage🔗

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | ✔️ | string | Name of the table to update |
| snapshot_id | | long | Snapshot ID to set as current |
| ref | | string | Snapshot Reference (branch or tag) to set as current |

Either snapshot_id or ref must be provided, but not both.

Output🔗

| Output Name | Type | Description |
|---|---|---|
| previous_snapshot_id | long | The current snapshot ID before the rollback |
| current_snapshot_id | long | The new current snapshot ID |

Example🔗

Set the current snapshot for db.sample to 1:

CALL catalog_name.system.set_current_snapshot('db.sample', 1);

Set the current snapshot for db.sample to tag s1:

CALL catalog_name.system.set_current_snapshot(table => 'db.sample', ref => 's1');

cherrypick_snapshot🔗

Cherry-picks changes from a snapshot into the current table state.

Cherry-picking creates a new snapshot from an existing snapshot without altering or removing the original.

Only append and dynamic overwrite snapshots can be cherry-picked.

Info

This procedure invalidates all cached Spark plans that reference the affected table.

Usage🔗

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | ✔️ | string | Name of the table to update |
| snapshot_id | ✔️ | long | The snapshot ID to cherry-pick |

Output🔗

| Output Name | Type | Description |
|---|---|---|
| source_snapshot_id | long | The table's current snapshot before the cherry-pick |
| current_snapshot_id | long | The snapshot ID created by applying the cherry-pick |

Examples🔗

Cherry-pick snapshot 1:

CALL catalog_name.system.cherrypick_snapshot('my_table', 1);

Cherry-pick snapshot 1 with named args:

CALL catalog_name.system.cherrypick_snapshot(snapshot_id => 1, table => 'my_table');

publish_changes🔗

Publish changes from a staged WAP ID into the current table state.

publish_changes creates a new snapshot from an existing snapshot without altering or removing the original.

Only append and dynamic overwrite snapshots can be successfully published.

Info

This procedure invalidates all cached Spark plans that reference the affected table.

Usage🔗

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | ✔️ | string | Name of the table to update |
| wap_id | ✔️ | string | The wap_id to be published from stage to prod |

Output🔗

| Output Name | Type | Description |
|---|---|---|
| source_snapshot_id | long | The table's current snapshot before publishing the change |
| current_snapshot_id | long | The snapshot ID created by applying the change |

Examples🔗

publish_changes with WAP ID 'wap_id_1':

CALL catalog_name.system.publish_changes('my_table', 'wap_id_1');

publish_changes with named args:

CALL catalog_name.system.publish_changes(wap_id => 'wap_id_2', table => 'my_table');

fast_forward🔗

Fast-forward the current snapshot of one branch to the latest snapshot of another.

Usage🔗

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | ✔️ | string | Name of the table to update |
| branch | ✔️ | string | Name of the branch to fast-forward |
| to | ✔️ | string | Name of the branch to be fast-forwarded to |

Output🔗

| Output Name | Type | Description |
|---|---|---|
| branch_updated | string | Name of the branch that has been fast-forwarded |
| previous_ref | long | The snapshot ID before applying fast-forward |
| updated_ref | long | The current snapshot ID after applying fast-forward |

Examples🔗

Fast-forward the main branch to the head of audit-branch:

CALL catalog_name.system.fast_forward('my_table', 'main', 'audit-branch');
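
The same call can also be written with named arguments; a sketch using the table and branch names from the example above:

CALL catalog_name.system.fast_forward(table => 'my_table', branch => 'main', to => 'audit-branch');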

Metadata management🔗

Many maintenance actions can be performed using Iceberg stored procedures.

expire_snapshots🔗

Each write/update/delete/upsert/compaction in Iceberg produces a new snapshot while keeping the old data and metadata around for snapshot isolation and time travel. The expire_snapshots procedure can be used to remove older snapshots and their files which are no longer needed.

This procedure will remove old snapshots and data files which are uniquely required by those old snapshots. This means the expire_snapshots procedure will never remove files which are still required by a non-expired snapshot.

Usage🔗

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | ✔️ | string | Name of the table to update |
| older_than | | timestamp | Timestamp before which snapshots will be removed (Default: 5 days ago) |
| retain_last | | int | Number of ancestor snapshots to preserve regardless of older_than (defaults to 1) |
| max_concurrent_deletes | | int | Size of the thread pool used for delete file actions (by default, no thread pool is used) |
| stream_results | | boolean | When true, deletion files will be sent to the Spark driver by RDD partition (by default, all the files are sent to the Spark driver at once). Setting this to true is recommended to prevent Spark driver OOM from a large file size |
| snapshot_ids | | array of long | Array of snapshot IDs to expire |

If older_than and retain_last are omitted, the table's expiration properties will be used. Snapshots that are still referenced by branches or tags won't be removed. By default, branches and tags never expire, but their retention policy can be changed with the table property history.expire.max-ref-age-ms. The main branch never expires.
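
As a sketch of adjusting that retention policy with a plain ALTER TABLE (the table name db.sample and the value 604800000, i.e. seven days in milliseconds, are assumptions):

-- set ref retention to 7 days (value in milliseconds; numbers here are illustrative)
ALTER TABLE catalog_name.db.sample SET TBLPROPERTIES ('history.expire.max-ref-age-ms' = '604800000');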

Output🔗

| Output Name | Type | Description |
|---|---|---|
| deleted_data_files_count | long | Number of data files deleted by this operation |
| deleted_position_delete_files_count | long | Number of position delete files deleted by this operation |
| deleted_equality_delete_files_count | long | Number of equality delete files deleted by this operation |
| deleted_manifest_files_count | long | Number of manifest files deleted by this operation |
| deleted_manifest_lists_count | long | Number of manifest list files deleted by this operation |
| deleted_statistics_files_count | long | Number of statistics files deleted by this operation |

Examples🔗

Remove snapshots older than a specific day and time, but retain the last 100 snapshots:

CALL hive_prod.system.expire_snapshots('db.sample', TIMESTAMP '2021-06-30 00:00:00.000', 100);

Remove the snapshot with snapshot ID 123 (note that this snapshot ID should not be the current snapshot):

CALL hive_prod.system.expire_snapshots(table => 'db.sample', snapshot_ids => ARRAY(123));
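
Since the argument table above recommends stream_results for large file counts, a sketch of such a call (the timestamp is an assumption) might look like:

-- stream deletion candidates to the driver by RDD partition to reduce driver memory pressure
CALL hive_prod.system.expire_snapshots(table => 'db.sample', older_than => TIMESTAMP '2021-06-30 00:00:00.000', stream_results => true);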

remove_orphan_files🔗

Used to remove files which are not referenced in any metadata files of an Iceberg table and can thus be considered "orphaned".

Usage🔗

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | ✔️ | string | Name of the table to clean |
| older_than | | timestamp | Remove orphan files created before this timestamp (Defaults to 3 days ago) |
| location | | string | Directory to look for files in (defaults to the table's location) |
| dry_run | | boolean | When true, don't actually remove files (defaults to false) |
| max_concurrent_deletes | | int | Size of the thread pool used for delete file actions (by default, no thread pool is used) |
| file_list_view | | string | Dataset to look for files in (skipping the directory listing) |
| equal_schemes | | map | Mapping of file system schemes to be considered equal. Key is a comma-separated list of schemes and value is a scheme (defaults to map('s3a,s3n','s3')) |
| equal_authorities | | map | Mapping of file system authorities to be considered equal. Key is a comma-separated list of authorities and value is an authority |
| prefix_mismatch_mode | | string | Action behavior when location prefixes (schemes/authorities) mismatch: ERROR - throw an exception (default); IGNORE - no action; DELETE - delete files |

Output🔗

| Output Name | Type | Description |
|---|---|---|
| orphan_file_location | string | The path to each file determined to be an orphan by this command |

Examples🔗

List all the files that are candidates for removal by performing a dry run of the remove_orphan_files command on this table without actually removing them:

CALL catalog_name.system.remove_orphan_files(table => 'db.sample', dry_run => true);

Remove any files in the tablelocation/data folder which are not known to the table db.sample:

CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location => 'tablelocation/data');

Remove any files in the files_view view which are not known to the table db.sample:

Dataset<Row> compareToFileList = spark
    .createDataFrame(allFiles, FilePathLastModifiedRecord.class)
    .withColumnRenamed("filePath", "file_path")
    .withColumnRenamed("lastModified", "last_modified");
String fileListViewName = "files_view";
compareToFileList.createOrReplaceTempView(fileListViewName);

CALL catalog_name.system.remove_orphan_files(table => 'db.sample', file_list_view => 'files_view');

When a file matches references in metadata files except for location prefix (scheme/authority), an error is thrown by default. The error can be ignored and the file will be skipped by setting prefix_mismatch_mode to IGNORE.

CALL catalog_name.system.remove_orphan_files(table => 'db.sample', prefix_mismatch_mode => 'IGNORE');

The file can still be deleted by setting prefix_mismatch_mode to DELETE.

CALL catalog_name.system.remove_orphan_files(table => 'db.sample', prefix_mismatch_mode => 'DELETE');

The file can also be deleted by considering the mismatched prefixes equal.

CALL catalog_name.system.remove_orphan_files(table => 'db.sample', equal_schemes => map('file', 'file1'));

CALL catalog_name.system.remove_orphan_files(table => 'db.sample', equal_authorities => map('ns1', 'ns2'));

rewrite_data_files🔗

Iceberg tracks each data file in a table. More data files lead to more metadata stored in manifest files, and small data files cause an unnecessary amount of metadata and less efficient queries due to file open costs.

Iceberg can compact data files in parallel using Spark with the rewriteDataFiles action. This will combine small files into larger files to reduce metadata overhead and runtime file open cost.

Usage🔗

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | ✔️ | string | Name of the table to update |
| strategy | | string | Name of the strategy - binpack or sort. Defaults to binpack strategy |
| sort_order | | string | For Zorder use a comma separated list of columns within zorder(). Example: zorder(c1,c2,c3). Else, comma separated sort orders in the format (ColumnName SortDirection NullOrder), where SortDirection can be ASC or DESC and NullOrder can be NULLS FIRST or NULLS LAST. Defaults to the table's sort order |
| options | | map | Options to be used for actions |
| where | | string | Predicate as a string used for filtering the files. Note that all files that may contain data matching the filter will be selected for rewriting |

Options🔗

General Options🔗

| Name | Default Value | Description |
|---|---|---|
| max-concurrent-file-group-rewrites | 5 | Maximum number of file groups to be simultaneously rewritten |
| partial-progress.enabled | false | Enable committing groups of files prior to the entire rewrite completing |
| partial-progress.max-commits | 10 | Maximum amount of commits that this rewrite is allowed to produce if partial progress is enabled |
| partial-progress.max-failed-commits | value of partial-progress.max-commits | Maximum amount of failed commits allowed before job failure, if partial progress is enabled |
| use-starting-sequence-number | true | Use the sequence number of the snapshot at compaction start time instead of that of the newly produced snapshot |
| rewrite-job-order | none | Force the rewrite job order based on the value: bytes-asc rewrites the smallest job groups first; bytes-desc rewrites the largest job groups first; files-asc rewrites the job groups with the least files first; files-desc rewrites the job groups with the most files first; none rewrites job groups in the order they were planned (no specific ordering) |
| target-file-size-bytes | 536870912 (512 MB, default value of write.target-file-size-bytes from table properties) | Target output file size |
| min-file-size-bytes | 75% of target file size | Files under this threshold will be considered for rewriting regardless of any other criteria |
| max-file-size-bytes | 180% of target file size | Files with sizes above this threshold will be considered for rewriting regardless of any other criteria |
| min-input-files | 5 | Any file group exceeding this number of files will be rewritten regardless of other criteria |
| rewrite-all | false | Force rewriting of all provided files overriding other options |
| max-file-group-size-bytes | 107374182400 (100GB) | Largest amount of data that should be rewritten in a single file group. The entire rewrite operation is broken down into pieces based on partitioning and within partitions based on size into file groups. This helps with breaking down the rewriting of very large partitions which may not be rewritable otherwise due to the resource constraints of the cluster |
| delete-file-threshold | 2147483647 | Minimum number of deletes that needs to be associated with a data file for it to be considered for rewriting |
| delete-ratio-threshold | 0.3 | Minimum deletion ratio that needs to be associated with a data file for it to be considered for rewriting |
| output-spec-id | current partition spec id | Identifier of the output partition spec. Data will be reorganized during the rewrite to align with the output partitioning |
| remove-dangling-deletes | false | Remove dangling position and equality deletes after rewriting. A delete file is considered dangling if it does not apply to any live data files. Enabling this will generate an additional commit for the removal |

Info

Dangling delete files are removed based solely on data sequence numbers. This action does not apply to global equality deletes or invalid equality deletes if their delete conditions do not match any data files, nor to position delete files containing position deletes no longer matching any live data files.

Options for sort strategy🔗

| Name | Default Value | Description |
|---|---|---|
| compression-factor | 1.0 | The number of shuffle partitions and consequently the number of output files created by the Spark sort is based on the size of the input data files used in this file rewriter. Due to compression, the disk file sizes may not accurately represent the size of files in the output. This parameter lets the user adjust the file size used for estimating actual output data size. A factor greater than 1.0 would generate more files than we would expect based on the on-disk file size. A value less than 1.0 would create fewer files than we would expect based on the on-disk size |
| shuffle-partitions-per-file | 1 | Number of shuffle partitions to use for each output file. Iceberg will use a custom coalesce operation to stitch these sorted partitions back together into a single sorted file |

Options for sort strategy with zorder sort_order🔗

| Name | Default Value | Description |
|---|---|---|
| var-length-contribution | 8 | Number of bytes considered from an input column of a type with variable length (String, Binary) |
| max-output-size | 2147483647 | Amount of bytes interleaved in the ZOrder algorithm |

Output🔗

| Output Name | Type | Description |
|---|---|---|
| rewritten_data_files_count | int | Number of data files which were rewritten by this command |
| added_data_files_count | int | Number of new data files which were written by this command |
| rewritten_bytes_count | long | Number of bytes which were written by this command |
| failed_data_files_count | int | Number of data files that failed to be rewritten when partial-progress.enabled is true |

Examples🔗

Rewrite the data files in table db.sample using the default rewrite algorithm of bin-packing to combine small files and also split large files according to the default write size of the table:

CALL catalog_name.system.rewrite_data_files('db.sample');

Rewrite the data files in table db.sample by sorting all the data on id and name using the same defaults as bin-pack to determine which files to rewrite:

CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST');

Rewrite the data files in table db.sample by Z-ordering on columns c1 and c2, using the same defaults as bin-pack to determine which files to rewrite:

CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'zorder(c1,c2)');

Rewrite the data files in table db.sample using the bin-pack strategy in any partition where at least two files need rewriting, and then remove any dangling delete files:

CALL catalog_name.system.rewrite_data_files(table => 'db.sample', options => map('min-input-files', '2', 'remove-dangling-deletes', 'true'));

Rewrite the data files in table db.sample and select the files that may contain data matching the filter (id = 3 and name = "foo") to be rewritten:

CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"');

rewrite_manifests🔗

Rewrite manifests for a table to optimize scan planning.

Data files in manifests are sorted by fields in the partition spec. This procedure runs in parallel using a Spark job.

Info

This procedure invalidates all cached Spark plans that reference the affected table.

Usage🔗

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | ✔️ | string | Name of the table to update |
| use_caching | | boolean | Use Spark caching during operation (defaults to true) |
| spec_id | | int | Spec ID of the manifests to rewrite (defaults to current spec ID) |

Output🔗

| Output Name | Type | Description |
|---|---|---|
| rewritten_manifests_count | int | Number of manifests which were rewritten by this command |
| added_manifests_count | int | Number of new manifest files which were written by this command |

Examples🔗

Rewrite the manifests in table db.sample and align manifest files with table partitioning:

CALL catalog_name.system.rewrite_manifests('db.sample');

Rewrite the manifests in table db.sample and disable the use of Spark caching. This could be done to avoid memory issues on executors:

CALL catalog_name.system.rewrite_manifests('db.sample', false);
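
The spec_id argument from the table above can limit the rewrite to manifests of a specific partition spec; a sketch assuming spec ID 0 exists for this table:

-- rewrite only manifests written with partition spec 0 (illustrative spec ID)
CALL catalog_name.system.rewrite_manifests(table => 'db.sample', spec_id => 0);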

rewrite_position_delete_files🔗

Iceberg can rewrite position delete files, which serves two purposes:

  • Minor Compaction: Compact small position delete files into larger ones. This reduces the size of metadata stored in manifest files and overhead of opening small delete files.
  • Remove Dangling Deletes: Filter out position delete records that refer to data files that are no longer live. After rewrite_data_files, position delete records pointing to the rewritten data files are not always marked for removal, and can remain tracked by the table's live snapshot metadata. This is known as the 'dangling delete' problem.

Usage🔗

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | ✔️ | string | Name of the table to update |
| options | | map | Options to be used for procedure |

Dangling deletes are always filtered out during rewriting.

Options🔗

| Name | Default Value | Description |
|---|---|---|
| max-concurrent-file-group-rewrites | 5 | Maximum number of file groups to be simultaneously rewritten |
| partial-progress.enabled | false | Enable committing groups of files prior to the entire rewrite completing |
| partial-progress.max-commits | 10 | Maximum amount of commits that this rewrite is allowed to produce if partial progress is enabled |
| rewrite-job-order | none | Force the rewrite job order based on the value: bytes-asc rewrites the smallest job groups first; bytes-desc rewrites the largest job groups first; files-asc rewrites the job groups with the least files first; files-desc rewrites the job groups with the most files first; none rewrites job groups in the order they were planned (no specific ordering) |
| target-file-size-bytes | 67108864 (64 MB, default value of write.delete.target-file-size-bytes from table properties) | Target output file size |
| min-file-size-bytes | 75% of target file size | Files under this threshold will be considered for rewriting regardless of any other criteria |
| max-file-size-bytes | 180% of target file size | Files with sizes above this threshold will be considered for rewriting regardless of any other criteria |
| min-input-files | 5 | Any file group exceeding this number of files will be rewritten regardless of other criteria |
| rewrite-all | false | Force rewriting of all provided files overriding other options |
| max-file-group-size-bytes | 107374182400 (100GB) | Largest amount of data that should be rewritten in a single file group. The entire rewrite operation is broken down into pieces based on partitioning and within partitions based on size into file groups. This helps with breaking down the rewriting of very large partitions which may not be rewritable otherwise due to the resource constraints of the cluster |

Output🔗

| Output Name | Type | Description |
|---|---|---|
| rewritten_delete_files_count | int | Number of delete files which were removed by this command |
| added_delete_files_count | int | Number of delete files which were added by this command |
| rewritten_bytes_count | long | Count of bytes across delete files which were removed by this command |
| added_bytes_count | long | Count of bytes across all new delete files which were added by this command |

Examples🔗

Rewrite position delete files in table db.sample. This selects position delete files that fit default rewrite criteria, and writes new files of target size target-file-size-bytes. Dangling deletes are removed from rewritten delete files.

CALL catalog_name.system.rewrite_position_delete_files('db.sample');

Rewrite all position delete files in table db.sample, writing new files of target size target-file-size-bytes. Dangling deletes are removed from rewritten delete files.

CALL catalog_name.system.rewrite_position_delete_files(table => 'db.sample', options => map('rewrite-all', 'true'));

Rewrite position delete files in table db.sample. This selects position delete files in partitions where 2 or more position delete files need to be rewritten based on size criteria. Dangling deletes are removed from rewritten delete files.

CALL catalog_name.system.rewrite_position_delete_files(table => 'db.sample', options => map('min-input-files', '2'));

Table migration🔗

The snapshot and migrate procedures help test and migrate existing Hive or Spark tables to Iceberg.

snapshot🔗

Create a light-weight temporary copy of a table for testing, without changing the source table.

The newly created table can be changed or written to without affecting the source table, but the snapshot uses the original table's data files.

When inserts or overwrites run on the snapshot, new files are placed in the snapshot table's location rather than the original table location.

When finished testing a snapshot table, clean it up by running DROP TABLE.
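
A minimal cleanup sketch, assuming the snapshot table was created as db.snap in catalog_name:

-- drops the temporary snapshot table created for testing
DROP TABLE catalog_name.db.snap;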

Info

Because tables created by snapshot are not the sole owners of their data files, they are prohibited from actions like expire_snapshots which would physically delete data files. Iceberg deletes, which only affect metadata, are still allowed. In addition, any operations which affect the original data files will disrupt the snapshot table's integrity. DELETE statements executed against the original Hive table will remove original data files and the snapshot table will no longer be able to access them.

See migrate to replace an existing table with an Iceberg table.

Usage🔗

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| source_table | ✔️ | string | Name of the table to snapshot |
| table | ✔️ | string | Name of the new Iceberg table to create |
| location | | string | Table location for the new table (delegated to the catalog by default) |
| properties | | map | Properties to add to the newly created table |
| parallelism | | int | Number of threads to use for file reading (defaults to 1) |

Output🔗

| Output Name | Type | Description |
|---|---|---|
| imported_files_count | long | Number of files added to the new table |

Examples🔗

Make an isolated Iceberg table which references table db.sample named db.snap at the catalog's default location for db.snap:

CALL catalog_name.system.snapshot('db.sample', 'db.snap');

Make an isolated Iceberg table which references table db.sample named db.snap at a manually specified location /tmp/temptable/:

CALL catalog_name.system.snapshot('db.sample', 'db.snap', '/tmp/temptable/');

migrate🔗

Replace a table with an Iceberg table, loaded with the source's data files.

Table schema, partitioning, properties, and location will be copied from the source table.

Migrate will fail if any table partition uses an unsupported format. Supported formats are Avro, Parquet, and ORC. Existing data files are added to the Iceberg table's metadata and can be read using a name-to-id mapping created from the original table schema.

To leave the original table intact while testing, use snapshot to create a new temporary table that shares the source data files and schema.

By default, the original table is retained with the name table_BACKUP_.

Usage🔗

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | ✔️ | string | Name of the table to migrate |
| properties | | map | Properties for the new Iceberg table |
| drop_backup | | boolean | When true, the original table will not be retained as backup (defaults to false) |
| backup_table_name | | string | Name of the table that will be retained as backup (defaults to table_BACKUP_) |
| parallelism | | int | Number of threads to use for file reading (defaults to 1) |

Output🔗

| Output Name | Type | Description |
|---|---|---|
| migrated_files_count | long | Number of files appended to the Iceberg table |

Examples🔗

Migrate the table db.sample in Spark's default catalog to an Iceberg table and add a property 'foo' set to 'bar':

CALL catalog_name.system.migrate('spark_catalog.db.sample', map('foo', 'bar'));

Migrate db.sample in the current catalog to an Iceberg table without adding any additional properties:

CALL catalog_name.system.migrate('db.sample');
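
The backup_table_name argument from the table above can name the retained backup explicitly; a sketch in which the backup name db.sample_backup is an assumption:

-- keep the original table under an explicitly chosen backup name (illustrative)
CALL catalog_name.system.migrate(table => 'db.sample', backup_table_name => 'db.sample_backup');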

add_files🔗

Attempts to directly add files from a Hive or file-based table into a given Iceberg table. Unlike migrate or snapshot, add_files can import files from a specific partition or partitions and does not create a new Iceberg table. This command will create metadata for the new files and will not move them. This procedure will not analyze the schema of the files to determine if they actually match the schema of the Iceberg table. Upon completion, the Iceberg table will then treat these files as if they are part of the set of files owned by Iceberg. This means any subsequent expire_snapshots calls will be able to physically delete the added files. This method should not be used if migrate or snapshot are possible.

Warning

Keep in mind the add_files procedure will fetch the Parquet metadata from each file being added just once. If you're using tiered storage (such as the Amazon S3 Intelligent-Tiering storage class), the underlying file will be retrieved from the archive and will remain on a higher tier for a set period of time.

Usage🔗

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | ✔️ | string | Table which will have files added to it |
| source_table | ✔️ | string | Table where files should come from; paths are also possible in the form of `file_format`.`path` |
| partition_filter | | map | A map of partitions in the source table to import from |
| check_duplicate_files | | boolean | Whether to prevent files existing in the table from being added (defaults to true) |
| parallelism | | int | Number of threads to use for file reading (defaults to 1) |

Warning: Schema is not validated; adding files with a different schema to the Iceberg table will cause issues.

Warning: Files added by this method can be physically deleted by Iceberg operations.

Output🔗

| Output Name | Type | Description |
|---|---|---|
| added_files_count | long | The number of files added by this command |
| changed_partition_count | long | The number of partitions changed by this command (if known) |

Warning

changed_partition_count will be NULL when table property compatibility.snapshot-id-inheritance.enabled is set to true or if the table format version is > 1.

Examples🔗

Add the files from table db.src_table, a Hive or Spark table registered in the session catalog, to Iceberg table db.tbl. Only add files that exist within partitions where part_col_1 is equal to A.

CALL spark_catalog.system.add_files(table => 'db.tbl', source_table => 'db.src_tbl', partition_filter => map('part_col_1', 'A'));

Add files from a parquet file-based table at location path/to/table to the Iceberg table db.tbl. Add all files regardless of what partition they belong to.

CALL spark_catalog.system.add_files(table => 'db.tbl', source_table => '`parquet`.`path/to/table`');

register_table🔗

Creates a catalog entry for a metadata.json file which already exists but does not have a corresponding catalog identifier.

Usage🔗

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | ✔️ | string | Table which is to be registered |
| metadata_file | ✔️ | string | Metadata file which is to be registered as a new catalog identifier |

Warning

Having the same metadata.json registered in more than one catalog can lead to missing updates, loss of data, and table corruption. Only use this procedure when the table is no longer registered in an existing catalog, or you are moving a table between catalogs.

Output🔗

| Output Name | Type | Description |
|---|---|---|
| current_snapshot_id | long | The current snapshot ID of the newly registered Iceberg table |
| total_records_count | long | Total records count of the newly registered Iceberg table |
| total_data_files_count | long | Total data files count of the newly registered Iceberg table |

Examples🔗

Register a new table as db.tbl to spark_catalog pointing to metadata.json file path/to/metadata/file.json.

CALL spark_catalog.system.register_table(table => 'db.tbl', metadata_file => 'path/to/metadata/file.json');

Metadata information🔗

ancestors_of🔗

Report the live snapshot IDs of parents of a specified snapshot

Usage🔗

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | ✔️ | string | Name of the table to report live snapshot IDs |
| snapshot_id | | long | Use a specified snapshot to get the live snapshot IDs of parents |

Tip: Using snapshot_id

Given a snapshot history with a rollback to B and the addition of C' -> D':

A -> B -> C -> D
      \
       -> C' -> (D')

Not specifying the snapshot ID would return A -> B -> C' -> D', while providing the snapshot ID of D as an argument would return A -> B -> C -> D.

Output🔗

| Output Name | Type | Description |
|---|---|---|
| snapshot_id | long | The ancestor snapshot ID |
| timestamp | long | Snapshot creation time |

Examples🔗

Get all the snapshot ancestors of the current snapshot (default):

CALL spark_catalog.system.ancestors_of('db.tbl');

Get all the snapshot ancestors of a particular snapshot:

CALL spark_catalog.system.ancestors_of('db.tbl', 1);
CALL spark_catalog.system.ancestors_of(snapshot_id => 1, table => 'db.tbl');

Change Data Capture🔗

create_changelog_view🔗

Creates a view that contains the changes from a given table.

Usage🔗

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | ✔️ | string | Name of the source table for the changelog |
| changelog_view | | string | Name of the view to create |
| options | | map | A map of Spark read options to use |
| net_changes | | boolean | Whether to output net changes (see below for more information). Defaults to false. It must be false when compute_updates is true |
| compute_updates | | boolean | Whether to compute pre/post update images (see below for more information). Defaults to true if identifier_columns are provided; otherwise, defaults to false |
| identifier_columns | | array | The list of identifier columns to compute updates. If the argument compute_updates is set to true and identifier_columns are not provided, the table's current identifier fields will be used |

Here is a list of commonly used Spark read options:

  • start-snapshot-id: the exclusive start snapshot ID. If not provided, it reads from the table's first snapshot inclusively.
  • end-snapshot-id: the inclusive end snapshot ID; defaults to the table's current snapshot.
  • start-timestamp: the exclusive start timestamp. If not provided, it reads from the table's first snapshot inclusively.
  • end-timestamp: the inclusive end timestamp; defaults to the table's current snapshot.

Output🔗

| Output Name | Type | Description |
|---|---|---|
| changelog_view | string | The name of the created changelog view |

Examples🔗

Create a changelog view tbl_changes based on the changes that happened between snapshot 1 (exclusive) and 2 (inclusive):

CALL spark_catalog.system.create_changelog_view(table => 'db.tbl', options => map('start-snapshot-id', '1', 'end-snapshot-id', '2'));

Create a changelog view my_changelog_view based on the changes that happened between timestamp 1678335750489 (exclusive) and 1678992105265 (inclusive):

CALL spark_catalog.system.create_changelog_view(table => 'db.tbl', options => map('start-timestamp', '1678335750489', 'end-timestamp', '1678992105265'), changelog_view => 'my_changelog_view');

Create a changelog view that computes updates based on the identifier columns id and name:

CALL spark_catalog.system.create_changelog_view(table => 'db.tbl', options => map('start-snapshot-id', '1', 'end-snapshot-id', '2'), identifier_columns => array('id', 'name'));

Once the changelog view is created, you can query the view to see the changes that happened between the snapshots.

SELECT * FROM tbl_changes;

SELECT * FROM tbl_changes WHERE _change_type = 'INSERT' AND id = 3 ORDER BY _change_ordinal;

Please note that the changelog view includes Change Data Capture (CDC) metadata columns that provide additional information about the changes being tracked. These columns are:

  • _change_type: the type of change. It has one of the following values: INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER.
  • _change_ordinal: the order of changes
  • _commit_snapshot_id: the snapshot ID where the change occurred

Here is an example of corresponding results. It shows that the first snapshot inserted 2 records, and the second snapshot deleted 1 record.

| id | name | _change_type | _change_ordinal | _commit_snapshot_id |
|---|---|---|---|---|
| 1 | Alice | INSERT | 0 | 5390529835796506035 |
| 2 | Bob | INSERT | 0 | 5390529835796506035 |
| 1 | Alice | DELETE | 1 | 8764748981452218370 |

Net Changes🔗

The procedure can remove intermediate changes across multiple snapshots, and only outputs the net changes. Here is an example to create a changelog view that computes net changes.

CALL spark_catalog.system.create_changelog_view(table => 'db.tbl', options => map('end-snapshot-id', '87647489814522183702'), net_changes => true);

With the net changes, the above changelog view only contains the following row since Alice was inserted in the first snapshot and deleted in the second snapshot.

| id | name | _change_type | _change_ordinal | _commit_snapshot_id |
|---|---|---|---|---|
| 2 | Bob | INSERT | 0 | 5390529835796506035 |

Carry-over Rows🔗

The procedure removes the carry-over rows by default. Carry-over rows are the result of row-level operations (MERGE, UPDATE and DELETE) when using copy-on-write. For example, given a file which contains row1 (id=1, name='Alice') and row2 (id=2, name='Bob'), a copy-on-write delete of row2 would require erasing this file and preserving row1 in a new file. The changelog table reports this as the following pair of rows, despite it not being an actual change to the table.

| id | name | _change_type |
|---|---|---|
| 1 | Alice | DELETE |
| 1 | Alice | INSERT |

To see carry-over rows, query SparkChangelogTable as follows:

SELECT * FROM spark_catalog.db.tbl.changes;

Pre/Post Update Images🔗

The procedure computes the pre/post update images if configured. Pre/post update images are converted from a pair of a delete row and an insert row. Identifier columns are used for determining whether an insert and a delete record refer to the same row. If the two records share the same values for the identity columns they are considered to be before and after states of the same row. You can either set identifier fields in the table schema or input them as the procedure parameters.
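
A sketch of setting identifier fields in the table schema instead of passing identifier_columns, using the Iceberg SQL extensions (the column name id is an assumption):

-- mark the id column as an identifier field on the table (illustrative column name)
ALTER TABLE spark_catalog.db.tbl SET IDENTIFIER FIELDS id;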

The following example shows pre/post update images computation with an identifier column (id), where a row deletion and an insertion with the same id are treated as a single update operation. Specifically, suppose we have the following pair of rows:

| id | name | _change_type |
|---|---|---|
| 3 | Robert | DELETE |
| 3 | Dan | INSERT |

In this case, the procedure marks the row before the update as an UPDATE_BEFORE image and the row after the update as an UPDATE_AFTER image, resulting in the following pre/post update images:

| id | name | _change_type |
|---|---|---|
| 3 | Robert | UPDATE_BEFORE |
| 3 | Dan | UPDATE_AFTER |

Table Statistics🔗

compute_table_stats🔗

This procedure calculates the Number of Distinct Values (NDV) statistics for a specific table. By default, statistics are computed for all columns using the table's current snapshot. The procedure can be optionally configured to compute statistics for a specific snapshot and/or a subset of columns.

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | ✔️ | string | Name of the table |
| snapshot_id | | string | ID of the snapshot to collect stats |
| columns | | array | Columns to collect stats |

Output🔗

| Output Name | Type | Description |
|---|---|---|
| statistics_file | string | Path to the stats file created by this command |

Examples🔗

Collect statistics of the latest snapshot of table my_table:

CALL catalog_name.system.compute_table_stats('my_table');

Collect statistics of the snapshot with ID snap1 of table my_table:

CALL catalog_name.system.compute_table_stats(table => 'my_table', snapshot_id => 'snap1');

Collect statistics of the snapshot with ID snap1 of table my_table for columns col1 and col2:

CALL catalog_name.system.compute_table_stats(table => 'my_table', snapshot_id => 'snap1', columns => array('col1', 'col2'));

Table Replication🔗

The rewrite_table_path procedure prepares an Iceberg table for copying to another location.

rewrite_table_path🔗

Stages a copy of the Iceberg table's metadata files where every absolute path source prefix is replaced by the specified target prefix.
This can be the starting point to fully or incrementally copy an Iceberg table to a new location.

Info

This procedure only stages rewritten metadata files and prepares a list of files to copy. The actual file copy is not included in this procedure.

| Argument Name | Required? | Default | Type | Description |
|---|---|---|---|---|
| table | ✔️ | | string | Name of the table |
| source_prefix | ✔️ | | string | The existing prefix to be replaced |
| target_prefix | ✔️ | | string | The replacement prefix for source_prefix |
| start_version | | first metadata.json in table's metadata log | string | The name or path of the chronologically first metadata.json to rewrite |
| end_version | | latest metadata.json in table's metadata log | string | The name or path of the chronologically last metadata.json to rewrite |
| staging_location | | new directory under table's metadata directory | string | The output location for newly rewritten metadata files |

Modes of operation🔗

  • Full Rewrite: A full rewrite will rewrite all reachable metadata files (this includes metadata.json, manifest lists, manifests, and position delete files), and will return all reachable files in the file_list_location. This is the default mode of operation for this procedure.

  • Incremental Rewrite: Optionally, start_version and end_version can be provided to limit the scope to an incremental rewrite. An incremental rewrite will only rewrite metadata files added between start_version and end_version, and will only return files added in this range in the file_list_location.

Output🔗

| Output Name | Type | Description |
|---|---|---|
| latest_version | string | Name of the latest metadata file rewritten by this procedure |
| file_list_location | string | Path to a CSV file containing a mapping of source to target paths |

File List🔗

The file contains the copy plan for all files added to the table between start_version and end_version.

For each file, it specifies:

  • Source Path: The original file path in the table, or the staging location if the file has been rewritten

  • Target Path: The path with the replacement prefix

The following example shows a copy plan for three files:

sourcepath/datafile1.parquet,targetpath/datafile1.parquet
sourcepath/datafile2.parquet,targetpath/datafile2.parquet
stagingpath/manifest.avro,targetpath/manifest.avro

Examples🔗

This example fully rewrites metadata paths of my_table from a source location in HDFS to a target location in S3. It will produce a new set of metadata in the default staging location under the table's metadata directory.

CALL catalog_name.system.rewrite_table_path(
  table => 'db.my_table',
  source_prefix => 'hdfs://nn:8020/path/to/source_table',
  target_prefix => 's3a://bucket/prefix/db.db/my_table'
);

This example incrementally rewrites metadata paths of my_table between metadata versions v2.metadata.json and v20.metadata.json, with new metadata files written to an explicit staging location.

CALL catalog_name.system.rewrite_table_path(
  table => 'db.my_table',
  source_prefix => 's3a://bucketOne/prefix/db.db/my_table',
  target_prefix => 's3a://bucketTwo/prefix/db.db/my_table',
  start_version => 'v2.metadata.json',
  end_version => 'v20.metadata.json',
  staging_location => 's3a://bucketStaging/my_table'
);

Once the rewrite completes, third-party tools (e.g. Distcp) can copy the newly created metadata files and data files to the target location.

Lastly, the register_table procedure can be used to register the copied table in the target location with a catalog.
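
Continuing the incremental example above, a sketch of that registration; the target catalog name and the exact metadata.json path under the copied location are assumptions:

-- target_catalog and the metadata.json path below are illustrative, not prescribed by the procedure
CALL target_catalog.system.register_table(table => 'db.my_table', metadata_file => 's3a://bucketTwo/prefix/db.db/my_table/metadata/v20.metadata.json');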

Warning

Iceberg tables with partition statistics files are not currently supported for path rewrite.


[8]ページ先頭

©2009-2025 Movatter.jp