python (3.11.7)
       1  import errno
       2  import itertools
       3  import logging
       4  import os.path
       5  import tempfile
       6  from contextlib import ExitStack, contextmanager
       7  from typing import Any, Dict, Generator, Optional, TypeVar, Union
       8  
       9  from pip._internal.utils.misc import enum, rmtree
      10  
      11  logger = logging.getLogger(__name__)
      12  
      13  _T = TypeVar("_T", bound="TempDirectory")
      14  
      15  
      16  # Kinds of temporary directories. Only needed for ones that are
      17  # globally-managed.
      18  tempdir_kinds = enum(
      19      BUILD_ENV="build-env",
      20      EPHEM_WHEEL_CACHE="ephem-wheel-cache",
      21      REQ_BUILD="req-build",
      22  )
      23  
      24  
      25  _tempdir_manager: Optional[ExitStack] = None
      26  
      27  
      28  @contextmanager
      29  def global_tempdir_manager() -> Generator[None, None, None]:
      30      global _tempdir_manager
      31      with ExitStack() as stack:
      32          old_tempdir_manager, _tempdir_manager = _tempdir_manager, stack
      33          try:
      34              yield
      35          finally:
      36              _tempdir_manager = old_tempdir_manager
      37  
      38  
      39  class ESC[4;38;5;81mTempDirectoryTypeRegistry:
      40      """Manages temp directory behavior"""
      41  
      42      def __init__(self) -> None:
      43          self._should_delete: Dict[str, bool] = {}
      44  
      45      def set_delete(self, kind: str, value: bool) -> None:
      46          """Indicate whether a TempDirectory of the given kind should be
      47          auto-deleted.
      48          """
      49          self._should_delete[kind] = value
      50  
      51      def get_delete(self, kind: str) -> bool:
      52          """Get configured auto-delete flag for a given TempDirectory type,
      53          default True.
      54          """
      55          return self._should_delete.get(kind, True)
      56  
      57  
      58  _tempdir_registry: Optional[TempDirectoryTypeRegistry] = None
      59  
      60  
      61  @contextmanager
      62  def tempdir_registry() -> Generator[TempDirectoryTypeRegistry, None, None]:
      63      """Provides a scoped global tempdir registry that can be used to dictate
      64      whether directories should be deleted.
      65      """
      66      global _tempdir_registry
      67      old_tempdir_registry = _tempdir_registry
      68      _tempdir_registry = TempDirectoryTypeRegistry()
      69      try:
      70          yield _tempdir_registry
      71      finally:
      72          _tempdir_registry = old_tempdir_registry
      73  
      74  
      75  class ESC[4;38;5;81m_Default:
      76      pass
      77  
      78  
      79  _default = _Default()
      80  
      81  
      82  class ESC[4;38;5;81mTempDirectory:
      83      """Helper class that owns and cleans up a temporary directory.
      84  
      85      This class can be used as a context manager or as an OO representation of a
      86      temporary directory.
      87  
      88      Attributes:
      89          path
      90              Location to the created temporary directory
      91          delete
      92              Whether the directory should be deleted when exiting
      93              (when used as a contextmanager)
      94  
      95      Methods:
      96          cleanup()
      97              Deletes the temporary directory
      98  
      99      When used as a context manager, if the delete attribute is True, on
     100      exiting the context the temporary directory is deleted.
     101      """
     102  
     103      def __init__(
     104          self,
     105          path: Optional[str] = None,
     106          delete: Union[bool, None, _Default] = _default,
     107          kind: str = "temp",
     108          globally_managed: bool = False,
     109      ):
     110          super().__init__()
     111  
     112          if delete is _default:
     113              if path is not None:
     114                  # If we were given an explicit directory, resolve delete option
     115                  # now.
     116                  delete = False
     117              else:
     118                  # Otherwise, we wait until cleanup and see what
     119                  # tempdir_registry says.
     120                  delete = None
     121  
     122          # The only time we specify path is in for editables where it
     123          # is the value of the --src option.
     124          if path is None:
     125              path = self._create(kind)
     126  
     127          self._path = path
     128          self._deleted = False
     129          self.delete = delete
     130          self.kind = kind
     131  
     132          if globally_managed:
     133              assert _tempdir_manager is not None
     134              _tempdir_manager.enter_context(self)
     135  
     136      @property
     137      def path(self) -> str:
     138          assert not self._deleted, f"Attempted to access deleted path: {self._path}"
     139          return self._path
     140  
     141      def __repr__(self) -> str:
     142          return f"<{self.__class__.__name__} {self.path!r}>"
     143  
     144      def __enter__(self: _T) -> _T:
     145          return self
     146  
     147      def __exit__(self, exc: Any, value: Any, tb: Any) -> None:
     148          if self.delete is not None:
     149              delete = self.delete
     150          elif _tempdir_registry:
     151              delete = _tempdir_registry.get_delete(self.kind)
     152          else:
     153              delete = True
     154  
     155          if delete:
     156              self.cleanup()
     157  
     158      def _create(self, kind: str) -> str:
     159          """Create a temporary directory and store its path in self.path"""
     160          # We realpath here because some systems have their default tmpdir
     161          # symlinked to another directory.  This tends to confuse build
     162          # scripts, so we canonicalize the path by traversing potential
     163          # symlinks here.
     164          path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
     165          logger.debug("Created temporary directory: %s", path)
     166          return path
     167  
     168      def cleanup(self) -> None:
     169          """Remove the temporary directory created and reset state"""
     170          self._deleted = True
     171          if not os.path.exists(self._path):
     172              return
     173          rmtree(self._path)
     174  
     175  
     176  class ESC[4;38;5;81mAdjacentTempDirectory(ESC[4;38;5;149mTempDirectory):
     177      """Helper class that creates a temporary directory adjacent to a real one.
     178  
     179      Attributes:
     180          original
     181              The original directory to create a temp directory for.
     182          path
     183              After calling create() or entering, contains the full
     184              path to the temporary directory.
     185          delete
     186              Whether the directory should be deleted when exiting
     187              (when used as a contextmanager)
     188  
     189      """
     190  
     191      # The characters that may be used to name the temp directory
     192      # We always prepend a ~ and then rotate through these until
     193      # a usable name is found.
     194      # pkg_resources raises a different error for .dist-info folder
     195      # with leading '-' and invalid metadata
     196      LEADING_CHARS = "-~.=%0123456789"
     197  
     198      def __init__(self, original: str, delete: Optional[bool] = None) -> None:
     199          self.original = original.rstrip("/\\")
     200          super().__init__(delete=delete)
     201  
     202      @classmethod
     203      def _generate_names(cls, name: str) -> Generator[str, None, None]:
     204          """Generates a series of temporary names.
     205  
     206          The algorithm replaces the leading characters in the name
     207          with ones that are valid filesystem characters, but are not
     208          valid package names (for both Python and pip definitions of
     209          package).
     210          """
     211          for i in range(1, len(name)):
     212              for candidate in itertools.combinations_with_replacement(
     213                  cls.LEADING_CHARS, i - 1
     214              ):
     215                  new_name = "~" + "".join(candidate) + name[i:]
     216                  if new_name != name:
     217                      yield new_name
     218  
     219          # If we make it this far, we will have to make a longer name
     220          for i in range(len(cls.LEADING_CHARS)):
     221              for candidate in itertools.combinations_with_replacement(
     222                  cls.LEADING_CHARS, i
     223              ):
     224                  new_name = "~" + "".join(candidate) + name
     225                  if new_name != name:
     226                      yield new_name
     227  
     228      def _create(self, kind: str) -> str:
     229          root, name = os.path.split(self.original)
     230          for candidate in self._generate_names(name):
     231              path = os.path.join(root, candidate)
     232              try:
     233                  os.mkdir(path)
     234              except OSError as ex:
     235                  # Continue if the name exists already
     236                  if ex.errno != errno.EEXIST:
     237                      raise
     238              else:
     239                  path = os.path.realpath(path)
     240                  break
     241          else:
     242              # Final fallback on the default behavior.
     243              path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
     244  
     245          logger.debug("Created temporary directory: %s", path)
     246          return path