• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2from unittest import mock
3from test import support
4from test.support import import_helper
5from test.support import os_helper
6from test.support import warnings_helper
7import subprocess
8import sys
9import signal
10import io
11import itertools
12import os
13import errno
14import tempfile
15import time
16import traceback
17import types
18import selectors
19import sysconfig
20import select
21import shutil
22import threading
23import gc
24import textwrap
25import json
26import pathlib
27from test.support.os_helper import FakePath
28
29try:
30    import _testcapi
31except ImportError:
32    _testcapi = None
33
34try:
35    import pwd
36except ImportError:
37    pwd = None
38try:
39    import grp
40except ImportError:
41    grp = None
42
43try:
44    import fcntl
45except:
46    fcntl = None
47
48if support.PGO:
49    raise unittest.SkipTest("test is not helpful for PGO")
50
51mswindows = (sys.platform == "win32")
52
53#
54# Depends on the following external programs: Python
55#
56
57if mswindows:
58    SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
59                                                'os.O_BINARY);')
60else:
61    SETBINARY = ''
62
63NONEXISTING_CMD = ('nonexisting_i_hope',)
64# Ignore errors that indicate the command was not found
65NONEXISTING_ERRORS = (FileNotFoundError, NotADirectoryError, PermissionError)
66
67ZERO_RETURN_CMD = (sys.executable, '-c', 'pass')
68
69
70def setUpModule():
71    shell_true = shutil.which('true')
72    if shell_true is None:
73        return
74    if (os.access(shell_true, os.X_OK) and
75        subprocess.run([shell_true]).returncode == 0):
76        global ZERO_RETURN_CMD
77        ZERO_RETURN_CMD = (shell_true,)  # Faster than Python startup.
78
79
80class BaseTestCase(unittest.TestCase):
81    def setUp(self):
82        # Try to minimize the number of children we have so this test
83        # doesn't crash on some buildbots (Alphas in particular).
84        support.reap_children()
85
86    def tearDown(self):
87        if not mswindows:
88            # subprocess._active is not used on Windows and is set to None.
89            for inst in subprocess._active:
90                inst.wait()
91            subprocess._cleanup()
92            self.assertFalse(
93                subprocess._active, "subprocess._active not empty"
94            )
95        self.doCleanups()
96        support.reap_children()
97
98
99class PopenTestException(Exception):
100    pass
101
102
103class PopenExecuteChildRaises(subprocess.Popen):
104    """Popen subclass for testing cleanup of subprocess.PIPE filehandles when
105    _execute_child fails.
106    """
107    def _execute_child(self, *args, **kwargs):
108        raise PopenTestException("Forced Exception for Test")
109
110
111class ProcessTestCase(BaseTestCase):
112
113    def test_io_buffered_by_default(self):
114        p = subprocess.Popen(ZERO_RETURN_CMD,
115                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
116                             stderr=subprocess.PIPE)
117        try:
118            self.assertIsInstance(p.stdin, io.BufferedIOBase)
119            self.assertIsInstance(p.stdout, io.BufferedIOBase)
120            self.assertIsInstance(p.stderr, io.BufferedIOBase)
121        finally:
122            p.stdin.close()
123            p.stdout.close()
124            p.stderr.close()
125            p.wait()
126
127    def test_io_unbuffered_works(self):
128        p = subprocess.Popen(ZERO_RETURN_CMD,
129                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
130                             stderr=subprocess.PIPE, bufsize=0)
131        try:
132            self.assertIsInstance(p.stdin, io.RawIOBase)
133            self.assertIsInstance(p.stdout, io.RawIOBase)
134            self.assertIsInstance(p.stderr, io.RawIOBase)
135        finally:
136            p.stdin.close()
137            p.stdout.close()
138            p.stderr.close()
139            p.wait()
140
141    def test_call_seq(self):
142        # call() function with sequence argument
143        rc = subprocess.call([sys.executable, "-c",
144                              "import sys; sys.exit(47)"])
145        self.assertEqual(rc, 47)
146
147    def test_call_timeout(self):
148        # call() function with timeout argument; we want to test that the child
149        # process gets killed when the timeout expires.  If the child isn't
150        # killed, this call will deadlock since subprocess.call waits for the
151        # child.
152        self.assertRaises(subprocess.TimeoutExpired, subprocess.call,
153                          [sys.executable, "-c", "while True: pass"],
154                          timeout=0.1)
155
156    def test_check_call_zero(self):
157        # check_call() function with zero return code
158        rc = subprocess.check_call(ZERO_RETURN_CMD)
159        self.assertEqual(rc, 0)
160
161    def test_check_call_nonzero(self):
162        # check_call() function with non-zero return code
163        with self.assertRaises(subprocess.CalledProcessError) as c:
164            subprocess.check_call([sys.executable, "-c",
165                                   "import sys; sys.exit(47)"])
166        self.assertEqual(c.exception.returncode, 47)
167
168    def test_check_output(self):
169        # check_output() function with zero return code
170        output = subprocess.check_output(
171                [sys.executable, "-c", "print('BDFL')"])
172        self.assertIn(b'BDFL', output)
173
174    def test_check_output_nonzero(self):
175        # check_call() function with non-zero return code
176        with self.assertRaises(subprocess.CalledProcessError) as c:
177            subprocess.check_output(
178                    [sys.executable, "-c", "import sys; sys.exit(5)"])
179        self.assertEqual(c.exception.returncode, 5)
180
181    def test_check_output_stderr(self):
182        # check_output() function stderr redirected to stdout
183        output = subprocess.check_output(
184                [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
185                stderr=subprocess.STDOUT)
186        self.assertIn(b'BDFL', output)
187
188    def test_check_output_stdin_arg(self):
189        # check_output() can be called with stdin set to a file
190        tf = tempfile.TemporaryFile()
191        self.addCleanup(tf.close)
192        tf.write(b'pear')
193        tf.seek(0)
194        output = subprocess.check_output(
195                [sys.executable, "-c",
196                 "import sys; sys.stdout.write(sys.stdin.read().upper())"],
197                stdin=tf)
198        self.assertIn(b'PEAR', output)
199
200    def test_check_output_input_arg(self):
201        # check_output() can be called with input set to a string
202        output = subprocess.check_output(
203                [sys.executable, "-c",
204                 "import sys; sys.stdout.write(sys.stdin.read().upper())"],
205                input=b'pear')
206        self.assertIn(b'PEAR', output)
207
208    def test_check_output_input_none(self):
209        """input=None has a legacy meaning of input='' on check_output."""
210        output = subprocess.check_output(
211                [sys.executable, "-c",
212                 "import sys; print('XX' if sys.stdin.read() else '')"],
213                input=None)
214        self.assertNotIn(b'XX', output)
215
216    def test_check_output_input_none_text(self):
217        output = subprocess.check_output(
218                [sys.executable, "-c",
219                 "import sys; print('XX' if sys.stdin.read() else '')"],
220                input=None, text=True)
221        self.assertNotIn('XX', output)
222
223    def test_check_output_input_none_universal_newlines(self):
224        output = subprocess.check_output(
225                [sys.executable, "-c",
226                 "import sys; print('XX' if sys.stdin.read() else '')"],
227                input=None, universal_newlines=True)
228        self.assertNotIn('XX', output)
229
230    def test_check_output_stdout_arg(self):
231        # check_output() refuses to accept 'stdout' argument
232        with self.assertRaises(ValueError) as c:
233            output = subprocess.check_output(
234                    [sys.executable, "-c", "print('will not be run')"],
235                    stdout=sys.stdout)
236            self.fail("Expected ValueError when stdout arg supplied.")
237        self.assertIn('stdout', c.exception.args[0])
238
239    def test_check_output_stdin_with_input_arg(self):
240        # check_output() refuses to accept 'stdin' with 'input'
241        tf = tempfile.TemporaryFile()
242        self.addCleanup(tf.close)
243        tf.write(b'pear')
244        tf.seek(0)
245        with self.assertRaises(ValueError) as c:
246            output = subprocess.check_output(
247                    [sys.executable, "-c", "print('will not be run')"],
248                    stdin=tf, input=b'hare')
249            self.fail("Expected ValueError when stdin and input args supplied.")
250        self.assertIn('stdin', c.exception.args[0])
251        self.assertIn('input', c.exception.args[0])
252
253    def test_check_output_timeout(self):
254        # check_output() function with timeout arg
255        with self.assertRaises(subprocess.TimeoutExpired) as c:
256            output = subprocess.check_output(
257                    [sys.executable, "-c",
258                     "import sys, time\n"
259                     "sys.stdout.write('BDFL')\n"
260                     "sys.stdout.flush()\n"
261                     "time.sleep(3600)"],
262                    # Some heavily loaded buildbots (sparc Debian 3.x) require
263                    # this much time to start and print.
264                    timeout=3)
265            self.fail("Expected TimeoutExpired.")
266        self.assertEqual(c.exception.output, b'BDFL')
267
268    def test_call_kwargs(self):
269        # call() function with keyword args
270        newenv = os.environ.copy()
271        newenv["FRUIT"] = "banana"
272        rc = subprocess.call([sys.executable, "-c",
273                              'import sys, os;'
274                              'sys.exit(os.getenv("FRUIT")=="banana")'],
275                             env=newenv)
276        self.assertEqual(rc, 1)
277
278    def test_invalid_args(self):
279        # Popen() called with invalid arguments should raise TypeError
280        # but Popen.__del__ should not complain (issue #12085)
281        with support.captured_stderr() as s:
282            self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
283            argcount = subprocess.Popen.__init__.__code__.co_argcount
284            too_many_args = [0] * (argcount + 1)
285            self.assertRaises(TypeError, subprocess.Popen, *too_many_args)
286        self.assertEqual(s.getvalue(), '')
287
288    def test_stdin_none(self):
289        # .stdin is None when not redirected
290        p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
291                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
292        self.addCleanup(p.stdout.close)
293        self.addCleanup(p.stderr.close)
294        p.wait()
295        self.assertEqual(p.stdin, None)
296
297    def test_stdout_none(self):
298        # .stdout is None when not redirected, and the child's stdout will
299        # be inherited from the parent.  In order to test this we run a
300        # subprocess in a subprocess:
301        # this_test
302        #   \-- subprocess created by this test (parent)
303        #          \-- subprocess created by the parent subprocess (child)
304        # The parent doesn't specify stdout, so the child will use the
305        # parent's stdout.  This test checks that the message printed by the
306        # child goes to the parent stdout.  The parent also checks that the
307        # child's stdout is None.  See #11963.
308        code = ('import sys; from subprocess import Popen, PIPE;'
309                'p = Popen([sys.executable, "-c", "print(\'test_stdout_none\')"],'
310                '          stdin=PIPE, stderr=PIPE);'
311                'p.wait(); assert p.stdout is None;')
312        p = subprocess.Popen([sys.executable, "-c", code],
313                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
314        self.addCleanup(p.stdout.close)
315        self.addCleanup(p.stderr.close)
316        out, err = p.communicate()
317        self.assertEqual(p.returncode, 0, err)
318        self.assertEqual(out.rstrip(), b'test_stdout_none')
319
320    def test_stderr_none(self):
321        # .stderr is None when not redirected
322        p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
323                         stdin=subprocess.PIPE, stdout=subprocess.PIPE)
324        self.addCleanup(p.stdout.close)
325        self.addCleanup(p.stdin.close)
326        p.wait()
327        self.assertEqual(p.stderr, None)
328
329    def _assert_python(self, pre_args, **kwargs):
330        # We include sys.exit() to prevent the test runner from hanging
331        # whenever python is found.
332        args = pre_args + ["import sys; sys.exit(47)"]
333        p = subprocess.Popen(args, **kwargs)
334        p.wait()
335        self.assertEqual(47, p.returncode)
336
337    def test_executable(self):
338        # Check that the executable argument works.
339        #
340        # On Unix (non-Mac and non-Windows), Python looks at args[0] to
341        # determine where its standard library is, so we need the directory
342        # of args[0] to be valid for the Popen() call to Python to succeed.
343        # See also issue #16170 and issue #7774.
344        doesnotexist = os.path.join(os.path.dirname(sys.executable),
345                                    "doesnotexist")
346        self._assert_python([doesnotexist, "-c"], executable=sys.executable)
347
348    def test_bytes_executable(self):
349        doesnotexist = os.path.join(os.path.dirname(sys.executable),
350                                    "doesnotexist")
351        self._assert_python([doesnotexist, "-c"],
352                            executable=os.fsencode(sys.executable))
353
354    def test_pathlike_executable(self):
355        doesnotexist = os.path.join(os.path.dirname(sys.executable),
356                                    "doesnotexist")
357        self._assert_python([doesnotexist, "-c"],
358                            executable=FakePath(sys.executable))
359
360    def test_executable_takes_precedence(self):
361        # Check that the executable argument takes precedence over args[0].
362        #
363        # Verify first that the call succeeds without the executable arg.
364        pre_args = [sys.executable, "-c"]
365        self._assert_python(pre_args)
366        self.assertRaises(NONEXISTING_ERRORS,
367                          self._assert_python, pre_args,
368                          executable=NONEXISTING_CMD[0])
369
370    @unittest.skipIf(mswindows, "executable argument replaces shell")
371    def test_executable_replaces_shell(self):
372        # Check that the executable argument replaces the default shell
373        # when shell=True.
374        self._assert_python([], executable=sys.executable, shell=True)
375
376    @unittest.skipIf(mswindows, "executable argument replaces shell")
377    def test_bytes_executable_replaces_shell(self):
378        self._assert_python([], executable=os.fsencode(sys.executable),
379                            shell=True)
380
381    @unittest.skipIf(mswindows, "executable argument replaces shell")
382    def test_pathlike_executable_replaces_shell(self):
383        self._assert_python([], executable=FakePath(sys.executable),
384                            shell=True)
385
386    # For use in the test_cwd* tests below.
387    def _normalize_cwd(self, cwd):
388        # Normalize an expected cwd (for Tru64 support).
389        # We can't use os.path.realpath since it doesn't expand Tru64 {memb}
390        # strings.  See bug #1063571.
391        with os_helper.change_cwd(cwd):
392            return os.getcwd()
393
394    # For use in the test_cwd* tests below.
395    def _split_python_path(self):
396        # Return normalized (python_dir, python_base).
397        python_path = os.path.realpath(sys.executable)
398        return os.path.split(python_path)
399
400    # For use in the test_cwd* tests below.
401    def _assert_cwd(self, expected_cwd, python_arg, **kwargs):
402        # Invoke Python via Popen, and assert that (1) the call succeeds,
403        # and that (2) the current working directory of the child process
404        # matches *expected_cwd*.
405        p = subprocess.Popen([python_arg, "-c",
406                              "import os, sys; "
407                              "buf = sys.stdout.buffer; "
408                              "buf.write(os.getcwd().encode()); "
409                              "buf.flush(); "
410                              "sys.exit(47)"],
411                              stdout=subprocess.PIPE,
412                              **kwargs)
413        self.addCleanup(p.stdout.close)
414        p.wait()
415        self.assertEqual(47, p.returncode)
416        normcase = os.path.normcase
417        self.assertEqual(normcase(expected_cwd),
418                         normcase(p.stdout.read().decode()))
419
420    def test_cwd(self):
421        # Check that cwd changes the cwd for the child process.
422        temp_dir = tempfile.gettempdir()
423        temp_dir = self._normalize_cwd(temp_dir)
424        self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir)
425
426    def test_cwd_with_bytes(self):
427        temp_dir = tempfile.gettempdir()
428        temp_dir = self._normalize_cwd(temp_dir)
429        self._assert_cwd(temp_dir, sys.executable, cwd=os.fsencode(temp_dir))
430
431    def test_cwd_with_pathlike(self):
432        temp_dir = tempfile.gettempdir()
433        temp_dir = self._normalize_cwd(temp_dir)
434        self._assert_cwd(temp_dir, sys.executable, cwd=FakePath(temp_dir))
435
436    @unittest.skipIf(mswindows, "pending resolution of issue #15533")
437    def test_cwd_with_relative_arg(self):
438        # Check that Popen looks for args[0] relative to cwd if args[0]
439        # is relative.
440        python_dir, python_base = self._split_python_path()
441        rel_python = os.path.join(os.curdir, python_base)
442        with os_helper.temp_cwd() as wrong_dir:
443            # Before calling with the correct cwd, confirm that the call fails
444            # without cwd and with the wrong cwd.
445            self.assertRaises(FileNotFoundError, subprocess.Popen,
446                              [rel_python])
447            self.assertRaises(FileNotFoundError, subprocess.Popen,
448                              [rel_python], cwd=wrong_dir)
449            python_dir = self._normalize_cwd(python_dir)
450            self._assert_cwd(python_dir, rel_python, cwd=python_dir)
451
452    @unittest.skipIf(mswindows, "pending resolution of issue #15533")
453    def test_cwd_with_relative_executable(self):
454        # Check that Popen looks for executable relative to cwd if executable
455        # is relative (and that executable takes precedence over args[0]).
456        python_dir, python_base = self._split_python_path()
457        rel_python = os.path.join(os.curdir, python_base)
458        doesntexist = "somethingyoudonthave"
459        with os_helper.temp_cwd() as wrong_dir:
460            # Before calling with the correct cwd, confirm that the call fails
461            # without cwd and with the wrong cwd.
462            self.assertRaises(FileNotFoundError, subprocess.Popen,
463                              [doesntexist], executable=rel_python)
464            self.assertRaises(FileNotFoundError, subprocess.Popen,
465                              [doesntexist], executable=rel_python,
466                              cwd=wrong_dir)
467            python_dir = self._normalize_cwd(python_dir)
468            self._assert_cwd(python_dir, doesntexist, executable=rel_python,
469                             cwd=python_dir)
470
471    def test_cwd_with_absolute_arg(self):
472        # Check that Popen can find the executable when the cwd is wrong
473        # if args[0] is an absolute path.
474        python_dir, python_base = self._split_python_path()
475        abs_python = os.path.join(python_dir, python_base)
476        rel_python = os.path.join(os.curdir, python_base)
477        with os_helper.temp_dir() as wrong_dir:
478            # Before calling with an absolute path, confirm that using a
479            # relative path fails.
480            self.assertRaises(FileNotFoundError, subprocess.Popen,
481                              [rel_python], cwd=wrong_dir)
482            wrong_dir = self._normalize_cwd(wrong_dir)
483            self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir)
484
485    @unittest.skipIf(sys.base_prefix != sys.prefix,
486                     'Test is not venv-compatible')
487    def test_executable_with_cwd(self):
488        python_dir, python_base = self._split_python_path()
489        python_dir = self._normalize_cwd(python_dir)
490        self._assert_cwd(python_dir, "somethingyoudonthave",
491                         executable=sys.executable, cwd=python_dir)
492
493    @unittest.skipIf(sys.base_prefix != sys.prefix,
494                     'Test is not venv-compatible')
495    @unittest.skipIf(sysconfig.is_python_build(),
496                     "need an installed Python. See #7774")
497    def test_executable_without_cwd(self):
498        # For a normal installation, it should work without 'cwd'
499        # argument.  For test runs in the build directory, see #7774.
500        self._assert_cwd(os.getcwd(), "somethingyoudonthave",
501                         executable=sys.executable)
502
503    def test_stdin_pipe(self):
504        # stdin redirection
505        p = subprocess.Popen([sys.executable, "-c",
506                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
507                        stdin=subprocess.PIPE)
508        p.stdin.write(b"pear")
509        p.stdin.close()
510        p.wait()
511        self.assertEqual(p.returncode, 1)
512
513    def test_stdin_filedes(self):
514        # stdin is set to open file descriptor
515        tf = tempfile.TemporaryFile()
516        self.addCleanup(tf.close)
517        d = tf.fileno()
518        os.write(d, b"pear")
519        os.lseek(d, 0, 0)
520        p = subprocess.Popen([sys.executable, "-c",
521                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
522                         stdin=d)
523        p.wait()
524        self.assertEqual(p.returncode, 1)
525
526    def test_stdin_fileobj(self):
527        # stdin is set to open file object
528        tf = tempfile.TemporaryFile()
529        self.addCleanup(tf.close)
530        tf.write(b"pear")
531        tf.seek(0)
532        p = subprocess.Popen([sys.executable, "-c",
533                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
534                         stdin=tf)
535        p.wait()
536        self.assertEqual(p.returncode, 1)
537
538    def test_stdout_pipe(self):
539        # stdout redirection
540        p = subprocess.Popen([sys.executable, "-c",
541                          'import sys; sys.stdout.write("orange")'],
542                         stdout=subprocess.PIPE)
543        with p:
544            self.assertEqual(p.stdout.read(), b"orange")
545
546    def test_stdout_filedes(self):
547        # stdout is set to open file descriptor
548        tf = tempfile.TemporaryFile()
549        self.addCleanup(tf.close)
550        d = tf.fileno()
551        p = subprocess.Popen([sys.executable, "-c",
552                          'import sys; sys.stdout.write("orange")'],
553                         stdout=d)
554        p.wait()
555        os.lseek(d, 0, 0)
556        self.assertEqual(os.read(d, 1024), b"orange")
557
558    def test_stdout_fileobj(self):
559        # stdout is set to open file object
560        tf = tempfile.TemporaryFile()
561        self.addCleanup(tf.close)
562        p = subprocess.Popen([sys.executable, "-c",
563                          'import sys; sys.stdout.write("orange")'],
564                         stdout=tf)
565        p.wait()
566        tf.seek(0)
567        self.assertEqual(tf.read(), b"orange")
568
569    def test_stderr_pipe(self):
570        # stderr redirection
571        p = subprocess.Popen([sys.executable, "-c",
572                          'import sys; sys.stderr.write("strawberry")'],
573                         stderr=subprocess.PIPE)
574        with p:
575            self.assertEqual(p.stderr.read(), b"strawberry")
576
577    def test_stderr_filedes(self):
578        # stderr is set to open file descriptor
579        tf = tempfile.TemporaryFile()
580        self.addCleanup(tf.close)
581        d = tf.fileno()
582        p = subprocess.Popen([sys.executable, "-c",
583                          'import sys; sys.stderr.write("strawberry")'],
584                         stderr=d)
585        p.wait()
586        os.lseek(d, 0, 0)
587        self.assertEqual(os.read(d, 1024), b"strawberry")
588
589    def test_stderr_fileobj(self):
590        # stderr is set to open file object
591        tf = tempfile.TemporaryFile()
592        self.addCleanup(tf.close)
593        p = subprocess.Popen([sys.executable, "-c",
594                          'import sys; sys.stderr.write("strawberry")'],
595                         stderr=tf)
596        p.wait()
597        tf.seek(0)
598        self.assertEqual(tf.read(), b"strawberry")
599
600    def test_stderr_redirect_with_no_stdout_redirect(self):
601        # test stderr=STDOUT while stdout=None (not set)
602
603        # - grandchild prints to stderr
604        # - child redirects grandchild's stderr to its stdout
605        # - the parent should get grandchild's stderr in child's stdout
606        p = subprocess.Popen([sys.executable, "-c",
607                              'import sys, subprocess;'
608                              'rc = subprocess.call([sys.executable, "-c",'
609                              '    "import sys;"'
610                              '    "sys.stderr.write(\'42\')"],'
611                              '    stderr=subprocess.STDOUT);'
612                              'sys.exit(rc)'],
613                             stdout=subprocess.PIPE,
614                             stderr=subprocess.PIPE)
615        stdout, stderr = p.communicate()
616        #NOTE: stdout should get stderr from grandchild
617        self.assertEqual(stdout, b'42')
618        self.assertEqual(stderr, b'') # should be empty
619        self.assertEqual(p.returncode, 0)
620
621    def test_stdout_stderr_pipe(self):
622        # capture stdout and stderr to the same pipe
623        p = subprocess.Popen([sys.executable, "-c",
624                              'import sys;'
625                              'sys.stdout.write("apple");'
626                              'sys.stdout.flush();'
627                              'sys.stderr.write("orange")'],
628                             stdout=subprocess.PIPE,
629                             stderr=subprocess.STDOUT)
630        with p:
631            self.assertEqual(p.stdout.read(), b"appleorange")
632
633    def test_stdout_stderr_file(self):
634        # capture stdout and stderr to the same open file
635        tf = tempfile.TemporaryFile()
636        self.addCleanup(tf.close)
637        p = subprocess.Popen([sys.executable, "-c",
638                              'import sys;'
639                              'sys.stdout.write("apple");'
640                              'sys.stdout.flush();'
641                              'sys.stderr.write("orange")'],
642                             stdout=tf,
643                             stderr=tf)
644        p.wait()
645        tf.seek(0)
646        self.assertEqual(tf.read(), b"appleorange")
647
648    def test_stdout_filedes_of_stdout(self):
649        # stdout is set to 1 (#1531862).
650        # To avoid printing the text on stdout, we do something similar to
651        # test_stdout_none (see above).  The parent subprocess calls the child
652        # subprocess passing stdout=1, and this test uses stdout=PIPE in
653        # order to capture and check the output of the parent. See #11963.
654        code = ('import sys, subprocess; '
655                'rc = subprocess.call([sys.executable, "-c", '
656                '    "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
657                     'b\'test with stdout=1\'))"], stdout=1); '
658                'assert rc == 18')
659        p = subprocess.Popen([sys.executable, "-c", code],
660                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
661        self.addCleanup(p.stdout.close)
662        self.addCleanup(p.stderr.close)
663        out, err = p.communicate()
664        self.assertEqual(p.returncode, 0, err)
665        self.assertEqual(out.rstrip(), b'test with stdout=1')
666
667    def test_stdout_devnull(self):
668        p = subprocess.Popen([sys.executable, "-c",
669                              'for i in range(10240):'
670                              'print("x" * 1024)'],
671                              stdout=subprocess.DEVNULL)
672        p.wait()
673        self.assertEqual(p.stdout, None)
674
675    def test_stderr_devnull(self):
676        p = subprocess.Popen([sys.executable, "-c",
677                              'import sys\n'
678                              'for i in range(10240):'
679                              'sys.stderr.write("x" * 1024)'],
680                              stderr=subprocess.DEVNULL)
681        p.wait()
682        self.assertEqual(p.stderr, None)
683
684    def test_stdin_devnull(self):
685        p = subprocess.Popen([sys.executable, "-c",
686                              'import sys;'
687                              'sys.stdin.read(1)'],
688                              stdin=subprocess.DEVNULL)
689        p.wait()
690        self.assertEqual(p.stdin, None)
691
692    @unittest.skipUnless(fcntl and hasattr(fcntl, 'F_GETPIPE_SZ'),
693                         'fcntl.F_GETPIPE_SZ required for test.')
694    def test_pipesizes(self):
695        test_pipe_r, test_pipe_w = os.pipe()
696        try:
697            # Get the default pipesize with F_GETPIPE_SZ
698            pipesize_default = fcntl.fcntl(test_pipe_w, fcntl.F_GETPIPE_SZ)
699        finally:
700            os.close(test_pipe_r)
701            os.close(test_pipe_w)
702        pipesize = pipesize_default // 2
703        if pipesize < 512:  # the POSIX minimum
704            raise unittest.SkitTest(
705                'default pipesize too small to perform test.')
706        p = subprocess.Popen(
707            [sys.executable, "-c",
708             'import sys; sys.stdin.read(); sys.stdout.write("out"); '
709             'sys.stderr.write("error!")'],
710            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
711            stderr=subprocess.PIPE, pipesize=pipesize)
712        try:
713            for fifo in [p.stdin, p.stdout, p.stderr]:
714                self.assertEqual(
715                    fcntl.fcntl(fifo.fileno(), fcntl.F_GETPIPE_SZ),
716                    pipesize)
717            # Windows pipe size can be acquired via GetNamedPipeInfoFunction
718            # https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-getnamedpipeinfo
719            # However, this function is not yet in _winapi.
720            p.stdin.write(b"pear")
721            p.stdin.close()
722        finally:
723            p.kill()
724            p.wait()
725
726    @unittest.skipUnless(fcntl and hasattr(fcntl, 'F_GETPIPE_SZ'),
727                         'fcntl.F_GETPIPE_SZ required for test.')
728    def test_pipesize_default(self):
729        p = subprocess.Popen(
730            [sys.executable, "-c",
731             'import sys; sys.stdin.read(); sys.stdout.write("out"); '
732             'sys.stderr.write("error!")'],
733            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
734            stderr=subprocess.PIPE, pipesize=-1)
735        try:
736            fp_r, fp_w = os.pipe()
737            try:
738                default_pipesize = fcntl.fcntl(fp_w, fcntl.F_GETPIPE_SZ)
739                for fifo in [p.stdin, p.stdout, p.stderr]:
740                    self.assertEqual(
741                        fcntl.fcntl(fifo.fileno(), fcntl.F_GETPIPE_SZ),
742                        default_pipesize)
743            finally:
744                os.close(fp_r)
745                os.close(fp_w)
746            # On other platforms we cannot test the pipe size (yet). But above
747            # code using pipesize=-1 should not crash.
748            p.stdin.close()
749        finally:
750            p.kill()
751            p.wait()
752
753    def test_env(self):
754        newenv = os.environ.copy()
755        newenv["FRUIT"] = "orange"
756        with subprocess.Popen([sys.executable, "-c",
757                               'import sys,os;'
758                               'sys.stdout.write(os.getenv("FRUIT"))'],
759                              stdout=subprocess.PIPE,
760                              env=newenv) as p:
761            stdout, stderr = p.communicate()
762            self.assertEqual(stdout, b"orange")
763
764    # Windows requires at least the SYSTEMROOT environment variable to start
765    # Python
766    @unittest.skipIf(sys.platform == 'win32',
767                     'cannot test an empty env on Windows')
768    @unittest.skipIf(sysconfig.get_config_var('Py_ENABLE_SHARED') == 1,
769                     'The Python shared library cannot be loaded '
770                     'with an empty environment.')
771    def test_empty_env(self):
772        """Verify that env={} is as empty as possible."""
773
774        def is_env_var_to_ignore(n):
775            """Determine if an environment variable is under our control."""
776            # This excludes some __CF_* and VERSIONER_* keys MacOS insists
777            # on adding even when the environment in exec is empty.
778            # Gentoo sandboxes also force LD_PRELOAD and SANDBOX_* to exist.
779            return ('VERSIONER' in n or '__CF' in n or  # MacOS
780                    n == 'LD_PRELOAD' or n.startswith('SANDBOX') or # Gentoo
781                    n == 'LC_CTYPE') # Locale coercion triggered
782
783        with subprocess.Popen([sys.executable, "-c",
784                               'import os; print(list(os.environ.keys()))'],
785                              stdout=subprocess.PIPE, env={}) as p:
786            stdout, stderr = p.communicate()
787            child_env_names = eval(stdout.strip())
788            self.assertIsInstance(child_env_names, list)
789            child_env_names = [k for k in child_env_names
790                               if not is_env_var_to_ignore(k)]
791            self.assertEqual(child_env_names, [])
792
793    def test_invalid_cmd(self):
794        # null character in the command name
795        cmd = sys.executable + '\0'
796        with self.assertRaises(ValueError):
797            subprocess.Popen([cmd, "-c", "pass"])
798
799        # null character in the command argument
800        with self.assertRaises(ValueError):
801            subprocess.Popen([sys.executable, "-c", "pass#\0"])
802
803    def test_invalid_env(self):
804        # null character in the environment variable name
805        newenv = os.environ.copy()
806        newenv["FRUIT\0VEGETABLE"] = "cabbage"
807        with self.assertRaises(ValueError):
808            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
809
810        # null character in the environment variable value
811        newenv = os.environ.copy()
812        newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
813        with self.assertRaises(ValueError):
814            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
815
816        # equal character in the environment variable name
817        newenv = os.environ.copy()
818        newenv["FRUIT=ORANGE"] = "lemon"
819        with self.assertRaises(ValueError):
820            subprocess.Popen(ZERO_RETURN_CMD, env=newenv)
821
822        # equal character in the environment variable value
823        newenv = os.environ.copy()
824        newenv["FRUIT"] = "orange=lemon"
825        with subprocess.Popen([sys.executable, "-c",
826                               'import sys, os;'
827                               'sys.stdout.write(os.getenv("FRUIT"))'],
828                              stdout=subprocess.PIPE,
829                              env=newenv) as p:
830            stdout, stderr = p.communicate()
831            self.assertEqual(stdout, b"orange=lemon")
832
833    def test_communicate_stdin(self):
834        p = subprocess.Popen([sys.executable, "-c",
835                              'import sys;'
836                              'sys.exit(sys.stdin.read() == "pear")'],
837                             stdin=subprocess.PIPE)
838        p.communicate(b"pear")
839        self.assertEqual(p.returncode, 1)
840
841    def test_communicate_stdout(self):
842        p = subprocess.Popen([sys.executable, "-c",
843                              'import sys; sys.stdout.write("pineapple")'],
844                             stdout=subprocess.PIPE)
845        (stdout, stderr) = p.communicate()
846        self.assertEqual(stdout, b"pineapple")
847        self.assertEqual(stderr, None)
848
849    def test_communicate_stderr(self):
850        p = subprocess.Popen([sys.executable, "-c",
851                              'import sys; sys.stderr.write("pineapple")'],
852                             stderr=subprocess.PIPE)
853        (stdout, stderr) = p.communicate()
854        self.assertEqual(stdout, None)
855        self.assertEqual(stderr, b"pineapple")
856
857    def test_communicate(self):
858        p = subprocess.Popen([sys.executable, "-c",
859                              'import sys,os;'
860                              'sys.stderr.write("pineapple");'
861                              'sys.stdout.write(sys.stdin.read())'],
862                             stdin=subprocess.PIPE,
863                             stdout=subprocess.PIPE,
864                             stderr=subprocess.PIPE)
865        self.addCleanup(p.stdout.close)
866        self.addCleanup(p.stderr.close)
867        self.addCleanup(p.stdin.close)
868        (stdout, stderr) = p.communicate(b"banana")
869        self.assertEqual(stdout, b"banana")
870        self.assertEqual(stderr, b"pineapple")
871
872    def test_communicate_timeout(self):
873        p = subprocess.Popen([sys.executable, "-c",
874                              'import sys,os,time;'
875                              'sys.stderr.write("pineapple\\n");'
876                              'time.sleep(1);'
877                              'sys.stderr.write("pear\\n");'
878                              'sys.stdout.write(sys.stdin.read())'],
879                             universal_newlines=True,
880                             stdin=subprocess.PIPE,
881                             stdout=subprocess.PIPE,
882                             stderr=subprocess.PIPE)
883        self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana",
884                          timeout=0.3)
885        # Make sure we can keep waiting for it, and that we get the whole output
886        # after it completes.
887        (stdout, stderr) = p.communicate()
888        self.assertEqual(stdout, "banana")
889        self.assertEqual(stderr.encode(), b"pineapple\npear\n")
890
891    def test_communicate_timeout_large_output(self):
892        # Test an expiring timeout while the child is outputting lots of data.
893        p = subprocess.Popen([sys.executable, "-c",
894                              'import sys,os,time;'
895                              'sys.stdout.write("a" * (64 * 1024));'
896                              'time.sleep(0.2);'
897                              'sys.stdout.write("a" * (64 * 1024));'
898                              'time.sleep(0.2);'
899                              'sys.stdout.write("a" * (64 * 1024));'
900                              'time.sleep(0.2);'
901                              'sys.stdout.write("a" * (64 * 1024));'],
902                             stdout=subprocess.PIPE)
903        self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4)
904        (stdout, _) = p.communicate()
905        self.assertEqual(len(stdout), 4 * 64 * 1024)
906
907    # Test for the fd leak reported in http://bugs.python.org/issue2791.
908    def test_communicate_pipe_fd_leak(self):
909        for stdin_pipe in (False, True):
910            for stdout_pipe in (False, True):
911                for stderr_pipe in (False, True):
912                    options = {}
913                    if stdin_pipe:
914                        options['stdin'] = subprocess.PIPE
915                    if stdout_pipe:
916                        options['stdout'] = subprocess.PIPE
917                    if stderr_pipe:
918                        options['stderr'] = subprocess.PIPE
919                    if not options:
920                        continue
921                    p = subprocess.Popen(ZERO_RETURN_CMD, **options)
922                    p.communicate()
923                    if p.stdin is not None:
924                        self.assertTrue(p.stdin.closed)
925                    if p.stdout is not None:
926                        self.assertTrue(p.stdout.closed)
927                    if p.stderr is not None:
928                        self.assertTrue(p.stderr.closed)
929
930    def test_communicate_returns(self):
931        # communicate() should return None if no redirection is active
932        p = subprocess.Popen([sys.executable, "-c",
933                              "import sys; sys.exit(47)"])
934        (stdout, stderr) = p.communicate()
935        self.assertEqual(stdout, None)
936        self.assertEqual(stderr, None)
937
938    def test_communicate_pipe_buf(self):
939        # communicate() with writes larger than pipe_buf
940        # This test will probably deadlock rather than fail, if
941        # communicate() does not work properly.
942        x, y = os.pipe()
943        os.close(x)
944        os.close(y)
945        p = subprocess.Popen([sys.executable, "-c",
946                              'import sys,os;'
947                              'sys.stdout.write(sys.stdin.read(47));'
948                              'sys.stderr.write("x" * %d);'
949                              'sys.stdout.write(sys.stdin.read())' %
950                              support.PIPE_MAX_SIZE],
951                             stdin=subprocess.PIPE,
952                             stdout=subprocess.PIPE,
953                             stderr=subprocess.PIPE)
954        self.addCleanup(p.stdout.close)
955        self.addCleanup(p.stderr.close)
956        self.addCleanup(p.stdin.close)
957        string_to_write = b"a" * support.PIPE_MAX_SIZE
958        (stdout, stderr) = p.communicate(string_to_write)
959        self.assertEqual(stdout, string_to_write)
960
961    def test_writes_before_communicate(self):
962        # stdin.write before communicate()
963        p = subprocess.Popen([sys.executable, "-c",
964                              'import sys,os;'
965                              'sys.stdout.write(sys.stdin.read())'],
966                             stdin=subprocess.PIPE,
967                             stdout=subprocess.PIPE,
968                             stderr=subprocess.PIPE)
969        self.addCleanup(p.stdout.close)
970        self.addCleanup(p.stderr.close)
971        self.addCleanup(p.stdin.close)
972        p.stdin.write(b"banana")
973        (stdout, stderr) = p.communicate(b"split")
974        self.assertEqual(stdout, b"bananasplit")
975        self.assertEqual(stderr, b"")
976
977    def test_universal_newlines_and_text(self):
978        args = [
979            sys.executable, "-c",
980            'import sys,os;' + SETBINARY +
981            'buf = sys.stdout.buffer;'
982            'buf.write(sys.stdin.readline().encode());'
983            'buf.flush();'
984            'buf.write(b"line2\\n");'
985            'buf.flush();'
986            'buf.write(sys.stdin.read().encode());'
987            'buf.flush();'
988            'buf.write(b"line4\\n");'
989            'buf.flush();'
990            'buf.write(b"line5\\r\\n");'
991            'buf.flush();'
992            'buf.write(b"line6\\r");'
993            'buf.flush();'
994            'buf.write(b"\\nline7");'
995            'buf.flush();'
996            'buf.write(b"\\nline8");']
997
998        for extra_kwarg in ('universal_newlines', 'text'):
999            p = subprocess.Popen(args, **{'stdin': subprocess.PIPE,
1000                                          'stdout': subprocess.PIPE,
1001                                          extra_kwarg: True})
1002            with p:
1003                p.stdin.write("line1\n")
1004                p.stdin.flush()
1005                self.assertEqual(p.stdout.readline(), "line1\n")
1006                p.stdin.write("line3\n")
1007                p.stdin.close()
1008                self.addCleanup(p.stdout.close)
1009                self.assertEqual(p.stdout.readline(),
1010                                 "line2\n")
1011                self.assertEqual(p.stdout.read(6),
1012                                 "line3\n")
1013                self.assertEqual(p.stdout.read(),
1014                                 "line4\nline5\nline6\nline7\nline8")
1015
1016    def test_universal_newlines_communicate(self):
1017        # universal newlines through communicate()
1018        p = subprocess.Popen([sys.executable, "-c",
1019                              'import sys,os;' + SETBINARY +
1020                              'buf = sys.stdout.buffer;'
1021                              'buf.write(b"line2\\n");'
1022                              'buf.flush();'
1023                              'buf.write(b"line4\\n");'
1024                              'buf.flush();'
1025                              'buf.write(b"line5\\r\\n");'
1026                              'buf.flush();'
1027                              'buf.write(b"line6\\r");'
1028                              'buf.flush();'
1029                              'buf.write(b"\\nline7");'
1030                              'buf.flush();'
1031                              'buf.write(b"\\nline8");'],
1032                             stderr=subprocess.PIPE,
1033                             stdout=subprocess.PIPE,
1034                             universal_newlines=1)
1035        self.addCleanup(p.stdout.close)
1036        self.addCleanup(p.stderr.close)
1037        (stdout, stderr) = p.communicate()
1038        self.assertEqual(stdout,
1039                         "line2\nline4\nline5\nline6\nline7\nline8")
1040
1041    def test_universal_newlines_communicate_stdin(self):
1042        # universal newlines through communicate(), with only stdin
1043        p = subprocess.Popen([sys.executable, "-c",
1044                              'import sys,os;' + SETBINARY + textwrap.dedent('''
1045                               s = sys.stdin.readline()
1046                               assert s == "line1\\n", repr(s)
1047                               s = sys.stdin.read()
1048                               assert s == "line3\\n", repr(s)
1049                              ''')],
1050                             stdin=subprocess.PIPE,
1051                             universal_newlines=1)
1052        (stdout, stderr) = p.communicate("line1\nline3\n")
1053        self.assertEqual(p.returncode, 0)
1054
1055    def test_universal_newlines_communicate_input_none(self):
1056        # Test communicate(input=None) with universal newlines.
1057        #
1058        # We set stdout to PIPE because, as of this writing, a different
1059        # code path is tested when the number of pipes is zero or one.
1060        p = subprocess.Popen(ZERO_RETURN_CMD,
1061                             stdin=subprocess.PIPE,
1062                             stdout=subprocess.PIPE,
1063                             universal_newlines=True)
1064        p.communicate()
1065        self.assertEqual(p.returncode, 0)
1066
1067    def test_universal_newlines_communicate_stdin_stdout_stderr(self):
1068        # universal newlines through communicate(), with stdin, stdout, stderr
1069        p = subprocess.Popen([sys.executable, "-c",
1070                              'import sys,os;' + SETBINARY + textwrap.dedent('''
1071                               s = sys.stdin.buffer.readline()
1072                               sys.stdout.buffer.write(s)
1073                               sys.stdout.buffer.write(b"line2\\r")
1074                               sys.stderr.buffer.write(b"eline2\\n")
1075                               s = sys.stdin.buffer.read()
1076                               sys.stdout.buffer.write(s)
1077                               sys.stdout.buffer.write(b"line4\\n")
1078                               sys.stdout.buffer.write(b"line5\\r\\n")
1079                               sys.stderr.buffer.write(b"eline6\\r")
1080                               sys.stderr.buffer.write(b"eline7\\r\\nz")
1081                              ''')],
1082                             stdin=subprocess.PIPE,
1083                             stderr=subprocess.PIPE,
1084                             stdout=subprocess.PIPE,
1085                             universal_newlines=True)
1086        self.addCleanup(p.stdout.close)
1087        self.addCleanup(p.stderr.close)
1088        (stdout, stderr) = p.communicate("line1\nline3\n")
1089        self.assertEqual(p.returncode, 0)
1090        self.assertEqual("line1\nline2\nline3\nline4\nline5\n", stdout)
1091        # Python debug build push something like "[42442 refs]\n"
1092        # to stderr at exit of subprocess.
1093        self.assertTrue(stderr.startswith("eline2\neline6\neline7\n"))
1094
1095    def test_universal_newlines_communicate_encodings(self):
1096        # Check that universal newlines mode works for various encodings,
1097        # in particular for encodings in the UTF-16 and UTF-32 families.
1098        # See issue #15595.
1099        #
1100        # UTF-16 and UTF-32-BE are sufficient to check both with BOM and
1101        # without, and UTF-16 and UTF-32.
1102        for encoding in ['utf-16', 'utf-32-be']:
1103            code = ("import sys; "
1104                    r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" %
1105                    encoding)
1106            args = [sys.executable, '-c', code]
1107            # We set stdin to be non-None because, as of this writing,
1108            # a different code path is used when the number of pipes is
1109            # zero or one.
1110            popen = subprocess.Popen(args,
1111                                     stdin=subprocess.PIPE,
1112                                     stdout=subprocess.PIPE,
1113                                     encoding=encoding)
1114            stdout, stderr = popen.communicate(input='')
1115            self.assertEqual(stdout, '1\n2\n3\n4')
1116
1117    def test_communicate_errors(self):
1118        for errors, expected in [
1119            ('ignore', ''),
1120            ('replace', '\ufffd\ufffd'),
1121            ('surrogateescape', '\udc80\udc80'),
1122            ('backslashreplace', '\\x80\\x80'),
1123        ]:
1124            code = ("import sys; "
1125                    r"sys.stdout.buffer.write(b'[\x80\x80]')")
1126            args = [sys.executable, '-c', code]
1127            # We set stdin to be non-None because, as of this writing,
1128            # a different code path is used when the number of pipes is
1129            # zero or one.
1130            popen = subprocess.Popen(args,
1131                                     stdin=subprocess.PIPE,
1132                                     stdout=subprocess.PIPE,
1133                                     encoding='utf-8',
1134                                     errors=errors)
1135            stdout, stderr = popen.communicate(input='')
1136            self.assertEqual(stdout, '[{}]'.format(expected))
1137
1138    def test_no_leaking(self):
1139        # Make sure we leak no resources
1140        if not mswindows:
1141            max_handles = 1026 # too much for most UNIX systems
1142        else:
1143            max_handles = 2050 # too much for (at least some) Windows setups
1144        handles = []
1145        tmpdir = tempfile.mkdtemp()
1146        try:
1147            for i in range(max_handles):
1148                try:
1149                    tmpfile = os.path.join(tmpdir, os_helper.TESTFN)
1150                    handles.append(os.open(tmpfile, os.O_WRONLY|os.O_CREAT))
1151                except OSError as e:
1152                    if e.errno != errno.EMFILE:
1153                        raise
1154                    break
1155            else:
1156                self.skipTest("failed to reach the file descriptor limit "
1157                    "(tried %d)" % max_handles)
1158            # Close a couple of them (should be enough for a subprocess)
1159            for i in range(10):
1160                os.close(handles.pop())
1161            # Loop creating some subprocesses. If one of them leaks some fds,
1162            # the next loop iteration will fail by reaching the max fd limit.
1163            for i in range(15):
1164                p = subprocess.Popen([sys.executable, "-c",
1165                                      "import sys;"
1166                                      "sys.stdout.write(sys.stdin.read())"],
1167                                     stdin=subprocess.PIPE,
1168                                     stdout=subprocess.PIPE,
1169                                     stderr=subprocess.PIPE)
1170                data = p.communicate(b"lime")[0]
1171                self.assertEqual(data, b"lime")
1172        finally:
1173            for h in handles:
1174                os.close(h)
1175            shutil.rmtree(tmpdir)
1176
1177    def test_list2cmdline(self):
1178        self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
1179                         '"a b c" d e')
1180        self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
1181                         'ab\\"c \\ d')
1182        self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
1183                         'ab\\"c " \\\\" d')
1184        self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
1185                         'a\\\\\\b "de fg" h')
1186        self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
1187                         'a\\\\\\"b c d')
1188        self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
1189                         '"a\\\\b c" d e')
1190        self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
1191                         '"a\\\\b\\ c" d e')
1192        self.assertEqual(subprocess.list2cmdline(['ab', '']),
1193                         'ab ""')
1194
1195    def test_poll(self):
1196        p = subprocess.Popen([sys.executable, "-c",
1197                              "import os; os.read(0, 1)"],
1198                             stdin=subprocess.PIPE)
1199        self.addCleanup(p.stdin.close)
1200        self.assertIsNone(p.poll())
1201        os.write(p.stdin.fileno(), b'A')
1202        p.wait()
1203        # Subsequent invocations should just return the returncode
1204        self.assertEqual(p.poll(), 0)
1205
1206    def test_wait(self):
1207        p = subprocess.Popen(ZERO_RETURN_CMD)
1208        self.assertEqual(p.wait(), 0)
1209        # Subsequent invocations should just return the returncode
1210        self.assertEqual(p.wait(), 0)
1211
1212    def test_wait_timeout(self):
1213        p = subprocess.Popen([sys.executable,
1214                              "-c", "import time; time.sleep(0.3)"])
1215        with self.assertRaises(subprocess.TimeoutExpired) as c:
1216            p.wait(timeout=0.0001)
1217        self.assertIn("0.0001", str(c.exception))  # For coverage of __str__.
1218        self.assertEqual(p.wait(timeout=support.SHORT_TIMEOUT), 0)
1219
1220    def test_invalid_bufsize(self):
1221        # an invalid type of the bufsize argument should raise
1222        # TypeError.
1223        with self.assertRaises(TypeError):
1224            subprocess.Popen(ZERO_RETURN_CMD, "orange")
1225
1226    def test_bufsize_is_none(self):
1227        # bufsize=None should be the same as bufsize=0.
1228        p = subprocess.Popen(ZERO_RETURN_CMD, None)
1229        self.assertEqual(p.wait(), 0)
1230        # Again with keyword arg
1231        p = subprocess.Popen(ZERO_RETURN_CMD, bufsize=None)
1232        self.assertEqual(p.wait(), 0)
1233
1234    def _test_bufsize_equal_one(self, line, expected, universal_newlines):
1235        # subprocess may deadlock with bufsize=1, see issue #21332
1236        with subprocess.Popen([sys.executable, "-c", "import sys;"
1237                               "sys.stdout.write(sys.stdin.readline());"
1238                               "sys.stdout.flush()"],
1239                              stdin=subprocess.PIPE,
1240                              stdout=subprocess.PIPE,
1241                              stderr=subprocess.DEVNULL,
1242                              bufsize=1,
1243                              universal_newlines=universal_newlines) as p:
1244            p.stdin.write(line) # expect that it flushes the line in text mode
1245            os.close(p.stdin.fileno()) # close it without flushing the buffer
1246            read_line = p.stdout.readline()
1247            with support.SuppressCrashReport():
1248                try:
1249                    p.stdin.close()
1250                except OSError:
1251                    pass
1252            p.stdin = None
1253        self.assertEqual(p.returncode, 0)
1254        self.assertEqual(read_line, expected)
1255
1256    def test_bufsize_equal_one_text_mode(self):
1257        # line is flushed in text mode with bufsize=1.
1258        # we should get the full line in return
1259        line = "line\n"
1260        self._test_bufsize_equal_one(line, line, universal_newlines=True)
1261
1262    def test_bufsize_equal_one_binary_mode(self):
1263        # line is not flushed in binary mode with bufsize=1.
1264        # we should get empty response
1265        line = b'line' + os.linesep.encode() # assume ascii-based locale
1266        with self.assertWarnsRegex(RuntimeWarning, 'line buffering'):
1267            self._test_bufsize_equal_one(line, b'', universal_newlines=False)
1268
1269    def test_leaking_fds_on_error(self):
1270        # see bug #5179: Popen leaks file descriptors to PIPEs if
1271        # the child fails to execute; this will eventually exhaust
1272        # the maximum number of open fds. 1024 seems a very common
1273        # value for that limit, but Windows has 2048, so we loop
1274        # 1024 times (each call leaked two fds).
1275        for i in range(1024):
1276            with self.assertRaises(NONEXISTING_ERRORS):
1277                subprocess.Popen(NONEXISTING_CMD,
1278                                 stdout=subprocess.PIPE,
1279                                 stderr=subprocess.PIPE)
1280
1281    def test_nonexisting_with_pipes(self):
1282        # bpo-30121: Popen with pipes must close properly pipes on error.
1283        # Previously, os.close() was called with a Windows handle which is not
1284        # a valid file descriptor.
1285        #
1286        # Run the test in a subprocess to control how the CRT reports errors
1287        # and to get stderr content.
1288        try:
1289            import msvcrt
1290            msvcrt.CrtSetReportMode
1291        except (AttributeError, ImportError):
1292            self.skipTest("need msvcrt.CrtSetReportMode")
1293
1294        code = textwrap.dedent(f"""
1295            import msvcrt
1296            import subprocess
1297
1298            cmd = {NONEXISTING_CMD!r}
1299
1300            for report_type in [msvcrt.CRT_WARN,
1301                                msvcrt.CRT_ERROR,
1302                                msvcrt.CRT_ASSERT]:
1303                msvcrt.CrtSetReportMode(report_type, msvcrt.CRTDBG_MODE_FILE)
1304                msvcrt.CrtSetReportFile(report_type, msvcrt.CRTDBG_FILE_STDERR)
1305
1306            try:
1307                subprocess.Popen(cmd,
1308                                 stdout=subprocess.PIPE,
1309                                 stderr=subprocess.PIPE)
1310            except OSError:
1311                pass
1312        """)
1313        cmd = [sys.executable, "-c", code]
1314        proc = subprocess.Popen(cmd,
1315                                stderr=subprocess.PIPE,
1316                                universal_newlines=True)
1317        with proc:
1318            stderr = proc.communicate()[1]
1319        self.assertEqual(stderr, "")
1320        self.assertEqual(proc.returncode, 0)
1321
1322    def test_double_close_on_error(self):
1323        # Issue #18851
1324        fds = []
1325        def open_fds():
1326            for i in range(20):
1327                fds.extend(os.pipe())
1328                time.sleep(0.001)
1329        t = threading.Thread(target=open_fds)
1330        t.start()
1331        try:
1332            with self.assertRaises(EnvironmentError):
1333                subprocess.Popen(NONEXISTING_CMD,
1334                                 stdin=subprocess.PIPE,
1335                                 stdout=subprocess.PIPE,
1336                                 stderr=subprocess.PIPE)
1337        finally:
1338            t.join()
1339            exc = None
1340            for fd in fds:
1341                # If a double close occurred, some of those fds will
1342                # already have been closed by mistake, and os.close()
1343                # here will raise.
1344                try:
1345                    os.close(fd)
1346                except OSError as e:
1347                    exc = e
1348            if exc is not None:
1349                raise exc
1350
1351    def test_threadsafe_wait(self):
1352        """Issue21291: Popen.wait() needs to be threadsafe for returncode."""
1353        proc = subprocess.Popen([sys.executable, '-c',
1354                                 'import time; time.sleep(12)'])
1355        self.assertEqual(proc.returncode, None)
1356        results = []
1357
1358        def kill_proc_timer_thread():
1359            results.append(('thread-start-poll-result', proc.poll()))
1360            # terminate it from the thread and wait for the result.
1361            proc.kill()
1362            proc.wait()
1363            results.append(('thread-after-kill-and-wait', proc.returncode))
1364            # this wait should be a no-op given the above.
1365            proc.wait()
1366            results.append(('thread-after-second-wait', proc.returncode))
1367
1368        # This is a timing sensitive test, the failure mode is
1369        # triggered when both the main thread and this thread are in
1370        # the wait() call at once.  The delay here is to allow the
1371        # main thread to most likely be blocked in its wait() call.
1372        t = threading.Timer(0.2, kill_proc_timer_thread)
1373        t.start()
1374
1375        if mswindows:
1376            expected_errorcode = 1
1377        else:
1378            # Should be -9 because of the proc.kill() from the thread.
1379            expected_errorcode = -9
1380
1381        # Wait for the process to finish; the thread should kill it
1382        # long before it finishes on its own.  Supplying a timeout
1383        # triggers a different code path for better coverage.
1384        proc.wait(timeout=support.SHORT_TIMEOUT)
1385        self.assertEqual(proc.returncode, expected_errorcode,
1386                         msg="unexpected result in wait from main thread")
1387
1388        # This should be a no-op with no change in returncode.
1389        proc.wait()
1390        self.assertEqual(proc.returncode, expected_errorcode,
1391                         msg="unexpected result in second main wait.")
1392
1393        t.join()
1394        # Ensure that all of the thread results are as expected.
1395        # When a race condition occurs in wait(), the returncode could
1396        # be set by the wrong thread that doesn't actually have it
1397        # leading to an incorrect value.
1398        self.assertEqual([('thread-start-poll-result', None),
1399                          ('thread-after-kill-and-wait', expected_errorcode),
1400                          ('thread-after-second-wait', expected_errorcode)],
1401                         results)
1402
1403    def test_issue8780(self):
1404        # Ensure that stdout is inherited from the parent
1405        # if stdout=PIPE is not used
1406        code = ';'.join((
1407            'import subprocess, sys',
1408            'retcode = subprocess.call('
1409                "[sys.executable, '-c', 'print(\"Hello World!\")'])",
1410            'assert retcode == 0'))
1411        output = subprocess.check_output([sys.executable, '-c', code])
1412        self.assertTrue(output.startswith(b'Hello World!'), ascii(output))
1413
1414    def test_handles_closed_on_exception(self):
1415        # If CreateProcess exits with an error, ensure the
1416        # duplicate output handles are released
1417        ifhandle, ifname = tempfile.mkstemp()
1418        ofhandle, ofname = tempfile.mkstemp()
1419        efhandle, efname = tempfile.mkstemp()
1420        try:
1421            subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,
1422              stderr=efhandle)
1423        except OSError:
1424            os.close(ifhandle)
1425            os.remove(ifname)
1426            os.close(ofhandle)
1427            os.remove(ofname)
1428            os.close(efhandle)
1429            os.remove(efname)
1430        self.assertFalse(os.path.exists(ifname))
1431        self.assertFalse(os.path.exists(ofname))
1432        self.assertFalse(os.path.exists(efname))
1433
1434    def test_communicate_epipe(self):
1435        # Issue 10963: communicate() should hide EPIPE
1436        p = subprocess.Popen(ZERO_RETURN_CMD,
1437                             stdin=subprocess.PIPE,
1438                             stdout=subprocess.PIPE,
1439                             stderr=subprocess.PIPE)
1440        self.addCleanup(p.stdout.close)
1441        self.addCleanup(p.stderr.close)
1442        self.addCleanup(p.stdin.close)
1443        p.communicate(b"x" * 2**20)
1444
1445    def test_repr(self):
1446        path_cmd = pathlib.Path("my-tool.py")
1447        pathlib_cls = path_cmd.__class__.__name__
1448
1449        cases = [
1450            ("ls", True, 123, "<Popen: returncode: 123 args: 'ls'>"),
1451            ('a' * 100, True, 0,
1452             "<Popen: returncode: 0 args: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa...>"),
1453            (["ls"], False, None, "<Popen: returncode: None args: ['ls']>"),
1454            (["ls", '--my-opts', 'a' * 100], False, None,
1455             "<Popen: returncode: None args: ['ls', '--my-opts', 'aaaaaaaaaaaaaaaaaaaaaaaa...>"),
1456            (path_cmd, False, 7, f"<Popen: returncode: 7 args: {pathlib_cls}('my-tool.py')>")
1457        ]
1458        with unittest.mock.patch.object(subprocess.Popen, '_execute_child'):
1459            for cmd, shell, code, sx in cases:
1460                p = subprocess.Popen(cmd, shell=shell)
1461                p.returncode = code
1462                self.assertEqual(repr(p), sx)
1463
1464    def test_communicate_epipe_only_stdin(self):
1465        # Issue 10963: communicate() should hide EPIPE
1466        p = subprocess.Popen(ZERO_RETURN_CMD,
1467                             stdin=subprocess.PIPE)
1468        self.addCleanup(p.stdin.close)
1469        p.wait()
1470        p.communicate(b"x" * 2**20)
1471
1472    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'),
1473                         "Requires signal.SIGUSR1")
1474    @unittest.skipUnless(hasattr(os, 'kill'),
1475                         "Requires os.kill")
1476    @unittest.skipUnless(hasattr(os, 'getppid'),
1477                         "Requires os.getppid")
1478    def test_communicate_eintr(self):
1479        # Issue #12493: communicate() should handle EINTR
1480        def handler(signum, frame):
1481            pass
1482        old_handler = signal.signal(signal.SIGUSR1, handler)
1483        self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
1484
1485        args = [sys.executable, "-c",
1486                'import os, signal;'
1487                'os.kill(os.getppid(), signal.SIGUSR1)']
1488        for stream in ('stdout', 'stderr'):
1489            kw = {stream: subprocess.PIPE}
1490            with subprocess.Popen(args, **kw) as process:
1491                # communicate() will be interrupted by SIGUSR1
1492                process.communicate()
1493
1494
1495    # This test is Linux-ish specific for simplicity to at least have
1496    # some coverage.  It is not a platform specific bug.
1497    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
1498                         "Linux specific")
1499    def test_failed_child_execute_fd_leak(self):
1500        """Test for the fork() failure fd leak reported in issue16327."""
1501        fd_directory = '/proc/%d/fd' % os.getpid()
1502        fds_before_popen = os.listdir(fd_directory)
1503        with self.assertRaises(PopenTestException):
1504            PopenExecuteChildRaises(
1505                    ZERO_RETURN_CMD, stdin=subprocess.PIPE,
1506                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1507
1508        # NOTE: This test doesn't verify that the real _execute_child
1509        # does not close the file descriptors itself on the way out
1510        # during an exception.  Code inspection has confirmed that.
1511
1512        fds_after_exception = os.listdir(fd_directory)
1513        self.assertEqual(fds_before_popen, fds_after_exception)
1514
1515    @unittest.skipIf(mswindows, "behavior currently not supported on Windows")
1516    def test_file_not_found_includes_filename(self):
1517        with self.assertRaises(FileNotFoundError) as c:
1518            subprocess.call(['/opt/nonexistent_binary', 'with', 'some', 'args'])
1519        self.assertEqual(c.exception.filename, '/opt/nonexistent_binary')
1520
1521    @unittest.skipIf(mswindows, "behavior currently not supported on Windows")
1522    def test_file_not_found_with_bad_cwd(self):
1523        with self.assertRaises(FileNotFoundError) as c:
1524            subprocess.Popen(['exit', '0'], cwd='/some/nonexistent/directory')
1525        self.assertEqual(c.exception.filename, '/some/nonexistent/directory')
1526
1527    def test_class_getitems(self):
1528        self.assertIsInstance(subprocess.Popen[bytes], types.GenericAlias)
1529        self.assertIsInstance(subprocess.CompletedProcess[str], types.GenericAlias)
1530
1531class RunFuncTestCase(BaseTestCase):
1532    def run_python(self, code, **kwargs):
1533        """Run Python code in a subprocess using subprocess.run"""
1534        argv = [sys.executable, "-c", code]
1535        return subprocess.run(argv, **kwargs)
1536
1537    def test_returncode(self):
1538        # call() function with sequence argument
1539        cp = self.run_python("import sys; sys.exit(47)")
1540        self.assertEqual(cp.returncode, 47)
1541        with self.assertRaises(subprocess.CalledProcessError):
1542            cp.check_returncode()
1543
1544    def test_check(self):
1545        with self.assertRaises(subprocess.CalledProcessError) as c:
1546            self.run_python("import sys; sys.exit(47)", check=True)
1547        self.assertEqual(c.exception.returncode, 47)
1548
1549    def test_check_zero(self):
1550        # check_returncode shouldn't raise when returncode is zero
1551        cp = subprocess.run(ZERO_RETURN_CMD, check=True)
1552        self.assertEqual(cp.returncode, 0)
1553
1554    def test_timeout(self):
1555        # run() function with timeout argument; we want to test that the child
1556        # process gets killed when the timeout expires.  If the child isn't
1557        # killed, this call will deadlock since subprocess.run waits for the
1558        # child.
1559        with self.assertRaises(subprocess.TimeoutExpired):
1560            self.run_python("while True: pass", timeout=0.0001)
1561
1562    def test_capture_stdout(self):
1563        # capture stdout with zero return code
1564        cp = self.run_python("print('BDFL')", stdout=subprocess.PIPE)
1565        self.assertIn(b'BDFL', cp.stdout)
1566
1567    def test_capture_stderr(self):
1568        cp = self.run_python("import sys; sys.stderr.write('BDFL')",
1569                             stderr=subprocess.PIPE)
1570        self.assertIn(b'BDFL', cp.stderr)
1571
1572    def test_check_output_stdin_arg(self):
1573        # run() can be called with stdin set to a file
1574        tf = tempfile.TemporaryFile()
1575        self.addCleanup(tf.close)
1576        tf.write(b'pear')
1577        tf.seek(0)
1578        cp = self.run_python(
1579                 "import sys; sys.stdout.write(sys.stdin.read().upper())",
1580                stdin=tf, stdout=subprocess.PIPE)
1581        self.assertIn(b'PEAR', cp.stdout)
1582
1583    def test_check_output_input_arg(self):
1584        # check_output() can be called with input set to a string
1585        cp = self.run_python(
1586                "import sys; sys.stdout.write(sys.stdin.read().upper())",
1587                input=b'pear', stdout=subprocess.PIPE)
1588        self.assertIn(b'PEAR', cp.stdout)
1589
1590    def test_check_output_stdin_with_input_arg(self):
1591        # run() refuses to accept 'stdin' with 'input'
1592        tf = tempfile.TemporaryFile()
1593        self.addCleanup(tf.close)
1594        tf.write(b'pear')
1595        tf.seek(0)
1596        with self.assertRaises(ValueError,
1597              msg="Expected ValueError when stdin and input args supplied.") as c:
1598            output = self.run_python("print('will not be run')",
1599                                     stdin=tf, input=b'hare')
1600        self.assertIn('stdin', c.exception.args[0])
1601        self.assertIn('input', c.exception.args[0])
1602
1603    def test_check_output_timeout(self):
1604        with self.assertRaises(subprocess.TimeoutExpired) as c:
1605            cp = self.run_python((
1606                     "import sys, time\n"
1607                     "sys.stdout.write('BDFL')\n"
1608                     "sys.stdout.flush()\n"
1609                     "time.sleep(3600)"),
1610                    # Some heavily loaded buildbots (sparc Debian 3.x) require
1611                    # this much time to start and print.
1612                    timeout=3, stdout=subprocess.PIPE)
1613        self.assertEqual(c.exception.output, b'BDFL')
1614        # output is aliased to stdout
1615        self.assertEqual(c.exception.stdout, b'BDFL')
1616
1617    def test_run_kwargs(self):
1618        newenv = os.environ.copy()
1619        newenv["FRUIT"] = "banana"
1620        cp = self.run_python(('import sys, os;'
1621                      'sys.exit(33 if os.getenv("FRUIT")=="banana" else 31)'),
1622                             env=newenv)
1623        self.assertEqual(cp.returncode, 33)
1624
1625    def test_run_with_pathlike_path(self):
1626        # bpo-31961: test run(pathlike_object)
1627        # the name of a command that can be run without
1628        # any arguments that exit fast
1629        prog = 'tree.com' if mswindows else 'ls'
1630        path = shutil.which(prog)
1631        if path is None:
1632            self.skipTest(f'{prog} required for this test')
1633        path = FakePath(path)
1634        res = subprocess.run(path, stdout=subprocess.DEVNULL)
1635        self.assertEqual(res.returncode, 0)
1636        with self.assertRaises(TypeError):
1637            subprocess.run(path, stdout=subprocess.DEVNULL, shell=True)
1638
1639    def test_run_with_bytes_path_and_arguments(self):
1640        # bpo-31961: test run([bytes_object, b'additional arguments'])
1641        path = os.fsencode(sys.executable)
1642        args = [path, '-c', b'import sys; sys.exit(57)']
1643        res = subprocess.run(args)
1644        self.assertEqual(res.returncode, 57)
1645
1646    def test_run_with_pathlike_path_and_arguments(self):
1647        # bpo-31961: test run([pathlike_object, 'additional arguments'])
1648        path = FakePath(sys.executable)
1649        args = [path, '-c', 'import sys; sys.exit(57)']
1650        res = subprocess.run(args)
1651        self.assertEqual(res.returncode, 57)
1652
1653    def test_capture_output(self):
1654        cp = self.run_python(("import sys;"
1655                              "sys.stdout.write('BDFL'); "
1656                              "sys.stderr.write('FLUFL')"),
1657                             capture_output=True)
1658        self.assertIn(b'BDFL', cp.stdout)
1659        self.assertIn(b'FLUFL', cp.stderr)
1660
1661    def test_stdout_with_capture_output_arg(self):
1662        # run() refuses to accept 'stdout' with 'capture_output'
1663        tf = tempfile.TemporaryFile()
1664        self.addCleanup(tf.close)
1665        with self.assertRaises(ValueError,
1666            msg=("Expected ValueError when stdout and capture_output "
1667                 "args supplied.")) as c:
1668            output = self.run_python("print('will not be run')",
1669                                      capture_output=True, stdout=tf)
1670        self.assertIn('stdout', c.exception.args[0])
1671        self.assertIn('capture_output', c.exception.args[0])
1672
1673    def test_stderr_with_capture_output_arg(self):
1674        # run() refuses to accept 'stderr' with 'capture_output'
1675        tf = tempfile.TemporaryFile()
1676        self.addCleanup(tf.close)
1677        with self.assertRaises(ValueError,
1678            msg=("Expected ValueError when stderr and capture_output "
1679                 "args supplied.")) as c:
1680            output = self.run_python("print('will not be run')",
1681                                      capture_output=True, stderr=tf)
1682        self.assertIn('stderr', c.exception.args[0])
1683        self.assertIn('capture_output', c.exception.args[0])
1684
1685    # This test _might_ wind up a bit fragile on loaded build+test machines
1686    # as it depends on the timing with wide enough margins for normal situations
1687    # but does assert that it happened "soon enough" to believe the right thing
1688    # happened.
1689    @unittest.skipIf(mswindows, "requires posix like 'sleep' shell command")
1690    def test_run_with_shell_timeout_and_capture_output(self):
1691        """Output capturing after a timeout mustn't hang forever on open filehandles."""
1692        before_secs = time.monotonic()
1693        try:
1694            subprocess.run('sleep 3', shell=True, timeout=0.1,
1695                           capture_output=True)  # New session unspecified.
1696        except subprocess.TimeoutExpired as exc:
1697            after_secs = time.monotonic()
1698            stacks = traceback.format_exc()  # assertRaises doesn't give this.
1699        else:
1700            self.fail("TimeoutExpired not raised.")
1701        self.assertLess(after_secs - before_secs, 1.5,
1702                        msg="TimeoutExpired was delayed! Bad traceback:\n```\n"
1703                        f"{stacks}```")
1704
1705
1706def _get_test_grp_name():
1707    for name_group in ('staff', 'nogroup', 'grp', 'nobody', 'nfsnobody'):
1708        if grp:
1709            try:
1710                grp.getgrnam(name_group)
1711            except KeyError:
1712                continue
1713            return name_group
1714    else:
1715        raise unittest.SkipTest('No identified group name to use for this test on this platform.')
1716
1717
1718@unittest.skipIf(mswindows, "POSIX specific tests")
1719class POSIXProcessTestCase(BaseTestCase):
1720
1721    def setUp(self):
1722        super().setUp()
1723        self._nonexistent_dir = "/_this/pa.th/does/not/exist"
1724
1725    def _get_chdir_exception(self):
1726        try:
1727            os.chdir(self._nonexistent_dir)
1728        except OSError as e:
1729            # This avoids hard coding the errno value or the OS perror()
1730            # string and instead capture the exception that we want to see
1731            # below for comparison.
1732            desired_exception = e
1733        else:
1734            self.fail("chdir to nonexistent directory %s succeeded." %
1735                      self._nonexistent_dir)
1736        return desired_exception
1737
1738    def test_exception_cwd(self):
1739        """Test error in the child raised in the parent for a bad cwd."""
1740        desired_exception = self._get_chdir_exception()
1741        try:
1742            p = subprocess.Popen([sys.executable, "-c", ""],
1743                                 cwd=self._nonexistent_dir)
1744        except OSError as e:
1745            # Test that the child process chdir failure actually makes
1746            # it up to the parent process as the correct exception.
1747            self.assertEqual(desired_exception.errno, e.errno)
1748            self.assertEqual(desired_exception.strerror, e.strerror)
1749            self.assertEqual(desired_exception.filename, e.filename)
1750        else:
1751            self.fail("Expected OSError: %s" % desired_exception)
1752
1753    def test_exception_bad_executable(self):
1754        """Test error in the child raised in the parent for a bad executable."""
1755        desired_exception = self._get_chdir_exception()
1756        try:
1757            p = subprocess.Popen([sys.executable, "-c", ""],
1758                                 executable=self._nonexistent_dir)
1759        except OSError as e:
1760            # Test that the child process exec failure actually makes
1761            # it up to the parent process as the correct exception.
1762            self.assertEqual(desired_exception.errno, e.errno)
1763            self.assertEqual(desired_exception.strerror, e.strerror)
1764            self.assertEqual(desired_exception.filename, e.filename)
1765        else:
1766            self.fail("Expected OSError: %s" % desired_exception)
1767
1768    def test_exception_bad_args_0(self):
1769        """Test error in the child raised in the parent for a bad args[0]."""
1770        desired_exception = self._get_chdir_exception()
1771        try:
1772            p = subprocess.Popen([self._nonexistent_dir, "-c", ""])
1773        except OSError as e:
1774            # Test that the child process exec failure actually makes
1775            # it up to the parent process as the correct exception.
1776            self.assertEqual(desired_exception.errno, e.errno)
1777            self.assertEqual(desired_exception.strerror, e.strerror)
1778            self.assertEqual(desired_exception.filename, e.filename)
1779        else:
1780            self.fail("Expected OSError: %s" % desired_exception)
1781
1782    # We mock the __del__ method for Popen in the next two tests
1783    # because it does cleanup based on the pid returned by fork_exec
1784    # along with issuing a resource warning if it still exists. Since
1785    # we don't actually spawn a process in these tests we can forego
1786    # the destructor. An alternative would be to set _child_created to
1787    # False before the destructor is called but there is no easy way
1788    # to do that
1789    class PopenNoDestructor(subprocess.Popen):
1790        def __del__(self):
1791            pass
1792
1793    @mock.patch("subprocess._posixsubprocess.fork_exec")
1794    def test_exception_errpipe_normal(self, fork_exec):
1795        """Test error passing done through errpipe_write in the good case"""
1796        def proper_error(*args):
1797            errpipe_write = args[13]
1798            # Write the hex for the error code EISDIR: 'is a directory'
1799            err_code = '{:x}'.format(errno.EISDIR).encode()
1800            os.write(errpipe_write, b"OSError:" + err_code + b":")
1801            return 0
1802
1803        fork_exec.side_effect = proper_error
1804
1805        with mock.patch("subprocess.os.waitpid",
1806                        side_effect=ChildProcessError):
1807            with self.assertRaises(IsADirectoryError):
1808                self.PopenNoDestructor(["non_existent_command"])
1809
1810    @mock.patch("subprocess._posixsubprocess.fork_exec")
1811    def test_exception_errpipe_bad_data(self, fork_exec):
1812        """Test error passing done through errpipe_write where its not
1813        in the expected format"""
1814        error_data = b"\xFF\x00\xDE\xAD"
1815        def bad_error(*args):
1816            errpipe_write = args[13]
1817            # Anything can be in the pipe, no assumptions should
1818            # be made about its encoding, so we'll write some
1819            # arbitrary hex bytes to test it out
1820            os.write(errpipe_write, error_data)
1821            return 0
1822
1823        fork_exec.side_effect = bad_error
1824
1825        with mock.patch("subprocess.os.waitpid",
1826                        side_effect=ChildProcessError):
1827            with self.assertRaises(subprocess.SubprocessError) as e:
1828                self.PopenNoDestructor(["non_existent_command"])
1829
1830        self.assertIn(repr(error_data), str(e.exception))
1831
1832    @unittest.skipIf(not os.path.exists('/proc/self/status'),
1833                     "need /proc/self/status")
1834    def test_restore_signals(self):
1835        # Blindly assume that cat exists on systems with /proc/self/status...
1836        default_proc_status = subprocess.check_output(
1837                ['cat', '/proc/self/status'],
1838                restore_signals=False)
1839        for line in default_proc_status.splitlines():
1840            if line.startswith(b'SigIgn'):
1841                default_sig_ign_mask = line
1842                break
1843        else:
1844            self.skipTest("SigIgn not found in /proc/self/status.")
1845        restored_proc_status = subprocess.check_output(
1846                ['cat', '/proc/self/status'],
1847                restore_signals=True)
1848        for line in restored_proc_status.splitlines():
1849            if line.startswith(b'SigIgn'):
1850                restored_sig_ign_mask = line
1851                break
1852        self.assertNotEqual(default_sig_ign_mask, restored_sig_ign_mask,
1853                            msg="restore_signals=True should've unblocked "
1854                            "SIGPIPE and friends.")
1855
1856    def test_start_new_session(self):
1857        # For code coverage of calling setsid().  We don't care if we get an
1858        # EPERM error from it depending on the test execution environment, that
1859        # still indicates that it was called.
1860        try:
1861            output = subprocess.check_output(
1862                    [sys.executable, "-c", "import os; print(os.getsid(0))"],
1863                    start_new_session=True)
1864        except OSError as e:
1865            if e.errno != errno.EPERM:
1866                raise
1867        else:
1868            parent_sid = os.getsid(0)
1869            child_sid = int(output)
1870            self.assertNotEqual(parent_sid, child_sid)
1871
1872    @unittest.skipUnless(hasattr(os, 'setreuid'), 'no setreuid on platform')
1873    def test_user(self):
1874        # For code coverage of the user parameter.  We don't care if we get an
1875        # EPERM error from it depending on the test execution environment, that
1876        # still indicates that it was called.
1877
1878        uid = os.geteuid()
1879        test_users = [65534 if uid != 65534 else 65533, uid]
1880        name_uid = "nobody" if sys.platform != 'darwin' else "unknown"
1881
1882        if pwd is not None:
1883            try:
1884                pwd.getpwnam(name_uid)
1885                test_users.append(name_uid)
1886            except KeyError:
1887                # unknown user name
1888                name_uid = None
1889
1890        for user in test_users:
1891            # posix_spawn() may be used with close_fds=False
1892            for close_fds in (False, True):
1893                with self.subTest(user=user, close_fds=close_fds):
1894                    try:
1895                        output = subprocess.check_output(
1896                                [sys.executable, "-c",
1897                                 "import os; print(os.getuid())"],
1898                                user=user,
1899                                close_fds=close_fds)
1900                    except PermissionError:  # (EACCES, EPERM)
1901                        pass
1902                    except OSError as e:
1903                        if e.errno not in (errno.EACCES, errno.EPERM):
1904                            raise
1905                    else:
1906                        if isinstance(user, str):
1907                            user_uid = pwd.getpwnam(user).pw_uid
1908                        else:
1909                            user_uid = user
1910                        child_user = int(output)
1911                        self.assertEqual(child_user, user_uid)
1912
1913        with self.assertRaises(ValueError):
1914            subprocess.check_call(ZERO_RETURN_CMD, user=-1)
1915
1916        with self.assertRaises(OverflowError):
1917            subprocess.check_call(ZERO_RETURN_CMD,
1918                                  cwd=os.curdir, env=os.environ, user=2**64)
1919
1920        if pwd is None and name_uid is not None:
1921            with self.assertRaises(ValueError):
1922                subprocess.check_call(ZERO_RETURN_CMD, user=name_uid)
1923
1924    @unittest.skipIf(hasattr(os, 'setreuid'), 'setreuid() available on platform')
1925    def test_user_error(self):
1926        with self.assertRaises(ValueError):
1927            subprocess.check_call(ZERO_RETURN_CMD, user=65535)
1928
1929    @unittest.skipUnless(hasattr(os, 'setregid'), 'no setregid() on platform')
1930    def test_group(self):
1931        gid = os.getegid()
1932        group_list = [65534 if gid != 65534 else 65533]
1933        name_group = _get_test_grp_name()
1934
1935        if grp is not None:
1936            group_list.append(name_group)
1937
1938        for group in group_list + [gid]:
1939            # posix_spawn() may be used with close_fds=False
1940            for close_fds in (False, True):
1941                with self.subTest(group=group, close_fds=close_fds):
1942                    try:
1943                        output = subprocess.check_output(
1944                                [sys.executable, "-c",
1945                                 "import os; print(os.getgid())"],
1946                                group=group,
1947                                close_fds=close_fds)
1948                    except PermissionError:  # (EACCES, EPERM)
1949                        pass
1950                    else:
1951                        if isinstance(group, str):
1952                            group_gid = grp.getgrnam(group).gr_gid
1953                        else:
1954                            group_gid = group
1955
1956                        child_group = int(output)
1957                        self.assertEqual(child_group, group_gid)
1958
1959        # make sure we bomb on negative values
1960        with self.assertRaises(ValueError):
1961            subprocess.check_call(ZERO_RETURN_CMD, group=-1)
1962
1963        with self.assertRaises(OverflowError):
1964            subprocess.check_call(ZERO_RETURN_CMD,
1965                                  cwd=os.curdir, env=os.environ, group=2**64)
1966
1967        if grp is None:
1968            with self.assertRaises(ValueError):
1969                subprocess.check_call(ZERO_RETURN_CMD, group=name_group)
1970
1971    @unittest.skipIf(hasattr(os, 'setregid'), 'setregid() available on platform')
1972    def test_group_error(self):
1973        with self.assertRaises(ValueError):
1974            subprocess.check_call(ZERO_RETURN_CMD, group=65535)
1975
1976    @unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform')
1977    def test_extra_groups(self):
1978        gid = os.getegid()
1979        group_list = [65534 if gid != 65534 else 65533]
1980        name_group = _get_test_grp_name()
1981        perm_error = False
1982
1983        if grp is not None:
1984            group_list.append(name_group)
1985
1986        try:
1987            output = subprocess.check_output(
1988                    [sys.executable, "-c",
1989                     "import os, sys, json; json.dump(os.getgroups(), sys.stdout)"],
1990                    extra_groups=group_list)
1991        except OSError as ex:
1992            if ex.errno != errno.EPERM:
1993                raise
1994            perm_error = True
1995
1996        else:
1997            parent_groups = os.getgroups()
1998            child_groups = json.loads(output)
1999
2000            if grp is not None:
2001                desired_gids = [grp.getgrnam(g).gr_gid if isinstance(g, str) else g
2002                                for g in group_list]
2003            else:
2004                desired_gids = group_list
2005
2006            if perm_error:
2007                self.assertEqual(set(child_groups), set(parent_groups))
2008            else:
2009                self.assertEqual(set(desired_gids), set(child_groups))
2010
2011        # make sure we bomb on negative values
2012        with self.assertRaises(ValueError):
2013            subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[-1])
2014
2015        with self.assertRaises(ValueError):
2016            subprocess.check_call(ZERO_RETURN_CMD,
2017                                  cwd=os.curdir, env=os.environ,
2018                                  extra_groups=[2**64])
2019
2020        if grp is None:
2021            with self.assertRaises(ValueError):
2022                subprocess.check_call(ZERO_RETURN_CMD,
2023                                      extra_groups=[name_group])
2024
2025    @unittest.skipIf(hasattr(os, 'setgroups'), 'setgroups() available on platform')
2026    def test_extra_groups_error(self):
2027        with self.assertRaises(ValueError):
2028            subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[])
2029
2030    @unittest.skipIf(mswindows or not hasattr(os, 'umask'),
2031                     'POSIX umask() is not available.')
2032    def test_umask(self):
2033        tmpdir = None
2034        try:
2035            tmpdir = tempfile.mkdtemp()
2036            name = os.path.join(tmpdir, "beans")
2037            # We set an unusual umask in the child so as a unique mode
2038            # for us to test the child's touched file for.
2039            subprocess.check_call(
2040                    [sys.executable, "-c", f"open({name!r}, 'w').close()"],
2041                    umask=0o053)
2042            # Ignore execute permissions entirely in our test,
2043            # filesystems could be mounted to ignore or force that.
2044            st_mode = os.stat(name).st_mode & 0o666
2045            expected_mode = 0o624
2046            self.assertEqual(expected_mode, st_mode,
2047                             msg=f'{oct(expected_mode)} != {oct(st_mode)}')
2048        finally:
2049            if tmpdir is not None:
2050                shutil.rmtree(tmpdir)
2051
2052    def test_run_abort(self):
2053        # returncode handles signal termination
2054        with support.SuppressCrashReport():
2055            p = subprocess.Popen([sys.executable, "-c",
2056                                  'import os; os.abort()'])
2057            p.wait()
2058        self.assertEqual(-p.returncode, signal.SIGABRT)
2059
2060    def test_CalledProcessError_str_signal(self):
2061        err = subprocess.CalledProcessError(-int(signal.SIGABRT), "fake cmd")
2062        error_string = str(err)
2063        # We're relying on the repr() of the signal.Signals intenum to provide
2064        # the word signal, the signal name and the numeric value.
2065        self.assertIn("signal", error_string.lower())
2066        # We're not being specific about the signal name as some signals have
2067        # multiple names and which name is revealed can vary.
2068        self.assertIn("SIG", error_string)
2069        self.assertIn(str(signal.SIGABRT), error_string)
2070
2071    def test_CalledProcessError_str_unknown_signal(self):
2072        err = subprocess.CalledProcessError(-9876543, "fake cmd")
2073        error_string = str(err)
2074        self.assertIn("unknown signal 9876543.", error_string)
2075
2076    def test_CalledProcessError_str_non_zero(self):
2077        err = subprocess.CalledProcessError(2, "fake cmd")
2078        error_string = str(err)
2079        self.assertIn("non-zero exit status 2.", error_string)
2080
2081    def test_preexec(self):
2082        # DISCLAIMER: Setting environment variables is *not* a good use
2083        # of a preexec_fn.  This is merely a test.
2084        p = subprocess.Popen([sys.executable, "-c",
2085                              'import sys,os;'
2086                              'sys.stdout.write(os.getenv("FRUIT"))'],
2087                             stdout=subprocess.PIPE,
2088                             preexec_fn=lambda: os.putenv("FRUIT", "apple"))
2089        with p:
2090            self.assertEqual(p.stdout.read(), b"apple")
2091
2092    def test_preexec_exception(self):
2093        def raise_it():
2094            raise ValueError("What if two swallows carried a coconut?")
2095        try:
2096            p = subprocess.Popen([sys.executable, "-c", ""],
2097                                 preexec_fn=raise_it)
2098        except subprocess.SubprocessError as e:
2099            self.assertTrue(
2100                    subprocess._posixsubprocess,
2101                    "Expected a ValueError from the preexec_fn")
2102        except ValueError as e:
2103            self.assertIn("coconut", e.args[0])
2104        else:
2105            self.fail("Exception raised by preexec_fn did not make it "
2106                      "to the parent process.")
2107
2108    class _TestExecuteChildPopen(subprocess.Popen):
2109        """Used to test behavior at the end of _execute_child."""
2110        def __init__(self, testcase, *args, **kwargs):
2111            self._testcase = testcase
2112            subprocess.Popen.__init__(self, *args, **kwargs)
2113
2114        def _execute_child(self, *args, **kwargs):
2115            try:
2116                subprocess.Popen._execute_child(self, *args, **kwargs)
2117            finally:
2118                # Open a bunch of file descriptors and verify that
2119                # none of them are the same as the ones the Popen
2120                # instance is using for stdin/stdout/stderr.
2121                devzero_fds = [os.open("/dev/zero", os.O_RDONLY)
2122                               for _ in range(8)]
2123                try:
2124                    for fd in devzero_fds:
2125                        self._testcase.assertNotIn(
2126                                fd, (self.stdin.fileno(), self.stdout.fileno(),
2127                                     self.stderr.fileno()),
2128                                msg="At least one fd was closed early.")
2129                finally:
2130                    for fd in devzero_fds:
2131                        os.close(fd)
2132
2133    @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
2134    def test_preexec_errpipe_does_not_double_close_pipes(self):
2135        """Issue16140: Don't double close pipes on preexec error."""
2136
2137        def raise_it():
2138            raise subprocess.SubprocessError(
2139                    "force the _execute_child() errpipe_data path.")
2140
2141        with self.assertRaises(subprocess.SubprocessError):
2142            self._TestExecuteChildPopen(
2143                        self, ZERO_RETURN_CMD,
2144                        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
2145                        stderr=subprocess.PIPE, preexec_fn=raise_it)
2146
2147    def test_preexec_gc_module_failure(self):
2148        # This tests the code that disables garbage collection if the child
2149        # process will execute any Python.
2150        enabled = gc.isenabled()
2151        try:
2152            gc.disable()
2153            self.assertFalse(gc.isenabled())
2154            subprocess.call([sys.executable, '-c', ''],
2155                            preexec_fn=lambda: None)
2156            self.assertFalse(gc.isenabled(),
2157                             "Popen enabled gc when it shouldn't.")
2158
2159            gc.enable()
2160            self.assertTrue(gc.isenabled())
2161            subprocess.call([sys.executable, '-c', ''],
2162                            preexec_fn=lambda: None)
2163            self.assertTrue(gc.isenabled(), "Popen left gc disabled.")
2164        finally:
2165            if not enabled:
2166                gc.disable()
2167
2168    @unittest.skipIf(
2169        sys.platform == 'darwin', 'setrlimit() seems to fail on OS X')
2170    def test_preexec_fork_failure(self):
2171        # The internal code did not preserve the previous exception when
2172        # re-enabling garbage collection
2173        try:
2174            from resource import getrlimit, setrlimit, RLIMIT_NPROC
2175        except ImportError as err:
2176            self.skipTest(err)  # RLIMIT_NPROC is specific to Linux and BSD
2177        limits = getrlimit(RLIMIT_NPROC)
2178        [_, hard] = limits
2179        setrlimit(RLIMIT_NPROC, (0, hard))
2180        self.addCleanup(setrlimit, RLIMIT_NPROC, limits)
2181        try:
2182            subprocess.call([sys.executable, '-c', ''],
2183                            preexec_fn=lambda: None)
2184        except BlockingIOError:
2185            # Forking should raise EAGAIN, translated to BlockingIOError
2186            pass
2187        else:
2188            self.skipTest('RLIMIT_NPROC had no effect; probably superuser')
2189
2190    def test_args_string(self):
2191        # args is a string
2192        fd, fname = tempfile.mkstemp()
2193        # reopen in text mode
2194        with open(fd, "w", errors="surrogateescape") as fobj:
2195            fobj.write("#!%s\n" % support.unix_shell)
2196            fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
2197                       sys.executable)
2198        os.chmod(fname, 0o700)
2199        p = subprocess.Popen(fname)
2200        p.wait()
2201        os.remove(fname)
2202        self.assertEqual(p.returncode, 47)
2203
2204    def test_invalid_args(self):
2205        # invalid arguments should raise ValueError
2206        self.assertRaises(ValueError, subprocess.call,
2207                          [sys.executable, "-c",
2208                           "import sys; sys.exit(47)"],
2209                          startupinfo=47)
2210        self.assertRaises(ValueError, subprocess.call,
2211                          [sys.executable, "-c",
2212                           "import sys; sys.exit(47)"],
2213                          creationflags=47)
2214
2215    def test_shell_sequence(self):
2216        # Run command through the shell (sequence)
2217        newenv = os.environ.copy()
2218        newenv["FRUIT"] = "apple"
2219        p = subprocess.Popen(["echo $FRUIT"], shell=1,
2220                             stdout=subprocess.PIPE,
2221                             env=newenv)
2222        with p:
2223            self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
2224
2225    def test_shell_string(self):
2226        # Run command through the shell (string)
2227        newenv = os.environ.copy()
2228        newenv["FRUIT"] = "apple"
2229        p = subprocess.Popen("echo $FRUIT", shell=1,
2230                             stdout=subprocess.PIPE,
2231                             env=newenv)
2232        with p:
2233            self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
2234
2235    def test_call_string(self):
2236        # call() function with string argument on UNIX
2237        fd, fname = tempfile.mkstemp()
2238        # reopen in text mode
2239        with open(fd, "w", errors="surrogateescape") as fobj:
2240            fobj.write("#!%s\n" % support.unix_shell)
2241            fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
2242                       sys.executable)
2243        os.chmod(fname, 0o700)
2244        rc = subprocess.call(fname)
2245        os.remove(fname)
2246        self.assertEqual(rc, 47)
2247
2248    def test_specific_shell(self):
2249        # Issue #9265: Incorrect name passed as arg[0].
2250        shells = []
2251        for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
2252            for name in ['bash', 'ksh']:
2253                sh = os.path.join(prefix, name)
2254                if os.path.isfile(sh):
2255                    shells.append(sh)
2256        if not shells: # Will probably work for any shell but csh.
2257            self.skipTest("bash or ksh required for this test")
2258        sh = '/bin/sh'
2259        if os.path.isfile(sh) and not os.path.islink(sh):
2260            # Test will fail if /bin/sh is a symlink to csh.
2261            shells.append(sh)
2262        for sh in shells:
2263            p = subprocess.Popen("echo $0", executable=sh, shell=True,
2264                                 stdout=subprocess.PIPE)
2265            with p:
2266                self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii'))
2267
2268    def _kill_process(self, method, *args):
2269        # Do not inherit file handles from the parent.
2270        # It should fix failures on some platforms.
2271        # Also set the SIGINT handler to the default to make sure it's not
2272        # being ignored (some tests rely on that.)
2273        old_handler = signal.signal(signal.SIGINT, signal.default_int_handler)
2274        try:
2275            p = subprocess.Popen([sys.executable, "-c", """if 1:
2276                                 import sys, time
2277                                 sys.stdout.write('x\\n')
2278                                 sys.stdout.flush()
2279                                 time.sleep(30)
2280                                 """],
2281                                 close_fds=True,
2282                                 stdin=subprocess.PIPE,
2283                                 stdout=subprocess.PIPE,
2284                                 stderr=subprocess.PIPE)
2285        finally:
2286            signal.signal(signal.SIGINT, old_handler)
2287        # Wait for the interpreter to be completely initialized before
2288        # sending any signal.
2289        p.stdout.read(1)
2290        getattr(p, method)(*args)
2291        return p
2292
2293    @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')),
2294                     "Due to known OS bug (issue #16762)")
2295    def _kill_dead_process(self, method, *args):
2296        # Do not inherit file handles from the parent.
2297        # It should fix failures on some platforms.
2298        p = subprocess.Popen([sys.executable, "-c", """if 1:
2299                             import sys, time
2300                             sys.stdout.write('x\\n')
2301                             sys.stdout.flush()
2302                             """],
2303                             close_fds=True,
2304                             stdin=subprocess.PIPE,
2305                             stdout=subprocess.PIPE,
2306                             stderr=subprocess.PIPE)
2307        # Wait for the interpreter to be completely initialized before
2308        # sending any signal.
2309        p.stdout.read(1)
2310        # The process should end after this
2311        time.sleep(1)
2312        # This shouldn't raise even though the child is now dead
2313        getattr(p, method)(*args)
2314        p.communicate()
2315
2316    def test_send_signal(self):
2317        p = self._kill_process('send_signal', signal.SIGINT)
2318        _, stderr = p.communicate()
2319        self.assertIn(b'KeyboardInterrupt', stderr)
2320        self.assertNotEqual(p.wait(), 0)
2321
2322    def test_kill(self):
2323        p = self._kill_process('kill')
2324        _, stderr = p.communicate()
2325        self.assertEqual(stderr, b'')
2326        self.assertEqual(p.wait(), -signal.SIGKILL)
2327
2328    def test_terminate(self):
2329        p = self._kill_process('terminate')
2330        _, stderr = p.communicate()
2331        self.assertEqual(stderr, b'')
2332        self.assertEqual(p.wait(), -signal.SIGTERM)
2333
2334    def test_send_signal_dead(self):
2335        # Sending a signal to a dead process
2336        self._kill_dead_process('send_signal', signal.SIGINT)
2337
2338    def test_kill_dead(self):
2339        # Killing a dead process
2340        self._kill_dead_process('kill')
2341
2342    def test_terminate_dead(self):
2343        # Terminating a dead process
2344        self._kill_dead_process('terminate')
2345
2346    def _save_fds(self, save_fds):
2347        fds = []
2348        for fd in save_fds:
2349            inheritable = os.get_inheritable(fd)
2350            saved = os.dup(fd)
2351            fds.append((fd, saved, inheritable))
2352        return fds
2353
2354    def _restore_fds(self, fds):
2355        for fd, saved, inheritable in fds:
2356            os.dup2(saved, fd, inheritable=inheritable)
2357            os.close(saved)
2358
2359    def check_close_std_fds(self, fds):
2360        # Issue #9905: test that subprocess pipes still work properly with
2361        # some standard fds closed
2362        stdin = 0
2363        saved_fds = self._save_fds(fds)
2364        for fd, saved, inheritable in saved_fds:
2365            if fd == 0:
2366                stdin = saved
2367                break
2368        try:
2369            for fd in fds:
2370                os.close(fd)
2371            out, err = subprocess.Popen([sys.executable, "-c",
2372                              'import sys;'
2373                              'sys.stdout.write("apple");'
2374                              'sys.stdout.flush();'
2375                              'sys.stderr.write("orange")'],
2376                       stdin=stdin,
2377                       stdout=subprocess.PIPE,
2378                       stderr=subprocess.PIPE).communicate()
2379            self.assertEqual(out, b'apple')
2380            self.assertEqual(err, b'orange')
2381        finally:
2382            self._restore_fds(saved_fds)
2383
2384    def test_close_fd_0(self):
2385        self.check_close_std_fds([0])
2386
2387    def test_close_fd_1(self):
2388        self.check_close_std_fds([1])
2389
2390    def test_close_fd_2(self):
2391        self.check_close_std_fds([2])
2392
2393    def test_close_fds_0_1(self):
2394        self.check_close_std_fds([0, 1])
2395
2396    def test_close_fds_0_2(self):
2397        self.check_close_std_fds([0, 2])
2398
2399    def test_close_fds_1_2(self):
2400        self.check_close_std_fds([1, 2])
2401
2402    def test_close_fds_0_1_2(self):
2403        # Issue #10806: test that subprocess pipes still work properly with
2404        # all standard fds closed.
2405        self.check_close_std_fds([0, 1, 2])
2406
2407    def test_small_errpipe_write_fd(self):
2408        """Issue #15798: Popen should work when stdio fds are available."""
2409        new_stdin = os.dup(0)
2410        new_stdout = os.dup(1)
2411        try:
2412            os.close(0)
2413            os.close(1)
2414
2415            # Side test: if errpipe_write fails to have its CLOEXEC
2416            # flag set this should cause the parent to think the exec
2417            # failed.  Extremely unlikely: everyone supports CLOEXEC.
2418            subprocess.Popen([
2419                    sys.executable, "-c",
2420                    "print('AssertionError:0:CLOEXEC failure.')"]).wait()
2421        finally:
2422            # Restore original stdin and stdout
2423            os.dup2(new_stdin, 0)
2424            os.dup2(new_stdout, 1)
2425            os.close(new_stdin)
2426            os.close(new_stdout)
2427
2428    def test_remapping_std_fds(self):
2429        # open up some temporary files
2430        temps = [tempfile.mkstemp() for i in range(3)]
2431        try:
2432            temp_fds = [fd for fd, fname in temps]
2433
2434            # unlink the files -- we won't need to reopen them
2435            for fd, fname in temps:
2436                os.unlink(fname)
2437
2438            # write some data to what will become stdin, and rewind
2439            os.write(temp_fds[1], b"STDIN")
2440            os.lseek(temp_fds[1], 0, 0)
2441
2442            # move the standard file descriptors out of the way
2443            saved_fds = self._save_fds(range(3))
2444            try:
2445                # duplicate the file objects over the standard fd's
2446                for fd, temp_fd in enumerate(temp_fds):
2447                    os.dup2(temp_fd, fd)
2448
2449                # now use those files in the "wrong" order, so that subprocess
2450                # has to rearrange them in the child
2451                p = subprocess.Popen([sys.executable, "-c",
2452                    'import sys; got = sys.stdin.read();'
2453                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
2454                    stdin=temp_fds[1],
2455                    stdout=temp_fds[2],
2456                    stderr=temp_fds[0])
2457                p.wait()
2458            finally:
2459                self._restore_fds(saved_fds)
2460
2461            for fd in temp_fds:
2462                os.lseek(fd, 0, 0)
2463
2464            out = os.read(temp_fds[2], 1024)
2465            err = os.read(temp_fds[0], 1024).strip()
2466            self.assertEqual(out, b"got STDIN")
2467            self.assertEqual(err, b"err")
2468
2469        finally:
2470            for fd in temp_fds:
2471                os.close(fd)
2472
2473    def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
2474        # open up some temporary files
2475        temps = [tempfile.mkstemp() for i in range(3)]
2476        temp_fds = [fd for fd, fname in temps]
2477        try:
2478            # unlink the files -- we won't need to reopen them
2479            for fd, fname in temps:
2480                os.unlink(fname)
2481
2482            # save a copy of the standard file descriptors
2483            saved_fds = self._save_fds(range(3))
2484            try:
2485                # duplicate the temp files over the standard fd's 0, 1, 2
2486                for fd, temp_fd in enumerate(temp_fds):
2487                    os.dup2(temp_fd, fd)
2488
2489                # write some data to what will become stdin, and rewind
2490                os.write(stdin_no, b"STDIN")
2491                os.lseek(stdin_no, 0, 0)
2492
2493                # now use those files in the given order, so that subprocess
2494                # has to rearrange them in the child
2495                p = subprocess.Popen([sys.executable, "-c",
2496                    'import sys; got = sys.stdin.read();'
2497                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
2498                    stdin=stdin_no,
2499                    stdout=stdout_no,
2500                    stderr=stderr_no)
2501                p.wait()
2502
2503                for fd in temp_fds:
2504                    os.lseek(fd, 0, 0)
2505
2506                out = os.read(stdout_no, 1024)
2507                err = os.read(stderr_no, 1024).strip()
2508            finally:
2509                self._restore_fds(saved_fds)
2510
2511            self.assertEqual(out, b"got STDIN")
2512            self.assertEqual(err, b"err")
2513
2514        finally:
2515            for fd in temp_fds:
2516                os.close(fd)
2517
2518    # When duping fds, if there arises a situation where one of the fds is
2519    # either 0, 1 or 2, it is possible that it is overwritten (#12607).
2520    # This tests all combinations of this.
2521    def test_swap_fds(self):
2522        self.check_swap_fds(0, 1, 2)
2523        self.check_swap_fds(0, 2, 1)
2524        self.check_swap_fds(1, 0, 2)
2525        self.check_swap_fds(1, 2, 0)
2526        self.check_swap_fds(2, 0, 1)
2527        self.check_swap_fds(2, 1, 0)
2528
2529    def _check_swap_std_fds_with_one_closed(self, from_fds, to_fds):
2530        saved_fds = self._save_fds(range(3))
2531        try:
2532            for from_fd in from_fds:
2533                with tempfile.TemporaryFile() as f:
2534                    os.dup2(f.fileno(), from_fd)
2535
2536            fd_to_close = (set(range(3)) - set(from_fds)).pop()
2537            os.close(fd_to_close)
2538
2539            arg_names = ['stdin', 'stdout', 'stderr']
2540            kwargs = {}
2541            for from_fd, to_fd in zip(from_fds, to_fds):
2542                kwargs[arg_names[to_fd]] = from_fd
2543
2544            code = textwrap.dedent(r'''
2545                import os, sys
2546                skipped_fd = int(sys.argv[1])
2547                for fd in range(3):
2548                    if fd != skipped_fd:
2549                        os.write(fd, str(fd).encode('ascii'))
2550            ''')
2551
2552            skipped_fd = (set(range(3)) - set(to_fds)).pop()
2553
2554            rc = subprocess.call([sys.executable, '-c', code, str(skipped_fd)],
2555                                 **kwargs)
2556            self.assertEqual(rc, 0)
2557
2558            for from_fd, to_fd in zip(from_fds, to_fds):
2559                os.lseek(from_fd, 0, os.SEEK_SET)
2560                read_bytes = os.read(from_fd, 1024)
2561                read_fds = list(map(int, read_bytes.decode('ascii')))
2562                msg = textwrap.dedent(f"""
2563                    When testing {from_fds} to {to_fds} redirection,
2564                    parent descriptor {from_fd} got redirected
2565                    to descriptor(s) {read_fds} instead of descriptor {to_fd}.
2566                """)
2567                self.assertEqual([to_fd], read_fds, msg)
2568        finally:
2569            self._restore_fds(saved_fds)
2570
2571    # Check that subprocess can remap std fds correctly even
2572    # if one of them is closed (#32844).
2573    def test_swap_std_fds_with_one_closed(self):
2574        for from_fds in itertools.combinations(range(3), 2):
2575            for to_fds in itertools.permutations(range(3), 2):
2576                self._check_swap_std_fds_with_one_closed(from_fds, to_fds)
2577
2578    def test_surrogates_error_message(self):
2579        def prepare():
2580            raise ValueError("surrogate:\uDCff")
2581
2582        try:
2583            subprocess.call(
2584                ZERO_RETURN_CMD,
2585                preexec_fn=prepare)
2586        except ValueError as err:
2587            # Pure Python implementations keeps the message
2588            self.assertIsNone(subprocess._posixsubprocess)
2589            self.assertEqual(str(err), "surrogate:\uDCff")
2590        except subprocess.SubprocessError as err:
2591            # _posixsubprocess uses a default message
2592            self.assertIsNotNone(subprocess._posixsubprocess)
2593            self.assertEqual(str(err), "Exception occurred in preexec_fn.")
2594        else:
2595            self.fail("Expected ValueError or subprocess.SubprocessError")
2596
2597    def test_undecodable_env(self):
2598        for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')):
2599            encoded_value = value.encode("ascii", "surrogateescape")
2600
2601            # test str with surrogates
2602            script = "import os; print(ascii(os.getenv(%s)))" % repr(key)
2603            env = os.environ.copy()
2604            env[key] = value
2605            # Use C locale to get ASCII for the locale encoding to force
2606            # surrogate-escaping of \xFF in the child process
2607            env['LC_ALL'] = 'C'
2608            decoded_value = value
2609            stdout = subprocess.check_output(
2610                [sys.executable, "-c", script],
2611                env=env)
2612            stdout = stdout.rstrip(b'\n\r')
2613            self.assertEqual(stdout.decode('ascii'), ascii(decoded_value))
2614
2615            # test bytes
2616            key = key.encode("ascii", "surrogateescape")
2617            script = "import os; print(ascii(os.getenvb(%s)))" % repr(key)
2618            env = os.environ.copy()
2619            env[key] = encoded_value
2620            stdout = subprocess.check_output(
2621                [sys.executable, "-c", script],
2622                env=env)
2623            stdout = stdout.rstrip(b'\n\r')
2624            self.assertEqual(stdout.decode('ascii'), ascii(encoded_value))
2625
2626    def test_bytes_program(self):
2627        abs_program = os.fsencode(ZERO_RETURN_CMD[0])
2628        args = list(ZERO_RETURN_CMD[1:])
2629        path, program = os.path.split(ZERO_RETURN_CMD[0])
2630        program = os.fsencode(program)
2631
2632        # absolute bytes path
2633        exitcode = subprocess.call([abs_program]+args)
2634        self.assertEqual(exitcode, 0)
2635
2636        # absolute bytes path as a string
2637        cmd = b"'%s' %s" % (abs_program, " ".join(args).encode("utf-8"))
2638        exitcode = subprocess.call(cmd, shell=True)
2639        self.assertEqual(exitcode, 0)
2640
2641        # bytes program, unicode PATH
2642        env = os.environ.copy()
2643        env["PATH"] = path
2644        exitcode = subprocess.call([program]+args, env=env)
2645        self.assertEqual(exitcode, 0)
2646
2647        # bytes program, bytes PATH
2648        envb = os.environb.copy()
2649        envb[b"PATH"] = os.fsencode(path)
2650        exitcode = subprocess.call([program]+args, env=envb)
2651        self.assertEqual(exitcode, 0)
2652
2653    def test_pipe_cloexec(self):
2654        sleeper = support.findfile("input_reader.py", subdir="subprocessdata")
2655        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2656
2657        p1 = subprocess.Popen([sys.executable, sleeper],
2658                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
2659                              stderr=subprocess.PIPE, close_fds=False)
2660
2661        self.addCleanup(p1.communicate, b'')
2662
2663        p2 = subprocess.Popen([sys.executable, fd_status],
2664                              stdout=subprocess.PIPE, close_fds=False)
2665
2666        output, error = p2.communicate()
2667        result_fds = set(map(int, output.split(b',')))
2668        unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(),
2669                            p1.stderr.fileno()])
2670
2671        self.assertFalse(result_fds & unwanted_fds,
2672                         "Expected no fds from %r to be open in child, "
2673                         "found %r" %
2674                              (unwanted_fds, result_fds & unwanted_fds))
2675
2676    def test_pipe_cloexec_real_tools(self):
2677        qcat = support.findfile("qcat.py", subdir="subprocessdata")
2678        qgrep = support.findfile("qgrep.py", subdir="subprocessdata")
2679
2680        subdata = b'zxcvbn'
2681        data = subdata * 4 + b'\n'
2682
2683        p1 = subprocess.Popen([sys.executable, qcat],
2684                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
2685                              close_fds=False)
2686
2687        p2 = subprocess.Popen([sys.executable, qgrep, subdata],
2688                              stdin=p1.stdout, stdout=subprocess.PIPE,
2689                              close_fds=False)
2690
2691        self.addCleanup(p1.wait)
2692        self.addCleanup(p2.wait)
2693        def kill_p1():
2694            try:
2695                p1.terminate()
2696            except ProcessLookupError:
2697                pass
2698        def kill_p2():
2699            try:
2700                p2.terminate()
2701            except ProcessLookupError:
2702                pass
2703        self.addCleanup(kill_p1)
2704        self.addCleanup(kill_p2)
2705
2706        p1.stdin.write(data)
2707        p1.stdin.close()
2708
2709        readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10)
2710
2711        self.assertTrue(readfiles, "The child hung")
2712        self.assertEqual(p2.stdout.read(), data)
2713
2714        p1.stdout.close()
2715        p2.stdout.close()
2716
2717    def test_close_fds(self):
2718        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2719
2720        fds = os.pipe()
2721        self.addCleanup(os.close, fds[0])
2722        self.addCleanup(os.close, fds[1])
2723
2724        open_fds = set(fds)
2725        # add a bunch more fds
2726        for _ in range(9):
2727            fd = os.open(os.devnull, os.O_RDONLY)
2728            self.addCleanup(os.close, fd)
2729            open_fds.add(fd)
2730
2731        for fd in open_fds:
2732            os.set_inheritable(fd, True)
2733
2734        p = subprocess.Popen([sys.executable, fd_status],
2735                             stdout=subprocess.PIPE, close_fds=False)
2736        output, ignored = p.communicate()
2737        remaining_fds = set(map(int, output.split(b',')))
2738
2739        self.assertEqual(remaining_fds & open_fds, open_fds,
2740                         "Some fds were closed")
2741
2742        p = subprocess.Popen([sys.executable, fd_status],
2743                             stdout=subprocess.PIPE, close_fds=True)
2744        output, ignored = p.communicate()
2745        remaining_fds = set(map(int, output.split(b',')))
2746
2747        self.assertFalse(remaining_fds & open_fds,
2748                         "Some fds were left open")
2749        self.assertIn(1, remaining_fds, "Subprocess failed")
2750
2751        # Keep some of the fd's we opened open in the subprocess.
2752        # This tests _posixsubprocess.c's proper handling of fds_to_keep.
2753        fds_to_keep = set(open_fds.pop() for _ in range(8))
2754        p = subprocess.Popen([sys.executable, fd_status],
2755                             stdout=subprocess.PIPE, close_fds=True,
2756                             pass_fds=fds_to_keep)
2757        output, ignored = p.communicate()
2758        remaining_fds = set(map(int, output.split(b',')))
2759
2760        self.assertFalse((remaining_fds - fds_to_keep) & open_fds,
2761                         "Some fds not in pass_fds were left open")
2762        self.assertIn(1, remaining_fds, "Subprocess failed")
2763
2764
2765    @unittest.skipIf(sys.platform.startswith("freebsd") and
2766                     os.stat("/dev").st_dev == os.stat("/dev/fd").st_dev,
2767                     "Requires fdescfs mounted on /dev/fd on FreeBSD.")
2768    def test_close_fds_when_max_fd_is_lowered(self):
2769        """Confirm that issue21618 is fixed (may fail under valgrind)."""
2770        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2771
2772        # This launches the meat of the test in a child process to
2773        # avoid messing with the larger unittest processes maximum
2774        # number of file descriptors.
2775        #  This process launches:
2776        #  +--> Process that lowers its RLIMIT_NOFILE aftr setting up
2777        #    a bunch of high open fds above the new lower rlimit.
2778        #    Those are reported via stdout before launching a new
2779        #    process with close_fds=False to run the actual test:
2780        #    +--> The TEST: This one launches a fd_status.py
2781        #      subprocess with close_fds=True so we can find out if
2782        #      any of the fds above the lowered rlimit are still open.
2783        p = subprocess.Popen([sys.executable, '-c', textwrap.dedent(
2784        '''
2785        import os, resource, subprocess, sys, textwrap
2786        open_fds = set()
2787        # Add a bunch more fds to pass down.
2788        for _ in range(40):
2789            fd = os.open(os.devnull, os.O_RDONLY)
2790            open_fds.add(fd)
2791
2792        # Leave a two pairs of low ones available for use by the
2793        # internal child error pipe and the stdout pipe.
2794        # We also leave 10 more open as some Python buildbots run into
2795        # "too many open files" errors during the test if we do not.
2796        for fd in sorted(open_fds)[:14]:
2797            os.close(fd)
2798            open_fds.remove(fd)
2799
2800        for fd in open_fds:
2801            #self.addCleanup(os.close, fd)
2802            os.set_inheritable(fd, True)
2803
2804        max_fd_open = max(open_fds)
2805
2806        # Communicate the open_fds to the parent unittest.TestCase process.
2807        print(','.join(map(str, sorted(open_fds))))
2808        sys.stdout.flush()
2809
2810        rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE)
2811        try:
2812            # 29 is lower than the highest fds we are leaving open.
2813            resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max))
2814            # Launch a new Python interpreter with our low fd rlim_cur that
2815            # inherits open fds above that limit.  It then uses subprocess
2816            # with close_fds=True to get a report of open fds in the child.
2817            # An explicit list of fds to check is passed to fd_status.py as
2818            # letting fd_status rely on its default logic would miss the
2819            # fds above rlim_cur as it normally only checks up to that limit.
2820            subprocess.Popen(
2821                [sys.executable, '-c',
2822                 textwrap.dedent("""
2823                     import subprocess, sys
2824                     subprocess.Popen([sys.executable, %r] +
2825                                      [str(x) for x in range({max_fd})],
2826                                      close_fds=True).wait()
2827                     """.format(max_fd=max_fd_open+1))],
2828                close_fds=False).wait()
2829        finally:
2830            resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max))
2831        ''' % fd_status)], stdout=subprocess.PIPE)
2832
2833        output, unused_stderr = p.communicate()
2834        output_lines = output.splitlines()
2835        self.assertEqual(len(output_lines), 2,
2836                         msg="expected exactly two lines of output:\n%r" % output)
2837        opened_fds = set(map(int, output_lines[0].strip().split(b',')))
2838        remaining_fds = set(map(int, output_lines[1].strip().split(b',')))
2839
2840        self.assertFalse(remaining_fds & opened_fds,
2841                         msg="Some fds were left open.")
2842
2843
2844    # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file
2845    # descriptor of a pipe closed in the parent process is valid in the
2846    # child process according to fstat(), but the mode of the file
2847    # descriptor is invalid, and read or write raise an error.
2848    @support.requires_mac_ver(10, 5)
2849    def test_pass_fds(self):
2850        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2851
2852        open_fds = set()
2853
2854        for x in range(5):
2855            fds = os.pipe()
2856            self.addCleanup(os.close, fds[0])
2857            self.addCleanup(os.close, fds[1])
2858            os.set_inheritable(fds[0], True)
2859            os.set_inheritable(fds[1], True)
2860            open_fds.update(fds)
2861
2862        for fd in open_fds:
2863            p = subprocess.Popen([sys.executable, fd_status],
2864                                 stdout=subprocess.PIPE, close_fds=True,
2865                                 pass_fds=(fd, ))
2866            output, ignored = p.communicate()
2867
2868            remaining_fds = set(map(int, output.split(b',')))
2869            to_be_closed = open_fds - {fd}
2870
2871            self.assertIn(fd, remaining_fds, "fd to be passed not passed")
2872            self.assertFalse(remaining_fds & to_be_closed,
2873                             "fd to be closed passed")
2874
2875            # pass_fds overrides close_fds with a warning.
2876            with self.assertWarns(RuntimeWarning) as context:
2877                self.assertFalse(subprocess.call(
2878                        ZERO_RETURN_CMD,
2879                        close_fds=False, pass_fds=(fd, )))
2880            self.assertIn('overriding close_fds', str(context.warning))
2881
2882    def test_pass_fds_inheritable(self):
2883        script = support.findfile("fd_status.py", subdir="subprocessdata")
2884
2885        inheritable, non_inheritable = os.pipe()
2886        self.addCleanup(os.close, inheritable)
2887        self.addCleanup(os.close, non_inheritable)
2888        os.set_inheritable(inheritable, True)
2889        os.set_inheritable(non_inheritable, False)
2890        pass_fds = (inheritable, non_inheritable)
2891        args = [sys.executable, script]
2892        args += list(map(str, pass_fds))
2893
2894        p = subprocess.Popen(args,
2895                             stdout=subprocess.PIPE, close_fds=True,
2896                             pass_fds=pass_fds)
2897        output, ignored = p.communicate()
2898        fds = set(map(int, output.split(b',')))
2899
2900        # the inheritable file descriptor must be inherited, so its inheritable
2901        # flag must be set in the child process after fork() and before exec()
2902        self.assertEqual(fds, set(pass_fds), "output=%a" % output)
2903
2904        # inheritable flag must not be changed in the parent process
2905        self.assertEqual(os.get_inheritable(inheritable), True)
2906        self.assertEqual(os.get_inheritable(non_inheritable), False)
2907
2908
2909    # bpo-32270: Ensure that descriptors specified in pass_fds
2910    # are inherited even if they are used in redirections.
2911    # Contributed by @izbyshev.
2912    def test_pass_fds_redirected(self):
2913        """Regression test for https://bugs.python.org/issue32270."""
2914        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2915        pass_fds = []
2916        for _ in range(2):
2917            fd = os.open(os.devnull, os.O_RDWR)
2918            self.addCleanup(os.close, fd)
2919            pass_fds.append(fd)
2920
2921        stdout_r, stdout_w = os.pipe()
2922        self.addCleanup(os.close, stdout_r)
2923        self.addCleanup(os.close, stdout_w)
2924        pass_fds.insert(1, stdout_w)
2925
2926        with subprocess.Popen([sys.executable, fd_status],
2927                              stdin=pass_fds[0],
2928                              stdout=pass_fds[1],
2929                              stderr=pass_fds[2],
2930                              close_fds=True,
2931                              pass_fds=pass_fds):
2932            output = os.read(stdout_r, 1024)
2933        fds = {int(num) for num in output.split(b',')}
2934
2935        self.assertEqual(fds, {0, 1, 2} | frozenset(pass_fds), f"output={output!a}")
2936
2937
2938    def test_stdout_stdin_are_single_inout_fd(self):
2939        with io.open(os.devnull, "r+") as inout:
2940            p = subprocess.Popen(ZERO_RETURN_CMD,
2941                                 stdout=inout, stdin=inout)
2942            p.wait()
2943
2944    def test_stdout_stderr_are_single_inout_fd(self):
2945        with io.open(os.devnull, "r+") as inout:
2946            p = subprocess.Popen(ZERO_RETURN_CMD,
2947                                 stdout=inout, stderr=inout)
2948            p.wait()
2949
2950    def test_stderr_stdin_are_single_inout_fd(self):
2951        with io.open(os.devnull, "r+") as inout:
2952            p = subprocess.Popen(ZERO_RETURN_CMD,
2953                                 stderr=inout, stdin=inout)
2954            p.wait()
2955
2956    def test_wait_when_sigchild_ignored(self):
2957        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
2958        sigchild_ignore = support.findfile("sigchild_ignore.py",
2959                                           subdir="subprocessdata")
2960        p = subprocess.Popen([sys.executable, sigchild_ignore],
2961                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
2962        stdout, stderr = p.communicate()
2963        self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
2964                         " non-zero with this error:\n%s" %
2965                         stderr.decode('utf-8'))
2966
2967    def test_select_unbuffered(self):
2968        # Issue #11459: bufsize=0 should really set the pipes as
2969        # unbuffered (and therefore let select() work properly).
2970        select = import_helper.import_module("select")
2971        p = subprocess.Popen([sys.executable, "-c",
2972                              'import sys;'
2973                              'sys.stdout.write("apple")'],
2974                             stdout=subprocess.PIPE,
2975                             bufsize=0)
2976        f = p.stdout
2977        self.addCleanup(f.close)
2978        try:
2979            self.assertEqual(f.read(4), b"appl")
2980            self.assertIn(f, select.select([f], [], [], 0.0)[0])
2981        finally:
2982            p.wait()
2983
2984    def test_zombie_fast_process_del(self):
2985        # Issue #12650: on Unix, if Popen.__del__() was called before the
2986        # process exited, it wouldn't be added to subprocess._active, and would
2987        # remain a zombie.
2988        # spawn a Popen, and delete its reference before it exits
2989        p = subprocess.Popen([sys.executable, "-c",
2990                              'import sys, time;'
2991                              'time.sleep(0.2)'],
2992                             stdout=subprocess.PIPE,
2993                             stderr=subprocess.PIPE)
2994        self.addCleanup(p.stdout.close)
2995        self.addCleanup(p.stderr.close)
2996        ident = id(p)
2997        pid = p.pid
2998        with warnings_helper.check_warnings(('', ResourceWarning)):
2999            p = None
3000
3001        if mswindows:
3002            # subprocess._active is not used on Windows and is set to None.
3003            self.assertIsNone(subprocess._active)
3004        else:
3005            # check that p is in the active processes list
3006            self.assertIn(ident, [id(o) for o in subprocess._active])
3007
3008    def test_leak_fast_process_del_killed(self):
3009        # Issue #12650: on Unix, if Popen.__del__() was called before the
3010        # process exited, and the process got killed by a signal, it would never
3011        # be removed from subprocess._active, which triggered a FD and memory
3012        # leak.
3013        # spawn a Popen, delete its reference and kill it
3014        p = subprocess.Popen([sys.executable, "-c",
3015                              'import time;'
3016                              'time.sleep(3)'],
3017                             stdout=subprocess.PIPE,
3018                             stderr=subprocess.PIPE)
3019        self.addCleanup(p.stdout.close)
3020        self.addCleanup(p.stderr.close)
3021        ident = id(p)
3022        pid = p.pid
3023        with warnings_helper.check_warnings(('', ResourceWarning)):
3024            p = None
3025            support.gc_collect()  # For PyPy or other GCs.
3026
3027        os.kill(pid, signal.SIGKILL)
3028        if mswindows:
3029            # subprocess._active is not used on Windows and is set to None.
3030            self.assertIsNone(subprocess._active)
3031        else:
3032            # check that p is in the active processes list
3033            self.assertIn(ident, [id(o) for o in subprocess._active])
3034
3035        # let some time for the process to exit, and create a new Popen: this
3036        # should trigger the wait() of p
3037        time.sleep(0.2)
3038        with self.assertRaises(OSError):
3039            with subprocess.Popen(NONEXISTING_CMD,
3040                                  stdout=subprocess.PIPE,
3041                                  stderr=subprocess.PIPE) as proc:
3042                pass
3043        # p should have been wait()ed on, and removed from the _active list
3044        self.assertRaises(OSError, os.waitpid, pid, 0)
3045        if mswindows:
3046            # subprocess._active is not used on Windows and is set to None.
3047            self.assertIsNone(subprocess._active)
3048        else:
3049            self.assertNotIn(ident, [id(o) for o in subprocess._active])
3050
3051    def test_close_fds_after_preexec(self):
3052        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
3053
3054        # this FD is used as dup2() target by preexec_fn, and should be closed
3055        # in the child process
3056        fd = os.dup(1)
3057        self.addCleanup(os.close, fd)
3058
3059        p = subprocess.Popen([sys.executable, fd_status],
3060                             stdout=subprocess.PIPE, close_fds=True,
3061                             preexec_fn=lambda: os.dup2(1, fd))
3062        output, ignored = p.communicate()
3063
3064        remaining_fds = set(map(int, output.split(b',')))
3065
3066        self.assertNotIn(fd, remaining_fds)
3067
3068    @support.cpython_only
3069    def test_fork_exec(self):
3070        # Issue #22290: fork_exec() must not crash on memory allocation failure
3071        # or other errors
3072        import _posixsubprocess
3073        gc_enabled = gc.isenabled()
3074        try:
3075            # Use a preexec function and enable the garbage collector
3076            # to force fork_exec() to re-enable the garbage collector
3077            # on error.
3078            func = lambda: None
3079            gc.enable()
3080
3081            for args, exe_list, cwd, env_list in (
3082                (123,      [b"exe"], None, [b"env"]),
3083                ([b"arg"], 123,      None, [b"env"]),
3084                ([b"arg"], [b"exe"], 123,  [b"env"]),
3085                ([b"arg"], [b"exe"], None, 123),
3086            ):
3087                with self.assertRaises(TypeError) as err:
3088                    _posixsubprocess.fork_exec(
3089                        args, exe_list,
3090                        True, (), cwd, env_list,
3091                        -1, -1, -1, -1,
3092                        1, 2, 3, 4,
3093                        True, True,
3094                        False, [], 0, -1,
3095                        func)
3096                # Attempt to prevent
3097                # "TypeError: fork_exec() takes exactly N arguments (M given)"
3098                # from passing the test.  More refactoring to have us start
3099                # with a valid *args list, confirm a good call with that works
3100                # before mutating it in various ways to ensure that bad calls
3101                # with individual arg type errors raise a typeerror would be
3102                # ideal.  Saving that for a future PR...
3103                self.assertNotIn('takes exactly', str(err.exception))
3104        finally:
3105            if not gc_enabled:
3106                gc.disable()
3107
3108    @support.cpython_only
3109    def test_fork_exec_sorted_fd_sanity_check(self):
3110        # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check.
3111        import _posixsubprocess
3112        class BadInt:
3113            first = True
3114            def __init__(self, value):
3115                self.value = value
3116            def __int__(self):
3117                if self.first:
3118                    self.first = False
3119                    return self.value
3120                raise ValueError
3121
3122        gc_enabled = gc.isenabled()
3123        try:
3124            gc.enable()
3125
3126            for fds_to_keep in (
3127                (-1, 2, 3, 4, 5),  # Negative number.
3128                ('str', 4),  # Not an int.
3129                (18, 23, 42, 2**63),  # Out of range.
3130                (5, 4),  # Not sorted.
3131                (6, 7, 7, 8),  # Duplicate.
3132                (BadInt(1), BadInt(2)),
3133            ):
3134                with self.assertRaises(
3135                        ValueError,
3136                        msg='fds_to_keep={}'.format(fds_to_keep)) as c:
3137                    _posixsubprocess.fork_exec(
3138                        [b"false"], [b"false"],
3139                        True, fds_to_keep, None, [b"env"],
3140                        -1, -1, -1, -1,
3141                        1, 2, 3, 4,
3142                        True, True,
3143                        None, None, None, -1,
3144                        None)
3145                self.assertIn('fds_to_keep', str(c.exception))
3146        finally:
3147            if not gc_enabled:
3148                gc.disable()
3149
3150    def test_communicate_BrokenPipeError_stdin_close(self):
3151        # By not setting stdout or stderr or a timeout we force the fast path
3152        # that just calls _stdin_write() internally due to our mock.
3153        proc = subprocess.Popen(ZERO_RETURN_CMD)
3154        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
3155            mock_proc_stdin.close.side_effect = BrokenPipeError
3156            proc.communicate()  # Should swallow BrokenPipeError from close.
3157            mock_proc_stdin.close.assert_called_with()
3158
3159    def test_communicate_BrokenPipeError_stdin_write(self):
3160        # By not setting stdout or stderr or a timeout we force the fast path
3161        # that just calls _stdin_write() internally due to our mock.
3162        proc = subprocess.Popen(ZERO_RETURN_CMD)
3163        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
3164            mock_proc_stdin.write.side_effect = BrokenPipeError
3165            proc.communicate(b'stuff')  # Should swallow the BrokenPipeError.
3166            mock_proc_stdin.write.assert_called_once_with(b'stuff')
3167            mock_proc_stdin.close.assert_called_once_with()
3168
3169    def test_communicate_BrokenPipeError_stdin_flush(self):
3170        # Setting stdin and stdout forces the ._communicate() code path.
3171        # python -h exits faster than python -c pass (but spams stdout).
3172        proc = subprocess.Popen([sys.executable, '-h'],
3173                                stdin=subprocess.PIPE,
3174                                stdout=subprocess.PIPE)
3175        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin, \
3176                open(os.devnull, 'wb') as dev_null:
3177            mock_proc_stdin.flush.side_effect = BrokenPipeError
3178            # because _communicate registers a selector using proc.stdin...
3179            mock_proc_stdin.fileno.return_value = dev_null.fileno()
3180            # _communicate() should swallow BrokenPipeError from flush.
3181            proc.communicate(b'stuff')
3182            mock_proc_stdin.flush.assert_called_once_with()
3183
3184    def test_communicate_BrokenPipeError_stdin_close_with_timeout(self):
3185        # Setting stdin and stdout forces the ._communicate() code path.
3186        # python -h exits faster than python -c pass (but spams stdout).
3187        proc = subprocess.Popen([sys.executable, '-h'],
3188                                stdin=subprocess.PIPE,
3189                                stdout=subprocess.PIPE)
3190        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
3191            mock_proc_stdin.close.side_effect = BrokenPipeError
3192            # _communicate() should swallow BrokenPipeError from close.
3193            proc.communicate(timeout=999)
3194            mock_proc_stdin.close.assert_called_once_with()
3195
3196    @unittest.skipUnless(_testcapi is not None
3197                         and hasattr(_testcapi, 'W_STOPCODE'),
3198                         'need _testcapi.W_STOPCODE')
3199    def test_stopped(self):
3200        """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335."""
3201        args = ZERO_RETURN_CMD
3202        proc = subprocess.Popen(args)
3203
3204        # Wait until the real process completes to avoid zombie process
3205        support.wait_process(proc.pid, exitcode=0)
3206
3207        status = _testcapi.W_STOPCODE(3)
3208        with mock.patch('subprocess.os.waitpid', return_value=(proc.pid, status)):
3209            returncode = proc.wait()
3210
3211        self.assertEqual(returncode, -3)
3212
3213    def test_send_signal_race(self):
3214        # bpo-38630: send_signal() must poll the process exit status to reduce
3215        # the risk of sending the signal to the wrong process.
3216        proc = subprocess.Popen(ZERO_RETURN_CMD)
3217
3218        # wait until the process completes without using the Popen APIs.
3219        support.wait_process(proc.pid, exitcode=0)
3220
3221        # returncode is still None but the process completed.
3222        self.assertIsNone(proc.returncode)
3223
3224        with mock.patch("os.kill") as mock_kill:
3225            proc.send_signal(signal.SIGTERM)
3226
3227        # send_signal() didn't call os.kill() since the process already
3228        # completed.
3229        mock_kill.assert_not_called()
3230
3231        # Don't check the returncode value: the test reads the exit status,
3232        # so Popen failed to read it and uses a default returncode instead.
3233        self.assertIsNotNone(proc.returncode)
3234
3235    def test_send_signal_race2(self):
3236        # bpo-40550: the process might exist between the returncode check and
3237        # the kill operation
3238        p = subprocess.Popen([sys.executable, '-c', 'exit(1)'])
3239
3240        # wait for process to exit
3241        while not p.returncode:
3242            p.poll()
3243
3244        with mock.patch.object(p, 'poll', new=lambda: None):
3245            p.returncode = None
3246            p.send_signal(signal.SIGTERM)
3247
3248    def test_communicate_repeated_call_after_stdout_close(self):
3249        proc = subprocess.Popen([sys.executable, '-c',
3250                                 'import os, time; os.close(1), time.sleep(2)'],
3251                                stdout=subprocess.PIPE)
3252        while True:
3253            try:
3254                proc.communicate(timeout=0.1)
3255                return
3256            except subprocess.TimeoutExpired:
3257                pass
3258
3259
3260@unittest.skipUnless(mswindows, "Windows specific tests")
3261class Win32ProcessTestCase(BaseTestCase):
3262
3263    def test_startupinfo(self):
3264        # startupinfo argument
3265        # We uses hardcoded constants, because we do not want to
3266        # depend on win32all.
3267        STARTF_USESHOWWINDOW = 1
3268        SW_MAXIMIZE = 3
3269        startupinfo = subprocess.STARTUPINFO()
3270        startupinfo.dwFlags = STARTF_USESHOWWINDOW
3271        startupinfo.wShowWindow = SW_MAXIMIZE
3272        # Since Python is a console process, it won't be affected
3273        # by wShowWindow, but the argument should be silently
3274        # ignored
3275        subprocess.call(ZERO_RETURN_CMD,
3276                        startupinfo=startupinfo)
3277
3278    def test_startupinfo_keywords(self):
3279        # startupinfo argument
3280        # We use hardcoded constants, because we do not want to
3281        # depend on win32all.
3282        STARTF_USERSHOWWINDOW = 1
3283        SW_MAXIMIZE = 3
3284        startupinfo = subprocess.STARTUPINFO(
3285            dwFlags=STARTF_USERSHOWWINDOW,
3286            wShowWindow=SW_MAXIMIZE
3287        )
3288        # Since Python is a console process, it won't be affected
3289        # by wShowWindow, but the argument should be silently
3290        # ignored
3291        subprocess.call(ZERO_RETURN_CMD,
3292                        startupinfo=startupinfo)
3293
3294    def test_startupinfo_copy(self):
3295        # bpo-34044: Popen must not modify input STARTUPINFO structure
3296        startupinfo = subprocess.STARTUPINFO()
3297        startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW
3298        startupinfo.wShowWindow = subprocess.SW_HIDE
3299
3300        # Call Popen() twice with the same startupinfo object to make sure
3301        # that it's not modified
3302        for _ in range(2):
3303            cmd = ZERO_RETURN_CMD
3304            with open(os.devnull, 'w') as null:
3305                proc = subprocess.Popen(cmd,
3306                                        stdout=null,
3307                                        stderr=subprocess.STDOUT,
3308                                        startupinfo=startupinfo)
3309                with proc:
3310                    proc.communicate()
3311                self.assertEqual(proc.returncode, 0)
3312
3313            self.assertEqual(startupinfo.dwFlags,
3314                             subprocess.STARTF_USESHOWWINDOW)
3315            self.assertIsNone(startupinfo.hStdInput)
3316            self.assertIsNone(startupinfo.hStdOutput)
3317            self.assertIsNone(startupinfo.hStdError)
3318            self.assertEqual(startupinfo.wShowWindow, subprocess.SW_HIDE)
3319            self.assertEqual(startupinfo.lpAttributeList, {"handle_list": []})
3320
3321    def test_creationflags(self):
3322        # creationflags argument
3323        CREATE_NEW_CONSOLE = 16
3324        sys.stderr.write("    a DOS box should flash briefly ...\n")
3325        subprocess.call(sys.executable +
3326                        ' -c "import time; time.sleep(0.25)"',
3327                        creationflags=CREATE_NEW_CONSOLE)
3328
3329    def test_invalid_args(self):
3330        # invalid arguments should raise ValueError
3331        self.assertRaises(ValueError, subprocess.call,
3332                          [sys.executable, "-c",
3333                           "import sys; sys.exit(47)"],
3334                          preexec_fn=lambda: 1)
3335
3336    @support.cpython_only
3337    def test_issue31471(self):
3338        # There shouldn't be an assertion failure in Popen() in case the env
3339        # argument has a bad keys() method.
3340        class BadEnv(dict):
3341            keys = None
3342        with self.assertRaises(TypeError):
3343            subprocess.Popen(ZERO_RETURN_CMD, env=BadEnv())
3344
3345    def test_close_fds(self):
3346        # close file descriptors
3347        rc = subprocess.call([sys.executable, "-c",
3348                              "import sys; sys.exit(47)"],
3349                              close_fds=True)
3350        self.assertEqual(rc, 47)
3351
3352    def test_close_fds_with_stdio(self):
3353        import msvcrt
3354
3355        fds = os.pipe()
3356        self.addCleanup(os.close, fds[0])
3357        self.addCleanup(os.close, fds[1])
3358
3359        handles = []
3360        for fd in fds:
3361            os.set_inheritable(fd, True)
3362            handles.append(msvcrt.get_osfhandle(fd))
3363
3364        p = subprocess.Popen([sys.executable, "-c",
3365                              "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3366                             stdout=subprocess.PIPE, close_fds=False)
3367        stdout, stderr = p.communicate()
3368        self.assertEqual(p.returncode, 0)
3369        int(stdout.strip())  # Check that stdout is an integer
3370
3371        p = subprocess.Popen([sys.executable, "-c",
3372                              "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3373                             stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
3374        stdout, stderr = p.communicate()
3375        self.assertEqual(p.returncode, 1)
3376        self.assertIn(b"OSError", stderr)
3377
3378        # The same as the previous call, but with an empty handle_list
3379        handle_list = []
3380        startupinfo = subprocess.STARTUPINFO()
3381        startupinfo.lpAttributeList = {"handle_list": handle_list}
3382        p = subprocess.Popen([sys.executable, "-c",
3383                              "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3384                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
3385                             startupinfo=startupinfo, close_fds=True)
3386        stdout, stderr = p.communicate()
3387        self.assertEqual(p.returncode, 1)
3388        self.assertIn(b"OSError", stderr)
3389
3390        # Check for a warning due to using handle_list and close_fds=False
3391        with warnings_helper.check_warnings((".*overriding close_fds",
3392                                             RuntimeWarning)):
3393            startupinfo = subprocess.STARTUPINFO()
3394            startupinfo.lpAttributeList = {"handle_list": handles[:]}
3395            p = subprocess.Popen([sys.executable, "-c",
3396                                  "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])],
3397                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
3398                                 startupinfo=startupinfo, close_fds=False)
3399            stdout, stderr = p.communicate()
3400            self.assertEqual(p.returncode, 0)
3401
3402    def test_empty_attribute_list(self):
3403        startupinfo = subprocess.STARTUPINFO()
3404        startupinfo.lpAttributeList = {}
3405        subprocess.call(ZERO_RETURN_CMD,
3406                        startupinfo=startupinfo)
3407
3408    def test_empty_handle_list(self):
3409        startupinfo = subprocess.STARTUPINFO()
3410        startupinfo.lpAttributeList = {"handle_list": []}
3411        subprocess.call(ZERO_RETURN_CMD,
3412                        startupinfo=startupinfo)
3413
3414    def test_shell_sequence(self):
3415        # Run command through the shell (sequence)
3416        newenv = os.environ.copy()
3417        newenv["FRUIT"] = "physalis"
3418        p = subprocess.Popen(["set"], shell=1,
3419                             stdout=subprocess.PIPE,
3420                             env=newenv)
3421        with p:
3422            self.assertIn(b"physalis", p.stdout.read())
3423
3424    def test_shell_string(self):
3425        # Run command through the shell (string)
3426        newenv = os.environ.copy()
3427        newenv["FRUIT"] = "physalis"
3428        p = subprocess.Popen("set", shell=1,
3429                             stdout=subprocess.PIPE,
3430                             env=newenv)
3431        with p:
3432            self.assertIn(b"physalis", p.stdout.read())
3433
3434    def test_shell_encodings(self):
3435        # Run command through the shell (string)
3436        for enc in ['ansi', 'oem']:
3437            newenv = os.environ.copy()
3438            newenv["FRUIT"] = "physalis"
3439            p = subprocess.Popen("set", shell=1,
3440                                 stdout=subprocess.PIPE,
3441                                 env=newenv,
3442                                 encoding=enc)
3443            with p:
3444                self.assertIn("physalis", p.stdout.read(), enc)
3445
3446    def test_call_string(self):
3447        # call() function with string argument on Windows
3448        rc = subprocess.call(sys.executable +
3449                             ' -c "import sys; sys.exit(47)"')
3450        self.assertEqual(rc, 47)
3451
3452    def _kill_process(self, method, *args):
3453        # Some win32 buildbot raises EOFError if stdin is inherited
3454        p = subprocess.Popen([sys.executable, "-c", """if 1:
3455                             import sys, time
3456                             sys.stdout.write('x\\n')
3457                             sys.stdout.flush()
3458                             time.sleep(30)
3459                             """],
3460                             stdin=subprocess.PIPE,
3461                             stdout=subprocess.PIPE,
3462                             stderr=subprocess.PIPE)
3463        with p:
3464            # Wait for the interpreter to be completely initialized before
3465            # sending any signal.
3466            p.stdout.read(1)
3467            getattr(p, method)(*args)
3468            _, stderr = p.communicate()
3469            self.assertEqual(stderr, b'')
3470            returncode = p.wait()
3471        self.assertNotEqual(returncode, 0)
3472
3473    def _kill_dead_process(self, method, *args):
3474        p = subprocess.Popen([sys.executable, "-c", """if 1:
3475                             import sys, time
3476                             sys.stdout.write('x\\n')
3477                             sys.stdout.flush()
3478                             sys.exit(42)
3479                             """],
3480                             stdin=subprocess.PIPE,
3481                             stdout=subprocess.PIPE,
3482                             stderr=subprocess.PIPE)
3483        with p:
3484            # Wait for the interpreter to be completely initialized before
3485            # sending any signal.
3486            p.stdout.read(1)
3487            # The process should end after this
3488            time.sleep(1)
3489            # This shouldn't raise even though the child is now dead
3490            getattr(p, method)(*args)
3491            _, stderr = p.communicate()
3492            self.assertEqual(stderr, b'')
3493            rc = p.wait()
3494        self.assertEqual(rc, 42)
3495
3496    def test_send_signal(self):
3497        self._kill_process('send_signal', signal.SIGTERM)
3498
3499    def test_kill(self):
3500        self._kill_process('kill')
3501
3502    def test_terminate(self):
3503        self._kill_process('terminate')
3504
3505    def test_send_signal_dead(self):
3506        self._kill_dead_process('send_signal', signal.SIGTERM)
3507
3508    def test_kill_dead(self):
3509        self._kill_dead_process('kill')
3510
3511    def test_terminate_dead(self):
3512        self._kill_dead_process('terminate')
3513
3514class MiscTests(unittest.TestCase):
3515
3516    class RecordingPopen(subprocess.Popen):
3517        """A Popen that saves a reference to each instance for testing."""
3518        instances_created = []
3519
3520        def __init__(self, *args, **kwargs):
3521            super().__init__(*args, **kwargs)
3522            self.instances_created.append(self)
3523
3524    @mock.patch.object(subprocess.Popen, "_communicate")
3525    def _test_keyboardinterrupt_no_kill(self, popener, mock__communicate,
3526                                        **kwargs):
3527        """Fake a SIGINT happening during Popen._communicate() and ._wait().
3528
3529        This avoids the need to actually try and get test environments to send
3530        and receive signals reliably across platforms.  The net effect of a ^C
3531        happening during a blocking subprocess execution which we want to clean
3532        up from is a KeyboardInterrupt coming out of communicate() or wait().
3533        """
3534
3535        mock__communicate.side_effect = KeyboardInterrupt
3536        try:
3537            with mock.patch.object(subprocess.Popen, "_wait") as mock__wait:
3538                # We patch out _wait() as no signal was involved so the
3539                # child process isn't actually going to exit rapidly.
3540                mock__wait.side_effect = KeyboardInterrupt
3541                with mock.patch.object(subprocess, "Popen",
3542                                       self.RecordingPopen):
3543                    with self.assertRaises(KeyboardInterrupt):
3544                        popener([sys.executable, "-c",
3545                                 "import time\ntime.sleep(9)\nimport sys\n"
3546                                 "sys.stderr.write('\\n!runaway child!\\n')"],
3547                                stdout=subprocess.DEVNULL, **kwargs)
3548                for call in mock__wait.call_args_list[1:]:
3549                    self.assertNotEqual(
3550                            call, mock.call(timeout=None),
3551                            "no open-ended wait() after the first allowed: "
3552                            f"{mock__wait.call_args_list}")
3553                sigint_calls = []
3554                for call in mock__wait.call_args_list:
3555                    if call == mock.call(timeout=0.25):  # from Popen.__init__
3556                        sigint_calls.append(call)
3557                self.assertLessEqual(mock__wait.call_count, 2,
3558                                     msg=mock__wait.call_args_list)
3559                self.assertEqual(len(sigint_calls), 1,
3560                                 msg=mock__wait.call_args_list)
3561        finally:
3562            # cleanup the forgotten (due to our mocks) child process
3563            process = self.RecordingPopen.instances_created.pop()
3564            process.kill()
3565            process.wait()
3566            self.assertEqual([], self.RecordingPopen.instances_created)
3567
3568    def test_call_keyboardinterrupt_no_kill(self):
3569        self._test_keyboardinterrupt_no_kill(subprocess.call, timeout=6.282)
3570
3571    def test_run_keyboardinterrupt_no_kill(self):
3572        self._test_keyboardinterrupt_no_kill(subprocess.run, timeout=6.282)
3573
3574    def test_context_manager_keyboardinterrupt_no_kill(self):
3575        def popen_via_context_manager(*args, **kwargs):
3576            with subprocess.Popen(*args, **kwargs) as unused_process:
3577                raise KeyboardInterrupt  # Test how __exit__ handles ^C.
3578        self._test_keyboardinterrupt_no_kill(popen_via_context_manager)
3579
3580    def test_getoutput(self):
3581        self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
3582        self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
3583                         (0, 'xyzzy'))
3584
3585        # we use mkdtemp in the next line to create an empty directory
3586        # under our exclusive control; from that, we can invent a pathname
3587        # that we _know_ won't exist.  This is guaranteed to fail.
3588        dir = None
3589        try:
3590            dir = tempfile.mkdtemp()
3591            name = os.path.join(dir, "foo")
3592            status, output = subprocess.getstatusoutput(
3593                ("type " if mswindows else "cat ") + name)
3594            self.assertNotEqual(status, 0)
3595        finally:
3596            if dir is not None:
3597                os.rmdir(dir)
3598
3599    def test__all__(self):
3600        """Ensure that __all__ is populated properly."""
3601        intentionally_excluded = {"list2cmdline", "Handle", "pwd", "grp", "fcntl"}
3602        exported = set(subprocess.__all__)
3603        possible_exports = set()
3604        import types
3605        for name, value in subprocess.__dict__.items():
3606            if name.startswith('_'):
3607                continue
3608            if isinstance(value, (types.ModuleType,)):
3609                continue
3610            possible_exports.add(name)
3611        self.assertEqual(exported, possible_exports - intentionally_excluded)
3612
3613
3614@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
3615                     "Test needs selectors.PollSelector")
3616class ProcessTestCaseNoPoll(ProcessTestCase):
3617    def setUp(self):
3618        self.orig_selector = subprocess._PopenSelector
3619        subprocess._PopenSelector = selectors.SelectSelector
3620        ProcessTestCase.setUp(self)
3621
3622    def tearDown(self):
3623        subprocess._PopenSelector = self.orig_selector
3624        ProcessTestCase.tearDown(self)
3625
3626
3627@unittest.skipUnless(mswindows, "Windows-specific tests")
3628class CommandsWithSpaces (BaseTestCase):
3629
3630    def setUp(self):
3631        super().setUp()
3632        f, fname = tempfile.mkstemp(".py", "te st")
3633        self.fname = fname.lower ()
3634        os.write(f, b"import sys;"
3635                    b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
3636        )
3637        os.close(f)
3638
3639    def tearDown(self):
3640        os.remove(self.fname)
3641        super().tearDown()
3642
3643    def with_spaces(self, *args, **kwargs):
3644        kwargs['stdout'] = subprocess.PIPE
3645        p = subprocess.Popen(*args, **kwargs)
3646        with p:
3647            self.assertEqual(
3648              p.stdout.read ().decode("mbcs"),
3649              "2 [%r, 'ab cd']" % self.fname
3650            )
3651
3652    def test_shell_string_with_spaces(self):
3653        # call() function with string argument with spaces on Windows
3654        self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
3655                                             "ab cd"), shell=1)
3656
3657    def test_shell_sequence_with_spaces(self):
3658        # call() function with sequence argument with spaces on Windows
3659        self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)
3660
3661    def test_noshell_string_with_spaces(self):
3662        # call() function with string argument with spaces on Windows
3663        self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
3664                             "ab cd"))
3665
3666    def test_noshell_sequence_with_spaces(self):
3667        # call() function with sequence argument with spaces on Windows
3668        self.with_spaces([sys.executable, self.fname, "ab cd"])
3669
3670
3671class ContextManagerTests(BaseTestCase):
3672
3673    def test_pipe(self):
3674        with subprocess.Popen([sys.executable, "-c",
3675                               "import sys;"
3676                               "sys.stdout.write('stdout');"
3677                               "sys.stderr.write('stderr');"],
3678                              stdout=subprocess.PIPE,
3679                              stderr=subprocess.PIPE) as proc:
3680            self.assertEqual(proc.stdout.read(), b"stdout")
3681            self.assertEqual(proc.stderr.read(), b"stderr")
3682
3683        self.assertTrue(proc.stdout.closed)
3684        self.assertTrue(proc.stderr.closed)
3685
3686    def test_returncode(self):
3687        with subprocess.Popen([sys.executable, "-c",
3688                               "import sys; sys.exit(100)"]) as proc:
3689            pass
3690        # __exit__ calls wait(), so the returncode should be set
3691        self.assertEqual(proc.returncode, 100)
3692
3693    def test_communicate_stdin(self):
3694        with subprocess.Popen([sys.executable, "-c",
3695                              "import sys;"
3696                              "sys.exit(sys.stdin.read() == 'context')"],
3697                             stdin=subprocess.PIPE) as proc:
3698            proc.communicate(b"context")
3699            self.assertEqual(proc.returncode, 1)
3700
3701    def test_invalid_args(self):
3702        with self.assertRaises(NONEXISTING_ERRORS):
3703            with subprocess.Popen(NONEXISTING_CMD,
3704                                  stdout=subprocess.PIPE,
3705                                  stderr=subprocess.PIPE) as proc:
3706                pass
3707
3708    def test_broken_pipe_cleanup(self):
3709        """Broken pipe error should not prevent wait() (Issue 21619)"""
3710        proc = subprocess.Popen(ZERO_RETURN_CMD,
3711                                stdin=subprocess.PIPE,
3712                                bufsize=support.PIPE_MAX_SIZE*2)
3713        proc = proc.__enter__()
3714        # Prepare to send enough data to overflow any OS pipe buffering and
3715        # guarantee a broken pipe error. Data is held in BufferedWriter
3716        # buffer until closed.
3717        proc.stdin.write(b'x' * support.PIPE_MAX_SIZE)
3718        self.assertIsNone(proc.returncode)
3719        # EPIPE expected under POSIX; EINVAL under Windows
3720        self.assertRaises(OSError, proc.__exit__, None, None, None)
3721        self.assertEqual(proc.returncode, 0)
3722        self.assertTrue(proc.stdin.closed)
3723
3724
3725if __name__ == "__main__":
3726    unittest.main()
3727