1import unittest 2from unittest import mock 3from test import support 4from test.support import import_helper 5from test.support import os_helper 6from test.support import warnings_helper 7import subprocess 8import sys 9import signal 10import io 11import itertools 12import os 13import errno 14import tempfile 15import time 16import traceback 17import types 18import selectors 19import sysconfig 20import select 21import shutil 22import threading 23import gc 24import textwrap 25import json 26import pathlib 27from test.support.os_helper import FakePath 28 29try: 30 import _testcapi 31except ImportError: 32 _testcapi = None 33 34try: 35 import pwd 36except ImportError: 37 pwd = None 38try: 39 import grp 40except ImportError: 41 grp = None 42 43try: 44 import fcntl 45except: 46 fcntl = None 47 48if support.PGO: 49 raise unittest.SkipTest("test is not helpful for PGO") 50 51mswindows = (sys.platform == "win32") 52 53# 54# Depends on the following external programs: Python 55# 56 57if mswindows: 58 SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), ' 59 'os.O_BINARY);') 60else: 61 SETBINARY = '' 62 63NONEXISTING_CMD = ('nonexisting_i_hope',) 64# Ignore errors that indicate the command was not found 65NONEXISTING_ERRORS = (FileNotFoundError, NotADirectoryError, PermissionError) 66 67ZERO_RETURN_CMD = (sys.executable, '-c', 'pass') 68 69 70def setUpModule(): 71 shell_true = shutil.which('true') 72 if shell_true is None: 73 return 74 if (os.access(shell_true, os.X_OK) and 75 subprocess.run([shell_true]).returncode == 0): 76 global ZERO_RETURN_CMD 77 ZERO_RETURN_CMD = (shell_true,) # Faster than Python startup. 78 79 80class BaseTestCase(unittest.TestCase): 81 def setUp(self): 82 # Try to minimize the number of children we have so this test 83 # doesn't crash on some buildbots (Alphas in particular). 84 support.reap_children() 85 86 def tearDown(self): 87 if not mswindows: 88 # subprocess._active is not used on Windows and is set to None. 89 for inst in subprocess._active: 90 inst.wait() 91 subprocess._cleanup() 92 self.assertFalse( 93 subprocess._active, "subprocess._active not empty" 94 ) 95 self.doCleanups() 96 support.reap_children() 97 98 99class PopenTestException(Exception): 100 pass 101 102 103class PopenExecuteChildRaises(subprocess.Popen): 104 """Popen subclass for testing cleanup of subprocess.PIPE filehandles when 105 _execute_child fails. 106 """ 107 def _execute_child(self, *args, **kwargs): 108 raise PopenTestException("Forced Exception for Test") 109 110 111class ProcessTestCase(BaseTestCase): 112 113 def test_io_buffered_by_default(self): 114 p = subprocess.Popen(ZERO_RETURN_CMD, 115 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 116 stderr=subprocess.PIPE) 117 try: 118 self.assertIsInstance(p.stdin, io.BufferedIOBase) 119 self.assertIsInstance(p.stdout, io.BufferedIOBase) 120 self.assertIsInstance(p.stderr, io.BufferedIOBase) 121 finally: 122 p.stdin.close() 123 p.stdout.close() 124 p.stderr.close() 125 p.wait() 126 127 def test_io_unbuffered_works(self): 128 p = subprocess.Popen(ZERO_RETURN_CMD, 129 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 130 stderr=subprocess.PIPE, bufsize=0) 131 try: 132 self.assertIsInstance(p.stdin, io.RawIOBase) 133 self.assertIsInstance(p.stdout, io.RawIOBase) 134 self.assertIsInstance(p.stderr, io.RawIOBase) 135 finally: 136 p.stdin.close() 137 p.stdout.close() 138 p.stderr.close() 139 p.wait() 140 141 def test_call_seq(self): 142 # call() function with sequence argument 143 rc = subprocess.call([sys.executable, "-c", 144 "import sys; sys.exit(47)"]) 145 self.assertEqual(rc, 47) 146 147 def test_call_timeout(self): 148 # call() function with timeout argument; we want to test that the child 149 # process gets killed when the timeout expires. If the child isn't 150 # killed, this call will deadlock since subprocess.call waits for the 151 # child. 152 self.assertRaises(subprocess.TimeoutExpired, subprocess.call, 153 [sys.executable, "-c", "while True: pass"], 154 timeout=0.1) 155 156 def test_check_call_zero(self): 157 # check_call() function with zero return code 158 rc = subprocess.check_call(ZERO_RETURN_CMD) 159 self.assertEqual(rc, 0) 160 161 def test_check_call_nonzero(self): 162 # check_call() function with non-zero return code 163 with self.assertRaises(subprocess.CalledProcessError) as c: 164 subprocess.check_call([sys.executable, "-c", 165 "import sys; sys.exit(47)"]) 166 self.assertEqual(c.exception.returncode, 47) 167 168 def test_check_output(self): 169 # check_output() function with zero return code 170 output = subprocess.check_output( 171 [sys.executable, "-c", "print('BDFL')"]) 172 self.assertIn(b'BDFL', output) 173 174 def test_check_output_nonzero(self): 175 # check_call() function with non-zero return code 176 with self.assertRaises(subprocess.CalledProcessError) as c: 177 subprocess.check_output( 178 [sys.executable, "-c", "import sys; sys.exit(5)"]) 179 self.assertEqual(c.exception.returncode, 5) 180 181 def test_check_output_stderr(self): 182 # check_output() function stderr redirected to stdout 183 output = subprocess.check_output( 184 [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"], 185 stderr=subprocess.STDOUT) 186 self.assertIn(b'BDFL', output) 187 188 def test_check_output_stdin_arg(self): 189 # check_output() can be called with stdin set to a file 190 tf = tempfile.TemporaryFile() 191 self.addCleanup(tf.close) 192 tf.write(b'pear') 193 tf.seek(0) 194 output = subprocess.check_output( 195 [sys.executable, "-c", 196 "import sys; sys.stdout.write(sys.stdin.read().upper())"], 197 stdin=tf) 198 self.assertIn(b'PEAR', output) 199 200 def test_check_output_input_arg(self): 201 # check_output() can be called with input set to a string 202 output = subprocess.check_output( 203 [sys.executable, "-c", 204 "import sys; sys.stdout.write(sys.stdin.read().upper())"], 205 input=b'pear') 206 self.assertIn(b'PEAR', output) 207 208 def test_check_output_input_none(self): 209 """input=None has a legacy meaning of input='' on check_output.""" 210 output = subprocess.check_output( 211 [sys.executable, "-c", 212 "import sys; print('XX' if sys.stdin.read() else '')"], 213 input=None) 214 self.assertNotIn(b'XX', output) 215 216 def test_check_output_input_none_text(self): 217 output = subprocess.check_output( 218 [sys.executable, "-c", 219 "import sys; print('XX' if sys.stdin.read() else '')"], 220 input=None, text=True) 221 self.assertNotIn('XX', output) 222 223 def test_check_output_input_none_universal_newlines(self): 224 output = subprocess.check_output( 225 [sys.executable, "-c", 226 "import sys; print('XX' if sys.stdin.read() else '')"], 227 input=None, universal_newlines=True) 228 self.assertNotIn('XX', output) 229 230 def test_check_output_stdout_arg(self): 231 # check_output() refuses to accept 'stdout' argument 232 with self.assertRaises(ValueError) as c: 233 output = subprocess.check_output( 234 [sys.executable, "-c", "print('will not be run')"], 235 stdout=sys.stdout) 236 self.fail("Expected ValueError when stdout arg supplied.") 237 self.assertIn('stdout', c.exception.args[0]) 238 239 def test_check_output_stdin_with_input_arg(self): 240 # check_output() refuses to accept 'stdin' with 'input' 241 tf = tempfile.TemporaryFile() 242 self.addCleanup(tf.close) 243 tf.write(b'pear') 244 tf.seek(0) 245 with self.assertRaises(ValueError) as c: 246 output = subprocess.check_output( 247 [sys.executable, "-c", "print('will not be run')"], 248 stdin=tf, input=b'hare') 249 self.fail("Expected ValueError when stdin and input args supplied.") 250 self.assertIn('stdin', c.exception.args[0]) 251 self.assertIn('input', c.exception.args[0]) 252 253 def test_check_output_timeout(self): 254 # check_output() function with timeout arg 255 with self.assertRaises(subprocess.TimeoutExpired) as c: 256 output = subprocess.check_output( 257 [sys.executable, "-c", 258 "import sys, time\n" 259 "sys.stdout.write('BDFL')\n" 260 "sys.stdout.flush()\n" 261 "time.sleep(3600)"], 262 # Some heavily loaded buildbots (sparc Debian 3.x) require 263 # this much time to start and print. 264 timeout=3) 265 self.fail("Expected TimeoutExpired.") 266 self.assertEqual(c.exception.output, b'BDFL') 267 268 def test_call_kwargs(self): 269 # call() function with keyword args 270 newenv = os.environ.copy() 271 newenv["FRUIT"] = "banana" 272 rc = subprocess.call([sys.executable, "-c", 273 'import sys, os;' 274 'sys.exit(os.getenv("FRUIT")=="banana")'], 275 env=newenv) 276 self.assertEqual(rc, 1) 277 278 def test_invalid_args(self): 279 # Popen() called with invalid arguments should raise TypeError 280 # but Popen.__del__ should not complain (issue #12085) 281 with support.captured_stderr() as s: 282 self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1) 283 argcount = subprocess.Popen.__init__.__code__.co_argcount 284 too_many_args = [0] * (argcount + 1) 285 self.assertRaises(TypeError, subprocess.Popen, *too_many_args) 286 self.assertEqual(s.getvalue(), '') 287 288 def test_stdin_none(self): 289 # .stdin is None when not redirected 290 p = subprocess.Popen([sys.executable, "-c", 'print("banana")'], 291 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 292 self.addCleanup(p.stdout.close) 293 self.addCleanup(p.stderr.close) 294 p.wait() 295 self.assertEqual(p.stdin, None) 296 297 def test_stdout_none(self): 298 # .stdout is None when not redirected, and the child's stdout will 299 # be inherited from the parent. In order to test this we run a 300 # subprocess in a subprocess: 301 # this_test 302 # \-- subprocess created by this test (parent) 303 # \-- subprocess created by the parent subprocess (child) 304 # The parent doesn't specify stdout, so the child will use the 305 # parent's stdout. This test checks that the message printed by the 306 # child goes to the parent stdout. The parent also checks that the 307 # child's stdout is None. See #11963. 308 code = ('import sys; from subprocess import Popen, PIPE;' 309 'p = Popen([sys.executable, "-c", "print(\'test_stdout_none\')"],' 310 ' stdin=PIPE, stderr=PIPE);' 311 'p.wait(); assert p.stdout is None;') 312 p = subprocess.Popen([sys.executable, "-c", code], 313 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 314 self.addCleanup(p.stdout.close) 315 self.addCleanup(p.stderr.close) 316 out, err = p.communicate() 317 self.assertEqual(p.returncode, 0, err) 318 self.assertEqual(out.rstrip(), b'test_stdout_none') 319 320 def test_stderr_none(self): 321 # .stderr is None when not redirected 322 p = subprocess.Popen([sys.executable, "-c", 'print("banana")'], 323 stdin=subprocess.PIPE, stdout=subprocess.PIPE) 324 self.addCleanup(p.stdout.close) 325 self.addCleanup(p.stdin.close) 326 p.wait() 327 self.assertEqual(p.stderr, None) 328 329 def _assert_python(self, pre_args, **kwargs): 330 # We include sys.exit() to prevent the test runner from hanging 331 # whenever python is found. 332 args = pre_args + ["import sys; sys.exit(47)"] 333 p = subprocess.Popen(args, **kwargs) 334 p.wait() 335 self.assertEqual(47, p.returncode) 336 337 def test_executable(self): 338 # Check that the executable argument works. 339 # 340 # On Unix (non-Mac and non-Windows), Python looks at args[0] to 341 # determine where its standard library is, so we need the directory 342 # of args[0] to be valid for the Popen() call to Python to succeed. 343 # See also issue #16170 and issue #7774. 344 doesnotexist = os.path.join(os.path.dirname(sys.executable), 345 "doesnotexist") 346 self._assert_python([doesnotexist, "-c"], executable=sys.executable) 347 348 def test_bytes_executable(self): 349 doesnotexist = os.path.join(os.path.dirname(sys.executable), 350 "doesnotexist") 351 self._assert_python([doesnotexist, "-c"], 352 executable=os.fsencode(sys.executable)) 353 354 def test_pathlike_executable(self): 355 doesnotexist = os.path.join(os.path.dirname(sys.executable), 356 "doesnotexist") 357 self._assert_python([doesnotexist, "-c"], 358 executable=FakePath(sys.executable)) 359 360 def test_executable_takes_precedence(self): 361 # Check that the executable argument takes precedence over args[0]. 362 # 363 # Verify first that the call succeeds without the executable arg. 364 pre_args = [sys.executable, "-c"] 365 self._assert_python(pre_args) 366 self.assertRaises(NONEXISTING_ERRORS, 367 self._assert_python, pre_args, 368 executable=NONEXISTING_CMD[0]) 369 370 @unittest.skipIf(mswindows, "executable argument replaces shell") 371 def test_executable_replaces_shell(self): 372 # Check that the executable argument replaces the default shell 373 # when shell=True. 374 self._assert_python([], executable=sys.executable, shell=True) 375 376 @unittest.skipIf(mswindows, "executable argument replaces shell") 377 def test_bytes_executable_replaces_shell(self): 378 self._assert_python([], executable=os.fsencode(sys.executable), 379 shell=True) 380 381 @unittest.skipIf(mswindows, "executable argument replaces shell") 382 def test_pathlike_executable_replaces_shell(self): 383 self._assert_python([], executable=FakePath(sys.executable), 384 shell=True) 385 386 # For use in the test_cwd* tests below. 387 def _normalize_cwd(self, cwd): 388 # Normalize an expected cwd (for Tru64 support). 389 # We can't use os.path.realpath since it doesn't expand Tru64 {memb} 390 # strings. See bug #1063571. 391 with os_helper.change_cwd(cwd): 392 return os.getcwd() 393 394 # For use in the test_cwd* tests below. 395 def _split_python_path(self): 396 # Return normalized (python_dir, python_base). 397 python_path = os.path.realpath(sys.executable) 398 return os.path.split(python_path) 399 400 # For use in the test_cwd* tests below. 401 def _assert_cwd(self, expected_cwd, python_arg, **kwargs): 402 # Invoke Python via Popen, and assert that (1) the call succeeds, 403 # and that (2) the current working directory of the child process 404 # matches *expected_cwd*. 405 p = subprocess.Popen([python_arg, "-c", 406 "import os, sys; " 407 "buf = sys.stdout.buffer; " 408 "buf.write(os.getcwd().encode()); " 409 "buf.flush(); " 410 "sys.exit(47)"], 411 stdout=subprocess.PIPE, 412 **kwargs) 413 self.addCleanup(p.stdout.close) 414 p.wait() 415 self.assertEqual(47, p.returncode) 416 normcase = os.path.normcase 417 self.assertEqual(normcase(expected_cwd), 418 normcase(p.stdout.read().decode())) 419 420 def test_cwd(self): 421 # Check that cwd changes the cwd for the child process. 422 temp_dir = tempfile.gettempdir() 423 temp_dir = self._normalize_cwd(temp_dir) 424 self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir) 425 426 def test_cwd_with_bytes(self): 427 temp_dir = tempfile.gettempdir() 428 temp_dir = self._normalize_cwd(temp_dir) 429 self._assert_cwd(temp_dir, sys.executable, cwd=os.fsencode(temp_dir)) 430 431 def test_cwd_with_pathlike(self): 432 temp_dir = tempfile.gettempdir() 433 temp_dir = self._normalize_cwd(temp_dir) 434 self._assert_cwd(temp_dir, sys.executable, cwd=FakePath(temp_dir)) 435 436 @unittest.skipIf(mswindows, "pending resolution of issue #15533") 437 def test_cwd_with_relative_arg(self): 438 # Check that Popen looks for args[0] relative to cwd if args[0] 439 # is relative. 440 python_dir, python_base = self._split_python_path() 441 rel_python = os.path.join(os.curdir, python_base) 442 with os_helper.temp_cwd() as wrong_dir: 443 # Before calling with the correct cwd, confirm that the call fails 444 # without cwd and with the wrong cwd. 445 self.assertRaises(FileNotFoundError, subprocess.Popen, 446 [rel_python]) 447 self.assertRaises(FileNotFoundError, subprocess.Popen, 448 [rel_python], cwd=wrong_dir) 449 python_dir = self._normalize_cwd(python_dir) 450 self._assert_cwd(python_dir, rel_python, cwd=python_dir) 451 452 @unittest.skipIf(mswindows, "pending resolution of issue #15533") 453 def test_cwd_with_relative_executable(self): 454 # Check that Popen looks for executable relative to cwd if executable 455 # is relative (and that executable takes precedence over args[0]). 456 python_dir, python_base = self._split_python_path() 457 rel_python = os.path.join(os.curdir, python_base) 458 doesntexist = "somethingyoudonthave" 459 with os_helper.temp_cwd() as wrong_dir: 460 # Before calling with the correct cwd, confirm that the call fails 461 # without cwd and with the wrong cwd. 462 self.assertRaises(FileNotFoundError, subprocess.Popen, 463 [doesntexist], executable=rel_python) 464 self.assertRaises(FileNotFoundError, subprocess.Popen, 465 [doesntexist], executable=rel_python, 466 cwd=wrong_dir) 467 python_dir = self._normalize_cwd(python_dir) 468 self._assert_cwd(python_dir, doesntexist, executable=rel_python, 469 cwd=python_dir) 470 471 def test_cwd_with_absolute_arg(self): 472 # Check that Popen can find the executable when the cwd is wrong 473 # if args[0] is an absolute path. 474 python_dir, python_base = self._split_python_path() 475 abs_python = os.path.join(python_dir, python_base) 476 rel_python = os.path.join(os.curdir, python_base) 477 with os_helper.temp_dir() as wrong_dir: 478 # Before calling with an absolute path, confirm that using a 479 # relative path fails. 480 self.assertRaises(FileNotFoundError, subprocess.Popen, 481 [rel_python], cwd=wrong_dir) 482 wrong_dir = self._normalize_cwd(wrong_dir) 483 self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir) 484 485 @unittest.skipIf(sys.base_prefix != sys.prefix, 486 'Test is not venv-compatible') 487 def test_executable_with_cwd(self): 488 python_dir, python_base = self._split_python_path() 489 python_dir = self._normalize_cwd(python_dir) 490 self._assert_cwd(python_dir, "somethingyoudonthave", 491 executable=sys.executable, cwd=python_dir) 492 493 @unittest.skipIf(sys.base_prefix != sys.prefix, 494 'Test is not venv-compatible') 495 @unittest.skipIf(sysconfig.is_python_build(), 496 "need an installed Python. See #7774") 497 def test_executable_without_cwd(self): 498 # For a normal installation, it should work without 'cwd' 499 # argument. For test runs in the build directory, see #7774. 500 self._assert_cwd(os.getcwd(), "somethingyoudonthave", 501 executable=sys.executable) 502 503 def test_stdin_pipe(self): 504 # stdin redirection 505 p = subprocess.Popen([sys.executable, "-c", 506 'import sys; sys.exit(sys.stdin.read() == "pear")'], 507 stdin=subprocess.PIPE) 508 p.stdin.write(b"pear") 509 p.stdin.close() 510 p.wait() 511 self.assertEqual(p.returncode, 1) 512 513 def test_stdin_filedes(self): 514 # stdin is set to open file descriptor 515 tf = tempfile.TemporaryFile() 516 self.addCleanup(tf.close) 517 d = tf.fileno() 518 os.write(d, b"pear") 519 os.lseek(d, 0, 0) 520 p = subprocess.Popen([sys.executable, "-c", 521 'import sys; sys.exit(sys.stdin.read() == "pear")'], 522 stdin=d) 523 p.wait() 524 self.assertEqual(p.returncode, 1) 525 526 def test_stdin_fileobj(self): 527 # stdin is set to open file object 528 tf = tempfile.TemporaryFile() 529 self.addCleanup(tf.close) 530 tf.write(b"pear") 531 tf.seek(0) 532 p = subprocess.Popen([sys.executable, "-c", 533 'import sys; sys.exit(sys.stdin.read() == "pear")'], 534 stdin=tf) 535 p.wait() 536 self.assertEqual(p.returncode, 1) 537 538 def test_stdout_pipe(self): 539 # stdout redirection 540 p = subprocess.Popen([sys.executable, "-c", 541 'import sys; sys.stdout.write("orange")'], 542 stdout=subprocess.PIPE) 543 with p: 544 self.assertEqual(p.stdout.read(), b"orange") 545 546 def test_stdout_filedes(self): 547 # stdout is set to open file descriptor 548 tf = tempfile.TemporaryFile() 549 self.addCleanup(tf.close) 550 d = tf.fileno() 551 p = subprocess.Popen([sys.executable, "-c", 552 'import sys; sys.stdout.write("orange")'], 553 stdout=d) 554 p.wait() 555 os.lseek(d, 0, 0) 556 self.assertEqual(os.read(d, 1024), b"orange") 557 558 def test_stdout_fileobj(self): 559 # stdout is set to open file object 560 tf = tempfile.TemporaryFile() 561 self.addCleanup(tf.close) 562 p = subprocess.Popen([sys.executable, "-c", 563 'import sys; sys.stdout.write("orange")'], 564 stdout=tf) 565 p.wait() 566 tf.seek(0) 567 self.assertEqual(tf.read(), b"orange") 568 569 def test_stderr_pipe(self): 570 # stderr redirection 571 p = subprocess.Popen([sys.executable, "-c", 572 'import sys; sys.stderr.write("strawberry")'], 573 stderr=subprocess.PIPE) 574 with p: 575 self.assertEqual(p.stderr.read(), b"strawberry") 576 577 def test_stderr_filedes(self): 578 # stderr is set to open file descriptor 579 tf = tempfile.TemporaryFile() 580 self.addCleanup(tf.close) 581 d = tf.fileno() 582 p = subprocess.Popen([sys.executable, "-c", 583 'import sys; sys.stderr.write("strawberry")'], 584 stderr=d) 585 p.wait() 586 os.lseek(d, 0, 0) 587 self.assertEqual(os.read(d, 1024), b"strawberry") 588 589 def test_stderr_fileobj(self): 590 # stderr is set to open file object 591 tf = tempfile.TemporaryFile() 592 self.addCleanup(tf.close) 593 p = subprocess.Popen([sys.executable, "-c", 594 'import sys; sys.stderr.write("strawberry")'], 595 stderr=tf) 596 p.wait() 597 tf.seek(0) 598 self.assertEqual(tf.read(), b"strawberry") 599 600 def test_stderr_redirect_with_no_stdout_redirect(self): 601 # test stderr=STDOUT while stdout=None (not set) 602 603 # - grandchild prints to stderr 604 # - child redirects grandchild's stderr to its stdout 605 # - the parent should get grandchild's stderr in child's stdout 606 p = subprocess.Popen([sys.executable, "-c", 607 'import sys, subprocess;' 608 'rc = subprocess.call([sys.executable, "-c",' 609 ' "import sys;"' 610 ' "sys.stderr.write(\'42\')"],' 611 ' stderr=subprocess.STDOUT);' 612 'sys.exit(rc)'], 613 stdout=subprocess.PIPE, 614 stderr=subprocess.PIPE) 615 stdout, stderr = p.communicate() 616 #NOTE: stdout should get stderr from grandchild 617 self.assertEqual(stdout, b'42') 618 self.assertEqual(stderr, b'') # should be empty 619 self.assertEqual(p.returncode, 0) 620 621 def test_stdout_stderr_pipe(self): 622 # capture stdout and stderr to the same pipe 623 p = subprocess.Popen([sys.executable, "-c", 624 'import sys;' 625 'sys.stdout.write("apple");' 626 'sys.stdout.flush();' 627 'sys.stderr.write("orange")'], 628 stdout=subprocess.PIPE, 629 stderr=subprocess.STDOUT) 630 with p: 631 self.assertEqual(p.stdout.read(), b"appleorange") 632 633 def test_stdout_stderr_file(self): 634 # capture stdout and stderr to the same open file 635 tf = tempfile.TemporaryFile() 636 self.addCleanup(tf.close) 637 p = subprocess.Popen([sys.executable, "-c", 638 'import sys;' 639 'sys.stdout.write("apple");' 640 'sys.stdout.flush();' 641 'sys.stderr.write("orange")'], 642 stdout=tf, 643 stderr=tf) 644 p.wait() 645 tf.seek(0) 646 self.assertEqual(tf.read(), b"appleorange") 647 648 def test_stdout_filedes_of_stdout(self): 649 # stdout is set to 1 (#1531862). 650 # To avoid printing the text on stdout, we do something similar to 651 # test_stdout_none (see above). The parent subprocess calls the child 652 # subprocess passing stdout=1, and this test uses stdout=PIPE in 653 # order to capture and check the output of the parent. See #11963. 654 code = ('import sys, subprocess; ' 655 'rc = subprocess.call([sys.executable, "-c", ' 656 ' "import os, sys; sys.exit(os.write(sys.stdout.fileno(), ' 657 'b\'test with stdout=1\'))"], stdout=1); ' 658 'assert rc == 18') 659 p = subprocess.Popen([sys.executable, "-c", code], 660 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 661 self.addCleanup(p.stdout.close) 662 self.addCleanup(p.stderr.close) 663 out, err = p.communicate() 664 self.assertEqual(p.returncode, 0, err) 665 self.assertEqual(out.rstrip(), b'test with stdout=1') 666 667 def test_stdout_devnull(self): 668 p = subprocess.Popen([sys.executable, "-c", 669 'for i in range(10240):' 670 'print("x" * 1024)'], 671 stdout=subprocess.DEVNULL) 672 p.wait() 673 self.assertEqual(p.stdout, None) 674 675 def test_stderr_devnull(self): 676 p = subprocess.Popen([sys.executable, "-c", 677 'import sys\n' 678 'for i in range(10240):' 679 'sys.stderr.write("x" * 1024)'], 680 stderr=subprocess.DEVNULL) 681 p.wait() 682 self.assertEqual(p.stderr, None) 683 684 def test_stdin_devnull(self): 685 p = subprocess.Popen([sys.executable, "-c", 686 'import sys;' 687 'sys.stdin.read(1)'], 688 stdin=subprocess.DEVNULL) 689 p.wait() 690 self.assertEqual(p.stdin, None) 691 692 @unittest.skipUnless(fcntl and hasattr(fcntl, 'F_GETPIPE_SZ'), 693 'fcntl.F_GETPIPE_SZ required for test.') 694 def test_pipesizes(self): 695 test_pipe_r, test_pipe_w = os.pipe() 696 try: 697 # Get the default pipesize with F_GETPIPE_SZ 698 pipesize_default = fcntl.fcntl(test_pipe_w, fcntl.F_GETPIPE_SZ) 699 finally: 700 os.close(test_pipe_r) 701 os.close(test_pipe_w) 702 pipesize = pipesize_default // 2 703 if pipesize < 512: # the POSIX minimum 704 raise unittest.SkitTest( 705 'default pipesize too small to perform test.') 706 p = subprocess.Popen( 707 [sys.executable, "-c", 708 'import sys; sys.stdin.read(); sys.stdout.write("out"); ' 709 'sys.stderr.write("error!")'], 710 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 711 stderr=subprocess.PIPE, pipesize=pipesize) 712 try: 713 for fifo in [p.stdin, p.stdout, p.stderr]: 714 self.assertEqual( 715 fcntl.fcntl(fifo.fileno(), fcntl.F_GETPIPE_SZ), 716 pipesize) 717 # Windows pipe size can be acquired via GetNamedPipeInfoFunction 718 # https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-getnamedpipeinfo 719 # However, this function is not yet in _winapi. 720 p.stdin.write(b"pear") 721 p.stdin.close() 722 finally: 723 p.kill() 724 p.wait() 725 726 @unittest.skipUnless(fcntl and hasattr(fcntl, 'F_GETPIPE_SZ'), 727 'fcntl.F_GETPIPE_SZ required for test.') 728 def test_pipesize_default(self): 729 p = subprocess.Popen( 730 [sys.executable, "-c", 731 'import sys; sys.stdin.read(); sys.stdout.write("out"); ' 732 'sys.stderr.write("error!")'], 733 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 734 stderr=subprocess.PIPE, pipesize=-1) 735 try: 736 fp_r, fp_w = os.pipe() 737 try: 738 default_pipesize = fcntl.fcntl(fp_w, fcntl.F_GETPIPE_SZ) 739 for fifo in [p.stdin, p.stdout, p.stderr]: 740 self.assertEqual( 741 fcntl.fcntl(fifo.fileno(), fcntl.F_GETPIPE_SZ), 742 default_pipesize) 743 finally: 744 os.close(fp_r) 745 os.close(fp_w) 746 # On other platforms we cannot test the pipe size (yet). But above 747 # code using pipesize=-1 should not crash. 748 p.stdin.close() 749 finally: 750 p.kill() 751 p.wait() 752 753 def test_env(self): 754 newenv = os.environ.copy() 755 newenv["FRUIT"] = "orange" 756 with subprocess.Popen([sys.executable, "-c", 757 'import sys,os;' 758 'sys.stdout.write(os.getenv("FRUIT"))'], 759 stdout=subprocess.PIPE, 760 env=newenv) as p: 761 stdout, stderr = p.communicate() 762 self.assertEqual(stdout, b"orange") 763 764 # Windows requires at least the SYSTEMROOT environment variable to start 765 # Python 766 @unittest.skipIf(sys.platform == 'win32', 767 'cannot test an empty env on Windows') 768 @unittest.skipIf(sysconfig.get_config_var('Py_ENABLE_SHARED') == 1, 769 'The Python shared library cannot be loaded ' 770 'with an empty environment.') 771 def test_empty_env(self): 772 """Verify that env={} is as empty as possible.""" 773 774 def is_env_var_to_ignore(n): 775 """Determine if an environment variable is under our control.""" 776 # This excludes some __CF_* and VERSIONER_* keys MacOS insists 777 # on adding even when the environment in exec is empty. 778 # Gentoo sandboxes also force LD_PRELOAD and SANDBOX_* to exist. 779 return ('VERSIONER' in n or '__CF' in n or # MacOS 780 n == 'LD_PRELOAD' or n.startswith('SANDBOX') or # Gentoo 781 n == 'LC_CTYPE') # Locale coercion triggered 782 783 with subprocess.Popen([sys.executable, "-c", 784 'import os; print(list(os.environ.keys()))'], 785 stdout=subprocess.PIPE, env={}) as p: 786 stdout, stderr = p.communicate() 787 child_env_names = eval(stdout.strip()) 788 self.assertIsInstance(child_env_names, list) 789 child_env_names = [k for k in child_env_names 790 if not is_env_var_to_ignore(k)] 791 self.assertEqual(child_env_names, []) 792 793 def test_invalid_cmd(self): 794 # null character in the command name 795 cmd = sys.executable + '\0' 796 with self.assertRaises(ValueError): 797 subprocess.Popen([cmd, "-c", "pass"]) 798 799 # null character in the command argument 800 with self.assertRaises(ValueError): 801 subprocess.Popen([sys.executable, "-c", "pass#\0"]) 802 803 def test_invalid_env(self): 804 # null character in the environment variable name 805 newenv = os.environ.copy() 806 newenv["FRUIT\0VEGETABLE"] = "cabbage" 807 with self.assertRaises(ValueError): 808 subprocess.Popen(ZERO_RETURN_CMD, env=newenv) 809 810 # null character in the environment variable value 811 newenv = os.environ.copy() 812 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 813 with self.assertRaises(ValueError): 814 subprocess.Popen(ZERO_RETURN_CMD, env=newenv) 815 816 # equal character in the environment variable name 817 newenv = os.environ.copy() 818 newenv["FRUIT=ORANGE"] = "lemon" 819 with self.assertRaises(ValueError): 820 subprocess.Popen(ZERO_RETURN_CMD, env=newenv) 821 822 # equal character in the environment variable value 823 newenv = os.environ.copy() 824 newenv["FRUIT"] = "orange=lemon" 825 with subprocess.Popen([sys.executable, "-c", 826 'import sys, os;' 827 'sys.stdout.write(os.getenv("FRUIT"))'], 828 stdout=subprocess.PIPE, 829 env=newenv) as p: 830 stdout, stderr = p.communicate() 831 self.assertEqual(stdout, b"orange=lemon") 832 833 def test_communicate_stdin(self): 834 p = subprocess.Popen([sys.executable, "-c", 835 'import sys;' 836 'sys.exit(sys.stdin.read() == "pear")'], 837 stdin=subprocess.PIPE) 838 p.communicate(b"pear") 839 self.assertEqual(p.returncode, 1) 840 841 def test_communicate_stdout(self): 842 p = subprocess.Popen([sys.executable, "-c", 843 'import sys; sys.stdout.write("pineapple")'], 844 stdout=subprocess.PIPE) 845 (stdout, stderr) = p.communicate() 846 self.assertEqual(stdout, b"pineapple") 847 self.assertEqual(stderr, None) 848 849 def test_communicate_stderr(self): 850 p = subprocess.Popen([sys.executable, "-c", 851 'import sys; sys.stderr.write("pineapple")'], 852 stderr=subprocess.PIPE) 853 (stdout, stderr) = p.communicate() 854 self.assertEqual(stdout, None) 855 self.assertEqual(stderr, b"pineapple") 856 857 def test_communicate(self): 858 p = subprocess.Popen([sys.executable, "-c", 859 'import sys,os;' 860 'sys.stderr.write("pineapple");' 861 'sys.stdout.write(sys.stdin.read())'], 862 stdin=subprocess.PIPE, 863 stdout=subprocess.PIPE, 864 stderr=subprocess.PIPE) 865 self.addCleanup(p.stdout.close) 866 self.addCleanup(p.stderr.close) 867 self.addCleanup(p.stdin.close) 868 (stdout, stderr) = p.communicate(b"banana") 869 self.assertEqual(stdout, b"banana") 870 self.assertEqual(stderr, b"pineapple") 871 872 def test_communicate_timeout(self): 873 p = subprocess.Popen([sys.executable, "-c", 874 'import sys,os,time;' 875 'sys.stderr.write("pineapple\\n");' 876 'time.sleep(1);' 877 'sys.stderr.write("pear\\n");' 878 'sys.stdout.write(sys.stdin.read())'], 879 universal_newlines=True, 880 stdin=subprocess.PIPE, 881 stdout=subprocess.PIPE, 882 stderr=subprocess.PIPE) 883 self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana", 884 timeout=0.3) 885 # Make sure we can keep waiting for it, and that we get the whole output 886 # after it completes. 887 (stdout, stderr) = p.communicate() 888 self.assertEqual(stdout, "banana") 889 self.assertEqual(stderr.encode(), b"pineapple\npear\n") 890 891 def test_communicate_timeout_large_output(self): 892 # Test an expiring timeout while the child is outputting lots of data. 893 p = subprocess.Popen([sys.executable, "-c", 894 'import sys,os,time;' 895 'sys.stdout.write("a" * (64 * 1024));' 896 'time.sleep(0.2);' 897 'sys.stdout.write("a" * (64 * 1024));' 898 'time.sleep(0.2);' 899 'sys.stdout.write("a" * (64 * 1024));' 900 'time.sleep(0.2);' 901 'sys.stdout.write("a" * (64 * 1024));'], 902 stdout=subprocess.PIPE) 903 self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4) 904 (stdout, _) = p.communicate() 905 self.assertEqual(len(stdout), 4 * 64 * 1024) 906 907 # Test for the fd leak reported in http://bugs.python.org/issue2791. 908 def test_communicate_pipe_fd_leak(self): 909 for stdin_pipe in (False, True): 910 for stdout_pipe in (False, True): 911 for stderr_pipe in (False, True): 912 options = {} 913 if stdin_pipe: 914 options['stdin'] = subprocess.PIPE 915 if stdout_pipe: 916 options['stdout'] = subprocess.PIPE 917 if stderr_pipe: 918 options['stderr'] = subprocess.PIPE 919 if not options: 920 continue 921 p = subprocess.Popen(ZERO_RETURN_CMD, **options) 922 p.communicate() 923 if p.stdin is not None: 924 self.assertTrue(p.stdin.closed) 925 if p.stdout is not None: 926 self.assertTrue(p.stdout.closed) 927 if p.stderr is not None: 928 self.assertTrue(p.stderr.closed) 929 930 def test_communicate_returns(self): 931 # communicate() should return None if no redirection is active 932 p = subprocess.Popen([sys.executable, "-c", 933 "import sys; sys.exit(47)"]) 934 (stdout, stderr) = p.communicate() 935 self.assertEqual(stdout, None) 936 self.assertEqual(stderr, None) 937 938 def test_communicate_pipe_buf(self): 939 # communicate() with writes larger than pipe_buf 940 # This test will probably deadlock rather than fail, if 941 # communicate() does not work properly. 942 x, y = os.pipe() 943 os.close(x) 944 os.close(y) 945 p = subprocess.Popen([sys.executable, "-c", 946 'import sys,os;' 947 'sys.stdout.write(sys.stdin.read(47));' 948 'sys.stderr.write("x" * %d);' 949 'sys.stdout.write(sys.stdin.read())' % 950 support.PIPE_MAX_SIZE], 951 stdin=subprocess.PIPE, 952 stdout=subprocess.PIPE, 953 stderr=subprocess.PIPE) 954 self.addCleanup(p.stdout.close) 955 self.addCleanup(p.stderr.close) 956 self.addCleanup(p.stdin.close) 957 string_to_write = b"a" * support.PIPE_MAX_SIZE 958 (stdout, stderr) = p.communicate(string_to_write) 959 self.assertEqual(stdout, string_to_write) 960 961 def test_writes_before_communicate(self): 962 # stdin.write before communicate() 963 p = subprocess.Popen([sys.executable, "-c", 964 'import sys,os;' 965 'sys.stdout.write(sys.stdin.read())'], 966 stdin=subprocess.PIPE, 967 stdout=subprocess.PIPE, 968 stderr=subprocess.PIPE) 969 self.addCleanup(p.stdout.close) 970 self.addCleanup(p.stderr.close) 971 self.addCleanup(p.stdin.close) 972 p.stdin.write(b"banana") 973 (stdout, stderr) = p.communicate(b"split") 974 self.assertEqual(stdout, b"bananasplit") 975 self.assertEqual(stderr, b"") 976 977 def test_universal_newlines_and_text(self): 978 args = [ 979 sys.executable, "-c", 980 'import sys,os;' + SETBINARY + 981 'buf = sys.stdout.buffer;' 982 'buf.write(sys.stdin.readline().encode());' 983 'buf.flush();' 984 'buf.write(b"line2\\n");' 985 'buf.flush();' 986 'buf.write(sys.stdin.read().encode());' 987 'buf.flush();' 988 'buf.write(b"line4\\n");' 989 'buf.flush();' 990 'buf.write(b"line5\\r\\n");' 991 'buf.flush();' 992 'buf.write(b"line6\\r");' 993 'buf.flush();' 994 'buf.write(b"\\nline7");' 995 'buf.flush();' 996 'buf.write(b"\\nline8");'] 997 998 for extra_kwarg in ('universal_newlines', 'text'): 999 p = subprocess.Popen(args, **{'stdin': subprocess.PIPE, 1000 'stdout': subprocess.PIPE, 1001 extra_kwarg: True}) 1002 with p: 1003 p.stdin.write("line1\n") 1004 p.stdin.flush() 1005 self.assertEqual(p.stdout.readline(), "line1\n") 1006 p.stdin.write("line3\n") 1007 p.stdin.close() 1008 self.addCleanup(p.stdout.close) 1009 self.assertEqual(p.stdout.readline(), 1010 "line2\n") 1011 self.assertEqual(p.stdout.read(6), 1012 "line3\n") 1013 self.assertEqual(p.stdout.read(), 1014 "line4\nline5\nline6\nline7\nline8") 1015 1016 def test_universal_newlines_communicate(self): 1017 # universal newlines through communicate() 1018 p = subprocess.Popen([sys.executable, "-c", 1019 'import sys,os;' + SETBINARY + 1020 'buf = sys.stdout.buffer;' 1021 'buf.write(b"line2\\n");' 1022 'buf.flush();' 1023 'buf.write(b"line4\\n");' 1024 'buf.flush();' 1025 'buf.write(b"line5\\r\\n");' 1026 'buf.flush();' 1027 'buf.write(b"line6\\r");' 1028 'buf.flush();' 1029 'buf.write(b"\\nline7");' 1030 'buf.flush();' 1031 'buf.write(b"\\nline8");'], 1032 stderr=subprocess.PIPE, 1033 stdout=subprocess.PIPE, 1034 universal_newlines=1) 1035 self.addCleanup(p.stdout.close) 1036 self.addCleanup(p.stderr.close) 1037 (stdout, stderr) = p.communicate() 1038 self.assertEqual(stdout, 1039 "line2\nline4\nline5\nline6\nline7\nline8") 1040 1041 def test_universal_newlines_communicate_stdin(self): 1042 # universal newlines through communicate(), with only stdin 1043 p = subprocess.Popen([sys.executable, "-c", 1044 'import sys,os;' + SETBINARY + textwrap.dedent(''' 1045 s = sys.stdin.readline() 1046 assert s == "line1\\n", repr(s) 1047 s = sys.stdin.read() 1048 assert s == "line3\\n", repr(s) 1049 ''')], 1050 stdin=subprocess.PIPE, 1051 universal_newlines=1) 1052 (stdout, stderr) = p.communicate("line1\nline3\n") 1053 self.assertEqual(p.returncode, 0) 1054 1055 def test_universal_newlines_communicate_input_none(self): 1056 # Test communicate(input=None) with universal newlines. 1057 # 1058 # We set stdout to PIPE because, as of this writing, a different 1059 # code path is tested when the number of pipes is zero or one. 1060 p = subprocess.Popen(ZERO_RETURN_CMD, 1061 stdin=subprocess.PIPE, 1062 stdout=subprocess.PIPE, 1063 universal_newlines=True) 1064 p.communicate() 1065 self.assertEqual(p.returncode, 0) 1066 1067 def test_universal_newlines_communicate_stdin_stdout_stderr(self): 1068 # universal newlines through communicate(), with stdin, stdout, stderr 1069 p = subprocess.Popen([sys.executable, "-c", 1070 'import sys,os;' + SETBINARY + textwrap.dedent(''' 1071 s = sys.stdin.buffer.readline() 1072 sys.stdout.buffer.write(s) 1073 sys.stdout.buffer.write(b"line2\\r") 1074 sys.stderr.buffer.write(b"eline2\\n") 1075 s = sys.stdin.buffer.read() 1076 sys.stdout.buffer.write(s) 1077 sys.stdout.buffer.write(b"line4\\n") 1078 sys.stdout.buffer.write(b"line5\\r\\n") 1079 sys.stderr.buffer.write(b"eline6\\r") 1080 sys.stderr.buffer.write(b"eline7\\r\\nz") 1081 ''')], 1082 stdin=subprocess.PIPE, 1083 stderr=subprocess.PIPE, 1084 stdout=subprocess.PIPE, 1085 universal_newlines=True) 1086 self.addCleanup(p.stdout.close) 1087 self.addCleanup(p.stderr.close) 1088 (stdout, stderr) = p.communicate("line1\nline3\n") 1089 self.assertEqual(p.returncode, 0) 1090 self.assertEqual("line1\nline2\nline3\nline4\nline5\n", stdout) 1091 # Python debug build push something like "[42442 refs]\n" 1092 # to stderr at exit of subprocess. 1093 self.assertTrue(stderr.startswith("eline2\neline6\neline7\n")) 1094 1095 def test_universal_newlines_communicate_encodings(self): 1096 # Check that universal newlines mode works for various encodings, 1097 # in particular for encodings in the UTF-16 and UTF-32 families. 1098 # See issue #15595. 1099 # 1100 # UTF-16 and UTF-32-BE are sufficient to check both with BOM and 1101 # without, and UTF-16 and UTF-32. 1102 for encoding in ['utf-16', 'utf-32-be']: 1103 code = ("import sys; " 1104 r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" % 1105 encoding) 1106 args = [sys.executable, '-c', code] 1107 # We set stdin to be non-None because, as of this writing, 1108 # a different code path is used when the number of pipes is 1109 # zero or one. 1110 popen = subprocess.Popen(args, 1111 stdin=subprocess.PIPE, 1112 stdout=subprocess.PIPE, 1113 encoding=encoding) 1114 stdout, stderr = popen.communicate(input='') 1115 self.assertEqual(stdout, '1\n2\n3\n4') 1116 1117 def test_communicate_errors(self): 1118 for errors, expected in [ 1119 ('ignore', ''), 1120 ('replace', '\ufffd\ufffd'), 1121 ('surrogateescape', '\udc80\udc80'), 1122 ('backslashreplace', '\\x80\\x80'), 1123 ]: 1124 code = ("import sys; " 1125 r"sys.stdout.buffer.write(b'[\x80\x80]')") 1126 args = [sys.executable, '-c', code] 1127 # We set stdin to be non-None because, as of this writing, 1128 # a different code path is used when the number of pipes is 1129 # zero or one. 1130 popen = subprocess.Popen(args, 1131 stdin=subprocess.PIPE, 1132 stdout=subprocess.PIPE, 1133 encoding='utf-8', 1134 errors=errors) 1135 stdout, stderr = popen.communicate(input='') 1136 self.assertEqual(stdout, '[{}]'.format(expected)) 1137 1138 def test_no_leaking(self): 1139 # Make sure we leak no resources 1140 if not mswindows: 1141 max_handles = 1026 # too much for most UNIX systems 1142 else: 1143 max_handles = 2050 # too much for (at least some) Windows setups 1144 handles = [] 1145 tmpdir = tempfile.mkdtemp() 1146 try: 1147 for i in range(max_handles): 1148 try: 1149 tmpfile = os.path.join(tmpdir, os_helper.TESTFN) 1150 handles.append(os.open(tmpfile, os.O_WRONLY|os.O_CREAT)) 1151 except OSError as e: 1152 if e.errno != errno.EMFILE: 1153 raise 1154 break 1155 else: 1156 self.skipTest("failed to reach the file descriptor limit " 1157 "(tried %d)" % max_handles) 1158 # Close a couple of them (should be enough for a subprocess) 1159 for i in range(10): 1160 os.close(handles.pop()) 1161 # Loop creating some subprocesses. If one of them leaks some fds, 1162 # the next loop iteration will fail by reaching the max fd limit. 1163 for i in range(15): 1164 p = subprocess.Popen([sys.executable, "-c", 1165 "import sys;" 1166 "sys.stdout.write(sys.stdin.read())"], 1167 stdin=subprocess.PIPE, 1168 stdout=subprocess.PIPE, 1169 stderr=subprocess.PIPE) 1170 data = p.communicate(b"lime")[0] 1171 self.assertEqual(data, b"lime") 1172 finally: 1173 for h in handles: 1174 os.close(h) 1175 shutil.rmtree(tmpdir) 1176 1177 def test_list2cmdline(self): 1178 self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']), 1179 '"a b c" d e') 1180 self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']), 1181 'ab\\"c \\ d') 1182 self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']), 1183 'ab\\"c " \\\\" d') 1184 self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']), 1185 'a\\\\\\b "de fg" h') 1186 self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']), 1187 'a\\\\\\"b c d') 1188 self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']), 1189 '"a\\\\b c" d e') 1190 self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']), 1191 '"a\\\\b\\ c" d e') 1192 self.assertEqual(subprocess.list2cmdline(['ab', '']), 1193 'ab ""') 1194 1195 def test_poll(self): 1196 p = subprocess.Popen([sys.executable, "-c", 1197 "import os; os.read(0, 1)"], 1198 stdin=subprocess.PIPE) 1199 self.addCleanup(p.stdin.close) 1200 self.assertIsNone(p.poll()) 1201 os.write(p.stdin.fileno(), b'A') 1202 p.wait() 1203 # Subsequent invocations should just return the returncode 1204 self.assertEqual(p.poll(), 0) 1205 1206 def test_wait(self): 1207 p = subprocess.Popen(ZERO_RETURN_CMD) 1208 self.assertEqual(p.wait(), 0) 1209 # Subsequent invocations should just return the returncode 1210 self.assertEqual(p.wait(), 0) 1211 1212 def test_wait_timeout(self): 1213 p = subprocess.Popen([sys.executable, 1214 "-c", "import time; time.sleep(0.3)"]) 1215 with self.assertRaises(subprocess.TimeoutExpired) as c: 1216 p.wait(timeout=0.0001) 1217 self.assertIn("0.0001", str(c.exception)) # For coverage of __str__. 1218 self.assertEqual(p.wait(timeout=support.SHORT_TIMEOUT), 0) 1219 1220 def test_invalid_bufsize(self): 1221 # an invalid type of the bufsize argument should raise 1222 # TypeError. 1223 with self.assertRaises(TypeError): 1224 subprocess.Popen(ZERO_RETURN_CMD, "orange") 1225 1226 def test_bufsize_is_none(self): 1227 # bufsize=None should be the same as bufsize=0. 1228 p = subprocess.Popen(ZERO_RETURN_CMD, None) 1229 self.assertEqual(p.wait(), 0) 1230 # Again with keyword arg 1231 p = subprocess.Popen(ZERO_RETURN_CMD, bufsize=None) 1232 self.assertEqual(p.wait(), 0) 1233 1234 def _test_bufsize_equal_one(self, line, expected, universal_newlines): 1235 # subprocess may deadlock with bufsize=1, see issue #21332 1236 with subprocess.Popen([sys.executable, "-c", "import sys;" 1237 "sys.stdout.write(sys.stdin.readline());" 1238 "sys.stdout.flush()"], 1239 stdin=subprocess.PIPE, 1240 stdout=subprocess.PIPE, 1241 stderr=subprocess.DEVNULL, 1242 bufsize=1, 1243 universal_newlines=universal_newlines) as p: 1244 p.stdin.write(line) # expect that it flushes the line in text mode 1245 os.close(p.stdin.fileno()) # close it without flushing the buffer 1246 read_line = p.stdout.readline() 1247 with support.SuppressCrashReport(): 1248 try: 1249 p.stdin.close() 1250 except OSError: 1251 pass 1252 p.stdin = None 1253 self.assertEqual(p.returncode, 0) 1254 self.assertEqual(read_line, expected) 1255 1256 def test_bufsize_equal_one_text_mode(self): 1257 # line is flushed in text mode with bufsize=1. 1258 # we should get the full line in return 1259 line = "line\n" 1260 self._test_bufsize_equal_one(line, line, universal_newlines=True) 1261 1262 def test_bufsize_equal_one_binary_mode(self): 1263 # line is not flushed in binary mode with bufsize=1. 1264 # we should get empty response 1265 line = b'line' + os.linesep.encode() # assume ascii-based locale 1266 with self.assertWarnsRegex(RuntimeWarning, 'line buffering'): 1267 self._test_bufsize_equal_one(line, b'', universal_newlines=False) 1268 1269 def test_leaking_fds_on_error(self): 1270 # see bug #5179: Popen leaks file descriptors to PIPEs if 1271 # the child fails to execute; this will eventually exhaust 1272 # the maximum number of open fds. 1024 seems a very common 1273 # value for that limit, but Windows has 2048, so we loop 1274 # 1024 times (each call leaked two fds). 1275 for i in range(1024): 1276 with self.assertRaises(NONEXISTING_ERRORS): 1277 subprocess.Popen(NONEXISTING_CMD, 1278 stdout=subprocess.PIPE, 1279 stderr=subprocess.PIPE) 1280 1281 def test_nonexisting_with_pipes(self): 1282 # bpo-30121: Popen with pipes must close properly pipes on error. 1283 # Previously, os.close() was called with a Windows handle which is not 1284 # a valid file descriptor. 1285 # 1286 # Run the test in a subprocess to control how the CRT reports errors 1287 # and to get stderr content. 1288 try: 1289 import msvcrt 1290 msvcrt.CrtSetReportMode 1291 except (AttributeError, ImportError): 1292 self.skipTest("need msvcrt.CrtSetReportMode") 1293 1294 code = textwrap.dedent(f""" 1295 import msvcrt 1296 import subprocess 1297 1298 cmd = {NONEXISTING_CMD!r} 1299 1300 for report_type in [msvcrt.CRT_WARN, 1301 msvcrt.CRT_ERROR, 1302 msvcrt.CRT_ASSERT]: 1303 msvcrt.CrtSetReportMode(report_type, msvcrt.CRTDBG_MODE_FILE) 1304 msvcrt.CrtSetReportFile(report_type, msvcrt.CRTDBG_FILE_STDERR) 1305 1306 try: 1307 subprocess.Popen(cmd, 1308 stdout=subprocess.PIPE, 1309 stderr=subprocess.PIPE) 1310 except OSError: 1311 pass 1312 """) 1313 cmd = [sys.executable, "-c", code] 1314 proc = subprocess.Popen(cmd, 1315 stderr=subprocess.PIPE, 1316 universal_newlines=True) 1317 with proc: 1318 stderr = proc.communicate()[1] 1319 self.assertEqual(stderr, "") 1320 self.assertEqual(proc.returncode, 0) 1321 1322 def test_double_close_on_error(self): 1323 # Issue #18851 1324 fds = [] 1325 def open_fds(): 1326 for i in range(20): 1327 fds.extend(os.pipe()) 1328 time.sleep(0.001) 1329 t = threading.Thread(target=open_fds) 1330 t.start() 1331 try: 1332 with self.assertRaises(EnvironmentError): 1333 subprocess.Popen(NONEXISTING_CMD, 1334 stdin=subprocess.PIPE, 1335 stdout=subprocess.PIPE, 1336 stderr=subprocess.PIPE) 1337 finally: 1338 t.join() 1339 exc = None 1340 for fd in fds: 1341 # If a double close occurred, some of those fds will 1342 # already have been closed by mistake, and os.close() 1343 # here will raise. 1344 try: 1345 os.close(fd) 1346 except OSError as e: 1347 exc = e 1348 if exc is not None: 1349 raise exc 1350 1351 def test_threadsafe_wait(self): 1352 """Issue21291: Popen.wait() needs to be threadsafe for returncode.""" 1353 proc = subprocess.Popen([sys.executable, '-c', 1354 'import time; time.sleep(12)']) 1355 self.assertEqual(proc.returncode, None) 1356 results = [] 1357 1358 def kill_proc_timer_thread(): 1359 results.append(('thread-start-poll-result', proc.poll())) 1360 # terminate it from the thread and wait for the result. 1361 proc.kill() 1362 proc.wait() 1363 results.append(('thread-after-kill-and-wait', proc.returncode)) 1364 # this wait should be a no-op given the above. 1365 proc.wait() 1366 results.append(('thread-after-second-wait', proc.returncode)) 1367 1368 # This is a timing sensitive test, the failure mode is 1369 # triggered when both the main thread and this thread are in 1370 # the wait() call at once. The delay here is to allow the 1371 # main thread to most likely be blocked in its wait() call. 1372 t = threading.Timer(0.2, kill_proc_timer_thread) 1373 t.start() 1374 1375 if mswindows: 1376 expected_errorcode = 1 1377 else: 1378 # Should be -9 because of the proc.kill() from the thread. 1379 expected_errorcode = -9 1380 1381 # Wait for the process to finish; the thread should kill it 1382 # long before it finishes on its own. Supplying a timeout 1383 # triggers a different code path for better coverage. 1384 proc.wait(timeout=support.SHORT_TIMEOUT) 1385 self.assertEqual(proc.returncode, expected_errorcode, 1386 msg="unexpected result in wait from main thread") 1387 1388 # This should be a no-op with no change in returncode. 1389 proc.wait() 1390 self.assertEqual(proc.returncode, expected_errorcode, 1391 msg="unexpected result in second main wait.") 1392 1393 t.join() 1394 # Ensure that all of the thread results are as expected. 1395 # When a race condition occurs in wait(), the returncode could 1396 # be set by the wrong thread that doesn't actually have it 1397 # leading to an incorrect value. 1398 self.assertEqual([('thread-start-poll-result', None), 1399 ('thread-after-kill-and-wait', expected_errorcode), 1400 ('thread-after-second-wait', expected_errorcode)], 1401 results) 1402 1403 def test_issue8780(self): 1404 # Ensure that stdout is inherited from the parent 1405 # if stdout=PIPE is not used 1406 code = ';'.join(( 1407 'import subprocess, sys', 1408 'retcode = subprocess.call(' 1409 "[sys.executable, '-c', 'print(\"Hello World!\")'])", 1410 'assert retcode == 0')) 1411 output = subprocess.check_output([sys.executable, '-c', code]) 1412 self.assertTrue(output.startswith(b'Hello World!'), ascii(output)) 1413 1414 def test_handles_closed_on_exception(self): 1415 # If CreateProcess exits with an error, ensure the 1416 # duplicate output handles are released 1417 ifhandle, ifname = tempfile.mkstemp() 1418 ofhandle, ofname = tempfile.mkstemp() 1419 efhandle, efname = tempfile.mkstemp() 1420 try: 1421 subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle, 1422 stderr=efhandle) 1423 except OSError: 1424 os.close(ifhandle) 1425 os.remove(ifname) 1426 os.close(ofhandle) 1427 os.remove(ofname) 1428 os.close(efhandle) 1429 os.remove(efname) 1430 self.assertFalse(os.path.exists(ifname)) 1431 self.assertFalse(os.path.exists(ofname)) 1432 self.assertFalse(os.path.exists(efname)) 1433 1434 def test_communicate_epipe(self): 1435 # Issue 10963: communicate() should hide EPIPE 1436 p = subprocess.Popen(ZERO_RETURN_CMD, 1437 stdin=subprocess.PIPE, 1438 stdout=subprocess.PIPE, 1439 stderr=subprocess.PIPE) 1440 self.addCleanup(p.stdout.close) 1441 self.addCleanup(p.stderr.close) 1442 self.addCleanup(p.stdin.close) 1443 p.communicate(b"x" * 2**20) 1444 1445 def test_repr(self): 1446 path_cmd = pathlib.Path("my-tool.py") 1447 pathlib_cls = path_cmd.__class__.__name__ 1448 1449 cases = [ 1450 ("ls", True, 123, "<Popen: returncode: 123 args: 'ls'>"), 1451 ('a' * 100, True, 0, 1452 "<Popen: returncode: 0 args: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa...>"), 1453 (["ls"], False, None, "<Popen: returncode: None args: ['ls']>"), 1454 (["ls", '--my-opts', 'a' * 100], False, None, 1455 "<Popen: returncode: None args: ['ls', '--my-opts', 'aaaaaaaaaaaaaaaaaaaaaaaa...>"), 1456 (path_cmd, False, 7, f"<Popen: returncode: 7 args: {pathlib_cls}('my-tool.py')>") 1457 ] 1458 with unittest.mock.patch.object(subprocess.Popen, '_execute_child'): 1459 for cmd, shell, code, sx in cases: 1460 p = subprocess.Popen(cmd, shell=shell) 1461 p.returncode = code 1462 self.assertEqual(repr(p), sx) 1463 1464 def test_communicate_epipe_only_stdin(self): 1465 # Issue 10963: communicate() should hide EPIPE 1466 p = subprocess.Popen(ZERO_RETURN_CMD, 1467 stdin=subprocess.PIPE) 1468 self.addCleanup(p.stdin.close) 1469 p.wait() 1470 p.communicate(b"x" * 2**20) 1471 1472 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 1473 "Requires signal.SIGUSR1") 1474 @unittest.skipUnless(hasattr(os, 'kill'), 1475 "Requires os.kill") 1476 @unittest.skipUnless(hasattr(os, 'getppid'), 1477 "Requires os.getppid") 1478 def test_communicate_eintr(self): 1479 # Issue #12493: communicate() should handle EINTR 1480 def handler(signum, frame): 1481 pass 1482 old_handler = signal.signal(signal.SIGUSR1, handler) 1483 self.addCleanup(signal.signal, signal.SIGUSR1, old_handler) 1484 1485 args = [sys.executable, "-c", 1486 'import os, signal;' 1487 'os.kill(os.getppid(), signal.SIGUSR1)'] 1488 for stream in ('stdout', 'stderr'): 1489 kw = {stream: subprocess.PIPE} 1490 with subprocess.Popen(args, **kw) as process: 1491 # communicate() will be interrupted by SIGUSR1 1492 process.communicate() 1493 1494 1495 # This test is Linux-ish specific for simplicity to at least have 1496 # some coverage. It is not a platform specific bug. 1497 @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), 1498 "Linux specific") 1499 def test_failed_child_execute_fd_leak(self): 1500 """Test for the fork() failure fd leak reported in issue16327.""" 1501 fd_directory = '/proc/%d/fd' % os.getpid() 1502 fds_before_popen = os.listdir(fd_directory) 1503 with self.assertRaises(PopenTestException): 1504 PopenExecuteChildRaises( 1505 ZERO_RETURN_CMD, stdin=subprocess.PIPE, 1506 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1507 1508 # NOTE: This test doesn't verify that the real _execute_child 1509 # does not close the file descriptors itself on the way out 1510 # during an exception. Code inspection has confirmed that. 1511 1512 fds_after_exception = os.listdir(fd_directory) 1513 self.assertEqual(fds_before_popen, fds_after_exception) 1514 1515 @unittest.skipIf(mswindows, "behavior currently not supported on Windows") 1516 def test_file_not_found_includes_filename(self): 1517 with self.assertRaises(FileNotFoundError) as c: 1518 subprocess.call(['/opt/nonexistent_binary', 'with', 'some', 'args']) 1519 self.assertEqual(c.exception.filename, '/opt/nonexistent_binary') 1520 1521 @unittest.skipIf(mswindows, "behavior currently not supported on Windows") 1522 def test_file_not_found_with_bad_cwd(self): 1523 with self.assertRaises(FileNotFoundError) as c: 1524 subprocess.Popen(['exit', '0'], cwd='/some/nonexistent/directory') 1525 self.assertEqual(c.exception.filename, '/some/nonexistent/directory') 1526 1527 def test_class_getitems(self): 1528 self.assertIsInstance(subprocess.Popen[bytes], types.GenericAlias) 1529 self.assertIsInstance(subprocess.CompletedProcess[str], types.GenericAlias) 1530 1531class RunFuncTestCase(BaseTestCase): 1532 def run_python(self, code, **kwargs): 1533 """Run Python code in a subprocess using subprocess.run""" 1534 argv = [sys.executable, "-c", code] 1535 return subprocess.run(argv, **kwargs) 1536 1537 def test_returncode(self): 1538 # call() function with sequence argument 1539 cp = self.run_python("import sys; sys.exit(47)") 1540 self.assertEqual(cp.returncode, 47) 1541 with self.assertRaises(subprocess.CalledProcessError): 1542 cp.check_returncode() 1543 1544 def test_check(self): 1545 with self.assertRaises(subprocess.CalledProcessError) as c: 1546 self.run_python("import sys; sys.exit(47)", check=True) 1547 self.assertEqual(c.exception.returncode, 47) 1548 1549 def test_check_zero(self): 1550 # check_returncode shouldn't raise when returncode is zero 1551 cp = subprocess.run(ZERO_RETURN_CMD, check=True) 1552 self.assertEqual(cp.returncode, 0) 1553 1554 def test_timeout(self): 1555 # run() function with timeout argument; we want to test that the child 1556 # process gets killed when the timeout expires. If the child isn't 1557 # killed, this call will deadlock since subprocess.run waits for the 1558 # child. 1559 with self.assertRaises(subprocess.TimeoutExpired): 1560 self.run_python("while True: pass", timeout=0.0001) 1561 1562 def test_capture_stdout(self): 1563 # capture stdout with zero return code 1564 cp = self.run_python("print('BDFL')", stdout=subprocess.PIPE) 1565 self.assertIn(b'BDFL', cp.stdout) 1566 1567 def test_capture_stderr(self): 1568 cp = self.run_python("import sys; sys.stderr.write('BDFL')", 1569 stderr=subprocess.PIPE) 1570 self.assertIn(b'BDFL', cp.stderr) 1571 1572 def test_check_output_stdin_arg(self): 1573 # run() can be called with stdin set to a file 1574 tf = tempfile.TemporaryFile() 1575 self.addCleanup(tf.close) 1576 tf.write(b'pear') 1577 tf.seek(0) 1578 cp = self.run_python( 1579 "import sys; sys.stdout.write(sys.stdin.read().upper())", 1580 stdin=tf, stdout=subprocess.PIPE) 1581 self.assertIn(b'PEAR', cp.stdout) 1582 1583 def test_check_output_input_arg(self): 1584 # check_output() can be called with input set to a string 1585 cp = self.run_python( 1586 "import sys; sys.stdout.write(sys.stdin.read().upper())", 1587 input=b'pear', stdout=subprocess.PIPE) 1588 self.assertIn(b'PEAR', cp.stdout) 1589 1590 def test_check_output_stdin_with_input_arg(self): 1591 # run() refuses to accept 'stdin' with 'input' 1592 tf = tempfile.TemporaryFile() 1593 self.addCleanup(tf.close) 1594 tf.write(b'pear') 1595 tf.seek(0) 1596 with self.assertRaises(ValueError, 1597 msg="Expected ValueError when stdin and input args supplied.") as c: 1598 output = self.run_python("print('will not be run')", 1599 stdin=tf, input=b'hare') 1600 self.assertIn('stdin', c.exception.args[0]) 1601 self.assertIn('input', c.exception.args[0]) 1602 1603 def test_check_output_timeout(self): 1604 with self.assertRaises(subprocess.TimeoutExpired) as c: 1605 cp = self.run_python(( 1606 "import sys, time\n" 1607 "sys.stdout.write('BDFL')\n" 1608 "sys.stdout.flush()\n" 1609 "time.sleep(3600)"), 1610 # Some heavily loaded buildbots (sparc Debian 3.x) require 1611 # this much time to start and print. 1612 timeout=3, stdout=subprocess.PIPE) 1613 self.assertEqual(c.exception.output, b'BDFL') 1614 # output is aliased to stdout 1615 self.assertEqual(c.exception.stdout, b'BDFL') 1616 1617 def test_run_kwargs(self): 1618 newenv = os.environ.copy() 1619 newenv["FRUIT"] = "banana" 1620 cp = self.run_python(('import sys, os;' 1621 'sys.exit(33 if os.getenv("FRUIT")=="banana" else 31)'), 1622 env=newenv) 1623 self.assertEqual(cp.returncode, 33) 1624 1625 def test_run_with_pathlike_path(self): 1626 # bpo-31961: test run(pathlike_object) 1627 # the name of a command that can be run without 1628 # any arguments that exit fast 1629 prog = 'tree.com' if mswindows else 'ls' 1630 path = shutil.which(prog) 1631 if path is None: 1632 self.skipTest(f'{prog} required for this test') 1633 path = FakePath(path) 1634 res = subprocess.run(path, stdout=subprocess.DEVNULL) 1635 self.assertEqual(res.returncode, 0) 1636 with self.assertRaises(TypeError): 1637 subprocess.run(path, stdout=subprocess.DEVNULL, shell=True) 1638 1639 def test_run_with_bytes_path_and_arguments(self): 1640 # bpo-31961: test run([bytes_object, b'additional arguments']) 1641 path = os.fsencode(sys.executable) 1642 args = [path, '-c', b'import sys; sys.exit(57)'] 1643 res = subprocess.run(args) 1644 self.assertEqual(res.returncode, 57) 1645 1646 def test_run_with_pathlike_path_and_arguments(self): 1647 # bpo-31961: test run([pathlike_object, 'additional arguments']) 1648 path = FakePath(sys.executable) 1649 args = [path, '-c', 'import sys; sys.exit(57)'] 1650 res = subprocess.run(args) 1651 self.assertEqual(res.returncode, 57) 1652 1653 def test_capture_output(self): 1654 cp = self.run_python(("import sys;" 1655 "sys.stdout.write('BDFL'); " 1656 "sys.stderr.write('FLUFL')"), 1657 capture_output=True) 1658 self.assertIn(b'BDFL', cp.stdout) 1659 self.assertIn(b'FLUFL', cp.stderr) 1660 1661 def test_stdout_with_capture_output_arg(self): 1662 # run() refuses to accept 'stdout' with 'capture_output' 1663 tf = tempfile.TemporaryFile() 1664 self.addCleanup(tf.close) 1665 with self.assertRaises(ValueError, 1666 msg=("Expected ValueError when stdout and capture_output " 1667 "args supplied.")) as c: 1668 output = self.run_python("print('will not be run')", 1669 capture_output=True, stdout=tf) 1670 self.assertIn('stdout', c.exception.args[0]) 1671 self.assertIn('capture_output', c.exception.args[0]) 1672 1673 def test_stderr_with_capture_output_arg(self): 1674 # run() refuses to accept 'stderr' with 'capture_output' 1675 tf = tempfile.TemporaryFile() 1676 self.addCleanup(tf.close) 1677 with self.assertRaises(ValueError, 1678 msg=("Expected ValueError when stderr and capture_output " 1679 "args supplied.")) as c: 1680 output = self.run_python("print('will not be run')", 1681 capture_output=True, stderr=tf) 1682 self.assertIn('stderr', c.exception.args[0]) 1683 self.assertIn('capture_output', c.exception.args[0]) 1684 1685 # This test _might_ wind up a bit fragile on loaded build+test machines 1686 # as it depends on the timing with wide enough margins for normal situations 1687 # but does assert that it happened "soon enough" to believe the right thing 1688 # happened. 1689 @unittest.skipIf(mswindows, "requires posix like 'sleep' shell command") 1690 def test_run_with_shell_timeout_and_capture_output(self): 1691 """Output capturing after a timeout mustn't hang forever on open filehandles.""" 1692 before_secs = time.monotonic() 1693 try: 1694 subprocess.run('sleep 3', shell=True, timeout=0.1, 1695 capture_output=True) # New session unspecified. 1696 except subprocess.TimeoutExpired as exc: 1697 after_secs = time.monotonic() 1698 stacks = traceback.format_exc() # assertRaises doesn't give this. 1699 else: 1700 self.fail("TimeoutExpired not raised.") 1701 self.assertLess(after_secs - before_secs, 1.5, 1702 msg="TimeoutExpired was delayed! Bad traceback:\n```\n" 1703 f"{stacks}```") 1704 1705 1706def _get_test_grp_name(): 1707 for name_group in ('staff', 'nogroup', 'grp', 'nobody', 'nfsnobody'): 1708 if grp: 1709 try: 1710 grp.getgrnam(name_group) 1711 except KeyError: 1712 continue 1713 return name_group 1714 else: 1715 raise unittest.SkipTest('No identified group name to use for this test on this platform.') 1716 1717 1718@unittest.skipIf(mswindows, "POSIX specific tests") 1719class POSIXProcessTestCase(BaseTestCase): 1720 1721 def setUp(self): 1722 super().setUp() 1723 self._nonexistent_dir = "/_this/pa.th/does/not/exist" 1724 1725 def _get_chdir_exception(self): 1726 try: 1727 os.chdir(self._nonexistent_dir) 1728 except OSError as e: 1729 # This avoids hard coding the errno value or the OS perror() 1730 # string and instead capture the exception that we want to see 1731 # below for comparison. 1732 desired_exception = e 1733 else: 1734 self.fail("chdir to nonexistent directory %s succeeded." % 1735 self._nonexistent_dir) 1736 return desired_exception 1737 1738 def test_exception_cwd(self): 1739 """Test error in the child raised in the parent for a bad cwd.""" 1740 desired_exception = self._get_chdir_exception() 1741 try: 1742 p = subprocess.Popen([sys.executable, "-c", ""], 1743 cwd=self._nonexistent_dir) 1744 except OSError as e: 1745 # Test that the child process chdir failure actually makes 1746 # it up to the parent process as the correct exception. 1747 self.assertEqual(desired_exception.errno, e.errno) 1748 self.assertEqual(desired_exception.strerror, e.strerror) 1749 self.assertEqual(desired_exception.filename, e.filename) 1750 else: 1751 self.fail("Expected OSError: %s" % desired_exception) 1752 1753 def test_exception_bad_executable(self): 1754 """Test error in the child raised in the parent for a bad executable.""" 1755 desired_exception = self._get_chdir_exception() 1756 try: 1757 p = subprocess.Popen([sys.executable, "-c", ""], 1758 executable=self._nonexistent_dir) 1759 except OSError as e: 1760 # Test that the child process exec failure actually makes 1761 # it up to the parent process as the correct exception. 1762 self.assertEqual(desired_exception.errno, e.errno) 1763 self.assertEqual(desired_exception.strerror, e.strerror) 1764 self.assertEqual(desired_exception.filename, e.filename) 1765 else: 1766 self.fail("Expected OSError: %s" % desired_exception) 1767 1768 def test_exception_bad_args_0(self): 1769 """Test error in the child raised in the parent for a bad args[0].""" 1770 desired_exception = self._get_chdir_exception() 1771 try: 1772 p = subprocess.Popen([self._nonexistent_dir, "-c", ""]) 1773 except OSError as e: 1774 # Test that the child process exec failure actually makes 1775 # it up to the parent process as the correct exception. 1776 self.assertEqual(desired_exception.errno, e.errno) 1777 self.assertEqual(desired_exception.strerror, e.strerror) 1778 self.assertEqual(desired_exception.filename, e.filename) 1779 else: 1780 self.fail("Expected OSError: %s" % desired_exception) 1781 1782 # We mock the __del__ method for Popen in the next two tests 1783 # because it does cleanup based on the pid returned by fork_exec 1784 # along with issuing a resource warning if it still exists. Since 1785 # we don't actually spawn a process in these tests we can forego 1786 # the destructor. An alternative would be to set _child_created to 1787 # False before the destructor is called but there is no easy way 1788 # to do that 1789 class PopenNoDestructor(subprocess.Popen): 1790 def __del__(self): 1791 pass 1792 1793 @mock.patch("subprocess._posixsubprocess.fork_exec") 1794 def test_exception_errpipe_normal(self, fork_exec): 1795 """Test error passing done through errpipe_write in the good case""" 1796 def proper_error(*args): 1797 errpipe_write = args[13] 1798 # Write the hex for the error code EISDIR: 'is a directory' 1799 err_code = '{:x}'.format(errno.EISDIR).encode() 1800 os.write(errpipe_write, b"OSError:" + err_code + b":") 1801 return 0 1802 1803 fork_exec.side_effect = proper_error 1804 1805 with mock.patch("subprocess.os.waitpid", 1806 side_effect=ChildProcessError): 1807 with self.assertRaises(IsADirectoryError): 1808 self.PopenNoDestructor(["non_existent_command"]) 1809 1810 @mock.patch("subprocess._posixsubprocess.fork_exec") 1811 def test_exception_errpipe_bad_data(self, fork_exec): 1812 """Test error passing done through errpipe_write where its not 1813 in the expected format""" 1814 error_data = b"\xFF\x00\xDE\xAD" 1815 def bad_error(*args): 1816 errpipe_write = args[13] 1817 # Anything can be in the pipe, no assumptions should 1818 # be made about its encoding, so we'll write some 1819 # arbitrary hex bytes to test it out 1820 os.write(errpipe_write, error_data) 1821 return 0 1822 1823 fork_exec.side_effect = bad_error 1824 1825 with mock.patch("subprocess.os.waitpid", 1826 side_effect=ChildProcessError): 1827 with self.assertRaises(subprocess.SubprocessError) as e: 1828 self.PopenNoDestructor(["non_existent_command"]) 1829 1830 self.assertIn(repr(error_data), str(e.exception)) 1831 1832 @unittest.skipIf(not os.path.exists('/proc/self/status'), 1833 "need /proc/self/status") 1834 def test_restore_signals(self): 1835 # Blindly assume that cat exists on systems with /proc/self/status... 1836 default_proc_status = subprocess.check_output( 1837 ['cat', '/proc/self/status'], 1838 restore_signals=False) 1839 for line in default_proc_status.splitlines(): 1840 if line.startswith(b'SigIgn'): 1841 default_sig_ign_mask = line 1842 break 1843 else: 1844 self.skipTest("SigIgn not found in /proc/self/status.") 1845 restored_proc_status = subprocess.check_output( 1846 ['cat', '/proc/self/status'], 1847 restore_signals=True) 1848 for line in restored_proc_status.splitlines(): 1849 if line.startswith(b'SigIgn'): 1850 restored_sig_ign_mask = line 1851 break 1852 self.assertNotEqual(default_sig_ign_mask, restored_sig_ign_mask, 1853 msg="restore_signals=True should've unblocked " 1854 "SIGPIPE and friends.") 1855 1856 def test_start_new_session(self): 1857 # For code coverage of calling setsid(). We don't care if we get an 1858 # EPERM error from it depending on the test execution environment, that 1859 # still indicates that it was called. 1860 try: 1861 output = subprocess.check_output( 1862 [sys.executable, "-c", "import os; print(os.getsid(0))"], 1863 start_new_session=True) 1864 except OSError as e: 1865 if e.errno != errno.EPERM: 1866 raise 1867 else: 1868 parent_sid = os.getsid(0) 1869 child_sid = int(output) 1870 self.assertNotEqual(parent_sid, child_sid) 1871 1872 @unittest.skipUnless(hasattr(os, 'setreuid'), 'no setreuid on platform') 1873 def test_user(self): 1874 # For code coverage of the user parameter. We don't care if we get an 1875 # EPERM error from it depending on the test execution environment, that 1876 # still indicates that it was called. 1877 1878 uid = os.geteuid() 1879 test_users = [65534 if uid != 65534 else 65533, uid] 1880 name_uid = "nobody" if sys.platform != 'darwin' else "unknown" 1881 1882 if pwd is not None: 1883 try: 1884 pwd.getpwnam(name_uid) 1885 test_users.append(name_uid) 1886 except KeyError: 1887 # unknown user name 1888 name_uid = None 1889 1890 for user in test_users: 1891 # posix_spawn() may be used with close_fds=False 1892 for close_fds in (False, True): 1893 with self.subTest(user=user, close_fds=close_fds): 1894 try: 1895 output = subprocess.check_output( 1896 [sys.executable, "-c", 1897 "import os; print(os.getuid())"], 1898 user=user, 1899 close_fds=close_fds) 1900 except PermissionError: # (EACCES, EPERM) 1901 pass 1902 except OSError as e: 1903 if e.errno not in (errno.EACCES, errno.EPERM): 1904 raise 1905 else: 1906 if isinstance(user, str): 1907 user_uid = pwd.getpwnam(user).pw_uid 1908 else: 1909 user_uid = user 1910 child_user = int(output) 1911 self.assertEqual(child_user, user_uid) 1912 1913 with self.assertRaises(ValueError): 1914 subprocess.check_call(ZERO_RETURN_CMD, user=-1) 1915 1916 with self.assertRaises(OverflowError): 1917 subprocess.check_call(ZERO_RETURN_CMD, 1918 cwd=os.curdir, env=os.environ, user=2**64) 1919 1920 if pwd is None and name_uid is not None: 1921 with self.assertRaises(ValueError): 1922 subprocess.check_call(ZERO_RETURN_CMD, user=name_uid) 1923 1924 @unittest.skipIf(hasattr(os, 'setreuid'), 'setreuid() available on platform') 1925 def test_user_error(self): 1926 with self.assertRaises(ValueError): 1927 subprocess.check_call(ZERO_RETURN_CMD, user=65535) 1928 1929 @unittest.skipUnless(hasattr(os, 'setregid'), 'no setregid() on platform') 1930 def test_group(self): 1931 gid = os.getegid() 1932 group_list = [65534 if gid != 65534 else 65533] 1933 name_group = _get_test_grp_name() 1934 1935 if grp is not None: 1936 group_list.append(name_group) 1937 1938 for group in group_list + [gid]: 1939 # posix_spawn() may be used with close_fds=False 1940 for close_fds in (False, True): 1941 with self.subTest(group=group, close_fds=close_fds): 1942 try: 1943 output = subprocess.check_output( 1944 [sys.executable, "-c", 1945 "import os; print(os.getgid())"], 1946 group=group, 1947 close_fds=close_fds) 1948 except PermissionError: # (EACCES, EPERM) 1949 pass 1950 else: 1951 if isinstance(group, str): 1952 group_gid = grp.getgrnam(group).gr_gid 1953 else: 1954 group_gid = group 1955 1956 child_group = int(output) 1957 self.assertEqual(child_group, group_gid) 1958 1959 # make sure we bomb on negative values 1960 with self.assertRaises(ValueError): 1961 subprocess.check_call(ZERO_RETURN_CMD, group=-1) 1962 1963 with self.assertRaises(OverflowError): 1964 subprocess.check_call(ZERO_RETURN_CMD, 1965 cwd=os.curdir, env=os.environ, group=2**64) 1966 1967 if grp is None: 1968 with self.assertRaises(ValueError): 1969 subprocess.check_call(ZERO_RETURN_CMD, group=name_group) 1970 1971 @unittest.skipIf(hasattr(os, 'setregid'), 'setregid() available on platform') 1972 def test_group_error(self): 1973 with self.assertRaises(ValueError): 1974 subprocess.check_call(ZERO_RETURN_CMD, group=65535) 1975 1976 @unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform') 1977 def test_extra_groups(self): 1978 gid = os.getegid() 1979 group_list = [65534 if gid != 65534 else 65533] 1980 name_group = _get_test_grp_name() 1981 perm_error = False 1982 1983 if grp is not None: 1984 group_list.append(name_group) 1985 1986 try: 1987 output = subprocess.check_output( 1988 [sys.executable, "-c", 1989 "import os, sys, json; json.dump(os.getgroups(), sys.stdout)"], 1990 extra_groups=group_list) 1991 except OSError as ex: 1992 if ex.errno != errno.EPERM: 1993 raise 1994 perm_error = True 1995 1996 else: 1997 parent_groups = os.getgroups() 1998 child_groups = json.loads(output) 1999 2000 if grp is not None: 2001 desired_gids = [grp.getgrnam(g).gr_gid if isinstance(g, str) else g 2002 for g in group_list] 2003 else: 2004 desired_gids = group_list 2005 2006 if perm_error: 2007 self.assertEqual(set(child_groups), set(parent_groups)) 2008 else: 2009 self.assertEqual(set(desired_gids), set(child_groups)) 2010 2011 # make sure we bomb on negative values 2012 with self.assertRaises(ValueError): 2013 subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[-1]) 2014 2015 with self.assertRaises(ValueError): 2016 subprocess.check_call(ZERO_RETURN_CMD, 2017 cwd=os.curdir, env=os.environ, 2018 extra_groups=[2**64]) 2019 2020 if grp is None: 2021 with self.assertRaises(ValueError): 2022 subprocess.check_call(ZERO_RETURN_CMD, 2023 extra_groups=[name_group]) 2024 2025 @unittest.skipIf(hasattr(os, 'setgroups'), 'setgroups() available on platform') 2026 def test_extra_groups_error(self): 2027 with self.assertRaises(ValueError): 2028 subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[]) 2029 2030 @unittest.skipIf(mswindows or not hasattr(os, 'umask'), 2031 'POSIX umask() is not available.') 2032 def test_umask(self): 2033 tmpdir = None 2034 try: 2035 tmpdir = tempfile.mkdtemp() 2036 name = os.path.join(tmpdir, "beans") 2037 # We set an unusual umask in the child so as a unique mode 2038 # for us to test the child's touched file for. 2039 subprocess.check_call( 2040 [sys.executable, "-c", f"open({name!r}, 'w').close()"], 2041 umask=0o053) 2042 # Ignore execute permissions entirely in our test, 2043 # filesystems could be mounted to ignore or force that. 2044 st_mode = os.stat(name).st_mode & 0o666 2045 expected_mode = 0o624 2046 self.assertEqual(expected_mode, st_mode, 2047 msg=f'{oct(expected_mode)} != {oct(st_mode)}') 2048 finally: 2049 if tmpdir is not None: 2050 shutil.rmtree(tmpdir) 2051 2052 def test_run_abort(self): 2053 # returncode handles signal termination 2054 with support.SuppressCrashReport(): 2055 p = subprocess.Popen([sys.executable, "-c", 2056 'import os; os.abort()']) 2057 p.wait() 2058 self.assertEqual(-p.returncode, signal.SIGABRT) 2059 2060 def test_CalledProcessError_str_signal(self): 2061 err = subprocess.CalledProcessError(-int(signal.SIGABRT), "fake cmd") 2062 error_string = str(err) 2063 # We're relying on the repr() of the signal.Signals intenum to provide 2064 # the word signal, the signal name and the numeric value. 2065 self.assertIn("signal", error_string.lower()) 2066 # We're not being specific about the signal name as some signals have 2067 # multiple names and which name is revealed can vary. 2068 self.assertIn("SIG", error_string) 2069 self.assertIn(str(signal.SIGABRT), error_string) 2070 2071 def test_CalledProcessError_str_unknown_signal(self): 2072 err = subprocess.CalledProcessError(-9876543, "fake cmd") 2073 error_string = str(err) 2074 self.assertIn("unknown signal 9876543.", error_string) 2075 2076 def test_CalledProcessError_str_non_zero(self): 2077 err = subprocess.CalledProcessError(2, "fake cmd") 2078 error_string = str(err) 2079 self.assertIn("non-zero exit status 2.", error_string) 2080 2081 def test_preexec(self): 2082 # DISCLAIMER: Setting environment variables is *not* a good use 2083 # of a preexec_fn. This is merely a test. 2084 p = subprocess.Popen([sys.executable, "-c", 2085 'import sys,os;' 2086 'sys.stdout.write(os.getenv("FRUIT"))'], 2087 stdout=subprocess.PIPE, 2088 preexec_fn=lambda: os.putenv("FRUIT", "apple")) 2089 with p: 2090 self.assertEqual(p.stdout.read(), b"apple") 2091 2092 def test_preexec_exception(self): 2093 def raise_it(): 2094 raise ValueError("What if two swallows carried a coconut?") 2095 try: 2096 p = subprocess.Popen([sys.executable, "-c", ""], 2097 preexec_fn=raise_it) 2098 except subprocess.SubprocessError as e: 2099 self.assertTrue( 2100 subprocess._posixsubprocess, 2101 "Expected a ValueError from the preexec_fn") 2102 except ValueError as e: 2103 self.assertIn("coconut", e.args[0]) 2104 else: 2105 self.fail("Exception raised by preexec_fn did not make it " 2106 "to the parent process.") 2107 2108 class _TestExecuteChildPopen(subprocess.Popen): 2109 """Used to test behavior at the end of _execute_child.""" 2110 def __init__(self, testcase, *args, **kwargs): 2111 self._testcase = testcase 2112 subprocess.Popen.__init__(self, *args, **kwargs) 2113 2114 def _execute_child(self, *args, **kwargs): 2115 try: 2116 subprocess.Popen._execute_child(self, *args, **kwargs) 2117 finally: 2118 # Open a bunch of file descriptors and verify that 2119 # none of them are the same as the ones the Popen 2120 # instance is using for stdin/stdout/stderr. 2121 devzero_fds = [os.open("/dev/zero", os.O_RDONLY) 2122 for _ in range(8)] 2123 try: 2124 for fd in devzero_fds: 2125 self._testcase.assertNotIn( 2126 fd, (self.stdin.fileno(), self.stdout.fileno(), 2127 self.stderr.fileno()), 2128 msg="At least one fd was closed early.") 2129 finally: 2130 for fd in devzero_fds: 2131 os.close(fd) 2132 2133 @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.") 2134 def test_preexec_errpipe_does_not_double_close_pipes(self): 2135 """Issue16140: Don't double close pipes on preexec error.""" 2136 2137 def raise_it(): 2138 raise subprocess.SubprocessError( 2139 "force the _execute_child() errpipe_data path.") 2140 2141 with self.assertRaises(subprocess.SubprocessError): 2142 self._TestExecuteChildPopen( 2143 self, ZERO_RETURN_CMD, 2144 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 2145 stderr=subprocess.PIPE, preexec_fn=raise_it) 2146 2147 def test_preexec_gc_module_failure(self): 2148 # This tests the code that disables garbage collection if the child 2149 # process will execute any Python. 2150 enabled = gc.isenabled() 2151 try: 2152 gc.disable() 2153 self.assertFalse(gc.isenabled()) 2154 subprocess.call([sys.executable, '-c', ''], 2155 preexec_fn=lambda: None) 2156 self.assertFalse(gc.isenabled(), 2157 "Popen enabled gc when it shouldn't.") 2158 2159 gc.enable() 2160 self.assertTrue(gc.isenabled()) 2161 subprocess.call([sys.executable, '-c', ''], 2162 preexec_fn=lambda: None) 2163 self.assertTrue(gc.isenabled(), "Popen left gc disabled.") 2164 finally: 2165 if not enabled: 2166 gc.disable() 2167 2168 @unittest.skipIf( 2169 sys.platform == 'darwin', 'setrlimit() seems to fail on OS X') 2170 def test_preexec_fork_failure(self): 2171 # The internal code did not preserve the previous exception when 2172 # re-enabling garbage collection 2173 try: 2174 from resource import getrlimit, setrlimit, RLIMIT_NPROC 2175 except ImportError as err: 2176 self.skipTest(err) # RLIMIT_NPROC is specific to Linux and BSD 2177 limits = getrlimit(RLIMIT_NPROC) 2178 [_, hard] = limits 2179 setrlimit(RLIMIT_NPROC, (0, hard)) 2180 self.addCleanup(setrlimit, RLIMIT_NPROC, limits) 2181 try: 2182 subprocess.call([sys.executable, '-c', ''], 2183 preexec_fn=lambda: None) 2184 except BlockingIOError: 2185 # Forking should raise EAGAIN, translated to BlockingIOError 2186 pass 2187 else: 2188 self.skipTest('RLIMIT_NPROC had no effect; probably superuser') 2189 2190 def test_args_string(self): 2191 # args is a string 2192 fd, fname = tempfile.mkstemp() 2193 # reopen in text mode 2194 with open(fd, "w", errors="surrogateescape") as fobj: 2195 fobj.write("#!%s\n" % support.unix_shell) 2196 fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" % 2197 sys.executable) 2198 os.chmod(fname, 0o700) 2199 p = subprocess.Popen(fname) 2200 p.wait() 2201 os.remove(fname) 2202 self.assertEqual(p.returncode, 47) 2203 2204 def test_invalid_args(self): 2205 # invalid arguments should raise ValueError 2206 self.assertRaises(ValueError, subprocess.call, 2207 [sys.executable, "-c", 2208 "import sys; sys.exit(47)"], 2209 startupinfo=47) 2210 self.assertRaises(ValueError, subprocess.call, 2211 [sys.executable, "-c", 2212 "import sys; sys.exit(47)"], 2213 creationflags=47) 2214 2215 def test_shell_sequence(self): 2216 # Run command through the shell (sequence) 2217 newenv = os.environ.copy() 2218 newenv["FRUIT"] = "apple" 2219 p = subprocess.Popen(["echo $FRUIT"], shell=1, 2220 stdout=subprocess.PIPE, 2221 env=newenv) 2222 with p: 2223 self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") 2224 2225 def test_shell_string(self): 2226 # Run command through the shell (string) 2227 newenv = os.environ.copy() 2228 newenv["FRUIT"] = "apple" 2229 p = subprocess.Popen("echo $FRUIT", shell=1, 2230 stdout=subprocess.PIPE, 2231 env=newenv) 2232 with p: 2233 self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") 2234 2235 def test_call_string(self): 2236 # call() function with string argument on UNIX 2237 fd, fname = tempfile.mkstemp() 2238 # reopen in text mode 2239 with open(fd, "w", errors="surrogateescape") as fobj: 2240 fobj.write("#!%s\n" % support.unix_shell) 2241 fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" % 2242 sys.executable) 2243 os.chmod(fname, 0o700) 2244 rc = subprocess.call(fname) 2245 os.remove(fname) 2246 self.assertEqual(rc, 47) 2247 2248 def test_specific_shell(self): 2249 # Issue #9265: Incorrect name passed as arg[0]. 2250 shells = [] 2251 for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']: 2252 for name in ['bash', 'ksh']: 2253 sh = os.path.join(prefix, name) 2254 if os.path.isfile(sh): 2255 shells.append(sh) 2256 if not shells: # Will probably work for any shell but csh. 2257 self.skipTest("bash or ksh required for this test") 2258 sh = '/bin/sh' 2259 if os.path.isfile(sh) and not os.path.islink(sh): 2260 # Test will fail if /bin/sh is a symlink to csh. 2261 shells.append(sh) 2262 for sh in shells: 2263 p = subprocess.Popen("echo $0", executable=sh, shell=True, 2264 stdout=subprocess.PIPE) 2265 with p: 2266 self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii')) 2267 2268 def _kill_process(self, method, *args): 2269 # Do not inherit file handles from the parent. 2270 # It should fix failures on some platforms. 2271 # Also set the SIGINT handler to the default to make sure it's not 2272 # being ignored (some tests rely on that.) 2273 old_handler = signal.signal(signal.SIGINT, signal.default_int_handler) 2274 try: 2275 p = subprocess.Popen([sys.executable, "-c", """if 1: 2276 import sys, time 2277 sys.stdout.write('x\\n') 2278 sys.stdout.flush() 2279 time.sleep(30) 2280 """], 2281 close_fds=True, 2282 stdin=subprocess.PIPE, 2283 stdout=subprocess.PIPE, 2284 stderr=subprocess.PIPE) 2285 finally: 2286 signal.signal(signal.SIGINT, old_handler) 2287 # Wait for the interpreter to be completely initialized before 2288 # sending any signal. 2289 p.stdout.read(1) 2290 getattr(p, method)(*args) 2291 return p 2292 2293 @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')), 2294 "Due to known OS bug (issue #16762)") 2295 def _kill_dead_process(self, method, *args): 2296 # Do not inherit file handles from the parent. 2297 # It should fix failures on some platforms. 2298 p = subprocess.Popen([sys.executable, "-c", """if 1: 2299 import sys, time 2300 sys.stdout.write('x\\n') 2301 sys.stdout.flush() 2302 """], 2303 close_fds=True, 2304 stdin=subprocess.PIPE, 2305 stdout=subprocess.PIPE, 2306 stderr=subprocess.PIPE) 2307 # Wait for the interpreter to be completely initialized before 2308 # sending any signal. 2309 p.stdout.read(1) 2310 # The process should end after this 2311 time.sleep(1) 2312 # This shouldn't raise even though the child is now dead 2313 getattr(p, method)(*args) 2314 p.communicate() 2315 2316 def test_send_signal(self): 2317 p = self._kill_process('send_signal', signal.SIGINT) 2318 _, stderr = p.communicate() 2319 self.assertIn(b'KeyboardInterrupt', stderr) 2320 self.assertNotEqual(p.wait(), 0) 2321 2322 def test_kill(self): 2323 p = self._kill_process('kill') 2324 _, stderr = p.communicate() 2325 self.assertEqual(stderr, b'') 2326 self.assertEqual(p.wait(), -signal.SIGKILL) 2327 2328 def test_terminate(self): 2329 p = self._kill_process('terminate') 2330 _, stderr = p.communicate() 2331 self.assertEqual(stderr, b'') 2332 self.assertEqual(p.wait(), -signal.SIGTERM) 2333 2334 def test_send_signal_dead(self): 2335 # Sending a signal to a dead process 2336 self._kill_dead_process('send_signal', signal.SIGINT) 2337 2338 def test_kill_dead(self): 2339 # Killing a dead process 2340 self._kill_dead_process('kill') 2341 2342 def test_terminate_dead(self): 2343 # Terminating a dead process 2344 self._kill_dead_process('terminate') 2345 2346 def _save_fds(self, save_fds): 2347 fds = [] 2348 for fd in save_fds: 2349 inheritable = os.get_inheritable(fd) 2350 saved = os.dup(fd) 2351 fds.append((fd, saved, inheritable)) 2352 return fds 2353 2354 def _restore_fds(self, fds): 2355 for fd, saved, inheritable in fds: 2356 os.dup2(saved, fd, inheritable=inheritable) 2357 os.close(saved) 2358 2359 def check_close_std_fds(self, fds): 2360 # Issue #9905: test that subprocess pipes still work properly with 2361 # some standard fds closed 2362 stdin = 0 2363 saved_fds = self._save_fds(fds) 2364 for fd, saved, inheritable in saved_fds: 2365 if fd == 0: 2366 stdin = saved 2367 break 2368 try: 2369 for fd in fds: 2370 os.close(fd) 2371 out, err = subprocess.Popen([sys.executable, "-c", 2372 'import sys;' 2373 'sys.stdout.write("apple");' 2374 'sys.stdout.flush();' 2375 'sys.stderr.write("orange")'], 2376 stdin=stdin, 2377 stdout=subprocess.PIPE, 2378 stderr=subprocess.PIPE).communicate() 2379 self.assertEqual(out, b'apple') 2380 self.assertEqual(err, b'orange') 2381 finally: 2382 self._restore_fds(saved_fds) 2383 2384 def test_close_fd_0(self): 2385 self.check_close_std_fds([0]) 2386 2387 def test_close_fd_1(self): 2388 self.check_close_std_fds([1]) 2389 2390 def test_close_fd_2(self): 2391 self.check_close_std_fds([2]) 2392 2393 def test_close_fds_0_1(self): 2394 self.check_close_std_fds([0, 1]) 2395 2396 def test_close_fds_0_2(self): 2397 self.check_close_std_fds([0, 2]) 2398 2399 def test_close_fds_1_2(self): 2400 self.check_close_std_fds([1, 2]) 2401 2402 def test_close_fds_0_1_2(self): 2403 # Issue #10806: test that subprocess pipes still work properly with 2404 # all standard fds closed. 2405 self.check_close_std_fds([0, 1, 2]) 2406 2407 def test_small_errpipe_write_fd(self): 2408 """Issue #15798: Popen should work when stdio fds are available.""" 2409 new_stdin = os.dup(0) 2410 new_stdout = os.dup(1) 2411 try: 2412 os.close(0) 2413 os.close(1) 2414 2415 # Side test: if errpipe_write fails to have its CLOEXEC 2416 # flag set this should cause the parent to think the exec 2417 # failed. Extremely unlikely: everyone supports CLOEXEC. 2418 subprocess.Popen([ 2419 sys.executable, "-c", 2420 "print('AssertionError:0:CLOEXEC failure.')"]).wait() 2421 finally: 2422 # Restore original stdin and stdout 2423 os.dup2(new_stdin, 0) 2424 os.dup2(new_stdout, 1) 2425 os.close(new_stdin) 2426 os.close(new_stdout) 2427 2428 def test_remapping_std_fds(self): 2429 # open up some temporary files 2430 temps = [tempfile.mkstemp() for i in range(3)] 2431 try: 2432 temp_fds = [fd for fd, fname in temps] 2433 2434 # unlink the files -- we won't need to reopen them 2435 for fd, fname in temps: 2436 os.unlink(fname) 2437 2438 # write some data to what will become stdin, and rewind 2439 os.write(temp_fds[1], b"STDIN") 2440 os.lseek(temp_fds[1], 0, 0) 2441 2442 # move the standard file descriptors out of the way 2443 saved_fds = self._save_fds(range(3)) 2444 try: 2445 # duplicate the file objects over the standard fd's 2446 for fd, temp_fd in enumerate(temp_fds): 2447 os.dup2(temp_fd, fd) 2448 2449 # now use those files in the "wrong" order, so that subprocess 2450 # has to rearrange them in the child 2451 p = subprocess.Popen([sys.executable, "-c", 2452 'import sys; got = sys.stdin.read();' 2453 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], 2454 stdin=temp_fds[1], 2455 stdout=temp_fds[2], 2456 stderr=temp_fds[0]) 2457 p.wait() 2458 finally: 2459 self._restore_fds(saved_fds) 2460 2461 for fd in temp_fds: 2462 os.lseek(fd, 0, 0) 2463 2464 out = os.read(temp_fds[2], 1024) 2465 err = os.read(temp_fds[0], 1024).strip() 2466 self.assertEqual(out, b"got STDIN") 2467 self.assertEqual(err, b"err") 2468 2469 finally: 2470 for fd in temp_fds: 2471 os.close(fd) 2472 2473 def check_swap_fds(self, stdin_no, stdout_no, stderr_no): 2474 # open up some temporary files 2475 temps = [tempfile.mkstemp() for i in range(3)] 2476 temp_fds = [fd for fd, fname in temps] 2477 try: 2478 # unlink the files -- we won't need to reopen them 2479 for fd, fname in temps: 2480 os.unlink(fname) 2481 2482 # save a copy of the standard file descriptors 2483 saved_fds = self._save_fds(range(3)) 2484 try: 2485 # duplicate the temp files over the standard fd's 0, 1, 2 2486 for fd, temp_fd in enumerate(temp_fds): 2487 os.dup2(temp_fd, fd) 2488 2489 # write some data to what will become stdin, and rewind 2490 os.write(stdin_no, b"STDIN") 2491 os.lseek(stdin_no, 0, 0) 2492 2493 # now use those files in the given order, so that subprocess 2494 # has to rearrange them in the child 2495 p = subprocess.Popen([sys.executable, "-c", 2496 'import sys; got = sys.stdin.read();' 2497 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], 2498 stdin=stdin_no, 2499 stdout=stdout_no, 2500 stderr=stderr_no) 2501 p.wait() 2502 2503 for fd in temp_fds: 2504 os.lseek(fd, 0, 0) 2505 2506 out = os.read(stdout_no, 1024) 2507 err = os.read(stderr_no, 1024).strip() 2508 finally: 2509 self._restore_fds(saved_fds) 2510 2511 self.assertEqual(out, b"got STDIN") 2512 self.assertEqual(err, b"err") 2513 2514 finally: 2515 for fd in temp_fds: 2516 os.close(fd) 2517 2518 # When duping fds, if there arises a situation where one of the fds is 2519 # either 0, 1 or 2, it is possible that it is overwritten (#12607). 2520 # This tests all combinations of this. 2521 def test_swap_fds(self): 2522 self.check_swap_fds(0, 1, 2) 2523 self.check_swap_fds(0, 2, 1) 2524 self.check_swap_fds(1, 0, 2) 2525 self.check_swap_fds(1, 2, 0) 2526 self.check_swap_fds(2, 0, 1) 2527 self.check_swap_fds(2, 1, 0) 2528 2529 def _check_swap_std_fds_with_one_closed(self, from_fds, to_fds): 2530 saved_fds = self._save_fds(range(3)) 2531 try: 2532 for from_fd in from_fds: 2533 with tempfile.TemporaryFile() as f: 2534 os.dup2(f.fileno(), from_fd) 2535 2536 fd_to_close = (set(range(3)) - set(from_fds)).pop() 2537 os.close(fd_to_close) 2538 2539 arg_names = ['stdin', 'stdout', 'stderr'] 2540 kwargs = {} 2541 for from_fd, to_fd in zip(from_fds, to_fds): 2542 kwargs[arg_names[to_fd]] = from_fd 2543 2544 code = textwrap.dedent(r''' 2545 import os, sys 2546 skipped_fd = int(sys.argv[1]) 2547 for fd in range(3): 2548 if fd != skipped_fd: 2549 os.write(fd, str(fd).encode('ascii')) 2550 ''') 2551 2552 skipped_fd = (set(range(3)) - set(to_fds)).pop() 2553 2554 rc = subprocess.call([sys.executable, '-c', code, str(skipped_fd)], 2555 **kwargs) 2556 self.assertEqual(rc, 0) 2557 2558 for from_fd, to_fd in zip(from_fds, to_fds): 2559 os.lseek(from_fd, 0, os.SEEK_SET) 2560 read_bytes = os.read(from_fd, 1024) 2561 read_fds = list(map(int, read_bytes.decode('ascii'))) 2562 msg = textwrap.dedent(f""" 2563 When testing {from_fds} to {to_fds} redirection, 2564 parent descriptor {from_fd} got redirected 2565 to descriptor(s) {read_fds} instead of descriptor {to_fd}. 2566 """) 2567 self.assertEqual([to_fd], read_fds, msg) 2568 finally: 2569 self._restore_fds(saved_fds) 2570 2571 # Check that subprocess can remap std fds correctly even 2572 # if one of them is closed (#32844). 2573 def test_swap_std_fds_with_one_closed(self): 2574 for from_fds in itertools.combinations(range(3), 2): 2575 for to_fds in itertools.permutations(range(3), 2): 2576 self._check_swap_std_fds_with_one_closed(from_fds, to_fds) 2577 2578 def test_surrogates_error_message(self): 2579 def prepare(): 2580 raise ValueError("surrogate:\uDCff") 2581 2582 try: 2583 subprocess.call( 2584 ZERO_RETURN_CMD, 2585 preexec_fn=prepare) 2586 except ValueError as err: 2587 # Pure Python implementations keeps the message 2588 self.assertIsNone(subprocess._posixsubprocess) 2589 self.assertEqual(str(err), "surrogate:\uDCff") 2590 except subprocess.SubprocessError as err: 2591 # _posixsubprocess uses a default message 2592 self.assertIsNotNone(subprocess._posixsubprocess) 2593 self.assertEqual(str(err), "Exception occurred in preexec_fn.") 2594 else: 2595 self.fail("Expected ValueError or subprocess.SubprocessError") 2596 2597 def test_undecodable_env(self): 2598 for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')): 2599 encoded_value = value.encode("ascii", "surrogateescape") 2600 2601 # test str with surrogates 2602 script = "import os; print(ascii(os.getenv(%s)))" % repr(key) 2603 env = os.environ.copy() 2604 env[key] = value 2605 # Use C locale to get ASCII for the locale encoding to force 2606 # surrogate-escaping of \xFF in the child process 2607 env['LC_ALL'] = 'C' 2608 decoded_value = value 2609 stdout = subprocess.check_output( 2610 [sys.executable, "-c", script], 2611 env=env) 2612 stdout = stdout.rstrip(b'\n\r') 2613 self.assertEqual(stdout.decode('ascii'), ascii(decoded_value)) 2614 2615 # test bytes 2616 key = key.encode("ascii", "surrogateescape") 2617 script = "import os; print(ascii(os.getenvb(%s)))" % repr(key) 2618 env = os.environ.copy() 2619 env[key] = encoded_value 2620 stdout = subprocess.check_output( 2621 [sys.executable, "-c", script], 2622 env=env) 2623 stdout = stdout.rstrip(b'\n\r') 2624 self.assertEqual(stdout.decode('ascii'), ascii(encoded_value)) 2625 2626 def test_bytes_program(self): 2627 abs_program = os.fsencode(ZERO_RETURN_CMD[0]) 2628 args = list(ZERO_RETURN_CMD[1:]) 2629 path, program = os.path.split(ZERO_RETURN_CMD[0]) 2630 program = os.fsencode(program) 2631 2632 # absolute bytes path 2633 exitcode = subprocess.call([abs_program]+args) 2634 self.assertEqual(exitcode, 0) 2635 2636 # absolute bytes path as a string 2637 cmd = b"'%s' %s" % (abs_program, " ".join(args).encode("utf-8")) 2638 exitcode = subprocess.call(cmd, shell=True) 2639 self.assertEqual(exitcode, 0) 2640 2641 # bytes program, unicode PATH 2642 env = os.environ.copy() 2643 env["PATH"] = path 2644 exitcode = subprocess.call([program]+args, env=env) 2645 self.assertEqual(exitcode, 0) 2646 2647 # bytes program, bytes PATH 2648 envb = os.environb.copy() 2649 envb[b"PATH"] = os.fsencode(path) 2650 exitcode = subprocess.call([program]+args, env=envb) 2651 self.assertEqual(exitcode, 0) 2652 2653 def test_pipe_cloexec(self): 2654 sleeper = support.findfile("input_reader.py", subdir="subprocessdata") 2655 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2656 2657 p1 = subprocess.Popen([sys.executable, sleeper], 2658 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 2659 stderr=subprocess.PIPE, close_fds=False) 2660 2661 self.addCleanup(p1.communicate, b'') 2662 2663 p2 = subprocess.Popen([sys.executable, fd_status], 2664 stdout=subprocess.PIPE, close_fds=False) 2665 2666 output, error = p2.communicate() 2667 result_fds = set(map(int, output.split(b','))) 2668 unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(), 2669 p1.stderr.fileno()]) 2670 2671 self.assertFalse(result_fds & unwanted_fds, 2672 "Expected no fds from %r to be open in child, " 2673 "found %r" % 2674 (unwanted_fds, result_fds & unwanted_fds)) 2675 2676 def test_pipe_cloexec_real_tools(self): 2677 qcat = support.findfile("qcat.py", subdir="subprocessdata") 2678 qgrep = support.findfile("qgrep.py", subdir="subprocessdata") 2679 2680 subdata = b'zxcvbn' 2681 data = subdata * 4 + b'\n' 2682 2683 p1 = subprocess.Popen([sys.executable, qcat], 2684 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 2685 close_fds=False) 2686 2687 p2 = subprocess.Popen([sys.executable, qgrep, subdata], 2688 stdin=p1.stdout, stdout=subprocess.PIPE, 2689 close_fds=False) 2690 2691 self.addCleanup(p1.wait) 2692 self.addCleanup(p2.wait) 2693 def kill_p1(): 2694 try: 2695 p1.terminate() 2696 except ProcessLookupError: 2697 pass 2698 def kill_p2(): 2699 try: 2700 p2.terminate() 2701 except ProcessLookupError: 2702 pass 2703 self.addCleanup(kill_p1) 2704 self.addCleanup(kill_p2) 2705 2706 p1.stdin.write(data) 2707 p1.stdin.close() 2708 2709 readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10) 2710 2711 self.assertTrue(readfiles, "The child hung") 2712 self.assertEqual(p2.stdout.read(), data) 2713 2714 p1.stdout.close() 2715 p2.stdout.close() 2716 2717 def test_close_fds(self): 2718 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2719 2720 fds = os.pipe() 2721 self.addCleanup(os.close, fds[0]) 2722 self.addCleanup(os.close, fds[1]) 2723 2724 open_fds = set(fds) 2725 # add a bunch more fds 2726 for _ in range(9): 2727 fd = os.open(os.devnull, os.O_RDONLY) 2728 self.addCleanup(os.close, fd) 2729 open_fds.add(fd) 2730 2731 for fd in open_fds: 2732 os.set_inheritable(fd, True) 2733 2734 p = subprocess.Popen([sys.executable, fd_status], 2735 stdout=subprocess.PIPE, close_fds=False) 2736 output, ignored = p.communicate() 2737 remaining_fds = set(map(int, output.split(b','))) 2738 2739 self.assertEqual(remaining_fds & open_fds, open_fds, 2740 "Some fds were closed") 2741 2742 p = subprocess.Popen([sys.executable, fd_status], 2743 stdout=subprocess.PIPE, close_fds=True) 2744 output, ignored = p.communicate() 2745 remaining_fds = set(map(int, output.split(b','))) 2746 2747 self.assertFalse(remaining_fds & open_fds, 2748 "Some fds were left open") 2749 self.assertIn(1, remaining_fds, "Subprocess failed") 2750 2751 # Keep some of the fd's we opened open in the subprocess. 2752 # This tests _posixsubprocess.c's proper handling of fds_to_keep. 2753 fds_to_keep = set(open_fds.pop() for _ in range(8)) 2754 p = subprocess.Popen([sys.executable, fd_status], 2755 stdout=subprocess.PIPE, close_fds=True, 2756 pass_fds=fds_to_keep) 2757 output, ignored = p.communicate() 2758 remaining_fds = set(map(int, output.split(b','))) 2759 2760 self.assertFalse((remaining_fds - fds_to_keep) & open_fds, 2761 "Some fds not in pass_fds were left open") 2762 self.assertIn(1, remaining_fds, "Subprocess failed") 2763 2764 2765 @unittest.skipIf(sys.platform.startswith("freebsd") and 2766 os.stat("/dev").st_dev == os.stat("/dev/fd").st_dev, 2767 "Requires fdescfs mounted on /dev/fd on FreeBSD.") 2768 def test_close_fds_when_max_fd_is_lowered(self): 2769 """Confirm that issue21618 is fixed (may fail under valgrind).""" 2770 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2771 2772 # This launches the meat of the test in a child process to 2773 # avoid messing with the larger unittest processes maximum 2774 # number of file descriptors. 2775 # This process launches: 2776 # +--> Process that lowers its RLIMIT_NOFILE aftr setting up 2777 # a bunch of high open fds above the new lower rlimit. 2778 # Those are reported via stdout before launching a new 2779 # process with close_fds=False to run the actual test: 2780 # +--> The TEST: This one launches a fd_status.py 2781 # subprocess with close_fds=True so we can find out if 2782 # any of the fds above the lowered rlimit are still open. 2783 p = subprocess.Popen([sys.executable, '-c', textwrap.dedent( 2784 ''' 2785 import os, resource, subprocess, sys, textwrap 2786 open_fds = set() 2787 # Add a bunch more fds to pass down. 2788 for _ in range(40): 2789 fd = os.open(os.devnull, os.O_RDONLY) 2790 open_fds.add(fd) 2791 2792 # Leave a two pairs of low ones available for use by the 2793 # internal child error pipe and the stdout pipe. 2794 # We also leave 10 more open as some Python buildbots run into 2795 # "too many open files" errors during the test if we do not. 2796 for fd in sorted(open_fds)[:14]: 2797 os.close(fd) 2798 open_fds.remove(fd) 2799 2800 for fd in open_fds: 2801 #self.addCleanup(os.close, fd) 2802 os.set_inheritable(fd, True) 2803 2804 max_fd_open = max(open_fds) 2805 2806 # Communicate the open_fds to the parent unittest.TestCase process. 2807 print(','.join(map(str, sorted(open_fds)))) 2808 sys.stdout.flush() 2809 2810 rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE) 2811 try: 2812 # 29 is lower than the highest fds we are leaving open. 2813 resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max)) 2814 # Launch a new Python interpreter with our low fd rlim_cur that 2815 # inherits open fds above that limit. It then uses subprocess 2816 # with close_fds=True to get a report of open fds in the child. 2817 # An explicit list of fds to check is passed to fd_status.py as 2818 # letting fd_status rely on its default logic would miss the 2819 # fds above rlim_cur as it normally only checks up to that limit. 2820 subprocess.Popen( 2821 [sys.executable, '-c', 2822 textwrap.dedent(""" 2823 import subprocess, sys 2824 subprocess.Popen([sys.executable, %r] + 2825 [str(x) for x in range({max_fd})], 2826 close_fds=True).wait() 2827 """.format(max_fd=max_fd_open+1))], 2828 close_fds=False).wait() 2829 finally: 2830 resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max)) 2831 ''' % fd_status)], stdout=subprocess.PIPE) 2832 2833 output, unused_stderr = p.communicate() 2834 output_lines = output.splitlines() 2835 self.assertEqual(len(output_lines), 2, 2836 msg="expected exactly two lines of output:\n%r" % output) 2837 opened_fds = set(map(int, output_lines[0].strip().split(b','))) 2838 remaining_fds = set(map(int, output_lines[1].strip().split(b','))) 2839 2840 self.assertFalse(remaining_fds & opened_fds, 2841 msg="Some fds were left open.") 2842 2843 2844 # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file 2845 # descriptor of a pipe closed in the parent process is valid in the 2846 # child process according to fstat(), but the mode of the file 2847 # descriptor is invalid, and read or write raise an error. 2848 @support.requires_mac_ver(10, 5) 2849 def test_pass_fds(self): 2850 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2851 2852 open_fds = set() 2853 2854 for x in range(5): 2855 fds = os.pipe() 2856 self.addCleanup(os.close, fds[0]) 2857 self.addCleanup(os.close, fds[1]) 2858 os.set_inheritable(fds[0], True) 2859 os.set_inheritable(fds[1], True) 2860 open_fds.update(fds) 2861 2862 for fd in open_fds: 2863 p = subprocess.Popen([sys.executable, fd_status], 2864 stdout=subprocess.PIPE, close_fds=True, 2865 pass_fds=(fd, )) 2866 output, ignored = p.communicate() 2867 2868 remaining_fds = set(map(int, output.split(b','))) 2869 to_be_closed = open_fds - {fd} 2870 2871 self.assertIn(fd, remaining_fds, "fd to be passed not passed") 2872 self.assertFalse(remaining_fds & to_be_closed, 2873 "fd to be closed passed") 2874 2875 # pass_fds overrides close_fds with a warning. 2876 with self.assertWarns(RuntimeWarning) as context: 2877 self.assertFalse(subprocess.call( 2878 ZERO_RETURN_CMD, 2879 close_fds=False, pass_fds=(fd, ))) 2880 self.assertIn('overriding close_fds', str(context.warning)) 2881 2882 def test_pass_fds_inheritable(self): 2883 script = support.findfile("fd_status.py", subdir="subprocessdata") 2884 2885 inheritable, non_inheritable = os.pipe() 2886 self.addCleanup(os.close, inheritable) 2887 self.addCleanup(os.close, non_inheritable) 2888 os.set_inheritable(inheritable, True) 2889 os.set_inheritable(non_inheritable, False) 2890 pass_fds = (inheritable, non_inheritable) 2891 args = [sys.executable, script] 2892 args += list(map(str, pass_fds)) 2893 2894 p = subprocess.Popen(args, 2895 stdout=subprocess.PIPE, close_fds=True, 2896 pass_fds=pass_fds) 2897 output, ignored = p.communicate() 2898 fds = set(map(int, output.split(b','))) 2899 2900 # the inheritable file descriptor must be inherited, so its inheritable 2901 # flag must be set in the child process after fork() and before exec() 2902 self.assertEqual(fds, set(pass_fds), "output=%a" % output) 2903 2904 # inheritable flag must not be changed in the parent process 2905 self.assertEqual(os.get_inheritable(inheritable), True) 2906 self.assertEqual(os.get_inheritable(non_inheritable), False) 2907 2908 2909 # bpo-32270: Ensure that descriptors specified in pass_fds 2910 # are inherited even if they are used in redirections. 2911 # Contributed by @izbyshev. 2912 def test_pass_fds_redirected(self): 2913 """Regression test for https://bugs.python.org/issue32270.""" 2914 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2915 pass_fds = [] 2916 for _ in range(2): 2917 fd = os.open(os.devnull, os.O_RDWR) 2918 self.addCleanup(os.close, fd) 2919 pass_fds.append(fd) 2920 2921 stdout_r, stdout_w = os.pipe() 2922 self.addCleanup(os.close, stdout_r) 2923 self.addCleanup(os.close, stdout_w) 2924 pass_fds.insert(1, stdout_w) 2925 2926 with subprocess.Popen([sys.executable, fd_status], 2927 stdin=pass_fds[0], 2928 stdout=pass_fds[1], 2929 stderr=pass_fds[2], 2930 close_fds=True, 2931 pass_fds=pass_fds): 2932 output = os.read(stdout_r, 1024) 2933 fds = {int(num) for num in output.split(b',')} 2934 2935 self.assertEqual(fds, {0, 1, 2} | frozenset(pass_fds), f"output={output!a}") 2936 2937 2938 def test_stdout_stdin_are_single_inout_fd(self): 2939 with io.open(os.devnull, "r+") as inout: 2940 p = subprocess.Popen(ZERO_RETURN_CMD, 2941 stdout=inout, stdin=inout) 2942 p.wait() 2943 2944 def test_stdout_stderr_are_single_inout_fd(self): 2945 with io.open(os.devnull, "r+") as inout: 2946 p = subprocess.Popen(ZERO_RETURN_CMD, 2947 stdout=inout, stderr=inout) 2948 p.wait() 2949 2950 def test_stderr_stdin_are_single_inout_fd(self): 2951 with io.open(os.devnull, "r+") as inout: 2952 p = subprocess.Popen(ZERO_RETURN_CMD, 2953 stderr=inout, stdin=inout) 2954 p.wait() 2955 2956 def test_wait_when_sigchild_ignored(self): 2957 # NOTE: sigchild_ignore.py may not be an effective test on all OSes. 2958 sigchild_ignore = support.findfile("sigchild_ignore.py", 2959 subdir="subprocessdata") 2960 p = subprocess.Popen([sys.executable, sigchild_ignore], 2961 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 2962 stdout, stderr = p.communicate() 2963 self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" 2964 " non-zero with this error:\n%s" % 2965 stderr.decode('utf-8')) 2966 2967 def test_select_unbuffered(self): 2968 # Issue #11459: bufsize=0 should really set the pipes as 2969 # unbuffered (and therefore let select() work properly). 2970 select = import_helper.import_module("select") 2971 p = subprocess.Popen([sys.executable, "-c", 2972 'import sys;' 2973 'sys.stdout.write("apple")'], 2974 stdout=subprocess.PIPE, 2975 bufsize=0) 2976 f = p.stdout 2977 self.addCleanup(f.close) 2978 try: 2979 self.assertEqual(f.read(4), b"appl") 2980 self.assertIn(f, select.select([f], [], [], 0.0)[0]) 2981 finally: 2982 p.wait() 2983 2984 def test_zombie_fast_process_del(self): 2985 # Issue #12650: on Unix, if Popen.__del__() was called before the 2986 # process exited, it wouldn't be added to subprocess._active, and would 2987 # remain a zombie. 2988 # spawn a Popen, and delete its reference before it exits 2989 p = subprocess.Popen([sys.executable, "-c", 2990 'import sys, time;' 2991 'time.sleep(0.2)'], 2992 stdout=subprocess.PIPE, 2993 stderr=subprocess.PIPE) 2994 self.addCleanup(p.stdout.close) 2995 self.addCleanup(p.stderr.close) 2996 ident = id(p) 2997 pid = p.pid 2998 with warnings_helper.check_warnings(('', ResourceWarning)): 2999 p = None 3000 3001 if mswindows: 3002 # subprocess._active is not used on Windows and is set to None. 3003 self.assertIsNone(subprocess._active) 3004 else: 3005 # check that p is in the active processes list 3006 self.assertIn(ident, [id(o) for o in subprocess._active]) 3007 3008 def test_leak_fast_process_del_killed(self): 3009 # Issue #12650: on Unix, if Popen.__del__() was called before the 3010 # process exited, and the process got killed by a signal, it would never 3011 # be removed from subprocess._active, which triggered a FD and memory 3012 # leak. 3013 # spawn a Popen, delete its reference and kill it 3014 p = subprocess.Popen([sys.executable, "-c", 3015 'import time;' 3016 'time.sleep(3)'], 3017 stdout=subprocess.PIPE, 3018 stderr=subprocess.PIPE) 3019 self.addCleanup(p.stdout.close) 3020 self.addCleanup(p.stderr.close) 3021 ident = id(p) 3022 pid = p.pid 3023 with warnings_helper.check_warnings(('', ResourceWarning)): 3024 p = None 3025 support.gc_collect() # For PyPy or other GCs. 3026 3027 os.kill(pid, signal.SIGKILL) 3028 if mswindows: 3029 # subprocess._active is not used on Windows and is set to None. 3030 self.assertIsNone(subprocess._active) 3031 else: 3032 # check that p is in the active processes list 3033 self.assertIn(ident, [id(o) for o in subprocess._active]) 3034 3035 # let some time for the process to exit, and create a new Popen: this 3036 # should trigger the wait() of p 3037 time.sleep(0.2) 3038 with self.assertRaises(OSError): 3039 with subprocess.Popen(NONEXISTING_CMD, 3040 stdout=subprocess.PIPE, 3041 stderr=subprocess.PIPE) as proc: 3042 pass 3043 # p should have been wait()ed on, and removed from the _active list 3044 self.assertRaises(OSError, os.waitpid, pid, 0) 3045 if mswindows: 3046 # subprocess._active is not used on Windows and is set to None. 3047 self.assertIsNone(subprocess._active) 3048 else: 3049 self.assertNotIn(ident, [id(o) for o in subprocess._active]) 3050 3051 def test_close_fds_after_preexec(self): 3052 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 3053 3054 # this FD is used as dup2() target by preexec_fn, and should be closed 3055 # in the child process 3056 fd = os.dup(1) 3057 self.addCleanup(os.close, fd) 3058 3059 p = subprocess.Popen([sys.executable, fd_status], 3060 stdout=subprocess.PIPE, close_fds=True, 3061 preexec_fn=lambda: os.dup2(1, fd)) 3062 output, ignored = p.communicate() 3063 3064 remaining_fds = set(map(int, output.split(b','))) 3065 3066 self.assertNotIn(fd, remaining_fds) 3067 3068 @support.cpython_only 3069 def test_fork_exec(self): 3070 # Issue #22290: fork_exec() must not crash on memory allocation failure 3071 # or other errors 3072 import _posixsubprocess 3073 gc_enabled = gc.isenabled() 3074 try: 3075 # Use a preexec function and enable the garbage collector 3076 # to force fork_exec() to re-enable the garbage collector 3077 # on error. 3078 func = lambda: None 3079 gc.enable() 3080 3081 for args, exe_list, cwd, env_list in ( 3082 (123, [b"exe"], None, [b"env"]), 3083 ([b"arg"], 123, None, [b"env"]), 3084 ([b"arg"], [b"exe"], 123, [b"env"]), 3085 ([b"arg"], [b"exe"], None, 123), 3086 ): 3087 with self.assertRaises(TypeError) as err: 3088 _posixsubprocess.fork_exec( 3089 args, exe_list, 3090 True, (), cwd, env_list, 3091 -1, -1, -1, -1, 3092 1, 2, 3, 4, 3093 True, True, 3094 False, [], 0, -1, 3095 func) 3096 # Attempt to prevent 3097 # "TypeError: fork_exec() takes exactly N arguments (M given)" 3098 # from passing the test. More refactoring to have us start 3099 # with a valid *args list, confirm a good call with that works 3100 # before mutating it in various ways to ensure that bad calls 3101 # with individual arg type errors raise a typeerror would be 3102 # ideal. Saving that for a future PR... 3103 self.assertNotIn('takes exactly', str(err.exception)) 3104 finally: 3105 if not gc_enabled: 3106 gc.disable() 3107 3108 @support.cpython_only 3109 def test_fork_exec_sorted_fd_sanity_check(self): 3110 # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check. 3111 import _posixsubprocess 3112 class BadInt: 3113 first = True 3114 def __init__(self, value): 3115 self.value = value 3116 def __int__(self): 3117 if self.first: 3118 self.first = False 3119 return self.value 3120 raise ValueError 3121 3122 gc_enabled = gc.isenabled() 3123 try: 3124 gc.enable() 3125 3126 for fds_to_keep in ( 3127 (-1, 2, 3, 4, 5), # Negative number. 3128 ('str', 4), # Not an int. 3129 (18, 23, 42, 2**63), # Out of range. 3130 (5, 4), # Not sorted. 3131 (6, 7, 7, 8), # Duplicate. 3132 (BadInt(1), BadInt(2)), 3133 ): 3134 with self.assertRaises( 3135 ValueError, 3136 msg='fds_to_keep={}'.format(fds_to_keep)) as c: 3137 _posixsubprocess.fork_exec( 3138 [b"false"], [b"false"], 3139 True, fds_to_keep, None, [b"env"], 3140 -1, -1, -1, -1, 3141 1, 2, 3, 4, 3142 True, True, 3143 None, None, None, -1, 3144 None) 3145 self.assertIn('fds_to_keep', str(c.exception)) 3146 finally: 3147 if not gc_enabled: 3148 gc.disable() 3149 3150 def test_communicate_BrokenPipeError_stdin_close(self): 3151 # By not setting stdout or stderr or a timeout we force the fast path 3152 # that just calls _stdin_write() internally due to our mock. 3153 proc = subprocess.Popen(ZERO_RETURN_CMD) 3154 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 3155 mock_proc_stdin.close.side_effect = BrokenPipeError 3156 proc.communicate() # Should swallow BrokenPipeError from close. 3157 mock_proc_stdin.close.assert_called_with() 3158 3159 def test_communicate_BrokenPipeError_stdin_write(self): 3160 # By not setting stdout or stderr or a timeout we force the fast path 3161 # that just calls _stdin_write() internally due to our mock. 3162 proc = subprocess.Popen(ZERO_RETURN_CMD) 3163 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 3164 mock_proc_stdin.write.side_effect = BrokenPipeError 3165 proc.communicate(b'stuff') # Should swallow the BrokenPipeError. 3166 mock_proc_stdin.write.assert_called_once_with(b'stuff') 3167 mock_proc_stdin.close.assert_called_once_with() 3168 3169 def test_communicate_BrokenPipeError_stdin_flush(self): 3170 # Setting stdin and stdout forces the ._communicate() code path. 3171 # python -h exits faster than python -c pass (but spams stdout). 3172 proc = subprocess.Popen([sys.executable, '-h'], 3173 stdin=subprocess.PIPE, 3174 stdout=subprocess.PIPE) 3175 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin, \ 3176 open(os.devnull, 'wb') as dev_null: 3177 mock_proc_stdin.flush.side_effect = BrokenPipeError 3178 # because _communicate registers a selector using proc.stdin... 3179 mock_proc_stdin.fileno.return_value = dev_null.fileno() 3180 # _communicate() should swallow BrokenPipeError from flush. 3181 proc.communicate(b'stuff') 3182 mock_proc_stdin.flush.assert_called_once_with() 3183 3184 def test_communicate_BrokenPipeError_stdin_close_with_timeout(self): 3185 # Setting stdin and stdout forces the ._communicate() code path. 3186 # python -h exits faster than python -c pass (but spams stdout). 3187 proc = subprocess.Popen([sys.executable, '-h'], 3188 stdin=subprocess.PIPE, 3189 stdout=subprocess.PIPE) 3190 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 3191 mock_proc_stdin.close.side_effect = BrokenPipeError 3192 # _communicate() should swallow BrokenPipeError from close. 3193 proc.communicate(timeout=999) 3194 mock_proc_stdin.close.assert_called_once_with() 3195 3196 @unittest.skipUnless(_testcapi is not None 3197 and hasattr(_testcapi, 'W_STOPCODE'), 3198 'need _testcapi.W_STOPCODE') 3199 def test_stopped(self): 3200 """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335.""" 3201 args = ZERO_RETURN_CMD 3202 proc = subprocess.Popen(args) 3203 3204 # Wait until the real process completes to avoid zombie process 3205 support.wait_process(proc.pid, exitcode=0) 3206 3207 status = _testcapi.W_STOPCODE(3) 3208 with mock.patch('subprocess.os.waitpid', return_value=(proc.pid, status)): 3209 returncode = proc.wait() 3210 3211 self.assertEqual(returncode, -3) 3212 3213 def test_send_signal_race(self): 3214 # bpo-38630: send_signal() must poll the process exit status to reduce 3215 # the risk of sending the signal to the wrong process. 3216 proc = subprocess.Popen(ZERO_RETURN_CMD) 3217 3218 # wait until the process completes without using the Popen APIs. 3219 support.wait_process(proc.pid, exitcode=0) 3220 3221 # returncode is still None but the process completed. 3222 self.assertIsNone(proc.returncode) 3223 3224 with mock.patch("os.kill") as mock_kill: 3225 proc.send_signal(signal.SIGTERM) 3226 3227 # send_signal() didn't call os.kill() since the process already 3228 # completed. 3229 mock_kill.assert_not_called() 3230 3231 # Don't check the returncode value: the test reads the exit status, 3232 # so Popen failed to read it and uses a default returncode instead. 3233 self.assertIsNotNone(proc.returncode) 3234 3235 def test_send_signal_race2(self): 3236 # bpo-40550: the process might exist between the returncode check and 3237 # the kill operation 3238 p = subprocess.Popen([sys.executable, '-c', 'exit(1)']) 3239 3240 # wait for process to exit 3241 while not p.returncode: 3242 p.poll() 3243 3244 with mock.patch.object(p, 'poll', new=lambda: None): 3245 p.returncode = None 3246 p.send_signal(signal.SIGTERM) 3247 3248 def test_communicate_repeated_call_after_stdout_close(self): 3249 proc = subprocess.Popen([sys.executable, '-c', 3250 'import os, time; os.close(1), time.sleep(2)'], 3251 stdout=subprocess.PIPE) 3252 while True: 3253 try: 3254 proc.communicate(timeout=0.1) 3255 return 3256 except subprocess.TimeoutExpired: 3257 pass 3258 3259 3260@unittest.skipUnless(mswindows, "Windows specific tests") 3261class Win32ProcessTestCase(BaseTestCase): 3262 3263 def test_startupinfo(self): 3264 # startupinfo argument 3265 # We uses hardcoded constants, because we do not want to 3266 # depend on win32all. 3267 STARTF_USESHOWWINDOW = 1 3268 SW_MAXIMIZE = 3 3269 startupinfo = subprocess.STARTUPINFO() 3270 startupinfo.dwFlags = STARTF_USESHOWWINDOW 3271 startupinfo.wShowWindow = SW_MAXIMIZE 3272 # Since Python is a console process, it won't be affected 3273 # by wShowWindow, but the argument should be silently 3274 # ignored 3275 subprocess.call(ZERO_RETURN_CMD, 3276 startupinfo=startupinfo) 3277 3278 def test_startupinfo_keywords(self): 3279 # startupinfo argument 3280 # We use hardcoded constants, because we do not want to 3281 # depend on win32all. 3282 STARTF_USERSHOWWINDOW = 1 3283 SW_MAXIMIZE = 3 3284 startupinfo = subprocess.STARTUPINFO( 3285 dwFlags=STARTF_USERSHOWWINDOW, 3286 wShowWindow=SW_MAXIMIZE 3287 ) 3288 # Since Python is a console process, it won't be affected 3289 # by wShowWindow, but the argument should be silently 3290 # ignored 3291 subprocess.call(ZERO_RETURN_CMD, 3292 startupinfo=startupinfo) 3293 3294 def test_startupinfo_copy(self): 3295 # bpo-34044: Popen must not modify input STARTUPINFO structure 3296 startupinfo = subprocess.STARTUPINFO() 3297 startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW 3298 startupinfo.wShowWindow = subprocess.SW_HIDE 3299 3300 # Call Popen() twice with the same startupinfo object to make sure 3301 # that it's not modified 3302 for _ in range(2): 3303 cmd = ZERO_RETURN_CMD 3304 with open(os.devnull, 'w') as null: 3305 proc = subprocess.Popen(cmd, 3306 stdout=null, 3307 stderr=subprocess.STDOUT, 3308 startupinfo=startupinfo) 3309 with proc: 3310 proc.communicate() 3311 self.assertEqual(proc.returncode, 0) 3312 3313 self.assertEqual(startupinfo.dwFlags, 3314 subprocess.STARTF_USESHOWWINDOW) 3315 self.assertIsNone(startupinfo.hStdInput) 3316 self.assertIsNone(startupinfo.hStdOutput) 3317 self.assertIsNone(startupinfo.hStdError) 3318 self.assertEqual(startupinfo.wShowWindow, subprocess.SW_HIDE) 3319 self.assertEqual(startupinfo.lpAttributeList, {"handle_list": []}) 3320 3321 def test_creationflags(self): 3322 # creationflags argument 3323 CREATE_NEW_CONSOLE = 16 3324 sys.stderr.write(" a DOS box should flash briefly ...\n") 3325 subprocess.call(sys.executable + 3326 ' -c "import time; time.sleep(0.25)"', 3327 creationflags=CREATE_NEW_CONSOLE) 3328 3329 def test_invalid_args(self): 3330 # invalid arguments should raise ValueError 3331 self.assertRaises(ValueError, subprocess.call, 3332 [sys.executable, "-c", 3333 "import sys; sys.exit(47)"], 3334 preexec_fn=lambda: 1) 3335 3336 @support.cpython_only 3337 def test_issue31471(self): 3338 # There shouldn't be an assertion failure in Popen() in case the env 3339 # argument has a bad keys() method. 3340 class BadEnv(dict): 3341 keys = None 3342 with self.assertRaises(TypeError): 3343 subprocess.Popen(ZERO_RETURN_CMD, env=BadEnv()) 3344 3345 def test_close_fds(self): 3346 # close file descriptors 3347 rc = subprocess.call([sys.executable, "-c", 3348 "import sys; sys.exit(47)"], 3349 close_fds=True) 3350 self.assertEqual(rc, 47) 3351 3352 def test_close_fds_with_stdio(self): 3353 import msvcrt 3354 3355 fds = os.pipe() 3356 self.addCleanup(os.close, fds[0]) 3357 self.addCleanup(os.close, fds[1]) 3358 3359 handles = [] 3360 for fd in fds: 3361 os.set_inheritable(fd, True) 3362 handles.append(msvcrt.get_osfhandle(fd)) 3363 3364 p = subprocess.Popen([sys.executable, "-c", 3365 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3366 stdout=subprocess.PIPE, close_fds=False) 3367 stdout, stderr = p.communicate() 3368 self.assertEqual(p.returncode, 0) 3369 int(stdout.strip()) # Check that stdout is an integer 3370 3371 p = subprocess.Popen([sys.executable, "-c", 3372 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3373 stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) 3374 stdout, stderr = p.communicate() 3375 self.assertEqual(p.returncode, 1) 3376 self.assertIn(b"OSError", stderr) 3377 3378 # The same as the previous call, but with an empty handle_list 3379 handle_list = [] 3380 startupinfo = subprocess.STARTUPINFO() 3381 startupinfo.lpAttributeList = {"handle_list": handle_list} 3382 p = subprocess.Popen([sys.executable, "-c", 3383 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3384 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 3385 startupinfo=startupinfo, close_fds=True) 3386 stdout, stderr = p.communicate() 3387 self.assertEqual(p.returncode, 1) 3388 self.assertIn(b"OSError", stderr) 3389 3390 # Check for a warning due to using handle_list and close_fds=False 3391 with warnings_helper.check_warnings((".*overriding close_fds", 3392 RuntimeWarning)): 3393 startupinfo = subprocess.STARTUPINFO() 3394 startupinfo.lpAttributeList = {"handle_list": handles[:]} 3395 p = subprocess.Popen([sys.executable, "-c", 3396 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3397 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 3398 startupinfo=startupinfo, close_fds=False) 3399 stdout, stderr = p.communicate() 3400 self.assertEqual(p.returncode, 0) 3401 3402 def test_empty_attribute_list(self): 3403 startupinfo = subprocess.STARTUPINFO() 3404 startupinfo.lpAttributeList = {} 3405 subprocess.call(ZERO_RETURN_CMD, 3406 startupinfo=startupinfo) 3407 3408 def test_empty_handle_list(self): 3409 startupinfo = subprocess.STARTUPINFO() 3410 startupinfo.lpAttributeList = {"handle_list": []} 3411 subprocess.call(ZERO_RETURN_CMD, 3412 startupinfo=startupinfo) 3413 3414 def test_shell_sequence(self): 3415 # Run command through the shell (sequence) 3416 newenv = os.environ.copy() 3417 newenv["FRUIT"] = "physalis" 3418 p = subprocess.Popen(["set"], shell=1, 3419 stdout=subprocess.PIPE, 3420 env=newenv) 3421 with p: 3422 self.assertIn(b"physalis", p.stdout.read()) 3423 3424 def test_shell_string(self): 3425 # Run command through the shell (string) 3426 newenv = os.environ.copy() 3427 newenv["FRUIT"] = "physalis" 3428 p = subprocess.Popen("set", shell=1, 3429 stdout=subprocess.PIPE, 3430 env=newenv) 3431 with p: 3432 self.assertIn(b"physalis", p.stdout.read()) 3433 3434 def test_shell_encodings(self): 3435 # Run command through the shell (string) 3436 for enc in ['ansi', 'oem']: 3437 newenv = os.environ.copy() 3438 newenv["FRUIT"] = "physalis" 3439 p = subprocess.Popen("set", shell=1, 3440 stdout=subprocess.PIPE, 3441 env=newenv, 3442 encoding=enc) 3443 with p: 3444 self.assertIn("physalis", p.stdout.read(), enc) 3445 3446 def test_call_string(self): 3447 # call() function with string argument on Windows 3448 rc = subprocess.call(sys.executable + 3449 ' -c "import sys; sys.exit(47)"') 3450 self.assertEqual(rc, 47) 3451 3452 def _kill_process(self, method, *args): 3453 # Some win32 buildbot raises EOFError if stdin is inherited 3454 p = subprocess.Popen([sys.executable, "-c", """if 1: 3455 import sys, time 3456 sys.stdout.write('x\\n') 3457 sys.stdout.flush() 3458 time.sleep(30) 3459 """], 3460 stdin=subprocess.PIPE, 3461 stdout=subprocess.PIPE, 3462 stderr=subprocess.PIPE) 3463 with p: 3464 # Wait for the interpreter to be completely initialized before 3465 # sending any signal. 3466 p.stdout.read(1) 3467 getattr(p, method)(*args) 3468 _, stderr = p.communicate() 3469 self.assertEqual(stderr, b'') 3470 returncode = p.wait() 3471 self.assertNotEqual(returncode, 0) 3472 3473 def _kill_dead_process(self, method, *args): 3474 p = subprocess.Popen([sys.executable, "-c", """if 1: 3475 import sys, time 3476 sys.stdout.write('x\\n') 3477 sys.stdout.flush() 3478 sys.exit(42) 3479 """], 3480 stdin=subprocess.PIPE, 3481 stdout=subprocess.PIPE, 3482 stderr=subprocess.PIPE) 3483 with p: 3484 # Wait for the interpreter to be completely initialized before 3485 # sending any signal. 3486 p.stdout.read(1) 3487 # The process should end after this 3488 time.sleep(1) 3489 # This shouldn't raise even though the child is now dead 3490 getattr(p, method)(*args) 3491 _, stderr = p.communicate() 3492 self.assertEqual(stderr, b'') 3493 rc = p.wait() 3494 self.assertEqual(rc, 42) 3495 3496 def test_send_signal(self): 3497 self._kill_process('send_signal', signal.SIGTERM) 3498 3499 def test_kill(self): 3500 self._kill_process('kill') 3501 3502 def test_terminate(self): 3503 self._kill_process('terminate') 3504 3505 def test_send_signal_dead(self): 3506 self._kill_dead_process('send_signal', signal.SIGTERM) 3507 3508 def test_kill_dead(self): 3509 self._kill_dead_process('kill') 3510 3511 def test_terminate_dead(self): 3512 self._kill_dead_process('terminate') 3513 3514class MiscTests(unittest.TestCase): 3515 3516 class RecordingPopen(subprocess.Popen): 3517 """A Popen that saves a reference to each instance for testing.""" 3518 instances_created = [] 3519 3520 def __init__(self, *args, **kwargs): 3521 super().__init__(*args, **kwargs) 3522 self.instances_created.append(self) 3523 3524 @mock.patch.object(subprocess.Popen, "_communicate") 3525 def _test_keyboardinterrupt_no_kill(self, popener, mock__communicate, 3526 **kwargs): 3527 """Fake a SIGINT happening during Popen._communicate() and ._wait(). 3528 3529 This avoids the need to actually try and get test environments to send 3530 and receive signals reliably across platforms. The net effect of a ^C 3531 happening during a blocking subprocess execution which we want to clean 3532 up from is a KeyboardInterrupt coming out of communicate() or wait(). 3533 """ 3534 3535 mock__communicate.side_effect = KeyboardInterrupt 3536 try: 3537 with mock.patch.object(subprocess.Popen, "_wait") as mock__wait: 3538 # We patch out _wait() as no signal was involved so the 3539 # child process isn't actually going to exit rapidly. 3540 mock__wait.side_effect = KeyboardInterrupt 3541 with mock.patch.object(subprocess, "Popen", 3542 self.RecordingPopen): 3543 with self.assertRaises(KeyboardInterrupt): 3544 popener([sys.executable, "-c", 3545 "import time\ntime.sleep(9)\nimport sys\n" 3546 "sys.stderr.write('\\n!runaway child!\\n')"], 3547 stdout=subprocess.DEVNULL, **kwargs) 3548 for call in mock__wait.call_args_list[1:]: 3549 self.assertNotEqual( 3550 call, mock.call(timeout=None), 3551 "no open-ended wait() after the first allowed: " 3552 f"{mock__wait.call_args_list}") 3553 sigint_calls = [] 3554 for call in mock__wait.call_args_list: 3555 if call == mock.call(timeout=0.25): # from Popen.__init__ 3556 sigint_calls.append(call) 3557 self.assertLessEqual(mock__wait.call_count, 2, 3558 msg=mock__wait.call_args_list) 3559 self.assertEqual(len(sigint_calls), 1, 3560 msg=mock__wait.call_args_list) 3561 finally: 3562 # cleanup the forgotten (due to our mocks) child process 3563 process = self.RecordingPopen.instances_created.pop() 3564 process.kill() 3565 process.wait() 3566 self.assertEqual([], self.RecordingPopen.instances_created) 3567 3568 def test_call_keyboardinterrupt_no_kill(self): 3569 self._test_keyboardinterrupt_no_kill(subprocess.call, timeout=6.282) 3570 3571 def test_run_keyboardinterrupt_no_kill(self): 3572 self._test_keyboardinterrupt_no_kill(subprocess.run, timeout=6.282) 3573 3574 def test_context_manager_keyboardinterrupt_no_kill(self): 3575 def popen_via_context_manager(*args, **kwargs): 3576 with subprocess.Popen(*args, **kwargs) as unused_process: 3577 raise KeyboardInterrupt # Test how __exit__ handles ^C. 3578 self._test_keyboardinterrupt_no_kill(popen_via_context_manager) 3579 3580 def test_getoutput(self): 3581 self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy') 3582 self.assertEqual(subprocess.getstatusoutput('echo xyzzy'), 3583 (0, 'xyzzy')) 3584 3585 # we use mkdtemp in the next line to create an empty directory 3586 # under our exclusive control; from that, we can invent a pathname 3587 # that we _know_ won't exist. This is guaranteed to fail. 3588 dir = None 3589 try: 3590 dir = tempfile.mkdtemp() 3591 name = os.path.join(dir, "foo") 3592 status, output = subprocess.getstatusoutput( 3593 ("type " if mswindows else "cat ") + name) 3594 self.assertNotEqual(status, 0) 3595 finally: 3596 if dir is not None: 3597 os.rmdir(dir) 3598 3599 def test__all__(self): 3600 """Ensure that __all__ is populated properly.""" 3601 intentionally_excluded = {"list2cmdline", "Handle", "pwd", "grp", "fcntl"} 3602 exported = set(subprocess.__all__) 3603 possible_exports = set() 3604 import types 3605 for name, value in subprocess.__dict__.items(): 3606 if name.startswith('_'): 3607 continue 3608 if isinstance(value, (types.ModuleType,)): 3609 continue 3610 possible_exports.add(name) 3611 self.assertEqual(exported, possible_exports - intentionally_excluded) 3612 3613 3614@unittest.skipUnless(hasattr(selectors, 'PollSelector'), 3615 "Test needs selectors.PollSelector") 3616class ProcessTestCaseNoPoll(ProcessTestCase): 3617 def setUp(self): 3618 self.orig_selector = subprocess._PopenSelector 3619 subprocess._PopenSelector = selectors.SelectSelector 3620 ProcessTestCase.setUp(self) 3621 3622 def tearDown(self): 3623 subprocess._PopenSelector = self.orig_selector 3624 ProcessTestCase.tearDown(self) 3625 3626 3627@unittest.skipUnless(mswindows, "Windows-specific tests") 3628class CommandsWithSpaces (BaseTestCase): 3629 3630 def setUp(self): 3631 super().setUp() 3632 f, fname = tempfile.mkstemp(".py", "te st") 3633 self.fname = fname.lower () 3634 os.write(f, b"import sys;" 3635 b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))" 3636 ) 3637 os.close(f) 3638 3639 def tearDown(self): 3640 os.remove(self.fname) 3641 super().tearDown() 3642 3643 def with_spaces(self, *args, **kwargs): 3644 kwargs['stdout'] = subprocess.PIPE 3645 p = subprocess.Popen(*args, **kwargs) 3646 with p: 3647 self.assertEqual( 3648 p.stdout.read ().decode("mbcs"), 3649 "2 [%r, 'ab cd']" % self.fname 3650 ) 3651 3652 def test_shell_string_with_spaces(self): 3653 # call() function with string argument with spaces on Windows 3654 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 3655 "ab cd"), shell=1) 3656 3657 def test_shell_sequence_with_spaces(self): 3658 # call() function with sequence argument with spaces on Windows 3659 self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1) 3660 3661 def test_noshell_string_with_spaces(self): 3662 # call() function with string argument with spaces on Windows 3663 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 3664 "ab cd")) 3665 3666 def test_noshell_sequence_with_spaces(self): 3667 # call() function with sequence argument with spaces on Windows 3668 self.with_spaces([sys.executable, self.fname, "ab cd"]) 3669 3670 3671class ContextManagerTests(BaseTestCase): 3672 3673 def test_pipe(self): 3674 with subprocess.Popen([sys.executable, "-c", 3675 "import sys;" 3676 "sys.stdout.write('stdout');" 3677 "sys.stderr.write('stderr');"], 3678 stdout=subprocess.PIPE, 3679 stderr=subprocess.PIPE) as proc: 3680 self.assertEqual(proc.stdout.read(), b"stdout") 3681 self.assertEqual(proc.stderr.read(), b"stderr") 3682 3683 self.assertTrue(proc.stdout.closed) 3684 self.assertTrue(proc.stderr.closed) 3685 3686 def test_returncode(self): 3687 with subprocess.Popen([sys.executable, "-c", 3688 "import sys; sys.exit(100)"]) as proc: 3689 pass 3690 # __exit__ calls wait(), so the returncode should be set 3691 self.assertEqual(proc.returncode, 100) 3692 3693 def test_communicate_stdin(self): 3694 with subprocess.Popen([sys.executable, "-c", 3695 "import sys;" 3696 "sys.exit(sys.stdin.read() == 'context')"], 3697 stdin=subprocess.PIPE) as proc: 3698 proc.communicate(b"context") 3699 self.assertEqual(proc.returncode, 1) 3700 3701 def test_invalid_args(self): 3702 with self.assertRaises(NONEXISTING_ERRORS): 3703 with subprocess.Popen(NONEXISTING_CMD, 3704 stdout=subprocess.PIPE, 3705 stderr=subprocess.PIPE) as proc: 3706 pass 3707 3708 def test_broken_pipe_cleanup(self): 3709 """Broken pipe error should not prevent wait() (Issue 21619)""" 3710 proc = subprocess.Popen(ZERO_RETURN_CMD, 3711 stdin=subprocess.PIPE, 3712 bufsize=support.PIPE_MAX_SIZE*2) 3713 proc = proc.__enter__() 3714 # Prepare to send enough data to overflow any OS pipe buffering and 3715 # guarantee a broken pipe error. Data is held in BufferedWriter 3716 # buffer until closed. 3717 proc.stdin.write(b'x' * support.PIPE_MAX_SIZE) 3718 self.assertIsNone(proc.returncode) 3719 # EPIPE expected under POSIX; EINVAL under Windows 3720 self.assertRaises(OSError, proc.__exit__, None, None, None) 3721 self.assertEqual(proc.returncode, 0) 3722 self.assertTrue(proc.stdin.closed) 3723 3724 3725if __name__ == "__main__": 3726 unittest.main() 3727