# Copyright 2019 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Runs Pigweed unit tests built using GN."""

import argparse
import asyncio
import base64
import datetime
import enum
import json
import logging
import os
import re
import subprocess
import sys
import time

from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Set, Tuple

import requests

# pw_cli.color is used directly below (pw_cli.color.colors()), so import it
# explicitly instead of relying on another pw_cli module importing it.
import pw_cli.color
import pw_cli.log
import pw_cli.process

# Global logger for the script.
_LOG: logging.Logger = logging.getLogger(__name__)

# Matches everything from an ESC byte up to and including the next 'm'.
_ANSI_SEQUENCE_REGEX = re.compile(rb'\x1b[^m]*m')


def _strip_ansi(bytes_with_sequences: bytes) -> bytes:
    """Strip out ANSI escape sequences.

    Args:
      bytes_with_sequences: Raw bytes that may contain escape sequences.

    Returns:
      The input with every match of _ANSI_SEQUENCE_REGEX removed.
    """
    return _ANSI_SEQUENCE_REGEX.sub(b'', bytes_with_sequences)


def register_arguments(parser: argparse.ArgumentParser) -> None:
    """Registers command-line arguments.

    The destination names of the registered arguments match the keyword
    parameters of find_and_run_tests(), which receives them from main().

    Args:
      parser: The argument parser to add the test runner options to.
    """

    parser.add_argument(
        '--root',
        type=str,
        default='out',
        help='Path to the root build directory',
    )
    parser.add_argument(
        '-r',
        '--runner',
        type=str,
        required=True,
        help='Executable which runs a test on the target',
    )
    parser.add_argument(
        '-m', '--timeout', type=float, help='Timeout for test runner in seconds'
    )
    parser.add_argument(
        '--coverage-profraw',
        type=str,
        help='The name of the coverage profraw file to produce with the'
        ' coverage information from this test. Only provide this if the test'
        ' should be run for coverage and is properly instrumented.',
    )
    parser.add_argument(
        'runner_args', nargs="*", help='Arguments to forward to the test runner'
    )

    # The runner script can either run binaries directly or groups.
    group = parser.add_mutually_exclusive_group()
    group.add_argument(
        '-g', '--group', action='append', help='Test groups to run'
    )
    group.add_argument(
        '-t', '--test', action='append', help='Test binaries to run'
    )


class TestResult(enum.Enum):
    """Result of a single unit test run."""

    UNKNOWN = 0
    SUCCESS = 1
    FAILURE = 2


class Test:
    """A unit test executable.

    Two Test objects compare equal (and hash identically) when they refer to
    the same file path, regardless of their names.
    """

    def __init__(self, name: str, file_path: str) -> None:
        self.name: str = name
        self.file_path: str = file_path
        self.status: TestResult = TestResult.UNKNOWN
        # Declared here but only assigned by TestRunner.run_tests().
        self.start_time: datetime.datetime
        self.duration_s: float

    def __repr__(self) -> str:
        return f'Test({self.name})'

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Test):
            return NotImplemented
        return self.file_path == other.file_path

    def __hash__(self) -> int:
        return hash(self.file_path)


class TestGroup:
    """Graph node representing a group of unit tests."""

    def __init__(self, name: str, tests: Iterable[Test]):
        self._name: str = name
        self._deps: Iterable['TestGroup'] = []
        self._tests: Iterable[Test] = tests

    def set_deps(self, deps: Iterable['TestGroup']) -> None:
        """Updates the dependency list of this group."""
        self._deps = deps

    def all_test_dependencies(self) -> List[Test]:
        """Returns a list of all tests in this group and its dependencies.

        Each test appears once; the order of the returned list is unspecified.
        """
        return list(self._all_test_dependencies(set()))

    def _all_test_dependencies(self, processed_groups: Set[str]) -> Set[Test]:
        """Collects tests from this group and its not-yet-visited deps.

        Args:
          processed_groups: Names of groups already visited during this
            traversal. Updated in place.

        Returns:
          The tests of this group and of every dependency reachable from it
          that had not been visited before; empty if this group was already
          visited.
        """
        if self._name in processed_groups:
            return set()

        # Mark this group as visited *before* descending so that a dependency
        # cycle terminates instead of recursing forever.
        processed_groups.add(self._name)

        tests: Set[Test] = set(self._tests)
        for dep in self._deps:
            tests.update(
                dep._all_test_dependencies(  # pylint: disable=protected-access
                    processed_groups
                )
            )

        return tests

    def __repr__(self) -> str:
        return f'TestGroup({self._name})'


