• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2from unittest import mock
3from test import support
4import subprocess
5import sys
6import signal
7import io
8import itertools
9import os
10import errno
11import tempfile
12import time
13import traceback
14import selectors
15import sysconfig
16import select
17import shutil
18import threading
19import gc
20import textwrap
21from test.support import FakePath
22
23try:
24    import _testcapi
25except ImportError:
26    _testcapi = None
27
28
29if support.PGO:
30    raise unittest.SkipTest("test is not helpful for PGO")
31
32mswindows = (sys.platform == "win32")
33
34#
35# Depends on the following external programs: Python
36#
37
38if mswindows:
39    SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
40                                                'os.O_BINARY);')
41else:
42    SETBINARY = ''
43
44NONEXISTING_CMD = ('nonexisting_i_hope',)
45# Ignore errors that indicate the command was not found
46NONEXISTING_ERRORS = (FileNotFoundError, NotADirectoryError, PermissionError)
47
48ZERO_RETURN_CMD = (sys.executable, '-c', 'pass')
49
50
51def setUpModule():
52    shell_true = shutil.which('true')
53    if shell_true is None:
54        return
55    if (os.access(shell_true, os.X_OK) and
56        subprocess.run([shell_true]).returncode == 0):
57        global ZERO_RETURN_CMD
58        ZERO_RETURN_CMD = (shell_true,)  # Faster than Python startup.
59
60
61class BaseTestCase(unittest.TestCase):
62    def setUp(self):
63        # Try to minimize the number of children we have so this test
64        # doesn't crash on some buildbots (Alphas in particular).
65        support.reap_children()
66
67    def tearDown(self):
68        if not mswindows:
69            # subprocess._active is not used on Windows and is set to None.
70            for inst in subprocess._active:
71                inst.wait()
72            subprocess._cleanup()
73            self.assertFalse(
74                subprocess._active, "subprocess._active not empty"
75            )
76        self.doCleanups()
77        support.reap_children()
78
79    def assertStderrEqual(self, stderr, expected, msg=None):
80        # In a debug build, stuff like "[6580 refs]" is printed to stderr at
81        # shutdown time.  That frustrates tests trying to check stderr produced
82        # from a spawned Python process.
83        actual = support.strip_python_stderr(stderr)
84        # strip_python_stderr also strips whitespace, so we do too.
85        expected = expected.strip()
86        self.assertEqual(actual, expected, msg)
87
88
89class PopenTestException(Exception):
90    pass
91
92
93class PopenExecuteChildRaises(subprocess.Popen):
94    """Popen subclass for testing cleanup of subprocess.PIPE filehandles when
95    _execute_child fails.
96    """
97    def _execute_child(self, *args, **kwargs):
98        raise PopenTestException("Forced Exception for Test")
99
100
101class ProcessTestCase(BaseTestCase):
102
103    def test_io_buffered_by_default(self):
104        p = subprocess.Popen(ZERO_RETURN_CMD,
105                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
106                             stderr=subprocess.PIPE)
107        try:
108            self.assertIsInstance(p.stdin, io.BufferedIOBase)
109            self.assertIsInstance(p.stdout, io.BufferedIOBase)
110            self.assertIsInstance(p.stderr, io.BufferedIOBase)
111        finally:
112            p.stdin.close()
113            p.stdout.close()
114            p.stderr.close()
115            p.wait()
116
117    def test_io_unbuffered_works(self):
118        p = subprocess.Popen(ZERO_RETURN_CMD,
119                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
120                             stderr=subprocess.PIPE, bufsize=0)
121        try:
122            self.assertIsInstance(p.stdin, io.RawIOBase)
123            self.assertIsInstance(p.stdout, io.RawIOBase)
124            self.assertIsInstance(p.stderr, io.RawIOBase)
125        finally:
126            p.stdin.close()
127            p.stdout.close()
128            p.stderr.close()
129            p.wait()
130
131    def test_call_seq(self):
132        # call() function with sequence argument
133        rc = subprocess.call([sys.executable, "-c",
134                              "import sys; sys.exit(47)"])
135        self.assertEqual(rc, 47)
136
137    def test_call_timeout(self):
138        # call() function with timeout argument; we want to test that the child
139        # process gets killed when the timeout expires.  If the child isn't
140        # killed, this call will deadlock since subprocess.call waits for the
141        # child.
142        self.assertRaises(subprocess.TimeoutExpired, subprocess.call,
143                          [sys.executable, "-c", "while True: pass"],
144                          timeout=0.1)
145
146    def test_check_call_zero(self):
147        # check_call() function with zero return code
148        rc = subprocess.check_call(ZERO_RETURN_CMD)
149        self.assertEqual(rc, 0)
150
151    def test_check_call_nonzero(self):
152        # check_call() function with non-zero return code
153        with self.assertRaises(subprocess.CalledProcessError) as c:
154            subprocess.check_call([sys.executable, "-c",
155                                   "import sys; sys.exit(47)"])
156        self.assertEqual(c.exception.returncode, 47)
157
158    def test_check_output(self):
159        # check_output() function with zero return code
160        output = subprocess.check_output(
161                [sys.executable, "-c", "print('BDFL')"])
162        self.assertIn(b'BDFL', output)
163
164    def test_check_output_nonzero(self):
165        # check_call() function with non-zero return code
166        with self.assertRaises(subprocess.CalledProcessError) as c:
167            subprocess.check_output(
168                    [sys.executable, "-c", "import sys; sys.exit(5)"])
169        self.assertEqual(c.exception.returncode, 5)
170
171    def test_check_output_stderr(self):
172        # check_output() function stderr redirected to stdout
173        output = subprocess.check_output(
174                [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
175                stderr=subprocess.STDOUT)
176        self.assertIn(b'BDFL', output)
177
178    def test_check_output_stdin_arg(self):
179        # check_output() can be called with stdin set to a file
180        tf = tempfile.TemporaryFile()
181        self.addCleanup(tf.close)
182        tf.write(b'pear')
183        tf.seek(0)
184        output = subprocess.check_output(
185                [sys.executable, "-c",
186                 "import sys; sys.stdout.write(sys.stdin.read().upper())"],
187                stdin=tf)
188        self.assertIn(b'PEAR', output)
189
190    def test_check_output_input_arg(self):
191        # check_output() can be called with input set to a string
192        output = subprocess.check_output(
193                [sys.executable, "-c",
194                 "import sys; sys.stdout.write(sys.stdin.read().upper())"],
195                input=b'pear')
196        self.assertIn(b'PEAR', output)
197
198    def test_check_output_stdout_arg(self):
199        # check_output() refuses to accept 'stdout' argument
200        with self.assertRaises(ValueError) as c:
201            output = subprocess.check_output(
202                    [sys.executable, "-c", "print('will not be run')"],
203                    stdout=sys.stdout)
204            self.fail("Expected ValueError when stdout arg supplied.")
205        self.assertIn('stdout', c.exception.args[0])
206
207    def test_check_output_stdin_with_input_arg(self):
208        # check_output() refuses to accept 'stdin' with 'input'
209        tf = tempfile.TemporaryFile()
210        self.addCleanup(tf.close)
211        tf.write(b'pear')
212        tf.seek(0)
213        with self.assertRaises(ValueError) as c:
214            output = subprocess.check_output(
215                    [sys.executable, "-c", "print('will not be run')"],
216                    stdin=tf, input=b'hare')
217            self.fail("Expected ValueError when stdin and input args supplied.")
218        self.assertIn('stdin', c.exception.args[0])
219        self.assertIn('input', c.exception.args[0])
220
221    def test_check_output_timeout(self):
222        # check_output() function with timeout arg
223        with self.assertRaises(subprocess.TimeoutExpired) as c:
224            output = subprocess.check_output(
225                    [sys.executable, "-c",
226                     "import sys, time\n"
227                     "sys.stdout.write('BDFL')\n"
228                     "sys.stdout.flush()\n"
229                     "time.sleep(3600)"],
230                    # Some heavily loaded buildbots (sparc Debian 3.x) require
231                    # this much time to start and print.
232                    timeout=3)
233            self.fail("Expected TimeoutExpired.")
234        self.assertEqual(c.exception.output, b'BDFL')
235
236    def test_call_kwargs(self):
237        # call() function with keyword args
238        newenv = os.environ.copy()
239        newenv["FRUIT"] = "banana"
240        rc = subprocess.call([sys.executable, "-c",
241                              'import sys, os;'
242                              'sys.exit(os.getenv("FRUIT")=="banana")'],
243                             env=newenv)
244        self.assertEqual(rc, 1)
245
246    def test_invalid_args(self):
247        # Popen() called with invalid arguments should raise TypeError
248        # but Popen.__del__ should not complain (issue #12085)
249        with support.captured_stderr() as s:
250            self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
251            argcount = subprocess.Popen.__init__.__code__.co_argcount
252            too_many_args = [0] * (argcount + 1)
253            self.assertRaises(TypeError, subprocess.Popen, *too_many_args)
254        self.assertEqual(s.getvalue(), '')
255
256    def test_stdin_none(self):
257        # .stdin is None when not redirected
258        p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
259                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
260        self.addCleanup(p.stdout.close)
261        self.addCleanup(p.stderr.close)
262        p.wait()
263        self.assertEqual(p.stdin, None)
264
265    def test_stdout_none(self):
266        # .stdout is None when not redirected, and the child's stdout will
267        # be inherited from the parent.  In order to test this we run a
268        # subprocess in a subprocess:
269        # this_test
270        #   \-- subprocess created by this test (parent)
271        #          \-- subprocess created by the parent subprocess (child)
272        # The parent doesn't specify stdout, so the child will use the
273        # parent's stdout.  This test checks that the message printed by the
274        # child goes to the parent stdout.  The parent also checks that the
275        # child's stdout is None.  See #11963.
276        code = ('import sys; from subprocess import Popen, PIPE;'
277                'p = Popen([sys.executable, "-c", "print(\'test_stdout_none\')"],'
278                '          stdin=PIPE, stderr=PIPE);'
279                'p.wait(); assert p.stdout is None;')
280        p = subprocess.Popen([sys.executable, "-c", code],
281                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
282        self.addCleanup(p.stdout.close)
283        self.addCleanup(p.stderr.close)
284        out, err = p.communicate()
285        self.assertEqual(p.returncode, 0, err)
286        self.assertEqual(out.rstrip(), b'test_stdout_none')
287
288    def test_stderr_none(self):
289        # .stderr is None when not redirected
290        p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
291                         stdin=subprocess.PIPE, stdout=subprocess.PIPE)
292        self.addCleanup(p.stdout.close)
293        self.addCleanup(p.stdin.close)
294        p.wait()
295        self.assertEqual(p.stderr, None)
296
297    def _assert_python(self, pre_args, **kwargs):
298        # We include sys.exit() to prevent the test runner from hanging
299        # whenever python is found.
300        args = pre_args + ["import sys; sys.exit(47)"]
301        p = subprocess.Popen(args, **kwargs)
302        p.wait()
303        self.assertEqual(47, p.returncode)
304
305    def test_executable(self):
306        # Check that the executable argument works.
307        #
308        # On Unix (non-Mac and non-Windows), Python looks at args[0] to
309        # determine where its standard library is, so we need the directory
310        # of args[0] to be valid for the Popen() call to Python to succeed.
311        # See also issue #16170 and issue #7774.
312        doesnotexist = os.path.join(os.path.dirname(sys.executable),
313                                    "doesnotexist")
314        self._assert_python([doesnotexist, "-c"], executable=sys.executable)
315
316    def test_bytes_executable(self):
317        doesnotexist = os.path.join(os.path.dirname(sys.executable),
318                                    "doesnotexist")
319        self._assert_python([doesnotexist, "-c"],
320                            executable=os.fsencode(sys.executable))
321
322    def test_pathlike_executable(self):
323        doesnotexist = os.path.join(os.path.dirname(sys.executable),
324                                    "doesnotexist")
325        self._assert_python([doesnotexist, "-c"],
326                            executable=FakePath(sys.executable))
327
328    def test_executable_takes_precedence(self):
329        # Check that the executable argument takes precedence over args[0].
330        #
331        # Verify first that the call succeeds without the executable arg.
332        pre_args = [sys.executable, "-c"]
333        self._assert_python(pre_args)
334        self.assertRaises(NONEXISTING_ERRORS,
335                          self._assert_python, pre_args,
336                          executable=NONEXISTING_CMD[0])
337
338    @unittest.skipIf(mswindows, "executable argument replaces shell")
339    def test_executable_replaces_shell(self):
340        # Check that the executable argument replaces the default shell
341        # when shell=True.
342        self._assert_python([], executable=sys.executable, shell=True)
343
344    @unittest.skipIf(mswindows, "executable argument replaces shell")
345    def test_bytes_executable_replaces_shell(self):
346        self._assert_python([], executable=os.fsencode(sys.executable),
347                            shell=True)
348
349    @unittest.skipIf(mswindows, "executable argument replaces shell")
350    def test_pathlike_executable_replaces_shell(self):
351        self._assert_python([], executable=FakePath(sys.executable),
352                            shell=True)
353
354    # For use in the test_cwd* tests below.
355    def _normalize_cwd(self, cwd):
356        # Normalize an expected cwd (for Tru64 support).
357        # We can't use os.path.realpath since it doesn't expand Tru64 {memb}
358        # strings.  See bug #1063571.
359        with support.change_cwd(cwd):
360            return os.getcwd()
361
362    # For use in the test_cwd* tests below.
363    def _split_python_path(self):
364        # Return normalized (python_dir, python_base).
365        python_path = os.path.realpath(sys.executable)
366        return os.path.split(python_path)
367
368    # For use in the test_cwd* tests below.
369    def _assert_cwd(self, expected_cwd, python_arg, **kwargs):
370        # Invoke Python via Popen, and assert that (1) the call succeeds,
371        # and that (2) the current working directory of the child process
372        # matches *expected_cwd*.
373        p = subprocess.Popen([python_arg, "-c",
374                              "import os, sys; "
375                              "sys.stdout.write(os.getcwd()); "
376                              "sys.exit(47)"],
377                              stdout=subprocess.PIPE,
378                              **kwargs)
379        self.addCleanup(p.stdout.close)
380        p.wait()
381        self.assertEqual(47, p.returncode)
382        normcase = os.path.normcase
383        self.assertEqual(normcase(expected_cwd),
384                         normcase(p.stdout.read().decode("utf-8")))
385
386    def test_cwd(self):
387        # Check that cwd changes the cwd for the child process.
388        temp_dir = tempfile.gettempdir()
389        temp_dir = self._normalize_cwd(temp_dir)
390        self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir)
391
392    def test_cwd_with_bytes(self):
393        temp_dir = tempfile.gettempdir()
394        temp_dir = self._normalize_cwd(temp_dir)
395        self._assert_cwd(temp_dir, sys.executable, cwd=os.fsencode(temp_dir))
396
397    def test_cwd_with_pathlike(self):
398        temp_dir = tempfile.gettempdir()
399        temp_dir = self._normalize_cwd(temp_dir)
400        self._assert_cwd(temp_dir, sys.executable, cwd=FakePath(temp_dir))
401
402    @unittest.skipIf(mswindows, "pending resolution of issue #15533")
403    def test_cwd_with_relative_arg(self):
404        # Check that Popen looks for args[0] relative to cwd if args[0]
405        # is relative.
406        python_dir, python_base = self._split_python_path()
407        rel_python = os.path.join(os.curdir, python_base)
408        with support.temp_cwd() as wrong_dir:
409            # Before calling with the correct cwd, confirm that the call fails
410            # without cwd and with the wrong cwd.
411            self.assertRaises(FileNotFoundError, subprocess.Popen,
412                              [rel_python])
413            self.assertRaises(FileNotFoundError, subprocess.Popen,
414                              [rel_python], cwd=wrong_dir)
415            python_dir = self._normalize_cwd(python_dir)
416            self._assert_cwd(python_dir, rel_python, cwd=python_dir)
417
418    @unittest.skipIf(mswindows, "pending resolution of issue #15533")
419    def test_cwd_with_relative_executable(self):
420        # Check that Popen looks for executable relative to cwd if executable
421        # is relative (and that executable takes precedence over args[0]).
422        python_dir, python_base = self._split_python_path()
423        rel_python = os.path.join(os.curdir, python_base)
424        doesntexist = "somethingyoudonthave"
425        with support.temp_cwd() as wrong_dir:
426            # Before calling with the correct cwd, confirm that the call fails
427            # without cwd and with the wrong cwd.
428            self.assertRaises(FileNotFoundError, subprocess.Popen,
429                              [doesntexist], executable=rel_python)
430            self.assertRaises(FileNotFoundError, subprocess.Popen,
431                              [doesntexist], executable=rel_python,
432                              cwd=wrong_dir)
433            python_dir = self._normalize_cwd(python_dir)
434            self._assert_cwd(python_dir, doesntexist, executable=rel_python,
435                             cwd=python_dir)
436
437    def test_cwd_with_absolute_arg(self):
438        # Check that Popen can find the executable when the cwd is wrong
439        # if args[0] is an absolute path.
440        python_dir, python_base = self._split_python_path()
441        abs_python = os.path.join(python_dir, python_base)
442        rel_python = os.path.join(os.curdir, python_base)
443        with support.temp_dir() as wrong_dir:
444            # Before calling with an absolute path, confirm that using a
445            # relative path fails.
446            self.assertRaises(FileNotFoundError, subprocess.Popen,
447                              [rel_python], cwd=wrong_dir)
448            wrong_dir = self._normalize_cwd(wrong_dir)
449            self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir)
450
451    @unittest.skipIf(sys.base_prefix != sys.prefix,
452                     'Test is not venv-compatible')
453    def test_executable_with_cwd(self):
454        python_dir, python_base = self._split_python_path()
455        python_dir = self._normalize_cwd(python_dir)
456        self._assert_cwd(python_dir, "somethingyoudonthave",
457                         executable=sys.executable, cwd=python_dir)
458
459    @unittest.skipIf(sys.base_prefix != sys.prefix,
460                     'Test is not venv-compatible')
461    @unittest.skipIf(sysconfig.is_python_build(),
462                     "need an installed Python. See #7774")
463    def test_executable_without_cwd(self):
464        # For a normal installation, it should work without 'cwd'
465        # argument.  For test runs in the build directory, see #7774.
466        self._assert_cwd(os.getcwd(), "somethingyoudonthave",
467                         executable=sys.executable)
468
469    def test_stdin_pipe(self):
470        # stdin redirection
471        p = subprocess.Popen([sys.executable, "-c",
472                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
473                        stdin=subprocess.PIPE)
474        p.stdin.write(b"pear")
475        p.stdin.close()
476        p.wait()
477        self.assertEqual(p.returncode, 1)
478
479    def test_stdin_filedes(self):
480        # stdin is set to open file descriptor
481        tf = tempfile.TemporaryFile()
482        self.addCleanup(tf.close)
483        d = tf.fileno()
484        os.write(d, b"pear")
485        os.lseek(d, 0, 0)
486        p = subprocess.Popen([sys.executable, "-c",
487                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
488                         stdin=d)
489        p.wait()
490        self.assertEqual(p.returncode, 1)
491
492    def test_stdin_fileobj(self):
493        # stdin is set to open file object
494        tf = tempfile.TemporaryFile()
495        self.addCleanup(tf.close)
496        tf.write(b"pear")
497        tf.seek(0)
498        p = subprocess.Popen([sys.executable, "-c",
499                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
500                         stdin=tf)
501        p.wait()
502        self.assertEqual(p.returncode, 1)
503
504    def test_stdout_pipe(self):
505        # stdout redirection
506        p = subprocess.Popen([sys.executable, "-c",
507                          'import sys; sys.stdout.write("orange")'],
508                         stdout=subprocess.PIPE)
509        with p:
510            self.assertEqual(p.stdout.read(), b"orange")
511
512    def test_stdout_filedes(self):
513        # stdout is set to open file descriptor
514        tf = tempfile.TemporaryFile()
515        self.addCleanup(tf.close)
516        d = tf.fileno()
517        p = subprocess.Popen([sys.executable, "-c",
518                          'import sys; sys.stdout.write("orange")'],
519                         stdout=d)
520        p.wait()
521        os.lseek(d, 0, 0)
522        self.assertEqual(os.read(d, 1024), b"orange")
523
524    def test_stdout_fileobj(self):
525        # stdout is set to open file object
526        tf = tempfile.TemporaryFile()
527        self.addCleanup(tf.close)
528        p = subprocess.Popen([sys.executable, "-c",
529                          'import sys; sys.stdout.write("orange")'],
530                         stdout=tf)
531        p.wait()
532        tf.seek(0)
533        self.assertEqual(tf.read(), b"orange")
534
535    def test_stderr_pipe(self):
536        # stderr redirection
537        p = subprocess.Popen([sys.executable, "-c",
538                          'import sys; sys.stderr.write("strawberry")'],
539                         stderr=subprocess.PIPE)
540        with p:
541            self.assertStderrEqual(p.stderr.read(), b"strawberry")
542
543    def test_stderr_filedes(self):
544        # stderr is set to open file descriptor
545        tf = tempfile.TemporaryFile()
546        self.addCleanup(tf.close)
547        d = tf.fileno()
548        p = subprocess.Popen([sys.executable, "-c",
549                          'import sys; sys.stderr.write("strawberry")'],
550                         stderr=d)
551        p.wait()
552        os.lseek(d, 0, 0)
553        self.assertStderrEqual(os.read(d, 1024), b"strawberry")
554
555    def test_stderr_fileobj(self):
556        # stderr is set to open file object
557        tf = tempfile.TemporaryFile()
558        self.addCleanup(tf.close)
559        p = subprocess.Popen([sys.executable, "-c",
560                          'import sys; sys.stderr.write("strawberry")'],
561                         stderr=tf)
562        p.wait()
563        tf.seek(0)
564        self.assertStderrEqual(tf.read(), b"strawberry")
565
566    def test_stderr_redirect_with_no_stdout_redirect(self):
567        # test stderr=STDOUT while stdout=None (not set)
568
569        # - grandchild prints to stderr
570        # - child redirects grandchild's stderr to its stdout
571        # - the parent should get grandchild's stderr in child's stdout
572        p = subprocess.Popen([sys.executable, "-c",
573                              'import sys, subprocess;'
574                              'rc = subprocess.call([sys.executable, "-c",'
575                              '    "import sys;"'
576                              '    "sys.stderr.write(\'42\')"],'
577                              '    stderr=subprocess.STDOUT);'
578                              'sys.exit(rc)'],
579                             stdout=subprocess.PIPE,
580                             stderr=subprocess.PIPE)
581        stdout, stderr = p.communicate()
582        #NOTE: stdout should get stderr from grandchild
583        self.assertStderrEqual(stdout, b'42')
584        self.assertStderrEqual(stderr, b'') # should be empty
585        self.assertEqual(p.returncode, 0)
586
587    def test_stdout_stderr_pipe(self):
588        # capture stdout and stderr to the same pipe
589        p = subprocess.Popen([sys.executable, "-c",
590                              'import sys;'
591                              'sys.stdout.write("apple");'
592                              'sys.stdout.flush();'
593                              'sys.stderr.write("orange")'],
594                             stdout=subprocess.PIPE,
595                             stderr=subprocess.STDOUT)
596        with p:
597            self.assertStderrEqual(p.stdout.read(), b"appleorange")
598
599    def test_stdout_stderr_file(self):
600        # capture stdout and stderr to the same open file
601        tf = tempfile.TemporaryFile()
602        self.addCleanup(tf.close)
603        p = subprocess.Popen([sys.executable, "-c",
604                              'import sys;'
605                              'sys.stdout.write("apple");'
606                              'sys.stdout.flush();'
607                              'sys.stderr.write("orange")'],
608                             stdout=tf,
609                             stderr=tf)
610        p.wait()
611        tf.seek(0)
612        self.assertStderrEqual(tf.read(), b"appleorange")
613
614    def test_stdout_filedes_of_stdout(self):
615        # stdout is set to 1 (#1531862).
616        # To avoid printing the text on stdout, we do something similar to
617        # test_stdout_none (see above).  The parent subprocess calls the child
618        # subprocess passing stdout=1, and this test uses stdout=PIPE in
619        # order to capture and check the output of the parent. See #11963.
620        code = ('import sys, subprocess; '
621                'rc = subprocess.call([sys.executable, "-c", '
622                '    "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
623                     'b\'test with stdout=1\'))"], stdout=1); '
624                'assert rc == 18')
625        p = subprocess.Popen([sys.executable, "-c", code],
626                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
627        self.addCleanup(p.stdout.close)
628        self.addCleanup(p.stderr.close)
629        out, err = p.communicate()
630        self.assertEqual(p.returncode, 0, err)
631        self.assertEqual(out.rstrip(), b'test with stdout=1')
632
633    def test_stdout_devnull(self):
634        p = subprocess.Popen([sys.executable, "-c",
635                              'for i in range(10240):'
636                              'print("x" * 1024)'],
637                              stdout=subprocess.DEVNULL)
638        p.wait()
639        self.assertEqual(p.stdout, None)
640
641    def test_stderr_devnull(self):
642        p = subprocess.Popen([sys.executable, "-c",
643                              'import sys\n'
644                              'for i in range(10240):'
645                              'sys.stderr.write("x" * 1024)'],
646                              stderr=subprocess.DEVNULL)
647        p.wait()
648        self.assertEqual(p.stderr, None)
649
650    def test_stdin_devnull(self):
651        p = subprocess.Popen([sys.executable, "-c",
652                              'import sys;'
653                              'sys.stdin.read(1)'],
654                              stdin=subprocess.DEVNULL)
655        p.wait()
656        self.assertEqual(p.stdin, None)
657
658    def test_env(self):
659        newenv = os.environ.copy()
660        newenv["FRUIT"] = "orange"
661        with subprocess.Popen([sys.executable, "-c",
662                               'import sys,os;'
663                               'sys.stdout.write(os.getenv("FRUIT"))'],
664                              stdout=subprocess.PIPE,
665                              env=newenv) as p:
666            stdout, stderr = p.communicate()
667            self.assertEqual(stdout, b"orange")
668
669    # Windows requires at least the SYSTEMROOT environment variable to start
670    # Python
671    @unittest.skipIf(sys.platform == 'win32',
672                     'cannot test an empty env on Windows')
673    @unittest.skipIf(sysconfig.get_config_var('Py_ENABLE_SHARED') == 1,
674                     'The Python shared library cannot be loaded '
675                     'with an empty environment.')
676    def test_empty_env(self):
677        """Verify that env={} is as empty as possible."""
678
679        def is_env_var_to_ignore(n):
680            """Determine if an environment variable is under our control."""
681            # This excludes some __CF_* and VERSIONER_* keys MacOS insists
682            # on adding even when the environment in exec is empty.
683            # Gentoo sandboxes also force LD_PRELOAD and SANDBOX_* to exist.
684            return ('VERSIONER' in n or '__CF' in n or  # MacOS
685                    n == 'LD_PRELOAD' or n.startswith('SANDBOX') or # Gentoo
686                    n == 'LC_CTYPE') # Locale coercion triggered
687
688        with subprocess.Popen([sys.executable, "-c",
689                               'import os; print(list(os.environ.keys()))'],
690                              stdout=subprocess.PIPE, env={}) as p:
691            stdout, stderr = p.communicate()
692            child_env_names = eval(stdout.strip())
693            self.assertIsInstance(child_env_names, list)
694            child_env_names = [k for k in child_env_names
695                               if not is_env_var_to_ignore(k)]
696            self.assertEqual(child_env_names, [])
697
698    def test_invalid_cmd(self):
699        # null character in the command name
700        cmd = sys.executable + '\0'
701        with self.assertRaises(ValueError):
702            subprocess.Popen([cmd, "-c", "pass"])
703
704        # null character in the command argument
705        with self.assertRaises(ValueError):
706            subprocess.Popen([sys.executable, "-c", "pass#\0"])
707
708    def test_invalid_env(self):
709        # null character in the environment variable name
710        newenv = os.environ.copy()
711        newenv["FRUIT\0VEGETABLE"] = "cabbage"
712        with self.assertRaises(ValueError):
713            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
714
715        # null character in the environment variable value
716        newenv = os.environ.copy()
717        newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
718        with self.assertRaises(ValueError):
719            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
720
721        # equal character in the environment variable name
722        newenv = os.environ.copy()
723        newenv["FRUIT=ORANGE"] = "lemon"
724        with self.assertRaises(ValueError):
725            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
726
727        # equal character in the environment variable value
728        newenv = os.environ.copy()
729        newenv["FRUIT"] = "orange=lemon"
730        with subprocess.Popen([sys.executable, "-c",
731                               'import sys, os;'
732                               'sys.stdout.write(os.getenv("FRUIT"))'],
733                              stdout=subprocess.PIPE,
734                              env=newenv) as p:
735            stdout, stderr = p.communicate()
736            self.assertEqual(stdout, b"orange=lemon")
737
738    def test_communicate_stdin(self):
739        p = subprocess.Popen([sys.executable, "-c",
740                              'import sys;'
741                              'sys.exit(sys.stdin.read() == "pear")'],
742                             stdin=subprocess.PIPE)
743        p.communicate(b"pear")
744        self.assertEqual(p.returncode, 1)
745
746    def test_communicate_stdout(self):
747        p = subprocess.Popen([sys.executable, "-c",
748                              'import sys; sys.stdout.write("pineapple")'],
749                             stdout=subprocess.PIPE)
750        (stdout, stderr) = p.communicate()
751        self.assertEqual(stdout, b"pineapple")
752        self.assertEqual(stderr, None)
753
754    def test_communicate_stderr(self):
755        p = subprocess.Popen([sys.executable, "-c",
756                              'import sys; sys.stderr.write("pineapple")'],
757                             stderr=subprocess.PIPE)
758        (stdout, stderr) = p.communicate()
759        self.assertEqual(stdout, None)
760        self.assertStderrEqual(stderr, b"pineapple")
761
762    def test_communicate(self):
763        p = subprocess.Popen([sys.executable, "-c",
764                              'import sys,os;'
765                              'sys.stderr.write("pineapple");'
766                              'sys.stdout.write(sys.stdin.read())'],
767                             stdin=subprocess.PIPE,
768                             stdout=subprocess.PIPE,
769                             stderr=subprocess.PIPE)
770        self.addCleanup(p.stdout.close)
771        self.addCleanup(p.stderr.close)
772        self.addCleanup(p.stdin.close)
773        (stdout, stderr) = p.communicate(b"banana")
774        self.assertEqual(stdout, b"banana")
775        self.assertStderrEqual(stderr, b"pineapple")
776
777    def test_communicate_timeout(self):
778        p = subprocess.Popen([sys.executable, "-c",
779                              'import sys,os,time;'
780                              'sys.stderr.write("pineapple\\n");'
781                              'time.sleep(1);'
782                              'sys.stderr.write("pear\\n");'
783                              'sys.stdout.write(sys.stdin.read())'],
784                             universal_newlines=True,
785                             stdin=subprocess.PIPE,
786                             stdout=subprocess.PIPE,
787                             stderr=subprocess.PIPE)
788        self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana",
789                          timeout=0.3)
790        # Make sure we can keep waiting for it, and that we get the whole output
791        # after it completes.
792        (stdout, stderr) = p.communicate()
793        self.assertEqual(stdout, "banana")
794        self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n")
795
796    def test_communicate_timeout_large_output(self):
797        # Test an expiring timeout while the child is outputting lots of data.
798        p = subprocess.Popen([sys.executable, "-c",
799                              'import sys,os,time;'
800                              'sys.stdout.write("a" * (64 * 1024));'
801                              'time.sleep(0.2);'
802                              'sys.stdout.write("a" * (64 * 1024));'
803                              'time.sleep(0.2);'
804                              'sys.stdout.write("a" * (64 * 1024));'
805                              'time.sleep(0.2);'
806                              'sys.stdout.write("a" * (64 * 1024));'],
807                             stdout=subprocess.PIPE)
808        self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4)
809        (stdout, _) = p.communicate()
810        self.assertEqual(len(stdout), 4 * 64 * 1024)
811
812    # Test for the fd leak reported in http://bugs.python.org/issue2791.
813    def test_communicate_pipe_fd_leak(self):
814        for stdin_pipe in (False, True):
815            for stdout_pipe in (False, True):
816                for stderr_pipe in (False, True):
817                    options = {}
818                    if stdin_pipe:
819                        options['stdin'] = subprocess.PIPE
820                    if stdout_pipe:
821                        options['stdout'] = subprocess.PIPE
822                    if stderr_pipe:
823                        options['stderr'] = subprocess.PIPE
824                    if not options:
825                        continue
826                    p = subprocess.Popen(ZERO_RETURN_CMD, **options)
827                    p.communicate()
828                    if p.stdin is not None:
829                        self.assertTrue(p.stdin.closed)
830                    if p.stdout is not None:
831                        self.assertTrue(p.stdout.closed)
832                    if p.stderr is not None:
833                        self.assertTrue(p.stderr.closed)
834
835    def test_communicate_returns(self):
836        # communicate() should return None if no redirection is active
837        p = subprocess.Popen([sys.executable, "-c",
838                              "import sys; sys.exit(47)"])
839        (stdout, stderr) = p.communicate()
840        self.assertEqual(stdout, None)
841        self.assertEqual(stderr, None)
842
843    def test_communicate_pipe_buf(self):
844        # communicate() with writes larger than pipe_buf
845        # This test will probably deadlock rather than fail, if
846        # communicate() does not work properly.
847        x, y = os.pipe()
848        os.close(x)
849        os.close(y)
850        p = subprocess.Popen([sys.executable, "-c",
851                              'import sys,os;'
852                              'sys.stdout.write(sys.stdin.read(47));'
853                              'sys.stderr.write("x" * %d);'
854                              'sys.stdout.write(sys.stdin.read())' %
855                              support.PIPE_MAX_SIZE],
856                             stdin=subprocess.PIPE,
857                             stdout=subprocess.PIPE,
858                             stderr=subprocess.PIPE)
859        self.addCleanup(p.stdout.close)
860        self.addCleanup(p.stderr.close)
861        self.addCleanup(p.stdin.close)
862        string_to_write = b"a" * support.PIPE_MAX_SIZE
863        (stdout, stderr) = p.communicate(string_to_write)
864        self.assertEqual(stdout, string_to_write)
865
866    def test_writes_before_communicate(self):
867        # stdin.write before communicate()
868        p = subprocess.Popen([sys.executable, "-c",
869                              'import sys,os;'
870                              'sys.stdout.write(sys.stdin.read())'],
871                             stdin=subprocess.PIPE,
872                             stdout=subprocess.PIPE,
873                             stderr=subprocess.PIPE)
874        self.addCleanup(p.stdout.close)
875        self.addCleanup(p.stderr.close)
876        self.addCleanup(p.stdin.close)
877        p.stdin.write(b"banana")
878        (stdout, stderr) = p.communicate(b"split")
879        self.assertEqual(stdout, b"bananasplit")
880        self.assertStderrEqual(stderr, b"")
881
882    def test_universal_newlines_and_text(self):
883        args = [
884            sys.executable, "-c",
885            'import sys,os;' + SETBINARY +
886            'buf = sys.stdout.buffer;'
887            'buf.write(sys.stdin.readline().encode());'
888            'buf.flush();'
889            'buf.write(b"line2\\n");'
890            'buf.flush();'
891            'buf.write(sys.stdin.read().encode());'
892            'buf.flush();'
893            'buf.write(b"line4\\n");'
894            'buf.flush();'
895            'buf.write(b"line5\\r\\n");'
896            'buf.flush();'
897            'buf.write(b"line6\\r");'
898            'buf.flush();'
899            'buf.write(b"\\nline7");'
900            'buf.flush();'
901            'buf.write(b"\\nline8");']
902
903        for extra_kwarg in ('universal_newlines', 'text'):
904            p = subprocess.Popen(args, **{'stdin': subprocess.PIPE,
905                                          'stdout': subprocess.PIPE,
906                                          extra_kwarg: True})
907            with p:
908                p.stdin.write("line1\n")
909                p.stdin.flush()
910                self.assertEqual(p.stdout.readline(), "line1\n")
911                p.stdin.write("line3\n")
912                p.stdin.close()
913                self.addCleanup(p.stdout.close)
914                self.assertEqual(p.stdout.readline(),
915                                 "line2\n")
916                self.assertEqual(p.stdout.read(6),
917                                 "line3\n")
918                self.assertEqual(p.stdout.read(),
919                                 "line4\nline5\nline6\nline7\nline8")
920
921    def test_universal_newlines_communicate(self):
922        # universal newlines through communicate()
923        p = subprocess.Popen([sys.executable, "-c",
924                              'import sys,os;' + SETBINARY +
925                              'buf = sys.stdout.buffer;'
926                              'buf.write(b"line2\\n");'
927                              'buf.flush();'
928                              'buf.write(b"line4\\n");'
929                              'buf.flush();'
930                              'buf.write(b"line5\\r\\n");'
931                              'buf.flush();'
932                              'buf.write(b"line6\\r");'
933                              'buf.flush();'
934                              'buf.write(b"\\nline7");'
935                              'buf.flush();'
936                              'buf.write(b"\\nline8");'],
937                             stderr=subprocess.PIPE,
938                             stdout=subprocess.PIPE,
939                             universal_newlines=1)
940        self.addCleanup(p.stdout.close)
941        self.addCleanup(p.stderr.close)
942        (stdout, stderr) = p.communicate()
943        self.assertEqual(stdout,
944                         "line2\nline4\nline5\nline6\nline7\nline8")
945
946    def test_universal_newlines_communicate_stdin(self):
947        # universal newlines through communicate(), with only stdin
948        p = subprocess.Popen([sys.executable, "-c",
949                              'import sys,os;' + SETBINARY + textwrap.dedent('''
950                               s = sys.stdin.readline()
951                               assert s == "line1\\n", repr(s)
952                               s = sys.stdin.read()
953                               assert s == "line3\\n", repr(s)
954                              ''')],
955                             stdin=subprocess.PIPE,
956                             universal_newlines=1)
957        (stdout, stderr) = p.communicate("line1\nline3\n")
958        self.assertEqual(p.returncode, 0)
959
960    def test_universal_newlines_communicate_input_none(self):
961        # Test communicate(input=None) with universal newlines.
962        #
963        # We set stdout to PIPE because, as of this writing, a different
964        # code path is tested when the number of pipes is zero or one.
965        p = subprocess.Popen(ZERO_RETURN_CMD,
966                             stdin=subprocess.PIPE,
967                             stdout=subprocess.PIPE,
968                             universal_newlines=True)
969        p.communicate()
970        self.assertEqual(p.returncode, 0)
971
972    def test_universal_newlines_communicate_stdin_stdout_stderr(self):
973        # universal newlines through communicate(), with stdin, stdout, stderr
974        p = subprocess.Popen([sys.executable, "-c",
975                              'import sys,os;' + SETBINARY + textwrap.dedent('''
976                               s = sys.stdin.buffer.readline()
977                               sys.stdout.buffer.write(s)
978                               sys.stdout.buffer.write(b"line2\\r")
979                               sys.stderr.buffer.write(b"eline2\\n")
980                               s = sys.stdin.buffer.read()
981                               sys.stdout.buffer.write(s)
982                               sys.stdout.buffer.write(b"line4\\n")
983                               sys.stdout.buffer.write(b"line5\\r\\n")
984                               sys.stderr.buffer.write(b"eline6\\r")
985                               sys.stderr.buffer.write(b"eline7\\r\\nz")
986                              ''')],
987                             stdin=subprocess.PIPE,
988                             stderr=subprocess.PIPE,
989                             stdout=subprocess.PIPE,
990                             universal_newlines=True)
991        self.addCleanup(p.stdout.close)
992        self.addCleanup(p.stderr.close)
993        (stdout, stderr) = p.communicate("line1\nline3\n")
994        self.assertEqual(p.returncode, 0)
995        self.assertEqual("line1\nline2\nline3\nline4\nline5\n", stdout)
996        # Python debug build push something like "[42442 refs]\n"
997        # to stderr at exit of subprocess.
998        # Don't use assertStderrEqual because it strips CR and LF from output.
999        self.assertTrue(stderr.startswith("eline2\neline6\neline7\n"))
1000
1001    def test_universal_newlines_communicate_encodings(self):
1002        # Check that universal newlines mode works for various encodings,
1003        # in particular for encodings in the UTF-16 and UTF-32 families.
1004        # See issue #15595.
1005        #
1006        # UTF-16 and UTF-32-BE are sufficient to check both with BOM and
1007        # without, and UTF-16 and UTF-32.
1008        for encoding in ['utf-16', 'utf-32-be']:
1009            code = ("import sys; "
1010                    r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" %
1011                    encoding)
1012            args = [sys.executable, '-c', code]
1013            # We set stdin to be non-None because, as of this writing,
1014            # a different code path is used when the number of pipes is
1015            # zero or one.
1016            popen = subprocess.Popen(args,
1017                                     stdin=subprocess.PIPE,
1018                                     stdout=subprocess.PIPE,
1019                                     encoding=encoding)
1020            stdout, stderr = popen.communicate(input='')
1021            self.assertEqual(stdout, '1\n2\n3\n4')
1022
1023    def test_communicate_errors(self):
1024        for errors, expected in [
1025            ('ignore', ''),
1026            ('replace', '\ufffd\ufffd'),
1027            ('surrogateescape', '\udc80\udc80'),
1028            ('backslashreplace', '\\x80\\x80'),
1029        ]:
1030            code = ("import sys; "
1031                    r"sys.stdout.buffer.write(b'[\x80\x80]')")
1032            args = [sys.executable, '-c', code]
1033            # We set stdin to be non-None because, as of this writing,
1034            # a different code path is used when the number of pipes is
1035            # zero or one.
1036            popen = subprocess.Popen(args,
1037                                     stdin=subprocess.PIPE,
1038                                     stdout=subprocess.PIPE,
1039                                     encoding='utf-8',
1040                                     errors=errors)
1041            stdout, stderr = popen.communicate(input='')
1042            self.assertEqual(stdout, '[{}]'.format(expected))
1043
1044    def test_no_leaking(self):
1045        # Make sure we leak no resources
1046        if not mswindows:
1047            max_handles = 1026 # too much for most UNIX systems
1048        else:
1049            max_handles = 2050 # too much for (at least some) Windows setups
1050        handles = []
1051        tmpdir = tempfile.mkdtemp()
1052        try:
1053            for i in range(max_handles):
1054                try:
1055                    tmpfile = os.path.join(tmpdir, support.TESTFN)
1056                    handles.append(os.open(tmpfile, os.O_WRONLY|os.O_CREAT))
1057                except OSError as e:
1058                    if e.errno != errno.EMFILE:
1059                        raise
1060                    break
1061            else:
1062                self.skipTest("failed to reach the file descriptor limit "
1063                    "(tried %d)" % max_handles)
1064            # Close a couple of them (should be enough for a subprocess)
1065            for i in range(10):
1066                os.close(handles.pop())
1067            # Loop creating some subprocesses. If one of them leaks some fds,
1068            # the next loop iteration will fail by reaching the max fd limit.
1069            for i in range(15):
1070                p = subprocess.Popen([sys.executable, "-c",
1071                                      "import sys;"
1072                                      "sys.stdout.write(sys.stdin.read())"],
1073                                     stdin=subprocess.PIPE,
1074                                     stdout=subprocess.PIPE,
1075                                     stderr=subprocess.PIPE)
1076                data = p.communicate(b"lime")[0]
1077                self.assertEqual(data, b"lime")
1078        finally:
1079            for h in handles:
1080                os.close(h)
1081            shutil.rmtree(tmpdir)
1082
1083    def test_list2cmdline(self):
1084        self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
1085                         '"a b c" d e')
1086        self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
1087                         'ab\\"c \\ d')
1088        self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
1089                         'ab\\"c " \\\\" d')
1090        self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
1091                         'a\\\\\\b "de fg" h')
1092        self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
1093                         'a\\\\\\"b c d')
1094        self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
1095                         '"a\\\\b c" d e')
1096        self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
1097                         '"a\\\\b\\ c" d e')
1098        self.assertEqual(subprocess.list2cmdline(['ab', '']),
1099                         'ab ""')
1100
1101    def test_poll(self):
1102        p = subprocess.Popen([sys.executable, "-c",
1103                              "import os; os.read(0, 1)"],
1104                             stdin=subprocess.PIPE)
1105        self.addCleanup(p.stdin.close)
1106        self.assertIsNone(p.poll())
1107        os.write(p.stdin.fileno(), b'A')
1108        p.wait()
1109        # Subsequent invocations should just return the returncode
1110        self.assertEqual(p.poll(), 0)
1111
1112    def test_wait(self):
1113        p = subprocess.Popen(ZERO_RETURN_CMD)
1114        self.assertEqual(p.wait(), 0)
1115        # Subsequent invocations should just return the returncode
1116        self.assertEqual(p.wait(), 0)
1117
1118    def test_wait_timeout(self):
1119        p = subprocess.Popen([sys.executable,
1120                              "-c", "import time; time.sleep(0.3)"])
1121        with self.assertRaises(subprocess.TimeoutExpired) as c:
1122            p.wait(timeout=0.0001)
1123        self.assertIn("0.0001", str(c.exception))  # For coverage of __str__.
1124        # Some heavily loaded buildbots (sparc Debian 3.x) require this much
1125        # time to start.
1126        self.assertEqual(p.wait(timeout=3), 0)
1127
1128    def test_invalid_bufsize(self):
1129        # an invalid type of the bufsize argument should raise
1130        # TypeError.
1131        with self.assertRaises(TypeError):
1132            subprocess.Popen(ZERO_RETURN_CMD, "orange")
1133
1134    def test_bufsize_is_none(self):
1135        # bufsize=None should be the same as bufsize=0.
1136        p = subprocess.Popen(ZERO_RETURN_CMD, None)
1137        self.assertEqual(p.wait(), 0)
1138        # Again with keyword arg
1139        p = subprocess.Popen(ZERO_RETURN_CMD, bufsize=None)
1140        self.assertEqual(p.wait(), 0)
1141
1142    def _test_bufsize_equal_one(self, line, expected, universal_newlines):
1143        # subprocess may deadlock with bufsize=1, see issue #21332
1144        with subprocess.Popen([sys.executable, "-c", "import sys;"
1145                               "sys.stdout.write(sys.stdin.readline());"
1146                               "sys.stdout.flush()"],
1147                              stdin=subprocess.PIPE,
1148                              stdout=subprocess.PIPE,
1149                              stderr=subprocess.DEVNULL,
1150                              bufsize=1,
1151                              universal_newlines=universal_newlines) as p:
1152            p.stdin.write(line) # expect that it flushes the line in text mode
1153            os.close(p.stdin.fileno()) # close it without flushing the buffer
1154            read_line = p.stdout.readline()
1155            with support.SuppressCrashReport():
1156                try:
1157                    p.stdin.close()
1158                except OSError:
1159                    pass
1160            p.stdin = None
1161        self.assertEqual(p.returncode, 0)
1162        self.assertEqual(read_line, expected)
1163
1164    def test_bufsize_equal_one_text_mode(self):
1165        # line is flushed in text mode with bufsize=1.
1166        # we should get the full line in return
1167        line = "line\n"
1168        self._test_bufsize_equal_one(line, line, universal_newlines=True)
1169
1170    def test_bufsize_equal_one_binary_mode(self):
1171        # line is not flushed in binary mode with bufsize=1.
1172        # we should get empty response
1173        line = b'line' + os.linesep.encode() # assume ascii-based locale
1174        with self.assertWarnsRegex(RuntimeWarning, 'line buffering'):
1175            self._test_bufsize_equal_one(line, b'', universal_newlines=False)
1176
1177    def test_leaking_fds_on_error(self):
1178        # see bug #5179: Popen leaks file descriptors to PIPEs if
1179        # the child fails to execute; this will eventually exhaust
1180        # the maximum number of open fds. 1024 seems a very common
1181        # value for that limit, but Windows has 2048, so we loop
1182        # 1024 times (each call leaked two fds).
1183        for i in range(1024):
1184            with self.assertRaises(NONEXISTING_ERRORS):
1185                subprocess.Popen(NONEXISTING_CMD,
1186                                 stdout=subprocess.PIPE,
1187                                 stderr=subprocess.PIPE)
1188
1189    def test_nonexisting_with_pipes(self):
1190        # bpo-30121: Popen with pipes must close properly pipes on error.
1191        # Previously, os.close() was called with a Windows handle which is not
1192        # a valid file descriptor.
1193        #
1194        # Run the test in a subprocess to control how the CRT reports errors
1195        # and to get stderr content.
1196        try:
1197            import msvcrt
1198            msvcrt.CrtSetReportMode
1199        except (AttributeError, ImportError):
1200            self.skipTest("need msvcrt.CrtSetReportMode")
1201
1202        code = textwrap.dedent(f"""
1203            import msvcrt
1204            import subprocess
1205
1206            cmd = {NONEXISTING_CMD!r}
1207
1208            for report_type in [msvcrt.CRT_WARN,
1209                                msvcrt.CRT_ERROR,
1210                                msvcrt.CRT_ASSERT]:
1211                msvcrt.CrtSetReportMode(report_type, msvcrt.CRTDBG_MODE_FILE)
1212                msvcrt.CrtSetReportFile(report_type, msvcrt.CRTDBG_FILE_STDERR)
1213
1214            try:
1215                subprocess.Popen(cmd,
1216                                 stdout=subprocess.PIPE,
1217                                 stderr=subprocess.PIPE)
1218            except OSError:
1219                pass
1220        """)
1221        cmd = [sys.executable, "-c", code]
1222        proc = subprocess.Popen(cmd,
1223                                stderr=subprocess.PIPE,
1224                                universal_newlines=True)
1225        with proc:
1226            stderr = proc.communicate()[1]
1227        self.assertEqual(stderr, "")
1228        self.assertEqual(proc.returncode, 0)
1229
1230    def test_double_close_on_error(self):
1231        # Issue #18851
1232        fds = []
1233        def open_fds():
1234            for i in range(20):
1235                fds.extend(os.pipe())
1236                time.sleep(0.001)
1237        t = threading.Thread(target=open_fds)
1238        t.start()
1239        try:
1240            with self.assertRaises(EnvironmentError):
1241                subprocess.Popen(NONEXISTING_CMD,
1242                                 stdin=subprocess.PIPE,
1243                                 stdout=subprocess.PIPE,
1244                                 stderr=subprocess.PIPE)
1245        finally:
1246            t.join()
1247            exc = None
1248            for fd in fds:
1249                # If a double close occurred, some of those fds will
1250                # already have been closed by mistake, and os.close()
1251                # here will raise.
1252                try:
1253                    os.close(fd)
1254                except OSError as e:
1255                    exc = e
1256            if exc is not None:
1257                raise exc
1258
1259    def test_threadsafe_wait(self):
1260        """Issue21291: Popen.wait() needs to be threadsafe for returncode."""
1261        proc = subprocess.Popen([sys.executable, '-c',
1262                                 'import time; time.sleep(12)'])
1263        self.assertEqual(proc.returncode, None)
1264        results = []
1265
1266        def kill_proc_timer_thread():
1267            results.append(('thread-start-poll-result', proc.poll()))
1268            # terminate it from the thread and wait for the result.
1269            proc.kill()
1270            proc.wait()
1271            results.append(('thread-after-kill-and-wait', proc.returncode))
1272            # this wait should be a no-op given the above.
1273            proc.wait()
1274            results.append(('thread-after-second-wait', proc.returncode))
1275
1276        # This is a timing sensitive test, the failure mode is
1277        # triggered when both the main thread and this thread are in
1278        # the wait() call at once.  The delay here is to allow the
1279        # main thread to most likely be blocked in its wait() call.
1280        t = threading.Timer(0.2, kill_proc_timer_thread)
1281        t.start()
1282
1283        if mswindows:
1284            expected_errorcode = 1
1285        else:
1286            # Should be -9 because of the proc.kill() from the thread.
1287            expected_errorcode = -9
1288
1289        # Wait for the process to finish; the thread should kill it
1290        # long before it finishes on its own.  Supplying a timeout
1291        # triggers a different code path for better coverage.
1292        proc.wait(timeout=20)
1293        self.assertEqual(proc.returncode, expected_errorcode,
1294                         msg="unexpected result in wait from main thread")
1295
1296        # This should be a no-op with no change in returncode.
1297        proc.wait()
1298        self.assertEqual(proc.returncode, expected_errorcode,
1299                         msg="unexpected result in second main wait.")
1300
1301        t.join()
1302        # Ensure that all of the thread results are as expected.
1303        # When a race condition occurs in wait(), the returncode could
1304        # be set by the wrong thread that doesn't actually have it
1305        # leading to an incorrect value.
1306        self.assertEqual([('thread-start-poll-result', None),
1307                          ('thread-after-kill-and-wait', expected_errorcode),
1308                          ('thread-after-second-wait', expected_errorcode)],
1309                         results)
1310
1311    def test_issue8780(self):
1312        # Ensure that stdout is inherited from the parent
1313        # if stdout=PIPE is not used
1314        code = ';'.join((
1315            'import subprocess, sys',
1316            'retcode = subprocess.call('
1317                "[sys.executable, '-c', 'print(\"Hello World!\")'])",
1318            'assert retcode == 0'))
1319        output = subprocess.check_output([sys.executable, '-c', code])
1320        self.assertTrue(output.startswith(b'Hello World!'), ascii(output))
1321
1322    def test_handles_closed_on_exception(self):
1323        # If CreateProcess exits with an error, ensure the
1324        # duplicate output handles are released
1325        ifhandle, ifname = tempfile.mkstemp()
1326        ofhandle, ofname = tempfile.mkstemp()
1327        efhandle, efname = tempfile.mkstemp()
1328        try:
1329            subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,
1330              stderr=efhandle)
1331        except OSError:
1332            os.close(ifhandle)
1333            os.remove(ifname)
1334            os.close(ofhandle)
1335            os.remove(ofname)
1336            os.close(efhandle)
1337            os.remove(efname)
1338        self.assertFalse(os.path.exists(ifname))
1339        self.assertFalse(os.path.exists(ofname))
1340        self.assertFalse(os.path.exists(efname))
1341
1342    def test_communicate_epipe(self):
1343        # Issue 10963: communicate() should hide EPIPE
1344        p = subprocess.Popen(ZERO_RETURN_CMD,
1345                             stdin=subprocess.PIPE,
1346                             stdout=subprocess.PIPE,
1347                             stderr=subprocess.PIPE)
1348        self.addCleanup(p.stdout.close)
1349        self.addCleanup(p.stderr.close)
1350        self.addCleanup(p.stdin.close)
1351        p.communicate(b"x" * 2**20)
1352
1353    def test_communicate_epipe_only_stdin(self):
1354        # Issue 10963: communicate() should hide EPIPE
1355        p = subprocess.Popen(ZERO_RETURN_CMD,
1356                             stdin=subprocess.PIPE)
1357        self.addCleanup(p.stdin.close)
1358        p.wait()
1359        p.communicate(b"x" * 2**20)
1360
1361    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'),
1362                         "Requires signal.SIGUSR1")
1363    @unittest.skipUnless(hasattr(os, 'kill'),
1364                         "Requires os.kill")
1365    @unittest.skipUnless(hasattr(os, 'getppid'),
1366                         "Requires os.getppid")
1367    def test_communicate_eintr(self):
1368        # Issue #12493: communicate() should handle EINTR
1369        def handler(signum, frame):
1370            pass
1371        old_handler = signal.signal(signal.SIGUSR1, handler)
1372        self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
1373
1374        args = [sys.executable, "-c",
1375                'import os, signal;'
1376                'os.kill(os.getppid(), signal.SIGUSR1)']
1377        for stream in ('stdout', 'stderr'):
1378            kw = {stream: subprocess.PIPE}
1379            with subprocess.Popen(args, **kw) as process:
1380                # communicate() will be interrupted by SIGUSR1
1381                process.communicate()
1382
1383
1384    # This test is Linux-ish specific for simplicity to at least have
1385    # some coverage.  It is not a platform specific bug.
1386    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
1387                         "Linux specific")
1388    def test_failed_child_execute_fd_leak(self):
1389        """Test for the fork() failure fd leak reported in issue16327."""
1390        fd_directory = '/proc/%d/fd' % os.getpid()
1391        fds_before_popen = os.listdir(fd_directory)
1392        with self.assertRaises(PopenTestException):
1393            PopenExecuteChildRaises(
1394                    ZERO_RETURN_CMD, stdin=subprocess.PIPE,
1395                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1396
1397        # NOTE: This test doesn't verify that the real _execute_child
1398        # does not close the file descriptors itself on the way out
1399        # during an exception.  Code inspection has confirmed that.
1400
1401        fds_after_exception = os.listdir(fd_directory)
1402        self.assertEqual(fds_before_popen, fds_after_exception)
1403
1404    @unittest.skipIf(mswindows, "behavior currently not supported on Windows")
1405    def test_file_not_found_includes_filename(self):
1406        with self.assertRaises(FileNotFoundError) as c:
1407            subprocess.call(['/opt/nonexistent_binary', 'with', 'some', 'args'])
1408        self.assertEqual(c.exception.filename, '/opt/nonexistent_binary')
1409
1410    @unittest.skipIf(mswindows, "behavior currently not supported on Windows")
1411    def test_file_not_found_with_bad_cwd(self):
1412        with self.assertRaises(FileNotFoundError) as c:
1413            subprocess.Popen(['exit', '0'], cwd='/some/nonexistent/directory')
1414        self.assertEqual(c.exception.filename, '/some/nonexistent/directory')
1415
1416
1417class RunFuncTestCase(BaseTestCase):
1418    def run_python(self, code, **kwargs):
1419        """Run Python code in a subprocess using subprocess.run"""
1420        argv = [sys.executable, "-c", code]
1421        return subprocess.run(argv, **kwargs)
1422
1423    def test_returncode(self):
1424        # call() function with sequence argument
1425        cp = self.run_python("import sys; sys.exit(47)")
1426        self.assertEqual(cp.returncode, 47)
1427        with self.assertRaises(subprocess.CalledProcessError):
1428            cp.check_returncode()
1429
1430    def test_check(self):
1431        with self.assertRaises(subprocess.CalledProcessError) as c:
1432            self.run_python("import sys; sys.exit(47)", check=True)
1433        self.assertEqual(c.exception.returncode, 47)
1434
1435    def test_check_zero(self):
1436        # check_returncode shouldn't raise when returncode is zero
1437        cp = subprocess.run(ZERO_RETURN_CMD, check=True)
1438        self.assertEqual(cp.returncode, 0)
1439
1440    def test_timeout(self):
1441        # run() function with timeout argument; we want to test that the child
1442        # process gets killed when the timeout expires.  If the child isn't
1443        # killed, this call will deadlock since subprocess.run waits for the
1444        # child.
1445        with self.assertRaises(subprocess.TimeoutExpired):
1446            self.run_python("while True: pass", timeout=0.0001)
1447
1448    def test_capture_stdout(self):
1449        # capture stdout with zero return code
1450        cp = self.run_python("print('BDFL')", stdout=subprocess.PIPE)
1451        self.assertIn(b'BDFL', cp.stdout)
1452
1453    def test_capture_stderr(self):
1454        cp = self.run_python("import sys; sys.stderr.write('BDFL')",
1455                             stderr=subprocess.PIPE)
1456        self.assertIn(b'BDFL', cp.stderr)
1457
1458    def test_check_output_stdin_arg(self):
1459        # run() can be called with stdin set to a file
1460        tf = tempfile.TemporaryFile()
1461        self.addCleanup(tf.close)
1462        tf.write(b'pear')
1463        tf.seek(0)
1464        cp = self.run_python(
1465                 "import sys; sys.stdout.write(sys.stdin.read().upper())",
1466                stdin=tf, stdout=subprocess.PIPE)
1467        self.assertIn(b'PEAR', cp.stdout)
1468
1469    def test_check_output_input_arg(self):
1470        # check_output() can be called with input set to a string
1471        cp = self.run_python(
1472                "import sys; sys.stdout.write(sys.stdin.read().upper())",
1473                input=b'pear', stdout=subprocess.PIPE)
1474        self.assertIn(b'PEAR', cp.stdout)
1475
1476    def test_check_output_stdin_with_input_arg(self):
1477        # run() refuses to accept 'stdin' with 'input'
1478        tf = tempfile.TemporaryFile()
1479        self.addCleanup(tf.close)
1480        tf.write(b'pear')
1481        tf.seek(0)
1482        with self.assertRaises(ValueError,
1483              msg="Expected ValueError when stdin and input args supplied.") as c:
1484            output = self.run_python("print('will not be run')",
1485                                     stdin=tf, input=b'hare')
1486        self.assertIn('stdin', c.exception.args[0])
1487        self.assertIn('input', c.exception.args[0])
1488
1489    def test_check_output_timeout(self):
1490        with self.assertRaises(subprocess.TimeoutExpired) as c:
1491            cp = self.run_python((
1492                     "import sys, time\n"
1493                     "sys.stdout.write('BDFL')\n"
1494                     "sys.stdout.flush()\n"
1495                     "time.sleep(3600)"),
1496                    # Some heavily loaded buildbots (sparc Debian 3.x) require
1497                    # this much time to start and print.
1498                    timeout=3, stdout=subprocess.PIPE)
1499        self.assertEqual(c.exception.output, b'BDFL')
1500        # output is aliased to stdout
1501        self.assertEqual(c.exception.stdout, b'BDFL')
1502
1503    def test_run_kwargs(self):
1504        newenv = os.environ.copy()
1505        newenv["FRUIT"] = "banana"
1506        cp = self.run_python(('import sys, os;'
1507                      'sys.exit(33 if os.getenv("FRUIT")=="banana" else 31)'),
1508                             env=newenv)
1509        self.assertEqual(cp.returncode, 33)
1510
1511    def test_run_with_pathlike_path(self):
1512        # bpo-31961: test run(pathlike_object)
1513        # the name of a command that can be run without
1514        # any argumenets that exit fast
1515        prog = 'tree.com' if mswindows else 'ls'
1516        path = shutil.which(prog)
1517        if path is None:
1518            self.skipTest(f'{prog} required for this test')
1519        path = FakePath(path)
1520        res = subprocess.run(path, stdout=subprocess.DEVNULL)
1521        self.assertEqual(res.returncode, 0)
1522        with self.assertRaises(TypeError):
1523            subprocess.run(path, stdout=subprocess.DEVNULL, shell=True)
1524
1525    def test_run_with_bytes_path_and_arguments(self):
1526        # bpo-31961: test run([bytes_object, b'additional arguments'])
1527        path = os.fsencode(sys.executable)
1528        args = [path, '-c', b'import sys; sys.exit(57)']
1529        res = subprocess.run(args)
1530        self.assertEqual(res.returncode, 57)
1531
1532    def test_run_with_pathlike_path_and_arguments(self):
1533        # bpo-31961: test run([pathlike_object, 'additional arguments'])
1534        path = FakePath(sys.executable)
1535        args = [path, '-c', 'import sys; sys.exit(57)']
1536        res = subprocess.run(args)
1537        self.assertEqual(res.returncode, 57)
1538
1539    def test_capture_output(self):
1540        cp = self.run_python(("import sys;"
1541                              "sys.stdout.write('BDFL'); "
1542                              "sys.stderr.write('FLUFL')"),
1543                             capture_output=True)
1544        self.assertIn(b'BDFL', cp.stdout)
1545        self.assertIn(b'FLUFL', cp.stderr)
1546
1547    def test_stdout_with_capture_output_arg(self):
1548        # run() refuses to accept 'stdout' with 'capture_output'
1549        tf = tempfile.TemporaryFile()
1550        self.addCleanup(tf.close)
1551        with self.assertRaises(ValueError,
1552            msg=("Expected ValueError when stdout and capture_output "
1553                 "args supplied.")) as c:
1554            output = self.run_python("print('will not be run')",
1555                                      capture_output=True, stdout=tf)
1556        self.assertIn('stdout', c.exception.args[0])
1557        self.assertIn('capture_output', c.exception.args[0])
1558
1559    def test_stderr_with_capture_output_arg(self):
1560        # run() refuses to accept 'stderr' with 'capture_output'
1561        tf = tempfile.TemporaryFile()
1562        self.addCleanup(tf.close)
1563        with self.assertRaises(ValueError,
1564            msg=("Expected ValueError when stderr and capture_output "
1565                 "args supplied.")) as c:
1566            output = self.run_python("print('will not be run')",
1567                                      capture_output=True, stderr=tf)
1568        self.assertIn('stderr', c.exception.args[0])
1569        self.assertIn('capture_output', c.exception.args[0])
1570
1571    # This test _might_ wind up a bit fragile on loaded build+test machines
1572    # as it depends on the timing with wide enough margins for normal situations
1573    # but does assert that it happened "soon enough" to believe the right thing
1574    # happened.
1575    @unittest.skipIf(mswindows, "requires posix like 'sleep' shell command")
1576    def test_run_with_shell_timeout_and_capture_output(self):
1577        """Output capturing after a timeout mustn't hang forever on open filehandles."""
1578        before_secs = time.monotonic()
1579        try:
1580            subprocess.run('sleep 3', shell=True, timeout=0.1,
1581                           capture_output=True)  # New session unspecified.
1582        except subprocess.TimeoutExpired as exc:
1583            after_secs = time.monotonic()
1584            stacks = traceback.format_exc()  # assertRaises doesn't give this.
1585        else:
1586            self.fail("TimeoutExpired not raised.")
1587        self.assertLess(after_secs - before_secs, 1.5,
1588                        msg="TimeoutExpired was delayed! Bad traceback:\n```\n"
1589                        f"{stacks}```")
1590
1591
1592@unittest.skipIf(mswindows, "POSIX specific tests")
1593class POSIXProcessTestCase(BaseTestCase):
1594
1595    def setUp(self):
1596        super().setUp()
1597        self._nonexistent_dir = "/_this/pa.th/does/not/exist"
1598
1599    def _get_chdir_exception(self):
1600        try:
1601            os.chdir(self._nonexistent_dir)
1602        except OSError as e:
1603            # This avoids hard coding the errno value or the OS perror()
1604            # string and instead capture the exception that we want to see
1605            # below for comparison.
1606            desired_exception = e
1607        else:
1608            self.fail("chdir to nonexistent directory %s succeeded." %
1609                      self._nonexistent_dir)
1610        return desired_exception
1611
1612    def test_exception_cwd(self):
1613        """Test error in the child raised in the parent for a bad cwd."""
1614        desired_exception = self._get_chdir_exception()
1615        try:
1616            p = subprocess.Popen([sys.executable, "-c", ""],
1617                                 cwd=self._nonexistent_dir)
1618        except OSError as e:
1619            # Test that the child process chdir failure actually makes
1620            # it up to the parent process as the correct exception.
1621            self.assertEqual(desired_exception.errno, e.errno)
1622            self.assertEqual(desired_exception.strerror, e.strerror)
1623            self.assertEqual(desired_exception.filename, e.filename)
1624        else:
1625            self.fail("Expected OSError: %s" % desired_exception)
1626
1627    def test_exception_bad_executable(self):
1628        """Test error in the child raised in the parent for a bad executable."""
1629        desired_exception = self._get_chdir_exception()
1630        try:
1631            p = subprocess.Popen([sys.executable, "-c", ""],
1632                                 executable=self._nonexistent_dir)
1633        except OSError as e:
1634            # Test that the child process exec failure actually makes
1635            # it up to the parent process as the correct exception.
1636            self.assertEqual(desired_exception.errno, e.errno)
1637            self.assertEqual(desired_exception.strerror, e.strerror)
1638            self.assertEqual(desired_exception.filename, e.filename)
1639        else:
1640            self.fail("Expected OSError: %s" % desired_exception)
1641
1642    def test_exception_bad_args_0(self):
1643        """Test error in the child raised in the parent for a bad args[0]."""
1644        desired_exception = self._get_chdir_exception()
1645        try:
1646            p = subprocess.Popen([self._nonexistent_dir, "-c", ""])
1647        except OSError as e:
1648            # Test that the child process exec failure actually makes
1649            # it up to the parent process as the correct exception.
1650            self.assertEqual(desired_exception.errno, e.errno)
1651            self.assertEqual(desired_exception.strerror, e.strerror)
1652            self.assertEqual(desired_exception.filename, e.filename)
1653        else:
1654            self.fail("Expected OSError: %s" % desired_exception)
1655
1656    # We mock the __del__ method for Popen in the next two tests
1657    # because it does cleanup based on the pid returned by fork_exec
1658    # along with issuing a resource warning if it still exists. Since
1659    # we don't actually spawn a process in these tests we can forego
1660    # the destructor. An alternative would be to set _child_created to
1661    # False before the destructor is called but there is no easy way
1662    # to do that
1663    class PopenNoDestructor(subprocess.Popen):
1664        def __del__(self):
1665            pass
1666
1667    @mock.patch("subprocess._posixsubprocess.fork_exec")
1668    def test_exception_errpipe_normal(self, fork_exec):
1669        """Test error passing done through errpipe_write in the good case"""
1670        def proper_error(*args):
1671            errpipe_write = args[13]
1672            # Write the hex for the error code EISDIR: 'is a directory'
1673            err_code = '{:x}'.format(errno.EISDIR).encode()
1674            os.write(errpipe_write, b"OSError:" + err_code + b":")
1675            return 0
1676
1677        fork_exec.side_effect = proper_error
1678
1679        with mock.patch("subprocess.os.waitpid",
1680                        side_effect=ChildProcessError):
1681            with self.assertRaises(IsADirectoryError):
1682                self.PopenNoDestructor(["non_existent_command"])
1683
1684    @mock.patch("subprocess._posixsubprocess.fork_exec")
1685    def test_exception_errpipe_bad_data(self, fork_exec):
1686        """Test error passing done through errpipe_write where its not
1687        in the expected format"""
1688        error_data = b"\xFF\x00\xDE\xAD"
1689        def bad_error(*args):
1690            errpipe_write = args[13]
1691            # Anything can be in the pipe, no assumptions should
1692            # be made about its encoding, so we'll write some
1693            # arbitrary hex bytes to test it out
1694            os.write(errpipe_write, error_data)
1695            return 0
1696
1697        fork_exec.side_effect = bad_error
1698
1699        with mock.patch("subprocess.os.waitpid",
1700                        side_effect=ChildProcessError):
1701            with self.assertRaises(subprocess.SubprocessError) as e:
1702                self.PopenNoDestructor(["non_existent_command"])
1703
1704        self.assertIn(repr(error_data), str(e.exception))
1705
1706    @unittest.skipIf(not os.path.exists('/proc/self/status'),
1707                     "need /proc/self/status")
1708    def test_restore_signals(self):
1709        # Blindly assume that cat exists on systems with /proc/self/status...
1710        default_proc_status = subprocess.check_output(
1711                ['cat', '/proc/self/status'],
1712                restore_signals=False)
1713        for line in default_proc_status.splitlines():
1714            if line.startswith(b'SigIgn'):
1715                default_sig_ign_mask = line
1716                break
1717        else:
1718            self.skipTest("SigIgn not found in /proc/self/status.")
1719        restored_proc_status = subprocess.check_output(
1720                ['cat', '/proc/self/status'],
1721                restore_signals=True)
1722        for line in restored_proc_status.splitlines():
1723            if line.startswith(b'SigIgn'):
1724                restored_sig_ign_mask = line
1725                break
1726        self.assertNotEqual(default_sig_ign_mask, restored_sig_ign_mask,
1727                            msg="restore_signals=True should've unblocked "
1728                            "SIGPIPE and friends.")
1729
1730    def test_start_new_session(self):
1731        # For code coverage of calling setsid().  We don't care if we get an
1732        # EPERM error from it depending on the test execution environment, that
1733        # still indicates that it was called.
1734        try:
1735            output = subprocess.check_output(
1736                    [sys.executable, "-c", "import os; print(os.getsid(0))"],
1737                    start_new_session=True)
1738        except OSError as e:
1739            if e.errno != errno.EPERM:
1740                raise
1741        else:
1742            parent_sid = os.getsid(0)
1743            child_sid = int(output)
1744            self.assertNotEqual(parent_sid, child_sid)
1745
1746    def test_run_abort(self):
1747        # returncode handles signal termination
1748        with support.SuppressCrashReport():
1749            p = subprocess.Popen([sys.executable, "-c",
1750                                  'import os; os.abort()'])
1751            p.wait()
1752        self.assertEqual(-p.returncode, signal.SIGABRT)
1753
1754    def test_CalledProcessError_str_signal(self):
1755        err = subprocess.CalledProcessError(-int(signal.SIGABRT), "fake cmd")
1756        error_string = str(err)
1757        # We're relying on the repr() of the signal.Signals intenum to provide
1758        # the word signal, the signal name and the numeric value.
1759        self.assertIn("signal", error_string.lower())
1760        # We're not being specific about the signal name as some signals have
1761        # multiple names and which name is revealed can vary.
1762        self.assertIn("SIG", error_string)
1763        self.assertIn(str(signal.SIGABRT), error_string)
1764
1765    def test_CalledProcessError_str_unknown_signal(self):
1766        err = subprocess.CalledProcessError(-9876543, "fake cmd")
1767        error_string = str(err)
1768        self.assertIn("unknown signal 9876543.", error_string)
1769
1770    def test_CalledProcessError_str_non_zero(self):
1771        err = subprocess.CalledProcessError(2, "fake cmd")
1772        error_string = str(err)
1773        self.assertIn("non-zero exit status 2.", error_string)
1774
1775    def test_preexec(self):
1776        # DISCLAIMER: Setting environment variables is *not* a good use
1777        # of a preexec_fn.  This is merely a test.
1778        p = subprocess.Popen([sys.executable, "-c",
1779                              'import sys,os;'
1780                              'sys.stdout.write(os.getenv("FRUIT"))'],
1781                             stdout=subprocess.PIPE,
1782                             preexec_fn=lambda: os.putenv("FRUIT", "apple"))
1783        with p:
1784            self.assertEqual(p.stdout.read(), b"apple")
1785
1786    def test_preexec_exception(self):
1787        def raise_it():
1788            raise ValueError("What if two swallows carried a coconut?")
1789        try:
1790            p = subprocess.Popen([sys.executable, "-c", ""],
1791                                 preexec_fn=raise_it)
1792        except subprocess.SubprocessError as e:
1793            self.assertTrue(
1794                    subprocess._posixsubprocess,
1795                    "Expected a ValueError from the preexec_fn")
1796        except ValueError as e:
1797            self.assertIn("coconut", e.args[0])
1798        else:
1799            self.fail("Exception raised by preexec_fn did not make it "
1800                      "to the parent process.")
1801
1802    class _TestExecuteChildPopen(subprocess.Popen):
1803        """Used to test behavior at the end of _execute_child."""
1804        def __init__(self, testcase, *args, **kwargs):
1805            self._testcase = testcase
1806            subprocess.Popen.__init__(self, *args, **kwargs)
1807
1808        def _execute_child(self, *args, **kwargs):
1809            try:
1810                subprocess.Popen._execute_child(self, *args, **kwargs)
1811            finally:
1812                # Open a bunch of file descriptors and verify that
1813                # none of them are the same as the ones the Popen
1814                # instance is using for stdin/stdout/stderr.
1815                devzero_fds = [os.open("/dev/zero", os.O_RDONLY)
1816                               for _ in range(8)]
1817                try:
1818                    for fd in devzero_fds:
1819                        self._testcase.assertNotIn(
1820                                fd, (self.stdin.fileno(), self.stdout.fileno(),
1821                                     self.stderr.fileno()),
1822                                msg="At least one fd was closed early.")
1823                finally:
1824                    for fd in devzero_fds:
1825                        os.close(fd)
1826
1827    @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
1828    def test_preexec_errpipe_does_not_double_close_pipes(self):
1829        """Issue16140: Don't double close pipes on preexec error."""
1830
1831        def raise_it():
1832            raise subprocess.SubprocessError(
1833                    "force the _execute_child() errpipe_data path.")
1834
1835        with self.assertRaises(subprocess.SubprocessError):
1836            self._TestExecuteChildPopen(
1837                        self, ZERO_RETURN_CMD,
1838                        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1839                        stderr=subprocess.PIPE, preexec_fn=raise_it)
1840
1841    def test_preexec_gc_module_failure(self):
1842        # This tests the code that disables garbage collection if the child
1843        # process will execute any Python.
1844        def raise_runtime_error():
1845            raise RuntimeError("this shouldn't escape")
1846        enabled = gc.isenabled()
1847        orig_gc_disable = gc.disable
1848        orig_gc_isenabled = gc.isenabled
1849        try:
1850            gc.disable()
1851            self.assertFalse(gc.isenabled())
1852            subprocess.call([sys.executable, '-c', ''],
1853                            preexec_fn=lambda: None)
1854            self.assertFalse(gc.isenabled(),
1855                             "Popen enabled gc when it shouldn't.")
1856
1857            gc.enable()
1858            self.assertTrue(gc.isenabled())
1859            subprocess.call([sys.executable, '-c', ''],
1860                            preexec_fn=lambda: None)
1861            self.assertTrue(gc.isenabled(), "Popen left gc disabled.")
1862
1863            gc.disable = raise_runtime_error
1864            self.assertRaises(RuntimeError, subprocess.Popen,
1865                              [sys.executable, '-c', ''],
1866                              preexec_fn=lambda: None)
1867
1868            del gc.isenabled  # force an AttributeError
1869            self.assertRaises(AttributeError, subprocess.Popen,
1870                              [sys.executable, '-c', ''],
1871                              preexec_fn=lambda: None)
1872        finally:
1873            gc.disable = orig_gc_disable
1874            gc.isenabled = orig_gc_isenabled
1875            if not enabled:
1876                gc.disable()
1877
1878    @unittest.skipIf(
1879        sys.platform == 'darwin', 'setrlimit() seems to fail on OS X')
1880    def test_preexec_fork_failure(self):
1881        # The internal code did not preserve the previous exception when
1882        # re-enabling garbage collection
1883        try:
1884            from resource import getrlimit, setrlimit, RLIMIT_NPROC
1885        except ImportError as err:
1886            self.skipTest(err)  # RLIMIT_NPROC is specific to Linux and BSD
1887        limits = getrlimit(RLIMIT_NPROC)
1888        [_, hard] = limits
1889        setrlimit(RLIMIT_NPROC, (0, hard))
1890        self.addCleanup(setrlimit, RLIMIT_NPROC, limits)
1891        try:
1892            subprocess.call([sys.executable, '-c', ''],
1893                            preexec_fn=lambda: None)
1894        except BlockingIOError:
1895            # Forking should raise EAGAIN, translated to BlockingIOError
1896            pass
1897        else:
1898            self.skipTest('RLIMIT_NPROC had no effect; probably superuser')
1899
1900    def test_args_string(self):
1901        # args is a string
1902        fd, fname = tempfile.mkstemp()
1903        # reopen in text mode
1904        with open(fd, "w", errors="surrogateescape") as fobj:
1905            fobj.write("#!%s\n" % support.unix_shell)
1906            fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
1907                       sys.executable)
1908        os.chmod(fname, 0o700)
1909        p = subprocess.Popen(fname)
1910        p.wait()
1911        os.remove(fname)
1912        self.assertEqual(p.returncode, 47)
1913
1914    def test_invalid_args(self):
1915        # invalid arguments should raise ValueError
1916        self.assertRaises(ValueError, subprocess.call,
1917                          [sys.executable, "-c",
1918                           "import sys; sys.exit(47)"],
1919                          startupinfo=47)
1920        self.assertRaises(ValueError, subprocess.call,
1921                          [sys.executable, "-c",
1922                           "import sys; sys.exit(47)"],
1923                          creationflags=47)
1924
1925    def test_shell_sequence(self):
1926        # Run command through the shell (sequence)
1927        newenv = os.environ.copy()
1928        newenv["FRUIT"] = "apple"
1929        p = subprocess.Popen(["echo $FRUIT"], shell=1,
1930                             stdout=subprocess.PIPE,
1931                             env=newenv)
1932        with p:
1933            self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
1934
1935    def test_shell_string(self):
1936        # Run command through the shell (string)
1937        newenv = os.environ.copy()
1938        newenv["FRUIT"] = "apple"
1939        p = subprocess.Popen("echo $FRUIT", shell=1,
1940                             stdout=subprocess.PIPE,
1941                             env=newenv)
1942        with p:
1943            self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
1944
1945    def test_call_string(self):
1946        # call() function with string argument on UNIX
1947        fd, fname = tempfile.mkstemp()
1948        # reopen in text mode
1949        with open(fd, "w", errors="surrogateescape") as fobj:
1950            fobj.write("#!%s\n" % support.unix_shell)
1951            fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
1952                       sys.executable)
1953        os.chmod(fname, 0o700)
1954        rc = subprocess.call(fname)
1955        os.remove(fname)
1956        self.assertEqual(rc, 47)
1957
1958    def test_specific_shell(self):
1959        # Issue #9265: Incorrect name passed as arg[0].
1960        shells = []
1961        for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
1962            for name in ['bash', 'ksh']:
1963                sh = os.path.join(prefix, name)
1964                if os.path.isfile(sh):
1965                    shells.append(sh)
1966        if not shells: # Will probably work for any shell but csh.
1967            self.skipTest("bash or ksh required for this test")
1968        sh = '/bin/sh'
1969        if os.path.isfile(sh) and not os.path.islink(sh):
1970            # Test will fail if /bin/sh is a symlink to csh.
1971            shells.append(sh)
1972        for sh in shells:
1973            p = subprocess.Popen("echo $0", executable=sh, shell=True,
1974                                 stdout=subprocess.PIPE)
1975            with p:
1976                self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii'))
1977
1978    def _kill_process(self, method, *args):
1979        # Do not inherit file handles from the parent.
1980        # It should fix failures on some platforms.
1981        # Also set the SIGINT handler to the default to make sure it's not
1982        # being ignored (some tests rely on that.)
1983        old_handler = signal.signal(signal.SIGINT, signal.default_int_handler)
1984        try:
1985            p = subprocess.Popen([sys.executable, "-c", """if 1:
1986                                 import sys, time
1987                                 sys.stdout.write('x\\n')
1988                                 sys.stdout.flush()
1989                                 time.sleep(30)
1990                                 """],
1991                                 close_fds=True,
1992                                 stdin=subprocess.PIPE,
1993                                 stdout=subprocess.PIPE,
1994                                 stderr=subprocess.PIPE)
1995        finally:
1996            signal.signal(signal.SIGINT, old_handler)
1997        # Wait for the interpreter to be completely initialized before
1998        # sending any signal.
1999        p.stdout.read(1)
2000        getattr(p, method)(*args)
2001        return p
2002
2003    @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')),
2004                     "Due to known OS bug (issue #16762)")
2005    def _kill_dead_process(self, method, *args):
2006        # Do not inherit file handles from the parent.
2007        # It should fix failures on some platforms.
2008        p = subprocess.Popen([sys.executable, "-c", """if 1:
2009                             import sys, time
2010                             sys.stdout.write('x\\n')
2011                             sys.stdout.flush()
2012                             """],
2013                             close_fds=True,
2014                             stdin=subprocess.PIPE,
2015                             stdout=subprocess.PIPE,
2016                             stderr=subprocess.PIPE)
2017        # Wait for the interpreter to be completely initialized before
2018        # sending any signal.
2019        p.stdout.read(1)
2020        # The process should end after this
2021        time.sleep(1)
2022        # This shouldn't raise even though the child is now dead
2023        getattr(p, method)(*args)
2024        p.communicate()
2025
2026    def test_send_signal(self):
2027        p = self._kill_process('send_signal', signal.SIGINT)
2028        _, stderr = p.communicate()
2029        self.assertIn(b'KeyboardInterrupt', stderr)
2030        self.assertNotEqual(p.wait(), 0)
2031
2032    def test_kill(self):
2033        p = self._kill_process('kill')
2034        _, stderr = p.communicate()
2035        self.assertStderrEqual(stderr, b'')
2036        self.assertEqual(p.wait(), -signal.SIGKILL)
2037
2038    def test_terminate(self):
2039        p = self._kill_process('terminate')
2040        _, stderr = p.communicate()
2041        self.assertStderrEqual(stderr, b'')
2042        self.assertEqual(p.wait(), -signal.SIGTERM)
2043
2044    def test_send_signal_dead(self):
2045        # Sending a signal to a dead process
2046        self._kill_dead_process('send_signal', signal.SIGINT)
2047
2048    def test_kill_dead(self):
2049        # Killing a dead process
2050        self._kill_dead_process('kill')
2051
2052    def test_terminate_dead(self):
2053        # Terminating a dead process
2054        self._kill_dead_process('terminate')
2055
2056    def _save_fds(self, save_fds):
2057        fds = []
2058        for fd in save_fds:
2059            inheritable = os.get_inheritable(fd)
2060            saved = os.dup(fd)
2061            fds.append((fd, saved, inheritable))
2062        return fds
2063
2064    def _restore_fds(self, fds):
2065        for fd, saved, inheritable in fds:
2066            os.dup2(saved, fd, inheritable=inheritable)
2067            os.close(saved)
2068
2069    def check_close_std_fds(self, fds):
2070        # Issue #9905: test that subprocess pipes still work properly with
2071        # some standard fds closed
2072        stdin = 0
2073        saved_fds = self._save_fds(fds)
2074        for fd, saved, inheritable in saved_fds:
2075            if fd == 0:
2076                stdin = saved
2077                break
2078        try:
2079            for fd in fds:
2080                os.close(fd)
2081            out, err = subprocess.Popen([sys.executable, "-c",
2082                              'import sys;'
2083                              'sys.stdout.write("apple");'
2084                              'sys.stdout.flush();'
2085                              'sys.stderr.write("orange")'],
2086                       stdin=stdin,
2087                       stdout=subprocess.PIPE,
2088                       stderr=subprocess.PIPE).communicate()
2089            err = support.strip_python_stderr(err)
2090            self.assertEqual((out, err), (b'apple', b'orange'))
2091        finally:
2092            self._restore_fds(saved_fds)
2093
2094    def test_close_fd_0(self):
2095        self.check_close_std_fds([0])
2096
2097    def test_close_fd_1(self):
2098        self.check_close_std_fds([1])
2099
2100    def test_close_fd_2(self):
2101        self.check_close_std_fds([2])
2102
2103    def test_close_fds_0_1(self):
2104        self.check_close_std_fds([0, 1])
2105
2106    def test_close_fds_0_2(self):
2107        self.check_close_std_fds([0, 2])
2108
2109    def test_close_fds_1_2(self):
2110        self.check_close_std_fds([1, 2])
2111
2112    def test_close_fds_0_1_2(self):
2113        # Issue #10806: test that subprocess pipes still work properly with
2114        # all standard fds closed.
2115        self.check_close_std_fds([0, 1, 2])
2116
2117    def test_small_errpipe_write_fd(self):
2118        """Issue #15798: Popen should work when stdio fds are available."""
2119        new_stdin = os.dup(0)
2120        new_stdout = os.dup(1)
2121        try:
2122            os.close(0)
2123            os.close(1)
2124
2125            # Side test: if errpipe_write fails to have its CLOEXEC
2126            # flag set this should cause the parent to think the exec
2127            # failed.  Extremely unlikely: everyone supports CLOEXEC.
2128            subprocess.Popen([
2129                    sys.executable, "-c",
2130                    "print('AssertionError:0:CLOEXEC failure.')"]).wait()
2131        finally:
2132            # Restore original stdin and stdout
2133            os.dup2(new_stdin, 0)
2134            os.dup2(new_stdout, 1)
2135            os.close(new_stdin)
2136            os.close(new_stdout)
2137
2138    def test_remapping_std_fds(self):
2139        # open up some temporary files
2140        temps = [tempfile.mkstemp() for i in range(3)]
2141        try:
2142            temp_fds = [fd for fd, fname in temps]
2143
2144            # unlink the files -- we won't need to reopen them
2145            for fd, fname in temps:
2146                os.unlink(fname)
2147
2148            # write some data to what will become stdin, and rewind
2149            os.write(temp_fds[1], b"STDIN")
2150            os.lseek(temp_fds[1], 0, 0)
2151
2152            # move the standard file descriptors out of the way
2153            saved_fds = self._save_fds(range(3))
2154            try:
2155                # duplicate the file objects over the standard fd's
2156                for fd, temp_fd in enumerate(temp_fds):
2157                    os.dup2(temp_fd, fd)
2158
2159                # now use those files in the "wrong" order, so that subprocess
2160                # has to rearrange them in the child
2161                p = subprocess.Popen([sys.executable, "-c",
2162                    'import sys; got = sys.stdin.read();'
2163                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
2164                    stdin=temp_fds[1],
2165                    stdout=temp_fds[2],
2166                    stderr=temp_fds[0])
2167                p.wait()
2168            finally:
2169                self._restore_fds(saved_fds)
2170
2171            for fd in temp_fds:
2172                os.lseek(fd, 0, 0)
2173
2174            out = os.read(temp_fds[2], 1024)
2175            err = support.strip_python_stderr(os.read(temp_fds[0], 1024))
2176            self.assertEqual(out, b"got STDIN")
2177            self.assertEqual(err, b"err")
2178
2179        finally:
2180            for fd in temp_fds:
2181                os.close(fd)
2182
2183    def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
2184        # open up some temporary files
2185        temps = [tempfile.mkstemp() for i in range(3)]
2186        temp_fds = [fd for fd, fname in temps]
2187        try:
2188            # unlink the files -- we won't need to reopen them
2189            for fd, fname in temps:
2190                os.unlink(fname)
2191
2192            # save a copy of the standard file descriptors
2193            saved_fds = self._save_fds(range(3))
2194            try:
2195                # duplicate the temp files over the standard fd's 0, 1, 2
2196                for fd, temp_fd in enumerate(temp_fds):
2197                    os.dup2(temp_fd, fd)
2198
2199                # write some data to what will become stdin, and rewind
2200                os.write(stdin_no, b"STDIN")
2201                os.lseek(stdin_no, 0, 0)
2202
2203                # now use those files in the given order, so that subprocess
2204                # has to rearrange them in the child
2205                p = subprocess.Popen([sys.executable, "-c",
2206                    'import sys; got = sys.stdin.read();'
2207                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
2208                    stdin=stdin_no,
2209                    stdout=stdout_no,
2210                    stderr=stderr_no)
2211                p.wait()
2212
2213                for fd in temp_fds:
2214                    os.lseek(fd, 0, 0)
2215
2216                out = os.read(stdout_no, 1024)
2217                err = support.strip_python_stderr(os.read(stderr_no, 1024))
2218            finally:
2219                self._restore_fds(saved_fds)
2220
2221            self.assertEqual(out, b"got STDIN")
2222            self.assertEqual(err, b"err")
2223
2224        finally:
2225            for fd in temp_fds:
2226                os.close(fd)
2227
2228    # When duping fds, if there arises a situation where one of the fds is
2229    # either 0, 1 or 2, it is possible that it is overwritten (#12607).
2230    # This tests all combinations of this.
2231    def test_swap_fds(self):
2232        self.check_swap_fds(0, 1, 2)
2233        self.check_swap_fds(0, 2, 1)
2234        self.check_swap_fds(1, 0, 2)
2235        self.check_swap_fds(1, 2, 0)
2236        self.check_swap_fds(2, 0, 1)
2237        self.check_swap_fds(2, 1, 0)
2238
2239    def _check_swap_std_fds_with_one_closed(self, from_fds, to_fds):
2240        saved_fds = self._save_fds(range(3))
2241        try:
2242            for from_fd in from_fds:
2243                with tempfile.TemporaryFile() as f:
2244                    os.dup2(f.fileno(), from_fd)
2245
2246            fd_to_close = (set(range(3)) - set(from_fds)).pop()
2247            os.close(fd_to_close)
2248
2249            arg_names = ['stdin', 'stdout', 'stderr']
2250            kwargs = {}
2251            for from_fd, to_fd in zip(from_fds, to_fds):
2252                kwargs[arg_names[to_fd]] = from_fd
2253
2254            code = textwrap.dedent(r'''
2255                import os, sys
2256                skipped_fd = int(sys.argv[1])
2257                for fd in range(3):
2258                    if fd != skipped_fd:
2259                        os.write(fd, str(fd).encode('ascii'))
2260            ''')
2261
2262            skipped_fd = (set(range(3)) - set(to_fds)).pop()
2263
2264            rc = subprocess.call([sys.executable, '-c', code, str(skipped_fd)],
2265                                 **kwargs)
2266            self.assertEqual(rc, 0)
2267
2268            for from_fd, to_fd in zip(from_fds, to_fds):
2269                os.lseek(from_fd, 0, os.SEEK_SET)
2270                read_bytes = os.read(from_fd, 1024)
2271                read_fds = list(map(int, read_bytes.decode('ascii')))
2272                msg = textwrap.dedent(f"""
2273                    When testing {from_fds} to {to_fds} redirection,
2274                    parent descriptor {from_fd} got redirected
2275                    to descriptor(s) {read_fds} instead of descriptor {to_fd}.
2276                """)
2277                self.assertEqual([to_fd], read_fds, msg)
2278        finally:
2279            self._restore_fds(saved_fds)
2280
2281    # Check that subprocess can remap std fds correctly even
2282    # if one of them is closed (#32844).
2283    def test_swap_std_fds_with_one_closed(self):
2284        for from_fds in itertools.combinations(range(3), 2):
2285            for to_fds in itertools.permutations(range(3), 2):
2286                self._check_swap_std_fds_with_one_closed(from_fds, to_fds)
2287
2288    def test_surrogates_error_message(self):
2289        def prepare():
2290            raise ValueError("surrogate:\uDCff")
2291
2292        try:
2293            subprocess.call(
2294                ZERO_RETURN_CMD,
2295                preexec_fn=prepare)
2296        except ValueError as err:
2297            # Pure Python implementations keeps the message
2298            self.assertIsNone(subprocess._posixsubprocess)
2299            self.assertEqual(str(err), "surrogate:\uDCff")
2300        except subprocess.SubprocessError as err:
2301            # _posixsubprocess uses a default message
2302            self.assertIsNotNone(subprocess._posixsubprocess)
2303            self.assertEqual(str(err), "Exception occurred in preexec_fn.")
2304        else:
2305            self.fail("Expected ValueError or subprocess.SubprocessError")
2306
2307    def test_undecodable_env(self):
2308        for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')):
2309            encoded_value = value.encode("ascii", "surrogateescape")
2310
2311            # test str with surrogates
2312            script = "import os; print(ascii(os.getenv(%s)))" % repr(key)
2313            env = os.environ.copy()
2314            env[key] = value
2315            # Use C locale to get ASCII for the locale encoding to force
2316            # surrogate-escaping of \xFF in the child process
2317            env['LC_ALL'] = 'C'
2318            decoded_value = value
2319            stdout = subprocess.check_output(
2320                [sys.executable, "-c", script],
2321                env=env)
2322            stdout = stdout.rstrip(b'\n\r')
2323            self.assertEqual(stdout.decode('ascii'), ascii(decoded_value))
2324
2325            # test bytes
2326            key = key.encode("ascii", "surrogateescape")
2327            script = "import os; print(ascii(os.getenvb(%s)))" % repr(key)
2328            env = os.environ.copy()
2329            env[key] = encoded_value
2330            stdout = subprocess.check_output(
2331                [sys.executable, "-c", script],
2332                env=env)
2333            stdout = stdout.rstrip(b'\n\r')
2334            self.assertEqual(stdout.decode('ascii'), ascii(encoded_value))
2335
2336    def test_bytes_program(self):
2337        abs_program = os.fsencode(ZERO_RETURN_CMD[0])
2338        args = list(ZERO_RETURN_CMD[1:])
2339        path, program = os.path.split(ZERO_RETURN_CMD[0])
2340        program = os.fsencode(program)
2341
2342        # absolute bytes path
2343        exitcode = subprocess.call([abs_program]+args)
2344        self.assertEqual(exitcode, 0)
2345
2346        # absolute bytes path as a string
2347        cmd = b"'%s' %s" % (abs_program, " ".join(args).encode("utf-8"))
2348        exitcode = subprocess.call(cmd, shell=True)
2349        self.assertEqual(exitcode, 0)
2350
2351        # bytes program, unicode PATH
2352        env = os.environ.copy()
2353        env["PATH"] = path
2354        exitcode = subprocess.call([program]+args, env=env)
2355        self.assertEqual(exitcode, 0)
2356
2357        # bytes program, bytes PATH
2358        envb = os.environb.copy()
2359        envb[b"PATH"] = os.fsencode(path)
2360        exitcode = subprocess.call([program]+args, env=envb)
2361        self.assertEqual(exitcode, 0)
2362
2363    def test_pipe_cloexec(self):
2364        sleeper = support.findfile("input_reader.py", subdir="subprocessdata")
2365        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2366
2367        p1 = subprocess.Popen([sys.executable, sleeper],
2368                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
2369                              stderr=subprocess.PIPE, close_fds=False)
2370
2371        self.addCleanup(p1.communicate, b'')
2372
2373        p2 = subprocess.Popen([sys.executable, fd_status],
2374                              stdout=subprocess.PIPE, close_fds=False)
2375
2376        output, error = p2.communicate()
2377        result_fds = set(map(int, output.split(b',')))
2378        unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(),
2379                            p1.stderr.fileno()])
2380
2381        self.assertFalse(result_fds & unwanted_fds,
2382                         "Expected no fds from %r to be open in child, "
2383                         "found %r" %
2384                              (unwanted_fds, result_fds & unwanted_fds))
2385
2386    def test_pipe_cloexec_real_tools(self):
2387        qcat = support.findfile("qcat.py", subdir="subprocessdata")
2388        qgrep = support.findfile("qgrep.py", subdir="subprocessdata")
2389
2390        subdata = b'zxcvbn'
2391        data = subdata * 4 + b'\n'
2392
2393        p1 = subprocess.Popen([sys.executable, qcat],
2394                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
2395                              close_fds=False)
2396
2397        p2 = subprocess.Popen([sys.executable, qgrep, subdata],
2398                              stdin=p1.stdout, stdout=subprocess.PIPE,
2399                              close_fds=False)
2400
2401        self.addCleanup(p1.wait)
2402        self.addCleanup(p2.wait)
2403        def kill_p1():
2404            try:
2405                p1.terminate()
2406            except ProcessLookupError:
2407                pass
2408        def kill_p2():
2409            try:
2410                p2.terminate()
2411            except ProcessLookupError:
2412                pass
2413        self.addCleanup(kill_p1)
2414        self.addCleanup(kill_p2)
2415
2416        p1.stdin.write(data)
2417        p1.stdin.close()
2418
2419        readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10)
2420
2421        self.assertTrue(readfiles, "The child hung")
2422        self.assertEqual(p2.stdout.read(), data)
2423
2424        p1.stdout.close()
2425        p2.stdout.close()
2426
2427    def test_close_fds(self):
2428        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2429
2430        fds = os.pipe()
2431        self.addCleanup(os.close, fds[0])
2432        self.addCleanup(os.close, fds[1])
2433
2434        open_fds = set(fds)
2435        # add a bunch more fds
2436        for _ in range(9):
2437            fd = os.open(os.devnull, os.O_RDONLY)
2438            self.addCleanup(os.close, fd)
2439            open_fds.add(fd)
2440
2441        for fd in open_fds:
2442            os.set_inheritable(fd, True)
2443
2444        p = subprocess.Popen([sys.executable, fd_status],
2445                             stdout=subprocess.PIPE, close_fds=False)
2446        output, ignored = p.communicate()
2447        remaining_fds = set(map(int, output.split(b',')))
2448
2449        self.assertEqual(remaining_fds & open_fds, open_fds,
2450                         "Some fds were closed")
2451
2452        p = subprocess.Popen([sys.executable, fd_status],
2453                             stdout=subprocess.PIPE, close_fds=True)
2454        output, ignored = p.communicate()
2455        remaining_fds = set(map(int, output.split(b',')))
2456
2457        self.assertFalse(remaining_fds & open_fds,
2458                         "Some fds were left open")
2459        self.assertIn(1, remaining_fds, "Subprocess failed")
2460
2461        # Keep some of the fd's we opened open in the subprocess.
2462        # This tests _posixsubprocess.c's proper handling of fds_to_keep.
2463        fds_to_keep = set(open_fds.pop() for _ in range(8))
2464        p = subprocess.Popen([sys.executable, fd_status],
2465                             stdout=subprocess.PIPE, close_fds=True,
2466                             pass_fds=fds_to_keep)
2467        output, ignored = p.communicate()
2468        remaining_fds = set(map(int, output.split(b',')))
2469
2470        self.assertFalse((remaining_fds - fds_to_keep) & open_fds,
2471                         "Some fds not in pass_fds were left open")
2472        self.assertIn(1, remaining_fds, "Subprocess failed")
2473
2474
2475    @unittest.skipIf(sys.platform.startswith("freebsd") and
2476                     os.stat("/dev").st_dev == os.stat("/dev/fd").st_dev,
2477                     "Requires fdescfs mounted on /dev/fd on FreeBSD.")
2478    def test_close_fds_when_max_fd_is_lowered(self):
2479        """Confirm that issue21618 is fixed (may fail under valgrind)."""
2480        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2481
2482        # This launches the meat of the test in a child process to
2483        # avoid messing with the larger unittest processes maximum
2484        # number of file descriptors.
2485        #  This process launches:
2486        #  +--> Process that lowers its RLIMIT_NOFILE aftr setting up
2487        #    a bunch of high open fds above the new lower rlimit.
2488        #    Those are reported via stdout before launching a new
2489        #    process with close_fds=False to run the actual test:
2490        #    +--> The TEST: This one launches a fd_status.py
2491        #      subprocess with close_fds=True so we can find out if
2492        #      any of the fds above the lowered rlimit are still open.
2493        p = subprocess.Popen([sys.executable, '-c', textwrap.dedent(
2494        '''
2495        import os, resource, subprocess, sys, textwrap
2496        open_fds = set()
2497        # Add a bunch more fds to pass down.
2498        for _ in range(40):
2499            fd = os.open(os.devnull, os.O_RDONLY)
2500            open_fds.add(fd)
2501
2502        # Leave a two pairs of low ones available for use by the
2503        # internal child error pipe and the stdout pipe.
2504        # We also leave 10 more open as some Python buildbots run into
2505        # "too many open files" errors during the test if we do not.
2506        for fd in sorted(open_fds)[:14]:
2507            os.close(fd)
2508            open_fds.remove(fd)
2509
2510        for fd in open_fds:
2511            #self.addCleanup(os.close, fd)
2512            os.set_inheritable(fd, True)
2513
2514        max_fd_open = max(open_fds)
2515
2516        # Communicate the open_fds to the parent unittest.TestCase process.
2517        print(','.join(map(str, sorted(open_fds))))
2518        sys.stdout.flush()
2519
2520        rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE)
2521        try:
2522            # 29 is lower than the highest fds we are leaving open.
2523            resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max))
2524            # Launch a new Python interpreter with our low fd rlim_cur that
2525            # inherits open fds above that limit.  It then uses subprocess
2526            # with close_fds=True to get a report of open fds in the child.
2527            # An explicit list of fds to check is passed to fd_status.py as
2528            # letting fd_status rely on its default logic would miss the
2529            # fds above rlim_cur as it normally only checks up to that limit.
2530            subprocess.Popen(
2531                [sys.executable, '-c',
2532                 textwrap.dedent("""
2533                     import subprocess, sys
2534                     subprocess.Popen([sys.executable, %r] +
2535                                      [str(x) for x in range({max_fd})],
2536                                      close_fds=True).wait()
2537                     """.format(max_fd=max_fd_open+1))],
2538                close_fds=False).wait()
2539        finally:
2540            resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max))
2541        ''' % fd_status)], stdout=subprocess.PIPE)
2542
2543        output, unused_stderr = p.communicate()
2544        output_lines = output.splitlines()
2545        self.assertEqual(len(output_lines), 2,
2546                         msg="expected exactly two lines of output:\n%r" % output)
2547        opened_fds = set(map(int, output_lines[0].strip().split(b',')))
2548        remaining_fds = set(map(int, output_lines[1].strip().split(b',')))
2549
2550        self.assertFalse(remaining_fds & opened_fds,
2551                         msg="Some fds were left open.")
2552
2553
2554    # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file
2555    # descriptor of a pipe closed in the parent process is valid in the
2556    # child process according to fstat(), but the mode of the file
2557    # descriptor is invalid, and read or write raise an error.
2558    @support.requires_mac_ver(10, 5)
2559    def test_pass_fds(self):
2560        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2561
2562        open_fds = set()
2563
2564        for x in range(5):
2565            fds = os.pipe()
2566            self.addCleanup(os.close, fds[0])
2567            self.addCleanup(os.close, fds[1])
2568            os.set_inheritable(fds[0], True)
2569            os.set_inheritable(fds[1], True)
2570            open_fds.update(fds)
2571
2572        for fd in open_fds:
2573            p = subprocess.Popen([sys.executable, fd_status],
2574                                 stdout=subprocess.PIPE, close_fds=True,
2575                                 pass_fds=(fd, ))
2576            output, ignored = p.communicate()
2577
2578            remaining_fds = set(map(int, output.split(b',')))
2579            to_be_closed = open_fds - {fd}
2580
2581            self.assertIn(fd, remaining_fds, "fd to be passed not passed")
2582            self.assertFalse(remaining_fds & to_be_closed,
2583                             "fd to be closed passed")
2584
2585            # pass_fds overrides close_fds with a warning.
2586            with self.assertWarns(RuntimeWarning) as context:
2587                self.assertFalse(subprocess.call(
2588                        ZERO_RETURN_CMD,
2589                        close_fds=False, pass_fds=(fd, )))
2590            self.assertIn('overriding close_fds', str(context.warning))
2591
2592    def test_pass_fds_inheritable(self):
2593        script = support.findfile("fd_status.py", subdir="subprocessdata")
2594
2595        inheritable, non_inheritable = os.pipe()
2596        self.addCleanup(os.close, inheritable)
2597        self.addCleanup(os.close, non_inheritable)
2598        os.set_inheritable(inheritable, True)
2599        os.set_inheritable(non_inheritable, False)
2600        pass_fds = (inheritable, non_inheritable)
2601        args = [sys.executable, script]
2602        args += list(map(str, pass_fds))
2603
2604        p = subprocess.Popen(args,
2605                             stdout=subprocess.PIPE, close_fds=True,
2606                             pass_fds=pass_fds)
2607        output, ignored = p.communicate()
2608        fds = set(map(int, output.split(b',')))
2609
2610        # the inheritable file descriptor must be inherited, so its inheritable
2611        # flag must be set in the child process after fork() and before exec()
2612        self.assertEqual(fds, set(pass_fds), "output=%a" % output)
2613
2614        # inheritable flag must not be changed in the parent process
2615        self.assertEqual(os.get_inheritable(inheritable), True)
2616        self.assertEqual(os.get_inheritable(non_inheritable), False)
2617
2618
2619    # bpo-32270: Ensure that descriptors specified in pass_fds
2620    # are inherited even if they are used in redirections.
2621    # Contributed by @izbyshev.
2622    def test_pass_fds_redirected(self):
2623        """Regression test for https://bugs.python.org/issue32270."""
2624        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2625        pass_fds = []
2626        for _ in range(2):
2627            fd = os.open(os.devnull, os.O_RDWR)
2628            self.addCleanup(os.close, fd)
2629            pass_fds.append(fd)
2630
2631        stdout_r, stdout_w = os.pipe()
2632        self.addCleanup(os.close, stdout_r)
2633        self.addCleanup(os.close, stdout_w)
2634        pass_fds.insert(1, stdout_w)
2635
2636        with subprocess.Popen([sys.executable, fd_status],
2637                              stdin=pass_fds[0],
2638                              stdout=pass_fds[1],
2639                              stderr=pass_fds[2],
2640                              close_fds=True,
2641                              pass_fds=pass_fds):
2642            output = os.read(stdout_r, 1024)
2643        fds = {int(num) for num in output.split(b',')}
2644
2645        self.assertEqual(fds, {0, 1, 2} | frozenset(pass_fds), f"output={output!a}")
2646
2647
2648    def test_stdout_stdin_are_single_inout_fd(self):
2649        with io.open(os.devnull, "r+") as inout:
2650            p = subprocess.Popen(ZERO_RETURN_CMD,
2651                                 stdout=inout, stdin=inout)
2652            p.wait()
2653
2654    def test_stdout_stderr_are_single_inout_fd(self):
2655        with io.open(os.devnull, "r+") as inout:
2656            p = subprocess.Popen(ZERO_RETURN_CMD,
2657                                 stdout=inout, stderr=inout)
2658            p.wait()
2659
2660    def test_stderr_stdin_are_single_inout_fd(self):
2661        with io.open(os.devnull, "r+") as inout:
2662            p = subprocess.Popen(ZERO_RETURN_CMD,
2663                                 stderr=inout, stdin=inout)
2664            p.wait()
2665
2666    def test_wait_when_sigchild_ignored(self):
2667        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
2668        sigchild_ignore = support.findfile("sigchild_ignore.py",
2669                                           subdir="subprocessdata")
2670        p = subprocess.Popen([sys.executable, sigchild_ignore],
2671                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
2672        stdout, stderr = p.communicate()
2673        self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
2674                         " non-zero with this error:\n%s" %
2675                         stderr.decode('utf-8'))
2676
2677    def test_select_unbuffered(self):
2678        # Issue #11459: bufsize=0 should really set the pipes as
2679        # unbuffered (and therefore let select() work properly).
2680        select = support.import_module("select")
2681        p = subprocess.Popen([sys.executable, "-c",
2682                              'import sys;'
2683                              'sys.stdout.write("apple")'],
2684                             stdout=subprocess.PIPE,
2685                             bufsize=0)
2686        f = p.stdout
2687        self.addCleanup(f.close)
2688        try:
2689            self.assertEqual(f.read(4), b"appl")
2690            self.assertIn(f, select.select([f], [], [], 0.0)[0])
2691        finally:
2692            p.wait()
2693
2694    def test_zombie_fast_process_del(self):
2695        # Issue #12650: on Unix, if Popen.__del__() was called before the
2696        # process exited, it wouldn't be added to subprocess._active, and would
2697        # remain a zombie.
2698        # spawn a Popen, and delete its reference before it exits
2699        p = subprocess.Popen([sys.executable, "-c",
2700                              'import sys, time;'
2701                              'time.sleep(0.2)'],
2702                             stdout=subprocess.PIPE,
2703                             stderr=subprocess.PIPE)
2704        self.addCleanup(p.stdout.close)
2705        self.addCleanup(p.stderr.close)
2706        ident = id(p)
2707        pid = p.pid
2708        with support.check_warnings(('', ResourceWarning)):
2709            p = None
2710
2711        if mswindows:
2712            # subprocess._active is not used on Windows and is set to None.
2713            self.assertIsNone(subprocess._active)
2714        else:
2715            # check that p is in the active processes list
2716            self.assertIn(ident, [id(o) for o in subprocess._active])
2717
2718    def test_leak_fast_process_del_killed(self):
2719        # Issue #12650: on Unix, if Popen.__del__() was called before the
2720        # process exited, and the process got killed by a signal, it would never
2721        # be removed from subprocess._active, which triggered a FD and memory
2722        # leak.
2723        # spawn a Popen, delete its reference and kill it
2724        p = subprocess.Popen([sys.executable, "-c",
2725                              'import time;'
2726                              'time.sleep(3)'],
2727                             stdout=subprocess.PIPE,
2728                             stderr=subprocess.PIPE)
2729        self.addCleanup(p.stdout.close)
2730        self.addCleanup(p.stderr.close)
2731        ident = id(p)
2732        pid = p.pid
2733        with support.check_warnings(('', ResourceWarning)):
2734            p = None
2735
2736        os.kill(pid, signal.SIGKILL)
2737        if mswindows:
2738            # subprocess._active is not used on Windows and is set to None.
2739            self.assertIsNone(subprocess._active)
2740        else:
2741            # check that p is in the active processes list
2742            self.assertIn(ident, [id(o) for o in subprocess._active])
2743
2744        # let some time for the process to exit, and create a new Popen: this
2745        # should trigger the wait() of p
2746        time.sleep(0.2)
2747        with self.assertRaises(OSError):
2748            with subprocess.Popen(NONEXISTING_CMD,
2749                                  stdout=subprocess.PIPE,
2750                                  stderr=subprocess.PIPE) as proc:
2751                pass
2752        # p should have been wait()ed on, and removed from the _active list
2753        self.assertRaises(OSError, os.waitpid, pid, 0)
2754        if mswindows:
2755            # subprocess._active is not used on Windows and is set to None.
2756            self.assertIsNone(subprocess._active)
2757        else:
2758            self.assertNotIn(ident, [id(o) for o in subprocess._active])
2759
2760    def test_close_fds_after_preexec(self):
2761        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2762
2763        # this FD is used as dup2() target by preexec_fn, and should be closed
2764        # in the child process
2765        fd = os.dup(1)
2766        self.addCleanup(os.close, fd)
2767
2768        p = subprocess.Popen([sys.executable, fd_status],
2769                             stdout=subprocess.PIPE, close_fds=True,
2770                             preexec_fn=lambda: os.dup2(1, fd))
2771        output, ignored = p.communicate()
2772
2773        remaining_fds = set(map(int, output.split(b',')))
2774
2775        self.assertNotIn(fd, remaining_fds)
2776
2777    @support.cpython_only
2778    def test_fork_exec(self):
2779        # Issue #22290: fork_exec() must not crash on memory allocation failure
2780        # or other errors
2781        import _posixsubprocess
2782        gc_enabled = gc.isenabled()
2783        try:
2784            # Use a preexec function and enable the garbage collector
2785            # to force fork_exec() to re-enable the garbage collector
2786            # on error.
2787            func = lambda: None
2788            gc.enable()
2789
2790            for args, exe_list, cwd, env_list in (
2791                (123,      [b"exe"], None, [b"env"]),
2792                ([b"arg"], 123,      None, [b"env"]),
2793                ([b"arg"], [b"exe"], 123,  [b"env"]),
2794                ([b"arg"], [b"exe"], None, 123),
2795            ):
2796                with self.assertRaises(TypeError):
2797                    _posixsubprocess.fork_exec(
2798                        args, exe_list,
2799                        True, (), cwd, env_list,
2800                        -1, -1, -1, -1,
2801                        1, 2, 3, 4,
2802                        True, True, func)
2803        finally:
2804            if not gc_enabled:
2805                gc.disable()
2806
2807    @support.cpython_only
2808    def test_fork_exec_sorted_fd_sanity_check(self):
2809        # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check.
2810        import _posixsubprocess
2811        class BadInt:
2812            first = True
2813            def __init__(self, value):
2814                self.value = value
2815            def __int__(self):
2816                if self.first:
2817                    self.first = False
2818                    return self.value
2819                raise ValueError
2820
2821        gc_enabled = gc.isenabled()
2822        try:
2823            gc.enable()
2824
2825            for fds_to_keep in (
2826                (-1, 2, 3, 4, 5),  # Negative number.
2827                ('str', 4),  # Not an int.
2828                (18, 23, 42, 2**63),  # Out of range.
2829                (5, 4),  # Not sorted.
2830                (6, 7, 7, 8),  # Duplicate.
2831                (BadInt(1), BadInt(2)),
2832            ):
2833                with self.assertRaises(
2834                        ValueError,
2835                        msg='fds_to_keep={}'.format(fds_to_keep)) as c:
2836                    _posixsubprocess.fork_exec(
2837                        [b"false"], [b"false"],
2838                        True, fds_to_keep, None, [b"env"],
2839                        -1, -1, -1, -1,
2840                        1, 2, 3, 4,
2841                        True, True, None)
2842                self.assertIn('fds_to_keep', str(c.exception))
2843        finally:
2844            if not gc_enabled:
2845                gc.disable()
2846
2847    def test_communicate_BrokenPipeError_stdin_close(self):
2848        # By not setting stdout or stderr or a timeout we force the fast path
2849        # that just calls _stdin_write() internally due to our mock.
2850        proc = subprocess.Popen(ZERO_RETURN_CMD)
2851        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
2852            mock_proc_stdin.close.side_effect = BrokenPipeError
2853            proc.communicate()  # Should swallow BrokenPipeError from close.
2854            mock_proc_stdin.close.assert_called_with()
2855
2856    def test_communicate_BrokenPipeError_stdin_write(self):
2857        # By not setting stdout or stderr or a timeout we force the fast path
2858        # that just calls _stdin_write() internally due to our mock.
2859        proc = subprocess.Popen(ZERO_RETURN_CMD)
2860        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
2861            mock_proc_stdin.write.side_effect = BrokenPipeError
2862            proc.communicate(b'stuff')  # Should swallow the BrokenPipeError.
2863            mock_proc_stdin.write.assert_called_once_with(b'stuff')
2864            mock_proc_stdin.close.assert_called_once_with()
2865
2866    def test_communicate_BrokenPipeError_stdin_flush(self):
2867        # Setting stdin and stdout forces the ._communicate() code path.
2868        # python -h exits faster than python -c pass (but spams stdout).
2869        proc = subprocess.Popen([sys.executable, '-h'],
2870                                stdin=subprocess.PIPE,
2871                                stdout=subprocess.PIPE)
2872        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin, \
2873                open(os.devnull, 'wb') as dev_null:
2874            mock_proc_stdin.flush.side_effect = BrokenPipeError
2875            # because _communicate registers a selector using proc.stdin...
2876            mock_proc_stdin.fileno.return_value = dev_null.fileno()
2877            # _communicate() should swallow BrokenPipeError from flush.
2878            proc.communicate(b'stuff')
2879            mock_proc_stdin.flush.assert_called_once_with()
2880
2881    def test_communicate_BrokenPipeError_stdin_close_with_timeout(self):
2882        # Setting stdin and stdout forces the ._communicate() code path.
2883        # python -h exits faster than python -c pass (but spams stdout).
2884        proc = subprocess.Popen([sys.executable, '-h'],
2885                                stdin=subprocess.PIPE,
2886                                stdout=subprocess.PIPE)
2887        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
2888            mock_proc_stdin.close.side_effect = BrokenPipeError
2889            # _communicate() should swallow BrokenPipeError from close.
2890            proc.communicate(timeout=999)
2891            mock_proc_stdin.close.assert_called_once_with()
2892
2893    @unittest.skipUnless(_testcapi is not None
2894                         and hasattr(_testcapi, 'W_STOPCODE'),
2895                         'need _testcapi.W_STOPCODE')
2896    def test_stopped(self):
2897        """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335."""
2898        args = ZERO_RETURN_CMD
2899        proc = subprocess.Popen(args)
2900
2901        # Wait until the real process completes to avoid zombie process
2902        pid = proc.pid
2903        pid, status = os.waitpid(pid, 0)
2904        self.assertEqual(status, 0)
2905
2906        status = _testcapi.W_STOPCODE(3)
2907        with mock.patch('subprocess.os.waitpid', return_value=(pid, status)):
2908            returncode = proc.wait()
2909
2910        self.assertEqual(returncode, -3)
2911
2912    def test_communicate_repeated_call_after_stdout_close(self):
2913        proc = subprocess.Popen([sys.executable, '-c',
2914                                 'import os, time; os.close(1), time.sleep(2)'],
2915                                stdout=subprocess.PIPE)
2916        while True:
2917            try:
2918                proc.communicate(timeout=0.1)
2919                return
2920            except subprocess.TimeoutExpired:
2921                pass
2922
2923
2924@unittest.skipUnless(mswindows, "Windows specific tests")
2925class Win32ProcessTestCase(BaseTestCase):
2926
2927    def test_startupinfo(self):
2928        # startupinfo argument
2929        # We uses hardcoded constants, because we do not want to
2930        # depend on win32all.
2931        STARTF_USESHOWWINDOW = 1
2932        SW_MAXIMIZE = 3
2933        startupinfo = subprocess.STARTUPINFO()
2934        startupinfo.dwFlags = STARTF_USESHOWWINDOW
2935        startupinfo.wShowWindow = SW_MAXIMIZE
2936        # Since Python is a console process, it won't be affected
2937        # by wShowWindow, but the argument should be silently
2938        # ignored
2939        subprocess.call(ZERO_RETURN_CMD,
2940                        startupinfo=startupinfo)
2941
2942    def test_startupinfo_keywords(self):
2943        # startupinfo argument
2944        # We use hardcoded constants, because we do not want to
2945        # depend on win32all.
2946        STARTF_USERSHOWWINDOW = 1
2947        SW_MAXIMIZE = 3
2948        startupinfo = subprocess.STARTUPINFO(
2949            dwFlags=STARTF_USERSHOWWINDOW,
2950            wShowWindow=SW_MAXIMIZE
2951        )
2952        # Since Python is a console process, it won't be affected
2953        # by wShowWindow, but the argument should be silently
2954        # ignored
2955        subprocess.call(ZERO_RETURN_CMD,
2956                        startupinfo=startupinfo)
2957
2958    def test_startupinfo_copy(self):
2959        # bpo-34044: Popen must not modify input STARTUPINFO structure
2960        startupinfo = subprocess.STARTUPINFO()
2961        startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW
2962        startupinfo.wShowWindow = subprocess.SW_HIDE
2963
2964        # Call Popen() twice with the same startupinfo object to make sure
2965        # that it's not modified
2966        for _ in range(2):
2967            cmd = ZERO_RETURN_CMD
2968            with open(os.devnull, 'w') as null:
2969                proc = subprocess.Popen(cmd,
2970                                        stdout=null,
2971                                        stderr=subprocess.STDOUT,
2972                                        startupinfo=startupinfo)
2973                with proc:
2974                    proc.communicate()
2975                self.assertEqual(proc.returncode, 0)
2976
2977            self.assertEqual(startupinfo.dwFlags,
2978                             subprocess.STARTF_USESHOWWINDOW)
2979            self.assertIsNone(startupinfo.hStdInput)
2980            self.assertIsNone(startupinfo.hStdOutput)
2981            self.assertIsNone(startupinfo.hStdError)
2982            self.assertEqual(startupinfo.wShowWindow, subprocess.SW_HIDE)
2983            self.assertEqual(startupinfo.lpAttributeList, {"handle_list": []})
2984
2985    def test_creationflags(self):
2986        # creationflags argument
2987        CREATE_NEW_CONSOLE = 16
2988        sys.stderr.write("    a DOS box should flash briefly ...\n")
2989        subprocess.call(sys.executable +
2990                        ' -c "import time; time.sleep(0.25)"',
2991                        creationflags=CREATE_NEW_CONSOLE)
2992
2993    def test_invalid_args(self):
2994        # invalid arguments should raise ValueError
2995        self.assertRaises(ValueError, subprocess.call,
2996                          [sys.executable, "-c",
2997                           "import sys; sys.exit(47)"],
2998                          preexec_fn=lambda: 1)
2999
3000    @support.cpython_only
3001    def test_issue31471(self):
3002        # There shouldn't be an assertion failure in Popen() in case the env
3003        # argument has a bad keys() method.
3004        class BadEnv(dict):
3005            keys = None
3006        with self.assertRaises(TypeError):
3007            subprocess.Popen(ZERO_RETURN_CMD, env=BadEnv())
3008
3009    def test_close_fds(self):
3010        # close file descriptors
3011        rc = subprocess.call([sys.executable, "-c",
3012                              "import sys; sys.exit(47)"],
3013                              close_fds=True)
3014        self.assertEqual(rc, 47)
3015
3016    def test_close_fds_with_stdio(self):
3017        import msvcrt
3018
3019        fds = os.pipe()
3020        self.addCleanup(os.close, fds[0])
3021        self.addCleanup(os.close, fds[1])
3022
3023        handles = []
3024        for fd in fds:
3025            os.set_inheritable(fd, True)
3026            handles.append(msvcrt.get_osfhandle(fd))
3027
3028        p = subprocess.Popen([sys.executable, "-c",
3029                              "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3030                             stdout=subprocess.PIPE, close_fds=False)
3031        stdout, stderr = p.communicate()
3032        self.assertEqual(p.returncode, 0)
3033        int(stdout.strip())  # Check that stdout is an integer
3034
3035        p = subprocess.Popen([sys.executable, "-c",
3036                              "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3037                             stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
3038        stdout, stderr = p.communicate()
3039        self.assertEqual(p.returncode, 1)
3040        self.assertIn(b"OSError", stderr)
3041
3042        # The same as the previous call, but with an empty handle_list
3043        handle_list = []
3044        startupinfo = subprocess.STARTUPINFO()
3045        startupinfo.lpAttributeList = {"handle_list": handle_list}
3046        p = subprocess.Popen([sys.executable, "-c",
3047                              "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3048                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
3049                             startupinfo=startupinfo, close_fds=True)
3050        stdout, stderr = p.communicate()
3051        self.assertEqual(p.returncode, 1)
3052        self.assertIn(b"OSError", stderr)
3053
3054        # Check for a warning due to using handle_list and close_fds=False
3055        with support.check_warnings((".*overriding close_fds", RuntimeWarning)):
3056            startupinfo = subprocess.STARTUPINFO()
3057            startupinfo.lpAttributeList = {"handle_list": handles[:]}
3058            p = subprocess.Popen([sys.executable, "-c",
3059                                  "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3060                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
3061                                 startupinfo=startupinfo, close_fds=False)
3062            stdout, stderr = p.communicate()
3063            self.assertEqual(p.returncode, 0)
3064
3065    def test_empty_attribute_list(self):
3066        startupinfo = subprocess.STARTUPINFO()
3067        startupinfo.lpAttributeList = {}
3068        subprocess.call(ZERO_RETURN_CMD,
3069                        startupinfo=startupinfo)
3070
3071    def test_empty_handle_list(self):
3072        startupinfo = subprocess.STARTUPINFO()
3073        startupinfo.lpAttributeList = {"handle_list": []}
3074        subprocess.call(ZERO_RETURN_CMD,
3075                        startupinfo=startupinfo)
3076
3077    def test_shell_sequence(self):
3078        # Run command through the shell (sequence)
3079        newenv = os.environ.copy()
3080        newenv["FRUIT"] = "physalis"
3081        p = subprocess.Popen(["set"], shell=1,
3082                             stdout=subprocess.PIPE,
3083                             env=newenv)
3084        with p:
3085            self.assertIn(b"physalis", p.stdout.read())
3086
3087    def test_shell_string(self):
3088        # Run command through the shell (string)
3089        newenv = os.environ.copy()
3090        newenv["FRUIT"] = "physalis"
3091        p = subprocess.Popen("set", shell=1,
3092                             stdout=subprocess.PIPE,
3093                             env=newenv)
3094        with p:
3095            self.assertIn(b"physalis", p.stdout.read())
3096
3097    def test_shell_encodings(self):
3098        # Run command through the shell (string)
3099        for enc in ['ansi', 'oem']:
3100            newenv = os.environ.copy()
3101            newenv["FRUIT"] = "physalis"
3102            p = subprocess.Popen("set", shell=1,
3103                                 stdout=subprocess.PIPE,
3104                                 env=newenv,
3105                                 encoding=enc)
3106            with p:
3107                self.assertIn("physalis", p.stdout.read(), enc)
3108
3109    def test_call_string(self):
3110        # call() function with string argument on Windows
3111        rc = subprocess.call(sys.executable +
3112                             ' -c "import sys; sys.exit(47)"')
3113        self.assertEqual(rc, 47)
3114
3115    def _kill_process(self, method, *args):
3116        # Some win32 buildbot raises EOFError if stdin is inherited
3117        p = subprocess.Popen([sys.executable, "-c", """if 1:
3118                             import sys, time
3119                             sys.stdout.write('x\\n')
3120                             sys.stdout.flush()
3121                             time.sleep(30)
3122                             """],
3123                             stdin=subprocess.PIPE,
3124                             stdout=subprocess.PIPE,
3125                             stderr=subprocess.PIPE)
3126        with p:
3127            # Wait for the interpreter to be completely initialized before
3128            # sending any signal.
3129            p.stdout.read(1)
3130            getattr(p, method)(*args)
3131            _, stderr = p.communicate()
3132            self.assertStderrEqual(stderr, b'')
3133            returncode = p.wait()
3134        self.assertNotEqual(returncode, 0)
3135
3136    def _kill_dead_process(self, method, *args):
3137        p = subprocess.Popen([sys.executable, "-c", """if 1:
3138                             import sys, time
3139                             sys.stdout.write('x\\n')
3140                             sys.stdout.flush()
3141                             sys.exit(42)
3142                             """],
3143                             stdin=subprocess.PIPE,
3144                             stdout=subprocess.PIPE,
3145                             stderr=subprocess.PIPE)
3146        with p:
3147            # Wait for the interpreter to be completely initialized before
3148            # sending any signal.
3149            p.stdout.read(1)
3150            # The process should end after this
3151            time.sleep(1)
3152            # This shouldn't raise even though the child is now dead
3153            getattr(p, method)(*args)
3154            _, stderr = p.communicate()
3155            self.assertStderrEqual(stderr, b'')
3156            rc = p.wait()
3157        self.assertEqual(rc, 42)
3158
3159    def test_send_signal(self):
3160        self._kill_process('send_signal', signal.SIGTERM)
3161
3162    def test_kill(self):
3163        self._kill_process('kill')
3164
3165    def test_terminate(self):
3166        self._kill_process('terminate')
3167
3168    def test_send_signal_dead(self):
3169        self._kill_dead_process('send_signal', signal.SIGTERM)
3170
3171    def test_kill_dead(self):
3172        self._kill_dead_process('kill')
3173
3174    def test_terminate_dead(self):
3175        self._kill_dead_process('terminate')
3176
3177class MiscTests(unittest.TestCase):
3178
3179    class RecordingPopen(subprocess.Popen):
3180        """A Popen that saves a reference to each instance for testing."""
3181        instances_created = []
3182
3183        def __init__(self, *args, **kwargs):
3184            super().__init__(*args, **kwargs)
3185            self.instances_created.append(self)
3186
3187    @mock.patch.object(subprocess.Popen, "_communicate")
3188    def _test_keyboardinterrupt_no_kill(self, popener, mock__communicate,
3189                                        **kwargs):
3190        """Fake a SIGINT happening during Popen._communicate() and ._wait().
3191
3192        This avoids the need to actually try and get test environments to send
3193        and receive signals reliably across platforms.  The net effect of a ^C
3194        happening during a blocking subprocess execution which we want to clean
3195        up from is a KeyboardInterrupt coming out of communicate() or wait().
3196        """
3197
3198        mock__communicate.side_effect = KeyboardInterrupt
3199        try:
3200            with mock.patch.object(subprocess.Popen, "_wait") as mock__wait:
3201                # We patch out _wait() as no signal was involved so the
3202                # child process isn't actually going to exit rapidly.
3203                mock__wait.side_effect = KeyboardInterrupt
3204                with mock.patch.object(subprocess, "Popen",
3205                                       self.RecordingPopen):
3206                    with self.assertRaises(KeyboardInterrupt):
3207                        popener([sys.executable, "-c",
3208                                 "import time\ntime.sleep(9)\nimport sys\n"
3209                                 "sys.stderr.write('\\n!runaway child!\\n')"],
3210                                stdout=subprocess.DEVNULL, **kwargs)
3211                for call in mock__wait.call_args_list[1:]:
3212                    self.assertNotEqual(
3213                            call, mock.call(timeout=None),
3214                            "no open-ended wait() after the first allowed: "
3215                            f"{mock__wait.call_args_list}")
3216                sigint_calls = []
3217                for call in mock__wait.call_args_list:
3218                    if call == mock.call(timeout=0.25):  # from Popen.__init__
3219                        sigint_calls.append(call)
3220                self.assertLessEqual(mock__wait.call_count, 2,
3221                                     msg=mock__wait.call_args_list)
3222                self.assertEqual(len(sigint_calls), 1,
3223                                 msg=mock__wait.call_args_list)
3224        finally:
3225            # cleanup the forgotten (due to our mocks) child process
3226            process = self.RecordingPopen.instances_created.pop()
3227            process.kill()
3228            process.wait()
3229            self.assertEqual([], self.RecordingPopen.instances_created)
3230
3231    def test_call_keyboardinterrupt_no_kill(self):
3232        self._test_keyboardinterrupt_no_kill(subprocess.call, timeout=6.282)
3233
3234    def test_run_keyboardinterrupt_no_kill(self):
3235        self._test_keyboardinterrupt_no_kill(subprocess.run, timeout=6.282)
3236
3237    def test_context_manager_keyboardinterrupt_no_kill(self):
3238        def popen_via_context_manager(*args, **kwargs):
3239            with subprocess.Popen(*args, **kwargs) as unused_process:
3240                raise KeyboardInterrupt  # Test how __exit__ handles ^C.
3241        self._test_keyboardinterrupt_no_kill(popen_via_context_manager)
3242
3243    def test_getoutput(self):
3244        self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
3245        self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
3246                         (0, 'xyzzy'))
3247
3248        # we use mkdtemp in the next line to create an empty directory
3249        # under our exclusive control; from that, we can invent a pathname
3250        # that we _know_ won't exist.  This is guaranteed to fail.
3251        dir = None
3252        try:
3253            dir = tempfile.mkdtemp()
3254            name = os.path.join(dir, "foo")
3255            status, output = subprocess.getstatusoutput(
3256                ("type " if mswindows else "cat ") + name)
3257            self.assertNotEqual(status, 0)
3258        finally:
3259            if dir is not None:
3260                os.rmdir(dir)
3261
3262    def test__all__(self):
3263        """Ensure that __all__ is populated properly."""
3264        intentionally_excluded = {"list2cmdline", "Handle"}
3265        exported = set(subprocess.__all__)
3266        possible_exports = set()
3267        import types
3268        for name, value in subprocess.__dict__.items():
3269            if name.startswith('_'):
3270                continue
3271            if isinstance(value, (types.ModuleType,)):
3272                continue
3273            possible_exports.add(name)
3274        self.assertEqual(exported, possible_exports - intentionally_excluded)
3275
3276
3277@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
3278                     "Test needs selectors.PollSelector")
3279class ProcessTestCaseNoPoll(ProcessTestCase):
3280    def setUp(self):
3281        self.orig_selector = subprocess._PopenSelector
3282        subprocess._PopenSelector = selectors.SelectSelector
3283        ProcessTestCase.setUp(self)
3284
3285    def tearDown(self):
3286        subprocess._PopenSelector = self.orig_selector
3287        ProcessTestCase.tearDown(self)
3288
3289
3290@unittest.skipUnless(mswindows, "Windows-specific tests")
3291class CommandsWithSpaces (BaseTestCase):
3292
3293    def setUp(self):
3294        super().setUp()
3295        f, fname = tempfile.mkstemp(".py", "te st")
3296        self.fname = fname.lower ()
3297        os.write(f, b"import sys;"
3298                    b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
3299        )
3300        os.close(f)
3301
3302    def tearDown(self):
3303        os.remove(self.fname)
3304        super().tearDown()
3305
3306    def with_spaces(self, *args, **kwargs):
3307        kwargs['stdout'] = subprocess.PIPE
3308        p = subprocess.Popen(*args, **kwargs)
3309        with p:
3310            self.assertEqual(
3311              p.stdout.read ().decode("mbcs"),
3312              "2 [%r, 'ab cd']" % self.fname
3313            )
3314
3315    def test_shell_string_with_spaces(self):
3316        # call() function with string argument with spaces on Windows
3317        self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
3318                                             "ab cd"), shell=1)
3319
3320    def test_shell_sequence_with_spaces(self):
3321        # call() function with sequence argument with spaces on Windows
3322        self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)
3323
3324    def test_noshell_string_with_spaces(self):
3325        # call() function with string argument with spaces on Windows
3326        self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
3327                             "ab cd"))
3328
3329    def test_noshell_sequence_with_spaces(self):
3330        # call() function with sequence argument with spaces on Windows
3331        self.with_spaces([sys.executable, self.fname, "ab cd"])
3332
3333
3334class ContextManagerTests(BaseTestCase):
3335
3336    def test_pipe(self):
3337        with subprocess.Popen([sys.executable, "-c",
3338                               "import sys;"
3339                               "sys.stdout.write('stdout');"
3340                               "sys.stderr.write('stderr');"],
3341                              stdout=subprocess.PIPE,
3342                              stderr=subprocess.PIPE) as proc:
3343            self.assertEqual(proc.stdout.read(), b"stdout")
3344            self.assertStderrEqual(proc.stderr.read(), b"stderr")
3345
3346        self.assertTrue(proc.stdout.closed)
3347        self.assertTrue(proc.stderr.closed)
3348
3349    def test_returncode(self):
3350        with subprocess.Popen([sys.executable, "-c",
3351                               "import sys; sys.exit(100)"]) as proc:
3352            pass
3353        # __exit__ calls wait(), so the returncode should be set
3354        self.assertEqual(proc.returncode, 100)
3355
3356    def test_communicate_stdin(self):
3357        with subprocess.Popen([sys.executable, "-c",
3358                              "import sys;"
3359                              "sys.exit(sys.stdin.read() == 'context')"],
3360                             stdin=subprocess.PIPE) as proc:
3361            proc.communicate(b"context")
3362            self.assertEqual(proc.returncode, 1)
3363
3364    def test_invalid_args(self):
3365        with self.assertRaises(NONEXISTING_ERRORS):
3366            with subprocess.Popen(NONEXISTING_CMD,
3367                                  stdout=subprocess.PIPE,
3368                                  stderr=subprocess.PIPE) as proc:
3369                pass
3370
3371    def test_broken_pipe_cleanup(self):
3372        """Broken pipe error should not prevent wait() (Issue 21619)"""
3373        proc = subprocess.Popen(ZERO_RETURN_CMD,
3374                                stdin=subprocess.PIPE,
3375                                bufsize=support.PIPE_MAX_SIZE*2)
3376        proc = proc.__enter__()
3377        # Prepare to send enough data to overflow any OS pipe buffering and
3378        # guarantee a broken pipe error. Data is held in BufferedWriter
3379        # buffer until closed.
3380        proc.stdin.write(b'x' * support.PIPE_MAX_SIZE)
3381        self.assertIsNone(proc.returncode)
3382        # EPIPE expected under POSIX; EINVAL under Windows
3383        self.assertRaises(OSError, proc.__exit__, None, None, None)
3384        self.assertEqual(proc.returncode, 0)
3385        self.assertTrue(proc.stdin.closed)
3386
3387
3388if __name__ == "__main__":
3389    unittest.main()
3390