Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0663a95

Browse files
committed
feat: make QueryJob.done() method more performant
1 parent3ce826e commit0663a95

File tree

2 files changed

+45
-100
lines changed

2 files changed

+45
-100
lines changed

‎google/cloud/bigquery/job/query.py‎

Lines changed: 35 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
importre
2020

2121
fromgoogle.api_coreimportexceptions
22+
fromgoogle.api_core.futureimportpollingaspolling_future
2223
importrequests
2324

2425
fromgoogle.cloud.bigquery.datasetimportDataset
@@ -42,7 +43,6 @@
4243
fromgoogle.cloud.bigquery._tqdm_helpersimportwait_for_query
4344

4445
fromgoogle.cloud.bigquery.job.baseimport_AsyncJob
45-
fromgoogle.cloud.bigquery.job.baseimport_DONE_STATE
4646
fromgoogle.cloud.bigquery.job.baseimport_JobConfig
4747
fromgoogle.cloud.bigquery.job.baseimport_JobReference
4848

@@ -974,61 +974,6 @@ def estimated_bytes_processed(self):
974974
result=int(result)
975975
returnresult
976976

977-
defdone(self,retry=DEFAULT_RETRY,timeout=None,reload=True):
978-
"""Refresh the job and checks if it is complete.
979-
980-
Args:
981-
retry (Optional[google.api_core.retry.Retry]):
982-
How to retry the call that retrieves query results. If the job state is
983-
``DONE``, retrying is aborted early, as the job will not change anymore.
984-
timeout (Optional[float]):
985-
The number of seconds to wait for the underlying HTTP transport
986-
before using ``retry``.
987-
reload (Optional[bool]):
988-
If ``True``, make an API call to refresh the job state of
989-
unfinished jobs before checking. Default ``True``.
990-
991-
Returns:
992-
bool: ``True`` if the job is complete or if fetching its status resulted in
993-
an error, ``False`` otherwise.
994-
"""
995-
# Do not refresh if the state is already done, as the job will not
996-
# change once complete.
997-
is_done=self.state==_DONE_STATE
998-
ifnotreloadoris_done:
999-
returnis_done
1000-
1001-
# If an explicit timeout is not given, fall back to the transport timeout
1002-
# stored in _blocking_poll() in the process of polling for job completion.
1003-
transport_timeout=timeoutiftimeoutisnotNoneelseself._transport_timeout
1004-
1005-
try:
1006-
self._reload_query_results(retry=retry,timeout=transport_timeout)
1007-
exceptexceptions.GoogleAPIErrorasexc:
1008-
# Reloading also updates error details on self, thus no need for an
1009-
# explicit self.set_exception() call if reloading succeeds.
1010-
try:
1011-
self.reload(retry=retry,timeout=transport_timeout)
1012-
exceptexceptions.GoogleAPIError:
1013-
# Use the query results reload exception, as it generally contains
1014-
# much more useful error information.
1015-
self.set_exception(exc)
1016-
returnTrue
1017-
else:
1018-
returnself.state==_DONE_STATE
1019-
1020-
# Only reload the job once we know the query is complete.
1021-
# This will ensure that fields such as the destination table are
1022-
# correctly populated.
1023-
ifself._query_results.complete:
1024-
try:
1025-
self.reload(retry=retry,timeout=transport_timeout)
1026-
exceptexceptions.GoogleAPIErrorasexc:
1027-
self.set_exception(exc)
1028-
returnTrue
1029-
1030-
returnself.state==_DONE_STATE
1031-
1032977
def_blocking_poll(self,timeout=None,**kwargs):
1033978
self._done_timeout=timeout
1034979
self._transport_timeout=timeout
@@ -1130,6 +1075,40 @@ def _reload_query_results(self, retry=DEFAULT_RETRY, timeout=None):
11301075
timeout=transport_timeout,
11311076
)
11321077

