1  from test.test_importlib import util as test_util
       2  
# `init` is used below as a mapping (via .items()/.values()) from a "kind"
# key to an importlib implementation exposing `_bootstrap`.
# NOTE(review): presumably the frozen and source variants of importlib,
# judging by the Frozen_*/Source_* class names below -- confirm against
# test_util.import_importlib().
init = test_util.import_importlib('importlib')
       4  
       5  import sys
       6  import threading
       7  import unittest
       8  import weakref
       9  
      10  from test import support
      11  from test.support import threading_helper
      12  from test import lock_tests
      13  
      14  
# Module-level threading requirement (module=True).
# NOTE(review): presumably skips this whole test module when threads are
# unavailable -- confirm against test.support.threading_helper.
threading_helper.requires_working_threading(module=True)
      16  
      17  
      18  class ESC[4;38;5;81mModuleLockAsRLockTests:
      19      locktype = classmethod(lambda cls: cls.LockType("some_lock"))
      20  
      21      # _is_owned() unsupported
      22      test__is_owned = None
      23      # acquire(blocking=False) unsupported
      24      test_try_acquire = None
      25      test_try_acquire_contended = None
      26      # `with` unsupported
      27      test_with = None
      28      # acquire(timeout=...) unsupported
      29      test_timeout = None
      30      # _release_save() unsupported
      31      test_release_save_unacquired = None
      32      # lock status in repr unsupported
      33      test_repr = None
      34      test_locked_repr = None
      35  
      36      def tearDown(self):
      37          for splitinit in init.values():
      38              splitinit._bootstrap._blocking_on.clear()
      39  
      40  
# Map each key of `init` to that importlib variant's
# `_bootstrap._ModuleLock`; passed to test_both() as `LockType`.
LOCK_TYPES = {kind: splitinit._bootstrap._ModuleLock
              for kind, splitinit in init.items()}

# Build the two concrete test classes from the mixin and
# lock_tests.RLockTests.
# NOTE(review): presumably one per key of LOCK_TYPES -- confirm against
# test_util.test_both().
(Frozen_ModuleLockAsRLockTests,
 Source_ModuleLockAsRLockTests
 ) = test_util.test_both(ModuleLockAsRLockTests, lock_tests.RLockTests,
                         LockType=LOCK_TYPES)
      48  
      49  
      50  class ESC[4;38;5;81mDeadlockAvoidanceTests:
      51  
      52      def setUp(self):
      53          try:
      54              self.old_switchinterval = sys.getswitchinterval()
      55              support.setswitchinterval(0.000001)
      56          except AttributeError:
      57              self.old_switchinterval = None
      58  
      59      def tearDown(self):
      60          if self.old_switchinterval is not None:
      61              sys.setswitchinterval(self.old_switchinterval)
      62  
      63      def run_deadlock_avoidance_test(self, create_deadlock):
      64          NLOCKS = 10
      65          locks = [self.LockType(str(i)) for i in range(NLOCKS)]
      66          pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
      67          if create_deadlock:
      68              NTHREADS = NLOCKS
      69          else:
      70              NTHREADS = NLOCKS - 1
      71          barrier = threading.Barrier(NTHREADS)
      72          results = []
      73  
      74          def _acquire(lock):
      75              """Try to acquire the lock. Return True on success,
      76              False on deadlock."""
      77              try:
      78                  lock.acquire()
      79              except self.DeadlockError:
      80                  return False
      81              else:
      82                  return True
      83  
      84          def f():
      85              a, b = pairs.pop()
      86              ra = _acquire(a)
      87              barrier.wait()
      88              rb = _acquire(b)
      89              results.append((ra, rb))
      90              if rb:
      91                  b.release()
      92              if ra:
      93                  a.release()
      94          lock_tests.Bunch(f, NTHREADS).wait_for_finished()
      95          self.assertEqual(len(results), NTHREADS)
      96          return results
      97  
      98      def test_deadlock(self):
      99          results = self.run_deadlock_avoidance_test(True)
     100          # At least one of the threads detected a potential deadlock on its
     101          # second acquire() call.  It may be several of them, because the
     102          # deadlock avoidance mechanism is conservative.
     103          nb_deadlocks = results.count((True, False))
     104          self.assertGreaterEqual(nb_deadlocks, 1)
     105          self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks)
     106  
     107      def test_no_deadlock(self):
     108          results = self.run_deadlock_avoidance_test(False)
     109          self.assertEqual(results.count((True, False)), 0)
     110          self.assertEqual(results.count((True, True)), len(results))
     111  
     112  
# Map each key of `init` to that importlib variant's
# `_bootstrap._DeadlockError`; passed to test_both() as `DeadlockError`.
DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError
                   for kind, splitinit in init.items()}

# Build the two concrete test classes; no explicit base class is passed
# here, only the per-kind LockType and DeadlockError mappings.
(Frozen_DeadlockAvoidanceTests,
 Source_DeadlockAvoidanceTests
 ) = test_util.test_both(DeadlockAvoidanceTests,
                         LockType=LOCK_TYPES,
                         DeadlockError=DEADLOCK_ERRORS)
     121  
     122  
     123  class ESC[4;38;5;81mLifetimeTests:
     124  
     125      @property
     126      def bootstrap(self):
     127          return self.init._bootstrap
     128  
     129      def test_lock_lifetime(self):
     130          name = "xyzzy"
     131          self.assertNotIn(name, self.bootstrap._module_locks)
     132          lock = self.bootstrap._get_module_lock(name)
     133          self.assertIn(name, self.bootstrap._module_locks)
     134          wr = weakref.ref(lock)
     135          del lock
     136          support.gc_collect()
     137          self.assertNotIn(name, self.bootstrap._module_locks)
     138          self.assertIsNone(wr())
     139  
     140      def test_all_locks(self):
     141          support.gc_collect()
     142          self.assertEqual(0, len(self.bootstrap._module_locks),
     143                           self.bootstrap._module_locks)
     144  
     145  
# Build the two concrete lifetime test classes; `init` is passed so that
# LifetimeTests.bootstrap can read `self.init._bootstrap`.
# NOTE(review): presumably each class gets the `init` entry for its own
# kind -- confirm against test_util.test_both().
(Frozen_LifetimeTests,
 Source_LifetimeTests
 ) = test_util.test_both(LifetimeTests, init=init)
     149  
     150  
     151  def setUpModule():
     152      thread_info = threading_helper.threading_setup()
     153      unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
     154  
     155  
# Run the tests through unittest when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()