]>
crepu.dev Git - config.git/blob - djavu-asus/emacs/elpy/rpc-venv/lib/python3.11/site-packages/click/_winconsole.py
6b20df315b23ecd1e3d0ec32c11c0b5ced577efe
1 # This module is based on the excellent work by Adam Bartoš who
2 # provided a lot of what went into the implementation here in
3 # the discussion to issue1602 in the Python bug tracker.
5 # There are some general differences in regards to how this works
6 # compared to the original patches as we do not need to patch
7 # the entire interpreter but just work in our little world of
13 from ctypes
import byref
14 from ctypes
import c_char
15 from ctypes
import c_char_p
16 from ctypes
import c_int
17 from ctypes
import c_ssize_t
18 from ctypes
import c_ulong
19 from ctypes
import c_void_p
20 from ctypes
import POINTER
21 from ctypes
import py_object
22 from ctypes
import Structure
23 from ctypes
.wintypes
import DWORD
24 from ctypes
.wintypes
import HANDLE
25 from ctypes
.wintypes
import LPCWSTR
26 from ctypes
.wintypes
import LPWSTR
28 from ._compat
import _NonClosingTextIOWrapper
30 assert sys
.platform
== "win32"
31 import msvcrt
# noqa: E402
32 from ctypes
import windll
# noqa: E402
33 from ctypes
import WINFUNCTYPE
# noqa: E402
35 c_ssize_p
= POINTER(c_ssize_t
)
37 kernel32
= windll
.kernel32
38 GetStdHandle
= kernel32
.GetStdHandle
39 ReadConsoleW
= kernel32
.ReadConsoleW
40 WriteConsoleW
= kernel32
.WriteConsoleW
41 GetConsoleMode
= kernel32
.GetConsoleMode
42 GetLastError
= kernel32
.GetLastError
43 GetCommandLineW
= WINFUNCTYPE(LPWSTR
)(("GetCommandLineW", windll
.kernel32
))
44 CommandLineToArgvW
= WINFUNCTYPE(POINTER(LPWSTR
), LPCWSTR
, POINTER(c_int
))(
45 ("CommandLineToArgvW", windll
.shell32
)
47 LocalFree
= WINFUNCTYPE(c_void_p
, c_void_p
)(("LocalFree", windll
.kernel32
))
49 STDIN_HANDLE
= GetStdHandle(-10)
50 STDOUT_HANDLE
= GetStdHandle(-11)
51 STDERR_HANDLE
= GetStdHandle(-12)
57 ERROR_NOT_ENOUGH_MEMORY
= 8
58 ERROR_OPERATION_ABORTED
= 995
65 MAX_BYTES_WRITTEN
= 32767
68 from ctypes
import pythonapi
70 # On PyPy we cannot get buffers so our ability to operate here is
75 class Py_buffer(Structure
):
80 ("itemsize", c_ssize_t
),
85 ("strides", c_ssize_p
),
86 ("suboffsets", c_ssize_p
),
87 ("internal", c_void_p
),
90 PyObject_GetBuffer
= pythonapi
.PyObject_GetBuffer
91 PyBuffer_Release
= pythonapi
.PyBuffer_Release
93 def get_buffer(obj
, writable
=False):
95 flags
= PyBUF_WRITABLE
if writable
else PyBUF_SIMPLE
96 PyObject_GetBuffer(py_object(obj
), byref(buf
), flags
)
99 buffer_type
= c_char
* buf
.len
100 return buffer_type
.from_address(buf
.buf
)
102 PyBuffer_Release(byref(buf
))
105 class _WindowsConsoleRawIOBase(io
.RawIOBase
):
106 def __init__(self
, handle
):
114 class _WindowsConsoleReader(_WindowsConsoleRawIOBase
):
118 def readinto(self
, b
):
119 bytes_to_be_read
= len(b
)
120 if not bytes_to_be_read
:
122 elif bytes_to_be_read
% 2:
124 "cannot read odd number of bytes from UTF-16-LE encoded console"
127 buffer = get_buffer(b
, writable
=True)
128 code_units_to_be_read
= bytes_to_be_read
// 2
129 code_units_read
= c_ulong()
134 code_units_to_be_read
,
135 byref(code_units_read
),
138 if GetLastError() == ERROR_OPERATION_ABORTED
:
139 # wait for KeyboardInterrupt
142 raise OSError(f
"Windows error: {GetLastError()}")
146 return 2 * code_units_read
.value
149 class _WindowsConsoleWriter(_WindowsConsoleRawIOBase
):
154 def _get_error_message(errno
):
155 if errno
== ERROR_SUCCESS
:
156 return "ERROR_SUCCESS"
157 elif errno
== ERROR_NOT_ENOUGH_MEMORY
:
158 return "ERROR_NOT_ENOUGH_MEMORY"
159 return f
"Windows error {errno}"
162 bytes_to_be_written
= len(b
)
164 code_units_to_be_written
= min(bytes_to_be_written
, MAX_BYTES_WRITTEN
) // 2
165 code_units_written
= c_ulong()
170 code_units_to_be_written
,
171 byref(code_units_written
),
174 bytes_written
= 2 * code_units_written
.value
176 if bytes_written
== 0 and bytes_to_be_written
> 0:
177 raise OSError(self
._get
_error
_message
(GetLastError()))
182 def __init__(self
, text_stream
: t
.TextIO
, byte_stream
: t
.BinaryIO
) -> None:
183 self
._text
_stream
= text_stream
184 self
.buffer = byte_stream
187 def name(self
) -> str:
188 return self
.buffer.name
190 def write(self
, x
: t
.AnyStr
) -> int:
191 if isinstance(x
, str):
192 return self
._text
_stream
.write(x
)
197 return self
.buffer.write(x
)
199 def writelines(self
, lines
: t
.Iterable
[t
.AnyStr
]) -> None:
203 def __getattr__(self
, name
: str) -> t
.Any
:
204 return getattr(self
._text
_stream
, name
)
206 def isatty(self
) -> bool:
207 return self
.buffer.isatty()
210 return f
"<ConsoleStream name={self.name!r} encoding={self.encoding!r}>"
213 def _get_text_stdin(buffer_stream
: t
.BinaryIO
) -> t
.TextIO
:
214 text_stream
= _NonClosingTextIOWrapper(
215 io
.BufferedReader(_WindowsConsoleReader(STDIN_HANDLE
)),
220 return t
.cast(t
.TextIO
, ConsoleStream(text_stream
, buffer_stream
))
223 def _get_text_stdout(buffer_stream
: t
.BinaryIO
) -> t
.TextIO
:
224 text_stream
= _NonClosingTextIOWrapper(
225 io
.BufferedWriter(_WindowsConsoleWriter(STDOUT_HANDLE
)),
230 return t
.cast(t
.TextIO
, ConsoleStream(text_stream
, buffer_stream
))
233 def _get_text_stderr(buffer_stream
: t
.BinaryIO
) -> t
.TextIO
:
234 text_stream
= _NonClosingTextIOWrapper(
235 io
.BufferedWriter(_WindowsConsoleWriter(STDERR_HANDLE
)),
240 return t
.cast(t
.TextIO
, ConsoleStream(text_stream
, buffer_stream
))
243 _stream_factories
: t
.Mapping
[int, t
.Callable
[[t
.BinaryIO
], t
.TextIO
]] = {
250 def _is_console(f
: t
.TextIO
) -> bool:
251 if not hasattr(f
, "fileno"):
256 except (OSError, io
.UnsupportedOperation
):
259 handle
= msvcrt
.get_osfhandle(fileno
)
260 return bool(GetConsoleMode(handle
, byref(DWORD())))
263 def _get_windows_console_stream(
264 f
: t
.TextIO
, encoding
: t
.Optional
[str], errors
: t
.Optional
[str]
265 ) -> t
.Optional
[t
.TextIO
]:
267 get_buffer
is not None
268 and encoding
in {"utf-16-le", None}
269 and errors
in {"strict", None}
272 func
= _stream_factories
.get(f
.fileno())
274 b
= getattr(f
, "buffer", None)