python (3.11.7)

(root)/
lib/
python3.11/
site-packages/
pip/
_internal/
operations/
install/
wheel.py
       1  """Support for installing and building the "wheel" binary package format.
       2  """
       3  
       4  import collections
       5  import compileall
       6  import contextlib
       7  import csv
       8  import importlib
       9  import logging
      10  import os.path
      11  import re
      12  import shutil
      13  import sys
      14  import warnings
      15  from base64 import urlsafe_b64encode
      16  from email.message import Message
      17  from itertools import chain, filterfalse, starmap
      18  from typing import (
      19      IO,
      20      TYPE_CHECKING,
      21      Any,
      22      BinaryIO,
      23      Callable,
      24      Dict,
      25      Generator,
      26      Iterable,
      27      Iterator,
      28      List,
      29      NewType,
      30      Optional,
      31      Sequence,
      32      Set,
      33      Tuple,
      34      Union,
      35      cast,
      36  )
      37  from zipfile import ZipFile, ZipInfo
      38  
      39  from pip._vendor.distlib.scripts import ScriptMaker
      40  from pip._vendor.distlib.util import get_export_entry
      41  from pip._vendor.packaging.utils import canonicalize_name
      42  
      43  from pip._internal.exceptions import InstallationError
      44  from pip._internal.locations import get_major_minor_version
      45  from pip._internal.metadata import (
      46      BaseDistribution,
      47      FilesystemWheel,
      48      get_wheel_distribution,
      49  )
      50  from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, DirectUrl
      51  from pip._internal.models.scheme import SCHEME_KEYS, Scheme
      52  from pip._internal.utils.filesystem import adjacent_tmp_file, replace
      53  from pip._internal.utils.misc import captured_stdout, ensure_dir, hash_file, partition
      54  from pip._internal.utils.unpacking import (
      55      current_umask,
      56      is_within_directory,
      57      set_extracted_file_to_default_mode_plus_executable,
      58      zip_item_is_executable,
      59  )
      60  from pip._internal.utils.wheel import parse_wheel
      61  
      62  if TYPE_CHECKING:
      63      from typing import Protocol
      64  
      65      class ESC[4;38;5;81mFile(ESC[4;38;5;149mProtocol):
      66          src_record_path: "RecordPath"
      67          dest_path: str
      68          changed: bool
      69  
      70          def save(self) -> None:
      71              pass
      72  
      73  
      74  logger = logging.getLogger(__name__)
      75  
      76  RecordPath = NewType("RecordPath", str)
      77  InstalledCSVRow = Tuple[RecordPath, str, Union[int, str]]
      78  
      79  
      80  def rehash(path: str, blocksize: int = 1 << 20) -> Tuple[str, str]:
      81      """Return (encoded_digest, length) for path using hashlib.sha256()"""
      82      h, length = hash_file(path, blocksize)
      83      digest = "sha256=" + urlsafe_b64encode(h.digest()).decode("latin1").rstrip("=")
      84      return (digest, str(length))
      85  
      86  
      87  def csv_io_kwargs(mode: str) -> Dict[str, Any]:
      88      """Return keyword arguments to properly open a CSV file
      89      in the given mode.
      90      """
      91      return {"mode": mode, "newline": "", "encoding": "utf-8"}
      92  
      93  
      94  def fix_script(path: str) -> bool:
      95      """Replace #!python with #!/path/to/python
      96      Return True if file was changed.
      97      """
      98      # XXX RECORD hashes will need to be updated
      99      assert os.path.isfile(path)
     100  
     101      with open(path, "rb") as script:
     102          firstline = script.readline()
     103          if not firstline.startswith(b"#!python"):
     104              return False
     105          exename = sys.executable.encode(sys.getfilesystemencoding())
     106          firstline = b"#!" + exename + os.linesep.encode("ascii")
     107          rest = script.read()
     108      with open(path, "wb") as script:
     109          script.write(firstline)
     110          script.write(rest)
     111      return True
     112  
     113  
     114  def wheel_root_is_purelib(metadata: Message) -> bool:
     115      return metadata.get("Root-Is-Purelib", "").lower() == "true"
     116  
     117  
     118  def get_entrypoints(dist: BaseDistribution) -> Tuple[Dict[str, str], Dict[str, str]]:
     119      console_scripts = {}
     120      gui_scripts = {}
     121      for entry_point in dist.iter_entry_points():
     122          if entry_point.group == "console_scripts":
     123              console_scripts[entry_point.name] = entry_point.value
     124          elif entry_point.group == "gui_scripts":
     125              gui_scripts[entry_point.name] = entry_point.value
     126      return console_scripts, gui_scripts
     127  
     128  
     129  def message_about_scripts_not_on_PATH(scripts: Sequence[str]) -> Optional[str]:
     130      """Determine if any scripts are not on PATH and format a warning.
     131      Returns a warning message if one or more scripts are not on PATH,
     132      otherwise None.
     133      """
     134      if not scripts:
     135          return None
     136  
     137      # Group scripts by the path they were installed in
     138      grouped_by_dir: Dict[str, Set[str]] = collections.defaultdict(set)
     139      for destfile in scripts:
     140          parent_dir = os.path.dirname(destfile)
     141          script_name = os.path.basename(destfile)
     142          grouped_by_dir[parent_dir].add(script_name)
     143  
     144      # We don't want to warn for directories that are on PATH.
     145      not_warn_dirs = [
     146          os.path.normcase(os.path.normpath(i)).rstrip(os.sep)
     147          for i in os.environ.get("PATH", "").split(os.pathsep)
     148      ]
     149      # If an executable sits with sys.executable, we don't warn for it.
     150      #     This covers the case of venv invocations without activating the venv.
     151      not_warn_dirs.append(
     152          os.path.normcase(os.path.normpath(os.path.dirname(sys.executable)))
     153      )
     154      warn_for: Dict[str, Set[str]] = {
     155          parent_dir: scripts
     156          for parent_dir, scripts in grouped_by_dir.items()
     157          if os.path.normcase(os.path.normpath(parent_dir)) not in not_warn_dirs
     158      }
     159      if not warn_for:
     160          return None
     161  
     162      # Format a message
     163      msg_lines = []
     164      for parent_dir, dir_scripts in warn_for.items():
     165          sorted_scripts: List[str] = sorted(dir_scripts)
     166          if len(sorted_scripts) == 1:
     167              start_text = "script {} is".format(sorted_scripts[0])
     168          else:
     169              start_text = "scripts {} are".format(
     170                  ", ".join(sorted_scripts[:-1]) + " and " + sorted_scripts[-1]
     171              )
     172  
     173          msg_lines.append(
     174              "The {} installed in '{}' which is not on PATH.".format(
     175                  start_text, parent_dir
     176              )
     177          )
     178  
     179      last_line_fmt = (
     180          "Consider adding {} to PATH or, if you prefer "
     181          "to suppress this warning, use --no-warn-script-location."
     182      )
     183      if len(msg_lines) == 1:
     184          msg_lines.append(last_line_fmt.format("this directory"))
     185      else:
     186          msg_lines.append(last_line_fmt.format("these directories"))
     187  
     188      # Add a note if any directory starts with ~
     189      warn_for_tilde = any(
     190          i[0] == "~" for i in os.environ.get("PATH", "").split(os.pathsep) if i
     191      )
     192      if warn_for_tilde:
     193          tilde_warning_msg = (
     194              "NOTE: The current PATH contains path(s) starting with `~`, "
     195              "which may not be expanded by all applications."
     196          )
     197          msg_lines.append(tilde_warning_msg)
     198  
     199      # Returns the formatted multiline message
     200      return "\n".join(msg_lines)
     201  
     202  
     203  def _normalized_outrows(
     204      outrows: Iterable[InstalledCSVRow],
     205  ) -> List[Tuple[str, str, str]]:
     206      """Normalize the given rows of a RECORD file.
     207  
     208      Items in each row are converted into str. Rows are then sorted to make
     209      the value more predictable for tests.
     210  
     211      Each row is a 3-tuple (path, hash, size) and corresponds to a record of
     212      a RECORD file (see PEP 376 and PEP 427 for details).  For the rows
     213      passed to this function, the size can be an integer as an int or string,
     214      or the empty string.
     215      """
     216      # Normally, there should only be one row per path, in which case the
     217      # second and third elements don't come into play when sorting.
     218      # However, in cases in the wild where a path might happen to occur twice,
     219      # we don't want the sort operation to trigger an error (but still want
     220      # determinism).  Since the third element can be an int or string, we
     221      # coerce each element to a string to avoid a TypeError in this case.
     222      # For additional background, see--
     223      # https://github.com/pypa/pip/issues/5868
     224      return sorted(
     225          (record_path, hash_, str(size)) for record_path, hash_, size in outrows
     226      )
     227  
     228  
     229  def _record_to_fs_path(record_path: RecordPath, lib_dir: str) -> str:
     230      return os.path.join(lib_dir, record_path)
     231  
     232  
     233  def _fs_to_record_path(path: str, lib_dir: str) -> RecordPath:
     234      # On Windows, do not handle relative paths if they belong to different
     235      # logical disks
     236      if os.path.splitdrive(path)[0].lower() == os.path.splitdrive(lib_dir)[0].lower():
     237          path = os.path.relpath(path, lib_dir)
     238  
     239      path = path.replace(os.path.sep, "/")
     240      return cast("RecordPath", path)
     241  
     242  
     243  def get_csv_rows_for_installed(
     244      old_csv_rows: List[List[str]],
     245      installed: Dict[RecordPath, RecordPath],
     246      changed: Set[RecordPath],
     247      generated: List[str],
     248      lib_dir: str,
     249  ) -> List[InstalledCSVRow]:
     250      """
     251      :param installed: A map from archive RECORD path to installation RECORD
     252          path.
     253      """
     254      installed_rows: List[InstalledCSVRow] = []
     255      for row in old_csv_rows:
     256          if len(row) > 3:
     257              logger.warning("RECORD line has more than three elements: %s", row)
     258          old_record_path = cast("RecordPath", row[0])
     259          new_record_path = installed.pop(old_record_path, old_record_path)
     260          if new_record_path in changed:
     261              digest, length = rehash(_record_to_fs_path(new_record_path, lib_dir))
     262          else:
     263              digest = row[1] if len(row) > 1 else ""
     264              length = row[2] if len(row) > 2 else ""
     265          installed_rows.append((new_record_path, digest, length))
     266      for f in generated:
     267          path = _fs_to_record_path(f, lib_dir)
     268          digest, length = rehash(f)
     269          installed_rows.append((path, digest, length))
     270      for installed_record_path in installed.values():
     271          installed_rows.append((installed_record_path, "", ""))
     272      return installed_rows
     273  
     274  
     275  def get_console_script_specs(console: Dict[str, str]) -> List[str]:
     276      """
     277      Given the mapping from entrypoint name to callable, return the relevant
     278      console script specs.
     279      """
     280      # Don't mutate caller's version
     281      console = console.copy()
     282  
     283      scripts_to_generate = []
     284  
     285      # Special case pip and setuptools to generate versioned wrappers
     286      #
     287      # The issue is that some projects (specifically, pip and setuptools) use
     288      # code in setup.py to create "versioned" entry points - pip2.7 on Python
     289      # 2.7, pip3.3 on Python 3.3, etc. But these entry points are baked into
     290      # the wheel metadata at build time, and so if the wheel is installed with
     291      # a *different* version of Python the entry points will be wrong. The
     292      # correct fix for this is to enhance the metadata to be able to describe
     293      # such versioned entry points, but that won't happen till Metadata 2.0 is
     294      # available.
     295      # In the meantime, projects using versioned entry points will either have
     296      # incorrect versioned entry points, or they will not be able to distribute
     297      # "universal" wheels (i.e., they will need a wheel per Python version).
     298      #
     299      # Because setuptools and pip are bundled with _ensurepip and virtualenv,
     300      # we need to use universal wheels. So, as a stopgap until Metadata 2.0, we
     301      # override the versioned entry points in the wheel and generate the
     302      # correct ones. This code is purely a short-term measure until Metadata 2.0
     303      # is available.
     304      #
     305      # To add the level of hack in this section of code, in order to support
     306      # ensurepip this code will look for an ``ENSUREPIP_OPTIONS`` environment
     307      # variable which will control which version scripts get installed.
     308      #
     309      # ENSUREPIP_OPTIONS=altinstall
     310      #   - Only pipX.Y and easy_install-X.Y will be generated and installed
     311      # ENSUREPIP_OPTIONS=install
     312      #   - pipX.Y, pipX, easy_install-X.Y will be generated and installed. Note
     313      #     that this option is technically if ENSUREPIP_OPTIONS is set and is
     314      #     not altinstall
     315      # DEFAULT
     316      #   - The default behavior is to install pip, pipX, pipX.Y, easy_install
     317      #     and easy_install-X.Y.
     318      pip_script = console.pop("pip", None)
     319      if pip_script:
     320          if "ENSUREPIP_OPTIONS" not in os.environ:
     321              scripts_to_generate.append("pip = " + pip_script)
     322  
     323          if os.environ.get("ENSUREPIP_OPTIONS", "") != "altinstall":
     324              scripts_to_generate.append(
     325                  "pip{} = {}".format(sys.version_info[0], pip_script)
     326              )
     327  
     328          scripts_to_generate.append(f"pip{get_major_minor_version()} = {pip_script}")
     329          # Delete any other versioned pip entry points
     330          pip_ep = [k for k in console if re.match(r"pip(\d+(\.\d+)?)?$", k)]
     331          for k in pip_ep:
     332              del console[k]
     333      easy_install_script = console.pop("easy_install", None)
     334      if easy_install_script:
     335          if "ENSUREPIP_OPTIONS" not in os.environ:
     336              scripts_to_generate.append("easy_install = " + easy_install_script)
     337  
     338          scripts_to_generate.append(
     339              "easy_install-{} = {}".format(
     340                  get_major_minor_version(), easy_install_script
     341              )
     342          )
     343          # Delete any other versioned easy_install entry points
     344          easy_install_ep = [
     345              k for k in console if re.match(r"easy_install(-\d+\.\d+)?$", k)
     346          ]
     347          for k in easy_install_ep:
     348              del console[k]
     349  
     350      # Generate the console entry points specified in the wheel
     351      scripts_to_generate.extend(starmap("{} = {}".format, console.items()))
     352  
     353      return scripts_to_generate
     354  
     355  
     356  class ESC[4;38;5;81mZipBackedFile:
     357      def __init__(
     358          self, src_record_path: RecordPath, dest_path: str, zip_file: ZipFile
     359      ) -> None:
     360          self.src_record_path = src_record_path
     361          self.dest_path = dest_path
     362          self._zip_file = zip_file
     363          self.changed = False
     364  
     365      def _getinfo(self) -> ZipInfo:
     366          return self._zip_file.getinfo(self.src_record_path)
     367  
     368      def save(self) -> None:
     369          # directory creation is lazy and after file filtering
     370          # to ensure we don't install empty dirs; empty dirs can't be
     371          # uninstalled.
     372          parent_dir = os.path.dirname(self.dest_path)
     373          ensure_dir(parent_dir)
     374  
     375          # When we open the output file below, any existing file is truncated
     376          # before we start writing the new contents. This is fine in most
     377          # cases, but can cause a segfault if pip has loaded a shared
     378          # object (e.g. from pyopenssl through its vendored urllib3)
     379          # Since the shared object is mmap'd an attempt to call a
     380          # symbol in it will then cause a segfault. Unlinking the file
     381          # allows writing of new contents while allowing the process to
     382          # continue to use the old copy.
     383          if os.path.exists(self.dest_path):
     384              os.unlink(self.dest_path)
     385  
     386          zipinfo = self._getinfo()
     387  
     388          with self._zip_file.open(zipinfo) as f:
     389              with open(self.dest_path, "wb") as dest:
     390                  shutil.copyfileobj(f, dest)
     391  
     392          if zip_item_is_executable(zipinfo):
     393              set_extracted_file_to_default_mode_plus_executable(self.dest_path)
     394  
     395  
     396  class ESC[4;38;5;81mScriptFile:
     397      def __init__(self, file: "File") -> None:
     398          self._file = file
     399          self.src_record_path = self._file.src_record_path
     400          self.dest_path = self._file.dest_path
     401          self.changed = False
     402  
     403      def save(self) -> None:
     404          self._file.save()
     405          self.changed = fix_script(self.dest_path)
     406  
     407  
     408  class ESC[4;38;5;81mMissingCallableSuffix(ESC[4;38;5;149mInstallationError):
     409      def __init__(self, entry_point: str) -> None:
     410          super().__init__(
     411              "Invalid script entry point: {} - A callable "
     412              "suffix is required. Cf https://packaging.python.org/"
     413              "specifications/entry-points/#use-for-scripts for more "
     414              "information.".format(entry_point)
     415          )
     416  
     417  
     418  def _raise_for_invalid_entrypoint(specification: str) -> None:
     419      entry = get_export_entry(specification)
     420      if entry is not None and entry.suffix is None:
     421          raise MissingCallableSuffix(str(entry))
     422  
     423  
     424  class ESC[4;38;5;81mPipScriptMaker(ESC[4;38;5;149mScriptMaker):
     425      def make(
     426          self, specification: str, options: Optional[Dict[str, Any]] = None
     427      ) -> List[str]:
     428          _raise_for_invalid_entrypoint(specification)
     429          return super().make(specification, options)
     430  
     431  
     432  def _install_wheel(
     433      name: str,
     434      wheel_zip: ZipFile,
     435      wheel_path: str,
     436      scheme: Scheme,
     437      pycompile: bool = True,
     438      warn_script_location: bool = True,
     439      direct_url: Optional[DirectUrl] = None,
     440      requested: bool = False,
     441  ) -> None:
     442      """Install a wheel.
     443  
     444      :param name: Name of the project to install
     445      :param wheel_zip: open ZipFile for wheel being installed
     446      :param scheme: Distutils scheme dictating the install directories
     447      :param req_description: String used in place of the requirement, for
     448          logging
     449      :param pycompile: Whether to byte-compile installed Python files
     450      :param warn_script_location: Whether to check that scripts are installed
     451          into a directory on PATH
     452      :raises UnsupportedWheel:
     453          * when the directory holds an unpacked wheel with incompatible
     454            Wheel-Version
     455          * when the .dist-info dir does not match the wheel
     456      """
     457      info_dir, metadata = parse_wheel(wheel_zip, name)
     458  
     459      if wheel_root_is_purelib(metadata):
     460          lib_dir = scheme.purelib
     461      else:
     462          lib_dir = scheme.platlib
     463  
     464      # Record details of the files moved
     465      #   installed = files copied from the wheel to the destination
     466      #   changed = files changed while installing (scripts #! line typically)
     467      #   generated = files newly generated during the install (script wrappers)
     468      installed: Dict[RecordPath, RecordPath] = {}
     469      changed: Set[RecordPath] = set()
     470      generated: List[str] = []
     471  
     472      def record_installed(
     473          srcfile: RecordPath, destfile: str, modified: bool = False
     474      ) -> None:
     475          """Map archive RECORD paths to installation RECORD paths."""
     476          newpath = _fs_to_record_path(destfile, lib_dir)
     477          installed[srcfile] = newpath
     478          if modified:
     479              changed.add(newpath)
     480  
     481      def is_dir_path(path: RecordPath) -> bool:
     482          return path.endswith("/")
     483  
     484      def assert_no_path_traversal(dest_dir_path: str, target_path: str) -> None:
     485          if not is_within_directory(dest_dir_path, target_path):
     486              message = (
     487                  "The wheel {!r} has a file {!r} trying to install"
     488                  " outside the target directory {!r}"
     489              )
     490              raise InstallationError(
     491                  message.format(wheel_path, target_path, dest_dir_path)
     492              )
     493  
     494      def root_scheme_file_maker(
     495          zip_file: ZipFile, dest: str
     496      ) -> Callable[[RecordPath], "File"]:
     497          def make_root_scheme_file(record_path: RecordPath) -> "File":
     498              normed_path = os.path.normpath(record_path)
     499              dest_path = os.path.join(dest, normed_path)
     500              assert_no_path_traversal(dest, dest_path)
     501              return ZipBackedFile(record_path, dest_path, zip_file)
     502  
     503          return make_root_scheme_file
     504  
     505      def data_scheme_file_maker(
     506          zip_file: ZipFile, scheme: Scheme
     507      ) -> Callable[[RecordPath], "File"]:
     508          scheme_paths = {key: getattr(scheme, key) for key in SCHEME_KEYS}
     509  
     510          def make_data_scheme_file(record_path: RecordPath) -> "File":
     511              normed_path = os.path.normpath(record_path)
     512              try:
     513                  _, scheme_key, dest_subpath = normed_path.split(os.path.sep, 2)
     514              except ValueError:
     515                  message = (
     516                      "Unexpected file in {}: {!r}. .data directory contents"
     517                      " should be named like: '<scheme key>/<path>'."
     518                  ).format(wheel_path, record_path)
     519                  raise InstallationError(message)
     520  
     521              try:
     522                  scheme_path = scheme_paths[scheme_key]
     523              except KeyError:
     524                  valid_scheme_keys = ", ".join(sorted(scheme_paths))
     525                  message = (
     526                      "Unknown scheme key used in {}: {} (for file {!r}). .data"
     527                      " directory contents should be in subdirectories named"
     528                      " with a valid scheme key ({})"
     529                  ).format(wheel_path, scheme_key, record_path, valid_scheme_keys)
     530                  raise InstallationError(message)
     531  
     532              dest_path = os.path.join(scheme_path, dest_subpath)
     533              assert_no_path_traversal(scheme_path, dest_path)
     534              return ZipBackedFile(record_path, dest_path, zip_file)
     535  
     536          return make_data_scheme_file
     537  
     538      def is_data_scheme_path(path: RecordPath) -> bool:
     539          return path.split("/", 1)[0].endswith(".data")
     540  
     541      paths = cast(List[RecordPath], wheel_zip.namelist())
     542      file_paths = filterfalse(is_dir_path, paths)
     543      root_scheme_paths, data_scheme_paths = partition(is_data_scheme_path, file_paths)
     544  
     545      make_root_scheme_file = root_scheme_file_maker(wheel_zip, lib_dir)
     546      files: Iterator[File] = map(make_root_scheme_file, root_scheme_paths)
     547  
     548      def is_script_scheme_path(path: RecordPath) -> bool:
     549          parts = path.split("/", 2)
     550          return len(parts) > 2 and parts[0].endswith(".data") and parts[1] == "scripts"
     551  
     552      other_scheme_paths, script_scheme_paths = partition(
     553          is_script_scheme_path, data_scheme_paths
     554      )
     555  
     556      make_data_scheme_file = data_scheme_file_maker(wheel_zip, scheme)
     557      other_scheme_files = map(make_data_scheme_file, other_scheme_paths)
     558      files = chain(files, other_scheme_files)
     559  
     560      # Get the defined entry points
     561      distribution = get_wheel_distribution(
     562          FilesystemWheel(wheel_path),
     563          canonicalize_name(name),
     564      )
     565      console, gui = get_entrypoints(distribution)
     566  
     567      def is_entrypoint_wrapper(file: "File") -> bool:
     568          # EP, EP.exe and EP-script.py are scripts generated for
     569          # entry point EP by setuptools
     570          path = file.dest_path
     571          name = os.path.basename(path)
     572          if name.lower().endswith(".exe"):
     573              matchname = name[:-4]
     574          elif name.lower().endswith("-script.py"):
     575              matchname = name[:-10]
     576          elif name.lower().endswith(".pya"):
     577              matchname = name[:-4]
     578          else:
     579              matchname = name
     580          # Ignore setuptools-generated scripts
     581          return matchname in console or matchname in gui
     582  
     583      script_scheme_files: Iterator[File] = map(
     584          make_data_scheme_file, script_scheme_paths
     585      )
     586      script_scheme_files = filterfalse(is_entrypoint_wrapper, script_scheme_files)
     587      script_scheme_files = map(ScriptFile, script_scheme_files)
     588      files = chain(files, script_scheme_files)
     589  
     590      for file in files:
     591          file.save()
     592          record_installed(file.src_record_path, file.dest_path, file.changed)
     593  
     594      def pyc_source_file_paths() -> Generator[str, None, None]:
     595          # We de-duplicate installation paths, since there can be overlap (e.g.
     596          # file in .data maps to same location as file in wheel root).
     597          # Sorting installation paths makes it easier to reproduce and debug
     598          # issues related to permissions on existing files.
     599          for installed_path in sorted(set(installed.values())):
     600              full_installed_path = os.path.join(lib_dir, installed_path)
     601              if not os.path.isfile(full_installed_path):
     602                  continue
     603              if not full_installed_path.endswith(".py"):
     604                  continue
     605              yield full_installed_path
     606  
     607      def pyc_output_path(path: str) -> str:
     608          """Return the path the pyc file would have been written to."""
     609          return importlib.util.cache_from_source(path)
     610  
     611      # Compile all of the pyc files for the installed files
     612      if pycompile:
     613          with captured_stdout() as stdout:
     614              with warnings.catch_warnings():
     615                  warnings.filterwarnings("ignore")
     616                  for path in pyc_source_file_paths():
     617                      success = compileall.compile_file(path, force=True, quiet=True)
     618                      if success:
     619                          pyc_path = pyc_output_path(path)
     620                          assert os.path.exists(pyc_path)
     621                          pyc_record_path = cast(
     622                              "RecordPath", pyc_path.replace(os.path.sep, "/")
     623                          )
     624                          record_installed(pyc_record_path, pyc_path)
     625          logger.debug(stdout.getvalue())
     626  
     627      maker = PipScriptMaker(None, scheme.scripts)
     628  
     629      # Ensure old scripts are overwritten.
     630      # See https://github.com/pypa/pip/issues/1800
     631      maker.clobber = True
     632  
     633      # Ensure we don't generate any variants for scripts because this is almost
     634      # never what somebody wants.
     635      # See https://bitbucket.org/pypa/distlib/issue/35/
     636      maker.variants = {""}
     637  
     638      # This is required because otherwise distlib creates scripts that are not
     639      # executable.
     640      # See https://bitbucket.org/pypa/distlib/issue/32/
     641      maker.set_mode = True
     642  
     643      # Generate the console and GUI entry points specified in the wheel
     644      scripts_to_generate = get_console_script_specs(console)
     645  
     646      gui_scripts_to_generate = list(starmap("{} = {}".format, gui.items()))
     647  
     648      generated_console_scripts = maker.make_multiple(scripts_to_generate)
     649      generated.extend(generated_console_scripts)
     650  
     651      generated.extend(maker.make_multiple(gui_scripts_to_generate, {"gui": True}))
     652  
     653      if warn_script_location:
     654          msg = message_about_scripts_not_on_PATH(generated_console_scripts)
     655          if msg is not None:
     656              logger.warning(msg)
     657  
     658      generated_file_mode = 0o666 & ~current_umask()
     659  
     660      @contextlib.contextmanager
     661      def _generate_file(path: str, **kwargs: Any) -> Generator[BinaryIO, None, None]:
     662          with adjacent_tmp_file(path, **kwargs) as f:
     663              yield f
     664          os.chmod(f.name, generated_file_mode)
     665          replace(f.name, path)
     666  
     667      dest_info_dir = os.path.join(lib_dir, info_dir)
     668  
     669      # Record pip as the installer
     670      installer_path = os.path.join(dest_info_dir, "INSTALLER")
     671      with _generate_file(installer_path) as installer_file:
     672          installer_file.write(b"pip\n")
     673      generated.append(installer_path)
     674  
     675      # Record the PEP 610 direct URL reference
     676      if direct_url is not None:
     677          direct_url_path = os.path.join(dest_info_dir, DIRECT_URL_METADATA_NAME)
     678          with _generate_file(direct_url_path) as direct_url_file:
     679              direct_url_file.write(direct_url.to_json().encode("utf-8"))
     680          generated.append(direct_url_path)
     681  
     682      # Record the REQUESTED file
     683      if requested:
     684          requested_path = os.path.join(dest_info_dir, "REQUESTED")
     685          with open(requested_path, "wb"):
     686              pass
     687          generated.append(requested_path)
     688  
     689      record_text = distribution.read_text("RECORD")
     690      record_rows = list(csv.reader(record_text.splitlines()))
     691  
     692      rows = get_csv_rows_for_installed(
     693          record_rows,
     694          installed=installed,
     695          changed=changed,
     696          generated=generated,
     697          lib_dir=lib_dir,
     698      )
     699  
     700      # Record details of all files installed
     701      record_path = os.path.join(dest_info_dir, "RECORD")
     702  
     703      with _generate_file(record_path, **csv_io_kwargs("w")) as record_file:
     704          # Explicitly cast to typing.IO[str] as a workaround for the mypy error:
     705          # "writer" has incompatible type "BinaryIO"; expected "_Writer"
     706          writer = csv.writer(cast("IO[str]", record_file))
     707          writer.writerows(_normalized_outrows(rows))
     708  
     709  
     710  @contextlib.contextmanager
     711  def req_error_context(req_description: str) -> Generator[None, None, None]:
     712      try:
     713          yield
     714      except InstallationError as e:
     715          message = "For req: {}. {}".format(req_description, e.args[0])
     716          raise InstallationError(message) from e
     717  
     718  
     719  def install_wheel(
     720      name: str,
     721      wheel_path: str,
     722      scheme: Scheme,
     723      req_description: str,
     724      pycompile: bool = True,
     725      warn_script_location: bool = True,
     726      direct_url: Optional[DirectUrl] = None,
     727      requested: bool = False,
     728  ) -> None:
     729      with ZipFile(wheel_path, allowZip64=True) as z:
     730          with req_error_context(req_description):
     731              _install_wheel(
     732                  name=name,
     733                  wheel_zip=z,
     734                  wheel_path=wheel_path,
     735                  scheme=scheme,
     736                  pycompile=pycompile,
     737                  warn_script_location=warn_script_location,
     738                  direct_url=direct_url,
     739                  requested=requested,
     740              )