• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import collections
16import contextlib
17import distutils.spawn
18import errno
19import os
20import shutil
21import subprocess
22import sys
23import tempfile
24import threading
25import unittest
26
27from six import moves
28
29import grpc
30import grpc.experimental
31from tests.unit import test_common
32from tests.unit.framework.common import test_constants
33
34import tests.protoc_plugin.protos.payload.test_payload_pb2 as payload_pb2
35import tests.protoc_plugin.protos.requests.r.test_requests_pb2 as request_pb2
36import tests.protoc_plugin.protos.responses.test_responses_pb2 as response_pb2
37import tests.protoc_plugin.protos.service.test_service_pb2_grpc as service_pb2_grpc
38
# Identifiers of entities we expect to find in the generated module. They are
# looked up with getattr() on service_pb2_grpc by the helpers and tests below.
STUB_IDENTIFIER = 'TestServiceStub'
SERVICER_IDENTIFIER = 'TestServiceServicer'
ADD_SERVICER_TO_SERVER_IDENTIFIER = 'add_TestServiceServicer_to_server'
43
44
class _ServicerMethods(object):
    """Test-controllable implementations of the TestService RPC handlers.

    Every handler calls _control() before handing back a response, which lets
    a test block the handlers with pause() or make them raise with fail().
    """

    def __init__(self):
        # Guards _paused and _fail and wakes handlers blocked in _control().
        self._condition = threading.Condition()
        self._paused = False
        self._fail = False

    @contextlib.contextmanager
    def pause(self):  # pylint: disable=invalid-name
        """Blocks handlers in _control() for the duration of the with-block."""
        with self._condition:
            self._paused = True
        try:
            yield
        finally:
            # Always release blocked handlers, even if the with-body raised;
            # otherwise they would wait on the condition forever.
            with self._condition:
                self._paused = False
                self._condition.notify_all()

    @contextlib.contextmanager
    def fail(self):  # pylint: disable=invalid-name
        """Makes _control() raise ValueError for the duration of the block."""
        with self._condition:
            self._fail = True
        try:
            yield
        finally:
            # Reset even when the with-body raised so later calls succeed.
            with self._condition:
                self._fail = False

    def _control(self):  # pylint: disable=invalid-name
        """Raises ValueError if failing; otherwise waits while paused."""
        with self._condition:
            if self._fail:
                raise ValueError()
            while self._paused:
                self._condition.wait()

    @staticmethod
    def _streaming_response(size):
        """Builds a StreamingOutputCallResponse carrying `size` 'a' chars."""
        response = response_pb2.StreamingOutputCallResponse()
        response.payload.payload_type = payload_pb2.COMPRESSABLE
        response.payload.payload_compressable = 'a' * size
        return response

    def UnaryCall(self, request, unused_rpc_context):
        """Returns a SimpleResponse whose payload is response_size long."""
        response = response_pb2.SimpleResponse()
        response.payload.payload_type = payload_pb2.COMPRESSABLE
        response.payload.payload_compressable = 'a' * request.response_size
        self._control()
        return response

    def StreamingOutputCall(self, request, unused_rpc_context):
        """Yields one response per entry in request.response_parameters."""
        for parameter in request.response_parameters:
            response = self._streaming_response(parameter.size)
            self._control()
            yield response

    def StreamingInputCall(self, request_iter, unused_rpc_context):
        """Returns the summed payload length of all incoming requests."""
        response = response_pb2.StreamingInputCallResponse()
        aggregated_payload_size = 0
        for request in request_iter:
            aggregated_payload_size += len(request.payload.payload_compressable)
        response.aggregated_payload_size = aggregated_payload_size
        self._control()
        return response

    def FullDuplexCall(self, request_iter, unused_rpc_context):
        """Yields responses for each request as soon as it is read."""
        for request in request_iter:
            for parameter in request.response_parameters:
                response = self._streaming_response(parameter.size)
                self._control()
                yield response

    def HalfDuplexCall(self, request_iter, unused_rpc_context):
        """Consumes every request first, then yields all the responses."""
        responses = []
        for request in request_iter:
            for parameter in request.response_parameters:
                response = self._streaming_response(parameter.size)
                self._control()
                responses.append(response)
        for response in responses:
            yield response
