• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import collections
2import faulthandler
3import json
4import os
5import queue
6import subprocess
7import sys
8import threading
9import time
10import traceback
11import types
12from test import support
13
14from test.libregrtest.runtest import (
15    runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
16    format_test_result, TestResult, is_failed, TIMEOUT)
17from test.libregrtest.setup import setup_tests
18from test.libregrtest.utils import format_duration, print_warning
19
20
# If no result arrives for this many seconds, the main process displays the
# tests which are still running.
PROGRESS_UPDATE = 30.0   # seconds

# The main process is supposed to write an update every PROGRESS_UPDATE
# seconds: it is killed if it stays stuck for longer than this timeout.
# 5 minutes are tolerated for Python's slowest buildbot workers.
MAIN_PROCESS_TIMEOUT = 5 * 60.0   # seconds

# Maximum time to wait for a worker to complete: it should be immediate.
JOIN_TIMEOUT = 30.0   # seconds

# Sanity check: the delays must be ordered consistently.
assert PROGRESS_MIN_TIME <= PROGRESS_UPDATE <= MAIN_PROCESS_TIMEOUT
33
34
def must_stop(result, ns):
    """Tell whether the test run must stop after *result*.

    Return True when the test was interrupted, or when ``ns.failfast`` is
    set and is_failed() considers the result as a failure. Return False
    otherwise.
    """
    interrupted = (result.result == INTERRUPTED)
    return bool(interrupted or (ns.failfast and is_failed(result, ns)))
41
42
def parse_worker_args(worker_args):
    """Decode the JSON string built by run_test_in_subprocess().

    *worker_args* is a JSON array of two items: the namespace attributes
    (a mapping) and the test name. Return a ``(ns, test_name)`` tuple where
    *ns* is a types.SimpleNamespace built from the mapping.
    """
    attributes, name = json.loads(worker_args)
    namespace = types.SimpleNamespace(**attributes)
    return namespace, name
47
48
def run_test_in_subprocess(testname, ns):
    """Start a child Python process running the single test *testname*.

    The attributes of *ns* and the test name are passed to the child as a
    JSON document through the ``--worker-args`` command line option.
    Return the subprocess.Popen object; stdout and stderr are text pipes.
    """
    serialized_args = json.dumps((vars(ns), testname))

    cmd = [sys.executable]
    cmd.extend(support.args_from_interpreter_flags())
    cmd.append('-u')    # Unbuffered stdout and stderr
    cmd.extend(('-m', 'test.regrtest'))
    cmd.extend(('--worker-args', serialized_args))

    # The child runs from the working directory of regrtest's original
    # invocation, so that TEMPDIR is the same in the child when
    # sysconfig.is_python_build() is true. See issue 15300.
    popen_options = {
        'stdout': subprocess.PIPE,
        'stderr': subprocess.PIPE,
        'universal_newlines': True,
        'close_fds': (os.name != 'nt'),
        'cwd': support.SAVEDCWD,
    }
    return subprocess.Popen(cmd, **popen_options)
68
69
def run_tests_worker(ns, test_name):
    """Run *test_name* in the current (worker) process, then exit.

    The test result is written to stdout as a JSON list on its own last
    line, and the process exits with code 0.
    """
    setup_tests(ns)
    test_result = runtest(ns, test_name)

    # Emit an empty line first (just in case) so that the serialized result
    # starts at the beginning of a line.
    print()

    # The TestResult is serialized as a JSON list
    serialized = json.dumps(list(test_result))
    print(serialized, flush=True)
    sys.exit(0)
80
81
# A generator cannot be used here: next() is called by multiple threads.
class MultiprocessIterator:

    """A thread-safe iterator over tests for multiprocess mode."""

    def __init__(self, tests_iter):
        self.lock = threading.Lock()
        self.tests_iter = tests_iter

    def __iter__(self):
        return self

    def __next__(self):
        # The lock serializes accesses to the underlying iterator.
        with self.lock:
            tests = self.tests_iter
            if tests is not None:
                return next(tests)
            # stop() has been called: nothing more to run
            raise StopIteration

    def stop(self):
        """Make every later call to next() raise StopIteration."""
        with self.lock:
            self.tests_iter = None
103
104
# Outcome of a test run by a worker: the test result, the output captured
# from the child process and an optional error message.
MultiprocessResult = collections.namedtuple(
    'MultiprocessResult',
    ['result', 'stdout', 'stderr', 'error_msg'])
