# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interceptors implementation of gRPC Asyncio Python."""

from __future__ import annotations

from abc import ABCMeta
from abc import abstractmethod
import asyncio
import collections
import functools
from typing import (
    AsyncIterable,
    AsyncIterator,
    Awaitable,
    Callable,
    List,
    Optional,
    Sequence,
    Union,
)

import grpc
from grpc._cython import cygrpc

from . import _base_call
from ._call import AioRpcError
from ._call import StreamStreamCall
from ._call import StreamUnaryCall
from ._call import UnaryStreamCall
from ._call import UnaryUnaryCall
from ._call import _API_STYLE_ERROR
from ._call import _RPC_ALREADY_FINISHED_DETAILS
from ._call import _RPC_HALF_CLOSED_DETAILS
from ._metadata import Metadata
from ._typing import DeserializingFunction
from ._typing import DoneCallbackType
from ._typing import EOFType
from ._typing import RequestIterableType
from ._typing import RequestType
from ._typing import ResponseIterableType
from ._typing import ResponseType
from ._typing import SerializingFunction
from ._utils import _timeout_to_deadline

_LOCAL_CANCELLATION_DETAILS = "Locally cancelled by application!"


class ServerInterceptor(metaclass=ABCMeta):
    """Affords intercepting incoming RPCs on the service-side.

    This is an EXPERIMENTAL API.
    """

    @abstractmethod
    async def intercept_service(
        self,
        continuation: Callable[
            [grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]
        ],
        handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler:
        """Intercepts incoming RPCs before handing them over to a handler.

        State can be passed from an interceptor to downstream interceptors
        via contextvars. The first interceptor is called from an empty
        contextvars.Context, and the same Context is used for downstream
        interceptors and for the final handler call. Note that there are no
        guarantees that interceptors and handlers will be called from the
        same thread.

        Args:
            continuation: A function that takes a HandlerCallDetails and
                proceeds to invoke the next interceptor in the chain, if any,
                or the RPC handler lookup logic, with the call details passed
                as an argument, and returns an RpcMethodHandler instance if
                the RPC is considered serviced, or None otherwise.
            handler_call_details: A HandlerCallDetails describing the RPC.

        Returns:
            An RpcMethodHandler with which the RPC may be serviced if the
            interceptor chooses to service this RPC, or None otherwise.
        """


class ClientCallDetails(
    collections.namedtuple(
        "ClientCallDetails",
        ("method", "timeout", "metadata", "credentials", "wait_for_ready"),
    ),
    grpc.ClientCallDetails,
):
    """Describes an RPC to be invoked.

    This is an EXPERIMENTAL API.

    Args:
        method: The method name of the RPC.
        timeout: An optional duration of time in seconds to allow for the RPC.
        metadata: Optional metadata to be transmitted to the service-side of
            the RPC.
        credentials: An optional CallCredentials for the RPC.
        wait_for_ready: An optional flag to enable :term:`wait_for_ready`
            mechanism.
    """

    method: bytes
    timeout: Optional[float]
    metadata: Optional[Metadata]
    credentials: Optional[grpc.CallCredentials]
    wait_for_ready: Optional[bool]


class ClientInterceptor(metaclass=ABCMeta):
    """Base class used for all Aio Client Interceptor classes"""


class UnaryUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Affords intercepting unary-unary invocations."""

    @abstractmethod
    async def intercept_unary_unary(
        self,
        continuation: Callable[
            [ClientCallDetails, RequestType], UnaryUnaryCall
        ],
        client_call_details: ClientCallDetails,
        request: RequestType,
    ) -> Union[UnaryUnaryCall, ResponseType]:
        """Intercepts a unary-unary invocation asynchronously.

        Args:
            continuation: A coroutine that proceeds with the invocation by
                executing the next interceptor in the chain or invoking the
                actual RPC on the underlying Channel. It is the interceptor's
                responsibility to call it if it decides to move the RPC
                forward. The interceptor can use
                `call = await continuation(client_call_details, request)`
                to continue with the RPC. `continuation` returns the call to
                the RPC.
            client_call_details: A ClientCallDetails object describing the
                outgoing RPC.
            request: The request value for the RPC.

        Returns:
            An object with the RPC response.

        Raises:
            AioRpcError: Indicating that the RPC terminated with non-OK
                status.
            asyncio.CancelledError: Indicating that the RPC was canceled.
        """


class UnaryStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Affords intercepting unary-stream invocations."""

    @abstractmethod
    async def intercept_unary_stream(
        self,
        continuation: Callable[
            [ClientCallDetails, RequestType], UnaryStreamCall
        ],
        client_call_details: ClientCallDetails,
        request: RequestType,
    ) -> Union[ResponseIterableType, UnaryStreamCall]:
        """Intercepts a unary-stream invocation asynchronously.

        The function could return the call object or an asynchronous
        iterator; in case of being an asynchronous iterator this will become
        the source of the reads done by the caller.

        Args:
            continuation: A coroutine that proceeds with the invocation by
                executing the next interceptor in the chain or invoking the
                actual RPC on the underlying Channel. It is the interceptor's
                responsibility to call it if it decides to move the RPC
                forward. The interceptor can use
                `call = await continuation(client_call_details, request)`
                to continue with the RPC. `continuation` returns the call to
                the RPC.
            client_call_details: A ClientCallDetails object describing the
                outgoing RPC.
            request: The request value for the RPC.

        Returns:
            The RPC Call or an asynchronous iterator.

        Raises:
            AioRpcError: Indicating that the RPC terminated with non-OK
                status.
            asyncio.CancelledError: Indicating that the RPC was canceled.
        """


class StreamUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Affords intercepting stream-unary invocations."""

    @abstractmethod
    async def intercept_stream_unary(
        self,
        continuation: Callable[
            [ClientCallDetails, RequestIterableType], StreamUnaryCall
        ],
        client_call_details: ClientCallDetails,
        request_iterator: RequestIterableType,
    ) -> StreamUnaryCall:
        """Intercepts a stream-unary invocation asynchronously.

        Within the interceptor the usage of the call methods like `write` or
        even awaiting the call should be done carefully, since the caller
        could be expecting an untouched call, for example for start writing
        messages to it.

        Args:
            continuation: A coroutine that proceeds with the invocation by
                executing the next interceptor in the chain or invoking the
                actual RPC on the underlying Channel. It is the interceptor's
                responsibility to call it if it decides to move the RPC
                forward. The interceptor can use
                `call = await continuation(client_call_details,
                request_iterator)` to continue with the RPC. `continuation`
                returns the call to the RPC.
            client_call_details: A ClientCallDetails object describing the
                outgoing RPC.
            request_iterator: The request iterator that will produce requests
                for the RPC.

        Returns:
            The RPC Call.

        Raises:
            AioRpcError: Indicating that the RPC terminated with non-OK
                status.
            asyncio.CancelledError: Indicating that the RPC was canceled.
        """


class StreamStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
    """Affords intercepting stream-stream invocations."""

    @abstractmethod
    async def intercept_stream_stream(
        self,
        continuation: Callable[
            [ClientCallDetails, RequestIterableType], StreamStreamCall
        ],
        client_call_details: ClientCallDetails,
        request_iterator: RequestIterableType,
    ) -> Union[ResponseIterableType, StreamStreamCall]:
        """Intercepts a stream-stream invocation asynchronously.

        Within the interceptor the usage of the call methods like `write` or
        even awaiting the call should be done carefully, since the caller
        could be expecting an untouched call, for example for start writing
        messages to it.

        The function could return the call object or an asynchronous
        iterator; in case of being an asynchronous iterator this will become
        the source of the reads done by the caller.

        Args:
            continuation: A coroutine that proceeds with the invocation by
                executing the next interceptor in the chain or invoking the
                actual RPC on the underlying Channel. It is the interceptor's
                responsibility to call it if it decides to move the RPC
                forward. The interceptor can use
                `call = await continuation(client_call_details,
                request_iterator)` to continue with the RPC. `continuation`
                returns the call to the RPC.
            client_call_details: A ClientCallDetails object describing the
                outgoing RPC.
            request_iterator: The request iterator that will produce requests
                for the RPC.

        Returns:
            The RPC Call or an asynchronous iterator.

        Raises:
            AioRpcError: Indicating that the RPC terminated with non-OK
                status.
            asyncio.CancelledError: Indicating that the RPC was canceled.
        """


class InterceptedCall:
    """Base implementation for all intercepted call arities.

    Interceptors might have some work to do before the RPC invocation with
    the capacity of changing the invocation parameters, and some work to do
    after the RPC invocation with the capacity for accessing to the wrapped
    `UnaryUnaryCall`.

    It handles also early and later cancellations, when the RPC has not even
    started and the execution is still held by the interceptors or when the
    RPC has finished but again the execution is still held by the
    interceptors.

    Once the RPC is finally executed, all methods are finally done against
    the intercepted call, being at the same time the same call returned to
    the interceptors.

    As a base class for all of the interceptors implements the logic around
    final status, metadata and cancellation.
    """

    # Task running the interceptor chain; its result is the underlying call.
    _interceptors_task: asyncio.Task
    # Done-callbacks registered before the interceptors task has finished.
    _pending_add_done_callbacks: List[DoneCallbackType]

    def __init__(self, interceptors_task: asyncio.Task) -> None:
        """Store the interceptors task and hook pending-callback dispatch.

        Args:
            interceptors_task: The task that runs the interceptor chain and
                resolves to the intercepted call.
        """
        self._interceptors_task = interceptors_task
        self._pending_add_done_callbacks = []
        self._interceptors_task.add_done_callback(
            self._fire_or_add_pending_done_callbacks
        )

    def __del__(self):
        # A subclass constructor may fail before the task is stored; in that
        # case there is nothing to cancel.
        if hasattr(self, "_interceptors_task"):
            self.cancel()

    def _fire_or_add_pending_done_callbacks(
        self, interceptors_task: asyncio.Task
    ) -> None:
        """Dispatch the callbacks queued while the interceptors were running.

        If the interceptors task failed with `AioRpcError` or was cancelled,
        or the resulting call is already done, the callbacks are invoked
        immediately. Otherwise they are forwarded to the resulting call.
        """
        if not self._pending_add_done_callbacks:
            return

        call_completed = False

        try:
            call = interceptors_task.result()
            if call.done():
                call_completed = True
        except (AioRpcError, asyncio.CancelledError):
            call_completed = True

        if call_completed:
            for callback in self._pending_add_done_callbacks:
                callback(self)
        else:
            # `call` is bound here: reaching this branch means `result()`
            # returned without raising.
            for callback in self._pending_add_done_callbacks:
                callback = functools.partial(
                    self._wrap_add_done_callback, callback
                )
                call.add_done_callback(callback)

        self._pending_add_done_callbacks = []

    def _wrap_add_done_callback(
        self, callback: DoneCallbackType, unused_call: _base_call.Call
    ) -> None:
        """Invoke `callback` with this intercepted call, not the inner one."""
        callback(self)

    def cancel(self) -> bool:
        """Cancel the interceptors task or, if finished, the resulting call.

        Returns:
            The result of cancelling the pending interceptors task or the
            underlying call; False if the interceptors task already ended
            with `AioRpcError` or `asyncio.CancelledError`.
        """
        if not self._interceptors_task.done():
            # The intercepted call is not available yet, so try to cancel
            # it by using the generic Asyncio cancellation method.
            return self._interceptors_task.cancel()

        try:
            call = self._interceptors_task.result()
        except AioRpcError:
            return False
        except asyncio.CancelledError:
            return False

        return call.cancel()

    def cancelled(self) -> bool:
        """Return whether the call (or its interceptors task) was cancelled."""
        if not self._interceptors_task.done():
            return False

        try:
            call = self._interceptors_task.result()
        except AioRpcError as err:
            return err.code() == grpc.StatusCode.CANCELLED
        except asyncio.CancelledError:
            return True

        return call.cancelled()

    def done(self) -> bool:
        """Return whether the interceptors task and resulting call are done."""
        if not self._interceptors_task.done():
            return False

        try:
            call = self._interceptors_task.result()
        except (AioRpcError, asyncio.CancelledError):
            return True

        return call.done()

    def add_done_callback(self, callback: DoneCallbackType) -> None:
        """Register `callback` to be invoked with this call once it is done.

        While the interceptors task is still running the callback is queued.
        If the call has already finished it is invoked immediately.
        """
        if not self._interceptors_task.done():
            self._pending_add_done_callbacks.append(callback)
            return

        try:
            call = self._interceptors_task.result()
        except (AioRpcError, asyncio.CancelledError):
            callback(self)
            return

        if call.done():
            callback(self)
        else:
            callback = functools.partial(
                self._wrap_add_done_callback, callback
            )
            call.add_done_callback(callback)

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()

    async def initial_metadata(self) -> Optional[Metadata]:
        """Return the initial metadata; None if the task was cancelled."""
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.initial_metadata()
        except asyncio.CancelledError:
            return None

        return await call.initial_metadata()

    async def trailing_metadata(self) -> Optional[Metadata]:
        """Return the trailing metadata; None if the task was cancelled."""
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.trailing_metadata()
        except asyncio.CancelledError:
            return None

        return await call.trailing_metadata()

    async def code(self) -> grpc.StatusCode:
        """Return the status code; CANCELLED if the task was cancelled."""
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.code()
        except asyncio.CancelledError:
            return grpc.StatusCode.CANCELLED

        return await call.code()

    async def details(self) -> str:
        """Return the status details of the call."""
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.details()
        except asyncio.CancelledError:
            return _LOCAL_CANCELLATION_DETAILS

        return await call.details()

    async def debug_error_string(self) -> Optional[str]:
        """Return the debug error string; "" if the task was cancelled."""
        try:
            call = await self._interceptors_task
        except AioRpcError as err:
            return err.debug_error_string()
        except asyncio.CancelledError:
            return ""

        return await call.debug_error_string()

    async def wait_for_connection(self) -> None:
        """Wait for the interceptors task, then for the call's connection."""
        call = await self._interceptors_task
        return await call.wait_for_connection()


class _InterceptedUnaryResponseMixin:
    """Makes an intercepted call awaitable for its single response."""

    def __await__(self):
        # First resolve the interceptors task into the call, then await the
        # call itself to obtain the response.
        call = yield from self._interceptors_task.__await__()
        response = yield from call.__await__()
        return response


class _InterceptedStreamResponseMixin:
    """Exposes the responses of an intercepted call as an async iterator."""

    _response_aiter: Optional[AsyncIterable[ResponseType]]

    def _init_stream_response_mixin(self) -> None:
        # Is initialized later, otherwise if the iterator is not finally
        # consumed a logging warning is emitted by Asyncio.
        self._response_aiter = None

    async def _wait_for_interceptor_task_response_iterator(
        self,
    ) -> AsyncIterator[ResponseType]:
        """Yield every response of the call the interceptors resolve to."""
        call = await self._interceptors_task
        async for response in call:
            yield response

    def __aiter__(self) -> AsyncIterator[ResponseType]:
        if self._response_aiter is None:
            self._response_aiter = (
                self._wait_for_interceptor_task_response_iterator()
            )
        return self._response_aiter

    async def read(self) -> Union[EOFType, ResponseType]:
        """Return the next response, or `cygrpc.EOF` once exhausted."""
        if self._response_aiter is None:
            self._response_aiter = (
                self._wait_for_interceptor_task_response_iterator()
            )
        try:
            return await self._response_aiter.asend(None)
        except StopAsyncIteration:
            return cygrpc.EOF


class _InterceptedStreamRequestMixin:
    """Feeds `write`/`done_writing` calls to the RPC as a request iterator."""

    _write_to_iterator_async_gen: Optional[AsyncIterable[RequestType]]
    _write_to_iterator_queue: Optional[asyncio.Queue]
    _status_code_task: Optional[asyncio.Task]

    # Queued by `done_writing` to terminate the proxy request iterator.
    _FINISH_ITERATOR_SENTINEL = object()

    def _init_stream_request_mixin(
        self, request_iterator: Optional[RequestIterableType]
    ) -> RequestIterableType:
        """Return the request iterator the RPC should consume.

        Args:
            request_iterator: The caller-provided iterator, or None when the
                caller will use `write` and `done_writing` instead.

        Returns:
            `request_iterator` unchanged when given, otherwise a proxy
            iterator fed by `write`.
        """
        # Always defined, regardless of which API style the caller picked.
        self._status_code_task = None

        if request_iterator is None:
            # We provide our own request iterator which is a proxy
            # of the future writes that will be done by the caller.
            self._write_to_iterator_queue = asyncio.Queue(maxsize=1)
            self._write_to_iterator_async_gen = (
                self._proxy_writes_as_request_iterator()
            )
            request_iterator = self._write_to_iterator_async_gen
        else:
            self._write_to_iterator_queue = None
            self._write_to_iterator_async_gen = None

        return request_iterator

    async def _proxy_writes_as_request_iterator(self):
        """Yield queued requests until the finish sentinel is received."""
        await self._interceptors_task

        while True:
            value = await self._write_to_iterator_queue.get()
            if (
                value
                is _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL
            ):
                break
            yield value

    async def _write_to_iterator_queue_interruptible(
        self,
        request: RequestType,
        call: _base_call.Call,
    ):
        # Write the specified 'request' to the request iterator queue using
        # the specified 'call' to allow for interruption of the write in the
        # case of abrupt termination of the call.
        if self._status_code_task is None:
            self._status_code_task = self._loop.create_task(call.code())

        write_task = self._loop.create_task(
            self._write_to_iterator_queue.put(request)
        )
        await asyncio.wait(
            (write_task, self._status_code_task),
            return_when=asyncio.FIRST_COMPLETED,
        )

        if not write_task.done():
            # The status code resolved before the queue accepted the
            # request. Cancel the orphaned put so it does not stay pending
            # for the rest of the loop's lifetime.
            write_task.cancel()

    async def write(self, request: RequestType) -> None:
        """Queue `request` to be sent through the proxy request iterator.

        Raises:
            cygrpc.UsageError: If the call was created with a request
                iterator.
            asyncio.InvalidStateError: If the RPC has already finished or
                has been half-closed.
        """
        # If no queue was created it means that requests
        # should be expected through an iterator provided
        # by the caller.
        if self._write_to_iterator_queue is None:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

        try:
            call = await self._interceptors_task
        except (asyncio.CancelledError, AioRpcError):
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

        if call.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
        elif call._done_writing_flag:
            raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)

        await self._write_to_iterator_queue_interruptible(request, call)

        if call.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

    async def done_writing(self) -> None:
        """Signal peer that client is done writing.

        This method is idempotent.
        """
        # If no queue was created it means that requests
        # should be expected through an iterator provided
        # by the caller.
        if self._write_to_iterator_queue is None:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

        # NOTE(review): unlike `write`, an AioRpcError raised by the
        # interceptors task propagates unchanged from here - confirm whether
        # that asymmetry is intended.
        try:
            call = await self._interceptors_task
        except asyncio.CancelledError:
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)

        await self._write_to_iterator_queue_interruptible(
            _InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL, call
        )


