1# Copyright 2017 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15from concurrent import futures 16import time 17import unittest 18 19import grpc 20from grpc.framework.foundation import logging_pool 21import grpc_testing 22 23from tests.testing import _application_common 24from tests.testing import _application_testing_common 25from tests.testing import _client_application 26from tests.testing.proto import requests_pb2 27from tests.testing.proto import services_pb2 28from tests.unit.framework.common import test_constants 29 30 31# TODO(https://github.com/protocolbuffers/protobuf/issues/3452): Drop this skip. 32@unittest.skipIf( 33 services_pb2.DESCRIPTOR.services_by_name.get("FirstService") is None, 34 "Fix protobuf issue 3452!", 35) 36class ClientTest(unittest.TestCase): 37 def setUp(self): 38 # In this test the client-side application under test executes in 39 # a separate thread while we retain use of the test thread to "play 40 # server". 41 self._client_execution_thread_pool = logging_pool.pool(1) 42 43 self._fake_time = grpc_testing.strict_fake_time(time.time()) 44 self._real_time = grpc_testing.strict_real_time() 45 self._fake_time_channel = grpc_testing.channel( 46 services_pb2.DESCRIPTOR.services_by_name.values(), self._fake_time 47 ) 48 self._real_time_channel = grpc_testing.channel( 49 services_pb2.DESCRIPTOR.services_by_name.values(), self._real_time 50 ) 51 52 def tearDown(self): 53 self._client_execution_thread_pool.shutdown(wait=True) 54 55 def test_successful_unary_unary(self): 56 application_future = self._client_execution_thread_pool.submit( 57 _client_application.run, 58 _client_application.Scenario.UNARY_UNARY, 59 self._real_time_channel, 60 ) 61 ( 62 invocation_metadata, 63 request, 64 rpc, 65 ) = self._real_time_channel.take_unary_unary( 66 _application_testing_common.FIRST_SERVICE_UNUN 67 ) 68 rpc.send_initial_metadata(()) 69 rpc.terminate( 70 _application_common.UNARY_UNARY_RESPONSE, (), grpc.StatusCode.OK, "" 71 ) 72 application_return_value = application_future.result() 73 74 self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request) 75 self.assertIs( 76 application_return_value.kind, 77 _client_application.Outcome.Kind.SATISFACTORY, 78 ) 79 80 def test_successful_unary_stream(self): 81 application_future = self._client_execution_thread_pool.submit( 82 _client_application.run, 83 _client_application.Scenario.UNARY_STREAM, 84 self._fake_time_channel, 85 ) 86 ( 87 invocation_metadata, 88 request, 89 rpc, 90 ) = self._fake_time_channel.take_unary_stream( 91 _application_testing_common.FIRST_SERVICE_UNSTRE 92 ) 93 rpc.send_initial_metadata(()) 94 rpc.terminate((), grpc.StatusCode.OK, "") 95 application_return_value = application_future.result() 96 97 self.assertEqual(_application_common.UNARY_STREAM_REQUEST, request) 98 self.assertIs( 99 application_return_value.kind, 100 _client_application.Outcome.Kind.SATISFACTORY, 101 ) 102 103 def test_successful_stream_unary(self): 104 application_future = self._client_execution_thread_pool.submit( 105 _client_application.run, 106 _client_application.Scenario.STREAM_UNARY, 107 self._real_time_channel, 108 ) 109 invocation_metadata, rpc = self._real_time_channel.take_stream_unary( 110 _application_testing_common.FIRST_SERVICE_STREUN 111 ) 112 rpc.send_initial_metadata(()) 113 first_request = rpc.take_request() 114 second_request = rpc.take_request() 115 third_request = rpc.take_request() 116 rpc.requests_closed() 117 rpc.terminate( 118 _application_common.STREAM_UNARY_RESPONSE, 119 (), 120 grpc.StatusCode.OK, 121 "", 122 ) 123 application_return_value = application_future.result() 124 125 self.assertEqual( 126 _application_common.STREAM_UNARY_REQUEST, first_request 127 ) 128 self.assertEqual( 129 _application_common.STREAM_UNARY_REQUEST, second_request 130 ) 131 self.assertEqual( 132 _application_common.STREAM_UNARY_REQUEST, third_request 133 ) 134 self.assertIs( 135 application_return_value.kind, 136 _client_application.Outcome.Kind.SATISFACTORY, 137 ) 138 139 def test_successful_stream_stream(self): 140 application_future = self._client_execution_thread_pool.submit( 141 _client_application.run, 142 _client_application.Scenario.STREAM_STREAM, 143 self._fake_time_channel, 144 ) 145 invocation_metadata, rpc = self._fake_time_channel.take_stream_stream( 146 _application_testing_common.FIRST_SERVICE_STRESTRE 147 ) 148 first_request = rpc.take_request() 149 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 150 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 151 second_request = rpc.take_request() 152 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 153 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 154 rpc.requests_closed() 155 rpc.terminate((), grpc.StatusCode.OK, "") 156 application_return_value = application_future.result() 157 158 self.assertEqual( 159 _application_common.STREAM_STREAM_REQUEST, first_request 160 ) 161 self.assertEqual( 162 _application_common.STREAM_STREAM_REQUEST, second_request 163 ) 164 self.assertIs( 165 application_return_value.kind, 166 _client_application.Outcome.Kind.SATISFACTORY, 167 ) 168 169 def test_concurrent_stream_stream(self): 170 application_future = self._client_execution_thread_pool.submit( 171 _client_application.run, 172 _client_application.Scenario.CONCURRENT_STREAM_STREAM, 173 self._real_time_channel, 174 ) 175 rpcs = [] 176 for _ in range(test_constants.RPC_CONCURRENCY): 177 ( 178 invocation_metadata, 179 rpc, 180 ) = self._real_time_channel.take_stream_stream( 181 _application_testing_common.FIRST_SERVICE_STRESTRE 182 ) 183 rpcs.append(rpc) 184 requests = {} 185 for rpc in rpcs: 186 requests[rpc] = [rpc.take_request()] 187 for rpc in rpcs: 188 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 189 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 190 for rpc in rpcs: 191 requests[rpc].append(rpc.take_request()) 192 for rpc in rpcs: 193 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 194 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 195 for rpc in rpcs: 196 rpc.requests_closed() 197 for rpc in rpcs: 198 rpc.terminate((), grpc.StatusCode.OK, "") 199 application_return_value = application_future.result() 200 201 for requests_of_one_rpc in requests.values(): 202 for request in requests_of_one_rpc: 203 self.assertEqual( 204 _application_common.STREAM_STREAM_REQUEST, request 205 ) 206 self.assertIs( 207 application_return_value.kind, 208 _client_application.Outcome.Kind.SATISFACTORY, 209 ) 210 211 def test_cancelled_unary_unary(self): 212 application_future = self._client_execution_thread_pool.submit( 213 _client_application.run, 214 _client_application.Scenario.CANCEL_UNARY_UNARY, 215 self._fake_time_channel, 216 ) 217 ( 218 invocation_metadata, 219 request, 220 rpc, 221 ) = self._fake_time_channel.take_unary_unary( 222 _application_testing_common.FIRST_SERVICE_UNUN 223 ) 224 rpc.send_initial_metadata(()) 225 rpc.cancelled() 226 application_return_value = application_future.result() 227 228 self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request) 229 self.assertIs( 230 application_return_value.kind, 231 _client_application.Outcome.Kind.SATISFACTORY, 232 ) 233 234 def test_status_stream_unary(self): 235 application_future = self._client_execution_thread_pool.submit( 236 _client_application.run, 237 _client_application.Scenario.CONCURRENT_STREAM_UNARY, 238 self._fake_time_channel, 239 ) 240 rpcs = tuple( 241 self._fake_time_channel.take_stream_unary( 242 _application_testing_common.FIRST_SERVICE_STREUN 243 )[1] 244 for _ in range(test_constants.THREAD_CONCURRENCY) 245 ) 246 for rpc in rpcs: 247 rpc.take_request() 248 rpc.take_request() 249 rpc.take_request() 250 rpc.requests_closed() 251 rpc.send_initial_metadata( 252 ( 253 ( 254 "my_metadata_key", 255 "My Metadata Value!", 256 ), 257 ) 258 ) 259 for rpc in rpcs[:-1]: 260 rpc.terminate( 261 _application_common.STREAM_UNARY_RESPONSE, 262 (), 263 grpc.StatusCode.OK, 264 "", 265 ) 266 rpcs[-1].terminate( 267 _application_common.STREAM_UNARY_RESPONSE, 268 (), 269 grpc.StatusCode.RESOURCE_EXHAUSTED, 270 "nope; not able to handle all those RPCs!", 271 ) 272 application_return_value = application_future.result() 273 274 self.assertIs( 275 application_return_value.kind, 276 _client_application.Outcome.Kind.UNSATISFACTORY, 277 ) 278 279 def test_status_stream_stream(self): 280 code = grpc.StatusCode.DEADLINE_EXCEEDED 281 details = "test deadline exceeded!" 282 283 application_future = self._client_execution_thread_pool.submit( 284 _client_application.run, 285 _client_application.Scenario.STREAM_STREAM, 286 self._real_time_channel, 287 ) 288 invocation_metadata, rpc = self._real_time_channel.take_stream_stream( 289 _application_testing_common.FIRST_SERVICE_STRESTRE 290 ) 291 first_request = rpc.take_request() 292 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 293 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 294 second_request = rpc.take_request() 295 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 296 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 297 rpc.requests_closed() 298 rpc.terminate((), code, details) 299 application_return_value = application_future.result() 300 301 self.assertEqual( 302 _application_common.STREAM_STREAM_REQUEST, first_request 303 ) 304 self.assertEqual( 305 _application_common.STREAM_STREAM_REQUEST, second_request 306 ) 307 self.assertIs( 308 application_return_value.kind, 309 _client_application.Outcome.Kind.RPC_ERROR, 310 ) 311 self.assertIs(application_return_value.code, code) 312 self.assertEqual(application_return_value.details, details) 313 314 def test_misbehaving_server_unary_unary(self): 315 application_future = self._client_execution_thread_pool.submit( 316 _client_application.run, 317 _client_application.Scenario.UNARY_UNARY, 318 self._fake_time_channel, 319 ) 320 ( 321 invocation_metadata, 322 request, 323 rpc, 324 ) = self._fake_time_channel.take_unary_unary( 325 _application_testing_common.FIRST_SERVICE_UNUN 326 ) 327 rpc.send_initial_metadata(()) 328 rpc.terminate( 329 _application_common.ERRONEOUS_UNARY_UNARY_RESPONSE, 330 (), 331 grpc.StatusCode.OK, 332 "", 333 ) 334 application_return_value = application_future.result() 335 336 self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request) 337 self.assertIs( 338 application_return_value.kind, 339 _client_application.Outcome.Kind.UNSATISFACTORY, 340 ) 341 342 def test_misbehaving_server_stream_stream(self): 343 application_future = self._client_execution_thread_pool.submit( 344 _client_application.run, 345 _client_application.Scenario.STREAM_STREAM, 346 self._real_time_channel, 347 ) 348 invocation_metadata, rpc = self._real_time_channel.take_stream_stream( 349 _application_testing_common.FIRST_SERVICE_STRESTRE 350 ) 351 first_request = rpc.take_request() 352 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 353 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 354 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 355 second_request = rpc.take_request() 356 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 357 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 358 rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) 359 rpc.requests_closed() 360 rpc.terminate((), grpc.StatusCode.OK, "") 361 application_return_value = application_future.result() 362 363 self.assertEqual( 364 _application_common.STREAM_STREAM_REQUEST, first_request 365 ) 366 self.assertEqual( 367 _application_common.STREAM_STREAM_REQUEST, second_request 368 ) 369 self.assertIs( 370 application_return_value.kind, 371 _client_application.Outcome.Kind.UNSATISFACTORY, 372 ) 373 374 def test_infinite_request_stream_real_time(self): 375 application_future = self._client_execution_thread_pool.submit( 376 _client_application.run, 377 _client_application.Scenario.INFINITE_REQUEST_STREAM, 378 self._real_time_channel, 379 ) 380 invocation_metadata, rpc = self._real_time_channel.take_stream_unary( 381 _application_testing_common.FIRST_SERVICE_STREUN 382 ) 383 rpc.send_initial_metadata(()) 384 first_request = rpc.take_request() 385 second_request = rpc.take_request() 386 third_request = rpc.take_request() 387 self._real_time.sleep_for( 388 _application_common.INFINITE_REQUEST_STREAM_TIMEOUT 389 ) 390 rpc.terminate( 391 _application_common.STREAM_UNARY_RESPONSE, 392 (), 393 grpc.StatusCode.DEADLINE_EXCEEDED, 394 "", 395 ) 396 application_return_value = application_future.result() 397 398 self.assertEqual( 399 _application_common.STREAM_UNARY_REQUEST, first_request 400 ) 401 self.assertEqual( 402 _application_common.STREAM_UNARY_REQUEST, second_request 403 ) 404 self.assertEqual( 405 _application_common.STREAM_UNARY_REQUEST, third_request 406 ) 407 self.assertIs( 408 application_return_value.kind, 409 _client_application.Outcome.Kind.SATISFACTORY, 410 ) 411 412 413if __name__ == "__main__": 414 unittest.main(verbosity=2) 415