1# Copyright 2016 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Test of RPCs made against gRPC Python's application-layer API.""" 15 16import itertools 17import threading 18import unittest 19import logging 20from concurrent import futures 21 22import grpc 23from grpc.framework.foundation import logging_pool 24 25from tests.unit._rpc_test_helpers import ( 26 TIMEOUT_SHORT, Callback, unary_unary_multi_callable, 27 unary_stream_multi_callable, unary_stream_non_blocking_multi_callable, 28 stream_unary_multi_callable, stream_stream_multi_callable, 29 stream_stream_non_blocking_multi_callable, BaseRPCTest) 30from tests.unit.framework.common import test_constants 31 32 33class RPCPart2Test(BaseRPCTest, unittest.TestCase): 34 35 def testDefaultThreadPoolIsUsed(self): 36 self._consume_one_stream_response_unary_request( 37 unary_stream_multi_callable(self._channel)) 38 self.assertFalse(self._thread_pool.was_used()) 39 40 def testExperimentalThreadPoolIsUsed(self): 41 self._consume_one_stream_response_unary_request( 42 unary_stream_non_blocking_multi_callable(self._channel)) 43 self.assertTrue(self._thread_pool.was_used()) 44 45 def testUnrecognizedMethod(self): 46 request = b'abc' 47 48 with self.assertRaises(grpc.RpcError) as exception_context: 49 self._channel.unary_unary('NoSuchMethod')(request) 50 51 self.assertEqual(grpc.StatusCode.UNIMPLEMENTED, 52 exception_context.exception.code()) 53 54 def testSuccessfulUnaryRequestBlockingUnaryResponse(self): 55 request = b'\x07\x08' 56 expected_response = self._handler.handle_unary_unary(request, None) 57 58 multi_callable = unary_unary_multi_callable(self._channel) 59 response = multi_callable( 60 request, 61 metadata=(('test', 'SuccessfulUnaryRequestBlockingUnaryResponse'),)) 62 63 self.assertEqual(expected_response, response) 64 65 def testSuccessfulUnaryRequestBlockingUnaryResponseWithCall(self): 66 request = b'\x07\x08' 67 expected_response = self._handler.handle_unary_unary(request, None) 68 69 multi_callable = unary_unary_multi_callable(self._channel) 70 response, call = multi_callable.with_call( 71 request, 72 metadata=(('test', 73 'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),)) 74 75 self.assertEqual(expected_response, response) 76 self.assertIs(grpc.StatusCode.OK, call.code()) 77 self.assertEqual('', call.debug_error_string()) 78 79 def testSuccessfulUnaryRequestFutureUnaryResponse(self): 80 request = b'\x07\x08' 81 expected_response = self._handler.handle_unary_unary(request, None) 82 83 multi_callable = unary_unary_multi_callable(self._channel) 84 response_future = multi_callable.future( 85 request, 86 metadata=(('test', 'SuccessfulUnaryRequestFutureUnaryResponse'),)) 87 response = response_future.result() 88 89 self.assertIsInstance(response_future, grpc.Future) 90 self.assertIsInstance(response_future, grpc.Call) 91 self.assertEqual(expected_response, response) 92 self.assertIsNone(response_future.exception()) 93 self.assertIsNone(response_future.traceback()) 94 95 def testSuccessfulUnaryRequestStreamResponse(self): 96 request = b'\x37\x58' 97 expected_responses = tuple( 98 self._handler.handle_unary_stream(request, None)) 99 100 multi_callable = unary_stream_multi_callable(self._channel) 101 response_iterator = multi_callable( 102 request, 103 metadata=(('test', 'SuccessfulUnaryRequestStreamResponse'),)) 104 responses = tuple(response_iterator) 105 106 self.assertSequenceEqual(expected_responses, responses) 107 108 def testSuccessfulStreamRequestBlockingUnaryResponse(self): 109 requests = tuple( 110 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 111 expected_response = self._handler.handle_stream_unary( 112 iter(requests), None) 113 request_iterator = iter(requests) 114 115 multi_callable = stream_unary_multi_callable(self._channel) 116 response = multi_callable( 117 request_iterator, 118 metadata=(('test', 119 'SuccessfulStreamRequestBlockingUnaryResponse'),)) 120 121 self.assertEqual(expected_response, response) 122 123 def testSuccessfulStreamRequestBlockingUnaryResponseWithCall(self): 124 requests = tuple( 125 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 126 expected_response = self._handler.handle_stream_unary( 127 iter(requests), None) 128 request_iterator = iter(requests) 129 130 multi_callable = stream_unary_multi_callable(self._channel) 131 response, call = multi_callable.with_call( 132 request_iterator, 133 metadata=( 134 ('test', 135 'SuccessfulStreamRequestBlockingUnaryResponseWithCall'),)) 136 137 self.assertEqual(expected_response, response) 138 self.assertIs(grpc.StatusCode.OK, call.code()) 139 140 def testSuccessfulStreamRequestFutureUnaryResponse(self): 141 requests = tuple( 142 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 143 expected_response = self._handler.handle_stream_unary( 144 iter(requests), None) 145 request_iterator = iter(requests) 146 147 multi_callable = stream_unary_multi_callable(self._channel) 148 response_future = multi_callable.future( 149 request_iterator, 150 metadata=(('test', 'SuccessfulStreamRequestFutureUnaryResponse'),)) 151 response = response_future.result() 152 153 self.assertEqual(expected_response, response) 154 self.assertIsNone(response_future.exception()) 155 self.assertIsNone(response_future.traceback()) 156 157 def testSuccessfulStreamRequestStreamResponse(self): 158 requests = tuple( 159 b'\x77\x58' for _ in range(test_constants.STREAM_LENGTH)) 160 161 expected_responses = tuple( 162 self._handler.handle_stream_stream(iter(requests), None)) 163 request_iterator = iter(requests) 164 165 multi_callable = stream_stream_multi_callable(self._channel) 166 response_iterator = multi_callable( 167 request_iterator, 168 metadata=(('test', 'SuccessfulStreamRequestStreamResponse'),)) 169 responses = tuple(response_iterator) 170 171 self.assertSequenceEqual(expected_responses, responses) 172 173 def testSequentialInvocations(self): 174 first_request = b'\x07\x08' 175 second_request = b'\x0809' 176 expected_first_response = self._handler.handle_unary_unary( 177 first_request, None) 178 expected_second_response = self._handler.handle_unary_unary( 179 second_request, None) 180 181 multi_callable = unary_unary_multi_callable(self._channel) 182 first_response = multi_callable(first_request, 183 metadata=(('test', 184 'SequentialInvocations'),)) 185 second_response = multi_callable(second_request, 186 metadata=(('test', 187 'SequentialInvocations'),)) 188 189 self.assertEqual(expected_first_response, first_response) 190 self.assertEqual(expected_second_response, second_response) 191 192 def testConcurrentBlockingInvocations(self): 193 pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) 194 requests = tuple( 195 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 196 expected_response = self._handler.handle_stream_unary( 197 iter(requests), None) 198 expected_responses = [expected_response 199 ] * test_constants.THREAD_CONCURRENCY 200 response_futures = [None] * test_constants.THREAD_CONCURRENCY 201 202 multi_callable = stream_unary_multi_callable(self._channel) 203 for index in range(test_constants.THREAD_CONCURRENCY): 204 request_iterator = iter(requests) 205 response_future = pool.submit( 206 multi_callable, 207 request_iterator, 208 metadata=(('test', 'ConcurrentBlockingInvocations'),)) 209 response_futures[index] = response_future 210 responses = tuple( 211 response_future.result() for response_future in response_futures) 212 213 pool.shutdown(wait=True) 214 self.assertSequenceEqual(expected_responses, responses) 215 216 def testConcurrentFutureInvocations(self): 217 requests = tuple( 218 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 219 expected_response = self._handler.handle_stream_unary( 220 iter(requests), None) 221 expected_responses = [expected_response 222 ] * test_constants.THREAD_CONCURRENCY 223 response_futures = [None] * test_constants.THREAD_CONCURRENCY 224 225 multi_callable = stream_unary_multi_callable(self._channel) 226 for index in range(test_constants.THREAD_CONCURRENCY): 227 request_iterator = iter(requests) 228 response_future = multi_callable.future( 229 request_iterator, 230 metadata=(('test', 'ConcurrentFutureInvocations'),)) 231 response_futures[index] = response_future 232 responses = tuple( 233 response_future.result() for response_future in response_futures) 234 235 self.assertSequenceEqual(expected_responses, responses) 236 237 def testWaitingForSomeButNotAllConcurrentFutureInvocations(self): 238 pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) 239 request = b'\x67\x68' 240 expected_response = self._handler.handle_unary_unary(request, None) 241 response_futures = [None] * test_constants.THREAD_CONCURRENCY 242 lock = threading.Lock() 243 test_is_running_cell = [True] 244 245 def wrap_future(future): 246 247 def wrap(): 248 try: 249 return future.result() 250 except grpc.RpcError: 251 with lock: 252 if test_is_running_cell[0]: 253 raise 254 return None 255 256 return wrap 257 258 multi_callable = unary_unary_multi_callable(self._channel) 259 for index in range(test_constants.THREAD_CONCURRENCY): 260 inner_response_future = multi_callable.future( 261 request, 262 metadata=( 263 ('test', 264 'WaitingForSomeButNotAllConcurrentFutureInvocations'),)) 265 outer_response_future = pool.submit( 266 wrap_future(inner_response_future)) 267 response_futures[index] = outer_response_future 268 269 some_completed_response_futures_iterator = itertools.islice( 270 futures.as_completed(response_futures), 271 test_constants.THREAD_CONCURRENCY // 2) 272 for response_future in some_completed_response_futures_iterator: 273 self.assertEqual(expected_response, response_future.result()) 274 with lock: 275 test_is_running_cell[0] = False 276 277 def testConsumingOneStreamResponseUnaryRequest(self): 278 self._consume_one_stream_response_unary_request( 279 unary_stream_multi_callable(self._channel)) 280 281 def testConsumingOneStreamResponseUnaryRequestNonBlocking(self): 282 self._consume_one_stream_response_unary_request( 283 unary_stream_non_blocking_multi_callable(self._channel)) 284 285 def testConsumingSomeButNotAllStreamResponsesUnaryRequest(self): 286 self._consume_some_but_not_all_stream_responses_unary_request( 287 unary_stream_multi_callable(self._channel)) 288 289 def testConsumingSomeButNotAllStreamResponsesUnaryRequestNonBlocking(self): 290 self._consume_some_but_not_all_stream_responses_unary_request( 291 unary_stream_non_blocking_multi_callable(self._channel)) 292 293 def testConsumingSomeButNotAllStreamResponsesStreamRequest(self): 294 self._consume_some_but_not_all_stream_responses_stream_request( 295 stream_stream_multi_callable(self._channel)) 296 297 def testConsumingSomeButNotAllStreamResponsesStreamRequestNonBlocking(self): 298 self._consume_some_but_not_all_stream_responses_stream_request( 299 stream_stream_non_blocking_multi_callable(self._channel)) 300 301 def testConsumingTooManyStreamResponsesStreamRequest(self): 302 self._consume_too_many_stream_responses_stream_request( 303 stream_stream_multi_callable(self._channel)) 304 305 def testConsumingTooManyStreamResponsesStreamRequestNonBlocking(self): 306 self._consume_too_many_stream_responses_stream_request( 307 stream_stream_non_blocking_multi_callable(self._channel)) 308 309 def testCancelledUnaryRequestUnaryResponse(self): 310 request = b'\x07\x17' 311 312 multi_callable = unary_unary_multi_callable(self._channel) 313 with self._control.pause(): 314 response_future = multi_callable.future( 315 request, 316 metadata=(('test', 'CancelledUnaryRequestUnaryResponse'),)) 317 response_future.cancel() 318 319 self.assertIs(grpc.StatusCode.CANCELLED, response_future.code()) 320 self.assertTrue(response_future.cancelled()) 321 with self.assertRaises(grpc.FutureCancelledError): 322 response_future.result() 323 with self.assertRaises(grpc.FutureCancelledError): 324 response_future.exception() 325 with self.assertRaises(grpc.FutureCancelledError): 326 response_future.traceback() 327 328 def testCancelledUnaryRequestStreamResponse(self): 329 self._cancelled_unary_request_stream_response( 330 unary_stream_multi_callable(self._channel)) 331 332 def testCancelledUnaryRequestStreamResponseNonBlocking(self): 333 self._cancelled_unary_request_stream_response( 334 unary_stream_non_blocking_multi_callable(self._channel)) 335 336 def testCancelledStreamRequestUnaryResponse(self): 337 requests = tuple( 338 b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) 339 request_iterator = iter(requests) 340 341 multi_callable = stream_unary_multi_callable(self._channel) 342 with self._control.pause(): 343 response_future = multi_callable.future( 344 request_iterator, 345 metadata=(('test', 'CancelledStreamRequestUnaryResponse'),)) 346 self._control.block_until_paused() 347 response_future.cancel() 348 349 self.assertIs(grpc.StatusCode.CANCELLED, response_future.code()) 350 self.assertTrue(response_future.cancelled()) 351 with self.assertRaises(grpc.FutureCancelledError): 352 response_future.result() 353 with self.assertRaises(grpc.FutureCancelledError): 354 response_future.exception() 355 with self.assertRaises(grpc.FutureCancelledError): 356 response_future.traceback() 357 self.assertIsNotNone(response_future.initial_metadata()) 358 self.assertIsNotNone(response_future.details()) 359 self.assertIsNotNone(response_future.trailing_metadata()) 360 361 def testCancelledStreamRequestStreamResponse(self): 362 self._cancelled_stream_request_stream_response( 363 stream_stream_multi_callable(self._channel)) 364 365 def testCancelledStreamRequestStreamResponseNonBlocking(self): 366 self._cancelled_stream_request_stream_response( 367 stream_stream_non_blocking_multi_callable(self._channel)) 368 369 def testExpiredUnaryRequestBlockingUnaryResponse(self): 370 request = b'\x07\x17' 371 372 multi_callable = unary_unary_multi_callable(self._channel) 373 with self._control.pause(): 374 with self.assertRaises(grpc.RpcError) as exception_context: 375 multi_callable.with_call( 376 request, 377 timeout=TIMEOUT_SHORT, 378 metadata=(('test', 379 'ExpiredUnaryRequestBlockingUnaryResponse'),)) 380 381 self.assertIsInstance(exception_context.exception, grpc.Call) 382 self.assertIsNotNone(exception_context.exception.initial_metadata()) 383 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, 384 exception_context.exception.code()) 385 self.assertIsNotNone(exception_context.exception.details()) 386 self.assertIsNotNone(exception_context.exception.trailing_metadata()) 387 388 def testExpiredUnaryRequestFutureUnaryResponse(self): 389 request = b'\x07\x17' 390 callback = Callback() 391 392 multi_callable = unary_unary_multi_callable(self._channel) 393 with self._control.pause(): 394 response_future = multi_callable.future( 395 request, 396 timeout=TIMEOUT_SHORT, 397 metadata=(('test', 'ExpiredUnaryRequestFutureUnaryResponse'),)) 398 response_future.add_done_callback(callback) 399 value_passed_to_callback = callback.value() 400 401 self.assertIs(response_future, value_passed_to_callback) 402 self.assertIsNotNone(response_future.initial_metadata()) 403 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, response_future.code()) 404 self.assertIsNotNone(response_future.details()) 405 self.assertIsNotNone(response_future.trailing_metadata()) 406 with self.assertRaises(grpc.RpcError) as exception_context: 407 response_future.result() 408 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, 409 exception_context.exception.code()) 410 self.assertIsInstance(response_future.exception(), grpc.RpcError) 411 self.assertIsNotNone(response_future.traceback()) 412 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, 413 response_future.exception().code()) 414 415 def testExpiredUnaryRequestStreamResponse(self): 416 self._expired_unary_request_stream_response( 417 unary_stream_multi_callable(self._channel)) 418 419 def testExpiredUnaryRequestStreamResponseNonBlocking(self): 420 self._expired_unary_request_stream_response( 421 unary_stream_non_blocking_multi_callable(self._channel)) 422 423 424if __name__ == '__main__': 425 logging.basicConfig() 426 unittest.main(verbosity=2) 427