class InterceptedUnaryUnaryCall(
    _InterceptedUnaryResponseMixin, InterceptedCall, _base_call.UnaryUnaryCall
):
    """Used for running a `UnaryUnaryCall` wrapped by interceptors.

    The `__await__` method is proxied to the intercepted call only when the
    interceptor task is finished.
    """

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        interceptors: Sequence[UnaryUnaryClientInterceptor],
        request: RequestType,
        timeout: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._loop = loop
        self._channel = channel
        interceptors_task = loop.create_task(
            self._invoke(
                interceptors,
                method,
                timeout,
                metadata,
                credentials,
                wait_for_ready,
                request,
                request_serializer,
                response_deserializer,
            )
        )
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
        self,
        interceptors: Sequence[UnaryUnaryClientInterceptor],
        method: bytes,
        timeout: Optional[float],
        metadata: Optional[Metadata],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        request: RequestType,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
    ) -> Union[UnaryUnaryCall, UnaryUnaryCallResponse]:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
            interceptors: List[UnaryUnaryClientInterceptor],
            client_call_details: ClientCallDetails,
            request: RequestType,
        ) -> Union[UnaryUnaryCall, UnaryUnaryCallResponse]:
            if interceptors:
                continuation = functools.partial(
                    _run_interceptor, interceptors[1:]
                )
                call_or_response = await interceptors[
                    0
                ].intercept_unary_unary(
                    continuation, client_call_details, request
                )

                if isinstance(call_or_response, _base_call.UnaryUnaryCall):
                    return call_or_response
                # A plain response was returned; wrap it so the rest of the
                # chain can treat it as an already finished call.
                return UnaryUnaryCallResponse(call_or_response)

            return UnaryUnaryCall(
                request,
                _timeout_to_deadline(client_call_details.timeout),
                client_call_details.metadata,
                client_call_details.credentials,
                client_call_details.wait_for_ready,
                self._channel,
                client_call_details.method,
                request_serializer,
                response_deserializer,
                self._loop,
            )

        client_call_details = ClientCallDetails(
            method, timeout, metadata, credentials, wait_for_ready
        )
        return await _run_interceptor(
            list(interceptors), client_call_details, request
        )

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()


