]>
Commit | Line | Data |
---|---|---|
53e6db90 DC |
1 | """ |
2 | Formatting many files at once via multiprocessing. Contains entrypoint and utilities. | |
3 | ||
4 | NOTE: this module is only imported if we need to format several files at once. | |
5 | """ | |
6 | ||
7 | import asyncio | |
8 | import logging | |
9 | import os | |
10 | import signal | |
11 | import sys | |
12 | from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor | |
13 | from multiprocessing import Manager | |
14 | from pathlib import Path | |
15 | from typing import Any, Iterable, Optional, Set | |
16 | ||
17 | from mypy_extensions import mypyc_attr | |
18 | ||
19 | from black import WriteBack, format_file_in_place | |
20 | from black.cache import Cache | |
21 | from black.mode import Mode | |
22 | from black.output import err | |
23 | from black.report import Changed, Report | |
24 | ||
25 | ||
26 | def maybe_install_uvloop() -> None: | |
27 | """If our environment has uvloop installed we use it. | |
28 | ||
29 | This is called only from command-line entry points to avoid | |
30 | interfering with the parent process if Black is used as a library. | |
31 | """ | |
32 | try: | |
33 | import uvloop | |
34 | ||
35 | uvloop.install() | |
36 | except ImportError: | |
37 | pass | |
38 | ||
39 | ||
40 | def cancel(tasks: Iterable["asyncio.Task[Any]"]) -> None: | |
41 | """asyncio signal handler that cancels all `tasks` and reports to stderr.""" | |
42 | err("Aborted!") | |
43 | for task in tasks: | |
44 | task.cancel() | |
45 | ||
46 | ||
47 | def shutdown(loop: asyncio.AbstractEventLoop) -> None: | |
48 | """Cancel all pending tasks on `loop`, wait for them, and close the loop.""" | |
49 | try: | |
50 | # This part is borrowed from asyncio/runners.py in Python 3.7b2. | |
51 | to_cancel = [task for task in asyncio.all_tasks(loop) if not task.done()] | |
52 | if not to_cancel: | |
53 | return | |
54 | ||
55 | for task in to_cancel: | |
56 | task.cancel() | |
57 | loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True)) | |
58 | finally: | |
59 | # `concurrent.futures.Future` objects cannot be cancelled once they | |
60 | # are already running. There might be some when the `shutdown()` happened. | |
61 | # Silence their logger's spew about the event loop being closed. | |
62 | cf_logger = logging.getLogger("concurrent.futures") | |
63 | cf_logger.setLevel(logging.CRITICAL) | |
64 | loop.close() | |
65 | ||
66 | ||
67 | # diff-shades depends on being to monkeypatch this function to operate. I know it's | |
68 | # not ideal, but this shouldn't cause any issues ... hopefully. ~ichard26 | |
69 | @mypyc_attr(patchable=True) | |
70 | def reformat_many( | |
71 | sources: Set[Path], | |
72 | fast: bool, | |
73 | write_back: WriteBack, | |
74 | mode: Mode, | |
75 | report: Report, | |
76 | workers: Optional[int], | |
77 | ) -> None: | |
78 | """Reformat multiple files using a ProcessPoolExecutor.""" | |
79 | maybe_install_uvloop() | |
80 | ||
81 | executor: Executor | |
82 | if workers is None: | |
83 | workers = int(os.environ.get("BLACK_NUM_WORKERS", 0)) | |
84 | workers = workers or os.cpu_count() or 1 | |
85 | if sys.platform == "win32": | |
86 | # Work around https://bugs.python.org/issue26903 | |
87 | workers = min(workers, 60) | |
88 | try: | |
89 | executor = ProcessPoolExecutor(max_workers=workers) | |
90 | except (ImportError, NotImplementedError, OSError): | |
91 | # we arrive here if the underlying system does not support multi-processing | |
92 | # like in AWS Lambda or Termux, in which case we gracefully fallback to | |
93 | # a ThreadPoolExecutor with just a single worker (more workers would not do us | |
94 | # any good due to the Global Interpreter Lock) | |
95 | executor = ThreadPoolExecutor(max_workers=1) | |
96 | ||
97 | loop = asyncio.new_event_loop() | |
98 | asyncio.set_event_loop(loop) | |
99 | try: | |
100 | loop.run_until_complete( | |
101 | schedule_formatting( | |
102 | sources=sources, | |
103 | fast=fast, | |
104 | write_back=write_back, | |
105 | mode=mode, | |
106 | report=report, | |
107 | loop=loop, | |
108 | executor=executor, | |
109 | ) | |
110 | ) | |
111 | finally: | |
112 | try: | |
113 | shutdown(loop) | |
114 | finally: | |
115 | asyncio.set_event_loop(None) | |
116 | if executor is not None: | |
117 | executor.shutdown() | |
118 | ||
119 | ||
120 | async def schedule_formatting( | |
121 | sources: Set[Path], | |
122 | fast: bool, | |
123 | write_back: WriteBack, | |
124 | mode: Mode, | |
125 | report: "Report", | |
126 | loop: asyncio.AbstractEventLoop, | |
127 | executor: "Executor", | |
128 | ) -> None: | |
129 | """Run formatting of `sources` in parallel using the provided `executor`. | |
130 | ||
131 | (Use ProcessPoolExecutors for actual parallelism.) | |
132 | ||
133 | `write_back`, `fast`, and `mode` options are passed to | |
134 | :func:`format_file_in_place`. | |
135 | """ | |
136 | cache = Cache.read(mode) | |
137 | if write_back not in (WriteBack.DIFF, WriteBack.COLOR_DIFF): | |
138 | sources, cached = cache.filtered_cached(sources) | |
139 | for src in sorted(cached): | |
140 | report.done(src, Changed.CACHED) | |
141 | if not sources: | |
142 | return | |
143 | ||
144 | cancelled = [] | |
145 | sources_to_cache = [] | |
146 | lock = None | |
147 | if write_back in (WriteBack.DIFF, WriteBack.COLOR_DIFF): | |
148 | # For diff output, we need locks to ensure we don't interleave output | |
149 | # from different processes. | |
150 | manager = Manager() | |
151 | lock = manager.Lock() | |
152 | tasks = { | |
153 | asyncio.ensure_future( | |
154 | loop.run_in_executor( | |
155 | executor, format_file_in_place, src, fast, mode, write_back, lock | |
156 | ) | |
157 | ): src | |
158 | for src in sorted(sources) | |
159 | } | |
160 | pending = tasks.keys() | |
161 | try: | |
162 | loop.add_signal_handler(signal.SIGINT, cancel, pending) | |
163 | loop.add_signal_handler(signal.SIGTERM, cancel, pending) | |
164 | except NotImplementedError: | |
165 | # There are no good alternatives for these on Windows. | |
166 | pass | |
167 | while pending: | |
168 | done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) | |
169 | for task in done: | |
170 | src = tasks.pop(task) | |
171 | if task.cancelled(): | |
172 | cancelled.append(task) | |
173 | elif task.exception(): | |
174 | report.failed(src, str(task.exception())) | |
175 | else: | |
176 | changed = Changed.YES if task.result() else Changed.NO | |
177 | # If the file was written back or was successfully checked as | |
178 | # well-formatted, store this information in the cache. | |
179 | if write_back is WriteBack.YES or ( | |
180 | write_back is WriteBack.CHECK and changed is Changed.NO | |
181 | ): | |
182 | sources_to_cache.append(src) | |
183 | report.done(src, changed) | |
184 | if cancelled: | |
185 | await asyncio.gather(*cancelled, return_exceptions=True) | |
186 | if sources_to_cache: | |
187 | cache.write(sources_to_cache) |