Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0f3a341

Browse files
committed
feat: add default LoadJobConfig to Client
1 parentaa0fa02 commit0f3a341

File tree

5 files changed

+621
-56
lines changed

5 files changed

+621
-56
lines changed

‎google/cloud/bigquery/client.py‎

Lines changed: 71 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,9 @@ class Client(ClientWithProject):
210210
default_query_job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
211211
Default ``QueryJobConfig``.
212212
Will be merged into job configs passed into the ``query`` method.
213+
default_load_job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
214+
Default ``LoadJobConfig``.
215+
Will be merged into job configs passed into the ``load_table_*`` methods.
213216
client_info (Optional[google.api_core.client_info.ClientInfo]):
214217
The client info used to send a user-agent string along with API
215218
requests. If ``None``, then default info will be used. Generally,
@@ -235,6 +238,7 @@ def __init__(
235238
_http=None,
236239
location=None,
237240
default_query_job_config=None,
241+
default_load_job_config=None,
238242
client_info=None,
239243
client_options=None,
240244
)->None:
@@ -260,6 +264,7 @@ def __init__(
260264
self._connection=Connection(self,**kw_args)
261265
self._location=location
262266
self._default_query_job_config=copy.deepcopy(default_query_job_config)
267+
self._default_load_job_config=copy.deepcopy(default_load_job_config)
263268

264269
@property
265270
deflocation(self):
@@ -277,6 +282,17 @@ def default_query_job_config(self):
277282
defdefault_query_job_config(self,value:QueryJobConfig):
278283
self._default_query_job_config=copy.deepcopy(value)
279284

285+
@property
286+
defdefault_load_job_config(self):
287+
"""Default ``LoadJobConfig``.
288+
Will be merged into job configs passed into the ``load_table_*`` methods.
289+
"""
290+
returnself._default_load_job_config
291+
292+
@default_load_job_config.setter
293+
defdefault_load_job_config(self,value:LoadJobConfig):
294+
self._default_load_job_config=copy.deepcopy(value)
295+
280296
defclose(self):
281297
"""Close the underlying transport objects, releasing system resources.
282298
@@ -2330,8 +2346,8 @@ def load_table_from_uri(
23302346
23312347
Raises:
23322348
TypeError:
2333-
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
2334-
class.
2349+
If ``job_config`` is not an instance of
2350+
:class:`~google.cloud.bigquery.job.LoadJobConfig`class.
23352351
"""
23362352
job_id=_make_job_id(job_id,job_id_prefix)
23372353

@@ -2348,11 +2364,14 @@ def load_table_from_uri(
23482364

23492365
destination=_table_arg_to_table_ref(destination,default_project=self.project)
23502366

2351-
ifjob_config:
2352-
job_config=copy.deepcopy(job_config)
2353-
_verify_job_config_type(job_config,google.cloud.bigquery.job.LoadJobConfig)
2367+
ifjob_configisnotNone:
2368+
_verify_job_config_type(job_config,LoadJobConfig)
2369+
else:
2370+
job_config=job.LoadJobConfig()
23542371

2355-
load_job=job.LoadJob(job_ref,source_uris,destination,self,job_config)
2372+
new_job_config=job_config._fill_from_default(self._default_load_job_config)
2373+
2374+
load_job=job.LoadJob(job_ref,source_uris,destination,self,new_job_config)
23562375
load_job._begin(retry=retry,timeout=timeout)
23572376

23582377
returnload_job
@@ -2424,8 +2443,8 @@ def load_table_from_file(
24242443
mode.
24252444
24262445
TypeError:
2427-
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
2428-
class.
2446+
If ``job_config`` is not an instance of
2447+
:class:`~google.cloud.bigquery.job.LoadJobConfig`class.
24292448
"""
24302449
job_id=_make_job_id(job_id,job_id_prefix)
24312450

@@ -2437,10 +2456,15 @@ def load_table_from_file(
24372456

24382457
destination=_table_arg_to_table_ref(destination,default_project=self.project)
24392458
job_ref=job._JobReference(job_id,project=project,location=location)
2440-
ifjob_config:
2441-
job_config=copy.deepcopy(job_config)
2442-
_verify_job_config_type(job_config,google.cloud.bigquery.job.LoadJobConfig)
2443-
load_job=job.LoadJob(job_ref,None,destination,self,job_config)
2459+
2460+
ifjob_configisnotNone:
2461+
_verify_job_config_type(job_config,LoadJobConfig)
2462+
else:
2463+
job_config=job.LoadJobConfig()
2464+
2465+
new_job_config=job_config._fill_from_default(self._default_load_job_config)
2466+
2467+
load_job=job.LoadJob(job_ref,None,destination,self,new_job_config)
24442468
job_resource=load_job.to_api_repr()
24452469

24462470
ifrewind:
@@ -2564,43 +2588,40 @@ def load_table_from_dataframe(
25642588
If a usable parquet engine cannot be found. This method
25652589
requires :mod:`pyarrow` to be installed.
25662590
TypeError:
2567-
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
2568-
class.
2591+
If ``job_config`` is not an instance of
2592+
:class:`~google.cloud.bigquery.job.LoadJobConfig`class.
25692593
"""
25702594
job_id=_make_job_id(job_id,job_id_prefix)
25712595

2572-
ifjob_config:
2573-
_verify_job_config_type(job_config,google.cloud.bigquery.job.LoadJobConfig)
2574-
# Make a copy so that the job config isn't modified in-place.
2575-
job_config_properties=copy.deepcopy(job_config._properties)
2576-
job_config=job.LoadJobConfig()
2577-
job_config._properties=job_config_properties
2578-
2596+
ifjob_configisnotNone:
2597+
_verify_job_config_type(job_config,LoadJobConfig)
25792598
else:
25802599
job_config=job.LoadJobConfig()
25812600

2601+
new_job_config=job_config._fill_from_default(self._default_load_job_config)
2602+
25822603
supported_formats= {job.SourceFormat.CSV,job.SourceFormat.PARQUET}
2583-
ifjob_config.source_formatisNone:
2604+
ifnew_job_config.source_formatisNone:
25842605
# default value
2585-
job_config.source_format=job.SourceFormat.PARQUET
2606+
new_job_config.source_format=job.SourceFormat.PARQUET
25862607

25872608
if (
2588-
job_config.source_format==job.SourceFormat.PARQUET
2589-
andjob_config.parquet_optionsisNone
2609+
new_job_config.source_format==job.SourceFormat.PARQUET
2610+
andnew_job_config.parquet_optionsisNone
25902611
):
25912612
parquet_options=ParquetOptions()
25922613
# default value
25932614
parquet_options.enable_list_inference=True
2594-
job_config.parquet_options=parquet_options
2615+
new_job_config.parquet_options=parquet_options
25952616

2596-
ifjob_config.source_formatnotinsupported_formats:
2617+
ifnew_job_config.source_formatnotinsupported_formats:
25972618
raiseValueError(
25982619
"Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format(
2599-
job_config.source_format
2620+
new_job_config.source_format
26002621
)
26012622
)
26022623

2603-
ifpyarrowisNoneandjob_config.source_format==job.SourceFormat.PARQUET:
2624+
ifpyarrowisNoneandnew_job_config.source_format==job.SourceFormat.PARQUET:
26042625
# pyarrow is now the only supported parquet engine.
26052626
raiseValueError("This method requires pyarrow to be installed")
26062627

@@ -2611,8 +2632,8 @@ def load_table_from_dataframe(
26112632
# schema, and check if dataframe schema is compatible with it - except
26122633
# for WRITE_TRUNCATE jobs, the existing schema does not matter then.
26132634
if (
2614-
notjob_config.schema
2615-
andjob_config.write_disposition!=job.WriteDisposition.WRITE_TRUNCATE
2635+
notnew_job_config.schema
2636+
andnew_job_config.write_disposition!=job.WriteDisposition.WRITE_TRUNCATE
26162637
):
26172638
try:
26182639
table=self.get_table(destination)
@@ -2623,7 +2644,7 @@ def load_table_from_dataframe(
26232644
name
26242645
forname,_in_pandas_helpers.list_columns_and_indexes(dataframe)
26252646
)
2626-
job_config.schema= [
2647+
new_job_config.schema= [
26272648
# Field description and policy tags are not needed to
26282649
# serialize a data frame.
26292650
SchemaField(
@@ -2637,11 +2658,11 @@ def load_table_from_dataframe(
26372658
iffield.nameincolumns_and_indexes
26382659
]
26392660

2640-
job_config.schema=_pandas_helpers.dataframe_to_bq_schema(
2641-
dataframe,job_config.schema
2661+
new_job_config.schema=_pandas_helpers.dataframe_to_bq_schema(
2662+
dataframe,new_job_config.schema
26422663
)
26432664

2644-
ifnotjob_config.schema:
2665+
ifnotnew_job_config.schema:
26452666
# the schema could not be fully detected
26462667
warnings.warn(
26472668
"Schema could not be detected for all columns. Loading from a "
@@ -2652,13 +2673,13 @@ def load_table_from_dataframe(
26522673
)
26532674

26542675
tmpfd,tmppath=tempfile.mkstemp(
2655-
suffix="_job_{}.{}".format(job_id[:8],job_config.source_format.lower())
2676+
suffix="_job_{}.{}".format(job_id[:8],new_job_config.source_format.lower())
26562677
)
26572678
os.close(tmpfd)
26582679

26592680
try:
26602681

2661-
ifjob_config.source_format==job.SourceFormat.PARQUET:
2682+
ifnew_job_config.source_format==job.SourceFormat.PARQUET:
26622683
if_PYARROW_VERSIONin_PYARROW_BAD_VERSIONS:
26632684
msg= (
26642685
"Loading dataframe data in PARQUET format with pyarrow "
@@ -2669,13 +2690,13 @@ def load_table_from_dataframe(
26692690
)
26702691
warnings.warn(msg,category=RuntimeWarning)
26712692

2672-
ifjob_config.schema:
2693+
ifnew_job_config.schema:
26732694
ifparquet_compression=="snappy":# adjust the default value
26742695
parquet_compression=parquet_compression.upper()
26752696

26762697
_pandas_helpers.dataframe_to_parquet(
26772698
dataframe,
2678-
job_config.schema,
2699+
new_job_config.schema,
26792700
tmppath,
26802701
parquet_compression=parquet_compression,
26812702
parquet_use_compliant_nested_type=True,
@@ -2715,7 +2736,7 @@ def load_table_from_dataframe(
27152736
job_id_prefix=job_id_prefix,
27162737
location=location,
27172738
project=project,
2718-
job_config=job_config,
2739+
job_config=new_job_config,
27192740
timeout=timeout,
27202741
)
27212742

@@ -2791,22 +2812,22 @@ def load_table_from_json(
27912812
27922813
Raises:
27932814
TypeError:
2794-
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
2795-
class.
2815+
If ``job_config`` is not an instance of
2816+
:class:`~google.cloud.bigquery.job.LoadJobConfig`class.
27962817
"""
27972818
job_id=_make_job_id(job_id,job_id_prefix)
27982819

2799-
ifjob_config:
2800-
_verify_job_config_type(job_config,google.cloud.bigquery.job.LoadJobConfig)
2801-
# Make a copy so that the job config isn't modified in-place.
2802-
job_config=copy.deepcopy(job_config)
2820+
ifjob_configisnotNone:
2821+
_verify_job_config_type(job_config,LoadJobConfig)
28032822
else:
28042823
job_config=job.LoadJobConfig()
28052824

2806-
job_config.source_format=job.SourceFormat.NEWLINE_DELIMITED_JSON
2825+
new_job_config=job_config._fill_from_default(self._default_load_job_config)
2826+
2827+
new_job_config.source_format=job.SourceFormat.NEWLINE_DELIMITED_JSON
28072828

2808-
ifjob_config.schemaisNone:
2809-
job_config.autodetect=True
2829+
ifnew_job_config.schemaisNone:
2830+
new_job_config.autodetect=True
28102831

28112832
ifprojectisNone:
28122833
project=self.project
@@ -2828,7 +2849,7 @@ def load_table_from_json(
28282849
job_id_prefix=job_id_prefix,
28292850
location=location,
28302851
project=project,
2831-
job_config=job_config,
2852+
job_config=new_job_config,
28322853
timeout=timeout,
28332854
)
28342855

‎google/cloud/bigquery/job/base.py‎

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ def to_api_repr(self) -> dict:
269269
"""
270270
returncopy.deepcopy(self._properties)
271271

272-
def_fill_from_default(self,default_job_config):
272+
def_fill_from_default(self,default_job_config=None):
273273
"""Merge this job config with a default job config.
274274
275275
The keys in this object take precedence over the keys in the default
@@ -283,6 +283,10 @@ def _fill_from_default(self, default_job_config):
283283
Returns:
284284
google.cloud.bigquery.job._JobConfig: A new (merged) job config.
285285
"""
286+
ifnotdefault_job_config:
287+
new_job_config=copy.deepcopy(self)
288+
returnnew_job_config
289+
286290
ifself._job_type!=default_job_config._job_type:
287291
raiseTypeError(
288292
"attempted to merge two incompatible job types: "

‎tests/system/test_client.py‎

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2319,7 +2319,7 @@ def _table_exists(t):
23192319
returnFalse
23202320

23212321

2322-
deftest_dbapi_create_view(dataset_id):
2322+
deftest_dbapi_create_view(dataset_id:str):
23232323

23242324
query=f"""
23252325
CREATE VIEW{dataset_id}.dbapi_create_view
@@ -2332,7 +2332,7 @@ def test_dbapi_create_view(dataset_id):
23322332
assertConfig.CURSOR.rowcount==0,"expected 0 rows"
23332333

23342334

2335-
deftest_parameterized_types_round_trip(dataset_id):
2335+
deftest_parameterized_types_round_trip(dataset_id:str):
23362336
client=Config.CLIENT
23372337
table_id=f"{dataset_id}.test_parameterized_types_round_trip"
23382338
fields= (
@@ -2358,7 +2358,7 @@ def test_parameterized_types_round_trip(dataset_id):
23582358
asserttuple(s._key()[:2]forsintable2.schema)==fields
23592359

23602360

2361-
deftest_table_snapshots(dataset_id):
2361+
deftest_table_snapshots(dataset_id:str):
23622362
fromgoogle.cloud.bigqueryimportCopyJobConfig
23632363
fromgoogle.cloud.bigqueryimportOperationType
23642364

@@ -2429,7 +2429,7 @@ def test_table_snapshots(dataset_id):
24292429
assertrows== [(1,"one"), (2,"two")]
24302430

24312431

2432-
deftest_table_clones(dataset_id):
2432+
deftest_table_clones(dataset_id:str):
24332433
fromgoogle.cloud.bigqueryimportCopyJobConfig
24342434
fromgoogle.cloud.bigqueryimportOperationType
24352435

‎tests/unit/job/test_base.py‎

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1104,7 +1104,7 @@ def test_ctor_with_unknown_property_raises_error(self):
11041104
config=self._make_one()
11051105
config.wrong_name=None
11061106

1107-
deftest_fill_from_default(self):
1107+
deftest_fill_query_job_config_from_default(self):
11081108
fromgoogle.cloud.bigqueryimportQueryJobConfig
11091109

11101110
job_config=QueryJobConfig()
@@ -1120,6 +1120,22 @@ def test_fill_from_default(self):
11201120
self.assertTrue(final_job_config.use_query_cache)
11211121
self.assertEqual(final_job_config.maximum_bytes_billed,1000)
11221122

1123+
deftest_fill_load_job_from_default(self):
1124+
fromgoogle.cloud.bigqueryimportLoadJobConfig
1125+
1126+
job_config=LoadJobConfig()
1127+
job_config.create_session=True
1128+
job_config.encoding="UTF-8"
1129+
1130+
default_job_config=LoadJobConfig()
1131+
default_job_config.ignore_unknown_values=True
1132+
default_job_config.encoding="ISO-8859-1"
1133+
1134+
final_job_config=job_config._fill_from_default(default_job_config)
1135+
self.assertTrue(final_job_config.create_session)
1136+
self.assertTrue(final_job_config.ignore_unknown_values)
1137+
self.assertEqual(final_job_config.encoding,"UTF-8")
1138+
11231139
deftest_fill_from_default_conflict(self):
11241140
fromgoogle.cloud.bigqueryimportQueryJobConfig
11251141

@@ -1132,6 +1148,17 @@ def test_fill_from_default_conflict(self):
11321148
withself.assertRaises(TypeError):
11331149
basic_job_config._fill_from_default(conflicting_job_config)
11341150

1151+
deftest_fill_from_empty_default_conflict(self):
1152+
fromgoogle.cloud.bigqueryimportQueryJobConfig
1153+
1154+
job_config=QueryJobConfig()
1155+
job_config.dry_run=True
1156+
job_config.maximum_bytes_billed=1000
1157+
1158+
final_job_config=job_config._fill_from_default(default_job_config=None)
1159+
self.assertTrue(final_job_config.dry_run)
1160+
self.assertEqual(final_job_config.maximum_bytes_billed,1000)
1161+
11351162
@mock.patch("google.cloud.bigquery._helpers._get_sub_prop")
11361163
deftest__get_sub_prop_wo_default(self,_get_sub_prop):
11371164
job_config=self._make_one()

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp