Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9a8906e

Browse files
committed
feat: Third batch of AsyncIO integration
* LRO client* gRPC wrappers & helpers* With unit tests & docs
1 parentdd9b2f3 commit9a8906e

File tree

10 files changed

+1337
-10
lines changed

10 files changed

+1337
-10
lines changed

‎google/api_core/gapic_v1/__init__.py‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,6 @@
2323

2424
ifsys.version_info>= (3,6):
2525
fromgoogle.api_core.gapic_v1importconfig_async# noqa: F401
26+
fromgoogle.api_core.gapic_v1importmethod_async# noqa: F401
2627
__all__.append("config_async")
28+
__all__.append("method_async")
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Copyright 2020 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""AsyncIO helpers for wrapping gRPC methods with common functionality.
15+
16+
This is used by gapic clients to provide common error mapping, retry, timeout,
17+
pagination, and long-running operations to gRPC methods.
18+
"""
19+
20+
fromgoogle.api_coreimportgeneral_helpers,grpc_helpers_async
21+
fromgoogle.api_core.gapic_v1importclient_info
22+
fromgoogle.api_core.gapic_v1.methodimport (_GapicCallable,# noqa: F401
23+
DEFAULT,
24+
USE_DEFAULT_METADATA)
25+
26+
27+
defwrap_method(
28+
func,
29+
default_retry=None,
30+
default_timeout=None,
31+
client_info=client_info.DEFAULT_CLIENT_INFO,
32+
):
33+
"""Wrap an async RPC method with common behavior.
34+
35+
Returns:
36+
Callable: A new callable that takes optional ``retry`` and ``timeout``
37+
arguments and applies the common error mapping, retry, timeout,
38+
and metadata behavior to the low-level RPC method.
39+
"""
40+
func=grpc_helpers_async.wrap_errors(func)
41+
42+
ifclient_infoisnotNone:
43+
user_agent_metadata= [client_info.to_grpc_metadata()]
44+
else:
45+
user_agent_metadata=None
46+
47+
returngeneral_helpers.wraps(func)(_GapicCallable(
48+
func,default_retry,default_timeout,metadata=user_agent_metadata))

‎google/api_core/grpc_helpers.py‎

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,10 @@ def wrap_errors(callable_):
170170
return_wrap_unary_errors(callable_)
171171

172172

173-
defcreate_channel(
174-
target,credentials=None,scopes=None,ssl_credentials=None,**kwargs
175-
):
176-
"""Create a secure channel with credentials.
173+
def_create_composite_credentials(credentials=None,scopes=None,ssl_credentials=None):
174+
"""Create the composite credentials for secure channels.
177175
178176
Args:
179-
target (str): The target service address in the format 'hostname:port'.
180177
credentials (google.auth.credentials.Credentials): The credentials. If
181178
not specified, then this function will attempt to ascertain the
182179
credentials from the environment using :func:`google.auth.default`.
@@ -185,11 +182,9 @@ def create_channel(
185182
are passed to :func:`google.auth.default`.
186183
ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
187184
credentials. This can be used to specify different certificates.
188-
kwargs: Additional key-word args passed to
189-
:func:`grpc_gcp.secure_channel` or :func:`grpc.secure_channel`.
190185
191186
Returns:
192-
grpc.Channel: Thecreated channel.
187+
grpc.ChannelCredentials: Thecomposed channel credentials object.
193188
"""
194189
ifcredentialsisNone:
195190
credentials,_=google.auth.default(scopes=scopes)
@@ -212,10 +207,34 @@ def create_channel(
212207
ssl_credentials=grpc.ssl_channel_credentials()
213208

214209
# Combine the ssl credentials and the authorization credentials.
215-
composite_credentials=grpc.composite_channel_credentials(
210+
returngrpc.composite_channel_credentials(
216211
ssl_credentials,google_auth_credentials
217212
)
218213

214+
215+
defcreate_channel(target,credentials=None,scopes=None,ssl_credentials=None,**kwargs):
216+
"""Create a secure channel with credentials.
217+
218+
Args:
219+
target (str): The target service address in the format 'hostname:port'.
220+
credentials (google.auth.credentials.Credentials): The credentials. If
221+
not specified, then this function will attempt to ascertain the
222+
credentials from the environment using :func:`google.auth.default`.
223+
scopes (Sequence[str]): A optional list of scopes needed for this
224+
service. These are only used when credentials are not specified and
225+
are passed to :func:`google.auth.default`.
226+
ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
227+
credentials. This can be used to specify different certificates.
228+
kwargs: Additional key-word args passed to
229+
:func:`grpc_gcp.secure_channel` or :func:`grpc.secure_channel`.
230+
231+
Returns:
232+
grpc.Channel: The created channel.
233+
"""
234+
composite_credentials=_create_composite_credentials(
235+
credentials,scopes,ssl_credentials
236+
)
237+
219238
ifHAS_GRPC_GCP:
220239
# If grpc_gcp module is available use grpc_gcp.secure_channel,
221240
# otherwise, use grpc.secure_channel to create grpc channel.
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
# Copyright 2020 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""AsyncIO helpers for :mod:`grpc` supporting 3.6+.
16+
17+
Please combine more detailed docstring in grpc_helpers.py to use following
18+
functions. This module is implementing the same surface with AsyncIO semantics.
19+
"""
20+
21+
importasyncio
22+
importfunctools
23+
24+
importgrpc
25+
fromgrpc.experimentalimportaio
26+
27+
fromgoogle.api_coreimportexceptions,grpc_helpers
28+
29+
30+
# TODO(lidiz) Support gRPC GCP wrapper
31+
HAS_GRPC_GCP=False
32+
33+
# NOTE(lidiz) Alternatively, we can hack "__getattribute__" to perform
34+
# automatic patching for us. But that means the overhead of creating an
35+
# extra Python function spreads to every single send and receive.
36+
37+
38+
class_WrappedCall(aio.Call):
39+
40+
def__init__(self):
41+
self._call=None
42+
43+
defwith_call(self,call):
44+
"""Supplies the call object separately to keep __init__ clean."""
45+
self._call=call
46+
returnself
47+
48+
asyncdefinitial_metadata(self):
49+
returnawaitself._call.initial_metadata()
50+
51+
asyncdeftrailing_metadata(self):
52+
returnawaitself._call.trailing_metadata()
53+
54+
asyncdefcode(self):
55+
returnawaitself._call.code()
56+
57+
asyncdefdetails(self):
58+
returnawaitself._call.details()
59+
60+
defcancelled(self):
61+
returnself._call.cancelled()
62+
63+
defdone(self):
64+
returnself._call.done()
65+
66+
deftime_remaining(self):
67+
returnself._call.time_remaining()
68+
69+
defcancel(self):
70+
returnself._call.cancel()
71+
72+
defadd_done_callback(self,callback):
73+
self._call.add_done_callback(callback)
74+
75+
asyncdefwait_for_connection(self):
76+
try:
77+
awaitself._call.wait_for_connection()
78+
exceptgrpc.RpcErrorasrpc_error:
79+
raiseexceptions.from_grpc_error(rpc_error)fromrpc_error
80+
81+
82+
class_WrappedUnaryResponseMixin(_WrappedCall):
83+
84+
def__await__(self):
85+
try:
86+
response=yieldfromself._call.__await__()
87+
returnresponse
88+
exceptgrpc.RpcErrorasrpc_error:
89+
raiseexceptions.from_grpc_error(rpc_error)fromrpc_error
90+
91+
92+
class_WrappedStreamResponseMixin(_WrappedCall):
93+
94+
def__init__(self):
95+
self._wrapped_async_generator=None
96+
97+
asyncdefread(self):
98+
try:
99+
returnawaitself._call.read()
100+
exceptgrpc.RpcErrorasrpc_error:
101+
raiseexceptions.from_grpc_error(rpc_error)fromrpc_error
102+
103+
asyncdef_wrapped_aiter(self):
104+
try:
105+
# NOTE(lidiz) coverage doesn't understand the exception raised from
106+
# __anext__ method. It is covered by test case:
107+
# test_wrap_stream_errors_aiter_non_rpc_error
108+
asyncforresponseinself._call:# pragma: no branch
109+
yieldresponse
110+
exceptgrpc.RpcErrorasrpc_error:
111+
raiseexceptions.from_grpc_error(rpc_error)fromrpc_error
112+
113+
def__aiter__(self):
114+
ifnotself._wrapped_async_generator:
115+
self._wrapped_async_generator=self._wrapped_aiter()
116+
returnself._wrapped_async_generator
117+
118+
119+
class_WrappedStreamRequestMixin(_WrappedCall):
120+
121+
asyncdefwrite(self,request):
122+
try:
123+
awaitself._call.write(request)
124+
exceptgrpc.RpcErrorasrpc_error:
125+
raiseexceptions.from_grpc_error(rpc_error)fromrpc_error
126+
127+
asyncdefdone_writing(self):
128+
try:
129+
awaitself._call.done_writing()
130+
exceptgrpc.RpcErrorasrpc_error:
131+
raiseexceptions.from_grpc_error(rpc_error)fromrpc_error
132+
133+
134+
# NOTE(lidiz) Implementing each individual class separately, so we don't
135+
# expose any API that should not be seen. E.g., __aiter__ in unary-unary
136+
# RPC, or __await__ in stream-stream RPC.
137+
class_WrappedUnaryUnaryCall(_WrappedUnaryResponseMixin,aio.UnaryUnaryCall):
138+
"""Wrapped UnaryUnaryCall to map exceptions."""
139+
140+
141+
class_WrappedUnaryStreamCall(_WrappedStreamResponseMixin,aio.UnaryStreamCall):
142+
"""Wrapped UnaryStreamCall to map exceptions."""
143+
144+
145+
class_WrappedStreamUnaryCall(_WrappedUnaryResponseMixin,_WrappedStreamRequestMixin,aio.StreamUnaryCall):
146+
"""Wrapped StreamUnaryCall to map exceptions."""
147+
148+
149+
class_WrappedStreamStreamCall(_WrappedStreamRequestMixin,_WrappedStreamResponseMixin,aio.StreamStreamCall):
150+
"""Wrapped StreamStreamCall to map exceptions."""
151+
152+
153+
def_wrap_unary_errors(callable_):
154+
"""Map errors for Unary-Unary async callables."""
155+
grpc_helpers._patch_callable_name(callable_)
156+
157+
@functools.wraps(callable_)
158+
deferror_remapped_callable(*args,**kwargs):
159+
call=callable_(*args,**kwargs)
160+
return_WrappedUnaryUnaryCall().with_call(call)
161+
162+
returnerror_remapped_callable
163+
164+
165+
def_wrap_stream_errors(callable_):
166+
"""Map errors for streaming RPC async callables."""
167+
grpc_helpers._patch_callable_name(callable_)
168+
169+
@functools.wraps(callable_)
170+
asyncdeferror_remapped_callable(*args,**kwargs):
171+
call=callable_(*args,**kwargs)
172+
173+
ifisinstance(call,aio.UnaryStreamCall):
174+
call=_WrappedUnaryStreamCall().with_call(call)
175+
elifisinstance(call,aio.StreamUnaryCall):
176+
call=_WrappedStreamUnaryCall().with_call(call)
177+
elifisinstance(call,aio.StreamStreamCall):
178+
call=_WrappedStreamStreamCall().with_call(call)
179+
else:
180+
raiseTypeError('Unexpected type of call %s'%type(call))
181+
182+
awaitcall.wait_for_connection()
183+
returncall
184+
185+
returnerror_remapped_callable
186+
187+
188+
defwrap_errors(callable_):
189+
"""Wrap a gRPC async callable and map :class:`grpc.RpcErrors` to
190+
friendly error classes.
191+
192+
Errors raised by the gRPC callable are mapped to the appropriate
193+
:class:`google.api_core.exceptions.GoogleAPICallError` subclasses. The
194+
original `grpc.RpcError` (which is usually also a `grpc.Call`) is
195+
available from the ``response`` property on the mapped exception. This
196+
is useful for extracting metadata from the original error.
197+
198+
Args:
199+
callable_ (Callable): A gRPC callable.
200+
201+
Returns: Callable: The wrapped gRPC callable.
202+
"""
203+
ifisinstance(callable_,aio.UnaryUnaryMultiCallable):
204+
return_wrap_unary_errors(callable_)
205+
else:
206+
return_wrap_stream_errors(callable_)
207+
208+
209+
defcreate_channel(target,credentials=None,scopes=None,ssl_credentials=None,**kwargs):
210+
"""Create an AsyncIO secure channel with credentials.
211+
212+
Args:
213+
target (str): The target service address in the format 'hostname:port'.
214+
credentials (google.auth.credentials.Credentials): The credentials. If
215+
not specified, then this function will attempt to ascertain the
216+
credentials from the environment using :func:`google.auth.default`.
217+
scopes (Sequence[str]): A optional list of scopes needed for this
218+
service. These are only used when credentials are not specified and
219+
are passed to :func:`google.auth.default`.
220+
ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
221+
credentials. This can be used to specify different certificates.
222+
kwargs: Additional key-word args passed to :func:`aio.secure_channel`.
223+
224+
Returns:
225+
aio.Channel: The created channel.
226+
"""
227+
composite_credentials=grpc_helpers._create_composite_credentials(
228+
credentials,scopes,ssl_credentials
229+
)
230+
231+
returnaio.secure_channel(target,composite_credentials,**kwargs)
232+
233+
234+
classFakeUnaryUnaryCall(_WrappedUnaryUnaryCall):
235+
"""Fake implementation for unary-unary RPCs.
236+
237+
It is a dummy object for response message. Supply the intended response
238+
upon the initialization, and the coroutine will return the exact response
239+
message.
240+
"""
241+
242+
def__init__(self,response=object()):
243+
self.response=response
244+
self._future=asyncio.get_event_loop().create_future()
245+
self._future.set_result(self.response)
246+
247+
def__await__(self):
248+
response=yieldfromself._future.__await__()
249+
returnresponse
250+
251+
252+
classFakeStreamUnaryCall(_WrappedStreamUnaryCall):
253+
"""Fake implementation for stream-unary RPCs.
254+
255+
It is a dummy object for response message. Supply the intended response
256+
upon the initialization, and the coroutine will return the exact response
257+
message.
258+
"""
259+
260+
def__init__(self,response=object()):
261+
self.response=response
262+
self._future=asyncio.get_event_loop().create_future()
263+
self._future.set_result(self.response)
264+
265+
def__await__(self):
266+
response=yieldfromself._future.__await__()
267+
returnresponse
268+
269+
asyncdefwait_for_connection(self):
270+
pass

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp