Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit2b103b6

Browse files
authored
fix: consume part of StreamingResponseIterator to support failure while under a retry context (#10206)
1 parent14f1f34 commit2b103b6

File tree

2 files changed

+54
-5
lines changed

2 files changed

+54
-5
lines changed

‎google/api_core/grpc_helpers.py‎

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,19 @@ class _StreamingResponseIterator(grpc.Call):
6565
def__init__(self,wrapped):
6666
self._wrapped=wrapped
6767

68+
# This iterator is used in a retry context, and returned outside after init.
69+
# gRPC will not throw an exception until the stream is consumed, so we need
70+
# to retrieve the first result, in order to fail, in order to trigger a retry.
71+
try:
72+
self._stored_first_result=six.next(self._wrapped)
73+
exceptTypeError:
74+
# It is possible the wrapped method isn't an iterable (a grpc.Call
75+
# for instance). If this happens don't store the first result.
76+
pass
77+
exceptStopIteration:
78+
# ignore stop iteration at this time. This should be handled outside of retry.
79+
pass
80+
6881
def__iter__(self):
6982
"""This iterator is also an iterable that returns itself."""
7083
returnself
@@ -76,8 +89,13 @@ def next(self):
7689
protobuf.Message: A single response from the stream.
7790
"""
7891
try:
92+
ifhasattr(self,"_stored_first_result"):
93+
result=self._stored_first_result
94+
delself._stored_first_result
95+
returnresult
7996
returnsix.next(self._wrapped)
8097
exceptgrpc.RpcErrorasexc:
98+
# If the stream has already returned data, we cannot recover here.
8199
six.raise_from(exceptions.from_grpc_error(exc),exc)
82100

83101
# Alias needed for Python 2/3 support.

‎tests/unit/test_grpc_helpers.py‎

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,24 +129,55 @@ def test_wrap_stream_errors_invocation():
129129
assertexc_info.value.response==grpc_error
130130

131131

132+
deftest_wrap_stream_empty_iterator():
133+
expected_responses= []
134+
callable_=mock.Mock(spec=["__call__"],return_value=iter(expected_responses))
135+
136+
wrapped_callable=grpc_helpers._wrap_stream_errors(callable_)
137+
138+
got_iterator=wrapped_callable()
139+
140+
responses=list(got_iterator)
141+
142+
callable_.assert_called_once_with()
143+
assertresponses==expected_responses
144+
145+
132146
classRpcResponseIteratorImpl(object):
133-
def__init__(self,exception):
134-
self._exception=exception
147+
def__init__(self,iterable):
148+
self._iterable=iter(iterable)
135149

136150
defnext(self):
137-
raiseself._exception
151+
next_item=next(self._iterable)
152+
ifisinstance(next_item,RpcErrorImpl):
153+
raisenext_item
154+
returnnext_item
138155

139156
__next__=next
140157

141158

142-
deftest_wrap_stream_errors_iterator():
159+
deftest_wrap_stream_errors_iterator_initialization():
143160
grpc_error=RpcErrorImpl(grpc.StatusCode.UNAVAILABLE)
144-
response_iter=RpcResponseIteratorImpl(grpc_error)
161+
response_iter=RpcResponseIteratorImpl([grpc_error])
145162
callable_=mock.Mock(spec=["__call__"],return_value=response_iter)
146163

147164
wrapped_callable=grpc_helpers._wrap_stream_errors(callable_)
148165

166+
withpytest.raises(exceptions.ServiceUnavailable)asexc_info:
167+
wrapped_callable(1,2,three="four")
168+
169+
callable_.assert_called_once_with(1,2,three="four")
170+
assertexc_info.value.response==grpc_error
171+
172+
173+
deftest_wrap_stream_errors_during_iteration():
174+
grpc_error=RpcErrorImpl(grpc.StatusCode.UNAVAILABLE)
175+
response_iter=RpcResponseIteratorImpl([1,grpc_error])
176+
callable_=mock.Mock(spec=["__call__"],return_value=response_iter)
177+
178+
wrapped_callable=grpc_helpers._wrap_stream_errors(callable_)
149179
got_iterator=wrapped_callable(1,2,three="four")
180+
next(got_iterator)
150181

151182
withpytest.raises(exceptions.ServiceUnavailable)asexc_info:
152183
next(got_iterator)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp