"""Run regrtest tests in parallel, one child interpreter per test.

The main thread starts one TestWorkerProcess thread per worker.  Each thread
repeatedly takes a test name from a shared MultiprocessIterator, runs it in a
fresh child process (``python -m test.regrtest --worker-args ...``), decodes
the JSON result the child prints on its last stdout line, and puts it on a
queue that MultiprocessTestRunner consumes from the main thread.
"""

import collections
import faulthandler
import json
import os
import queue
import signal
import subprocess
import sys
import threading
import time
import traceback
from typing import NamedTuple, NoReturn, Literal, Any

from test import support
from test.support import os_helper

from test.libregrtest.cmdline import Namespace
from test.libregrtest.main import Regrtest
from test.libregrtest.runtest import (
    runtest, is_failed, TestResult, Interrupted, Timeout, ChildError, PROGRESS_MIN_TIME)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration, print_warning


# Display the running tests if nothing happened last N seconds
PROGRESS_UPDATE = 30.0  # seconds
assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME

# Kill the main process after 5 minutes. It is supposed to write an update
# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest
# buildbot workers.
MAIN_PROCESS_TIMEOUT = 5 * 60.0
assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE

# Time to wait until a worker completes: should be immediate
JOIN_TIMEOUT = 30.0  # seconds

# When available, each worker runs in its own session so that _kill() can
# send SIGKILL to the whole process group (the worker and its children).
USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))


def must_stop(result: TestResult, ns: Namespace) -> bool:
    """Return True if the whole test run must stop after *result*.

    That is the case on interruption, or on a failure when --failfast
    is enabled.
    """
    if isinstance(result, Interrupted):
        return True
    if ns.failfast and is_failed(result, ns):
        return True
    return False


def parse_worker_args(worker_args) -> tuple[Namespace, str]:
    """Decode the ``--worker-args`` JSON built by run_test_in_subprocess().

    Returns the (Namespace, test name) pair the worker should run.
    """
    ns_dict, test_name = json.loads(worker_args)
    ns = Namespace(**ns_dict)
    return (ns, test_name)


def run_test_in_subprocess(testname: str, ns: Namespace) -> subprocess.Popen:
    """Start a child interpreter running a single test.

    The options in *ns* and the test name are passed as JSON through
    ``--worker-args``.  stdout and stderr are captured as text pipes.
    """
    ns_dict = vars(ns)
    worker_args = (ns_dict, testname)
    worker_args = json.dumps(worker_args)

    cmd = [sys.executable, *support.args_from_interpreter_flags(),
           '-u',    # Unbuffered stdout and stderr
           '-m', 'test.regrtest',
           '--worker-args', worker_args]

    # Running the child from the same working directory as regrtest's original
    # invocation ensures that TEMPDIR for the child is the same when
    # sysconfig.is_python_build() is true. See issue 15300.
    return subprocess.Popen(cmd,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True,
                            close_fds=(os.name != 'nt'),
                            cwd=os_helper.SAVEDCWD,
                            # New session => own process group, see _kill()
                            start_new_session=USE_PROCESS_GROUP)


def run_tests_worker(ns: Namespace, test_name: str) -> NoReturn:
    """Entry point of a worker process: run one test, print it as JSON, exit.

    The JSON-encoded TestResult is printed as the last line of stdout, where
    TestWorkerProcess._runtest() expects to find it.
    """
    setup_tests(ns)

    result = runtest(ns, test_name)

    print()   # Force a newline (just in case)

    # Serialize TestResult as dict in JSON
    print(json.dumps(result, cls=EncodeTestResult), flush=True)
    sys.exit(0)


# We do not use a generator so multiple threads can call next().
class MultiprocessIterator:

    """A thread-safe iterator over tests for multiprocess mode."""

    def __init__(self, tests_iter):
        self.lock = threading.Lock()
        self.tests_iter = tests_iter

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            if self.tests_iter is None:
                raise StopIteration
            return next(self.tests_iter)

    def stop(self):
        """Make every subsequent next() call raise StopIteration."""
        with self.lock:
            self.tests_iter = None


class MultiprocessResult(NamedTuple):
    """Result of one test run in a worker, plus its captured output."""
    result: TestResult
    stdout: str
    stderr: str
    # None on success; otherwise a description of why the child failed
    error_msg: str | None


ExcStr = str
# Items put on the output queue by worker threads:
# (False, result) for a test result, (True, formatted traceback) when the
# worker thread itself crashed.
QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]


class ExitThread(Exception):
    """Raised inside a worker thread to leave run() quietly after stop()."""
    pass