class InterceptedUnaryStreamCall(
    _InterceptedStreamResponseMixin, InterceptedCall, _base_call.UnaryStreamCall
):
    """Used for running a `UnaryStreamCall` wrapped by interceptors."""

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel
    # Most recent call object produced while unwinding the interceptor chain.
    _last_returned_call_from_interceptors: Optional[
        _base_call.UnaryStreamCall
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        interceptors: Sequence[UnaryStreamClientInterceptor],
        request: RequestType,
        timeout: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._loop = loop
        self._channel = channel
        self._init_stream_response_mixin()
        self._last_returned_call_from_interceptors = None
        interceptors_task = loop.create_task(
            self._invoke(
                interceptors,
                method,
                timeout,
                metadata,
                credentials,
                wait_for_ready,
                request,
                request_serializer,
                response_deserializer,
            )
        )
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
        self,
        interceptors: Sequence[UnaryStreamClientInterceptor],
        method: bytes,
        timeout: Optional[float],
        metadata: Optional[Metadata],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        request: RequestType,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
    ) -> Union[UnaryStreamCall, UnaryStreamCallResponseIterator]:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
            interceptors: List[UnaryStreamClientInterceptor],
            client_call_details: ClientCallDetails,
            request: RequestType,
        ) -> Union[UnaryStreamCall, UnaryStreamCallResponseIterator]:
            if interceptors:
                continuation = functools.partial(
                    _run_interceptor, interceptors[1:]
                )

                call_or_response_iterator = await interceptors[
                    0
                ].intercept_unary_stream(
                    continuation, client_call_details, request
                )

                if isinstance(
                    call_or_response_iterator, _base_call.UnaryStreamCall
                ):
                    self._last_returned_call_from_interceptors = (
                        call_or_response_iterator
                    )
                else:
                    # The interceptor returned a bare iterator: pair it with
                    # the last known call so call methods keep working.
                    self._last_returned_call_from_interceptors = (
                        UnaryStreamCallResponseIterator(
                            self._last_returned_call_from_interceptors,
                            call_or_response_iterator,
                        )
                    )
                return self._last_returned_call_from_interceptors

            self._last_returned_call_from_interceptors = UnaryStreamCall(
                request,
                _timeout_to_deadline(client_call_details.timeout),
                client_call_details.metadata,
                client_call_details.credentials,
                client_call_details.wait_for_ready,
                self._channel,
                client_call_details.method,
                request_serializer,
                response_deserializer,
                self._loop,
            )

            return self._last_returned_call_from_interceptors

        client_call_details = ClientCallDetails(
            method, timeout, metadata, credentials, wait_for_ready
        )
        return await _run_interceptor(
            list(interceptors), client_call_details, request
        )

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()


