• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# Copyright 2025 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""``pw_watch.run`` executes arbitrary commands when watched files change.
16
17One or more commands to run are provided on the command line. These are executed
18in order when changes are detected.
19
20Examples:
21
22.. code-block:: sh
23
24   # Run `bazelisk --symlink_prefix=/ build //...`
25   run.py --prefix bazelisk --symlink_prefix=/ --pw-watch-commands build //...
26
27Multiple commands may be specified, separated by ``,``:
28
29.. code-block:: sh
30
31   # Run `cowsay` then `cowthink` when watched files change.
   run.py --pw-watch-commands cowsay "Hey, how are you?" , cowthink Not in moood to talk
33
34The Bazel build's ``//:watch`` watch entrypoint invokes ``//pw_watch/py:run``
35with ``bazelisk`` as the command prefix.
36"""
37
38import argparse
39import errno
40import logging
41import os
42from pathlib import Path
43import shlex
44import subprocess
45import sys
46import threading
47import time
48from typing import Iterable, Sequence
49
50from watchdog.events import FileSystemEventHandler
51
52import pw_cli.color
53import pw_cli.env
54import pw_cli.log
55from pw_cli.plural import plural
56
57from pw_watch import common
58from pw_watch.debounce import DebouncedFunction, Debouncer
59
# Module logger, registered under the 'pw_watch' name.
_LOG = logging.getLogger('pw_watch')
# Color helpers used to highlight the PASSED / FAILED result of each command.
_COLOR = pw_cli.color.colors()

# Width, in characters, of the banner printed around each command's output.
_WIDTH = 80
# Horizontal rules that open and close the banner for a single command.
_STEP_START = '━' * (_WIDTH - 1) + '┓'
_STEP_FINISH = '━' * (_WIDTH - 1) + '┛'
66
67
68def _format_time(time_s: float) -> str:
69    minutes, seconds = divmod(time_s, 60)
70    if minutes < 60:
71        return f' {int(minutes)}:{seconds:04.1f}'
72    hours, minutes = divmod(minutes, 60)
73    return f'{int(hours):d}:{int(minutes):02}:{int(seconds):02}'
74
75
class Watcher(FileSystemEventHandler, DebouncedFunction):
    """Process filesystem events and run commands on changes.

    Runs are requested through a ``Debouncer``. Each run executes the
    configured commands in order and prints a PASSED / FAILED summary for each
    one. A line entered on stdin also requests a run.
    """

    def __init__(
        self,
        commands: Iterable[Sequence[str]],
        patterns: Iterable[str] = (),
        ignore_patterns: Iterable[str] = (),
        keep_going: bool = False,
    ) -> None:
        """Stores the configuration and starts the stdin listener thread.

        Args:
            commands: Commands (argument sequences) to execute, in order, on
                each run.
            patterns: Patterns handed to ``common.handle_watchdog_event`` for
                each filesystem event.
            ignore_patterns: Ignore patterns handed to
                ``common.handle_watchdog_event`` for each filesystem event.
            keep_going: If true, continue with the remaining commands after a
                command exits with a nonzero code.
        """
        super().__init__()

        self.commands = tuple(tuple(cmd) for cmd in commands)
        # The patterns are consulted on every filesystem event, so materialize
        # them; a one-shot iterator would be exhausted after the first event.
        self.patterns = tuple(patterns)
        self.ignore_patterns = tuple(ignore_patterns)
        self.keep_going = keep_going

        self._debouncer = Debouncer(self)
        threading.Thread(None, self._wait_for_enter).start()

    def trigger_run(self, message: str) -> None:
        """Requests a run through the debouncer, passing along ``message``."""
        self._debouncer.press(message)

    def _wait_for_enter(self) -> None:
        """Requests a manual run each time a line is read from stdin."""
        try:
            while True:
                _ = input()
                self.trigger_run('Manual run requested')
        # Ctrl-C on Unix generates KeyboardInterrupt
        # Ctrl-Z on Windows generates EOFError
        except (KeyboardInterrupt, EOFError):
            _exit_due_to_interrupt()

    def dispatch(self, event) -> None:
        """Requests a run if the event maps to a watched path.

        ``common.handle_watchdog_event`` decides whether the event is relevant;
        a run is requested only when it returns a path.
        """
        path = common.handle_watchdog_event(
            event, self.patterns, self.ignore_patterns
        )
        if path is not None:
            self.trigger_run(
                f'File change detected: {os.path.relpath(path)}; '
                'Triggering build...'
            )

    # Implementation of DebouncedFunction.run()
    #
    # Note: This will run on the timer thread created by the Debouncer, rather
    # than on the main thread that's watching file events. This enables the
    # watcher to continue receiving file change events during a build.
    def run(self) -> None:
        """Executes the commands in order, printing a banner around each.

        Stops at the first failing command unless ``keep_going`` is set.
        """
        print('\033c', end='', flush=True)  # clear the screen

        total = len(self.commands)
        for i, command in enumerate(self.commands, 1):
            count = f' {i}/{total}   '
            print(
                f'{_STEP_START}\n{count}{shlex.join(command)}\n',
                flush=True,
            )
            # Measure with the monotonic clock so the reported duration is not
            # skewed by wall-clock adjustments made while the command runs.
            start = time.monotonic()
            code = subprocess.run(command).returncode
            total_time = time.monotonic() - start

            if code == 0:
                label = 'PASSED'
                result = _COLOR.bold_green(label)
                msg = ''
            else:
                label = 'FAILED'
                result = _COLOR.bold_red(label)
                msg = f' with exit code {code}'

            # Use the plain label for the width; the colored result may
            # contain escape sequences that take up no columns on screen.
            remaining_width = _WIDTH - len(count) - len(label) - len(msg) - 2
            timestamp = _format_time(total_time).rjust(remaining_width)
            print(
                f'\n{count}{result}{msg}{timestamp}\n{_STEP_FINISH}\n',
                flush=True,
            )

            if code and not self.keep_going and i < total:
                _LOG.info(
                    'Skipping %d remaining %s; '
                    'run with --keep-going to continue on errors',
                    total - i,
                    plural(total - i, 'command'),
                )
                break

    def cancel(self) -> bool:
        """No-op implementation of DebouncedFunction.cancel()."""
        return False

    def on_complete(self, cancelled: bool = False) -> None:
        """No-op implementation of DebouncedFunction.on_complete()."""

    def on_keyboard_interrupt(self) -> None:
        """Implementation of DebouncedFunction.on_keyboard_interrupt()."""
        _exit_due_to_interrupt()
170
171
def _exit_due_to_interrupt() -> None:
    """Logs the interrupt and calls ``common.exit_immediately(1)``."""
    # The terminal echoes '^C' on a keyboard interrupt; emit a newline first
    # so the log line that follows stays aligned with the others.
    print()
    _LOG.info('Got Ctrl-C; exiting...')
    common.exit_immediately(1)
178
179
def watch_setup(
    root: Path,
    keep_going: bool,
    commands: Sequence[tuple[str, ...]],
    watch_patterns: Sequence[str] = common.WATCH_PATTERNS,
    ignore_patterns: Sequence[str] = (),
    exclude_dirs: Sequence[Path] | None = None,
) -> tuple[Watcher, Sequence[Path]]:
    """Changes into ``root`` and builds the Watcher for ``commands``.

    Args:
        root: Directory to change into before anything else is done.
        keep_going: Forwarded to the Watcher.
        commands: Commands the Watcher executes on each run.
        watch_patterns: Patterns forwarded to the Watcher.
        ignore_patterns: Ignore patterns forwarded to the Watcher.
        exclude_dirs: Directories to ignore; when None, the result of
            ``common.get_common_excludes(root)`` is used instead.

    Returns:
        The Watcher and a tuple of directories to ignore.
    """
    os.chdir(root)

    dirs_to_skip = (
        common.get_common_excludes(root)
        if exclude_dirs is None
        else exclude_dirs
    )
    # Resolve the excludes before the Watcher is created.
    excluded = tuple(dirs_to_skip)

    handler = Watcher(
        commands,
        patterns=watch_patterns,
        ignore_patterns=ignore_patterns,
        keep_going=keep_going,
    )
    return handler, excluded
203
204
def watch(
    event_handler: Watcher,
    exclude_dirs: Sequence[Path],
    watch_path: Path,
) -> None:
    """Watches files and runs commands when they change.

    Requests an initial run, starts watching through ``common.watch``, and
    then calls the wait function it returns.

    Args:
        event_handler: Watcher that receives events and runs the commands.
        exclude_dirs: Directories passed to ``common.watch`` to be excluded.
        watch_path: Path passed to ``common.watch`` as the location to watch.

    Raises:
        OSError: Any OSError other than the two inotify-limit conditions
            handled below is re-raised.
    """
    try:
        event_handler.trigger_run('Triggering initial run...')
        wait = common.watch(watch_path, exclude_dirs, event_handler)
        wait()
    # Ctrl-C on Unix generates KeyboardInterrupt
    # Ctrl-Z on Windows generates EOFError
    except (KeyboardInterrupt, EOFError):
        _exit_due_to_interrupt()
    except OSError as err:
        # An OSError raised without arguments has an empty args tuple; guard
        # the lookup so the original error is not masked by an IndexError.
        if err.args and err.args[0] == common.ERRNO_INOTIFY_LIMIT_REACHED:
            common.log_inotify_watch_limit_reached()
            common.exit_immediately(1)
        if err.errno == errno.EMFILE:
            common.log_inotify_instance_limit_reached()
            common.exit_immediately(1)
        # Not a recognized limit error: propagate it with its traceback.
        raise
227
228
# Option whose following arguments form a prefix applied to every command.
_PREFIX_ARG = '--prefix'
# Option after which all remaining arguments are treated as commands.
_COMMANDS_ARG = '--pw-watch-commands'
# Stand-alone argument that separates one command from the next.
_CMD_DELIMITER = ','
232
233
def _parse_args() -> argparse.Namespace:
    """Parses the command line from ``sys.argv``.

    The prefix and command arguments are extracted by hand, because argparse
    cannot capture ``--`` style arguments inside other arguments. Everything
    else is parsed by argparse.

    Returns:
        Namespace with ``root``, ``keep_going``, and ``commands``, where
        ``commands`` is a tuple of argument tuples with the prefix applied.
        Exits through ``parser.error`` on invalid usage.
    """
    # Remove reST / Sphinx syntax from the docstring for use in --help.
    # __doc__ is None when docstrings are stripped (python -OO), so fall back
    # to an empty description rather than failing on None.replace().
    help_string = (
        (__doc__ or '').replace('``', '`').replace('.. code-block:: sh\n', '')
    )
    parser = argparse.ArgumentParser(
        description=help_string,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    parser.add_argument(
        '--root',
        type=Path,
        default=pw_cli.env.project_root(),
        help='Directory from which to execute commands',
    )
    parser.add_argument(
        '-k',
        '--keep-going',
        action=argparse.BooleanOptionalAction,
        default=False,
        help='Continue executing commands after errors',
    )
    parser.add_argument(
        _PREFIX_ARG,
        dest='prefix',
        metavar='PREFIX...',
        help=(
            'Prefix to apply to all commands; consumes all arguments, '
            f'including -- args, until the mandatory {_COMMANDS_ARG} argument; '
            f'so must be the final argument before {_COMMANDS_ARG}'
        ),
    )
    parser.add_argument(
        _COMMANDS_ARG,
        dest='commands',
        metavar=f'COMMAND... [{_CMD_DELIMITER} COMMAND...]',
        required=True,
        help=(
            'Command to run; may specify multiple commands separated by '
            f'{_CMD_DELIMITER} (e.g. /bin/cmd1 --arg1 {_CMD_DELIMITER} '
            '/bin/cmd_2 -a2)'
        ),
    )
    parser.add_argument(
        'commands', nargs=argparse.REMAINDER, help='Command to execute'
    )

    # Parse the prefix and command arguments manually, since argparse doesn't
    # support capturing -- arguments in other arguments.
    args = sys.argv[1:]

    # Bound up front so it is defined even when the commands option is absent.
    raw_commands: list[str] = []
    first_cmd_index: int | None
    try:
        first_cmd_index = args.index(_COMMANDS_ARG)
        raw_commands = args[first_cmd_index + 1 :]
        del args[first_cmd_index:]  # already parsed, remove it from argparse
    except ValueError:
        first_cmd_index = None

    prefix: tuple[str, ...] = ()
    try:
        prefix_index = args.index(_PREFIX_ARG)
        if first_cmd_index is not None and prefix_index < first_cmd_index:
            prefix = tuple(args[prefix_index + 1 : first_cmd_index])
        del args[prefix_index:]  # already parsed, remove it from argparse
    except ValueError:
        pass

    # Parse remaining args, using an empty stand-in for the commands.
    parsed = parser.parse_args(args + [_COMMANDS_ARG, ''])

    if first_cmd_index is None:
        parser.error(f'{_COMMANDS_ARG} must be specified as the final argument')
    if not raw_commands:
        parser.error(f'{_COMMANDS_ARG} requires at least one command')

    commands = tuple(_parse_commands(prefix, raw_commands))
    # A leading or repeated delimiter without a prefix produces a command with
    # no arguments at all, which cannot be executed; report it as a usage
    # error here instead of failing later when the command is run.
    if not all(commands):
        parser.error(
            f'{_COMMANDS_ARG} contains an empty command; check for a leading '
            f'or repeated "{_CMD_DELIMITER}"'
        )

    # Account for the arguments that were manually parsed.
    parsed.commands = commands
    del parsed.prefix
    return parsed
312
313
def _parse_commands(
    prefix: Sequence[str], raw_commands: Sequence[str]
) -> Iterable[tuple[str, ...]]:
    """Splits ``raw_commands`` at each delimiter and applies ``prefix``.

    Yields one tuple per command, consisting of the prefix followed by that
    command's arguments. A delimiter with nothing before it yields just the
    prefix; a trailing delimiter yields nothing further.
    """
    pending: list[str] = []
    for arg in raw_commands:
        if arg == _CMD_DELIMITER:
            # A delimiter always closes the current command, even if empty.
            yield (*prefix, *pending)
            pending = []
        else:
            pending.append(arg)

    # Arguments after the last delimiter form the final command.
    if pending:
        yield (*prefix, *pending)
326
327
def main() -> int:
    """Watch files for changes and run commands.

    Installs logging, parses the command line, builds the watcher, and starts
    watching the configured root. Returns 0.
    """
    pw_cli.log.install(
        level=logging.INFO,
        use_color=True,
        hide_timestamp=False,
    )

    options = _parse_args()
    # Every parsed option corresponds to a watch_setup() parameter.
    handler, excluded = watch_setup(**vars(options))
    watch(handler, excluded, options.root)

    return 0
337
338
if __name__ == '__main__':
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())
341