from test.test_importlib import util as test_util

# ``init`` is iterated with ``.items()`` further down, yielding
# (kind, importlib implementation) pairs whose values expose ``_bootstrap``.
init = test_util.import_importlib('importlib')

import sys
import threading
import unittest
import weakref

from test import support
from test.support import threading_helper
from test import lock_tests


# NOTE(review): presumably skips this whole module when threads are not
# usable on the platform -- confirm against test.support.threading_helper.
threading_helper.requires_working_threading(module=True)
16
17
class ModuleLockAsRLockTests:
    """Mixin that plugs a module lock into a generic lock test-suite.

    The concrete class must provide a ``LockType`` attribute: a callable
    taking the lock name and returning a new lock.  Tests that rely on
    features the lock does not offer are switched off by rebinding their
    names to ``None``.
    """

    @classmethod
    def locktype(cls):
        """Return a fresh lock named ``"some_lock"`` built by ``cls.LockType``."""
        return cls.LockType("some_lock")

    # _is_owned() unsupported
    test__is_owned = None
    # acquire(blocking=False) unsupported
    test_try_acquire = None
    test_try_acquire_contended = None
    # `with` unsupported
    test_with = None
    # acquire(timeout=...) unsupported
    test_timeout = None
    # _release_save() unsupported
    test_release_save_unacquired = None
    # _recursion_count() unsupported
    test_recursion_count = None
    # lock status in repr unsupported
    test_repr = None
    test_locked_repr = None
37
# For every importlib implementation in ``init``, the _ModuleLock class
# exposed by its ``_bootstrap`` module, keyed by implementation kind.
LOCK_TYPES = {kind: splitinit._bootstrap._ModuleLock
              for kind, splitinit in init.items()}

# Combine the mixin with lock_tests.RLockTests; test_both() hands back one
# test class per implementation, each receiving its LockType.
(Frozen_ModuleLockAsRLockTests,
 Source_ModuleLockAsRLockTests
 ) = test_util.test_both(ModuleLockAsRLockTests, lock_tests.RLockTests,
                         LockType=LOCK_TYPES)
45
46
class DeadlockAvoidanceTests:
    """Mixin checking that lock acquisition reports deadlocks.

    The concrete class must provide ``LockType`` (a callable building a
    lock from a name) and ``DeadlockError`` (the exception ``acquire()``
    raises when it refuses to block).
    """

    def setUp(self):
        """Lower the interpreter switch interval, remembering the old one."""
        try:
            self.old_switchinterval = sys.getswitchinterval()
            # NOTE(review): presumably a tiny interval makes thread
            # interleavings (and so lock contention) more likely -- confirm.
            support.setswitchinterval(0.000001)
        except AttributeError:
            # The switch-interval API is missing; nothing to restore later.
            self.old_switchinterval = None

    def tearDown(self):
        """Restore the switch interval saved by setUp(), if there was one."""
        if self.old_switchinterval is not None:
            sys.setswitchinterval(self.old_switchinterval)

    def run_deadlock_avoidance_test(self, create_deadlock):
        """Run threads that each take two neighbouring locks from a ring.

        Ten locks are created and paired as (lock[i], lock[(i + 1) % 10]).
        Every thread pops one pair, acquires the first lock, waits on a
        barrier shared by all threads, then tries to acquire the second.

        With *create_deadlock* true there are as many threads as pairs, so
        the ring of waits is closed.  Otherwise one pair is left unused and
        the chain stays open.

        Returns a list with one ``(first_ok, second_ok)`` tuple per thread,
        where each flag is False when that acquire raised
        ``self.DeadlockError``.
        """
        NLOCKS = 10
        locks = [self.LockType(str(i)) for i in range(NLOCKS)]
        pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
        NTHREADS = NLOCKS if create_deadlock else NLOCKS - 1
        barrier = threading.Barrier(NTHREADS)
        results = []

        def _acquire(lock):
            """Try to acquire the lock. Return True on success,
            False on deadlock."""
            try:
                lock.acquire()
            except self.DeadlockError:
                return False
            else:
                return True

        def f():
            a, b = pairs.pop()
            ra = _acquire(a)
            # Every thread holds its first lock before any tries the second.
            barrier.wait()
            rb = _acquire(b)
            results.append((ra, rb))
            # Release in reverse order, and only what was actually acquired.
            if rb:
                b.release()
            if ra:
                a.release()

        # Bunch starts the NTHREADS workers; the with-body has nothing to do.
        with lock_tests.Bunch(f, NTHREADS):
            pass
        self.assertEqual(len(results), NTHREADS)
        return results

    def test_deadlock(self):
        """A closed ring of waits must make at least one acquire fail."""
        results = self.run_deadlock_avoidance_test(True)
        # At least one of the threads detected a potential deadlock on its
        # second acquire() call.  It may be several of them, because the
        # deadlock avoidance mechanism is conservative.
        nb_deadlocks = results.count((True, False))
        self.assertGreaterEqual(nb_deadlocks, 1)
        self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks)

    def test_no_deadlock(self):
        """With an open chain every thread must obtain both of its locks."""
        results = self.run_deadlock_avoidance_test(False)
        self.assertEqual(results.count((True, False)), 0)
        self.assertEqual(results.count((True, True)), len(results))
109
110
# For every importlib implementation in ``init``, the _DeadlockError
# exception exposed by its ``_bootstrap`` module, keyed by kind.
DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError
                   for kind, splitinit in init.items()}

# test_both() hands back one test class per implementation, each receiving
# its own LockType and DeadlockError.
(Frozen_DeadlockAvoidanceTests,
 Source_DeadlockAvoidanceTests
 ) = test_util.test_both(DeadlockAvoidanceTests,
                         LockType=LOCK_TYPES,
                         DeadlockError=DEADLOCK_ERRORS)
119
120
class LifetimeTests:
    """Mixin checking that module locks do not outlive their users.

    The concrete class must provide ``init``, an object whose
    ``_bootstrap`` attribute is the bootstrap module under test.
    """

    @property
    def bootstrap(self):
        """The ``_bootstrap`` module of the implementation under test."""
        return self.init._bootstrap

    def test_lock_lifetime(self):
        """A lock is registered while referenced and gone once collected."""
        name = "xyzzy"
        self.assertNotIn(name, self.bootstrap._module_locks)
        lock = self.bootstrap._get_module_lock(name)
        self.assertIn(name, self.bootstrap._module_locks)
        # Keep only a weak reference so the lock itself can be collected.
        wr = weakref.ref(lock)
        del lock
        support.gc_collect()
        self.assertNotIn(name, self.bootstrap._module_locks)
        self.assertIsNone(wr())

    def test_all_locks(self):
        """After a collection no module lock may remain registered."""
        support.gc_collect()
        # The registry is passed as the failure message to show leftovers.
        self.assertEqual(0, len(self.bootstrap._module_locks),
                         self.bootstrap._module_locks)
142
143
# test_both() hands back one test class per importlib implementation, each
# receiving its own ``init``.
(Frozen_LifetimeTests,
 Source_LifetimeTests
 ) = test_util.test_both(LifetimeTests, init=init)
147
148
def setUpModule():
    """Snapshot the threading state and schedule its cleanup for module exit."""
    thread_info = threading_helper.threading_setup()
    # threading_cleanup() receives the values threading_setup() returned.
    unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
152
153
# Run the tests when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()