Append pending records
Use the JSON stream writer to append records to a write stream in pending mode. Records written in pending mode are buffered until you commit the stream, at which point they become available for reading.
Explore further
For detailed documentation that includes this code sample, see Batch load data using the Storage Write API.
Code sample
Java
Before trying this sample, follow the Java setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Java API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
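Before running the sample, you can optionally verify that Application Default Credentials are discoverable in your environment. The following is a minimal sketch, not part of the official sample: it assumes the google-auth-library dependency (pulled in transitively by the BigQuery client libraries), and the class name CheckAdc is illustrative.

import com.google.auth.oauth2.GoogleCredentials;
import java.io.IOException;

public class CheckAdc {
  public static void main(String[] args) throws IOException {
    // getApplicationDefault() throws IOException if no Application Default
    // Credentials are found, for example if you have not yet run
    // `gcloud auth application-default login` or set GOOGLE_APPLICATION_CREDENTIALS.
    GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
    System.out.println("Application Default Credentials found: " + credentials);
  }
}

The complete sample follows.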
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class WritePendingStream {

  public static void runWritePendingStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";

    writePendingStream(projectId, datasetName, tableName);
  }

  public static void writePendingStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {
    BigQueryWriteClient client = BigQueryWriteClient.create();
    TableName parentTable = TableName.of(projectId, datasetName, tableName);

    DataWriter writer = new DataWriter();
    // One time initialization.
    writer.initialize(parentTable, client);

    try {
      // Write two batches of fake data to the stream, each with 10 JSON records. Data may be
      // batched up to the maximum request size:
      // https://cloud.google.com/bigquery/quotas#write-api-limits
      long offset = 0;
      for (int i = 0; i < 2; i++) {
        // Create a JSON object that is compatible with the table schema.
        JSONArray jsonArr = new JSONArray();
        for (int j = 0; j < 10; j++) {
          JSONObject record = new JSONObject();
          record.put("col1", String.format("batch-record %03d-%03d", i, j));
          jsonArr.put(record);
        }
        writer.append(jsonArr, offset);
        offset += jsonArr.length();
      }
    } catch (ExecutionException e) {
      // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
      // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
      // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
      System.out.println("Failed to append records. \n" + e);
    }

    // Final cleanup for the stream.
    writer.cleanup(client);
    System.out.println("Appended records successfully.");

    // Once all streams are done, if all writes were successful, commit all of them in one request.
    // This example only has the one stream. If any streams failed, their workload may be
    // retried on a new stream, and then only the successful stream should be included in the
    // commit.
    BatchCommitWriteStreamsRequest commitRequest =
        BatchCommitWriteStreamsRequest.newBuilder()
            .setParent(parentTable.toString())
            .addWriteStreams(writer.getStreamName())
            .build();
    BatchCommitWriteStreamsResponse commitResponse = client.batchCommitWriteStreams(commitRequest);
    // If the response does not have a commit time, it means the commit operation failed.
    if (commitResponse.hasCommitTime() == false) {
      for (StorageError err : commitResponse.getStreamErrorsList()) {
        System.out.println(err.getErrorMessage());
      }
      throw new RuntimeException("Error committing the streams");
    }
    System.out.println("Appended and committed records successfully.");
  }

  // A simple wrapper object showing how the stateful stream writer should be used.
  private static class DataWriter {

    private JsonStreamWriter streamWriter;
    // Track the number of in-flight requests to wait for all responses before shutting down.
    private final Phaser inflightRequestCount = new Phaser(1);

    private final Object lock = new Object();

    @GuardedBy("lock")
    private RuntimeException error = null;

    void initialize(TableName parentTable, BigQueryWriteClient client)
        throws IOException, DescriptorValidationException, InterruptedException {
      // Initialize a write stream for the specified table.
      // For more information on WriteStream.Type, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/WriteStream.Type.html
      WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build();

      // Configure in-stream automatic retry settings.
      // Error codes that are immediately retried:
      // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
      // Error codes that are retried with exponential backoff:
      // * RESOURCE_EXHAUSTED
      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(Duration.ofMillis(500))
              .setRetryDelayMultiplier(1.1)
              .setMaxAttempts(5)
              .setMaxRetryDelay(Duration.ofMinutes(1))
              .build();

      CreateWriteStreamRequest createWriteStreamRequest =
          CreateWriteStreamRequest.newBuilder()
              .setParent(parentTable.toString())
              .setWriteStream(stream)
              .build();
      WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

      // Use the JSON stream writer to send records in JSON format.
      // For more information about JsonStreamWriter, see:
      // https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1.JsonStreamWriter
      streamWriter =
          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
              .setRetrySettings(retrySettings)
              .build();
    }

    public void append(JSONArray data, long offset)
        throws DescriptorValidationException, IOException, ExecutionException {
      synchronized (this.lock) {
        // If earlier appends have failed, we need to reset before continuing.
        if (this.error != null) {
          throw this.error;
        }
      }
      // Append asynchronously for increased throughput.
      ApiFuture<AppendRowsResponse> future = streamWriter.append(data, offset);
      ApiFutures.addCallback(
          future, new AppendCompleteCallback(this), MoreExecutors.directExecutor());
      // Increase the count of in-flight requests.
      inflightRequestCount.register();
    }

    public void cleanup(BigQueryWriteClient client) {
      // Wait for all in-flight requests to complete.
      inflightRequestCount.arriveAndAwaitAdvance();

      // Close the connection to the server.
      streamWriter.close();

      // Verify that no error occurred in the stream.
      synchronized (this.lock) {
        if (this.error != null) {
          throw this.error;
        }
      }

      // Finalize the stream.
      FinalizeWriteStreamResponse finalizeResponse =
          client.finalizeWriteStream(streamWriter.getStreamName());
      System.out.println("Rows written: " + finalizeResponse.getRowCount());
    }

    public String getStreamName() {
      return streamWriter.getStreamName();
    }

    static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {

      private final DataWriter parent;

      public AppendCompleteCallback(DataWriter parent) {
        this.parent = parent;
      }

      public void onSuccess(AppendRowsResponse response) {
        System.out.format("Append %d success\n", response.getAppendResult().getOffset().getValue());
        done();
      }

      public void onFailure(Throwable throwable) {
        synchronized (this.parent.lock) {
          if (this.parent.error == null) {
            StorageException storageException = Exceptions.toStorageException(throwable);
            this.parent.error =
                (storageException != null) ? storageException : new RuntimeException(throwable);
          }
        }
        System.out.format("Error: %s\n", throwable.toString());
        done();
      }

      private void done() {
        // Reduce the count of in-flight requests.
        this.parent.inflightRequestCount.arriveAndDeregister();
      }
    }
  }
}
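Because rows written to a pending stream only become readable after the batch commit succeeds, you can sanity-check the result by counting the rows in the destination table after the sample finishes. The following sketch is illustrative and not part of the official sample: it assumes the google-cloud-bigquery query client is also on the classpath, reuses the sample's placeholder identifiers, and the class name VerifyPendingCommit is hypothetical.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;

public class VerifyPendingCommit {
  public static void main(String[] args) throws Exception {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    // Count the committed rows. Before a successful batchCommitWriteStreams
    // call, a pending stream's rows do not appear in query results.
    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                "SELECT COUNT(*) AS row_count FROM `MY_PROJECT_ID.MY_DATASET_NAME.MY_TABLE_NAME`")
            .build();
    TableResult result = bigquery.query(queryConfig);
    for (FieldValueList row : result.getValues()) {
      System.out.println("Committed rows: " + row.get("row_count").getLongValue());
    }
  }
}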
Node.js

Before trying this sample, follow the Node.js setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Node.js API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;

async function appendRowsPendingStream() {
  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // projectId = 'my_project';
  // datasetId = 'my_dataset';
  // tableId = 'my_table';

  const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
  const streamType = managedwriter.PendingStream;
  const writeClient = new WriterClient({projectId: projectId});
  try {
    const writeStream = await writeClient.createWriteStreamFullResponse({
      streamType,
      destinationTable,
    });
    const streamId = writeStream.name;
    console.log(`Stream created: ${streamId}`);

    const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
      writeStream.tableSchema,
      'root',
    );

    const connection = await writeClient.createStreamConnection({
      streamId,
    });

    const writer = new JSONWriter({
      connection,
      protoDescriptor,
    });

    let rows = [];
    const pendingWrites = [];

    // Row 1
    let row = {
      row_num: 1,
      bool_col: true,
      bytes_col: Buffer.from('hello world'),
      float64_col: parseFloat('+123.44999694824219'),
      int64_col: 123,
      string_col: 'omg',
    };
    rows.push(row);

    // Row 2
    row = {
      row_num: 2,
      bool_col: false,
    };
    rows.push(row);

    // Row 3
    row = {
      row_num: 3,
      bytes_col: Buffer.from('later, gator'),
    };
    rows.push(row);

    // Row 4
    row = {
      row_num: 4,
      float64_col: 987.6539916992188,
    };
    rows.push(row);

    // Row 5
    row = {
      row_num: 5,
      int64_col: 321,
    };
    rows.push(row);

    // Row 6
    row = {
      row_num: 6,
      string_col: 'octavia',
    };
    rows.push(row);

    // Set an offset to allow resuming this stream if the connection breaks.
    // Keep track of which requests the server has acknowledged and resume the
    // stream at the first non-acknowledged message. If the server has already
    // processed a message with that offset, it will return an ALREADY_EXISTS
    // error, which can be safely ignored.
    // The first request must always have an offset of 0.
    let offsetValue = 0;

    // Send batch.
    let pw = writer.appendRows(rows, offsetValue);
    pendingWrites.push(pw);

    // Reset rows.
    rows = [];

    // Row 7
    row = {
      row_num: 7,
      date_col: new Date('2019-02-07'),
    };
    rows.push(row);

    // Row 8
    row = {
      row_num: 8,
      datetime_col: new Date('2019-02-17T11:24:00.000Z'),
    };
    rows.push(row);

    // Row 9
    row = {
      row_num: 9,
      geography_col: 'POINT(5 5)',
    };
    rows.push(row);

    // Row 10
    row = {
      row_num: 10,
      numeric_col: 123456,
      bignumeric_col: '99999999999999999999999999999.999999999',
    };
    rows.push(row);

    // Row 11
    row = {
      row_num: 11,
      time_col: '18:00:00',
    };
    rows.push(row);

    // Row 12
    row = {
      row_num: 12,
      timestamp_col: new Date('2022-01-09T03:49:46.564Z'),
    };
    rows.push(row);

    // Offset must equal the number of rows that were previously sent.
    offsetValue = 6;

    // Send batch.
    pw = writer.appendRows(rows, offsetValue);
    pendingWrites.push(pw);

    rows = [];

    // Row 13
    row = {
      row_num: 13,
      int64_list: [1999, 2001],
    };
    rows.push(row);

    // Row 14
    row = {
      row_num: 14,
      struct_col: {
        sub_int_col: 99,
      },
    };
    rows.push(row);

    // Row 15
    row = {
      row_num: 15,
      struct_list: [{sub_int_col: 100}, {sub_int_col: 101}],
    };
    rows.push(row);

    // Row 16
    row = {
      row_num: 16,
      range_col: {
        start: new Date('2022-01-09T03:49:46.564Z'),
        end: new Date('2022-01-09T04:49:46.564Z'),
      },
    };
    rows.push(row);

    offsetValue = 12;

    // Send batch.
    pw = writer.appendRows(rows, offsetValue);
    pendingWrites.push(pw);

    const results = await Promise.all(
      pendingWrites.map(pw => pw.getResult()),
    );
    console.log('Write results:', results);

    const {rowCount} = await connection.finalize();
    console.log(`Row count: ${rowCount}`);

    const response = await writeClient.batchCommitWriteStream({
      parent: destinationTable,
      writeStreams: [streamId],
    });

    console.log(response);
  } catch (err) {
    console.log(err.message, err);
  } finally {
    writeClient.close();
  }
}

What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.