python (3.11.7)
       1  import email.message
       2  import email.parser
       3  import logging
       4  import os
       5  import zipfile
       6  from typing import Collection, Iterable, Iterator, List, Mapping, NamedTuple, Optional
       7  
       8  from pip._vendor import pkg_resources
       9  from pip._vendor.packaging.requirements import Requirement
      10  from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
      11  from pip._vendor.packaging.version import parse as parse_version
      12  
      13  from pip._internal.exceptions import InvalidWheel, NoneMetadataError, UnsupportedWheel
      14  from pip._internal.utils.egg_link import egg_link_path_from_location
      15  from pip._internal.utils.misc import display_path, normalize_path
      16  from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file
      17  
      18  from .base import (
      19      BaseDistribution,
      20      BaseEntryPoint,
      21      BaseEnvironment,
      22      DistributionVersion,
      23      InfoPath,
      24      Wheel,
      25  )
      26  
      27  logger = logging.getLogger(__name__)
      28  
      29  
      30  class ESC[4;38;5;81mEntryPoint(ESC[4;38;5;149mNamedTuple):
      31      name: str
      32      value: str
      33      group: str
      34  
      35  
      36  class ESC[4;38;5;81mInMemoryMetadata:
      37      """IMetadataProvider that reads metadata files from a dictionary.
      38  
      39      This also maps metadata decoding exceptions to our internal exception type.
      40      """
      41  
      42      def __init__(self, metadata: Mapping[str, bytes], wheel_name: str) -> None:
      43          self._metadata = metadata
      44          self._wheel_name = wheel_name
      45  
      46      def has_metadata(self, name: str) -> bool:
      47          return name in self._metadata
      48  
      49      def get_metadata(self, name: str) -> str:
      50          try:
      51              return self._metadata[name].decode()
      52          except UnicodeDecodeError as e:
      53              # Augment the default error with the origin of the file.
      54              raise UnsupportedWheel(
      55                  f"Error decoding metadata for {self._wheel_name}: {e} in {name} file"
      56              )
      57  
      58      def get_metadata_lines(self, name: str) -> Iterable[str]:
      59          return pkg_resources.yield_lines(self.get_metadata(name))
      60  
      61      def metadata_isdir(self, name: str) -> bool:
      62          return False
      63  
      64      def metadata_listdir(self, name: str) -> List[str]:
      65          return []
      66  
      67      def run_script(self, script_name: str, namespace: str) -> None:
      68          pass
      69  
      70  
      71  class ESC[4;38;5;81mDistribution(ESC[4;38;5;149mBaseDistribution):
      72      def __init__(self, dist: pkg_resources.Distribution) -> None:
      73          self._dist = dist
      74  
      75      @classmethod
      76      def from_directory(cls, directory: str) -> BaseDistribution:
      77          dist_dir = directory.rstrip(os.sep)
      78  
      79          # Build a PathMetadata object, from path to metadata. :wink:
      80          base_dir, dist_dir_name = os.path.split(dist_dir)
      81          metadata = pkg_resources.PathMetadata(base_dir, dist_dir)
      82  
      83          # Determine the correct Distribution object type.
      84          if dist_dir.endswith(".egg-info"):
      85              dist_cls = pkg_resources.Distribution
      86              dist_name = os.path.splitext(dist_dir_name)[0]
      87          else:
      88              assert dist_dir.endswith(".dist-info")
      89              dist_cls = pkg_resources.DistInfoDistribution
      90              dist_name = os.path.splitext(dist_dir_name)[0].split("-")[0]
      91  
      92          dist = dist_cls(base_dir, project_name=dist_name, metadata=metadata)
      93          return cls(dist)
      94  
      95      @classmethod
      96      def from_metadata_file_contents(
      97          cls,
      98          metadata_contents: bytes,
      99          filename: str,
     100          project_name: str,
     101      ) -> BaseDistribution:
     102          metadata_dict = {
     103              "METADATA": metadata_contents,
     104          }
     105          dist = pkg_resources.DistInfoDistribution(
     106              location=filename,
     107              metadata=InMemoryMetadata(metadata_dict, filename),
     108              project_name=project_name,
     109          )
     110          return cls(dist)
     111  
     112      @classmethod
     113      def from_wheel(cls, wheel: Wheel, name: str) -> BaseDistribution:
     114          try:
     115              with wheel.as_zipfile() as zf:
     116                  info_dir, _ = parse_wheel(zf, name)
     117                  metadata_dict = {
     118                      path.split("/", 1)[-1]: read_wheel_metadata_file(zf, path)
     119                      for path in zf.namelist()
     120                      if path.startswith(f"{info_dir}/")
     121                  }
     122          except zipfile.BadZipFile as e:
     123              raise InvalidWheel(wheel.location, name) from e
     124          except UnsupportedWheel as e:
     125              raise UnsupportedWheel(f"{name} has an invalid wheel, {e}")
     126          dist = pkg_resources.DistInfoDistribution(
     127              location=wheel.location,
     128              metadata=InMemoryMetadata(metadata_dict, wheel.location),
     129              project_name=name,
     130          )
     131          return cls(dist)
     132  
     133      @property
     134      def location(self) -> Optional[str]:
     135          return self._dist.location
     136  
     137      @property
     138      def installed_location(self) -> Optional[str]:
     139          egg_link = egg_link_path_from_location(self.raw_name)
     140          if egg_link:
     141              location = egg_link
     142          elif self.location:
     143              location = self.location
     144          else:
     145              return None
     146          return normalize_path(location)
     147  
     148      @property
     149      def info_location(self) -> Optional[str]:
     150          return self._dist.egg_info
     151  
     152      @property
     153      def installed_by_distutils(self) -> bool:
     154          # A distutils-installed distribution is provided by FileMetadata. This
     155          # provider has a "path" attribute not present anywhere else. Not the
     156          # best introspection logic, but pip has been doing this for a long time.
     157          try:
     158              return bool(self._dist._provider.path)
     159          except AttributeError:
     160              return False
     161  
     162      @property
     163      def canonical_name(self) -> NormalizedName:
     164          return canonicalize_name(self._dist.project_name)
     165  
     166      @property
     167      def version(self) -> DistributionVersion:
     168          return parse_version(self._dist.version)
     169  
     170      def is_file(self, path: InfoPath) -> bool:
     171          return self._dist.has_metadata(str(path))
     172  
     173      def iter_distutils_script_names(self) -> Iterator[str]:
     174          yield from self._dist.metadata_listdir("scripts")
     175  
     176      def read_text(self, path: InfoPath) -> str:
     177          name = str(path)
     178          if not self._dist.has_metadata(name):
     179              raise FileNotFoundError(name)
     180          content = self._dist.get_metadata(name)
     181          if content is None:
     182              raise NoneMetadataError(self, name)
     183          return content
     184  
     185      def iter_entry_points(self) -> Iterable[BaseEntryPoint]:
     186          for group, entries in self._dist.get_entry_map().items():
     187              for name, entry_point in entries.items():
     188                  name, _, value = str(entry_point).partition("=")
     189                  yield EntryPoint(name=name.strip(), value=value.strip(), group=group)
     190  
     191      def _metadata_impl(self) -> email.message.Message:
     192          """
     193          :raises NoneMetadataError: if the distribution reports `has_metadata()`
     194              True but `get_metadata()` returns None.
     195          """
     196          if isinstance(self._dist, pkg_resources.DistInfoDistribution):
     197              metadata_name = "METADATA"
     198          else:
     199              metadata_name = "PKG-INFO"
     200          try:
     201              metadata = self.read_text(metadata_name)
     202          except FileNotFoundError:
     203              if self.location:
     204                  displaying_path = display_path(self.location)
     205              else:
     206                  displaying_path = repr(self.location)
     207              logger.warning("No metadata found in %s", displaying_path)
     208              metadata = ""
     209          feed_parser = email.parser.FeedParser()
     210          feed_parser.feed(metadata)
     211          return feed_parser.close()
     212  
     213      def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]:
     214          if extras:  # pkg_resources raises on invalid extras, so we sanitize.
     215              extras = frozenset(extras).intersection(self._dist.extras)
     216          return self._dist.requires(extras)
     217  
     218      def iter_provided_extras(self) -> Iterable[str]:
     219          return self._dist.extras
     220  
     221  
     222  class ESC[4;38;5;81mEnvironment(ESC[4;38;5;149mBaseEnvironment):
     223      def __init__(self, ws: pkg_resources.WorkingSet) -> None:
     224          self._ws = ws
     225  
     226      @classmethod
     227      def default(cls) -> BaseEnvironment:
     228          return cls(pkg_resources.working_set)
     229  
     230      @classmethod
     231      def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment:
     232          return cls(pkg_resources.WorkingSet(paths))
     233  
     234      def _iter_distributions(self) -> Iterator[BaseDistribution]:
     235          for dist in self._ws:
     236              yield Distribution(dist)
     237  
     238      def _search_distribution(self, name: str) -> Optional[BaseDistribution]:
     239          """Find a distribution matching the ``name`` in the environment.
     240  
     241          This searches from *all* distributions available in the environment, to
     242          match the behavior of ``pkg_resources.get_distribution()``.
     243          """
     244          canonical_name = canonicalize_name(name)
     245          for dist in self.iter_all_distributions():
     246              if dist.canonical_name == canonical_name:
     247                  return dist
     248          return None
     249  
     250      def get_distribution(self, name: str) -> Optional[BaseDistribution]:
     251          # Search the distribution by looking through the working set.
     252          dist = self._search_distribution(name)
     253          if dist:
     254              return dist
     255  
     256          # If distribution could not be found, call working_set.require to
     257          # update the working set, and try to find the distribution again.
     258          # This might happen for e.g. when you install a package twice, once
     259          # using setup.py develop and again using setup.py install. Now when
     260          # running pip uninstall twice, the package gets removed from the
     261          # working set in the first uninstall, so we have to populate the
     262          # working set again so that pip knows about it and the packages gets
     263          # picked up and is successfully uninstalled the second time too.
     264          try:
     265              # We didn't pass in any version specifiers, so this can never
     266              # raise pkg_resources.VersionConflict.
     267              self._ws.require(name)
     268          except pkg_resources.DistributionNotFound:
     269              return None
     270          return self._search_distribution(name)