]>
crepu.dev Git - config.git/blob - djavu-asus/elpy/rpc-venv/lib/python3.11/site-packages/flake8/checker.py
1 """Checker Manager and Checker classes."""
2 from __future__
import annotations
8 import multiprocessing
.pool
12 from typing
import Any
13 from typing
import Generator
14 from typing
import List
15 from typing
import Optional
16 from typing
import Sequence
17 from typing
import Tuple
19 from flake8
import defaults
20 from flake8
import exceptions
21 from flake8
import processor
22 from flake8
import utils
23 from flake8
._compat
import FSTRING_START
24 from flake8
.discover_files
import expand_paths
25 from flake8
.options
.parse_args
import parse_args
26 from flake8
.plugins
.finder
import Checkers
27 from flake8
.plugins
.finder
import LoadedPlugin
28 from flake8
.style_guide
import StyleGuideManager
30 Results
= List
[Tuple
[str, int, int, str, Optional
[str]]]
32 LOG
= logging
.getLogger(__name__
)
34 SERIAL_RETRY_ERRNOS
= {
35 # ENOSPC: Added by sigmavirus24
36 # > On some operating systems (OSX), multiprocessing may cause an
37 # > ENOSPC error while trying to create a Semaphore.
38 # > In those cases, we should replace the customized Queue Report
39 # > class with pep8's StandardReport class to ensure users don't run
40 # > into this problem.
41 # > (See also: https://github.com/pycqa/flake8/issues/117)
43 # NOTE(sigmavirus24): When adding to this list, include the reasoning
44 # on the lines before the error code and always append your error
45 # code. Further, please always add a trailing `,` to reduce the visual
50 _mp_options
: argparse
.Namespace
53 @contextlib.contextmanager
55 plugins
: Checkers
, options
: argparse
.Namespace
56 ) -> Generator
[None, None, None]:
57 # we can save significant startup work w/ `fork` multiprocessing
58 global _mp_plugins
, _mp_options
59 _mp_plugins
, _mp_options
= plugins
, options
63 del _mp_plugins
, _mp_options
66 def _mp_init(argv
: Sequence
[str]) -> None:
67 global _mp_plugins
, _mp_options
69 # Ensure correct signaling of ^C using multiprocessing.Pool.
70 signal
.signal(signal
.SIGINT
, signal
.SIG_IGN
)
73 _mp_plugins
, _mp_options
# for `fork` this'll already be set
75 plugins
, options
= parse_args(argv
)
76 _mp_plugins
, _mp_options
= plugins
.checkers
, options
79 def _mp_run(filename
: str) -> tuple[str, Results
, dict[str, int]]:
81 filename
=filename
, plugins
=_mp_plugins
, options
=_mp_options
86 """Manage the parallelism and checker instances for each plugin and file.
88 This class will be responsible for the following:
90 - Determining the parallelism of Flake8, e.g.:
92 * Do we use :mod:`multiprocessing` or is it unavailable?
94 * Do we automatically decide on the number of jobs to use or did the
97 - Falling back to a serial way of processing files if we run into an
98 OSError related to :mod:`multiprocessing`
100 - Organizing the results of each checker so we can group the output
101 together and make our output deterministic.
106 style_guide
: StyleGuideManager
,
110 """Initialize our Manager instance."""
111 self
.style_guide
= style_guide
112 self
.options
= style_guide
.options
113 self
.plugins
= plugins
114 self
.jobs
= self
._job
_count
()
121 self
.exclude
= (*self
.options
.exclude
, *self
.options
.extend_exclude
)
123 self
.results
: list[tuple[str, Results
, dict[str, int]]] = []
125 def _process_statistics(self
) -> None:
126 for _
, _
, statistics
in self
.results
:
127 for statistic
in defaults
.STATISTIC_NAMES
:
128 self
.statistics
[statistic
] += statistics
[statistic
]
129 self
.statistics
["files"] += len(self
.filenames
)
131 def _job_count(self
) -> int:
132 # First we walk through all of our error cases:
133 # - multiprocessing library is not present
134 # - the user provided stdin and that's not something we can handle
136 # - the user provided some awful input
138 if utils
.is_using_stdin(self
.options
.filenames
):
140 "The --jobs option is not compatible with supplying "
141 "input using - . Ignoring --jobs arguments."
145 jobs
= self
.options
.jobs
147 # If the value is "auto", we want to let the multiprocessing library
148 # decide the number based on the number of CPUs. However, if that
149 # function is not implemented for this particular value of Python we
153 return multiprocessing
.cpu_count()
154 except NotImplementedError:
157 # Otherwise, we know jobs should be an integer and we can just convert
161 def _handle_results(self
, filename
: str, results
: Results
) -> int:
162 style_guide
= self
.style_guide
163 reported_results_count
= 0
164 for error_code
, line_number
, column
, text
, physical_line
in results
:
165 reported_results_count
+= style_guide
.handle_error(
168 line_number
=line_number
,
169 column_number
=column
,
171 physical_line
=physical_line
,
173 return reported_results_count
175 def report(self
) -> tuple[int, int]:
176 """Report all of the errors found in the managed file checkers.
178 This iterates over each of the checkers and reports the errors sorted
182 A tuple of the total results found and the results reported.
184 results_reported
= results_found
= 0
185 self
.results
.sort(key
=operator
.itemgetter(0))
186 for filename
, results
, _
in self
.results
:
187 results
.sort(key
=operator
.itemgetter(1, 2))
188 with self
.style_guide
.processing_file(filename
):
189 results_reported
+= self
._handle
_results
(filename
, results
)
190 results_found
+= len(results
)
191 return (results_found
, results_reported
)
193 def run_parallel(self
) -> None:
194 """Run the checkers in parallel."""
195 with
_mp_prefork(self
.plugins
, self
.options
):
196 pool
= _try_initialize_processpool(self
.jobs
, self
.argv
)
204 self
.results
= list(pool
.imap_unordered(_mp_run
, self
.filenames
))
213 def run_serial(self
) -> None:
214 """Run the checkers in serial."""
218 plugins
=self
.plugins
,
219 options
=self
.options
,
221 for filename
in self
.filenames
224 def run(self
) -> None:
225 """Run all the checkers.
227 This will intelligently decide whether to run the checks in parallel
228 or whether to run them in serial.
230 If running the checks in parallel causes a problem (e.g.,
231 :issue:`117`) this also implements fallback to serial processing.
234 if self
.jobs
> 1 and len(self
.filenames
) > 1:
238 except KeyboardInterrupt:
239 LOG
.warning("Flake8 was interrupted by the user")
240 raise exceptions
.EarlyQuit("Early quit while running checks")
242 def start(self
) -> None:
243 """Start checking files.
246 Path names to check. This is passed directly to
247 :meth:`~Manager.make_checkers`.
249 LOG
.info("Making checkers")
250 self
.filenames
= tuple(
252 paths
=self
.options
.filenames
,
253 stdin_display_name
=self
.options
.stdin_display_name
,
254 filename_patterns
=self
.options
.filename
,
255 exclude
=self
.exclude
,
259 def stop(self
) -> None:
260 """Stop checking files."""
261 self
._process
_statistics
()
265 """Manage running checks for a file and aggregate the results."""
272 options
: argparse
.Namespace
,
274 """Initialize our file checker."""
275 self
.options
= options
276 self
.filename
= filename
277 self
.plugins
= plugins
278 self
.results
: Results
= []
284 self
.processor
= self
._make
_processor
()
285 self
.display_name
= filename
286 self
.should_process
= False
287 if self
.processor
is not None:
288 self
.display_name
= self
.processor
.filename
289 self
.should_process
= not self
.processor
.should_ignore_file()
290 self
.statistics
["physical lines"] = len(self
.processor
.lines
)
292 def __repr__(self
) -> str:
293 """Provide helpful debugging representation."""
294 return f
"FileChecker for {self.filename}"
296 def _make_processor(self
) -> processor
.FileProcessor |
None:
298 return processor
.FileProcessor(self
.filename
, self
.options
)
300 # If we can not read the file due to an IOError (e.g., the file
301 # does not exist or we do not have the permissions to open it)
302 # then we need to format that exception for the user.
303 # NOTE(sigmavirus24): Historically, pep8 has always reported this
304 # as an E902. We probably *want* a better error code for this
306 self
.report("E902", 0, 0, f
"{type(e).__name__}: {e}")
311 error_code
: str |
None,
316 """Report an error by storing it in the results list."""
317 if error_code
is None:
318 error_code
, text
= text
.split(" ", 1)
320 # If we're recovering from a problem in _make_processor, we will not
321 # have this attribute.
322 if hasattr(self
, "processor") and self
.processor
is not None:
323 line
= self
.processor
.noqa_line_for(line_number
)
327 self
.results
.append((error_code
, line_number
, column
, text
, line
))
330 def run_check(self
, plugin
: LoadedPlugin
, **arguments
: Any
) -> Any
:
331 """Run the check in a single plugin."""
332 assert self
.processor
is not None, self
.filename
334 params
= self
.processor
.keyword_arguments_for(
335 plugin
.parameters
, arguments
337 except AttributeError as ae
:
338 raise exceptions
.PluginRequestedUnknownParameters(
339 plugin_name
=plugin
.display_name
, exception
=ae
342 return plugin
.obj(**arguments
, **params
)
343 except Exception as all_exc
:
345 "Plugin %s raised an unexpected exception",
349 raise exceptions
.PluginExecutionFailed(
350 filename
=self
.filename
,
351 plugin_name
=plugin
.display_name
,
356 def _extract_syntax_information(exception
: Exception) -> tuple[int, int]:
358 len(exception
.args
) > 1
359 and exception
.args
[1]
360 and len(exception
.args
[1]) > 2
362 token
= exception
.args
[1]
363 row
, column
= token
[1:3]
365 isinstance(exception
, tokenize
.TokenError
)
366 and len(exception
.args
) == 2
367 and len(exception
.args
[1]) == 2
370 row
, column
= exception
.args
[1]
378 and isinstance(exception
, SyntaxError)
379 and len(token
) == 4 # Python 3.9 or earlier
381 # NOTE(sigmavirus24): SyntaxErrors report 1-indexed column
382 # numbers. We need to decrement the column number by 1 at
386 # See also: https://github.com/pycqa/flake8/issues/169,
387 # https://github.com/PyCQA/flake8/issues/1372
388 # On Python 3.9 and earlier, token will be a 4-item tuple with the
389 # last item being the string. Starting with 3.10, they added to
390 # the tuple so now instead of it ending with the code that failed
391 # to parse, it ends with the end of the section of code that
392 # failed to parse. Luckily the absolute position in the tuple is
393 # stable across versions so we can use that here
394 physical_line
= token
[3]
396 # NOTE(sigmavirus24): Not all "tokens" have a string as the last
397 # argument. In this event, let's skip trying to find the correct
398 # column and row values.
399 if physical_line
is not None:
400 # NOTE(sigmavirus24): SyntaxErrors also don't exactly have a
401 # "physical" line so much as what was accumulated by the point
403 # See also: https://github.com/pycqa/flake8/issues/169
404 lines
= physical_line
.rstrip("\n").split("\n")
405 row_offset
= len(lines
) - 1
406 logical_line
= lines
[0]
407 logical_line_length
= len(logical_line
)
408 if column
> logical_line_length
:
409 column
= logical_line_length
411 column
-= column_offset
414 def run_ast_checks(self
) -> None:
415 """Run all checks expecting an abstract syntax tree."""
416 assert self
.processor
is not None, self
.filename
417 ast
= self
.processor
.build_ast()
419 for plugin
in self
.plugins
.tree
:
420 checker
= self
.run_check(plugin
, tree
=ast
)
421 # If the plugin uses a class, call the run method of it, otherwise
422 # the call should return something iterable itself
424 runner
= checker
.run()
425 except AttributeError:
427 for line_number
, offset
, text
, _
in runner
:
430 line_number
=line_number
,
435 def run_logical_checks(self
) -> None:
436 """Run all checks expecting a logical line."""
437 assert self
.processor
is not None
438 comments
, logical_line
, mapping
= self
.processor
.build_logical_line()
441 self
.processor
.update_state(mapping
)
443 LOG
.debug('Logical line: "%s"', logical_line
.rstrip())
445 for plugin
in self
.plugins
.logical_line
:
446 self
.processor
.update_checker_state_for(plugin
)
447 results
= self
.run_check(plugin
, logical_line
=logical_line
) or ()
448 for offset
, text
in results
:
449 line_number
, column_offset
= find_offset(offset
, mapping
)
450 if line_number
== column_offset
== 0:
451 LOG
.warning("position of error out of bounds: %s", plugin
)
454 line_number
=line_number
,
455 column
=column_offset
,
459 self
.processor
.next_logical_line()
461 def run_physical_checks(self
, physical_line
: str) -> None:
462 """Run all checks for a given physical line.
464 A single physical check may return multiple errors.
466 assert self
.processor
is not None
467 for plugin
in self
.plugins
.physical_line
:
468 self
.processor
.update_checker_state_for(plugin
)
469 result
= self
.run_check(plugin
, physical_line
=physical_line
)
471 if result
is not None:
472 # This is a single result if first element is an int
475 column_offset
= result
[0]
476 except (IndexError, TypeError):
479 if isinstance(column_offset
, int):
480 # If we only have a single result, convert to a collection
483 for result_single
in result
:
484 column_offset
, text
= result_single
487 line_number
=self
.processor
.line_number
,
488 column
=column_offset
,
492 def process_tokens(self
) -> None:
493 """Process tokens and trigger checks.
495 Instead of using this directly, you should use
496 :meth:`flake8.checker.FileChecker.run_checks`.
498 assert self
.processor
is not None
500 statistics
= self
.statistics
501 file_processor
= self
.processor
503 for token
in file_processor
.generate_tokens():
504 statistics
["tokens"] += 1
505 self
.check_physical_eol(token
, prev_physical
)
506 token_type
, text
= token
[0:2]
507 if token_type
== tokenize
.OP
:
508 parens
= processor
.count_parentheses(parens
, text
)
510 if processor
.token_is_newline(token
):
511 self
.handle_newline(token_type
)
512 prev_physical
= token
[4]
514 if file_processor
.tokens
:
515 # If any tokens are left over, process them
516 self
.run_physical_checks(file_processor
.lines
[-1])
517 self
.run_logical_checks()
519 def run_checks(self
) -> tuple[str, Results
, dict[str, int]]:
520 """Run checks against the file."""
521 if self
.processor
is None or not self
.should_process
:
522 return self
.display_name
, self
.results
, self
.statistics
525 self
.run_ast_checks()
526 self
.process_tokens()
527 except (SyntaxError, tokenize
.TokenError
) as e
:
528 code
= "E902" if isinstance(e
, tokenize
.TokenError
) else "E999"
529 row
, column
= self
._extract
_syntax
_information
(e
)
530 self
.report(code
, row
, column
, f
"{type(e).__name__}: {e.args[0]}")
531 return self
.display_name
, self
.results
, self
.statistics
533 logical_lines
= self
.processor
.statistics
["logical lines"]
534 self
.statistics
["logical lines"] = logical_lines
535 return self
.display_name
, self
.results
, self
.statistics
537 def handle_newline(self
, token_type
: int) -> None:
538 """Handle the logic when encountering a newline token."""
539 assert self
.processor
is not None
540 if token_type
== tokenize
.NEWLINE
:
541 self
.run_logical_checks()
542 self
.processor
.reset_blank_before()
543 elif len(self
.processor
.tokens
) == 1:
544 # The physical line contains only this token.
545 self
.processor
.visited_new_blank_line()
546 self
.processor
.delete_first_token()
548 self
.run_logical_checks()
550 def check_physical_eol(
551 self
, token
: tokenize
.TokenInfo
, prev_physical
: str
553 """Run physical checks if and only if it is at the end of the line."""
554 assert self
.processor
is not None
555 if token
.type == FSTRING_START
: # pragma: >=3.12 cover
556 self
.processor
.fstring_start(token
.start
[0])
557 # a newline token ends a single physical line.
558 elif processor
.is_eol_token(token
):
559 # if the file does not end with a newline, the NEWLINE
560 # token is inserted by the parser, but it does not contain
561 # the previous physical line in `token[4]`
563 self
.run_physical_checks(prev_physical
)
565 self
.run_physical_checks(token
.line
)
566 elif processor
.is_multiline_string(token
):
567 # Less obviously, a string that contains newlines is a
568 # multiline string, either triple-quoted or with internal
569 # newlines backslash-escaped. Check every physical line in the
570 # string *except* for the last one: its newline is outside of
571 # the multiline string, so we consider it a regular physical
572 # line, and will check it like any other physical line.
575 # - have to wind self.line_number back because initially it
576 # points to the last line of the string, and we want
577 # check_physical() to give accurate feedback
578 for line
in self
.processor
.multiline_string(token
):
579 self
.run_physical_checks(line
)
582 def _try_initialize_processpool(
585 ) -> multiprocessing
.pool
.Pool |
None:
586 """Return a new process pool instance if we are able to create one."""
588 return multiprocessing
.Pool(job_count
, _mp_init
, initargs
=(argv
,))
589 except OSError as err
:
590 if err
.errno
not in SERIAL_RETRY_ERRNOS
:
599 offset
: int, mapping
: processor
._LogicalMapping
600 ) -> tuple[int, int]:
601 """Find the offset tuple for a single offset."""
602 if isinstance(offset
, tuple):
605 for token
in mapping
:
606 token_offset
= token
[0]
607 if offset
<= token_offset
:
612 offset
= token_offset
= 0
613 return (position
[0], position
[1] + offset
- token_offset
)