# Copyright 2025 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Utilities for managing and filtering paths to watch with pw_watch."""

import logging
import os
from pathlib import Path
import subprocess
from typing import Callable, Iterable, NoReturn

import pw_cli.color

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

_LOG = logging.getLogger('pw_watch')
_COLOR = pw_cli.color.colors()

# Suppress events under 'fsevents', generated by watchdog on every file
# event on MacOS.
# TODO: b/182281481 - Fix file ignoring, rather than just suppressing logs
logging.getLogger('fsevents').setLevel(logging.WARNING)


# errno value associated with hitting an inotify limit; 28 is ENOSPC on Linux.
# Not referenced elsewhere in this file -- presumably checked by callers
# against OSError.errno.
ERRNO_INOTIFY_LIMIT_REACHED = 28

# Glob patterns (matched with Path.match) for files whose changes are
# considered interesting.
WATCH_PATTERNS = (
    '*.bazel',
    '*.bzl',
    '*.bloaty',
    '*.c',
    '*.cc',
    '*.css',
    '*.cpp',
    '*.cmake',
    'CMakeLists.txt',
    '*.dts',
    '*.dtsi',
    '*.emb',
    '*.gn',
    '*.gni',
    '*.go',
    '*.h',
    '*.hpp',
    '*.html',
    '*.java',
    '*.js',
    '*.ld',
    '*.md',
    '*.options',
    '*.proto',
    '*.py',
    '*.rs',
    '*.rst',
    '*.s',
    '*.S',
    '*.toml',
    '*.ts',
)


def git_ignored(file: Path) -> bool:
    """Returns true if this file is in a Git repo and ignored by that repo.

    Returns true for ignored files that were manually added to a repo.

    Also returns true when ``git check-ignore`` exits with status 128 (Git's
    fatal-error status).

    Args:
        file: The path to check; it is resolved before being passed to Git.

    Returns:
        True if ``git check-ignore`` exits with 0 or 128. False for any other
        exit status, or if the command could not be started from the file's
        parent directory or any of its ancestors.
    """
    file = file.resolve()
    directory = file.parent

    # Run the Git command from file's parent so that the correct repo is used.
    while True:
        try:
            returncode = subprocess.run(
                ['git', 'check-ignore', '--quiet', '--no-index', file],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                cwd=directory,
            ).returncode
            return returncode in (0, 128)
        except FileNotFoundError:
            # If the directory no longer exists, try parent directories until
            # an existing directory is found or all directories have been
            # checked. This approach makes it possible to check if a deleted
            # path is ignored in the repo it was originally created in.
            #
            # NOTE: subprocess.run also raises FileNotFoundError when the
            # `git` executable itself cannot be found; in that case the walk
            # reaches the filesystem root and False is returned.
            if directory == directory.parent:
                return False

            directory = directory.parent


def get_common_excludes(root: Path) -> Iterable[Path]:
    """Find commonly excluded directories, and return them as a [Path].

    Args:
        root: Directory under which the excluded directories are looked up.

    Returns:
        A list containing, in order: the typical ignored directories joined
        onto ``root`` (added whether or not they exist), every ``bazel-*``
        entry directly under ``root`` that is a symlink to a directory, and
        any legacy environment directory that exists under ``root``. A
        warning is logged for each legacy directory found.
    """
    exclude_list: list[Path] = []

    typical_ignored_directories: list[str] = [
        '.environment',  # Legacy bootstrap-created CIPD and Python venv.
        '.presubmit',  # Presubmit-created CIPD and Python venv.
        '.git',  # Pigweed's git repo.
        '.mypy_cache',  # Python static analyzer.
        '.cargo',  # Rust package manager.
        'environment',  # Bootstrap-created CIPD and Python venv.
        'out',  # Typical build directory.
    ]

    # Preset exclude for common project structures.
    exclude_list.extend(
        root / ignored_directory
        for ignored_directory in typical_ignored_directories
    )

    # Ignore bazel-* directories
    exclude_list.extend(
        d for d in root.glob('bazel-*') if d.is_dir() and d.is_symlink()
    )

    # Check for and warn about legacy directories.
    legacy_directories = [
        '.cipd',  # Legacy CIPD location.
        '.python3-venv',  # Legacy Python venv location.
    ]
    found_legacy = False
    for legacy_directory in legacy_directories:
        full_legacy_directory = root / legacy_directory
        if full_legacy_directory.is_dir():
            _LOG.warning(
                'Legacy environment directory found: %s',
                str(full_legacy_directory),
            )
            exclude_list.append(full_legacy_directory)
            found_legacy = True
    if found_legacy:
        _LOG.warning(
            'Found legacy environment directory(s); these should be deleted'
        )

    return exclude_list


