python (3.12.0)
1 from test.test_importlib import util as test_util
2
3 init = test_util.import_importlib('importlib')
4
5 import sys
6 import threading
7 import unittest
8 import weakref
9
10 from test import support
11 from test.support import threading_helper
12 from test import lock_tests
13
14
# Module-wide guard: everything below spawns threads.
# NOTE(review): presumably this skips the whole module on platforms without
# usable threading -- confirm against test.support.threading_helper.
threading_helper.requires_working_threading(module=True)
16
17
class ModuleLockAsRLockTests:
    """Mixin adapting an importlib module lock to a generic RLock test suite.

    Concrete classes must provide a ``LockType`` attribute (the lock class to
    exercise).  The ``test_*`` names bound to ``None`` below correspond to
    features the module lock does not support.
    NOTE(review): binding them to ``None`` presumably hides the same-named
    tests inherited from the shared RLock suite -- confirm against the base.
    """

    @classmethod
    def locktype(cls):
        """Return a new ``cls.LockType`` instance named ``"some_lock"``."""
        return cls.LockType("some_lock")

    # _is_owned() unsupported
    test__is_owned = None
    # acquire(blocking=False) unsupported
    test_try_acquire = None
    test_try_acquire_contended = None
    # `with` unsupported
    test_with = None
    # acquire(timeout=...) unsupported
    test_timeout = None
    # _release_save() unsupported
    test_release_save_unacquired = None
    # lock status in repr unsupported
    test_repr = None
    test_locked_repr = None

    def tearDown(self):
        """Empty the ``_blocking_on`` mapping of every importlib flavour.

        All entries of ``init`` are cleaned, not only the one under test, so
        no bookkeeping left by a test can carry over into the next one.
        """
        for splitinit in init.values():
            splitinit._bootstrap._blocking_on.clear()
39
40
# One _ModuleLock class per importlib flavour, keyed exactly like `init`.
LOCK_TYPES = {
    flavour: importlib_mod._bootstrap._ModuleLock
    for flavour, importlib_mod in init.items()
}

# Specialise the mixin (combined with the shared RLock tests) per flavour.
Frozen_ModuleLockAsRLockTests, Source_ModuleLockAsRLockTests = test_util.test_both(
    ModuleLockAsRLockTests,
    lock_tests.RLockTests,
    LockType=LOCK_TYPES,
)
48
49
class DeadlockAvoidanceTests:
    """Mixin checking the deadlock detection of importlib module locks.

    Concrete classes must provide ``LockType`` (the lock class) and
    ``DeadlockError`` (the exception ``acquire()`` raises when it refuses to
    block because doing so would deadlock).
    """

    def setUp(self):
        """Remember the current switch interval, then make it very small.

        If either call is unavailable (AttributeError), ``None`` is recorded
        so that tearDown() leaves the interval alone.
        """
        try:
            self.old_switchinterval = sys.getswitchinterval()
            support.setswitchinterval(0.000001)
        except AttributeError:
            self.old_switchinterval = None

    def tearDown(self):
        """Restore the switch interval saved by setUp(), if there was one."""
        if self.old_switchinterval is not None:
            sys.setswitchinterval(self.old_switchinterval)

    def run_deadlock_avoidance_test(self, create_deadlock):
        """Run threads that each take two neighbouring locks in a ring.

        Ten locks are arranged in a ring; each thread pops one
        ``(locks[i], locks[i+1])`` pair, acquires the first lock, waits on a
        barrier until every thread holds its first lock, then tries the
        second.  With ``create_deadlock`` true there is one thread per pair,
        so the ring of waiters is complete; otherwise one pair is left
        unused and the ring is open.

        Returns a list with one ``(ra, rb)`` tuple per thread, where each
        flag is True if that acquire succeeded and False if it raised
        ``self.DeadlockError``.
        """
        NLOCKS = 10
        locks = [self.LockType(str(i)) for i in range(NLOCKS)]
        pairs = [(locks[i], locks[(i + 1) % NLOCKS]) for i in range(NLOCKS)]
        NTHREADS = NLOCKS if create_deadlock else NLOCKS - 1
        barrier = threading.Barrier(NTHREADS)
        results = []

        def _acquire(lock):
            """Try to acquire the lock. Return True on success,
            False on deadlock."""
            try:
                lock.acquire()
            except self.DeadlockError:
                return False
            else:
                return True

        def f():
            a, b = pairs.pop()
            ra = _acquire(a)
            # Release in ``finally`` blocks: if the barrier breaks or the
            # second acquire fails unexpectedly, a lock that stayed held
            # would leave the peers blocked in acquire() forever.
            try:
                barrier.wait()
                rb = _acquire(b)
                try:
                    results.append((ra, rb))
                finally:
                    if rb:
                        b.release()
            finally:
                if ra:
                    a.release()

        lock_tests.Bunch(f, NTHREADS).wait_for_finished()
        self.assertEqual(len(results), NTHREADS)
        return results

    def test_deadlock(self):
        results = self.run_deadlock_avoidance_test(True)
        # At least one of the threads detected a potential deadlock on its
        # second acquire() call.  It may be several of them, because the
        # deadlock avoidance mechanism is conservative.
        nb_deadlocks = results.count((True, False))
        self.assertGreaterEqual(nb_deadlocks, 1)
        # Every other thread must have obtained both of its locks.
        self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks)

    def test_no_deadlock(self):
        # With the ring left open, no thread may report a deadlock and all
        # of them must end up holding both locks.
        results = self.run_deadlock_avoidance_test(False)
        self.assertEqual(results.count((True, False)), 0)
        self.assertEqual(results.count((True, True)), len(results))
