1import multiprocessing 2import sys 3import time 4import unittest 5from concurrent import futures 6from concurrent.futures._base import ( 7 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, 8 ) 9from concurrent.futures.process import _check_system_limits 10 11from test import support 12from test.support import threading_helper 13 14 15def create_future(state=PENDING, exception=None, result=None): 16 f = Future() 17 f._state = state 18 f._exception = exception 19 f._result = result 20 return f 21 22 23PENDING_FUTURE = create_future(state=PENDING) 24RUNNING_FUTURE = create_future(state=RUNNING) 25CANCELLED_FUTURE = create_future(state=CANCELLED) 26CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) 27EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError()) 28SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) 29 30 31class BaseTestCase(unittest.TestCase): 32 def setUp(self): 33 self._thread_key = threading_helper.threading_setup() 34 35 def tearDown(self): 36 support.reap_children() 37 threading_helper.threading_cleanup(*self._thread_key) 38 39 40class ExecutorMixin: 41 worker_count = 5 42 executor_kwargs = {} 43 44 def setUp(self): 45 super().setUp() 46 47 self.t1 = time.monotonic() 48 if hasattr(self, "ctx"): 49 self.executor = self.executor_type( 50 max_workers=self.worker_count, 51 mp_context=self.get_context(), 52 **self.executor_kwargs) 53 else: 54 self.executor = self.executor_type( 55 max_workers=self.worker_count, 56 **self.executor_kwargs) 57 58 def tearDown(self): 59 self.executor.shutdown(wait=True) 60 self.executor = None 61 62 dt = time.monotonic() - self.t1 63 if support.verbose: 64 print("%.2fs" % dt, end=' ') 65 self.assertLess(dt, 300, "synchronization issue: test lasted too long") 66 67 super().tearDown() 68 69 def get_context(self): 70 return multiprocessing.get_context(self.ctx) 71 72 73class ThreadPoolMixin(ExecutorMixin): 74 executor_type = futures.ThreadPoolExecutor 75 76 
def _skip_unless_process_pool_usable(test_case, *, uses_fork=False):
    """Skip *test_case* when its process pool cannot be used here.

    ``_check_system_limits()`` is always consulted.  With ``uses_fork`` set,
    the test is also skipped on win32 and when
    ``support.check_sanitizer(thread=True)`` is true.  ``skipTest()`` raises,
    so the first failing check wins.
    """
    try:
        _check_system_limits()
    except NotImplementedError:
        test_case.skipTest("ProcessPoolExecutor unavailable on this system")
    if not uses_fork:
        return
    if sys.platform == "win32":
        test_case.skipTest("require unix system")
    if support.check_sanitizer(thread=True):
        test_case.skipTest("TSAN doesn't support threads after fork")


class ProcessPoolForkMixin(ExecutorMixin):
    """Executor mixin for ``ProcessPoolExecutor`` with the "fork" context."""

    executor_type = futures.ProcessPoolExecutor
    ctx = "fork"

    def get_context(self):
        _skip_unless_process_pool_usable(self, uses_fork=True)
        return super().get_context()


class ProcessPoolSpawnMixin(ExecutorMixin):
    """Executor mixin for ``ProcessPoolExecutor`` with the "spawn" context."""

    executor_type = futures.ProcessPoolExecutor
    ctx = "spawn"

    def get_context(self):
        _skip_unless_process_pool_usable(self)
        return super().get_context()


class ProcessPoolForkserverMixin(ExecutorMixin):
    """Executor mixin for ``ProcessPoolExecutor`` with "forkserver"."""

    executor_type = futures.ProcessPoolExecutor
    ctx = "forkserver"

    def get_context(self):
        _skip_unless_process_pool_usable(self, uses_fork=True)
        return super().get_context()


def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,),
                          executor_mixins=(ThreadPoolMixin,
                                           ProcessPoolForkMixin,
                                           ProcessPoolForkserverMixin,
                                           ProcessPoolSpawnMixin)):
    """Generate one test class per executor mixin into *remote_globals*.

    For every entry of *executor_mixins* a class deriving from
    ``(mixin, executor_mixin, *bases)`` is created and stored in
    *remote_globals* under ``<Executor><Mixin>Test``, where both name parts
    have a trailing ``Mixin``, ``Tests`` or ``Test`` removed.  The new
    class's ``__module__`` is ``remote_globals['__name__']``.
    """
    def strip_mixin(name):
        # 'Tests' is tried before 'Test' so the longer suffix wins.
        for suffix in ('Mixin', 'Tests', 'Test'):
            if name.endswith(suffix):
                return name[:-len(suffix)]
        return name

    module = remote_globals['__name__']
    mixin_part = strip_mixin(mixin.__name__)
    for exe in executor_mixins:
        name = f"{strip_mixin(exe.__name__)}{mixin_part}Test"
        cls = type(name, (mixin, exe, *bases), {'__module__': module})
        remote_globals[name] = cls


def setup_module():
    """Register the module-level cleanups used by the executor tests.

    ``multiprocessing.util._cleanup_tests`` is registered only when
    ``_check_system_limits()`` does not raise ``NotImplementedError``.  The
    threading helpers are set up and their cleanup registered in every case.
    """
    try:
        _check_system_limits()
    except NotImplementedError:
        # Deliberate: there is no multiprocessing cleanup to register here.
        pass
    else:
        unittest.addModuleCleanup(multiprocessing.util._cleanup_tests)

    thread_info = threading_helper.threading_setup()
    unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)