]>
Commit | Line | Data |
---|---|---|
1 | """ Meager code path measurement tool. | |
2 | Ned Batchelder | |
3 | http://nedbatchelder.com/blog/200803/python_code_complexity_microtool.html | |
4 | MIT License. | |
5 | """ | |
6 | from __future__ import with_statement | |
7 | ||
8 | import optparse | |
9 | import sys | |
10 | import tokenize | |
11 | ||
12 | from collections import defaultdict | |
13 | try: | |
14 | import ast | |
15 | from ast import iter_child_nodes | |
16 | except ImportError: # Python 2.5 | |
17 | from flake8.util import ast, iter_child_nodes | |
18 | ||
19 | __version__ = '0.7.0' | |
20 | ||
21 | ||
class ASTVisitor(object):
    """Depth-first AST walker that routes nodes to ``visit<ClassName>`` hooks.

    The hooks are looked up on the visitor object passed to ``preorder``;
    node classes without a matching hook are handled by ``default``.
    """

    def __init__(self):
        # Maps a node class to the handler already resolved for it.
        self._cache = {}
        # The node most recently passed to ``dispatch``.
        self.node = None

    def default(self, node, *args):
        """Fallback handler: dispatch each direct child of *node*."""
        for child in iter_child_nodes(node):
            self.dispatch(child, *args)

    def dispatch(self, node, *args):
        """Call the handler for *node*'s class and return its result.

        The handler is ``visit<ClassName>`` on the current visitor when such
        an attribute exists, otherwise ``self.default``.  The resolved
        handler is cached per node class.
        """
        self.node = node
        node_type = node.__class__
        handler = self._cache.get(node_type)
        if handler is not None:
            return handler(node, *args)
        hook_name = 'visit' + node_type.__name__
        handler = getattr(self.visitor, hook_name, self.default)
        self._cache[node_type] = handler
        return handler(node, *args)

    def preorder(self, tree, visitor, *args):
        """Walk *tree* in preorder, sending each node to *visitor*.

        As a side effect ``visitor.visit`` is bound to ``self.dispatch`` so
        that hooks can recurse through the walker.
        """
        self.visitor = visitor
        visitor.visit = self.dispatch
        self.dispatch(tree, *args)  # XXX *args make sense?
48 | ||
49 | ||
class PathNode(object):
    """A single vertex of a path graph.

    ``name`` is used as the graphviz label and ``look`` as the graphviz
    shape when the node is printed by ``to_dot``.
    """

    def __init__(self, name, look="circle"):
        self.look = look
        self.name = name

    def dot_id(self):
        """Return the identifier used for this node in dot output."""
        return id(self)

    def to_dot(self):
        """Print the graphviz declaration of this node to stdout."""
        fields = (self.look, self.name, self.dot_id())
        print('node [shape=%s,label="%s"] %d;' % fields)
61 | ||
62 | ||
class PathGraph(object):
    """Directed graph of the execution paths through one code entity.

    ``nodes`` maps every node to the list of nodes it has an edge to.  Each
    node that takes part in an edge appears as a key, so ``len(nodes)`` is
    the vertex count.
    """

    def __init__(self, name, entity, lineno, column=0):
        self.name = name
        self.entity = entity
        self.lineno = lineno
        self.column = column
        self.nodes = defaultdict(list)

    def connect(self, n1, n2):
        """Add an edge from *n1* to *n2*."""
        self.nodes[n1].append(n2)
        # Ensure that the destination node is always counted, but never
        # throw away outgoing edges it may already have (this also keeps
        # self-loops intact).
        self.nodes.setdefault(n2, [])

    def to_dot(self):
        """Print the graph as a graphviz ``subgraph`` block to stdout."""
        print('subgraph {')
        for node in self.nodes:
            node.to_dot()
        for node, successors in self.nodes.items():
            for successor in successors:
                print('%s -- %s;' % (node.dot_id(), successor.dot_id()))
        print('}')

    def complexity(self):
        """Return the McCabe complexity for the graph.

        Computed as E - V + 2, where E is the number of edges and V the
        number of nodes.
        """
        num_edges = sum(len(targets) for targets in self.nodes.values())
        num_nodes = len(self.nodes)
        return num_edges - num_nodes + 2
