1import unittest 2from unittest import mock 3from test import support 4import subprocess 5import sys 6import platform 7import signal 8import io 9import os 10import errno 11import tempfile 12import time 13import selectors 14import sysconfig 15import select 16import shutil 17import gc 18import textwrap 19 20try: 21 import ctypes 22except ImportError: 23 ctypes = None 24 25try: 26 import threading 27except ImportError: 28 threading = None 29 30if support.PGO: 31 raise unittest.SkipTest("test is not helpful for PGO") 32 33mswindows = (sys.platform == "win32") 34 35# 36# Depends on the following external programs: Python 37# 38 39if mswindows: 40 SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), ' 41 'os.O_BINARY);') 42else: 43 SETBINARY = '' 44 45 46class BaseTestCase(unittest.TestCase): 47 def setUp(self): 48 # Try to minimize the number of children we have so this test 49 # doesn't crash on some buildbots (Alphas in particular). 50 support.reap_children() 51 52 def tearDown(self): 53 for inst in subprocess._active: 54 inst.wait() 55 subprocess._cleanup() 56 self.assertFalse(subprocess._active, "subprocess._active not empty") 57 58 def assertStderrEqual(self, stderr, expected, msg=None): 59 # In a debug build, stuff like "[6580 refs]" is printed to stderr at 60 # shutdown time. That frustrates tests trying to check stderr produced 61 # from a spawned Python process. 62 actual = support.strip_python_stderr(stderr) 63 # strip_python_stderr also strips whitespace, so we do too. 64 expected = expected.strip() 65 self.assertEqual(actual, expected, msg) 66 67 68class PopenTestException(Exception): 69 pass 70 71 72class PopenExecuteChildRaises(subprocess.Popen): 73 """Popen subclass for testing cleanup of subprocess.PIPE filehandles when 74 _execute_child fails. 75 """ 76 def _execute_child(self, *args, **kwargs): 77 raise PopenTestException("Forced Exception for Test") 78 79 80class ProcessTestCase(BaseTestCase): 81 82 def test_io_buffered_by_default(self): 83 p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"], 84 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 85 stderr=subprocess.PIPE) 86 try: 87 self.assertIsInstance(p.stdin, io.BufferedIOBase) 88 self.assertIsInstance(p.stdout, io.BufferedIOBase) 89 self.assertIsInstance(p.stderr, io.BufferedIOBase) 90 finally: 91 p.stdin.close() 92 p.stdout.close() 93 p.stderr.close() 94 p.wait() 95 96 def test_io_unbuffered_works(self): 97 p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"], 98 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 99 stderr=subprocess.PIPE, bufsize=0) 100 try: 101 self.assertIsInstance(p.stdin, io.RawIOBase) 102 self.assertIsInstance(p.stdout, io.RawIOBase) 103 self.assertIsInstance(p.stderr, io.RawIOBase) 104 finally: 105 p.stdin.close() 106 p.stdout.close() 107 p.stderr.close() 108 p.wait() 109 110 def test_call_seq(self): 111 # call() function with sequence argument 112 rc = subprocess.call([sys.executable, "-c", 113 "import sys; sys.exit(47)"]) 114 self.assertEqual(rc, 47) 115 116 def test_call_timeout(self): 117 # call() function with timeout argument; we want to test that the child 118 # process gets killed when the timeout expires. If the child isn't 119 # killed, this call will deadlock since subprocess.call waits for the 120 # child. 121 self.assertRaises(subprocess.TimeoutExpired, subprocess.call, 122 [sys.executable, "-c", "while True: pass"], 123 timeout=0.1) 124 125 def test_check_call_zero(self): 126 # check_call() function with zero return code 127 rc = subprocess.check_call([sys.executable, "-c", 128 "import sys; sys.exit(0)"]) 129 self.assertEqual(rc, 0) 130 131 def test_check_call_nonzero(self): 132 # check_call() function with non-zero return code 133 with self.assertRaises(subprocess.CalledProcessError) as c: 134 subprocess.check_call([sys.executable, "-c", 135 "import sys; sys.exit(47)"]) 136 self.assertEqual(c.exception.returncode, 47) 137 138 def test_check_output(self): 139 # check_output() function with zero return code 140 output = subprocess.check_output( 141 [sys.executable, "-c", "print('BDFL')"]) 142 self.assertIn(b'BDFL', output) 143 144 def test_check_output_nonzero(self): 145 # check_call() function with non-zero return code 146 with self.assertRaises(subprocess.CalledProcessError) as c: 147 subprocess.check_output( 148 [sys.executable, "-c", "import sys; sys.exit(5)"]) 149 self.assertEqual(c.exception.returncode, 5) 150 151 def test_check_output_stderr(self): 152 # check_output() function stderr redirected to stdout 153 output = subprocess.check_output( 154 [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"], 155 stderr=subprocess.STDOUT) 156 self.assertIn(b'BDFL', output) 157 158 def test_check_output_stdin_arg(self): 159 # check_output() can be called with stdin set to a file 160 tf = tempfile.TemporaryFile() 161 self.addCleanup(tf.close) 162 tf.write(b'pear') 163 tf.seek(0) 164 output = subprocess.check_output( 165 [sys.executable, "-c", 166 "import sys; sys.stdout.write(sys.stdin.read().upper())"], 167 stdin=tf) 168 self.assertIn(b'PEAR', output) 169 170 def test_check_output_input_arg(self): 171 # check_output() can be called with input set to a string 172 output = subprocess.check_output( 173 [sys.executable, "-c", 174 "import sys; sys.stdout.write(sys.stdin.read().upper())"], 175 input=b'pear') 176 self.assertIn(b'PEAR', output) 177 178 def test_check_output_stdout_arg(self): 179 # check_output() refuses to accept 'stdout' argument 180 with self.assertRaises(ValueError) as c: 181 output = subprocess.check_output( 182 [sys.executable, "-c", "print('will not be run')"], 183 stdout=sys.stdout) 184 self.fail("Expected ValueError when stdout arg supplied.") 185 self.assertIn('stdout', c.exception.args[0]) 186 187 def test_check_output_stdin_with_input_arg(self): 188 # check_output() refuses to accept 'stdin' with 'input' 189 tf = tempfile.TemporaryFile() 190 self.addCleanup(tf.close) 191 tf.write(b'pear') 192 tf.seek(0) 193 with self.assertRaises(ValueError) as c: 194 output = subprocess.check_output( 195 [sys.executable, "-c", "print('will not be run')"], 196 stdin=tf, input=b'hare') 197 self.fail("Expected ValueError when stdin and input args supplied.") 198 self.assertIn('stdin', c.exception.args[0]) 199 self.assertIn('input', c.exception.args[0]) 200 201 def test_check_output_timeout(self): 202 # check_output() function with timeout arg 203 with self.assertRaises(subprocess.TimeoutExpired) as c: 204 output = subprocess.check_output( 205 [sys.executable, "-c", 206 "import sys, time\n" 207 "sys.stdout.write('BDFL')\n" 208 "sys.stdout.flush()\n" 209 "time.sleep(3600)"], 210 # Some heavily loaded buildbots (sparc Debian 3.x) require 211 # this much time to start and print. 212 timeout=3) 213 self.fail("Expected TimeoutExpired.") 214 self.assertEqual(c.exception.output, b'BDFL') 215 216 def test_call_kwargs(self): 217 # call() function with keyword args 218 newenv = os.environ.copy() 219 newenv["FRUIT"] = "banana" 220 rc = subprocess.call([sys.executable, "-c", 221 'import sys, os;' 222 'sys.exit(os.getenv("FRUIT")=="banana")'], 223 env=newenv) 224 self.assertEqual(rc, 1) 225 226 def test_invalid_args(self): 227 # Popen() called with invalid arguments should raise TypeError 228 # but Popen.__del__ should not complain (issue #12085) 229 with support.captured_stderr() as s: 230 self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1) 231 argcount = subprocess.Popen.__init__.__code__.co_argcount 232 too_many_args = [0] * (argcount + 1) 233 self.assertRaises(TypeError, subprocess.Popen, *too_many_args) 234 self.assertEqual(s.getvalue(), '') 235 236 def test_stdin_none(self): 237 # .stdin is None when not redirected 238 p = subprocess.Popen([sys.executable, "-c", 'print("banana")'], 239 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 240 self.addCleanup(p.stdout.close) 241 self.addCleanup(p.stderr.close) 242 p.wait() 243 self.assertEqual(p.stdin, None) 244 245 def test_stdout_none(self): 246 # .stdout is None when not redirected, and the child's stdout will 247 # be inherited from the parent. In order to test this we run a 248 # subprocess in a subprocess: 249 # this_test 250 # \-- subprocess created by this test (parent) 251 # \-- subprocess created by the parent subprocess (child) 252 # The parent doesn't specify stdout, so the child will use the 253 # parent's stdout. This test checks that the message printed by the 254 # child goes to the parent stdout. The parent also checks that the 255 # child's stdout is None. See #11963. 256 code = ('import sys; from subprocess import Popen, PIPE;' 257 'p = Popen([sys.executable, "-c", "print(\'test_stdout_none\')"],' 258 ' stdin=PIPE, stderr=PIPE);' 259 'p.wait(); assert p.stdout is None;') 260 p = subprocess.Popen([sys.executable, "-c", code], 261 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 262 self.addCleanup(p.stdout.close) 263 self.addCleanup(p.stderr.close) 264 out, err = p.communicate() 265 self.assertEqual(p.returncode, 0, err) 266 self.assertEqual(out.rstrip(), b'test_stdout_none') 267 268 def test_stderr_none(self): 269 # .stderr is None when not redirected 270 p = subprocess.Popen([sys.executable, "-c", 'print("banana")'], 271 stdin=subprocess.PIPE, stdout=subprocess.PIPE) 272 self.addCleanup(p.stdout.close) 273 self.addCleanup(p.stdin.close) 274 p.wait() 275 self.assertEqual(p.stderr, None) 276 277 def _assert_python(self, pre_args, **kwargs): 278 # We include sys.exit() to prevent the test runner from hanging 279 # whenever python is found. 280 args = pre_args + ["import sys; sys.exit(47)"] 281 p = subprocess.Popen(args, **kwargs) 282 p.wait() 283 self.assertEqual(47, p.returncode) 284 285 def test_executable(self): 286 # Check that the executable argument works. 287 # 288 # On Unix (non-Mac and non-Windows), Python looks at args[0] to 289 # determine where its standard library is, so we need the directory 290 # of args[0] to be valid for the Popen() call to Python to succeed. 291 # See also issue #16170 and issue #7774. 292 doesnotexist = os.path.join(os.path.dirname(sys.executable), 293 "doesnotexist") 294 self._assert_python([doesnotexist, "-c"], executable=sys.executable) 295 296 def test_executable_takes_precedence(self): 297 # Check that the executable argument takes precedence over args[0]. 298 # 299 # Verify first that the call succeeds without the executable arg. 300 pre_args = [sys.executable, "-c"] 301 self._assert_python(pre_args) 302 self.assertRaises((FileNotFoundError, PermissionError), 303 self._assert_python, pre_args, 304 executable="doesnotexist") 305 306 @unittest.skipIf(mswindows, "executable argument replaces shell") 307 def test_executable_replaces_shell(self): 308 # Check that the executable argument replaces the default shell 309 # when shell=True. 310 self._assert_python([], executable=sys.executable, shell=True) 311 312 # For use in the test_cwd* tests below. 313 def _normalize_cwd(self, cwd): 314 # Normalize an expected cwd (for Tru64 support). 315 # We can't use os.path.realpath since it doesn't expand Tru64 {memb} 316 # strings. See bug #1063571. 317 with support.change_cwd(cwd): 318 return os.getcwd() 319 320 # For use in the test_cwd* tests below. 321 def _split_python_path(self): 322 # Return normalized (python_dir, python_base). 323 python_path = os.path.realpath(sys.executable) 324 return os.path.split(python_path) 325 326 # For use in the test_cwd* tests below. 327 def _assert_cwd(self, expected_cwd, python_arg, **kwargs): 328 # Invoke Python via Popen, and assert that (1) the call succeeds, 329 # and that (2) the current working directory of the child process 330 # matches *expected_cwd*. 331 p = subprocess.Popen([python_arg, "-c", 332 "import os, sys; " 333 "sys.stdout.write(os.getcwd()); " 334 "sys.exit(47)"], 335 stdout=subprocess.PIPE, 336 **kwargs) 337 self.addCleanup(p.stdout.close) 338 p.wait() 339 self.assertEqual(47, p.returncode) 340 normcase = os.path.normcase 341 self.assertEqual(normcase(expected_cwd), 342 normcase(p.stdout.read().decode("utf-8"))) 343 344 def test_cwd(self): 345 # Check that cwd changes the cwd for the child process. 346 temp_dir = tempfile.gettempdir() 347 temp_dir = self._normalize_cwd(temp_dir) 348 self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir) 349 350 def test_cwd_with_pathlike(self): 351 temp_dir = tempfile.gettempdir() 352 temp_dir = self._normalize_cwd(temp_dir) 353 354 class _PathLikeObj: 355 def __fspath__(self): 356 return temp_dir 357 358 self._assert_cwd(temp_dir, sys.executable, cwd=_PathLikeObj()) 359 360 @unittest.skipIf(mswindows, "pending resolution of issue #15533") 361 def test_cwd_with_relative_arg(self): 362 # Check that Popen looks for args[0] relative to cwd if args[0] 363 # is relative. 364 python_dir, python_base = self._split_python_path() 365 rel_python = os.path.join(os.curdir, python_base) 366 with support.temp_cwd() as wrong_dir: 367 # Before calling with the correct cwd, confirm that the call fails 368 # without cwd and with the wrong cwd. 369 self.assertRaises(FileNotFoundError, subprocess.Popen, 370 [rel_python]) 371 self.assertRaises(FileNotFoundError, subprocess.Popen, 372 [rel_python], cwd=wrong_dir) 373 python_dir = self._normalize_cwd(python_dir) 374 self._assert_cwd(python_dir, rel_python, cwd=python_dir) 375 376 @unittest.skipIf(mswindows, "pending resolution of issue #15533") 377 def test_cwd_with_relative_executable(self): 378 # Check that Popen looks for executable relative to cwd if executable 379 # is relative (and that executable takes precedence over args[0]). 380 python_dir, python_base = self._split_python_path() 381 rel_python = os.path.join(os.curdir, python_base) 382 doesntexist = "somethingyoudonthave" 383 with support.temp_cwd() as wrong_dir: 384 # Before calling with the correct cwd, confirm that the call fails 385 # without cwd and with the wrong cwd. 386 self.assertRaises(FileNotFoundError, subprocess.Popen, 387 [doesntexist], executable=rel_python) 388 self.assertRaises(FileNotFoundError, subprocess.Popen, 389 [doesntexist], executable=rel_python, 390 cwd=wrong_dir) 391 python_dir = self._normalize_cwd(python_dir) 392 self._assert_cwd(python_dir, doesntexist, executable=rel_python, 393 cwd=python_dir) 394 395 def test_cwd_with_absolute_arg(self): 396 # Check that Popen can find the executable when the cwd is wrong 397 # if args[0] is an absolute path. 398 python_dir, python_base = self._split_python_path() 399 abs_python = os.path.join(python_dir, python_base) 400 rel_python = os.path.join(os.curdir, python_base) 401 with support.temp_dir() as wrong_dir: 402 # Before calling with an absolute path, confirm that using a 403 # relative path fails. 404 self.assertRaises(FileNotFoundError, subprocess.Popen, 405 [rel_python], cwd=wrong_dir) 406 wrong_dir = self._normalize_cwd(wrong_dir) 407 self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir) 408 409 @unittest.skipIf(sys.base_prefix != sys.prefix, 410 'Test is not venv-compatible') 411 def test_executable_with_cwd(self): 412 python_dir, python_base = self._split_python_path() 413 python_dir = self._normalize_cwd(python_dir) 414 self._assert_cwd(python_dir, "somethingyoudonthave", 415 executable=sys.executable, cwd=python_dir) 416 417 @unittest.skipIf(sys.base_prefix != sys.prefix, 418 'Test is not venv-compatible') 419 @unittest.skipIf(sysconfig.is_python_build(), 420 "need an installed Python. See #7774") 421 def test_executable_without_cwd(self): 422 # For a normal installation, it should work without 'cwd' 423 # argument. For test runs in the build directory, see #7774. 424 self._assert_cwd(os.getcwd(), "somethingyoudonthave", 425 executable=sys.executable) 426 427 def test_stdin_pipe(self): 428 # stdin redirection 429 p = subprocess.Popen([sys.executable, "-c", 430 'import sys; sys.exit(sys.stdin.read() == "pear")'], 431 stdin=subprocess.PIPE) 432 p.stdin.write(b"pear") 433 p.stdin.close() 434 p.wait() 435 self.assertEqual(p.returncode, 1) 436 437 def test_stdin_filedes(self): 438 # stdin is set to open file descriptor 439 tf = tempfile.TemporaryFile() 440 self.addCleanup(tf.close) 441 d = tf.fileno() 442 os.write(d, b"pear") 443 os.lseek(d, 0, 0) 444 p = subprocess.Popen([sys.executable, "-c", 445 'import sys; sys.exit(sys.stdin.read() == "pear")'], 446 stdin=d) 447 p.wait() 448 self.assertEqual(p.returncode, 1) 449 450 def test_stdin_fileobj(self): 451 # stdin is set to open file object 452 tf = tempfile.TemporaryFile() 453 self.addCleanup(tf.close) 454 tf.write(b"pear") 455 tf.seek(0) 456 p = subprocess.Popen([sys.executable, "-c", 457 'import sys; sys.exit(sys.stdin.read() == "pear")'], 458 stdin=tf) 459 p.wait() 460 self.assertEqual(p.returncode, 1) 461 462 def test_stdout_pipe(self): 463 # stdout redirection 464 p = subprocess.Popen([sys.executable, "-c", 465 'import sys; sys.stdout.write("orange")'], 466 stdout=subprocess.PIPE) 467 with p: 468 self.assertEqual(p.stdout.read(), b"orange") 469 470 def test_stdout_filedes(self): 471 # stdout is set to open file descriptor 472 tf = tempfile.TemporaryFile() 473 self.addCleanup(tf.close) 474 d = tf.fileno() 475 p = subprocess.Popen([sys.executable, "-c", 476 'import sys; sys.stdout.write("orange")'], 477 stdout=d) 478 p.wait() 479 os.lseek(d, 0, 0) 480 self.assertEqual(os.read(d, 1024), b"orange") 481 482 def test_stdout_fileobj(self): 483 # stdout is set to open file object 484 tf = tempfile.TemporaryFile() 485 self.addCleanup(tf.close) 486 p = subprocess.Popen([sys.executable, "-c", 487 'import sys; sys.stdout.write("orange")'], 488 stdout=tf) 489 p.wait() 490 tf.seek(0) 491 self.assertEqual(tf.read(), b"orange") 492 493 def test_stderr_pipe(self): 494 # stderr redirection 495 p = subprocess.Popen([sys.executable, "-c", 496 'import sys; sys.stderr.write("strawberry")'], 497 stderr=subprocess.PIPE) 498 with p: 499 self.assertStderrEqual(p.stderr.read(), b"strawberry") 500 501 def test_stderr_filedes(self): 502 # stderr is set to open file descriptor 503 tf = tempfile.TemporaryFile() 504 self.addCleanup(tf.close) 505 d = tf.fileno() 506 p = subprocess.Popen([sys.executable, "-c", 507 'import sys; sys.stderr.write("strawberry")'], 508 stderr=d) 509 p.wait() 510 os.lseek(d, 0, 0) 511 self.assertStderrEqual(os.read(d, 1024), b"strawberry") 512 513 def test_stderr_fileobj(self): 514 # stderr is set to open file object 515 tf = tempfile.TemporaryFile() 516 self.addCleanup(tf.close) 517 p = subprocess.Popen([sys.executable, "-c", 518 'import sys; sys.stderr.write("strawberry")'], 519 stderr=tf) 520 p.wait() 521 tf.seek(0) 522 self.assertStderrEqual(tf.read(), b"strawberry") 523 524 def test_stderr_redirect_with_no_stdout_redirect(self): 525 # test stderr=STDOUT while stdout=None (not set) 526 527 # - grandchild prints to stderr 528 # - child redirects grandchild's stderr to its stdout 529 # - the parent should get grandchild's stderr in child's stdout 530 p = subprocess.Popen([sys.executable, "-c", 531 'import sys, subprocess;' 532 'rc = subprocess.call([sys.executable, "-c",' 533 ' "import sys;"' 534 ' "sys.stderr.write(\'42\')"],' 535 ' stderr=subprocess.STDOUT);' 536 'sys.exit(rc)'], 537 stdout=subprocess.PIPE, 538 stderr=subprocess.PIPE) 539 stdout, stderr = p.communicate() 540 #NOTE: stdout should get stderr from grandchild 541 self.assertStderrEqual(stdout, b'42') 542 self.assertStderrEqual(stderr, b'') # should be empty 543 self.assertEqual(p.returncode, 0) 544 545 def test_stdout_stderr_pipe(self): 546 # capture stdout and stderr to the same pipe 547 p = subprocess.Popen([sys.executable, "-c", 548 'import sys;' 549 'sys.stdout.write("apple");' 550 'sys.stdout.flush();' 551 'sys.stderr.write("orange")'], 552 stdout=subprocess.PIPE, 553 stderr=subprocess.STDOUT) 554 with p: 555 self.assertStderrEqual(p.stdout.read(), b"appleorange") 556 557 def test_stdout_stderr_file(self): 558 # capture stdout and stderr to the same open file 559 tf = tempfile.TemporaryFile() 560 self.addCleanup(tf.close) 561 p = subprocess.Popen([sys.executable, "-c", 562 'import sys;' 563 'sys.stdout.write("apple");' 564 'sys.stdout.flush();' 565 'sys.stderr.write("orange")'], 566 stdout=tf, 567 stderr=tf) 568 p.wait() 569 tf.seek(0) 570 self.assertStderrEqual(tf.read(), b"appleorange") 571 572 def test_stdout_filedes_of_stdout(self): 573 # stdout is set to 1 (#1531862). 574 # To avoid printing the text on stdout, we do something similar to 575 # test_stdout_none (see above). The parent subprocess calls the child 576 # subprocess passing stdout=1, and this test uses stdout=PIPE in 577 # order to capture and check the output of the parent. See #11963. 578 code = ('import sys, subprocess; ' 579 'rc = subprocess.call([sys.executable, "-c", ' 580 ' "import os, sys; sys.exit(os.write(sys.stdout.fileno(), ' 581 'b\'test with stdout=1\'))"], stdout=1); ' 582 'assert rc == 18') 583 p = subprocess.Popen([sys.executable, "-c", code], 584 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 585 self.addCleanup(p.stdout.close) 586 self.addCleanup(p.stderr.close) 587 out, err = p.communicate() 588 self.assertEqual(p.returncode, 0, err) 589 self.assertEqual(out.rstrip(), b'test with stdout=1') 590 591 def test_stdout_devnull(self): 592 p = subprocess.Popen([sys.executable, "-c", 593 'for i in range(10240):' 594 'print("x" * 1024)'], 595 stdout=subprocess.DEVNULL) 596 p.wait() 597 self.assertEqual(p.stdout, None) 598 599 def test_stderr_devnull(self): 600 p = subprocess.Popen([sys.executable, "-c", 601 'import sys\n' 602 'for i in range(10240):' 603 'sys.stderr.write("x" * 1024)'], 604 stderr=subprocess.DEVNULL) 605 p.wait() 606 self.assertEqual(p.stderr, None) 607 608 def test_stdin_devnull(self): 609 p = subprocess.Popen([sys.executable, "-c", 610 'import sys;' 611 'sys.stdin.read(1)'], 612 stdin=subprocess.DEVNULL) 613 p.wait() 614 self.assertEqual(p.stdin, None) 615 616 def test_env(self): 617 newenv = os.environ.copy() 618 newenv["FRUIT"] = "orange" 619 with subprocess.Popen([sys.executable, "-c", 620 'import sys,os;' 621 'sys.stdout.write(os.getenv("FRUIT"))'], 622 stdout=subprocess.PIPE, 623 env=newenv) as p: 624 stdout, stderr = p.communicate() 625 self.assertEqual(stdout, b"orange") 626 627 # Windows requires at least the SYSTEMROOT environment variable to start 628 # Python 629 @unittest.skipIf(sys.platform == 'win32', 630 'cannot test an empty env on Windows') 631 @unittest.skipIf(sysconfig.get_config_var('Py_ENABLE_SHARED') is not None, 632 'the python library cannot be loaded ' 633 'with an empty environment') 634 def test_empty_env(self): 635 with subprocess.Popen([sys.executable, "-c", 636 'import os; ' 637 'print(list(os.environ.keys()))'], 638 stdout=subprocess.PIPE, 639 env={}) as p: 640 stdout, stderr = p.communicate() 641 self.assertIn(stdout.strip(), 642 (b"[]", 643 # Mac OS X adds __CF_USER_TEXT_ENCODING variable to an empty 644 # environment 645 b"['__CF_USER_TEXT_ENCODING']")) 646 647 def test_communicate_stdin(self): 648 p = subprocess.Popen([sys.executable, "-c", 649 'import sys;' 650 'sys.exit(sys.stdin.read() == "pear")'], 651 stdin=subprocess.PIPE) 652 p.communicate(b"pear") 653 self.assertEqual(p.returncode, 1) 654 655 def test_communicate_stdout(self): 656 p = subprocess.Popen([sys.executable, "-c", 657 'import sys; sys.stdout.write("pineapple")'], 658 stdout=subprocess.PIPE) 659 (stdout, stderr) = p.communicate() 660 self.assertEqual(stdout, b"pineapple") 661 self.assertEqual(stderr, None) 662 663 def test_communicate_stderr(self): 664 p = subprocess.Popen([sys.executable, "-c", 665 'import sys; sys.stderr.write("pineapple")'], 666 stderr=subprocess.PIPE) 667 (stdout, stderr) = p.communicate() 668 self.assertEqual(stdout, None) 669 self.assertStderrEqual(stderr, b"pineapple") 670 671 def test_communicate(self): 672 p = subprocess.Popen([sys.executable, "-c", 673 'import sys,os;' 674 'sys.stderr.write("pineapple");' 675 'sys.stdout.write(sys.stdin.read())'], 676 stdin=subprocess.PIPE, 677 stdout=subprocess.PIPE, 678 stderr=subprocess.PIPE) 679 self.addCleanup(p.stdout.close) 680 self.addCleanup(p.stderr.close) 681 self.addCleanup(p.stdin.close) 682 (stdout, stderr) = p.communicate(b"banana") 683 self.assertEqual(stdout, b"banana") 684 self.assertStderrEqual(stderr, b"pineapple") 685 686 def test_communicate_timeout(self): 687 p = subprocess.Popen([sys.executable, "-c", 688 'import sys,os,time;' 689 'sys.stderr.write("pineapple\\n");' 690 'time.sleep(1);' 691 'sys.stderr.write("pear\\n");' 692 'sys.stdout.write(sys.stdin.read())'], 693 universal_newlines=True, 694 stdin=subprocess.PIPE, 695 stdout=subprocess.PIPE, 696 stderr=subprocess.PIPE) 697 self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana", 698 timeout=0.3) 699 # Make sure we can keep waiting for it, and that we get the whole output 700 # after it completes. 701 (stdout, stderr) = p.communicate() 702 self.assertEqual(stdout, "banana") 703 self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n") 704 705 def test_communicate_timeout_large_output(self): 706 # Test an expiring timeout while the child is outputting lots of data. 707 p = subprocess.Popen([sys.executable, "-c", 708 'import sys,os,time;' 709 'sys.stdout.write("a" * (64 * 1024));' 710 'time.sleep(0.2);' 711 'sys.stdout.write("a" * (64 * 1024));' 712 'time.sleep(0.2);' 713 'sys.stdout.write("a" * (64 * 1024));' 714 'time.sleep(0.2);' 715 'sys.stdout.write("a" * (64 * 1024));'], 716 stdout=subprocess.PIPE) 717 self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4) 718 (stdout, _) = p.communicate() 719 self.assertEqual(len(stdout), 4 * 64 * 1024) 720 721 # Test for the fd leak reported in http://bugs.python.org/issue2791. 722 def test_communicate_pipe_fd_leak(self): 723 for stdin_pipe in (False, True): 724 for stdout_pipe in (False, True): 725 for stderr_pipe in (False, True): 726 options = {} 727 if stdin_pipe: 728 options['stdin'] = subprocess.PIPE 729 if stdout_pipe: 730 options['stdout'] = subprocess.PIPE 731 if stderr_pipe: 732 options['stderr'] = subprocess.PIPE 733 if not options: 734 continue 735 p = subprocess.Popen((sys.executable, "-c", "pass"), **options) 736 p.communicate() 737 if p.stdin is not None: 738 self.assertTrue(p.stdin.closed) 739 if p.stdout is not None: 740 self.assertTrue(p.stdout.closed) 741 if p.stderr is not None: 742 self.assertTrue(p.stderr.closed) 743 744 def test_communicate_returns(self): 745 # communicate() should return None if no redirection is active 746 p = subprocess.Popen([sys.executable, "-c", 747 "import sys; sys.exit(47)"]) 748 (stdout, stderr) = p.communicate() 749 self.assertEqual(stdout, None) 750 self.assertEqual(stderr, None) 751 752 def test_communicate_pipe_buf(self): 753 # communicate() with writes larger than pipe_buf 754 # This test will probably deadlock rather than fail, if 755 # communicate() does not work properly. 756 x, y = os.pipe() 757 os.close(x) 758 os.close(y) 759 p = subprocess.Popen([sys.executable, "-c", 760 'import sys,os;' 761 'sys.stdout.write(sys.stdin.read(47));' 762 'sys.stderr.write("x" * %d);' 763 'sys.stdout.write(sys.stdin.read())' % 764 support.PIPE_MAX_SIZE], 765 stdin=subprocess.PIPE, 766 stdout=subprocess.PIPE, 767 stderr=subprocess.PIPE) 768 self.addCleanup(p.stdout.close) 769 self.addCleanup(p.stderr.close) 770 self.addCleanup(p.stdin.close) 771 string_to_write = b"a" * support.PIPE_MAX_SIZE 772 (stdout, stderr) = p.communicate(string_to_write) 773 self.assertEqual(stdout, string_to_write) 774 775 def test_writes_before_communicate(self): 776 # stdin.write before communicate() 777 p = subprocess.Popen([sys.executable, "-c", 778 'import sys,os;' 779 'sys.stdout.write(sys.stdin.read())'], 780 stdin=subprocess.PIPE, 781 stdout=subprocess.PIPE, 782 stderr=subprocess.PIPE) 783 self.addCleanup(p.stdout.close) 784 self.addCleanup(p.stderr.close) 785 self.addCleanup(p.stdin.close) 786 p.stdin.write(b"banana") 787 (stdout, stderr) = p.communicate(b"split") 788 self.assertEqual(stdout, b"bananasplit") 789 self.assertStderrEqual(stderr, b"") 790 791 def test_universal_newlines(self): 792 p = subprocess.Popen([sys.executable, "-c", 793 'import sys,os;' + SETBINARY + 794 'buf = sys.stdout.buffer;' 795 'buf.write(sys.stdin.readline().encode());' 796 'buf.flush();' 797 'buf.write(b"line2\\n");' 798 'buf.flush();' 799 'buf.write(sys.stdin.read().encode());' 800 'buf.flush();' 801 'buf.write(b"line4\\n");' 802 'buf.flush();' 803 'buf.write(b"line5\\r\\n");' 804 'buf.flush();' 805 'buf.write(b"line6\\r");' 806 'buf.flush();' 807 'buf.write(b"\\nline7");' 808 'buf.flush();' 809 'buf.write(b"\\nline8");'], 810 stdin=subprocess.PIPE, 811 stdout=subprocess.PIPE, 812 universal_newlines=1) 813 with p: 814 p.stdin.write("line1\n") 815 p.stdin.flush() 816 self.assertEqual(p.stdout.readline(), "line1\n") 817 p.stdin.write("line3\n") 818 p.stdin.close() 819 self.addCleanup(p.stdout.close) 820 self.assertEqual(p.stdout.readline(), 821 "line2\n") 822 self.assertEqual(p.stdout.read(6), 823 "line3\n") 824 self.assertEqual(p.stdout.read(), 825 "line4\nline5\nline6\nline7\nline8") 826 827 def test_universal_newlines_communicate(self): 828 # universal newlines through communicate() 829 p = subprocess.Popen([sys.executable, "-c", 830 'import sys,os;' + SETBINARY + 831 'buf = sys.stdout.buffer;' 832 'buf.write(b"line2\\n");' 833 'buf.flush();' 834 'buf.write(b"line4\\n");' 835 'buf.flush();' 836 'buf.write(b"line5\\r\\n");' 837 'buf.flush();' 838 'buf.write(b"line6\\r");' 839 'buf.flush();' 840 'buf.write(b"\\nline7");' 841 'buf.flush();' 842 'buf.write(b"\\nline8");'], 843 stderr=subprocess.PIPE, 844 stdout=subprocess.PIPE, 845 universal_newlines=1) 846 self.addCleanup(p.stdout.close) 847 self.addCleanup(p.stderr.close) 848 (stdout, stderr) = p.communicate() 849 self.assertEqual(stdout, 850 "line2\nline4\nline5\nline6\nline7\nline8") 851 852 def test_universal_newlines_communicate_stdin(self): 853 # universal newlines through communicate(), with only stdin 854 p = subprocess.Popen([sys.executable, "-c", 855 'import sys,os;' + SETBINARY + textwrap.dedent(''' 856 s = sys.stdin.readline() 857 assert s == "line1\\n", repr(s) 858 s = sys.stdin.read() 859 assert s == "line3\\n", repr(s) 860 ''')], 861 stdin=subprocess.PIPE, 862 universal_newlines=1) 863 (stdout, stderr) = p.communicate("line1\nline3\n") 864 self.assertEqual(p.returncode, 0) 865 866 def test_universal_newlines_communicate_input_none(self): 867 # Test communicate(input=None) with universal newlines. 868 # 869 # We set stdout to PIPE because, as of this writing, a different 870 # code path is tested when the number of pipes is zero or one. 871 p = subprocess.Popen([sys.executable, "-c", "pass"], 872 stdin=subprocess.PIPE, 873 stdout=subprocess.PIPE, 874 universal_newlines=True) 875 p.communicate() 876 self.assertEqual(p.returncode, 0) 877 878 def test_universal_newlines_communicate_stdin_stdout_stderr(self): 879 # universal newlines through communicate(), with stdin, stdout, stderr 880 p = subprocess.Popen([sys.executable, "-c", 881 'import sys,os;' + SETBINARY + textwrap.dedent(''' 882 s = sys.stdin.buffer.readline() 883 sys.stdout.buffer.write(s) 884 sys.stdout.buffer.write(b"line2\\r") 885 sys.stderr.buffer.write(b"eline2\\n") 886 s = sys.stdin.buffer.read() 887 sys.stdout.buffer.write(s) 888 sys.stdout.buffer.write(b"line4\\n") 889 sys.stdout.buffer.write(b"line5\\r\\n") 890 sys.stderr.buffer.write(b"eline6\\r") 891 sys.stderr.buffer.write(b"eline7\\r\\nz") 892 ''')], 893 stdin=subprocess.PIPE, 894 stderr=subprocess.PIPE, 895 stdout=subprocess.PIPE, 896 universal_newlines=True) 897 self.addCleanup(p.stdout.close) 898 self.addCleanup(p.stderr.close) 899 (stdout, stderr) = p.communicate("line1\nline3\n") 900 self.assertEqual(p.returncode, 0) 901 self.assertEqual("line1\nline2\nline3\nline4\nline5\n", stdout) 902 # Python debug build push something like "[42442 refs]\n" 903 # to stderr at exit of subprocess. 904 # Don't use assertStderrEqual because it strips CR and LF from output. 905 self.assertTrue(stderr.startswith("eline2\neline6\neline7\n")) 906 907 def test_universal_newlines_communicate_encodings(self): 908 # Check that universal newlines mode works for various encodings, 909 # in particular for encodings in the UTF-16 and UTF-32 families. 910 # See issue #15595. 911 # 912 # UTF-16 and UTF-32-BE are sufficient to check both with BOM and 913 # without, and UTF-16 and UTF-32. 914 for encoding in ['utf-16', 'utf-32-be']: 915 code = ("import sys; " 916 r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" % 917 encoding) 918 args = [sys.executable, '-c', code] 919 # We set stdin to be non-None because, as of this writing, 920 # a different code path is used when the number of pipes is 921 # zero or one. 922 popen = subprocess.Popen(args, 923 stdin=subprocess.PIPE, 924 stdout=subprocess.PIPE, 925 encoding=encoding) 926 stdout, stderr = popen.communicate(input='') 927 self.assertEqual(stdout, '1\n2\n3\n4') 928 929 def test_communicate_errors(self): 930 for errors, expected in [ 931 ('ignore', ''), 932 ('replace', '\ufffd\ufffd'), 933 ('surrogateescape', '\udc80\udc80'), 934 ('backslashreplace', '\\x80\\x80'), 935 ]: 936 code = ("import sys; " 937 r"sys.stdout.buffer.write(b'[\x80\x80]')") 938 args = [sys.executable, '-c', code] 939 # We set stdin to be non-None because, as of this writing, 940 # a different code path is used when the number of pipes is 941 # zero or one. 942 popen = subprocess.Popen(args, 943 stdin=subprocess.PIPE, 944 stdout=subprocess.PIPE, 945 encoding='utf-8', 946 errors=errors) 947 stdout, stderr = popen.communicate(input='') 948 self.assertEqual(stdout, '[{}]'.format(expected)) 949 950 def test_no_leaking(self): 951 # Make sure we leak no resources 952 if not mswindows: 953 max_handles = 1026 # too much for most UNIX systems 954 else: 955 max_handles = 2050 # too much for (at least some) Windows setups 956 handles = [] 957 tmpdir = tempfile.mkdtemp() 958 try: 959 for i in range(max_handles): 960 try: 961 tmpfile = os.path.join(tmpdir, support.TESTFN) 962 handles.append(os.open(tmpfile, os.O_WRONLY|os.O_CREAT)) 963 except OSError as e: 964 if e.errno != errno.EMFILE: 965 raise 966 break 967 else: 968 self.skipTest("failed to reach the file descriptor limit " 969 "(tried %d)" % max_handles) 970 # Close a couple of them (should be enough for a subprocess) 971 for i in range(10): 972 os.close(handles.pop()) 973 # Loop creating some subprocesses. If one of them leaks some fds, 974 # the next loop iteration will fail by reaching the max fd limit. 975 for i in range(15): 976 p = subprocess.Popen([sys.executable, "-c", 977 "import sys;" 978 "sys.stdout.write(sys.stdin.read())"], 979 stdin=subprocess.PIPE, 980 stdout=subprocess.PIPE, 981 stderr=subprocess.PIPE) 982 data = p.communicate(b"lime")[0] 983 self.assertEqual(data, b"lime") 984 finally: 985 for h in handles: 986 os.close(h) 987 shutil.rmtree(tmpdir) 988 989 def test_list2cmdline(self): 990 self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']), 991 '"a b c" d e') 992 self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']), 993 'ab\\"c \\ d') 994 self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']), 995 'ab\\"c " \\\\" d') 996 self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']), 997 'a\\\\\\b "de fg" h') 998 self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']), 999 'a\\\\\\"b c d') 1000 self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']), 1001 '"a\\\\b c" d e') 1002 self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']), 1003 '"a\\\\b\\ c" d e') 1004 self.assertEqual(subprocess.list2cmdline(['ab', '']), 1005 'ab ""') 1006 1007 def test_poll(self): 1008 p = subprocess.Popen([sys.executable, "-c", 1009 "import os; os.read(0, 1)"], 1010 stdin=subprocess.PIPE) 1011 self.addCleanup(p.stdin.close) 1012 self.assertIsNone(p.poll()) 1013 os.write(p.stdin.fileno(), b'A') 1014 p.wait() 1015 # Subsequent invocations should just return the returncode 1016 self.assertEqual(p.poll(), 0) 1017 1018 def test_wait(self): 1019 p = subprocess.Popen([sys.executable, "-c", "pass"]) 1020 self.assertEqual(p.wait(), 0) 1021 # Subsequent invocations should just return the returncode 1022 self.assertEqual(p.wait(), 0) 1023 1024 def test_wait_timeout(self): 1025 p = subprocess.Popen([sys.executable, 1026 "-c", "import time; time.sleep(0.3)"]) 1027 with self.assertRaises(subprocess.TimeoutExpired) as c: 1028 p.wait(timeout=0.0001) 1029 self.assertIn("0.0001", str(c.exception)) # For coverage of __str__. 1030 # Some heavily loaded buildbots (sparc Debian 3.x) require this much 1031 # time to start. 1032 self.assertEqual(p.wait(timeout=3), 0) 1033 1034 def test_wait_endtime(self): 1035 """Confirm that the deprecated endtime parameter warns.""" 1036 p = subprocess.Popen([sys.executable, "-c", "pass"]) 1037 try: 1038 with self.assertWarns(DeprecationWarning) as warn_cm: 1039 p.wait(endtime=time.time()+0.01) 1040 except subprocess.TimeoutExpired: 1041 pass # We're not testing endtime timeout behavior. 1042 finally: 1043 p.kill() 1044 self.assertIn('test_subprocess.py', warn_cm.filename) 1045 self.assertIn('endtime', str(warn_cm.warning)) 1046 1047 def test_invalid_bufsize(self): 1048 # an invalid type of the bufsize argument should raise 1049 # TypeError. 1050 with self.assertRaises(TypeError): 1051 subprocess.Popen([sys.executable, "-c", "pass"], "orange") 1052 1053 def test_bufsize_is_none(self): 1054 # bufsize=None should be the same as bufsize=0. 1055 p = subprocess.Popen([sys.executable, "-c", "pass"], None) 1056 self.assertEqual(p.wait(), 0) 1057 # Again with keyword arg 1058 p = subprocess.Popen([sys.executable, "-c", "pass"], bufsize=None) 1059 self.assertEqual(p.wait(), 0) 1060 1061 def _test_bufsize_equal_one(self, line, expected, universal_newlines): 1062 # subprocess may deadlock with bufsize=1, see issue #21332 1063 with subprocess.Popen([sys.executable, "-c", "import sys;" 1064 "sys.stdout.write(sys.stdin.readline());" 1065 "sys.stdout.flush()"], 1066 stdin=subprocess.PIPE, 1067 stdout=subprocess.PIPE, 1068 stderr=subprocess.DEVNULL, 1069 bufsize=1, 1070 universal_newlines=universal_newlines) as p: 1071 p.stdin.write(line) # expect that it flushes the line in text mode 1072 os.close(p.stdin.fileno()) # close it without flushing the buffer 1073 read_line = p.stdout.readline() 1074 try: 1075 p.stdin.close() 1076 except OSError: 1077 pass 1078 p.stdin = None 1079 self.assertEqual(p.returncode, 0) 1080 self.assertEqual(read_line, expected) 1081 1082 def test_bufsize_equal_one_text_mode(self): 1083 # line is flushed in text mode with bufsize=1. 1084 # we should get the full line in return 1085 line = "line\n" 1086 self._test_bufsize_equal_one(line, line, universal_newlines=True) 1087 1088 def test_bufsize_equal_one_binary_mode(self): 1089 # line is not flushed in binary mode with bufsize=1. 1090 # we should get empty response 1091 line = b'line' + os.linesep.encode() # assume ascii-based locale 1092 self._test_bufsize_equal_one(line, b'', universal_newlines=False) 1093 1094 def test_leaking_fds_on_error(self): 1095 # see bug #5179: Popen leaks file descriptors to PIPEs if 1096 # the child fails to execute; this will eventually exhaust 1097 # the maximum number of open fds. 1024 seems a very common 1098 # value for that limit, but Windows has 2048, so we loop 1099 # 1024 times (each call leaked two fds). 1100 for i in range(1024): 1101 with self.assertRaises(OSError) as c: 1102 subprocess.Popen(['nonexisting_i_hope'], 1103 stdout=subprocess.PIPE, 1104 stderr=subprocess.PIPE) 1105 # ignore errors that indicate the command was not found 1106 if c.exception.errno not in (errno.ENOENT, errno.EACCES): 1107 raise c.exception 1108 1109 @unittest.skipIf(threading is None, "threading required") 1110 def test_double_close_on_error(self): 1111 # Issue #18851 1112 fds = [] 1113 def open_fds(): 1114 for i in range(20): 1115 fds.extend(os.pipe()) 1116 time.sleep(0.001) 1117 t = threading.Thread(target=open_fds) 1118 t.start() 1119 try: 1120 with self.assertRaises(EnvironmentError): 1121 subprocess.Popen(['nonexisting_i_hope'], 1122 stdin=subprocess.PIPE, 1123 stdout=subprocess.PIPE, 1124 stderr=subprocess.PIPE) 1125 finally: 1126 t.join() 1127 exc = None 1128 for fd in fds: 1129 # If a double close occurred, some of those fds will 1130 # already have been closed by mistake, and os.close() 1131 # here will raise. 1132 try: 1133 os.close(fd) 1134 except OSError as e: 1135 exc = e 1136 if exc is not None: 1137 raise exc 1138 1139 @unittest.skipIf(threading is None, "threading required") 1140 def test_threadsafe_wait(self): 1141 """Issue21291: Popen.wait() needs to be threadsafe for returncode.""" 1142 proc = subprocess.Popen([sys.executable, '-c', 1143 'import time; time.sleep(12)']) 1144 self.assertEqual(proc.returncode, None) 1145 results = [] 1146 1147 def kill_proc_timer_thread(): 1148 results.append(('thread-start-poll-result', proc.poll())) 1149 # terminate it from the thread and wait for the result. 1150 proc.kill() 1151 proc.wait() 1152 results.append(('thread-after-kill-and-wait', proc.returncode)) 1153 # this wait should be a no-op given the above. 1154 proc.wait() 1155 results.append(('thread-after-second-wait', proc.returncode)) 1156 1157 # This is a timing sensitive test, the failure mode is 1158 # triggered when both the main thread and this thread are in 1159 # the wait() call at once. The delay here is to allow the 1160 # main thread to most likely be blocked in its wait() call. 1161 t = threading.Timer(0.2, kill_proc_timer_thread) 1162 t.start() 1163 1164 if mswindows: 1165 expected_errorcode = 1 1166 else: 1167 # Should be -9 because of the proc.kill() from the thread. 1168 expected_errorcode = -9 1169 1170 # Wait for the process to finish; the thread should kill it 1171 # long before it finishes on its own. Supplying a timeout 1172 # triggers a different code path for better coverage. 1173 proc.wait(timeout=20) 1174 self.assertEqual(proc.returncode, expected_errorcode, 1175 msg="unexpected result in wait from main thread") 1176 1177 # This should be a no-op with no change in returncode. 1178 proc.wait() 1179 self.assertEqual(proc.returncode, expected_errorcode, 1180 msg="unexpected result in second main wait.") 1181 1182 t.join() 1183 # Ensure that all of the thread results are as expected. 1184 # When a race condition occurs in wait(), the returncode could 1185 # be set by the wrong thread that doesn't actually have it 1186 # leading to an incorrect value. 1187 self.assertEqual([('thread-start-poll-result', None), 1188 ('thread-after-kill-and-wait', expected_errorcode), 1189 ('thread-after-second-wait', expected_errorcode)], 1190 results) 1191 1192 def test_issue8780(self): 1193 # Ensure that stdout is inherited from the parent 1194 # if stdout=PIPE is not used 1195 code = ';'.join(( 1196 'import subprocess, sys', 1197 'retcode = subprocess.call(' 1198 "[sys.executable, '-c', 'print(\"Hello World!\")'])", 1199 'assert retcode == 0')) 1200 output = subprocess.check_output([sys.executable, '-c', code]) 1201 self.assertTrue(output.startswith(b'Hello World!'), ascii(output)) 1202 1203 def test_handles_closed_on_exception(self): 1204 # If CreateProcess exits with an error, ensure the 1205 # duplicate output handles are released 1206 ifhandle, ifname = tempfile.mkstemp() 1207 ofhandle, ofname = tempfile.mkstemp() 1208 efhandle, efname = tempfile.mkstemp() 1209 try: 1210 subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle, 1211 stderr=efhandle) 1212 except OSError: 1213 os.close(ifhandle) 1214 os.remove(ifname) 1215 os.close(ofhandle) 1216 os.remove(ofname) 1217 os.close(efhandle) 1218 os.remove(efname) 1219 self.assertFalse(os.path.exists(ifname)) 1220 self.assertFalse(os.path.exists(ofname)) 1221 self.assertFalse(os.path.exists(efname)) 1222 1223 def test_communicate_epipe(self): 1224 # Issue 10963: communicate() should hide EPIPE 1225 p = subprocess.Popen([sys.executable, "-c", 'pass'], 1226 stdin=subprocess.PIPE, 1227 stdout=subprocess.PIPE, 1228 stderr=subprocess.PIPE) 1229 self.addCleanup(p.stdout.close) 1230 self.addCleanup(p.stderr.close) 1231 self.addCleanup(p.stdin.close) 1232 p.communicate(b"x" * 2**20) 1233 1234 def test_communicate_epipe_only_stdin(self): 1235 # Issue 10963: communicate() should hide EPIPE 1236 p = subprocess.Popen([sys.executable, "-c", 'pass'], 1237 stdin=subprocess.PIPE) 1238 self.addCleanup(p.stdin.close) 1239 p.wait() 1240 p.communicate(b"x" * 2**20) 1241 1242 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 1243 "Requires signal.SIGUSR1") 1244 @unittest.skipUnless(hasattr(os, 'kill'), 1245 "Requires os.kill") 1246 @unittest.skipUnless(hasattr(os, 'getppid'), 1247 "Requires os.getppid") 1248 def test_communicate_eintr(self): 1249 # Issue #12493: communicate() should handle EINTR 1250 def handler(signum, frame): 1251 pass 1252 old_handler = signal.signal(signal.SIGUSR1, handler) 1253 self.addCleanup(signal.signal, signal.SIGUSR1, old_handler) 1254 1255 args = [sys.executable, "-c", 1256 'import os, signal;' 1257 'os.kill(os.getppid(), signal.SIGUSR1)'] 1258 for stream in ('stdout', 'stderr'): 1259 kw = {stream: subprocess.PIPE} 1260 with subprocess.Popen(args, **kw) as process: 1261 # communicate() will be interrupted by SIGUSR1 1262 process.communicate() 1263 1264 1265 # This test is Linux-ish specific for simplicity to at least have 1266 # some coverage. It is not a platform specific bug. 1267 @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), 1268 "Linux specific") 1269 def test_failed_child_execute_fd_leak(self): 1270 """Test for the fork() failure fd leak reported in issue16327.""" 1271 fd_directory = '/proc/%d/fd' % os.getpid() 1272 fds_before_popen = os.listdir(fd_directory) 1273 with self.assertRaises(PopenTestException): 1274 PopenExecuteChildRaises( 1275 [sys.executable, '-c', 'pass'], stdin=subprocess.PIPE, 1276 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1277 1278 # NOTE: This test doesn't verify that the real _execute_child 1279 # does not close the file descriptors itself on the way out 1280 # during an exception. Code inspection has confirmed that. 1281 1282 fds_after_exception = os.listdir(fd_directory) 1283 self.assertEqual(fds_before_popen, fds_after_exception) 1284 1285 1286class RunFuncTestCase(BaseTestCase): 1287 def run_python(self, code, **kwargs): 1288 """Run Python code in a subprocess using subprocess.run""" 1289 argv = [sys.executable, "-c", code] 1290 return subprocess.run(argv, **kwargs) 1291 1292 def test_returncode(self): 1293 # call() function with sequence argument 1294 cp = self.run_python("import sys; sys.exit(47)") 1295 self.assertEqual(cp.returncode, 47) 1296 with self.assertRaises(subprocess.CalledProcessError): 1297 cp.check_returncode() 1298 1299 def test_check(self): 1300 with self.assertRaises(subprocess.CalledProcessError) as c: 1301 self.run_python("import sys; sys.exit(47)", check=True) 1302 self.assertEqual(c.exception.returncode, 47) 1303 1304 def test_check_zero(self): 1305 # check_returncode shouldn't raise when returncode is zero 1306 cp = self.run_python("import sys; sys.exit(0)", check=True) 1307 self.assertEqual(cp.returncode, 0) 1308 1309 def test_timeout(self): 1310 # run() function with timeout argument; we want to test that the child 1311 # process gets killed when the timeout expires. If the child isn't 1312 # killed, this call will deadlock since subprocess.run waits for the 1313 # child. 1314 with self.assertRaises(subprocess.TimeoutExpired): 1315 self.run_python("while True: pass", timeout=0.0001) 1316 1317 def test_capture_stdout(self): 1318 # capture stdout with zero return code 1319 cp = self.run_python("print('BDFL')", stdout=subprocess.PIPE) 1320 self.assertIn(b'BDFL', cp.stdout) 1321 1322 def test_capture_stderr(self): 1323 cp = self.run_python("import sys; sys.stderr.write('BDFL')", 1324 stderr=subprocess.PIPE) 1325 self.assertIn(b'BDFL', cp.stderr) 1326 1327 def test_check_output_stdin_arg(self): 1328 # run() can be called with stdin set to a file 1329 tf = tempfile.TemporaryFile() 1330 self.addCleanup(tf.close) 1331 tf.write(b'pear') 1332 tf.seek(0) 1333 cp = self.run_python( 1334 "import sys; sys.stdout.write(sys.stdin.read().upper())", 1335 stdin=tf, stdout=subprocess.PIPE) 1336 self.assertIn(b'PEAR', cp.stdout) 1337 1338 def test_check_output_input_arg(self): 1339 # check_output() can be called with input set to a string 1340 cp = self.run_python( 1341 "import sys; sys.stdout.write(sys.stdin.read().upper())", 1342 input=b'pear', stdout=subprocess.PIPE) 1343 self.assertIn(b'PEAR', cp.stdout) 1344 1345 def test_check_output_stdin_with_input_arg(self): 1346 # run() refuses to accept 'stdin' with 'input' 1347 tf = tempfile.TemporaryFile() 1348 self.addCleanup(tf.close) 1349 tf.write(b'pear') 1350 tf.seek(0) 1351 with self.assertRaises(ValueError, 1352 msg="Expected ValueError when stdin and input args supplied.") as c: 1353 output = self.run_python("print('will not be run')", 1354 stdin=tf, input=b'hare') 1355 self.assertIn('stdin', c.exception.args[0]) 1356 self.assertIn('input', c.exception.args[0]) 1357 1358 def test_check_output_timeout(self): 1359 with self.assertRaises(subprocess.TimeoutExpired) as c: 1360 cp = self.run_python(( 1361 "import sys, time\n" 1362 "sys.stdout.write('BDFL')\n" 1363 "sys.stdout.flush()\n" 1364 "time.sleep(3600)"), 1365 # Some heavily loaded buildbots (sparc Debian 3.x) require 1366 # this much time to start and print. 1367 timeout=3, stdout=subprocess.PIPE) 1368 self.assertEqual(c.exception.output, b'BDFL') 1369 # output is aliased to stdout 1370 self.assertEqual(c.exception.stdout, b'BDFL') 1371 1372 def test_run_kwargs(self): 1373 newenv = os.environ.copy() 1374 newenv["FRUIT"] = "banana" 1375 cp = self.run_python(('import sys, os;' 1376 'sys.exit(33 if os.getenv("FRUIT")=="banana" else 31)'), 1377 env=newenv) 1378 self.assertEqual(cp.returncode, 33) 1379 1380 1381@unittest.skipIf(mswindows, "POSIX specific tests") 1382class POSIXProcessTestCase(BaseTestCase): 1383 1384 def setUp(self): 1385 super().setUp() 1386 self._nonexistent_dir = "/_this/pa.th/does/not/exist" 1387 1388 def _get_chdir_exception(self): 1389 try: 1390 os.chdir(self._nonexistent_dir) 1391 except OSError as e: 1392 # This avoids hard coding the errno value or the OS perror() 1393 # string and instead capture the exception that we want to see 1394 # below for comparison. 1395 desired_exception = e 1396 desired_exception.strerror += ': ' + repr(self._nonexistent_dir) 1397 else: 1398 self.fail("chdir to nonexistent directory %s succeeded." % 1399 self._nonexistent_dir) 1400 return desired_exception 1401 1402 def test_exception_cwd(self): 1403 """Test error in the child raised in the parent for a bad cwd.""" 1404 desired_exception = self._get_chdir_exception() 1405 try: 1406 p = subprocess.Popen([sys.executable, "-c", ""], 1407 cwd=self._nonexistent_dir) 1408 except OSError as e: 1409 # Test that the child process chdir failure actually makes 1410 # it up to the parent process as the correct exception. 1411 self.assertEqual(desired_exception.errno, e.errno) 1412 self.assertEqual(desired_exception.strerror, e.strerror) 1413 else: 1414 self.fail("Expected OSError: %s" % desired_exception) 1415 1416 def test_exception_bad_executable(self): 1417 """Test error in the child raised in the parent for a bad executable.""" 1418 desired_exception = self._get_chdir_exception() 1419 try: 1420 p = subprocess.Popen([sys.executable, "-c", ""], 1421 executable=self._nonexistent_dir) 1422 except OSError as e: 1423 # Test that the child process exec failure actually makes 1424 # it up to the parent process as the correct exception. 1425 self.assertEqual(desired_exception.errno, e.errno) 1426 self.assertEqual(desired_exception.strerror, e.strerror) 1427 else: 1428 self.fail("Expected OSError: %s" % desired_exception) 1429 1430 def test_exception_bad_args_0(self): 1431 """Test error in the child raised in the parent for a bad args[0].""" 1432 desired_exception = self._get_chdir_exception() 1433 try: 1434 p = subprocess.Popen([self._nonexistent_dir, "-c", ""]) 1435 except OSError as e: 1436 # Test that the child process exec failure actually makes 1437 # it up to the parent process as the correct exception. 1438 self.assertEqual(desired_exception.errno, e.errno) 1439 self.assertEqual(desired_exception.strerror, e.strerror) 1440 else: 1441 self.fail("Expected OSError: %s" % desired_exception) 1442 1443 def test_restore_signals(self): 1444 # Code coverage for both values of restore_signals to make sure it 1445 # at least does not blow up. 1446 # A test for behavior would be complex. Contributions welcome. 1447 subprocess.call([sys.executable, "-c", ""], restore_signals=True) 1448 subprocess.call([sys.executable, "-c", ""], restore_signals=False) 1449 1450 def test_start_new_session(self): 1451 # For code coverage of calling setsid(). We don't care if we get an 1452 # EPERM error from it depending on the test execution environment, that 1453 # still indicates that it was called. 1454 try: 1455 output = subprocess.check_output( 1456 [sys.executable, "-c", 1457 "import os; print(os.getpgid(os.getpid()))"], 1458 start_new_session=True) 1459 except OSError as e: 1460 if e.errno != errno.EPERM: 1461 raise 1462 else: 1463 parent_pgid = os.getpgid(os.getpid()) 1464 child_pgid = int(output) 1465 self.assertNotEqual(parent_pgid, child_pgid) 1466 1467 def test_run_abort(self): 1468 # returncode handles signal termination 1469 with support.SuppressCrashReport(): 1470 p = subprocess.Popen([sys.executable, "-c", 1471 'import os; os.abort()']) 1472 p.wait() 1473 self.assertEqual(-p.returncode, signal.SIGABRT) 1474 1475 def test_CalledProcessError_str_signal(self): 1476 err = subprocess.CalledProcessError(-int(signal.SIGABRT), "fake cmd") 1477 error_string = str(err) 1478 # We're relying on the repr() of the signal.Signals intenum to provide 1479 # the word signal, the signal name and the numeric value. 1480 self.assertIn("signal", error_string.lower()) 1481 # We're not being specific about the signal name as some signals have 1482 # multiple names and which name is revealed can vary. 1483 self.assertIn("SIG", error_string) 1484 self.assertIn(str(signal.SIGABRT), error_string) 1485 1486 def test_CalledProcessError_str_unknown_signal(self): 1487 err = subprocess.CalledProcessError(-9876543, "fake cmd") 1488 error_string = str(err) 1489 self.assertIn("unknown signal 9876543.", error_string) 1490 1491 def test_CalledProcessError_str_non_zero(self): 1492 err = subprocess.CalledProcessError(2, "fake cmd") 1493 error_string = str(err) 1494 self.assertIn("non-zero exit status 2.", error_string) 1495 1496 def test_preexec(self): 1497 # DISCLAIMER: Setting environment variables is *not* a good use 1498 # of a preexec_fn. This is merely a test. 1499 p = subprocess.Popen([sys.executable, "-c", 1500 'import sys,os;' 1501 'sys.stdout.write(os.getenv("FRUIT"))'], 1502 stdout=subprocess.PIPE, 1503 preexec_fn=lambda: os.putenv("FRUIT", "apple")) 1504 with p: 1505 self.assertEqual(p.stdout.read(), b"apple") 1506 1507 def test_preexec_exception(self): 1508 def raise_it(): 1509 raise ValueError("What if two swallows carried a coconut?") 1510 try: 1511 p = subprocess.Popen([sys.executable, "-c", ""], 1512 preexec_fn=raise_it) 1513 except subprocess.SubprocessError as e: 1514 self.assertTrue( 1515 subprocess._posixsubprocess, 1516 "Expected a ValueError from the preexec_fn") 1517 except ValueError as e: 1518 self.assertIn("coconut", e.args[0]) 1519 else: 1520 self.fail("Exception raised by preexec_fn did not make it " 1521 "to the parent process.") 1522 1523 class _TestExecuteChildPopen(subprocess.Popen): 1524 """Used to test behavior at the end of _execute_child.""" 1525 def __init__(self, testcase, *args, **kwargs): 1526 self._testcase = testcase 1527 subprocess.Popen.__init__(self, *args, **kwargs) 1528 1529 def _execute_child(self, *args, **kwargs): 1530 try: 1531 subprocess.Popen._execute_child(self, *args, **kwargs) 1532 finally: 1533 # Open a bunch of file descriptors and verify that 1534 # none of them are the same as the ones the Popen 1535 # instance is using for stdin/stdout/stderr. 1536 devzero_fds = [os.open("/dev/zero", os.O_RDONLY) 1537 for _ in range(8)] 1538 try: 1539 for fd in devzero_fds: 1540 self._testcase.assertNotIn( 1541 fd, (self.stdin.fileno(), self.stdout.fileno(), 1542 self.stderr.fileno()), 1543 msg="At least one fd was closed early.") 1544 finally: 1545 for fd in devzero_fds: 1546 os.close(fd) 1547 1548 @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.") 1549 def test_preexec_errpipe_does_not_double_close_pipes(self): 1550 """Issue16140: Don't double close pipes on preexec error.""" 1551 1552 def raise_it(): 1553 raise subprocess.SubprocessError( 1554 "force the _execute_child() errpipe_data path.") 1555 1556 with self.assertRaises(subprocess.SubprocessError): 1557 self._TestExecuteChildPopen( 1558 self, [sys.executable, "-c", "pass"], 1559 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 1560 stderr=subprocess.PIPE, preexec_fn=raise_it) 1561 1562 def test_preexec_gc_module_failure(self): 1563 # This tests the code that disables garbage collection if the child 1564 # process will execute any Python. 1565 def raise_runtime_error(): 1566 raise RuntimeError("this shouldn't escape") 1567 enabled = gc.isenabled() 1568 orig_gc_disable = gc.disable 1569 orig_gc_isenabled = gc.isenabled 1570 try: 1571 gc.disable() 1572 self.assertFalse(gc.isenabled()) 1573 subprocess.call([sys.executable, '-c', ''], 1574 preexec_fn=lambda: None) 1575 self.assertFalse(gc.isenabled(), 1576 "Popen enabled gc when it shouldn't.") 1577 1578 gc.enable() 1579 self.assertTrue(gc.isenabled()) 1580 subprocess.call([sys.executable, '-c', ''], 1581 preexec_fn=lambda: None) 1582 self.assertTrue(gc.isenabled(), "Popen left gc disabled.") 1583 1584 gc.disable = raise_runtime_error 1585 self.assertRaises(RuntimeError, subprocess.Popen, 1586 [sys.executable, '-c', ''], 1587 preexec_fn=lambda: None) 1588 1589 del gc.isenabled # force an AttributeError 1590 self.assertRaises(AttributeError, subprocess.Popen, 1591 [sys.executable, '-c', ''], 1592 preexec_fn=lambda: None) 1593 finally: 1594 gc.disable = orig_gc_disable 1595 gc.isenabled = orig_gc_isenabled 1596 if not enabled: 1597 gc.disable() 1598 1599 @unittest.skipIf( 1600 sys.platform == 'darwin', 'setrlimit() seems to fail on OS X') 1601 def test_preexec_fork_failure(self): 1602 # The internal code did not preserve the previous exception when 1603 # re-enabling garbage collection 1604 try: 1605 from resource import getrlimit, setrlimit, RLIMIT_NPROC 1606 except ImportError as err: 1607 self.skipTest(err) # RLIMIT_NPROC is specific to Linux and BSD 1608 limits = getrlimit(RLIMIT_NPROC) 1609 [_, hard] = limits 1610 setrlimit(RLIMIT_NPROC, (0, hard)) 1611 self.addCleanup(setrlimit, RLIMIT_NPROC, limits) 1612 try: 1613 subprocess.call([sys.executable, '-c', ''], 1614 preexec_fn=lambda: None) 1615 except BlockingIOError: 1616 # Forking should raise EAGAIN, translated to BlockingIOError 1617 pass 1618 else: 1619 self.skipTest('RLIMIT_NPROC had no effect; probably superuser') 1620 1621 def test_args_string(self): 1622 # args is a string 1623 fd, fname = tempfile.mkstemp() 1624 # reopen in text mode 1625 with open(fd, "w", errors="surrogateescape") as fobj: 1626 fobj.write("#!%s\n" % support.unix_shell) 1627 fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" % 1628 sys.executable) 1629 os.chmod(fname, 0o700) 1630 p = subprocess.Popen(fname) 1631 p.wait() 1632 os.remove(fname) 1633 self.assertEqual(p.returncode, 47) 1634 1635 def test_invalid_args(self): 1636 # invalid arguments should raise ValueError 1637 self.assertRaises(ValueError, subprocess.call, 1638 [sys.executable, "-c", 1639 "import sys; sys.exit(47)"], 1640 startupinfo=47) 1641 self.assertRaises(ValueError, subprocess.call, 1642 [sys.executable, "-c", 1643 "import sys; sys.exit(47)"], 1644 creationflags=47) 1645 1646 def test_shell_sequence(self): 1647 # Run command through the shell (sequence) 1648 newenv = os.environ.copy() 1649 newenv["FRUIT"] = "apple" 1650 p = subprocess.Popen(["echo $FRUIT"], shell=1, 1651 stdout=subprocess.PIPE, 1652 env=newenv) 1653 with p: 1654 self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") 1655 1656 def test_shell_string(self): 1657 # Run command through the shell (string) 1658 newenv = os.environ.copy() 1659 newenv["FRUIT"] = "apple" 1660 p = subprocess.Popen("echo $FRUIT", shell=1, 1661 stdout=subprocess.PIPE, 1662 env=newenv) 1663 with p: 1664 self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") 1665 1666 def test_call_string(self): 1667 # call() function with string argument on UNIX 1668 fd, fname = tempfile.mkstemp() 1669 # reopen in text mode 1670 with open(fd, "w", errors="surrogateescape") as fobj: 1671 fobj.write("#!%s\n" % support.unix_shell) 1672 fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" % 1673 sys.executable) 1674 os.chmod(fname, 0o700) 1675 rc = subprocess.call(fname) 1676 os.remove(fname) 1677 self.assertEqual(rc, 47) 1678 1679 def test_specific_shell(self): 1680 # Issue #9265: Incorrect name passed as arg[0]. 1681 shells = [] 1682 for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']: 1683 for name in ['bash', 'ksh']: 1684 sh = os.path.join(prefix, name) 1685 if os.path.isfile(sh): 1686 shells.append(sh) 1687 if not shells: # Will probably work for any shell but csh. 1688 self.skipTest("bash or ksh required for this test") 1689 sh = '/bin/sh' 1690 if os.path.isfile(sh) and not os.path.islink(sh): 1691 # Test will fail if /bin/sh is a symlink to csh. 1692 shells.append(sh) 1693 for sh in shells: 1694 p = subprocess.Popen("echo $0", executable=sh, shell=True, 1695 stdout=subprocess.PIPE) 1696 with p: 1697 self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii')) 1698 1699 def _kill_process(self, method, *args): 1700 # Do not inherit file handles from the parent. 1701 # It should fix failures on some platforms. 1702 # Also set the SIGINT handler to the default to make sure it's not 1703 # being ignored (some tests rely on that.) 1704 old_handler = signal.signal(signal.SIGINT, signal.default_int_handler) 1705 try: 1706 p = subprocess.Popen([sys.executable, "-c", """if 1: 1707 import sys, time 1708 sys.stdout.write('x\\n') 1709 sys.stdout.flush() 1710 time.sleep(30) 1711 """], 1712 close_fds=True, 1713 stdin=subprocess.PIPE, 1714 stdout=subprocess.PIPE, 1715 stderr=subprocess.PIPE) 1716 finally: 1717 signal.signal(signal.SIGINT, old_handler) 1718 # Wait for the interpreter to be completely initialized before 1719 # sending any signal. 1720 p.stdout.read(1) 1721 getattr(p, method)(*args) 1722 return p 1723 1724 @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')), 1725 "Due to known OS bug (issue #16762)") 1726 def _kill_dead_process(self, method, *args): 1727 # Do not inherit file handles from the parent. 1728 # It should fix failures on some platforms. 1729 p = subprocess.Popen([sys.executable, "-c", """if 1: 1730 import sys, time 1731 sys.stdout.write('x\\n') 1732 sys.stdout.flush() 1733 """], 1734 close_fds=True, 1735 stdin=subprocess.PIPE, 1736 stdout=subprocess.PIPE, 1737 stderr=subprocess.PIPE) 1738 # Wait for the interpreter to be completely initialized before 1739 # sending any signal. 1740 p.stdout.read(1) 1741 # The process should end after this 1742 time.sleep(1) 1743 # This shouldn't raise even though the child is now dead 1744 getattr(p, method)(*args) 1745 p.communicate() 1746 1747 def test_send_signal(self): 1748 p = self._kill_process('send_signal', signal.SIGINT) 1749 _, stderr = p.communicate() 1750 self.assertIn(b'KeyboardInterrupt', stderr) 1751 self.assertNotEqual(p.wait(), 0) 1752 1753 def test_kill(self): 1754 p = self._kill_process('kill') 1755 _, stderr = p.communicate() 1756 self.assertStderrEqual(stderr, b'') 1757 self.assertEqual(p.wait(), -signal.SIGKILL) 1758 1759 def test_terminate(self): 1760 p = self._kill_process('terminate') 1761 _, stderr = p.communicate() 1762 self.assertStderrEqual(stderr, b'') 1763 self.assertEqual(p.wait(), -signal.SIGTERM) 1764 1765 def test_send_signal_dead(self): 1766 # Sending a signal to a dead process 1767 self._kill_dead_process('send_signal', signal.SIGINT) 1768 1769 def test_kill_dead(self): 1770 # Killing a dead process 1771 self._kill_dead_process('kill') 1772 1773 def test_terminate_dead(self): 1774 # Terminating a dead process 1775 self._kill_dead_process('terminate') 1776 1777 def _save_fds(self, save_fds): 1778 fds = [] 1779 for fd in save_fds: 1780 inheritable = os.get_inheritable(fd) 1781 saved = os.dup(fd) 1782 fds.append((fd, saved, inheritable)) 1783 return fds 1784 1785 def _restore_fds(self, fds): 1786 for fd, saved, inheritable in fds: 1787 os.dup2(saved, fd, inheritable=inheritable) 1788 os.close(saved) 1789 1790 def check_close_std_fds(self, fds): 1791 # Issue #9905: test that subprocess pipes still work properly with 1792 # some standard fds closed 1793 stdin = 0 1794 saved_fds = self._save_fds(fds) 1795 for fd, saved, inheritable in saved_fds: 1796 if fd == 0: 1797 stdin = saved 1798 break 1799 try: 1800 for fd in fds: 1801 os.close(fd) 1802 out, err = subprocess.Popen([sys.executable, "-c", 1803 'import sys;' 1804 'sys.stdout.write("apple");' 1805 'sys.stdout.flush();' 1806 'sys.stderr.write("orange")'], 1807 stdin=stdin, 1808 stdout=subprocess.PIPE, 1809 stderr=subprocess.PIPE).communicate() 1810 err = support.strip_python_stderr(err) 1811 self.assertEqual((out, err), (b'apple', b'orange')) 1812 finally: 1813 self._restore_fds(saved_fds) 1814 1815 def test_close_fd_0(self): 1816 self.check_close_std_fds([0]) 1817 1818 def test_close_fd_1(self): 1819 self.check_close_std_fds([1]) 1820 1821 def test_close_fd_2(self): 1822 self.check_close_std_fds([2]) 1823 1824 def test_close_fds_0_1(self): 1825 self.check_close_std_fds([0, 1]) 1826 1827 def test_close_fds_0_2(self): 1828 self.check_close_std_fds([0, 2]) 1829 1830 def test_close_fds_1_2(self): 1831 self.check_close_std_fds([1, 2]) 1832 1833 def test_close_fds_0_1_2(self): 1834 # Issue #10806: test that subprocess pipes still work properly with 1835 # all standard fds closed. 1836 self.check_close_std_fds([0, 1, 2]) 1837 1838 def test_small_errpipe_write_fd(self): 1839 """Issue #15798: Popen should work when stdio fds are available.""" 1840 new_stdin = os.dup(0) 1841 new_stdout = os.dup(1) 1842 try: 1843 os.close(0) 1844 os.close(1) 1845 1846 # Side test: if errpipe_write fails to have its CLOEXEC 1847 # flag set this should cause the parent to think the exec 1848 # failed. Extremely unlikely: everyone supports CLOEXEC. 1849 subprocess.Popen([ 1850 sys.executable, "-c", 1851 "print('AssertionError:0:CLOEXEC failure.')"]).wait() 1852 finally: 1853 # Restore original stdin and stdout 1854 os.dup2(new_stdin, 0) 1855 os.dup2(new_stdout, 1) 1856 os.close(new_stdin) 1857 os.close(new_stdout) 1858 1859 def test_remapping_std_fds(self): 1860 # open up some temporary files 1861 temps = [tempfile.mkstemp() for i in range(3)] 1862 try: 1863 temp_fds = [fd for fd, fname in temps] 1864 1865 # unlink the files -- we won't need to reopen them 1866 for fd, fname in temps: 1867 os.unlink(fname) 1868 1869 # write some data to what will become stdin, and rewind 1870 os.write(temp_fds[1], b"STDIN") 1871 os.lseek(temp_fds[1], 0, 0) 1872 1873 # move the standard file descriptors out of the way 1874 saved_fds = self._save_fds(range(3)) 1875 try: 1876 # duplicate the file objects over the standard fd's 1877 for fd, temp_fd in enumerate(temp_fds): 1878 os.dup2(temp_fd, fd) 1879 1880 # now use those files in the "wrong" order, so that subprocess 1881 # has to rearrange them in the child 1882 p = subprocess.Popen([sys.executable, "-c", 1883 'import sys; got = sys.stdin.read();' 1884 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], 1885 stdin=temp_fds[1], 1886 stdout=temp_fds[2], 1887 stderr=temp_fds[0]) 1888 p.wait() 1889 finally: 1890 self._restore_fds(saved_fds) 1891 1892 for fd in temp_fds: 1893 os.lseek(fd, 0, 0) 1894 1895 out = os.read(temp_fds[2], 1024) 1896 err = support.strip_python_stderr(os.read(temp_fds[0], 1024)) 1897 self.assertEqual(out, b"got STDIN") 1898 self.assertEqual(err, b"err") 1899 1900 finally: 1901 for fd in temp_fds: 1902 os.close(fd) 1903 1904 def check_swap_fds(self, stdin_no, stdout_no, stderr_no): 1905 # open up some temporary files 1906 temps = [tempfile.mkstemp() for i in range(3)] 1907 temp_fds = [fd for fd, fname in temps] 1908 try: 1909 # unlink the files -- we won't need to reopen them 1910 for fd, fname in temps: 1911 os.unlink(fname) 1912 1913 # save a copy of the standard file descriptors 1914 saved_fds = self._save_fds(range(3)) 1915 try: 1916 # duplicate the temp files over the standard fd's 0, 1, 2 1917 for fd, temp_fd in enumerate(temp_fds): 1918 os.dup2(temp_fd, fd) 1919 1920 # write some data to what will become stdin, and rewind 1921 os.write(stdin_no, b"STDIN") 1922 os.lseek(stdin_no, 0, 0) 1923 1924 # now use those files in the given order, so that subprocess 1925 # has to rearrange them in the child 1926 p = subprocess.Popen([sys.executable, "-c", 1927 'import sys; got = sys.stdin.read();' 1928 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], 1929 stdin=stdin_no, 1930 stdout=stdout_no, 1931 stderr=stderr_no) 1932 p.wait() 1933 1934 for fd in temp_fds: 1935 os.lseek(fd, 0, 0) 1936 1937 out = os.read(stdout_no, 1024) 1938 err = support.strip_python_stderr(os.read(stderr_no, 1024)) 1939 finally: 1940 self._restore_fds(saved_fds) 1941 1942 self.assertEqual(out, b"got STDIN") 1943 self.assertEqual(err, b"err") 1944 1945 finally: 1946 for fd in temp_fds: 1947 os.close(fd) 1948 1949 # When duping fds, if there arises a situation where one of the fds is 1950 # either 0, 1 or 2, it is possible that it is overwritten (#12607). 1951 # This tests all combinations of this. 1952 def test_swap_fds(self): 1953 self.check_swap_fds(0, 1, 2) 1954 self.check_swap_fds(0, 2, 1) 1955 self.check_swap_fds(1, 0, 2) 1956 self.check_swap_fds(1, 2, 0) 1957 self.check_swap_fds(2, 0, 1) 1958 self.check_swap_fds(2, 1, 0) 1959 1960 def test_surrogates_error_message(self): 1961 def prepare(): 1962 raise ValueError("surrogate:\uDCff") 1963 1964 try: 1965 subprocess.call( 1966 [sys.executable, "-c", "pass"], 1967 preexec_fn=prepare) 1968 except ValueError as err: 1969 # Pure Python implementations keeps the message 1970 self.assertIsNone(subprocess._posixsubprocess) 1971 self.assertEqual(str(err), "surrogate:\uDCff") 1972 except subprocess.SubprocessError as err: 1973 # _posixsubprocess uses a default message 1974 self.assertIsNotNone(subprocess._posixsubprocess) 1975 self.assertEqual(str(err), "Exception occurred in preexec_fn.") 1976 else: 1977 self.fail("Expected ValueError or subprocess.SubprocessError") 1978 1979 def test_undecodable_env(self): 1980 for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')): 1981 encoded_value = value.encode("ascii", "surrogateescape") 1982 1983 # test str with surrogates 1984 script = "import os; print(ascii(os.getenv(%s)))" % repr(key) 1985 env = os.environ.copy() 1986 env[key] = value 1987 # Use C locale to get ASCII for the locale encoding to force 1988 # surrogate-escaping of \xFF in the child process; otherwise it can 1989 # be decoded as-is if the default locale is latin-1. 1990 env['LC_ALL'] = 'C' 1991 if sys.platform.startswith("aix"): 1992 # On AIX, the C locale uses the Latin1 encoding 1993 decoded_value = encoded_value.decode("latin1", "surrogateescape") 1994 else: 1995 # On other UNIXes, the C locale uses the ASCII encoding 1996 decoded_value = value 1997 stdout = subprocess.check_output( 1998 [sys.executable, "-c", script], 1999 env=env) 2000 stdout = stdout.rstrip(b'\n\r') 2001 self.assertEqual(stdout.decode('ascii'), ascii(decoded_value)) 2002 2003 # test bytes 2004 key = key.encode("ascii", "surrogateescape") 2005 script = "import os; print(ascii(os.getenvb(%s)))" % repr(key) 2006 env = os.environ.copy() 2007 env[key] = encoded_value 2008 stdout = subprocess.check_output( 2009 [sys.executable, "-c", script], 2010 env=env) 2011 stdout = stdout.rstrip(b'\n\r') 2012 self.assertEqual(stdout.decode('ascii'), ascii(encoded_value)) 2013 2014 def test_bytes_program(self): 2015 abs_program = os.fsencode(sys.executable) 2016 path, program = os.path.split(sys.executable) 2017 program = os.fsencode(program) 2018 2019 # absolute bytes path 2020 exitcode = subprocess.call([abs_program, "-c", "pass"]) 2021 self.assertEqual(exitcode, 0) 2022 2023 # absolute bytes path as a string 2024 cmd = b"'" + abs_program + b"' -c pass" 2025 exitcode = subprocess.call(cmd, shell=True) 2026 self.assertEqual(exitcode, 0) 2027 2028 # bytes program, unicode PATH 2029 env = os.environ.copy() 2030 env["PATH"] = path 2031 exitcode = subprocess.call([program, "-c", "pass"], env=env) 2032 self.assertEqual(exitcode, 0) 2033 2034 # bytes program, bytes PATH 2035 envb = os.environb.copy() 2036 envb[b"PATH"] = os.fsencode(path) 2037 exitcode = subprocess.call([program, "-c", "pass"], env=envb) 2038 self.assertEqual(exitcode, 0) 2039 2040 def test_pipe_cloexec(self): 2041 sleeper = support.findfile("input_reader.py", subdir="subprocessdata") 2042 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2043 2044 p1 = subprocess.Popen([sys.executable, sleeper], 2045 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 2046 stderr=subprocess.PIPE, close_fds=False) 2047 2048 self.addCleanup(p1.communicate, b'') 2049 2050 p2 = subprocess.Popen([sys.executable, fd_status], 2051 stdout=subprocess.PIPE, close_fds=False) 2052 2053 output, error = p2.communicate() 2054 result_fds = set(map(int, output.split(b','))) 2055 unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(), 2056 p1.stderr.fileno()]) 2057 2058 self.assertFalse(result_fds & unwanted_fds, 2059 "Expected no fds from %r to be open in child, " 2060 "found %r" % 2061 (unwanted_fds, result_fds & unwanted_fds)) 2062 2063 def test_pipe_cloexec_real_tools(self): 2064 qcat = support.findfile("qcat.py", subdir="subprocessdata") 2065 qgrep = support.findfile("qgrep.py", subdir="subprocessdata") 2066 2067 subdata = b'zxcvbn' 2068 data = subdata * 4 + b'\n' 2069 2070 p1 = subprocess.Popen([sys.executable, qcat], 2071 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 2072 close_fds=False) 2073 2074 p2 = subprocess.Popen([sys.executable, qgrep, subdata], 2075 stdin=p1.stdout, stdout=subprocess.PIPE, 2076 close_fds=False) 2077 2078 self.addCleanup(p1.wait) 2079 self.addCleanup(p2.wait) 2080 def kill_p1(): 2081 try: 2082 p1.terminate() 2083 except ProcessLookupError: 2084 pass 2085 def kill_p2(): 2086 try: 2087 p2.terminate() 2088 except ProcessLookupError: 2089 pass 2090 self.addCleanup(kill_p1) 2091 self.addCleanup(kill_p2) 2092 2093 p1.stdin.write(data) 2094 p1.stdin.close() 2095 2096 readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10) 2097 2098 self.assertTrue(readfiles, "The child hung") 2099 self.assertEqual(p2.stdout.read(), data) 2100 2101 p1.stdout.close() 2102 p2.stdout.close() 2103 2104 def test_close_fds(self): 2105 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2106 2107 fds = os.pipe() 2108 self.addCleanup(os.close, fds[0]) 2109 self.addCleanup(os.close, fds[1]) 2110 2111 open_fds = set(fds) 2112 # add a bunch more fds 2113 for _ in range(9): 2114 fd = os.open(os.devnull, os.O_RDONLY) 2115 self.addCleanup(os.close, fd) 2116 open_fds.add(fd) 2117 2118 for fd in open_fds: 2119 os.set_inheritable(fd, True) 2120 2121 p = subprocess.Popen([sys.executable, fd_status], 2122 stdout=subprocess.PIPE, close_fds=False) 2123 output, ignored = p.communicate() 2124 remaining_fds = set(map(int, output.split(b','))) 2125 2126 self.assertEqual(remaining_fds & open_fds, open_fds, 2127 "Some fds were closed") 2128 2129 p = subprocess.Popen([sys.executable, fd_status], 2130 stdout=subprocess.PIPE, close_fds=True) 2131 output, ignored = p.communicate() 2132 remaining_fds = set(map(int, output.split(b','))) 2133 2134 self.assertFalse(remaining_fds & open_fds, 2135 "Some fds were left open") 2136 self.assertIn(1, remaining_fds, "Subprocess failed") 2137 2138 # Keep some of the fd's we opened open in the subprocess. 2139 # This tests _posixsubprocess.c's proper handling of fds_to_keep. 2140 fds_to_keep = set(open_fds.pop() for _ in range(8)) 2141 p = subprocess.Popen([sys.executable, fd_status], 2142 stdout=subprocess.PIPE, close_fds=True, 2143 pass_fds=()) 2144 output, ignored = p.communicate() 2145 remaining_fds = set(map(int, output.split(b','))) 2146 2147 self.assertFalse(remaining_fds & fds_to_keep & open_fds, 2148 "Some fds not in pass_fds were left open") 2149 self.assertIn(1, remaining_fds, "Subprocess failed") 2150 2151 2152 @unittest.skipIf(sys.platform.startswith("freebsd") and 2153 os.stat("/dev").st_dev == os.stat("/dev/fd").st_dev, 2154 "Requires fdescfs mounted on /dev/fd on FreeBSD.") 2155 def test_close_fds_when_max_fd_is_lowered(self): 2156 """Confirm that issue21618 is fixed (may fail under valgrind).""" 2157 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2158 2159 # This launches the meat of the test in a child process to 2160 # avoid messing with the larger unittest processes maximum 2161 # number of file descriptors. 2162 # This process launches: 2163 # +--> Process that lowers its RLIMIT_NOFILE aftr setting up 2164 # a bunch of high open fds above the new lower rlimit. 2165 # Those are reported via stdout before launching a new 2166 # process with close_fds=False to run the actual test: 2167 # +--> The TEST: This one launches a fd_status.py 2168 # subprocess with close_fds=True so we can find out if 2169 # any of the fds above the lowered rlimit are still open. 2170 p = subprocess.Popen([sys.executable, '-c', textwrap.dedent( 2171 ''' 2172 import os, resource, subprocess, sys, textwrap 2173 open_fds = set() 2174 # Add a bunch more fds to pass down. 2175 for _ in range(40): 2176 fd = os.open(os.devnull, os.O_RDONLY) 2177 open_fds.add(fd) 2178 2179 # Leave a two pairs of low ones available for use by the 2180 # internal child error pipe and the stdout pipe. 2181 # We also leave 10 more open as some Python buildbots run into 2182 # "too many open files" errors during the test if we do not. 2183 for fd in sorted(open_fds)[:14]: 2184 os.close(fd) 2185 open_fds.remove(fd) 2186 2187 for fd in open_fds: 2188 #self.addCleanup(os.close, fd) 2189 os.set_inheritable(fd, True) 2190 2191 max_fd_open = max(open_fds) 2192 2193 # Communicate the open_fds to the parent unittest.TestCase process. 2194 print(','.join(map(str, sorted(open_fds)))) 2195 sys.stdout.flush() 2196 2197 rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE) 2198 try: 2199 # 29 is lower than the highest fds we are leaving open. 2200 resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max)) 2201 # Launch a new Python interpreter with our low fd rlim_cur that 2202 # inherits open fds above that limit. It then uses subprocess 2203 # with close_fds=True to get a report of open fds in the child. 2204 # An explicit list of fds to check is passed to fd_status.py as 2205 # letting fd_status rely on its default logic would miss the 2206 # fds above rlim_cur as it normally only checks up to that limit. 2207 subprocess.Popen( 2208 [sys.executable, '-c', 2209 textwrap.dedent(""" 2210 import subprocess, sys 2211 subprocess.Popen([sys.executable, %r] + 2212 [str(x) for x in range({max_fd})], 2213 close_fds=True).wait() 2214 """.format(max_fd=max_fd_open+1))], 2215 close_fds=False).wait() 2216 finally: 2217 resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max)) 2218 ''' % fd_status)], stdout=subprocess.PIPE) 2219 2220 output, unused_stderr = p.communicate() 2221 output_lines = output.splitlines() 2222 self.assertEqual(len(output_lines), 2, 2223 msg="expected exactly two lines of output:\n%r" % output) 2224 opened_fds = set(map(int, output_lines[0].strip().split(b','))) 2225 remaining_fds = set(map(int, output_lines[1].strip().split(b','))) 2226 2227 self.assertFalse(remaining_fds & opened_fds, 2228 msg="Some fds were left open.") 2229 2230 2231 # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file 2232 # descriptor of a pipe closed in the parent process is valid in the 2233 # child process according to fstat(), but the mode of the file 2234 # descriptor is invalid, and read or write raise an error. 2235 @support.requires_mac_ver(10, 5) 2236 def test_pass_fds(self): 2237 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2238 2239 open_fds = set() 2240 2241 for x in range(5): 2242 fds = os.pipe() 2243 self.addCleanup(os.close, fds[0]) 2244 self.addCleanup(os.close, fds[1]) 2245 os.set_inheritable(fds[0], True) 2246 os.set_inheritable(fds[1], True) 2247 open_fds.update(fds) 2248 2249 for fd in open_fds: 2250 p = subprocess.Popen([sys.executable, fd_status], 2251 stdout=subprocess.PIPE, close_fds=True, 2252 pass_fds=(fd, )) 2253 output, ignored = p.communicate() 2254 2255 remaining_fds = set(map(int, output.split(b','))) 2256 to_be_closed = open_fds - {fd} 2257 2258 self.assertIn(fd, remaining_fds, "fd to be passed not passed") 2259 self.assertFalse(remaining_fds & to_be_closed, 2260 "fd to be closed passed") 2261 2262 # pass_fds overrides close_fds with a warning. 2263 with self.assertWarns(RuntimeWarning) as context: 2264 self.assertFalse(subprocess.call( 2265 [sys.executable, "-c", "import sys; sys.exit(0)"], 2266 close_fds=False, pass_fds=(fd, ))) 2267 self.assertIn('overriding close_fds', str(context.warning)) 2268 2269 def test_pass_fds_inheritable(self): 2270 script = support.findfile("fd_status.py", subdir="subprocessdata") 2271 2272 inheritable, non_inheritable = os.pipe() 2273 self.addCleanup(os.close, inheritable) 2274 self.addCleanup(os.close, non_inheritable) 2275 os.set_inheritable(inheritable, True) 2276 os.set_inheritable(non_inheritable, False) 2277 pass_fds = (inheritable, non_inheritable) 2278 args = [sys.executable, script] 2279 args += list(map(str, pass_fds)) 2280 2281 p = subprocess.Popen(args, 2282 stdout=subprocess.PIPE, close_fds=True, 2283 pass_fds=pass_fds) 2284 output, ignored = p.communicate() 2285 fds = set(map(int, output.split(b','))) 2286 2287 # the inheritable file descriptor must be inherited, so its inheritable 2288 # flag must be set in the child process after fork() and before exec() 2289 self.assertEqual(fds, set(pass_fds), "output=%a" % output) 2290 2291 # inheritable flag must not be changed in the parent process 2292 self.assertEqual(os.get_inheritable(inheritable), True) 2293 self.assertEqual(os.get_inheritable(non_inheritable), False) 2294 2295 def test_stdout_stdin_are_single_inout_fd(self): 2296 with io.open(os.devnull, "r+") as inout: 2297 p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"], 2298 stdout=inout, stdin=inout) 2299 p.wait() 2300 2301 def test_stdout_stderr_are_single_inout_fd(self): 2302 with io.open(os.devnull, "r+") as inout: 2303 p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"], 2304 stdout=inout, stderr=inout) 2305 p.wait() 2306 2307 def test_stderr_stdin_are_single_inout_fd(self): 2308 with io.open(os.devnull, "r+") as inout: 2309 p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"], 2310 stderr=inout, stdin=inout) 2311 p.wait() 2312 2313 def test_wait_when_sigchild_ignored(self): 2314 # NOTE: sigchild_ignore.py may not be an effective test on all OSes. 2315 sigchild_ignore = support.findfile("sigchild_ignore.py", 2316 subdir="subprocessdata") 2317 p = subprocess.Popen([sys.executable, sigchild_ignore], 2318 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 2319 stdout, stderr = p.communicate() 2320 self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" 2321 " non-zero with this error:\n%s" % 2322 stderr.decode('utf-8')) 2323 2324 def test_select_unbuffered(self): 2325 # Issue #11459: bufsize=0 should really set the pipes as 2326 # unbuffered (and therefore let select() work properly). 2327 select = support.import_module("select") 2328 p = subprocess.Popen([sys.executable, "-c", 2329 'import sys;' 2330 'sys.stdout.write("apple")'], 2331 stdout=subprocess.PIPE, 2332 bufsize=0) 2333 f = p.stdout 2334 self.addCleanup(f.close) 2335 try: 2336 self.assertEqual(f.read(4), b"appl") 2337 self.assertIn(f, select.select([f], [], [], 0.0)[0]) 2338 finally: 2339 p.wait() 2340 2341 def test_zombie_fast_process_del(self): 2342 # Issue #12650: on Unix, if Popen.__del__() was called before the 2343 # process exited, it wouldn't be added to subprocess._active, and would 2344 # remain a zombie. 2345 # spawn a Popen, and delete its reference before it exits 2346 p = subprocess.Popen([sys.executable, "-c", 2347 'import sys, time;' 2348 'time.sleep(0.2)'], 2349 stdout=subprocess.PIPE, 2350 stderr=subprocess.PIPE) 2351 self.addCleanup(p.stdout.close) 2352 self.addCleanup(p.stderr.close) 2353 ident = id(p) 2354 pid = p.pid 2355 with support.check_warnings(('', ResourceWarning)): 2356 p = None 2357 2358 # check that p is in the active processes list 2359 self.assertIn(ident, [id(o) for o in subprocess._active]) 2360 2361 def test_leak_fast_process_del_killed(self): 2362 # Issue #12650: on Unix, if Popen.__del__() was called before the 2363 # process exited, and the process got killed by a signal, it would never 2364 # be removed from subprocess._active, which triggered a FD and memory 2365 # leak. 2366 # spawn a Popen, delete its reference and kill it 2367 p = subprocess.Popen([sys.executable, "-c", 2368 'import time;' 2369 'time.sleep(3)'], 2370 stdout=subprocess.PIPE, 2371 stderr=subprocess.PIPE) 2372 self.addCleanup(p.stdout.close) 2373 self.addCleanup(p.stderr.close) 2374 ident = id(p) 2375 pid = p.pid 2376 with support.check_warnings(('', ResourceWarning)): 2377 p = None 2378 2379 os.kill(pid, signal.SIGKILL) 2380 # check that p is in the active processes list 2381 self.assertIn(ident, [id(o) for o in subprocess._active]) 2382 2383 # let some time for the process to exit, and create a new Popen: this 2384 # should trigger the wait() of p 2385 time.sleep(0.2) 2386 with self.assertRaises(OSError) as c: 2387 with subprocess.Popen(['nonexisting_i_hope'], 2388 stdout=subprocess.PIPE, 2389 stderr=subprocess.PIPE) as proc: 2390 pass 2391 # p should have been wait()ed on, and removed from the _active list 2392 self.assertRaises(OSError, os.waitpid, pid, 0) 2393 self.assertNotIn(ident, [id(o) for o in subprocess._active]) 2394 2395 def test_close_fds_after_preexec(self): 2396 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2397 2398 # this FD is used as dup2() target by preexec_fn, and should be closed 2399 # in the child process 2400 fd = os.dup(1) 2401 self.addCleanup(os.close, fd) 2402 2403 p = subprocess.Popen([sys.executable, fd_status], 2404 stdout=subprocess.PIPE, close_fds=True, 2405 preexec_fn=lambda: os.dup2(1, fd)) 2406 output, ignored = p.communicate() 2407 2408 remaining_fds = set(map(int, output.split(b','))) 2409 2410 self.assertNotIn(fd, remaining_fds) 2411 2412 @support.cpython_only 2413 def test_fork_exec(self): 2414 # Issue #22290: fork_exec() must not crash on memory allocation failure 2415 # or other errors 2416 import _posixsubprocess 2417 gc_enabled = gc.isenabled() 2418 try: 2419 # Use a preexec function and enable the garbage collector 2420 # to force fork_exec() to re-enable the garbage collector 2421 # on error. 2422 func = lambda: None 2423 gc.enable() 2424 2425 for args, exe_list, cwd, env_list in ( 2426 (123, [b"exe"], None, [b"env"]), 2427 ([b"arg"], 123, None, [b"env"]), 2428 ([b"arg"], [b"exe"], 123, [b"env"]), 2429 ([b"arg"], [b"exe"], None, 123), 2430 ): 2431 with self.assertRaises(TypeError): 2432 _posixsubprocess.fork_exec( 2433 args, exe_list, 2434 True, [], cwd, env_list, 2435 -1, -1, -1, -1, 2436 1, 2, 3, 4, 2437 True, True, func) 2438 finally: 2439 if not gc_enabled: 2440 gc.disable() 2441 2442 @support.cpython_only 2443 def test_fork_exec_sorted_fd_sanity_check(self): 2444 # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check. 2445 import _posixsubprocess 2446 gc_enabled = gc.isenabled() 2447 try: 2448 gc.enable() 2449 2450 for fds_to_keep in ( 2451 (-1, 2, 3, 4, 5), # Negative number. 2452 ('str', 4), # Not an int. 2453 (18, 23, 42, 2**63), # Out of range. 2454 (5, 4), # Not sorted. 2455 (6, 7, 7, 8), # Duplicate. 2456 ): 2457 with self.assertRaises( 2458 ValueError, 2459 msg='fds_to_keep={}'.format(fds_to_keep)) as c: 2460 _posixsubprocess.fork_exec( 2461 [b"false"], [b"false"], 2462 True, fds_to_keep, None, [b"env"], 2463 -1, -1, -1, -1, 2464 1, 2, 3, 4, 2465 True, True, None) 2466 self.assertIn('fds_to_keep', str(c.exception)) 2467 finally: 2468 if not gc_enabled: 2469 gc.disable() 2470 2471 def test_communicate_BrokenPipeError_stdin_close(self): 2472 # By not setting stdout or stderr or a timeout we force the fast path 2473 # that just calls _stdin_write() internally due to our mock. 2474 proc = subprocess.Popen([sys.executable, '-c', 'pass']) 2475 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 2476 mock_proc_stdin.close.side_effect = BrokenPipeError 2477 proc.communicate() # Should swallow BrokenPipeError from close. 2478 mock_proc_stdin.close.assert_called_with() 2479 2480 def test_communicate_BrokenPipeError_stdin_write(self): 2481 # By not setting stdout or stderr or a timeout we force the fast path 2482 # that just calls _stdin_write() internally due to our mock. 2483 proc = subprocess.Popen([sys.executable, '-c', 'pass']) 2484 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 2485 mock_proc_stdin.write.side_effect = BrokenPipeError 2486 proc.communicate(b'stuff') # Should swallow the BrokenPipeError. 2487 mock_proc_stdin.write.assert_called_once_with(b'stuff') 2488 mock_proc_stdin.close.assert_called_once_with() 2489 2490 def test_communicate_BrokenPipeError_stdin_flush(self): 2491 # Setting stdin and stdout forces the ._communicate() code path. 2492 # python -h exits faster than python -c pass (but spams stdout). 2493 proc = subprocess.Popen([sys.executable, '-h'], 2494 stdin=subprocess.PIPE, 2495 stdout=subprocess.PIPE) 2496 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin, \ 2497 open(os.devnull, 'wb') as dev_null: 2498 mock_proc_stdin.flush.side_effect = BrokenPipeError 2499 # because _communicate registers a selector using proc.stdin... 2500 mock_proc_stdin.fileno.return_value = dev_null.fileno() 2501 # _communicate() should swallow BrokenPipeError from flush. 2502 proc.communicate(b'stuff') 2503 mock_proc_stdin.flush.assert_called_once_with() 2504 2505 def test_communicate_BrokenPipeError_stdin_close_with_timeout(self): 2506 # Setting stdin and stdout forces the ._communicate() code path. 2507 # python -h exits faster than python -c pass (but spams stdout). 2508 proc = subprocess.Popen([sys.executable, '-h'], 2509 stdin=subprocess.PIPE, 2510 stdout=subprocess.PIPE) 2511 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 2512 mock_proc_stdin.close.side_effect = BrokenPipeError 2513 # _communicate() should swallow BrokenPipeError from close. 2514 proc.communicate(timeout=999) 2515 mock_proc_stdin.close.assert_called_once_with() 2516 2517 _libc_file_extensions = { 2518 'Linux': 'so.6', 2519 'Darwin': 'dylib', 2520 } 2521 @unittest.skipIf(not ctypes, 'ctypes module required.') 2522 @unittest.skipIf(platform.uname()[0] not in _libc_file_extensions, 2523 'Test requires a libc this code can load with ctypes.') 2524 @unittest.skipIf(not sys.executable, 'Test requires sys.executable.') 2525 def test_child_terminated_in_stopped_state(self): 2526 """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335.""" 2527 PTRACE_TRACEME = 0 # From glibc and MacOS (PT_TRACE_ME). 2528 libc_name = 'libc.' + self._libc_file_extensions[platform.uname()[0]] 2529 libc = ctypes.CDLL(libc_name) 2530 if not hasattr(libc, 'ptrace'): 2531 raise unittest.SkipTest('ptrace() required.') 2532 test_ptrace = subprocess.Popen( 2533 [sys.executable, '-c', """if True: 2534 import ctypes 2535 libc = ctypes.CDLL({libc_name!r}) 2536 libc.ptrace({PTRACE_TRACEME}, 0, 0) 2537 """.format(libc_name=libc_name, PTRACE_TRACEME=PTRACE_TRACEME) 2538 ]) 2539 if test_ptrace.wait() != 0: 2540 raise unittest.SkipTest('ptrace() failed - unable to test.') 2541 child = subprocess.Popen( 2542 [sys.executable, '-c', """if True: 2543 import ctypes 2544 libc = ctypes.CDLL({libc_name!r}) 2545 libc.ptrace({PTRACE_TRACEME}, 0, 0) 2546 libc.printf(ctypes.c_char_p(0xdeadbeef)) # Crash the process. 2547 """.format(libc_name=libc_name, PTRACE_TRACEME=PTRACE_TRACEME) 2548 ]) 2549 try: 2550 returncode = child.wait() 2551 except Exception as e: 2552 child.kill() # Clean up the hung stopped process. 2553 raise e 2554 self.assertNotEqual(0, returncode) 2555 self.assertLess(returncode, 0) # signal death, likely SIGSEGV. 2556 2557 2558@unittest.skipUnless(mswindows, "Windows specific tests") 2559class Win32ProcessTestCase(BaseTestCase): 2560 2561 def test_startupinfo(self): 2562 # startupinfo argument 2563 # We uses hardcoded constants, because we do not want to 2564 # depend on win32all. 2565 STARTF_USESHOWWINDOW = 1 2566 SW_MAXIMIZE = 3 2567 startupinfo = subprocess.STARTUPINFO() 2568 startupinfo.dwFlags = STARTF_USESHOWWINDOW 2569 startupinfo.wShowWindow = SW_MAXIMIZE 2570 # Since Python is a console process, it won't be affected 2571 # by wShowWindow, but the argument should be silently 2572 # ignored 2573 subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"], 2574 startupinfo=startupinfo) 2575 2576 def test_creationflags(self): 2577 # creationflags argument 2578 CREATE_NEW_CONSOLE = 16 2579 sys.stderr.write(" a DOS box should flash briefly ...\n") 2580 subprocess.call(sys.executable + 2581 ' -c "import time; time.sleep(0.25)"', 2582 creationflags=CREATE_NEW_CONSOLE) 2583 2584 def test_invalid_args(self): 2585 # invalid arguments should raise ValueError 2586 self.assertRaises(ValueError, subprocess.call, 2587 [sys.executable, "-c", 2588 "import sys; sys.exit(47)"], 2589 preexec_fn=lambda: 1) 2590 self.assertRaises(ValueError, subprocess.call, 2591 [sys.executable, "-c", 2592 "import sys; sys.exit(47)"], 2593 stdout=subprocess.PIPE, 2594 close_fds=True) 2595 2596 def test_close_fds(self): 2597 # close file descriptors 2598 rc = subprocess.call([sys.executable, "-c", 2599 "import sys; sys.exit(47)"], 2600 close_fds=True) 2601 self.assertEqual(rc, 47) 2602 2603 def test_shell_sequence(self): 2604 # Run command through the shell (sequence) 2605 newenv = os.environ.copy() 2606 newenv["FRUIT"] = "physalis" 2607 p = subprocess.Popen(["set"], shell=1, 2608 stdout=subprocess.PIPE, 2609 env=newenv) 2610 with p: 2611 self.assertIn(b"physalis", p.stdout.read()) 2612 2613 def test_shell_string(self): 2614 # Run command through the shell (string) 2615 newenv = os.environ.copy() 2616 newenv["FRUIT"] = "physalis" 2617 p = subprocess.Popen("set", shell=1, 2618 stdout=subprocess.PIPE, 2619 env=newenv) 2620 with p: 2621 self.assertIn(b"physalis", p.stdout.read()) 2622 2623 def test_shell_encodings(self): 2624 # Run command through the shell (string) 2625 for enc in ['ansi', 'oem']: 2626 newenv = os.environ.copy() 2627 newenv["FRUIT"] = "physalis" 2628 p = subprocess.Popen("set", shell=1, 2629 stdout=subprocess.PIPE, 2630 env=newenv, 2631 encoding=enc) 2632 with p: 2633 self.assertIn("physalis", p.stdout.read(), enc) 2634 2635 def test_call_string(self): 2636 # call() function with string argument on Windows 2637 rc = subprocess.call(sys.executable + 2638 ' -c "import sys; sys.exit(47)"') 2639 self.assertEqual(rc, 47) 2640 2641 def _kill_process(self, method, *args): 2642 # Some win32 buildbot raises EOFError if stdin is inherited 2643 p = subprocess.Popen([sys.executable, "-c", """if 1: 2644 import sys, time 2645 sys.stdout.write('x\\n') 2646 sys.stdout.flush() 2647 time.sleep(30) 2648 """], 2649 stdin=subprocess.PIPE, 2650 stdout=subprocess.PIPE, 2651 stderr=subprocess.PIPE) 2652 with p: 2653 # Wait for the interpreter to be completely initialized before 2654 # sending any signal. 2655 p.stdout.read(1) 2656 getattr(p, method)(*args) 2657 _, stderr = p.communicate() 2658 self.assertStderrEqual(stderr, b'') 2659 returncode = p.wait() 2660 self.assertNotEqual(returncode, 0) 2661 2662 def _kill_dead_process(self, method, *args): 2663 p = subprocess.Popen([sys.executable, "-c", """if 1: 2664 import sys, time 2665 sys.stdout.write('x\\n') 2666 sys.stdout.flush() 2667 sys.exit(42) 2668 """], 2669 stdin=subprocess.PIPE, 2670 stdout=subprocess.PIPE, 2671 stderr=subprocess.PIPE) 2672 with p: 2673 # Wait for the interpreter to be completely initialized before 2674 # sending any signal. 2675 p.stdout.read(1) 2676 # The process should end after this 2677 time.sleep(1) 2678 # This shouldn't raise even though the child is now dead 2679 getattr(p, method)(*args) 2680 _, stderr = p.communicate() 2681 self.assertStderrEqual(stderr, b'') 2682 rc = p.wait() 2683 self.assertEqual(rc, 42) 2684 2685 def test_send_signal(self): 2686 self._kill_process('send_signal', signal.SIGTERM) 2687 2688 def test_kill(self): 2689 self._kill_process('kill') 2690 2691 def test_terminate(self): 2692 self._kill_process('terminate') 2693 2694 def test_send_signal_dead(self): 2695 self._kill_dead_process('send_signal', signal.SIGTERM) 2696 2697 def test_kill_dead(self): 2698 self._kill_dead_process('kill') 2699 2700 def test_terminate_dead(self): 2701 self._kill_dead_process('terminate') 2702 2703class MiscTests(unittest.TestCase): 2704 def test_getoutput(self): 2705 self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy') 2706 self.assertEqual(subprocess.getstatusoutput('echo xyzzy'), 2707 (0, 'xyzzy')) 2708 2709 # we use mkdtemp in the next line to create an empty directory 2710 # under our exclusive control; from that, we can invent a pathname 2711 # that we _know_ won't exist. This is guaranteed to fail. 2712 dir = None 2713 try: 2714 dir = tempfile.mkdtemp() 2715 name = os.path.join(dir, "foo") 2716 status, output = subprocess.getstatusoutput( 2717 ("type " if mswindows else "cat ") + name) 2718 self.assertNotEqual(status, 0) 2719 finally: 2720 if dir is not None: 2721 os.rmdir(dir) 2722 2723 def test__all__(self): 2724 """Ensure that __all__ is populated properly.""" 2725 intentionally_excluded = {"list2cmdline", "Handle"} 2726 exported = set(subprocess.__all__) 2727 possible_exports = set() 2728 import types 2729 for name, value in subprocess.__dict__.items(): 2730 if name.startswith('_'): 2731 continue 2732 if isinstance(value, (types.ModuleType,)): 2733 continue 2734 possible_exports.add(name) 2735 self.assertEqual(exported, possible_exports - intentionally_excluded) 2736 2737 2738@unittest.skipUnless(hasattr(selectors, 'PollSelector'), 2739 "Test needs selectors.PollSelector") 2740class ProcessTestCaseNoPoll(ProcessTestCase): 2741 def setUp(self): 2742 self.orig_selector = subprocess._PopenSelector 2743 subprocess._PopenSelector = selectors.SelectSelector 2744 ProcessTestCase.setUp(self) 2745 2746 def tearDown(self): 2747 subprocess._PopenSelector = self.orig_selector 2748 ProcessTestCase.tearDown(self) 2749 2750 2751@unittest.skipUnless(mswindows, "Windows-specific tests") 2752class CommandsWithSpaces (BaseTestCase): 2753 2754 def setUp(self): 2755 super().setUp() 2756 f, fname = tempfile.mkstemp(".py", "te st") 2757 self.fname = fname.lower () 2758 os.write(f, b"import sys;" 2759 b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))" 2760 ) 2761 os.close(f) 2762 2763 def tearDown(self): 2764 os.remove(self.fname) 2765 super().tearDown() 2766 2767 def with_spaces(self, *args, **kwargs): 2768 kwargs['stdout'] = subprocess.PIPE 2769 p = subprocess.Popen(*args, **kwargs) 2770 with p: 2771 self.assertEqual( 2772 p.stdout.read ().decode("mbcs"), 2773 "2 [%r, 'ab cd']" % self.fname 2774 ) 2775 2776 def test_shell_string_with_spaces(self): 2777 # call() function with string argument with spaces on Windows 2778 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 2779 "ab cd"), shell=1) 2780 2781 def test_shell_sequence_with_spaces(self): 2782 # call() function with sequence argument with spaces on Windows 2783 self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1) 2784 2785 def test_noshell_string_with_spaces(self): 2786 # call() function with string argument with spaces on Windows 2787 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 2788 "ab cd")) 2789 2790 def test_noshell_sequence_with_spaces(self): 2791 # call() function with sequence argument with spaces on Windows 2792 self.with_spaces([sys.executable, self.fname, "ab cd"]) 2793 2794 2795class ContextManagerTests(BaseTestCase): 2796 2797 def test_pipe(self): 2798 with subprocess.Popen([sys.executable, "-c", 2799 "import sys;" 2800 "sys.stdout.write('stdout');" 2801 "sys.stderr.write('stderr');"], 2802 stdout=subprocess.PIPE, 2803 stderr=subprocess.PIPE) as proc: 2804 self.assertEqual(proc.stdout.read(), b"stdout") 2805 self.assertStderrEqual(proc.stderr.read(), b"stderr") 2806 2807 self.assertTrue(proc.stdout.closed) 2808 self.assertTrue(proc.stderr.closed) 2809 2810 def test_returncode(self): 2811 with subprocess.Popen([sys.executable, "-c", 2812 "import sys; sys.exit(100)"]) as proc: 2813 pass 2814 # __exit__ calls wait(), so the returncode should be set 2815 self.assertEqual(proc.returncode, 100) 2816 2817 def test_communicate_stdin(self): 2818 with subprocess.Popen([sys.executable, "-c", 2819 "import sys;" 2820 "sys.exit(sys.stdin.read() == 'context')"], 2821 stdin=subprocess.PIPE) as proc: 2822 proc.communicate(b"context") 2823 self.assertEqual(proc.returncode, 1) 2824 2825 def test_invalid_args(self): 2826 with self.assertRaises((FileNotFoundError, PermissionError)) as c: 2827 with subprocess.Popen(['nonexisting_i_hope'], 2828 stdout=subprocess.PIPE, 2829 stderr=subprocess.PIPE) as proc: 2830 pass 2831 2832 def test_broken_pipe_cleanup(self): 2833 """Broken pipe error should not prevent wait() (Issue 21619)""" 2834 proc = subprocess.Popen([sys.executable, '-c', 'pass'], 2835 stdin=subprocess.PIPE, 2836 bufsize=support.PIPE_MAX_SIZE*2) 2837 proc = proc.__enter__() 2838 # Prepare to send enough data to overflow any OS pipe buffering and 2839 # guarantee a broken pipe error. Data is held in BufferedWriter 2840 # buffer until closed. 2841 proc.stdin.write(b'x' * support.PIPE_MAX_SIZE) 2842 self.assertIsNone(proc.returncode) 2843 # EPIPE expected under POSIX; EINVAL under Windows 2844 self.assertRaises(OSError, proc.__exit__, None, None, None) 2845 self.assertEqual(proc.returncode, 0) 2846 self.assertTrue(proc.stdin.closed) 2847 2848 2849if __name__ == "__main__": 2850 unittest.main() 2851