# Copyright 2019 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Runs Pigweed unit tests built using GN."""

import argparse
import asyncio
import enum
import json
import logging
import os
import subprocess
import sys

from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Set, Tuple

# pw_cli.color is used directly by TestRunner, so import it explicitly rather
# than relying on another pw_cli module importing it as a side effect.
import pw_cli.color
import pw_cli.log
import pw_cli.process

# Global logger for the script.
_LOG: logging.Logger = logging.getLogger(__name__)


def register_arguments(parser: argparse.ArgumentParser) -> None:
    """Registers the test runner's command-line arguments on a parser.

    Args:
      parser: Parser to which the arguments are added.
    """

    parser.add_argument('--root',
                        type=str,
                        default='out',
                        help='Path to the root build directory')
    parser.add_argument('-r',
                        '--runner',
                        type=str,
                        required=True,
                        help='Executable which runs a test on the target')
    parser.add_argument('-m',
                        '--timeout',
                        type=float,
                        help='Timeout for test runner in seconds')
    parser.add_argument('runner_args',
                        nargs="*",
                        help='Arguments to forward to the test runner')

    # The runner script can either run binaries directly or groups.
    group = parser.add_mutually_exclusive_group()
    group.add_argument('-g',
                       '--group',
                       action='append',
                       help='Test groups to run')
    group.add_argument('-t',
                       '--test',
                       action='append',
                       help='Test binaries to run')


class TestResult(enum.Enum):
    """Result of a single unit test run."""
    UNKNOWN = 0
    SUCCESS = 1
    FAILURE = 2


class Test:
    """A unit test executable.

    Two Test objects compare (and hash) equal when they refer to the same
    file path, regardless of their names or statuses.
    """
    def __init__(self, name: str, file_path: str):
        self.name: str = name
        self.file_path: str = file_path
        self.status: TestResult = TestResult.UNKNOWN

    def __repr__(self) -> str:
        return f'Test({self.name})'

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Test):
            return NotImplemented
        return self.file_path == other.file_path

    def __hash__(self) -> int:
        return hash(self.file_path)


class TestGroup:
    """Graph node representing a group of unit tests."""
    def __init__(self, name: str, tests: Iterable[Test]):
        self._name: str = name
        self._deps: List['TestGroup'] = []
        # Snapshot the iterable so that a one-shot iterator (e.g. a generator)
        # can still be traversed every time the dependencies are collected.
        self._tests: List[Test] = list(tests)

    def set_deps(self, deps: Iterable['TestGroup']) -> None:
        """Updates the dependency list of this group."""
        self._deps = list(deps)

    def all_test_dependencies(self) -> List[Test]:
        """Returns a list of all tests in this group and its dependencies."""
        return list(self._all_test_dependencies(set()))

    def _all_test_dependencies(self, processed_groups: Set[str]) -> Set[Test]:
        """Collects tests from this group and every group reachable from it.

        Args:
          processed_groups: Names of groups that have already been visited.
            Updated in place.

        Returns:
          The tests of this group and its not-yet-visited dependencies, or an
          empty set if this group was already visited.
        """
        if self._name in processed_groups:
            return set()

        # Mark this group as visited *before* recursing, so that a dependency
        # cycle terminates instead of recursing without bound.
        processed_groups.add(self._name)

        tests: Set[Test] = set(self._tests)
        for dep in self._deps:
            tests.update(
                dep._all_test_dependencies(  # pylint: disable=protected-access
                    processed_groups))

        return tests

    def __repr__(self) -> str:
        return f'TestGroup({self._name})'


