]> crepu.dev Git - config.git/blame - djavu-asus/elpy/rpc-venv/lib/python3.11/site-packages/flake8/checker.py
Configuracion en desarrollo PC pega
[config.git] / djavu-asus / elpy / rpc-venv / lib / python3.11 / site-packages / flake8 / checker.py
CommitLineData
53e6db90
DC
1"""Checker Manager and Checker classes."""
2from __future__ import annotations
3
4import argparse
5import contextlib
6import errno
7import logging
8import multiprocessing.pool
9import operator
10import signal
11import tokenize
12from typing import Any
13from typing import Generator
14from typing import List
15from typing import Optional
16from typing import Sequence
17from typing import Tuple
18
19from flake8 import defaults
20from flake8 import exceptions
21from flake8 import processor
22from flake8 import utils
23from flake8._compat import FSTRING_START
24from flake8.discover_files import expand_paths
25from flake8.options.parse_args import parse_args
26from flake8.plugins.finder import Checkers
27from flake8.plugins.finder import LoadedPlugin
28from flake8.style_guide import StyleGuideManager
29
30Results = List[Tuple[str, int, int, str, Optional[str]]]
31
32LOG = logging.getLogger(__name__)
33
34SERIAL_RETRY_ERRNOS = {
35 # ENOSPC: Added by sigmavirus24
36 # > On some operating systems (OSX), multiprocessing may cause an
37 # > ENOSPC error while trying to create a Semaphore.
38 # > In those cases, we should replace the customized Queue Report
39 # > class with pep8's StandardReport class to ensure users don't run
40 # > into this problem.
41 # > (See also: https://github.com/pycqa/flake8/issues/117)
42 errno.ENOSPC,
43 # NOTE(sigmavirus24): When adding to this list, include the reasoning
44 # on the lines before the error code and always append your error
45 # code. Further, please always add a trailing `,` to reduce the visual
46 # noise in diffs.
47}
48
49_mp_plugins: Checkers
50_mp_options: argparse.Namespace
51
52
53@contextlib.contextmanager
54def _mp_prefork(
55 plugins: Checkers, options: argparse.Namespace
56) -> Generator[None, None, None]:
57 # we can save significant startup work w/ `fork` multiprocessing
58 global _mp_plugins, _mp_options
59 _mp_plugins, _mp_options = plugins, options
60 try:
61 yield
62 finally:
63 del _mp_plugins, _mp_options
64
65
66def _mp_init(argv: Sequence[str]) -> None:
67 global _mp_plugins, _mp_options
68
69 # Ensure correct signaling of ^C using multiprocessing.Pool.
70 signal.signal(signal.SIGINT, signal.SIG_IGN)
71
72 try:
73 _mp_plugins, _mp_options # for `fork` this'll already be set
74 except NameError:
75 plugins, options = parse_args(argv)
76 _mp_plugins, _mp_options = plugins.checkers, options
77
78
79def _mp_run(filename: str) -> tuple[str, Results, dict[str, int]]:
80 return FileChecker(
81 filename=filename, plugins=_mp_plugins, options=_mp_options
82 ).run_checks()
83
84
85class Manager:
86 """Manage the parallelism and checker instances for each plugin and file.
87
88 This class will be responsible for the following:
89
90 - Determining the parallelism of Flake8, e.g.:
91
92 * Do we use :mod:`multiprocessing` or is it unavailable?
93
94 * Do we automatically decide on the number of jobs to use or did the
95 user provide that?
96
97 - Falling back to a serial way of processing files if we run into an
98 OSError related to :mod:`multiprocessing`
99
100 - Organizing the results of each checker so we can group the output
101 together and make our output deterministic.
102 """
103
104 def __init__(
105 self,
106 style_guide: StyleGuideManager,
107 plugins: Checkers,
108 argv: Sequence[str],
109 ) -> None:
110 """Initialize our Manager instance."""
111 self.style_guide = style_guide
112 self.options = style_guide.options
113 self.plugins = plugins
114 self.jobs = self._job_count()
115 self.statistics = {
116 "files": 0,
117 "logical lines": 0,
118 "physical lines": 0,
119 "tokens": 0,
120 }
121 self.exclude = (*self.options.exclude, *self.options.extend_exclude)
122 self.argv = argv
123 self.results: list[tuple[str, Results, dict[str, int]]] = []
124
125 def _process_statistics(self) -> None:
126 for _, _, statistics in self.results:
127 for statistic in defaults.STATISTIC_NAMES:
128 self.statistics[statistic] += statistics[statistic]
129 self.statistics["files"] += len(self.filenames)
130
131 def _job_count(self) -> int:
132 # First we walk through all of our error cases:
133 # - multiprocessing library is not present
134 # - the user provided stdin and that's not something we can handle
135 # well
136 # - the user provided some awful input
137
138 if utils.is_using_stdin(self.options.filenames):
139 LOG.warning(
140 "The --jobs option is not compatible with supplying "
141 "input using - . Ignoring --jobs arguments."
142 )
143 return 0
144
145 jobs = self.options.jobs
146
147 # If the value is "auto", we want to let the multiprocessing library
148 # decide the number based on the number of CPUs. However, if that
149 # function is not implemented for this particular value of Python we
150 # default to 1
151 if jobs.is_auto:
152 try:
153 return multiprocessing.cpu_count()
154 except NotImplementedError:
155 return 0
156
157 # Otherwise, we know jobs should be an integer and we can just convert
158 # it to an integer
159 return jobs.n_jobs
160
161 def _handle_results(self, filename: str, results: Results) -> int:
162 style_guide = self.style_guide
163 reported_results_count = 0
164 for error_code, line_number, column, text, physical_line in results:
165 reported_results_count += style_guide.handle_error(
166 code=error_code,
167 filename=filename,
168 line_number=line_number,
169 column_number=column,
170 text=text,
171 physical_line=physical_line,
172 )
173 return reported_results_count
174
175 def report(self) -> tuple[int, int]:
176 """Report all of the errors found in the managed file checkers.
177
178 This iterates over each of the checkers and reports the errors sorted
179 by line number.
180
181 :returns:
182 A tuple of the total results found and the results reported.
183 """
184 results_reported = results_found = 0
185 self.results.sort(key=operator.itemgetter(0))
186 for filename, results, _ in self.results:
187 results.sort(key=operator.itemgetter(1, 2))
188 with self.style_guide.processing_file(filename):
189 results_reported += self._handle_results(filename, results)
190 results_found += len(results)
191 return (results_found, results_reported)
192
193 def run_parallel(self) -> None:
194 """Run the checkers in parallel."""
195 with _mp_prefork(self.plugins, self.options):
196 pool = _try_initialize_processpool(self.jobs, self.argv)
197
198 if pool is None:
199 self.run_serial()
200 return
201
202 pool_closed = False
203 try:
204 self.results = list(pool.imap_unordered(_mp_run, self.filenames))
205 pool.close()
206 pool.join()
207 pool_closed = True
208 finally:
209 if not pool_closed:
210 pool.terminate()
211 pool.join()
212
213 def run_serial(self) -> None:
214 """Run the checkers in serial."""
215 self.results = [
216 FileChecker(
217 filename=filename,
218 plugins=self.plugins,
219 options=self.options,
220 ).run_checks()
221 for filename in self.filenames
222 ]
223
224 def run(self) -> None:
225 """Run all the checkers.
226
227 This will intelligently decide whether to run the checks in parallel
228 or whether to run them in serial.
229
230 If running the checks in parallel causes a problem (e.g.,
231 :issue:`117`) this also implements fallback to serial processing.
232 """
233 try:
234 if self.jobs > 1 and len(self.filenames) > 1:
235 self.run_parallel()
236 else:
237 self.run_serial()
238 except KeyboardInterrupt:
239 LOG.warning("Flake8 was interrupted by the user")
240 raise exceptions.EarlyQuit("Early quit while running checks")
241
242 def start(self) -> None:
243 """Start checking files.
244
245 :param paths:
246 Path names to check. This is passed directly to
247 :meth:`~Manager.make_checkers`.
248 """
249 LOG.info("Making checkers")
250 self.filenames = tuple(
251 expand_paths(
252 paths=self.options.filenames,
253 stdin_display_name=self.options.stdin_display_name,
254 filename_patterns=self.options.filename,
255 exclude=self.exclude,
256 )
257 )
258
259 def stop(self) -> None:
260 """Stop checking files."""
261 self._process_statistics()
262
263
264class FileChecker:
265 """Manage running checks for a file and aggregate the results."""
266
267 def __init__(
268 self,
269 *,
270 filename: str,
271 plugins: Checkers,
272 options: argparse.Namespace,
273 ) -> None:
274 """Initialize our file checker."""
275 self.options = options
276 self.filename = filename
277 self.plugins = plugins
278 self.results: Results = []
279 self.statistics = {
280 "tokens": 0,
281 "logical lines": 0,
282 "physical lines": 0,
283 }
284 self.processor = self._make_processor()
285 self.display_name = filename
286 self.should_process = False
287 if self.processor is not None:
288 self.display_name = self.processor.filename
289 self.should_process = not self.processor.should_ignore_file()
290 self.statistics["physical lines"] = len(self.processor.lines)
291
292 def __repr__(self) -> str:
293 """Provide helpful debugging representation."""
294 return f"FileChecker for {self.filename}"
295
296 def _make_processor(self) -> processor.FileProcessor | None:
297 try:
298 return processor.FileProcessor(self.filename, self.options)
299 except OSError as e:
300 # If we can not read the file due to an IOError (e.g., the file
301 # does not exist or we do not have the permissions to open it)
302 # then we need to format that exception for the user.
303 # NOTE(sigmavirus24): Historically, pep8 has always reported this
304 # as an E902. We probably *want* a better error code for this
305 # going forward.
306 self.report("E902", 0, 0, f"{type(e).__name__}: {e}")
307 return None
308
309 def report(
310 self,
311 error_code: str | None,
312 line_number: int,
313 column: int,
314 text: str,
315 ) -> str:
316 """Report an error by storing it in the results list."""
317 if error_code is None:
318 error_code, text = text.split(" ", 1)
319
320 # If we're recovering from a problem in _make_processor, we will not
321 # have this attribute.
322 if hasattr(self, "processor") and self.processor is not None:
323 line = self.processor.noqa_line_for(line_number)
324 else:
325 line = None
326
327 self.results.append((error_code, line_number, column, text, line))
328 return error_code
329
330 def run_check(self, plugin: LoadedPlugin, **arguments: Any) -> Any:
331 """Run the check in a single plugin."""
332 assert self.processor is not None, self.filename
333 try:
334 params = self.processor.keyword_arguments_for(
335 plugin.parameters, arguments
336 )
337 except AttributeError as ae:
338 raise exceptions.PluginRequestedUnknownParameters(
339 plugin_name=plugin.display_name, exception=ae
340 )
341 try:
342 return plugin.obj(**arguments, **params)
343 except Exception as all_exc:
344 LOG.critical(
345 "Plugin %s raised an unexpected exception",
346 plugin.display_name,
347 exc_info=True,
348 )
349 raise exceptions.PluginExecutionFailed(
350 filename=self.filename,
351 plugin_name=plugin.display_name,
352 exception=all_exc,
353 )
354
355 @staticmethod
356 def _extract_syntax_information(exception: Exception) -> tuple[int, int]:
357 if (
358 len(exception.args) > 1
359 and exception.args[1]
360 and len(exception.args[1]) > 2
361 ):
362 token = exception.args[1]
363 row, column = token[1:3]
364 elif (
365 isinstance(exception, tokenize.TokenError)
366 and len(exception.args) == 2
367 and len(exception.args[1]) == 2
368 ):
369 token = ()
370 row, column = exception.args[1]
371 else:
372 token = ()
373 row, column = (1, 0)
374
375 if (
376 column > 0
377 and token
378 and isinstance(exception, SyntaxError)
379 and len(token) == 4 # Python 3.9 or earlier
380 ):
381 # NOTE(sigmavirus24): SyntaxErrors report 1-indexed column
382 # numbers. We need to decrement the column number by 1 at
383 # least.
384 column_offset = 1
385 row_offset = 0
386 # See also: https://github.com/pycqa/flake8/issues/169,
387 # https://github.com/PyCQA/flake8/issues/1372
388 # On Python 3.9 and earlier, token will be a 4-item tuple with the
389 # last item being the string. Starting with 3.10, they added to
390 # the tuple so now instead of it ending with the code that failed
391 # to parse, it ends with the end of the section of code that
392 # failed to parse. Luckily the absolute position in the tuple is
393 # stable across versions so we can use that here
394 physical_line = token[3]
395
396 # NOTE(sigmavirus24): Not all "tokens" have a string as the last
397 # argument. In this event, let's skip trying to find the correct
398 # column and row values.
399 if physical_line is not None:
400 # NOTE(sigmavirus24): SyntaxErrors also don't exactly have a
401 # "physical" line so much as what was accumulated by the point
402 # tokenizing failed.
403 # See also: https://github.com/pycqa/flake8/issues/169
404 lines = physical_line.rstrip("\n").split("\n")
405 row_offset = len(lines) - 1
406 logical_line = lines[0]
407 logical_line_length = len(logical_line)
408 if column > logical_line_length:
409 column = logical_line_length
410 row -= row_offset
411 column -= column_offset
412 return row, column
413
414 def run_ast_checks(self) -> None:
415 """Run all checks expecting an abstract syntax tree."""
416 assert self.processor is not None, self.filename
417 ast = self.processor.build_ast()
418
419 for plugin in self.plugins.tree:
420 checker = self.run_check(plugin, tree=ast)
421 # If the plugin uses a class, call the run method of it, otherwise
422 # the call should return something iterable itself
423 try:
424 runner = checker.run()
425 except AttributeError:
426 runner = checker
427 for line_number, offset, text, _ in runner:
428 self.report(
429 error_code=None,
430 line_number=line_number,
431 column=offset,
432 text=text,
433 )
434
435 def run_logical_checks(self) -> None:
436 """Run all checks expecting a logical line."""
437 assert self.processor is not None
438 comments, logical_line, mapping = self.processor.build_logical_line()
439 if not mapping:
440 return
441 self.processor.update_state(mapping)
442
443 LOG.debug('Logical line: "%s"', logical_line.rstrip())
444
445 for plugin in self.plugins.logical_line:
446 self.processor.update_checker_state_for(plugin)
447 results = self.run_check(plugin, logical_line=logical_line) or ()
448 for offset, text in results:
449 line_number, column_offset = find_offset(offset, mapping)
450 if line_number == column_offset == 0:
451 LOG.warning("position of error out of bounds: %s", plugin)
452 self.report(
453 error_code=None,
454 line_number=line_number,
455 column=column_offset,
456 text=text,
457 )
458
459 self.processor.next_logical_line()
460
461 def run_physical_checks(self, physical_line: str) -> None:
462 """Run all checks for a given physical line.
463
464 A single physical check may return multiple errors.
465 """
466 assert self.processor is not None
467 for plugin in self.plugins.physical_line:
468 self.processor.update_checker_state_for(plugin)
469 result = self.run_check(plugin, physical_line=physical_line)
470
471 if result is not None:
472 # This is a single result if first element is an int
473 column_offset = None
474 try:
475 column_offset = result[0]
476 except (IndexError, TypeError):
477 pass
478
479 if isinstance(column_offset, int):
480 # If we only have a single result, convert to a collection
481 result = (result,)
482
483 for result_single in result:
484 column_offset, text = result_single
485 self.report(
486 error_code=None,
487 line_number=self.processor.line_number,
488 column=column_offset,
489 text=text,
490 )
491
492 def process_tokens(self) -> None:
493 """Process tokens and trigger checks.
494
495 Instead of using this directly, you should use
496 :meth:`flake8.checker.FileChecker.run_checks`.
497 """
498 assert self.processor is not None
499 parens = 0
500 statistics = self.statistics
501 file_processor = self.processor
502 prev_physical = ""
503 for token in file_processor.generate_tokens():
504 statistics["tokens"] += 1
505 self.check_physical_eol(token, prev_physical)
506 token_type, text = token[0:2]
507 if token_type == tokenize.OP:
508 parens = processor.count_parentheses(parens, text)
509 elif parens == 0:
510 if processor.token_is_newline(token):
511 self.handle_newline(token_type)
512 prev_physical = token[4]
513
514 if file_processor.tokens:
515 # If any tokens are left over, process them
516 self.run_physical_checks(file_processor.lines[-1])
517 self.run_logical_checks()
518
519 def run_checks(self) -> tuple[str, Results, dict[str, int]]:
520 """Run checks against the file."""
521 if self.processor is None or not self.should_process:
522 return self.display_name, self.results, self.statistics
523
524 try:
525 self.run_ast_checks()
526 self.process_tokens()
527 except (SyntaxError, tokenize.TokenError) as e:
528 code = "E902" if isinstance(e, tokenize.TokenError) else "E999"
529 row, column = self._extract_syntax_information(e)
530 self.report(code, row, column, f"{type(e).__name__}: {e.args[0]}")
531 return self.display_name, self.results, self.statistics
532
533 logical_lines = self.processor.statistics["logical lines"]
534 self.statistics["logical lines"] = logical_lines
535 return self.display_name, self.results, self.statistics
536
537 def handle_newline(self, token_type: int) -> None:
538 """Handle the logic when encountering a newline token."""
539 assert self.processor is not None
540 if token_type == tokenize.NEWLINE:
541 self.run_logical_checks()
542 self.processor.reset_blank_before()
543 elif len(self.processor.tokens) == 1:
544 # The physical line contains only this token.
545 self.processor.visited_new_blank_line()
546 self.processor.delete_first_token()
547 else:
548 self.run_logical_checks()
549
550 def check_physical_eol(
551 self, token: tokenize.TokenInfo, prev_physical: str
552 ) -> None:
553 """Run physical checks if and only if it is at the end of the line."""
554 assert self.processor is not None
555 if token.type == FSTRING_START: # pragma: >=3.12 cover
556 self.processor.fstring_start(token.start[0])
557 # a newline token ends a single physical line.
558 elif processor.is_eol_token(token):
559 # if the file does not end with a newline, the NEWLINE
560 # token is inserted by the parser, but it does not contain
561 # the previous physical line in `token[4]`
562 if token.line == "":
563 self.run_physical_checks(prev_physical)
564 else:
565 self.run_physical_checks(token.line)
566 elif processor.is_multiline_string(token):
567 # Less obviously, a string that contains newlines is a
568 # multiline string, either triple-quoted or with internal
569 # newlines backslash-escaped. Check every physical line in the
570 # string *except* for the last one: its newline is outside of
571 # the multiline string, so we consider it a regular physical
572 # line, and will check it like any other physical line.
573 #
574 # Subtleties:
575 # - have to wind self.line_number back because initially it
576 # points to the last line of the string, and we want
577 # check_physical() to give accurate feedback
578 for line in self.processor.multiline_string(token):
579 self.run_physical_checks(line)
580
581
582def _try_initialize_processpool(
583 job_count: int,
584 argv: Sequence[str],
585) -> multiprocessing.pool.Pool | None:
586 """Return a new process pool instance if we are able to create one."""
587 try:
588 return multiprocessing.Pool(job_count, _mp_init, initargs=(argv,))
589 except OSError as err:
590 if err.errno not in SERIAL_RETRY_ERRNOS:
591 raise
592 except ImportError:
593 pass
594
595 return None
596
597
598def find_offset(
599 offset: int, mapping: processor._LogicalMapping
600) -> tuple[int, int]:
601 """Find the offset tuple for a single offset."""
602 if isinstance(offset, tuple):
603 return offset
604
605 for token in mapping:
606 token_offset = token[0]
607 if offset <= token_offset:
608 position = token[1]
609 break
610 else:
611 position = (0, 0)
612 offset = token_offset = 0
613 return (position[0], position[1] + offset - token_offset)