Commit 1e59083

fix: support ARRAY data type when loading from DataFrame with Parquet (#980)

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

- [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)

Fixes #19 🦕
1 parent aacc521 commit 1e59083

File tree

5 files changed: +483 −45 lines changed


google/cloud/bigquery/_helpers.py

Lines changed: 11 additions & 0 deletions
```diff
@@ -107,6 +107,9 @@ def verify_version(self):
 class PyarrowVersions:
     """Version comparisons for pyarrow package."""
 
+    # https://github.com/googleapis/python-bigquery/issues/781#issuecomment-883497414
+    _PYARROW_BAD_VERSIONS = frozenset([packaging.version.Version("2.0.0")])
+
     def __init__(self):
         self._installed_version = None
 
@@ -126,6 +129,14 @@ def installed_version(self) -> packaging.version.Version:
 
         return self._installed_version
 
+    @property
+    def is_bad_version(self) -> bool:
+        return self.installed_version in self._PYARROW_BAD_VERSIONS
+
+    @property
+    def use_compliant_nested_type(self) -> bool:
+        return self.installed_version.major >= 4
+
     def try_import(self, raise_if_error: bool = False) -> Any:
         """Verify that a recent enough version of pyarrow extra is
         installed.
```
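
These properties centralize pyarrow version gating that `client.py` previously did inline. A minimal sketch of how they might be consulted (assuming the `pyarrow` extra is installed; `PYARROW_VERSIONS` is the module-level `PyarrowVersions` instance that the other diffs in this commit reference):

```python
from google.cloud.bigquery import _helpers

# Module-level PyarrowVersions instance referenced throughout this commit.
versions = _helpers.PYARROW_VERSIONS

# try_import() hands back the pyarrow module; per its use in client.py
# below, it returns None instead of raising when the extra is unusable.
pyarrow = versions.try_import()
if pyarrow is not None:
    print(versions.installed_version)          # e.g. 4.0.1
    print(versions.is_bad_version)             # True only for pyarrow 2.0.0
    print(versions.use_compliant_nested_type)  # True for pyarrow >= 4.0.0
```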

google/cloud/bigquery/_pandas_helpers.py

Lines changed: 39 additions & 11 deletions
```diff
@@ -79,8 +79,8 @@ def _to_wkb(v):
 _PANDAS_DTYPE_TO_BQ = {
     "bool": "BOOLEAN",
     "datetime64[ns, UTC]": "TIMESTAMP",
-    # BigQuery does not support uploading DATETIME values from Parquet files.
-    # See: https://github.com/googleapis/google-cloud-python/issues/9996
+    # TODO: Update to DATETIME in V3
+    # https://github.com/googleapis/python-bigquery/issues/985
     "datetime64[ns]": "TIMESTAMP",
     "float32": "FLOAT",
     "float64": "FLOAT",
@@ -396,7 +396,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
     # column, but it was not found.
     if bq_schema_unused:
         raise ValueError(
-            u"bq_schema contains fields not present in dataframe: {}".format(
+            "bq_schema contains fields not present in dataframe: {}".format(
                 bq_schema_unused
             )
         )
@@ -405,7 +405,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
     # pyarrow, if available.
     if unknown_type_fields:
         if not pyarrow:
-            msg = u"Could not determine the type of columns: {}".format(
+            msg = "Could not determine the type of columns: {}".format(
                 ", ".join(field.name for field in unknown_type_fields)
             )
             warnings.warn(msg)
@@ -444,7 +444,14 @@ def augment_schema(dataframe, current_bq_schema):
             continue
 
         arrow_table = pyarrow.array(dataframe[field.name])
-        detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.type.id)
+
+        if pyarrow.types.is_list(arrow_table.type):
+            # `pyarrow.ListType`
+            detected_mode = "REPEATED"
+            detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.values.type.id)
+        else:
+            detected_mode = field.mode
+            detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.type.id)
 
         if detected_type is None:
             unknown_type_fields.append(field)
@@ -453,15 +460,15 @@
         new_field = schema.SchemaField(
             name=field.name,
             field_type=detected_type,
-            mode=field.mode,
+            mode=detected_mode,
             description=field.description,
             fields=field.fields,
         )
         augmented_schema.append(new_field)
 
     if unknown_type_fields:
         warnings.warn(
-            u"Pyarrow could not determine the type of columns: {}.".format(
+            "Pyarrow could not determine the type of columns: {}.".format(
                 ", ".join(field.name for field in unknown_type_fields)
             )
         )
```
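
The new list branch in `augment_schema` is what maps a DataFrame column of Python lists to a REPEATED field. A standalone sketch of the same detection logic (the column name and data are made up for illustration):

```python
import pandas as pd
import pyarrow

df = pd.DataFrame({"tags": [["a", "b"], ["c"]]})

# Same conversion the helper applies to a column with an unknown type.
arrow_array = pyarrow.array(df["tags"])

if pyarrow.types.is_list(arrow_array.type):
    # List columns become mode="REPEATED"; the BigQuery type is looked up
    # from the list's value type rather than from the list type itself.
    print(arrow_array.type)         # list<item: string>
    print(arrow_array.values.type)  # string -> STRING in BigQuery
```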
```diff
@@ -500,7 +507,7 @@ def dataframe_to_arrow(dataframe, bq_schema):
     extra_fields = bq_field_names - column_and_index_names
     if extra_fields:
         raise ValueError(
-            u"bq_schema contains fields not present in dataframe: {}".format(
+            "bq_schema contains fields not present in dataframe: {}".format(
                 extra_fields
             )
         )
@@ -510,7 +517,7 @@
     missing_fields = column_names - bq_field_names
     if missing_fields:
         raise ValueError(
-            u"bq_schema is missing fields from dataframe: {}".format(missing_fields)
+            "bq_schema is missing fields from dataframe: {}".format(missing_fields)
         )
 
     arrow_arrays = []
@@ -530,7 +537,13 @@
     return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names)
 
 
-def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SNAPPY"):
+def dataframe_to_parquet(
+    dataframe,
+    bq_schema,
+    filepath,
+    parquet_compression="SNAPPY",
+    parquet_use_compliant_nested_type=True,
+):
     """Write dataframe as a Parquet file, according to the desired BQ schema.
 
     This function requires the :mod:`pyarrow` package. Arrow is used as an
@@ -551,14 +564,29 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SNAPPY"):
             The compression codec to use by the ``pyarrow.parquet.write_table``
             serializing method. Defaults to "SNAPPY".
             https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
+        parquet_use_compliant_nested_type (bool):
+            Whether the ``pyarrow.parquet.write_table`` serializing method should write
+            compliant Parquet nested type (lists). Defaults to ``True``.
+            https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types
+            https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
+
+            This argument is ignored for ``pyarrow`` versions earlier than ``4.0.0``.
     """
     pyarrow = _helpers.PYARROW_VERSIONS.try_import(raise_if_error=True)
 
     import pyarrow.parquet
 
+    kwargs = (
+        {"use_compliant_nested_type": parquet_use_compliant_nested_type}
+        if _helpers.PYARROW_VERSIONS.use_compliant_nested_type
+        else {}
+    )
+
     bq_schema = schema._to_schema_fields(bq_schema)
     arrow_table = dataframe_to_arrow(dataframe, bq_schema)
-    pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)
+    pyarrow.parquet.write_table(
+        arrow_table, filepath, compression=parquet_compression, **kwargs,
+    )
 
 
 def _row_iterator_page_to_arrow(page, column_names, arrow_types):
```
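
For pyarrow 4.0.0 and later, the forwarded flag changes how list columns are laid out on disk. A sketch of the effect (the output path is a placeholder):

```python
import pandas as pd
import pyarrow
import pyarrow.parquet

df = pd.DataFrame({"values": [[1, 2], [3]]})
table = pyarrow.Table.from_pandas(df)

# With compliant nested types, list columns use the Parquet spec's
# three-level list layout, which BigQuery's list inference can read
# back as a REPEATED column rather than a nested STRUCT wrapper.
pyarrow.parquet.write_table(
    table, "/tmp/example.parquet", use_compliant_nested_type=True
)
```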

google/cloud/bigquery/client.py

Lines changed: 43 additions & 29 deletions
```diff
@@ -27,19 +27,11 @@
 import json
 import math
 import os
-import packaging.version
 import tempfile
 from typing import Any, BinaryIO, Dict, Iterable, Optional, Sequence, Tuple, Union
 import uuid
 import warnings
 
-try:
-    import pyarrow
-
-    _PYARROW_VERSION = packaging.version.parse(pyarrow.__version__)
-except ImportError:  # pragma: NO COVER
-    pyarrow = None
-
 from google import resumable_media  # type: ignore
 from google.resumable_media.requests import MultipartUpload
 from google.resumable_media.requests import ResumableUpload
@@ -103,6 +95,10 @@
 from google.cloud.bigquery.table import TableListItem
 from google.cloud.bigquery.table import TableReference
 from google.cloud.bigquery.table import RowIterator
+from google.cloud.bigquery.format_options import ParquetOptions
+from google.cloud.bigquery import _helpers
+
+pyarrow = _helpers.PYARROW_VERSIONS.try_import()
 
 
 _DEFAULT_CHUNKSIZE = 100 * 1024 * 1024  # 100 MB
@@ -128,8 +124,6 @@
 # https://github.com/googleapis/python-bigquery/issues/438
 _MIN_GET_QUERY_RESULTS_TIMEOUT = 120
 
-# https://github.com/googleapis/python-bigquery/issues/781#issuecomment-883497414
-_PYARROW_BAD_VERSIONS = frozenset([packaging.version.Version("2.0.0")])
 
 TIMEOUT_HEADER = "X-Server-Timeout"
 
@@ -2469,10 +2463,10 @@ def load_table_from_dataframe(
             They are supported when using the PARQUET source format, but
             due to the way they are encoded in the ``parquet`` file,
             a mismatch with the existing table schema can occur, so
-            100% compatibility cannot be guaranteed for REPEATED fields when
+            REPEATED fields are not properly supported when using ``pyarrow<4.0.0``
             using the parquet format.
 
-            https://github.com/googleapis/python-bigquery/issues/17
+            https://github.com/googleapis/python-bigquery/issues/19
 
         Args:
             dataframe (pandas.DataFrame):
@@ -2519,18 +2513,18 @@
                 :attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are
                 supported.
             parquet_compression (Optional[str]):
-                [Beta] The compression method to use if intermittently
-                serializing ``dataframe`` to a parquet file.
-
-                The argument is directly passed as the ``compression``
-                argument to the underlying ``pyarrow.parquet.write_table()``
-                method (the default value "snappy" gets converted to uppercase).
-                https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
-
-                If the job config schema is missing, the argument is directly
-                passed as the ``compression`` argument to the underlying
-                ``DataFrame.to_parquet()`` method.
-                https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
+                [Beta] The compression method to use if intermittently
+                serializing ``dataframe`` to a parquet file.
+
+                The argument is directly passed as the ``compression``
+                argument to the underlying ``pyarrow.parquet.write_table()``
+                method (the default value "snappy" gets converted to uppercase).
+                https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
+
+                If the job config schema is missing, the argument is directly
+                passed as the ``compression`` argument to the underlying
+                ``DataFrame.to_parquet()`` method.
+                https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
             timeout (Optional[float]):
                 The number of seconds to wait for the underlying HTTP transport
                 before using ``retry``.
@@ -2562,6 +2556,16 @@
         if job_config.source_format is None:
             # default value
             job_config.source_format = job.SourceFormat.PARQUET
+
+        if (
+            job_config.source_format == job.SourceFormat.PARQUET
+            and job_config.parquet_options is None
+        ):
+            parquet_options = ParquetOptions()
+            # default value
+            parquet_options.enable_list_inference = True
+            job_config.parquet_options = parquet_options
+
         if job_config.source_format not in supported_formats:
             raise ValueError(
                 "Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format(
```
```diff
@@ -2628,12 +2632,12 @@
         try:
 
             if job_config.source_format == job.SourceFormat.PARQUET:
-                if _PYARROW_VERSION in _PYARROW_BAD_VERSIONS:
+                if _helpers.PYARROW_VERSIONS.is_bad_version:
                     msg = (
                         "Loading dataframe data in PARQUET format with pyarrow "
-                        f"{_PYARROW_VERSION} can result in data corruption. It is "
-                        "therefore *strongly* advised to use a different pyarrow "
-                        "version or a different source format. "
+                        f"{_helpers.PYARROW_VERSIONS.installed_version} can result in data "
+                        "corruption. It is therefore *strongly* advised to use a "
+                        "different pyarrow version or a different source format. "
                         "See: https://github.com/googleapis/python-bigquery/issues/781"
                     )
                     warnings.warn(msg, category=RuntimeWarning)
@@ -2647,9 +2651,19 @@
                         job_config.schema,
                         tmppath,
                         parquet_compression=parquet_compression,
+                        parquet_use_compliant_nested_type=True,
                     )
                 else:
-                    dataframe.to_parquet(tmppath, compression=parquet_compression)
+                    dataframe.to_parquet(
+                        tmppath,
+                        engine="pyarrow",
+                        compression=parquet_compression,
+                        **(
+                            {"use_compliant_nested_type": True}
+                            if _helpers.PYARROW_VERSIONS.use_compliant_nested_type
+                            else {}
+                        ),
+                    )
 
             else:
```
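
Taken together, the changes let a DataFrame column of Python lists load as an ARRAY (REPEATED) column. An end-to-end sketch (the table ID is a placeholder; full REPEATED support assumes pyarrow >= 4.0.0):

```python
import pandas as pd
from google.cloud import bigquery

client = bigquery.Client()

# "scores" holds Python lists; with this fix it loads as a REPEATED FLOAT column.
df = pd.DataFrame({"id": [1, 2], "scores": [[0.5, 0.9], [1.0]]})

load_job = client.load_table_from_dataframe(
    df, "my-project.my_dataset.my_table"  # placeholder table ID
)
load_job.result()  # wait for the load job to finish
```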