• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import collections
2import faulthandler
3import json
4import os
5import queue
6import signal
7import subprocess
8import sys
9import threading
10import time
11import traceback
12from typing import NamedTuple, NoReturn, Literal, Any
13
14from test import support
15from test.support import os_helper
16
17from test.libregrtest.cmdline import Namespace
18from test.libregrtest.main import Regrtest
19from test.libregrtest.runtest import (
20    runtest, is_failed, TestResult, Interrupted, Timeout, ChildError, PROGRESS_MIN_TIME)
21from test.libregrtest.setup import setup_tests
22from test.libregrtest.utils import format_duration, print_warning
23
24
# Display the running tests if nothing happened during the last N seconds.
# This is the timeout used when waiting for a result on the output queue.
PROGRESS_UPDATE = 30.0   # seconds
assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME

# Kill the main process after 5 minutes. It is supposed to write an update
# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python's slowest
# buildbot workers. Passed to faulthandler.dump_traceback_later(exit=True).
MAIN_PROCESS_TIMEOUT = 5 * 60.0
assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE

# Time to wait until a worker completes: should be immediate.
# Used both when waiting for a child process and when joining its thread.
JOIN_TIMEOUT = 30.0   # seconds

# True when the platform provides both os.setsid() and os.killpg(): workers
# are then started in a new session and killed with os.killpg().
USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
39
40
def must_stop(result: TestResult, ns: Namespace) -> bool:
    """Tell whether the test run must be aborted after *result*.

    The run stops when the test was interrupted, or when ns.failfast is set
    and is_failed() reports the result as a failure.
    """
    return (isinstance(result, Interrupted)
            or bool(ns.failfast and is_failed(result, ns)))
47
48
def parse_worker_args(worker_args) -> tuple[Namespace, str]:
    """Decode the JSON document built by run_test_in_subprocess().

    The document is a two-item array: the namespace attributes as a dict,
    then the test name. Return them as (Namespace, test_name).
    """
    decoded = json.loads(worker_args)
    ns_attrs, test_name = decoded
    return (Namespace(**ns_attrs), test_name)
53
54
def run_test_in_subprocess(testname: str, ns: Namespace) -> subprocess.Popen:
    """Start a child interpreter running *testname* and return its Popen.

    The namespace attributes and the test name are serialized to JSON and
    passed to the child through the --worker-args command line option.
    stdout and stderr of the child are captured through pipes in text mode.
    """
    worker_args = json.dumps((vars(ns), testname))

    cmd = [
        sys.executable,
        *support.args_from_interpreter_flags(),
        '-u',    # Unbuffered stdout and stderr
        '-m', 'test.regrtest',
        '--worker-args', worker_args,
    ]

    popen_kwargs = {
        'stdout': subprocess.PIPE,
        'stderr': subprocess.PIPE,
        'universal_newlines': True,
        'close_fds': (os.name != 'nt'),
        # Running the child from the same working directory as regrtest's
        # original invocation ensures that TEMPDIR for the child is the same
        # when sysconfig.is_python_build() is true. See issue 15300.
        'cwd': os_helper.SAVEDCWD,
    }
    if USE_PROCESS_GROUP:
        # Give the child its own session so that the whole process group
        # can be killed at once.
        popen_kwargs['start_new_session'] = True
    return subprocess.Popen(cmd, **popen_kwargs)
78
79
def run_tests_worker(ns: Namespace, test_name: str) -> NoReturn:
    """Run a single test in this process and report the result on stdout.

    The TestResult is written as one JSON line, the last line of stdout,
    and the process then exits with status 0.
    """
    setup_tests(ns)
    test_result = runtest(ns, test_name)

    # Force a newline (just in case) so the JSON starts on its own line
    print()

    # Serialize TestResult as dict in JSON
    payload = json.dumps(test_result, cls=EncodeTestResult)
    print(payload, flush=True)
    sys.exit(0)
