python (3.11.7)
       1  """Contains the Command base classes that depend on PipSession.
       2  
       3  The classes in this module are in a separate module so the commands not
       4  needing download / PackageFinder capability don't unnecessarily import the
       5  PackageFinder machinery and all its vendored dependencies, etc.
       6  """
       7  
       8  import logging
       9  import os
      10  import sys
      11  from functools import partial
      12  from optparse import Values
      13  from typing import TYPE_CHECKING, Any, List, Optional, Tuple
      14  
      15  from pip._internal.cache import WheelCache
      16  from pip._internal.cli import cmdoptions
      17  from pip._internal.cli.base_command import Command
      18  from pip._internal.cli.command_context import CommandContextMixIn
      19  from pip._internal.exceptions import CommandError, PreviousBuildDirError
      20  from pip._internal.index.collector import LinkCollector
      21  from pip._internal.index.package_finder import PackageFinder
      22  from pip._internal.models.selection_prefs import SelectionPreferences
      23  from pip._internal.models.target_python import TargetPython
      24  from pip._internal.network.session import PipSession
      25  from pip._internal.operations.build.build_tracker import BuildTracker
      26  from pip._internal.operations.prepare import RequirementPreparer
      27  from pip._internal.req.constructors import (
      28      install_req_from_editable,
      29      install_req_from_line,
      30      install_req_from_parsed_requirement,
      31      install_req_from_req_string,
      32  )
      33  from pip._internal.req.req_file import parse_requirements
      34  from pip._internal.req.req_install import InstallRequirement
      35  from pip._internal.resolution.base import BaseResolver
      36  from pip._internal.self_outdated_check import pip_self_version_check
      37  from pip._internal.utils.temp_dir import (
      38      TempDirectory,
      39      TempDirectoryTypeRegistry,
      40      tempdir_kinds,
      41  )
      42  from pip._internal.utils.virtualenv import running_under_virtualenv
      43  
      44  if TYPE_CHECKING:
      45      from ssl import SSLContext
      46  
      47  logger = logging.getLogger(__name__)
      48  
      49  
      50  def _create_truststore_ssl_context() -> Optional["SSLContext"]:
      51      if sys.version_info < (3, 10):
      52          raise CommandError("The truststore feature is only available for Python 3.10+")
      53  
      54      try:
      55          import ssl
      56      except ImportError:
      57          logger.warning("Disabling truststore since ssl support is missing")
      58          return None
      59  
      60      try:
      61          import truststore
      62      except ImportError:
      63          raise CommandError(
      64              "To use the truststore feature, 'truststore' must be installed into "
      65              "pip's current environment."
      66          )
      67  
      68      return truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
      69  
      70  
      71  class ESC[4;38;5;81mSessionCommandMixin(ESC[4;38;5;149mCommandContextMixIn):
      72  
      73      """
      74      A class mixin for command classes needing _build_session().
      75      """
      76  
      77      def __init__(self) -> None:
      78          super().__init__()
      79          self._session: Optional[PipSession] = None
      80  
      81      @classmethod
      82      def _get_index_urls(cls, options: Values) -> Optional[List[str]]:
      83          """Return a list of index urls from user-provided options."""
      84          index_urls = []
      85          if not getattr(options, "no_index", False):
      86              url = getattr(options, "index_url", None)
      87              if url:
      88                  index_urls.append(url)
      89          urls = getattr(options, "extra_index_urls", None)
      90          if urls:
      91              index_urls.extend(urls)
      92          # Return None rather than an empty list
      93          return index_urls or None
      94  
      95      def get_default_session(self, options: Values) -> PipSession:
      96          """Get a default-managed session."""
      97          if self._session is None:
      98              self._session = self.enter_context(self._build_session(options))
      99              # there's no type annotation on requests.Session, so it's
     100              # automatically ContextManager[Any] and self._session becomes Any,
     101              # then https://github.com/python/mypy/issues/7696 kicks in
     102              assert self._session is not None
     103          return self._session
     104  
     105      def _build_session(
     106          self,
     107          options: Values,
     108          retries: Optional[int] = None,
     109          timeout: Optional[int] = None,
     110          fallback_to_certifi: bool = False,
     111      ) -> PipSession:
     112          cache_dir = options.cache_dir
     113          assert not cache_dir or os.path.isabs(cache_dir)
     114  
     115          if "truststore" in options.features_enabled:
     116              try:
     117                  ssl_context = _create_truststore_ssl_context()
     118              except Exception:
     119                  if not fallback_to_certifi:
     120                      raise
     121                  ssl_context = None
     122          else:
     123              ssl_context = None
     124  
     125          session = PipSession(
     126              cache=os.path.join(cache_dir, "http") if cache_dir else None,
     127              retries=retries if retries is not None else options.retries,
     128              trusted_hosts=options.trusted_hosts,
     129              index_urls=self._get_index_urls(options),
     130              ssl_context=ssl_context,
     131          )
     132  
     133          # Handle custom ca-bundles from the user
     134          if options.cert:
     135              session.verify = options.cert
     136  
     137          # Handle SSL client certificate
     138          if options.client_cert:
     139              session.cert = options.client_cert
     140  
     141          # Handle timeouts
     142          if options.timeout or timeout:
     143              session.timeout = timeout if timeout is not None else options.timeout
     144  
     145          # Handle configured proxies
     146          if options.proxy:
     147              session.proxies = {
     148                  "http": options.proxy,
     149                  "https": options.proxy,
     150              }
     151  
     152          # Determine if we can prompt the user for authentication or not
     153          session.auth.prompting = not options.no_input
     154          session.auth.keyring_provider = options.keyring_provider
     155  
     156          return session
     157  
     158  
     159  class ESC[4;38;5;81mIndexGroupCommand(ESC[4;38;5;149mCommand, ESC[4;38;5;149mSessionCommandMixin):
     160  
     161      """
     162      Abstract base class for commands with the index_group options.
     163  
     164      This also corresponds to the commands that permit the pip version check.
     165      """
     166  
     167      def handle_pip_version_check(self, options: Values) -> None:
     168          """
     169          Do the pip version check if not disabled.
     170  
     171          This overrides the default behavior of not doing the check.
     172          """
     173          # Make sure the index_group options are present.
     174          assert hasattr(options, "no_index")
     175  
     176          if options.disable_pip_version_check or options.no_index:
     177              return
     178  
     179          # Otherwise, check if we're using the latest version of pip available.
     180          session = self._build_session(
     181              options,
     182              retries=0,
     183              timeout=min(5, options.timeout),
     184              # This is set to ensure the function does not fail when truststore is
     185              # specified in use-feature but cannot be loaded. This usually raises a
     186              # CommandError and shows a nice user-facing error, but this function is not
     187              # called in that try-except block.
     188              fallback_to_certifi=True,
     189          )
     190          with session:
     191              pip_self_version_check(session, options)
     192  
     193  
# Temporary-directory kinds for which ``with_cleanup`` switches deletion off
# (``registry.set_delete(kind, False)``) when ``options.no_clean`` is set or
# a ``PreviousBuildDirError`` is raised.
KEEPABLE_TEMPDIR_TYPES = [
    tempdir_kinds.BUILD_ENV,
    tempdir_kinds.EPHEM_WHEEL_CACHE,
    tempdir_kinds.REQ_BUILD,
]
     199  
     200  
     201  def warn_if_run_as_root() -> None:
     202      """Output a warning for sudo users on Unix.
     203  
     204      In a virtual environment, sudo pip still writes to virtualenv.
     205      On Windows, users may run pip as Administrator without issues.
     206      This warning only applies to Unix root users outside of virtualenv.
     207      """
     208      if running_under_virtualenv():
     209          return
     210      if not hasattr(os, "getuid"):
     211          return
     212      # On Windows, there are no "system managed" Python packages. Installing as
     213      # Administrator via pip is the correct way of updating system environments.
     214      #
     215      # We choose sys.platform over utils.compat.WINDOWS here to enable Mypy platform
     216      # checks: https://mypy.readthedocs.io/en/stable/common_issues.html
     217      if sys.platform == "win32" or sys.platform == "cygwin":
     218          return
     219  
     220      if os.getuid() != 0:
     221          return
     222  
     223      logger.warning(
     224          "Running pip as the 'root' user can result in broken permissions and "
     225          "conflicting behaviour with the system package manager. "
     226          "It is recommended to use a virtual environment instead: "
     227          "https://pip.pypa.io/warnings/venv"
     228      )
     229  
     230  
     231  def with_cleanup(func: Any) -> Any:
     232      """Decorator for common logic related to managing temporary
     233      directories.
     234      """
     235  
     236      def configure_tempdir_registry(registry: TempDirectoryTypeRegistry) -> None:
     237          for t in KEEPABLE_TEMPDIR_TYPES:
     238              registry.set_delete(t, False)
     239  
     240      def wrapper(
     241          self: RequirementCommand, options: Values, args: List[Any]
     242      ) -> Optional[int]:
     243          assert self.tempdir_registry is not None
     244          if options.no_clean:
     245              configure_tempdir_registry(self.tempdir_registry)
     246  
     247          try:
     248              return func(self, options, args)
     249          except PreviousBuildDirError:
     250              # This kind of conflict can occur when the user passes an explicit
     251              # build directory with a pre-existing folder. In that case we do
     252              # not want to accidentally remove it.
     253              configure_tempdir_registry(self.tempdir_registry)
     254              raise
     255  
     256      return wrapper
     257  
     258  
     259  class ESC[4;38;5;81mRequirementCommand(ESC[4;38;5;149mIndexGroupCommand):
     260      def __init__(self, *args: Any, **kw: Any) -> None:
     261          super().__init__(*args, **kw)
     262  
     263          self.cmd_opts.add_option(cmdoptions.no_clean())
     264  
     265      @staticmethod
     266      def determine_resolver_variant(options: Values) -> str:
     267          """Determines which resolver should be used, based on the given options."""
     268          if "legacy-resolver" in options.deprecated_features_enabled:
     269              return "legacy"
     270  
     271          return "2020-resolver"
     272  
     273      @classmethod
     274      def make_requirement_preparer(
     275          cls,
     276          temp_build_dir: TempDirectory,
     277          options: Values,
     278          build_tracker: BuildTracker,
     279          session: PipSession,
     280          finder: PackageFinder,
     281          use_user_site: bool,
     282          download_dir: Optional[str] = None,
     283          verbosity: int = 0,
     284      ) -> RequirementPreparer:
     285          """
     286          Create a RequirementPreparer instance for the given parameters.
     287          """
     288          temp_build_dir_path = temp_build_dir.path
     289          assert temp_build_dir_path is not None
     290          legacy_resolver = False
     291  
     292          resolver_variant = cls.determine_resolver_variant(options)
     293          if resolver_variant == "2020-resolver":
     294              lazy_wheel = "fast-deps" in options.features_enabled
     295              if lazy_wheel:
     296                  logger.warning(
     297                      "pip is using lazily downloaded wheels using HTTP "
     298                      "range requests to obtain dependency information. "
     299                      "This experimental feature is enabled through "
     300                      "--use-feature=fast-deps and it is not ready for "
     301                      "production."
     302                  )
     303          else:
     304              legacy_resolver = True
     305              lazy_wheel = False
     306              if "fast-deps" in options.features_enabled:
     307                  logger.warning(
     308                      "fast-deps has no effect when used with the legacy resolver."
     309                  )
     310  
     311          return RequirementPreparer(
     312              build_dir=temp_build_dir_path,
     313              src_dir=options.src_dir,
     314              download_dir=download_dir,
     315              build_isolation=options.build_isolation,
     316              check_build_deps=options.check_build_deps,
     317              build_tracker=build_tracker,
     318              session=session,
     319              progress_bar=options.progress_bar,
     320              finder=finder,
     321              require_hashes=options.require_hashes,
     322              use_user_site=use_user_site,
     323              lazy_wheel=lazy_wheel,
     324              verbosity=verbosity,
     325              legacy_resolver=legacy_resolver,
     326          )
     327  
     328      @classmethod
     329      def make_resolver(
     330          cls,
     331          preparer: RequirementPreparer,
     332          finder: PackageFinder,
     333          options: Values,
     334          wheel_cache: Optional[WheelCache] = None,
     335          use_user_site: bool = False,
     336          ignore_installed: bool = True,
     337          ignore_requires_python: bool = False,
     338          force_reinstall: bool = False,
     339          upgrade_strategy: str = "to-satisfy-only",
     340          use_pep517: Optional[bool] = None,
     341          py_version_info: Optional[Tuple[int, ...]] = None,
     342      ) -> BaseResolver:
     343          """
     344          Create a Resolver instance for the given parameters.
     345          """
     346          make_install_req = partial(
     347              install_req_from_req_string,
     348              isolated=options.isolated_mode,
     349              use_pep517=use_pep517,
     350          )
     351          resolver_variant = cls.determine_resolver_variant(options)
     352          # The long import name and duplicated invocation is needed to convince
     353          # Mypy into correctly typechecking. Otherwise it would complain the
     354          # "Resolver" class being redefined.
     355          if resolver_variant == "2020-resolver":
     356              import pip._internal.resolution.resolvelib.resolver
     357  
     358              return pip._internal.resolution.resolvelib.resolver.Resolver(
     359                  preparer=preparer,
     360                  finder=finder,
     361                  wheel_cache=wheel_cache,
     362                  make_install_req=make_install_req,
     363                  use_user_site=use_user_site,
     364                  ignore_dependencies=options.ignore_dependencies,
     365                  ignore_installed=ignore_installed,
     366                  ignore_requires_python=ignore_requires_python,
     367                  force_reinstall=force_reinstall,
     368                  upgrade_strategy=upgrade_strategy,
     369                  py_version_info=py_version_info,
     370              )
     371          import pip._internal.resolution.legacy.resolver
     372  
     373          return pip._internal.resolution.legacy.resolver.Resolver(
     374              preparer=preparer,
     375              finder=finder,
     376              wheel_cache=wheel_cache,
     377              make_install_req=make_install_req,
     378              use_user_site=use_user_site,
     379              ignore_dependencies=options.ignore_dependencies,
     380              ignore_installed=ignore_installed,
     381              ignore_requires_python=ignore_requires_python,
     382              force_reinstall=force_reinstall,
     383              upgrade_strategy=upgrade_strategy,
     384              py_version_info=py_version_info,
     385          )
     386  
     387      def get_requirements(
     388          self,
     389          args: List[str],
     390          options: Values,
     391          finder: PackageFinder,
     392          session: PipSession,
     393      ) -> List[InstallRequirement]:
     394          """
     395          Parse command-line arguments into the corresponding requirements.
     396          """
     397          requirements: List[InstallRequirement] = []
     398          for filename in options.constraints:
     399              for parsed_req in parse_requirements(
     400                  filename,
     401                  constraint=True,
     402                  finder=finder,
     403                  options=options,
     404                  session=session,
     405              ):
     406                  req_to_add = install_req_from_parsed_requirement(
     407                      parsed_req,
     408                      isolated=options.isolated_mode,
     409                      user_supplied=False,
     410                  )
     411                  requirements.append(req_to_add)
     412  
     413          for req in args:
     414              req_to_add = install_req_from_line(
     415                  req,
     416                  comes_from=None,
     417                  isolated=options.isolated_mode,
     418                  use_pep517=options.use_pep517,
     419                  user_supplied=True,
     420                  config_settings=getattr(options, "config_settings", None),
     421              )
     422              requirements.append(req_to_add)
     423  
     424          for req in options.editables:
     425              req_to_add = install_req_from_editable(
     426                  req,
     427                  user_supplied=True,
     428                  isolated=options.isolated_mode,
     429                  use_pep517=options.use_pep517,
     430                  config_settings=getattr(options, "config_settings", None),
     431              )
     432              requirements.append(req_to_add)
     433  
     434          # NOTE: options.require_hashes may be set if --require-hashes is True
     435          for filename in options.requirements:
     436              for parsed_req in parse_requirements(
     437                  filename, finder=finder, options=options, session=session
     438              ):
     439                  req_to_add = install_req_from_parsed_requirement(
     440                      parsed_req,
     441                      isolated=options.isolated_mode,
     442                      use_pep517=options.use_pep517,
     443                      user_supplied=True,
     444                      config_settings=parsed_req.options.get("config_settings")
     445                      if parsed_req.options
     446                      else None,
     447                  )
     448                  requirements.append(req_to_add)
     449  
     450          # If any requirement has hash options, enable hash checking.
     451          if any(req.has_hash_options for req in requirements):
     452              options.require_hashes = True
     453  
     454          if not (args or options.editables or options.requirements):
     455              opts = {"name": self.name}
     456              if options.find_links:
     457                  raise CommandError(
     458                      "You must give at least one requirement to {name} "
     459                      '(maybe you meant "pip {name} {links}"?)'.format(
     460                          **dict(opts, links=" ".join(options.find_links))
     461                      )
     462                  )
     463              else:
     464                  raise CommandError(
     465                      "You must give at least one requirement to {name} "
     466                      '(see "pip help {name}")'.format(**opts)
     467                  )
     468  
     469          return requirements
     470  
     471      @staticmethod
     472      def trace_basic_info(finder: PackageFinder) -> None:
     473          """
     474          Trace basic information about the provided objects.
     475          """
     476          # Display where finder is looking for packages
     477          search_scope = finder.search_scope
     478          locations = search_scope.get_formatted_locations()
     479          if locations:
     480              logger.info(locations)
     481  
     482      def _build_package_finder(
     483          self,
     484          options: Values,
     485          session: PipSession,
     486          target_python: Optional[TargetPython] = None,
     487          ignore_requires_python: Optional[bool] = None,
     488      ) -> PackageFinder:
     489          """
     490          Create a package finder appropriate to this requirement command.
     491  
     492          :param ignore_requires_python: Whether to ignore incompatible
     493              "Requires-Python" values in links. Defaults to False.
     494          """
     495          link_collector = LinkCollector.create(session, options=options)
     496          selection_prefs = SelectionPreferences(
     497              allow_yanked=True,
     498              format_control=options.format_control,
     499              allow_all_prereleases=options.pre,
     500              prefer_binary=options.prefer_binary,
     501              ignore_requires_python=ignore_requires_python,
     502          )
     503  
     504          return PackageFinder.create(
     505              link_collector=link_collector,
     506              selection_prefs=selection_prefs,
     507              target_python=target_python,
     508          )