# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Abstract base classes for Channel objects and Multicallable objects."""

import abc
from typing import Generic, Optional

import grpc

from . import _base_call
from ._typing import DeserializingFunction
from ._typing import MetadataType
from ._typing import RequestIterableType
from ._typing import RequestType
from ._typing import ResponseType
from ._typing import SerializingFunction


class UnaryUnaryMultiCallable(Generic[RequestType, ResponseType], abc.ABC):
    """Enables asynchronous invocation of a unary-call RPC."""

    @abc.abstractmethod
    def __call__(
        self,
        request: RequestType,
        *,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.UnaryUnaryCall[RequestType, ResponseType]:
        """Asynchronously invokes the underlying RPC.

        All arguments other than ``request`` are keyword-only.

        Args:
          request: The request value for the RPC.
          timeout: An optional duration of time in seconds to allow
            for the RPC.
          metadata: Optional :term:`metadata` to be transmitted to the
            service-side of the RPC.
          credentials: An optional CallCredentials for the RPC. Only valid for
            secure Channel.
          wait_for_ready: An optional flag to enable :term:`wait_for_ready` mechanism.
          compression: An element of grpc.Compression, e.g.
            grpc.Compression.Gzip.

        Returns:
          A UnaryUnaryCall object.

        Raises:
          RpcError: Indicates that the RPC terminated with non-OK status. The
            raised RpcError will also be a Call for the RPC affording the RPC's
            metadata, status code, and details.
        """


class UnaryStreamMultiCallable(Generic[RequestType, ResponseType], abc.ABC):
    """Enables asynchronous invocation of a server-streaming RPC."""

    @abc.abstractmethod
    def __call__(
        self,
        request: RequestType,
        *,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.UnaryStreamCall[RequestType, ResponseType]:
        """Asynchronously invokes the underlying RPC.

        All arguments other than ``request`` are keyword-only.

        Args:
          request: The request value for the RPC.
          timeout: An optional duration of time in seconds to allow
            for the RPC.
          metadata: Optional :term:`metadata` to be transmitted to the
            service-side of the RPC.
          credentials: An optional CallCredentials for the RPC. Only valid for
            secure Channel.
          wait_for_ready: An optional flag to enable :term:`wait_for_ready` mechanism.
          compression: An element of grpc.Compression, e.g.
            grpc.Compression.Gzip.

        Returns:
          A UnaryStreamCall object.

        Raises:
          RpcError: Indicates that the RPC terminated with non-OK status. The
            raised RpcError will also be a Call for the RPC affording the RPC's
            metadata, status code, and details.
        """


class StreamUnaryMultiCallable(abc.ABC):
    """Enables asynchronous invocation of a client-streaming RPC."""

    @abc.abstractmethod
    def __call__(
        self,
        request_iterator: Optional[RequestIterableType] = None,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.StreamUnaryCall:
        """Asynchronously invokes the underlying RPC.

        Note that, unlike the unary-request multicallables in this module,
        the optional arguments of this signature are not keyword-only.

        Args:
          request_iterator: An optional async iterable or iterable of request
            messages for the RPC.
          timeout: An optional duration of time in seconds to allow
            for the RPC.
          metadata: Optional :term:`metadata` to be transmitted to the
            service-side of the RPC.
          credentials: An optional CallCredentials for the RPC. Only valid for
            secure Channel.
          wait_for_ready: An optional flag to enable :term:`wait_for_ready` mechanism.
          compression: An element of grpc.Compression, e.g.
            grpc.Compression.Gzip.

        Returns:
          A StreamUnaryCall object.

        Raises:
          RpcError: Indicates that the RPC terminated with non-OK status. The
            raised RpcError will also be a Call for the RPC affording the RPC's
            metadata, status code, and details.
        """


class StreamStreamMultiCallable(abc.ABC):
    """Enables asynchronous invocation of a bidirectional-streaming RPC."""

    @abc.abstractmethod
    def __call__(
        self,
        request_iterator: Optional[RequestIterableType] = None,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.StreamStreamCall:
        """Asynchronously invokes the underlying RPC.

        Note that, unlike the unary-request multicallables in this module,
        the optional arguments of this signature are not keyword-only.

        Args:
          request_iterator: An optional async iterable or iterable of request
            messages for the RPC.
          timeout: An optional duration of time in seconds to allow
            for the RPC.
          metadata: Optional :term:`metadata` to be transmitted to the
            service-side of the RPC.
          credentials: An optional CallCredentials for the RPC. Only valid for
            secure Channel.
          wait_for_ready: An optional flag to enable :term:`wait_for_ready` mechanism.
          compression: An element of grpc.Compression, e.g.
            grpc.Compression.Gzip.

        Returns:
          A StreamStreamCall object.

        Raises:
          RpcError: Indicates that the RPC terminated with non-OK status. The
            raised RpcError will also be a Call for the RPC affording the RPC's
            metadata, status code, and details.
        """


class Channel(abc.ABC):
    """Enables asynchronous RPC invocation as a client.

    Channel objects implement the Asynchronous Context Manager (aka. async
    with) type, although they are not supported to be entered and exited
    multiple times.
    """

    @abc.abstractmethod
    async def __aenter__(self):
        """Starts an asynchronous context manager.

        Returns:
          The channel that was instantiated.
        """

    @abc.abstractmethod
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Finishes the asynchronous context manager by closing the channel.

        Still active RPCs will be cancelled.
        """

    @abc.abstractmethod
    async def close(self, grace: Optional[float] = None):
        """Closes this Channel and releases all resources held by it.

        This method immediately stops the channel from executing new RPCs in
        all cases.

        If a grace period is specified, this method waits until all active
        RPCs are finished or until the grace period is reached. RPCs that haven't
        been terminated within the grace period are aborted.
        If a grace period is not specified (by passing None for grace),
        all existing RPCs are cancelled immediately.

        This method is idempotent.

        Args:
          grace: An optional grace period to wait for active RPCs to finish
            before they are aborted. None (the default) cancels all existing
            RPCs immediately.
        """

    @abc.abstractmethod
    def get_state(
        self, try_to_connect: bool = False
    ) -> grpc.ChannelConnectivity:
        """Checks the connectivity state of a channel.

        This is an EXPERIMENTAL API.

        If the channel reaches a stable connectivity state, it is guaranteed
        that the return value of this function will eventually converge to that
        state.

        Args:
          try_to_connect: a bool indicating whether the Channel should try to
            connect to peer or not.

        Returns:
          A ChannelConnectivity object.
        """

    @abc.abstractmethod
    async def wait_for_state_change(
        self,
        last_observed_state: grpc.ChannelConnectivity,
    ) -> None:
        """Waits for a change in connectivity state.

        This is an EXPERIMENTAL API.

        The function blocks until there is a change in the channel connectivity
        state from the "last_observed_state". If the state is already
        different, this function will return immediately.

        There is an inherent race between the invocation of
        "Channel.wait_for_state_change" and "Channel.get_state". The state can
        change arbitrarily many times during the race, so there is no way to
        observe every state transition.

        If there is a need to put a timeout for this function, please refer to
        "asyncio.wait_for".

        Args:
          last_observed_state: A grpc.ChannelConnectivity object representing
            the last known state.
        """

    @abc.abstractmethod
    async def channel_ready(self) -> None:
        """Creates a coroutine that blocks until the Channel is READY."""

    @abc.abstractmethod
    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> UnaryUnaryMultiCallable:
        """Creates a UnaryUnaryMultiCallable for a unary-unary method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional :term:`serializer` for serializing the request
            message. Request goes unserialized in case None is passed.
          response_deserializer: Optional :term:`deserializer` for deserializing the
            response message. Response goes undeserialized in case None
            is passed.
          _registered_method: Implementation Private. Optional: A bool representing
            whether the method is registered.

        Returns:
          A UnaryUnaryMultiCallable value for the named unary-unary method.
        """

    @abc.abstractmethod
    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> UnaryStreamMultiCallable:
        """Creates a UnaryStreamMultiCallable for a unary-stream method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional :term:`serializer` for serializing the request
            message. Request goes unserialized in case None is passed.
          response_deserializer: Optional :term:`deserializer` for deserializing the
            response message. Response goes undeserialized in case None
            is passed.
          _registered_method: Implementation Private. Optional: A bool representing
            whether the method is registered.

        Returns:
          A UnaryStreamMultiCallable value for the named unary-stream method.
        """

    @abc.abstractmethod
    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> StreamUnaryMultiCallable:
        """Creates a StreamUnaryMultiCallable for a stream-unary method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional :term:`serializer` for serializing the request
            message. Request goes unserialized in case None is passed.
          response_deserializer: Optional :term:`deserializer` for deserializing the
            response message. Response goes undeserialized in case None
            is passed.
          _registered_method: Implementation Private. Optional: A bool representing
            whether the method is registered.

        Returns:
          A StreamUnaryMultiCallable value for the named stream-unary method.
        """

    @abc.abstractmethod
    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> StreamStreamMultiCallable:
        """Creates a StreamStreamMultiCallable for a stream-stream method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional :term:`serializer` for serializing the request
            message. Request goes unserialized in case None is passed.
          response_deserializer: Optional :term:`deserializer` for deserializing the
            response message. Response goes undeserialized in case None
            is passed.
          _registered_method: Implementation Private. Optional: A bool representing
            whether the method is registered.

        Returns:
          A StreamStreamMultiCallable value for the named stream-stream method.
        """