90
91
# We do not use a generator so multiple threads can call next().
class MultiprocessIterator:

    """A thread-safe iterator over tests for multiprocess mode.

    Every call to next() is serialized by a lock so several worker threads
    can share one instance. After stop() has been called, next() raises
    StopIteration even if the underlying iterator is not exhausted.
    """

    def __init__(self, tests_iter):
        """Wrap *tests_iter*, which may be any iterable or iterator."""
        self.lock = threading.Lock()
        # iter() returns an iterator unchanged, and lets callers pass a
        # plain iterable (list, tuple, ...) as well.
        self.tests_iter = iter(tests_iter)

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            if self.tests_iter is None:
                # stop() has been called
                raise StopIteration
            return next(self.tests_iter)

    def stop(self):
        """Make every later next() call raise StopIteration."""
        with self.lock:
            self.tests_iter = None
113
114
class MultiprocessResult(NamedTuple):
    """Outcome of one test run in a worker process."""
    # Result object of the test (decoded from the worker, or built locally
    # for a timeout or a child error)
    result: TestResult
    # Captured standard output of the worker, without the JSON result line
    stdout: str
    # Captured standard error of the worker
    stderr: str
    # Description of a worker failure, or None if the worker succeeded
    error_msg: str | None
120
121
# Formatted traceback of an exception raised in a worker thread
ExcStr = str
# Item put on the output queue by a worker thread: (False, result) for a
# completed test, (True, traceback text) if the thread itself failed.
QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
124
125
class ExitThread(Exception):
    """Raised inside a worker thread to leave its loop once it was stopped."""
