Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd5f7beb

Browse files
committed
Use BigQuery Storage client in Cursor if available
1 parent6560183 commitd5f7beb

File tree

3 files changed

+120
-0
lines changed

3 files changed

+120
-0
lines changed

‎google/cloud/bigquery/dbapi/_helpers.py‎

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
importsix
2525

2626
fromgoogle.cloudimportbigquery
27+
fromgoogle.cloud.bigqueryimporttable
2728
fromgoogle.cloud.bigquery.dbapiimportexceptions
2829

2930

@@ -218,3 +219,22 @@ def array_like(value):
218219
returnisinstance(value,collections_abc.Sequence)andnotisinstance(
219220
value, (six.text_type,six.binary_type,bytearray)
220221
)
222+
223+
224+
defto_bq_table_rows(rows_iterable):
225+
"""Convert table rows to BigQuery table Row instances.
226+
227+
Args:
228+
rows_iterable (Iterable[Mapping]):
229+
An iterable of row data items to convert to ``Row`` instances.
230+
231+
Returns:
232+
Iterable[google.cloud.bigquery.table.Row]
233+
"""
234+
235+
defto_table_row(row):
236+
values=tuple(row.values())
237+
keys_to_index= {key:ifori,keyinenumerate(row.keys())}
238+
returntable.Row(values,keys_to_index)
239+
240+
return (to_table_row(row_data)forrow_datainrows_iterable)

‎google/cloud/bigquery/dbapi/cursor.py‎

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,18 @@
2121
exceptImportError:# Python 2.7
2222
importcollectionsascollections_abc
2323

24+
importlogging
25+
2426
importsix
2527

2628
fromgoogle.cloud.bigqueryimportjob
2729
fromgoogle.cloud.bigquery.dbapiimport_helpers
2830
fromgoogle.cloud.bigquery.dbapiimportexceptions
2931
importgoogle.cloud.exceptions
3032

33+
34+
_LOGGER=logging.getLogger(__name__)
35+
3136
# Per PEP 249: A 7-item sequence containing information describing one result
3237
# column. The first two items (name and type_code) are mandatory, the other
3338
# five are optional and are set to None if no meaningful values can be
@@ -212,13 +217,74 @@ def _try_fetch(self, size=None):
212217

213218
ifself._query_dataisNone:
214219
client=self.connection._client
220+
bqstorage_client=self.connection._bqstorage_client
221+
222+
ifbqstorage_client:
223+
try:
224+
rows_iterable=self._bqstorage_fetch(bqstorage_client)
225+
self._query_data=_helpers.to_bq_table_rows(rows_iterable)
226+
return
227+
exceptgoogle.api_core.exceptions.Forbidden:
228+
# Don't hide errors such as insufficient permissions to create
229+
# a read session, or the API is not enabled. Both of those are
230+
# clearly problems if the developer has explicitly asked for
231+
# BigQuery Storage API support.
232+
raise
233+
exceptgoogle.api_core.exceptions.GoogleAPICallError:
234+
# There is an issue with reading from small anonymous
235+
# query results tables. If such an error occurs, we silence
236+
# it in order to try again with the tabledata.list API.
237+
_LOGGER.debug(
238+
"Error fetching data with BigQuery Storage API, "
239+
"falling back to tabledata.list API."
240+
)
241+
215242
rows_iter=client.list_rows(
216243
self._query_job.destination,
217244
selected_fields=self._query_job._query_results.schema,
218245
page_size=self.arraysize,
219246
)
220247
self._query_data=iter(rows_iter)
221248

249+
def_bqstorage_fetch(self,bqstorage_client):
250+
"""Start fetching data with the BigQuery Storage API.
251+
252+
The method assumes that the data about the relevant query job already
253+
exists internally.
254+
255+
Args:
256+
bqstorage_client(\
257+
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient\
258+
):
259+
A client tha know how to talk to the BigQuery Storage API.
260+
261+
Returns:
262+
Iterable[Mapping]:
263+
A sequence of rows, represented as dictionaries.
264+
"""
265+
# NOTE: Given that BQ storage client instance is passed in, it means
266+
# that bigquery_storage_v1beta1 library is available (no ImportError).
267+
fromgoogle.cloudimportbigquery_storage_v1beta1
268+
269+
table_reference=self._query_job.destination
270+
271+
read_session=bqstorage_client.create_read_session(
272+
table_reference.to_bqstorage(),
273+
"projects/{}".format(table_reference.project),
274+
# only a single stream only, as DB API is not well-suited for multithreading
275+
requested_streams=1,
276+
)
277+
278+
ifnotread_session.streams:
279+
returniter([])# empty table, nothing to read
280+
281+
read_position=bigquery_storage_v1beta1.types.StreamPosition(
282+
stream=read_session.streams[0],
283+
)
284+
read_rows_stream=bqstorage_client.read_rows(read_position)
285+
rows_iterable=read_rows_stream.rows(read_session)
286+
returnrows_iterable
287+
222288
deffetchone(self):
223289
"""Fetch a single row from the results of the last ``execute*()`` call.
224290

‎tests/unit/test_dbapi__helpers.py‎

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
importdatetime
1616
importdecimal
1717
importmath
18+
importoperatorasop
1819
importunittest
1920

2021
importgoogle.cloud._helpers
22+
fromgoogle.cloud.bigqueryimporttable
2123
fromgoogle.cloud.bigquery.dbapiimport_helpers
2224
fromgoogle.cloud.bigquery.dbapiimportexceptions
2325

@@ -185,3 +187,35 @@ def test_to_query_parameters_w_list_dict_param(self):
185187
deftest_to_query_parameters_none_argument(self):
186188
query_parameters=_helpers.to_query_parameters(None)
187189
self.assertEqual(query_parameters, [])
190+
191+
192+
classTestToBqTableRows(unittest.TestCase):
193+
deftest_empty_iterable(self):
194+
rows_iterable=iter([])
195+
result=_helpers.to_bq_table_rows(rows_iterable)
196+
self.assertEqual(list(result), [])
197+
198+
deftest_non_empty_iterable(self):
199+
rows_iterable= [
200+
dict(foo=1.1,bar=1.2,baz=1.3,quux=1.4),
201+
dict(foo=2.1,bar=2.2,baz=2.3,quux=2.4),
202+
]
203+
204+
result=_helpers.to_bq_table_rows(rows_iterable)
205+
206+
rows=list(result)
207+
self.assertEqual(len(rows),2)
208+
209+
row_1,row_2=rows
210+
self.assertIsInstance(row_1,table.Row)
211+
self.assertIsInstance(row_2,table.Row)
212+
213+
field_value=op.itemgetter(1)
214+
215+
items=sorted(row_1.items(),key=field_value)
216+
expected_items= [("foo",1.1), ("bar",1.2), ("baz",1.3), ("quux",1.4)]
217+
self.assertEqual(items,expected_items)
218+
219+
items=sorted(row_2.items(),key=field_value)
220+
expected_items= [("foo",2.1), ("bar",2.2), ("baz",2.3), ("quux",2.4)]
221+
self.assertEqual(items,expected_items)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp