• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import contextlib
2import logging
3import queue
4import time
5import unittest
6import sys
7import io
8from concurrent.futures._base import BrokenExecutor
9from concurrent.futures.process import _check_system_limits
10
11from logging.handlers import QueueHandler
12
13from test import support
14
15from .util import ExecutorMixin, create_executor_tests, setup_module
16
17
18INITIALIZER_STATUS = 'uninitialized'
19
20def init(x):
21    global INITIALIZER_STATUS
22    INITIALIZER_STATUS = x
23
24def get_init_status():
25    return INITIALIZER_STATUS
26
27def init_fail(log_queue=None):
28    if log_queue is not None:
29        logger = logging.getLogger('concurrent.futures')
30        logger.addHandler(QueueHandler(log_queue))
31        logger.setLevel('CRITICAL')
32        logger.propagate = False
33    time.sleep(0.1)  # let some futures be scheduled
34    raise ValueError('error in initializer')
35
36
37class InitializerMixin(ExecutorMixin):
38    worker_count = 2
39
40    def setUp(self):
41        global INITIALIZER_STATUS
42        INITIALIZER_STATUS = 'uninitialized'
43        self.executor_kwargs = dict(initializer=init,
44                                    initargs=('initialized',))
45        super().setUp()
46
47    def test_initializer(self):
48        futures = [self.executor.submit(get_init_status)
49                   for _ in range(self.worker_count)]
50
51        for f in futures:
52            self.assertEqual(f.result(), 'initialized')
53
54
55class FailingInitializerMixin(ExecutorMixin):
56    worker_count = 2
57
58    def setUp(self):
59        if hasattr(self, "ctx"):
60            # Pass a queue to redirect the child's logging output
61            self.mp_context = self.get_context()
62            self.log_queue = self.mp_context.Queue()
63            self.executor_kwargs = dict(initializer=init_fail,
64                                        initargs=(self.log_queue,))
65        else:
66            # In a thread pool, the child shares our logging setup
67            # (see _assert_logged())
68            self.mp_context = None
69            self.log_queue = None
70            self.executor_kwargs = dict(initializer=init_fail)
71        super().setUp()
72
73    def test_initializer(self):
74        with self._assert_logged('ValueError: error in initializer'):
75            try:
76                future = self.executor.submit(get_init_status)
77            except BrokenExecutor:
78                # Perhaps the executor is already broken
79                pass
80            else:
81                with self.assertRaises(BrokenExecutor):
82                    future.result()
83
84            # At some point, the executor should break
85            for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
86                                            "executor not broken"):
87                if self.executor._broken:
88                    break
89
90            # ... and from this point submit() is guaranteed to fail
91            with self.assertRaises(BrokenExecutor):
92                self.executor.submit(get_init_status)
93
94    @contextlib.contextmanager
95    def _assert_logged(self, msg):
96        if self.log_queue is not None:
97            yield
98            output = []
99            try:
100                while True:
101                    output.append(self.log_queue.get_nowait().getMessage())
102            except queue.Empty:
103                pass
104        else:
105            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
106                yield
107            output = cm.output
108        self.assertTrue(any(msg in line for line in output),
109                        output)
110
111
112create_executor_tests(globals(), InitializerMixin)
113create_executor_tests(globals(), FailingInitializerMixin)
114
115
116@unittest.skipIf(sys.platform == "win32", "Resource Tracker doesn't run on Windows")
117class FailingInitializerResourcesTest(unittest.TestCase):
118    """
119    Source: https://github.com/python/cpython/issues/104090
120    """
121
122    def _test(self, test_class):
123        try:
124            _check_system_limits()
125        except NotImplementedError:
126            self.skipTest("ProcessPoolExecutor unavailable on this system")
127
128        runner = unittest.TextTestRunner(stream=io.StringIO())
129        runner.run(test_class('test_initializer'))
130
131        # GH-104090:
132        # Stop resource tracker manually now, so we can verify there are not leaked resources by checking
133        # the process exit code
134        from multiprocessing.resource_tracker import _resource_tracker
135        _resource_tracker._stop()
136
137        self.assertEqual(_resource_tracker._exitcode, 0)
138
139    def test_spawn(self):
140        self._test(ProcessPoolSpawnFailingInitializerTest)
141
142    @support.skip_if_sanitizer("TSAN doesn't support threads after fork", thread=True)
143    def test_forkserver(self):
144        self._test(ProcessPoolForkserverFailingInitializerTest)
145
146
147def setUpModule():
148    setup_module()
149
150
151if __name__ == "__main__":
152    unittest.main()
153