Stream data using the Storage Write API
This document describes how to use the BigQuery Storage Write API to stream data into BigQuery.
In streaming scenarios, data arrives continuously and should be available for reads with minimal latency. When using the BigQuery Storage Write API for streaming workloads, consider what guarantees you need:
- If your application only needs at-least-once semantics, then use the default stream.
- If you need exactly-once semantics, then create one or more streams in committed type and use stream offsets to guarantee exactly-once writes.
In committed type, data written to the stream is available for query as soon as the server acknowledges the write request. The default stream also uses committed type, but does not provide exactly-once guarantees.
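To make the difference concrete, the following minimal Java sketch (not one of the official samples; the project, dataset, and table names are placeholders) shows how an application targets the default stream directly versus creating a committed-type stream first:

import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;

public class ChooseStreamType {
  public static void main(String[] args) throws Exception {
    // Placeholder identifiers; replace with your own project, dataset, and table.
    TableName table = TableName.of("MY_PROJECT_ID", "MY_DATASET_NAME", "MY_TABLE_NAME");

    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      // At-least-once: write to the default stream by targeting the table directly.
      // No stream creation or finalization step is needed.
      JsonStreamWriter defaultWriter =
          JsonStreamWriter.newBuilder(table.toString(), client).build();

      // Exactly-once: create a stream in committed type, write to that stream, and
      // pass an offset with each append (see the offset sketch later in this document).
      WriteStream committedStream =
          client.createWriteStream(
              CreateWriteStreamRequest.newBuilder()
                  .setParent(table.toString())
                  .setWriteStream(
                      WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
                  .build());
      JsonStreamWriter committedWriter =
          JsonStreamWriter.newBuilder(
                  committedStream.getName(), committedStream.getTableSchema(), client)
              .build();

      defaultWriter.close();
      committedWriter.close();
    }
  }
}

The default stream requires no creation or finalization step, while a committed-type stream is created explicitly and can later be finalized.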
Use the default stream for at-least-once semantics
If your application can accept the possibility of duplicate records appearing in the destination table, then we recommend using the default stream for streaming scenarios.
The following code shows how to write data to the default stream:
Java
To learn how to install and use the client library for BigQuery, seeBigQuery client libraries. For more information, see theBigQueryJava API reference documentation. To authenticate to BigQuery, set up Application Default Credentials. For more information, seeSet up authentication for client libraries.importcom.google.api.core.ApiFuture;importcom.google.api.core.ApiFutureCallback;importcom.google.api.core.ApiFutures;importcom.google.api.gax.batching.FlowControlSettings;importcom.google.api.gax.core.FixedExecutorProvider;importcom.google.api.gax.retrying.RetrySettings;importcom.google.cloud.bigquery.BigQuery;importcom.google.cloud.bigquery.BigQueryOptions;importcom.google.cloud.bigquery.QueryJobConfiguration;importcom.google.cloud.bigquery.TableResult;importcom.google.cloud.bigquery.storage.v1.AppendRowsRequest;importcom.google.cloud.bigquery.storage.v1.AppendRowsResponse;importcom.google.cloud.bigquery.storage.v1.BigQueryWriteClient;importcom.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;importcom.google.cloud.bigquery.storage.v1.Exceptions;importcom.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;importcom.google.cloud.bigquery.storage.v1.Exceptions.MaximumRequestCallbackWaitTimeExceededException;importcom.google.cloud.bigquery.storage.v1.Exceptions.StorageException;importcom.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException;importcom.google.cloud.bigquery.storage.v1.JsonStreamWriter;importcom.google.cloud.bigquery.storage.v1.TableName;importcom.google.common.util.concurrent.MoreExecutors;importcom.google.protobuf.ByteString;importcom.google.protobuf.Descriptors.DescriptorValidationException;importjava.io.IOException;importjava.util.Map;importjava.util.concurrent.Executors;importjava.util.concurrent.Phaser;importjava.util.concurrent.atomic.AtomicInteger;importjavax.annotation.concurrent.GuardedBy;importorg.json.JSONArray;importorg.json.JSONObject;importorg.threeten.bp.Duration;publicclassWriteToDefaultStream{publicstaticvoidrunWriteToDefaultStream()throwsDescriptorValidationException,InterruptedException,IOException{// TODO(developer): Replace these variables before running the sample.StringprojectId="MY_PROJECT_ID";StringdatasetName="MY_DATASET_NAME";StringtableName="MY_TABLE_NAME";writeToDefaultStream(projectId,datasetName,tableName);}privatestaticByteStringbuildByteString(){byte[]bytes=newbyte[]{1,2,3,4,5};returnByteString.copyFrom(bytes);}// Create a JSON object that is compatible with the table schema.privatestaticJSONObjectbuildRecord(inti,intj){JSONObjectrecord=newJSONObject();StringBuildersbSuffix=newStringBuilder();for(intk=0;k <j;k++){sbSuffix.append(k);}record.put("test_string",String.format("record %03d-%03d %s",i,j,sbSuffix.toString()));ByteStringbyteString=buildByteString();record.put("test_bytes",byteString);record.put("test_geo","POLYGON((-124.49 47.35,-124.49 40.73,-116.49 40.73,-116.49 47.35,-124.49 47.35))");returnrecord;}publicstaticvoidwriteToDefaultStream(StringprojectId,StringdatasetName,StringtableName)throwsDescriptorValidationException,InterruptedException,IOException{TableNameparentTable=TableName.of(projectId,datasetName,tableName);DataWriterwriter=newDataWriter();// One time initialization for the worker.writer.initialize(parentTable);// Write two batches of fake data to the stream, each with 10 JSON records. 
Data may be// batched up to the maximum request size:// https://cloud.google.com/bigquery/quotas#write-api-limitsfor(inti=0;i <2;i++){JSONArrayjsonArr=newJSONArray();for(intj=0;j <10;j++){JSONObjectrecord=buildRecord(i,j);jsonArr.put(record);}writer.append(newAppendContext(jsonArr));}// Final cleanup for the stream during worker teardown.writer.cleanup();verifyExpectedRowCount(parentTable,12);System.out.println("Appended records successfully.");}privatestaticvoidverifyExpectedRowCount(TableNameparentTable,intexpectedRowCount)throwsInterruptedException{StringqueryRowCount="SELECT COUNT(*) FROM `"+parentTable.getProject()+"."+parentTable.getDataset()+"."+parentTable.getTable()+"`";QueryJobConfigurationqueryConfig=QueryJobConfiguration.newBuilder(queryRowCount).build();BigQuerybigquery=BigQueryOptions.getDefaultInstance().getService();TableResultresults=bigquery.query(queryConfig);intcountRowsActual=Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue());if(countRowsActual!=expectedRowCount){thrownewRuntimeException("Unexpected row count. Expected: "+expectedRowCount+". Actual: "+countRowsActual);}}privatestaticclassAppendContext{JSONArraydata;AppendContext(JSONArraydata){this.data=data;}}privatestaticclassDataWriter{privatestaticfinalintMAX_RECREATE_COUNT=3;privateBigQueryWriteClientclient;// Track the number of in-flight requests to wait for all responses before shutting down.privatefinalPhaserinflightRequestCount=newPhaser(1);privatefinalObjectlock=newObject();privateJsonStreamWriterstreamWriter;@GuardedBy("lock")privateRuntimeExceptionerror=null;privateAtomicIntegerrecreateCount=newAtomicInteger(0);privateJsonStreamWritercreateStreamWriter(StringtableName)throwsDescriptorValidationException,IOException,InterruptedException{// Configure in-stream automatic retry settings.// Error codes that are immediately retried:// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED// Error codes that are retried with exponential backoff:// * RESOURCE_EXHAUSTEDRetrySettingsretrySettings=RetrySettings.newBuilder().setInitialRetryDelay(Duration.ofMillis(500)).setRetryDelayMultiplier(1.1).setMaxAttempts(5).setMaxRetryDelay(Duration.ofMinutes(1)).build();// Use the JSON stream writer to send records in JSON format. 
Specify the table name to write// to the default stream.// For more information about JsonStreamWriter, see:// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.htmlreturnJsonStreamWriter.newBuilder(tableName,client).setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100))).setChannelProvider(BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)).setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1)).setKeepAliveWithoutCalls(true).setChannelsPerCpu(2).build()).setEnableConnectionPool(true)// This will allow connection pool to scale up better..setFlowControlSettings(FlowControlSettings.newBuilder().setMaxOutstandingElementCount(100L).build())// If value is missing in json and there is a default value configured on bigquery// column, apply the default value to the missing value field..setDefaultMissingValueInterpretation(AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE).setRetrySettings(retrySettings).build();}publicvoidinitialize(TableNameparentTable)throwsDescriptorValidationException,IOException,InterruptedException{// Initialize client without settings, internally within stream writer a new client will be// created with full settings.client=BigQueryWriteClient.create();streamWriter=createStreamWriter(parentTable.toString());}publicvoidappend(AppendContextappendContext)throwsDescriptorValidationException,IOException,InterruptedException{synchronized(this.lock){if(!streamWriter.isUserClosed() &&streamWriter.isClosed() &&recreateCount.getAndIncrement() <MAX_RECREATE_COUNT){streamWriter=createStreamWriter(streamWriter.getStreamName());this.error=null;}// If earlier appends have failed, we need to reset before continuing.if(this.error!=null){throwthis.error;}}// Append asynchronously for increased throughput.ApiFuture<AppendRowsResponse>future=streamWriter.append(appendContext.data);ApiFutures.addCallback(future,newAppendCompleteCallback(this,appendContext),MoreExecutors.directExecutor());// Increase the count of in-flight requests.inflightRequestCount.register();}publicvoidcleanup(){// Wait for all in-flight requests to complete.inflightRequestCount.arriveAndAwaitAdvance();client.close();// Close the connection to the server.streamWriter.close();// Verify that no error occurred in the stream.synchronized(this.lock){if(this.error!=null){throwthis.error;}}}staticclassAppendCompleteCallbackimplementsApiFutureCallback<AppendRowsResponse>{privatefinalDataWriterparent;privatefinalAppendContextappendContext;publicAppendCompleteCallback(DataWriterparent,AppendContextappendContext){this.parent=parent;this.appendContext=appendContext;}publicvoidonSuccess(AppendRowsResponseresponse){System.out.format("Append success\n");this.parent.recreateCount.set(0);done();}publicvoidonFailure(Throwablethrowable){if(throwableinstanceofAppendSerializationError){AppendSerializationErrorase=(AppendSerializationError)throwable;Map<Integer,String>rowIndexToErrorMessage=ase.getRowIndexToErrorMessage();if(rowIndexToErrorMessage.size() >0){// Omit the faulty rowsJSONArraydataNew=newJSONArray();for(inti=0;i <appendContext.data.length();i++){if(!rowIndexToErrorMessage.containsKey(i)){dataNew.put(appendContext.data.get(i));}else{// process faulty rows by placing them on a dead-letter-queue, for instance}}// Retry the remaining valid rows, but using a separate thread to// avoid potentially blocking while we are in a callback.if(dataNew.length() 
>0){try{this.parent.append(newAppendContext(dataNew));}catch(DescriptorValidationExceptione){thrownewRuntimeException(e);}catch(IOExceptione){thrownewRuntimeException(e);}catch(InterruptedExceptione){thrownewRuntimeException(e);}}// Mark the existing attempt as done since we got a response for itdone();return;}}booleanresendRequest=false;if(throwableinstanceofMaximumRequestCallbackWaitTimeExceededException){resendRequest=true;}elseif(throwableinstanceofStreamWriterClosedException){if(!parent.streamWriter.isUserClosed()){resendRequest=true;}}if(resendRequest){// Retry this request.try{this.parent.append(newAppendContext(appendContext.data));}catch(DescriptorValidationExceptione){thrownewRuntimeException(e);}catch(IOExceptione){thrownewRuntimeException(e);}catch(InterruptedExceptione){thrownewRuntimeException(e);}// Mark the existing attempt as done since we got a response for itdone();return;}synchronized(this.parent.lock){if(this.parent.error==null){StorageExceptionstorageException=Exceptions.toStorageException(throwable);this.parent.error=(storageException!=null)?storageException:newRuntimeException(throwable);}}done();}privatevoiddone(){// Reduce the count of in-flight requests.this.parent.inflightRequestCount.arriveAndDeregister();}}}}
Node.js
To learn how to install and use the client library for BigQuery, see BigQuery client libraries. To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;

async function appendJSONRowsDefaultStream() {
  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // projectId = 'my_project';
  // datasetId = 'my_dataset';
  // tableId = 'my_table';

  const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
  const writeClient = new WriterClient({projectId});

  try {
    const writeStream = await writeClient.getWriteStream({
      streamId: `${destinationTable}/streams/_default`,
      view: 'FULL',
    });
    const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
      writeStream.tableSchema,
      'root',
    );

    const connection = await writeClient.createStreamConnection({
      streamId: managedwriter.DefaultStream,
      destinationTable,
    });
    const streamId = connection.getStreamId();

    const writer = new JSONWriter({
      streamId,
      connection,
      protoDescriptor,
    });

    let rows = [];
    const pendingWrites = [];

    // Row 1
    let row = {
      row_num: 1,
      customer_name: 'Octavia',
    };
    rows.push(row);

    // Row 2
    row = {
      row_num: 2,
      customer_name: 'Turing',
    };
    rows.push(row);

    // Send batch.
    let pw = writer.appendRows(rows);
    pendingWrites.push(pw);

    rows = [];

    // Row 3
    row = {
      row_num: 3,
      customer_name: 'Bell',
    };
    rows.push(row);

    // Send batch.
    pw = writer.appendRows(rows);
    pendingWrites.push(pw);

    const results = await Promise.all(
      pendingWrites.map(pw => pw.getResult()),
    );
    console.log('Write results:', results);
  } catch (err) {
    console.log(err);
  } finally {
    writeClient.close();
  }
}
Python
This example shows how to insert a record with two fields using the default stream:
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json

import sample_data_pb2

# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = ["name", "age"]


# Function to create a batch of row data to be serialized.
def create_row_data(data):
    row = sample_data_pb2.SampleData()
    for field in TABLE_COLUMNS_TO_CHECK:
        # Optional fields will be passed as null if not provided
        if field in data:
            setattr(row, field, data[field])
    return row.SerializeToString()


class BigQueryStorageWriteAppend(object):
    # The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
    def append_rows_proto2(project_id: str, dataset_id: str, table_id: str, data: dict):
        write_client = bigquery_storage_v1.BigQueryWriteClient()
        parent = write_client.table_path(project_id, dataset_id, table_id)
        stream_name = f'{parent}/_default'
        write_stream = types.WriteStream()

        # Create a template with fields needed for the first request.
        request_template = types.AppendRowsRequest()

        # The request must contain the stream name.
        request_template.write_stream = stream_name

        # Generating the protocol buffer representation of the message descriptor.
        proto_schema = types.ProtoSchema()
        proto_descriptor = descriptor_pb2.DescriptorProto()
        sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
        proto_schema.proto_descriptor = proto_descriptor
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.writer_schema = proto_schema
        request_template.proto_rows = proto_data

        # Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
        append_rows_stream = writer.AppendRowsStream(write_client, request_template)

        # Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
        proto_rows = types.ProtoRows()
        for row in data:
            proto_rows.serialized_rows.append(create_row_data(row))

        # Appends data to the given stream.
        request = types.AppendRowsRequest()
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.rows = proto_rows
        request.proto_rows = proto_data

        append_rows_stream.send(request)

        print(f"Rows to table: '{parent}' have been written.")


if __name__ == "__main__":
    ###### Uncomment the below block to provide additional logging capabilities ######
    # logging.basicConfig(
    #     level=logging.DEBUG,
    #     format="%(asctime)s [%(levelname)s] %(message)s",
    #     handlers=[
    #         logging.StreamHandler()
    #     ]
    # )
    ###### Uncomment the above block to provide additional logging capabilities ######

    with open('entries.json', 'r') as json_file:
        data = json.load(json_file)

    # Change this to your specific BigQuery project, dataset, table details
    BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID", "DATASET_ID", "TABLE_ID", data=data)

This code example depends on the compiled protocol module sample_data_pb2.py. To create the compiled module, execute the protoc --python_out=. sample_data.proto command, where protoc is the protocol buffer compiler. The sample_data.proto file defines the format of the messages used in the Python example. To install the protoc compiler, follow the instructions in Protocol Buffers - Google's data interchange format.
Here are the contents of the sample_data.proto file:
message SampleData {
  required string name = 1;
  required int64 age = 2;
}

This script consumes the entries.json file, which contains sample row data to be inserted into the BigQuery table:
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}

Use multiplexing
You enable multiplexing at the stream writer level for the default stream only. To enable multiplexing in Java, call the setEnableConnectionPool method when you construct a StreamWriter or JsonStreamWriter object.
After you enable the connection pool, the Java client library manages your connections in the background, scaling up connections if the existing connections are considered too busy. For automatic scaling to be more effective, consider lowering the maxInflightRequests limit.
// One possible way for constructing StreamWriter
StreamWriter.newBuilder(streamName)
    .setWriterSchema(protoSchema)
    .setEnableConnectionPool(true)
    .setMaxInflightRequests(100)
    .build();

// One possible way for constructing JsonStreamWriter
JsonStreamWriter.newBuilder(tableName, bigqueryClient)
    .setEnableConnectionPool(true)
    .setMaxInflightRequests(100)
    .build();
To enable multiplexing in Go, see Connection Sharing (Multiplexing).
Use committed type for exactly-once semantics
If you need exactly-once write semantics, create a write stream in committed type. In committed type, records are available for query as soon as the client receives acknowledgement from the back end.
Committed type provides exactly-once delivery within a stream through the use of record offsets. By using record offsets, the application specifies the next append offset in each call to AppendRows. The write operation is only performed if the offset value matches the next append offset. For more information, see Manage stream offsets to achieve exactly-once semantics.
If you don't provide an offset, then records are appended to the current end of the stream. In that case, if an append request returns an error, retrying it could result in the record appearing more than once in the stream.
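As a minimal illustration of offset tracking (not one of the official samples; it assumes a JsonStreamWriter that was built against a committed-type stream, as in the full Java sample later in this section), each append passes the expected offset and then advances it by the number of rows sent:

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import org.json.JSONArray;
import org.json.JSONObject;

public class OffsetTrackingSketch {
  // `streamWriter` is assumed to be built against a committed-type write stream,
  // as shown in the full Java sample later in this section.
  static void appendWithOffsets(JsonStreamWriter streamWriter) throws Exception {
    long offset = 0;
    for (int batch = 0; batch < 2; batch++) {
      JSONArray rows = new JSONArray();
      for (int j = 0; j < 10; j++) {
        rows.put(new JSONObject().put("col1", String.format("record %03d-%03d", batch, j)));
      }
      // The append is applied only if `offset` equals the stream's next append offset.
      ApiFuture<AppendRowsResponse> future = streamWriter.append(rows, offset);
      // Blocking here keeps the sketch short; production code should attach a callback
      // with ApiFutures.addCallback, as the full sample does.
      AppendRowsResponse response = future.get();
      System.out.println(
          "Appended at offset: " + response.getAppendResult().getOffset().getValue());
      offset += rows.length();
    }
  }
}

Because an append is applied only when the supplied offset matches the stream's next offset, retrying a request that already succeeded does not write duplicate rows.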
To use committed type, perform the following steps:
Java
- Call CreateWriteStream to create one or more streams in committed type.
- For each stream, call AppendRows in a loop to write batches of records.
- Call FinalizeWriteStream for each stream to release the stream. After you call this method, you cannot write any more rows to the stream. This step is optional in committed type, but helps to prevent exceeding the limit on active streams. For more information, see Limit the rate of stream creation.
Node.js
- Call createWriteStreamFullResponse to create one or more streams in committed type.
- For each stream, call appendRows in a loop to write batches of records.
- Call finalize for each stream to release the stream. After you call this method, you cannot write any more rows to the stream. This step is optional in committed type, but helps to prevent exceeding the limit on active streams. For more information, see Limit the rate of stream creation.
You cannot delete a stream explicitly. Streams follow the system-defined time to live (TTL):
- A committed stream has a TTL of three days if there is no traffic on the stream.
- A buffered stream by default has a TTL of seven days if there is no traffic on the stream.
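Although idle streams eventually expire through these TTLs, you can release a committed-type stream as soon as you finish writing to it by finalizing it. The following minimal sketch (not one of the official samples; it assumes an existing BigQueryWriteClient and the resource name returned by CreateWriteStream) shows the call:

import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;

public class FinalizeStreamSketch {
  // `streamName` is the resource name returned by CreateWriteStream.
  static void releaseStream(BigQueryWriteClient client, String streamName) {
    // After finalization, no more rows can be appended to this stream.
    FinalizeWriteStreamResponse response = client.finalizeWriteStream(streamName);
    System.out.println("Rows written to finalized stream: " + response.getRowCount());
  }
}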
The following code shows how to use committed type:
Java
To learn how to install and use the client library for BigQuery, seeBigQuery client libraries. For more information, see theBigQueryJava API reference documentation. To authenticate to BigQuery, set up Application Default Credentials. For more information, seeSet up authentication for client libraries.importcom.google.api.core.ApiFuture;importcom.google.api.core.ApiFutureCallback;importcom.google.api.core.ApiFutures;importcom.google.api.gax.retrying.RetrySettings;importcom.google.cloud.bigquery.storage.v1.AppendRowsResponse;importcom.google.cloud.bigquery.storage.v1.BigQueryWriteClient;importcom.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;importcom.google.cloud.bigquery.storage.v1.Exceptions;importcom.google.cloud.bigquery.storage.v1.Exceptions.StorageException;importcom.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;importcom.google.cloud.bigquery.storage.v1.JsonStreamWriter;importcom.google.cloud.bigquery.storage.v1.TableName;importcom.google.cloud.bigquery.storage.v1.WriteStream;importcom.google.common.util.concurrent.MoreExecutors;importcom.google.protobuf.Descriptors.DescriptorValidationException;importjava.io.IOException;importjava.util.concurrent.ExecutionException;importjava.util.concurrent.Phaser;importjavax.annotation.concurrent.GuardedBy;importorg.json.JSONArray;importorg.json.JSONObject;importorg.threeten.bp.Duration;publicclassWriteCommittedStream{publicstaticvoidrunWriteCommittedStream()throwsDescriptorValidationException,InterruptedException,IOException{// TODO(developer): Replace these variables before running the sample.StringprojectId="MY_PROJECT_ID";StringdatasetName="MY_DATASET_NAME";StringtableName="MY_TABLE_NAME";writeCommittedStream(projectId,datasetName,tableName);}publicstaticvoidwriteCommittedStream(StringprojectId,StringdatasetName,StringtableName)throwsDescriptorValidationException,InterruptedException,IOException{BigQueryWriteClientclient=BigQueryWriteClient.create();TableNameparentTable=TableName.of(projectId,datasetName,tableName);DataWriterwriter=newDataWriter();// One time initialization.writer.initialize(parentTable,client);try{// Write two batches of fake data to the stream, each with 10 JSON records. Data may be// batched up to the maximum request size:// https://cloud.google.com/bigquery/quotas#write-api-limitslongoffset=0;for(inti=0;i <2;i++){// Create a JSON object that is compatible with the table schema.JSONArrayjsonArr=newJSONArray();for(intj=0;j <10;j++){JSONObjectrecord=newJSONObject();record.put("col1",String.format("batch-record %03d-%03d",i,j));jsonArr.put(record);}writer.append(jsonArr,offset);offset+=jsonArr.length();}}catch(ExecutionExceptione){// If the wrapped exception is a StatusRuntimeException, check the state of the operation.// If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:// https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.htmlSystem.out.println("Failed to append records. 
\n"+e);}// Final cleanup for the stream.writer.cleanup(client);System.out.println("Appended records successfully.");}// A simple wrapper object showing how the stateful stream writer should be used.privatestaticclassDataWriter{privateJsonStreamWriterstreamWriter;// Track the number of in-flight requests to wait for all responses before shutting down.privatefinalPhaserinflightRequestCount=newPhaser(1);privatefinalObjectlock=newObject();@GuardedBy("lock")privateRuntimeExceptionerror=null;voidinitialize(TableNameparentTable,BigQueryWriteClientclient)throwsIOException,DescriptorValidationException,InterruptedException{// Initialize a write stream for the specified table.// For more information on WriteStream.Type, see:// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/WriteStream.Type.htmlWriteStreamstream=WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build();CreateWriteStreamRequestcreateWriteStreamRequest=CreateWriteStreamRequest.newBuilder().setParent(parentTable.toString()).setWriteStream(stream).build();WriteStreamwriteStream=client.createWriteStream(createWriteStreamRequest);// Configure in-stream automatic retry settings.// Error codes that are immediately retried:// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED// Error codes that are retried with exponential backoff:// * RESOURCE_EXHAUSTEDRetrySettingsretrySettings=RetrySettings.newBuilder().setInitialRetryDelay(Duration.ofMillis(500)).setRetryDelayMultiplier(1.1).setMaxAttempts(5).setMaxRetryDelay(Duration.ofMinutes(1)).build();// Use the JSON stream writer to send records in JSON format.// For more information about JsonStreamWriter, see:// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.htmlstreamWriter=JsonStreamWriter.newBuilder(writeStream.getName(),writeStream.getTableSchema(),client).setRetrySettings(retrySettings).build();}publicvoidappend(JSONArraydata,longoffset)throwsDescriptorValidationException,IOException,ExecutionException{synchronized(this.lock){// If earlier appends have failed, we need to reset before continuing.if(this.error!=null){throwthis.error;}}// Append asynchronously for increased throughput.ApiFuture<AppendRowsResponse>future=streamWriter.append(data,offset);ApiFutures.addCallback(future,newDataWriter.AppendCompleteCallback(this),MoreExecutors.directExecutor());// Increase the count of in-flight requests.inflightRequestCount.register();}publicvoidcleanup(BigQueryWriteClientclient){// Wait for all in-flight requests to complete.inflightRequestCount.arriveAndAwaitAdvance();// Close the connection to the server.streamWriter.close();// Verify that no error occurred in the stream.synchronized(this.lock){if(this.error!=null){throwthis.error;}}// Finalize the stream.FinalizeWriteStreamResponsefinalizeResponse=client.finalizeWriteStream(streamWriter.getStreamName());System.out.println("Rows written: "+finalizeResponse.getRowCount());}publicStringgetStreamName(){returnstreamWriter.getStreamName();}staticclassAppendCompleteCallbackimplementsApiFutureCallback<AppendRowsResponse>{privatefinalDataWriterparent;publicAppendCompleteCallback(DataWriterparent){this.parent=parent;}publicvoidonSuccess(AppendRowsResponseresponse){System.out.format("Append %d 
success\n",response.getAppendResult().getOffset().getValue());done();}publicvoidonFailure(Throwablethrowable){synchronized(this.parent.lock){if(this.parent.error==null){StorageExceptionstorageException=Exceptions.toStorageException(throwable);this.parent.error=(storageException!=null)?storageException:newRuntimeException(throwable);}}System.out.format("Error: %s\n",throwable.toString());done();}privatevoiddone(){// Reduce the count of in-flight requests.this.parent.inflightRequestCount.arriveAndDeregister();}}}}
Node.js
To learn how to install and use the client library for BigQuery, see BigQuery client libraries. To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;

async function appendJSONRowsCommittedStream() {
  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // projectId = 'my_project';
  // datasetId = 'my_dataset';
  // tableId = 'my_table';

  const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
  const streamType = managedwriter.CommittedStream;
  const writeClient = new WriterClient({projectId});

  try {
    const writeStream = await writeClient.createWriteStreamFullResponse({
      streamType,
      destinationTable,
    });
    const streamId = writeStream.name;
    console.log(`Stream created: ${streamId}`);

    const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
      writeStream.tableSchema,
      'root',
    );

    const connection = await writeClient.createStreamConnection({
      streamId,
    });

    const writer = new JSONWriter({
      streamId,
      connection,
      protoDescriptor,
    });

    let rows = [];
    const pendingWrites = [];

    // Row 1
    let row = {
      row_num: 1,
      customer_name: 'Octavia',
    };
    rows.push(row);

    // Row 2
    row = {
      row_num: 2,
      customer_name: 'Turing',
    };
    rows.push(row);

    // Send batch.
    let pw = writer.appendRows(rows);
    pendingWrites.push(pw);

    rows = [];

    // Row 3
    row = {
      row_num: 3,
      customer_name: 'Bell',
    };
    rows.push(row);

    // Send batch.
    pw = writer.appendRows(rows);
    pendingWrites.push(pw);

    const results = await Promise.all(
      pendingWrites.map(pw => pw.getResult()),
    );
    console.log('Write results:', results);

    const {rowCount} = await connection.finalize();
    console.log(`Row count: ${rowCount}`);
  } catch (err) {
    console.log(err);
  } finally {
    writeClient.close();
  }
}
Use the Apache Arrow format to ingest data
The following code shows how to ingest data using the Apache Arrow format.
Python
This example shows how to ingest a serialized PyArrow table using the default stream. For a more detailed, end-to-end example, see the PyArrow example on GitHub.
import pyarrow

from google.cloud.bigquery_storage_v1 import types as gapic_types
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream
from google.cloud import bigquery_storage_v1


def append_rows_with_pyarrow(
    pyarrow_table: pyarrow.Table,
    project_id: str,
    dataset_id: str,
    table_id: str,
):
    bqstorage_write_client = bigquery_storage_v1.BigQueryWriteClient()

    # Create request_template.
    request_template = gapic_types.AppendRowsRequest()
    request_template.write_stream = (
        f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/_default"
    )
    arrow_data = gapic_types.AppendRowsRequest.ArrowData()
    arrow_data.writer_schema.serialized_schema = (
        pyarrow_table.schema.serialize().to_pybytes()
    )
    request_template.arrow_rows = arrow_data

    # Create AppendRowsStream.
    append_rows_stream = AppendRowsStream(
        bqstorage_write_client,
        request_template,
    )

    # Create request with table data.
    request = gapic_types.AppendRowsRequest()
    request.arrow_rows.rows.serialized_record_batch = (
        pyarrow_table.to_batches()[0].serialize().to_pybytes()
    )

    # Send request.
    future = append_rows_stream.send(request)

    # Wait for result.
    future.result()

Java
To learn how to install and use the client library for BigQuery, seeBigQuery client libraries. For more information, see theBigQueryJava API reference documentation. To authenticate to BigQuery, set up Application Default Credentials. For more information, seeSet up authentication for client libraries.importcom.google.api.core.ApiFuture;importcom.google.api.core.ApiFutureCallback;importcom.google.api.core.ApiFutures;importcom.google.api.gax.core.FixedExecutorProvider;importcom.google.api.gax.retrying.RetrySettings;importcom.google.cloud.bigquery.BigQuery;importcom.google.cloud.bigquery.BigQueryOptions;importcom.google.cloud.bigquery.QueryJobConfiguration;importcom.google.cloud.bigquery.TableResult;importcom.google.cloud.bigquery.storage.v1.AppendRowsRequest;importcom.google.cloud.bigquery.storage.v1.AppendRowsResponse;importcom.google.cloud.bigquery.storage.v1.BigQueryWriteClient;importcom.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;importcom.google.cloud.bigquery.storage.v1.Exceptions;importcom.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;importcom.google.cloud.bigquery.storage.v1.Exceptions.MaximumRequestCallbackWaitTimeExceededException;importcom.google.cloud.bigquery.storage.v1.Exceptions.StorageException;importcom.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException;importcom.google.cloud.bigquery.storage.v1.StreamWriter;importcom.google.cloud.bigquery.storage.v1.TableName;importcom.google.common.collect.ImmutableList;importcom.google.common.util.concurrent.MoreExecutors;importcom.google.protobuf.Descriptors.DescriptorValidationException;importjava.io.IOException;importjava.util.List;importjava.util.Map;importjava.util.concurrent.Executors;importjava.util.concurrent.Phaser;importjava.util.concurrent.atomic.AtomicInteger;importjavax.annotation.concurrent.GuardedBy;importorg.apache.arrow.memory.BufferAllocator;importorg.apache.arrow.memory.RootAllocator;importorg.apache.arrow.vector.BigIntVector;importorg.apache.arrow.vector.VarCharVector;importorg.apache.arrow.vector.VectorSchemaRoot;importorg.apache.arrow.vector.VectorUnloader;importorg.apache.arrow.vector.compression.CompressionCodec;importorg.apache.arrow.vector.compression.CompressionUtil;importorg.apache.arrow.vector.compression.NoCompressionCodec;importorg.apache.arrow.vector.ipc.message.ArrowRecordBatch;importorg.apache.arrow.vector.types.pojo.ArrowType;importorg.apache.arrow.vector.types.pojo.Field;importorg.apache.arrow.vector.types.pojo.FieldType;importorg.apache.arrow.vector.types.pojo.Schema;importorg.threeten.bp.Duration;/** * This class demonstrates how to ingest data using Arrow format into BigQuery via the default * stream. It initiates a DataWriter to establish a connection to BigQuery and reuses this * connection to continuously ingest data. 
*/publicclassWriteToDefaultStreamWithArrow{publicstaticvoidmain(String[]args)throwsDescriptorValidationException,InterruptedException,IOException{if(args.length <3){System.out.println("Usage: WriteToDefaultStreamWithArrow <projectId> <datasetName> <tableName>");return;}StringprojectId=args[0];StringdatasetName=args[1];StringtableName=args[2];// Table schema should contain 3 fields:// ['test_string': STRING, 'test_int': INTEGER, 'test_geo':GEOGRAPHY]writeToDefaultStreamWithArrow(projectId,datasetName,tableName);}privatestaticSchemacreateArrowSchema(){List<Field>fields=ImmutableList.of(newField("test_string",FieldType.nullable(newArrowType.Utf8()),null),newField("test_int",FieldType.nullable(newArrowType.Int(64,true)),null),newField("test_geo",FieldType.nullable(newArrowType.Utf8()),null));returnnewSchema(fields,null);}// Create an ArrowRecordBatch object that is compatible with the table schema.privatestaticArrowRecordBatchbuildRecordBatch(VectorSchemaRootroot,introwCount){VarCharVectortestString=(VarCharVector)root.getVector("test_string");BigIntVectortestInt=(BigIntVector)root.getVector("test_int");VarCharVectortestGeo=(VarCharVector)root.getVector("test_geo");testString.allocateNew(rowCount);testInt.allocateNew(rowCount);testGeo.allocateNew(rowCount);for(inti=0;i <rowCount;i++){testString.set(i,("A"+i).getBytes());testInt.set(i,i+100);testGeo.set(i,"POLYGON((-124.49 47.35,-124.49 40.73,-116.49 40.73,-113.49 47.35,-124.49 47.35))".getBytes());}root.setRowCount(rowCount);CompressionCodeccodec=NoCompressionCodec.Factory.INSTANCE.createCodec(CompressionUtil.CodecType.NO_COMPRESSION);VectorUnloadervectorUnloader=newVectorUnloader(root,/* includeNullCount= */true,codec,/* alignBuffers= */true);returnvectorUnloader.getRecordBatch();}publicstaticvoidwriteToDefaultStreamWithArrow(StringprojectId,StringdatasetName,StringtableName)throwsDescriptorValidationException,InterruptedException,IOException{TableNameparentTable=TableName.of(projectId,datasetName,tableName);SchemaarrowSchema=createArrowSchema();DataWriterwriter=newDataWriter();// One time initialization for the worker.writer.initialize(parentTable,arrowSchema);longinitialRowCount=getRowCount(parentTable);BufferAllocatorallocator=newRootAllocator();// A writer should be used to ingest as much data as possible before teardown.// Append 100 batches.for(inti=0;i <100;i++){try(VectorSchemaRootroot=VectorSchemaRoot.create(arrowSchema,allocator)){// Each batch has 10 rows.ArrowRecordBatchbatch=buildRecordBatch(root,10);// Asynchronous append.writer.append(newArrowData(arrowSchema,batch));}}// Final cleanup for the stream during worker teardown.// It's blocked until all append requests' response are received.writer.cleanup();verifyExpectedRowCount(parentTable,initialRowCount+1000);System.out.println("Appended records successfully.");}privatestaticlonggetRowCount(TableNameparentTable)throwsInterruptedException{StringqueryRowCount="SELECT COUNT(*) FROM `"+parentTable.getProject()+"."+parentTable.getDataset()+"."+parentTable.getTable()+"`";QueryJobConfigurationqueryConfig=QueryJobConfiguration.newBuilder(queryRowCount).build();BigQuerybigquery=BigQueryOptions.newBuilder().setProjectId(parentTable.getProject()).build().getService();TableResultresults=bigquery.query(queryConfig);returnLong.parseLong(results.getValues().iterator().next().get("f0_").getStringValue());}privatestaticvoidverifyExpectedRowCount(TableNameparentTable,longexpectedRowCount)throwsInterruptedException{StringqueryRowCount="SELECT COUNT(*) FROM 
`"+parentTable.getProject()+"."+parentTable.getDataset()+"."+parentTable.getTable()+"`";QueryJobConfigurationqueryConfig=QueryJobConfiguration.newBuilder(queryRowCount).build();BigQuerybigquery=BigQueryOptions.newBuilder().setProjectId(parentTable.getProject()).build().getService();TableResultresults=bigquery.query(queryConfig);intcountRowsActual=Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue());if(countRowsActual!=expectedRowCount){thrownewRuntimeException("Unexpected row count. Expected: "+expectedRowCount+". Actual: "+countRowsActual);}}privatestaticclassArrowData{SchemaarrowSchema;ArrowRecordBatchdata;ArrowData(SchemaarrowSchema,ArrowRecordBatchdata){this.arrowSchema=arrowSchema;this.data=data;}}privatestaticclassDataWriter{privatestaticfinalintMAX_RECREATE_COUNT=3;privateBigQueryWriteClientclient;// Track the number of in-flight requests to wait for all responses before shutting down.privatefinalPhaserinflightRequestCount=newPhaser(1);privatefinalObjectlock=newObject();privateSchemaarrowSchema;privateStreamWriterstreamWriter;@GuardedBy("lock")privateRuntimeExceptionerror=null;privatefinalAtomicIntegerrecreateCount=newAtomicInteger(0);privateStreamWritercreateStreamWriter(StringstreamName,SchemaarrowSchema)throwsDescriptorValidationException,IOException,InterruptedException{// Configure in-stream automatic retry settings.// Error codes that are immediately retried:// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED// Error codes that are retried with exponential backoff:// * RESOURCE_EXHAUSTEDRetrySettingsretrySettings=RetrySettings.newBuilder().setInitialRetryDelay(Duration.ofMillis(500)).setRetryDelayMultiplier(1.1).setMaxAttempts(5).setMaxRetryDelay(Duration.ofMinutes(1)).build();// Use the Stream writer to send records in Arrow format. 
Specify the table name to write// to the default stream.// For more information about StreamWriter, see:// https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1.StreamWriterreturnStreamWriter.newBuilder(streamName,client).setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100))).setChannelProvider(BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)).setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1)).setKeepAliveWithoutCalls(true).setChannelsPerCpu(2).build()).setEnableConnectionPool(true)// If value is missing in ArrowRecordBatch and there is a default value configured on// bigquery column, apply the default value to the missing value field..setDefaultMissingValueInterpretation(AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE).setMaxRetryDuration(java.time.Duration.ofSeconds(5))// Set the StreamWriter with Arrow Schema, this would only allow the StreamWriter to// append data in Arrow format..setWriterSchema(arrowSchema).setRetrySettings(retrySettings).build();}publicvoidinitialize(TableNameparentTable,SchemaarrowSchema)throwsDescriptorValidationException,IOException,InterruptedException{// Initialize client without settings, internally within stream writer a new client will be// created with full settings.client=BigQueryWriteClient.create();streamWriter=createStreamWriter(parentTable.toString()+"/_default",arrowSchema);}publicvoidappend(ArrowDataarrowData)throwsDescriptorValidationException,IOException,InterruptedException{synchronized(this.lock){if(!streamWriter.isUserClosed() &&streamWriter.isClosed() &&recreateCount.getAndIncrement() <MAX_RECREATE_COUNT){streamWriter=createStreamWriter(streamWriter.getStreamName(),arrowData.arrowSchema);this.error=null;}// If earlier appends have failed, we need to reset before continuing.if(this.error!=null){throwthis.error;}}// Append asynchronously for increased throughput.ApiFuture<AppendRowsResponse>future=streamWriter.append(arrowData.data);ApiFutures.addCallback(future,newAppendCompleteCallback(this,arrowData),MoreExecutors.directExecutor());// Increase the count of in-flight requests.inflightRequestCount.register();}publicvoidcleanup(){// Wait for all in-flight requests to complete.inflightRequestCount.arriveAndAwaitAdvance();client.close();// Close the connection to the server.streamWriter.close();// Verify that no error occurred in the stream.synchronized(this.lock){if(this.error!=null){throwthis.error;}}}staticclassAppendCompleteCallbackimplementsApiFutureCallback<AppendRowsResponse>{privatefinalDataWriterparent;privatefinalArrowDataarrowData;publicAppendCompleteCallback(DataWriterparent,ArrowDataarrowData){this.parent=parent;this.arrowData=arrowData;}publicvoidonSuccess(AppendRowsResponseresponse){System.out.format("Append success\n");this.parent.recreateCount.set(0);done();}publicvoidonFailure(Throwablethrowable){System.out.format("Append failed: "+throwable.toString());if(throwableinstanceofAppendSerializationError){AppendSerializationErrorase=(AppendSerializationError)throwable;Map<Integer,String>rowIndexToErrorMessage=ase.getRowIndexToErrorMessage();if(rowIndexToErrorMessage.size() >0){System.out.format("row level errors: "+rowIndexToErrorMessage.toString());// The append returned failure with indices for faulty rows.// Fix the faulty rows or remove them from the appended data and retry the 
append.done();return;}}booleanresendRequest=false;if(throwableinstanceofMaximumRequestCallbackWaitTimeExceededException){resendRequest=true;}elseif(throwableinstanceofStreamWriterClosedException){if(!parent.streamWriter.isUserClosed()){resendRequest=true;}}if(resendRequest){// Retry this request.try{this.parent.append(newArrowData(arrowData.arrowSchema,arrowData.data));}catch(DescriptorValidationExceptione){thrownewRuntimeException(e);}catch(IOExceptione){thrownewRuntimeException(e);}catch(InterruptedExceptione){thrownewRuntimeException(e);}// Mark the existing attempt as done since we got a response for itdone();return;}synchronized(this.parent.lock){if(this.parent.error==null){StorageExceptionstorageException=Exceptions.toStorageException(throwable);this.parent.error=(storageException!=null)?storageException:newRuntimeException(throwable);}}done();}privatevoiddone(){// Reduce the count of in-flight requests.this.parent.inflightRequestCount.arriveAndDeregister();}}}}