# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cimport cpython

import threading
import time

_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
    'Internal gRPC call error %d. ' +
    'Please report to https://github.com/grpc/grpc/issues')


cdef str _call_error_metadata(metadata):
  """Returns the error message used when a call's metadata was rejected."""
  return 'metadata was invalid: %s' % metadata


cdef str _call_error_no_metadata(c_call_error):
  """Returns the generic internal-error message for a call error code."""
  return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error


cdef str _call_error(c_call_error, metadata):
  """Returns the message for a call error code.

  GRPC_CALL_ERROR_INVALID_METADATA yields a message quoting the metadata; any
  other code yields the generic internal-error message.
  """
  if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA:
    return _call_error_metadata(metadata)
  else:
    return _call_error_no_metadata(c_call_error)


cdef _check_call_error_no_metadata(c_call_error):
  """Returns an error message for c_call_error, or None if it is GRPC_CALL_OK."""
  if c_call_error != GRPC_CALL_OK:
    # Reuse the shared formatter so the message is defined in one place.
    return _call_error_no_metadata(c_call_error)
  else:
    return None


cdef _check_and_raise_call_error_no_metadata(c_call_error):
  """Raises ValueError unless c_call_error is GRPC_CALL_OK."""
  error = _check_call_error_no_metadata(c_call_error)
  if error is not None:
    raise ValueError(error)


cdef _check_call_error(c_call_error, metadata):
  """Returns an error message for c_call_error, or None if it is GRPC_CALL_OK.

  GRPC_CALL_ERROR_INVALID_METADATA yields a message quoting the metadata.
  """
  if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA:
    return _call_error_metadata(metadata)
  else:
    return _check_call_error_no_metadata(c_call_error)


cdef void _raise_call_error_no_metadata(c_call_error) except *:
  """Unconditionally raises ValueError with the generic call-error message."""
  raise ValueError(_call_error_no_metadata(c_call_error))


cdef void _raise_call_error(c_call_error, metadata) except *:
  """Unconditionally raises ValueError with the message for c_call_error."""
  raise ValueError(_call_error(c_call_error, metadata))


cdef _destroy_c_completion_queue(grpc_completion_queue *c_completion_queue):
  """Shuts down and then destroys the given completion queue."""
  grpc_completion_queue_shutdown(c_completion_queue)
  grpc_completion_queue_destroy(c_completion_queue)


cdef class _CallState:
  """Per-call state: the set of tags of batches that are still outstanding."""

  def __cinit__(self):
    self.due = set()


cdef class _ChannelState:
  """Shared state of a Channel; mutated while holding `condition`."""

  def __cinit__(self):
    self.condition = threading.Condition()
    self.open = True
    # Maps each outstanding tag of an integrated call to its _CallState.
    self.integrated_call_states = {}
    # _CallStates of segregated calls that still have outstanding tags.
    self.segregated_call_states = set()
    # Tags of connectivity watches that have not completed yet.
    self.connectivity_due = set()
    self.closed_reason = None


cdef tuple _operate(grpc_call *c_call, object operations, object user_tag):
  """Starts a batch of operations on c_call.

  Returns:
    A (grpc_call_error, _BatchOperationTag) pair. The tag is returned even when
    the batch failed to start so that callers can inspect the error code first.
  """
  cdef grpc_call_error c_call_error
  cdef _BatchOperationTag tag = _BatchOperationTag(user_tag, operations, None)
  tag.prepare()
  # The tag is handed to gRPC Core as a raw pointer, so take an extra reference
  # to keep it alive while only Core knows about it. That reference is
  # presumably released when the tag comes back out of the completion queue
  # (handled outside this function).
  cpython.Py_INCREF(tag)
  with nogil:
    c_call_error = grpc_call_start_batch(
        c_call, tag.c_ops, tag.c_nops, <cpython.PyObject *>tag, NULL)
  if c_call_error != GRPC_CALL_OK:
    # A batch that failed to start is assumed never to be delivered through the
    # completion queue, so nothing else would ever release the extra reference
    # taken above; drop it here to avoid leaking the tag.
    # NOTE(review): any C-level resources allocated by tag.prepare() are not
    # released here; confirm whether _BatchOperationTag cleans them up.
    cpython.Py_DECREF(tag)
  return c_call_error, tag


