python (3.11.7)
       1  """Handles all VCS (version control) support"""
       2  
       3  import logging
       4  import os
       5  import shutil
       6  import sys
       7  import urllib.parse
       8  from typing import (
       9      TYPE_CHECKING,
      10      Any,
      11      Dict,
      12      Iterable,
      13      Iterator,
      14      List,
      15      Mapping,
      16      Optional,
      17      Tuple,
      18      Type,
      19      Union,
      20  )
      21  
      22  from pip._internal.cli.spinners import SpinnerInterface
      23  from pip._internal.exceptions import BadCommand, InstallationError
      24  from pip._internal.utils.misc import (
      25      HiddenText,
      26      ask_path_exists,
      27      backup_dir,
      28      display_path,
      29      hide_url,
      30      hide_value,
      31      is_installable_dir,
      32      rmtree,
      33  )
      34  from pip._internal.utils.subprocess import (
      35      CommandArgs,
      36      call_subprocess,
      37      format_command_args,
      38      make_command,
      39  )
      40  from pip._internal.utils.urls import get_url_scheme
      41  
      42  if TYPE_CHECKING:
      43      # Literal was introduced in Python 3.8.
      44      #
      45      # TODO: Remove `if TYPE_CHECKING` when dropping support for Python 3.7.
      46      from typing import Literal
      47  
      48  