120
121
class _Service(
        collections.namedtuple(
            '_Service', ['servicer_methods', 'server', 'stub'])):
    """Bundle describing a service that has been started for a test.

    Attributes:
      servicer_methods: The _ServicerMethods handling the RPCs.
      server: The grpc.Server on which the RPCs are served.
      stub: The client stub used to invoke the RPCs.
    """
135
136
def _CreateService():
    """Starts a fully implemented test service and connects a stub to it.

    Returns:
      A _Service holding the controllable servicer methods, the started
      server and a stub bound to an insecure channel to that server.
    """
    methods = _ServicerMethods()
    servicer_base = getattr(service_pb2_grpc, SERVICER_IDENTIFIER)

    class Servicer(servicer_base):
        # Each handler simply forwards to the shared `methods` object so that
        # tests can pause or fail it from the outside.

        def UnaryCall(self, request, context):
            return methods.UnaryCall(request, context)

        def StreamingOutputCall(self, request, context):
            return methods.StreamingOutputCall(request, context)

        def StreamingInputCall(self, request_iterator, context):
            return methods.StreamingInputCall(request_iterator, context)

        def FullDuplexCall(self, request_iterator, context):
            return methods.FullDuplexCall(request_iterator, context)

        def HalfDuplexCall(self, request_iterator, context):
            return methods.HalfDuplexCall(request_iterator, context)

    server = test_common.test_server()
    register = getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)
    register(Servicer(), server)
    # Port 0 asks for any free port; the chosen one is returned.
    bound_port = server.add_insecure_port('[::]:0')
    server.start()
    target = 'localhost:{}'.format(bound_port)
    channel = grpc.insecure_channel(target)
    stub_class = getattr(service_pb2_grpc, STUB_IDENTIFIER)
    return _Service(
        servicer_methods=methods, server=server, stub=stub_class(channel))
171
172
def _CreateIncompleteService():
    """Starts a test service whose servicer overrides no RPC methods.

    Returns:
      A _Service whose servicer_methods is None, together with the started
      server and a stub bound to an insecure channel to that server.
    """
    servicer_base = getattr(service_pb2_grpc, SERVICER_IDENTIFIER)

    class Servicer(servicer_base):
        """Servicer that inherits every handler from the generated base."""

    server = test_common.test_server()
    register = getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)
    register(Servicer(), server)
    # Port 0 asks for any free port; the chosen one is returned.
    bound_port = server.add_insecure_port('[::]:0')
    server.start()
    target = 'localhost:{}'.format(bound_port)
    channel = grpc.insecure_channel(target)
    stub_class = getattr(service_pb2_grpc, STUB_IDENTIFIER)
    return _Service(
        servicer_methods=None, server=server, stub=stub_class(channel))
192
193
def _streaming_input_request_iterator():
    """Yields three StreamingInputCallRequests with a one-character payload."""
    request_count = 3
    for _ in range(request_count):
        message = request_pb2.StreamingInputCallRequest()
        payload = message.payload
        payload.payload_type = payload_pb2.COMPRESSABLE
        payload.payload_compressable = 'a'
        yield message
200
201
def _streaming_output_request():
    """Returns a StreamingOutputCallRequest asking for sizes 1, 2 and 3."""
    message = request_pb2.StreamingOutputCallRequest()
    for size in (1, 2, 3):
        message.response_parameters.add(size=size, interval_us=0)
    return message
209
210
def _full_duplex_request_iterator():
    """Yields two requests: one asking for size 1, one for sizes 2 and 3."""
    for sizes in ((1,), (2, 3)):
        message = request_pb2.StreamingOutputCallRequest()
        for size in sizes:
            message.response_parameters.add(size=size, interval_us=0)
        yield message
219
220
class PythonPluginTest(unittest.TestCase):
    """Test case for the gRPC Python protoc-plugin.

    While reading these tests, remember that the futures API
    (`stub.method.future()`) only gives futures for the *response-unary*
    methods and does not exist for response-streaming methods.

    Every test that starts a service registers the server shutdown with
    addCleanup, so the server is stopped even when an assertion fails.
    """

    def _register_shutdown(self, service):
        """Schedules `service.server.stop(None)` as cleanup; returns service."""
        self.addCleanup(service.server.stop, None)
        return service

    def testImportAttributes(self):
        # check that we can access the generated module and its members.
        self.assertIsNotNone(getattr(service_pb2_grpc, STUB_IDENTIFIER, None))
        self.assertIsNotNone(
            getattr(service_pb2_grpc, SERVICER_IDENTIFIER, None))
        self.assertIsNotNone(
            getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER, None))

    def testUpDown(self):
        # The "down" half happens in the registered cleanup.
        service = self._register_shutdown(_CreateService())
        self.assertIsNotNone(service.servicer_methods)
        self.assertIsNotNone(service.server)
        self.assertIsNotNone(service.stub)

    def testIncompleteServicer(self):
        service = self._register_shutdown(_CreateIncompleteService())
        request = request_pb2.SimpleRequest(response_size=13)
        with self.assertRaises(grpc.RpcError) as exception_context:
            service.stub.UnaryCall(request)
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.UNIMPLEMENTED)

    def testUnaryCall(self):
        service = self._register_shutdown(_CreateService())
        request = request_pb2.SimpleRequest(response_size=13)
        response = service.stub.UnaryCall(request)
        expected_response = service.servicer_methods.UnaryCall(
            request, 'not a real context!')
        self.assertEqual(expected_response, response)

    def testUnaryCallFuture(self):
        service = self._register_shutdown(_CreateService())
        request = request_pb2.SimpleRequest(response_size=13)
        # Check that the call does not block waiting for the server to respond.
        with service.servicer_methods.pause():
            response_future = service.stub.UnaryCall.future(request)
        response = response_future.result()
        expected_response = service.servicer_methods.UnaryCall(
            request, 'not a real RpcContext!')
        self.assertEqual(expected_response, response)

    def testUnaryCallFutureExpired(self):
        service = self._register_shutdown(_CreateService())
        request = request_pb2.SimpleRequest(response_size=13)
        with service.servicer_methods.pause():
            response_future = service.stub.UnaryCall.future(
                request, timeout=test_constants.SHORT_TIMEOUT)
            with self.assertRaises(grpc.RpcError) as exception_context:
                response_future.result()
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.DEADLINE_EXCEEDED)
        self.assertIs(response_future.code(), grpc.StatusCode.DEADLINE_EXCEEDED)

    def testUnaryCallFutureCancelled(self):
        service = self._register_shutdown(_CreateService())
        request = request_pb2.SimpleRequest(response_size=13)
        with service.servicer_methods.pause():
            response_future = service.stub.UnaryCall.future(request)
            response_future.cancel()
        self.assertTrue(response_future.cancelled())
        self.assertIs(response_future.code(), grpc.StatusCode.CANCELLED)

    def testUnaryCallFutureFailed(self):
        service = self._register_shutdown(_CreateService())
        request = request_pb2.SimpleRequest(response_size=13)
        with service.servicer_methods.fail():
            response_future = service.stub.UnaryCall.future(request)
            self.assertIsNotNone(response_future.exception())
        self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN)

    def testStreamingOutputCall(self):
        service = self._register_shutdown(_CreateService())
        request = _streaming_output_request()
        responses = service.stub.StreamingOutputCall(request)
        expected_responses = service.servicer_methods.StreamingOutputCall(
            request, 'not a real RpcContext!')
        # zip_longest pads with None, so a length mismatch fails the equality.
        for expected_response, response in moves.zip_longest(
                expected_responses, responses):
            self.assertEqual(expected_response, response)

    def testStreamingOutputCallExpired(self):
        service = self._register_shutdown(_CreateService())
        request = _streaming_output_request()
        with service.servicer_methods.pause():
            responses = service.stub.StreamingOutputCall(
                request, timeout=test_constants.SHORT_TIMEOUT)
            with self.assertRaises(grpc.RpcError) as exception_context:
                list(responses)
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.DEADLINE_EXCEEDED)

    def testStreamingOutputCallCancelled(self):
        service = self._register_shutdown(_CreateService())
        request = _streaming_output_request()
        responses = service.stub.StreamingOutputCall(request)
        next(responses)
        responses.cancel()
        with self.assertRaises(grpc.RpcError):
            next(responses)
        self.assertIs(responses.code(), grpc.StatusCode.CANCELLED)

    def testStreamingOutputCallFailed(self):
        service = self._register_shutdown(_CreateService())
        request = _streaming_output_request()
        with service.servicer_methods.fail():
            responses = service.stub.StreamingOutputCall(request)
            self.assertIsNotNone(responses)
            with self.assertRaises(grpc.RpcError) as exception_context:
                next(responses)
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.UNKNOWN)

    def testStreamingInputCall(self):
        service = self._register_shutdown(_CreateService())
        response = service.stub.StreamingInputCall(
            _streaming_input_request_iterator())
        expected_response = service.servicer_methods.StreamingInputCall(
            _streaming_input_request_iterator(), 'not a real RpcContext!')
        self.assertEqual(expected_response, response)

    def testStreamingInputCallFuture(self):
        service = self._register_shutdown(_CreateService())
        with service.servicer_methods.pause():
            response_future = service.stub.StreamingInputCall.future(
                _streaming_input_request_iterator())
        response = response_future.result()
        expected_response = service.servicer_methods.StreamingInputCall(
            _streaming_input_request_iterator(), 'not a real RpcContext!')
        self.assertEqual(expected_response, response)

    def testStreamingInputCallFutureExpired(self):
        service = self._register_shutdown(_CreateService())
        with service.servicer_methods.pause():
            response_future = service.stub.StreamingInputCall.future(
                _streaming_input_request_iterator(),
                timeout=test_constants.SHORT_TIMEOUT)
            with self.assertRaises(grpc.RpcError) as exception_context:
                response_future.result()
        self.assertIsInstance(response_future.exception(), grpc.RpcError)
        self.assertIs(response_future.exception().code(),
                      grpc.StatusCode.DEADLINE_EXCEEDED)
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.DEADLINE_EXCEEDED)

    def testStreamingInputCallFutureCancelled(self):
        service = self._register_shutdown(_CreateService())
        with service.servicer_methods.pause():
            response_future = service.stub.StreamingInputCall.future(
                _streaming_input_request_iterator())
            response_future.cancel()
        self.assertTrue(response_future.cancelled())
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.result()

    def testStreamingInputCallFutureFailed(self):
        service = self._register_shutdown(_CreateService())
        with service.servicer_methods.fail():
            response_future = service.stub.StreamingInputCall.future(
                _streaming_input_request_iterator())
            self.assertIsNotNone(response_future.exception())
            self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN)

    def testFullDuplexCall(self):
        service = self._register_shutdown(_CreateService())
        responses = service.stub.FullDuplexCall(_full_duplex_request_iterator())
        expected_responses = service.servicer_methods.FullDuplexCall(
            _full_duplex_request_iterator(), 'not a real RpcContext!')
        for expected_response, response in moves.zip_longest(
                expected_responses, responses):
            self.assertEqual(expected_response, response)

    def testFullDuplexCallExpired(self):
        request_iterator = _full_duplex_request_iterator()
        service = self._register_shutdown(_CreateService())
        with service.servicer_methods.pause():
            responses = service.stub.FullDuplexCall(
                request_iterator, timeout=test_constants.SHORT_TIMEOUT)
            with self.assertRaises(grpc.RpcError) as exception_context:
                list(responses)
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.DEADLINE_EXCEEDED)

    def testFullDuplexCallCancelled(self):
        service = self._register_shutdown(_CreateService())
        request_iterator = _full_duplex_request_iterator()
        responses = service.stub.FullDuplexCall(request_iterator)
        next(responses)
        responses.cancel()
        with self.assertRaises(grpc.RpcError) as exception_context:
            next(responses)
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.CANCELLED)

    def testFullDuplexCallFailed(self):
        request_iterator = _full_duplex_request_iterator()
        service = self._register_shutdown(_CreateService())
        with service.servicer_methods.fail():
            responses = service.stub.FullDuplexCall(request_iterator)
            with self.assertRaises(grpc.RpcError) as exception_context:
                next(responses)
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.UNKNOWN)

    def testHalfDuplexCall(self):
        service = self._register_shutdown(_CreateService())

        def half_duplex_request_iterator():
            request = request_pb2.StreamingOutputCallRequest()
            request.response_parameters.add(size=1, interval_us=0)
            yield request
            request = request_pb2.StreamingOutputCallRequest()
            request.response_parameters.add(size=2, interval_us=0)
            request.response_parameters.add(size=3, interval_us=0)
            yield request

        responses = service.stub.HalfDuplexCall(half_duplex_request_iterator())
        expected_responses = service.servicer_methods.HalfDuplexCall(
            half_duplex_request_iterator(), 'not a real RpcContext!')
        for expected_response, response in moves.zip_longest(
                expected_responses, responses):
            self.assertEqual(expected_response, response)

    def testHalfDuplexCallWedged(self):
        condition = threading.Condition()
        # One-element list used as a mutable cell, because Python 2 has no
        # 'nonlocal' statement for rebinding from the nested functions.
        wait_cell = [False]

        @contextlib.contextmanager
        def wait():  # pylint: disable=invalid-name
            with condition:
                wait_cell[0] = True
            try:
                yield
            finally:
                # Always release the request iterator, even if the with-body
                # raised; otherwise it would wait on the condition forever.
                with condition:
                    wait_cell[0] = False
                    condition.notify_all()

        def half_duplex_request_iterator():
            request = request_pb2.StreamingOutputCallRequest()
            request.response_parameters.add(size=1, interval_us=0)
            yield request
            # Hold the request stream open until wait() exits.
            with condition:
                while wait_cell[0]:
                    condition.wait()

        service = self._register_shutdown(_CreateService())
        with wait():
            responses = service.stub.HalfDuplexCall(
                half_duplex_request_iterator(),
                timeout=test_constants.SHORT_TIMEOUT)
            # half-duplex waits for the client to send all info
            with self.assertRaises(grpc.RpcError) as exception_context:
                next(responses)
        self.assertIs(exception_context.exception.code(),
                      grpc.StatusCode.DEADLINE_EXCEEDED)
