• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Python."""
15
16import logging
17import sys
18import threading
19import time
20
21import grpc
22from grpc import _common
23from grpc import _grpcio_metadata
24from grpc._cython import cygrpc
25from grpc.framework.foundation import callable_util
26
# NOTE(review): basicConfig() touches the root logger at import time; libraries
# usually leave root-logger configuration to the application -- confirm this is
# intended.
logging.basicConfig()
_LOGGER = logging.getLogger(__name__)

# User-agent string built from the installed grpcio version.
_USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__)

# Flags argument handed to the cygrpc operation constructors used in this
# module (presumably "no flags set" -- confirm against cygrpc).
_EMPTY_FLAGS = 0

# The tuples below seed _RPCState.due: the cygrpc operation types whose
# completion events are expected right after an RPC of each arity is started.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

# Log message used when a connectivity-subscription callback raises.
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    'Exception calling channel subscription callback!')

# repr() template for a _Rendezvous whose RPC terminated with StatusCode.OK;
# placeholders are (status, details).
_OK_RENDEZVOUS_REPR_FORMAT = ('<_Rendezvous of RPC that terminated with:\n'
                              '\tstatus = {}\n'
                              '\tdetails = "{}"\n'
                              '>')

# repr() template for a _Rendezvous whose RPC terminated with a non-OK status;
# placeholders are (status, details, debug_error_string).
_NON_OK_RENDEZVOUS_REPR_FORMAT = ('<_Rendezvous of RPC that terminated with:\n'
                                  '\tstatus = {}\n'
                                  '\tdetails = "{}"\n'
                                  '\tdebug_error_string = "{}"\n'
                                  '>')
74
75
def _deadline(timeout):
    """Turns a relative timeout into an absolute wall-clock deadline.

    Args:
      timeout: A duration in seconds, or None for "no deadline".

    Returns:
      None when timeout is None; otherwise time.time() plus timeout.
    """
    if timeout is None:
        return None
    return time.time() + timeout
78
79
def _unknown_code_details(unknown_cygrpc_code, details):
    """Builds the details text reported for an unrecognized status code.

    Args:
      unknown_cygrpc_code: The status code value that could not be mapped.
      details: The details that accompanied that status.

    Returns:
      A human-readable string mentioning both values.
    """
    template = 'Server sent unknown code {} and details "{}"'
    return template.format(unknown_cygrpc_code, details)
83
84
def _wait_once_until(condition, until):
    """Waits on the condition a single time, bounded by an absolute deadline.

    Args:
      condition: An object with a threading.Condition-style wait method; the
        caller is expected to already hold it.
      until: An absolute time.time()-based deadline, or None to wait without
        a timeout.

    Raises:
      grpc.FutureTimeoutError: If the deadline has already passed.
    """
    if until is None:
        condition.wait()
        return
    time_left = until - time.time()
    if time_left < 0:
        raise grpc.FutureTimeoutError()
    condition.wait(timeout=time_left)
94
95
class _RPCState(object):
    """Mutable bookkeeping for one RPC, shared between threads via `condition`."""

    def __init__(self, due, initial_metadata, trailing_metadata, code, details):
        """Initializes the state.

        Args:
          due: An iterable of cygrpc.OperationType values whose completion
            events are still expected from the RPC's completion queue.
          initial_metadata: The initial metadata, or None if not yet known.
          trailing_metadata: The trailing metadata, or None if not yet known.
          code: The status code, or None while the RPC has not terminated.
          details: The status details, or None while the RPC has not
            terminated.
        """
        self.condition = threading.Condition()

        # Operation types still awaited from the completion queue.
        self.due = set(due)

        # Outcome of the RPC, filled in as events arrive.
        self.initial_metadata = initial_metadata
        self.response = None
        self.trailing_metadata = trailing_metadata
        self.code = code
        self.details = details
        self.debug_error_string = None

        # grpc.Future.cancel / grpc.Future.cancelled have their own semantics,
        # so "cancellation was requested before the RPC terminated" is kept
        # apart from the rest of the result.
        self.cancelled = False

        self.callbacks = []
        self.fork_epoch = cygrpc.get_fork_epoch()

    def reset_postfork_child(self):
        """Replaces the condition with a brand-new threading.Condition."""
        self.condition = threading.Condition()
119
120
def _abort(state, code, details):
    """Marks a not-yet-terminated RPC as terminated with the given status.

    Does nothing when state.code is already set. Otherwise records code and
    details, replaces trailing metadata with an empty tuple, and fills in an
    empty tuple for initial metadata if none had been recorded.

    Args:
      state: The RPC state object to update.
      code: The status code to record.
      details: The status details to record.
    """
    if state.code is not None:
        return
    state.code = code
    state.details = details
    if state.initial_metadata is None:
        state.initial_metadata = ()
    state.trailing_metadata = ()
128
129
def _handle_event(event, state, response_deserializer):
    """Folds the completed operations of one event into the RPC state.

    This function does not acquire state.condition itself; callers that share
    the state between threads take it before calling.

    Args:
      event: An event exposing `batch_operations`, each of which reports its
        cygrpc.OperationType via `type()`.
      state: The _RPCState to update. Each handled operation type is removed
        from state.due.
      response_deserializer: The deserializer applied to a received message,
        passed through to _common.deserialize.

    Returns:
      The list of callbacks to invoke. It is non-empty only when the event
      carried receive_status_on_client, at which point state.callbacks is set
      to None.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(serialized_response,
                                               response_deserializer)
                if response is None:
                    details = 'Exception deserializing response!'
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            # A status recorded earlier (e.g. by _abort) takes precedence over
            # the one delivered with this event.
            if state.code is None:
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code)
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    # Report the raw code that failed to map; the mapped value
                    # is always None on this branch.
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details())
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            callbacks.extend(state.callbacks)
            state.callbacks = None
    return callbacks
