]>
Commit | Line | Data |
---|---|---|
53e6db90 DC |
1 | """Functions related to finding and loading plugins.""" |
2 | from __future__ import annotations | |
3 | ||
4 | import configparser | |
5 | import importlib.metadata | |
6 | import inspect | |
7 | import itertools | |
8 | import logging | |
9 | import sys | |
10 | from typing import Any | |
11 | from typing import Generator | |
12 | from typing import Iterable | |
13 | from typing import NamedTuple | |
14 | ||
15 | from flake8 import utils | |
16 | from flake8.defaults import VALID_CODE_PREFIX | |
17 | from flake8.exceptions import ExecutionError | |
18 | from flake8.exceptions import FailedToLoadPlugin | |
19 | ||
# module-level logger used for plugin discovery messages
LOG = logging.getLogger(__name__)

# entry point groups which are considered during plugin discovery; entry
# points registered under any other group are ignored
FLAKE8_GROUPS = frozenset(("flake8.extension", "flake8.report"))

# distribution name -> flake8 version reported in the "obsolete" warning.
# distributions listed here are skipped (with a warning) during discovery
BANNED_PLUGINS = {
    "flake8-colors": "5.0",
    "flake8-per-file-ignores": "3.7",
}
28 | ||
29 | ||
class Plugin(NamedTuple):
    """A plugin before loading."""

    # name of the distribution providing the plugin (``"local"`` for plugins
    # configured through ``[flake8:local-plugins]``)
    package: str
    # version of that distribution (``"local"`` for local plugins)
    version: str
    # entry point which is loaded to obtain the plugin object
    entry_point: importlib.metadata.EntryPoint
36 | ||
37 | ||
class LoadedPlugin(NamedTuple):
    """A plugin whose entry point has been imported.

    ``plugin`` is the discovery record, ``obj`` is the imported object and
    ``parameters`` maps each accepted parameter name to whether it is
    required.
    """

    plugin: Plugin
    obj: Any
    parameters: dict[str, bool]

    @property
    def entry_name(self) -> str:
        """Name of the entry point as declared in the packaging metadata."""
        entry_point = self.plugin.entry_point
        return entry_point.name

    @property
    def display_name(self) -> str:
        """Name suitable for user-facing output and error messages."""
        package = self.plugin.package
        return f"{package}[{self.entry_name}]"
54 | ||
55 | ||
class Checkers(NamedTuple):
    """Classified plugins needed for checking."""

    # plugins which accept a ``tree`` parameter
    tree: list[LoadedPlugin]
    # plugins which accept ``logical_line`` (and not ``tree``)
    logical_line: list[LoadedPlugin]
    # plugins which accept ``physical_line`` (and neither of the above)
    physical_line: list[LoadedPlugin]
62 | ||
63 | ||
class Plugins(NamedTuple):
    """All loaded plugins, grouped by the role they play."""

    checkers: Checkers
    reporters: dict[str, LoadedPlugin]
    disabled: list[LoadedPlugin]

    def all_plugins(self) -> Generator[LoadedPlugin, None, None]:
        """Yield every enabled :class:`LoadedPlugin`.

        Checkers come first (tree, logical line, physical line) followed by
        the reporters.  Disabled plugins are not included.
        """
        for loaded in self.checkers.tree:
            yield loaded
        for loaded in self.checkers.logical_line:
            yield loaded
        for loaded in self.checkers.physical_line:
            yield loaded
        for loaded in self.reporters.values():
            yield loaded

    def versions_str(self) -> str:
        """Return a sorted, comma separated ``package: version`` listing.

        Entries whose package is ``flake8`` or ``local`` are left out and
        duplicates are collapsed.
        """
        excluded = {"flake8", "local"}
        versions = set()
        for loaded in self.all_plugins():
            info = loaded.plugin
            if info.package in excluded:
                continue
            versions.add(f"{info.package}: {info.version}")
        return ", ".join(sorted(versions))
