# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test of RPCs made against gRPC Python's application-layer API."""

from concurrent import futures
import itertools
import logging
import threading
import unittest

import grpc
from grpc.framework.foundation import logging_pool

from tests.unit._rpc_test_helpers import (
    stream_stream_non_blocking_multi_callable,
)
from tests.unit._rpc_test_helpers import (
    unary_stream_non_blocking_multi_callable,
)
from tests.unit._rpc_test_helpers import BaseRPCTest
from tests.unit._rpc_test_helpers import Callback
from tests.unit._rpc_test_helpers import TIMEOUT_SHORT
from tests.unit._rpc_test_helpers import stream_stream_multi_callable
from tests.unit._rpc_test_helpers import stream_unary_multi_callable
from tests.unit._rpc_test_helpers import unary_stream_multi_callable
from tests.unit._rpc_test_helpers import unary_unary_multi_callable
from tests.unit.framework.common import test_constants


def _request_iterator(payload):
    """Return an iterator yielding `payload` STREAM_LENGTH times.

    Args:
      payload: The bytes value used for every request of the stream.

    Returns:
      An iterator over a tuple holding `test_constants.STREAM_LENGTH` copies
      of `payload`.
    """
    return iter((payload,) * test_constants.STREAM_LENGTH)


class RPCPart1Test(BaseRPCTest, unittest.TestCase):
    """Expired, failed and ignored RPC scenarios.

    `self._channel`, `self._control` and the `_expired_*`, `_failed_*` and
    `_ignored_*` scenario helpers are not defined in this file; presumably
    they are provided by `BaseRPCTest` (confirm in `_rpc_test_helpers`).
    """

    # NOTE: the test methods are described with comments rather than
    # docstrings because the runner below uses verbosity=3, where unittest
    # would print a docstring's first line alongside the test name.

    def testExpiredStreamRequestBlockingUnaryResponse(self):
        # A blocking stream-unary call issued under pause() with a short
        # timeout must raise an RpcError reporting DEADLINE_EXCEEDED.
        stream = _request_iterator(b"\x07\x08")
        rpc = stream_unary_multi_callable(self._channel)
        metadata = (("test", "ExpiredStreamRequestBlockingUnaryResponse"),)

        with self._control.pause(), self.assertRaises(grpc.RpcError) as raised:
            rpc(stream, timeout=TIMEOUT_SHORT, metadata=metadata)

        error = raised.exception
        self.assertIsInstance(error, grpc.RpcError)
        self.assertIsInstance(error, grpc.Call)
        self.assertIsNotNone(error.initial_metadata())
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, error.code())
        self.assertIsNotNone(error.details())
        self.assertIsNotNone(error.trailing_metadata())

    def testExpiredStreamRequestFutureUnaryResponse(self):
        # Future flavour of the expired stream-unary call: waiting for half
        # the RPC timeout must time out on the future first; afterwards the
        # future itself must report DEADLINE_EXCEEDED.
        stream = _request_iterator(b"\x07\x18")
        done_callback = Callback()
        rpc = stream_unary_multi_callable(self._channel)
        metadata = (("test", "ExpiredStreamRequestFutureUnaryResponse"),)

        with self._control.pause():
            rpc_future = rpc.future(
                stream, timeout=TIMEOUT_SHORT, metadata=metadata
            )
            with self.assertRaises(grpc.FutureTimeoutError):
                rpc_future.result(timeout=TIMEOUT_SHORT / 2.0)
            rpc_future.add_done_callback(done_callback)
            callback_argument = done_callback.value()

        with self.assertRaises(grpc.RpcError) as raised:
            rpc_future.result()
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, rpc_future.code())
        self.assertIs(
            grpc.StatusCode.DEADLINE_EXCEEDED, raised.exception.code()
        )
        self.assertIsInstance(rpc_future.exception(), grpc.RpcError)
        self.assertIsNotNone(rpc_future.traceback())
        self.assertIs(rpc_future, callback_argument)
        self.assertIsNotNone(rpc_future.initial_metadata())
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, rpc_future.code())
        self.assertIsNotNone(rpc_future.details())
        self.assertIsNotNone(rpc_future.trailing_metadata())

    def testExpiredStreamRequestStreamResponse(self):
        # Shared expired stream-stream scenario, regular multi-callable.
        rpc = stream_stream_multi_callable(self._channel)
        self._expired_stream_request_stream_response(rpc)

    def testExpiredStreamRequestStreamResponseNonBlocking(self):
        # Shared expired stream-stream scenario, non-blocking multi-callable.
        rpc = stream_stream_non_blocking_multi_callable(self._channel)
        self._expired_stream_request_stream_response(rpc)

    def testFailedUnaryRequestBlockingUnaryResponse(self):
        # A blocking unary-unary with_call() issued under fail() must raise
        # an RpcError whose status code is UNKNOWN.
        request = b"\x37\x17"
        rpc = unary_unary_multi_callable(self._channel)
        metadata = (("test", "FailedUnaryRequestBlockingUnaryResponse"),)

        with self._control.fail(), self.assertRaises(grpc.RpcError) as raised:
            rpc.with_call(request, metadata=metadata)

        error = raised.exception
        self.assertIs(grpc.StatusCode.UNKNOWN, error.code())
        # Sanity check: the debug error string should carry the default
        # members of the error.
        debug_error_string = error.debug_error_string()
        self.assertIn("grpc_status", debug_error_string)
        self.assertIn("grpc_message", debug_error_string)

    def testFailedUnaryRequestFutureUnaryResponse(self):
        # Future flavour of the failed unary-unary call: the future must be
        # both a grpc.Future and a grpc.Call, fail with UNKNOWN, and be the
        # value handed to its done-callback.
        request = b"\x37\x17"
        done_callback = Callback()
        rpc = unary_unary_multi_callable(self._channel)
        metadata = (("test", "FailedUnaryRequestFutureUnaryResponse"),)

        with self._control.fail():
            rpc_future = rpc.future(request, metadata=metadata)
            rpc_future.add_done_callback(done_callback)
            callback_argument = done_callback.value()

        self.assertIsInstance(rpc_future, grpc.Future)
        self.assertIsInstance(rpc_future, grpc.Call)
        with self.assertRaises(grpc.RpcError) as raised:
            rpc_future.result()
        self.assertIs(grpc.StatusCode.UNKNOWN, raised.exception.code())
        self.assertIsInstance(rpc_future.exception(), grpc.RpcError)
        self.assertIsNotNone(rpc_future.traceback())
        self.assertIs(grpc.StatusCode.UNKNOWN, rpc_future.exception().code())
        self.assertIs(rpc_future, callback_argument)

    def testFailedUnaryRequestStreamResponse(self):
        # Shared failed unary-stream scenario, regular multi-callable.
        rpc = unary_stream_multi_callable(self._channel)
        self._failed_unary_request_stream_response(rpc)

    def testFailedUnaryRequestStreamResponseNonBlocking(self):
        # Shared failed unary-stream scenario, non-blocking multi-callable.
        rpc = unary_stream_non_blocking_multi_callable(self._channel)
        self._failed_unary_request_stream_response(rpc)

    def testFailedStreamRequestBlockingUnaryResponse(self):
        # A blocking stream-unary call issued under fail() must raise an
        # RpcError whose status code is UNKNOWN.
        stream = _request_iterator(b"\x47\x58")
        rpc = stream_unary_multi_callable(self._channel)
        metadata = (("test", "FailedStreamRequestBlockingUnaryResponse"),)

        with self._control.fail(), self.assertRaises(grpc.RpcError) as raised:
            rpc(stream, metadata=metadata)

        self.assertIs(grpc.StatusCode.UNKNOWN, raised.exception.code())

    def testFailedStreamRequestFutureUnaryResponse(self):
        # Future flavour of the failed stream-unary call: result() must
        # raise, the status must be UNKNOWN, and the future must be the
        # value handed to its done-callback.
        stream = _request_iterator(b"\x07\x18")
        done_callback = Callback()
        rpc = stream_unary_multi_callable(self._channel)
        metadata = (("test", "FailedStreamRequestFutureUnaryResponse"),)

        with self._control.fail():
            rpc_future = rpc.future(stream, metadata=metadata)
            rpc_future.add_done_callback(done_callback)
            callback_argument = done_callback.value()

        with self.assertRaises(grpc.RpcError) as raised:
            rpc_future.result()
        self.assertIs(grpc.StatusCode.UNKNOWN, rpc_future.code())
        self.assertIs(grpc.StatusCode.UNKNOWN, raised.exception.code())
        self.assertIsInstance(rpc_future.exception(), grpc.RpcError)
        self.assertIsNotNone(rpc_future.traceback())
        self.assertIs(rpc_future, callback_argument)

    def testFailedStreamRequestStreamResponse(self):
        # Shared failed stream-stream scenario, regular multi-callable.
        rpc = stream_stream_multi_callable(self._channel)
        self._failed_stream_request_stream_response(rpc)

    def testFailedStreamRequestStreamResponseNonBlocking(self):
        # Shared failed stream-stream scenario, non-blocking multi-callable.
        rpc = stream_stream_non_blocking_multi_callable(self._channel)
        self._failed_stream_request_stream_response(rpc)

    def testIgnoredUnaryRequestFutureUnaryResponse(self):
        # Start a unary-unary future and deliberately discard it; the test
        # passes as long as doing so raises nothing.
        request = b"\x37\x17"
        rpc = unary_unary_multi_callable(self._channel)
        metadata = (("test", "IgnoredUnaryRequestFutureUnaryResponse"),)

        rpc.future(request, metadata=metadata)

    def testIgnoredUnaryRequestStreamResponse(self):
        # Shared ignored unary-stream scenario, regular multi-callable.
        rpc = unary_stream_multi_callable(self._channel)
        self._ignored_unary_stream_request_future_unary_response(rpc)

    def testIgnoredUnaryRequestStreamResponseNonBlocking(self):
        # Shared ignored unary-stream scenario, non-blocking multi-callable.
        rpc = unary_stream_non_blocking_multi_callable(self._channel)
        self._ignored_unary_stream_request_future_unary_response(rpc)

    def testIgnoredStreamRequestFutureUnaryResponse(self):
        # Start a stream-unary future and deliberately discard it; the test
        # passes as long as doing so raises nothing.
        stream = _request_iterator(b"\x07\x18")
        rpc = stream_unary_multi_callable(self._channel)
        metadata = (("test", "IgnoredStreamRequestFutureUnaryResponse"),)

        rpc.future(stream, metadata=metadata)

    def testIgnoredStreamRequestStreamResponse(self):
        # Shared ignored stream-stream scenario, regular multi-callable.
        rpc = stream_stream_multi_callable(self._channel)
        self._ignored_stream_request_stream_response(rpc)

    def testIgnoredStreamRequestStreamResponseNonBlocking(self):
        # Shared ignored stream-stream scenario, non-blocking multi-callable.
        rpc = stream_stream_non_blocking_multi_callable(self._channel)
        self._ignored_stream_request_stream_response(rpc)


if __name__ == "__main__":
    logging.basicConfig()
    unittest.main(verbosity=3)