import unittest
from unittest import mock
from test import support
import subprocess
import sys
import signal
import io
import itertools
import os
import errno
import tempfile
import time
import traceback
import selectors
import sysconfig
import select
import shutil
import threading
import gc
import textwrap
from test.support import FakePath

try:
    import _testcapi
except ImportError:
    _testcapi = None


if support.PGO:
    raise unittest.SkipTest("test is not helpful for PGO")

mswindows = (sys.platform == "win32")

#
# Depends on the following external programs: Python
#

# Snippet prepended to child scripts so that, on Windows, the child's stdout
# is switched to binary mode before it writes anything.
if mswindows:
    SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
                 'os.O_BINARY);')
else:
    SETBINARY = ''

NONEXISTING_CMD = ('nonexisting_i_hope',)
# Ignore errors that indicate the command was not found
NONEXISTING_ERRORS = (FileNotFoundError, NotADirectoryError, PermissionError)

# Command that exits with status 0; may be replaced by setUpModule().
ZERO_RETURN_CMD = (sys.executable, '-c', 'pass')


def setUpModule():
    """Switch ZERO_RETURN_CMD to the system ``true`` binary when it works."""
    global ZERO_RETURN_CMD
    shell_true = shutil.which('true')
    if shell_true is None:
        return
    if (os.access(shell_true, os.X_OK) and
            subprocess.run([shell_true]).returncode == 0):
        ZERO_RETURN_CMD = (shell_true,)  # Faster than Python startup.


class BaseTestCase(unittest.TestCase):
    """Common fixture: reap stray children around every test."""

    def setUp(self):
        # Try to minimize the number of children we have so this test
        # doesn't crash on some buildbots (Alphas in particular).
        support.reap_children()

    def tearDown(self):
        if not mswindows:
            # subprocess._active is not used on Windows and is set to None.
            for inst in subprocess._active:
                inst.wait()
            subprocess._cleanup()
            self.assertFalse(
                subprocess._active, "subprocess._active not empty"
            )
        self.doCleanups()
        support.reap_children()

    def assertStderrEqual(self, stderr, expected, msg=None):
        # In a debug build, stuff like "[6580 refs]" is printed to stderr at
        # shutdown time.  That frustrates tests trying to check stderr produced
        # from a spawned Python process.
        actual = support.strip_python_stderr(stderr)
        # strip_python_stderr also strips whitespace, so we do too.
        expected = expected.strip()
        self.assertEqual(actual, expected, msg)


class PopenTestException(Exception):
    pass


class PopenExecuteChildRaises(subprocess.Popen):
    """Popen subclass for testing cleanup of subprocess.PIPE filehandles when
    _execute_child fails.
    """
    def _execute_child(self, *args, **kwargs):
        raise PopenTestException("Forced Exception for Test")


class ProcessTestCase(BaseTestCase):

    def test_io_buffered_by_default(self):
        # With the default bufsize the pipe objects are buffered streams.
        p = subprocess.Popen(ZERO_RETURN_CMD,
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        try:
            self.assertIsInstance(p.stdin, io.BufferedIOBase)
            self.assertIsInstance(p.stdout, io.BufferedIOBase)
            self.assertIsInstance(p.stderr, io.BufferedIOBase)
        finally:
            p.stdin.close()
            p.stdout.close()
            p.stderr.close()
            p.wait()

    def test_io_unbuffered_works(self):
        # bufsize=0 must hand back raw (unbuffered) pipe objects.
        p = subprocess.Popen(ZERO_RETURN_CMD,
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, bufsize=0)
        try:
            self.assertIsInstance(p.stdin, io.RawIOBase)
            self.assertIsInstance(p.stdout, io.RawIOBase)
            self.assertIsInstance(p.stderr, io.RawIOBase)
        finally:
            p.stdin.close()
            p.stdout.close()
            p.stderr.close()
            p.wait()

    def test_call_seq(self):
        # call() function with sequence argument
        rc = subprocess.call([sys.executable, "-c",
                              "import sys; sys.exit(47)"])
        self.assertEqual(rc, 47)

    def test_call_timeout(self):
        # call() function with timeout argument; we want to test that the child
        # process gets killed when the timeout expires.  If the child isn't
        # killed, this call will deadlock since subprocess.call waits for the
        # child.
        self.assertRaises(subprocess.TimeoutExpired, subprocess.call,
                          [sys.executable, "-c", "while True: pass"],
                          timeout=0.1)

    def test_check_call_zero(self):
        # check_call() function with zero return code
        rc = subprocess.check_call(ZERO_RETURN_CMD)
        self.assertEqual(rc, 0)

    def test_check_call_nonzero(self):
        # check_call() function with non-zero return code
        with self.assertRaises(subprocess.CalledProcessError) as c:
            subprocess.check_call([sys.executable, "-c",
                                   "import sys; sys.exit(47)"])
        self.assertEqual(c.exception.returncode, 47)

    def test_check_output(self):
        # check_output() function with zero return code
        output = subprocess.check_output(
                [sys.executable, "-c", "print('BDFL')"])
        self.assertIn(b'BDFL', output)

    def test_check_output_nonzero(self):
        # check_output() function with non-zero return code
        with self.assertRaises(subprocess.CalledProcessError) as c:
            subprocess.check_output(
                    [sys.executable, "-c", "import sys; sys.exit(5)"])
        self.assertEqual(c.exception.returncode, 5)

    def test_check_output_stderr(self):
        # check_output() function stderr redirected to stdout
        output = subprocess.check_output(
                [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
                stderr=subprocess.STDOUT)
        self.assertIn(b'BDFL', output)

    def test_check_output_stdin_arg(self):
        # check_output() can be called with stdin set to a file
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        tf.write(b'pear')
        tf.seek(0)
        output = subprocess.check_output(
                [sys.executable, "-c",
                 "import sys; sys.stdout.write(sys.stdin.read().upper())"],
                stdin=tf)
        self.assertIn(b'PEAR', output)

    def test_check_output_input_arg(self):
        # check_output() can be called with input set to a string
        output = subprocess.check_output(
                [sys.executable, "-c",
                 "import sys; sys.stdout.write(sys.stdin.read().upper())"],
                input=b'pear')
        self.assertIn(b'PEAR', output)

    def test_check_output_stdout_arg(self):
        # check_output() refuses to accept 'stdout' argument
        with self.assertRaises(ValueError) as c:
            subprocess.check_output(
                    [sys.executable, "-c", "print('will not be run')"],
                    stdout=sys.stdout)
            # Only reached when no exception was raised above.
            self.fail("Expected ValueError when stdout arg supplied.")
        self.assertIn('stdout', c.exception.args[0])

    def test_check_output_stdin_with_input_arg(self):
        # check_output() refuses to accept 'stdin' with 'input'
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        tf.write(b'pear')
        tf.seek(0)
        with self.assertRaises(ValueError) as c:
            subprocess.check_output(
                    [sys.executable, "-c", "print('will not be run')"],
                    stdin=tf, input=b'hare')
            # Only reached when no exception was raised above.
            self.fail("Expected ValueError when stdin and input args supplied.")
        self.assertIn('stdin', c.exception.args[0])
        self.assertIn('input', c.exception.args[0])

    def test_check_output_timeout(self):
        # check_output() function with timeout arg
        with self.assertRaises(subprocess.TimeoutExpired) as c:
            subprocess.check_output(
                    [sys.executable, "-c",
                     "import sys, time\n"
                     "sys.stdout.write('BDFL')\n"
                     "sys.stdout.flush()\n"
                     "time.sleep(3600)"],
                    # Some heavily loaded buildbots (sparc Debian 3.x) require
                    # this much time to start and print.
                    timeout=3)
            # Only reached when no exception was raised above.
            self.fail("Expected TimeoutExpired.")
        self.assertEqual(c.exception.output, b'BDFL')

    def test_call_kwargs(self):
        # call() function with keyword args
        newenv = os.environ.copy()
        newenv["FRUIT"] = "banana"
        rc = subprocess.call([sys.executable, "-c",
                              'import sys, os;'
                              'sys.exit(os.getenv("FRUIT")=="banana")'],
                             env=newenv)
        self.assertEqual(rc, 1)

    def test_invalid_args(self):
        # Popen() called with invalid arguments should raise TypeError
        # but Popen.__del__ should not complain (issue #12085)
        with support.captured_stderr() as s:
            self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
            argcount = subprocess.Popen.__init__.__code__.co_argcount
            too_many_args = [0] * (argcount + 1)
            self.assertRaises(TypeError, subprocess.Popen, *too_many_args)
        self.assertEqual(s.getvalue(), '')

    def test_stdin_none(self):
        # .stdin is None when not redirected
        p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        p.wait()
        self.assertIsNone(p.stdin)

    def test_stdout_none(self):
        # .stdout is None when not redirected, and the child's stdout will
        # be inherited from the parent.  In order to test this we run a
        # subprocess in a subprocess:
        # this_test
        #   \-- subprocess created by this test (parent)
        #          \-- subprocess created by the parent subprocess (child)
        # The parent doesn't specify stdout, so the child will use the
        # parent's stdout.  This test checks that the message printed by the
        # child goes to the parent stdout.  The parent also checks that the
        # child's stdout is None.  See #11963.
        code = ('import sys; from subprocess import Popen, PIPE;'
                'p = Popen([sys.executable, "-c", "print(\'test_stdout_none\')"],'
                ' stdin=PIPE, stderr=PIPE);'
                'p.wait(); assert p.stdout is None;')
        p = subprocess.Popen([sys.executable, "-c", code],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        out, err = p.communicate()
        self.assertEqual(p.returncode, 0, err)
        self.assertEqual(out.rstrip(), b'test_stdout_none')

    def test_stderr_none(self):
        # .stderr is None when not redirected
        p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stdin.close)
        p.wait()
        self.assertIsNone(p.stderr)

    def _assert_python(self, pre_args, **kwargs):
        # We include sys.exit() to prevent the test runner from hanging
        # whenever python is found.
        args = pre_args + ["import sys; sys.exit(47)"]
        p = subprocess.Popen(args, **kwargs)
        p.wait()
        self.assertEqual(47, p.returncode)

    def test_executable(self):
        # Check that the executable argument works.
        #
        # On Unix (non-Mac and non-Windows), Python looks at args[0] to
        # determine where its standard library is, so we need the directory
        # of args[0] to be valid for the Popen() call to Python to succeed.
        # See also issue #16170 and issue #7774.
        doesnotexist = os.path.join(os.path.dirname(sys.executable),
                                    "doesnotexist")
        self._assert_python([doesnotexist, "-c"], executable=sys.executable)

    def test_bytes_executable(self):
        # Same as test_executable, with the executable given as bytes.
        doesnotexist = os.path.join(os.path.dirname(sys.executable),
                                    "doesnotexist")
        self._assert_python([doesnotexist, "-c"],
                            executable=os.fsencode(sys.executable))

    def test_pathlike_executable(self):
        # Same as test_executable, with a path-like executable.
        doesnotexist = os.path.join(os.path.dirname(sys.executable),
                                    "doesnotexist")
        self._assert_python([doesnotexist, "-c"],
                            executable=FakePath(sys.executable))

    def test_executable_takes_precedence(self):
        # Check that the executable argument takes precedence over args[0].
        #
        # Verify first that the call succeeds without the executable arg.
        pre_args = [sys.executable, "-c"]
        self._assert_python(pre_args)
        self.assertRaises(NONEXISTING_ERRORS,
                          self._assert_python, pre_args,
                          executable=NONEXISTING_CMD[0])

    @unittest.skipIf(mswindows, "executable argument replaces shell")
    def test_executable_replaces_shell(self):
        # Check that the executable argument replaces the default shell
        # when shell=True.
        self._assert_python([], executable=sys.executable, shell=True)

    @unittest.skipIf(mswindows, "executable argument replaces shell")
    def test_bytes_executable_replaces_shell(self):
        self._assert_python([], executable=os.fsencode(sys.executable),
                            shell=True)

    @unittest.skipIf(mswindows, "executable argument replaces shell")
    def test_pathlike_executable_replaces_shell(self):
        self._assert_python([], executable=FakePath(sys.executable),
                            shell=True)

    # For use in the test_cwd* tests below.
    def _normalize_cwd(self, cwd):
        # Normalize an expected cwd (for Tru64 support).
        # We can't use os.path.realpath since it doesn't expand Tru64 {memb}
        # strings.  See bug #1063571.
        with support.change_cwd(cwd):
            return os.getcwd()

    # For use in the test_cwd* tests below.
    def _split_python_path(self):
        # Return normalized (python_dir, python_base).
        python_path = os.path.realpath(sys.executable)
        return os.path.split(python_path)

    # For use in the test_cwd* tests below.
    def _assert_cwd(self, expected_cwd, python_arg, **kwargs):
        # Invoke Python via Popen, and assert that (1) the call succeeds,
        # and that (2) the current working directory of the child process
        # matches *expected_cwd*.
        p = subprocess.Popen([python_arg, "-c",
                              "import os, sys; "
                              "sys.stdout.write(os.getcwd()); "
                              "sys.exit(47)"],
                             stdout=subprocess.PIPE,
                             **kwargs)
        self.addCleanup(p.stdout.close)
        p.wait()
        self.assertEqual(47, p.returncode)
        normcase = os.path.normcase
        self.assertEqual(normcase(expected_cwd),
                         normcase(p.stdout.read().decode("utf-8")))

    def test_cwd(self):
        # Check that cwd changes the cwd for the child process.
        temp_dir = tempfile.gettempdir()
        temp_dir = self._normalize_cwd(temp_dir)
        self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir)

    def test_cwd_with_bytes(self):
        temp_dir = tempfile.gettempdir()
        temp_dir = self._normalize_cwd(temp_dir)
        self._assert_cwd(temp_dir, sys.executable, cwd=os.fsencode(temp_dir))

    def test_cwd_with_pathlike(self):
        temp_dir = tempfile.gettempdir()
        temp_dir = self._normalize_cwd(temp_dir)
        self._assert_cwd(temp_dir, sys.executable, cwd=FakePath(temp_dir))

    @unittest.skipIf(mswindows, "pending resolution of issue #15533")
    def test_cwd_with_relative_arg(self):
        # Check that Popen looks for args[0] relative to cwd if args[0]
        # is relative.
        python_dir, python_base = self._split_python_path()
        rel_python = os.path.join(os.curdir, python_base)
        with support.temp_cwd() as wrong_dir:
            # Before calling with the correct cwd, confirm that the call fails
            # without cwd and with the wrong cwd.
            self.assertRaises(FileNotFoundError, subprocess.Popen,
                              [rel_python])
            self.assertRaises(FileNotFoundError, subprocess.Popen,
                              [rel_python], cwd=wrong_dir)
            python_dir = self._normalize_cwd(python_dir)
            self._assert_cwd(python_dir, rel_python, cwd=python_dir)

    @unittest.skipIf(mswindows, "pending resolution of issue #15533")
    def test_cwd_with_relative_executable(self):
        # Check that Popen looks for executable relative to cwd if executable
        # is relative (and that executable takes precedence over args[0]).
        python_dir, python_base = self._split_python_path()
        rel_python = os.path.join(os.curdir, python_base)
        doesntexist = "somethingyoudonthave"
        with support.temp_cwd() as wrong_dir:
            # Before calling with the correct cwd, confirm that the call fails
            # without cwd and with the wrong cwd.
            self.assertRaises(FileNotFoundError, subprocess.Popen,
                              [doesntexist], executable=rel_python)
            self.assertRaises(FileNotFoundError, subprocess.Popen,
                              [doesntexist], executable=rel_python,
                              cwd=wrong_dir)
            python_dir = self._normalize_cwd(python_dir)
            self._assert_cwd(python_dir, doesntexist, executable=rel_python,
                             cwd=python_dir)

    def test_cwd_with_absolute_arg(self):
        # Check that Popen can find the executable when the cwd is wrong
        # if args[0] is an absolute path.
        python_dir, python_base = self._split_python_path()
        abs_python = os.path.join(python_dir, python_base)
        rel_python = os.path.join(os.curdir, python_base)
        with support.temp_dir() as wrong_dir:
            # Before calling with an absolute path, confirm that using a
            # relative path fails.
            self.assertRaises(FileNotFoundError, subprocess.Popen,
                              [rel_python], cwd=wrong_dir)
            wrong_dir = self._normalize_cwd(wrong_dir)
            self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir)

    @unittest.skipIf(sys.base_prefix != sys.prefix,
                     'Test is not venv-compatible')
    def test_executable_with_cwd(self):
        python_dir, python_base = self._split_python_path()
        python_dir = self._normalize_cwd(python_dir)
        self._assert_cwd(python_dir, "somethingyoudonthave",
                         executable=sys.executable, cwd=python_dir)

    @unittest.skipIf(sys.base_prefix != sys.prefix,
                     'Test is not venv-compatible')
    @unittest.skipIf(sysconfig.is_python_build(),
                     "need an installed Python. See #7774")
    def test_executable_without_cwd(self):
        # For a normal installation, it should work without 'cwd'
        # argument.  For test runs in the build directory, see #7774.
        self._assert_cwd(os.getcwd(), "somethingyoudonthave",
                         executable=sys.executable)

    def test_stdin_pipe(self):
        # stdin redirection
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.exit(sys.stdin.read() == "pear")'],
                             stdin=subprocess.PIPE)
        p.stdin.write(b"pear")
        p.stdin.close()
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdin_filedes(self):
        # stdin is set to open file descriptor
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        d = tf.fileno()
        os.write(d, b"pear")
        os.lseek(d, 0, os.SEEK_SET)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.exit(sys.stdin.read() == "pear")'],
                             stdin=d)
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdin_fileobj(self):
        # stdin is set to open file object
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        tf.write(b"pear")
        tf.seek(0)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.exit(sys.stdin.read() == "pear")'],
                             stdin=tf)
        p.wait()
        self.assertEqual(p.returncode, 1)
