# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Provides general purpose plugin functionality.

As used in this module, a plugin is a Python object associated with a name.
Plugins are registered in a Registry. The plugin object is typically a function,
but can be anything.

Plugins may be loaded in a variety of ways:

- Listed in a plugins file in the file system (e.g. as "name module target").
- Registered in a Python file using a decorator (@my_registry.plugin).
- Registered directly or by name with function calls on a registry object.

This functionality can be used to create plugins for command line tools,
interactive consoles, or anything else. Pigweed's pw command uses this module
for its plugins.
"""

import collections
import collections.abc
import importlib
import inspect
import logging
from pathlib import Path
import pkgutil
import sys
from textwrap import TextWrapper
import types
from typing import Any, Callable, Dict, List, Iterable, Iterator, Optional, Set

_LOG = logging.getLogger(__name__)
_BUILT_IN = '<built-in>'


class Error(Exception):
    """Indicates that a plugin is invalid or cannot be registered."""

    def __str__(self):
        """Displays the error as a string, including the __cause__ if present.

        Adding __cause__ gives useful context without displaying a backtrace.
        """
        if self.__cause__ is None:
            return super().__str__()

        return (
            f'{super().__str__()} '
            f'({type(self.__cause__).__name__}: {self.__cause__})'
        )


def _get_module(member: object) -> types.ModuleType:
    """Gets the module or a fake module if the module isn't found."""
    module = inspect.getmodule(member)
    return module if module else types.ModuleType('<unknown>')


class Plugin:
    """Represents a Python entity registered as a plugin.

    Each plugin resolves to a Python object, typically a function.
    """

    @classmethod
    def from_name(
        cls,
        name: str,
        module_name: str,
        member_name: str,
        source: Optional[Path],
    ) -> 'Plugin':
        """Creates a plugin by module and attribute name.

        Args:
          name: the name of the plugin
          module_name: Python module name (e.g. 'foo_pkg.bar')
          member_name: the name of the member in the module
          source: path to the plugins file that declared this plugin, if any

        Raises:
          Error: the module could not be imported or has no such member.
        """

        # Attempt to access the module and member. Catch any errors that might
        # occur, since a bad plugin shouldn't be a fatal error.
        try:
            module = importlib.import_module(module_name)
        except Exception as err:
            _LOG.debug(
                'Failed to import module "%s" for "%s" plugin',
                module_name,
                name,
                exc_info=True,
            )
            raise Error(f'Failed to import module "{module_name}"') from err

        try:
            member = getattr(module, member_name)
        except AttributeError as err:
            raise Error(
                f'"{module_name}.{member_name}" does not exist'
            ) from err

        return cls(name, member, source)

    def __init__(
        self, name: str, target: Any, source: Optional[Path] = None
    ) -> None:
        """Creates a plugin for the provided target.

        Args:
          name: the name the plugin is registered under
          target: the Python object the plugin resolves to
          source: path to the plugins file that declared the plugin; None
              marks the plugin as built-in
        """
        self.name = name
        self._module = _get_module(target)
        self.target = target
        self.source = source

    @property
    def target_name(self) -> str:
        """The target's module-qualified name."""
        return (
            f'{self._module.__name__}.'
            f'{getattr(self.target, "__name__", self.target)}'
        )

    @property
    def source_name(self) -> str:
        """The plugins file path as a string, or '<built-in>' if there is none."""
        return _BUILT_IN if self.source is None else str(self.source)

    def run_with_argv(self, argv: Iterable[str]) -> int:
        """Sets sys.argv and calls the plugin function.

        This is used to call a plugin as if from the command line.
        """
        original_sys_argv = sys.argv
        sys.argv = [f'pw {self.name}', *argv]

        try:
            return self.target()
        finally:
            # Always restore sys.argv, even if the plugin raises.
            sys.argv = original_sys_argv

    def help(self, full: bool = False) -> str:
        """Returns a description of this plugin from its docstring.

        Falls back to the module docstring if the target has none. Unless full
        is set, only the first line of the docstring is returned.
        """
        docstring = self.target.__doc__ or self._module.__doc__ or ''
        return docstring if full else next(iter(docstring.splitlines()), '')

    def details(self, full: bool = False) -> Iterator[str]:
        """Yields lines describing the plugin's help, module, target, source."""
        yield f'help {self.help(full=full)}'
        yield f'module {self._module.__name__}'
        yield f'target {getattr(self.target, "__name__", self.target)}'
        yield f'source {self.source_name}'

    def __repr__(self) -> str:
        return (
            f'{self.__class__.__name__}(name={self.name!r}, '
            f'target={self.target_name}'
            f'{f", source={self.source_name!r}" if self.source else ""})'
        )


