• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2019 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Runs Pigweed unit tests built using GN."""
15
16import argparse
17import asyncio
18import enum
19import json
20import logging
21import os
22import subprocess
23import sys
24
25from pathlib import Path
26from typing import Dict, Iterable, List, Optional, Sequence, Set, Tuple
27
28import pw_cli.log
29import pw_cli.process
30
31# Global logger for the script.
32_LOG: logging.Logger = logging.getLogger(__name__)
33
34
35def register_arguments(parser: argparse.ArgumentParser) -> None:
36    """Registers command-line arguments."""
37
38    parser.add_argument('--root',
39                        type=str,
40                        default='out',
41                        help='Path to the root build directory')
42    parser.add_argument('-r',
43                        '--runner',
44                        type=str,
45                        required=True,
46                        help='Executable which runs a test on the target')
47    parser.add_argument('-m',
48                        '--timeout',
49                        type=float,
50                        help='Timeout for test runner in seconds')
51    parser.add_argument('runner_args',
52                        nargs="*",
53                        help='Arguments to forward to the test runner')
54
55    # The runner script can either run binaries directly or groups.
56    group = parser.add_mutually_exclusive_group()
57    group.add_argument('-g',
58                       '--group',
59                       action='append',
60                       help='Test groups to run')
61    group.add_argument('-t',
62                       '--test',
63                       action='append',
64                       help='Test binaries to run')
65
66
67class TestResult(enum.Enum):
68    """Result of a single unit test run."""
69    UNKNOWN = 0
70    SUCCESS = 1
71    FAILURE = 2
72
73
74class Test:
75    """A unit test executable."""
76    def __init__(self, name: str, file_path: str):
77        self.name: str = name
78        self.file_path: str = file_path
79        self.status: TestResult = TestResult.UNKNOWN
80
81    def __repr__(self) -> str:
82        return f'Test({self.name})'
83
84    def __eq__(self, other: object) -> bool:
85        if not isinstance(other, Test):
86            return NotImplemented
87        return self.file_path == other.file_path
88
89    def __hash__(self) -> int:
90        return hash(self.file_path)
91
92
93class TestGroup:
94    """Graph node representing a group of unit tests."""
95    def __init__(self, name: str, tests: Iterable[Test]):
96        self._name: str = name
97        self._deps: Iterable['TestGroup'] = []
98        self._tests: Iterable[Test] = tests
99
100    def set_deps(self, deps: Iterable['TestGroup']) -> None:
101        """Updates the dependency list of this group."""
102        self._deps = deps
103
104    def all_test_dependencies(self) -> List[Test]:
105        """Returns a list of all tests in this group and its dependencies."""
106        return list(self._all_test_dependencies(set()))
107
108    def _all_test_dependencies(self, processed_groups: Set[str]) -> Set[Test]:
109        if self._name in processed_groups:
110            return set()
111
112        tests: Set[Test] = set()
113        for dep in self._deps:
114            tests.update(
115                dep._all_test_dependencies(  # pylint: disable=protected-access
116                    processed_groups))
117
118        tests.update(self._tests)
119        processed_groups.add(self._name)
120
121        return tests
122
123    def __repr__(self) -> str:
124        return f'TestGroup({self._name})'
125
126
127class TestRunner:
128    """Runs unit tests by calling out to a runner script."""
129    def __init__(self,
130                 executable: str,
131                 args: Sequence[str],
132                 tests: Iterable[Test],
133                 timeout: Optional[float] = None):
134        self._executable: str = executable
135        self._args: Sequence[str] = args
136        self._tests: List[Test] = list(tests)
137        self._timeout = timeout
138
139    async def run_tests(self) -> None:
140        """Runs all registered unit tests through the runner script."""
141
142        for idx, test in enumerate(self._tests, 1):
143            total = str(len(self._tests))
144            test_counter = f'Test {idx:{len(total)}}/{total}'
145
146            _LOG.info('%s: [ RUN] %s', test_counter, test.name)
147
148            # Convert POSIX to native directory seperators as GN produces '/'
149            # but the Windows test runner needs '\\'.
150            command = [
151                str(Path(self._executable)),
152                str(Path(test.file_path)), *self._args
153            ]
154
155            if self._executable.endswith('.py'):
156                command.insert(0, sys.executable)
157
158            try:
159                process = await pw_cli.process.run_async(*command,
160                                                         timeout=self._timeout)
161                if process.returncode == 0:
162                    test.status = TestResult.SUCCESS
163                    test_result = 'PASS'
164                else:
165                    test.status = TestResult.FAILURE
166                    test_result = 'FAIL'
167
168                    _LOG.log(pw_cli.log.LOGLEVEL_STDOUT, '[%s]\n%s',
169                             pw_cli.color.colors().bold_white(process.pid),
170                             process.output.decode(errors='ignore').rstrip())
171
172                    _LOG.info('%s: [%s] %s', test_counter, test_result,
173                              test.name)
174            except subprocess.CalledProcessError as err:
175                _LOG.error(err)
176                return
177
178    def all_passed(self) -> bool:
179        """Returns true if all unit tests passed."""
180        return all(test.status is TestResult.SUCCESS for test in self._tests)
181
182
183# Filename extension for unit test metadata files.
184METADATA_EXTENSION = '.testinfo.json'
185
186
187def find_test_metadata(root: str) -> List[str]:
188    """Locates all test metadata files located within a directory tree."""
189
190    metadata: List[str] = []
191    for path, _, files in os.walk(root):
192        for filename in files:
193            if not filename.endswith(METADATA_EXTENSION):
194                continue
195
196            full_path = os.path.join(path, filename)
197            _LOG.debug('Found group metadata at %s', full_path)
198            metadata.append(full_path)
199
200    return metadata
201
202
203# TODO(frolv): This is copied from the Python runner script.
204# It should be extracted into a library and imported instead.
205def find_binary(target: str) -> str:
206    """Tries to find a binary for a gn build target.
207
208    Args:
209        target: Relative filesystem path to the target's output directory and
210            target name, separated by a colon.
211
212    Returns:
213        Full path to the target's binary.
214
215    Raises:
216        FileNotFoundError: No binary found for target.
217    """
218
219    target_path, target_name = target.split(':')
220
221    for extension in ['', '.elf', '.exe']:
222        potential_filename = f'{target_path}/{target_name}{extension}'
223        if os.path.isfile(potential_filename):
224            return potential_filename
225
226    raise FileNotFoundError(
227        f'Could not find output binary for build target {target}')
228
229
230def parse_metadata(metadata: List[str], root: str) -> Dict[str, TestGroup]:
231    """Builds a graph of test group objects from metadata.
232
233    Args:
234        metadata: List of paths to JSON test metadata files.
235        root: Root output directory of the build.
236
237    Returns:
238        Map of group name to TestGroup object. All TestGroup objects are fully
239        populated with the paths to their unit tests and references to their
240        dependencies.
241    """
242    def canonicalize(path: str) -> str:
243        """Removes a trailing slash from a GN target's directory.
244
245        '//module:target'  -> '//module:target'
246        '//module/:target' -> '//module:target'
247        """
248        index = path.find(':')
249        if index == -1 or path[index - 1] != '/':
250            return path
251        return path[:index - 1] + path[index:]
252
253    group_deps: List[Tuple[str, List[str]]] = []
254    all_tests: Dict[str, Test] = {}
255    test_groups: Dict[str, TestGroup] = {}
256    num_tests = 0
257
258    for path in metadata:
259        with open(path, 'r') as metadata_file:
260            metadata_list = json.load(metadata_file)
261
262        deps: List[str] = []
263        tests: List[Test] = []
264
265        for entry in metadata_list:
266            if entry['type'] == 'self':
267                group_name = canonicalize(entry['name'])
268            elif entry['type'] == 'dep':
269                deps.append(canonicalize(entry['group']))
270            elif entry['type'] == 'test':
271                test_directory = os.path.join(root, entry['test_directory'])
272                test_binary = find_binary(
273                    f'{test_directory}:{entry["test_name"]}')
274
275                if test_binary not in all_tests:
276                    all_tests[test_binary] = Test(entry['test_name'],
277                                                  test_binary)
278
279                tests.append(all_tests[test_binary])
280
281        if deps:
282            group_deps.append((group_name, deps))
283
284        num_tests += len(tests)
285        test_groups[group_name] = TestGroup(group_name, tests)
286
287    for name, deps in group_deps:
288        test_groups[name].set_deps([test_groups[dep] for dep in deps])
289
290    _LOG.info('Found %d test groups (%d tests).', len(metadata), num_tests)
291    return test_groups
292
293
294def tests_from_groups(group_names: Optional[Sequence[str]],
295                      root: str) -> List[Test]:
296    """Returns unit tests belonging to test groups and their dependencies.
297
298    If args.names is nonempty, only searches groups specified there.
299    Otherwise, finds tests from all known test groups.
300    """
301
302    _LOG.info('Scanning for tests...')
303    metadata = find_test_metadata(root)
304    test_groups = parse_metadata(metadata, root)
305
306    groups_to_run = group_names if group_names else test_groups.keys()
307    tests_to_run: Set[Test] = set()
308
309    for name in groups_to_run:
310        try:
311            tests_to_run.update(test_groups[name].all_test_dependencies())
312        except KeyError:
313            _LOG.error('Unknown test group: %s', name)
314            sys.exit(1)
315
316    _LOG.info('Running test groups %s', ', '.join(groups_to_run))
317    return list(tests_to_run)
318
319
320def tests_from_paths(paths: Sequence[str]) -> List[Test]:
321    """Returns a list of tests from test executable paths."""
322
323    tests: List[Test] = []
324    for path in paths:
325        name = os.path.splitext(os.path.basename(path))[0]
326        tests.append(Test(name, path))
327    return tests
328
329
330async def find_and_run_tests(
331    root: str,
332    runner: str,
333    timeout: Optional[float],
334    runner_args: Sequence[str] = (),
335    group: Optional[Sequence[str]] = None,
336    test: Optional[Sequence[str]] = None,
337) -> int:
338    """Runs some unit tests."""
339
340    if test:
341        tests = tests_from_paths(test)
342    else:
343        tests = tests_from_groups(group, root)
344
345    test_runner = TestRunner(runner, runner_args, tests, timeout)
346    await test_runner.run_tests()
347
348    return 0 if test_runner.all_passed() else 1
349
350
351def main() -> int:
352    """Run Pigweed unit tests built using GN."""
353
354    parser = argparse.ArgumentParser(description=main.__doc__)
355    register_arguments(parser)
356    parser.add_argument('-v',
357                        '--verbose',
358                        action='store_true',
359                        help='Output additional logs as the script runs')
360
361    args_as_dict = dict(vars(parser.parse_args()))
362    del args_as_dict['verbose']
363    return asyncio.run(find_and_run_tests(**args_as_dict))
364
365
366if __name__ == '__main__':
367    pw_cli.log.install(hide_timestamp=True)
368    sys.exit(main())
369