python (3.11.7)

(root)/
lib/
python3.11/
site-packages/
pip/
_internal/
self_outdated_check.py
       1  import datetime
       2  import functools
       3  import hashlib
       4  import json
       5  import logging
       6  import optparse
       7  import os.path
       8  import sys
       9  from dataclasses import dataclass
      10  from typing import Any, Callable, Dict, Optional
      11  
      12  from pip._vendor.packaging.version import parse as parse_version
      13  from pip._vendor.rich.console import Group
      14  from pip._vendor.rich.markup import escape
      15  from pip._vendor.rich.text import Text
      16  
      17  from pip._internal.index.collector import LinkCollector
      18  from pip._internal.index.package_finder import PackageFinder
      19  from pip._internal.metadata import get_default_environment
      20  from pip._internal.metadata.base import DistributionVersion
      21  from pip._internal.models.selection_prefs import SelectionPreferences
      22  from pip._internal.network.session import PipSession
      23  from pip._internal.utils.compat import WINDOWS
      24  from pip._internal.utils.entrypoints import (
      25      get_best_invocation_for_this_pip,
      26      get_best_invocation_for_this_python,
      27  )
      28  from pip._internal.utils.filesystem import adjacent_tmp_file, check_path_owner, replace
      29  from pip._internal.utils.misc import ensure_dir
      30  
