]> crepu.dev Git - config.git/blob - djavu-asus/emacs/elpy/rpc-venv/lib/python3.11/site-packages/click/_compat.py
Reorganización de directorios
[config.git] / djavu-asus / emacs / elpy / rpc-venv / lib / python3.11 / site-packages / click / _compat.py
1 import codecs
2 import io
3 import os
4 import re
5 import sys
6 import typing as t
7 from weakref import WeakKeyDictionary
8
9 CYGWIN = sys.platform.startswith("cygwin")
10 WIN = sys.platform.startswith("win")
11 auto_wrap_for_ansi: t.Optional[t.Callable[[t.TextIO], t.TextIO]] = None
12 _ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")
13
14
15 def _make_text_stream(
16 stream: t.BinaryIO,
17 encoding: t.Optional[str],
18 errors: t.Optional[str],
19 force_readable: bool = False,
20 force_writable: bool = False,
21 ) -> t.TextIO:
22 if encoding is None:
23 encoding = get_best_encoding(stream)
24 if errors is None:
25 errors = "replace"
26 return _NonClosingTextIOWrapper(
27 stream,
28 encoding,
29 errors,
30 line_buffering=True,
31 force_readable=force_readable,
32 force_writable=force_writable,
33 )
34
35
36 def is_ascii_encoding(encoding: str) -> bool:
37 """Checks if a given encoding is ascii."""
38 try:
39 return codecs.lookup(encoding).name == "ascii"
40 except LookupError:
41 return False
42
43
44 def get_best_encoding(stream: t.IO[t.Any]) -> str:
45 """Returns the default stream encoding if not found."""
46 rv = getattr(stream, "encoding", None) or sys.getdefaultencoding()
47 if is_ascii_encoding(rv):
48 return "utf-8"
49 return rv
50
51
52 class _NonClosingTextIOWrapper(io.TextIOWrapper):
53 def __init__(
54 self,
55 stream: t.BinaryIO,
56 encoding: t.Optional[str],
57 errors: t.Optional[str],
58 force_readable: bool = False,
59 force_writable: bool = False,
60 **extra: t.Any,
61 ) -> None:
62 self._stream = stream = t.cast(
63 t.BinaryIO, _FixupStream(stream, force_readable, force_writable)
64 )
65 super().__init__(stream, encoding, errors, **extra)
66
67 def __del__(self) -> None:
68 try:
69 self.detach()
70 except Exception:
71 pass
72
73 def isatty(self) -> bool:
74 # https://bitbucket.org/pypy/pypy/issue/1803
75 return self._stream.isatty()
76
77
78 class _FixupStream:
79 """The new io interface needs more from streams than streams
80 traditionally implement. As such, this fix-up code is necessary in
81 some circumstances.
82
83 The forcing of readable and writable flags are there because some tools
84 put badly patched objects on sys (one such offender are certain version
85 of jupyter notebook).
86 """
87
88 def __init__(
89 self,
90 stream: t.BinaryIO,
91 force_readable: bool = False,
92 force_writable: bool = False,
93 ):
94 self._stream = stream
95 self._force_readable = force_readable
96 self._force_writable = force_writable
97
98 def __getattr__(self, name: str) -> t.Any:
99 return getattr(self._stream, name)
100
101 def read1(self, size: int) -> bytes:
102 f = getattr(self._stream, "read1", None)
103
104 if f is not None:
105 return t.cast(bytes, f(size))
106
107 return self._stream.read(size)
108
109 def readable(self) -> bool:
110 if self._force_readable:
111 return True
112 x = getattr(self._stream, "readable", None)
113 if x is not None:
114 return t.cast(bool, x())
115 try:
116 self._stream.read(0)
117 except Exception:
118 return False
119 return True
120
121 def writable(self) -> bool:
122 if self._force_writable:
123 return True
124 x = getattr(self._stream, "writable", None)
125 if x is not None:
126 return t.cast(bool, x())
127 try:
128 self._stream.write("") # type: ignore
129 except Exception:
130 try:
131 self._stream.write(b"")
132 except Exception:
133 return False
134 return True
135
136 def seekable(self) -> bool:
137 x = getattr(self._stream, "seekable", None)
138 if x is not None:
139 return t.cast(bool, x())
140 try:
141 self._stream.seek(self._stream.tell())
142 except Exception:
143 return False
144 return True
145
146
147 def _is_binary_reader(stream: t.IO[t.Any], default: bool = False) -> bool:
148 try:
149 return isinstance(stream.read(0), bytes)
150 except Exception:
151 return default
152 # This happens in some cases where the stream was already
153 # closed. In this case, we assume the default.
154
155
156 def _is_binary_writer(stream: t.IO[t.Any], default: bool = False) -> bool:
157 try:
158 stream.write(b"")
159 except Exception:
160 try:
161 stream.write("")
162 return False
163 except Exception:
164 pass
165 return default
166 return True
167
168
169 def _find_binary_reader(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:
170 # We need to figure out if the given stream is already binary.
171 # This can happen because the official docs recommend detaching
172 # the streams to get binary streams. Some code might do this, so
173 # we need to deal with this case explicitly.
174 if _is_binary_reader(stream, False):
175 return t.cast(t.BinaryIO, stream)
176
177 buf = getattr(stream, "buffer", None)
178
179 # Same situation here; this time we assume that the buffer is
180 # actually binary in case it's closed.
181 if buf is not None and _is_binary_reader(buf, True):
182 return t.cast(t.BinaryIO, buf)
183
184 return None
185
186
187 def _find_binary_writer(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:
188 # We need to figure out if the given stream is already binary.
189 # This can happen because the official docs recommend detaching
190 # the streams to get binary streams. Some code might do this, so
191 # we need to deal with this case explicitly.
192 if _is_binary_writer(stream, False):
193 return t.cast(t.BinaryIO, stream)
194
195 buf = getattr(stream, "buffer", None)
196
197 # Same situation here; this time we assume that the buffer is
198 # actually binary in case it's closed.
199 if buf is not None and _is_binary_writer(buf, True):
200 return t.cast(t.BinaryIO, buf)
201
202 return None
203
204
205 def _stream_is_misconfigured(stream: t.TextIO) -> bool:
206 """A stream is misconfigured if its encoding is ASCII."""
207 # If the stream does not have an encoding set, we assume it's set
208 # to ASCII. This appears to happen in certain unittest
209 # environments. It's not quite clear what the correct behavior is
210 # but this at least will force Click to recover somehow.
211 return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii")
212
213
214 def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: t.Optional[str]) -> bool:
215 """A stream attribute is compatible if it is equal to the
216 desired value or the desired value is unset and the attribute
217 has a value.
218 """
219 stream_value = getattr(stream, attr, None)
220 return stream_value == value or (value is None and stream_value is not None)
221
222
223 def _is_compatible_text_stream(
224 stream: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
225 ) -> bool:
226 """Check if a stream's encoding and errors attributes are
227 compatible with the desired values.
228 """
229 return _is_compat_stream_attr(
230 stream, "encoding", encoding
231 ) and _is_compat_stream_attr(stream, "errors", errors)
232
233
234 def _force_correct_text_stream(
235 text_stream: t.IO[t.Any],
236 encoding: t.Optional[str],
237 errors: t.Optional[str],
238 is_binary: t.Callable[[t.IO[t.Any], bool], bool],
239 find_binary: t.Callable[[t.IO[t.Any]], t.Optional[t.BinaryIO]],
240 force_readable: bool = False,
241 force_writable: bool = False,
242 ) -> t.TextIO:
243 if is_binary(text_stream, False):
244 binary_reader = t.cast(t.BinaryIO, text_stream)
245 else:
246 text_stream = t.cast(t.TextIO, text_stream)
247 # If the stream looks compatible, and won't default to a
248 # misconfigured ascii encoding, return it as-is.
249 if _is_compatible_text_stream(text_stream, encoding, errors) and not (
250 encoding is None and _stream_is_misconfigured(text_stream)
251 ):
252 return text_stream
253
254 # Otherwise, get the underlying binary reader.
255 possible_binary_reader = find_binary(text_stream)
256
257 # If that's not possible, silently use the original reader
258 # and get mojibake instead of exceptions.
259 if possible_binary_reader is None:
260 return text_stream
261
262 binary_reader = possible_binary_reader
263
264 # Default errors to replace instead of strict in order to get
265 # something that works.
266 if errors is None:
267 errors = "replace"
268
269 # Wrap the binary stream in a text stream with the correct
270 # encoding parameters.
271 return _make_text_stream(
272 binary_reader,
273 encoding,
274 errors,
275 force_readable=force_readable,
276 force_writable=force_writable,
277 )
278
279
280 def _force_correct_text_reader(
281 text_reader: t.IO[t.Any],
282 encoding: t.Optional[str],
283 errors: t.Optional[str],
284 force_readable: bool = False,
285 ) -> t.TextIO:
286 return _force_correct_text_stream(
287 text_reader,
288 encoding,
289 errors,
290 _is_binary_reader,
291 _find_binary_reader,
292 force_readable=force_readable,
293 )
294
295
296 def _force_correct_text_writer(
297 text_writer: t.IO[t.Any],
298 encoding: t.Optional[str],
299 errors: t.Optional[str],
300 force_writable: bool = False,
301 ) -> t.TextIO:
302 return _force_correct_text_stream(
303 text_writer,
304 encoding,
305 errors,
306 _is_binary_writer,
307 _find_binary_writer,
308 force_writable=force_writable,
309 )
310
311
312 def get_binary_stdin() -> t.BinaryIO:
313 reader = _find_binary_reader(sys.stdin)
314 if reader is None:
315 raise RuntimeError("Was not able to determine binary stream for sys.stdin.")
316 return reader
317
318
319 def get_binary_stdout() -> t.BinaryIO:
320 writer = _find_binary_writer(sys.stdout)
321 if writer is None:
322 raise RuntimeError("Was not able to determine binary stream for sys.stdout.")
323 return writer
324
325
326 def get_binary_stderr() -> t.BinaryIO:
327 writer = _find_binary_writer(sys.stderr)
328 if writer is None:
329 raise RuntimeError("Was not able to determine binary stream for sys.stderr.")
330 return writer
331
332
333 def get_text_stdin(
334 encoding: t.Optional[str] = None, errors: t.Optional[str] = None
335 ) -> t.TextIO:
336 rv = _get_windows_console_stream(sys.stdin, encoding, errors)
337 if rv is not None:
338 return rv
339 return _force_correct_text_reader(sys.stdin, encoding, errors, force_readable=True)
340
341
342 def get_text_stdout(
343 encoding: t.Optional[str] = None, errors: t.Optional[str] = None
344 ) -> t.TextIO:
345 rv = _get_windows_console_stream(sys.stdout, encoding, errors)
346 if rv is not None:
347 return rv
348 return _force_correct_text_writer(sys.stdout, encoding, errors, force_writable=True)
349
350
351 def get_text_stderr(
352 encoding: t.Optional[str] = None, errors: t.Optional[str] = None
353 ) -> t.TextIO:
354 rv = _get_windows_console_stream(sys.stderr, encoding, errors)
355 if rv is not None:
356 return rv
357 return _force_correct_text_writer(sys.stderr, encoding, errors, force_writable=True)
358
359
360 def _wrap_io_open(
361 file: t.Union[str, "os.PathLike[str]", int],
362 mode: str,
363 encoding: t.Optional[str],
364 errors: t.Optional[str],
365 ) -> t.IO[t.Any]:
366 """Handles not passing ``encoding`` and ``errors`` in binary mode."""
367 if "b" in mode:
368 return open(file, mode)
369
370 return open(file, mode, encoding=encoding, errors=errors)
371
372
373 def open_stream(
374 filename: "t.Union[str, os.PathLike[str]]",
375 mode: str = "r",
376 encoding: t.Optional[str] = None,
377 errors: t.Optional[str] = "strict",
378 atomic: bool = False,
379 ) -> t.Tuple[t.IO[t.Any], bool]:
380 binary = "b" in mode
381 filename = os.fspath(filename)
382
383 # Standard streams first. These are simple because they ignore the
384 # atomic flag. Use fsdecode to handle Path("-").
385 if os.fsdecode(filename) == "-":
386 if any(m in mode for m in ["w", "a", "x"]):
387 if binary:
388 return get_binary_stdout(), False
389 return get_text_stdout(encoding=encoding, errors=errors), False
390 if binary:
391 return get_binary_stdin(), False
392 return get_text_stdin(encoding=encoding, errors=errors), False
393
394 # Non-atomic writes directly go out through the regular open functions.
395 if not atomic:
396 return _wrap_io_open(filename, mode, encoding, errors), True
397
398 # Some usability stuff for atomic writes
399 if "a" in mode:
400 raise ValueError(
401 "Appending to an existing file is not supported, because that"
402 " would involve an expensive `copy`-operation to a temporary"
403 " file. Open the file in normal `w`-mode and copy explicitly"
404 " if that's what you're after."
405 )
406 if "x" in mode:
407 raise ValueError("Use the `overwrite`-parameter instead.")
408 if "w" not in mode:
409 raise ValueError("Atomic writes only make sense with `w`-mode.")
410
411 # Atomic writes are more complicated. They work by opening a file
412 # as a proxy in the same folder and then using the fdopen
413 # functionality to wrap it in a Python file. Then we wrap it in an
414 # atomic file that moves the file over on close.
415 import errno
416 import random
417
418 try:
419 perm: t.Optional[int] = os.stat(filename).st_mode
420 except OSError:
421 perm = None
422
423 flags = os.O_RDWR | os.O_CREAT | os.O_EXCL
424
425 if binary:
426 flags |= getattr(os, "O_BINARY", 0)
427
428 while True:
429 tmp_filename = os.path.join(
430 os.path.dirname(filename),
431 f".__atomic-write{random.randrange(1 << 32):08x}",
432 )
433 try:
434 fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm)
435 break
436 except OSError as e:
437 if e.errno == errno.EEXIST or (
438 os.name == "nt"
439 and e.errno == errno.EACCES
440 and os.path.isdir(e.filename)
441 and os.access(e.filename, os.W_OK)
442 ):
443 continue
444 raise
445
446 if perm is not None:
447 os.chmod(tmp_filename, perm) # in case perm includes bits in umask
448
449 f = _wrap_io_open(fd, mode, encoding, errors)
450 af = _AtomicFile(f, tmp_filename, os.path.realpath(filename))
451 return t.cast(t.IO[t.Any], af), True
452
453
454 class _AtomicFile:
455 def __init__(self, f: t.IO[t.Any], tmp_filename: str, real_filename: str) -> None:
456 self._f = f
457 self._tmp_filename = tmp_filename
458 self._real_filename = real_filename
459 self.closed = False
460
461 @property
462 def name(self) -> str:
463 return self._real_filename
464
465 def close(self, delete: bool = False) -> None:
466 if self.closed:
467 return
468 self._f.close()
469 os.replace(self._tmp_filename, self._real_filename)
470 self.closed = True
471
472 def __getattr__(self, name: str) -> t.Any:
473 return getattr(self._f, name)
474
475 def __enter__(self) -> "_AtomicFile":
476 return self
477
478 def __exit__(self, exc_type: t.Optional[t.Type[BaseException]], *_: t.Any) -> None:
479 self.close(delete=exc_type is not None)
480
481 def __repr__(self) -> str:
482 return repr(self._f)
483
484
485 def strip_ansi(value: str) -> str:
486 return _ansi_re.sub("", value)
487
488
489 def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool:
490 while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)):
491 stream = stream._stream
492
493 return stream.__class__.__module__.startswith("ipykernel.")
494
495
496 def should_strip_ansi(
497 stream: t.Optional[t.IO[t.Any]] = None, color: t.Optional[bool] = None
498 ) -> bool:
499 if color is None:
500 if stream is None:
501 stream = sys.stdin
502 return not isatty(stream) and not _is_jupyter_kernel_output(stream)
503 return not color
504
505
506 # On Windows, wrap the output streams with colorama to support ANSI
507 # color codes.
508 # NOTE: double check is needed so mypy does not analyze this on Linux
509 if sys.platform.startswith("win") and WIN:
510 from ._winconsole import _get_windows_console_stream
511
512 def _get_argv_encoding() -> str:
513 import locale
514
515 return locale.getpreferredencoding()
516
517 _ansi_stream_wrappers: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
518
519 def auto_wrap_for_ansi( # noqa: F811
520 stream: t.TextIO, color: t.Optional[bool] = None
521 ) -> t.TextIO:
522 """Support ANSI color and style codes on Windows by wrapping a
523 stream with colorama.
524 """
525 try:
526 cached = _ansi_stream_wrappers.get(stream)
527 except Exception:
528 cached = None
529
530 if cached is not None:
531 return cached
532
533 import colorama
534
535 strip = should_strip_ansi(stream, color)
536 ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip)
537 rv = t.cast(t.TextIO, ansi_wrapper.stream)
538 _write = rv.write
539
540 def _safe_write(s):
541 try:
542 return _write(s)
543 except BaseException:
544 ansi_wrapper.reset_all()
545 raise
546
547 rv.write = _safe_write
548
549 try:
550 _ansi_stream_wrappers[stream] = rv
551 except Exception:
552 pass
553
554 return rv
555
556 else:
557
558 def _get_argv_encoding() -> str:
559 return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding()
560
561 def _get_windows_console_stream(
562 f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
563 ) -> t.Optional[t.TextIO]:
564 return None
565
566
567 def term_len(x: str) -> int:
568 return len(strip_ansi(x))
569
570
571 def isatty(stream: t.IO[t.Any]) -> bool:
572 try:
573 return stream.isatty()
574 except Exception:
575 return False
576
577
578 def _make_cached_stream_func(
579 src_func: t.Callable[[], t.Optional[t.TextIO]],
580 wrapper_func: t.Callable[[], t.TextIO],
581 ) -> t.Callable[[], t.Optional[t.TextIO]]:
582 cache: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
583
584 def func() -> t.Optional[t.TextIO]:
585 stream = src_func()
586
587 if stream is None:
588 return None
589
590 try:
591 rv = cache.get(stream)
592 except Exception:
593 rv = None
594 if rv is not None:
595 return rv
596 rv = wrapper_func()
597 try:
598 cache[stream] = rv
599 except Exception:
600 pass
601 return rv
602
603 return func
604
605
606 _default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin)
607 _default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout)
608 _default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr)
609
610
611 binary_streams: t.Mapping[str, t.Callable[[], t.BinaryIO]] = {
612 "stdin": get_binary_stdin,
613 "stdout": get_binary_stdout,
614 "stderr": get_binary_stderr,
615 }
616
617 text_streams: t.Mapping[
618 str, t.Callable[[t.Optional[str], t.Optional[str]], t.TextIO]
619 ] = {
620 "stdin": get_text_stdin,
621 "stdout": get_text_stdout,
622 "stderr": get_text_stderr,
623 }