• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2from unittest import mock
3from test import support
4import subprocess
5import sys
6import platform
7import signal
8import io
9import os
10import errno
11import tempfile
12import time
13import selectors
14import sysconfig
15import select
16import shutil
17import gc
18import textwrap
19
20try:
21    import ctypes
22except ImportError:
23    ctypes = None
24
25try:
26    import threading
27except ImportError:
28    threading = None
29
# Skip this whole module when support.PGO is set; per the skip message,
# the test is not helpful for PGO.
if support.PGO:
    raise unittest.SkipTest("test is not helpful for PGO")
32
# True only when the interpreter reports the "win32" platform.
mswindows = sys.platform == "win32"

#
# Depends on the following external programs: Python
#

# Python source fragment that puts sys.stdout into binary mode through
# msvcrt on Windows; an empty string on every other platform.
SETBINARY = (
    'import msvcrt; msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY);'
    if mswindows else ''
)
44
45
class BaseTestCase(unittest.TestCase):
    """Common fixture and assertion helpers for the subprocess tests."""

    def setUp(self):
        # Reap leftover children before each test: keeping the number of
        # child processes low stops this test from crashing on some
        # buildbots (Alphas in particular).
        support.reap_children()

    def tearDown(self):
        # Wait for every instance subprocess still tracks, let the module
        # clean up, then verify that nothing is left behind.
        for leftover in subprocess._active:
            leftover.wait()
        subprocess._cleanup()
        self.assertFalse(subprocess._active, "subprocess._active not empty")

    def assertStderrEqual(self, stderr, expected, msg=None):
        """Assert that *stderr* equals *expected* once both are cleaned up.

        A debug build prints things like "[6580 refs]" to stderr at shutdown
        time, which frustrates checks on the stderr of a spawned Python
        process, so *stderr* goes through support.strip_python_stderr()
        first.  That helper also strips whitespace, hence *expected* is
        stripped as well.
        """
        self.assertEqual(support.strip_python_stderr(stderr),
                         expected.strip(),
                         msg)
66
67
class PopenTestException(Exception):
    """Exception type raised deliberately by the Popen test doubles."""
70
71
class PopenExecuteChildRaises(subprocess.Popen):
    """Popen variant whose _execute_child always fails.

    Used for testing cleanup of subprocess.PIPE filehandles when
    _execute_child raises.
    """

    def _execute_child(self, *_args, **_kwargs):
        # Ignore every argument and fail unconditionally.
        forced = PopenTestException("Forced Exception for Test")
        raise forced