163
164
def _event_handler(state, response_deserializer):
    """Creates the per-RPC callable that processes completion-queue events.

    Args:
      state: The _RPCState of the RPC.
      response_deserializer: The deserializer forwarded to _handle_event.

    Returns:
      A one-argument callable taking an event. It updates the state under
      state.condition, wakes all waiters, runs any callbacks released by the
      event outside the condition, and returns a truthy value only when no
      operations remain due and the state's fork epoch is not older than the
      current one.
    """

    def handle_event(event):
        with state.condition:
            released_callbacks = _handle_event(event, state,
                                               response_deserializer)
            state.condition.notify_all()
            nothing_due = not state.due
        # User callbacks run after the condition has been released.
        for released_callback in released_callbacks:
            released_callback()
        return nothing_due and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event
177
178
def _consume_request_iterator(request_iterator, state, call, request_serializer,
                              event_handler):
    """Starts a daemon thread that sends the requests of a streaming RPC.

    The thread pulls requests from request_iterator one at a time, serializes
    each, starts a send-message operation on the call and waits until that
    operation is no longer due before pulling the next request. When the
    iterator is exhausted and the RPC has not terminated, it starts a
    send-close-from-client operation.

    Args:
      request_iterator: The iterator of request values supplied by the user.
      state: The _RPCState of the RPC.
      call: The call object on which operations are started and which is
        cancelled on failure.
      request_serializer: The serializer passed to _common.serialize.
      event_handler: The tag passed to call.operate for each started
        operation (callers in this module pass None for segregated calls).
    """
    # With fork support enabled the wait below uses a 1-second timeout so that
    # block_if_fork_in_progress is reached periodically; otherwise it waits
    # without a timeout.
    if cygrpc.is_fork_support_enabled():
        condition_wait_timeout = 1.0
    else:
        condition_wait_timeout = None

    def consume_request_iterator():  # pylint: disable=too-many-branches
        while True:
            # Tracks whether the except branch already balanced the
            # enter_user_request_generator call, so the finally clause does
            # not do it a second time.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                # Iterator exhausted: fall through to the send-close step.
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = 'Exception iterating requests!'
                _LOGGER.exception(details)
                call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                # NOTE(review): unlike the serialization-failure path below,
                # this _abort runs without holding state.condition -- confirm
                # that is intentional.
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        code = grpc.StatusCode.INTERNAL
                        details = 'Exception serializing request!'
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                        _abort(state, code, details)
                        return
                    else:
                        operations = (cygrpc.SendMessageOperation(
                            serialized_request, _EMPTY_FLAGS),)
                        operating = call.operate(operations, event_handler)
                        if operating:
                            state.due.add(cygrpc.OperationType.send_message)
                        else:
                            # The operation could not be started; stop sending.
                            return
                        # Wait for this message's send to complete (or for the
                        # RPC to terminate) before pulling the next request.
                        while True:
                            state.condition.wait(condition_wait_timeout)
                            cygrpc.block_if_fork_in_progress(state)
                            if state.code is None:
                                if cygrpc.OperationType.send_message not in state.due:
                                    break
                            else:
                                return
                else:
                    # The RPC already terminated or was cancelled.
                    return
        # All requests were sent; half-close the call if it is still running.
        with state.condition:
            if state.code is None:
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
                operating = call.operate(operations, event_handler)
                if operating:
                    state.due.add(cygrpc.OperationType.send_close_from_client)

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator)
    consumption_thread.setDaemon(True)
    consumption_thread.start()
