Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9530548

Browse files
authored
feat: support for async bidi streaming apis (#836)
1 parent8168988 commit9530548

File tree

4 files changed

+676
-63
lines changed

4 files changed

+676
-63
lines changed

‎google/api_core/bidi.py‎

Lines changed: 39 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
"""Bi-directional streaming RPC helpers."""
15+
"""Helpers for synchronous bidirectional streaming RPCs."""
1616

1717
importcollections
1818
importdatetime
@@ -22,6 +22,7 @@
2222
importtime
2323

2424
fromgoogle.api_coreimportexceptions
25+
fromgoogle.api_core.bidi_baseimportBidiRpcBase
2526

2627
_LOGGER=logging.getLogger(__name__)
2728
_BIDIRECTIONAL_CONSUMER_NAME="Thread-ConsumeBidirectionalStream"
@@ -36,21 +37,6 @@ class _RequestQueueGenerator(object):
3637
otherwise open-ended set of requests to send through a request-streaming
3738
(or bidirectional) RPC.
3839
39-
The reason this is necessary is because gRPC takes an iterator as the
40-
request for request-streaming RPCs. gRPC consumes this iterator in another
41-
thread to allow it to block while generating requests for the stream.
42-
However, if the generator blocks indefinitely gRPC will not be able to
43-
clean up the thread as it'll be blocked on `next(iterator)` and not be able
44-
to check the channel status to stop iterating. This helper mitigates that
45-
by waiting on the queue with a timeout and checking the RPC state before
46-
yielding.
47-
48-
Finally, it allows for retrying without swapping queues because if it does
49-
pull an item off the queue when the RPC is inactive, it'll immediately put
50-
it back and then exit. This is necessary because yielding the item in this
51-
case will cause gRPC to discard it. In practice, this means that the order
52-
of messages is not guaranteed. If such a thing is necessary it would be
53-
easy to use a priority queue.
5440
5541
Example::
5642
@@ -62,12 +48,6 @@ class _RequestQueueGenerator(object):
6248
print(response)
6349
q.put(...)
6450
65-
Note that it is possible to accomplish this behavior without "spinning"
66-
(using a queue timeout). One possible way would be to use more threads to
67-
multiplex the grpc end event with the queue, another possible way is to
68-
use selectors and a custom event/queue object. Both of these approaches
69-
are significant from an engineering perspective for small benefit - the
70-
CPU consumed by spinning is pretty minuscule.
7151
7252
Args:
7353
queue (queue_module.Queue): The request queue.
@@ -96,6 +76,31 @@ def _is_active(self):
9676
returnself.callisNoneorself.call.is_active()
9777

9878
def__iter__(self):
79+
# The reason this is necessary is because gRPC takes an iterator as the
80+
# request for request-streaming RPCs. gRPC consumes this iterator in
81+
# another thread to allow it to block while generating requests for
82+
# the stream. However, if the generator blocks indefinitely gRPC will
83+
# not be able to clean up the thread as it'll be blocked on
84+
# `next(iterator)` and not be able to check the channel status to stop
85+
# iterating. This helper mitigates that by waiting on the queue with
86+
# a timeout and checking the RPC state before yielding.
87+
#
88+
# Finally, it allows for retrying without swapping queues because if
89+
# it does pull an item off the queue when the RPC is inactive, it'll
90+
# immediately put it back and then exit. This is necessary because
91+
# yielding the item in this case will cause gRPC to discard it. In
92+
# practice, this means that the order of messages is not guaranteed.
93+
# If such a thing is necessary it would be easy to use a priority
94+
# queue.
95+
#
96+
# Note that it is possible to accomplish this behavior without
97+
# "spinning" (using a queue timeout). One possible way would be to use
98+
# more threads to multiplex the grpc end event with the queue, another
99+
# possible way is to use selectors and a custom event/queue object.
100+
# Both of these approaches are significant from an engineering
101+
# perspective for small benefit - the CPU consumed by spinning is
102+
# pretty minuscule.
103+
99104
ifself._initial_requestisnotNone:
100105
ifcallable(self._initial_request):
101106
yieldself._initial_request()
@@ -201,7 +206,7 @@ def __repr__(self):
201206
)
202207

203208

