       1  """
       2  The main purpose of this module is to expose LinkCollector.collect_sources().
       3  """
       4  
       5  import collections
       6  import email.message
       7  import functools
       8  import itertools
       9  import json
      10  import logging
      11  import os
      12  import urllib.parse
      13  import urllib.request
      14  from html.parser import HTMLParser
      15  from optparse import Values
      16  from typing import (
      17      TYPE_CHECKING,
      18      Callable,
      19      Dict,
      20      Iterable,
      21      List,
      22      MutableMapping,
      23      NamedTuple,
      24      Optional,
      25      Sequence,
      26      Tuple,
      27      Union,
      28  )
      29  
      30  from pip._vendor import requests
      31  from pip._vendor.requests import Response
      32  from pip._vendor.requests.exceptions import RetryError, SSLError
      33  
      34  from pip._internal.exceptions import NetworkConnectionError
      35  from pip._internal.models.link import Link
      36  from pip._internal.models.search_scope import SearchScope
      37  from pip._internal.network.session import PipSession
      38  from pip._internal.network.utils import raise_for_status
      39  from pip._internal.utils.filetypes import is_archive_file
      40  from pip._internal.utils.misc import redact_auth_from_url
      41  from pip._internal.vcs import vcs
      42  
      43  from .sources import CandidatesFromPage, LinkSource, build_source
      44  
      45  if TYPE_CHECKING:
      46      from typing import Protocol
      47  else:
      48      Protocol = object
      49  
      50  logger = logging.getLogger(__name__)
      51  
      52  ResponseHeaders = MutableMapping[str, str]
      53  
      54  
def _match_vcs_scheme(url: str) -> Optional[str]:
    """Look for VCS schemes in the URL.

    Returns the matched VCS scheme, or None if there's no match.
    """
    for scheme in vcs.schemes:
        if url.lower().startswith(scheme) and url[len(scheme)] in "+:":
            return scheme
    return None


class _NotAPIContent(Exception):
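    """Raised when a response's Content-Type is not a Simple API content type."""
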
    def __init__(self, content_type: str, request_desc: str) -> None:
        super().__init__(content_type, request_desc)
        self.content_type = content_type
        self.request_desc = request_desc


def _ensure_api_header(response: Response) -> None:
    """
    Check the Content-Type header to ensure the response contains a Simple
    API response.

    Raises `_NotAPIContent` if the Content-Type is not a supported Simple API
    content type.
    """
    content_type = response.headers.get("Content-Type", "Unknown")

    content_type_l = content_type.lower()
    if content_type_l.startswith(
        (
            "text/html",
            "application/vnd.pypi.simple.v1+html",
            "application/vnd.pypi.simple.v1+json",
        )
    ):
        return

    raise _NotAPIContent(content_type, response.request.method)


class _NotHTTP(Exception):
    """Raised when a URL does not use http/https and cannot be HEAD-checked."""


def _ensure_api_response(url: str, session: PipSession) -> None:
    """
    Send a HEAD request to the URL, and ensure the response contains a Simple
    API response.

    Raises `_NotHTTP` if the URL is not available for a HEAD request, or
    `_NotAPIContent` if the content type is not a supported Simple API
    content type.
    """
    scheme, netloc, path, query, fragment = urllib.parse.urlsplit(url)
    if scheme not in {"http", "https"}:
        raise _NotHTTP()

    resp = session.head(url, allow_redirects=True)
    raise_for_status(resp)

    _ensure_api_header(resp)


def _get_simple_response(url: str, session: PipSession) -> Response:
    """Access a Simple API response with GET, and return the response.

    This consists of three parts:

    1. If the URL looks suspiciously like an archive, send a HEAD first to
       check the Content-Type is HTML or Simple API, to avoid downloading a
       large file. Raise `_NotHTTP` if the content type cannot be determined, or
       `_NotAPIContent` if it is not HTML or a Simple API.
    2. Actually perform the request. Raise HTTP exceptions on network failures.
    3. Check the Content-Type header to make sure we got a Simple API response,
       and raise `_NotAPIContent` otherwise.
    """
    if is_archive_file(Link(url).filename):
        _ensure_api_response(url, session=session)

    logger.debug("Getting page %s", redact_auth_from_url(url))

    resp = session.get(
        url,
        headers={
            "Accept": ", ".join(
                [
                    "application/vnd.pypi.simple.v1+json",
                    "application/vnd.pypi.simple.v1+html; q=0.1",
                    "text/html; q=0.01",
                ]
            ),
            # We don't want to blindly return cached data for /simple/,
            # because authors generally expect that
            # ``twine upload && pip install`` will work, but if they've
            # done a pip install in the last ~10 minutes it won't.
            # By setting this to zero we will not blindly use any cached
            # data. The benefit of using max-age=0 instead of no-cache is
            # that we still support conditional requests, so we still
            # minimize traffic sent in cases where the page hasn't changed
            # at all; we just always incur the round trip for the
            # conditional GET now instead of only once per 10 minutes.
            # For more information, please see pypa/pip#5670.
            "Cache-Control": "max-age=0",
        },
    )
    raise_for_status(resp)

    # The check for archives above only works if the url ends with
    # something that looks like an archive. However, that is not a
    # requirement of a URL. Unless we issue a HEAD request on every
    # url, we cannot know ahead of time for sure if something is a
    # Simple API response or not. However, we can check after we've
    # downloaded it.
    _ensure_api_header(resp)

    logger.debug(
        "Fetched page %s as %s",
        redact_auth_from_url(url),
        resp.headers.get("Content-Type", "Unknown"),
    )

    return resp


def _get_encoding_from_headers(headers: ResponseHeaders) -> Optional[str]:
    """Determine if we have any encoding information in our headers."""
    if headers and "Content-Type" in headers:
        m = email.message.Message()
        m["content-type"] = headers["Content-Type"]
        charset = m.get_param("charset")
        if charset:
            return str(charset)
    return None


class CacheablePageContent:
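    """Wrap an IndexContent so parsed links can be cached, keyed by the page URL."""
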
    def __init__(self, page: "IndexContent") -> None:
        assert page.cache_link_parsing
        self.page = page

    def __eq__(self, other: object) -> bool:
        return isinstance(other, type(self)) and self.page.url == other.page.url

    def __hash__(self) -> int:
        return hash(self.page.url)


class ParseLinks(Protocol):
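    """Callback protocol for functions that parse an IndexContent into Links."""
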
    def __call__(self, page: "IndexContent") -> Iterable[Link]:
        ...


def with_cached_index_content(fn: ParseLinks) -> ParseLinks:
    """
    Given a function that parses an Iterable[Link] from an IndexContent, cache the
    function's result (keyed by CacheablePageContent), unless the IndexContent
    `page` has `page.cache_link_parsing == False`.
    """

    @functools.lru_cache(maxsize=None)
    def wrapper(cacheable_page: CacheablePageContent) -> List[Link]:
        return list(fn(cacheable_page.page))

    @functools.wraps(fn)
    def wrapper_wrapper(page: "IndexContent") -> List[Link]:
        if page.cache_link_parsing:
            return wrapper(CacheablePageContent(page))
        return list(fn(page))

    return wrapper_wrapper


@with_cached_index_content
def parse_links(page: "IndexContent") -> Iterable[Link]:
    """
    Parse a Simple API's Index Content, and yield its anchor elements as Link objects.
    """

    content_type_l = page.content_type.lower()
    if content_type_l.startswith("application/vnd.pypi.simple.v1+json"):
        data = json.loads(page.content)
        for file in data.get("files", []):
            link = Link.from_json(file, page.url)
            if link is None:
                continue
            yield link
        return

    parser = HTMLLinkParser(page.url)
    encoding = page.encoding or "utf-8"
    parser.feed(page.content.decode(encoding))

    url = page.url
    base_url = parser.base_url or url
    for anchor in parser.anchors:
        link = Link.from_element(anchor, page_url=url, base_url=base_url)
        if link is None:
            continue
        yield link


class IndexContent:
    """Represents one response (or page), along with its URL"""

    def __init__(
        self,
        content: bytes,
        content_type: str,
        encoding: Optional[str],
        url: str,
        cache_link_parsing: bool = True,
    ) -> None:
        """
        :param encoding: the encoding to decode the given content.
        :param url: the URL from which the HTML was downloaded.
        :param cache_link_parsing: whether links parsed from this page's url
                                   should be cached. PyPI index urls should
                                   have this set to False, for example.
        """
        self.content = content
        self.content_type = content_type
        self.encoding = encoding
        self.url = url
        self.cache_link_parsing = cache_link_parsing

    def __str__(self) -> str:
        return redact_auth_from_url(self.url)


class HTMLLinkParser(HTMLParser):
    """
    HTMLParser that keeps the first base HREF and a list of all anchor
    elements' attributes.
    """

    def __init__(self, url: str) -> None:
        super().__init__(convert_charrefs=True)

        self.url: str = url
        self.base_url: Optional[str] = None
        self.anchors: List[Dict[str, Optional[str]]] = []

    def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
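        """Record the first <base href> and the attributes of every <a> tag."""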
        if tag == "base" and self.base_url is None:
            href = self.get_href(attrs)
            if href is not None:
                self.base_url = href
        elif tag == "a":
            self.anchors.append(dict(attrs))

    def get_href(self, attrs: List[Tuple[str, Optional[str]]]) -> Optional[str]:
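        """Return the value of the first "href" attribute, if any."""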
        for name, value in attrs:
            if name == "href":
                return value
        return None


def _handle_get_simple_fail(
    link: Link,
    reason: Union[str, Exception],
    meth: Optional[Callable[..., None]] = None,
) -> None:
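    """Log a failed fetch of a Simple API page, using logger.debug by default."""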
    if meth is None:
        meth = logger.debug
    meth("Could not fetch URL %s: %s - skipping", link, reason)


def _make_index_content(
    response: Response, cache_link_parsing: bool = True
) -> IndexContent:
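    """Build an IndexContent from a successful Simple API response."""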
    encoding = _get_encoding_from_headers(response.headers)
    return IndexContent(
        response.content,
        response.headers["Content-Type"],
        encoding=encoding,
        url=response.url,
        cache_link_parsing=cache_link_parsing,
    )


def _get_index_content(link: Link, *, session: PipSession) -> Optional["IndexContent"]:
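    """Fetch a link's Simple API content, or return None if it cannot be fetched
    (unsupported VCS URL, network failure, or unsupported Content-Type)."""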
    url = link.url.split("#", 1)[0]

    # Check for VCS schemes that do not support lookup as web pages.
    vcs_scheme = _match_vcs_scheme(url)
    if vcs_scheme:
        logger.warning(
            "Cannot look at %s URL %s because it does not support lookup as web pages.",
            vcs_scheme,
            link,
        )
        return None

    # Tack index.html onto file:// URLs that point to directories
    scheme, _, path, _, _, _ = urllib.parse.urlparse(url)
    if scheme == "file" and os.path.isdir(urllib.request.url2pathname(path)):
        # add trailing slash if not present so urljoin doesn't trim
        # final segment
        if not url.endswith("/"):
            url += "/"
        # TODO: In the future, it would be nice if pip supported PEP 691-style
        #       responses for file:// URLs; however, there's no standard file
        #       extension for application/vnd.pypi.simple.v1+json, so we'll
        #       need to come up with something on our own.
        url = urllib.parse.urljoin(url, "index.html")
        logger.debug(" file: URL is directory, getting %s", url)

    try:
        resp = _get_simple_response(url, session=session)
    except _NotHTTP:
        logger.warning(
            "Skipping page %s because it looks like an archive, and cannot "
            "be checked by an HTTP HEAD request.",
            link,
        )
    except _NotAPIContent as exc:
        logger.warning(
            "Skipping page %s because the %s request got Content-Type: %s. "
            "The only supported Content-Types are application/vnd.pypi.simple.v1+json, "
            "application/vnd.pypi.simple.v1+html, and text/html",
            link,
            exc.request_desc,
            exc.content_type,
        )
    except NetworkConnectionError as exc:
        _handle_get_simple_fail(link, exc)
    except RetryError as exc:
        _handle_get_simple_fail(link, exc)
    except SSLError as exc:
        reason = "There was a problem confirming the ssl certificate: "
        reason += str(exc)
        _handle_get_simple_fail(link, reason, meth=logger.info)
    except requests.ConnectionError as exc:
        _handle_get_simple_fail(link, f"connection error: {exc}")
    except requests.Timeout:
        _handle_get_simple_fail(link, "timed out")
    else:
        return _make_index_content(resp, cache_link_parsing=link.cache_link_parsing)
    return None


class CollectedSources(NamedTuple):
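    """The link sources built from --find-links locations and index URLs."""
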
    find_links: Sequence[Optional[LinkSource]]
    index_urls: Sequence[Optional[LinkSource]]


class LinkCollector:

    """
    Responsible for collecting Link objects from all configured locations,
    making network requests as needed.

    The class's main method is its collect_sources() method.
    """

    def __init__(
        self,
        session: PipSession,
        search_scope: SearchScope,
    ) -> None:
        self.search_scope = search_scope
        self.session = session

    @classmethod
    def create(
        cls,
        session: PipSession,
        options: Values,
        suppress_no_index: bool = False,
    ) -> "LinkCollector":
        """
        :param session: The Session to use to make requests.
        :param suppress_no_index: Whether to ignore the --no-index option
            when constructing the SearchScope object.
        """
        index_urls = [options.index_url] + options.extra_index_urls
        if options.no_index and not suppress_no_index:
            logger.debug(
                "Ignoring indexes: %s",
                ",".join(redact_auth_from_url(url) for url in index_urls),
            )
            index_urls = []

        # Make sure find_links is a list before passing to create().
        find_links = options.find_links or []

        search_scope = SearchScope.create(
            find_links=find_links,
            index_urls=index_urls,
            no_index=options.no_index,
        )
        link_collector = LinkCollector(
            session=session,
            search_scope=search_scope,
        )
        return link_collector

    @property
    def find_links(self) -> List[str]:
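        """The configured --find-links locations."""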
        return self.search_scope.find_links

    def fetch_response(self, location: Link) -> Optional[IndexContent]:
        """
        Fetch an index page (HTML or JSON) containing package links.
        """
        return _get_index_content(location, session=self.session)

    def collect_sources(
        self,
        project_name: str,
        candidates_from_page: CandidatesFromPage,
    ) -> CollectedSources:
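        """Build deduplicated LinkSources for ``project_name`` from the
        configured index URLs and find-links locations."""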
        # The OrderedDict() calls below deduplicate sources by URL.
        index_url_sources = collections.OrderedDict(
            build_source(
                loc,
                candidates_from_page=candidates_from_page,
                page_validator=self.session.is_secure_origin,
                expand_dir=False,
                cache_link_parsing=False,
            )
            for loc in self.search_scope.get_index_urls_locations(project_name)
        ).values()
        find_links_sources = collections.OrderedDict(
            build_source(
                loc,
                candidates_from_page=candidates_from_page,
                page_validator=self.session.is_secure_origin,
                expand_dir=True,
                cache_link_parsing=True,
            )
            for loc in self.find_links
        ).values()

        if logger.isEnabledFor(logging.DEBUG):
            lines = [
                f"* {s.link}"
                for s in itertools.chain(find_links_sources, index_url_sources)
                if s is not None and s.link is not None
            ]
            lines = [
                f"{len(lines)} location(s) to search "
                f"for versions of {project_name}:"
            ] + lines
            logger.debug("\n".join(lines))

        return CollectedSources(
            find_links=list(find_links_sources),
            index_urls=list(index_url_sources),
        )