503 504 def test_stdout_pipe(self): 505 # stdout redirection 506 p = subprocess.Popen([sys.executable, "-c", 507 'import sys; sys.stdout.write("orange")'], 508 stdout=subprocess.PIPE) 509 with p: 510 self.assertEqual(p.stdout.read(), b"orange") 511 512 def test_stdout_filedes(self): 513 # stdout is set to open file descriptor 514 tf = tempfile.TemporaryFile() 515 self.addCleanup(tf.close) 516 d = tf.fileno() 517 p = subprocess.Popen([sys.executable, "-c", 518 'import sys; sys.stdout.write("orange")'], 519 stdout=d) 520 p.wait() 521 os.lseek(d, 0, 0) 522 self.assertEqual(os.read(d, 1024), b"orange") 523 524 def test_stdout_fileobj(self): 525 # stdout is set to open file object 526 tf = tempfile.TemporaryFile() 527 self.addCleanup(tf.close) 528 p = subprocess.Popen([sys.executable, "-c", 529 'import sys; sys.stdout.write("orange")'], 530 stdout=tf) 531 p.wait() 532 tf.seek(0) 533 self.assertEqual(tf.read(), b"orange") 534 535 def test_stderr_pipe(self): 536 # stderr redirection 537 p = subprocess.Popen([sys.executable, "-c", 538 'import sys; sys.stderr.write("strawberry")'], 539 stderr=subprocess.PIPE) 540 with p: 541 self.assertStderrEqual(p.stderr.read(), b"strawberry") 542 543 def test_stderr_filedes(self): 544 # stderr is set to open file descriptor 545 tf = tempfile.TemporaryFile() 546 self.addCleanup(tf.close) 547 d = tf.fileno() 548 p = subprocess.Popen([sys.executable, "-c", 549 'import sys; sys.stderr.write("strawberry")'], 550 stderr=d) 551 p.wait() 552 os.lseek(d, 0, 0) 553 self.assertStderrEqual(os.read(d, 1024), b"strawberry") 554 555 def test_stderr_fileobj(self): 556 # stderr is set to open file object 557 tf = tempfile.TemporaryFile() 558 self.addCleanup(tf.close) 559 p = subprocess.Popen([sys.executable, "-c", 560 'import sys; sys.stderr.write("strawberry")'], 561 stderr=tf) 562 p.wait() 563 tf.seek(0) 564 self.assertStderrEqual(tf.read(), b"strawberry") 565 566 def test_stderr_redirect_with_no_stdout_redirect(self): 567 # test stderr=STDOUT while 
stdout=None (not set) 568 569 # - grandchild prints to stderr 570 # - child redirects grandchild's stderr to its stdout 571 # - the parent should get grandchild's stderr in child's stdout 572 p = subprocess.Popen([sys.executable, "-c", 573 'import sys, subprocess;' 574 'rc = subprocess.call([sys.executable, "-c",' 575 ' "import sys;"' 576 ' "sys.stderr.write(\'42\')"],' 577 ' stderr=subprocess.STDOUT);' 578 'sys.exit(rc)'], 579 stdout=subprocess.PIPE, 580 stderr=subprocess.PIPE) 581 stdout, stderr = p.communicate() 582 #NOTE: stdout should get stderr from grandchild 583 self.assertStderrEqual(stdout, b'42') 584 self.assertStderrEqual(stderr, b'') # should be empty 585 self.assertEqual(p.returncode, 0) 586 587 def test_stdout_stderr_pipe(self): 588 # capture stdout and stderr to the same pipe 589 p = subprocess.Popen([sys.executable, "-c", 590 'import sys;' 591 'sys.stdout.write("apple");' 592 'sys.stdout.flush();' 593 'sys.stderr.write("orange")'], 594 stdout=subprocess.PIPE, 595 stderr=subprocess.STDOUT) 596 with p: 597 self.assertStderrEqual(p.stdout.read(), b"appleorange") 598 599 def test_stdout_stderr_file(self): 600 # capture stdout and stderr to the same open file 601 tf = tempfile.TemporaryFile() 602 self.addCleanup(tf.close) 603 p = subprocess.Popen([sys.executable, "-c", 604 'import sys;' 605 'sys.stdout.write("apple");' 606 'sys.stdout.flush();' 607 'sys.stderr.write("orange")'], 608 stdout=tf, 609 stderr=tf) 610 p.wait() 611 tf.seek(0) 612 self.assertStderrEqual(tf.read(), b"appleorange") 613 614 def test_stdout_filedes_of_stdout(self): 615 # stdout is set to 1 (#1531862). 616 # To avoid printing the text on stdout, we do something similar to 617 # test_stdout_none (see above). The parent subprocess calls the child 618 # subprocess passing stdout=1, and this test uses stdout=PIPE in 619 # order to capture and check the output of the parent. See #11963. 
620 code = ('import sys, subprocess; ' 621 'rc = subprocess.call([sys.executable, "-c", ' 622 ' "import os, sys; sys.exit(os.write(sys.stdout.fileno(), ' 623 'b\'test with stdout=1\'))"], stdout=1); ' 624 'assert rc == 18') 625 p = subprocess.Popen([sys.executable, "-c", code], 626 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 627 self.addCleanup(p.stdout.close) 628 self.addCleanup(p.stderr.close) 629 out, err = p.communicate() 630 self.assertEqual(p.returncode, 0, err) 631 self.assertEqual(out.rstrip(), b'test with stdout=1') 632 633 def test_stdout_devnull(self): 634 p = subprocess.Popen([sys.executable, "-c", 635 'for i in range(10240):' 636 'print("x" * 1024)'], 637 stdout=subprocess.DEVNULL) 638 p.wait() 639 self.assertEqual(p.stdout, None) 640 641 def test_stderr_devnull(self): 642 p = subprocess.Popen([sys.executable, "-c", 643 'import sys\n' 644 'for i in range(10240):' 645 'sys.stderr.write("x" * 1024)'], 646 stderr=subprocess.DEVNULL) 647 p.wait() 648 self.assertEqual(p.stderr, None) 649 650 def test_stdin_devnull(self): 651 p = subprocess.Popen([sys.executable, "-c", 652 'import sys;' 653 'sys.stdin.read(1)'], 654 stdin=subprocess.DEVNULL) 655 p.wait() 656 self.assertEqual(p.stdin, None) 657 658 def test_env(self): 659 newenv = os.environ.copy() 660 newenv["FRUIT"] = "orange" 661 with subprocess.Popen([sys.executable, "-c", 662 'import sys,os;' 663 'sys.stdout.write(os.getenv("FRUIT"))'], 664 stdout=subprocess.PIPE, 665 env=newenv) as p: 666 stdout, stderr = p.communicate() 667 self.assertEqual(stdout, b"orange") 668 669 # Windows requires at least the SYSTEMROOT environment variable to start 670 # Python 671 @unittest.skipIf(sys.platform == 'win32', 672 'cannot test an empty env on Windows') 673 @unittest.skipIf(sysconfig.get_config_var('Py_ENABLE_SHARED') == 1, 674 'The Python shared library cannot be loaded ' 675 'with an empty environment.') 676 def test_empty_env(self): 677 """Verify that env={} is as empty as possible.""" 678 679 def 
is_env_var_to_ignore(n): 680 """Determine if an environment variable is under our control.""" 681 # This excludes some __CF_* and VERSIONER_* keys MacOS insists 682 # on adding even when the environment in exec is empty. 683 # Gentoo sandboxes also force LD_PRELOAD and SANDBOX_* to exist. 684 return ('VERSIONER' in n or '__CF' in n or # MacOS 685 n == 'LD_PRELOAD' or n.startswith('SANDBOX') or # Gentoo 686 n == 'LC_CTYPE') # Locale coercion triggered 687 688 with subprocess.Popen([sys.executable, "-c", 689 'import os; print(list(os.environ.keys()))'], 690 stdout=subprocess.PIPE, env={}) as p: 691 stdout, stderr = p.communicate() 692 child_env_names = eval(stdout.strip()) 693 self.assertIsInstance(child_env_names, list) 694 child_env_names = [k for k in child_env_names 695 if not is_env_var_to_ignore(k)] 696 self.assertEqual(child_env_names, []) 697 698 def test_invalid_cmd(self): 699 # null character in the command name 700 cmd = sys.executable + '\0' 701 with self.assertRaises(ValueError): 702 subprocess.Popen([cmd, "-c", "pass"]) 703 704 # null character in the command argument 705 with self.assertRaises(ValueError): 706 subprocess.Popen([sys.executable, "-c", "pass#\0"]) 707 708 def test_invalid_env(self): 709 # null character in the environment variable name 710 newenv = os.environ.copy() 711 newenv["FRUIT\0VEGETABLE"] = "cabbage" 712 with self.assertRaises(ValueError): 713 subprocess.Popen(ZERO_RETURN_CMD, env=newenv) 714 715 # null character in the environment variable value 716 newenv = os.environ.copy() 717 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 718 with self.assertRaises(ValueError): 719 subprocess.Popen(ZERO_RETURN_CMD, env=newenv) 720 721 # equal character in the environment variable name 722 newenv = os.environ.copy() 723 newenv["FRUIT=ORANGE"] = "lemon" 724 with self.assertRaises(ValueError): 725 subprocess.Popen(ZERO_RETURN_CMD, env=newenv) 726 727 # equal character in the environment variable value 728 newenv = os.environ.copy() 729 
newenv["FRUIT"] = "orange=lemon" 730 with subprocess.Popen([sys.executable, "-c", 731 'import sys, os;' 732 'sys.stdout.write(os.getenv("FRUIT"))'], 733 stdout=subprocess.PIPE, 734 env=newenv) as p: 735 stdout, stderr = p.communicate() 736 self.assertEqual(stdout, b"orange=lemon") 737 738 def test_communicate_stdin(self): 739 p = subprocess.Popen([sys.executable, "-c", 740 'import sys;' 741 'sys.exit(sys.stdin.read() == "pear")'], 742 stdin=subprocess.PIPE) 743 p.communicate(b"pear") 744 self.assertEqual(p.returncode, 1) 745 746 def test_communicate_stdout(self): 747 p = subprocess.Popen([sys.executable, "-c", 748 'import sys; sys.stdout.write("pineapple")'], 749 stdout=subprocess.PIPE) 750 (stdout, stderr) = p.communicate() 751 self.assertEqual(stdout, b"pineapple") 752 self.assertEqual(stderr, None) 753 754 def test_communicate_stderr(self): 755 p = subprocess.Popen([sys.executable, "-c", 756 'import sys; sys.stderr.write("pineapple")'], 757 stderr=subprocess.PIPE) 758 (stdout, stderr) = p.communicate() 759 self.assertEqual(stdout, None) 760 self.assertStderrEqual(stderr, b"pineapple") 761 762 def test_communicate(self): 763 p = subprocess.Popen([sys.executable, "-c", 764 'import sys,os;' 765 'sys.stderr.write("pineapple");' 766 'sys.stdout.write(sys.stdin.read())'], 767 stdin=subprocess.PIPE, 768 stdout=subprocess.PIPE, 769 stderr=subprocess.PIPE) 770 self.addCleanup(p.stdout.close) 771 self.addCleanup(p.stderr.close) 772 self.addCleanup(p.stdin.close) 773 (stdout, stderr) = p.communicate(b"banana") 774 self.assertEqual(stdout, b"banana") 775 self.assertStderrEqual(stderr, b"pineapple") 776 777 def test_communicate_timeout(self): 778 p = subprocess.Popen([sys.executable, "-c", 779 'import sys,os,time;' 780 'sys.stderr.write("pineapple\\n");' 781 'time.sleep(1);' 782 'sys.stderr.write("pear\\n");' 783 'sys.stdout.write(sys.stdin.read())'], 784 universal_newlines=True, 785 stdin=subprocess.PIPE, 786 stdout=subprocess.PIPE, 787 stderr=subprocess.PIPE) 788 
self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana", 789 timeout=0.3) 790 # Make sure we can keep waiting for it, and that we get the whole output 791 # after it completes. 792 (stdout, stderr) = p.communicate() 793 self.assertEqual(stdout, "banana") 794 self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n") 795 796 def test_communicate_timeout_large_output(self): 797 # Test an expiring timeout while the child is outputting lots of data. 798 p = subprocess.Popen([sys.executable, "-c", 799 'import sys,os,time;' 800 'sys.stdout.write("a" * (64 * 1024));' 801 'time.sleep(0.2);' 802 'sys.stdout.write("a" * (64 * 1024));' 803 'time.sleep(0.2);' 804 'sys.stdout.write("a" * (64 * 1024));' 805 'time.sleep(0.2);' 806 'sys.stdout.write("a" * (64 * 1024));'], 807 stdout=subprocess.PIPE) 808 self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4) 809 (stdout, _) = p.communicate() 810 self.assertEqual(len(stdout), 4 * 64 * 1024) 811 812 # Test for the fd leak reported in http://bugs.python.org/issue2791. 
813 def test_communicate_pipe_fd_leak(self): 814 for stdin_pipe in (False, True): 815 for stdout_pipe in (False, True): 816 for stderr_pipe in (False, True): 817 options = {} 818 if stdin_pipe: 819 options['stdin'] = subprocess.PIPE 820 if stdout_pipe: 821 options['stdout'] = subprocess.PIPE 822 if stderr_pipe: 823 options['stderr'] = subprocess.PIPE 824 if not options: 825 continue 826 p = subprocess.Popen(ZERO_RETURN_CMD, **options) 827 p.communicate() 828 if p.stdin is not None: 829 self.assertTrue(p.stdin.closed) 830 if p.stdout is not None: 831 self.assertTrue(p.stdout.closed) 832 if p.stderr is not None: 833 self.assertTrue(p.stderr.closed) 834 835 def test_communicate_returns(self): 836 # communicate() should return None if no redirection is active 837 p = subprocess.Popen([sys.executable, "-c", 838 "import sys; sys.exit(47)"]) 839 (stdout, stderr) = p.communicate() 840 self.assertEqual(stdout, None) 841 self.assertEqual(stderr, None) 842 843 def test_communicate_pipe_buf(self): 844 # communicate() with writes larger than pipe_buf 845 # This test will probably deadlock rather than fail, if 846 # communicate() does not work properly. 
847 x, y = os.pipe() 848 os.close(x) 849 os.close(y) 850 p = subprocess.Popen([sys.executable, "-c", 851 'import sys,os;' 852 'sys.stdout.write(sys.stdin.read(47));' 853 'sys.stderr.write("x" * %d);' 854 'sys.stdout.write(sys.stdin.read())' % 855 support.PIPE_MAX_SIZE], 856 stdin=subprocess.PIPE, 857 stdout=subprocess.PIPE, 858 stderr=subprocess.PIPE) 859 self.addCleanup(p.stdout.close) 860 self.addCleanup(p.stderr.close) 861 self.addCleanup(p.stdin.close) 862 string_to_write = b"a" * support.PIPE_MAX_SIZE 863 (stdout, stderr) = p.communicate(string_to_write) 864 self.assertEqual(stdout, string_to_write) 865 866 def test_writes_before_communicate(self): 867 # stdin.write before communicate() 868 p = subprocess.Popen([sys.executable, "-c", 869 'import sys,os;' 870 'sys.stdout.write(sys.stdin.read())'], 871 stdin=subprocess.PIPE, 872 stdout=subprocess.PIPE, 873 stderr=subprocess.PIPE) 874 self.addCleanup(p.stdout.close) 875 self.addCleanup(p.stderr.close) 876 self.addCleanup(p.stdin.close) 877 p.stdin.write(b"banana") 878 (stdout, stderr) = p.communicate(b"split") 879 self.assertEqual(stdout, b"bananasplit") 880 self.assertStderrEqual(stderr, b"") 881 882 def test_universal_newlines_and_text(self): 883 args = [ 884 sys.executable, "-c", 885 'import sys,os;' + SETBINARY + 886 'buf = sys.stdout.buffer;' 887 'buf.write(sys.stdin.readline().encode());' 888 'buf.flush();' 889 'buf.write(b"line2\\n");' 890 'buf.flush();' 891 'buf.write(sys.stdin.read().encode());' 892 'buf.flush();' 893 'buf.write(b"line4\\n");' 894 'buf.flush();' 895 'buf.write(b"line5\\r\\n");' 896 'buf.flush();' 897 'buf.write(b"line6\\r");' 898 'buf.flush();' 899 'buf.write(b"\\nline7");' 900 'buf.flush();' 901 'buf.write(b"\\nline8");'] 902 903 for extra_kwarg in ('universal_newlines', 'text'): 904 p = subprocess.Popen(args, **{'stdin': subprocess.PIPE, 905 'stdout': subprocess.PIPE, 906 extra_kwarg: True}) 907 with p: 908 p.stdin.write("line1\n") 909 p.stdin.flush() 910 
self.assertEqual(p.stdout.readline(), "line1\n") 911 p.stdin.write("line3\n") 912 p.stdin.close() 913 self.addCleanup(p.stdout.close) 914 self.assertEqual(p.stdout.readline(), 915 "line2\n") 916 self.assertEqual(p.stdout.read(6), 917 "line3\n") 918 self.assertEqual(p.stdout.read(), 919 "line4\nline5\nline6\nline7\nline8") 920 921 def test_universal_newlines_communicate(self): 922 # universal newlines through communicate() 923 p = subprocess.Popen([sys.executable, "-c", 924 'import sys,os;' + SETBINARY + 925 'buf = sys.stdout.buffer;' 926 'buf.write(b"line2\\n");' 927 'buf.flush();' 928 'buf.write(b"line4\\n");' 929 'buf.flush();' 930 'buf.write(b"line5\\r\\n");' 931 'buf.flush();' 932 'buf.write(b"line6\\r");' 933 'buf.flush();' 934 'buf.write(b"\\nline7");' 935 'buf.flush();' 936 'buf.write(b"\\nline8");'], 937 stderr=subprocess.PIPE, 938 stdout=subprocess.PIPE, 939 universal_newlines=1) 940 self.addCleanup(p.stdout.close) 941 self.addCleanup(p.stderr.close) 942 (stdout, stderr) = p.communicate() 943 self.assertEqual(stdout, 944 "line2\nline4\nline5\nline6\nline7\nline8") 945 946 def test_universal_newlines_communicate_stdin(self): 947 # universal newlines through communicate(), with only stdin 948 p = subprocess.Popen([sys.executable, "-c", 949 'import sys,os;' + SETBINARY + textwrap.dedent(''' 950 s = sys.stdin.readline() 951 assert s == "line1\\n", repr(s) 952 s = sys.stdin.read() 953 assert s == "line3\\n", repr(s) 954 ''')], 955 stdin=subprocess.PIPE, 956 universal_newlines=1) 957 (stdout, stderr) = p.communicate("line1\nline3\n") 958 self.assertEqual(p.returncode, 0) 959 960 def test_universal_newlines_communicate_input_none(self): 961 # Test communicate(input=None) with universal newlines. 962 # 963 # We set stdout to PIPE because, as of this writing, a different 964 # code path is tested when the number of pipes is zero or one. 
965 p = subprocess.Popen(ZERO_RETURN_CMD, 966 stdin=subprocess.PIPE, 967 stdout=subprocess.PIPE, 968 universal_newlines=True) 969 p.communicate() 970 self.assertEqual(p.returncode, 0) 971 972 def test_universal_newlines_communicate_stdin_stdout_stderr(self): 973 # universal newlines through communicate(), with stdin, stdout, stderr 974 p = subprocess.Popen([sys.executable, "-c", 975 'import sys,os;' + SETBINARY + textwrap.dedent(''' 976 s = sys.stdin.buffer.readline() 977 sys.stdout.buffer.write(s) 978 sys.stdout.buffer.write(b"line2\\r") 979 sys.stderr.buffer.write(b"eline2\\n") 980 s = sys.stdin.buffer.read() 981 sys.stdout.buffer.write(s) 982 sys.stdout.buffer.write(b"line4\\n") 983 sys.stdout.buffer.write(b"line5\\r\\n") 984 sys.stderr.buffer.write(b"eline6\\r") 985 sys.stderr.buffer.write(b"eline7\\r\\nz") 986 ''')], 987 stdin=subprocess.PIPE, 988 stderr=subprocess.PIPE, 989 stdout=subprocess.PIPE, 990 universal_newlines=True) 991 self.addCleanup(p.stdout.close) 992 self.addCleanup(p.stderr.close) 993 (stdout, stderr) = p.communicate("line1\nline3\n") 994 self.assertEqual(p.returncode, 0) 995 self.assertEqual("line1\nline2\nline3\nline4\nline5\n", stdout) 996 # Python debug build push something like "[42442 refs]\n" 997 # to stderr at exit of subprocess. 998 # Don't use assertStderrEqual because it strips CR and LF from output. 999 self.assertTrue(stderr.startswith("eline2\neline6\neline7\n")) 1000 1001 def test_universal_newlines_communicate_encodings(self): 1002 # Check that universal newlines mode works for various encodings, 1003 # in particular for encodings in the UTF-16 and UTF-32 families. 1004 # See issue #15595. 1005 # 1006 # UTF-16 and UTF-32-BE are sufficient to check both with BOM and 1007 # without, and UTF-16 and UTF-32. 
1008 for encoding in ['utf-16', 'utf-32-be']: 1009 code = ("import sys; " 1010 r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" % 1011 encoding) 1012 args = [sys.executable, '-c', code] 1013 # We set stdin to be non-None because, as of this writing, 1014 # a different code path is used when the number of pipes is 1015 # zero or one. 1016 popen = subprocess.Popen(args, 1017 stdin=subprocess.PIPE, 1018 stdout=subprocess.PIPE, 1019 encoding=encoding) 1020 stdout, stderr = popen.communicate(input='') 1021 self.assertEqual(stdout, '1\n2\n3\n4') 1022 1023 def test_communicate_errors(self): 1024 for errors, expected in [ 1025 ('ignore', ''), 1026 ('replace', '\ufffd\ufffd'), 1027 ('surrogateescape', '\udc80\udc80'), 1028 ('backslashreplace', '\\x80\\x80'), 1029 ]: 1030 code = ("import sys; " 1031 r"sys.stdout.buffer.write(b'[\x80\x80]')") 1032 args = [sys.executable, '-c', code] 1033 # We set stdin to be non-None because, as of this writing, 1034 # a different code path is used when the number of pipes is 1035 # zero or one. 
1036 popen = subprocess.Popen(args, 1037 stdin=subprocess.PIPE, 1038 stdout=subprocess.PIPE, 1039 encoding='utf-8', 1040 errors=errors) 1041 stdout, stderr = popen.communicate(input='') 1042 self.assertEqual(stdout, '[{}]'.format(expected)) 1043 1044 def test_no_leaking(self): 1045 # Make sure we leak no resources 1046 if not mswindows: 1047 max_handles = 1026 # too much for most UNIX systems 1048 else: 1049 max_handles = 2050 # too much for (at least some) Windows setups 1050 handles = [] 1051 tmpdir = tempfile.mkdtemp() 1052 try: 1053 for i in range(max_handles): 1054 try: 1055 tmpfile = os.path.join(tmpdir, support.TESTFN) 1056 handles.append(os.open(tmpfile, os.O_WRONLY|os.O_CREAT)) 1057 except OSError as e: 1058 if e.errno != errno.EMFILE: 1059 raise 1060 break 1061 else: 1062 self.skipTest("failed to reach the file descriptor limit " 1063 "(tried %d)" % max_handles) 1064 # Close a couple of them (should be enough for a subprocess) 1065 for i in range(10): 1066 os.close(handles.pop()) 1067 # Loop creating some subprocesses. If one of them leaks some fds, 1068 # the next loop iteration will fail by reaching the max fd limit. 
1069 for i in range(15): 1070 p = subprocess.Popen([sys.executable, "-c", 1071 "import sys;" 1072 "sys.stdout.write(sys.stdin.read())"], 1073 stdin=subprocess.PIPE, 1074 stdout=subprocess.PIPE, 1075 stderr=subprocess.PIPE) 1076 data = p.communicate(b"lime")[0] 1077 self.assertEqual(data, b"lime") 1078 finally: 1079 for h in handles: 1080 os.close(h) 1081 shutil.rmtree(tmpdir) 1082 1083 def test_list2cmdline(self): 1084 self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']), 1085 '"a b c" d e') 1086 self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']), 1087 'ab\\"c \\ d') 1088 self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']), 1089 'ab\\"c " \\\\" d') 1090 self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']), 1091 'a\\\\\\b "de fg" h') 1092 self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']), 1093 'a\\\\\\"b c d') 1094 self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']), 1095 '"a\\\\b c" d e') 1096 self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']), 1097 '"a\\\\b\\ c" d e') 1098 self.assertEqual(subprocess.list2cmdline(['ab', '']), 1099 'ab ""') 1100 1101 def test_poll(self): 1102 p = subprocess.Popen([sys.executable, "-c", 1103 "import os; os.read(0, 1)"], 1104 stdin=subprocess.PIPE) 1105 self.addCleanup(p.stdin.close) 1106 self.assertIsNone(p.poll()) 1107 os.write(p.stdin.fileno(), b'A') 1108 p.wait() 1109 # Subsequent invocations should just return the returncode 1110 self.assertEqual(p.poll(), 0) 1111 1112 def test_wait(self): 1113 p = subprocess.Popen(ZERO_RETURN_CMD) 1114 self.assertEqual(p.wait(), 0) 1115 # Subsequent invocations should just return the returncode 1116 self.assertEqual(p.wait(), 0) 1117 1118 def test_wait_timeout(self): 1119 p = subprocess.Popen([sys.executable, 1120 "-c", "import time; time.sleep(0.3)"]) 1121 with self.assertRaises(subprocess.TimeoutExpired) as c: 1122 p.wait(timeout=0.0001) 1123 self.assertIn("0.0001", str(c.exception)) # For coverage of 
__str__. 1124 # Some heavily loaded buildbots (sparc Debian 3.x) require this much 1125 # time to start. 1126 self.assertEqual(p.wait(timeout=3), 0) 1127 1128 def test_invalid_bufsize(self): 1129 # an invalid type of the bufsize argument should raise 1130 # TypeError. 1131 with self.assertRaises(TypeError): 1132 subprocess.Popen(ZERO_RETURN_CMD, "orange") 1133 1134 def test_bufsize_is_none(self): 1135 # bufsize=None should be the same as bufsize=0. 1136 p = subprocess.Popen(ZERO_RETURN_CMD, None) 1137 self.assertEqual(p.wait(), 0) 1138 # Again with keyword arg 1139 p = subprocess.Popen(ZERO_RETURN_CMD, bufsize=None) 1140 self.assertEqual(p.wait(), 0) 1141 1142 def _test_bufsize_equal_one(self, line, expected, universal_newlines): 1143 # subprocess may deadlock with bufsize=1, see issue #21332 1144 with subprocess.Popen([sys.executable, "-c", "import sys;" 1145 "sys.stdout.write(sys.stdin.readline());" 1146 "sys.stdout.flush()"], 1147 stdin=subprocess.PIPE, 1148 stdout=subprocess.PIPE, 1149 stderr=subprocess.DEVNULL, 1150 bufsize=1, 1151 universal_newlines=universal_newlines) as p: 1152 p.stdin.write(line) # expect that it flushes the line in text mode 1153 os.close(p.stdin.fileno()) # close it without flushing the buffer 1154 read_line = p.stdout.readline() 1155 with support.SuppressCrashReport(): 1156 try: 1157 p.stdin.close() 1158 except OSError: 1159 pass 1160 p.stdin = None 1161 self.assertEqual(p.returncode, 0) 1162 self.assertEqual(read_line, expected) 1163 1164 def test_bufsize_equal_one_text_mode(self): 1165 # line is flushed in text mode with bufsize=1. 1166 # we should get the full line in return 1167 line = "line\n" 1168 self._test_bufsize_equal_one(line, line, universal_newlines=True) 1169 1170 def test_bufsize_equal_one_binary_mode(self): 1171 # line is not flushed in binary mode with bufsize=1. 
1172 # we should get empty response 1173 line = b'line' + os.linesep.encode() # assume ascii-based locale 1174 with self.assertWarnsRegex(RuntimeWarning, 'line buffering'): 1175 self._test_bufsize_equal_one(line, b'', universal_newlines=False) 1176 1177 def test_leaking_fds_on_error(self): 1178 # see bug #5179: Popen leaks file descriptors to PIPEs if 1179 # the child fails to execute; this will eventually exhaust 1180 # the maximum number of open fds. 1024 seems a very common 1181 # value for that limit, but Windows has 2048, so we loop 1182 # 1024 times (each call leaked two fds). 1183 for i in range(1024): 1184 with self.assertRaises(NONEXISTING_ERRORS): 1185 subprocess.Popen(NONEXISTING_CMD, 1186 stdout=subprocess.PIPE, 1187 stderr=subprocess.PIPE) 1188 1189 def test_nonexisting_with_pipes(self): 1190 # bpo-30121: Popen with pipes must close properly pipes on error. 1191 # Previously, os.close() was called with a Windows handle which is not 1192 # a valid file descriptor. 1193 # 1194 # Run the test in a subprocess to control how the CRT reports errors 1195 # and to get stderr content. 
1196 try: 1197 import msvcrt 1198 msvcrt.CrtSetReportMode 1199 except (AttributeError, ImportError): 1200 self.skipTest("need msvcrt.CrtSetReportMode") 1201 1202 code = textwrap.dedent(f""" 1203 import msvcrt 1204 import subprocess 1205 1206 cmd = {NONEXISTING_CMD!r} 1207 1208 for report_type in [msvcrt.CRT_WARN, 1209 msvcrt.CRT_ERROR, 1210 msvcrt.CRT_ASSERT]: 1211 msvcrt.CrtSetReportMode(report_type, msvcrt.CRTDBG_MODE_FILE) 1212 msvcrt.CrtSetReportFile(report_type, msvcrt.CRTDBG_FILE_STDERR) 1213 1214 try: 1215 subprocess.Popen(cmd, 1216 stdout=subprocess.PIPE, 1217 stderr=subprocess.PIPE) 1218 except OSError: 1219 pass 1220 """) 1221 cmd = [sys.executable, "-c", code] 1222 proc = subprocess.Popen(cmd, 1223 stderr=subprocess.PIPE, 1224 universal_newlines=True) 1225 with proc: 1226 stderr = proc.communicate()[1] 1227 self.assertEqual(stderr, "") 1228 self.assertEqual(proc.returncode, 0) 1229 1230 def test_double_close_on_error(self): 1231 # Issue #18851 1232 fds = [] 1233 def open_fds(): 1234 for i in range(20): 1235 fds.extend(os.pipe()) 1236 time.sleep(0.001) 1237 t = threading.Thread(target=open_fds) 1238 t.start() 1239 try: 1240 with self.assertRaises(EnvironmentError): 1241 subprocess.Popen(NONEXISTING_CMD, 1242 stdin=subprocess.PIPE, 1243 stdout=subprocess.PIPE, 1244 stderr=subprocess.PIPE) 1245 finally: 1246 t.join() 1247 exc = None 1248 for fd in fds: 1249 # If a double close occurred, some of those fds will 1250 # already have been closed by mistake, and os.close() 1251 # here will raise. 
1252 try: 1253 os.close(fd) 1254 except OSError as e: 1255 exc = e 1256 if exc is not None: 1257 raise exc 1258 1259 def test_threadsafe_wait(self): 1260 """Issue21291: Popen.wait() needs to be threadsafe for returncode.""" 1261 proc = subprocess.Popen([sys.executable, '-c', 1262 'import time; time.sleep(12)']) 1263 self.assertEqual(proc.returncode, None) 1264 results = [] 1265 1266 def kill_proc_timer_thread(): 1267 results.append(('thread-start-poll-result', proc.poll())) 1268 # terminate it from the thread and wait for the result. 1269 proc.kill() 1270 proc.wait() 1271 results.append(('thread-after-kill-and-wait', proc.returncode)) 1272 # this wait should be a no-op given the above. 1273 proc.wait() 1274 results.append(('thread-after-second-wait', proc.returncode)) 1275 1276 # This is a timing sensitive test, the failure mode is 1277 # triggered when both the main thread and this thread are in 1278 # the wait() call at once. The delay here is to allow the 1279 # main thread to most likely be blocked in its wait() call. 1280 t = threading.Timer(0.2, kill_proc_timer_thread) 1281 t.start() 1282 1283 if mswindows: 1284 expected_errorcode = 1 1285 else: 1286 # Should be -9 because of the proc.kill() from the thread. 1287 expected_errorcode = -9 1288 1289 # Wait for the process to finish; the thread should kill it 1290 # long before it finishes on its own. Supplying a timeout 1291 # triggers a different code path for better coverage. 1292 proc.wait(timeout=20) 1293 self.assertEqual(proc.returncode, expected_errorcode, 1294 msg="unexpected result in wait from main thread") 1295 1296 # This should be a no-op with no change in returncode. 1297 proc.wait() 1298 self.assertEqual(proc.returncode, expected_errorcode, 1299 msg="unexpected result in second main wait.") 1300 1301 t.join() 1302 # Ensure that all of the thread results are as expected. 
1303 # When a race condition occurs in wait(), the returncode could 1304 # be set by the wrong thread that doesn't actually have it 1305 # leading to an incorrect value. 1306 self.assertEqual([('thread-start-poll-result', None), 1307 ('thread-after-kill-and-wait', expected_errorcode), 1308 ('thread-after-second-wait', expected_errorcode)], 1309 results) 1310 1311 def test_issue8780(self): 1312 # Ensure that stdout is inherited from the parent 1313 # if stdout=PIPE is not used 1314 code = ';'.join(( 1315 'import subprocess, sys', 1316 'retcode = subprocess.call(' 1317 "[sys.executable, '-c', 'print(\"Hello World!\")'])", 1318 'assert retcode == 0')) 1319 output = subprocess.check_output([sys.executable, '-c', code]) 1320 self.assertTrue(output.startswith(b'Hello World!'), ascii(output)) 1321 1322 def test_handles_closed_on_exception(self): 1323 # If CreateProcess exits with an error, ensure the 1324 # duplicate output handles are released 1325 ifhandle, ifname = tempfile.mkstemp() 1326 ofhandle, ofname = tempfile.mkstemp() 1327 efhandle, efname = tempfile.mkstemp() 1328 try: 1329 subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle, 1330 stderr=efhandle) 1331 except OSError: 1332 os.close(ifhandle) 1333 os.remove(ifname) 1334 os.close(ofhandle) 1335 os.remove(ofname) 1336 os.close(efhandle) 1337 os.remove(efname) 1338 self.assertFalse(os.path.exists(ifname)) 1339 self.assertFalse(os.path.exists(ofname)) 1340 self.assertFalse(os.path.exists(efname)) 1341 1342 def test_communicate_epipe(self): 1343 # Issue 10963: communicate() should hide EPIPE 1344 p = subprocess.Popen(ZERO_RETURN_CMD, 1345 stdin=subprocess.PIPE, 1346 stdout=subprocess.PIPE, 1347 stderr=subprocess.PIPE) 1348 self.addCleanup(p.stdout.close) 1349 self.addCleanup(p.stderr.close) 1350 self.addCleanup(p.stdin.close) 1351 p.communicate(b"x" * 2**20) 1352 1353 def test_communicate_epipe_only_stdin(self): 1354 # Issue 10963: communicate() should hide EPIPE 1355 p = subprocess.Popen(ZERO_RETURN_CMD, 1356 
stdin=subprocess.PIPE) 1357 self.addCleanup(p.stdin.close) 1358 p.wait() 1359 p.communicate(b"x" * 2**20) 1360 1361 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 1362 "Requires signal.SIGUSR1") 1363 @unittest.skipUnless(hasattr(os, 'kill'), 1364 "Requires os.kill") 1365 @unittest.skipUnless(hasattr(os, 'getppid'), 1366 "Requires os.getppid") 1367 def test_communicate_eintr(self): 1368 # Issue #12493: communicate() should handle EINTR 1369 def handler(signum, frame): 1370 pass 1371 old_handler = signal.signal(signal.SIGUSR1, handler) 1372 self.addCleanup(signal.signal, signal.SIGUSR1, old_handler) 1373 1374 args = [sys.executable, "-c", 1375 'import os, signal;' 1376 'os.kill(os.getppid(), signal.SIGUSR1)'] 1377 for stream in ('stdout', 'stderr'): 1378 kw = {stream: subprocess.PIPE} 1379 with subprocess.Popen(args, **kw) as process: 1380 # communicate() will be interrupted by SIGUSR1 1381 process.communicate() 1382 1383 1384 # This test is Linux-ish specific for simplicity to at least have 1385 # some coverage. It is not a platform specific bug. 1386 @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), 1387 "Linux specific") 1388 def test_failed_child_execute_fd_leak(self): 1389 """Test for the fork() failure fd leak reported in issue16327.""" 1390 fd_directory = '/proc/%d/fd' % os.getpid() 1391 fds_before_popen = os.listdir(fd_directory) 1392 with self.assertRaises(PopenTestException): 1393 PopenExecuteChildRaises( 1394 ZERO_RETURN_CMD, stdin=subprocess.PIPE, 1395 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1396 1397 # NOTE: This test doesn't verify that the real _execute_child 1398 # does not close the file descriptors itself on the way out 1399 # during an exception. Code inspection has confirmed that. 
1400 1401 fds_after_exception = os.listdir(fd_directory) 1402 self.assertEqual(fds_before_popen, fds_after_exception) 1403 1404 @unittest.skipIf(mswindows, "behavior currently not supported on Windows") 1405 def test_file_not_found_includes_filename(self): 1406 with self.assertRaises(FileNotFoundError) as c: 1407 subprocess.call(['/opt/nonexistent_binary', 'with', 'some', 'args']) 1408 self.assertEqual(c.exception.filename, '/opt/nonexistent_binary') 1409 1410 @unittest.skipIf(mswindows, "behavior currently not supported on Windows") 1411 def test_file_not_found_with_bad_cwd(self): 1412 with self.assertRaises(FileNotFoundError) as c: 1413 subprocess.Popen(['exit', '0'], cwd='/some/nonexistent/directory') 1414 self.assertEqual(c.exception.filename, '/some/nonexistent/directory') 1415 1416 1417class RunFuncTestCase(BaseTestCase): 1418 def run_python(self, code, **kwargs): 1419 """Run Python code in a subprocess using subprocess.run""" 1420 argv = [sys.executable, "-c", code] 1421 return subprocess.run(argv, **kwargs) 1422 1423 def test_returncode(self): 1424 # call() function with sequence argument 1425 cp = self.run_python("import sys; sys.exit(47)") 1426 self.assertEqual(cp.returncode, 47) 1427 with self.assertRaises(subprocess.CalledProcessError): 1428 cp.check_returncode() 1429 1430 def test_check(self): 1431 with self.assertRaises(subprocess.CalledProcessError) as c: 1432 self.run_python("import sys; sys.exit(47)", check=True) 1433 self.assertEqual(c.exception.returncode, 47) 1434 1435 def test_check_zero(self): 1436 # check_returncode shouldn't raise when returncode is zero 1437 cp = subprocess.run(ZERO_RETURN_CMD, check=True) 1438 self.assertEqual(cp.returncode, 0) 1439 1440 def test_timeout(self): 1441 # run() function with timeout argument; we want to test that the child 1442 # process gets killed when the timeout expires. If the child isn't 1443 # killed, this call will deadlock since subprocess.run waits for the 1444 # child. 
1445 with self.assertRaises(subprocess.TimeoutExpired): 1446 self.run_python("while True: pass", timeout=0.0001) 1447 1448 def test_capture_stdout(self): 1449 # capture stdout with zero return code 1450 cp = self.run_python("print('BDFL')", stdout=subprocess.PIPE) 1451 self.assertIn(b'BDFL', cp.stdout) 1452 1453 def test_capture_stderr(self): 1454 cp = self.run_python("import sys; sys.stderr.write('BDFL')", 1455 stderr=subprocess.PIPE) 1456 self.assertIn(b'BDFL', cp.stderr) 1457 1458 def test_check_output_stdin_arg(self): 1459 # run() can be called with stdin set to a file 1460 tf = tempfile.TemporaryFile() 1461 self.addCleanup(tf.close) 1462 tf.write(b'pear') 1463 tf.seek(0) 1464 cp = self.run_python( 1465 "import sys; sys.stdout.write(sys.stdin.read().upper())", 1466 stdin=tf, stdout=subprocess.PIPE) 1467 self.assertIn(b'PEAR', cp.stdout) 1468 1469 def test_check_output_input_arg(self): 1470 # check_output() can be called with input set to a string 1471 cp = self.run_python( 1472 "import sys; sys.stdout.write(sys.stdin.read().upper())", 1473 input=b'pear', stdout=subprocess.PIPE) 1474 self.assertIn(b'PEAR', cp.stdout) 1475 1476 def test_check_output_stdin_with_input_arg(self): 1477 # run() refuses to accept 'stdin' with 'input' 1478 tf = tempfile.TemporaryFile() 1479 self.addCleanup(tf.close) 1480 tf.write(b'pear') 1481 tf.seek(0) 1482 with self.assertRaises(ValueError, 1483 msg="Expected ValueError when stdin and input args supplied.") as c: 1484 output = self.run_python("print('will not be run')", 1485 stdin=tf, input=b'hare') 1486 self.assertIn('stdin', c.exception.args[0]) 1487 self.assertIn('input', c.exception.args[0]) 1488 1489 def test_check_output_timeout(self): 1490 with self.assertRaises(subprocess.TimeoutExpired) as c: 1491 cp = self.run_python(( 1492 "import sys, time\n" 1493 "sys.stdout.write('BDFL')\n" 1494 "sys.stdout.flush()\n" 1495 "time.sleep(3600)"), 1496 # Some heavily loaded buildbots (sparc Debian 3.x) require 1497 # this much time to start 
and print. 1498 timeout=3, stdout=subprocess.PIPE) 1499 self.assertEqual(c.exception.output, b'BDFL') 1500 # output is aliased to stdout 1501 self.assertEqual(c.exception.stdout, b'BDFL') 1502 1503 def test_run_kwargs(self): 1504 newenv = os.environ.copy() 1505 newenv["FRUIT"] = "banana" 1506 cp = self.run_python(('import sys, os;' 1507 'sys.exit(33 if os.getenv("FRUIT")=="banana" else 31)'), 1508 env=newenv) 1509 self.assertEqual(cp.returncode, 33) 1510 1511 def test_run_with_pathlike_path(self): 1512 # bpo-31961: test run(pathlike_object) 1513 # the name of a command that can be run without 1514 # any argumenets that exit fast 1515 prog = 'tree.com' if mswindows else 'ls' 1516 path = shutil.which(prog) 1517 if path is None: 1518 self.skipTest(f'{prog} required for this test') 1519 path = FakePath(path) 1520 res = subprocess.run(path, stdout=subprocess.DEVNULL) 1521 self.assertEqual(res.returncode, 0) 1522 with self.assertRaises(TypeError): 1523 subprocess.run(path, stdout=subprocess.DEVNULL, shell=True) 1524 1525 def test_run_with_bytes_path_and_arguments(self): 1526 # bpo-31961: test run([bytes_object, b'additional arguments']) 1527 path = os.fsencode(sys.executable) 1528 args = [path, '-c', b'import sys; sys.exit(57)'] 1529 res = subprocess.run(args) 1530 self.assertEqual(res.returncode, 57) 1531 1532 def test_run_with_pathlike_path_and_arguments(self): 1533 # bpo-31961: test run([pathlike_object, 'additional arguments']) 1534 path = FakePath(sys.executable) 1535 args = [path, '-c', 'import sys; sys.exit(57)'] 1536 res = subprocess.run(args) 1537 self.assertEqual(res.returncode, 57) 1538 1539 def test_capture_output(self): 1540 cp = self.run_python(("import sys;" 1541 "sys.stdout.write('BDFL'); " 1542 "sys.stderr.write('FLUFL')"), 1543 capture_output=True) 1544 self.assertIn(b'BDFL', cp.stdout) 1545 self.assertIn(b'FLUFL', cp.stderr) 1546 1547 def test_stdout_with_capture_output_arg(self): 1548 # run() refuses to accept 'stdout' with 'capture_output' 1549 tf = 
tempfile.TemporaryFile() 1550 self.addCleanup(tf.close) 1551 with self.assertRaises(ValueError, 1552 msg=("Expected ValueError when stdout and capture_output " 1553 "args supplied.")) as c: 1554 output = self.run_python("print('will not be run')", 1555 capture_output=True, stdout=tf) 1556 self.assertIn('stdout', c.exception.args[0]) 1557 self.assertIn('capture_output', c.exception.args[0]) 1558 1559 def test_stderr_with_capture_output_arg(self): 1560 # run() refuses to accept 'stderr' with 'capture_output' 1561 tf = tempfile.TemporaryFile() 1562 self.addCleanup(tf.close) 1563 with self.assertRaises(ValueError, 1564 msg=("Expected ValueError when stderr and capture_output " 1565 "args supplied.")) as c: 1566 output = self.run_python("print('will not be run')", 1567 capture_output=True, stderr=tf) 1568 self.assertIn('stderr', c.exception.args[0]) 1569 self.assertIn('capture_output', c.exception.args[0]) 1570 1571 # This test _might_ wind up a bit fragile on loaded build+test machines 1572 # as it depends on the timing with wide enough margins for normal situations 1573 # but does assert that it happened "soon enough" to believe the right thing 1574 # happened. 1575 @unittest.skipIf(mswindows, "requires posix like 'sleep' shell command") 1576 def test_run_with_shell_timeout_and_capture_output(self): 1577 """Output capturing after a timeout mustn't hang forever on open filehandles.""" 1578 before_secs = time.monotonic() 1579 try: 1580 subprocess.run('sleep 3', shell=True, timeout=0.1, 1581 capture_output=True) # New session unspecified. 1582 except subprocess.TimeoutExpired as exc: 1583 after_secs = time.monotonic() 1584 stacks = traceback.format_exc() # assertRaises doesn't give this. 1585 else: 1586 self.fail("TimeoutExpired not raised.") 1587 self.assertLess(after_secs - before_secs, 1.5, 1588 msg="TimeoutExpired was delayed! 
Bad traceback:\n```\n" 1589 f"{stacks}```") 1590 1591 1592@unittest.skipIf(mswindows, "POSIX specific tests") 1593class POSIXProcessTestCase(BaseTestCase): 1594 1595 def setUp(self): 1596 super().setUp() 1597 self._nonexistent_dir = "/_this/pa.th/does/not/exist" 1598 1599 def _get_chdir_exception(self): 1600 try: 1601 os.chdir(self._nonexistent_dir) 1602 except OSError as e: 1603 # This avoids hard coding the errno value or the OS perror() 1604 # string and instead capture the exception that we want to see 1605 # below for comparison. 1606 desired_exception = e 1607 else: 1608 self.fail("chdir to nonexistent directory %s succeeded." % 1609 self._nonexistent_dir) 1610 return desired_exception 1611 1612 def test_exception_cwd(self): 1613 """Test error in the child raised in the parent for a bad cwd.""" 1614 desired_exception = self._get_chdir_exception() 1615 try: 1616 p = subprocess.Popen([sys.executable, "-c", ""], 1617 cwd=self._nonexistent_dir) 1618 except OSError as e: 1619 # Test that the child process chdir failure actually makes 1620 # it up to the parent process as the correct exception. 1621 self.assertEqual(desired_exception.errno, e.errno) 1622 self.assertEqual(desired_exception.strerror, e.strerror) 1623 self.assertEqual(desired_exception.filename, e.filename) 1624 else: 1625 self.fail("Expected OSError: %s" % desired_exception) 1626 1627 def test_exception_bad_executable(self): 1628 """Test error in the child raised in the parent for a bad executable.""" 1629 desired_exception = self._get_chdir_exception() 1630 try: 1631 p = subprocess.Popen([sys.executable, "-c", ""], 1632 executable=self._nonexistent_dir) 1633 except OSError as e: 1634 # Test that the child process exec failure actually makes 1635 # it up to the parent process as the correct exception. 
1636 self.assertEqual(desired_exception.errno, e.errno) 1637 self.assertEqual(desired_exception.strerror, e.strerror) 1638 self.assertEqual(desired_exception.filename, e.filename) 1639 else: 1640 self.fail("Expected OSError: %s" % desired_exception) 1641 1642 def test_exception_bad_args_0(self): 1643 """Test error in the child raised in the parent for a bad args[0].""" 1644 desired_exception = self._get_chdir_exception() 1645 try: 1646 p = subprocess.Popen([self._nonexistent_dir, "-c", ""]) 1647 except OSError as e: 1648 # Test that the child process exec failure actually makes 1649 # it up to the parent process as the correct exception. 1650 self.assertEqual(desired_exception.errno, e.errno) 1651 self.assertEqual(desired_exception.strerror, e.strerror) 1652 self.assertEqual(desired_exception.filename, e.filename) 1653 else: 1654 self.fail("Expected OSError: %s" % desired_exception) 1655 1656 # We mock the __del__ method for Popen in the next two tests 1657 # because it does cleanup based on the pid returned by fork_exec 1658 # along with issuing a resource warning if it still exists. Since 1659 # we don't actually spawn a process in these tests we can forego 1660 # the destructor. 
# An alternative would be to set _child_created to
# False before the destructor is called but there is no easy way
# to do that
class PopenNoDestructor(subprocess.Popen):
    """Popen subclass whose destructor does nothing.

    The mocked ``fork_exec`` tests below instantiate it only to observe
    the exception raised while the instance is being constructed.
    """
    def __del__(self):
        pass

@mock.patch("subprocess._posixsubprocess.fork_exec")
def test_exception_errpipe_normal(self, fork_exec):
    """Test error passing done through errpipe_write in the good case"""
    def proper_error(*args):
        # The fake fork_exec() reports the failure itself through the
        # error pipe it receives as positional argument 13.
        errpipe_write = args[13]
        # Write the hex for the error code EISDIR: 'is a directory'
        err_code = '{:x}'.format(errno.EISDIR).encode()
        os.write(errpipe_write, b"OSError:" + err_code + b":")
        return 0

    fork_exec.side_effect = proper_error

    # waitpid() is patched as well so that it raises ChildProcessError
    # instead of being called for the pid returned by the fake.
    with mock.patch("subprocess.os.waitpid",
                    side_effect=ChildProcessError):
        with self.assertRaises(IsADirectoryError):
            self.PopenNoDestructor(["non_existent_command"])

@mock.patch("subprocess._posixsubprocess.fork_exec")
def test_exception_errpipe_bad_data(self, fork_exec):
    """Test error passing done through errpipe_write where it is not
       in the expected format"""
    error_data = b"\xFF\x00\xDE\xAD"
    def bad_error(*args):
        errpipe_write = args[13]
        # Anything can be in the pipe, no assumptions should
        # be made about its encoding, so we'll write some
        # arbitrary hex bytes to test it out
        os.write(errpipe_write, error_data)
        return 0

    fork_exec.side_effect = bad_error

    with mock.patch("subprocess.os.waitpid",
                    side_effect=ChildProcessError):
        with self.assertRaises(subprocess.SubprocessError) as e:
            self.PopenNoDestructor(["non_existent_command"])

    # The raw bytes must show up (as their repr) in the error message.
    self.assertIn(repr(error_data), str(e.exception))

@unittest.skipIf(not os.path.exists('/proc/self/status'),
                 "need /proc/self/status")
def test_restore_signals(self):
    # Compare the SigIgn line of /proc/self/status as seen by a child
    # started with restore_signals=False and one started with
    # restore_signals=True; the two lines are expected to differ.
    # Blindly assume that cat exists on systems with /proc/self/status...
    default_proc_status = subprocess.check_output(
            ['cat', '/proc/self/status'],
            restore_signals=False)
    for line in default_proc_status.splitlines():
        if line.startswith(b'SigIgn'):
            default_sig_ign_mask = line
            break
    else:
        self.skipTest("SigIgn not found in /proc/self/status.")
    restored_proc_status = subprocess.check_output(
            ['cat', '/proc/self/status'],
            restore_signals=True)
    for line in restored_proc_status.splitlines():
        if line.startswith(b'SigIgn'):
            restored_sig_ign_mask = line
            break
    else:
        # Without this branch a missing line would surface as a NameError
        # on restored_sig_ign_mask below instead of a readable failure.
        self.fail("SigIgn not found in /proc/self/status of the "
                  "restore_signals=True child.")
    self.assertNotEqual(default_sig_ign_mask, restored_sig_ign_mask,
                        msg="restore_signals=True should've unblocked "
                        "SIGPIPE and friends.")

def test_start_new_session(self):
    # For code coverage of calling setsid().  We don't care if we get an
    # EPERM error from it depending on the test execution environment, that
    # still indicates that it was called.
    try:
        output = subprocess.check_output(
                [sys.executable, "-c", "import os; print(os.getsid(0))"],
                start_new_session=True)
    except OSError as e:
        if e.errno != errno.EPERM:
            raise
    else:
        parent_sid = os.getsid(0)
        child_sid = int(output)
        self.assertNotEqual(parent_sid, child_sid)

def test_run_abort(self):
    # returncode handles signal termination
    with support.SuppressCrashReport():
        p = subprocess.Popen([sys.executable, "-c",
                              'import os; os.abort()'])
        p.wait()
    self.assertEqual(-p.returncode, signal.SIGABRT)

def test_CalledProcessError_str_signal(self):
    err = subprocess.CalledProcessError(-int(signal.SIGABRT), "fake cmd")
    error_string = str(err)
    # We're relying on the repr() of the signal.Signals intenum to provide
    # the word signal, the signal name and the numeric value.
    self.assertIn("signal", error_string.lower())
    # We're not being specific about the signal name as some signals have
    # multiple names and which name is revealed can vary.
    self.assertIn("SIG", error_string)
    self.assertIn(str(signal.SIGABRT), error_string)

def test_CalledProcessError_str_unknown_signal(self):
    # A negative return code that matches no known signal number.
    err = subprocess.CalledProcessError(-9876543, "fake cmd")
    error_string = str(err)
    self.assertIn("unknown signal 9876543.", error_string)

def test_CalledProcessError_str_non_zero(self):
    # A positive return code is reported as a plain exit status.
    err = subprocess.CalledProcessError(2, "fake cmd")
    error_string = str(err)
    self.assertIn("non-zero exit status 2.", error_string)

def test_preexec(self):
    # DISCLAIMER: Setting environment variables is *not* a good use
    # of a preexec_fn.  This is merely a test.
    p = subprocess.Popen([sys.executable, "-c",
                          'import sys,os;'
                          'sys.stdout.write(os.getenv("FRUIT"))'],
                         stdout=subprocess.PIPE,
                         preexec_fn=lambda: os.putenv("FRUIT", "apple"))
    with p:
        self.assertEqual(p.stdout.read(), b"apple")

def test_preexec_exception(self):
    # An exception raised by preexec_fn must reach the parent, either as
    # the original ValueError or as a SubprocessError.
    def raise_it():
        raise ValueError("What if two swallows carried a coconut?")
    try:
        p = subprocess.Popen([sys.executable, "-c", ""],
                             preexec_fn=raise_it)
    except subprocess.SubprocessError:
        self.assertTrue(
                subprocess._posixsubprocess,
                "Expected a ValueError from the preexec_fn")
    except ValueError as e:
        self.assertIn("coconut", e.args[0])
    else:
        # Reap the unexpectedly started child before reporting the failure.
        p.wait()
        self.fail("Exception raised by preexec_fn did not make it "
                  "to the parent process.")

class _TestExecuteChildPopen(subprocess.Popen):
    """Used to test behavior at the end of _execute_child."""
    def __init__(self, testcase, *args, **kwargs):
        # Keep the TestCase so _execute_child() can make assertions on it.
        self._testcase = testcase
        subprocess.Popen.__init__(self, *args, **kwargs)

    def _execute_child(self, *args, **kwargs):
        try:
            subprocess.Popen._execute_child(self, *args, **kwargs)
        finally:
            # Open a bunch of file descriptors and verify that
            # none of them are the same as the ones the Popen
            # instance is using for stdin/stdout/stderr.
            devzero_fds = [os.open("/dev/zero", os.O_RDONLY)
                           for _ in range(8)]
            try:
                for fd in devzero_fds:
                    self._testcase.assertNotIn(
                            fd, (self.stdin.fileno(), self.stdout.fileno(),
                                 self.stderr.fileno()),
                            msg="At least one fd was closed early.")
            finally:
                for fd in devzero_fds:
                    os.close(fd)

@unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
def test_preexec_errpipe_does_not_double_close_pipes(self):
    """Issue16140: Don't double close pipes on preexec error."""

    def raise_it():
        raise subprocess.SubprocessError(
                "force the _execute_child() errpipe_data path.")

    with self.assertRaises(subprocess.SubprocessError):
        self._TestExecuteChildPopen(
                    self, ZERO_RETURN_CMD,
                    stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE, preexec_fn=raise_it)

