]> crepu.dev Git - config.git/blob - djavu-asus/emacs/elpy/rpc-venv/lib/python3.11/site-packages/flake8/plugins/finder.py
380ec3ac3d5a928dd6c0aff422053b4717fb6a19
[config.git] / djavu-asus / emacs / elpy / rpc-venv / lib / python3.11 / site-packages / flake8 / plugins / finder.py
1 """Functions related to finding and loading plugins."""
2 from __future__ import annotations
3
4 import configparser
5 import importlib.metadata
6 import inspect
7 import itertools
8 import logging
9 import sys
10 from typing import Any
11 from typing import Generator
12 from typing import Iterable
13 from typing import NamedTuple
14
15 from flake8 import utils
16 from flake8.defaults import VALID_CODE_PREFIX
17 from flake8.exceptions import ExecutionError
18 from flake8.exceptions import FailedToLoadPlugin
19
20 LOG = logging.getLogger(__name__)
21
22 FLAKE8_GROUPS = frozenset(("flake8.extension", "flake8.report"))
23
24 BANNED_PLUGINS = {
25 "flake8-colors": "5.0",
26 "flake8-per-file-ignores": "3.7",
27 }
28
29
30 class Plugin(NamedTuple):
31 """A plugin before loading."""
32
33 package: str
34 version: str
35 entry_point: importlib.metadata.EntryPoint
36
37
38 class LoadedPlugin(NamedTuple):
39 """Represents a plugin after being imported."""
40
41 plugin: Plugin
42 obj: Any
43 parameters: dict[str, bool]
44
45 @property
46 def entry_name(self) -> str:
47 """Return the name given in the packaging metadata."""
48 return self.plugin.entry_point.name
49
50 @property
51 def display_name(self) -> str:
52 """Return the name for use in user-facing / error messages."""
53 return f"{self.plugin.package}[{self.entry_name}]"
54
55
56 class Checkers(NamedTuple):
57 """Classified plugins needed for checking."""
58
59 tree: list[LoadedPlugin]
60 logical_line: list[LoadedPlugin]
61 physical_line: list[LoadedPlugin]
62
63
64 class Plugins(NamedTuple):
65 """Classified plugins."""
66
67 checkers: Checkers
68 reporters: dict[str, LoadedPlugin]
69 disabled: list[LoadedPlugin]
70
71 def all_plugins(self) -> Generator[LoadedPlugin, None, None]:
72 """Return an iterator over all :class:`LoadedPlugin`s."""
73 yield from self.checkers.tree
74 yield from self.checkers.logical_line
75 yield from self.checkers.physical_line
76 yield from self.reporters.values()
77
78 def versions_str(self) -> str:
79 """Return a user-displayed list of plugin versions."""
80 return ", ".join(
81 sorted(
82 {
83 f"{loaded.plugin.package}: {loaded.plugin.version}"
84 for loaded in self.all_plugins()
85 if loaded.plugin.package not in {"flake8", "local"}
86 }
87 )
88 )
89
90
91 class PluginOptions(NamedTuple):
92 """Options related to plugin loading."""
93
94 local_plugin_paths: tuple[str, ...]
95 enable_extensions: frozenset[str]
96 require_plugins: frozenset[str]
97
98 @classmethod
99 def blank(cls) -> PluginOptions:
100 """Make a blank PluginOptions, mostly used for tests."""
101 return cls(
102 local_plugin_paths=(),
103 enable_extensions=frozenset(),
104 require_plugins=frozenset(),
105 )
106
107
108 def _parse_option(
109 cfg: configparser.RawConfigParser,
110 cfg_opt_name: str,
111 opt: str | None,
112 ) -> list[str]:
113 # specified on commandline: use that
114 if opt is not None:
115 return utils.parse_comma_separated_list(opt)
116 else:
117 # ideally this would reuse our config parsing framework but we need to
118 # parse this from preliminary options before plugins are enabled
119 for opt_name in (cfg_opt_name, cfg_opt_name.replace("_", "-")):
120 val = cfg.get("flake8", opt_name, fallback=None)
121 if val is not None:
122 return utils.parse_comma_separated_list(val)
123 else:
124 return []
125
126
127 def parse_plugin_options(
128 cfg: configparser.RawConfigParser,
129 cfg_dir: str,
130 *,
131 enable_extensions: str | None,
132 require_plugins: str | None,
133 ) -> PluginOptions:
134 """Parse plugin loading related options."""
135 paths_s = cfg.get("flake8:local-plugins", "paths", fallback="").strip()
136 paths = utils.parse_comma_separated_list(paths_s)
137 paths = utils.normalize_paths(paths, cfg_dir)
138
139 return PluginOptions(
140 local_plugin_paths=tuple(paths),
141 enable_extensions=frozenset(
142 _parse_option(cfg, "enable_extensions", enable_extensions),
143 ),
144 require_plugins=frozenset(
145 _parse_option(cfg, "require_plugins", require_plugins),
146 ),
147 )
148
149
150 def _flake8_plugins(
151 eps: Iterable[importlib.metadata.EntryPoint],
152 name: str,
153 version: str,
154 ) -> Generator[Plugin, None, None]:
155 pyflakes_meta = importlib.metadata.distribution("pyflakes").metadata
156 pycodestyle_meta = importlib.metadata.distribution("pycodestyle").metadata
157
158 for ep in eps:
159 if ep.group not in FLAKE8_GROUPS:
160 continue
161
162 if ep.name == "F":
163 yield Plugin(pyflakes_meta["name"], pyflakes_meta["version"], ep)
164 elif ep.name in "EW":
165 # pycodestyle provides both `E` and `W` -- but our default select
166 # handles those
167 # ideally pycodestyle's plugin entrypoints would exactly represent
168 # the codes they produce...
169 yield Plugin(
170 pycodestyle_meta["name"], pycodestyle_meta["version"], ep
171 )
172 else:
173 yield Plugin(name, version, ep)
174
175
176 def _find_importlib_plugins() -> Generator[Plugin, None, None]:
177 # some misconfigured pythons (RHEL) have things on `sys.path` twice
178 seen = set()
179 for dist in importlib.metadata.distributions():
180 # assigned to prevent continual reparsing
181 eps = dist.entry_points
182
183 # perf: skip parsing `.metadata` (slow) if no entry points match
184 if not any(ep.group in FLAKE8_GROUPS for ep in eps):
185 continue
186
187 # assigned to prevent continual reparsing
188 meta = dist.metadata
189
190 if meta["name"] in seen:
191 continue
192 else:
193 seen.add(meta["name"])
194
195 if meta["name"] in BANNED_PLUGINS:
196 LOG.warning(
197 "%s plugin is obsolete in flake8>=%s",
198 meta["name"],
199 BANNED_PLUGINS[meta["name"]],
200 )
201 continue
202 elif meta["name"] == "flake8":
203 # special case flake8 which provides plugins for pyflakes /
204 # pycodestyle
205 yield from _flake8_plugins(eps, meta["name"], meta["version"])
206 continue
207
208 for ep in eps:
209 if ep.group in FLAKE8_GROUPS:
210 yield Plugin(meta["name"], meta["version"], ep)
211
212
213 def _find_local_plugins(
214 cfg: configparser.RawConfigParser,
215 ) -> Generator[Plugin, None, None]:
216 for plugin_type in ("extension", "report"):
217 group = f"flake8.{plugin_type}"
218 for plugin_s in utils.parse_comma_separated_list(
219 cfg.get("flake8:local-plugins", plugin_type, fallback="").strip(),
220 regexp=utils.LOCAL_PLUGIN_LIST_RE,
221 ):
222 name, _, entry_str = plugin_s.partition("=")
223 name, entry_str = name.strip(), entry_str.strip()
224 ep = importlib.metadata.EntryPoint(name, entry_str, group)
225 yield Plugin("local", "local", ep)
226
227
228 def _check_required_plugins(
229 plugins: list[Plugin],
230 expected: frozenset[str],
231 ) -> None:
232 plugin_names = {
233 utils.normalize_pypi_name(plugin.package) for plugin in plugins
234 }
235 expected_names = {utils.normalize_pypi_name(name) for name in expected}
236 missing_plugins = expected_names - plugin_names
237
238 if missing_plugins:
239 raise ExecutionError(
240 f"required plugins were not installed!\n"
241 f"- installed: {', '.join(sorted(plugin_names))}\n"
242 f"- expected: {', '.join(sorted(expected_names))}\n"
243 f"- missing: {', '.join(sorted(missing_plugins))}"
244 )
245
246
247 def find_plugins(
248 cfg: configparser.RawConfigParser,
249 opts: PluginOptions,
250 ) -> list[Plugin]:
251 """Discovers all plugins (but does not load them)."""
252 ret = [*_find_importlib_plugins(), *_find_local_plugins(cfg)]
253
254 # for determinism, sort the list
255 ret.sort()
256
257 _check_required_plugins(ret, opts.require_plugins)
258
259 return ret
260
261
262 def _parameters_for(func: Any) -> dict[str, bool]:
263 """Return the parameters for the plugin.
264
265 This will inspect the plugin and return either the function parameters
266 if the plugin is a function or the parameters for ``__init__`` after
267 ``self`` if the plugin is a class.
268
269 :returns:
270 A dictionary mapping the parameter name to whether or not it is
271 required (a.k.a., is positional only/does not have a default).
272 """
273 is_class = not inspect.isfunction(func)
274 if is_class:
275 func = func.__init__
276
277 parameters = {
278 parameter.name: parameter.default is inspect.Parameter.empty
279 for parameter in inspect.signature(func).parameters.values()
280 if parameter.kind is inspect.Parameter.POSITIONAL_OR_KEYWORD
281 }
282
283 if is_class:
284 parameters.pop("self", None)
285
286 return parameters
287
288
289 def _load_plugin(plugin: Plugin) -> LoadedPlugin:
290 try:
291 obj = plugin.entry_point.load()
292 except Exception as e:
293 raise FailedToLoadPlugin(plugin.package, e)
294
295 if not callable(obj):
296 err = TypeError("expected loaded plugin to be callable")
297 raise FailedToLoadPlugin(plugin.package, err)
298
299 return LoadedPlugin(plugin, obj, _parameters_for(obj))
300
301
302 def _import_plugins(
303 plugins: list[Plugin],
304 opts: PluginOptions,
305 ) -> list[LoadedPlugin]:
306 sys.path.extend(opts.local_plugin_paths)
307 return [_load_plugin(p) for p in plugins]
308
309
310 def _classify_plugins(
311 plugins: list[LoadedPlugin],
312 opts: PluginOptions,
313 ) -> Plugins:
314 tree = []
315 logical_line = []
316 physical_line = []
317 reporters = {}
318 disabled = []
319
320 for loaded in plugins:
321 if (
322 getattr(loaded.obj, "off_by_default", False)
323 and loaded.plugin.entry_point.name not in opts.enable_extensions
324 ):
325 disabled.append(loaded)
326 elif loaded.plugin.entry_point.group == "flake8.report":
327 reporters[loaded.entry_name] = loaded
328 elif "tree" in loaded.parameters:
329 tree.append(loaded)
330 elif "logical_line" in loaded.parameters:
331 logical_line.append(loaded)
332 elif "physical_line" in loaded.parameters:
333 physical_line.append(loaded)
334 else:
335 raise NotImplementedError(f"what plugin type? {loaded}")
336
337 for loaded in itertools.chain(tree, logical_line, physical_line):
338 if not VALID_CODE_PREFIX.match(loaded.entry_name):
339 raise ExecutionError(
340 f"plugin code for `{loaded.display_name}` does not match "
341 f"{VALID_CODE_PREFIX.pattern}"
342 )
343
344 return Plugins(
345 checkers=Checkers(
346 tree=tree,
347 logical_line=logical_line,
348 physical_line=physical_line,
349 ),
350 reporters=reporters,
351 disabled=disabled,
352 )
353
354
355 def load_plugins(
356 plugins: list[Plugin],
357 opts: PluginOptions,
358 ) -> Plugins:
359 """Load and classify all flake8 plugins.
360
361 - first: extends ``sys.path`` with ``paths`` (to import local plugins)
362 - next: converts the ``Plugin``s to ``LoadedPlugins``
363 - finally: classifies plugins into their specific types
364 """
365 return _classify_plugins(_import_plugins(plugins, opts), opts)