1# Copyright 2016 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Invocation-side implementation of gRPC Python.""" 15 16import logging 17import sys 18import threading 19import time 20 21import grpc 22from grpc import _common 23from grpc import _grpcio_metadata 24from grpc._cython import cygrpc 25from grpc.framework.foundation import callable_util 26 27logging.basicConfig() 28_LOGGER = logging.getLogger(__name__) 29 30_USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__) 31 32_EMPTY_FLAGS = 0 33 34_UNARY_UNARY_INITIAL_DUE = ( 35 cygrpc.OperationType.send_initial_metadata, 36 cygrpc.OperationType.send_message, 37 cygrpc.OperationType.send_close_from_client, 38 cygrpc.OperationType.receive_initial_metadata, 39 cygrpc.OperationType.receive_message, 40 cygrpc.OperationType.receive_status_on_client, 41) 42_UNARY_STREAM_INITIAL_DUE = ( 43 cygrpc.OperationType.send_initial_metadata, 44 cygrpc.OperationType.send_message, 45 cygrpc.OperationType.send_close_from_client, 46 cygrpc.OperationType.receive_initial_metadata, 47 cygrpc.OperationType.receive_status_on_client, 48) 49_STREAM_UNARY_INITIAL_DUE = ( 50 cygrpc.OperationType.send_initial_metadata, 51 cygrpc.OperationType.receive_initial_metadata, 52 cygrpc.OperationType.receive_message, 53 cygrpc.OperationType.receive_status_on_client, 54) 55_STREAM_STREAM_INITIAL_DUE = ( 56 cygrpc.OperationType.send_initial_metadata, 57 cygrpc.OperationType.receive_initial_metadata, 58 cygrpc.OperationType.receive_status_on_client, 59) 60 61_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( 62 'Exception calling channel subscription callback!') 63 64_OK_RENDEZVOUS_REPR_FORMAT = ('<_Rendezvous of RPC that terminated with:\n' 65 '\tstatus = {}\n' 66 '\tdetails = "{}"\n' 67 '>') 68 69_NON_OK_RENDEZVOUS_REPR_FORMAT = ('<_Rendezvous of RPC that terminated with:\n' 70 '\tstatus = {}\n' 71 '\tdetails = "{}"\n' 72 '\tdebug_error_string = "{}"\n' 73 '>') 74 75 76def _deadline(timeout): 77 return None if timeout is None else time.time() + timeout 78 79 80def _unknown_code_details(unknown_cygrpc_code, details): 81 return 'Server sent unknown code {} and details "{}"'.format( 82 unknown_cygrpc_code, details) 83 84 85def _wait_once_until(condition, until): 86 if until is None: 87 condition.wait() 88 else: 89 remaining = until - time.time() 90 if remaining < 0: 91 raise grpc.FutureTimeoutError() 92 else: 93 condition.wait(timeout=remaining) 94 95 96class _RPCState(object): 97 98 def __init__(self, due, initial_metadata, trailing_metadata, code, details): 99 self.condition = threading.Condition() 100 # The cygrpc.OperationType objects representing events due from the RPC's 101 # completion queue. 102 self.due = set(due) 103 self.initial_metadata = initial_metadata 104 self.response = None 105 self.trailing_metadata = trailing_metadata 106 self.code = code 107 self.details = details 108 self.debug_error_string = None 109 # The semantics of grpc.Future.cancel and grpc.Future.cancelled are 110 # slightly wonky, so they have to be tracked separately from the rest of the 111 # result of the RPC. This field tracks whether cancellation was requested 112 # prior to termination of the RPC. 113 self.cancelled = False 114 self.callbacks = [] 115 self.fork_epoch = cygrpc.get_fork_epoch() 116 117 def reset_postfork_child(self): 118 self.condition = threading.Condition() 119 120 121def _abort(state, code, details): 122 if state.code is None: 123 state.code = code 124 state.details = details 125 if state.initial_metadata is None: 126 state.initial_metadata = () 127 state.trailing_metadata = () 128 129 130def _handle_event(event, state, response_deserializer): 131 callbacks = [] 132 for batch_operation in event.batch_operations: 133 operation_type = batch_operation.type() 134 state.due.remove(operation_type) 135 if operation_type == cygrpc.OperationType.receive_initial_metadata: 136 state.initial_metadata = batch_operation.initial_metadata() 137 elif operation_type == cygrpc.OperationType.receive_message: 138 serialized_response = batch_operation.message() 139 if serialized_response is not None: 140 response = _common.deserialize(serialized_response, 141 response_deserializer) 142 if response is None: 143 details = 'Exception deserializing response!' 144 _abort(state, grpc.StatusCode.INTERNAL, details) 145 else: 146 state.response = response 147 elif operation_type == cygrpc.OperationType.receive_status_on_client: 148 state.trailing_metadata = batch_operation.trailing_metadata() 149 if state.code is None: 150 code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get( 151 batch_operation.code()) 152 if code is None: 153 state.code = grpc.StatusCode.UNKNOWN 154 state.details = _unknown_code_details( 155 code, batch_operation.details()) 156 else: 157 state.code = code 158 state.details = batch_operation.details() 159 state.debug_error_string = batch_operation.error_string() 160 callbacks.extend(state.callbacks) 161 state.callbacks = None 162 return callbacks 163 164 165def _event_handler(state, response_deserializer): 166 167 def handle_event(event): 168 with state.condition: 169 callbacks = _handle_event(event, state, response_deserializer) 170 state.condition.notify_all() 171 done = not state.due 172 for callback in callbacks: 173 callback() 174 return done and state.fork_epoch >= cygrpc.get_fork_epoch() 175 176 return handle_event 177 178 179def _consume_request_iterator(request_iterator, state, call, request_serializer, 180 event_handler): 181 if cygrpc.is_fork_support_enabled(): 182 condition_wait_timeout = 1.0 183 else: 184 condition_wait_timeout = None 185 186 def consume_request_iterator(): # pylint: disable=too-many-branches 187 while True: 188 return_from_user_request_generator_invoked = False 189 try: 190 # The thread may die in user-code. Do not block fork for this. 191 cygrpc.enter_user_request_generator() 192 request = next(request_iterator) 193 except StopIteration: 194 break 195 except Exception: # pylint: disable=broad-except 196 cygrpc.return_from_user_request_generator() 197 return_from_user_request_generator_invoked = True 198 code = grpc.StatusCode.UNKNOWN 199 details = 'Exception iterating requests!' 200 _LOGGER.exception(details) 201 call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], 202 details) 203 _abort(state, code, details) 204 return 205 finally: 206 if not return_from_user_request_generator_invoked: 207 cygrpc.return_from_user_request_generator() 208 serialized_request = _common.serialize(request, request_serializer) 209 with state.condition: 210 if state.code is None and not state.cancelled: 211 if serialized_request is None: 212 code = grpc.StatusCode.INTERNAL 213 details = 'Exception serializing request!' 214 call.cancel( 215 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], 216 details) 217 _abort(state, code, details) 218 return 219 else: 220 operations = (cygrpc.SendMessageOperation( 221 serialized_request, _EMPTY_FLAGS),) 222 operating = call.operate(operations, event_handler) 223 if operating: 224 state.due.add(cygrpc.OperationType.send_message) 225 else: 226 return 227 while True: 228 state.condition.wait(condition_wait_timeout) 229 cygrpc.block_if_fork_in_progress(state) 230 if state.code is None: 231 if cygrpc.OperationType.send_message not in state.due: 232 break 233 else: 234 return 235 else: 236 return 237 with state.condition: 238 if state.code is None: 239 operations = ( 240 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),) 241 operating = call.operate(operations, event_handler) 242 if operating: 243 state.due.add(cygrpc.OperationType.send_close_from_client) 244 245 consumption_thread = cygrpc.ForkManagedThread( 246 target=consume_request_iterator) 247 consumption_thread.setDaemon(True) 248 consumption_thread.start() 249 250 251class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): 252 253 def __init__(self, state, call, response_deserializer, deadline): 254 super(_Rendezvous, self).__init__() 255 self._state = state 256 self._call = call 257 self._response_deserializer = response_deserializer 258 self._deadline = deadline 259 260 def cancel(self): 261 with self._state.condition: 262 if self._state.code is None: 263 code = grpc.StatusCode.CANCELLED 264 details = 'Locally cancelled by application!' 265 self._call.cancel( 266 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details) 267 self._state.cancelled = True 268 _abort(self._state, code, details) 269 self._state.condition.notify_all() 270 return False 271 272 def cancelled(self): 273 with self._state.condition: 274 return self._state.cancelled 275 276 def running(self): 277 with self._state.condition: 278 return self._state.code is None 279 280 def done(self): 281 with self._state.condition: 282 return self._state.code is not None 283 284 def result(self, timeout=None): 285 until = None if timeout is None else time.time() + timeout 286 with self._state.condition: 287 while True: 288 if self._state.code is None: 289 _wait_once_until(self._state.condition, until) 290 elif self._state.code is grpc.StatusCode.OK: 291 return self._state.response 292 elif self._state.cancelled: 293 raise grpc.FutureCancelledError() 294 else: 295 raise self 296 297 def exception(self, timeout=None): 298 until = None if timeout is None else time.time() + timeout 299 with self._state.condition: 300 while True: 301 if self._state.code is None: 302 _wait_once_until(self._state.condition, until) 303 elif self._state.code is grpc.StatusCode.OK: 304 return None 305 elif self._state.cancelled: 306 raise grpc.FutureCancelledError() 307 else: 308 return self 309 310 def traceback(self, timeout=None): 311 until = None if timeout is None else time.time() + timeout 312 with self._state.condition: 313 while True: 314 if self._state.code is None: 315 _wait_once_until(self._state.condition, until) 316 elif self._state.code is grpc.StatusCode.OK: 317 return None 318 elif self._state.cancelled: 319 raise grpc.FutureCancelledError() 320 else: 321 try: 322 raise self 323 except grpc.RpcError: 324 return sys.exc_info()[2] 325 326 def add_done_callback(self, fn): 327 with self._state.condition: 328 if self._state.code is None: 329 self._state.callbacks.append(lambda: fn(self)) 330 return 331 332 fn(self) 333 334 def _next(self): 335 with self._state.condition: 336 if self._state.code is None: 337 event_handler = _event_handler(self._state, 338 self._response_deserializer) 339 operating = self._call.operate( 340 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), 341 event_handler) 342 if operating: 343 self._state.due.add(cygrpc.OperationType.receive_message) 344 elif self._state.code is grpc.StatusCode.OK: 345 raise StopIteration() 346 else: 347 raise self 348 while True: 349 self._state.condition.wait() 350 if self._state.response is not None: 351 response = self._state.response 352 self._state.response = None 353 return response 354 elif cygrpc.OperationType.receive_message not in self._state.due: 355 if self._state.code is grpc.StatusCode.OK: 356 raise StopIteration() 357 elif self._state.code is not None: 358 raise self 359 360 def __iter__(self): 361 return self 362 363 def __next__(self): 364 return self._next() 365 366 def next(self): 367 return self._next() 368 369 def is_active(self): 370 with self._state.condition: 371 return self._state.code is None 372 373 def time_remaining(self): 374 if self._deadline is None: 375 return None 376 else: 377 return max(self._deadline - time.time(), 0) 378 379 def add_callback(self, callback): 380 with self._state.condition: 381 if self._state.callbacks is None: 382 return False 383 else: 384 self._state.callbacks.append(callback) 385 return True 386 387 def initial_metadata(self): 388 with self._state.condition: 389 while self._state.initial_metadata is None: 390 self._state.condition.wait() 391 return self._state.initial_metadata 392 393 def trailing_metadata(self): 394 with self._state.condition: 395 while self._state.trailing_metadata is None: 396 self._state.condition.wait() 397 return self._state.trailing_metadata 398 399 def code(self): 400 with self._state.condition: 401 while self._state.code is None: 402 self._state.condition.wait() 403 return self._state.code 404 405 def details(self): 406 with self._state.condition: 407 while self._state.details is None: 408 self._state.condition.wait() 409 return _common.decode(self._state.details) 410 411 def debug_error_string(self): 412 with self._state.condition: 413 while self._state.debug_error_string is None: 414 self._state.condition.wait() 415 return _common.decode(self._state.debug_error_string) 416 417 def _repr(self): 418 with self._state.condition: 419 if self._state.code is None: 420 return '<_Rendezvous object of in-flight RPC>' 421 elif self._state.code is grpc.StatusCode.OK: 422 return _OK_RENDEZVOUS_REPR_FORMAT.format( 423 self._state.code, self._state.details) 424 else: 425 return _NON_OK_RENDEZVOUS_REPR_FORMAT.format( 426 self._state.code, self._state.details, 427 self._state.debug_error_string) 428 429 def __repr__(self): 430 return self._repr() 431 432 def __str__(self): 433 return self._repr() 434 435 def __del__(self): 436 with self._state.condition: 437 if self._state.code is None: 438 self._state.code = grpc.StatusCode.CANCELLED 439 self._state.details = 'Cancelled upon garbage collection!' 440 self._state.cancelled = True 441 self._call.cancel( 442 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code], 443 self._state.details) 444 self._state.condition.notify_all() 445 446 447def _start_unary_request(request, timeout, request_serializer): 448 deadline = _deadline(timeout) 449 serialized_request = _common.serialize(request, request_serializer) 450 if serialized_request is None: 451 state = _RPCState((), (), (), grpc.StatusCode.INTERNAL, 452 'Exception serializing request!') 453 rendezvous = _Rendezvous(state, None, None, deadline) 454 return deadline, None, rendezvous 455 else: 456 return deadline, serialized_request, None 457 458 459def _end_unary_response_blocking(state, call, with_call, deadline): 460 if state.code is grpc.StatusCode.OK: 461 if with_call: 462 rendezvous = _Rendezvous(state, call, None, deadline) 463 return state.response, rendezvous 464 else: 465 return state.response 466 else: 467 raise _Rendezvous(state, None, None, deadline) 468 469 470def _stream_unary_invocation_operationses(metadata): 471 return ( 472 ( 473 cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), 474 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), 475 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 476 ), 477 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 478 ) 479 480 481def _stream_unary_invocation_operationses_and_tags(metadata): 482 return tuple(( 483 operations, 484 None, 485 ) for operations in _stream_unary_invocation_operationses(metadata)) 486 487 488class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): 489 490 def __init__(self, channel, managed_call, method, request_serializer, 491 response_deserializer): 492 self._channel = channel 493 self._managed_call = managed_call 494 self._method = method 495 self._request_serializer = request_serializer 496 self._response_deserializer = response_deserializer 497 498 def _prepare(self, request, timeout, metadata): 499 deadline, serialized_request, rendezvous = _start_unary_request( 500 request, timeout, self._request_serializer) 501 if serialized_request is None: 502 return None, None, None, rendezvous 503 else: 504 state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) 505 operations = ( 506 cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), 507 cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), 508 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 509 cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), 510 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), 511 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 512 ) 513 return state, operations, deadline, None 514 515 def _blocking(self, request, timeout, metadata, credentials): 516 state, operations, deadline, rendezvous = self._prepare( 517 request, timeout, metadata) 518 if state is None: 519 raise rendezvous 520 else: 521 call = self._channel.segregated_call( 522 0, self._method, None, deadline, metadata, None 523 if credentials is None else credentials._credentials, (( 524 operations, 525 None, 526 ),)) 527 event = call.next_event() 528 _handle_event(event, state, self._response_deserializer) 529 return state, call, 530 531 def __call__(self, request, timeout=None, metadata=None, credentials=None): 532 state, call, = self._blocking(request, timeout, metadata, credentials) 533 return _end_unary_response_blocking(state, call, False, None) 534 535 def with_call(self, request, timeout=None, metadata=None, credentials=None): 536 state, call, = self._blocking(request, timeout, metadata, credentials) 537 return _end_unary_response_blocking(state, call, True, None) 538 539 def future(self, request, timeout=None, metadata=None, credentials=None): 540 state, operations, deadline, rendezvous = self._prepare( 541 request, timeout, metadata) 542 if state is None: 543 raise rendezvous 544 else: 545 event_handler = _event_handler(state, self._response_deserializer) 546 call = self._managed_call( 547 0, self._method, None, deadline, metadata, None 548 if credentials is None else credentials._credentials, 549 (operations,), event_handler) 550 return _Rendezvous(state, call, self._response_deserializer, 551 deadline) 552 553 554class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): 555 556 def __init__(self, channel, managed_call, method, request_serializer, 557 response_deserializer): 558 self._channel = channel 559 self._managed_call = managed_call 560 self._method = method 561 self._request_serializer = request_serializer 562 self._response_deserializer = response_deserializer 563 564 def __call__(self, request, timeout=None, metadata=None, credentials=None): 565 deadline, serialized_request, rendezvous = _start_unary_request( 566 request, timeout, self._request_serializer) 567 if serialized_request is None: 568 raise rendezvous 569 else: 570 state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) 571 operationses = ( 572 ( 573 cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), 574 cygrpc.SendMessageOperation(serialized_request, 575 _EMPTY_FLAGS), 576 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 577 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 578 ), 579 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 580 ) 581 event_handler = _event_handler(state, self._response_deserializer) 582 call = self._managed_call( 583 0, self._method, None, deadline, metadata, None 584 if credentials is None else credentials._credentials, 585 operationses, event_handler) 586 return _Rendezvous(state, call, self._response_deserializer, 587 deadline) 588 589 590class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): 591 592 def __init__(self, channel, managed_call, method, request_serializer, 593 response_deserializer): 594 self._channel = channel 595 self._managed_call = managed_call 596 self._method = method 597 self._request_serializer = request_serializer 598 self._response_deserializer = response_deserializer 599 600 def _blocking(self, request_iterator, timeout, metadata, credentials): 601 deadline = _deadline(timeout) 602 state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) 603 call = self._channel.segregated_call( 604 0, self._method, None, deadline, metadata, None 605 if credentials is None else credentials._credentials, 606 _stream_unary_invocation_operationses_and_tags(metadata)) 607 _consume_request_iterator(request_iterator, state, call, 608 self._request_serializer, None) 609 while True: 610 event = call.next_event() 611 with state.condition: 612 _handle_event(event, state, self._response_deserializer) 613 state.condition.notify_all() 614 if not state.due: 615 break 616 return state, call, 617 618 def __call__(self, 619 request_iterator, 620 timeout=None, 621 metadata=None, 622 credentials=None): 623 state, call, = self._blocking(request_iterator, timeout, metadata, 624 credentials) 625 return _end_unary_response_blocking(state, call, False, None) 626 627 def with_call(self, 628 request_iterator, 629 timeout=None, 630 metadata=None, 631 credentials=None): 632 state, call, = self._blocking(request_iterator, timeout, metadata, 633 credentials) 634 return _end_unary_response_blocking(state, call, True, None) 635 636 def future(self, 637 request_iterator, 638 timeout=None, 639 metadata=None, 640 credentials=None): 641 deadline = _deadline(timeout) 642 state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) 643 event_handler = _event_handler(state, self._response_deserializer) 644 call = self._managed_call( 645 0, self._method, None, deadline, metadata, None 646 if credentials is None else credentials._credentials, 647 _stream_unary_invocation_operationses(metadata), event_handler) 648 _consume_request_iterator(request_iterator, state, call, 649 self._request_serializer, event_handler) 650 return _Rendezvous(state, call, self._response_deserializer, deadline) 651 652 653class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): 654 655 def __init__(self, channel, managed_call, method, request_serializer, 656 response_deserializer): 657 self._channel = channel 658 self._managed_call = managed_call 659 self._method = method 660 self._request_serializer = request_serializer 661 self._response_deserializer = response_deserializer 662 663 def __call__(self, 664 request_iterator, 665 timeout=None, 666 metadata=None, 667 credentials=None): 668 deadline = _deadline(timeout) 669 state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) 670 operationses = ( 671 ( 672 cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), 673 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 674 ), 675 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 676 ) 677 event_handler = _event_handler(state, self._response_deserializer) 678 call = self._managed_call( 679 0, self._method, None, deadline, metadata, None 680 if credentials is None else credentials._credentials, operationses, 681 event_handler) 682 _consume_request_iterator(request_iterator, state, call, 683 self._request_serializer, event_handler) 684 return _Rendezvous(state, call, self._response_deserializer, deadline) 685 686 687class _ChannelCallState(object): 688 689 def __init__(self, channel): 690 self.lock = threading.Lock() 691 self.channel = channel 692 self.managed_calls = 0 693 self.threading = False 694 695 def reset_postfork_child(self): 696 self.managed_calls = 0 697 698 699def _run_channel_spin_thread(state): 700 701 def channel_spin(): 702 while True: 703 cygrpc.block_if_fork_in_progress(state) 704 event = state.channel.next_call_event() 705 if event.completion_type == cygrpc.CompletionType.queue_timeout: 706 continue 707 call_completed = event.tag(event) 708 if call_completed: 709 with state.lock: 710 state.managed_calls -= 1 711 if state.managed_calls == 0: 712 return 713 714 channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin) 715 channel_spin_thread.setDaemon(True) 716 channel_spin_thread.start() 717 718 719def _channel_managed_call_management(state): 720 721 # pylint: disable=too-many-arguments 722 def create(flags, method, host, deadline, metadata, credentials, 723 operationses, event_handler): 724 """Creates a cygrpc.IntegratedCall. 725 726 Args: 727 flags: An integer bitfield of call flags. 728 method: The RPC method. 729 host: A host string for the created call. 730 deadline: A float to be the deadline of the created call or None if 731 the call is to have an infinite deadline. 732 metadata: The metadata for the call or None. 733 credentials: A cygrpc.CallCredentials or None. 734 operationses: An iterable of iterables of cygrpc.Operations to be 735 started on the call. 736 event_handler: A behavior to call to handle the events resultant from 737 the operations on the call. 738 739 Returns: 740 A cygrpc.IntegratedCall with which to conduct an RPC. 741 """ 742 operationses_and_tags = tuple(( 743 operations, 744 event_handler, 745 ) for operations in operationses) 746 with state.lock: 747 call = state.channel.integrated_call(flags, method, host, deadline, 748 metadata, credentials, 749 operationses_and_tags) 750 if state.managed_calls == 0: 751 state.managed_calls = 1 752 _run_channel_spin_thread(state) 753 else: 754 state.managed_calls += 1 755 return call 756 757 return create 758 759 760class _ChannelConnectivityState(object): 761 762 def __init__(self, channel): 763 self.lock = threading.RLock() 764 self.channel = channel 765 self.polling = False 766 self.connectivity = None 767 self.try_to_connect = False 768 self.callbacks_and_connectivities = [] 769 self.delivering = False 770 771 def reset_postfork_child(self): 772 self.polling = False 773 self.connectivity = None 774 self.try_to_connect = False 775 self.callbacks_and_connectivities = [] 776 self.delivering = False 777 778 779def _deliveries(state): 780 callbacks_needing_update = [] 781 for callback_and_connectivity in state.callbacks_and_connectivities: 782 callback, callback_connectivity, = callback_and_connectivity 783 if callback_connectivity is not state.connectivity: 784 callbacks_needing_update.append(callback) 785 callback_and_connectivity[1] = state.connectivity 786 return callbacks_needing_update 787 788 789def _deliver(state, initial_connectivity, initial_callbacks): 790 connectivity = initial_connectivity 791 callbacks = initial_callbacks 792 while True: 793 for callback in callbacks: 794 cygrpc.block_if_fork_in_progress(state) 795 callable_util.call_logging_exceptions( 796 callback, _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE, 797 connectivity) 798 with state.lock: 799 callbacks = _deliveries(state) 800 if callbacks: 801 connectivity = state.connectivity 802 else: 803 state.delivering = False 804 return 805 806 807def _spawn_delivery(state, callbacks): 808 delivering_thread = cygrpc.ForkManagedThread( 809 target=_deliver, args=( 810 state, 811 state.connectivity, 812 callbacks, 813 )) 814 delivering_thread.start() 815 state.delivering = True 816 817 818# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll. 819def _poll_connectivity(state, channel, initial_try_to_connect): 820 try_to_connect = initial_try_to_connect 821 connectivity = channel.check_connectivity_state(try_to_connect) 822 with state.lock: 823 state.connectivity = ( 824 _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ 825 connectivity]) 826 callbacks = tuple(callback 827 for callback, unused_but_known_to_be_none_connectivity 828 in state.callbacks_and_connectivities) 829 for callback_and_connectivity in state.callbacks_and_connectivities: 830 callback_and_connectivity[1] = state.connectivity 831 if callbacks: 832 _spawn_delivery(state, callbacks) 833 while True: 834 event = channel.watch_connectivity_state(connectivity, 835 time.time() + 0.2) 836 cygrpc.block_if_fork_in_progress(state) 837 with state.lock: 838 if not state.callbacks_and_connectivities and not state.try_to_connect: 839 state.polling = False 840 state.connectivity = None 841 break 842 try_to_connect = state.try_to_connect 843 state.try_to_connect = False 844 if event.success or try_to_connect: 845 connectivity = channel.check_connectivity_state(try_to_connect) 846 with state.lock: 847 state.connectivity = ( 848 _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ 849 connectivity]) 850 if not state.delivering: 851 callbacks = _deliveries(state) 852 if callbacks: 853 _spawn_delivery(state, callbacks) 854 855 856def _moot(state): 857 with state.lock: 858 del state.callbacks_and_connectivities[:] 859 860 861def _subscribe(state, callback, try_to_connect): 862 with state.lock: 863 if not state.callbacks_and_connectivities and not state.polling: 864 polling_thread = cygrpc.ForkManagedThread( 865 target=_poll_connectivity, 866 args=(state, state.channel, bool(try_to_connect))) 867 polling_thread.setDaemon(True) 868 polling_thread.start() 869 state.polling = True 870 state.callbacks_and_connectivities.append([callback, None]) 871 elif not state.delivering and state.connectivity is not None: 872 _spawn_delivery(state, (callback,)) 873 state.try_to_connect |= bool(try_to_connect) 874 state.callbacks_and_connectivities.append( 875 [callback, state.connectivity]) 876 else: 877 state.try_to_connect |= bool(try_to_connect) 878 state.callbacks_and_connectivities.append([callback, None]) 879 880 881def _unsubscribe(state, callback): 882 with state.lock: 883 for index, (subscribed_callback, unused_connectivity) in enumerate( 884 state.callbacks_and_connectivities): 885 if callback == subscribed_callback: 886 state.callbacks_and_connectivities.pop(index) 887 break 888 889 890def _options(options): 891 return list(options) + [ 892 ( 893 cygrpc.ChannelArgKey.primary_user_agent_string, 894 _USER_AGENT, 895 ), 896 ] 897 898 899class Channel(grpc.Channel): 900 """A cygrpc.Channel-backed implementation of grpc.Channel.""" 901 902 def __init__(self, target, options, credentials): 903 """Constructor. 904 905 Args: 906 target: The target to which to connect. 907 options: Configuration options for the channel. 908 credentials: A cygrpc.ChannelCredentials or None. 909 """ 910 self._channel = cygrpc.Channel( 911 _common.encode(target), _options(options), credentials) 912 self._call_state = _ChannelCallState(self._channel) 913 self._connectivity_state = _ChannelConnectivityState(self._channel) 914 cygrpc.fork_register_channel(self) 915 916 def subscribe(self, callback, try_to_connect=None): 917 _subscribe(self._connectivity_state, callback, try_to_connect) 918 919 def unsubscribe(self, callback): 920 _unsubscribe(self._connectivity_state, callback) 921 922 def unary_unary(self, 923 method, 924 request_serializer=None, 925 response_deserializer=None): 926 return _UnaryUnaryMultiCallable( 927 self._channel, _channel_managed_call_management(self._call_state), 928 _common.encode(method), request_serializer, response_deserializer) 929 930 def unary_stream(self, 931 method, 932 request_serializer=None, 933 response_deserializer=None): 934 return _UnaryStreamMultiCallable( 935 self._channel, _channel_managed_call_management(self._call_state), 936 _common.encode(method), request_serializer, response_deserializer) 937 938 def stream_unary(self, 939 method, 940 request_serializer=None, 941 response_deserializer=None): 942 return _StreamUnaryMultiCallable( 943 self._channel, _channel_managed_call_management(self._call_state), 944 _common.encode(method), request_serializer, response_deserializer) 945 946 def stream_stream(self, 947 method, 948 request_serializer=None, 949 response_deserializer=None): 950 return _StreamStreamMultiCallable( 951 self._channel, _channel_managed_call_management(self._call_state), 952 _common.encode(method), request_serializer, response_deserializer) 953 954 def _close(self): 955 self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!') 956 _moot(self._connectivity_state) 957 958 def _close_on_fork(self): 959 self._channel.close_on_fork(cygrpc.StatusCode.cancelled, 960 'Channel closed due to fork') 961 _moot(self._connectivity_state) 962 963 def __enter__(self): 964 return self 965 966 def __exit__(self, exc_type, exc_val, exc_tb): 967 self._close() 968 return False 969 970 def close(self): 971 self._close() 972 973 def __del__(self): 974 # TODO(https://github.com/grpc/grpc/issues/12531): Several releases 975 # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call 976 # here (or more likely, call self._close() here). We don't do this today 977 # because many valid use cases today allow the channel to be deleted 978 # immediately after stubs are created. After a sufficient period of time 979 # has passed for all users to be trusted to hang out to their channels 980 # for as long as they are in use and to close them after using them, 981 # then deletion of this grpc._channel.Channel instance can be made to 982 # effect closure of the underlying cygrpc.Channel instance. 983 cygrpc.fork_unregister_channel(self) 984 _moot(self._connectivity_state) 985