107
class ExitThread(Exception):
    """Raised inside a worker thread to make it exit its run() loop."""
110
111
class TestWorkerProcess(threading.Thread):
    """Thread running tests one at a time, each one in a child process.

    Test names are taken from ``runner.pending``. For each test, a
    ``(False, MultiprocessResult)`` tuple is put into ``runner.output``.
    If the thread fails with an unexpected exception, it puts
    ``(True, formatted_traceback)`` into the queue and exits.
    """

    def __init__(self, worker_id, runner):
        super().__init__()
        self.worker_id = worker_id
        self.pending = runner.pending
        self.output = runner.output
        self.ns = runner.ns
        self.timeout = runner.worker_timeout
        self.regrtest = runner.regrtest
        # Name of the test currently run by the child process, or None
        self.current_test_name = None
        # time.monotonic() timestamp taken when the last test was started
        self.start_time = None
        # Child process running the current test, or None
        self._popen = None
        # True once _kill() has been called on the current child process
        self._killed = False
        # True once stop() has been called
        self._stopped = False

    def __repr__(self):
        info = [f'TestWorkerProcess #{self.worker_id}']
        if self.is_alive():
            info.append("running")
        else:
            info.append('stopped')
        test = self.current_test_name
        if test:
            info.append(f'test={test}')
        # Read self._popen only once and only use the local snapshot:
        # repr() can be called from another thread while the worker thread
        # resets self._popen to None at the end of _run_process().
        popen = self._popen
        if popen is not None:
            dt = time.monotonic() - self.start_time
            info.extend((f'pid={popen.pid}',
                         f'time={format_duration(dt)}'))
        return '<%s>' % ' '.join(info)

    def _kill(self):
        """Kill the current child process, at most once per process."""
        popen = self._popen
        if popen is None:
            return

        if self._killed:
            return
        self._killed = True

        print(f"Kill {self}", file=sys.stderr, flush=True)
        try:
            popen.kill()
        except OSError as exc:
            print_warning(f"Failed to kill {self}: {exc!r}")

    def stop(self):
        """Ask the thread to exit and kill its child process, if any."""
        # Method called from a different thread to stop this thread
        self._stopped = True
        self._kill()

    def mp_result_error(self, test_name, error_type, stdout='', stderr='',
                        err_msg=None):
        """Build a MultiprocessResult for a test which did not complete.

        The test duration is the time elapsed since the test was started.
        """
        test_time = time.monotonic() - self.start_time
        result = TestResult(test_name, error_type, test_time, None)
        return MultiprocessResult(result, stdout, stderr, err_msg)

    def _run_process(self, test_name):
        """Run *test_name* in a child process and wait for its completion.

        Return a ``(retcode, stdout, stderr)`` tuple. *retcode* is None if
        the process was killed because it ran longer than ``self.timeout``
        seconds; stdout and stderr are empty strings in that case.

        Raise ExitThread if stop() has been called.
        """
        self.start_time = time.monotonic()

        self.current_test_name = test_name
        try:
            popen = run_test_in_subprocess(test_name, self.ns)

            self._killed = False
            self._popen = popen
        except BaseException:
            # The process was not spawned: the worker runs no test
            self.current_test_name = None
            raise

        try:
            if self._stopped:
                # If kill() has been called before self._popen is set,
                # self._popen is still running. Call again kill()
                # to ensure that the process is killed.
                self._kill()
                raise ExitThread

            try:
                stdout, stderr = popen.communicate(timeout=self.timeout)
                retcode = popen.returncode
                assert retcode is not None
            except subprocess.TimeoutExpired:
                if self._stopped:
                    # stop() has been called: exit the thread rather than
                    # reporting a timeout
                    raise ExitThread

                # On timeout, kill the process
                self._kill()

                # None means TIMEOUT for the caller
                retcode = None
                # bpo-38207: Don't attempt to call communicate() again: it
                # can hang until all child processes using the stdout and
                # stderr pipes complete.
                stdout = stderr = ''
            except OSError:
                if self._stopped:
                    # kill() has been called: communicate() fails
                    # on reading closed stdout/stderr
                    raise ExitThread
                raise
            else:
                stdout = stdout.strip()
                stderr = stderr.rstrip()

            return (retcode, stdout, stderr)
        except BaseException:
            # Never leave the child process running behind, whatever the
            # error is (ExitThread and KeyboardInterrupt included)
            self._kill()
            raise
        finally:
            self._wait_completed()
            self._popen = None
            self.current_test_name = None

    def _runtest(self, test_name):
        """Run *test_name* in a child process; return a MultiprocessResult.

        The result is a TIMEOUT error if the process timed out. It is a
        CHILD_ERROR if the exit code is not zero or if the last line of the
        process stdout cannot be parsed as a JSON-serialized TestResult.
        """
        retcode, stdout, stderr = self._run_process(test_name)

        if retcode is None:
            return self.mp_result_error(test_name, TIMEOUT, stdout, stderr)

        err_msg = None
        if retcode != 0:
            err_msg = "Exit code %s" % retcode
        else:
            # The last line of stdout is the serialized result, everything
            # before is the output of the test
            stdout, _, result = stdout.rpartition("\n")
            stdout = stdout.rstrip()
            if not result:
                err_msg = "Failed to parse worker stdout"
            else:
                try:
                    # deserialize run_tests_worker() output
                    result = json.loads(result)
                    result = TestResult(*result)
                except Exception as exc:
                    err_msg = "Failed to parse worker JSON: %s" % exc

        if err_msg is not None:
            return self.mp_result_error(test_name, CHILD_ERROR,
                                        stdout, stderr, err_msg)

        return MultiprocessResult(result, stdout, stderr, err_msg)

    def run(self):
        """Thread main loop: run pending tests until there is nothing left,
        until stop() is called, or until a result requires to stop."""
        while not self._stopped:
            try:
                try:
                    test_name = next(self.pending)
                except StopIteration:
                    break

                mp_result = self._runtest(test_name)
                self.output.put((False, mp_result))

                if must_stop(mp_result.result, self.ns):
                    break
            except ExitThread:
                break
            except BaseException:
                # Report the failure of the thread itself to the consumer
                # of the output queue
                self.output.put((True, traceback.format_exc()))
                break

    def _wait_completed(self):
        """Close the pipes of the child process and wait until it exits.

        A warning is logged if the process does not complete in
        JOIN_TIMEOUT seconds or if waiting fails with an OSError.
        """
        popen = self._popen

        # stdout and stderr must be closed to ensure that communicate()
        # does not hang
        popen.stdout.close()
        popen.stderr.close()

        try:
            popen.wait(JOIN_TIMEOUT)
        except (subprocess.TimeoutExpired, OSError) as exc:
            print_warning(f"Failed to wait for {self} completion "
                          f"(timeout={format_duration(JOIN_TIMEOUT)}): "
                          f"{exc!r}")

    def wait_stopped(self, start_time):
        """Join the thread, logging a message every second while waiting.

        Give up with a warning once more than JOIN_TIMEOUT seconds have
        elapsed since *start_time* (a time.monotonic() timestamp).
        """
        # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
        # which killed the process. Sometimes, killing the process from the
        # main thread does not interrupt popen.communicate() in
        # TestWorkerProcess thread. This loop with a timeout is a workaround
        # for that.
        #
        # Moreover, if this method fails to join the thread, it is likely
        # that Python will hang at exit while calling threading._shutdown()
        # which tries again to join the blocked thread. Regrtest.main()
        # uses EXIT_TIMEOUT to workaround this second bug.
        while True:
            # Write a message every second
            self.join(1.0)
            if not self.is_alive():
                break
            dt = time.monotonic() - start_time
            self.regrtest.log(f"Waiting for {self} thread "
                              f"for {format_duration(dt)}")
            if dt > JOIN_TIMEOUT:
                print_warning(f"Failed to join {self} in {format_duration(dt)}")
                break