class TestRunner:
    """Runs unit tests by calling out to a runner script."""
    def __init__(self,
                 executable: str,
                 args: Sequence[str],
                 tests: Iterable[Test],
                 timeout: Optional[float] = None):
        self._executable: str = executable
        self._args: Sequence[str] = args
        self._tests: List[Test] = list(tests)
        self._timeout = timeout

    async def run_tests(self) -> None:
        """Runs all registered unit tests through the runner script.

        Tests run sequentially. Each test's status is set to SUCCESS or
        FAILURE based on the runner's return code. If invoking the runner
        raises subprocess.CalledProcessError, the error is logged and the
        remaining tests are not run; their statuses are left untouched.
        """

        # The total is loop-invariant; it is also used to pad the test index.
        total = str(len(self._tests))

        for idx, test in enumerate(self._tests, 1):
            test_counter = f'Test {idx:{len(total)}}/{total}'

            _LOG.info('%s: [ RUN] %s', test_counter, test.name)

            # Convert POSIX to native directory separators as GN produces '/'
            # but the Windows test runner needs '\\'.
            command = [
                str(Path(self._executable)),
                str(Path(test.file_path)), *self._args
            ]

            # Python runner scripts are launched through the interpreter that
            # is running this script.
            if self._executable.endswith('.py'):
                command.insert(0, sys.executable)

            try:
                process = await pw_cli.process.run_async(*command,
                                                         timeout=self._timeout)
                if process.returncode == 0:
                    test.status = TestResult.SUCCESS
                    test_result = 'PASS'
                else:
                    test.status = TestResult.FAILURE
                    test_result = 'FAIL'

                _LOG.log(pw_cli.log.LOGLEVEL_STDOUT, '[%s]\n%s',
                         pw_cli.color.colors().bold_white(process.pid),
                         process.output.decode(errors='ignore').rstrip())

                _LOG.info('%s: [%s] %s', test_counter, test_result,
                          test.name)
            except subprocess.CalledProcessError as err:
                _LOG.error(err)
                return

    def all_passed(self) -> bool:
        """Returns true if all unit tests passed."""
        return all(test.status is TestResult.SUCCESS for test in self._tests)


# Filename extension for unit test metadata files.
METADATA_EXTENSION = '.testinfo.json'


def find_test_metadata(root: str) -> List[str]:
    """Locates all test metadata files located within a directory tree.

    Args:
      root: Directory at which the recursive search starts.

    Returns:
      Paths of every file under root whose name ends in METADATA_EXTENSION.
    """

    metadata: List[str] = []
    for path, _, files in os.walk(root):
        for filename in files:
            if not filename.endswith(METADATA_EXTENSION):
                continue

            full_path = os.path.join(path, filename)
            _LOG.debug('Found group metadata at %s', full_path)
            metadata.append(full_path)

    return metadata


# TODO(frolv): This is copied from the Python runner script.
# It should be extracted into a library and imported instead.
def find_binary(target: str) -> str:
    """Tries to find a binary for a gn build target.

    Args:
      target: Relative filesystem path to the target's output directory and
          target name, separated by a colon.

    Returns:
      Full path to the target's binary.

    Raises:
      FileNotFoundError: No binary found for target.
      ValueError: The target contains no colon separator.
    """

    # Split on the *last* colon only: the directory portion may itself contain
    # a colon (e.g. a Windows drive letter such as 'C:\\out').
    target_path, separator, target_name = target.rpartition(':')
    if not separator:
        raise ValueError(
            f'Build target {target} is not of the form <directory>:<name>')

    for extension in ['', '.elf', '.exe']:
        potential_filename = f'{target_path}/{target_name}{extension}'
        if os.path.isfile(potential_filename):
            return potential_filename

    raise FileNotFoundError(
        f'Could not find output binary for build target {target}')


def parse_metadata(metadata: List[str], root: str) -> Dict[str, TestGroup]:
    """Builds a graph of test group objects from metadata.

    Each metadata file holds a JSON list of entries. An entry's 'type' is one
    of 'self' (its 'name' names the group), 'dep' (its 'group' names a group
    this one depends on) or 'test' (its 'test_directory' and 'test_name'
    locate a test binary). Entries of any other type are ignored.

    Args:
      metadata: List of paths to JSON test metadata files.
      root: Root output directory of the build.

    Returns:
      Map of group name to TestGroup object. All TestGroup objects are fully
      populated with the paths to their unit tests and references to their
      dependencies.

    Raises:
      FileNotFoundError: The binary for a listed test could not be found.
      ValueError: A metadata file has no 'self' entry naming its group.
    """
    def canonicalize(path: str) -> str:
        """Removes a trailing slash from a GN target's directory.

        '//module:target'  -> '//module:target'
        '//module/:target' -> '//module:target'
        """
        index = path.find(':')
        if index == -1 or path[index - 1] != '/':
            return path
        return path[:index - 1] + path[index:]

    group_deps: List[Tuple[str, List[str]]] = []
    all_tests: Dict[str, Test] = {}
    test_groups: Dict[str, TestGroup] = {}
    num_tests = 0

    for path in metadata:
        with open(path, 'r', encoding='utf-8') as metadata_file:
            metadata_list = json.load(metadata_file)

        # Reset per file so that a file lacking a 'self' entry cannot silently
        # reuse (and overwrite) the group name of the previous file.
        group_name: Optional[str] = None
        deps: List[str] = []
        tests: List[Test] = []

        for entry in metadata_list:
            if entry['type'] == 'self':
                group_name = canonicalize(entry['name'])
            elif entry['type'] == 'dep':
                deps.append(canonicalize(entry['group']))
            elif entry['type'] == 'test':
                test_directory = os.path.join(root, entry['test_directory'])
                test_binary = find_binary(
                    f'{test_directory}:{entry["test_name"]}')

                # Share one Test object per binary so that a test listed by
                # several groups has a single status.
                if test_binary not in all_tests:
                    all_tests[test_binary] = Test(entry['test_name'],
                                                  test_binary)

                tests.append(all_tests[test_binary])

        if group_name is None:
            raise ValueError(
                f'Test metadata file {path} has no "self" entry naming its '
                'group')

        if deps:
            group_deps.append((group_name, deps))

        num_tests += len(tests)
        test_groups[group_name] = TestGroup(group_name, tests)

    # Dependencies are linked only after every group exists, since a group may
    # depend on one defined in a metadata file that is read later.
    for name, deps in group_deps:
        test_groups[name].set_deps([test_groups[dep] for dep in deps])

    _LOG.info('Found %d test groups (%d tests).', len(metadata), num_tests)
    return test_groups