def test_preexec_gc_module_failure(self):
    # This tests the code that disables garbage collection if the child
    # process will execute any Python.
    def raise_runtime_error():
        raise RuntimeError("this shouldn't escape")
    # Remember the gc state and the real functions: the test replaces
    # gc.disable and deletes gc.isenabled further down.
    enabled = gc.isenabled()
    orig_gc_disable = gc.disable
    orig_gc_isenabled = gc.isenabled
    try:
        gc.disable()
        self.assertFalse(gc.isenabled())
        subprocess.call([sys.executable, '-c', ''],
                        preexec_fn=lambda: None)
        self.assertFalse(gc.isenabled(),
                         "Popen enabled gc when it shouldn't.")

        gc.enable()
        self.assertTrue(gc.isenabled())
        subprocess.call([sys.executable, '-c', ''],
                        preexec_fn=lambda: None)
        self.assertTrue(gc.isenabled(), "Popen left gc disabled.")

        gc.disable = raise_runtime_error
        self.assertRaises(RuntimeError, subprocess.Popen,
                          [sys.executable, '-c', ''],
                          preexec_fn=lambda: None)

        del gc.isenabled  # force an AttributeError
        self.assertRaises(AttributeError, subprocess.Popen,
                          [sys.executable, '-c', ''],
                          preexec_fn=lambda: None)
    finally:
        gc.disable = orig_gc_disable
        gc.isenabled = orig_gc_isenabled
        if not enabled:
            gc.disable()

