• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Provides general purpose plugin functionality.
15
16As used in this module, a plugin is a Python object associated with a name.
17Plugins are registered in a Registry. The plugin object is typically a function,
18but can be anything.
19
20Plugins may be loaded in a variety of ways:
21
22- Listed in a plugins file in the file system (e.g. as "name module target").
23- Registered in a Python file using a decorator (@my_registry.plugin).
24- Registered directly or by name with function calls on a registry object.
25
26This functionality can be used to create plugins for command line tools,
27interactive consoles, or anything else. Pigweed's pw command uses this module
28for its plugins.
29"""
30
31from __future__ import annotations
32
33import collections
34import collections.abc
35import importlib
36import inspect
37import logging
38from pathlib import Path
39import pkgutil
40import sys
41from textwrap import TextWrapper
42import types
43from typing import Any, Callable, Iterable, Iterator, Set
44
45_LOG = logging.getLogger(__name__)
46_BUILT_IN = '<built-in>'
47
48
49class Error(Exception):
50    """Indicates that a plugin is invalid or cannot be registered."""
51
52    def __str__(self):
53        """Displays the error as a string, including the __cause__ if present.
54
55        Adding __cause__ gives useful context without displaying a backtrace.
56        """
57        if self.__cause__ is None:
58            return super().__str__()
59
60        return (
61            f'{super().__str__()} '
62            f'({type(self.__cause__).__name__}: {self.__cause__})'
63        )
64
65
66def _get_module(member: object) -> types.ModuleType:
67    """Gets the module or a fake module if the module isn't found."""
68    module = inspect.getmodule(member)
69    return module if module else types.ModuleType('<unknown>')
70
71
72class Plugin:
73    """Represents a Python entity registered as a plugin.
74
75    Each plugin resolves to a Python object, typically a function.
76    """
77
78    @classmethod
79    def from_name(
80        cls,
81        name: str,
82        module_name: str,
83        member_name: str,
84        source: Path | None,
85    ) -> Plugin:
86        """Creates a plugin by module and attribute name.
87
88        Args:
89          name: the name of the plugin
90          module_name: Python module name (e.g. 'foo_pkg.bar')
91          member_name: the name of the member in the module
92          source: path to the plugins file that declared this plugin, if any
93        """
94
95        # Attempt to access the module and member. Catch any errors that might
96        # occur, since a bad plugin shouldn't be a fatal error.
97        try:
98            module = importlib.import_module(module_name)
99        except Exception as err:
100            _LOG.debug(
101                'Failed to import module "%s" for "%s" plugin',
102                module_name,
103                name,
104                exc_info=True,
105            )
106            raise Error(f'Failed to import module "{module_name}"') from err
107
108        try:
109            member = getattr(module, member_name)
110        except AttributeError as err:
111            raise Error(
112                f'"{module_name}.{member_name}" does not exist'
113            ) from err
114
115        return cls(name, member, source)
116
117    def __init__(
118        self, name: str, target: Any, source: Path | None = None
119    ) -> None:
120        """Creates a plugin for the provided target."""
121        self.name = name
122        self._module = _get_module(target)
123        self.target = target
124        self.source = source
125
126    @property
127    def target_name(self) -> str:
128        return (
129            f'{self._module.__name__}.'
130            f'{getattr(self.target, "__name__", self.target)}'
131        )
132
133    @property
134    def source_name(self) -> str:
135        return _BUILT_IN if self.source is None else str(self.source)
136
137    def run_with_argv(self, argv: Iterable[str]) -> int:
138        """Sets sys.argv and calls the plugin function.
139
140        This is used to call a plugin as if from the command line.
141        """
142        original_sys_argv = sys.argv
143        sys.argv = [f'pw {self.name}', *argv]
144
145        try:
146            return self.target()
147        finally:
148            sys.argv = original_sys_argv
149
150    def help(self, full: bool = False) -> str:
151        """Returns a description of this plugin from its docstring."""
152        docstring = self.target.__doc__ or self._module.__doc__ or ''
153        return docstring if full else next(iter(docstring.splitlines()), '')
154
155    def details(self, full: bool = False) -> Iterator[str]:
156        yield f'help    {self.help(full=full)}'
157        yield f'module  {self._module.__name__}'
158        yield f'target  {getattr(self.target, "__name__", self.target)}'
159        yield f'source  {self.source_name}'
160
161    def __repr__(self) -> str:
162        return (
163            f'{self.__class__.__name__}(name={self.name!r}, '
164            f'target={self.target_name}'
165            f'{f", source={self.source_name!r}" if self.source else ""})'
166        )
167
168
169def callable_with_no_args(plugin: Plugin) -> None:
170    """Checks that a plugin is callable without arguments.
171
172    May be used for the validator argument to Registry.
173    """
174    try:
175        params = inspect.signature(plugin.target).parameters
176    except TypeError:
177        raise Error(
178            'Plugin functions must be callable, but '
179            f'{plugin.target_name} is a '
180            f'{type(plugin.target).__name__}'
181        )
182
183    positional = sum(p.default == p.empty for p in params.values())
184    if positional:
185        raise Error(
186            f'Plugin functions cannot have any required positional '
187            f'arguments, but {plugin.target_name} has {positional}'
188        )
189
190
191class Registry(collections.abc.Mapping):
192    """Manages a set of plugins from Python modules or plugins files."""
193
194    def __init__(
195        self, validator: Callable[[Plugin], Any] = lambda _: None
196    ) -> None:
197        """Creates a new, empty plugins registry.
198
199        Args:
200          validator: Function that checks whether a plugin is valid and should
201              be registered. Must raise plugins.Error is the plugin is invalid.
202        """
203
204        self._registry: dict[str, Plugin] = {}
205        self._sources: Set[Path] = set()  # Paths to plugins files
206        self._errors: dict[str, list[Exception]] = collections.defaultdict(list)
207        self._validate_plugin = validator
208
209    def __getitem__(self, name: str) -> Plugin:
210        """Accesses a plugin by name; raises KeyError if it does not exist."""
211        if name in self._registry:
212            return self._registry[name]
213
214        if name in self._errors:
215            raise KeyError(
216                f'Registration for "{name}" failed: '
217                + ', '.join(str(e) for e in self._errors[name])
218            )
219
220        raise KeyError(f'The plugin "{name}" has not been registered')
221
222    def __iter__(self) -> Iterator[str]:
223        return iter(self._registry)
224
225    def __len__(self) -> int:
226        return len(self._registry)
227
228    def errors(self) -> dict[str, list[Exception]]:
229        return self._errors
230
231    def run_with_argv(self, name: str, argv: Iterable[str]) -> int:
232        """Runs a plugin by name, setting sys.argv to the provided args.
233
234        This is used to run a command as if it were executed directly from the
235        command line. The plugin is expected to return an int.
236
237        Raises:
238          KeyError if plugin is not registered.
239        """
240        return self[name].run_with_argv(argv)
241
242    def _should_register(self, plugin: Plugin) -> bool:
243        """Determines and logs if a plugin should be registered or not.
244
245        Some errors are exceptions, others are not.
246        """
247
248        if plugin.name in self._registry and plugin.source is None:
249            raise Error(
250                f'Attempted to register built-in plugin "{plugin.name}", but '
251                'a plugin with that name was previously registered '
252                f'({self[plugin.name]})!'
253            )
254
255        # Run the user-provided validation function, which raises exceptions
256        # if there are errors.
257        self._validate_plugin(plugin)
258
259        existing = self._registry.get(plugin.name)
260
261        if existing is None:
262            return True
263
264        if existing.source is None:
265            _LOG.debug(
266                '%s: Overriding built-in plugin "%s" with %s',
267                plugin.source_name,
268                plugin.name,
269                plugin.target_name,
270            )
271            return True
272
273        if plugin.source != existing.source:
274            _LOG.debug(
275                '%s: The plugin "%s" was previously registered in %s; '
276                'ignoring registration as %s',
277                plugin.source_name,
278                plugin.name,
279                self._registry[plugin.name].source,
280                plugin.target_name,
281            )
282        elif plugin.source not in self._sources:
283            _LOG.warning(
284                '%s: "%s" is registered file multiple times in this file! '
285                'Only the first registration takes effect',
286                plugin.source_name,
287                plugin.name,
288            )
289
290        return False
291
292    def register(self, name: str, target: Any) -> Plugin | None:
293        """Registers an object as a plugin."""
294        return self._register(Plugin(name, target, None))
295
296    def register_by_name(
297        self,
298        name: str,
299        module_name: str,
300        member_name: str,
301        source: Path | None = None,
302    ) -> Plugin | None:
303        """Registers an object from its module and name as a plugin."""
304        return self._register(
305            Plugin.from_name(name, module_name, member_name, source)
306        )
307
308    def _register(self, plugin: Plugin) -> Plugin | None:
309        # Prohibit functions not from a plugins file from overriding others.
310        if not self._should_register(plugin):
311            return None
312
313        self._registry[plugin.name] = plugin
314        _LOG.debug(
315            '%s: Registered plugin "%s" for %s',
316            plugin.source_name,
317            plugin.name,
318            plugin.target_name,
319        )
320
321        return plugin
322
323    def register_config(
324        self,
325        config: dict,
326        path: Path | None = None,
327    ) -> None:
328        """Registers plugins from a Pigweed config.
329
330        Any exceptions raised from parsing the file are caught and logged.
331        """
332        plugins = config.get('pw', {}).get('pw_cli', {}).get('plugins', {})
333        for name, location in plugins.items():
334            module = location.pop('module')
335            function = location.pop('function')
336            if location:
337                raise ValueError(f'unrecognized plugin options: {location}')
338
339            try:
340                self.register_by_name(name, module, function, path)
341            except Error as err:
342                self._errors[name].append(err)
343                _LOG.error(
344                    '%s Failed to register plugin "%s": %s',
345                    path,
346                    name,
347                    err,
348                )
349
350    def register_file(self, path: Path) -> None:
351        """Registers plugins from a plugins file.
352
353        Any exceptions raised from parsing the file are caught and logged.
354        """
355        with path.open() as contents:
356            for lineno, line in enumerate(contents, 1):
357                line = line.strip()
358                if not line or line.startswith('#'):
359                    continue
360
361                try:
362                    name, module, function = line.split()
363                except ValueError as err:
364                    self._errors[line.strip()].append(Error(err))
365                    _LOG.error(
366                        '%s:%d: Failed to parse plugin entry "%s": '
367                        'Expected 3 items (name, module, function), '
368                        'got %d',
369                        path,
370                        lineno,
371                        line,
372                        len(line.split()),
373                    )
374                    continue
375
376                try:
377                    self.register_by_name(name, module, function, path)
378                except Error as err:
379                    self._errors[name].append(err)
380                    _LOG.error(
381                        '%s: Failed to register plugin "%s": %s',
382                        path,
383                        name,
384                        err,
385                    )
386
387        self._sources.add(path)
388
389    def register_directory(
390        self,
391        directory: Path,
392        file_name: str,
393        restrict_to: Path | None = None,
394    ) -> None:
395        """Finds and registers plugins from plugins files in a directory.
396
397        Args:
398          directory: The directory from which to start searching up.
399          file_name: The name of plugins files to look for.
400          restrict_to: If provided, do not search higher than this directory.
401        """
402        for path in find_all_in_parents(file_name, directory):
403            if not path.is_file():
404                continue
405
406            if restrict_to is not None and restrict_to not in path.parents:
407                _LOG.debug(
408                    "Skipping plugins file %s because it's outside of %s",
409                    path,
410                    restrict_to,
411                )
412                continue
413
414            _LOG.debug('Found plugins file %s', path)
415            self.register_file(path)
416
417    def short_help(self) -> str:
418        """Returns a help string for the registered plugins."""
419        width = (
420            max(len(name) for name in self._registry) + 1
421            if self._registry
422            else 1
423        )
424        help_items = '\n'.join(
425            f'  {name:{width}} {plugin.help()}'
426            for name, plugin in sorted(self._registry.items())
427        )
428        return f'supported plugins:\n{help_items}'
429
430    def detailed_help(self, plugins: Iterable[str] = ()) -> Iterator[str]:
431        """Yields lines of detailed information about commands."""
432        if not plugins:
433            plugins = list(self._registry)
434
435        yield '\ndetailed plugin information:'
436
437        wrapper = TextWrapper(
438            width=80, initial_indent='   ', subsequent_indent=' ' * 11
439        )
440
441        plugins = sorted(plugins)
442        for plugin in plugins:
443            yield f'  [{plugin}]'
444
445            try:
446                for line in self[plugin].details(full=len(plugins) == 1):
447                    yield wrapper.fill(line)
448            except KeyError as err:
449                yield wrapper.fill(f'error   {str(err)[1:-1]}')
450
451            yield ''
452
453        yield 'Plugins files:'
454
455        if self._sources:
456            yield from (
457                f'  [{i}] {file}' for i, file in enumerate(self._sources, 1)
458            )
459        else:
460            yield '  (none found)'
461
462    def plugin(
463        self, function: Callable | None = None, *, name: str | None = None
464    ) -> Callable[[Callable], Callable]:
465        """Decorator that registers a function with this plugin registry."""
466
467        def decorator(function: Callable) -> Callable:
468            self.register(function.__name__ if name is None else name, function)
469            return function
470
471        if function is None:
472            return decorator
473
474        self.register(function.__name__, function)
475        return function
476
477
478def find_in_parents(name: str, path: Path) -> Path | None:
479    """Searches parent directories of the path for a file or directory."""
480    path = path.resolve()
481
482    while not path.joinpath(name).exists():
483        path = path.parent
484
485        if path.samefile(path.parent):
486            return None
487
488    return path.joinpath(name)
489
490
491def find_all_in_parents(name: str, path: Path) -> Iterator[Path]:
492    """Searches all parent directories of the path for files or directories."""
493
494    while True:
495        result = find_in_parents(name, path)
496        if result is None:
497            return
498
499        yield result
500        path = result.parent.parent
501
502
503def import_submodules(
504    module: types.ModuleType, recursive: bool = False
505) -> None:
506    """Imports the submodules of a package.
507
508    This can be used to collect plugins registered with a decorator from a
509    directory.
510    """
511    path = module.__path__  # type: ignore[attr-defined]
512    if recursive:
513        modules = pkgutil.walk_packages(path, module.__name__ + '.')
514    else:
515        modules = pkgutil.iter_modules(path, module.__name__ + '.')
516
517    for info in modules:
518        importlib.import_module(info.name)
519