# Only the shared `vcs` registry instance is exported from this module.
__all__ = ["vcs"]


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Auth information as a (username, password) pair; either item may be None.
AuthInfo = Tuple[Optional[str], Optional[str]]
      55  
      56  
      57  def is_url(name: str) -> bool:
      58      """
      59      Return true if the name looks like a URL.
      60      """
      61      scheme = get_url_scheme(name)
      62      if scheme is None:
      63          return False
      64      return scheme in ["http", "https", "file", "ftp"] + vcs.all_schemes
      65  
      66  
      67  def make_vcs_requirement_url(
      68      repo_url: str, rev: str, project_name: str, subdir: Optional[str] = None
      69  ) -> str:
      70      """
      71      Return the URL for a VCS requirement.
      72  
      73      Args:
      74        repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").
      75        project_name: the (unescaped) project name.
      76      """
      77      egg_project_name = project_name.replace("-", "_")
      78      req = f"{repo_url}@{rev}#egg={egg_project_name}"
      79      if subdir:
      80          req += f"&subdirectory={subdir}"
      81  
      82      return req
      83  
      84  
      85  def find_path_to_project_root_from_repo_root(
      86      location: str, repo_root: str
      87  ) -> Optional[str]:
      88      """
      89      Find the the Python project's root by searching up the filesystem from
      90      `location`. Return the path to project root relative to `repo_root`.
      91      Return None if the project root is `repo_root`, or cannot be found.
      92      """
      93      # find project root.
      94      orig_location = location
      95      while not is_installable_dir(location):
      96          last_location = location
      97          location = os.path.dirname(location)
      98          if location == last_location:
      99              # We've traversed up to the root of the filesystem without
     100              # finding a Python project.
     101              logger.warning(
     102                  "Could not find a Python project for directory %s (tried all "
     103                  "parent directories)",
     104                  orig_location,
     105              )
     106              return None
     107  
     108      if os.path.samefile(repo_root, location):
     109          return None
     110  
     111      return os.path.relpath(location, repo_root)
     112  
     113  
     114  class ESC[4;38;5;81mRemoteNotFoundError(ESC[4;38;5;149mException):
     115      pass
     116  
     117  
     118  class ESC[4;38;5;81mRemoteNotValidError(ESC[4;38;5;149mException):
     119      def __init__(self, url: str):
     120          super().__init__(url)
     121          self.url = url
     122  
     123  
     124  class ESC[4;38;5;81mRevOptions:
     125  
     126      """
     127      Encapsulates a VCS-specific revision to install, along with any VCS
     128      install options.
     129  
     130      Instances of this class should be treated as if immutable.
     131      """
     132  
     133      def __init__(
     134          self,
     135          vc_class: Type["VersionControl"],
     136          rev: Optional[str] = None,
     137          extra_args: Optional[CommandArgs] = None,
     138      ) -> None:
     139          """
     140          Args:
     141            vc_class: a VersionControl subclass.
     142            rev: the name of the revision to install.
     143            extra_args: a list of extra options.
     144          """
     145          if extra_args is None:
     146              extra_args = []
     147  
     148          self.extra_args = extra_args
     149          self.rev = rev
     150          self.vc_class = vc_class
     151          self.branch_name: Optional[str] = None
     152  
     153      def __repr__(self) -> str:
     154          return f"<RevOptions {self.vc_class.name}: rev={self.rev!r}>"
     155  
     156      @property
     157      def arg_rev(self) -> Optional[str]:
     158          if self.rev is None:
     159              return self.vc_class.default_arg_rev
     160  
     161          return self.rev
     162  
     163      def to_args(self) -> CommandArgs:
     164          """
     165          Return the VCS-specific command arguments.
     166          """
     167          args: CommandArgs = []
     168          rev = self.arg_rev
     169          if rev is not None:
     170              args += self.vc_class.get_base_rev_args(rev)
     171          args += self.extra_args
     172  
     173          return args
     174  
     175      def to_display(self) -> str:
     176          if not self.rev:
     177              return ""
     178  
     179          return f" (to revision {self.rev})"
     180  
     181      def make_new(self, rev: str) -> "RevOptions":
     182          """
     183          Make a copy of the current instance, but with a new rev.
     184  
     185          Args:
     186            rev: the name of the revision for the new object.
     187          """
     188          return self.vc_class.make_rev_options(rev, extra_args=self.extra_args)
     189  
     190  
     191  class ESC[4;38;5;81mVcsSupport:
     192      _registry: Dict[str, "VersionControl"] = {}
     193      schemes = ["ssh", "git", "hg", "bzr", "sftp", "svn"]
     194  
     195      def __init__(self) -> None:
     196          # Register more schemes with urlparse for various version control
     197          # systems
     198          urllib.parse.uses_netloc.extend(self.schemes)
     199          super().__init__()
     200  
     201      def __iter__(self) -> Iterator[str]:
     202          return self._registry.__iter__()
     203  
     204      @property
     205      def backends(self) -> List["VersionControl"]:
     206          return list(self._registry.values())
     207  
     208      @property
     209      def dirnames(self) -> List[str]:
     210          return [backend.dirname for backend in self.backends]
     211  
     212      @property
     213      def all_schemes(self) -> List[str]:
     214          schemes: List[str] = []
     215          for backend in self.backends:
     216              schemes.extend(backend.schemes)
     217          return schemes
     218  
     219      def register(self, cls: Type["VersionControl"]) -> None:
     220          if not hasattr(cls, "name"):
     221              logger.warning("Cannot register VCS %s", cls.__name__)
     222              return
     223          if cls.name not in self._registry:
     224              self._registry[cls.name] = cls()
     225              logger.debug("Registered VCS backend: %s", cls.name)
     226  
     227      def unregister(self, name: str) -> None:
     228          if name in self._registry:
     229              del self._registry[name]
     230  
     231      def get_backend_for_dir(self, location: str) -> Optional["VersionControl"]:
     232          """
     233          Return a VersionControl object if a repository of that type is found
     234          at the given directory.
     235          """
     236          vcs_backends = {}
     237          for vcs_backend in self._registry.values():
     238              repo_path = vcs_backend.get_repository_root(location)
     239              if not repo_path:
     240                  continue
     241              logger.debug("Determine that %s uses VCS: %s", location, vcs_backend.name)
     242              vcs_backends[repo_path] = vcs_backend
     243  
     244          if not vcs_backends:
     245              return None
     246  
     247          # Choose the VCS in the inner-most directory. Since all repository
     248          # roots found here would be either `location` or one of its
     249          # parents, the longest path should have the most path components,
     250          # i.e. the backend representing the inner-most repository.
     251          inner_most_repo_path = max(vcs_backends, key=len)
     252          return vcs_backends[inner_most_repo_path]
     253  
     254      def get_backend_for_scheme(self, scheme: str) -> Optional["VersionControl"]:
     255          """
     256          Return a VersionControl object or None.
     257          """
     258          for vcs_backend in self._registry.values():
     259              if scheme in vcs_backend.schemes:
     260                  return vcs_backend
     261          return None
     262  
     263      def get_backend(self, name: str) -> Optional["VersionControl"]:
     264          """
     265          Return a VersionControl object or None.
     266          """
     267          name = name.lower()
     268          return self._registry.get(name)
     269  
     270  
# Module-wide VcsSupport instance; this is the only name listed in __all__.
vcs = VcsSupport()
     272  
     273  
     274  class ESC[4;38;5;81mVersionControl:
     275      name = ""
     276      dirname = ""
     277      repo_name = ""
     278      # List of supported schemes for this Version Control
     279      schemes: Tuple[str, ...] = ()
     280      # Iterable of environment variable names to pass to call_subprocess().
     281      unset_environ: Tuple[str, ...] = ()
     282      default_arg_rev: Optional[str] = None
     283  
     284      @classmethod
     285      def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
     286          """
     287          Return whether the vcs prefix (e.g. "git+") should be added to a
     288          repository's remote url when used in a requirement.
     289          """
     290          return not remote_url.lower().startswith(f"{cls.name}:")
     291  
     292      @classmethod
     293      def get_subdirectory(cls, location: str) -> Optional[str]:
     294          """
     295          Return the path to Python project root, relative to the repo root.
     296          Return None if the project root is in the repo root.
     297          """
     298          return None
     299  
     300      @classmethod
     301      def get_requirement_revision(cls, repo_dir: str) -> str:
     302          """
     303          Return the revision string that should be used in a requirement.
     304          """
     305          return cls.get_revision(repo_dir)
     306  
     307      @classmethod
     308      def get_src_requirement(cls, repo_dir: str, project_name: str) -> str:
     309          """
     310          Return the requirement string to use to redownload the files
     311          currently at the given repository directory.
     312  
     313          Args:
     314            project_name: the (unescaped) project name.
     315  
     316          The return value has a form similar to the following:
     317  
     318              {repository_url}@{revision}#egg={project_name}
     319          """
     320          repo_url = cls.get_remote_url(repo_dir)
     321  
     322          if cls.should_add_vcs_url_prefix(repo_url):
     323              repo_url = f"{cls.name}+{repo_url}"
     324  
     325          revision = cls.get_requirement_revision(repo_dir)
     326          subdir = cls.get_subdirectory(repo_dir)
     327          req = make_vcs_requirement_url(repo_url, revision, project_name, subdir=subdir)
     328  
     329          return req
     330  
     331      @staticmethod
     332      def get_base_rev_args(rev: str) -> List[str]:
     333          """
     334          Return the base revision arguments for a vcs command.
     335  
     336          Args:
     337            rev: the name of a revision to install.  Cannot be None.
     338          """
     339          raise NotImplementedError
     340  
     341      def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
     342          """
     343          Return true if the commit hash checked out at dest matches
     344          the revision in url.
     345  
     346          Always return False, if the VCS does not support immutable commit
     347          hashes.
     348  
     349          This method does not check if there are local uncommitted changes
     350          in dest after checkout, as pip currently has no use case for that.
     351          """
     352          return False
     353  
     354      @classmethod
     355      def make_rev_options(
     356          cls, rev: Optional[str] = None, extra_args: Optional[CommandArgs] = None
     357      ) -> RevOptions:
     358          """
     359          Return a RevOptions object.
     360  
     361          Args:
     362            rev: the name of a revision to install.
     363            extra_args: a list of extra options.
     364          """
     365          return RevOptions(cls, rev, extra_args=extra_args)
     366  
     367      @classmethod
     368      def _is_local_repository(cls, repo: str) -> bool:
     369          """
     370          posix absolute paths start with os.path.sep,
     371          win32 ones start with drive (like c:\\folder)
     372          """
     373          drive, tail = os.path.splitdrive(repo)
     374          return repo.startswith(os.path.sep) or bool(drive)
     375  
     376      @classmethod
     377      def get_netloc_and_auth(
     378          cls, netloc: str, scheme: str
     379      ) -> Tuple[str, Tuple[Optional[str], Optional[str]]]:
     380          """
     381          Parse the repository URL's netloc, and return the new netloc to use
     382          along with auth information.
     383  
     384          Args:
     385            netloc: the original repository URL netloc.
     386            scheme: the repository URL's scheme without the vcs prefix.
     387  
     388          This is mainly for the Subversion class to override, so that auth
     389          information can be provided via the --username and --password options
     390          instead of through the URL.  For other subclasses like Git without
     391          such an option, auth information must stay in the URL.
     392  
     393          Returns: (netloc, (username, password)).
     394          """
     395          return netloc, (None, None)
     396  
     397      @classmethod
     398      def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
     399          """
     400          Parse the repository URL to use, and return the URL, revision,
     401          and auth info to use.
     402  
     403          Returns: (url, rev, (username, password)).
     404          """
     405          scheme, netloc, path, query, frag = urllib.parse.urlsplit(url)
     406          if "+" not in scheme:
     407              raise ValueError(
     408                  "Sorry, {!r} is a malformed VCS url. "
     409                  "The format is <vcs>+<protocol>://<url>, "
     410                  "e.g. svn+http://myrepo/svn/MyApp#egg=MyApp".format(url)
     411              )
     412          # Remove the vcs prefix.
     413          scheme = scheme.split("+", 1)[1]
     414          netloc, user_pass = cls.get_netloc_and_auth(netloc, scheme)
     415          rev = None
     416          if "@" in path:
     417              path, rev = path.rsplit("@", 1)
     418              if not rev:
     419                  raise InstallationError(
     420                      "The URL {!r} has an empty revision (after @) "
     421                      "which is not supported. Include a revision after @ "
     422                      "or remove @ from the URL.".format(url)
     423                  )
     424          url = urllib.parse.urlunsplit((scheme, netloc, path, query, ""))
     425          return url, rev, user_pass
     426  
     427      @staticmethod
     428      def make_rev_args(
     429          username: Optional[str], password: Optional[HiddenText]
     430      ) -> CommandArgs:
     431          """
     432          Return the RevOptions "extra arguments" to use in obtain().
     433          """
     434          return []
     435  
     436      def get_url_rev_options(self, url: HiddenText) -> Tuple[HiddenText, RevOptions]:
     437          """
     438          Return the URL and RevOptions object to use in obtain(),
     439          as a tuple (url, rev_options).
     440          """
     441          secret_url, rev, user_pass = self.get_url_rev_and_auth(url.secret)
     442          username, secret_password = user_pass
     443          password: Optional[HiddenText] = None
     444          if secret_password is not None:
     445              password = hide_value(secret_password)
     446          extra_args = self.make_rev_args(username, password)
     447          rev_options = self.make_rev_options(rev, extra_args=extra_args)
     448  
     449          return hide_url(secret_url), rev_options
     450  
     451      @staticmethod
     452      def normalize_url(url: str) -> str:
     453          """
     454          Normalize a URL for comparison by unquoting it and removing any
     455          trailing slash.
     456          """
     457          return urllib.parse.unquote(url).rstrip("/")
     458  
     459      @classmethod
     460      def compare_urls(cls, url1: str, url2: str) -> bool:
     461          """
     462          Compare two repo URLs for identity, ignoring incidental differences.
     463          """
     464          return cls.normalize_url(url1) == cls.normalize_url(url2)
     465  
     466      def fetch_new(
     467          self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
     468      ) -> None:
     469          """
     470          Fetch a revision from a repository, in the case that this is the
     471          first fetch from the repository.
     472  
     473          Args:
     474            dest: the directory to fetch the repository to.
     475            rev_options: a RevOptions object.
     476            verbosity: verbosity level.
     477          """
     478          raise NotImplementedError
     479  
     480      def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
     481          """
     482          Switch the repo at ``dest`` to point to ``URL``.
     483  
     484          Args:
     485            rev_options: a RevOptions object.
     486          """
     487          raise NotImplementedError
     488  
     489      def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
     490          """
     491          Update an already-existing repo to the given ``rev_options``.
     492  
     493          Args:
     494            rev_options: a RevOptions object.
     495          """
     496          raise NotImplementedError
     497  
     498      @classmethod
     499      def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
     500          """
     501          Return whether the id of the current commit equals the given name.
     502  
     503          Args:
     504            dest: the repository directory.
     505            name: a string name.
     506          """
     507          raise NotImplementedError
     508  
     509      def obtain(self, dest: str, url: HiddenText, verbosity: int) -> None:
     510          """
     511          Install or update in editable mode the package represented by this
     512          VersionControl object.
     513  
     514          :param dest: the repository directory in which to install or update.
     515          :param url: the repository URL starting with a vcs prefix.
     516          :param verbosity: verbosity level.
     517          """
     518          url, rev_options = self.get_url_rev_options(url)
     519  
     520          if not os.path.exists(dest):
     521              self.fetch_new(dest, url, rev_options, verbosity=verbosity)
     522              return
     523  
     524          rev_display = rev_options.to_display()
     525          if self.is_repository_directory(dest):
     526              existing_url = self.get_remote_url(dest)
     527              if self.compare_urls(existing_url, url.secret):
     528                  logger.debug(
     529                      "%s in %s exists, and has correct URL (%s)",
     530                      self.repo_name.title(),
     531                      display_path(dest),
     532                      url,
     533                  )
     534                  if not self.is_commit_id_equal(dest, rev_options.rev):
     535                      logger.info(
     536                          "Updating %s %s%s",
     537                          display_path(dest),
     538                          self.repo_name,
     539                          rev_display,
     540                      )
     541                      self.update(dest, url, rev_options)
     542                  else:
     543                      logger.info("Skipping because already up-to-date.")
     544                  return
     545  
     546              logger.warning(
     547                  "%s %s in %s exists with URL %s",
     548                  self.name,
     549                  self.repo_name,
     550                  display_path(dest),
     551                  existing_url,
     552              )
     553              prompt = ("(s)witch, (i)gnore, (w)ipe, (b)ackup ", ("s", "i", "w", "b"))
     554          else:
     555              logger.warning(
     556                  "Directory %s already exists, and is not a %s %s.",
     557                  dest,
     558                  self.name,
     559                  self.repo_name,
     560              )
     561              # https://github.com/python/mypy/issues/1174
     562              prompt = ("(i)gnore, (w)ipe, (b)ackup ", ("i", "w", "b"))  # type: ignore
     563  
     564          logger.warning(
     565              "The plan is to install the %s repository %s",
     566              self.name,
     567              url,
     568          )
     569          response = ask_path_exists("What to do?  {}".format(prompt[0]), prompt[1])
     570  
     571          if response == "a":
     572              sys.exit(-1)
     573  
     574          if response == "w":
     575              logger.warning("Deleting %s", display_path(dest))
     576              rmtree(dest)
     577              self.fetch_new(dest, url, rev_options, verbosity=verbosity)
     578              return
     579  
     580          if response == "b":
     581              dest_dir = backup_dir(dest)
     582              logger.warning("Backing up %s to %s", display_path(dest), dest_dir)
     583              shutil.move(dest, dest_dir)
     584              self.fetch_new(dest, url, rev_options, verbosity=verbosity)
     585              return
     586  
     587          # Do nothing if the response is "i".
     588          if response == "s":
     589              logger.info(
     590                  "Switching %s %s to %s%s",
     591                  self.repo_name,
     592                  display_path(dest),
     593                  url,
     594                  rev_display,
     595              )
     596              self.switch(dest, url, rev_options)
     597  
     598      def unpack(self, location: str, url: HiddenText, verbosity: int) -> None:
     599          """
     600          Clean up current location and download the url repository
     601          (and vcs infos) into location
     602  
     603          :param url: the repository URL starting with a vcs prefix.
     604          :param verbosity: verbosity level.
     605          """
     606          if os.path.exists(location):
     607              rmtree(location)
     608          self.obtain(location, url=url, verbosity=verbosity)
     609  
     610      @classmethod
     611      def get_remote_url(cls, location: str) -> str:
     612          """
     613          Return the url used at location
     614  
     615          Raises RemoteNotFoundError if the repository does not have a remote
     616          url configured.
     617          """
     618          raise NotImplementedError
     619  
     620      @classmethod
     621      def get_revision(cls, location: str) -> str:
     622          """
     623          Return the current commit id of the files at the given location.
     624          """
     625          raise NotImplementedError
     626  
     627      @classmethod
     628      def run_command(
     629          cls,
     630          cmd: Union[List[str], CommandArgs],
     631          show_stdout: bool = True,
     632          cwd: Optional[str] = None,
     633          on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise",
     634          extra_ok_returncodes: Optional[Iterable[int]] = None,
     635          command_desc: Optional[str] = None,
     636          extra_environ: Optional[Mapping[str, Any]] = None,
     637          spinner: Optional[SpinnerInterface] = None,
     638          log_failed_cmd: bool = True,
     639          stdout_only: bool = False,
     640      ) -> str:
     641          """
     642          Run a VCS subcommand
     643          This is simply a wrapper around call_subprocess that adds the VCS
     644          command name, and checks that the VCS is available
     645          """
     646          cmd = make_command(cls.name, *cmd)
     647          if command_desc is None:
     648              command_desc = format_command_args(cmd)
     649          try:
     650              return call_subprocess(
     651                  cmd,
     652                  show_stdout,
     653                  cwd,
     654                  on_returncode=on_returncode,
     655                  extra_ok_returncodes=extra_ok_returncodes,
     656                  command_desc=command_desc,
     657                  extra_environ=extra_environ,
     658                  unset_environ=cls.unset_environ,
     659                  spinner=spinner,
     660                  log_failed_cmd=log_failed_cmd,
     661                  stdout_only=stdout_only,
     662              )
     663          except FileNotFoundError:
     664              # errno.ENOENT = no such file or directory
     665              # In other words, the VCS executable isn't available
     666              raise BadCommand(
     667                  f"Cannot find command {cls.name!r} - do you have "
     668                  f"{cls.name!r} installed and in your PATH?"
     669              )
     670          except PermissionError:
     671              # errno.EACCES = Permission denied
     672              # This error occurs, for instance, when the command is installed
     673              # only for another user. So, the current user don't have
     674              # permission to call the other user command.
     675              raise BadCommand(
     676                  f"No permission to execute {cls.name!r} - install it "
     677                  f"locally, globally (ask admin), or check your PATH. "
     678                  f"See possible solutions at "
     679                  f"https://pip.pypa.io/en/latest/reference/pip_freeze/"
     680                  f"#fixing-permission-denied."
     681              )
     682  
     683      @classmethod
     684      def is_repository_directory(cls, path: str) -> bool:
     685          """
     686          Return whether a directory path is a repository directory.
     687          """
     688          logger.debug("Checking in %s for %s (%s)...", path, cls.dirname, cls.name)
     689          return os.path.exists(os.path.join(path, cls.dirname))
     690  
     691      @classmethod
     692      def get_repository_root(cls, location: str) -> Optional[str]:
     693          """
     694          Return the "root" (top-level) directory controlled by the vcs,
     695          or `None` if the directory is not in any.
     696  
     697          It is meant to be overridden to implement smarter detection
     698          mechanisms for specific vcs.
     699  
     700          This can do more than is_repository_directory() alone. For
     701          example, the Git override checks that Git is actually available.
     702          """
     703          if cls.is_repository_directory(location):
     704              return location
     705          return None