249
250
class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
    """A single object serving as the error, future and call view of an RPC.

    All accessors read the shared RPC state under its condition; those that
    need a value which is not yet available block on that condition.
    """

    def __init__(self, state, call, response_deserializer, deadline):
        """Initializes the rendezvous.

        Args:
          state: The RPC state object shared with the event handlers.
          call: The underlying call object, or None when no call was started.
          response_deserializer: The deserializer used for messages requested
            through iteration.
          deadline: The absolute time.time()-based deadline, or None.
        """
        super(_Rendezvous, self).__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def cancel(self):
        """Cancels the RPC if it has not terminated yet.

        Returns:
          True if this invocation cancelled a still-running RPC; False if the
          RPC had already terminated and nothing was done.
        """
        with self._state.condition:
            if self._state.code is None:
                code = grpc.StatusCode.CANCELLED
                details = 'Locally cancelled by application!'
                self._call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
                self._state.cancelled = True
                _abort(self._state, code, details)
                self._state.condition.notify_all()
                # Report success, per the grpc.Future.cancel contract.
                return True
            return False

    def cancelled(self):
        """Returns whether cancellation was requested before termination."""
        with self._state.condition:
            return self._state.cancelled

    def running(self):
        """Returns True while no status code has been recorded."""
        with self._state.condition:
            return self._state.code is None

    def done(self):
        """Returns True once a status code has been recorded."""
        with self._state.condition:
            return self._state.code is not None

    def result(self, timeout=None):
        """Blocks until termination and returns the response.

        Args:
          timeout: Seconds to wait, or None to wait indefinitely.

        Returns:
          The response stored in the state when the status is OK.

        Raises:
          grpc.FutureTimeoutError: If the timeout elapses first.
          grpc.FutureCancelledError: If the RPC was cancelled.
          _Rendezvous: This object itself, for any other non-OK status.
        """
        until = None if timeout is None else time.time() + timeout
        with self._state.condition:
            while True:
                if self._state.code is None:
                    _wait_once_until(self._state.condition, until)
                elif self._state.code is grpc.StatusCode.OK:
                    return self._state.response
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    raise self

    def exception(self, timeout=None):
        """Blocks until termination and returns the RPC's exception, if any.

        Args:
          timeout: Seconds to wait, or None to wait indefinitely.

        Returns:
          None when the status is OK; otherwise this object.

        Raises:
          grpc.FutureTimeoutError: If the timeout elapses first.
          grpc.FutureCancelledError: If the RPC was cancelled.
        """
        until = None if timeout is None else time.time() + timeout
        with self._state.condition:
            while True:
                if self._state.code is None:
                    _wait_once_until(self._state.condition, until)
                elif self._state.code is grpc.StatusCode.OK:
                    return None
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    return self

    def traceback(self, timeout=None):
        """Blocks until termination and returns a traceback for the failure.

        Args:
          timeout: Seconds to wait, or None to wait indefinitely.

        Returns:
          None when the status is OK; otherwise a traceback object obtained by
          raising and catching this object.

        Raises:
          grpc.FutureTimeoutError: If the timeout elapses first.
          grpc.FutureCancelledError: If the RPC was cancelled.
        """
        until = None if timeout is None else time.time() + timeout
        with self._state.condition:
            while True:
                if self._state.code is None:
                    _wait_once_until(self._state.condition, until)
                elif self._state.code is grpc.StatusCode.OK:
                    return None
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    try:
                        raise self
                    except grpc.RpcError:
                        return sys.exc_info()[2]

    def add_done_callback(self, fn):
        """Arranges for fn(self) to be called when the RPC terminates.

        If the RPC has already terminated, fn is called immediately, outside
        the condition.
        """
        with self._state.condition:
            if self._state.code is None:
                self._state.callbacks.append(lambda: fn(self))
                return

        fn(self)

    def _next(self):
        """Requests one more response message and blocks until it arrives.

        Returns:
          The next response.

        Raises:
          StopIteration: If the RPC terminated with status OK.
          _Rendezvous: This object itself, if the RPC terminated otherwise.
        """
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(self._state,
                                               self._response_deserializer)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler)
                if operating:
                    self._state.due.add(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
            while True:
                self._state.condition.wait()
                if self._state.response is not None:
                    # Hand the message over and clear the slot for the next.
                    response = self._state.response
                    self._state.response = None
                    return response
                elif cygrpc.OperationType.receive_message not in self._state.due:
                    if self._state.code is grpc.StatusCode.OK:
                        raise StopIteration()
                    elif self._state.code is not None:
                        raise self

    def __iter__(self):
        return self

    def __next__(self):
        return self._next()

    def next(self):
        """Python 2 spelling of __next__."""
        return self._next()

    def is_active(self):
        """Returns True while no status code has been recorded."""
        with self._state.condition:
            return self._state.code is None

    def time_remaining(self):
        """Returns seconds left before the deadline (never negative), or None
        when there is no deadline."""
        if self._deadline is None:
            return None
        else:
            return max(self._deadline - time.time(), 0)

    def add_callback(self, callback):
        """Registers a zero-argument callback to run at termination.

        Returns:
          True if the callback was registered; False if the callback list has
          already been released.
        """
        with self._state.condition:
            if self._state.callbacks is None:
                return False
            else:
                self._state.callbacks.append(callback)
                return True

    def initial_metadata(self):
        """Blocks until initial metadata is available and returns it."""
        with self._state.condition:
            while self._state.initial_metadata is None:
                self._state.condition.wait()
            return self._state.initial_metadata

    def trailing_metadata(self):
        """Blocks until trailing metadata is available and returns it."""
        with self._state.condition:
            while self._state.trailing_metadata is None:
                self._state.condition.wait()
            return self._state.trailing_metadata

    def code(self):
        """Blocks until a status code is available and returns it."""
        with self._state.condition:
            while self._state.code is None:
                self._state.condition.wait()
            return self._state.code

    def details(self):
        """Blocks until status details are available and returns them
        decoded via _common.decode."""
        with self._state.condition:
            while self._state.details is None:
                self._state.condition.wait()
            return _common.decode(self._state.details)

    def debug_error_string(self):
        """Blocks until a debug error string is available and returns it
        decoded via _common.decode."""
        with self._state.condition:
            while self._state.debug_error_string is None:
                self._state.condition.wait()
            return _common.decode(self._state.debug_error_string)

    def _repr(self):
        """Builds the text shared by __repr__ and __str__."""
        with self._state.condition:
            if self._state.code is None:
                return '<_Rendezvous object of in-flight RPC>'
            elif self._state.code is grpc.StatusCode.OK:
                return _OK_RENDEZVOUS_REPR_FORMAT.format(
                    self._state.code, self._state.details)
            else:
                return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
                    self._state.code, self._state.details,
                    self._state.debug_error_string)

    def __repr__(self):
        return self._repr()

    def __str__(self):
        return self._repr()

    def __del__(self):
        # Cancel an RPC that is still in flight when the last reference to
        # this object goes away.
        with self._state.condition:
            if self._state.code is None:
                self._state.code = grpc.StatusCode.CANCELLED
                self._state.details = 'Cancelled upon garbage collection!'
                self._state.cancelled = True
                self._call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
                    self._state.details)
                self._state.condition.notify_all()