@unittest.skipIf(
    sys.platform == 'darwin', 'setrlimit() seems to fail on OS X')
def test_preexec_fork_failure(self):
    # The internal code did not preserve the previous exception when
    # re-enabling garbage collection
    try:
        from resource import getrlimit, setrlimit, RLIMIT_NPROC
    except ImportError as err:
        self.skipTest(err)  # RLIMIT_NPROC is specific to Linux and BSD
    limits = getrlimit(RLIMIT_NPROC)
    [_, hard] = limits
    # Lower the soft process limit to 0 and put the old limits back once
    # the test is over.
    setrlimit(RLIMIT_NPROC, (0, hard))
    self.addCleanup(setrlimit, RLIMIT_NPROC, limits)
    try:
        subprocess.call([sys.executable, '-c', ''],
                        preexec_fn=lambda: None)
    except BlockingIOError:
        # Forking should raise EAGAIN, translated to BlockingIOError
        pass
    else:
        self.skipTest('RLIMIT_NPROC had no effect; probably superuser')

def test_args_string(self):
    # args is a string
    fd, fname = tempfile.mkstemp()
    try:
        # reopen in text mode
        with open(fd, "w", errors="surrogateescape") as fobj:
            fobj.write("#!%s\n" % support.unix_shell)
            fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
                       sys.executable)
        os.chmod(fname, 0o700)
        p = subprocess.Popen(fname)
        p.wait()
    finally:
        # Remove the script even if writing or launching it raised.
        os.remove(fname)
    self.assertEqual(p.returncode, 47)