_FILESYSTEM_EVENTS_THAT_TRIGGER_BUILDS = (
    'created',
    'modified',
    'deleted',
    'moved',
)


def handle_watchdog_event(
    event, watch_patterns: Iterable[str], ignore_patterns: Iterable[str]
) -> Path | None:
    """Returns the path if the event is significant, otherwise None.

    Args:
        event: A filesystem event object providing ``is_directory``,
            ``event_type`` and ``src_path``, and optionally ``dest_path``.
        watch_patterns: Glob patterns (as understood by ``Path.match``); a
            path must match at least one of them to be significant.
        ignore_patterns: Glob patterns; a path matching any of them is never
            significant.

    Returns:
        The resolved destination path or, failing that, the resolved source
        path of the event, whichever is first found to be not ignored by Git
        and accepted by the patterns. None for directory events, for event
        types that do not trigger builds, and when no path qualifies.
    """
    # There isn't any point in triggering builds on new directory creation.
    # It's the creation or modification of files that indicate something
    # meaningful enough changed for a build.
    if event.is_directory:
        return None

    if event.event_type not in _FILESYSTEM_EVENTS_THAT_TRIGGER_BUILDS:
        return None

    # The patterns are scanned once per candidate path, so materialize them:
    # a one-shot iterator would otherwise be exhausted after the first path.
    watch_globs = tuple(watch_patterns)
    ignore_globs = tuple(ignore_patterns)

    def path_matches(path: Path) -> bool:
        return not any(path.match(x) for x in ignore_globs) and any(
            path.match(x) for x in watch_globs
        )

    # Collect paths of interest from the event, destination first. dest_path
    # may be absent or empty (newer watchdog releases define it on every
    # event type and leave it empty unless the event is a move). An empty
    # path must be skipped: it would resolve to the current working directory
    # and be checked as if it were the changed file.
    paths: list[str] = []
    dest_path = getattr(event, 'dest_path', None)
    if dest_path:
        paths.append(os.fsdecode(dest_path))
    if event.src_path:
        paths.append(os.fsdecode(event.src_path))

    # Check whether Git cares about any of these paths.
    for path in (Path(p).resolve() for p in paths):
        if not git_ignored(path) and path_matches(path):
            return path

    return None


# Go over each directory inside of the current directory.
# If it is not on the path of elements in directories_to_exclude, add
# (directory, True) to subdirectories_to_watch and later recursively call
# Observer() on them.
# Otherwise add (directory, False) to subdirectories_to_watch and later call
# Observer() with recursion=False.
def minimal_watch_directories(
    to_watch: Path, to_exclude: Iterable[Path]
) -> Iterable[tuple[Path, bool]]:
    """Determine which subdirectory to watch recursively.

    Args:
        to_watch: The single root directory to watch.
        to_exclude: Directories to leave unwatched. Each is joined onto
            ``to_watch``; entries that are not existing directories are
            dropped.

    Yields:
        ``(directory, recursive)`` pairs. ``to_watch`` and every directory on
        the way down to an excluded directory are yielded with ``False``;
        each of their child directories that is neither excluded nor itself
        on such a path is yielded with ``True``.

    Raises:
        AssertionError: If ``to_watch`` cannot be converted to a ``Path``.
    """
    try:
        to_watch = Path(to_watch)
    except TypeError as err:
        # Raised explicitly rather than through an `assert` statement so the
        # check is not stripped when Python runs with -O.
        raise AssertionError('Please watch one directory at a time.') from err

    # Reformat to_exclude. A set is used because membership is tested for
    # every directory entry visited below.
    directories_to_exclude: set[Path] = set()
    for directory_to_exclude in to_exclude:
        candidate = to_watch.joinpath(directory_to_exclude)
        if candidate.is_dir():
            directories_to_exclude.add(candidate)

    # Split the relative path of directories_to_exclude (compared to to_watch),
    # and generate all parent paths needed to be watched without recursion.
    exclude_dir_parents = {to_watch}
    for excluded in directories_to_exclude:
        # Irrelevant excluded path
        if not excluded.is_relative_to(to_watch):
            continue

        dir_tmp = to_watch
        for part in excluded.relative_to(to_watch).parts[:-1]:
            dir_tmp = Path(dir_tmp, part)
            exclude_dir_parents.add(dir_tmp)

    # Go over all layers of directory. Append those that are the parents of
    # directories_to_exclude to the list with recursion==False, and others
    # with recursion==True.
    for directory in exclude_dir_parents:
        yield directory, False
        for item in directory.iterdir():
            if (
                item.is_dir()
                and item not in exclude_dir_parents
                and item not in directories_to_exclude
            ):
                yield item, True


