7 from weakref
import WeakKeyDictionary
9 CYGWIN
= sys
.platform
.startswith("cygwin")
10 WIN
= sys
.platform
.startswith("win")
11 auto_wrap_for_ansi
: t
.Optional
[t
.Callable
[[t
.TextIO
], t
.TextIO
]] = None
12 _ansi_re
= re
.compile(r
"\033\[[;?0-9]*[a-zA-Z]")
15 def _make_text_stream(
17 encoding
: t
.Optional
[str],
18 errors
: t
.Optional
[str],
19 force_readable
: bool = False,
20 force_writable
: bool = False,
23 encoding
= get_best_encoding(stream
)
26 return _NonClosingTextIOWrapper(
31 force_readable
=force_readable
,
32 force_writable
=force_writable
,
36 def is_ascii_encoding(encoding
: str) -> bool:
37 """Checks if a given encoding is ascii."""
39 return codecs
.lookup(encoding
).name
== "ascii"
44 def get_best_encoding(stream
: t
.IO
[t
.Any
]) -> str:
45 """Returns the default stream encoding if not found."""
46 rv
= getattr(stream
, "encoding", None) or sys
.getdefaultencoding()
47 if is_ascii_encoding(rv
):
52 class _NonClosingTextIOWrapper(io
.TextIOWrapper
):
56 encoding
: t
.Optional
[str],
57 errors
: t
.Optional
[str],
58 force_readable
: bool = False,
59 force_writable
: bool = False,
62 self
._stream
= stream
= t
.cast(
63 t
.BinaryIO
, _FixupStream(stream
, force_readable
, force_writable
)
65 super().__init
__(stream
, encoding
, errors
, **extra
)
67 def __del__(self
) -> None:
def isatty(self) -> bool:
    """Report whether the wrapped stream says it is a terminal.

    The answer is taken from ``self._stream`` rather than from this
    wrapper.
    """
    # https://bitbucket.org/pypy/pypy/issue/1803
    wrapped = self._stream
    return wrapped.isatty()
79 """The new io interface needs more from streams than streams
80 traditionally implement. As such, this fix-up code is necessary in
83 The forcing of readable and writable flags are there because some tools
84 put badly patched objects on sys (one such offender are certain version
91 force_readable
: bool = False,
92 force_writable
: bool = False,
95 self
._force
_readable
= force_readable
96 self
._force
_writable
= force_writable
98 def __getattr__(self
, name
: str) -> t
.Any
:
99 return getattr(self
._stream
, name
)
101 def read1(self
, size
: int) -> bytes
:
102 f
= getattr(self
._stream
, "read1", None)
105 return t
.cast(bytes
, f(size
))
107 return self
._stream
.read(size
)
109 def readable(self
) -> bool:
110 if self
._force
_readable
:
112 x
= getattr(self
._stream
, "readable", None)
114 return t
.cast(bool, x())
121 def writable(self
) -> bool:
122 if self
._force
_writable
:
124 x
= getattr(self
._stream
, "writable", None)
126 return t
.cast(bool, x())
128 self
._stream
.write("") # type: ignore
131 self
._stream
.write(b
"")
136 def seekable(self
) -> bool:
137 x
= getattr(self
._stream
, "seekable", None)
139 return t
.cast(bool, x())
141 self
._stream
.seek(self
._stream
.tell())
147 def _is_binary_reader(stream
: t
.IO
[t
.Any
], default
: bool = False) -> bool:
149 return isinstance(stream
.read(0), bytes
)
152 # This happens in some cases where the stream was already
153 # closed. In this case, we assume the default.
156 def _is_binary_writer(stream
: t
.IO
[t
.Any
], default
: bool = False) -> bool:
def _find_binary_reader(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:
    """Locate a binary reader for ``stream``.

    Returns ``stream`` itself when it already reads bytes, otherwise its
    ``buffer`` attribute when that is judged to be binary, otherwise
    ``None``.
    """
    # The stream may already be binary: the official docs recommend
    # detaching the streams to get binary streams, and some code does
    # exactly that, so this case is handled explicitly.
    if _is_binary_reader(stream, False):
        return t.cast(t.BinaryIO, stream)

    underlying = getattr(stream, "buffer", None)
    if underlying is None:
        return None

    # Same situation as above, except that a buffer which cannot be
    # probed (e.g. because it is closed) is assumed to be binary.
    if _is_binary_reader(underlying, True):
        return t.cast(t.BinaryIO, underlying)

    return None
def _find_binary_writer(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:
    """Locate a binary writer for ``stream``.

    Returns ``stream`` itself when it already accepts bytes, otherwise its
    ``buffer`` attribute when that is judged to be binary, otherwise
    ``None``.
    """
    # The stream may already be binary: the official docs recommend
    # detaching the streams to get binary streams, and some code does
    # exactly that, so this case is handled explicitly.
    if _is_binary_writer(stream, False):
        return t.cast(t.BinaryIO, stream)

    underlying = getattr(stream, "buffer", None)
    if underlying is None:
        return None

    # Same situation as above, except that a buffer which cannot be
    # probed (e.g. because it is closed) is assumed to be binary.
    if _is_binary_writer(underlying, True):
        return t.cast(t.BinaryIO, underlying)

    return None
def _stream_is_misconfigured(stream: t.TextIO) -> bool:
    """Return whether ``stream`` is misconfigured, i.e. its encoding is ASCII."""
    declared = getattr(stream, "encoding", None)

    # A stream without an encoding is treated as if it were ASCII. This
    # appears to happen in certain unittest environments. It's not quite
    # clear what the correct behavior is, but this at least forces Click
    # to recover somehow.
    if not declared:
        declared = "ascii"

    return is_ascii_encoding(declared)
214 def _is_compat_stream_attr(stream
: t
.TextIO
, attr
: str, value
: t
.Optional
[str]) -> bool:
215 """A stream attribute is compatible if it is equal to the
216 desired value or the desired value is unset and the attribute
219 stream_value
= getattr(stream
, attr
, None)
220 return stream_value
== value
or (value
is None and stream_value
is not None)
223 def _is_compatible_text_stream(
224 stream
: t
.TextIO
, encoding
: t
.Optional
[str], errors
: t
.Optional
[str]
226 """Check if a stream's encoding and errors attributes are
227 compatible with the desired values.
229 return _is_compat_stream_attr(
230 stream
, "encoding", encoding
231 ) and _is_compat_stream_attr(stream
, "errors", errors
)
234 def _force_correct_text_stream(
235 text_stream
: t
.IO
[t
.Any
],
236 encoding
: t
.Optional
[str],
237 errors
: t
.Optional
[str],
238 is_binary
: t
.Callable
[[t
.IO
[t
.Any
], bool], bool],
239 find_binary
: t
.Callable
[[t
.IO
[t
.Any
]], t
.Optional
[t
.BinaryIO
]],
240 force_readable
: bool = False,
241 force_writable
: bool = False,
243 if is_binary(text_stream
, False):
244 binary_reader
= t
.cast(t
.BinaryIO
, text_stream
)
246 text_stream
= t
.cast(t
.TextIO
, text_stream
)
247 # If the stream looks compatible, and won't default to a
248 # misconfigured ascii encoding, return it as-is.
249 if _is_compatible_text_stream(text_stream
, encoding
, errors
) and not (
250 encoding
is None and _stream_is_misconfigured(text_stream
)
254 # Otherwise, get the underlying binary reader.
255 possible_binary_reader
= find_binary(text_stream
)
257 # If that's not possible, silently use the original reader
258 # and get mojibake instead of exceptions.
259 if possible_binary_reader
is None:
262 binary_reader
= possible_binary_reader
264 # Default errors to replace instead of strict in order to get
265 # something that works.
269 # Wrap the binary stream in a text stream with the correct
270 # encoding parameters.
271 return _make_text_stream(
275 force_readable
=force_readable
,
276 force_writable
=force_writable
,
280 def _force_correct_text_reader(
281 text_reader
: t
.IO
[t
.Any
],
282 encoding
: t
.Optional
[str],
283 errors
: t
.Optional
[str],
284 force_readable
: bool = False,
286 return _force_correct_text_stream(
292 force_readable
=force_readable
,
296 def _force_correct_text_writer(
297 text_writer
: t
.IO
[t
.Any
],
298 encoding
: t
.Optional
[str],
299 errors
: t
.Optional
[str],
300 force_writable
: bool = False,
302 return _force_correct_text_stream(
308 force_writable
=force_writable
,
312 def get_binary_stdin() -> t
.BinaryIO
:
313 reader
= _find_binary_reader(sys
.stdin
)
315 raise RuntimeError("Was not able to determine binary stream for sys.stdin.")
319 def get_binary_stdout() -> t
.BinaryIO
:
320 writer
= _find_binary_writer(sys
.stdout
)
322 raise RuntimeError("Was not able to determine binary stream for sys.stdout.")
326 def get_binary_stderr() -> t
.BinaryIO
:
327 writer
= _find_binary_writer(sys
.stderr
)
329 raise RuntimeError("Was not able to determine binary stream for sys.stderr.")
334 encoding
: t
.Optional
[str] = None, errors
: t
.Optional
[str] = None
336 rv
= _get_windows_console_stream(sys
.stdin
, encoding
, errors
)
339 return _force_correct_text_reader(sys
.stdin
, encoding
, errors
, force_readable
=True)
343 encoding
: t
.Optional
[str] = None, errors
: t
.Optional
[str] = None
345 rv
= _get_windows_console_stream(sys
.stdout
, encoding
, errors
)
348 return _force_correct_text_writer(sys
.stdout
, encoding
, errors
, force_writable
=True)
352 encoding
: t
.Optional
[str] = None, errors
: t
.Optional
[str] = None
354 rv
= _get_windows_console_stream(sys
.stderr
, encoding
, errors
)
357 return _force_correct_text_writer(sys
.stderr
, encoding
, errors
, force_writable
=True)
361 file: t
.Union
[str, "os.PathLike[str]", int],
363 encoding
: t
.Optional
[str],
364 errors
: t
.Optional
[str],
366 """Handles not passing ``encoding`` and ``errors`` in binary mode."""
368 return open(file, mode
)
370 return open(file, mode
, encoding
=encoding
, errors
=errors
)
374 filename
: "t.Union[str, os.PathLike[str]]",
376 encoding
: t
.Optional
[str] = None,
377 errors
: t
.Optional
[str] = "strict",
378 atomic
: bool = False,
379 ) -> t
.Tuple
[t
.IO
[t
.Any
], bool]:
381 filename
= os
.fspath(filename
)
383 # Standard streams first. These are simple because they ignore the
384 # atomic flag. Use fsdecode to handle Path("-").
385 if os
.fsdecode(filename
) == "-":
386 if any(m
in mode
for m
in ["w", "a", "x"]):
388 return get_binary_stdout(), False
389 return get_text_stdout(encoding
=encoding
, errors
=errors
), False
391 return get_binary_stdin(), False
392 return get_text_stdin(encoding
=encoding
, errors
=errors
), False
394 # Non-atomic writes directly go out through the regular open functions.
396 return _wrap_io_open(filename
, mode
, encoding
, errors
), True
398 # Some usability stuff for atomic writes
401 "Appending to an existing file is not supported, because that"
402 " would involve an expensive `copy`-operation to a temporary"
403 " file. Open the file in normal `w`-mode and copy explicitly"
404 " if that's what you're after."
407 raise ValueError("Use the `overwrite`-parameter instead.")
409 raise ValueError("Atomic writes only make sense with `w`-mode.")
411 # Atomic writes are more complicated. They work by opening a file
412 # as a proxy in the same folder and then using the fdopen
413 # functionality to wrap it in a Python file. Then we wrap it in an
414 # atomic file that moves the file over on close.
419 perm
: t
.Optional
[int] = os
.stat(filename
).st_mode
423 flags
= os
.O_RDWR | os
.O_CREAT | os
.O_EXCL
426 flags |
= getattr(os
, "O_BINARY", 0)
429 tmp_filename
= os
.path
.join(
430 os
.path
.dirname(filename
),
431 f
".__atomic-write{random.randrange(1 << 32):08x}",
434 fd
= os
.open(tmp_filename
, flags
, 0o666 if perm
is None else perm
)
437 if e
.errno
== errno
.EEXIST
or (
439 and e
.errno
== errno
.EACCES
440 and os
.path
.isdir(e
.filename
)
441 and os
.access(e
.filename
, os
.W_OK
)
447 os
.chmod(tmp_filename
, perm
) # in case perm includes bits in umask
449 f
= _wrap_io_open(fd
, mode
, encoding
, errors
)
450 af
= _AtomicFile(f
, tmp_filename
, os
.path
.realpath(filename
))
451 return t
.cast(t
.IO
[t
.Any
], af
), True
455 def __init__(self
, f
: t
.IO
[t
.Any
], tmp_filename
: str, real_filename
: str) -> None:
457 self
._tmp
_filename
= tmp_filename
458 self
._real
_filename
= real_filename
def name(self) -> str:
    """Return the stored real filename (``self._real_filename``)."""
    real_path = self._real_filename
    return real_path
465 def close(self
, delete
: bool = False) -> None:
469 os
.replace(self
._tmp
_filename
, self
._real
_filename
)
472 def __getattr__(self
, name
: str) -> t
.Any
:
473 return getattr(self
._f
, name
)
475 def __enter__(self
) -> "_AtomicFile":
478 def __exit__(self
, exc_type
: t
.Optional
[t
.Type
[BaseException
]], *_
: t
.Any
) -> None:
479 self
.close(delete
=exc_type
is not None)
481 def __repr__(self
) -> str:
def strip_ansi(value: str) -> str:
    """Return ``value`` with every match of ``_ansi_re`` removed."""
    cleaned = _ansi_re.sub("", value)
    return cleaned
def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool:
    """Return whether ``stream`` is ultimately an ``ipykernel`` object.

    Any ``_FixupStream`` / ``_NonClosingTextIOWrapper`` layers are peeled
    off through their ``_stream`` attribute first. The innermost object's
    class must live in a module whose name starts with ``"ipykernel."``.
    """
    wrapper_types = (_FixupStream, _NonClosingTextIOWrapper)
    innermost = stream
    while isinstance(innermost, wrapper_types):
        innermost = innermost._stream

    module_name = innermost.__class__.__module__
    return module_name.startswith("ipykernel.")
496 def should_strip_ansi(
497 stream
: t
.Optional
[t
.IO
[t
.Any
]] = None, color
: t
.Optional
[bool] = None
502 return not isatty(stream
) and not _is_jupyter_kernel_output(stream
)
506 # On Windows, wrap the output streams with colorama to support ANSI
508 # NOTE: double check is needed so mypy does not analyze this on Linux
509 if sys
.platform
.startswith("win") and WIN
:
510 from ._winconsole
import _get_windows_console_stream
512 def _get_argv_encoding() -> str:
515 return locale
.getpreferredencoding()
517 _ansi_stream_wrappers
: t
.MutableMapping
[t
.TextIO
, t
.TextIO
] = WeakKeyDictionary()
519 def auto_wrap_for_ansi( # noqa: F811
520 stream
: t
.TextIO
, color
: t
.Optional
[bool] = None
522 """Support ANSI color and style codes on Windows by wrapping a
523 stream with colorama.
526 cached
= _ansi_stream_wrappers
.get(stream
)
530 if cached
is not None:
535 strip
= should_strip_ansi(stream
, color
)
536 ansi_wrapper
= colorama
.AnsiToWin32(stream
, strip
=strip
)
537 rv
= t
.cast(t
.TextIO
, ansi_wrapper
.stream
)
543 except BaseException
:
544 ansi_wrapper
.reset_all()
547 rv
.write
= _safe_write
550 _ansi_stream_wrappers
[stream
] = rv
558 def _get_argv_encoding() -> str:
559 return getattr(sys
.stdin
, "encoding", None) or sys
.getfilesystemencoding()
561 def _get_windows_console_stream(
562 f
: t
.TextIO
, encoding
: t
.Optional
[str], errors
: t
.Optional
[str]
563 ) -> t
.Optional
[t
.TextIO
]:
def term_len(x: str) -> int:
    """Return the length of ``x`` after ``strip_ansi`` has been applied."""
    visible_text = strip_ansi(x)
    return len(visible_text)
571 def isatty(stream
: t
.IO
[t
.Any
]) -> bool:
573 return stream
.isatty()
578 def _make_cached_stream_func(
579 src_func
: t
.Callable
[[], t
.Optional
[t
.TextIO
]],
580 wrapper_func
: t
.Callable
[[], t
.TextIO
],
581 ) -> t
.Callable
[[], t
.Optional
[t
.TextIO
]]:
582 cache
: t
.MutableMapping
[t
.TextIO
, t
.TextIO
] = WeakKeyDictionary()
584 def func() -> t
.Optional
[t
.TextIO
]:
591 rv
= cache
.get(stream
)
# Cached accessors for the default text streams. Each source is a lambda
# so that ``sys.stdin`` / ``sys.stdout`` / ``sys.stderr`` is looked up when
# the source callable is called, not once when this module is imported.
_default_text_stdin = _make_cached_stream_func(
    src_func=lambda: sys.stdin, wrapper_func=get_text_stdin
)
_default_text_stdout = _make_cached_stream_func(
    src_func=lambda: sys.stdout, wrapper_func=get_text_stdout
)
_default_text_stderr = _make_cached_stream_func(
    src_func=lambda: sys.stderr, wrapper_func=get_text_stderr
)
611 binary_streams
: t
.Mapping
[str, t
.Callable
[[], t
.BinaryIO
]] = {
612 "stdin": get_binary_stdin
,
613 "stdout": get_binary_stdout
,
614 "stderr": get_binary_stderr
,
617 text_streams
: t
.Mapping
[
618 str, t
.Callable
[[t
.Optional
[str], t
.Optional
[str]], t
.TextIO
]
620 "stdin": get_text_stdin
,
621 "stdout": get_text_stdout
,
622 "stderr": get_text_stderr
,