python (3.11.7)
       1  """Routines related to PyPI, indexes"""
       2  
       3  import enum
       4  import functools
       5  import itertools
       6  import logging
       7  import re
       8  from typing import TYPE_CHECKING, FrozenSet, Iterable, List, Optional, Set, Tuple, Union
       9  
      10  from pip._vendor.packaging import specifiers
      11  from pip._vendor.packaging.tags import Tag
      12  from pip._vendor.packaging.utils import canonicalize_name
      13  from pip._vendor.packaging.version import _BaseVersion
      14  from pip._vendor.packaging.version import parse as parse_version
      15  
      16  from pip._internal.exceptions import (
      17      BestVersionAlreadyInstalled,
      18      DistributionNotFound,
      19      InvalidWheelFilename,
      20      UnsupportedWheel,
      21  )
      22  from pip._internal.index.collector import LinkCollector, parse_links
      23  from pip._internal.models.candidate import InstallationCandidate
      24  from pip._internal.models.format_control import FormatControl
      25  from pip._internal.models.link import Link
      26  from pip._internal.models.search_scope import SearchScope
      27  from pip._internal.models.selection_prefs import SelectionPreferences
      28  from pip._internal.models.target_python import TargetPython
      29  from pip._internal.models.wheel import Wheel
      30  from pip._internal.req import InstallRequirement
      31  from pip._internal.utils._log import getLogger
      32  from pip._internal.utils.filetypes import WHEEL_EXTENSION
      33  from pip._internal.utils.hashes import Hashes
      34  from pip._internal.utils.logging import indent_log
      35  from pip._internal.utils.misc import build_netloc
      36  from pip._internal.utils.packaging import check_requires_python
      37  from pip._internal.utils.unpacking import SUPPORTED_EXTENSIONS
      38  
      39  if TYPE_CHECKING:
      40      from pip._vendor.typing_extensions import TypeGuard
      41  
# Names exported as this module's public API.
__all__ = ["FormatControl", "BestCandidateResult", "PackageFinder"]


# Uses pip's own getLogger (not logging.getLogger); code in this module
# calls logger.verbose() in addition to the standard logging methods.
logger = getLogger(__name__)

