• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from concurrent import futures
16import time
17import unittest
18
19import grpc
20from grpc.framework.foundation import logging_pool
21import grpc_testing
22
23from tests.testing import _application_common
24from tests.testing import _application_testing_common
25from tests.testing import _client_application
26from tests.testing.proto import requests_pb2
27from tests.testing.proto import services_pb2
28from tests.unit.framework.common import test_constants
29
30
31# TODO(https://github.com/protocolbuffers/protobuf/issues/3452): Drop this skip.
32@unittest.skipIf(
33    services_pb2.DESCRIPTOR.services_by_name.get("FirstService") is None,
34    "Fix protobuf issue 3452!",
35)
36class ClientTest(unittest.TestCase):
37    def setUp(self):
38        # In this test the client-side application under test executes in
39        # a separate thread while we retain use of the test thread to "play
40        # server".
41        self._client_execution_thread_pool = logging_pool.pool(1)
42
43        self._fake_time = grpc_testing.strict_fake_time(time.time())
44        self._real_time = grpc_testing.strict_real_time()
45        self._fake_time_channel = grpc_testing.channel(
46            services_pb2.DESCRIPTOR.services_by_name.values(), self._fake_time
47        )
48        self._real_time_channel = grpc_testing.channel(
49            services_pb2.DESCRIPTOR.services_by_name.values(), self._real_time
50        )
51
52    def tearDown(self):
53        self._client_execution_thread_pool.shutdown(wait=True)
54
55    def test_successful_unary_unary(self):
56        application_future = self._client_execution_thread_pool.submit(
57            _client_application.run,
58            _client_application.Scenario.UNARY_UNARY,
59            self._real_time_channel,
60        )
61        (
62            invocation_metadata,
63            request,
64            rpc,
65        ) = self._real_time_channel.take_unary_unary(
66            _application_testing_common.FIRST_SERVICE_UNUN
67        )
68        rpc.send_initial_metadata(())
69        rpc.terminate(
70            _application_common.UNARY_UNARY_RESPONSE, (), grpc.StatusCode.OK, ""
71        )
72        application_return_value = application_future.result()
73
74        self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request)
75        self.assertIs(
76            application_return_value.kind,
77            _client_application.Outcome.Kind.SATISFACTORY,
78        )
79
80    def test_successful_unary_stream(self):
81        application_future = self._client_execution_thread_pool.submit(
82            _client_application.run,
83            _client_application.Scenario.UNARY_STREAM,
84            self._fake_time_channel,
85        )
86        (
87            invocation_metadata,
88            request,
89            rpc,
90        ) = self._fake_time_channel.take_unary_stream(
91            _application_testing_common.FIRST_SERVICE_UNSTRE
92        )
93        rpc.send_initial_metadata(())
94        rpc.terminate((), grpc.StatusCode.OK, "")
95        application_return_value = application_future.result()
96
97        self.assertEqual(_application_common.UNARY_STREAM_REQUEST, request)
98        self.assertIs(
99            application_return_value.kind,
100            _client_application.Outcome.Kind.SATISFACTORY,
101        )
102
103    def test_successful_stream_unary(self):
104        application_future = self._client_execution_thread_pool.submit(
105            _client_application.run,
106            _client_application.Scenario.STREAM_UNARY,
107            self._real_time_channel,
108        )
109        invocation_metadata, rpc = self._real_time_channel.take_stream_unary(
110            _application_testing_common.FIRST_SERVICE_STREUN
111        )
112        rpc.send_initial_metadata(())
113        first_request = rpc.take_request()
114        second_request = rpc.take_request()
115        third_request = rpc.take_request()
116        rpc.requests_closed()
117        rpc.terminate(
118            _application_common.STREAM_UNARY_RESPONSE,
119            (),
120            grpc.StatusCode.OK,
121            "",
122        )
123        application_return_value = application_future.result()
124
125        self.assertEqual(
126            _application_common.STREAM_UNARY_REQUEST, first_request
127        )
128        self.assertEqual(
129            _application_common.STREAM_UNARY_REQUEST, second_request
130        )
131        self.assertEqual(
132            _application_common.STREAM_UNARY_REQUEST, third_request
133        )
134        self.assertIs(
135            application_return_value.kind,
136            _client_application.Outcome.Kind.SATISFACTORY,
137        )
138
139    def test_successful_stream_stream(self):
140        application_future = self._client_execution_thread_pool.submit(
141            _client_application.run,
142            _client_application.Scenario.STREAM_STREAM,
143            self._fake_time_channel,
144        )
145        invocation_metadata, rpc = self._fake_time_channel.take_stream_stream(
146            _application_testing_common.FIRST_SERVICE_STRESTRE
147        )
148        first_request = rpc.take_request()
149        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
150        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
151        second_request = rpc.take_request()
152        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
153        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
154        rpc.requests_closed()
155        rpc.terminate((), grpc.StatusCode.OK, "")
156        application_return_value = application_future.result()
157
158        self.assertEqual(
159            _application_common.STREAM_STREAM_REQUEST, first_request
160        )
161        self.assertEqual(
162            _application_common.STREAM_STREAM_REQUEST, second_request
163        )
164        self.assertIs(
165            application_return_value.kind,
166            _client_application.Outcome.Kind.SATISFACTORY,
167        )
168
169    def test_concurrent_stream_stream(self):
170        application_future = self._client_execution_thread_pool.submit(
171            _client_application.run,
172            _client_application.Scenario.CONCURRENT_STREAM_STREAM,
173            self._real_time_channel,
174        )
175        rpcs = []
176        for _ in range(test_constants.RPC_CONCURRENCY):
177            (
178                invocation_metadata,
179                rpc,
180            ) = self._real_time_channel.take_stream_stream(
181                _application_testing_common.FIRST_SERVICE_STRESTRE
182            )
183            rpcs.append(rpc)
184        requests = {}
185        for rpc in rpcs:
186            requests[rpc] = [rpc.take_request()]
187        for rpc in rpcs:
188            rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
189            rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
190        for rpc in rpcs:
191            requests[rpc].append(rpc.take_request())
192        for rpc in rpcs:
193            rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
194            rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
195        for rpc in rpcs:
196            rpc.requests_closed()
197        for rpc in rpcs:
198            rpc.terminate((), grpc.StatusCode.OK, "")
199        application_return_value = application_future.result()
200
201        for requests_of_one_rpc in requests.values():
202            for request in requests_of_one_rpc:
203                self.assertEqual(
204                    _application_common.STREAM_STREAM_REQUEST, request
205                )
206        self.assertIs(
207            application_return_value.kind,
208            _client_application.Outcome.Kind.SATISFACTORY,
209        )
210
211    def test_cancelled_unary_unary(self):
212        application_future = self._client_execution_thread_pool.submit(
213            _client_application.run,
214            _client_application.Scenario.CANCEL_UNARY_UNARY,
215            self._fake_time_channel,
216        )
217        (
218            invocation_metadata,
219            request,
220            rpc,
221        ) = self._fake_time_channel.take_unary_unary(
222            _application_testing_common.FIRST_SERVICE_UNUN
223        )
224        rpc.send_initial_metadata(())
225        rpc.cancelled()
226        application_return_value = application_future.result()
227
228        self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request)
229        self.assertIs(
230            application_return_value.kind,
231            _client_application.Outcome.Kind.SATISFACTORY,
232        )
233
234    def test_status_stream_unary(self):
235        application_future = self._client_execution_thread_pool.submit(
236            _client_application.run,
237            _client_application.Scenario.CONCURRENT_STREAM_UNARY,
238            self._fake_time_channel,
239        )
240        rpcs = tuple(
241            self._fake_time_channel.take_stream_unary(
242                _application_testing_common.FIRST_SERVICE_STREUN
243            )[1]
244            for _ in range(test_constants.THREAD_CONCURRENCY)
245        )
246        for rpc in rpcs:
247            rpc.take_request()
248            rpc.take_request()
249            rpc.take_request()
250            rpc.requests_closed()
251            rpc.send_initial_metadata(
252                (
253                    (
254                        "my_metadata_key",
255                        "My Metadata Value!",
256                    ),
257                )
258            )
259        for rpc in rpcs[:-1]:
260            rpc.terminate(
261                _application_common.STREAM_UNARY_RESPONSE,
262                (),
263                grpc.StatusCode.OK,
264                "",
265            )
266        rpcs[-1].terminate(
267            _application_common.STREAM_UNARY_RESPONSE,
268            (),
269            grpc.StatusCode.RESOURCE_EXHAUSTED,
270            "nope; not able to handle all those RPCs!",
271        )
272        application_return_value = application_future.result()
273
274        self.assertIs(
275            application_return_value.kind,
276            _client_application.Outcome.Kind.UNSATISFACTORY,
277        )
278
279    def test_status_stream_stream(self):
280        code = grpc.StatusCode.DEADLINE_EXCEEDED
281        details = "test deadline exceeded!"
282
283        application_future = self._client_execution_thread_pool.submit(
284            _client_application.run,
285            _client_application.Scenario.STREAM_STREAM,
286            self._real_time_channel,
287        )
288        invocation_metadata, rpc = self._real_time_channel.take_stream_stream(
289            _application_testing_common.FIRST_SERVICE_STRESTRE
290        )
291        first_request = rpc.take_request()
292        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
293        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
294        second_request = rpc.take_request()
295        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
296        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
297        rpc.requests_closed()
298        rpc.terminate((), code, details)
299        application_return_value = application_future.result()
300
301        self.assertEqual(
302            _application_common.STREAM_STREAM_REQUEST, first_request
303        )
304        self.assertEqual(
305            _application_common.STREAM_STREAM_REQUEST, second_request
306        )
307        self.assertIs(
308            application_return_value.kind,
309            _client_application.Outcome.Kind.RPC_ERROR,
310        )
311        self.assertIs(application_return_value.code, code)
312        self.assertEqual(application_return_value.details, details)
313
314    def test_misbehaving_server_unary_unary(self):
315        application_future = self._client_execution_thread_pool.submit(
316            _client_application.run,
317            _client_application.Scenario.UNARY_UNARY,
318            self._fake_time_channel,
319        )
320        (
321            invocation_metadata,
322            request,
323            rpc,
324        ) = self._fake_time_channel.take_unary_unary(
325            _application_testing_common.FIRST_SERVICE_UNUN
326        )
327        rpc.send_initial_metadata(())
328        rpc.terminate(
329            _application_common.ERRONEOUS_UNARY_UNARY_RESPONSE,
330            (),
331            grpc.StatusCode.OK,
332            "",
333        )
334        application_return_value = application_future.result()
335
336        self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request)
337        self.assertIs(
338            application_return_value.kind,
339            _client_application.Outcome.Kind.UNSATISFACTORY,
340        )
341
342    def test_misbehaving_server_stream_stream(self):
343        application_future = self._client_execution_thread_pool.submit(
344            _client_application.run,
345            _client_application.Scenario.STREAM_STREAM,
346            self._real_time_channel,
347        )
348        invocation_metadata, rpc = self._real_time_channel.take_stream_stream(
349            _application_testing_common.FIRST_SERVICE_STRESTRE
350        )
351        first_request = rpc.take_request()
352        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
353        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
354        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
355        second_request = rpc.take_request()
356        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
357        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
358        rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
359        rpc.requests_closed()
360        rpc.terminate((), grpc.StatusCode.OK, "")
361        application_return_value = application_future.result()
362
363        self.assertEqual(
364            _application_common.STREAM_STREAM_REQUEST, first_request
365        )
366        self.assertEqual(
367            _application_common.STREAM_STREAM_REQUEST, second_request
368        )
369        self.assertIs(
370            application_return_value.kind,
371            _client_application.Outcome.Kind.UNSATISFACTORY,
372        )
373
374    def test_infinite_request_stream_real_time(self):
375        application_future = self._client_execution_thread_pool.submit(
376            _client_application.run,
377            _client_application.Scenario.INFINITE_REQUEST_STREAM,
378            self._real_time_channel,
379        )
380        invocation_metadata, rpc = self._real_time_channel.take_stream_unary(
381            _application_testing_common.FIRST_SERVICE_STREUN
382        )
383        rpc.send_initial_metadata(())
384        first_request = rpc.take_request()
385        second_request = rpc.take_request()
386        third_request = rpc.take_request()
387        self._real_time.sleep_for(
388            _application_common.INFINITE_REQUEST_STREAM_TIMEOUT
389        )
390        rpc.terminate(
391            _application_common.STREAM_UNARY_RESPONSE,
392            (),
393            grpc.StatusCode.DEADLINE_EXCEEDED,
394            "",
395        )
396        application_return_value = application_future.result()
397
398        self.assertEqual(
399            _application_common.STREAM_UNARY_REQUEST, first_request
400        )
401        self.assertEqual(
402            _application_common.STREAM_UNARY_REQUEST, second_request
403        )
404        self.assertEqual(
405            _application_common.STREAM_UNARY_REQUEST, third_request
406        )
407        self.assertIs(
408            application_return_value.kind,
409            _client_application.Outcome.Kind.SATISFACTORY,
410        )
411
412
413if __name__ == "__main__":
414    unittest.main(verbosity=2)
415