111
112
# One _DeadlockError exception class per importlib flavour, keyed like `init`.
DEADLOCK_ERRORS = {
    flavour: importlib_mod._bootstrap._DeadlockError
    for flavour, importlib_mod in init.items()
}

# Specialise the mixin per flavour with its lock and exception classes.
Frozen_DeadlockAvoidanceTests, Source_DeadlockAvoidanceTests = test_util.test_both(
    DeadlockAvoidanceTests,
    LockType=LOCK_TYPES,
    DeadlockError=DEADLOCK_ERRORS,
)
121
122
class LifetimeTests:
    """Mixin checking that module locks are dropped once unreferenced.

    Concrete classes must provide ``init``, the importlib module under test.
    """

    @property
    def bootstrap(self):
        """The ``_bootstrap`` attribute of the importlib module under test."""
        return self.init._bootstrap

    def test_lock_lifetime(self):
        name = "xyzzy"
        registry = self.bootstrap._module_locks
        self.assertNotIn(name, registry)
        lock = self.bootstrap._get_module_lock(name)
        self.assertIn(name, registry)
        wr = weakref.ref(lock)
        # Drop the only strong reference held here; after a collection both
        # the registry entry and the lock object itself must be gone.
        del lock
        support.gc_collect()
        # Re-read the attribute rather than reusing ``registry`` so the
        # check observes the mapping currently installed on the module.
        self.assertNotIn(name, self.bootstrap._module_locks)
        self.assertIsNone(wr())

    def test_all_locks(self):
        # After a collection no module lock should remain registered; the
        # mapping is passed as the failure message to show any leftovers.
        support.gc_collect()
        self.assertEqual(0, len(self.bootstrap._module_locks),
                         self.bootstrap._module_locks)
144
145
# Specialise the lifetime mixin once per importlib flavour in `init`.
Frozen_LifetimeTests, Source_LifetimeTests = test_util.test_both(
    LifetimeTests,
    init=init,
)
149
150
def setUpModule():
    """Take a threading snapshot and register the matching module cleanup.

    The value returned by ``threading_setup()`` is unpacked as the
    arguments that ``threading_cleanup`` receives when the module's
    cleanups run.
    """
    unittest.addModuleCleanup(
        threading_helper.threading_cleanup,
        *threading_helper.threading_setup(),
    )
154
155
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()