python (3.12.0)
1 import contextlib
2 import itertools
3 import logging
4 import sys
5 import time
6 from typing import IO, Generator, Optional
7
8 from pip._internal.utils.compat import WINDOWS
9 from pip._internal.utils.logging import get_indentation
10
# Module-level logger. Its effective level is consulted below to decide
# whether interactive spinner output and cursor control codes are emitted.
logger = logging.getLogger(__name__)
12
13
class SpinnerInterface:
    """Base contract shared by the spinner implementations in this module.

    Both methods raise ``NotImplementedError`` here; concrete spinners
    override them.
    """

    def spin(self) -> None:
        """Report one unit of progress. Must be overridden by subclasses."""
        raise NotImplementedError

    def finish(self, final_status: str) -> None:
        """Report ``final_status`` as the outcome. Must be overridden."""
        raise NotImplementedError
20
21
class InteractiveSpinner(SpinnerInterface):
    """Spinner that redraws its status in place on a text stream.

    The message is written once, followed by ``" ... "``. Every later status
    (a spin character or the final status) replaces the previous one by
    backspacing over it.
    """

    def __init__(
        self,
        message: str,
        file: Optional[IO[str]] = None,
        spin_chars: str = "-\\|/",
        # Empirically, 8 updates/second looks nice
        min_update_interval_seconds: float = 0.125,
    ):
        self._message = message
        # Fall back to the current sys.stdout when no stream is supplied.
        self._file = sys.stdout if file is None else file
        self._finished = False
        self._spin_cycle = itertools.cycle(spin_chars)
        self._rate_limiter = RateLimiter(min_update_interval_seconds)

        prefix = " " * get_indentation()
        self._file.write(prefix + self._message + " ... ")
        # Nothing has been drawn in the status cell yet.
        self._width = 0

    def _write(self, status: str) -> None:
        """Replace the previously drawn status with ``status`` and flush."""
        assert not self._finished
        stream = self._file
        # Step back over the old status, blank it out with spaces, then step
        # back again so the new status starts at the same column.
        rewind = "\b" * self._width
        blank = " " * self._width
        stream.write(rewind + blank + rewind)
        # The cell is now empty; draw the new status into it.
        stream.write(status)
        self._width = len(status)
        stream.flush()
        self._rate_limiter.reset()

    def spin(self) -> None:
        """Draw the next spin character unless finished or rate-limited."""
        if self._finished or not self._rate_limiter.ready():
            return
        self._write(next(self._spin_cycle))

    def finish(self, final_status: str) -> None:
        """Draw ``final_status``, end the line and mark the spinner finished.

        Calls after the first one are ignored.
        """
        if not self._finished:
            self._write(final_status)
            self._file.write("\n")
            self._file.flush()
            self._finished = True
69
70
71 # Used for dumb terminals, non-interactive installs (no tty), etc.
72 # We still print updates occasionally (once every 60 seconds by default) to
73 # act as a keep-alive for systems like Travis-CI that take lack-of-output as
74 # an indication that a task has frozen.
class NonInteractiveSpinner(SpinnerInterface):
    """Spinner that reports progress as INFO log records.

    A "started" record is logged on construction; later records are
    rate-limited by ``min_update_interval_seconds``.
    """

    def __init__(self, message: str, min_update_interval_seconds: float = 60.0) -> None:
        self._message = message
        self._finished = False
        self._rate_limiter = RateLimiter(min_update_interval_seconds)
        self._update("started")

    def _update(self, status: str) -> None:
        """Restart the rate-limit window and log ``status`` for the message."""
        assert not self._finished
        self._rate_limiter.reset()
        logger.info("%s: %s", self._message, status)

    def spin(self) -> None:
        """Log a keep-alive record unless finished or rate-limited."""
        if self._finished or not self._rate_limiter.ready():
            return
        self._update("still running...")

    def finish(self, final_status: str) -> None:
        """Log the final status once; later calls are ignored."""
        if not self._finished:
            self._update(f"finished with status '{final_status}'")
            self._finished = True
99
100
class RateLimiter:
    """Tell callers whether a minimum interval has passed since the last update.

    Time is measured with ``time.monotonic()`` rather than the wall clock, so
    system clock adjustments (for example an NTP step backwards) cannot stall
    updates or make them fire early.
    """

    def __init__(self, min_update_interval_seconds: float) -> None:
        """Create a limiter that is immediately ready.

        :param min_update_interval_seconds: minimum number of seconds that
            must elapse after ``reset()`` before ``ready()`` is True again.
        """
        self._min_update_interval_seconds = min_update_interval_seconds
        # -inf means "never updated". The monotonic clock has an arbitrary
        # origin, so 0 could wrongly look like a recent update.
        self._last_update: float = float("-inf")

    def ready(self) -> bool:
        """Return True once the minimum interval has elapsed since reset()."""
        elapsed = time.monotonic() - self._last_update
        return elapsed >= self._min_update_interval_seconds

    def reset(self) -> None:
        """Record the current moment as the time of the last update."""
        self._last_update = time.monotonic()
113
114
@contextlib.contextmanager
def open_spinner(message: str) -> Generator[SpinnerInterface, None, None]:
    """Yield a spinner for ``message`` and finish it when the block exits.

    The spinner is finished with "done" on normal exit, "canceled" on
    ``KeyboardInterrupt`` and "error" on any other ``Exception``; the
    exception is re-raised in both failure cases.
    """
    # Interactive spinner goes directly to sys.stdout rather than being routed
    # through the logging system, but it acts like it has level INFO,
    # i.e. it's only displayed if we're at level INFO or better.
    # Non-interactive spinner goes through the logging system, so it is always
    # in sync with logging configuration.
    interactive = sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO
    spinner: SpinnerInterface = (
        InteractiveSpinner(message) if interactive else NonInteractiveSpinner(message)
    )
    try:
        with hidden_cursor(sys.stdout):
            yield spinner
    except KeyboardInterrupt:
        spinner.finish("canceled")
        raise
    except Exception:
        spinner.finish("error")
        raise
    else:
        spinner.finish("done")
137
138
# Terminal control sequences written by hidden_cursor() to hide the cursor
# on entry and show it again on exit.
HIDE_CURSOR = "\x1b[?25l"
SHOW_CURSOR = "\x1b[?25h"
141
142
@contextlib.contextmanager
def hidden_cursor(file: IO[str]) -> Generator[None, None, None]:
    """Hide the terminal cursor on ``file`` for the duration of the block.

    Nothing is written on Windows, when ``file`` is not a tty, or when the
    logger's effective level is above INFO.
    """
    # The Windows terminal does not support the hide/show cursor ANSI codes,
    # even via colorama. So don't even try.
    # We also don't want to clutter the output with control characters if
    # we're writing to a file, or if the user is running with --quiet.
    # See https://github.com/pypa/pip/issues/3418
    skip_control_codes = (
        WINDOWS
        or not file.isatty()
        or logger.getEffectiveLevel() > logging.INFO
    )
    if skip_control_codes:
        yield
        return

    file.write(HIDE_CURSOR)
    try:
        yield
    finally:
        # Always restore the cursor, even if the body raised.
        file.write(SHOW_CURSOR)
159 file.write(SHOW_CURSOR)