• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Abstract base classes for Channel objects and Multicallable objects."""
15
16import abc
17from typing import Generic, Optional
18
19import grpc
20
21from . import _base_call
22from ._typing import DeserializingFunction
23from ._typing import MetadataType
24from ._typing import RequestIterableType
25from ._typing import RequestType
26from ._typing import ResponseType
27from ._typing import SerializingFunction
28
29
class UnaryUnaryMultiCallable(Generic[RequestType, ResponseType], abc.ABC):
    """Abstract callable that asynchronously invokes a unary-unary RPC.

    One request message is sent and the resulting call object represents
    the single-response RPC.
    """

    @abc.abstractmethod
    def __call__(
        self,
        request: RequestType,
        *,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.UnaryUnaryCall[RequestType, ResponseType]:
        """Invokes the underlying RPC asynchronously.

        Every argument other than ``request`` is keyword-only.

        Args:
          request: The request value to send for this RPC.
          timeout: Optional number of seconds the RPC is allowed to take.
          metadata: Optional :term:`metadata` to transmit to the
            service-side of the RPC.
          credentials: Optional CallCredentials to use for the RPC. These
            are only valid on a secure Channel.
          wait_for_ready: Optional flag that turns on the
            :term:`wait_for_ready` mechanism.
          compression: Optional grpc.Compression value, e.g.
            grpc.Compression.Gzip.

        Returns:
          A UnaryUnaryCall object.

        Raises:
          RpcError: The RPC terminated with a non-OK status. The raised
            RpcError is also a Call for the RPC, giving access to the
            RPC's metadata, status code, and details.
        """
66
67
class UnaryStreamMultiCallable(Generic[RequestType, ResponseType], abc.ABC):
    """Abstract callable that asynchronously invokes a server-streaming RPC.

    One request message is sent and the resulting call object represents
    the stream of responses.
    """

    @abc.abstractmethod
    def __call__(
        self,
        request: RequestType,
        *,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.UnaryStreamCall[RequestType, ResponseType]:
        """Invokes the underlying RPC asynchronously.

        Every argument other than ``request`` is keyword-only.

        Args:
          request: The request value to send for this RPC.
          timeout: Optional number of seconds the RPC is allowed to take.
          metadata: Optional :term:`metadata` to transmit to the
            service-side of the RPC.
          credentials: Optional CallCredentials to use for the RPC. These
            are only valid on a secure Channel.
          wait_for_ready: Optional flag that turns on the
            :term:`wait_for_ready` mechanism.
          compression: Optional grpc.Compression value, e.g.
            grpc.Compression.Gzip.

        Returns:
          A UnaryStreamCall object.

        Raises:
          RpcError: The RPC terminated with a non-OK status. The raised
            RpcError is also a Call for the RPC, giving access to the
            RPC's metadata, status code, and details.
        """
104
105
class StreamUnaryMultiCallable(abc.ABC):
    """Abstract callable that asynchronously invokes a client-streaming RPC.

    A stream of request messages is sent and the resulting call object
    represents the single-response RPC.
    """

    @abc.abstractmethod
    def __call__(
        self,
        request_iterator: Optional[RequestIterableType] = None,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.StreamUnaryCall:
        """Invokes the underlying RPC asynchronously.

        All arguments are optional and may be passed positionally or by
        keyword.

        Args:
          request_iterator: Optional async iterable or iterable of the
            request messages for the RPC.
          timeout: Optional number of seconds the RPC is allowed to take.
          metadata: Optional :term:`metadata` to transmit to the
            service-side of the RPC.
          credentials: Optional CallCredentials to use for the RPC. These
            are only valid on a secure Channel.
          wait_for_ready: Optional flag that turns on the
            :term:`wait_for_ready` mechanism.
          compression: Optional grpc.Compression value, e.g.
            grpc.Compression.Gzip.

        Returns:
          A StreamUnaryCall object.

        Raises:
          RpcError: The RPC terminated with a non-OK status. The raised
            RpcError is also a Call for the RPC, giving access to the
            RPC's metadata, status code, and details.
        """
142
143
class StreamStreamMultiCallable(abc.ABC):
    """Abstract callable that asynchronously invokes a bidirectional-streaming RPC.

    A stream of request messages is sent and the resulting call object
    represents the stream of responses.
    """

    @abc.abstractmethod
    def __call__(
        self,
        request_iterator: Optional[RequestIterableType] = None,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _base_call.StreamStreamCall:
        """Invokes the underlying RPC asynchronously.

        All arguments are optional and may be passed positionally or by
        keyword.

        Args:
          request_iterator: Optional async iterable or iterable of the
            request messages for the RPC.
          timeout: Optional number of seconds the RPC is allowed to take.
          metadata: Optional :term:`metadata` to transmit to the
            service-side of the RPC.
          credentials: Optional CallCredentials to use for the RPC. These
            are only valid on a secure Channel.
          wait_for_ready: Optional flag that turns on the
            :term:`wait_for_ready` mechanism.
          compression: Optional grpc.Compression value, e.g.
            grpc.Compression.Gzip.

        Returns:
          A StreamStreamCall object.

        Raises:
          RpcError: The RPC terminated with a non-OK status. The raised
            RpcError is also a Call for the RPC, giving access to the
            RPC's metadata, status code, and details.
        """
180
181
class Channel(abc.ABC):
    """Enables asynchronous RPC invocation as a client.

    Channel objects implement the asynchronous context manager protocol
    (``async with``), although entering and exiting the same Channel more
    than once is not supported.
    """

    @abc.abstractmethod
    async def __aenter__(self) -> "Channel":
        """Starts an asynchronous context manager.

        Returns:
          The Channel that was instantiated.
        """

    @abc.abstractmethod
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
        """Finishes the asynchronous context manager by closing the channel.

        RPCs that are still active will be cancelled.

        Args:
          exc_type: The type of the exception raised inside the ``async with``
            body, or None if no exception was raised.
          exc_val: The exception instance, or None.
          exc_tb: The traceback of the exception, or None.
        """

    @abc.abstractmethod
    async def close(self, grace: Optional[float] = None) -> None:
        """Closes this Channel and releases all resources held by it.

        In all cases, this method immediately stops the channel from
        executing new RPCs.

        If a grace period is specified, this method waits until all active
        RPCs are finished or until the grace period is reached. RPCs that
        have not terminated within the grace period are aborted.
        If a grace period is not specified (by passing None for grace),
        all existing RPCs are cancelled immediately.

        This method is idempotent.

        Args:
          grace: Optional grace period to wait for active RPCs to finish;
            None cancels all existing RPCs immediately.
        """

    @abc.abstractmethod
    def get_state(
        self, try_to_connect: bool = False
    ) -> grpc.ChannelConnectivity:
        """Checks the connectivity state of a channel.

        This is an EXPERIMENTAL API.

        If the channel reaches a stable connectivity state, it is guaranteed
        that the return value of this function will eventually converge to that
        state.

        Args:
          try_to_connect: A bool indicating whether the Channel should try
            to connect to the peer or not.

        Returns:
          A ChannelConnectivity object.
        """

    @abc.abstractmethod
    async def wait_for_state_change(
        self,
        last_observed_state: grpc.ChannelConnectivity,
    ) -> None:
        """Waits for a change in connectivity state.

        This is an EXPERIMENTAL API.

        The function blocks until there is a change in the channel connectivity
        state from the "last_observed_state". If the state is already
        different, this function will return immediately.

        There is an inherent race between the invocation of
        "Channel.wait_for_state_change" and "Channel.get_state". The state can
        change arbitrarily many times during the race, so there is no way to
        observe every state transition.

        If there is a need to put a timeout on this function, please refer to
        "asyncio.wait_for".

        Args:
          last_observed_state: A grpc.ChannelConnectivity object representing
            the last known state.
        """

    @abc.abstractmethod
    async def channel_ready(self) -> None:
        """Creates a coroutine that blocks until the Channel is READY."""

    @abc.abstractmethod
    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> UnaryUnaryMultiCallable:
        """Creates a UnaryUnaryMultiCallable for a unary-unary method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional :term:`serializer` for serializing the
            request message. The request goes unserialized if None is passed.
          response_deserializer: Optional :term:`deserializer` for
            deserializing the response message. The response goes
            undeserialized if None is passed.
          _registered_method: Implementation Private. Optional: A bool
            representing whether the method is registered.

        Returns:
          A UnaryUnaryMultiCallable value for the named unary-unary method.
        """

    @abc.abstractmethod
    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> UnaryStreamMultiCallable:
        """Creates a UnaryStreamMultiCallable for a unary-stream method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional :term:`serializer` for serializing the
            request message. The request goes unserialized if None is passed.
          response_deserializer: Optional :term:`deserializer` for
            deserializing the response message. The response goes
            undeserialized if None is passed.
          _registered_method: Implementation Private. Optional: A bool
            representing whether the method is registered.

        Returns:
          A UnaryStreamMultiCallable value for the named unary-stream method.
        """

    @abc.abstractmethod
    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> StreamUnaryMultiCallable:
        """Creates a StreamUnaryMultiCallable for a stream-unary method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional :term:`serializer` for serializing the
            request message. The request goes unserialized if None is passed.
          response_deserializer: Optional :term:`deserializer` for
            deserializing the response message. The response goes
            undeserialized if None is passed.
          _registered_method: Implementation Private. Optional: A bool
            representing whether the method is registered.

        Returns:
          A StreamUnaryMultiCallable value for the named stream-unary method.
        """

    @abc.abstractmethod
    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> StreamStreamMultiCallable:
        """Creates a StreamStreamMultiCallable for a stream-stream method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional :term:`serializer` for serializing the
            request message. The request goes unserialized if None is passed.
          response_deserializer: Optional :term:`deserializer` for
            deserializing the response message. The response goes
            undeserialized if None is passed.
          _registered_method: Implementation Private. Optional: A bool
            representing whether the method is registered.

        Returns:
          A StreamStreamMultiCallable value for the named stream-stream method.
        """
365