1import faulthandler 2import json 3import os 4import queue 5import sys 6import threading 7import time 8import traceback 9import types 10from test import support 11 12from test.libregrtest.runtest import ( 13 runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME, 14 format_test_result) 15from test.libregrtest.setup import setup_tests 16from test.libregrtest.utils import format_duration 17 18 19# Display the running tests if nothing happened last N seconds 20PROGRESS_UPDATE = 30.0 # seconds 21 22# If interrupted, display the wait progress every N seconds 23WAIT_PROGRESS = 2.0 # seconds 24 25 26def run_test_in_subprocess(testname, ns): 27 """Run the given test in a subprocess with --worker-args. 28 29 ns is the option Namespace parsed from command-line arguments. regrtest 30 is invoked in a subprocess with the --worker-args argument; when the 31 subprocess exits, its return code, stdout and stderr are returned as a 32 3-tuple. 33 """ 34 from subprocess import Popen, PIPE 35 36 ns_dict = vars(ns) 37 worker_args = (ns_dict, testname) 38 worker_args = json.dumps(worker_args) 39 40 cmd = [sys.executable, *support.args_from_interpreter_flags(), 41 '-u', # Unbuffered stdout and stderr 42 '-m', 'test.regrtest', 43 '--worker-args', worker_args] 44 if ns.pgo: 45 cmd += ['--pgo'] 46 47 # Running the child from the same working directory as regrtest's original 48 # invocation ensures that TEMPDIR for the child is the same when 49 # sysconfig.is_python_build() is true. See issue 15300. 50 popen = Popen(cmd, 51 stdout=PIPE, stderr=PIPE, 52 universal_newlines=True, 53 close_fds=(os.name != 'nt'), 54 cwd=support.SAVEDCWD) 55 with popen: 56 stdout, stderr = popen.communicate() 57 retcode = popen.wait() 58 return retcode, stdout, stderr 59 60 61def run_tests_worker(worker_args): 62 ns_dict, testname = json.loads(worker_args) 63 ns = types.SimpleNamespace(**ns_dict) 64 65 setup_tests(ns) 66 67 try: 68 result = runtest(ns, testname) 69 except KeyboardInterrupt: 70 result = INTERRUPTED, '', None 71 except BaseException as e: 72 traceback.print_exc() 73 result = CHILD_ERROR, str(e) 74 75 print() # Force a newline (just in case) 76 print(json.dumps(result), flush=True) 77 sys.exit(0) 78 79 80# We do not use a generator so multiple threads can call next(). 81class MultiprocessIterator: 82 83 """A thread-safe iterator over tests for multiprocess mode.""" 84 85 def __init__(self, tests): 86 self.interrupted = False 87 self.lock = threading.Lock() 88 self.tests = tests 89 90 def __iter__(self): 91 return self 92 93 def __next__(self): 94 with self.lock: 95 if self.interrupted: 96 raise StopIteration('tests interrupted') 97 return next(self.tests) 98 99 100class MultiprocessThread(threading.Thread): 101 def __init__(self, pending, output, ns): 102 super().__init__() 103 self.pending = pending 104 self.output = output 105 self.ns = ns 106 self.current_test = None 107 self.start_time = None 108 109 def _runtest(self): 110 try: 111 test = next(self.pending) 112 except StopIteration: 113 self.output.put((None, None, None, None)) 114 return True 115 116 try: 117 self.start_time = time.monotonic() 118 self.current_test = test 119 120 retcode, stdout, stderr = run_test_in_subprocess(test, self.ns) 121 finally: 122 self.current_test = None 123 124 if retcode != 0: 125 result = (CHILD_ERROR, "Exit code %s" % retcode, None) 126 self.output.put((test, stdout.rstrip(), stderr.rstrip(), 127 result)) 128 return False 129 130 stdout, _, result = stdout.strip().rpartition("\n") 131 if not result: 132 self.output.put((None, None, None, None)) 133 return True 134 135 result = json.loads(result) 136 assert len(result) == 3, f"Invalid result tuple: {result!r}" 137 self.output.put((test, stdout.rstrip(), stderr.rstrip(), 138 result)) 139 return False 140 141 def run(self): 142 try: 143 stop = False 144 while not stop: 145 stop = self._runtest() 146 except BaseException: 147 self.output.put((None, None, None, None)) 148 raise 149 150 151def run_tests_multiprocess(regrtest): 152 output = queue.Queue() 153 pending = MultiprocessIterator(regrtest.tests) 154 test_timeout = regrtest.ns.timeout 155 use_timeout = (test_timeout is not None) 156 157 workers = [MultiprocessThread(pending, output, regrtest.ns) 158 for i in range(regrtest.ns.use_mp)] 159 print("Run tests in parallel using %s child processes" 160 % len(workers)) 161 for worker in workers: 162 worker.start() 163 164 def get_running(workers): 165 running = [] 166 for worker in workers: 167 current_test = worker.current_test 168 if not current_test: 169 continue 170 dt = time.monotonic() - worker.start_time 171 if dt >= PROGRESS_MIN_TIME: 172 text = '%s (%s)' % (current_test, format_duration(dt)) 173 running.append(text) 174 return running 175 176 finished = 0 177 test_index = 1 178 get_timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME) 179 try: 180 while finished < regrtest.ns.use_mp: 181 if use_timeout: 182 faulthandler.dump_traceback_later(test_timeout, exit=True) 183 184 try: 185 item = output.get(timeout=get_timeout) 186 except queue.Empty: 187 running = get_running(workers) 188 if running and not regrtest.ns.pgo: 189 print('running: %s' % ', '.join(running), flush=True) 190 continue 191 192 test, stdout, stderr, result = item 193 if test is None: 194 finished += 1 195 continue 196 regrtest.accumulate_result(test, result) 197 198 # Display progress 199 ok, test_time, xml_data = result 200 text = format_test_result(test, ok) 201 if (ok not in (CHILD_ERROR, INTERRUPTED) 202 and test_time >= PROGRESS_MIN_TIME 203 and not regrtest.ns.pgo): 204 text += ' (%s)' % format_duration(test_time) 205 elif ok == CHILD_ERROR: 206 text = '%s (%s)' % (text, test_time) 207 running = get_running(workers) 208 if running and not regrtest.ns.pgo: 209 text += ' -- running: %s' % ', '.join(running) 210 regrtest.display_progress(test_index, text) 211 212 # Copy stdout and stderr from the child process 213 if stdout: 214 print(stdout, flush=True) 215 if stderr and not regrtest.ns.pgo: 216 print(stderr, file=sys.stderr, flush=True) 217 218 if result[0] == INTERRUPTED: 219 raise KeyboardInterrupt 220 test_index += 1 221 except KeyboardInterrupt: 222 regrtest.interrupted = True 223 pending.interrupted = True 224 print() 225 finally: 226 if use_timeout: 227 faulthandler.cancel_dump_traceback_later() 228 229 # If tests are interrupted, wait until tests complete 230 wait_start = time.monotonic() 231 while True: 232 running = [worker.current_test for worker in workers] 233 running = list(filter(bool, running)) 234 if not running: 235 break 236 237 dt = time.monotonic() - wait_start 238 line = "Waiting for %s (%s tests)" % (', '.join(running), len(running)) 239 if dt >= WAIT_PROGRESS: 240 line = "%s since %.0f sec" % (line, dt) 241 print(line, flush=True) 242 for worker in workers: 243 worker.join(WAIT_PROGRESS) 244