]>
crepu.dev Git - config.git/blob - djavu-asus/elpy/rpc-venv/lib/python3.11/site-packages/click/_termui_impl.py
2 This module contains implementations for the termui module. To keep the
3 import time of Click down, some infrequently used functionality is
4 placed in this module and only imported as needed.
12 from gettext
import gettext
as _
13 from io
import StringIO
14 from types
import TracebackType
16 from ._compat
import _default_text_stdout
17 from ._compat
import CYGWIN
18 from ._compat
import get_best_encoding
19 from ._compat
import isatty
20 from ._compat
import open_stream
21 from ._compat
import strip_ansi
22 from ._compat
import term_len
23 from ._compat
import WIN
24 from .exceptions
import ClickException
25 from .utils
import echo
33 BEFORE_BAR
= "\r\033[?25l"
34 AFTER_BAR
= "\033[?25h\n"
37 class ProgressBar(t
.Generic
[V
]):
40 iterable
: t
.Optional
[t
.Iterable
[V
]],
41 length
: t
.Optional
[int] = None,
43 empty_char
: str = " ",
44 bar_template
: str = "%(bar)s",
46 show_eta
: bool = True,
47 show_percent
: t
.Optional
[bool] = None,
48 show_pos
: bool = False,
49 item_show_func
: t
.Optional
[t
.Callable
[[t
.Optional
[V
]], t
.Optional
[str]]] = None,
50 label
: t
.Optional
[str] = None,
51 file: t
.Optional
[t
.TextIO
] = None,
52 color
: t
.Optional
[bool] = None,
53 update_min_steps
: int = 1,
56 self
.fill_char
= fill_char
57 self
.empty_char
= empty_char
58 self
.bar_template
= bar_template
59 self
.info_sep
= info_sep
60 self
.show_eta
= show_eta
61 self
.show_percent
= show_percent
62 self
.show_pos
= show_pos
63 self
.item_show_func
= item_show_func
64 self
.label
: str = label
or ""
67 file = _default_text_stdout()
69 # There are no standard streams attached to write to. For example,
76 self
.update_min_steps
= update_min_steps
77 self
._completed
_intervals
= 0
78 self
.width
: int = width
79 self
.autowidth
: bool = width
== 0
82 from operator
import length_hint
84 length
= length_hint(iterable
, -1)
90 raise TypeError("iterable or length is required")
91 iterable
= t
.cast(t
.Iterable
[V
], range(length
))
92 self
.iter: t
.Iterable
[V
] = iter(iterable
)
95 self
.avg
: t
.List
[float] = []
98 self
.start
= self
.last_eta
= time
.time()
99 self
.eta_known
: bool = False
100 self
.finished
: bool = False
101 self
.max_width
: t
.Optional
[int] = None
102 self
.entered
: bool = False
103 self
.current_item
: t
.Optional
[V
] = None
104 self
.is_hidden
: bool = not isatty(self
.file)
105 self
._last
_line
: t
.Optional
[str] = None
107 def __enter__(self
) -> "ProgressBar[V]":
109 self
.render_progress()
114 exc_type
: t
.Optional
[t
.Type
[BaseException
]],
115 exc_value
: t
.Optional
[BaseException
],
116 tb
: t
.Optional
[TracebackType
],
120 def __iter__(self
) -> t
.Iterator
[V
]:
122 raise RuntimeError("You need to use progress bars in a with block.")
123 self
.render_progress()
124 return self
.generator()
def __next__(self) -> "V":
    """Return the next item of the progress bar's iteration.

    Iteration is defined by the generator that ``iter(self)`` hands
    back, so ``next()`` requests one on every call and advances it a
    single step.

    :return: The next item produced by iterating over this object.
    :raises StopIteration: When the iteration has no further items.
    """
    # A new generator per call is intentional: each one consumes the
    # shared ``self.iter``, which makes this re-entry safe, so
    # successive calls keep moving forward instead of starting over.
    return next(iter(self))