class InterceptedStreamUnaryCall(
    _InterceptedUnaryResponseMixin,
    _InterceptedStreamRequestMixin,
    InterceptedCall,
    _base_call.StreamUnaryCall,
):
    """Used for running a `StreamUnaryCall` wrapped by interceptors.

    The `__await__` method is proxied to the intercepted call only when the
    interceptor task is finished.
    """

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        interceptors: Sequence[StreamUnaryClientInterceptor],
        request_iterator: Optional[RequestIterableType],
        timeout: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._loop = loop
        self._channel = channel
        request_iterator = self._init_stream_request_mixin(request_iterator)
        interceptors_task = loop.create_task(
            self._invoke(
                interceptors,
                method,
                timeout,
                metadata,
                credentials,
                wait_for_ready,
                request_iterator,
                request_serializer,
                response_deserializer,
            )
        )
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
        self,
        interceptors: Sequence[StreamUnaryClientInterceptor],
        method: bytes,
        timeout: Optional[float],
        metadata: Optional[Metadata],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        request_iterator: RequestIterableType,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
    ) -> StreamUnaryCall:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
            interceptors: Sequence[StreamUnaryClientInterceptor],
            client_call_details: ClientCallDetails,
            request_iterator: RequestIterableType,
        ) -> _base_call.StreamUnaryCall:
            if interceptors:
                continuation = functools.partial(
                    _run_interceptor, interceptors[1:]
                )

                return await interceptors[0].intercept_stream_unary(
                    continuation, client_call_details, request_iterator
                )

            return StreamUnaryCall(
                request_iterator,
                _timeout_to_deadline(client_call_details.timeout),
                client_call_details.metadata,
                client_call_details.credentials,
                client_call_details.wait_for_ready,
                self._channel,
                client_call_details.method,
                request_serializer,
                response_deserializer,
                self._loop,
            )

        client_call_details = ClientCallDetails(
            method, timeout, metadata, credentials, wait_for_ready
        )
        return await _run_interceptor(
            list(interceptors), client_call_details, request_iterator
        )

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()


