1# Copyright 2017 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import logging 16import threading 17 18import grpc 19 20_NOT_YET_OBSERVED = object() 21logging.basicConfig() 22_LOGGER = logging.getLogger(__name__) 23 24 25def _cancel(handler): 26 return handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!') 27 28 29def _is_active(handler): 30 return handler.is_active() 31 32 33def _time_remaining(unused_handler): 34 raise NotImplementedError() 35 36 37def _add_callback(handler, callback): 38 return handler.add_callback(callback) 39 40 41def _initial_metadata(handler): 42 return handler.initial_metadata() 43 44 45def _trailing_metadata(handler): 46 trailing_metadata, unused_code, unused_details = handler.termination() 47 return trailing_metadata 48 49 50def _code(handler): 51 unused_trailing_metadata, code, unused_details = handler.termination() 52 return code 53 54 55def _details(handler): 56 unused_trailing_metadata, unused_code, details = handler.termination() 57 return details 58 59 60class _Call(grpc.Call): 61 62 def __init__(self, handler): 63 self._handler = handler 64 65 def cancel(self): 66 _cancel(self._handler) 67 68 def is_active(self): 69 return _is_active(self._handler) 70 71 def time_remaining(self): 72 return _time_remaining(self._handler) 73 74 def add_callback(self, callback): 75 return _add_callback(self._handler, callback) 76 77 def initial_metadata(self): 78 return _initial_metadata(self._handler) 79 80 def trailing_metadata(self): 81 return _trailing_metadata(self._handler) 82 83 def code(self): 84 return _code(self._handler) 85 86 def details(self): 87 return _details(self._handler) 88 89 90class _RpcErrorCall(grpc.RpcError, grpc.Call): 91 92 def __init__(self, handler): 93 self._handler = handler 94 95 def cancel(self): 96 _cancel(self._handler) 97 98 def is_active(self): 99 return _is_active(self._handler) 100 101 def time_remaining(self): 102 return _time_remaining(self._handler) 103 104 def add_callback(self, callback): 105 return _add_callback(self._handler, callback) 106 107 def initial_metadata(self): 108 return _initial_metadata(self._handler) 109 110 def trailing_metadata(self): 111 return _trailing_metadata(self._handler) 112 113 def code(self): 114 return _code(self._handler) 115 116 def details(self): 117 return _details(self._handler) 118 119 120def _next(handler): 121 read = handler.take_response() 122 if read.code is None: 123 return read.response 124 elif read.code is grpc.StatusCode.OK: 125 raise StopIteration() 126 else: 127 raise _RpcErrorCall(handler) 128 129 130class _HandlerExtras(object): 131 132 def __init__(self): 133 self.condition = threading.Condition() 134 self.unary_response = _NOT_YET_OBSERVED 135 self.cancelled = False 136 137 138def _with_extras_cancel(handler, extras): 139 with extras.condition: 140 if handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!'): 141 extras.cancelled = True 142 return True 143 else: 144 return False 145 146 147def _extras_without_cancelled(extras): 148 with extras.condition: 149 return extras.cancelled 150 151 152def _running(handler): 153 return handler.is_active() 154 155 156def _done(handler): 157 return not handler.is_active() 158 159 160def _with_extras_unary_response(handler, extras): 161 with extras.condition: 162 if extras.unary_response is _NOT_YET_OBSERVED: 163 read = handler.take_response() 164 if read.code is None: 165 extras.unary_response = read.response 166 return read.response 167 else: 168 raise _RpcErrorCall(handler) 169 else: 170 return extras.unary_response 171 172 173def _exception(unused_handler): 174 raise NotImplementedError('TODO!') 175 176 177def _traceback(unused_handler): 178 raise NotImplementedError('TODO!') 179 180 181def _add_done_callback(handler, callback, future): 182 adapted_callback = lambda: callback(future) 183 if not handler.add_callback(adapted_callback): 184 callback(future) 185 186 187class _FutureCall(grpc.Future, grpc.Call): 188 189 def __init__(self, handler, extras): 190 self._handler = handler 191 self._extras = extras 192 193 def cancel(self): 194 return _with_extras_cancel(self._handler, self._extras) 195 196 def cancelled(self): 197 return _extras_without_cancelled(self._extras) 198 199 def running(self): 200 return _running(self._handler) 201 202 def done(self): 203 return _done(self._handler) 204 205 def result(self): 206 return _with_extras_unary_response(self._handler, self._extras) 207 208 def exception(self): 209 return _exception(self._handler) 210 211 def traceback(self): 212 return _traceback(self._handler) 213 214 def add_done_callback(self, fn): 215 _add_done_callback(self._handler, fn, self) 216 217 def is_active(self): 218 return _is_active(self._handler) 219 220 def time_remaining(self): 221 return _time_remaining(self._handler) 222 223 def add_callback(self, callback): 224 return _add_callback(self._handler, callback) 225 226 def initial_metadata(self): 227 return _initial_metadata(self._handler) 228 229 def trailing_metadata(self): 230 return _trailing_metadata(self._handler) 231 232 def code(self): 233 return _code(self._handler) 234 235 def details(self): 236 return _details(self._handler) 237 238 239def consume_requests(request_iterator, handler): 240 241 def _consume(): 242 while True: 243 try: 244 request = next(request_iterator) 245 added = handler.add_request(request) 246 if not added: 247 break 248 except StopIteration: 249 handler.close_requests() 250 break 251 except Exception: # pylint: disable=broad-except 252 details = 'Exception iterating requests!' 253 _LOGGER.exception(details) 254 handler.cancel(grpc.StatusCode.UNKNOWN, details) 255 256 consumption = threading.Thread(target=_consume) 257 consumption.start() 258 259 260def blocking_unary_response(handler): 261 read = handler.take_response() 262 if read.code is None: 263 unused_trailing_metadata, code, unused_details = handler.termination() 264 if code is grpc.StatusCode.OK: 265 return read.response 266 else: 267 raise _RpcErrorCall(handler) 268 else: 269 raise _RpcErrorCall(handler) 270 271 272def blocking_unary_response_with_call(handler): 273 read = handler.take_response() 274 if read.code is None: 275 unused_trailing_metadata, code, unused_details = handler.termination() 276 if code is grpc.StatusCode.OK: 277 return read.response, _Call(handler) 278 else: 279 raise _RpcErrorCall(handler) 280 else: 281 raise _RpcErrorCall(handler) 282 283 284def future_call(handler): 285 return _FutureCall(handler, _HandlerExtras()) 286 287 288class ResponseIteratorCall(grpc.Call): 289 290 def __init__(self, handler): 291 self._handler = handler 292 293 def __iter__(self): 294 return self 295 296 def __next__(self): 297 return _next(self._handler) 298 299 def next(self): 300 return _next(self._handler) 301 302 def cancel(self): 303 _cancel(self._handler) 304 305 def is_active(self): 306 return _is_active(self._handler) 307 308 def time_remaining(self): 309 return _time_remaining(self._handler) 310 311 def add_callback(self, callback): 312 return _add_callback(self._handler, callback) 313 314 def initial_metadata(self): 315 return _initial_metadata(self._handler) 316 317 def trailing_metadata(self): 318 return _trailing_metadata(self._handler) 319 320 def code(self): 321 return _code(self._handler) 322 323 def details(self): 324 return _details(self._handler) 325