class TestWorkerProcess(threading.Thread):
    """Thread that runs pending tests one by one, each in a child process."""

    def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
        super().__init__()
        self.worker_id = worker_id
        self.pending = runner.pending
        self.output = runner.output
        self.ns = runner.ns
        self.timeout = runner.worker_timeout
        self.regrtest = runner.regrtest
        self.current_test_name = None
        self.start_time = None
        self._popen = None
        self._killed = False
        self._stopped = False

    def __repr__(self) -> str:
        info = [f'TestWorkerProcess #{self.worker_id}']
        if self.is_alive():
            info.append("running")
        else:
            info.append('stopped')
        test = self.current_test_name
        if test:
            info.append(f'test={test}')
        # Read self._popen only once: the worker thread resets it to None
        # when a test completes, possibly while another thread formats
        # this repr.
        popen = self._popen
        if popen is not None:
            dt = time.monotonic() - self.start_time
            info.extend((f'pid={popen.pid}',
                         f'time={format_duration(dt)}'))
        return '<%s>' % ' '.join(info)

    def _kill(self) -> None:
        """Kill the running child (or its process group), at most once per test."""
        popen = self._popen
        if popen is None:
            return

        if self._killed:
            return
        self._killed = True

        if USE_PROCESS_GROUP:
            what = f"{self} process group"
        else:
            what = f"{self}"

        print(f"Kill {what}", file=sys.stderr, flush=True)
        try:
            if USE_PROCESS_GROUP:
                os.killpg(popen.pid, signal.SIGKILL)
            else:
                popen.kill()
        except ProcessLookupError:
            # popen.kill(): the process completed, the TestWorkerProcess thread
            # read its exit status, but Popen.send_signal() read the returncode
            # just before Popen.wait() set returncode.
            pass
        except OSError as exc:
            print_warning(f"Failed to kill {what}: {exc!r}")

    def stop(self) -> None:
        """Ask the thread to stop and kill its current child process.

        Called from a different thread than this one.
        """
        self._stopped = True
        self._kill()

    def mp_result_error(
        self,
        test_result: TestResult,
        stdout: str = '',
        stderr: str = '',
        err_msg: str | None = None
    ) -> MultiprocessResult:
        """Wrap an error *test_result*, setting its duration from start_time."""
        test_result.duration_sec = time.monotonic() - self.start_time
        return MultiprocessResult(test_result, stdout, stderr, err_msg)

    def _run_process(self, test_name: str) -> tuple[int | None, str, str]:
        """Run *test_name* in a child and return (retcode, stdout, stderr).

        retcode is None if the child timed out (it is then killed and its
        output is discarded).  Raises ExitThread if stop() was called.
        """
        self.start_time = time.monotonic()

        self.current_test_name = test_name
        try:
            popen = run_test_in_subprocess(test_name, self.ns)

            self._killed = False
            self._popen = popen
        except BaseException:
            self.current_test_name = None
            raise

        try:
            if self._stopped:
                # If stop() has been called before self._popen was set,
                # the child is still running. Call _kill() again
                # to ensure that the process is killed.
                self._kill()
                raise ExitThread

            try:
                stdout, stderr = popen.communicate(timeout=self.timeout)
                retcode = popen.returncode
                assert retcode is not None
            except subprocess.TimeoutExpired:
                if self._stopped:
                    # stop() has been called: communicate() fails
                    # on reading closed stdout/stderr
                    raise ExitThread

                # On timeout, kill the process
                self._kill()

                # None means TIMEOUT for the caller
                retcode = None
                # bpo-38207: Don't attempt to call communicate() again: it
                # can hang until all child processes using the stdout and
                # stderr pipes complete.
                stdout = stderr = ''
            except OSError:
                if self._stopped:
                    # stop() has been called: communicate() fails
                    # on reading closed stdout/stderr
                    raise ExitThread
                raise
            else:
                stdout = stdout.strip()
                stderr = stderr.rstrip()

            return (retcode, stdout, stderr)
        except BaseException:
            # Never leave a child running behind an exception
            self._kill()
            raise
        finally:
            self._wait_completed()
            self._popen = None
            self.current_test_name = None

    def _runtest(self, test_name: str) -> MultiprocessResult:
        """Run *test_name* in a child and turn its output into a result.

        A timeout yields Timeout; a non-zero exit code or unparsable output
        yields ChildError with error_msg describing the problem.
        """
        retcode, stdout, stderr = self._run_process(test_name)

        if retcode is None:
            return self.mp_result_error(Timeout(test_name), stdout, stderr)

        err_msg = None
        if retcode != 0:
            err_msg = "Exit code %s" % retcode
        else:
            # The JSON result is the last line printed by run_tests_worker()
            stdout, _, result = stdout.rpartition("\n")
            stdout = stdout.rstrip()
            if not result:
                err_msg = "Failed to parse worker stdout"
            else:
                try:
                    # deserialize run_tests_worker() output
                    result = json.loads(result, object_hook=decode_test_result)
                except Exception as exc:
                    err_msg = "Failed to parse worker JSON: %s" % exc
                else:
                    # Valid JSON is not necessarily a TestResult (e.g. "null"
                    # or a dict without the class marker).
                    if not isinstance(result, TestResult):
                        err_msg = ("Failed to parse worker JSON: expected "
                                   f"a TestResult, got {type(result).__name__}")

        if err_msg is not None:
            return self.mp_result_error(ChildError(test_name),
                                        stdout, stderr, err_msg)

        return MultiprocessResult(result, stdout, stderr, err_msg)

    def run(self) -> None:
        """Thread body: run pending tests until exhausted, stopped or failed."""
        while not self._stopped:
            try:
                try:
                    test_name = next(self.pending)
                except StopIteration:
                    break

                mp_result = self._runtest(test_name)
                self.output.put((False, mp_result))

                if must_stop(mp_result.result, self.ns):
                    break
            except ExitThread:
                break
            except BaseException:
                # Report the crash to the main thread instead of dying silently
                self.output.put((True, traceback.format_exc()))
                break

    def _wait_completed(self) -> None:
        """Close the child's pipes and wait up to JOIN_TIMEOUT for it to exit."""
        popen = self._popen

        # stdout and stderr must be closed to ensure that communicate()
        # does not hang
        popen.stdout.close()
        popen.stderr.close()

        try:
            popen.wait(JOIN_TIMEOUT)
        except (subprocess.TimeoutExpired, OSError) as exc:
            print_warning(f"Failed to wait for {self} completion "
                          f"(timeout={format_duration(JOIN_TIMEOUT)}): "
                          f"{exc!r}")

    def wait_stopped(self, start_time: float) -> None:
        """Join the thread, logging every second, giving up after JOIN_TIMEOUT."""
        # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
        # which killed the process. Sometimes, killing the process from the
        # main thread does not interrupt popen.communicate() in
        # TestWorkerProcess thread. This loop with a timeout is a workaround
        # for that.
        #
        # Moreover, if this method fails to join the thread, it is likely
        # that Python will hang at exit while calling threading._shutdown()
        # which tries again to join the blocked thread. Regrtest.main()
        # uses EXIT_TIMEOUT to workaround this second bug.
        while True:
            # Write a message every second
            self.join(1.0)
            if not self.is_alive():
                break
            dt = time.monotonic() - start_time
            self.regrtest.log(f"Waiting for {self} thread "
                              f"for {format_duration(dt)}")
            if dt > JOIN_TIMEOUT:
                print_warning(f"Failed to join {self} in {format_duration(dt)}")
                break