78
79
80class ProcessTestCase(BaseTestCase):
81
82    def test_io_buffered_by_default(self):
83        p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
84                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
85                             stderr=subprocess.PIPE)
86        try:
87            self.assertIsInstance(p.stdin, io.BufferedIOBase)
88            self.assertIsInstance(p.stdout, io.BufferedIOBase)
89            self.assertIsInstance(p.stderr, io.BufferedIOBase)
90        finally:
91            p.stdin.close()
92            p.stdout.close()
93            p.stderr.close()
94            p.wait()
95
96    def test_io_unbuffered_works(self):
97        p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
98                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
99                             stderr=subprocess.PIPE, bufsize=0)
100        try:
101            self.assertIsInstance(p.stdin, io.RawIOBase)
102            self.assertIsInstance(p.stdout, io.RawIOBase)
103            self.assertIsInstance(p.stderr, io.RawIOBase)
104        finally:
105            p.stdin.close()
106            p.stdout.close()
107            p.stderr.close()
108            p.wait()
109
110    def test_call_seq(self):
111        # call() function with sequence argument
112        rc = subprocess.call([sys.executable, "-c",
113                              "import sys; sys.exit(47)"])
114        self.assertEqual(rc, 47)
115
116    def test_call_timeout(self):
117        # call() function with timeout argument; we want to test that the child
118        # process gets killed when the timeout expires.  If the child isn't
119        # killed, this call will deadlock since subprocess.call waits for the
120        # child.
121        self.assertRaises(subprocess.TimeoutExpired, subprocess.call,
122                          [sys.executable, "-c", "while True: pass"],
123                          timeout=0.1)
124
125    def test_check_call_zero(self):
126        # check_call() function with zero return code
127        rc = subprocess.check_call([sys.executable, "-c",
128                                    "import sys; sys.exit(0)"])
129        self.assertEqual(rc, 0)
130
131    def test_check_call_nonzero(self):
132        # check_call() function with non-zero return code
133        with self.assertRaises(subprocess.CalledProcessError) as c:
134            subprocess.check_call([sys.executable, "-c",
135                                   "import sys; sys.exit(47)"])
136        self.assertEqual(c.exception.returncode, 47)
137
138    def test_check_output(self):
139        # check_output() function with zero return code
140        output = subprocess.check_output(
141                [sys.executable, "-c", "print('BDFL')"])
142        self.assertIn(b'BDFL', output)
143
144    def test_check_output_nonzero(self):
145        # check_call() function with non-zero return code
146        with self.assertRaises(subprocess.CalledProcessError) as c:
147            subprocess.check_output(
148                    [sys.executable, "-c", "import sys; sys.exit(5)"])
149        self.assertEqual(c.exception.returncode, 5)
150
151    def test_check_output_stderr(self):
152        # check_output() function stderr redirected to stdout
153        output = subprocess.check_output(
154                [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
155                stderr=subprocess.STDOUT)
156        self.assertIn(b'BDFL', output)
157
158    def test_check_output_stdin_arg(self):
159        # check_output() can be called with stdin set to a file
160        tf = tempfile.TemporaryFile()
161        self.addCleanup(tf.close)
162        tf.write(b'pear')
163        tf.seek(0)
164        output = subprocess.check_output(
165                [sys.executable, "-c",
166                 "import sys; sys.stdout.write(sys.stdin.read().upper())"],
167                stdin=tf)
168        self.assertIn(b'PEAR', output)
169
170    def test_check_output_input_arg(self):
171        # check_output() can be called with input set to a string
172        output = subprocess.check_output(
173                [sys.executable, "-c",
174                 "import sys; sys.stdout.write(sys.stdin.read().upper())"],
175                input=b'pear')
176        self.assertIn(b'PEAR', output)
177
178    def test_check_output_stdout_arg(self):
179        # check_output() refuses to accept 'stdout' argument
180        with self.assertRaises(ValueError) as c:
181            output = subprocess.check_output(
182                    [sys.executable, "-c", "print('will not be run')"],
183                    stdout=sys.stdout)
184            self.fail("Expected ValueError when stdout arg supplied.")
185        self.assertIn('stdout', c.exception.args[0])
186
187    def test_check_output_stdin_with_input_arg(self):
188        # check_output() refuses to accept 'stdin' with 'input'
189        tf = tempfile.TemporaryFile()
190        self.addCleanup(tf.close)
191        tf.write(b'pear')
192        tf.seek(0)
193        with self.assertRaises(ValueError) as c:
194            output = subprocess.check_output(
195                    [sys.executable, "-c", "print('will not be run')"],
196                    stdin=tf, input=b'hare')
197            self.fail("Expected ValueError when stdin and input args supplied.")
198        self.assertIn('stdin', c.exception.args[0])
199        self.assertIn('input', c.exception.args[0])
200
201    def test_check_output_timeout(self):
202        # check_output() function with timeout arg
203        with self.assertRaises(subprocess.TimeoutExpired) as c:
204            output = subprocess.check_output(
205                    [sys.executable, "-c",
206                     "import sys, time\n"
207                     "sys.stdout.write('BDFL')\n"
208                     "sys.stdout.flush()\n"
209                     "time.sleep(3600)"],
210                    # Some heavily loaded buildbots (sparc Debian 3.x) require
211                    # this much time to start and print.
212                    timeout=3)
213            self.fail("Expected TimeoutExpired.")
214        self.assertEqual(c.exception.output, b'BDFL')
215
216    def test_call_kwargs(self):
217        # call() function with keyword args
218        newenv = os.environ.copy()
219        newenv["FRUIT"] = "banana"
220        rc = subprocess.call([sys.executable, "-c",
221                              'import sys, os;'
222                              'sys.exit(os.getenv("FRUIT")=="banana")'],
223                             env=newenv)
224        self.assertEqual(rc, 1)
225
226    def test_invalid_args(self):
227        # Popen() called with invalid arguments should raise TypeError
228        # but Popen.__del__ should not complain (issue #12085)
229        with support.captured_stderr() as s:
230            self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
231            argcount = subprocess.Popen.__init__.__code__.co_argcount
232            too_many_args = [0] * (argcount + 1)
233            self.assertRaises(TypeError, subprocess.Popen, *too_many_args)
234        self.assertEqual(s.getvalue(), '')
235
236    def test_stdin_none(self):
237        # .stdin is None when not redirected
238        p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
239                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
240        self.addCleanup(p.stdout.close)
241        self.addCleanup(p.stderr.close)
242        p.wait()
243        self.assertEqual(p.stdin, None)
244
245    def test_stdout_none(self):
246        # .stdout is None when not redirected, and the child's stdout will
247        # be inherited from the parent.  In order to test this we run a
248        # subprocess in a subprocess:
249        # this_test
250        #   \-- subprocess created by this test (parent)
251        #          \-- subprocess created by the parent subprocess (child)
252        # The parent doesn't specify stdout, so the child will use the
253        # parent's stdout.  This test checks that the message printed by the
254        # child goes to the parent stdout.  The parent also checks that the
255        # child's stdout is None.  See #11963.
256        code = ('import sys; from subprocess import Popen, PIPE;'
257                'p = Popen([sys.executable, "-c", "print(\'test_stdout_none\')"],'
258                '          stdin=PIPE, stderr=PIPE);'
259                'p.wait(); assert p.stdout is None;')
260        p = subprocess.Popen([sys.executable, "-c", code],
261                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
262        self.addCleanup(p.stdout.close)
263        self.addCleanup(p.stderr.close)
264        out, err = p.communicate()
265        self.assertEqual(p.returncode, 0, err)
266        self.assertEqual(out.rstrip(), b'test_stdout_none')
267
268    def test_stderr_none(self):
269        # .stderr is None when not redirected
270        p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
271                         stdin=subprocess.PIPE, stdout=subprocess.PIPE)
272        self.addCleanup(p.stdout.close)
273        self.addCleanup(p.stdin.close)
274        p.wait()
275        self.assertEqual(p.stderr, None)
276
277    def _assert_python(self, pre_args, **kwargs):
278        # We include sys.exit() to prevent the test runner from hanging
279        # whenever python is found.
280        args = pre_args + ["import sys; sys.exit(47)"]
281        p = subprocess.Popen(args, **kwargs)
282        p.wait()
283        self.assertEqual(47, p.returncode)
284
285    def test_executable(self):
286        # Check that the executable argument works.
287        #
288        # On Unix (non-Mac and non-Windows), Python looks at args[0] to
289        # determine where its standard library is, so we need the directory
290        # of args[0] to be valid for the Popen() call to Python to succeed.
291        # See also issue #16170 and issue #7774.
292        doesnotexist = os.path.join(os.path.dirname(sys.executable),
293                                    "doesnotexist")
294        self._assert_python([doesnotexist, "-c"], executable=sys.executable)
295
296    def test_executable_takes_precedence(self):
297        # Check that the executable argument takes precedence over args[0].
298        #
299        # Verify first that the call succeeds without the executable arg.
300        pre_args = [sys.executable, "-c"]
301        self._assert_python(pre_args)
302        self.assertRaises((FileNotFoundError, PermissionError),
303                          self._assert_python, pre_args,
304                          executable="doesnotexist")
305
306    @unittest.skipIf(mswindows, "executable argument replaces shell")
307    def test_executable_replaces_shell(self):
308        # Check that the executable argument replaces the default shell
309        # when shell=True.
310        self._assert_python([], executable=sys.executable, shell=True)
311
312    # For use in the test_cwd* tests below.
313    def _normalize_cwd(self, cwd):
314        # Normalize an expected cwd (for Tru64 support).
315        # We can't use os.path.realpath since it doesn't expand Tru64 {memb}
316        # strings.  See bug #1063571.
317        with support.change_cwd(cwd):
318            return os.getcwd()
319
320    # For use in the test_cwd* tests below.
321    def _split_python_path(self):
322        # Return normalized (python_dir, python_base).
323        python_path = os.path.realpath(sys.executable)
324        return os.path.split(python_path)
325
326    # For use in the test_cwd* tests below.
327    def _assert_cwd(self, expected_cwd, python_arg, **kwargs):
328        # Invoke Python via Popen, and assert that (1) the call succeeds,
329        # and that (2) the current working directory of the child process
330        # matches *expected_cwd*.
331        p = subprocess.Popen([python_arg, "-c",
332                              "import os, sys; "
333                              "sys.stdout.write(os.getcwd()); "
334                              "sys.exit(47)"],
335                              stdout=subprocess.PIPE,
336                              **kwargs)
337        self.addCleanup(p.stdout.close)
338        p.wait()
339        self.assertEqual(47, p.returncode)
340        normcase = os.path.normcase
341        self.assertEqual(normcase(expected_cwd),
342                         normcase(p.stdout.read().decode("utf-8")))
343
344    def test_cwd(self):
345        # Check that cwd changes the cwd for the child process.
346        temp_dir = tempfile.gettempdir()
347        temp_dir = self._normalize_cwd(temp_dir)
348        self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir)
349
350    def test_cwd_with_pathlike(self):
351        temp_dir = tempfile.gettempdir()
352        temp_dir = self._normalize_cwd(temp_dir)
353
354        class _PathLikeObj:
355            def __fspath__(self):
356                return temp_dir
357
358        self._assert_cwd(temp_dir, sys.executable, cwd=_PathLikeObj())
359
360    @unittest.skipIf(mswindows, "pending resolution of issue #15533")
361    def test_cwd_with_relative_arg(self):
362        # Check that Popen looks for args[0] relative to cwd if args[0]
363        # is relative.
364        python_dir, python_base = self._split_python_path()
365        rel_python = os.path.join(os.curdir, python_base)
366        with support.temp_cwd() as wrong_dir:
367            # Before calling with the correct cwd, confirm that the call fails
368            # without cwd and with the wrong cwd.
369            self.assertRaises(FileNotFoundError, subprocess.Popen,
370                              [rel_python])
371            self.assertRaises(FileNotFoundError, subprocess.Popen,
372                              [rel_python], cwd=wrong_dir)
373            python_dir = self._normalize_cwd(python_dir)
374            self._assert_cwd(python_dir, rel_python, cwd=python_dir)
375
376    @unittest.skipIf(mswindows, "pending resolution of issue #15533")
377    def test_cwd_with_relative_executable(self):
378        # Check that Popen looks for executable relative to cwd if executable
379        # is relative (and that executable takes precedence over args[0]).
380        python_dir, python_base = self._split_python_path()
381        rel_python = os.path.join(os.curdir, python_base)
382        doesntexist = "somethingyoudonthave"
383        with support.temp_cwd() as wrong_dir:
384            # Before calling with the correct cwd, confirm that the call fails
385            # without cwd and with the wrong cwd.
386            self.assertRaises(FileNotFoundError, subprocess.Popen,
387                              [doesntexist], executable=rel_python)
388            self.assertRaises(FileNotFoundError, subprocess.Popen,
389                              [doesntexist], executable=rel_python,
390                              cwd=wrong_dir)
391            python_dir = self._normalize_cwd(python_dir)
392            self._assert_cwd(python_dir, doesntexist, executable=rel_python,
393                             cwd=python_dir)
394
395    def test_cwd_with_absolute_arg(self):
396        # Check that Popen can find the executable when the cwd is wrong
397        # if args[0] is an absolute path.
398        python_dir, python_base = self._split_python_path()
399        abs_python = os.path.join(python_dir, python_base)
400        rel_python = os.path.join(os.curdir, python_base)
401        with support.temp_dir() as wrong_dir:
402            # Before calling with an absolute path, confirm that using a
403            # relative path fails.
404            self.assertRaises(FileNotFoundError, subprocess.Popen,
405                              [rel_python], cwd=wrong_dir)
406            wrong_dir = self._normalize_cwd(wrong_dir)
407            self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir)
408
409    @unittest.skipIf(sys.base_prefix != sys.prefix,
410                     'Test is not venv-compatible')
411    def test_executable_with_cwd(self):
412        python_dir, python_base = self._split_python_path()
413        python_dir = self._normalize_cwd(python_dir)
414        self._assert_cwd(python_dir, "somethingyoudonthave",
415                         executable=sys.executable, cwd=python_dir)
416
417    @unittest.skipIf(sys.base_prefix != sys.prefix,
418                     'Test is not venv-compatible')
419    @unittest.skipIf(sysconfig.is_python_build(),
420                     "need an installed Python. See #7774")
421    def test_executable_without_cwd(self):
422        # For a normal installation, it should work without 'cwd'
423        # argument.  For test runs in the build directory, see #7774.
424        self._assert_cwd(os.getcwd(), "somethingyoudonthave",
425                         executable=sys.executable)
426
427    def test_stdin_pipe(self):
428        # stdin redirection
429        p = subprocess.Popen([sys.executable, "-c",
430                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
431                        stdin=subprocess.PIPE)
432        p.stdin.write(b"pear")
433        p.stdin.close()
434        p.wait()
435        self.assertEqual(p.returncode, 1)
436
437    def test_stdin_filedes(self):
438        # stdin is set to open file descriptor
439        tf = tempfile.TemporaryFile()
440        self.addCleanup(tf.close)
441        d = tf.fileno()
442        os.write(d, b"pear")
443        os.lseek(d, 0, 0)
444        p = subprocess.Popen([sys.executable, "-c",
445                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
446                         stdin=d)
447        p.wait()
448        self.assertEqual(p.returncode, 1)
449
450    def test_stdin_fileobj(self):
451        # stdin is set to open file object
452        tf = tempfile.TemporaryFile()
453        self.addCleanup(tf.close)
454        tf.write(b"pear")
455        tf.seek(0)
456        p = subprocess.Popen([sys.executable, "-c",
457                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
458                         stdin=tf)
459        p.wait()
460        self.assertEqual(p.returncode, 1)
461
462    def test_stdout_pipe(self):
463        # stdout redirection
464        p = subprocess.Popen([sys.executable, "-c",
465                          'import sys; sys.stdout.write("orange")'],
466                         stdout=subprocess.PIPE)
467        with p:
468            self.assertEqual(p.stdout.read(), b"orange")
469
470    def test_stdout_filedes(self):
471        # stdout is set to open file descriptor
472        tf = tempfile.TemporaryFile()
473        self.addCleanup(tf.close)
474        d = tf.fileno()
475        p = subprocess.Popen([sys.executable, "-c",
476                          'import sys; sys.stdout.write("orange")'],
477                         stdout=d)
478        p.wait()
479        os.lseek(d, 0, 0)
480        self.assertEqual(os.read(d, 1024), b"orange")
481
482    def test_stdout_fileobj(self):
483        # stdout is set to open file object
484        tf = tempfile.TemporaryFile()
485        self.addCleanup(tf.close)
486        p = subprocess.Popen([sys.executable, "-c",
487                          'import sys; sys.stdout.write("orange")'],
488                         stdout=tf)
489        p.wait()
490        tf.seek(0)
491        self.assertEqual(tf.read(), b"orange")
492
493    def test_stderr_pipe(self):
494        # stderr redirection
495        p = subprocess.Popen([sys.executable, "-c",
496                          'import sys; sys.stderr.write("strawberry")'],
497                         stderr=subprocess.PIPE)
498        with p:
499            self.assertStderrEqual(p.stderr.read(), b"strawberry")
500
501    def test_stderr_filedes(self):
502        # stderr is set to open file descriptor
503        tf = tempfile.TemporaryFile()
504        self.addCleanup(tf.close)
505        d = tf.fileno()
506        p = subprocess.Popen([sys.executable, "-c",
507                          'import sys; sys.stderr.write("strawberry")'],
508                         stderr=d)
509        p.wait()
510        os.lseek(d, 0, 0)
511        self.assertStderrEqual(os.read(d, 1024), b"strawberry")
512
513    def test_stderr_fileobj(self):
514        # stderr is set to open file object
515        tf = tempfile.TemporaryFile()
516        self.addCleanup(tf.close)
517        p = subprocess.Popen([sys.executable, "-c",
518                          'import sys; sys.stderr.write("strawberry")'],
519                         stderr=tf)
520        p.wait()
521        tf.seek(0)
522        self.assertStderrEqual(tf.read(), b"strawberry")
523
524    def test_stderr_redirect_with_no_stdout_redirect(self):
525        # test stderr=STDOUT while stdout=None (not set)
526
527        # - grandchild prints to stderr
528        # - child redirects grandchild's stderr to its stdout
529        # - the parent should get grandchild's stderr in child's stdout
530        p = subprocess.Popen([sys.executable, "-c",
531                              'import sys, subprocess;'
532                              'rc = subprocess.call([sys.executable, "-c",'
533                              '    "import sys;"'
534                              '    "sys.stderr.write(\'42\')"],'
535                              '    stderr=subprocess.STDOUT);'
536                              'sys.exit(rc)'],
537                             stdout=subprocess.PIPE,
538                             stderr=subprocess.PIPE)
539        stdout, stderr = p.communicate()
540        #NOTE: stdout should get stderr from grandchild
541        self.assertStderrEqual(stdout, b'42')
542        self.assertStderrEqual(stderr, b'') # should be empty
543        self.assertEqual(p.returncode, 0)
544
545    def test_stdout_stderr_pipe(self):
546        # capture stdout and stderr to the same pipe
547        p = subprocess.Popen([sys.executable, "-c",
548                              'import sys;'
549                              'sys.stdout.write("apple");'
550                              'sys.stdout.flush();'
551                              'sys.stderr.write("orange")'],
552                             stdout=subprocess.PIPE,
553                             stderr=subprocess.STDOUT)
554        with p:
555            self.assertStderrEqual(p.stdout.read(), b"appleorange")
556
557    def test_stdout_stderr_file(self):
558        # capture stdout and stderr to the same open file
559        tf = tempfile.TemporaryFile()
560        self.addCleanup(tf.close)
561        p = subprocess.Popen([sys.executable, "-c",
562                              'import sys;'
563                              'sys.stdout.write("apple");'
564                              'sys.stdout.flush();'
565                              'sys.stderr.write("orange")'],
566                             stdout=tf,
567                             stderr=tf)
568        p.wait()
569        tf.seek(0)
570        self.assertStderrEqual(tf.read(), b"appleorange")
571
572    def test_stdout_filedes_of_stdout(self):
573        # stdout is set to 1 (#1531862).
574        # To avoid printing the text on stdout, we do something similar to
575        # test_stdout_none (see above).  The parent subprocess calls the child
576        # subprocess passing stdout=1, and this test uses stdout=PIPE in
577        # order to capture and check the output of the parent. See #11963.
578        code = ('import sys, subprocess; '
579                'rc = subprocess.call([sys.executable, "-c", '
580                '    "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
581                     'b\'test with stdout=1\'))"], stdout=1); '
582                'assert rc == 18')
583        p = subprocess.Popen([sys.executable, "-c", code],
584                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
585        self.addCleanup(p.stdout.close)
586        self.addCleanup(p.stderr.close)
587        out, err = p.communicate()
588        self.assertEqual(p.returncode, 0, err)
589        self.assertEqual(out.rstrip(), b'test with stdout=1')
590
591    def test_stdout_devnull(self):
592        p = subprocess.Popen([sys.executable, "-c",
593                              'for i in range(10240):'
594                              'print("x" * 1024)'],
595                              stdout=subprocess.DEVNULL)
596        p.wait()
597        self.assertEqual(p.stdout, None)
598
599    def test_stderr_devnull(self):
600        p = subprocess.Popen([sys.executable, "-c",
601                              'import sys\n'
602                              'for i in range(10240):'
603                              'sys.stderr.write("x" * 1024)'],
604                              stderr=subprocess.DEVNULL)
605        p.wait()
606        self.assertEqual(p.stderr, None)
607
608    def test_stdin_devnull(self):
609        p = subprocess.Popen([sys.executable, "-c",
610                              'import sys;'
611                              'sys.stdin.read(1)'],
612                              stdin=subprocess.DEVNULL)
613        p.wait()
614        self.assertEqual(p.stdin, None)
615
616    def test_env(self):
617        newenv = os.environ.copy()
618        newenv["FRUIT"] = "orange"
619        with subprocess.Popen([sys.executable, "-c",
620                               'import sys,os;'
621                               'sys.stdout.write(os.getenv("FRUIT"))'],
622                              stdout=subprocess.PIPE,
623                              env=newenv) as p:
624            stdout, stderr = p.communicate()
625            self.assertEqual(stdout, b"orange")
626
627    # Windows requires at least the SYSTEMROOT environment variable to start
628    # Python
629    @unittest.skipIf(sys.platform == 'win32',
630                     'cannot test an empty env on Windows')
631    @unittest.skipIf(sysconfig.get_config_var('Py_ENABLE_SHARED') is not None,
632                     'the python library cannot be loaded '
633                     'with an empty environment')
634    def test_empty_env(self):
635        with subprocess.Popen([sys.executable, "-c",
636                               'import os; '
637                               'print(list(os.environ.keys()))'],
638                              stdout=subprocess.PIPE,
639                              env={}) as p:
640            stdout, stderr = p.communicate()
641            self.assertIn(stdout.strip(),
642                (b"[]",
643                 # Mac OS X adds __CF_USER_TEXT_ENCODING variable to an empty
644                 # environment
645                 b"['__CF_USER_TEXT_ENCODING']"))
646
647    def test_communicate_stdin(self):
648        p = subprocess.Popen([sys.executable, "-c",
649                              'import sys;'
650                              'sys.exit(sys.stdin.read() == "pear")'],
651                             stdin=subprocess.PIPE)
652        p.communicate(b"pear")
653        self.assertEqual(p.returncode, 1)
654
655    def test_communicate_stdout(self):
656        p = subprocess.Popen([sys.executable, "-c",
657                              'import sys; sys.stdout.write("pineapple")'],
658                             stdout=subprocess.PIPE)
659        (stdout, stderr) = p.communicate()
660        self.assertEqual(stdout, b"pineapple")
661        self.assertEqual(stderr, None)
662
663    def test_communicate_stderr(self):
664        p = subprocess.Popen([sys.executable, "-c",
665                              'import sys; sys.stderr.write("pineapple")'],
666                             stderr=subprocess.PIPE)
667        (stdout, stderr) = p.communicate()
668        self.assertEqual(stdout, None)
669        self.assertStderrEqual(stderr, b"pineapple")
670
671    def test_communicate(self):
672        p = subprocess.Popen([sys.executable, "-c",
673                              'import sys,os;'
674                              'sys.stderr.write("pineapple");'
675                              'sys.stdout.write(sys.stdin.read())'],
676                             stdin=subprocess.PIPE,
677                             stdout=subprocess.PIPE,
678                             stderr=subprocess.PIPE)
679        self.addCleanup(p.stdout.close)
680        self.addCleanup(p.stderr.close)
681        self.addCleanup(p.stdin.close)
682        (stdout, stderr) = p.communicate(b"banana")
683        self.assertEqual(stdout, b"banana")
684        self.assertStderrEqual(stderr, b"pineapple")
685
686    def test_communicate_timeout(self):
687        p = subprocess.Popen([sys.executable, "-c",
688                              'import sys,os,time;'
689                              'sys.stderr.write("pineapple\\n");'
690                              'time.sleep(1);'
691                              'sys.stderr.write("pear\\n");'
692                              'sys.stdout.write(sys.stdin.read())'],
693                             universal_newlines=True,
694                             stdin=subprocess.PIPE,
695                             stdout=subprocess.PIPE,
696                             stderr=subprocess.PIPE)
697        self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana",
698                          timeout=0.3)
699        # Make sure we can keep waiting for it, and that we get the whole output
700        # after it completes.
701        (stdout, stderr) = p.communicate()
702        self.assertEqual(stdout, "banana")
703        self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n")
704
705    def test_communicate_timeout_large_output(self):
706        # Test an expiring timeout while the child is outputting lots of data.
707        p = subprocess.Popen([sys.executable, "-c",
708                              'import sys,os,time;'
709                              'sys.stdout.write("a" * (64 * 1024));'
710                              'time.sleep(0.2);'
711                              'sys.stdout.write("a" * (64 * 1024));'
712                              'time.sleep(0.2);'
713                              'sys.stdout.write("a" * (64 * 1024));'
714                              'time.sleep(0.2);'
715                              'sys.stdout.write("a" * (64 * 1024));'],
716                             stdout=subprocess.PIPE)
717        self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4)
718        (stdout, _) = p.communicate()
719        self.assertEqual(len(stdout), 4 * 64 * 1024)
720
721    # Test for the fd leak reported in http://bugs.python.org/issue2791.
722    def test_communicate_pipe_fd_leak(self):
723        for stdin_pipe in (False, True):
724            for stdout_pipe in (False, True):
725                for stderr_pipe in (False, True):
726                    options = {}
727                    if stdin_pipe:
728                        options['stdin'] = subprocess.PIPE
729                    if stdout_pipe:
730                        options['stdout'] = subprocess.PIPE
731                    if stderr_pipe:
732                        options['stderr'] = subprocess.PIPE
733                    if not options:
734                        continue
735                    p = subprocess.Popen((sys.executable, "-c", "pass"), **options)
736                    p.communicate()
737                    if p.stdin is not None:
738                        self.assertTrue(p.stdin.closed)
739                    if p.stdout is not None:
740                        self.assertTrue(p.stdout.closed)
741                    if p.stderr is not None:
742                        self.assertTrue(p.stderr.closed)
743
744    def test_communicate_returns(self):
745        # communicate() should return None if no redirection is active
746        p = subprocess.Popen([sys.executable, "-c",
747                              "import sys; sys.exit(47)"])
748        (stdout, stderr) = p.communicate()
749        self.assertEqual(stdout, None)
750        self.assertEqual(stderr, None)
751
752    def test_communicate_pipe_buf(self):
753        # communicate() with writes larger than pipe_buf
754        # This test will probably deadlock rather than fail, if
755        # communicate() does not work properly.
756        x, y = os.pipe()
757        os.close(x)
758        os.close(y)
759        p = subprocess.Popen([sys.executable, "-c",
760                              'import sys,os;'
761                              'sys.stdout.write(sys.stdin.read(47));'
762                              'sys.stderr.write("x" * %d);'
763                              'sys.stdout.write(sys.stdin.read())' %
764                              support.PIPE_MAX_SIZE],
765                             stdin=subprocess.PIPE,
766                             stdout=subprocess.PIPE,
767                             stderr=subprocess.PIPE)
768        self.addCleanup(p.stdout.close)
769        self.addCleanup(p.stderr.close)
770        self.addCleanup(p.stdin.close)
771        string_to_write = b"a" * support.PIPE_MAX_SIZE
772        (stdout, stderr) = p.communicate(string_to_write)
773        self.assertEqual(stdout, string_to_write)
774
775    def test_writes_before_communicate(self):
776        # stdin.write before communicate()
777        p = subprocess.Popen([sys.executable, "-c",
778                              'import sys,os;'
779                              'sys.stdout.write(sys.stdin.read())'],
780                             stdin=subprocess.PIPE,
781                             stdout=subprocess.PIPE,
782                             stderr=subprocess.PIPE)
783        self.addCleanup(p.stdout.close)
784        self.addCleanup(p.stderr.close)
785        self.addCleanup(p.stdin.close)
786        p.stdin.write(b"banana")
787        (stdout, stderr) = p.communicate(b"split")
788        self.assertEqual(stdout, b"bananasplit")
789        self.assertStderrEqual(stderr, b"")
790
791    def test_universal_newlines(self):
792        p = subprocess.Popen([sys.executable, "-c",
793                              'import sys,os;' + SETBINARY +
794                              'buf = sys.stdout.buffer;'
795                              'buf.write(sys.stdin.readline().encode());'
796                              'buf.flush();'
797                              'buf.write(b"line2\\n");'
798                              'buf.flush();'
799                              'buf.write(sys.stdin.read().encode());'
800                              'buf.flush();'
801                              'buf.write(b"line4\\n");'
802                              'buf.flush();'
803                              'buf.write(b"line5\\r\\n");'
804                              'buf.flush();'
805                              'buf.write(b"line6\\r");'
806                              'buf.flush();'
807                              'buf.write(b"\\nline7");'
808                              'buf.flush();'
809                              'buf.write(b"\\nline8");'],
810                             stdin=subprocess.PIPE,
811                             stdout=subprocess.PIPE,
812                             universal_newlines=1)
813        with p:
814            p.stdin.write("line1\n")
815            p.stdin.flush()
816            self.assertEqual(p.stdout.readline(), "line1\n")
817            p.stdin.write("line3\n")
818            p.stdin.close()
819            self.addCleanup(p.stdout.close)
820            self.assertEqual(p.stdout.readline(),
821                             "line2\n")
822            self.assertEqual(p.stdout.read(6),
823                             "line3\n")
824            self.assertEqual(p.stdout.read(),
825                             "line4\nline5\nline6\nline7\nline8")
826
827    def test_universal_newlines_communicate(self):
828        # universal newlines through communicate()
829        p = subprocess.Popen([sys.executable, "-c",
830                              'import sys,os;' + SETBINARY +
831                              'buf = sys.stdout.buffer;'
832                              'buf.write(b"line2\\n");'
833                              'buf.flush();'
834                              'buf.write(b"line4\\n");'
835                              'buf.flush();'
836                              'buf.write(b"line5\\r\\n");'
837                              'buf.flush();'
838                              'buf.write(b"line6\\r");'
839                              'buf.flush();'
840                              'buf.write(b"\\nline7");'
841                              'buf.flush();'
842                              'buf.write(b"\\nline8");'],
843                             stderr=subprocess.PIPE,
844                             stdout=subprocess.PIPE,
845                             universal_newlines=1)
846        self.addCleanup(p.stdout.close)
847        self.addCleanup(p.stderr.close)
848        (stdout, stderr) = p.communicate()
849        self.assertEqual(stdout,
850                         "line2\nline4\nline5\nline6\nline7\nline8")
851
852    def test_universal_newlines_communicate_stdin(self):
853        # universal newlines through communicate(), with only stdin
854        p = subprocess.Popen([sys.executable, "-c",
855                              'import sys,os;' + SETBINARY + textwrap.dedent('''
856                               s = sys.stdin.readline()
857                               assert s == "line1\\n", repr(s)
858                               s = sys.stdin.read()
859                               assert s == "line3\\n", repr(s)
860                              ''')],
861                             stdin=subprocess.PIPE,
862                             universal_newlines=1)
863        (stdout, stderr) = p.communicate("line1\nline3\n")
864        self.assertEqual(p.returncode, 0)
865
866    def test_universal_newlines_communicate_input_none(self):
867        # Test communicate(input=None) with universal newlines.
868        #
869        # We set stdout to PIPE because, as of this writing, a different
870        # code path is tested when the number of pipes is zero or one.
871        p = subprocess.Popen([sys.executable, "-c", "pass"],
872                             stdin=subprocess.PIPE,
873                             stdout=subprocess.PIPE,
874                             universal_newlines=True)
875        p.communicate()
876        self.assertEqual(p.returncode, 0)
877
878    def test_universal_newlines_communicate_stdin_stdout_stderr(self):
879        # universal newlines through communicate(), with stdin, stdout, stderr
880        p = subprocess.Popen([sys.executable, "-c",
881                              'import sys,os;' + SETBINARY + textwrap.dedent('''
882                               s = sys.stdin.buffer.readline()
883                               sys.stdout.buffer.write(s)
884                               sys.stdout.buffer.write(b"line2\\r")
885                               sys.stderr.buffer.write(b"eline2\\n")
886                               s = sys.stdin.buffer.read()
887                               sys.stdout.buffer.write(s)
888                               sys.stdout.buffer.write(b"line4\\n")
889                               sys.stdout.buffer.write(b"line5\\r\\n")
890                               sys.stderr.buffer.write(b"eline6\\r")
891                               sys.stderr.buffer.write(b"eline7\\r\\nz")
892                              ''')],
893                             stdin=subprocess.PIPE,
894                             stderr=subprocess.PIPE,
895                             stdout=subprocess.PIPE,
896                             universal_newlines=True)
897        self.addCleanup(p.stdout.close)
898        self.addCleanup(p.stderr.close)
899        (stdout, stderr) = p.communicate("line1\nline3\n")
900        self.assertEqual(p.returncode, 0)
901        self.assertEqual("line1\nline2\nline3\nline4\nline5\n", stdout)
902        # Python debug build push something like "[42442 refs]\n"
903        # to stderr at exit of subprocess.
904        # Don't use assertStderrEqual because it strips CR and LF from output.
905        self.assertTrue(stderr.startswith("eline2\neline6\neline7\n"))
906
907    def test_universal_newlines_communicate_encodings(self):
908        # Check that universal newlines mode works for various encodings,
909        # in particular for encodings in the UTF-16 and UTF-32 families.
910        # See issue #15595.
911        #
912        # UTF-16 and UTF-32-BE are sufficient to check both with BOM and
913        # without, and UTF-16 and UTF-32.
914        for encoding in ['utf-16', 'utf-32-be']:
915            code = ("import sys; "
916                    r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" %
917                    encoding)
918            args = [sys.executable, '-c', code]
919            # We set stdin to be non-None because, as of this writing,
920            # a different code path is used when the number of pipes is
921            # zero or one.
922            popen = subprocess.Popen(args,
923                                     stdin=subprocess.PIPE,
924                                     stdout=subprocess.PIPE,
925                                     encoding=encoding)
926            stdout, stderr = popen.communicate(input='')
927            self.assertEqual(stdout, '1\n2\n3\n4')
928
929    def test_communicate_errors(self):
930        for errors, expected in [
931            ('ignore', ''),
932            ('replace', '\ufffd\ufffd'),
933            ('surrogateescape', '\udc80\udc80'),
934            ('backslashreplace', '\\x80\\x80'),
935        ]:
936            code = ("import sys; "
937                    r"sys.stdout.buffer.write(b'[\x80\x80]')")
938            args = [sys.executable, '-c', code]
939            # We set stdin to be non-None because, as of this writing,
940            # a different code path is used when the number of pipes is
941            # zero or one.
942            popen = subprocess.Popen(args,
943                                     stdin=subprocess.PIPE,
944                                     stdout=subprocess.PIPE,
945                                     encoding='utf-8',
946                                     errors=errors)
947            stdout, stderr = popen.communicate(input='')
948            self.assertEqual(stdout, '[{}]'.format(expected))
949
950    def test_no_leaking(self):
951        # Make sure we leak no resources
952        if not mswindows:
953            max_handles = 1026 # too much for most UNIX systems
954        else:
955            max_handles = 2050 # too much for (at least some) Windows setups
956        handles = []
957        tmpdir = tempfile.mkdtemp()
958        try:
959            for i in range(max_handles):
960                try:
961                    tmpfile = os.path.join(tmpdir, support.TESTFN)
962                    handles.append(os.open(tmpfile, os.O_WRONLY|os.O_CREAT))
963                except OSError as e:
964                    if e.errno != errno.EMFILE:
965                        raise
966                    break
967            else:
968                self.skipTest("failed to reach the file descriptor limit "
969                    "(tried %d)" % max_handles)
970            # Close a couple of them (should be enough for a subprocess)
971            for i in range(10):
972                os.close(handles.pop())
973            # Loop creating some subprocesses. If one of them leaks some fds,
974            # the next loop iteration will fail by reaching the max fd limit.
975            for i in range(15):
976                p = subprocess.Popen([sys.executable, "-c",
977                                      "import sys;"
978                                      "sys.stdout.write(sys.stdin.read())"],
979                                     stdin=subprocess.PIPE,
980                                     stdout=subprocess.PIPE,
981                                     stderr=subprocess.PIPE)
982                data = p.communicate(b"lime")[0]
983                self.assertEqual(data, b"lime")
984        finally:
985            for h in handles:
986                os.close(h)
987            shutil.rmtree(tmpdir)
988
989    def test_list2cmdline(self):
990        self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
991                         '"a b c" d e')
992        self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
993                         'ab\\"c \\ d')
994        self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
995                         'ab\\"c " \\\\" d')
996        self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
997                         'a\\\\\\b "de fg" h')
998        self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
999                         'a\\\\\\"b c d')
1000        self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
1001                         '"a\\\\b c" d e')
1002        self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
1003                         '"a\\\\b\\ c" d e')
1004        self.assertEqual(subprocess.list2cmdline(['ab', '']),
1005                         'ab ""')
1006
1007    def test_poll(self):
1008        p = subprocess.Popen([sys.executable, "-c",
1009                              "import os; os.read(0, 1)"],
1010                             stdin=subprocess.PIPE)
1011        self.addCleanup(p.stdin.close)
1012        self.assertIsNone(p.poll())
1013        os.write(p.stdin.fileno(), b'A')
1014        p.wait()
1015        # Subsequent invocations should just return the returncode
1016        self.assertEqual(p.poll(), 0)
1017
1018    def test_wait(self):
1019        p = subprocess.Popen([sys.executable, "-c", "pass"])
1020        self.assertEqual(p.wait(), 0)
1021        # Subsequent invocations should just return the returncode
1022        self.assertEqual(p.wait(), 0)
1023
1024    def test_wait_timeout(self):
1025        p = subprocess.Popen([sys.executable,
1026                              "-c", "import time; time.sleep(0.3)"])
1027        with self.assertRaises(subprocess.TimeoutExpired) as c:
1028            p.wait(timeout=0.0001)
1029        self.assertIn("0.0001", str(c.exception))  # For coverage of __str__.
1030        # Some heavily loaded buildbots (sparc Debian 3.x) require this much
1031        # time to start.
1032        self.assertEqual(p.wait(timeout=3), 0)
1033
1034    def test_wait_endtime(self):
1035        """Confirm that the deprecated endtime parameter warns."""
1036        p = subprocess.Popen([sys.executable, "-c", "pass"])
1037        try:
1038            with self.assertWarns(DeprecationWarning) as warn_cm:
1039                p.wait(endtime=time.time()+0.01)
1040        except subprocess.TimeoutExpired:
1041            pass  # We're not testing endtime timeout behavior.
1042        finally:
1043            p.kill()
1044        self.assertIn('test_subprocess.py', warn_cm.filename)
1045        self.assertIn('endtime', str(warn_cm.warning))
1046
1047    def test_invalid_bufsize(self):
1048        # an invalid type of the bufsize argument should raise
1049        # TypeError.
1050        with self.assertRaises(TypeError):
1051            subprocess.Popen([sys.executable, "-c", "pass"], "orange")
1052
1053    def test_bufsize_is_none(self):
1054        # bufsize=None should be the same as bufsize=0.
1055        p = subprocess.Popen([sys.executable, "-c", "pass"], None)
1056        self.assertEqual(p.wait(), 0)
1057        # Again with keyword arg
1058        p = subprocess.Popen([sys.executable, "-c", "pass"], bufsize=None)
1059        self.assertEqual(p.wait(), 0)
1060
1061    def _test_bufsize_equal_one(self, line, expected, universal_newlines):
1062        # subprocess may deadlock with bufsize=1, see issue #21332
1063        with subprocess.Popen([sys.executable, "-c", "import sys;"
1064                               "sys.stdout.write(sys.stdin.readline());"
1065                               "sys.stdout.flush()"],
1066                              stdin=subprocess.PIPE,
1067                              stdout=subprocess.PIPE,
1068                              stderr=subprocess.DEVNULL,
1069                              bufsize=1,
1070                              universal_newlines=universal_newlines) as p:
1071            p.stdin.write(line) # expect that it flushes the line in text mode
1072            os.close(p.stdin.fileno()) # close it without flushing the buffer
1073            read_line = p.stdout.readline()
1074            try:
1075                p.stdin.close()
1076            except OSError:
1077                pass
1078            p.stdin = None
1079        self.assertEqual(p.returncode, 0)
1080        self.assertEqual(read_line, expected)
1081
1082    def test_bufsize_equal_one_text_mode(self):
1083        # line is flushed in text mode with bufsize=1.
1084        # we should get the full line in return
1085        line = "line\n"
1086        self._test_bufsize_equal_one(line, line, universal_newlines=True)
1087
1088    def test_bufsize_equal_one_binary_mode(self):
1089        # line is not flushed in binary mode with bufsize=1.
1090        # we should get empty response
1091        line = b'line' + os.linesep.encode() # assume ascii-based locale
1092        self._test_bufsize_equal_one(line, b'', universal_newlines=False)
1093
1094    def test_leaking_fds_on_error(self):
1095        # see bug #5179: Popen leaks file descriptors to PIPEs if
1096        # the child fails to execute; this will eventually exhaust
1097        # the maximum number of open fds. 1024 seems a very common
1098        # value for that limit, but Windows has 2048, so we loop
1099        # 1024 times (each call leaked two fds).
1100        for i in range(1024):
1101            with self.assertRaises(OSError) as c:
1102                subprocess.Popen(['nonexisting_i_hope'],
1103                                 stdout=subprocess.PIPE,
1104                                 stderr=subprocess.PIPE)
1105            # ignore errors that indicate the command was not found
1106            if c.exception.errno not in (errno.ENOENT, errno.EACCES):
1107                raise c.exception
1108
1109    @unittest.skipIf(threading is None, "threading required")
1110    def test_double_close_on_error(self):
1111        # Issue #18851
1112        fds = []
1113        def open_fds():
1114            for i in range(20):
1115                fds.extend(os.pipe())
1116                time.sleep(0.001)
1117        t = threading.Thread(target=open_fds)
1118        t.start()
1119        try:
1120            with self.assertRaises(EnvironmentError):
1121                subprocess.Popen(['nonexisting_i_hope'],
1122                                 stdin=subprocess.PIPE,
1123                                 stdout=subprocess.PIPE,
1124                                 stderr=subprocess.PIPE)
1125        finally:
1126            t.join()
1127            exc = None
1128            for fd in fds:
1129                # If a double close occurred, some of those fds will
1130                # already have been closed by mistake, and os.close()
1131                # here will raise.
1132                try:
1133                    os.close(fd)
1134                except OSError as e:
1135                    exc = e
1136            if exc is not None:
1137                raise exc
1138
1139    @unittest.skipIf(threading is None, "threading required")
1140    def test_threadsafe_wait(self):
1141        """Issue21291: Popen.wait() needs to be threadsafe for returncode."""
1142        proc = subprocess.Popen([sys.executable, '-c',
1143                                 'import time; time.sleep(12)'])
1144        self.assertEqual(proc.returncode, None)
1145        results = []
1146
1147        def kill_proc_timer_thread():
1148            results.append(('thread-start-poll-result', proc.poll()))
1149            # terminate it from the thread and wait for the result.
1150            proc.kill()
1151            proc.wait()
1152            results.append(('thread-after-kill-and-wait', proc.returncode))
1153            # this wait should be a no-op given the above.
1154            proc.wait()
1155            results.append(('thread-after-second-wait', proc.returncode))
1156
1157        # This is a timing sensitive test, the failure mode is
1158        # triggered when both the main thread and this thread are in
1159        # the wait() call at once.  The delay here is to allow the
1160        # main thread to most likely be blocked in its wait() call.
1161        t = threading.Timer(0.2, kill_proc_timer_thread)
1162        t.start()
1163
1164        if mswindows:
1165            expected_errorcode = 1
1166        else:
1167            # Should be -9 because of the proc.kill() from the thread.
1168            expected_errorcode = -9
1169
1170        # Wait for the process to finish; the thread should kill it
1171        # long before it finishes on its own.  Supplying a timeout
1172        # triggers a different code path for better coverage.
1173        proc.wait(timeout=20)
1174        self.assertEqual(proc.returncode, expected_errorcode,
1175                         msg="unexpected result in wait from main thread")
1176
1177        # This should be a no-op with no change in returncode.
1178        proc.wait()
1179        self.assertEqual(proc.returncode, expected_errorcode,
1180                         msg="unexpected result in second main wait.")
1181
1182        t.join()
1183        # Ensure that all of the thread results are as expected.
1184        # When a race condition occurs in wait(), the returncode could
1185        # be set by the wrong thread that doesn't actually have it
1186        # leading to an incorrect value.
1187        self.assertEqual([('thread-start-poll-result', None),
1188                          ('thread-after-kill-and-wait', expected_errorcode),
1189                          ('thread-after-second-wait', expected_errorcode)],
1190                         results)
1191
1192    def test_issue8780(self):
1193        # Ensure that stdout is inherited from the parent
1194        # if stdout=PIPE is not used
1195        code = ';'.join((
1196            'import subprocess, sys',
1197            'retcode = subprocess.call('
1198                "[sys.executable, '-c', 'print(\"Hello World!\")'])",
1199            'assert retcode == 0'))
1200        output = subprocess.check_output([sys.executable, '-c', code])
1201        self.assertTrue(output.startswith(b'Hello World!'), ascii(output))
1202
1203    def test_handles_closed_on_exception(self):
1204        # If CreateProcess exits with an error, ensure the
1205        # duplicate output handles are released
1206        ifhandle, ifname = tempfile.mkstemp()
1207        ofhandle, ofname = tempfile.mkstemp()
1208        efhandle, efname = tempfile.mkstemp()
1209        try:
1210            subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,
1211              stderr=efhandle)
1212        except OSError:
1213            os.close(ifhandle)
1214            os.remove(ifname)
1215            os.close(ofhandle)
1216            os.remove(ofname)
1217            os.close(efhandle)
1218            os.remove(efname)
1219        self.assertFalse(os.path.exists(ifname))
1220        self.assertFalse(os.path.exists(ofname))
1221        self.assertFalse(os.path.exists(efname))
1222
1223    def test_communicate_epipe(self):
1224        # Issue 10963: communicate() should hide EPIPE
1225        p = subprocess.Popen([sys.executable, "-c", 'pass'],
1226                             stdin=subprocess.PIPE,
1227                             stdout=subprocess.PIPE,
1228                             stderr=subprocess.PIPE)
1229        self.addCleanup(p.stdout.close)
1230        self.addCleanup(p.stderr.close)
1231        self.addCleanup(p.stdin.close)
1232        p.communicate(b"x" * 2**20)
1233
1234    def test_communicate_epipe_only_stdin(self):
1235        # Issue 10963: communicate() should hide EPIPE
1236        p = subprocess.Popen([sys.executable, "-c", 'pass'],
1237                             stdin=subprocess.PIPE)
1238        self.addCleanup(p.stdin.close)
1239        p.wait()
1240        p.communicate(b"x" * 2**20)
1241
1242    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'),
1243                         "Requires signal.SIGUSR1")
1244    @unittest.skipUnless(hasattr(os, 'kill'),
1245                         "Requires os.kill")
1246    @unittest.skipUnless(hasattr(os, 'getppid'),
1247                         "Requires os.getppid")
1248    def test_communicate_eintr(self):
1249        # Issue #12493: communicate() should handle EINTR
1250        def handler(signum, frame):
1251            pass
1252        old_handler = signal.signal(signal.SIGUSR1, handler)
1253        self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
1254
1255        args = [sys.executable, "-c",
1256                'import os, signal;'
1257                'os.kill(os.getppid(), signal.SIGUSR1)']
1258        for stream in ('stdout', 'stderr'):
1259            kw = {stream: subprocess.PIPE}
1260            with subprocess.Popen(args, **kw) as process:
1261                # communicate() will be interrupted by SIGUSR1
1262                process.communicate()
1263
1264
1265    # This test is Linux-ish specific for simplicity to at least have
1266    # some coverage.  It is not a platform specific bug.
1267    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
1268                         "Linux specific")
1269    def test_failed_child_execute_fd_leak(self):
1270        """Test for the fork() failure fd leak reported in issue16327."""
1271        fd_directory = '/proc/%d/fd' % os.getpid()
1272        fds_before_popen = os.listdir(fd_directory)
1273        with self.assertRaises(PopenTestException):
1274            PopenExecuteChildRaises(
1275                    [sys.executable, '-c', 'pass'], stdin=subprocess.PIPE,
1276                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1277
1278        # NOTE: This test doesn't verify that the real _execute_child
1279        # does not close the file descriptors itself on the way out
1280        # during an exception.  Code inspection has confirmed that.
1281
1282        fds_after_exception = os.listdir(fd_directory)
1283        self.assertEqual(fds_before_popen, fds_after_exception)
1284
1285
class RunFuncTestCase(BaseTestCase):
    """Tests for subprocess.run(), driven through the run_python() helper."""

    def run_python(self, code, **kwargs):
        """Run Python code in a subprocess using subprocess.run"""
        return subprocess.run([sys.executable, "-c", code], **kwargs)

    def test_returncode(self):
        # The exit status is recorded on the result, and check_returncode()
        # raises for a non-zero status.
        completed = self.run_python("import sys; sys.exit(47)")
        self.assertEqual(completed.returncode, 47)
        self.assertRaises(subprocess.CalledProcessError,
                          completed.check_returncode)

    def test_check(self):
        # check=True turns a non-zero exit status into CalledProcessError.
        with self.assertRaises(subprocess.CalledProcessError) as caught:
            self.run_python("import sys; sys.exit(47)", check=True)
        self.assertEqual(caught.exception.returncode, 47)

    def test_check_zero(self):
        # check_returncode shouldn't raise when returncode is zero
        completed = self.run_python("import sys; sys.exit(0)", check=True)
        self.assertEqual(completed.returncode, 0)

    def test_timeout(self):
        # run() function with timeout argument; we want to test that the child
        # process gets killed when the timeout expires.  If the child isn't
        # killed, this call will deadlock since subprocess.run waits for the
        # child.
        self.assertRaises(subprocess.TimeoutExpired,
                          self.run_python, "while True: pass", timeout=0.0001)

    def test_capture_stdout(self):
        # capture stdout with zero return code
        completed = self.run_python("print('BDFL')", stdout=subprocess.PIPE)
        self.assertIn(b'BDFL', completed.stdout)

    def test_capture_stderr(self):
        # Same as above, but for the child's stderr.
        code = "import sys; sys.stderr.write('BDFL')"
        completed = self.run_python(code, stderr=subprocess.PIPE)
        self.assertIn(b'BDFL', completed.stderr)

    def test_check_output_stdin_arg(self):
        # run() can be called with stdin set to a file
        source = tempfile.TemporaryFile()
        self.addCleanup(source.close)
        source.write(b'pear')
        source.seek(0)
        code = "import sys; sys.stdout.write(sys.stdin.read().upper())"
        completed = self.run_python(code, stdin=source,
                                    stdout=subprocess.PIPE)
        self.assertIn(b'PEAR', completed.stdout)

    def test_check_output_input_arg(self):
        # run() can be given the child's input directly through input=
        code = "import sys; sys.stdout.write(sys.stdin.read().upper())"
        completed = self.run_python(code, input=b'pear',
                                    stdout=subprocess.PIPE)
        self.assertIn(b'PEAR', completed.stdout)

    def test_check_output_stdin_with_input_arg(self):
        # run() refuses to accept 'stdin' with 'input'
        source = tempfile.TemporaryFile()
        self.addCleanup(source.close)
        source.write(b'pear')
        source.seek(0)
        with self.assertRaises(ValueError,
              msg="Expected ValueError when stdin and input args supplied.") as caught:
            self.run_python("print('will not be run')",
                            stdin=source, input=b'hare')
        message = caught.exception.args[0]
        self.assertIn('stdin', message)
        self.assertIn('input', message)

    def test_check_output_timeout(self):
        # The child prints, flushes and then sleeps for an hour, so the
        # timeout has to fire; the output produced so far must be attached
        # to the exception.
        code = ("import sys, time\n"
                "sys.stdout.write('BDFL')\n"
                "sys.stdout.flush()\n"
                "time.sleep(3600)")
        with self.assertRaises(subprocess.TimeoutExpired) as caught:
            # Some heavily loaded buildbots (sparc Debian 3.x) require
            # this much time to start and print.
            self.run_python(code, timeout=3, stdout=subprocess.PIPE)
        self.assertEqual(caught.exception.output, b'BDFL')
        # output is aliased to stdout
        self.assertEqual(caught.exception.stdout, b'BDFL')

    def test_run_kwargs(self):
        # Extra keyword arguments (env here) must be forwarded; the child
        # exits with 33 only if it sees FRUIT=banana.
        child_env = dict(os.environ, FRUIT="banana")
        code = ('import sys, os;'
                'sys.exit(33 if os.getenv("FRUIT")=="banana" else 31)')
        completed = self.run_python(code, env=child_env)
        self.assertEqual(completed.returncode, 33)
1379
1380
1381@unittest.skipIf(mswindows, "POSIX specific tests")
1382class POSIXProcessTestCase(BaseTestCase):
1383
    def setUp(self):
        # Run the base-class fixture first, then record a path that the
        # tests in this class rely on being absent from the filesystem.
        super().setUp()
        self._nonexistent_dir = "/_this/pa.th/does/not/exist"
1387
1388    def _get_chdir_exception(self):
1389        try:
1390            os.chdir(self._nonexistent_dir)
1391        except OSError as e:
1392            # This avoids hard coding the errno value or the OS perror()
1393            # string and instead capture the exception that we want to see
1394            # below for comparison.
1395            desired_exception = e
1396            desired_exception.strerror += ': ' + repr(self._nonexistent_dir)
1397        else:
1398            self.fail("chdir to nonexistent directory %s succeeded." %
1399                      self._nonexistent_dir)
1400        return desired_exception
1401
1402    def test_exception_cwd(self):
1403        """Test error in the child raised in the parent for a bad cwd."""
1404        desired_exception = self._get_chdir_exception()
1405        try:
1406            p = subprocess.Popen([sys.executable, "-c", ""],
1407                                 cwd=self._nonexistent_dir)
1408        except OSError as e:
1409            # Test that the child process chdir failure actually makes
1410            # it up to the parent process as the correct exception.
1411            self.assertEqual(desired_exception.errno, e.errno)
1412            self.assertEqual(desired_exception.strerror, e.strerror)
1413        else:
1414            self.fail("Expected OSError: %s" % desired_exception)
1415
1416    def test_exception_bad_executable(self):
1417        """Test error in the child raised in the parent for a bad executable."""
1418        desired_exception = self._get_chdir_exception()
1419        try:
1420            p = subprocess.Popen([sys.executable, "-c", ""],
1421                                 executable=self._nonexistent_dir)
1422        except OSError as e:
1423            # Test that the child process exec failure actually makes
1424            # it up to the parent process as the correct exception.
1425            self.assertEqual(desired_exception.errno, e.errno)
1426            self.assertEqual(desired_exception.strerror, e.strerror)
1427        else:
1428            self.fail("Expected OSError: %s" % desired_exception)
1429
1430    def test_exception_bad_args_0(self):
1431        """Test error in the child raised in the parent for a bad args[0]."""
1432        desired_exception = self._get_chdir_exception()
1433        try:
1434            p = subprocess.Popen([self._nonexistent_dir, "-c", ""])
1435        except OSError as e:
1436            # Test that the child process exec failure actually makes
1437            # it up to the parent process as the correct exception.
1438            self.assertEqual(desired_exception.errno, e.errno)
1439            self.assertEqual(desired_exception.strerror, e.strerror)
1440        else:
1441            self.fail("Expected OSError: %s" % desired_exception)
1442
1443    def test_restore_signals(self):
1444        # Code coverage for both values of restore_signals to make sure it
1445        # at least does not blow up.
1446        # A test for behavior would be complex.  Contributions welcome.
1447        subprocess.call([sys.executable, "-c", ""], restore_signals=True)
1448        subprocess.call([sys.executable, "-c", ""], restore_signals=False)
1449
1450    def test_start_new_session(self):
1451        # For code coverage of calling setsid().  We don't care if we get an
1452        # EPERM error from it depending on the test execution environment, that
1453        # still indicates that it was called.
1454        try:
1455            output = subprocess.check_output(
1456                    [sys.executable, "-c",
1457                     "import os; print(os.getpgid(os.getpid()))"],
1458                    start_new_session=True)
1459        except OSError as e:
1460            if e.errno != errno.EPERM:
1461                raise
1462        else:
1463            parent_pgid = os.getpgid(os.getpid())
1464            child_pgid = int(output)
1465            self.assertNotEqual(parent_pgid, child_pgid)
1466
1467    def test_run_abort(self):
1468        # returncode handles signal termination
1469        with support.SuppressCrashReport():
1470            p = subprocess.Popen([sys.executable, "-c",
1471                                  'import os; os.abort()'])
1472            p.wait()
1473        self.assertEqual(-p.returncode, signal.SIGABRT)
1474
1475    def test_CalledProcessError_str_signal(self):
1476        err = subprocess.CalledProcessError(-int(signal.SIGABRT), "fake cmd")
1477        error_string = str(err)
1478        # We're relying on the repr() of the signal.Signals intenum to provide
1479        # the word signal, the signal name and the numeric value.
1480        self.assertIn("signal", error_string.lower())
1481        # We're not being specific about the signal name as some signals have
1482        # multiple names and which name is revealed can vary.
1483        self.assertIn("SIG", error_string)
1484        self.assertIn(str(signal.SIGABRT), error_string)
1485
1486    def test_CalledProcessError_str_unknown_signal(self):
1487        err = subprocess.CalledProcessError(-9876543, "fake cmd")
1488        error_string = str(err)
1489        self.assertIn("unknown signal 9876543.", error_string)
1490
1491    def test_CalledProcessError_str_non_zero(self):
1492        err = subprocess.CalledProcessError(2, "fake cmd")
1493        error_string = str(err)
1494        self.assertIn("non-zero exit status 2.", error_string)
1495
1496    def test_preexec(self):
1497        # DISCLAIMER: Setting environment variables is *not* a good use
1498        # of a preexec_fn.  This is merely a test.
1499        p = subprocess.Popen([sys.executable, "-c",
1500                              'import sys,os;'
1501                              'sys.stdout.write(os.getenv("FRUIT"))'],
1502                             stdout=subprocess.PIPE,
1503                             preexec_fn=lambda: os.putenv("FRUIT", "apple"))
1504        with p:
1505            self.assertEqual(p.stdout.read(), b"apple")
1506
1507    def test_preexec_exception(self):
1508        def raise_it():
1509            raise ValueError("What if two swallows carried a coconut?")
1510        try:
1511            p = subprocess.Popen([sys.executable, "-c", ""],
1512                                 preexec_fn=raise_it)
1513        except subprocess.SubprocessError as e:
1514            self.assertTrue(
1515                    subprocess._posixsubprocess,
1516                    "Expected a ValueError from the preexec_fn")
1517        except ValueError as e:
1518            self.assertIn("coconut", e.args[0])
1519        else:
1520            self.fail("Exception raised by preexec_fn did not make it "
1521                      "to the parent process.")
1522
1523    class _TestExecuteChildPopen(subprocess.Popen):
1524        """Used to test behavior at the end of _execute_child."""
1525        def __init__(self, testcase, *args, **kwargs):
1526            self._testcase = testcase
1527            subprocess.Popen.__init__(self, *args, **kwargs)
1528
1529        def _execute_child(self, *args, **kwargs):
1530            try:
1531                subprocess.Popen._execute_child(self, *args, **kwargs)
1532            finally:
1533                # Open a bunch of file descriptors and verify that
1534                # none of them are the same as the ones the Popen
1535                # instance is using for stdin/stdout/stderr.
1536                devzero_fds = [os.open("/dev/zero", os.O_RDONLY)
1537                               for _ in range(8)]
1538                try:
1539                    for fd in devzero_fds:
1540                        self._testcase.assertNotIn(
1541                                fd, (self.stdin.fileno(), self.stdout.fileno(),
1542                                     self.stderr.fileno()),
1543                                msg="At least one fd was closed early.")
1544                finally:
1545                    for fd in devzero_fds:
1546                        os.close(fd)
1547
1548    @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
1549    def test_preexec_errpipe_does_not_double_close_pipes(self):
1550        """Issue16140: Don't double close pipes on preexec error."""
1551
1552        def raise_it():
1553            raise subprocess.SubprocessError(
1554                    "force the _execute_child() errpipe_data path.")
1555
1556        with self.assertRaises(subprocess.SubprocessError):
1557            self._TestExecuteChildPopen(
1558                        self, [sys.executable, "-c", "pass"],
1559                        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1560                        stderr=subprocess.PIPE, preexec_fn=raise_it)
1561
1562    def test_preexec_gc_module_failure(self):
1563        # This tests the code that disables garbage collection if the child
1564        # process will execute any Python.
1565        def raise_runtime_error():
1566            raise RuntimeError("this shouldn't escape")
1567        enabled = gc.isenabled()
1568        orig_gc_disable = gc.disable
1569        orig_gc_isenabled = gc.isenabled
1570        try:
1571            gc.disable()
1572            self.assertFalse(gc.isenabled())
1573            subprocess.call([sys.executable, '-c', ''],
1574                            preexec_fn=lambda: None)
1575            self.assertFalse(gc.isenabled(),
1576                             "Popen enabled gc when it shouldn't.")
1577
1578            gc.enable()
1579            self.assertTrue(gc.isenabled())
1580            subprocess.call([sys.executable, '-c', ''],
1581                            preexec_fn=lambda: None)
1582            self.assertTrue(gc.isenabled(), "Popen left gc disabled.")
1583
1584            gc.disable = raise_runtime_error
1585            self.assertRaises(RuntimeError, subprocess.Popen,
1586                              [sys.executable, '-c', ''],
1587                              preexec_fn=lambda: None)
1588
1589            del gc.isenabled  # force an AttributeError
1590            self.assertRaises(AttributeError, subprocess.Popen,
1591                              [sys.executable, '-c', ''],
1592                              preexec_fn=lambda: None)
1593        finally:
1594            gc.disable = orig_gc_disable
1595            gc.isenabled = orig_gc_isenabled
1596            if not enabled:
1597                gc.disable()
1598
1599    @unittest.skipIf(
1600        sys.platform == 'darwin', 'setrlimit() seems to fail on OS X')
1601    def test_preexec_fork_failure(self):
1602        # The internal code did not preserve the previous exception when
1603        # re-enabling garbage collection
1604        try:
1605            from resource import getrlimit, setrlimit, RLIMIT_NPROC
1606        except ImportError as err:
1607            self.skipTest(err)  # RLIMIT_NPROC is specific to Linux and BSD
1608        limits = getrlimit(RLIMIT_NPROC)
1609        [_, hard] = limits
1610        setrlimit(RLIMIT_NPROC, (0, hard))
1611        self.addCleanup(setrlimit, RLIMIT_NPROC, limits)
1612        try:
1613            subprocess.call([sys.executable, '-c', ''],
1614                            preexec_fn=lambda: None)
1615        except BlockingIOError:
1616            # Forking should raise EAGAIN, translated to BlockingIOError
1617            pass
1618        else:
1619            self.skipTest('RLIMIT_NPROC had no effect; probably superuser')
1620
1621    def test_args_string(self):
1622        # args is a string
1623        fd, fname = tempfile.mkstemp()
1624        # reopen in text mode
1625        with open(fd, "w", errors="surrogateescape") as fobj:
1626            fobj.write("#!%s\n" % support.unix_shell)
1627            fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
1628                       sys.executable)
1629        os.chmod(fname, 0o700)
1630        p = subprocess.Popen(fname)
1631        p.wait()
1632        os.remove(fname)
1633        self.assertEqual(p.returncode, 47)
1634
1635    def test_invalid_args(self):
1636        # invalid arguments should raise ValueError
1637        self.assertRaises(ValueError, subprocess.call,
1638                          [sys.executable, "-c",
1639                           "import sys; sys.exit(47)"],
1640                          startupinfo=47)
1641        self.assertRaises(ValueError, subprocess.call,
1642                          [sys.executable, "-c",
1643                           "import sys; sys.exit(47)"],
1644                          creationflags=47)
1645
1646    def test_shell_sequence(self):
1647        # Run command through the shell (sequence)
1648        newenv = os.environ.copy()
1649        newenv["FRUIT"] = "apple"
1650        p = subprocess.Popen(["echo $FRUIT"], shell=1,
1651                             stdout=subprocess.PIPE,
1652                             env=newenv)
1653        with p:
1654            self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
1655
1656    def test_shell_string(self):
1657        # Run command through the shell (string)
1658        newenv = os.environ.copy()
1659        newenv["FRUIT"] = "apple"
1660        p = subprocess.Popen("echo $FRUIT", shell=1,
1661                             stdout=subprocess.PIPE,
1662                             env=newenv)
1663        with p:
1664            self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
1665
1666    def test_call_string(self):
1667        # call() function with string argument on UNIX
1668        fd, fname = tempfile.mkstemp()
1669        # reopen in text mode
1670        with open(fd, "w", errors="surrogateescape") as fobj:
1671            fobj.write("#!%s\n" % support.unix_shell)
1672            fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
1673                       sys.executable)
1674        os.chmod(fname, 0o700)
1675        rc = subprocess.call(fname)
1676        os.remove(fname)
1677        self.assertEqual(rc, 47)
1678
1679    def test_specific_shell(self):
1680        # Issue #9265: Incorrect name passed as arg[0].
1681        shells = []
1682        for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
1683            for name in ['bash', 'ksh']:
1684                sh = os.path.join(prefix, name)
1685                if os.path.isfile(sh):
1686                    shells.append(sh)
1687        if not shells: # Will probably work for any shell but csh.
1688            self.skipTest("bash or ksh required for this test")
1689        sh = '/bin/sh'
1690        if os.path.isfile(sh) and not os.path.islink(sh):
1691            # Test will fail if /bin/sh is a symlink to csh.
1692            shells.append(sh)
1693        for sh in shells:
1694            p = subprocess.Popen("echo $0", executable=sh, shell=True,
1695                                 stdout=subprocess.PIPE)
1696            with p:
1697                self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii'))
1698
    def _kill_process(self, method, *args):
        # Helper: start a child that writes a line to stdout and then
        # sleeps, call the Popen method named by `method` with `args` on it,
        # and return the Popen object so the caller can inspect the outcome.
        #
        # Do not inherit file handles from the parent.
        # It should fix failures on some platforms.
        # Also set the SIGINT handler to the default to make sure it's not
        # being ignored (some tests rely on that.)
        old_handler = signal.signal(signal.SIGINT, signal.default_int_handler)
        try:
            p = subprocess.Popen([sys.executable, "-c", """if 1:
                                 import sys, time
                                 sys.stdout.write('x\\n')
                                 sys.stdout.flush()
                                 time.sleep(30)
                                 """],
                                 close_fds=True,
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        finally:
            # Put back whatever SIGINT handler was installed before.
            signal.signal(signal.SIGINT, old_handler)
        # Wait for the interpreter to be completely initialized before
        # sending any signal.
        p.stdout.read(1)
        getattr(p, method)(*args)
        return p
1723
    @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')),
                     "Due to known OS bug (issue #16762)")
    def _kill_dead_process(self, method, *args):
        # Helper: start a child that writes a line to stdout and exits at
        # once, give it time to finish, then call the Popen method named by
        # `method` with `args` on it.
        #
        # Do not inherit file handles from the parent.
        # It should fix failures on some platforms.
        p = subprocess.Popen([sys.executable, "-c", """if 1:
                             import sys, time
                             sys.stdout.write('x\\n')
                             sys.stdout.flush()
                             """],
                             close_fds=True,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # Wait for the interpreter to be completely initialized before
        # sending any signal.
        p.stdout.read(1)
        # The process should end after this
        time.sleep(1)
        # This shouldn't raise even though the child is now dead
        getattr(p, method)(*args)
        # Finish up with communicate() on the child.
        p.communicate()
1746
1747    def test_send_signal(self):
1748        p = self._kill_process('send_signal', signal.SIGINT)
1749        _, stderr = p.communicate()
1750        self.assertIn(b'KeyboardInterrupt', stderr)
1751        self.assertNotEqual(p.wait(), 0)
1752
1753    def test_kill(self):
1754        p = self._kill_process('kill')
1755        _, stderr = p.communicate()
1756        self.assertStderrEqual(stderr, b'')
1757        self.assertEqual(p.wait(), -signal.SIGKILL)
1758
1759    def test_terminate(self):
1760        p = self._kill_process('terminate')
1761        _, stderr = p.communicate()
1762        self.assertStderrEqual(stderr, b'')
1763        self.assertEqual(p.wait(), -signal.SIGTERM)
1764
    def test_send_signal_dead(self):
        # Sending a signal to a dead process: send_signal(SIGINT) is invoked
        # through the shared helper, which expects the call not to raise.
        self._kill_dead_process('send_signal', signal.SIGINT)
1768
    def test_kill_dead(self):
        # Killing a dead process: kill() is invoked through the shared
        # helper, which expects the call not to raise.
        self._kill_dead_process('kill')
1772
    def test_terminate_dead(self):
        # Terminating a dead process: terminate() is invoked through the
        # shared helper, which expects the call not to raise.
        self._kill_dead_process('terminate')
1776
1777    def _save_fds(self, save_fds):
1778        fds = []
1779        for fd in save_fds:
1780            inheritable = os.get_inheritable(fd)
1781            saved = os.dup(fd)
1782            fds.append((fd, saved, inheritable))
1783        return fds
1784
1785    def _restore_fds(self, fds):
1786        for fd, saved, inheritable in fds:
1787            os.dup2(saved, fd, inheritable=inheritable)
1788            os.close(saved)
1789
    def check_close_std_fds(self, fds):
        # Issue #9905: test that subprocess pipes still work properly with
        # some standard fds closed
        #
        # `fds` lists the descriptors to close around the Popen call; they
        # are duplicated first and restored afterwards.
        stdin = 0
        saved_fds = self._save_fds(fds)
        for fd, saved, inheritable in saved_fds:
            if fd == 0:
                # fd 0 is about to be closed, so the child is given the
                # duplicate as its stdin instead.
                stdin = saved
                break
        try:
            for fd in fds:
                os.close(fd)
            out, err = subprocess.Popen([sys.executable, "-c",
                              'import sys;'
                              'sys.stdout.write("apple");'
                              'sys.stdout.flush();'
                              'sys.stderr.write("orange")'],
                       stdin=stdin,
                       stdout=subprocess.PIPE,
                       stderr=subprocess.PIPE).communicate()
            # Clean up stderr through the support helper before comparing.
            err = support.strip_python_stderr(err)
            self.assertEqual((out, err), (b'apple', b'orange'))
        finally:
            # Reinstate the closed descriptors whatever happened above.
            self._restore_fds(saved_fds)
1814
    def test_close_fd_0(self):
        # Run the shared check with only fd 0 closed.
        self.check_close_std_fds([0])
1817
    def test_close_fd_1(self):
        # Run the shared check with only fd 1 closed.
        self.check_close_std_fds([1])
1820
    def test_close_fd_2(self):
        # Run the shared check with only fd 2 closed.
        self.check_close_std_fds([2])
1823
    def test_close_fds_0_1(self):
        # Run the shared check with fds 0 and 1 closed.
        self.check_close_std_fds([0, 1])
1826
    def test_close_fds_0_2(self):
        # Run the shared check with fds 0 and 2 closed.
        self.check_close_std_fds([0, 2])
1829
    def test_close_fds_1_2(self):
        # Run the shared check with fds 1 and 2 closed.
        self.check_close_std_fds([1, 2])
1832
    def test_close_fds_0_1_2(self):
        # Issue #10806: test that subprocess pipes still work properly with
        # all standard fds closed.
        # Runs the shared check with fds 0, 1 and 2 closed together.
        self.check_close_std_fds([0, 1, 2])
1837
    def test_small_errpipe_write_fd(self):
        """Issue #15798: Popen should work when stdio fds are available."""
        # Keep duplicates of fds 0 and 1 so they can be reinstated later.
        new_stdin = os.dup(0)
        new_stdout = os.dup(1)
        try:
            # Free descriptor numbers 0 and 1 before starting the child.
            # NOTE(review): presumably so that descriptors opened internally
            # by Popen can receive these low numbers -- confirm against the
            # issue.
            os.close(0)
            os.close(1)

            # Side test: if errpipe_write fails to have its CLOEXEC
            # flag set this should cause the parent to think the exec
            # failed.  Extremely unlikely: everyone supports CLOEXEC.
            subprocess.Popen([
                    sys.executable, "-c",
                    "print('AssertionError:0:CLOEXEC failure.')"]).wait()
        finally:
            # Restore original stdin and stdout
            os.dup2(new_stdin, 0)
            os.dup2(new_stdout, 1)
            os.close(new_stdin)
            os.close(new_stdout)
1858
1859    def test_remapping_std_fds(self):
1860        # open up some temporary files
1861        temps = [tempfile.mkstemp() for i in range(3)]
1862        try:
1863            temp_fds = [fd for fd, fname in temps]
1864
1865            # unlink the files -- we won't need to reopen them
1866            for fd, fname in temps:
1867                os.unlink(fname)
1868
1869            # write some data to what will become stdin, and rewind
1870            os.write(temp_fds[1], b"STDIN")
1871            os.lseek(temp_fds[1], 0, 0)
1872
1873            # move the standard file descriptors out of the way
1874            saved_fds = self._save_fds(range(3))
1875            try:
1876                # duplicate the file objects over the standard fd's
1877                for fd, temp_fd in enumerate(temp_fds):
1878                    os.dup2(temp_fd, fd)
1879
1880                # now use those files in the "wrong" order, so that subprocess
1881                # has to rearrange them in the child
1882                p = subprocess.Popen([sys.executable, "-c",
1883                    'import sys; got = sys.stdin.read();'
1884                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
1885                    stdin=temp_fds[1],
1886                    stdout=temp_fds[2],
1887                    stderr=temp_fds[0])
1888                p.wait()
1889            finally:
1890                self._restore_fds(saved_fds)
1891
1892            for fd in temp_fds:
1893                os.lseek(fd, 0, 0)
1894
1895            out = os.read(temp_fds[2], 1024)
1896            err = support.strip_python_stderr(os.read(temp_fds[0], 1024))
1897            self.assertEqual(out, b"got STDIN")
1898            self.assertEqual(err, b"err")
1899
1900        finally:
1901            for fd in temp_fds:
1902                os.close(fd)
1903
    def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
        # Install three temp files as fds 0, 1 and 2, then start a child
        # whose stdin/stdout/stderr are the descriptor numbers given by
        # stdin_no/stdout_no/stderr_no, and verify the data ends up in the
        # files that were asked for.

        # open up some temporary files
        temps = [tempfile.mkstemp() for i in range(3)]
        temp_fds = [fd for fd, fname in temps]
        try:
            # unlink the files -- we won't need to reopen them
            for fd, fname in temps:
                os.unlink(fname)

            # save a copy of the standard file descriptors
            saved_fds = self._save_fds(range(3))
            try:
                # duplicate the temp files over the standard fd's 0, 1, 2
                for fd, temp_fd in enumerate(temp_fds):
                    os.dup2(temp_fd, fd)

                # write some data to what will become stdin, and rewind
                os.write(stdin_no, b"STDIN")
                os.lseek(stdin_no, 0, 0)

                # now use those files in the given order, so that subprocess
                # has to rearrange them in the child
                p = subprocess.Popen([sys.executable, "-c",
                    'import sys; got = sys.stdin.read();'
                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
                    stdin=stdin_no,
                    stdout=stdout_no,
                    stderr=stderr_no)
                p.wait()

                # Rewind every temp file so the output can be read back.
                for fd in temp_fds:
                    os.lseek(fd, 0, 0)

                # Read the results while fds 0-2 still refer to the temp
                # files, i.e. before the originals are restored.
                out = os.read(stdout_no, 1024)
                err = support.strip_python_stderr(os.read(stderr_no, 1024))
            finally:
                self._restore_fds(saved_fds)

            self.assertEqual(out, b"got STDIN")
            self.assertEqual(err, b"err")

        finally:
            for fd in temp_fds:
                os.close(fd)
1948
1949    # When duping fds, if there arises a situation where one of the fds is
1950    # either 0, 1 or 2, it is possible that it is overwritten (#12607).
1951    # This tests all combinations of this.
1952    def test_swap_fds(self):
1953        self.check_swap_fds(0, 1, 2)
1954        self.check_swap_fds(0, 2, 1)
1955        self.check_swap_fds(1, 0, 2)
1956        self.check_swap_fds(1, 2, 0)
1957        self.check_swap_fds(2, 0, 1)
1958        self.check_swap_fds(2, 1, 0)
1959
1960    def test_surrogates_error_message(self):
1961        def prepare():
1962            raise ValueError("surrogate:\uDCff")
1963
1964        try:
1965            subprocess.call(
1966                [sys.executable, "-c", "pass"],
1967                preexec_fn=prepare)
1968        except ValueError as err:
1969            # Pure Python implementations keeps the message
1970            self.assertIsNone(subprocess._posixsubprocess)
1971            self.assertEqual(str(err), "surrogate:\uDCff")
1972        except subprocess.SubprocessError as err:
1973            # _posixsubprocess uses a default message
1974            self.assertIsNotNone(subprocess._posixsubprocess)
1975            self.assertEqual(str(err), "Exception occurred in preexec_fn.")
1976        else:
1977            self.fail("Expected ValueError or subprocess.SubprocessError")
1978
1979    def test_undecodable_env(self):
1980        for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')):
1981            encoded_value = value.encode("ascii", "surrogateescape")
1982
1983            # test str with surrogates
1984            script = "import os; print(ascii(os.getenv(%s)))" % repr(key)
1985            env = os.environ.copy()
1986            env[key] = value
1987            # Use C locale to get ASCII for the locale encoding to force
1988            # surrogate-escaping of \xFF in the child process; otherwise it can
1989            # be decoded as-is if the default locale is latin-1.
1990            env['LC_ALL'] = 'C'
1991            if sys.platform.startswith("aix"):
1992                # On AIX, the C locale uses the Latin1 encoding
1993                decoded_value = encoded_value.decode("latin1", "surrogateescape")
1994            else:
1995                # On other UNIXes, the C locale uses the ASCII encoding
1996                decoded_value = value
1997            stdout = subprocess.check_output(
1998                [sys.executable, "-c", script],
1999                env=env)
2000            stdout = stdout.rstrip(b'\n\r')
2001            self.assertEqual(stdout.decode('ascii'), ascii(decoded_value))
2002
2003            # test bytes
2004            key = key.encode("ascii", "surrogateescape")
2005            script = "import os; print(ascii(os.getenvb(%s)))" % repr(key)
2006            env = os.environ.copy()
2007            env[key] = encoded_value
2008            stdout = subprocess.check_output(
2009                [sys.executable, "-c", script],
2010                env=env)
2011            stdout = stdout.rstrip(b'\n\r')
2012            self.assertEqual(stdout.decode('ascii'), ascii(encoded_value))
2013
2014    def test_bytes_program(self):
2015        abs_program = os.fsencode(sys.executable)
2016        path, program = os.path.split(sys.executable)
2017        program = os.fsencode(program)
2018
2019        # absolute bytes path
2020        exitcode = subprocess.call([abs_program, "-c", "pass"])
2021        self.assertEqual(exitcode, 0)
2022
2023        # absolute bytes path as a string
2024        cmd = b"'" + abs_program + b"' -c pass"
2025        exitcode = subprocess.call(cmd, shell=True)
2026        self.assertEqual(exitcode, 0)
2027
2028        # bytes program, unicode PATH
2029        env = os.environ.copy()
2030        env["PATH"] = path
2031        exitcode = subprocess.call([program, "-c", "pass"], env=env)
2032        self.assertEqual(exitcode, 0)
2033
2034        # bytes program, bytes PATH
2035        envb = os.environb.copy()
2036        envb[b"PATH"] = os.fsencode(path)
2037        exitcode = subprocess.call([program, "-c", "pass"], env=envb)
2038        self.assertEqual(exitcode, 0)
2039
2040    def test_pipe_cloexec(self):
2041        sleeper = support.findfile("input_reader.py", subdir="subprocessdata")
2042        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2043
2044        p1 = subprocess.Popen([sys.executable, sleeper],
2045                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
2046                              stderr=subprocess.PIPE, close_fds=False)
2047
2048        self.addCleanup(p1.communicate, b'')
2049
2050        p2 = subprocess.Popen([sys.executable, fd_status],
2051                              stdout=subprocess.PIPE, close_fds=False)
2052
2053        output, error = p2.communicate()
2054        result_fds = set(map(int, output.split(b',')))
2055        unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(),
2056                            p1.stderr.fileno()])
2057
2058        self.assertFalse(result_fds & unwanted_fds,
2059                         "Expected no fds from %r to be open in child, "
2060                         "found %r" %
2061                              (unwanted_fds, result_fds & unwanted_fds))
2062
2063    def test_pipe_cloexec_real_tools(self):
2064        qcat = support.findfile("qcat.py", subdir="subprocessdata")
2065        qgrep = support.findfile("qgrep.py", subdir="subprocessdata")
2066
2067        subdata = b'zxcvbn'
2068        data = subdata * 4 + b'\n'
2069
2070        p1 = subprocess.Popen([sys.executable, qcat],
2071                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
2072                              close_fds=False)
2073
2074        p2 = subprocess.Popen([sys.executable, qgrep, subdata],
2075                              stdin=p1.stdout, stdout=subprocess.PIPE,
2076                              close_fds=False)
2077
2078        self.addCleanup(p1.wait)
2079        self.addCleanup(p2.wait)
2080        def kill_p1():
2081            try:
2082                p1.terminate()
2083            except ProcessLookupError:
2084                pass
2085        def kill_p2():
2086            try:
2087                p2.terminate()
2088            except ProcessLookupError:
2089                pass
2090        self.addCleanup(kill_p1)
2091        self.addCleanup(kill_p2)
2092
2093        p1.stdin.write(data)
2094        p1.stdin.close()
2095
2096        readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10)
2097
2098        self.assertTrue(readfiles, "The child hung")
2099        self.assertEqual(p2.stdout.read(), data)
2100
2101        p1.stdout.close()
2102        p2.stdout.close()
2103
2104    def test_close_fds(self):
2105        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2106
2107        fds = os.pipe()
2108        self.addCleanup(os.close, fds[0])
2109        self.addCleanup(os.close, fds[1])
2110
2111        open_fds = set(fds)
2112        # add a bunch more fds
2113        for _ in range(9):
2114            fd = os.open(os.devnull, os.O_RDONLY)
2115            self.addCleanup(os.close, fd)
2116            open_fds.add(fd)
2117
2118        for fd in open_fds:
2119            os.set_inheritable(fd, True)
2120
2121        p = subprocess.Popen([sys.executable, fd_status],
2122                             stdout=subprocess.PIPE, close_fds=False)
2123        output, ignored = p.communicate()
2124        remaining_fds = set(map(int, output.split(b',')))
2125
2126        self.assertEqual(remaining_fds & open_fds, open_fds,
2127                         "Some fds were closed")
2128
2129        p = subprocess.Popen([sys.executable, fd_status],
2130                             stdout=subprocess.PIPE, close_fds=True)
2131        output, ignored = p.communicate()
2132        remaining_fds = set(map(int, output.split(b',')))
2133
2134        self.assertFalse(remaining_fds & open_fds,
2135                         "Some fds were left open")
2136        self.assertIn(1, remaining_fds, "Subprocess failed")
2137
2138        # Keep some of the fd's we opened open in the subprocess.
2139        # This tests _posixsubprocess.c's proper handling of fds_to_keep.
2140        fds_to_keep = set(open_fds.pop() for _ in range(8))
2141        p = subprocess.Popen([sys.executable, fd_status],
2142                             stdout=subprocess.PIPE, close_fds=True,
2143                             pass_fds=())
2144        output, ignored = p.communicate()
2145        remaining_fds = set(map(int, output.split(b',')))
2146
2147        self.assertFalse(remaining_fds & fds_to_keep & open_fds,
2148                         "Some fds not in pass_fds were left open")
2149        self.assertIn(1, remaining_fds, "Subprocess failed")
2150
2151
    @unittest.skipIf(sys.platform.startswith("freebsd") and
                     os.stat("/dev").st_dev == os.stat("/dev/fd").st_dev,
                     "Requires fdescfs mounted on /dev/fd on FreeBSD.")
    def test_close_fds_when_max_fd_is_lowered(self):
        """Confirm that issue21618 is fixed (may fail under valgrind)."""
        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")

        # This launches the meat of the test in a child process to
        # avoid messing with the larger unittest process's maximum
        # number of file descriptors.
        #  This process launches:
        #  +--> Process that lowers its RLIMIT_NOFILE after setting up
        #    a bunch of high open fds above the new lower rlimit.
        #    Those are reported via stdout before launching a new
        #    process with close_fds=False to run the actual test:
        #    +--> The TEST: This one launches a fd_status.py
        #      subprocess with close_fds=True so we can find out if
        #      any of the fds above the lowered rlimit are still open.
        #
        # Two substitutions happen on the script text below: the '%r' is
        # filled in here with the path of fd_status.py, while '{max_fd}' is
        # filled in by the .format() call that runs inside the first child.
        p = subprocess.Popen([sys.executable, '-c', textwrap.dedent(
        '''
        import os, resource, subprocess, sys, textwrap
        open_fds = set()
        # Add a bunch more fds to pass down.
        for _ in range(40):
            fd = os.open(os.devnull, os.O_RDONLY)
            open_fds.add(fd)

        # Leave a two pairs of low ones available for use by the
        # internal child error pipe and the stdout pipe.
        # We also leave 10 more open as some Python buildbots run into
        # "too many open files" errors during the test if we do not.
        for fd in sorted(open_fds)[:14]:
            os.close(fd)
            open_fds.remove(fd)

        for fd in open_fds:
            #self.addCleanup(os.close, fd)
            os.set_inheritable(fd, True)

        max_fd_open = max(open_fds)

        # Communicate the open_fds to the parent unittest.TestCase process.
        print(','.join(map(str, sorted(open_fds))))
        sys.stdout.flush()

        rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE)
        try:
            # 29 is lower than the highest fds we are leaving open.
            resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max))
            # Launch a new Python interpreter with our low fd rlim_cur that
            # inherits open fds above that limit.  It then uses subprocess
            # with close_fds=True to get a report of open fds in the child.
            # An explicit list of fds to check is passed to fd_status.py as
            # letting fd_status rely on its default logic would miss the
            # fds above rlim_cur as it normally only checks up to that limit.
            subprocess.Popen(
                [sys.executable, '-c',
                 textwrap.dedent("""
                     import subprocess, sys
                     subprocess.Popen([sys.executable, %r] +
                                      [str(x) for x in range({max_fd})],
                                      close_fds=True).wait()
                     """.format(max_fd=max_fd_open+1))],
                close_fds=False).wait()
        finally:
            resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max))
        ''' % fd_status)], stdout=subprocess.PIPE)

        output, unused_stderr = p.communicate()
        output_lines = output.splitlines()
        # The first line is the fd list printed by the first child.  The
        # second line is presumably the report written by fd_status.py to the
        # inherited stdout.
        self.assertEqual(len(output_lines), 2,
                         msg="expected exactly two lines of output:\n%r" % output)
        opened_fds = set(map(int, output_lines[0].strip().split(b',')))
        remaining_fds = set(map(int, output_lines[1].strip().split(b',')))

        # None of the fds opened by the first child may appear in the report.
        self.assertFalse(remaining_fds & opened_fds,
                         msg="Some fds were left open.")
