1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Utilities for the gRPC Python Beta API.""" 15 16import threading 17import time 18 19# implementations is referenced from specification in this module. 20from grpc.beta import implementations # pylint: disable=unused-import 21from grpc.beta import interfaces 22from grpc.framework.foundation import callable_util 23from grpc.framework.foundation import future 24 25_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = ( 26 'Exception calling connectivity future "done" callback!') 27 28 29class _ChannelReadyFuture(future.Future): 30 31 def __init__(self, channel): 32 self._condition = threading.Condition() 33 self._channel = channel 34 35 self._matured = False 36 self._cancelled = False 37 self._done_callbacks = [] 38 39 def _block(self, timeout): 40 until = None if timeout is None else time.time() + timeout 41 with self._condition: 42 while True: 43 if self._cancelled: 44 raise future.CancelledError() 45 elif self._matured: 46 return 47 else: 48 if until is None: 49 self._condition.wait() 50 else: 51 remaining = until - time.time() 52 if remaining < 0: 53 raise future.TimeoutError() 54 else: 55 self._condition.wait(timeout=remaining) 56 57 def _update(self, connectivity): 58 with self._condition: 59 if (not self._cancelled and 60 connectivity is interfaces.ChannelConnectivity.READY): 61 self._matured = True 62 self._channel.unsubscribe(self._update) 63 self._condition.notify_all() 64 done_callbacks = tuple(self._done_callbacks) 65 self._done_callbacks = None 66 else: 67 return 68 69 for done_callback in done_callbacks: 70 callable_util.call_logging_exceptions( 71 done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self) 72 73 def cancel(self): 74 with self._condition: 75 if not self._matured: 76 self._cancelled = True 77 self._channel.unsubscribe(self._update) 78 self._condition.notify_all() 79 done_callbacks = tuple(self._done_callbacks) 80 self._done_callbacks = None 81 else: 82 return False 83 84 for done_callback in done_callbacks: 85 callable_util.call_logging_exceptions( 86 done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self) 87 88 return True 89 90 def cancelled(self): 91 with self._condition: 92 return self._cancelled 93 94 def running(self): 95 with self._condition: 96 return not self._cancelled and not self._matured 97 98 def done(self): 99 with self._condition: 100 return self._cancelled or self._matured 101 102 def result(self, timeout=None): 103 self._block(timeout) 104 return None 105 106 def exception(self, timeout=None): 107 self._block(timeout) 108 return None 109 110 def traceback(self, timeout=None): 111 self._block(timeout) 112 return None 113 114 def add_done_callback(self, fn): 115 with self._condition: 116 if not self._cancelled and not self._matured: 117 self._done_callbacks.append(fn) 118 return 119 120 fn(self) 121 122 def start(self): 123 with self._condition: 124 self._channel.subscribe(self._update, try_to_connect=True) 125 126 def __del__(self): 127 with self._condition: 128 if not self._cancelled and not self._matured: 129 self._channel.unsubscribe(self._update) 130 131 132def channel_ready_future(channel): 133 """Creates a future.Future tracking when an implementations.Channel is ready. 134 135 Cancelling the returned future.Future does not tell the given 136 implementations.Channel to abandon attempts it may have been making to 137 connect; cancelling merely deactivates the return future.Future's 138 subscription to the given implementations.Channel's connectivity. 139 140 Args: 141 channel: An implementations.Channel. 142 143 Returns: 144 A future.Future that matures when the given Channel has connectivity 145 interfaces.ChannelConnectivity.READY. 146 """ 147 ready_future = _ChannelReadyFuture(channel) 148 ready_future.start() 149 return ready_future 150