# (root)/Python-3.11.7/Lib/test/test_importlib/test_locks.py
       1  from test.test_importlib import util as test_util
       2  
       3  init = test_util.import_importlib('importlib')
       4  
       5  import sys
       6  import threading
       7  import unittest
       8  import weakref
       9  
      10  from test import support
      11  from test.support import threading_helper
      12  from test import lock_tests
      13  
      14  
      15  threading_helper.requires_working_threading(module=True)
      16  
      17  
      18  class ESC[4;38;5;81mModuleLockAsRLockTests:
      19      locktype = classmethod(lambda cls: cls.LockType("some_lock"))
      20  
      21      # _is_owned() unsupported
      22      test__is_owned = None
      23      # acquire(blocking=False) unsupported
      24      test_try_acquire = None
      25      test_try_acquire_contended = None
      26      # `with` unsupported
      27      test_with = None
      28      # acquire(timeout=...) unsupported
      29      test_timeout = None
      30      # _release_save() unsupported
      31      test_release_save_unacquired = None
      32      # _recursion_count() unsupported
      33      test_recursion_count = None
      34      # lock status in repr unsupported
      35      test_repr = None
      36      test_locked_repr = None
      37  
      38  LOCK_TYPES = {kind: splitinit._bootstrap._ModuleLock
      39                for kind, splitinit in init.items()}
      40  
      41  (Frozen_ModuleLockAsRLockTests,
      42   Source_ModuleLockAsRLockTests
      43   ) = test_util.test_both(ModuleLockAsRLockTests, lock_tests.RLockTests,
      44                           LockType=LOCK_TYPES)
      45  
      46  
      47  class ESC[4;38;5;81mDeadlockAvoidanceTests:
      48  
      49      def setUp(self):
      50          try:
      51              self.old_switchinterval = sys.getswitchinterval()
      52              support.setswitchinterval(0.000001)
      53          except AttributeError:
      54              self.old_switchinterval = None
      55  
      56      def tearDown(self):
      57          if self.old_switchinterval is not None:
      58              sys.setswitchinterval(self.old_switchinterval)
      59  
      60      def run_deadlock_avoidance_test(self, create_deadlock):
      61          NLOCKS = 10
      62          locks = [self.LockType(str(i)) for i in range(NLOCKS)]
      63          pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
      64          if create_deadlock:
      65              NTHREADS = NLOCKS
      66          else:
      67              NTHREADS = NLOCKS - 1
      68          barrier = threading.Barrier(NTHREADS)
      69          results = []
      70  
      71          def _acquire(lock):
      72              """Try to acquire the lock. Return True on success,
      73              False on deadlock."""
      74              try:
      75                  lock.acquire()
      76              except self.DeadlockError:
      77                  return False
      78              else:
      79                  return True
      80  
      81          def f():
      82              a, b = pairs.pop()
      83              ra = _acquire(a)
      84              barrier.wait()
      85              rb = _acquire(b)
      86              results.append((ra, rb))
      87              if rb:
      88                  b.release()
      89              if ra:
      90                  a.release()
      91          with lock_tests.Bunch(f, NTHREADS):
      92              pass
      93          self.assertEqual(len(results), NTHREADS)
      94          return results
      95  
      96      def test_deadlock(self):
      97          results = self.run_deadlock_avoidance_test(True)
      98          # At least one of the threads detected a potential deadlock on its
      99          # second acquire() call.  It may be several of them, because the
     100          # deadlock avoidance mechanism is conservative.
     101          nb_deadlocks = results.count((True, False))
     102          self.assertGreaterEqual(nb_deadlocks, 1)
     103          self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks)
     104  
     105      def test_no_deadlock(self):
     106          results = self.run_deadlock_avoidance_test(False)
     107          self.assertEqual(results.count((True, False)), 0)
     108          self.assertEqual(results.count((True, True)), len(results))
     109  
     110  
     111  DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError
     112                     for kind, splitinit in init.items()}
     113  
     114  (Frozen_DeadlockAvoidanceTests,
     115   Source_DeadlockAvoidanceTests
     116   ) = test_util.test_both(DeadlockAvoidanceTests,
     117                           LockType=LOCK_TYPES,
     118                           DeadlockError=DEADLOCK_ERRORS)
     119  
     120  
     121  class ESC[4;38;5;81mLifetimeTests:
     122  
     123      @property
     124      def bootstrap(self):
     125          return self.init._bootstrap
     126  
     127      def test_lock_lifetime(self):
     128          name = "xyzzy"
     129          self.assertNotIn(name, self.bootstrap._module_locks)
     130          lock = self.bootstrap._get_module_lock(name)
     131          self.assertIn(name, self.bootstrap._module_locks)
     132          wr = weakref.ref(lock)
     133          del lock
     134          support.gc_collect()
     135          self.assertNotIn(name, self.bootstrap._module_locks)
     136          self.assertIsNone(wr())
     137  
     138      def test_all_locks(self):
     139          support.gc_collect()
     140          self.assertEqual(0, len(self.bootstrap._module_locks),
     141                           self.bootstrap._module_locks)
     142  
     143  
     144  (Frozen_LifetimeTests,
     145   Source_LifetimeTests
     146   ) = test_util.test_both(LifetimeTests, init=init)
     147  
     148  
     149  def setUpModule():
     150      thread_info = threading_helper.threading_setup()
     151      unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
     152  
     153  
     154  if __name__ == '__main__':
     155      unittest.main()