1# Copyright 2016 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Invocation-side implementation of gRPC Python.""" 15 16import copy 17import functools 18import logging 19import os 20import sys 21import threading 22import time 23 24import grpc 25import grpc.experimental 26from grpc import _compression 27from grpc import _common 28from grpc import _grpcio_metadata 29from grpc._cython import cygrpc 30 31_LOGGER = logging.getLogger(__name__) 32 33_USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__) 34 35_EMPTY_FLAGS = 0 36 37# NOTE(rbellevi): No guarantees are given about the maintenance of this 38# environment variable. 39_DEFAULT_SINGLE_THREADED_UNARY_STREAM = os.getenv( 40 "GRPC_SINGLE_THREADED_UNARY_STREAM") is not None 41 42_UNARY_UNARY_INITIAL_DUE = ( 43 cygrpc.OperationType.send_initial_metadata, 44 cygrpc.OperationType.send_message, 45 cygrpc.OperationType.send_close_from_client, 46 cygrpc.OperationType.receive_initial_metadata, 47 cygrpc.OperationType.receive_message, 48 cygrpc.OperationType.receive_status_on_client, 49) 50_UNARY_STREAM_INITIAL_DUE = ( 51 cygrpc.OperationType.send_initial_metadata, 52 cygrpc.OperationType.send_message, 53 cygrpc.OperationType.send_close_from_client, 54 cygrpc.OperationType.receive_initial_metadata, 55 cygrpc.OperationType.receive_status_on_client, 56) 57_STREAM_UNARY_INITIAL_DUE = ( 58 cygrpc.OperationType.send_initial_metadata, 59 cygrpc.OperationType.receive_initial_metadata, 60 cygrpc.OperationType.receive_message, 61 cygrpc.OperationType.receive_status_on_client, 62) 63_STREAM_STREAM_INITIAL_DUE = ( 64 cygrpc.OperationType.send_initial_metadata, 65 cygrpc.OperationType.receive_initial_metadata, 66 cygrpc.OperationType.receive_status_on_client, 67) 68 69_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( 70 'Exception calling channel subscription callback!') 71 72_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n' 73 '\tstatus = {}\n' 74 '\tdetails = "{}"\n' 75 '>') 76 77_NON_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n' 78 '\tstatus = {}\n' 79 '\tdetails = "{}"\n' 80 '\tdebug_error_string = "{}"\n' 81 '>') 82 83 84def _deadline(timeout): 85 return None if timeout is None else time.time() + timeout 86 87 88def _unknown_code_details(unknown_cygrpc_code, details): 89 return 'Server sent unknown code {} and details "{}"'.format( 90 unknown_cygrpc_code, details) 91 92 93class _RPCState(object): 94 95 def __init__(self, due, initial_metadata, trailing_metadata, code, details): 96 self.condition = threading.Condition() 97 # The cygrpc.OperationType objects representing events due from the RPC's 98 # completion queue. 99 self.due = set(due) 100 self.initial_metadata = initial_metadata 101 self.response = None 102 self.trailing_metadata = trailing_metadata 103 self.code = code 104 self.details = details 105 self.debug_error_string = None 106 # The semantics of grpc.Future.cancel and grpc.Future.cancelled are 107 # slightly wonky, so they have to be tracked separately from the rest of the 108 # result of the RPC. This field tracks whether cancellation was requested 109 # prior to termination of the RPC. 110 self.cancelled = False 111 self.callbacks = [] 112 self.fork_epoch = cygrpc.get_fork_epoch() 113 114 def reset_postfork_child(self): 115 self.condition = threading.Condition() 116 117 118def _abort(state, code, details): 119 if state.code is None: 120 state.code = code 121 state.details = details 122 if state.initial_metadata is None: 123 state.initial_metadata = () 124 state.trailing_metadata = () 125 126 127def _handle_event(event, state, response_deserializer): 128 callbacks = [] 129 for batch_operation in event.batch_operations: 130 operation_type = batch_operation.type() 131 state.due.remove(operation_type) 132 if operation_type == cygrpc.OperationType.receive_initial_metadata: 133 state.initial_metadata = batch_operation.initial_metadata() 134 elif operation_type == cygrpc.OperationType.receive_message: 135 serialized_response = batch_operation.message() 136 if serialized_response is not None: 137 response = _common.deserialize(serialized_response, 138 response_deserializer) 139 if response is None: 140 details = 'Exception deserializing response!' 141 _abort(state, grpc.StatusCode.INTERNAL, details) 142 else: 143 state.response = response 144 elif operation_type == cygrpc.OperationType.receive_status_on_client: 145 state.trailing_metadata = batch_operation.trailing_metadata() 146 if state.code is None: 147 code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get( 148 batch_operation.code()) 149 if code is None: 150 state.code = grpc.StatusCode.UNKNOWN 151 state.details = _unknown_code_details( 152 code, batch_operation.details()) 153 else: 154 state.code = code 155 state.details = batch_operation.details() 156 state.debug_error_string = batch_operation.error_string() 157 callbacks.extend(state.callbacks) 158 state.callbacks = None 159 return callbacks 160 161 162def _event_handler(state, response_deserializer): 163 164 def handle_event(event): 165 with state.condition: 166 callbacks = _handle_event(event, state, response_deserializer) 167 state.condition.notify_all() 168 done = not state.due 169 for callback in callbacks: 170 try: 171 callback() 172 except Exception as e: # pylint: disable=broad-except 173 # NOTE(rbellevi): We suppress but log errors here so as not to 174 # kill the channel spin thread. 175 logging.error('Exception in callback %s: %s', 176 repr(callback.func), repr(e)) 177 return done and state.fork_epoch >= cygrpc.get_fork_epoch() 178 179 return handle_event 180 181 182#pylint: disable=too-many-statements 183def _consume_request_iterator(request_iterator, state, call, request_serializer, 184 event_handler): 185 """Consume a request iterator supplied by the user.""" 186 187 def consume_request_iterator(): # pylint: disable=too-many-branches 188 # Iterate over the request iterator until it is exhausted or an error 189 # condition is encountered. 190 while True: 191 return_from_user_request_generator_invoked = False 192 try: 193 # The thread may die in user-code. Do not block fork for this. 194 cygrpc.enter_user_request_generator() 195 request = next(request_iterator) 196 except StopIteration: 197 break 198 except Exception: # pylint: disable=broad-except 199 cygrpc.return_from_user_request_generator() 200 return_from_user_request_generator_invoked = True 201 code = grpc.StatusCode.UNKNOWN 202 details = 'Exception iterating requests!' 203 _LOGGER.exception(details) 204 call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], 205 details) 206 _abort(state, code, details) 207 return 208 finally: 209 if not return_from_user_request_generator_invoked: 210 cygrpc.return_from_user_request_generator() 211 serialized_request = _common.serialize(request, request_serializer) 212 with state.condition: 213 if state.code is None and not state.cancelled: 214 if serialized_request is None: 215 code = grpc.StatusCode.INTERNAL 216 details = 'Exception serializing request!' 217 call.cancel( 218 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], 219 details) 220 _abort(state, code, details) 221 return 222 else: 223 operations = (cygrpc.SendMessageOperation( 224 serialized_request, _EMPTY_FLAGS),) 225 operating = call.operate(operations, event_handler) 226 if operating: 227 state.due.add(cygrpc.OperationType.send_message) 228 else: 229 return 230 231 def _done(): 232 return (state.code is not None or 233 cygrpc.OperationType.send_message 234 not in state.due) 235 236 _common.wait(state.condition.wait, 237 _done, 238 spin_cb=functools.partial( 239 cygrpc.block_if_fork_in_progress, 240 state)) 241 if state.code is not None: 242 return 243 else: 244 return 245 with state.condition: 246 if state.code is None: 247 operations = ( 248 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),) 249 operating = call.operate(operations, event_handler) 250 if operating: 251 state.due.add(cygrpc.OperationType.send_close_from_client) 252 253 consumption_thread = cygrpc.ForkManagedThread( 254 target=consume_request_iterator) 255 consumption_thread.setDaemon(True) 256 consumption_thread.start() 257 258 259def _rpc_state_string(class_name, rpc_state): 260 """Calculates error string for RPC.""" 261 with rpc_state.condition: 262 if rpc_state.code is None: 263 return '<{} object>'.format(class_name) 264 elif rpc_state.code is grpc.StatusCode.OK: 265 return _OK_RENDEZVOUS_REPR_FORMAT.format(class_name, rpc_state.code, 266 rpc_state.details) 267 else: 268 return _NON_OK_RENDEZVOUS_REPR_FORMAT.format( 269 class_name, rpc_state.code, rpc_state.details, 270 rpc_state.debug_error_string) 271 272 273class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future): 274 """An RPC error not tied to the execution of a particular RPC. 275 276 The RPC represented by the state object must not be in-progress or 277 cancelled. 278 279 Attributes: 280 _state: An instance of _RPCState. 281 """ 282 283 def __init__(self, state): 284 with state.condition: 285 self._state = _RPCState((), copy.deepcopy(state.initial_metadata), 286 copy.deepcopy(state.trailing_metadata), 287 state.code, copy.deepcopy(state.details)) 288 self._state.response = copy.copy(state.response) 289 self._state.debug_error_string = copy.copy(state.debug_error_string) 290 291 def initial_metadata(self): 292 return self._state.initial_metadata 293 294 def trailing_metadata(self): 295 return self._state.trailing_metadata 296 297 def code(self): 298 return self._state.code 299 300 def details(self): 301 return _common.decode(self._state.details) 302 303 def debug_error_string(self): 304 return _common.decode(self._state.debug_error_string) 305 306 def _repr(self): 307 return _rpc_state_string(self.__class__.__name__, self._state) 308 309 def __repr__(self): 310 return self._repr() 311 312 def __str__(self): 313 return self._repr() 314 315 def cancel(self): 316 """See grpc.Future.cancel.""" 317 return False 318 319 def cancelled(self): 320 """See grpc.Future.cancelled.""" 321 return False 322 323 def running(self): 324 """See grpc.Future.running.""" 325 return False 326 327 def done(self): 328 """See grpc.Future.done.""" 329 return True 330 331 def result(self, timeout=None): # pylint: disable=unused-argument 332 """See grpc.Future.result.""" 333 raise self 334 335 def exception(self, timeout=None): # pylint: disable=unused-argument 336 """See grpc.Future.exception.""" 337 return self 338 339 def traceback(self, timeout=None): # pylint: disable=unused-argument 340 """See grpc.Future.traceback.""" 341 try: 342 raise self 343 except grpc.RpcError: 344 return sys.exc_info()[2] 345 346 def add_done_callback(self, fn, timeout=None): # pylint: disable=unused-argument 347 """See grpc.Future.add_done_callback.""" 348 fn(self) 349 350 351class _Rendezvous(grpc.RpcError, grpc.RpcContext): 352 """An RPC iterator. 353 354 Attributes: 355 _state: An instance of _RPCState. 356 _call: An instance of SegregatedCall or IntegratedCall. 357 In either case, the _call object is expected to have operate, cancel, 358 and next_event methods. 359 _response_deserializer: A callable taking bytes and return a Python 360 object. 361 _deadline: A float representing the deadline of the RPC in seconds. Or 362 possibly None, to represent an RPC with no deadline at all. 363 """ 364 365 def __init__(self, state, call, response_deserializer, deadline): 366 super(_Rendezvous, self).__init__() 367 self._state = state 368 self._call = call 369 self._response_deserializer = response_deserializer 370 self._deadline = deadline 371 372 def is_active(self): 373 """See grpc.RpcContext.is_active""" 374 with self._state.condition: 375 return self._state.code is None 376 377 def time_remaining(self): 378 """See grpc.RpcContext.time_remaining""" 379 with self._state.condition: 380 if self._deadline is None: 381 return None 382 else: 383 return max(self._deadline - time.time(), 0) 384 385 def cancel(self): 386 """See grpc.RpcContext.cancel""" 387 with self._state.condition: 388 if self._state.code is None: 389 code = grpc.StatusCode.CANCELLED 390 details = 'Locally cancelled by application!' 391 self._call.cancel( 392 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details) 393 self._state.cancelled = True 394 _abort(self._state, code, details) 395 self._state.condition.notify_all() 396 return True 397 else: 398 return False 399 400 def add_callback(self, callback): 401 """See grpc.RpcContext.add_callback""" 402 with self._state.condition: 403 if self._state.callbacks is None: 404 return False 405 else: 406 self._state.callbacks.append(callback) 407 return True 408 409 def __iter__(self): 410 return self 411 412 def next(self): 413 return self._next() 414 415 def __next__(self): 416 return self._next() 417 418 def _next(self): 419 raise NotImplementedError() 420 421 def debug_error_string(self): 422 raise NotImplementedError() 423 424 def _repr(self): 425 return _rpc_state_string(self.__class__.__name__, self._state) 426 427 def __repr__(self): 428 return self._repr() 429 430 def __str__(self): 431 return self._repr() 432 433 def __del__(self): 434 with self._state.condition: 435 if self._state.code is None: 436 self._state.code = grpc.StatusCode.CANCELLED 437 self._state.details = 'Cancelled upon garbage collection!' 438 self._state.cancelled = True 439 self._call.cancel( 440 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code], 441 self._state.details) 442 self._state.condition.notify_all() 443 444 445class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: disable=too-many-ancestors 446 """An RPC iterator operating entirely on a single thread. 447 448 The __next__ method of _SingleThreadedRendezvous does not depend on the 449 existence of any other thread, including the "channel spin thread". 450 However, this means that its interface is entirely synchronous. So this 451 class cannot completely fulfill the grpc.Future interface. The result, 452 exception, and traceback methods will never block and will instead raise 453 an exception if calling the method would result in blocking. 454 455 This means that these methods are safe to call from add_done_callback 456 handlers. 457 """ 458 459 def _is_complete(self): 460 return self._state.code is not None 461 462 def cancelled(self): 463 with self._state.condition: 464 return self._state.cancelled 465 466 def running(self): 467 with self._state.condition: 468 return self._state.code is None 469 470 def done(self): 471 with self._state.condition: 472 return self._state.code is not None 473 474 def result(self, timeout=None): 475 """Returns the result of the computation or raises its exception. 476 477 This method will never block. Instead, it will raise an exception 478 if calling this method would otherwise result in blocking. 479 480 Since this method will never block, any `timeout` argument passed will 481 be ignored. 482 """ 483 del timeout 484 with self._state.condition: 485 if not self._is_complete(): 486 raise grpc.experimental.UsageError( 487 "_SingleThreadedRendezvous only supports result() when the RPC is complete." 488 ) 489 if self._state.code is grpc.StatusCode.OK: 490 return self._state.response 491 elif self._state.cancelled: 492 raise grpc.FutureCancelledError() 493 else: 494 raise self 495 496 def exception(self, timeout=None): 497 """Return the exception raised by the computation. 498 499 This method will never block. Instead, it will raise an exception 500 if calling this method would otherwise result in blocking. 501 502 Since this method will never block, any `timeout` argument passed will 503 be ignored. 504 """ 505 del timeout 506 with self._state.condition: 507 if not self._is_complete(): 508 raise grpc.experimental.UsageError( 509 "_SingleThreadedRendezvous only supports exception() when the RPC is complete." 510 ) 511 if self._state.code is grpc.StatusCode.OK: 512 return None 513 elif self._state.cancelled: 514 raise grpc.FutureCancelledError() 515 else: 516 return self 517 518 def traceback(self, timeout=None): 519 """Access the traceback of the exception raised by the computation. 520 521 This method will never block. Instead, it will raise an exception 522 if calling this method would otherwise result in blocking. 523 524 Since this method will never block, any `timeout` argument passed will 525 be ignored. 526 """ 527 del timeout 528 with self._state.condition: 529 if not self._is_complete(): 530 raise grpc.experimental.UsageError( 531 "_SingleThreadedRendezvous only supports traceback() when the RPC is complete." 532 ) 533 if self._state.code is grpc.StatusCode.OK: 534 return None 535 elif self._state.cancelled: 536 raise grpc.FutureCancelledError() 537 else: 538 try: 539 raise self 540 except grpc.RpcError: 541 return sys.exc_info()[2] 542 543 def add_done_callback(self, fn): 544 with self._state.condition: 545 if self._state.code is None: 546 self._state.callbacks.append(functools.partial(fn, self)) 547 return 548 549 fn(self) 550 551 def initial_metadata(self): 552 """See grpc.Call.initial_metadata""" 553 with self._state.condition: 554 # NOTE(gnossen): Based on our initial call batch, we are guaranteed 555 # to receive initial metadata before any messages. 556 while self._state.initial_metadata is None: 557 self._consume_next_event() 558 return self._state.initial_metadata 559 560 def trailing_metadata(self): 561 """See grpc.Call.trailing_metadata""" 562 with self._state.condition: 563 if self._state.trailing_metadata is None: 564 raise grpc.experimental.UsageError( 565 "Cannot get trailing metadata until RPC is completed.") 566 return self._state.trailing_metadata 567 568 def code(self): 569 """See grpc.Call.code""" 570 with self._state.condition: 571 if self._state.code is None: 572 raise grpc.experimental.UsageError( 573 "Cannot get code until RPC is completed.") 574 return self._state.code 575 576 def details(self): 577 """See grpc.Call.details""" 578 with self._state.condition: 579 if self._state.details is None: 580 raise grpc.experimental.UsageError( 581 "Cannot get details until RPC is completed.") 582 return _common.decode(self._state.details) 583 584 def _consume_next_event(self): 585 event = self._call.next_event() 586 with self._state.condition: 587 callbacks = _handle_event(event, self._state, 588 self._response_deserializer) 589 for callback in callbacks: 590 # NOTE(gnossen): We intentionally allow exceptions to bubble up 591 # to the user when running on a single thread. 592 callback() 593 return event 594 595 def _next_response(self): 596 while True: 597 self._consume_next_event() 598 with self._state.condition: 599 if self._state.response is not None: 600 response = self._state.response 601 self._state.response = None 602 return response 603 elif cygrpc.OperationType.receive_message not in self._state.due: 604 if self._state.code is grpc.StatusCode.OK: 605 raise StopIteration() 606 elif self._state.code is not None: 607 raise self 608 609 def _next(self): 610 with self._state.condition: 611 if self._state.code is None: 612 operating = self._call.operate( 613 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None) 614 if operating: 615 self._state.due.add(cygrpc.OperationType.receive_message) 616 elif self._state.code is grpc.StatusCode.OK: 617 raise StopIteration() 618 else: 619 raise self 620 return self._next_response() 621 622 def debug_error_string(self): 623 with self._state.condition: 624 if self._state.debug_error_string is None: 625 raise grpc.experimental.UsageError( 626 "Cannot get debug error string until RPC is completed.") 627 return _common.decode(self._state.debug_error_string) 628 629 630class _MultiThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: disable=too-many-ancestors 631 """An RPC iterator that depends on a channel spin thread. 632 633 This iterator relies upon a per-channel thread running in the background, 634 dequeueing events from the completion queue, and notifying threads waiting 635 on the threading.Condition object in the _RPCState object. 636 637 This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface 638 and to mediate a bidirection streaming RPC. 639 """ 640 641 def initial_metadata(self): 642 """See grpc.Call.initial_metadata""" 643 with self._state.condition: 644 645 def _done(): 646 return self._state.initial_metadata is not None 647 648 _common.wait(self._state.condition.wait, _done) 649 return self._state.initial_metadata 650 651 def trailing_metadata(self): 652 """See grpc.Call.trailing_metadata""" 653 with self._state.condition: 654 655 def _done(): 656 return self._state.trailing_metadata is not None 657 658 _common.wait(self._state.condition.wait, _done) 659 return self._state.trailing_metadata 660 661 def code(self): 662 """See grpc.Call.code""" 663 with self._state.condition: 664 665 def _done(): 666 return self._state.code is not None 667 668 _common.wait(self._state.condition.wait, _done) 669 return self._state.code 670 671 def details(self): 672 """See grpc.Call.details""" 673 with self._state.condition: 674 675 def _done(): 676 return self._state.details is not None 677 678 _common.wait(self._state.condition.wait, _done) 679 return _common.decode(self._state.details) 680 681 def debug_error_string(self): 682 with self._state.condition: 683 684 def _done(): 685 return self._state.debug_error_string is not None 686 687 _common.wait(self._state.condition.wait, _done) 688 return _common.decode(self._state.debug_error_string) 689 690 def cancelled(self): 691 with self._state.condition: 692 return self._state.cancelled 693 694 def running(self): 695 with self._state.condition: 696 return self._state.code is None 697 698 def done(self): 699 with self._state.condition: 700 return self._state.code is not None 701 702 def _is_complete(self): 703 return self._state.code is not None 704 705 def result(self, timeout=None): 706 """Returns the result of the computation or raises its exception. 707 708 See grpc.Future.result for the full API contract. 709 """ 710 with self._state.condition: 711 timed_out = _common.wait(self._state.condition.wait, 712 self._is_complete, 713 timeout=timeout) 714 if timed_out: 715 raise grpc.FutureTimeoutError() 716 else: 717 if self._state.code is grpc.StatusCode.OK: 718 return self._state.response 719 elif self._state.cancelled: 720 raise grpc.FutureCancelledError() 721 else: 722 raise self 723 724 def exception(self, timeout=None): 725 """Return the exception raised by the computation. 726 727 See grpc.Future.exception for the full API contract. 728 """ 729 with self._state.condition: 730 timed_out = _common.wait(self._state.condition.wait, 731 self._is_complete, 732 timeout=timeout) 733 if timed_out: 734 raise grpc.FutureTimeoutError() 735 else: 736 if self._state.code is grpc.StatusCode.OK: 737 return None 738 elif self._state.cancelled: 739 raise grpc.FutureCancelledError() 740 else: 741 return self 742 743 def traceback(self, timeout=None): 744 """Access the traceback of the exception raised by the computation. 745 746 See grpc.future.traceback for the full API contract. 747 """ 748 with self._state.condition: 749 timed_out = _common.wait(self._state.condition.wait, 750 self._is_complete, 751 timeout=timeout) 752 if timed_out: 753 raise grpc.FutureTimeoutError() 754 else: 755 if self._state.code is grpc.StatusCode.OK: 756 return None 757 elif self._state.cancelled: 758 raise grpc.FutureCancelledError() 759 else: 760 try: 761 raise self 762 except grpc.RpcError: 763 return sys.exc_info()[2] 764 765 def add_done_callback(self, fn): 766 with self._state.condition: 767 if self._state.code is None: 768 self._state.callbacks.append(functools.partial(fn, self)) 769 return 770 771 fn(self) 772 773 def _next(self): 774 with self._state.condition: 775 if self._state.code is None: 776 event_handler = _event_handler(self._state, 777 self._response_deserializer) 778 operating = self._call.operate( 779 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), 780 event_handler) 781 if operating: 782 self._state.due.add(cygrpc.OperationType.receive_message) 783 elif self._state.code is grpc.StatusCode.OK: 784 raise StopIteration() 785 else: 786 raise self 787 788 def _response_ready(): 789 return (self._state.response is not None or 790 (cygrpc.OperationType.receive_message 791 not in self._state.due and 792 self._state.code is not None)) 793 794 _common.wait(self._state.condition.wait, _response_ready) 795 if self._state.response is not None: 796 response = self._state.response 797 self._state.response = None 798 return response 799 elif cygrpc.OperationType.receive_message not in self._state.due: 800 if self._state.code is grpc.StatusCode.OK: 801 raise StopIteration() 802 elif self._state.code is not None: 803 raise self 804 805 806def _start_unary_request(request, timeout, request_serializer): 807 deadline = _deadline(timeout) 808 serialized_request = _common.serialize(request, request_serializer) 809 if serialized_request is None: 810 state = _RPCState((), (), (), grpc.StatusCode.INTERNAL, 811 'Exception serializing request!') 812 error = _InactiveRpcError(state) 813 return deadline, None, error 814 else: 815 return deadline, serialized_request, None 816 817 818def _end_unary_response_blocking(state, call, with_call, deadline): 819 if state.code is grpc.StatusCode.OK: 820 if with_call: 821 rendezvous = _MultiThreadedRendezvous(state, call, None, deadline) 822 return state.response, rendezvous 823 else: 824 return state.response 825 else: 826 raise _InactiveRpcError(state) 827 828 829def _stream_unary_invocation_operationses(metadata, initial_metadata_flags): 830 return ( 831 ( 832 cygrpc.SendInitialMetadataOperation(metadata, 833 initial_metadata_flags), 834 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), 835 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 836 ), 837 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 838 ) 839 840 841def _stream_unary_invocation_operationses_and_tags(metadata, 842 initial_metadata_flags): 843 return tuple(( 844 operations, 845 None, 846 ) for operations in _stream_unary_invocation_operationses( 847 metadata, initial_metadata_flags)) 848 849 850def _determine_deadline(user_deadline): 851 parent_deadline = cygrpc.get_deadline_from_context() 852 if parent_deadline is None and user_deadline is None: 853 return None 854 elif parent_deadline is not None and user_deadline is None: 855 return parent_deadline 856 elif user_deadline is not None and parent_deadline is None: 857 return user_deadline 858 else: 859 return min(parent_deadline, user_deadline) 860 861 862class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): 863 864 # pylint: disable=too-many-arguments 865 def __init__(self, channel, managed_call, method, request_serializer, 866 response_deserializer): 867 self._channel = channel 868 self._managed_call = managed_call 869 self._method = method 870 self._request_serializer = request_serializer 871 self._response_deserializer = response_deserializer 872 self._context = cygrpc.build_census_context() 873 874 def _prepare(self, request, timeout, metadata, wait_for_ready, compression): 875 deadline, serialized_request, rendezvous = _start_unary_request( 876 request, timeout, self._request_serializer) 877 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 878 wait_for_ready) 879 augmented_metadata = _compression.augment_metadata( 880 metadata, compression) 881 if serialized_request is None: 882 return None, None, None, rendezvous 883 else: 884 state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) 885 operations = ( 886 cygrpc.SendInitialMetadataOperation(augmented_metadata, 887 initial_metadata_flags), 888 cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), 889 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 890 cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), 891 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), 892 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 893 ) 894 return state, operations, deadline, None 895 896 def _blocking(self, request, timeout, metadata, credentials, wait_for_ready, 897 compression): 898 state, operations, deadline, rendezvous = self._prepare( 899 request, timeout, metadata, wait_for_ready, compression) 900 if state is None: 901 raise rendezvous # pylint: disable-msg=raising-bad-type 902 else: 903 call = self._channel.segregated_call( 904 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 905 self._method, None, _determine_deadline(deadline), metadata, 906 None if credentials is None else credentials._credentials, (( 907 operations, 908 None, 909 ),), self._context) 910 event = call.next_event() 911 _handle_event(event, state, self._response_deserializer) 912 return state, call 913 914 def __call__(self, 915 request, 916 timeout=None, 917 metadata=None, 918 credentials=None, 919 wait_for_ready=None, 920 compression=None): 921 state, call, = self._blocking(request, timeout, metadata, credentials, 922 wait_for_ready, compression) 923 return _end_unary_response_blocking(state, call, False, None) 924 925 def with_call(self, 926 request, 927 timeout=None, 928 metadata=None, 929 credentials=None, 930 wait_for_ready=None, 931 compression=None): 932 state, call, = self._blocking(request, timeout, metadata, credentials, 933 wait_for_ready, compression) 934 return _end_unary_response_blocking(state, call, True, None) 935 936 def future(self, 937 request, 938 timeout=None, 939 metadata=None, 940 credentials=None, 941 wait_for_ready=None, 942 compression=None): 943 state, operations, deadline, rendezvous = self._prepare( 944 request, timeout, metadata, wait_for_ready, compression) 945 if state is None: 946 raise rendezvous # pylint: disable-msg=raising-bad-type 947 else: 948 event_handler = _event_handler(state, self._response_deserializer) 949 call = self._managed_call( 950 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 951 self._method, None, deadline, metadata, 952 None if credentials is None else credentials._credentials, 953 (operations,), event_handler, self._context) 954 return _MultiThreadedRendezvous(state, call, 955 self._response_deserializer, 956 deadline) 957 958 959class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): 960 961 # pylint: disable=too-many-arguments 962 def __init__(self, channel, method, request_serializer, 963 response_deserializer): 964 self._channel = channel 965 self._method = method 966 self._request_serializer = request_serializer 967 self._response_deserializer = response_deserializer 968 self._context = cygrpc.build_census_context() 969 970 def __call__( # pylint: disable=too-many-locals 971 self, 972 request, 973 timeout=None, 974 metadata=None, 975 credentials=None, 976 wait_for_ready=None, 977 compression=None): 978 deadline = _deadline(timeout) 979 serialized_request = _common.serialize(request, 980 self._request_serializer) 981 if serialized_request is None: 982 state = _RPCState((), (), (), grpc.StatusCode.INTERNAL, 983 'Exception serializing request!') 984 raise _InactiveRpcError(state) 985 986 state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) 987 call_credentials = None if credentials is None else credentials._credentials 988 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 989 wait_for_ready) 990 augmented_metadata = _compression.augment_metadata( 991 metadata, compression) 992 operations = ( 993 (cygrpc.SendInitialMetadataOperation(augmented_metadata, 994 initial_metadata_flags), 995 cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), 996 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS)), 997 (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),), 998 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 999 ) 1000 operations_and_tags = tuple((ops, None) for ops in operations) 1001 call = self._channel.segregated_call( 1002 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, 1003 None, _determine_deadline(deadline), metadata, call_credentials, 1004 operations_and_tags, self._context) 1005 return _SingleThreadedRendezvous(state, call, 1006 self._response_deserializer, deadline) 1007 1008 1009class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): 1010 1011 # pylint: disable=too-many-arguments 1012 def __init__(self, channel, managed_call, method, request_serializer, 1013 response_deserializer): 1014 self._channel = channel 1015 self._managed_call = managed_call 1016 self._method = method 1017 self._request_serializer = request_serializer 1018 self._response_deserializer = response_deserializer 1019 self._context = cygrpc.build_census_context() 1020 1021 def __call__( # pylint: disable=too-many-locals 1022 self, 1023 request, 1024 timeout=None, 1025 metadata=None, 1026 credentials=None, 1027 wait_for_ready=None, 1028 compression=None): 1029 deadline, serialized_request, rendezvous = _start_unary_request( 1030 request, timeout, self._request_serializer) 1031 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 1032 wait_for_ready) 1033 if serialized_request is None: 1034 raise rendezvous # pylint: disable-msg=raising-bad-type 1035 else: 1036 augmented_metadata = _compression.augment_metadata( 1037 metadata, compression) 1038 state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) 1039 operationses = ( 1040 ( 1041 cygrpc.SendInitialMetadataOperation(augmented_metadata, 1042 initial_metadata_flags), 1043 cygrpc.SendMessageOperation(serialized_request, 1044 _EMPTY_FLAGS), 1045 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 1046 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 1047 ), 1048 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 1049 ) 1050 call = self._managed_call( 1051 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 1052 self._method, None, _determine_deadline(deadline), metadata, 1053 None if credentials is None else credentials._credentials, 1054 operationses, _event_handler(state, 1055 self._response_deserializer), 1056 self._context) 1057 return _MultiThreadedRendezvous(state, call, 1058 self._response_deserializer, 1059 deadline) 1060 1061 1062class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): 1063 1064 # pylint: disable=too-many-arguments 1065 def __init__(self, channel, managed_call, method, request_serializer, 1066 response_deserializer): 1067 self._channel = channel 1068 self._managed_call = managed_call 1069 self._method = method 1070 self._request_serializer = request_serializer 1071 self._response_deserializer = response_deserializer 1072 self._context = cygrpc.build_census_context() 1073 1074 def _blocking(self, request_iterator, timeout, metadata, credentials, 1075 wait_for_ready, compression): 1076 deadline = _deadline(timeout) 1077 state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) 1078 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 1079 wait_for_ready) 1080 augmented_metadata = _compression.augment_metadata( 1081 metadata, compression) 1082 call = self._channel.segregated_call( 1083 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, 1084 None, _determine_deadline(deadline), augmented_metadata, 1085 None if credentials is None else credentials._credentials, 1086 _stream_unary_invocation_operationses_and_tags( 1087 augmented_metadata, initial_metadata_flags), self._context) 1088 _consume_request_iterator(request_iterator, state, call, 1089 self._request_serializer, None) 1090 while True: 1091 event = call.next_event() 1092 with state.condition: 1093 _handle_event(event, state, self._response_deserializer) 1094 state.condition.notify_all() 1095 if not state.due: 1096 break 1097 return state, call 1098 1099 def __call__(self, 1100 request_iterator, 1101 timeout=None, 1102 metadata=None, 1103 credentials=None, 1104 wait_for_ready=None, 1105 compression=None): 1106 state, call, = self._blocking(request_iterator, timeout, metadata, 1107 credentials, wait_for_ready, compression) 1108 return _end_unary_response_blocking(state, call, False, None) 1109 1110 def with_call(self, 1111 request_iterator, 1112 timeout=None, 1113 metadata=None, 1114 credentials=None, 1115 wait_for_ready=None, 1116 compression=None): 1117 state, call, = self._blocking(request_iterator, timeout, metadata, 1118 credentials, wait_for_ready, compression) 1119 return _end_unary_response_blocking(state, call, True, None) 1120 1121 def future(self, 1122 request_iterator, 1123 timeout=None, 1124 metadata=None, 1125 credentials=None, 1126 wait_for_ready=None, 1127 compression=None): 1128 deadline = _deadline(timeout) 1129 state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) 1130 event_handler = _event_handler(state, self._response_deserializer) 1131 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 1132 wait_for_ready) 1133 augmented_metadata = _compression.augment_metadata( 1134 metadata, compression) 1135 call = self._managed_call( 1136 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, 1137 None, deadline, augmented_metadata, 1138 None if credentials is None else credentials._credentials, 1139 _stream_unary_invocation_operationses(metadata, 1140 initial_metadata_flags), 1141 event_handler, self._context) 1142 _consume_request_iterator(request_iterator, state, call, 1143 self._request_serializer, event_handler) 1144 return _MultiThreadedRendezvous(state, call, 1145 self._response_deserializer, deadline) 1146 1147 1148class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): 1149 1150 # pylint: disable=too-many-arguments 1151 def __init__(self, channel, managed_call, method, request_serializer, 1152 response_deserializer): 1153 self._channel = channel 1154 self._managed_call = managed_call 1155 self._method = method 1156 self._request_serializer = request_serializer 1157 self._response_deserializer = response_deserializer 1158 self._context = cygrpc.build_census_context() 1159 1160 def __call__(self, 1161 request_iterator, 1162 timeout=None, 1163 metadata=None, 1164 credentials=None, 1165 wait_for_ready=None, 1166 compression=None): 1167 deadline = _deadline(timeout) 1168 state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) 1169 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 1170 wait_for_ready) 1171 augmented_metadata = _compression.augment_metadata( 1172 metadata, compression) 1173 operationses = ( 1174 ( 1175 cygrpc.SendInitialMetadataOperation(augmented_metadata, 1176 initial_metadata_flags), 1177 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 1178 ), 1179 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 1180 ) 1181 event_handler = _event_handler(state, self._response_deserializer) 1182 call = self._managed_call( 1183 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, 1184 None, _determine_deadline(deadline), augmented_metadata, 1185 None if credentials is None else credentials._credentials, 1186 operationses, event_handler, self._context) 1187 _consume_request_iterator(request_iterator, state, call, 1188 self._request_serializer, event_handler) 1189 return _MultiThreadedRendezvous(state, call, 1190 self._response_deserializer, deadline) 1191 1192 1193class _InitialMetadataFlags(int): 1194 """Stores immutable initial metadata flags""" 1195 1196 def __new__(cls, value=_EMPTY_FLAGS): 1197 value &= cygrpc.InitialMetadataFlags.used_mask 1198 return super(_InitialMetadataFlags, cls).__new__(cls, value) 1199 1200 def with_wait_for_ready(self, wait_for_ready): 1201 if wait_for_ready is not None: 1202 if wait_for_ready: 1203 return self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \ 1204 cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set) 1205 elif not wait_for_ready: 1206 return self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \ 1207 cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set) 1208 return self 1209 1210 1211class _ChannelCallState(object): 1212 1213 def __init__(self, channel): 1214 self.lock = threading.Lock() 1215 self.channel = channel 1216 self.managed_calls = 0 1217 self.threading = False 1218 1219 def reset_postfork_child(self): 1220 self.managed_calls = 0 1221 1222 def __del__(self): 1223 try: 1224 self.channel.close(cygrpc.StatusCode.cancelled, 1225 'Channel deallocated!') 1226 except (TypeError, AttributeError): 1227 pass 1228 1229 1230def _run_channel_spin_thread(state): 1231 1232 def channel_spin(): 1233 while True: 1234 cygrpc.block_if_fork_in_progress(state) 1235 event = state.channel.next_call_event() 1236 if event.completion_type == cygrpc.CompletionType.queue_timeout: 1237 continue 1238 call_completed = event.tag(event) 1239 if call_completed: 1240 with state.lock: 1241 state.managed_calls -= 1 1242 if state.managed_calls == 0: 1243 return 1244 1245 channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin) 1246 channel_spin_thread.setDaemon(True) 1247 channel_spin_thread.start() 1248 1249 1250def _channel_managed_call_management(state): 1251 1252 # pylint: disable=too-many-arguments 1253 def create(flags, method, host, deadline, metadata, credentials, 1254 operationses, event_handler, context): 1255 """Creates a cygrpc.IntegratedCall. 1256 1257 Args: 1258 flags: An integer bitfield of call flags. 1259 method: The RPC method. 1260 host: A host string for the created call. 1261 deadline: A float to be the deadline of the created call or None if 1262 the call is to have an infinite deadline. 1263 metadata: The metadata for the call or None. 1264 credentials: A cygrpc.CallCredentials or None. 1265 operationses: An iterable of iterables of cygrpc.Operations to be 1266 started on the call. 1267 event_handler: A behavior to call to handle the events resultant from 1268 the operations on the call. 1269 context: Context object for distributed tracing. 1270 Returns: 1271 A cygrpc.IntegratedCall with which to conduct an RPC. 1272 """ 1273 operationses_and_tags = tuple(( 1274 operations, 1275 event_handler, 1276 ) for operations in operationses) 1277 with state.lock: 1278 call = state.channel.integrated_call(flags, method, host, deadline, 1279 metadata, credentials, 1280 operationses_and_tags, context) 1281 if state.managed_calls == 0: 1282 state.managed_calls = 1 1283 _run_channel_spin_thread(state) 1284 else: 1285 state.managed_calls += 1 1286 return call 1287 1288 return create 1289 1290 1291class _ChannelConnectivityState(object): 1292 1293 def __init__(self, channel): 1294 self.lock = threading.RLock() 1295 self.channel = channel 1296 self.polling = False 1297 self.connectivity = None 1298 self.try_to_connect = False 1299 self.callbacks_and_connectivities = [] 1300 self.delivering = False 1301 1302 def reset_postfork_child(self): 1303 self.polling = False 1304 self.connectivity = None 1305 self.try_to_connect = False 1306 self.callbacks_and_connectivities = [] 1307 self.delivering = False 1308 1309 1310def _deliveries(state): 1311 callbacks_needing_update = [] 1312 for callback_and_connectivity in state.callbacks_and_connectivities: 1313 callback, callback_connectivity, = callback_and_connectivity 1314 if callback_connectivity is not state.connectivity: 1315 callbacks_needing_update.append(callback) 1316 callback_and_connectivity[1] = state.connectivity 1317 return callbacks_needing_update 1318 1319 1320def _deliver(state, initial_connectivity, initial_callbacks): 1321 connectivity = initial_connectivity 1322 callbacks = initial_callbacks 1323 while True: 1324 for callback in callbacks: 1325 cygrpc.block_if_fork_in_progress(state) 1326 try: 1327 callback(connectivity) 1328 except Exception: # pylint: disable=broad-except 1329 _LOGGER.exception( 1330 _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE) 1331 with state.lock: 1332 callbacks = _deliveries(state) 1333 if callbacks: 1334 connectivity = state.connectivity 1335 else: 1336 state.delivering = False 1337 return 1338 1339 1340def _spawn_delivery(state, callbacks): 1341 delivering_thread = cygrpc.ForkManagedThread(target=_deliver, 1342 args=( 1343 state, 1344 state.connectivity, 1345 callbacks, 1346 )) 1347 delivering_thread.setDaemon(True) 1348 delivering_thread.start() 1349 state.delivering = True 1350 1351 1352# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll. 1353def _poll_connectivity(state, channel, initial_try_to_connect): 1354 try_to_connect = initial_try_to_connect 1355 connectivity = channel.check_connectivity_state(try_to_connect) 1356 with state.lock: 1357 state.connectivity = ( 1358 _common. 1359 CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[connectivity]) 1360 callbacks = tuple( 1361 callback for callback, unused_but_known_to_be_none_connectivity in 1362 state.callbacks_and_connectivities) 1363 for callback_and_connectivity in state.callbacks_and_connectivities: 1364 callback_and_connectivity[1] = state.connectivity 1365 if callbacks: 1366 _spawn_delivery(state, callbacks) 1367 while True: 1368 event = channel.watch_connectivity_state(connectivity, 1369 time.time() + 0.2) 1370 cygrpc.block_if_fork_in_progress(state) 1371 with state.lock: 1372 if not state.callbacks_and_connectivities and not state.try_to_connect: 1373 state.polling = False 1374 state.connectivity = None 1375 break 1376 try_to_connect = state.try_to_connect 1377 state.try_to_connect = False 1378 if event.success or try_to_connect: 1379 connectivity = channel.check_connectivity_state(try_to_connect) 1380 with state.lock: 1381 state.connectivity = ( 1382 _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ 1383 connectivity]) 1384 if not state.delivering: 1385 callbacks = _deliveries(state) 1386 if callbacks: 1387 _spawn_delivery(state, callbacks) 1388 1389 1390def _subscribe(state, callback, try_to_connect): 1391 with state.lock: 1392 if not state.callbacks_and_connectivities and not state.polling: 1393 polling_thread = cygrpc.ForkManagedThread( 1394 target=_poll_connectivity, 1395 args=(state, state.channel, bool(try_to_connect))) 1396 polling_thread.setDaemon(True) 1397 polling_thread.start() 1398 state.polling = True 1399 state.callbacks_and_connectivities.append([callback, None]) 1400 elif not state.delivering and state.connectivity is not None: 1401 _spawn_delivery(state, (callback,)) 1402 state.try_to_connect |= bool(try_to_connect) 1403 state.callbacks_and_connectivities.append( 1404 [callback, state.connectivity]) 1405 else: 1406 state.try_to_connect |= bool(try_to_connect) 1407 state.callbacks_and_connectivities.append([callback, None]) 1408 1409 1410def _unsubscribe(state, callback): 1411 with state.lock: 1412 for index, (subscribed_callback, unused_connectivity) in enumerate( 1413 state.callbacks_and_connectivities): 1414 if callback == subscribed_callback: 1415 state.callbacks_and_connectivities.pop(index) 1416 break 1417 1418 1419def _augment_options(base_options, compression): 1420 compression_option = _compression.create_channel_option(compression) 1421 return tuple(base_options) + compression_option + (( 1422 cygrpc.ChannelArgKey.primary_user_agent_string, 1423 _USER_AGENT, 1424 ),) 1425 1426 1427def _separate_channel_options(options): 1428 """Separates core channel options from Python channel options.""" 1429 core_options = [] 1430 python_options = [] 1431 for pair in options: 1432 if pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream: 1433 python_options.append(pair) 1434 else: 1435 core_options.append(pair) 1436 return python_options, core_options 1437 1438 1439class Channel(grpc.Channel): 1440 """A cygrpc.Channel-backed implementation of grpc.Channel.""" 1441 1442 def __init__(self, target, options, credentials, compression): 1443 """Constructor. 1444 1445 Args: 1446 target: The target to which to connect. 1447 options: Configuration options for the channel. 1448 credentials: A cygrpc.ChannelCredentials or None. 1449 compression: An optional value indicating the compression method to be 1450 used over the lifetime of the channel. 1451 """ 1452 python_options, core_options = _separate_channel_options(options) 1453 self._single_threaded_unary_stream = _DEFAULT_SINGLE_THREADED_UNARY_STREAM 1454 self._process_python_options(python_options) 1455 self._channel = cygrpc.Channel( 1456 _common.encode(target), _augment_options(core_options, compression), 1457 credentials) 1458 self._call_state = _ChannelCallState(self._channel) 1459 self._connectivity_state = _ChannelConnectivityState(self._channel) 1460 cygrpc.fork_register_channel(self) 1461 1462 def _process_python_options(self, python_options): 1463 """Sets channel attributes according to python-only channel options.""" 1464 for pair in python_options: 1465 if pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream: 1466 self._single_threaded_unary_stream = True 1467 1468 def subscribe(self, callback, try_to_connect=None): 1469 _subscribe(self._connectivity_state, callback, try_to_connect) 1470 1471 def unsubscribe(self, callback): 1472 _unsubscribe(self._connectivity_state, callback) 1473 1474 def unary_unary(self, 1475 method, 1476 request_serializer=None, 1477 response_deserializer=None): 1478 return _UnaryUnaryMultiCallable( 1479 self._channel, _channel_managed_call_management(self._call_state), 1480 _common.encode(method), request_serializer, response_deserializer) 1481 1482 def unary_stream(self, 1483 method, 1484 request_serializer=None, 1485 response_deserializer=None): 1486 # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC 1487 # on a single Python thread results in an appreciable speed-up. However, 1488 # due to slight differences in capability, the multi-threaded variant 1489 # remains the default. 1490 if self._single_threaded_unary_stream: 1491 return _SingleThreadedUnaryStreamMultiCallable( 1492 self._channel, _common.encode(method), request_serializer, 1493 response_deserializer) 1494 else: 1495 return _UnaryStreamMultiCallable( 1496 self._channel, 1497 _channel_managed_call_management(self._call_state), 1498 _common.encode(method), request_serializer, 1499 response_deserializer) 1500 1501 def stream_unary(self, 1502 method, 1503 request_serializer=None, 1504 response_deserializer=None): 1505 return _StreamUnaryMultiCallable( 1506 self._channel, _channel_managed_call_management(self._call_state), 1507 _common.encode(method), request_serializer, response_deserializer) 1508 1509 def stream_stream(self, 1510 method, 1511 request_serializer=None, 1512 response_deserializer=None): 1513 return _StreamStreamMultiCallable( 1514 self._channel, _channel_managed_call_management(self._call_state), 1515 _common.encode(method), request_serializer, response_deserializer) 1516 1517 def _unsubscribe_all(self): 1518 state = self._connectivity_state 1519 if state: 1520 with state.lock: 1521 del state.callbacks_and_connectivities[:] 1522 1523 def _close(self): 1524 self._unsubscribe_all() 1525 self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!') 1526 cygrpc.fork_unregister_channel(self) 1527 1528 def _close_on_fork(self): 1529 self._unsubscribe_all() 1530 self._channel.close_on_fork(cygrpc.StatusCode.cancelled, 1531 'Channel closed due to fork') 1532 1533 def __enter__(self): 1534 return self 1535 1536 def __exit__(self, exc_type, exc_val, exc_tb): 1537 self._close() 1538 return False 1539 1540 def close(self): 1541 self._close() 1542 1543 def __del__(self): 1544 # TODO(https://github.com/grpc/grpc/issues/12531): Several releases 1545 # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call 1546 # here (or more likely, call self._close() here). We don't do this today 1547 # because many valid use cases today allow the channel to be deleted 1548 # immediately after stubs are created. After a sufficient period of time 1549 # has passed for all users to be trusted to hang out to their channels 1550 # for as long as they are in use and to close them after using them, 1551 # then deletion of this grpc._channel.Channel instance can be made to 1552 # effect closure of the underlying cygrpc.Channel instance. 1553 try: 1554 self._unsubscribe_all() 1555 except: # pylint: disable=bare-except 1556 # Exceptions in __del__ are ignored by Python anyway, but they can 1557 # keep spamming logs. Just silence them. 1558 pass 1559