1import os 2import signal 3import sys 4import unittest 5import warnings 6from unittest import mock 7 8import asyncio 9from asyncio import base_subprocess 10from asyncio import subprocess 11from test.test_asyncio import utils as test_utils 12from test import support 13from test.support import os_helper 14 15if sys.platform != 'win32': 16 from asyncio import unix_events 17 18# Program blocking 19PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)'] 20 21# Program copying input to output 22PROGRAM_CAT = [ 23 sys.executable, '-c', 24 ';'.join(('import sys', 25 'data = sys.stdin.buffer.read()', 26 'sys.stdout.buffer.write(data)'))] 27 28 29def tearDownModule(): 30 asyncio.set_event_loop_policy(None) 31 32 33class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport): 34 def _start(self, *args, **kwargs): 35 self._proc = mock.Mock() 36 self._proc.stdin = None 37 self._proc.stdout = None 38 self._proc.stderr = None 39 self._proc.pid = -1 40 41 42class SubprocessTransportTests(test_utils.TestCase): 43 def setUp(self): 44 super().setUp() 45 self.loop = self.new_test_loop() 46 self.set_event_loop(self.loop) 47 48 def create_transport(self, waiter=None): 49 protocol = mock.Mock() 50 protocol.connection_made._is_coroutine = False 51 protocol.process_exited._is_coroutine = False 52 transport = TestSubprocessTransport( 53 self.loop, protocol, ['test'], False, 54 None, None, None, 0, waiter=waiter) 55 return (transport, protocol) 56 57 def test_proc_exited(self): 58 waiter = self.loop.create_future() 59 transport, protocol = self.create_transport(waiter) 60 transport._process_exited(6) 61 self.loop.run_until_complete(waiter) 62 63 self.assertEqual(transport.get_returncode(), 6) 64 65 self.assertTrue(protocol.connection_made.called) 66 self.assertTrue(protocol.process_exited.called) 67 self.assertTrue(protocol.connection_lost.called) 68 self.assertEqual(protocol.connection_lost.call_args[0], (None,)) 69 70 self.assertFalse(transport.is_closing()) 71 self.assertIsNone(transport._loop) 72 self.assertIsNone(transport._proc) 73 self.assertIsNone(transport._protocol) 74 75 # methods must raise ProcessLookupError if the process exited 76 self.assertRaises(ProcessLookupError, 77 transport.send_signal, signal.SIGTERM) 78 self.assertRaises(ProcessLookupError, transport.terminate) 79 self.assertRaises(ProcessLookupError, transport.kill) 80 81 transport.close() 82 83 def test_subprocess_repr(self): 84 waiter = self.loop.create_future() 85 transport, protocol = self.create_transport(waiter) 86 transport._process_exited(6) 87 self.loop.run_until_complete(waiter) 88 89 self.assertEqual( 90 repr(transport), 91 "<TestSubprocessTransport pid=-1 returncode=6>" 92 ) 93 transport._returncode = None 94 self.assertEqual( 95 repr(transport), 96 "<TestSubprocessTransport pid=-1 running>" 97 ) 98 transport._pid = None 99 transport._returncode = None 100 self.assertEqual( 101 repr(transport), 102 "<TestSubprocessTransport not started>" 103 ) 104 transport.close() 105 106 107class SubprocessMixin: 108 109 def test_stdin_stdout(self): 110 args = PROGRAM_CAT 111 112 async def run(data): 113 proc = await asyncio.create_subprocess_exec( 114 *args, 115 stdin=subprocess.PIPE, 116 stdout=subprocess.PIPE, 117 ) 118 119 # feed data 120 proc.stdin.write(data) 121 await proc.stdin.drain() 122 proc.stdin.close() 123 124 # get output and exitcode 125 data = await proc.stdout.read() 126 exitcode = await proc.wait() 127 return (exitcode, data) 128 129 task = run(b'some data') 130 task = asyncio.wait_for(task, 60.0) 131 exitcode, stdout = self.loop.run_until_complete(task) 132 self.assertEqual(exitcode, 0) 133 self.assertEqual(stdout, b'some data') 134 135 def test_communicate(self): 136 args = PROGRAM_CAT 137 138 async def run(data): 139 proc = await asyncio.create_subprocess_exec( 140 *args, 141 stdin=subprocess.PIPE, 142 stdout=subprocess.PIPE, 143 ) 144 stdout, stderr = await proc.communicate(data) 145 return proc.returncode, stdout 146 147 task = run(b'some data') 148 task = asyncio.wait_for(task, support.LONG_TIMEOUT) 149 exitcode, stdout = self.loop.run_until_complete(task) 150 self.assertEqual(exitcode, 0) 151 self.assertEqual(stdout, b'some data') 152 153 def test_shell(self): 154 proc = self.loop.run_until_complete( 155 asyncio.create_subprocess_shell('exit 7') 156 ) 157 exitcode = self.loop.run_until_complete(proc.wait()) 158 self.assertEqual(exitcode, 7) 159 160 def test_start_new_session(self): 161 # start the new process in a new session 162 proc = self.loop.run_until_complete( 163 asyncio.create_subprocess_shell( 164 'exit 8', 165 start_new_session=True, 166 ) 167 ) 168 exitcode = self.loop.run_until_complete(proc.wait()) 169 self.assertEqual(exitcode, 8) 170 171 def test_kill(self): 172 args = PROGRAM_BLOCKED 173 proc = self.loop.run_until_complete( 174 asyncio.create_subprocess_exec(*args) 175 ) 176 proc.kill() 177 returncode = self.loop.run_until_complete(proc.wait()) 178 if sys.platform == 'win32': 179 self.assertIsInstance(returncode, int) 180 # expect 1 but sometimes get 0 181 else: 182 self.assertEqual(-signal.SIGKILL, returncode) 183 184 def test_terminate(self): 185 args = PROGRAM_BLOCKED 186 proc = self.loop.run_until_complete( 187 asyncio.create_subprocess_exec(*args) 188 ) 189 proc.terminate() 190 returncode = self.loop.run_until_complete(proc.wait()) 191 if sys.platform == 'win32': 192 self.assertIsInstance(returncode, int) 193 # expect 1 but sometimes get 0 194 else: 195 self.assertEqual(-signal.SIGTERM, returncode) 196 197 @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP") 198 def test_send_signal(self): 199 # bpo-31034: Make sure that we get the default signal handler (killing 200 # the process). The parent process may have decided to ignore SIGHUP, 201 # and signal handlers are inherited. 202 old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL) 203 try: 204 code = 'import time; print("sleeping", flush=True); time.sleep(3600)' 205 args = [sys.executable, '-c', code] 206 proc = self.loop.run_until_complete( 207 asyncio.create_subprocess_exec( 208 *args, 209 stdout=subprocess.PIPE, 210 ) 211 ) 212 213 async def send_signal(proc): 214 # basic synchronization to wait until the program is sleeping 215 line = await proc.stdout.readline() 216 self.assertEqual(line, b'sleeping\n') 217 218 proc.send_signal(signal.SIGHUP) 219 returncode = await proc.wait() 220 return returncode 221 222 returncode = self.loop.run_until_complete(send_signal(proc)) 223 self.assertEqual(-signal.SIGHUP, returncode) 224 finally: 225 signal.signal(signal.SIGHUP, old_handler) 226 227 def prepare_broken_pipe_test(self): 228 # buffer large enough to feed the whole pipe buffer 229 large_data = b'x' * support.PIPE_MAX_SIZE 230 231 # the program ends before the stdin can be fed 232 proc = self.loop.run_until_complete( 233 asyncio.create_subprocess_exec( 234 sys.executable, '-c', 'pass', 235 stdin=subprocess.PIPE, 236 ) 237 ) 238 239 return (proc, large_data) 240 241 def test_stdin_broken_pipe(self): 242 proc, large_data = self.prepare_broken_pipe_test() 243 244 async def write_stdin(proc, data): 245 await asyncio.sleep(0.5) 246 proc.stdin.write(data) 247 await proc.stdin.drain() 248 249 coro = write_stdin(proc, large_data) 250 # drain() must raise BrokenPipeError or ConnectionResetError 251 with test_utils.disable_logger(): 252 self.assertRaises((BrokenPipeError, ConnectionResetError), 253 self.loop.run_until_complete, coro) 254 self.loop.run_until_complete(proc.wait()) 255 256 def test_communicate_ignore_broken_pipe(self): 257 proc, large_data = self.prepare_broken_pipe_test() 258 259 # communicate() must ignore BrokenPipeError when feeding stdin 260 self.loop.set_exception_handler(lambda loop, msg: None) 261 self.loop.run_until_complete(proc.communicate(large_data)) 262 self.loop.run_until_complete(proc.wait()) 263 264 def test_pause_reading(self): 265 limit = 10 266 size = (limit * 2 + 1) 267 268 async def test_pause_reading(): 269 code = '\n'.join(( 270 'import sys', 271 'sys.stdout.write("x" * %s)' % size, 272 'sys.stdout.flush()', 273 )) 274 275 connect_read_pipe = self.loop.connect_read_pipe 276 277 async def connect_read_pipe_mock(*args, **kw): 278 transport, protocol = await connect_read_pipe(*args, **kw) 279 transport.pause_reading = mock.Mock() 280 transport.resume_reading = mock.Mock() 281 return (transport, protocol) 282 283 self.loop.connect_read_pipe = connect_read_pipe_mock 284 285 proc = await asyncio.create_subprocess_exec( 286 sys.executable, '-c', code, 287 stdin=asyncio.subprocess.PIPE, 288 stdout=asyncio.subprocess.PIPE, 289 limit=limit, 290 ) 291 stdout_transport = proc._transport.get_pipe_transport(1) 292 293 stdout, stderr = await proc.communicate() 294 295 # The child process produced more than limit bytes of output, 296 # the stream reader transport should pause the protocol to not 297 # allocate too much memory. 298 return (stdout, stdout_transport) 299 300 # Issue #22685: Ensure that the stream reader pauses the protocol 301 # when the child process produces too much data 302 stdout, transport = self.loop.run_until_complete(test_pause_reading()) 303 304 self.assertEqual(stdout, b'x' * size) 305 self.assertTrue(transport.pause_reading.called) 306 self.assertTrue(transport.resume_reading.called) 307 308 def test_stdin_not_inheritable(self): 309 # asyncio issue #209: stdin must not be inheritable, otherwise 310 # the Process.communicate() hangs 311 async def len_message(message): 312 code = 'import sys; data = sys.stdin.read(); print(len(data))' 313 proc = await asyncio.create_subprocess_exec( 314 sys.executable, '-c', code, 315 stdin=asyncio.subprocess.PIPE, 316 stdout=asyncio.subprocess.PIPE, 317 stderr=asyncio.subprocess.PIPE, 318 close_fds=False, 319 ) 320 stdout, stderr = await proc.communicate(message) 321 exitcode = await proc.wait() 322 return (stdout, exitcode) 323 324 output, exitcode = self.loop.run_until_complete(len_message(b'abc')) 325 self.assertEqual(output.rstrip(), b'3') 326 self.assertEqual(exitcode, 0) 327 328 def test_empty_input(self): 329 330 async def empty_input(): 331 code = 'import sys; data = sys.stdin.read(); print(len(data))' 332 proc = await asyncio.create_subprocess_exec( 333 sys.executable, '-c', code, 334 stdin=asyncio.subprocess.PIPE, 335 stdout=asyncio.subprocess.PIPE, 336 stderr=asyncio.subprocess.PIPE, 337 close_fds=False, 338 ) 339 stdout, stderr = await proc.communicate(b'') 340 exitcode = await proc.wait() 341 return (stdout, exitcode) 342 343 output, exitcode = self.loop.run_until_complete(empty_input()) 344 self.assertEqual(output.rstrip(), b'0') 345 self.assertEqual(exitcode, 0) 346 347 def test_devnull_input(self): 348 349 async def empty_input(): 350 code = 'import sys; data = sys.stdin.read(); print(len(data))' 351 proc = await asyncio.create_subprocess_exec( 352 sys.executable, '-c', code, 353 stdin=asyncio.subprocess.DEVNULL, 354 stdout=asyncio.subprocess.PIPE, 355 stderr=asyncio.subprocess.PIPE, 356 close_fds=False, 357 ) 358 stdout, stderr = await proc.communicate() 359 exitcode = await proc.wait() 360 return (stdout, exitcode) 361 362 output, exitcode = self.loop.run_until_complete(empty_input()) 363 self.assertEqual(output.rstrip(), b'0') 364 self.assertEqual(exitcode, 0) 365 366 def test_devnull_output(self): 367 368 async def empty_output(): 369 code = 'import sys; data = sys.stdin.read(); print(len(data))' 370 proc = await asyncio.create_subprocess_exec( 371 sys.executable, '-c', code, 372 stdin=asyncio.subprocess.PIPE, 373 stdout=asyncio.subprocess.DEVNULL, 374 stderr=asyncio.subprocess.PIPE, 375 close_fds=False, 376 ) 377 stdout, stderr = await proc.communicate(b"abc") 378 exitcode = await proc.wait() 379 return (stdout, exitcode) 380 381 output, exitcode = self.loop.run_until_complete(empty_output()) 382 self.assertEqual(output, None) 383 self.assertEqual(exitcode, 0) 384 385 def test_devnull_error(self): 386 387 async def empty_error(): 388 code = 'import sys; data = sys.stdin.read(); print(len(data))' 389 proc = await asyncio.create_subprocess_exec( 390 sys.executable, '-c', code, 391 stdin=asyncio.subprocess.PIPE, 392 stdout=asyncio.subprocess.PIPE, 393 stderr=asyncio.subprocess.DEVNULL, 394 close_fds=False, 395 ) 396 stdout, stderr = await proc.communicate(b"abc") 397 exitcode = await proc.wait() 398 return (stderr, exitcode) 399 400 output, exitcode = self.loop.run_until_complete(empty_error()) 401 self.assertEqual(output, None) 402 self.assertEqual(exitcode, 0) 403 404 def test_cancel_process_wait(self): 405 # Issue #23140: cancel Process.wait() 406 407 async def cancel_wait(): 408 proc = await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED) 409 410 # Create an internal future waiting on the process exit 411 task = self.loop.create_task(proc.wait()) 412 self.loop.call_soon(task.cancel) 413 try: 414 await task 415 except asyncio.CancelledError: 416 pass 417 418 # Cancel the future 419 task.cancel() 420 421 # Kill the process and wait until it is done 422 proc.kill() 423 await proc.wait() 424 425 self.loop.run_until_complete(cancel_wait()) 426 427 def test_cancel_make_subprocess_transport_exec(self): 428 429 async def cancel_make_transport(): 430 coro = asyncio.create_subprocess_exec(*PROGRAM_BLOCKED) 431 task = self.loop.create_task(coro) 432 433 self.loop.call_soon(task.cancel) 434 try: 435 await task 436 except asyncio.CancelledError: 437 pass 438 439 # ignore the log: 440 # "Exception during subprocess creation, kill the subprocess" 441 with test_utils.disable_logger(): 442 self.loop.run_until_complete(cancel_make_transport()) 443 444 def test_cancel_post_init(self): 445 446 async def cancel_make_transport(): 447 coro = self.loop.subprocess_exec(asyncio.SubprocessProtocol, 448 *PROGRAM_BLOCKED) 449 task = self.loop.create_task(coro) 450 451 self.loop.call_soon(task.cancel) 452 try: 453 await task 454 except asyncio.CancelledError: 455 pass 456 457 # ignore the log: 458 # "Exception during subprocess creation, kill the subprocess" 459 with test_utils.disable_logger(): 460 self.loop.run_until_complete(cancel_make_transport()) 461 test_utils.run_briefly(self.loop) 462 463 def test_close_kill_running(self): 464 465 async def kill_running(): 466 create = self.loop.subprocess_exec(asyncio.SubprocessProtocol, 467 *PROGRAM_BLOCKED) 468 transport, protocol = await create 469 470 kill_called = False 471 def kill(): 472 nonlocal kill_called 473 kill_called = True 474 orig_kill() 475 476 proc = transport.get_extra_info('subprocess') 477 orig_kill = proc.kill 478 proc.kill = kill 479 returncode = transport.get_returncode() 480 transport.close() 481 await asyncio.wait_for(transport._wait(), 5) 482 return (returncode, kill_called) 483 484 # Ignore "Close running child process: kill ..." log 485 with test_utils.disable_logger(): 486 try: 487 returncode, killed = self.loop.run_until_complete( 488 kill_running() 489 ) 490 except asyncio.TimeoutError: 491 self.skipTest( 492 "Timeout failure on waiting for subprocess stopping" 493 ) 494 self.assertIsNone(returncode) 495 496 # transport.close() must kill the process if it is still running 497 self.assertTrue(killed) 498 test_utils.run_briefly(self.loop) 499 500 def test_close_dont_kill_finished(self): 501 502 async def kill_running(): 503 create = self.loop.subprocess_exec(asyncio.SubprocessProtocol, 504 *PROGRAM_BLOCKED) 505 transport, protocol = await create 506 proc = transport.get_extra_info('subprocess') 507 508 # kill the process (but asyncio is not notified immediately) 509 proc.kill() 510 proc.wait() 511 512 proc.kill = mock.Mock() 513 proc_returncode = proc.poll() 514 transport_returncode = transport.get_returncode() 515 transport.close() 516 return (proc_returncode, transport_returncode, proc.kill.called) 517 518 # Ignore "Unknown child process pid ..." log of SafeChildWatcher, 519 # emitted because the test already consumes the exit status: 520 # proc.wait() 521 with test_utils.disable_logger(): 522 result = self.loop.run_until_complete(kill_running()) 523 test_utils.run_briefly(self.loop) 524 525 proc_returncode, transport_return_code, killed = result 526 527 self.assertIsNotNone(proc_returncode) 528 self.assertIsNone(transport_return_code) 529 530 # transport.close() must not kill the process if it finished, even if 531 # the transport was not notified yet 532 self.assertFalse(killed) 533 534 # Unlike SafeChildWatcher, FastChildWatcher does not pop the 535 # callbacks if waitpid() is called elsewhere. Let's clear them 536 # manually to avoid a warning when the watcher is detached. 537 if (sys.platform != 'win32' and 538 isinstance(self, SubprocessFastWatcherTests)): 539 asyncio.get_child_watcher()._callbacks.clear() 540 541 async def _test_popen_error(self, stdin): 542 if sys.platform == 'win32': 543 target = 'asyncio.windows_utils.Popen' 544 else: 545 target = 'subprocess.Popen' 546 with mock.patch(target) as popen: 547 exc = ZeroDivisionError 548 popen.side_effect = exc 549 550 with warnings.catch_warnings(record=True) as warns: 551 with self.assertRaises(exc): 552 await asyncio.create_subprocess_exec( 553 sys.executable, 554 '-c', 555 'pass', 556 stdin=stdin 557 ) 558 self.assertEqual(warns, []) 559 560 def test_popen_error(self): 561 # Issue #24763: check that the subprocess transport is closed 562 # when BaseSubprocessTransport fails 563 self.loop.run_until_complete(self._test_popen_error(stdin=None)) 564 565 def test_popen_error_with_stdin_pipe(self): 566 # Issue #35721: check that newly created socket pair is closed when 567 # Popen fails 568 self.loop.run_until_complete( 569 self._test_popen_error(stdin=subprocess.PIPE)) 570 571 def test_read_stdout_after_process_exit(self): 572 573 async def execute(): 574 code = '\n'.join(['import sys', 575 'for _ in range(64):', 576 ' sys.stdout.write("x" * 4096)', 577 'sys.stdout.flush()', 578 'sys.exit(1)']) 579 580 process = await asyncio.create_subprocess_exec( 581 sys.executable, '-c', code, 582 stdout=asyncio.subprocess.PIPE, 583 ) 584 585 while True: 586 data = await process.stdout.read(65536) 587 if data: 588 await asyncio.sleep(0.3) 589 else: 590 break 591 592 self.loop.run_until_complete(execute()) 593 594 def test_create_subprocess_exec_text_mode_fails(self): 595 async def execute(): 596 with self.assertRaises(ValueError): 597 await subprocess.create_subprocess_exec(sys.executable, 598 text=True) 599 600 with self.assertRaises(ValueError): 601 await subprocess.create_subprocess_exec(sys.executable, 602 encoding="utf-8") 603 604 with self.assertRaises(ValueError): 605 await subprocess.create_subprocess_exec(sys.executable, 606 errors="strict") 607 608 self.loop.run_until_complete(execute()) 609 610 def test_create_subprocess_shell_text_mode_fails(self): 611 612 async def execute(): 613 with self.assertRaises(ValueError): 614 await subprocess.create_subprocess_shell(sys.executable, 615 text=True) 616 617 with self.assertRaises(ValueError): 618 await subprocess.create_subprocess_shell(sys.executable, 619 encoding="utf-8") 620 621 with self.assertRaises(ValueError): 622 await subprocess.create_subprocess_shell(sys.executable, 623 errors="strict") 624 625 self.loop.run_until_complete(execute()) 626 627 def test_create_subprocess_exec_with_path(self): 628 async def execute(): 629 p = await subprocess.create_subprocess_exec( 630 os_helper.FakePath(sys.executable), '-c', 'pass') 631 await p.wait() 632 p = await subprocess.create_subprocess_exec( 633 sys.executable, '-c', 'pass', os_helper.FakePath('.')) 634 await p.wait() 635 636 self.assertIsNone(self.loop.run_until_complete(execute())) 637 638 639if sys.platform != 'win32': 640 # Unix 641 class SubprocessWatcherMixin(SubprocessMixin): 642 643 Watcher = None 644 645 def setUp(self): 646 super().setUp() 647 policy = asyncio.get_event_loop_policy() 648 self.loop = policy.new_event_loop() 649 self.set_event_loop(self.loop) 650 651 watcher = self.Watcher() 652 watcher.attach_loop(self.loop) 653 policy.set_child_watcher(watcher) 654 655 def tearDown(self): 656 super().tearDown() 657 policy = asyncio.get_event_loop_policy() 658 watcher = policy.get_child_watcher() 659 policy.set_child_watcher(None) 660 watcher.attach_loop(None) 661 watcher.close() 662 663 class SubprocessThreadedWatcherTests(SubprocessWatcherMixin, 664 test_utils.TestCase): 665 666 Watcher = unix_events.ThreadedChildWatcher 667 668 @unittest.skip("bpo-38323: MultiLoopChildWatcher has a race condition \ 669 and these tests can hang the test suite") 670 class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin, 671 test_utils.TestCase): 672 673 Watcher = unix_events.MultiLoopChildWatcher 674 675 class SubprocessSafeWatcherTests(SubprocessWatcherMixin, 676 test_utils.TestCase): 677 678 Watcher = unix_events.SafeChildWatcher 679 680 class SubprocessFastWatcherTests(SubprocessWatcherMixin, 681 test_utils.TestCase): 682 683 Watcher = unix_events.FastChildWatcher 684 685 def has_pidfd_support(): 686 if not hasattr(os, 'pidfd_open'): 687 return False 688 try: 689 os.close(os.pidfd_open(os.getpid())) 690 except OSError: 691 return False 692 return True 693 694 @unittest.skipUnless( 695 has_pidfd_support(), 696 "operating system does not support pidfds", 697 ) 698 class SubprocessPidfdWatcherTests(SubprocessWatcherMixin, 699 test_utils.TestCase): 700 Watcher = unix_events.PidfdChildWatcher 701 702else: 703 # Windows 704 class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase): 705 706 def setUp(self): 707 super().setUp() 708 self.loop = asyncio.ProactorEventLoop() 709 self.set_event_loop(self.loop) 710 711 712class GenericWatcherTests: 713 714 def test_create_subprocess_fails_with_inactive_watcher(self): 715 716 async def execute(): 717 watcher = mock.create_authspec(asyncio.AbstractChildWatcher) 718 watcher.is_active.return_value = False 719 asyncio.set_child_watcher(watcher) 720 721 with self.assertRaises(RuntimeError): 722 await subprocess.create_subprocess_exec( 723 os_helper.FakePath(sys.executable), '-c', 'pass') 724 725 watcher.add_child_handler.assert_not_called() 726 727 self.assertIsNone(self.loop.run_until_complete(execute())) 728 729 730 731 732if __name__ == '__main__': 733 unittest.main() 734