# Parsed wheel build tag as built by CandidateEvaluator._sort_key: the empty
# tuple when there is no build tag, otherwise (leading integer, remainder).
BuildTag = Union[Tuple[()], Tuple[int, str]]
# Shape of the key returned by CandidateEvaluator._sort_key:
# (has_allowed_hash, yank_value, binary_preference, version, tag priority,
# build_tag).
CandidateSortingKey = Tuple[int, int, int, _BaseVersion, Optional[int], BuildTag]
      49  
      50  
      51  def _check_link_requires_python(
      52      link: Link,
      53      version_info: Tuple[int, int, int],
      54      ignore_requires_python: bool = False,
      55  ) -> bool:
      56      """
      57      Return whether the given Python version is compatible with a link's
      58      "Requires-Python" value.
      59  
      60      :param version_info: A 3-tuple of ints representing the Python
      61          major-minor-micro version to check.
      62      :param ignore_requires_python: Whether to ignore the "Requires-Python"
      63          value if the given Python version isn't compatible.
      64      """
      65      try:
      66          is_compatible = check_requires_python(
      67              link.requires_python,
      68              version_info=version_info,
      69          )
      70      except specifiers.InvalidSpecifier:
      71          logger.debug(
      72              "Ignoring invalid Requires-Python (%r) for link: %s",
      73              link.requires_python,
      74              link,
      75          )
      76      else:
      77          if not is_compatible:
      78              version = ".".join(map(str, version_info))
      79              if not ignore_requires_python:
      80                  logger.verbose(
      81                      "Link requires a different Python (%s not in: %r): %s",
      82                      version,
      83                      link.requires_python,
      84                      link,
      85                  )
      86                  return False
      87  
      88              logger.debug(
      89                  "Ignoring failed Requires-Python check (%s not in: %r) for link: %s",
      90                  version,
      91                  link.requires_python,
      92                  link,
      93              )
      94  
      95      return True
      96  
      97  
      98  class ESC[4;38;5;81mLinkType(ESC[4;38;5;149menumESC[4;38;5;149m.ESC[4;38;5;149mEnum):
      99      candidate = enum.auto()
     100      different_project = enum.auto()
     101      yanked = enum.auto()
     102      format_unsupported = enum.auto()
     103      format_invalid = enum.auto()
     104      platform_mismatch = enum.auto()
     105      requires_python_mismatch = enum.auto()
     106  
     107  
     108  class ESC[4;38;5;81mLinkEvaluator:
     109  
     110      """
     111      Responsible for evaluating links for a particular project.
     112      """
     113  
     114      _py_version_re = re.compile(r"-py([123]\.?[0-9]?)$")
     115  
     116      # Don't include an allow_yanked default value to make sure each call
     117      # site considers whether yanked releases are allowed. This also causes
     118      # that decision to be made explicit in the calling code, which helps
     119      # people when reading the code.
     120      def __init__(
     121          self,
     122          project_name: str,
     123          canonical_name: str,
     124          formats: FrozenSet[str],
     125          target_python: TargetPython,
     126          allow_yanked: bool,
     127          ignore_requires_python: Optional[bool] = None,
     128      ) -> None:
     129          """
     130          :param project_name: The user supplied package name.
     131          :param canonical_name: The canonical package name.
     132          :param formats: The formats allowed for this package. Should be a set
     133              with 'binary' or 'source' or both in it.
     134          :param target_python: The target Python interpreter to use when
     135              evaluating link compatibility. This is used, for example, to
     136              check wheel compatibility, as well as when checking the Python
     137              version, e.g. the Python version embedded in a link filename
     138              (or egg fragment) and against an HTML link's optional PEP 503
     139              "data-requires-python" attribute.
     140          :param allow_yanked: Whether files marked as yanked (in the sense
     141              of PEP 592) are permitted to be candidates for install.
     142          :param ignore_requires_python: Whether to ignore incompatible
     143              PEP 503 "data-requires-python" values in HTML links. Defaults
     144              to False.
     145          """
     146          if ignore_requires_python is None:
     147              ignore_requires_python = False
     148  
     149          self._allow_yanked = allow_yanked
     150          self._canonical_name = canonical_name
     151          self._ignore_requires_python = ignore_requires_python
     152          self._formats = formats
     153          self._target_python = target_python
     154  
     155          self.project_name = project_name
     156  
     157      def evaluate_link(self, link: Link) -> Tuple[LinkType, str]:
     158          """
     159          Determine whether a link is a candidate for installation.
     160  
     161          :return: A tuple (result, detail), where *result* is an enum
     162              representing whether the evaluation found a candidate, or the reason
     163              why one is not found. If a candidate is found, *detail* will be the
     164              candidate's version string; if one is not found, it contains the
     165              reason the link fails to qualify.
     166          """
     167          version = None
     168          if link.is_yanked and not self._allow_yanked:
     169              reason = link.yanked_reason or "<none given>"
     170              return (LinkType.yanked, f"yanked for reason: {reason}")
     171  
     172          if link.egg_fragment:
     173              egg_info = link.egg_fragment
     174              ext = link.ext
     175          else:
     176              egg_info, ext = link.splitext()
     177              if not ext:
     178                  return (LinkType.format_unsupported, "not a file")
     179              if ext not in SUPPORTED_EXTENSIONS:
     180                  return (
     181                      LinkType.format_unsupported,
     182                      f"unsupported archive format: {ext}",
     183                  )
     184              if "binary" not in self._formats and ext == WHEEL_EXTENSION:
     185                  reason = f"No binaries permitted for {self.project_name}"
     186                  return (LinkType.format_unsupported, reason)
     187              if "macosx10" in link.path and ext == ".zip":
     188                  return (LinkType.format_unsupported, "macosx10 one")
     189              if ext == WHEEL_EXTENSION:
     190                  try:
     191                      wheel = Wheel(link.filename)
     192                  except InvalidWheelFilename:
     193                      return (
     194                          LinkType.format_invalid,
     195                          "invalid wheel filename",
     196                      )
     197                  if canonicalize_name(wheel.name) != self._canonical_name:
     198                      reason = f"wrong project name (not {self.project_name})"
     199                      return (LinkType.different_project, reason)
     200  
     201                  supported_tags = self._target_python.get_tags()
     202                  if not wheel.supported(supported_tags):
     203                      # Include the wheel's tags in the reason string to
     204                      # simplify troubleshooting compatibility issues.
     205                      file_tags = ", ".join(wheel.get_formatted_file_tags())
     206                      reason = (
     207                          f"none of the wheel's tags ({file_tags}) are compatible "
     208                          f"(run pip debug --verbose to show compatible tags)"
     209                      )
     210                      return (LinkType.platform_mismatch, reason)
     211  
     212                  version = wheel.version
     213  
     214          # This should be up by the self.ok_binary check, but see issue 2700.
     215          if "source" not in self._formats and ext != WHEEL_EXTENSION:
     216              reason = f"No sources permitted for {self.project_name}"
     217              return (LinkType.format_unsupported, reason)
     218  
     219          if not version:
     220              version = _extract_version_from_fragment(
     221                  egg_info,
     222                  self._canonical_name,
     223              )
     224          if not version:
     225              reason = f"Missing project version for {self.project_name}"
     226              return (LinkType.format_invalid, reason)
     227  
     228          match = self._py_version_re.search(version)
     229          if match:
     230              version = version[: match.start()]
     231              py_version = match.group(1)
     232              if py_version != self._target_python.py_version:
     233                  return (
     234                      LinkType.platform_mismatch,
     235                      "Python version is incorrect",
     236                  )
     237  
     238          supports_python = _check_link_requires_python(
     239              link,
     240              version_info=self._target_python.py_version_info,
     241              ignore_requires_python=self._ignore_requires_python,
     242          )
     243          if not supports_python:
     244              reason = f"{version} Requires-Python {link.requires_python}"
     245              return (LinkType.requires_python_mismatch, reason)
     246  
     247          logger.debug("Found link %s, version: %s", link, version)
     248  
     249          return (LinkType.candidate, version)
     250  
     251  
     252  def filter_unallowed_hashes(
     253      candidates: List[InstallationCandidate],
     254      hashes: Optional[Hashes],
     255      project_name: str,
     256  ) -> List[InstallationCandidate]:
     257      """
     258      Filter out candidates whose hashes aren't allowed, and return a new
     259      list of candidates.
     260  
     261      If at least one candidate has an allowed hash, then all candidates with
     262      either an allowed hash or no hash specified are returned.  Otherwise,
     263      the given candidates are returned.
     264  
     265      Including the candidates with no hash specified when there is a match
     266      allows a warning to be logged if there is a more preferred candidate
     267      with no hash specified.  Returning all candidates in the case of no
     268      matches lets pip report the hash of the candidate that would otherwise
     269      have been installed (e.g. permitting the user to more easily update
     270      their requirements file with the desired hash).
     271      """
     272      if not hashes:
     273          logger.debug(
     274              "Given no hashes to check %s links for project %r: "
     275              "discarding no candidates",
     276              len(candidates),
     277              project_name,
     278          )
     279          # Make sure we're not returning back the given value.
     280          return list(candidates)
     281  
     282      matches_or_no_digest = []
     283      # Collect the non-matches for logging purposes.
     284      non_matches = []
     285      match_count = 0
     286      for candidate in candidates:
     287          link = candidate.link
     288          if not link.has_hash:
     289              pass
     290          elif link.is_hash_allowed(hashes=hashes):
     291              match_count += 1
     292          else:
     293              non_matches.append(candidate)
     294              continue
     295  
     296          matches_or_no_digest.append(candidate)
     297  
     298      if match_count:
     299          filtered = matches_or_no_digest
     300      else:
     301          # Make sure we're not returning back the given value.
     302          filtered = list(candidates)
     303  
     304      if len(filtered) == len(candidates):
     305          discard_message = "discarding no candidates"
     306      else:
     307          discard_message = "discarding {} non-matches:\n  {}".format(
     308              len(non_matches),
     309              "\n  ".join(str(candidate.link) for candidate in non_matches),
     310          )
     311  
     312      logger.debug(
     313          "Checked %s links for project %r against %s hashes "
     314          "(%s matches, %s no digest): %s",
     315          len(candidates),
     316          project_name,
     317          hashes.digest_count,
     318          match_count,
     319          len(matches_or_no_digest) - match_count,
     320          discard_message,
     321      )
     322  
     323      return filtered
     324  
     325  
     326  class ESC[4;38;5;81mCandidatePreferences:
     327  
     328      """
     329      Encapsulates some of the preferences for filtering and sorting
     330      InstallationCandidate objects.
     331      """
     332  
     333      def __init__(
     334          self,
     335          prefer_binary: bool = False,
     336          allow_all_prereleases: bool = False,
     337      ) -> None:
     338          """
     339          :param allow_all_prereleases: Whether to allow all pre-releases.
     340          """
     341          self.allow_all_prereleases = allow_all_prereleases
     342          self.prefer_binary = prefer_binary
     343  
     344  
     345  class ESC[4;38;5;81mBestCandidateResult:
     346      """A collection of candidates, returned by `PackageFinder.find_best_candidate`.
     347  
     348      This class is only intended to be instantiated by CandidateEvaluator's
     349      `compute_best_candidate()` method.
     350      """
     351  
     352      def __init__(
     353          self,
     354          candidates: List[InstallationCandidate],
     355          applicable_candidates: List[InstallationCandidate],
     356          best_candidate: Optional[InstallationCandidate],
     357      ) -> None:
     358          """
     359          :param candidates: A sequence of all available candidates found.
     360          :param applicable_candidates: The applicable candidates.
     361          :param best_candidate: The most preferred candidate found, or None
     362              if no applicable candidates were found.
     363          """
     364          assert set(applicable_candidates) <= set(candidates)
     365  
     366          if best_candidate is None:
     367              assert not applicable_candidates
     368          else:
     369              assert best_candidate in applicable_candidates
     370  
     371          self._applicable_candidates = applicable_candidates
     372          self._candidates = candidates
     373  
     374          self.best_candidate = best_candidate
     375  
     376      def iter_all(self) -> Iterable[InstallationCandidate]:
     377          """Iterate through all candidates."""
     378          return iter(self._candidates)
     379  
     380      def iter_applicable(self) -> Iterable[InstallationCandidate]:
     381          """Iterate through the applicable candidates."""
     382          return iter(self._applicable_candidates)
     383  
     384  
     385  class ESC[4;38;5;81mCandidateEvaluator:
     386  
     387      """
     388      Responsible for filtering and sorting candidates for installation based
     389      on what tags are valid.
     390      """
     391  
     392      @classmethod
     393      def create(
     394          cls,
     395          project_name: str,
     396          target_python: Optional[TargetPython] = None,
     397          prefer_binary: bool = False,
     398          allow_all_prereleases: bool = False,
     399          specifier: Optional[specifiers.BaseSpecifier] = None,
     400          hashes: Optional[Hashes] = None,
     401      ) -> "CandidateEvaluator":
     402          """Create a CandidateEvaluator object.
     403  
     404          :param target_python: The target Python interpreter to use when
     405              checking compatibility. If None (the default), a TargetPython
     406              object will be constructed from the running Python.
     407          :param specifier: An optional object implementing `filter`
     408              (e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
     409              versions.
     410          :param hashes: An optional collection of allowed hashes.
     411          """
     412          if target_python is None:
     413              target_python = TargetPython()
     414          if specifier is None:
     415              specifier = specifiers.SpecifierSet()
     416  
     417          supported_tags = target_python.get_tags()
     418  
     419          return cls(
     420              project_name=project_name,
     421              supported_tags=supported_tags,
     422              specifier=specifier,
     423              prefer_binary=prefer_binary,
     424              allow_all_prereleases=allow_all_prereleases,
     425              hashes=hashes,
     426          )
     427  
     428      def __init__(
     429          self,
     430          project_name: str,
     431          supported_tags: List[Tag],
     432          specifier: specifiers.BaseSpecifier,
     433          prefer_binary: bool = False,
     434          allow_all_prereleases: bool = False,
     435          hashes: Optional[Hashes] = None,
     436      ) -> None:
     437          """
     438          :param supported_tags: The PEP 425 tags supported by the target
     439              Python in order of preference (most preferred first).
     440          """
     441          self._allow_all_prereleases = allow_all_prereleases
     442          self._hashes = hashes
     443          self._prefer_binary = prefer_binary
     444          self._project_name = project_name
     445          self._specifier = specifier
     446          self._supported_tags = supported_tags
     447          # Since the index of the tag in the _supported_tags list is used
     448          # as a priority, precompute a map from tag to index/priority to be
     449          # used in wheel.find_most_preferred_tag.
     450          self._wheel_tag_preferences = {
     451              tag: idx for idx, tag in enumerate(supported_tags)
     452          }
     453  
     454      def get_applicable_candidates(
     455          self,
     456          candidates: List[InstallationCandidate],
     457      ) -> List[InstallationCandidate]:
     458          """
     459          Return the applicable candidates from a list of candidates.
     460          """
     461          # Using None infers from the specifier instead.
     462          allow_prereleases = self._allow_all_prereleases or None
     463          specifier = self._specifier
     464          versions = {
     465              str(v)
     466              for v in specifier.filter(
     467                  # We turn the version object into a str here because otherwise
     468                  # when we're debundled but setuptools isn't, Python will see
     469                  # packaging.version.Version and
     470                  # pkg_resources._vendor.packaging.version.Version as different
     471                  # types. This way we'll use a str as a common data interchange
     472                  # format. If we stop using the pkg_resources provided specifier
     473                  # and start using our own, we can drop the cast to str().
     474                  (str(c.version) for c in candidates),
     475                  prereleases=allow_prereleases,
     476              )
     477          }
     478  
     479          # Again, converting version to str to deal with debundling.
     480          applicable_candidates = [c for c in candidates if str(c.version) in versions]
     481  
     482          filtered_applicable_candidates = filter_unallowed_hashes(
     483              candidates=applicable_candidates,
     484              hashes=self._hashes,
     485              project_name=self._project_name,
     486          )
     487  
     488          return sorted(filtered_applicable_candidates, key=self._sort_key)
     489  
     490      def _sort_key(self, candidate: InstallationCandidate) -> CandidateSortingKey:
     491          """
     492          Function to pass as the `key` argument to a call to sorted() to sort
     493          InstallationCandidates by preference.
     494  
     495          Returns a tuple such that tuples sorting as greater using Python's
     496          default comparison operator are more preferred.
     497  
     498          The preference is as follows:
     499  
     500          First and foremost, candidates with allowed (matching) hashes are
     501          always preferred over candidates without matching hashes. This is
     502          because e.g. if the only candidate with an allowed hash is yanked,
     503          we still want to use that candidate.
     504  
     505          Second, excepting hash considerations, candidates that have been
     506          yanked (in the sense of PEP 592) are always less preferred than
     507          candidates that haven't been yanked. Then:
     508  
     509          If not finding wheels, they are sorted by version only.
     510          If finding wheels, then the sort order is by version, then:
     511            1. existing installs
     512            2. wheels ordered via Wheel.support_index_min(self._supported_tags)
     513            3. source archives
     514          If prefer_binary was set, then all wheels are sorted above sources.
     515  
     516          Note: it was considered to embed this logic into the Link
     517                comparison operators, but then different sdist links
     518                with the same version, would have to be considered equal
     519          """
     520          valid_tags = self._supported_tags
     521          support_num = len(valid_tags)
     522          build_tag: BuildTag = ()
     523          binary_preference = 0
     524          link = candidate.link
     525          if link.is_wheel:
     526              # can raise InvalidWheelFilename
     527              wheel = Wheel(link.filename)
     528              try:
     529                  pri = -(
     530                      wheel.find_most_preferred_tag(
     531                          valid_tags, self._wheel_tag_preferences
     532                      )
     533                  )
     534              except ValueError:
     535                  raise UnsupportedWheel(
     536                      "{} is not a supported wheel for this platform. It "
     537                      "can't be sorted.".format(wheel.filename)
     538                  )
     539              if self._prefer_binary:
     540                  binary_preference = 1
     541              if wheel.build_tag is not None:
     542                  match = re.match(r"^(\d+)(.*)$", wheel.build_tag)
     543                  assert match is not None, "guaranteed by filename validation"
     544                  build_tag_groups = match.groups()
     545                  build_tag = (int(build_tag_groups[0]), build_tag_groups[1])
     546          else:  # sdist
     547              pri = -(support_num)
     548          has_allowed_hash = int(link.is_hash_allowed(self._hashes))
     549          yank_value = -1 * int(link.is_yanked)  # -1 for yanked.
     550          return (
     551              has_allowed_hash,
     552              yank_value,
     553              binary_preference,
     554              candidate.version,
     555              pri,
     556              build_tag,
     557          )
     558  
     559      def sort_best_candidate(
     560          self,
     561          candidates: List[InstallationCandidate],
     562      ) -> Optional[InstallationCandidate]:
     563          """
     564          Return the best candidate per the instance's sort order, or None if
     565          no candidate is acceptable.
     566          """
     567          if not candidates:
     568              return None
     569          best_candidate = max(candidates, key=self._sort_key)
     570          return best_candidate
     571  
     572      def compute_best_candidate(
     573          self,
     574          candidates: List[InstallationCandidate],
     575      ) -> BestCandidateResult:
     576          """
     577          Compute and return a `BestCandidateResult` instance.
     578          """
     579          applicable_candidates = self.get_applicable_candidates(candidates)
     580  
     581          best_candidate = self.sort_best_candidate(applicable_candidates)
     582  
     583          return BestCandidateResult(
     584              candidates,
     585              applicable_candidates=applicable_candidates,
     586              best_candidate=best_candidate,
     587          )
     588  
     589  
     590  class ESC[4;38;5;81mPackageFinder:
     591      """This finds packages.
     592  
     593      This is meant to match easy_install's technique for looking for
     594      packages, by reading pages and looking for appropriate links.
     595      """
     596  
     597      def __init__(
     598          self,
     599          link_collector: LinkCollector,
     600          target_python: TargetPython,
     601          allow_yanked: bool,
     602          format_control: Optional[FormatControl] = None,
     603          candidate_prefs: Optional[CandidatePreferences] = None,
     604          ignore_requires_python: Optional[bool] = None,
     605      ) -> None:
     606          """
     607          This constructor is primarily meant to be used by the create() class
     608          method and from tests.
     609  
     610          :param format_control: A FormatControl object, used to control
     611              the selection of source packages / binary packages when consulting
     612              the index and links.
     613          :param candidate_prefs: Options to use when creating a
     614              CandidateEvaluator object.
     615          """
     616          if candidate_prefs is None:
     617              candidate_prefs = CandidatePreferences()
     618  
     619          format_control = format_control or FormatControl(set(), set())
     620  
     621          self._allow_yanked = allow_yanked
     622          self._candidate_prefs = candidate_prefs
     623          self._ignore_requires_python = ignore_requires_python
     624          self._link_collector = link_collector
     625          self._target_python = target_python
     626  
     627          self.format_control = format_control
     628  
     629          # These are boring links that have already been logged somehow.
     630          self._logged_links: Set[Tuple[Link, LinkType, str]] = set()
     631  
     632      # Don't include an allow_yanked default value to make sure each call
     633      # site considers whether yanked releases are allowed. This also causes
     634      # that decision to be made explicit in the calling code, which helps
     635      # people when reading the code.
     636      @classmethod
     637      def create(
     638          cls,
     639          link_collector: LinkCollector,
     640          selection_prefs: SelectionPreferences,
     641          target_python: Optional[TargetPython] = None,
     642      ) -> "PackageFinder":
     643          """Create a PackageFinder.
     644  
     645          :param selection_prefs: The candidate selection preferences, as a
     646              SelectionPreferences object.
     647          :param target_python: The target Python interpreter to use when
     648              checking compatibility. If None (the default), a TargetPython
     649              object will be constructed from the running Python.
     650          """
     651          if target_python is None:
     652              target_python = TargetPython()
     653  
     654          candidate_prefs = CandidatePreferences(
     655              prefer_binary=selection_prefs.prefer_binary,
     656              allow_all_prereleases=selection_prefs.allow_all_prereleases,
     657          )
     658  
     659          return cls(
     660              candidate_prefs=candidate_prefs,
     661              link_collector=link_collector,
     662              target_python=target_python,
     663              allow_yanked=selection_prefs.allow_yanked,
     664              format_control=selection_prefs.format_control,
     665              ignore_requires_python=selection_prefs.ignore_requires_python,
     666          )
     667  
     668      @property
     669      def target_python(self) -> TargetPython:
     670          return self._target_python
     671  
     672      @property
     673      def search_scope(self) -> SearchScope:
     674          return self._link_collector.search_scope
     675  
     676      @search_scope.setter
     677      def search_scope(self, search_scope: SearchScope) -> None:
     678          self._link_collector.search_scope = search_scope
     679  
     680      @property
     681      def find_links(self) -> List[str]:
     682          return self._link_collector.find_links
     683  
     684      @property
     685      def index_urls(self) -> List[str]:
     686          return self.search_scope.index_urls
     687  
     688      @property
     689      def trusted_hosts(self) -> Iterable[str]:
     690          for host_port in self._link_collector.session.pip_trusted_origins:
     691              yield build_netloc(*host_port)
     692  
     693      @property
     694      def allow_all_prereleases(self) -> bool:
     695          return self._candidate_prefs.allow_all_prereleases
     696  
     697      def set_allow_all_prereleases(self) -> None:
     698          self._candidate_prefs.allow_all_prereleases = True
     699  
     700      @property
     701      def prefer_binary(self) -> bool:
     702          return self._candidate_prefs.prefer_binary
     703  
     704      def set_prefer_binary(self) -> None:
     705          self._candidate_prefs.prefer_binary = True
     706  
     707      def requires_python_skipped_reasons(self) -> List[str]:
     708          reasons = {
     709              detail
     710              for _, result, detail in self._logged_links
     711              if result == LinkType.requires_python_mismatch
     712          }
     713          return sorted(reasons)
     714  
     715      def make_link_evaluator(self, project_name: str) -> LinkEvaluator:
     716          canonical_name = canonicalize_name(project_name)
     717          formats = self.format_control.get_allowed_formats(canonical_name)
     718  
     719          return LinkEvaluator(
     720              project_name=project_name,
     721              canonical_name=canonical_name,
     722              formats=formats,
     723              target_python=self._target_python,
     724              allow_yanked=self._allow_yanked,
     725              ignore_requires_python=self._ignore_requires_python,
     726          )
     727  
     728      def _sort_links(self, links: Iterable[Link]) -> List[Link]:
     729          """
     730          Returns elements of links in order, non-egg links first, egg links
     731          second, while eliminating duplicates
     732          """
     733          eggs, no_eggs = [], []
     734          seen: Set[Link] = set()
     735          for link in links:
     736              if link not in seen:
     737                  seen.add(link)
     738                  if link.egg_fragment:
     739                      eggs.append(link)
     740                  else:
     741                      no_eggs.append(link)
     742          return no_eggs + eggs
     743  
     744      def _log_skipped_link(self, link: Link, result: LinkType, detail: str) -> None:
     745          entry = (link, result, detail)
     746          if entry not in self._logged_links:
     747              # Put the link at the end so the reason is more visible and because
     748              # the link string is usually very long.
     749              logger.debug("Skipping link: %s: %s", detail, link)
     750              self._logged_links.add(entry)
     751  
     752      def get_install_candidate(
     753          self, link_evaluator: LinkEvaluator, link: Link
     754      ) -> Optional[InstallationCandidate]:
     755          """
     756          If the link is a candidate for install, convert it to an
     757          InstallationCandidate and return it. Otherwise, return None.
     758          """
     759          result, detail = link_evaluator.evaluate_link(link)
     760          if result != LinkType.candidate:
     761              self._log_skipped_link(link, result, detail)
     762              return None
     763  
     764          return InstallationCandidate(
     765              name=link_evaluator.project_name,
     766              link=link,
     767              version=detail,
     768          )
     769  
     770      def evaluate_links(
     771          self, link_evaluator: LinkEvaluator, links: Iterable[Link]
     772      ) -> List[InstallationCandidate]:
     773          """
     774          Convert links that are candidates to InstallationCandidate objects.
     775          """
     776          candidates = []
     777          for link in self._sort_links(links):
     778              candidate = self.get_install_candidate(link_evaluator, link)
     779              if candidate is not None:
     780                  candidates.append(candidate)
     781  
     782          return candidates
     783  
     784      def process_project_url(
     785          self, project_url: Link, link_evaluator: LinkEvaluator
     786      ) -> List[InstallationCandidate]:
     787          logger.debug(
     788              "Fetching project page and analyzing links: %s",
     789              project_url,
     790          )
     791          index_response = self._link_collector.fetch_response(project_url)
     792          if index_response is None:
     793              return []
     794  
     795          page_links = list(parse_links(index_response))
     796  
     797          with indent_log():
     798              package_links = self.evaluate_links(
     799                  link_evaluator,
     800                  links=page_links,
     801              )
     802  
     803          return package_links
     804  
     805      @functools.lru_cache(maxsize=None)
     806      def find_all_candidates(self, project_name: str) -> List[InstallationCandidate]:
     807          """Find all available InstallationCandidate for project_name
     808  
     809          This checks index_urls and find_links.
     810          All versions found are returned as an InstallationCandidate list.
     811  
     812          See LinkEvaluator.evaluate_link() for details on which files
     813          are accepted.
     814          """
     815          link_evaluator = self.make_link_evaluator(project_name)
     816  
     817          collected_sources = self._link_collector.collect_sources(
     818              project_name=project_name,
     819              candidates_from_page=functools.partial(
     820                  self.process_project_url,
     821                  link_evaluator=link_evaluator,
     822              ),
     823          )
     824  
     825          page_candidates_it = itertools.chain.from_iterable(
     826              source.page_candidates()
     827              for sources in collected_sources
     828              for source in sources
     829              if source is not None
     830          )
     831          page_candidates = list(page_candidates_it)
     832  
     833          file_links_it = itertools.chain.from_iterable(
     834              source.file_links()
     835              for sources in collected_sources
     836              for source in sources
     837              if source is not None
     838          )
     839          file_candidates = self.evaluate_links(
     840              link_evaluator,
     841              sorted(file_links_it, reverse=True),
     842          )
     843  
     844          if logger.isEnabledFor(logging.DEBUG) and file_candidates:
     845              paths = []
     846              for candidate in file_candidates:
     847                  assert candidate.link.url  # we need to have a URL
     848                  try:
     849                      paths.append(candidate.link.file_path)
     850                  except Exception:
     851                      paths.append(candidate.link.url)  # it's not a local file
     852  
     853              logger.debug("Local files found: %s", ", ".join(paths))
     854  
     855          # This is an intentional priority ordering
     856          return file_candidates + page_candidates
     857  
     858      def make_candidate_evaluator(
     859          self,
     860          project_name: str,
     861          specifier: Optional[specifiers.BaseSpecifier] = None,
     862          hashes: Optional[Hashes] = None,
     863      ) -> CandidateEvaluator:
     864          """Create a CandidateEvaluator object to use."""
     865          candidate_prefs = self._candidate_prefs
     866          return CandidateEvaluator.create(
     867              project_name=project_name,
     868              target_python=self._target_python,
     869              prefer_binary=candidate_prefs.prefer_binary,
     870              allow_all_prereleases=candidate_prefs.allow_all_prereleases,
     871              specifier=specifier,
     872              hashes=hashes,
     873          )
     874  
     875      @functools.lru_cache(maxsize=None)
     876      def find_best_candidate(
     877          self,
     878          project_name: str,
     879          specifier: Optional[specifiers.BaseSpecifier] = None,
     880          hashes: Optional[Hashes] = None,
     881      ) -> BestCandidateResult:
     882          """Find matches for the given project and specifier.
     883  
     884          :param specifier: An optional object implementing `filter`
     885              (e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
     886              versions.
     887  
     888          :return: A `BestCandidateResult` instance.
     889          """
     890          candidates = self.find_all_candidates(project_name)
     891          candidate_evaluator = self.make_candidate_evaluator(
     892              project_name=project_name,
     893              specifier=specifier,
     894              hashes=hashes,
     895          )
     896          return candidate_evaluator.compute_best_candidate(candidates)
     897  
     898      def find_requirement(
     899          self, req: InstallRequirement, upgrade: bool
     900      ) -> Optional[InstallationCandidate]:
     901          """Try to find a Link matching req
     902  
     903          Expects req, an InstallRequirement and upgrade, a boolean
     904          Returns a InstallationCandidate if found,
     905          Raises DistributionNotFound or BestVersionAlreadyInstalled otherwise
     906          """
     907          hashes = req.hashes(trust_internet=False)
     908          best_candidate_result = self.find_best_candidate(
     909              req.name,
     910              specifier=req.specifier,
     911              hashes=hashes,
     912          )
     913          best_candidate = best_candidate_result.best_candidate
     914  
     915          installed_version: Optional[_BaseVersion] = None
     916          if req.satisfied_by is not None:
     917              installed_version = req.satisfied_by.version
     918  
     919          def _format_versions(cand_iter: Iterable[InstallationCandidate]) -> str:
     920              # This repeated parse_version and str() conversion is needed to
     921              # handle different vendoring sources from pip and pkg_resources.
     922              # If we stop using the pkg_resources provided specifier and start
     923              # using our own, we can drop the cast to str().
     924              return (
     925                  ", ".join(
     926                      sorted(
     927                          {str(c.version) for c in cand_iter},
     928                          key=parse_version,
     929                      )
     930                  )
     931                  or "none"
     932              )
     933  
     934          if installed_version is None and best_candidate is None:
     935              logger.critical(
     936                  "Could not find a version that satisfies the requirement %s "
     937                  "(from versions: %s)",
     938                  req,
     939                  _format_versions(best_candidate_result.iter_all()),
     940              )
     941  
     942              raise DistributionNotFound(
     943                  "No matching distribution found for {}".format(req)
     944              )
     945  
     946          def _should_install_candidate(
     947              candidate: Optional[InstallationCandidate],
     948          ) -> "TypeGuard[InstallationCandidate]":
     949              if installed_version is None:
     950                  return True
     951              if best_candidate is None:
     952                  return False
     953              return best_candidate.version > installed_version
     954  
     955          if not upgrade and installed_version is not None:
     956              if _should_install_candidate(best_candidate):
     957                  logger.debug(
     958                      "Existing installed version (%s) satisfies requirement "
     959                      "(most up-to-date version is %s)",
     960                      installed_version,
     961                      best_candidate.version,
     962                  )
     963              else:
     964                  logger.debug(
     965                      "Existing installed version (%s) is most up-to-date and "
     966                      "satisfies requirement",
     967                      installed_version,
     968                  )
     969              return None
     970  
     971          if _should_install_candidate(best_candidate):
     972              logger.debug(
     973                  "Using version %s (newest of versions: %s)",
     974                  best_candidate.version,
     975                  _format_versions(best_candidate_result.iter_applicable()),
     976              )
     977              return best_candidate
     978  
     979          # We have an existing version, and its the best version
     980          logger.debug(
     981              "Installed version (%s) is most up-to-date (past versions: %s)",
     982              installed_version,
     983              _format_versions(best_candidate_result.iter_applicable()),
     984          )
     985          raise BestVersionAlreadyInstalled
     986  
     987  
     988  def _find_name_version_sep(fragment: str, canonical_name: str) -> int:
     989      """Find the separator's index based on the package's canonical name.
     990  
     991      :param fragment: A <package>+<version> filename "fragment" (stem) or
     992          egg fragment.
     993      :param canonical_name: The package's canonical name.
     994  
     995      This function is needed since the canonicalized name does not necessarily
     996      have the same length as the egg info's name part. An example::
     997  
     998      >>> fragment = 'foo__bar-1.0'
     999      >>> canonical_name = 'foo-bar'
    1000      >>> _find_name_version_sep(fragment, canonical_name)
    1001      8
    1002      """
    1003      # Project name and version must be separated by one single dash. Find all
    1004      # occurrences of dashes; if the string in front of it matches the canonical
    1005      # name, this is the one separating the name and version parts.
    1006      for i, c in enumerate(fragment):
    1007          if c != "-":
    1008              continue
    1009          if canonicalize_name(fragment[:i]) == canonical_name:
    1010              return i
    1011      raise ValueError(f"{fragment} does not match {canonical_name}")
    1012  
    1013  
    1014  def _extract_version_from_fragment(fragment: str, canonical_name: str) -> Optional[str]:
    1015      """Parse the version string from a <package>+<version> filename
    1016      "fragment" (stem) or egg fragment.
    1017  
    1018      :param fragment: The string to parse. E.g. foo-2.1
    1019      :param canonical_name: The canonicalized name of the package this
    1020          belongs to.
    1021      """
    1022      try:
    1023          version_start = _find_name_version_sep(fragment, canonical_name) + 1
    1024      except ValueError:
    1025          return None
    1026      version = fragment[version_start:]
    1027      if not version:
    1028          return None
    1029      return version