1078+
def_done_or_raise(self,retry=DEFAULT_RETRY,timeout=None):
1079+
"""Check if the query has finished running and raise if it's not.
1080+
1081+
If the query has finished, also reload the job itself.
1082+
"""
1083+
# If an explicit timeout is not given, fall back to the transport timeout
1084+
# stored in _blocking_poll() in the process of polling for job completion.
1085+
transport_timeout=timeoutiftimeoutisnotNoneelseself._transport_timeout
1086+
1087+
try:
1088+
self._reload_query_results(retry=retry,timeout=transport_timeout)
1089+
exceptexceptions.GoogleAPIErrorasexc:
1090+
# Reloading also updates error details on self, thus no need for an
1091+
# explicit self.set_exception() call if reloading succeeds.
1092+
try:
1093+
self.reload(retry=retry,timeout=transport_timeout)
1094+
exceptexceptions.GoogleAPIError:
1095+
# Use the query results reload exception, as it generally contains
1096+
# much more useful error information.
1097+
self.set_exception(exc)
1098+
finally:
1099+
return
1100+
1101+
# Only reload the job once we know the query is complete.
1102+
# This will ensure that fields such as the destination table are
1103+
# correctly populated.
1104+
ifnotself._query_results.complete:
1105+
raisepolling_future._OperationNotComplete()
1106+
else:
1107+
try:
1108+
self.reload(retry=retry,timeout=transport_timeout)
1109+
exceptexceptions.GoogleAPIErrorasexc:
1110+
self.set_exception(exc)
1111+
11331112
defresult(
11341113
self,
11351114
page_size=None,

‎tests/unit/job/test_query.py‎

Lines changed: 10 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -309,24 +309,15 @@ def test_cancelled(self):
309309

310310
self.assertTrue(job.cancelled())
311311

312-
deftest_done_job_complete(self):
313-
client=_make_client(project=self.PROJECT)
314-
resource=self._make_resource(ended=True)
315-
job=self._get_target_class().from_api_repr(resource,client)
316-
job._query_results=google.cloud.bigquery.query._QueryResults.from_api_repr(
317-
{"jobComplete":True,"jobReference":resource["jobReference"]}
318-
)
319-
self.assertTrue(job.done())
320-
321-
deftest_done_w_timeout(self):
312+
deftest__done_or_raise_w_timeout(self):
322313
client=_make_client(project=self.PROJECT)
323314
resource=self._make_resource(ended=False)
324315
job=self._get_target_class().from_api_repr(resource,client)
325316

326317
withmock.patch.object(
327318
client,"_get_query_results"
328319
)asfake_get_results,mock.patch.object(job,"reload")asfake_reload:
329-
job.done(timeout=42)
320+
job._done_or_raise(timeout=42)
330321

331322
fake_get_results.assert_called_once()
332323
call_args=fake_get_results.call_args
@@ -335,7 +326,7 @@ def test_done_w_timeout(self):
335326
call_args=fake_reload.call_args
336327
self.assertEqual(call_args.kwargs.get("timeout"),42)
337328

338-
deftest_done_w_timeout_and_longer_internal_api_timeout(self):
329+
deftest__done_or_raise_w_timeout_and_longer_internal_api_timeout(self):
339330
client=_make_client(project=self.PROJECT)
340331
resource=self._make_resource(ended=False)
341332
job=self._get_target_class().from_api_repr(resource,client)
@@ -344,7 +335,7 @@ def test_done_w_timeout_and_longer_internal_api_timeout(self):
344335
withmock.patch.object(
345336
client,"_get_query_results"
346337
)asfake_get_results,mock.patch.object(job,"reload")asfake_reload:
347-
job.done(timeout=5.5)
338+
job._done_or_raise(timeout=5.5)
348339

349340
# The expected timeout used is simply the given timeout, as the latter
350341
# is shorter than the job's internal done timeout.
@@ -357,7 +348,7 @@ def test_done_w_timeout_and_longer_internal_api_timeout(self):
357348
call_args=fake_reload.call_args
358349
self.assertAlmostEqual(call_args.kwargs.get("timeout"),expected_timeout)
359350

360-
deftest_done_w_query_results_error_reload_ok_job_finished(self):
351+
deftest__done_or_raise_w_query_results_error_reload_ok(self):
361352
client=_make_client(project=self.PROJECT)
362353
bad_request_error=exceptions.BadRequest("Error in query")
363354
client._get_query_results=mock.Mock(side_effect=bad_request_error)
@@ -373,32 +364,11 @@ def fake_reload(self, *args, **kwargs):
373364
fake_reload_method=types.MethodType(fake_reload,job)
374365

375366
withmock.patch.object(job,"reload",new=fake_reload_method):
376-
is_done=job.done()
367+
job._done_or_raise()
377368

378-
assertis_done
379369
assertisinstance(job._exception,exceptions.BadRequest)
380370

381-
deftest_done_w_query_results_error_reload_ok_job_still_running(self):
382-
client=_make_client(project=self.PROJECT)
383-
retry_error=exceptions.RetryError("Too many retries",cause=TimeoutError)
384-
client._get_query_results=mock.Mock(side_effect=retry_error)
385-
386-
resource=self._make_resource(ended=False)
387-
job=self._get_target_class().from_api_repr(resource,client)
388-
job._exception=None
389-
390-
deffake_reload(self,*args,**kwargs):
391-
self._properties["status"]["state"]="RUNNING"
392-
393-
fake_reload_method=types.MethodType(fake_reload,job)
394-
395-
withmock.patch.object(job,"reload",new=fake_reload_method):
396-
is_done=job.done()
397-
398-
assertnotis_done
399-
assertjob._exceptionisNone
400-
401-
deftest_done_w_query_results_error_reload_error(self):
371+
deftest__done_or_raise_w_query_results_error_reload_error(self):
402372
client=_make_client(project=self.PROJECT)
403373
bad_request_error=exceptions.BadRequest("Error in query")
404374
client._get_query_results=mock.Mock(side_effect=bad_request_error)
@@ -409,12 +379,11 @@ def test_done_w_query_results_error_reload_error(self):
409379
job.reload=mock.Mock(side_effect=reload_error)
410380
job._exception=None
411381

412-
is_done=job.done()
382+
job._done_or_raise()
413383

414-
assertis_done
415384
assertjob._exceptionisbad_request_error
416385

417-
deftest_done_w_job_query_results_ok_reload_error(self):
386+
deftest__done_or_raise_w_job_query_results_ok_reload_error(self):
418387
client=_make_client(project=self.PROJECT)
419388
query_results=google.cloud.bigquery.query._QueryResults(
420389
properties={
@@ -430,9 +399,8 @@ def test_done_w_job_query_results_ok_reload_error(self):
430399
job.reload=mock.Mock(side_effect=retry_error)
431400
job._exception=None
432401

433-
is_done=job.done()
402+
job._done_or_raise()
434403

435-
assertis_done
436404
assertjob._exceptionisretry_error
437405

438406
deftest_query_plan(self):
@@ -1905,8 +1873,6 @@ def test_reload_w_timeout(self):
19051873
)
19061874

19071875
deftest_iter(self):
1908-
importtypes
1909-
19101876
begun_resource=self._make_resource()
19111877
query_resource= {
19121878
"jobComplete":True,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp