1# Copyright 2016 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Test of RPCs made against gRPC Python's application-layer API.""" 15 16import itertools 17import threading 18import unittest 19import logging 20from concurrent import futures 21 22import grpc 23from grpc.framework.foundation import logging_pool 24 25from tests.unit import test_common 26from tests.unit import thread_pool 27from tests.unit.framework.common import test_constants 28from tests.unit.framework.common import test_control 29 30_SERIALIZE_REQUEST = lambda bytestring: bytestring * 2 31_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2:] 32_SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3 33_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) // 3] 34 35_UNARY_UNARY = '/test/UnaryUnary' 36_UNARY_STREAM = '/test/UnaryStream' 37_UNARY_STREAM_NON_BLOCKING = '/test/UnaryStreamNonBlocking' 38_STREAM_UNARY = '/test/StreamUnary' 39_STREAM_STREAM = '/test/StreamStream' 40_STREAM_STREAM_NON_BLOCKING = '/test/StreamStreamNonBlocking' 41 42 43class _Callback(object): 44 45 def __init__(self): 46 self._condition = threading.Condition() 47 self._value = None 48 self._called = False 49 50 def __call__(self, value): 51 with self._condition: 52 self._value = value 53 self._called = True 54 self._condition.notify_all() 55 56 def value(self): 57 with self._condition: 58 while not self._called: 59 self._condition.wait() 60 return self._value 61 62 63class _Handler(object): 64 65 def __init__(self, control, thread_pool): 66 self._control = control 67 self._thread_pool = thread_pool 68 non_blocking_functions = (self.handle_unary_stream_non_blocking, 69 self.handle_stream_stream_non_blocking) 70 for non_blocking_function in non_blocking_functions: 71 non_blocking_function.__func__.experimental_non_blocking = True 72 non_blocking_function.__func__.experimental_thread_pool = self._thread_pool 73 74 def handle_unary_unary(self, request, servicer_context): 75 self._control.control() 76 if servicer_context is not None: 77 servicer_context.set_trailing_metadata((( 78 'testkey', 79 'testvalue', 80 ),)) 81 # TODO(https://github.com/grpc/grpc/issues/8483): test the values 82 # returned by these methods rather than only "smoke" testing that 83 # the return after having been called. 84 servicer_context.is_active() 85 servicer_context.time_remaining() 86 return request 87 88 def handle_unary_stream(self, request, servicer_context): 89 for _ in range(test_constants.STREAM_LENGTH): 90 self._control.control() 91 yield request 92 self._control.control() 93 if servicer_context is not None: 94 servicer_context.set_trailing_metadata((( 95 'testkey', 96 'testvalue', 97 ),)) 98 99 def handle_unary_stream_non_blocking(self, request, servicer_context, 100 on_next): 101 for _ in range(test_constants.STREAM_LENGTH): 102 self._control.control() 103 on_next(request) 104 self._control.control() 105 if servicer_context is not None: 106 servicer_context.set_trailing_metadata((( 107 'testkey', 108 'testvalue', 109 ),)) 110 on_next(None) 111 112 def handle_stream_unary(self, request_iterator, servicer_context): 113 if servicer_context is not None: 114 servicer_context.invocation_metadata() 115 self._control.control() 116 response_elements = [] 117 for request in request_iterator: 118 self._control.control() 119 response_elements.append(request) 120 self._control.control() 121 if servicer_context is not None: 122 servicer_context.set_trailing_metadata((( 123 'testkey', 124 'testvalue', 125 ),)) 126 return b''.join(response_elements) 127 128 def handle_stream_stream(self, request_iterator, servicer_context): 129 self._control.control() 130 if servicer_context is not None: 131 servicer_context.set_trailing_metadata((( 132 'testkey', 133 'testvalue', 134 ),)) 135 for request in request_iterator: 136 self._control.control() 137 yield request 138 self._control.control() 139 140 def handle_stream_stream_non_blocking(self, request_iterator, 141 servicer_context, on_next): 142 self._control.control() 143 if servicer_context is not None: 144 servicer_context.set_trailing_metadata((( 145 'testkey', 146 'testvalue', 147 ),)) 148 for request in request_iterator: 149 self._control.control() 150 on_next(request) 151 self._control.control() 152 on_next(None) 153 154 155class _MethodHandler(grpc.RpcMethodHandler): 156 157 def __init__(self, request_streaming, response_streaming, 158 request_deserializer, response_serializer, unary_unary, 159 unary_stream, stream_unary, stream_stream): 160 self.request_streaming = request_streaming 161 self.response_streaming = response_streaming 162 self.request_deserializer = request_deserializer 163 self.response_serializer = response_serializer 164 self.unary_unary = unary_unary 165 self.unary_stream = unary_stream 166 self.stream_unary = stream_unary 167 self.stream_stream = stream_stream 168 169 170class _GenericHandler(grpc.GenericRpcHandler): 171 172 def __init__(self, handler): 173 self._handler = handler 174 175 def service(self, handler_call_details): 176 if handler_call_details.method == _UNARY_UNARY: 177 return _MethodHandler(False, False, None, None, 178 self._handler.handle_unary_unary, None, None, 179 None) 180 elif handler_call_details.method == _UNARY_STREAM: 181 return _MethodHandler(False, True, _DESERIALIZE_REQUEST, 182 _SERIALIZE_RESPONSE, None, 183 self._handler.handle_unary_stream, None, None) 184 elif handler_call_details.method == _UNARY_STREAM_NON_BLOCKING: 185 return _MethodHandler( 186 False, True, _DESERIALIZE_REQUEST, _SERIALIZE_RESPONSE, None, 187 self._handler.handle_unary_stream_non_blocking, None, None) 188 elif handler_call_details.method == _STREAM_UNARY: 189 return _MethodHandler(True, False, _DESERIALIZE_REQUEST, 190 _SERIALIZE_RESPONSE, None, None, 191 self._handler.handle_stream_unary, None) 192 elif handler_call_details.method == _STREAM_STREAM: 193 return _MethodHandler(True, True, None, None, None, None, None, 194 self._handler.handle_stream_stream) 195 elif handler_call_details.method == _STREAM_STREAM_NON_BLOCKING: 196 return _MethodHandler( 197 True, True, None, None, None, None, None, 198 self._handler.handle_stream_stream_non_blocking) 199 else: 200 return None 201 202 203def _unary_unary_multi_callable(channel): 204 return channel.unary_unary(_UNARY_UNARY) 205 206 207def _unary_stream_multi_callable(channel): 208 return channel.unary_stream(_UNARY_STREAM, 209 request_serializer=_SERIALIZE_REQUEST, 210 response_deserializer=_DESERIALIZE_RESPONSE) 211 212 213def _unary_stream_non_blocking_multi_callable(channel): 214 return channel.unary_stream(_UNARY_STREAM_NON_BLOCKING, 215 request_serializer=_SERIALIZE_REQUEST, 216 response_deserializer=_DESERIALIZE_RESPONSE) 217 218 219def _stream_unary_multi_callable(channel): 220 return channel.stream_unary(_STREAM_UNARY, 221 request_serializer=_SERIALIZE_REQUEST, 222 response_deserializer=_DESERIALIZE_RESPONSE) 223 224 225def _stream_stream_multi_callable(channel): 226 return channel.stream_stream(_STREAM_STREAM) 227 228 229def _stream_stream_non_blocking_multi_callable(channel): 230 return channel.stream_stream(_STREAM_STREAM_NON_BLOCKING) 231 232 233class RPCTest(unittest.TestCase): 234 235 def setUp(self): 236 self._control = test_control.PauseFailControl() 237 self._thread_pool = thread_pool.RecordingThreadPool(max_workers=None) 238 self._handler = _Handler(self._control, self._thread_pool) 239 240 self._server = test_common.test_server() 241 port = self._server.add_insecure_port('[::]:0') 242 self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),)) 243 self._server.start() 244 245 self._channel = grpc.insecure_channel('localhost:%d' % port) 246 247 def tearDown(self): 248 self._server.stop(None) 249 self._channel.close() 250 251 def testDefaultThreadPoolIsUsed(self): 252 self._consume_one_stream_response_unary_request( 253 _unary_stream_multi_callable(self._channel)) 254 self.assertFalse(self._thread_pool.was_used()) 255 256 def testExperimentalThreadPoolIsUsed(self): 257 self._consume_one_stream_response_unary_request( 258 _unary_stream_non_blocking_multi_callable(self._channel)) 259 self.assertTrue(self._thread_pool.was_used()) 260 261 def testUnrecognizedMethod(self): 262 request = b'abc' 263 264 with self.assertRaises(grpc.RpcError) as exception_context: 265 self._channel.unary_unary('NoSuchMethod')(request) 266 267 self.assertEqual(grpc.StatusCode.UNIMPLEMENTED, 268 exception_context.exception.code()) 269 270 def testSuccessfulUnaryRequestBlockingUnaryResponse(self): 271 request = b'\x07\x08' 272 expected_response = self._handler.handle_unary_unary(request, None) 273 274 multi_callable = _unary_unary_multi_callable(self._channel) 275 response = multi_callable( 276 request, 277 metadata=(('test', 'SuccessfulUnaryRequestBlockingUnaryResponse'),)) 278 279 self.assertEqual(expected_response, response) 280 281 def testSuccessfulUnaryRequestBlockingUnaryResponseWithCall(self): 282 request = b'\x07\x08' 283 expected_response = self._handler.handle_unary_unary(request, None) 284 285 multi_callable = _unary_unary_multi_callable(self._channel) 286 response, call = multi_callable.with_call( 287 request, 288 metadata=(('test', 289 'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),)) 290 291 self.assertEqual(expected_response, response) 292 self.assertIs(grpc.StatusCode.OK, call.code()) 293 self.assertEqual('', call.debug_error_string()) 294 295 def testSuccessfulUnaryRequestFutureUnaryResponse(self): 296 request = b'\x07\x08' 297 expected_response = self._handler.handle_unary_unary(request, None) 298 299 multi_callable = _unary_unary_multi_callable(self._channel) 300 response_future = multi_callable.future( 301 request, 302 metadata=(('test', 'SuccessfulUnaryRequestFutureUnaryResponse'),)) 303 response = response_future.result() 304 305 self.assertIsInstance(response_future, grpc.Future) 306 self.assertIsInstance(response_future, grpc.Call) 307 self.assertEqual(expected_response, response) 308 self.assertIsNone(response_future.exception()) 309 self.assertIsNone(response_future.traceback()) 310 311 def testSuccessfulUnaryRequestStreamResponse(self): 312 request = b'\x37\x58' 313 expected_responses = tuple( 314 self._handler.handle_unary_stream(request, None)) 315 316 multi_callable = _unary_stream_multi_callable(self._channel) 317 response_iterator = multi_callable( 318 request, 319 metadata=(('test', 'SuccessfulUnaryRequestStreamResponse'),)) 320 responses = tuple(response_iterator) 321 322 self.assertSequenceEqual(expected_responses, responses) 323 324 def testSuccessfulStreamRequestBlockingUnaryResponse(self): 325 requests = tuple( 326 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 327 expected_response = self._handler.handle_stream_unary( 328 iter(requests), None) 329 request_iterator = iter(requests) 330 331 multi_callable = _stream_unary_multi_callable(self._channel) 332 response = multi_callable( 333 request_iterator, 334 metadata=(('test', 335 'SuccessfulStreamRequestBlockingUnaryResponse'),)) 336 337 self.assertEqual(expected_response, response) 338 339 def testSuccessfulStreamRequestBlockingUnaryResponseWithCall(self): 340 requests = tuple( 341 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 342 expected_response = self._handler.handle_stream_unary( 343 iter(requests), None) 344 request_iterator = iter(requests) 345 346 multi_callable = _stream_unary_multi_callable(self._channel) 347 response, call = multi_callable.with_call( 348 request_iterator, 349 metadata=( 350 ('test', 351 'SuccessfulStreamRequestBlockingUnaryResponseWithCall'),)) 352 353 self.assertEqual(expected_response, response) 354 self.assertIs(grpc.StatusCode.OK, call.code()) 355 356 def testSuccessfulStreamRequestFutureUnaryResponse(self): 357 requests = tuple( 358 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 359 expected_response = self._handler.handle_stream_unary( 360 iter(requests), None) 361 request_iterator = iter(requests) 362 363 multi_callable = _stream_unary_multi_callable(self._channel) 364 response_future = multi_callable.future( 365 request_iterator, 366 metadata=(('test', 'SuccessfulStreamRequestFutureUnaryResponse'),)) 367 response = response_future.result() 368 369 self.assertEqual(expected_response, response) 370 self.assertIsNone(response_future.exception()) 371 self.assertIsNone(response_future.traceback()) 372 373 def testSuccessfulStreamRequestStreamResponse(self): 374 requests = tuple( 375 b'\x77\x58' for _ in range(test_constants.STREAM_LENGTH)) 376 377 expected_responses = tuple( 378 self._handler.handle_stream_stream(iter(requests), None)) 379 request_iterator = iter(requests) 380 381 multi_callable = _stream_stream_multi_callable(self._channel) 382 response_iterator = multi_callable( 383 request_iterator, 384 metadata=(('test', 'SuccessfulStreamRequestStreamResponse'),)) 385 responses = tuple(response_iterator) 386 387 self.assertSequenceEqual(expected_responses, responses) 388 389 def testSequentialInvocations(self): 390 first_request = b'\x07\x08' 391 second_request = b'\x0809' 392 expected_first_response = self._handler.handle_unary_unary( 393 first_request, None) 394 expected_second_response = self._handler.handle_unary_unary( 395 second_request, None) 396 397 multi_callable = _unary_unary_multi_callable(self._channel) 398 first_response = multi_callable(first_request, 399 metadata=(('test', 400 'SequentialInvocations'),)) 401 second_response = multi_callable(second_request, 402 metadata=(('test', 403 'SequentialInvocations'),)) 404 405 self.assertEqual(expected_first_response, first_response) 406 self.assertEqual(expected_second_response, second_response) 407 408 def testConcurrentBlockingInvocations(self): 409 pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) 410 requests = tuple( 411 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 412 expected_response = self._handler.handle_stream_unary( 413 iter(requests), None) 414 expected_responses = [expected_response 415 ] * test_constants.THREAD_CONCURRENCY 416 response_futures = [None] * test_constants.THREAD_CONCURRENCY 417 418 multi_callable = _stream_unary_multi_callable(self._channel) 419 for index in range(test_constants.THREAD_CONCURRENCY): 420 request_iterator = iter(requests) 421 response_future = pool.submit( 422 multi_callable, 423 request_iterator, 424 metadata=(('test', 'ConcurrentBlockingInvocations'),)) 425 response_futures[index] = response_future 426 responses = tuple( 427 response_future.result() for response_future in response_futures) 428 429 pool.shutdown(wait=True) 430 self.assertSequenceEqual(expected_responses, responses) 431 432 def testConcurrentFutureInvocations(self): 433 requests = tuple( 434 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 435 expected_response = self._handler.handle_stream_unary( 436 iter(requests), None) 437 expected_responses = [expected_response 438 ] * test_constants.THREAD_CONCURRENCY 439 response_futures = [None] * test_constants.THREAD_CONCURRENCY 440 441 multi_callable = _stream_unary_multi_callable(self._channel) 442 for index in range(test_constants.THREAD_CONCURRENCY): 443 request_iterator = iter(requests) 444 response_future = multi_callable.future( 445 request_iterator, 446 metadata=(('test', 'ConcurrentFutureInvocations'),)) 447 response_futures[index] = response_future 448 responses = tuple( 449 response_future.result() for response_future in response_futures) 450 451 self.assertSequenceEqual(expected_responses, responses) 452 453 def testWaitingForSomeButNotAllConcurrentFutureInvocations(self): 454 pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) 455 request = b'\x67\x68' 456 expected_response = self._handler.handle_unary_unary(request, None) 457 response_futures = [None] * test_constants.THREAD_CONCURRENCY 458 lock = threading.Lock() 459 test_is_running_cell = [True] 460 461 def wrap_future(future): 462 463 def wrap(): 464 try: 465 return future.result() 466 except grpc.RpcError: 467 with lock: 468 if test_is_running_cell[0]: 469 raise 470 return None 471 472 return wrap 473 474 multi_callable = _unary_unary_multi_callable(self._channel) 475 for index in range(test_constants.THREAD_CONCURRENCY): 476 inner_response_future = multi_callable.future( 477 request, 478 metadata=( 479 ('test', 480 'WaitingForSomeButNotAllConcurrentFutureInvocations'),)) 481 outer_response_future = pool.submit( 482 wrap_future(inner_response_future)) 483 response_futures[index] = outer_response_future 484 485 some_completed_response_futures_iterator = itertools.islice( 486 futures.as_completed(response_futures), 487 test_constants.THREAD_CONCURRENCY // 2) 488 for response_future in some_completed_response_futures_iterator: 489 self.assertEqual(expected_response, response_future.result()) 490 with lock: 491 test_is_running_cell[0] = False 492 493 def testConsumingOneStreamResponseUnaryRequest(self): 494 self._consume_one_stream_response_unary_request( 495 _unary_stream_multi_callable(self._channel)) 496 497 def testConsumingOneStreamResponseUnaryRequestNonBlocking(self): 498 self._consume_one_stream_response_unary_request( 499 _unary_stream_non_blocking_multi_callable(self._channel)) 500 501 def testConsumingSomeButNotAllStreamResponsesUnaryRequest(self): 502 self._consume_some_but_not_all_stream_responses_unary_request( 503 _unary_stream_multi_callable(self._channel)) 504 505 def testConsumingSomeButNotAllStreamResponsesUnaryRequestNonBlocking(self): 506 self._consume_some_but_not_all_stream_responses_unary_request( 507 _unary_stream_non_blocking_multi_callable(self._channel)) 508 509 def testConsumingSomeButNotAllStreamResponsesStreamRequest(self): 510 self._consume_some_but_not_all_stream_responses_stream_request( 511 _stream_stream_multi_callable(self._channel)) 512 513 def testConsumingSomeButNotAllStreamResponsesStreamRequestNonBlocking(self): 514 self._consume_some_but_not_all_stream_responses_stream_request( 515 _stream_stream_non_blocking_multi_callable(self._channel)) 516 517 def testConsumingTooManyStreamResponsesStreamRequest(self): 518 self._consume_too_many_stream_responses_stream_request( 519 _stream_stream_multi_callable(self._channel)) 520 521 def testConsumingTooManyStreamResponsesStreamRequestNonBlocking(self): 522 self._consume_too_many_stream_responses_stream_request( 523 _stream_stream_non_blocking_multi_callable(self._channel)) 524 525 def testCancelledUnaryRequestUnaryResponse(self): 526 request = b'\x07\x17' 527 528 multi_callable = _unary_unary_multi_callable(self._channel) 529 with self._control.pause(): 530 response_future = multi_callable.future( 531 request, 532 metadata=(('test', 'CancelledUnaryRequestUnaryResponse'),)) 533 response_future.cancel() 534 535 self.assertIs(grpc.StatusCode.CANCELLED, response_future.code()) 536 self.assertTrue(response_future.cancelled()) 537 with self.assertRaises(grpc.FutureCancelledError): 538 response_future.result() 539 with self.assertRaises(grpc.FutureCancelledError): 540 response_future.exception() 541 with self.assertRaises(grpc.FutureCancelledError): 542 response_future.traceback() 543 544 def testCancelledUnaryRequestStreamResponse(self): 545 self._cancelled_unary_request_stream_response( 546 _unary_stream_multi_callable(self._channel)) 547 548 def testCancelledUnaryRequestStreamResponseNonBlocking(self): 549 self._cancelled_unary_request_stream_response( 550 _unary_stream_non_blocking_multi_callable(self._channel)) 551 552 def testCancelledStreamRequestUnaryResponse(self): 553 requests = tuple( 554 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 555 request_iterator = iter(requests) 556 557 multi_callable = _stream_unary_multi_callable(self._channel) 558 with self._control.pause(): 559 response_future = multi_callable.future( 560 request_iterator, 561 metadata=(('test', 'CancelledStreamRequestUnaryResponse'),)) 562 self._control.block_until_paused() 563 response_future.cancel() 564 565 self.assertIs(grpc.StatusCode.CANCELLED, response_future.code()) 566 self.assertTrue(response_future.cancelled()) 567 with self.assertRaises(grpc.FutureCancelledError): 568 response_future.result() 569 with self.assertRaises(grpc.FutureCancelledError): 570 response_future.exception() 571 with self.assertRaises(grpc.FutureCancelledError): 572 response_future.traceback() 573 self.assertIsNotNone(response_future.initial_metadata()) 574 self.assertIsNotNone(response_future.details()) 575 self.assertIsNotNone(response_future.trailing_metadata()) 576 577 def testCancelledStreamRequestStreamResponse(self): 578 self._cancelled_stream_request_stream_response( 579 _stream_stream_multi_callable(self._channel)) 580 581 def testCancelledStreamRequestStreamResponseNonBlocking(self): 582 self._cancelled_stream_request_stream_response( 583 _stream_stream_non_blocking_multi_callable(self._channel)) 584 585 def testExpiredUnaryRequestBlockingUnaryResponse(self): 586 request = b'\x07\x17' 587 588 multi_callable = _unary_unary_multi_callable(self._channel) 589 with self._control.pause(): 590 with self.assertRaises(grpc.RpcError) as exception_context: 591 multi_callable.with_call( 592 request, 593 timeout=test_constants.SHORT_TIMEOUT, 594 metadata=(('test', 595 'ExpiredUnaryRequestBlockingUnaryResponse'),)) 596 597 self.assertIsInstance(exception_context.exception, grpc.Call) 598 self.assertIsNotNone(exception_context.exception.initial_metadata()) 599 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, 600 exception_context.exception.code()) 601 self.assertIsNotNone(exception_context.exception.details()) 602 self.assertIsNotNone(exception_context.exception.trailing_metadata()) 603 604 def testExpiredUnaryRequestFutureUnaryResponse(self): 605 request = b'\x07\x17' 606 callback = _Callback() 607 608 multi_callable = _unary_unary_multi_callable(self._channel) 609 with self._control.pause(): 610 response_future = multi_callable.future( 611 request, 612 timeout=test_constants.SHORT_TIMEOUT, 613 metadata=(('test', 'ExpiredUnaryRequestFutureUnaryResponse'),)) 614 response_future.add_done_callback(callback) 615 value_passed_to_callback = callback.value() 616 617 self.assertIs(response_future, value_passed_to_callback) 618 self.assertIsNotNone(response_future.initial_metadata()) 619 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, response_future.code()) 620 self.assertIsNotNone(response_future.details()) 621 self.assertIsNotNone(response_future.trailing_metadata()) 622 with self.assertRaises(grpc.RpcError) as exception_context: 623 response_future.result() 624 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, 625 exception_context.exception.code()) 626 self.assertIsInstance(response_future.exception(), grpc.RpcError) 627 self.assertIsNotNone(response_future.traceback()) 628 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, 629 response_future.exception().code()) 630 631 def testExpiredUnaryRequestStreamResponse(self): 632 self._expired_unary_request_stream_response( 633 _unary_stream_multi_callable(self._channel)) 634 635 def testExpiredUnaryRequestStreamResponseNonBlocking(self): 636 self._expired_unary_request_stream_response( 637 _unary_stream_non_blocking_multi_callable(self._channel)) 638 639 def testExpiredStreamRequestBlockingUnaryResponse(self): 640 requests = tuple( 641 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 642 request_iterator = iter(requests) 643 644 multi_callable = _stream_unary_multi_callable(self._channel) 645 with self._control.pause(): 646 with self.assertRaises(grpc.RpcError) as exception_context: 647 multi_callable( 648 request_iterator, 649 timeout=test_constants.SHORT_TIMEOUT, 650 metadata=(('test', 651 'ExpiredStreamRequestBlockingUnaryResponse'),)) 652 653 self.assertIsInstance(exception_context.exception, grpc.RpcError) 654 self.assertIsInstance(exception_context.exception, grpc.Call) 655 self.assertIsNotNone(exception_context.exception.initial_metadata()) 656 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, 657 exception_context.exception.code()) 658 self.assertIsNotNone(exception_context.exception.details()) 659 self.assertIsNotNone(exception_context.exception.trailing_metadata()) 660 661 def testExpiredStreamRequestFutureUnaryResponse(self): 662 requests = tuple( 663 b'\x07\x18' for _ in range(test_constants.STREAM_LENGTH)) 664 request_iterator = iter(requests) 665 callback = _Callback() 666 667 multi_callable = _stream_unary_multi_callable(self._channel) 668 with self._control.pause(): 669 response_future = multi_callable.future( 670 request_iterator, 671 timeout=test_constants.SHORT_TIMEOUT, 672 metadata=(('test', 'ExpiredStreamRequestFutureUnaryResponse'),)) 673 with self.assertRaises(grpc.FutureTimeoutError): 674 response_future.result(timeout=test_constants.SHORT_TIMEOUT / 675 2.0) 676 response_future.add_done_callback(callback) 677 value_passed_to_callback = callback.value() 678 679 with self.assertRaises(grpc.RpcError) as exception_context: 680 response_future.result() 681 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, response_future.code()) 682 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, 683 exception_context.exception.code()) 684 self.assertIsInstance(response_future.exception(), grpc.RpcError) 685 self.assertIsNotNone(response_future.traceback()) 686 self.assertIs(response_future, value_passed_to_callback) 687 self.assertIsNotNone(response_future.initial_metadata()) 688 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, response_future.code()) 689 self.assertIsNotNone(response_future.details()) 690 self.assertIsNotNone(response_future.trailing_metadata()) 691 692 def testExpiredStreamRequestStreamResponse(self): 693 self._expired_stream_request_stream_response( 694 _stream_stream_multi_callable(self._channel)) 695 696 def testExpiredStreamRequestStreamResponseNonBlocking(self): 697 self._expired_stream_request_stream_response( 698 _stream_stream_non_blocking_multi_callable(self._channel)) 699 700 def testFailedUnaryRequestBlockingUnaryResponse(self): 701 request = b'\x37\x17' 702 703 multi_callable = _unary_unary_multi_callable(self._channel) 704 with self._control.fail(): 705 with self.assertRaises(grpc.RpcError) as exception_context: 706 multi_callable.with_call( 707 request, 708 metadata=(('test', 709 'FailedUnaryRequestBlockingUnaryResponse'),)) 710 711 self.assertIs(grpc.StatusCode.UNKNOWN, 712 exception_context.exception.code()) 713 # sanity checks on to make sure returned string contains default members 714 # of the error 715 debug_error_string = exception_context.exception.debug_error_string() 716 self.assertIn('created', debug_error_string) 717 self.assertIn('description', debug_error_string) 718 self.assertIn('file', debug_error_string) 719 self.assertIn('file_line', debug_error_string) 720 721 def testFailedUnaryRequestFutureUnaryResponse(self): 722 request = b'\x37\x17' 723 callback = _Callback() 724 725 multi_callable = _unary_unary_multi_callable(self._channel) 726 with self._control.fail(): 727 response_future = multi_callable.future( 728 request, 729 metadata=(('test', 'FailedUnaryRequestFutureUnaryResponse'),)) 730 response_future.add_done_callback(callback) 731 value_passed_to_callback = callback.value() 732 733 self.assertIsInstance(response_future, grpc.Future) 734 self.assertIsInstance(response_future, grpc.Call) 735 with self.assertRaises(grpc.RpcError) as exception_context: 736 response_future.result() 737 self.assertIs(grpc.StatusCode.UNKNOWN, 738 exception_context.exception.code()) 739 self.assertIsInstance(response_future.exception(), grpc.RpcError) 740 self.assertIsNotNone(response_future.traceback()) 741 self.assertIs(grpc.StatusCode.UNKNOWN, 742 response_future.exception().code()) 743 self.assertIs(response_future, value_passed_to_callback) 744 745 def testFailedUnaryRequestStreamResponse(self): 746 self._failed_unary_request_stream_response( 747 _unary_stream_multi_callable(self._channel)) 748 749 def testFailedUnaryRequestStreamResponseNonBlocking(self): 750 self._failed_unary_request_stream_response( 751 _unary_stream_non_blocking_multi_callable(self._channel)) 752 753 def testFailedStreamRequestBlockingUnaryResponse(self): 754 requests = tuple( 755 b'\x47\x58' for _ in range(test_constants.STREAM_LENGTH)) 756 request_iterator = iter(requests) 757 758 multi_callable = _stream_unary_multi_callable(self._channel) 759 with self._control.fail(): 760 with self.assertRaises(grpc.RpcError) as exception_context: 761 multi_callable( 762 request_iterator, 763 metadata=(('test', 764 'FailedStreamRequestBlockingUnaryResponse'),)) 765 766 self.assertIs(grpc.StatusCode.UNKNOWN, 767 exception_context.exception.code()) 768 769 def testFailedStreamRequestFutureUnaryResponse(self): 770 requests = tuple( 771 b'\x07\x18' for _ in range(test_constants.STREAM_LENGTH)) 772 request_iterator = iter(requests) 773 callback = _Callback() 774 775 multi_callable = _stream_unary_multi_callable(self._channel) 776 with self._control.fail(): 777 response_future = multi_callable.future( 778 request_iterator, 779 metadata=(('test', 'FailedStreamRequestFutureUnaryResponse'),)) 780 response_future.add_done_callback(callback) 781 value_passed_to_callback = callback.value() 782 783 with self.assertRaises(grpc.RpcError) as exception_context: 784 response_future.result() 785 self.assertIs(grpc.StatusCode.UNKNOWN, response_future.code()) 786 self.assertIs(grpc.StatusCode.UNKNOWN, 787 exception_context.exception.code()) 788 self.assertIsInstance(response_future.exception(), grpc.RpcError) 789 self.assertIsNotNone(response_future.traceback()) 790 self.assertIs(response_future, value_passed_to_callback) 791 792 def testFailedStreamRequestStreamResponse(self): 793 self._failed_stream_request_stream_response( 794 _stream_stream_multi_callable(self._channel)) 795 796 def testFailedStreamRequestStreamResponseNonBlocking(self): 797 self._failed_stream_request_stream_response( 798 _stream_stream_non_blocking_multi_callable(self._channel)) 799 800 def testIgnoredUnaryRequestFutureUnaryResponse(self): 801 request = b'\x37\x17' 802 803 multi_callable = _unary_unary_multi_callable(self._channel) 804 multi_callable.future( 805 request, 806 metadata=(('test', 'IgnoredUnaryRequestFutureUnaryResponse'),)) 807 808 def testIgnoredUnaryRequestStreamResponse(self): 809 self._ignored_unary_stream_request_future_unary_response( 810 _unary_stream_multi_callable(self._channel)) 811 812 def testIgnoredUnaryRequestStreamResponseNonBlocking(self): 813 self._ignored_unary_stream_request_future_unary_response( 814 _unary_stream_non_blocking_multi_callable(self._channel)) 815 816 def testIgnoredStreamRequestFutureUnaryResponse(self): 817 requests = tuple( 818 b'\x07\x18' for _ in range(test_constants.STREAM_LENGTH)) 819 request_iterator = iter(requests) 820 821 multi_callable = _stream_unary_multi_callable(self._channel) 822 multi_callable.future( 823 request_iterator, 824 metadata=(('test', 'IgnoredStreamRequestFutureUnaryResponse'),)) 825 826 def testIgnoredStreamRequestStreamResponse(self): 827 self._ignored_stream_request_stream_response( 828 _stream_stream_multi_callable(self._channel)) 829 830 def testIgnoredStreamRequestStreamResponseNonBlocking(self): 831 self._ignored_stream_request_stream_response( 832 _stream_stream_non_blocking_multi_callable(self._channel)) 833 834 def _consume_one_stream_response_unary_request(self, multi_callable): 835 request = b'\x57\x38' 836 837 response_iterator = multi_callable( 838 request, 839 metadata=(('test', 'ConsumingOneStreamResponseUnaryRequest'),)) 840 next(response_iterator) 841 842 def _consume_some_but_not_all_stream_responses_unary_request( 843 self, multi_callable): 844 request = b'\x57\x38' 845 846 response_iterator = multi_callable( 847 request, 848 metadata=(('test', 849 'ConsumingSomeButNotAllStreamResponsesUnaryRequest'),)) 850 for _ in range(test_constants.STREAM_LENGTH // 2): 851 next(response_iterator) 852 853 def _consume_some_but_not_all_stream_responses_stream_request( 854 self, multi_callable): 855 requests = tuple( 856 b'\x67\x88' for _ in range(test_constants.STREAM_LENGTH)) 857 request_iterator = iter(requests) 858 859 response_iterator = multi_callable( 860 request_iterator, 861 metadata=(('test', 862 'ConsumingSomeButNotAllStreamResponsesStreamRequest'),)) 863 for _ in range(test_constants.STREAM_LENGTH // 2): 864 next(response_iterator) 865 866 def _consume_too_many_stream_responses_stream_request(self, multi_callable): 867 requests = tuple( 868 b'\x67\x88' for _ in range(test_constants.STREAM_LENGTH)) 869 request_iterator = iter(requests) 870 871 response_iterator = multi_callable( 872 request_iterator, 873 metadata=(('test', 874 'ConsumingTooManyStreamResponsesStreamRequest'),)) 875 for _ in range(test_constants.STREAM_LENGTH): 876 next(response_iterator) 877 for _ in range(test_constants.STREAM_LENGTH): 878 with self.assertRaises(StopIteration): 879 next(response_iterator) 880 881 self.assertIsNotNone(response_iterator.initial_metadata()) 882 self.assertIs(grpc.StatusCode.OK, response_iterator.code()) 883 self.assertIsNotNone(response_iterator.details()) 884 self.assertIsNotNone(response_iterator.trailing_metadata()) 885 886 def _cancelled_unary_request_stream_response(self, multi_callable): 887 request = b'\x07\x19' 888 889 with self._control.pause(): 890 response_iterator = multi_callable( 891 request, 892 metadata=(('test', 'CancelledUnaryRequestStreamResponse'),)) 893 self._control.block_until_paused() 894 response_iterator.cancel() 895 896 with self.assertRaises(grpc.RpcError) as exception_context: 897 next(response_iterator) 898 self.assertIs(grpc.StatusCode.CANCELLED, 899 exception_context.exception.code()) 900 self.assertIsNotNone(response_iterator.initial_metadata()) 901 self.assertIs(grpc.StatusCode.CANCELLED, response_iterator.code()) 902 self.assertIsNotNone(response_iterator.details()) 903 self.assertIsNotNone(response_iterator.trailing_metadata()) 904 905 def _cancelled_stream_request_stream_response(self, multi_callable): 906 requests = tuple( 907 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 908 request_iterator = iter(requests) 909 910 with self._control.pause(): 911 response_iterator = multi_callable( 912 request_iterator, 913 metadata=(('test', 'CancelledStreamRequestStreamResponse'),)) 914 response_iterator.cancel() 915 916 with self.assertRaises(grpc.RpcError): 917 next(response_iterator) 918 self.assertIsNotNone(response_iterator.initial_metadata()) 919 self.assertIs(grpc.StatusCode.CANCELLED, response_iterator.code()) 920 self.assertIsNotNone(response_iterator.details()) 921 self.assertIsNotNone(response_iterator.trailing_metadata()) 922 923 def _expired_unary_request_stream_response(self, multi_callable): 924 request = b'\x07\x19' 925 926 with self._control.pause(): 927 with self.assertRaises(grpc.RpcError) as exception_context: 928 response_iterator = multi_callable( 929 request, 930 timeout=test_constants.SHORT_TIMEOUT, 931 metadata=(('test', 'ExpiredUnaryRequestStreamResponse'),)) 932 next(response_iterator) 933 934 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, 935 exception_context.exception.code()) 936 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, 937 response_iterator.code()) 938 939 def _expired_stream_request_stream_response(self, multi_callable): 940 requests = tuple( 941 b'\x67\x18' for _ in range(test_constants.STREAM_LENGTH)) 942 request_iterator = iter(requests) 943 944 with self._control.pause(): 945 with self.assertRaises(grpc.RpcError) as exception_context: 946 response_iterator = multi_callable( 947 request_iterator, 948 timeout=test_constants.SHORT_TIMEOUT, 949 metadata=(('test', 'ExpiredStreamRequestStreamResponse'),)) 950 next(response_iterator) 951 952 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, 953 exception_context.exception.code()) 954 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, 955 response_iterator.code()) 956 957 def _failed_unary_request_stream_response(self, multi_callable): 958 request = b'\x37\x17' 959 960 with self.assertRaises(grpc.RpcError) as exception_context: 961 with self._control.fail(): 962 response_iterator = multi_callable( 963 request, 964 metadata=(('test', 'FailedUnaryRequestStreamResponse'),)) 965 next(response_iterator) 966 967 self.assertIs(grpc.StatusCode.UNKNOWN, 968 exception_context.exception.code()) 969 970 def _failed_stream_request_stream_response(self, multi_callable): 971 requests = tuple( 972 b'\x67\x88' for _ in range(test_constants.STREAM_LENGTH)) 973 request_iterator = iter(requests) 974 975 with self._control.fail(): 976 with self.assertRaises(grpc.RpcError) as exception_context: 977 response_iterator = multi_callable( 978 request_iterator, 979 metadata=(('test', 'FailedStreamRequestStreamResponse'),)) 980 tuple(response_iterator) 981 982 self.assertIs(grpc.StatusCode.UNKNOWN, 983 exception_context.exception.code()) 984 self.assertIs(grpc.StatusCode.UNKNOWN, response_iterator.code()) 985 986 def _ignored_unary_stream_request_future_unary_response( 987 self, multi_callable): 988 request = b'\x37\x17' 989 990 multi_callable(request, 991 metadata=(('test', 992 'IgnoredUnaryRequestStreamResponse'),)) 993 994 def _ignored_stream_request_stream_response(self, multi_callable): 995 requests = tuple( 996 b'\x67\x88' for _ in range(test_constants.STREAM_LENGTH)) 997 request_iterator = iter(requests) 998 999 multi_callable(request_iterator, 1000 metadata=(('test', 1001 'IgnoredStreamRequestStreamResponse'),)) 1002 1003 1004if __name__ == '__main__': 1005 logging.basicConfig() 1006 unittest.main(verbosity=2) 1007