# Copyright 2025 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Utilities for managing and filtering paths to watch with pw_watch."""
15
16import logging
17import os
18from pathlib import Path
19import subprocess
20from typing import Callable, Iterable, NoReturn
21
22import pw_cli.color
23
24from watchdog.events import FileSystemEventHandler
25from watchdog.observers import Observer
26
# Logger named 'pw_watch' and the pw_cli color helper used by the functions in
# this module.
_LOG = logging.getLogger('pw_watch')
_COLOR = pw_cli.color.colors()

# Raise the 'fsevents' logger threshold to WARNING to suppress the log
# messages that watchdog generates on every file event on macOS.
# TODO: b/182281481 - Fix file ignoring, rather than just suppressing logs
logging.getLogger('fsevents').setLevel(logging.WARNING)
34
35
# errno value associated with hitting an inotify limit.
# NOTE(review): 28 is ENOSPC on Linux, which is presumably what the inotify
# calls report when a limit is exhausted -- confirm against the caller.
ERRNO_INOTIFY_LIMIT_REACHED = 28

# Glob-style file name patterns (source, build, and documentation files).
# NOTE(review): presumably the default set of patterns whose changes should
# trigger a rebuild; this module does not reference the constant directly.
WATCH_PATTERNS = (
    '*.bazel',
    '*.bzl',
    '*.bloaty',
    '*.c',
    '*.cc',
    '*.css',
    '*.cpp',
    '*.cmake',
    'CMakeLists.txt',
    '*.dts',
    '*.dtsi',
    '*.emb',
    '*.gn',
    '*.gni',
    '*.go',
    '*.h',
    '*.hpp',
    '*.html',
    '*.java',
    '*.js',
    '*.ld',
    '*.md',
    '*.options',
    '*.proto',
    '*.py',
    '*.rs',
    '*.rst',
    '*.s',
    '*.S',
    '*.toml',
    '*.ts',
)
70
71
def git_ignored(file: Path) -> bool:
    """Reports whether Git considers the given file ignored.

    Runs ``git check-ignore --quiet --no-index`` on the resolved path. Because
    of ``--no-index``, ignored files that were manually added to a repo are
    still reported as ignored.

    Args:
      file: The path to check. It does not need to exist.

    Returns:
      True if Git exits with status 0 or 128; False for any other status, or
      if the command could not be started from the file's parent directory or
      any of its ancestors.
    """
    target = file.resolve()
    command = ['git', 'check-ignore', '--quiet', '--no-index', target]

    # Git is launched from the file's parent so that the correct repo is used.
    # If that directory no longer exists, fall back to each ancestor in turn;
    # this makes it possible to check whether a deleted path is ignored in the
    # repo it was originally created in.
    for working_dir in (target.parent, *target.parent.parents):
        try:
            completed = subprocess.run(
                command,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                cwd=working_dir,
            )
        except FileNotFoundError:
            continue
        return completed.returncode in (0, 128)

    # Every directory up to the filesystem root was tried without success.
    return False
99
100
def get_common_excludes(root: Path) -> Iterable[Path]:
    """Builds the list of directories under root that are commonly excluded.

    The result always contains the preset directory names joined onto root
    (whether or not they exist), followed by any ``bazel-*`` entries in root
    that are symlinks to directories, followed by any legacy environment
    directories that exist. A warning is logged for each legacy directory
    found.

    Args:
      root: The project root to build the exclude paths under.

    Returns:
      A list of paths to exclude.
    """
    preset_names = (
        '.environment',  # Legacy bootstrap-created CIPD and Python venv.
        '.presubmit',  # Presubmit-created CIPD and Python venv.
        '.git',  # Pigweed's git repo.
        '.mypy_cache',  # Python static analyzer.
        '.cargo',  # Rust package manager.
        'environment',  # Bootstrap-created CIPD and Python venv.
        'out',  # Typical build directory.
    )
    legacy_names = (
        '.cipd',  # Legacy CIPD location.
        '.python3-venv',  # Legacy Python venv location.
    )

    # Preset excludes for common project structures.
    excludes: list[Path] = [root / name for name in preset_names]

    # Bazel's bazel-* convenience symlinks.
    for candidate in root.glob('bazel-*'):
        if candidate.is_dir() and candidate.is_symlink():
            excludes.append(candidate)

    # Legacy directories are excluded too, but the user is warned about them.
    legacy_found = [
        root / name for name in legacy_names if (root / name).is_dir()
    ]
    for legacy_path in legacy_found:
        _LOG.warning(
            'Legacy environment directory found: %s', str(legacy_path)
        )
        excludes.append(legacy_path)
    if legacy_found:
        _LOG.warning(
            'Found legacy environment directory(s); these should be deleted'
        )

    return excludes
