
Spark Queries

To use Iceberg in Spark, first configure Spark catalogs. Iceberg uses Apache Spark's DataSourceV2 API for data source and catalog implementations.
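For example, here is a minimal sketch of a session with a Hive-backed catalog named prod; the metastore URI is a placeholder, and the full set of catalog options is covered in the Spark configuration docs:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: register an Iceberg catalog named "prod" (Hive-backed).
// The URI below is a placeholder; adjust to your environment.
val spark = SparkSession.builder()
  .appName("iceberg-queries")
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.prod", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.prod.type", "hive")
  .config("spark.sql.catalog.prod.uri", "thrift://metastore-host:9083")
  .getOrCreate()
```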

Querying with SQL

In Spark 3, tables use identifiers that include a catalog name.

```sql
SELECT * FROM prod.db.table; -- catalog: prod, namespace: db, table: table
```

Metadata tables, like history and snapshots, can use the Iceberg table name as a namespace.

For example, to read from the files metadata table for prod.db.table:

```sql
SELECT * FROM prod.db.table.files;
```

| content | file_path | file_format | spec_id | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | s3:/.../table/data/00000-3-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 01} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> c] | [1 -> , 2 -> c] | null | [4] | null | null |
| 0 | s3:/.../table/data/00001-4-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 02} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> b] | [1 -> , 2 -> b] | null | [4] | null | null |
| 0 | s3:/.../table/data/00002-5-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 03} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> a] | [1 -> , 2 -> a] | null | [4] | null | null |

Querying with DataFrames

To load a table as a DataFrame, use table:

```scala
val df = spark.table("prod.db.table")
```

Catalogs with DataFrameReader

Paths and table names can be loaded with Spark's DataFrameReader interface. How tables are loaded depends on how the identifier is specified. When using spark.read.format("iceberg").load(table) or spark.table(table), the table variable can take a number of forms, as listed below:

  • file:///path/to/table: loads a HadoopTable at the given path
  • tablename: loads currentCatalog.currentNamespace.tablename
  • catalog.tablename: loads tablename from the specified catalog
  • namespace.tablename: loads namespace.tablename from the current catalog
  • catalog.namespace.tablename: loads namespace.tablename from the specified catalog
  • namespace1.namespace2.tablename: loads namespace1.namespace2.tablename from the current catalog

The above list is in order of priority. For example, a matching catalog will take priority over any namespace resolution.
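To illustrate these forms, the following loads all resolve to the same (hypothetical) table events when the current catalog is prod and the current namespace is db:

```scala
// Hypothetical table "events"; current catalog "prod", current namespace "db".
val byName      = spark.table("events")             // currentCatalog.currentNamespace.events
val byNamespace = spark.table("db.events")          // namespace.tablename in the current catalog
val byCatalog   = spark.table("prod.db.events")     // catalog.namespace.tablename
val byPath      = spark.read.format("iceberg")
  .load("file:///warehouse/db/events")              // HadoopTable at the given path
```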

Time travel

SQL

Spark 3.3 and later supports time travel in SQL queries using TIMESTAMP AS OF or VERSION AS OF clauses. The VERSION AS OF clause can contain a long snapshot ID or a string branch or tag name.

Info

Note: If the name of a branch or tag is the same as a snapshot ID, then the snapshot which is selected for time travel is the snapshot with the given snapshot ID. For example, consider the case where there is a tag named '1' and it references the snapshot with ID 2. If the version travel clause is VERSION AS OF '1', time travel will be done to the snapshot with ID 1. If this is not desired, rename the tag or branch with a well-defined prefix such as 'snapshot-1'.

```sql
-- time travel to October 26, 1986 at 01:21:00
SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';

-- time travel to snapshot with id 10963874102873L
SELECT * FROM prod.db.table VERSION AS OF 10963874102873;

-- time travel to the head snapshot of audit-branch
SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';

-- time travel to the snapshot referenced by the tag historical-snapshot
SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot';
```

In addition, FOR SYSTEM_TIME AS OF and FOR SYSTEM_VERSION AS OF clauses are also supported:

```sql
SELECT * FROM prod.db.table FOR SYSTEM_TIME AS OF '1986-10-26 01:21:00';
SELECT * FROM prod.db.table FOR SYSTEM_VERSION AS OF 10963874102873;
SELECT * FROM prod.db.table FOR SYSTEM_VERSION AS OF 'audit-branch';
SELECT * FROM prod.db.table FOR SYSTEM_VERSION AS OF 'historical-snapshot';
```

Timestamps may also be supplied as a Unix timestamp, in seconds:

```sql
-- timestamp in seconds
SELECT * FROM prod.db.table TIMESTAMP AS OF 499162860;
SELECT * FROM prod.db.table FOR SYSTEM_TIME AS OF 499162860;
```

The branch or tag may also be specified using a syntax similar to metadata tables, with branch_<branchname> or tag_<tagname>:

```sql
SELECT * FROM prod.db.table.`branch_audit-branch`;
SELECT * FROM prod.db.table.`tag_historical-snapshot`;
```

(Identifiers containing "-" are not valid SQL identifiers, so they must be escaped using backticks.)

Note that the branch or tag identifier may not be used in combination with VERSION AS OF.

Schema selection in time travel queries

The different time travel queries mentioned in the previous section can use either the snapshot's schema or the table's schema:

```sql
-- time travel to October 26, 1986 at 01:21:00 -> uses the snapshot's schema
SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';

-- time travel to snapshot with id 10963874102873L -> uses the snapshot's schema
SELECT * FROM prod.db.table VERSION AS OF 10963874102873;

-- time travel to the head of audit-branch -> uses the table's schema
SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';
SELECT * FROM prod.db.table.`branch_audit-branch`;

-- time travel to the snapshot referenced by the tag historical-snapshot -> uses the snapshot's schema
SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot';
SELECT * FROM prod.db.table.`tag_historical-snapshot`;
```

DataFrame

To select a specific table snapshot or the snapshot at some time in the DataFrame API, Iceberg supports four Spark read options:

  • snapshot-id selects a specific table snapshot
  • as-of-timestamp selects the current snapshot at a timestamp, in milliseconds
  • branch selects the head snapshot of the specified branch. Note that currently branch cannot be combined with as-of-timestamp.
  • tag selects the snapshot associated with the specified tag. Tags cannot be combined with as-of-timestamp.

```scala
// time travel to October 26, 1986 at 01:21:00
spark.read
  .option("as-of-timestamp", "499162860000")
  .format("iceberg")
  .load("path/to/table")

// time travel to snapshot with ID 10963874102873L
spark.read
  .option("snapshot-id", 10963874102873L)
  .format("iceberg")
  .load("path/to/table")

// time travel to tag historical-snapshot
spark.read
  .option(SparkReadOptions.TAG, "historical-snapshot")
  .format("iceberg")
  .load("path/to/table")

// time travel to the head snapshot of audit-branch
spark.read
  .option(SparkReadOptions.BRANCH, "audit-branch")
  .format("iceberg")
  .load("path/to/table")
```

Info

Spark 3.0 and earlier versions do not support using option with table in DataFrameReader commands. All options will be silently ignored. Do not use table when attempting to time-travel or use other options. See SPARK-32592.
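A sketch of the pitfall, assuming Spark 3.0 or earlier (the path and snapshot ID are illustrative):

```scala
// Spark 3.0 and earlier: options set before table() are silently ignored,
// so this reads the current snapshot, not the requested one.
val ignored = spark.read
  .option("snapshot-id", 10963874102873L)
  .table("prod.db.table")

// Loading by path applies the option as expected.
val applied = spark.read
  .format("iceberg")
  .option("snapshot-id", 10963874102873L)
  .load("path/to/table")
```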

Incremental read

To read appended data incrementally, use:

  • start-snapshot-id: Start snapshot ID used in incremental scans (exclusive).
  • end-snapshot-id: End snapshot ID used in incremental scans (inclusive). This is optional. Omitting it will default to the current snapshot.

```scala
// get the data added after start-snapshot-id (10963874102873L)
// until end-snapshot-id (63874143573109L)
spark.read
  .format("iceberg")
  .option("start-snapshot-id", "10963874102873")
  .option("end-snapshot-id", "63874143573109")
  .load("path/to/table")
```

Info

Currently incremental reads return only data from append operations; replace, overwrite, and delete operations are not supported. Incremental read works with both V1 and V2 format versions. Incremental read is not supported by Spark's SQL syntax.
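The boundary snapshot IDs can be looked up from the snapshots metadata table described below; a minimal sketch that reads whatever was appended between the two most recent snapshots (assumes the table has at least two snapshots):

```scala
// Find the two most recent snapshot IDs, then scan the data appended
// between them. Purely illustrative; pick boundaries to suit your job.
val snaps = spark.sql(
  "SELECT snapshot_id FROM prod.db.table.snapshots ORDER BY committed_at DESC LIMIT 2"
).collect()
val endId   = snaps(0).getLong(0)
val startId = snaps(1).getLong(0)

val appended = spark.read
  .format("iceberg")
  .option("start-snapshot-id", startId.toString)
  .option("end-snapshot-id", endId.toString)
  .load("prod.db.table")
```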

Inspecting tables

To inspect a table's history, snapshots, and other metadata, Iceberg supports metadata tables.

Metadata tables are identified by adding the metadata table name after the original table name. For example, history for db.table is read using db.table.history.

History

To show table history:

```sql
SELECT * FROM prod.db.table.history;
```

| made_current_at | snapshot_id | parent_id | is_current_ancestor |
| --- | --- | --- | --- |
| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL | true |
| 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true |
| 2019-02-09 16:24:30.13 | 296410040247533544 | 5179299526185056830 | false |
| 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true |
| 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true |
| 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true |

Info

This shows a commit that was rolled back. The example has two snapshots with the same parent, and one is not an ancestor of the current table state.
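For example, a query along these lines lists the snapshots that were rolled back, i.e. those that are no longer ancestors of the current table state:

```scala
// Rolled-back commits show up as history entries that are not
// ancestors of the current table state.
spark.sql("""
  SELECT made_current_at, snapshot_id, parent_id
  FROM prod.db.table.history
  WHERE is_current_ancestor = false
""").show(truncate = false)
```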

Metadata Log Entries

To show table metadata log entries:

```sql
SELECT * FROM prod.db.table.metadata_log_entries;
```

| timestamp | file | latest_snapshot_id | latest_schema_id | latest_sequence_number |
| --- | --- | --- | --- | --- |
| 2022-07-28 10:43:52.93 | s3://.../table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.json | null | null | null |
| 2022-07-28 10:43:57.487 | s3://.../table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json | 170260833677645300 | 0 | 1 |
| 2022-07-28 10:43:58.25 | s3://.../table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json | 958906493976709774 | 0 | 2 |

Snapshots

To show the valid snapshots for a table:

```sql
SELECT * FROM prod.db.table.snapshots;
```

| committed_at | snapshot_id | parent_id | operation | manifest_list | summary |
| --- | --- | --- | --- | --- | --- |
| 2019-02-08 03:29:51.215 | 57897183625154 | null | append | s3://.../table/metadata/snap-57897183625154-1.avro | { added-records -> 2478404, total-records -> 2478404, added-data-files -> 438, total-data-files -> 438, spark.app.id -> application_1520379288616_155055 } |

You can also join snapshots to table history. For example, this query will show table history, with the application ID that wrote each snapshot:

```sql
SELECT
    h.made_current_at,
    s.operation,
    h.snapshot_id,
    h.is_current_ancestor,
    s.summary['spark.app.id']
FROM prod.db.table.history h
JOIN prod.db.table.snapshots s
  ON h.snapshot_id = s.snapshot_id
ORDER BY made_current_at;
```

| made_current_at | operation | snapshot_id | is_current_ancestor | summary[spark.app.id] |
| --- | --- | --- | --- | --- |
| 2019-02-08 03:29:51.215 | append | 57897183625154 | true | application_1520379288616_155055 |
| 2019-02-09 16:24:30.13 | delete | 29641004024753 | false | application_1520379288616_151109 |
| 2019-02-09 16:32:47.336 | append | 57897183625154 | true | application_1520379288616_155055 |
| 2019-02-08 03:47:55.948 | overwrite | 51792995261850 | true | application_1520379288616_152431 |

Entries

To show all of the table's current manifest entries for both data and delete files:

```sql
SELECT * FROM prod.db.table.entries;
```

| status | snapshot_id | sequence_number | file_sequence_number | data_file | readable_metrics |
| --- | --- | --- | --- | --- | --- |
| 2 | 57897183625154 | 0 | 0 | {"content":0,"file_path":"s3:/.../table/data/00047-25-833044d0-127b-415c-b874-038a4f978c29-00612.parquet","file_format":"PARQUET","spec_id":0,"record_count":15,"file_size_in_bytes":473,"column_sizes":{1:103},"value_counts":{1:15},"null_value_counts":{1:0},"nan_value_counts":{},"lower_bounds":{1:},"upper_bounds":{1:},"key_metadata":null,"split_offsets":[4],"equality_ids":null,"sort_order_id":0} | {"c1":{"column_size":103,"value_count":15,"null_value_count":0,"nan_value_count":null,"lower_bound":1,"upper_bound":3}} |

Files

To show a table's current files:

```sql
SELECT * FROM prod.db.table.files;
```

| content | file_path | file_format | spec_id | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id | readable_metrics |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | s3:/.../table/data/00042-3-a9aa8b24-20bc-4d56-93b0-6b7675782bb5-00001.parquet | PARQUET | 0 | 1 | 652 | {1:52,2:48} | {1:1,2:1} | {1:0,2:0} | {} | {1:,2:d} | {1:,2:d} | NULL | [4] | NULL | 0 | {"data":{"column_size":48,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":"d","upper_bound":"d"},"id":{"column_size":52,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":1,"upper_bound":1}} |
| 0 | s3:/.../table/data/00000-0-f9709213-22ca-4196-8733-5cb15d2afeb9-00001.parquet | PARQUET | 0 | 1 | 643 | {1:46,2:48} | {1:1,2:1} | {1:0,2:0} | {} | {1:,2:a} | {1:,2:a} | NULL | [4] | NULL | 0 | {"data":{"column_size":48,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":"a","upper_bound":"a"},"id":{"column_size":46,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":1,"upper_bound":1}} |
| 0 | s3:/.../table/data/00001-1-f9709213-22ca-4196-8733-5cb15d2afeb9-00001.parquet | PARQUET | 0 | 2 | 644 | {1:49,2:51} | {1:2,2:2} | {1:0,2:0} | {} | {1:,2:b} | {1:,2:c} | NULL | [4] | NULL | 0 | {"data":{"column_size":51,"value_count":2,"null_value_count":0,"nan_value_count":null,"lower_bound":"b","upper_bound":"c"},"id":{"column_size":49,"value_count":2,"null_value_count":0,"nan_value_count":null,"lower_bound":2,"upper_bound":3}} |
| 1 | s3:/.../table/data/00081-4-a9aa8b24-20bc-4d56-93b0-6b7675782bb5-00001-deletes.parquet | PARQUET | 0 | 1 | 1560 | {2147483545:46,2147483546:152} | {2147483545:1,2147483546:1} | {2147483545:0,2147483546:0} | {} | {2147483545:,2147483546:s3:/.../table/data/00000-0-f9709213-22ca-4196-8733-5cb15d2afeb9-00001.parquet} | {2147483545:,2147483546:s3:/.../table/data/00000-0-f9709213-22ca-4196-8733-5cb15d2afeb9-00001.parquet} | NULL | [4] | NULL | NULL | {"data":{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null},"id":{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null}} |
| 2 | s3:/.../table/data/00047-25-833044d0-127b-415c-b874-038a4f978c29-00612.parquet | PARQUET | 0 | 126506 | 28613985 | {100:135377,101:11314} | {100:126506,101:126506} | {100:105434,101:11} | {} | {100:0,101:17} | {100:404455227527,101:23} | NULL | NULL | [1] | 0 | {"id":{"column_size":135377,"value_count":126506,"null_value_count":105434,"nan_value_count":null,"lower_bound":0,"upper_bound":404455227527},"data":{"column_size":11314,"value_count":126506,"null_value_count":11,"nan_value_count":null,"lower_bound":17,"upper_bound":23}} |

Info

Content refers to the type of content stored by the data file:

  • 0: Data
  • 1: Position Deletes
  • 2: Equality Deletes

To show only data files or delete files, query prod.db.table.data_files and prod.db.table.delete_files respectively. To show all files, data files and delete files across all tracked snapshots, query prod.db.table.all_files, prod.db.table.all_data_files and prod.db.table.all_delete_files respectively.
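For instance, a quick listing of just the delete files in the current snapshot might look like this:

```scala
// List only the delete files tracked in the current snapshot.
spark.sql("SELECT file_path, record_count FROM prod.db.table.delete_files")
  .show(truncate = false)
```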

Manifests

To show a table's current file manifests:

```sql
SELECT * FROM prod.db.table.manifests;
```

| path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partition_summaries |
| --- | --- | --- | --- | --- | --- | --- | --- |
| s3://.../table/metadata/45b5290b-ee61-4788-b324-b1e2735c0e10-m0.avro | 4479 | 0 | 6668963634911763636 | 8 | 0 | 0 | [[false,null,2019-05-13,2019-05-15]] |

Note:

  1. Fields within the partition_summaries column of the manifests table correspond to field_summary structs within the manifest list, with the following order:
    • contains_null
    • contains_nan
    • lower_bound
    • upper_bound
  2. contains_nan could return null, which indicates that this information is not available from the file's metadata. This usually occurs when reading from a V1 table, where contains_nan is not populated.
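Because partition_summaries is an array of these structs, it can be unpacked into one row per manifest per partition field; a minimal sketch:

```scala
// Expand partition_summaries so each partition field's summary
// (contains_null, contains_nan, bounds) becomes its own row.
spark.sql("""
  SELECT path, summary.contains_null, summary.contains_nan,
         summary.lower_bound, summary.upper_bound
  FROM (
    SELECT path, explode(partition_summaries) AS summary
    FROM prod.db.table.manifests
  )
""").show(truncate = false)
```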

Partitions

To show a table's current partitions:

```sql
SELECT * FROM prod.db.table.partitions;
```

| partition | spec_id | record_count | file_count | total_data_file_size_in_bytes | position_delete_record_count | position_delete_file_count | equality_delete_record_count | equality_delete_file_count | last_updated_at(μs) | last_updated_snapshot_id |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| {20211001, 11} | 0 | 1 | 1 | 100 | 2 | 1 | 0 | 0 | 1633086034192000 | 9205185327307503337 |
| {20211002, 11} | 0 | 4 | 3 | 500 | 1 | 1 | 0 | 0 | 1633172537358000 | 867027598972211003 |
| {20211001, 10} | 0 | 7 | 4 | 700 | 0 | 0 | 0 | 0 | 1633082598716000 | 3280122546965981531 |
| {20211002, 10} | 0 | 3 | 2 | 400 | 0 | 0 | 1 | 1 | 1633169159489000 | 6941468797545315876 |

Note:

  1. For unpartitioned tables, the partitions table will not contain the partition and spec_id fields.

  2. The partitions metadata table shows partitions with data files or delete files in the current snapshot. However, delete files are not applied, and so in some cases partitions may be shown even though all their data rows are marked deleted by delete files.
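One practical use of this table is spotting skewed or oversized partitions; a minimal sketch:

```scala
// Rank partitions by total data file size to spot skew.
spark.sql("""
  SELECT partition, file_count, total_data_file_size_in_bytes
  FROM prod.db.table.partitions
  ORDER BY total_data_file_size_in_bytes DESC
""").show(truncate = false)
```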

Positional Delete Files

To show all positional delete files from the current snapshot of a table:

```sql
SELECT * FROM prod.db.table.position_deletes;
```

| file_path | pos | row | spec_id | delete_file_path |
| --- | --- | --- | --- | --- |
| s3:/.../table/data/00042-3-a9aa8b24-20bc-4d56-93b0-6b7675782bb5-00001.parquet | 1 | 0 | 0 | s3:/.../table/data/00191-1933-25e9f2f3-d863-4a69-a5e1-f9aeeebe60bb-00001-deletes.parquet |

All Metadata Tables

These tables are unions of the metadata tables specific to the current snapshot, and return metadata across all snapshots.

Danger

The "all" metadata tables may produce more than one row per data file or manifest file because metadata files may be part of more than one table snapshot.

All Data Files

To show all of the table's data files and each file's metadata:

```sql
SELECT * FROM prod.db.table.all_data_files;
```

| content | file_path | file_format | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | s3://.../dt=20210102/00000-0-756e2512-49ae-45bb-aae3-c0ca475e7879-00001.parquet | PARQUET | {20210102} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210102} | {1 -> 2, 2 -> 20210102} | null | [4] | null | 0 |
| 0 | s3://.../dt=20210103/00000-0-26222098-032f-472b-8ea5-651a55b21210-00001.parquet | PARQUET | {20210103} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210103} | {1 -> 3, 2 -> 20210103} | null | [4] | null | 0 |
| 0 | s3://.../dt=20210104/00000-0-a3bb1927-88eb-4f1c-bc6e-19076b0d952e-00001.parquet | PARQUET | {20210104} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210104} | {1 -> 3, 2 -> 20210104} | null | [4] | null | 0 |

All Delete Files

To show the table's delete files and each file's metadata from all the snapshots:

```sql
SELECT * FROM prod.db.table.all_delete_files;
```

| content | file_path | file_format | spec_id | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id | readable_metrics |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 1 | s3:/.../table/data/00081-4-a9aa8b24-20bc-4d56-93b0-6b7675782bb5-00001-deletes.parquet | PARQUET | 0 | 1 | 1560 | {2147483545:46,2147483546:152} | {2147483545:1,2147483546:1} | {2147483545:0,2147483546:0} | {} | {2147483545:,2147483546:s3:/.../table/data/00000-0-f9709213-22ca-4196-8733-5cb15d2afeb9-00001.parquet} | {2147483545:,2147483546:s3:/.../table/data/00000-0-f9709213-22ca-4196-8733-5cb15d2afeb9-00001.parquet} | NULL | [4] | NULL | NULL | {"data":{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null},"id":{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null}} |
| 2 | s3:/.../table/data/00047-25-833044d0-127b-415c-b874-038a4f978c29-00612.parquet | PARQUET | 0 | 126506 | 28613985 | {100:135377,101:11314} | {100:126506,101:126506} | {100:105434,101:11} | {} | {100:0,101:17} | {100:404455227527,101:23} | NULL | NULL | [1] | 0 | {"id":{"column_size":135377,"value_count":126506,"null_value_count":105434,"nan_value_count":null,"lower_bound":0,"upper_bound":404455227527},"data":{"column_size":11314,"value_count":126506,"null_value_count":11,"nan_value_count":null,"lower_bound":17,"upper_bound":23}} |

All Entries

To show the table's manifest entries from all the snapshots for both data and delete files:

```sql
SELECT * FROM prod.db.table.all_entries;
```

| status | snapshot_id | sequence_number | file_sequence_number | data_file | readable_metrics |
| --- | --- | --- | --- | --- | --- |
| 2 | 57897183625154 | 0 | 0 | {"content":0,"file_path":"s3:/.../table/data/00047-25-833044d0-127b-415c-b874-038a4f978c29-00612.parquet","file_format":"PARQUET","spec_id":0,"record_count":15,"file_size_in_bytes":473,"column_sizes":{1:103},"value_counts":{1:15},"null_value_counts":{1:0},"nan_value_counts":{},"lower_bounds":{1:},"upper_bounds":{1:},"key_metadata":null,"split_offsets":[4],"equality_ids":null,"sort_order_id":0} | {"c1":{"column_size":103,"value_count":15,"null_value_count":0,"nan_value_count":null,"lower_bound":1,"upper_bound":3}} |

All Manifests

To show all of the table's manifest files:

```sql
SELECT * FROM prod.db.table.all_manifests;
```

| path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partition_summaries |
| --- | --- | --- | --- | --- | --- | --- | --- |
| s3://.../metadata/a85f78c5-3222-4b37-b7e4-faf944425d48-m0.avro | 6376 | 0 | 6272782676904868561 | 2 | 0 | 0 | [{false, false, 20210101, 20210101}] |

Note:

  1. Fields within the partition_summaries column of the manifests table correspond to field_summary structs within the manifest list, with the following order:
    • contains_null
    • contains_nan
    • lower_bound
    • upper_bound
  2. contains_nan could return null, which indicates that this information is not available from the file's metadata. This usually occurs when reading from a V1 table, where contains_nan is not populated.

References

To show a table's known snapshot references:

```sql
SELECT * FROM prod.db.table.refs;
```

| name | type | snapshot_id | max_reference_age_in_ms | min_snapshots_to_keep | max_snapshot_age_in_ms |
| --- | --- | --- | --- | --- | --- |
| main | BRANCH | 4686954189838128572 | 10 | 20 | 30 |
| testTag | TAG | 4686954189838128572 | 10 | null | null |
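For example, to resolve the snapshot ID behind a tag, useful as input to a VERSION AS OF clause (the tag name here is the one from the sample output above):

```scala
// Look up the snapshot ID that the tag "testTag" references.
spark.sql(
  "SELECT snapshot_id FROM prod.db.table.refs WHERE name = 'testTag' AND type = 'TAG'"
).show()
```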

Inspecting with DataFrames

Metadata tables can be loaded using the DataFrameReader API:

```scala
// named metastore table
spark.read.format("iceberg").load("db.table.files")

// Hadoop path table
spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table#files")
```

Time Travel with Metadata Tables

To inspect a table's metadata with the time travel feature:

```sql
-- get the table's file manifests at timestamp Sep 20, 2021 08:00:00
SELECT * FROM prod.db.table.manifests TIMESTAMP AS OF '2021-09-20 08:00:00';

-- get the table's partitions with snapshot id 10963874102873L
SELECT * FROM prod.db.table.partitions VERSION AS OF 10963874102873;
```

Metadata tables can also be inspected with time travel using the DataFrameReader API:

```scala
// load the table's file metadata at snapshot-id 10963874102873 as a DataFrame
spark.read
  .format("iceberg")
  .option("snapshot-id", 10963874102873L)
  .load("db.table.files")
```