445
446
def _start_unary_request(request, timeout, request_serializer):
    """Computes the deadline and serializes the single request of an RPC.

    Args:
      request: The request value.
      timeout: A duration in seconds, or None.
      request_serializer: The serializer passed to _common.serialize.

    Returns:
      A (deadline, serialized_request, rendezvous) triple. On success the
      rendezvous is None. If serialization yields None, the serialized request
      is None and the rendezvous is an already-terminated _Rendezvous carrying
      StatusCode.INTERNAL.
    """
    deadline = _deadline(timeout)
    serialized_request = _common.serialize(request, request_serializer)
    if serialized_request is not None:
        return deadline, serialized_request, None
    failed_state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,
                             'Exception serializing request!')
    failure = _Rendezvous(failed_state, None, None, deadline)
    return deadline, None, failure
457
458
def _end_unary_response_blocking(state, call, with_call, deadline):
    """Converts the final state of a blocking RPC into a return value or error.

    Args:
      state: The terminated RPC state.
      call: The underlying call object.
      with_call: Whether the caller also wants a grpc.Call for the RPC.
      deadline: The deadline handed to any _Rendezvous created here.

    Returns:
      The response, or a (response, _Rendezvous) pair when with_call is truthy.

    Raises:
      _Rendezvous: If the RPC terminated with a status other than OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _Rendezvous(state, None, None, deadline)
    if not with_call:
        return state.response
    rendezvous = _Rendezvous(state, call, None, deadline)
    return state.response, rendezvous
468
469
def _stream_unary_invocation_operationses(metadata):
    """Builds the two operation batches that open a stream-unary RPC.

    Args:
      metadata: The metadata for the send-initial-metadata operation.

    Returns:
      A pair of tuples: the first sends initial metadata and receives the
      message and the status; the second receives initial metadata.
    """
    opening_batch = (
        cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    initial_metadata_batch = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
    return (opening_batch, initial_metadata_batch)
479
480
def _stream_unary_invocation_operationses_and_tags(metadata):
    """Pairs each stream-unary opening batch with a None tag.

    Args:
      metadata: The metadata for the send-initial-metadata operation.

    Returns:
      A tuple of (operations, None) pairs, one per batch produced by
      _stream_unary_invocation_operationses.
    """
    batches = _stream_unary_invocation_operationses(metadata)
    return tuple((batch, None) for batch in batches)
486
487
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Invokes a unary-request, unary-response method on a channel."""

    def __init__(self, channel, managed_call, method, request_serializer,
                 response_deserializer):
        """Stores the channel, the managed-call factory, the method and the
        (de)serializers used by every invocation."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer

    def _prepare(self, request, timeout, metadata):
        """Serializes the request and builds the state and operations.

        Returns:
          (state, operations, deadline, None) on success, or
          (None, None, None, rendezvous) when the request failed to serialize.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer)
        if serialized_request is None:
            return None, None, None, rendezvous
        state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
        operations = (
            cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        return state, operations, deadline, None

    def _blocking(self, request, timeout, metadata, credentials):
        """Runs the RPC on a segregated call and waits for its single event.

        Returns:
          A (state, call) pair.

        Raises:
          _Rendezvous: If the request failed to serialize.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata)
        if state is None:
            raise rendezvous
        call_credentials = (None if credentials is None else
                            credentials._credentials)
        operationses_and_tags = ((operations, None),)
        call = self._channel.segregated_call(0, self._method, None, deadline,
                                             metadata, call_credentials,
                                             operationses_and_tags)
        event = call.next_event()
        _handle_event(event, state, self._response_deserializer)
        return state, call

    def __call__(self, request, timeout=None, metadata=None, credentials=None):
        """Invokes the RPC synchronously and returns the response."""
        state, call = self._blocking(request, timeout, metadata, credentials)
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(self, request, timeout=None, metadata=None, credentials=None):
        """Invokes the RPC synchronously and returns (response, call)."""
        state, call = self._blocking(request, timeout, metadata, credentials)
        return _end_unary_response_blocking(state, call, True, None)

    def future(self, request, timeout=None, metadata=None, credentials=None):
        """Invokes the RPC asynchronously and returns its _Rendezvous.

        Raises:
          _Rendezvous: If the request failed to serialize.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata)
        if state is None:
            raise rendezvous
        event_handler = _event_handler(state, self._response_deserializer)
        call_credentials = (None if credentials is None else
                            credentials._credentials)
        call = self._managed_call(0, self._method, None, deadline, metadata,
                                  call_credentials, (operations,),
                                  event_handler)
        return _Rendezvous(state, call, self._response_deserializer, deadline)
