1# Copyright 2016 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Service-side implementation of gRPC Python.""" 15 16import collections 17import enum 18import logging 19import threading 20import time 21 22import six 23 24import grpc 25from grpc import _common 26from grpc import _interceptor 27from grpc._cython import cygrpc 28from grpc.framework.foundation import callable_util 29 30logging.basicConfig() 31_LOGGER = logging.getLogger(__name__) 32 33_SHUTDOWN_TAG = 'shutdown' 34_REQUEST_CALL_TAG = 'request_call' 35 36_RECEIVE_CLOSE_ON_SERVER_TOKEN = 'receive_close_on_server' 37_SEND_INITIAL_METADATA_TOKEN = 'send_initial_metadata' 38_RECEIVE_MESSAGE_TOKEN = 'receive_message' 39_SEND_MESSAGE_TOKEN = 'send_message' 40_SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN = ( 41 'send_initial_metadata * send_message') 42_SEND_STATUS_FROM_SERVER_TOKEN = 'send_status_from_server' 43_SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN = ( 44 'send_initial_metadata * send_status_from_server') 45 46_OPEN = 'open' 47_CLOSED = 'closed' 48_CANCELLED = 'cancelled' 49 50_EMPTY_FLAGS = 0 51 52_UNEXPECTED_EXIT_SERVER_GRACE = 1.0 53 54 55def _serialized_request(request_event): 56 return request_event.batch_operations[0].message() 57 58 59def _application_code(code): 60 cygrpc_code = _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE.get(code) 61 return cygrpc.StatusCode.unknown if cygrpc_code is None else cygrpc_code 62 63 64def _completion_code(state): 65 if state.code is None: 66 return cygrpc.StatusCode.ok 67 else: 68 return _application_code(state.code) 69 70 71def _abortion_code(state, code): 72 if state.code is None: 73 return code 74 else: 75 return _application_code(state.code) 76 77 78def _details(state): 79 return b'' if state.details is None else state.details 80 81 82class _HandlerCallDetails( 83 collections.namedtuple('_HandlerCallDetails', ( 84 'method', 85 'invocation_metadata', 86 )), grpc.HandlerCallDetails): 87 pass 88 89 90class _RPCState(object): 91 92 def __init__(self): 93 self.condition = threading.Condition() 94 self.due = set() 95 self.request = None 96 self.client = _OPEN 97 self.initial_metadata_allowed = True 98 self.disable_next_compression = False 99 self.trailing_metadata = None 100 self.code = None 101 self.details = None 102 self.statused = False 103 self.rpc_errors = [] 104 self.callbacks = [] 105 self.abortion = None 106 107 108def _raise_rpc_error(state): 109 rpc_error = grpc.RpcError() 110 state.rpc_errors.append(rpc_error) 111 raise rpc_error 112 113 114def _possibly_finish_call(state, token): 115 state.due.remove(token) 116 if (state.client is _CANCELLED or state.statused) and not state.due: 117 callbacks = state.callbacks 118 state.callbacks = None 119 return state, callbacks 120 else: 121 return None, () 122 123 124def _send_status_from_server(state, token): 125 126 def send_status_from_server(unused_send_status_from_server_event): 127 with state.condition: 128 return _possibly_finish_call(state, token) 129 130 return send_status_from_server 131 132 133def _abort(state, call, code, details): 134 if state.client is not _CANCELLED: 135 effective_code = _abortion_code(state, code) 136 effective_details = details if state.details is None else state.details 137 if state.initial_metadata_allowed: 138 operations = ( 139 cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS), 140 cygrpc.SendStatusFromServerOperation( 141 state.trailing_metadata, effective_code, effective_details, 142 _EMPTY_FLAGS), 143 ) 144 token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN 145 else: 146 operations = (cygrpc.SendStatusFromServerOperation( 147 state.trailing_metadata, effective_code, effective_details, 148 _EMPTY_FLAGS),) 149 token = _SEND_STATUS_FROM_SERVER_TOKEN 150 call.start_server_batch(operations, 151 _send_status_from_server(state, token)) 152 state.statused = True 153 state.due.add(token) 154 155 156def _receive_close_on_server(state): 157 158 def receive_close_on_server(receive_close_on_server_event): 159 with state.condition: 160 if receive_close_on_server_event.batch_operations[0].cancelled(): 161 state.client = _CANCELLED 162 elif state.client is _OPEN: 163 state.client = _CLOSED 164 state.condition.notify_all() 165 return _possibly_finish_call(state, _RECEIVE_CLOSE_ON_SERVER_TOKEN) 166 167 return receive_close_on_server 168 169 170def _receive_message(state, call, request_deserializer): 171 172 def receive_message(receive_message_event): 173 serialized_request = _serialized_request(receive_message_event) 174 if serialized_request is None: 175 with state.condition: 176 if state.client is _OPEN: 177 state.client = _CLOSED 178 state.condition.notify_all() 179 return _possibly_finish_call(state, _RECEIVE_MESSAGE_TOKEN) 180 else: 181 request = _common.deserialize(serialized_request, 182 request_deserializer) 183 with state.condition: 184 if request is None: 185 _abort(state, call, cygrpc.StatusCode.internal, 186 b'Exception deserializing request!') 187 else: 188 state.request = request 189 state.condition.notify_all() 190 return _possibly_finish_call(state, _RECEIVE_MESSAGE_TOKEN) 191 192 return receive_message 193 194 195def _send_initial_metadata(state): 196 197 def send_initial_metadata(unused_send_initial_metadata_event): 198 with state.condition: 199 return _possibly_finish_call(state, _SEND_INITIAL_METADATA_TOKEN) 200 201 return send_initial_metadata 202 203 204def _send_message(state, token): 205 206 def send_message(unused_send_message_event): 207 with state.condition: 208 state.condition.notify_all() 209 return _possibly_finish_call(state, token) 210 211 return send_message 212 213 214class _Context(grpc.ServicerContext): 215 216 def __init__(self, rpc_event, state, request_deserializer): 217 self._rpc_event = rpc_event 218 self._state = state 219 self._request_deserializer = request_deserializer 220 221 def is_active(self): 222 with self._state.condition: 223 return self._state.client is not _CANCELLED and not self._state.statused 224 225 def time_remaining(self): 226 return max(self._rpc_event.call_details.deadline - time.time(), 0) 227 228 def cancel(self): 229 self._rpc_event.call.cancel() 230 231 def add_callback(self, callback): 232 with self._state.condition: 233 if self._state.callbacks is None: 234 return False 235 else: 236 self._state.callbacks.append(callback) 237 return True 238 239 def disable_next_message_compression(self): 240 with self._state.condition: 241 self._state.disable_next_compression = True 242 243 def invocation_metadata(self): 244 return self._rpc_event.invocation_metadata 245 246 def peer(self): 247 return _common.decode(self._rpc_event.call.peer()) 248 249 def peer_identities(self): 250 return cygrpc.peer_identities(self._rpc_event.call) 251 252 def peer_identity_key(self): 253 id_key = cygrpc.peer_identity_key(self._rpc_event.call) 254 return id_key if id_key is None else _common.decode(id_key) 255 256 def auth_context(self): 257 return { 258 _common.decode(key): value 259 for key, value in six.iteritems( 260 cygrpc.auth_context(self._rpc_event.call)) 261 } 262 263 def send_initial_metadata(self, initial_metadata): 264 with self._state.condition: 265 if self._state.client is _CANCELLED: 266 _raise_rpc_error(self._state) 267 else: 268 if self._state.initial_metadata_allowed: 269 operation = cygrpc.SendInitialMetadataOperation( 270 initial_metadata, _EMPTY_FLAGS) 271 self._rpc_event.call.start_server_batch( 272 (operation,), _send_initial_metadata(self._state)) 273 self._state.initial_metadata_allowed = False 274 self._state.due.add(_SEND_INITIAL_METADATA_TOKEN) 275 else: 276 raise ValueError('Initial metadata no longer allowed!') 277 278 def set_trailing_metadata(self, trailing_metadata): 279 with self._state.condition: 280 self._state.trailing_metadata = trailing_metadata 281 282 def abort(self, code, details): 283 # treat OK like other invalid arguments: fail the RPC 284 if code == grpc.StatusCode.OK: 285 _LOGGER.error( 286 'abort() called with StatusCode.OK; returning UNKNOWN') 287 code = grpc.StatusCode.UNKNOWN 288 details = '' 289 with self._state.condition: 290 self._state.code = code 291 self._state.details = _common.encode(details) 292 self._state.abortion = Exception() 293 raise self._state.abortion 294 295 def set_code(self, code): 296 with self._state.condition: 297 self._state.code = code 298 299 def set_details(self, details): 300 with self._state.condition: 301 self._state.details = _common.encode(details) 302 303 304class _RequestIterator(object): 305 306 def __init__(self, state, call, request_deserializer): 307 self._state = state 308 self._call = call 309 self._request_deserializer = request_deserializer 310 311 def _raise_or_start_receive_message(self): 312 if self._state.client is _CANCELLED: 313 _raise_rpc_error(self._state) 314 elif self._state.client is _CLOSED or self._state.statused: 315 raise StopIteration() 316 else: 317 self._call.start_server_batch( 318 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), 319 _receive_message(self._state, self._call, 320 self._request_deserializer)) 321 self._state.due.add(_RECEIVE_MESSAGE_TOKEN) 322 323 def _look_for_request(self): 324 if self._state.client is _CANCELLED: 325 _raise_rpc_error(self._state) 326 elif (self._state.request is None and 327 _RECEIVE_MESSAGE_TOKEN not in self._state.due): 328 raise StopIteration() 329 else: 330 request = self._state.request 331 self._state.request = None 332 return request 333 334 raise AssertionError() # should never run 335 336 def _next(self): 337 with self._state.condition: 338 self._raise_or_start_receive_message() 339 while True: 340 self._state.condition.wait() 341 request = self._look_for_request() 342 if request is not None: 343 return request 344 345 def __iter__(self): 346 return self 347 348 def __next__(self): 349 return self._next() 350 351 def next(self): 352 return self._next() 353 354 355def _unary_request(rpc_event, state, request_deserializer): 356 357 def unary_request(): 358 with state.condition: 359 if state.client is _CANCELLED or state.statused: 360 return None 361 else: 362 rpc_event.call.start_server_batch( 363 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), 364 _receive_message(state, rpc_event.call, 365 request_deserializer)) 366 state.due.add(_RECEIVE_MESSAGE_TOKEN) 367 while True: 368 state.condition.wait() 369 if state.request is None: 370 if state.client is _CLOSED: 371 details = '"{}" requires exactly one request message.'.format( 372 rpc_event.call_details.method) 373 _abort(state, rpc_event.call, 374 cygrpc.StatusCode.unimplemented, 375 _common.encode(details)) 376 return None 377 elif state.client is _CANCELLED: 378 return None 379 else: 380 request = state.request 381 state.request = None 382 return request 383 384 return unary_request 385 386 387def _call_behavior(rpc_event, state, behavior, argument, request_deserializer): 388 context = _Context(rpc_event, state, request_deserializer) 389 try: 390 return behavior(argument, context), True 391 except Exception as exception: # pylint: disable=broad-except 392 with state.condition: 393 if exception is state.abortion: 394 _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, 395 b'RPC Aborted') 396 elif exception not in state.rpc_errors: 397 details = 'Exception calling application: {}'.format(exception) 398 _LOGGER.exception(details) 399 _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, 400 _common.encode(details)) 401 return None, False 402 403 404def _take_response_from_response_iterator(rpc_event, state, response_iterator): 405 try: 406 return next(response_iterator), True 407 except StopIteration: 408 return None, True 409 except Exception as exception: # pylint: disable=broad-except 410 with state.condition: 411 if exception is state.abortion: 412 _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, 413 b'RPC Aborted') 414 elif exception not in state.rpc_errors: 415 details = 'Exception iterating responses: {}'.format(exception) 416 _LOGGER.exception(details) 417 _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, 418 _common.encode(details)) 419 return None, False 420 421 422def _serialize_response(rpc_event, state, response, response_serializer): 423 serialized_response = _common.serialize(response, response_serializer) 424 if serialized_response is None: 425 with state.condition: 426 _abort(state, rpc_event.call, cygrpc.StatusCode.internal, 427 b'Failed to serialize response!') 428 return None 429 else: 430 return serialized_response 431 432 433def _send_response(rpc_event, state, serialized_response): 434 with state.condition: 435 if state.client is _CANCELLED or state.statused: 436 return False 437 else: 438 if state.initial_metadata_allowed: 439 operations = ( 440 cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS), 441 cygrpc.SendMessageOperation(serialized_response, 442 _EMPTY_FLAGS), 443 ) 444 state.initial_metadata_allowed = False 445 token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN 446 else: 447 operations = (cygrpc.SendMessageOperation( 448 serialized_response, _EMPTY_FLAGS),) 449 token = _SEND_MESSAGE_TOKEN 450 rpc_event.call.start_server_batch(operations, 451 _send_message(state, token)) 452 state.due.add(token) 453 while True: 454 state.condition.wait() 455 if token not in state.due: 456 return state.client is not _CANCELLED and not state.statused 457 458 459def _status(rpc_event, state, serialized_response): 460 with state.condition: 461 if state.client is not _CANCELLED: 462 code = _completion_code(state) 463 details = _details(state) 464 operations = [ 465 cygrpc.SendStatusFromServerOperation( 466 state.trailing_metadata, code, details, _EMPTY_FLAGS), 467 ] 468 if state.initial_metadata_allowed: 469 operations.append( 470 cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS)) 471 if serialized_response is not None: 472 operations.append( 473 cygrpc.SendMessageOperation(serialized_response, 474 _EMPTY_FLAGS)) 475 rpc_event.call.start_server_batch( 476 operations, 477 _send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN)) 478 state.statused = True 479 state.due.add(_SEND_STATUS_FROM_SERVER_TOKEN) 480 481 482def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk, 483 request_deserializer, response_serializer): 484 argument = argument_thunk() 485 if argument is not None: 486 response, proceed = _call_behavior(rpc_event, state, behavior, argument, 487 request_deserializer) 488 if proceed: 489 serialized_response = _serialize_response( 490 rpc_event, state, response, response_serializer) 491 if serialized_response is not None: 492 _status(rpc_event, state, serialized_response) 493 494 495def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk, 496 request_deserializer, response_serializer): 497 argument = argument_thunk() 498 if argument is not None: 499 response_iterator, proceed = _call_behavior( 500 rpc_event, state, behavior, argument, request_deserializer) 501 if proceed: 502 while True: 503 response, proceed = _take_response_from_response_iterator( 504 rpc_event, state, response_iterator) 505 if proceed: 506 if response is None: 507 _status(rpc_event, state, None) 508 break 509 else: 510 serialized_response = _serialize_response( 511 rpc_event, state, response, response_serializer) 512 if serialized_response is not None: 513 proceed = _send_response(rpc_event, state, 514 serialized_response) 515 if not proceed: 516 break 517 else: 518 break 519 else: 520 break 521 522 523def _handle_unary_unary(rpc_event, state, method_handler, thread_pool): 524 unary_request = _unary_request(rpc_event, state, 525 method_handler.request_deserializer) 526 return thread_pool.submit(_unary_response_in_pool, rpc_event, state, 527 method_handler.unary_unary, unary_request, 528 method_handler.request_deserializer, 529 method_handler.response_serializer) 530 531 532def _handle_unary_stream(rpc_event, state, method_handler, thread_pool): 533 unary_request = _unary_request(rpc_event, state, 534 method_handler.request_deserializer) 535 return thread_pool.submit(_stream_response_in_pool, rpc_event, state, 536 method_handler.unary_stream, unary_request, 537 method_handler.request_deserializer, 538 method_handler.response_serializer) 539 540 541def _handle_stream_unary(rpc_event, state, method_handler, thread_pool): 542 request_iterator = _RequestIterator(state, rpc_event.call, 543 method_handler.request_deserializer) 544 return thread_pool.submit( 545 _unary_response_in_pool, rpc_event, state, method_handler.stream_unary, 546 lambda: request_iterator, method_handler.request_deserializer, 547 method_handler.response_serializer) 548 549 550def _handle_stream_stream(rpc_event, state, method_handler, thread_pool): 551 request_iterator = _RequestIterator(state, rpc_event.call, 552 method_handler.request_deserializer) 553 return thread_pool.submit( 554 _stream_response_in_pool, rpc_event, state, 555 method_handler.stream_stream, lambda: request_iterator, 556 method_handler.request_deserializer, method_handler.response_serializer) 557 558 559def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline): 560 561 def query_handlers(handler_call_details): 562 for generic_handler in generic_handlers: 563 method_handler = generic_handler.service(handler_call_details) 564 if method_handler is not None: 565 return method_handler 566 return None 567 568 handler_call_details = _HandlerCallDetails( 569 _common.decode(rpc_event.call_details.method), 570 rpc_event.invocation_metadata) 571 572 if interceptor_pipeline is not None: 573 return interceptor_pipeline.execute(query_handlers, 574 handler_call_details) 575 else: 576 return query_handlers(handler_call_details) 577 578 579def _reject_rpc(rpc_event, status, details): 580 operations = ( 581 cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS), 582 cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), 583 cygrpc.SendStatusFromServerOperation(None, status, details, 584 _EMPTY_FLAGS), 585 ) 586 rpc_state = _RPCState() 587 rpc_event.call.start_server_batch(operations, 588 lambda ignored_event: (rpc_state, (),)) 589 return rpc_state 590 591 592def _handle_with_method_handler(rpc_event, method_handler, thread_pool): 593 state = _RPCState() 594 with state.condition: 595 rpc_event.call.start_server_batch( 596 (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),), 597 _receive_close_on_server(state)) 598 state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN) 599 if method_handler.request_streaming: 600 if method_handler.response_streaming: 601 return state, _handle_stream_stream(rpc_event, state, 602 method_handler, thread_pool) 603 else: 604 return state, _handle_stream_unary(rpc_event, state, 605 method_handler, thread_pool) 606 else: 607 if method_handler.response_streaming: 608 return state, _handle_unary_stream(rpc_event, state, 609 method_handler, thread_pool) 610 else: 611 return state, _handle_unary_unary(rpc_event, state, 612 method_handler, thread_pool) 613 614 615def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool, 616 concurrency_exceeded): 617 if not rpc_event.success: 618 return None, None 619 if rpc_event.call_details.method is not None: 620 try: 621 method_handler = _find_method_handler(rpc_event, generic_handlers, 622 interceptor_pipeline) 623 except Exception as exception: # pylint: disable=broad-except 624 details = 'Exception servicing handler: {}'.format(exception) 625 _LOGGER.exception(details) 626 return _reject_rpc(rpc_event, cygrpc.StatusCode.unknown, 627 b'Error in service handler!'), None 628 if method_handler is None: 629 return _reject_rpc(rpc_event, cygrpc.StatusCode.unimplemented, 630 b'Method not found!'), None 631 elif concurrency_exceeded: 632 return _reject_rpc(rpc_event, cygrpc.StatusCode.resource_exhausted, 633 b'Concurrent RPC limit exceeded!'), None 634 else: 635 return _handle_with_method_handler(rpc_event, method_handler, 636 thread_pool) 637 else: 638 return None, None 639 640 641@enum.unique 642class _ServerStage(enum.Enum): 643 STOPPED = 'stopped' 644 STARTED = 'started' 645 GRACE = 'grace' 646 647 648class _ServerState(object): 649 650 # pylint: disable=too-many-arguments 651 def __init__(self, completion_queue, server, generic_handlers, 652 interceptor_pipeline, thread_pool, maximum_concurrent_rpcs): 653 self.lock = threading.RLock() 654 self.completion_queue = completion_queue 655 self.server = server 656 self.generic_handlers = list(generic_handlers) 657 self.interceptor_pipeline = interceptor_pipeline 658 self.thread_pool = thread_pool 659 self.stage = _ServerStage.STOPPED 660 self.shutdown_events = None 661 self.maximum_concurrent_rpcs = maximum_concurrent_rpcs 662 self.active_rpc_count = 0 663 664 # TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields. 665 self.rpc_states = set() 666 self.due = set() 667 668 669def _add_generic_handlers(state, generic_handlers): 670 with state.lock: 671 state.generic_handlers.extend(generic_handlers) 672 673 674def _add_insecure_port(state, address): 675 with state.lock: 676 return state.server.add_http2_port(address) 677 678 679def _add_secure_port(state, address, server_credentials): 680 with state.lock: 681 return state.server.add_http2_port(address, 682 server_credentials._credentials) 683 684 685def _request_call(state): 686 state.server.request_call(state.completion_queue, state.completion_queue, 687 _REQUEST_CALL_TAG) 688 state.due.add(_REQUEST_CALL_TAG) 689 690 691# TODO(https://github.com/grpc/grpc/issues/6597): delete this function. 692def _stop_serving(state): 693 if not state.rpc_states and not state.due: 694 for shutdown_event in state.shutdown_events: 695 shutdown_event.set() 696 state.stage = _ServerStage.STOPPED 697 return True 698 else: 699 return False 700 701 702def _on_call_completed(state): 703 with state.lock: 704 state.active_rpc_count -= 1 705 706 707def _serve(state): 708 while True: 709 event = state.completion_queue.poll() 710 if event.tag is _SHUTDOWN_TAG: 711 with state.lock: 712 state.due.remove(_SHUTDOWN_TAG) 713 if _stop_serving(state): 714 return 715 elif event.tag is _REQUEST_CALL_TAG: 716 with state.lock: 717 state.due.remove(_REQUEST_CALL_TAG) 718 concurrency_exceeded = ( 719 state.maximum_concurrent_rpcs is not None and 720 state.active_rpc_count >= state.maximum_concurrent_rpcs) 721 rpc_state, rpc_future = _handle_call( 722 event, state.generic_handlers, state.interceptor_pipeline, 723 state.thread_pool, concurrency_exceeded) 724 if rpc_state is not None: 725 state.rpc_states.add(rpc_state) 726 if rpc_future is not None: 727 state.active_rpc_count += 1 728 rpc_future.add_done_callback( 729 lambda unused_future: _on_call_completed(state)) 730 if state.stage is _ServerStage.STARTED: 731 _request_call(state) 732 elif _stop_serving(state): 733 return 734 else: 735 rpc_state, callbacks = event.tag(event) 736 for callback in callbacks: 737 callable_util.call_logging_exceptions( 738 callback, 'Exception calling callback!') 739 if rpc_state is not None: 740 with state.lock: 741 state.rpc_states.remove(rpc_state) 742 if _stop_serving(state): 743 return 744 # We want to force the deletion of the previous event 745 # ~before~ we poll again; if the event has a reference 746 # to a shutdown Call object, this can induce spinlock. 747 event = None 748 749 750def _stop(state, grace): 751 with state.lock: 752 if state.stage is _ServerStage.STOPPED: 753 shutdown_event = threading.Event() 754 shutdown_event.set() 755 return shutdown_event 756 else: 757 if state.stage is _ServerStage.STARTED: 758 state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG) 759 state.stage = _ServerStage.GRACE 760 state.shutdown_events = [] 761 state.due.add(_SHUTDOWN_TAG) 762 shutdown_event = threading.Event() 763 state.shutdown_events.append(shutdown_event) 764 if grace is None: 765 state.server.cancel_all_calls() 766 else: 767 768 def cancel_all_calls_after_grace(): 769 shutdown_event.wait(timeout=grace) 770 with state.lock: 771 state.server.cancel_all_calls() 772 773 thread = threading.Thread(target=cancel_all_calls_after_grace) 774 thread.start() 775 return shutdown_event 776 shutdown_event.wait() 777 return shutdown_event 778 779 780def _start(state): 781 with state.lock: 782 if state.stage is not _ServerStage.STOPPED: 783 raise ValueError('Cannot start already-started server!') 784 state.server.start() 785 state.stage = _ServerStage.STARTED 786 _request_call(state) 787 788 thread = threading.Thread(target=_serve, args=(state,)) 789 thread.daemon = True 790 thread.start() 791 792 793def _validate_generic_rpc_handlers(generic_rpc_handlers): 794 for generic_rpc_handler in generic_rpc_handlers: 795 service_attribute = getattr(generic_rpc_handler, 'service', None) 796 if service_attribute is None: 797 raise AttributeError( 798 '"{}" must conform to grpc.GenericRpcHandler type but does ' 799 'not have "service" method!'.format(generic_rpc_handler)) 800 801 802class _Server(grpc.Server): 803 804 # pylint: disable=too-many-arguments 805 def __init__(self, thread_pool, generic_handlers, interceptors, options, 806 maximum_concurrent_rpcs): 807 completion_queue = cygrpc.CompletionQueue() 808 server = cygrpc.Server(options) 809 server.register_completion_queue(completion_queue) 810 self._state = _ServerState(completion_queue, server, generic_handlers, 811 _interceptor.service_pipeline(interceptors), 812 thread_pool, maximum_concurrent_rpcs) 813 814 def add_generic_rpc_handlers(self, generic_rpc_handlers): 815 _validate_generic_rpc_handlers(generic_rpc_handlers) 816 _add_generic_handlers(self._state, generic_rpc_handlers) 817 818 def add_insecure_port(self, address): 819 return _add_insecure_port(self._state, _common.encode(address)) 820 821 def add_secure_port(self, address, server_credentials): 822 return _add_secure_port(self._state, _common.encode(address), 823 server_credentials) 824 825 def start(self): 826 _start(self._state) 827 828 def stop(self, grace): 829 return _stop(self._state, grace) 830 831 def __del__(self): 832 _stop(self._state, None) 833 834 835def create_server(thread_pool, generic_rpc_handlers, interceptors, options, 836 maximum_concurrent_rpcs): 837 _validate_generic_rpc_handlers(generic_rpc_handlers) 838 return _Server(thread_pool, generic_rpc_handlers, interceptors, options, 839 maximum_concurrent_rpcs) 840