class InterceptedStreamStreamCall(
    _InterceptedStreamResponseMixin,
    _InterceptedStreamRequestMixin,
    InterceptedCall,
    _base_call.StreamStreamCall,
):
    """Used for running a `StreamStreamCall` wrapped by interceptors."""

    _loop: asyncio.AbstractEventLoop
    _channel: cygrpc.AioChannel
    # Most recent call object produced while unwinding the interceptor chain.
    _last_returned_call_from_interceptors: Optional[
        _base_call.StreamStreamCall
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        interceptors: Sequence[StreamStreamClientInterceptor],
        request_iterator: Optional[RequestIterableType],
        timeout: Optional[float],
        metadata: Metadata,
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        channel: cygrpc.AioChannel,
        method: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._loop = loop
        self._channel = channel
        self._init_stream_response_mixin()
        request_iterator = self._init_stream_request_mixin(request_iterator)
        self._last_returned_call_from_interceptors = None
        interceptors_task = loop.create_task(
            self._invoke(
                interceptors,
                method,
                timeout,
                metadata,
                credentials,
                wait_for_ready,
                request_iterator,
                request_serializer,
                response_deserializer,
            )
        )
        super().__init__(interceptors_task)

    # pylint: disable=too-many-arguments
    async def _invoke(
        self,
        interceptors: Sequence[StreamStreamClientInterceptor],
        method: bytes,
        timeout: Optional[float],
        metadata: Optional[Metadata],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        request_iterator: RequestIterableType,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
    ) -> Union[StreamStreamCall, StreamStreamCallResponseIterator]:
        """Run the RPC call wrapped in interceptors"""

        async def _run_interceptor(
            interceptors: List[StreamStreamClientInterceptor],
            client_call_details: ClientCallDetails,
            request_iterator: RequestIterableType,
        ) -> Union[StreamStreamCall, StreamStreamCallResponseIterator]:
            if interceptors:
                continuation = functools.partial(
                    _run_interceptor, interceptors[1:]
                )

                call_or_response_iterator = await interceptors[
                    0
                ].intercept_stream_stream(
                    continuation, client_call_details, request_iterator
                )

                if isinstance(
                    call_or_response_iterator, _base_call.StreamStreamCall
                ):
                    self._last_returned_call_from_interceptors = (
                        call_or_response_iterator
                    )
                else:
                    # The interceptor returned a bare iterator: pair it with
                    # the last known call so call methods keep working.
                    self._last_returned_call_from_interceptors = (
                        StreamStreamCallResponseIterator(
                            self._last_returned_call_from_interceptors,
                            call_or_response_iterator,
                        )
                    )
                return self._last_returned_call_from_interceptors

            self._last_returned_call_from_interceptors = StreamStreamCall(
                request_iterator,
                _timeout_to_deadline(client_call_details.timeout),
                client_call_details.metadata,
                client_call_details.credentials,
                client_call_details.wait_for_ready,
                self._channel,
                client_call_details.method,
                request_serializer,
                response_deserializer,
                self._loop,
            )
            return self._last_returned_call_from_interceptors

        client_call_details = ClientCallDetails(
            method, timeout, metadata, credentials, wait_for_ready
        )
        return await _run_interceptor(
            list(interceptors), client_call_details, request_iterator
        )

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()


class UnaryUnaryCallResponse(_base_call.UnaryUnaryCall):
    """Final UnaryUnaryCall class finished with a response."""

    _response: ResponseType

    def __init__(self, response: ResponseType) -> None:
        self._response = response

    def cancel(self) -> bool:
        return False

    def cancelled(self) -> bool:
        return False

    def done(self) -> bool:
        return True

    def add_done_callback(self, unused_callback) -> None:
        raise NotImplementedError()

    def time_remaining(self) -> Optional[float]:
        raise NotImplementedError()

    async def initial_metadata(self) -> Optional[Metadata]:
        return None

    async def trailing_metadata(self) -> Optional[Metadata]:
        return None

    async def code(self) -> grpc.StatusCode:
        return grpc.StatusCode.OK

    async def details(self) -> str:
        return ""

    async def debug_error_string(self) -> Optional[str]:
        return None

    def __await__(self):
        if False:  # pylint: disable=using-constant-test
            # This code path is never used, but a yield statement is needed
            # for telling the interpreter that __await__ is a generator.
            yield None
        return self._response

    async def wait_for_connection(self) -> None:
        pass


class _StreamCallResponseIterator:
    """Pairs a call with an alternative iterator for its responses.

    Status, metadata and cancellation are delegated to the wrapped call,
    while iteration is served by the given response iterator.
    """

    _call: Union[_base_call.UnaryStreamCall, _base_call.StreamStreamCall]
    _response_iterator: AsyncIterable[ResponseType]

    def __init__(
        self,
        call: Union[_base_call.UnaryStreamCall, _base_call.StreamStreamCall],
        response_iterator: AsyncIterable[ResponseType],
    ) -> None:
        self._response_iterator = response_iterator
        self._call = call

    def cancel(self) -> bool:
        return self._call.cancel()

    def cancelled(self) -> bool:
        return self._call.cancelled()

    def done(self) -> bool:
        return self._call.done()

    def add_done_callback(self, callback) -> None:
        self._call.add_done_callback(callback)

    def time_remaining(self) -> Optional[float]:
        return self._call.time_remaining()

    async def initial_metadata(self) -> Optional[Metadata]:
        return await self._call.initial_metadata()

    async def trailing_metadata(self) -> Optional[Metadata]:
        return await self._call.trailing_metadata()

    async def code(self) -> grpc.StatusCode:
        return await self._call.code()

    async def details(self) -> str:
        return await self._call.details()

    async def debug_error_string(self) -> Optional[str]:
        return await self._call.debug_error_string()

    def __aiter__(self):
        return self._response_iterator.__aiter__()

    async def wait_for_connection(self) -> None:
        return await self._call.wait_for_connection()


class UnaryStreamCallResponseIterator(
    _StreamCallResponseIterator, _base_call.UnaryStreamCall
):
    """UnaryStreamCall class which uses an alternative response iterator."""

    async def read(self) -> Union[EOFType, ResponseType]:
        # Behind the scenes everything goes through the
        # async iterator. So this path should not be reached.
        raise NotImplementedError()


class StreamStreamCallResponseIterator(
    _StreamCallResponseIterator, _base_call.StreamStreamCall
):
    """StreamStreamCall class which uses an alternative response iterator."""

    async def read(self) -> Union[EOFType, ResponseType]:
        # Behind the scenes everything goes through the
        # async iterator. So this path should not be reached.
        raise NotImplementedError()

    async def write(self, request: RequestType) -> None:
        # Behind the scenes everything goes through the
        # async iterator provided by the InterceptedStreamStreamCall.
        # So this path should not be reached.
        raise NotImplementedError()

    async def done_writing(self) -> None:
        # Behind the scenes everything goes through the
        # async iterator provided by the InterceptedStreamStreamCall.
        # So this path should not be reached.
        raise NotImplementedError()

    @property
    def _done_writing_flag(self) -> bool:
        return self._call._done_writing_flag