1 | """Module containing our file processor that tokenizes a file for checks.""" |
2 | from __future__ import annotations | |
3 | ||
4 | import argparse | |
5 | import ast | |
6 | import logging | |
7 | import tokenize | |
8 | from typing import Any | |
9 | from typing import Generator | |
10 | from typing import List | |
11 | from typing import Tuple | |
12 | ||
13 | from flake8 import defaults | |
14 | from flake8 import utils | |
15 | from flake8._compat import FSTRING_END | |
16 | from flake8._compat import FSTRING_MIDDLE | |
17 | from flake8.plugins.finder import LoadedPlugin | |
18 | ||
19 | LOG = logging.getLogger(__name__) | |
20 | NEWLINE = frozenset([tokenize.NL, tokenize.NEWLINE]) | |
21 | ||
22 | SKIP_TOKENS = frozenset( | |
23 | [tokenize.NL, tokenize.NEWLINE, tokenize.INDENT, tokenize.DEDENT] | |
24 | ) | |
25 | ||
26 | _LogicalMapping = List[Tuple[int, Tuple[int, int]]] | |
27 | _Logical = Tuple[List[str], List[str], _LogicalMapping] | |
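# ``_LogicalMapping`` pairs an offset into the assembled logical line with the
# ``(row, column)`` source position it corresponds to; ``_Logical`` is the
# ``(comments, logical, mapping)`` triple returned by
# ``FileProcessor.build_logical_line_tokens`` below.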


class FileProcessor:
    """Processes a file and holds state.

    This processes a file by generating tokens, logical and physical lines,
    and AST trees. This also provides a way of passing state about the file
    to checks expecting that state. Any public attribute on this object can
    be requested by a plugin. The known public attributes are:

    - :attr:`blank_before`
    - :attr:`blank_lines`
    - :attr:`checker_state`
    - :attr:`indent_char`
    - :attr:`indent_level`
    - :attr:`line_number`
    - :attr:`logical_line`
    - :attr:`max_line_length`
    - :attr:`max_doc_length`
    - :attr:`multiline`
    - :attr:`noqa`
    - :attr:`previous_indent_level`
    - :attr:`previous_logical`
    - :attr:`previous_unindented_logical_line`
    - :attr:`tokens`
    - :attr:`file_tokens`
    - :attr:`total_lines`
    - :attr:`verbose`
    """

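    # A minimal sketch of how a check requests the attributes listed above
    # (hypothetical plugin, not part of flake8): ``keyword_arguments_for``
    # below matches a check's parameter names against these attribute names,
    # so a logical-line check written as
    #
    #     def example_check(logical_line, max_line_length):
    #         if len(logical_line) > max_line_length:
    #             yield 0, "X100 logical line is too long"
    #
    # would be called with the current ``logical_line`` and the configured
    # ``max_line_length``.
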
    #: always ``False``, included for compatibility
    noqa = False

    def __init__(
        self,
        filename: str,
        options: argparse.Namespace,
        lines: list[str] | None = None,
    ) -> None:
        """Initialize our file processor.

        :param filename: Name of the file to process
        :param options: Parsed command-line options (argparse namespace)
        :param lines: Lines of the file to process; read from ``filename``
            when not provided
        """
        self.options = options
        self.filename = filename
        self.lines = lines if lines is not None else self.read_lines()
        self.strip_utf_bom()

        # Defaults for public attributes
        #: Number of preceding blank lines
        self.blank_before = 0
        #: Number of blank lines
        self.blank_lines = 0
        #: Checker states for each plugin
        self._checker_states: dict[str, dict[Any, Any]] = {}
        #: Current checker state
        self.checker_state: dict[Any, Any] = {}
        #: User provided option for hang closing
        self.hang_closing = options.hang_closing
        #: Character used for indentation
        self.indent_char: str | None = None
        #: Current level of indentation
        self.indent_level = 0
        #: Number of spaces used for indentation
        self.indent_size = options.indent_size
        #: Line number in the file
        self.line_number = 0
        #: Current logical line
        self.logical_line = ""
        #: Maximum line length as configured by the user
        self.max_line_length = options.max_line_length
        #: Maximum docstring / comment line length as configured by the user
        self.max_doc_length = options.max_doc_length
        #: Whether the current physical line is multiline
        self.multiline = False
        #: Previous level of indentation
        self.previous_indent_level = 0
        #: Previous logical line
        self.previous_logical = ""
        #: Previous unindented (i.e. top-level) logical line
        self.previous_unindented_logical_line = ""
        #: Current set of tokens
        self.tokens: list[tokenize.TokenInfo] = []
        #: Total number of lines in the file
        self.total_lines = len(self.lines)
        #: Verbosity level of Flake8
        self.verbose = options.verbose
        #: Statistics dictionary
        self.statistics = {"logical lines": 0}
        self._file_tokens: list[tokenize.TokenInfo] | None = None
        # map from line number to the line we'll search for `noqa` in
        self._noqa_line_mapping: dict[int, str] | None = None
        self._fstring_start = -1

    @property
    def file_tokens(self) -> list[tokenize.TokenInfo]:
        """Return the complete set of tokens for a file."""
        if self._file_tokens is None:
            line_iter = iter(self.lines)
            self._file_tokens = list(
                tokenize.generate_tokens(lambda: next(line_iter))
            )

        return self._file_tokens

    def fstring_start(self, lineno: int) -> None:
        """Signal the beginning of an fstring."""
        self._fstring_start = lineno

    def multiline_string(
        self, token: tokenize.TokenInfo
    ) -> Generator[str, None, None]:
        """Iterate through the lines of a multiline string."""
        if token.type == FSTRING_END:
            start = self._fstring_start
        else:
            start = token.start[0]

        self.multiline = True
        self.line_number = start
        # intentionally don't include the last line, that line will be
        # terminated later by a future end-of-line
        for _ in range(start, token.end[0]):
            yield self.lines[self.line_number - 1]
            self.line_number += 1
        self.multiline = False

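    # For example, a triple-quoted string whose token spans physical lines 10
    # through 12 yields lines 10 and 11 here; line 12 is finished off by the
    # end-of-line token that follows the string.
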
    def reset_blank_before(self) -> None:
        """Reset the blank_before attribute to zero."""
        self.blank_before = 0

    def delete_first_token(self) -> None:
        """Delete the first token in the list of tokens."""
        del self.tokens[0]

    def visited_new_blank_line(self) -> None:
        """Note that we visited a new blank line."""
        self.blank_lines += 1

    def update_state(self, mapping: _LogicalMapping) -> None:
        """Update the indent level based on the logical line mapping."""
        (start_row, start_col) = mapping[0][1]
        start_line = self.lines[start_row - 1]
        self.indent_level = expand_indent(start_line[:start_col])
        if self.blank_before < self.blank_lines:
            self.blank_before = self.blank_lines

    def update_checker_state_for(self, plugin: LoadedPlugin) -> None:
        """Update the checker_state attribute for the plugin."""
        if "checker_state" in plugin.parameters:
            self.checker_state = self._checker_states.setdefault(
                plugin.entry_name, {}
            )

    def next_logical_line(self) -> None:
        """Record the previous logical line.

        This also resets the tokens list and the blank_lines count.
        """
        if self.logical_line:
            self.previous_indent_level = self.indent_level
            self.previous_logical = self.logical_line
            if not self.indent_level:
                self.previous_unindented_logical_line = self.logical_line
        self.blank_lines = 0
        self.tokens = []

    def build_logical_line_tokens(self) -> _Logical:  # noqa: C901
        """Build the mapping, comments, and logical line lists."""
        logical = []
        comments = []
        mapping: _LogicalMapping = []
        length = 0
        previous_row = previous_column = None
        for token_type, text, start, end, line in self.tokens:
            if token_type in SKIP_TOKENS:
                continue
            if not mapping:
                mapping = [(0, start)]
            if token_type == tokenize.COMMENT:
                comments.append(text)
                continue
            if token_type == tokenize.STRING:
                text = mutate_string(text)
            elif token_type == FSTRING_MIDDLE:
                text = "x" * len(text)
            if previous_row:
                (start_row, start_column) = start
                if previous_row != start_row:
                    row_index = previous_row - 1
                    column_index = previous_column - 1
                    previous_text = self.lines[row_index][column_index]
                    if previous_text == "," or (
                        previous_text not in "{[(" and text not in "}])"
                    ):
                        text = f" {text}"
                elif previous_column != start_column:
                    text = line[previous_column:start_column] + text
            logical.append(text)
            length += len(text)
            mapping.append((length, end))
            (previous_row, previous_column) = end
        return comments, logical, mapping

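    # As an illustration, the two physical lines
    #
    #     foo(1,
    #         2)
    #
    # are joined by the method above into the logical line ``foo(1, 2)``;
    # ``mapping`` pairs offsets in that string with ``(row, column)`` positions
    # in the source so reported errors can point at the right physical
    # location.
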
    def build_ast(self) -> ast.AST:
        """Build an abstract syntax tree from the list of lines."""
        return ast.parse("".join(self.lines))

    def build_logical_line(self) -> tuple[str, str, _LogicalMapping]:
        """Build a logical line from the current tokens list."""
        comments, logical, mapping_list = self.build_logical_line_tokens()
        joined_comments = "".join(comments)
        self.logical_line = "".join(logical)
        self.statistics["logical lines"] += 1
        return joined_comments, self.logical_line, mapping_list

    def keyword_arguments_for(
        self,
        parameters: dict[str, bool],
        arguments: dict[str, Any],
    ) -> dict[str, Any]:
        """Generate the keyword arguments for a list of parameters."""
        ret = {}
        for param, required in parameters.items():
            if param in arguments:
                continue
            try:
                ret[param] = getattr(self, param)
            except AttributeError:
                if required:
                    raise
                else:
                    LOG.warning(
                        'Plugin requested optional parameter "%s" '
                        "but this is not an available parameter.",
                        param,
                    )
        return ret

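    # Hypothetical example for ``keyword_arguments_for`` above (assuming
    # ``parameters`` maps each parameter name to whether it is required, as
    # the loop above implies): a check defined as ``def check(logical_line,
    # noqa=False)`` arrives as ``{"logical_line": True, "noqa": False}`` and
    # is given this processor's ``logical_line`` and ``noqa`` attributes as
    # keyword arguments.
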
    def generate_tokens(self) -> Generator[tokenize.TokenInfo, None, None]:
        """Tokenize the file and yield the tokens."""
        for token in tokenize.generate_tokens(self.next_line):
            if token[2][0] > self.total_lines:
                break
            self.tokens.append(token)
            yield token

    def _noqa_line_range(self, min_line: int, max_line: int) -> dict[int, str]:
        line_range = range(min_line, max_line + 1)
        joined = "".join(self.lines[min_line - 1 : max_line])
        return dict.fromkeys(line_range, joined)

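    # For example, for a statement spanning physical lines 3-5, the mapping
    # built in ``noqa_line_for`` below sends lines 3, 4, and 5 all to those
    # three lines joined together, so a ``# noqa`` comment anywhere in the
    # statement suppresses reports attributed to any of its lines.
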
    def noqa_line_for(self, line_number: int) -> str | None:
        """Retrieve the line which will be used to determine noqa."""
        if self._noqa_line_mapping is None:
            try:
                file_tokens = self.file_tokens
            except (tokenize.TokenError, SyntaxError):
                # if we failed to parse the file tokens, we'll always fail in
                # the future, so set this so the code does not try again
                self._noqa_line_mapping = {}
            else:
                ret = {}

                min_line = len(self.lines) + 2
                max_line = -1
                for tp, _, (s_line, _), (e_line, _), _ in file_tokens:
                    if tp == tokenize.ENDMARKER:
                        break

                    min_line = min(min_line, s_line)
                    max_line = max(max_line, e_line)

                    if tp in (tokenize.NL, tokenize.NEWLINE):
                        ret.update(self._noqa_line_range(min_line, max_line))

                        min_line = len(self.lines) + 2
                        max_line = -1

                # in newer versions of python, a `NEWLINE` token is inserted
                # at the end of the file even if it doesn't end with one;
                # on older pythons the loop will not have hit a `NEWLINE`
                if max_line != -1:
                    ret.update(self._noqa_line_range(min_line, max_line))

                self._noqa_line_mapping = ret

        # NOTE(sigmavirus24): Some plugins choose to report errors for empty
        # files on Line 1. In those cases, we shouldn't bother trying to
        # retrieve a physical line (since none exist).
        return self._noqa_line_mapping.get(line_number)

    def next_line(self) -> str:
        """Get the next line from the list."""
        if self.line_number >= self.total_lines:
            return ""
        line = self.lines[self.line_number]
        self.line_number += 1
        if self.indent_char is None and line[:1] in defaults.WHITESPACE:
            self.indent_char = line[0]
        return line

    def read_lines(self) -> list[str]:
        """Read the lines for this file checker."""
        if self.filename == "-":
            self.filename = self.options.stdin_display_name or "stdin"
            lines = self.read_lines_from_stdin()
        else:
            lines = self.read_lines_from_filename()
        return lines

    def read_lines_from_filename(self) -> list[str]:
        """Read the lines for a file."""
        try:
            with tokenize.open(self.filename) as fd:
                return fd.readlines()
        except (SyntaxError, UnicodeError):
            # If we can't detect the codec with tokenize.detect_encoding, or
            # the detected encoding is incorrect, just fall back to latin-1.
            with open(self.filename, encoding="latin-1") as fd:
                return fd.readlines()

    def read_lines_from_stdin(self) -> list[str]:
        """Read the lines from standard in."""
        return utils.stdin_get_lines()

    def should_ignore_file(self) -> bool:
        """Check if ``flake8: noqa`` is in the file to be ignored.

        :returns:
            True if a line matches :attr:`defaults.NOQA_FILE`,
            otherwise False
        """
        if not self.options.disable_noqa and any(
            defaults.NOQA_FILE.match(line) for line in self.lines
        ):
            return True
        elif any(defaults.NOQA_FILE.search(line) for line in self.lines):
            LOG.warning(
                "Detected `flake8: noqa` on line with code. To ignore an "
                "error on a line use `noqa` instead."
            )
            return False
        else:
            return False

    def strip_utf_bom(self) -> None:
        """Strip the UTF bom from the lines of the file."""
        if not self.lines:
            # If we have nothing to analyze, quit early
            return

        first_byte = ord(self.lines[0][0])
        if first_byte not in (0xEF, 0xFEFF):
            return

        # Strip the BOM: either the already-decoded BOM character (U+FEFF) or,
        # if the file was read with the latin-1 fallback, the raw UTF-8 BOM
        # bytes.
        if first_byte == 0xFEFF:
            self.lines[0] = self.lines[0][1:]
        elif self.lines[0][:3] == "\xEF\xBB\xBF":
            self.lines[0] = self.lines[0][3:]


def is_eol_token(token: tokenize.TokenInfo) -> bool:
    """Check if the token is an end-of-line token."""
    return token[0] in NEWLINE or token[4][token[3][1] :].lstrip() == "\\\n"


def is_multiline_string(token: tokenize.TokenInfo) -> bool:
    """Check if this is a multiline string."""
    return token.type == FSTRING_END or (
        token.type == tokenize.STRING and "\n" in token.string
    )


def token_is_newline(token: tokenize.TokenInfo) -> bool:
    """Check if the token type is a newline token type."""
    return token[0] in NEWLINE


def count_parentheses(current_parentheses_count: int, token_text: str) -> int:
    """Count the number of parentheses."""
    if token_text in "([{":  # nosec
        return current_parentheses_count + 1
    elif token_text in "}])":  # nosec
        return current_parentheses_count - 1
    return current_parentheses_count


def expand_indent(line: str) -> int:
    r"""Return the amount of indentation.

    Tabs are expanded to the next multiple of 8.

    >>> expand_indent('    ')
    4
    >>> expand_indent('\t')
    8
    >>> expand_indent('       \t')
    8
    >>> expand_indent('        \t')
    16
    """
    return len(line.expandtabs(8))


# NOTE(sigmavirus24): This was taken wholesale from
# https://github.com/PyCQA/pycodestyle. The in-line comments were edited to be
# more descriptive.
def mutate_string(text: str) -> str:
    """Replace contents with 'xxx' to prevent syntax matching.

    >>> mutate_string('"abc"')
    '"xxx"'
    >>> mutate_string("'''abc'''")
    "'''xxx'''"
    >>> mutate_string("r'abc'")
    "r'xxx'"
    """
    # NOTE(sigmavirus24): If there are string modifiers (e.g., b, u, r)
    # use the last "character" to determine if we're using single or double
    # quotes and then find the first instance of it
    start = text.index(text[-1]) + 1
    end = len(text) - 1
    # Check for triple-quoted strings
    if text[-3:] in ('"""', "'''"):
        start += 2
        end -= 2
    return text[:start] + "x" * (end - start) + text[end:]