import contextlib
import queue
import signal
import sys
import time
import unittest
import unittest.mock
from pickle import PicklingError
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup

from test import support

from .util import (
    create_executor_tests, setup_module,
    ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)


def _crash(delay=None):
    """Induces a segfault."""
    if delay:
        time.sleep(delay)
    import faulthandler
    faulthandler.disable()
    faulthandler._sigsegv()


def _crash_with_data(data):
    """Induces a segfault with dummy data in input."""
    _crash()


def _exit():
    """Induces a sys exit with exitcode 1."""
    sys.exit(1)


def _raise_error(Err):
    """Function that raises an Exception in process."""
    raise Err()


def _raise_error_ignore_stderr(Err):
    """Function that raises an Exception in process and ignores stderr."""
    import io
    sys.stderr = io.StringIO()
    raise Err()


def _return_instance(cls):
    """Function that returns an instance of cls."""
    return cls()


class CrashAtPickle(object):
    """Bad object that triggers a segfault at pickling time."""
    def __reduce__(self):
        _crash()


class CrashAtUnpickle(object):
    """Bad object that triggers a segfault at unpickling time."""
    def __reduce__(self):
        return _crash, ()


class ExitAtPickle(object):
    """Bad object that triggers a process exit at pickling time."""
    def __reduce__(self):
        _exit()


class ExitAtUnpickle(object):
    """Bad object that triggers a process exit at unpickling time."""
    def __reduce__(self):
        return _exit, ()


class ErrorAtPickle(object):
    """Bad object that triggers an error at pickling time."""
    def __reduce__(self):
        from pickle import PicklingError
        raise PicklingError("Error in pickle")


class ErrorAtUnpickle(object):
    """Bad object that triggers an error at unpickling time."""
    def __reduce__(self):
        from pickle import UnpicklingError
        return _raise_error_ignore_stderr, (UnpicklingError, )
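

# Note on the "bad object" classes above: pickling an object calls its
# __reduce__() method, so raising or crashing *inside* __reduce__ fails at
# pickling time, while returning a (callable, args) pair defers the failure
# to unpickling time, when pickle calls callable(*args) to rebuild the
# object.  A rough sketch of the idea (illustrative only, not used by the
# tests below):
#
#     class _BoomAtUnpickle:
#         def __reduce__(self):
#             # Pickling succeeds; pickle.loads() then calls
#             # _raise_error(RuntimeError) and raises.
#             return _raise_error, (RuntimeError,)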


class ExecutorDeadlockTest:
    TIMEOUT = support.LONG_TIMEOUT

    def _fail_on_deadlock(self, executor):
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean all its
        # components.
        import faulthandler
        from tempfile import TemporaryFile
        with TemporaryFile(mode="w+") as f:
            faulthandler.dump_traceback(file=f)
            f.seek(0)
            tb = f.read()
        for p in executor._processes.values():
            p.terminate()
        # It should be safe to call executor.shutdown here, as all possible
        # deadlocks should have been broken.
        executor.shutdown(wait=True)
        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
        self.fail(f"Executor deadlock:\n\n{tb}")

    def _check_crash(self, error, func, *args, ignore_stderr=False):
        # Test for a deadlock caused by crashes in a pool.
        self.executor.shutdown(wait=True)

        executor = self.executor_type(
            max_workers=2, mp_context=self.get_context())
        res = executor.submit(func, *args)

        if ignore_stderr:
            cm = support.captured_stderr()
        else:
            cm = contextlib.nullcontext()

        try:
            with self.assertRaises(error):
                with cm:
                    res.result(timeout=self.TIMEOUT)
        except futures.TimeoutError:
            # If we did not recover before TIMEOUT seconds,
            # consider that the executor is in a deadlock state
            self._fail_on_deadlock(executor)
        executor.shutdown(wait=True)
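
    # The _check_crash() based tests that follow share a pattern.  Roughly
    # speaking: an error raised while pickling a task or a result is reported
    # as PicklingError, an ordinary exception raised in a worker (including
    # SystemExit) propagates back to the caller, and a crash or a failure
    # while unpickling breaks the pool and is reported as BrokenProcessPool.
    # The exact exception expected in each scenario is the first argument
    # passed to _check_crash().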

    def test_error_at_task_pickle(self):
        # Check problem occurring while pickling a task in
        # the task_handler thread
        self._check_crash(PicklingError, id, ErrorAtPickle())

    def test_exit_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())

    def test_error_at_task_unpickle(self):
        # gh-109832: Restore stderr overridden by _raise_error_ignore_stderr()
        self.addCleanup(setattr, sys, 'stderr', sys.stderr)

        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())

    def test_crash_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())

    def test_crash_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(BrokenProcessPool, _crash)

    def test_exit_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(SystemExit, _exit)

    def test_error_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(RuntimeError, _raise_error, RuntimeError)

    def test_crash_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)

    def test_exit_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(SystemExit, _return_instance, ExitAtPickle)

    def test_error_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(PicklingError, _return_instance, ErrorAtPickle)

    def test_error_during_result_unpickle_in_result_handler(self):
        # gh-109832: Restore stderr overridden by _raise_error_ignore_stderr()
        self.addCleanup(setattr, sys, 'stderr', sys.stderr)

        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(BrokenProcessPool,
                          _return_instance, ErrorAtUnpickle,
                          ignore_stderr=True)

    def test_exit_during_result_unpickle_in_result_handler(self):
        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)

    def test_shutdown_deadlock(self):
        # Test that calling shutdown on the pool does not cause a deadlock
        # if a worker fails after the shutdown call.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            f = executor.submit(_crash, delay=.1)
            executor.shutdown(wait=True)
            with self.assertRaises(BrokenProcessPool):
                f.result()

    def test_shutdown_deadlock_pickle(self):
        # Test that calling shutdown on the pool with wait=False does not
        # cause a deadlock if a task fails at pickle after the shutdown call.
        # Reported in bpo-39104.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock

            # Start the executor and grab the executor_manager_thread so it
            # can be joined later, avoiding a dangling thread that would
            # otherwise be cleaned up asynchronously.
            executor.submit(id, 42).result()
            executor_manager = executor._executor_manager_thread

            # Submit a task that fails at pickle and shut down the executor
            # without waiting.
            f = executor.submit(id, ErrorAtPickle())
            executor.shutdown(wait=False)
            with self.assertRaises(PicklingError):
                f.result()

        # Make sure the executor eventually shuts down and does not leave
        # dangling threads.
        executor_manager.join()

    def test_crash_big_data(self):
        # Test that there is a clean exception instead of a deadlock when a
        # child process crashes while some data is being written into the
        # queue.
        # https://github.com/python/cpython/issues/94777
        self.executor.shutdown(wait=True)
        data = "a" * support.PIPE_MAX_SIZE
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            with self.assertRaises(BrokenProcessPool):
                list(executor.map(_crash_with_data, [data] * 10))

        executor.shutdown(wait=True)
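
    # Background for the next test (gh-105829): Executor.submit() wakes the
    # executor manager thread by writing to a small wakeup pipe, and that
    # write could block forever if the pipe filled up while the manager
    # thread was not draining it.  The test reproduces this, roughly, by
    # delaying the manager thread's startup and by wrapping _ThreadWakeup
    # with a bounded dummy queue so that wakeup() blocks once a previous
    # wakeup has not yet been cleared, simulating a full pipe.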

    def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
        # Issue #105829: The _ExecutorManagerThread wakeup pipe could
        # fill up and block. See: https://github.com/python/cpython/issues/105829

        # Lots of cargo culting while writing this test, apologies if
        # something is really stupid...

        self.executor.shutdown(wait=True)

        if not hasattr(signal, 'alarm'):
            raise unittest.SkipTest(
                "Tested platform does not support the alarm signal")

        def timeout(_signum, _frame):
            import faulthandler
            faulthandler.dump_traceback()
            raise RuntimeError("timed out while submitting jobs?")

        thread_run = futures.process._ExecutorManagerThread.run
        def mock_run(self):
            # Delay thread startup so the wakeup pipe can fill up and block
            time.sleep(3)
            thread_run(self)

        class MockWakeup(_ThreadWakeup):
            """Mock wakeup object to force the wakeup to block"""
            def __init__(self):
                super().__init__()
                self._dummy_queue = queue.Queue(maxsize=1)

            def wakeup(self):
                self._dummy_queue.put(None, block=True)
                super().wakeup()

            def clear(self):
                super().clear()
                try:
                    while True:
                        self._dummy_queue.get_nowait()
                except queue.Empty:
                    pass

        with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
                                         'run', mock_run),
              unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
                                  MockWakeup)):
            with self.executor_type(max_workers=2,
                                    mp_context=self.get_context()) as executor:
                self.executor = executor  # Allow clean up in fail_on_deadlock

                job_num = 100
                job_data = range(job_num)

                # Need to use sigalarm for timeout detection because
                # Executor.submit is not guarded by any timeout (both
                # self._work_ids.put(self._queue_count) and
                # self._executor_manager_thread_wakeup.wakeup() can block,
                # and maybe more).  In this specific case it was the wakeup
                # call that deadlocked on a blocking pipe.
                old_handler = signal.signal(signal.SIGALRM, timeout)
                try:
                    signal.alarm(int(self.TIMEOUT))
                    self.assertEqual(job_num,
                                     len(list(executor.map(int, job_data))))
                finally:
                    signal.alarm(0)
                    signal.signal(signal.SIGALRM, old_handler)


create_executor_tests(globals(), ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


def setUpModule():
    setup_module()


if __name__ == "__main__":
    unittest.main()