• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Provides general purpose plugin functionality.
15
16As used in this module, a plugin is a Python object associated with a name.
17Plugins are registered in a Registry. The plugin object is typically a function,
18but can be anything.
19
20Plugins may be loaded in a variety of ways:
21
22- Listed in a plugins file in the file system (e.g. as "name module target").
23- Registered in a Python file using a decorator (@my_registry.plugin).
24- Registered directly or by name with function calls on a registry object.
25
26This functionality can be used to create plugins for command line tools,
27interactive consoles, or anything else. Pigweed's pw command uses this module
28for its plugins.
29"""
30
31import collections
32import collections.abc
33import importlib
34import inspect
35import logging
36from pathlib import Path
37import pkgutil
38import sys
39from textwrap import TextWrapper
40import types
41from typing import Any, Callable, Dict, List, Iterable, Iterator, Optional, Set
42
43_LOG = logging.getLogger(__name__)
44_BUILT_IN = '<built-in>'
45
46
47class Error(Exception):
48    """Indicates that a plugin is invalid or cannot be registered."""
49
50    def __str__(self):
51        """Displays the error as a string, including the __cause__ if present.
52
53        Adding __cause__ gives useful context without displaying a backtrace.
54        """
55        if self.__cause__ is None:
56            return super().__str__()
57
58        return (
59            f'{super().__str__()} '
60            f'({type(self.__cause__).__name__}: {self.__cause__})'
61        )
62
63
64def _get_module(member: object) -> types.ModuleType:
65    """Gets the module or a fake module if the module isn't found."""
66    module = inspect.getmodule(member)
67    return module if module else types.ModuleType('<unknown>')
68
69
70class Plugin:
71    """Represents a Python entity registered as a plugin.
72
73    Each plugin resolves to a Python object, typically a function.
74    """
75
76    @classmethod
77    def from_name(
78        cls,
79        name: str,
80        module_name: str,
81        member_name: str,
82        source: Optional[Path],
83    ) -> 'Plugin':
84        """Creates a plugin by module and attribute name.
85
86        Args:
87          name: the name of the plugin
88          module_name: Python module name (e.g. 'foo_pkg.bar')
89          member_name: the name of the member in the module
90          source: path to the plugins file that declared this plugin, if any
91        """
92
93        # Attempt to access the module and member. Catch any errors that might
94        # occur, since a bad plugin shouldn't be a fatal error.
95        try:
96            module = importlib.import_module(module_name)
97        except Exception as err:
98            _LOG.debug(
99                'Failed to import module "%s" for "%s" plugin',
100                module_name,
101                name,
102                exc_info=True,
103            )
104            raise Error(f'Failed to import module "{module_name}"') from err
105
106        try:
107            member = getattr(module, member_name)
108        except AttributeError as err:
109            raise Error(
110                f'"{module_name}.{member_name}" does not exist'
111            ) from err
112
113        return cls(name, member, source)
114
115    def __init__(
116        self, name: str, target: Any, source: Optional[Path] = None
117    ) -> None:
118        """Creates a plugin for the provided target."""
119        self.name = name
120        self._module = _get_module(target)
121        self.target = target
122        self.source = source
123
124    @property
125    def target_name(self) -> str:
126        return (
127            f'{self._module.__name__}.'
128            f'{getattr(self.target, "__name__", self.target)}'
129        )
130
131    @property
132    def source_name(self) -> str:
133        return _BUILT_IN if self.source is None else str(self.source)
134
135    def run_with_argv(self, argv: Iterable[str]) -> int:
136        """Sets sys.argv and calls the plugin function.
137
138        This is used to call a plugin as if from the command line.
139        """
140        original_sys_argv = sys.argv
141        sys.argv = [f'pw {self.name}', *argv]
142
143        try:
144            return self.target()
145        finally:
146            sys.argv = original_sys_argv
147
148    def help(self, full: bool = False) -> str:
149        """Returns a description of this plugin from its docstring."""
150        docstring = self.target.__doc__ or self._module.__doc__ or ''
151        return docstring if full else next(iter(docstring.splitlines()), '')
152
153    def details(self, full: bool = False) -> Iterator[str]:
154        yield f'help    {self.help(full=full)}'
155        yield f'module  {self._module.__name__}'
156        yield f'target  {getattr(self.target, "__name__", self.target)}'
157        yield f'source  {self.source_name}'
158
159    def __repr__(self) -> str:
160        return (
161            f'{self.__class__.__name__}(name={self.name!r}, '
162            f'target={self.target_name}'
163            f'{f", source={self.source_name!r}" if self.source else ""})'
164        )
165
166
167def callable_with_no_args(plugin: Plugin) -> None:
168    """Checks that a plugin is callable without arguments.
169
170    May be used for the validator argument to Registry.
171    """
172    try:
173        params = inspect.signature(plugin.target).parameters
174    except TypeError:
175        raise Error(
176            'Plugin functions must be callable, but '
177            f'{plugin.target_name} is a '
178            f'{type(plugin.target).__name__}'
179        )
180
181    positional = sum(p.default == p.empty for p in params.values())
182    if positional:
183        raise Error(
184            f'Plugin functions cannot have any required positional '
185            f'arguments, but {plugin.target_name} has {positional}'
186        )
187
188
189class Registry(collections.abc.Mapping):
190    """Manages a set of plugins from Python modules or plugins files."""
191
192    def __init__(
193        self, validator: Callable[[Plugin], Any] = lambda _: None
194    ) -> None:
195        """Creates a new, empty plugins registry.
196
197        Args:
198          validator: Function that checks whether a plugin is valid and should
199              be registered. Must raise plugins.Error is the plugin is invalid.
200        """
201
202        self._registry: Dict[str, Plugin] = {}
203        self._sources: Set[Path] = set()  # Paths to plugins files
204        self._errors: Dict[str, List[Exception]] = collections.defaultdict(list)
205        self._validate_plugin = validator
206
207    def __getitem__(self, name: str) -> Plugin:
208        """Accesses a plugin by name; raises KeyError if it does not exist."""
209        if name in self._registry:
210            return self._registry[name]
211
212        if name in self._errors:
213            raise KeyError(
214                f'Registration for "{name}" failed: '
215                + ', '.join(str(e) for e in self._errors[name])
216            )
217
218        raise KeyError(f'The plugin "{name}" has not been registered')
219
220    def __iter__(self) -> Iterator[str]:
221        return iter(self._registry)
222
223    def __len__(self) -> int:
224        return len(self._registry)
225
226    def errors(self) -> Dict[str, List[Exception]]:
227        return self._errors
228
229    def run_with_argv(self, name: str, argv: Iterable[str]) -> int:
230        """Runs a plugin by name, setting sys.argv to the provided args.
231
232        This is used to run a command as if it were executed directly from the
233        command line. The plugin is expected to return an int.
234
235        Raises:
236          KeyError if plugin is not registered.
237        """
238        return self[name].run_with_argv(argv)
239
240    def _should_register(self, plugin: Plugin) -> bool:
241        """Determines and logs if a plugin should be registered or not.
242
243        Some errors are exceptions, others are not.
244        """
245
246        if plugin.name in self._registry and plugin.source is None:
247            raise Error(
248                f'Attempted to register built-in plugin "{plugin.name}", but '
249                'a plugin with that name was previously registered '
250                f'({self[plugin.name]})!'
251            )
252
253        # Run the user-provided validation function, which raises exceptions
254        # if there are errors.
255        self._validate_plugin(plugin)
256
257        existing = self._registry.get(plugin.name)
258
259        if existing is None:
260            return True
261
262        if existing.source is None:
263            _LOG.debug(
264                '%s: Overriding built-in plugin "%s" with %s',
265                plugin.source_name,
266                plugin.name,
267                plugin.target_name,
268            )
269            return True
270
271        if plugin.source != existing.source:
272            _LOG.debug(
273                '%s: The plugin "%s" was previously registered in %s; '
274                'ignoring registration as %s',
275                plugin.source_name,
276                plugin.name,
277                self._registry[plugin.name].source,
278                plugin.target_name,
279            )
280        elif plugin.source not in self._sources:
281            _LOG.warning(
282                '%s: "%s" is registered file multiple times in this file! '
283                'Only the first registration takes effect',
284                plugin.source_name,
285                plugin.name,
286            )
287
288        return False
289
290    def register(self, name: str, target: Any) -> Optional[Plugin]:
291        """Registers an object as a plugin."""
292        return self._register(Plugin(name, target, None))
293
294    def register_by_name(
295        self,
296        name: str,
297        module_name: str,
298        member_name: str,
299        source: Optional[Path] = None,
300    ) -> Optional[Plugin]:
301        """Registers an object from its module and name as a plugin."""
302        return self._register(
303            Plugin.from_name(name, module_name, member_name, source)
304        )
305
306    def _register(self, plugin: Plugin) -> Optional[Plugin]:
307        # Prohibit functions not from a plugins file from overriding others.
308        if not self._should_register(plugin):
309            return None
310
311        self._registry[plugin.name] = plugin
312        _LOG.debug(
313            '%s: Registered plugin "%s" for %s',
314            plugin.source_name,
315            plugin.name,
316            plugin.target_name,
317        )
318
319        return plugin
320
321    def register_file(self, path: Path) -> None:
322        """Registers plugins from a plugins file.
323
324        Any exceptions raised from parsing the file are caught and logged.
325        """
326        with path.open() as contents:
327            for lineno, line in enumerate(contents, 1):
328                line = line.strip()
329                if not line or line.startswith('#'):
330                    continue
331
332                try:
333                    name, module, function = line.split()
334                except ValueError as err:
335                    self._errors[line.strip()].append(Error(err))
336                    _LOG.error(
337                        '%s:%d: Failed to parse plugin entry "%s": '
338                        'Expected 3 items (name, module, function), '
339                        'got %d',
340                        path,
341                        lineno,
342                        line,
343                        len(line.split()),
344                    )
345                    continue
346
347                try:
348                    self.register_by_name(name, module, function, path)
349                except Error as err:
350                    self._errors[name].append(err)
351                    _LOG.error(
352                        '%s: Failed to register plugin "%s": %s',
353                        path,
354                        name,
355                        err,
356                    )
357
358        self._sources.add(path)
359
360    def register_directory(
361        self,
362        directory: Path,
363        file_name: str,
364        restrict_to: Optional[Path] = None,
365    ) -> None:
366        """Finds and registers plugins from plugins files in a directory.
367
368        Args:
369          directory: The directory from which to start searching up.
370          file_name: The name of plugins files to look for.
371          restrict_to: If provided, do not search higher than this directory.
372        """
373        for path in find_all_in_parents(file_name, directory):
374            if not path.is_file():
375                continue
376
377            if restrict_to is not None and restrict_to not in path.parents:
378                _LOG.debug(
379                    "Skipping plugins file %s because it's outside of %s",
380                    path,
381                    restrict_to,
382                )
383                continue
384
385            _LOG.debug('Found plugins file %s', path)
386            self.register_file(path)
387
388    def short_help(self) -> str:
389        """Returns a help string for the registered plugins."""
390        width = (
391            max(len(name) for name in self._registry) + 1
392            if self._registry
393            else 1
394        )
395        help_items = '\n'.join(
396            f'  {name:{width}} {plugin.help()}'
397            for name, plugin in sorted(self._registry.items())
398        )
399        return f'supported plugins:\n{help_items}'
400
401    def detailed_help(self, plugins: Iterable[str] = ()) -> Iterator[str]:
402        """Yields lines of detailed information about commands."""
403        if not plugins:
404            plugins = list(self._registry)
405
406        yield '\ndetailed plugin information:'
407
408        wrapper = TextWrapper(
409            width=80, initial_indent='   ', subsequent_indent=' ' * 11
410        )
411
412        plugins = sorted(plugins)
413        for plugin in plugins:
414            yield f'  [{plugin}]'
415
416            try:
417                for line in self[plugin].details(full=len(plugins) == 1):
418                    yield wrapper.fill(line)
419            except KeyError as err:
420                yield wrapper.fill(f'error   {str(err)[1:-1]}')
421
422            yield ''
423
424        yield 'Plugins files:'
425
426        if self._sources:
427            yield from (
428                f'  [{i}] {file}' for i, file in enumerate(self._sources, 1)
429            )
430        else:
431            yield '  (none found)'
432
433    def plugin(
434        self, function: Optional[Callable] = None, *, name: Optional[str] = None
435    ) -> Callable[[Callable], Callable]:
436        """Decorator that registers a function with this plugin registry."""
437
438        def decorator(function: Callable) -> Callable:
439            self.register(function.__name__ if name is None else name, function)
440            return function
441
442        if function is None:
443            return decorator
444
445        self.register(function.__name__, function)
446        return function
447
448
449def find_in_parents(name: str, path: Path) -> Optional[Path]:
450    """Searches parent directories of the path for a file or directory."""
451    path = path.resolve()
452
453    while not path.joinpath(name).exists():
454        path = path.parent
455
456        if path.samefile(path.parent):
457            return None
458
459    return path.joinpath(name)
460
461
462def find_all_in_parents(name: str, path: Path) -> Iterator[Path]:
463    """Searches all parent directories of the path for files or directories."""
464
465    while True:
466        result = find_in_parents(name, path)
467        if result is None:
468            return
469
470        yield result
471        path = result.parent.parent
472
473
474def import_submodules(
475    module: types.ModuleType, recursive: bool = False
476) -> None:
477    """Imports the submodules of a package.
478
479    This can be used to collect plugins registered with a decorator from a
480    directory.
481    """
482    path = module.__path__  # type: ignore[attr-defined]
483    if recursive:
484        modules = pkgutil.walk_packages(path, module.__name__ + '.')
485    else:
486        modules = pkgutil.iter_modules(path, module.__name__ + '.')
487
488    for info in modules:
489        importlib.import_module(info.name)
490