1import unittest 2from test import test_support 3import subprocess 4import sys 5import signal 6import os 7import errno 8import tempfile 9import time 10import re 11import sysconfig 12 13try: 14 import resource 15except ImportError: 16 resource = None 17try: 18 import threading 19except ImportError: 20 threading = None 21 22mswindows = (sys.platform == "win32") 23 24# 25# Depends on the following external programs: Python 26# 27 28if mswindows: 29 SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), ' 30 'os.O_BINARY);') 31else: 32 SETBINARY = '' 33 34 35class BaseTestCase(unittest.TestCase): 36 def setUp(self): 37 # Try to minimize the number of children we have so this test 38 # doesn't crash on some buildbots (Alphas in particular). 39 test_support.reap_children() 40 41 def tearDown(self): 42 for inst in subprocess._active: 43 inst.wait() 44 subprocess._cleanup() 45 self.assertFalse(subprocess._active, "subprocess._active not empty") 46 47 def assertStderrEqual(self, stderr, expected, msg=None): 48 # In a debug build, stuff like "[6580 refs]" is printed to stderr at 49 # shutdown time. That frustrates tests trying to check stderr produced 50 # from a spawned Python process. 51 actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr) 52 self.assertEqual(actual, expected, msg) 53 54 55class PopenTestException(Exception): 56 pass 57 58 59class PopenExecuteChildRaises(subprocess.Popen): 60 """Popen subclass for testing cleanup of subprocess.PIPE filehandles when 61 _execute_child fails. 
62 """ 63 def _execute_child(self, *args, **kwargs): 64 raise PopenTestException("Forced Exception for Test") 65 66 67class ProcessTestCase(BaseTestCase): 68 69 def test_call_seq(self): 70 # call() function with sequence argument 71 rc = subprocess.call([sys.executable, "-c", 72 "import sys; sys.exit(47)"]) 73 self.assertEqual(rc, 47) 74 75 def test_check_call_zero(self): 76 # check_call() function with zero return code 77 rc = subprocess.check_call([sys.executable, "-c", 78 "import sys; sys.exit(0)"]) 79 self.assertEqual(rc, 0) 80 81 def test_check_call_nonzero(self): 82 # check_call() function with non-zero return code 83 with self.assertRaises(subprocess.CalledProcessError) as c: 84 subprocess.check_call([sys.executable, "-c", 85 "import sys; sys.exit(47)"]) 86 self.assertEqual(c.exception.returncode, 47) 87 88 def test_check_output(self): 89 # check_output() function with zero return code 90 output = subprocess.check_output( 91 [sys.executable, "-c", "print 'BDFL'"]) 92 self.assertIn('BDFL', output) 93 94 def test_check_output_nonzero(self): 95 # check_call() function with non-zero return code 96 with self.assertRaises(subprocess.CalledProcessError) as c: 97 subprocess.check_output( 98 [sys.executable, "-c", "import sys; sys.exit(5)"]) 99 self.assertEqual(c.exception.returncode, 5) 100 101 def test_check_output_stderr(self): 102 # check_output() function stderr redirected to stdout 103 output = subprocess.check_output( 104 [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"], 105 stderr=subprocess.STDOUT) 106 self.assertIn('BDFL', output) 107 108 def test_check_output_stdout_arg(self): 109 # check_output() function stderr redirected to stdout 110 with self.assertRaises(ValueError) as c: 111 output = subprocess.check_output( 112 [sys.executable, "-c", "print 'will not be run'"], 113 stdout=sys.stdout) 114 self.fail("Expected ValueError when stdout arg supplied.") 115 self.assertIn('stdout', c.exception.args[0]) 116 117 def test_call_kwargs(self): 118 # 
call() function with keyword args 119 newenv = os.environ.copy() 120 newenv["FRUIT"] = "banana" 121 rc = subprocess.call([sys.executable, "-c", 122 'import sys, os;' 123 'sys.exit(os.getenv("FRUIT")=="banana")'], 124 env=newenv) 125 self.assertEqual(rc, 1) 126 127 def test_invalid_args(self): 128 # Popen() called with invalid arguments should raise TypeError 129 # but Popen.__del__ should not complain (issue #12085) 130 with test_support.captured_stderr() as s: 131 self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1) 132 argcount = subprocess.Popen.__init__.__code__.co_argcount 133 too_many_args = [0] * (argcount + 1) 134 self.assertRaises(TypeError, subprocess.Popen, *too_many_args) 135 self.assertEqual(s.getvalue(), '') 136 137 def test_stdin_none(self): 138 # .stdin is None when not redirected 139 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'], 140 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 141 self.addCleanup(p.stdout.close) 142 self.addCleanup(p.stderr.close) 143 p.wait() 144 self.assertEqual(p.stdin, None) 145 146 def test_stdout_none(self): 147 # .stdout is None when not redirected, and the child's stdout will 148 # be inherited from the parent. In order to test this we run a 149 # subprocess in a subprocess: 150 # this_test 151 # \-- subprocess created by this test (parent) 152 # \-- subprocess created by the parent subprocess (child) 153 # The parent doesn't specify stdout, so the child will use the 154 # parent's stdout. This test checks that the message printed by the 155 # child goes to the parent stdout. The parent also checks that the 156 # child's stdout is None. See #11963. 
157 code = ('import sys; from subprocess import Popen, PIPE;' 158 'p = Popen([sys.executable, "-c", "print \'test_stdout_none\'"],' 159 ' stdin=PIPE, stderr=PIPE);' 160 'p.wait(); assert p.stdout is None;') 161 p = subprocess.Popen([sys.executable, "-c", code], 162 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 163 self.addCleanup(p.stdout.close) 164 self.addCleanup(p.stderr.close) 165 out, err = p.communicate() 166 self.assertEqual(p.returncode, 0, err) 167 self.assertEqual(out.rstrip(), 'test_stdout_none') 168 169 def test_stderr_none(self): 170 # .stderr is None when not redirected 171 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'], 172 stdin=subprocess.PIPE, stdout=subprocess.PIPE) 173 self.addCleanup(p.stdout.close) 174 self.addCleanup(p.stdin.close) 175 p.wait() 176 self.assertEqual(p.stderr, None) 177 178 def test_executable_with_cwd(self): 179 python_dir = os.path.dirname(os.path.realpath(sys.executable)) 180 p = subprocess.Popen(["somethingyoudonthave", "-c", 181 "import sys; sys.exit(47)"], 182 executable=sys.executable, cwd=python_dir) 183 p.wait() 184 self.assertEqual(p.returncode, 47) 185 186 @unittest.skipIf(sysconfig.is_python_build(), 187 "need an installed Python. See #7774") 188 def test_executable_without_cwd(self): 189 # For a normal installation, it should work without 'cwd' 190 # argument. For test runs in the build directory, see #7774. 
191 p = subprocess.Popen(["somethingyoudonthave", "-c", 192 "import sys; sys.exit(47)"], 193 executable=sys.executable) 194 p.wait() 195 self.assertEqual(p.returncode, 47) 196 197 def test_stdin_pipe(self): 198 # stdin redirection 199 p = subprocess.Popen([sys.executable, "-c", 200 'import sys; sys.exit(sys.stdin.read() == "pear")'], 201 stdin=subprocess.PIPE) 202 p.stdin.write("pear") 203 p.stdin.close() 204 p.wait() 205 self.assertEqual(p.returncode, 1) 206 207 def test_stdin_filedes(self): 208 # stdin is set to open file descriptor 209 tf = tempfile.TemporaryFile() 210 d = tf.fileno() 211 os.write(d, "pear") 212 os.lseek(d, 0, 0) 213 p = subprocess.Popen([sys.executable, "-c", 214 'import sys; sys.exit(sys.stdin.read() == "pear")'], 215 stdin=d) 216 p.wait() 217 self.assertEqual(p.returncode, 1) 218 219 def test_stdin_fileobj(self): 220 # stdin is set to open file object 221 tf = tempfile.TemporaryFile() 222 tf.write("pear") 223 tf.seek(0) 224 p = subprocess.Popen([sys.executable, "-c", 225 'import sys; sys.exit(sys.stdin.read() == "pear")'], 226 stdin=tf) 227 p.wait() 228 self.assertEqual(p.returncode, 1) 229 230 def test_stdout_pipe(self): 231 # stdout redirection 232 p = subprocess.Popen([sys.executable, "-c", 233 'import sys; sys.stdout.write("orange")'], 234 stdout=subprocess.PIPE) 235 self.addCleanup(p.stdout.close) 236 self.assertEqual(p.stdout.read(), "orange") 237 238 def test_stdout_filedes(self): 239 # stdout is set to open file descriptor 240 tf = tempfile.TemporaryFile() 241 d = tf.fileno() 242 p = subprocess.Popen([sys.executable, "-c", 243 'import sys; sys.stdout.write("orange")'], 244 stdout=d) 245 p.wait() 246 os.lseek(d, 0, 0) 247 self.assertEqual(os.read(d, 1024), "orange") 248 249 def test_stdout_fileobj(self): 250 # stdout is set to open file object 251 tf = tempfile.TemporaryFile() 252 p = subprocess.Popen([sys.executable, "-c", 253 'import sys; sys.stdout.write("orange")'], 254 stdout=tf) 255 p.wait() 256 tf.seek(0) 257 
self.assertEqual(tf.read(), "orange") 258 259 def test_stderr_pipe(self): 260 # stderr redirection 261 p = subprocess.Popen([sys.executable, "-c", 262 'import sys; sys.stderr.write("strawberry")'], 263 stderr=subprocess.PIPE) 264 self.addCleanup(p.stderr.close) 265 self.assertStderrEqual(p.stderr.read(), "strawberry") 266 267 def test_stderr_filedes(self): 268 # stderr is set to open file descriptor 269 tf = tempfile.TemporaryFile() 270 d = tf.fileno() 271 p = subprocess.Popen([sys.executable, "-c", 272 'import sys; sys.stderr.write("strawberry")'], 273 stderr=d) 274 p.wait() 275 os.lseek(d, 0, 0) 276 self.assertStderrEqual(os.read(d, 1024), "strawberry") 277 278 def test_stderr_fileobj(self): 279 # stderr is set to open file object 280 tf = tempfile.TemporaryFile() 281 p = subprocess.Popen([sys.executable, "-c", 282 'import sys; sys.stderr.write("strawberry")'], 283 stderr=tf) 284 p.wait() 285 tf.seek(0) 286 self.assertStderrEqual(tf.read(), "strawberry") 287 288 def test_stderr_redirect_with_no_stdout_redirect(self): 289 # test stderr=STDOUT while stdout=None (not set) 290 291 # - grandchild prints to stderr 292 # - child redirects grandchild's stderr to its stdout 293 # - the parent should get grandchild's stderr in child's stdout 294 p = subprocess.Popen([sys.executable, "-c", 295 'import sys, subprocess;' 296 'rc = subprocess.call([sys.executable, "-c",' 297 ' "import sys;"' 298 ' "sys.stderr.write(\'42\')"],' 299 ' stderr=subprocess.STDOUT);' 300 'sys.exit(rc)'], 301 stdout=subprocess.PIPE, 302 stderr=subprocess.PIPE) 303 stdout, stderr = p.communicate() 304 #NOTE: stdout should get stderr from grandchild 305 self.assertStderrEqual(stdout, b'42') 306 self.assertStderrEqual(stderr, b'') # should be empty 307 self.assertEqual(p.returncode, 0) 308 309 def test_stdout_stderr_pipe(self): 310 # capture stdout and stderr to the same pipe 311 p = subprocess.Popen([sys.executable, "-c", 312 'import sys;' 313 'sys.stdout.write("apple");' 314 'sys.stdout.flush();' 315 
'sys.stderr.write("orange")'], 316 stdout=subprocess.PIPE, 317 stderr=subprocess.STDOUT) 318 self.addCleanup(p.stdout.close) 319 self.assertStderrEqual(p.stdout.read(), "appleorange") 320 321 def test_stdout_stderr_file(self): 322 # capture stdout and stderr to the same open file 323 tf = tempfile.TemporaryFile() 324 p = subprocess.Popen([sys.executable, "-c", 325 'import sys;' 326 'sys.stdout.write("apple");' 327 'sys.stdout.flush();' 328 'sys.stderr.write("orange")'], 329 stdout=tf, 330 stderr=tf) 331 p.wait() 332 tf.seek(0) 333 self.assertStderrEqual(tf.read(), "appleorange") 334 335 def test_stdout_filedes_of_stdout(self): 336 # stdout is set to 1 (#1531862). 337 # To avoid printing the text on stdout, we do something similar to 338 # test_stdout_none (see above). The parent subprocess calls the child 339 # subprocess passing stdout=1, and this test uses stdout=PIPE in 340 # order to capture and check the output of the parent. See #11963. 341 code = ('import sys, subprocess; ' 342 'rc = subprocess.call([sys.executable, "-c", ' 343 ' "import os, sys; sys.exit(os.write(sys.stdout.fileno(), ' 344 '\'test with stdout=1\'))"], stdout=1); ' 345 'assert rc == 18') 346 p = subprocess.Popen([sys.executable, "-c", code], 347 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 348 self.addCleanup(p.stdout.close) 349 self.addCleanup(p.stderr.close) 350 out, err = p.communicate() 351 self.assertEqual(p.returncode, 0, err) 352 self.assertEqual(out.rstrip(), 'test with stdout=1') 353 354 def test_cwd(self): 355 tmpdir = tempfile.gettempdir() 356 # We cannot use os.path.realpath to canonicalize the path, 357 # since it doesn't expand Tru64 {memb} strings. See bug 1063571. 
358 cwd = os.getcwd() 359 os.chdir(tmpdir) 360 tmpdir = os.getcwd() 361 os.chdir(cwd) 362 p = subprocess.Popen([sys.executable, "-c", 363 'import sys,os;' 364 'sys.stdout.write(os.getcwd())'], 365 stdout=subprocess.PIPE, 366 cwd=tmpdir) 367 self.addCleanup(p.stdout.close) 368 normcase = os.path.normcase 369 self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir)) 370 371 def test_env(self): 372 newenv = os.environ.copy() 373 newenv["FRUIT"] = "orange" 374 p = subprocess.Popen([sys.executable, "-c", 375 'import sys,os;' 376 'sys.stdout.write(os.getenv("FRUIT"))'], 377 stdout=subprocess.PIPE, 378 env=newenv) 379 self.addCleanup(p.stdout.close) 380 self.assertEqual(p.stdout.read(), "orange") 381 382 def test_communicate_stdin(self): 383 p = subprocess.Popen([sys.executable, "-c", 384 'import sys;' 385 'sys.exit(sys.stdin.read() == "pear")'], 386 stdin=subprocess.PIPE) 387 p.communicate("pear") 388 self.assertEqual(p.returncode, 1) 389 390 def test_communicate_stdout(self): 391 p = subprocess.Popen([sys.executable, "-c", 392 'import sys; sys.stdout.write("pineapple")'], 393 stdout=subprocess.PIPE) 394 (stdout, stderr) = p.communicate() 395 self.assertEqual(stdout, "pineapple") 396 self.assertEqual(stderr, None) 397 398 def test_communicate_stderr(self): 399 p = subprocess.Popen([sys.executable, "-c", 400 'import sys; sys.stderr.write("pineapple")'], 401 stderr=subprocess.PIPE) 402 (stdout, stderr) = p.communicate() 403 self.assertEqual(stdout, None) 404 self.assertStderrEqual(stderr, "pineapple") 405 406 def test_communicate(self): 407 p = subprocess.Popen([sys.executable, "-c", 408 'import sys,os;' 409 'sys.stderr.write("pineapple");' 410 'sys.stdout.write(sys.stdin.read())'], 411 stdin=subprocess.PIPE, 412 stdout=subprocess.PIPE, 413 stderr=subprocess.PIPE) 414 self.addCleanup(p.stdout.close) 415 self.addCleanup(p.stderr.close) 416 self.addCleanup(p.stdin.close) 417 (stdout, stderr) = p.communicate("banana") 418 self.assertEqual(stdout, "banana") 419 
self.assertStderrEqual(stderr, "pineapple") 420 421 # This test is Linux specific for simplicity to at least have 422 # some coverage. It is not a platform specific bug. 423 @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), 424 "Linux specific") 425 # Test for the fd leak reported in http://bugs.python.org/issue2791. 426 def test_communicate_pipe_fd_leak(self): 427 fd_directory = '/proc/%d/fd' % os.getpid() 428 num_fds_before_popen = len(os.listdir(fd_directory)) 429 p = subprocess.Popen([sys.executable, "-c", "print('')"], 430 stdout=subprocess.PIPE) 431 p.communicate() 432 num_fds_after_communicate = len(os.listdir(fd_directory)) 433 del p 434 num_fds_after_destruction = len(os.listdir(fd_directory)) 435 self.assertEqual(num_fds_before_popen, num_fds_after_destruction) 436 self.assertEqual(num_fds_before_popen, num_fds_after_communicate) 437 438 def test_communicate_returns(self): 439 # communicate() should return None if no redirection is active 440 p = subprocess.Popen([sys.executable, "-c", 441 "import sys; sys.exit(47)"]) 442 (stdout, stderr) = p.communicate() 443 self.assertEqual(stdout, None) 444 self.assertEqual(stderr, None) 445 446 def test_communicate_pipe_buf(self): 447 # communicate() with writes larger than pipe_buf 448 # This test will probably deadlock rather than fail, if 449 # communicate() does not work properly. 
450 x, y = os.pipe() 451 if mswindows: 452 pipe_buf = 512 453 else: 454 pipe_buf = os.fpathconf(x, "PC_PIPE_BUF") 455 os.close(x) 456 os.close(y) 457 p = subprocess.Popen([sys.executable, "-c", 458 'import sys,os;' 459 'sys.stdout.write(sys.stdin.read(47));' 460 'sys.stderr.write("xyz"*%d);' 461 'sys.stdout.write(sys.stdin.read())' % pipe_buf], 462 stdin=subprocess.PIPE, 463 stdout=subprocess.PIPE, 464 stderr=subprocess.PIPE) 465 self.addCleanup(p.stdout.close) 466 self.addCleanup(p.stderr.close) 467 self.addCleanup(p.stdin.close) 468 string_to_write = "abc"*pipe_buf 469 (stdout, stderr) = p.communicate(string_to_write) 470 self.assertEqual(stdout, string_to_write) 471 472 def test_writes_before_communicate(self): 473 # stdin.write before communicate() 474 p = subprocess.Popen([sys.executable, "-c", 475 'import sys,os;' 476 'sys.stdout.write(sys.stdin.read())'], 477 stdin=subprocess.PIPE, 478 stdout=subprocess.PIPE, 479 stderr=subprocess.PIPE) 480 self.addCleanup(p.stdout.close) 481 self.addCleanup(p.stderr.close) 482 self.addCleanup(p.stdin.close) 483 p.stdin.write("banana") 484 (stdout, stderr) = p.communicate("split") 485 self.assertEqual(stdout, "bananasplit") 486 self.assertStderrEqual(stderr, "") 487 488 def test_universal_newlines(self): 489 p = subprocess.Popen([sys.executable, "-c", 490 'import sys,os;' + SETBINARY + 491 'sys.stdout.write("line1\\n");' 492 'sys.stdout.flush();' 493 'sys.stdout.write("line2\\r");' 494 'sys.stdout.flush();' 495 'sys.stdout.write("line3\\r\\n");' 496 'sys.stdout.flush();' 497 'sys.stdout.write("line4\\r");' 498 'sys.stdout.flush();' 499 'sys.stdout.write("\\nline5");' 500 'sys.stdout.flush();' 501 'sys.stdout.write("\\nline6");'], 502 stdout=subprocess.PIPE, 503 universal_newlines=1) 504 self.addCleanup(p.stdout.close) 505 stdout = p.stdout.read() 506 if hasattr(file, 'newlines'): 507 # Interpreter with universal newline support 508 self.assertEqual(stdout, 509 "line1\nline2\nline3\nline4\nline5\nline6") 510 else: 511 # 
Interpreter without universal newline support 512 self.assertEqual(stdout, 513 "line1\nline2\rline3\r\nline4\r\nline5\nline6") 514 515 def test_universal_newlines_communicate(self): 516 # universal newlines through communicate() 517 p = subprocess.Popen([sys.executable, "-c", 518 'import sys,os;' + SETBINARY + 519 'sys.stdout.write("line1\\n");' 520 'sys.stdout.flush();' 521 'sys.stdout.write("line2\\r");' 522 'sys.stdout.flush();' 523 'sys.stdout.write("line3\\r\\n");' 524 'sys.stdout.flush();' 525 'sys.stdout.write("line4\\r");' 526 'sys.stdout.flush();' 527 'sys.stdout.write("\\nline5");' 528 'sys.stdout.flush();' 529 'sys.stdout.write("\\nline6");'], 530 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 531 universal_newlines=1) 532 self.addCleanup(p.stdout.close) 533 self.addCleanup(p.stderr.close) 534 (stdout, stderr) = p.communicate() 535 if hasattr(file, 'newlines'): 536 # Interpreter with universal newline support 537 self.assertEqual(stdout, 538 "line1\nline2\nline3\nline4\nline5\nline6") 539 else: 540 # Interpreter without universal newline support 541 self.assertEqual(stdout, 542 "line1\nline2\rline3\r\nline4\r\nline5\nline6") 543 544 def test_no_leaking(self): 545 # Make sure we leak no resources 546 if not mswindows: 547 max_handles = 1026 # too much for most UNIX systems 548 else: 549 max_handles = 2050 # too much for (at least some) Windows setups 550 handles = [] 551 try: 552 for i in range(max_handles): 553 try: 554 handles.append(os.open(test_support.TESTFN, 555 os.O_WRONLY | os.O_CREAT)) 556 except OSError as e: 557 if e.errno != errno.EMFILE: 558 raise 559 break 560 else: 561 self.skipTest("failed to reach the file descriptor limit " 562 "(tried %d)" % max_handles) 563 # Close a couple of them (should be enough for a subprocess) 564 for i in range(10): 565 os.close(handles.pop()) 566 # Loop creating some subprocesses. If one of them leaks some fds, 567 # the next loop iteration will fail by reaching the max fd limit. 
568 for i in range(15): 569 p = subprocess.Popen([sys.executable, "-c", 570 "import sys;" 571 "sys.stdout.write(sys.stdin.read())"], 572 stdin=subprocess.PIPE, 573 stdout=subprocess.PIPE, 574 stderr=subprocess.PIPE) 575 data = p.communicate(b"lime")[0] 576 self.assertEqual(data, b"lime") 577 finally: 578 for h in handles: 579 os.close(h) 580 test_support.unlink(test_support.TESTFN) 581 582 def test_list2cmdline(self): 583 self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']), 584 '"a b c" d e') 585 self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']), 586 'ab\\"c \\ d') 587 self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']), 588 'ab\\"c " \\\\" d') 589 self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']), 590 'a\\\\\\b "de fg" h') 591 self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']), 592 'a\\\\\\"b c d') 593 self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']), 594 '"a\\\\b c" d e') 595 self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']), 596 '"a\\\\b\\ c" d e') 597 self.assertEqual(subprocess.list2cmdline(['ab', '']), 598 'ab ""') 599 600 601 def test_poll(self): 602 p = subprocess.Popen([sys.executable, 603 "-c", "import time; time.sleep(1)"]) 604 count = 0 605 while p.poll() is None: 606 time.sleep(0.1) 607 count += 1 608 # We expect that the poll loop probably went around about 10 times, 609 # but, based on system scheduling we can't control, it's possible 610 # poll() never returned None. It "should be" very rare that it 611 # didn't go around at least twice. 
612 self.assertGreaterEqual(count, 2) 613 # Subsequent invocations should just return the returncode 614 self.assertEqual(p.poll(), 0) 615 616 617 def test_wait(self): 618 p = subprocess.Popen([sys.executable, 619 "-c", "import time; time.sleep(2)"]) 620 self.assertEqual(p.wait(), 0) 621 # Subsequent invocations should just return the returncode 622 self.assertEqual(p.wait(), 0) 623 624 625 def test_invalid_bufsize(self): 626 # an invalid type of the bufsize argument should raise 627 # TypeError. 628 with self.assertRaises(TypeError): 629 subprocess.Popen([sys.executable, "-c", "pass"], "orange") 630 631 def test_leaking_fds_on_error(self): 632 # see bug #5179: Popen leaks file descriptors to PIPEs if 633 # the child fails to execute; this will eventually exhaust 634 # the maximum number of open fds. 1024 seems a very common 635 # value for that limit, but Windows has 2048, so we loop 636 # 1024 times (each call leaked two fds). 637 for i in range(1024): 638 # Windows raises IOError. Others raise OSError. 639 with self.assertRaises(EnvironmentError) as c: 640 subprocess.Popen(['nonexisting_i_hope'], 641 stdout=subprocess.PIPE, 642 stderr=subprocess.PIPE) 643 # ignore errors that indicate the command was not found 644 if c.exception.errno not in (errno.ENOENT, errno.EACCES): 645 raise c.exception 646 647 @unittest.skipIf(threading is None, "threading required") 648 def test_double_close_on_error(self): 649 # Issue #18851 650 fds = [] 651 def open_fds(): 652 for i in range(20): 653 fds.extend(os.pipe()) 654 time.sleep(0.001) 655 t = threading.Thread(target=open_fds) 656 t.start() 657 try: 658 with self.assertRaises(EnvironmentError): 659 subprocess.Popen(['nonexisting_i_hope'], 660 stdin=subprocess.PIPE, 661 stdout=subprocess.PIPE, 662 stderr=subprocess.PIPE) 663 finally: 664 t.join() 665 exc = None 666 for fd in fds: 667 # If a double close occurred, some of those fds will 668 # already have been closed by mistake, and os.close() 669 # here will raise. 
670 try: 671 os.close(fd) 672 except OSError as e: 673 exc = e 674 if exc is not None: 675 raise exc 676 677 def test_handles_closed_on_exception(self): 678 # If CreateProcess exits with an error, ensure the 679 # duplicate output handles are released 680 ifhandle, ifname = tempfile.mkstemp() 681 ofhandle, ofname = tempfile.mkstemp() 682 efhandle, efname = tempfile.mkstemp() 683 try: 684 subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle, 685 stderr=efhandle) 686 except OSError: 687 os.close(ifhandle) 688 os.remove(ifname) 689 os.close(ofhandle) 690 os.remove(ofname) 691 os.close(efhandle) 692 os.remove(efname) 693 self.assertFalse(os.path.exists(ifname)) 694 self.assertFalse(os.path.exists(ofname)) 695 self.assertFalse(os.path.exists(efname)) 696 697 def test_communicate_epipe(self): 698 # Issue 10963: communicate() should hide EPIPE 699 p = subprocess.Popen([sys.executable, "-c", 'pass'], 700 stdin=subprocess.PIPE, 701 stdout=subprocess.PIPE, 702 stderr=subprocess.PIPE) 703 self.addCleanup(p.stdout.close) 704 self.addCleanup(p.stderr.close) 705 self.addCleanup(p.stdin.close) 706 p.communicate("x" * 2**20) 707 708 def test_communicate_epipe_only_stdin(self): 709 # Issue 10963: communicate() should hide EPIPE 710 p = subprocess.Popen([sys.executable, "-c", 'pass'], 711 stdin=subprocess.PIPE) 712 self.addCleanup(p.stdin.close) 713 time.sleep(2) 714 p.communicate("x" * 2**20) 715 716 # This test is Linux-ish specific for simplicity to at least have 717 # some coverage. It is not a platform specific bug. 
718 @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), 719 "Linux specific") 720 def test_failed_child_execute_fd_leak(self): 721 """Test for the fork() failure fd leak reported in issue16327.""" 722 fd_directory = '/proc/%d/fd' % os.getpid() 723 fds_before_popen = os.listdir(fd_directory) 724 with self.assertRaises(PopenTestException): 725 PopenExecuteChildRaises( 726 [sys.executable, '-c', 'pass'], stdin=subprocess.PIPE, 727 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 728 729 # NOTE: This test doesn't verify that the real _execute_child 730 # does not close the file descriptors itself on the way out 731 # during an exception. Code inspection has confirmed that. 732 733 fds_after_exception = os.listdir(fd_directory) 734 self.assertEqual(fds_before_popen, fds_after_exception) 735 736 737# context manager 738class _SuppressCoreFiles(object): 739 """Try to prevent core files from being created.""" 740 old_limit = None 741 742 def __enter__(self): 743 """Try to save previous ulimit, then set it to (0, 0).""" 744 if resource is not None: 745 try: 746 self.old_limit = resource.getrlimit(resource.RLIMIT_CORE) 747 resource.setrlimit(resource.RLIMIT_CORE, (0, 0)) 748 except (ValueError, resource.error): 749 pass 750 751 if sys.platform == 'darwin': 752 # Check if the 'Crash Reporter' on OSX was configured 753 # in 'Developer' mode and warn that it will get triggered 754 # when it is. 755 # 756 # This assumes that this context manager is used in tests 757 # that might trigger the next manager. 
758 value = subprocess.Popen(['/usr/bin/defaults', 'read', 759 'com.apple.CrashReporter', 'DialogType'], 760 stdout=subprocess.PIPE).communicate()[0] 761 if value.strip() == b'developer': 762 print "this tests triggers the Crash Reporter, that is intentional" 763 sys.stdout.flush() 764 765 def __exit__(self, *args): 766 """Return core file behavior to default.""" 767 if self.old_limit is None: 768 return 769 if resource is not None: 770 try: 771 resource.setrlimit(resource.RLIMIT_CORE, self.old_limit) 772 except (ValueError, resource.error): 773 pass 774 775 @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 776 "Requires signal.SIGALRM") 777 def test_communicate_eintr(self): 778 # Issue #12493: communicate() should handle EINTR 779 def handler(signum, frame): 780 pass 781 old_handler = signal.signal(signal.SIGALRM, handler) 782 self.addCleanup(signal.signal, signal.SIGALRM, old_handler) 783 784 # the process is running for 2 seconds 785 args = [sys.executable, "-c", 'import time; time.sleep(2)'] 786 for stream in ('stdout', 'stderr'): 787 kw = {stream: subprocess.PIPE} 788 with subprocess.Popen(args, **kw) as process: 789 signal.alarm(1) 790 # communicate() will be interrupted by SIGALRM 791 process.communicate() 792 793 794@unittest.skipIf(mswindows, "POSIX specific tests") 795class POSIXProcessTestCase(BaseTestCase): 796 797 def test_exceptions(self): 798 # caught & re-raised exceptions 799 with self.assertRaises(OSError) as c: 800 p = subprocess.Popen([sys.executable, "-c", ""], 801 cwd="/this/path/does/not/exist") 802 # The attribute child_traceback should contain "os.chdir" somewhere. 
803 self.assertIn("os.chdir", c.exception.child_traceback) 804 805 def test_run_abort(self): 806 # returncode handles signal termination 807 with _SuppressCoreFiles(): 808 p = subprocess.Popen([sys.executable, "-c", 809 "import os; os.abort()"]) 810 p.wait() 811 self.assertEqual(-p.returncode, signal.SIGABRT) 812 813 def test_preexec(self): 814 # preexec function 815 p = subprocess.Popen([sys.executable, "-c", 816 "import sys, os;" 817 "sys.stdout.write(os.getenv('FRUIT'))"], 818 stdout=subprocess.PIPE, 819 preexec_fn=lambda: os.putenv("FRUIT", "apple")) 820 self.addCleanup(p.stdout.close) 821 self.assertEqual(p.stdout.read(), "apple") 822 823 class _TestExecuteChildPopen(subprocess.Popen): 824 """Used to test behavior at the end of _execute_child.""" 825 def __init__(self, testcase, *args, **kwargs): 826 self._testcase = testcase 827 subprocess.Popen.__init__(self, *args, **kwargs) 828 829 def _execute_child( 830 self, args, executable, preexec_fn, close_fds, cwd, env, 831 universal_newlines, startupinfo, creationflags, shell, to_close, 832 p2cread, p2cwrite, 833 c2pread, c2pwrite, 834 errread, errwrite): 835 try: 836 subprocess.Popen._execute_child( 837 self, args, executable, preexec_fn, close_fds, 838 cwd, env, universal_newlines, 839 startupinfo, creationflags, shell, to_close, 840 p2cread, p2cwrite, 841 c2pread, c2pwrite, 842 errread, errwrite) 843 finally: 844 # Open a bunch of file descriptors and verify that 845 # none of them are the same as the ones the Popen 846 # instance is using for stdin/stdout/stderr. 
847 devzero_fds = [os.open("/dev/zero", os.O_RDONLY) 848 for _ in range(8)] 849 try: 850 for fd in devzero_fds: 851 self._testcase.assertNotIn( 852 fd, (p2cwrite, c2pread, errread)) 853 finally: 854 for fd in devzero_fds: 855 os.close(fd) 856 857 @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.") 858 def test_preexec_errpipe_does_not_double_close_pipes(self): 859 """Issue16140: Don't double close pipes on preexec error.""" 860 861 def raise_it(): 862 raise RuntimeError("force the _execute_child() errpipe_data path.") 863 864 with self.assertRaises(RuntimeError): 865 self._TestExecuteChildPopen( 866 self, [sys.executable, "-c", "pass"], 867 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 868 stderr=subprocess.PIPE, preexec_fn=raise_it) 869 870 def test_args_string(self): 871 # args is a string 872 f, fname = tempfile.mkstemp() 873 os.write(f, "#!/bin/sh\n") 874 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % 875 sys.executable) 876 os.close(f) 877 os.chmod(fname, 0o700) 878 p = subprocess.Popen(fname) 879 p.wait() 880 os.remove(fname) 881 self.assertEqual(p.returncode, 47) 882 883 def test_invalid_args(self): 884 # invalid arguments should raise ValueError 885 self.assertRaises(ValueError, subprocess.call, 886 [sys.executable, "-c", 887 "import sys; sys.exit(47)"], 888 startupinfo=47) 889 self.assertRaises(ValueError, subprocess.call, 890 [sys.executable, "-c", 891 "import sys; sys.exit(47)"], 892 creationflags=47) 893 894 def test_shell_sequence(self): 895 # Run command through the shell (sequence) 896 newenv = os.environ.copy() 897 newenv["FRUIT"] = "apple" 898 p = subprocess.Popen(["echo $FRUIT"], shell=1, 899 stdout=subprocess.PIPE, 900 env=newenv) 901 self.addCleanup(p.stdout.close) 902 self.assertEqual(p.stdout.read().strip(), "apple") 903 904 def test_shell_string(self): 905 # Run command through the shell (string) 906 newenv = os.environ.copy() 907 newenv["FRUIT"] = "apple" 908 p = subprocess.Popen("echo $FRUIT", shell=1, 909 
stdout=subprocess.PIPE, 910 env=newenv) 911 self.addCleanup(p.stdout.close) 912 self.assertEqual(p.stdout.read().strip(), "apple") 913 914 def test_call_string(self): 915 # call() function with string argument on UNIX 916 f, fname = tempfile.mkstemp() 917 os.write(f, "#!/bin/sh\n") 918 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % 919 sys.executable) 920 os.close(f) 921 os.chmod(fname, 0700) 922 rc = subprocess.call(fname) 923 os.remove(fname) 924 self.assertEqual(rc, 47) 925 926 def test_specific_shell(self): 927 # Issue #9265: Incorrect name passed as arg[0]. 928 shells = [] 929 for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']: 930 for name in ['bash', 'ksh']: 931 sh = os.path.join(prefix, name) 932 if os.path.isfile(sh): 933 shells.append(sh) 934 if not shells: # Will probably work for any shell but csh. 935 self.skipTest("bash or ksh required for this test") 936 sh = '/bin/sh' 937 if os.path.isfile(sh) and not os.path.islink(sh): 938 # Test will fail if /bin/sh is a symlink to csh. 939 shells.append(sh) 940 for sh in shells: 941 p = subprocess.Popen("echo $0", executable=sh, shell=True, 942 stdout=subprocess.PIPE) 943 self.addCleanup(p.stdout.close) 944 self.assertEqual(p.stdout.read().strip(), sh) 945 946 def _kill_process(self, method, *args): 947 # Do not inherit file handles from the parent. 948 # It should fix failures on some platforms. 949 p = subprocess.Popen([sys.executable, "-c", """if 1: 950 import sys, time 951 sys.stdout.write('x\\n') 952 sys.stdout.flush() 953 time.sleep(30) 954 """], 955 close_fds=True, 956 stdin=subprocess.PIPE, 957 stdout=subprocess.PIPE, 958 stderr=subprocess.PIPE) 959 # Wait for the interpreter to be completely initialized before 960 # sending any signal. 
961 p.stdout.read(1) 962 getattr(p, method)(*args) 963 return p 964 965 @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')), 966 "Due to known OS bug (issue #16762)") 967 def _kill_dead_process(self, method, *args): 968 # Do not inherit file handles from the parent. 969 # It should fix failures on some platforms. 970 p = subprocess.Popen([sys.executable, "-c", """if 1: 971 import sys, time 972 sys.stdout.write('x\\n') 973 sys.stdout.flush() 974 """], 975 close_fds=True, 976 stdin=subprocess.PIPE, 977 stdout=subprocess.PIPE, 978 stderr=subprocess.PIPE) 979 # Wait for the interpreter to be completely initialized before 980 # sending any signal. 981 p.stdout.read(1) 982 # The process should end after this 983 time.sleep(1) 984 # This shouldn't raise even though the child is now dead 985 getattr(p, method)(*args) 986 p.communicate() 987 988 def test_send_signal(self): 989 p = self._kill_process('send_signal', signal.SIGINT) 990 _, stderr = p.communicate() 991 self.assertIn('KeyboardInterrupt', stderr) 992 self.assertNotEqual(p.wait(), 0) 993 994 def test_kill(self): 995 p = self._kill_process('kill') 996 _, stderr = p.communicate() 997 self.assertStderrEqual(stderr, '') 998 self.assertEqual(p.wait(), -signal.SIGKILL) 999 1000 def test_terminate(self): 1001 p = self._kill_process('terminate') 1002 _, stderr = p.communicate() 1003 self.assertStderrEqual(stderr, '') 1004 self.assertEqual(p.wait(), -signal.SIGTERM) 1005 1006 def test_send_signal_dead(self): 1007 # Sending a signal to a dead process 1008 self._kill_dead_process('send_signal', signal.SIGINT) 1009 1010 def test_kill_dead(self): 1011 # Killing a dead process 1012 self._kill_dead_process('kill') 1013 1014 def test_terminate_dead(self): 1015 # Terminating a dead process 1016 self._kill_dead_process('terminate') 1017 1018 def check_close_std_fds(self, fds): 1019 # Issue #9905: test that subprocess pipes still work properly with 1020 # some standard fds closed 1021 stdin = 0 1022 newfds = [] 1023 for a 
in fds: 1024 b = os.dup(a) 1025 newfds.append(b) 1026 if a == 0: 1027 stdin = b 1028 try: 1029 for fd in fds: 1030 os.close(fd) 1031 out, err = subprocess.Popen([sys.executable, "-c", 1032 'import sys;' 1033 'sys.stdout.write("apple");' 1034 'sys.stdout.flush();' 1035 'sys.stderr.write("orange")'], 1036 stdin=stdin, 1037 stdout=subprocess.PIPE, 1038 stderr=subprocess.PIPE).communicate() 1039 err = test_support.strip_python_stderr(err) 1040 self.assertEqual((out, err), (b'apple', b'orange')) 1041 finally: 1042 for b, a in zip(newfds, fds): 1043 os.dup2(b, a) 1044 for b in newfds: 1045 os.close(b) 1046 1047 def test_close_fd_0(self): 1048 self.check_close_std_fds([0]) 1049 1050 def test_close_fd_1(self): 1051 self.check_close_std_fds([1]) 1052 1053 def test_close_fd_2(self): 1054 self.check_close_std_fds([2]) 1055 1056 def test_close_fds_0_1(self): 1057 self.check_close_std_fds([0, 1]) 1058 1059 def test_close_fds_0_2(self): 1060 self.check_close_std_fds([0, 2]) 1061 1062 def test_close_fds_1_2(self): 1063 self.check_close_std_fds([1, 2]) 1064 1065 def test_close_fds_0_1_2(self): 1066 # Issue #10806: test that subprocess pipes still work properly with 1067 # all standard fds closed. 
1068 self.check_close_std_fds([0, 1, 2]) 1069 1070 def check_swap_fds(self, stdin_no, stdout_no, stderr_no): 1071 # open up some temporary files 1072 temps = [tempfile.mkstemp() for i in range(3)] 1073 temp_fds = [fd for fd, fname in temps] 1074 try: 1075 # unlink the files -- we won't need to reopen them 1076 for fd, fname in temps: 1077 os.unlink(fname) 1078 1079 # save a copy of the standard file descriptors 1080 saved_fds = [os.dup(fd) for fd in range(3)] 1081 try: 1082 # duplicate the temp files over the standard fd's 0, 1, 2 1083 for fd, temp_fd in enumerate(temp_fds): 1084 os.dup2(temp_fd, fd) 1085 1086 # write some data to what will become stdin, and rewind 1087 os.write(stdin_no, b"STDIN") 1088 os.lseek(stdin_no, 0, 0) 1089 1090 # now use those files in the given order, so that subprocess 1091 # has to rearrange them in the child 1092 p = subprocess.Popen([sys.executable, "-c", 1093 'import sys; got = sys.stdin.read();' 1094 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], 1095 stdin=stdin_no, 1096 stdout=stdout_no, 1097 stderr=stderr_no) 1098 p.wait() 1099 1100 for fd in temp_fds: 1101 os.lseek(fd, 0, 0) 1102 1103 out = os.read(stdout_no, 1024) 1104 err = test_support.strip_python_stderr(os.read(stderr_no, 1024)) 1105 finally: 1106 for std, saved in enumerate(saved_fds): 1107 os.dup2(saved, std) 1108 os.close(saved) 1109 1110 self.assertEqual(out, b"got STDIN") 1111 self.assertEqual(err, b"err") 1112 1113 finally: 1114 for fd in temp_fds: 1115 os.close(fd) 1116 1117 # When duping fds, if there arises a situation where one of the fds is 1118 # either 0, 1 or 2, it is possible that it is overwritten (#12607). 1119 # This tests all combinations of this. 
1120 def test_swap_fds(self): 1121 self.check_swap_fds(0, 1, 2) 1122 self.check_swap_fds(0, 2, 1) 1123 self.check_swap_fds(1, 0, 2) 1124 self.check_swap_fds(1, 2, 0) 1125 self.check_swap_fds(2, 0, 1) 1126 self.check_swap_fds(2, 1, 0) 1127 1128 def test_wait_when_sigchild_ignored(self): 1129 # NOTE: sigchild_ignore.py may not be an effective test on all OSes. 1130 sigchild_ignore = test_support.findfile("sigchild_ignore.py", 1131 subdir="subprocessdata") 1132 p = subprocess.Popen([sys.executable, sigchild_ignore], 1133 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1134 stdout, stderr = p.communicate() 1135 self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" 1136 " non-zero with this error:\n%s" % stderr) 1137 1138 def test_zombie_fast_process_del(self): 1139 # Issue #12650: on Unix, if Popen.__del__() was called before the 1140 # process exited, it wouldn't be added to subprocess._active, and would 1141 # remain a zombie. 1142 # spawn a Popen, and delete its reference before it exits 1143 p = subprocess.Popen([sys.executable, "-c", 1144 'import sys, time;' 1145 'time.sleep(0.2)'], 1146 stdout=subprocess.PIPE, 1147 stderr=subprocess.PIPE) 1148 self.addCleanup(p.stdout.close) 1149 self.addCleanup(p.stderr.close) 1150 ident = id(p) 1151 pid = p.pid 1152 del p 1153 # check that p is in the active processes list 1154 self.assertIn(ident, [id(o) for o in subprocess._active]) 1155 1156 def test_leak_fast_process_del_killed(self): 1157 # Issue #12650: on Unix, if Popen.__del__() was called before the 1158 # process exited, and the process got killed by a signal, it would never 1159 # be removed from subprocess._active, which triggered a FD and memory 1160 # leak. 
1161 # spawn a Popen, delete its reference and kill it 1162 p = subprocess.Popen([sys.executable, "-c", 1163 'import time;' 1164 'time.sleep(3)'], 1165 stdout=subprocess.PIPE, 1166 stderr=subprocess.PIPE) 1167 self.addCleanup(p.stdout.close) 1168 self.addCleanup(p.stderr.close) 1169 ident = id(p) 1170 pid = p.pid 1171 del p 1172 os.kill(pid, signal.SIGKILL) 1173 # check that p is in the active processes list 1174 self.assertIn(ident, [id(o) for o in subprocess._active]) 1175 1176 # let some time for the process to exit, and create a new Popen: this 1177 # should trigger the wait() of p 1178 time.sleep(0.2) 1179 with self.assertRaises(EnvironmentError) as c: 1180 with subprocess.Popen(['nonexisting_i_hope'], 1181 stdout=subprocess.PIPE, 1182 stderr=subprocess.PIPE) as proc: 1183 pass 1184 # p should have been wait()ed on, and removed from the _active list 1185 self.assertRaises(OSError, os.waitpid, pid, 0) 1186 self.assertNotIn(ident, [id(o) for o in subprocess._active]) 1187 1188 def test_pipe_cloexec(self): 1189 # Issue 12786: check that the communication pipes' FDs are set CLOEXEC, 1190 # and are not inherited by another child process. 
1191 p1 = subprocess.Popen([sys.executable, "-c", 1192 'import os;' 1193 'os.read(0, 1)' 1194 ], 1195 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 1196 stderr=subprocess.PIPE) 1197 1198 p2 = subprocess.Popen([sys.executable, "-c", """if True: 1199 import os, errno, sys 1200 for fd in %r: 1201 try: 1202 os.close(fd) 1203 except OSError as e: 1204 if e.errno != errno.EBADF: 1205 raise 1206 else: 1207 sys.exit(1) 1208 sys.exit(0) 1209 """ % [f.fileno() for f in (p1.stdin, p1.stdout, 1210 p1.stderr)] 1211 ], 1212 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 1213 stderr=subprocess.PIPE, close_fds=False) 1214 p1.communicate('foo') 1215 _, stderr = p2.communicate() 1216 1217 self.assertEqual(p2.returncode, 0, "Unexpected error: " + repr(stderr)) 1218 1219 1220@unittest.skipUnless(mswindows, "Windows specific tests") 1221class Win32ProcessTestCase(BaseTestCase): 1222 1223 def test_startupinfo(self): 1224 # startupinfo argument 1225 # We uses hardcoded constants, because we do not want to 1226 # depend on win32all. 
1227 STARTF_USESHOWWINDOW = 1 1228 SW_MAXIMIZE = 3 1229 startupinfo = subprocess.STARTUPINFO() 1230 startupinfo.dwFlags = STARTF_USESHOWWINDOW 1231 startupinfo.wShowWindow = SW_MAXIMIZE 1232 # Since Python is a console process, it won't be affected 1233 # by wShowWindow, but the argument should be silently 1234 # ignored 1235 subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"], 1236 startupinfo=startupinfo) 1237 1238 def test_creationflags(self): 1239 # creationflags argument 1240 CREATE_NEW_CONSOLE = 16 1241 sys.stderr.write(" a DOS box should flash briefly ...\n") 1242 subprocess.call(sys.executable + 1243 ' -c "import time; time.sleep(0.25)"', 1244 creationflags=CREATE_NEW_CONSOLE) 1245 1246 def test_invalid_args(self): 1247 # invalid arguments should raise ValueError 1248 self.assertRaises(ValueError, subprocess.call, 1249 [sys.executable, "-c", 1250 "import sys; sys.exit(47)"], 1251 preexec_fn=lambda: 1) 1252 self.assertRaises(ValueError, subprocess.call, 1253 [sys.executable, "-c", 1254 "import sys; sys.exit(47)"], 1255 stdout=subprocess.PIPE, 1256 close_fds=True) 1257 1258 def test_close_fds(self): 1259 # close file descriptors 1260 rc = subprocess.call([sys.executable, "-c", 1261 "import sys; sys.exit(47)"], 1262 close_fds=True) 1263 self.assertEqual(rc, 47) 1264 1265 def test_shell_sequence(self): 1266 # Run command through the shell (sequence) 1267 newenv = os.environ.copy() 1268 newenv["FRUIT"] = "physalis" 1269 p = subprocess.Popen(["set"], shell=1, 1270 stdout=subprocess.PIPE, 1271 env=newenv) 1272 self.addCleanup(p.stdout.close) 1273 self.assertIn("physalis", p.stdout.read()) 1274 1275 def test_shell_string(self): 1276 # Run command through the shell (string) 1277 newenv = os.environ.copy() 1278 newenv["FRUIT"] = "physalis" 1279 p = subprocess.Popen("set", shell=1, 1280 stdout=subprocess.PIPE, 1281 env=newenv) 1282 self.addCleanup(p.stdout.close) 1283 self.assertIn("physalis", p.stdout.read()) 1284 1285 def test_call_string(self): 1286 # 
call() function with string argument on Windows 1287 rc = subprocess.call(sys.executable + 1288 ' -c "import sys; sys.exit(47)"') 1289 self.assertEqual(rc, 47) 1290 1291 def _kill_process(self, method, *args): 1292 # Some win32 buildbot raises EOFError if stdin is inherited 1293 p = subprocess.Popen([sys.executable, "-c", """if 1: 1294 import sys, time 1295 sys.stdout.write('x\\n') 1296 sys.stdout.flush() 1297 time.sleep(30) 1298 """], 1299 stdin=subprocess.PIPE, 1300 stdout=subprocess.PIPE, 1301 stderr=subprocess.PIPE) 1302 self.addCleanup(p.stdout.close) 1303 self.addCleanup(p.stderr.close) 1304 self.addCleanup(p.stdin.close) 1305 # Wait for the interpreter to be completely initialized before 1306 # sending any signal. 1307 p.stdout.read(1) 1308 getattr(p, method)(*args) 1309 _, stderr = p.communicate() 1310 self.assertStderrEqual(stderr, '') 1311 returncode = p.wait() 1312 self.assertNotEqual(returncode, 0) 1313 1314 def _kill_dead_process(self, method, *args): 1315 p = subprocess.Popen([sys.executable, "-c", """if 1: 1316 import sys, time 1317 sys.stdout.write('x\\n') 1318 sys.stdout.flush() 1319 sys.exit(42) 1320 """], 1321 stdin=subprocess.PIPE, 1322 stdout=subprocess.PIPE, 1323 stderr=subprocess.PIPE) 1324 self.addCleanup(p.stdout.close) 1325 self.addCleanup(p.stderr.close) 1326 self.addCleanup(p.stdin.close) 1327 # Wait for the interpreter to be completely initialized before 1328 # sending any signal. 
1329 p.stdout.read(1) 1330 # The process should end after this 1331 time.sleep(1) 1332 # This shouldn't raise even though the child is now dead 1333 getattr(p, method)(*args) 1334 _, stderr = p.communicate() 1335 self.assertStderrEqual(stderr, b'') 1336 rc = p.wait() 1337 self.assertEqual(rc, 42) 1338 1339 def test_send_signal(self): 1340 self._kill_process('send_signal', signal.SIGTERM) 1341 1342 def test_kill(self): 1343 self._kill_process('kill') 1344 1345 def test_terminate(self): 1346 self._kill_process('terminate') 1347 1348 def test_send_signal_dead(self): 1349 self._kill_dead_process('send_signal', signal.SIGTERM) 1350 1351 def test_kill_dead(self): 1352 self._kill_dead_process('kill') 1353 1354 def test_terminate_dead(self): 1355 self._kill_dead_process('terminate') 1356 1357 1358@unittest.skipUnless(getattr(subprocess, '_has_poll', False), 1359 "poll system call not supported") 1360class ProcessTestCaseNoPoll(ProcessTestCase): 1361 def setUp(self): 1362 subprocess._has_poll = False 1363 ProcessTestCase.setUp(self) 1364 1365 def tearDown(self): 1366 subprocess._has_poll = True 1367 ProcessTestCase.tearDown(self) 1368 1369 1370class HelperFunctionTests(unittest.TestCase): 1371 @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows") 1372 def test_eintr_retry_call(self): 1373 record_calls = [] 1374 def fake_os_func(*args): 1375 record_calls.append(args) 1376 if len(record_calls) == 2: 1377 raise OSError(errno.EINTR, "fake interrupted system call") 1378 return tuple(reversed(args)) 1379 1380 self.assertEqual((999, 256), 1381 subprocess._eintr_retry_call(fake_os_func, 256, 999)) 1382 self.assertEqual([(256, 999)], record_calls) 1383 # This time there will be an EINTR so it will loop once. 
1384 self.assertEqual((666,), 1385 subprocess._eintr_retry_call(fake_os_func, 666)) 1386 self.assertEqual([(256, 999), (666,), (666,)], record_calls) 1387 1388@unittest.skipUnless(mswindows, "mswindows only") 1389class CommandsWithSpaces (BaseTestCase): 1390 1391 def setUp(self): 1392 super(CommandsWithSpaces, self).setUp() 1393 f, fname = tempfile.mkstemp(".py", "te st") 1394 self.fname = fname.lower () 1395 os.write(f, b"import sys;" 1396 b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))" 1397 ) 1398 os.close(f) 1399 1400 def tearDown(self): 1401 os.remove(self.fname) 1402 super(CommandsWithSpaces, self).tearDown() 1403 1404 def with_spaces(self, *args, **kwargs): 1405 kwargs['stdout'] = subprocess.PIPE 1406 p = subprocess.Popen(*args, **kwargs) 1407 self.addCleanup(p.stdout.close) 1408 self.assertEqual( 1409 p.stdout.read ().decode("mbcs"), 1410 "2 [%r, 'ab cd']" % self.fname 1411 ) 1412 1413 def test_shell_string_with_spaces(self): 1414 # call() function with string argument with spaces on Windows 1415 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 1416 "ab cd"), shell=1) 1417 1418 def test_shell_sequence_with_spaces(self): 1419 # call() function with sequence argument with spaces on Windows 1420 self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1) 1421 1422 def test_noshell_string_with_spaces(self): 1423 # call() function with string argument with spaces on Windows 1424 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 1425 "ab cd")) 1426 1427 def test_noshell_sequence_with_spaces(self): 1428 # call() function with sequence argument with spaces on Windows 1429 self.with_spaces([sys.executable, self.fname, "ab cd"]) 1430 1431def test_main(): 1432 unit_tests = (ProcessTestCase, 1433 POSIXProcessTestCase, 1434 Win32ProcessTestCase, 1435 ProcessTestCaseNoPoll, 1436 HelperFunctionTests, 1437 CommandsWithSpaces) 1438 1439 test_support.run_unittest(*unit_tests) 1440 test_support.reap_children() 

# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()