import functools
import itertools
import logging
import os
import posixpath
import re
import urllib.parse
import urllib.request  # used by _clean_file_url_path (url2pathname/pathname2url)
from dataclasses import dataclass
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Mapping,
    NamedTuple,
    Optional,
    Tuple,
    Union,
)

from pip._internal.utils.deprecation import deprecated
from pip._internal.utils.filetypes import WHEEL_EXTENSION
from pip._internal.utils.hashes import Hashes
from pip._internal.utils.misc import (
    pairwise,
    redact_auth_from_url,
    split_auth_from_netloc,
    splitext,
)
from pip._internal.utils.models import KeyBasedCompareMixin
from pip._internal.utils.urls import path_to_url, url_to_path

if TYPE_CHECKING:
    from pip._internal.index.collector import IndexContent

logger = logging.getLogger(__name__)


# Order matters: earlier hashes take precedence over later hashes when we
# pick which one to use.
_SUPPORTED_HASHES = ("sha512", "sha384", "sha256", "sha224", "sha1", "md5")


@dataclass(frozen=True)
class LinkHash:
    """Links to content may have embedded hash values. This class parses those.

    `name` must be a member of `_SUPPORTED_HASHES`.

    This class can be converted to and from `ArchiveInfo`. While ArchiveInfo intends to
    be JSON-serializable to conform to PEP 610, this class contains the logic for
    parsing a hash name and value for correctness, and then checking whether that hash
    conforms to a schema with `.is_hash_allowed()`."""

    name: str
    value: str

    _hash_url_fragment_re = re.compile(
        # NB: we do not validate that the second group (.*) is a valid hex
        # digest. Instead, we simply keep that string in this class, and then check it
        # against Hashes when hash-checking is needed. This is easier to debug than
        # proactively discarding an invalid hex digest, as we handle incorrect hashes
        # and malformed hashes in the same place.
        r"[#&]({choices})=([^&]*)".format(
            choices="|".join(re.escape(hash_name) for hash_name in _SUPPORTED_HASHES)
        ),
    )

    def __post_init__(self) -> None:
        assert self.name in _SUPPORTED_HASHES

    @classmethod
    @functools.lru_cache(maxsize=None)
    def find_hash_url_fragment(cls, url: str) -> Optional["LinkHash"]:
        """Search a string for a checksum algorithm name and encoded output value."""
        match = cls._hash_url_fragment_re.search(url)
        if match is None:
            return None
        name, value = match.groups()
        return cls(name=name, value=value)

    def as_dict(self) -> Dict[str, str]:
        return {self.name: self.value}

    def as_hashes(self) -> Hashes:
        """Return a Hashes instance which checks only for the current hash."""
        return Hashes({self.name: [self.value]})

    def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
        """
        Return True if the current hash is allowed by `hashes`.
        """
        if hashes is None:
            return False
        return hashes.is_hash_allowed(self.name, hex_digest=self.value)
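

# Illustrative sketch (not part of pip's API; the URL and digest below are
# made-up values): how a hash fragment embedded in a link URL is parsed.
def _example_link_hash_usage() -> None:
    link_hash = LinkHash.find_hash_url_fragment(
        "https://example.com/pkg-1.0-py3-none-any.whl#sha256=0123abcd"
    )
    assert link_hash == LinkHash(name="sha256", value="0123abcd")
    # A URL without a recognized hash fragment yields None.
    assert LinkHash.find_hash_url_fragment("https://example.com/pkg.whl") is None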


@dataclass(frozen=True)
class MetadataFile:
    """Information about a core metadata file associated with a distribution."""

    hashes: Optional[Dict[str, str]]

    def __post_init__(self) -> None:
        if self.hashes is not None:
            assert all(name in _SUPPORTED_HASHES for name in self.hashes)


def supported_hashes(hashes: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]:
    # Remove any unsupported hash types from the mapping. If this leaves no
    # supported hashes, return None.
    if hashes is None:
        return None
    hashes = {n: v for n, v in hashes.items() if n in _SUPPORTED_HASHES}
    if not hashes:
        return None
    return hashes
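

# Quick sketch of the filtering above (digest strings are placeholders):
# unsupported hash names are dropped, and an empty result collapses to None.
def _example_supported_hashes() -> None:
    assert supported_hashes({"sha256": "0123", "crc32": "abcd"}) == {"sha256": "0123"}
    assert supported_hashes({"crc32": "abcd"}) is None
    assert supported_hashes(None) is None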


def _clean_url_path_part(part: str) -> str:
    """
    Clean a "part" of a URL path (i.e. after splitting on "@" characters).
    """
    # We unquote prior to quoting to make sure nothing is double quoted.
    return urllib.parse.quote(urllib.parse.unquote(part))


def _clean_file_url_path(part: str) -> str:
    """
    Clean the first part of a URL path that corresponds to a local
    filesystem path (i.e. the first part after splitting on "@" characters).
    """
    # We unquote prior to quoting to make sure nothing is double quoted.
    # Also, on Windows the path part might contain a drive letter which
    # should not be quoted. On Linux where drive letters do not
    # exist, the colon should be quoted. We rely on urllib.request
    # to do the right thing here.
    return urllib.request.pathname2url(urllib.request.url2pathname(part))


# Characters reserved in URL paths: "@" and "%2F" (the percent-encoded "/").
_reserved_chars_re = re.compile("(@|%2F)", re.IGNORECASE)


def _clean_url_path(path: str, is_local_path: bool) -> str:
    """
    Clean the path portion of a URL.
    """
    if is_local_path:
        clean_func = _clean_file_url_path
    else:
        clean_func = _clean_url_path_part

    # Split on the reserved characters prior to cleaning so that
    # revision strings in VCS URLs are properly preserved.
    parts = _reserved_chars_re.split(path)

    cleaned_parts = []
    for to_clean, reserved in pairwise(itertools.chain(parts, [""])):
        cleaned_parts.append(clean_func(to_clean))
        # Normalize %xx escapes (e.g. %2f -> %2F)
        cleaned_parts.append(reserved.upper())

    return "".join(cleaned_parts)


def _ensure_quoted_url(url: str) -> str:
    """
    Make sure a link is fully quoted.
    For example, if ' ' occurs in the URL, it will be replaced with "%20",
    without double-quoting characters that are already percent-encoded.
    """
    # Split the URL into parts according to the general structure
    # `scheme://netloc/path;parameters?query#fragment`.
    result = urllib.parse.urlparse(url)
    # If the netloc is empty, then the URL refers to a local filesystem path.
    is_local_path = not result.netloc
    path = _clean_url_path(result.path, is_local_path=is_local_path)
    return urllib.parse.urlunparse(result._replace(path=path))
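

# Sketch of the quoting behavior (example URLs are made up): spaces are
# percent-encoded, existing escapes survive, and the "@" in a VCS revision
# is preserved by the split-on-reserved-characters logic above.
def _example_ensure_quoted_url() -> None:
    assert (
        _ensure_quoted_url("https://example.com/some path")
        == "https://example.com/some%20path"
    )
    # Already-quoted input comes back unchanged rather than double-quoted.
    assert (
        _ensure_quoted_url("https://example.com/some%20path")
        == "https://example.com/some%20path"
    )
    # The revision separator in a VCS URL is left intact.
    url = "git+https://example.com/repo.git@v1.0"
    assert _ensure_quoted_url(url) == url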


class Link(KeyBasedCompareMixin):
    """Represents a parsed link from a Package Index's simple URL"""

    __slots__ = [
        "_parsed_url",
        "_url",
        "_hashes",
        "comes_from",
        "requires_python",
        "yanked_reason",
        "metadata_file_data",
        "cache_link_parsing",
        "egg_fragment",
    ]

    def __init__(
        self,
        url: str,
        comes_from: Optional[Union[str, "IndexContent"]] = None,
        requires_python: Optional[str] = None,
        yanked_reason: Optional[str] = None,
        metadata_file_data: Optional[MetadataFile] = None,
        cache_link_parsing: bool = True,
        hashes: Optional[Mapping[str, str]] = None,
    ) -> None:
        """
        :param url: url of the resource pointed to (href of the link)
        :param comes_from: instance of IndexContent where the link was found,
            or string.
        :param requires_python: String containing the `Requires-Python`
            metadata field, specified in PEP 345. This may be specified by
            a data-requires-python attribute in the HTML link tag, as
            described in PEP 503.
        :param yanked_reason: the reason the file has been yanked, if the
            file has been yanked, or None if the file hasn't been yanked.
            This is the value of the "data-yanked" attribute, if present, in
            a simple repository HTML link. If the file has been yanked but
            no reason was provided, this should be the empty string. See
            PEP 592 for more information and the specification.
        :param metadata_file_data: the metadata attached to the file, or None if
            no such metadata is provided. This argument, if not None, indicates
            that a separate metadata file exists, and also optionally supplies
            hashes for that file.
        :param cache_link_parsing: A flag that is used elsewhere to determine
            whether resources retrieved from this link should be cached. PyPI
            URLs should generally have this set to False, for example.
        :param hashes: A mapping of hash names to digests to allow us to
            determine the validity of a download.
        """

        # The comes_from, requires_python, and metadata_file_data arguments are
        # only used by classmethods of this class, and are not used in client
        # code directly.

        # url can be a UNC Windows share
        if url.startswith("\\\\"):
            url = path_to_url(url)

        self._parsed_url = urllib.parse.urlsplit(url)
        # Store the url as a private attribute to prevent accidentally
        # trying to set a new value.
        self._url = url

        link_hash = LinkHash.find_hash_url_fragment(url)
        hashes_from_link = {} if link_hash is None else link_hash.as_dict()
        if hashes is None:
            self._hashes = hashes_from_link
        else:
            self._hashes = {**hashes, **hashes_from_link}

        self.comes_from = comes_from
        self.requires_python = requires_python if requires_python else None
        self.yanked_reason = yanked_reason
        self.metadata_file_data = metadata_file_data

        super().__init__(key=url, defining_class=Link)

        self.cache_link_parsing = cache_link_parsing
        self.egg_fragment = self._egg_fragment()

    @classmethod
    def from_json(
        cls,
        file_data: Dict[str, Any],
        page_url: str,
    ) -> Optional["Link"]:
        """
        Convert a PyPI JSON document from a simple repository page into a Link.
        """
        file_url = file_data.get("url")
        if file_url is None:
            return None

        url = _ensure_quoted_url(urllib.parse.urljoin(page_url, file_url))
        pyrequire = file_data.get("requires-python")
        yanked_reason = file_data.get("yanked")
        hashes = file_data.get("hashes", {})

        # PEP 714: Indexes must use the name core-metadata, but
        # clients should support the old name as a fallback for compatibility.
        metadata_info = file_data.get("core-metadata")
        if metadata_info is None:
            metadata_info = file_data.get("dist-info-metadata")

        # The metadata info value may be a boolean, or a dict of hashes.
        if isinstance(metadata_info, dict):
            # The file exists, and hashes have been supplied
            metadata_file_data = MetadataFile(supported_hashes(metadata_info))
        elif metadata_info:
            # The file exists, but there are no hashes
            metadata_file_data = MetadataFile(None)
        else:
            # False or not present: the file does not exist
            metadata_file_data = None

        # Link.yanked_reason expects an empty string instead of a boolean.
        if yanked_reason and not isinstance(yanked_reason, str):
            yanked_reason = ""
        # Link.yanked_reason expects None instead of False.
        elif not yanked_reason:
            yanked_reason = None

        return cls(
            url,
            comes_from=page_url,
            requires_python=pyrequire,
            yanked_reason=yanked_reason,
            hashes=hashes,
            metadata_file_data=metadata_file_data,
        )
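
    # Illustrative input (hypothetical values): a simple-API JSON file entry
    # such as
    #     {"url": "pkg-1.0-py3-none-any.whl",
    #      "hashes": {"sha256": "0123abcd"},
    #      "requires-python": ">=3.8",
    #      "core-metadata": {"sha256": "fedcba98"}}
    # produces a Link carrying those hashes with metadata_file_data set, so
    # metadata_link() below can point at the companion .metadata file.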

    @classmethod
    def from_element(
        cls,
        anchor_attribs: Dict[str, Optional[str]],
        page_url: str,
        base_url: str,
    ) -> Optional["Link"]:
        """
        Convert an anchor element's attributes in a simple repository page to a Link.
        """
        href = anchor_attribs.get("href")
        if not href:
            return None

        url = _ensure_quoted_url(urllib.parse.urljoin(base_url, href))
        pyrequire = anchor_attribs.get("data-requires-python")
        yanked_reason = anchor_attribs.get("data-yanked")

        # PEP 714: Indexes must use the name data-core-metadata, but
        # clients should support the old name as a fallback for compatibility.
        metadata_info = anchor_attribs.get("data-core-metadata")
        if metadata_info is None:
            metadata_info = anchor_attribs.get("data-dist-info-metadata")
        # The metadata info value may be the string "true", or a string of
        # the form "hashname=hashval"
        if metadata_info == "true":
            # The file exists, but there are no hashes
            metadata_file_data = MetadataFile(None)
        elif metadata_info is None:
            # The file does not exist
            metadata_file_data = None
        else:
            # The file exists, and hashes have been supplied
            hashname, sep, hashval = metadata_info.partition("=")
            if sep == "=":
                metadata_file_data = MetadataFile(supported_hashes({hashname: hashval}))
            else:
                # Error - data is wrong. Treat as no hashes supplied.
                logger.debug(
                    "Index returned invalid data-dist-info-metadata value: %s",
                    metadata_info,
                )
                metadata_file_data = MetadataFile(None)

        return cls(
            url,
            comes_from=page_url,
            requires_python=pyrequire,
            yanked_reason=yanked_reason,
            metadata_file_data=metadata_file_data,
        )
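
    # Illustrative input (hypothetical values): an anchor like
    #     <a href="pkg-1.0.tar.gz#sha256=0123abcd" data-requires-python=">=3.8">
    # arrives here as
    #     {"href": "pkg-1.0.tar.gz#sha256=0123abcd",
    #      "data-requires-python": ">=3.8"}
    # and yields a Link that carries the sha256 digest parsed from the URL
    # fragment by LinkHash.find_hash_url_fragment().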

    def __str__(self) -> str:
        if self.requires_python:
            rp = f" (requires-python:{self.requires_python})"
        else:
            rp = ""
        if self.comes_from:
            return "{} (from {}){}".format(
                redact_auth_from_url(self._url), self.comes_from, rp
            )
        else:
            return redact_auth_from_url(str(self._url))

    def __repr__(self) -> str:
        return f"<Link {self}>"

    @property
    def url(self) -> str:
        return self._url

    @property
    def filename(self) -> str:
        path = self.path.rstrip("/")
        name = posixpath.basename(path)
        if not name:
            # Make sure we don't leak auth information if the netloc
            # includes a username and password.
            netloc, user_pass = split_auth_from_netloc(self.netloc)
            return netloc

        name = urllib.parse.unquote(name)
        assert name, f"URL {self._url!r} produced no filename"
        return name

    @property
    def file_path(self) -> str:
        return url_to_path(self.url)

    @property
    def scheme(self) -> str:
        return self._parsed_url.scheme

    @property
    def netloc(self) -> str:
        """
        This can contain auth information.
        """
        return self._parsed_url.netloc

    @property
    def path(self) -> str:
        return urllib.parse.unquote(self._parsed_url.path)

    def splitext(self) -> Tuple[str, str]:
        return splitext(posixpath.basename(self.path.rstrip("/")))

    @property
    def ext(self) -> str:
        return self.splitext()[1]

    @property
    def url_without_fragment(self) -> str:
        scheme, netloc, path, query, fragment = self._parsed_url
        return urllib.parse.urlunsplit((scheme, netloc, path, query, ""))

    _egg_fragment_re = re.compile(r"[#&]egg=([^&]*)")

    # Per PEP 508.
    _project_name_re = re.compile(
        r"^([A-Z0-9]|[A-Z0-9][A-Z0-9._-]*[A-Z0-9])$", re.IGNORECASE
    )

    def _egg_fragment(self) -> Optional[str]:
        match = self._egg_fragment_re.search(self._url)
        if not match:
            return None

        # An egg fragment looks like a PEP 508 project name, along with
        # an optional extras specifier. Anything else is invalid.
        project_name = match.group(1)
        if not self._project_name_re.match(project_name):
            deprecated(
                reason=f"{self} contains an egg fragment with a non-PEP 508 name",
                replacement="to use the req @ url syntax, and remove the egg fragment",
                gone_in="25.0",
                issue=11617,
            )

        return project_name

    _subdirectory_fragment_re = re.compile(r"[#&]subdirectory=([^&]*)")

    @property
    def subdirectory_fragment(self) -> Optional[str]:
        match = self._subdirectory_fragment_re.search(self._url)
        if not match:
            return None
        return match.group(1)

    def metadata_link(self) -> Optional["Link"]:
        """Return a link to the associated core metadata file (if any)."""
        if self.metadata_file_data is None:
            return None
        metadata_url = f"{self.url_without_fragment}.metadata"
        if self.metadata_file_data.hashes is None:
            return Link(metadata_url)
        return Link(metadata_url, hashes=self.metadata_file_data.hashes)

    def as_hashes(self) -> Hashes:
        return Hashes({k: [v] for k, v in self._hashes.items()})

    @property
    def hash(self) -> Optional[str]:
        return next(iter(self._hashes.values()), None)

    @property
    def hash_name(self) -> Optional[str]:
        return next(iter(self._hashes), None)

    @property
    def show_url(self) -> str:
        return posixpath.basename(self._url.split("#", 1)[0].split("?", 1)[0])

    @property
    def is_file(self) -> bool:
        return self.scheme == "file"

    def is_existing_dir(self) -> bool:
        return self.is_file and os.path.isdir(self.file_path)

    @property
    def is_wheel(self) -> bool:
        return self.ext == WHEEL_EXTENSION

    @property
    def is_vcs(self) -> bool:
        from pip._internal.vcs import vcs

        return self.scheme in vcs.all_schemes

    @property
    def is_yanked(self) -> bool:
        return self.yanked_reason is not None

    @property
    def has_hash(self) -> bool:
        return bool(self._hashes)

    def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
        """
        Return True if the link has a hash and it is allowed by `hashes`.
        """
        if hashes is None:
            return False
        return any(hashes.is_hash_allowed(k, v) for k, v in self._hashes.items())
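

# Brief sketch tying the pieces together (URL and digest are made up): the
# hash fragment is parsed into Link._hashes, and derived properties expose it.
def _example_link_usage() -> None:
    link = Link("https://example.com/pkg-1.0.tar.gz#sha256=0123abcd")
    assert link.filename == "pkg-1.0.tar.gz"
    assert link.hash_name == "sha256"
    assert link.url_without_fragment == "https://example.com/pkg-1.0.tar.gz"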


class _CleanResult(NamedTuple):
    """Convert link for equivalency check.

    This is used in the resolver to check whether two URL-specified requirements
    likely point to the same distribution and can be considered equivalent. This
    equivalency logic avoids comparing URLs literally, which can be too strict
    (e.g. "a=1&b=2" vs "b=2&a=1") and produce conflicts that surprise users.

    Currently this does three things:

    1. Drop the basic auth part. This is technically wrong since a server can
       serve different content based on auth, but if it does that, it is even
       impossible to guarantee two URLs without auth are equivalent, since
       the user can input different auth information when prompted. So the
       practical solution is to assume the auth doesn't affect the response.
    2. Parse the query to avoid the ordering issue. Note that the ordering of
       values under the same key in the query is NOT normalized; i.e. "a=1&a=2"
       and "a=2&a=1" are still considered different.
    3. Explicitly drop most of the fragment part, except ``subdirectory=`` and
       hash values, since it should have no impact on the downloaded content.
       Note that this drops the "egg=" part historically used to denote the
       requested project (and extras), which is wrong in the strictest sense,
       but too many people are supplying it inconsistently to cause superfluous
       resolution conflicts, so we choose to also ignore them.
    """

    parsed: urllib.parse.SplitResult
    query: Dict[str, List[str]]
    subdirectory: str
    hashes: Dict[str, str]


def _clean_link(link: Link) -> _CleanResult:
    parsed = link._parsed_url
    netloc = parsed.netloc.rsplit("@", 1)[-1]
    # According to RFC 8089, an empty host in file: means localhost.
    if parsed.scheme == "file" and not netloc:
        netloc = "localhost"
    fragment = urllib.parse.parse_qs(parsed.fragment)
    if "egg" in fragment:
        logger.debug("Ignoring egg= fragment in %s", link)
    try:
        # If there are multiple subdirectory values, use the first one.
        # This matches the behavior of Link.subdirectory_fragment.
        subdirectory = fragment["subdirectory"][0]
    except (IndexError, KeyError):
        subdirectory = ""
    # If there are multiple hash values under the same algorithm, use the
    # first one. This matches the behavior of Link.hash.
    hashes = {k: fragment[k][0] for k in _SUPPORTED_HASHES if k in fragment}
    return _CleanResult(
        parsed=parsed._replace(netloc=netloc, query="", fragment=""),
        query=urllib.parse.parse_qs(parsed.query),
        subdirectory=subdirectory,
        hashes=hashes,
    )


@functools.lru_cache(maxsize=None)
def links_equivalent(link1: Link, link2: Link) -> bool:
    return _clean_link(link1) == _clean_link(link2)
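

# Sketch of the equivalence check (hypothetical URLs): query-parameter order
# and the egg= fragment are ignored, so these two links compare as equivalent.
def _example_links_equivalent() -> None:
    link1 = Link("https://example.com/pkg.tar.gz?a=1&b=2#egg=pkg")
    link2 = Link("https://example.com/pkg.tar.gz?b=2&a=1")
    assert links_equivalent(link1, link2)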