# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Internal utilities for gRPC Python."""

import collections
import threading
import time
import logging

import six

import grpc
from grpc import _common

_LOGGER = logging.getLogger(__name__)

_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
    'Exception calling connectivity future "done" callback!')


class RpcMethodHandler(
        collections.namedtuple('_RpcMethodHandler', (
            'request_streaming',
            'response_streaming',
            'request_deserializer',
            'response_serializer',
            'unary_unary',
            'unary_stream',
            'stream_unary',
            'stream_stream',
        )), grpc.RpcMethodHandler):
    """Named-tuple implementation of ``grpc.RpcMethodHandler``.

    The eight fields are positional and immutable; their meaning is the one
    defined by the ``grpc.RpcMethodHandler`` interface.
    """


class DictionaryGenericHandler(grpc.ServiceRpcHandler):
    """Service handler that resolves methods through a dictionary lookup."""

    def __init__(self, service, method_handlers):
        """Index the given handlers by fully-qualified method name.

        Args:
          service: The service name; returned unchanged by service_name().
          method_handlers: A dictionary mapping method names to method
            handlers. Each key is combined with the service name via
            _common.fully_qualified_method to form the lookup key.
        """
        self._name = service
        self._method_handlers = {
            _common.fully_qualified_method(service, method): method_handler
            for method, method_handler in six.iteritems(method_handlers)
        }

    def service_name(self):
        """Return the service name given at construction."""
        return self._name

    def service(self, handler_call_details):
        """Return the handler registered for the called method, if any.

        Args:
          handler_call_details: An object whose ``method`` attribute is the
            lookup key.

        Returns:
          The registered method handler, or None when no handler is
          registered under ``handler_call_details.method``.
        """
        return self._method_handlers.get(handler_call_details.method)


class _ChannelReadyFuture(grpc.Future):
    """Future that matures when the channel reports READY connectivity.

    The future has exactly two terminal states, "matured" and "cancelled",
    both guarded by a single condition variable. Once either state has been
    reached the future unsubscribes from the channel and further transition
    attempts are ignored.
    """

    def __init__(self, channel):
        """Create an unstarted future for the given channel.

        Args:
          channel: The channel to watch. It is used through its subscribe
            and unsubscribe methods only.
        """
        self._condition = threading.Condition()
        self._channel = channel

        self._matured = False
        self._cancelled = False
        # Becomes None once a terminal state has been reached.
        self._done_callbacks = []

    def _block(self, timeout):
        """Wait until the future has matured.

        Args:
          timeout: Maximum number of seconds to wait, or None to wait
            without a deadline.

        Raises:
          grpc.FutureCancelledError: If the future is (or becomes) cancelled.
          grpc.FutureTimeoutError: If the deadline passes first.
        """
        until = None if timeout is None else time.time() + timeout
        with self._condition:
            while True:
                if self._cancelled:
                    raise grpc.FutureCancelledError()
                if self._matured:
                    return
                if until is None:
                    self._condition.wait()
                    continue
                # Recompute on every pass: a wake-up does not imply that the
                # state we are waiting for has been reached.
                remaining = until - time.time()
                if remaining < 0:
                    raise grpc.FutureTimeoutError()
                self._condition.wait(timeout=remaining)

    def _settle_locked(self):
        """Perform the teardown shared by both terminal transitions.

        Must be called with self._condition held, after the caller has set
        the terminal flag. Unsubscribes from the channel, wakes all waiters
        and detaches the registered callbacks.

        Returns:
          A tuple of the callbacks that were registered, to be invoked by the
          caller after the lock has been released.
        """
        self._channel.unsubscribe(self._update)
        self._condition.notify_all()
        done_callbacks = tuple(self._done_callbacks)
        self._done_callbacks = None
        return done_callbacks

    def _invoke_done_callbacks(self, done_callbacks):
        """Call each callback with this future, logging any exception.

        Must be called without self._condition held so that callbacks may
        call back into this future.
        """
        for done_callback in done_callbacks:
            try:
                done_callback(self)
            except Exception:  # pylint: disable=broad-except
                _LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE)

    def _update(self, connectivity):
        """Connectivity callback: mature the future on the first READY.

        Args:
          connectivity: The connectivity value reported by the channel.
        """
        with self._condition:
            # Ignore notifications once a terminal state has been reached.
            # The callback list is None by then, so re-running the
            # transition would raise TypeError.
            if (self._cancelled or self._matured or
                    connectivity is not grpc.ChannelConnectivity.READY):
                return
            self._matured = True
            done_callbacks = self._settle_locked()

        self._invoke_done_callbacks(done_callbacks)

    def cancel(self):
        """Cancel the future unless it has already matured.

        Calling this on an already-cancelled future is a no-op that returns
        True; the done callbacks are only ever run once.

        Returns:
          False if the future had already matured, True otherwise.
        """
        with self._condition:
            if self._matured:
                return False
            if self._cancelled:
                # Already cancelled: the callbacks were detached by the first
                # call, so there is nothing left to do.
                return True
            self._cancelled = True
            done_callbacks = self._settle_locked()

        self._invoke_done_callbacks(done_callbacks)

        return True

    def cancelled(self):
        """Return True if the future has been cancelled."""
        with self._condition:
            return self._cancelled

    def running(self):
        """Return True while the future is neither cancelled nor matured."""
        with self._condition:
            return not self._cancelled and not self._matured

    def done(self):
        """Return True once the future has been cancelled or has matured."""
        with self._condition:
            return self._cancelled or self._matured

    def result(self, timeout=None):
        """Block until matured; returns None. See _block for what is raised."""
        self._block(timeout)

    def exception(self, timeout=None):
        """Block until matured; returns None. See _block for what is raised."""
        self._block(timeout)

    def traceback(self, timeout=None):
        """Block until matured; returns None. See _block for what is raised."""
        self._block(timeout)

    def add_done_callback(self, fn):
        """Register fn to be called with this future when it is done.

        If the future is already done, fn is called immediately on the
        calling thread; in that case an exception raised by fn propagates to
        the caller rather than being logged.

        Args:
          fn: A callable taking this future as its only argument.
        """
        with self._condition:
            if not self._cancelled and not self._matured:
                self._done_callbacks.append(fn)
                return

        fn(self)

    def start(self):
        """Subscribe to the channel's connectivity, asking it to connect."""
        with self._condition:
            self._channel.subscribe(self._update, try_to_connect=True)

    def __del__(self):
        """Unsubscribe from the channel if no terminal state was reached."""
        with self._condition:
            if not self._cancelled and not self._matured:
                self._channel.unsubscribe(self._update)


def channel_ready_future(channel):
    """Create and start a future that matures when channel becomes READY.

    Args:
      channel: The channel whose connectivity is to be watched.

    Returns:
      A started _ChannelReadyFuture for the channel.
    """
    ready_future = _ChannelReadyFuture(channel)
    ready_future.start()
    return ready_future