class TestRunner:
    """Runs unit tests by calling out to a runner script."""

    def __init__(
        self,
        executable: str,
        args: Sequence[str],
        tests: Iterable[Test],
        coverage_profraw: Optional[str] = None,
        timeout: Optional[float] = None,
    ) -> None:
        """Creates a runner for a fixed set of tests.

        Args:
          executable: Path to the runner executable or '.py' script.
          args: Arguments placed between the executable and the test path.
          tests: The tests to run, in order.
          coverage_profraw: If set, exported to the runner as
            LLVM_PROFILE_FILE.
          timeout: Forwarded to pw_cli.process.run_async as its timeout.
        """
        self._executable: str = executable
        self._args: Sequence[str] = args
        self._tests: List[Test] = list(tests)
        self._coverage_profraw = coverage_profraw
        self._timeout = timeout
        self._result_sink: Optional[Dict[str, str]] = None

        # Access go/result-sink, if available.
        ctx_path = Path(os.environ.get("LUCI_CONTEXT", ''))
        if not ctx_path.is_file():
            return

        ctx = json.loads(ctx_path.read_text(encoding='utf-8'))
        self._result_sink = ctx.get('result_sink', None)

    async def run_tests(self) -> None:
        """Runs all registered unit tests through the runner script.

        Tests run sequentially. Each test's status, start_time and duration_s
        are updated as it completes. If launching a test raises
        subprocess.CalledProcessError, or uploading a result fails, the error
        is logged and the run stops; tests that were not run keep the UNKNOWN
        status, so all_passed() returns False.
        """

        # Loop-invariant: used to right-align the running test counter.
        total = str(len(self._tests))

        for idx, test in enumerate(self._tests, 1):
            test_counter = f'Test {idx:{len(total)}}/{total}'

            _LOG.debug('%s: [ RUN] %s', test_counter, test.name)

            # Convert POSIX to native directory separators as GN produces '/'
            # but the Windows test runner needs '\\'.
            command = [
                str(Path(self._executable)),
                *self._args,
                str(Path(test.file_path)),
            ]

            # Python runner scripts are launched through the current
            # interpreter.
            if self._executable.endswith('.py'):
                command.insert(0, sys.executable)

            env = {}
            if self._coverage_profraw is not None:
                env['LLVM_PROFILE_FILE'] = str(Path(self._coverage_profraw))

            test.start_time = datetime.datetime.now(datetime.timezone.utc)
            start_time = time.monotonic()
            try:
                process = await pw_cli.process.run_async(
                    *command, env=env, timeout=self._timeout
                )
            except subprocess.CalledProcessError as err:
                _LOG.error(err)
                return
            test.duration_s = time.monotonic() - start_time

            if process.returncode == 0:
                test.status = TestResult.SUCCESS
                test_result = 'PASS'
            else:
                test.status = TestResult.FAILURE
                test_result = 'FAIL'

            _LOG.log(
                pw_cli.log.LOGLEVEL_STDOUT,
                '[Pid: %s]\n%s',
                pw_cli.color.colors().bold_white(process.pid),
                process.output.decode(errors='ignore').rstrip(),
            )

            _LOG.info(
                '%s: [%s] %s in %.3f s',
                test_counter,
                test_result,
                test.name,
                test.duration_s,
            )

            try:
                self._maybe_upload_to_resultdb(test, process)
            except requests.exceptions.RequestException as err:
                # RequestException is the base class of HTTPError and also
                # covers connection failures and the upload timeout, which
                # would otherwise escape as an unhandled traceback.
                _LOG.error(err)
                return

    def all_passed(self) -> bool:
        """Returns true if all unit tests passed."""
        return all(test.status is TestResult.SUCCESS for test in self._tests)

    def _maybe_upload_to_resultdb(
        self, test: Test, process: pw_cli.process.CompletedProcess
    ) -> None:
        """Uploads test result to ResultDB, if available.

        Does nothing when no result sink was found in LUCI_CONTEXT.

        Args:
          test: The finished test; its status, start_time and duration_s
            must already be set.
          process: The completed runner process whose output is attached.

        Raises:
          requests.exceptions.HTTPError: The sink answered with an error
            status. Other requests exceptions raised by the POST itself
            propagate as well.
        """
        if self._result_sink is None:
            # ResultDB integration not enabled.
            return

        test_result = {
            # The test.name is not suitable as an identifier because it's just
            # the basename of the test (channel_test). We want the full path,
            # including the toolchain used.
            "testId": test.file_path,
            # ResultDB also supports CRASH and ABORT, but there's currently no
            # way to distinguish these in pw_unit_test.
            "status": "PASS" if test.status is TestResult.SUCCESS else "FAIL",
            # The "expected" field is required. It could be used to report
            # expected failures, but we don't currently support these in
            # pw_unit_test.
            "expected": test.status is TestResult.SUCCESS,
            # Ensure to format the duration with '%.9fs' to avoid scientific
            # notation. If a value is too large or small and formatted with
            # str() or '%s', python formats the value in scientific notation,
            # like '1.1e-10', which is an invalid input for
            # google.protobuf.duration.
            "duration": "%.9fs" % test.duration_s,
            "start_time": test.start_time.isoformat(),
            "testMetadata": {
                # Use the file path as the test name in the Milo UI. (If this is
                # left unspecified, the UI will attempt to build a "good enough"
                # name by truncating the testId. That produces less readable
                # results.)
                "name": test.file_path,
            },
            "summaryHtml": (
                '<p><text-artifact '
                'artifact-id="artifact-content-in-request"></p>'
            ),
            "artifacts": {
                "artifact-content-in-request": {
                    # Need to decode the bytes back to ASCII or they will not be
                    # encodable by json.dumps.
                    #
                    # TODO(b/248349219): Instead of stripping the ANSI color
                    # codes, convert them to HTML.
                    "contents": base64.b64encode(
                        _strip_ansi(process.output)
                    ).decode('ascii'),
                },
            },
        }

        requests.post(
            url='http://%s/prpc/luci.resultsink.v1.Sink/ReportTestResults'
            % self._result_sink['address'],
            headers={
                'Content-Type': 'application/json',
                'Accept': 'application/json',
                'Authorization': 'ResultSink %s'
                % self._result_sink['auth_token'],
            },
            data=json.dumps({'testResults': [test_result]}),
            timeout=5.0,
        ).raise_for_status()


