(root)/
Python-3.11.7/
Lib/
test/
test_importlib/
test_threaded_import.py
       1  # This is a variant of the very old (early 90's) file
       2  # Demo/threads/bug.py.  It simply provokes a number of threads into
       3  # trying to import the same module "at the same time".
       4  # There are no pleasant failure modes -- most likely is that Python
       5  # complains several times about module random having no attribute
       6  # randrange, and then Python hangs.
       7  
       8  import _imp as imp
       9  import os
      10  import importlib
      11  import sys
      12  import time
      13  import shutil
      14  import threading
      15  import unittest
      16  from unittest import mock
      17  from test.support import verbose
      18  from test.support.import_helper import forget
      19  from test.support.os_helper import (TESTFN, unlink, rmtree)
      20  from test.support import script_helper, threading_helper
      21  
# NOTE(review): presumably skips this whole module on platforms where
# threads cannot be started -- confirm against test.support.threading_helper.
threading_helper.requires_working_threading(module=True)
      23  
      24  def task(N, done, done_tasks, errors):
      25      try:
      26          # We don't use modulefinder but still import it in order to stress
      27          # importing of different modules from several threads.
      28          if len(done_tasks) % 2:
      29              import modulefinder
      30              import random
      31          else:
      32              import random
      33              import modulefinder
      34          # This will fail if random is not completely initialized
      35          x = random.randrange(1, 3)
      36      except Exception as e:
      37          errors.append(e.with_traceback(None))
      38      finally:
      39          done_tasks.append(threading.get_ident())
      40          finished = len(done_tasks) == N
      41          if finished:
      42              done.set()
      43  
      44  def mock_register_at_fork(func):
      45      # bpo-30599: Mock os.register_at_fork() when importing the random module,
      46      # since this function doesn't allow to unregister callbacks and would leak
      47      # memory.
      48      return mock.patch('os.register_at_fork', create=True)(func)
      49  
      50  # Create a circular import structure: A -> C -> B -> D -> A
      51  # NOTE: `time` is already loaded and therefore doesn't threaten to deadlock.
      52  
