python (3.11.7)

(root)/
lib/
python3.11/
site-packages/
pip/
_internal/
cli/
spinners.py
       1  import contextlib
       2  import itertools
       3  import logging
       4  import sys
       5  import time
       6  from typing import IO, Generator, Optional
       7  
       8  from pip._internal.utils.compat import WINDOWS
       9  from pip._internal.utils.logging import get_indentation
      10  
      11  logger = logging.getLogger(__name__)
      12  
      13  
      14  class ESC[4;38;5;81mSpinnerInterface:
      15      def spin(self) -> None:
      16          raise NotImplementedError()
      17  
      18      def finish(self, final_status: str) -> None:
      19          raise NotImplementedError()
      20  
      21  
      22  class ESC[4;38;5;81mInteractiveSpinner(ESC[4;38;5;149mSpinnerInterface):
      23      def __init__(
      24          self,
      25          message: str,
      26          file: Optional[IO[str]] = None,
      27          spin_chars: str = "-\\|/",
      28          # Empirically, 8 updates/second looks nice
      29          min_update_interval_seconds: float = 0.125,
      30      ):
      31          self._message = message
      32          if file is None:
      33              file = sys.stdout
      34          self._file = file
      35          self._rate_limiter = RateLimiter(min_update_interval_seconds)
      36          self._finished = False
      37  
      38          self._spin_cycle = itertools.cycle(spin_chars)
      39  
      40          self._file.write(" " * get_indentation() + self._message + " ... ")
      41          self._width = 0
      42  
      43      def _write(self, status: str) -> None:
      44          assert not self._finished
      45          # Erase what we wrote before by backspacing to the beginning, writing
      46          # spaces to overwrite the old text, and then backspacing again
      47          backup = "\b" * self._width
      48          self._file.write(backup + " " * self._width + backup)
      49          # Now we have a blank slate to add our status
      50          self._file.write(status)
      51          self._width = len(status)
      52          self._file.flush()
      53          self._rate_limiter.reset()
      54  
      55      def spin(self) -> None:
      56          if self._finished:
      57              return
      58          if not self._rate_limiter.ready():
      59              return
      60          self._write(next(self._spin_cycle))
      61  
      62      def finish(self, final_status: str) -> None:
      63          if self._finished:
      64              return
      65          self._write(final_status)
      66          self._file.write("\n")
      67          self._file.flush()
      68          self._finished = True
      69  
      70  
      71  # Used for dumb terminals, non-interactive installs (no tty), etc.
      72  # We still print updates occasionally (once every 60 seconds by default) to
      73  # act as a keep-alive for systems like Travis-CI that take lack-of-output as
      74  # an indication that a task has frozen.
      75  class ESC[4;38;5;81mNonInteractiveSpinner(ESC[4;38;5;149mSpinnerInterface):
      76      def __init__(self, message: str, min_update_interval_seconds: float = 60.0) -> None:
      77          self._message = message
      78          self._finished = False
      79          self._rate_limiter = RateLimiter(min_update_interval_seconds)
      80          self._update("started")
      81  
      82      def _update(self, status: str) -> None:
      83          assert not self._finished
      84          self._rate_limiter.reset()
      85          logger.info("%s: %s", self._message, status)
      86  
      87      def spin(self) -> None:
      88          if self._finished:
      89              return
      90          if not self._rate_limiter.ready():
      91              return
      92          self._update("still running...")
      93  
      94      def finish(self, final_status: str) -> None:
      95          if self._finished:
      96              return
      97          self._update(f"finished with status '{final_status}'")
      98          self._finished = True
      99  
     100  
     101  class ESC[4;38;5;81mRateLimiter:
     102      def __init__(self, min_update_interval_seconds: float) -> None:
     103          self._min_update_interval_seconds = min_update_interval_seconds
     104          self._last_update: float = 0
     105  
     106      def ready(self) -> bool:
     107          now = time.time()
     108          delta = now - self._last_update
     109          return delta >= self._min_update_interval_seconds
     110  
     111      def reset(self) -> None:
     112          self._last_update = time.time()
     113  
     114  
     115  @contextlib.contextmanager
     116  def open_spinner(message: str) -> Generator[SpinnerInterface, None, None]:
     117      # Interactive spinner goes directly to sys.stdout rather than being routed
     118      # through the logging system, but it acts like it has level INFO,
     119      # i.e. it's only displayed if we're at level INFO or better.
     120      # Non-interactive spinner goes through the logging system, so it is always
     121      # in sync with logging configuration.
     122      if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
     123          spinner: SpinnerInterface = InteractiveSpinner(message)
     124      else:
     125          spinner = NonInteractiveSpinner(message)
     126      try:
     127          with hidden_cursor(sys.stdout):
     128              yield spinner
     129      except KeyboardInterrupt:
     130          spinner.finish("canceled")
     131          raise
     132      except Exception:
     133          spinner.finish("error")
     134          raise
     135      else:
     136          spinner.finish("done")
     137  
     138  
     139  HIDE_CURSOR = "\x1b[?25l"
     140  SHOW_CURSOR = "\x1b[?25h"
     141  
     142  
     143  @contextlib.contextmanager
     144  def hidden_cursor(file: IO[str]) -> Generator[None, None, None]:
     145      # The Windows terminal does not support the hide/show cursor ANSI codes,
     146      # even via colorama. So don't even try.
     147      if WINDOWS:
     148          yield
     149      # We don't want to clutter the output with control characters if we're
     150      # writing to a file, or if the user is running with --quiet.
     151      # See https://github.com/pypa/pip/issues/3418
     152      elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
     153          yield
     154      else:
     155          file.write(HIDE_CURSOR)
     156          try:
     157              yield
     158          finally:
     159              file.write(SHOW_CURSOR)