# Filename extension for unit test metadata files.
METADATA_EXTENSION = '.testinfo.json'


def find_test_metadata(root: str) -> List[str]:
    """Locates all test metadata files located within a directory tree.

    Args:
      root: Directory to search recursively.

    Returns:
      Paths (joined onto root) of every file whose name ends with
      METADATA_EXTENSION.
    """

    metadata: List[str] = []
    for path, _, files in os.walk(root):
        for filename in files:
            if not filename.endswith(METADATA_EXTENSION):
                continue

            full_path = os.path.join(path, filename)
            _LOG.debug('Found group metadata at %s', full_path)
            metadata.append(full_path)

    return metadata


# TODO(frolv): This is copied from the Python runner script.
# It should be extracted into a library and imported instead.
def find_binary(target: str) -> str:
    """Tries to find a binary for a gn build target.

    Args:
      target: Relative filesystem path to the target's output directory and
        target name, separated by a colon.

    Returns:
      Full path to the target's binary.

    Raises:
      FileNotFoundError: No binary found for target.
      ValueError: target contains no colon.
    """

    # Split on the *last* colon only: the directory part may itself contain
    # colons (for example a Windows drive letter such as 'C:').
    target_path, target_name = target.rsplit(':', 1)

    for extension in ['', '.elf', '.exe']:
        potential_filename = f'{target_path}/{target_name}{extension}'
        if os.path.isfile(potential_filename):
            return potential_filename

    raise FileNotFoundError(
        f'Could not find output binary for build target {target}'
    )


def parse_metadata(metadata: List[str], root: str) -> Dict[str, TestGroup]:
    """Builds a graph of test group objects from metadata.

    Each metadata file holds a JSON list of entries. An entry's 'type' selects
    how it is used: 'self' names the group ('name'), 'dep' names a group this
    one depends on ('group'), and 'test' describes one test binary
    ('test_directory' and 'test_name').

    Args:
      metadata: List of paths to JSON test metadata files.
      root: Root output directory of the build.

    Returns:
      Map of group name to TestGroup object. All TestGroup objects are fully
      populated with the paths to their unit tests and references to their
      dependencies.

    Raises:
      ValueError: A metadata file has no 'self' entry.
      FileNotFoundError: A listed test binary could not be found.
      KeyError: A group depends on a group that has no metadata file.
    """

    def canonicalize(path: str) -> str:
        """Removes a trailing slash from a GN target's directory.

        '//module:target'  -> '//module:target'
        '//module/:target' -> '//module:target'
        """
        index = path.find(':')
        # index <= 0 covers both "no colon" and "colon is the first
        # character"; in the latter case there is no directory to trim, and
        # path[index - 1] would wrap around to the last character.
        if index <= 0 or path[index - 1] != '/':
            return path
        return path[: index - 1] + path[index:]

    group_deps: List[Tuple[str, List[str]]] = []
    all_tests: Dict[str, Test] = {}
    test_groups: Dict[str, TestGroup] = {}
    num_tests = 0

    for path in metadata:
        with open(path, 'r', encoding='utf-8') as metadata_file:
            metadata_list = json.load(metadata_file)

        # Reset for every file so that a file without a 'self' entry cannot
        # silently reuse the group name of the previous file.
        group_name: Optional[str] = None
        deps: List[str] = []
        tests: List[Test] = []

        for entry in metadata_list:
            if entry['type'] == 'self':
                group_name = canonicalize(entry['name'])
            elif entry['type'] == 'dep':
                deps.append(canonicalize(entry['group']))
            elif entry['type'] == 'test':
                test_directory = os.path.join(root, entry['test_directory'])
                test_binary = find_binary(
                    f'{test_directory}:{entry["test_name"]}'
                )

                # Share one Test object per binary across all groups.
                if test_binary not in all_tests:
                    all_tests[test_binary] = Test(
                        entry['test_name'], test_binary
                    )

                tests.append(all_tests[test_binary])

        if group_name is None:
            raise ValueError(
                f'Test metadata file {path} has no "self" entry naming its '
                'group'
            )

        if deps:
            group_deps.append((group_name, deps))

        num_tests += len(tests)
        test_groups[group_name] = TestGroup(group_name, tests)

    # Dependencies are resolved only after every group object exists.
    for name, deps in group_deps:
        test_groups[name].set_deps([test_groups[dep] for dep in deps])

    _LOG.info('Found %d test groups (%d tests).', len(metadata), num_tests)
    return test_groups


