The Beam SDKs include built-in transforms that can read data from and write data to Google BigQuery tables.
To use BigQueryIO, add the Maven artifact dependency to your pom.xml file.
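For reference, the BigQueryIO transforms ship in the Java Google Cloud Platform IO module, so the dependency typically looks something like the following (the version is a placeholder; use the Beam release your project targets):

```xml
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <!-- Placeholder: replace with your Beam SDK version. -->
  <version>2.XX.0</version>
</dependency>
```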
To use BigQueryIO, you must install the Google Cloud Platform dependencies by running pip install apache-beam[gcp].
To read or write from a BigQuery table, you must provide a fully-qualified BigQuery table name (for example, bigquery-public-data:github_repos.sample_contents). A fully-qualified BigQuery table name consists of three parts:
Project ID: The ID of the Google Cloud project.
Dataset ID: The BigQuery dataset ID, which is unique within a given project.
Table ID: The BigQuery table ID, which is unique within a given dataset.
A table name can also include a table decorator if you are using time-partitioned tables.
To specify a BigQuery table, you can use either the table's fully-qualified name as a string, or use a TableReference object.
To specify a table with a string, use the format [project_id]:[dataset_id].[table_id] or [project_id].[dataset_id].[table_id] to specify the fully-qualified BigQuery table name.
You can also omit project_id and use the [dataset_id].[table_id] format. If you omit the project ID, Beam uses the default project ID from your pipeline options.
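For example, table spec strings look like the following (the project, dataset, and table IDs here are placeholders):

```java
// Fully-qualified table spec that includes the project ID.
String tableSpec = "my-project-id:my_bigquery_dataset_id.my_bigquery_table_id";

// Table spec without the project ID; Beam falls back to the default project
// from the pipeline options.
String tableSpecWithDefaultProject = "my_bigquery_dataset_id.my_bigquery_table_id";
```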
To specify a table with a TableReference, create a new TableReference using the three parts of the BigQuery table name.
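A minimal sketch of building a TableReference from the three parts (the IDs are placeholders):

```java
// TableReference is com.google.api.services.bigquery.model.TableReference.
TableReference tableSpec =
    new TableReference()
        .setProjectId("my-project-id")
        .setDatasetId("my_bigquery_dataset_id")
        .setTableId("my_bigquery_table_id");
```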
The Beam SDK for Java also provides the parseTableSpec helper method, which constructs a TableReference object from a String that contains the fully-qualified BigQuery table name. However, the static factory methods for BigQueryIO transforms accept the table name as a String and construct a TableReference object for you.
BigQueryIO read and write transforms produce and consume data as a PCollection of dictionaries, where each element in the PCollection represents a single row in the table.
When writing to BigQuery, you must supply a table schema for the destination table that you want to write to, unless you specify a create disposition of CREATE_NEVER. Creating a table schema covers schemas in more detail.
BigQuery supports the following data types: STRING, BYTES, INTEGER, FLOAT, NUMERIC, BOOLEAN, TIMESTAMP, DATE, TIME, DATETIME and GEOGRAPHY. For an overview of Google Standard SQL data types, see Data types. BigQueryIO allows you to use all of these data types. The following example shows the correct format for data types used when reading from and writing to BigQuery:
```java
import com.google.api.services.bigquery.model.TableRow;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Base64;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BigQueryTableRowCreate {
  public static TableRow createTableRow() {
    TableRow row =
        new TableRow()
            // To learn more about BigQuery data types:
            // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
            .set("string_field", "UTF-8 strings are supported! 🌱🌳🌍")
            .set("int64_field", 432)
            .set("float64_field", 3.141592653589793)
            .set("numeric_field", new BigDecimal("1234.56").toString())
            .set("bool_field", true)
            .set(
                "bytes_field",
                Base64.getEncoder()
                    .encodeToString(
                        "UTF-8 byte string 🌱🌳🌍".getBytes(StandardCharsets.UTF_8)))

            // To learn more about date formatting:
            // https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/format/DateTimeFormatter.html
            .set("date_field", LocalDate.parse("2020-03-19").toString()) // ISO_LOCAL_DATE
            .set(
                "datetime_field",
                LocalDateTime.parse("2020-03-19T20:41:25.123").toString()) // ISO_LOCAL_DATE_TIME
            .set("time_field", LocalTime.parse("20:41:25.123").toString()) // ISO_LOCAL_TIME
            .set(
                "timestamp_field",
                Instant.parse("2020-03-20T03:41:42.123Z").toString()) // ISO_INSTANT

            // To learn more about the geography Well-Known Text (WKT) format:
            // https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry
            .set("geography_field", "POINT(30 10)")

            // An array has its mode set to REPEATED.
            .set("array_field", Arrays.asList(1, 2, 3, 4))

            // Any class can be written as a STRUCT as long as all the fields in the
            // schema are present and they are encoded correctly as BigQuery types.
            .set(
                "struct_field",
                Stream.of(
                        new SimpleEntry<>("string_value", "Text 🌱🌳🌍"),
                        new SimpleEntry<>("int64_value", "42"))
                    .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)));
    return row;
  }
}
```
As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports high-precision decimal numbers (precision of 38 digits, scale of 9 digits). The GEOGRAPHY data type works with Well-Known Text (see https://en.wikipedia.org/wiki/Well-known_text) format for reading and writing to BigQuery. BigQuery IO requires values of BYTES data type to be encoded using base64 encoding when writing to BigQuery. When bytes are read from BigQuery they are returned as base64-encoded bytes.
BigQueryIO allows you to read from a BigQuery table, or to execute a SQL query and read the results. By default, Beam invokes a BigQuery export request when you apply a BigQueryIO read transform. However, the Beam SDK for Java also supports using the BigQuery Storage Read API to read directly from BigQuery storage. See Using the Storage Read API for more information.
Beam's use of BigQuery APIs is subject to BigQuery's Quota and Pricing policies.
The Beam SDK for Java has two BigQueryIO read methods. Both of these methods allow you to read from a table, or read fields using a query string.
read(SerializableFunction) reads Avro-formatted records and uses a specified parsing function to parse them into a PCollection of custom typed objects. Each element in the PCollection represents a single row in the table. The example code for reading with a query string shows how to use read(SerializableFunction).
readTableRows returns a PCollection of BigQuery TableRow objects. Each element in the PCollection represents a single row in the table. Integer values in the TableRow objects are encoded as strings to match BigQuery's exported JSON format. This method is convenient, but can be 2-3 times slower in performance compared to read(SerializableFunction). The example code for reading from a table shows how to use readTableRows.
Note: BigQueryIO.read() is deprecated as of Beam SDK 2.2.0. Instead, use read(SerializableFunction<SchemaAndRecord, T>) to parse BigQuery rows from Avro GenericRecord into your custom type, or use readTableRows() to parse them into JSON TableRow objects.
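As a rough sketch of the recommended approach, the parse function receives a SchemaAndRecord and returns your custom type. MyData here is a hypothetical Serializable POJO, and the table name is a placeholder:

```java
PCollection<MyData> rows =
    pipeline.apply(
        "Read and parse Avro records",
        BigQueryIO.read(
                (SchemaAndRecord elem) -> {
                  // Each element wraps one table row as an Avro GenericRecord.
                  GenericRecord record = elem.getRecord();
                  return new MyData(record.get("string_field").toString());
                })
            .from("my-project-id:my_bigquery_dataset_id.my_bigquery_table_id")
            .withCoder(SerializableCoder.of(MyData.class)));
```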
To read from a BigQuery table using the Beam SDK for Python, apply a ReadFromBigQuery transform. ReadFromBigQuery returns a PCollection of dictionaries, where each element in the PCollection represents a single row in the table. Integer values in the TableRow objects are encoded as strings to match BigQuery's exported JSON format.
Note: BigQuerySource() is deprecated as of Beam SDK 2.25.0. Before 2.25.0, to read from a BigQuery table using the Beam SDK, apply a Read transform on a BigQuerySource. For example, beam.io.Read(beam.io.BigQuerySource(table_spec)).
To read an entire BigQuery table, use the from method with a BigQuery table name. This example uses readTableRows.
To read an entire BigQuery table, use the table parameter with the BigQuery table name.
The following code reads an entire table that contains weather station data and then extracts the max_temperature column.
```java
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromTable {
  public static PCollection<MyData> readFromTable(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";
    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery query",
                BigQueryIO.readTableRows()
                    .from(String.format("%s:%s.%s", project, dataset, table)))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
```
If you don't want to read an entire table, you can supply a query string with the fromQuery method.
If you don't want to read an entire table, you can supply a query string to ReadFromBigQuery by specifying the query parameter.
The following code uses a SQL query to only read the max_temperature column.
```java
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromQuery {
  public static PCollection<MyData> readFromQuery(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";
    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery query",
                BigQueryIO.readTableRows()
                    .fromQuery(String.format("SELECT * FROM `%s.%s.%s`", project, dataset, table))
                    .usingStandardSql())
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
```
You can also use BigQuery's standard SQL dialect with a query string, as shown in the following example:
By default, the pipeline executes the query in the Google Cloud project associated with the pipeline (for the Dataflow runner, this is the project where the pipeline runs). There are cases where the query execution project should be different from the pipeline project. If you use the Java SDK, you can define the query execution project by setting the pipeline option "bigQueryProject" to the desired Google Cloud project ID.
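For example, assuming the option is supplied on the command line, it is picked up through the standard pipeline options mechanism (the project IDs are placeholders):

```java
// Hypothetical invocation, in main(String[] args):
//   --project=my-pipeline-project --bigQueryProject=my-query-project
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline pipeline = Pipeline.create(options);
```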
The BigQuery Storage API allows you to directly access tables in BigQuery storage, and supports features such as column selection and predicate filter push-down, which can allow more efficient pipeline execution.
The Beam SDK for Java supports using the BigQuery Storage API when reading from BigQuery. SDK versions before 2.25.0 support the BigQuery Storage API as an experimental feature and use the pre-GA BigQuery Storage API surface. Callers should migrate pipelines which use the BigQuery Storage API to use SDK version 2.25.0 or later.
The Beam SDK for Python supports the BigQuery Storage API. Enable it by passing method=DIRECT_READ as a parameter to ReadFromBigQuery.
Use the following methods when you read from a table:
Required: Specify withMethod(Method.DIRECT_READ) so that the read uses the Storage Read API.
Optional: To use features such as column projection and column filtering, specify withSelectedFields and withRowRestriction.
The following code snippet reads from a table. This example is from the BigQueryTornadoes example. When the example's read method option is set to DIRECT_READ, the pipeline uses the BigQuery Storage API and column projection to read public samples of weather data from a BigQuery table. You can view the full source code on GitHub.
```java
import java.util.Arrays;
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromTableWithBigQueryStorageAPI {
  public static PCollection<MyData> readFromTableWithBigQueryStorageAPI(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";
    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery table",
                BigQueryIO.readTableRows()
                    .from(String.format("%s:%s.%s", project, dataset, table))
                    .withMethod(Method.DIRECT_READ)
                    .withSelectedFields(
                        Arrays.asList(
                            "string_field",
                            "int64_field",
                            "float64_field",
                            "numeric_field",
                            "bool_field",
                            "bytes_field",
                            "date_field",
                            "datetime_field",
                            "time_field",
                            "timestamp_field",
                            "geography_field",
                            "array_field",
                            "struct_field")))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
```
The following code snippet reads with a query string.
```java
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromQueryWithBigQueryStorageAPI {
  public static PCollection<MyData> readFromQueryWithBigQueryStorageAPI(
      String project, String dataset, String table, String query, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";
    // Pipeline pipeline = Pipeline.create();

    /*
    String query = String.format(
        "SELECT\n"
            + "  string_field,\n"
            + "  int64_field,\n"
            + "  float64_field,\n"
            + "  numeric_field,\n"
            + "  bool_field,\n"
            + "  bytes_field,\n"
            + "  date_field,\n"
            + "  datetime_field,\n"
            + "  time_field,\n"
            + "  timestamp_field,\n"
            + "  geography_field,\n"
            + "  array_field,\n"
            + "  struct_field\n"
            + "FROM\n"
            + "  `%s:%s.%s`",
        project, dataset, table)
    */

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery table",
                BigQueryIO.readTableRows()
                    .fromQuery(query)
                    .usingStandardSql()
                    .withMethod(Method.DIRECT_READ))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
```
BigQueryIO lets you write to BigQuery tables. If you are using the Beam SDK for Java, you can write different rows to different tables. The Beam SDK for Java also supports using the BigQuery Storage Write API to write directly to BigQuery storage. For more information, see Using the Storage Write API.
BigQueryIO write transforms use APIs that are subject to BigQuery's Quota and Pricing policies.
When you apply a write transform, you must provide the following information for the destination table(s):
The table name.
The destination table's create disposition.
The destination table's write disposition.
In addition, if your write operation creates a new BigQuery table, you must also supply a table schema for the destination table.
The create disposition controls whether or not your BigQuery write operation should create a table if the destination table does not exist.
Use .withCreateDisposition to specify the create disposition. Valid enum values are:
Write.CreateDisposition.CREATE_IF_NEEDED: Specifies that the write operation should create a new table if one does not exist. If you use this value, you must provide a table schema with the withSchema method. CREATE_IF_NEEDED is the default behavior.
Write.CreateDisposition.CREATE_NEVER: Specifies that a table should never be created. If the destination table does not exist, the write operation fails.
Use the create_disposition parameter to specify the create disposition. Valid enum values are:
BigQueryDisposition.CREATE_IF_NEEDED: Specifies that the write operation should create a new table if one does not exist. If you use this value, you must provide a table schema. CREATE_IF_NEEDED is the default behavior.
BigQueryDisposition.CREATE_NEVER: Specifies that a table should never be created. If the destination table does not exist, the write operation fails.
If you specify CREATE_IF_NEEDED as the create disposition and you don't supply a table schema, the transform might fail at runtime if the destination table does not exist.
The write disposition controls how your BigQuery write operation applies to an existing table.
Use .withWriteDisposition to specify the write disposition. Valid enum values are:
Write.WriteDisposition.WRITE_EMPTY: Specifies that the write operation should fail at runtime if the destination table is not empty. WRITE_EMPTY is the default behavior.
Write.WriteDisposition.WRITE_TRUNCATE: Specifies that the write operation should replace an existing table. Any existing rows in the destination table are removed, and the new rows are added to the table.
Write.WriteDisposition.WRITE_APPEND: Specifies that the write operation should append the rows to the end of the existing table.
Use the write_disposition parameter to specify the write disposition. Valid enum values are:
BigQueryDisposition.WRITE_EMPTY: Specifies that the write operation should fail at runtime if the destination table is not empty. WRITE_EMPTY is the default behavior.
BigQueryDisposition.WRITE_TRUNCATE: Specifies that the write operation should replace an existing table. Any existing rows in the destination table are removed, and the new rows are added to the table.
BigQueryDisposition.WRITE_APPEND: Specifies that the write operation should append the rows to the end of the existing table.
When you use WRITE_EMPTY, the check for whether or not the destination table is empty can occur before the actual write operation. This check doesn't guarantee that your pipeline will have exclusive access to the table. Two concurrent pipelines that write to the same output table with a write disposition of WRITE_EMPTY might start successfully, but both pipelines can fail later when the write attempts happen.
If your BigQuery write operation creates a new table, you must provide schema information. The schema contains information about each field in the table. When updating a pipeline with a new schema, the existing schema fields must stay in the same order, or the pipeline will break, failing to write to BigQuery.
To create a table schema in Java, you can either use a TableSchema object, or use a string that contains a JSON-serialized TableSchema object.
To create a table schema in Python, you can either use a TableSchema object, or use a string that defines a list of fields. Single string based schemas do not support nested fields, repeated fields, or specifying a BigQuery mode for fields (the mode is always set to NULLABLE).
To create and use a table schema as a TableSchema object, follow these steps.
Create a list of TableFieldSchema objects. Each TableFieldSchema object represents a field in the table.
Create a TableSchema object and use the setFields method to specify your list of fields.
Use the withSchema method to provide your table schema when you apply a write transform.
Create a TableSchema object.
Create and append a TableFieldSchema object for each field in your table.
Use the schema parameter to provide your table schema when you apply a write transform. Set the parameter's value to the TableSchema object.
The following example code shows how to create a TableSchema for a table with two fields (source and quote) of type string.
```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;

class BigQuerySchemaCreate {
  public static TableSchema createSchema() {
    // To learn more about BigQuery schemas:
    // https://cloud.google.com/bigquery/docs/schemas
    TableSchema schema =
        new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema()
                        .setName("string_field")
                        .setType("STRING")
                        .setMode("REQUIRED"),
                    new TableFieldSchema()
                        .setName("int64_field")
                        .setType("INT64")
                        .setMode("NULLABLE"),
                    new TableFieldSchema()
                        .setName("float64_field")
                        .setType("FLOAT64"), // default mode is "NULLABLE"
                    new TableFieldSchema().setName("numeric_field").setType("NUMERIC"),
                    new TableFieldSchema().setName("bool_field").setType("BOOL"),
                    new TableFieldSchema().setName("bytes_field").setType("BYTES"),
                    new TableFieldSchema().setName("date_field").setType("DATE"),
                    new TableFieldSchema().setName("datetime_field").setType("DATETIME"),
                    new TableFieldSchema().setName("time_field").setType("TIME"),
                    new TableFieldSchema().setName("timestamp_field").setType("TIMESTAMP"),
                    new TableFieldSchema().setName("geography_field").setType("GEOGRAPHY"),
                    new TableFieldSchema()
                        .setName("array_field")
                        .setType("INT64")
                        .setMode("REPEATED")
                        .setDescription("Setting the mode to REPEATED makes this an ARRAY<INT64>."),
                    new TableFieldSchema()
                        .setName("struct_field")
                        .setType("STRUCT")
                        .setDescription(
                            "A STRUCT accepts a custom data class, the fields must match the custom class fields.")
                        .setFields(
                            Arrays.asList(
                                new TableFieldSchema().setName("string_value").setType("STRING"),
                                new TableFieldSchema().setName("int64_value").setType("INT64")))));
    return schema;
  }
}
```
To create and use a table schema as a string that contains a JSON-serialized TableSchema object, follow these steps.
Create a string that contains a JSON-serialized TableSchema object.
Use the withJsonSchema method to provide your table schema when you apply a write transform.
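A hedged sketch of what that can look like; the JSON is a serialized TableSchema with a top-level fields array, and `rows` is an assumed PCollection<TableRow>:

```java
String jsonSchema =
    "{\"fields\": ["
        + "{\"name\": \"source\", \"type\": \"STRING\", \"mode\": \"NULLABLE\"},"
        + "{\"name\": \"quote\", \"type\": \"STRING\", \"mode\": \"REQUIRED\"}"
        + "]}";

rows.apply(
    "Write to BigQuery",
    BigQueryIO.writeTableRows()
        .to("my-project-id:my_bigquery_dataset_id.my_bigquery_table_id")
        .withJsonSchema(jsonSchema)
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));
```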
To create and use a table schema as a string, follow these steps.
Create a single comma separated string of the form "field1:type1,field2:type2,field3:type3" that defines a list of fields. The type should specify the field's BigQuery type.
Use the schema parameter to provide your table schema when you apply a write transform. Set the parameter's value to the string.
For example, the two-field (source, quote) schema from the previous example can be specified as the string 'source:STRING,quote:STRING'.
BigQueryIO supports two methods of inserting data into BigQuery: load jobs and streaming inserts. Each insertion method provides different tradeoffs of cost, quota, and data consistency. See the BigQuery documentation for different data ingestion options (specifically, load jobs and streaming inserts) for more information about these tradeoffs.
BigQueryIO chooses a default insertion method based on the input PCollection. You can use withMethod to specify the desired insertion method. See Write.Method for the list of the available methods and their restrictions.
BigQueryIO chooses a default insertion method based on the input PCollection. You can use method to specify the desired insertion method. See WriteToBigQuery for the list of the available methods and their restrictions.
BigQueryIO uses load jobs in the following situations:
When you apply a BigQueryIO write transform to a bounded PCollection.
When you specify load jobs as the insertion method using BigQueryIO.write().withMethod(FILE_LOADS) (Java) or WriteToBigQuery(method='FILE_LOADS') (Python).
Note: If you use batch loads in a streaming pipeline:
You must use withTriggeringFrequency to specify a triggering frequency for initiating load jobs. Be careful about setting the frequency such that your pipeline doesn't exceed the BigQuery load job quota limit.
You can either use withNumFileShards to explicitly set the number of file shards written, or use withAutoSharding to enable dynamic sharding (starting with the 2.29.0 release), in which case the number of shards may be determined and changed at runtime. The sharding behavior depends on the runners. (A Java sketch combining these options appears after the notes below.)
You must use triggering_frequency to specify a triggering frequency for initiating load jobs. Be careful about setting the frequency such that your pipeline doesn't exceed the BigQuery load job quota limit.
You can set with_auto_sharding=True to enable dynamic sharding (starting with the 2.29.0 release). The number of shards may be determined and changed at runtime. The sharding behavior depends on the runners.
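The Java options above fit together roughly as in this sketch; `rows` (an unbounded PCollection<TableRow>), `schema`, the table name, and the joda-time Duration values are assumptions for illustration:

```java
rows.apply(
    "Write via load jobs",
    BigQueryIO.writeTableRows()
        .to("my-project-id:my_bigquery_dataset_id.my_bigquery_table_id")
        .withSchema(schema)
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        // Required in streaming pipelines: how often to start a new load job.
        .withTriggeringFrequency(Duration.standardMinutes(10))
        // Either fix the number of file shards ...
        .withNumFileShards(100)
        // ... or, starting with 2.29.0, let the runner decide at runtime:
        // .withAutoSharding()
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));
```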
BigQueryIO uses streaming inserts in the following situations:
When you apply a BigQueryIO write transform to an unbounded PCollection.
When you specify streaming inserts as the insertion method using BigQueryIO.write().withMethod(STREAMING_INSERTS) (Java) or WriteToBigQuery(method='STREAMING_INSERTS') (Python).
Note: Streaming inserts by default enables the BigQuery best-effort deduplication mechanism. You can disable that by setting ignoreInsertIds. The quota limitations are different when deduplication is enabled vs. disabled.
Streaming inserts applies a default sharding for each table destination. You can use withAutoSharding (starting with the 2.28.0 release) to enable dynamic sharding, and the number of shards may be determined and changed at runtime. The sharding behavior depends on the runners.
Note: Streaming inserts by default enables the BigQuery best-effort deduplication mechanism. You can disable that by setting ignore_insert_ids=True. The quota limitations are different when deduplication is enabled vs. disabled.
Streaming inserts applies a default sharding for each table destination. You can set with_auto_sharding=True (starting with the 2.29.0 release) to enable dynamic sharding. The number of shards may be determined and changed at runtime. The sharding behavior depends on the runners.
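A hedged Java sketch of the streaming-insert options described above; `rows`, `schema`, and the table name are assumed:

```java
rows.apply(
    "Write via streaming inserts",
    BigQueryIO.writeTableRows()
        .to("my-project-id:my_bigquery_dataset_id.my_bigquery_table_id")
        .withSchema(schema)
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        // Disable BigQuery's best-effort deduplication.
        .ignoreInsertIds()
        // Starting with 2.28.0: let the runner pick the number of shards at runtime.
        .withAutoSharding());
```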
To write to a BigQuery table, apply either a writeTableRows or write transform.
To write to a BigQuery table, apply the WriteToBigQuery transform. WriteToBigQuery supports both batch mode and streaming mode. You must apply the transform to a PCollection of dictionaries. In general, you'll need to use another transform, such as ParDo, to format your output data into a collection.
The following examples use this PCollection that contains quotes.
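The quotes PCollection itself isn't reproduced here; for the Java snippets that follow, you can picture something like this sketch (the values are made up):

```java
PCollection<TableRow> rows =
    pipeline.apply(
        "Create quotes",
        Create.of(
                new TableRow().set("source", "Mahatma Gandhi").set("quote", "My life is my message."),
                new TableRow().set("source", "Yoda").set("quote", "Do, or do not. There is no try."))
            .withCoder(TableRowJsonCoder.of()));
```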
The writeTableRows method writes a PCollection of BigQuery TableRow objects to a BigQuery table. Each element in the PCollection represents a single row in the table. This example uses writeTableRows to write the elements of a PCollection<TableRow>. The write operation creates a table if needed. If the table already exists, it is replaced.
```java
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.values.PCollection;

class BigQueryWriteToTable {
  public static void writeToTable(
      String project,
      String dataset,
      String table,
      TableSchema schema,
      PCollection<TableRow> rows) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // TableSchema schema = new TableSchema().setFields(Arrays.asList(...));

    // Pipeline pipeline = Pipeline.create();
    // PCollection<TableRow> rows = ...

    rows.apply(
        "Write to BigQuery",
        BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s", project, dataset, table))
            .withSchema(schema)
            // For CreateDisposition:
            // - CREATE_IF_NEEDED (default): creates the table if it doesn't exist, a schema is
            //   required
            // - CREATE_NEVER: raises an error if the table doesn't exist, a schema is not needed
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            // For WriteDisposition:
            // - WRITE_EMPTY (default): raises an error if the table is not empty
            // - WRITE_APPEND: appends new rows to existing rows
            // - WRITE_TRUNCATE: deletes the existing rows before writing
            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

    // pipeline.run().waitUntilFinish();
  }
}
```
The following example code shows how to apply a WriteToBigQuery transform to write a PCollection of dictionaries to a BigQuery table. The write operation creates a table if needed. If the table already exists, it is replaced.
The write transform writes a PCollection of custom typed objects to a BigQuery table. Use .withFormatFunction(SerializableFunction) to provide a formatting function that converts each input element in the PCollection into a TableRow. This example uses write to write a PCollection<String>. The write operation creates a table if needed. If the table already exists, it is replaced.
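The snippet for this variant isn't shown above, so here is a hedged sketch: it writes a hypothetical PCollection<String> named quotes, formatting each string into a single-column TableRow (the schema is assumed to define that column):

```java
quotes.apply(
    "Write quotes to BigQuery",
    BigQueryIO.<String>write()
        .to("my-project-id:my_bigquery_dataset_id.my_bigquery_table_id")
        .withSchema(schema)
        .withFormatFunction((String quote) -> new TableRow().set("quote", quote))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
```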
When you use streaming inserts, you can decide what to do with failed records. You can either keep retrying, or return the failed records in a separate PCollection using the WriteResult.getFailedInserts() method.
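A hedged sketch of collecting the failed rows instead of retrying them indefinitely; `rows`, `schema`, and the table name are assumed:

```java
WriteResult result =
    rows.apply(
        "Write with streaming inserts",
        BigQueryIO.writeTableRows()
            .to("my-project-id:my_bigquery_dataset_id.my_bigquery_table_id")
            .withSchema(schema)
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            // Retry only errors that look transient; everything else lands in
            // getFailedInserts() instead of being retried forever.
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

PCollection<TableRow> failedRows = result.getFailedInserts();
```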
Starting with version 2.36.0 of the Beam SDK for Java, you can use the BigQuery Storage Write API from the BigQueryIO connector.
Starting with version 2.47.0 of the Beam SDK for Python, the SDK also supports the BigQuery Storage Write API.
The BigQuery Storage Write API for the Python SDK currently has some limitations on supported data types. Because this method makes use of cross-language transforms, it is limited to the types supported at the cross-language boundary. For example, apache_beam.utils.timestamp.Timestamp is needed to write a TIMESTAMP BigQuery type. Also, some types (such as DATETIME) are not yet supported. For more details, please refer to the full type mapping.
Note: If you want to run WriteToBigQuery with the Storage Write API from the source code, you need to run ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build to build the expansion-service jar. If you are running from a released Beam SDK, the jar is already included.
To write to BigQuery using the Storage Write API, set withMethod to Method.STORAGE_WRITE_API. Here's an example transform that writes to BigQuery using the Storage Write API and exactly-once semantics:
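The example transform itself isn't reproduced above, so here is a hedged sketch; `rows`, `schema`, and the table name are assumed:

```java
rows.apply(
    "Write with Storage Write API",
    BigQueryIO.writeTableRows()
        .to("my-project-id:my_bigquery_dataset_id.my_bigquery_table_id")
        .withSchema(schema)
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));
```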
If you want to change the behavior of BigQueryIO so that all the BigQuery sinks for your pipeline use the Storage Write API by default, set the UseStorageWriteApi option.
If your pipeline needs to create the table (in case it doesn't exist and you specified the create disposition as CREATE_IF_NEEDED), you must provide a table schema. The API uses the schema to validate data and convert it to a binary protocol.
For streaming pipelines, you need to set two additional parameters: the number of streams and the triggering frequency.
The number of streams defines the parallelism of the BigQueryIO Write transform and roughly corresponds to the number of Storage Write API streams that the pipeline uses. You can set it explicitly on the transform via withNumStorageWriteApiStreams or provide the numStorageWriteApiStreams option to the pipeline as defined in BigQueryOptions. Please note this is only supported for streaming pipelines.
Triggering frequency determines how soon the data is visible for querying in BigQuery. You can explicitly set it via withTriggeringFrequency or specify the number of seconds by setting the storageWriteApiTriggeringFrequencySec option.
The combination of these two parameters affects the size of the batches of rows that BigQueryIO creates before calling the Storage Write API. Setting the frequency too high can result in smaller batches, which can affect performance. As a general rule, a single stream should be able to handle throughput of at least 1 MB per second. Creating exclusive streams is an expensive operation for the BigQuery service, so you should use only as many streams as needed for your use case. Triggering frequency in single-digit seconds is a good choice for most pipelines.
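Putting the two streaming parameters together, a hedged sketch looks like the following; the stream count and frequency are illustrative values only, and `rows` and `schema` are assumed:

```java
rows.apply(
    "Write with Storage Write API (streaming)",
    BigQueryIO.writeTableRows()
        .to("my-project-id:my_bigquery_dataset_id.my_bigquery_table_id")
        .withSchema(schema)
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        // Parallelism of the write: number of Storage Write API streams.
        .withNumStorageWriteApiStreams(3)
        // How soon buffered rows become visible for querying in BigQuery.
        .withTriggeringFrequency(Duration.standardSeconds(5)));
```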
Similar to streaming inserts, STORAGE_WRITE_API supports dynamically determining the number of parallel streams to write to BigQuery (starting 2.42.0). You can explicitly enable this using withAutoSharding.
STORAGE_WRITE_API defaults to dynamic sharding when numStorageWriteApiStreams is set to 0 or is unspecified.
When using STORAGE_WRITE_API, the PCollection returned by WriteResult.getFailedStorageApiInserts contains the rows that failed to be written to the Storage Write API sink.
If your use case allows for potential duplicate records in the target table, you can use the STORAGE_API_AT_LEAST_ONCE method. This method doesn't persist the records to be written to BigQuery into its shuffle storage, which is needed to provide the exactly-once semantics of the STORAGE_WRITE_API method. Therefore, for most pipelines, using this method is often less expensive and results in lower latency. If you use STORAGE_API_AT_LEAST_ONCE, you don't need to specify the number of streams, and you can't specify the triggering frequency.
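Switching to at-least-once semantics is essentially a one-line change, as in this hedged sketch (no stream count or triggering frequency is set; `rows` and `schema` are assumed):

```java
rows.apply(
    "Write with Storage Write API (at-least-once)",
    BigQueryIO.writeTableRows()
        .to("my-project-id:my_bigquery_dataset_id.my_bigquery_table_id")
        .withSchema(schema)
        .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE));
```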
Auto sharding is not applicable for STORAGE_API_AT_LEAST_ONCE.
When using STORAGE_API_AT_LEAST_ONCE, the PCollection returned by WriteResult.getFailedStorageApiInserts contains the rows that failed to be written to the Storage Write API sink.
By default, the BigQueryIO Write transform uses Storage Write API settings that are reasonable for most pipelines.
If you see performance issues, such as stuck pipelines, quota limit errors, or monotonically increasing backlog, consider tuning the following pipeline options when you run the job:
| Option (Java/Python) | Description |
|---|---|
| | If the write mode is STORAGE_API_AT_LEAST_ONCE and the useStorageApiConnectionPool option is true, this option sets the maximum number of connections that each pool creates, per worker and region. If your pipeline writes many dynamic destinations (more than 20), and you see performance issues or append operations are competing for streams, then consider increasing this value. |
| | If the write mode is STORAGE_API_AT_LEAST_ONCE and the useStorageApiConnectionPool option is true, this option sets the minimum number of connections that each pool creates, per worker and region. |
| | If the write mode is STORAGE_API_AT_LEAST_ONCE, this option sets the number of stream append clients allocated per worker and destination. For high-volume pipelines with a large number of workers, a high value can cause the job to exceed the BigQuery connection quota. For most low- to mid-volume pipelines, the default value is sufficient. |
| | Maximum size of a single append to the Storage Write API (best effort). |
| | Maximum record count of a single append to the Storage Write API (best effort). |
| | Expected maximum number of inflight messages per connection. |
| | If you enable multiplexing, consider setting the connection pool options in this table to tune the number of connections created by the connection pool. For more information, see Connection pool management in the BigQuery documentation. |
Before using the Storage Write API, be aware of the BigQuery Storage Write API quotas.
You can use the dynamic destinations feature to write elements in a PCollection to different BigQuery tables, possibly with different schemas.
The dynamic destinations feature groups your user type by a user-defined destination key, uses the key to compute a destination table and/or schema, and writes each group's elements to the computed destination.
In addition, you can also write your own types that have a mapping function to TableRow, and you can use side inputs in all DynamicDestinations methods.
To use dynamic destinations, you must create a DynamicDestinations object and implement the following methods:
getDestination: Returns an object that getTable and getSchema can use as the destination key to compute the destination table and/or schema.
getTable: Returns the table (as a TableDestination object) for the destination key. This method must return a unique table for each unique destination.
getSchema: Returns the table schema (as a TableSchema object) for the destination key.
Then, use write().to with your DynamicDestinations object. This example uses a PCollection that contains weather data and writes the data into a different table for each year.
```java
/*
@DefaultCoder(AvroCoder.class)
static class WeatherData {
  final long year;
  final long month;
  final long day;
  final double maxTemp;

  public WeatherData() {
    this.year = 0;
    this.month = 0;
    this.day = 0;
    this.maxTemp = 0.0f;
  }

  public WeatherData(long year, long month, long day, double maxTemp) {
    this.year = year;
    this.month = month;
    this.day = day;
    this.maxTemp = maxTemp;
  }
}
*/

PCollection<WeatherData> weatherData =
    p.apply(
        BigQueryIO.read(
                (SchemaAndRecord elem) -> {
                  GenericRecord record = elem.getRecord();
                  return new WeatherData(
                      (Long) record.get("year"),
                      (Long) record.get("month"),
                      (Long) record.get("day"),
                      (Double) record.get("max_temperature"));
                })
            .fromQuery(
                "SELECT year, month, day, max_temperature "
                    + "FROM [apache-beam-testing.samples.weather_stations] "
                    + "WHERE year BETWEEN 2007 AND 2009")
            .withCoder(AvroCoder.of(WeatherData.class)));

// We will send the weather data into different tables for every year.
weatherData.apply(
    BigQueryIO.<WeatherData>write()
        .to(
            new DynamicDestinations<WeatherData, Long>() {
              @Override
              public Long getDestination(ValueInSingleWindow<WeatherData> elem) {
                return elem.getValue().year;
              }

              @Override
              public TableDestination getTable(Long destination) {
                return new TableDestination(
                    new TableReference()
                        .setProjectId(writeProject)
                        .setDatasetId(writeDataset)
                        .setTableId(writeTable + "_" + destination),
                    "Table for year " + destination);
              }

              @Override
              public TableSchema getSchema(Long destination) {
                return new TableSchema()
                    .setFields(
                        ImmutableList.of(
                            new TableFieldSchema()
                                .setName("year")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("month")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("day")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("maxTemp")
                                .setType("FLOAT")
                                .setMode("NULLABLE")));
              }
            })
        .withFormatFunction(
            (WeatherData elem) ->
                new TableRow()
                    .set("year", elem.year)
                    .set("month", elem.month)
                    .set("day", elem.day)
                    .set("maxTemp", elem.maxTemp))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
```
```py
fictional_characters_view = beam.pvalue.AsDict(
    pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True), ('Obi Wan Kenobi', True)]))

def table_fn(element, fictional_characters):
  if element in fictional_characters:
    return 'my_dataset.fictional_quotes'
  else:
    return 'my_dataset.real_quotes'

quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery(
    table_fn,
    schema=table_schema,
    table_side_inputs=(fictional_characters_view,),
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
```
BigQuery time partitioning divides your table into smaller partitions, which is called a partitioned table. Partitioned tables make it easier for you to manage and query your data.
To use BigQuery time partitioning, use one of these two methods:
withTimePartitioning: This method takes a TimePartitioning class, and is only usable if you are writing to a single table.
withJsonTimePartitioning: This method is the same as withTimePartitioning, but takes a JSON-serialized String object.
This example generates one partition per day.
```java
weatherData.apply(
    BigQueryIO.<WeatherData>write()
        .to(tableSpec + "_partitioning")
        .withSchema(tableSchema)
        .withFormatFunction(
            (WeatherData elem) ->
                new TableRow()
                    .set("year", elem.year)
                    .set("month", elem.month)
                    .set("day", elem.day)
                    .set("maxTemp", elem.maxTemp))
        // NOTE: an existing table without time partitioning set up will not work
        .withTimePartitioning(new TimePartitioning().setType("DAY"))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
```
BigQueryIO currently has the following limitations.
You can't sequence the completion of a BigQuery write with other steps of your pipeline.
If you are using the Beam SDK for Python, you might have import size quota issues if you write a very large dataset. As a workaround, you can partition the dataset (for example, using Beam's Partition transform) and write to multiple BigQuery tables. The Beam SDK for Java does not have this limitation as it partitions your dataset for you.
When you load data into BigQuery, these limits are applied. By default, BigQuery uses a shared pool of slots to load data. This means that the available capacity is not guaranteed, and your load may be queued until a slot becomes available. If a slot does not become available within 6 hours, the load will fail due to the limits set by BigQuery. To avoid this situation, it is highly recommended that you use BigQuery reservations, which ensure that your load does not get queued and fail due to capacity issues.
You can find additional examples that use BigQuery in Beam's examples directories.
These examples are from the Java cookbook examples directory.
BigQueryTornadoes reads the public samples of weather data from BigQuery, counts the number of tornadoes that occur in each month, and writes the results to a BigQuery table.
CombinePerKeyExamples reads the public Shakespeare data from BigQuery, and for each word in the dataset that exceeds a given length, generates a string containing the list of play names in which that word appears. The pipeline then writes the results to a BigQuery table.
FilterExamples reads public samples of weather data from BigQuery, performs a projection on the data, finds the global mean of the temperature readings, filters on readings for a single given month, and outputs only data (for that month) that has a mean temp smaller than the derived global mean.
JoinExamples reads a sample of the GDELT "world event" from BigQuery and joins the event action country code against a table that maps country codes to country names.
MaxPerKeyExamples reads the public samples of weather data from BigQuery, finds the maximum temperature for each month, and writes the results to a BigQuery table.
TriggerExample performs a streaming analysis of traffic data from San Diego freeways. The pipeline looks at the data coming in from a text file and writes the results to a BigQuery table.
These examples are from the Java complete examples directory.
AutoComplete computes the most popular hash tags for every prefix, which can be used for auto-completion. The pipeline can optionally write the results to a BigQuery table.
StreamingWordExtract reads lines of text, splits each line into individual words, capitalizes those words, and writes the output to a BigQuery table.
TrafficMaxLaneFlow reads traffic sensor data, finds the lane that had the highest recorded flow, and writes the results to a BigQuery table.
TrafficRoutes reads traffic sensor data, calculates the average speed for each window and looks for slowdowns in routes, and writes the results to a BigQuery table.
These examples are from the Python cookbook examples directory.
BigQuery schema creates a TableSchema with nested and repeated fields, generates data with nested and repeated fields, and writes the data to a BigQuery table.
BigQuery side inputs uses BigQuery sources as side inputs. It illustrates how to insert side inputs into transforms in three different forms: as a singleton, as an iterator, and as a list.
BigQuery tornadoes reads from a BigQuery table that has the 'month' and 'tornado' fields as part of the table schema, computes the number of tornadoes in each month, and outputs the results to a BigQuery table.
BigQuery filters reads weather station data from a BigQuery table, manipulates BigQuery rows in memory, and writes the results to a BigQuery table.