Commit dde9dc5
1 parent c8b5581 commit dde9dc5

feat: use pyarrow stream compression, if available (#593)

* feat: use pyarrow stream compression, if available
* Remove unnecessary pyarrow version check. Arrow stream compression requires pyarrow >= 1.0.0, but that's already guaranteed by a version pin in setup.py if the bqstorage extra is installed.
* Remove unused pyarrow version parsing in tests
* Only use arrow compression in tests if available
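
In effect, when the installed google-cloud-bigquery-storage package exposes ArrowSerializationOptions, the client now asks the server for LZ4-compressed Arrow record batches. As a rough illustration of what the diffs below do, a manually built read session with the same option might look like this sketch (the project and table names are placeholders, not part of the commit):

from google.cloud import bigquery_storage
from google.cloud.bigquery_storage import ArrowSerializationOptions

client = bigquery_storage.BigQueryReadClient()

requested_session = bigquery_storage.types.ReadSession(
    table="projects/my-project/datasets/my_dataset/tables/my_table",  # placeholder
    data_format=bigquery_storage.types.DataFormat.ARROW,
)
# Ask the server to LZ4-compress the Arrow buffers in each response.
requested_session.read_options.arrow_serialization_options.buffer_compression = (
    ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
)

session = client.create_read_session(
    parent="projects/my-project",
    read_session=requested_session,
    max_stream_count=1,
)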

File tree: 5 files changed, +146 −14 lines

google/cloud/bigquery/_pandas_helpers.py

Lines changed: 13 additions & 0 deletions

@@ -33,6 +33,14 @@
 except ImportError:  # pragma: NO COVER
     pyarrow = None

+try:
+    from google.cloud.bigquery_storage import ArrowSerializationOptions
+except ImportError:
+    _ARROW_COMPRESSION_SUPPORT = False
+else:
+    # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
+    _ARROW_COMPRESSION_SUPPORT = True
+
 from google.cloud.bigquery import schema


@@ -631,6 +639,11 @@ def _download_table_bqstorage(
     for field in selected_fields:
         requested_session.read_options.selected_fields.append(field.name)

+    if _ARROW_COMPRESSION_SUPPORT:
+        requested_session.read_options.arrow_serialization_options.buffer_compression = (
+            ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
+        )
+
     session = bqstorage_client.create_read_session(
         parent="projects/{}".format(project_id),
         read_session=requested_session,
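
Note that no matching decompression code is added on the read path: pyarrow's IPC stream reader undoes LZ4_FRAME buffer compression transparently. A minimal self-contained sketch of that behavior, assuming pyarrow >= 1.0.0 (illustrative only, not part of the commit):

import pyarrow as pa

table = pa.table({"name": ["a", "b"], "score": [1.1, 2.2]})

# Write an IPC stream whose buffers are LZ4-frame compressed.
sink = pa.BufferOutputStream()
options = pa.ipc.IpcWriteOptions(compression="lz4")
with pa.ipc.new_stream(sink, table.schema, options=options) as writer:
    writer.write_table(table)

# The reader detects the compression and decompresses on its own.
roundtripped = pa.ipc.open_stream(sink.getvalue()).read_all()
assert roundtripped.equals(table)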

google/cloud/bigquery/dbapi/cursor.py

Lines changed: 14 additions & 0 deletions

@@ -19,6 +19,14 @@
 import copy
 import logging

+try:
+    from google.cloud.bigquery_storage import ArrowSerializationOptions
+except ImportError:
+    _ARROW_COMPRESSION_SUPPORT = False
+else:
+    # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
+    _ARROW_COMPRESSION_SUPPORT = True
+
 from google.cloud.bigquery import job
 from google.cloud.bigquery.dbapi import _helpers
 from google.cloud.bigquery.dbapi import exceptions
@@ -255,6 +263,12 @@ def _bqstorage_fetch(self, bqstorage_client):
             table=table_reference.to_bqstorage(),
             data_format=bigquery_storage.types.DataFormat.ARROW,
         )
+
+        if _ARROW_COMPRESSION_SUPPORT:
+            requested_session.read_options.arrow_serialization_options.buffer_compression = (
+                ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
+            )
+
         read_session = bqstorage_client.create_read_session(
             parent="projects/{}".format(table_reference.project),
             read_session=requested_session,
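
From the DB-API user's point of view nothing changes at the call site; the compression option is applied inside _bqstorage_fetch. A hedged usage sketch (the clients and the public-dataset query are placeholders, not taken from this commit):

from google.cloud import bigquery
from google.cloud import bigquery_storage
from google.cloud.bigquery import dbapi

bq_client = bigquery.Client()  # assumes ambient credentials/project
bqstorage_client = bigquery_storage.BigQueryReadClient()

# Large results are fetched through the BQ Storage API; with this commit the
# requested Arrow stream is LZ4-compressed whenever the library supports it.
connection = dbapi.connect(client=bq_client, bqstorage_client=bqstorage_client)
cursor = connection.cursor()
cursor.execute(
    "SELECT name, number FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
)
rows = cursor.fetchall()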

tests/system/test_client.py

Lines changed: 0 additions & 8 deletions

@@ -28,7 +28,6 @@

 import psutil
 import pytest
-import pkg_resources

 from google.cloud.bigquery._pandas_helpers import _BIGNUMERIC_SUPPORT
 from . import helpers
@@ -116,13 +115,6 @@
     (TooManyRequests, InternalServerError, ServiceUnavailable)
 )

-PYARROW_MINIMUM_VERSION = pkg_resources.parse_version("0.17.0")
-
-if pyarrow:
-    PYARROW_INSTALLED_VERSION = pkg_resources.get_distribution("pyarrow").parsed_version
-else:
-    PYARROW_INSTALLED_VERSION = None
-
 MTLS_TESTING = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE") == "true"
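
The deleted guard is redundant because the bqstorage extra already enforces the pyarrow floor at install time. A hedged sketch of the kind of setup.py pin the commit message refers to (version specifiers here are illustrative; the repository's actual setup.py may differ):

# setup.py (illustrative excerpt, not the repository's actual file)
extras_require = {
    "bqstorage": [
        "google-cloud-bigquery-storage >= 2.0.0",
        "pyarrow >= 1.0.0",  # guarantees Arrow stream compression support
    ],
}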

tests/unit/job/test_query_pandas.py

Lines changed: 72 additions & 6 deletions

@@ -41,6 +41,22 @@
 from .helpers import _make_job_resource


+@pytest.fixture
+def table_read_options_kwarg():
+    # Create a BigQuery Storage table read options object with pyarrow compression
+    # enabled if a recent-enough version of the google-cloud-bigquery-storage
+    # dependency is installed to support the compression.
+    if not hasattr(bigquery_storage, "ArrowSerializationOptions"):
+        return {}
+
+    read_options = bigquery_storage.ReadSession.TableReadOptions(
+        arrow_serialization_options=bigquery_storage.ArrowSerializationOptions(
+            buffer_compression=bigquery_storage.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
+        )
+    )
+    return {"read_options": read_options}
+
+
 @pytest.mark.parametrize(
     "query,expected",
     (
@@ -82,7 +98,7 @@ def test__contains_order_by(query, expected):
         "SelecT name, age froM table OrdeR\n\t BY other_column;",
     ),
 )
-def test_to_dataframe_bqstorage_preserve_order(query):
+def test_to_dataframe_bqstorage_preserve_order(query, table_read_options_kwarg):
     from google.cloud.bigquery.job import QueryJob as target_class

     job_resource = _make_job_resource(
@@ -123,8 +139,10 @@ def test_to_dataframe_bqstorage_preserve_order(query):
     destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
         **job_resource["configuration"]["query"]["destinationTable"]
     )
-    expected_session = bigquery_storage.types.ReadSession(
-        table=destination_table, data_format=bigquery_storage.types.DataFormat.ARROW,
+    expected_session = bigquery_storage.ReadSession(
+        table=destination_table,
+        data_format=bigquery_storage.DataFormat.ARROW,
+        **table_read_options_kwarg,
     )
     bqstorage_client.create_read_session.assert_called_once_with(
         parent="projects/test-project",
@@ -431,7 +449,7 @@ def test_to_dataframe_ddl_query():
 @pytest.mark.skipif(
     bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
 )
-def test_to_dataframe_bqstorage():
+def test_to_dataframe_bqstorage(table_read_options_kwarg):
     from google.cloud.bigquery.job import QueryJob as target_class

     resource = _make_job_resource(job_type="query", ended=True)
@@ -468,8 +486,10 @@ def test_to_dataframe_bqstorage():
     destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
         **resource["configuration"]["query"]["destinationTable"]
     )
-    expected_session = bigquery_storage.types.ReadSession(
-        table=destination_table, data_format=bigquery_storage.types.DataFormat.ARROW,
+    expected_session = bigquery_storage.ReadSession(
+        table=destination_table,
+        data_format=bigquery_storage.DataFormat.ARROW,
+        **table_read_options_kwarg,
     )
     bqstorage_client.create_read_session.assert_called_once_with(
         parent=f"projects/{client.project}",
@@ -478,6 +498,52 @@ def test_to_dataframe_bqstorage():
     )


+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@pytest.mark.skipif(
+    bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
+)
+def test_to_dataframe_bqstorage_no_pyarrow_compression():
+    from google.cloud.bigquery.job import QueryJob as target_class
+
+    resource = _make_job_resource(job_type="query", ended=True)
+    query_resource = {
+        "jobComplete": True,
+        "jobReference": resource["jobReference"],
+        "totalRows": "4",
+        "schema": {"fields": [{"name": "name", "type": "STRING", "mode": "NULLABLE"}]},
+    }
+    connection = _make_connection(query_resource)
+    client = _make_client(connection=connection)
+    job = target_class.from_api_repr(resource, client)
+    bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
+    session = bigquery_storage.types.ReadSession()
+    session.avro_schema.schema = json.dumps(
+        {
+            "type": "record",
+            "name": "__root__",
+            "fields": [{"name": "name", "type": ["null", "string"]}],
+        }
+    )
+    bqstorage_client.create_read_session.return_value = session
+
+    with mock.patch(
+        "google.cloud.bigquery._pandas_helpers._ARROW_COMPRESSION_SUPPORT", new=False
+    ):
+        job.to_dataframe(bqstorage_client=bqstorage_client)
+
+    destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
+        **resource["configuration"]["query"]["destinationTable"]
+    )
+    expected_session = bigquery_storage.ReadSession(
+        table=destination_table, data_format=bigquery_storage.DataFormat.ARROW,
+    )
+    bqstorage_client.create_read_session.assert_called_once_with(
+        parent=f"projects/{client.project}",
+        read_session=expected_session,
+        max_stream_count=0,
+    )
+
+
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
 def test_to_dataframe_column_dtypes():
     from google.cloud.bigquery.job import QueryJob as target_class
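
The table_read_options_kwarg fixture relies on a convenient pattern: return an empty dict when a capability is missing, so tests can splat the result into a constructor unconditionally. A reduced standalone sketch of the same pattern (names are illustrative, not from the repository):

import pytest


@pytest.fixture
def feature_kwarg():
    # Empty dict when the optional capability is absent; tests splat it either way.
    try:
        from google.cloud.bigquery_storage import ArrowSerializationOptions
    except ImportError:
        return {}
    options = ArrowSerializationOptions(
        buffer_compression=ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
    )
    return {"arrow_serialization_options": options}


def test_accepts_feature_kwarg(feature_kwarg):
    # dict(**feature_kwarg) is a no-op when the fixture returned {}.
    assert isinstance(dict(**feature_kwarg), dict)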

tests/unit/test_dbapi_cursor.py

Lines changed: 47 additions & 0 deletions

@@ -123,6 +123,7 @@ def _mock_job(
             schema=schema,
             num_dml_affected_rows=num_dml_affected_rows,
         )
+        mock_job.destination.project = "P"
         mock_job.destination.to_bqstorage.return_value = (
             "projects/P/datasets/DS/tables/T"
         )
@@ -380,6 +381,52 @@ def test_fetchall_w_bqstorage_client_fetch_error_no_fallback(self):
         # the default client was not used
         mock_client.list_rows.assert_not_called()

+    @unittest.skipIf(
+        bigquery_storage is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
+    def test_fetchall_w_bqstorage_client_no_arrow_compression(self):
+        from google.cloud.bigquery import dbapi
+        from google.cloud.bigquery import table
+
+        # Use unordered data to also test any non-deterministic key order in dicts.
+        row_data = [table.Row([1.2, 1.1], {"bar": 1, "foo": 0})]
+        bqstorage_streamed_rows = [{"bar": _to_pyarrow(1.2), "foo": _to_pyarrow(1.1)}]
+
+        mock_client = self._mock_client(rows=row_data)
+        mock_bqstorage_client = self._mock_bqstorage_client(
+            stream_count=1, rows=bqstorage_streamed_rows,
+        )
+
+        connection = dbapi.connect(
+            client=mock_client, bqstorage_client=mock_bqstorage_client,
+        )
+        cursor = connection.cursor()
+        cursor.execute("SELECT foo, bar FROM some_table")
+
+        with mock.patch(
+            "google.cloud.bigquery.dbapi.cursor._ARROW_COMPRESSION_SUPPORT", new=False
+        ):
+            rows = cursor.fetchall()
+
+        mock_client.list_rows.assert_not_called()  # The default client was not used.
+
+        # Check the BQ Storage session config.
+        expected_session = bigquery_storage.ReadSession(
+            table="projects/P/datasets/DS/tables/T",
+            data_format=bigquery_storage.DataFormat.ARROW,
+        )
+        mock_bqstorage_client.create_read_session.assert_called_once_with(
+            parent="projects/P", read_session=expected_session, max_stream_count=1
+        )
+
+        # Check the data returned.
+        field_value = op.itemgetter(1)
+        sorted_row_data = [sorted(row.items(), key=field_value) for row in rows]
+        expected_row_data = [[("foo", 1.1), ("bar", 1.2)]]
+
+        self.assertEqual(sorted_row_data, expected_row_data)
+
     def test_execute_custom_job_id(self):
         from google.cloud.bigquery.dbapi import connect

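Both test files disable the compression path the same way: they patch the module-level flag rather than fiddling with imports. A minimal sketch, assuming google-cloud-bigquery is importable in the test environment:

from unittest import mock

from google.cloud.bigquery.dbapi import cursor as cursor_mod

# Force the uncompressed code path regardless of which
# google-cloud-bigquery-storage version is installed; undone on exit.
with mock.patch.object(cursor_mod, "_ARROW_COMPRESSION_SUPPORT", new=False):
    assert cursor_mod._ARROW_COMPRESSION_SUPPORT is False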