2229
2230
2231    # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file
2232    # descriptor of a pipe closed in the parent process is valid in the
2233    # child process according to fstat(), but the mode of the file
2234    # descriptor is invalid, and read or write raise an error.
2235    @support.requires_mac_ver(10, 5)
2236    def test_pass_fds(self):
2237        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2238
2239        open_fds = set()
2240
2241        for x in range(5):
2242            fds = os.pipe()
2243            self.addCleanup(os.close, fds[0])
2244            self.addCleanup(os.close, fds[1])
2245            os.set_inheritable(fds[0], True)
2246            os.set_inheritable(fds[1], True)
2247            open_fds.update(fds)
2248
2249        for fd in open_fds:
2250            p = subprocess.Popen([sys.executable, fd_status],
2251                                 stdout=subprocess.PIPE, close_fds=True,
2252                                 pass_fds=(fd, ))
2253            output, ignored = p.communicate()
2254
2255            remaining_fds = set(map(int, output.split(b',')))
2256            to_be_closed = open_fds - {fd}
2257
2258            self.assertIn(fd, remaining_fds, "fd to be passed not passed")
2259            self.assertFalse(remaining_fds & to_be_closed,
2260                             "fd to be closed passed")
2261
2262            # pass_fds overrides close_fds with a warning.
2263            with self.assertWarns(RuntimeWarning) as context:
2264                self.assertFalse(subprocess.call(
2265                        [sys.executable, "-c", "import sys; sys.exit(0)"],
2266                        close_fds=False, pass_fds=(fd, )))
2267            self.assertIn('overriding close_fds', str(context.warning))
2268
2269    def test_pass_fds_inheritable(self):
2270        script = support.findfile("fd_status.py", subdir="subprocessdata")
2271
2272        inheritable, non_inheritable = os.pipe()
2273        self.addCleanup(os.close, inheritable)
2274        self.addCleanup(os.close, non_inheritable)
2275        os.set_inheritable(inheritable, True)
2276        os.set_inheritable(non_inheritable, False)
2277        pass_fds = (inheritable, non_inheritable)
2278        args = [sys.executable, script]
2279        args += list(map(str, pass_fds))
2280
2281        p = subprocess.Popen(args,
2282                             stdout=subprocess.PIPE, close_fds=True,
2283                             pass_fds=pass_fds)
2284        output, ignored = p.communicate()
2285        fds = set(map(int, output.split(b',')))
2286
2287        # the inheritable file descriptor must be inherited, so its inheritable
2288        # flag must be set in the child process after fork() and before exec()
2289        self.assertEqual(fds, set(pass_fds), "output=%a" % output)
2290
2291        # inheritable flag must not be changed in the parent process
2292        self.assertEqual(os.get_inheritable(inheritable), True)
2293        self.assertEqual(os.get_inheritable(non_inheritable), False)
2294
2295    def test_stdout_stdin_are_single_inout_fd(self):
2296        with io.open(os.devnull, "r+") as inout:
2297            p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
2298                                 stdout=inout, stdin=inout)
2299            p.wait()
2300
2301    def test_stdout_stderr_are_single_inout_fd(self):
2302        with io.open(os.devnull, "r+") as inout:
2303            p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
2304                                 stdout=inout, stderr=inout)
2305            p.wait()
2306
2307    def test_stderr_stdin_are_single_inout_fd(self):
2308        with io.open(os.devnull, "r+") as inout:
2309            p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
2310                                 stderr=inout, stdin=inout)
2311            p.wait()
2312
2313    def test_wait_when_sigchild_ignored(self):
2314        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
2315        sigchild_ignore = support.findfile("sigchild_ignore.py",
2316                                           subdir="subprocessdata")
2317        p = subprocess.Popen([sys.executable, sigchild_ignore],
2318                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
2319        stdout, stderr = p.communicate()
2320        self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
2321                         " non-zero with this error:\n%s" %
2322                         stderr.decode('utf-8'))
2323
2324    def test_select_unbuffered(self):
2325        # Issue #11459: bufsize=0 should really set the pipes as
2326        # unbuffered (and therefore let select() work properly).
2327        select = support.import_module("select")
2328        p = subprocess.Popen([sys.executable, "-c",
2329                              'import sys;'
2330                              'sys.stdout.write("apple")'],
2331                             stdout=subprocess.PIPE,
2332                             bufsize=0)
2333        f = p.stdout
2334        self.addCleanup(f.close)
2335        try:
2336            self.assertEqual(f.read(4), b"appl")
2337            self.assertIn(f, select.select([f], [], [], 0.0)[0])
2338        finally:
2339            p.wait()
2340
    def test_zombie_fast_process_del(self):
        # Issue #12650: on Unix, if Popen.__del__() was called before the
        # process exited, it wouldn't be added to subprocess._active, and would
        # remain a zombie.
        # spawn a Popen, and delete its reference before it exits
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys, time;'
                              'time.sleep(0.2)'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        # Remember the object's identity so it can be looked up in
        # subprocess._active once the local reference is gone.
        ident = id(p)
        # NOTE(review): pid is not used below.
        pid = p.pid
        # Drop the local reference while tolerating ResourceWarning.
        # Presumably this triggers Popen.__del__() immediately on a
        # reference-counting implementation -- confirm for other ones.
        with support.check_warnings(('', ResourceWarning)):
            p = None

        # check that p is in the active processes list
        self.assertIn(ident, [id(o) for o in subprocess._active])