89 | ||
90 | ||
class PluginOptions(NamedTuple):
    """Settings which influence how plugins are found and loaded."""

    local_plugin_paths: tuple[str, ...]
    enable_extensions: frozenset[str]
    require_plugins: frozenset[str]

    @classmethod
    def blank(cls) -> PluginOptions:
        """Build an instance with every option empty (mostly for tests)."""
        no_paths: tuple[str, ...] = ()
        nothing: frozenset[str] = frozenset()
        return cls(no_paths, nothing, nothing)
106 | ||
107 | ||
def _parse_option(
    cfg: configparser.RawConfigParser,
    cfg_opt_name: str,
    opt: str | None,
) -> list[str]:
    """Resolve a comma separated option from the commandline or config.

    :param cfg: the parsed configuration file
    :param cfg_opt_name: underscore-separated name of the option
    :param opt: value given on the commandline (``None`` if not given)
    :returns: the parsed list, empty when the option is set nowhere
    """
    # a value from the commandline always wins
    if opt is not None:
        return utils.parse_comma_separated_list(opt)

    # ideally this would reuse our config parsing framework but we need to
    # parse this from preliminary options before plugins are enabled
    candidates = (cfg_opt_name, cfg_opt_name.replace("_", "-"))
    for candidate in candidates:
        raw = cfg.get("flake8", candidate, fallback=None)
        if raw is None:
            continue
        return utils.parse_comma_separated_list(raw)

    return []
125 | ||
126 | ||
def parse_plugin_options(
    cfg: configparser.RawConfigParser,
    cfg_dir: str,
    *,
    enable_extensions: str | None,
    require_plugins: str | None,
) -> PluginOptions:
    """Build the :class:`PluginOptions` from config and commandline values.

    Local plugin paths are read from ``[flake8:local-plugins] paths`` and
    normalized against ``cfg_dir``.  The two keyword arguments are the
    commandline values, which take precedence over the config file.
    """
    raw_paths = cfg.get("flake8:local-plugins", "paths", fallback="")
    local_paths = utils.normalize_paths(
        utils.parse_comma_separated_list(raw_paths.strip()),
        cfg_dir,
    )

    enabled = _parse_option(cfg, "enable_extensions", enable_extensions)
    required = _parse_option(cfg, "require_plugins", require_plugins)

    return PluginOptions(
        local_plugin_paths=tuple(local_paths),
        enable_extensions=frozenset(enabled),
        require_plugins=frozenset(required),
    )
148 | ||
149 | ||
def _flake8_plugins(
    eps: Iterable[importlib.metadata.EntryPoint],
    name: str,
    version: str,
) -> Generator[Plugin, None, None]:
    """Yield the plugins registered by the ``flake8`` distribution itself.

    The ``F`` entry point is attributed to pyflakes and the ``E`` / ``W``
    entry points to pycodestyle, using the name and version from those
    distributions' metadata.  Every other entry point is attributed to
    ``name`` / ``version``.

    :param eps: entry points of the flake8 distribution
    :param name: distribution name used for the remaining entry points
    :param version: distribution version used for the remaining entry points
    :raises importlib.metadata.PackageNotFoundError:
        if pyflakes or pycodestyle is not installed (raised on first
        iteration since this is a generator)
    """
    pyflakes_meta = importlib.metadata.distribution("pyflakes").metadata
    pycodestyle_meta = importlib.metadata.distribution("pycodestyle").metadata

    for ep in eps:
        if ep.group not in FLAKE8_GROUPS:
            continue

        if ep.name == "F":
            yield Plugin(pyflakes_meta["name"], pyflakes_meta["version"], ep)
        # exact membership: `ep.name in "EW"` is a substring test which would
        # also match the names `""` and `"EW"`
        elif ep.name in {"E", "W"}:
            # pycodestyle provides both `E` and `W` -- but our default select
            # handles those
            # ideally pycodestyle's plugin entrypoints would exactly represent
            # the codes they produce...
            yield Plugin(
                pycodestyle_meta["name"], pycodestyle_meta["version"], ep
            )
        else:
            yield Plugin(name, version, ep)