552
553
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-request, stream-response method on a channel."""

    def __init__(self, channel, managed_call, method, request_serializer,
                 response_deserializer):
        """Stores the channel, the managed-call factory, the method and the
        (de)serializers used by every invocation."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer

    def __call__(self, request, timeout=None, metadata=None, credentials=None):
        """Starts the RPC and returns a _Rendezvous to iterate responses from.

        Raises:
          _Rendezvous: If the request failed to serialize.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer)
        if serialized_request is None:
            raise rendezvous
        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        send_and_status_batch = (
            cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
        operationses = (send_and_status_batch, initial_metadata_batch)
        event_handler = _event_handler(state, self._response_deserializer)
        call_credentials = (None if credentials is None else
                            credentials._credentials)
        call = self._managed_call(0, self._method, None, deadline, metadata,
                                  call_credentials, operationses,
                                  event_handler)
        return _Rendezvous(state, call, self._response_deserializer, deadline)
588
589
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Invokes a stream-request, unary-response method on a channel."""

    def __init__(self, channel, managed_call, method, request_serializer,
                 response_deserializer):
        """Stores the channel, the managed-call factory, the method and the
        (de)serializers used by every invocation."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer

    def _blocking(self, request_iterator, timeout, metadata, credentials):
        """Runs the RPC on a segregated call until no operations remain due.

        Returns:
          A (state, call) pair.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        call_credentials = (None if credentials is None else
                            credentials._credentials)
        call = self._channel.segregated_call(
            0, self._method, None, deadline, metadata, call_credentials,
            _stream_unary_invocation_operationses_and_tags(metadata))
        _consume_request_iterator(request_iterator, state, call,
                                  self._request_serializer, None)
        # Drain the call's events on this thread; the request-consuming thread
        # is woken through the condition after each one.
        finished = False
        while not finished:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                state.condition.notify_all()
                finished = not state.due
        return state, call

    def __call__(self,
                 request_iterator,
                 timeout=None,
                 metadata=None,
                 credentials=None):
        """Invokes the RPC synchronously and returns the response."""
        state, call = self._blocking(request_iterator, timeout, metadata,
                                     credentials)
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(self,
                  request_iterator,
                  timeout=None,
                  metadata=None,
                  credentials=None):
        """Invokes the RPC synchronously and returns (response, call)."""
        state, call = self._blocking(request_iterator, timeout, metadata,
                                     credentials)
        return _end_unary_response_blocking(state, call, True, None)

    def future(self,
               request_iterator,
               timeout=None,
               metadata=None,
               credentials=None):
        """Invokes the RPC asynchronously and returns its _Rendezvous."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        call_credentials = (None if credentials is None else
                            credentials._credentials)
        call = self._managed_call(
            0, self._method, None, deadline, metadata, call_credentials,
            _stream_unary_invocation_operationses(metadata), event_handler)
        _consume_request_iterator(request_iterator, state, call,
                                  self._request_serializer, event_handler)
        return _Rendezvous(state, call, self._response_deserializer, deadline)
651
652
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Invokes a stream-request, stream-response method on a channel."""

    def __init__(self, channel, managed_call, method, request_serializer,
                 response_deserializer):
        """Stores the channel, the managed-call factory, the method and the
        (de)serializers used by every invocation."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer

    def __call__(self,
                 request_iterator,
                 timeout=None,
                 metadata=None,
                 credentials=None):
        """Starts the RPC and returns a _Rendezvous to iterate responses from."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        metadata_and_status_batch = (
            cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
        operationses = (metadata_and_status_batch, initial_metadata_batch)
        event_handler = _event_handler(state, self._response_deserializer)
        call_credentials = (None if credentials is None else
                            credentials._credentials)
        call = self._managed_call(0, self._method, None, deadline, metadata,
                                  call_credentials, operationses,
                                  event_handler)
        _consume_request_iterator(request_iterator, state, call,
                                  self._request_serializer, event_handler)
        return _Rendezvous(state, call, self._response_deserializer, deadline)
