Write from Dataflow to BigQuery

This document describes how to write data from Dataflow to BigQuery.

Overview

For most use cases, consider using Managed I/O to write to BigQuery. Managed I/O provides features such as automatic upgrades and a consistent configuration API. When writing to BigQuery, Managed I/O automatically chooses the best write method for batch or streaming jobs.
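
The following Java sketch shows roughly what a Managed I/O write to BigQuery looks like. It is an illustration only: the Managed transform API, the "table" configuration key, and the table spec format are assumptions that depend on your Apache Beam SDK version, so check the Managed I/O reference for the exact configuration that your release supports.

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;

public class ManagedBigQueryWriteSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Managed I/O consumes schema-aware PCollections of Row elements.
    Schema schema = Schema.builder().addStringField("user_name").addInt64Field("age").build();
    Row row = Row.withSchema(schema).addValues("Alice", 40L).build();

    // Assumed configuration map; the key names and table spec format may differ by SDK version.
    Map<String, Object> config = Map.of("table", "my-project.my_dataset.my_table");

    pipeline
        .apply(Create.of(row).withRowSchema(schema))
        // Managed I/O selects an appropriate BigQuery write method for the job type.
        .apply(Managed.write(Managed.BIGQUERY).withConfig(config));

    pipeline.run().waitUntilFinish();
  }
}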

If you need more advanced performance tuning, consider using the BigQueryIO connector. For more information, see Use the BigQueryIO connector in this document.

Performance

The following table shows performance metrics for various workloads. These workloads were run on one e2-standard-2 worker, using the Apache Beam SDK 2.49.0 for Java. They did not use Runner v2.

Workload: 100 M records, 1 kB each, 1 column

Method        | Throughput (bytes) | Throughput (elements)
Storage Write | 55 MBps            | 54,000 elements per second
Avro load     | 78 MBps            | 77,000 elements per second
JSON load     | 54 MBps            | 53,000 elements per second

These metrics are based on simple batch pipelines. They are intended to compare performance between I/O connectors, and are not necessarily representative of real-world pipelines. Dataflow pipeline performance is complex, and is a function of VM type, the data being processed, the performance of external sources and sinks, and user code. Metrics are based on running the Java SDK, and aren't representative of the performance characteristics of other language SDKs. For more information, see Beam IO Performance.

Use the BigQueryIO connector

The BigQuery I/O connector supports the following methods for writing to BigQuery:

  • STORAGE_WRITE_API. In this mode, the connector performs direct writes to BigQuery storage, using the BigQuery Storage Write API. The Storage Write API combines streaming ingestion and batch loading into a single high-performance API. This mode guarantees exactly-once semantics.
  • STORAGE_API_AT_LEAST_ONCE. This mode also uses the Storage Write API, but provides at-least-once semantics. This mode results in lower latency for most pipelines. However, duplicate writes are possible.
  • FILE_LOADS. In this mode, the connector writes the input data to staging files in Cloud Storage. Then it runs a BigQuery load job to load the data into BigQuery. This mode is the default for bounded PCollections, which are most commonly found in batch pipelines.
  • STREAMING_INSERTS. In this mode, the connector uses the legacy streaming API. This mode is the default for unbounded PCollections, but is not recommended for new projects.
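
With the BigQueryIO connector, you choose one of these methods by calling withMethod on the write transform. The following fragment is a minimal sketch; the table name is a placeholder, and the transform would be applied to a PCollection as shown in the complete examples later in this document.

// Select the write method explicitly: STORAGE_WRITE_API, STORAGE_API_AT_LEAST_ONCE,
// FILE_LOADS, or STREAMING_INSERTS. If you don't call withMethod, the connector
// chooses a default based on whether the input PCollection is bounded or unbounded.
BigQueryIO.Write<TableRow> write =
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API);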

When choosing a write method, consider the following points:

  • For streaming jobs, consider using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE, because these modes write directly to BigQuery storage, without using intermediate staging files.
  • If you run the pipeline using at-least-once streaming mode, set the write mode to STORAGE_API_AT_LEAST_ONCE. This setting is more efficient and matches the semantics of at-least-once streaming mode.
  • File loads and the Storage Write API have different quotas and limits.
  • Load jobs use either the shared BigQuery slot pool or reserved slots. To use reserved slots, run the load job in a project with a reservation assignment of type PIPELINE. Load jobs are free if you use the shared BigQuery slot pool. However, BigQuery does not make guarantees about the available capacity of the shared pool. For more information, see Introduction to reservations.

Parallelism

  • For FILE_LOADS and STORAGE_WRITE_API in streaming pipelines, the connector shards the data across a number of files or streams. In general, we recommend calling withAutoSharding to enable auto-sharding, as shown in the sketch after this list.

  • For FILE_LOADS in batch pipelines, the connector writes data to partitioned files, which are then loaded into BigQuery in parallel.

  • For STORAGE_WRITE_API in batch pipelines, each worker creates one or more streams to write to BigQuery, determined by the total number of shards.

  • For STORAGE_API_AT_LEAST_ONCE, there is a single default write stream. Multiple workers append to this stream.
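
For example, a streaming write that uses the Storage Write API with auto-sharding might be configured like the following fragment. The table name and triggering frequency are placeholders; withTriggeringFrequency applies to the exactly-once STORAGE_WRITE_API method in streaming pipelines.

// Let Dataflow determine the number of Storage Write API streams dynamically.
BigQueryIO.Write<TableRow> write =
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .withTriggeringFrequency(Duration.standardSeconds(5))
        .withAutoSharding();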

Best practices

  • The Storage Write API has quota limits. The connector handles these limits for most pipelines. However, some scenarios can exhaust the available Storage Write API streams. For example, this issue might happen in a pipeline that uses auto-sharding and autoscaling with a large number of destinations, especially in long-running jobs with highly variable workloads. If this problem occurs, consider using STORAGE_API_AT_LEAST_ONCE, which avoids the issue.

  • Use Google Cloud Platform metrics to monitor your Storage Write API quota usage.

  • When using file loads, Avro typically outperforms JSON. To use Avro, call withAvroFormatFunction, as shown in the file-loads sketch after this list.

  • By default, load jobs run in the same project as the Dataflow job. To specify a different project, call withLoadJobProjectId.

  • When using the Java SDK, consider creating a class that represents the schema of the BigQuery table. Then call useBeamSchema in your pipeline to automatically convert between Apache Beam Row and BigQuery TableRow types. For an example of a schema class, see ExampleModel.java; a minimal sketch also appears after this list.

  • If you load tables with complex schemas containing thousands of fields, consider calling withMaxBytesPerPartition to set a smaller maximum size for each load job.

  • By default, BigQueryIO uses Storage Write API settings that are reasonable for most pipelines. However, if you see performance issues, you can set pipeline options to tune these settings. For more information, see Tune the Storage Write API in the Apache Beam documentation.
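
The following fragment sketches several of the file-load options together: staging the data as Avro with withAvroFormatFunction, running load jobs in a different project with withLoadJobProjectId, and capping the size of each load job with withMaxBytesPerPartition. The MyData type, field names, project ID, and byte limit are placeholders, and the Avro record you build must match the schema that the connector supplies in the AvroWriteRequest.

// File-loads write path with Avro staging files and tuned load jobs.
BigQueryIO.Write<MyData> write =
    BigQueryIO.<MyData>write()
        .to("my-project:my_dataset.my_table")
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        // Stage the data as Avro instead of JSON.
        .withAvroFormatFunction(
            (AvroWriteRequest<MyData> request) -> {
              GenericRecord record = new GenericData.Record(request.getSchema());
              record.put("user_name", request.getElement().name);
              record.put("age", request.getElement().age);
              return record;
            })
        // Run the BigQuery load jobs in a different project (placeholder project ID).
        .withLoadJobProjectId("my-load-job-project")
        // Split very large loads into smaller load jobs (here, 5 GiB per load job).
        .withMaxBytesPerPartition(5L * 1024 * 1024 * 1024);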
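
The following sketch illustrates the useBeamSchema approach with a hypothetical schema class; it is not the ExampleModel.java sample. Annotating the class with a Beam schema provider lets the connector infer the table schema and handle the Row-to-TableRow conversion for you.

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

public class BeamSchemaWriteSketch {
  // Beam infers a schema from the public fields of this class.
  @DefaultSchema(JavaFieldSchema.class)
  public static class User {
    public String userName;
    public Long age;
  }

  static BigQueryIO.Write<User> buildWrite() {
    return BigQueryIO.<User>write()
        .to("my-project:my_dataset.my_table")
        // Derive the BigQuery schema and row conversion from the Beam schema.
        .useBeamSchema();
  }
}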

Streaming pipelines

The following recommendations apply to streaming pipelines.

  • For streaming pipelines, we recommend using the Storage Write API (STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE).

  • A streaming pipeline can use file loads, but this approach has disadvantages:

    • It requires windowing in order to write the files. You can't use the global window.
    • BigQuery loads files on a best-effort basis when using the shared slot pool. There can be a significant delay between when a record is written and when it's available in BigQuery.
    • If a load job fails, for example because of bad data or a schema mismatch, the entire pipeline fails.
  • Consider using STORAGE_API_AT_LEAST_ONCE when possible. It can result in duplicate records being written to BigQuery, but is less expensive and more scalable than STORAGE_WRITE_API.

  • In general, avoid using STREAMING_INSERTS. Streaming inserts are more expensive than the Storage Write API, and don't perform as well.

  • Data sharding can improve performance in streaming pipelines. For most pipelines, auto-sharding is a good starting point. However, you can also tune sharding manually, for example by setting a fixed number of Storage Write API streams or file shards.

  • If you use streaming inserts, we recommend setting retryTransientErrors as the retry policy.
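
For example, a streaming-inserts write with the recommended retry policy looks like the following fragment. The table name is a placeholder.

// Legacy streaming inserts that retry only transient failures, such as timeouts.
// Rows that fail permanently are routed to the connector's failed-inserts output.
BigQueryIO.Write<TableRow> write =
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());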

Batch pipelines

The following recommendations apply to batch pipelines.

  • For most large batch pipelines, we recommend first trying FILE_LOADS. A batch pipeline can use STORAGE_WRITE_API, but it's likely to exceed quota limits at large scale (1,000+ vCPUs) or if concurrent pipelines are running. Apache Beam doesn't throttle the maximum number of write streams for batch STORAGE_WRITE_API jobs, so the job eventually reaches BigQuery Storage API limits.

  • When using FILE_LOADS, you might exhaust either the shared BigQuery slot pool or your pool of reserved slots. If you encounter this kind of failure, try the following approaches:

    • Reduce the maximum number of workers or worker size for the job.
    • Purchase more reserved slots.
    • Consider using STORAGE_WRITE_API.
  • Small to medium pipelines (<1,000 vCPUs) might benefit from using STORAGE_WRITE_API. For these smaller jobs, consider using STORAGE_WRITE_API if you want a dead-letter queue or when the FILE_LOADS shared slot pool is not enough.

  • If you can tolerate duplicate data, consider using STORAGE_API_AT_LEAST_ONCE. This mode can result in duplicate records being written to BigQuery, but might be less expensive than the STORAGE_WRITE_API option.

  • Different write modes might perform differently based on the characteristics of your pipeline. Experiment to find the best write mode for your workload.
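
One simple way to experiment is to expose the write method as a pipeline option so that you can rerun the same batch job with different modes. The getWriteMethod option in this fragment is hypothetical; it is not part of the connector or of the ExamplePipelineOptions interface used later in this document.

// Hypothetical option, for example --writeMethod=FILE_LOADS or --writeMethod=STORAGE_WRITE_API.
BigQueryIO.Write.Method method = BigQueryIO.Write.Method.valueOf(options.getWriteMethod());

BigQueryIO.Write<TableRow> write =
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")
        .withMethod(method);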

Handle row-level errors

This section describes how to handle errors that might happen at the row level, for example because of badly formed input data or schema mismatches.

For the Storage Write API, any rows that can't be written are placed into a separate PCollection. To get this collection, call getFailedStorageApiInserts on the WriteResult object. For an example of this approach, see Stream data to BigQuery.

It's a good practice to send the errors to a dead-letter queue or table, for later processing. For more information about this pattern, see BigQueryIO dead letter pattern.
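
For example, the failed rows returned by getFailedStorageApiInserts can be written to a separate BigQuery table. The following sketch assumes a dead-letter table with error_message and failed_row string columns; the table and column names are placeholders, and the imports match the streaming example later in this document, plus WriteResult and BigQueryStorageApiInsertError.

static void writeWithDeadLetterTable(PCollection<TableRow> input) {
  // Write the rows with the Storage Write API and capture the result.
  WriteResult result = input.apply(
      BigQueryIO.writeTableRows()
          .to("my-project:my_dataset.my_table")
          .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
          .withTriggeringFrequency(Duration.standardSeconds(5)));

  // Route rows that BigQuery rejected to a dead-letter table for later inspection.
  result.getFailedStorageApiInserts()
      .apply("MapToDeadLetterRow",
          MapElements.into(TypeDescriptor.of(TableRow.class))
              .via((BigQueryStorageApiInsertError error) -> new TableRow()
                  .set("error_message", error.getErrorMessage())
                  .set("failed_row", error.getRow().toString())))
      .apply("WriteDeadLetterTable",
          BigQueryIO.writeTableRows()
              .to("my-project:my_dataset.my_dead_letter_table")
              .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE));
}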

For FILE_LOADS, if an error occurs while loading the data, the load job fails and the pipeline throws a runtime exception. You can view the error in the Dataflow logs or look at the BigQuery job history. The I/O connector does not return information about individual failed rows.

For more information about troubleshooting errors, see BigQuery connector errors.

Examples

The following examples show how to use Dataflow to write to BigQuery. These examples use the BigQueryIO connector.

Write to an existing table

The following example creates a batch pipeline that writes a PCollection<MyData> to BigQuery, where MyData is a custom data type.

The BigQueryIO.write() method returns a BigQueryIO.Write<T> type, which is used to configure the write operation. For more information, see Writing to a table in the Apache Beam documentation. This code example writes to an existing table (CREATE_NEVER) and appends the new rows to the table (WRITE_APPEND).

Java

To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L));

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

Write to a new or existing table

The following example creates a new table if the destination table does not exist, by setting the create disposition to CREATE_IF_NEEDED. When you use this option, you must provide a table schema. The connector uses this schema if it creates a new table.

Java

To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L));

    // Define a table schema. A schema is required for create disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            ));

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

Stream data to BigQuery

The following example shows how to stream data using exactly-once semantics, by setting the write mode to STORAGE_WRITE_API.

Not all streaming pipelines require exactly-once semantics. For example, you might be able to manually remove duplicates from the destination table. If the possibility of duplicate records is acceptable for your scenario, consider using at-least-once semantics by setting the write method to STORAGE_API_AT_LEAST_ONCE. This method is generally more efficient and results in lower latency for most pipelines.

Java

To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30", startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40", startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value", startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements.into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery.
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(x -> {
              System.out.println("Failed insert: " + x.getErrorMessage());
              System.out.println("Row: " + x.getRow());
              return "";
            }));
    return pipeline.run();
  }
}