128
129
class TestWorkerProcess(threading.Thread):
    """Thread running pending tests one at a time, each in a child process.

    The thread takes test names from the runner's shared iterator, runs
    each of them with run_test_in_subprocess() and puts the outcome on the
    runner's output queue: (False, MultiprocessResult) for a test which
    was run, or (True, traceback text) if the thread itself failed.
    """

    def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
        super().__init__()
        self.worker_id = worker_id
        self.pending = runner.pending
        self.output = runner.output
        self.ns = runner.ns
        self.timeout = runner.worker_timeout
        self.regrtest = runner.regrtest
        # Name of the test being run, None when no child process is active
        self.current_test_name = None
        # time.monotonic() value taken when the last child was started
        self.start_time = None
        self._popen = None
        # True once _kill() has been attempted on the current child
        self._killed = False
        # True once stop() has been called
        self._stopped = False

    def __repr__(self) -> str:
        info = [f'TestWorkerProcess #{self.worker_id}']
        if self.is_alive():
            info.append("running")
        else:
            info.append('stopped')
        test = self.current_test_name
        if test:
            info.append(f'test={test}')
        # Only use the local snapshot below: this method is also called
        # from other threads, and the worker thread can reset self._popen
        # to None at any time.
        popen = self._popen
        if popen is not None:
            dt = time.monotonic() - self.start_time
            info.extend((f'pid={popen.pid}',
                         f'time={format_duration(dt)}'))
        return '<%s>' % ' '.join(info)

    def _kill(self) -> None:
        """Kill the current child process (or its process group), once."""
        popen = self._popen
        if popen is None:
            return

        if self._killed:
            return
        self._killed = True

        if USE_PROCESS_GROUP:
            what = f"{self} process group"
        else:
            what = f"{self}"

        print(f"Kill {what}", file=sys.stderr, flush=True)
        try:
            if USE_PROCESS_GROUP:
                os.killpg(popen.pid, signal.SIGKILL)
            else:
                popen.kill()
        except ProcessLookupError:
            # popen.kill(): the process completed, the TestWorkerProcess thread
            # read its exit status, but Popen.send_signal() read the returncode
            # just before Popen.wait() set returncode.
            pass
        except OSError as exc:
            print_warning(f"Failed to kill {what}: {exc!r}")

    def stop(self) -> None:
        # Method called from a different thread to stop this thread
        self._stopped = True
        self._kill()

    def mp_result_error(
        self,
        test_result: TestResult,
        stdout: str = '',
        stderr: str = '',
        err_msg: str | None = None
    ) -> MultiprocessResult:
        """Build a MultiprocessResult for a test which did not complete.

        The duration of *test_result* is set to the time elapsed since the
        child process was started.
        """
        test_result.duration_sec = time.monotonic() - self.start_time
        return MultiprocessResult(test_result, stdout, stderr, err_msg)

    def _run_process(self, test_name: str) -> tuple[int, str, str]:
        """Run *test_name* in a child process and wait for its completion.

        Return (retcode, stdout, stderr). retcode is None if the child was
        killed because it exceeded the worker timeout; stdout and stderr are
        then empty. Raise ExitThread if stop() has been called.
        """
        self.start_time = time.monotonic()

        self.current_test_name = test_name
        try:
            popen = run_test_in_subprocess(test_name, self.ns)

            self._killed = False
            self._popen = popen
        except:
            self.current_test_name = None
            raise

        try:
            if self._stopped:
                # If kill() has been called before self._popen is set,
                # self._popen is still running. Call again kill()
                # to ensure that the process is killed.
                self._kill()
                raise ExitThread

            try:
                stdout, stderr = popen.communicate(timeout=self.timeout)
                retcode = popen.returncode
                assert retcode is not None
            except subprocess.TimeoutExpired:
                if self._stopped:
                    # kill() has been called: communicate() fails
                    # on reading closed stdout/stderr
                    raise ExitThread

                # On timeout, kill the process
                self._kill()

                # None means TIMEOUT for the caller
                retcode = None
                # bpo-38207: Don't attempt to call communicate() again: it
                # can hang until all child processes using the stdout and
                # stderr pipes complete.
                stdout = stderr = ''
            except OSError:
                if self._stopped:
                    # kill() has been called: communicate() fails
                    # on reading closed stdout/stderr
                    raise ExitThread
                raise
            else:
                stdout = stdout.strip()
                stderr = stderr.rstrip()

            return (retcode, stdout, stderr)
        except:
            # Whatever went wrong, do not leave the child running
            self._kill()
            raise
        finally:
            self._wait_completed()
            self._popen = None
            self.current_test_name = None

    def _runtest(self, test_name: str) -> MultiprocessResult:
        """Run *test_name* in a child process and decode its result.

        A timeout yields a Timeout result. A non-zero exit code, or a last
        stdout line which cannot be decoded into a TestResult, yields a
        ChildError result with an error message.
        """
        retcode, stdout, stderr = self._run_process(test_name)

        if retcode is None:
            return self.mp_result_error(Timeout(test_name), stdout, stderr)

        err_msg = None
        if retcode != 0:
            err_msg = "Exit code %s" % retcode
        else:
            # The worker writes the JSON result as the last line of stdout
            stdout, _, result = stdout.rpartition("\n")
            stdout = stdout.rstrip()
            if not result:
                err_msg = "Failed to parse worker stdout"
            else:
                try:
                    # deserialize run_tests_worker() output
                    result = json.loads(result, object_hook=decode_test_result)
                except Exception as exc:
                    err_msg = "Failed to parse worker JSON: %s" % exc
                else:
                    # Valid JSON which is not a serialized TestResult (a
                    # number, a plain dict, ...) must not reach the main
                    # thread as if it were a test result.
                    if not isinstance(result, TestResult):
                        err_msg = ("Failed to parse worker JSON: "
                                   "expected a TestResult, got %s"
                                   % type(result).__name__)

        if err_msg is not None:
            return self.mp_result_error(ChildError(test_name),
                                        stdout, stderr, err_msg)

        return MultiprocessResult(result, stdout, stderr, err_msg)

    def run(self) -> None:
        """Thread body: run pending tests until exhausted or stopped."""
        while not self._stopped:
            try:
                try:
                    test_name = next(self.pending)
                except StopIteration:
                    break

                mp_result = self._runtest(test_name)
                self.output.put((False, mp_result))

                if must_stop(mp_result.result, self.ns):
                    break
            except ExitThread:
                break
            except BaseException:
                # Report the failure of the thread itself to the main thread
                self.output.put((True, traceback.format_exc()))
                break

    def _wait_completed(self) -> None:
        """Close the pipes of the child and wait until it exits."""
        popen = self._popen

        # stdout and stderr must be closed to ensure that communicate()
        # does not hang
        popen.stdout.close()
        popen.stderr.close()

        try:
            popen.wait(JOIN_TIMEOUT)
        except (subprocess.TimeoutExpired, OSError) as exc:
            print_warning(f"Failed to wait for {self} completion "
                          f"(timeout={format_duration(JOIN_TIMEOUT)}): "
                          f"{exc!r}")

    def wait_stopped(self, start_time: float) -> None:
        """Join this thread, giving up JOIN_TIMEOUT seconds after *start_time*."""
        # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
        # which killed the process. Sometimes, killing the process from the
        # main thread does not interrupt popen.communicate() in
        # TestWorkerProcess thread. This loop with a timeout is a workaround
        # for that.
        #
        # Moreover, if this method fails to join the thread, it is likely
        # that Python will hang at exit while calling threading._shutdown()
        # which tries again to join the blocked thread. Regrtest.main()
        # uses EXIT_TIMEOUT to workaround this second bug.
        while True:
            # Write a message every second
            self.join(1.0)
            if not self.is_alive():
                break
            dt = time.monotonic() - start_time
            self.regrtest.log(f"Waiting for {self} thread "
                              f"for {format_duration(dt)}")
            if dt > JOIN_TIMEOUT:
                print_warning(f"Failed to join {self} in {format_duration(dt)}")
                break
