Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite75ff82

Browse files
plamutshollyman
andauthored
feat: use BigQuery Storage client by default (#55)
* feat: use BigQuery Storage client by default* Use BQ Storage API by default in cell magic* Add raise_on_closed helper decorator to DB API* Use BigQuery Storage API by default in DB API* Use BQ Storage v1 stable version in main client* Use BQ Storage v1 stable in BigQuery cell magic* Use BQ Storage v1 stable in DB API* Support both v1 stable and beta1 BQ Storage client* Fix some typos and redundant Beta mark* Use ARROW as data format in DB API cursor* feat: add HOUR support for time partitioning interval (#91)* feat: add HOUR support for time partitioning interval* Bump BQ storage pin to stable version.Co-authored-by: shollyman <shollyman@google.com>
1 parent3869e34 commite75ff82

File tree

18 files changed

+1195
-455
lines changed

18 files changed

+1195
-455
lines changed

‎google/cloud/bigquery/_pandas_helpers.py‎

Lines changed: 59 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
fromsix.movesimportqueue
2323

2424
try:
25-
fromgoogle.cloudimportbigquery_storage_v1beta1
25+
fromgoogle.cloudimportbigquery_storage_v1
2626
exceptImportError:# pragma: NO COVER
27-
bigquery_storage_v1beta1=None
27+
bigquery_storage_v1=None
2828

2929
try:
3030
importpandas
@@ -577,8 +577,19 @@ def _bqstorage_page_to_dataframe(column_names, dtypes, page):
577577
def_download_table_bqstorage_stream(
578578
download_state,bqstorage_client,session,stream,worker_queue,page_to_item
579579
):
580-
position=bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
581-
rowstream=bqstorage_client.read_rows(position).rows(session)
580+
# Passing a BQ Storage client in implies that the BigQuery Storage library
581+
# is available and can be imported.
582+
fromgoogle.cloudimportbigquery_storage_v1beta1
583+
584+
# We want to preserve comaptibility with the v1beta1 BQ Storage clients,
585+
# thus adjust constructing the rowstream if needed.
586+
# The assumption is that the caller provides a BQ Storage `session` that is
587+
# compatible with the version of the BQ Storage client passed in.
588+
ifisinstance(bqstorage_client,bigquery_storage_v1beta1.BigQueryStorageClient):
589+
position=bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
590+
rowstream=bqstorage_client.read_rows(position).rows(session)
591+
else:
592+
rowstream=bqstorage_client.read_rows(stream.name).rows(session)
582593

583594
forpageinrowstream.pages:
584595
ifdownload_state.done:
@@ -610,29 +621,57 @@ def _download_table_bqstorage(
610621
page_to_item=None,
611622
):
612623
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""
624+
625+
# Passing a BQ Storage client in implies that the BigQuery Storage library
626+
# is available and can be imported.
627+
fromgoogle.cloudimportbigquery_storage_v1
628+
fromgoogle.cloudimportbigquery_storage_v1beta1
629+
613630
if"$"intable.table_id:
614631
raiseValueError(
615632
"Reading from a specific partition is not currently supported."
616633
)
617634
if"@"intable.table_id:
618635
raiseValueError("Reading from a specific snapshot is not currently supported.")
619636

620-
read_options=bigquery_storage_v1beta1.types.TableReadOptions()
621-
ifselected_fieldsisnotNone:
622-
forfieldinselected_fields:
623-
read_options.selected_fields.append(field.name)
624-
625-
requested_streams=0
626-
ifpreserve_order:
627-
requested_streams=1
628-
629-
session=bqstorage_client.create_read_session(
630-
table.to_bqstorage(),
631-
"projects/{}".format(project_id),
632-
format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
633-
read_options=read_options,
634-
requested_streams=requested_streams,
635-
)
637+
requested_streams=1ifpreserve_orderelse0
638+
639+
# We want to preserve comaptibility with the v1beta1 BQ Storage clients,
640+
# thus adjust the session creation if needed.
641+
ifisinstance(bqstorage_client,bigquery_storage_v1beta1.BigQueryStorageClient):
642+
warnings.warn(
643+
"Support for BigQuery Storage v1beta1 clients is deprecated, please "
644+
"consider upgrading the client to BigQuery Storage v1 stable version.",
645+
category=DeprecationWarning,
646+
)
647+
read_options=bigquery_storage_v1beta1.types.TableReadOptions()
648+
649+
ifselected_fieldsisnotNone:
650+
forfieldinselected_fields:
651+
read_options.selected_fields.append(field.name)
652+
653+
session=bqstorage_client.create_read_session(
654+
table.to_bqstorage(v1beta1=True),
655+
"projects/{}".format(project_id),
656+
format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
657+
read_options=read_options,
658+
requested_streams=requested_streams,
659+
)
660+
else:
661+
requested_session=bigquery_storage_v1.types.ReadSession(
662+
table=table.to_bqstorage(),
663+
data_format=bigquery_storage_v1.enums.DataFormat.ARROW,
664+
)
665+
ifselected_fieldsisnotNone:
666+
forfieldinselected_fields:
667+
requested_session.read_options.selected_fields.append(field.name)
668+
669+
session=bqstorage_client.create_read_session(
670+
parent="projects/{}".format(project_id),
671+
read_session=requested_session,
672+
max_stream_count=requested_streams,
673+
)
674+
636675
_LOGGER.debug(
637676
"Started reading table '{}.{}.{}' with BQ Storage API session '{}'.".format(
638677
table.project,table.dataset_id,table.table_id,session.name

‎google/cloud/bigquery/client.py‎

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -397,15 +397,23 @@ def dataset(self, dataset_id, project=None):
397397
def_create_bqstorage_client(self):
398398
"""Create a BigQuery Storage API client using this client's credentials.
399399
400+
If a client cannot be created due to missing dependencies, raise a
401+
warning and return ``None``.
402+
400403
Returns:
401-
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient:
404+
Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]:
402405
A BigQuery Storage API client.
403406
"""
404-
fromgoogle.cloudimportbigquery_storage_v1beta1
407+
try:
408+
fromgoogle.cloudimportbigquery_storage_v1
409+
exceptImportError:
410+
warnings.warn(
411+
"Cannot create BigQuery Storage client, the dependency "
412+
"google-cloud-bigquery-storage is not installed."
413+
)
414+
returnNone
405415

406-
returnbigquery_storage_v1beta1.BigQueryStorageClient(
407-
credentials=self._credentials
408-
)
416+
returnbigquery_storage_v1.BigQueryReadClient(credentials=self._credentials)
409417

410418
defcreate_dataset(
411419
self,dataset,exists_ok=False,retry=DEFAULT_RETRY,timeout=None

‎google/cloud/bigquery/dbapi/_helpers.py‎

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
importdatetime
2121
importdecimal
22+
importfunctools
2223
importnumbers
2324

2425
importsix
@@ -233,8 +234,52 @@ def to_bq_table_rows(rows_iterable):
233234
"""
234235

235236
defto_table_row(row):
236-
values=tuple(row.values())
237+
# NOTE: We fetch ARROW values, thus we need to convert them to Python
238+
# objects with as_py().
239+
values=tuple(value.as_py()forvalueinrow.values())
237240
keys_to_index= {key:ifori,keyinenumerate(row.keys())}
238241
returntable.Row(values,keys_to_index)
239242

240243
return (to_table_row(row_data)forrow_datainrows_iterable)
244+
245+
246+
defraise_on_closed(
247+
exc_msg,exc_class=exceptions.ProgrammingError,closed_attr_name="_closed"
248+
):
249+
"""Make public instance methods raise an error if the instance is closed."""
250+
251+
def_raise_on_closed(method):
252+
"""Make a non-static method raise an error if its containing instance is closed.
253+
"""
254+
255+
defwith_closed_check(self,*args,**kwargs):
256+
ifgetattr(self,closed_attr_name):
257+
raiseexc_class(exc_msg)
258+
returnmethod(self,*args,**kwargs)
259+
260+
functools.update_wrapper(with_closed_check,method)
261+
returnwith_closed_check
262+
263+
defdecorate_public_methods(klass):
264+
"""Apply ``_raise_on_closed()`` decorator to public instance methods.
265+
"""
266+
fornameindir(klass):
267+
ifname.startswith("_"):
268+
continue
269+
270+
member=getattr(klass,name)
271+
ifnotcallable(member):
272+
continue
273+
274+
# We need to check for class/static methods directly in the instance
275+
# __dict__, not via the retrieved attribute (`member`), as the
276+
# latter is already a callable *produced* by one of these descriptors.
277+
ifisinstance(klass.__dict__[name], (staticmethod,classmethod)):
278+
continue
279+
280+
member=_raise_on_closed(member)
281+
setattr(klass,name,member)
282+
283+
returnklass
284+
285+
returndecorate_public_methods

‎google/cloud/bigquery/dbapi/connection.py‎

Lines changed: 60 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,30 @@
1414

1515
"""Connection for the Google BigQuery DB-API."""
1616

17+
importweakref
18+
1719
fromgoogle.cloudimportbigquery
1820
fromgoogle.cloud.bigquery.dbapiimportcursor
21+
fromgoogle.cloud.bigquery.dbapiimport_helpers
1922

2023

24+
@_helpers.raise_on_closed("Operating on a closed connection.")
2125
classConnection(object):
2226
"""DB-API Connection to Google BigQuery.
2327
2428
Args:
25-
client (google.cloud.bigquery.Client): A client used to connect to BigQuery.
29+
client (Optional[google.cloud.bigquery.Client]):
30+
A REST API client used to connect to BigQuery. If not passed, a
31+
client is created using default options inferred from the environment.
2632
bqstorage_client(\
27-
Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient]\
33+
Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]\
2834
):
29-
[Beta] An alternative client that uses the faster BigQuery Storage
30-
API to fetch rows from BigQuery. If both clients are given,
31-
``bqstorage_client`` is used first to fetch query results,
32-
with a fallback on ``client``, if necessary.
35+
A client that uses the faster BigQuery Storage API to fetch rows from
36+
BigQuery. If not passed, it is created using the same credentials
37+
as ``client``.
38+
39+
When fetching query results, ``bqstorage_client`` is used first, with
40+
a fallback on ``client``, if necessary.
3341
3442
.. note::
3543
There is a known issue with the BigQuery Storage API with small
@@ -38,39 +46,74 @@ class Connection(object):
3846
https://github.com/googleapis/python-bigquery-storage/issues/2
3947
"""
4048

41-
def__init__(self,client,bqstorage_client=None):
49+
def__init__(self,client=None,bqstorage_client=None):
50+
ifclientisNone:
51+
client=bigquery.Client()
52+
self._owns_client=True
53+
else:
54+
self._owns_client=False
55+
56+
ifbqstorage_clientisNone:
57+
# A warning is already raised by the factory if instantiation fails.
58+
bqstorage_client=client._create_bqstorage_client()
59+
self._owns_bqstorage_client=bqstorage_clientisnotNone
60+
else:
61+
self._owns_bqstorage_client=False
62+
4263
self._client=client
4364
self._bqstorage_client=bqstorage_client
4465

66+
self._closed=False
67+
self._cursors_created=weakref.WeakSet()
68+
4569
defclose(self):
46-
"""No-op."""
70+
"""Close the connection and any cursors created from it.
71+
72+
Any BigQuery clients explicitly passed to the constructor are *not*
73+
closed, only those created by the connection instance itself.
74+
"""
75+
self._closed=True
76+
77+
ifself._owns_client:
78+
self._client.close()
79+
80+
ifself._owns_bqstorage_client:
81+
# There is no close() on the BQ Storage client itself.
82+
self._bqstorage_client.transport.channel.close()
83+
84+
forcursor_inself._cursors_created:
85+
cursor_.close()
4786

4887
defcommit(self):
49-
"""No-op."""
88+
"""No-op, but for consistency raise an error if connection is closed."""
5089

5190
defcursor(self):
5291
"""Return a new cursor object.
5392
5493
Returns:
5594
google.cloud.bigquery.dbapi.Cursor: A DB-API cursor that uses this connection.
5695
"""
57-
returncursor.Cursor(self)
96+
new_cursor=cursor.Cursor(self)
97+
self._cursors_created.add(new_cursor)
98+
returnnew_cursor
5899

59100

60101
defconnect(client=None,bqstorage_client=None):
61102
"""Construct a DB-API connection to Google BigQuery.
62103
63104
Args:
64105
client (Optional[google.cloud.bigquery.Client]):
65-
A client used to connect to BigQuery. If not passed, a client is
66-
created using default options inferred from the environment.
106+
AREST APIclient used to connect to BigQuery. If not passed, a
107+
client iscreated using default options inferred from the environment.
67108
bqstorage_client(\
68-
Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient]\
109+
Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]\
69110
):
70-
[Beta] An alternative client that uses the faster BigQuery Storage
71-
API to fetch rows from BigQuery. If both clients are given,
72-
``bqstorage_client`` is used first to fetch query results,
73-
with a fallback on ``client``, if necessary.
111+
A client that uses the faster BigQuery Storage API to fetch rows from
112+
BigQuery. If not passed, it is created using the same credentials
113+
as ``client``.
114+
115+
When fetching query results, ``bqstorage_client`` is used first, with
116+
a fallback on ``client``, if necessary.
74117
75118
.. note::
76119
There is a known issue with the BigQuery Storage API with small
@@ -81,6 +124,4 @@ def connect(client=None, bqstorage_client=None):
81124
Returns:
82125
google.cloud.bigquery.dbapi.Connection: A new DB-API connection to BigQuery.
83126
"""
84-
ifclientisNone:
85-
client=bigquery.Client()
86127
returnConnection(client,bqstorage_client)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp