"""Tests for asyncio subprocess transports and the high-level subprocess API."""

import signal
import sys
import unittest
import warnings
from unittest import mock

import asyncio
from asyncio import base_subprocess
from asyncio import subprocess
from test.test_asyncio import utils as test_utils
from test import support

if sys.platform != 'win32':
    from asyncio import unix_events

# Program blocking
PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']

# Program copying input to output
PROGRAM_CAT = [
    sys.executable, '-c',
    ';'.join(('import sys',
              'data = sys.stdin.buffer.read()',
              'sys.stdout.buffer.write(data)'))]


def tearDownModule():
    # Reset the global event loop policy once every test in the module ran.
    asyncio.set_event_loop_policy(None)


class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that never spawns a real process.

    _start() installs a mock.Mock() as the process object, with no
    stdin/stdout/stderr pipes and a pid of -1.
    """

    def _start(self, *args, **kwargs):
        self._proc = mock.Mock()
        self._proc.stdin = None
        self._proc.stdout = None
        self._proc.stderr = None
        self._proc.pid = -1


class SubprocessTransportTests(test_utils.TestCase):
    """Tests of the base transport, run against TestSubprocessTransport."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.set_event_loop(self.loop)

    def create_transport(self, waiter=None):
        # Build a TestSubprocessTransport bound to a mock protocol and
        # return both as a (transport, protocol) pair.
        protocol = mock.Mock()
        # NOTE(review): presumably keeps asyncio from treating these Mock
        # callbacks as coroutine functions -- confirm against the loop code.
        protocol.connection_made._is_coroutine = False
        protocol.process_exited._is_coroutine = False
        transport = TestSubprocessTransport(
                        self.loop, protocol, ['test'], False,
                        None, None, None, 0, waiter=waiter)
        return (transport, protocol)

    def test_proc_exited(self):
        # Report the process exit to the transport, then check the recorded
        # return code, the protocol callbacks and the transport's state.
        waiter = self.loop.create_future()
        transport, protocol = self.create_transport(waiter)
        transport._process_exited(6)
        self.loop.run_until_complete(waiter)

        self.assertEqual(transport.get_returncode(), 6)

        self.assertTrue(protocol.connection_made.called)
        self.assertTrue(protocol.process_exited.called)
        self.assertTrue(protocol.connection_lost.called)
        self.assertEqual(protocol.connection_lost.call_args[0], (None,))

        self.assertFalse(transport.is_closing())
        self.assertIsNone(transport._loop)
        self.assertIsNone(transport._proc)
        self.assertIsNone(transport._protocol)

        # methods must raise ProcessLookupError if the process exited
        self.assertRaises(ProcessLookupError,
                          transport.send_signal, signal.SIGTERM)
        self.assertRaises(ProcessLookupError, transport.terminate)
        self.assertRaises(ProcessLookupError, transport.kill)

        transport.close()

    def test_subprocess_repr(self):
        # repr() must reflect the three states checked below: exited with a
        # return code, running (no return code), and not started (no pid).
        waiter = self.loop.create_future()
        transport, protocol = self.create_transport(waiter)
        transport._process_exited(6)
        self.loop.run_until_complete(waiter)

        self.assertEqual(
            repr(transport),
            "<TestSubprocessTransport pid=-1 returncode=6>"
        )
        transport._returncode = None
        self.assertEqual(
            repr(transport),
            "<TestSubprocessTransport pid=-1 running>"
        )
        transport._pid = None
        transport._returncode = None
        self.assertEqual(
            repr(transport),
            "<TestSubprocessTransport not started>"
        )
        transport.close()


class SubprocessMixin:
    """Subprocess tests shared by the platform-specific test cases.

    The methods use self.loop and the unittest assertion methods, so the
    mixin has to be combined with a TestCase whose setUp() assigns
    self.loop (see the watcher / proactor test classes below).
    """

    def test_stdin_stdout(self):
        # Round-trip data through the "cat" child using the stdin/stdout
        # streams directly.
        args = PROGRAM_CAT

        async def run(data):
            proc = await asyncio.create_subprocess_exec(
                *args,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
            )

            # feed data
            proc.stdin.write(data)
            await proc.stdin.drain()
            proc.stdin.close()

            # get output and exitcode
            data = await proc.stdout.read()
            exitcode = await proc.wait()
            return (exitcode, data)

        task = run(b'some data')
        # Bound the run so that a hung child fails the test with a timeout.
        task = asyncio.wait_for(task, 60.0)
        exitcode, stdout = self.loop.run_until_complete(task)
        self.assertEqual(exitcode, 0)
        self.assertEqual(stdout, b'some data')

    def test_communicate(self):
        # Same round-trip as test_stdin_stdout, but through communicate().
        args = PROGRAM_CAT

        async def run(data):
            proc = await asyncio.create_subprocess_exec(
                *args,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
            )
            stdout, stderr = await proc.communicate(data)
            return proc.returncode, stdout

        task = run(b'some data')
        # Bound the run so that a hung child fails the test with a timeout.
        task = asyncio.wait_for(task, 60.0)
        exitcode, stdout = self.loop.run_until_complete(task)
        self.assertEqual(exitcode, 0)
        self.assertEqual(stdout, b'some data')

    def test_shell(self):
        # The exit status of a shell command is reported by wait().
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_shell('exit 7')
        )
        exitcode = self.loop.run_until_complete(proc.wait())
        self.assertEqual(exitcode, 7)

    def test_start_new_session(self):
        # start the new process in a new session
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_shell(
                'exit 8',
                start_new_session=True,
            )
        )
        exitcode = self.loop.run_until_complete(proc.wait())
        self.assertEqual(exitcode, 8)

    def test_kill(self):
        # kill() a blocked child and check the reported return code.
        args = PROGRAM_BLOCKED
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_exec(*args)
        )
        proc.kill()
        returncode = self.loop.run_until_complete(proc.wait())
        if sys.platform == 'win32':
            self.assertIsInstance(returncode, int)
            # expect 1 but sometimes get 0
        else:
            self.assertEqual(-signal.SIGKILL, returncode)

    def test_terminate(self):
        # terminate() a blocked child and check the reported return code.
        args = PROGRAM_BLOCKED
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_exec(*args)
        )
        proc.terminate()
        returncode = self.loop.run_until_complete(proc.wait())
        if sys.platform == 'win32':
            self.assertIsInstance(returncode, int)
            # expect 1 but sometimes get 0
        else:
            self.assertEqual(-signal.SIGTERM, returncode)

    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
    def test_send_signal(self):
        # bpo-31034: Make sure that we get the default signal handler (killing
        # the process). The parent process may have decided to ignore SIGHUP,
        # and signal handlers are inherited.
        old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
        try:
            code = 'import time; print("sleeping", flush=True); time.sleep(3600)'
            args = [sys.executable, '-c', code]
            proc = self.loop.run_until_complete(
                asyncio.create_subprocess_exec(
                    *args,
                    stdout=subprocess.PIPE,
                )
            )

            async def send_signal(proc):
                # basic synchronization to wait until the program is sleeping
                line = await proc.stdout.readline()
                self.assertEqual(line, b'sleeping\n')

                proc.send_signal(signal.SIGHUP)
                returncode = await proc.wait()
                return returncode

            returncode = self.loop.run_until_complete(send_signal(proc))
            self.assertEqual(-signal.SIGHUP, returncode)
        finally:
            # Always restore the handler that was installed before the test.
            signal.signal(signal.SIGHUP, old_handler)

    def prepare_broken_pipe_test(self):
        # Helper for the broken-pipe tests: returns (proc, large_data).

        # buffer large enough to feed the whole pipe buffer
        large_data = b'x' * support.PIPE_MAX_SIZE

        # the program ends before the stdin can be fed
        proc = self.loop.run_until_complete(
            asyncio.create_subprocess_exec(
                sys.executable, '-c', 'pass',
                stdin=subprocess.PIPE,
            )
        )

        return (proc, large_data)

    def test_stdin_broken_pipe(self):
        proc, large_data = self.prepare_broken_pipe_test()

        async def write_stdin(proc, data):
            # NOTE(review): the delay presumably gives the child time to
            # exit before the write -- confirm.
            await asyncio.sleep(0.5)
            proc.stdin.write(data)
            await proc.stdin.drain()

        coro = write_stdin(proc, large_data)
        # drain() must raise BrokenPipeError or ConnectionResetError
        with test_utils.disable_logger():
            self.assertRaises((BrokenPipeError, ConnectionResetError),
                              self.loop.run_until_complete, coro)
        self.loop.run_until_complete(proc.wait())

    def test_communicate_ignore_broken_pipe(self):
        proc, large_data = self.prepare_broken_pipe_test()

        # communicate() must ignore BrokenPipeError when feeding stdin
        self.loop.set_exception_handler(lambda loop, msg: None)
        self.loop.run_until_complete(proc.communicate(large_data))
        self.loop.run_until_complete(proc.wait())

    def test_pause_reading(self):
        limit = 10
        # The child writes more than twice the stream limit.
        size = (limit * 2 + 1)

        async def test_pause_reading():
            code = '\n'.join((
                'import sys',
                'sys.stdout.write("x" * %s)' % size,
                'sys.stdout.flush()',
            ))

            connect_read_pipe = self.loop.connect_read_pipe

            async def connect_read_pipe_mock(*args, **kw):
                # Create the real pipe transport, then replace its
                # pause/resume methods with mocks so calls can be observed.
                transport, protocol = await connect_read_pipe(*args, **kw)
                transport.pause_reading = mock.Mock()
                transport.resume_reading = mock.Mock()
                return (transport, protocol)

            self.loop.connect_read_pipe = connect_read_pipe_mock

            proc = await asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                limit=limit,
            )
            stdout_transport = proc._transport.get_pipe_transport(1)

            stdout, stderr = await proc.communicate()

            # The child process produced more than limit bytes of output,
            # the stream reader transport should pause the protocol to not
            # allocate too much memory.
            return (stdout, stdout_transport)

        # Issue #22685: Ensure that the stream reader pauses the protocol
        # when the child process produces too much data
        stdout, transport = self.loop.run_until_complete(test_pause_reading())

        self.assertEqual(stdout, b'x' * size)
        self.assertTrue(transport.pause_reading.called)
        self.assertTrue(transport.resume_reading.called)

    def test_stdin_not_inheritable(self):
        # asyncio issue #209: stdin must not be inheritable, otherwise
        # the Process.communicate() hangs
        async def len_message(message):
            code = 'import sys; data = sys.stdin.read(); print(len(data))'
            proc = await asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                close_fds=False,
            )
            stdout, stderr = await proc.communicate(message)
            exitcode = await proc.wait()
            return (stdout, exitcode)

        output, exitcode = self.loop.run_until_complete(len_message(b'abc'))
        self.assertEqual(output.rstrip(), b'3')
        self.assertEqual(exitcode, 0)

    def test_empty_input(self):
        # communicate(b'') on a piped stdin: the child must read 0 bytes.

        async def empty_input():
            code = 'import sys; data = sys.stdin.read(); print(len(data))'
            proc = await asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                close_fds=False,
            )
            stdout, stderr = await proc.communicate(b'')
            exitcode = await proc.wait()
            return (stdout, exitcode)

        output, exitcode = self.loop.run_until_complete(empty_input())
        self.assertEqual(output.rstrip(), b'0')
        self.assertEqual(exitcode, 0)

    def test_devnull_input(self):
        # stdin=DEVNULL: the child must read 0 bytes.

        async def empty_input():
            code = 'import sys; data = sys.stdin.read(); print(len(data))'
            proc = await asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdin=asyncio.subprocess.DEVNULL,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                close_fds=False,
            )
            stdout, stderr = await proc.communicate()
            exitcode = await proc.wait()
            return (stdout, exitcode)

        output, exitcode = self.loop.run_until_complete(empty_input())
        self.assertEqual(output.rstrip(), b'0')
        self.assertEqual(exitcode, 0)

    def test_devnull_output(self):
        # stdout=DEVNULL: communicate() must return None for stdout.

        async def empty_output():
            code = 'import sys; data = sys.stdin.read(); print(len(data))'
            proc = await asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.PIPE,
                close_fds=False,
            )
            stdout, stderr = await proc.communicate(b"abc")
            exitcode = await proc.wait()
            return (stdout, exitcode)

        output, exitcode = self.loop.run_until_complete(empty_output())
        self.assertIsNone(output)
        self.assertEqual(exitcode, 0)

    def test_devnull_error(self):
        # stderr=DEVNULL: communicate() must return None for stderr.

        async def empty_error():
            code = 'import sys; data = sys.stdin.read(); print(len(data))'
            proc = await asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.DEVNULL,
                close_fds=False,
            )
            stdout, stderr = await proc.communicate(b"abc")
            exitcode = await proc.wait()
            return (stderr, exitcode)

        output, exitcode = self.loop.run_until_complete(empty_error())
        self.assertIsNone(output)
        self.assertEqual(exitcode, 0)

    def test_cancel_process_wait(self):
        # Issue #23140: cancel Process.wait()

        async def cancel_wait():
            proc = await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)

            # Create an internal future waiting on the process exit
            task = self.loop.create_task(proc.wait())
            self.loop.call_soon(task.cancel)
            try:
                await task
            except asyncio.CancelledError:
                pass

            # Cancel the future
            task.cancel()

            # Kill the process and wait until it is done
            proc.kill()
            await proc.wait()

        self.loop.run_until_complete(cancel_wait())

    def test_cancel_make_subprocess_transport_exec(self):
        # Cancel create_subprocess_exec() right after it was scheduled.

        async def cancel_make_transport():
            coro = asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
            task = self.loop.create_task(coro)

            self.loop.call_soon(task.cancel)
            try:
                await task
            except asyncio.CancelledError:
                pass

        # ignore the log:
        # "Exception during subprocess creation, kill the subprocess"
        with test_utils.disable_logger():
            self.loop.run_until_complete(cancel_make_transport())

    def test_cancel_post_init(self):
        # Cancel loop.subprocess_exec() right after it was scheduled.

        async def cancel_make_transport():
            coro = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
                                             *PROGRAM_BLOCKED)
            task = self.loop.create_task(coro)

            self.loop.call_soon(task.cancel)
            try:
                await task
            except asyncio.CancelledError:
                pass

        # ignore the log:
        # "Exception during subprocess creation, kill the subprocess"
        with test_utils.disable_logger():
            self.loop.run_until_complete(cancel_make_transport())
            test_utils.run_briefly(self.loop)

    def test_close_kill_running(self):

        async def kill_running():
            create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
                                               *PROGRAM_BLOCKED)
            transport, protocol = await create

            # Wrap Popen.kill() to record whether close() invoked it, while
            # still killing the real child.
            kill_called = False
            def kill():
                nonlocal kill_called
                kill_called = True
                orig_kill()

            proc = transport.get_extra_info('subprocess')
            orig_kill = proc.kill
            proc.kill = kill
            returncode = transport.get_returncode()
            transport.close()
            await transport._wait()
            return (returncode, kill_called)

        # Ignore "Close running child process: kill ..." log
        with test_utils.disable_logger():
            returncode, killed = self.loop.run_until_complete(kill_running())
        self.assertIsNone(returncode)

        # transport.close() must kill the process if it is still running
        self.assertTrue(killed)
        test_utils.run_briefly(self.loop)

    def test_close_dont_kill_finished(self):

        async def kill_running():
            create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
                                               *PROGRAM_BLOCKED)
            transport, protocol = await create
            proc = transport.get_extra_info('subprocess')

            # kill the process (but asyncio is not notified immediately)
            proc.kill()
            proc.wait()

            # From here on any kill() call lands on the mock and is recorded.
            proc.kill = mock.Mock()
            proc_returncode = proc.poll()
            transport_returncode = transport.get_returncode()
            transport.close()
            return (proc_returncode, transport_returncode, proc.kill.called)

        # Ignore "Unknown child process pid ..." log of SafeChildWatcher,
        # emitted because the test already consumes the exit status:
        # proc.wait()
        with test_utils.disable_logger():
            result = self.loop.run_until_complete(kill_running())
            test_utils.run_briefly(self.loop)

        proc_returncode, transport_return_code, killed = result

        self.assertIsNotNone(proc_returncode)
        self.assertIsNone(transport_return_code)

        # transport.close() must not kill the process if it finished, even if
        # the transport was not notified yet
        self.assertFalse(killed)

        # Unlike SafeChildWatcher, FastChildWatcher does not pop the
        # callbacks if waitpid() is called elsewhere. Let's clear them
        # manually to avoid a warning when the watcher is detached.
        if (sys.platform != 'win32' and
                isinstance(self, SubprocessFastWatcherTests)):
            asyncio.get_child_watcher()._callbacks.clear()

    async def _test_popen_error(self, stdin):
        # Make Popen raise and check that create_subprocess_exec() lets the
        # exception through without any warning being recorded.
        if sys.platform == 'win32':
            target = 'asyncio.windows_utils.Popen'
        else:
            target = 'subprocess.Popen'
        with mock.patch(target) as popen:
            exc = ZeroDivisionError
            popen.side_effect = exc

            with warnings.catch_warnings(record=True) as warns:
                with self.assertRaises(exc):
                    await asyncio.create_subprocess_exec(
                        sys.executable,
                        '-c',
                        'pass',
                        stdin=stdin
                    )
                self.assertEqual(warns, [])

    def test_popen_error(self):
        # Issue #24763: check that the subprocess transport is closed
        # when BaseSubprocessTransport fails
        self.loop.run_until_complete(self._test_popen_error(stdin=None))

    def test_popen_error_with_stdin_pipe(self):
        # Issue #35721: check that newly created socket pair is closed when
        # Popen fails
        self.loop.run_until_complete(
            self._test_popen_error(stdin=subprocess.PIPE))

    def test_read_stdout_after_process_exit(self):
        # Keep reading stdout slowly (sleeping between chunks) until EOF,
        # while the child writes its output and exits with status 1.

        async def execute():
            code = '\n'.join(['import sys',
                              'for _ in range(64):',
                              ' sys.stdout.write("x" * 4096)',
                              'sys.stdout.flush()',
                              'sys.exit(1)'])

            process = await asyncio.create_subprocess_exec(
                sys.executable, '-c', code,
                stdout=asyncio.subprocess.PIPE,
            )

            while True:
                data = await process.stdout.read(65536)
                if data:
                    await asyncio.sleep(0.3)
                else:
                    break

        self.loop.run_until_complete(execute())

    def test_create_subprocess_exec_text_mode_fails(self):
        # text, encoding and errors must each be rejected with ValueError.
        async def execute():
            with self.assertRaises(ValueError):
                await subprocess.create_subprocess_exec(sys.executable,
                                                        text=True)

            with self.assertRaises(ValueError):
                await subprocess.create_subprocess_exec(sys.executable,
                                                        encoding="utf-8")

            with self.assertRaises(ValueError):
                await subprocess.create_subprocess_exec(sys.executable,
                                                        errors="strict")

        self.loop.run_until_complete(execute())

    def test_create_subprocess_shell_text_mode_fails(self):
        # text, encoding and errors must each be rejected with ValueError.

        async def execute():
            with self.assertRaises(ValueError):
                await subprocess.create_subprocess_shell(sys.executable,
                                                         text=True)

            with self.assertRaises(ValueError):
                await subprocess.create_subprocess_shell(sys.executable,
                                                         encoding="utf-8")

            with self.assertRaises(ValueError):
                await subprocess.create_subprocess_shell(sys.executable,
                                                         errors="strict")

        self.loop.run_until_complete(execute())

    def test_create_subprocess_exec_with_path(self):
        # Path-like objects are accepted both as the program and as an
        # argument.
        async def execute():
            p = await subprocess.create_subprocess_exec(
                support.FakePath(sys.executable), '-c', 'pass')
            await p.wait()
            p = await subprocess.create_subprocess_exec(
                sys.executable, '-c', 'pass', support.FakePath('.'))
            await p.wait()

        self.assertIsNone(self.loop.run_until_complete(execute()))

    def test_exec_loop_deprecated(self):
        # Passing loop= must emit a DeprecationWarning.
        async def go():
            with self.assertWarns(DeprecationWarning):
                proc = await asyncio.create_subprocess_exec(
                    sys.executable, '-c', 'pass',
                    loop=self.loop,
                )
            await proc.wait()
        self.loop.run_until_complete(go())

    def test_shell_loop_deprecated(self):
        # Passing loop= must emit a DeprecationWarning.
        async def go():
            with self.assertWarns(DeprecationWarning):
                proc = await asyncio.create_subprocess_shell(
                    "exit 0",
                    loop=self.loop,
                )
            await proc.wait()
        self.loop.run_until_complete(go())


if sys.platform != 'win32':
    # Unix
    class SubprocessWatcherMixin(SubprocessMixin):
        """Run the SubprocessMixin tests with a specific child watcher.

        Subclasses set Watcher to the child watcher class to instantiate.
        """

        Watcher = None

        def setUp(self):
            super().setUp()
            policy = asyncio.get_event_loop_policy()
            self.loop = policy.new_event_loop()
            self.set_event_loop(self.loop)

            # Attach a fresh watcher to the loop and install it on the policy.
            watcher = self.Watcher()
            watcher.attach_loop(self.loop)
            policy.set_child_watcher(watcher)

        def tearDown(self):
            super().tearDown()
            # Uninstall the watcher created in setUp(), detach it, close it.
            policy = asyncio.get_event_loop_policy()
            watcher = policy.get_child_watcher()
            policy.set_child_watcher(None)
            watcher.attach_loop(None)
            watcher.close()

    class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
                                         test_utils.TestCase):

        Watcher = unix_events.ThreadedChildWatcher

    class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin,
                                          test_utils.TestCase):

        Watcher = unix_events.MultiLoopChildWatcher

    class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
                                     test_utils.TestCase):

        Watcher = unix_events.SafeChildWatcher

    class SubprocessFastWatcherTests(SubprocessWatcherMixin,
                                     test_utils.TestCase):

        Watcher = unix_events.FastChildWatcher

else:
    # Windows
    class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):

        def setUp(self):
            super().setUp()
            self.loop = asyncio.ProactorEventLoop()
            self.set_event_loop(self.loop)


class GenericWatcherTests:
    """Watcher-independent checks.

    This class does not derive from TestCase; the test method uses self.loop
    and the unittest assertions, so it has to be mixed into a test case that
    provides them.
    """

    def test_create_subprocess_fails_with_inactive_watcher(self):

        async def execute():
            # An autospec'd mock exposes the AbstractChildWatcher methods
            # (is_active, add_child_handler) used below.
            watcher = mock.create_autospec(asyncio.AbstractChildWatcher)
            watcher.is_active.return_value = False
            asyncio.set_child_watcher(watcher)

            with self.assertRaises(RuntimeError):
                await subprocess.create_subprocess_exec(
                    support.FakePath(sys.executable), '-c', 'pass')

            # The inactive watcher must never be asked to watch a child.
            watcher.add_child_handler.assert_not_called()

        self.assertIsNone(self.loop.run_until_complete(execute()))




if __name__ == '__main__':
    unittest.main()