685
686
class _ChannelCallState(object):
    """Lock-guarded count of the managed calls in flight on a channel."""

    def __init__(self, channel):
        """Initializes the state with no managed calls.

        Args:
          channel: The channel object whose calls are being tracked.
        """
        self.channel = channel
        self.lock = threading.Lock()
        self.managed_calls = 0
        self.threading = False

    def reset_postfork_child(self):
        """Resets the managed-call count to zero."""
        self.managed_calls = 0
697
698
def _run_channel_spin_thread(state):
    """Starts a daemon thread that dispatches the channel's call events.

    The thread repeatedly fetches the next call event from state.channel,
    ignores queue timeouts, and invokes the event's tag with the event. Each
    time a tag reports that its call completed, the managed-call count is
    decremented under state.lock; the thread exits when the count reaches
    zero.

    Args:
      state: The channel call state exposing `channel`, `lock` and
        `managed_calls`.
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            timed_out = (
                event.completion_type == cygrpc.CompletionType.queue_timeout)
            # The tag is only invoked for events that are not timeouts.
            if timed_out or not event.tag(event):
                continue
            with state.lock:
                state.managed_calls -= 1
                if state.managed_calls == 0:
                    return

    spin_thread = cygrpc.ForkManagedThread(target=channel_spin)
    spin_thread.setDaemon(True)
    spin_thread.start()
717
718
def _channel_managed_call_management(state):
    """Returns the factory used to create managed calls on a channel.

    Args:
      state: The channel call state exposing `channel`, `lock` and
        `managed_calls`.

    Returns:
      The `create` function defined below.
    """

    # pylint: disable=too-many-arguments
    def create(flags, method, host, deadline, metadata, credentials,
               operationses, event_handler):
        """Creates a cygrpc.IntegratedCall.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: A float to be the deadline of the created call or None if
            the call is to have an infinite deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operationses: An iterable of iterables of cygrpc.Operations to be
            started on the call.
          event_handler: A behavior to call to handle the events resultant from
            the operations on the call.

        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        # Every batch of operations is tagged with the same event handler.
        operationses_and_tags = tuple(
            (operations, event_handler) for operations in operationses)
        with state.lock:
            call = state.channel.integrated_call(flags, method, host, deadline,
                                                 metadata, credentials,
                                                 operationses_and_tags)
            # The first managed call also starts the thread that services
            # the channel's call events.
            is_first_call = state.managed_calls == 0
            state.managed_calls += 1
            if is_first_call:
                _run_channel_spin_thread(state)
            return call

    return create
758
759
class _ChannelConnectivityState(object):
    """Mutable bookkeeping for connectivity polling and callback delivery."""

    def __init__(self, channel):
        """Stores the channel and a fresh lock, then sets the initial fields.

        Args:
          channel: The channel whose connectivity is to be tracked.
        """
        self.lock = threading.RLock()
        self.channel = channel
        # The remaining fields start out exactly as they are after a reset.
        self.reset_postfork_child()

    def reset_postfork_child(self):
        """Restores every field except lock and channel to its initial value."""
        self.delivering = False
        self.callbacks_and_connectivities = []
        self.try_to_connect = False
        self.connectivity = None
        self.polling = False
