Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit519d99c

Browse files
author
Jim Fulton
authored
feat: retry failed query jobs inresult() (#837)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:- [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea- [x] Ensure the tests and linter pass- [x] Code coverage does not decrease (if any source code was changed)- [x] Appropriate docs were updated (if necessary)Fixes#539 🦕Previously, we only retried failed API requests. Now, we retry failed jobs (according to the predicate of the `Retry` object passed to `job.result()`).
1 parent93d15e2 commit519d99c

File tree

6 files changed

+518
-39
lines changed

6 files changed

+518
-39
lines changed

‎google/cloud/bigquery/client.py‎

Lines changed: 81 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@
8686
fromgoogle.cloud.bigquery.modelimportModelReference
8787
fromgoogle.cloud.bigquery.modelimport_model_arg_to_model_ref
8888
fromgoogle.cloud.bigquery.queryimport_QueryResults
89-
fromgoogle.cloud.bigquery.retryimportDEFAULT_RETRY
89+
fromgoogle.cloud.bigquery.retryimportDEFAULT_RETRY,DEFAULT_JOB_RETRY
9090
fromgoogle.cloud.bigquery.routineimportRoutine
9191
fromgoogle.cloud.bigquery.routineimportRoutineReference
9292
fromgoogle.cloud.bigquery.schemaimportSchemaField
@@ -3163,6 +3163,7 @@ def query(
31633163
project:str=None,
31643164
retry:retries.Retry=DEFAULT_RETRY,
31653165
timeout:float=None,
3166+
job_retry:retries.Retry=DEFAULT_JOB_RETRY,
31663167
)->job.QueryJob:
31673168
"""Run a SQL query.
31683169
@@ -3192,30 +3193,59 @@ def query(
31923193
Project ID of the project of where to run the job. Defaults
31933194
to the client's project.
31943195
retry (Optional[google.api_core.retry.Retry]):
3195-
How to retry the RPC.
3196+
How to retry the RPC. This only applies to making RPC
3197+
calls. It isn't used to retry failed jobs. This has
3198+
a reasonable default that should only be overridden
3199+
with care.
31963200
timeout (Optional[float]):
31973201
The number of seconds to wait for the underlying HTTP transport
31983202
before using ``retry``.
3203+
job_retry (Optional[google.api_core.retry.Retry]):
3204+
How to retry failed jobs. The default retries
3205+
rate-limit-exceeded errors. Passing ``None`` disables
3206+
job retry.
3207+
3208+
Not all jobs can be retried. If ``job_id`` is
3209+
provided, then the job returned by the query will not
3210+
be retryable, and an exception will be raised if a
3211+
non-``None`` (and non-default) value for ``job_retry``
3212+
is also provided.
3213+
3214+
Note that errors aren't detected until ``result()`` is
3215+
called on the job returned. The ``job_retry``
3216+
specified here becomes the default ``job_retry`` for
3217+
``result()``, where it can also be specified.
31993218
32003219
Returns:
32013220
google.cloud.bigquery.job.QueryJob: A new query job instance.
32023221
32033222
Raises:
32043223
TypeError:
3205-
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.QueryJobConfig`
3206-
class.
3224+
If ``job_config`` is not an instance of
3225+
:class:`~google.cloud.bigquery.job.QueryJobConfig`
3226+
class, or if both ``job_id`` and non-``None`` non-default
3227+
``job_retry`` are provided.
32073228
"""
32083229
job_id_given=job_idisnotNone
3209-
job_id=_make_job_id(job_id,job_id_prefix)
3230+
if (
3231+
job_id_given
3232+
andjob_retryisnotNone
3233+
andjob_retryisnotDEFAULT_JOB_RETRY
3234+
):
3235+
raiseTypeError(
3236+
"`job_retry` was provided, but the returned job is"
3237+
" not retryable, because a custom `job_id` was"
3238+
" provided."
3239+
)
3240+
3241+
job_id_save=job_id
32103242

32113243
ifprojectisNone:
32123244
project=self.project
32133245

32143246
iflocationisNone:
32153247
location=self.location
32163248

3217-
job_config=copy.deepcopy(job_config)
3218-
32193249
ifself._default_query_job_config:
32203250
ifjob_config:
32213251
_verify_job_config_type(
@@ -3225,6 +3255,8 @@ def query(
32253255
# that is in the default,
32263256
# should be filled in with the default
32273257
# the incoming therefore has precedence
3258+
#
3259+
# Note that _fill_from_default doesn't mutate the receiver
32283260
job_config=job_config._fill_from_default(
32293261
self._default_query_job_config
32303262
)
@@ -3233,34 +3265,54 @@ def query(
32333265
self._default_query_job_config,
32343266
google.cloud.bigquery.job.QueryJobConfig,
32353267
)
3236-
job_config=copy.deepcopy(self._default_query_job_config)
3268+
job_config=self._default_query_job_config
32373269

3238-
job_ref=job._JobReference(job_id,project=project,location=location)
3239-
query_job=job.QueryJob(job_ref,query,client=self,job_config=job_config)
3270+
# Note that we haven't modified the original job_config (or
3271+
# _default_query_job_config) up to this point.
3272+
job_config_save=job_config
32403273

3241-
try:
3242-
query_job._begin(retry=retry,timeout=timeout)
3243-
exceptcore_exceptions.Conflictascreate_exc:
3244-
# The thought is if someone is providing their own job IDs and they get
3245-
# their job ID generation wrong, this could end up returning results for
3246-
# the wrong query. We thus only try to recover if job ID was not given.
3247-
ifjob_id_given:
3248-
raisecreate_exc
3274+
defdo_query():
3275+
# Make a copy now, so that original doesn't get changed by the process
3276+
# below and to facilitate retry
3277+
job_config=copy.deepcopy(job_config_save)
3278+
3279+
job_id=_make_job_id(job_id_save,job_id_prefix)
3280+
job_ref=job._JobReference(job_id,project=project,location=location)
3281+
query_job=job.QueryJob(job_ref,query,client=self,job_config=job_config)
32493282

32503283
try:
3251-
query_job=self.get_job(
3252-
job_id,
3253-
project=project,
3254-
location=location,
3255-
retry=retry,
3256-
timeout=timeout,
3257-
)
3258-
exceptcore_exceptions.GoogleAPIError:# (includes RetryError)
3259-
raisecreate_exc
3284+
query_job._begin(retry=retry,timeout=timeout)
3285+
exceptcore_exceptions.Conflictascreate_exc:
3286+
# The thought is if someone is providing their own job IDs and they get
3287+
# their job ID generation wrong, this could end up returning results for
3288+
# the wrong query. We thus only try to recover if job ID was not given.
3289+
ifjob_id_given:
3290+
raisecreate_exc
3291+
3292+
try:
3293+
query_job=self.get_job(
3294+
job_id,
3295+
project=project,
3296+
location=location,
3297+
retry=retry,
3298+
timeout=timeout,
3299+
)
3300+
exceptcore_exceptions.GoogleAPIError:# (includes RetryError)
3301+
raisecreate_exc
3302+
else:
3303+
returnquery_job
32603304
else:
32613305
returnquery_job
3262-
else:
3263-
returnquery_job
3306+
3307+
future=do_query()
3308+
# The future might be in a failed state now, but if it's
3309+
# unrecoverable, we'll find out when we ask for it's result, at which
3310+
# point, we may retry.
3311+
ifnotjob_id_given:
3312+
future._retry_do_query=do_query# in case we have to retry later
3313+
future._job_retry=job_retry
3314+
3315+
returnfuture
32643316

32653317
definsert_rows(
32663318
self,

‎google/cloud/bigquery/job/query.py‎

Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
fromgoogle.cloud.bigquery.queryimportScalarQueryParameter
3737
fromgoogle.cloud.bigquery.queryimportStructQueryParameter
3838
fromgoogle.cloud.bigquery.queryimportUDFResource
39-
fromgoogle.cloud.bigquery.retryimportDEFAULT_RETRY
39+
fromgoogle.cloud.bigquery.retryimportDEFAULT_RETRY,DEFAULT_JOB_RETRY
4040
fromgoogle.cloud.bigquery.routineimportRoutineReference
4141
fromgoogle.cloud.bigquery.tableimport_EmptyRowIterator
4242
fromgoogle.cloud.bigquery.tableimportRangePartitioning
@@ -1260,6 +1260,7 @@ def result(
12601260
retry:"retries.Retry"=DEFAULT_RETRY,
12611261
timeout:float=None,
12621262
start_index:int=None,
1263+
job_retry:"retries.Retry"=DEFAULT_JOB_RETRY,
12631264
)->Union["RowIterator",_EmptyRowIterator]:
12641265
"""Start the job and wait for it to complete and get the result.
12651266
@@ -1270,16 +1271,30 @@ def result(
12701271
max_results (Optional[int]):
12711272
The maximum total number of rows from this request.
12721273
retry (Optional[google.api_core.retry.Retry]):
1273-
How to retry the call that retrieves rows. If the job state is
1274-
``DONE``, retrying is aborted early even if the results are not
1275-
available, as this will not change anymore.
1274+
How to retry the call that retrieves rows. This only
1275+
applies to making RPC calls. It isn't used to retry
1276+
failed jobs. This has a reasonable default that
1277+
should only be overridden with care. If the job state
1278+
is ``DONE``, retrying is aborted early even if the
1279+
results are not available, as this will not change
1280+
anymore.
12761281
timeout (Optional[float]):
12771282
The number of seconds to wait for the underlying HTTP transport
12781283
before using ``retry``.
12791284
If multiple requests are made under the hood, ``timeout``
12801285
applies to each individual request.
12811286
start_index (Optional[int]):
12821287
The zero-based index of the starting row to read.
1288+
job_retry (Optional[google.api_core.retry.Retry]):
1289+
How to retry failed jobs. The default retries
1290+
rate-limit-exceeded errors. Passing ``None`` disables
1291+
job retry.
1292+
1293+
Not all jobs can be retried. If ``job_id`` was
1294+
provided to the query that created this job, then the
1295+
job returned by the query will not be retryable, and
1296+
an exception will be raised if non-``None``
1297+
non-default ``job_retry`` is also provided.
12831298
12841299
Returns:
12851300
google.cloud.bigquery.table.RowIterator:
@@ -1295,17 +1310,66 @@ def result(
12951310
12961311
Raises:
12971312
google.cloud.exceptions.GoogleAPICallError:
1298-
If the job failed.
1313+
If the job failed and retries aren't successful.
12991314
concurrent.futures.TimeoutError:
13001315
If the job did not complete in the given timeout.
1316+
TypeError:
1317+
If Non-``None`` and non-default ``job_retry`` is
1318+
provided and the job is not retryable.
13011319
"""
13021320
try:
1303-
super(QueryJob,self).result(retry=retry,timeout=timeout)
1321+
retry_do_query=getattr(self,"_retry_do_query",None)
1322+
ifretry_do_queryisnotNone:
1323+
ifjob_retryisDEFAULT_JOB_RETRY:
1324+
job_retry=self._job_retry
1325+
else:
1326+
ifjob_retryisnotNoneandjob_retryisnotDEFAULT_JOB_RETRY:
1327+
raiseTypeError(
1328+
"`job_retry` was provided, but this job is"
1329+
" not retryable, because a custom `job_id` was"
1330+
" provided to the query that created this job."
1331+
)
1332+
1333+
first=True
1334+
1335+
defdo_get_result():
1336+
nonlocalfirst
1337+
1338+
iffirst:
1339+
first=False
1340+
else:
1341+
# Note that we won't get here if retry_do_query is
1342+
# None, because we won't use a retry.
1343+
1344+
# The orinal job is failed. Create a new one.
1345+
job=retry_do_query()
1346+
1347+
# If it's already failed, we might as well stop:
1348+
ifjob.done()andjob.exception()isnotNone:
1349+
raisejob.exception()
1350+
1351+
# Become the new job:
1352+
self.__dict__.clear()
1353+
self.__dict__.update(job.__dict__)
1354+
1355+
# This shouldn't be necessary, because once we have a good
1356+
# job, it should stay good,and we shouldn't have to retry.
1357+
# But let's be paranoid. :)
1358+
self._retry_do_query=retry_do_query
1359+
self._job_retry=job_retry
1360+
1361+
super(QueryJob,self).result(retry=retry,timeout=timeout)
1362+
1363+
# Since the job could already be "done" (e.g. got a finished job
1364+
# via client.get_job), the superclass call to done() might not
1365+
# set the self._query_results cache.
1366+
self._reload_query_results(retry=retry,timeout=timeout)
1367+
1368+
ifretry_do_queryisnotNoneandjob_retryisnotNone:
1369+
do_get_result=job_retry(do_get_result)
1370+
1371+
do_get_result()
13041372

1305-
# Since the job could already be "done" (e.g. got a finished job
1306-
# via client.get_job), the superclass call to done() might not
1307-
# set the self._query_results cache.
1308-
self._reload_query_results(retry=retry,timeout=timeout)
13091373
exceptexceptions.GoogleAPICallErrorasexc:
13101374
exc.message+=self._format_for_exception(self.query,self.job_id)
13111375
exc.query_job=self

‎google/cloud/bigquery/retry.py‎

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
auth_exceptions.TransportError,
3333
)
3434

35+
_DEFAULT_JOB_DEADLINE=60.0*10.0# seconds
36+
3537

3638
def_should_retry(exc):
3739
"""Predicate for determining when to retry.
@@ -56,3 +58,21 @@ def _should_retry(exc):
5658
on ``DEFAULT_RETRY``. For example, to change the deadline to 30 seconds,
5759
pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``.
5860
"""
61+
62+
job_retry_reasons="rateLimitExceeded","backendError"
63+
64+
65+
def_job_should_retry(exc):
66+
ifnothasattr(exc,"errors")orlen(exc.errors)==0:
67+
returnFalse
68+
69+
reason=exc.errors[0]["reason"]
70+
returnreasoninjob_retry_reasons
71+
72+
73+
DEFAULT_JOB_RETRY=retry.Retry(
74+
predicate=_job_should_retry,deadline=_DEFAULT_JOB_DEADLINE
75+
)
76+
"""
77+
The default job retry object.
78+
"""

‎tests/system/test_job_retry.py‎

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# Copyright 2021 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
importcontextlib
16+
importthreading
17+
importtime
18+
19+
importgoogle.api_core.exceptions
20+
importgoogle.cloud.bigquery
21+
importpytest
22+
23+
24+
defthread(func):
25+
thread=threading.Thread(target=func,daemon=True)
26+
thread.start()
27+
returnthread
28+
29+
30+
@pytest.mark.parametrize("job_retry_on_query", [True,False])
31+
deftest_query_retry_539(bigquery_client,dataset_id,job_retry_on_query):
32+
"""
33+
Test job_retry
34+
35+
See: https://github.com/googleapis/python-bigquery/issues/539
36+
"""
37+
fromgoogle.api_coreimportexceptions
38+
fromgoogle.api_core.retryimportif_exception_type,Retry
39+
40+
table_name=f"{dataset_id}.t539"
41+
42+
# Without a custom retry, we fail:
43+
withpytest.raises(google.api_core.exceptions.NotFound):
44+
bigquery_client.query(f"select count(*) from{table_name}").result()
45+
46+
retry_notfound=Retry(predicate=if_exception_type(exceptions.NotFound))
47+
48+
job_retry=dict(job_retry=retry_notfound)ifjob_retry_on_queryelse {}
49+
job=bigquery_client.query(f"select count(*) from{table_name}",**job_retry)
50+
job_id=job.job_id
51+
52+
# We can already know that the job failed, but we're not supposed
53+
# to find out until we call result, which is where retry happend
54+
assertjob.done()
55+
assertjob.exception()isnotNone
56+
57+
@thread
58+
defcreate_table():
59+
time.sleep(1)# Give the first retry attempt time to fail.
60+
withcontextlib.closing(google.cloud.bigquery.Client())asclient:
61+
client.query(f"create table{table_name} (id int64)").result()
62+
63+
job_retry= {}ifjob_retry_on_queryelsedict(job_retry=retry_notfound)
64+
[[count]]=list(job.result(**job_retry))
65+
assertcount==0
66+
67+
# The job was retried, and thus got a new job id
68+
assertjob.job_id!=job_id
69+
70+
# Make sure we don't leave a thread behind:
71+
create_table.join()
72+
bigquery_client.query(f"drop table{table_name}").result()

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp