# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test of RPCs made against gRPC Python's application-layer API."""

import itertools
import threading
import unittest
from concurrent import futures

import grpc
from grpc.framework.foundation import logging_pool

from tests.unit import test_common
from tests.unit.framework.common import test_constants
from tests.unit.framework.common import test_control

# Deliberately asymmetric (de)serializers: a request is doubled on the wire
# and restored by keeping its second half; a response is tripled on the wire
# and restored by keeping its first third. They are only attached to the
# unary-stream and stream-unary methods below.
_SERIALIZE_REQUEST = lambda bytestring: bytestring * 2
_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2:]
_SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3
_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) // 3]

# Fully-qualified method paths, one per RPC arity.
_UNARY_UNARY = '/test/UnaryUnary'
_UNARY_STREAM = '/test/UnaryStream'
_STREAM_UNARY = '/test/StreamUnary'
_STREAM_STREAM = '/test/StreamStream'


class _Callback(object):
    """A one-argument callable that lets another thread wait for its call.

    Used as a future "done" callback: value() blocks until the instance has
    been called and then returns the argument it was called with.
    """

    def __init__(self):
        self._condition = threading.Condition()
        self._value = None
        self._called = False

    def __call__(self, value):
        """Records value and wakes every thread blocked in value()."""
        with self._condition:
            self._value = value
            self._called = True
            self._condition.notify_all()

    def value(self):
        """Blocks until the callback has been called; returns its argument."""
        with self._condition:
            # Loop rather than a single wait so that a wakeup without a
            # recorded call does not return early.
            while not self._called:
                self._condition.wait()
            return self._value


class _Handler(object):
    """Echo-style service behaviors shared by the server and the tests.

    Every method calls self._control.control() at its checkpoints; the tests
    drive that object through its pause() and fail() context managers.
    Each method also accepts servicer_context=None so the tests can call it
    directly to compute the expected response of an RPC.
    """

    def __init__(self, control):
        self._control = control

    def handle_unary_unary(self, request, servicer_context):
        """Returns request unchanged."""
        self._control.control()
        if servicer_context is not None:
            servicer_context.set_trailing_metadata(((
                'testkey',
                'testvalue',
            ),))
            # TODO(https://github.com/grpc/grpc/issues/8483): test the values
            # returned by these methods rather than only "smoke" testing that
            # they return after having been called.
            servicer_context.is_active()
            servicer_context.time_remaining()
        return request

    def handle_unary_stream(self, request, servicer_context):
        """Yields request test_constants.STREAM_LENGTH times."""
        for _ in range(test_constants.STREAM_LENGTH):
            self._control.control()
            yield request
        self._control.control()
        if servicer_context is not None:
            servicer_context.set_trailing_metadata(((
                'testkey',
                'testvalue',
            ),))

    def handle_stream_unary(self, request_iterator, servicer_context):
        """Returns the concatenation of every request in request_iterator."""
        if servicer_context is not None:
            servicer_context.invocation_metadata()
        self._control.control()
        response_elements = []
        for request in request_iterator:
            self._control.control()
            response_elements.append(request)
        self._control.control()
        if servicer_context is not None:
            servicer_context.set_trailing_metadata(((
                'testkey',
                'testvalue',
            ),))
        return b''.join(response_elements)

    def handle_stream_stream(self, request_iterator, servicer_context):
        """Yields each request from request_iterator back, in order."""
        self._control.control()
        if servicer_context is not None:
            servicer_context.set_trailing_metadata(((
                'testkey',
                'testvalue',
            ),))
        for request in request_iterator:
            self._control.control()
            yield request
        self._control.control()


class _MethodHandler(grpc.RpcMethodHandler):
    """Plain value holder for the attributes of grpc.RpcMethodHandler."""

    def __init__(self, request_streaming, response_streaming,
                 request_deserializer, response_serializer, unary_unary,
                 unary_stream, stream_unary, stream_stream):
        self.request_streaming = request_streaming
        self.response_streaming = response_streaming
        self.request_deserializer = request_deserializer
        self.response_serializer = response_serializer
        self.unary_unary = unary_unary
        self.unary_stream = unary_stream
        self.stream_unary = stream_unary
        self.stream_stream = stream_stream


class _GenericHandler(grpc.GenericRpcHandler):
    """Routes the four test method paths to the matching _Handler method."""

    def __init__(self, handler):
        self._handler = handler

    def service(self, handler_call_details):
        """Returns the _MethodHandler for the called method, or None.

        Only the unary-stream and stream-unary methods are given the custom
        (de)serializers; the other two pass bytes through untouched. This
        mirrors the client-side multi-callable factories in this module.
        """
        if handler_call_details.method == _UNARY_UNARY:
            return _MethodHandler(False, False, None, None,
                                  self._handler.handle_unary_unary, None, None,
                                  None)
        elif handler_call_details.method == _UNARY_STREAM:
            return _MethodHandler(False, True, _DESERIALIZE_REQUEST,
                                  _SERIALIZE_RESPONSE, None,
                                  self._handler.handle_unary_stream, None, None)
        elif handler_call_details.method == _STREAM_UNARY:
            return _MethodHandler(True, False, _DESERIALIZE_REQUEST,
                                  _SERIALIZE_RESPONSE, None, None,
                                  self._handler.handle_stream_unary, None)
        elif handler_call_details.method == _STREAM_STREAM:
            return _MethodHandler(True, True, None, None, None, None, None,
                                  self._handler.handle_stream_stream)
        else:
            # Unknown method: no handler is offered for it.
            return None


def _unary_unary_multi_callable(channel):
    """Returns a unary-unary multi-callable for _UNARY_UNARY (raw bytes)."""
    return channel.unary_unary(_UNARY_UNARY)


def _unary_stream_multi_callable(channel):
    """Returns a unary-stream multi-callable using the test serializers."""
    return channel.unary_stream(
        _UNARY_STREAM,
        request_serializer=_SERIALIZE_REQUEST,
        response_deserializer=_DESERIALIZE_RESPONSE)


def _stream_unary_multi_callable(channel):
    """Returns a stream-unary multi-callable using the test serializers."""
    return channel.stream_unary(
        _STREAM_UNARY,
        request_serializer=_SERIALIZE_REQUEST,
        response_deserializer=_DESERIALIZE_RESPONSE)


def _stream_stream_multi_callable(channel):
    """Returns a stream-stream multi-callable for _STREAM_STREAM (raw bytes)."""
    return channel.stream_stream(_STREAM_STREAM)


class RPCTest(unittest.TestCase):
    # NOTE: test methods are documented with comments rather than docstrings
    # so that unittest's verbose output keeps showing the method names.

    def setUp(self):
        self._control = test_control.PauseFailControl()
        self._handler = _Handler(self._control)

        self._server = test_common.test_server()
        # Port 0 lets the system pick a free port; the chosen one is returned.
        port = self._server.add_insecure_port('[::]:0')
        self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),))
        self._server.start()

        self._channel = grpc.insecure_channel('localhost:%d' % port)

    def tearDown(self):
        self._server.stop(None)
        # Release the client channel too; several tests deliberately leave
        # RPCs unfinished, and closing the channel terminates them instead of
        # leaking them into later tests.
        self._channel.close()

    def testUnrecognizedMethod(self):
        # A method the generic handler does not know must be UNIMPLEMENTED.
        request = b'abc'

        with self.assertRaises(grpc.RpcError) as exception_context:
            self._channel.unary_unary('NoSuchMethod')(request)

        self.assertEqual(grpc.StatusCode.UNIMPLEMENTED,
                         exception_context.exception.code())

    def testSuccessfulUnaryRequestBlockingUnaryResponse(self):
        request = b'\x07\x08'
        # Calling the handler directly (no context) yields the expected value.
        expected_response = self._handler.handle_unary_unary(request, None)

        multi_callable = _unary_unary_multi_callable(self._channel)
        response = multi_callable(
            request,
            metadata=(('test', 'SuccessfulUnaryRequestBlockingUnaryResponse'),))

        self.assertEqual(expected_response, response)

    def testSuccessfulUnaryRequestBlockingUnaryResponseWithCall(self):
        request = b'\x07\x08'
        expected_response = self._handler.handle_unary_unary(request, None)

        multi_callable = _unary_unary_multi_callable(self._channel)
        response, call = multi_callable.with_call(
            request,
            metadata=(('test',
                       'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),))

        self.assertEqual(expected_response, response)
        self.assertIs(grpc.StatusCode.OK, call.code())
        self.assertEqual("", call.debug_error_string())

    def testSuccessfulUnaryRequestFutureUnaryResponse(self):
        request = b'\x07\x08'
        expected_response = self._handler.handle_unary_unary(request, None)

        multi_callable = _unary_unary_multi_callable(self._channel)
        response_future = multi_callable.future(
            request,
            metadata=(('test', 'SuccessfulUnaryRequestFutureUnaryResponse'),))
        response = response_future.result()

        self.assertIsInstance(response_future, grpc.Future)
        self.assertIsInstance(response_future, grpc.Call)
        self.assertEqual(expected_response, response)
        self.assertIsNone(response_future.exception())
        self.assertIsNone(response_future.traceback())

    def testSuccessfulUnaryRequestStreamResponse(self):
        request = b'\x37\x58'
        expected_responses = tuple(
            self._handler.handle_unary_stream(request, None))

        multi_callable = _unary_stream_multi_callable(self._channel)
        response_iterator = multi_callable(
            request,
            metadata=(('test', 'SuccessfulUnaryRequestStreamResponse'),))
        responses = tuple(response_iterator)

        self.assertSequenceEqual(expected_responses, responses)

    def testSuccessfulStreamRequestBlockingUnaryResponse(self):
        requests = tuple(
            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
        expected_response = self._handler.handle_stream_unary(
            iter(requests), None)
        request_iterator = iter(requests)

        multi_callable = _stream_unary_multi_callable(self._channel)
        response = multi_callable(
            request_iterator,
            metadata=(('test',
                       'SuccessfulStreamRequestBlockingUnaryResponse'),))

        self.assertEqual(expected_response, response)

    def testSuccessfulStreamRequestBlockingUnaryResponseWithCall(self):
        requests = tuple(
            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
        expected_response = self._handler.handle_stream_unary(
            iter(requests), None)
        request_iterator = iter(requests)

        multi_callable = _stream_unary_multi_callable(self._channel)
        response, call = multi_callable.with_call(
            request_iterator,
            metadata=(
                ('test',
                 'SuccessfulStreamRequestBlockingUnaryResponseWithCall'),))

        self.assertEqual(expected_response, response)
        self.assertIs(grpc.StatusCode.OK, call.code())

    def testSuccessfulStreamRequestFutureUnaryResponse(self):
        requests = tuple(
            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
        expected_response = self._handler.handle_stream_unary(
            iter(requests), None)
        request_iterator = iter(requests)

        multi_callable = _stream_unary_multi_callable(self._channel)
        response_future = multi_callable.future(
            request_iterator,
            metadata=(('test', 'SuccessfulStreamRequestFutureUnaryResponse'),))
        response = response_future.result()

        self.assertEqual(expected_response, response)
        self.assertIsNone(response_future.exception())
        self.assertIsNone(response_future.traceback())

    def testSuccessfulStreamRequestStreamResponse(self):
        requests = tuple(
            b'\x77\x58' for _ in range(test_constants.STREAM_LENGTH))
        expected_responses = tuple(
            self._handler.handle_stream_stream(iter(requests), None))
        request_iterator = iter(requests)

        multi_callable = _stream_stream_multi_callable(self._channel)
        response_iterator = multi_callable(
            request_iterator,
            metadata=(('test', 'SuccessfulStreamRequestStreamResponse'),))
        responses = tuple(response_iterator)

        self.assertSequenceEqual(expected_responses, responses)

    def testSequentialInvocations(self):
        # The same multi-callable must be reusable for back-to-back RPCs.
        first_request = b'\x07\x08'
        second_request = b'\x0809'
        expected_first_response = self._handler.handle_unary_unary(
            first_request, None)
        expected_second_response = self._handler.handle_unary_unary(
            second_request, None)

        multi_callable = _unary_unary_multi_callable(self._channel)
        first_response = multi_callable(
            first_request, metadata=(('test', 'SequentialInvocations'),))
        second_response = multi_callable(
            second_request, metadata=(('test', 'SequentialInvocations'),))

        self.assertEqual(expected_first_response, first_response)
        self.assertEqual(expected_second_response, second_response)

    def testConcurrentBlockingInvocations(self):
        # Blocking invocations issued from THREAD_CONCURRENCY pool threads.
        pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
        requests = tuple(
            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
        expected_response = self._handler.handle_stream_unary(
            iter(requests), None)
        expected_responses = [expected_response
                             ] * test_constants.THREAD_CONCURRENCY
        response_futures = [None] * test_constants.THREAD_CONCURRENCY

        multi_callable = _stream_unary_multi_callable(self._channel)
        try:
            for index in range(test_constants.THREAD_CONCURRENCY):
                request_iterator = iter(requests)
                response_future = pool.submit(
                    multi_callable,
                    request_iterator,
                    metadata=(('test', 'ConcurrentBlockingInvocations'),))
                response_futures[index] = response_future
            responses = tuple(
                response_future.result()
                for response_future in response_futures)
        finally:
            # Shut the pool down even when an invocation raised, so a failing
            # RPC does not leave the worker threads behind.
            pool.shutdown(wait=True)

        self.assertSequenceEqual(expected_responses, responses)

    def testConcurrentFutureInvocations(self):
        requests = tuple(
            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
        expected_response = self._handler.handle_stream_unary(
            iter(requests), None)
        expected_responses = [expected_response
                             ] * test_constants.THREAD_CONCURRENCY
        response_futures = [None] * test_constants.THREAD_CONCURRENCY

        multi_callable = _stream_unary_multi_callable(self._channel)
        for index in range(test_constants.THREAD_CONCURRENCY):
            request_iterator = iter(requests)
            response_future = multi_callable.future(
                request_iterator,
                metadata=(('test', 'ConcurrentFutureInvocations'),))
            response_futures[index] = response_future
        responses = tuple(
            response_future.result() for response_future in response_futures)

        self.assertSequenceEqual(expected_responses, responses)

    def testWaitingForSomeButNotAllConcurrentFutureInvocations(self):
        pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
        request = b'\x67\x68'
        expected_response = self._handler.handle_unary_unary(request, None)
        response_futures = [None] * test_constants.THREAD_CONCURRENCY
        lock = threading.Lock()
        # One-element list used as a mutable cell shared with the closures.
        test_is_running_cell = [True]

        def wrap_future(future):

            def wrap():
                try:
                    return future.result()
                except grpc.RpcError:
                    # An RPC error only counts while the test body is still
                    # running; once the body has finished, RPCs that are
                    # terminated during teardown are expected and ignored.
                    with lock:
                        if test_is_running_cell[0]:
                            raise
                    return None

            return wrap

        multi_callable = _unary_unary_multi_callable(self._channel)
        for index in range(test_constants.THREAD_CONCURRENCY):
            inner_response_future = multi_callable.future(
                request,
                metadata=(
                    ('test',
                     'WaitingForSomeButNotAllConcurrentFutureInvocations'),))
            outer_response_future = pool.submit(
                wrap_future(inner_response_future))
            response_futures[index] = outer_response_future

        # Only the first half of the futures to complete are inspected.
        some_completed_response_futures_iterator = itertools.islice(
            futures.as_completed(response_futures),
            test_constants.THREAD_CONCURRENCY // 2)
        for response_future in some_completed_response_futures_iterator:
            self.assertEqual(expected_response, response_future.result())
        with lock:
            test_is_running_cell[0] = False

    def testConsumingOneStreamResponseUnaryRequest(self):
        # Smoke test: read one response and abandon the rest of the stream.
        request = b'\x57\x38'

        multi_callable = _unary_stream_multi_callable(self._channel)
        response_iterator = multi_callable(
            request,
            metadata=(('test', 'ConsumingOneStreamResponseUnaryRequest'),))
        next(response_iterator)

    def testConsumingSomeButNotAllStreamResponsesUnaryRequest(self):
        request = b'\x57\x38'

        multi_callable = _unary_stream_multi_callable(self._channel)
        response_iterator = multi_callable(
            request,
            metadata=(('test',
                       'ConsumingSomeButNotAllStreamResponsesUnaryRequest'),))
        for _ in range(test_constants.STREAM_LENGTH // 2):
            next(response_iterator)

    def testConsumingSomeButNotAllStreamResponsesStreamRequest(self):
        requests = tuple(
            b'\x67\x88' for _ in range(test_constants.STREAM_LENGTH))
        request_iterator = iter(requests)

        multi_callable = _stream_stream_multi_callable(self._channel)
        response_iterator = multi_callable(
            request_iterator,
            metadata=(('test',
                       'ConsumingSomeButNotAllStreamResponsesStreamRequest'),))
        for _ in range(test_constants.STREAM_LENGTH // 2):
            next(response_iterator)

    def testConsumingTooManyStreamResponsesStreamRequest(self):
        requests = tuple(
            b'\x67\x88' for _ in range(test_constants.STREAM_LENGTH))
        request_iterator = iter(requests)

        multi_callable = _stream_stream_multi_callable(self._channel)
        response_iterator = multi_callable(
            request_iterator,
            metadata=(('test',
                       'ConsumingTooManyStreamResponsesStreamRequest'),))
        for _ in range(test_constants.STREAM_LENGTH):
            next(response_iterator)
        # Once exhausted, the iterator must keep raising StopIteration.
        for _ in range(test_constants.STREAM_LENGTH):
            with self.assertRaises(StopIteration):
                next(response_iterator)

        self.assertIsNotNone(response_iterator.initial_metadata())
        self.assertIs(grpc.StatusCode.OK, response_iterator.code())
        self.assertIsNotNone(response_iterator.details())
        self.assertIsNotNone(response_iterator.trailing_metadata())

    def testCancelledUnaryRequestUnaryResponse(self):
        request = b'\x07\x17'

        multi_callable = _unary_unary_multi_callable(self._channel)
        # The RPC is cancelled while the control object is in pause().
        with self._control.pause():
            response_future = multi_callable.future(
                request,
                metadata=(('test', 'CancelledUnaryRequestUnaryResponse'),))
            response_future.cancel()

        self.assertTrue(response_future.cancelled())
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.result()
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.exception()
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.traceback()
        self.assertIs(grpc.StatusCode.CANCELLED, response_future.code())

    def testCancelledUnaryRequestStreamResponse(self):
        request = b'\x07\x19'

        multi_callable = _unary_stream_multi_callable(self._channel)
        with self._control.pause():
            response_iterator = multi_callable(
                request,
                metadata=(('test', 'CancelledUnaryRequestStreamResponse'),))
            # Cancel only after the control object reports the paused state.
            self._control.block_until_paused()
            response_iterator.cancel()

        with self.assertRaises(grpc.RpcError) as exception_context:
            next(response_iterator)
        self.assertIs(grpc.StatusCode.CANCELLED,
                      exception_context.exception.code())
        self.assertIsNotNone(response_iterator.initial_metadata())
        self.assertIs(grpc.StatusCode.CANCELLED, response_iterator.code())
        self.assertIsNotNone(response_iterator.details())
        self.assertIsNotNone(response_iterator.trailing_metadata())

    def testCancelledStreamRequestUnaryResponse(self):
        requests = tuple(
            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
        request_iterator = iter(requests)

        multi_callable = _stream_unary_multi_callable(self._channel)
        with self._control.pause():
            response_future = multi_callable.future(
                request_iterator,
                metadata=(('test', 'CancelledStreamRequestUnaryResponse'),))
            self._control.block_until_paused()
            response_future.cancel()

        self.assertTrue(response_future.cancelled())
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.result()
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.exception()
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.traceback()
        self.assertIsNotNone(response_future.initial_metadata())
        self.assertIs(grpc.StatusCode.CANCELLED, response_future.code())
        self.assertIsNotNone(response_future.details())
        self.assertIsNotNone(response_future.trailing_metadata())

    def testCancelledStreamRequestStreamResponse(self):
        requests = tuple(
            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
        request_iterator = iter(requests)

        multi_callable = _stream_stream_multi_callable(self._channel)
        with self._control.pause():
            response_iterator = multi_callable(
                request_iterator,
                metadata=(('test', 'CancelledStreamRequestStreamResponse'),))
            response_iterator.cancel()

        with self.assertRaises(grpc.RpcError):
            next(response_iterator)
        self.assertIsNotNone(response_iterator.initial_metadata())
        self.assertIs(grpc.StatusCode.CANCELLED, response_iterator.code())
        self.assertIsNotNone(response_iterator.details())
        self.assertIsNotNone(response_iterator.trailing_metadata())

    def testExpiredUnaryRequestBlockingUnaryResponse(self):
        request = b'\x07\x17'

        multi_callable = _unary_unary_multi_callable(self._channel)
        # With the control object paused, the short timeout is expected to
        # elapse before any response arrives.
        with self._control.pause():
            with self.assertRaises(grpc.RpcError) as exception_context:
                multi_callable.with_call(
                    request,
                    timeout=test_constants.SHORT_TIMEOUT,
                    metadata=(('test',
                               'ExpiredUnaryRequestBlockingUnaryResponse'),))

        self.assertIsInstance(exception_context.exception, grpc.Call)
        self.assertIsNotNone(exception_context.exception.initial_metadata())
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
                      exception_context.exception.code())
        self.assertIsNotNone(exception_context.exception.details())
        self.assertIsNotNone(exception_context.exception.trailing_metadata())

    def testExpiredUnaryRequestFutureUnaryResponse(self):
        request = b'\x07\x17'
        callback = _Callback()

        multi_callable = _unary_unary_multi_callable(self._channel)
        with self._control.pause():
            response_future = multi_callable.future(
                request,
                timeout=test_constants.SHORT_TIMEOUT,
                metadata=(('test', 'ExpiredUnaryRequestFutureUnaryResponse'),))
            response_future.add_done_callback(callback)
            # Blocks until the future has invoked the done-callback.
            value_passed_to_callback = callback.value()

        self.assertIs(response_future, value_passed_to_callback)
        self.assertIsNotNone(response_future.initial_metadata())
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, response_future.code())
        self.assertIsNotNone(response_future.details())
        self.assertIsNotNone(response_future.trailing_metadata())
        with self.assertRaises(grpc.RpcError) as exception_context:
            response_future.result()
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
                      exception_context.exception.code())
        self.assertIsInstance(response_future.exception(), grpc.RpcError)
        self.assertIsNotNone(response_future.traceback())
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
                      response_future.exception().code())

    def testExpiredUnaryRequestStreamResponse(self):
        request = b'\x07\x19'

        multi_callable = _unary_stream_multi_callable(self._channel)
        with self._control.pause():
            with self.assertRaises(grpc.RpcError) as exception_context:
                response_iterator = multi_callable(
                    request,
                    timeout=test_constants.SHORT_TIMEOUT,
                    metadata=(('test', 'ExpiredUnaryRequestStreamResponse'),))
                next(response_iterator)

        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
                      exception_context.exception.code())
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
                      response_iterator.code())

    def testExpiredStreamRequestBlockingUnaryResponse(self):
        requests = tuple(
            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
        request_iterator = iter(requests)

        multi_callable = _stream_unary_multi_callable(self._channel)
        with self._control.pause():
            with self.assertRaises(grpc.RpcError) as exception_context:
                multi_callable(
                    request_iterator,
                    timeout=test_constants.SHORT_TIMEOUT,
                    metadata=(('test',
                               'ExpiredStreamRequestBlockingUnaryResponse'),))

        self.assertIsInstance(exception_context.exception, grpc.RpcError)
        self.assertIsInstance(exception_context.exception, grpc.Call)
        self.assertIsNotNone(exception_context.exception.initial_metadata())
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
                      exception_context.exception.code())
        self.assertIsNotNone(exception_context.exception.details())
        self.assertIsNotNone(exception_context.exception.trailing_metadata())

    def testExpiredStreamRequestFutureUnaryResponse(self):
        requests = tuple(
            b'\x07\x18' for _ in range(test_constants.STREAM_LENGTH))
        request_iterator = iter(requests)
        callback = _Callback()

        multi_callable = _stream_unary_multi_callable(self._channel)
        with self._control.pause():
            response_future = multi_callable.future(
                request_iterator,
                timeout=test_constants.SHORT_TIMEOUT,
                metadata=(('test', 'ExpiredStreamRequestFutureUnaryResponse'),))
            # Waiting for half of the RPC's own timeout must time out on the
            # future (FutureTimeoutError) before the RPC itself expires.
            with self.assertRaises(grpc.FutureTimeoutError):
                response_future.result(
                    timeout=test_constants.SHORT_TIMEOUT / 2.0)
            response_future.add_done_callback(callback)
            value_passed_to_callback = callback.value()

        with self.assertRaises(grpc.RpcError) as exception_context:
            response_future.result()
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, response_future.code())
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
                      exception_context.exception.code())
        self.assertIsInstance(response_future.exception(), grpc.RpcError)
        self.assertIsNotNone(response_future.traceback())
        self.assertIs(response_future, value_passed_to_callback)
        self.assertIsNotNone(response_future.initial_metadata())
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, response_future.code())
        self.assertIsNotNone(response_future.details())
        self.assertIsNotNone(response_future.trailing_metadata())

    def testExpiredStreamRequestStreamResponse(self):
        requests = tuple(
            b'\x67\x18' for _ in range(test_constants.STREAM_LENGTH))
        request_iterator = iter(requests)

        multi_callable = _stream_stream_multi_callable(self._channel)
        with self._control.pause():
            with self.assertRaises(grpc.RpcError) as exception_context:
                response_iterator = multi_callable(
                    request_iterator,
                    timeout=test_constants.SHORT_TIMEOUT,
                    metadata=(('test', 'ExpiredStreamRequestStreamResponse'),))
                next(response_iterator)

        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
                      exception_context.exception.code())
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
                      response_iterator.code())

    def testFailedUnaryRequestBlockingUnaryResponse(self):
        request = b'\x37\x17'

        multi_callable = _unary_unary_multi_callable(self._channel)
        # While the control object is in fail(), the RPC must end UNKNOWN.
        with self._control.fail():
            with self.assertRaises(grpc.RpcError) as exception_context:
                multi_callable.with_call(
                    request,
                    metadata=(('test',
                               'FailedUnaryRequestBlockingUnaryResponse'),))

        self.assertIs(grpc.StatusCode.UNKNOWN,
                      exception_context.exception.code())
        # Sanity checks to make sure the returned string contains the default
        # members of the error.
        debug_error_string = exception_context.exception.debug_error_string()
        self.assertIn("created", debug_error_string)
        self.assertIn("description", debug_error_string)
        self.assertIn("file", debug_error_string)
        self.assertIn("file_line", debug_error_string)

    def testFailedUnaryRequestFutureUnaryResponse(self):
        request = b'\x37\x17'
        callback = _Callback()

        multi_callable = _unary_unary_multi_callable(self._channel)
        with self._control.fail():
            response_future = multi_callable.future(
                request,
                metadata=(('test', 'FailedUnaryRequestFutureUnaryResponse'),))
            response_future.add_done_callback(callback)
            value_passed_to_callback = callback.value()

        self.assertIsInstance(response_future, grpc.Future)
        self.assertIsInstance(response_future, grpc.Call)
        with self.assertRaises(grpc.RpcError) as exception_context:
            response_future.result()
        self.assertIs(grpc.StatusCode.UNKNOWN,
                      exception_context.exception.code())
        self.assertIsInstance(response_future.exception(), grpc.RpcError)
        self.assertIsNotNone(response_future.traceback())
        self.assertIs(grpc.StatusCode.UNKNOWN,
                      response_future.exception().code())
        self.assertIs(response_future, value_passed_to_callback)

    def testFailedUnaryRequestStreamResponse(self):
        request = b'\x37\x17'

        multi_callable = _unary_stream_multi_callable(self._channel)
        with self.assertRaises(grpc.RpcError) as exception_context:
            with self._control.fail():
                response_iterator = multi_callable(
                    request,
                    metadata=(('test', 'FailedUnaryRequestStreamResponse'),))
                next(response_iterator)

        self.assertIs(grpc.StatusCode.UNKNOWN,
                      exception_context.exception.code())

    def testFailedStreamRequestBlockingUnaryResponse(self):
        requests = tuple(
            b'\x47\x58' for _ in range(test_constants.STREAM_LENGTH))
        request_iterator = iter(requests)

        multi_callable = _stream_unary_multi_callable(self._channel)
        with self._control.fail():
            with self.assertRaises(grpc.RpcError) as exception_context:
                multi_callable(
                    request_iterator,
                    metadata=(('test',
                               'FailedStreamRequestBlockingUnaryResponse'),))

        self.assertIs(grpc.StatusCode.UNKNOWN,
                      exception_context.exception.code())

    def testFailedStreamRequestFutureUnaryResponse(self):
        requests = tuple(
            b'\x07\x18' for _ in range(test_constants.STREAM_LENGTH))
        request_iterator = iter(requests)
        callback = _Callback()

        multi_callable = _stream_unary_multi_callable(self._channel)
        with self._control.fail():
            response_future = multi_callable.future(
                request_iterator,
                metadata=(('test', 'FailedStreamRequestFutureUnaryResponse'),))
            response_future.add_done_callback(callback)
            value_passed_to_callback = callback.value()

        with self.assertRaises(grpc.RpcError) as exception_context:
            response_future.result()
        self.assertIs(grpc.StatusCode.UNKNOWN, response_future.code())
        self.assertIs(grpc.StatusCode.UNKNOWN,
                      exception_context.exception.code())
        self.assertIsInstance(response_future.exception(), grpc.RpcError)
        self.assertIsNotNone(response_future.traceback())
        self.assertIs(response_future, value_passed_to_callback)

    def testFailedStreamRequestStreamResponse(self):
        requests = tuple(
            b'\x67\x88' for _ in range(test_constants.STREAM_LENGTH))
        request_iterator = iter(requests)

        multi_callable = _stream_stream_multi_callable(self._channel)
        with self._control.fail():
            with self.assertRaises(grpc.RpcError) as exception_context:
                response_iterator = multi_callable(
                    request_iterator,
                    metadata=(('test', 'FailedStreamRequestStreamResponse'),))
                tuple(response_iterator)

        self.assertIs(grpc.StatusCode.UNKNOWN,
                      exception_context.exception.code())
        self.assertIs(grpc.StatusCode.UNKNOWN, response_iterator.code())

    # The "Ignored" tests start an RPC and drop the returned object without
    # ever reading from it; they pass as long as nothing raises.

    def testIgnoredUnaryRequestFutureUnaryResponse(self):
        request = b'\x37\x17'

        multi_callable = _unary_unary_multi_callable(self._channel)
        multi_callable.future(
            request,
            metadata=(('test', 'IgnoredUnaryRequestFutureUnaryResponse'),))

    def testIgnoredUnaryRequestStreamResponse(self):
        request = b'\x37\x17'

        multi_callable = _unary_stream_multi_callable(self._channel)
        multi_callable(
            request, metadata=(('test', 'IgnoredUnaryRequestStreamResponse'),))

    def testIgnoredStreamRequestFutureUnaryResponse(self):
        requests = tuple(
            b'\x07\x18' for _ in range(test_constants.STREAM_LENGTH))
        request_iterator = iter(requests)

        multi_callable = _stream_unary_multi_callable(self._channel)
        multi_callable.future(
            request_iterator,
            metadata=(('test', 'IgnoredStreamRequestFutureUnaryResponse'),))

    def testIgnoredStreamRequestStreamResponse(self):
        requests = tuple(
            b'\x67\x88' for _ in range(test_constants.STREAM_LENGTH))
        request_iterator = iter(requests)

        multi_callable = _stream_stream_multi_callable(self._channel)
        multi_callable(
            request_iterator,
            metadata=(('test', 'IgnoredStreamRequestStreamResponse'),))


if __name__ == '__main__':
    unittest.main(verbosity=2)