Append records using default client
Use the JSON stream writer to append records using the default client.
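At its core, the pattern is: create a JsonStreamWriter for a table (passing a table name rather than an explicit stream name targets the table's default stream), then append batches of JSON records. The condensed, unofficial sketch below (hypothetical class name, placeholder IDs) assumes the destination table has a STRING column named test_string; the full sample further down adds retries, flow control, and error handling.

import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import org.json.JSONArray;
import org.json.JSONObject;

public class MinimalDefaultStreamWrite {
  public static void main(String[] args) throws Exception {
    TableName table = TableName.of("MY_PROJECT_ID", "MY_DATASET_NAME", "MY_TABLE_NAME");
    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      // Building the writer from a table name (not a stream name) writes to the
      // table's default stream; the writer fetches the table schema via the client.
      JsonStreamWriter writer = JsonStreamWriter.newBuilder(table.toString(), client).build();
      JSONArray rows = new JSONArray().put(new JSONObject().put("test_string", "hello"));
      // append() is asynchronous; blocking on get() here is only for brevity.
      AppendRowsResponse response = writer.append(rows).get();
      System.out.println("Appended " + rows.length() + " row(s): " + response);
      writer.close();
    }
  }
}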
Explore further
For detailed documentation that includes this code sample, see the following:
Code sample
Java
Before trying this sample, follow the Java setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Java API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
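The Java sample below generates records with test_string, test_bytes, and test_geo fields, so the destination table needs a matching schema. Here is a minimal, unofficial sketch (hypothetical class name, placeholder dataset and table IDs) that creates such a table with the google-cloud-bigquery client library:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;

public class CreateSampleTable {
  public static void main(String[] args) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    // Columns matching the records that the sample's buildRecord() produces.
    Schema schema =
        Schema.of(
            Field.of("test_string", StandardSQLTypeName.STRING),
            Field.of("test_bytes", StandardSQLTypeName.BYTES),
            Field.of("test_geo", StandardSQLTypeName.GEOGRAPHY));
    TableId tableId = TableId.of("MY_DATASET_NAME", "MY_TABLE_NAME");
    bigquery.create(TableInfo.of(tableId, StandardTableDefinition.of(schema)));
    System.out.println("Created table " + tableId);
  }
}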
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.Exceptions.MaximumRequestCallbackWaitTimeExceededException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class WriteToDefaultStream {

  public static void runWriteToDefaultStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    writeToDefaultStream(projectId, datasetName, tableName);
  }

  private static ByteString buildByteString() {
    byte[] bytes = new byte[] {1, 2, 3, 4, 5};
    return ByteString.copyFrom(bytes);
  }

  // Create a JSON object that is compatible with the table schema.
  private static JSONObject buildRecord(int i, int j) {
    JSONObject record = new JSONObject();
    StringBuilder sbSuffix = new StringBuilder();
    for (int k = 0; k < j; k++) {
      sbSuffix.append(k);
    }
    record.put("test_string", String.format("record %03d-%03d %s", i, j, sbSuffix.toString()));
    ByteString byteString = buildByteString();
    record.put("test_bytes", byteString);
    record.put(
        "test_geo",
        "POLYGON((-124.49 47.35,-124.49 40.73,-116.49 40.73,-116.49 47.35,-124.49 47.35))");
    return record;
  }

  public static void writeToDefaultStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {
    TableName parentTable = TableName.of(projectId, datasetName, tableName);

    DataWriter writer = new DataWriter();
    // One time initialization for the worker.
    writer.initialize(parentTable);

    // Write two batches of fake data to the stream, each with 10 JSON records. Data may be
    // batched up to the maximum request size:
    // https://cloud.google.com/bigquery/quotas#write-api-limits
    for (int i = 0; i < 2; i++) {
      JSONArray jsonArr = new JSONArray();
      for (int j = 0; j < 10; j++) {
        JSONObject record = buildRecord(i, j);
        jsonArr.put(record);
      }

      writer.append(new AppendContext(jsonArr));
    }

    // Final cleanup for the stream during worker teardown.
    writer.cleanup();
    verifyExpectedRowCount(parentTable, 20);
    System.out.println("Appended records successfully.");
  }

  private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount)
      throws InterruptedException {
    String queryRowCount =
        "SELECT COUNT(*) FROM `"
            + parentTable.getProject()
            + "."
            + parentTable.getDataset()
            + "."
            + parentTable.getTable()
            + "`";
    QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build();
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    TableResult results = bigquery.query(queryConfig);
    int countRowsActual =
        Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue());
    if (countRowsActual != expectedRowCount) {
      throw new RuntimeException(
          "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual);
    }
  }

  private static class AppendContext {

    JSONArray data;

    AppendContext(JSONArray data) {
      this.data = data;
    }
  }

  private static class DataWriter {

    private static final int MAX_RECREATE_COUNT = 3;

    private BigQueryWriteClient client;

    // Track the number of in-flight requests to wait for all responses before shutting down.
    private final Phaser inflightRequestCount = new Phaser(1);
    private final Object lock = new Object();
    private JsonStreamWriter streamWriter;

    @GuardedBy("lock")
    private RuntimeException error = null;

    private AtomicInteger recreateCount = new AtomicInteger(0);

    private JsonStreamWriter createStreamWriter(String tableName)
        throws DescriptorValidationException, IOException, InterruptedException {
      // Configure in-stream automatic retry settings.
      // Error codes that are immediately retried:
      // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
      // Error codes that are retried with exponential backoff:
      // * RESOURCE_EXHAUSTED
      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(Duration.ofMillis(500))
              .setRetryDelayMultiplier(1.1)
              .setMaxAttempts(5)
              .setMaxRetryDelay(Duration.ofMinutes(1))
              .build();
      // Use the JSON stream writer to send records in JSON format. Specify the table name to write
      // to the default stream.
      // For more information about JsonStreamWriter, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
      return JsonStreamWriter.newBuilder(tableName, client)
          .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100)))
          .setChannelProvider(
              BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
                  .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1))
                  .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1))
                  .setKeepAliveWithoutCalls(true)
                  .setChannelsPerCpu(2)
                  .build())
          .setEnableConnectionPool(true)
          // This will allow the connection pool to scale up better.
          .setFlowControlSettings(
              FlowControlSettings.newBuilder().setMaxOutstandingElementCount(100L).build())
          // If a value is missing in the JSON and there is a default value configured on the
          // BigQuery column, apply the default value to the missing value field.
          .setDefaultMissingValueInterpretation(
              AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE)
          .setRetrySettings(retrySettings)
          .build();
    }

    public void initialize(TableName parentTable)
        throws DescriptorValidationException, IOException, InterruptedException {
      // Initialize the client without settings; internally, the stream writer creates a new
      // client with full settings.
      client = BigQueryWriteClient.create();

      streamWriter = createStreamWriter(parentTable.toString());
    }

    public void append(AppendContext appendContext)
        throws DescriptorValidationException, IOException, InterruptedException {
      synchronized (this.lock) {
        if (!streamWriter.isUserClosed()
            && streamWriter.isClosed()
            && recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) {
          streamWriter = createStreamWriter(streamWriter.getStreamName());
          this.error = null;
        }
        // If earlier appends have failed, we need to reset before continuing.
        if (this.error != null) {
          throw this.error;
        }
      }
      // Append asynchronously for increased throughput.
      ApiFuture<AppendRowsResponse> future = streamWriter.append(appendContext.data);
      ApiFutures.addCallback(
          future, new AppendCompleteCallback(this, appendContext), MoreExecutors.directExecutor());

      // Increase the count of in-flight requests.
      inflightRequestCount.register();
    }

    public void cleanup() {
      // Wait for all in-flight requests to complete.
      inflightRequestCount.arriveAndAwaitAdvance();

      client.close();
      // Close the connection to the server.
      streamWriter.close();

      // Verify that no error occurred in the stream.
      synchronized (this.lock) {
        if (this.error != null) {
          throw this.error;
        }
      }
    }

    static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {

      private final DataWriter parent;
      private final AppendContext appendContext;

      public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
        this.parent = parent;
        this.appendContext = appendContext;
      }

      public void onSuccess(AppendRowsResponse response) {
        System.out.format("Append success\n");
        this.parent.recreateCount.set(0);
        done();
      }

      public void onFailure(Throwable throwable) {
        if (throwable instanceof AppendSerializationError) {
          AppendSerializationError ase = (AppendSerializationError) throwable;
          Map<Integer, String> rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
          if (rowIndexToErrorMessage.size() > 0) {
            // Omit the faulty rows.
            JSONArray dataNew = new JSONArray();
            for (int i = 0; i < appendContext.data.length(); i++) {
              if (!rowIndexToErrorMessage.containsKey(i)) {
                dataNew.put(appendContext.data.get(i));
              } else {
                // Process faulty rows by placing them on a dead-letter queue, for instance.
              }
            }

            // Retry the remaining valid rows, but using a separate thread to
            // avoid potentially blocking while we are in a callback.
            if (dataNew.length() > 0) {
              try {
                this.parent.append(new AppendContext(dataNew));
              } catch (DescriptorValidationException e) {
                throw new RuntimeException(e);
              } catch (IOException e) {
                throw new RuntimeException(e);
              } catch (InterruptedException e) {
                throw new RuntimeException(e);
              }
            }
            // Mark the existing attempt as done since we got a response for it.
            done();
            return;
          }
        }

        boolean resendRequest = false;
        if (throwable instanceof MaximumRequestCallbackWaitTimeExceededException) {
          resendRequest = true;
        } else if (throwable instanceof StreamWriterClosedException) {
          if (!parent.streamWriter.isUserClosed()) {
            resendRequest = true;
          }
        }
        if (resendRequest) {
          // Retry this request.
          try {
            this.parent.append(new AppendContext(appendContext.data));
          } catch (DescriptorValidationException e) {
            throw new RuntimeException(e);
          } catch (IOException e) {
            throw new RuntimeException(e);
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
          // Mark the existing attempt as done since we got a response for it.
          done();
          return;
        }

        synchronized (this.parent.lock) {
          if (this.parent.error == null) {
            StorageException storageException = Exceptions.toStorageException(throwable);
            this.parent.error =
                (storageException != null) ? storageException : new RuntimeException(throwable);
          }
        }
        done();
      }

      private void done() {
        // Reduce the count of in-flight requests.
        this.parent.inflightRequestCount.arriveAndDeregister();
      }
    }
  }
}
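One detail of the sample worth isolating: DataWriter tracks in-flight appends with a java.util.concurrent.Phaser rather than a CountDownLatch, because the total number of requests (including retries issued from callbacks) isn't known up front. The standalone sketch below, with a hypothetical class name and simulated latency standing in for server responses, shows just that coordination pattern:

import java.util.concurrent.Phaser;

public class PhaserPatternSketch {
  public static void main(String[] args) {
    Phaser inflight = new Phaser(1); // one registered party for the coordinating thread
    for (int i = 0; i < 5; i++) {
      inflight.register(); // one party per simulated in-flight append
      new Thread(() -> {
        try {
          Thread.sleep(100); // stand-in for the server's response latency
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          inflight.arriveAndDeregister(); // what the callback's done() does
        }
      }).start();
    }
    inflight.arriveAndAwaitAdvance(); // cleanup(): block until all five complete
    System.out.println("All in-flight requests completed.");
  }
}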
Node.js
Before trying this sample, follow the Node.js setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Node.js API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, JSONWriter} = managedwriter;

async function appendJSONRowsDefaultStream() {
  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // projectId = 'my_project';
  // datasetId = 'my_dataset';
  // tableId = 'my_table';

  const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
  const writeClient = new WriterClient({projectId});

  try {
    const writeStream = await writeClient.getWriteStream({
      streamId: `${destinationTable}/streams/_default`,
      view: 'FULL',
    });
    const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
      writeStream.tableSchema,
      'root',
    );

    const connection = await writeClient.createStreamConnection({
      streamId: managedwriter.DefaultStream,
      destinationTable,
    });
    const streamId = connection.getStreamId();

    const writer = new JSONWriter({
      streamId,
      connection,
      protoDescriptor,
    });

    let rows = [];
    const pendingWrites = [];

    // Row 1
    let row = {
      row_num: 1,
      customer_name: 'Octavia',
    };
    rows.push(row);

    // Row 2
    row = {
      row_num: 2,
      customer_name: 'Turing',
    };
    rows.push(row);

    // Send batch.
    let pw = writer.appendRows(rows);
    pendingWrites.push(pw);

    rows = [];

    // Row 3
    row = {
      row_num: 3,
      customer_name: 'Bell',
    };
    rows.push(row);

    // Send batch.
    pw = writer.appendRows(rows);
    pendingWrites.push(pw);

    const results = await Promise.all(
      pendingWrites.map(pw => pw.getResult()),
    );
    console.log('Write results:', results);
  } catch (err) {
    console.log(err);
  } finally {
    writeClient.close();
  }
}

What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.