174 | ||
175 | ||
def _find_importlib_plugins() -> Generator[Plugin, None, None]:
    """Yield a :class:`Plugin` for each flake8 entry point that is installed.

    Each distribution name is handled at most once, distributions listed in
    ``BANNED_PLUGINS`` are skipped with a warning, and the ``flake8``
    distribution is delegated to ``_flake8_plugins``.
    """
    # some misconfigured pythons (RHEL) have things on `sys.path` twice
    seen_names = set()

    for dist in importlib.metadata.distributions():
        # bound to a local to prevent continual reparsing
        entry_points = dist.entry_points

        # perf: skip parsing `.metadata` (slow) if no entry points match
        if all(ep.group not in FLAKE8_GROUPS for ep in entry_points):
            continue

        # bound to a local to prevent continual reparsing
        metadata = dist.metadata
        dist_name = metadata["name"]

        if dist_name in seen_names:
            continue
        seen_names.add(dist_name)

        if dist_name in BANNED_PLUGINS:
            LOG.warning(
                "%s plugin is obsolete in flake8>=%s",
                dist_name,
                BANNED_PLUGINS[dist_name],
            )
        elif dist_name == "flake8":
            # special case flake8 which provides plugins for pyflakes /
            # pycodestyle
            yield from _flake8_plugins(
                entry_points, dist_name, metadata["version"]
            )
        else:
            for ep in entry_points:
                if ep.group in FLAKE8_GROUPS:
                    yield Plugin(dist_name, metadata["version"], ep)
211 | ||
212 | ||
def _find_local_plugins(
    cfg: configparser.RawConfigParser,
) -> Generator[Plugin, None, None]:
    """Yield the plugins declared in the ``[flake8:local-plugins]`` section.

    The ``extension`` and ``report`` options are each read as a list of
    ``name = entry`` items.  Every item becomes a :class:`Plugin` whose
    package and version are both ``"local"``.
    """
    for plugin_type in ("extension", "report"):
        raw = cfg.get("flake8:local-plugins", plugin_type, fallback="")
        specs = utils.parse_comma_separated_list(
            raw.strip(),
            regexp=utils.LOCAL_PLUGIN_LIST_RE,
        )
        for spec in specs:
            raw_name, _, raw_entry = spec.partition("=")
            entry_point = importlib.metadata.EntryPoint(
                raw_name.strip(),
                raw_entry.strip(),
                f"flake8.{plugin_type}",
            )
            yield Plugin("local", "local", entry_point)
226 | ||
227 | ||
def _check_required_plugins(
    plugins: list[Plugin],
    expected: frozenset[str],
) -> None:
    """Verify that every required plugin package was discovered.

    Names on both sides are normalized before they are compared.

    :raises ExecutionError: if any expected package is missing
    """
    installed = {utils.normalize_pypi_name(found.package) for found in plugins}
    required = {utils.normalize_pypi_name(wanted) for wanted in expected}
    missing = required - installed

    if not missing:
        return

    installed_s = ", ".join(sorted(installed))
    required_s = ", ".join(sorted(required))
    missing_s = ", ".join(sorted(missing))
    raise ExecutionError(
        f"required plugins were not installed!\n"
        f"- installed: {installed_s}\n"
        f"- expected: {required_s}\n"
        f"- missing: {missing_s}"
    )
245 | ||
246 | ||
def find_plugins(
    cfg: configparser.RawConfigParser,
    opts: PluginOptions,
) -> list[Plugin]:
    """Discover every available plugin without importing any of them.

    Installed plugins and local plugins from ``cfg`` are combined and
    sorted, then checked against ``opts.require_plugins``.
    """
    discovered = list(_find_importlib_plugins())
    discovered.extend(_find_local_plugins(cfg))

    # sorted so that the result is deterministic
    discovered.sort()

    _check_required_plugins(discovered, opts.require_plugins)

    return discovered
260 | ||
261 | ||
def _parameters_for(func: Any) -> dict[str, bool]:
    """Describe the parameters a plugin accepts.

    A plain function is inspected directly.  Anything else is treated as a
    class: its ``__init__`` is inspected and ``self`` is dropped.

    Only positional-or-keyword parameters are reported.

    :returns:
        A mapping of parameter name to ``True`` when the parameter has no
        default (it is required) and ``False`` otherwise.
    """
    if inspect.isfunction(func):
        target, drop_self = func, False
    else:
        target, drop_self = func.__init__, True

    result = {}
    for param in inspect.signature(target).parameters.values():
        if param.kind is not inspect.Parameter.POSITIONAL_OR_KEYWORD:
            continue
        result[param.name] = param.default is inspect.Parameter.empty

    if drop_self:
        result.pop("self", None)

    return result