def get_running(workers: list[TestWorkerProcess]) -> list[str]:
    """Describe tests running for at least PROGRESS_MIN_TIME.

    Returns one "name (duration)" string per such worker.
    """
    running = []
    for worker in workers:
        current_test_name = worker.current_test_name
        if not current_test_name:
            continue
        dt = time.monotonic() - worker.start_time
        if dt >= PROGRESS_MIN_TIME:
            text = '%s (%s)' % (current_test_name, format_duration(dt))
            running.append(text)
    return running


class MultiprocessTestRunner:
    """Run regrtest's tests using ns.use_mp TestWorkerProcess threads."""

    def __init__(self, regrtest: Regrtest) -> None:
        self.regrtest = regrtest
        self.log = self.regrtest.log
        self.ns = regrtest.ns
        self.output: queue.Queue[QueueOutput] = queue.Queue()
        self.pending = MultiprocessIterator(self.regrtest.tests)
        if self.ns.timeout is not None:
            # Rely on faulthandler to kill a worker process. This timeout is
            # used when faulthandler fails to kill a worker process. Give a
            # maximum of 5 minutes to faulthandler to kill the worker.
            self.worker_timeout = min(self.ns.timeout * 1.5,
                                      self.ns.timeout + 5 * 60)
        else:
            self.worker_timeout = None
        self.workers: list[TestWorkerProcess] | None = None

    def start_workers(self) -> None:
        """Create and start ns.use_mp worker threads (ids start at 1)."""
        self.workers = [TestWorkerProcess(index, self)
                        for index in range(1, self.ns.use_mp + 1)]
        msg = f"Run tests in parallel using {len(self.workers)} child processes"
        if self.ns.timeout:
            msg += (" (timeout: %s, worker timeout: %s)"
                    % (format_duration(self.ns.timeout),
                       format_duration(self.worker_timeout)))
        self.log(msg)
        for worker in self.workers:
            worker.start()

    def stop_workers(self) -> None:
        """Stop all workers first, then wait for each of them."""
        start_time = time.monotonic()
        for worker in self.workers:
            worker.stop()
        for worker in self.workers:
            worker.wait_stopped(start_time)

    def _get_result(self) -> QueueOutput | None:
        """Wait for the next queue item; return None once all workers are done."""
        use_faulthandler = (self.ns.timeout is not None)
        timeout = PROGRESS_UPDATE

        # bpo-46205: check the status of workers every iteration to avoid
        # waiting forever on an empty queue.
        while any(worker.is_alive() for worker in self.workers):
            if use_faulthandler:
                # Re-armed on every iteration: dump tracebacks and exit if
                # the main process stays stuck for MAIN_PROCESS_TIMEOUT.
                faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
                                                  exit=True)

            # wait for a thread
            try:
                return self.output.get(timeout=timeout)
            except queue.Empty:
                pass

            # display progress
            running = get_running(self.workers)
            if running and not self.ns.pgo:
                self.log('running: %s' % ', '.join(running))

        # all worker threads are done: consume pending results
        try:
            return self.output.get(timeout=0)
        except queue.Empty:
            return None

    def display_result(self, mp_result: MultiprocessResult) -> None:
        """Display one progress line for *mp_result*."""
        result = mp_result.result

        text = str(result)
        if mp_result.error_msg is not None:
            # CHILD_ERROR
            text += ' (%s)' % mp_result.error_msg
        elif (result.duration_sec >= PROGRESS_MIN_TIME and not self.ns.pgo):
            text += ' (%s)' % format_duration(result.duration_sec)
        running = get_running(self.workers)
        if running and not self.ns.pgo:
            text += ' -- running: %s' % ', '.join(running)
        self.regrtest.display_progress(self.test_index, text)

    def _process_result(self, item: QueueOutput) -> bool:
        """Returns True if test runner must stop."""
        if item[0]:
            # Thread got an exception
            format_exc = item[1]
            print_warning(f"regrtest worker thread failed: {format_exc}")
            return True

        self.test_index += 1
        mp_result = item[1]
        self.regrtest.accumulate_result(mp_result.result)
        self.display_result(mp_result)

        if mp_result.stdout:
            print(mp_result.stdout, flush=True)
        if mp_result.stderr and not self.ns.pgo:
            print(mp_result.stderr, file=sys.stderr, flush=True)

        if must_stop(mp_result.result, self.ns):
            return True

        return False

    def run_tests(self) -> None:
        """Start the workers, process their results, then stop them all."""
        self.start_workers()

        self.test_index = 0
        try:
            while True:
                item = self._get_result()
                if item is None:
                    break

                stop = self._process_result(item)
                if stop:
                    break
        except KeyboardInterrupt:
            print()
            self.regrtest.interrupted = True
        finally:
            if self.ns.timeout is not None:
                faulthandler.cancel_dump_traceback_later()

            # Always ensure that all worker processes are no longer
            # running when we exit this function
            self.pending.stop()
            self.stop_workers()


