1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import collections 16import contextlib 17import distutils.spawn 18import errno 19import os 20import shutil 21import subprocess 22import sys 23import tempfile 24import threading 25import unittest 26 27from six import moves 28 29import grpc 30import grpc.experimental 31from tests.unit import test_common 32from tests.unit.framework.common import test_constants 33 34import tests.protoc_plugin.protos.payload.test_payload_pb2 as payload_pb2 35import tests.protoc_plugin.protos.requests.r.test_requests_pb2 as request_pb2 36import tests.protoc_plugin.protos.responses.test_responses_pb2 as response_pb2 37import tests.protoc_plugin.protos.service.test_service_pb2_grpc as service_pb2_grpc 38 39# Identifiers of entities we expect to find in the generated module. 40STUB_IDENTIFIER = 'TestServiceStub' 41SERVICER_IDENTIFIER = 'TestServiceServicer' 42ADD_SERVICER_TO_SERVER_IDENTIFIER = 'add_TestServiceServicer_to_server' 43 44 45class _ServicerMethods(object): 46 47 def __init__(self): 48 self._condition = threading.Condition() 49 self._paused = False 50 self._fail = False 51 52 @contextlib.contextmanager 53 def pause(self): # pylint: disable=invalid-name 54 with self._condition: 55 self._paused = True 56 yield 57 with self._condition: 58 self._paused = False 59 self._condition.notify_all() 60 61 @contextlib.contextmanager 62 def fail(self): # pylint: disable=invalid-name 63 with self._condition: 64 self._fail = True 65 yield 66 with self._condition: 67 self._fail = False 68 69 def _control(self): # pylint: disable=invalid-name 70 with self._condition: 71 if self._fail: 72 raise ValueError() 73 while self._paused: 74 self._condition.wait() 75 76 def UnaryCall(self, request, unused_rpc_context): 77 response = response_pb2.SimpleResponse() 78 response.payload.payload_type = payload_pb2.COMPRESSABLE 79 response.payload.payload_compressable = 'a' * request.response_size 80 self._control() 81 return response 82 83 def StreamingOutputCall(self, request, unused_rpc_context): 84 for parameter in request.response_parameters: 85 response = response_pb2.StreamingOutputCallResponse() 86 response.payload.payload_type = payload_pb2.COMPRESSABLE 87 response.payload.payload_compressable = 'a' * parameter.size 88 self._control() 89 yield response 90 91 def StreamingInputCall(self, request_iter, unused_rpc_context): 92 response = response_pb2.StreamingInputCallResponse() 93 aggregated_payload_size = 0 94 for request in request_iter: 95 aggregated_payload_size += len(request.payload.payload_compressable) 96 response.aggregated_payload_size = aggregated_payload_size 97 self._control() 98 return response 99 100 def FullDuplexCall(self, request_iter, unused_rpc_context): 101 for request in request_iter: 102 for parameter in request.response_parameters: 103 response = response_pb2.StreamingOutputCallResponse() 104 response.payload.payload_type = payload_pb2.COMPRESSABLE 105 response.payload.payload_compressable = 'a' * parameter.size 106 self._control() 107 yield response 108 109 def HalfDuplexCall(self, request_iter, unused_rpc_context): 110 responses = [] 111 for request in request_iter: 112 for parameter in request.response_parameters: 113 response = response_pb2.StreamingOutputCallResponse() 114 response.payload.payload_type = payload_pb2.COMPRESSABLE 115 response.payload.payload_compressable = 'a' * parameter.size 116 self._control() 117 responses.append(response) 118 for response in responses: 119 yield response 120 121 122class _Service( 123 collections.namedtuple('_Service', ( 124 'servicer_methods', 125 'server', 126 'stub', 127 ))): 128 """A live and running service. 129 130 Attributes: 131 servicer_methods: The _ServicerMethods servicing RPCs. 132 server: The grpc.Server servicing RPCs. 133 stub: A stub on which to invoke RPCs. 134 """ 135 136 137def _CreateService(): 138 """Provides a servicer backend and a stub. 139 140 Returns: 141 A _Service with which to test RPCs. 142 """ 143 servicer_methods = _ServicerMethods() 144 145 class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)): 146 147 def UnaryCall(self, request, context): 148 return servicer_methods.UnaryCall(request, context) 149 150 def StreamingOutputCall(self, request, context): 151 return servicer_methods.StreamingOutputCall(request, context) 152 153 def StreamingInputCall(self, request_iterator, context): 154 return servicer_methods.StreamingInputCall(request_iterator, 155 context) 156 157 def FullDuplexCall(self, request_iterator, context): 158 return servicer_methods.FullDuplexCall(request_iterator, context) 159 160 def HalfDuplexCall(self, request_iterator, context): 161 return servicer_methods.HalfDuplexCall(request_iterator, context) 162 163 server = test_common.test_server() 164 getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), 165 server) 166 port = server.add_insecure_port('[::]:0') 167 server.start() 168 channel = grpc.insecure_channel('localhost:{}'.format(port)) 169 stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel) 170 return _Service(servicer_methods, server, stub) 171 172 173def _CreateIncompleteService(): 174 """Provides a servicer backend that fails to implement methods and its stub. 175 176 Returns: 177 A _Service with which to test RPCs. The returned _Service's 178 servicer_methods implements none of the methods required of it. 179 """ 180 181 class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)): 182 pass 183 184 server = test_common.test_server() 185 getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), 186 server) 187 port = server.add_insecure_port('[::]:0') 188 server.start() 189 channel = grpc.insecure_channel('localhost:{}'.format(port)) 190 stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel) 191 return _Service(None, server, stub) 192 193 194def _streaming_input_request_iterator(): 195 for _ in range(3): 196 request = request_pb2.StreamingInputCallRequest() 197 request.payload.payload_type = payload_pb2.COMPRESSABLE 198 request.payload.payload_compressable = 'a' 199 yield request 200 201 202def _streaming_output_request(): 203 request = request_pb2.StreamingOutputCallRequest() 204 sizes = [1, 2, 3] 205 request.response_parameters.add(size=sizes[0], interval_us=0) 206 request.response_parameters.add(size=sizes[1], interval_us=0) 207 request.response_parameters.add(size=sizes[2], interval_us=0) 208 return request 209 210 211def _full_duplex_request_iterator(): 212 request = request_pb2.StreamingOutputCallRequest() 213 request.response_parameters.add(size=1, interval_us=0) 214 yield request 215 request = request_pb2.StreamingOutputCallRequest() 216 request.response_parameters.add(size=2, interval_us=0) 217 request.response_parameters.add(size=3, interval_us=0) 218 yield request 219 220 221class PythonPluginTest(unittest.TestCase): 222 """Test case for the gRPC Python protoc-plugin. 223 224 While reading these tests, remember that the futures API 225 (`stub.method.future()`) only gives futures for the *response-unary* 226 methods and does not exist for response-streaming methods. 227 """ 228 229 def testImportAttributes(self): 230 # check that we can access the generated module and its members. 231 self.assertIsNotNone(getattr(service_pb2_grpc, STUB_IDENTIFIER, None)) 232 self.assertIsNotNone( 233 getattr(service_pb2_grpc, SERVICER_IDENTIFIER, None)) 234 self.assertIsNotNone( 235 getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER, None)) 236 237 def testUpDown(self): 238 service = _CreateService() 239 self.assertIsNotNone(service.servicer_methods) 240 self.assertIsNotNone(service.server) 241 self.assertIsNotNone(service.stub) 242 service.server.stop(None) 243 244 def testIncompleteServicer(self): 245 service = _CreateIncompleteService() 246 request = request_pb2.SimpleRequest(response_size=13) 247 with self.assertRaises(grpc.RpcError) as exception_context: 248 service.stub.UnaryCall(request) 249 self.assertIs(exception_context.exception.code(), 250 grpc.StatusCode.UNIMPLEMENTED) 251 service.server.stop(None) 252 253 def testUnaryCall(self): 254 service = _CreateService() 255 request = request_pb2.SimpleRequest(response_size=13) 256 response = service.stub.UnaryCall(request) 257 expected_response = service.servicer_methods.UnaryCall( 258 request, 'not a real context!') 259 self.assertEqual(expected_response, response) 260 service.server.stop(None) 261 262 def testUnaryCallFuture(self): 263 service = _CreateService() 264 request = request_pb2.SimpleRequest(response_size=13) 265 # Check that the call does not block waiting for the server to respond. 266 with service.servicer_methods.pause(): 267 response_future = service.stub.UnaryCall.future(request) 268 response = response_future.result() 269 expected_response = service.servicer_methods.UnaryCall( 270 request, 'not a real RpcContext!') 271 self.assertEqual(expected_response, response) 272 service.server.stop(None) 273 274 def testUnaryCallFutureExpired(self): 275 service = _CreateService() 276 request = request_pb2.SimpleRequest(response_size=13) 277 with service.servicer_methods.pause(): 278 response_future = service.stub.UnaryCall.future( 279 request, timeout=test_constants.SHORT_TIMEOUT) 280 with self.assertRaises(grpc.RpcError) as exception_context: 281 response_future.result() 282 self.assertIs(exception_context.exception.code(), 283 grpc.StatusCode.DEADLINE_EXCEEDED) 284 self.assertIs(response_future.code(), grpc.StatusCode.DEADLINE_EXCEEDED) 285 service.server.stop(None) 286 287 def testUnaryCallFutureCancelled(self): 288 service = _CreateService() 289 request = request_pb2.SimpleRequest(response_size=13) 290 with service.servicer_methods.pause(): 291 response_future = service.stub.UnaryCall.future(request) 292 response_future.cancel() 293 self.assertTrue(response_future.cancelled()) 294 self.assertIs(response_future.code(), grpc.StatusCode.CANCELLED) 295 service.server.stop(None) 296 297 def testUnaryCallFutureFailed(self): 298 service = _CreateService() 299 request = request_pb2.SimpleRequest(response_size=13) 300 with service.servicer_methods.fail(): 301 response_future = service.stub.UnaryCall.future(request) 302 self.assertIsNotNone(response_future.exception()) 303 self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN) 304 service.server.stop(None) 305 306 def testStreamingOutputCall(self): 307 service = _CreateService() 308 request = _streaming_output_request() 309 responses = service.stub.StreamingOutputCall(request) 310 expected_responses = service.servicer_methods.StreamingOutputCall( 311 request, 'not a real RpcContext!') 312 for expected_response, response in moves.zip_longest( 313 expected_responses, responses): 314 self.assertEqual(expected_response, response) 315 service.server.stop(None) 316 317 def testStreamingOutputCallExpired(self): 318 service = _CreateService() 319 request = _streaming_output_request() 320 with service.servicer_methods.pause(): 321 responses = service.stub.StreamingOutputCall( 322 request, timeout=test_constants.SHORT_TIMEOUT) 323 with self.assertRaises(grpc.RpcError) as exception_context: 324 list(responses) 325 self.assertIs(exception_context.exception.code(), 326 grpc.StatusCode.DEADLINE_EXCEEDED) 327 service.server.stop(None) 328 329 def testStreamingOutputCallCancelled(self): 330 service = _CreateService() 331 request = _streaming_output_request() 332 responses = service.stub.StreamingOutputCall(request) 333 next(responses) 334 responses.cancel() 335 with self.assertRaises(grpc.RpcError) as exception_context: 336 next(responses) 337 self.assertIs(responses.code(), grpc.StatusCode.CANCELLED) 338 service.server.stop(None) 339 340 def testStreamingOutputCallFailed(self): 341 service = _CreateService() 342 request = _streaming_output_request() 343 with service.servicer_methods.fail(): 344 responses = service.stub.StreamingOutputCall(request) 345 self.assertIsNotNone(responses) 346 with self.assertRaises(grpc.RpcError) as exception_context: 347 next(responses) 348 self.assertIs(exception_context.exception.code(), 349 grpc.StatusCode.UNKNOWN) 350 service.server.stop(None) 351 352 def testStreamingInputCall(self): 353 service = _CreateService() 354 response = service.stub.StreamingInputCall( 355 _streaming_input_request_iterator()) 356 expected_response = service.servicer_methods.StreamingInputCall( 357 _streaming_input_request_iterator(), 'not a real RpcContext!') 358 self.assertEqual(expected_response, response) 359 service.server.stop(None) 360 361 def testStreamingInputCallFuture(self): 362 service = _CreateService() 363 with service.servicer_methods.pause(): 364 response_future = service.stub.StreamingInputCall.future( 365 _streaming_input_request_iterator()) 366 response = response_future.result() 367 expected_response = service.servicer_methods.StreamingInputCall( 368 _streaming_input_request_iterator(), 'not a real RpcContext!') 369 self.assertEqual(expected_response, response) 370 service.server.stop(None) 371 372 def testStreamingInputCallFutureExpired(self): 373 service = _CreateService() 374 with service.servicer_methods.pause(): 375 response_future = service.stub.StreamingInputCall.future( 376 _streaming_input_request_iterator(), 377 timeout=test_constants.SHORT_TIMEOUT) 378 with self.assertRaises(grpc.RpcError) as exception_context: 379 response_future.result() 380 self.assertIsInstance(response_future.exception(), grpc.RpcError) 381 self.assertIs(response_future.exception().code(), 382 grpc.StatusCode.DEADLINE_EXCEEDED) 383 self.assertIs(exception_context.exception.code(), 384 grpc.StatusCode.DEADLINE_EXCEEDED) 385 service.server.stop(None) 386 387 def testStreamingInputCallFutureCancelled(self): 388 service = _CreateService() 389 with service.servicer_methods.pause(): 390 response_future = service.stub.StreamingInputCall.future( 391 _streaming_input_request_iterator()) 392 response_future.cancel() 393 self.assertTrue(response_future.cancelled()) 394 with self.assertRaises(grpc.FutureCancelledError): 395 response_future.result() 396 service.server.stop(None) 397 398 def testStreamingInputCallFutureFailed(self): 399 service = _CreateService() 400 with service.servicer_methods.fail(): 401 response_future = service.stub.StreamingInputCall.future( 402 _streaming_input_request_iterator()) 403 self.assertIsNotNone(response_future.exception()) 404 self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN) 405 service.server.stop(None) 406 407 def testFullDuplexCall(self): 408 service = _CreateService() 409 responses = service.stub.FullDuplexCall(_full_duplex_request_iterator()) 410 expected_responses = service.servicer_methods.FullDuplexCall( 411 _full_duplex_request_iterator(), 'not a real RpcContext!') 412 for expected_response, response in moves.zip_longest( 413 expected_responses, responses): 414 self.assertEqual(expected_response, response) 415 service.server.stop(None) 416 417 def testFullDuplexCallExpired(self): 418 request_iterator = _full_duplex_request_iterator() 419 service = _CreateService() 420 with service.servicer_methods.pause(): 421 responses = service.stub.FullDuplexCall( 422 request_iterator, timeout=test_constants.SHORT_TIMEOUT) 423 with self.assertRaises(grpc.RpcError) as exception_context: 424 list(responses) 425 self.assertIs(exception_context.exception.code(), 426 grpc.StatusCode.DEADLINE_EXCEEDED) 427 service.server.stop(None) 428 429 def testFullDuplexCallCancelled(self): 430 service = _CreateService() 431 request_iterator = _full_duplex_request_iterator() 432 responses = service.stub.FullDuplexCall(request_iterator) 433 next(responses) 434 responses.cancel() 435 with self.assertRaises(grpc.RpcError) as exception_context: 436 next(responses) 437 self.assertIs(exception_context.exception.code(), 438 grpc.StatusCode.CANCELLED) 439 service.server.stop(None) 440 441 def testFullDuplexCallFailed(self): 442 request_iterator = _full_duplex_request_iterator() 443 service = _CreateService() 444 with service.servicer_methods.fail(): 445 responses = service.stub.FullDuplexCall(request_iterator) 446 with self.assertRaises(grpc.RpcError) as exception_context: 447 next(responses) 448 self.assertIs(exception_context.exception.code(), 449 grpc.StatusCode.UNKNOWN) 450 service.server.stop(None) 451 452 def testHalfDuplexCall(self): 453 service = _CreateService() 454 455 def half_duplex_request_iterator(): 456 request = request_pb2.StreamingOutputCallRequest() 457 request.response_parameters.add(size=1, interval_us=0) 458 yield request 459 request = request_pb2.StreamingOutputCallRequest() 460 request.response_parameters.add(size=2, interval_us=0) 461 request.response_parameters.add(size=3, interval_us=0) 462 yield request 463 464 responses = service.stub.HalfDuplexCall(half_duplex_request_iterator()) 465 expected_responses = service.servicer_methods.HalfDuplexCall( 466 half_duplex_request_iterator(), 'not a real RpcContext!') 467 for expected_response, response in moves.zip_longest( 468 expected_responses, responses): 469 self.assertEqual(expected_response, response) 470 service.server.stop(None) 471 472 def testHalfDuplexCallWedged(self): 473 condition = threading.Condition() 474 wait_cell = [False] 475 476 @contextlib.contextmanager 477 def wait(): # pylint: disable=invalid-name 478 # Where's Python 3's 'nonlocal' statement when you need it? 479 with condition: 480 wait_cell[0] = True 481 yield 482 with condition: 483 wait_cell[0] = False 484 condition.notify_all() 485 486 def half_duplex_request_iterator(): 487 request = request_pb2.StreamingOutputCallRequest() 488 request.response_parameters.add(size=1, interval_us=0) 489 yield request 490 with condition: 491 while wait_cell[0]: 492 condition.wait() 493 494 service = _CreateService() 495 with wait(): 496 responses = service.stub.HalfDuplexCall( 497 half_duplex_request_iterator(), 498 timeout=test_constants.SHORT_TIMEOUT) 499 # half-duplex waits for the client to send all info 500 with self.assertRaises(grpc.RpcError) as exception_context: 501 next(responses) 502 self.assertIs(exception_context.exception.code(), 503 grpc.StatusCode.DEADLINE_EXCEEDED) 504 service.server.stop(None) 505 506 507@unittest.skipIf(sys.version_info[0] < 3 or sys.version_info[1] < 6, 508 "Unsupported on Python 2.") 509class SimpleStubsPluginTest(unittest.TestCase): 510 servicer_methods = _ServicerMethods() 511 512 class Servicer(service_pb2_grpc.TestServiceServicer): 513 514 def UnaryCall(self, request, context): 515 return SimpleStubsPluginTest.servicer_methods.UnaryCall( 516 request, context) 517 518 def StreamingOutputCall(self, request, context): 519 return SimpleStubsPluginTest.servicer_methods.StreamingOutputCall( 520 request, context) 521 522 def StreamingInputCall(self, request_iterator, context): 523 return SimpleStubsPluginTest.servicer_methods.StreamingInputCall( 524 request_iterator, context) 525 526 def FullDuplexCall(self, request_iterator, context): 527 return SimpleStubsPluginTest.servicer_methods.FullDuplexCall( 528 request_iterator, context) 529 530 def HalfDuplexCall(self, request_iterator, context): 531 return SimpleStubsPluginTest.servicer_methods.HalfDuplexCall( 532 request_iterator, context) 533 534 def setUp(self): 535 super(SimpleStubsPluginTest, self).setUp() 536 self._server = test_common.test_server() 537 service_pb2_grpc.add_TestServiceServicer_to_server( 538 self.Servicer(), self._server) 539 self._port = self._server.add_insecure_port('[::]:0') 540 self._server.start() 541 self._target = 'localhost:{}'.format(self._port) 542 543 def tearDown(self): 544 self._server.stop(None) 545 super(SimpleStubsPluginTest, self).tearDown() 546 547 def testUnaryCall(self): 548 request = request_pb2.SimpleRequest(response_size=13) 549 response = service_pb2_grpc.TestService.UnaryCall( 550 request, 551 self._target, 552 channel_credentials=grpc.experimental.insecure_channel_credentials( 553 ), 554 wait_for_ready=True) 555 expected_response = self.servicer_methods.UnaryCall( 556 request, 'not a real context!') 557 self.assertEqual(expected_response, response) 558 559 def testUnaryCallInsecureSugar(self): 560 request = request_pb2.SimpleRequest(response_size=13) 561 response = service_pb2_grpc.TestService.UnaryCall(request, 562 self._target, 563 insecure=True, 564 wait_for_ready=True) 565 expected_response = self.servicer_methods.UnaryCall( 566 request, 'not a real context!') 567 self.assertEqual(expected_response, response) 568 569 def testStreamingOutputCall(self): 570 request = _streaming_output_request() 571 expected_responses = self.servicer_methods.StreamingOutputCall( 572 request, 'not a real RpcContext!') 573 responses = service_pb2_grpc.TestService.StreamingOutputCall( 574 request, 575 self._target, 576 channel_credentials=grpc.experimental.insecure_channel_credentials( 577 ), 578 wait_for_ready=True) 579 for expected_response, response in moves.zip_longest( 580 expected_responses, responses): 581 self.assertEqual(expected_response, response) 582 583 def testStreamingInputCall(self): 584 response = service_pb2_grpc.TestService.StreamingInputCall( 585 _streaming_input_request_iterator(), 586 self._target, 587 channel_credentials=grpc.experimental.insecure_channel_credentials( 588 ), 589 wait_for_ready=True) 590 expected_response = self.servicer_methods.StreamingInputCall( 591 _streaming_input_request_iterator(), 'not a real RpcContext!') 592 self.assertEqual(expected_response, response) 593 594 def testFullDuplexCall(self): 595 responses = service_pb2_grpc.TestService.FullDuplexCall( 596 _full_duplex_request_iterator(), 597 self._target, 598 channel_credentials=grpc.experimental.insecure_channel_credentials( 599 ), 600 wait_for_ready=True) 601 expected_responses = self.servicer_methods.FullDuplexCall( 602 _full_duplex_request_iterator(), 'not a real RpcContext!') 603 for expected_response, response in moves.zip_longest( 604 expected_responses, responses): 605 self.assertEqual(expected_response, response) 606 607 def testHalfDuplexCall(self): 608 609 def half_duplex_request_iterator(): 610 request = request_pb2.StreamingOutputCallRequest() 611 request.response_parameters.add(size=1, interval_us=0) 612 yield request 613 request = request_pb2.StreamingOutputCallRequest() 614 request.response_parameters.add(size=2, interval_us=0) 615 request.response_parameters.add(size=3, interval_us=0) 616 yield request 617 618 responses = service_pb2_grpc.TestService.HalfDuplexCall( 619 half_duplex_request_iterator(), 620 self._target, 621 channel_credentials=grpc.experimental.insecure_channel_credentials( 622 ), 623 wait_for_ready=True) 624 expected_responses = self.servicer_methods.HalfDuplexCall( 625 half_duplex_request_iterator(), 'not a real RpcContext!') 626 for expected_response, response in moves.zip_longest( 627 expected_responses, responses): 628 self.assertEqual(expected_response, response) 629 630 631class ModuleMainTest(unittest.TestCase): 632 """Test case for running `python -m grpc_tools.protoc`. 633 """ 634 635 def test_clean_output(self): 636 if sys.executable is None: 637 raise unittest.SkipTest( 638 "Running on a interpreter that cannot be invoked from the CLI.") 639 proto_dir_path = os.path.join("src", "proto") 640 test_proto_path = os.path.join(proto_dir_path, "grpc", "testing", 641 "empty.proto") 642 streams = tuple(tempfile.TemporaryFile() for _ in range(2)) 643 work_dir = tempfile.mkdtemp() 644 try: 645 invocation = (sys.executable, "-m", "grpc_tools.protoc", 646 "--proto_path", proto_dir_path, "--python_out", 647 work_dir, "--grpc_python_out", work_dir, 648 test_proto_path) 649 proc = subprocess.Popen(invocation, 650 stdout=streams[0], 651 stderr=streams[1]) 652 proc.wait() 653 outs = [] 654 for stream in streams: 655 stream.seek(0) 656 self.assertEqual(0, len(stream.read())) 657 self.assertEqual(0, proc.returncode) 658 except Exception: # pylint: disable=broad-except 659 shutil.rmtree(work_dir) 660 661 662if __name__ == '__main__': 663 unittest.main(verbosity=2) 664