python (3.11.7)
       1  """PipSession and supporting code, containing all pip-specific
       2  network request configuration and behavior.
       3  """
       4  
       5  import email.utils
       6  import io
       7  import ipaddress
       8  import json
       9  import logging
      10  import mimetypes
      11  import os
      12  import platform
      13  import shutil
      14  import subprocess
      15  import sys
      16  import urllib.parse
      17  import warnings
      18  from typing import (
      19      TYPE_CHECKING,
      20      Any,
      21      Dict,
      22      Generator,
      23      List,
      24      Mapping,
      25      Optional,
      26      Sequence,
      27      Tuple,
      28      Union,
      29  )
      30  
      31  from pip._vendor import requests, urllib3
      32  from pip._vendor.cachecontrol import CacheControlAdapter as _BaseCacheControlAdapter
      33  from pip._vendor.requests.adapters import DEFAULT_POOLBLOCK, BaseAdapter
      34  from pip._vendor.requests.adapters import HTTPAdapter as _BaseHTTPAdapter
      35  from pip._vendor.requests.models import PreparedRequest, Response
      36  from pip._vendor.requests.structures import CaseInsensitiveDict
      37  from pip._vendor.urllib3.connectionpool import ConnectionPool
      38  from pip._vendor.urllib3.exceptions import InsecureRequestWarning
      39  
      40  from pip import __version__
      41  from pip._internal.metadata import get_default_environment
      42  from pip._internal.models.link import Link
      43  from pip._internal.network.auth import MultiDomainBasicAuth
      44  from pip._internal.network.cache import SafeFileCache
      45  
      46  # Import ssl from compat so the initial import occurs in only one place.
      47  from pip._internal.utils.compat import has_tls
      48  from pip._internal.utils.glibc import libc_ver
      49  from pip._internal.utils.misc import build_url_from_netloc, parse_netloc
      50  from pip._internal.utils.urls import url_to_path
      51  
      52  if TYPE_CHECKING:
      53      from ssl import SSLContext
      54  
      55      from pip._vendor.urllib3.poolmanager import PoolManager
      56  
      57  
logger = logging.getLogger(__name__)

# A secure origin is a (protocol, hostname, port) triple.  When
# PipSession.is_secure_origin compares a URL against these triples, "*" acts
# as a wildcard in any position and a port of None also matches any port.
SecureOrigin = Tuple[str, str, Optional[Union[int, str]]]


# Ignore warning raised when using --trusted-host.
# Note that this filter is installed as a side effect of importing the module.
warnings.filterwarnings("ignore", category=InsecureRequestWarning)


# Origins that are always considered secure, independent of any trusted
# hosts added to a session at runtime.
SECURE_ORIGINS: List[SecureOrigin] = [
    # protocol, hostname, port
    # Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
    ("https", "*", "*"),
    ("*", "localhost", "*"),
    # Hostnames that parse as IP networks are matched by network membership
    # when the URL's host is an IP address.
    ("*", "127.0.0.0/8", "*"),
    ("*", "::1/128", "*"),
    ("file", "*", None),
    # ssh is always secure.
    ("ssh", "*", "*"),
]
      78  
      79  
