• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Utility functions to expand configuration directives or special values
(such as glob patterns).
3
4We can split the process of interpreting configuration files into 2 steps:
5
1. The parsing of the file contents from strings to value objects
   that can be understood by Python (for example a string with a comma
   separated list of keywords into an actual Python list of strings).
9
2. The expansion (or post-processing) of these values according to the
   semantics ``setuptools`` assigns to them (for example a configuration field
   with the ``file:`` directive should be expanded from a list of file paths to
   a single string with the contents of those files concatenated).
14
This module focuses on the second step, and therefore allows sharing the expansion
functions among several configuration file formats.
17"""
18import ast
19import importlib
20import io
21import os
22import sys
23import warnings
24from glob import iglob
25from configparser import ConfigParser
26from importlib.machinery import ModuleSpec
27from itertools import chain
28from typing import (
29    TYPE_CHECKING,
30    Callable,
31    Dict,
32    Iterable,
33    Iterator,
34    List,
35    Mapping,
36    Optional,
37    Tuple,
38    TypeVar,
39    Union,
40    cast
41)
42from types import ModuleType
43
44from distutils.errors import DistutilsOptionError
45
46if TYPE_CHECKING:
47    from setuptools.dist import Distribution  # noqa
48    from setuptools.discovery import ConfigDiscovery  # noqa
49    from distutils.dist import DistributionMetadata  # noqa
50
# Shorthand for flattening an iterable of iterables.
chain_iter = chain.from_iterable
# Anything accepted as a file-system path: a string or an ``os.PathLike`` object.
_Path = Union[str, os.PathLike]
# Type variables for the keys and values of :class:`LazyMappingProxy`.
_K = TypeVar("_K")
_V = TypeVar("_V", covariant=True)
55
56
class StaticModule:
    """Proxy to a module object that avoids executing arbitrary code.

    The source file referenced by ``spec.origin`` is parsed with :mod:`ast`
    (it is never imported). Attribute access looks for a top-level assignment
    to that name and evaluates its value with :func:`ast.literal_eval`.

    Instance attributes: ``name`` and ``spec`` (the constructor arguments),
    ``src`` (the source text) and ``module`` (the parsed :class:`ast.Module`).
    """

    def __init__(self, name: str, spec: ModuleSpec):
        # Local import: ``tokenize`` is not among the module-level imports.
        import tokenize

        # ``tokenize.open`` decodes the file the way Python itself does
        # (PEP 263 coding cookie, UTF-8 by default) instead of relying on the
        # locale's preferred encoding.
        with tokenize.open(spec.origin) as strm:  # type: ignore
            src = strm.read()
        self.name = name
        self.spec = spec
        self.src = src
        self.module = ast.parse(src)

    def _find_assignments(self) -> Iterator[Tuple[ast.AST, ast.AST]]:
        """Yield ``(target, value)`` pairs for the top-level assignments."""
        for statement in self.module.body:
            if isinstance(statement, ast.Assign):
                # ``a = b = 1`` has several targets sharing one value
                for target in statement.targets:
                    yield target, statement.value
            elif isinstance(statement, ast.AnnAssign) and statement.value is not None:
                # ``a: int = 1`` (a bare ``a: int`` has no value to evaluate)
                yield statement.target, statement.value

    def __getattr__(self, attr):
        """Attempt to load an attribute "statically", via :func:`ast.literal_eval`.

        Only the first top-level assignment to ``attr`` is considered.

        :raises AttributeError: if there is no such assignment or if its value
            is not a literal.
        """
        try:
            return next(
                ast.literal_eval(value)
                for target, value in self._find_assignments()
                if isinstance(target, ast.Name) and target.id == attr
            )
        except Exception as e:
            raise AttributeError(f"{self.name} has no attribute {attr}") from e
88
89
def glob_relative(
    patterns: Iterable[str], root_dir: Optional[_Path] = None
) -> List[str]:
    """Expand the list of glob patterns, but preserving relative paths.

    Entries without glob characters are not expanded; they are only rewritten
    relative to ``root_dir``. All returned paths use ``/`` as separator.

    :param list[str] patterns: List of glob patterns
    :param str root_dir: Path to which globs should be relative
                         (current directory by default)
    :rtype: list
    """
    glob_characters = {'*', '?', '[', ']', '{', '}'}
    base_dir = root_dir or os.getcwd()

    def _relative(path):
        # Relative to ``base_dir`` and using forward slashes on every platform
        return os.path.relpath(path, base_dir).replace(os.sep, "/")

    results: List[str] = []
    for pattern in patterns:
        if glob_characters.isdisjoint(pattern):
            # Nothing "globby" in it: take the value as-is
            results.append(_relative(pattern))
            continue

        # Expand the glob pattern while keeping the resulting paths *relative*
        absolute_pattern = os.path.abspath(os.path.join(base_dir, pattern))
        matches = [
            _relative(match) for match in iglob(absolute_pattern, recursive=True)
        ]
        matches.sort()
        results.extend(matches)

    return results
119
120
def read_files(filepaths: Union[str, bytes, Iterable[_Path]], root_dir=None) -> str:
    """Return the content of the files concatenated, separated by a newline.

    Each path is joined to ``root_dir``. Paths that are not existing files are
    skipped by ``_filter_existing_files``, and every remaining path is checked
    by ``_assert_local`` before being read, so nothing outside ``root_dir``
    is read.

    (By default ``root_dir`` is the current directory).
    """
    from setuptools.extern.more_itertools import always_iterable

    base_dir = os.path.abspath(root_dir or os.getcwd())
    candidates = (
        os.path.join(base_dir, item) for item in always_iterable(filepaths)
    )

    contents = []
    for existing in _filter_existing_files(candidates):
        if _assert_local(existing, base_dir):
            contents.append(_read_file(existing))
    return '\n'.join(contents)
137
138
139def _filter_existing_files(filepaths: Iterable[_Path]) -> Iterator[_Path]:
140    for path in filepaths:
141        if os.path.isfile(path):
142            yield path
143        else:
144            warnings.warn(f"File {path!r} cannot be found")
145
146
147def _read_file(filepath: Union[bytes, _Path]) -> str:
148    with io.open(filepath, encoding='utf-8') as f:
149        return f.read()
150
151
152def _assert_local(filepath: _Path, root_dir: str):
153    if not os.path.abspath(filepath).startswith(root_dir):
154        msg = f"Cannot access {filepath!r} (or anything outside {root_dir!r})"
155        raise DistutilsOptionError(msg)
156
157    return True
158
159
def read_attr(
    attr_desc: str,
    package_dir: Optional[Mapping[str, str]] = None,
    root_dir: Optional[_Path] = None
):
    """Reads the value of an attribute from a module.

    This function will try to read the attribute statically first
    (via :func:`ast.literal_eval`), and only evaluate the module if it fails.

    Examples:
        read_attr("package.attr")
        read_attr("package.module.attr")

    :param str attr_desc: Dot-separated string describing how to reach the
        attribute (see examples above)
    :param dict[str, str] package_dir: Mapping of package names to their
        location in disk (represented by paths relative to ``root_dir``).
    :param str root_dir: Path to directory containing all the packages in
        ``package_dir`` (current directory by default).
    :rtype: str
    """
    base_dir = root_dir or os.getcwd()

    # Everything before the last dot names the module, the rest the attribute.
    # When there is no module part, ``__init__`` is used as the module name.
    dotted_module, _, attr_name = attr_desc.strip().rpartition('.')
    _parent_path, module_path, module_name = _find_module(
        dotted_module or '__init__', package_dir, base_dir
    )
    spec = _find_spec(module_name, module_path)

    try:
        static_module = StaticModule(module_name, spec)
        return getattr(static_module, attr_name)
    except Exception:
        # The static approach did not work: fallback to evaluate the module
        return getattr(_load_spec(spec, module_name), attr_name)
196
197
198def _find_spec(module_name: str, module_path: Optional[_Path]) -> ModuleSpec:
199    spec = importlib.util.spec_from_file_location(module_name, module_path)
200    spec = spec or importlib.util.find_spec(module_name)
201
202    if spec is None:
203        raise ModuleNotFoundError(module_name)
204
205    return spec
206
207
208def _load_spec(spec: ModuleSpec, module_name: str) -> ModuleType:
209    name = getattr(spec, "__name__", module_name)
210    if name in sys.modules:
211        return sys.modules[name]
212    module = importlib.util.module_from_spec(spec)
213    sys.modules[name] = module  # cache (it also ensures `==` works on loaded items)
214    spec.loader.exec_module(module)  # type: ignore
215    return module
216
217
218def _find_module(
219    module_name: str, package_dir: Optional[Mapping[str, str]], root_dir: _Path
220) -> Tuple[_Path, Optional[str], str]:
221    """Given a module (that could normally be imported by ``module_name``
222    after the build is complete), find the path to the parent directory where
223    it is contained and the canonical name that could be used to import it
224    considering the ``package_dir`` in the build configuration and ``root_dir``
225    """
226    parent_path = root_dir
227    module_parts = module_name.split('.')
228    if package_dir:
229        if module_parts[0] in package_dir:
230            # A custom path was specified for the module we want to import
231            custom_path = package_dir[module_parts[0]]
232            parts = custom_path.rsplit('/', 1)
233            if len(parts) > 1:
234                parent_path = os.path.join(root_dir, parts[0])
235                parent_module = parts[1]
236            else:
237                parent_module = custom_path
238            module_name = ".".join([parent_module, *module_parts[1:]])
239        elif '' in package_dir:
240            # A custom parent directory was specified for all root modules
241            parent_path = os.path.join(root_dir, package_dir[''])
242
243    path_start = os.path.join(parent_path, *module_name.split("."))
244    candidates = chain(
245        (f"{path_start}.py", os.path.join(path_start, "__init__.py")),
246        iglob(f"{path_start}.*")
247    )
248    module_path = next((x for x in candidates if os.path.isfile(x)), None)
249    return parent_path, module_path, module_name
250
251
def resolve_class(
    qualified_class_name: str,
    package_dir: Optional[Mapping[str, str]] = None,
    root_dir: Optional[_Path] = None
) -> Callable:
    """Given a qualified class name, return the associated class object.

    The module part of the name (everything before the last dot) is located
    with ``_find_module``, executed with ``_load_spec``, and the class is then
    retrieved from the resulting module.
    """
    base_dir = root_dir or os.getcwd()

    # NOTE(review): when the name has no dot, ``rfind`` gives -1 and the
    # module part becomes the name without its last character. Presumably
    # callers always pass dotted names; confirm before relying on it.
    split_at = qualified_class_name.rfind('.')
    pkg_name = qualified_class_name[:split_at]
    class_name = qualified_class_name[split_at + 1 :]

    _parent_path, module_path, module_name = _find_module(
        pkg_name, package_dir, base_dir
    )
    spec = _find_spec(module_name, module_path)
    return getattr(_load_spec(spec, module_name), class_name)
266
267
def cmdclass(
    values: Dict[str, str],
    package_dir: Optional[Mapping[str, str]] = None,
    root_dir: Optional[_Path] = None
) -> Dict[str, Callable]:
    """Given a dictionary mapping command names to strings for qualified class
    names, apply :func:`resolve_class` to the dict values.

    A new dictionary is returned; ``values`` itself is left untouched.
    """
    resolved: Dict[str, Callable] = {}
    for command, qualified_name in values.items():
        resolved[command] = resolve_class(qualified_name, package_dir, root_dir)
    return resolved
277
278
def find_packages(
    *,
    namespaces=True,
    fill_package_dir: Optional[Dict[str, str]] = None,
    root_dir: Optional[_Path] = None,
    **kwargs
) -> List[str]:
    """Works similarly to :func:`setuptools.find_packages`, but with all
    arguments given as keyword arguments. Moreover, ``where`` can be given
    as a list (the results will be simply concatenated).

    When the additional keyword argument ``namespaces`` is ``True``, it will
    behave like :func:`setuptools.find_namespace_packages` (i.e. include
    implicit namespaces as per :pep:`420`).

    The ``where`` argument will be considered relative to ``root_dir`` (or the current
    working directory when ``root_dir`` is not given).

    If the ``fill_package_dir`` argument is passed, this function will consider it as a
    similar data structure to the ``package_dir`` configuration parameter and fill in
    any missing package location (the given dictionary is updated in place).

    :rtype: list
    """
    from setuptools.discovery import construct_package_dir
    from setuptools.extern.more_itertools import unique_everseen, always_iterable

    if namespaces:
        from setuptools.discovery import PEP420PackageFinder as PackageFinder
    else:
        from setuptools.discovery import PackageFinder  # type: ignore

    root_dir = root_dir or os.curdir
    where = kwargs.pop('where', ['.'])
    if fill_package_dir is None:
        fill_package_dir = {}

    packages: List[str] = []
    for location in unique_everseen(always_iterable(where)):
        search_path = _nest_path(root_dir, location)
        found = PackageFinder.find(search_path, **kwargs)
        packages.extend(found)

        if not found:
            continue
        if fill_package_dir.get("") == location:
            # This location is already registered as the root for all packages
            continue
        if os.path.samefile(search_path, root_dir):
            # Packages directly under ``root_dir`` need no ``package_dir`` entry
            continue
        fill_package_dir.update(construct_package_dir(found, location))

    return packages
327
328
329def _nest_path(parent: _Path, path: _Path) -> str:
330    path = parent if path == "." else os.path.join(parent, path)
331    return os.path.normpath(path)
332
333
def version(value: Union[Callable, Iterable[Union[str, int]], str]) -> str:
    """When getting the version directly from an attribute,
    it should be normalised to string.

    Callables are called first (without arguments). Strings are returned
    unchanged, other iterables have their items joined with ``.`` and anything
    else is simply converted to ``str``.
    """
    if callable(value):
        value = value()

    value = cast(Iterable[Union[str, int]], value)

    if isinstance(value, str):
        return value
    if hasattr(value, '__iter__'):
        return '.'.join(str(part) for part in value)
    return str(value)
350
351
def canonic_package_data(package_data: dict) -> dict:
    """Move the value stored under the ``"*"`` key (if any) to the ``""`` key.

    The dictionary is modified in place and also returned.
    """
    try:
        package_data[""] = package_data.pop("*")
    except KeyError:
        # No wildcard entry: nothing to rename
        pass
    return package_data
356
357
def canonic_data_files(
    data_files: Union[list, dict], root_dir: Optional[_Path] = None
) -> List[Tuple[str, List[str]]]:
    """For compatibility with ``setup.py``, ``data_files`` should be a list
    of pairs instead of a dict.

    A list is returned as it is. For a dict, each value is treated as a list
    of glob patterns and expanded with :func:`glob_relative`.
    """
    if not isinstance(data_files, list):
        pairs = []
        for dest, patterns in data_files.items():
            pairs.append((dest, glob_relative(patterns, root_dir)))
        return pairs

    return data_files
373
374
def entry_points(text: str, text_source="entry-points") -> Dict[str, dict]:
    """Given the contents of entry-points file,
    process it into a 2-level dictionary (``dict[str, dict[str, str]]``).
    The first level keys are entry-point groups, the second level keys are
    entry-point names, and the second level values are references to objects
    (that correspond to the entry-point value).

    ``text_source`` is the name given to the parser as the origin of ``text``.
    """
    parser = ConfigParser(default_section=None, delimiters=("=",))  # type: ignore
    parser.optionxform = str  # type: ignore  # keep the names case sensitive
    parser.read_string(text, text_source)

    # The parser's implicit default section is not an entry-point group
    default_section = parser.default_section
    return {
        group: dict(section.items())
        for group, section in parser.items()
        if group != default_section
    }
388
389
class EnsurePackagesDiscovered:
    """Some expand functions require all the packages to already be discovered before
    they run, e.g. :func:`read_attr`, :func:`resolve_class`, :func:`cmdclass`.

    Therefore in some cases we will need to run autodiscovery during the evaluation of
    the configuration. However, it is better to postpone calling package discovery as
    much as possible, because some parameters can influence it (e.g. ``package_dir``),
    and those might not have been processed yet.

    Instances are callable (to trigger the discovery, at most once) and can be used
    as context managers (to finish the name analysis when leaving the block).
    """

    def __init__(self, distribution: "Distribution"):
        self._called = False
        self._dist = distribution

    def __call__(self):
        """Trigger the automatic package discovery, if it is still necessary."""
        if self._called:
            return
        self._called = True
        # The name is skipped for now, because we can still be parsing
        self._dist.set_defaults(name=False)

    def __enter__(self):
        return self

    def __exit__(self, _exc_type, _exc_value, _traceback):
        if not self._called:
            return
        # Discovery did run, so now we can set a default name
        self._dist.set_defaults.analyse_name()

    def _get_package_dir(self) -> Mapping[str, str]:
        """Run the discovery and return the distribution's ``package_dir``
        (or an empty dict when it is ``None``).
        """
        self()
        configured = self._dist.package_dir
        if configured is None:
            return {}
        return configured

    @property
    def package_dir(self) -> Mapping[str, str]:
        """Proxy to ``package_dir`` that may trigger auto-discovery when used."""
        return LazyMappingProxy(self._get_package_dir)
426
427
class LazyMappingProxy(Mapping[_K, _V]):
    """Mapping proxy that delays resolving the target object, until really needed.

    The callable given to the constructor is invoked on first use and its result
    is then reused by the following operations.

    >>> def obtain_mapping():
    ...     print("Running expensive function!")
    ...     return {"key": "value", "other key": "other value"}
    >>> mapping = LazyMappingProxy(obtain_mapping)
    >>> mapping["key"]
    Running expensive function!
    'value'
    >>> mapping["other key"]
    'other value'
    """

    def __init__(self, obtain_mapping_value: Callable[[], Mapping[_K, _V]]):
        self._value: Optional[Mapping[_K, _V]] = None
        self._obtain = obtain_mapping_value

    def _target(self) -> Mapping[_K, _V]:
        """Return the wrapped mapping, obtaining it first if not done yet."""
        resolved = self._value
        if resolved is None:
            resolved = self._value = self._obtain()
        return resolved

    def __getitem__(self, key: _K) -> _V:
        target = self._target()
        return target[key]

    def __len__(self) -> int:
        target = self._target()
        return len(target)

    def __iter__(self) -> Iterator[_K]:
        target = self._target()
        return iter(target)
459