# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Internal utilities for gRPC Python."""

import collections
import threading
import time

import six

import grpc
from grpc import _common
from grpc.framework.foundation import callable_util

_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
    'Exception calling connectivity future "done" callback!')


class RpcMethodHandler(
        collections.namedtuple('_RpcMethodHandler', (
            'request_streaming',
            'response_streaming',
            'request_deserializer',
            'response_serializer',
            'unary_unary',
            'unary_stream',
            'stream_unary',
            'stream_stream',
        )), grpc.RpcMethodHandler):
    """A grpc.RpcMethodHandler whose attributes are namedtuple fields."""


class DictionaryGenericHandler(grpc.ServiceRpcHandler):
    """A service handler that looks up method handlers in a dictionary."""

    def __init__(self, service, method_handlers):
        """Builds the lookup table for one service.

        Args:
          service: The service name; returned unchanged by service_name().
          method_handlers: A mapping from method name to method handler.
            Each key is combined with `service` through
            _common.fully_qualified_method to form the lookup key.
        """
        self._name = service
        self._method_handlers = {
            _common.fully_qualified_method(service, method): method_handler
            for method, method_handler in six.iteritems(method_handlers)
        }

    def service_name(self):
        """Returns the service name given at construction."""
        return self._name

    def service(self, handler_call_details):
        """Returns the handler registered for handler_call_details.method.

        Returns None when no handler is registered under that method.
        """
        return self._method_handlers.get(handler_call_details.method)


class _ChannelReadyFuture(grpc.Future):
    """A grpc.Future that matures when a channel reports READY.

    All state is guarded by self._condition. The future is "done" once it
    is either matured (READY was observed) or cancelled; at that point
    self._done_callbacks is set to None and must not be touched again.
    """

    def __init__(self, channel):
        """Creates the future; call start() to begin watching `channel`."""
        self._condition = threading.Condition()
        self._channel = channel

        self._matured = False
        self._cancelled = False
        self._done_callbacks = []

    def _run_done_callbacks(self, done_callbacks):
        """Invokes each callback with this future as its argument.

        Must be called without holding self._condition so that callbacks
        are free to call back into this future.
        """
        for done_callback in done_callbacks:
            callable_util.call_logging_exceptions(
                done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)

    def _block(self, timeout):
        """Blocks until the future is done or `timeout` seconds elapse.

        Args:
          timeout: Seconds to wait, or None to wait indefinitely.

        Raises:
          grpc.FutureCancelledError: If the future was cancelled.
          grpc.FutureTimeoutError: If the deadline passed first.
        """
        until = None if timeout is None else time.time() + timeout
        with self._condition:
            while True:
                if self._cancelled:
                    raise grpc.FutureCancelledError()
                elif self._matured:
                    return
                else:
                    if until is None:
                        self._condition.wait()
                    else:
                        # Recompute on every pass: a wake-up does not by
                        # itself mean the future is done.
                        remaining = until - time.time()
                        if remaining < 0:
                            raise grpc.FutureTimeoutError()
                        else:
                            self._condition.wait(timeout=remaining)

    def _update(self, connectivity):
        """Connectivity callback registered with the channel by start().

        Matures the future the first time `connectivity` is
        grpc.ChannelConnectivity.READY, then runs the done callbacks.
        """
        with self._condition:
            # Ignore notifications once the future is done. A READY update
            # arriving after maturation (e.g. one already in flight when we
            # unsubscribed) would otherwise hit `tuple(None)` below.
            if (self._cancelled or self._matured or
                    connectivity is not grpc.ChannelConnectivity.READY):
                return
            self._matured = True
            self._channel.unsubscribe(self._update)
            self._condition.notify_all()
            done_callbacks = tuple(self._done_callbacks)
            self._done_callbacks = None

        self._run_done_callbacks(done_callbacks)

    def cancel(self):
        """Cancels the future if it has not matured.

        Returns:
          False if the future had already matured; True if it is now (or
          already was) cancelled. Done callbacks run only on the first
          successful cancellation.
        """
        with self._condition:
            if self._matured:
                return False
            if self._cancelled:
                # Already cancelled: callbacks have run and
                # self._done_callbacks is None, so there is nothing to do.
                return True
            self._cancelled = True
            self._channel.unsubscribe(self._update)
            self._condition.notify_all()
            done_callbacks = tuple(self._done_callbacks)
            self._done_callbacks = None

        self._run_done_callbacks(done_callbacks)

        return True

    def cancelled(self):
        """Returns True if the future was cancelled."""
        with self._condition:
            return self._cancelled

    def running(self):
        """Returns True while the future is neither cancelled nor matured."""
        with self._condition:
            return not self._cancelled and not self._matured

    def done(self):
        """Returns True once the future is cancelled or matured."""
        with self._condition:
            return self._cancelled or self._matured

    def result(self, timeout=None):
        """Blocks until done (see _block for raises); always returns None."""
        self._block(timeout)
        return None

    def exception(self, timeout=None):
        """Blocks until done (see _block for raises); always returns None."""
        self._block(timeout)
        return None

    def traceback(self, timeout=None):
        """Blocks until done (see _block for raises); always returns None."""
        self._block(timeout)
        return None

    def add_done_callback(self, fn):
        """Registers `fn` to be called with this future when it is done.

        If the future is already done, `fn` is called immediately in the
        calling thread, outside the lock; any exception it raises then
        propagates to the caller.
        """
        with self._condition:
            if not self._cancelled and not self._matured:
                self._done_callbacks.append(fn)
                return

        fn(self)

    def start(self):
        """Subscribes to the channel's connectivity and asks it to connect."""
        with self._condition:
            self._channel.subscribe(self._update, try_to_connect=True)

    def __del__(self):
        # Drop the subscription if the future is collected while pending.
        with self._condition:
            if not self._cancelled and not self._matured:
                self._channel.unsubscribe(self._update)


def channel_ready_future(channel):
    """Creates and starts a future that matures when `channel` is READY.

    Args:
      channel: The channel to watch; it must provide subscribe() and
        unsubscribe().

    Returns:
      A started _ChannelReadyFuture for `channel`.
    """
    ready_future = _ChannelReadyFuture(channel)
    ready_future.start()
    return ready_future