Source code for grpc.aio._call

# Copyright 2019 gRPC authors.## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at##     http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License."""Invocation-side implementation of gRPC Asyncio Python."""importasyncioimportenumfromfunctoolsimportpartialimportinspectimportloggingimporttracebackfromtypingimport(Any,AsyncIterator,Generator,Generic,Optional,Tuple,Union,)importgrpcfromgrpcimport_commonfromgrpc._cythonimportcygrpcfrom.import_base_callfrom._metadataimportMetadatafrom._typingimportDeserializingFunctionfrom._typingimportDoneCallbackTypefrom._typingimportEOFTypefrom._typingimportMetadataTypefrom._typingimportMetadatumTypefrom._typingimportRequestIterableTypefrom._typingimportRequestTypefrom._typingimportResponseTypefrom._typingimportSerializingFunction__all__="AioRpcError","Call","UnaryUnaryCall","UnaryStreamCall"_LOCAL_CANCELLATION_DETAILS="Locally cancelled by application!"_GC_CANCELLATION_DETAILS="Cancelled upon garbage collection!"_RPC_ALREADY_FINISHED_DETAILS="RPC already finished."_RPC_HALF_CLOSED_DETAILS='RPC is half closed after calling "done_writing".'_API_STYLE_ERROR=("The iterator and read/write APIs may not be mixed on a single RPC.")_OK_CALL_REPRESENTATION=('<{} of RPC that terminated with:\n\tstatus ={}\n\tdetails = "{}"\n>')_NON_OK_CALL_REPRESENTATION=("<{} of RPC that terminated with:\n""\tstatus ={}\n"'\tdetails = "{}"\n''\tdebug_error_string = "{}"\n'">")_LOGGER=logging.getLogger(__name__)
[docs]classAioRpcError(grpc.RpcError):"""An implementation of RpcError to be used by the asynchronous API. Raised RpcError is a snapshot of the final status of the RPC, values are determined. Hence, its methods no longer needs to be coroutines. """_code:grpc.StatusCode_details:Optional[str]_initial_metadata:Optional[Metadata]_trailing_metadata:Optional[Metadata]_debug_error_string:Optional[str]def__init__(self,code:grpc.StatusCode,initial_metadata:Metadata,trailing_metadata:Metadata,details:Optional[str]=None,debug_error_string:Optional[str]=None,)->None:"""Constructor. Args: code: The status code with which the RPC has been finalized. initial_metadata: Optional initial metadata that could be sent by the Server. trailing_metadata: Optional metadata that could be sent by the Server. details: Optional details explaining the reason of the error. debug_error_string: Optional string """super().__init__()self._code=codeself._details=detailsself._initial_metadata=initial_metadataself._trailing_metadata=trailing_metadataself._debug_error_string=debug_error_string
[docs]defcode(self)->grpc.StatusCode:"""Accesses the status code sent by the server. Returns: The `grpc.StatusCode` status code. """returnself._code
[docs]defdetails(self)->Optional[str]:"""Accesses the details sent by the server. Returns: The description of the error. """returnself._details
[docs]definitial_metadata(self)->Metadata:"""Accesses the initial metadata sent by the server. Returns: The initial metadata received. """returnself._initial_metadata
[docs]deftrailing_metadata(self)->Metadata:"""Accesses the trailing metadata sent by the server. Returns: The trailing metadata received. """returnself._trailing_metadata
[docs]defdebug_error_string(self)->str:"""Accesses the debug error string sent by the server. Returns: The debug error string received. """returnself._debug_error_string
def_repr(self)->str:"""Assembles the error string for the RPC error."""return_NON_OK_CALL_REPRESENTATION.format(self.__class__.__name__,self._code,self._details,self._debug_error_string,)def__repr__(self)->str:returnself._repr()def__str__(self)->str:returnself._repr()def__reduce__(self):return(type(self),(self._code,self._initial_metadata,self._trailing_metadata,self._details,self._debug_error_string,),)
def_create_rpc_error(initial_metadata:MetadataType,status:cygrpc.AioRpcStatus,)->AioRpcError:returnAioRpcError(_common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()],Metadata.from_tuple(initial_metadata),Metadata.from_tuple(status.trailing_metadata()),details=status.details(),debug_error_string=status.debug_error_string(),)classCall:"""Base implementation of client RPC Call object. Implements logic around final status, metadata and cancellation. """_loop:asyncio.AbstractEventLoop_code:grpc.StatusCode_cython_call:cygrpc._AioCall_metadata:Tuple[MetadatumType,...]_request_serializer:Optional[SerializingFunction]_response_deserializer:Optional[DeserializingFunction]def__init__(self,cython_call:cygrpc._AioCall,metadata:Metadata,request_serializer:Optional[SerializingFunction],response_deserializer:Optional[DeserializingFunction],loop:asyncio.AbstractEventLoop,)->None:self._loop=loopself._cython_call=cython_callself._metadata=tuple(metadata)self._request_serializer=request_serializerself._response_deserializer=response_deserializerdef__del__(self)->None:# The '_cython_call' object might be destructed before Call objectifhasattr(self,"_cython_call"):ifnotself._cython_call.done():self._cancel(_GC_CANCELLATION_DETAILS)defcancelled(self)->bool:returnself._cython_call.cancelled()def_cancel(self,details:str)->bool:"""Forwards the application cancellation reasoning."""ifnotself._cython_call.done():self._cython_call.cancel(details)returnTruereturnFalsedefcancel(self)->bool:returnself._cancel(_LOCAL_CANCELLATION_DETAILS)defdone(self)->bool:returnself._cython_call.done()defadd_done_callback(self,callback:DoneCallbackType)->None:cb=partial(callback,self)self._cython_call.add_done_callback(cb)deftime_remaining(self)->Optional[float]:returnself._cython_call.time_remaining()asyncdefinitial_metadata(self)->Metadata:raw_metadata_tuple=awaitself._cython_call.initial_metadata()returnMetadata.from_tuple(raw_metadata_tuple)asyncdeftrailing_metadata(self)->Metadata:raw_metadata_tuple=(awaitself._cython_call.status()).trailing_metadata()returnMetadata.from_tuple(raw_metadata_tuple)asyncdefcode(self)->grpc.StatusCode:cygrpc_code=(awaitself._cython_call.status()).code()return_common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[cygrpc_code]asyncdefdetails(self)->str:return(awaitself._cython_call.status()).details()asyncdefdebug_error_string(self)->str:return(awaitself._cython_call.status()).debug_error_string()asyncdef_raise_for_status(self)->None:ifself._cython_call.is_locally_cancelled():raiseasyncio.CancelledError()code=awaitself.code()ifcode!=grpc.StatusCode.OK:raise_create_rpc_error(awaitself.initial_metadata(),awaitself._cython_call.status(),)def_repr(self)->str:returnrepr(self._cython_call)def__repr__(self)->str:returnself._repr()def__str__(self)->str:returnself._repr()class_APIStyle(enum.IntEnum):UNKNOWN=0ASYNC_GENERATOR=1READER_WRITER=2class_UnaryResponseMixin(Call,Generic[ResponseType]):_call_response:asyncio.Taskdef_init_unary_response_mixin(self,response_task:asyncio.Task):self._call_response=response_taskdefcancel(self)->bool:ifsuper().cancel():self._call_response.cancel()returnTruereturnFalsedef__await__(self)->Generator[Any,None,ResponseType]:"""Wait till the ongoing RPC request finishes."""try:response=yield fromself._call_responseexceptasyncio.CancelledError:# Even if we caught all other CancelledError, there is still# this corner case. If the application cancels immediately after# the Call object is created, we will observe this# `CancelledError`.ifnotself.cancelled():self.cancel()raise# NOTE(lidiz) If we raise RpcError in the task, and users doesn't# 'await' on it. AsyncIO will log 'Task exception was never retrieved'.# Instead, if we move the exception raising here, the spam stops.# Unfortunately, there can only be one 'yield from' in '__await__'. So,# we need to access the private instance variable.ifresponseiscygrpc.EOF:ifself._cython_call.is_locally_cancelled():raiseasyncio.CancelledError()else:raise_create_rpc_error(self._cython_call._initial_metadata,self._cython_call._status,)else:returnresponseclass_StreamResponseMixin(Call):_message_aiter:AsyncIterator[ResponseType]_preparation:asyncio.Task_response_style:_APIStyledef_init_stream_response_mixin(self,preparation:asyncio.Task):self._message_aiter=Noneself._preparation=preparationself._response_style=_APIStyle.UNKNOWNdef_update_response_style(self,style:_APIStyle):ifself._response_styleis_APIStyle.UNKNOWN:self._response_style=styleelifself._response_styleisnotstyle:raisecygrpc.UsageError(_API_STYLE_ERROR)defcancel(self)->bool:ifsuper().cancel():self._preparation.cancel()returnTruereturnFalseasyncdef_fetch_stream_responses(self)->ResponseType:message=awaitself._read()whilemessageisnotcygrpc.EOF:yieldmessagemessage=awaitself._read()# If the read operation failed, Core should explain why.awaitself._raise_for_status()def__aiter__(self)->AsyncIterator[ResponseType]:self._update_response_style(_APIStyle.ASYNC_GENERATOR)ifself._message_aiterisNone:self._message_aiter=self._fetch_stream_responses()returnself._message_aiterasyncdef_read(self)->ResponseType:# Wait for the request being sentawaitself._preparation# Reads response message from Coretry:raw_response=awaitself._cython_call.receive_serialized_message()exceptasyncio.CancelledError:ifnotself.cancelled():self.cancel()raiseifraw_responseiscygrpc.EOF:returncygrpc.EOFreturn_common.deserialize(raw_response,self._response_deserializer)asyncdefread(self)->Union[EOFType,ResponseType]:ifself.done():awaitself._raise_for_status()returncygrpc.EOFself._update_response_style(_APIStyle.READER_WRITER)response_message=awaitself._read()ifresponse_messageiscygrpc.EOF:# If the read operation failed, Core should explain why.awaitself._raise_for_status()returnresponse_messageclass_StreamRequestMixin(Call):_metadata_sent:asyncio.Event_done_writing_flag:bool_async_request_poller:Optional[asyncio.Task]_request_style:_APIStyledef_init_stream_request_mixin(self,request_iterator:Optional[RequestIterableType]):self._metadata_sent=asyncio.Event()self._done_writing_flag=False# If user passes in an async iterator, create a consumer Task.ifrequest_iteratorisnotNone:self._async_request_poller=self._loop.create_task(self._consume_request_iterator(request_iterator))self._request_style=_APIStyle.ASYNC_GENERATORelse:self._async_request_poller=Noneself._request_style=_APIStyle.READER_WRITERdef_raise_for_different_style(self,style:_APIStyle):ifself._request_styleisnotstyle:raisecygrpc.UsageError(_API_STYLE_ERROR)defcancel(self)->bool:ifsuper().cancel():ifself._async_request_pollerisnotNone:self._async_request_poller.cancel()returnTruereturnFalsedef_metadata_sent_observer(self):self._metadata_sent.set()asyncdef_consume_request_iterator(self,request_iterator:RequestIterableType)->None:try:ifinspect.isasyncgen(request_iterator)orhasattr(request_iterator,"__aiter__"):asyncforrequestinrequest_iterator:try:awaitself._write(request)exceptAioRpcErrorasrpc_error:_LOGGER.debug(("Exception while consuming the"" request_iterator:%s"),rpc_error,)returnelse:forrequestinrequest_iterator:try:awaitself._write(request)exceptAioRpcErrorasrpc_error:_LOGGER.debug(("Exception while consuming the"" request_iterator:%s"),rpc_error,)returnawaitself._done_writing()except:# pylint: disable=bare-except # noqa: E722# Client iterators can raise exceptions, which we should handle by# cancelling the RPC and logging the client's error. No exceptions# should escape this function._LOGGER.debug("Client request_iterator raised exception:\n%s",traceback.format_exc(),)self.cancel()asyncdef_write(self,request:RequestType)->None:ifself.done():raiseasyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)ifself._done_writing_flag:raiseasyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)ifnotself._metadata_sent.is_set():awaitself._metadata_sent.wait()ifself.done():awaitself._raise_for_status()serialized_request=_common.serialize(request,self._request_serializer)try:awaitself._cython_call.send_serialized_message(serialized_request)exceptcygrpc.InternalErroraserr:self._cython_call.set_internal_error(str(err))awaitself._raise_for_status()exceptasyncio.CancelledError:ifnotself.cancelled():self.cancel()raiseasyncdef_done_writing(self)->None:ifself.done():# If the RPC is finished, do nothing.returnifnotself._done_writing_flag:# If the done writing is not sent before, try to send it.self._done_writing_flag=Truetry:awaitself._cython_call.send_receive_close()exceptasyncio.CancelledError:ifnotself.cancelled():self.cancel()raiseasyncdefwrite(self,request:RequestType)->None:self._raise_for_different_style(_APIStyle.READER_WRITER)awaitself._write(request)asyncdefdone_writing(self)->None:"""Signal peer that client is done writing. This method is idempotent. """self._raise_for_different_style(_APIStyle.READER_WRITER)awaitself._done_writing()asyncdefwait_for_connection(self)->None:awaitself._metadata_sent.wait()ifself.done():awaitself._raise_for_status()classUnaryUnaryCall(_UnaryResponseMixin,Call,_base_call.UnaryUnaryCall):"""Object for managing unary-unary RPC calls. Returned when an instance of `UnaryUnaryMultiCallable` object is called. """_request:RequestType_invocation_task:asyncio.Task# pylint: disable=too-many-argumentsdef__init__(self,request:RequestType,deadline:Optional[float],metadata:Metadata,credentials:Optional[grpc.CallCredentials],wait_for_ready:Optional[bool],channel:cygrpc.AioChannel,method:bytes,request_serializer:Optional[SerializingFunction],response_deserializer:Optional[DeserializingFunction],loop:asyncio.AbstractEventLoop,)->None:super().__init__(channel.call(method,deadline,credentials,wait_for_ready),metadata,request_serializer,response_deserializer,loop,)self._request=requestself._context=cygrpc.build_census_context()self._invocation_task=loop.create_task(self._invoke())self._init_unary_response_mixin(self._invocation_task)asyncdef_invoke(self)->ResponseType:serialized_request=_common.serialize(self._request,self._request_serializer)# NOTE(lidiz) asyncio.CancelledError is not a good transport for status,# because the asyncio.Task class do not cache the exception object.# https://github.com/python/cpython/blob/edad4d89e357c92f70c0324b937845d652b20afd/Lib/asyncio/tasks.py#L785try:serialized_response=awaitself._cython_call.unary_unary(serialized_request,self._metadata,self._context)exceptasyncio.CancelledError:ifnotself.cancelled():self.cancel()ifself._cython_call.is_ok():return_common.deserialize(serialized_response,self._response_deserializer)returncygrpc.EOFasyncdefwait_for_connection(self)->None:awaitself._invocation_taskifself.done():awaitself._raise_for_status()classUnaryStreamCall(_StreamResponseMixin,Call,_base_call.UnaryStreamCall):"""Object for managing unary-stream RPC calls. Returned when an instance of `UnaryStreamMultiCallable` object is called. """_request:RequestType_send_unary_request_task:asyncio.Task# pylint: disable=too-many-argumentsdef__init__(self,request:RequestType,deadline:Optional[float],metadata:Metadata,credentials:Optional[grpc.CallCredentials],wait_for_ready:Optional[bool],channel:cygrpc.AioChannel,method:bytes,request_serializer:Optional[SerializingFunction],response_deserializer:Optional[DeserializingFunction],loop:asyncio.AbstractEventLoop,)->None:super().__init__(channel.call(method,deadline,credentials,wait_for_ready),metadata,request_serializer,response_deserializer,loop,)self._request=requestself._context=cygrpc.build_census_context()self._send_unary_request_task=loop.create_task(self._send_unary_request())self._init_stream_response_mixin(self._send_unary_request_task)asyncdef_send_unary_request(self)->ResponseType:serialized_request=_common.serialize(self._request,self._request_serializer)try:awaitself._cython_call.initiate_unary_stream(serialized_request,self._metadata,self._context)exceptasyncio.CancelledError:ifnotself.cancelled():self.cancel()raiseasyncdefwait_for_connection(self)->None:awaitself._send_unary_request_taskifself.done():awaitself._raise_for_status()# pylint: disable=too-many-ancestorsclassStreamUnaryCall(_StreamRequestMixin,_UnaryResponseMixin,Call,_base_call.StreamUnaryCall):"""Object for managing stream-unary RPC calls. Returned when an instance of `StreamUnaryMultiCallable` object is called. """# pylint: disable=too-many-argumentsdef__init__(self,request_iterator:Optional[RequestIterableType],deadline:Optional[float],metadata:Metadata,credentials:Optional[grpc.CallCredentials],wait_for_ready:Optional[bool],channel:cygrpc.AioChannel,method:bytes,request_serializer:Optional[SerializingFunction],response_deserializer:Optional[DeserializingFunction],loop:asyncio.AbstractEventLoop,)->None:super().__init__(channel.call(method,deadline,credentials,wait_for_ready),metadata,request_serializer,response_deserializer,loop,)self._context=cygrpc.build_census_context()self._init_stream_request_mixin(request_iterator)self._init_unary_response_mixin(loop.create_task(self._conduct_rpc()))asyncdef_conduct_rpc(self)->ResponseType:try:serialized_response=awaitself._cython_call.stream_unary(self._metadata,self._metadata_sent_observer,self._context)exceptasyncio.CancelledError:ifnotself.cancelled():self.cancel()raiseifself._cython_call.is_ok():return_common.deserialize(serialized_response,self._response_deserializer)returncygrpc.EOFclassStreamStreamCall(_StreamRequestMixin,_StreamResponseMixin,Call,_base_call.StreamStreamCall):"""Object for managing stream-stream RPC calls. Returned when an instance of `StreamStreamMultiCallable` object is called. """_initializer:asyncio.Task# pylint: disable=too-many-argumentsdef__init__(self,request_iterator:Optional[RequestIterableType],deadline:Optional[float],metadata:Metadata,credentials:Optional[grpc.CallCredentials],wait_for_ready:Optional[bool],channel:cygrpc.AioChannel,method:bytes,request_serializer:Optional[SerializingFunction],response_deserializer:Optional[DeserializingFunction],loop:asyncio.AbstractEventLoop,)->None:super().__init__(channel.call(method,deadline,credentials,wait_for_ready),metadata,request_serializer,response_deserializer,loop,)self._context=cygrpc.build_census_context()self._initializer=self._loop.create_task(self._prepare_rpc())self._init_stream_request_mixin(request_iterator)self._init_stream_response_mixin(self._initializer)asyncdef_prepare_rpc(self):"""Prepares the RPC for receiving/sending messages. All other operations around the stream should only happen after the completion of this method. """try:awaitself._cython_call.initiate_stream_stream(self._metadata,self._metadata_sent_observer,self._context)exceptasyncio.CancelledError:ifnotself.cancelled():self.cancel()# No need to raise RpcError here, because no one will `await` this task.