1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15 16cdef class Server: 17 18 def __cinit__(self, object arguments, bint xds): 19 fork_handlers_and_grpc_init() 20 self.references = [] 21 self.registered_completion_queues = [] 22 self.is_started = False 23 self.is_shutting_down = False 24 self.is_shutdown = False 25 self.c_server = NULL 26 cdef _ChannelArgs channel_args = _ChannelArgs(arguments) 27 self.c_server = grpc_server_create(channel_args.c_args(), NULL) 28 if xds: 29 grpc_server_set_config_fetcher(self.c_server, grpc_server_config_fetcher_xds_create()) 30 self.references.append(arguments) 31 32 def request_call( 33 self, CompletionQueue call_queue not None, 34 CompletionQueue server_queue not None, tag): 35 if not self.is_started or self.is_shutting_down: 36 raise ValueError("server must be started and not shutting down") 37 if server_queue not in self.registered_completion_queues: 38 raise ValueError("server_queue must be a registered completion queue") 39 cdef _RequestCallTag request_call_tag = _RequestCallTag(tag) 40 request_call_tag.prepare() 41 cpython.Py_INCREF(request_call_tag) 42 return grpc_server_request_call( 43 self.c_server, &request_call_tag.call.c_call, 44 &request_call_tag.call_details.c_details, 45 &request_call_tag.c_invocation_metadata, 46 call_queue.c_completion_queue, server_queue.c_completion_queue, 47 <cpython.PyObject *>request_call_tag) 48 49 def register_completion_queue( 50 self, CompletionQueue queue not None): 51 if self.is_started: 52 raise ValueError("cannot register completion queues after start") 53 with nogil: 54 grpc_server_register_completion_queue( 55 self.c_server, queue.c_completion_queue, NULL) 56 self.registered_completion_queues.append(queue) 57 58 def start(self, backup_queue=True): 59 """Start the Cython gRPC Server. 60 61 Args: 62 backup_queue: a bool indicates whether to spawn a backup completion 63 queue. In the case that no CQ is bound to the server, and the shutdown 64 of server becomes un-observable. 65 """ 66 if self.is_started: 67 raise ValueError("the server has already started") 68 if backup_queue: 69 self.backup_shutdown_queue = CompletionQueue(shutdown_cq=True) 70 self.register_completion_queue(self.backup_shutdown_queue) 71 self.is_started = True 72 with nogil: 73 grpc_server_start(self.c_server) 74 if backup_queue: 75 # Ensure the core has gotten a chance to do the start-up work 76 self.backup_shutdown_queue.poll(deadline=time.time()) 77 78 def add_http2_port(self, bytes address, 79 ServerCredentials server_credentials=None): 80 address = str_to_bytes(address) 81 self.references.append(address) 82 cdef int result 83 cdef char *address_c_string = address 84 if server_credentials is not None: 85 self.references.append(server_credentials) 86 with nogil: 87 result = grpc_server_add_secure_http2_port( 88 self.c_server, address_c_string, server_credentials.c_credentials) 89 else: 90 with nogil: 91 result = grpc_server_add_insecure_http2_port(self.c_server, 92 address_c_string) 93 return result 94 95 cdef _c_shutdown(self, CompletionQueue queue, tag): 96 self.is_shutting_down = True 97 cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self) 98 cpython.Py_INCREF(server_shutdown_tag) 99 with nogil: 100 grpc_server_shutdown_and_notify( 101 self.c_server, queue.c_completion_queue, 102 <cpython.PyObject *>server_shutdown_tag) 103 104 def shutdown(self, CompletionQueue queue not None, tag): 105 if queue.is_shutting_down: 106 raise ValueError("queue must be live") 107 elif not self.is_started: 108 raise ValueError("the server hasn't started yet") 109 elif self.is_shutting_down: 110 return 111 elif queue not in self.registered_completion_queues: 112 raise ValueError("expected registered completion queue") 113 else: 114 self._c_shutdown(queue, tag) 115 116 cdef notify_shutdown_complete(self): 117 # called only after our server shutdown tag has emerged from a completion 118 # queue. 119 self.is_shutdown = True 120 121 def cancel_all_calls(self): 122 if not self.is_shutting_down: 123 raise UsageError("the server must be shutting down to cancel all calls") 124 elif self.is_shutdown: 125 return 126 else: 127 with nogil: 128 grpc_server_cancel_all_calls(self.c_server) 129 130 # TODO(https://github.com/grpc/grpc/issues/17515) Determine what, if any, 131 # portion of this is safe to call from __dealloc__, and potentially remove 132 # backup_shutdown_queue. 133 def destroy(self): 134 if self.c_server != NULL: 135 if not self.is_started: 136 pass 137 elif self.is_shutdown: 138 pass 139 elif not self.is_shutting_down: 140 if self.backup_shutdown_queue is None: 141 raise InternalError('Server shutdown failed: no completion queue.') 142 else: 143 # the user didn't call shutdown - use our backup queue 144 self._c_shutdown(self.backup_shutdown_queue, None) 145 # and now we wait 146 while not self.is_shutdown: 147 self.backup_shutdown_queue.poll() 148 else: 149 # We're in the process of shutting down, but have not shutdown; can't do 150 # much but repeatedly release the GIL and wait 151 while not self.is_shutdown: 152 time.sleep(0) 153 with nogil: 154 grpc_server_destroy(self.c_server) 155 self.c_server = NULL 156 157 def __dealloc__(self): 158 if self.c_server == NULL: 159 grpc_shutdown() 160