• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import multiprocessing
2import sys
3import time
4import unittest
5from concurrent import futures
6from concurrent.futures._base import (
7    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
8    )
9from concurrent.futures.process import _check_system_limits
10
11from test import support
12from test.support import threading_helper
13
14
def create_future(state=PENDING, exception=None, result=None):
    """Build a ``Future`` whose internal fields are preset for testing.

    The private ``_state``, ``_exception`` and ``_result`` attributes are
    assigned directly on a fresh ``Future``; no executor is involved.

    Args:
        state: Value stored in ``_state`` (defaults to ``PENDING``).
        exception: Value stored in ``_exception``.
        result: Value stored in ``_result``.

    Returns:
        The newly created ``Future``.
    """
    future = Future()
    future._state, future._exception, future._result = state, exception, result
    return future
21
22
# Pre-built sample futures, one per state of interest, created once at
# import time via create_future().
PENDING_FUTURE = create_future(PENDING)
RUNNING_FUTURE = create_future(RUNNING)
CANCELLED_FUTURE = create_future(CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(CANCELLED_AND_NOTIFIED)
# Finished futures: one carrying an exception, one carrying a result.
EXCEPTION_FUTURE = create_future(FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(FINISHED, result=42)
29
30
class BaseTestCase(unittest.TestCase):
    """Common base class for the executor test cases.

    Stores the value returned by ``threading_helper.threading_setup()``
    before each test and hands it back to
    ``threading_helper.threading_cleanup()`` afterwards.
    """

    def setUp(self):
        """Record the thread-setup token for use in ``tearDown``."""
        self._thread_key = threading_helper.threading_setup()

    def tearDown(self):
        """Reap child processes, then run the thread cleanup."""
        support.reap_children()
        saved_key = self._thread_key
        threading_helper.threading_cleanup(*saved_key)
38
39
class ExecutorMixin:
    """Mixin that creates ``self.executor`` before a test and shuts it down after.

    Concrete mixins must define ``executor_type`` (a callable accepting
    ``max_workers`` and, when ``ctx`` is defined, ``mp_context``).  Defining
    a ``ctx`` attribute makes ``setUp`` pass ``mp_context=self.get_context()``
    to the executor constructor.
    """

    # Passed to the executor as ``max_workers``.
    worker_count = 5
    # Extra keyword arguments forwarded to the executor constructor.
    executor_kwargs = {}

    def setUp(self):
        """Run the parent ``setUp``, start the timer and build the executor."""
        super().setUp()

        self.t1 = time.monotonic()
        # Only executors configured with a ``ctx`` receive an mp_context.
        context_kwargs = {}
        if hasattr(self, "ctx"):
            context_kwargs["mp_context"] = self.get_context()
        self.executor = self.executor_type(
            max_workers=self.worker_count,
            **context_kwargs,
            **self.executor_kwargs)

    def tearDown(self):
        """Shut the executor down, check the test duration, then clean up.

        The parent ``tearDown`` runs in a ``finally`` clause so that the
        base-class cleanup still happens when the shutdown raises or the
        duration assertion fails.
        """
        try:
            self.executor.shutdown(wait=True)
            self.executor = None

            dt = time.monotonic() - self.t1
            if support.verbose:
                print("%.2fs" % dt, end=' ')
            self.assertLess(dt, 300,
                            "synchronization issue: test lasted too long")
        finally:
            super().tearDown()

    def get_context(self):
        """Return the multiprocessing context named by ``self.ctx``."""
        return multiprocessing.get_context(self.ctx)
71
72
class ThreadPoolMixin(ExecutorMixin):
    """Executor mixin that runs the tests on a ``ThreadPoolExecutor``."""

    executor_type = futures.ThreadPoolExecutor
75
76
class ProcessPoolForkMixin(ExecutorMixin):
    """Executor mixin using ``ProcessPoolExecutor`` with the "fork" context."""

    executor_type = futures.ProcessPoolExecutor
    ctx = "fork"

    def get_context(self):
        """Return the "fork" context, skipping the test when unsupported.

        The test is skipped when ``_check_system_limits()`` raises
        ``NotImplementedError``, on win32, or when
        ``support.check_sanitizer(thread=True)`` is true.
        """
        skip_reason = None
        try:
            _check_system_limits()
        except NotImplementedError:
            skip_reason = "ProcessPoolExecutor unavailable on this system"
        else:
            # Checked in order; the first matching condition wins.
            if sys.platform == "win32":
                skip_reason = "require unix system"
            elif support.check_sanitizer(thread=True):
                skip_reason = "TSAN doesn't support threads after fork"

        if skip_reason is not None:
            self.skipTest(skip_reason)
        return super().get_context()
91
92
class ProcessPoolSpawnMixin(ExecutorMixin):
    """Executor mixin using ``ProcessPoolExecutor`` with the "spawn" context."""

    executor_type = futures.ProcessPoolExecutor
    ctx = "spawn"

    def get_context(self):
        """Return the "spawn" context, skipping the test when unsupported.

        The test is skipped when ``_check_system_limits()`` raises
        ``NotImplementedError``.
        """
        try:
            _check_system_limits()
        except NotImplementedError:
            pool_supported = False
        else:
            pool_supported = True

        if not pool_supported:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        return super().get_context()
103
104
class ProcessPoolForkserverMixin(ExecutorMixin):
    """Executor mixin using ``ProcessPoolExecutor`` with "forkserver"."""

    executor_type = futures.ProcessPoolExecutor
    ctx = "forkserver"

    def get_context(self):
        """Return the "forkserver" context, skipping when unsupported.

        The test is skipped when ``_check_system_limits()`` raises
        ``NotImplementedError``, on win32, or when
        ``support.check_sanitizer(thread=True)`` is true.
        """
        skip_reason = None
        try:
            _check_system_limits()
        except NotImplementedError:
            skip_reason = "ProcessPoolExecutor unavailable on this system"
        else:
            # Checked in order; the first matching condition wins.
            if sys.platform == "win32":
                skip_reason = "require unix system"
            elif support.check_sanitizer(thread=True):
                skip_reason = "TSAN doesn't support threads after fork"

        if skip_reason is not None:
            self.skipTest(skip_reason)
        return super().get_context()
119
120
def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,),
                          executor_mixins=(ThreadPoolMixin,
                                           ProcessPoolForkMixin,
                                           ProcessPoolForkserverMixin,
                                           ProcessPoolSpawnMixin)):
    """Generate one test class per executor mixin and publish it.

    For every entry in *executor_mixins* a class inheriting from
    ``(mixin, executor_mixin) + bases`` is created.  Its name is the
    executor mixin name followed by the *mixin* name, each with a trailing
    "Mixin", "Tests" or "Test" removed, plus a final "Test".  The class is
    stored in *remote_globals* under that name.

    Args:
        remote_globals: Namespace mapping of the calling module; it must
            contain ``'__name__'``, which becomes the ``__module__`` of
            the generated classes.
        mixin: Class holding the test methods.
        bases: Tuple of additional base classes appended after the mixins.
        executor_mixins: Executor mixin classes to combine with *mixin*.
    """
    def without_suffix(text):
        # "Mixin"/"Tests" are tried before "Test"; only the first matching
        # suffix is removed.
        for suffix in ('Mixin', 'Tests', 'Test'):
            if text.endswith(suffix):
                return text[:-len(suffix)]
        return text

    owner_module = remote_globals['__name__']
    for executor_mixin in executor_mixins:
        class_name = (without_suffix(executor_mixin.__name__)
                      + without_suffix(mixin.__name__)
                      + "Test")
        parents = (mixin, executor_mixin) + bases
        remote_globals[class_name] = type(
            class_name, parents, {'__module__': owner_module})
140
141
def setup_module():
    """Register the module-level cleanups used by the test modules.

    ``multiprocessing.util._cleanup_tests`` is registered only when
    ``_check_system_limits()`` does not raise ``NotImplementedError``.
    The thread cleanup is always registered, using the values returned by
    ``threading_helper.threading_setup()``.
    """
    try:
        _check_system_limits()
    except NotImplementedError:
        register_mp_cleanup = False
    else:
        register_mp_cleanup = True

    if register_mp_cleanup:
        unittest.addModuleCleanup(multiprocessing.util._cleanup_tests)

    saved_threads = threading_helper.threading_setup()
    unittest.addModuleCleanup(threading_helper.threading_cleanup,
                              *saved_threads)
152