1import _thread 2import contextlib 3import functools 4import sys 5import threading 6import time 7 8from test import support 9 10 11#======================================================================= 12# Threading support to prevent reporting refleaks when running regrtest.py -R 13 14# NOTE: we use thread._count() rather than threading.enumerate() (or the 15# moral equivalent thereof) because a threading.Thread object is still alive 16# until its __bootstrap() method has returned, even after it has been 17# unregistered from the threading module. 18# thread._count(), on the other hand, only gets decremented *after* the 19# __bootstrap() method has returned, which gives us reliable reference counts 20# at the end of a test run. 21 22 23def threading_setup(): 24 return _thread._count(), threading._dangling.copy() 25 26 27def threading_cleanup(*original_values): 28 _MAX_COUNT = 100 29 30 for count in range(_MAX_COUNT): 31 values = _thread._count(), threading._dangling 32 if values == original_values: 33 break 34 35 if not count: 36 # Display a warning at the first iteration 37 support.environment_altered = True 38 dangling_threads = values[1] 39 support.print_warning(f"threading_cleanup() failed to cleanup " 40 f"{values[0] - original_values[0]} threads " 41 f"(count: {values[0]}, " 42 f"dangling: {len(dangling_threads)})") 43 for thread in dangling_threads: 44 support.print_warning(f"Dangling thread: {thread!r}") 45 46 # Don't hold references to threads 47 dangling_threads = None 48 values = None 49 50 time.sleep(0.01) 51 support.gc_collect() 52 53 54def reap_threads(func): 55 """Use this function when threads are being used. This will 56 ensure that the threads are cleaned up even when the test fails. 57 """ 58 @functools.wraps(func) 59 def decorator(*args): 60 key = threading_setup() 61 try: 62 return func(*args) 63 finally: 64 threading_cleanup(*key) 65 return decorator 66 67 68@contextlib.contextmanager 69def wait_threads_exit(timeout=None): 70 """ 71 bpo-31234: Context manager to wait until all threads created in the with 72 statement exit. 73 74 Use _thread.count() to check if threads exited. Indirectly, wait until 75 threads exit the internal t_bootstrap() C function of the _thread module. 76 77 threading_setup() and threading_cleanup() are designed to emit a warning 78 if a test leaves running threads in the background. This context manager 79 is designed to cleanup threads started by the _thread.start_new_thread() 80 which doesn't allow to wait for thread exit, whereas thread.Thread has a 81 join() method. 82 """ 83 if timeout is None: 84 timeout = support.SHORT_TIMEOUT 85 old_count = _thread._count() 86 try: 87 yield 88 finally: 89 start_time = time.monotonic() 90 deadline = start_time + timeout 91 while True: 92 count = _thread._count() 93 if count <= old_count: 94 break 95 if time.monotonic() > deadline: 96 dt = time.monotonic() - start_time 97 msg = (f"wait_threads() failed to cleanup {count - old_count} " 98 f"threads after {dt:.1f} seconds " 99 f"(count: {count}, old count: {old_count})") 100 raise AssertionError(msg) 101 time.sleep(0.010) 102 support.gc_collect() 103 104 105def join_thread(thread, timeout=None): 106 """Join a thread. Raise an AssertionError if the thread is still alive 107 after timeout seconds. 108 """ 109 if timeout is None: 110 timeout = support.SHORT_TIMEOUT 111 thread.join(timeout) 112 if thread.is_alive(): 113 msg = f"failed to join the thread in {timeout:.1f} seconds" 114 raise AssertionError(msg) 115 116 117@contextlib.contextmanager 118def start_threads(threads, unlock=None): 119 import faulthandler 120 threads = list(threads) 121 started = [] 122 try: 123 try: 124 for t in threads: 125 t.start() 126 started.append(t) 127 except: 128 if support.verbose: 129 print("Can't start %d threads, only %d threads started" % 130 (len(threads), len(started))) 131 raise 132 yield 133 finally: 134 try: 135 if unlock: 136 unlock() 137 endtime = time.monotonic() 138 for timeout in range(1, 16): 139 endtime += 60 140 for t in started: 141 t.join(max(endtime - time.monotonic(), 0.01)) 142 started = [t for t in started if t.is_alive()] 143 if not started: 144 break 145 if support.verbose: 146 print('Unable to join %d threads during a period of ' 147 '%d minutes' % (len(started), timeout)) 148 finally: 149 started = [t for t in started if t.is_alive()] 150 if started: 151 faulthandler.dump_traceback(sys.stdout) 152 raise AssertionError('Unable to join %d threads' % len(started)) 153 154 155class catch_threading_exception: 156 """ 157 Context manager catching threading.Thread exception using 158 threading.excepthook. 159 160 Attributes set when an exception is caught: 161 162 * exc_type 163 * exc_value 164 * exc_traceback 165 * thread 166 167 See threading.excepthook() documentation for these attributes. 168 169 These attributes are deleted at the context manager exit. 170 171 Usage: 172 173 with threading_helper.catch_threading_exception() as cm: 174 # code spawning a thread which raises an exception 175 ... 176 177 # check the thread exception, use cm attributes: 178 # exc_type, exc_value, exc_traceback, thread 179 ... 180 181 # exc_type, exc_value, exc_traceback, thread attributes of cm no longer 182 # exists at this point 183 # (to avoid reference cycles) 184 """ 185 186 def __init__(self): 187 self.exc_type = None 188 self.exc_value = None 189 self.exc_traceback = None 190 self.thread = None 191 self._old_hook = None 192 193 def _hook(self, args): 194 self.exc_type = args.exc_type 195 self.exc_value = args.exc_value 196 self.exc_traceback = args.exc_traceback 197 self.thread = args.thread 198 199 def __enter__(self): 200 self._old_hook = threading.excepthook 201 threading.excepthook = self._hook 202 return self 203 204 def __exit__(self, *exc_info): 205 threading.excepthook = self._old_hook 206 del self.exc_type 207 del self.exc_value 208 del self.exc_traceback 209 del self.thread 210