1  from test.test_importlib import util
       2  
       3  abc = util.import_importlib('importlib.abc')
       4  init = util.import_importlib('importlib')
       5  machinery = util.import_importlib('importlib.machinery')
       6  importlib_util = util.import_importlib('importlib.util')
       7  
       8  import importlib.util
       9  import os
      10  import pathlib
      11  import re
      12  import string
      13  import sys
      14  from test import support
      15  import textwrap
      16  import types
      17  import unittest
      18  import unittest.mock
      19  import warnings
      20  
      21  try:
      22      import _testsinglephase
      23  except ImportError:
      24      _testsinglephase = None
      25  try:
      26      import _testmultiphase
      27  except ImportError:
      28      _testmultiphase = None
      29  try:
      30      import _xxsubinterpreters as _interpreters
      31  except ModuleNotFoundError:
      32      _interpreters = None
      33  
      34  
      35  class ESC[4;38;5;81mDecodeSourceBytesTests:
      36  
      37      source = "string ='ΓΌ'"
      38  
      39      def test_ut8_default(self):
      40          source_bytes = self.source.encode('utf-8')
      41          self.assertEqual(self.util.decode_source(source_bytes), self.source)
      42  
      43      def test_specified_encoding(self):
      44          source = '# coding=latin-1\n' + self.source
      45          source_bytes = source.encode('latin-1')
      46          assert source_bytes != source.encode('utf-8')
      47          self.assertEqual(self.util.decode_source(source_bytes), source)
      48  
      49      def test_universal_newlines(self):
      50          source = '\r\n'.join([self.source, self.source])
      51          source_bytes = source.encode('utf-8')
      52          self.assertEqual(self.util.decode_source(source_bytes),
      53                           '\n'.join([self.source, self.source]))
      54  
      55  
      56  (Frozen_DecodeSourceBytesTests,
      57   Source_DecodeSourceBytesTests
      58   ) = util.test_both(DecodeSourceBytesTests, util=importlib_util)
      59  
      60  
      61  class ESC[4;38;5;81mModuleFromSpecTests:
      62  
      63      def test_no_create_module(self):
      64          class ESC[4;38;5;81mLoader:
      65              def exec_module(self, module):
      66                  pass
      67          spec = self.machinery.ModuleSpec('test', Loader())
      68          with self.assertRaises(ImportError):
      69              module = self.util.module_from_spec(spec)
      70  
      71      def test_create_module_returns_None(self):
      72          class ESC[4;38;5;81mLoader(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149mabcESC[4;38;5;149m.ESC[4;38;5;149mLoader):
      73              def create_module(self, spec):
      74                  return None
      75          spec = self.machinery.ModuleSpec('test', Loader())
      76          module = self.util.module_from_spec(spec)
      77          self.assertIsInstance(module, types.ModuleType)
      78          self.assertEqual(module.__name__, spec.name)
      79  
      80      def test_create_module(self):
      81          name = 'already set'
      82          class ESC[4;38;5;81mCustomModule(ESC[4;38;5;149mtypesESC[4;38;5;149m.ESC[4;38;5;149mModuleType):
      83              pass
      84          class ESC[4;38;5;81mLoader(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149mabcESC[4;38;5;149m.ESC[4;38;5;149mLoader):
      85              def create_module(self, spec):
      86                  module = CustomModule(spec.name)
      87                  module.__name__ = name
      88                  return module
      89          spec = self.machinery.ModuleSpec('test', Loader())
      90          module = self.util.module_from_spec(spec)
      91          self.assertIsInstance(module, CustomModule)
      92          self.assertEqual(module.__name__, name)
      93  
      94      def test___name__(self):
      95          spec = self.machinery.ModuleSpec('test', object())
      96          module = self.util.module_from_spec(spec)
      97          self.assertEqual(module.__name__, spec.name)
      98  
      99      def test___spec__(self):
     100          spec = self.machinery.ModuleSpec('test', object())
     101          module = self.util.module_from_spec(spec)
     102          self.assertEqual(module.__spec__, spec)
     103  
     104      def test___loader__(self):
     105          loader = object()
     106          spec = self.machinery.ModuleSpec('test', loader)
     107          module = self.util.module_from_spec(spec)
     108          self.assertIs(module.__loader__, loader)
     109  
     110      def test___package__(self):
     111          spec = self.machinery.ModuleSpec('test.pkg', object())
     112          module = self.util.module_from_spec(spec)
     113          self.assertEqual(module.__package__, spec.parent)
     114  
     115      def test___path__(self):
     116          spec = self.machinery.ModuleSpec('test', object(), is_package=True)
     117          module = self.util.module_from_spec(spec)
     118          self.assertEqual(module.__path__, spec.submodule_search_locations)
     119  
     120      def test___file__(self):
     121          spec = self.machinery.ModuleSpec('test', object(), origin='some/path')
     122          spec.has_location = True
     123          module = self.util.module_from_spec(spec)
     124          self.assertEqual(module.__file__, spec.origin)
     125  
     126      def test___cached__(self):
     127          spec = self.machinery.ModuleSpec('test', object())
     128          spec.cached = 'some/path'
     129          spec.has_location = True
     130          module = self.util.module_from_spec(spec)
     131          self.assertEqual(module.__cached__, spec.cached)
     132  
     133  (Frozen_ModuleFromSpecTests,
     134   Source_ModuleFromSpecTests
     135  ) = util.test_both(ModuleFromSpecTests, abc=abc, machinery=machinery,
     136                     util=importlib_util)
     137  
     138  
     139  class ESC[4;38;5;81mResolveNameTests:
     140  
     141      """Tests importlib.util.resolve_name()."""
     142  
     143      def test_absolute(self):
     144          # bacon
     145          self.assertEqual('bacon', self.util.resolve_name('bacon', None))
     146  
     147      def test_absolute_within_package(self):
     148          # bacon in spam
     149          self.assertEqual('bacon', self.util.resolve_name('bacon', 'spam'))
     150  
     151      def test_no_package(self):
     152          # .bacon in ''
     153          with self.assertRaises(ImportError):
     154              self.util.resolve_name('.bacon', '')
     155  
     156      def test_in_package(self):
     157          # .bacon in spam
     158          self.assertEqual('spam.eggs.bacon',
     159                           self.util.resolve_name('.bacon', 'spam.eggs'))
     160  
     161      def test_other_package(self):
     162          # ..bacon in spam.bacon
     163          self.assertEqual('spam.bacon',
     164                           self.util.resolve_name('..bacon', 'spam.eggs'))
     165  
     166      def test_escape(self):
     167          # ..bacon in spam
     168          with self.assertRaises(ImportError):
     169              self.util.resolve_name('..bacon', 'spam')
     170  
     171  
     172  (Frozen_ResolveNameTests,
     173   Source_ResolveNameTests
     174   ) = util.test_both(ResolveNameTests, util=importlib_util)
     175  
     176  
     177  class ESC[4;38;5;81mFindSpecTests:
     178  
     179      class ESC[4;38;5;81mFakeMetaFinder:
     180          @staticmethod
     181          def find_spec(name, path=None, target=None): return name, path, target
     182  
     183      def test_sys_modules(self):
     184          name = 'some_mod'
     185          with util.uncache(name):
     186              module = types.ModuleType(name)
     187              loader = 'a loader!'
     188              spec = self.machinery.ModuleSpec(name, loader)
     189              module.__loader__ = loader
     190              module.__spec__ = spec
     191              sys.modules[name] = module
     192              found = self.util.find_spec(name)
     193              self.assertEqual(found, spec)
     194  
     195      def test_sys_modules_without___loader__(self):
     196          name = 'some_mod'
     197          with util.uncache(name):
     198              module = types.ModuleType(name)
     199              del module.__loader__
     200              loader = 'a loader!'
     201              spec = self.machinery.ModuleSpec(name, loader)
     202              module.__spec__ = spec
     203              sys.modules[name] = module
     204              found = self.util.find_spec(name)
     205              self.assertEqual(found, spec)
     206  
     207      def test_sys_modules_spec_is_None(self):
     208          name = 'some_mod'
     209          with util.uncache(name):
     210              module = types.ModuleType(name)
     211              module.__spec__ = None
     212              sys.modules[name] = module
     213              with self.assertRaises(ValueError):
     214                  self.util.find_spec(name)
     215  
     216      def test_sys_modules_loader_is_None(self):
     217          name = 'some_mod'
     218          with util.uncache(name):
     219              module = types.ModuleType(name)
     220              spec = self.machinery.ModuleSpec(name, None)
     221              module.__spec__ = spec
     222              sys.modules[name] = module
     223              found = self.util.find_spec(name)
     224              self.assertEqual(found, spec)
     225  
     226      def test_sys_modules_spec_is_not_set(self):
     227          name = 'some_mod'
     228          with util.uncache(name):
     229              module = types.ModuleType(name)
     230              try:
     231                  del module.__spec__
     232              except AttributeError:
     233                  pass
     234              sys.modules[name] = module
     235              with self.assertRaises(ValueError):
     236                  self.util.find_spec(name)
     237  
     238      def test_success(self):
     239          name = 'some_mod'
     240          with util.uncache(name):
     241              with util.import_state(meta_path=[self.FakeMetaFinder]):
     242                  self.assertEqual((name, None, None),
     243                                   self.util.find_spec(name))
     244  
     245      def test_nothing(self):
     246          # None is returned upon failure to find a loader.
     247          self.assertIsNone(self.util.find_spec('nevergoingtofindthismodule'))
     248  
     249      def test_find_submodule(self):
     250          name = 'spam'
     251          subname = 'ham'
     252          with util.temp_module(name, pkg=True) as pkg_dir:
     253              fullname, _ = util.submodule(name, subname, pkg_dir)
     254              spec = self.util.find_spec(fullname)
     255              self.assertIsNot(spec, None)
     256              self.assertIn(name, sorted(sys.modules))
     257              self.assertNotIn(fullname, sorted(sys.modules))
     258              # Ensure successive calls behave the same.
     259              spec_again = self.util.find_spec(fullname)
     260              self.assertEqual(spec_again, spec)
     261  
     262      def test_find_submodule_parent_already_imported(self):
     263          name = 'spam'
     264          subname = 'ham'
     265          with util.temp_module(name, pkg=True) as pkg_dir:
     266              self.init.import_module(name)
     267              fullname, _ = util.submodule(name, subname, pkg_dir)
     268              spec = self.util.find_spec(fullname)
     269              self.assertIsNot(spec, None)
     270              self.assertIn(name, sorted(sys.modules))
     271              self.assertNotIn(fullname, sorted(sys.modules))
     272              # Ensure successive calls behave the same.
     273              spec_again = self.util.find_spec(fullname)
     274              self.assertEqual(spec_again, spec)
     275  
     276      def test_find_relative_module(self):
     277          name = 'spam'
     278          subname = 'ham'
     279          with util.temp_module(name, pkg=True) as pkg_dir:
     280              fullname, _ = util.submodule(name, subname, pkg_dir)
     281              relname = '.' + subname
     282              spec = self.util.find_spec(relname, name)
     283              self.assertIsNot(spec, None)
     284              self.assertIn(name, sorted(sys.modules))
     285              self.assertNotIn(fullname, sorted(sys.modules))
     286              # Ensure successive calls behave the same.
     287              spec_again = self.util.find_spec(fullname)
     288              self.assertEqual(spec_again, spec)
     289  
     290      def test_find_relative_module_missing_package(self):
     291          name = 'spam'
     292          subname = 'ham'
     293          with util.temp_module(name, pkg=True) as pkg_dir:
     294              fullname, _ = util.submodule(name, subname, pkg_dir)
     295              relname = '.' + subname
     296              with self.assertRaises(ImportError):
     297                  self.util.find_spec(relname)
     298              self.assertNotIn(name, sorted(sys.modules))
     299              self.assertNotIn(fullname, sorted(sys.modules))
     300  
     301      def test_find_submodule_in_module(self):
     302          # ModuleNotFoundError raised when a module is specified as
     303          # a parent instead of a package.
     304          with self.assertRaises(ModuleNotFoundError):
     305              self.util.find_spec('module.name')
     306  
     307  
     308  (Frozen_FindSpecTests,
     309   Source_FindSpecTests
     310   ) = util.test_both(FindSpecTests, init=init, util=importlib_util,
     311                           machinery=machinery)
     312  
     313  
     314  class ESC[4;38;5;81mMagicNumberTests:
     315  
     316      def test_length(self):
     317          # Should be 4 bytes.
     318          self.assertEqual(len(self.util.MAGIC_NUMBER), 4)
     319  
     320      def test_incorporates_rn(self):
     321          # The magic number uses \r\n to come out wrong when splitting on lines.
     322          self.assertTrue(self.util.MAGIC_NUMBER.endswith(b'\r\n'))
     323  
     324  
     325  (Frozen_MagicNumberTests,
     326   Source_MagicNumberTests
     327   ) = util.test_both(MagicNumberTests, util=importlib_util)
     328  
     329  
     330  class ESC[4;38;5;81mPEP3147Tests:
     331  
     332      """Tests of PEP 3147-related functions: cache_from_source and source_from_cache."""
     333  
     334      tag = sys.implementation.cache_tag
     335  
     336      @unittest.skipIf(sys.implementation.cache_tag is None,
     337                       'requires sys.implementation.cache_tag not be None')
     338      def test_cache_from_source(self):
     339          # Given the path to a .py file, return the path to its PEP 3147
     340          # defined .pyc file (i.e. under __pycache__).
     341          path = os.path.join('foo', 'bar', 'baz', 'qux.py')
     342          expect = os.path.join('foo', 'bar', 'baz', '__pycache__',
     343                                'qux.{}.pyc'.format(self.tag))
     344          self.assertEqual(self.util.cache_from_source(path, optimization=''),
     345                           expect)
     346  
     347      def test_cache_from_source_no_cache_tag(self):
     348          # No cache tag means NotImplementedError.
     349          with support.swap_attr(sys.implementation, 'cache_tag', None):
     350              with self.assertRaises(NotImplementedError):
     351                  self.util.cache_from_source('whatever.py')
     352  
     353      def test_cache_from_source_no_dot(self):
     354          # Directory with a dot, filename without dot.
     355          path = os.path.join('foo.bar', 'file')
     356          expect = os.path.join('foo.bar', '__pycache__',
     357                                'file{}.pyc'.format(self.tag))
     358          self.assertEqual(self.util.cache_from_source(path, optimization=''),
     359                           expect)
     360  
     361      def test_cache_from_source_debug_override(self):
     362          # Given the path to a .py file, return the path to its PEP 3147/PEP 488
     363          # defined .pyc file (i.e. under __pycache__).
     364          path = os.path.join('foo', 'bar', 'baz', 'qux.py')
     365          with warnings.catch_warnings():
     366              warnings.simplefilter('ignore')
     367              self.assertEqual(self.util.cache_from_source(path, False),
     368                               self.util.cache_from_source(path, optimization=1))
     369              self.assertEqual(self.util.cache_from_source(path, True),
     370                               self.util.cache_from_source(path, optimization=''))
     371          with warnings.catch_warnings():
     372              warnings.simplefilter('error')
     373              with self.assertRaises(DeprecationWarning):
     374                  self.util.cache_from_source(path, False)
     375              with self.assertRaises(DeprecationWarning):
     376                  self.util.cache_from_source(path, True)
     377  
     378      def test_cache_from_source_cwd(self):
     379          path = 'foo.py'
     380          expect = os.path.join('__pycache__', 'foo.{}.pyc'.format(self.tag))
     381          self.assertEqual(self.util.cache_from_source(path, optimization=''),
     382                           expect)
     383  
     384      def test_cache_from_source_override(self):
     385          # When debug_override is not None, it can be any true-ish or false-ish
     386          # value.
     387          path = os.path.join('foo', 'bar', 'baz.py')
     388          # However if the bool-ishness can't be determined, the exception
     389          # propagates.
     390          class ESC[4;38;5;81mBearish:
     391              def __bool__(self): raise RuntimeError
     392          with warnings.catch_warnings():
     393              warnings.simplefilter('ignore')
     394              self.assertEqual(self.util.cache_from_source(path, []),
     395                               self.util.cache_from_source(path, optimization=1))
     396              self.assertEqual(self.util.cache_from_source(path, [17]),
     397                               self.util.cache_from_source(path, optimization=''))
     398              with self.assertRaises(RuntimeError):
     399                  self.util.cache_from_source('/foo/bar/baz.py', Bearish())
     400  
     401  
     402      def test_cache_from_source_optimization_empty_string(self):
     403          # Setting 'optimization' to '' leads to no optimization tag (PEP 488).
     404          path = 'foo.py'
     405          expect = os.path.join('__pycache__', 'foo.{}.pyc'.format(self.tag))
     406          self.assertEqual(self.util.cache_from_source(path, optimization=''),
     407                           expect)
     408  
     409      def test_cache_from_source_optimization_None(self):
     410          # Setting 'optimization' to None uses the interpreter's optimization.
     411          # (PEP 488)
     412          path = 'foo.py'
     413          optimization_level = sys.flags.optimize
     414          almost_expect = os.path.join('__pycache__', 'foo.{}'.format(self.tag))
     415          if optimization_level == 0:
     416              expect = almost_expect + '.pyc'
     417          elif optimization_level <= 2:
     418              expect = almost_expect + '.opt-{}.pyc'.format(optimization_level)
     419          else:
     420              msg = '{!r} is a non-standard optimization level'.format(optimization_level)
     421              self.skipTest(msg)
     422          self.assertEqual(self.util.cache_from_source(path, optimization=None),
     423                           expect)
     424  
     425      def test_cache_from_source_optimization_set(self):
     426          # The 'optimization' parameter accepts anything that has a string repr
     427          # that passes str.alnum().
     428          path = 'foo.py'
     429          valid_characters = string.ascii_letters + string.digits
     430          almost_expect = os.path.join('__pycache__', 'foo.{}'.format(self.tag))
     431          got = self.util.cache_from_source(path, optimization=valid_characters)
     432          # Test all valid characters are accepted.
     433          self.assertEqual(got,
     434                           almost_expect + '.opt-{}.pyc'.format(valid_characters))
     435          # str() should be called on argument.
     436          self.assertEqual(self.util.cache_from_source(path, optimization=42),
     437                           almost_expect + '.opt-42.pyc')
     438          # Invalid characters raise ValueError.
     439          with self.assertRaises(ValueError):
     440              self.util.cache_from_source(path, optimization='path/is/bad')
     441  
     442      def test_cache_from_source_debug_override_optimization_both_set(self):
     443          # Can only set one of the optimization-related parameters.
     444          with warnings.catch_warnings():
     445              warnings.simplefilter('ignore')
     446              with self.assertRaises(TypeError):
     447                  self.util.cache_from_source('foo.py', False, optimization='')
     448  
     449      @unittest.skipUnless(os.sep == '\\' and os.altsep == '/',
     450                       'test meaningful only where os.altsep is defined')
     451      def test_sep_altsep_and_sep_cache_from_source(self):
     452          # Windows path and PEP 3147 where sep is right of altsep.
     453          self.assertEqual(
     454              self.util.cache_from_source('\\foo\\bar\\baz/qux.py', optimization=''),
     455              '\\foo\\bar\\baz\\__pycache__\\qux.{}.pyc'.format(self.tag))
     456  
     457      @unittest.skipIf(sys.implementation.cache_tag is None,
     458                       'requires sys.implementation.cache_tag not be None')
     459      def test_cache_from_source_path_like_arg(self):
     460          path = pathlib.PurePath('foo', 'bar', 'baz', 'qux.py')
     461          expect = os.path.join('foo', 'bar', 'baz', '__pycache__',
     462                                'qux.{}.pyc'.format(self.tag))
     463          self.assertEqual(self.util.cache_from_source(path, optimization=''),
     464                           expect)
     465  
     466      @unittest.skipIf(sys.implementation.cache_tag is None,
     467                       'requires sys.implementation.cache_tag to not be None')
     468      def test_source_from_cache(self):
     469          # Given the path to a PEP 3147 defined .pyc file, return the path to
     470          # its source.  This tests the good path.
     471          path = os.path.join('foo', 'bar', 'baz', '__pycache__',
     472                              'qux.{}.pyc'.format(self.tag))
     473          expect = os.path.join('foo', 'bar', 'baz', 'qux.py')
     474          self.assertEqual(self.util.source_from_cache(path), expect)
     475  
     476      def test_source_from_cache_no_cache_tag(self):
     477          # If sys.implementation.cache_tag is None, raise NotImplementedError.
     478          path = os.path.join('blah', '__pycache__', 'whatever.pyc')
     479          with support.swap_attr(sys.implementation, 'cache_tag', None):
     480              with self.assertRaises(NotImplementedError):
     481                  self.util.source_from_cache(path)
     482  
     483      def test_source_from_cache_bad_path(self):
     484          # When the path to a pyc file is not in PEP 3147 format, a ValueError
     485          # is raised.
     486          self.assertRaises(
     487              ValueError, self.util.source_from_cache, '/foo/bar/bazqux.pyc')
     488  
     489      def test_source_from_cache_no_slash(self):
     490          # No slashes at all in path -> ValueError
     491          self.assertRaises(
     492              ValueError, self.util.source_from_cache, 'foo.cpython-32.pyc')
     493  
     494      def test_source_from_cache_too_few_dots(self):
     495          # Too few dots in final path component -> ValueError
     496          self.assertRaises(
     497              ValueError, self.util.source_from_cache, '__pycache__/foo.pyc')
     498  
     499      def test_source_from_cache_too_many_dots(self):
     500          with self.assertRaises(ValueError):
     501              self.util.source_from_cache(
     502                      '__pycache__/foo.cpython-32.opt-1.foo.pyc')
     503  
     504      def test_source_from_cache_not_opt(self):
     505          # Non-`opt-` path component -> ValueError
     506          self.assertRaises(
     507              ValueError, self.util.source_from_cache,
     508              '__pycache__/foo.cpython-32.foo.pyc')
     509  
     510      def test_source_from_cache_no__pycache__(self):
     511          # Another problem with the path -> ValueError
     512          self.assertRaises(
     513              ValueError, self.util.source_from_cache,
     514              '/foo/bar/foo.cpython-32.foo.pyc')
     515  
     516      def test_source_from_cache_optimized_bytecode(self):
     517          # Optimized bytecode is not an issue.
     518          path = os.path.join('__pycache__', 'foo.{}.opt-1.pyc'.format(self.tag))
     519          self.assertEqual(self.util.source_from_cache(path), 'foo.py')
     520  
     521      def test_source_from_cache_missing_optimization(self):
     522          # An empty optimization level is a no-no.
     523          path = os.path.join('__pycache__', 'foo.{}.opt-.pyc'.format(self.tag))
     524          with self.assertRaises(ValueError):
     525              self.util.source_from_cache(path)
     526  
     527      @unittest.skipIf(sys.implementation.cache_tag is None,
     528                       'requires sys.implementation.cache_tag to not be None')
     529      def test_source_from_cache_path_like_arg(self):
     530          path = pathlib.PurePath('foo', 'bar', 'baz', '__pycache__',
     531                                  'qux.{}.pyc'.format(self.tag))
     532          expect = os.path.join('foo', 'bar', 'baz', 'qux.py')
     533          self.assertEqual(self.util.source_from_cache(path), expect)
     534  
     535      @unittest.skipIf(sys.implementation.cache_tag is None,
     536                       'requires sys.implementation.cache_tag to not be None')
     537      def test_cache_from_source_respects_pycache_prefix(self):
     538          # If pycache_prefix is set, cache_from_source will return a bytecode
     539          # path inside that directory (in a subdirectory mirroring the .py file's
     540          # path) rather than in a __pycache__ dir next to the py file.
     541          pycache_prefixes = [
     542              os.path.join(os.path.sep, 'tmp', 'bytecode'),
     543              os.path.join(os.path.sep, 'tmp', '\u2603'),  # non-ASCII in path!
     544              os.path.join(os.path.sep, 'tmp', 'trailing-slash') + os.path.sep,
     545          ]
     546          drive = ''
     547          if os.name == 'nt':
     548              drive = 'C:'
     549              pycache_prefixes = [
     550                  f'{drive}{prefix}' for prefix in pycache_prefixes]
     551              pycache_prefixes += [r'\\?\C:\foo', r'\\localhost\c$\bar']
     552          for pycache_prefix in pycache_prefixes:
     553              with self.subTest(path=pycache_prefix):
     554                  path = drive + os.path.join(
     555                      os.path.sep, 'foo', 'bar', 'baz', 'qux.py')
     556                  expect = os.path.join(
     557                      pycache_prefix, 'foo', 'bar', 'baz',
     558                      'qux.{}.pyc'.format(self.tag))
     559                  with util.temporary_pycache_prefix(pycache_prefix):
     560                      self.assertEqual(
     561                          self.util.cache_from_source(path, optimization=''),
     562                          expect)
     563  
     564      @unittest.skipIf(sys.implementation.cache_tag is None,
     565                       'requires sys.implementation.cache_tag to not be None')
     566      def test_cache_from_source_respects_pycache_prefix_relative(self):
     567          # If the .py path we are given is relative, we will resolve to an
     568          # absolute path before prefixing with pycache_prefix, to avoid any
     569          # possible ambiguity.
     570          pycache_prefix = os.path.join(os.path.sep, 'tmp', 'bytecode')
     571          path = os.path.join('foo', 'bar', 'baz', 'qux.py')
     572          root = os.path.splitdrive(os.getcwd())[0] + os.path.sep
     573          expect = os.path.join(
     574              pycache_prefix,
     575              os.path.relpath(os.getcwd(), root),
     576              'foo', 'bar', 'baz', f'qux.{self.tag}.pyc')
     577          with util.temporary_pycache_prefix(pycache_prefix):
     578              self.assertEqual(
     579                  self.util.cache_from_source(path, optimization=''),
     580                  expect)
     581  
     582      @unittest.skipIf(sys.implementation.cache_tag is None,
     583                       'requires sys.implementation.cache_tag to not be None')
     584      def test_source_from_cache_inside_pycache_prefix(self):
     585          # If pycache_prefix is set and the cache path we get is inside it,
     586          # we return an absolute path to the py file based on the remainder of
     587          # the path within pycache_prefix.
     588          pycache_prefix = os.path.join(os.path.sep, 'tmp', 'bytecode')
     589          path = os.path.join(pycache_prefix, 'foo', 'bar', 'baz',
     590                              f'qux.{self.tag}.pyc')
     591          expect = os.path.join(os.path.sep, 'foo', 'bar', 'baz', 'qux.py')
     592          with util.temporary_pycache_prefix(pycache_prefix):
     593              self.assertEqual(self.util.source_from_cache(path), expect)
     594  
     595      @unittest.skipIf(sys.implementation.cache_tag is None,
     596                       'requires sys.implementation.cache_tag to not be None')
     597      def test_source_from_cache_outside_pycache_prefix(self):
     598          # If pycache_prefix is set but the cache path we get is not inside
     599          # it, just ignore it and handle the cache path according to the default
     600          # behavior.
     601          pycache_prefix = os.path.join(os.path.sep, 'tmp', 'bytecode')
     602          path = os.path.join('foo', 'bar', 'baz', '__pycache__',
     603                              f'qux.{self.tag}.pyc')
     604          expect = os.path.join('foo', 'bar', 'baz', 'qux.py')
     605          with util.temporary_pycache_prefix(pycache_prefix):
     606              self.assertEqual(self.util.source_from_cache(path), expect)
     607  
     608  
     609  (Frozen_PEP3147Tests,
     610   Source_PEP3147Tests
     611   ) = util.test_both(PEP3147Tests, util=importlib_util)
     612  
     613  
     614  class ESC[4;38;5;81mMagicNumberTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     615      """
     616      Test release compatibility issues relating to importlib
     617      """
     618      @unittest.skipUnless(
     619          sys.version_info.releaselevel in ('candidate', 'final'),
     620          'only applies to candidate or final python release levels'
     621      )
     622      def test_magic_number(self):
     623          # Each python minor release should generally have a MAGIC_NUMBER
     624          # that does not change once the release reaches candidate status.
     625  
     626          # Once a release reaches candidate status, the value of the constant
     627          # EXPECTED_MAGIC_NUMBER in this test should be changed.
     628          # This test will then check that the actual MAGIC_NUMBER matches
     629          # the expected value for the release.
     630  
     631          # In exceptional cases, it may be required to change the MAGIC_NUMBER
     632          # for a maintenance release. In this case the change should be
     633          # discussed in python-dev. If a change is required, community
     634          # stakeholders such as OS package maintainers must be notified
     635          # in advance. Such exceptional releases will then require an
     636          # adjustment to this test case.
     637          EXPECTED_MAGIC_NUMBER = 3531
     638          actual = int.from_bytes(importlib.util.MAGIC_NUMBER[:2], 'little')
     639  
     640          msg = (
     641              "To avoid breaking backwards compatibility with cached bytecode "
     642              "files that can't be automatically regenerated by the current "
     643              "user, candidate and final releases require the current  "
     644              "importlib.util.MAGIC_NUMBER to match the expected "
     645              "magic number in this test. Set the expected "
     646              "magic number in this test to the current MAGIC_NUMBER to "
     647              "continue with the release.\n\n"
     648              "Changing the MAGIC_NUMBER for a maintenance release "
     649              "requires discussion in python-dev and notification of "
     650              "community stakeholders."
     651          )
     652          self.assertEqual(EXPECTED_MAGIC_NUMBER, actual, msg)
     653  
     654  
     655  @unittest.skipIf(_interpreters is None, 'subinterpreters required')
     656  class ESC[4;38;5;81mIncompatibleExtensionModuleRestrictionsTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     657  
     658      ERROR = re.compile("^<class 'ImportError'>: module (.*) does not support loading in subinterpreters")
     659  
     660      def run_with_own_gil(self, script):
     661          interpid = _interpreters.create(isolated=True)
     662          try:
     663              _interpreters.run_string(interpid, script)
     664          except _interpreters.RunFailedError as exc:
     665              if m := self.ERROR.match(str(exc)):
     666                  modname, = m.groups()
     667                  raise ImportError(modname)
     668  
     669      def run_with_shared_gil(self, script):
     670          interpid = _interpreters.create(isolated=False)
     671          try:
     672              _interpreters.run_string(interpid, script)
     673          except _interpreters.RunFailedError as exc:
     674              if m := self.ERROR.match(str(exc)):
     675                  modname, = m.groups()
     676                  raise ImportError(modname)
     677  
     678      @unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module")
     679      def test_single_phase_init_module(self):
     680          script = textwrap.dedent('''
     681              from importlib.util import _incompatible_extension_module_restrictions
     682              with _incompatible_extension_module_restrictions(disable_check=True):
     683                  import _testsinglephase
     684              ''')
     685          with self.subTest('check disabled, shared GIL'):
     686              self.run_with_shared_gil(script)
     687          with self.subTest('check disabled, per-interpreter GIL'):
     688              self.run_with_own_gil(script)
     689  
     690          script = textwrap.dedent(f'''
     691              from importlib.util import _incompatible_extension_module_restrictions
     692              with _incompatible_extension_module_restrictions(disable_check=False):
     693                  import _testsinglephase
     694              ''')
     695          with self.subTest('check enabled, shared GIL'):
     696              with self.assertRaises(ImportError):
     697                  self.run_with_shared_gil(script)
     698          with self.subTest('check enabled, per-interpreter GIL'):
     699              with self.assertRaises(ImportError):
     700                  self.run_with_own_gil(script)
     701  
     702      @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module")
     703      def test_incomplete_multi_phase_init_module(self):
     704          prescript = textwrap.dedent(f'''
     705              from importlib.util import spec_from_loader, module_from_spec
     706              from importlib.machinery import ExtensionFileLoader
     707  
     708              name = '_test_shared_gil_only'
     709              filename = {_testmultiphase.__file__!r}
     710              loader = ExtensionFileLoader(name, filename)
     711              spec = spec_from_loader(name, loader)
     712  
     713              ''')
     714  
     715          script = prescript + textwrap.dedent('''
     716              from importlib.util import _incompatible_extension_module_restrictions
     717              with _incompatible_extension_module_restrictions(disable_check=True):
     718                  module = module_from_spec(spec)
     719                  loader.exec_module(module)
     720              ''')
     721          with self.subTest('check disabled, shared GIL'):
     722              self.run_with_shared_gil(script)
     723          with self.subTest('check disabled, per-interpreter GIL'):
     724              self.run_with_own_gil(script)
     725  
     726          script = prescript + textwrap.dedent('''
     727              from importlib.util import _incompatible_extension_module_restrictions
     728              with _incompatible_extension_module_restrictions(disable_check=False):
     729                  module = module_from_spec(spec)
     730                  loader.exec_module(module)
     731              ''')
     732          with self.subTest('check enabled, shared GIL'):
     733              self.run_with_shared_gil(script)
     734          with self.subTest('check enabled, per-interpreter GIL'):
     735              with self.assertRaises(ImportError):
     736                  self.run_with_own_gil(script)
     737  
     738      @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module")
     739      def test_complete_multi_phase_init_module(self):
     740          script = textwrap.dedent('''
     741              from importlib.util import _incompatible_extension_module_restrictions
     742              with _incompatible_extension_module_restrictions(disable_check=True):
     743                  import _testmultiphase
     744              ''')
     745          with self.subTest('check disabled, shared GIL'):
     746              self.run_with_shared_gil(script)
     747          with self.subTest('check disabled, per-interpreter GIL'):
     748              self.run_with_own_gil(script)
     749  
     750          script = textwrap.dedent(f'''
     751              from importlib.util import _incompatible_extension_module_restrictions
     752              with _incompatible_extension_module_restrictions(disable_check=False):
     753                  import _testmultiphase
     754              ''')
     755          with self.subTest('check enabled, shared GIL'):
     756              self.run_with_shared_gil(script)
     757          with self.subTest('check enabled, per-interpreter GIL'):
     758              self.run_with_own_gil(script)
     759  
     760  
# Run this file's tests when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()