1717import concurrent .futures
1818import copy
1919import re
20+ import time
2021import typing
2122from typing import Any ,Dict ,Iterable ,List ,Optional ,Union
2223
2324from google .api_core import exceptions
24- from google .api_core .future import polling as polling_future
2525from google .api_core import retry as retries
2626import requests
2727
@@ -1383,7 +1383,7 @@ def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
13831383def _reload_query_results (
13841384self ,retry :"retries.Retry" = DEFAULT_RETRY ,timeout :Optional [float ]= None
13851385 ):
1386- """Refresh the cached query results.
1386+ """Refresh the cached query results unless already cached and complete .
13871387
13881388 Args:
13891389 retry (Optional[google.api_core.retry.Retry]):
@@ -1392,6 +1392,8 @@ def _reload_query_results(
13921392 The number of seconds to wait for the underlying HTTP transport
13931393 before using ``retry``.
13941394 """
1395+ # Optimization: avoid a call to jobs.getQueryResults if it's already
1396+ # been fetched, e.g. from jobs.query first page of results.
13951397if self ._query_results and self ._query_results .complete :
13961398return
13971399
@@ -1430,40 +1432,6 @@ def _reload_query_results(
14301432timeout = transport_timeout ,
14311433 )
14321434
1433- def _done_or_raise (self ,retry = DEFAULT_RETRY ,timeout = None ):
1434- """Check if the query has finished running and raise if it's not.
1435-
1436- If the query has finished, also reload the job itself.
1437- """
1438- # If an explicit timeout is not given, fall back to the transport timeout
1439- # stored in _blocking_poll() in the process of polling for job completion.
1440- transport_timeout = timeout if timeout is not None else self ._transport_timeout
1441-
1442- try :
1443- self ._reload_query_results (retry = retry ,timeout = transport_timeout )
1444- except exceptions .GoogleAPIError as exc :
1445- # Reloading also updates error details on self, thus no need for an
1446- # explicit self.set_exception() call if reloading succeeds.
1447- try :
1448- self .reload (retry = retry ,timeout = transport_timeout )
1449- except exceptions .GoogleAPIError :
1450- # Use the query results reload exception, as it generally contains
1451- # much more useful error information.
1452- self .set_exception (exc )
1453- finally :
1454- return
1455-
1456- # Only reload the job once we know the query is complete.
1457- # This will ensure that fields such as the destination table are
1458- # correctly populated.
1459- if not self ._query_results .complete :
1460- raise polling_future ._OperationNotComplete ()
1461- else :
1462- try :
1463- self .reload (retry = retry ,timeout = transport_timeout )
1464- except exceptions .GoogleAPIError as exc :
1465- self .set_exception (exc )
1466-
14671435def result (# type: ignore # (incompatible with supertype)
14681436self ,
14691437page_size :Optional [int ]= None ,
@@ -1528,6 +1496,10 @@ def result( # type: ignore # (incompatible with supertype)
15281496 If Non-``None`` and non-default ``job_retry`` is
15291497 provided and the job is not retryable.
15301498 """
1499+ # Note: Since waiting for a query job to finish is more complex than
1500+ # refreshing the job state in a loop, we avoid calling the superclass
1501+ # in this method.
1502+
15311503if self .dry_run :
15321504return _EmptyRowIterator (
15331505project = self .project ,
@@ -1548,46 +1520,124 @@ def result( # type: ignore # (incompatible with supertype)
15481520" provided to the query that created this job."
15491521 )
15501522
1551- first = True
1523+ restart_query_job = False
1524+
1525+ def is_job_done ():
1526+ nonlocal restart_query_job
15521527
1553- def do_get_result () :
1554- nonlocal first
1528+ if restart_query_job :
1529+ restart_query_job = False
15551530
1556- if first :
1557- first = False
1558- else :
1531+ # The original job has failed. Create a new one.
1532+ #
15591533# Note that we won't get here if retry_do_query is
15601534# None, because we won't use a retry.
1561-
1562- # The orinal job is failed. Create a new one.
15631535job = retry_do_query ()
15641536
1565- # If it's already failed, we might as well stop:
1566- if job .done ()and job .exception ()is not None :
1567- raise job .exception ()
1568-
15691537# Become the new job:
15701538self .__dict__ .clear ()
15711539self .__dict__ .update (job .__dict__ )
15721540
1573- # This shouldn't be necessary, because once we have a good
1574- # job, it should stay good,and we shouldn't have to retry.
1575- # But let's be paranoid. :)
1541+ # It's possible the job fails again and we'll have to
1542+ # retry that too.
15761543self ._retry_do_query = retry_do_query
15771544self ._job_retry = job_retry
15781545
1579- super (QueryJob ,self ).result (retry = retry ,timeout = timeout )
1580-
1581- # Since the job could already be "done" (e.g. got a finished job
1582- # via client.get_job), the superclass call to done() might not
1583- # set the self._query_results cache.
1584- if self ._query_results is None or not self ._query_results .complete :
1585- self ._reload_query_results (retry = retry ,timeout = timeout )
1546+ # Refresh the job status with jobs.get because some of the
1547+ # exceptions thrown by jobs.getQueryResults like timeout and
1548+ # rateLimitExceeded errors are ambiguous. We want to know if
1549+ # the query job failed and not just the call to
1550+ # jobs.getQueryResults.
1551+ if self .done (retry = retry ,timeout = timeout ):
1552+ # If it's already failed, we might as well stop.
1553+ job_failed_exception = self .exception ()
1554+ if job_failed_exception is not None :
1555+ # Only try to restart the query job if the job failed for
1556+ # a retriable reason. For example, don't restart the query
1557+ # if the call to reload the job metadata within self.done()
1558+ # timed out.
1559+ #
1560+ # The `restart_query_job` must only be called after a
1561+ # successful call to the `jobs.get` REST API and we
1562+ # determine that the job has failed.
1563+ #
1564+ # The `jobs.get` REST API
1565+ # (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get)
1566+ # is called via `self.done()` which calls
1567+ # `self.reload()`.
1568+ #
1569+ # To determine if the job failed, the `self.exception()`
1570+ # is set from `self.reload()` via
1571+ # `self._set_properties()`, which translates the
1572+ # `Job.status.errorResult` field
1573+ # (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.error_result)
1574+ # into an exception that can be processed by the
1575+ # `job_retry` predicate.
1576+ restart_query_job = True
1577+ raise job_failed_exception
1578+ else :
1579+ # Make sure that the _query_results are cached so we
1580+ # can return a complete RowIterator.
1581+ #
1582+ # Note: As an optimization, _reload_query_results
1583+ # doesn't make any API calls if the query results are
1584+ # already cached and have jobComplete=True in the
1585+ # response from the REST API. This ensures we aren't
1586+ # making any extra API calls if the previous loop
1587+ # iteration fetched the finished job.
1588+ self ._reload_query_results (retry = retry ,timeout = timeout )
1589+ return True
1590+
1591+ # Call jobs.getQueryResults with max results set to 0 just to
1592+ # wait for the query to finish. Unlike most methods,
1593+ # jobs.getQueryResults hangs as long as it can to ensure we
1594+ # know when the query has finished as soon as possible.
1595+ self ._reload_query_results (retry = retry ,timeout = timeout )
1596+
1597+ # Even if the query is finished now according to
1598+ # jobs.getQueryResults, we'll want to reload the job status if
1599+ # it's not already DONE.
1600+ return False
15861601
15871602if retry_do_query is not None and job_retry is not None :
1588- do_get_result = job_retry (do_get_result )
1589-
1590- do_get_result ()
1603+ is_job_done = job_retry (is_job_done )
1604+
1605+ # timeout can be a number of seconds, `None`, or a
1606+ # `google.api_core.future.polling.PollingFuture._DEFAULT_VALUE`
1607+ # sentinel object indicating a default timeout if we choose to add
1608+ # one some day. This value can come from our PollingFuture
1609+ # superclass and was introduced in
1610+ # https://github.com/googleapis/python-api-core/pull/462.
1611+ if isinstance (timeout , (float ,int )):
1612+ remaining_timeout = timeout
1613+ else :
1614+ # Note: we may need to handle _DEFAULT_VALUE as a separate
1615+ # case someday, but even then the best we can do for queries
1616+ # is 72+ hours for hyperparameter tuning jobs:
1617+ # https://cloud.google.com/bigquery/quotas#query_jobs
1618+ #
1619+ # The timeout for a multi-statement query is 24+ hours. See:
1620+ # https://cloud.google.com/bigquery/quotas#multi_statement_query_limits
1621+ remaining_timeout = None
1622+
1623+ if remaining_timeout is None :
1624+ # Since is_job_done() calls jobs.getQueryResults, which is a
1625+ # long-running API, don't delay the next request at all.
1626+ while not is_job_done ():
1627+ pass
1628+ else :
1629+ # Use a monotonic clock since we don't actually care about
1630+ # daylight savings or similar, just the elapsed time.
1631+ previous_time = time .monotonic ()
1632+
1633+ while not is_job_done ():
1634+ current_time = time .monotonic ()
1635+ elapsed_time = current_time - previous_time
1636+ remaining_timeout = remaining_timeout - elapsed_time
1637+ previous_time = current_time
1638+
1639+ if remaining_timeout < 0 :
1640+ raise concurrent .futures .TimeoutError ()
15911641
15921642except exceptions .GoogleAPICallError as exc :
15931643exc .message = _EXCEPTION_FOOTER_TEMPLATE .format (