cdef object _operate_from_integrated_call(
    _ChannelState channel_state, _CallState call_state, object operations,
    object user_tag):
  """Starts another batch on an integrated call that is still in progress.

  Returns:
    True if the batch was started; False if the call has no outstanding tags
    (in which case nothing is started).

  Raises:
    ValueError: If gRPC Core refuses to start the batch.
  """
  cdef grpc_call_error c_call_error
  cdef _BatchOperationTag tag
  with channel_state.condition:
    if call_state.due:
      c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
      if c_call_error == GRPC_CALL_OK:
        call_state.due.add(tag)
        channel_state.integrated_call_states[tag] = call_state
        return True
      else:
        _raise_call_error_no_metadata(c_call_error)
    else:
      return False


cdef object _operate_from_segregated_call(
    _ChannelState channel_state, _CallState call_state, object operations,
    object user_tag):
  """Starts another batch on a segregated call that is still in progress.

  Returns:
    True if the batch was started; False if the call has no outstanding tags
    (in which case nothing is started).

  Raises:
    ValueError: If gRPC Core refuses to start the batch.
  """
  cdef grpc_call_error c_call_error
  cdef _BatchOperationTag tag
  with channel_state.condition:
    if call_state.due:
      c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
      if c_call_error == GRPC_CALL_OK:
        call_state.due.add(tag)
        return True
      else:
        _raise_call_error_no_metadata(c_call_error)
    else:
      return False


cdef _cancel(
    _ChannelState channel_state, _CallState call_state, grpc_status_code code,
    str details):
  """Cancels the call with the given status if it still has outstanding tags.

  Raises:
    ValueError: If gRPC Core reports an error for the cancellation.
  """
  cdef grpc_call_error c_call_error
  with channel_state.condition:
    if call_state.due:
      c_call_error = grpc_call_cancel_with_status(
          call_state.c_call, code, _encode(details), NULL)
      _check_and_raise_call_error_no_metadata(c_call_error)


cdef _next_call_event(
    _ChannelState channel_state, grpc_completion_queue *c_completion_queue,
    on_success, deadline):
  """Fetches the next event from a completion queue and processes its tag.

  on_success is invoked with the event's tag while holding the channel
  condition, after which all waiters on the condition are notified.
  """
  tag, event = _latent_event(c_completion_queue, deadline)
  with channel_state.condition:
    on_success(tag)
    channel_state.condition.notify_all()
  return event


# TODO(https://github.com/grpc/grpc/issues/14569): This could be a lot simpler.
cdef void _call(
    _ChannelState channel_state, _CallState call_state,
    grpc_completion_queue *c_completion_queue, on_success, int flags, method,
    host, object deadline, CallCredentials credentials,
    object operationses_and_user_tags, object metadata) except *:
  """Invokes an RPC.

  Args:
    channel_state: A _ChannelState with its "open" attribute set to True. RPCs
      may not be invoked on a closed channel.
    call_state: An empty _CallState to be altered (specifically assigned a
      c_call and having its due set populated) if the RPC invocation is
      successful.
    c_completion_queue: A grpc_completion_queue to be used for the call's
      operations.
    on_success: A behavior to be called if attempting to start operations for
      the call succeeds. If called the behavior will be called while holding the
      channel_state condition and passed the tags associated with operations
      that were successfully started for the call.
    flags: Flags to be passed to gRPC Core as part of call creation.
    method: The fully-qualified name of the RPC method being invoked.
    host: A "host" string to be passed to gRPC Core as part of call creation.
    deadline: A float for the deadline of the RPC, or None if the RPC is to have
      no deadline.
    credentials: A CallCredentials for the RPC or None.
    operationses_and_user_tags: A sequence of length-two sequences the first
      element of which is a sequence of Operations and the second element of
      which is an object to be used as a tag. A SendInitialMetadataOperation
      must be present in the first element of this value.
    metadata: The metadata for this call.

  Raises:
    ValueError: If the channel is closed, if setting the call credentials
      fails, or if any batch of operations fails to start.
  """
  cdef grpc_slice method_slice
  cdef grpc_slice host_slice
  cdef grpc_slice *host_slice_ptr
  cdef grpc_call_credentials *c_call_credentials
  cdef grpc_call_error c_call_error
  cdef tuple error_and_wrapper_tag
  cdef _BatchOperationTag wrapper_tag
  with channel_state.condition:
    if channel_state.open:
      method_slice = _slice_from_bytes(method)
      if host is None:
        host_slice_ptr = NULL
      else:
        host_slice = _slice_from_bytes(host)
        host_slice_ptr = &host_slice
      call_state.c_call = grpc_channel_create_call(
          channel_state.c_channel, NULL, flags,
          c_completion_queue, method_slice, host_slice_ptr,
          _timespec_from_time(deadline), NULL)
      # The slices were only needed for call creation.
      grpc_slice_unref(method_slice)
      if host_slice_ptr:
        grpc_slice_unref(host_slice)
      if credentials is not None:
        c_call_credentials = credentials.c()
        c_call_error = grpc_call_set_credentials(
            call_state.c_call, c_call_credentials)
        grpc_call_credentials_release(c_call_credentials)
        if c_call_error != GRPC_CALL_OK:
          grpc_call_unref(call_state.c_call)
          call_state.c_call = NULL
          _raise_call_error_no_metadata(c_call_error)
      started_tags = set()
      for operations, user_tag in operationses_and_user_tags:
        c_call_error, tag = _operate(call_state.c_call, operations, user_tag)
        if c_call_error == GRPC_CALL_OK:
          started_tags.add(tag)
        else:
          # Abandon the whole call as soon as one batch fails to start.
          grpc_call_cancel(call_state.c_call, NULL)
          grpc_call_unref(call_state.c_call)
          call_state.c_call = NULL
          _raise_call_error(c_call_error, metadata)
      else:
        # Reached only when every batch started (the failure path raises).
        call_state.due.update(started_tags)
        on_success(started_tags)
    else:
      raise ValueError('Cannot invoke RPC: %s' % channel_state.closed_reason)