def watch(
    watch_path: Path,
    exclude_list: Iterable[Path],
    event_handler: FileSystemEventHandler,
) -> Callable[[], None]:
    """Attaches the filesystem watcher for the specified paths.

    One observer is created and started for every directory produced by
    minimal_watch_directories().

    Args:
        watch_path: Root directory to watch.
        exclude_list: Directories under ``watch_path`` to leave unwatched.
        event_handler: Handler scheduled on every observer.

    Returns:
        A function that, when called, blocks the thread until an internal
        watcher error occurs. It waits until none of the observers is alive
        any more and then logs an error.
    """
    # It can take awhile to configure the filesystem watcher, so have the
    # message reflect that with the "...". No try block is used here; handling
    # the user Ctrl-C'ing out during startup is left to the caller.

    # Try to make a short display path for the watched directory that has
    # "$HOME" instead of the full home directory. This is nice for users
    # who have deeply nested home directories.
    path_to_log = str(watch_path.resolve()).replace(str(Path.home()), '$HOME')
    _LOG.info('Attaching filesystem watcher to %s/...', path_to_log)

    # Observe changes for all files in the root directory. Whether the
    # directory should be observed recursively or not is determined by the
    # second element of each pair from minimal_watch_directories().
    observers = []
    for path, rec in minimal_watch_directories(watch_path, exclude_list):
        observer = Observer()
        observer.schedule(
            event_handler,
            str(path),
            recursive=rec,
        )
        observer.start()
        observers.append(observer)

    def wait_function() -> None:
        for observer in observers:
            # Join with a timeout in a loop instead of one blocking join().
            while observer.is_alive():
                observer.join(1)
        _LOG.error('Observers joined unexpectedly')

    return wait_function


def _log_inotify_limit_reached(limit_kind: str, sysctl_name: str) -> None:
    """Logs the shared 'inotify limit reached' help text.

    Args:
        limit_kind: Word naming the limit in the error message.
        sysctl_name: Name of the ``fs.inotify`` sysctl shown in the suggested
            command.
    """
    _LOG.error(
        'Inotify %s limit reached: run this in your terminal if '
        'you are in Linux to temporarily increase inotify limit.',
        limit_kind,
    )
    _LOG.info('')
    _LOG.info(
        _COLOR.green(' sudo sysctl fs.inotify.' + sysctl_name + '=$NEW_LIMIT$')
    )
    _LOG.info('')
    _LOG.info(
        ' Change $NEW_LIMIT$ with an integer number, '
        'e.g., 20000 should be enough.'
    )


def log_inotify_watch_limit_reached() -> None:
    """Log that the inotify watch limit was reached.

    Show information and suggested commands in OSError: inotify limit reached.
    """
    _log_inotify_limit_reached('watch', 'max_user_watches')


def log_inotify_instance_limit_reached() -> None:
    """Log that the inotify instance limit was reached.

    Show information and suggested commands in OSError: inotify limit reached.
    """
    _log_inotify_limit_reached('instance', 'max_user_instances')


def exit_immediately(code: int) -> NoReturn:
    """Exits quickly without waiting for threads to finish.

    Args:
        code: Process exit status passed to ``os._exit``.
    """
    # Flush all log handlers
    logging.shutdown()
    # Note: The "proper" way to exit is via observer.stop(), then
    # running a join. However it's slower, so just exit immediately.
    #
    # Additionally, since there are several threads in the watcher, the usual
    # sys.exit approach doesn't work. Instead, run the low level exit which
    # kills all threads.
    os._exit(code)  # pylint: disable=protected-access