• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Python."""
15
16import copy
17import functools
18import logging
19import os
20import sys
21import threading
22import time
23
24import grpc
25import grpc.experimental
26from grpc import _compression
27from grpc import _common
28from grpc import _grpcio_metadata
29from grpc._cython import cygrpc
30
31_LOGGER = logging.getLogger(__name__)
32
33_USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__)
34
35_EMPTY_FLAGS = 0
36
37# NOTE(rbellevi): No guarantees are given about the maintenance of this
38# environment variable.
39_DEFAULT_SINGLE_THREADED_UNARY_STREAM = os.getenv(
40    "GRPC_SINGLE_THREADED_UNARY_STREAM") is not None
41
42_UNARY_UNARY_INITIAL_DUE = (
43    cygrpc.OperationType.send_initial_metadata,
44    cygrpc.OperationType.send_message,
45    cygrpc.OperationType.send_close_from_client,
46    cygrpc.OperationType.receive_initial_metadata,
47    cygrpc.OperationType.receive_message,
48    cygrpc.OperationType.receive_status_on_client,
49)
50_UNARY_STREAM_INITIAL_DUE = (
51    cygrpc.OperationType.send_initial_metadata,
52    cygrpc.OperationType.send_message,
53    cygrpc.OperationType.send_close_from_client,
54    cygrpc.OperationType.receive_initial_metadata,
55    cygrpc.OperationType.receive_status_on_client,
56)
57_STREAM_UNARY_INITIAL_DUE = (
58    cygrpc.OperationType.send_initial_metadata,
59    cygrpc.OperationType.receive_initial_metadata,
60    cygrpc.OperationType.receive_message,
61    cygrpc.OperationType.receive_status_on_client,
62)
63_STREAM_STREAM_INITIAL_DUE = (
64    cygrpc.OperationType.send_initial_metadata,
65    cygrpc.OperationType.receive_initial_metadata,
66    cygrpc.OperationType.receive_status_on_client,
67)
68
69_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
70    'Exception calling channel subscription callback!')
71
72_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'
73                              '\tstatus = {}\n'
74                              '\tdetails = "{}"\n'
75                              '>')
76
77_NON_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'
78                                  '\tstatus = {}\n'
79                                  '\tdetails = "{}"\n'
80                                  '\tdebug_error_string = "{}"\n'
81                                  '>')
82
83
84def _deadline(timeout):
85    return None if timeout is None else time.time() + timeout
86
87
88def _unknown_code_details(unknown_cygrpc_code, details):
89    return 'Server sent unknown code {} and details "{}"'.format(
90        unknown_cygrpc_code, details)
91
92
93class _RPCState(object):
94
95    def __init__(self, due, initial_metadata, trailing_metadata, code, details):
96        self.condition = threading.Condition()
97        # The cygrpc.OperationType objects representing events due from the RPC's
98        # completion queue.
99        self.due = set(due)
100        self.initial_metadata = initial_metadata
101        self.response = None
102        self.trailing_metadata = trailing_metadata
103        self.code = code
104        self.details = details
105        self.debug_error_string = None
106        # The semantics of grpc.Future.cancel and grpc.Future.cancelled are
107        # slightly wonky, so they have to be tracked separately from the rest of the
108        # result of the RPC. This field tracks whether cancellation was requested
109        # prior to termination of the RPC.
110        self.cancelled = False
111        self.callbacks = []
112        self.fork_epoch = cygrpc.get_fork_epoch()
113
114    def reset_postfork_child(self):
115        self.condition = threading.Condition()
116
117
118def _abort(state, code, details):
119    if state.code is None:
120        state.code = code
121        state.details = details
122        if state.initial_metadata is None:
123            state.initial_metadata = ()
124        state.trailing_metadata = ()
125
126
127def _handle_event(event, state, response_deserializer):
128    callbacks = []
129    for batch_operation in event.batch_operations:
130        operation_type = batch_operation.type()
131        state.due.remove(operation_type)
132        if operation_type == cygrpc.OperationType.receive_initial_metadata:
133            state.initial_metadata = batch_operation.initial_metadata()
134        elif operation_type == cygrpc.OperationType.receive_message:
135            serialized_response = batch_operation.message()
136            if serialized_response is not None:
137                response = _common.deserialize(serialized_response,
138                                               response_deserializer)
139                if response is None:
140                    details = 'Exception deserializing response!'
141                    _abort(state, grpc.StatusCode.INTERNAL, details)
142                else:
143                    state.response = response
144        elif operation_type == cygrpc.OperationType.receive_status_on_client:
145            state.trailing_metadata = batch_operation.trailing_metadata()
146            if state.code is None:
147                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
148                    batch_operation.code())
149                if code is None:
150                    state.code = grpc.StatusCode.UNKNOWN
151                    state.details = _unknown_code_details(
152                        code, batch_operation.details())
153                else:
154                    state.code = code
155                    state.details = batch_operation.details()
156                    state.debug_error_string = batch_operation.error_string()
157            callbacks.extend(state.callbacks)
158            state.callbacks = None
159    return callbacks
160
161
162def _event_handler(state, response_deserializer):
163
164    def handle_event(event):
165        with state.condition:
166            callbacks = _handle_event(event, state, response_deserializer)
167            state.condition.notify_all()
168            done = not state.due
169        for callback in callbacks:
170            try:
171                callback()
172            except Exception as e:  # pylint: disable=broad-except
173                # NOTE(rbellevi): We suppress but log errors here so as not to
174                # kill the channel spin thread.
175                logging.error('Exception in callback %s: %s',
176                              repr(callback.func), repr(e))
177        return done and state.fork_epoch >= cygrpc.get_fork_epoch()
178
179    return handle_event
180
181
182#pylint: disable=too-many-statements
183def _consume_request_iterator(request_iterator, state, call, request_serializer,
184                              event_handler):
185    """Consume a request iterator supplied by the user."""
186
187    def consume_request_iterator():  # pylint: disable=too-many-branches
188        # Iterate over the request iterator until it is exhausted or an error
189        # condition is encountered.
190        while True:
191            return_from_user_request_generator_invoked = False
192            try:
193                # The thread may die in user-code. Do not block fork for this.
194                cygrpc.enter_user_request_generator()
195                request = next(request_iterator)
196            except StopIteration:
197                break
198            except Exception:  # pylint: disable=broad-except
199                cygrpc.return_from_user_request_generator()
200                return_from_user_request_generator_invoked = True
201                code = grpc.StatusCode.UNKNOWN
202                details = 'Exception iterating requests!'
203                _LOGGER.exception(details)
204                call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
205                            details)
206                _abort(state, code, details)
207                return
208            finally:
209                if not return_from_user_request_generator_invoked:
210                    cygrpc.return_from_user_request_generator()
211            serialized_request = _common.serialize(request, request_serializer)
212            with state.condition:
213                if state.code is None and not state.cancelled:
214                    if serialized_request is None:
215                        code = grpc.StatusCode.INTERNAL
216                        details = 'Exception serializing request!'
217                        call.cancel(
218                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
219                            details)
220                        _abort(state, code, details)
221                        return
222                    else:
223                        operations = (cygrpc.SendMessageOperation(
224                            serialized_request, _EMPTY_FLAGS),)
225                        operating = call.operate(operations, event_handler)
226                        if operating:
227                            state.due.add(cygrpc.OperationType.send_message)
228                        else:
229                            return
230
231                        def _done():
232                            return (state.code is not None or
233                                    cygrpc.OperationType.send_message
234                                    not in state.due)
235
236                        _common.wait(state.condition.wait,
237                                     _done,
238                                     spin_cb=functools.partial(
239                                         cygrpc.block_if_fork_in_progress,
240                                         state))
241                        if state.code is not None:
242                            return
243                else:
244                    return
245        with state.condition:
246            if state.code is None:
247                operations = (
248                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
249                operating = call.operate(operations, event_handler)
250                if operating:
251                    state.due.add(cygrpc.OperationType.send_close_from_client)
252
253    consumption_thread = cygrpc.ForkManagedThread(
254        target=consume_request_iterator)
255    consumption_thread.setDaemon(True)
256    consumption_thread.start()
257
258
259def _rpc_state_string(class_name, rpc_state):
260    """Calculates error string for RPC."""
261    with rpc_state.condition:
262        if rpc_state.code is None:
263            return '<{} object>'.format(class_name)
264        elif rpc_state.code is grpc.StatusCode.OK:
265            return _OK_RENDEZVOUS_REPR_FORMAT.format(class_name, rpc_state.code,
266                                                     rpc_state.details)
267        else:
268            return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
269                class_name, rpc_state.code, rpc_state.details,
270                rpc_state.debug_error_string)
271
272
273class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
274    """An RPC error not tied to the execution of a particular RPC.
275
276    The RPC represented by the state object must not be in-progress or
277    cancelled.
278
279    Attributes:
280      _state: An instance of _RPCState.
281    """
282
283    def __init__(self, state):
284        with state.condition:
285            self._state = _RPCState((), copy.deepcopy(state.initial_metadata),
286                                    copy.deepcopy(state.trailing_metadata),
287                                    state.code, copy.deepcopy(state.details))
288            self._state.response = copy.copy(state.response)
289            self._state.debug_error_string = copy.copy(state.debug_error_string)
290
291    def initial_metadata(self):
292        return self._state.initial_metadata
293
294    def trailing_metadata(self):
295        return self._state.trailing_metadata
296
297    def code(self):
298        return self._state.code
299
300    def details(self):
301        return _common.decode(self._state.details)
302
303    def debug_error_string(self):
304        return _common.decode(self._state.debug_error_string)
305
306    def _repr(self):
307        return _rpc_state_string(self.__class__.__name__, self._state)
308
309    def __repr__(self):
310        return self._repr()
311
312    def __str__(self):
313        return self._repr()
314
315    def cancel(self):
316        """See grpc.Future.cancel."""
317        return False
318
319    def cancelled(self):
320        """See grpc.Future.cancelled."""
321        return False
322
323    def running(self):
324        """See grpc.Future.running."""
325        return False
326
327    def done(self):
328        """See grpc.Future.done."""
329        return True
330
331    def result(self, timeout=None):  # pylint: disable=unused-argument
332        """See grpc.Future.result."""
333        raise self
334
335    def exception(self, timeout=None):  # pylint: disable=unused-argument
336        """See grpc.Future.exception."""
337        return self
338
339    def traceback(self, timeout=None):  # pylint: disable=unused-argument
340        """See grpc.Future.traceback."""
341        try:
342            raise self
343        except grpc.RpcError:
344            return sys.exc_info()[2]
345
346    def add_done_callback(self, fn, timeout=None):  # pylint: disable=unused-argument
347        """See grpc.Future.add_done_callback."""
348        fn(self)
349
350
351class _Rendezvous(grpc.RpcError, grpc.RpcContext):
352    """An RPC iterator.
353
354    Attributes:
355      _state: An instance of _RPCState.
356      _call: An instance of SegregatedCall or IntegratedCall.
357        In either case, the _call object is expected to have operate, cancel,
358        and next_event methods.
359      _response_deserializer: A callable taking bytes and return a Python
360        object.
361      _deadline: A float representing the deadline of the RPC in seconds. Or
362        possibly None, to represent an RPC with no deadline at all.
363    """
364
365    def __init__(self, state, call, response_deserializer, deadline):
366        super(_Rendezvous, self).__init__()
367        self._state = state
368        self._call = call
369        self._response_deserializer = response_deserializer
370        self._deadline = deadline
371
372    def is_active(self):
373        """See grpc.RpcContext.is_active"""
374        with self._state.condition:
375            return self._state.code is None
376
377    def time_remaining(self):
378        """See grpc.RpcContext.time_remaining"""
379        with self._state.condition:
380            if self._deadline is None:
381                return None
382            else:
383                return max(self._deadline - time.time(), 0)
384
385    def cancel(self):
386        """See grpc.RpcContext.cancel"""
387        with self._state.condition:
388            if self._state.code is None:
389                code = grpc.StatusCode.CANCELLED
390                details = 'Locally cancelled by application!'
391                self._call.cancel(
392                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
393                self._state.cancelled = True
394                _abort(self._state, code, details)
395                self._state.condition.notify_all()
396                return True
397            else:
398                return False
399
400    def add_callback(self, callback):
401        """See grpc.RpcContext.add_callback"""
402        with self._state.condition:
403            if self._state.callbacks is None:
404                return False
405            else:
406                self._state.callbacks.append(callback)
407                return True
408
409    def __iter__(self):
410        return self
411
412    def next(self):
413        return self._next()
414
415    def __next__(self):
416        return self._next()
417
418    def _next(self):
419        raise NotImplementedError()
420
421    def debug_error_string(self):
422        raise NotImplementedError()
423
424    def _repr(self):
425        return _rpc_state_string(self.__class__.__name__, self._state)
426
427    def __repr__(self):
428        return self._repr()
429
430    def __str__(self):
431        return self._repr()
432
433    def __del__(self):
434        with self._state.condition:
435            if self._state.code is None:
436                self._state.code = grpc.StatusCode.CANCELLED
437                self._state.details = 'Cancelled upon garbage collection!'
438                self._state.cancelled = True
439                self._call.cancel(
440                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
441                    self._state.details)
442                self._state.condition.notify_all()
443
444
445class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future):  # pylint: disable=too-many-ancestors
446    """An RPC iterator operating entirely on a single thread.
447
448    The __next__ method of _SingleThreadedRendezvous does not depend on the
449    existence of any other thread, including the "channel spin thread".
450    However, this means that its interface is entirely synchronous. So this
451    class cannot completely fulfill the grpc.Future interface. The result,
452    exception, and traceback methods will never block and will instead raise
453    an exception if calling the method would result in blocking.
454
455    This means that these methods are safe to call from add_done_callback
456    handlers.
457    """
458
459    def _is_complete(self):
460        return self._state.code is not None
461
462    def cancelled(self):
463        with self._state.condition:
464            return self._state.cancelled
465
466    def running(self):
467        with self._state.condition:
468            return self._state.code is None
469
470    def done(self):
471        with self._state.condition:
472            return self._state.code is not None
473
474    def result(self, timeout=None):
475        """Returns the result of the computation or raises its exception.
476
477        This method will never block. Instead, it will raise an exception
478        if calling this method would otherwise result in blocking.
479
480        Since this method will never block, any `timeout` argument passed will
481        be ignored.
482        """
483        del timeout
484        with self._state.condition:
485            if not self._is_complete():
486                raise grpc.experimental.UsageError(
487                    "_SingleThreadedRendezvous only supports result() when the RPC is complete."
488                )
489            if self._state.code is grpc.StatusCode.OK:
490                return self._state.response
491            elif self._state.cancelled:
492                raise grpc.FutureCancelledError()
493            else:
494                raise self
495
496    def exception(self, timeout=None):
497        """Return the exception raised by the computation.
498
499        This method will never block. Instead, it will raise an exception
500        if calling this method would otherwise result in blocking.
501
502        Since this method will never block, any `timeout` argument passed will
503        be ignored.
504        """
505        del timeout
506        with self._state.condition:
507            if not self._is_complete():
508                raise grpc.experimental.UsageError(
509                    "_SingleThreadedRendezvous only supports exception() when the RPC is complete."
510                )
511            if self._state.code is grpc.StatusCode.OK:
512                return None
513            elif self._state.cancelled:
514                raise grpc.FutureCancelledError()
515            else:
516                return self
517
518    def traceback(self, timeout=None):
519        """Access the traceback of the exception raised by the computation.
520
521        This method will never block. Instead, it will raise an exception
522        if calling this method would otherwise result in blocking.
523
524        Since this method will never block, any `timeout` argument passed will
525        be ignored.
526        """
527        del timeout
528        with self._state.condition:
529            if not self._is_complete():
530                raise grpc.experimental.UsageError(
531                    "_SingleThreadedRendezvous only supports traceback() when the RPC is complete."
532                )
533            if self._state.code is grpc.StatusCode.OK:
534                return None
535            elif self._state.cancelled:
536                raise grpc.FutureCancelledError()
537            else:
538                try:
539                    raise self
540                except grpc.RpcError:
541                    return sys.exc_info()[2]
542
543    def add_done_callback(self, fn):
544        with self._state.condition:
545            if self._state.code is None:
546                self._state.callbacks.append(functools.partial(fn, self))
547                return
548
549        fn(self)
550
551    def initial_metadata(self):
552        """See grpc.Call.initial_metadata"""
553        with self._state.condition:
554            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
555            # to receive initial metadata before any messages.
556            while self._state.initial_metadata is None:
557                self._consume_next_event()
558            return self._state.initial_metadata
559
560    def trailing_metadata(self):
561        """See grpc.Call.trailing_metadata"""
562        with self._state.condition:
563            if self._state.trailing_metadata is None:
564                raise grpc.experimental.UsageError(
565                    "Cannot get trailing metadata until RPC is completed.")
566            return self._state.trailing_metadata
567
568    def code(self):
569        """See grpc.Call.code"""
570        with self._state.condition:
571            if self._state.code is None:
572                raise grpc.experimental.UsageError(
573                    "Cannot get code until RPC is completed.")
574            return self._state.code
575
576    def details(self):
577        """See grpc.Call.details"""
578        with self._state.condition:
579            if self._state.details is None:
580                raise grpc.experimental.UsageError(
581                    "Cannot get details until RPC is completed.")
582            return _common.decode(self._state.details)
583
584    def _consume_next_event(self):
585        event = self._call.next_event()
586        with self._state.condition:
587            callbacks = _handle_event(event, self._state,
588                                      self._response_deserializer)
589            for callback in callbacks:
590                # NOTE(gnossen): We intentionally allow exceptions to bubble up
591                # to the user when running on a single thread.
592                callback()
593        return event
594
595    def _next_response(self):
596        while True:
597            self._consume_next_event()
598            with self._state.condition:
599                if self._state.response is not None:
600                    response = self._state.response
601                    self._state.response = None
602                    return response
603                elif cygrpc.OperationType.receive_message not in self._state.due:
604                    if self._state.code is grpc.StatusCode.OK:
605                        raise StopIteration()
606                    elif self._state.code is not None:
607                        raise self
608
609    def _next(self):
610        with self._state.condition:
611            if self._state.code is None:
612                operating = self._call.operate(
613                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None)
614                if operating:
615                    self._state.due.add(cygrpc.OperationType.receive_message)
616            elif self._state.code is grpc.StatusCode.OK:
617                raise StopIteration()
618            else:
619                raise self
620        return self._next_response()
621
622    def debug_error_string(self):
623        with self._state.condition:
624            if self._state.debug_error_string is None:
625                raise grpc.experimental.UsageError(
626                    "Cannot get debug error string until RPC is completed.")
627            return _common.decode(self._state.debug_error_string)
628
629
630class _MultiThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future):  # pylint: disable=too-many-ancestors
631    """An RPC iterator that depends on a channel spin thread.
632
633    This iterator relies upon a per-channel thread running in the background,
634    dequeueing events from the completion queue, and notifying threads waiting
635    on the threading.Condition object in the _RPCState object.
636
637    This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface
638    and to mediate a bidirection streaming RPC.
639    """
640
641    def initial_metadata(self):
642        """See grpc.Call.initial_metadata"""
643        with self._state.condition:
644
645            def _done():
646                return self._state.initial_metadata is not None
647
648            _common.wait(self._state.condition.wait, _done)
649            return self._state.initial_metadata
650
651    def trailing_metadata(self):
652        """See grpc.Call.trailing_metadata"""
653        with self._state.condition:
654
655            def _done():
656                return self._state.trailing_metadata is not None
657
658            _common.wait(self._state.condition.wait, _done)
659            return self._state.trailing_metadata
660
661    def code(self):
662        """See grpc.Call.code"""
663        with self._state.condition:
664
665            def _done():
666                return self._state.code is not None
667
668            _common.wait(self._state.condition.wait, _done)
669            return self._state.code
670
671    def details(self):
672        """See grpc.Call.details"""
673        with self._state.condition:
674
675            def _done():
676                return self._state.details is not None
677
678            _common.wait(self._state.condition.wait, _done)
679            return _common.decode(self._state.details)
680
681    def debug_error_string(self):
682        with self._state.condition:
683
684            def _done():
685                return self._state.debug_error_string is not None
686
687            _common.wait(self._state.condition.wait, _done)
688            return _common.decode(self._state.debug_error_string)
689
690    def cancelled(self):
691        with self._state.condition:
692            return self._state.cancelled
693
694    def running(self):
695        with self._state.condition:
696            return self._state.code is None
697
698    def done(self):
699        with self._state.condition:
700            return self._state.code is not None
701
702    def _is_complete(self):
703        return self._state.code is not None
704
705    def result(self, timeout=None):
706        """Returns the result of the computation or raises its exception.
707
708        See grpc.Future.result for the full API contract.
709        """
710        with self._state.condition:
711            timed_out = _common.wait(self._state.condition.wait,
712                                     self._is_complete,
713                                     timeout=timeout)
714            if timed_out:
715                raise grpc.FutureTimeoutError()
716            else:
717                if self._state.code is grpc.StatusCode.OK:
718                    return self._state.response
719                elif self._state.cancelled:
720                    raise grpc.FutureCancelledError()
721                else:
722                    raise self
723
724    def exception(self, timeout=None):
725        """Return the exception raised by the computation.
726
727        See grpc.Future.exception for the full API contract.
728        """
729        with self._state.condition:
730            timed_out = _common.wait(self._state.condition.wait,
731                                     self._is_complete,
732                                     timeout=timeout)
733            if timed_out:
734                raise grpc.FutureTimeoutError()
735            else:
736                if self._state.code is grpc.StatusCode.OK:
737                    return None
738                elif self._state.cancelled:
739                    raise grpc.FutureCancelledError()
740                else:
741                    return self
742
743    def traceback(self, timeout=None):
744        """Access the traceback of the exception raised by the computation.
745
746        See grpc.future.traceback for the full API contract.
747        """
748        with self._state.condition:
749            timed_out = _common.wait(self._state.condition.wait,
750                                     self._is_complete,
751                                     timeout=timeout)
752            if timed_out:
753                raise grpc.FutureTimeoutError()
754            else:
755                if self._state.code is grpc.StatusCode.OK:
756                    return None
757                elif self._state.cancelled:
758                    raise grpc.FutureCancelledError()
759                else:
760                    try:
761                        raise self
762                    except grpc.RpcError:
763                        return sys.exc_info()[2]
764
765    def add_done_callback(self, fn):
766        with self._state.condition:
767            if self._state.code is None:
768                self._state.callbacks.append(functools.partial(fn, self))
769                return
770
771        fn(self)
772
773    def _next(self):
774        with self._state.condition:
775            if self._state.code is None:
776                event_handler = _event_handler(self._state,
777                                               self._response_deserializer)
778                operating = self._call.operate(
779                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
780                    event_handler)
781                if operating:
782                    self._state.due.add(cygrpc.OperationType.receive_message)
783            elif self._state.code is grpc.StatusCode.OK:
784                raise StopIteration()
785            else:
786                raise self
787
788            def _response_ready():
789                return (self._state.response is not None or
790                        (cygrpc.OperationType.receive_message
791                         not in self._state.due and
792                         self._state.code is not None))
793
794            _common.wait(self._state.condition.wait, _response_ready)
795            if self._state.response is not None:
796                response = self._state.response
797                self._state.response = None
798                return response
799            elif cygrpc.OperationType.receive_message not in self._state.due:
800                if self._state.code is grpc.StatusCode.OK:
801                    raise StopIteration()
802                elif self._state.code is not None:
803                    raise self
804
805
806def _start_unary_request(request, timeout, request_serializer):
807    deadline = _deadline(timeout)
808    serialized_request = _common.serialize(request, request_serializer)
809    if serialized_request is None:
810        state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,
811                          'Exception serializing request!')
812        error = _InactiveRpcError(state)
813        return deadline, None, error
814    else:
815        return deadline, serialized_request, None
816
817
818def _end_unary_response_blocking(state, call, with_call, deadline):
819    if state.code is grpc.StatusCode.OK:
820        if with_call:
821            rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
822            return state.response, rendezvous
823        else:
824            return state.response
825    else:
826        raise _InactiveRpcError(state)
827
828
829def _stream_unary_invocation_operationses(metadata, initial_metadata_flags):
830    return (
831        (
832            cygrpc.SendInitialMetadataOperation(metadata,
833                                                initial_metadata_flags),
834            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
835            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
836        ),
837        (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
838    )
839
840
841def _stream_unary_invocation_operationses_and_tags(metadata,
842                                                   initial_metadata_flags):
843    return tuple((
844        operations,
845        None,
846    ) for operations in _stream_unary_invocation_operationses(
847        metadata, initial_metadata_flags))
848
849
850def _determine_deadline(user_deadline):
851    parent_deadline = cygrpc.get_deadline_from_context()
852    if parent_deadline is None and user_deadline is None:
853        return None
854    elif parent_deadline is not None and user_deadline is None:
855        return parent_deadline
856    elif user_deadline is not None and parent_deadline is None:
857        return user_deadline
858    else:
859        return min(parent_deadline, user_deadline)
860
861
862class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
863
864    # pylint: disable=too-many-arguments
865    def __init__(self, channel, managed_call, method, request_serializer,
866                 response_deserializer):
867        self._channel = channel
868        self._managed_call = managed_call
869        self._method = method
870        self._request_serializer = request_serializer
871        self._response_deserializer = response_deserializer
872        self._context = cygrpc.build_census_context()
873
874    def _prepare(self, request, timeout, metadata, wait_for_ready, compression):
875        deadline, serialized_request, rendezvous = _start_unary_request(
876            request, timeout, self._request_serializer)
877        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
878            wait_for_ready)
879        augmented_metadata = _compression.augment_metadata(
880            metadata, compression)
881        if serialized_request is None:
882            return None, None, None, rendezvous
883        else:
884            state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
885            operations = (
886                cygrpc.SendInitialMetadataOperation(augmented_metadata,
887                                                    initial_metadata_flags),
888                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
889                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
890                cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
891                cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
892                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
893            )
894            return state, operations, deadline, None
895
896    def _blocking(self, request, timeout, metadata, credentials, wait_for_ready,
897                  compression):
898        state, operations, deadline, rendezvous = self._prepare(
899            request, timeout, metadata, wait_for_ready, compression)
900        if state is None:
901            raise rendezvous  # pylint: disable-msg=raising-bad-type
902        else:
903            call = self._channel.segregated_call(
904                cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
905                self._method, None, _determine_deadline(deadline), metadata,
906                None if credentials is None else credentials._credentials, ((
907                    operations,
908                    None,
909                ),), self._context)
910            event = call.next_event()
911            _handle_event(event, state, self._response_deserializer)
912            return state, call
913
914    def __call__(self,
915                 request,
916                 timeout=None,
917                 metadata=None,
918                 credentials=None,
919                 wait_for_ready=None,
920                 compression=None):
921        state, call, = self._blocking(request, timeout, metadata, credentials,
922                                      wait_for_ready, compression)
923        return _end_unary_response_blocking(state, call, False, None)
924
925    def with_call(self,
926                  request,
927                  timeout=None,
928                  metadata=None,
929                  credentials=None,
930                  wait_for_ready=None,
931                  compression=None):
932        state, call, = self._blocking(request, timeout, metadata, credentials,
933                                      wait_for_ready, compression)
934        return _end_unary_response_blocking(state, call, True, None)
935
936    def future(self,
937               request,
938               timeout=None,
939               metadata=None,
940               credentials=None,
941               wait_for_ready=None,
942               compression=None):
943        state, operations, deadline, rendezvous = self._prepare(
944            request, timeout, metadata, wait_for_ready, compression)
945        if state is None:
946            raise rendezvous  # pylint: disable-msg=raising-bad-type
947        else:
948            event_handler = _event_handler(state, self._response_deserializer)
949            call = self._managed_call(
950                cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
951                self._method, None, deadline, metadata,
952                None if credentials is None else credentials._credentials,
953                (operations,), event_handler, self._context)
954            return _MultiThreadedRendezvous(state, call,
955                                            self._response_deserializer,
956                                            deadline)
957
958
959class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
960
961    # pylint: disable=too-many-arguments
962    def __init__(self, channel, method, request_serializer,
963                 response_deserializer):
964        self._channel = channel
965        self._method = method
966        self._request_serializer = request_serializer
967        self._response_deserializer = response_deserializer
968        self._context = cygrpc.build_census_context()
969
970    def __call__(  # pylint: disable=too-many-locals
971            self,
972            request,
973            timeout=None,
974            metadata=None,
975            credentials=None,
976            wait_for_ready=None,
977            compression=None):
978        deadline = _deadline(timeout)
979        serialized_request = _common.serialize(request,
980                                               self._request_serializer)
981        if serialized_request is None:
982            state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,
983                              'Exception serializing request!')
984            raise _InactiveRpcError(state)
985
986        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
987        call_credentials = None if credentials is None else credentials._credentials
988        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
989            wait_for_ready)
990        augmented_metadata = _compression.augment_metadata(
991            metadata, compression)
992        operations = (
993            (cygrpc.SendInitialMetadataOperation(augmented_metadata,
994                                                 initial_metadata_flags),
995             cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
996             cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS)),
997            (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),),
998            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
999        )
1000        operations_and_tags = tuple((ops, None) for ops in operations)
1001        call = self._channel.segregated_call(
1002            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
1003            None, _determine_deadline(deadline), metadata, call_credentials,
1004            operations_and_tags, self._context)
1005        return _SingleThreadedRendezvous(state, call,
1006                                         self._response_deserializer, deadline)
1007
1008
1009class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
1010
1011    # pylint: disable=too-many-arguments
1012    def __init__(self, channel, managed_call, method, request_serializer,
1013                 response_deserializer):
1014        self._channel = channel
1015        self._managed_call = managed_call
1016        self._method = method
1017        self._request_serializer = request_serializer
1018        self._response_deserializer = response_deserializer
1019        self._context = cygrpc.build_census_context()
1020
1021    def __call__(  # pylint: disable=too-many-locals
1022            self,
1023            request,
1024            timeout=None,
1025            metadata=None,
1026            credentials=None,
1027            wait_for_ready=None,
1028            compression=None):
1029        deadline, serialized_request, rendezvous = _start_unary_request(
1030            request, timeout, self._request_serializer)
1031        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
1032            wait_for_ready)
1033        if serialized_request is None:
1034            raise rendezvous  # pylint: disable-msg=raising-bad-type
1035        else:
1036            augmented_metadata = _compression.augment_metadata(
1037                metadata, compression)
1038            state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
1039            operationses = (
1040                (
1041                    cygrpc.SendInitialMetadataOperation(augmented_metadata,
1042                                                        initial_metadata_flags),
1043                    cygrpc.SendMessageOperation(serialized_request,
1044                                                _EMPTY_FLAGS),
1045                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
1046                    cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
1047                ),
1048                (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
1049            )
1050            call = self._managed_call(
1051                cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
1052                self._method, None, _determine_deadline(deadline), metadata,
1053                None if credentials is None else credentials._credentials,
1054                operationses, _event_handler(state,
1055                                             self._response_deserializer),
1056                self._context)
1057            return _MultiThreadedRendezvous(state, call,
1058                                            self._response_deserializer,
1059                                            deadline)
1060
1061
1062class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
1063
1064    # pylint: disable=too-many-arguments
1065    def __init__(self, channel, managed_call, method, request_serializer,
1066                 response_deserializer):
1067        self._channel = channel
1068        self._managed_call = managed_call
1069        self._method = method
1070        self._request_serializer = request_serializer
1071        self._response_deserializer = response_deserializer
1072        self._context = cygrpc.build_census_context()
1073
1074    def _blocking(self, request_iterator, timeout, metadata, credentials,
1075                  wait_for_ready, compression):
1076        deadline = _deadline(timeout)
1077        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
1078        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
1079            wait_for_ready)
1080        augmented_metadata = _compression.augment_metadata(
1081            metadata, compression)
1082        call = self._channel.segregated_call(
1083            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
1084            None, _determine_deadline(deadline), augmented_metadata,
1085            None if credentials is None else credentials._credentials,
1086            _stream_unary_invocation_operationses_and_tags(
1087                augmented_metadata, initial_metadata_flags), self._context)
1088        _consume_request_iterator(request_iterator, state, call,
1089                                  self._request_serializer, None)
1090        while True:
1091            event = call.next_event()
1092            with state.condition:
1093                _handle_event(event, state, self._response_deserializer)
1094                state.condition.notify_all()
1095                if not state.due:
1096                    break
1097        return state, call
1098
1099    def __call__(self,
1100                 request_iterator,
1101                 timeout=None,
1102                 metadata=None,
1103                 credentials=None,
1104                 wait_for_ready=None,
1105                 compression=None):
1106        state, call, = self._blocking(request_iterator, timeout, metadata,
1107                                      credentials, wait_for_ready, compression)
1108        return _end_unary_response_blocking(state, call, False, None)
1109
1110    def with_call(self,
1111                  request_iterator,
1112                  timeout=None,
1113                  metadata=None,
1114                  credentials=None,
1115                  wait_for_ready=None,
1116                  compression=None):
1117        state, call, = self._blocking(request_iterator, timeout, metadata,
1118                                      credentials, wait_for_ready, compression)
1119        return _end_unary_response_blocking(state, call, True, None)
1120
1121    def future(self,
1122               request_iterator,
1123               timeout=None,
1124               metadata=None,
1125               credentials=None,
1126               wait_for_ready=None,
1127               compression=None):
1128        deadline = _deadline(timeout)
1129        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
1130        event_handler = _event_handler(state, self._response_deserializer)
1131        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
1132            wait_for_ready)
1133        augmented_metadata = _compression.augment_metadata(
1134            metadata, compression)
1135        call = self._managed_call(
1136            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
1137            None, deadline, augmented_metadata,
1138            None if credentials is None else credentials._credentials,
1139            _stream_unary_invocation_operationses(metadata,
1140                                                  initial_metadata_flags),
1141            event_handler, self._context)
1142        _consume_request_iterator(request_iterator, state, call,
1143                                  self._request_serializer, event_handler)
1144        return _MultiThreadedRendezvous(state, call,
1145                                        self._response_deserializer, deadline)
1146
1147
1148class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
1149
1150    # pylint: disable=too-many-arguments
1151    def __init__(self, channel, managed_call, method, request_serializer,
1152                 response_deserializer):
1153        self._channel = channel
1154        self._managed_call = managed_call
1155        self._method = method
1156        self._request_serializer = request_serializer
1157        self._response_deserializer = response_deserializer
1158        self._context = cygrpc.build_census_context()
1159
1160    def __call__(self,
1161                 request_iterator,
1162                 timeout=None,
1163                 metadata=None,
1164                 credentials=None,
1165                 wait_for_ready=None,
1166                 compression=None):
1167        deadline = _deadline(timeout)
1168        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
1169        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
1170            wait_for_ready)
1171        augmented_metadata = _compression.augment_metadata(
1172            metadata, compression)
1173        operationses = (
1174            (
1175                cygrpc.SendInitialMetadataOperation(augmented_metadata,
1176                                                    initial_metadata_flags),
1177                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
1178            ),
1179            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
1180        )
1181        event_handler = _event_handler(state, self._response_deserializer)
1182        call = self._managed_call(
1183            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
1184            None, _determine_deadline(deadline), augmented_metadata,
1185            None if credentials is None else credentials._credentials,
1186            operationses, event_handler, self._context)
1187        _consume_request_iterator(request_iterator, state, call,
1188                                  self._request_serializer, event_handler)
1189        return _MultiThreadedRendezvous(state, call,
1190                                        self._response_deserializer, deadline)
1191
1192
1193class _InitialMetadataFlags(int):
1194    """Stores immutable initial metadata flags"""
1195
1196    def __new__(cls, value=_EMPTY_FLAGS):
1197        value &= cygrpc.InitialMetadataFlags.used_mask
1198        return super(_InitialMetadataFlags, cls).__new__(cls, value)
1199
1200    def with_wait_for_ready(self, wait_for_ready):
1201        if wait_for_ready is not None:
1202            if wait_for_ready:
1203                return self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \
1204                    cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set)
1205            elif not wait_for_ready:
1206                return self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \
1207                    cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set)
1208        return self
1209
1210
1211class _ChannelCallState(object):
1212
1213    def __init__(self, channel):
1214        self.lock = threading.Lock()
1215        self.channel = channel
1216        self.managed_calls = 0
1217        self.threading = False
1218
1219    def reset_postfork_child(self):
1220        self.managed_calls = 0
1221
1222    def __del__(self):
1223        try:
1224            self.channel.close(cygrpc.StatusCode.cancelled,
1225                               'Channel deallocated!')
1226        except (TypeError, AttributeError):
1227            pass
1228
1229
1230def _run_channel_spin_thread(state):
1231
1232    def channel_spin():
1233        while True:
1234            cygrpc.block_if_fork_in_progress(state)
1235            event = state.channel.next_call_event()
1236            if event.completion_type == cygrpc.CompletionType.queue_timeout:
1237                continue
1238            call_completed = event.tag(event)
1239            if call_completed:
1240                with state.lock:
1241                    state.managed_calls -= 1
1242                    if state.managed_calls == 0:
1243                        return
1244
1245    channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
1246    channel_spin_thread.setDaemon(True)
1247    channel_spin_thread.start()
1248
1249
1250def _channel_managed_call_management(state):
1251
1252    # pylint: disable=too-many-arguments
1253    def create(flags, method, host, deadline, metadata, credentials,
1254               operationses, event_handler, context):
1255        """Creates a cygrpc.IntegratedCall.
1256
1257        Args:
1258          flags: An integer bitfield of call flags.
1259          method: The RPC method.
1260          host: A host string for the created call.
1261          deadline: A float to be the deadline of the created call or None if
1262            the call is to have an infinite deadline.
1263          metadata: The metadata for the call or None.
1264          credentials: A cygrpc.CallCredentials or None.
1265          operationses: An iterable of iterables of cygrpc.Operations to be
1266            started on the call.
1267          event_handler: A behavior to call to handle the events resultant from
1268            the operations on the call.
1269          context: Context object for distributed tracing.
1270        Returns:
1271          A cygrpc.IntegratedCall with which to conduct an RPC.
1272        """
1273        operationses_and_tags = tuple((
1274            operations,
1275            event_handler,
1276        ) for operations in operationses)
1277        with state.lock:
1278            call = state.channel.integrated_call(flags, method, host, deadline,
1279                                                 metadata, credentials,
1280                                                 operationses_and_tags, context)
1281            if state.managed_calls == 0:
1282                state.managed_calls = 1
1283                _run_channel_spin_thread(state)
1284            else:
1285                state.managed_calls += 1
1286            return call
1287
1288    return create
1289
1290
1291class _ChannelConnectivityState(object):
1292
1293    def __init__(self, channel):
1294        self.lock = threading.RLock()
1295        self.channel = channel
1296        self.polling = False
1297        self.connectivity = None
1298        self.try_to_connect = False
1299        self.callbacks_and_connectivities = []
1300        self.delivering = False
1301
1302    def reset_postfork_child(self):
1303        self.polling = False
1304        self.connectivity = None
1305        self.try_to_connect = False
1306        self.callbacks_and_connectivities = []
1307        self.delivering = False
1308
1309
1310def _deliveries(state):
1311    callbacks_needing_update = []
1312    for callback_and_connectivity in state.callbacks_and_connectivities:
1313        callback, callback_connectivity, = callback_and_connectivity
1314        if callback_connectivity is not state.connectivity:
1315            callbacks_needing_update.append(callback)
1316            callback_and_connectivity[1] = state.connectivity
1317    return callbacks_needing_update
1318
1319
1320def _deliver(state, initial_connectivity, initial_callbacks):
1321    connectivity = initial_connectivity
1322    callbacks = initial_callbacks
1323    while True:
1324        for callback in callbacks:
1325            cygrpc.block_if_fork_in_progress(state)
1326            try:
1327                callback(connectivity)
1328            except Exception:  # pylint: disable=broad-except
1329                _LOGGER.exception(
1330                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE)
1331        with state.lock:
1332            callbacks = _deliveries(state)
1333            if callbacks:
1334                connectivity = state.connectivity
1335            else:
1336                state.delivering = False
1337                return
1338
1339
1340def _spawn_delivery(state, callbacks):
1341    delivering_thread = cygrpc.ForkManagedThread(target=_deliver,
1342                                                 args=(
1343                                                     state,
1344                                                     state.connectivity,
1345                                                     callbacks,
1346                                                 ))
1347    delivering_thread.setDaemon(True)
1348    delivering_thread.start()
1349    state.delivering = True
1350
1351
1352# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
1353def _poll_connectivity(state, channel, initial_try_to_connect):
1354    try_to_connect = initial_try_to_connect
1355    connectivity = channel.check_connectivity_state(try_to_connect)
1356    with state.lock:
1357        state.connectivity = (
1358            _common.
1359            CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[connectivity])
1360        callbacks = tuple(
1361            callback for callback, unused_but_known_to_be_none_connectivity in
1362            state.callbacks_and_connectivities)
1363        for callback_and_connectivity in state.callbacks_and_connectivities:
1364            callback_and_connectivity[1] = state.connectivity
1365        if callbacks:
1366            _spawn_delivery(state, callbacks)
1367    while True:
1368        event = channel.watch_connectivity_state(connectivity,
1369                                                 time.time() + 0.2)
1370        cygrpc.block_if_fork_in_progress(state)
1371        with state.lock:
1372            if not state.callbacks_and_connectivities and not state.try_to_connect:
1373                state.polling = False
1374                state.connectivity = None
1375                break
1376            try_to_connect = state.try_to_connect
1377            state.try_to_connect = False
1378        if event.success or try_to_connect:
1379            connectivity = channel.check_connectivity_state(try_to_connect)
1380            with state.lock:
1381                state.connectivity = (
1382                    _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
1383                        connectivity])
1384                if not state.delivering:
1385                    callbacks = _deliveries(state)
1386                    if callbacks:
1387                        _spawn_delivery(state, callbacks)
1388
1389
1390def _subscribe(state, callback, try_to_connect):
1391    with state.lock:
1392        if not state.callbacks_and_connectivities and not state.polling:
1393            polling_thread = cygrpc.ForkManagedThread(
1394                target=_poll_connectivity,
1395                args=(state, state.channel, bool(try_to_connect)))
1396            polling_thread.setDaemon(True)
1397            polling_thread.start()
1398            state.polling = True
1399            state.callbacks_and_connectivities.append([callback, None])
1400        elif not state.delivering and state.connectivity is not None:
1401            _spawn_delivery(state, (callback,))
1402            state.try_to_connect |= bool(try_to_connect)
1403            state.callbacks_and_connectivities.append(
1404                [callback, state.connectivity])
1405        else:
1406            state.try_to_connect |= bool(try_to_connect)
1407            state.callbacks_and_connectivities.append([callback, None])
1408
1409
1410def _unsubscribe(state, callback):
1411    with state.lock:
1412        for index, (subscribed_callback, unused_connectivity) in enumerate(
1413                state.callbacks_and_connectivities):
1414            if callback == subscribed_callback:
1415                state.callbacks_and_connectivities.pop(index)
1416                break
1417
1418
1419def _augment_options(base_options, compression):
1420    compression_option = _compression.create_channel_option(compression)
1421    return tuple(base_options) + compression_option + ((
1422        cygrpc.ChannelArgKey.primary_user_agent_string,
1423        _USER_AGENT,
1424    ),)
1425
1426
1427def _separate_channel_options(options):
1428    """Separates core channel options from Python channel options."""
1429    core_options = []
1430    python_options = []
1431    for pair in options:
1432        if pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream:
1433            python_options.append(pair)
1434        else:
1435            core_options.append(pair)
1436    return python_options, core_options
1437
1438
1439class Channel(grpc.Channel):
1440    """A cygrpc.Channel-backed implementation of grpc.Channel."""
1441
1442    def __init__(self, target, options, credentials, compression):
1443        """Constructor.
1444
1445        Args:
1446          target: The target to which to connect.
1447          options: Configuration options for the channel.
1448          credentials: A cygrpc.ChannelCredentials or None.
1449          compression: An optional value indicating the compression method to be
1450            used over the lifetime of the channel.
1451        """
1452        python_options, core_options = _separate_channel_options(options)
1453        self._single_threaded_unary_stream = _DEFAULT_SINGLE_THREADED_UNARY_STREAM
1454        self._process_python_options(python_options)
1455        self._channel = cygrpc.Channel(
1456            _common.encode(target), _augment_options(core_options, compression),
1457            credentials)
1458        self._call_state = _ChannelCallState(self._channel)
1459        self._connectivity_state = _ChannelConnectivityState(self._channel)
1460        cygrpc.fork_register_channel(self)
1461
1462    def _process_python_options(self, python_options):
1463        """Sets channel attributes according to python-only channel options."""
1464        for pair in python_options:
1465            if pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream:
1466                self._single_threaded_unary_stream = True
1467
1468    def subscribe(self, callback, try_to_connect=None):
1469        _subscribe(self._connectivity_state, callback, try_to_connect)
1470
1471    def unsubscribe(self, callback):
1472        _unsubscribe(self._connectivity_state, callback)
1473
1474    def unary_unary(self,
1475                    method,
1476                    request_serializer=None,
1477                    response_deserializer=None):
1478        return _UnaryUnaryMultiCallable(
1479            self._channel, _channel_managed_call_management(self._call_state),
1480            _common.encode(method), request_serializer, response_deserializer)
1481
1482    def unary_stream(self,
1483                     method,
1484                     request_serializer=None,
1485                     response_deserializer=None):
1486        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
1487        # on a single Python thread results in an appreciable speed-up. However,
1488        # due to slight differences in capability, the multi-threaded variant
1489        # remains the default.
1490        if self._single_threaded_unary_stream:
1491            return _SingleThreadedUnaryStreamMultiCallable(
1492                self._channel, _common.encode(method), request_serializer,
1493                response_deserializer)
1494        else:
1495            return _UnaryStreamMultiCallable(
1496                self._channel,
1497                _channel_managed_call_management(self._call_state),
1498                _common.encode(method), request_serializer,
1499                response_deserializer)
1500
1501    def stream_unary(self,
1502                     method,
1503                     request_serializer=None,
1504                     response_deserializer=None):
1505        return _StreamUnaryMultiCallable(
1506            self._channel, _channel_managed_call_management(self._call_state),
1507            _common.encode(method), request_serializer, response_deserializer)
1508
1509    def stream_stream(self,
1510                      method,
1511                      request_serializer=None,
1512                      response_deserializer=None):
1513        return _StreamStreamMultiCallable(
1514            self._channel, _channel_managed_call_management(self._call_state),
1515            _common.encode(method), request_serializer, response_deserializer)
1516
1517    def _unsubscribe_all(self):
1518        state = self._connectivity_state
1519        if state:
1520            with state.lock:
1521                del state.callbacks_and_connectivities[:]
1522
1523    def _close(self):
1524        self._unsubscribe_all()
1525        self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')
1526        cygrpc.fork_unregister_channel(self)
1527
1528    def _close_on_fork(self):
1529        self._unsubscribe_all()
1530        self._channel.close_on_fork(cygrpc.StatusCode.cancelled,
1531                                    'Channel closed due to fork')
1532
1533    def __enter__(self):
1534        return self
1535
1536    def __exit__(self, exc_type, exc_val, exc_tb):
1537        self._close()
1538        return False
1539
1540    def close(self):
1541        self._close()
1542
1543    def __del__(self):
1544        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
1545        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
1546        # here (or more likely, call self._close() here). We don't do this today
1547        # because many valid use cases today allow the channel to be deleted
1548        # immediately after stubs are created. After a sufficient period of time
1549        # has passed for all users to be trusted to hang out to their channels
1550        # for as long as they are in use and to close them after using them,
1551        # then deletion of this grpc._channel.Channel instance can be made to
1552        # effect closure of the underlying cygrpc.Channel instance.
1553        try:
1554            self._unsubscribe_all()
1555        except:  # pylint: disable=bare-except
1556            # Exceptions in __del__ are ignored by Python anyway, but they can
1557            # keep spamming logs.  Just silence them.
1558            pass
1559