134 def render_finish(self
) -> None:
137 self
.file.write(AFTER_BAR
)
141 def pct(self
) -> float:
144 return min(self
.pos
/ (float(self
.length
or 1) or 1), 1.0)
147 def time_per_iteration(self
) -> float:
150 return sum(self
.avg
) / float(len(self
.avg
))
153 def eta(self
) -> float:
154 if self
.length
is not None and not self
.finished
:
155 return self
.time_per_iteration
* (self
.length
- self
.pos
)
158 def format_eta(self
) -> str:
168 return f
"{t}d {hours:02}:{minutes:02}:{seconds:02}"
170 return f
"{hours:02}:{minutes:02}:{seconds:02}"
173 def format_pos(self
) -> str:
175 if self
.length
is not None:
176 pos
+= f
"/{self.length}"
def format_pct(self) -> str:
    """Return the completed fraction as a whole-number percentage.

    ``self.pct`` is multiplied by 100 and truncated to an integer; the
    result is right-aligned in three columns and followed by ``%``,
    for example ``"  7%"``, ``" 50%"`` or ``"100%"``.

    :return: The formatted percentage text.
    """
    percent = int(self.pct * 100)
    # The `` 4`` spec pads to four columns and reserves a leading sign
    # column (a space for non-negative values); dropping that first
    # character leaves the number right-aligned in three columns.
    return f"{percent: 4}%"[1:]
