Download table data in the Avro data format
Download table data using the Avro data format and deserialize the data into row objects.
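To make the "deserialize the data into row objects" step concrete, here is a minimal sketch of what an Avro library does with one block of serialized rows. It is not a substitute for a real decoder such as fastavro, avsc, or goavro: it handles only the `string`, `long`, and `null` types plus unions, and the schema and the hand-encoded bytes are assumptions made up for illustration (the field names mirror the `name`, `number`, and `state` columns used by the samples on this page). The function names `read_long`, `read_value`, and `decode_rows` are hypothetical helpers, not part of any client library.

```python
import io


def read_long(buf):
    """Decode one Avro long: a zigzag-encoded, variable-length integer."""
    shift = 0
    raw = 0
    while True:
        byte = buf.read(1)
        if not byte:
            raise EOFError("unexpected end of Avro data")
        raw |= (byte[0] & 0x7F) << shift
        if not byte[0] & 0x80:  # high bit clear marks the last byte
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1)  # undo zigzag encoding


def read_string(buf):
    """Decode an Avro string: a long byte length followed by UTF-8 bytes."""
    length = read_long(buf)
    return buf.read(length).decode("utf-8")


READERS = {"long": read_long, "string": read_string, "null": lambda buf: None}


def read_value(buf, avro_type):
    """Decode one value; a list type is a union (branch index, then value)."""
    if isinstance(avro_type, list):
        return read_value(buf, avro_type[read_long(buf)])
    return READERS[avro_type](buf)


def decode_rows(schema, serialized_rows, row_count):
    """Decode row_count records that are concatenated with no framing."""
    buf = io.BytesIO(serialized_rows)
    return [
        {field["name"]: read_value(buf, field["type"]) for field in schema["fields"]}
        for _ in range(row_count)
    ]


# Illustrative schema: every column is assumed to be a nullable union.
SCHEMA = {
    "type": "record",
    "name": "row",
    "fields": [
        {"name": "name", "type": ["null", "string"]},
        {"name": "number", "type": ["null", "long"]},
        {"name": "state", "type": ["null", "string"]},
    ],
}

# Two hand-encoded rows: ("Ada", 5, "WA") and ("Bo", null, "WA").
BLOCK = (
    bytes([2, 6]) + b"Ada" + bytes([2, 10]) + bytes([2, 4]) + b"WA"
    + bytes([2, 4]) + b"Bo" + bytes([0]) + bytes([2, 4]) + b"WA"
)

rows = decode_rows(SCHEMA, BLOCK, row_count=2)
print(rows)
```

The key point is that the serialized rows carry no field names or delimiters, so the bytes can only be interpreted together with the schema returned by the read session.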
Code sample
C++
Before trying this sample, follow the C++ setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery C++ API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
#include "google/cloud/bigquery/storage/v1/bigquery_read_client.h"
#include <iostream>

namespace {
void ProcessRowsInAvroFormat(
    ::google::cloud::bigquery::storage::v1::AvroSchema const&,
    ::google::cloud::bigquery::storage::v1::AvroRows const&) {
  // Code to deserialize avro rows should be added here.
}
}  // namespace

int main(int argc, char* argv[]) try {
  if (argc != 3) {
    std::cerr << "Usage: " << argv[0] << " <project-id> <table-name>\n";
    return 1;
  }

  // project_name should be in the format "projects/<your-gcp-project>"
  std::string const project_name = "projects/" + std::string(argv[1]);
  // table_name should be in the format:
  // "projects/<project-table-resides-in>/datasets/<dataset-table_resides-in>/tables/<table
  // name>" The project values in project_name and table_name do not have to be
  // identical.
  std::string const table_name = argv[2];

  // Create a namespace alias to make the code easier to read.
  namespace bigquery_storage = ::google::cloud::bigquery_storage_v1;
  constexpr int kMaxReadStreams = 1;
  // Create the ReadSession.
  auto client = bigquery_storage::BigQueryReadClient(
      bigquery_storage::MakeBigQueryReadConnection());
  ::google::cloud::bigquery::storage::v1::ReadSession read_session;
  read_session.set_data_format(
      google::cloud::bigquery::storage::v1::DataFormat::AVRO);
  read_session.set_table(table_name);
  auto session =
      client.CreateReadSession(project_name, read_session, kMaxReadStreams);
  if (!session) throw std::move(session).status();

  // Read rows from the ReadSession.
  constexpr int kRowOffset = 0;
  auto read_rows = client.ReadRows(session->streams(0).name(), kRowOffset);

  std::int64_t num_rows = 0;
  for (auto const& row : read_rows) {
    if (row.ok()) {
      num_rows += row->row_count();
      ProcessRowsInAvroFormat(session->avro_schema(), row->avro_rows());
    }
  }

  std::cout << num_rows << " rows read from table: " << table_name << "\n";
  return 0;
} catch (google::cloud::Status const& status) {
  std::cerr << "google::cloud::Status thrown: " << status << "\n";
  return 1;
}

C#
Before trying this sample, follow the C# setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery C# API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
using Avro;
using Avro.IO;
using Avro.Specific;
using BigQueryStorage.Samples.Utilities;
using Google.Api.Gax.ResourceNames;
using Google.Cloud.BigQuery.Storage.V1;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using static Google.Cloud.BigQuery.Storage.V1.ReadSession.Types;

public class QuickstartSample
{
    public async Task<List<BabyNamesData>> QuickstartAsync(string projectId)
    {
        var bigQueryReadClient = BigQueryReadClient.Create();

        CreateReadSessionRequest createReadSessionRequest = new CreateReadSessionRequest
        {
            ParentAsProjectName = new ProjectName(projectId),
            ReadSession = new ReadSession
            {
                // This example uses baby name data from the public datasets.
                TableAsTableName = new TableName("bigquery-public-data", "usa_names", "usa_1910_current"),
                DataFormat = DataFormat.Avro,
                ReadOptions = new TableReadOptions
                {
                    // Specify the columns to be projected by adding them to the selected fields.
                    SelectedFields = { "name", "number", "state" },
                    RowRestriction = "state = \"WA\"",
                },
            },
            // Sets maximum number of reading streams to 1.
            MaxStreamCount = 1,
        };

        var readSession = bigQueryReadClient.CreateReadSession(createReadSessionRequest);

        // Uses the first (and only) stream to read data from and reading starts from offset 0.
        var readRowsStream = bigQueryReadClient.ReadRows(readSession.Streams.First().Name, 0).GetResponseStream();
        var schema = Schema.Parse(readSession.AvroSchema.Schema);

        // BabyNamesData has been generated using AvroGen, version 1.11.1.
        // The file is available here https://github.com/GoogleCloudPlatform/dotnet-docs-samples/blob/main/bigquery-storage/api/BigQueryStorage.Samples/Utilities/BabyNamesData.g.cs
        var reader = new SpecificDatumReader<BabyNamesData>(schema, schema);
        var dataList = new List<BabyNamesData>();
        await foreach (var readRowResponse in readRowsStream)
        {
            var byteArray = readRowResponse.AvroRows.SerializedBinaryRows.ToByteArray();
            var decoder = new BinaryDecoder(new MemoryStream(byteArray));
            for (int row = 0; row < readRowResponse.RowCount; row++)
            {
                var record = reader.Read(new BabyNamesData(), decoder);
                dataList.Add(record);
                Console.WriteLine($"name: {record.name}, state: {record.state}, number: {record.number}");
            }
        }
        return dataList;
    }
}

Go
Before trying this sample, follow the Go setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Go API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
// The bigquery_storage_quickstart application demonstrates usage of the
// BigQuery Storage read API. It demonstrates API features such as column
// projection (limiting the output to a subset of a table's columns),
// column filtering (using simple predicates to filter records on the server
// side), establishing the snapshot time (reading data from the table at a
// specific point in time), decoding Avro row blocks using the third party
// "github.com/linkedin/goavro" library, and decoding Arrow row blocks using
// the third party "github.com/apache/arrow/go" library.
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"sort"
	"strings"
	"sync"
	"time"

	bqStorage "cloud.google.com/go/bigquery/storage/apiv1"
	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"github.com/apache/arrow/go/v10/arrow"
	"github.com/apache/arrow/go/v10/arrow/ipc"
	"github.com/apache/arrow/go/v10/arrow/memory"
	gax "github.com/googleapis/gax-go/v2"
	goavro "github.com/linkedin/goavro/v2"
	"google.golang.org/genproto/googleapis/rpc/errdetails"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/timestamppb"
)

// rpcOpts is used to configure the underlying gRPC client to accept large
// messages. The BigQuery Storage API may send message blocks up to 128MB
// in size.
var rpcOpts = gax.WithGRPCOptions(
	grpc.MaxCallRecvMsgSize(1024 * 1024 * 129),
)

// Available formats
const (
	AVRO_FORMAT  = "avro"
	ARROW_FORMAT = "arrow"
)

// Command-line flags.
var (
	projectID = flag.String("project_id", "",
		"Cloud Project ID, used for session creation.")
	snapshotMillis = flag.Int64("snapshot_millis", 0,
		"Snapshot time to use for reads, represented in epoch milliseconds format. Default behavior reads current data.")
	format = flag.String("format", AVRO_FORMAT, "format to read data from storage API. Default is avro.")
)

func main() {
	flag.Parse()
	ctx := context.Background()
	bqReadClient, err := bqStorage.NewBigQueryReadClient(ctx)
	if err != nil {
		log.Fatalf("NewBigQueryStorageClient: %v", err)
	}
	defer bqReadClient.Close()

	// Verify we've been provided a parent project which will contain the read session. The
	// session may exist in a different project than the table being read.
	if *projectID == "" {
		log.Fatalf("No parent project ID specified, please supply using the --project_id flag.")
	}

	// This example uses baby name data from the public datasets.
	srcProjectID := "bigquery-public-data"
	srcDatasetID := "usa_names"
	srcTableID := "usa_1910_current"
	readTable := fmt.Sprintf("projects/%s/datasets/%s/tables/%s",
		srcProjectID,
		srcDatasetID,
		srcTableID,
	)

	// We limit the output columns to a subset of those allowed in the table,
	// and set a simple filter to only report names from the state of
	// Washington (WA).
	tableReadOptions := &storagepb.ReadSession_TableReadOptions{
		SelectedFields: []string{"name", "number", "state"},
		RowRestriction: `state = "WA"`,
	}

	dataFormat := storagepb.DataFormat_AVRO
	if *format == ARROW_FORMAT {
		dataFormat = storagepb.DataFormat_ARROW
	}
	createReadSessionRequest := &storagepb.CreateReadSessionRequest{
		Parent: fmt.Sprintf("projects/%s", *projectID),
		ReadSession: &storagepb.ReadSession{
			Table:       readTable,
			DataFormat:  dataFormat,
			ReadOptions: tableReadOptions,
		},
		MaxStreamCount: 1,
	}

	// Set a snapshot time if it's been specified.
	if *snapshotMillis > 0 {
		// time.Unix takes nanoseconds, so convert from epoch milliseconds.
		ts := timestamppb.New(time.Unix(0, *snapshotMillis*int64(time.Millisecond)))
		if err := ts.CheckValid(); err != nil {
			log.Fatalf("Invalid snapshot millis (%d): %v", *snapshotMillis, err)
		}
		createReadSessionRequest.ReadSession.TableModifiers = &storagepb.ReadSession_TableModifiers{
			SnapshotTime: ts,
		}
	}

	// Create the session from the request.
	session, err := bqReadClient.CreateReadSession(ctx, createReadSessionRequest, rpcOpts)
	if err != nil {
		log.Fatalf("CreateReadSession: %v", err)
	}
	fmt.Printf("Read session: %s\n", session.GetName())

	if len(session.GetStreams()) == 0 {
		log.Fatalf("no streams in session. if this was a small query result, consider writing to output to a named table.")
	}

	// We'll use only a single stream for reading data from the table. Because
	// of dynamic sharding, this will yield all the rows in the table. However,
	// if you wanted to fan out multiple readers you could do so by
	// increasing the MaxStreamCount.
	readStream := session.GetStreams()[0].Name

	ch := make(chan *storagepb.ReadRowsResponse)

	// Use a waitgroup to coordinate the reading and decoding goroutines.
	var wg sync.WaitGroup

	// Start the reading in one goroutine.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := processStream(ctx, bqReadClient, readStream, ch); err != nil {
			log.Fatalf("processStream failure: %v", err)
		}
		close(ch)
	}()

	// Start Avro processing and decoding in another goroutine.
	wg.Add(1)
	go func() {
		defer wg.Done()
		var err error
		switch *format {
		case ARROW_FORMAT:
			err = processArrow(ctx, session.GetArrowSchema().GetSerializedSchema(), ch)
		case AVRO_FORMAT:
			err = processAvro(ctx, session.GetAvroSchema().GetSchema(), ch)
		}
		if err != nil {
			log.Fatalf("error processing %s: %v", *format, err)
		}
	}()

	// Wait until both the reading and decoding goroutines complete.
	wg.Wait()
}

// printDatum prints the decoded row datum.
func printDatum(d interface{}) {
	m, ok := d.(map[string]interface{})
	if !ok {
		log.Printf("failed type assertion: %v", d)
	}
	// Go's map implementation returns keys in a random ordering, so we sort
	// the keys before accessing.
	keys := make([]string, len(m))
	i := 0
	for k := range m {
		keys[i] = k
		i++
	}
	sort.Strings(keys)
	for _, key := range keys {
		fmt.Printf("%s: %-20v ", key, valueFromTypeMap(m[key]))
	}
	fmt.Println()
}

// printRecordBatch prints the arrow record batch
func printRecordBatch(record arrow.Record) error {
	out, err := record.MarshalJSON()
	if err != nil {
		return err
	}
	list := []map[string]interface{}{}
	err = json.Unmarshal(out, &list)
	if err != nil {
		return err
	}
	if len(list) == 0 {
		return nil
	}
	first := list[0]
	keys := make([]string, len(first))
	i := 0
	for k := range first {
		keys[i] = k
		i++
	}
	sort.Strings(keys)
	builder := strings.Builder{}
	for _, m := range list {
		for _, key := range keys {
			builder.WriteString(fmt.Sprintf("%s: %-20v ", key, m[key]))
		}
		builder.WriteString("\n")
	}
	fmt.Print(builder.String())
	return nil
}

// valueFromTypeMap returns the first value/key in the type map. This function
// is only suitable for simple schemas, as complex typing such as arrays and
// records necessitate a more robust implementation. See the goavro library
// and the Avro specification for more information.
func valueFromTypeMap(field interface{}) interface{} {
	m, ok := field.(map[string]interface{})
	if !ok {
		return nil
	}
	for _, v := range m {
		// Return the first key encountered.
		return v
	}
	return nil
}

// processStream reads rows from a single storage Stream, and sends the Storage Response
// data blocks to a channel. This function will retry on transient stream
// failures and bookmark progress to avoid re-reading data that's already been
// successfully transmitted.
func processStream(ctx context.Context, client *bqStorage.BigQueryReadClient, st string, ch chan<- *storagepb.ReadRowsResponse) error {
	var offset int64

	// Streams may be long-running. Rather than using a global retry for the
	// stream, implement a retry that resets once progress is made.
	retryLimit := 3
	retries := 0
	for {
		// Send the initiating request to start streaming row blocks.
		rowStream, err := client.ReadRows(ctx, &storagepb.ReadRowsRequest{
			ReadStream: st,
			Offset:     offset,
		}, rpcOpts)
		if err != nil {
			return fmt.Errorf("couldn't invoke ReadRows: %w", err)
		}

		// Process the streamed responses.
		for {
			r, err := rowStream.Recv()
			if err == io.EOF {
				return nil
			}
			if err != nil {
				// If there is an error, check whether it is a retryable
				// error with a retry delay and sleep instead of increasing
				// retries count.
				var retryDelayDuration time.Duration
				if errorStatus, ok := status.FromError(err); ok && errorStatus.Code() == codes.ResourceExhausted {
					for _, detail := range errorStatus.Details() {
						retryInfo, ok := detail.(*errdetails.RetryInfo)
						if !ok {
							continue
						}
						retryDelay := retryInfo.GetRetryDelay()
						retryDelayDuration = time.Duration(retryDelay.Seconds)*time.Second + time.Duration(retryDelay.Nanos)*time.Nanosecond
						break
					}
				}
				if retryDelayDuration != 0 {
					log.Printf("processStream failed with a retryable error, retrying in %v", retryDelayDuration)
					time.Sleep(retryDelayDuration)
				} else {
					retries++
					if retries >= retryLimit {
						return fmt.Errorf("processStream retries exhausted: %w", err)
					}
				}
				// break the inner loop, and try to recover by starting a new streaming
				// ReadRows call at the last known good offset.
				break
			} else {
				// Reset retries after a successful response.
				retries = 0
			}

			rc := r.GetRowCount()
			if rc > 0 {
				// Bookmark our progress in case of retries and send the rowblock on the channel.
				offset = offset + rc
				// We're making progress, reset retries.
				retries = 0
				ch <- r
			}
		}
	}
}

// processArrow receives row blocks from a channel, and uses the provided Arrow
// schema to decode the blocks into individual row messages for printing. Will
// continue to run until the channel is closed or the provided context is
// cancelled.
func processArrow(ctx context.Context, schema []byte, ch <-chan *storagepb.ReadRowsResponse) error {
	mem := memory.NewGoAllocator()
	buf := bytes.NewBuffer(schema)
	r, err := ipc.NewReader(buf, ipc.WithAllocator(mem))
	if err != nil {
		return err
	}
	aschema := r.Schema()
	for {
		select {
		case <-ctx.Done():
			// Context was cancelled. Stop.
			return ctx.Err()
		case rows, ok := <-ch:
			if !ok {
				// Channel closed, no further arrow messages. Stop.
				return nil
			}
			undecoded := rows.GetArrowRecordBatch().GetSerializedRecordBatch()
			if len(undecoded) > 0 {
				buf = bytes.NewBuffer(schema)
				buf.Write(undecoded)
				r, err = ipc.NewReader(buf, ipc.WithAllocator(mem), ipc.WithSchema(aschema))
				if err != nil {
					return err
				}
				for r.Next() {
					rec := r.Record()
					err = printRecordBatch(rec)
					if err != nil {
						return err
					}
				}
			}
		}
	}
}

// processAvro receives row blocks from a channel, and uses the provided Avro
// schema to decode the blocks into individual row messages for printing. Will
// continue to run until the channel is closed or the provided context is
// cancelled.
func processAvro(ctx context.Context, schema string, ch <-chan *storagepb.ReadRowsResponse) error {
	// Establish a decoder that can process blocks of messages using the
	// reference schema. All blocks share the same schema, so the decoder
	// can be long-lived.
	codec, err := goavro.NewCodec(schema)
	if err != nil {
		return fmt.Errorf("couldn't create codec: %w", err)
	}

	for {
		select {
		case <-ctx.Done():
			// Context was cancelled. Stop.
			return ctx.Err()
		case rows, ok := <-ch:
			if !ok {
				// Channel closed, no further avro messages. Stop.
				return nil
			}
			undecoded := rows.GetAvroRows().GetSerializedBinaryRows()
			for len(undecoded) > 0 {
				datum, remainingBytes, err := codec.NativeFromBinary(undecoded)

				if err != nil {
					if err == io.EOF {
						break
					}
					return fmt.Errorf("decoding error with %d bytes remaining: %v", len(undecoded), err)
				}
				printDatum(datum)
				undecoded = remainingBytes
			}
		}
	}
}

Java
Before trying this sample, follow the Java setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Java API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.AvroRows;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.common.base.Preconditions;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

public class StorageSample {

  /*
   * SimpleRowReader handles deserialization of the Avro-encoded row blocks transmitted
   * from the storage API using a generic datum decoder.
   */
  private static class SimpleRowReader {

    private final DatumReader<GenericRecord> datumReader;

    // Decoder object will be reused to avoid re-allocation and too much garbage collection.
    private BinaryDecoder decoder = null;

    // GenericRecord object will be reused.
    private GenericRecord row = null;

    public SimpleRowReader(Schema schema) {
      Preconditions.checkNotNull(schema);
      datumReader = new GenericDatumReader<>(schema);
    }

    /**
     * Sample method for processing AVRO rows which only validates decoding.
     *
     * @param avroRows object returned from the ReadRowsResponse.
     */
    public void processRows(AvroRows avroRows) throws IOException {
      decoder =
          DecoderFactory.get()
              .binaryDecoder(avroRows.getSerializedBinaryRows().toByteArray(), decoder);

      while (!decoder.isEnd()) {
        // Reusing object row
        row = datumReader.read(row, decoder);
        System.out.println(row.toString());
      }
    }
  }

  public static void main(String... args) throws Exception {
    // Sets your Google Cloud Platform project ID.
    // String projectId = "YOUR_PROJECT_ID";
    String projectId = args[0];
    // Epoch milliseconds do not fit in an int, so the snapshot time is parsed as a long.
    Long snapshotMillis = null;
    if (args.length > 1) {
      snapshotMillis = Long.parseLong(args[1]);
    }

    try (BigQueryReadClient client = BigQueryReadClient.create()) {
      String parent = String.format("projects/%s", projectId);

      // This example uses baby name data from the public datasets.
      String srcTable =
          String.format(
              "projects/%s/datasets/%s/tables/%s",
              "bigquery-public-data", "usa_names", "usa_1910_current");

      // We specify the columns to be projected by adding them to the selected fields,
      // and set a simple filter to restrict which rows are transmitted.
      TableReadOptions options =
          TableReadOptions.newBuilder()
              .addSelectedFields("name")
              .addSelectedFields("number")
              .addSelectedFields("state")
              .setRowRestriction("state = \"WA\"")
              .build();

      // Start specifying the read session we want created.
      ReadSession.Builder sessionBuilder =
          ReadSession.newBuilder()
              .setTable(srcTable)
              // This API can also deliver data serialized in Apache Arrow format.
              // This example leverages Apache Avro.
              .setDataFormat(DataFormat.AVRO)
              .setReadOptions(options);

      // Optionally specify the snapshot time. When unspecified, snapshot time is "now".
      if (snapshotMillis != null) {
        Timestamp t =
            Timestamp.newBuilder()
                .setSeconds(snapshotMillis / 1000)
                .setNanos((int) ((snapshotMillis % 1000) * 1000000))
                .build();
        TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build();
        sessionBuilder.setTableModifiers(modifiers);
      }

      // Begin building the session creation request.
      CreateReadSessionRequest.Builder builder =
          CreateReadSessionRequest.newBuilder()
              .setParent(parent)
              .setReadSession(sessionBuilder)
              .setMaxStreamCount(1);

      // Request the session creation.
      ReadSession session = client.createReadSession(builder.build());

      SimpleRowReader reader =
          new SimpleRowReader(new Schema.Parser().parse(session.getAvroSchema().getSchema()));

      // Assert that there are streams available in the session. An empty table may not have
      // data available. If no sessions are available for an anonymous (cached) table, consider
      // writing results of a query to a named table rather than consuming cached results directly.
      Preconditions.checkState(session.getStreamsCount() > 0);

      // Use the first stream to perform reading.
      String streamName = session.getStreams(0).getName();

      ReadRowsRequest readRowsRequest =
          ReadRowsRequest.newBuilder().setReadStream(streamName).build();

      // Process each block of rows as they arrive and decode using our simple row reader.
      ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
      for (ReadRowsResponse response : stream) {
        Preconditions.checkState(response.hasAvroRows());
        reader.processRows(response.getAvroRows());
      }
    }
  }
}

Node.js
Before trying this sample, follow the Node.js setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Node.js API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
// The read stream contains blocks of Avro-encoded bytes. We use the
// 'avsc' library to decode these blocks. Install avsc with the following
// command: npm install avsc
const avro = require('avsc');

// See reference documentation at
// https://cloud.google.com/bigquery/docs/reference/storage
const {BigQueryReadClient} = require('@google-cloud/bigquery-storage');

const client = new BigQueryReadClient();

async function bigqueryStorageQuickstart() {
  // Get current project ID. The read session is created in this project.
  // This project can be different from that which contains the table.
  const myProjectId = await client.getProjectId();

  // This example reads baby name data from the public datasets.
  const projectId = 'bigquery-public-data';
  const datasetId = 'usa_names';
  const tableId = 'usa_1910_current';

  const tableReference = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;

  const parent = `projects/${myProjectId}`;

  /* We limit the output columns to a subset of those allowed in the table,
   * and set a simple filter to only report names from the state of
   * Washington (WA).
   */
  const readOptions = {
    selectedFields: ['name', 'number', 'state'],
    rowRestriction: 'state = "WA"',
  };

  let tableModifiers = null;
  const snapshotSeconds = 0;

  // Set a snapshot time if it's been specified.
  if (snapshotSeconds > 0) {
    tableModifiers = {snapshotTime: {seconds: snapshotSeconds}};
  }

  // API request.
  const request = {
    parent,
    readSession: {
      table: tableReference,
      // This API can also deliver data serialized in Apache Arrow format.
      // This example leverages Apache Avro.
      dataFormat: 'AVRO',
      readOptions,
      tableModifiers,
    },
  };

  const [session] = await client.createReadSession(request);

  const schema = JSON.parse(session.avroSchema.schema);

  const avroType = avro.Type.forSchema(schema);

  /* The offset requested must be less than the last
   * row read from ReadRows. Requesting a larger offset is
   * undefined.
   */
  let offset = 0;

  const readRowsRequest = {
    // Required stream name and optional offset. Offset requested must be less than
    // the last row read from readRows(). Requesting a larger offset is undefined.
    readStream: session.streams[0].name,
    offset,
  };

  const names = new Set();
  const states = [];

  /* We'll use only a single stream for reading data from the table. Because
   * of dynamic sharding, this will yield all the rows in the table. However,
   * if you wanted to fan out multiple readers you could do so by having a
   * reader process each individual stream.
   */
  client
    .readRows(readRowsRequest)
    .on('error', console.error)
    .on('data', data => {
      offset = data.avroRows.serializedBinaryRows.offset;

      try {
        // Decode all rows in buffer
        let pos;
        do {
          const decodedData = avroType.decode(
            data.avroRows.serializedBinaryRows,
            pos,
          );

          if (decodedData.value) {
            names.add(decodedData.value.name);

            if (!states.includes(decodedData.value.state)) {
              states.push(decodedData.value.state);
            }
          }

          pos = decodedData.offset;
        } while (pos > 0);
      } catch (error) {
        console.log(error);
      }
    })
    .on('end', () => {
      console.log(`Got ${names.size} unique names in states: ${states}`);
      console.log(`Last offset: ${offset}`);
    });
}

PHP
Before trying this sample, follow the PHP setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery PHP API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
// Includes the autoloader for libraries installed with composer
require __DIR__ . '/vendor/autoload.php';

use Google\Cloud\BigQuery\Storage\V1\Client\BigQueryReadClient;
use Google\Cloud\BigQuery\Storage\V1\CreateReadSessionRequest;
use Google\Cloud\BigQuery\Storage\V1\DataFormat;
use Google\Cloud\BigQuery\Storage\V1\ReadRowsRequest;
use Google\Cloud\BigQuery\Storage\V1\ReadSession;
use Google\Cloud\BigQuery\Storage\V1\ReadSession\TableModifiers;
use Google\Cloud\BigQuery\Storage\V1\ReadSession\TableReadOptions;
use Google\Protobuf\Timestamp;

// Instantiates the client and sets the project
$client = new BigQueryReadClient();
$project = $client->projectName('YOUR_PROJECT_ID');
$snapshotMillis = 'YOUR_SNAPSHOT_MILLIS';

// This example reads baby name data from the below public dataset.
$table = $client->tableName(
    'bigquery-public-data',
    'usa_names',
    'usa_1910_current'
);

// This API can also deliver data serialized in Apache Arrow format.
// This example leverages Apache Avro.
$readSession = new ReadSession();
$readSession->setTable($table)->setDataFormat(DataFormat::AVRO);

// We limit the output columns to a subset of those allowed in the table,
// and set a simple filter to only report names from the state of
// Washington (WA).
$readOptions = new TableReadOptions();
$readOptions->setSelectedFields(['name', 'number', 'state']);
$readOptions->setRowRestriction('state = "WA"');
$readSession->setReadOptions($readOptions);

// With snapshot millis if present
if (!empty($snapshotMillis)) {
    $timestamp = new Timestamp();
    $timestamp->setSeconds($snapshotMillis / 1000);
    $timestamp->setNanos((int) ($snapshotMillis % 1000) * 1000000);
    $tableModifier = new TableModifiers();
    $tableModifier->setSnapshotTime($timestamp);
    $readSession->setTableModifiers($tableModifier);
}

try {
    $createReadSessionRequest = (new CreateReadSessionRequest())
        ->setParent($project)
        ->setReadSession($readSession)
        ->setMaxStreamCount(1);
    $session = $client->createReadSession($createReadSessionRequest);
    $readRowsRequest = (new ReadRowsRequest())
        ->setReadStream($session->getStreams()[0]->getName());
    $stream = $client->readRows($readRowsRequest);
    // Do any local processing by iterating over the responses. The
    // google-cloud-bigquery-storage client reconnects to the API after any
    // transient network errors or timeouts.
    $schema = '';
    $names = [];
    $states = [];
    foreach ($stream->readAll() as $response) {
        $data = $response->getAvroRows()->getSerializedBinaryRows();
        if ($response->hasAvroSchema()) {
            $schema = $response->getAvroSchema()->getSchema();
        }
        $avroSchema = AvroSchema::parse($schema);
        $readIO = new AvroStringIO($data);
        $datumReader = new AvroIODatumReader($avroSchema);

        while (!$readIO->is_eof()) {
            $record = $datumReader->read(new AvroIOBinaryDecoder($readIO));
            $names[$record['name']] = '';
            $states[$record['state']] = '';
        }
    }
    $states = array_keys($states);
    printf(
        'Got %d unique names in states: %s' . PHP_EOL,
        count($names),
        implode(', ', $states)
    );
} finally {
    $client->close();
}

Python
Before trying this sample, follow the Python setup instructions in the BigQuery quickstart using client libraries. For more information, see the BigQuery Python API reference documentation.
To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.
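The Go sample on this page bookmarks the row offset so that a failed ReadRows call can be restarted without re-reading rows that were already delivered. The following is a minimal Python sketch of that pattern only, using a stand-in stream instead of the real API so it runs without credentials. `read_with_resume`, `open_stream`, and `flaky_stream` are hypothetical names for illustration; the actual Python client is documented in the sample to reconnect on its own, so this is not something you need to write yourself.

```python
def read_with_resume(open_stream, retry_limit=3):
    """Yield row-block payloads, reopening the stream at the last good offset.

    open_stream(offset) is a hypothetical callable that returns an iterator of
    (row_count, payload) blocks starting at the given row offset.
    """
    offset = 0
    retries = 0
    while True:
        try:
            for row_count, payload in open_stream(offset):
                offset += row_count  # bookmark progress
                retries = 0  # progress was made, so reset the retry budget
                yield payload
            return  # stream ended normally
        except ConnectionError:
            retries += 1
            if retries >= retry_limit:
                raise


# Stand-in data: three blocks holding 2, 3, and 1 rows.
BLOCKS = [(2, "a"), (3, "b"), (1, "c")]
calls = []  # offsets at which the stream was opened


def flaky_stream(offset):
    """Fake stream that fails once, just before delivering the second block."""
    calls.append(offset)
    seen = 0
    for row_count, payload in BLOCKS:
        if seen >= offset:
            if len(calls) == 1 and seen == 2:
                raise ConnectionError("transient failure")
            yield row_count, payload
        seen += row_count


payloads = list(read_with_resume(flaky_stream))
print(payloads, calls)
```

Counting rows rather than blocks matters here because the offset passed back to the stream is a row offset, which is why each block's row count is added to the bookmark.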
from google.cloud.bigquery_storage import BigQueryReadClient, types

# TODO(developer): Set the project_id variable.
# project_id = 'your-project-id'
#
# The read session is created in this project. This project can be
# different from that which contains the table.

# TODO(developer): Optionally set a snapshot time in epoch milliseconds.
# A value of 0 reads the current data.
snapshot_millis = 0

client = BigQueryReadClient()

# This example reads baby name data from the public datasets.
table = "projects/{}/datasets/{}/tables/{}".format(
    "bigquery-public-data", "usa_names", "usa_1910_current"
)

requested_session = types.ReadSession()
requested_session.table = table
# This API can also deliver data serialized in Apache Arrow format.
# This example leverages Apache Avro.
requested_session.data_format = types.DataFormat.AVRO

# We limit the output columns to a subset of those allowed in the table,
# and set a simple filter to only report names from the state of
# Washington (WA).
requested_session.read_options.selected_fields = ["name", "number", "state"]
requested_session.read_options.row_restriction = 'state = "WA"'

# Set a snapshot time if it's been specified.
if snapshot_millis > 0:
    snapshot_time = types.Timestamp()
    snapshot_time.FromMilliseconds(snapshot_millis)
    requested_session.table_modifiers.snapshot_time = snapshot_time

parent = "projects/{}".format(project_id)
session = client.create_read_session(
    parent=parent,
    read_session=requested_session,
    # We'll use only a single stream for reading data from the table. However,
    # if you wanted to fan out multiple readers you could do so by having a
    # reader process each individual stream.
    max_stream_count=1,
)
reader = client.read_rows(session.streams[0].name)

# The read stream contains blocks of Avro-encoded bytes. The rows() method
# uses the fastavro library to parse these blocks as an iterable of Python
# dictionaries. Install fastavro with the following command:
#
# pip install google-cloud-bigquery-storage[fastavro]
rows = reader.rows(session)

# Do any local processing by iterating over the rows. The
# google-cloud-bigquery-storage client reconnects to the API after any
# transient network errors or timeouts.
names = set()
states = set()

# fastavro returns EOFError instead of StopIteration starting v1.8.4.
# See https://github.com/googleapis/python-bigquery-storage/pull/687
try:
    for row in rows:
        names.add(row["name"])
        states.add(row["state"])
except EOFError:
    pass

print("Got {} unique names in states: {}".format(len(names), ", ".join(states)))

What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.