92 | ||
93 | ||
class PathGraphingAstVisitor(ASTVisitor):
    """AST visitor that builds a path graph for each executable entity.

    Finished graphs are stored in ``self.graphs`` keyed by
    ``"<classname><name>"``.  While an entity is being walked, ``self.graph``
    is the graph under construction and ``self.tail`` the node that the next
    statement will be attached to.
    """

    def __init__(self):
        super(PathGraphingAstVisitor, self).__init__()
        self.classname = ""
        self.graphs = {}
        self.reset()

    def reset(self):
        """Forget the graph under construction and its current tail."""
        self.graph = None
        self.tail = None

    def dispatch_list(self, node_list):
        """Dispatch every node of *node_list* in order."""
        for item in node_list:
            self.dispatch(item)

    def visitFunctionDef(self, node):
        """Start a new graph for a function, or nest it when inside one."""
        if self.classname:
            entity = '%s%s' % (self.classname, node.name)
        else:
            entity = node.name
        name = '%d:%d: %r' % (node.lineno, node.col_offset, entity)

        if self.graph is None:
            # Outermost function: it gets a graph of its own, which is
            # stored once the body has been walked.
            self.graph = PathGraph(name, entity, node.lineno, node.col_offset)
            self.tail = PathNode(name)
            self.dispatch_list(node.body)
            self.graphs["%s%s" % (self.classname, node.name)] = self.graph
            self.reset()
            return

        # closure
        start = self.appendPathNode(name)
        self.tail = start
        self.dispatch_list(node.body)
        end = PathNode("", look='point')
        self.graph.connect(self.tail, end)
        self.graph.connect(start, end)
        self.tail = end

    visitAsyncFunctionDef = visitFunctionDef

    def visitClassDef(self, node):
        """Walk a class body with the class name added to the prefix."""
        enclosing = self.classname
        self.classname = enclosing + node.name + "."
        self.dispatch_list(node.body)
        self.classname = enclosing

    def appendPathNode(self, name):
        """Attach a new node called *name* after the tail and return it.

        Returns ``None`` without touching the graph when there is no tail.
        """
        if not self.tail:
            return None
        fresh = PathNode(name)
        self.graph.connect(self.tail, fresh)
        self.tail = fresh
        return fresh

    def visitSimpleStatement(self, node):
        """Record a non-branching statement as a single node."""
        lineno = 0 if node.lineno is None else node.lineno
        self.appendPathNode("Stmt %d" % lineno)

    def default(self, node, *args):
        """Treat any unhandled statement as simple; recurse into the rest."""
        if not isinstance(node, ast.stmt):
            super(PathGraphingAstVisitor, self).default(node, *args)
            return
        self.visitSimpleStatement(node)

    def visitLoop(self, node):
        """Handle ``for``, ``async for`` and ``while`` statements."""
        self._subgraph(node, "Loop %d" % node.lineno)

    visitAsyncFor = visitFor = visitWhile = visitLoop

    def visitIf(self, node):
        """Handle an ``if`` statement."""
        self._subgraph(node, "If %d" % node.lineno)

    def _subgraph(self, node, name, extra_blocks=()):
        """create the subgraphs representing any `if` and `for` statements"""
        if self.graph is not None:
            pathnode = self.appendPathNode(name)
            self._subgraph_parse(node, pathnode, extra_blocks)
            return
        # global loop
        self.graph = PathGraph(name, name, node.lineno, node.col_offset)
        pathnode = PathNode(name)
        self._subgraph_parse(node, pathnode, extra_blocks)
        self.graphs["%s%s" % (self.classname, name)] = self.graph
        self.reset()

    def _subgraph_parse(self, node, pathnode, extra_blocks):
        """parse the body and any `else` block of `if` and `for` statements"""

        def walk_branch(statements):
            # Every branch starts again from the branching node; the tail
            # left behind after walking it is that branch's loose end.
            self.tail = pathnode
            self.dispatch_list(statements)
            return self.tail

        loose_ends = [walk_branch(node.body)]
        for block in extra_blocks:
            loose_ends.append(walk_branch(block.body))
        if node.orelse:
            loose_ends.append(walk_branch(node.orelse))
        else:
            # Without an else block the branching node itself falls through.
            loose_ends.append(pathnode)
        if pathnode:
            # Join all branches again in a single point node.
            bottom = PathNode("", look='point')
            for end in loose_ends:
                self.graph.connect(end, bottom)
            self.tail = bottom

    def visitTryExcept(self, node):
        """Handle ``try``: each handler body is an extra branch."""
        self._subgraph(node, "TryExcept %d" % node.lineno,
                       extra_blocks=node.handlers)

    visitTry = visitTryExcept

    def visitWith(self, node):
        """Record the ``with`` statement, then walk its body in line."""
        self.appendPathNode("With %d" % node.lineno)
        self.dispatch_list(node.body)

    visitAsyncWith = visitWith