def callable_with_no_args(plugin: Plugin) -> None:
    """Checks that a plugin is callable without arguments.

    May be used for the validator argument to Registry.

    Raises:
      Error: the target is not callable or has parameters that must be
          supplied by the caller.
    """
    try:
        params = inspect.signature(plugin.target).parameters
    except TypeError:
        raise Error(
            'Plugin functions must be callable, but '
            f'{plugin.target_name} is a '
            f'{type(plugin.target).__name__}'
        )

    # *args and **kwargs never have a default, but they accept zero arguments,
    # so they must not be counted as required.
    required = sum(
        1
        for param in params.values()
        if param.default is param.empty
        and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)
    )
    if required:
        raise Error(
            f'Plugin functions cannot have any required positional '
            f'arguments, but {plugin.target_name} has {required}'
        )


class Registry(collections.abc.Mapping):
    """Manages a set of plugins from Python modules or plugins files."""

    def __init__(
        self, validator: Callable[[Plugin], Any] = lambda _: None
    ) -> None:
        """Creates a new, empty plugins registry.

        Args:
          validator: Function that checks whether a plugin is valid and should
              be registered. Must raise plugins.Error if the plugin is invalid.
        """

        self._registry: Dict[str, Plugin] = {}
        self._sources: Set[Path] = set()  # Paths to plugins files
        self._errors: Dict[str, List[Exception]] = collections.defaultdict(list)
        self._validate_plugin = validator

    def __getitem__(self, name: str) -> Plugin:
        """Accesses a plugin by name; raises KeyError if it does not exist."""
        if name in self._registry:
            return self._registry[name]

        if name in self._errors:
            raise KeyError(
                f'Registration for "{name}" failed: '
                + ', '.join(str(e) for e in self._errors[name])
            )

        raise KeyError(f'The plugin "{name}" has not been registered')

    def __iter__(self) -> Iterator[str]:
        return iter(self._registry)

    def __len__(self) -> int:
        return len(self._registry)

    def errors(self) -> Dict[str, List[Exception]]:
        """Returns the registration errors recorded so far, keyed by name."""
        return self._errors

    def run_with_argv(self, name: str, argv: Iterable[str]) -> int:
        """Runs a plugin by name, setting sys.argv to the provided args.

        This is used to run a command as if it were executed directly from the
        command line. The plugin is expected to return an int.

        Raises:
          KeyError if plugin is not registered.
        """
        return self[name].run_with_argv(argv)

    def _should_register(self, plugin: Plugin) -> bool:
        """Determines and logs if a plugin should be registered or not.

        Some errors are exceptions, others are not.
        """

        if plugin.name in self._registry and plugin.source is None:
            raise Error(
                f'Attempted to register built-in plugin "{plugin.name}", but '
                'a plugin with that name was previously registered '
                f'({self[plugin.name]})!'
            )

        # Run the user-provided validation function, which raises exceptions
        # if there are errors.
        self._validate_plugin(plugin)

        existing = self._registry.get(plugin.name)

        if existing is None:
            return True

        if existing.source is None:
            _LOG.debug(
                '%s: Overriding built-in plugin "%s" with %s',
                plugin.source_name,
                plugin.name,
                plugin.target_name,
            )
            return True

        if plugin.source != existing.source:
            _LOG.debug(
                '%s: The plugin "%s" was previously registered in %s; '
                'ignoring registration as %s',
                plugin.source_name,
                plugin.name,
                existing.source,
                plugin.target_name,
            )
        elif plugin.source not in self._sources:
            # A source is only added to _sources once its file has been fully
            # processed, so this is a duplicate within the file being read.
            _LOG.warning(
                '%s: "%s" is registered multiple times in this file! '
                'Only the first registration takes effect',
                plugin.source_name,
                plugin.name,
            )

        return False

    def register(self, name: str, target: Any) -> Optional[Plugin]:
        """Registers an object as a plugin."""
        return self._register(Plugin(name, target, None))

    def register_by_name(
        self,
        name: str,
        module_name: str,
        member_name: str,
        source: Optional[Path] = None,
    ) -> Optional[Plugin]:
        """Registers an object from its module and name as a plugin."""
        return self._register(
            Plugin.from_name(name, module_name, member_name, source)
        )

    def _register(self, plugin: Plugin) -> Optional[Plugin]:
        """Stores the plugin if allowed; returns it, or None if it was ignored."""
        # Prohibit functions not from a plugins file from overriding others.
        if not self._should_register(plugin):
            return None

        self._registry[plugin.name] = plugin
        _LOG.debug(
            '%s: Registered plugin "%s" for %s',
            plugin.source_name,
            plugin.name,
            plugin.target_name,
        )

        return plugin

    def register_file(self, path: Path) -> None:
        """Registers plugins from a plugins file.

        Each non-empty line that does not start with '#' must have the form
        "name module function". Any exceptions raised from parsing the file
        are caught and logged.
        """
        with path.open() as contents:
            for lineno, line in enumerate(contents, 1):
                line = line.strip()
                if not line or line.startswith('#'):
                    continue

                try:
                    name, module, function = line.split()
                except ValueError as err:
                    # The name is unknown, so key the error by the whole line.
                    self._errors[line].append(Error(err))
                    _LOG.error(
                        '%s:%d: Failed to parse plugin entry "%s": '
                        'Expected 3 items (name, module, function), '
                        'got %d',
                        path,
                        lineno,
                        line,
                        len(line.split()),
                    )
                    continue

                try:
                    self.register_by_name(name, module, function, path)
                except Error as err:
                    self._errors[name].append(err)
                    _LOG.error(
                        '%s: Failed to register plugin "%s": %s',
                        path,
                        name,
                        err,
                    )

        self._sources.add(path)

    def register_directory(
        self,
        directory: Path,
        file_name: str,
        restrict_to: Optional[Path] = None,
    ) -> None:
        """Finds and registers plugins from plugins files in a directory.

        Args:
          directory: The directory from which to start searching up.
          file_name: The name of plugins files to look for.
          restrict_to: If provided, do not search higher than this directory.
        """
        # Paths produced by the search are resolved, so the limit has to be
        # resolved too or a relative/symlinked restrict_to would never match.
        limit = None if restrict_to is None else restrict_to.resolve()

        for path in find_all_in_parents(file_name, directory):
            if not path.is_file():
                continue

            if limit is not None and limit not in path.parents:
                _LOG.debug(
                    "Skipping plugins file %s because it's outside of %s",
                    path,
                    restrict_to,
                )
                continue

            _LOG.debug('Found plugins file %s', path)
            self.register_file(path)

    def short_help(self) -> str:
        """Returns a help string for the registered plugins."""
        width = (
            max(len(name) for name in self._registry) + 1
            if self._registry
            else 1
        )
        help_items = '\n'.join(
            f' {name:{width}} {plugin.help()}'
            for name, plugin in sorted(self._registry.items())
        )
        return f'supported plugins:\n{help_items}'

    def detailed_help(self, plugins: Iterable[str] = ()) -> Iterator[str]:
        """Yields lines of detailed information about commands.

        Args:
          plugins: Names of the plugins to describe. If empty, all registered
              plugins are described.
        """
        # Materialize before testing for emptiness so that one-shot iterables
        # (e.g. generators) are handled the same way as lists.
        plugins = sorted(plugins) or sorted(self._registry)

        yield '\ndetailed plugin information:'

        wrapper = TextWrapper(
            width=80, initial_indent=' ', subsequent_indent=' ' * 11
        )

        for plugin in plugins:
            yield f' [{plugin}]'

            try:
                for line in self[plugin].details(full=len(plugins) == 1):
                    yield wrapper.fill(line)
            except KeyError as err:
                # str() of a KeyError is the repr of its message; drop the
                # surrounding quotes.
                yield wrapper.fill(f'error {str(err)[1:-1]}')

            yield ''

        yield 'Plugins files:'

        if self._sources:
            yield from (
                f' [{i}] {file}' for i, file in enumerate(self._sources, 1)
            )
        else:
            yield ' (none found)'

    def plugin(
        self, function: Optional[Callable] = None, *, name: Optional[str] = None
    ) -> Callable[[Callable], Callable]:
        """Decorator that registers a function with this plugin registry.

        May be used bare (@registry.plugin) or with arguments
        (@registry.plugin(name='foo')). The function is registered under name
        if provided, otherwise under its __name__.
        """

        def decorator(func: Callable) -> Callable:
            self.register(func.__name__ if name is None else name, func)
            return func

        # Route the direct-call form through the same path so that name= is
        # honored there as well.
        return decorator if function is None else decorator(function)


def find_in_parents(name: str, path: Path) -> Optional[Path]:
    """Searches parent directories of the path for a file or directory.

    The search starts at path itself and walks up to and including the
    filesystem root.

    Returns:
      The path to the closest match, or None if there is none.
    """
    return next(find_all_in_parents(name, path), None)


def find_all_in_parents(name: str, path: Path) -> Iterator[Path]:
    """Searches all parent directories of the path for files or directories.

    Yields matches from the closest directory outwards. Each directory is
    visited exactly once, so the search always terminates at the root.
    """
    start = path.resolve()

    for directory in (start, *start.parents):
        candidate = directory.joinpath(name)
        if candidate.exists():
            yield candidate


def import_submodules(
    module: types.ModuleType, recursive: bool = False
) -> None:
    """Imports the submodules of a package.

    This can be used to collect plugins registered with a decorator from a
    directory.
    """
    path = module.__path__  # type: ignore[attr-defined]
    if recursive:
        modules = pkgutil.walk_packages(path, module.__name__ + '.')
    else:
        modules = pkgutil.iter_modules(path, module.__name__ + '.')

    for info in modules:
        importlib.import_module(info.name)