Commit 0c3476d

perf: use `jobs.getQueryResults` to download result sets (#363)

* refactor: break job into multiple modules. Original paths are retained
  for backwards compatibility.
* perf: use `jobs.getQueryResults` to download result sets. Replaces
  `tabledata.list` when `RowIterator` is used for query results. This
  likely also fixes a few edge cases around BigQuery scripting jobs.
* revert unnecessary changes to _get_query_results
* simplify RowIterator: no need to hack the Table object
* fix tests for bqstorage warning
* populate location

1 parent 2849e56 · commit 0c3476d
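
This change is transparent to callers; only the REST method behind `RowIterator` paging differs. A minimal sketch of the affected call path, assuming application-default credentials and an illustrative query (not taken from this commit):

    from google.cloud import bigquery

    # Minimal sketch: after this commit, iterating a query job's results pages
    # through jobs.getQueryResults rather than tabledata.list; callers need no
    # code changes.
    client = bigquery.Client()  # assumes application-default credentials
    query_job = client.query("SELECT 17 AS answer")  # illustrative query
    for row in query_job.result():  # RowIterator backed by jobs.getQueryResults
        print(row["answer"])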

File tree

11 files changed (+256, -131 lines)


google/cloud/bigquery/_pandas_helpers.py (8 additions, 8 deletions)

@@ -474,7 +474,7 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
     pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)


-def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
+def _row_iterator_page_to_arrow(page, column_names, arrow_types):
     # Iterate over the page to force the API request to get the page data.
     try:
         next(iter(page))
@@ -490,8 +490,8 @@ def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
     return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)


-def download_arrow_tabledata_list(pages, bq_schema):
-    """Use tabledata.list to construct an iterable of RecordBatches.
+def download_arrow_row_iterator(pages, bq_schema):
+    """Use HTTP JSON RowIterator to construct an iterable of RecordBatches.

     Args:
         pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
@@ -510,10 +510,10 @@ def download_arrow_tabledata_list(pages, bq_schema):
     arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]

     for page in pages:
-        yield _tabledata_list_page_to_arrow(page, column_names, arrow_types)
+        yield _row_iterator_page_to_arrow(page, column_names, arrow_types)


-def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
+def _row_iterator_page_to_dataframe(page, column_names, dtypes):
     # Iterate over the page to force the API request to get the page data.
     try:
         next(iter(page))
@@ -528,8 +528,8 @@ def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
     return pandas.DataFrame(columns, columns=column_names)


-def download_dataframe_tabledata_list(pages, bq_schema, dtypes):
-    """Use (slower, but free) tabledata.list to construct a DataFrame.
+def download_dataframe_row_iterator(pages, bq_schema, dtypes):
+    """Use HTTP JSON RowIterator to construct a DataFrame.

     Args:
         pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
@@ -549,7 +549,7 @@ def download_dataframe_tabledata_list(pages, bq_schema, dtypes):
     bq_schema = schema._to_schema_fields(bq_schema)
     column_names = [field.name for field in bq_schema]
     for page in pages:
-        yield _tabledata_list_page_to_dataframe(page, column_names, dtypes)
+        yield _row_iterator_page_to_dataframe(page, column_names, dtypes)


 def _bqstorage_page_to_arrow(page):
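
The renamed helpers are thin consumers of an iterator of REST pages. A hedged sketch of how a caller might drive the Arrow variant, assuming `row_iterator` is an already-constructed `google.cloud.bigquery.table.RowIterator`; the wiring shown is illustrative, not the library's internal call site:

    import pyarrow

    from google.cloud.bigquery import _pandas_helpers

    # Illustrative sketch: collect the per-page RecordBatches yielded by the
    # renamed helper into a single pyarrow.Table. Assumes `row_iterator`
    # exists and has a resolved schema.
    batches = _pandas_helpers.download_arrow_row_iterator(
        iter(row_iterator.pages), row_iterator.schema
    )
    arrow_table = pyarrow.Table.from_batches(list(batches))
    print(arrow_table.num_rows)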

google/cloud/bigquery/client.py (91 additions, 13 deletions)

@@ -80,18 +80,19 @@
 _MAX_MULTIPART_SIZE = 5 * 1024 * 1024
 _DEFAULT_NUM_RETRIES = 6
 _BASE_UPLOAD_TEMPLATE = (
-    u"https://bigquery.googleapis.com/upload/bigquery/v2/projects/"
-    u"{project}/jobs?uploadType="
+    "https://bigquery.googleapis.com/upload/bigquery/v2/projects/"
+    "{project}/jobs?uploadType="
 )
-_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u"multipart"
-_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u"resumable"
-_GENERIC_CONTENT_TYPE = u"*/*"
+_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "multipart"
+_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "resumable"
+_GENERIC_CONTENT_TYPE = "*/*"
 _READ_LESS_THAN_SIZE = (
     "Size {:d} was specified but the file-like object only had " "{:d} bytes remaining."
 )
 _NEED_TABLE_ARGUMENT = (
     "The table argument should be a table ID string, Table, or TableReference"
 )
+_LIST_ROWS_FROM_QUERY_RESULTS_FIELDS = "jobReference,totalRows,pageToken,rows"


 class Project(object):
@@ -293,7 +294,7 @@ def api_request(*args, **kwargs):
                 span_attributes=span_attributes,
                 *args,
                 timeout=timeout,
-                **kwargs
+                **kwargs,
             )

         return page_iterator.HTTPIterator(
@@ -371,7 +372,7 @@ def api_request(*args, **kwargs):
                 span_attributes=span_attributes,
                 *args,
                 timeout=timeout,
-                **kwargs
+                **kwargs,
             )

         return page_iterator.HTTPIterator(
@@ -1129,7 +1130,7 @@ def api_request(*args, **kwargs):
                 span_attributes=span_attributes,
                 *args,
                 timeout=timeout,
-                **kwargs
+                **kwargs,
             )

         result = page_iterator.HTTPIterator(
@@ -1207,7 +1208,7 @@ def api_request(*args, **kwargs):
                 span_attributes=span_attributes,
                 *args,
                 timeout=timeout,
-                **kwargs
+                **kwargs,
             )

         result = page_iterator.HTTPIterator(
@@ -1284,7 +1285,7 @@ def api_request(*args, **kwargs):
                 span_attributes=span_attributes,
                 *args,
                 timeout=timeout,
-                **kwargs
+                **kwargs,
             )

         result = page_iterator.HTTPIterator(
@@ -1510,7 +1511,7 @@ def delete_table(
             raise

     def _get_query_results(
-        self, job_id, retry, project=None, timeout_ms=None, location=None, timeout=None
+        self, job_id, retry, project=None, timeout_ms=None, location=None, timeout=None,
     ):
         """Get the query results object for a query job.
@@ -1890,7 +1891,7 @@ def api_request(*args, **kwargs):
                 span_attributes=span_attributes,
                 *args,
                 timeout=timeout,
-                **kwargs
+                **kwargs,
             )

         return page_iterator.HTTPIterator(
@@ -2374,7 +2375,7 @@ def load_table_from_json(

         destination = _table_arg_to_table_ref(destination, default_project=self.project)

-        data_str = u"\n".join(json.dumps(item) for item in json_rows)
+        data_str = "\n".join(json.dumps(item) for item in json_rows)
         encoded_str = data_str.encode()
         data_file = io.BytesIO(encoded_str)
         return self.load_table_from_file(
@@ -3169,6 +3170,83 @@ def list_rows(
             # Pass in selected_fields separately from schema so that full
             # tables can be fetched without a column filter.
             selected_fields=selected_fields,
+            total_rows=getattr(table, "num_rows", None),
+        )
+        return row_iterator
+
+    def _list_rows_from_query_results(
+        self,
+        job_id,
+        location,
+        project,
+        schema,
+        total_rows=None,
+        destination=None,
+        max_results=None,
+        start_index=None,
+        page_size=None,
+        retry=DEFAULT_RETRY,
+        timeout=None,
+    ):
+        """List the rows of a completed query.
+        See
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults
+        Args:
+            job_id (str):
+                ID of a query job.
+            location (str): Location of the query job.
+            project (str):
+                ID of the project where the query job was run.
+            schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
+                The fields expected in these query results. Used to convert
+                from JSON to expected Python types.
+            total_rows (Optional[int]):
+                Total number of rows in the query results.
+            destination (Optional[Union[ \
+                google.cloud.bigquery.table.Table, \
+                google.cloud.bigquery.table.TableListItem, \
+                google.cloud.bigquery.table.TableReference, \
+                str, \
+            ]]):
+                Destination table reference. Used to fetch the query results
+                with the BigQuery Storage API.
+            max_results (Optional[int]):
+                Maximum number of rows to return across the whole iterator.
+            start_index (Optional[int]):
+                The zero-based index of the starting row to read.
+            page_size (Optional[int]):
+                The maximum number of rows in each page of results from this request.
+                Non-positive values are ignored. Defaults to a sensible value set by the API.
+            retry (Optional[google.api_core.retry.Retry]):
+                How to retry the RPC.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
+                If multiple requests are made under the hood, ``timeout``
+                applies to each individual request.
+        Returns:
+            google.cloud.bigquery.table.RowIterator:
+                Iterator of row data
+                :class:`~google.cloud.bigquery.table.Row`-s.
+        """
+        params = {
+            "fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
+            "location": location,
+        }
+
+        if start_index is not None:
+            params["startIndex"] = start_index
+
+        row_iterator = RowIterator(
+            client=self,
+            api_request=functools.partial(self._call_api, retry, timeout=timeout),
+            path=f"/projects/{project}/queries/{job_id}",
+            schema=schema,
+            max_results=max_results,
+            page_size=page_size,
+            table=destination,
+            extra_params=params,
+            total_rows=total_rows,
         )
         return row_iterator
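
Under the hood, each page fetch on the new path is a GET against the query-results endpoint, with the response payload trimmed by the `fields` parameter added above. A hedged sketch of roughly the request that results (parameter names follow the jobs.getQueryResults reference; `client`, `project`, `job_id`, and `page_token` are assumed to be in scope, and this is not the library's literal internals):

    # Illustrative: approximately the per-page request behind the new
    # RowIterator path.
    params = {
        "fields": "jobReference,totalRows,pageToken,rows",  # trim the response
        "location": "US",         # the job's location
        "maxResults": 1000,       # page_size, when one was requested
        "pageToken": page_token,  # None on the first page
    }
    response = client._connection.api_request(
        method="GET",
        path=f"/projects/{project}/queries/{job_id}",
        query_params={k: v for k, v in params.items() if v is not None},
    )
    rows = response.get("rows", [])
    next_token = response.get("pageToken")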

google/cloud/bigquery/job/query.py (7 additions, 7 deletions)

@@ -38,7 +38,6 @@
 from google.cloud.bigquery.table import _EmptyRowIterator
 from google.cloud.bigquery.table import RangePartitioning
 from google.cloud.bigquery.table import _table_arg_to_table_ref
-from google.cloud.bigquery.table import Table
 from google.cloud.bigquery.table import TableReference
 from google.cloud.bigquery.table import TimePartitioning

@@ -1159,12 +1158,13 @@ def result(
         if self._query_results.total_rows is None:
             return _EmptyRowIterator()

-        schema = self._query_results.schema
-        dest_table_ref = self.destination
-        dest_table = Table(dest_table_ref, schema=schema)
-        dest_table._properties["numRows"] = self._query_results.total_rows
-        rows = self._client.list_rows(
-            dest_table,
+        rows = self._client._list_rows_from_query_results(
+            self._query_results.job_id,
+            self.location,
+            self._query_results.project,
+            self._query_results.schema,
+            total_rows=self._query_results.total_rows,
+            destination=self.destination,
             page_size=page_size,
             max_results=max_results,
             start_index=start_index,
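
One edge case the commit message calls out: scripting jobs. Their child queries have no stable destination table, so the old synthetic-`Table` workaround was fragile; reading rows straight off the job avoids it. A hedged illustration (the script itself is made up for the example):

    # Illustrative: results of a scripting job are now read from the job
    # itself via jobs.getQueryResults, so no destination Table is faked.
    job = client.query(
        """
        DECLARE x INT64 DEFAULT 21;
        SELECT x * 2 AS answer;
        """
    )
    for row in job.result():
        print(row["answer"])  # 42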

google/cloud/bigquery/table.py (10 additions, 7 deletions)

@@ -1306,6 +1306,8 @@ class RowIterator(HTTPIterator):
             call the BigQuery Storage API to fetch rows.
         selected_fields (Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]):
             A subset of columns to select from this table.
+        total_rows (Optional[int]):
+            Total number of rows in the table.

     """

@@ -1321,6 +1323,7 @@ def __init__(
         extra_params=None,
         table=None,
         selected_fields=None,
+        total_rows=None,
     ):
         super(RowIterator, self).__init__(
             client,
@@ -1342,7 +1345,7 @@ def __init__(
         self._schema = schema
         self._selected_fields = selected_fields
         self._table = table
-        self._total_rows = getattr(table, "num_rows", None)
+        self._total_rows = total_rows

     def _get_next_page_response(self):
         """Requests the next page from the path provided.
@@ -1419,7 +1422,7 @@ def _to_arrow_iterable(self, bqstorage_client=None):
             selected_fields=self._selected_fields,
         )
         tabledata_list_download = functools.partial(
-            _pandas_helpers.download_arrow_tabledata_list, iter(self.pages), self.schema
+            _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema
         )
         return self._to_page_iterable(
             bqstorage_download,
@@ -1496,7 +1499,7 @@ def to_arrow(
         ) and self.max_results is not None:
             warnings.warn(
                 "Cannot use bqstorage_client if max_results is set, "
-                "reverting to fetching data with the tabledata.list endpoint.",
+                "reverting to fetching data with the REST endpoint.",
                 stacklevel=2,
             )
             create_bqstorage_client = False
@@ -1582,7 +1585,7 @@ def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
             selected_fields=self._selected_fields,
         )
         tabledata_list_download = functools.partial(
-            _pandas_helpers.download_dataframe_tabledata_list,
+            _pandas_helpers.download_dataframe_row_iterator,
             iter(self.pages),
             self.schema,
             dtypes,
@@ -1680,7 +1683,7 @@ def to_dataframe(
         ) and self.max_results is not None:
             warnings.warn(
                 "Cannot use bqstorage_client if max_results is set, "
-                "reverting to fetching data with the tabledata.list endpoint.",
+                "reverting to fetching data with the REST endpoint.",
                 stacklevel=2,
             )
             create_bqstorage_client = False
@@ -2167,7 +2170,7 @@ def _item_to_row(iterator, resource):
 )


-def _tabledata_list_page_columns(schema, response):
+def _row_iterator_page_columns(schema, response):
     """Make a generator of all the columns in a page from tabledata.list.

     This enables creating a :class:`pandas.DataFrame` and other
@@ -2197,7 +2200,7 @@ def _rows_page_start(iterator, page, response):
     """
     # Make a (lazy) copy of the page in column-oriented format for use in data
     # science packages.
-    page._columns = _tabledata_list_page_columns(iterator._schema, response)
+    page._columns = _row_iterator_page_columns(iterator._schema, response)

     total_rows = response.get("totalRows")
     if total_rows is not None:
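
With `total_rows` now an explicit constructor argument, each call site supplies the count from the source it trusts: `list_rows()` passes `table.num_rows`, while the query path passes the `totalRows` value reported by jobs.getQueryResults. A minimal sketch of the observable effect, assuming `client`, `table`, and `query_job` already exist:

    # Illustrative: the row count is known before any result pages are
    # fetched, whichever path constructed the iterator.
    iterator = client.list_rows(table)  # total_rows taken from table.num_rows
    print(iterator.total_rows)

    rows = query_job.result()  # total_rows from the getQueryResults response
    print(rows.total_rows)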

tests/unit/job/helpers.py (8 additions, 2 deletions)

@@ -60,6 +60,7 @@ def _make_job_resource(
     endpoint="https://bigquery.googleapis.com",
     job_type="load",
     job_id="a-random-id",
+    location="US",
     project_id="some-project",
     user_email="bq-user@example.com",
 ):
@@ -69,7 +70,11 @@ def _make_job_resource(
         "statistics": {"creationTime": creation_time_ms, job_type: {}},
         "etag": etag,
         "id": "{}:{}".format(project_id, job_id),
-        "jobReference": {"projectId": project_id, "jobId": job_id},
+        "jobReference": {
+            "projectId": project_id,
+            "jobId": job_id,
+            "location": location,
+        },
         "selfLink": "{}/bigquery/v2/projects/{}/jobs/{}".format(
             endpoint, project_id, job_id
         ),
@@ -130,7 +135,7 @@ def _table_ref(self, table_id):

         return TableReference(self.DS_REF, table_id)

-    def _make_resource(self, started=False, ended=False):
+    def _make_resource(self, started=False, ended=False, location="US"):
         self._setUpConstants()
         return _make_job_resource(
             creation_time_ms=int(self.WHEN_TS * 1000),
@@ -144,6 +149,7 @@ def _make_resource(self, started=False, ended=False):
             job_id=self.JOB_ID,
             project_id=self.PROJECT,
             user_email=self.USER_EMAIL,
+            location=location,
         )

     def _verifyInitialReadonlyProperties(self, job):
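
The test-helper change mirrors production: jobs.getQueryResults needs the job's location, so the fake job resources now carry one in `jobReference`. A quick illustration of what the updated helper emits, using the defaults visible in the diff:

    # Illustrative: the fake resource now includes a location, as real API
    # responses do and as the new query-results path requires.
    resource = _make_job_resource(job_type="query", location="EU")
    assert resource["jobReference"] == {
        "projectId": "some-project",
        "jobId": "a-random-id",
        "location": "EU",
    }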

0 commit comments