226 | ||
227 | ||
class McCabeChecker(object):
    """McCabe cyclomatic complexity checker.

    ``max_complexity`` is a class-level threshold; a negative value (the
    default) disables the check entirely.
    """
    name = 'mccabe'
    version = __version__
    _code = 'C901'
    _error_tmpl = "C901 %r is too complex (%d)"
    max_complexity = -1

    def __init__(self, tree, filename):
        # ``filename`` is accepted as part of the constructor signature but
        # is not used by the checker.
        self.tree = tree

    @classmethod
    def add_options(cls, parser):
        """Register the ``--max-complexity`` option on *parser*."""
        flag = '--max-complexity'
        kwargs = {
            'default': -1,
            'action': 'store',
            'type': int,
            'help': 'McCabe complexity threshold',
            # A real boolean, not the string 'True'.
            'parse_from_config': True,
        }
        config_opts = getattr(parser, 'config_options', None)
        if isinstance(config_opts, list):
            # Flake8 2.x
            kwargs.pop('parse_from_config')
            parser.add_option(flag, **kwargs)
            parser.config_options.append('max-complexity')
        else:
            parser.add_option(flag, **kwargs)

    @classmethod
    def parse_options(cls, options):
        """Store the configured threshold on the class."""
        cls.max_complexity = int(options.max_complexity)

    def run(self):
        """Yield ``(lineno, column, message, checker_type)`` per violation.

        A violation is any graph whose complexity is strictly greater than
        ``max_complexity``.  Nothing is yielded when the threshold is
        negative.
        """
        if self.max_complexity < 0:
            return
        visitor = PathGraphingAstVisitor()
        visitor.preorder(self.tree, visitor)
        for graph in visitor.graphs.values():
            # Compute once; it is needed for both the test and the message.
            complexity = graph.complexity()
            if complexity > self.max_complexity:
                text = self._error_tmpl % (graph.entity, complexity)
                yield graph.lineno, graph.column, text, type(self)
271 | ||
272 | ||
def get_code_complexity(code, threshold=7, filename='stdin'):
    """Print one line per entity in *code* that exceeds *threshold*.

    Returns the number of lines printed.  When *code* cannot be parsed, a
    message is written to stderr and 0 is returned.  Note that this sets
    ``McCabeChecker.max_complexity`` to *threshold*.
    """
    try:
        tree = compile(code, filename, "exec", ast.PyCF_ONLY_AST)
    except SyntaxError:
        e = sys.exc_info()[1]
        sys.stderr.write("Unable to parse %s: %s\n" % (filename, e))
        return 0

    McCabeChecker.max_complexity = threshold
    checker = McCabeChecker(tree, filename)
    complx = ['%s:%d:1: %s' % (filename, lineno, text)
              for lineno, _offset, text, _check in checker.run()]

    if complx:
        print('\n'.join(complx))
    return len(complx)
290 | ||
291 | ||
def get_module_complexity(module_path, threshold=7):
    """Read the file at *module_path* and report its complexity.

    Returns whatever ``get_code_complexity`` returns for the file's source.
    """
    source = _read(module_path)
    return get_code_complexity(source, threshold, filename=module_path)
296 | ||
297 | ||
def _read(filename):
    """Read the source code of *filename* and return it as text.

    On Python 2 the file is opened in universal-newline mode.  On Python 3
    the encoding is detected with ``tokenize.detect_encoding``; when the
    declared encoding is invalid the file is decoded as latin-1 instead.
    """
    if sys.version_info < (3, 0):
        with open(filename, 'rU') as f:
            return f.read()
    # Python 3 and later.  There is deliberately no upper version bound, so
    # the function never falls through and returns None.
    try:
        with open(filename, 'rb') as f:
            (encoding, _) = tokenize.detect_encoding(f.readline)
    except (LookupError, SyntaxError, UnicodeError):
        # Fall back if file encoding is improperly declared
        with open(filename, encoding='latin-1') as f:
            return f.read()
    with open(filename, 'r', encoding=encoding) as f:
        return f.read()
313 | ||
314 | ||
def main(argv=None):
    """Command-line entry point: report the complexity of one source file.

    *argv* is the argument list without the program name and defaults to
    ``sys.argv[1:]``.  The first positional argument is the file to analyse.
    With ``--dot`` the graphs are printed in graphviz format, otherwise each
    graph's name and complexity are printed.  ``--min`` sets the minimum
    complexity a graph needs in order to be reported.

    Exits with a usage error when no file is given.
    """
    if argv is None:
        argv = sys.argv[1:]
    opar = optparse.OptionParser()
    opar.add_option("-d", "--dot", dest="dot",
                    help="output a graphviz dot file", action="store_true")
    opar.add_option("-m", "--min", dest="threshold",
                    help="minimum complexity for output", type="int",
                    default=1)

    options, args = opar.parse_args(argv)
    if not args:
        # Report a usage error instead of failing with an IndexError below.
        opar.error("no input file given")

    code = _read(args[0])
    tree = compile(code, args[0], "exec", ast.PyCF_ONLY_AST)
    visitor = PathGraphingAstVisitor()
    visitor.preorder(tree, visitor)

    if options.dot:
        print('graph {')
        for graph in visitor.graphs.values():
            # A threshold of 0 means "print every graph".
            if (not options.threshold or
                    graph.complexity() >= options.threshold):
                graph.to_dot()
        print('}')
    else:
        for graph in visitor.graphs.values():
            complexity = graph.complexity()
            if complexity >= options.threshold:
                print(graph.name, complexity)


if __name__ == '__main__':
    main(sys.argv[1:])