182 def format_bar(self
) -> str:
183 if self
.length
is not None:
184 bar_length
= int(self
.pct
* self
.width
)
185 bar
= self
.fill_char
* bar_length
186 bar
+= self
.empty_char
* (self
.width
- bar_length
)
188 bar
= self
.fill_char
* self
.width
190 chars
= list(self
.empty_char
* (self
.width
or 1))
191 if self
.time_per_iteration
!= 0:
194 (math
.cos(self
.pos
* self
.time_per_iteration
) / 2.0 + 0.5)
201 def format_progress_line(self
) -> str:
202 show_percent
= self
.show_percent
205 if self
.length
is not None and show_percent
is None:
206 show_percent
= not self
.show_pos
209 info_bits
.append(self
.format_pos())
211 info_bits
.append(self
.format_pct())
212 if self
.show_eta
and self
.eta_known
and not self
.finished
:
213 info_bits
.append(self
.format_eta())
214 if self
.item_show_func
is not None:
215 item_info
= self
.item_show_func(self
.current_item
)
216 if item_info
is not None:
217 info_bits
.append(item_info
)
223 "bar": self
.format_bar(),
224 "info": self
.info_sep
.join(info_bits
),
228 def render_progress(self
) -> None:
232 # Only output the label as it changes if the output is not a
233 # TTY. Use file=stderr if you expect to be piping stdout.
234 if self
._last
_line
!= self
.label
:
235 self
._last
_line
= self
.label
236 echo(self
.label
, file=self
.file, color
=self
.color
)
241 # Update width in case the terminal has been resized
243 old_width
= self
.width
245 clutter_length
= term_len(self
.format_progress_line())
246 new_width
= max(0, shutil
.get_terminal_size().columns
- clutter_length
)
247 if new_width
< old_width
:
248 buf
.append(BEFORE_BAR
)
249 buf
.append(" " * self
.max_width
) # type: ignore
250 self
.max_width
= new_width
251 self
.width
= new_width
253 clear_width
= self
.width
254 if self
.max_width
is not None:
255 clear_width
= self
.max_width
257 buf
.append(BEFORE_BAR
)
258 line
= self
.format_progress_line()
259 line_len
= term_len(line
)
260 if self
.max_width
is None or self
.max_width
< line_len
:
261 self
.max_width
= line_len
264 buf
.append(" " * (clear_width
- line_len
))
266 # Render the line only if it changed.
268 if line
!= self
._last
_line
:
269 self
._last
_line
= line
270 echo(line
, file=self
.file, color
=self
.color
, nl
=False)
273 def make_step(self
, n_steps
: int) -> None:
275 if self
.length
is not None and self
.pos
>= self
.length
:
278 if (time
.time() - self
.last_eta
) < 1.0:
281 self
.last_eta
= time
.time()
283 # self.avg is a rolling list of length <= 7 of steps where steps are
284 # defined as time elapsed divided by the total progress through
287 step
= (time
.time() - self
.start
) / self
.pos
289 step
= time
.time() - self
.start
291 self
.avg
= self
.avg
[-6:] + [step
]
293 self
.eta_known
= self
.length
is not None
295 def update(self
, n_steps
: int, current_item
: t
.Optional
[V
] = None) -> None:
296 """Update the progress bar by advancing a specified number of
297 steps, and optionally set the ``current_item`` for this new
300 :param n_steps: Number of steps to advance.
301 :param current_item: Optional item to set as ``current_item``
302 for the updated position.
304 .. versionchanged:: 8.0
305 Added the ``current_item`` optional parameter.
307 .. versionchanged:: 8.0
308 Only render when the number of steps meets the
309 ``update_min_steps`` threshold.
311 if current_item
is not None:
312 self
.current_item
= current_item
314 self
._completed
_intervals
+= n_steps
316 if self
._completed
_intervals
>= self
.update_min_steps
:
317 self
.make_step(self
._completed
_intervals
)
318 self
.render_progress()
319 self
._completed
_intervals
= 0
321 def finish(self
) -> None:
322 self
.eta_known
= False
323 self
.current_item
= None
326 def generator(self
) -> t
.Iterator
[V
]:
327 """Return a generator which yields the items added to the bar
328 during construction, and updates the progress bar *after* the
329 yielded block returns.
331 # WARNING: the iterator interface for `ProgressBar` relies on
332 # this and only works because this is a simple generator which
333 # doesn't create or manage additional state. If this function
334 # changes, the impact should be evaluated both against
335 # `iter(bar)` and `next(bar)`. `next()` in particular may call
336 # `self.generator()` repeatedly, and this must remain safe in
337 # order for that interface to work.
339 raise RuntimeError("You need to use progress bars in a with block.")
345 self
.current_item
= rv
347 # This allows item_show_func to be updated before the
348 # item is processed. Only trigger at the beginning of
349 # the update interval.
350 if self
._completed
_intervals
== 0:
351 self
.render_progress()
357 self
.render_progress()
360 def pager(generator
: t
.Iterable
[str], color
: t
.Optional
[bool] = None) -> None:
361 """Decide what method to use for paging through text."""
362 stdout
= _default_text_stdout()
364 # There are no standard streams attached to write to. For example,
365 # pythonw on Windows.
369 if not isatty(sys
.stdin
) or not isatty(stdout
):
370 return _nullpager(stdout
, generator
, color
)
371 pager_cmd
= (os
.environ
.get("PAGER", None) or "").strip()
374 return _tempfilepager(generator
, pager_cmd
, color
)
375 return _pipepager(generator
, pager_cmd
, color
)
376 if os
.environ
.get("TERM") in ("dumb", "emacs"):
377 return _nullpager(stdout
, generator
, color
)
378 if WIN
or sys
.platform
.startswith("os2"):
379 return _tempfilepager(generator
, "more <", color
)
380 if hasattr(os
, "system") and os
.system("(less) 2>/dev/null") == 0:
381 return _pipepager(generator
, "less", color
)
385 fd
, filename
= tempfile
.mkstemp()
388 if hasattr(os
, "system") and os
.system(f
'more "{filename}"') == 0:
389 return _pipepager(generator
, "more", color
)
390 return _nullpager(stdout
, generator
, color
)
395 def _pipepager(generator
: t
.Iterable
[str], cmd
: str, color
: t
.Optional
[bool]) -> None:
396 """Page through text by feeding it to another program. Invoking a
397 pager through this might support colors.
401 env
= dict(os
.environ
)
403 # If we're piping to less we might support colors under the
405 cmd_detail
= cmd
.rsplit("/", 1)[-1].split()
406 if color
is None and cmd_detail
[0] == "less":
407 less_flags
= f
"{os.environ.get('LESS', '')}{' '.join(cmd_detail[1:])}"
411 elif "r" in less_flags
or "R" in less_flags
:
414 c
= subprocess
.Popen(cmd
, shell
=True, stdin
=subprocess
.PIPE
, env
=env
)
415 stdin
= t
.cast(t
.BinaryIO
, c
.stdin
)
416 encoding
= get_best_encoding(stdin
)
418 for text
in generator
:
420 text
= strip_ansi(text
)
422 stdin
.write(text
.encode(encoding
, "replace"))
423 except (OSError, KeyboardInterrupt):
428 # Less doesn't respect ^C, but catches it for its own UI purposes (aborting
429 # search or other commands inside less).
431 # That means when the user hits ^C, the parent process (click) terminates,
432 # but less is still alive, paging the output and messing up the terminal.
434 # If the user wants to make the pager exit on ^C, they should set
435 # `LESS='-K'`. It's not our decision to make.
439 except KeyboardInterrupt:
446 generator
: t
.Iterable
[str], cmd
: str, color
: t
.Optional
[bool]
448 """Page through text by invoking a program on a temporary file."""
451 fd
, filename
= tempfile
.mkstemp()
452 # TODO: This never terminates if the passed generator never terminates.
453 text
= "".join(generator
)
455 text
= strip_ansi(text
)
456 encoding
= get_best_encoding(sys
.stdout
)
457 with
open_stream(filename
, "wb")[0] as f
:
458 f
.write(text
.encode(encoding
))
460 os
.system(f
'{cmd} "{filename}"')
467 stream
: t
.TextIO
, generator
: t
.Iterable
[str], color
: t
.Optional
[bool]
469 """Simply print unformatted text. This is the ultimate fallback."""
470 for text
in generator
:
472 text
= strip_ansi(text
)
479 editor
: t
.Optional
[str] = None,
480 env
: t
.Optional
[t
.Mapping
[str, str]] = None,
481 require_save
: bool = True,
482 extension
: str = ".txt",
486 self
.require_save
= require_save
487 self
.extension
= extension
489 def get_editor(self
) -> str:
490 if self
.editor
is not None:
492 for key
in "VISUAL", "EDITOR":
493 rv
= os
.environ
.get(key
)
498 for editor
in "sensible-editor", "vim", "nano":
499 if os
.system(f
"which {editor} >/dev/null 2>&1") == 0:
503 def edit_file(self
, filename
: str) -> None:
506 editor
= self
.get_editor()
507 environ
: t
.Optional
[t
.Dict
[str, str]] = None
510 environ
= os
.environ
.copy()
511 environ
.update(self
.env
)
514 c
= subprocess
.Popen(f
'{editor} "{filename}"', env
=environ
, shell
=True)
517 raise ClickException(
518 _("{editor}: Editing failed").format(editor
=editor
)
521 raise ClickException(
522 _("{editor}: Editing failed: {e}").format(editor
=editor
, e
=e
)
525 def edit(self
, text
: t
.Optional
[t
.AnyStr
]) -> t
.Optional
[t
.AnyStr
]:
530 elif isinstance(text
, (bytes
, bytearray
)):
533 if text
and not text
.endswith("\n"):
537 data
= text
.replace("\n", "\r\n").encode("utf-8-sig")
539 data
= text
.encode("utf-8")
541 fd
, name
= tempfile
.mkstemp(prefix
="editor-", suffix
=self
.extension
)
545 with os
.fdopen(fd
, "wb") as f
:
548 # If the filesystem resolution is 1 second, like Mac OS
549 # 10.12 Extended, or 2 seconds, like FAT32, and the editor
550 # closes very fast, require_save can fail. Set the modified
551 # time to be 2 seconds in the past to work around this.
552 os
.utime(name
, (os
.path
.getatime(name
), os
.path
.getmtime(name
) - 2))
553 # Depending on the resolution, the exact value might not be
554 # recorded, so get the new recorded value.
555 timestamp
= os
.path
.getmtime(name
)
559 if self
.require_save
and os
.path
.getmtime(name
) == timestamp
:
562 with
open(name
, "rb") as f
:
565 if isinstance(text
, (bytes
, bytearray
)):
568 return rv
.decode("utf-8-sig").replace("\r\n", "\n") # type: ignore
573 def open_url(url
: str, wait
: bool = False, locate
: bool = False) -> int:
576 def _unquote_file(url
: str) -> str:
577 from urllib
.parse
import unquote
579 if url
.startswith("file://"):
580 url
= unquote(url
[7:])
584 if sys
.platform
== "darwin":
590 args
.append(_unquote_file(url
))
591 null
= open("/dev/null", "w")
593 return subprocess
.Popen(args
, stderr
=null
).wait()
598 url
= _unquote_file(url
.replace('"', ""))
599 args
= f
'explorer /select,"{url}"'
601 url
= url
.replace('"', "")
602 wait_str
= "/WAIT" if wait
else ""
603 args
= f
'start {wait_str} "" "{url}"'
604 return os
.system(args
)
607 url
= os
.path
.dirname(_unquote_file(url
).replace('"', ""))
608 args
= f
'cygstart "{url}"'
610 url
= url
.replace('"', "")
611 wait_str
= "-w" if wait
else ""
612 args
= f
'cygstart {wait_str} "{url}"'
613 return os
.system(args
)
617 url
= os
.path
.dirname(_unquote_file(url
)) or "."
619 url
= _unquote_file(url
)
620 c
= subprocess
.Popen(["xdg-open", url
])
625 if url
.startswith(("http://", "https://")) and not locate
and not wait
:
633 def _translate_ch_to_exc(ch
: str) -> t
.Optional
[BaseException
]:
635 raise KeyboardInterrupt()
637 if ch
== "\x04" and not WIN
: # Unix-like, Ctrl+D
640 if ch
== "\x1a" and WIN
: # Windows, Ctrl+Z
649 @contextlib.contextmanager
650 def raw_terminal() -> t
.Iterator
[int]:
653 def getchar(echo
: bool) -> str:
654 # The function `getch` will return a bytes object corresponding to
655 # the pressed character. Since Windows 10 build 1803, it will also
656 # return \x00 when called a second time after pressing a regular key.
658 # `getwch` does not share this probably-bugged behavior. Moreover, it
659 # returns a Unicode object by default, which is what we want.
661 # Either of these functions will return \x00 or \xe0 to indicate
662 # a special key, and you need to call the same function again to get
663 # the "rest" of the code. The fun part is that \u00e0 is
664 # "latin small letter a with grave", so if you type that on a French
665 # keyboard, you _also_ get a \xe0.
666 # E.g., consider the Up arrow. This returns \xe0 and then \x48. The
667 # resulting Unicode string reads as "a with grave" + "capital H".
668 # This is indistinguishable from when the user actually types
669 # "a with grave" and then "capital H".
671 # When \xe0 is returned, we assume it's part of a special-key sequence
672 # and call `getwch` again, but that means that when the user types
673 # the \u00e0 character, `getchar` doesn't return until a second
674 # character is typed.
675 # The alternative is returning immediately, but that would mess up
676 # cross-platform handling of arrow keys and others that start with
677 # \xe0. Another option is using `getch`, but then we can't reliably
678 # read non-ASCII characters, because return values of `getch` are
679 # limited to the current 8-bit codepage.
681 # Anyway, Click doesn't claim to do this Right(tm), and using `getwch`
682 # is doing the right thing in more situations than with `getch`.
683 func
: t
.Callable
[[], str]
686 func
= msvcrt
.getwche
# type: ignore
688 func
= msvcrt
.getwch
# type: ignore
692 if rv
in ("\x00", "\xe0"):
693 # \x00 and \xe0 are control characters that indicate special key,
697 _translate_ch_to_exc(rv
)
704 @contextlib.contextmanager
705 def raw_terminal() -> t
.Iterator
[int]:
706 f
: t
.Optional
[t
.TextIO
]
709 if not isatty(sys
.stdin
):
713 fd
= sys
.stdin
.fileno()
717 old_settings
= termios
.tcgetattr(fd
)
723 termios
.tcsetattr(fd
, termios
.TCSADRAIN
, old_settings
)
728 except termios
.error
:
731 def getchar(echo
: bool) -> str:
732 with
raw_terminal() as fd
:
733 ch
= os
.read(fd
, 32).decode(get_best_encoding(sys
.stdin
), "replace")
735 if echo
and isatty(sys
.stdout
):
738 _translate_ch_to_exc(ch
)