# These are environment variables present when running under various
# CI systems.  For each variable, some CI systems that use the variable
# are indicated.  The collection was chosen so that for each of a number
# of popular systems, at least one of the environment variables is used.
# This list is used to provide some indication of and lower bound for
# CI traffic to PyPI.  Thus, it is okay if the list is not comprehensive.
# Only the presence of a variable is checked (see looks_like_ci); its
# value is never inspected.
# For more background, see: https://github.com/pypa/pip/issues/5499
CI_ENVIRONMENT_VARIABLES = (
    # Azure Pipelines
    "BUILD_BUILDID",
    # Jenkins
    "BUILD_ID",
    # AppVeyor, CircleCI, Codeship, Gitlab CI, Shippable, Travis CI
    "CI",
    # Explicit environment variable.
    "PIP_IS_CI",
)
      97  
      98  
      99  def looks_like_ci() -> bool:
     100      """
     101      Return whether it looks like pip is running under CI.
     102      """
     103      # We don't use the method of checking for a tty (e.g. using isatty())
     104      # because some CI systems mimic a tty (e.g. Travis CI).  Thus that
     105      # method doesn't provide definitive information in either direction.
     106      return any(name in os.environ for name in CI_ENVIRONMENT_VARIABLES)
     107  
     108  
     109  def user_agent() -> str:
     110      """
     111      Return a string representing the user agent.
     112      """
     113      data: Dict[str, Any] = {
     114          "installer": {"name": "pip", "version": __version__},
     115          "python": platform.python_version(),
     116          "implementation": {
     117              "name": platform.python_implementation(),
     118          },
     119      }
     120  
     121      if data["implementation"]["name"] == "CPython":
     122          data["implementation"]["version"] = platform.python_version()
     123      elif data["implementation"]["name"] == "PyPy":
     124          pypy_version_info = sys.pypy_version_info  # type: ignore
     125          if pypy_version_info.releaselevel == "final":
     126              pypy_version_info = pypy_version_info[:3]
     127          data["implementation"]["version"] = ".".join(
     128              [str(x) for x in pypy_version_info]
     129          )
     130      elif data["implementation"]["name"] == "Jython":
     131          # Complete Guess
     132          data["implementation"]["version"] = platform.python_version()
     133      elif data["implementation"]["name"] == "IronPython":
     134          # Complete Guess
     135          data["implementation"]["version"] = platform.python_version()
     136  
     137      if sys.platform.startswith("linux"):
     138          from pip._vendor import distro
     139  
     140          linux_distribution = distro.name(), distro.version(), distro.codename()
     141          distro_infos: Dict[str, Any] = dict(
     142              filter(
     143                  lambda x: x[1],
     144                  zip(["name", "version", "id"], linux_distribution),
     145              )
     146          )
     147          libc = dict(
     148              filter(
     149                  lambda x: x[1],
     150                  zip(["lib", "version"], libc_ver()),
     151              )
     152          )
     153          if libc:
     154              distro_infos["libc"] = libc
     155          if distro_infos:
     156              data["distro"] = distro_infos
     157  
     158      if sys.platform.startswith("darwin") and platform.mac_ver()[0]:
     159          data["distro"] = {"name": "macOS", "version": platform.mac_ver()[0]}
     160  
     161      if platform.system():
     162          data.setdefault("system", {})["name"] = platform.system()
     163  
     164      if platform.release():
     165          data.setdefault("system", {})["release"] = platform.release()
     166  
     167      if platform.machine():
     168          data["cpu"] = platform.machine()
     169  
     170      if has_tls():
     171          import _ssl as ssl
     172  
     173          data["openssl_version"] = ssl.OPENSSL_VERSION
     174  
     175      setuptools_dist = get_default_environment().get_distribution("setuptools")
     176      if setuptools_dist is not None:
     177          data["setuptools_version"] = str(setuptools_dist.version)
     178  
     179      if shutil.which("rustc") is not None:
     180          # If for any reason `rustc --version` fails, silently ignore it
     181          try:
     182              rustc_output = subprocess.check_output(
     183                  ["rustc", "--version"], stderr=subprocess.STDOUT, timeout=0.5
     184              )
     185          except Exception:
     186              pass
     187          else:
     188              if rustc_output.startswith(b"rustc "):
     189                  # The format of `rustc --version` is:
     190                  # `b'rustc 1.52.1 (9bc8c42bb 2021-05-09)\n'`
     191                  # We extract just the middle (1.52.1) part
     192                  data["rustc_version"] = rustc_output.split(b" ")[1].decode()
     193  
     194      # Use None rather than False so as not to give the impression that
     195      # pip knows it is not being run under CI.  Rather, it is a null or
     196      # inconclusive result.  Also, we include some value rather than no
     197      # value to make it easier to know that the check has been run.
     198      data["ci"] = True if looks_like_ci() else None
     199  
     200      user_data = os.environ.get("PIP_USER_AGENT_USER_DATA")
     201      if user_data is not None:
     202          data["user_data"] = user_data
     203  
     204      return "{data[installer][name]}/{data[installer][version]} {json}".format(
     205          data=data,
     206          json=json.dumps(data, separators=(",", ":"), sort_keys=True),
     207      )
     208  
     209  
     210  class ESC[4;38;5;81mLocalFSAdapter(ESC[4;38;5;149mBaseAdapter):
     211      def send(
     212          self,
     213          request: PreparedRequest,
     214          stream: bool = False,
     215          timeout: Optional[Union[float, Tuple[float, float]]] = None,
     216          verify: Union[bool, str] = True,
     217          cert: Optional[Union[str, Tuple[str, str]]] = None,
     218          proxies: Optional[Mapping[str, str]] = None,
     219      ) -> Response:
     220          pathname = url_to_path(request.url)
     221  
     222          resp = Response()
     223          resp.status_code = 200
     224          resp.url = request.url
     225  
     226          try:
     227              stats = os.stat(pathname)
     228          except OSError as exc:
     229              # format the exception raised as a io.BytesIO object,
     230              # to return a better error message:
     231              resp.status_code = 404
     232              resp.reason = type(exc).__name__
     233              resp.raw = io.BytesIO(f"{resp.reason}: {exc}".encode("utf8"))
     234          else:
     235              modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
     236              content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
     237              resp.headers = CaseInsensitiveDict(
     238                  {
     239                      "Content-Type": content_type,
     240                      "Content-Length": stats.st_size,
     241                      "Last-Modified": modified,
     242                  }
     243              )
     244  
     245              resp.raw = open(pathname, "rb")
     246              resp.close = resp.raw.close
     247  
     248          return resp
     249  
     250      def close(self) -> None:
     251          pass
     252  
     253  
     254  class ESC[4;38;5;81m_SSLContextAdapterMixin:
     255      """Mixin to add the ``ssl_context`` constructor argument to HTTP adapters.
     256  
     257      The additional argument is forwarded directly to the pool manager. This allows us
     258      to dynamically decide what SSL store to use at runtime, which is used to implement
     259      the optional ``truststore`` backend.
     260      """
     261  
     262      def __init__(
     263          self,
     264          *,
     265          ssl_context: Optional["SSLContext"] = None,
     266          **kwargs: Any,
     267      ) -> None:
     268          self._ssl_context = ssl_context
     269          super().__init__(**kwargs)
     270  
     271      def init_poolmanager(
     272          self,
     273          connections: int,
     274          maxsize: int,
     275          block: bool = DEFAULT_POOLBLOCK,
     276          **pool_kwargs: Any,
     277      ) -> "PoolManager":
     278          if self._ssl_context is not None:
     279              pool_kwargs.setdefault("ssl_context", self._ssl_context)
     280          return super().init_poolmanager(  # type: ignore[misc]
     281              connections=connections,
     282              maxsize=maxsize,
     283              block=block,
     284              **pool_kwargs,
     285          )
     286  
     287  
     288  class ESC[4;38;5;81mHTTPAdapter(ESC[4;38;5;149m_SSLContextAdapterMixin, ESC[4;38;5;149m_BaseHTTPAdapter):
     289      pass
     290  
     291  
     292  class ESC[4;38;5;81mCacheControlAdapter(ESC[4;38;5;149m_SSLContextAdapterMixin, ESC[4;38;5;149m_BaseCacheControlAdapter):
     293      pass
     294  
     295  
     296  class ESC[4;38;5;81mInsecureHTTPAdapter(ESC[4;38;5;149mHTTPAdapter):
     297      def cert_verify(
     298          self,
     299          conn: ConnectionPool,
     300          url: str,
     301          verify: Union[bool, str],
     302          cert: Optional[Union[str, Tuple[str, str]]],
     303      ) -> None:
     304          super().cert_verify(conn=conn, url=url, verify=False, cert=cert)
     305  
     306  
     307  class ESC[4;38;5;81mInsecureCacheControlAdapter(ESC[4;38;5;149mCacheControlAdapter):
     308      def cert_verify(
     309          self,
     310          conn: ConnectionPool,
     311          url: str,
     312          verify: Union[bool, str],
     313          cert: Optional[Union[str, Tuple[str, str]]],
     314      ) -> None:
     315          super().cert_verify(conn=conn, url=url, verify=False, cert=cert)
     316  
     317  
     318  class ESC[4;38;5;81mPipSession(ESC[4;38;5;149mrequestsESC[4;38;5;149m.ESC[4;38;5;149mSession):
     319      timeout: Optional[int] = None
     320  
     321      def __init__(
     322          self,
     323          *args: Any,
     324          retries: int = 0,
     325          cache: Optional[str] = None,
     326          trusted_hosts: Sequence[str] = (),
     327          index_urls: Optional[List[str]] = None,
     328          ssl_context: Optional["SSLContext"] = None,
     329          **kwargs: Any,
     330      ) -> None:
     331          """
     332          :param trusted_hosts: Domains not to emit warnings for when not using
     333              HTTPS.
     334          """
     335          super().__init__(*args, **kwargs)
     336  
     337          # Namespace the attribute with "pip_" just in case to prevent
     338          # possible conflicts with the base class.
     339          self.pip_trusted_origins: List[Tuple[str, Optional[int]]] = []
     340  
     341          # Attach our User Agent to the request
     342          self.headers["User-Agent"] = user_agent()
     343  
     344          # Attach our Authentication handler to the session
     345          self.auth = MultiDomainBasicAuth(index_urls=index_urls)
     346  
     347          # Create our urllib3.Retry instance which will allow us to customize
     348          # how we handle retries.
     349          retries = urllib3.Retry(
     350              # Set the total number of retries that a particular request can
     351              # have.
     352              total=retries,
     353              # A 503 error from PyPI typically means that the Fastly -> Origin
     354              # connection got interrupted in some way. A 503 error in general
     355              # is typically considered a transient error so we'll go ahead and
     356              # retry it.
     357              # A 500 may indicate transient error in Amazon S3
     358              # A 520 or 527 - may indicate transient error in CloudFlare
     359              status_forcelist=[500, 503, 520, 527],
     360              # Add a small amount of back off between failed requests in
     361              # order to prevent hammering the service.
     362              backoff_factor=0.25,
     363          )  # type: ignore
     364  
     365          # Our Insecure HTTPAdapter disables HTTPS validation. It does not
     366          # support caching so we'll use it for all http:// URLs.
     367          # If caching is disabled, we will also use it for
     368          # https:// hosts that we've marked as ignoring
     369          # TLS errors for (trusted-hosts).
     370          insecure_adapter = InsecureHTTPAdapter(max_retries=retries)
     371  
     372          # We want to _only_ cache responses on securely fetched origins or when
     373          # the host is specified as trusted. We do this because
     374          # we can't validate the response of an insecurely/untrusted fetched
     375          # origin, and we don't want someone to be able to poison the cache and
     376          # require manual eviction from the cache to fix it.
     377          if cache:
     378              secure_adapter = CacheControlAdapter(
     379                  cache=SafeFileCache(cache),
     380                  max_retries=retries,
     381                  ssl_context=ssl_context,
     382              )
     383              self._trusted_host_adapter = InsecureCacheControlAdapter(
     384                  cache=SafeFileCache(cache),
     385                  max_retries=retries,
     386              )
     387          else:
     388              secure_adapter = HTTPAdapter(max_retries=retries, ssl_context=ssl_context)
     389              self._trusted_host_adapter = insecure_adapter
     390  
     391          self.mount("https://", secure_adapter)
     392          self.mount("http://", insecure_adapter)
     393  
     394          # Enable file:// urls
     395          self.mount("file://", LocalFSAdapter())
     396  
     397          for host in trusted_hosts:
     398              self.add_trusted_host(host, suppress_logging=True)
     399  
     400      def update_index_urls(self, new_index_urls: List[str]) -> None:
     401          """
     402          :param new_index_urls: New index urls to update the authentication
     403              handler with.
     404          """
     405          self.auth.index_urls = new_index_urls
     406  
     407      def add_trusted_host(
     408          self, host: str, source: Optional[str] = None, suppress_logging: bool = False
     409      ) -> None:
     410          """
     411          :param host: It is okay to provide a host that has previously been
     412              added.
     413          :param source: An optional source string, for logging where the host
     414              string came from.
     415          """
     416          if not suppress_logging:
     417              msg = f"adding trusted host: {host!r}"
     418              if source is not None:
     419                  msg += f" (from {source})"
     420              logger.info(msg)
     421  
     422          parsed_host, parsed_port = parse_netloc(host)
     423          if parsed_host is None:
     424              raise ValueError(f"Trusted host URL must include a host part: {host!r}")
     425          if (parsed_host, parsed_port) not in self.pip_trusted_origins:
     426              self.pip_trusted_origins.append((parsed_host, parsed_port))
     427  
     428          self.mount(
     429              build_url_from_netloc(host, scheme="http") + "/", self._trusted_host_adapter
     430          )
     431          self.mount(build_url_from_netloc(host) + "/", self._trusted_host_adapter)
     432          if not parsed_port:
     433              self.mount(
     434                  build_url_from_netloc(host, scheme="http") + ":",
     435                  self._trusted_host_adapter,
     436              )
     437              # Mount wildcard ports for the same host.
     438              self.mount(build_url_from_netloc(host) + ":", self._trusted_host_adapter)
     439  
     440      def iter_secure_origins(self) -> Generator[SecureOrigin, None, None]:
     441          yield from SECURE_ORIGINS
     442          for host, port in self.pip_trusted_origins:
     443              yield ("*", host, "*" if port is None else port)
     444  
     445      def is_secure_origin(self, location: Link) -> bool:
     446          # Determine if this url used a secure transport mechanism
     447          parsed = urllib.parse.urlparse(str(location))
     448          origin_protocol, origin_host, origin_port = (
     449              parsed.scheme,
     450              parsed.hostname,
     451              parsed.port,
     452          )
     453  
     454          # The protocol to use to see if the protocol matches.
     455          # Don't count the repository type as part of the protocol: in
     456          # cases such as "git+ssh", only use "ssh". (I.e., Only verify against
     457          # the last scheme.)
     458          origin_protocol = origin_protocol.rsplit("+", 1)[-1]
     459  
     460          # Determine if our origin is a secure origin by looking through our
     461          # hardcoded list of secure origins, as well as any additional ones
     462          # configured on this PackageFinder instance.
     463          for secure_origin in self.iter_secure_origins():
     464              secure_protocol, secure_host, secure_port = secure_origin
     465              if origin_protocol != secure_protocol and secure_protocol != "*":
     466                  continue
     467  
     468              try:
     469                  addr = ipaddress.ip_address(origin_host or "")
     470                  network = ipaddress.ip_network(secure_host)
     471              except ValueError:
     472                  # We don't have both a valid address or a valid network, so
     473                  # we'll check this origin against hostnames.
     474                  if (
     475                      origin_host
     476                      and origin_host.lower() != secure_host.lower()
     477                      and secure_host != "*"
     478                  ):
     479                      continue
     480              else:
     481                  # We have a valid address and network, so see if the address
     482                  # is contained within the network.
     483                  if addr not in network:
     484                      continue
     485  
     486              # Check to see if the port matches.
     487              if (
     488                  origin_port != secure_port
     489                  and secure_port != "*"
     490                  and secure_port is not None
     491              ):
     492                  continue
     493  
     494              # If we've gotten here, then this origin matches the current
     495              # secure origin and we should return True
     496              return True
     497  
     498          # If we've gotten to this point, then the origin isn't secure and we
     499          # will not accept it as a valid location to search. We will however
     500          # log a warning that we are ignoring it.
     501          logger.warning(
     502              "The repository located at %s is not a trusted or secure host and "
     503              "is being ignored. If this repository is available via HTTPS we "
     504              "recommend you use HTTPS instead, otherwise you may silence "
     505              "this warning and allow it anyway with '--trusted-host %s'.",
     506              origin_host,
     507              origin_host,
     508          )
     509  
     510          return False
     511  
     512      def request(self, method: str, url: str, *args: Any, **kwargs: Any) -> Response:
     513          # Allow setting a default timeout on a session
     514          kwargs.setdefault("timeout", self.timeout)
     515          # Allow setting a default proxies on a session
     516          kwargs.setdefault("proxies", self.proxies)
     517  
     518          # Dispatch the actual request
     519          return super().request(method, url, *args, **kwargs)