Commit 1367b58

fix: avoid unnecessary API call in QueryJob.result() when job is already finished (#1900)

fix: retry query job after ambiguous failures

Co-authored-by: Chalmer Lowe <chalmerlowe@google.com>

1 parent bf8861c, commit 1367b58

File tree: 6 files changed, +547 -230 lines changed
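
The user-visible behavior, sketched below with an illustrative query and timeout (neither is taken from the commit): QueryJob.result() now drives the polling loop itself, skips the extra jobs.getQueryResults call when the first response already reports jobComplete, and may restart the query through the job_retry predicate after an ambiguous failure.

    from google.cloud import bigquery
    from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY

    client = bigquery.Client()

    # Ambiguous failures (backendError, internalError, rateLimitExceeded,
    # jobRateLimitExceeded) may now restart the query job via job_retry
    # instead of surfacing immediately.
    job = client.query("SELECT 1", job_retry=DEFAULT_JOB_RETRY)

    # result() no longer delegates to the PollingFuture superclass; it loops
    # on an inner is_job_done() helper that confirms failures with jobs.get
    # before deciding whether to retry.
    for row in job.result(timeout=600):
        print(row)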

google/cloud/bigquery/_job_helpers.py

Lines changed: 5 additions & 4 deletions
@@ -258,15 +258,16 @@ def _to_query_job(
         errors = query_response["errors"]
         query_job._properties["status"]["errors"] = errors
 
-    # Transform job state so that QueryJob doesn't try to restart the query.
+    # Avoid an extra call to `getQueryResults` if the query has finished.
     job_complete = query_response.get("jobComplete")
     if job_complete:
-        query_job._properties["status"]["state"] = "DONE"
         query_job._query_results = google.cloud.bigquery.query._QueryResults(
             query_response
         )
-    else:
-        query_job._properties["status"]["state"] = "PENDING"
+
+    # We want job.result() to refresh the job state, so the conversion is
+    # always "PENDING", even if the job is finished.
+    query_job._properties["status"]["state"] = "PENDING"
 
     return query_job
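
For context, the kind of jobs.query response this helper converts, sketched with made-up values (field names follow the BigQuery REST API; nothing below is taken from the commit):

    # Hypothetical jobs.query response, for illustration only.
    query_response = {
        "jobReference": {"projectId": "my-project", "jobId": "job_abc123"},
        "jobComplete": True,
        "schema": {"fields": [{"name": "f0_", "type": "INTEGER"}]},
        "rows": [{"f": [{"v": "1"}]}],
    }

    # With this change the converted QueryJob caches these first-page results,
    # but its state stays "PENDING": QueryJob.result(), not the conversion
    # helper, is responsible for confirming the final job state.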

google/cloud/bigquery/job/query.py

Lines changed: 111 additions & 61 deletions
@@ -17,11 +17,11 @@
 import concurrent.futures
 import copy
 import re
+import time
 import typing
 from typing import Any, Dict, Iterable, List, Optional, Union
 
 from google.api_core import exceptions
-from google.api_core.future import polling as polling_future
 from google.api_core import retry as retries
 import requests
 
@@ -1383,7 +1383,7 @@ def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
     def _reload_query_results(
         self, retry: "retries.Retry" = DEFAULT_RETRY, timeout: Optional[float] = None
     ):
-        """Refresh the cached query results.
+        """Refresh the cached query results unless already cached and complete.
 
         Args:
             retry (Optional[google.api_core.retry.Retry]):
@@ -1392,6 +1392,8 @@ def _reload_query_results(
                 The number of seconds to wait for the underlying HTTP transport
                 before using ``retry``.
         """
+        # Optimization: avoid a call to jobs.getQueryResults if it's already
+        # been fetched, e.g. from jobs.query first page of results.
         if self._query_results and self._query_results.complete:
             return
 
@@ -1430,40 +1432,6 @@ def _reload_query_results(
             timeout=transport_timeout,
         )
 
-    def _done_or_raise(self, retry=DEFAULT_RETRY, timeout=None):
-        """Check if the query has finished running and raise if it's not.
-
-        If the query has finished, also reload the job itself.
-        """
-        # If an explicit timeout is not given, fall back to the transport timeout
-        # stored in _blocking_poll() in the process of polling for job completion.
-        transport_timeout = timeout if timeout is not None else self._transport_timeout
-
-        try:
-            self._reload_query_results(retry=retry, timeout=transport_timeout)
-        except exceptions.GoogleAPIError as exc:
-            # Reloading also updates error details on self, thus no need for an
-            # explicit self.set_exception() call if reloading succeeds.
-            try:
-                self.reload(retry=retry, timeout=transport_timeout)
-            except exceptions.GoogleAPIError:
-                # Use the query results reload exception, as it generally contains
-                # much more useful error information.
-                self.set_exception(exc)
-            finally:
-                return
-
-        # Only reload the job once we know the query is complete.
-        # This will ensure that fields such as the destination table are
-        # correctly populated.
-        if not self._query_results.complete:
-            raise polling_future._OperationNotComplete()
-        else:
-            try:
-                self.reload(retry=retry, timeout=transport_timeout)
-            except exceptions.GoogleAPIError as exc:
-                self.set_exception(exc)
-
     def result(  # type: ignore  # (incompatible with supertype)
         self,
         page_size: Optional[int] = None,
@@ -1528,6 +1496,10 @@ def result(  # type: ignore  # (incompatible with supertype)
                 If Non-``None`` and non-default ``job_retry`` is
                 provided and the job is not retryable.
         """
+        # Note: Since waiting for a query job to finish is more complex than
+        # refreshing the job state in a loop, we avoid calling the superclass
+        # in this method.
+
         if self.dry_run:
             return _EmptyRowIterator(
                 project=self.project,
@@ -1548,46 +1520,124 @@ def result(  # type: ignore  # (incompatible with supertype)
                         " provided to the query that created this job."
                     )
 
-            first = True
+            restart_query_job = False
+
+            def is_job_done():
+                nonlocal restart_query_job
 
-            def do_get_result():
-                nonlocal first
+                if restart_query_job:
+                    restart_query_job = False
 
-                if first:
-                    first = False
-                else:
+                    # The original job has failed. Create a new one.
+                    #
                     # Note that we won't get here if retry_do_query is
                     # None, because we won't use a retry.
-
-                    # The orinal job is failed. Create a new one.
                     job = retry_do_query()
 
-                    # If it's already failed, we might as well stop:
-                    if job.done() and job.exception() is not None:
-                        raise job.exception()
-
                     # Become the new job:
                     self.__dict__.clear()
                     self.__dict__.update(job.__dict__)
 
-                    # This shouldn't be necessary, because once we have a good
-                    # job, it should stay good, and we shouldn't have to retry.
-                    # But let's be paranoid. :)
+                    # It's possible the job fails again and we'll have to
+                    # retry that too.
                     self._retry_do_query = retry_do_query
                     self._job_retry = job_retry
 
-                super(QueryJob, self).result(retry=retry, timeout=timeout)
-
-                # Since the job could already be "done" (e.g. got a finished job
-                # via client.get_job), the superclass call to done() might not
-                # set the self._query_results cache.
-                if self._query_results is None or not self._query_results.complete:
-                    self._reload_query_results(retry=retry, timeout=timeout)
+                # Refresh the job status with jobs.get because some of the
+                # exceptions thrown by jobs.getQueryResults like timeout and
+                # rateLimitExceeded errors are ambiguous. We want to know if
+                # the query job failed and not just the call to
+                # jobs.getQueryResults.
+                if self.done(retry=retry, timeout=timeout):
+                    # If it's already failed, we might as well stop.
+                    job_failed_exception = self.exception()
+                    if job_failed_exception is not None:
+                        # Only try to restart the query job if the job failed for
+                        # a retriable reason. For example, don't restart the query
+                        # if the call to reload the job metadata within self.done()
+                        # timed out.
+                        #
+                        # The `restart_query_job` must only be called after a
+                        # successful call to the `jobs.get` REST API and we
+                        # determine that the job has failed.
+                        #
+                        # The `jobs.get` REST API
+                        # (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get)
+                        # is called via `self.done()` which calls
+                        # `self.reload()`.
+                        #
+                        # To determine if the job failed, the `self.exception()`
+                        # is set from `self.reload()` via
+                        # `self._set_properties()`, which translates the
+                        # `Job.status.errorResult` field
+                        # (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.error_result)
+                        # into an exception that can be processed by the
+                        # `job_retry` predicate.
+                        restart_query_job = True
+                        raise job_failed_exception
+                    else:
+                        # Make sure that the _query_results are cached so we
+                        # can return a complete RowIterator.
+                        #
+                        # Note: As an optimization, _reload_query_results
+                        # doesn't make any API calls if the query results are
+                        # already cached and have jobComplete=True in the
+                        # response from the REST API. This ensures we aren't
+                        # making any extra API calls if the previous loop
+                        # iteration fetched the finished job.
+                        self._reload_query_results(retry=retry, timeout=timeout)
+                        return True
+
+                # Call jobs.getQueryResults with max results set to 0 just to
+                # wait for the query to finish. Unlike most methods,
+                # jobs.getQueryResults hangs as long as it can to ensure we
+                # know when the query has finished as soon as possible.
+                self._reload_query_results(retry=retry, timeout=timeout)
+
+                # Even if the query is finished now according to
+                # jobs.getQueryResults, we'll want to reload the job status if
+                # it's not already DONE.
+                return False
 
             if retry_do_query is not None and job_retry is not None:
-                do_get_result = job_retry(do_get_result)
-
-            do_get_result()
+                is_job_done = job_retry(is_job_done)
+
+            # timeout can be a number of seconds, `None`, or a
+            # `google.api_core.future.polling.PollingFuture._DEFAULT_VALUE`
+            # sentinel object indicating a default timeout if we choose to add
+            # one some day. This value can come from our PollingFuture
+            # superclass and was introduced in
+            # https://github.com/googleapis/python-api-core/pull/462.
+            if isinstance(timeout, (float, int)):
+                remaining_timeout = timeout
+            else:
+                # Note: we may need to handle _DEFAULT_VALUE as a separate
+                # case someday, but even then the best we can do for queries
+                # is 72+ hours for hyperparameter tuning jobs:
+                # https://cloud.google.com/bigquery/quotas#query_jobs
+                #
                # The timeout for a multi-statement query is 24+ hours. See:
+                # https://cloud.google.com/bigquery/quotas#multi_statement_query_limits
+                remaining_timeout = None
+
+            if remaining_timeout is None:
+                # Since is_job_done() calls jobs.getQueryResults, which is a
+                # long-running API, don't delay the next request at all.
+                while not is_job_done():
+                    pass
+            else:
+                # Use a monotonic clock since we don't actually care about
+                # daylight savings or similar, just the elapsed time.
+                previous_time = time.monotonic()
+
+                while not is_job_done():
+                    current_time = time.monotonic()
+                    elapsed_time = current_time - previous_time
+                    remaining_timeout = remaining_timeout - elapsed_time
+                    previous_time = current_time
+
+                    if remaining_timeout < 0:
+                        raise concurrent.futures.TimeoutError()
 
         except exceptions.GoogleAPICallError as exc:
             exc.message = _EXCEPTION_FOOTER_TEMPLATE.format(
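
The waiting strategy above, reduced to a standalone sketch (the check_done callable and the helper name are placeholders, not library code):

    import concurrent.futures
    import time

    def wait_until_done(check_done, timeout=None):
        # Call a long-running "is it done yet?" function back to back, and
        # track the remaining budget with a monotonic clock so wall-clock
        # adjustments don't affect the timeout accounting.
        if timeout is None:
            while not check_done():
                pass
            return
        remaining = timeout
        previous = time.monotonic()
        while not check_done():
            now = time.monotonic()
            remaining -= now - previous
            previous = now
            if remaining < 0:
                raise concurrent.futures.TimeoutError()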

google/cloud/bigquery/retry.py

Lines changed: 47 additions & 5 deletions
@@ -36,10 +36,25 @@
 
 _DEFAULT_RETRY_DEADLINE = 10.0 * 60.0  # 10 minutes
 
-# Allow for a few retries after the API request times out. This relevant for
-# rateLimitExceeded errors, which can be raised either by the Google load
-# balancer or the BigQuery job server.
-_DEFAULT_JOB_DEADLINE = 3.0 * _DEFAULT_RETRY_DEADLINE
+# Ambiguous errors (e.g. internalError, backendError, rateLimitExceeded) retry
+# until the full `_DEFAULT_RETRY_DEADLINE`. This is because the
+# `jobs.getQueryResults` REST API translates a job failure into an HTTP error.
+#
+# TODO(https://github.com/googleapis/python-bigquery/issues/1903): Investigate
+# if we can fail early for ambiguous errors in `QueryJob.result()`'s call to
+# the `jobs.getQueryResult` API.
+#
+# We need `_DEFAULT_JOB_DEADLINE` to be some multiple of
+# `_DEFAULT_RETRY_DEADLINE` to allow for a few retries after the retry
+# timeout is reached.
+#
+# Note: This multiple should actually be a multiple of
+# (2 * _DEFAULT_RETRY_DEADLINE). After an ambiguous exception, the first
+# call from `job_retry()` refreshes the job state without actually restarting
+# the query. The second `job_retry()` actually restarts the query. For a more
+# detailed explanation, see the comments where we set `restart_query_job = True`
+# in `QueryJob.result()`'s inner `is_job_done()` function.
+_DEFAULT_JOB_DEADLINE = 2.0 * (2.0 * _DEFAULT_RETRY_DEADLINE)
 
 
 def _should_retry(exc):
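
In concrete terms the new default works out to 40 minutes; a quick check in plain Python (not part of the diff):

    _DEFAULT_RETRY_DEADLINE = 10.0 * 60.0  # 10 minutes of API-level retries
    _DEFAULT_JOB_DEADLINE = 2.0 * (2.0 * _DEFAULT_RETRY_DEADLINE)

    # One job_retry pass that only refreshes the job state plus one pass that
    # restarts the query form a 20-minute cycle; the default allows two cycles.
    assert _DEFAULT_JOB_DEADLINE == 40 * 60  # 2400 seconds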
@@ -66,17 +81,44 @@ def _should_retry(exc):
     pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``.
 """
 
+# Note: Take care when updating DEFAULT_TIMEOUT to anything but None. We
+# briefly had a default timeout, but even setting it at more than twice the
+# theoretical server-side default timeout of 2 minutes was not enough for
+# complex queries. See:
+# https://github.com/googleapis/python-bigquery/issues/970#issuecomment-921934647
 DEFAULT_TIMEOUT = None
 """The default API timeout.
 
 This is the time to wait per request. To adjust the total wait time, set a
 deadline on the retry object.
 """
 
-job_retry_reasons = "rateLimitExceeded", "backendError", "jobRateLimitExceeded"
+job_retry_reasons = (
+    "rateLimitExceeded",
+    "backendError",
+    "internalError",
+    "jobRateLimitExceeded",
+)
 
 
 def _job_should_retry(exc):
+    # Sometimes we have ambiguous errors, such as 'backendError' which could
+    # be due to an API problem or a job problem. For these, make sure we retry
+    # our is_job_done() function.
+    #
+    # Note: This won't restart the job unless we know for sure it's because of
+    # the job status and set restart_query_job = True in that loop. This means
+    # that we might end up calling this predicate twice for the same job
+    # but from different paths: (1) from jobs.getQueryResults RetryError and
+    # (2) from translating the job error from the body of a jobs.get response.
+    #
+    # Note: If we start retrying job types other than queries where we don't
+    # call the problematic getQueryResults API to check the status, we need
+    # to provide a different predicate, as there shouldn't be ambiguous
+    # errors in those cases.
+    if isinstance(exc, exceptions.RetryError):
+        exc = exc.cause
+
     if not hasattr(exc, "errors") or len(exc.errors) == 0:
         return False
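
A standalone sketch of the unwrap-then-check pattern the predicate now follows (the function name and reason set below are written out for illustration; only _job_should_retry above is library code):

    from google.api_core import exceptions

    _RETRYABLE_REASONS = frozenset(
        {"rateLimitExceeded", "backendError", "internalError", "jobRateLimitExceeded"}
    )

    def sketch_job_should_retry(exc):
        # A RetryError means the polling call itself gave up; inspect its
        # cause so ambiguous jobs.getQueryResults failures still reach the
        # reason check below.
        if isinstance(exc, exceptions.RetryError):
            exc = exc.cause

        # BigQuery API errors carry a list of error dicts with a "reason".
        errors = getattr(exc, "errors", None)
        if not errors:
            return False
        return errors[0].get("reason") in _RETRYABLE_REASONS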
