import contextlib
import queue
import signal
import sys
import time
import unittest
import unittest.mock
from pickle import PicklingError
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup

from test import support

from .util import (
    create_executor_tests, setup_module,
    ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)


def _crash(delay=None):
    """Induces a segfault."""
    if delay:
        time.sleep(delay)
    import faulthandler
    faulthandler.disable()
    faulthandler._sigsegv()


def _crash_with_data(data):
    """Induces a segfault; the dummy input data is ignored."""
    _crash()


def _exit():
    """Induces a sys exit with exitcode 1."""
    sys.exit(1)


def _raise_error(Err):
    """Function that raises an Exception in process."""
    raise Err()


def _raise_error_ignore_stderr(Err):
    """Function that raises an Exception in process and ignores stderr."""
    import io
    sys.stderr = io.StringIO()
    raise Err()


def _return_instance(cls):
    """Function that returns an instance of cls."""
    return cls()


class CrashAtPickle(object):
    """Bad object that triggers a segfault at pickling time."""
    def __reduce__(self):
        _crash()


class CrashAtUnpickle(object):
    """Bad object that triggers a segfault at unpickling time."""
    def __reduce__(self):
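        # Returning a (callable, args) pair defers the failure: the crash is
        # triggered when the object is reconstructed, i.e. at unpickling time.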
        return _crash, ()


class ExitAtPickle(object):
    """Bad object that triggers a process exit at pickling time."""
    def __reduce__(self):
        _exit()


class ExitAtUnpickle(object):
    """Bad object that triggers a process exit at unpickling time."""
    def __reduce__(self):
        return _exit, ()


class ErrorAtPickle(object):
    """Bad object that triggers an error at pickling time."""
    def __reduce__(self):
        from pickle import PicklingError
        raise PicklingError("Error in pickle")


class ErrorAtUnpickle(object):
    """Bad object that triggers an error at unpickling time."""
    def __reduce__(self):
        from pickle import UnpicklingError
        return _raise_error_ignore_stderr, (UnpicklingError, )


class ExecutorDeadlockTest:
    TIMEOUT = support.LONG_TIMEOUT

    def _fail_on_deadlock(self, executor):
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean all its
        # components.
        import faulthandler
        from tempfile import TemporaryFile
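        # Dump the tracebacks of all threads into a temporary file so they
        # can be read back and included in the test failure message.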
        with TemporaryFile(mode="w+") as f:
            faulthandler.dump_traceback(file=f)
            f.seek(0)
            tb = f.read()
        for p in executor._processes.values():
            p.terminate()
        # It should be safe to call executor.shutdown() here, as all possible
        # deadlocks should have been broken.
        executor.shutdown(wait=True)
        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
        self.fail(f"Executor deadlock:\n\n{tb}")


    def _check_crash(self, error, func, *args, ignore_stderr=False):
        # Test for a deadlock caused by crashes in a pool.
        self.executor.shutdown(wait=True)

        executor = self.executor_type(
            max_workers=2, mp_context=self.get_context())
        res = executor.submit(func, *args)

        if ignore_stderr:
            cm = support.captured_stderr()
        else:
            cm = contextlib.nullcontext()

        try:
            with self.assertRaises(error):
                with cm:
                    res.result(timeout=self.TIMEOUT)
        except futures.TimeoutError:
            # If we did not recover before TIMEOUT seconds,
            # consider that the executor is in a deadlock state
            self._fail_on_deadlock(executor)
        executor.shutdown(wait=True)

    def test_error_at_task_pickle(self):
        # Check problem occurring while pickling a task in
        # the task_handler thread
        self._check_crash(PicklingError, id, ErrorAtPickle())

    def test_exit_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())

    def test_error_at_task_unpickle(self):
        # gh-109832: Restore stderr overridden by _raise_error_ignore_stderr()
        self.addCleanup(setattr, sys, 'stderr', sys.stderr)

        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())

    def test_crash_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())

    def test_crash_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(BrokenProcessPool, _crash)

    def test_exit_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(SystemExit, _exit)

    def test_error_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(RuntimeError, _raise_error, RuntimeError)

    def test_crash_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)

    def test_exit_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(SystemExit, _return_instance, ExitAtPickle)

    def test_error_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(PicklingError, _return_instance, ErrorAtPickle)

    def test_error_during_result_unpickle_in_result_handler(self):
        # gh-109832: Restore stderr overridden by _raise_error_ignore_stderr()
        self.addCleanup(setattr, sys, 'stderr', sys.stderr)

        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(BrokenProcessPool,
                          _return_instance, ErrorAtUnpickle,
                          ignore_stderr=True)

    def test_exit_during_result_unpickle_in_result_handler(self):
        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)

    def test_shutdown_deadlock(self):
        # Test that calling shutdown on the pool does not cause a deadlock
        # if a worker fails after the shutdown call.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            f = executor.submit(_crash, delay=.1)
            executor.shutdown(wait=True)
            with self.assertRaises(BrokenProcessPool):
                f.result()

    def test_shutdown_deadlock_pickle(self):
        # Test that calling shutdown with wait=False on the pool does not
        # cause a deadlock if a task fails to pickle after the shutdown call.
        # Reported in bpo-39104.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock

            # Start the executor and grab the executor_manager_thread so it
            # can be joined later, avoiding a dangling thread that would
            # otherwise be cleaned up asynchronously.
            executor.submit(id, 42).result()
            executor_manager = executor._executor_manager_thread

            # Submit a task that fails to pickle and shut down the executor
            # without waiting
            f = executor.submit(id, ErrorAtPickle())
            executor.shutdown(wait=False)
            with self.assertRaises(PicklingError):
                f.result()

        # Make sure the executor eventually shuts down and does not leave
        # dangling threads.
        executor_manager.join()

    def test_crash_big_data(self):
        # Test that there is a clean exception instead of a deadlock when a
        # child process crashes while some data is being written into the
        # queue.
        # https://github.com/python/cpython/issues/94777
        self.executor.shutdown(wait=True)
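        # support.PIPE_MAX_SIZE is likely larger than the OS pipe buffer, so
        # sending this payload to the workers blocks while a child crashes.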
        data = "a" * support.PIPE_MAX_SIZE
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            with self.assertRaises(BrokenProcessPool):
                list(executor.map(_crash_with_data, [data] * 10))

        executor.shutdown(wait=True)

    def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
        # Issue #105829: The _ExecutorManagerThread wakeup pipe could
        # fill up and block. See: https://github.com/python/cpython/issues/105829

        # Lots of cargo culting while writing this test, apologies if
        # something is really stupid...

        self.executor.shutdown(wait=True)

        if not hasattr(signal, 'alarm'):
            raise unittest.SkipTest(
                "Tested platform does not support the alarm signal")

        def timeout(_signum, _frame):
            import faulthandler
            faulthandler.dump_traceback()

            raise RuntimeError("timed out while submitting jobs?")

        thread_run = futures.process._ExecutorManagerThread.run
        def mock_run(self):
            # Delay thread startup so the wakeup pipe can fill up and block
            time.sleep(3)
            thread_run(self)

        class MockWakeup(_ThreadWakeup):
            """Mock wakeup object to force the wakeup to block"""
            def __init__(self):
                super().__init__()
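                # A queue with maxsize=1 makes a second, un-cleared wakeup()
                # call block in put(), emulating a wakeup pipe that is full.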
                self._dummy_queue = queue.Queue(maxsize=1)

            def wakeup(self):
                self._dummy_queue.put(None, block=True)
                super().wakeup()

            def clear(self):
                super().clear()
                try:
                    while True:
                        self._dummy_queue.get_nowait()
                except queue.Empty:
                    pass

        with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
                                         'run', mock_run),
              unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
                                  MockWakeup)):
            with self.executor_type(max_workers=2,
                                    mp_context=self.get_context()) as executor:
                self.executor = executor  # Allow clean up in fail_on_deadlock

                job_num = 100
                job_data = range(job_num)

                # Need to use SIGALRM for timeout detection because
                # Executor.submit is not guarded by any timeout (both
                # self._work_ids.put(self._queue_count) and
                # self._executor_manager_thread_wakeup.wakeup() might
                # block, maybe more). In this specific case it was
                # the wakeup call that deadlocked on a blocking pipe.
                old_handler = signal.signal(signal.SIGALRM, timeout)
                try:
                    signal.alarm(int(self.TIMEOUT))
                    self.assertEqual(job_num, len(list(executor.map(int, job_data))))
                finally:
                    signal.alarm(0)
                    signal.signal(signal.SIGALRM, old_handler)


create_executor_tests(globals(), ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))

def setUpModule():
    setup_module()


if __name__ == "__main__":
    unittest.main()