]>
crepu.dev Git - config.git/blob - djavu-asus/emacs/elpy/rpc-venv/lib/python3.11/site-packages/click/testing.py
9 from types
import TracebackType
11 from . import formatting
14 from ._compat
import _find_binary_reader
17 from .core
import BaseCommand
21 def __init__(self
, input: t
.BinaryIO
, output
: t
.BinaryIO
) -> None:
26 def __getattr__(self
, x
: str) -> t
.Any
:
27 return getattr(self
._input
, x
)
29 def _echo(self
, rv
: bytes
) -> bytes
:
31 self
._output
.write(rv
)
35 def read(self
, n
: int = -1) -> bytes
:
36 return self
._echo
(self
._input
.read(n
))
38 def read1(self
, n
: int = -1) -> bytes
:
39 return self
._echo
(self
._input
.read1(n
)) # type: ignore
41 def readline(self
, n
: int = -1) -> bytes
:
42 return self
._echo
(self
._input
.readline(n
))
44 def readlines(self
) -> t
.List
[bytes
]:
45 return [self
._echo
(x
) for x
in self
._input
.readlines()]
47 def __iter__(self
) -> t
.Iterator
[bytes
]:
48 return iter(self
._echo
(x
) for x
in self
._input
)
50 def __repr__(self
) -> str:
51 return repr(self
._input
)
54 @contextlib.contextmanager
55 def _pause_echo(stream
: t
.Optional
[EchoingStdin
]) -> t
.Iterator
[None]:
61 stream
._paused
= False
64 class _NamedTextIOWrapper(io
.TextIOWrapper
):
66 self
, buffer: t
.BinaryIO
, name
: str, mode
: str, **kwargs
: t
.Any
68 super().__init
__(buffer, **kwargs
)
73 def name(self
) -> str:
77 def mode(self
) -> str:
81 def make_input_stream(
82 input: t
.Optional
[t
.Union
[str, bytes
, t
.IO
[t
.Any
]]], charset
: str
84 # Is already an input stream.
85 if hasattr(input, "read"):
86 rv
= _find_binary_reader(t
.cast(t
.IO
[t
.Any
], input))
91 raise TypeError("Could not find binary reader for input stream.")
95 elif isinstance(input, str):
96 input = input.encode(charset
)
98 return io
.BytesIO(input)
102 """Holds the captured result of an invoked CLI script."""
108 stderr_bytes
: t
.Optional
[bytes
],
111 exception
: t
.Optional
[BaseException
],
112 exc_info
: t
.Optional
[
113 t
.Tuple
[t
.Type
[BaseException
], BaseException
, TracebackType
]
116 #: The runner that created the result
118 #: The standard output as bytes.
119 self
.stdout_bytes
= stdout_bytes
120 #: The standard error as bytes, or None if not available
121 self
.stderr_bytes
= stderr_bytes
122 #: The value returned from the invoked command.
124 #: .. versionadded:: 8.0
125 self
.return_value
= return_value
126 #: The exit code as integer.
127 self
.exit_code
= exit_code
128 #: The exception that happened if one did.
129 self
.exception
= exception
131 self
.exc_info
= exc_info
134 def output(self
) -> str:
135 """The (standard) output as unicode string."""
139 def stdout(self
) -> str:
140 """The standard output as unicode string."""
141 return self
.stdout_bytes
.decode(self
.runner
.charset
, "replace").replace(
146 def stderr(self
) -> str:
147 """The standard error as unicode string."""
148 if self
.stderr_bytes
is None:
149 raise ValueError("stderr not separately captured")
150 return self
.stderr_bytes
.decode(self
.runner
.charset
, "replace").replace(
154 def __repr__(self
) -> str:
155 exc_str
= repr(self
.exception
) if self
.exception
else "okay"
156 return f
"<{type(self).__name__} {exc_str}>"
160 """The CLI runner provides functionality to invoke a Click command line
161 script for unittesting purposes in a isolated environment. This only
162 works in single-threaded systems without any concurrency as it changes the
163 global interpreter state.
165 :param charset: the character set for the input and output data.
166 :param env: a dictionary with environment variables for overriding.
167 :param echo_stdin: if this is set to `True`, then reading from stdin writes
168 to stdout. This is useful for showing examples in
169 some circumstances. Note that regular prompts
170 will automatically echo the input.
171 :param mix_stderr: if this is set to `False`, then stdout and stderr are
172 preserved as independent streams. This is useful for
173 Unix-philosophy apps that have predictable stdout and
174 noisy stderr, such that each may be measured
180 charset
: str = "utf-8",
181 env
: t
.Optional
[t
.Mapping
[str, t
.Optional
[str]]] = None,
182 echo_stdin
: bool = False,
183 mix_stderr
: bool = True,
185 self
.charset
= charset
186 self
.env
: t
.Mapping
[str, t
.Optional
[str]] = env
or {}
187 self
.echo_stdin
= echo_stdin
188 self
.mix_stderr
= mix_stderr
190 def get_default_prog_name(self
, cli
: "BaseCommand") -> str:
191 """Given a command object it will return the default program name
192 for it. The default is the `name` attribute or ``"root"`` if not
195 return cli
.name
or "root"
198 self
, overrides
: t
.Optional
[t
.Mapping
[str, t
.Optional
[str]]] = None
199 ) -> t
.Mapping
[str, t
.Optional
[str]]:
200 """Returns the environment overrides for invoking a script."""
206 @contextlib.contextmanager
209 input: t
.Optional
[t
.Union
[str, bytes
, t
.IO
[t
.Any
]]] = None,
210 env
: t
.Optional
[t
.Mapping
[str, t
.Optional
[str]]] = None,
212 ) -> t
.Iterator
[t
.Tuple
[io
.BytesIO
, t
.Optional
[io
.BytesIO
]]]:
213 """A context manager that sets up the isolation for invoking of a
214 command line tool. This sets up stdin with the given input data
215 and `os.environ` with the overrides from the given dictionary.
216 This also rebinds some internals in Click to be mocked (like the
217 prompt functionality).
219 This is automatically done in the :meth:`invoke` method.
221 :param input: the input stream to put into sys.stdin.
222 :param env: the environment overrides as dictionary.
223 :param color: whether the output should contain color codes. The
224 application can still override this explicitly.
226 .. versionchanged:: 8.0
227 ``stderr`` is opened with ``errors="backslashreplace"``
228 instead of the default ``"strict"``.
230 .. versionchanged:: 4.0
231 Added the ``color`` parameter.
233 bytes_input
= make_input_stream(input, self
.charset
)
236 old_stdin
= sys
.stdin
237 old_stdout
= sys
.stdout
238 old_stderr
= sys
.stderr
239 old_forced_width
= formatting
.FORCED_WIDTH
240 formatting
.FORCED_WIDTH
= 80
242 env
= self
.make_env(env
)
244 bytes_output
= io
.BytesIO()
247 bytes_input
= echo_input
= t
.cast(
248 t
.BinaryIO
, EchoingStdin(bytes_input
, bytes_output
)
251 sys
.stdin
= text_input
= _NamedTextIOWrapper(
252 bytes_input
, encoding
=self
.charset
, name
="<stdin>", mode
="r"
256 # Force unbuffered reads, otherwise TextIOWrapper reads a
257 # large chunk which is echoed early.
258 text_input
._CHUNK
_SIZE
= 1 # type: ignore
260 sys
.stdout
= _NamedTextIOWrapper(
261 bytes_output
, encoding
=self
.charset
, name
="<stdout>", mode
="w"
266 sys
.stderr
= sys
.stdout
268 bytes_error
= io
.BytesIO()
269 sys
.stderr
= _NamedTextIOWrapper(
271 encoding
=self
.charset
,
274 errors
="backslashreplace",
277 @_pause_echo(echo_input
) # type: ignore
278 def visible_input(prompt
: t
.Optional
[str] = None) -> str:
279 sys
.stdout
.write(prompt
or "")
280 val
= text_input
.readline().rstrip("\r\n")
281 sys
.stdout
.write(f
"{val}\n")
285 @_pause_echo(echo_input
) # type: ignore
286 def hidden_input(prompt
: t
.Optional
[str] = None) -> str:
287 sys
.stdout
.write(f
"{prompt or ''}\n")
289 return text_input
.readline().rstrip("\r\n")
291 @_pause_echo(echo_input
) # type: ignore
292 def _getchar(echo
: bool) -> str:
293 char
= sys
.stdin
.read(1)
296 sys
.stdout
.write(char
)
301 default_color
= color
303 def should_strip_ansi(
304 stream
: t
.Optional
[t
.IO
[t
.Any
]] = None, color
: t
.Optional
[bool] = None
307 return not default_color
310 old_visible_prompt_func
= termui
.visible_prompt_func
311 old_hidden_prompt_func
= termui
.hidden_prompt_func
312 old__getchar_func
= termui
._getchar
313 old_should_strip_ansi
= utils
.should_strip_ansi
# type: ignore
314 termui
.visible_prompt_func
= visible_input
315 termui
.hidden_prompt_func
= hidden_input
316 termui
._getchar
= _getchar
317 utils
.should_strip_ansi
= should_strip_ansi
# type: ignore
321 for key
, value
in env
.items():
322 old_env
[key
] = os
.environ
.get(key
)
329 os
.environ
[key
] = value
330 yield (bytes_output
, bytes_error
)
332 for key
, value
in old_env
.items():
339 os
.environ
[key
] = value
340 sys
.stdout
= old_stdout
341 sys
.stderr
= old_stderr
342 sys
.stdin
= old_stdin
343 termui
.visible_prompt_func
= old_visible_prompt_func
344 termui
.hidden_prompt_func
= old_hidden_prompt_func
345 termui
._getchar
= old__getchar_func
346 utils
.should_strip_ansi
= old_should_strip_ansi
# type: ignore
347 formatting
.FORCED_WIDTH
= old_forced_width
352 args
: t
.Optional
[t
.Union
[str, t
.Sequence
[str]]] = None,
353 input: t
.Optional
[t
.Union
[str, bytes
, t
.IO
[t
.Any
]]] = None,
354 env
: t
.Optional
[t
.Mapping
[str, t
.Optional
[str]]] = None,
355 catch_exceptions
: bool = True,
359 """Invokes a command in an isolated environment. The arguments are
360 forwarded directly to the command line script, the `extra` keyword
361 arguments are passed to the :meth:`~clickpkg.Command.main` function of
364 This returns a :class:`Result` object.
366 :param cli: the command to invoke
367 :param args: the arguments to invoke. It may be given as an iterable
368 or a string. When given as string it will be interpreted
369 as a Unix shell command. More details at
371 :param input: the input data for `sys.stdin`.
372 :param env: the environment overrides.
373 :param catch_exceptions: Whether to catch any other exceptions than
375 :param extra: the keyword arguments to pass to :meth:`main`.
376 :param color: whether the output should contain color codes. The
377 application can still override this explicitly.
379 .. versionchanged:: 8.0
380 The result object has the ``return_value`` attribute with
381 the value returned from the invoked command.
383 .. versionchanged:: 4.0
384 Added the ``color`` parameter.
386 .. versionchanged:: 3.0
387 Added the ``catch_exceptions`` parameter.
389 .. versionchanged:: 3.0
390 The result object has the ``exc_info`` attribute with the
391 traceback if available.
394 with self
.isolation(input=input, env
=env
, color
=color
) as outstreams
:
396 exception
: t
.Optional
[BaseException
] = None
399 if isinstance(args
, str):
400 args
= shlex
.split(args
)
403 prog_name
= extra
.pop("prog_name")
405 prog_name
= self
.get_default_prog_name(cli
)
408 return_value
= cli
.main(args
=args
or (), prog_name
=prog_name
, **extra
)
409 except SystemExit as e
:
410 exc_info
= sys
.exc_info()
411 e_code
= t
.cast(t
.Optional
[t
.Union
[int, t
.Any
]], e
.code
)
419 if not isinstance(e_code
, int):
420 sys
.stdout
.write(str(e_code
))
421 sys
.stdout
.write("\n")
426 except Exception as e
:
427 if not catch_exceptions
:
431 exc_info
= sys
.exc_info()
434 stdout
= outstreams
[0].getvalue()
438 stderr
= outstreams
[1].getvalue() # type: ignore
444 return_value
=return_value
,
447 exc_info
=exc_info
, # type: ignore
450 @contextlib.contextmanager
451 def isolated_filesystem(
452 self
, temp_dir
: t
.Optional
[t
.Union
[str, "os.PathLike[str]"]] = None
453 ) -> t
.Iterator
[str]:
454 """A context manager that creates a temporary directory and
455 changes the current working directory to it. This isolates tests
456 that affect the contents of the CWD to prevent them from
457 interfering with each other.
459 :param temp_dir: Create the temporary directory under this
460 directory. If given, the created directory is not removed
463 .. versionchanged:: 8.0
464 Added the ``temp_dir`` parameter.
467 dt
= tempfile
.mkdtemp(dir=temp_dir
)
478 except OSError: # noqa: B014