204-
classBidiRpc(object):
209+
classBidiRpc(BidiRpcBase):
205210
"""A helper for consuming a bi-directional streaming RPC.
206211
207212
This maps gRPC's built-in interface which uses a request iterator and a
@@ -227,6 +232,8 @@ class BidiRpc(object):
227232
rpc.send(example_pb2.StreamingRpcRequest(
228233
data='example'))
229234
235+
rpc.close()
236+
230237
This does *not* retry the stream on errors. See :class:`ResumableBidiRpc`.
231238
232239
Args:
@@ -240,40 +247,14 @@ class BidiRpc(object):
240247
the request.
241248
"""
242249

243-
def__init__(self,start_rpc,initial_request=None,metadata=None):
244-
self._start_rpc=start_rpc
245-
self._initial_request=initial_request
246-
self._rpc_metadata=metadata
247-
self._request_queue=queue_module.Queue()
248-
self._request_generator=None
249-
self._is_active=False
250-
self._callbacks= []
251-
self.call=None
252-
253-
defadd_done_callback(self,callback):
254-
"""Adds a callback that will be called when the RPC terminates.
255-
256-
This occurs when the RPC errors or is successfully terminated.
257-
258-
Args:
259-
callback (Callable[[grpc.Future], None]): The callback to execute.
260-
It will be provided with the same gRPC future as the underlying
261-
stream which will also be a :class:`grpc.Call`.
262-
"""
263-
self._callbacks.append(callback)
264-
265-
def_on_call_done(self,future):
266-
# This occurs when the RPC errors or is successfully terminated.
267-
# Note that grpc's "future" here can also be a grpc.RpcError.
268-
# See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331
269-
# that `grpc.RpcError` is also `grpc.call`.
270-
forcallbackinself._callbacks:
271-
callback(future)
250+
def_create_queue(self):
251+
"""Create a queue for requests."""
252+
returnqueue_module.Queue()
272253

273254
defopen(self):
274255
"""Opens the stream."""
275256
ifself.is_active:
276-
raiseValueError("Can not open an already open stream.")
257+
raiseValueError("Cannot open an already open stream.")
277258

278259
request_generator=_RequestQueueGenerator(
279260
self._request_queue,initial_request=self._initial_request
@@ -322,7 +303,7 @@ def send(self, request):
322303
request (protobuf.Message): The request to send.
323304
"""
324305
ifself.callisNone:
325-
raiseValueError("Can notsend() on an RPC that has never beenopen()ed.")
306+
raiseValueError("Cannotsend on an RPCstreamthat has never beenopened.")
326307

327308
# Don't use self.is_active(), as ResumableBidiRpc will overload it
328309
# to mean something semantically different.
@@ -343,20 +324,15 @@ def recv(self):
343324
protobuf.Message: The received message.
344325
"""
345326
ifself.callisNone:
346-
raiseValueError("Can notrecv() on an RPC that has never beenopen()ed.")
327+
raiseValueError("Cannotrecv on an RPCstreamthat has never beenopened.")
347328

348329
returnnext(self.call)
349330

350331
@property
351332
defis_active(self):
352-
"""bool:True if this stream is currently open and active."""
333+
"""True if this stream is currently open and active."""
353334
returnself.callisnotNoneandself.call.is_active()
354335

355-
@property
356-
defpending_requests(self):
357-
"""int: Returns an estimate of the number of queued requests."""
358-
returnself._request_queue.qsize()
359-
360336

361337
def_never_terminate(future_or_error):
362338
"""By default, no errors cause BiDi termination."""
@@ -544,7 +520,7 @@ def _send(self, request):
544520
call=self.call
545521

546522
ifcallisNone:
547-
raiseValueError("Can notsend() on an RPC that has never beenopen()ed.")
523+
raiseValueError("Cannotsend on an RPC that has never beenopened.")
548524

549525
# Don't use self.is_active(), as ResumableBidiRpc will overload it
550526
# to mean something semantically different.
@@ -563,7 +539,7 @@ def _recv(self):
563539
call=self.call
564540

565541
ifcallisNone:
566-
raiseValueError("Can notrecv() on an RPC that has never beenopen()ed.")
542+
raiseValueError("Cannotrecv on an RPC that has never beenopened.")
567543

568544
returnnext(call)
569545

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp