"""Run regrtest tests in parallel, each test in its own child process.

MultiprocessTestRunner starts ``ns.use_mp`` TestWorkerProcess threads.  Each
thread pulls test names from a shared, thread-safe iterator, runs every test
in a fresh ``python -m test.regrtest --worker-args ...`` child process and
posts the parsed result back to the main thread through a queue.
"""

import collections
import faulthandler
import json
import os
import queue
import subprocess
import sys
import threading
import time
import traceback
import types
from test import support

from test.libregrtest.runtest import (
    runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
    format_test_result, TestResult, is_failed, TIMEOUT)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration, print_warning


# Display the running tests if nothing happened in the last N seconds
PROGRESS_UPDATE = 30.0   # seconds
assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME

# When a per-test timeout is set (ns.timeout), kill the main process if it
# is stuck for 5 minutes. It is supposed to write an update every
# PROGRESS_UPDATE seconds. Tolerate 5 minutes for the slowest Python
# buildbot workers.
MAIN_PROCESS_TIMEOUT = 5 * 60.0
assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE

# Time to wait until a worker completes: should be immediate
JOIN_TIMEOUT = 30.0   # seconds


def must_stop(result, ns):
    """Return True if the whole run must stop after *result*.

    That is the case when the test was interrupted, or when --failfast
    (ns.failfast) is enabled and is_failed() reports the test as failed.
    """
    if result.result == INTERRUPTED:
        return True
    if ns.failfast and is_failed(result, ns):
        return True
    return False


def parse_worker_args(worker_args):
    """Decode the --worker-args JSON built by run_test_in_subprocess().

    Return a ``(ns, test_name)`` tuple where *ns* is a SimpleNamespace
    rebuilt from the serialized namespace dictionary.
    """
    ns_dict, test_name = json.loads(worker_args)
    ns = types.SimpleNamespace(**ns_dict)
    return (ns, test_name)


def run_test_in_subprocess(testname, ns):
    """Start a child process running the single test *testname*.

    The attributes of *ns* are passed as JSON on the command line, so they
    must all be JSON-serializable.  Return the subprocess.Popen object;
    its stdout and stderr are text-mode pipes.
    """
    ns_dict = vars(ns)
    worker_args = (ns_dict, testname)
    worker_args = json.dumps(worker_args)

    cmd = [sys.executable, *support.args_from_interpreter_flags(),
           '-u',    # Unbuffered stdout and stderr
           '-m', 'test.regrtest',
           '--worker-args', worker_args]

    # Running the child from the same working directory as regrtest's original
    # invocation ensures that TEMPDIR for the child is the same when
    # sysconfig.is_python_build() is true. See issue 15300.
    return subprocess.Popen(cmd,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True,
                            close_fds=(os.name != 'nt'),
                            cwd=support.SAVEDCWD)


def run_tests_worker(ns, test_name):
    """Child-process entry point: run one test and report its result.

    The TestResult is written as a JSON list on the last line of stdout,
    where TestWorkerProcess._runtest() parses it; then the process exits
    with status 0.
    """
    setup_tests(ns)

    result = runtest(ns, test_name)

    print()   # Force a newline (just in case)

    # Serialize TestResult as list in JSON
    print(json.dumps(list(result)), flush=True)
    sys.exit(0)


# We do not use a generator so multiple threads can call next().
class MultiprocessIterator:

    """A thread-safe iterator over tests for multiprocess mode.

    *tests_iter* must be an iterator: next() is called on it directly.
    After stop(), the iterator is exhausted for every thread.
    """

    def __init__(self, tests_iter):
        self.lock = threading.Lock()
        self.tests_iter = tests_iter

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            if self.tests_iter is None:
                raise StopIteration
            return next(self.tests_iter)

    def stop(self):
        with self.lock:
            self.tests_iter = None


# result: TestResult; stdout/stderr: child output (str);
# error_msg: None, or the reason of a CHILD_ERROR.
MultiprocessResult = collections.namedtuple('MultiprocessResult',
    'result stdout stderr error_msg')


class ExitThread(Exception):
    """Raised in a worker thread to leave run() quietly after stop()."""
    pass