287 | ||
288 | ||
def _load_plugin(plugin: Plugin) -> LoadedPlugin:
    """Import a plugin's entry point and record its parameters.

    :param plugin: the discovered plugin to import
    :returns: the plugin together with its loaded object and parameters
    :raises FailedToLoadPlugin:
        if loading the entry point raises, or if the loaded object is not
        callable
    """
    try:
        obj = plugin.entry_point.load()
    except Exception as e:
        # chain explicitly so the traceback marks the original error as the
        # direct cause of the failure
        raise FailedToLoadPlugin(plugin.package, e) from e

    if not callable(obj):
        err = TypeError("expected loaded plugin to be callable")
        raise FailedToLoadPlugin(plugin.package, err)

    return LoadedPlugin(plugin, obj, _parameters_for(obj))
300 | ||
301 | ||
def _import_plugins(
    plugins: list[Plugin],
    opts: PluginOptions,
) -> list[LoadedPlugin]:
    """Load every plugin after extending ``sys.path``.

    ``opts.local_plugin_paths`` is appended to ``sys.path`` first so that
    local plugins can be imported.
    """
    sys.path.extend(opts.local_plugin_paths)

    loaded_plugins = []
    for plugin in plugins:
        loaded_plugins.append(_load_plugin(plugin))
    return loaded_plugins
308 | ||
309 | ||
def _classify_plugins(
    plugins: list[LoadedPlugin],
    opts: PluginOptions,
) -> Plugins:
    """Sort loaded plugins into checkers, reporters and disabled plugins.

    A plugin with a truthy ``off_by_default`` attribute is disabled unless
    its entry point name is in ``opts.enable_extensions``.  Entry points in
    the ``flake8.report`` group are reporters.  Everything else is a checker,
    classified by the first of ``tree``, ``logical_line``, ``physical_line``
    found in its parameters.

    :raises NotImplementedError: if a checker accepts none of those
    :raises ExecutionError:
        if a checker's entry point name does not match ``VALID_CODE_PREFIX``
    """
    tree: list[LoadedPlugin] = []
    logical_line: list[LoadedPlugin] = []
    physical_line: list[LoadedPlugin] = []
    reporters: dict[str, LoadedPlugin] = {}
    disabled: list[LoadedPlugin] = []

    # checked in this order: the first matching parameter decides the type
    checker_kinds = (
        ("tree", tree),
        ("logical_line", logical_line),
        ("physical_line", physical_line),
    )

    for loaded in plugins:
        if (
            getattr(loaded.obj, "off_by_default", False)
            and loaded.plugin.entry_point.name not in opts.enable_extensions
        ):
            disabled.append(loaded)
            continue

        if loaded.plugin.entry_point.group == "flake8.report":
            reporters[loaded.entry_name] = loaded
            continue

        for param_name, bucket in checker_kinds:
            if param_name in loaded.parameters:
                bucket.append(loaded)
                break
        else:
            raise NotImplementedError(f"what plugin type? {loaded}")

    checkers = Checkers(
        tree=tree,
        logical_line=logical_line,
        physical_line=physical_line,
    )

    # only checkers need to have a valid code prefix as their name
    for loaded in itertools.chain(*checkers):
        if VALID_CODE_PREFIX.match(loaded.entry_name):
            continue
        raise ExecutionError(
            f"plugin code for `{loaded.display_name}` does not match "
            f"{VALID_CODE_PREFIX.pattern}"
        )

    return Plugins(checkers=checkers, reporters=reporters, disabled=disabled)
353 | ||
354 | ||
def load_plugins(
    plugins: list[Plugin],
    opts: PluginOptions,
) -> Plugins:
    """Import the given plugins and group them by type.

    1. ``sys.path`` is extended with the local plugin paths from ``opts``
    2. each ``Plugin`` is imported, producing a ``LoadedPlugin``
    3. the loaded plugins are classified into their specific types
    """
    imported = _import_plugins(plugins, opts)
    return _classify_plugins(imported, opts)