• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import os
2import signal
3import sys
4import unittest
5import warnings
6from unittest import mock
7
8import asyncio
9from asyncio import base_subprocess
10from asyncio import subprocess
11from test.test_asyncio import utils as test_utils
12from test import support
13from test.support import os_helper
14
15if sys.platform != 'win32':
16    from asyncio import unix_events
17
18# Program blocking
19PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
20
21# Program copying input to output
22PROGRAM_CAT = [
23    sys.executable, '-c',
24    ';'.join(('import sys',
25              'data = sys.stdin.buffer.read()',
26              'sys.stdout.buffer.write(data)'))]
27
28
29def tearDownModule():
30    asyncio.set_event_loop_policy(None)
31
32
33class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
34    def _start(self, *args, **kwargs):
35        self._proc = mock.Mock()
36        self._proc.stdin = None
37        self._proc.stdout = None
38        self._proc.stderr = None
39        self._proc.pid = -1
40
41
42class SubprocessTransportTests(test_utils.TestCase):
43    def setUp(self):
44        super().setUp()
45        self.loop = self.new_test_loop()
46        self.set_event_loop(self.loop)
47
48    def create_transport(self, waiter=None):
49        protocol = mock.Mock()
50        protocol.connection_made._is_coroutine = False
51        protocol.process_exited._is_coroutine = False
52        transport = TestSubprocessTransport(
53                        self.loop, protocol, ['test'], False,
54                        None, None, None, 0, waiter=waiter)
55        return (transport, protocol)
56
57    def test_proc_exited(self):
58        waiter = self.loop.create_future()
59        transport, protocol = self.create_transport(waiter)
60        transport._process_exited(6)
61        self.loop.run_until_complete(waiter)
62
63        self.assertEqual(transport.get_returncode(), 6)
64
65        self.assertTrue(protocol.connection_made.called)
66        self.assertTrue(protocol.process_exited.called)
67        self.assertTrue(protocol.connection_lost.called)
68        self.assertEqual(protocol.connection_lost.call_args[0], (None,))
69
70        self.assertFalse(transport.is_closing())
71        self.assertIsNone(transport._loop)
72        self.assertIsNone(transport._proc)
73        self.assertIsNone(transport._protocol)
74
75        # methods must raise ProcessLookupError if the process exited
76        self.assertRaises(ProcessLookupError,
77                          transport.send_signal, signal.SIGTERM)
78        self.assertRaises(ProcessLookupError, transport.terminate)
79        self.assertRaises(ProcessLookupError, transport.kill)
80
81        transport.close()
82
83    def test_subprocess_repr(self):
84        waiter = self.loop.create_future()
85        transport, protocol = self.create_transport(waiter)
86        transport._process_exited(6)
87        self.loop.run_until_complete(waiter)
88
89        self.assertEqual(
90            repr(transport),
91            "<TestSubprocessTransport pid=-1 returncode=6>"
92        )
93        transport._returncode = None
94        self.assertEqual(
95            repr(transport),
96            "<TestSubprocessTransport pid=-1 running>"
97        )
98        transport._pid = None
99        transport._returncode = None
100        self.assertEqual(
101            repr(transport),
102            "<TestSubprocessTransport not started>"
103        )
104        transport.close()
105
106
107class SubprocessMixin:
108
109    def test_stdin_stdout(self):
110        args = PROGRAM_CAT
111
112        async def run(data):
113            proc = await asyncio.create_subprocess_exec(
114                *args,
115                stdin=subprocess.PIPE,
116                stdout=subprocess.PIPE,
117            )
118
119            # feed data
120            proc.stdin.write(data)
121            await proc.stdin.drain()
122            proc.stdin.close()
123
124            # get output and exitcode
125            data = await proc.stdout.read()
126            exitcode = await proc.wait()
127            return (exitcode, data)
128
129        task = run(b'some data')
130        task = asyncio.wait_for(task, 60.0)
131        exitcode, stdout = self.loop.run_until_complete(task)
132        self.assertEqual(exitcode, 0)
133        self.assertEqual(stdout, b'some data')
134
135    def test_communicate(self):
136        args = PROGRAM_CAT
137
138        async def run(data):
139            proc = await asyncio.create_subprocess_exec(
140                *args,
141                stdin=subprocess.PIPE,
142                stdout=subprocess.PIPE,
143            )
144            stdout, stderr = await proc.communicate(data)
145            return proc.returncode, stdout
146
147        task = run(b'some data')
148        task = asyncio.wait_for(task, support.LONG_TIMEOUT)
149        exitcode, stdout = self.loop.run_until_complete(task)
150        self.assertEqual(exitcode, 0)
151        self.assertEqual(stdout, b'some data')
152
153    def test_shell(self):
154        proc = self.loop.run_until_complete(
155            asyncio.create_subprocess_shell('exit 7')
156        )
157        exitcode = self.loop.run_until_complete(proc.wait())
158        self.assertEqual(exitcode, 7)
159
160    def test_start_new_session(self):
161        # start the new process in a new session
162        proc = self.loop.run_until_complete(
163            asyncio.create_subprocess_shell(
164                'exit 8',
165                start_new_session=True,
166            )
167        )
168        exitcode = self.loop.run_until_complete(proc.wait())
169        self.assertEqual(exitcode, 8)
170
171    def test_kill(self):
172        args = PROGRAM_BLOCKED
173        proc = self.loop.run_until_complete(
174            asyncio.create_subprocess_exec(*args)
175        )
176        proc.kill()
177        returncode = self.loop.run_until_complete(proc.wait())
178        if sys.platform == 'win32':
179            self.assertIsInstance(returncode, int)
180            # expect 1 but sometimes get 0
181        else:
182            self.assertEqual(-signal.SIGKILL, returncode)
183
184    def test_terminate(self):
185        args = PROGRAM_BLOCKED
186        proc = self.loop.run_until_complete(
187            asyncio.create_subprocess_exec(*args)
188        )
189        proc.terminate()
190        returncode = self.loop.run_until_complete(proc.wait())
191        if sys.platform == 'win32':
192            self.assertIsInstance(returncode, int)
193            # expect 1 but sometimes get 0
194        else:
195            self.assertEqual(-signal.SIGTERM, returncode)
196
197    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
198    def test_send_signal(self):
199        # bpo-31034: Make sure that we get the default signal handler (killing
200        # the process). The parent process may have decided to ignore SIGHUP,
201        # and signal handlers are inherited.
202        old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
203        try:
204            code = 'import time; print("sleeping", flush=True); time.sleep(3600)'
205            args = [sys.executable, '-c', code]
206            proc = self.loop.run_until_complete(
207                asyncio.create_subprocess_exec(
208                    *args,
209                    stdout=subprocess.PIPE,
210                )
211            )
212
213            async def send_signal(proc):
214                # basic synchronization to wait until the program is sleeping
215                line = await proc.stdout.readline()
216                self.assertEqual(line, b'sleeping\n')
217
218                proc.send_signal(signal.SIGHUP)
219                returncode = await proc.wait()
220                return returncode
221
222            returncode = self.loop.run_until_complete(send_signal(proc))
223            self.assertEqual(-signal.SIGHUP, returncode)
224        finally:
225            signal.signal(signal.SIGHUP, old_handler)
226
227    def prepare_broken_pipe_test(self):
228        # buffer large enough to feed the whole pipe buffer
229        large_data = b'x' * support.PIPE_MAX_SIZE
230
231        # the program ends before the stdin can be fed
232        proc = self.loop.run_until_complete(
233            asyncio.create_subprocess_exec(
234                sys.executable, '-c', 'pass',
235                stdin=subprocess.PIPE,
236            )
237        )
238
239        return (proc, large_data)
240
241    def test_stdin_broken_pipe(self):
242        proc, large_data = self.prepare_broken_pipe_test()
243
244        async def write_stdin(proc, data):
245            await asyncio.sleep(0.5)
246            proc.stdin.write(data)
247            await proc.stdin.drain()
248
249        coro = write_stdin(proc, large_data)
250        # drain() must raise BrokenPipeError or ConnectionResetError
251        with test_utils.disable_logger():
252            self.assertRaises((BrokenPipeError, ConnectionResetError),
253                              self.loop.run_until_complete, coro)
254        self.loop.run_until_complete(proc.wait())
255
256    def test_communicate_ignore_broken_pipe(self):
257        proc, large_data = self.prepare_broken_pipe_test()
258
259        # communicate() must ignore BrokenPipeError when feeding stdin
260        self.loop.set_exception_handler(lambda loop, msg: None)
261        self.loop.run_until_complete(proc.communicate(large_data))
262        self.loop.run_until_complete(proc.wait())
263
264    def test_pause_reading(self):
265        limit = 10
266        size = (limit * 2 + 1)
267
268        async def test_pause_reading():
269            code = '\n'.join((
270                'import sys',
271                'sys.stdout.write("x" * %s)' % size,
272                'sys.stdout.flush()',
273            ))
274
275            connect_read_pipe = self.loop.connect_read_pipe
276
277            async def connect_read_pipe_mock(*args, **kw):
278                transport, protocol = await connect_read_pipe(*args, **kw)
279                transport.pause_reading = mock.Mock()
280                transport.resume_reading = mock.Mock()
281                return (transport, protocol)
282
283            self.loop.connect_read_pipe = connect_read_pipe_mock
284
285            proc = await asyncio.create_subprocess_exec(
286                sys.executable, '-c', code,
287                stdin=asyncio.subprocess.PIPE,
288                stdout=asyncio.subprocess.PIPE,
289                limit=limit,
290            )
291            stdout_transport = proc._transport.get_pipe_transport(1)
292
293            stdout, stderr = await proc.communicate()
294
295            # The child process produced more than limit bytes of output,
296            # the stream reader transport should pause the protocol to not
297            # allocate too much memory.
298            return (stdout, stdout_transport)
299
300        # Issue #22685: Ensure that the stream reader pauses the protocol
301        # when the child process produces too much data
302        stdout, transport = self.loop.run_until_complete(test_pause_reading())
303
304        self.assertEqual(stdout, b'x' * size)
305        self.assertTrue(transport.pause_reading.called)
306        self.assertTrue(transport.resume_reading.called)
307
308    def test_stdin_not_inheritable(self):
309        # asyncio issue #209: stdin must not be inheritable, otherwise
310        # the Process.communicate() hangs
311        async def len_message(message):
312            code = 'import sys; data = sys.stdin.read(); print(len(data))'
313            proc = await asyncio.create_subprocess_exec(
314                sys.executable, '-c', code,
315                stdin=asyncio.subprocess.PIPE,
316                stdout=asyncio.subprocess.PIPE,
317                stderr=asyncio.subprocess.PIPE,
318                close_fds=False,
319            )
320            stdout, stderr = await proc.communicate(message)
321            exitcode = await proc.wait()
322            return (stdout, exitcode)
323
324        output, exitcode = self.loop.run_until_complete(len_message(b'abc'))
325        self.assertEqual(output.rstrip(), b'3')
326        self.assertEqual(exitcode, 0)
327
328    def test_empty_input(self):
329
330        async def empty_input():
331            code = 'import sys; data = sys.stdin.read(); print(len(data))'
332            proc = await asyncio.create_subprocess_exec(
333                sys.executable, '-c', code,
334                stdin=asyncio.subprocess.PIPE,
335                stdout=asyncio.subprocess.PIPE,
336                stderr=asyncio.subprocess.PIPE,
337                close_fds=False,
338            )
339            stdout, stderr = await proc.communicate(b'')
340            exitcode = await proc.wait()
341            return (stdout, exitcode)
342
343        output, exitcode = self.loop.run_until_complete(empty_input())
344        self.assertEqual(output.rstrip(), b'0')
345        self.assertEqual(exitcode, 0)
346
347    def test_devnull_input(self):
348
349        async def empty_input():
350            code = 'import sys; data = sys.stdin.read(); print(len(data))'
351            proc = await asyncio.create_subprocess_exec(
352                sys.executable, '-c', code,
353                stdin=asyncio.subprocess.DEVNULL,
354                stdout=asyncio.subprocess.PIPE,
355                stderr=asyncio.subprocess.PIPE,
356                close_fds=False,
357            )
358            stdout, stderr = await proc.communicate()
359            exitcode = await proc.wait()
360            return (stdout, exitcode)
361
362        output, exitcode = self.loop.run_until_complete(empty_input())
363        self.assertEqual(output.rstrip(), b'0')
364        self.assertEqual(exitcode, 0)
365
366    def test_devnull_output(self):
367
368        async def empty_output():
369            code = 'import sys; data = sys.stdin.read(); print(len(data))'
370            proc = await asyncio.create_subprocess_exec(
371                sys.executable, '-c', code,
372                stdin=asyncio.subprocess.PIPE,
373                stdout=asyncio.subprocess.DEVNULL,
374                stderr=asyncio.subprocess.PIPE,
375                close_fds=False,
376            )
377            stdout, stderr = await proc.communicate(b"abc")
378            exitcode = await proc.wait()
379            return (stdout, exitcode)
380
381        output, exitcode = self.loop.run_until_complete(empty_output())
382        self.assertEqual(output, None)
383        self.assertEqual(exitcode, 0)
384
385    def test_devnull_error(self):
386
387        async def empty_error():
388            code = 'import sys; data = sys.stdin.read(); print(len(data))'
389            proc = await asyncio.create_subprocess_exec(
390                sys.executable, '-c', code,
391                stdin=asyncio.subprocess.PIPE,
392                stdout=asyncio.subprocess.PIPE,
393                stderr=asyncio.subprocess.DEVNULL,
394                close_fds=False,
395            )
396            stdout, stderr = await proc.communicate(b"abc")
397            exitcode = await proc.wait()
398            return (stderr, exitcode)
399
400        output, exitcode = self.loop.run_until_complete(empty_error())
401        self.assertEqual(output, None)
402        self.assertEqual(exitcode, 0)
403
404    def test_cancel_process_wait(self):
405        # Issue #23140: cancel Process.wait()
406
407        async def cancel_wait():
408            proc = await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
409
410            # Create an internal future waiting on the process exit
411            task = self.loop.create_task(proc.wait())
412            self.loop.call_soon(task.cancel)
413            try:
414                await task
415            except asyncio.CancelledError:
416                pass
417
418            # Cancel the future
419            task.cancel()
420
421            # Kill the process and wait until it is done
422            proc.kill()
423            await proc.wait()
424
425        self.loop.run_until_complete(cancel_wait())
426
427    def test_cancel_make_subprocess_transport_exec(self):
428
429        async def cancel_make_transport():
430            coro = asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
431            task = self.loop.create_task(coro)
432
433            self.loop.call_soon(task.cancel)
434            try:
435                await task
436            except asyncio.CancelledError:
437                pass
438
439        # ignore the log:
440        # "Exception during subprocess creation, kill the subprocess"
441        with test_utils.disable_logger():
442            self.loop.run_until_complete(cancel_make_transport())
443
444    def test_cancel_post_init(self):
445
446        async def cancel_make_transport():
447            coro = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
448                                             *PROGRAM_BLOCKED)
449            task = self.loop.create_task(coro)
450
451            self.loop.call_soon(task.cancel)
452            try:
453                await task
454            except asyncio.CancelledError:
455                pass
456
457        # ignore the log:
458        # "Exception during subprocess creation, kill the subprocess"
459        with test_utils.disable_logger():
460            self.loop.run_until_complete(cancel_make_transport())
461            test_utils.run_briefly(self.loop)
462
463    def test_close_kill_running(self):
464
465        async def kill_running():
466            create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
467                                               *PROGRAM_BLOCKED)
468            transport, protocol = await create
469
470            kill_called = False
471            def kill():
472                nonlocal kill_called
473                kill_called = True
474                orig_kill()
475
476            proc = transport.get_extra_info('subprocess')
477            orig_kill = proc.kill
478            proc.kill = kill
479            returncode = transport.get_returncode()
480            transport.close()
481            await asyncio.wait_for(transport._wait(), 5)
482            return (returncode, kill_called)
483
484        # Ignore "Close running child process: kill ..." log
485        with test_utils.disable_logger():
486            try:
487                returncode, killed = self.loop.run_until_complete(
488                    kill_running()
489                )
490            except asyncio.TimeoutError:
491                self.skipTest(
492                    "Timeout failure on waiting for subprocess stopping"
493                )
494        self.assertIsNone(returncode)
495
496        # transport.close() must kill the process if it is still running
497        self.assertTrue(killed)
498        test_utils.run_briefly(self.loop)
499
500    def test_close_dont_kill_finished(self):
501
502        async def kill_running():
503            create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
504                                               *PROGRAM_BLOCKED)
505            transport, protocol = await create
506            proc = transport.get_extra_info('subprocess')
507
508            # kill the process (but asyncio is not notified immediately)
509            proc.kill()
510            proc.wait()
511
512            proc.kill = mock.Mock()
513            proc_returncode = proc.poll()
514            transport_returncode = transport.get_returncode()
515            transport.close()
516            return (proc_returncode, transport_returncode, proc.kill.called)
517
518        # Ignore "Unknown child process pid ..." log of SafeChildWatcher,
519        # emitted because the test already consumes the exit status:
520        # proc.wait()
521        with test_utils.disable_logger():
522            result = self.loop.run_until_complete(kill_running())
523            test_utils.run_briefly(self.loop)
524
525        proc_returncode, transport_return_code, killed = result
526
527        self.assertIsNotNone(proc_returncode)
528        self.assertIsNone(transport_return_code)
529
530        # transport.close() must not kill the process if it finished, even if
531        # the transport was not notified yet
532        self.assertFalse(killed)
533
534        # Unlike SafeChildWatcher, FastChildWatcher does not pop the
535        # callbacks if waitpid() is called elsewhere. Let's clear them
536        # manually to avoid a warning when the watcher is detached.
537        if (sys.platform != 'win32' and
538                isinstance(self, SubprocessFastWatcherTests)):
539            asyncio.get_child_watcher()._callbacks.clear()
540
541    async def _test_popen_error(self, stdin):
542        if sys.platform == 'win32':
543            target = 'asyncio.windows_utils.Popen'
544        else:
545            target = 'subprocess.Popen'
546        with mock.patch(target) as popen:
547            exc = ZeroDivisionError
548            popen.side_effect = exc
549
550            with warnings.catch_warnings(record=True) as warns:
551                with self.assertRaises(exc):
552                    await asyncio.create_subprocess_exec(
553                        sys.executable,
554                        '-c',
555                        'pass',
556                        stdin=stdin
557                    )
558                self.assertEqual(warns, [])
559
560    def test_popen_error(self):
561        # Issue #24763: check that the subprocess transport is closed
562        # when BaseSubprocessTransport fails
563        self.loop.run_until_complete(self._test_popen_error(stdin=None))
564
565    def test_popen_error_with_stdin_pipe(self):
566        # Issue #35721: check that newly created socket pair is closed when
567        # Popen fails
568        self.loop.run_until_complete(
569            self._test_popen_error(stdin=subprocess.PIPE))
570
571    def test_read_stdout_after_process_exit(self):
572
573        async def execute():
574            code = '\n'.join(['import sys',
575                              'for _ in range(64):',
576                              '    sys.stdout.write("x" * 4096)',
577                              'sys.stdout.flush()',
578                              'sys.exit(1)'])
579
580            process = await asyncio.create_subprocess_exec(
581                sys.executable, '-c', code,
582                stdout=asyncio.subprocess.PIPE,
583            )
584
585            while True:
586                data = await process.stdout.read(65536)
587                if data:
588                    await asyncio.sleep(0.3)
589                else:
590                    break
591
592        self.loop.run_until_complete(execute())
593
594    def test_create_subprocess_exec_text_mode_fails(self):
595        async def execute():
596            with self.assertRaises(ValueError):
597                await subprocess.create_subprocess_exec(sys.executable,
598                                                        text=True)
599
600            with self.assertRaises(ValueError):
601                await subprocess.create_subprocess_exec(sys.executable,
602                                                        encoding="utf-8")
603
604            with self.assertRaises(ValueError):
605                await subprocess.create_subprocess_exec(sys.executable,
606                                                        errors="strict")
607
608        self.loop.run_until_complete(execute())
609
610    def test_create_subprocess_shell_text_mode_fails(self):
611
612        async def execute():
613            with self.assertRaises(ValueError):
614                await subprocess.create_subprocess_shell(sys.executable,
615                                                         text=True)
616
617            with self.assertRaises(ValueError):
618                await subprocess.create_subprocess_shell(sys.executable,
619                                                         encoding="utf-8")
620
621            with self.assertRaises(ValueError):
622                await subprocess.create_subprocess_shell(sys.executable,
623                                                         errors="strict")
624
625        self.loop.run_until_complete(execute())
626
627    def test_create_subprocess_exec_with_path(self):
628        async def execute():
629            p = await subprocess.create_subprocess_exec(
630                os_helper.FakePath(sys.executable), '-c', 'pass')
631            await p.wait()
632            p = await subprocess.create_subprocess_exec(
633                sys.executable, '-c', 'pass', os_helper.FakePath('.'))
634            await p.wait()
635
636        self.assertIsNone(self.loop.run_until_complete(execute()))
637
638
639if sys.platform != 'win32':
640    # Unix
641    class SubprocessWatcherMixin(SubprocessMixin):
642
643        Watcher = None
644
645        def setUp(self):
646            super().setUp()
647            policy = asyncio.get_event_loop_policy()
648            self.loop = policy.new_event_loop()
649            self.set_event_loop(self.loop)
650
651            watcher = self.Watcher()
652            watcher.attach_loop(self.loop)
653            policy.set_child_watcher(watcher)
654
655        def tearDown(self):
656            super().tearDown()
657            policy = asyncio.get_event_loop_policy()
658            watcher = policy.get_child_watcher()
659            policy.set_child_watcher(None)
660            watcher.attach_loop(None)
661            watcher.close()
662
663    class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
664                                         test_utils.TestCase):
665
666        Watcher = unix_events.ThreadedChildWatcher
667
668    @unittest.skip("bpo-38323: MultiLoopChildWatcher has a race condition \
669                    and these tests can hang the test suite")
670    class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin,
671                                          test_utils.TestCase):
672
673        Watcher = unix_events.MultiLoopChildWatcher
674
675    class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
676                                     test_utils.TestCase):
677
678        Watcher = unix_events.SafeChildWatcher
679
680    class SubprocessFastWatcherTests(SubprocessWatcherMixin,
681                                     test_utils.TestCase):
682
683        Watcher = unix_events.FastChildWatcher
684
685    def has_pidfd_support():
686        if not hasattr(os, 'pidfd_open'):
687            return False
688        try:
689            os.close(os.pidfd_open(os.getpid()))
690        except OSError:
691            return False
692        return True
693
694    @unittest.skipUnless(
695        has_pidfd_support(),
696        "operating system does not support pidfds",
697    )
698    class SubprocessPidfdWatcherTests(SubprocessWatcherMixin,
699                                      test_utils.TestCase):
700        Watcher = unix_events.PidfdChildWatcher
701
702else:
703    # Windows
704    class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):
705
706        def setUp(self):
707            super().setUp()
708            self.loop = asyncio.ProactorEventLoop()
709            self.set_event_loop(self.loop)
710
711
712class GenericWatcherTests:
713
714    def test_create_subprocess_fails_with_inactive_watcher(self):
715
716        async def execute():
717            watcher = mock.create_authspec(asyncio.AbstractChildWatcher)
718            watcher.is_active.return_value = False
719            asyncio.set_child_watcher(watcher)
720
721            with self.assertRaises(RuntimeError):
722                await subprocess.create_subprocess_exec(
723                    os_helper.FakePath(sys.executable), '-c', 'pass')
724
725            watcher.add_child_handler.assert_not_called()
726
727        self.assertIsNone(self.loop.run_until_complete(execute()))
728
729
730
731
732if __name__ == '__main__':
733    unittest.main()
734