class TestWorkerProcess(threading.Thread):
    """Thread that runs pending tests, one child process per test.

    Results are posted to runner.output as ``(False, MultiprocessResult)``;
    an unexpected exception is posted as ``(True, formatted_traceback)``.
    """

    def __init__(self, worker_id, runner):
        super().__init__()
        self.worker_id = worker_id
        # State shared with the MultiprocessTestRunner
        self.pending = runner.pending          # MultiprocessIterator
        self.output = runner.output            # queue.Queue of results
        self.ns = runner.ns
        self.timeout = runner.worker_timeout   # seconds, or None
        self.regrtest = runner.regrtest
        # Per-test state; also read from the main thread by get_running()
        # and __repr__()
        self.current_test_name = None
        self.start_time = None
        self._popen = None
        self._killed = False     # kill() was already sent to self._popen
        self._stopped = False    # set by stop() from another thread

    def __repr__(self):
        info = [f'TestWorkerProcess #{self.worker_id}']
        if self.is_alive():
            info.append("running")
        else:
            info.append('stopped')
        test = self.current_test_name
        if test:
            info.append(f'test={test}')
        # Read self._popen only once: the worker thread resets it to None
        # when the test completes, while __repr__() can be called from the
        # main thread (stop(), wait_stopped()).
        popen = self._popen
        if popen is not None:
            dt = time.monotonic() - self.start_time
            info.extend((f'pid={popen.pid}',
                         f'time={format_duration(dt)}'))
        return '<%s>' % ' '.join(info)

    def _kill(self):
        """Kill the current child process, at most once per process.

        Do nothing if no process is running or if it was already killed.
        An OSError from kill() is reported as a warning, not raised.
        """
        popen = self._popen
        if popen is None:
            return

        if self._killed:
            return
        self._killed = True

        print(f"Kill {self}", file=sys.stderr, flush=True)
        try:
            popen.kill()
        except OSError as exc:
            print_warning(f"Failed to kill {self}: {exc!r}")

    def stop(self):
        """Ask this worker to stop and kill its running child process.

        Called from a different thread; run() exits at its next check of
        self._stopped, or through ExitThread.
        """
        self._stopped = True
        self._kill()

    def mp_result_error(self, test_name, error_type, stdout='', stderr='',
                        err_msg=None):
        """Build a MultiprocessResult reporting *error_type* for *test_name*.

        The test duration is measured from self.start_time.
        """
        test_time = time.monotonic() - self.start_time
        result = TestResult(test_name, error_type, test_time, None)
        return MultiprocessResult(result, stdout, stderr, err_msg)

    def _run_process(self, test_name):
        """Run *test_name* in a new child process and wait for it.

        Return ``(retcode, stdout, stderr)``.  *retcode* is None when the
        process exceeded self.timeout and was killed; stdout and stderr are
        then empty strings.

        Raise ExitThread if stop() was called meanwhile.  In every case the
        child process is waited for, and self._popen and
        self.current_test_name are reset, before returning or raising.
        """
        self.start_time = time.monotonic()

        self.current_test_name = test_name
        try:
            popen = run_test_in_subprocess(test_name, self.ns)

            self._killed = False
            self._popen = popen
        except:
            self.current_test_name = None
            raise

        try:
            if self._stopped:
                # If kill() has been called before self._popen is set,
                # self._popen is still running. Call kill() again
                # to ensure that the process is killed.
                self._kill()
                raise ExitThread

            try:
                stdout, stderr = popen.communicate(timeout=self.timeout)
                retcode = popen.returncode
                assert retcode is not None
            except subprocess.TimeoutExpired:
                if self._stopped:
                    # kill() has been called: communicate() fails
                    # on reading closed stdout/stderr
                    raise ExitThread

                # On timeout, kill the process
                self._kill()

                # None means TIMEOUT for the caller
                retcode = None
                # bpo-38207: Don't attempt to call communicate() again: it
                # can hang until all child processes using the stdout and
                # stderr pipes complete.
                stdout = stderr = ''
            except OSError:
                if self._stopped:
                    # kill() has been called: communicate() fails
                    # on reading closed stdout/stderr
                    raise ExitThread
                raise
            else:
                stdout = stdout.strip()
                stderr = stderr.rstrip()

            return (retcode, stdout, stderr)
        except:
            self._kill()
            raise
        finally:
            self._wait_completed()
            self._popen = None
            self.current_test_name = None

    def _runtest(self, test_name):
        """Run *test_name* in a child process; return a MultiprocessResult.

        A timeout gives a TIMEOUT result.  A non-zero exit code, or a last
        stdout line that is missing or not a valid JSON TestResult, gives a
        CHILD_ERROR result with error_msg set.  Otherwise the last stdout
        line (written by run_tests_worker()) is parsed and removed from
        stdout.
        """
        retcode, stdout, stderr = self._run_process(test_name)

        if retcode is None:
            return self.mp_result_error(test_name, TIMEOUT, stdout, stderr)

        err_msg = None
        if retcode != 0:
            err_msg = "Exit code %s" % retcode
        else:
            stdout, _, result = stdout.rpartition("\n")
            stdout = stdout.rstrip()
            if not result:
                err_msg = "Failed to parse worker stdout"
            else:
                try:
                    # deserialize run_tests_worker() output
                    result = json.loads(result)
                    result = TestResult(*result)
                except Exception as exc:
                    err_msg = "Failed to parse worker JSON: %s" % exc

        if err_msg is not None:
            return self.mp_result_error(test_name, CHILD_ERROR,
                                        stdout, stderr, err_msg)

        return MultiprocessResult(result, stdout, stderr, err_msg)

    def run(self):
        """Thread body: run tests until none are pending or we must stop."""
        while not self._stopped:
            try:
                try:
                    test_name = next(self.pending)
                except StopIteration:
                    break

                mp_result = self._runtest(test_name)
                self.output.put((False, mp_result))

                if must_stop(mp_result.result, self.ns):
                    break
            except ExitThread:
                break
            except BaseException:
                # Report the failure to the main thread instead of dying
                # silently.
                self.output.put((True, traceback.format_exc()))
                break

    def _wait_completed(self):
        """Close the child's pipes and wait up to JOIN_TIMEOUT for its exit.

        A failure to wait is reported as a warning, not raised.
        """
        popen = self._popen

        # stdout and stderr must be closed to ensure that communicate()
        # does not hang
        popen.stdout.close()
        popen.stderr.close()

        try:
            popen.wait(JOIN_TIMEOUT)
        except (subprocess.TimeoutExpired, OSError) as exc:
            print_warning(f"Failed to wait for {self} completion "
                          f"(timeout={format_duration(JOIN_TIMEOUT)}): "
                          f"{exc!r}")

    def wait_stopped(self, start_time):
        """Join this thread, giving up after JOIN_TIMEOUT from *start_time*.

        Log a message every second while the thread is still alive.
        """
        # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
        # which killed the process. Sometimes, killing the process from the
        # main thread does not interrupt popen.communicate() in
        # TestWorkerProcess thread. This loop with a timeout is a workaround
        # for that.
        #
        # Moreover, if this method fails to join the thread, it is likely
        # that Python will hang at exit while calling threading._shutdown()
        # which tries again to join the blocked thread. Regrtest.main()
        # uses EXIT_TIMEOUT to work around this second bug.
        while True:
            # Write a message every second
            self.join(1.0)
            if not self.is_alive():
                break
            dt = time.monotonic() - start_time
            self.regrtest.log(f"Waiting for {self} thread "
                              f"for {format_duration(dt)}")
            if dt > JOIN_TIMEOUT:
                print_warning(f"Failed to join {self} in {format_duration(dt)}")
                break


def get_running(workers):
    """Return "name (duration)" strings for long-running tests.

    Only tests that have been running for at least PROGRESS_MIN_TIME
    seconds are listed.
    """
    running = []
    for worker in workers:
        current_test_name = worker.current_test_name
        if not current_test_name:
            continue
        dt = time.monotonic() - worker.start_time
        if dt >= PROGRESS_MIN_TIME:
            text = '%s (%s)' % (current_test_name, format_duration(dt))
            running.append(text)
    return running


class MultiprocessTestRunner:
    """Run regrtest.tests in parallel using ns.use_mp worker threads."""

    def __init__(self, regrtest):
        self.regrtest = regrtest
        self.log = self.regrtest.log
        self.ns = regrtest.ns
        self.output = queue.Queue()
        self.pending = MultiprocessIterator(self.regrtest.tests)
        if self.ns.timeout is not None:
            # Per-process timeout in the parent: a 1.5x margin, presumably
            # so the child's own ns.timeout handling fires first.
            self.worker_timeout = self.ns.timeout * 1.5
        else:
            self.worker_timeout = None
        self.workers = None
        # Number of results processed so far; reset by run_tests()
        self.test_index = 0

    def start_workers(self):
        """Create and start ns.use_mp TestWorkerProcess threads."""
        self.workers = [TestWorkerProcess(index, self)
                        for index in range(1, self.ns.use_mp + 1)]
        self.log("Run tests in parallel using %s child processes"
                 % len(self.workers))
        for worker in self.workers:
            worker.start()

    def stop_workers(self):
        """Stop every worker, then wait for each thread to exit."""
        start_time = time.monotonic()
        for worker in self.workers:
            worker.stop()
        for worker in self.workers:
            worker.wait_stopped(start_time)

    def _get_result(self):
        """Wait for the next item posted by a worker thread.

        Return the ``(is_error, payload)`` item, or None once every worker
        thread has exited and no result is left in the queue.  While
        waiting, log the running tests every PROGRESS_UPDATE seconds.
        """
        use_faulthandler = (self.ns.timeout is not None)
        timeout = PROGRESS_UPDATE

        # bpo-46205: check the status of the workers on every iteration.
        # A worker can exit (e.g. no more pending tests) without posting
        # anything after we started waiting; checking only once would
        # then wait forever on an empty queue.
        while any(worker.is_alive() for worker in self.workers):
            if use_faulthandler:
                faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
                                                  exit=True)

            # wait for a thread
            try:
                return self.output.get(timeout=timeout)
            except queue.Empty:
                pass

            # display progress
            running = get_running(self.workers)
            if running and not self.ns.pgo:
                self.log('running: %s' % ', '.join(running))

        # all worker threads are done: consume pending results
        try:
            return self.output.get(timeout=0)
        except queue.Empty:
            return None

    def display_result(self, mp_result):
        """Display the progress line for *mp_result*."""
        result = mp_result.result

        text = format_test_result(result)
        if mp_result.error_msg is not None:
            # CHILD_ERROR
            text += ' (%s)' % mp_result.error_msg
        elif (result.test_time >= PROGRESS_MIN_TIME and not self.ns.pgo):
            text += ' (%s)' % format_duration(result.test_time)
        running = get_running(self.workers)
        if running and not self.ns.pgo:
            text += ' -- running: %s' % ', '.join(running)
        self.regrtest.display_progress(self.test_index, text)

    def _process_result(self, item):
        """Handle one item from the queue; return True to stop the run."""
        if item[0]:
            # Thread got an exception
            format_exc = item[1]
            print_warning(f"regrtest worker thread failed: {format_exc}")
            return True

        self.test_index += 1
        mp_result = item[1]
        self.regrtest.accumulate_result(mp_result.result)
        self.display_result(mp_result)

        if mp_result.stdout:
            print(mp_result.stdout, flush=True)
        if mp_result.stderr and not self.ns.pgo:
            print(mp_result.stderr, file=sys.stderr, flush=True)

        if must_stop(mp_result.result, self.ns):
            return True

        return False

    def run_tests(self):
        """Run all tests; always stop the worker processes before returning.

        KeyboardInterrupt is not propagated: it sets regrtest.interrupted.
        """
        self.start_workers()

        self.test_index = 0
        try:
            while True:
                item = self._get_result()
                if item is None:
                    break

                stop = self._process_result(item)
                if stop:
                    break
        except KeyboardInterrupt:
            print()
            self.regrtest.interrupted = True
        finally:
            if self.ns.timeout is not None:
                faulthandler.cancel_dump_traceback_later()

            # Always ensure that all worker processes are no longer
            # running when we exit this function
            self.pending.stop()
            self.stop_workers()


def run_tests_multiprocess(regrtest):
    """Run regrtest.tests in parallel child processes."""
    MultiprocessTestRunner(regrtest).run_tests()