777
778
def _deliveries(state):
    """Collects callbacks whose recorded connectivity is out of date.

    Each [callback, connectivity] entry whose recorded connectivity is not the
    same object as state.connectivity is updated in place to the current
    value.

    Args:
      state: An object with connectivity and callbacks_and_connectivities
        attributes.

    Returns:
      A list of the callbacks whose entries were updated, in subscription
        order.
    """
    stale_callbacks = []
    for entry in state.callbacks_and_connectivities:
        callback, recorded_connectivity = entry
        if recorded_connectivity is state.connectivity:
            continue
        stale_callbacks.append(callback)
        entry[1] = state.connectivity
    return stale_callbacks
787
788
def _deliver(state, initial_connectivity, initial_callbacks):
    """Invokes callbacks with connectivity until no callback is out of date.

    After each round of invocations, the callbacks that _deliveries reports
    as needing an update are delivered the then-current state.connectivity.
    When none remain, state.delivering is cleared and the function returns.

    Args:
      state: The channel connectivity state.
      initial_connectivity: The connectivity passed to the first round of
        callbacks.
      initial_callbacks: The callbacks invoked in the first round.
    """
    pending_callbacks = initial_callbacks
    current_connectivity = initial_connectivity
    while True:
        for pending_callback in pending_callbacks:
            cygrpc.block_if_fork_in_progress(state)
            # Exceptions raised by a callback are handled by the helper.
            callable_util.call_logging_exceptions(
                pending_callback,
                _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE,
                current_connectivity)
        with state.lock:
            pending_callbacks = _deliveries(state)
            if not pending_callbacks:
                state.delivering = False
                return
            current_connectivity = state.connectivity
805
806
def _spawn_delivery(state, callbacks):
    """Starts a thread running _deliver and marks the state as delivering.

    Args:
      state: The channel connectivity state.
      callbacks: The callbacks to be passed the current state.connectivity.
    """
    delivery_args = (state, state.connectivity, callbacks)
    worker = cygrpc.ForkManagedThread(target=_deliver, args=delivery_args)
    worker.start()
    state.delivering = True
816
817
818# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(state, channel, initial_try_to_connect):
    """Polls the channel's connectivity and triggers callback deliveries.

    Runs until there are neither subscribed callbacks nor a pending request
    to connect, at which point state.polling and state.connectivity are reset
    and the function returns.

    Args:
      state: The channel connectivity state shared with subscribers.
      channel: The channel whose connectivity is checked and watched.
      initial_try_to_connect: Passed to the first connectivity check.
    """
    raw_connectivity = channel.check_connectivity_state(initial_try_to_connect)
    with state.lock:
        state.connectivity = (
            _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                raw_connectivity])
        subscribers = tuple(
            subscriber
            for subscriber, unused_but_known_to_be_none_connectivity in
            state.callbacks_and_connectivities)
        for entry in state.callbacks_and_connectivities:
            entry[1] = state.connectivity
        if subscribers:
            _spawn_delivery(state, subscribers)
    while True:
        # Wait at most 0.2 seconds for a change from the last known value.
        event = channel.watch_connectivity_state(raw_connectivity,
                                                 time.time() + 0.2)
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            if not (state.callbacks_and_connectivities or
                    state.try_to_connect):
                state.polling = False
                state.connectivity = None
                return
            # Consume any connection request made since the last pass.
            connect_requested = state.try_to_connect
            state.try_to_connect = False
        if not (event.success or connect_requested):
            continue
        raw_connectivity = channel.check_connectivity_state(connect_requested)
        with state.lock:
            state.connectivity = (
                _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
                    raw_connectivity])
            if state.delivering:
                continue
            stale_callbacks = _deliveries(state)
            if stale_callbacks:
                _spawn_delivery(state, stale_callbacks)
854
855
def _moot(state):
    """Empties the list of subscribed callbacks in place, under the lock."""
    with state.lock:
        state.callbacks_and_connectivities[:] = []
859
860
def _subscribe(state, callback, try_to_connect):
    """Registers callback to be told about channel connectivity.

    Args:
      state: The channel connectivity state.
      callback: The behavior to subscribe.
      try_to_connect: A value whose truthiness says whether a connection
        attempt is requested.
    """
    connect_requested = bool(try_to_connect)
    with state.lock:
        if not (state.callbacks_and_connectivities or state.polling):
            # First subscriber with no poller running: start the poller.
            poller = cygrpc.ForkManagedThread(
                target=_poll_connectivity,
                args=(state, state.channel, connect_requested))
            poller.setDaemon(True)
            poller.start()
            state.polling = True
            state.callbacks_and_connectivities.append([callback, None])
            return
        if state.delivering or state.connectivity is None:
            # Recorded as None so that a later delivery picks it up.
            recorded_connectivity = None
        else:
            _spawn_delivery(state, (callback,))
            recorded_connectivity = state.connectivity
        state.try_to_connect |= connect_requested
        state.callbacks_and_connectivities.append(
            [callback, recorded_connectivity])
