• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""General purpose tools for running presubmit checks."""
15
16import collections.abc
17import logging
18import os
19from pathlib import Path
20import shlex
21import subprocess
22from typing import (
23    Iterable,
24    Iterator,
25    Sequence,
26)
27
28from pw_cli.tool_runner import ToolRunner
29from pw_presubmit.presubmit_context import PRESUBMIT_CONTEXT
30
31_LOG: logging.Logger = logging.getLogger(__name__)
32
33
def make_box(section_alignments: Sequence[str]) -> str:
    """Builds a ``str.format`` template for a three-line, multi-section box.

    The returned template expects eleven positional arguments (fields 0-10,
    presumably box-drawing characters) plus ``sectionN`` and ``widthN``
    keyword arguments for every section N, numbered from 1:

      - Top line: field 0, then for each section field 1 centered in
        ``widthN`` using itself as the fill character (so a single character
        is repeated ``widthN`` times), with field 2 between sections, then
        field 3.
      - Middle line: field 4, then each ``sectionN`` formatted to ``widthN``
        with that section's alignment, with field 5 between sections, then
        field 6.
      - Bottom line: field 7, then field 8 filled to ``widthN`` the same way
        as the top line, with field 9 between sections, then field 10.

    Args:
        section_alignments: One format-spec alignment (such as ``'<'``,
            ``'^'`` or ``'>'``) per section, in left-to-right order.

    Returns:
        The template string; it contains no trailing newline.
    """
    section_numbers = range(1, len(section_alignments) + 1)

    top_sections = '{2}'.join(
        '{1:{1}^{width%d}}' % number for number in section_numbers
    )
    mid_sections = '{5}'.join(
        '{section%d:%s{width%d}}' % (number, alignment, number)
        for number, alignment in enumerate(section_alignments, start=1)
    )
    bot_sections = '{9}'.join(
        '{8:{8}^{width%d}}' % number for number in section_numbers
    )

    # The section strings are already fully joined, so they are passed to
    # join() whole rather than being unpacked into individual characters.
    return ''.join(
        [
            '{0}',
            top_sections,
            '{3}\n',
            '{4}',
            mid_sections,
            '{6}\n',
            '{7}',
            bot_sections,
            '{10}',
        ]
    )
56
57
def relative_paths(paths: Iterable[Path], start: Path) -> Iterable[Path]:
    """Lazily yields each path in ``paths`` re-expressed relative to ``start``.

    The conversion is done with ``os.path.relpath``, so the results may
    contain ``..`` components when a path does not live under ``start``.
    """
    yield from (Path(os.path.relpath(entry, start)) for entry in paths)
62
63
def _truncate(value, length: int = 60) -> str:
    """Returns ``str(value)``, shortened with a ``[...]`` suffix if too long.

    Text no longer than ``length`` is returned unchanged. Longer text is cut
    to its first ``length - 5`` characters and ``'[...]'`` is appended.
    """
    text = str(value)
    if len(text) <= length:
        return text
    return text[: length - 5] + '[...]'
67
68
def format_command(args: Sequence, kwargs: dict) -> tuple[str, str]:
    """Renders a command's keyword arguments and argv as display strings.

    Returns:
        A ``(attributes, command)`` pair. ``attributes`` lists the keyword
        arguments as comma-separated ``key=value`` items, sorted by key, with
        each value passed through ``_truncate``. ``command`` is every entry of
        ``args`` converted to ``str``, shell-quoted and joined with spaces.
    """
    described_kwargs = [
        f'{name}={_truncate(value)}' for name, value in sorted(kwargs.items())
    ]
    quoted_args = [shlex.quote(str(arg)) for arg in args]
    return ', '.join(described_kwargs), ' '.join(quoted_args)
72
73
def log_run(
    args, ignore_dry_run: bool = False, **kwargs
) -> subprocess.CompletedProcess:
    """Logs a command, then executes it through subprocess.run.

    Accepts the same arguments as subprocess.run. When a presubmit context is
    active and has dry-run enabled, the command is not executed and an empty,
    successful CompletedProcess is returned instead, unless ``ignore_dry_run``
    is set.

    Args:
        args: The command to run, as accepted by subprocess.run.
        ignore_dry_run: If True, the command is neither recorded on the
            presubmit context nor skipped in dry-run mode.
        **kwargs: Forwarded unchanged to subprocess.run.
    """
    ctx = PRESUBMIT_CONTEXT.get()
    if ctx:
        honor_dry_run = not ignore_dry_run
        if honor_dry_run:
            # Record the subprocess command args for the pw build presubmit
            # runner.
            ctx.append_check_command(*args, **kwargs)
        if ctx.dry_run and honor_dry_run:
            # Dry-run mode: hand back an empty, successful result without
            # actually running anything.
            return subprocess.CompletedProcess('', 0, stdout=b'', stderr=b'')

    attributes, command = format_command(args, kwargs)
    _LOG.debug('[COMMAND] %s\n%s', attributes, command)
    return subprocess.run(args, **kwargs)
98
99
class PresubmitToolRunner(ToolRunner):
    """A minimal ToolRunner whose tools are launched through `log_run()`."""

    @staticmethod
    def _custom_args() -> Iterable[str]:
        """Names the extra keyword argument this runner accepts.

        NOTE(review): presumably the ToolRunner base class uses this list to
        decide which keyword arguments reach `_run_tool()`; confirm against
        pw_cli.tool_runner.
        """
        return ['pw_presubmit_ignore_dry_run']

    def _run_tool(
        self, tool: str, args, pw_presubmit_ignore_dry_run=False, **kwargs
    ) -> subprocess.CompletedProcess:
        """Runs ``tool`` with ``args`` as a subprocess via `log_run()`.

        ``pw_presubmit_ignore_dry_run`` is passed to `log_run()` as its
        ``ignore_dry_run`` argument; all other keyword arguments are forwarded
        untouched.
        """
        command = [tool, *args]
        return log_run(
            command,
            ignore_dry_run=pw_presubmit_ignore_dry_run,
            **kwargs,
        )
116
117
def flatten(*items) -> Iterator:
    """Yields the leaf values of arbitrarily nested iterables, in order.

    Every argument that is iterable is expanded recursively. str, bytes and
    bytearray values are treated as leaves and yielded whole, as is anything
    that is not iterable.
    """
    leaf_types = (str, bytes, bytearray)

    for element in items:
        if isinstance(element, leaf_types) or not isinstance(
            element, collections.abc.Iterable
        ):
            yield element
        else:
            yield from flatten(*element)
132