346
347
def get_running(workers: list[TestWorkerProcess]) -> list[str]:
    """Describe the tests which have been running for a while.

    Return one "test_name (duration)" string for each worker which is
    currently running a test started at least PROGRESS_MIN_TIME seconds ago.
    """
    running = []
    for worker in workers:
        # Snapshot the name: the worker thread resets it when a test ends
        current_test_name = worker.current_test_name
        if not current_test_name:
            continue
        dt = time.monotonic() - worker.start_time
        if dt >= PROGRESS_MIN_TIME:
            running.append(f'{current_test_name} ({format_duration(dt)})')
    return running
359
360
class MultiprocessTestRunner:
    """Run the tests of a Regrtest instance using worker threads.

    Each TestWorkerProcess thread runs tests in child processes and reports
    on the output queue; this class consumes the queue, accumulates and
    displays the results.
    """

    def __init__(self, regrtest: Regrtest) -> None:
        self.regrtest = regrtest
        self.log = self.regrtest.log
        self.ns = regrtest.ns
        self.output: queue.Queue[QueueOutput] = queue.Queue()
        self.pending = MultiprocessIterator(self.regrtest.tests)
        test_timeout = self.ns.timeout
        if test_timeout is None:
            self.worker_timeout = None
        else:
            # Rely on faulthandler to kill a worker process. This timeout
            # applies when faulthandler fails to kill a worker process. Give
            # a maximum of 5 minutes to faulthandler to kill the worker.
            self.worker_timeout = min(test_timeout * 1.5,
                                      test_timeout + 5 * 60)
        self.workers = None

    def start_workers(self) -> None:
        """Create and start ns.use_mp worker threads, numbered from 1."""
        self.workers = [TestWorkerProcess(worker_id, self)
                        for worker_id in range(1, self.ns.use_mp + 1)]
        nworkers = len(self.workers)
        msg = f"Run tests in parallel using {nworkers} child processes"
        if self.ns.timeout:
            test_timeout = format_duration(self.ns.timeout)
            worker_timeout = format_duration(self.worker_timeout)
            msg += (f" (timeout: {test_timeout}, "
                    f"worker timeout: {worker_timeout})")
        self.log(msg)
        for worker in self.workers:
            worker.start()

    def stop_workers(self) -> None:
        """Ask every worker to stop, then wait for all of them."""
        began = time.monotonic()
        # Signal all workers first so that they shut down concurrently
        for worker in self.workers:
            worker.stop()
        for worker in self.workers:
            worker.wait_stopped(began)

    def _get_result(self) -> QueueOutput | None:
        """Return the next queue item, or None when all workers are done."""
        watchdog = (self.ns.timeout is not None)

        # bpo-46205: check the status of workers every iteration to avoid
        # waiting forever on an empty queue.
        while any(worker.is_alive() for worker in self.workers):
            if watchdog:
                # (Re)arm the watchdog which kills the main process
                faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
                                                  exit=True)

            # wait for a thread
            try:
                return self.output.get(timeout=PROGRESS_UPDATE)
            except queue.Empty:
                pass

            # nothing happened: display progress
            running = get_running(self.workers)
            if running and not self.ns.pgo:
                self.log(f"running: {', '.join(running)}")

        # all worker threads are done: consume pending results
        try:
            return self.output.get(timeout=0)
        except queue.Empty:
            return None

    def display_result(self, mp_result: MultiprocessResult) -> None:
        """Display one progress line for a completed test."""
        result = mp_result.result

        pieces = [str(result)]
        if mp_result.error_msg is not None:
            # CHILD_ERROR
            pieces.append(f' ({mp_result.error_msg})')
        elif result.duration_sec >= PROGRESS_MIN_TIME and not self.ns.pgo:
            pieces.append(f' ({format_duration(result.duration_sec)})')
        running = get_running(self.workers)
        if running and not self.ns.pgo:
            pieces.append(f" -- running: {', '.join(running)}")
        self.regrtest.display_progress(self.test_index, ''.join(pieces))

    def _process_result(self, item: QueueOutput) -> bool:
        """Returns True if test runner must stop."""
        thread_failed, payload = item
        if thread_failed:
            # Thread got an exception: payload is the formatted traceback
            print_warning(f"regrtest worker thread failed: {payload}")
            return True

        # payload is a MultiprocessResult
        self.test_index += 1
        self.regrtest.accumulate_result(payload.result)
        self.display_result(payload)

        if payload.stdout:
            print(payload.stdout, flush=True)
        if payload.stderr and not self.ns.pgo:
            print(payload.stderr, file=sys.stderr, flush=True)

        return must_stop(payload.result, self.ns)

    def run_tests(self) -> None:
        """Start the workers and process their results until done."""
        self.start_workers()

        self.test_index = 0
        try:
            while (item := self._get_result()) is not None:
                if self._process_result(item):
                    break
        except KeyboardInterrupt:
            print()
            self.regrtest.interrupted = True
        finally:
            if self.ns.timeout is not None:
                faulthandler.cancel_dump_traceback_later()

            # Always ensure that all worker processes are no longer
            # running when we exit this function
            self.pending.stop()
            self.stop_workers()