def tests_from_groups(group_names: Optional[Sequence[str]],
                      root: str) -> List[Test]:
    """Returns unit tests belonging to test groups and their dependencies.

    If group_names is nonempty, only searches groups specified there.
    Otherwise, finds tests from all known test groups.

    Exits the script with status 1 if a requested group is unknown.

    Args:
      group_names: Names of the groups to run, or None/empty for all groups.
      root: Root output directory of the build, searched for test metadata.

    Returns:
      The unique tests of the selected groups and their dependencies.
    """

    _LOG.info('Scanning for tests...')
    metadata = find_test_metadata(root)
    test_groups = parse_metadata(metadata, root)

    groups_to_run = group_names if group_names else list(test_groups)
    tests_to_run: Set[Test] = set()

    for name in groups_to_run:
        # Only the lookup is guarded, so a KeyError raised while collecting
        # dependencies is not misreported as an unknown group.
        try:
            test_group = test_groups[name]
        except KeyError:
            _LOG.error('Unknown test group: %s', name)
            sys.exit(1)

        tests_to_run.update(test_group.all_test_dependencies())

    _LOG.info('Running test groups %s', ', '.join(groups_to_run))
    return list(tests_to_run)


def tests_from_paths(paths: Sequence[str]) -> List[Test]:
    """Returns a list of tests from test executable paths.

    Each test is named after its file's base name with the final extension
    removed.
    """

    return [
        Test(os.path.splitext(os.path.basename(path))[0], path)
        for path in paths
    ]


async def find_and_run_tests(
    root: str,
    runner: str,
    timeout: Optional[float],
    runner_args: Sequence[str] = (),
    group: Optional[Sequence[str]] = None,
    test: Optional[Sequence[str]] = None,
) -> int:
    """Runs some unit tests.

    Args:
      root: Root output directory of the build.
      runner: Executable which runs a test on the target.
      timeout: Timeout for the test runner in seconds, or None.
      runner_args: Arguments to forward to the test runner.
      group: Test groups to run. Only consulted when test is empty.
      test: Paths of test binaries to run directly.

    Returns:
      0 if every test passed, 1 otherwise.
    """

    if test:
        tests = tests_from_paths(test)
    else:
        tests = tests_from_groups(group, root)

    test_runner = TestRunner(runner, runner_args, tests, timeout)
    await test_runner.run_tests()

    return 0 if test_runner.all_passed() else 1


def main() -> int:
    """Run Pigweed unit tests built using GN."""

    parser = argparse.ArgumentParser(description=main.__doc__)
    register_arguments(parser)
    parser.add_argument('-v',
                        '--verbose',
                        action='store_true',
                        help='Output additional logs as the script runs')

    args_as_dict = dict(vars(parser.parse_args()))
    # NOTE(review): --verbose is accepted but discarded here without changing
    # any log level, because find_and_run_tests() takes no such argument.
    del args_as_dict['verbose']
    return asyncio.run(find_and_run_tests(**args_as_dict))


if __name__ == '__main__':
    pw_cli.log.install(hide_timestamp=True)
    sys.exit(main())