# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


cdef class Server:
  """Cython wrapper that owns a core `grpc_server` handle.

  Lifecycle state is tracked with three flags: `is_started`,
  `is_shutting_down` and `is_shutdown`. Python objects whose memory is handed
  to the core are kept alive in `self.references`.
  """

  def __cinit__(self, object arguments, bint xds):
    # Initialise the core library (and fork handlers) before any core call.
    fork_handlers_and_grpc_init()
    self.references = []
    self.registered_completion_queues = []
    self.is_started = False
    self.is_shutting_down = False
    self.is_shutdown = False
    # Start from NULL so that later NULL checks on the handle are meaningful
    # even if the creation below fails.
    self.c_server = NULL
    cdef _ChannelArgs wrapped_args = _ChannelArgs(arguments)
    self.c_server = grpc_server_create(wrapped_args.c_args(), NULL)
    # The xDS status notifier is left empty: no callback and no user data.
    cdef grpc_server_xds_status_notifier xds_notifier
    xds_notifier.on_serving_status_update = NULL
    xds_notifier.user_data = NULL
    if xds:
      grpc_server_set_config_fetcher(
          self.c_server,
          grpc_server_config_fetcher_xds_create(
              xds_notifier, wrapped_args.c_args()))
    self.references.append(arguments)

  def request_call(
      self, CompletionQueue call_queue not None,
      CompletionQueue server_queue not None, tag):
    """Ask the core to deliver the next incoming call.

    Args:
      call_queue: completion queue passed to the core for the new call.
      server_queue: completion queue passed to the core for the notification;
        must have been registered with this server.
      tag: user object wrapped in a `_RequestCallTag`.

    Returns:
      The value returned by `grpc_server_request_call`.

    Raises:
      ValueError: if the server is not started, is shutting down, or
        `server_queue` is not registered.
    """
    if not (self.is_started and not self.is_shutting_down):
      raise ValueError("server must be started and not shutting down")
    if server_queue not in self.registered_completion_queues:
      raise ValueError("server_queue must be a registered completion queue")
    cdef _RequestCallTag pending_tag = _RequestCallTag(tag)
    pending_tag.prepare()
    # The core only receives a raw pointer, so hold an extra reference to keep
    # the tag object alive while the core owns that pointer.
    # NOTE(review): the matching release is presumably performed when the tag
    # is retrieved from the completion queue; if the call below returns an
    # error the reference may never be released -- confirm.
    cpython.Py_INCREF(pending_tag)
    return grpc_server_request_call(
        self.c_server, &pending_tag.call.c_call,
        &pending_tag.call_details.c_details,
        &pending_tag.c_invocation_metadata,
        call_queue.c_completion_queue, server_queue.c_completion_queue,
        <cpython.PyObject *>pending_tag)

  def register_completion_queue(
      self, CompletionQueue queue not None):
    """Register `queue` with the core server; only allowed before start.

    Raises:
      ValueError: if the server has already been started.
    """
    if self.is_started:
      raise ValueError("cannot register completion queues after start")
    with nogil:
      grpc_server_register_completion_queue(
          self.c_server, queue.c_completion_queue, NULL)
    # Remember the queue so request_call/shutdown can validate against it.
    self.registered_completion_queues.append(queue)

  def start(self, backup_queue=True):
    """Start the Cython gRPC Server.

    Args:
      backup_queue: a bool indicating whether to spawn a backup completion
        queue. Without it, if no CQ is bound to the server, the shutdown of
        the server becomes un-observable.

    Raises:
      ValueError: if the server has already been started.
    """
    if self.is_started:
      raise ValueError("the server has already started")
    if backup_queue:
      # Must be registered before is_started flips, because registration is
      # rejected once the server is marked as started.
      self.backup_shutdown_queue = CompletionQueue(shutdown_cq=True)
      self.register_completion_queue(self.backup_shutdown_queue)
    self.is_started = True
    with nogil:
      grpc_server_start(self.c_server)
    if backup_queue:
      # Ensure the core has gotten a chance to do the start-up work
      self.backup_shutdown_queue.poll(deadline=time.time())

  def add_http2_port(self, bytes address,
                     ServerCredentials server_credentials=None):
    """Bind the server to `address`, with or without credentials.

    Args:
      address: the address to listen on.
      server_credentials: credentials for the port; when None, insecure
        server credentials are created for the call and released afterwards.

    Returns:
      The int returned by `grpc_server_add_http2_port`.
    """
    address = str_to_bytes(address)
    # Keep the bytes object alive: the char* below points into its buffer.
    self.references.append(address)
    cdef int result
    cdef char *address_c_string = address
    if server_credentials is None:
      with nogil:
        insecure_creds = grpc_insecure_server_credentials_create()
        result = grpc_server_add_http2_port(self.c_server,
                                            address_c_string, insecure_creds)
        grpc_server_credentials_release(insecure_creds)
    else:
      self.references.append(server_credentials)
      with nogil:
        result = grpc_server_add_http2_port(
            self.c_server, address_c_string, server_credentials.c_credentials)
    return result

  cdef _c_shutdown(self, CompletionQueue queue, tag):
    # Marks the server as shutting down and asks the core to post a shutdown
    # notification, wrapping `tag`, on `queue`.
    self.is_shutting_down = True
    cdef _ServerShutdownTag shutdown_tag = _ServerShutdownTag(tag, self)
    # Extra reference keeps the tag alive while the core holds its pointer.
    cpython.Py_INCREF(shutdown_tag)
    with nogil:
      grpc_server_shutdown_and_notify(
          self.c_server, queue.c_completion_queue,
          <cpython.PyObject *>shutdown_tag)

  def shutdown(self, CompletionQueue queue not None, tag):
    """Begin shutting the server down, notifying on `queue` with `tag`.

    Does nothing if a shutdown is already in progress.

    Raises:
      ValueError: if `queue` is shutting down, the server was never started,
        or `queue` is not registered with this server.
    """
    if queue.is_shutting_down:
      raise ValueError("queue must be live")
    if not self.is_started:
      raise ValueError("the server hasn't started yet")
    if self.is_shutting_down:
      return
    if queue not in self.registered_completion_queues:
      raise ValueError("expected registered completion queue")
    self._c_shutdown(queue, tag)

  cdef notify_shutdown_complete(self):
    # Called only after our server shutdown tag has emerged from a completion
    # queue.
    self.is_shutdown = True

  def cancel_all_calls(self):
    """Cancel every in-flight call; only valid once shutdown has begun.

    Does nothing if the server has already finished shutting down.

    Raises:
      UsageError: if the server is not shutting down.
    """
    if not self.is_shutting_down:
      raise UsageError("the server must be shutting down to cancel all calls")
    if self.is_shutdown:
      return
    with nogil:
      grpc_server_cancel_all_calls(self.c_server)

  # TODO(https://github.com/grpc/grpc/issues/17515) Determine what, if any,
  # portion of this is safe to call from __dealloc__, and potentially remove
  # backup_shutdown_queue.
  def destroy(self):
    """Shut the server down if necessary, then destroy the core handle.

    Does nothing if the handle has already been destroyed.

    Raises:
      InternalError: if the server is running, shutdown was never requested,
        and there is no backup completion queue to shut down with.
    """
    if self.c_server == NULL:
      return
    # A server that never started, or that has fully shut down, can be
    # destroyed immediately; otherwise shutdown must complete first.
    if self.is_started and not self.is_shutdown:
      if self.is_shutting_down:
        # We're in the process of shutting down, but have not shutdown; can't
        # do much but repeatedly release the GIL and wait
        while not self.is_shutdown:
          time.sleep(0)
      elif self.backup_shutdown_queue is None:
        raise InternalError('Server shutdown failed: no completion queue.')
      else:
        # The user didn't call shutdown - use our backup queue ...
        self._c_shutdown(self.backup_shutdown_queue, None)
        # ... and poll it until the shutdown tag has been processed.
        while not self.is_shutdown:
          self.backup_shutdown_queue.poll()
    with nogil:
      grpc_server_destroy(self.c_server)
    self.c_server = NULL

  def __dealloc__(self):
    # grpc_shutdown() is only called when the core handle is already gone
    # (see the TODO above destroy()).
    if self.c_server == NULL:
      grpc_shutdown()