• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2019 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Server-side implementation of gRPC Asyncio Python."""
15
16from concurrent.futures import Executor
17from typing import Any, Optional, Sequence
18
19import grpc
20from grpc import _common, _compression
21from grpc._cython import cygrpc
22
23from . import _base_server
24from ._typing import ChannelArgumentType
25from ._interceptor import ServerInterceptor
26
27
def _augment_channel_arguments(base_options: ChannelArgumentType,
                               compression: Optional[grpc.Compression]):
    """Appends the compression channel option to the channel arguments.

    Args:
      base_options: The channel arguments to start from; converted to a
        tuple before the compression option is appended.
      compression: The optional compression setting handed to
        _compression.create_channel_option.

    Returns:
      tuple(base_options) followed by the compression channel option.
    """
    extra_options = _compression.create_channel_option(compression)
    merged = tuple(base_options)
    merged += extra_options
    return merged
32
33
class Server(_base_server.Server):
    """AsyncIO server for RPCs, backed by a Cython AioServer."""

    def __init__(self, thread_pool: Optional[Executor],
                 generic_handlers: Optional[Sequence[grpc.GenericRpcHandler]],
                 interceptors: Optional[Sequence[Any]],
                 options: ChannelArgumentType,
                 maximum_concurrent_rpcs: Optional[int],
                 compression: Optional[grpc.Compression]):
        """Validates the interceptors and creates the underlying AioServer.

        Raises:
          ValueError: If any given interceptor is not a ServerInterceptor.
        """
        self._loop = cygrpc.get_working_loop()

        # Collect every offending interceptor so the error lists all of them.
        invalid_interceptors = []
        for candidate in interceptors or ():
            if not isinstance(candidate, ServerInterceptor):
                invalid_interceptors.append(candidate)
        if invalid_interceptors:
            raise ValueError(
                'Interceptor must be ServerInterceptor, the '
                f'following are invalid: {invalid_interceptors}')

        channel_arguments = _augment_channel_arguments(options, compression)
        self._server = cygrpc.AioServer(self._loop, thread_pool,
                                        generic_handlers, interceptors,
                                        channel_arguments,
                                        maximum_concurrent_rpcs)

    def add_generic_rpc_handlers(
            self,
            generic_rpc_handlers: Sequence[grpc.GenericRpcHandler]) -> None:
        """Adds GenericRpcHandlers to this Server.

        Only safe to call before the server has been started.

        Args:
          generic_rpc_handlers: The sequence of GenericRpcHandlers that will
            service RPCs.
        """
        self._server.add_generic_rpc_handlers(generic_rpc_handlers)

    def add_insecure_port(self, address: str) -> int:
        """Binds an insecure port on which RPCs will be accepted.

        Must be called before the server is started.

        Args:
          address: The address to open a port for. When the port is 0 or is
            missing from the address, the gRPC runtime picks one.

        Returns:
          The integer port on which the server will accept RPC requests.
        """
        bound_port = self._server.add_insecure_port(_common.encode(address))
        return _common.validate_port_binding_result(address, bound_port)

    def add_secure_port(self, address: str,
                        server_credentials: grpc.ServerCredentials) -> int:
        """Binds a secure port on which RPCs will be accepted.

        Must be called before the server is started.

        Args:
          address: The address to open a port for. When the port is 0 or is
            missing from the address, the gRPC runtime picks one.
          server_credentials: A ServerCredentials object.

        Returns:
          The integer port on which the server will accept RPC requests.
        """
        bound_port = self._server.add_secure_port(_common.encode(address),
                                                  server_credentials)
        return _common.validate_port_binding_result(address, bound_port)

    async def start(self) -> None:
        """Starts this Server.

        Not idempotent: this method may be called only once.
        """
        await self._server.start()

    async def stop(self, grace: Optional[float]) -> None:
        """Stops this Server.

        In every case the server stops servicing new RPCs immediately.

        With a grace period, this method returns immediately and any RPCs
        still active when the grace period ends are aborted. Without one
        (grace is None), all existing RPCs are aborted at once and this
        method blocks until the last RPC handler terminates.

        The method is idempotent and may be called at any time. The most
        restrictive grace value wins: a later call with a smaller grace
        stops the Server sooner (None stops it immediately), while a later
        call with a larger grace does not postpone the stop.

        Args:
          grace: A duration of time in seconds or None.
        """
        await self._server.shutdown(grace)

    async def wait_for_termination(self,
                                   timeout: Optional[float] = None) -> bool:
        """Blocks the current coroutine until the server stops.

        This is an EXPERIMENTAL API.

        No computational resources are consumed while blocked. The wait ends
        when either of these conditions is met:

        1) The server is stopped or terminated;
        2) A timeout occurs if timeout is not `None`.

        The timeout argument works in the same way as `threading.Event.wait()`.
        https://docs.python.org/3/library/threading.html#threading.Event.wait

        Args:
          timeout: A floating point number giving the timeout for the
            operation, in seconds.

        Returns:
          A bool indicating whether the operation timed out.
        """
        return await self._server.wait_for_termination(timeout)

    def __del__(self):
        """Schedules a graceful shutdown in current event loop.

        The Cython AioServer doesn't hold a ref-count to this class. It should
        be safe to slightly extend the underlying Cython object's life span.
        """
        # _server is missing when __init__ raised before assigning it.
        if hasattr(self, '_server') and self._server.is_running():
            cygrpc.schedule_coro_threadsafe(self._server.shutdown(None),
                                            self._loop)
173
174
def server(migration_thread_pool: Optional[Executor] = None,
           handlers: Optional[Sequence[grpc.GenericRpcHandler]] = None,
           interceptors: Optional[Sequence[Any]] = None,
           options: Optional[ChannelArgumentType] = None,
           maximum_concurrent_rpcs: Optional[int] = None,
           compression: Optional[grpc.Compression] = None):
    """Builds a Server that can service RPCs.

    Args:
      migration_thread_pool: A futures.ThreadPoolExecutor the Server uses to
        run non-AsyncIO RPC handlers, for migration purpose.
      handlers: An optional list of GenericRpcHandlers used for executing
        RPCs. Further handlers can be registered through
        add_generic_rpc_handlers any time before the server is started.
      interceptors: An optional list of ServerInterceptor objects that
        observe and optionally manipulate the incoming RPCs before handing
        them over to handlers. Control is given to the interceptors in the
        order they are specified. This is an EXPERIMENTAL API.
      options: An optional list of key-value pairs
        (:term:`channel_arguments` in gRPC runtime) to configure the channel.
      maximum_concurrent_rpcs: The maximum number of concurrent RPCs this
        server will service before returning RESOURCE_EXHAUSTED status, or
        None to indicate no limit.
      compression: An element of grpc.Compression, e.g.
        grpc.Compression.Gzip. This compression algorithm will be used for
        the lifetime of the server unless overridden by set_compression.
        This is an EXPERIMENTAL option.

    Returns:
      A Server object.
    """
    # Only None is replaced by an empty tuple; other values pass through.
    if handlers is None:
        handlers = ()
    if interceptors is None:
        interceptors = ()
    if options is None:
        options = ()
    return Server(migration_thread_pool, handlers, interceptors, options,
                  maximum_concurrent_rpcs, compression)
210