python (3.11.7)

(root)/
lib/
python3.11/
site-packages/
pip/
_internal/
vcs/
git.py
       1  import logging
       2  import os.path
       3  import pathlib
       4  import re
       5  import urllib.parse
       6  import urllib.request
       7  from typing import List, Optional, Tuple
       8  
       9  from pip._internal.exceptions import BadCommand, InstallationError
      10  from pip._internal.utils.misc import HiddenText, display_path, hide_url
      11  from pip._internal.utils.subprocess import make_command
      12  from pip._internal.vcs.versioncontrol import (
      13      AuthInfo,
      14      RemoteNotFoundError,
      15      RemoteNotValidError,
      16      RevOptions,
      17      VersionControl,
      18      find_path_to_project_root_from_repo_root,
      19      vcs,
      20  )
      21  
      22  urlsplit = urllib.parse.urlsplit
      23  urlunsplit = urllib.parse.urlunsplit
      24  
      25  
      26  logger = logging.getLogger(__name__)
      27  
      28  
      29  GIT_VERSION_REGEX = re.compile(
      30      r"^git version "  # Prefix.
      31      r"(\d+)"  # Major.
      32      r"\.(\d+)"  # Dot, minor.
      33      r"(?:\.(\d+))?"  # Optional dot, patch.
      34      r".*$"  # Suffix, including any pre- and post-release segments we don't care about.
      35  )
      36  
      37  HASH_REGEX = re.compile("^[a-fA-F0-9]{40}$")
      38  
      39  # SCP (Secure copy protocol) shorthand. e.g. 'git@example.com:foo/bar.git'
      40  SCP_REGEX = re.compile(
      41      r"""^
      42      # Optional user, e.g. 'git@'
      43      (\w+@)?
      44      # Server, e.g. 'github.com'.
      45      ([^/:]+):
      46      # The server-side path. e.g. 'user/project.git'. Must start with an
      47      # alphanumeric character so as not to be confusable with a Windows paths
      48      # like 'C:/foo/bar' or 'C:\foo\bar'.
      49      (\w[^:]*)
      50      $""",
      51      re.VERBOSE,
      52  )
      53  
      54  
      55  def looks_like_hash(sha: str) -> bool:
      56      return bool(HASH_REGEX.match(sha))
      57  
      58  
      59  class ESC[4;38;5;81mGit(ESC[4;38;5;149mVersionControl):
      60      name = "git"
      61      dirname = ".git"
      62      repo_name = "clone"
      63      schemes = (
      64          "git+http",
      65          "git+https",
      66          "git+ssh",
      67          "git+git",
      68          "git+file",
      69      )
      70      # Prevent the user's environment variables from interfering with pip:
      71      # https://github.com/pypa/pip/issues/1130
      72      unset_environ = ("GIT_DIR", "GIT_WORK_TREE")
      73      default_arg_rev = "HEAD"
      74  
      75      @staticmethod
      76      def get_base_rev_args(rev: str) -> List[str]:
      77          return [rev]
      78  
      79      def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
      80          _, rev_options = self.get_url_rev_options(hide_url(url))
      81          if not rev_options.rev:
      82              return False
      83          if not self.is_commit_id_equal(dest, rev_options.rev):
      84              # the current commit is different from rev,
      85              # which means rev was something else than a commit hash
      86              return False
      87          # return False in the rare case rev is both a commit hash
      88          # and a tag or a branch; we don't want to cache in that case
      89          # because that branch/tag could point to something else in the future
      90          is_tag_or_branch = bool(self.get_revision_sha(dest, rev_options.rev)[0])
      91          return not is_tag_or_branch
      92  
      93      def get_git_version(self) -> Tuple[int, ...]:
      94          version = self.run_command(
      95              ["version"],
      96              command_desc="git version",
      97              show_stdout=False,
      98              stdout_only=True,
      99          )
     100          match = GIT_VERSION_REGEX.match(version)
     101          if not match:
     102              logger.warning("Can't parse git version: %s", version)
     103              return ()
     104          return tuple(int(c) for c in match.groups())
     105  
     106      @classmethod
     107      def get_current_branch(cls, location: str) -> Optional[str]:
     108          """
     109          Return the current branch, or None if HEAD isn't at a branch
     110          (e.g. detached HEAD).
     111          """
     112          # git-symbolic-ref exits with empty stdout if "HEAD" is a detached
     113          # HEAD rather than a symbolic ref.  In addition, the -q causes the
     114          # command to exit with status code 1 instead of 128 in this case
     115          # and to suppress the message to stderr.
     116          args = ["symbolic-ref", "-q", "HEAD"]
     117          output = cls.run_command(
     118              args,
     119              extra_ok_returncodes=(1,),
     120              show_stdout=False,
     121              stdout_only=True,
     122              cwd=location,
     123          )
     124          ref = output.strip()
     125  
     126          if ref.startswith("refs/heads/"):
     127              return ref[len("refs/heads/") :]
     128  
     129          return None
     130  
     131      @classmethod
     132      def get_revision_sha(cls, dest: str, rev: str) -> Tuple[Optional[str], bool]:
     133          """
     134          Return (sha_or_none, is_branch), where sha_or_none is a commit hash
     135          if the revision names a remote branch or tag, otherwise None.
     136  
     137          Args:
     138            dest: the repository directory.
     139            rev: the revision name.
     140          """
     141          # Pass rev to pre-filter the list.
     142          output = cls.run_command(
     143              ["show-ref", rev],
     144              cwd=dest,
     145              show_stdout=False,
     146              stdout_only=True,
     147              on_returncode="ignore",
     148          )
     149          refs = {}
     150          # NOTE: We do not use splitlines here since that would split on other
     151          #       unicode separators, which can be maliciously used to install a
     152          #       different revision.
     153          for line in output.strip().split("\n"):
     154              line = line.rstrip("\r")
     155              if not line:
     156                  continue
     157              try:
     158                  ref_sha, ref_name = line.split(" ", maxsplit=2)
     159              except ValueError:
     160                  # Include the offending line to simplify troubleshooting if
     161                  # this error ever occurs.
     162                  raise ValueError(f"unexpected show-ref line: {line!r}")
     163  
     164              refs[ref_name] = ref_sha
     165  
     166          branch_ref = f"refs/remotes/origin/{rev}"
     167          tag_ref = f"refs/tags/{rev}"
     168  
     169          sha = refs.get(branch_ref)
     170          if sha is not None:
     171              return (sha, True)
     172  
     173          sha = refs.get(tag_ref)
     174  
     175          return (sha, False)
     176  
     177      @classmethod
     178      def _should_fetch(cls, dest: str, rev: str) -> bool:
     179          """
     180          Return true if rev is a ref or is a commit that we don't have locally.
     181  
     182          Branches and tags are not considered in this method because they are
     183          assumed to be always available locally (which is a normal outcome of
     184          ``git clone`` and ``git fetch --tags``).
     185          """
     186          if rev.startswith("refs/"):
     187              # Always fetch remote refs.
     188              return True
     189  
     190          if not looks_like_hash(rev):
     191              # Git fetch would fail with abbreviated commits.
     192              return False
     193  
     194          if cls.has_commit(dest, rev):
     195              # Don't fetch if we have the commit locally.
     196              return False
     197  
     198          return True
     199  
     200      @classmethod
     201      def resolve_revision(
     202          cls, dest: str, url: HiddenText, rev_options: RevOptions
     203      ) -> RevOptions:
     204          """
     205          Resolve a revision to a new RevOptions object with the SHA1 of the
     206          branch, tag, or ref if found.
     207  
     208          Args:
     209            rev_options: a RevOptions object.
     210          """
     211          rev = rev_options.arg_rev
     212          # The arg_rev property's implementation for Git ensures that the
     213          # rev return value is always non-None.
     214          assert rev is not None
     215  
     216          sha, is_branch = cls.get_revision_sha(dest, rev)
     217  
     218          if sha is not None:
     219              rev_options = rev_options.make_new(sha)
     220              rev_options.branch_name = rev if is_branch else None
     221  
     222              return rev_options
     223  
     224          # Do not show a warning for the common case of something that has
     225          # the form of a Git commit hash.
     226          if not looks_like_hash(rev):
     227              logger.warning(
     228                  "Did not find branch or tag '%s', assuming revision or ref.",
     229                  rev,
     230              )
     231  
     232          if not cls._should_fetch(dest, rev):
     233              return rev_options
     234  
     235          # fetch the requested revision
     236          cls.run_command(
     237              make_command("fetch", "-q", url, rev_options.to_args()),
     238              cwd=dest,
     239          )
     240          # Change the revision to the SHA of the ref we fetched
     241          sha = cls.get_revision(dest, rev="FETCH_HEAD")
     242          rev_options = rev_options.make_new(sha)
     243  
     244          return rev_options
     245  
     246      @classmethod
     247      def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
     248          """
     249          Return whether the current commit hash equals the given name.
     250  
     251          Args:
     252            dest: the repository directory.
     253            name: a string name.
     254          """
     255          if not name:
     256              # Then avoid an unnecessary subprocess call.
     257              return False
     258  
     259          return cls.get_revision(dest) == name
     260  
     261      def fetch_new(
     262          self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
     263      ) -> None:
     264          rev_display = rev_options.to_display()
     265          logger.info("Cloning %s%s to %s", url, rev_display, display_path(dest))
     266          if verbosity <= 0:
     267              flags: Tuple[str, ...] = ("--quiet",)
     268          elif verbosity == 1:
     269              flags = ()
     270          else:
     271              flags = ("--verbose", "--progress")
     272          if self.get_git_version() >= (2, 17):
     273              # Git added support for partial clone in 2.17
     274              # https://git-scm.com/docs/partial-clone
     275              # Speeds up cloning by functioning without a complete copy of repository
     276              self.run_command(
     277                  make_command(
     278                      "clone",
     279                      "--filter=blob:none",
     280                      *flags,
     281                      url,
     282                      dest,
     283                  )
     284              )
     285          else:
     286              self.run_command(make_command("clone", *flags, url, dest))
     287  
     288          if rev_options.rev:
     289              # Then a specific revision was requested.
     290              rev_options = self.resolve_revision(dest, url, rev_options)
     291              branch_name = getattr(rev_options, "branch_name", None)
     292              logger.debug("Rev options %s, branch_name %s", rev_options, branch_name)
     293              if branch_name is None:
     294                  # Only do a checkout if the current commit id doesn't match
     295                  # the requested revision.
     296                  if not self.is_commit_id_equal(dest, rev_options.rev):
     297                      cmd_args = make_command(
     298                          "checkout",
     299                          "-q",
     300                          rev_options.to_args(),
     301                      )
     302                      self.run_command(cmd_args, cwd=dest)
     303              elif self.get_current_branch(dest) != branch_name:
     304                  # Then a specific branch was requested, and that branch
     305                  # is not yet checked out.
     306                  track_branch = f"origin/{branch_name}"
     307                  cmd_args = [
     308                      "checkout",
     309                      "-b",
     310                      branch_name,
     311                      "--track",
     312                      track_branch,
     313                  ]
     314                  self.run_command(cmd_args, cwd=dest)
     315          else:
     316              sha = self.get_revision(dest)
     317              rev_options = rev_options.make_new(sha)
     318  
     319          logger.info("Resolved %s to commit %s", url, rev_options.rev)
     320  
     321          #: repo may contain submodules
     322          self.update_submodules(dest)
     323  
     324      def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
     325          self.run_command(
     326              make_command("config", "remote.origin.url", url),
     327              cwd=dest,
     328          )
     329          cmd_args = make_command("checkout", "-q", rev_options.to_args())
     330          self.run_command(cmd_args, cwd=dest)
     331  
     332          self.update_submodules(dest)
     333  
     334      def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
     335          # First fetch changes from the default remote
     336          if self.get_git_version() >= (1, 9):
     337              # fetch tags in addition to everything else
     338              self.run_command(["fetch", "-q", "--tags"], cwd=dest)
     339          else:
     340              self.run_command(["fetch", "-q"], cwd=dest)
     341          # Then reset to wanted revision (maybe even origin/master)
     342          rev_options = self.resolve_revision(dest, url, rev_options)
     343          cmd_args = make_command("reset", "--hard", "-q", rev_options.to_args())
     344          self.run_command(cmd_args, cwd=dest)
     345          #: update submodules
     346          self.update_submodules(dest)
     347  
     348      @classmethod
     349      def get_remote_url(cls, location: str) -> str:
     350          """
     351          Return URL of the first remote encountered.
     352  
     353          Raises RemoteNotFoundError if the repository does not have a remote
     354          url configured.
     355          """
     356          # We need to pass 1 for extra_ok_returncodes since the command
     357          # exits with return code 1 if there are no matching lines.
     358          stdout = cls.run_command(
     359              ["config", "--get-regexp", r"remote\..*\.url"],
     360              extra_ok_returncodes=(1,),
     361              show_stdout=False,
     362              stdout_only=True,
     363              cwd=location,
     364          )
     365          remotes = stdout.splitlines()
     366          try:
     367              found_remote = remotes[0]
     368          except IndexError:
     369              raise RemoteNotFoundError
     370  
     371          for remote in remotes:
     372              if remote.startswith("remote.origin.url "):
     373                  found_remote = remote
     374                  break
     375          url = found_remote.split(" ")[1]
     376          return cls._git_remote_to_pip_url(url.strip())
     377  
     378      @staticmethod
     379      def _git_remote_to_pip_url(url: str) -> str:
     380          """
     381          Convert a remote url from what git uses to what pip accepts.
     382  
     383          There are 3 legal forms **url** may take:
     384  
     385              1. A fully qualified url: ssh://git@example.com/foo/bar.git
     386              2. A local project.git folder: /path/to/bare/repository.git
     387              3. SCP shorthand for form 1: git@example.com:foo/bar.git
     388  
     389          Form 1 is output as-is. Form 2 must be converted to URI and form 3 must
     390          be converted to form 1.
     391  
     392          See the corresponding test test_git_remote_url_to_pip() for examples of
     393          sample inputs/outputs.
     394          """
     395          if re.match(r"\w+://", url):
     396              # This is already valid. Pass it though as-is.
     397              return url
     398          if os.path.exists(url):
     399              # A local bare remote (git clone --mirror).
     400              # Needs a file:// prefix.
     401              return pathlib.PurePath(url).as_uri()
     402          scp_match = SCP_REGEX.match(url)
     403          if scp_match:
     404              # Add an ssh:// prefix and replace the ':' with a '/'.
     405              return scp_match.expand(r"ssh://\1\2/\3")
     406          # Otherwise, bail out.
     407          raise RemoteNotValidError(url)
     408  
     409      @classmethod
     410      def has_commit(cls, location: str, rev: str) -> bool:
     411          """
     412          Check if rev is a commit that is available in the local repository.
     413          """
     414          try:
     415              cls.run_command(
     416                  ["rev-parse", "-q", "--verify", "sha^" + rev],
     417                  cwd=location,
     418                  log_failed_cmd=False,
     419              )
     420          except InstallationError:
     421              return False
     422          else:
     423              return True
     424  
     425      @classmethod
     426      def get_revision(cls, location: str, rev: Optional[str] = None) -> str:
     427          if rev is None:
     428              rev = "HEAD"
     429          current_rev = cls.run_command(
     430              ["rev-parse", rev],
     431              show_stdout=False,
     432              stdout_only=True,
     433              cwd=location,
     434          )
     435          return current_rev.strip()
     436  
     437      @classmethod
     438      def get_subdirectory(cls, location: str) -> Optional[str]:
     439          """
     440          Return the path to Python project root, relative to the repo root.
     441          Return None if the project root is in the repo root.
     442          """
     443          # find the repo root
     444          git_dir = cls.run_command(
     445              ["rev-parse", "--git-dir"],
     446              show_stdout=False,
     447              stdout_only=True,
     448              cwd=location,
     449          ).strip()
     450          if not os.path.isabs(git_dir):
     451              git_dir = os.path.join(location, git_dir)
     452          repo_root = os.path.abspath(os.path.join(git_dir, ".."))
     453          return find_path_to_project_root_from_repo_root(location, repo_root)
     454  
     455      @classmethod
     456      def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
     457          """
     458          Prefixes stub URLs like 'user@hostname:user/repo.git' with 'ssh://'.
     459          That's required because although they use SSH they sometimes don't
     460          work with a ssh:// scheme (e.g. GitHub). But we need a scheme for
     461          parsing. Hence we remove it again afterwards and return it as a stub.
     462          """
     463          # Works around an apparent Git bug
     464          # (see https://article.gmane.org/gmane.comp.version-control.git/146500)
     465          scheme, netloc, path, query, fragment = urlsplit(url)
     466          if scheme.endswith("file"):
     467              initial_slashes = path[: -len(path.lstrip("/"))]
     468              newpath = initial_slashes + urllib.request.url2pathname(path).replace(
     469                  "\\", "/"
     470              ).lstrip("/")
     471              after_plus = scheme.find("+") + 1
     472              url = scheme[:after_plus] + urlunsplit(
     473                  (scheme[after_plus:], netloc, newpath, query, fragment),
     474              )
     475  
     476          if "://" not in url:
     477              assert "file:" not in url
     478              url = url.replace("git+", "git+ssh://")
     479              url, rev, user_pass = super().get_url_rev_and_auth(url)
     480              url = url.replace("ssh://", "")
     481          else:
     482              url, rev, user_pass = super().get_url_rev_and_auth(url)
     483  
     484          return url, rev, user_pass
     485  
     486      @classmethod
     487      def update_submodules(cls, location: str) -> None:
     488          if not os.path.exists(os.path.join(location, ".gitmodules")):
     489              return
     490          cls.run_command(
     491              ["submodule", "update", "--init", "--recursive", "-q"],
     492              cwd=location,
     493          )
     494  
     495      @classmethod
     496      def get_repository_root(cls, location: str) -> Optional[str]:
     497          loc = super().get_repository_root(location)
     498          if loc:
     499              return loc
     500          try:
     501              r = cls.run_command(
     502                  ["rev-parse", "--show-toplevel"],
     503                  cwd=location,
     504                  show_stdout=False,
     505                  stdout_only=True,
     506                  on_returncode="raise",
     507                  log_failed_cmd=False,
     508              )
     509          except BadCommand:
     510              logger.debug(
     511                  "could not determine if %s is under git control "
     512                  "because git is not available",
     513                  location,
     514              )
     515              return None
     516          except InstallationError:
     517              return None
     518          return os.path.normpath(r.rstrip("\r\n"))
     519  
     520      @staticmethod
     521      def should_add_vcs_url_prefix(repo_url: str) -> bool:
     522          """In either https or ssh form, requirements must be prefixed with git+."""
     523          return True
     524  
     525  
# Register the Git class with the ``vcs`` object imported from
# pip._internal.vcs.versioncontrol (presumably pip's registry of VCS
# backends -- confirm against that module).
vcs.register(Git)