# Source text for each module of the cycle, keyed by module name.
# test_circular_imports() %-formats every entry with a 'delay' value before
# writing it to <name>.py; only A and B contain the %(delay)s placeholder.
# The leading "if 1:" lets the body of the triple-quoted strings stay
# indented.
circular_imports_modules = {
    'A': """if 1:
        import time
        time.sleep(%(delay)s)
        x = 'a'
        import C
        """,
    'B': """if 1:
        import time
        time.sleep(%(delay)s)
        x = 'b'
        import D
        """,
    'C': """import B""",
    'D': """import A""",
}
      69  
      70  class ESC[4;38;5;81mFinder:
      71      """A dummy finder to detect concurrent access to its find_spec()
      72      method."""
      73  
      74      def __init__(self):
      75          self.numcalls = 0
      76          self.x = 0
      77          self.lock = threading.Lock()
      78  
      79      def find_spec(self, name, path=None, target=None):
      80          # Simulate some thread-unsafe behaviour. If calls to find_spec()
      81          # are properly serialized, `x` will end up the same as `numcalls`.
      82          # Otherwise not.
      83          assert imp.lock_held()
      84          with self.lock:
      85              self.numcalls += 1
      86          x = self.x
      87          time.sleep(0.01)
      88          self.x = x + 1
      89  
      90  class ESC[4;38;5;81mFlushingFinder:
      91      """A dummy finder which flushes sys.path_importer_cache when it gets
      92      called."""
      93  
      94      def find_spec(self, name, path=None, target=None):
      95          sys.path_importer_cache.clear()
      96  
      97  
      98  class ESC[4;38;5;81mThreadedImportTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      99  
     100      def setUp(self):
     101          self.old_random = sys.modules.pop('random', None)
     102  
     103      def tearDown(self):
     104          # If the `random` module was already initialized, we restore the
     105          # old module at the end so that pickling tests don't fail.
     106          # See http://bugs.python.org/issue3657#msg110461
     107          if self.old_random is not None:
     108              sys.modules['random'] = self.old_random
     109  
     110      @mock_register_at_fork
     111      def check_parallel_module_init(self, mock_os):
     112          if imp.lock_held():
     113              # This triggers on, e.g., from test import autotest.
     114              raise unittest.SkipTest("can't run when import lock is held")
     115  
     116          done = threading.Event()
     117          for N in (20, 50) * 3:
     118              if verbose:
     119                  print("Trying", N, "threads ...", end=' ')
     120              # Make sure that random and modulefinder get reimported freshly
     121              for modname in ['random', 'modulefinder']:
     122                  try:
     123                      del sys.modules[modname]
     124                  except KeyError:
     125                      pass
     126              errors = []
     127              done_tasks = []
     128              done.clear()
     129              t0 = time.monotonic()
     130              with threading_helper.start_threads(
     131                      threading.Thread(target=task, args=(N, done, done_tasks, errors,))
     132                      for i in range(N)):
     133                  pass
     134              completed = done.wait(10 * 60)
     135              dt = time.monotonic() - t0
     136              if verbose:
     137                  print("%.1f ms" % (dt*1e3), flush=True, end=" ")
     138              dbg_info = 'done: %s/%s' % (len(done_tasks), N)
     139              self.assertFalse(errors, dbg_info)
     140              self.assertTrue(completed, dbg_info)
     141              if verbose:
     142                  print("OK.")
     143  
     144      def test_parallel_module_init(self):
     145          self.check_parallel_module_init()
     146  
     147      def test_parallel_meta_path(self):
     148          finder = Finder()
     149          sys.meta_path.insert(0, finder)
     150          try:
     151              self.check_parallel_module_init()
     152              self.assertGreater(finder.numcalls, 0)
     153              self.assertEqual(finder.x, finder.numcalls)
     154          finally:
     155              sys.meta_path.remove(finder)
     156  
     157      def test_parallel_path_hooks(self):
     158          # Here the Finder instance is only used to check concurrent calls
     159          # to path_hook().
     160          finder = Finder()
     161          # In order for our path hook to be called at each import, we need
     162          # to flush the path_importer_cache, which we do by registering a
     163          # dedicated meta_path entry.
     164          flushing_finder = FlushingFinder()
     165          def path_hook(path):
     166              finder.find_spec('')
     167              raise ImportError
     168          sys.path_hooks.insert(0, path_hook)
     169          sys.meta_path.append(flushing_finder)
     170          try:
     171              # Flush the cache a first time
     172              flushing_finder.find_spec('')
     173              numtests = self.check_parallel_module_init()
     174              self.assertGreater(finder.numcalls, 0)
     175              self.assertEqual(finder.x, finder.numcalls)
     176          finally:
     177              sys.meta_path.remove(flushing_finder)
     178              sys.path_hooks.remove(path_hook)
     179  
     180      def test_import_hangers(self):
     181          # In case this test is run again, make sure the helper module
     182          # gets loaded from scratch again.
     183          try:
     184              del sys.modules['test.test_importlib.threaded_import_hangers']
     185          except KeyError:
     186              pass
     187          import test.test_importlib.threaded_import_hangers
     188          self.assertFalse(test.test_importlib.threaded_import_hangers.errors)
     189  
     190      def test_circular_imports(self):
     191          # The goal of this test is to exercise implementations of the import
     192          # lock which use a per-module lock, rather than a global lock.
     193          # In these implementations, there is a possible deadlock with
     194          # circular imports, for example:
     195          # - thread 1 imports A (grabbing the lock for A) which imports B
     196          # - thread 2 imports B (grabbing the lock for B) which imports A
     197          # Such implementations should be able to detect such situations and
     198          # resolve them one way or the other, without freezing.
     199          # NOTE: our test constructs a slightly less trivial import cycle,
     200          # in order to better stress the deadlock avoidance mechanism.
     201          delay = 0.5
     202          os.mkdir(TESTFN)
     203          self.addCleanup(shutil.rmtree, TESTFN)
     204          sys.path.insert(0, TESTFN)
     205          self.addCleanup(sys.path.remove, TESTFN)
     206          for name, contents in circular_imports_modules.items():
     207              contents = contents % {'delay': delay}
     208              with open(os.path.join(TESTFN, name + ".py"), "wb") as f:
     209                  f.write(contents.encode('utf-8'))
     210              self.addCleanup(forget, name)
     211  
     212          importlib.invalidate_caches()
     213          results = []
     214          def import_ab():
     215              import A
     216              results.append(getattr(A, 'x', None))
     217          def import_ba():
     218              import B
     219              results.append(getattr(B, 'x', None))
     220          t1 = threading.Thread(target=import_ab)
     221          t2 = threading.Thread(target=import_ba)
     222          t1.start()
     223          t2.start()
     224          t1.join()
     225          t2.join()
     226          self.assertEqual(set(results), {'a', 'b'})
     227  
     228      @mock_register_at_fork
     229      def test_side_effect_import(self, mock_os):
     230          code = """if 1:
     231              import threading
     232              def target():
     233                  import random
     234              t = threading.Thread(target=target)
     235              t.start()
     236              t.join()
     237              t = None"""
     238          sys.path.insert(0, os.curdir)
     239          self.addCleanup(sys.path.remove, os.curdir)
     240          filename = TESTFN + ".py"
     241          with open(filename, "wb") as f:
     242              f.write(code.encode('utf-8'))
     243          self.addCleanup(unlink, filename)
     244          self.addCleanup(forget, TESTFN)
     245          self.addCleanup(rmtree, '__pycache__')
     246          importlib.invalidate_caches()
     247          with threading_helper.wait_threads_exit():
     248              __import__(TESTFN)
     249          del sys.modules[TESTFN]
     250  
     251      def test_concurrent_futures_circular_import(self):
     252          # Regression test for bpo-43515
     253          fn = os.path.join(os.path.dirname(__file__),
     254                            'partial', 'cfimport.py')
     255          script_helper.assert_python_ok(fn)
     256  
     257      def test_multiprocessing_pool_circular_import(self):
     258          # Regression test for bpo-41567
     259          fn = os.path.join(os.path.dirname(__file__),
     260                            'partial', 'pool_in_threads.py')
     261          script_helper.assert_python_ok(fn)
     262  
     263  
     264  def setUpModule():
     265      thread_info = threading_helper.threading_setup()
     266      unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
     267      try:
     268          old_switchinterval = sys.getswitchinterval()
     269          unittest.addModuleCleanup(sys.setswitchinterval, old_switchinterval)
     270          sys.setswitchinterval(1e-5)
     271      except AttributeError:
     272          pass
     273  
     274  
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()