(root)/
Python-3.12.0/
Lib/
test/
test_importlib/
util.py
       1  import builtins
       2  import contextlib
       3  import errno
       4  import functools
       5  from importlib import machinery, util, invalidate_caches
       6  import marshal
       7  import os
       8  import os.path
       9  from test.support import import_helper
      10  from test.support import os_helper
      11  import unittest
      12  import sys
      13  import tempfile
      14  import types
      15  
      16  
      17  BUILTINS = types.SimpleNamespace()
      18  BUILTINS.good_name = None
      19  BUILTINS.bad_name = None
      20  if 'errno' in sys.builtin_module_names:
      21      BUILTINS.good_name = 'errno'
      22  if 'importlib' not in sys.builtin_module_names:
      23      BUILTINS.bad_name = 'importlib'
      24  
      25  EXTENSIONS = types.SimpleNamespace()
      26  EXTENSIONS.path = None
      27  EXTENSIONS.ext = None
      28  EXTENSIONS.filename = None
      29  EXTENSIONS.file_path = None
      30  EXTENSIONS.name = '_testsinglephase'
      31  
      32  def _extension_details():
      33      global EXTENSIONS
      34      for path in sys.path:
      35          for ext in machinery.EXTENSION_SUFFIXES:
      36              filename = EXTENSIONS.name + ext
      37              file_path = os.path.join(path, filename)
      38              if os.path.exists(file_path):
      39                  EXTENSIONS.path = path
      40                  EXTENSIONS.ext = ext
      41                  EXTENSIONS.filename = filename
      42                  EXTENSIONS.file_path = file_path
      43                  return
      44  
      45  _extension_details()
      46  
      47  
      48  def import_importlib(module_name):
      49      """Import a module from importlib both w/ and w/o _frozen_importlib."""
      50      fresh = ('importlib',) if '.' in module_name else ()
      51      frozen = import_helper.import_fresh_module(module_name)
      52      source = import_helper.import_fresh_module(module_name, fresh=fresh,
      53                                           blocked=('_frozen_importlib', '_frozen_importlib_external'))
      54      return {'Frozen': frozen, 'Source': source}
      55  
      56  
      57  def specialize_class(cls, kind, base=None, **kwargs):
      58      # XXX Support passing in submodule names--load (and cache) them?
      59      # That would clean up the test modules a bit more.
      60      if base is None:
      61          base = unittest.TestCase
      62      elif not isinstance(base, type):
      63          base = base[kind]
      64      name = '{}_{}'.format(kind, cls.__name__)
      65      bases = (cls, base)
      66      specialized = types.new_class(name, bases)
      67      specialized.__module__ = cls.__module__
      68      specialized._NAME = cls.__name__
      69      specialized._KIND = kind
      70      for attr, values in kwargs.items():
      71          value = values[kind]
      72          setattr(specialized, attr, value)
      73      return specialized
      74  
      75  
      76  def split_frozen(cls, base=None, **kwargs):
      77      frozen = specialize_class(cls, 'Frozen', base, **kwargs)
      78      source = specialize_class(cls, 'Source', base, **kwargs)
      79      return frozen, source
      80  
      81  
      82  def test_both(test_class, base=None, **kwargs):
      83      return split_frozen(test_class, base, **kwargs)
      84  
      85  
      86  CASE_INSENSITIVE_FS = True
      87  # Windows is the only OS that is *always* case-insensitive
      88  # (OS X *can* be case-sensitive).
      89  if sys.platform not in ('win32', 'cygwin'):
      90      changed_name = __file__.upper()
      91      if changed_name == __file__:
      92          changed_name = __file__.lower()
      93      if not os.path.exists(changed_name):
      94          CASE_INSENSITIVE_FS = False
      95  
      96  source_importlib = import_importlib('importlib')['Source']
      97  __import__ = {'Frozen': staticmethod(builtins.__import__),
      98                'Source': staticmethod(source_importlib.__import__)}
      99  
     100  
     101  def case_insensitive_tests(test):
     102      """Class decorator that nullifies tests requiring a case-insensitive
     103      file system."""
     104      return unittest.skipIf(not CASE_INSENSITIVE_FS,
     105                              "requires a case-insensitive filesystem")(test)
     106  
     107  
     108  def submodule(parent, name, pkg_dir, content=''):
     109      path = os.path.join(pkg_dir, name + '.py')
     110      with open(path, 'w', encoding='utf-8') as subfile:
     111          subfile.write(content)
     112      return '{}.{}'.format(parent, name), path
     113  
     114  
     115  def get_code_from_pyc(pyc_path):
     116      """Reads a pyc file and returns the unmarshalled code object within.
     117  
     118      No header validation is performed.
     119      """
     120      with open(pyc_path, 'rb') as pyc_f:
     121          pyc_f.seek(16)
     122          return marshal.load(pyc_f)
     123  
     124  
     125  @contextlib.contextmanager
     126  def uncache(*names):
     127      """Uncache a module from sys.modules.
     128  
     129      A basic sanity check is performed to prevent uncaching modules that either
     130      cannot/shouldn't be uncached.
     131  
     132      """
     133      for name in names:
     134          if name in ('sys', 'marshal'):
     135              raise ValueError("cannot uncache {}".format(name))
     136          try:
     137              del sys.modules[name]
     138          except KeyError:
     139              pass
     140      try:
     141          yield
     142      finally:
     143          for name in names:
     144              try:
     145                  del sys.modules[name]
     146              except KeyError:
     147                  pass
     148  
     149  
     150  @contextlib.contextmanager
     151  def temp_module(name, content='', *, pkg=False):
     152      conflicts = [n for n in sys.modules if n.partition('.')[0] == name]
     153      with os_helper.temp_cwd(None) as cwd:
     154          with uncache(name, *conflicts):
     155              with import_helper.DirsOnSysPath(cwd):
     156                  invalidate_caches()
     157  
     158                  location = os.path.join(cwd, name)
     159                  if pkg:
     160                      modpath = os.path.join(location, '__init__.py')
     161                      os.mkdir(name)
     162                  else:
     163                      modpath = location + '.py'
     164                      if content is None:
     165                          # Make sure the module file gets created.
     166                          content = ''
     167                  if content is not None:
     168                      # not a namespace package
     169                      with open(modpath, 'w', encoding='utf-8') as modfile:
     170                          modfile.write(content)
     171                  yield location
     172  
     173  
     174  @contextlib.contextmanager
     175  def import_state(**kwargs):
     176      """Context manager to manage the various importers and stored state in the
     177      sys module.
     178  
     179      The 'modules' attribute is not supported as the interpreter state stores a
     180      pointer to the dict that the interpreter uses internally;
     181      reassigning to sys.modules does not have the desired effect.
     182  
     183      """
     184      originals = {}
     185      try:
     186          for attr, default in (('meta_path', []), ('path', []),
     187                                ('path_hooks', []),
     188                                ('path_importer_cache', {})):
     189              originals[attr] = getattr(sys, attr)
     190              if attr in kwargs:
     191                  new_value = kwargs[attr]
     192                  del kwargs[attr]
     193              else:
     194                  new_value = default
     195              setattr(sys, attr, new_value)
     196          if len(kwargs):
     197              raise ValueError('unrecognized arguments: {}'.format(kwargs))
     198          yield
     199      finally:
     200          for attr, value in originals.items():
     201              setattr(sys, attr, value)
     202  
     203  
     204  class ESC[4;38;5;81m_ImporterMock:
     205  
     206      """Base class to help with creating importer mocks."""
     207  
     208      def __init__(self, *names, module_code={}):
     209          self.modules = {}
     210          self.module_code = {}
     211          for name in names:
     212              if not name.endswith('.__init__'):
     213                  import_name = name
     214              else:
     215                  import_name = name[:-len('.__init__')]
     216              if '.' not in name:
     217                  package = None
     218              elif import_name == name:
     219                  package = name.rsplit('.', 1)[0]
     220              else:
     221                  package = import_name
     222              module = types.ModuleType(import_name)
     223              module.__loader__ = self
     224              module.__file__ = '<mock __file__>'
     225              module.__package__ = package
     226              module.attr = name
     227              if import_name != name:
     228                  module.__path__ = ['<mock __path__>']
     229              self.modules[import_name] = module
     230              if import_name in module_code:
     231                  self.module_code[import_name] = module_code[import_name]
     232  
     233      def __getitem__(self, name):
     234          return self.modules[name]
     235  
     236      def __enter__(self):
     237          self._uncache = uncache(*self.modules.keys())
     238          self._uncache.__enter__()
     239          return self
     240  
     241      def __exit__(self, *exc_info):
     242          self._uncache.__exit__(None, None, None)
     243  
     244  
     245  class ESC[4;38;5;81mmock_spec(ESC[4;38;5;149m_ImporterMock):
     246  
     247      """Importer mock using PEP 451 APIs."""
     248  
     249      def find_spec(self, fullname, path=None, parent=None):
     250          try:
     251              module = self.modules[fullname]
     252          except KeyError:
     253              return None
     254          spec = util.spec_from_file_location(
     255                  fullname, module.__file__, loader=self,
     256                  submodule_search_locations=getattr(module, '__path__', None))
     257          return spec
     258  
     259      def create_module(self, spec):
     260          if spec.name not in self.modules:
     261              raise ImportError
     262          return self.modules[spec.name]
     263  
     264      def exec_module(self, module):
     265          try:
     266              self.module_code[module.__spec__.name]()
     267          except KeyError:
     268              pass
     269  
     270  
     271  def writes_bytecode_files(fxn):
     272      """Decorator to protect sys.dont_write_bytecode from mutation and to skip
     273      tests that require it to be set to False."""
     274      if sys.dont_write_bytecode:
     275          return unittest.skip("relies on writing bytecode")(fxn)
     276      @functools.wraps(fxn)
     277      def wrapper(*args, **kwargs):
     278          original = sys.dont_write_bytecode
     279          sys.dont_write_bytecode = False
     280          try:
     281              to_return = fxn(*args, **kwargs)
     282          finally:
     283              sys.dont_write_bytecode = original
     284          return to_return
     285      return wrapper
     286  
     287  
     288  def ensure_bytecode_path(bytecode_path):
     289      """Ensure that the __pycache__ directory for PEP 3147 pyc file exists.
     290  
     291      :param bytecode_path: File system path to PEP 3147 pyc file.
     292      """
     293      try:
     294          os.mkdir(os.path.dirname(bytecode_path))
     295      except OSError as error:
     296          if error.errno != errno.EEXIST:
     297              raise
     298  
     299  
     300  @contextlib.contextmanager
     301  def temporary_pycache_prefix(prefix):
     302      """Adjust and restore sys.pycache_prefix."""
     303      _orig_prefix = sys.pycache_prefix
     304      sys.pycache_prefix = prefix
     305      try:
     306          yield
     307      finally:
     308          sys.pycache_prefix = _orig_prefix
     309  
     310  
     311  @contextlib.contextmanager
     312  def create_modules(*names):
     313      """Temporarily create each named module with an attribute (named 'attr')
     314      that contains the name passed into the context manager that caused the
     315      creation of the module.
     316  
     317      All files are created in a temporary directory returned by
     318      tempfile.mkdtemp(). This directory is inserted at the beginning of
     319      sys.path. When the context manager exits all created files (source and
     320      bytecode) are explicitly deleted.
     321  
     322      No magic is performed when creating packages! This means that if you create
     323      a module within a package you must also create the package's __init__ as
     324      well.
     325  
     326      """
     327      source = 'attr = {0!r}'
     328      created_paths = []
     329      mapping = {}
     330      state_manager = None
     331      uncache_manager = None
     332      try:
     333          temp_dir = tempfile.mkdtemp()
     334          mapping['.root'] = temp_dir
     335          import_names = set()
     336          for name in names:
     337              if not name.endswith('__init__'):
     338                  import_name = name
     339              else:
     340                  import_name = name[:-len('.__init__')]
     341              import_names.add(import_name)
     342              if import_name in sys.modules:
     343                  del sys.modules[import_name]
     344              name_parts = name.split('.')
     345              file_path = temp_dir
     346              for directory in name_parts[:-1]:
     347                  file_path = os.path.join(file_path, directory)
     348                  if not os.path.exists(file_path):
     349                      os.mkdir(file_path)
     350                      created_paths.append(file_path)
     351              file_path = os.path.join(file_path, name_parts[-1] + '.py')
     352              with open(file_path, 'w', encoding='utf-8') as file:
     353                  file.write(source.format(name))
     354              created_paths.append(file_path)
     355              mapping[name] = file_path
     356          uncache_manager = uncache(*import_names)
     357          uncache_manager.__enter__()
     358          state_manager = import_state(path=[temp_dir])
     359          state_manager.__enter__()
     360          yield mapping
     361      finally:
     362          if state_manager is not None:
     363              state_manager.__exit__(None, None, None)
     364          if uncache_manager is not None:
     365              uncache_manager.__exit__(None, None, None)
     366          os_helper.rmtree(temp_dir)
     367  
     368  
     369  def mock_path_hook(*entries, importer):
     370      """A mock sys.path_hooks entry."""
     371      def hook(entry):
     372          if entry not in entries:
     373              raise ImportError
     374          return importer
     375      return hook
     376  
     377  
     378  class ESC[4;38;5;81mCASEOKTestBase:
     379  
     380      def caseok_env_changed(self, *, should_exist):
     381          possibilities = b'PYTHONCASEOK', 'PYTHONCASEOK'
     382          if any(x in self.importlib._bootstrap_external._os.environ
     383                      for x in possibilities) != should_exist:
     384              self.skipTest('os.environ changes not reflected in _os.environ')