]>
Commit | Line | Data |
---|---|---|
1 | import codecs | |
2 | import io | |
3 | import os | |
4 | import re | |
5 | import sys | |
6 | import typing as t | |
7 | from weakref import WeakKeyDictionary | |
8 | ||
9 | CYGWIN = sys.platform.startswith("cygwin") | |
10 | WIN = sys.platform.startswith("win") | |
11 | auto_wrap_for_ansi: t.Optional[t.Callable[[t.TextIO], t.TextIO]] = None | |
12 | _ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]") | |
13 | ||
14 | ||
15 | def _make_text_stream( | |
16 | stream: t.BinaryIO, | |
17 | encoding: t.Optional[str], | |
18 | errors: t.Optional[str], | |
19 | force_readable: bool = False, | |
20 | force_writable: bool = False, | |
21 | ) -> t.TextIO: | |
22 | if encoding is None: | |
23 | encoding = get_best_encoding(stream) | |
24 | if errors is None: | |
25 | errors = "replace" | |
26 | return _NonClosingTextIOWrapper( | |
27 | stream, | |
28 | encoding, | |
29 | errors, | |
30 | line_buffering=True, | |
31 | force_readable=force_readable, | |
32 | force_writable=force_writable, | |
33 | ) | |
34 | ||
35 | ||
36 | def is_ascii_encoding(encoding: str) -> bool: | |
37 | """Checks if a given encoding is ascii.""" | |
38 | try: | |
39 | return codecs.lookup(encoding).name == "ascii" | |
40 | except LookupError: | |
41 | return False | |
42 | ||
43 | ||
44 | def get_best_encoding(stream: t.IO[t.Any]) -> str: | |
45 | """Returns the default stream encoding if not found.""" | |
46 | rv = getattr(stream, "encoding", None) or sys.getdefaultencoding() | |
47 | if is_ascii_encoding(rv): | |
48 | return "utf-8" | |
49 | return rv | |
50 | ||
51 | ||
52 | class _NonClosingTextIOWrapper(io.TextIOWrapper): | |
53 | def __init__( | |
54 | self, | |
55 | stream: t.BinaryIO, | |
56 | encoding: t.Optional[str], | |
57 | errors: t.Optional[str], | |
58 | force_readable: bool = False, | |
59 | force_writable: bool = False, | |
60 | **extra: t.Any, | |
61 | ) -> None: | |
62 | self._stream = stream = t.cast( | |
63 | t.BinaryIO, _FixupStream(stream, force_readable, force_writable) | |
64 | ) | |
65 | super().__init__(stream, encoding, errors, **extra) | |
66 | ||
67 | def __del__(self) -> None: | |
68 | try: | |
69 | self.detach() | |
70 | except Exception: | |
71 | pass | |
72 | ||
73 | def isatty(self) -> bool: | |
74 | # https://bitbucket.org/pypy/pypy/issue/1803 | |
75 | return self._stream.isatty() | |
76 | ||
77 | ||
78 | class _FixupStream: | |
79 | """The new io interface needs more from streams than streams | |
80 | traditionally implement. As such, this fix-up code is necessary in | |
81 | some circumstances. | |
82 | ||
83 | The forcing of readable and writable flags are there because some tools | |
84 | put badly patched objects on sys (one such offender are certain version | |
85 | of jupyter notebook). | |
86 | """ | |
87 | ||
88 | def __init__( | |
89 | self, | |
90 | stream: t.BinaryIO, | |
91 | force_readable: bool = False, | |
92 | force_writable: bool = False, | |
93 | ): | |
94 | self._stream = stream | |
95 | self._force_readable = force_readable | |
96 | self._force_writable = force_writable | |
97 | ||
98 | def __getattr__(self, name: str) -> t.Any: | |
99 | return getattr(self._stream, name) | |
100 | ||
101 | def read1(self, size: int) -> bytes: | |
102 | f = getattr(self._stream, "read1", None) | |
103 | ||
104 | if f is not None: | |
105 | return t.cast(bytes, f(size)) | |
106 | ||
107 | return self._stream.read(size) | |
108 | ||
109 | def readable(self) -> bool: | |
110 | if self._force_readable: | |
111 | return True | |
112 | x = getattr(self._stream, "readable", None) | |
113 | if x is not None: | |
114 | return t.cast(bool, x()) | |
115 | try: | |
116 | self._stream.read(0) | |
117 | except Exception: | |
118 | return False | |
119 | return True | |
120 | ||
121 | def writable(self) -> bool: | |
122 | if self._force_writable: | |
123 | return True | |
124 | x = getattr(self._stream, "writable", None) | |
125 | if x is not None: | |
126 | return t.cast(bool, x()) | |
127 | try: | |
128 | self._stream.write("") # type: ignore | |
129 | except Exception: | |
130 | try: | |
131 | self._stream.write(b"") | |
132 | except Exception: | |
133 | return False | |
134 | return True | |
135 | ||
136 | def seekable(self) -> bool: | |
137 | x = getattr(self._stream, "seekable", None) | |
138 | if x is not None: | |
139 | return t.cast(bool, x()) | |
140 | try: | |
141 | self._stream.seek(self._stream.tell()) | |
142 | except Exception: | |
143 | return False | |
144 | return True | |
145 | ||
146 | ||
147 | def _is_binary_reader(stream: t.IO[t.Any], default: bool = False) -> bool: | |
148 | try: | |
149 | return isinstance(stream.read(0), bytes) | |
150 | except Exception: | |
151 | return default | |
152 | # This happens in some cases where the stream was already | |
153 | # closed. In this case, we assume the default. | |
154 | ||
155 | ||
156 | def _is_binary_writer(stream: t.IO[t.Any], default: bool = False) -> bool: | |
157 | try: | |
158 | stream.write(b"") | |
159 | except Exception: | |
160 | try: | |
161 | stream.write("") | |
162 | return False | |
163 | except Exception: | |
164 | pass | |
165 | return default | |
166 | return True | |
167 | ||
168 | ||
169 | def _find_binary_reader(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]: | |
170 | # We need to figure out if the given stream is already binary. | |
171 | # This can happen because the official docs recommend detaching | |
172 | # the streams to get binary streams. Some code might do this, so | |
173 | # we need to deal with this case explicitly. | |
174 | if _is_binary_reader(stream, False): | |
175 | return t.cast(t.BinaryIO, stream) | |
176 | ||
177 | buf = getattr(stream, "buffer", None) | |
178 | ||
179 | # Same situation here; this time we assume that the buffer is | |
180 | # actually binary in case it's closed. | |
181 | if buf is not None and _is_binary_reader(buf, True): | |
182 | return t.cast(t.BinaryIO, buf) | |
183 | ||
184 | return None | |
185 | ||
186 | ||
187 | def _find_binary_writer(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]: | |
188 | # We need to figure out if the given stream is already binary. | |
189 | # This can happen because the official docs recommend detaching | |
190 | # the streams to get binary streams. Some code might do this, so | |
191 | # we need to deal with this case explicitly. | |
192 | if _is_binary_writer(stream, False): | |
193 | return t.cast(t.BinaryIO, stream) | |
194 | ||
195 | buf = getattr(stream, "buffer", None) | |
196 | ||
197 | # Same situation here; this time we assume that the buffer is | |
198 | # actually binary in case it's closed. | |
199 | if buf is not None and _is_binary_writer(buf, True): | |
200 | return t.cast(t.BinaryIO, buf) | |
201 | ||
202 | return None | |
203 | ||
204 | ||
205 | def _stream_is_misconfigured(stream: t.TextIO) -> bool: | |
206 | """A stream is misconfigured if its encoding is ASCII.""" | |
207 | # If the stream does not have an encoding set, we assume it's set | |
208 | # to ASCII. This appears to happen in certain unittest | |
209 | # environments. It's not quite clear what the correct behavior is | |
210 | # but this at least will force Click to recover somehow. | |
211 | return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii") | |
212 | ||
213 | ||
214 | def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: t.Optional[str]) -> bool: | |
215 | """A stream attribute is compatible if it is equal to the | |
216 | desired value or the desired value is unset and the attribute | |
217 | has a value. | |
218 | """ | |
219 | stream_value = getattr(stream, attr, None) | |
220 | return stream_value == value or (value is None and stream_value is not None) | |
221 | ||
222 | ||
223 | def _is_compatible_text_stream( | |
224 | stream: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str] | |
225 | ) -> bool: | |
226 | """Check if a stream's encoding and errors attributes are | |
227 | compatible with the desired values. | |
228 | """ | |
229 | return _is_compat_stream_attr( | |
230 | stream, "encoding", encoding | |
231 | ) and _is_compat_stream_attr(stream, "errors", errors) | |
232 | ||
233 | ||
234 | def _force_correct_text_stream( | |
235 | text_stream: t.IO[t.Any], | |
236 | encoding: t.Optional[str], | |
237 | errors: t.Optional[str], | |
238 | is_binary: t.Callable[[t.IO[t.Any], bool], bool], | |
239 | find_binary: t.Callable[[t.IO[t.Any]], t.Optional[t.BinaryIO]], | |
240 | force_readable: bool = False, | |
241 | force_writable: bool = False, | |
242 | ) -> t.TextIO: | |
243 | if is_binary(text_stream, False): | |
244 | binary_reader = t.cast(t.BinaryIO, text_stream) | |
245 | else: | |
246 | text_stream = t.cast(t.TextIO, text_stream) | |
247 | # If the stream looks compatible, and won't default to a | |
248 | # misconfigured ascii encoding, return it as-is. | |
249 | if _is_compatible_text_stream(text_stream, encoding, errors) and not ( | |
250 | encoding is None and _stream_is_misconfigured(text_stream) | |
251 | ): | |
252 | return text_stream | |
253 | ||
254 | # Otherwise, get the underlying binary reader. | |
255 | possible_binary_reader = find_binary(text_stream) | |
256 | ||
257 | # If that's not possible, silently use the original reader | |
258 | # and get mojibake instead of exceptions. | |
259 | if possible_binary_reader is None: | |
260 | return text_stream | |
261 | ||
262 | binary_reader = possible_binary_reader | |
263 | ||
264 | # Default errors to replace instead of strict in order to get | |
265 | # something that works. | |
266 | if errors is None: | |
267 | errors = "replace" | |
268 | ||
269 | # Wrap the binary stream in a text stream with the correct | |
270 | # encoding parameters. | |
271 | return _make_text_stream( | |
272 | binary_reader, | |
273 | encoding, | |
274 | errors, | |
275 | force_readable=force_readable, | |
276 | force_writable=force_writable, | |
277 | ) | |
278 | ||
279 | ||
280 | def _force_correct_text_reader( | |
281 | text_reader: t.IO[t.Any], | |
282 | encoding: t.Optional[str], | |
283 | errors: t.Optional[str], | |
284 | force_readable: bool = False, | |
285 | ) -> t.TextIO: | |
286 | return _force_correct_text_stream( | |
287 | text_reader, | |
288 | encoding, | |
289 | errors, | |
290 | _is_binary_reader, | |
291 | _find_binary_reader, | |
292 | force_readable=force_readable, | |
293 | ) | |
294 | ||
295 | ||
296 | def _force_correct_text_writer( | |
297 | text_writer: t.IO[t.Any], | |
298 | encoding: t.Optional[str], | |
299 | errors: t.Optional[str], | |
300 | force_writable: bool = False, | |
301 | ) -> t.TextIO: | |
302 | return _force_correct_text_stream( | |
303 | text_writer, | |
304 | encoding, | |
305 | errors, | |
306 | _is_binary_writer, | |
307 | _find_binary_writer, | |
308 | force_writable=force_writable, | |
309 | ) | |
310 | ||
311 | ||
312 | def get_binary_stdin() -> t.BinaryIO: | |
313 | reader = _find_binary_reader(sys.stdin) | |
314 | if reader is None: | |
315 | raise RuntimeError("Was not able to determine binary stream for sys.stdin.") | |
316 | return reader | |
317 | ||
318 | ||
319 | def get_binary_stdout() -> t.BinaryIO: | |
320 | writer = _find_binary_writer(sys.stdout) | |
321 | if writer is None: | |
322 | raise RuntimeError("Was not able to determine binary stream for sys.stdout.") | |
323 | return writer | |
324 | ||
325 | ||
326 | def get_binary_stderr() -> t.BinaryIO: | |
327 | writer = _find_binary_writer(sys.stderr) | |
328 | if writer is None: | |
329 | raise RuntimeError("Was not able to determine binary stream for sys.stderr.") | |
330 | return writer | |
331 | ||
332 | ||
333 | def get_text_stdin( | |
334 | encoding: t.Optional[str] = None, errors: t.Optional[str] = None | |
335 | ) -> t.TextIO: | |
336 | rv = _get_windows_console_stream(sys.stdin, encoding, errors) | |
337 | if rv is not None: | |
338 | return rv | |
339 | return _force_correct_text_reader(sys.stdin, encoding, errors, force_readable=True) | |
340 | ||
341 | ||
342 | def get_text_stdout( | |
343 | encoding: t.Optional[str] = None, errors: t.Optional[str] = None | |
344 | ) -> t.TextIO: | |
345 | rv = _get_windows_console_stream(sys.stdout, encoding, errors) | |
346 | if rv is not None: | |
347 | return rv | |
348 | return _force_correct_text_writer(sys.stdout, encoding, errors, force_writable=True) | |
349 | ||
350 | ||
351 | def get_text_stderr( | |
352 | encoding: t.Optional[str] = None, errors: t.Optional[str] = None | |
353 | ) -> t.TextIO: | |
354 | rv = _get_windows_console_stream(sys.stderr, encoding, errors) | |
355 | if rv is not None: | |
356 | return rv | |
357 | return _force_correct_text_writer(sys.stderr, encoding, errors, force_writable=True) | |
358 | ||
359 | ||
360 | def _wrap_io_open( | |
361 | file: t.Union[str, "os.PathLike[str]", int], | |
362 | mode: str, | |
363 | encoding: t.Optional[str], | |
364 | errors: t.Optional[str], | |
365 | ) -> t.IO[t.Any]: | |
366 | """Handles not passing ``encoding`` and ``errors`` in binary mode.""" | |
367 | if "b" in mode: | |
368 | return open(file, mode) | |
369 | ||
370 | return open(file, mode, encoding=encoding, errors=errors) | |
371 | ||
372 | ||
373 | def open_stream( | |
374 | filename: "t.Union[str, os.PathLike[str]]", | |
375 | mode: str = "r", | |
376 | encoding: t.Optional[str] = None, | |
377 | errors: t.Optional[str] = "strict", | |
378 | atomic: bool = False, | |
379 | ) -> t.Tuple[t.IO[t.Any], bool]: | |
380 | binary = "b" in mode | |
381 | filename = os.fspath(filename) | |
382 | ||
383 | # Standard streams first. These are simple because they ignore the | |
384 | # atomic flag. Use fsdecode to handle Path("-"). | |
385 | if os.fsdecode(filename) == "-": | |
386 | if any(m in mode for m in ["w", "a", "x"]): | |
387 | if binary: | |
388 | return get_binary_stdout(), False | |
389 | return get_text_stdout(encoding=encoding, errors=errors), False | |
390 | if binary: | |
391 | return get_binary_stdin(), False | |
392 | return get_text_stdin(encoding=encoding, errors=errors), False | |
393 | ||
394 | # Non-atomic writes directly go out through the regular open functions. | |
395 | if not atomic: | |
396 | return _wrap_io_open(filename, mode, encoding, errors), True | |
397 | ||
398 | # Some usability stuff for atomic writes | |
399 | if "a" in mode: | |
400 | raise ValueError( | |
401 | "Appending to an existing file is not supported, because that" | |
402 | " would involve an expensive `copy`-operation to a temporary" | |
403 | " file. Open the file in normal `w`-mode and copy explicitly" | |
404 | " if that's what you're after." | |
405 | ) | |
406 | if "x" in mode: | |
407 | raise ValueError("Use the `overwrite`-parameter instead.") | |
408 | if "w" not in mode: | |
409 | raise ValueError("Atomic writes only make sense with `w`-mode.") | |
410 | ||
411 | # Atomic writes are more complicated. They work by opening a file | |
412 | # as a proxy in the same folder and then using the fdopen | |
413 | # functionality to wrap it in a Python file. Then we wrap it in an | |
414 | # atomic file that moves the file over on close. | |
415 | import errno | |
416 | import random | |
417 | ||
418 | try: | |
419 | perm: t.Optional[int] = os.stat(filename).st_mode | |
420 | except OSError: | |
421 | perm = None | |
422 | ||
423 | flags = os.O_RDWR | os.O_CREAT | os.O_EXCL | |
424 | ||
425 | if binary: | |
426 | flags |= getattr(os, "O_BINARY", 0) | |
427 | ||
428 | while True: | |
429 | tmp_filename = os.path.join( | |
430 | os.path.dirname(filename), | |
431 | f".__atomic-write{random.randrange(1 << 32):08x}", | |
432 | ) | |
433 | try: | |
434 | fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm) | |
435 | break | |
436 | except OSError as e: | |
437 | if e.errno == errno.EEXIST or ( | |
438 | os.name == "nt" | |
439 | and e.errno == errno.EACCES | |
440 | and os.path.isdir(e.filename) | |
441 | and os.access(e.filename, os.W_OK) | |
442 | ): | |
443 | continue | |
444 | raise | |
445 | ||
446 | if perm is not None: | |
447 | os.chmod(tmp_filename, perm) # in case perm includes bits in umask | |
448 | ||
449 | f = _wrap_io_open(fd, mode, encoding, errors) | |
450 | af = _AtomicFile(f, tmp_filename, os.path.realpath(filename)) | |
451 | return t.cast(t.IO[t.Any], af), True | |
452 | ||
453 | ||
454 | class _AtomicFile: | |
455 | def __init__(self, f: t.IO[t.Any], tmp_filename: str, real_filename: str) -> None: | |
456 | self._f = f | |
457 | self._tmp_filename = tmp_filename | |
458 | self._real_filename = real_filename | |
459 | self.closed = False | |
460 | ||
461 | @property | |
462 | def name(self) -> str: | |
463 | return self._real_filename | |
464 | ||
465 | def close(self, delete: bool = False) -> None: | |
466 | if self.closed: | |
467 | return | |
468 | self._f.close() | |
469 | os.replace(self._tmp_filename, self._real_filename) | |
470 | self.closed = True | |
471 | ||
472 | def __getattr__(self, name: str) -> t.Any: | |
473 | return getattr(self._f, name) | |
474 | ||
475 | def __enter__(self) -> "_AtomicFile": | |
476 | return self | |
477 | ||
478 | def __exit__(self, exc_type: t.Optional[t.Type[BaseException]], *_: t.Any) -> None: | |
479 | self.close(delete=exc_type is not None) | |
480 | ||
481 | def __repr__(self) -> str: | |
482 | return repr(self._f) | |
483 | ||
484 | ||
485 | def strip_ansi(value: str) -> str: | |
486 | return _ansi_re.sub("", value) | |
487 | ||
488 | ||
489 | def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool: | |
490 | while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)): | |
491 | stream = stream._stream | |
492 | ||
493 | return stream.__class__.__module__.startswith("ipykernel.") | |
494 | ||
495 | ||
496 | def should_strip_ansi( | |
497 | stream: t.Optional[t.IO[t.Any]] = None, color: t.Optional[bool] = None | |
498 | ) -> bool: | |
499 | if color is None: | |
500 | if stream is None: | |
501 | stream = sys.stdin | |
502 | return not isatty(stream) and not _is_jupyter_kernel_output(stream) | |
503 | return not color | |
504 | ||
505 | ||
506 | # On Windows, wrap the output streams with colorama to support ANSI | |
507 | # color codes. | |
508 | # NOTE: double check is needed so mypy does not analyze this on Linux | |
509 | if sys.platform.startswith("win") and WIN: | |
510 | from ._winconsole import _get_windows_console_stream | |
511 | ||
512 | def _get_argv_encoding() -> str: | |
513 | import locale | |
514 | ||
515 | return locale.getpreferredencoding() | |
516 | ||
517 | _ansi_stream_wrappers: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary() | |
518 | ||
519 | def auto_wrap_for_ansi( # noqa: F811 | |
520 | stream: t.TextIO, color: t.Optional[bool] = None | |
521 | ) -> t.TextIO: | |
522 | """Support ANSI color and style codes on Windows by wrapping a | |
523 | stream with colorama. | |
524 | """ | |
525 | try: | |
526 | cached = _ansi_stream_wrappers.get(stream) | |
527 | except Exception: | |
528 | cached = None | |
529 | ||
530 | if cached is not None: | |
531 | return cached | |
532 | ||
533 | import colorama | |
534 | ||
535 | strip = should_strip_ansi(stream, color) | |
536 | ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip) | |
537 | rv = t.cast(t.TextIO, ansi_wrapper.stream) | |
538 | _write = rv.write | |
539 | ||
540 | def _safe_write(s): | |
541 | try: | |
542 | return _write(s) | |
543 | except BaseException: | |
544 | ansi_wrapper.reset_all() | |
545 | raise | |
546 | ||
547 | rv.write = _safe_write | |
548 | ||
549 | try: | |
550 | _ansi_stream_wrappers[stream] = rv | |
551 | except Exception: | |
552 | pass | |
553 | ||
554 | return rv | |
555 | ||
556 | else: | |
557 | ||
558 | def _get_argv_encoding() -> str: | |
559 | return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding() | |
560 | ||
561 | def _get_windows_console_stream( | |
562 | f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str] | |
563 | ) -> t.Optional[t.TextIO]: | |
564 | return None | |
565 | ||
566 | ||
567 | def term_len(x: str) -> int: | |
568 | return len(strip_ansi(x)) | |
569 | ||
570 | ||
571 | def isatty(stream: t.IO[t.Any]) -> bool: | |
572 | try: | |
573 | return stream.isatty() | |
574 | except Exception: | |
575 | return False | |
576 | ||
577 | ||
578 | def _make_cached_stream_func( | |
579 | src_func: t.Callable[[], t.Optional[t.TextIO]], | |
580 | wrapper_func: t.Callable[[], t.TextIO], | |
581 | ) -> t.Callable[[], t.Optional[t.TextIO]]: | |
582 | cache: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary() | |
583 | ||
584 | def func() -> t.Optional[t.TextIO]: | |
585 | stream = src_func() | |
586 | ||
587 | if stream is None: | |
588 | return None | |
589 | ||
590 | try: | |
591 | rv = cache.get(stream) | |
592 | except Exception: | |
593 | rv = None | |
594 | if rv is not None: | |
595 | return rv | |
596 | rv = wrapper_func() | |
597 | try: | |
598 | cache[stream] = rv | |
599 | except Exception: | |
600 | pass | |
601 | return rv | |
602 | ||
603 | return func | |
604 | ||
605 | ||
606 | _default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin) | |
607 | _default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout) | |
608 | _default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr) | |
609 | ||
610 | ||
611 | binary_streams: t.Mapping[str, t.Callable[[], t.BinaryIO]] = { | |
612 | "stdin": get_binary_stdin, | |
613 | "stdout": get_binary_stdout, | |
614 | "stderr": get_binary_stderr, | |
615 | } | |
616 | ||
617 | text_streams: t.Mapping[ | |
618 | str, t.Callable[[t.Optional[str], t.Optional[str]], t.TextIO] | |
619 | ] = { | |
620 | "stdin": get_text_stdin, | |
621 | "stdout": get_text_stdout, | |
622 | "stderr": get_text_stderr, | |
623 | } |