Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite51fd45

Browse files
authored
feat: addreload argument to*Job.done() functions (#341)
This enables checking the job status without making an API call.It also fixes an inconsistency in `QueryJob`, where a job can bereported as "done" without having the results of a `getQueryResults` APIcall.Follow-up to#340
1 parent5a925ec commite51fd45

File tree

3 files changed

+218
-123
lines changed

3 files changed

+218
-123
lines changed

‎google/cloud/bigquery/job.py‎

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -767,7 +767,7 @@ def _set_future_result(self):
767767
# set, do not call set_result/set_exception again.
768768
# Note: self._result_set is set to True in set_result and
769769
# set_exception, in case those methods are invoked directly.
770-
ifself.state!=_DONE_STATEorself._result_set:
770+
ifnotself.done(reload=False)orself._result_set:
771771
return
772772

773773
ifself.error_resultisnotNone:
@@ -776,21 +776,24 @@ def _set_future_result(self):
776776
else:
777777
self.set_result(self)
778778

779-
defdone(self,retry=DEFAULT_RETRY,timeout=None):
780-
"""Refreshthe job and checks if it is complete.
779+
defdone(self,retry=DEFAULT_RETRY,timeout=None,reload=True):
780+
"""Checks ifthe job is complete.
781781
782782
Args:
783783
retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
784784
timeout (Optional[float]):
785785
The number of seconds to wait for the underlying HTTP transport
786786
before using ``retry``.
787+
reload (Optional[bool]):
788+
If ``True``, make an API call to refresh the job state of
789+
unfinished jobs before checking. Default ``True``.
787790
788791
Returns:
789792
bool: True if the job is complete, False otherwise.
790793
"""
791794
# Do not refresh is the state is already done, as the job will not
792795
# change once complete.
793-
ifself.state!=_DONE_STATE:
796+
ifself.state!=_DONE_STATEandreload:
794797
self.reload(retry=retry,timeout=timeout)
795798
returnself.state==_DONE_STATE
796799

@@ -3073,7 +3076,7 @@ def estimated_bytes_processed(self):
30733076
result=int(result)
30743077
returnresult
30753078

3076-
defdone(self,retry=DEFAULT_RETRY,timeout=None):
3079+
defdone(self,retry=DEFAULT_RETRY,timeout=None,reload=True):
30773080
"""Refresh the job and checks if it is complete.
30783081
30793082
Args:
@@ -3082,10 +3085,25 @@ def done(self, retry=DEFAULT_RETRY, timeout=None):
30823085
timeout (Optional[float]):
30833086
The number of seconds to wait for the underlying HTTP transport
30843087
before using ``retry``.
3088+
reload (Optional[bool]):
3089+
If ``True``, make an API call to refresh the job state of
3090+
unfinished jobs before checking. Default ``True``.
30853091
30863092
Returns:
30873093
bool: True if the job is complete, False otherwise.
30883094
"""
3095+
is_done= (
3096+
# Only consider a QueryJob complete when we know we have the final
3097+
# query results available.
3098+
self._query_resultsisnotNone
3099+
andself._query_results.complete
3100+
andself.state==_DONE_STATE
3101+
)
3102+
# Do not refresh if the state is already done, as the job will not
3103+
# change once complete.
3104+
ifnotreloadoris_done:
3105+
returnis_done
3106+
30893107
# Since the API to getQueryResults can hang up to the timeout value
30903108
# (default of 10 seconds), set the timeout parameter to ensure that
30913109
# the timeout from the futures API is respected. See:
@@ -3103,23 +3121,20 @@ def done(self, retry=DEFAULT_RETRY, timeout=None):
31033121
# stored in _blocking_poll() in the process of polling for job completion.
31043122
transport_timeout=timeoutiftimeoutisnotNoneelseself._transport_timeout
31053123

3106-
# Do not refresh if the state is already done, as the job will not
3107-
# change once complete.
3108-
ifself.state!=_DONE_STATE:
3109-
self._query_results=self._client._get_query_results(
3110-
self.job_id,
3111-
retry,
3112-
project=self.project,
3113-
timeout_ms=timeout_ms,
3114-
location=self.location,
3115-
timeout=transport_timeout,
3116-
)
3124+
self._query_results=self._client._get_query_results(
3125+
self.job_id,
3126+
retry,
3127+
project=self.project,
3128+
timeout_ms=timeout_ms,
3129+
location=self.location,
3130+
timeout=transport_timeout,
3131+
)
31173132

3118-
# Only reload the job once we know the query is complete.
3119-
# This will ensure that fields such as the destination table are
3120-
# correctly populated.
3121-
ifself._query_results.complete:
3122-
self.reload(retry=retry,timeout=transport_timeout)
3133+
# Only reload the job once we know the query is complete.
3134+
# This will ensure that fields such as the destination table are
3135+
# correctly populated.
3136+
ifself._query_results.completeandself.state!=_DONE_STATE:
3137+
self.reload(retry=retry,timeout=transport_timeout)
31233138

31243139
returnself.state==_DONE_STATE
31253140

@@ -3231,16 +3246,6 @@ def result(
32313246
"""
32323247
try:
32333248
super(QueryJob,self).result(retry=retry,timeout=timeout)
3234-
3235-
# Return an iterator instead of returning the job.
3236-
ifnotself._query_results:
3237-
self._query_results=self._client._get_query_results(
3238-
self.job_id,
3239-
retry,
3240-
project=self.project,
3241-
location=self.location,
3242-
timeout=timeout,
3243-
)
32443249
exceptexceptions.GoogleCloudErrorasexc:
32453250
exc.message+=self._format_for_exception(self.query,self.job_id)
32463251
exc.query_job=self

‎tests/unit/test_job.py‎

Lines changed: 90 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
except (ImportError,AttributeError):# pragma: NO COVER
4646
tqdm=None
4747

48+
importgoogle.cloud.bigquery.query
49+
4850

4951
def_make_credentials():
5052
importgoogle.auth.credentials
@@ -3942,10 +3944,6 @@ def _make_resource(self, started=False, ended=False):
39423944
resource=super(TestQueryJob,self)._make_resource(started,ended)
39433945
config=resource["configuration"]["query"]
39443946
config["query"]=self.QUERY
3945-
3946-
ifended:
3947-
resource["status"]= {"state":"DONE"}
3948-
39493947
returnresource
39503948

39513949
def_verifyBooleanResourceProperties(self,job,config):
@@ -4211,6 +4209,9 @@ def test_done(self):
42114209
client=_make_client(project=self.PROJECT)
42124210
resource=self._make_resource(ended=True)
42134211
job=self._get_target_class().from_api_repr(resource,client)
4212+
job._query_results=google.cloud.bigquery.query._QueryResults.from_api_repr(
4213+
{"jobComplete":True,"jobReference":resource["jobReference"]}
4214+
)
42144215
self.assertTrue(job.done())
42154216

42164217
deftest_done_w_timeout(self):
@@ -4668,35 +4669,110 @@ def test_result(self):
46684669
fromgoogle.cloud.bigquery.tableimportRowIterator
46694670

46704671
query_resource= {
4672+
"jobComplete":False,
4673+
"jobReference": {"projectId":self.PROJECT,"jobId":self.JOB_ID},
4674+
}
4675+
query_resource_done= {
46714676
"jobComplete":True,
46724677
"jobReference": {"projectId":self.PROJECT,"jobId":self.JOB_ID},
46734678
"schema": {"fields": [{"name":"col1","type":"STRING"}]},
46744679
"totalRows":"2",
46754680
}
4681+
job_resource=self._make_resource(started=True)
4682+
job_resource_done=self._make_resource(started=True,ended=True)
4683+
job_resource_done["configuration"]["query"]["destinationTable"]= {
4684+
"projectId":"dest-project",
4685+
"datasetId":"dest_dataset",
4686+
"tableId":"dest_table",
4687+
}
46764688
tabledata_resource= {
4677-
# Explicitly set totalRows to be different from thequery response.
4678-
# to test update during iteration.
4689+
# Explicitly set totalRows to be different from theinitial
4690+
#responseto test update during iteration.
46794691
"totalRows":"1",
46804692
"pageToken":None,
46814693
"rows": [{"f": [{"v":"abc"}]}],
46824694
}
4683-
connection=_make_connection(query_resource,tabledata_resource)
4684-
client=_make_client(self.PROJECT,connection=connection)
4685-
resource=self._make_resource(ended=True)
4686-
job=self._get_target_class().from_api_repr(resource,client)
4695+
conn=_make_connection(
4696+
query_resource,query_resource_done,job_resource_done,tabledata_resource
4697+
)
4698+
client=_make_client(self.PROJECT,connection=conn)
4699+
job=self._get_target_class().from_api_repr(job_resource,client)
46874700

46884701
result=job.result()
46894702

46904703
self.assertIsInstance(result,RowIterator)
46914704
self.assertEqual(result.total_rows,2)
4692-
46934705
rows=list(result)
46944706
self.assertEqual(len(rows),1)
46954707
self.assertEqual(rows[0].col1,"abc")
46964708
# Test that the total_rows property has changed during iteration, based
46974709
# on the response from tabledata.list.
46984710
self.assertEqual(result.total_rows,1)
46994711

4712+
query_results_call=mock.call(
4713+
method="GET",
4714+
path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
4715+
query_params={"maxResults":0},
4716+
timeout=None,
4717+
)
4718+
reload_call=mock.call(
4719+
method="GET",
4720+
path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}",
4721+
query_params={},
4722+
timeout=None,
4723+
)
4724+
tabledata_call=mock.call(
4725+
method="GET",
4726+
path="/projects/dest-project/datasets/dest_dataset/tables/dest_table/data",
4727+
query_params={},
4728+
timeout=None,
4729+
)
4730+
conn.api_request.assert_has_calls(
4731+
[query_results_call,query_results_call,reload_call,tabledata_call]
4732+
)
4733+
4734+
deftest_result_with_done_job_calls_get_query_results(self):
4735+
query_resource_done= {
4736+
"jobComplete":True,
4737+
"jobReference": {"projectId":self.PROJECT,"jobId":self.JOB_ID},
4738+
"schema": {"fields": [{"name":"col1","type":"STRING"}]},
4739+
"totalRows":"1",
4740+
}
4741+
job_resource=self._make_resource(started=True,ended=True)
4742+
job_resource["configuration"]["query"]["destinationTable"]= {
4743+
"projectId":"dest-project",
4744+
"datasetId":"dest_dataset",
4745+
"tableId":"dest_table",
4746+
}
4747+
tabledata_resource= {
4748+
"totalRows":"1",
4749+
"pageToken":None,
4750+
"rows": [{"f": [{"v":"abc"}]}],
4751+
}
4752+
conn=_make_connection(query_resource_done,tabledata_resource)
4753+
client=_make_client(self.PROJECT,connection=conn)
4754+
job=self._get_target_class().from_api_repr(job_resource,client)
4755+
4756+
result=job.result()
4757+
4758+
rows=list(result)
4759+
self.assertEqual(len(rows),1)
4760+
self.assertEqual(rows[0].col1,"abc")
4761+
4762+
query_results_call=mock.call(
4763+
method="GET",
4764+
path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
4765+
query_params={"maxResults":0},
4766+
timeout=None,
4767+
)
4768+
tabledata_call=mock.call(
4769+
method="GET",
4770+
path="/projects/dest-project/datasets/dest_dataset/tables/dest_table/data",
4771+
query_params={},
4772+
timeout=None,
4773+
)
4774+
conn.api_request.assert_has_calls([query_results_call,tabledata_call])
4775+
47004776
deftest_result_with_max_results(self):
47014777
fromgoogle.cloud.bigquery.tableimportRowIterator
47024778

@@ -4938,6 +5014,9 @@ def test_result_error(self):
49385014
"errors": [error_result],
49395015
"state":"DONE",
49405016
}
5017+
job._query_results=google.cloud.bigquery.query._QueryResults.from_api_repr(
5018+
{"jobComplete":True,"jobReference":job._properties["jobReference"]}
5019+
)
49415020
job._set_future_result()
49425021

49435022
withself.assertRaises(exceptions.GoogleCloudError)asexc_info:

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp