python (3.11.7)

(root)/
lib/
python3.11/
site-packages/
pip/
_internal/
utils/
logging.py
       1  import contextlib
       2  import errno
       3  import logging
       4  import logging.handlers
       5  import os
       6  import sys
       7  import threading
       8  from dataclasses import dataclass
       9  from io import TextIOWrapper
      10  from logging import Filter
      11  from typing import Any, ClassVar, Generator, List, Optional, TextIO, Type
      12  
      13  from pip._vendor.rich.console import (
      14      Console,
      15      ConsoleOptions,
      16      ConsoleRenderable,
      17      RenderableType,
      18      RenderResult,
      19      RichCast,
      20  )
      21  from pip._vendor.rich.highlighter import NullHighlighter
      22  from pip._vendor.rich.logging import RichHandler
      23  from pip._vendor.rich.segment import Segment
      24  from pip._vendor.rich.style import Style
      25  
      26  from pip._internal.utils._log import VERBOSE, getLogger
      27  from pip._internal.utils.compat import WINDOWS
      28  from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX
      29  from pip._internal.utils.misc import ensure_dir
      30  
      31  _log_state = threading.local()
      32  subprocess_logger = getLogger("pip.subprocessor")
      33  
      34  
      35  class ESC[4;38;5;81mBrokenStdoutLoggingError(ESC[4;38;5;149mException):
      36      """
      37      Raised if BrokenPipeError occurs for the stdout stream while logging.
      38      """
      39  
      40  
      41  def _is_broken_pipe_error(exc_class: Type[BaseException], exc: BaseException) -> bool:
      42      if exc_class is BrokenPipeError:
      43          return True
      44  
      45      # On Windows, a broken pipe can show up as EINVAL rather than EPIPE:
      46      # https://bugs.python.org/issue19612
      47      # https://bugs.python.org/issue30418
      48      if not WINDOWS:
      49          return False
      50  
      51      return isinstance(exc, OSError) and exc.errno in (errno.EINVAL, errno.EPIPE)
      52  
      53  
      54  @contextlib.contextmanager
      55  def indent_log(num: int = 2) -> Generator[None, None, None]:
      56      """
      57      A context manager which will cause the log output to be indented for any
      58      log messages emitted inside it.
      59      """
      60      # For thread-safety
      61      _log_state.indentation = get_indentation()
      62      _log_state.indentation += num
      63      try:
      64          yield
      65      finally:
      66          _log_state.indentation -= num
      67  
      68  
      69  def get_indentation() -> int:
      70      return getattr(_log_state, "indentation", 0)
      71  
      72  
      73  class ESC[4;38;5;81mIndentingFormatter(ESC[4;38;5;149mloggingESC[4;38;5;149m.ESC[4;38;5;149mFormatter):
      74      default_time_format = "%Y-%m-%dT%H:%M:%S"
      75  
      76      def __init__(
      77          self,
      78          *args: Any,
      79          add_timestamp: bool = False,
      80          **kwargs: Any,
      81      ) -> None:
      82          """
      83          A logging.Formatter that obeys the indent_log() context manager.
      84  
      85          :param add_timestamp: A bool indicating output lines should be prefixed
      86              with their record's timestamp.
      87          """
      88          self.add_timestamp = add_timestamp
      89          super().__init__(*args, **kwargs)
      90  
      91      def get_message_start(self, formatted: str, levelno: int) -> str:
      92          """
      93          Return the start of the formatted log message (not counting the
      94          prefix to add to each line).
      95          """
      96          if levelno < logging.WARNING:
      97              return ""
      98          if formatted.startswith(DEPRECATION_MSG_PREFIX):
      99              # Then the message already has a prefix.  We don't want it to
     100              # look like "WARNING: DEPRECATION: ...."
     101              return ""
     102          if levelno < logging.ERROR:
     103              return "WARNING: "
     104  
     105          return "ERROR: "
     106  
     107      def format(self, record: logging.LogRecord) -> str:
     108          """
     109          Calls the standard formatter, but will indent all of the log message
     110          lines by our current indentation level.
     111          """
     112          formatted = super().format(record)
     113          message_start = self.get_message_start(formatted, record.levelno)
     114          formatted = message_start + formatted
     115  
     116          prefix = ""
     117          if self.add_timestamp:
     118              prefix = f"{self.formatTime(record)} "
     119          prefix += " " * get_indentation()
     120          formatted = "".join([prefix + line for line in formatted.splitlines(True)])
     121          return formatted
     122  
     123  
     124  @dataclass
     125  class ESC[4;38;5;81mIndentedRenderable:
     126      renderable: RenderableType
     127      indent: int
     128  
     129      def __rich_console__(
     130          self, console: Console, options: ConsoleOptions
     131      ) -> RenderResult:
     132          segments = console.render(self.renderable, options)
     133          lines = Segment.split_lines(segments)
     134          for line in lines:
     135              yield Segment(" " * self.indent)
     136              yield from line
     137              yield Segment("\n")
     138  
     139  
     140  class ESC[4;38;5;81mRichPipStreamHandler(ESC[4;38;5;149mRichHandler):
     141      KEYWORDS: ClassVar[Optional[List[str]]] = []
     142  
     143      def __init__(self, stream: Optional[TextIO], no_color: bool) -> None:
     144          super().__init__(
     145              console=Console(file=stream, no_color=no_color, soft_wrap=True),
     146              show_time=False,
     147              show_level=False,
     148              show_path=False,
     149              highlighter=NullHighlighter(),
     150          )
     151  
     152      # Our custom override on Rich's logger, to make things work as we need them to.
     153      def emit(self, record: logging.LogRecord) -> None:
     154          style: Optional[Style] = None
     155  
     156          # If we are given a diagnostic error to present, present it with indentation.
     157          assert isinstance(record.args, tuple)
     158          if record.msg == "[present-rich] %s" and len(record.args) == 1:
     159              rich_renderable = record.args[0]
     160              assert isinstance(
     161                  rich_renderable, (ConsoleRenderable, RichCast, str)
     162              ), f"{rich_renderable} is not rich-console-renderable"
     163  
     164              renderable: RenderableType = IndentedRenderable(
     165                  rich_renderable, indent=get_indentation()
     166              )
     167          else:
     168              message = self.format(record)
     169              renderable = self.render_message(record, message)
     170              if record.levelno is not None:
     171                  if record.levelno >= logging.ERROR:
     172                      style = Style(color="red")
     173                  elif record.levelno >= logging.WARNING:
     174                      style = Style(color="yellow")
     175  
     176          try:
     177              self.console.print(renderable, overflow="ignore", crop=False, style=style)
     178          except Exception:
     179              self.handleError(record)
     180  
     181      def handleError(self, record: logging.LogRecord) -> None:
     182          """Called when logging is unable to log some output."""
     183  
     184          exc_class, exc = sys.exc_info()[:2]
     185          # If a broken pipe occurred while calling write() or flush() on the
     186          # stdout stream in logging's Handler.emit(), then raise our special
     187          # exception so we can handle it in main() instead of logging the
     188          # broken pipe error and continuing.
     189          if (
     190              exc_class
     191              and exc
     192              and self.console.file is sys.stdout
     193              and _is_broken_pipe_error(exc_class, exc)
     194          ):
     195              raise BrokenStdoutLoggingError()
     196  
     197          return super().handleError(record)
     198  
     199  
     200  class ESC[4;38;5;81mBetterRotatingFileHandler(ESC[4;38;5;149mloggingESC[4;38;5;149m.ESC[4;38;5;149mhandlersESC[4;38;5;149m.ESC[4;38;5;149mRotatingFileHandler):
     201      def _open(self) -> TextIOWrapper:
     202          ensure_dir(os.path.dirname(self.baseFilename))
     203          return super()._open()
     204  
     205  
     206  class ESC[4;38;5;81mMaxLevelFilter(ESC[4;38;5;149mFilter):
     207      def __init__(self, level: int) -> None:
     208          self.level = level
     209  
     210      def filter(self, record: logging.LogRecord) -> bool:
     211          return record.levelno < self.level
     212  
     213  
     214  class ESC[4;38;5;81mExcludeLoggerFilter(ESC[4;38;5;149mFilter):
     215  
     216      """
     217      A logging Filter that excludes records from a logger (or its children).
     218      """
     219  
     220      def filter(self, record: logging.LogRecord) -> bool:
     221          # The base Filter class allows only records from a logger (or its
     222          # children).
     223          return not super().filter(record)
     224  
     225  
     226  def setup_logging(verbosity: int, no_color: bool, user_log_file: Optional[str]) -> int:
     227      """Configures and sets up all of the logging
     228  
     229      Returns the requested logging level, as its integer value.
     230      """
     231  
     232      # Determine the level to be logging at.
     233      if verbosity >= 2:
     234          level_number = logging.DEBUG
     235      elif verbosity == 1:
     236          level_number = VERBOSE
     237      elif verbosity == -1:
     238          level_number = logging.WARNING
     239      elif verbosity == -2:
     240          level_number = logging.ERROR
     241      elif verbosity <= -3:
     242          level_number = logging.CRITICAL
     243      else:
     244          level_number = logging.INFO
     245  
     246      level = logging.getLevelName(level_number)
     247  
     248      # The "root" logger should match the "console" level *unless* we also need
     249      # to log to a user log file.
     250      include_user_log = user_log_file is not None
     251      if include_user_log:
     252          additional_log_file = user_log_file
     253          root_level = "DEBUG"
     254      else:
     255          additional_log_file = "/dev/null"
     256          root_level = level
     257  
     258      # Disable any logging besides WARNING unless we have DEBUG level logging
     259      # enabled for vendored libraries.
     260      vendored_log_level = "WARNING" if level in ["INFO", "ERROR"] else "DEBUG"
     261  
     262      # Shorthands for clarity
     263      log_streams = {
     264          "stdout": "ext://sys.stdout",
     265          "stderr": "ext://sys.stderr",
     266      }
     267      handler_classes = {
     268          "stream": "pip._internal.utils.logging.RichPipStreamHandler",
     269          "file": "pip._internal.utils.logging.BetterRotatingFileHandler",
     270      }
     271      handlers = ["console", "console_errors", "console_subprocess"] + (
     272          ["user_log"] if include_user_log else []
     273      )
     274  
     275      logging.config.dictConfig(
     276          {
     277              "version": 1,
     278              "disable_existing_loggers": False,
     279              "filters": {
     280                  "exclude_warnings": {
     281                      "()": "pip._internal.utils.logging.MaxLevelFilter",
     282                      "level": logging.WARNING,
     283                  },
     284                  "restrict_to_subprocess": {
     285                      "()": "logging.Filter",
     286                      "name": subprocess_logger.name,
     287                  },
     288                  "exclude_subprocess": {
     289                      "()": "pip._internal.utils.logging.ExcludeLoggerFilter",
     290                      "name": subprocess_logger.name,
     291                  },
     292              },
     293              "formatters": {
     294                  "indent": {
     295                      "()": IndentingFormatter,
     296                      "format": "%(message)s",
     297                  },
     298                  "indent_with_timestamp": {
     299                      "()": IndentingFormatter,
     300                      "format": "%(message)s",
     301                      "add_timestamp": True,
     302                  },
     303              },
     304              "handlers": {
     305                  "console": {
     306                      "level": level,
     307                      "class": handler_classes["stream"],
     308                      "no_color": no_color,
     309                      "stream": log_streams["stdout"],
     310                      "filters": ["exclude_subprocess", "exclude_warnings"],
     311                      "formatter": "indent",
     312                  },
     313                  "console_errors": {
     314                      "level": "WARNING",
     315                      "class": handler_classes["stream"],
     316                      "no_color": no_color,
     317                      "stream": log_streams["stderr"],
     318                      "filters": ["exclude_subprocess"],
     319                      "formatter": "indent",
     320                  },
     321                  # A handler responsible for logging to the console messages
     322                  # from the "subprocessor" logger.
     323                  "console_subprocess": {
     324                      "level": level,
     325                      "class": handler_classes["stream"],
     326                      "stream": log_streams["stderr"],
     327                      "no_color": no_color,
     328                      "filters": ["restrict_to_subprocess"],
     329                      "formatter": "indent",
     330                  },
     331                  "user_log": {
     332                      "level": "DEBUG",
     333                      "class": handler_classes["file"],
     334                      "filename": additional_log_file,
     335                      "encoding": "utf-8",
     336                      "delay": True,
     337                      "formatter": "indent_with_timestamp",
     338                  },
     339              },
     340              "root": {
     341                  "level": root_level,
     342                  "handlers": handlers,
     343              },
     344              "loggers": {"pip._vendor": {"level": vendored_log_level}},
     345          }
     346      )
     347  
     348      return level_number