486
487
def run_tests_multiprocess(regrtest: Regrtest) -> None:
    """Run the tests of *regrtest* with a MultiprocessTestRunner."""
    runner = MultiprocessTestRunner(regrtest)
    runner.run_tests()
490
491
class EncodeTestResult(json.JSONEncoder):
    """Encode a TestResult (sub)class object into a JSON dict."""

    def default(self, o: Any) -> dict[str, Any]:
        """Return the attributes of a TestResult plus a class-name tag.

        The class name is stored under the "__test_result__" key. Any other
        object is passed to the base class, which raises TypeError.
        """
        if isinstance(o, TestResult):
            # Copy the attributes: vars(o) is the instance's own __dict__,
            # so adding the tag in place would set it on the object itself.
            result = dict(vars(o))
            result["__test_result__"] = o.__class__.__name__
            return result

        return super().default(o)
502
503
def decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]:
    """Decode a TestResult (sub)class object from a JSON dict.

    Intended as a json object_hook: a dict without the "__test_result__"
    key is returned unchanged. Otherwise the key is removed and the
    remaining items are passed as keyword arguments to the class with
    that name.

    Raise ValueError if no TestResult (sub)class has the recorded name.
    """

    if "__test_result__" not in d:
        return d

    cls_name = d.pop("__test_result__")
    for cls in get_all_test_result_classes():
        if cls.__name__ == cls_name:
            return cls(**d)

    # Do not fall through and return None: the caller expects a result
    raise ValueError(f"unknown test result class: {cls_name!r}")
514
515
def get_all_test_result_classes() -> set[type[TestResult]]:
    """Return TestResult together with all its direct and indirect subclasses."""
    found = {TestResult}
    todo = [TestResult]
    # Walk the subclass tree, visiting each class once
    while todo:
        for subclass in todo.pop().__subclasses__():
            if subclass not in found:
                found.add(subclass)
                todo.append(subclass)
    return found
526