505
506
# Compare as a tuple: checking major and minor separately would also skip
# any future X.0 - X.5 release.
@unittest.skipIf(sys.version_info < (3, 6),
                 "Unsupported on Python versions older than 3.6.")
class SimpleStubsPluginTest(unittest.TestCase):
    """Test case for the generated `TestService` simple-stub functions.

    Each test invokes an RPC through `service_pb2_grpc.TestService` against a
    server started in setUp and compares the result with what the shared
    _ServicerMethods returns when called directly.
    """

    # Shared by all tests; the nested Servicer forwards every RPC to it.
    servicer_methods = _ServicerMethods()

    class Servicer(service_pb2_grpc.TestServiceServicer):

        def UnaryCall(self, request, context):
            return SimpleStubsPluginTest.servicer_methods.UnaryCall(
                request, context)

        def StreamingOutputCall(self, request, context):
            return SimpleStubsPluginTest.servicer_methods.StreamingOutputCall(
                request, context)

        def StreamingInputCall(self, request_iterator, context):
            return SimpleStubsPluginTest.servicer_methods.StreamingInputCall(
                request_iterator, context)

        def FullDuplexCall(self, request_iterator, context):
            return SimpleStubsPluginTest.servicer_methods.FullDuplexCall(
                request_iterator, context)

        def HalfDuplexCall(self, request_iterator, context):
            return SimpleStubsPluginTest.servicer_methods.HalfDuplexCall(
                request_iterator, context)

    def setUp(self):
        """Starts a server on a free port and records its target address."""
        super(SimpleStubsPluginTest, self).setUp()
        self._server = test_common.test_server()
        service_pb2_grpc.add_TestServiceServicer_to_server(
            self.Servicer(), self._server)
        self._port = self._server.add_insecure_port('[::]:0')
        self._server.start()
        self._target = 'localhost:{}'.format(self._port)

    def tearDown(self):
        """Stops the server started in setUp."""
        self._server.stop(None)
        super(SimpleStubsPluginTest, self).tearDown()

    def testUnaryCall(self):
        request = request_pb2.SimpleRequest(response_size=13)
        response = service_pb2_grpc.TestService.UnaryCall(
            request,
            self._target,
            channel_credentials=grpc.experimental.insecure_channel_credentials(
            ),
            wait_for_ready=True)
        expected_response = self.servicer_methods.UnaryCall(
            request, 'not a real context!')
        self.assertEqual(expected_response, response)

    def testUnaryCallInsecureSugar(self):
        # Same as testUnaryCall, but passes insecure=True instead of
        # explicit channel credentials.
        request = request_pb2.SimpleRequest(response_size=13)
        response = service_pb2_grpc.TestService.UnaryCall(request,
                                                          self._target,
                                                          insecure=True,
                                                          wait_for_ready=True)
        expected_response = self.servicer_methods.UnaryCall(
            request, 'not a real context!')
        self.assertEqual(expected_response, response)

    def testStreamingOutputCall(self):
        request = _streaming_output_request()
        expected_responses = self.servicer_methods.StreamingOutputCall(
            request, 'not a real RpcContext!')
        responses = service_pb2_grpc.TestService.StreamingOutputCall(
            request,
            self._target,
            channel_credentials=grpc.experimental.insecure_channel_credentials(
            ),
            wait_for_ready=True)
        # zip_longest pads with None, so a length mismatch fails the equality.
        for expected_response, response in moves.zip_longest(
                expected_responses, responses):
            self.assertEqual(expected_response, response)

    def testStreamingInputCall(self):
        response = service_pb2_grpc.TestService.StreamingInputCall(
            _streaming_input_request_iterator(),
            self._target,
            channel_credentials=grpc.experimental.insecure_channel_credentials(
            ),
            wait_for_ready=True)
        expected_response = self.servicer_methods.StreamingInputCall(
            _streaming_input_request_iterator(), 'not a real RpcContext!')
        self.assertEqual(expected_response, response)

    def testFullDuplexCall(self):
        responses = service_pb2_grpc.TestService.FullDuplexCall(
            _full_duplex_request_iterator(),
            self._target,
            channel_credentials=grpc.experimental.insecure_channel_credentials(
            ),
            wait_for_ready=True)
        expected_responses = self.servicer_methods.FullDuplexCall(
            _full_duplex_request_iterator(), 'not a real RpcContext!')
        for expected_response, response in moves.zip_longest(
                expected_responses, responses):
            self.assertEqual(expected_response, response)

    def testHalfDuplexCall(self):

        def half_duplex_request_iterator():
            request = request_pb2.StreamingOutputCallRequest()
            request.response_parameters.add(size=1, interval_us=0)
            yield request
            request = request_pb2.StreamingOutputCallRequest()
            request.response_parameters.add(size=2, interval_us=0)
            request.response_parameters.add(size=3, interval_us=0)
            yield request

        responses = service_pb2_grpc.TestService.HalfDuplexCall(
            half_duplex_request_iterator(),
            self._target,
            channel_credentials=grpc.experimental.insecure_channel_credentials(
            ),
            wait_for_ready=True)
        expected_responses = self.servicer_methods.HalfDuplexCall(
            half_duplex_request_iterator(), 'not a real RpcContext!')
        for expected_response, response in moves.zip_longest(
                expected_responses, responses):
            self.assertEqual(expected_response, response)
