]>
Commit | Line | Data |
---|---|---|
53e6db90 DC |
1 | """Utility functions to expand configuration directives or special values |
2 | (such glob patterns). | |
3 | ||
4 | We can split the process of interpreting configuration files into 2 steps: | |
5 | ||
6 | 1. The parsing the file contents from strings to value objects | |
7 | that can be understand by Python (for example a string with a comma | |
8 | separated list of keywords into an actual Python list of strings). | |
9 | ||
10 | 2. The expansion (or post-processing) of these values according to the | |
11 | semantics ``setuptools`` assign to them (for example a configuration field | |
12 | with the ``file:`` directive should be expanded from a list of file paths to | |
13 | a single string with the contents of those files concatenated) | |
14 | ||
15 | This module focus on the second step, and therefore allow sharing the expansion | |
16 | functions among several configuration file formats. | |
17 | ||
18 | **PRIVATE MODULE**: API reserved for setuptools internal usage only. | |
19 | """ | |
20 | import ast | |
21 | import importlib | |
22 | import io | |
23 | import os | |
24 | import pathlib | |
25 | import sys | |
26 | import warnings | |
27 | from glob import iglob | |
28 | from configparser import ConfigParser | |
29 | from importlib.machinery import ModuleSpec | |
30 | from itertools import chain | |
31 | from typing import ( | |
32 | TYPE_CHECKING, | |
33 | Callable, | |
34 | Dict, | |
35 | Iterable, | |
36 | Iterator, | |
37 | List, | |
38 | Mapping, | |
39 | Optional, | |
40 | Tuple, | |
41 | TypeVar, | |
42 | Union, | |
43 | cast | |
44 | ) | |
45 | from pathlib import Path | |
46 | from types import ModuleType | |
47 | ||
48 | from distutils.errors import DistutilsOptionError | |
49 | ||
50 | from .._path import same_path as _same_path | |
51 | ||
52 | if TYPE_CHECKING: | |
53 | from setuptools.dist import Distribution # noqa | |
54 | from setuptools.discovery import ConfigDiscovery # noqa | |
55 | from distutils.dist import DistributionMetadata # noqa | |
56 | ||
57 | chain_iter = chain.from_iterable | |
58 | _Path = Union[str, os.PathLike] | |
59 | _K = TypeVar("_K") | |
60 | _V = TypeVar("_V", covariant=True) | |
61 | ||
62 | ||
63 | class StaticModule: | |
64 | """Proxy to a module object that avoids executing arbitrary code.""" | |
65 | ||
66 | def __init__(self, name: str, spec: ModuleSpec): | |
67 | module = ast.parse(pathlib.Path(spec.origin).read_bytes()) | |
68 | vars(self).update(locals()) | |
69 | del self.self | |
70 | ||
71 | def _find_assignments(self) -> Iterator[Tuple[ast.AST, ast.AST]]: | |
72 | for statement in self.module.body: | |
73 | if isinstance(statement, ast.Assign): | |
74 | yield from ((target, statement.value) for target in statement.targets) | |
75 | elif isinstance(statement, ast.AnnAssign) and statement.value: | |
76 | yield (statement.target, statement.value) | |
77 | ||
78 | def __getattr__(self, attr): | |
79 | """Attempt to load an attribute "statically", via :func:`ast.literal_eval`.""" | |
80 | try: | |
81 | return next( | |
82 | ast.literal_eval(value) | |
83 | for target, value in self._find_assignments() | |
84 | if isinstance(target, ast.Name) and target.id == attr | |
85 | ) | |
86 | except Exception as e: | |
87 | raise AttributeError(f"{self.name} has no attribute {attr}") from e | |
88 | ||
89 | ||
90 | def glob_relative( | |
91 | patterns: Iterable[str], root_dir: Optional[_Path] = None | |
92 | ) -> List[str]: | |
93 | """Expand the list of glob patterns, but preserving relative paths. | |
94 | ||
95 | :param list[str] patterns: List of glob patterns | |
96 | :param str root_dir: Path to which globs should be relative | |
97 | (current directory by default) | |
98 | :rtype: list | |
99 | """ | |
100 | glob_characters = {'*', '?', '[', ']', '{', '}'} | |
101 | expanded_values = [] | |
102 | root_dir = root_dir or os.getcwd() | |
103 | for value in patterns: | |
104 | ||
105 | # Has globby characters? | |
106 | if any(char in value for char in glob_characters): | |
107 | # then expand the glob pattern while keeping paths *relative*: | |
108 | glob_path = os.path.abspath(os.path.join(root_dir, value)) | |
109 | expanded_values.extend(sorted( | |
110 | os.path.relpath(path, root_dir).replace(os.sep, "/") | |
111 | for path in iglob(glob_path, recursive=True))) | |
112 | ||
113 | else: | |
114 | # take the value as-is | |
115 | path = os.path.relpath(value, root_dir).replace(os.sep, "/") | |
116 | expanded_values.append(path) | |
117 | ||
118 | return expanded_values | |
119 | ||
120 | ||
121 | def read_files(filepaths: Union[str, bytes, Iterable[_Path]], root_dir=None) -> str: | |
122 | """Return the content of the files concatenated using ``\n`` as str | |
123 | ||
124 | This function is sandboxed and won't reach anything outside ``root_dir`` | |
125 | ||
126 | (By default ``root_dir`` is the current directory). | |
127 | """ | |
128 | from setuptools.extern.more_itertools import always_iterable | |
129 | ||
130 | root_dir = os.path.abspath(root_dir or os.getcwd()) | |
131 | _filepaths = (os.path.join(root_dir, path) for path in always_iterable(filepaths)) | |
132 | return '\n'.join( | |
133 | _read_file(path) | |
134 | for path in _filter_existing_files(_filepaths) | |
135 | if _assert_local(path, root_dir) | |
136 | ) | |
137 | ||
138 | ||
139 | def _filter_existing_files(filepaths: Iterable[_Path]) -> Iterator[_Path]: | |
140 | for path in filepaths: | |
141 | if os.path.isfile(path): | |
142 | yield path | |
143 | else: | |
144 | warnings.warn(f"File {path!r} cannot be found") | |
145 | ||
146 | ||
147 | def _read_file(filepath: Union[bytes, _Path]) -> str: | |
148 | with io.open(filepath, encoding='utf-8') as f: | |
149 | return f.read() | |
150 | ||
151 | ||
152 | def _assert_local(filepath: _Path, root_dir: str): | |
153 | if Path(os.path.abspath(root_dir)) not in Path(os.path.abspath(filepath)).parents: | |
154 | msg = f"Cannot access {filepath!r} (or anything outside {root_dir!r})" | |
155 | raise DistutilsOptionError(msg) | |
156 | ||
157 | return True | |
158 | ||
159 | ||
160 | def read_attr( | |
161 | attr_desc: str, | |
162 | package_dir: Optional[Mapping[str, str]] = None, | |
163 | root_dir: Optional[_Path] = None | |
164 | ): | |
165 | """Reads the value of an attribute from a module. | |
166 | ||
167 | This function will try to read the attributed statically first | |
168 | (via :func:`ast.literal_eval`), and only evaluate the module if it fails. | |
169 | ||
170 | Examples: | |
171 | read_attr("package.attr") | |
172 | read_attr("package.module.attr") | |
173 | ||
174 | :param str attr_desc: Dot-separated string describing how to reach the | |
175 | attribute (see examples above) | |
176 | :param dict[str, str] package_dir: Mapping of package names to their | |
177 | location in disk (represented by paths relative to ``root_dir``). | |
178 | :param str root_dir: Path to directory containing all the packages in | |
179 | ``package_dir`` (current directory by default). | |
180 | :rtype: str | |
181 | """ | |
182 | root_dir = root_dir or os.getcwd() | |
183 | attrs_path = attr_desc.strip().split('.') | |
184 | attr_name = attrs_path.pop() | |
185 | module_name = '.'.join(attrs_path) | |
186 | module_name = module_name or '__init__' | |
187 | _parent_path, path, module_name = _find_module(module_name, package_dir, root_dir) | |
188 | spec = _find_spec(module_name, path) | |
189 | ||
190 | try: | |
191 | return getattr(StaticModule(module_name, spec), attr_name) | |
192 | except Exception: | |
193 | # fallback to evaluate module | |
194 | module = _load_spec(spec, module_name) | |
195 | return getattr(module, attr_name) | |
196 | ||
197 | ||
198 | def _find_spec(module_name: str, module_path: Optional[_Path]) -> ModuleSpec: | |
199 | spec = importlib.util.spec_from_file_location(module_name, module_path) | |
200 | spec = spec or importlib.util.find_spec(module_name) | |
201 | ||
202 | if spec is None: | |
203 | raise ModuleNotFoundError(module_name) | |
204 | ||
205 | return spec | |
206 | ||
207 | ||
208 | def _load_spec(spec: ModuleSpec, module_name: str) -> ModuleType: | |
209 | name = getattr(spec, "__name__", module_name) | |
210 | if name in sys.modules: | |
211 | return sys.modules[name] | |
212 | module = importlib.util.module_from_spec(spec) | |
213 | sys.modules[name] = module # cache (it also ensures `==` works on loaded items) | |
214 | spec.loader.exec_module(module) # type: ignore | |
215 | return module | |
216 | ||
217 | ||
218 | def _find_module( | |
219 | module_name: str, package_dir: Optional[Mapping[str, str]], root_dir: _Path | |
220 | ) -> Tuple[_Path, Optional[str], str]: | |
221 | """Given a module (that could normally be imported by ``module_name`` | |
222 | after the build is complete), find the path to the parent directory where | |
223 | it is contained and the canonical name that could be used to import it | |
224 | considering the ``package_dir`` in the build configuration and ``root_dir`` | |
225 | """ | |
226 | parent_path = root_dir | |
227 | module_parts = module_name.split('.') | |
228 | if package_dir: | |
229 | if module_parts[0] in package_dir: | |
230 | # A custom path was specified for the module we want to import | |
231 | custom_path = package_dir[module_parts[0]] | |
232 | parts = custom_path.rsplit('/', 1) | |
233 | if len(parts) > 1: | |
234 | parent_path = os.path.join(root_dir, parts[0]) | |
235 | parent_module = parts[1] | |
236 | else: | |
237 | parent_module = custom_path | |
238 | module_name = ".".join([parent_module, *module_parts[1:]]) | |
239 | elif '' in package_dir: | |
240 | # A custom parent directory was specified for all root modules | |
241 | parent_path = os.path.join(root_dir, package_dir['']) | |
242 | ||
243 | path_start = os.path.join(parent_path, *module_name.split(".")) | |
244 | candidates = chain( | |
245 | (f"{path_start}.py", os.path.join(path_start, "__init__.py")), | |
246 | iglob(f"{path_start}.*") | |
247 | ) | |
248 | module_path = next((x for x in candidates if os.path.isfile(x)), None) | |
249 | return parent_path, module_path, module_name | |
250 | ||
251 | ||
252 | def resolve_class( | |
253 | qualified_class_name: str, | |
254 | package_dir: Optional[Mapping[str, str]] = None, | |
255 | root_dir: Optional[_Path] = None | |
256 | ) -> Callable: | |
257 | """Given a qualified class name, return the associated class object""" | |
258 | root_dir = root_dir or os.getcwd() | |
259 | idx = qualified_class_name.rfind('.') | |
260 | class_name = qualified_class_name[idx + 1 :] | |
261 | pkg_name = qualified_class_name[:idx] | |
262 | ||
263 | _parent_path, path, module_name = _find_module(pkg_name, package_dir, root_dir) | |
264 | module = _load_spec(_find_spec(module_name, path), module_name) | |
265 | return getattr(module, class_name) | |
266 | ||
267 | ||
268 | def cmdclass( | |
269 | values: Dict[str, str], | |
270 | package_dir: Optional[Mapping[str, str]] = None, | |
271 | root_dir: Optional[_Path] = None | |
272 | ) -> Dict[str, Callable]: | |
273 | """Given a dictionary mapping command names to strings for qualified class | |
274 | names, apply :func:`resolve_class` to the dict values. | |
275 | """ | |
276 | return {k: resolve_class(v, package_dir, root_dir) for k, v in values.items()} | |
277 | ||
278 | ||
279 | def find_packages( | |
280 | *, | |
281 | namespaces=True, | |
282 | fill_package_dir: Optional[Dict[str, str]] = None, | |
283 | root_dir: Optional[_Path] = None, | |
284 | **kwargs | |
285 | ) -> List[str]: | |
286 | """Works similarly to :func:`setuptools.find_packages`, but with all | |
287 | arguments given as keyword arguments. Moreover, ``where`` can be given | |
288 | as a list (the results will be simply concatenated). | |
289 | ||
290 | When the additional keyword argument ``namespaces`` is ``True``, it will | |
291 | behave like :func:`setuptools.find_namespace_packages`` (i.e. include | |
292 | implicit namespaces as per :pep:`420`). | |
293 | ||
294 | The ``where`` argument will be considered relative to ``root_dir`` (or the current | |
295 | working directory when ``root_dir`` is not given). | |
296 | ||
297 | If the ``fill_package_dir`` argument is passed, this function will consider it as a | |
298 | similar data structure to the ``package_dir`` configuration parameter add fill-in | |
299 | any missing package location. | |
300 | ||
301 | :rtype: list | |
302 | """ | |
303 | from setuptools.discovery import construct_package_dir | |
304 | from setuptools.extern.more_itertools import unique_everseen, always_iterable | |
305 | ||
306 | if namespaces: | |
307 | from setuptools.discovery import PEP420PackageFinder as PackageFinder | |
308 | else: | |
309 | from setuptools.discovery import PackageFinder # type: ignore | |
310 | ||
311 | root_dir = root_dir or os.curdir | |
312 | where = kwargs.pop('where', ['.']) | |
313 | packages: List[str] = [] | |
314 | fill_package_dir = {} if fill_package_dir is None else fill_package_dir | |
315 | search = list(unique_everseen(always_iterable(where))) | |
316 | ||
317 | if len(search) == 1 and all(not _same_path(search[0], x) for x in (".", root_dir)): | |
318 | fill_package_dir.setdefault("", search[0]) | |
319 | ||
320 | for path in search: | |
321 | package_path = _nest_path(root_dir, path) | |
322 | pkgs = PackageFinder.find(package_path, **kwargs) | |
323 | packages.extend(pkgs) | |
324 | if pkgs and not ( | |
325 | fill_package_dir.get("") == path | |
326 | or os.path.samefile(package_path, root_dir) | |
327 | ): | |
328 | fill_package_dir.update(construct_package_dir(pkgs, path)) | |
329 | ||
330 | return packages | |
331 | ||
332 | ||
333 | def _nest_path(parent: _Path, path: _Path) -> str: | |
334 | path = parent if path in {".", ""} else os.path.join(parent, path) | |
335 | return os.path.normpath(path) | |
336 | ||
337 | ||
338 | def version(value: Union[Callable, Iterable[Union[str, int]], str]) -> str: | |
339 | """When getting the version directly from an attribute, | |
340 | it should be normalised to string. | |
341 | """ | |
342 | if callable(value): | |
343 | value = value() | |
344 | ||
345 | value = cast(Iterable[Union[str, int]], value) | |
346 | ||
347 | if not isinstance(value, str): | |
348 | if hasattr(value, '__iter__'): | |
349 | value = '.'.join(map(str, value)) | |
350 | else: | |
351 | value = '%s' % value | |
352 | ||
353 | return value | |
354 | ||
355 | ||
356 | def canonic_package_data(package_data: dict) -> dict: | |
357 | if "*" in package_data: | |
358 | package_data[""] = package_data.pop("*") | |
359 | return package_data | |
360 | ||
361 | ||
362 | def canonic_data_files( | |
363 | data_files: Union[list, dict], root_dir: Optional[_Path] = None | |
364 | ) -> List[Tuple[str, List[str]]]: | |
365 | """For compatibility with ``setup.py``, ``data_files`` should be a list | |
366 | of pairs instead of a dict. | |
367 | ||
368 | This function also expands glob patterns. | |
369 | """ | |
370 | if isinstance(data_files, list): | |
371 | return data_files | |
372 | ||
373 | return [ | |
374 | (dest, glob_relative(patterns, root_dir)) | |
375 | for dest, patterns in data_files.items() | |
376 | ] | |
377 | ||
378 | ||
379 | def entry_points(text: str, text_source="entry-points") -> Dict[str, dict]: | |
380 | """Given the contents of entry-points file, | |
381 | process it into a 2-level dictionary (``dict[str, dict[str, str]]``). | |
382 | The first level keys are entry-point groups, the second level keys are | |
383 | entry-point names, and the second level values are references to objects | |
384 | (that correspond to the entry-point value). | |
385 | """ | |
386 | parser = ConfigParser(default_section=None, delimiters=("=",)) # type: ignore | |
387 | parser.optionxform = str # case sensitive | |
388 | parser.read_string(text, text_source) | |
389 | groups = {k: dict(v.items()) for k, v in parser.items()} | |
390 | groups.pop(parser.default_section, None) | |
391 | return groups | |
392 | ||
393 | ||
394 | class EnsurePackagesDiscovered: | |
395 | """Some expand functions require all the packages to already be discovered before | |
396 | they run, e.g. :func:`read_attr`, :func:`resolve_class`, :func:`cmdclass`. | |
397 | ||
398 | Therefore in some cases we will need to run autodiscovery during the evaluation of | |
399 | the configuration. However, it is better to postpone calling package discovery as | |
400 | much as possible, because some parameters can influence it (e.g. ``package_dir``), | |
401 | and those might not have been processed yet. | |
402 | """ | |
403 | ||
404 | def __init__(self, distribution: "Distribution"): | |
405 | self._dist = distribution | |
406 | self._called = False | |
407 | ||
408 | def __call__(self): | |
409 | """Trigger the automatic package discovery, if it is still necessary.""" | |
410 | if not self._called: | |
411 | self._called = True | |
412 | self._dist.set_defaults(name=False) # Skip name, we can still be parsing | |
413 | ||
414 | def __enter__(self): | |
415 | return self | |
416 | ||
417 | def __exit__(self, _exc_type, _exc_value, _traceback): | |
418 | if self._called: | |
419 | self._dist.set_defaults.analyse_name() # Now we can set a default name | |
420 | ||
421 | def _get_package_dir(self) -> Mapping[str, str]: | |
422 | self() | |
423 | pkg_dir = self._dist.package_dir | |
424 | return {} if pkg_dir is None else pkg_dir | |
425 | ||
426 | @property | |
427 | def package_dir(self) -> Mapping[str, str]: | |
428 | """Proxy to ``package_dir`` that may trigger auto-discovery when used.""" | |
429 | return LazyMappingProxy(self._get_package_dir) | |
430 | ||
431 | ||
432 | class LazyMappingProxy(Mapping[_K, _V]): | |
433 | """Mapping proxy that delays resolving the target object, until really needed. | |
434 | ||
435 | >>> def obtain_mapping(): | |
436 | ... print("Running expensive function!") | |
437 | ... return {"key": "value", "other key": "other value"} | |
438 | >>> mapping = LazyMappingProxy(obtain_mapping) | |
439 | >>> mapping["key"] | |
440 | Running expensive function! | |
441 | 'value' | |
442 | >>> mapping["other key"] | |
443 | 'other value' | |
444 | """ | |
445 | ||
446 | def __init__(self, obtain_mapping_value: Callable[[], Mapping[_K, _V]]): | |
447 | self._obtain = obtain_mapping_value | |
448 | self._value: Optional[Mapping[_K, _V]] = None | |
449 | ||
450 | def _target(self) -> Mapping[_K, _V]: | |
451 | if self._value is None: | |
452 | self._value = self._obtain() | |
453 | return self._value | |
454 | ||
455 | def __getitem__(self, key: _K) -> _V: | |
456 | return self._target()[key] | |
457 | ||
458 | def __len__(self) -> int: | |
459 | return len(self._target()) | |
460 | ||
461 | def __iter__(self) -> Iterator[_K]: | |
462 | return iter(self._target()) |