Write from Dataflow to Apache Iceberg Stay organized with collections Save and categorize content based on your preferences.
To write from Dataflow to Apache Iceberg, use themanaged I/O connector.
Managed I/O supports the following capabilities for Apache Iceberg:
| Catalogs |
|
|---|---|
| Read capabilities | Batch read |
| Write capabilities |
|
ForBigQuery tables for Apache Iceberg,use theBigQueryIO connectorwith BigQuery Storage API. The table must already exist; dynamic table creation isnot supported.
Dependencies
Add the following dependencies to your project:
Java
<dependency><groupId>org.apache.beam</groupId><artifactId>beam-sdks-java-managed</artifactId><version>${beam.version}</version></dependency><dependency><groupId>org.apache.beam</groupId><artifactId>beam-sdks-java-io-iceberg</artifactId><version>${beam.version}</version></dependency>Dynamic destinations
Managed I/O for Apache Iceberg supports dynamic destinations. Instead ofwriting to a single fixed table, the connector can dynamically select adestination table based on field values within the incoming records.
To use dynamic destinations, provide a template for thetable configurationparameter. For more information, seeDynamic destinations.
Examples
The following examples show how to use Managed I/O to write toApache Iceberg.
Write to an Apache Iceberg table
The following example writes in-memory JSON data to an Apache Iceberg table.
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, seeSet up authentication for a local development environment.
importcom.google.common.collect.ImmutableMap;importjava.util.Arrays;importjava.util.List;importjava.util.Map;importorg.apache.beam.sdk.Pipeline;importorg.apache.beam.sdk.managed.Managed;importorg.apache.beam.sdk.options.Description;importorg.apache.beam.sdk.options.PipelineOptions;importorg.apache.beam.sdk.options.PipelineOptionsFactory;importorg.apache.beam.sdk.schemas.Schema;importorg.apache.beam.sdk.transforms.Create;importorg.apache.beam.sdk.transforms.JsonToRow;importorg.apache.beam.sdk.values.PCollectionRowTuple;publicclassApacheIcebergWrite{staticfinalList<String>TABLE_ROWS=Arrays.asList("{\"id\":0, \"name\":\"Alice\"}","{\"id\":1, \"name\":\"Bob\"}","{\"id\":2, \"name\":\"Charles\"}");staticfinalStringCATALOG_TYPE="hadoop";// The schema for the table rows.publicstaticfinalSchemaSCHEMA=newSchema.Builder().addStringField("name").addInt64Field("id").build();publicinterfaceOptionsextendsPipelineOptions{@Description("The URI of the Apache Iceberg warehouse location")StringgetWarehouseLocation();voidsetWarehouseLocation(Stringvalue);@Description("The name of the Apache Iceberg catalog")StringgetCatalogName();voidsetCatalogName(Stringvalue);@Description("The name of the table to write to")StringgetTableName();voidsetTableName(Stringvalue);}publicstaticvoidmain(String[]args){// Parse the pipeline options passed into the application. Example:// --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \// --tableName= $TABLE_NAME// For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-optionsOptionsoptions=PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);Pipelinepipeline=Pipeline.create(options);// Configure the Iceberg source I/OMapcatalogConfig=ImmutableMap.<String,Object>builder().put("warehouse",options.getWarehouseLocation()).put("type",CATALOG_TYPE).build();ImmutableMap<String,Object>config=ImmutableMap.<String,Object>builder().put("table",options.getTableName()).put("catalog_name",options.getCatalogName()).put("catalog_properties",catalogConfig).build();// Build the pipeline.pipeline.apply(Create.of(TABLE_ROWS)).apply(JsonToRow.withSchema(SCHEMA)).apply(Managed.write(Managed.ICEBERG).withConfig(config));pipeline.run().waitUntilFinish();}}Write with dynamic destinations
The following example writes to different Apache Iceberg tables based on afield in the input data.
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, seeSet up authentication for a local development environment.
importcom.google.common.collect.ImmutableMap;importjava.util.Arrays;importjava.util.List;importjava.util.Map;importorg.apache.beam.sdk.Pipeline;importorg.apache.beam.sdk.PipelineResult;importorg.apache.beam.sdk.managed.Managed;importorg.apache.beam.sdk.options.Description;importorg.apache.beam.sdk.options.PipelineOptions;importorg.apache.beam.sdk.options.PipelineOptionsFactory;importorg.apache.beam.sdk.schemas.Schema;importorg.apache.beam.sdk.transforms.Create;importorg.apache.beam.sdk.transforms.JsonToRow;publicclassApacheIcebergDynamicDestinations{// The schema for the table rows.publicstaticfinalSchemaSCHEMA=newSchema.Builder().addInt64Field("id").addStringField("name").addStringField("airport").build();// The data to write to table, formatted as JSON strings.staticfinalList<String>TABLE_ROWS=List.of("{\"id\":0, \"name\":\"Alice\", \"airport\": \"ORD\" }","{\"id\":1, \"name\":\"Bob\", \"airport\": \"SYD\" }","{\"id\":2, \"name\":\"Charles\", \"airport\": \"ORD\" }");publicinterfaceOptionsextendsPipelineOptions{@Description("The URI of the Apache Iceberg warehouse location")StringgetWarehouseLocation();voidsetWarehouseLocation(Stringvalue);@Description("The name of the Apache Iceberg catalog")StringgetCatalogName();voidsetCatalogName(Stringvalue);}// Write JSON data to Apache Iceberg, using dynamic destinations to determine the Iceberg table// where Dataflow writes each record. The JSON data contains a field named "airport". The// Dataflow pipeline writes to Iceberg tables with the naming pattern "flights-{airport}".publicstaticvoidmain(String[]args){// Parse the pipeline options passed into the application. Example:// --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \// For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-optionsOptionsoptions=PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);Pipelinepipeline=Pipeline.create(options);// Configure the Iceberg source I/OMapcatalogConfig=ImmutableMap.<String,Object>builder().put("warehouse",options.getWarehouseLocation()).put("type","hadoop").build();ImmutableMap<String,Object>config=ImmutableMap.<String,Object>builder().put("catalog_name",options.getCatalogName()).put("catalog_properties",catalogConfig)// Route the incoming records based on the value of the "airport" field..put("table","flights-{airport}")// Specify which fields to keep from the input data..put("keep",Arrays.asList("name","id")).build();// Build the pipeline.pipeline// Read in-memory JSON data..apply(Create.of(TABLE_ROWS))// Convert the JSON records to Row objects..apply(JsonToRow.withSchema(SCHEMA))// Write each Row to Apache Iceberg..apply(Managed.write(Managed.ICEBERG).withConfig(config));// Run the pipeline.pipeline.run().waitUntilFinish();}}What's next
- Read from Apache Iceberg.
- Streaming Write to Apache Iceberg with BigLake REST Catalog.
- Learn more aboutManaged I/O.
Except as otherwise noted, the content of this page is licensed under theCreative Commons Attribution 4.0 License, and code samples are licensed under theApache 2.0 License. For details, see theGoogle Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2026-02-19 UTC.