cdef void _process_integrated_call_tag(
    _ChannelState state, _BatchOperationTag tag) except *:
  """Retires a completed tag of an integrated call.

  Releases the underlying call once its last outstanding tag has completed.
  """
  cdef _CallState call_state = state.integrated_call_states.pop(tag)
  call_state.due.remove(tag)
  if not call_state.due:
    grpc_call_unref(call_state.c_call)
    call_state.c_call = NULL


cdef class IntegratedCall:
  """Handle to a call whose events arrive on the channel's call queue."""

  def __cinit__(self, _ChannelState channel_state, _CallState call_state):
    self._channel_state = channel_state
    self._call_state = call_state

  def operate(self, operations, tag):
    """Starts another batch; returns False if the call is already finished."""
    return _operate_from_integrated_call(
        self._channel_state, self._call_state, operations, tag)

  def cancel(self, code, details):
    """Cancels the call with the given status code and details."""
    _cancel(self._channel_state, self._call_state, code, details)


cdef IntegratedCall _integrated_call(
    _ChannelState state, int flags, method, host, object deadline,
    object metadata, CallCredentials credentials, operationses_and_user_tags):
  """Invokes an RPC on the channel's shared call completion queue."""
  call_state = _CallState()

  def on_success(started_tags):
    # Register every started tag so that its completion can be routed back to
    # this call's state.
    for started_tag in started_tags:
      state.integrated_call_states[started_tag] = call_state

  _call(
      state, call_state, state.c_call_completion_queue, on_success, flags,
      method, host, deadline, credentials, operationses_and_user_tags, metadata)

  return IntegratedCall(state, call_state)


cdef object _process_segregated_call_tag(
    _ChannelState state, _CallState call_state,
    grpc_completion_queue *c_completion_queue, _BatchOperationTag tag):
  """Retires a completed tag of a segregated call.

  When the last outstanding tag completes, the call is released, its state is
  removed from the channel and its completion queue is destroyed.

  Returns:
    True if this was the call's last outstanding tag, False otherwise.
  """
  call_state.due.remove(tag)
  if not call_state.due:
    grpc_call_unref(call_state.c_call)
    call_state.c_call = NULL
    state.segregated_call_states.remove(call_state)
    _destroy_c_completion_queue(c_completion_queue)
    return True
  else:
    return False


