# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from concurrent import futures
import time
import unittest

import grpc
from grpc.framework.foundation import logging_pool
from tests.unit.framework.common import test_constants
import grpc_testing

from tests.testing import _application_common
from tests.testing import _application_testing_common
from tests.testing import _client_application
from tests.testing.proto import requests_pb2
from tests.testing.proto import services_pb2


# TODO(https://github.com/google/protobuf/issues/3452): Drop this skip.
@unittest.skipIf(
    services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None,
    'Fix protobuf issue 3452!')
class ClientTest(unittest.TestCase):
    """Tests of the client application run against grpc_testing channels.

    Each test starts the client application on a one-worker thread pool
    and then, on the test thread, plays the server side of the RPCs the
    application invokes before checking the outcome the application
    reports.
    """

    def setUp(self):
        # In this test the client-side application under test executes in
        # a separate thread while we retain use of the test thread to "play
        # server".
        self._client_execution_thread_pool = logging_pool.pool(1)

        # Two channels are built over the same service descriptors: one
        # driven by a fake clock and one by the real clock.
        self._fake_time = grpc_testing.strict_fake_time(time.time())
        self._real_time = grpc_testing.strict_real_time()
        self._fake_time_channel = grpc_testing.channel(
            services_pb2.DESCRIPTOR.services_by_name.values(), self._fake_time)
        self._real_time_channel = grpc_testing.channel(
            services_pb2.DESCRIPTOR.services_by_name.values(), self._real_time)

    def tearDown(self):
        # Wait for the client application thread to finish before the next
        # test starts.
        self._client_execution_thread_pool.shutdown(wait=True)

    def _start_application(self, scenario, channel):
        """Submit the client application to the client thread pool.

        Args:
          scenario: The _client_application.Scenario value to run.
          channel: The testing channel handed to the application.

        Returns:
          The future returned by the pool for the application's run.
        """
        return self._client_execution_thread_pool.submit(
            _client_application.run, scenario, channel)

    @staticmethod
    def _send_stream_stream_responses(rpc, count):
        """Send STREAM_STREAM_RESPONSE on rpc count times in a row."""
        for _ in range(count):
            rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)

    def test_successful_unary_unary(self):
        """A unary-unary RPC terminated with OK is SATISFACTORY."""
        application_future = self._start_application(
            _client_application.Scenario.UNARY_UNARY, self._real_time_channel)
        _, request, rpc = self._real_time_channel.take_unary_unary(
            _application_testing_common.FIRST_SERVICE_UNUN)
        rpc.send_initial_metadata(())
        rpc.terminate(_application_common.UNARY_UNARY_RESPONSE, (),
                      grpc.StatusCode.OK, '')
        application_return_value = application_future.result()

        self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.SATISFACTORY)

    def test_successful_unary_stream(self):
        """A unary-stream RPC terminated with OK is SATISFACTORY.

        No responses are sent before the RPC is terminated.
        """
        application_future = self._start_application(
            _client_application.Scenario.UNARY_STREAM, self._fake_time_channel)
        _, request, rpc = self._fake_time_channel.take_unary_stream(
            _application_testing_common.FIRST_SERVICE_UNSTRE)
        rpc.send_initial_metadata(())
        rpc.terminate((), grpc.StatusCode.OK, '')
        application_return_value = application_future.result()

        self.assertEqual(_application_common.UNARY_STREAM_REQUEST, request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.SATISFACTORY)

    def test_successful_stream_unary(self):
        """A stream-unary RPC of three requests ended with OK is SATISFACTORY."""
        application_future = self._start_application(
            _client_application.Scenario.STREAM_UNARY, self._real_time_channel)
        _, rpc = self._real_time_channel.take_stream_unary(
            _application_testing_common.FIRST_SERVICE_STREUN)
        rpc.send_initial_metadata(())
        first_request = rpc.take_request()
        second_request = rpc.take_request()
        third_request = rpc.take_request()
        rpc.requests_closed()
        rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (),
                      grpc.StatusCode.OK, '')
        application_return_value = application_future.result()

        for request in (first_request, second_request, third_request):
            self.assertEqual(_application_common.STREAM_UNARY_REQUEST, request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.SATISFACTORY)

    def test_successful_stream_stream(self):
        """Two requests answered by two responses each is SATISFACTORY."""
        application_future = self._start_application(
            _client_application.Scenario.STREAM_STREAM,
            self._fake_time_channel)
        _, rpc = self._fake_time_channel.take_stream_stream(
            _application_testing_common.FIRST_SERVICE_STRESTRE)
        first_request = rpc.take_request()
        self._send_stream_stream_responses(rpc, 2)
        second_request = rpc.take_request()
        self._send_stream_stream_responses(rpc, 2)
        rpc.requests_closed()
        rpc.terminate((), grpc.StatusCode.OK, '')
        application_return_value = application_future.result()

        self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                         first_request)
        self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                         second_request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.SATISFACTORY)

    def test_concurrent_stream_stream(self):
        """RPC_CONCURRENCY stream-stream RPCs served in lockstep succeed.

        Every step (take a request, send two responses, ...) is applied to
        all RPCs before the next step is applied to any of them.
        """
        application_future = self._start_application(
            _client_application.Scenario.CONCURRENT_STREAM_STREAM,
            self._real_time_channel)
        # Only the RPC object (index 1) of each (metadata, rpc) pair is used.
        rpcs = [
            self._real_time_channel.take_stream_stream(
                _application_testing_common.FIRST_SERVICE_STRESTRE)[1]
            for _ in range(test_constants.RPC_CONCURRENCY)
        ]
        requests = {rpc: [rpc.take_request()] for rpc in rpcs}
        for rpc in rpcs:
            self._send_stream_stream_responses(rpc, 2)
        for rpc in rpcs:
            requests[rpc].append(rpc.take_request())
        for rpc in rpcs:
            self._send_stream_stream_responses(rpc, 2)
        for rpc in rpcs:
            rpc.requests_closed()
        for rpc in rpcs:
            rpc.terminate((), grpc.StatusCode.OK, '')
        application_return_value = application_future.result()

        for requests_of_one_rpc in requests.values():
            for request in requests_of_one_rpc:
                self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                                 request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.SATISFACTORY)

    def test_cancelled_unary_unary(self):
        """The CANCEL_UNARY_UNARY scenario reports SATISFACTORY."""
        application_future = self._start_application(
            _client_application.Scenario.CANCEL_UNARY_UNARY,
            self._fake_time_channel)
        _, request, rpc = self._fake_time_channel.take_unary_unary(
            _application_testing_common.FIRST_SERVICE_UNUN)
        rpc.send_initial_metadata(())
        # NOTE(review): per the grpc_testing API this is expected to block
        # until the client cancels the RPC -- confirm against its docs.
        rpc.cancelled()
        application_return_value = application_future.result()

        self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.SATISFACTORY)

    def test_status_stream_unary(self):
        """One RESOURCE_EXHAUSTED among concurrent RPCs is UNSATISFACTORY.

        THREAD_CONCURRENCY stream-unary RPCs are served; all but the last
        are terminated with OK and the last with RESOURCE_EXHAUSTED.
        """
        application_future = self._start_application(
            _client_application.Scenario.CONCURRENT_STREAM_UNARY,
            self._fake_time_channel)
        rpcs = tuple(
            self._fake_time_channel.take_stream_unary(
                _application_testing_common.FIRST_SERVICE_STREUN)[1]
            for _ in range(test_constants.THREAD_CONCURRENCY))
        for rpc in rpcs:
            rpc.take_request()
            rpc.take_request()
            rpc.take_request()
            rpc.requests_closed()
            rpc.send_initial_metadata(((
                'my_metadata_key',
                'My Metadata Value!',
            ),))
        for rpc in rpcs[:-1]:
            rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (),
                          grpc.StatusCode.OK, '')
        rpcs[-1].terminate(_application_common.STREAM_UNARY_RESPONSE, (),
                           grpc.StatusCode.RESOURCE_EXHAUSTED,
                           'nope; not able to handle all those RPCs!')
        application_return_value = application_future.result()

        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.UNSATISFACTORY)

    def test_status_stream_stream(self):
        """A non-OK termination surfaces as RPC_ERROR with code and details."""
        code = grpc.StatusCode.DEADLINE_EXCEEDED
        details = 'test deadline exceeded!'

        application_future = self._start_application(
            _client_application.Scenario.STREAM_STREAM,
            self._real_time_channel)
        _, rpc = self._real_time_channel.take_stream_stream(
            _application_testing_common.FIRST_SERVICE_STRESTRE)
        first_request = rpc.take_request()
        self._send_stream_stream_responses(rpc, 2)
        second_request = rpc.take_request()
        self._send_stream_stream_responses(rpc, 2)
        rpc.requests_closed()
        rpc.terminate((), code, details)
        application_return_value = application_future.result()

        self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                         first_request)
        self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                         second_request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.RPC_ERROR)
        self.assertIs(application_return_value.code, code)
        self.assertEqual(application_return_value.details, details)

    def test_misbehaving_server_unary_unary(self):
        """An erroneous unary-unary response with OK is UNSATISFACTORY."""
        application_future = self._start_application(
            _client_application.Scenario.UNARY_UNARY, self._fake_time_channel)
        _, request, rpc = self._fake_time_channel.take_unary_unary(
            _application_testing_common.FIRST_SERVICE_UNUN)
        rpc.send_initial_metadata(())
        rpc.terminate(_application_common.ERRONEOUS_UNARY_UNARY_RESPONSE, (),
                      grpc.StatusCode.OK, '')
        application_return_value = application_future.result()

        self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.UNSATISFACTORY)

    def test_misbehaving_server_stream_stream(self):
        """Three responses per request (not two) is UNSATISFACTORY."""
        application_future = self._start_application(
            _client_application.Scenario.STREAM_STREAM,
            self._real_time_channel)
        _, rpc = self._real_time_channel.take_stream_stream(
            _application_testing_common.FIRST_SERVICE_STRESTRE)
        first_request = rpc.take_request()
        self._send_stream_stream_responses(rpc, 3)
        second_request = rpc.take_request()
        self._send_stream_stream_responses(rpc, 3)
        rpc.requests_closed()
        rpc.terminate((), grpc.StatusCode.OK, '')
        application_return_value = application_future.result()

        self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                         first_request)
        self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
                         second_request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.UNSATISFACTORY)

    def test_infinite_request_stream_real_time(self):
        """The INFINITE_REQUEST_STREAM scenario ends SATISFACTORY.

        Three requests are taken, the test sleeps on the real clock for
        INFINITE_REQUEST_STREAM_TIMEOUT, and the RPC is then terminated
        with DEADLINE_EXCEEDED.
        """
        application_future = self._start_application(
            _client_application.Scenario.INFINITE_REQUEST_STREAM,
            self._real_time_channel)
        _, rpc = self._real_time_channel.take_stream_unary(
            _application_testing_common.FIRST_SERVICE_STREUN)
        rpc.send_initial_metadata(())
        first_request = rpc.take_request()
        second_request = rpc.take_request()
        third_request = rpc.take_request()
        self._real_time.sleep_for(
            _application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
        rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (),
                      grpc.StatusCode.DEADLINE_EXCEEDED, '')
        application_return_value = application_future.result()

        for request in (first_request, second_request, third_request):
            self.assertEqual(_application_common.STREAM_UNARY_REQUEST, request)
        self.assertIs(application_return_value.kind,
                      _client_application.Outcome.Kind.SATISFACTORY)


if __name__ == '__main__':
    unittest.main(verbosity=2)