1  # This is a variant of the very old (early 90's) file
       2  # Demo/threads/bug.py.  It simply provokes a number of threads into
       3  # trying to import the same module "at the same time".
       4  # There are no pleasant failure modes -- most likely is that Python
       5  # complains several times about module random having no attribute
       6  # randrange, and then Python hangs.
       7  
       8  import _imp as imp
       9  import os
      10  import importlib
      11  import sys
      12  import time
      13  import shutil
      14  import threading
      15  import unittest
      16  from unittest import mock
      17  from test.support import verbose
      18  from test.support.import_helper import forget, mock_register_at_fork
      19  from test.support.os_helper import (TESTFN, unlink, rmtree)
      20  from test.support import script_helper, threading_helper
      21  
      22  threading_helper.requires_working_threading(module=True)
      23  
      24  def task(N, done, done_tasks, errors):
      25      try:
      26          # We don't use modulefinder but still import it in order to stress
      27          # importing of different modules from several threads.
      28          if len(done_tasks) % 2:
      29              import modulefinder
      30              import random
      31          else:
      32              import random
      33              import modulefinder
      34          # This will fail if random is not completely initialized
      35          x = random.randrange(1, 3)
      36      except Exception as e:
      37          errors.append(e.with_traceback(None))
      38      finally:
      39          done_tasks.append(threading.get_ident())
      40          finished = len(done_tasks) == N
      41          if finished:
      42              done.set()
      43  
      44  # Create a circular import structure: A -> C -> B -> D -> A
      45  # NOTE: `time` is already loaded and therefore doesn't threaten to deadlock.
      46  
      47  circular_imports_modules = {
      48      'A': """if 1:
      49          import time
      50          time.sleep(%(delay)s)
      51          x = 'a'
      52          import C
      53          """,
      54      'B': """if 1:
      55          import time
      56          time.sleep(%(delay)s)
      57          x = 'b'
      58          import D
      59          """,
      60      'C': """import B""",
      61      'D': """import A""",
      62  }
      63  
      64  class ESC[4;38;5;81mFinder:
      65      """A dummy finder to detect concurrent access to its find_spec()
      66      method."""
      67  
      68      def __init__(self):
      69          self.numcalls = 0
      70          self.x = 0
      71          self.lock = threading.Lock()
      72  
      73      def find_spec(self, name, path=None, target=None):
      74          # Simulate some thread-unsafe behaviour. If calls to find_spec()
      75          # are properly serialized, `x` will end up the same as `numcalls`.
      76          # Otherwise not.
      77          assert imp.lock_held()
      78          with self.lock:
      79              self.numcalls += 1
      80          x = self.x
      81          time.sleep(0.01)
      82          self.x = x + 1
      83  
      84  class ESC[4;38;5;81mFlushingFinder:
      85      """A dummy finder which flushes sys.path_importer_cache when it gets
      86      called."""
      87  
      88      def find_spec(self, name, path=None, target=None):
      89          sys.path_importer_cache.clear()
      90  
      91  
      92  class ESC[4;38;5;81mThreadedImportTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      93  
      94      def setUp(self):
      95          self.old_random = sys.modules.pop('random', None)
      96  
      97      def tearDown(self):
      98          # If the `random` module was already initialized, we restore the
      99          # old module at the end so that pickling tests don't fail.
     100          # See http://bugs.python.org/issue3657#msg110461
     101          if self.old_random is not None:
     102              sys.modules['random'] = self.old_random
     103  
     104      @mock_register_at_fork
     105      def check_parallel_module_init(self, mock_os):
     106          if imp.lock_held():
     107              # This triggers on, e.g., from test import autotest.
     108              raise unittest.SkipTest("can't run when import lock is held")
     109  
     110          done = threading.Event()
     111          for N in (20, 50) * 3:
     112              if verbose:
     113                  print("Trying", N, "threads ...", end=' ')
     114              # Make sure that random and modulefinder get reimported freshly
     115              for modname in ['random', 'modulefinder']:
     116                  try:
     117                      del sys.modules[modname]
     118                  except KeyError:
     119                      pass
     120              errors = []
     121              done_tasks = []
     122              done.clear()
     123              t0 = time.monotonic()
     124              with threading_helper.start_threads(
     125                      threading.Thread(target=task, args=(N, done, done_tasks, errors,))
     126                      for i in range(N)):
     127                  pass
     128              completed = done.wait(10 * 60)
     129              dt = time.monotonic() - t0
     130              if verbose:
     131                  print("%.1f ms" % (dt*1e3), flush=True, end=" ")
     132              dbg_info = 'done: %s/%s' % (len(done_tasks), N)
     133              self.assertFalse(errors, dbg_info)
     134              self.assertTrue(completed, dbg_info)
     135              if verbose:
     136                  print("OK.")
     137  
     138      def test_parallel_module_init(self):
     139          self.check_parallel_module_init()
     140  
     141      def test_parallel_meta_path(self):
     142          finder = Finder()
     143          sys.meta_path.insert(0, finder)
     144          try:
     145              self.check_parallel_module_init()
     146              self.assertGreater(finder.numcalls, 0)
     147              self.assertEqual(finder.x, finder.numcalls)
     148          finally:
     149              sys.meta_path.remove(finder)
     150  
     151      def test_parallel_path_hooks(self):
     152          # Here the Finder instance is only used to check concurrent calls
     153          # to path_hook().
     154          finder = Finder()
     155          # In order for our path hook to be called at each import, we need
     156          # to flush the path_importer_cache, which we do by registering a
     157          # dedicated meta_path entry.
     158          flushing_finder = FlushingFinder()
     159          def path_hook(path):
     160              finder.find_spec('')
     161              raise ImportError
     162          sys.path_hooks.insert(0, path_hook)
     163          sys.meta_path.append(flushing_finder)
     164          try:
     165              # Flush the cache a first time
     166              flushing_finder.find_spec('')
     167              numtests = self.check_parallel_module_init()
     168              self.assertGreater(finder.numcalls, 0)
     169              self.assertEqual(finder.x, finder.numcalls)
     170          finally:
     171              sys.meta_path.remove(flushing_finder)
     172              sys.path_hooks.remove(path_hook)
     173  
     174      def test_import_hangers(self):
     175          # In case this test is run again, make sure the helper module
     176          # gets loaded from scratch again.
     177          try:
     178              del sys.modules['test.test_importlib.threaded_import_hangers']
     179          except KeyError:
     180              pass
     181          import test.test_importlib.threaded_import_hangers
     182          self.assertFalse(test.test_importlib.threaded_import_hangers.errors)
     183  
     184      def test_circular_imports(self):
     185          # The goal of this test is to exercise implementations of the import
     186          # lock which use a per-module lock, rather than a global lock.
     187          # In these implementations, there is a possible deadlock with
     188          # circular imports, for example:
     189          # - thread 1 imports A (grabbing the lock for A) which imports B
     190          # - thread 2 imports B (grabbing the lock for B) which imports A
     191          # Such implementations should be able to detect such situations and
     192          # resolve them one way or the other, without freezing.
     193          # NOTE: our test constructs a slightly less trivial import cycle,
     194          # in order to better stress the deadlock avoidance mechanism.
     195          delay = 0.5
     196          os.mkdir(TESTFN)
     197          self.addCleanup(shutil.rmtree, TESTFN)
     198          sys.path.insert(0, TESTFN)
     199          self.addCleanup(sys.path.remove, TESTFN)
     200          for name, contents in circular_imports_modules.items():
     201              contents = contents % {'delay': delay}
     202              with open(os.path.join(TESTFN, name + ".py"), "wb") as f:
     203                  f.write(contents.encode('utf-8'))
     204              self.addCleanup(forget, name)
     205  
     206          importlib.invalidate_caches()
     207          results = []
     208          def import_ab():
     209              import A
     210              results.append(getattr(A, 'x', None))
     211          def import_ba():
     212              import B
     213              results.append(getattr(B, 'x', None))
     214          t1 = threading.Thread(target=import_ab)
     215          t2 = threading.Thread(target=import_ba)
     216          t1.start()
     217          t2.start()
     218          t1.join()
     219          t2.join()
     220          self.assertEqual(set(results), {'a', 'b'})
     221  
     222      @mock_register_at_fork
     223      def test_side_effect_import(self, mock_os):
     224          code = """if 1:
     225              import threading
     226              def target():
     227                  import random
     228              t = threading.Thread(target=target)
     229              t.start()
     230              t.join()
     231              t = None"""
     232          sys.path.insert(0, os.curdir)
     233          self.addCleanup(sys.path.remove, os.curdir)
     234          filename = TESTFN + ".py"
     235          with open(filename, "wb") as f:
     236              f.write(code.encode('utf-8'))
     237          self.addCleanup(unlink, filename)
     238          self.addCleanup(forget, TESTFN)
     239          self.addCleanup(rmtree, '__pycache__')
     240          importlib.invalidate_caches()
     241          with threading_helper.wait_threads_exit():
     242              __import__(TESTFN)
     243          del sys.modules[TESTFN]
     244  
     245      def test_concurrent_futures_circular_import(self):
     246          # Regression test for bpo-43515
     247          fn = os.path.join(os.path.dirname(__file__),
     248                            'partial', 'cfimport.py')
     249          script_helper.assert_python_ok(fn)
     250  
     251      def test_multiprocessing_pool_circular_import(self):
     252          # Regression test for bpo-41567
     253          fn = os.path.join(os.path.dirname(__file__),
     254                            'partial', 'pool_in_threads.py')
     255          script_helper.assert_python_ok(fn)
     256  
     257  
     258  def setUpModule():
     259      thread_info = threading_helper.threading_setup()
     260      unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
     261      try:
     262          old_switchinterval = sys.getswitchinterval()
     263          unittest.addModuleCleanup(sys.setswitchinterval, old_switchinterval)
     264          sys.setswitchinterval(1e-5)
     265      except AttributeError:
     266          pass
     267  
     268  
     269  if __name__ == "__main__":
     270      unittest.main()