• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2019 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Runs Pigweed unit tests built using GN."""
15
import argparse
import asyncio
import base64
import datetime
import enum
import json
import logging
import os
import re
import subprocess
import sys
import time

from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Set, Tuple

import requests

import pw_cli.color
import pw_cli.log
import pw_cli.process
36
37# Global logger for the script.
38_LOG: logging.Logger = logging.getLogger(__name__)
39
40_ANSI_SEQUENCE_REGEX = re.compile(rb'\x1b[^m]*m')
41
42
43def _strip_ansi(bytes_with_sequences: bytes) -> bytes:
44    """Strip out ANSI escape sequences."""
45    return _ANSI_SEQUENCE_REGEX.sub(b'', bytes_with_sequences)
46
47
def register_arguments(parser: argparse.ArgumentParser) -> None:
    """Adds this script's command-line arguments to *parser*."""

    add = parser.add_argument
    add(
        '--root',
        type=str,
        default='out',
        help='Path to the root build directory',
    )
    add(
        '-r',
        '--runner',
        type=str,
        required=True,
        help='Executable which runs a test on the target',
    )
    add('-m', '--timeout', type=float, help='Timeout for test runner in seconds')
    add(
        '--coverage-profraw',
        type=str,
        help='The name of the coverage profraw file to produce with the'
        ' coverage information from this test. Only provide this if the test'
        ' should be run for coverage and is properly instrumented.',
    )
    add('runner_args', nargs='*', help='Arguments to forward to the test runner')

    # Tests may be selected either individually or by group, never both.
    selection = parser.add_mutually_exclusive_group()
    selection.add_argument(
        '-g', '--group', action='append', help='Test groups to run'
    )
    selection.add_argument(
        '-t', '--test', action='append', help='Test binaries to run'
    )
86
87
class TestResult(enum.Enum):
    """Possible outcomes of a single unit test run."""

    # No result yet: the test has not finished running.
    UNKNOWN = 0
    # The test ran and exited with a zero return code.
    SUCCESS = 1
    # The test ran and exited with a nonzero return code.
    FAILURE = 2


class Test:
    """A unit test executable."""

    def __init__(self, name: str, file_path: str) -> None:
        # Short name of the test.
        self.name: str = name
        # Path to the executable; this is the test's identity for eq/hash.
        self.file_path: str = file_path
        # Result of the run; stays UNKNOWN until the test completes.
        self.status: TestResult = TestResult.UNKNOWN
        # Set when the test starts executing.
        self.start_time: datetime.datetime
        # Wall-clock runtime in seconds, set once the test finishes.
        self.duration_s: float

    def __repr__(self) -> str:
        return f'Test({self.name})'

    def __eq__(self, other: object) -> bool:
        # Two tests are the same iff they point at the same executable.
        if isinstance(other, Test):
            return self.file_path == other.file_path
        return NotImplemented

    def __hash__(self) -> int:
        # Consistent with __eq__: hash only the file path.
        return hash(self.file_path)
116
117
class TestGroup:
    """Graph node representing a group of unit tests."""

    def __init__(self, name: str, tests: Iterable[Test]):
        self._name: str = name
        self._deps: Iterable['TestGroup'] = []
        self._tests: Iterable[Test] = tests

    def set_deps(self, deps: Iterable['TestGroup']) -> None:
        """Updates the dependency list of this group."""
        self._deps = deps

    def all_test_dependencies(self) -> List[Test]:
        """Returns a list of all tests in this group and its dependencies."""
        return list(self._all_test_dependencies(set()))

    def _all_test_dependencies(self, processed_groups: Set[str]) -> Set[Test]:
        # Track visited groups by name so a diamond-shaped dependency graph
        # is walked at most once per group.
        if self._name in processed_groups:
            return set()

        collected: Set[Test] = set()
        for dependency in self._deps:
            # pylint: disable-next=protected-access
            collected |= dependency._all_test_dependencies(processed_groups)
        collected |= set(self._tests)

        processed_groups.add(self._name)
        return collected

    def __repr__(self) -> str:
        return f'TestGroup({self._name})'
