• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import contextlib
2import dataclasses
3import json
4import os
5import shlex
6import subprocess
7import sys
8from typing import Any
9
10from test import support
11
12from .utils import (
13    StrPath, StrJSON, TestTuple, TestFilter, FilterTuple, FilterDict)
14
15
16class JsonFileType:
17    UNIX_FD = "UNIX_FD"
18    WINDOWS_HANDLE = "WINDOWS_HANDLE"
19    STDOUT = "STDOUT"
20
21
22@dataclasses.dataclass(slots=True, frozen=True)
23class JsonFile:
24    # file type depends on file_type:
25    # - UNIX_FD: file descriptor (int)
26    # - WINDOWS_HANDLE: handle (int)
27    # - STDOUT: use process stdout (None)
28    file: int | None
29    file_type: str
30
31    def configure_subprocess(self, popen_kwargs: dict[str, Any]) -> None:
32        match self.file_type:
33            case JsonFileType.UNIX_FD:
34                # Unix file descriptor
35                popen_kwargs['pass_fds'] = [self.file]
36            case JsonFileType.WINDOWS_HANDLE:
37                # Windows handle
38                # We run mypy with `--platform=linux` so it complains about this:
39                startupinfo = subprocess.STARTUPINFO()  # type: ignore[attr-defined]
40                startupinfo.lpAttributeList = {"handle_list": [self.file]}
41                popen_kwargs['startupinfo'] = startupinfo
42
43    @contextlib.contextmanager
44    def inherit_subprocess(self):
45        if self.file_type == JsonFileType.WINDOWS_HANDLE:
46            os.set_handle_inheritable(self.file, True)
47            try:
48                yield
49            finally:
50                os.set_handle_inheritable(self.file, False)
51        else:
52            yield
53
54    def open(self, mode='r', *, encoding):
55        if self.file_type == JsonFileType.STDOUT:
56            raise ValueError("for STDOUT file type, just use sys.stdout")
57
58        file = self.file
59        if self.file_type == JsonFileType.WINDOWS_HANDLE:
60            import msvcrt
61            # Create a file descriptor from the handle
62            file = msvcrt.open_osfhandle(file, os.O_WRONLY)
63        return open(file, mode, encoding=encoding)
64
65
66@dataclasses.dataclass(slots=True, frozen=True)
67class HuntRefleak:
68    warmups: int
69    runs: int
70    filename: StrPath
71
72    def bisect_cmd_args(self) -> list[str]:
73        # Ignore filename since it can contain colon (":"),
74        # and usually it's not used. Use the default filename.
75        return ["-R", f"{self.warmups}:{self.runs}:"]
76
77
78@dataclasses.dataclass(slots=True, frozen=True)
79class RunTests:
80    tests: TestTuple
81    fail_fast: bool
82    fail_env_changed: bool
83    match_tests: TestFilter
84    match_tests_dict: FilterDict | None
85    rerun: bool
86    forever: bool
87    pgo: bool
88    pgo_extended: bool
89    output_on_failure: bool
90    timeout: float | None
91    verbose: int
92    quiet: bool
93    hunt_refleak: HuntRefleak | None
94    test_dir: StrPath | None
95    use_junit: bool
96    coverage: bool
97    memory_limit: str | None
98    gc_threshold: int | None
99    use_resources: tuple[str, ...]
100    python_cmd: tuple[str, ...] | None
101    randomize: bool
102    random_seed: int | str
103
104    def copy(self, **override) -> 'RunTests':
105        state = dataclasses.asdict(self)
106        state.update(override)
107        return RunTests(**state)
108
109    def create_worker_runtests(self, **override):
110        state = dataclasses.asdict(self)
111        state.update(override)
112        return WorkerRunTests(**state)
113
114    def get_match_tests(self, test_name) -> FilterTuple | None:
115        if self.match_tests_dict is not None:
116            return self.match_tests_dict.get(test_name, None)
117        else:
118            return None
119
120    def get_jobs(self):
121        # Number of run_single_test() calls needed to run all tests.
122        # None means that there is not bound limit (--forever option).
123        if self.forever:
124            return None
125        return len(self.tests)
126
127    def iter_tests(self):
128        if self.forever:
129            while True:
130                yield from self.tests
131        else:
132            yield from self.tests
133
134    def json_file_use_stdout(self) -> bool:
135        # Use STDOUT in two cases:
136        #
137        # - If --python command line option is used;
138        # - On Emscripten and WASI.
139        #
140        # On other platforms, UNIX_FD or WINDOWS_HANDLE can be used.
141        return (
142            bool(self.python_cmd)
143            or support.is_emscripten
144            or support.is_wasi
145        )
146
147    def create_python_cmd(self) -> list[str]:
148        python_opts = support.args_from_interpreter_flags()
149        if self.python_cmd is not None:
150            executable = self.python_cmd
151            # Remove -E option, since --python=COMMAND can set PYTHON
152            # environment variables, such as PYTHONPATH, in the worker
153            # process.
154            python_opts = [opt for opt in python_opts if opt != "-E"]
155        else:
156            executable = (sys.executable,)
157        cmd = [*executable, *python_opts]
158        if '-u' not in python_opts:
159            cmd.append('-u')  # Unbuffered stdout and stderr
160        if self.coverage:
161            cmd.append("-Xpresite=test.cov")
162        return cmd
163
164    def bisect_cmd_args(self) -> list[str]:
165        args = []
166        if self.fail_fast:
167            args.append("--failfast")
168        if self.fail_env_changed:
169            args.append("--fail-env-changed")
170        if self.timeout:
171            args.append(f"--timeout={self.timeout}")
172        if self.hunt_refleak is not None:
173            args.extend(self.hunt_refleak.bisect_cmd_args())
174        if self.test_dir:
175            args.extend(("--testdir", self.test_dir))
176        if self.memory_limit:
177            args.extend(("--memlimit", self.memory_limit))
178        if self.gc_threshold:
179            args.append(f"--threshold={self.gc_threshold}")
180        if self.use_resources:
181            args.extend(("-u", ','.join(self.use_resources)))
182        if self.python_cmd:
183            cmd = shlex.join(self.python_cmd)
184            args.extend(("--python", cmd))
185        if self.randomize:
186            args.append(f"--randomize")
187        args.append(f"--randseed={self.random_seed}")
188        return args
189
190
191@dataclasses.dataclass(slots=True, frozen=True)
192class WorkerRunTests(RunTests):
193    json_file: JsonFile
194
195    def as_json(self) -> StrJSON:
196        return json.dumps(self, cls=_EncodeRunTests)
197
198    @staticmethod
199    def from_json(worker_json: StrJSON) -> 'WorkerRunTests':
200        return json.loads(worker_json, object_hook=_decode_runtests)
201
202
203class _EncodeRunTests(json.JSONEncoder):
204    def default(self, o: Any) -> dict[str, Any]:
205        if isinstance(o, WorkerRunTests):
206            result = dataclasses.asdict(o)
207            result["__runtests__"] = True
208            return result
209        else:
210            return super().default(o)
211
212
213def _decode_runtests(data: dict[str, Any]) -> RunTests | dict[str, Any]:
214    if "__runtests__" in data:
215        data.pop('__runtests__')
216        if data['hunt_refleak']:
217            data['hunt_refleak'] = HuntRefleak(**data['hunt_refleak'])
218        if data['json_file']:
219            data['json_file'] = JsonFile(**data['json_file'])
220        return WorkerRunTests(**data)
221    else:
222        return data
223