1# Copyright 2019 The gRPC Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15 16import inspect 17import traceback 18import functools 19 20 21cdef int _EMPTY_FLAG = 0 22cdef str _RPC_FINISHED_DETAILS = 'RPC already finished.' 23cdef str _SERVER_STOPPED_DETAILS = 'Server already stopped.' 24 25cdef _augment_metadata(tuple metadata, object compression): 26 if compression is None: 27 return metadata 28 else: 29 return (( 30 GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, 31 _COMPRESSION_METADATA_STRING_MAPPING[compression] 32 ),) + metadata 33 34 35cdef class _HandlerCallDetails: 36 def __cinit__(self, str method, tuple invocation_metadata): 37 self.method = method 38 self.invocation_metadata = invocation_metadata 39 40 41class _ServerStoppedError(BaseError): 42 """Raised if the server is stopped.""" 43 44 45cdef class RPCState: 46 47 def __cinit__(self, AioServer server): 48 init_grpc_aio() 49 self.call = NULL 50 self.server = server 51 grpc_metadata_array_init(&self.request_metadata) 52 grpc_call_details_init(&self.details) 53 self.client_closed = False 54 self.abort_exception = None 55 self.metadata_sent = False 56 self.status_sent = False 57 self.status_code = StatusCode.ok 58 self.py_status_code = None 59 self.status_details = '' 60 self.trailing_metadata = _IMMUTABLE_EMPTY_METADATA 61 self.compression_algorithm = None 62 self.disable_next_compression = False 63 self.callbacks = [] 64 65 cdef bytes method(self): 66 return _slice_bytes(self.details.method) 67 68 cdef tuple invocation_metadata(self): 69 return _metadata(&self.request_metadata) 70 71 cdef void raise_for_termination(self) except *: 72 """Raise exceptions if RPC is not running. 73 74 Server method handlers may suppress the abort exception. We need to halt 75 the RPC execution in that case. This function needs to be called after 76 running application code. 77 78 Also, the server may stop unexpected. We need to check before calling 79 into Core functions, otherwise, segfault. 80 """ 81 if self.abort_exception is not None: 82 raise self.abort_exception 83 if self.status_sent: 84 raise UsageError(_RPC_FINISHED_DETAILS) 85 if self.server._status == AIO_SERVER_STATUS_STOPPED: 86 raise _ServerStoppedError(_SERVER_STOPPED_DETAILS) 87 88 cdef int get_write_flag(self): 89 if self.disable_next_compression: 90 self.disable_next_compression = False 91 return WriteFlag.no_compress 92 else: 93 return _EMPTY_FLAG 94 95 cdef Operation create_send_initial_metadata_op_if_not_sent(self): 96 cdef SendInitialMetadataOperation op 97 if self.metadata_sent: 98 return None 99 else: 100 op = SendInitialMetadataOperation( 101 _augment_metadata(_IMMUTABLE_EMPTY_METADATA, self.compression_algorithm), 102 _EMPTY_FLAG 103 ) 104 return op 105 106 def __dealloc__(self): 107 """Cleans the Core objects.""" 108 grpc_call_details_destroy(&self.details) 109 grpc_metadata_array_destroy(&self.request_metadata) 110 if self.call: 111 grpc_call_unref(self.call) 112 shutdown_grpc_aio() 113 114 115cdef class _ServicerContext: 116 117 def __cinit__(self, 118 RPCState rpc_state, 119 object request_deserializer, 120 object response_serializer, 121 object loop): 122 self._rpc_state = rpc_state 123 self._request_deserializer = request_deserializer 124 self._response_serializer = response_serializer 125 self._loop = loop 126 127 async def read(self): 128 cdef bytes raw_message 129 self._rpc_state.raise_for_termination() 130 131 raw_message = await _receive_message(self._rpc_state, self._loop) 132 self._rpc_state.raise_for_termination() 133 134 if raw_message is None: 135 return EOF 136 else: 137 return deserialize(self._request_deserializer, 138 raw_message) 139 140 async def write(self, object message): 141 self._rpc_state.raise_for_termination() 142 143 await _send_message(self._rpc_state, 144 serialize(self._response_serializer, message), 145 self._rpc_state.create_send_initial_metadata_op_if_not_sent(), 146 self._rpc_state.get_write_flag(), 147 self._loop) 148 self._rpc_state.metadata_sent = True 149 150 async def send_initial_metadata(self, object metadata): 151 self._rpc_state.raise_for_termination() 152 153 if self._rpc_state.metadata_sent: 154 raise UsageError('Send initial metadata failed: already sent') 155 else: 156 await _send_initial_metadata( 157 self._rpc_state, 158 _augment_metadata(tuple(metadata), self._rpc_state.compression_algorithm), 159 _EMPTY_FLAG, 160 self._loop 161 ) 162 self._rpc_state.metadata_sent = True 163 164 async def abort(self, 165 object code, 166 str details='', 167 tuple trailing_metadata=_IMMUTABLE_EMPTY_METADATA): 168 if self._rpc_state.abort_exception is not None: 169 raise UsageError('Abort already called!') 170 else: 171 # Keeps track of the exception object. After abort happen, the RPC 172 # should stop execution. However, if users decided to suppress it, it 173 # could lead to undefined behavior. 174 self._rpc_state.abort_exception = AbortError('Locally aborted.') 175 176 if trailing_metadata == _IMMUTABLE_EMPTY_METADATA and self._rpc_state.trailing_metadata: 177 trailing_metadata = self._rpc_state.trailing_metadata 178 else: 179 raise_if_not_valid_trailing_metadata(trailing_metadata) 180 self._rpc_state.trailing_metadata = trailing_metadata 181 182 if details == '' and self._rpc_state.status_details: 183 details = self._rpc_state.status_details 184 else: 185 self._rpc_state.status_details = details 186 187 actual_code = get_status_code(code) 188 self._rpc_state.py_status_code = code 189 self._rpc_state.status_code = actual_code 190 191 self._rpc_state.status_sent = True 192 await _send_error_status_from_server( 193 self._rpc_state, 194 actual_code, 195 details, 196 trailing_metadata, 197 self._rpc_state.create_send_initial_metadata_op_if_not_sent(), 198 self._loop 199 ) 200 201 raise self._rpc_state.abort_exception 202 203 async def abort_with_status(self, object status): 204 await self.abort(status.code, status.details, status.trailing_metadata) 205 206 def set_trailing_metadata(self, object metadata): 207 raise_if_not_valid_trailing_metadata(metadata) 208 self._rpc_state.trailing_metadata = tuple(metadata) 209 210 def trailing_metadata(self): 211 return self._rpc_state.trailing_metadata 212 213 def invocation_metadata(self): 214 return self._rpc_state.invocation_metadata() 215 216 def set_code(self, object code): 217 self._rpc_state.status_code = get_status_code(code) 218 self._rpc_state.py_status_code = code 219 220 def code(self): 221 return self._rpc_state.py_status_code 222 223 def set_details(self, str details): 224 self._rpc_state.status_details = details 225 226 def details(self): 227 return self._rpc_state.status_details 228 229 def set_compression(self, object compression): 230 if self._rpc_state.metadata_sent: 231 raise RuntimeError('Compression setting must be specified before sending initial metadata') 232 else: 233 self._rpc_state.compression_algorithm = compression 234 235 def disable_next_message_compression(self): 236 self._rpc_state.disable_next_compression = True 237 238 def peer(self): 239 cdef char *c_peer = NULL 240 c_peer = grpc_call_get_peer(self._rpc_state.call) 241 peer = (<bytes>c_peer).decode('utf8') 242 gpr_free(c_peer) 243 return peer 244 245 def peer_identities(self): 246 cdef Call query_call = Call() 247 query_call.c_call = self._rpc_state.call 248 identities = peer_identities(query_call) 249 query_call.c_call = NULL 250 return identities 251 252 def peer_identity_key(self): 253 cdef Call query_call = Call() 254 query_call.c_call = self._rpc_state.call 255 identity_key = peer_identity_key(query_call) 256 query_call.c_call = NULL 257 if identity_key: 258 return identity_key.decode('utf8') 259 else: 260 return None 261 262 def auth_context(self): 263 cdef Call query_call = Call() 264 query_call.c_call = self._rpc_state.call 265 bytes_ctx = auth_context(query_call) 266 query_call.c_call = NULL 267 if bytes_ctx: 268 ctx = {} 269 for key in bytes_ctx: 270 ctx[key.decode('utf8')] = bytes_ctx[key] 271 return ctx 272 else: 273 return {} 274 275 def time_remaining(self): 276 if self._rpc_state.details.deadline.seconds == _GPR_INF_FUTURE.seconds: 277 return None 278 else: 279 return max(_time_from_timespec(self._rpc_state.details.deadline) - time.time(), 0) 280 281 def add_done_callback(self, callback): 282 cb = functools.partial(callback, self) 283 self._rpc_state.callbacks.append(cb) 284 285 def done(self): 286 return self._rpc_state.status_sent 287 288 def cancelled(self): 289 return self._rpc_state.status_code == StatusCode.cancelled 290 291 292cdef class _SyncServicerContext: 293 """Sync servicer context for sync handler compatibility.""" 294 295 def __cinit__(self, 296 _ServicerContext context): 297 self._context = context 298 self._callbacks = [] 299 self._loop = context._loop 300 301 def abort(self, 302 object code, 303 str details='', 304 tuple trailing_metadata=_IMMUTABLE_EMPTY_METADATA): 305 future = asyncio.run_coroutine_threadsafe( 306 self._context.abort(code, details, trailing_metadata), 307 self._loop) 308 # Abort should raise an AbortError 309 future.exception() 310 311 def send_initial_metadata(self, object metadata): 312 future = asyncio.run_coroutine_threadsafe( 313 self._context.send_initial_metadata(metadata), 314 self._loop) 315 future.result() 316 317 def set_trailing_metadata(self, object metadata): 318 self._context.set_trailing_metadata(metadata) 319 320 def invocation_metadata(self): 321 return self._context.invocation_metadata() 322 323 def set_code(self, object code): 324 self._context.set_code(code) 325 326 def set_details(self, str details): 327 self._context.set_details(details) 328 329 def set_compression(self, object compression): 330 self._context.set_compression(compression) 331 332 def disable_next_message_compression(self): 333 self._context.disable_next_message_compression() 334 335 def add_callback(self, object callback): 336 self._callbacks.append(callback) 337 338 def peer(self): 339 return self._context.peer() 340 341 def peer_identities(self): 342 return self._context.peer_identities() 343 344 def peer_identity_key(self): 345 return self._context.peer_identity_key() 346 347 def auth_context(self): 348 return self._context.auth_context() 349 350 def time_remaining(self): 351 return self._context.time_remaining() 352 353 354async def _run_interceptor(object interceptors, object query_handler, 355 object handler_call_details): 356 interceptor = next(interceptors, None) 357 if interceptor: 358 continuation = functools.partial(_run_interceptor, interceptors, 359 query_handler) 360 return await interceptor.intercept_service(continuation, handler_call_details) 361 else: 362 return query_handler(handler_call_details) 363 364 365def _is_async_handler(object handler): 366 """Inspect if a method handler is async or sync.""" 367 return inspect.isawaitable(handler) or inspect.iscoroutinefunction(handler) or inspect.isasyncgenfunction(handler) 368 369 370async def _find_method_handler(str method, tuple metadata, list generic_handlers, 371 tuple interceptors): 372 def query_handlers(handler_call_details): 373 for generic_handler in generic_handlers: 374 method_handler = generic_handler.service(handler_call_details) 375 if method_handler is not None: 376 return method_handler 377 return None 378 379 cdef _HandlerCallDetails handler_call_details = _HandlerCallDetails(method, 380 metadata) 381 # interceptor 382 if interceptors: 383 return await _run_interceptor(iter(interceptors), query_handlers, 384 handler_call_details) 385 else: 386 return query_handlers(handler_call_details) 387 388 389async def _finish_handler_with_unary_response(RPCState rpc_state, 390 object unary_handler, 391 object request, 392 _ServicerContext servicer_context, 393 object response_serializer, 394 object loop): 395 """Finishes server method handler with a single response. 396 397 This function executes the application handler, and handles response 398 sending, as well as errors. It is shared between unary-unary and 399 stream-unary handlers. 400 """ 401 # Executes application logic 402 cdef object response_message 403 cdef _SyncServicerContext sync_servicer_context 404 install_context_from_request_call_event_aio(rpc_state) 405 406 if _is_async_handler(unary_handler): 407 # Run async method handlers in this coroutine 408 response_message = await unary_handler( 409 request, 410 servicer_context, 411 ) 412 else: 413 # Run sync method handlers in the thread pool 414 sync_servicer_context = _SyncServicerContext(servicer_context) 415 response_message = await loop.run_in_executor( 416 rpc_state.server.thread_pool(), 417 unary_handler, 418 request, 419 sync_servicer_context, 420 ) 421 # Support sync-stack callback 422 for callback in sync_servicer_context._callbacks: 423 callback() 424 425 # Raises exception if aborted 426 rpc_state.raise_for_termination() 427 428 # Serializes the response message 429 cdef bytes response_raw 430 if rpc_state.status_code == StatusCode.ok: 431 response_raw = serialize( 432 response_serializer, 433 response_message, 434 ) 435 else: 436 # Discards the response message if the status code is non-OK. 437 response_raw = b'' 438 439 # Assembles the batch operations 440 cdef tuple finish_ops 441 finish_ops = ( 442 SendMessageOperation(response_raw, rpc_state.get_write_flag()), 443 SendStatusFromServerOperation( 444 rpc_state.trailing_metadata, 445 rpc_state.status_code, 446 rpc_state.status_details, 447 _EMPTY_FLAGS, 448 ), 449 ) 450 if not rpc_state.metadata_sent: 451 finish_ops = prepend_send_initial_metadata_op( 452 finish_ops, 453 None) 454 rpc_state.metadata_sent = True 455 rpc_state.status_sent = True 456 await execute_batch(rpc_state, finish_ops, loop) 457 uninstall_context() 458 459 460async def _finish_handler_with_stream_responses(RPCState rpc_state, 461 object stream_handler, 462 object request, 463 _ServicerContext servicer_context, 464 object loop): 465 """Finishes server method handler with multiple responses. 466 467 This function executes the application handler, and handles response 468 sending, as well as errors. It is shared between unary-stream and 469 stream-stream handlers. 470 """ 471 cdef object async_response_generator 472 cdef object response_message 473 install_context_from_request_call_event_aio(rpc_state) 474 475 if inspect.iscoroutinefunction(stream_handler): 476 # Case 1: Coroutine async handler - using reader-writer API 477 # The handler uses reader / writer API, returns None. 478 await stream_handler( 479 request, 480 servicer_context, 481 ) 482 else: 483 if inspect.isasyncgenfunction(stream_handler): 484 # Case 2: Async handler - async generator 485 # The handler uses async generator API 486 async_response_generator = stream_handler( 487 request, 488 servicer_context, 489 ) 490 else: 491 # Case 3: Sync handler - normal generator 492 # NOTE(lidiz) Streaming handler in sync stack is either a generator 493 # function or a function returns a generator. 494 sync_servicer_context = _SyncServicerContext(servicer_context) 495 gen = stream_handler(request, sync_servicer_context) 496 async_response_generator = generator_to_async_generator(gen, 497 loop, 498 rpc_state.server.thread_pool()) 499 500 # Consumes messages from the generator 501 async for response_message in async_response_generator: 502 # Raises exception if aborted 503 rpc_state.raise_for_termination() 504 505 await servicer_context.write(response_message) 506 507 # Raises exception if aborted 508 rpc_state.raise_for_termination() 509 510 # Sends the final status of this RPC 511 cdef SendStatusFromServerOperation op = SendStatusFromServerOperation( 512 rpc_state.trailing_metadata, 513 rpc_state.status_code, 514 rpc_state.status_details, 515 _EMPTY_FLAGS, 516 ) 517 518 cdef tuple finish_ops = (op,) 519 if not rpc_state.metadata_sent: 520 finish_ops = prepend_send_initial_metadata_op( 521 finish_ops, 522 None 523 ) 524 rpc_state.metadata_sent = True 525 rpc_state.status_sent = True 526 await execute_batch(rpc_state, finish_ops, loop) 527 uninstall_context() 528 529 530async def _handle_unary_unary_rpc(object method_handler, 531 RPCState rpc_state, 532 object loop): 533 # Receives request message 534 cdef bytes request_raw = await _receive_message(rpc_state, loop) 535 if request_raw is None: 536 # The RPC was cancelled immediately after start on client side. 537 return 538 539 # Deserializes the request message 540 cdef object request_message = deserialize( 541 method_handler.request_deserializer, 542 request_raw, 543 ) 544 545 # Creates a dedicated ServicerContext 546 cdef _ServicerContext servicer_context = _ServicerContext( 547 rpc_state, 548 None, 549 None, 550 loop, 551 ) 552 553 # Finishes the application handler 554 await _finish_handler_with_unary_response( 555 rpc_state, 556 method_handler.unary_unary, 557 request_message, 558 servicer_context, 559 method_handler.response_serializer, 560 loop 561 ) 562 563 564async def _handle_unary_stream_rpc(object method_handler, 565 RPCState rpc_state, 566 object loop): 567 # Receives request message 568 cdef bytes request_raw = await _receive_message(rpc_state, loop) 569 if request_raw is None: 570 return 571 572 # Deserializes the request message 573 cdef object request_message = deserialize( 574 method_handler.request_deserializer, 575 request_raw, 576 ) 577 578 # Creates a dedicated ServicerContext 579 cdef _ServicerContext servicer_context = _ServicerContext( 580 rpc_state, 581 method_handler.request_deserializer, 582 method_handler.response_serializer, 583 loop, 584 ) 585 586 # Finishes the application handler 587 await _finish_handler_with_stream_responses( 588 rpc_state, 589 method_handler.unary_stream, 590 request_message, 591 servicer_context, 592 loop, 593 ) 594 595 596cdef class _MessageReceiver: 597 """Bridge between the async generator API and the reader-writer API.""" 598 599 def __cinit__(self, _ServicerContext servicer_context): 600 self._servicer_context = servicer_context 601 self._agen = None 602 603 async def _async_message_receiver(self): 604 """An async generator that receives messages.""" 605 cdef object message 606 while True: 607 message = await self._servicer_context.read() 608 if message is not EOF: 609 yield message 610 else: 611 break 612 613 def __aiter__(self): 614 # Prevents never awaited warning if application never used the async generator 615 if self._agen is None: 616 self._agen = self._async_message_receiver() 617 return self._agen 618 619 async def __anext__(self): 620 return await self.__aiter__().__anext__() 621 622 623async def _handle_stream_unary_rpc(object method_handler, 624 RPCState rpc_state, 625 object loop): 626 # Creates a dedicated ServicerContext 627 cdef _ServicerContext servicer_context = _ServicerContext( 628 rpc_state, 629 method_handler.request_deserializer, 630 None, 631 loop, 632 ) 633 634 # Prepares the request generator 635 cdef object request_iterator 636 if _is_async_handler(method_handler.stream_unary): 637 request_iterator = _MessageReceiver(servicer_context) 638 else: 639 request_iterator = async_generator_to_generator( 640 _MessageReceiver(servicer_context), 641 loop 642 ) 643 644 # Finishes the application handler 645 await _finish_handler_with_unary_response( 646 rpc_state, 647 method_handler.stream_unary, 648 request_iterator, 649 servicer_context, 650 method_handler.response_serializer, 651 loop 652 ) 653 654 655async def _handle_stream_stream_rpc(object method_handler, 656 RPCState rpc_state, 657 object loop): 658 # Creates a dedicated ServicerContext 659 cdef _ServicerContext servicer_context = _ServicerContext( 660 rpc_state, 661 method_handler.request_deserializer, 662 method_handler.response_serializer, 663 loop, 664 ) 665 666 # Prepares the request generator 667 cdef object request_iterator 668 if _is_async_handler(method_handler.stream_stream): 669 request_iterator = _MessageReceiver(servicer_context) 670 else: 671 request_iterator = async_generator_to_generator( 672 _MessageReceiver(servicer_context), 673 loop 674 ) 675 676 # Finishes the application handler 677 await _finish_handler_with_stream_responses( 678 rpc_state, 679 method_handler.stream_stream, 680 request_iterator, 681 servicer_context, 682 loop, 683 ) 684 685 686async def _handle_exceptions(RPCState rpc_state, object rpc_coro, object loop): 687 try: 688 try: 689 await rpc_coro 690 except AbortError as e: 691 # Caught AbortError check if it is the same one 692 assert rpc_state.abort_exception is e, 'Abort error has been replaced!' 693 return 694 else: 695 # Check if the abort exception got suppressed 696 if rpc_state.abort_exception is not None: 697 _LOGGER.error( 698 'Abort error unexpectedly suppressed: %s', 699 traceback.format_exception(rpc_state.abort_exception) 700 ) 701 except (KeyboardInterrupt, SystemExit): 702 raise 703 except asyncio.CancelledError: 704 _LOGGER.debug('RPC cancelled for servicer method [%s]', _decode(rpc_state.method())) 705 except _ServerStoppedError: 706 _LOGGER.warning('Aborting method [%s] due to server stop.', _decode(rpc_state.method())) 707 except ExecuteBatchError: 708 # If client closed (aka. cancelled), ignore the failed batch operations. 709 if rpc_state.client_closed: 710 return 711 else: 712 _LOGGER.exception('ExecuteBatchError raised in core by servicer method [%s]' % ( 713 _decode(rpc_state.method()))) 714 return 715 except Exception as e: 716 _LOGGER.exception('Unexpected [%s] raised by servicer method [%s]' % ( 717 type(e).__name__, 718 _decode(rpc_state.method()), 719 )) 720 if not rpc_state.status_sent and rpc_state.server._status != AIO_SERVER_STATUS_STOPPED: 721 # Allows users to raise other types of exception with specified status code 722 if rpc_state.status_code == StatusCode.ok: 723 status_code = StatusCode.unknown 724 else: 725 status_code = rpc_state.status_code 726 727 rpc_state.status_sent = True 728 try: 729 await _send_error_status_from_server( 730 rpc_state, 731 status_code, 732 'Unexpected %s: %s' % (type(e), e), 733 rpc_state.trailing_metadata, 734 rpc_state.create_send_initial_metadata_op_if_not_sent(), 735 loop 736 ) 737 except ExecuteBatchError: 738 _LOGGER.exception('Failed sending error status from server') 739 traceback.print_exc() 740 741 742cdef _add_callback_handler(object rpc_task, RPCState rpc_state): 743 744 def handle_callbacks(object unused_task): 745 try: 746 for callback in rpc_state.callbacks: 747 # The _ServicerContext object is bound in add_done_callback. 748 callback() 749 except: 750 _LOGGER.exception('Error in callback for method [%s]', _decode(rpc_state.method())) 751 752 rpc_task.add_done_callback(handle_callbacks) 753 754 755async def _handle_cancellation_from_core(object rpc_task, 756 RPCState rpc_state, 757 object loop): 758 cdef ReceiveCloseOnServerOperation op = ReceiveCloseOnServerOperation(_EMPTY_FLAG) 759 cdef tuple ops = (op,) 760 761 # Awaits cancellation from peer. 762 await execute_batch(rpc_state, ops, loop) 763 rpc_state.client_closed = True 764 # If 1) received cancel signal; 2) the Task is not finished; 3) the server 765 # wasn't replying final status. For condition 3, it might cause inaccurate 766 # log that an RPC is both aborted and cancelled. 767 if op.cancelled() and not rpc_task.done() and not rpc_state.status_sent: 768 # Injects `CancelledError` to halt the RPC coroutine 769 rpc_task.cancel() 770 771 772async def _schedule_rpc_coro(object rpc_coro, 773 RPCState rpc_state, 774 object loop): 775 # Schedules the RPC coroutine. 776 cdef object rpc_task = loop.create_task(_handle_exceptions( 777 rpc_state, 778 rpc_coro, 779 loop, 780 ), name="HandleExceptions[%s]" % _decode(rpc_state.method())) 781 _add_callback_handler(rpc_task, rpc_state) 782 await _handle_cancellation_from_core(rpc_task, rpc_state, loop) 783 try: 784 # Propagate any errors not handled by _handle_exceptions. If not awaited 785 # there will be logs of the form "Task exception was never retrieved". 786 # Catching it here we can provide traceback and debugging logs. 787 await rpc_task 788 except: 789 _LOGGER.exception('Exception not handled by _handle_exceptions in servicer method [%s]' % ( 790 _decode(rpc_state.method()), 791 )) 792 traceback.print_exc() 793 794 795async def _handle_rpc(list generic_handlers, tuple interceptors, 796 RPCState rpc_state, object loop, bint concurrency_exceeded): 797 cdef object method_handler 798 # Finds the method handler (application logic) 799 method_handler = await _find_method_handler( 800 rpc_state.method().decode(), 801 rpc_state.invocation_metadata(), 802 generic_handlers, 803 interceptors, 804 ) 805 if method_handler is None: 806 rpc_state.status_sent = True 807 await _send_error_status_from_server( 808 rpc_state, 809 StatusCode.unimplemented, 810 'Method not found!', 811 _IMMUTABLE_EMPTY_METADATA, 812 rpc_state.create_send_initial_metadata_op_if_not_sent(), 813 loop 814 ) 815 return 816 817 if concurrency_exceeded: 818 rpc_state.status_sent = True 819 await _send_error_status_from_server( 820 rpc_state, 821 StatusCode.resource_exhausted, 822 'Concurrent RPC limit exceeded!', 823 _IMMUTABLE_EMPTY_METADATA, 824 rpc_state.create_send_initial_metadata_op_if_not_sent(), 825 loop 826 ) 827 return 828 829 # Handles unary-unary case 830 if not method_handler.request_streaming and not method_handler.response_streaming: 831 await _handle_unary_unary_rpc(method_handler, 832 rpc_state, 833 loop) 834 return 835 836 # Handles unary-stream case 837 if not method_handler.request_streaming and method_handler.response_streaming: 838 await _handle_unary_stream_rpc(method_handler, 839 rpc_state, 840 loop) 841 return 842 843 # Handles stream-unary case 844 if method_handler.request_streaming and not method_handler.response_streaming: 845 await _handle_stream_unary_rpc(method_handler, 846 rpc_state, 847 loop) 848 return 849 850 # Handles stream-stream case 851 if method_handler.request_streaming and method_handler.response_streaming: 852 await _handle_stream_stream_rpc(method_handler, 853 rpc_state, 854 loop) 855 return 856 857 858class _RequestCallError(Exception): pass 859 860cdef CallbackFailureHandler REQUEST_CALL_FAILURE_HANDLER = CallbackFailureHandler( 861 'grpc_server_request_call', None, _RequestCallError) 862 863 864cdef CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler( 865 'grpc_server_shutdown_and_notify', 866 None, 867 InternalError) 868 869 870cdef class _ConcurrentRpcLimiter: 871 872 def __cinit__(self, int maximum_concurrent_rpcs): 873 if maximum_concurrent_rpcs <= 0: 874 raise ValueError("maximum_concurrent_rpcs should be a positive integer") 875 self._maximum_concurrent_rpcs = maximum_concurrent_rpcs 876 self._active_rpcs = 0 877 self.limiter_concurrency_exceeded = False 878 879 def check_before_request_call(self): 880 if self._active_rpcs >= self._maximum_concurrent_rpcs: 881 self.limiter_concurrency_exceeded = True 882 else: 883 self._active_rpcs += 1 884 885 def _decrease_active_rpcs_count(self, unused_future): 886 self._active_rpcs -= 1 887 if self._active_rpcs < self._maximum_concurrent_rpcs: 888 self.limiter_concurrency_exceeded = False 889 890 def decrease_once_finished(self, object rpc_task): 891 rpc_task.add_done_callback(self._decrease_active_rpcs_count) 892 893 894cdef class AioServer: 895 896 def __init__(self, loop, thread_pool, generic_handlers, interceptors, 897 options, maximum_concurrent_rpcs): 898 init_grpc_aio() 899 # NOTE(lidiz) Core objects won't be deallocated automatically. 900 # If AioServer.shutdown is not called, those objects will leak. 901 # TODO(rbellevi): Support xDS in aio server. 902 self._server = Server(options, False) 903 grpc_server_register_completion_queue( 904 self._server.c_server, 905 global_completion_queue(), 906 NULL 907 ) 908 909 self._loop = loop 910 self._status = AIO_SERVER_STATUS_READY 911 self._generic_handlers = [] 912 self.add_generic_rpc_handlers(generic_handlers) 913 self._serving_task = None 914 915 self._shutdown_lock = asyncio.Lock() 916 self._shutdown_completed = self._loop.create_future() 917 self._shutdown_callback_wrapper = CallbackWrapper( 918 self._shutdown_completed, 919 self._loop, 920 SERVER_SHUTDOWN_FAILURE_HANDLER) 921 self._crash_exception = None 922 923 if interceptors: 924 self._interceptors = tuple(interceptors) 925 else: 926 self._interceptors = () 927 928 self._thread_pool = thread_pool 929 if maximum_concurrent_rpcs is not None: 930 self._limiter = _ConcurrentRpcLimiter(maximum_concurrent_rpcs) 931 932 def add_generic_rpc_handlers(self, object generic_rpc_handlers): 933 self._generic_handlers.extend(generic_rpc_handlers) 934 935 def add_insecure_port(self, address): 936 return self._server.add_http2_port(address) 937 938 def add_secure_port(self, address, server_credentials): 939 return self._server.add_http2_port(address, 940 server_credentials._credentials) 941 942 async def _request_call(self): 943 cdef grpc_call_error error 944 cdef RPCState rpc_state = RPCState(self) 945 cdef object future = self._loop.create_future() 946 cdef CallbackWrapper wrapper = CallbackWrapper( 947 future, 948 self._loop, 949 REQUEST_CALL_FAILURE_HANDLER) 950 error = grpc_server_request_call( 951 self._server.c_server, &rpc_state.call, &rpc_state.details, 952 &rpc_state.request_metadata, 953 global_completion_queue(), global_completion_queue(), 954 wrapper.c_functor() 955 ) 956 if error != GRPC_CALL_OK: 957 raise InternalError("Error in grpc_server_request_call: %s" % error) 958 959 await future 960 return rpc_state 961 962 async def _server_main_loop(self, 963 object server_started): 964 self._server.start(backup_queue=False) 965 cdef RPCState rpc_state 966 server_started.set_result(True) 967 rpc_tasks = set() 968 969 while True: 970 # When shutdown begins, no more new connections. 971 if self._status != AIO_SERVER_STATUS_RUNNING: 972 break 973 974 concurrency_exceeded = False 975 if self._limiter is not None: 976 self._limiter.check_before_request_call() 977 concurrency_exceeded = self._limiter.limiter_concurrency_exceeded 978 979 # Accepts new request from Core 980 rpc_state = await self._request_call() 981 982 # Creates the dedicated RPC coroutine. If we schedule it right now, 983 # there is no guarantee if the cancellation listening coroutine is 984 # ready or not. So, we should control the ordering by scheduling 985 # the coroutine onto event loop inside of the cancellation 986 # coroutine. 987 rpc_coro = _handle_rpc(self._generic_handlers, 988 self._interceptors, 989 rpc_state, 990 self._loop, 991 concurrency_exceeded) 992 993 # Fires off a task that listens on the cancellation from client. 994 rpc_task = self._loop.create_task( 995 _schedule_rpc_coro( 996 rpc_coro, 997 rpc_state, 998 self._loop 999 ), 1000 name="rpc_task", 1001 ) 1002 1003 # loop.create_task only holds a weakref to the task. 1004 # Maintain reference to tasks to avoid garbage collection. 1005 rpc_tasks.add(rpc_task) 1006 rpc_task.add_done_callback(rpc_tasks.discard) 1007 1008 if self._limiter is not None: 1009 self._limiter.decrease_once_finished(rpc_task) 1010 1011 def _serving_task_crash_handler(self, object task): 1012 """Shutdown the server immediately if unexpectedly exited.""" 1013 if task.cancelled(): 1014 return 1015 if task.exception() is None: 1016 return 1017 if self._status != AIO_SERVER_STATUS_STOPPING: 1018 self._crash_exception = task.exception() 1019 _LOGGER.exception(self._crash_exception) 1020 self._loop.create_task(self.shutdown(None)) 1021 1022 async def start(self): 1023 if self._status == AIO_SERVER_STATUS_RUNNING: 1024 return 1025 elif self._status != AIO_SERVER_STATUS_READY: 1026 raise UsageError('Server not in ready state') 1027 1028 self._status = AIO_SERVER_STATUS_RUNNING 1029 cdef object server_started = self._loop.create_future() 1030 self._serving_task = self._loop.create_task(self._server_main_loop(server_started)) 1031 self._serving_task.add_done_callback(self._serving_task_crash_handler) 1032 # Needs to explicitly wait for the server to start up. 1033 # Otherwise, the actual start time of the server is un-controllable. 1034 await server_started 1035 1036 async def _start_shutting_down(self): 1037 """Prepares the server to shutting down. 1038 1039 This coroutine function is NOT coroutine-safe. 1040 """ 1041 # The shutdown callback won't be called until there is no live RPC. 1042 grpc_server_shutdown_and_notify( 1043 self._server.c_server, 1044 global_completion_queue(), 1045 self._shutdown_callback_wrapper.c_functor()) 1046 1047 # Ensures the serving task (coroutine) exits. 1048 try: 1049 await self._serving_task 1050 except _RequestCallError: 1051 pass 1052 1053 async def shutdown(self, grace): 1054 """Gracefully shutdown the Core server. 1055 1056 Application should only call shutdown once. 1057 1058 Args: 1059 grace: An optional float indicating the length of grace period in 1060 seconds. 1061 """ 1062 if self._status == AIO_SERVER_STATUS_READY or self._status == AIO_SERVER_STATUS_STOPPED: 1063 return 1064 1065 async with self._shutdown_lock: 1066 if self._status == AIO_SERVER_STATUS_RUNNING: 1067 self._server.is_shutting_down = True 1068 self._status = AIO_SERVER_STATUS_STOPPING 1069 await self._start_shutting_down() 1070 1071 if grace is None: 1072 # Directly cancels all calls 1073 grpc_server_cancel_all_calls(self._server.c_server) 1074 await self._shutdown_completed 1075 else: 1076 try: 1077 await asyncio.wait_for( 1078 asyncio.shield(self._shutdown_completed), 1079 grace, 1080 ) 1081 except asyncio.TimeoutError: 1082 # Cancels all ongoing calls by the end of grace period. 1083 grpc_server_cancel_all_calls(self._server.c_server) 1084 await self._shutdown_completed 1085 1086 async with self._shutdown_lock: 1087 if self._status == AIO_SERVER_STATUS_STOPPING: 1088 grpc_server_destroy(self._server.c_server) 1089 self._server.c_server = NULL 1090 self._server.is_shutdown = True 1091 self._status = AIO_SERVER_STATUS_STOPPED 1092 1093 async def wait_for_termination(self, object timeout): 1094 if timeout is None: 1095 await self._shutdown_completed 1096 else: 1097 try: 1098 await asyncio.wait_for( 1099 asyncio.shield(self._shutdown_completed), 1100 timeout, 1101 ) 1102 except asyncio.TimeoutError: 1103 if self._crash_exception is not None: 1104 raise self._crash_exception 1105 return True 1106 if self._crash_exception is not None: 1107 raise self._crash_exception 1108 return False 1109 1110 def __dealloc__(self): 1111 """Deallocation of Core objects are ensured by Python layer.""" 1112 # TODO(lidiz) if users create server, and then dealloc it immediately. 1113 # There is a potential memory leak of created Core server. 1114 if self._status != AIO_SERVER_STATUS_STOPPED: 1115 _LOGGER.debug( 1116 '__dealloc__ called on running server %s with status %d', 1117 self, 1118 self._status 1119 ) 1120 shutdown_grpc_aio() 1121 1122 cdef thread_pool(self): 1123 """Access the thread pool instance.""" 1124 return self._thread_pool 1125 1126 def is_running(self): 1127 return self._status == AIO_SERVER_STATUS_RUNNING 1128