153
154
class TestRunner:
    """Runs unit tests by calling out to a runner script."""

    def __init__(
        self,
        executable: str,
        args: Sequence[str],
        tests: Iterable[Test],
        coverage_profraw: Optional[str] = None,
        timeout: Optional[float] = None,
    ) -> None:
        """Creates a runner for the given tests.

        Args:
            executable: Program invoked once per test binary.
            args: Extra arguments passed to the runner before the test path.
            tests: The test executables to run.
            coverage_profraw: If set, path of the profraw file to produce;
                exported to each test via LLVM_PROFILE_FILE.
            timeout: Per-test timeout in seconds, or None for no timeout.
        """
        self._executable: str = executable
        self._args: Sequence[str] = args
        self._tests: List[Test] = list(tests)
        self._coverage_profraw = coverage_profraw
        self._timeout = timeout
        self._result_sink: Optional[Dict[str, str]] = None

        # Access go/result-sink, if available. The LUCI_CONTEXT file, when
        # present, carries the ResultSink address and auth token.
        ctx_path = Path(os.environ.get("LUCI_CONTEXT", ''))
        if not ctx_path.is_file():
            return

        ctx = json.loads(ctx_path.read_text(encoding='utf-8'))
        self._result_sink = ctx.get('result_sink', None)

    async def run_tests(self) -> None:
        """Runs all registered unit tests through the runner script."""

        for idx, test in enumerate(self._tests, 1):
            total = str(len(self._tests))
            # Pad the index so counters line up, e.g. 'Test  7/120'.
            test_counter = f'Test {idx:{len(total)}}/{total}'

            _LOG.debug('%s: [ RUN] %s', test_counter, test.name)

            # Convert POSIX to native directory separators as GN produces '/'
            # but the Windows test runner needs '\\'.
            command = [
                str(Path(self._executable)),
                *self._args,
                str(Path(test.file_path)),
            ]

            # Python runner scripts are executed through the current
            # interpreter rather than relying on them being executable.
            if self._executable.endswith('.py'):
                command.insert(0, sys.executable)

            test.start_time = datetime.datetime.now(datetime.timezone.utc)
            start_time = time.monotonic()
            try:
                # NOTE(review): only LLVM_PROFILE_FILE is passed explicitly;
                # presumably pw_cli.process merges this with os.environ —
                # confirm if tests rely on other environment variables.
                env = {}
                if self._coverage_profraw is not None:
                    env['LLVM_PROFILE_FILE'] = str(Path(self._coverage_profraw))
                process = await pw_cli.process.run_async(
                    *command, env=env, timeout=self._timeout
                )
            except subprocess.CalledProcessError as err:
                _LOG.error(err)
                return
            test.duration_s = time.monotonic() - start_time

            if process.returncode == 0:
                test.status = TestResult.SUCCESS
                test_result = 'PASS'
            else:
                test.status = TestResult.FAILURE
                test_result = 'FAIL'

                # Dump the failing test's output so the failure is visible
                # in the log.
                _LOG.log(
                    pw_cli.log.LOGLEVEL_STDOUT,
                    '[Pid: %s]\n%s',
                    pw_cli.color.colors().bold_white(process.pid),
                    process.output.decode(errors='ignore').rstrip(),
                )

            # BUGFIX: report the result for every test, not only failures.
            # This call was previously nested inside the failure branch,
            # which left the 'PASS' value of test_result unused and passing
            # tests unreported.
            _LOG.info(
                '%s: [%s] %s in %.3f s',
                test_counter,
                test_result,
                test.name,
                test.duration_s,
            )

            try:
                self._maybe_upload_to_resultdb(test, process)
            except requests.exceptions.HTTPError as err:
                _LOG.error(err)
                return

    def all_passed(self) -> bool:
        """Returns true if all unit tests passed."""
        return all(test.status is TestResult.SUCCESS for test in self._tests)

    def _maybe_upload_to_resultdb(
        self, test: Test, process: pw_cli.process.CompletedProcess
    ):
        """Uploads a single test result to ResultDB, if available.

        Raises:
            requests.exceptions.HTTPError: The upload request failed.
        """
        if self._result_sink is None:
            # ResultDB integration not enabled.
            return

        test_result = {
            # The test.name is not suitable as an identifier because it's just
            # the basename of the test (channel_test). We want the full path,
            # including the toolchain used.
            "testId": test.file_path,
            # ResultDB also supports CRASH and ABORT, but there's currently no
            # way to distinguish these in pw_unit_test.
            "status": "PASS" if test.status is TestResult.SUCCESS else "FAIL",
            # The "expected" field is required. It could be used to report
            # expected failures, but we don't currently support these in
            # pw_unit_test.
            "expected": test.status is TestResult.SUCCESS,
            # Ensure to format the duration with '%.9fs' to avoid scientific
            # notation.  If a value is too large or small and formatted with
            # str() or '%s', python formats the value in scientific notation,
            # like '1.1e-10', which is an invalid input for
            # google.protobuf.duration.
            "duration": "%.9fs" % test.duration_s,
            "start_time": test.start_time.isoformat(),
            "testMetadata": {
                # Use the file path as the test name in the Milo UI. (If this is
                # left unspecified, the UI will attempt to build a "good enough"
                # name by truncating the testId. That produces less readable
                # results.)
                "name": test.file_path,
            },
            "summaryHtml": (
                '<p><text-artifact '
                'artifact-id="artifact-content-in-request"></p>'
            ),
            "artifacts": {
                "artifact-content-in-request": {
                    # Need to decode the bytes back to ASCII or they will not be
                    # encodable by json.dumps.
                    #
                    # TODO(b/248349219): Instead of stripping the ANSI color
                    # codes, convert them to HTML.
                    "contents": base64.b64encode(
                        _strip_ansi(process.output)
                    ).decode('ascii'),
                },
            },
        }

        requests.post(
            url='http://%s/prpc/luci.resultsink.v1.Sink/ReportTestResults'
            % self._result_sink['address'],
            headers={
                'Content-Type': 'application/json',
                'Accept': 'application/json',
                'Authorization': 'ResultSink %s'
                % self._result_sink['auth_token'],
            },
            data=json.dumps({'testResults': [test_result]}),
            timeout=5.0,
        ).raise_for_status()