def test_invalid_args(self):
    # invalid arguments should raise ValueError
    self.assertRaises(ValueError, subprocess.call,
                      [sys.executable, "-c",
                       "import sys; sys.exit(47)"],
                      startupinfo=47)
    self.assertRaises(ValueError, subprocess.call,
                      [sys.executable, "-c",
                       "import sys; sys.exit(47)"],
                      creationflags=47)

def test_shell_sequence(self):
    # Run command through the shell (sequence)
    newenv = os.environ.copy()
    newenv["FRUIT"] = "apple"
    p = subprocess.Popen(["echo $FRUIT"], shell=1,
                         stdout=subprocess.PIPE,
                         env=newenv)
    with p:
        self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")

def test_shell_string(self):
    # Run command through the shell (string)
    newenv = os.environ.copy()
    newenv["FRUIT"] = "apple"
    p = subprocess.Popen("echo $FRUIT", shell=1,
                         stdout=subprocess.PIPE,
                         env=newenv)
    with p:
        self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")

def test_call_string(self):
    # call() function with string argument on UNIX
    fd, fname = tempfile.mkstemp()
    try:
        # reopen in text mode
        with open(fd, "w", errors="surrogateescape") as fobj:
            fobj.write("#!%s\n" % support.unix_shell)
            fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
                       sys.executable)
        os.chmod(fname, 0o700)
        rc = subprocess.call(fname)
    finally:
        # Remove the script even if writing or launching it raised.
        os.remove(fname)
    self.assertEqual(rc, 47)

def test_specific_shell(self):
    # Issue #9265: Incorrect name passed as arg[0].
    shells = []
    for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
        for name in ['bash', 'ksh']:
            sh = os.path.join(prefix, name)
            if os.path.isfile(sh):
                shells.append(sh)
    if not shells:  # Will probably work for any shell but csh.
        self.skipTest("bash or ksh required for this test")
    sh = '/bin/sh'
    if os.path.isfile(sh) and not os.path.islink(sh):
        # Test will fail if /bin/sh is a symlink to csh.
        shells.append(sh)
    for sh in shells:
        p = subprocess.Popen("echo $0", executable=sh, shell=True,
                             stdout=subprocess.PIPE)
        with p:
            self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii'))

def _kill_process(self, method, *args):
    """Start a sleeping child, call ``p.<method>(*args)`` and return ``p``.

    The child writes one line before sleeping; the parent reads one byte
    of it before invoking the method, so the interpreter is known to be
    running.  The caller is left to collect the child.
    """
    # Do not inherit file handles from the parent.
    # It should fix failures on some platforms.
    # Also set the SIGINT handler to the default to make sure it's not
    # being ignored (some tests rely on that.)
    old_handler = signal.signal(signal.SIGINT, signal.default_int_handler)
    try:
        p = subprocess.Popen([sys.executable, "-c", """if 1:
                             import sys, time
                             sys.stdout.write('x\\n')
                             sys.stdout.flush()
                             time.sleep(30)
                             """],
                             close_fds=True,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    finally:
        signal.signal(signal.SIGINT, old_handler)
    # Wait for the interpreter to be completely initialized before
    # sending any signal.
    p.stdout.read(1)
    getattr(p, method)(*args)
    return p

@unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')),
                 "Due to known OS bug (issue #16762)")
