# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cimport cpython

import logging
import time
import grpc

logging.basicConfig()
_LOGGER = logging.getLogger(__name__)


cdef class Server:
  # Wrapper around a core `grpc_server` handle (`self.c_server`).
  #
  # Lifecycle flags maintained by this class:
  #   is_started        - set by start().
  #   is_shutting_down  - set once a shutdown has been requested from core.
  #   is_shutdown       - set by notify_shutdown_complete().

  def __cinit__(self, object arguments):
    # Core is initialised here and shut down again in __dealloc__.
    fork_handlers_and_grpc_init()
    self.references = []
    self.registered_completion_queues = []
    # Pointer-argument vtable handed to the arguments processor below.
    self._vtable.copy = &_copy_pointer
    self._vtable.destroy = &_destroy_pointer
    self._vtable.cmp = &_compare_pointer
    cdef _ArgumentsProcessor arguments_processor = _ArgumentsProcessor(
        arguments)
    cdef grpc_channel_args *c_arguments = arguments_processor.c(&self._vtable)
    self.c_server = grpc_server_create(c_arguments, NULL)
    # NOTE(review): un_c() presumably releases the C-level argument
    # representation built by c() - confirm against _ArgumentsProcessor.
    arguments_processor.un_c()
    # Hold a reference to the caller's arguments alongside the server.
    self.references.append(arguments)
    self.is_started = False
    self.is_shutting_down = False
    self.is_shutdown = False

  def request_call(
      self, CompletionQueue call_queue not None,
      CompletionQueue server_queue not None, tag):
    """Ask core to notify `server_queue` of the next incoming call.

    Args:
      call_queue: Completion queue passed to core for the new call.
      server_queue: Completion queue passed to core for the notification;
        must have been registered with register_completion_queue().
      tag: User object wrapped in the `_RequestCallTag` handed to core.

    Returns:
      The value returned by `grpc_server_request_call`.

    Raises:
      ValueError: If the server has not been started, is shutting down, or
        `server_queue` is not a registered completion queue.
    """
    if not self.is_started or self.is_shutting_down:
      raise ValueError("server must be started and not shutting down")
    if server_queue not in self.registered_completion_queues:
      raise ValueError("server_queue must be a registered completion queue")
    cdef _RequestCallTag request_call_tag = _RequestCallTag(tag)
    request_call_tag.prepare()
    # Core only holds a raw pointer to the tag, so take an extra reference
    # on its behalf before handing the pointer over.
    # NOTE(review): that reference is presumably released by the
    # completion-queue machinery when the tag is delivered - confirm.
    cpython.Py_INCREF(request_call_tag)
    result = grpc_server_request_call(
        self.c_server, &request_call_tag.call.c_call,
        &request_call_tag.call_details.c_details,
        &request_call_tag.c_invocation_metadata,
        call_queue.c_completion_queue, server_queue.c_completion_queue,
        <cpython.PyObject *>request_call_tag)
    # On failure core queues no event for the tag, so nothing would ever
    # release the reference taken above; drop it here to avoid leaking the
    # tag (and the user tag it wraps).
    # NOTE(review): relies on GRPC_CALL_OK being declared with the other
    # core bindings used in this file - confirm.
    if result != GRPC_CALL_OK:
      cpython.Py_DECREF(request_call_tag)
    return result

  def register_completion_queue(
      self, CompletionQueue queue not None):
    """Register `queue` with the core server and remember it.

    Raises:
      ValueError: If the server has already been started.
    """
    if self.is_started:
      raise ValueError("cannot register completion queues after start")
    with nogil:
      grpc_server_register_completion_queue(
          self.c_server, queue.c_completion_queue, NULL)
    self.registered_completion_queues.append(queue)

  def start(self):
    """Start the core server.

    Creates and registers `backup_shutdown_queue` (used by __dealloc__ when
    the user never calls shutdown()) before starting.

    Raises:
      ValueError: If the server has already been started.
    """
    if self.is_started:
      raise ValueError("the server has already started")
    # Must be registered before is_started flips, because
    # register_completion_queue() refuses to run on a started server.
    self.backup_shutdown_queue = CompletionQueue(shutdown_cq=True)
    self.register_completion_queue(self.backup_shutdown_queue)
    self.is_started = True
    with nogil:
      grpc_server_start(self.c_server)
    # Ensure the core has gotten a chance to do the start-up work
    self.backup_shutdown_queue.poll(deadline=time.time())

  def add_http2_port(self, bytes address,
                     ServerCredentials server_credentials=None):
    """Add an HTTP/2 listening port to the core server.

    Args:
      address: Address to listen on.
      server_credentials: If given, the port is added with
        `grpc_server_add_secure_http2_port`; otherwise with
        `grpc_server_add_insecure_http2_port`.

    Returns:
      The integer returned by the core call.
      NOTE(review): presumably the bound port number, 0 on failure -
      confirm against the core API documentation.
    """
    address = str_to_bytes(address)
    # address_c_string points into this bytes object, so keep it referenced.
    self.references.append(address)
    cdef int result
    cdef char *address_c_string = address
    if server_credentials is not None:
      self.references.append(server_credentials)
      with nogil:
        result = grpc_server_add_secure_http2_port(
            self.c_server, address_c_string, server_credentials.c_credentials)
    else:
      with nogil:
        result = grpc_server_add_insecure_http2_port(self.c_server,
                                                     address_c_string)
    return result

  cdef _c_shutdown(self, CompletionQueue queue, tag):
    # Mark the server as shutting down and ask core to notify `queue` once
    # shutdown has completed. No state validation happens here; callers
    # (shutdown() and __dealloc__) are responsible for it.
    self.is_shutting_down = True
    cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self)
    # Core only holds a raw pointer to the tag; take a reference on its
    # behalf before handing the pointer over.
    cpython.Py_INCREF(server_shutdown_tag)
    with nogil:
      grpc_server_shutdown_and_notify(
          self.c_server, queue.c_completion_queue,
          <cpython.PyObject *>server_shutdown_tag)

  def shutdown(self, CompletionQueue queue not None, tag):
    """Begin shutting the server down, notifying `queue` with `tag`.

    Does nothing if a shutdown is already in progress.

    Raises:
      ValueError: If `queue` is shutting down, the server has not been
        started, or `queue` is not a registered completion queue.
    """
    if queue.is_shutting_down:
      raise ValueError("queue must be live")
    elif not self.is_started:
      raise ValueError("the server hasn't started yet")
    elif self.is_shutting_down:
      return
    elif queue not in self.registered_completion_queues:
      raise ValueError("expected registered completion queue")
    else:
      self._c_shutdown(queue, tag)

  cdef notify_shutdown_complete(self):
    # called only after our server shutdown tag has emerged from a completion
    # queue.
    self.is_shutdown = True

  def cancel_all_calls(self):
    """Cancel every call on the server while it is shutting down.

    Does nothing once shutdown has completed.

    Raises:
      RuntimeError: If shutdown has not been requested yet.
    """
    if not self.is_shutting_down:
      raise RuntimeError("the server must be shutting down to cancel all calls")
    elif self.is_shutdown:
      return
    else:
      with nogil:
        grpc_server_cancel_all_calls(self.c_server)

  def __dealloc__(self):
    if self.c_server != NULL:
      # A server that never started, or that has already finished shutting
      # down, can be destroyed immediately; otherwise wait for shutdown.
      if self.is_started and not self.is_shutdown:
        if not self.is_shutting_down:
          # the user didn't call shutdown - use our backup queue
          self._c_shutdown(self.backup_shutdown_queue, None)
          # and now we wait
          while not self.is_shutdown:
            self.backup_shutdown_queue.poll()
        else:
          # We're in the process of shutting down, but have not shutdown;
          # can't do much but repeatedly release the GIL and wait
          while not self.is_shutdown:
            time.sleep(0)
      grpc_server_destroy(self.c_server)
    # Pairs with fork_handlers_and_grpc_init() in __cinit__.
    grpc_shutdown()