import contextlib
import logging
import queue
import time
import unittest
import sys
import io
from concurrent.futures._base import BrokenExecutor
from concurrent.futures.process import _check_system_limits

from logging.handlers import QueueHandler

from test import support

from .util import ExecutorMixin, create_executor_tests, setup_module


# Module-level marker: written by init() and read back through
# get_init_status().
INITIALIZER_STATUS = 'uninitialized'


def init(x):
    """Executor initializer: store *x* in the module-level status marker."""
    global INITIALIZER_STATUS
    INITIALIZER_STATUS = x


def get_init_status():
    """Return the current value of the module-level status marker."""
    return INITIALIZER_STATUS


def init_fail(log_queue=None):
    """Executor initializer that always raises ValueError.

    When *log_queue* is given, the 'concurrent.futures' logger first gets a
    QueueHandler feeding that queue, is set to CRITICAL, and stops
    propagating to its parent loggers.
    """
    if log_queue is not None:
        futures_logger = logging.getLogger('concurrent.futures')
        futures_logger.addHandler(QueueHandler(log_queue))
        futures_logger.setLevel('CRITICAL')
        futures_logger.propagate = False
    time.sleep(0.1)  # let some futures be scheduled
    raise ValueError('error in initializer')


class InitializerMixin(ExecutorMixin):
    """Executor configured with init() as its initializer."""

    worker_count = 2

    def setUp(self):
        # Reset the marker so a value left by an earlier test cannot leak in.
        global INITIALIZER_STATUS
        INITIALIZER_STATUS = 'uninitialized'
        self.executor_kwargs = {
            'initializer': init,
            'initargs': ('initialized',),
        }
        super().setUp()

    def test_initializer(self):
        # Submit one task per worker, then require every task to report the
        # value that init() stored.
        pending = [self.executor.submit(get_init_status)
                   for _ in range(self.worker_count)]

        for fut in pending:
            self.assertEqual(fut.result(), 'initialized')


class FailingInitializerMixin(ExecutorMixin):
    """Executor configured with init_fail() as its initializer."""

    worker_count = 2

    def setUp(self):
        if not hasattr(self, "ctx"):
            # In a thread pool, the child shares our logging setup
            # (see _assert_logged()), so no queue is needed.
            self.mp_context = None
            self.log_queue = None
            self.executor_kwargs = {'initializer': init_fail}
        else:
            # Hand the child a queue so its logging output is redirected
            # back to us.
            self.mp_context = self.get_context()
            self.log_queue = self.mp_context.Queue()
            self.executor_kwargs = {
                'initializer': init_fail,
                'initargs': (self.log_queue,),
            }
        super().setUp()

    def test_initializer(self):
        with self._assert_logged('ValueError: error in initializer'):
            try:
                future = self.executor.submit(get_init_status)
            except BrokenExecutor:
                # Perhaps the executor is already broken
                future = None

            if future is not None:
                with self.assertRaises(BrokenExecutor):
                    future.result()

            # At some point, the executor should break
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
                                            "executor not broken"):
                if self.executor._broken:
                    break

            # ... and from this point submit() is guaranteed to fail
            with self.assertRaises(BrokenExecutor):
                self.executor.submit(get_init_status)

    @contextlib.contextmanager
    def _assert_logged(self, msg):
        # Context manager: after the managed body finishes, require that
        # *msg* appears in at least one collected log line.
        if self.log_queue is None:
            # No queue: capture the records with assertLogs().
            with self.assertLogs('concurrent.futures', 'CRITICAL') as captured:
                yield
            lines = captured.output
        else:
            yield
            # Drain whatever records have been put on the queue.
            lines = []
            with contextlib.suppress(queue.Empty):
                while True:
                    record = self.log_queue.get_nowait()
                    lines.append(record.getMessage())
        found = any(msg in line for line in lines)
        self.assertTrue(found, lines)


create_executor_tests(globals(), InitializerMixin)
create_executor_tests(globals(), FailingInitializerMixin)


@unittest.skipIf(sys.platform == "win32", "Resource Tracker doesn't run on Windows")
class FailingInitializerResourcesTest(unittest.TestCase):
    """
    Runs the failing-initializer test and checks the resource tracker's
    exit code afterwards.

    Source: https://github.com/python/cpython/issues/104090
    """

    def _test(self, test_class):
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")

        # Run the inner test with its report sent to a throwaway buffer.
        sink = io.StringIO()
        unittest.TextTestRunner(stream=sink).run(test_class('test_initializer'))

        # GH-104090:
        # Stop the resource tracker manually now, so that its exit code can
        # be checked to verify that no resources were leaked.
        from multiprocessing.resource_tracker import _resource_tracker
        _resource_tracker._stop()

        self.assertEqual(_resource_tracker._exitcode, 0)

    def test_spawn(self):
        self._test(ProcessPoolSpawnFailingInitializerTest)

    @support.skip_if_sanitizer("TSAN doesn't support threads after fork", thread=True)
    def test_forkserver(self):
        self._test(ProcessPoolForkserverFailingInitializerTest)


def setUpModule():
    setup_module()


if __name__ == "__main__":
    unittest.main()