• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import signal
2import sys
3import unittest
4import warnings
5from unittest import mock
6
7import asyncio
8from asyncio import base_subprocess
9from asyncio import subprocess
10from test.test_asyncio import utils as test_utils
11from test import support
12
13if sys.platform != 'win32':
14    from asyncio import unix_events
15
# Child command that sleeps for an hour, i.e. blocks until it is killed
PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']

# Child command that writes everything read from stdin back to stdout
PROGRAM_CAT = [
    sys.executable,
    '-c',
    ';'.join([
        'import sys',
        'data = sys.stdin.buffer.read()',
        'sys.stdout.buffer.write(data)',
    ]),
]
25
26
def tearDownModule():
    """Reset the asyncio event loop policy by passing None to the setter."""
    asyncio.set_event_loop_policy(None)
29
30
class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport whose process object is a mock."""

    def _start(self, *args, **kwargs):
        # No real process is spawned: the arguments are ignored and a mock
        # without any pipe and with a pid of -1 is installed instead.
        self._proc = mock.Mock(stdin=None, stdout=None, stderr=None, pid=-1)
38
39
class SubprocessTransportTests(test_utils.TestCase):
    """Tests of TestSubprocessTransport, i.e. with a mocked process."""

    def setUp(self):
        super().setUp()
        test_loop = self.new_test_loop()
        self.loop = test_loop
        self.set_event_loop(test_loop)

    def create_transport(self, waiter=None):
        # Build a (transport, protocol) pair around a mock protocol whose
        # connection_made/process_exited are flagged as non-coroutines.
        protocol = mock.Mock()
        for callback in (protocol.connection_made, protocol.process_exited):
            callback._is_coroutine = False
        transport = TestSubprocessTransport(
            self.loop, protocol, ['test'], False,
            None, None, None, 0, waiter=waiter)
        return transport, protocol

    def test_proc_exited(self):
        exit_waiter = self.loop.create_future()
        transport, protocol = self.create_transport(exit_waiter)
        transport._process_exited(6)
        self.loop.run_until_complete(exit_waiter)

        self.assertEqual(transport.get_returncode(), 6)

        # all three protocol callbacks must have been invoked
        for callback in (protocol.connection_made,
                         protocol.process_exited,
                         protocol.connection_lost):
            self.assertTrue(callback.called)
        self.assertEqual(protocol.connection_lost.call_args[0], (None,))

        self.assertFalse(transport.is_closing())
        for attr_name in ('_loop', '_proc', '_protocol'):
            self.assertIsNone(getattr(transport, attr_name))

        # methods must raise ProcessLookupError if the process exited
        with self.assertRaises(ProcessLookupError):
            transport.send_signal(signal.SIGTERM)
        with self.assertRaises(ProcessLookupError):
            transport.terminate()
        with self.assertRaises(ProcessLookupError):
            transport.kill()

        transport.close()

    def test_subprocess_repr(self):
        exit_waiter = self.loop.create_future()
        transport, _ = self.create_transport(exit_waiter)
        transport._process_exited(6)
        self.loop.run_until_complete(exit_waiter)

        # exited process: pid and return code are shown
        exited_repr = "<TestSubprocessTransport pid=-1 returncode=6>"
        self.assertEqual(repr(transport), exited_repr)

        # no return code: reported as running
        transport._returncode = None
        running_repr = "<TestSubprocessTransport pid=-1 running>"
        self.assertEqual(repr(transport), running_repr)

        # neither pid nor return code: reported as not started
        transport._pid = None
        transport._returncode = None
        not_started_repr = "<TestSubprocessTransport not started>"
        self.assertEqual(repr(transport), not_started_repr)

        transport.close()
103
104
105class SubprocessMixin:
106
107    def test_stdin_stdout(self):
108        args = PROGRAM_CAT
109
110        async def run(data):
111            proc = await asyncio.create_subprocess_exec(
112                *args,
113                stdin=subprocess.PIPE,
114                stdout=subprocess.PIPE,
115            )
116
117            # feed data
118            proc.stdin.write(data)
119            await proc.stdin.drain()
120            proc.stdin.close()
121
122            # get output and exitcode
123            data = await proc.stdout.read()
124            exitcode = await proc.wait()
125            return (exitcode, data)
126
127        task = run(b'some data')
128        task = asyncio.wait_for(task, 60.0)
129        exitcode, stdout = self.loop.run_until_complete(task)
130        self.assertEqual(exitcode, 0)
131        self.assertEqual(stdout, b'some data')
132
133    def test_communicate(self):
134        args = PROGRAM_CAT
135
136        async def run(data):
137            proc = await asyncio.create_subprocess_exec(
138                *args,
139                stdin=subprocess.PIPE,
140                stdout=subprocess.PIPE,
141            )
142            stdout, stderr = await proc.communicate(data)
143            return proc.returncode, stdout
144
145        task = run(b'some data')
146        task = asyncio.wait_for(task, 60.0)
147        exitcode, stdout = self.loop.run_until_complete(task)
148        self.assertEqual(exitcode, 0)
149        self.assertEqual(stdout, b'some data')
150
151    def test_shell(self):
152        proc = self.loop.run_until_complete(
153            asyncio.create_subprocess_shell('exit 7')
154        )
155        exitcode = self.loop.run_until_complete(proc.wait())
156        self.assertEqual(exitcode, 7)
157
158    def test_start_new_session(self):
159        # start the new process in a new session
160        proc = self.loop.run_until_complete(
161            asyncio.create_subprocess_shell(
162                'exit 8',
163                start_new_session=True,
164            )
165        )
166        exitcode = self.loop.run_until_complete(proc.wait())
167        self.assertEqual(exitcode, 8)
168
169    def test_kill(self):
170        args = PROGRAM_BLOCKED
171        proc = self.loop.run_until_complete(
172            asyncio.create_subprocess_exec(*args)
173        )
174        proc.kill()
175        returncode = self.loop.run_until_complete(proc.wait())
176        if sys.platform == 'win32':
177            self.assertIsInstance(returncode, int)
178            # expect 1 but sometimes get 0
179        else:
180            self.assertEqual(-signal.SIGKILL, returncode)
181
182    def test_terminate(self):
183        args = PROGRAM_BLOCKED
184        proc = self.loop.run_until_complete(
185            asyncio.create_subprocess_exec(*args)
186        )
187        proc.terminate()
188        returncode = self.loop.run_until_complete(proc.wait())
189        if sys.platform == 'win32':
190            self.assertIsInstance(returncode, int)
191            # expect 1 but sometimes get 0
192        else:
193            self.assertEqual(-signal.SIGTERM, returncode)
194
195    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
196    def test_send_signal(self):
197        # bpo-31034: Make sure that we get the default signal handler (killing
198        # the process). The parent process may have decided to ignore SIGHUP,
199        # and signal handlers are inherited.
200        old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
201        try:
202            code = 'import time; print("sleeping", flush=True); time.sleep(3600)'
203            args = [sys.executable, '-c', code]
204            proc = self.loop.run_until_complete(
205                asyncio.create_subprocess_exec(
206                    *args,
207                    stdout=subprocess.PIPE,
208                )
209            )
210
211            async def send_signal(proc):
212                # basic synchronization to wait until the program is sleeping
213                line = await proc.stdout.readline()
214                self.assertEqual(line, b'sleeping\n')
215
216                proc.send_signal(signal.SIGHUP)
217                returncode = await proc.wait()
218                return returncode
219
220            returncode = self.loop.run_until_complete(send_signal(proc))
221            self.assertEqual(-signal.SIGHUP, returncode)
222        finally:
223            signal.signal(signal.SIGHUP, old_handler)
224
225    def prepare_broken_pipe_test(self):
226        # buffer large enough to feed the whole pipe buffer
227        large_data = b'x' * support.PIPE_MAX_SIZE
228
229        # the program ends before the stdin can be feeded
230        proc = self.loop.run_until_complete(
231            asyncio.create_subprocess_exec(
232                sys.executable, '-c', 'pass',
233                stdin=subprocess.PIPE,
234            )
235        )
236
237        return (proc, large_data)
238
239    def test_stdin_broken_pipe(self):
240        proc, large_data = self.prepare_broken_pipe_test()
241
242        async def write_stdin(proc, data):
243            await asyncio.sleep(0.5)
244            proc.stdin.write(data)
245            await proc.stdin.drain()
246
247        coro = write_stdin(proc, large_data)
248        # drain() must raise BrokenPipeError or ConnectionResetError
249        with test_utils.disable_logger():
250            self.assertRaises((BrokenPipeError, ConnectionResetError),
251                              self.loop.run_until_complete, coro)
252        self.loop.run_until_complete(proc.wait())
253
254    def test_communicate_ignore_broken_pipe(self):
255        proc, large_data = self.prepare_broken_pipe_test()
256
257        # communicate() must ignore BrokenPipeError when feeding stdin
258        self.loop.set_exception_handler(lambda loop, msg: None)
259        self.loop.run_until_complete(proc.communicate(large_data))
260        self.loop.run_until_complete(proc.wait())
261
262    def test_pause_reading(self):
263        limit = 10
264        size = (limit * 2 + 1)
265
266        async def test_pause_reading():
267            code = '\n'.join((
268                'import sys',
269                'sys.stdout.write("x" * %s)' % size,
270                'sys.stdout.flush()',
271            ))
272
273            connect_read_pipe = self.loop.connect_read_pipe
274
275            async def connect_read_pipe_mock(*args, **kw):
276                transport, protocol = await connect_read_pipe(*args, **kw)
277                transport.pause_reading = mock.Mock()
278                transport.resume_reading = mock.Mock()
279                return (transport, protocol)
280
281            self.loop.connect_read_pipe = connect_read_pipe_mock
282
283            proc = await asyncio.create_subprocess_exec(
284                sys.executable, '-c', code,
285                stdin=asyncio.subprocess.PIPE,
286                stdout=asyncio.subprocess.PIPE,
287                limit=limit,
288            )
289            stdout_transport = proc._transport.get_pipe_transport(1)
290
291            stdout, stderr = await proc.communicate()
292
293            # The child process produced more than limit bytes of output,
294            # the stream reader transport should pause the protocol to not
295            # allocate too much memory.
296            return (stdout, stdout_transport)
297
298        # Issue #22685: Ensure that the stream reader pauses the protocol
299        # when the child process produces too much data
300        stdout, transport = self.loop.run_until_complete(test_pause_reading())
301
302        self.assertEqual(stdout, b'x' * size)
303        self.assertTrue(transport.pause_reading.called)
304        self.assertTrue(transport.resume_reading.called)
305
306    def test_stdin_not_inheritable(self):
307        # asyncio issue #209: stdin must not be inheritable, otherwise
308        # the Process.communicate() hangs
309        async def len_message(message):
310            code = 'import sys; data = sys.stdin.read(); print(len(data))'
311            proc = await asyncio.create_subprocess_exec(
312                sys.executable, '-c', code,
313                stdin=asyncio.subprocess.PIPE,
314                stdout=asyncio.subprocess.PIPE,
315                stderr=asyncio.subprocess.PIPE,
316                close_fds=False,
317            )
318            stdout, stderr = await proc.communicate(message)
319            exitcode = await proc.wait()
320            return (stdout, exitcode)
321
322        output, exitcode = self.loop.run_until_complete(len_message(b'abc'))
323        self.assertEqual(output.rstrip(), b'3')
324        self.assertEqual(exitcode, 0)
325
326    def test_empty_input(self):
327
328        async def empty_input():
329            code = 'import sys; data = sys.stdin.read(); print(len(data))'
330            proc = await asyncio.create_subprocess_exec(
331                sys.executable, '-c', code,
332                stdin=asyncio.subprocess.PIPE,
333                stdout=asyncio.subprocess.PIPE,
334                stderr=asyncio.subprocess.PIPE,
335                close_fds=False,
336            )
337            stdout, stderr = await proc.communicate(b'')
338            exitcode = await proc.wait()
339            return (stdout, exitcode)
340
341        output, exitcode = self.loop.run_until_complete(empty_input())
342        self.assertEqual(output.rstrip(), b'0')
343        self.assertEqual(exitcode, 0)
344
345    def test_devnull_input(self):
346
347        async def empty_input():
348            code = 'import sys; data = sys.stdin.read(); print(len(data))'
349            proc = await asyncio.create_subprocess_exec(
350                sys.executable, '-c', code,
351                stdin=asyncio.subprocess.DEVNULL,
352                stdout=asyncio.subprocess.PIPE,
353                stderr=asyncio.subprocess.PIPE,
354                close_fds=False,
355            )
356            stdout, stderr = await proc.communicate()
357            exitcode = await proc.wait()
358            return (stdout, exitcode)
359
360        output, exitcode = self.loop.run_until_complete(empty_input())
361        self.assertEqual(output.rstrip(), b'0')
362        self.assertEqual(exitcode, 0)
363
364    def test_devnull_output(self):
365
366        async def empty_output():
367            code = 'import sys; data = sys.stdin.read(); print(len(data))'
368            proc = await asyncio.create_subprocess_exec(
369                sys.executable, '-c', code,
370                stdin=asyncio.subprocess.PIPE,
371                stdout=asyncio.subprocess.DEVNULL,
372                stderr=asyncio.subprocess.PIPE,
373                close_fds=False,
374            )
375            stdout, stderr = await proc.communicate(b"abc")
376            exitcode = await proc.wait()
377            return (stdout, exitcode)
378
379        output, exitcode = self.loop.run_until_complete(empty_output())
380        self.assertEqual(output, None)
381        self.assertEqual(exitcode, 0)
382
383    def test_devnull_error(self):
384
385        async def empty_error():
386            code = 'import sys; data = sys.stdin.read(); print(len(data))'
387            proc = await asyncio.create_subprocess_exec(
388                sys.executable, '-c', code,
389                stdin=asyncio.subprocess.PIPE,
390                stdout=asyncio.subprocess.PIPE,
391                stderr=asyncio.subprocess.DEVNULL,
392                close_fds=False,
393            )
394            stdout, stderr = await proc.communicate(b"abc")
395            exitcode = await proc.wait()
396            return (stderr, exitcode)
397
398        output, exitcode = self.loop.run_until_complete(empty_error())
399        self.assertEqual(output, None)
400        self.assertEqual(exitcode, 0)
401
402    def test_cancel_process_wait(self):
403        # Issue #23140: cancel Process.wait()
404
405        async def cancel_wait():
406            proc = await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
407
408            # Create an internal future waiting on the process exit
409            task = self.loop.create_task(proc.wait())
410            self.loop.call_soon(task.cancel)
411            try:
412                await task
413            except asyncio.CancelledError:
414                pass
415
416            # Cancel the future
417            task.cancel()
418
419            # Kill the process and wait until it is done
420            proc.kill()
421            await proc.wait()
422
423        self.loop.run_until_complete(cancel_wait())
424
425    def test_cancel_make_subprocess_transport_exec(self):
426
427        async def cancel_make_transport():
428            coro = asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
429            task = self.loop.create_task(coro)
430
431            self.loop.call_soon(task.cancel)
432            try:
433                await task
434            except asyncio.CancelledError:
435                pass
436
437        # ignore the log:
438        # "Exception during subprocess creation, kill the subprocess"
439        with test_utils.disable_logger():
440            self.loop.run_until_complete(cancel_make_transport())
441
442    def test_cancel_post_init(self):
443
444        async def cancel_make_transport():
445            coro = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
446                                             *PROGRAM_BLOCKED)
447            task = self.loop.create_task(coro)
448
449            self.loop.call_soon(task.cancel)
450            try:
451                await task
452            except asyncio.CancelledError:
453                pass
454
455        # ignore the log:
456        # "Exception during subprocess creation, kill the subprocess"
457        with test_utils.disable_logger():
458            self.loop.run_until_complete(cancel_make_transport())
459            test_utils.run_briefly(self.loop)
460
461    def test_close_kill_running(self):
462
463        async def kill_running():
464            create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
465                                               *PROGRAM_BLOCKED)
466            transport, protocol = await create
467
468            kill_called = False
469            def kill():
470                nonlocal kill_called
471                kill_called = True
472                orig_kill()
473
474            proc = transport.get_extra_info('subprocess')
475            orig_kill = proc.kill
476            proc.kill = kill
477            returncode = transport.get_returncode()
478            transport.close()
479            await transport._wait()
480            return (returncode, kill_called)
481
482        # Ignore "Close running child process: kill ..." log
483        with test_utils.disable_logger():
484            returncode, killed = self.loop.run_until_complete(kill_running())
485        self.assertIsNone(returncode)
486
487        # transport.close() must kill the process if it is still running
488        self.assertTrue(killed)
489        test_utils.run_briefly(self.loop)
490
491    def test_close_dont_kill_finished(self):
492
493        async def kill_running():
494            create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
495                                               *PROGRAM_BLOCKED)
496            transport, protocol = await create
497            proc = transport.get_extra_info('subprocess')
498
499            # kill the process (but asyncio is not notified immediately)
500            proc.kill()
501            proc.wait()
502
503            proc.kill = mock.Mock()
504            proc_returncode = proc.poll()
505            transport_returncode = transport.get_returncode()
506            transport.close()
507            return (proc_returncode, transport_returncode, proc.kill.called)
508
509        # Ignore "Unknown child process pid ..." log of SafeChildWatcher,
510        # emitted because the test already consumes the exit status:
511        # proc.wait()
512        with test_utils.disable_logger():
513            result = self.loop.run_until_complete(kill_running())
514            test_utils.run_briefly(self.loop)
515
516        proc_returncode, transport_return_code, killed = result
517
518        self.assertIsNotNone(proc_returncode)
519        self.assertIsNone(transport_return_code)
520
521        # transport.close() must not kill the process if it finished, even if
522        # the transport was not notified yet
523        self.assertFalse(killed)
524
525        # Unlike SafeChildWatcher, FastChildWatcher does not pop the
526        # callbacks if waitpid() is called elsewhere. Let's clear them
527        # manually to avoid a warning when the watcher is detached.
528        if (sys.platform != 'win32' and
529                isinstance(self, SubprocessFastWatcherTests)):
530            asyncio.get_child_watcher()._callbacks.clear()
531
532    async def _test_popen_error(self, stdin):
533        if sys.platform == 'win32':
534            target = 'asyncio.windows_utils.Popen'
535        else:
536            target = 'subprocess.Popen'
537        with mock.patch(target) as popen:
538            exc = ZeroDivisionError
539            popen.side_effect = exc
540
541            with warnings.catch_warnings(record=True) as warns:
542                with self.assertRaises(exc):
543                    await asyncio.create_subprocess_exec(
544                        sys.executable,
545                        '-c',
546                        'pass',
547                        stdin=stdin
548                    )
549                self.assertEqual(warns, [])
550
551    def test_popen_error(self):
552        # Issue #24763: check that the subprocess transport is closed
553        # when BaseSubprocessTransport fails
554        self.loop.run_until_complete(self._test_popen_error(stdin=None))
555
556    def test_popen_error_with_stdin_pipe(self):
557        # Issue #35721: check that newly created socket pair is closed when
558        # Popen fails
559        self.loop.run_until_complete(
560            self._test_popen_error(stdin=subprocess.PIPE))
561
562    def test_read_stdout_after_process_exit(self):
563
564        async def execute():
565            code = '\n'.join(['import sys',
566                              'for _ in range(64):',
567                              '    sys.stdout.write("x" * 4096)',
568                              'sys.stdout.flush()',
569                              'sys.exit(1)'])
570
571            process = await asyncio.create_subprocess_exec(
572                sys.executable, '-c', code,
573                stdout=asyncio.subprocess.PIPE,
574            )
575
576            while True:
577                data = await process.stdout.read(65536)
578                if data:
579                    await asyncio.sleep(0.3)
580                else:
581                    break
582
583        self.loop.run_until_complete(execute())
584
585    def test_create_subprocess_exec_text_mode_fails(self):
586        async def execute():
587            with self.assertRaises(ValueError):
588                await subprocess.create_subprocess_exec(sys.executable,
589                                                        text=True)
590
591            with self.assertRaises(ValueError):
592                await subprocess.create_subprocess_exec(sys.executable,
593                                                        encoding="utf-8")
594
595            with self.assertRaises(ValueError):
596                await subprocess.create_subprocess_exec(sys.executable,
597                                                        errors="strict")
598
599        self.loop.run_until_complete(execute())
600
601    def test_create_subprocess_shell_text_mode_fails(self):
602
603        async def execute():
604            with self.assertRaises(ValueError):
605                await subprocess.create_subprocess_shell(sys.executable,
606                                                         text=True)
607
608            with self.assertRaises(ValueError):
609                await subprocess.create_subprocess_shell(sys.executable,
610                                                         encoding="utf-8")
611
612            with self.assertRaises(ValueError):
613                await subprocess.create_subprocess_shell(sys.executable,
614                                                         errors="strict")
615
616        self.loop.run_until_complete(execute())
617
618    def test_create_subprocess_exec_with_path(self):
619        async def execute():
620            p = await subprocess.create_subprocess_exec(
621                support.FakePath(sys.executable), '-c', 'pass')
622            await p.wait()
623            p = await subprocess.create_subprocess_exec(
624                sys.executable, '-c', 'pass', support.FakePath('.'))
625            await p.wait()
626
627        self.assertIsNone(self.loop.run_until_complete(execute()))
628
629    def test_exec_loop_deprecated(self):
630        async def go():
631            with self.assertWarns(DeprecationWarning):
632                proc = await asyncio.create_subprocess_exec(
633                    sys.executable, '-c', 'pass',
634                    loop=self.loop,
635                )
636            await proc.wait()
637        self.loop.run_until_complete(go())
638
639    def test_shell_loop_deprecated(self):
640        async def go():
641            with self.assertWarns(DeprecationWarning):
642                proc = await asyncio.create_subprocess_shell(
643                    "exit 0",
644                    loop=self.loop,
645                )
646            await proc.wait()
647        self.loop.run_until_complete(go())
648
649
if sys.platform == 'win32':
    # Windows
    class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):
        """Run the SubprocessMixin tests on a ProactorEventLoop."""

        def setUp(self):
            super().setUp()
            proactor_loop = asyncio.ProactorEventLoop()
            self.loop = proactor_loop
            self.set_event_loop(proactor_loop)

else:
    # Unix
    class SubprocessWatcherMixin(SubprocessMixin):
        """Run the SubprocessMixin tests with the ``Watcher`` child watcher."""

        # child watcher class, set by each concrete subclass
        Watcher = None

        def setUp(self):
            super().setUp()
            policy = asyncio.get_event_loop_policy()
            new_loop = policy.new_event_loop()
            self.loop = new_loop
            self.set_event_loop(new_loop)

            # install a fresh watcher attached to the new loop
            child_watcher = self.Watcher()
            child_watcher.attach_loop(new_loop)
            policy.set_child_watcher(child_watcher)

        def tearDown(self):
            super().tearDown()
            policy = asyncio.get_event_loop_policy()
            # unregister the watcher from the policy, then detach and close it
            child_watcher = policy.get_child_watcher()
            policy.set_child_watcher(None)
            child_watcher.attach_loop(None)
            child_watcher.close()

    class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
                                         test_utils.TestCase):
        """SubprocessMixin tests using ThreadedChildWatcher."""

        Watcher = unix_events.ThreadedChildWatcher

    class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin,
                                          test_utils.TestCase):
        """SubprocessMixin tests using MultiLoopChildWatcher."""

        Watcher = unix_events.MultiLoopChildWatcher

    class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
                                     test_utils.TestCase):
        """SubprocessMixin tests using SafeChildWatcher."""

        Watcher = unix_events.SafeChildWatcher

    class SubprocessFastWatcherTests(SubprocessWatcherMixin,
                                     test_utils.TestCase):
        """SubprocessMixin tests using FastChildWatcher."""

        Watcher = unix_events.FastChildWatcher
702
703
class GenericWatcherTests:
    # Mixin-style test: it relies on a host class providing ``self.loop``
    # and the unittest assertion methods.
    # NOTE(review): no TestCase base is visible here, so as written this
    # class does not appear to be collected by unittest -- confirm intent.

    def test_create_subprocess_fails_with_inactive_watcher(self):
        # Spawning a subprocess must raise RuntimeError, without registering
        # a child handler, when the installed child watcher is not active.

        async def execute():
            # autospec'ed watcher whose is_active() reports False
            watcher = mock.create_autospec(asyncio.AbstractChildWatcher)
            watcher.is_active.return_value = False
            asyncio.set_child_watcher(watcher)

            with self.assertRaises(RuntimeError):
                await subprocess.create_subprocess_exec(
                    support.FakePath(sys.executable), '-c', 'pass')

            watcher.add_child_handler.assert_not_called()

        self.assertIsNone(self.loop.run_until_complete(execute()))
720
721
722
723
724if __name__ == '__main__':
725    unittest.main()
726