1 # This is a variant of the very old (early 90's) file
2 # Demo/threads/bug.py. It simply provokes a number of threads into
3 # trying to import the same module "at the same time".
4 # There are no pleasant failure modes -- most likely is that Python
5 # complains several times about module random having no attribute
6 # randrange, and then Python hangs.
7
8 import _imp as imp
9 import os
10 import importlib
11 import sys
12 import time
13 import shutil
14 import threading
15 import unittest
16 from unittest import mock
17 from test.support import verbose
18 from test.support.import_helper import forget
19 from test.support.os_helper import (TESTFN, unlink, rmtree)
20 from test.support import script_helper, threading_helper
21
# Module-level guard evaluated at import time.
# NOTE(review): presumably skips the whole module on platforms without
# working thread support -- confirm against test.support.threading_helper.
threading_helper.requires_working_threading(module=True)
23
def task(N, done, done_tasks, errors):
    """Import ``random`` and ``modulefinder`` and record the outcome.

    Meant to be used as a thread target so that several threads race to
    import the same modules.

    Args:
        N: Total number of tasks expected to run.  The task that brings
            ``len(done_tasks)`` up to ``N`` sets ``done``.
        done: Event-like object (needs ``set()``) signalled once all ``N``
            tasks have finished.
        done_tasks: Shared list.  The identifier of the calling thread is
            appended when the task ends, whether it succeeded or not.
        errors: Shared list.  Any ``Exception`` raised by the imports or by
            the ``randrange()`` call is appended with its traceback removed.
    """
    try:
        # We don't use modulefinder but still import it in order to stress
        # importing of different modules from several threads.
        # The import order alternates with the parity of the number of
        # tasks that have already finished.
        if len(done_tasks) % 2:
            import modulefinder
            import random
        else:
            import random
            import modulefinder
        # This will fail if random is not completely initialized
        random.randrange(1, 3)
    except Exception as e:
        errors.append(e.with_traceback(None))
    finally:
        # Always register completion so the waiter is released even when
        # the import failed.
        done_tasks.append(threading.get_ident())
        if len(done_tasks) == N:
            done.set()
43
def mock_register_at_fork(func):
    """Decorate *func* so that ``os.register_at_fork`` is patched while it runs.

    bpo-30599: importing the random module registers at-fork callbacks, and
    os.register_at_fork() offers no way to unregister them, so calling the
    real function repeatedly would leak memory.  ``create=True`` lets the
    patch apply even where the attribute does not exist.
    """
    patcher = mock.patch('os.register_at_fork', create=True)
    return patcher(func)
49
# Create a circular import structure: A -> C -> B -> D -> A
# NOTE: `time` is already loaded and therefore doesn't threaten to deadlock.

# Maps a module name to the source text of that module.  The sources for
# 'A' and 'B' contain a `%(delay)s` placeholder which must be filled in
# (with the sleep duration in seconds) before the text is written to disk.
circular_imports_modules = {
    # A sleeps, defines x, then imports C.
    'A': """if 1:
        import time
        time.sleep(%(delay)s)
        x = 'a'
        import C
        """,
    # B sleeps, defines x, then imports D.
    'B': """if 1:
        import time
        time.sleep(%(delay)s)
        x = 'b'
        import D
        """,
    # C and D only forward to the other half of the cycle.
    'C': """import B""",
    'D': """import A""",
}
69
class Finder:
    """A dummy finder to detect concurrent access to its find_spec()
    method.

    Attributes are plain counters:
      numcalls -- number of find_spec() calls, incremented under ``lock``.
      x        -- incremented with a deliberately racy read/sleep/write
                  sequence that is NOT protected by ``lock``.
    """

    def __init__(self):
        self.numcalls = 0
        self.x = 0
        self.lock = threading.Lock()

    def find_spec(self, name, path=None, target=None):
        """Record one call and return None (no spec is ever produced).

        The arguments are accepted for signature compatibility and ignored.
        """
        # Simulate some thread-unsafe behaviour. If calls to find_spec()
        # are properly serialized, `x` will end up the same as `numcalls`.
        # Otherwise not.
        assert imp.lock_held()
        with self.lock:
            self.numcalls += 1
        # Unprotected read-modify-write with a sleep in between, so that
        # overlapping calls lose updates to `x`.
        x = self.x
        time.sleep(0.01)
        self.x = x + 1