def tests_from_groups(
    group_names: Optional[Sequence[str]], root: str
) -> List[Test]:
    """Returns unit tests belonging to test groups and their dependencies.

    If group_names is nonempty, only searches groups specified there.
    Otherwise, finds tests from all known test groups. Exits the process with
    status 1 if a requested group is unknown.

    Args:
      group_names: Names of the groups to run, or None/empty for all groups.
      root: Root output directory of the build to scan for metadata.

    Returns:
      The de-duplicated tests, sorted by file path so that the execution order
      is the same on every invocation.
    """

    _LOG.info('Scanning for tests...')
    metadata = find_test_metadata(root)
    test_groups = parse_metadata(metadata, root)

    groups_to_run = group_names if group_names else test_groups.keys()
    tests_to_run: Set[Test] = set()

    for name in groups_to_run:
        test_group = test_groups.get(name)
        if test_group is None:
            _LOG.error('Unknown test group: %s', name)
            sys.exit(1)
        tests_to_run.update(test_group.all_test_dependencies())

    _LOG.info('Running test groups %s', ', '.join(groups_to_run))
    # Set iteration order depends on string hashing, which varies between
    # interpreter runs; sort to keep the run order reproducible.
    return sorted(tests_to_run, key=lambda test: test.file_path)


def tests_from_paths(paths: Sequence[str]) -> List[Test]:
    """Returns a list of tests from test executable paths.

    Each test is named after the path's basename with its extension removed.
    """

    tests: List[Test] = []
    for path in paths:
        name = os.path.splitext(os.path.basename(path))[0]
        tests.append(Test(name, path))
    return tests


async def find_and_run_tests(
    root: str,
    runner: str,
    runner_args: Sequence[str] = (),
    coverage_profraw: Optional[str] = None,
    timeout: Optional[float] = None,
    group: Optional[Sequence[str]] = None,
    test: Optional[Sequence[str]] = None,
) -> int:
    """Runs some unit tests.

    Args:
      root: Root build directory searched for test metadata.
      runner: Executable which runs a single test.
      runner_args: Extra arguments forwarded to the runner.
      coverage_profraw: Optional coverage profraw file name for the runner.
      timeout: Optional timeout for each runner invocation.
      group: Test groups to run; ignored when test is given.
      test: Explicit test binaries to run; takes precedence over group.

    Returns:
      0 if every test passed, 1 otherwise.
    """

    if test:
        tests = tests_from_paths(test)
    else:
        tests = tests_from_groups(group, root)

    test_runner = TestRunner(
        runner, runner_args, tests, coverage_profraw, timeout
    )
    await test_runner.run_tests()

    return 0 if test_runner.all_passed() else 1


def main() -> int:
    """Run Pigweed unit tests built using GN."""
    # The docstring above doubles as the argparse description; keep it short.

    parser = argparse.ArgumentParser(description=main.__doc__)
    register_arguments(parser)
    parser.add_argument(
        '-v',
        '--verbose',
        action='store_true',
        help='Output additional logs as the script runs',
    )

    args_as_dict = dict(vars(parser.parse_args()))
    # 'verbose' is not a find_and_run_tests() parameter, so remove it before
    # forwarding; honor it here rather than silently discarding it.
    # NOTE(review): handlers installed by pw_cli.log.install() may apply their
    # own level filter; confirm DEBUG records reach the console.
    if args_as_dict.pop('verbose'):
        _LOG.setLevel(logging.DEBUG)
    return asyncio.run(find_and_run_tests(**args_as_dict))


if __name__ == '__main__':
    pw_cli.log.install(hide_timestamp=True)
    sys.exit(main())