879
880
def _unsubscribe(state, callback):
    """Removes the first subscription whose callback equals callback.

    Does nothing when no subscribed callback compares equal.

    Args:
      state: The channel connectivity state.
      callback: The behavior to unsubscribe.
    """
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        position = next(
            (index
             for index, (subscribed_callback,
                         unused_connectivity) in enumerate(subscriptions)
             if callback == subscribed_callback), None)
        if position is not None:
            del subscriptions[position]
888
889
def _options(options):
    """Returns options as a new list with the user agent argument appended.

    Args:
      options: An iterable of channel options.

    Returns:
      A list holding the given options followed by the primary user agent
        string option.
    """
    augmented = list(options)
    augmented.append((
        cygrpc.ChannelArgKey.primary_user_agent_string,
        _USER_AGENT,
    ))
    return augmented
897
898
class Channel(grpc.Channel):
    """A cygrpc.Channel-backed implementation of grpc.Channel."""

    def __init__(self, target, options, credentials):
        """Constructor.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel.
          credentials: A cygrpc.ChannelCredentials or None.
        """
        self._channel = cygrpc.Channel(
            _common.encode(target), _options(options), credentials)
        self._call_state = _ChannelCallState(self._channel)
        self._connectivity_state = _ChannelConnectivityState(self._channel)
        # Registration is deliberately the last step: __del__ relies on the
        # attributes above existing whenever the channel has been registered.
        cygrpc.fork_register_channel(self)

    def subscribe(self, callback, try_to_connect=None):
        """Subscribes callback to this channel's connectivity updates.

        Args:
          callback: The behavior to be called with connectivity values.
          try_to_connect: A value whose truthiness says whether a connection
            attempt is requested.
        """
        _subscribe(self._connectivity_state, callback, try_to_connect)

    def unsubscribe(self, callback):
        """Removes a previously subscribed connectivity callback.

        Args:
          callback: The behavior to unsubscribe.
        """
        _unsubscribe(self._connectivity_state, callback)

    def unary_unary(self,
                    method,
                    request_serializer=None,
                    response_deserializer=None):
        """Creates a unary-unary multi-callable for the given method."""
        return _UnaryUnaryMultiCallable(
            self._channel, _channel_managed_call_management(self._call_state),
            _common.encode(method), request_serializer, response_deserializer)

    def unary_stream(self,
                     method,
                     request_serializer=None,
                     response_deserializer=None):
        """Creates a unary-stream multi-callable for the given method."""
        return _UnaryStreamMultiCallable(
            self._channel, _channel_managed_call_management(self._call_state),
            _common.encode(method), request_serializer, response_deserializer)

    def stream_unary(self,
                     method,
                     request_serializer=None,
                     response_deserializer=None):
        """Creates a stream-unary multi-callable for the given method."""
        return _StreamUnaryMultiCallable(
            self._channel, _channel_managed_call_management(self._call_state),
            _common.encode(method), request_serializer, response_deserializer)

    def stream_stream(self,
                      method,
                      request_serializer=None,
                      response_deserializer=None):
        """Creates a stream-stream multi-callable for the given method."""
        return _StreamStreamMultiCallable(
            self._channel, _channel_managed_call_management(self._call_state),
            _common.encode(method), request_serializer, response_deserializer)

    def _close(self):
        """Closes the underlying channel and drops all subscriptions."""
        self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')
        _moot(self._connectivity_state)

    def _close_on_fork(self):
        """Closes the underlying channel for a fork, dropping subscriptions."""
        self._channel.close_on_fork(cygrpc.StatusCode.cancelled,
                                    'Channel closed due to fork')
        _moot(self._connectivity_state)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._close()
        # Never suppress an exception raised inside the with-block.
        return False

    def close(self):
        """Closes this channel."""
        self._close()

    def __del__(self):
        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
        # here (or more likely, call self._close() here). We don't do this today
        # because many valid use cases today allow the channel to be deleted
        # immediately after stubs are created. After a sufficient period of time
        # has passed for all users to be trusted to hang on to their channels
        # for as long as they are in use and to close them after using them,
        # then deletion of this grpc._channel.Channel instance can be made to
        # effect closure of the underlying cygrpc.Channel instance.
        #
        # __del__ also runs for instances whose __init__ raised part-way
        # through. In that case the connectivity state was never created and
        # the channel was never registered (registration is the last step of
        # __init__), so there is nothing to undo and touching the missing
        # attribute would only raise from inside the finalizer.
        connectivity_state = getattr(self, '_connectivity_state', None)
        if connectivity_state is None:
            return
        cygrpc.fork_unregister_channel(self)
        _moot(connectivity_state)
985