89
class FlushingFinder:
    """A dummy finder which flushes sys.path_importer_cache when it gets
    called."""

    def find_spec(self, name, path=None, target=None):
        """Empty sys.path_importer_cache and return None.

        The arguments are accepted for signature compatibility and ignored.
        """
        sys.path_importer_cache.clear()
96
97
class ThreadedImportTests(unittest.TestCase):
    """Tests that provoke several threads into importing the same modules
    concurrently, plus regression tests for circular-import deadlocks."""

    def setUp(self):
        # Remove any already-imported `random` so the tests import it afresh;
        # the previous module object (or None) is kept for tearDown().
        self.old_random = sys.modules.pop('random', None)

    def tearDown(self):
        # If the `random` module was already initialized, we restore the
        # old module at the end so that pickling tests don't fail.
        # See http://bugs.python.org/issue3657#msg110461
        if self.old_random is not None:
            sys.modules['random'] = self.old_random

    @mock_register_at_fork
    def check_parallel_module_init(self, mock_os):
        """Run task() in many threads at once and check that none failed.

        Six rounds are run, alternating between 20 and 50 threads.  Before
        each round `random` and `modulefinder` are dropped from sys.modules
        so that the threads really race to import them.

        *mock_os* is the mock object supplied by the mock_register_at_fork
        decorator; it is not used here.

        Raises unittest.SkipTest if the import lock is already held.
        """
        if imp.lock_held():
            # This triggers on, e.g., from test import autotest.
            raise unittest.SkipTest("can't run when import lock is held")

        done = threading.Event()
        for N in (20, 50) * 3:
            if verbose:
                print("Trying", N, "threads ...", end=' ')
            # Make sure that random and modulefinder get reimported freshly
            for modname in ['random', 'modulefinder']:
                sys.modules.pop(modname, None)
            errors = []
            done_tasks = []
            done.clear()
            t0 = time.monotonic()
            # NOTE(review): start_threads() presumably starts the threads on
            # entry and joins them on exit -- confirm in threading_helper.
            with threading_helper.start_threads(
                    threading.Thread(target=task,
                                     args=(N, done, done_tasks, errors,))
                    for _ in range(N)):
                pass
            # Wait up to ten minutes for the last task to signal completion.
            completed = done.wait(10 * 60)
            dt = time.monotonic() - t0
            if verbose:
                print("%.1f ms" % (dt*1e3), flush=True, end=" ")
            dbg_info = 'done: %s/%s' % (len(done_tasks), N)
            self.assertFalse(errors, dbg_info)
            self.assertTrue(completed, dbg_info)
            if verbose:
                print("OK.")

    def test_parallel_module_init(self):
        """Parallel imports succeed with the default import machinery."""
        self.check_parallel_module_init()

    def test_parallel_meta_path(self):
        """Calls to a sys.meta_path finder are serialized across threads."""
        finder = Finder()
        sys.meta_path.insert(0, finder)
        try:
            self.check_parallel_module_init()
            self.assertGreater(finder.numcalls, 0)
            # Equal counters mean no find_spec() calls overlapped.
            self.assertEqual(finder.x, finder.numcalls)
        finally:
            sys.meta_path.remove(finder)

    def test_parallel_path_hooks(self):
        """Calls to a sys.path_hooks entry are serialized across threads."""
        # Here the Finder instance is only used to check concurrent calls
        # to path_hook().
        finder = Finder()
        # In order for our path hook to be called at each import, we need
        # to flush the path_importer_cache, which we do by registering a
        # dedicated meta_path entry.
        flushing_finder = FlushingFinder()
        def path_hook(path):
            finder.find_spec('')
            # Always decline the path; the hook exists only to be counted.
            raise ImportError
        sys.path_hooks.insert(0, path_hook)
        sys.meta_path.append(flushing_finder)
        try:
            # Flush the cache a first time
            flushing_finder.find_spec('')
            self.check_parallel_module_init()
            self.assertGreater(finder.numcalls, 0)
            self.assertEqual(finder.x, finder.numcalls)
        finally:
            sys.meta_path.remove(flushing_finder)
            sys.path_hooks.remove(path_hook)

    def test_import_hangers(self):
        """The helper module threaded_import_hangers reports no errors."""
        # In case this test is run again, make sure the helper module
        # gets loaded from scratch again.
        sys.modules.pop('test.test_importlib.threaded_import_hangers', None)
        import test.test_importlib.threaded_import_hangers
        self.assertFalse(test.test_importlib.threaded_import_hangers.errors)

    def test_circular_imports(self):
        """Two threads entering an import cycle from opposite ends finish."""
        # The goal of this test is to exercise implementations of the import
        # lock which use a per-module lock, rather than a global lock.
        # In these implementations, there is a possible deadlock with
        # circular imports, for example:
        # - thread 1 imports A (grabbing the lock for A) which imports B
        # - thread 2 imports B (grabbing the lock for B) which imports A
        # Such implementations should be able to detect such situations and
        # resolve them one way or the other, without freezing.
        # NOTE: our test constructs a slightly less trivial import cycle,
        # in order to better stress the deadlock avoidance mechanism.
        delay = 0.5
        os.mkdir(TESTFN)
        # Use the test-suite rmtree helper, as test_side_effect_import does.
        self.addCleanup(rmtree, TESTFN)
        sys.path.insert(0, TESTFN)
        self.addCleanup(sys.path.remove, TESTFN)
        for name, contents in circular_imports_modules.items():
            contents = contents % {'delay': delay}
            with open(os.path.join(TESTFN, name + ".py"), "wb") as f:
                f.write(contents.encode('utf-8'))
            self.addCleanup(forget, name)

        importlib.invalidate_caches()
        results = []
        def import_ab():
            import A
            results.append(getattr(A, 'x', None))
        def import_ba():
            import B
            results.append(getattr(B, 'x', None))
        t1 = threading.Thread(target=import_ab)
        t2 = threading.Thread(target=import_ba)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        # Each thread must have seen its module's `x` attribute.
        self.assertEqual(set(results), {'a', 'b'})

    @mock_register_at_fork
    def test_side_effect_import(self, mock_os):
        """A module whose import starts a thread that itself imports works.

        *mock_os* is the mock object supplied by the mock_register_at_fork
        decorator; it is not used here.
        """
        code = """if 1:
            import threading
            def target():
                import random
            t = threading.Thread(target=target)
            t.start()
            t.join()
            t = None"""
        sys.path.insert(0, os.curdir)
        self.addCleanup(sys.path.remove, os.curdir)
        filename = TESTFN + ".py"
        with open(filename, "wb") as f:
            f.write(code.encode('utf-8'))
        self.addCleanup(unlink, filename)
        self.addCleanup(forget, TESTFN)
        self.addCleanup(rmtree, '__pycache__')
        importlib.invalidate_caches()
        with threading_helper.wait_threads_exit():
            __import__(TESTFN)
        del sys.modules[TESTFN]

    def test_concurrent_futures_circular_import(self):
        """Run partial/cfimport.py (located next to this file) successfully."""
        # Regression test for bpo-43515
        fn = os.path.join(os.path.dirname(__file__),
                          'partial', 'cfimport.py')
        script_helper.assert_python_ok(fn)

    def test_multiprocessing_pool_circular_import(self):
        """Run partial/pool_in_threads.py (next to this file) successfully."""
        # Regression test for bpo-41567
        fn = os.path.join(os.path.dirname(__file__),
                          'partial', 'pool_in_threads.py')
        script_helper.assert_python_ok(fn)
262
263
def setUpModule():
    """Module-level fixture: snapshot thread state and shorten the switch
    interval for the duration of the module's tests.

    Both changes are undone through unittest module cleanups.
    """
    saved_threads = threading_helper.threading_setup()
    unittest.addModuleCleanup(threading_helper.threading_cleanup,
                              *saved_threads)
    try:
        previous_interval = sys.getswitchinterval()
        # Register the restore before changing the value.
        unittest.addModuleCleanup(sys.setswitchinterval, previous_interval)
        # A very small interval makes thread switches far more frequent.
        sys.setswitchinterval(1e-5)
    except AttributeError:
        # No switch-interval API available; run with the default settings.
        pass
273
274
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()