# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests that RPCs end with StatusCode.UNKNOWN when user code misbehaves."""

import logging
import unittest

import grpc

from tests.unit import test_common
from tests.unit.framework.common import test_constants
from tests.unit.framework.common import test_control

# Deliberately asymmetric (de)serializers: a request is doubled on the wire
# and halved on receipt; a response is tripled on the wire and cut to its
# first third on receipt.
_SERIALIZE_REQUEST = lambda bytestring: bytestring * 2
_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2 :]
_SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3
_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[: len(bytestring) // 3]

_SERVICE_NAME = "test"
_UNARY_UNARY = "UnaryUnary"
_UNARY_UNARY_NESTED_EXCEPTION = "UnaryUnaryNestedException"
_UNARY_STREAM = "UnaryStream"
_STREAM_UNARY = "StreamUnary"
_STREAM_STREAM = "StreamStream"
_DEFECTIVE_GENERIC_RPC_HANDLER = "DefectiveGenericRpcHandler"

# Trailing metadata attached by every well-behaved handler below.
_TRAILING_METADATA = (("testkey", "testvalue"),)


class _Handler(object):
    """Server-side method behaviors used by the tests in this module.

    Each well-behaved method calls ``control.control()`` at its interesting
    points and, when a servicer context is supplied, sets
    ``_TRAILING_METADATA`` as the trailing metadata of the RPC.
    """

    def __init__(self, control):
        self._control = control

    def handle_unary_unary(self, request, servicer_context):
        """Echo the request back as the response."""
        self._control.control()
        if servicer_context is not None:
            servicer_context.set_trailing_metadata(_TRAILING_METADATA)
        return request

    def handle_unary_unary_with_nested_exception(
        self, request, servicer_context
    ):
        """Always raise ``test_control.NestedDefect``."""
        raise test_control.NestedDefect()

    def handle_unary_stream(self, request, servicer_context):
        """Yield the request ``test_constants.STREAM_LENGTH`` times."""
        for _ in range(test_constants.STREAM_LENGTH):
            self._control.control()
            yield request
        self._control.control()
        # Trailing metadata is set only after the last response was yielded.
        if servicer_context is not None:
            servicer_context.set_trailing_metadata(_TRAILING_METADATA)

    def handle_stream_unary(self, request_iterator, servicer_context):
        """Return the concatenation of every request in the stream."""
        if servicer_context is not None:
            # Result intentionally discarded; only the call itself matters.
            servicer_context.invocation_metadata()
        self._control.control()
        response_elements = []
        for request in request_iterator:
            self._control.control()
            response_elements.append(request)
        self._control.control()
        if servicer_context is not None:
            servicer_context.set_trailing_metadata(_TRAILING_METADATA)
        return b"".join(response_elements)

    def handle_stream_stream(self, request_iterator, servicer_context):
        """Echo each request of the stream back as a response."""
        self._control.control()
        # Unlike handle_unary_stream, metadata is set before any response.
        if servicer_context is not None:
            servicer_context.set_trailing_metadata(_TRAILING_METADATA)
        for request in request_iterator:
            self._control.control()
            yield request
        self._control.control()

    def defective_generic_rpc_handler(
        self, request=None, servicer_context=None
    ):
        """Always raise ``test_control.Defect``.

        This method is registered as a unary-unary behavior by
        ``get_method_handlers``, so it has to accept the request and the
        servicer context; otherwise the call fails with an incidental
        ``TypeError`` before the intended ``Defect`` can be raised. Both
        parameters default to ``None`` so a zero-argument call still works.
        """
        raise test_control.Defect()


class _MethodHandler(grpc.RpcMethodHandler):
    """Plain attribute holder implementing ``grpc.RpcMethodHandler``."""

    def __init__(
        self,
        request_streaming,
        response_streaming,
        request_deserializer,
        response_serializer,
        unary_unary,
        unary_stream,
        stream_unary,
        stream_stream,
    ):
        self.request_streaming = request_streaming
        self.response_streaming = response_streaming
        self.request_deserializer = request_deserializer
        self.response_serializer = response_serializer
        self.unary_unary = unary_unary
        self.unary_stream = unary_stream
        self.stream_unary = stream_unary
        self.stream_stream = stream_stream


def get_method_handlers(handler):
    """Map each test method name to a ``_MethodHandler`` bound to ``handler``.

    Args:
        handler: Object providing the ``handle_*`` behaviors (and the
            defective ones), e.g. a ``_Handler`` instance.

    Returns:
        A dict from method name to ``_MethodHandler``. Only the unary-stream
        and stream-unary entries use the custom (de)serializers; every other
        entry passes ``None`` for both.
    """
    # Positional order: request_streaming, response_streaming,
    # request_deserializer, response_serializer, unary_unary, unary_stream,
    # stream_unary, stream_stream.
    return {
        _UNARY_UNARY: _MethodHandler(
            False,
            False,
            None,
            None,
            handler.handle_unary_unary,
            None,
            None,
            None,
        ),
        _UNARY_STREAM: _MethodHandler(
            False,
            True,
            _DESERIALIZE_REQUEST,
            _SERIALIZE_RESPONSE,
            None,
            handler.handle_unary_stream,
            None,
            None,
        ),
        _STREAM_UNARY: _MethodHandler(
            True,
            False,
            _DESERIALIZE_REQUEST,
            _SERIALIZE_RESPONSE,
            None,
            None,
            handler.handle_stream_unary,
            None,
        ),
        _STREAM_STREAM: _MethodHandler(
            True,
            True,
            None,
            None,
            None,
            None,
            None,
            handler.handle_stream_stream,
        ),
        _DEFECTIVE_GENERIC_RPC_HANDLER: _MethodHandler(
            False,
            False,
            None,
            None,
            handler.defective_generic_rpc_handler,
            None,
            None,
            None,
        ),
        _UNARY_UNARY_NESTED_EXCEPTION: _MethodHandler(
            False,
            False,
            None,
            None,
            handler.handle_unary_unary_with_nested_exception,
            None,
            None,
            None,
        ),
    }


class FailAfterFewIterationsCounter(object):
    """Iterator yielding ``bytestring`` ``high`` times, then raising Defect."""

    def __init__(self, high, bytestring):
        self._current = 0
        self._high = high
        self._bytestring = bytestring

    def __iter__(self):
        return self

    def __next__(self):
        if self._current >= self._high:
            raise test_control.Defect()
        self._current += 1
        return self._bytestring

    # Python 2 style iterator protocol alias.
    next = __next__


def _unary_unary_multi_callable(channel):
    """Return the unary-unary multi-callable for the echo method."""
    return channel.unary_unary(
        grpc._common.fully_qualified_method(_SERVICE_NAME, _UNARY_UNARY),
        _registered_method=True,
    )


def _unary_stream_multi_callable(channel):
    """Return the unary-stream multi-callable with custom (de)serializers."""
    return channel.unary_stream(
        grpc._common.fully_qualified_method(_SERVICE_NAME, _UNARY_STREAM),
        request_serializer=_SERIALIZE_REQUEST,
        response_deserializer=_DESERIALIZE_RESPONSE,
        _registered_method=True,
    )


def _stream_unary_multi_callable(channel):
    """Return the stream-unary multi-callable with custom (de)serializers."""
    return channel.stream_unary(
        grpc._common.fully_qualified_method(_SERVICE_NAME, _STREAM_UNARY),
        request_serializer=_SERIALIZE_REQUEST,
        response_deserializer=_DESERIALIZE_RESPONSE,
        _registered_method=True,
    )


def _stream_stream_multi_callable(channel):
    """Return the stream-stream multi-callable for the echo method."""
    return channel.stream_stream(
        grpc._common.fully_qualified_method(_SERVICE_NAME, _STREAM_STREAM),
        _registered_method=True,
    )


def _defective_handler_multi_callable(channel):
    """Return the unary-unary multi-callable for the Defect-raising method."""
    return channel.unary_unary(
        grpc._common.fully_qualified_method(
            _SERVICE_NAME, _DEFECTIVE_GENERIC_RPC_HANDLER
        ),
        _registered_method=True,
    )


def _defective_nested_exception_handler_multi_callable(channel):
    """Return the unary-unary multi-callable for the NestedDefect method."""
    return channel.unary_unary(
        grpc._common.fully_qualified_method(
            _SERVICE_NAME, _UNARY_UNARY_NESTED_EXCEPTION
        ),
        _registered_method=True,
    )


class InvocationDefectsTest(unittest.TestCase):
    """Tests the handling of exception-raising user code.

    Covers both defective request iterables/iterators handed to the
    invocation side and server method handlers that raise. Every test
    asserts that the RPC terminates with ``grpc.StatusCode.UNKNOWN``.
    """

    def setUp(self):
        self._control = test_control.PauseFailControl()
        self._handler = _Handler(self._control)

        self._server = test_common.test_server()
        port = self._server.add_insecure_port("[::]:0")
        self._server.add_registered_method_handlers(
            _SERVICE_NAME, get_method_handlers(self._handler)
        )
        self._server.start()

        self._channel = grpc.insecure_channel("localhost:%d" % port)

    def tearDown(self):
        self._server.stop(0)
        self._channel.close()

    def _assert_unknown_status(self, exception_context):
        """Assert that the captured RpcError carries StatusCode.UNKNOWN."""
        self.assertIs(
            grpc.StatusCode.UNKNOWN, exception_context.exception.code()
        )

    def testIterableStreamRequestBlockingUnaryResponse(self):
        # A bare object() is not iterable, so consuming requests must fail.
        requests = object()
        multi_callable = _stream_unary_multi_callable(self._channel)

        with self.assertRaises(grpc.RpcError) as exception_context:
            multi_callable(
                requests,
                metadata=(
                    ("test", "IterableStreamRequestBlockingUnaryResponse"),
                ),
            )

        self._assert_unknown_status(exception_context)

    def testIterableStreamRequestFutureUnaryResponse(self):
        requests = object()
        multi_callable = _stream_unary_multi_callable(self._channel)
        response_future = multi_callable.future(
            requests,
            metadata=(("test", "IterableStreamRequestFutureUnaryResponse"),),
        )

        with self.assertRaises(grpc.RpcError) as exception_context:
            response_future.result()

        self._assert_unknown_status(exception_context)

    def testIterableStreamRequestStreamResponse(self):
        requests = object()
        multi_callable = _stream_stream_multi_callable(self._channel)
        response_iterator = multi_callable(
            requests,
            metadata=(("test", "IterableStreamRequestStreamResponse"),),
        )

        with self.assertRaises(grpc.RpcError) as exception_context:
            next(response_iterator)

        self._assert_unknown_status(exception_context)

    def testIteratorStreamRequestStreamResponse(self):
        # The request iterator raises after STREAM_LENGTH // 2 items, so
        # asking for one more response than that must surface the failure.
        requests_iterator = FailAfterFewIterationsCounter(
            test_constants.STREAM_LENGTH // 2, b"\x07\x08"
        )
        multi_callable = _stream_stream_multi_callable(self._channel)
        response_iterator = multi_callable(
            requests_iterator,
            metadata=(("test", "IteratorStreamRequestStreamResponse"),),
        )

        with self.assertRaises(grpc.RpcError) as exception_context:
            for _ in range(test_constants.STREAM_LENGTH // 2 + 1):
                next(response_iterator)

        self._assert_unknown_status(exception_context)

    def testDefectiveGenericRpcHandlerUnaryResponse(self):
        request = b"\x07\x08"
        multi_callable = _defective_handler_multi_callable(self._channel)

        with self.assertRaises(grpc.RpcError) as exception_context:
            multi_callable(
                request, metadata=(("test", "DefectiveGenericRpcHandlerUnary"),)
            )

        self._assert_unknown_status(exception_context)

    def testNestedExceptionGenericRpcHandlerUnaryResponse(self):
        request = b"\x07\x08"
        multi_callable = _defective_nested_exception_handler_multi_callable(
            self._channel
        )

        with self.assertRaises(grpc.RpcError) as exception_context:
            multi_callable(
                request,
                metadata=(("test", "NestedExceptionGenericRpcHandlerUnary"),),
            )

        self._assert_unknown_status(exception_context)


if __name__ == "__main__":
    logging.basicConfig()
    unittest.main(verbosity=2)