def run_tests_multiprocess(regrtest: Regrtest) -> None:
    """Run regrtest.tests in parallel worker processes."""
    MultiprocessTestRunner(regrtest).run_tests()


class EncodeTestResult(json.JSONEncoder):
    """Encode a TestResult (sub)class object into a JSON dict."""

    def default(self, o: Any) -> dict[str, Any]:
        if isinstance(o, TestResult):
            # Copy: vars() returns the object's own __dict__, and adding the
            # class marker to it would mutate the TestResult being encoded.
            result = dict(vars(o))
            result["__test_result__"] = o.__class__.__name__
            return result

        return super().default(o)


def decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]:
    """Decode a TestResult (sub)class object from a JSON dict.

    Dicts without the "__test_result__" marker are returned unchanged.
    Raises ValueError if the marker names no known TestResult class.
    """

    if "__test_result__" not in d:
        return d

    cls_name = d.pop("__test_result__")
    for cls in get_all_test_result_classes():
        if cls.__name__ == cls_name:
            return cls(**d)

    # Previously this fell through and returned None, which was then used
    # as a test result; fail loudly so the caller reports a ChildError.
    raise ValueError(f"unknown TestResult class: {cls_name!r}")


def get_all_test_result_classes() -> set[type[TestResult]]:
    """Return TestResult and all of its direct and indirect subclasses."""
    prev_count = 0
    classes = {TestResult}
    # Keep adding subclasses until a pass finds nothing new
    while len(classes) > prev_count:
        prev_count = len(classes)
        to_add = []
        for cls in classes:
            to_add.extend(cls.__subclasses__())
        classes.update(to_add)
    return classes