python (3.11.7)

(root)/
lib/
python3.11/
site-packages/
pip/
_internal/
network/
auth.py
       1  """Network Authentication Helpers
       2  
       3  Contains interface (MultiDomainBasicAuth) and associated glue code for
       4  providing credentials in the context of network requests.
       5  """
       6  import logging
       7  import os
       8  import shutil
       9  import subprocess
      10  import sysconfig
      11  import typing
      12  import urllib.parse
      13  from abc import ABC, abstractmethod
      14  from functools import lru_cache
      15  from os.path import commonprefix
      16  from pathlib import Path
      17  from typing import Any, Dict, List, NamedTuple, Optional, Tuple
      18  
      19  from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth
      20  from pip._vendor.requests.models import Request, Response
      21  from pip._vendor.requests.utils import get_netrc_auth
      22  
      23  from pip._internal.utils.logging import getLogger
      24  from pip._internal.utils.misc import (
      25      ask,
      26      ask_input,
      27      ask_password,
      28      remove_auth_from_url,
      29      split_auth_netloc_from_url,
      30  )
      31  from pip._internal.vcs.versioncontrol import AuthInfo
      32  
      33  logger = getLogger(__name__)
      34  
      35  KEYRING_DISABLED = False
      36  
      37  
      38  class ESC[4;38;5;81mCredentials(ESC[4;38;5;149mNamedTuple):
      39      url: str
      40      username: str
      41      password: str
      42  
      43  
      44  class ESC[4;38;5;81mKeyRingBaseProvider(ESC[4;38;5;149mABC):
      45      """Keyring base provider interface"""
      46  
      47      has_keyring: bool
      48  
      49      @abstractmethod
      50      def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
      51          ...
      52  
      53      @abstractmethod
      54      def save_auth_info(self, url: str, username: str, password: str) -> None:
      55          ...
      56  
      57  
      58  class ESC[4;38;5;81mKeyRingNullProvider(ESC[4;38;5;149mKeyRingBaseProvider):
      59      """Keyring null provider"""
      60  
      61      has_keyring = False
      62  
      63      def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
      64          return None
      65  
      66      def save_auth_info(self, url: str, username: str, password: str) -> None:
      67          return None
      68  
      69  
      70  class ESC[4;38;5;81mKeyRingPythonProvider(ESC[4;38;5;149mKeyRingBaseProvider):
      71      """Keyring interface which uses locally imported `keyring`"""
      72  
      73      has_keyring = True
      74  
      75      def __init__(self) -> None:
      76          import keyring
      77  
      78          self.keyring = keyring
      79  
      80      def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
      81          # Support keyring's get_credential interface which supports getting
      82          # credentials without a username. This is only available for
      83          # keyring>=15.2.0.
      84          if hasattr(self.keyring, "get_credential"):
      85              logger.debug("Getting credentials from keyring for %s", url)
      86              cred = self.keyring.get_credential(url, username)
      87              if cred is not None:
      88                  return cred.username, cred.password
      89              return None
      90  
      91          if username is not None:
      92              logger.debug("Getting password from keyring for %s", url)
      93              password = self.keyring.get_password(url, username)
      94              if password:
      95                  return username, password
      96          return None
      97  
      98      def save_auth_info(self, url: str, username: str, password: str) -> None:
      99          self.keyring.set_password(url, username, password)
     100  
     101  
     102  class ESC[4;38;5;81mKeyRingCliProvider(ESC[4;38;5;149mKeyRingBaseProvider):
     103      """Provider which uses `keyring` cli
     104  
     105      Instead of calling the keyring package installed alongside pip
     106      we call keyring on the command line which will enable pip to
     107      use which ever installation of keyring is available first in
     108      PATH.
     109      """
     110  
     111      has_keyring = True
     112  
     113      def __init__(self, cmd: str) -> None:
     114          self.keyring = cmd
     115  
     116      def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
     117          # This is the default implementation of keyring.get_credential
     118          # https://github.com/jaraco/keyring/blob/97689324abcf01bd1793d49063e7ca01e03d7d07/keyring/backend.py#L134-L139
     119          if username is not None:
     120              password = self._get_password(url, username)
     121              if password is not None:
     122                  return username, password
     123          return None
     124  
     125      def save_auth_info(self, url: str, username: str, password: str) -> None:
     126          return self._set_password(url, username, password)
     127  
     128      def _get_password(self, service_name: str, username: str) -> Optional[str]:
     129          """Mirror the implementation of keyring.get_password using cli"""
     130          if self.keyring is None:
     131              return None
     132  
     133          cmd = [self.keyring, "get", service_name, username]
     134          env = os.environ.copy()
     135          env["PYTHONIOENCODING"] = "utf-8"
     136          res = subprocess.run(
     137              cmd,
     138              stdin=subprocess.DEVNULL,
     139              stdout=subprocess.PIPE,
     140              env=env,
     141          )
     142          if res.returncode:
     143              return None
     144          return res.stdout.decode("utf-8").strip(os.linesep)
     145  
     146      def _set_password(self, service_name: str, username: str, password: str) -> None:
     147          """Mirror the implementation of keyring.set_password using cli"""
     148          if self.keyring is None:
     149              return None
     150          env = os.environ.copy()
     151          env["PYTHONIOENCODING"] = "utf-8"
     152          subprocess.run(
     153              [self.keyring, "set", service_name, username],
     154              input=f"{password}{os.linesep}".encode("utf-8"),
     155              env=env,
     156              check=True,
     157          )
     158          return None
     159  
     160  
     161  @lru_cache(maxsize=None)
     162  def get_keyring_provider(provider: str) -> KeyRingBaseProvider:
     163      logger.verbose("Keyring provider requested: %s", provider)
     164  
     165      # keyring has previously failed and been disabled
     166      if KEYRING_DISABLED:
     167          provider = "disabled"
     168      if provider in ["import", "auto"]:
     169          try:
     170              impl = KeyRingPythonProvider()
     171              logger.verbose("Keyring provider set: import")
     172              return impl
     173          except ImportError:
     174              pass
     175          except Exception as exc:
     176              # In the event of an unexpected exception
     177              # we should warn the user
     178              msg = "Installed copy of keyring fails with exception %s"
     179              if provider == "auto":
     180                  msg = msg + ", trying to find a keyring executable as a fallback"
     181              logger.warning(msg, exc, exc_info=logger.isEnabledFor(logging.DEBUG))
     182      if provider in ["subprocess", "auto"]:
     183          cli = shutil.which("keyring")
     184          if cli and cli.startswith(sysconfig.get_path("scripts")):
     185              # all code within this function is stolen from shutil.which implementation
     186              @typing.no_type_check
     187              def PATH_as_shutil_which_determines_it() -> str:
     188                  path = os.environ.get("PATH", None)
     189                  if path is None:
     190                      try:
     191                          path = os.confstr("CS_PATH")
     192                      except (AttributeError, ValueError):
     193                          # os.confstr() or CS_PATH is not available
     194                          path = os.defpath
     195                  # bpo-35755: Don't use os.defpath if the PATH environment variable is
     196                  # set to an empty string
     197  
     198                  return path
     199  
     200              scripts = Path(sysconfig.get_path("scripts"))
     201  
     202              paths = []
     203              for path in PATH_as_shutil_which_determines_it().split(os.pathsep):
     204                  p = Path(path)
     205                  try:
     206                      if not p.samefile(scripts):
     207                          paths.append(path)
     208                  except FileNotFoundError:
     209                      pass
     210  
     211              path = os.pathsep.join(paths)
     212  
     213              cli = shutil.which("keyring", path=path)
     214  
     215          if cli:
     216              logger.verbose("Keyring provider set: subprocess with executable %s", cli)
     217              return KeyRingCliProvider(cli)
     218  
     219      logger.verbose("Keyring provider set: disabled")
     220      return KeyRingNullProvider()
     221  
     222  
     223  class ESC[4;38;5;81mMultiDomainBasicAuth(ESC[4;38;5;149mAuthBase):
     224      def __init__(
     225          self,
     226          prompting: bool = True,
     227          index_urls: Optional[List[str]] = None,
     228          keyring_provider: str = "auto",
     229      ) -> None:
     230          self.prompting = prompting
     231          self.index_urls = index_urls
     232          self.keyring_provider = keyring_provider  # type: ignore[assignment]
     233          self.passwords: Dict[str, AuthInfo] = {}
     234          # When the user is prompted to enter credentials and keyring is
     235          # available, we will offer to save them. If the user accepts,
     236          # this value is set to the credentials they entered. After the
     237          # request authenticates, the caller should call
     238          # ``save_credentials`` to save these.
     239          self._credentials_to_save: Optional[Credentials] = None
     240  
     241      @property
     242      def keyring_provider(self) -> KeyRingBaseProvider:
     243          return get_keyring_provider(self._keyring_provider)
     244  
     245      @keyring_provider.setter
     246      def keyring_provider(self, provider: str) -> None:
     247          # The free function get_keyring_provider has been decorated with
     248          # functools.cache. If an exception occurs in get_keyring_auth that
     249          # cache will be cleared and keyring disabled, take that into account
     250          # if you want to remove this indirection.
     251          self._keyring_provider = provider
     252  
     253      @property
     254      def use_keyring(self) -> bool:
     255          # We won't use keyring when --no-input is passed unless
     256          # a specific provider is requested because it might require
     257          # user interaction
     258          return self.prompting or self._keyring_provider not in ["auto", "disabled"]
     259  
     260      def _get_keyring_auth(
     261          self,
     262          url: Optional[str],
     263          username: Optional[str],
     264      ) -> Optional[AuthInfo]:
     265          """Return the tuple auth for a given url from keyring."""
     266          # Do nothing if no url was provided
     267          if not url:
     268              return None
     269  
     270          try:
     271              return self.keyring_provider.get_auth_info(url, username)
     272          except Exception as exc:
     273              logger.warning(
     274                  "Keyring is skipped due to an exception: %s",
     275                  str(exc),
     276              )
     277              global KEYRING_DISABLED
     278              KEYRING_DISABLED = True
     279              get_keyring_provider.cache_clear()
     280              return None
     281  
     282      def _get_index_url(self, url: str) -> Optional[str]:
     283          """Return the original index URL matching the requested URL.
     284  
     285          Cached or dynamically generated credentials may work against
     286          the original index URL rather than just the netloc.
     287  
     288          The provided url should have had its username and password
     289          removed already. If the original index url had credentials then
     290          they will be included in the return value.
     291  
     292          Returns None if no matching index was found, or if --no-index
     293          was specified by the user.
     294          """
     295          if not url or not self.index_urls:
     296              return None
     297  
     298          url = remove_auth_from_url(url).rstrip("/") + "/"
     299          parsed_url = urllib.parse.urlsplit(url)
     300  
     301          candidates = []
     302  
     303          for index in self.index_urls:
     304              index = index.rstrip("/") + "/"
     305              parsed_index = urllib.parse.urlsplit(remove_auth_from_url(index))
     306              if parsed_url == parsed_index:
     307                  return index
     308  
     309              if parsed_url.netloc != parsed_index.netloc:
     310                  continue
     311  
     312              candidate = urllib.parse.urlsplit(index)
     313              candidates.append(candidate)
     314  
     315          if not candidates:
     316              return None
     317  
     318          candidates.sort(
     319              reverse=True,
     320              key=lambda candidate: commonprefix(
     321                  [
     322                      parsed_url.path,
     323                      candidate.path,
     324                  ]
     325              ).rfind("/"),
     326          )
     327  
     328          return urllib.parse.urlunsplit(candidates[0])
     329  
     330      def _get_new_credentials(
     331          self,
     332          original_url: str,
     333          *,
     334          allow_netrc: bool = True,
     335          allow_keyring: bool = False,
     336      ) -> AuthInfo:
     337          """Find and return credentials for the specified URL."""
     338          # Split the credentials and netloc from the url.
     339          url, netloc, url_user_password = split_auth_netloc_from_url(
     340              original_url,
     341          )
     342  
     343          # Start with the credentials embedded in the url
     344          username, password = url_user_password
     345          if username is not None and password is not None:
     346              logger.debug("Found credentials in url for %s", netloc)
     347              return url_user_password
     348  
     349          # Find a matching index url for this request
     350          index_url = self._get_index_url(url)
     351          if index_url:
     352              # Split the credentials from the url.
     353              index_info = split_auth_netloc_from_url(index_url)
     354              if index_info:
     355                  index_url, _, index_url_user_password = index_info
     356                  logger.debug("Found index url %s", index_url)
     357  
     358          # If an index URL was found, try its embedded credentials
     359          if index_url and index_url_user_password[0] is not None:
     360              username, password = index_url_user_password
     361              if username is not None and password is not None:
     362                  logger.debug("Found credentials in index url for %s", netloc)
     363                  return index_url_user_password
     364  
     365          # Get creds from netrc if we still don't have them
     366          if allow_netrc:
     367              netrc_auth = get_netrc_auth(original_url)
     368              if netrc_auth:
     369                  logger.debug("Found credentials in netrc for %s", netloc)
     370                  return netrc_auth
     371  
     372          # If we don't have a password and keyring is available, use it.
     373          if allow_keyring:
     374              # The index url is more specific than the netloc, so try it first
     375              # fmt: off
     376              kr_auth = (
     377                  self._get_keyring_auth(index_url, username) or
     378                  self._get_keyring_auth(netloc, username)
     379              )
     380              # fmt: on
     381              if kr_auth:
     382                  logger.debug("Found credentials in keyring for %s", netloc)
     383                  return kr_auth
     384  
     385          return username, password
     386  
     387      def _get_url_and_credentials(
     388          self, original_url: str
     389      ) -> Tuple[str, Optional[str], Optional[str]]:
     390          """Return the credentials to use for the provided URL.
     391  
     392          If allowed, netrc and keyring may be used to obtain the
     393          correct credentials.
     394  
     395          Returns (url_without_credentials, username, password). Note
     396          that even if the original URL contains credentials, this
     397          function may return a different username and password.
     398          """
     399          url, netloc, _ = split_auth_netloc_from_url(original_url)
     400  
     401          # Try to get credentials from original url
     402          username, password = self._get_new_credentials(original_url)
     403  
     404          # If credentials not found, use any stored credentials for this netloc.
     405          # Do this if either the username or the password is missing.
     406          # This accounts for the situation in which the user has specified
     407          # the username in the index url, but the password comes from keyring.
     408          if (username is None or password is None) and netloc in self.passwords:
     409              un, pw = self.passwords[netloc]
     410              # It is possible that the cached credentials are for a different username,
     411              # in which case the cache should be ignored.
     412              if username is None or username == un:
     413                  username, password = un, pw
     414  
     415          if username is not None or password is not None:
     416              # Convert the username and password if they're None, so that
     417              # this netloc will show up as "cached" in the conditional above.
     418              # Further, HTTPBasicAuth doesn't accept None, so it makes sense to
     419              # cache the value that is going to be used.
     420              username = username or ""
     421              password = password or ""
     422  
     423              # Store any acquired credentials.
     424              self.passwords[netloc] = (username, password)
     425  
     426          assert (
     427              # Credentials were found
     428              (username is not None and password is not None)
     429              # Credentials were not found
     430              or (username is None and password is None)
     431          ), f"Could not load credentials from url: {original_url}"
     432  
     433          return url, username, password
     434  
     435      def __call__(self, req: Request) -> Request:
     436          # Get credentials for this request
     437          url, username, password = self._get_url_and_credentials(req.url)
     438  
     439          # Set the url of the request to the url without any credentials
     440          req.url = url
     441  
     442          if username is not None and password is not None:
     443              # Send the basic auth with this request
     444              req = HTTPBasicAuth(username, password)(req)
     445  
     446          # Attach a hook to handle 401 responses
     447          req.register_hook("response", self.handle_401)
     448  
     449          return req
     450  
     451      # Factored out to allow for easy patching in tests
     452      def _prompt_for_password(
     453          self, netloc: str
     454      ) -> Tuple[Optional[str], Optional[str], bool]:
     455          username = ask_input(f"User for {netloc}: ") if self.prompting else None
     456          if not username:
     457              return None, None, False
     458          if self.use_keyring:
     459              auth = self._get_keyring_auth(netloc, username)
     460              if auth and auth[0] is not None and auth[1] is not None:
     461                  return auth[0], auth[1], False
     462          password = ask_password("Password: ")
     463          return username, password, True
     464  
     465      # Factored out to allow for easy patching in tests
     466      def _should_save_password_to_keyring(self) -> bool:
     467          if (
     468              not self.prompting
     469              or not self.use_keyring
     470              or not self.keyring_provider.has_keyring
     471          ):
     472              return False
     473          return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y"
     474  
     475      def handle_401(self, resp: Response, **kwargs: Any) -> Response:
     476          # We only care about 401 responses, anything else we want to just
     477          #   pass through the actual response
     478          if resp.status_code != 401:
     479              return resp
     480  
     481          username, password = None, None
     482  
     483          # Query the keyring for credentials:
     484          if self.use_keyring:
     485              username, password = self._get_new_credentials(
     486                  resp.url,
     487                  allow_netrc=False,
     488                  allow_keyring=True,
     489              )
     490  
     491          # We are not able to prompt the user so simply return the response
     492          if not self.prompting and not username and not password:
     493              return resp
     494  
     495          parsed = urllib.parse.urlparse(resp.url)
     496  
     497          # Prompt the user for a new username and password
     498          save = False
     499          if not username and not password:
     500              username, password, save = self._prompt_for_password(parsed.netloc)
     501  
     502          # Store the new username and password to use for future requests
     503          self._credentials_to_save = None
     504          if username is not None and password is not None:
     505              self.passwords[parsed.netloc] = (username, password)
     506  
     507              # Prompt to save the password to keyring
     508              if save and self._should_save_password_to_keyring():
     509                  self._credentials_to_save = Credentials(
     510                      url=parsed.netloc,
     511                      username=username,
     512                      password=password,
     513                  )
     514  
     515          # Consume content and release the original connection to allow our new
     516          #   request to reuse the same one.
     517          # The result of the assignment isn't used, it's just needed to consume
     518          # the content.
     519          _ = resp.content
     520          resp.raw.release_conn()
     521  
     522          # Add our new username and password to the request
     523          req = HTTPBasicAuth(username or "", password or "")(resp.request)
     524          req.register_hook("response", self.warn_on_401)
     525  
     526          # On successful request, save the credentials that were used to
     527          # keyring. (Note that if the user responded "no" above, this member
     528          # is not set and nothing will be saved.)
     529          if self._credentials_to_save:
     530              req.register_hook("response", self.save_credentials)
     531  
     532          # Send our new request
     533          new_resp = resp.connection.send(req, **kwargs)
     534          new_resp.history.append(resp)
     535  
     536          return new_resp
     537  
     538      def warn_on_401(self, resp: Response, **kwargs: Any) -> None:
     539          """Response callback to warn about incorrect credentials."""
     540          if resp.status_code == 401:
     541              logger.warning(
     542                  "401 Error, Credentials not correct for %s",
     543                  resp.request.url,
     544              )
     545  
     546      def save_credentials(self, resp: Response, **kwargs: Any) -> None:
     547          """Response callback to save credentials on success."""
     548          assert (
     549              self.keyring_provider.has_keyring
     550          ), "should never reach here without keyring"
     551  
     552          creds = self._credentials_to_save
     553          self._credentials_to_save = None
     554          if creds and resp.status_code < 400:
     555              try:
     556                  logger.info("Saving credentials to keyring")
     557                  self.keyring_provider.save_auth_info(
     558                      creds.url, creds.username, creds.password
     559                  )
     560              except Exception:
     561                  logger.exception("Failed to save credentials")