311
312
# Filename extension of the per-group unit test metadata files that the GN
# build emits and find_test_metadata() scans for.
METADATA_EXTENSION = '.testinfo.json'
315
316
def find_test_metadata(root: str) -> List[str]:
    """Locates all test metadata files located within a directory tree."""

    found: List[str] = []
    for directory, _subdirs, filenames in os.walk(root):
        for filename in filenames:
            if filename.endswith(METADATA_EXTENSION):
                full_path = os.path.join(directory, filename)
                _LOG.debug('Found group metadata at %s', full_path)
                found.append(full_path)

    return found
331
332
# TODO(frolv): This is copied from the Python runner script.
# It should be extracted into a library and imported instead.
def find_binary(target: str) -> str:
    """Tries to find a binary for a gn build target.

    Args:
        target: Relative filesystem path to the target's output directory and
            target name, separated by a colon.

    Returns:
        Full path to the target's binary.

    Raises:
        FileNotFoundError: No binary found for target.
    """

    target_path, target_name = target.split(':')

    # The binary may be extensionless or carry a target-specific extension.
    candidates = (
        f'{target_path}/{target_name}{suffix}' for suffix in ('', '.elf', '.exe')
    )
    for candidate in candidates:
        if os.path.isfile(candidate):
            return candidate

    raise FileNotFoundError(
        f'Could not find output binary for build target {target}'
    )
