#!/usr/bin/env python
# Copyright 2025 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""``pw_watch.run`` executes arbitrary commands when watched files change.

One or more commands to run are provided on the command line. These are executed
in order when changes are detected.

Examples:

.. code-block:: sh

   # Run `bazelisk --symlink_prefix=/ build //...`
   run.py --prefix bazelisk --symlink_prefix=/ --pw-watch-commands build //...

Multiple commands may be specified, separated by ``,``:

.. code-block:: sh

   # Run `cowsay` then `cowthink` when watched files change.
   run.py --pw-watch-commands cowsay "How are you?" , cowthink Not in the mood

The Bazel build's ``//:watch`` watch entrypoint invokes ``//pw_watch/py:run``
with ``bazelisk`` as the command prefix.
"""

import argparse
import errno
import logging
import os
from pathlib import Path
import shlex
import subprocess
import sys
import threading
import time
from typing import Iterable, Sequence

from watchdog.events import FileSystemEventHandler

import pw_cli.color
import pw_cli.env
import pw_cli.log
from pw_cli.plural import plural

from pw_watch import common
from pw_watch.debounce import DebouncedFunction, Debouncer

_LOG = logging.getLogger('pw_watch')
_COLOR = pw_cli.color.colors()

# Width, in columns, of the banners printed around each command.
_WIDTH = 80
_STEP_START = '━' * (_WIDTH - 1) + '┓'
_STEP_FINISH = '━' * (_WIDTH - 1) + '┛'


def _format_time(time_s: float) -> str:
    """Formats a duration in seconds for display.

    Args:
        time_s: Elapsed time in seconds.

    Returns:
        `` M:SS.s`` (with a leading space) for durations under an hour,
        otherwise ``H:MM:SS``.
    """
    # Round to the displayed precision *before* splitting into minutes and
    # seconds. Otherwise a value such as 59.97 would be split into 0 minutes
    # and 59.97 seconds, and then be rendered as the nonsensical " 0:60.0".
    minutes, seconds = divmod(round(time_s, 1), 60)
    if minutes < 60:
        return f' {int(minutes)}:{seconds:04.1f}'
    hours, minutes = divmod(minutes, 60)
    return f'{int(hours):d}:{int(minutes):02}:{int(seconds):02}'


class Watcher(FileSystemEventHandler, DebouncedFunction):
    """Process filesystem events and run commands on changes."""

    def __init__(
        self,
        commands: Iterable[Sequence[str]],
        patterns: Iterable[str] = (),
        ignore_patterns: Iterable[str] = (),
        keep_going: bool = False,
    ) -> None:
        """Creates the watcher and starts listening for Enter on stdin.

        Args:
            commands: Commands to execute, in order, on each run. Each command
                is a sequence of arguments as accepted by ``subprocess.run``.
            patterns: Patterns forwarded to ``common.handle_watchdog_event``
                to select which paths trigger a run.
            ignore_patterns: Patterns forwarded to
                ``common.handle_watchdog_event`` to exclude paths.
            keep_going: If True, keep executing the remaining commands after
                one exits with a non-zero code.
        """
        super().__init__()

        # Snapshot the commands so later mutation by the caller has no effect.
        self.commands = tuple(tuple(cmd) for cmd in commands)
        self.patterns = patterns
        self.ignore_patterns = ignore_patterns
        self.keep_going = keep_going

        self._debouncer = Debouncer(self)
        # Read stdin on a separate thread so that pressing Enter can request a
        # run while the caller's thread is busy watching for file events.
        threading.Thread(target=self._wait_for_enter).start()

    def trigger_run(self, message: str) -> None:
        """Requests a (debounced) run of the commands.

        Args:
            message: Reason for the run; passed to the debouncer.
        """
        self._debouncer.press(message)

    def _wait_for_enter(self) -> None:
        """Requests a run each time a line is entered on stdin."""
        try:
            while True:
                _ = input()
                self.trigger_run('Manual run requested')
        # Ctrl-C on Unix generates KeyboardInterrupt
        # Ctrl-Z on Windows generates EOFError
        except (KeyboardInterrupt, EOFError):
            _exit_due_to_interrupt()

    def dispatch(self, event) -> None:
        """Handles a watchdog event, requesting a run for matching paths.

        ``common.handle_watchdog_event`` decides whether the event matters; a
        run is requested only when it returns a path rather than None.
        """
        path = common.handle_watchdog_event(
            event, self.patterns, self.ignore_patterns
        )
        if path is not None:
            self.trigger_run(
                f'File change detected: {os.path.relpath(path)}; '
                'Triggering build...'
            )

    # Implementation of DebouncedFunction.run()
    #
    # Note: This will run on the timer thread created by the Debouncer, rather
    # than on the main thread that's watching file events. This enables the
    # watcher to continue receiving file change events during a build.
    def run(self) -> None:
        """Clears the screen and executes the commands in order.

        Prints a banner, the result (PASSED / FAILED with the exit code) and
        the elapsed time for each command. Stops at the first failing command
        unless ``keep_going`` is set.
        """
        print('\033c', end='', flush=True)  # clear the screen

        for i, command in enumerate(self.commands, 1):
            count = f' {i}/{len(self.commands)} '
            print(
                f'{_STEP_START}\n{count}{shlex.join(command)}\n',
                flush=True,
            )
            # Use the monotonic clock for durations; time.time() can jump if
            # the system clock is adjusted while the command is running.
            start = time.monotonic()
            code = subprocess.run(command).returncode
            total_time = time.monotonic() - start

            if code == 0:
                result = _COLOR.bold_green('PASSED')
                msg = ''
            else:
                result = _COLOR.bold_red('FAILED')
                msg = f' with exit code {code}'

            # Right-align the time within the banner. The 6 is the visible
            # width of PASSED / FAILED; len(result) can't be used since the
            # string may contain color escape sequences.
            remaining_width = _WIDTH - len(count) - 6 - len(msg) - 2
            timestamp = _format_time(total_time).rjust(remaining_width)
            print(
                f'\n{count}{result}{msg}{timestamp}\n{_STEP_FINISH}\n',
                flush=True,
            )

            if code and not self.keep_going and i < len(self.commands):
                _LOG.info(
                    'Skipping %d remaining %s; '
                    'run with --keep-going to continue on errors',
                    len(self.commands) - i,
                    plural(len(self.commands) - i, 'command'),
                )
                break

    def cancel(self) -> bool:
        """No-op implementation of DebouncedFunction.cancel()."""
        return False

    def on_complete(self, cancelled: bool = False) -> None:
        """No-op implementation of DebouncedFunction.on_complete()."""

    def on_keyboard_interrupt(self) -> None:
        """Implementation of DebouncedFunction.on_keyboard_interrupt()."""
        _exit_due_to_interrupt()


def _exit_due_to_interrupt() -> None:
    """Logs that the watcher was interrupted and exits with status 1."""
    # To keep the log lines aligned with each other in the presence of
    # a '^C' from the keyboard interrupt, add a newline before the log.
    print('')
    _LOG.info('Got Ctrl-C; exiting...')
    common.exit_immediately(1)


def watch_setup(
    root: Path,
    keep_going: bool,
    commands: Sequence[tuple[str, ...]],
    watch_patterns: Sequence[str] = common.WATCH_PATTERNS,
    ignore_patterns: Sequence[str] = (),
    exclude_dirs: Sequence[Path] | None = None,
) -> tuple[Watcher, Sequence[Path]]:
    """Returns a Watcher and list of directories to ignore.

    Note that this changes the process's working directory to ``root``.

    Args:
        root: Directory from which the commands are executed.
        keep_going: Whether to continue executing commands after a failure.
        commands: Commands for the Watcher to execute.
        watch_patterns: Patterns of paths that trigger a run.
        ignore_patterns: Patterns of paths to ignore.
        exclude_dirs: Directories to exclude from watching. If None, the
            result of ``common.get_common_excludes(root)`` is used.
    """
    os.chdir(root)

    if exclude_dirs is None:
        excludes = tuple(common.get_common_excludes(root))
    else:
        excludes = tuple(exclude_dirs)

    event_handler = Watcher(
        commands,
        patterns=watch_patterns,
        ignore_patterns=ignore_patterns,
        keep_going=keep_going,
    )
    return event_handler, excludes


def watch(
    event_handler: Watcher,
    exclude_dirs: Sequence[Path],
    watch_path: Path,
) -> None:
    """Watches files and runs commands when they change.

    Requests an initial run, then waits using the callable returned by
    ``common.watch``.

    Raises:
        OSError: Any OSError other than the inotify watch / instance limit
            errors, which are logged before exiting with status 1.
    """
    try:
        event_handler.trigger_run('Triggering initial run...')
        wait = common.watch(watch_path, exclude_dirs, event_handler)
        wait()
    # Ctrl-C on Unix generates KeyboardInterrupt
    # Ctrl-Z on Windows generates EOFError
    except (KeyboardInterrupt, EOFError):
        _exit_due_to_interrupt()
    except OSError as err:
        # An OSError may be constructed without arguments; guard the index so
        # that such an error is re-raised as-is rather than masked by an
        # IndexError.
        if err.args and err.args[0] == common.ERRNO_INOTIFY_LIMIT_REACHED:
            common.log_inotify_watch_limit_reached()
            common.exit_immediately(1)
        if err.errno == errno.EMFILE:
            common.log_inotify_instance_limit_reached()
            common.exit_immediately(1)
        raise


_PREFIX_ARG = '--prefix'
_COMMANDS_ARG = '--pw-watch-commands'
_CMD_DELIMITER = ','


def _parse_args() -> argparse.Namespace:
    """Parses ``sys.argv`` into ``root``, ``keep_going`` and ``commands``.

    The prefix and the commands are extracted from the argument list by hand;
    argparse only sees the remaining arguments plus a stand-in for the
    commands. The returned namespace has no ``prefix`` attribute, since the
    prefix is already prepended to every entry of ``commands``.
    """
    # Remove reST / Sphinx syntax from the docstring for use in --help.
    # __doc__ is None when docstrings are stripped (python -OO).
    doc = __doc__ or ''
    help_string = doc.replace('``', '`').replace('.. code-block:: sh\n', '')
    parser = argparse.ArgumentParser(
        description=help_string,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    parser.add_argument(
        '--root',
        type=Path,
        default=pw_cli.env.project_root(),
        help='Directory from which to execute commands',
    )
    parser.add_argument(
        '-k',
        '--keep-going',
        action=argparse.BooleanOptionalAction,
        default=False,
        help='Continue executing commands after errors',
    )
    parser.add_argument(
        _PREFIX_ARG,
        dest='prefix',
        metavar='PREFIX...',
        help=(
            'Prefix to apply to all commands; consumes all arguments, '
            f'including -- args, until the mandatory {_COMMANDS_ARG} argument; '
            f'so must be the final argument before {_COMMANDS_ARG}'
        ),
    )
    parser.add_argument(
        _COMMANDS_ARG,
        dest='commands',
        metavar=f'COMMAND... [{_CMD_DELIMITER} COMMAND...]',
        required=True,
        help=(
            'Command to run; may specify multiple commands separated by '
            f'{_CMD_DELIMITER} (e.g. /bin/cmd1 --arg1 {_CMD_DELIMITER} '
            '/bin/cmd_2 -a2)'
        ),
    )
    parser.add_argument(
        'commands', nargs=argparse.REMAINDER, help='Command to execute'
    )

    # Parse the prefix and command arguments manually, since argparse doesn't
    # support capturing -- arguments in other arguments.
    args = sys.argv[1:]

    first_cmd_index: int | None
    raw_commands: list[str] = []
    try:
        first_cmd_index = args.index(_COMMANDS_ARG)
        raw_commands = args[first_cmd_index + 1 :]
        del args[first_cmd_index:]  # already parsed, remove it from argparse
    except ValueError:
        first_cmd_index = None

    prefix: tuple[str, ...] = ()
    try:
        prefix_index = args.index(_PREFIX_ARG)
        if first_cmd_index is not None and prefix_index < first_cmd_index:
            prefix = tuple(args[prefix_index + 1 : first_cmd_index])
        del args[prefix_index:]  # already parsed, remove it from argparse
    except ValueError:
        pass

    # Parse remaining args, using an empty stand-in for the commands. This is
    # done before the checks below so that --help works without any commands.
    parsed = parser.parse_args(args + [_COMMANDS_ARG, ''])

    if first_cmd_index is None:
        parser.error(f'{_COMMANDS_ARG} must be specified as the final argument')
    if not raw_commands:
        parser.error(f'{_COMMANDS_ARG} requires at least one command')

    # Account for the arguments that were manually parsed.
    parsed.commands = tuple(_parse_commands(prefix, raw_commands))
    if not parsed.commands:
        # Only delimiters were provided and there is no prefix to run.
        parser.error(f'{_COMMANDS_ARG} requires at least one command')
    del parsed.prefix
    return parsed


def _parse_commands(
    prefix: Sequence[str], raw_commands: Sequence[str]
) -> Iterable[tuple[str, ...]]:
    """Splits the raw arguments into commands at each delimiter.

    Args:
        prefix: Arguments prepended to every command.
        raw_commands: Arguments following the commands flag, with commands
            separated by ``_CMD_DELIMITER``.

    Yields:
        Each command, with the prefix applied. Commands that would have no
        arguments at all (an empty segment with an empty prefix) are skipped,
        since there is nothing to execute.
    """
    start = 0
    while start < len(raw_commands):
        try:
            end = raw_commands.index(_CMD_DELIMITER, start)
        except ValueError:
            end = len(raw_commands)

        command = (*prefix, *raw_commands[start:end])
        # An empty argument list cannot be executed by subprocess.run().
        if command:
            yield command
        start = end + 1


def main() -> int:
    """Watch files for changes and run commands."""
    pw_cli.log.install(level=logging.INFO, use_color=True, hide_timestamp=False)

    parsed_args = _parse_args()
    event_handler, exclude_dirs = watch_setup(**vars(parsed_args))
    watch(event_handler, exclude_dirs, parsed_args.root)

    return 0


if __name__ == '__main__':
    sys.exit(main())