147
148
# Event types (compared against event.event_type) that handle_watchdog_event
# considers; events of any other type are dropped without inspecting paths.
_FILESYSTEM_EVENTS_THAT_TRIGGER_BUILDS = (
    'created',
    'modified',
    'deleted',
    'moved',
)
155
156
157def handle_watchdog_event(
158    event, watch_patterns: Iterable[str], ignore_patterns: Iterable[str]
159) -> Path | None:
160    """Returns the path if the event is significant, otherwise None."""
161
162    def path_matches(path: Path) -> bool:
163        return not any(path.match(x) for x in ignore_patterns) and any(
164            path.match(x) for x in watch_patterns
165        )
166
167    # There isn't any point in triggering builds on new directory creation.
168    # It's the creation or modification of files that indicate something
169    # meaningful enough changed for a build.
170    if event.is_directory:
171        return None
172
173    if event.event_type not in _FILESYSTEM_EVENTS_THAT_TRIGGER_BUILDS:
174        return None
175
176    # Collect paths of interest from the event.
177    paths: list[str] = []
178    if hasattr(event, 'dest_path'):
179        paths.append(os.fsdecode(event.dest_path))
180    if event.src_path:
181        paths.append(os.fsdecode(event.src_path))
182
183    # Check whether Git cares about any of these paths.
184    for path in (Path(p).resolve() for p in paths):
185        if not git_ignored(path) and path_matches(path):
186            return path
187
188    return None
189
190
# A directory that contains an excluded directory cannot be watched
# recursively, since that would include the excluded subtree. Such directories
# are yielded as (directory, False) so they are watched non-recursively, and
# each of their non-excluded subdirectories is yielded as (directory, True) so
# it can be watched recursively.
def minimal_watch_directories(
    to_watch: Path, to_exclude: Iterable[Path]
) -> Iterable[tuple[Path, bool]]:
    """Determines which directories to watch, and which of them recursively.

    Args:
      to_watch: The single root directory to watch.
      to_exclude: Directories to leave unwatched. Relative paths are joined
        onto to_watch. Entries that are not existing directories, or that are
        not located under to_watch, have no effect.

    Yields:
      (directory, recursive) pairs. to_watch and every directory on the way
      down to an excluded directory are yielded with recursive=False; their
      subdirectories that are neither excluded nor one of those parents are
      yielded with recursive=True. The order of the pairs is unspecified.

    Raises:
      AssertionError: If to_watch cannot be converted to a Path.
    """
    try:
        to_watch = Path(to_watch)
    except TypeError as error:
        # Raised explicitly (rather than with an assert statement) so the
        # check survives running Python with -O.
        raise AssertionError(
            'Please watch one directory at a time.'
        ) from error

    # Normalize to_exclude to existing directories, anchored at to_watch.
    excluded: set[Path] = {
        candidate
        for candidate in (to_watch.joinpath(entry) for entry in to_exclude)
        if candidate.is_dir()
    }

    # Collect every directory between to_watch and an excluded directory;
    # these must be watched without recursion.
    exclude_dir_parents = {to_watch}
    for directory in excluded:
        # Irrelevant excluded path
        if not directory.is_relative_to(to_watch):
            continue

        ancestor = to_watch
        for part in directory.relative_to(to_watch).parts[:-1]:
            ancestor = ancestor / part
            # An excluded ancestor already removes this whole subtree, so
            # neither it nor anything below it should be watched.
            if ancestor in excluded:
                break
            exclude_dir_parents.add(ancestor)

    # Parents of excluded directories are watched non-recursively; their
    # remaining subdirectories are watched recursively.
    for directory in exclude_dir_parents:
        yield directory, False
        for item in directory.iterdir():
            if (
                item.is_dir()
                and item not in exclude_dir_parents
                and item not in excluded
            ):
                yield item, True