312
313
def get_running(workers):
    """List the tests run by *workers* for at least PROGRESS_MIN_TIME.

    Return a list of ``'test_name (duration)'`` strings, one per worker
    whose current test has been running long enough. Workers which run no
    test are skipped.
    """
    descriptions = []
    for worker in workers:
        name = worker.current_test_name
        if name:
            elapsed = time.monotonic() - worker.start_time
            if elapsed >= PROGRESS_MIN_TIME:
                descriptions.append(
                    '%s (%s)' % (name, format_duration(elapsed)))
    return descriptions
325
326
class MultiprocessTestRunner:
    """Run the tests of a regrtest instance using worker threads.

    Each worker is a TestWorkerProcess thread. The workers share the
    ``pending`` iterator of test names and report through the ``output``
    queue, which is consumed by run_tests().
    """

    def __init__(self, regrtest):
        self.regrtest = regrtest
        self.log = self.regrtest.log
        self.ns = regrtest.ns
        self.output = queue.Queue()
        self.pending = MultiprocessIterator(self.regrtest.tests)
        # A worker process gets 50% more time than the test timeout
        if self.ns.timeout is None:
            self.worker_timeout = None
        else:
            self.worker_timeout = self.ns.timeout * 1.5
        self.workers = None

    def start_workers(self):
        """Create and start ``ns.use_mp`` worker threads, numbered from 1."""
        nworker = self.ns.use_mp
        self.workers = [TestWorkerProcess(worker_id, self)
                        for worker_id in range(1, nworker + 1)]
        self.log("Run tests in parallel using %s child processes"
                 % len(self.workers))
        for thread in self.workers:
            thread.start()

    def stop_workers(self):
        """Ask every worker to stop, then wait until each one has exited."""
        begin = time.monotonic()
        # First signal all the workers, and only then wait for them
        for thread in self.workers:
            thread.stop()
        for thread in self.workers:
            thread.wait_stopped(begin)

    def _get_result(self):
        """Return the next item of the output queue.

        Return None once all the workers are done and the queue is empty.
        While waiting, the running tests are logged every PROGRESS_UPDATE
        seconds (unless ``ns.pgo`` is set).
        """
        all_done = not any(thread.is_alive() for thread in self.workers)
        if all_done:
            # no worker thread is alive anymore: consume pending results
            try:
                return self.output.get(timeout=0)
            except queue.Empty:
                return None

        watchdog = (self.ns.timeout is not None)
        delay = PROGRESS_UPDATE
        while True:
            if watchdog:
                # (Re)arm the timer which dumps the traceback and exits if
                # nothing happens for MAIN_PROCESS_TIMEOUT seconds
                faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
                                                  exit=True)

            # block until a worker puts something into the queue
            try:
                return self.output.get(timeout=delay)
            except queue.Empty:
                pass

            # nothing new: show which tests are running
            running = get_running(self.workers)
            if running and not self.ns.pgo:
                self.log('running: %s' % ', '.join(running))

    def display_result(self, mp_result):
        """Display the progress line for the test result *mp_result*."""
        result = mp_result.result
        error_msg = mp_result.error_msg

        text = format_test_result(result)
        if error_msg is not None:
            # CHILD_ERROR
            text += ' (%s)' % error_msg
        elif result.test_time >= PROGRESS_MIN_TIME and not self.ns.pgo:
            text += ' (%s)' % format_duration(result.test_time)

        running = get_running(self.workers)
        if running and not self.ns.pgo:
            text += ' -- running: %s' % ', '.join(running)
        self.regrtest.display_progress(self.test_index, text)

    def _process_result(self, item):
        """Handle one item of the output queue.

        Return True if the run must stop: a worker thread failed, or
        must_stop() says so for the test result. Return False otherwise.
        """
        thread_failed, payload = item
        if thread_failed:
            # The payload is the traceback of the worker thread exception
            format_exc = payload
            print_warning(f"regrtest worker thread failed: {format_exc}")
            return True

        mp_result = payload
        self.test_index += 1
        self.regrtest.accumulate_result(mp_result.result)
        self.display_result(mp_result)

        # Replay the output captured from the child process
        if mp_result.stdout:
            print(mp_result.stdout, flush=True)
        if mp_result.stderr and not self.ns.pgo:
            print(mp_result.stderr, file=sys.stderr, flush=True)

        return bool(must_stop(mp_result.result, self.ns))

    def run_tests(self):
        """Start the workers and process their results until completion.

        A KeyboardInterrupt marks the regrtest instance as interrupted.
        The workers are always stopped before returning.
        """
        self.start_workers()

        self.test_index = 0
        try:
            while True:
                item = self._get_result()
                if item is None or self._process_result(item):
                    break
        except KeyboardInterrupt:
            print()
            self.regrtest.interrupted = True
        finally:
            if self.ns.timeout is not None:
                faulthandler.cancel_dump_traceback_later()

            # Always ensure that no worker process is still running
            # when this function exits
            self.pending.stop()
            self.stop_workers()
441
442
def run_tests_multiprocess(regrtest):
    """Run the tests of *regrtest* in parallel, using child processes."""
    runner = MultiprocessTestRunner(regrtest)
    runner.run_tests()
445