1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Provides general purpose plugin functionality. 15 16As used in this module, a plugin is a Python object associated with a name. 17Plugins are registered in a Registry. The plugin object is typically a function, 18but can be anything. 19 20Plugins may be loaded in a variety of ways: 21 22- Listed in a plugins file in the file system (e.g. as "name module target"). 23- Registered in a Python file using a decorator (@my_registry.plugin). 24- Registered directly or by name with function calls on a registry object. 25 26This functionality can be used to create plugins for command line tools, 27interactive consoles, or anything else. Pigweed's pw command uses this module 28for its plugins. 29""" 30 31from __future__ import annotations 32 33import collections 34import collections.abc 35import importlib 36import inspect 37import logging 38from pathlib import Path 39import pkgutil 40import sys 41from textwrap import TextWrapper 42import types 43from typing import Any, Callable, Iterable, Iterator, Set 44 45_LOG = logging.getLogger(__name__) 46_BUILT_IN = '<built-in>' 47 48 49class Error(Exception): 50 """Indicates that a plugin is invalid or cannot be registered.""" 51 52 def __str__(self): 53 """Displays the error as a string, including the __cause__ if present. 54 55 Adding __cause__ gives useful context without displaying a backtrace. 56 """ 57 if self.__cause__ is None: 58 return super().__str__() 59 60 return ( 61 f'{super().__str__()} ' 62 f'({type(self.__cause__).__name__}: {self.__cause__})' 63 ) 64 65 66def _get_module(member: object) -> types.ModuleType: 67 """Gets the module or a fake module if the module isn't found.""" 68 module = inspect.getmodule(member) 69 return module if module else types.ModuleType('<unknown>') 70 71 72class Plugin: 73 """Represents a Python entity registered as a plugin. 74 75 Each plugin resolves to a Python object, typically a function. 76 """ 77 78 @classmethod 79 def from_name( 80 cls, 81 name: str, 82 module_name: str, 83 member_name: str, 84 source: Path | None, 85 ) -> Plugin: 86 """Creates a plugin by module and attribute name. 87 88 Args: 89 name: the name of the plugin 90 module_name: Python module name (e.g. 'foo_pkg.bar') 91 member_name: the name of the member in the module 92 source: path to the plugins file that declared this plugin, if any 93 """ 94 95 # Attempt to access the module and member. Catch any errors that might 96 # occur, since a bad plugin shouldn't be a fatal error. 97 try: 98 module = importlib.import_module(module_name) 99 except Exception as err: 100 _LOG.debug( 101 'Failed to import module "%s" for "%s" plugin', 102 module_name, 103 name, 104 exc_info=True, 105 ) 106 raise Error(f'Failed to import module "{module_name}"') from err 107 108 try: 109 member = getattr(module, member_name) 110 except AttributeError as err: 111 raise Error( 112 f'"{module_name}.{member_name}" does not exist' 113 ) from err 114 115 return cls(name, member, source) 116 117 def __init__( 118 self, name: str, target: Any, source: Path | None = None 119 ) -> None: 120 """Creates a plugin for the provided target.""" 121 self.name = name 122 self._module = _get_module(target) 123 self.target = target 124 self.source = source 125 126 @property 127 def target_name(self) -> str: 128 return ( 129 f'{self._module.__name__}.' 130 f'{getattr(self.target, "__name__", self.target)}' 131 ) 132 133 @property 134 def source_name(self) -> str: 135 return _BUILT_IN if self.source is None else str(self.source) 136 137 def run_with_argv(self, argv: Iterable[str]) -> int: 138 """Sets sys.argv and calls the plugin function. 139 140 This is used to call a plugin as if from the command line. 141 """ 142 original_sys_argv = sys.argv 143 sys.argv = [f'pw {self.name}', *argv] 144 145 try: 146 return self.target() 147 finally: 148 sys.argv = original_sys_argv 149 150 def help(self, full: bool = False) -> str: 151 """Returns a description of this plugin from its docstring.""" 152 docstring = self.target.__doc__ or self._module.__doc__ or '' 153 return docstring if full else next(iter(docstring.splitlines()), '') 154 155 def details(self, full: bool = False) -> Iterator[str]: 156 yield f'help {self.help(full=full)}' 157 yield f'module {self._module.__name__}' 158 yield f'target {getattr(self.target, "__name__", self.target)}' 159 yield f'source {self.source_name}' 160 161 def __repr__(self) -> str: 162 return ( 163 f'{self.__class__.__name__}(name={self.name!r}, ' 164 f'target={self.target_name}' 165 f'{f", source={self.source_name!r}" if self.source else ""})' 166 ) 167 168 169def callable_with_no_args(plugin: Plugin) -> None: 170 """Checks that a plugin is callable without arguments. 171 172 May be used for the validator argument to Registry. 173 """ 174 try: 175 params = inspect.signature(plugin.target).parameters 176 except TypeError: 177 raise Error( 178 'Plugin functions must be callable, but ' 179 f'{plugin.target_name} is a ' 180 f'{type(plugin.target).__name__}' 181 ) 182 183 positional = sum(p.default == p.empty for p in params.values()) 184 if positional: 185 raise Error( 186 f'Plugin functions cannot have any required positional ' 187 f'arguments, but {plugin.target_name} has {positional}' 188 ) 189 190 191class Registry(collections.abc.Mapping): 192 """Manages a set of plugins from Python modules or plugins files.""" 193 194 def __init__( 195 self, validator: Callable[[Plugin], Any] = lambda _: None 196 ) -> None: 197 """Creates a new, empty plugins registry. 198 199 Args: 200 validator: Function that checks whether a plugin is valid and should 201 be registered. Must raise plugins.Error is the plugin is invalid. 202 """ 203 204 self._registry: dict[str, Plugin] = {} 205 self._sources: Set[Path] = set() # Paths to plugins files 206 self._errors: dict[str, list[Exception]] = collections.defaultdict(list) 207 self._validate_plugin = validator 208 209 def __getitem__(self, name: str) -> Plugin: 210 """Accesses a plugin by name; raises KeyError if it does not exist.""" 211 if name in self._registry: 212 return self._registry[name] 213 214 if name in self._errors: 215 raise KeyError( 216 f'Registration for "{name}" failed: ' 217 + ', '.join(str(e) for e in self._errors[name]) 218 ) 219 220 raise KeyError(f'The plugin "{name}" has not been registered') 221 222 def __iter__(self) -> Iterator[str]: 223 return iter(self._registry) 224 225 def __len__(self) -> int: 226 return len(self._registry) 227 228 def errors(self) -> dict[str, list[Exception]]: 229 return self._errors 230 231 def run_with_argv(self, name: str, argv: Iterable[str]) -> int: 232 """Runs a plugin by name, setting sys.argv to the provided args. 233 234 This is used to run a command as if it were executed directly from the 235 command line. The plugin is expected to return an int. 236 237 Raises: 238 KeyError if plugin is not registered. 239 """ 240 return self[name].run_with_argv(argv) 241 242 def _should_register(self, plugin: Plugin) -> bool: 243 """Determines and logs if a plugin should be registered or not. 244 245 Some errors are exceptions, others are not. 246 """ 247 248 if plugin.name in self._registry and plugin.source is None: 249 raise Error( 250 f'Attempted to register built-in plugin "{plugin.name}", but ' 251 'a plugin with that name was previously registered ' 252 f'({self[plugin.name]})!' 253 ) 254 255 # Run the user-provided validation function, which raises exceptions 256 # if there are errors. 257 self._validate_plugin(plugin) 258 259 existing = self._registry.get(plugin.name) 260 261 if existing is None: 262 return True 263 264 if existing.source is None: 265 _LOG.debug( 266 '%s: Overriding built-in plugin "%s" with %s', 267 plugin.source_name, 268 plugin.name, 269 plugin.target_name, 270 ) 271 return True 272 273 if plugin.source != existing.source: 274 _LOG.debug( 275 '%s: The plugin "%s" was previously registered in %s; ' 276 'ignoring registration as %s', 277 plugin.source_name, 278 plugin.name, 279 self._registry[plugin.name].source, 280 plugin.target_name, 281 ) 282 elif plugin.source not in self._sources: 283 _LOG.warning( 284 '%s: "%s" is registered file multiple times in this file! ' 285 'Only the first registration takes effect', 286 plugin.source_name, 287 plugin.name, 288 ) 289 290 return False 291 292 def register(self, name: str, target: Any) -> Plugin | None: 293 """Registers an object as a plugin.""" 294 return self._register(Plugin(name, target, None)) 295 296 def register_by_name( 297 self, 298 name: str, 299 module_name: str, 300 member_name: str, 301 source: Path | None = None, 302 ) -> Plugin | None: 303 """Registers an object from its module and name as a plugin.""" 304 return self._register( 305 Plugin.from_name(name, module_name, member_name, source) 306 ) 307 308 def _register(self, plugin: Plugin) -> Plugin | None: 309 # Prohibit functions not from a plugins file from overriding others. 310 if not self._should_register(plugin): 311 return None 312 313 self._registry[plugin.name] = plugin 314 _LOG.debug( 315 '%s: Registered plugin "%s" for %s', 316 plugin.source_name, 317 plugin.name, 318 plugin.target_name, 319 ) 320 321 return plugin 322 323 def register_config( 324 self, 325 config: dict, 326 path: Path | None = None, 327 ) -> None: 328 """Registers plugins from a Pigweed config. 329 330 Any exceptions raised from parsing the file are caught and logged. 331 """ 332 plugins = config.get('pw', {}).get('pw_cli', {}).get('plugins', {}) 333 for name, location in plugins.items(): 334 module = location.pop('module') 335 function = location.pop('function') 336 if location: 337 raise ValueError(f'unrecognized plugin options: {location}') 338 339 try: 340 self.register_by_name(name, module, function, path) 341 except Error as err: 342 self._errors[name].append(err) 343 _LOG.error( 344 '%s Failed to register plugin "%s": %s', 345 path, 346 name, 347 err, 348 ) 349 350 def register_file(self, path: Path) -> None: 351 """Registers plugins from a plugins file. 352 353 Any exceptions raised from parsing the file are caught and logged. 354 """ 355 with path.open() as contents: 356 for lineno, line in enumerate(contents, 1): 357 line = line.strip() 358 if not line or line.startswith('#'): 359 continue 360 361 try: 362 name, module, function = line.split() 363 except ValueError as err: 364 self._errors[line.strip()].append(Error(err)) 365 _LOG.error( 366 '%s:%d: Failed to parse plugin entry "%s": ' 367 'Expected 3 items (name, module, function), ' 368 'got %d', 369 path, 370 lineno, 371 line, 372 len(line.split()), 373 ) 374 continue 375 376 try: 377 self.register_by_name(name, module, function, path) 378 except Error as err: 379 self._errors[name].append(err) 380 _LOG.error( 381 '%s: Failed to register plugin "%s": %s', 382 path, 383 name, 384 err, 385 ) 386 387 self._sources.add(path) 388 389 def register_directory( 390 self, 391 directory: Path, 392 file_name: str, 393 restrict_to: Path | None = None, 394 ) -> None: 395 """Finds and registers plugins from plugins files in a directory. 396 397 Args: 398 directory: The directory from which to start searching up. 399 file_name: The name of plugins files to look for. 400 restrict_to: If provided, do not search higher than this directory. 401 """ 402 for path in find_all_in_parents(file_name, directory): 403 if not path.is_file(): 404 continue 405 406 if restrict_to is not None and restrict_to not in path.parents: 407 _LOG.debug( 408 "Skipping plugins file %s because it's outside of %s", 409 path, 410 restrict_to, 411 ) 412 continue 413 414 _LOG.debug('Found plugins file %s', path) 415 self.register_file(path) 416 417 def short_help(self) -> str: 418 """Returns a help string for the registered plugins.""" 419 width = ( 420 max(len(name) for name in self._registry) + 1 421 if self._registry 422 else 1 423 ) 424 help_items = '\n'.join( 425 f' {name:{width}} {plugin.help()}' 426 for name, plugin in sorted(self._registry.items()) 427 ) 428 return f'supported plugins:\n{help_items}' 429 430 def detailed_help(self, plugins: Iterable[str] = ()) -> Iterator[str]: 431 """Yields lines of detailed information about commands.""" 432 if not plugins: 433 plugins = list(self._registry) 434 435 yield '\ndetailed plugin information:' 436 437 wrapper = TextWrapper( 438 width=80, initial_indent=' ', subsequent_indent=' ' * 11 439 ) 440 441 plugins = sorted(plugins) 442 for plugin in plugins: 443 yield f' [{plugin}]' 444 445 try: 446 for line in self[plugin].details(full=len(plugins) == 1): 447 yield wrapper.fill(line) 448 except KeyError as err: 449 yield wrapper.fill(f'error {str(err)[1:-1]}') 450 451 yield '' 452 453 yield 'Plugins files:' 454 455 if self._sources: 456 yield from ( 457 f' [{i}] {file}' for i, file in enumerate(self._sources, 1) 458 ) 459 else: 460 yield ' (none found)' 461 462 def plugin( 463 self, function: Callable | None = None, *, name: str | None = None 464 ) -> Callable[[Callable], Callable]: 465 """Decorator that registers a function with this plugin registry.""" 466 467 def decorator(function: Callable) -> Callable: 468 self.register(function.__name__ if name is None else name, function) 469 return function 470 471 if function is None: 472 return decorator 473 474 self.register(function.__name__, function) 475 return function 476 477 478def find_in_parents(name: str, path: Path) -> Path | None: 479 """Searches parent directories of the path for a file or directory.""" 480 path = path.resolve() 481 482 while not path.joinpath(name).exists(): 483 path = path.parent 484 485 if path.samefile(path.parent): 486 return None 487 488 return path.joinpath(name) 489 490 491def find_all_in_parents(name: str, path: Path) -> Iterator[Path]: 492 """Searches all parent directories of the path for files or directories.""" 493 494 while True: 495 result = find_in_parents(name, path) 496 if result is None: 497 return 498 499 yield result 500 path = result.parent.parent 501 502 503def import_submodules( 504 module: types.ModuleType, recursive: bool = False 505) -> None: 506 """Imports the submodules of a package. 507 508 This can be used to collect plugins registered with a decorator from a 509 directory. 510 """ 511 path = module.__path__ # type: ignore[attr-defined] 512 if recursive: 513 modules = pkgutil.walk_packages(path, module.__name__ + '.') 514 else: 515 modules = pkgutil.iter_modules(path, module.__name__ + '.') 516 517 for info in modules: 518 importlib.import_module(info.name) 519