2360
    def test_leak_fast_process_del_killed(self):
        # Issue #12650: on Unix, if Popen.__del__() was called before the
        # process exited, and the process got killed by a signal, it would never
        # be removed from subprocess._active, which triggered a FD and memory
        # leak.
        # spawn a Popen, delete its reference and kill it
        p = subprocess.Popen([sys.executable, "-c",
                              'import time;'
                              'time.sleep(3)'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        # Keep the identity and pid: both are needed after the local
        # reference to the Popen object has been dropped.
        ident = id(p)
        pid = p.pid
        # Drop the local reference while tolerating ResourceWarning.
        with support.check_warnings(('', ResourceWarning)):
            p = None

        os.kill(pid, signal.SIGKILL)
        # check that p is in the active processes list
        self.assertIn(ident, [id(o) for o in subprocess._active])

        # let some time for the process to exit, and create a new Popen: this
        # should trigger the wait() of p
        time.sleep(0.2)
        # The program name is expected not to exist, so creating the Popen
        # itself raises OSError; 'proc' is never used.
        with self.assertRaises(OSError) as c:
            with subprocess.Popen(['nonexisting_i_hope'],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE) as proc:
                pass
        # p should have been wait()ed on, and removed from the _active list
        self.assertRaises(OSError, os.waitpid, pid, 0)
        self.assertNotIn(ident, [id(o) for o in subprocess._active])
2394
2395    def test_close_fds_after_preexec(self):
2396        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
2397
2398        # this FD is used as dup2() target by preexec_fn, and should be closed
2399        # in the child process
2400        fd = os.dup(1)
2401        self.addCleanup(os.close, fd)
2402
2403        p = subprocess.Popen([sys.executable, fd_status],
2404                             stdout=subprocess.PIPE, close_fds=True,
2405                             preexec_fn=lambda: os.dup2(1, fd))
2406        output, ignored = p.communicate()
2407
2408        remaining_fds = set(map(int, output.split(b',')))
2409
2410        self.assertNotIn(fd, remaining_fds)
2411
2412    @support.cpython_only
2413    def test_fork_exec(self):
2414        # Issue #22290: fork_exec() must not crash on memory allocation failure
2415        # or other errors
2416        import _posixsubprocess
2417        gc_enabled = gc.isenabled()
2418        try:
2419            # Use a preexec function and enable the garbage collector
2420            # to force fork_exec() to re-enable the garbage collector
2421            # on error.
2422            func = lambda: None
2423            gc.enable()
2424
2425            for args, exe_list, cwd, env_list in (
2426                (123,      [b"exe"], None, [b"env"]),
2427                ([b"arg"], 123,      None, [b"env"]),
2428                ([b"arg"], [b"exe"], 123,  [b"env"]),
2429                ([b"arg"], [b"exe"], None, 123),
2430            ):
2431                with self.assertRaises(TypeError):
2432                    _posixsubprocess.fork_exec(
2433                        args, exe_list,
2434                        True, [], cwd, env_list,
2435                        -1, -1, -1, -1,
2436                        1, 2, 3, 4,
2437                        True, True, func)
2438        finally:
2439            if not gc_enabled:
2440                gc.disable()
2441
2442    @support.cpython_only
2443    def test_fork_exec_sorted_fd_sanity_check(self):
2444        # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check.
2445        import _posixsubprocess
2446        gc_enabled = gc.isenabled()
2447        try:
2448            gc.enable()
2449
2450            for fds_to_keep in (
2451                (-1, 2, 3, 4, 5),  # Negative number.
2452                ('str', 4),  # Not an int.
2453                (18, 23, 42, 2**63),  # Out of range.
2454                (5, 4),  # Not sorted.
2455                (6, 7, 7, 8),  # Duplicate.
2456            ):
2457                with self.assertRaises(
2458                        ValueError,
2459                        msg='fds_to_keep={}'.format(fds_to_keep)) as c:
2460                    _posixsubprocess.fork_exec(
2461                        [b"false"], [b"false"],
2462                        True, fds_to_keep, None, [b"env"],
2463                        -1, -1, -1, -1,
2464                        1, 2, 3, 4,
2465                        True, True, None)
2466                self.assertIn('fds_to_keep', str(c.exception))
2467        finally:
2468            if not gc_enabled:
2469                gc.disable()
2470
2471    def test_communicate_BrokenPipeError_stdin_close(self):
2472        # By not setting stdout or stderr or a timeout we force the fast path
2473        # that just calls _stdin_write() internally due to our mock.
2474        proc = subprocess.Popen([sys.executable, '-c', 'pass'])
2475        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
2476            mock_proc_stdin.close.side_effect = BrokenPipeError
2477            proc.communicate()  # Should swallow BrokenPipeError from close.
2478            mock_proc_stdin.close.assert_called_with()
2479
2480    def test_communicate_BrokenPipeError_stdin_write(self):
2481        # By not setting stdout or stderr or a timeout we force the fast path
2482        # that just calls _stdin_write() internally due to our mock.
2483        proc = subprocess.Popen([sys.executable, '-c', 'pass'])
2484        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
2485            mock_proc_stdin.write.side_effect = BrokenPipeError
2486            proc.communicate(b'stuff')  # Should swallow the BrokenPipeError.
2487            mock_proc_stdin.write.assert_called_once_with(b'stuff')
2488            mock_proc_stdin.close.assert_called_once_with()
2489
2490    def test_communicate_BrokenPipeError_stdin_flush(self):
2491        # Setting stdin and stdout forces the ._communicate() code path.
2492        # python -h exits faster than python -c pass (but spams stdout).
2493        proc = subprocess.Popen([sys.executable, '-h'],
2494                                stdin=subprocess.PIPE,
2495                                stdout=subprocess.PIPE)
2496        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin, \
2497                open(os.devnull, 'wb') as dev_null:
2498            mock_proc_stdin.flush.side_effect = BrokenPipeError
2499            # because _communicate registers a selector using proc.stdin...
2500            mock_proc_stdin.fileno.return_value = dev_null.fileno()
2501            # _communicate() should swallow BrokenPipeError from flush.
2502            proc.communicate(b'stuff')
2503            mock_proc_stdin.flush.assert_called_once_with()
2504
2505    def test_communicate_BrokenPipeError_stdin_close_with_timeout(self):
2506        # Setting stdin and stdout forces the ._communicate() code path.
2507        # python -h exits faster than python -c pass (but spams stdout).
2508        proc = subprocess.Popen([sys.executable, '-h'],
2509                                stdin=subprocess.PIPE,
2510                                stdout=subprocess.PIPE)
2511        with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
2512            mock_proc_stdin.close.side_effect = BrokenPipeError
2513            # _communicate() should swallow BrokenPipeError from close.
2514            proc.communicate(timeout=999)
2515            mock_proc_stdin.close.assert_called_once_with()
2516
2517    _libc_file_extensions = {
2518      'Linux': 'so.6',
2519      'Darwin': 'dylib',
2520    }
2521    @unittest.skipIf(not ctypes, 'ctypes module required.')
2522    @unittest.skipIf(platform.uname()[0] not in _libc_file_extensions,
2523                     'Test requires a libc this code can load with ctypes.')
2524    @unittest.skipIf(not sys.executable, 'Test requires sys.executable.')
2525    def test_child_terminated_in_stopped_state(self):
2526        """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335."""
2527        PTRACE_TRACEME = 0  # From glibc and MacOS (PT_TRACE_ME).
2528        libc_name = 'libc.' + self._libc_file_extensions[platform.uname()[0]]
2529        libc = ctypes.CDLL(libc_name)
2530        if not hasattr(libc, 'ptrace'):
2531            raise unittest.SkipTest('ptrace() required.')
2532        test_ptrace = subprocess.Popen(
2533            [sys.executable, '-c', """if True:
2534             import ctypes
2535             libc = ctypes.CDLL({libc_name!r})
2536             libc.ptrace({PTRACE_TRACEME}, 0, 0)
2537             """.format(libc_name=libc_name, PTRACE_TRACEME=PTRACE_TRACEME)
2538            ])
2539        if test_ptrace.wait() != 0:
2540            raise unittest.SkipTest('ptrace() failed - unable to test.')
2541        child = subprocess.Popen(
2542            [sys.executable, '-c', """if True:
2543             import ctypes
2544             libc = ctypes.CDLL({libc_name!r})
2545             libc.ptrace({PTRACE_TRACEME}, 0, 0)
2546             libc.printf(ctypes.c_char_p(0xdeadbeef))  # Crash the process.
2547             """.format(libc_name=libc_name, PTRACE_TRACEME=PTRACE_TRACEME)
2548            ])
2549        try:
2550            returncode = child.wait()
2551        except Exception as e:
2552            child.kill()  # Clean up the hung stopped process.
2553            raise e
2554        self.assertNotEqual(0, returncode)
2555        self.assertLess(returncode, 0)  # signal death, likely SIGSEGV.
2556
2557
@unittest.skipUnless(mswindows, "Windows specific tests")
class Win32ProcessTestCase(BaseTestCase):
    # Popen behaviour on Windows: STARTUPINFO, creation flags, argument
    # validation, shell commands and terminating children.

    def test_startupinfo(self):
        # startupinfo argument
        # The constants are hardcoded because we do not want to
        # depend on win32all.
        STARTF_USESHOWWINDOW = 1
        SW_MAXIMIZE = 3
        info = subprocess.STARTUPINFO()
        info.dwFlags = STARTF_USESHOWWINDOW
        info.wShowWindow = SW_MAXIMIZE
        # Since Python is a console process, it won't be affected
        # by wShowWindow, but the argument should be silently
        # ignored
        argv = [sys.executable, "-c", "import sys; sys.exit(0)"]
        subprocess.call(argv, startupinfo=info)

    def test_creationflags(self):
        # creationflags argument
        CREATE_NEW_CONSOLE = 16
        sys.stderr.write("    a DOS box should flash briefly ...\n")
        command_line = sys.executable + ' -c "import time; time.sleep(0.25)"'
        subprocess.call(command_line, creationflags=CREATE_NEW_CONSOLE)

    def test_invalid_args(self):
        # invalid arguments should raise ValueError
        argv = [sys.executable, "-c", "import sys; sys.exit(47)"]
        with self.assertRaises(ValueError):
            subprocess.call(argv, preexec_fn=lambda: 1)
        with self.assertRaises(ValueError):
            subprocess.call(argv, stdout=subprocess.PIPE, close_fds=True)

    def test_close_fds(self):
        # close file descriptors
        argv = [sys.executable, "-c", "import sys; sys.exit(47)"]
        exit_status = subprocess.call(argv, close_fds=True)
        self.assertEqual(exit_status, 47)

    def test_shell_sequence(self):
        # Run command through the shell (sequence)
        child_env = os.environ.copy()
        child_env["FRUIT"] = "physalis"
        with subprocess.Popen(["set"], shell=True,
                              stdout=subprocess.PIPE,
                              env=child_env) as child:
            self.assertIn(b"physalis", child.stdout.read())

    def test_shell_string(self):
        # Run command through the shell (string)
        child_env = os.environ.copy()
        child_env["FRUIT"] = "physalis"
        with subprocess.Popen("set", shell=True,
                              stdout=subprocess.PIPE,
                              env=child_env) as child:
            self.assertIn(b"physalis", child.stdout.read())

    def test_shell_encodings(self):
        # Run command through the shell (string), decoding the output with
        # each of the listed encodings.
        for enc in ('ansi', 'oem'):
            child_env = os.environ.copy()
            child_env["FRUIT"] = "physalis"
            with subprocess.Popen("set", shell=True,
                                  stdout=subprocess.PIPE,
                                  env=child_env,
                                  encoding=enc) as child:
                self.assertIn("physalis", child.stdout.read(), enc)

    def test_call_string(self):
        # call() function with string argument on Windows
        command_line = sys.executable + ' -c "import sys; sys.exit(47)"'
        exit_status = subprocess.call(command_line)
        self.assertEqual(exit_status, 47)

    def _kill_process(self, method, *args):
        # Call the Popen method named `method` with `args` on a child that
        # is still running, and expect a non-zero exit status.
        child_code = """if 1:
                             import sys, time
                             sys.stdout.write('x\\n')
                             sys.stdout.flush()
                             time.sleep(30)
                             """
        # Some win32 buildbot raises EOFError if stdin is inherited
        child = subprocess.Popen([sys.executable, "-c", child_code],
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        with child:
            # Wait for the interpreter to be completely initialized before
            # sending any signal.
            child.stdout.read(1)
            getattr(child, method)(*args)
            _, err = child.communicate()
            self.assertStderrEqual(err, b'')
            exit_status = child.wait()
        self.assertNotEqual(exit_status, 0)

    def _kill_dead_process(self, method, *args):
        # Call the Popen method named `method` with `args` on a child that
        # has already exited with status 42, and expect that status back.
        child_code = """if 1:
                             import sys, time
                             sys.stdout.write('x\\n')
                             sys.stdout.flush()
                             sys.exit(42)
                             """
        child = subprocess.Popen([sys.executable, "-c", child_code],
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        with child:
            # Wait for the interpreter to be completely initialized before
            # sending any signal.
            child.stdout.read(1)
            # The process should end after this
            time.sleep(1)
            # This shouldn't raise even though the child is now dead
            getattr(child, method)(*args)
            _, err = child.communicate()
            self.assertStderrEqual(err, b'')
            exit_status = child.wait()
        self.assertEqual(exit_status, 42)

    def test_send_signal(self):
        self._kill_process('send_signal', signal.SIGTERM)

    def test_kill(self):
        self._kill_process('kill')

    def test_terminate(self):
        self._kill_process('terminate')

    def test_send_signal_dead(self):
        self._kill_dead_process('send_signal', signal.SIGTERM)

    def test_kill_dead(self):
        self._kill_dead_process('kill')

    def test_terminate_dead(self):
        self._kill_dead_process('terminate')
2702
class MiscTests(unittest.TestCase):
    def test_getoutput(self):
        self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
        self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
                         (0, 'xyzzy'))

        # we use mkdtemp in the next line to create an empty directory
        # under our exclusive control; from that, we can invent a pathname
        # that we _know_ won't exist.  This is guaranteed to fail.
        scratch_dir = tempfile.mkdtemp()
        try:
            missing_path = os.path.join(scratch_dir, "foo")
            reader = "type " if mswindows else "cat "
            status, _ = subprocess.getstatusoutput(reader + missing_path)
            self.assertNotEqual(status, 0)
        finally:
            os.rmdir(scratch_dir)

    def test__all__(self):
        """Ensure that __all__ is populated properly."""
        import types
        intentionally_excluded = {"list2cmdline", "Handle"}
        # Every public, non-module name in the subprocess namespace is a
        # candidate for export.
        candidates = {
            name for name, value in vars(subprocess).items()
            if not name.startswith('_')
            and not isinstance(value, types.ModuleType)
        }
        self.assertEqual(set(subprocess.__all__),
                         candidates - intentionally_excluded)
2736
2737
@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
                     "Test needs selectors.PollSelector")
class ProcessTestCaseNoPoll(ProcessTestCase):
    # Runs the inherited tests with subprocess._PopenSelector replaced by
    # selectors.SelectSelector.

    def setUp(self):
        # Remember the selector class in use so tearDown can put it back.
        self.orig_selector = subprocess._PopenSelector
        subprocess._PopenSelector = selectors.SelectSelector
        super().setUp()

    def tearDown(self):
        subprocess._PopenSelector = self.orig_selector
        super().tearDown()
2749
2750
@unittest.skipUnless(mswindows, "Windows-specific tests")
class CommandsWithSpaces(BaseTestCase):
    # Runs a temporary script whose file name contains a space, passing it
    # an argument that also contains a space.

    def setUp(self):
        super().setUp()
        # The "te st" prefix puts a space into the script's file name.
        handle, path = tempfile.mkstemp(".py", "te st")
        self.fname = path.lower()
        # The script prints its argument count and its lower-cased argv.
        script = (b"import sys;"
                  b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))")
        os.write(handle, script)
        os.close(handle)

    def tearDown(self):
        os.remove(self.fname)
        super().tearDown()

    def with_spaces(self, *args, **kwargs):
        # Run Popen with the given arguments and check that the script saw
        # exactly two arguments: its own path and 'ab cd'.
        kwargs['stdout'] = subprocess.PIPE
        expected = "2 [%r, 'ab cd']" % self.fname
        with subprocess.Popen(*args, **kwargs) as child:
            output = child.stdout.read().decode("mbcs")
            self.assertEqual(output, expected)

    def test_shell_string_with_spaces(self):
        # call() function with string argument with spaces on Windows
        command_line = '"%s" "%s" "%s"' % (sys.executable, self.fname,
                                           "ab cd")
        self.with_spaces(command_line, shell=1)

    def test_shell_sequence_with_spaces(self):
        # call() function with sequence argument with spaces on Windows
        argv = [sys.executable, self.fname, "ab cd"]
        self.with_spaces(argv, shell=1)

    def test_noshell_string_with_spaces(self):
        # call() function with string argument with spaces on Windows
        command_line = '"%s" "%s" "%s"' % (sys.executable, self.fname,
                                           "ab cd")
        self.with_spaces(command_line)

    def test_noshell_sequence_with_spaces(self):
        # call() function with sequence argument with spaces on Windows
        argv = [sys.executable, self.fname, "ab cd"]
        self.with_spaces(argv)
2793
2794
class ContextManagerTests(BaseTestCase):
    """Exercise subprocess.Popen objects used as context managers."""

    def test_pipe(self):
        # Pipes are readable inside the block and closed once it is left.
        child_code = ("import sys;"
                      "sys.stdout.write('stdout');"
                      "sys.stderr.write('stderr');")
        child = subprocess.Popen([sys.executable, "-c", child_code],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        with child:
            self.assertEqual(child.stdout.read(), b"stdout")
            self.assertStderrEqual(child.stderr.read(), b"stderr")

        for stream in (child.stdout, child.stderr):
            self.assertTrue(stream.closed)

    def test_returncode(self):
        argv = [sys.executable, "-c", "import sys; sys.exit(100)"]
        with subprocess.Popen(argv) as child:
            pass
        # Leaving the block waits for the child, so the returncode
        # should already be available here.
        self.assertEqual(child.returncode, 100)

    def test_communicate_stdin(self):
        # The child exits with the truth value of the comparison, so a
        # status of 1 means it read exactly the bytes that were sent.
        child_code = ("import sys;"
                      "sys.exit(sys.stdin.read() == 'context')")
        child = subprocess.Popen([sys.executable, "-c", child_code],
                                 stdin=subprocess.PIPE)
        with child:
            child.communicate(b"context")
            self.assertEqual(child.returncode, 1)

    def test_invalid_args(self):
        # Launching a missing executable must raise instead of handing
        # back a Popen object to manage.
        expected_errors = (FileNotFoundError, PermissionError)
        with self.assertRaises(expected_errors):
            with subprocess.Popen(['nonexisting_i_hope'],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE):
                pass

    def test_broken_pipe_cleanup(self):
        """A broken pipe on exit must not stop wait() (Issue 21619)"""
        payload_size = support.PIPE_MAX_SIZE
        child = subprocess.Popen([sys.executable, '-c', 'pass'],
                                 stdin=subprocess.PIPE,
                                 bufsize=payload_size * 2)
        child = child.__enter__()
        # Queue up enough data to overflow any OS pipe buffering so that a
        # broken pipe error is guaranteed.  The data sits in the
        # BufferedWriter buffer until stdin is closed.
        child.stdin.write(b'x' * payload_size)
        self.assertIsNone(child.returncode)
        # EPIPE is expected under POSIX, EINVAL under Windows.
        with self.assertRaises(OSError):
            child.__exit__(None, None, None)
        self.assertEqual(child.returncode, 0)
        self.assertTrue(child.stdin.closed)
2847
2848
# Run the tests in this module through unittest's command-line runner when
# the file is executed directly rather than imported.
if __name__ == "__main__":
    unittest.main()
2851