cdef class SegregatedCall:
  """Handle to a call that owns a completion queue of its own."""

  def __cinit__(self, _ChannelState channel_state, _CallState call_state):
    self._channel_state = channel_state
    self._call_state = call_state

  def operate(self, operations, tag):
    """Starts another batch; returns False if the call is already finished."""
    return _operate_from_segregated_call(
        self._channel_state, self._call_state, operations, tag)

  def cancel(self, code, details):
    """Cancels the call with the given status code and details."""
    _cancel(self._channel_state, self._call_state, code, details)

  def next_event(self):
    """Waits (without deadline) for this call's next event and returns it."""
    def on_success(tag):
      _process_segregated_call_tag(
          self._channel_state, self._call_state, self._c_completion_queue, tag)
    return _next_call_event(
        self._channel_state, self._c_completion_queue, on_success, None)


cdef SegregatedCall _segregated_call(
    _ChannelState state, int flags, method, host, object deadline,
    object metadata, CallCredentials credentials, operationses_and_user_tags):
  """Invokes an RPC on a completion queue created just for that call.

  Raises:
    ValueError: If the channel is closed or the call could not be started.
  """
  cdef _CallState call_state = _CallState()
  cdef SegregatedCall segregated_call
  cdef grpc_completion_queue *c_completion_queue

  def on_success(started_tags):
    state.segregated_call_states.add(call_state)

  with state.condition:
    if state.open:
      c_completion_queue = (grpc_completion_queue_create_for_next(NULL))
    else:
      raise ValueError('Cannot invoke RPC on closed channel!')

  try:
    _call(
        state, call_state, c_completion_queue, on_success, flags, method, host,
        deadline, credentials, operationses_and_user_tags, metadata)
  except:
    # The queue was created above solely for this call; do not leak it when
    # the call fails to start. The exception is re-raised unchanged.
    _destroy_c_completion_queue(c_completion_queue)
    raise

  segregated_call = SegregatedCall(state, call_state)
  segregated_call._c_completion_queue = c_completion_queue
  return segregated_call


cdef object _watch_connectivity_state(
    _ChannelState state, grpc_connectivity_state last_observed_state,
    object deadline):
  """Registers a connectivity watch and waits for a connectivity event.

  Raises:
    ValueError: If the channel is closed.
  """
  cdef _ConnectivityTag tag = _ConnectivityTag(object())
  with state.condition:
    if state.open:
      # The tag is handed to gRPC Core as a raw pointer; keep it alive.
      cpython.Py_INCREF(tag)
      grpc_channel_watch_connectivity_state(
          state.c_channel, last_observed_state, _timespec_from_time(deadline),
          state.c_connectivity_completion_queue, <cpython.PyObject *>tag)
      state.connectivity_due.add(tag)
    else:
      raise ValueError('Cannot invoke RPC: %s' % state.closed_reason)
  # The completed tag is whichever watch finishes next on the shared
  # connectivity queue, not necessarily the one registered above.
  completed_tag, event = _latent_event(
      state.c_connectivity_completion_queue, None)
  with state.condition:
    state.connectivity_due.remove(completed_tag)
    state.condition.notify_all()
  return event


cdef _close(Channel channel, grpc_status_code code, object details,
    drain_calls):
  """Closes the channel, cancelling all calls and destroying Core resources.

  Args:
    channel: The Channel to close.
    code: The status code with which outstanding calls are cancelled.
    details: The status details for cancelled calls; also recorded as the
      channel's closed_reason.
    drain_calls: If true, this thread itself pumps the call completion queue
      until all work is drained; otherwise it waits on the channel condition
      for other threads to retire the outstanding work.
  """
  cdef _ChannelState state = channel._state
  cdef _CallState call_state
  encoded_details = _encode(details)
  with state.condition:
    if state.open:
      state.open = False
      state.closed_reason = details
      # Several tags may map to the same call state; cancel each call once.
      for call_state in set(state.integrated_call_states.values()):
        grpc_call_cancel_with_status(
            call_state.c_call, code, encoded_details, NULL)
      for call_state in state.segregated_call_states:
        grpc_call_cancel_with_status(
            call_state.c_call, code, encoded_details, NULL)
      # TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity
      # watching.

      if drain_calls:
        while not _calls_drained(state):
          event = channel.next_call_event()
          if event.completion_type == CompletionType.queue_timeout:
            continue
          event.tag(event)
      else:
        while state.integrated_call_states:
          state.condition.wait()
        while state.segregated_call_states:
          state.condition.wait()
        while state.connectivity_due:
          state.condition.wait()

      _destroy_c_completion_queue(state.c_call_completion_queue)
      _destroy_c_completion_queue(state.c_connectivity_completion_queue)
      grpc_channel_destroy(state.c_channel)
      state.c_channel = NULL
      grpc_shutdown()
      # Wake any concurrent closers waiting for c_channel to become NULL.
      state.condition.notify_all()
    else:
      # Another call to close already completed in the past or is currently
      # being executed in another thread.
      while state.c_channel != NULL:
        state.condition.wait()


