1# Copyright 2017 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import logging 16import threading 17 18import grpc 19from grpc_testing import _common 20 21logging.basicConfig() 22_LOGGER = logging.getLogger(__name__) 23 24 25class Rpc(object): 26 27 def __init__(self, handler, invocation_metadata): 28 self._condition = threading.Condition() 29 self._handler = handler 30 self._invocation_metadata = invocation_metadata 31 self._initial_metadata_sent = False 32 self._pending_trailing_metadata = None 33 self._pending_code = None 34 self._pending_details = None 35 self._callbacks = [] 36 self._active = True 37 self._rpc_errors = [] 38 39 def _ensure_initial_metadata_sent(self): 40 if not self._initial_metadata_sent: 41 self._handler.send_initial_metadata(_common.FUSSED_EMPTY_METADATA) 42 self._initial_metadata_sent = True 43 44 def _call_back(self): 45 callbacks = tuple(self._callbacks) 46 self._callbacks = None 47 48 def call_back(): 49 for callback in callbacks: 50 try: 51 callback() 52 except Exception: # pylint: disable=broad-except 53 _LOGGER.exception('Exception calling server-side callback!') 54 55 callback_calling_thread = threading.Thread(target=call_back) 56 callback_calling_thread.start() 57 58 def _terminate(self, trailing_metadata, code, details): 59 if self._active: 60 self._active = False 61 self._handler.send_termination(trailing_metadata, code, details) 62 self._call_back() 63 self._condition.notify_all() 64 65 def _complete(self): 66 if self._pending_trailing_metadata is None: 67 trailing_metadata = _common.FUSSED_EMPTY_METADATA 68 else: 69 trailing_metadata = self._pending_trailing_metadata 70 if self._pending_code is None: 71 code = grpc.StatusCode.OK 72 else: 73 code = self._pending_code 74 details = '' if self._pending_details is None else self._pending_details 75 self._terminate(trailing_metadata, code, details) 76 77 def _abort(self, code, details): 78 self._terminate(_common.FUSSED_EMPTY_METADATA, code, details) 79 80 def add_rpc_error(self, rpc_error): 81 with self._condition: 82 self._rpc_errors.append(rpc_error) 83 84 def application_cancel(self): 85 with self._condition: 86 self._abort(grpc.StatusCode.CANCELLED, 87 'Cancelled by server-side application!') 88 89 def application_exception_abort(self, exception): 90 with self._condition: 91 if exception not in self._rpc_errors: 92 _LOGGER.exception('Exception calling application!') 93 self._abort( 94 grpc.StatusCode.UNKNOWN, 95 'Exception calling application: {}'.format(exception)) 96 97 def extrinsic_abort(self): 98 with self._condition: 99 if self._active: 100 self._active = False 101 self._call_back() 102 self._condition.notify_all() 103 104 def unary_response_complete(self, response): 105 with self._condition: 106 self._ensure_initial_metadata_sent() 107 self._handler.add_response(response) 108 self._complete() 109 110 def stream_response(self, response): 111 with self._condition: 112 self._ensure_initial_metadata_sent() 113 self._handler.add_response(response) 114 115 def stream_response_complete(self): 116 with self._condition: 117 self._ensure_initial_metadata_sent() 118 self._complete() 119 120 def send_initial_metadata(self, initial_metadata): 121 with self._condition: 122 if self._initial_metadata_sent: 123 return False 124 else: 125 self._handler.send_initial_metadata(initial_metadata) 126 self._initial_metadata_sent = True 127 return True 128 129 def is_active(self): 130 with self._condition: 131 return self._active 132 133 def add_callback(self, callback): 134 with self._condition: 135 if self._callbacks is None: 136 return False 137 else: 138 self._callbacks.append(callback) 139 return True 140 141 def invocation_metadata(self): 142 with self._condition: 143 return self._invocation_metadata 144 145 def set_trailing_metadata(self, trailing_metadata): 146 with self._condition: 147 self._pending_trailing_metadata = trailing_metadata 148 149 def set_code(self, code): 150 with self._condition: 151 self._pending_code = code 152 153 def set_details(self, details): 154 with self._condition: 155 self._pending_details = details 156