Commit a17be5f

fix: distinguish server timeouts from transport timeouts (#43)
* fix: distinguish transport and query timeouts

The transport-layer timeout is made independent of the query timeout, i.e. the maximum time to wait for the query to complete. The query timeout is used by the blocking poll so that the backend does not block for too long when polling for job completion, but the transport can have different timeout requirements, and we do not want it to raise unnecessary timeout errors.

* Apply timeout to each of the underlying requests

Since job methods no longer split the timeout between all the requests a method might make, the Client methods are adjusted in the same way.
1 parent 24f3910   commit a17be5f
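The gist of the change, as a minimal sketch (illustrative helper names only, not the library's code): previously a single ``timeout`` was treated as a budget shared by all HTTP requests a method might issue, whereas after this commit the same ``timeout`` value applies to each individual request, and the query (server-side) timeout is tracked separately.

```python
import time


def call_with_shared_budget(requests, total_timeout):
    """Old behavior (sketch): one timeout budget split across all requests.

    Each request only gets whatever is left of ``total_timeout``, so a slow
    first request can leave later requests with an unrealistically small
    timeout and cause spurious timeout errors.
    """
    remaining = total_timeout
    results = []
    for send in requests:  # each ``send`` is a callable issuing one HTTP request
        start = time.monotonic()
        results.append(send(timeout=remaining))
        remaining = max(0.0, remaining - (time.monotonic() - start))
    return results


def call_with_per_request_timeout(requests, timeout):
    """New behavior (sketch): the same ``timeout`` applies to every request."""
    return [send(timeout=timeout) for send in requests]
```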

File tree

4 files changed: +32 -233 lines changed


google/cloud/bigquery/client.py

Lines changed: 13 additions & 24 deletions

@@ -22,7 +22,6 @@
 except ImportError:  # Python 2.7
     import collections as collections_abc
 
-import concurrent.futures
 import copy
 import functools
 import gzip
@@ -48,7 +47,6 @@
 import google.api_core.client_options
 import google.api_core.exceptions
 from google.api_core import page_iterator
-from google.auth.transport.requests import TimeoutGuard
 import google.cloud._helpers
 from google.cloud import exceptions
 from google.cloud.client import ClientWithProject
@@ -2598,27 +2596,22 @@ def list_partitions(self, table, retry=DEFAULT_RETRY, timeout=None):
             timeout (Optional[float]):
                 The number of seconds to wait for the underlying HTTP transport
                 before using ``retry``.
-                If multiple requests are made under the hood, ``timeout`` is
-                interpreted as the approximate total time of **all** requests.
+                If multiple requests are made under the hood, ``timeout``
+                applies to each individual request.
 
         Returns:
             List[str]:
                 A list of the partition ids present in the partitioned table
         """
         table = _table_arg_to_table_ref(table, default_project=self.project)
-
-        with TimeoutGuard(
-            timeout, timeout_error_type=concurrent.futures.TimeoutError
-        ) as guard:
-            meta_table = self.get_table(
-                TableReference(
-                    DatasetReference(table.project, table.dataset_id),
-                    "%s$__PARTITIONS_SUMMARY__" % table.table_id,
-                ),
-                retry=retry,
-                timeout=timeout,
-            )
-        timeout = guard.remaining_timeout
+        meta_table = self.get_table(
+            TableReference(
+                DatasetReference(table.project, table.dataset_id),
+                "%s$__PARTITIONS_SUMMARY__" % table.table_id,
+            ),
+            retry=retry,
+            timeout=timeout,
+        )
 
         subset = [col for col in meta_table.schema if col.name == "partition_id"]
         return [
@@ -2685,8 +2678,8 @@ def list_rows(
             timeout (Optional[float]):
                 The number of seconds to wait for the underlying HTTP transport
                 before using ``retry``.
-                If multiple requests are made under the hood, ``timeout`` is
-                interpreted as the approximate total time of **all** requests.
+                If multiple requests are made under the hood, ``timeout``
+                applies to each individual request.
 
         Returns:
             google.cloud.bigquery.table.RowIterator:
@@ -2711,11 +2704,7 @@ def list_rows(
         # No schema, but no selected_fields. Assume the developer wants all
         # columns, so get the table resource for them rather than failing.
         elif len(schema) == 0:
-            with TimeoutGuard(
-                timeout, timeout_error_type=concurrent.futures.TimeoutError
-            ) as guard:
-                table = self.get_table(table.reference, retry=retry, timeout=timeout)
-            timeout = guard.remaining_timeout
+            table = self.get_table(table.reference, retry=retry, timeout=timeout)
             schema = table.schema
 
         params = {}
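From the caller's side, the client.py change means a ``timeout`` argument is forwarded unchanged to every request a method makes. A usage sketch (the project, dataset, and table names are placeholders):

```python
from google.cloud import bigquery

client = bigquery.Client()

# Placeholder table ID; assumes a partitioned table exists.
table_id = "my-project.my_dataset.my_partitioned_table"

# After this commit, the 5-second timeout applies to *each* underlying HTTP
# request (the internal get_table() call and the partitions metadata request),
# rather than being split between them.
partition_ids = client.list_partitions(table_id, timeout=5.0)
print(partition_ids)
```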

google/cloud/bigquery/job.py

Lines changed: 19 additions & 43 deletions

@@ -26,7 +26,6 @@
 from six.moves import http_client
 
 import google.api_core.future.polling
-from google.auth.transport.requests import TimeoutGuard
 from google.cloud import exceptions
 from google.cloud.exceptions import NotFound
 from google.cloud.bigquery.dataset import Dataset
@@ -55,7 +54,6 @@
 _DONE_STATE = "DONE"
 _STOPPED_REASON = "stopped"
 _TIMEOUT_BUFFER_SECS = 0.1
-_SERVER_TIMEOUT_MARGIN_SECS = 1.0
 _CONTAINS_ORDER_BY = re.compile(r"ORDER\s+BY", re.IGNORECASE)
 
 _ERROR_REASON_TO_EXCEPTION = {
@@ -796,8 +794,8 @@ def result(self, retry=DEFAULT_RETRY, timeout=None):
             timeout (Optional[float]):
                 The number of seconds to wait for the underlying HTTP transport
                 before using ``retry``.
-                If multiple requests are made under the hood, ``timeout`` is
-                interpreted as the approximate total time of **all** requests.
+                If multiple requests are made under the hood, ``timeout``
+                applies to each individual request.
 
         Returns:
             _AsyncJob: This instance.
@@ -809,11 +807,7 @@ def result(self, retry=DEFAULT_RETRY, timeout=None):
                 if the job did not complete in the given timeout.
         """
         if self.state is None:
-            with TimeoutGuard(
-                timeout, timeout_error_type=concurrent.futures.TimeoutError
-            ) as guard:
-                self._begin(retry=retry, timeout=timeout)
-            timeout = guard.remaining_timeout
+            self._begin(retry=retry, timeout=timeout)
         # TODO: modify PollingFuture so it can pass a retry argument to done().
         return super(_AsyncJob, self).result(timeout=timeout)
 
@@ -2602,6 +2596,7 @@ def __init__(self, job_id, query, client, job_config=None):
         self._configuration = job_config
         self._query_results = None
         self._done_timeout = None
+        self._transport_timeout = None
 
     @property
     def allow_large_results(self):
@@ -3059,19 +3054,9 @@ def done(self, retry=DEFAULT_RETRY, timeout=None):
             self._done_timeout = max(0, self._done_timeout)
             timeout_ms = int(api_timeout * 1000)
 
-        # If the server-side processing timeout (timeout_ms) is specified and
-        # would be picked as the total request timeout, we want to add a small
-        # margin to it - we don't want to timeout the connection just as the
-        # server-side processing might have completed, but instead slightly
-        # after the server-side deadline.
-        # However, if `timeout` is specified, and is shorter than the adjusted
-        # server timeout, the former prevails.
-        if timeout_ms is not None and timeout_ms > 0:
-            server_timeout_with_margin = timeout_ms / 1000 + _SERVER_TIMEOUT_MARGIN_SECS
-            if timeout is not None:
-                timeout = min(server_timeout_with_margin, timeout)
-            else:
-                timeout = server_timeout_with_margin
+        # If an explicit timeout is not given, fall back to the transport timeout
+        # stored in _blocking_poll() in the process of polling for job completion.
+        transport_timeout = timeout if timeout is not None else self._transport_timeout
 
         # Do not refresh if the state is already done, as the job will not
        # change once complete.
@@ -3082,19 +3067,20 @@ def done(self, retry=DEFAULT_RETRY, timeout=None):
                 project=self.project,
                 timeout_ms=timeout_ms,
                 location=self.location,
-                timeout=timeout,
+                timeout=transport_timeout,
             )
 
             # Only reload the job once we know the query is complete.
             # This will ensure that fields such as the destination table are
             # correctly populated.
             if self._query_results.complete:
-                self.reload(retry=retry, timeout=timeout)
+                self.reload(retry=retry, timeout=transport_timeout)
 
         return self.state == _DONE_STATE
 
     def _blocking_poll(self, timeout=None):
         self._done_timeout = timeout
+        self._transport_timeout = timeout
         super(QueryJob, self)._blocking_poll(timeout=timeout)
 
     @staticmethod
@@ -3170,8 +3156,8 @@ def result(
             timeout (Optional[float]):
                 The number of seconds to wait for the underlying HTTP transport
                 before using ``retry``.
-                If multiple requests are made under the hood, ``timeout`` is
-                interpreted as the approximate total time of **all** requests.
+                If multiple requests are made under the hood, ``timeout``
+                applies to each individual request.
 
         Returns:
             google.cloud.bigquery.table.RowIterator:
@@ -3189,27 +3175,17 @@ def result(
             If the job did not complete in the given timeout.
         """
         try:
-            guard = TimeoutGuard(
-                timeout, timeout_error_type=concurrent.futures.TimeoutError
-            )
-            with guard:
-                super(QueryJob, self).result(retry=retry, timeout=timeout)
-            timeout = guard.remaining_timeout
+            super(QueryJob, self).result(retry=retry, timeout=timeout)
 
             # Return an iterator instead of returning the job.
             if not self._query_results:
-                guard = TimeoutGuard(
-                    timeout, timeout_error_type=concurrent.futures.TimeoutError
+                self._query_results = self._client._get_query_results(
+                    self.job_id,
+                    retry,
+                    project=self.project,
+                    location=self.location,
+                    timeout=timeout,
                 )
-                with guard:
-                    self._query_results = self._client._get_query_results(
-                        self.job_id,
-                        retry,
-                        project=self.project,
-                        location=self.location,
-                        timeout=timeout,
-                    )
-                timeout = guard.remaining_timeout
         except exceptions.GoogleCloudError as exc:
             exc.message += self._format_for_exception(self.query, self.job_id)
             exc.query_job = self
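The job.py change is essentially this pattern: the job remembers the caller's transport timeout when a blocking poll starts, and ``done()`` falls back to that stored value when it is called without an explicit ``timeout``, keeping it separate from the server-side ``timeout_ms``. A stripped-down sketch (toy class, not the BigQuery ``QueryJob`` implementation):

```python
import time


class PollingJobSketch:
    """Toy polling job that keeps server-side and transport timeouts separate."""

    def __init__(self, check_done):
        # check_done(timeout_ms, transport_timeout) -> bool issues one status request.
        self._check_done = check_done
        self._done_timeout = None       # budget converted to the server-side timeout_ms
        self._transport_timeout = None  # per-request HTTP timeout (seconds)

    def done(self, timeout=None):
        # Server-side timeout for this poll, if a polling budget was set.
        timeout_ms = None
        if self._done_timeout is not None:
            timeout_ms = int(max(0.0, self._done_timeout) * 1000)

        # Fall back to the transport timeout stored by _blocking_poll()
        # when the caller did not pass an explicit timeout.
        transport_timeout = timeout if timeout is not None else self._transport_timeout
        return self._check_done(timeout_ms, transport_timeout)

    def _blocking_poll(self, timeout=None, interval=1.0):
        # The same value seeds both budgets: the query timeout bounds how long
        # we keep polling, while the transport timeout applies to each request.
        self._done_timeout = timeout
        self._transport_timeout = timeout
        deadline = None if timeout is None else time.monotonic() + timeout
        while not self.done():
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError("job did not complete in the given timeout")
            time.sleep(interval)
```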

tests/unit/test_client.py

Lines changed: 0 additions & 78 deletions

@@ -24,7 +24,6 @@
 import unittest
 import warnings
 
-import freezegun
 import mock
 import requests
 import six
@@ -5496,43 +5495,6 @@ def test_list_partitions_with_string_id(self):
 
         self.assertEqual(len(partition_list), 0)
 
-    def test_list_partitions_splitting_timout_between_requests(self):
-        from google.cloud.bigquery.table import Table
-
-        row_count = 2
-        meta_info = _make_list_partitons_meta_info(
-            self.PROJECT, self.DS_ID, self.TABLE_ID, row_count
-        )
-
-        data = {
-            "totalRows": str(row_count),
-            "rows": [{"f": [{"v": "20180101"}]}, {"f": [{"v": "20180102"}]}],
-        }
-        creds = _make_credentials()
-        http = object()
-        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
-        client._connection = make_connection(meta_info, data)
-        table = Table(self.TABLE_REF)
-
-        with freezegun.freeze_time("2019-01-01 00:00:00", tick=False) as frozen_time:
-
-            def delayed_get_table(*args, **kwargs):
-                frozen_time.tick(delta=1.4)
-                return orig_get_table(*args, **kwargs)
-
-            orig_get_table = client.get_table
-            client.get_table = mock.Mock(side_effect=delayed_get_table)
-
-            client.list_partitions(table, timeout=5.0)
-
-            client.get_table.assert_called_once()
-            _, kwargs = client.get_table.call_args
-            self.assertEqual(kwargs.get("timeout"), 5.0)
-
-            client._connection.api_request.assert_called()
-            _, kwargs = client._connection.api_request.call_args
-            self.assertAlmostEqual(kwargs.get("timeout"), 3.6, places=5)
-
     def test_list_rows(self):
         import datetime
         from google.cloud._helpers import UTC
@@ -5918,46 +5880,6 @@ def test_list_rows_with_missing_schema(self):
         self.assertEqual(rows[1].age, 31, msg=repr(table))
         self.assertIsNone(rows[2].age, msg=repr(table))
 
-    def test_list_rows_splitting_timout_between_requests(self):
-        from google.cloud.bigquery.schema import SchemaField
-        from google.cloud.bigquery.table import Table
-
-        response = {"totalRows": "0", "rows": []}
-        creds = _make_credentials()
-        http = object()
-        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
-        client._connection = make_connection(response, response)
-
-        table = Table(
-            self.TABLE_REF, schema=[SchemaField("field_x", "INTEGER", mode="NULLABLE")]
-        )
-
-        with freezegun.freeze_time("1970-01-01 00:00:00", tick=False) as frozen_time:
-
-            def delayed_get_table(*args, **kwargs):
-                frozen_time.tick(delta=1.4)
-                return table
-
-            client.get_table = mock.Mock(side_effect=delayed_get_table)
-
-            rows_iter = client.list_rows(
-                "{}.{}.{}".format(
-                    self.TABLE_REF.project,
-                    self.TABLE_REF.dataset_id,
-                    self.TABLE_REF.table_id,
-                ),
-                timeout=5.0,
-            )
-            six.next(rows_iter.pages)
-
-            client.get_table.assert_called_once()
-            _, kwargs = client.get_table.call_args
-            self.assertEqual(kwargs.get("timeout"), 5.0)
-
-            client._connection.api_request.assert_called_once()
-            _, kwargs = client._connection.api_request.call_args
-            self.assertAlmostEqual(kwargs.get("timeout"), 3.6)
-
     def test_list_rows_error(self):
         creds = _make_credentials()
         http = object()

0 commit comments