def _kill_dead_process(self, method, *args):
    """Call ``p.<method>(*args)`` on a child that has already exited."""
    # Do not inherit file handles from the parent.
    # It should fix failures on some platforms.
    p = subprocess.Popen([sys.executable, "-c", """if 1:
                         import sys, time
                         sys.stdout.write('x\\n')
                         sys.stdout.flush()
                         """],
                         close_fds=True,
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    # Wait for the interpreter to be completely initialized before
    # sending any signal.
    p.stdout.read(1)
    # The process should end after this
    time.sleep(1)
    # This shouldn't raise even though the child is now dead
    getattr(p, method)(*args)
    p.communicate()

def test_send_signal(self):
    # SIGINT must surface in the child as a KeyboardInterrupt traceback.
    p = self._kill_process('send_signal', signal.SIGINT)
    _, stderr = p.communicate()
    self.assertIn(b'KeyboardInterrupt', stderr)
    self.assertNotEqual(p.wait(), 0)

def test_kill(self):
    # kill() must end the child with SIGKILL and leave stderr empty.
    p = self._kill_process('kill')
    _, stderr = p.communicate()
    self.assertStderrEqual(stderr, b'')
    self.assertEqual(p.wait(), -signal.SIGKILL)

def test_terminate(self):
    # terminate() must end the child with SIGTERM and leave stderr empty.
    p = self._kill_process('terminate')
    _, stderr = p.communicate()
    self.assertStderrEqual(stderr, b'')
    self.assertEqual(p.wait(), -signal.SIGTERM)

def test_send_signal_dead(self):
    # Sending a signal to a dead process
    self._kill_dead_process('send_signal', signal.SIGINT)

def test_kill_dead(self):
    # Killing a dead process
    self._kill_dead_process('kill')

def test_terminate_dead(self):
    # Terminating a dead process
    self._kill_dead_process('terminate')

def _save_fds(self, save_fds):
    """Duplicate each fd in *save_fds*.

    Returns a list of ``(fd, duplicate, inheritable)`` tuples suitable
    for ``_restore_fds()``.
    """
    fds = []
    for fd in save_fds:
        inheritable = os.get_inheritable(fd)
        saved = os.dup(fd)
        fds.append((fd, saved, inheritable))
    return fds

def _restore_fds(self, fds):
    """Undo ``_save_fds()``: dup each copy back and close the copy."""
    for fd, saved, inheritable in fds:
        os.dup2(saved, fd, inheritable=inheritable)
        os.close(saved)

def check_close_std_fds(self, fds):
    # Issue #9905: test that subprocess pipes still work properly with
    # some standard fds closed
    stdin = 0
    saved_fds = self._save_fds(fds)
    # If fd 0 is among the descriptors to close, hand its saved duplicate
    # to the child as stdin instead.
    for fd, saved, inheritable in saved_fds:
        if fd == 0:
            stdin = saved
            break
    try:
        for fd in fds:
            os.close(fd)
        out, err = subprocess.Popen([sys.executable, "-c",
                          'import sys;'
                          'sys.stdout.write("apple");'
                          'sys.stdout.flush();'
                          'sys.stderr.write("orange")'],
                   stdin=stdin,
                   stdout=subprocess.PIPE,
                   stderr=subprocess.PIPE).communicate()
        err = support.strip_python_stderr(err)
        self.assertEqual((out, err), (b'apple', b'orange'))
    finally:
        self._restore_fds(saved_fds)

def test_close_fd_0(self):
    self.check_close_std_fds([0])

def test_close_fd_1(self):
    self.check_close_std_fds([1])

def test_close_fd_2(self):
    self.check_close_std_fds([2])

def test_close_fds_0_1(self):
    self.check_close_std_fds([0, 1])

def test_close_fds_0_2(self):
    self.check_close_std_fds([0, 2])

def test_close_fds_1_2(self):
    self.check_close_std_fds([1, 2])

def test_close_fds_0_1_2(self):
    # Issue #10806: test that subprocess pipes still work properly with
    # all standard fds closed.
    self.check_close_std_fds([0, 1, 2])

def test_small_errpipe_write_fd(self):
    """Issue #15798: Popen should work when stdio fds are available."""
    new_stdin = os.dup(0)
    new_stdout = os.dup(1)
    try:
        os.close(0)
        os.close(1)

        # Side test: if errpipe_write fails to have its CLOEXEC
        # flag set this should cause the parent to think the exec
        # failed.  Extremely unlikely: everyone supports CLOEXEC.
        subprocess.Popen([
                sys.executable, "-c",
                "print('AssertionError:0:CLOEXEC failure.')"]).wait()
    finally:
        # Restore original stdin and stdout
        os.dup2(new_stdin, 0)
        os.dup2(new_stdout, 1)
        os.close(new_stdin)
        os.close(new_stdout)

def test_remapping_std_fds(self):
    # open up some temporary files
    temps = [tempfile.mkstemp() for i in range(3)]
    # Bound before the try so the final cleanup loop can always see it
    # (same layout as check_swap_fds()).
    temp_fds = [fd for fd, fname in temps]
    try:
        # unlink the files -- we won't need to reopen them
        for fd, fname in temps:
            os.unlink(fname)

        # write some data to what will become stdin, and rewind
        os.write(temp_fds[1], b"STDIN")
        os.lseek(temp_fds[1], 0, 0)

        # move the standard file descriptors out of the way
        saved_fds = self._save_fds(range(3))
        try:
            # duplicate the file objects over the standard fd's
            for fd, temp_fd in enumerate(temp_fds):
                os.dup2(temp_fd, fd)

            # now use those files in the "wrong" order, so that subprocess
            # has to rearrange them in the child
            p = subprocess.Popen([sys.executable, "-c",
                'import sys; got = sys.stdin.read();'
                'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
                stdin=temp_fds[1],
                stdout=temp_fds[2],
                stderr=temp_fds[0])
            p.wait()
        finally:
            self._restore_fds(saved_fds)

        for fd in temp_fds:
            os.lseek(fd, 0, 0)

        out = os.read(temp_fds[2], 1024)
        err = support.strip_python_stderr(os.read(temp_fds[0], 1024))
        self.assertEqual(out, b"got STDIN")
        self.assertEqual(err, b"err")

    finally:
        for fd in temp_fds:
            os.close(fd)

def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
    # Run a child whose stdin/stdout/stderr are the parent's descriptors
    # stdin_no/stdout_no/stderr_no (each one of 0, 1, 2, backed by a
    # temporary file) and check what it read and wrote.
    # open up some temporary files
    temps = [tempfile.mkstemp() for i in range(3)]
    temp_fds = [fd for fd, fname in temps]
    try:
        # unlink the files -- we won't need to reopen them
        for fd, fname in temps:
            os.unlink(fname)

        # save a copy of the standard file descriptors
        saved_fds = self._save_fds(range(3))
        try:
            # duplicate the temp files over the standard fd's 0, 1, 2
            for fd, temp_fd in enumerate(temp_fds):
                os.dup2(temp_fd, fd)

            # write some data to what will become stdin, and rewind
            os.write(stdin_no, b"STDIN")
            os.lseek(stdin_no, 0, 0)

            # now use those files in the given order, so that subprocess
            # has to rearrange them in the child
            p = subprocess.Popen([sys.executable, "-c",
                'import sys; got = sys.stdin.read();'
                'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
                stdin=stdin_no,
                stdout=stdout_no,
                stderr=stderr_no)
            p.wait()

            for fd in temp_fds:
                os.lseek(fd, 0, 0)

            out = os.read(stdout_no, 1024)
            err = support.strip_python_stderr(os.read(stderr_no, 1024))
        finally:
            self._restore_fds(saved_fds)

        self.assertEqual(out, b"got STDIN")
        self.assertEqual(err, b"err")

    finally:
        for fd in temp_fds:
            os.close(fd)

# When duping fds, if there arises a situation where one of the fds is
# either 0, 1 or 2, it is possible that it is overwritten (#12607).
# This tests all combinations of this.
def test_swap_fds(self):
    # Try every assignment of descriptors 0, 1 and 2 to the child's
    # stdin, stdout and stderr.
    for stdin_no, stdout_no, stderr_no in itertools.permutations(range(3)):
        self.check_swap_fds(stdin_no, stdout_no, stderr_no)

def _check_swap_std_fds_with_one_closed(self, from_fds, to_fds):
    """Redirect parent fds *from_fds* onto child std fds *to_fds*.

    Each descriptor in *from_fds* is backed by a temporary file and the
    one standard descriptor missing from *from_fds* is closed.  The child
    writes its own number to every standard descriptor it was given, and
    the parent then checks that each temporary file holds exactly the
    number of the descriptor it was mapped to.
    """
    std_fds = set(range(3))
    backup = self._save_fds(range(3))
    try:
        # Back every source descriptor with its own temporary file.
        for src in from_fds:
            with tempfile.TemporaryFile() as tmp:
                os.dup2(tmp.fileno(), src)

        # Close the one standard descriptor that is not a source.
        os.close((std_fds - set(from_fds)).pop())

        stream_names = ('stdin', 'stdout', 'stderr')
        redirections = {stream_names[dst]: src
                        for src, dst in zip(from_fds, to_fds)}

        child_script = textwrap.dedent(r'''
            import os, sys
            skipped_fd = int(sys.argv[1])
            for fd in range(3):
                if fd != skipped_fd:
                    os.write(fd, str(fd).encode('ascii'))
        ''')

        # The child must leave alone the descriptor nothing maps to.
        untouched_fd = (std_fds - set(to_fds)).pop()

        exit_status = subprocess.call(
            [sys.executable, '-c', child_script, str(untouched_fd)],
            **redirections)
        self.assertEqual(exit_status, 0)

        for from_fd, to_fd in zip(from_fds, to_fds):
            os.lseek(from_fd, 0, os.SEEK_SET)
            written = os.read(from_fd, 1024).decode('ascii')
            read_fds = [int(digit) for digit in written]
            msg = textwrap.dedent(f"""
                When testing {from_fds} to {to_fds} redirection,
                parent descriptor {from_fd} got redirected
                to descriptor(s) {read_fds} instead of descriptor {to_fd}.
            """)
            self.assertEqual([to_fd], read_fds, msg)
    finally:
        self._restore_fds(backup)

# Check that subprocess can remap std fds correctly even
# if one of them is closed (#32844).
def test_swap_std_fds_with_one_closed(self):
    # Every pair of open standard fds against every ordered pair of
    # target standard fds.
    for from_fds in itertools.combinations(range(3), 2):
        for to_fds in itertools.permutations(range(3), 2):
            self._check_swap_std_fds_with_one_closed(from_fds, to_fds)

def test_surrogates_error_message(self):
    # A preexec_fn error message containing a lone surrogate must not
    # break error reporting in the parent.
    def prepare():
        raise ValueError("surrogate:\uDCff")

    try:
        subprocess.call(
            ZERO_RETURN_CMD,
            preexec_fn=prepare)
    except ValueError as err:
        # Pure Python implementations keeps the message
        self.assertIsNone(subprocess._posixsubprocess)
        self.assertEqual(str(err), "surrogate:\uDCff")
    except subprocess.SubprocessError as err:
        # _posixsubprocess uses a default message
        self.assertIsNotNone(subprocess._posixsubprocess)
        self.assertEqual(str(err), "Exception occurred in preexec_fn.")
    else:
        self.fail("Expected ValueError or subprocess.SubprocessError")

def test_undecodable_env(self):
    # Once with the surrogate in the value, once with it in the key.
    for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')):
        encoded_value = value.encode("ascii", "surrogateescape")

        # test str with surrogates
        script = "import os; print(ascii(os.getenv(%s)))" % repr(key)
        env = os.environ.copy()
        env[key] = value
        # Use C locale to get ASCII for the locale encoding to force
        # surrogate-escaping of \xFF in the child process
        env['LC_ALL'] = 'C'
        decoded_value = value
        stdout = subprocess.check_output(
            [sys.executable, "-c", script],
            env=env)
        stdout = stdout.rstrip(b'\n\r')
        self.assertEqual(stdout.decode('ascii'), ascii(decoded_value))

        # test bytes
        key = key.encode("ascii", "surrogateescape")
        script = "import os; print(ascii(os.getenvb(%s)))" % repr(key)
        env = os.environ.copy()
        env[key] = encoded_value
        stdout = subprocess.check_output(
            [sys.executable, "-c", script],
            env=env)
        stdout = stdout.rstrip(b'\n\r')
        self.assertEqual(stdout.decode('ascii'), ascii(encoded_value))

def test_bytes_program(self):
    # The program may be given as bytes: as an absolute path, inside a
    # shell command line, or as a bare name looked up on PATH.
    abs_program = os.fsencode(ZERO_RETURN_CMD[0])
    args = list(ZERO_RETURN_CMD[1:])
    path, program = os.path.split(ZERO_RETURN_CMD[0])
    program = os.fsencode(program)

    # absolute bytes path
    exitcode = subprocess.call([abs_program]+args)
    self.assertEqual(exitcode, 0)

    # absolute bytes path as a string
    cmd = b"'%s' %s" % (abs_program, " ".join(args).encode("utf-8"))
    exitcode = subprocess.call(cmd, shell=True)
    self.assertEqual(exitcode, 0)

    # bytes program, unicode PATH
    env = os.environ.copy()
    env["PATH"] = path
    exitcode = subprocess.call([program]+args, env=env)
    self.assertEqual(exitcode, 0)

    # bytes program, bytes PATH
    envb = os.environb.copy()
    envb[b"PATH"] = os.fsencode(path)
    exitcode = subprocess.call([program]+args, env=envb)
    self.assertEqual(exitcode, 0)

def test_pipe_cloexec(self):
    # The parent's ends of p1's pipes must not be open in p2, although
    # both children are started with close_fds=False.
    sleeper = support.findfile("input_reader.py", subdir="subprocessdata")
    fd_status = support.findfile("fd_status.py", subdir="subprocessdata")

    p1 = subprocess.Popen([sys.executable, sleeper],
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, close_fds=False)

    self.addCleanup(p1.communicate, b'')

    p2 = subprocess.Popen([sys.executable, fd_status],
                          stdout=subprocess.PIPE, close_fds=False)

    output, _ = p2.communicate()
    result_fds = set(map(int, output.split(b',')))
    unwanted_fds = {p1.stdin.fileno(), p1.stdout.fileno(),
                    p1.stderr.fileno()}

    self.assertFalse(result_fds & unwanted_fds,
                     "Expected no fds from %r to be open in child, "
                     "found %r" %
                          (unwanted_fds, result_fds & unwanted_fds))

def test_pipe_cloexec_real_tools(self):
    # Pipe data through two chained children started with
    # close_fds=False and check that the second one delivers its output
    # within 10 seconds.
    qcat = support.findfile("qcat.py", subdir="subprocessdata")
    qgrep = support.findfile("qgrep.py", subdir="subprocessdata")

    subdata = b'zxcvbn'
    data = subdata * 4 + b'\n'

    p1 = subprocess.Popen([sys.executable, qcat],
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          close_fds=False)

    p2 = subprocess.Popen([sys.executable, qgrep, subdata],
                          stdin=p1.stdout, stdout=subprocess.PIPE,
                          close_fds=False)

    self.addCleanup(p1.wait)
    self.addCleanup(p2.wait)
    def kill_p1():
        try:
            p1.terminate()
        except ProcessLookupError:
            pass
    def kill_p2():
        try:
            p2.terminate()
        except ProcessLookupError:
            pass
    self.addCleanup(kill_p1)
    self.addCleanup(kill_p2)
    # Cleanups run last-in first-out, so these run before the children
    # are terminated.  They make sure the read ends are closed even when
    # an assertion below fails; closing them twice is harmless.
    self.addCleanup(p1.stdout.close)
    self.addCleanup(p2.stdout.close)

    p1.stdin.write(data)
    p1.stdin.close()

    readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10)

    self.assertTrue(readfiles, "The child hung")
    self.assertEqual(p2.stdout.read(), data)

    p1.stdout.close()
    p2.stdout.close()

def test_close_fds(self):
    # Check which inheritable fds reach the child with close_fds=False,
    # with close_fds=True, and with close_fds=True plus pass_fds.
    fd_status = support.findfile("fd_status.py", subdir="subprocessdata")

    fds = os.pipe()
    self.addCleanup(os.close, fds[0])
    self.addCleanup(os.close, fds[1])

    open_fds = set(fds)
    # add a bunch more fds
    for _ in range(9):
        fd = os.open(os.devnull, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        open_fds.add(fd)

    for fd in open_fds:
        os.set_inheritable(fd, True)

    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=False)
    output, ignored = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))

    self.assertEqual(remaining_fds & open_fds, open_fds,
                     "Some fds were closed")

    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=True)
    output, ignored = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))

    self.assertFalse(remaining_fds & open_fds,
                     "Some fds were left open")
    self.assertIn(1, remaining_fds, "Subprocess failed")

    # Keep some of the fd's we opened open in the subprocess.
    # This tests _posixsubprocess.c's proper handling of fds_to_keep.
    fds_to_keep = set(open_fds.pop() for _ in range(8))
    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=True,
                         pass_fds=fds_to_keep)
    output, ignored = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))

    self.assertFalse((remaining_fds - fds_to_keep) & open_fds,
                     "Some fds not in pass_fds were left open")
    self.assertIn(1, remaining_fds, "Subprocess failed")


@unittest.skipIf(sys.platform.startswith("freebsd") and
                 os.stat("/dev").st_dev == os.stat("/dev/fd").st_dev,
                 "Requires fdescfs mounted on /dev/fd on FreeBSD.")
def test_close_fds_when_max_fd_is_lowered(self):
    """Confirm that issue21618 is fixed (may fail under valgrind)."""
    fd_status = support.findfile("fd_status.py", subdir="subprocessdata")

    # This launches the meat of the test in a child process to
    # avoid messing with the larger unittest processes maximum
    # number of file descriptors.
    #  This process launches:
    #  +--> Process that lowers its RLIMIT_NOFILE after setting up
    #    a bunch of high open fds above the new lower rlimit.
    #    Those are reported via stdout before launching a new
    #    process with close_fds=False to run the actual test:
    #    +--> The TEST: This one launches a fd_status.py
    #      subprocess with close_fds=True so we can find out if
    #      any of the fds above the lowered rlimit are still open.
    p = subprocess.Popen([sys.executable, '-c', textwrap.dedent(
    '''
    import os, resource, subprocess, sys, textwrap
    open_fds = set()
    # Add a bunch more fds to pass down.
    for _ in range(40):
        fd = os.open(os.devnull, os.O_RDONLY)
        open_fds.add(fd)

    # Leave a two pairs of low ones available for use by the
    # internal child error pipe and the stdout pipe.
    # We also leave 10 more open as some Python buildbots run into
    # "too many open files" errors during the test if we do not.
    for fd in sorted(open_fds)[:14]:
        os.close(fd)
        open_fds.remove(fd)

    for fd in open_fds:
        #self.addCleanup(os.close, fd)
        os.set_inheritable(fd, True)

    max_fd_open = max(open_fds)

    # Communicate the open_fds to the parent unittest.TestCase process.
    print(','.join(map(str, sorted(open_fds))))
    sys.stdout.flush()

    rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE)
    try:
        # 29 is lower than the highest fds we are leaving open.
        resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max))
        # Launch a new Python interpreter with our low fd rlim_cur that
        # inherits open fds above that limit. It then uses subprocess
        # with close_fds=True to get a report of open fds in the child.
        # An explicit list of fds to check is passed to fd_status.py as
        # letting fd_status rely on its default logic would miss the
        # fds above rlim_cur as it normally only checks up to that limit.
        subprocess.Popen(
            [sys.executable, '-c',
             textwrap.dedent("""
                 import subprocess, sys
                 subprocess.Popen([sys.executable, %r] +
                                  [str(x) for x in range({max_fd})],
                                  close_fds=True).wait()
                 """.format(max_fd=max_fd_open+1))],
            close_fds=False).wait()
    finally:
        resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max))
    ''' % fd_status)], stdout=subprocess.PIPE)

    output, unused_stderr = p.communicate()
    output_lines = output.splitlines()
    # First line: the fds the intermediate process left open.
    # Second line: the fd_status.py report from the innermost child.
    self.assertEqual(len(output_lines), 2,
                     msg="expected exactly two lines of output:\n%r" % output)
    opened_fds = set(map(int, output_lines[0].strip().split(b',')))
    remaining_fds = set(map(int, output_lines[1].strip().split(b',')))

    self.assertFalse(remaining_fds & opened_fds,
                     msg="Some fds were left open.")


# Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file
# descriptor of a pipe closed in the parent process is valid in the
# child process according to fstat(), but the mode of the file
# descriptor is invalid, and read or write raise an error.
2558 @support.requires_mac_ver(10, 5) 2559 def test_pass_fds(self): 2560 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2561 2562 open_fds = set() 2563 2564 for x in range(5): 2565 fds = os.pipe() 2566 self.addCleanup(os.close, fds[0]) 2567 self.addCleanup(os.close, fds[1]) 2568 os.set_inheritable(fds[0], True) 2569 os.set_inheritable(fds[1], True) 2570 open_fds.update(fds) 2571 2572 for fd in open_fds: 2573 p = subprocess.Popen([sys.executable, fd_status], 2574 stdout=subprocess.PIPE, close_fds=True, 2575 pass_fds=(fd, )) 2576 output, ignored = p.communicate() 2577 2578 remaining_fds = set(map(int, output.split(b','))) 2579 to_be_closed = open_fds - {fd} 2580 2581 self.assertIn(fd, remaining_fds, "fd to be passed not passed") 2582 self.assertFalse(remaining_fds & to_be_closed, 2583 "fd to be closed passed") 2584 2585 # pass_fds overrides close_fds with a warning. 2586 with self.assertWarns(RuntimeWarning) as context: 2587 self.assertFalse(subprocess.call( 2588 ZERO_RETURN_CMD, 2589 close_fds=False, pass_fds=(fd, ))) 2590 self.assertIn('overriding close_fds', str(context.warning)) 2591 2592 def test_pass_fds_inheritable(self): 2593 script = support.findfile("fd_status.py", subdir="subprocessdata") 2594 2595 inheritable, non_inheritable = os.pipe() 2596 self.addCleanup(os.close, inheritable) 2597 self.addCleanup(os.close, non_inheritable) 2598 os.set_inheritable(inheritable, True) 2599 os.set_inheritable(non_inheritable, False) 2600 pass_fds = (inheritable, non_inheritable) 2601 args = [sys.executable, script] 2602 args += list(map(str, pass_fds)) 2603 2604 p = subprocess.Popen(args, 2605 stdout=subprocess.PIPE, close_fds=True, 2606 pass_fds=pass_fds) 2607 output, ignored = p.communicate() 2608 fds = set(map(int, output.split(b','))) 2609 2610 # the inheritable file descriptor must be inherited, so its inheritable 2611 # flag must be set in the child process after fork() and before exec() 2612 self.assertEqual(fds, set(pass_fds), "output=%a" % 
output) 2613 2614 # inheritable flag must not be changed in the parent process 2615 self.assertEqual(os.get_inheritable(inheritable), True) 2616 self.assertEqual(os.get_inheritable(non_inheritable), False) 2617 2618 2619 # bpo-32270: Ensure that descriptors specified in pass_fds 2620 # are inherited even if they are used in redirections. 2621 # Contributed by @izbyshev. 2622 def test_pass_fds_redirected(self): 2623 """Regression test for https://bugs.python.org/issue32270.""" 2624 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2625 pass_fds = [] 2626 for _ in range(2): 2627 fd = os.open(os.devnull, os.O_RDWR) 2628 self.addCleanup(os.close, fd) 2629 pass_fds.append(fd) 2630 2631 stdout_r, stdout_w = os.pipe() 2632 self.addCleanup(os.close, stdout_r) 2633 self.addCleanup(os.close, stdout_w) 2634 pass_fds.insert(1, stdout_w) 2635 2636 with subprocess.Popen([sys.executable, fd_status], 2637 stdin=pass_fds[0], 2638 stdout=pass_fds[1], 2639 stderr=pass_fds[2], 2640 close_fds=True, 2641 pass_fds=pass_fds): 2642 output = os.read(stdout_r, 1024) 2643 fds = {int(num) for num in output.split(b',')} 2644 2645 self.assertEqual(fds, {0, 1, 2} | frozenset(pass_fds), f"output={output!a}") 2646 2647 2648 def test_stdout_stdin_are_single_inout_fd(self): 2649 with io.open(os.devnull, "r+") as inout: 2650 p = subprocess.Popen(ZERO_RETURN_CMD, 2651 stdout=inout, stdin=inout) 2652 p.wait() 2653 2654 def test_stdout_stderr_are_single_inout_fd(self): 2655 with io.open(os.devnull, "r+") as inout: 2656 p = subprocess.Popen(ZERO_RETURN_CMD, 2657 stdout=inout, stderr=inout) 2658 p.wait() 2659 2660 def test_stderr_stdin_are_single_inout_fd(self): 2661 with io.open(os.devnull, "r+") as inout: 2662 p = subprocess.Popen(ZERO_RETURN_CMD, 2663 stderr=inout, stdin=inout) 2664 p.wait() 2665 2666 def test_wait_when_sigchild_ignored(self): 2667 # NOTE: sigchild_ignore.py may not be an effective test on all OSes. 
2668 sigchild_ignore = support.findfile("sigchild_ignore.py", 2669 subdir="subprocessdata") 2670 p = subprocess.Popen([sys.executable, sigchild_ignore], 2671 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 2672 stdout, stderr = p.communicate() 2673 self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" 2674 " non-zero with this error:\n%s" % 2675 stderr.decode('utf-8')) 2676 2677 def test_select_unbuffered(self): 2678 # Issue #11459: bufsize=0 should really set the pipes as 2679 # unbuffered (and therefore let select() work properly). 2680 select = support.import_module("select") 2681 p = subprocess.Popen([sys.executable, "-c", 2682 'import sys;' 2683 'sys.stdout.write("apple")'], 2684 stdout=subprocess.PIPE, 2685 bufsize=0) 2686 f = p.stdout 2687 self.addCleanup(f.close) 2688 try: 2689 self.assertEqual(f.read(4), b"appl") 2690 self.assertIn(f, select.select([f], [], [], 0.0)[0]) 2691 finally: 2692 p.wait() 2693 2694 def test_zombie_fast_process_del(self): 2695 # Issue #12650: on Unix, if Popen.__del__() was called before the 2696 # process exited, it wouldn't be added to subprocess._active, and would 2697 # remain a zombie. 2698 # spawn a Popen, and delete its reference before it exits 2699 p = subprocess.Popen([sys.executable, "-c", 2700 'import sys, time;' 2701 'time.sleep(0.2)'], 2702 stdout=subprocess.PIPE, 2703 stderr=subprocess.PIPE) 2704 self.addCleanup(p.stdout.close) 2705 self.addCleanup(p.stderr.close) 2706 ident = id(p) 2707 pid = p.pid 2708 with support.check_warnings(('', ResourceWarning)): 2709 p = None 2710 2711 if mswindows: 2712 # subprocess._active is not used on Windows and is set to None. 
2713 self.assertIsNone(subprocess._active) 2714 else: 2715 # check that p is in the active processes list 2716 self.assertIn(ident, [id(o) for o in subprocess._active]) 2717 2718 def test_leak_fast_process_del_killed(self): 2719 # Issue #12650: on Unix, if Popen.__del__() was called before the 2720 # process exited, and the process got killed by a signal, it would never 2721 # be removed from subprocess._active, which triggered a FD and memory 2722 # leak. 2723 # spawn a Popen, delete its reference and kill it 2724 p = subprocess.Popen([sys.executable, "-c", 2725 'import time;' 2726 'time.sleep(3)'], 2727 stdout=subprocess.PIPE, 2728 stderr=subprocess.PIPE) 2729 self.addCleanup(p.stdout.close) 2730 self.addCleanup(p.stderr.close) 2731 ident = id(p) 2732 pid = p.pid 2733 with support.check_warnings(('', ResourceWarning)): 2734 p = None 2735 2736 os.kill(pid, signal.SIGKILL) 2737 if mswindows: 2738 # subprocess._active is not used on Windows and is set to None. 2739 self.assertIsNone(subprocess._active) 2740 else: 2741 # check that p is in the active processes list 2742 self.assertIn(ident, [id(o) for o in subprocess._active]) 2743 2744 # let some time for the process to exit, and create a new Popen: this 2745 # should trigger the wait() of p 2746 time.sleep(0.2) 2747 with self.assertRaises(OSError): 2748 with subprocess.Popen(NONEXISTING_CMD, 2749 stdout=subprocess.PIPE, 2750 stderr=subprocess.PIPE) as proc: 2751 pass 2752 # p should have been wait()ed on, and removed from the _active list 2753 self.assertRaises(OSError, os.waitpid, pid, 0) 2754 if mswindows: 2755 # subprocess._active is not used on Windows and is set to None. 
2756 self.assertIsNone(subprocess._active) 2757 else: 2758 self.assertNotIn(ident, [id(o) for o in subprocess._active]) 2759 2760 def test_close_fds_after_preexec(self): 2761 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2762 2763 # this FD is used as dup2() target by preexec_fn, and should be closed 2764 # in the child process 2765 fd = os.dup(1) 2766 self.addCleanup(os.close, fd) 2767 2768 p = subprocess.Popen([sys.executable, fd_status], 2769 stdout=subprocess.PIPE, close_fds=True, 2770 preexec_fn=lambda: os.dup2(1, fd)) 2771 output, ignored = p.communicate() 2772 2773 remaining_fds = set(map(int, output.split(b','))) 2774 2775 self.assertNotIn(fd, remaining_fds) 2776 2777 @support.cpython_only 2778 def test_fork_exec(self): 2779 # Issue #22290: fork_exec() must not crash on memory allocation failure 2780 # or other errors 2781 import _posixsubprocess 2782 gc_enabled = gc.isenabled() 2783 try: 2784 # Use a preexec function and enable the garbage collector 2785 # to force fork_exec() to re-enable the garbage collector 2786 # on error. 2787 func = lambda: None 2788 gc.enable() 2789 2790 for args, exe_list, cwd, env_list in ( 2791 (123, [b"exe"], None, [b"env"]), 2792 ([b"arg"], 123, None, [b"env"]), 2793 ([b"arg"], [b"exe"], 123, [b"env"]), 2794 ([b"arg"], [b"exe"], None, 123), 2795 ): 2796 with self.assertRaises(TypeError): 2797 _posixsubprocess.fork_exec( 2798 args, exe_list, 2799 True, (), cwd, env_list, 2800 -1, -1, -1, -1, 2801 1, 2, 3, 4, 2802 True, True, func) 2803 finally: 2804 if not gc_enabled: 2805 gc.disable() 2806 2807 @support.cpython_only 2808 def test_fork_exec_sorted_fd_sanity_check(self): 2809 # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check. 
2810 import _posixsubprocess 2811 class BadInt: 2812 first = True 2813 def __init__(self, value): 2814 self.value = value 2815 def __int__(self): 2816 if self.first: 2817 self.first = False 2818 return self.value 2819 raise ValueError 2820 2821 gc_enabled = gc.isenabled() 2822 try: 2823 gc.enable() 2824 2825 for fds_to_keep in ( 2826 (-1, 2, 3, 4, 5), # Negative number. 2827 ('str', 4), # Not an int. 2828 (18, 23, 42, 2**63), # Out of range. 2829 (5, 4), # Not sorted. 2830 (6, 7, 7, 8), # Duplicate. 2831 (BadInt(1), BadInt(2)), 2832 ): 2833 with self.assertRaises( 2834 ValueError, 2835 msg='fds_to_keep={}'.format(fds_to_keep)) as c: 2836 _posixsubprocess.fork_exec( 2837 [b"false"], [b"false"], 2838 True, fds_to_keep, None, [b"env"], 2839 -1, -1, -1, -1, 2840 1, 2, 3, 4, 2841 True, True, None) 2842 self.assertIn('fds_to_keep', str(c.exception)) 2843 finally: 2844 if not gc_enabled: 2845 gc.disable() 2846 2847 def test_communicate_BrokenPipeError_stdin_close(self): 2848 # By not setting stdout or stderr or a timeout we force the fast path 2849 # that just calls _stdin_write() internally due to our mock. 2850 proc = subprocess.Popen(ZERO_RETURN_CMD) 2851 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 2852 mock_proc_stdin.close.side_effect = BrokenPipeError 2853 proc.communicate() # Should swallow BrokenPipeError from close. 2854 mock_proc_stdin.close.assert_called_with() 2855 2856 def test_communicate_BrokenPipeError_stdin_write(self): 2857 # By not setting stdout or stderr or a timeout we force the fast path 2858 # that just calls _stdin_write() internally due to our mock. 2859 proc = subprocess.Popen(ZERO_RETURN_CMD) 2860 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 2861 mock_proc_stdin.write.side_effect = BrokenPipeError 2862 proc.communicate(b'stuff') # Should swallow the BrokenPipeError. 
2863 mock_proc_stdin.write.assert_called_once_with(b'stuff') 2864 mock_proc_stdin.close.assert_called_once_with() 2865 2866 def test_communicate_BrokenPipeError_stdin_flush(self): 2867 # Setting stdin and stdout forces the ._communicate() code path. 2868 # python -h exits faster than python -c pass (but spams stdout). 2869 proc = subprocess.Popen([sys.executable, '-h'], 2870 stdin=subprocess.PIPE, 2871 stdout=subprocess.PIPE) 2872 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin, \ 2873 open(os.devnull, 'wb') as dev_null: 2874 mock_proc_stdin.flush.side_effect = BrokenPipeError 2875 # because _communicate registers a selector using proc.stdin... 2876 mock_proc_stdin.fileno.return_value = dev_null.fileno() 2877 # _communicate() should swallow BrokenPipeError from flush. 2878 proc.communicate(b'stuff') 2879 mock_proc_stdin.flush.assert_called_once_with() 2880 2881 def test_communicate_BrokenPipeError_stdin_close_with_timeout(self): 2882 # Setting stdin and stdout forces the ._communicate() code path. 2883 # python -h exits faster than python -c pass (but spams stdout). 2884 proc = subprocess.Popen([sys.executable, '-h'], 2885 stdin=subprocess.PIPE, 2886 stdout=subprocess.PIPE) 2887 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 2888 mock_proc_stdin.close.side_effect = BrokenPipeError 2889 # _communicate() should swallow BrokenPipeError from close. 
2890 proc.communicate(timeout=999) 2891 mock_proc_stdin.close.assert_called_once_with() 2892 2893 @unittest.skipUnless(_testcapi is not None 2894 and hasattr(_testcapi, 'W_STOPCODE'), 2895 'need _testcapi.W_STOPCODE') 2896 def test_stopped(self): 2897 """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335.""" 2898 args = ZERO_RETURN_CMD 2899 proc = subprocess.Popen(args) 2900 2901 # Wait until the real process completes to avoid zombie process 2902 pid = proc.pid 2903 pid, status = os.waitpid(pid, 0) 2904 self.assertEqual(status, 0) 2905 2906 status = _testcapi.W_STOPCODE(3) 2907 with mock.patch('subprocess.os.waitpid', return_value=(pid, status)): 2908 returncode = proc.wait() 2909 2910 self.assertEqual(returncode, -3) 2911 2912 def test_communicate_repeated_call_after_stdout_close(self): 2913 proc = subprocess.Popen([sys.executable, '-c', 2914 'import os, time; os.close(1), time.sleep(2)'], 2915 stdout=subprocess.PIPE) 2916 while True: 2917 try: 2918 proc.communicate(timeout=0.1) 2919 return 2920 except subprocess.TimeoutExpired: 2921 pass 2922 2923 2924@unittest.skipUnless(mswindows, "Windows specific tests") 2925class Win32ProcessTestCase(BaseTestCase): 2926 2927 def test_startupinfo(self): 2928 # startupinfo argument 2929 # We uses hardcoded constants, because we do not want to 2930 # depend on win32all. 2931 STARTF_USESHOWWINDOW = 1 2932 SW_MAXIMIZE = 3 2933 startupinfo = subprocess.STARTUPINFO() 2934 startupinfo.dwFlags = STARTF_USESHOWWINDOW 2935 startupinfo.wShowWindow = SW_MAXIMIZE 2936 # Since Python is a console process, it won't be affected 2937 # by wShowWindow, but the argument should be silently 2938 # ignored 2939 subprocess.call(ZERO_RETURN_CMD, 2940 startupinfo=startupinfo) 2941 2942 def test_startupinfo_keywords(self): 2943 # startupinfo argument 2944 # We use hardcoded constants, because we do not want to 2945 # depend on win32all. 
2946 STARTF_USERSHOWWINDOW = 1 2947 SW_MAXIMIZE = 3 2948 startupinfo = subprocess.STARTUPINFO( 2949 dwFlags=STARTF_USERSHOWWINDOW, 2950 wShowWindow=SW_MAXIMIZE 2951 ) 2952 # Since Python is a console process, it won't be affected 2953 # by wShowWindow, but the argument should be silently 2954 # ignored 2955 subprocess.call(ZERO_RETURN_CMD, 2956 startupinfo=startupinfo) 2957 2958 def test_startupinfo_copy(self): 2959 # bpo-34044: Popen must not modify input STARTUPINFO structure 2960 startupinfo = subprocess.STARTUPINFO() 2961 startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW 2962 startupinfo.wShowWindow = subprocess.SW_HIDE 2963 2964 # Call Popen() twice with the same startupinfo object to make sure 2965 # that it's not modified 2966 for _ in range(2): 2967 cmd = ZERO_RETURN_CMD 2968 with open(os.devnull, 'w') as null: 2969 proc = subprocess.Popen(cmd, 2970 stdout=null, 2971 stderr=subprocess.STDOUT, 2972 startupinfo=startupinfo) 2973 with proc: 2974 proc.communicate() 2975 self.assertEqual(proc.returncode, 0) 2976 2977 self.assertEqual(startupinfo.dwFlags, 2978 subprocess.STARTF_USESHOWWINDOW) 2979 self.assertIsNone(startupinfo.hStdInput) 2980 self.assertIsNone(startupinfo.hStdOutput) 2981 self.assertIsNone(startupinfo.hStdError) 2982 self.assertEqual(startupinfo.wShowWindow, subprocess.SW_HIDE) 2983 self.assertEqual(startupinfo.lpAttributeList, {"handle_list": []}) 2984 2985 def test_creationflags(self): 2986 # creationflags argument 2987 CREATE_NEW_CONSOLE = 16 2988 sys.stderr.write(" a DOS box should flash briefly ...\n") 2989 subprocess.call(sys.executable + 2990 ' -c "import time; time.sleep(0.25)"', 2991 creationflags=CREATE_NEW_CONSOLE) 2992 2993 def test_invalid_args(self): 2994 # invalid arguments should raise ValueError 2995 self.assertRaises(ValueError, subprocess.call, 2996 [sys.executable, "-c", 2997 "import sys; sys.exit(47)"], 2998 preexec_fn=lambda: 1) 2999 3000 @support.cpython_only 3001 def test_issue31471(self): 3002 # There shouldn't be an 
assertion failure in Popen() in case the env 3003 # argument has a bad keys() method. 3004 class BadEnv(dict): 3005 keys = None 3006 with self.assertRaises(TypeError): 3007 subprocess.Popen(ZERO_RETURN_CMD, env=BadEnv()) 3008 3009 def test_close_fds(self): 3010 # close file descriptors 3011 rc = subprocess.call([sys.executable, "-c", 3012 "import sys; sys.exit(47)"], 3013 close_fds=True) 3014 self.assertEqual(rc, 47) 3015 3016 def test_close_fds_with_stdio(self): 3017 import msvcrt 3018 3019 fds = os.pipe() 3020 self.addCleanup(os.close, fds[0]) 3021 self.addCleanup(os.close, fds[1]) 3022 3023 handles = [] 3024 for fd in fds: 3025 os.set_inheritable(fd, True) 3026 handles.append(msvcrt.get_osfhandle(fd)) 3027 3028 p = subprocess.Popen([sys.executable, "-c", 3029 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3030 stdout=subprocess.PIPE, close_fds=False) 3031 stdout, stderr = p.communicate() 3032 self.assertEqual(p.returncode, 0) 3033 int(stdout.strip()) # Check that stdout is an integer 3034 3035 p = subprocess.Popen([sys.executable, "-c", 3036 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3037 stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) 3038 stdout, stderr = p.communicate() 3039 self.assertEqual(p.returncode, 1) 3040 self.assertIn(b"OSError", stderr) 3041 3042 # The same as the previous call, but with an empty handle_list 3043 handle_list = [] 3044 startupinfo = subprocess.STARTUPINFO() 3045 startupinfo.lpAttributeList = {"handle_list": handle_list} 3046 p = subprocess.Popen([sys.executable, "-c", 3047 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3048 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 3049 startupinfo=startupinfo, close_fds=True) 3050 stdout, stderr = p.communicate() 3051 self.assertEqual(p.returncode, 1) 3052 self.assertIn(b"OSError", stderr) 3053 3054 # Check for a warning due to using handle_list and close_fds=False 3055 with 
support.check_warnings((".*overriding close_fds", RuntimeWarning)): 3056 startupinfo = subprocess.STARTUPINFO() 3057 startupinfo.lpAttributeList = {"handle_list": handles[:]} 3058 p = subprocess.Popen([sys.executable, "-c", 3059 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3060 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 3061 startupinfo=startupinfo, close_fds=False) 3062 stdout, stderr = p.communicate() 3063 self.assertEqual(p.returncode, 0) 3064 3065 def test_empty_attribute_list(self): 3066 startupinfo = subprocess.STARTUPINFO() 3067 startupinfo.lpAttributeList = {} 3068 subprocess.call(ZERO_RETURN_CMD, 3069 startupinfo=startupinfo) 3070 3071 def test_empty_handle_list(self): 3072 startupinfo = subprocess.STARTUPINFO() 3073 startupinfo.lpAttributeList = {"handle_list": []} 3074 subprocess.call(ZERO_RETURN_CMD, 3075 startupinfo=startupinfo) 3076 3077 def test_shell_sequence(self): 3078 # Run command through the shell (sequence) 3079 newenv = os.environ.copy() 3080 newenv["FRUIT"] = "physalis" 3081 p = subprocess.Popen(["set"], shell=1, 3082 stdout=subprocess.PIPE, 3083 env=newenv) 3084 with p: 3085 self.assertIn(b"physalis", p.stdout.read()) 3086 3087 def test_shell_string(self): 3088 # Run command through the shell (string) 3089 newenv = os.environ.copy() 3090 newenv["FRUIT"] = "physalis" 3091 p = subprocess.Popen("set", shell=1, 3092 stdout=subprocess.PIPE, 3093 env=newenv) 3094 with p: 3095 self.assertIn(b"physalis", p.stdout.read()) 3096 3097 def test_shell_encodings(self): 3098 # Run command through the shell (string) 3099 for enc in ['ansi', 'oem']: 3100 newenv = os.environ.copy() 3101 newenv["FRUIT"] = "physalis" 3102 p = subprocess.Popen("set", shell=1, 3103 stdout=subprocess.PIPE, 3104 env=newenv, 3105 encoding=enc) 3106 with p: 3107 self.assertIn("physalis", p.stdout.read(), enc) 3108 3109 def test_call_string(self): 3110 # call() function with string argument on Windows 3111 rc = subprocess.call(sys.executable + 3112 ' 
-c "import sys; sys.exit(47)"') 3113 self.assertEqual(rc, 47) 3114 3115 def _kill_process(self, method, *args): 3116 # Some win32 buildbot raises EOFError if stdin is inherited 3117 p = subprocess.Popen([sys.executable, "-c", """if 1: 3118 import sys, time 3119 sys.stdout.write('x\\n') 3120 sys.stdout.flush() 3121 time.sleep(30) 3122 """], 3123 stdin=subprocess.PIPE, 3124 stdout=subprocess.PIPE, 3125 stderr=subprocess.PIPE) 3126 with p: 3127 # Wait for the interpreter to be completely initialized before 3128 # sending any signal. 3129 p.stdout.read(1) 3130 getattr(p, method)(*args) 3131 _, stderr = p.communicate() 3132 self.assertStderrEqual(stderr, b'') 3133 returncode = p.wait() 3134 self.assertNotEqual(returncode, 0) 3135 3136 def _kill_dead_process(self, method, *args): 3137 p = subprocess.Popen([sys.executable, "-c", """if 1: 3138 import sys, time 3139 sys.stdout.write('x\\n') 3140 sys.stdout.flush() 3141 sys.exit(42) 3142 """], 3143 stdin=subprocess.PIPE, 3144 stdout=subprocess.PIPE, 3145 stderr=subprocess.PIPE) 3146 with p: 3147 # Wait for the interpreter to be completely initialized before 3148 # sending any signal. 
3149 p.stdout.read(1) 3150 # The process should end after this 3151 time.sleep(1) 3152 # This shouldn't raise even though the child is now dead 3153 getattr(p, method)(*args) 3154 _, stderr = p.communicate() 3155 self.assertStderrEqual(stderr, b'') 3156 rc = p.wait() 3157 self.assertEqual(rc, 42) 3158 3159 def test_send_signal(self): 3160 self._kill_process('send_signal', signal.SIGTERM) 3161 3162 def test_kill(self): 3163 self._kill_process('kill') 3164 3165 def test_terminate(self): 3166 self._kill_process('terminate') 3167 3168 def test_send_signal_dead(self): 3169 self._kill_dead_process('send_signal', signal.SIGTERM) 3170 3171 def test_kill_dead(self): 3172 self._kill_dead_process('kill') 3173 3174 def test_terminate_dead(self): 3175 self._kill_dead_process('terminate') 3176 3177class MiscTests(unittest.TestCase): 3178 3179 class RecordingPopen(subprocess.Popen): 3180 """A Popen that saves a reference to each instance for testing.""" 3181 instances_created = [] 3182 3183 def __init__(self, *args, **kwargs): 3184 super().__init__(*args, **kwargs) 3185 self.instances_created.append(self) 3186 3187 @mock.patch.object(subprocess.Popen, "_communicate") 3188 def _test_keyboardinterrupt_no_kill(self, popener, mock__communicate, 3189 **kwargs): 3190 """Fake a SIGINT happening during Popen._communicate() and ._wait(). 3191 3192 This avoids the need to actually try and get test environments to send 3193 and receive signals reliably across platforms. The net effect of a ^C 3194 happening during a blocking subprocess execution which we want to clean 3195 up from is a KeyboardInterrupt coming out of communicate() or wait(). 3196 """ 3197 3198 mock__communicate.side_effect = KeyboardInterrupt 3199 try: 3200 with mock.patch.object(subprocess.Popen, "_wait") as mock__wait: 3201 # We patch out _wait() as no signal was involved so the 3202 # child process isn't actually going to exit rapidly. 
3203 mock__wait.side_effect = KeyboardInterrupt 3204 with mock.patch.object(subprocess, "Popen", 3205 self.RecordingPopen): 3206 with self.assertRaises(KeyboardInterrupt): 3207 popener([sys.executable, "-c", 3208 "import time\ntime.sleep(9)\nimport sys\n" 3209 "sys.stderr.write('\\n!runaway child!\\n')"], 3210 stdout=subprocess.DEVNULL, **kwargs) 3211 for call in mock__wait.call_args_list[1:]: 3212 self.assertNotEqual( 3213 call, mock.call(timeout=None), 3214 "no open-ended wait() after the first allowed: " 3215 f"{mock__wait.call_args_list}") 3216 sigint_calls = [] 3217 for call in mock__wait.call_args_list: 3218 if call == mock.call(timeout=0.25): # from Popen.__init__ 3219 sigint_calls.append(call) 3220 self.assertLessEqual(mock__wait.call_count, 2, 3221 msg=mock__wait.call_args_list) 3222 self.assertEqual(len(sigint_calls), 1, 3223 msg=mock__wait.call_args_list) 3224 finally: 3225 # cleanup the forgotten (due to our mocks) child process 3226 process = self.RecordingPopen.instances_created.pop() 3227 process.kill() 3228 process.wait() 3229 self.assertEqual([], self.RecordingPopen.instances_created) 3230 3231 def test_call_keyboardinterrupt_no_kill(self): 3232 self._test_keyboardinterrupt_no_kill(subprocess.call, timeout=6.282) 3233 3234 def test_run_keyboardinterrupt_no_kill(self): 3235 self._test_keyboardinterrupt_no_kill(subprocess.run, timeout=6.282) 3236 3237 def test_context_manager_keyboardinterrupt_no_kill(self): 3238 def popen_via_context_manager(*args, **kwargs): 3239 with subprocess.Popen(*args, **kwargs) as unused_process: 3240 raise KeyboardInterrupt # Test how __exit__ handles ^C. 
3241 self._test_keyboardinterrupt_no_kill(popen_via_context_manager) 3242 3243 def test_getoutput(self): 3244 self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy') 3245 self.assertEqual(subprocess.getstatusoutput('echo xyzzy'), 3246 (0, 'xyzzy')) 3247 3248 # we use mkdtemp in the next line to create an empty directory 3249 # under our exclusive control; from that, we can invent a pathname 3250 # that we _know_ won't exist. This is guaranteed to fail. 3251 dir = None 3252 try: 3253 dir = tempfile.mkdtemp() 3254 name = os.path.join(dir, "foo") 3255 status, output = subprocess.getstatusoutput( 3256 ("type " if mswindows else "cat ") + name) 3257 self.assertNotEqual(status, 0) 3258 finally: 3259 if dir is not None: 3260 os.rmdir(dir) 3261 3262 def test__all__(self): 3263 """Ensure that __all__ is populated properly.""" 3264 intentionally_excluded = {"list2cmdline", "Handle"} 3265 exported = set(subprocess.__all__) 3266 possible_exports = set() 3267 import types 3268 for name, value in subprocess.__dict__.items(): 3269 if name.startswith('_'): 3270 continue 3271 if isinstance(value, (types.ModuleType,)): 3272 continue 3273 possible_exports.add(name) 3274 self.assertEqual(exported, possible_exports - intentionally_excluded) 3275 3276 3277@unittest.skipUnless(hasattr(selectors, 'PollSelector'), 3278 "Test needs selectors.PollSelector") 3279class ProcessTestCaseNoPoll(ProcessTestCase): 3280 def setUp(self): 3281 self.orig_selector = subprocess._PopenSelector 3282 subprocess._PopenSelector = selectors.SelectSelector 3283 ProcessTestCase.setUp(self) 3284 3285 def tearDown(self): 3286 subprocess._PopenSelector = self.orig_selector 3287 ProcessTestCase.tearDown(self) 3288 3289 3290@unittest.skipUnless(mswindows, "Windows-specific tests") 3291class CommandsWithSpaces (BaseTestCase): 3292 3293 def setUp(self): 3294 super().setUp() 3295 f, fname = tempfile.mkstemp(".py", "te st") 3296 self.fname = fname.lower () 3297 os.write(f, b"import sys;" 3298 b"sys.stdout.write('%d 
%s' % (len(sys.argv), [a.lower () for a in sys.argv]))" 3299 ) 3300 os.close(f) 3301 3302 def tearDown(self): 3303 os.remove(self.fname) 3304 super().tearDown() 3305 3306 def with_spaces(self, *args, **kwargs): 3307 kwargs['stdout'] = subprocess.PIPE 3308 p = subprocess.Popen(*args, **kwargs) 3309 with p: 3310 self.assertEqual( 3311 p.stdout.read ().decode("mbcs"), 3312 "2 [%r, 'ab cd']" % self.fname 3313 ) 3314 3315 def test_shell_string_with_spaces(self): 3316 # call() function with string argument with spaces on Windows 3317 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 3318 "ab cd"), shell=1) 3319 3320 def test_shell_sequence_with_spaces(self): 3321 # call() function with sequence argument with spaces on Windows 3322 self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1) 3323 3324 def test_noshell_string_with_spaces(self): 3325 # call() function with string argument with spaces on Windows 3326 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 3327 "ab cd")) 3328 3329 def test_noshell_sequence_with_spaces(self): 3330 # call() function with sequence argument with spaces on Windows 3331 self.with_spaces([sys.executable, self.fname, "ab cd"]) 3332 3333 3334class ContextManagerTests(BaseTestCase): 3335 3336 def test_pipe(self): 3337 with subprocess.Popen([sys.executable, "-c", 3338 "import sys;" 3339 "sys.stdout.write('stdout');" 3340 "sys.stderr.write('stderr');"], 3341 stdout=subprocess.PIPE, 3342 stderr=subprocess.PIPE) as proc: 3343 self.assertEqual(proc.stdout.read(), b"stdout") 3344 self.assertStderrEqual(proc.stderr.read(), b"stderr") 3345 3346 self.assertTrue(proc.stdout.closed) 3347 self.assertTrue(proc.stderr.closed) 3348 3349 def test_returncode(self): 3350 with subprocess.Popen([sys.executable, "-c", 3351 "import sys; sys.exit(100)"]) as proc: 3352 pass 3353 # __exit__ calls wait(), so the returncode should be set 3354 self.assertEqual(proc.returncode, 100) 3355 3356 def test_communicate_stdin(self): 3357 with 
subprocess.Popen([sys.executable, "-c", 3358 "import sys;" 3359 "sys.exit(sys.stdin.read() == 'context')"], 3360 stdin=subprocess.PIPE) as proc: 3361 proc.communicate(b"context") 3362 self.assertEqual(proc.returncode, 1) 3363 3364 def test_invalid_args(self): 3365 with self.assertRaises(NONEXISTING_ERRORS): 3366 with subprocess.Popen(NONEXISTING_CMD, 3367 stdout=subprocess.PIPE, 3368 stderr=subprocess.PIPE) as proc: 3369 pass 3370 3371 def test_broken_pipe_cleanup(self): 3372 """Broken pipe error should not prevent wait() (Issue 21619)""" 3373 proc = subprocess.Popen(ZERO_RETURN_CMD, 3374 stdin=subprocess.PIPE, 3375 bufsize=support.PIPE_MAX_SIZE*2) 3376 proc = proc.__enter__() 3377 # Prepare to send enough data to overflow any OS pipe buffering and 3378 # guarantee a broken pipe error. Data is held in BufferedWriter 3379 # buffer until closed. 3380 proc.stdin.write(b'x' * support.PIPE_MAX_SIZE) 3381 self.assertIsNone(proc.returncode) 3382 # EPIPE expected under POSIX; EINVAL under Windows 3383 self.assertRaises(OSError, proc.__exit__, None, None, None) 3384 self.assertEqual(proc.returncode, 0) 3385 self.assertTrue(proc.stdin.closed) 3386 3387 3388if __name__ == "__main__": 3389 unittest.main() 3390