240
241
def watch(
    watch_path: Path,
    exclude_list: Iterable[Path],
    event_handler: FileSystemEventHandler,
) -> Callable[[], None]:
    """Starts filesystem observers for watch_path, skipping excluded paths.

    One observer is created and started for every (directory, recursive) pair
    produced by minimal_watch_directories().

    Args:
      watch_path: Root directory to watch.
      exclude_list: Directories that should not be watched.
      event_handler: Handler that receives the filesystem events.

    Returns:
      A function that, when called, blocks the thread until every observer
      has stopped, then logs an error.
    """
    # Shorten the logged path by showing "$HOME" in place of the full home
    # directory, which is nicer for users with deeply nested home directories.
    resolved_path = str(watch_path.resolve())
    display_path = resolved_path.replace(str(Path.home()), '$HOME')
    # Configuring the watchers can take a while; the "..." reflects that.
    _LOG.info('Attaching filesystem watcher to %s/...', display_path)

    # Whether each directory is observed recursively is decided by
    # minimal_watch_directories().
    running = []
    for directory, recursive in minimal_watch_directories(
        watch_path, exclude_list
    ):
        watcher = Observer()
        watcher.schedule(event_handler, str(directory), recursive=recursive)
        watcher.start()
        running.append(watcher)

    def wait_function() -> None:
        # Join with a timeout in a loop rather than blocking indefinitely on
        # a single join() call.
        for started in running:
            while started.is_alive():
                started.join(1)
        _LOG.error('Observers joined unexpectedly')

    return wait_function
284
285
def log_inotify_watch_limit_reached() -> None:
    """Logs guidance for when the inotify watch limit has been reached.

    Emits one error-level explanation followed by info-level lines showing the
    sysctl command (in green) that temporarily raises
    fs.inotify.max_user_watches.
    """
    explanation = (
        'Inotify watch limit reached: run this in your terminal '
        'if you are in Linux to temporarily increase inotify limit.'
    )
    sysctl_command = (
        '        sudo sysctl fs.inotify.max_user_watches=$NEW_LIMIT$'
    )
    placeholder_hint = (
        '  Change $NEW_LIMIT$ with an integer number, e.g., '
        '20000 should be enough.'
    )

    _LOG.error(explanation)
    # Blank lines set the command apart from the surrounding text.
    for message in ('', _COLOR.green(sysctl_command), '', placeholder_hint):
        _LOG.info(message)
306
307
def log_inotify_instance_limit_reached() -> None:
    """Logs guidance for when the inotify instance limit has been reached.

    Emits one error-level explanation followed by info-level lines showing the
    sysctl command (in green) that temporarily raises
    fs.inotify.max_user_instances.
    """
    explanation = (
        'Inotify instance limit reached: run this in your terminal '
        'if you are in Linux to temporarily increase inotify limit.'
    )
    sysctl_command = (
        '        sudo sysctl fs.inotify.max_user_instances=$NEW_LIMIT$'
    )
    placeholder_hint = (
        '  Change $NEW_LIMIT$ with an integer number, e.g., '
        '20000 should be enough.'
    )

    _LOG.error(explanation)
    # Blank lines set the command apart from the surrounding text.
    for message in ('', _COLOR.green(sysctl_command), '', placeholder_hint):
        _LOG.info(message)
328
329
def exit_immediately(code: int) -> NoReturn:
    """Exits quickly without waiting for threads to finish.

    Args:
      code: The exit status handed to os._exit().
    """
    # Flush all log handlers
    logging.shutdown()
    # Note: The "proper" way to exit is via observer.stop(), then
    # running a join. However it's slower, so just exit immediately.
    #
    # Additionally, since there are several threads in the watcher, the usual
    # sys.exit approach doesn't work. Instead, run the low level exit which
    # kills all threads.
    os._exit(code)  # pylint: disable=protected-access
341