cdef _calls_drained(_ChannelState state):
  """Returns True when no call tags or connectivity watches are outstanding."""
  return not (state.integrated_call_states or state.segregated_call_states or
              state.connectivity_due)


cdef class Channel:
  """A gRPC channel together with its call and connectivity queues."""

  def __cinit__(
      self, bytes target, object arguments,
      ChannelCredentials channel_credentials):
    arguments = () if arguments is None else tuple(arguments)
    fork_handlers_and_grpc_init()
    self._state = _ChannelState()
    self._vtable.copy = &_copy_pointer
    self._vtable.destroy = &_destroy_pointer
    self._vtable.cmp = &_compare_pointer
    cdef _ArgumentsProcessor arguments_processor = _ArgumentsProcessor(
        arguments)
    cdef grpc_channel_args *c_arguments = arguments_processor.c(&self._vtable)
    if channel_credentials is None:
      self._state.c_channel = grpc_insecure_channel_create(
          <char *>target, c_arguments, NULL)
    else:
      c_channel_credentials = channel_credentials.c()
      self._state.c_channel = grpc_secure_channel_create(
          c_channel_credentials, <char *>target, c_arguments, NULL)
      grpc_channel_credentials_release(c_channel_credentials)
    self._state.c_call_completion_queue = (
        grpc_completion_queue_create_for_next(NULL))
    self._state.c_connectivity_completion_queue = (
        grpc_completion_queue_create_for_next(NULL))
    self._arguments = arguments

  def target(self):
    """Returns the channel's target as bytes.

    Raises:
      ValueError: If the channel's underlying Core channel has already been
        destroyed by close().
    """
    cdef char *c_target
    with self._state.condition:
      # _close() sets c_channel to NULL after destroying it; handing NULL to
      # gRPC Core here would crash the process rather than raise.
      if self._state.c_channel == NULL:
        raise ValueError('Cannot get target: %s' % self._state.closed_reason)
      c_target = grpc_channel_get_target(self._state.c_channel)
      target = <bytes>c_target
      gpr_free(c_target)
      return target

  def integrated_call(
      self, int flags, method, host, object deadline, object metadata,
      CallCredentials credentials, operationses_and_tags):
    """Invokes an RPC whose events are delivered via next_call_event()."""
    return _integrated_call(
        self._state, flags, method, host, deadline, metadata, credentials,
        operationses_and_tags)

  def next_call_event(self):
    """Returns the next event from the channel's call completion queue.

    When fork support is enabled the wait is bounded to one second from now;
    otherwise it has no deadline.
    """
    def on_success(tag):
      if tag is not None:
        _process_integrated_call_tag(self._state, tag)
    if is_fork_support_enabled():
      queue_deadline = time.time() + 1.0
    else:
      queue_deadline = None
    return _next_call_event(self._state, self._state.c_call_completion_queue,
                            on_success, queue_deadline)

  def segregated_call(
      self, int flags, method, host, object deadline, object metadata,
      CallCredentials credentials, operationses_and_tags):
    """Invokes an RPC that gets a completion queue of its own."""
    return _segregated_call(
        self._state, flags, method, host, deadline, metadata, credentials,
        operationses_and_tags)

  def check_connectivity_state(self, bint try_to_connect):
    """Returns the channel's current connectivity state.

    Raises:
      ValueError: If the channel is closed.
    """
    with self._state.condition:
      if self._state.open:
        return grpc_channel_check_connectivity_state(
            self._state.c_channel, try_to_connect)
      else:
        raise ValueError('Cannot invoke RPC: %s' % self._state.closed_reason)

  def watch_connectivity_state(
      self, grpc_connectivity_state last_observed_state, object deadline):
    """Registers a connectivity watch and waits for a connectivity event."""
    return _watch_connectivity_state(self._state, last_observed_state, deadline)

  def close(self, code, details):
    """Closes the channel, waiting for other threads to drain its calls."""
    _close(self, code, details, False)

  def close_on_fork(self, code, details):
    """Closes the channel, draining outstanding calls on this thread."""
    _close(self, code, details, True)