359
360
def parse_metadata(metadata: List[str], root: str) -> Dict[str, TestGroup]:
    """Builds a graph of test group objects from metadata.

    Args:
        metadata: List of paths to JSON test metadata files.
        root: Root output directory of the build.

    Returns:
        Map of group name to TestGroup object. All TestGroup objects are fully
        populated with the paths to their unit tests and references to their
        dependencies.

    Raises:
        ValueError: A metadata file does not declare its own group name.
    """

    def canonicalize(path: str) -> str:
        """Removes a trailing slash from a GN target's directory.

        '//module:target'  -> '//module:target'
        '//module/:target' -> '//module:target'
        """
        index = path.find(':')
        if index == -1 or path[index - 1] != '/':
            return path
        return path[: index - 1] + path[index:]

    group_deps: List[Tuple[str, List[str]]] = []
    all_tests: Dict[str, Test] = {}
    test_groups: Dict[str, TestGroup] = {}
    num_tests = 0

    for path in metadata:
        # Each metadata file is a JSON list of entries describing one group.
        # Specify the encoding explicitly, consistent with the LUCI context
        # read elsewhere in this file.
        with open(path, 'r', encoding='utf-8') as metadata_file:
            metadata_list = json.load(metadata_file)

        group_name: Optional[str] = None
        deps: List[str] = []
        tests: List[Test] = []

        for entry in metadata_list:
            if entry['type'] == 'self':
                group_name = canonicalize(entry['name'])
            elif entry['type'] == 'dep':
                deps.append(canonicalize(entry['group']))
            elif entry['type'] == 'test':
                test_directory = os.path.join(root, entry['test_directory'])
                test_binary = find_binary(
                    f'{test_directory}:{entry["test_name"]}'
                )

                # Share one Test object per binary so a test reached through
                # several groups is only run once.
                if test_binary not in all_tests:
                    all_tests[test_binary] = Test(
                        entry['test_name'], test_binary
                    )

                tests.append(all_tests[test_binary])

        if group_name is None:
            # Previously this fell through to an unhelpful NameError.
            raise ValueError(f'Metadata file {path} has no "self" entry')

        if deps:
            group_deps.append((group_name, deps))

        num_tests += len(tests)
        test_groups[group_name] = TestGroup(group_name, tests)

    # Link dependencies in a second pass: a group may depend on one defined
    # in a metadata file that had not been parsed yet.
    for name, deps in group_deps:
        test_groups[name].set_deps([test_groups[dep] for dep in deps])

    _LOG.info('Found %d test groups (%d tests).', len(metadata), num_tests)
    return test_groups
426
427
def tests_from_groups(
    group_names: Optional[Sequence[str]], root: str
) -> List[Test]:
    """Returns unit tests belonging to test groups and their dependencies.

    If args.names is nonempty, only searches groups specified there.
    Otherwise, finds tests from all known test groups.
    """

    _LOG.info('Scanning for tests...')
    test_groups = parse_metadata(find_test_metadata(root), root)

    # No explicit selection means every known group runs.
    groups_to_run = group_names if group_names else test_groups.keys()

    selected: Set[Test] = set()
    for group in groups_to_run:
        if group not in test_groups:
            _LOG.error('Unknown test group: %s', group)
            sys.exit(1)
        selected.update(test_groups[group].all_test_dependencies())

    _LOG.info('Running test groups %s', ', '.join(groups_to_run))
    return list(selected)
453
454
def tests_from_paths(paths: Sequence[str]) -> List[Test]:
    """Returns a list of tests from test executable paths."""
    # A test's name is its filename with any extension stripped.
    return [
        Test(os.path.splitext(os.path.basename(path))[0], path)
        for path in paths
    ]
463
464
async def find_and_run_tests(
    root: str,
    runner: str,
    runner_args: Sequence[str] = (),
    coverage_profraw: Optional[str] = None,
    timeout: Optional[float] = None,
    group: Optional[Sequence[str]] = None,
    test: Optional[Sequence[str]] = None,
) -> int:
    """Runs some unit tests."""

    # Explicitly listed test binaries take precedence over group selection.
    tests = tests_from_paths(test) if test else tests_from_groups(group, root)

    test_runner = TestRunner(
        runner, runner_args, tests, coverage_profraw, timeout
    )
    await test_runner.run_tests()

    # Shell-style exit code: zero iff every test passed.
    return 0 if test_runner.all_passed() else 1
487
488
def main() -> int:
    """Run Pigweed unit tests built using GN."""

    parser = argparse.ArgumentParser(description=main.__doc__)
    register_arguments(parser)
    parser.add_argument(
        '-v',
        '--verbose',
        action='store_true',
        help='Output additional logs as the script runs',
    )

    # Every parsed option except --verbose maps directly onto a parameter of
    # find_and_run_tests(). NOTE(review): the verbose flag is currently
    # discarded without effect — confirm whether it should adjust log levels.
    args_as_dict = dict(vars(parser.parse_args()))
    args_as_dict.pop('verbose')
    return asyncio.run(find_and_run_tests(**args_as_dict))
504
505
if __name__ == '__main__':
    # Install Pigweed's log formatting before any test output is produced.
    pw_cli.log.install(hide_timestamp=True)
    sys.exit(main())
509