# strftime/strptime pattern used for the "last_check" timestamp kept in the
# self-check state file. The trailing "Z" is a literal character, not a
# timezone directive, so values parsed with this pattern are naive datetimes.
_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ"


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
      35  
      36  
      37  def _get_statefile_name(key: str) -> str:
      38      key_bytes = key.encode()
      39      name = hashlib.sha224(key_bytes).hexdigest()
      40      return name
      41  
      42  
      43  class ESC[4;38;5;81mSelfCheckState:
      44      def __init__(self, cache_dir: str) -> None:
      45          self._state: Dict[str, Any] = {}
      46          self._statefile_path = None
      47  
      48          # Try to load the existing state
      49          if cache_dir:
      50              self._statefile_path = os.path.join(
      51                  cache_dir, "selfcheck", _get_statefile_name(self.key)
      52              )
      53              try:
      54                  with open(self._statefile_path, encoding="utf-8") as statefile:
      55                      self._state = json.load(statefile)
      56              except (OSError, ValueError, KeyError):
      57                  # Explicitly suppressing exceptions, since we don't want to
      58                  # error out if the cache file is invalid.
      59                  pass
      60  
      61      @property
      62      def key(self) -> str:
      63          return sys.prefix
      64  
      65      def get(self, current_time: datetime.datetime) -> Optional[str]:
      66          """Check if we have a not-outdated version loaded already."""
      67          if not self._state:
      68              return None
      69  
      70          if "last_check" not in self._state:
      71              return None
      72  
      73          if "pypi_version" not in self._state:
      74              return None
      75  
      76          seven_days_in_seconds = 7 * 24 * 60 * 60
      77  
      78          # Determine if we need to refresh the state
      79          last_check = datetime.datetime.strptime(self._state["last_check"], _DATE_FMT)
      80          seconds_since_last_check = (current_time - last_check).total_seconds()
      81          if seconds_since_last_check > seven_days_in_seconds:
      82              return None
      83  
      84          return self._state["pypi_version"]
      85  
      86      def set(self, pypi_version: str, current_time: datetime.datetime) -> None:
      87          # If we do not have a path to cache in, don't bother saving.
      88          if not self._statefile_path:
      89              return
      90  
      91          # Check to make sure that we own the directory
      92          if not check_path_owner(os.path.dirname(self._statefile_path)):
      93              return
      94  
      95          # Now that we've ensured the directory is owned by this user, we'll go
      96          # ahead and make sure that all our directories are created.
      97          ensure_dir(os.path.dirname(self._statefile_path))
      98  
      99          state = {
     100              # Include the key so it's easy to tell which pip wrote the
     101              # file.
     102              "key": self.key,
     103              "last_check": current_time.strftime(_DATE_FMT),
     104              "pypi_version": pypi_version,
     105          }
     106  
     107          text = json.dumps(state, sort_keys=True, separators=(",", ":"))
     108  
     109          with adjacent_tmp_file(self._statefile_path) as f:
     110              f.write(text.encode())
     111  
     112          try:
     113              # Since we have a prefix-specific state file, we can just
     114              # overwrite whatever is there, no need to check.
     115              replace(f.name, self._statefile_path)
     116          except OSError:
     117              # Best effort.
     118              pass
     119  
     120  
     121  @dataclass
     122  class ESC[4;38;5;81mUpgradePrompt:
     123      old: str
     124      new: str
     125  
     126      def __rich__(self) -> Group:
     127          if WINDOWS:
     128              pip_cmd = f"{get_best_invocation_for_this_python()} -m pip"
     129          else:
     130              pip_cmd = get_best_invocation_for_this_pip()
     131  
     132          notice = "[bold][[reset][blue]notice[reset][bold]][reset]"
     133          return Group(
     134              Text(),
     135              Text.from_markup(
     136                  f"{notice} A new release of pip is available: "
     137                  f"[red]{self.old}[reset] -> [green]{self.new}[reset]"
     138              ),
     139              Text.from_markup(
     140                  f"{notice} To update, run: "
     141                  f"[green]{escape(pip_cmd)} install --upgrade pip"
     142              ),
     143          )
     144  
     145  
     146  def was_installed_by_pip(pkg: str) -> bool:
     147      """Checks whether pkg was installed by pip
     148  
     149      This is used not to display the upgrade message when pip is in fact
     150      installed by system package manager, such as dnf on Fedora.
     151      """
     152      dist = get_default_environment().get_distribution(pkg)
     153      return dist is not None and "pip" == dist.installer
     154  
     155  
     156  def _get_current_remote_pip_version(
     157      session: PipSession, options: optparse.Values
     158  ) -> Optional[str]:
     159      # Lets use PackageFinder to see what the latest pip version is
     160      link_collector = LinkCollector.create(
     161          session,
     162          options=options,
     163          suppress_no_index=True,
     164      )
     165  
     166      # Pass allow_yanked=False so we don't suggest upgrading to a
     167      # yanked version.
     168      selection_prefs = SelectionPreferences(
     169          allow_yanked=False,
     170          allow_all_prereleases=False,  # Explicitly set to False
     171      )
     172  
     173      finder = PackageFinder.create(
     174          link_collector=link_collector,
     175          selection_prefs=selection_prefs,
     176      )
     177      best_candidate = finder.find_best_candidate("pip").best_candidate
     178      if best_candidate is None:
     179          return None
     180  
     181      return str(best_candidate.version)
     182  
     183  
     184  def _self_version_check_logic(
     185      *,
     186      state: SelfCheckState,
     187      current_time: datetime.datetime,
     188      local_version: DistributionVersion,
     189      get_remote_version: Callable[[], Optional[str]],
     190  ) -> Optional[UpgradePrompt]:
     191      remote_version_str = state.get(current_time)
     192      if remote_version_str is None:
     193          remote_version_str = get_remote_version()
     194          if remote_version_str is None:
     195              logger.debug("No remote pip version found")
     196              return None
     197          state.set(remote_version_str, current_time)
     198  
     199      remote_version = parse_version(remote_version_str)
     200      logger.debug("Remote version of pip: %s", remote_version)
     201      logger.debug("Local version of pip:  %s", local_version)
     202  
     203      pip_installed_by_pip = was_installed_by_pip("pip")
     204      logger.debug("Was pip installed by pip? %s", pip_installed_by_pip)
     205      if not pip_installed_by_pip:
     206          return None  # Only suggest upgrade if pip is installed by pip.
     207  
     208      local_version_is_older = (
     209          local_version < remote_version
     210          and local_version.base_version != remote_version.base_version
     211      )
     212      if local_version_is_older:
     213          return UpgradePrompt(old=str(local_version), new=remote_version_str)
     214  
     215      return None
     216  
     217  
     218  def pip_self_version_check(session: PipSession, options: optparse.Values) -> None:
     219      """Check for an update for pip.
     220  
     221      Limit the frequency of checks to once per week. State is stored either in
     222      the active virtualenv or in the user's USER_CACHE_DIR keyed off the prefix
     223      of the pip script path.
     224      """
     225      installed_dist = get_default_environment().get_distribution("pip")
     226      if not installed_dist:
     227          return
     228  
     229      try:
     230          upgrade_prompt = _self_version_check_logic(
     231              state=SelfCheckState(cache_dir=options.cache_dir),
     232              current_time=datetime.datetime.utcnow(),
     233              local_version=installed_dist.version,
     234              get_remote_version=functools.partial(
     235                  _get_current_remote_pip_version, session, options
     236              ),
     237          )
     238          if upgrade_prompt is not None:
     239              logger.warning("[present-rich] %s", upgrade_prompt)
     240      except Exception:
     241          logger.warning("There was an error checking the latest version of pip.")
     242          logger.debug("See below for error", exc_info=True)