629
630
class ModuleMainTest(unittest.TestCase):
    """Test case for running `python -m grpc_tools.protoc`.
    """

    def test_clean_output(self):
        """Checks that protoc exits with 0 and prints nothing.

        Runs `grpc_tools.protoc` in a subprocess on empty.proto, with stdout
        and stderr redirected to temporary files, and asserts that both files
        stay empty and the return code is 0.

        Raises:
          unittest.SkipTest: if sys.executable is None or empty.
        """
        # sys.executable may be None or an empty string when the interpreter
        # path cannot be determined.
        if not sys.executable:
            raise unittest.SkipTest(
                "Running on a interpreter that cannot be invoked from the CLI.")
        proto_dir_path = os.path.join("src", "proto")
        test_proto_path = os.path.join(proto_dir_path, "grpc", "testing",
                                       "empty.proto")
        streams = tuple(tempfile.TemporaryFile() for _ in range(2))
        work_dir = tempfile.mkdtemp()
        try:
            invocation = (sys.executable, "-m", "grpc_tools.protoc",
                          "--proto_path", proto_dir_path, "--python_out",
                          work_dir, "--grpc_python_out", work_dir,
                          test_proto_path)
            proc = subprocess.Popen(invocation,
                                    stdout=streams[0],
                                    stderr=streams[1])
            proc.wait()
            for stream in streams:
                stream.seek(0)
                # Comparing the content itself shows the unexpected output in
                # the failure message.
                self.assertEqual(b"", stream.read())
            self.assertEqual(0, proc.returncode)
        finally:
            # Clean up on success and on failure alike, and let assertion
            # failures propagate instead of swallowing them.
            for stream in streams:
                stream.close()
            shutil.rmtree(work_dir)
660
661
# Run every test case in this module with verbose output when the file is
# executed directly.
if __name__ == '__main__':
    unittest.main(verbosity=2)
664