python (3.12.0)
1 """
2 Tests for the threading module.
3 """
4
5 import test.support
6 from test.support import threading_helper, requires_subprocess
7 from test.support import verbose, cpython_only, os_helper
8 from test.support.import_helper import import_module
9 from test.support.script_helper import assert_python_ok, assert_python_failure
10
11 import random
12 import sys
13 import _thread
14 import threading
15 import time
16 import unittest
17 import weakref
18 import os
19 import subprocess
20 import signal
21 import textwrap
22 import traceback
23 import warnings
24
25 from unittest import mock
26 from test import lock_tests
27 from test import support
28
29 threading_helper.requires_working_threading(module=True)
30
31 # Between fork() and exec(), only async-safe functions are allowed (issues
32 # #12316 and #11870), and fork() from a worker thread is known to trigger
33 # problems with some operating systems (issue #3863): skip problematic tests
34 # on platforms known to behave badly.
35 platforms_to_skip = ('netbsd5', 'hp-ux11')
36
37
38 def restore_default_excepthook(testcase):
39 testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook)
40 threading.excepthook = threading.__excepthook__
41
42
43 # A trivial mutable counter.
44 class ESC[4;38;5;81mCounter(ESC[4;38;5;149mobject):
45 def __init__(self):
46 self.value = 0
47 def inc(self):
48 self.value += 1
49 def dec(self):
50 self.value -= 1
51 def get(self):
52 return self.value
53
54 class ESC[4;38;5;81mTestThread(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
55 def __init__(self, name, testcase, sema, mutex, nrunning):
56 threading.Thread.__init__(self, name=name)
57 self.testcase = testcase
58 self.sema = sema
59 self.mutex = mutex
60 self.nrunning = nrunning
61
62 def run(self):
63 delay = random.random() / 10000.0
64 if verbose:
65 print('task %s will run for %.1f usec' %
66 (self.name, delay * 1e6))
67
68 with self.sema:
69 with self.mutex:
70 self.nrunning.inc()
71 if verbose:
72 print(self.nrunning.get(), 'tasks are running')
73 self.testcase.assertLessEqual(self.nrunning.get(), 3)
74
75 time.sleep(delay)
76 if verbose:
77 print('task', self.name, 'done')
78
79 with self.mutex:
80 self.nrunning.dec()
81 self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
82 if verbose:
83 print('%s is finished. %d tasks are running' %
84 (self.name, self.nrunning.get()))
85
86
87 class ESC[4;38;5;81mBaseTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
88 def setUp(self):
89 self._threads = threading_helper.threading_setup()
90
91 def tearDown(self):
92 threading_helper.threading_cleanup(*self._threads)
93 test.support.reap_children()
94
95
96 class ESC[4;38;5;81mThreadTests(ESC[4;38;5;149mBaseTestCase):
97
98 @cpython_only
99 def test_name(self):
100 def func(): pass
101
102 thread = threading.Thread(name="myname1")
103 self.assertEqual(thread.name, "myname1")
104
105 # Convert int name to str
106 thread = threading.Thread(name=123)
107 self.assertEqual(thread.name, "123")
108
109 # target name is ignored if name is specified
110 thread = threading.Thread(target=func, name="myname2")
111 self.assertEqual(thread.name, "myname2")
112
113 with mock.patch.object(threading, '_counter', return_value=2):
114 thread = threading.Thread(name="")
115 self.assertEqual(thread.name, "Thread-2")
116
117 with mock.patch.object(threading, '_counter', return_value=3):
118 thread = threading.Thread()
119 self.assertEqual(thread.name, "Thread-3")
120
121 with mock.patch.object(threading, '_counter', return_value=5):
122 thread = threading.Thread(target=func)
123 self.assertEqual(thread.name, "Thread-5 (func)")
124
125 def test_args_argument(self):
126 # bpo-45735: Using list or tuple as *args* in constructor could
127 # achieve the same effect.
128 num_list = [1]
129 num_tuple = (1,)
130
131 str_list = ["str"]
132 str_tuple = ("str",)
133
134 list_in_tuple = ([1],)
135 tuple_in_list = [(1,)]
136
137 test_cases = (
138 (num_list, lambda arg: self.assertEqual(arg, 1)),
139 (num_tuple, lambda arg: self.assertEqual(arg, 1)),
140 (str_list, lambda arg: self.assertEqual(arg, "str")),
141 (str_tuple, lambda arg: self.assertEqual(arg, "str")),
142 (list_in_tuple, lambda arg: self.assertEqual(arg, [1])),
143 (tuple_in_list, lambda arg: self.assertEqual(arg, (1,)))
144 )
145
146 for args, target in test_cases:
147 with self.subTest(target=target, args=args):
148 t = threading.Thread(target=target, args=args)
149 t.start()
150 t.join()
151
152 @cpython_only
153 def test_disallow_instantiation(self):
154 # Ensure that the type disallows instantiation (bpo-43916)
155 lock = threading.Lock()
156 test.support.check_disallow_instantiation(self, type(lock))
157
158 # Create a bunch of threads, let each do some work, wait until all are
159 # done.
160 def test_various_ops(self):
161 # This takes about n/3 seconds to run (about n/3 clumps of tasks,
162 # times about 1 second per clump).
163 NUMTASKS = 10
164
165 # no more than 3 of the 10 can run at once
166 sema = threading.BoundedSemaphore(value=3)
167 mutex = threading.RLock()
168 numrunning = Counter()
169
170 threads = []
171
172 for i in range(NUMTASKS):
173 t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
174 threads.append(t)
175 self.assertIsNone(t.ident)
176 self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$')
177 t.start()
178
179 if hasattr(threading, 'get_native_id'):
180 native_ids = set(t.native_id for t in threads) | {threading.get_native_id()}
181 self.assertNotIn(None, native_ids)
182 self.assertEqual(len(native_ids), NUMTASKS + 1)
183
184 if verbose:
185 print('waiting for all tasks to complete')
186 for t in threads:
187 t.join()
188 self.assertFalse(t.is_alive())
189 self.assertNotEqual(t.ident, 0)
190 self.assertIsNotNone(t.ident)
191 self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$')
192 if verbose:
193 print('all tasks done')
194 self.assertEqual(numrunning.get(), 0)
195
196 def test_ident_of_no_threading_threads(self):
197 # The ident still must work for the main thread and dummy threads.
198 self.assertIsNotNone(threading.current_thread().ident)
199 def f():
200 ident.append(threading.current_thread().ident)
201 done.set()
202 done = threading.Event()
203 ident = []
204 with threading_helper.wait_threads_exit():
205 tid = _thread.start_new_thread(f, ())
206 done.wait()
207 self.assertEqual(ident[0], tid)
208 # Kill the "immortal" _DummyThread
209 del threading._active[ident[0]]
210
211 # run with a small(ish) thread stack size (256 KiB)
212 def test_various_ops_small_stack(self):
213 if verbose:
214 print('with 256 KiB thread stack size...')
215 try:
216 threading.stack_size(262144)
217 except _thread.error:
218 raise unittest.SkipTest(
219 'platform does not support changing thread stack size')
220 self.test_various_ops()
221 threading.stack_size(0)
222
223 # run with a large thread stack size (1 MiB)
224 def test_various_ops_large_stack(self):
225 if verbose:
226 print('with 1 MiB thread stack size...')
227 try:
228 threading.stack_size(0x100000)
229 except _thread.error:
230 raise unittest.SkipTest(
231 'platform does not support changing thread stack size')
232 self.test_various_ops()
233 threading.stack_size(0)
234
235 def test_foreign_thread(self):
236 # Check that a "foreign" thread can use the threading module.
237 def f(mutex):
238 # Calling current_thread() forces an entry for the foreign
239 # thread to get made in the threading._active map.
240 threading.current_thread()
241 mutex.release()
242
243 mutex = threading.Lock()
244 mutex.acquire()
245 with threading_helper.wait_threads_exit():
246 tid = _thread.start_new_thread(f, (mutex,))
247 # Wait for the thread to finish.
248 mutex.acquire()
249 self.assertIn(tid, threading._active)
250 self.assertIsInstance(threading._active[tid], threading._DummyThread)
251 #Issue 29376
252 self.assertTrue(threading._active[tid].is_alive())
253 self.assertRegex(repr(threading._active[tid]), '_DummyThread')
254 del threading._active[tid]
255
256 # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
257 # exposed at the Python level. This test relies on ctypes to get at it.
258 def test_PyThreadState_SetAsyncExc(self):
259 ctypes = import_module("ctypes")
260
261 set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
262 set_async_exc.argtypes = (ctypes.c_ulong, ctypes.py_object)
263
264 class ESC[4;38;5;81mAsyncExc(ESC[4;38;5;149mException):
265 pass
266
267 exception = ctypes.py_object(AsyncExc)
268
269 # First check it works when setting the exception from the same thread.
270 tid = threading.get_ident()
271 self.assertIsInstance(tid, int)
272 self.assertGreater(tid, 0)
273
274 try:
275 result = set_async_exc(tid, exception)
276 # The exception is async, so we might have to keep the VM busy until
277 # it notices.
278 while True:
279 pass
280 except AsyncExc:
281 pass
282 else:
283 # This code is unreachable but it reflects the intent. If we wanted
284 # to be smarter the above loop wouldn't be infinite.
285 self.fail("AsyncExc not raised")
286 try:
287 self.assertEqual(result, 1) # one thread state modified
288 except UnboundLocalError:
289 # The exception was raised too quickly for us to get the result.
290 pass
291
292 # `worker_started` is set by the thread when it's inside a try/except
293 # block waiting to catch the asynchronously set AsyncExc exception.
294 # `worker_saw_exception` is set by the thread upon catching that
295 # exception.
296 worker_started = threading.Event()
297 worker_saw_exception = threading.Event()
298
299 class ESC[4;38;5;81mWorker(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
300 def run(self):
301 self.id = threading.get_ident()
302 self.finished = False
303
304 try:
305 while True:
306 worker_started.set()
307 time.sleep(0.1)
308 except AsyncExc:
309 self.finished = True
310 worker_saw_exception.set()
311
312 t = Worker()
313 t.daemon = True # so if this fails, we don't hang Python at shutdown
314 t.start()
315 if verbose:
316 print(" started worker thread")
317
318 # Try a thread id that doesn't make sense.
319 if verbose:
320 print(" trying nonsensical thread id")
321 result = set_async_exc(-1, exception)
322 self.assertEqual(result, 0) # no thread states modified
323
324 # Now raise an exception in the worker thread.
325 if verbose:
326 print(" waiting for worker thread to get started")
327 ret = worker_started.wait()
328 self.assertTrue(ret)
329 if verbose:
330 print(" verifying worker hasn't exited")
331 self.assertFalse(t.finished)
332 if verbose:
333 print(" attempting to raise asynch exception in worker")
334 result = set_async_exc(t.id, exception)
335 self.assertEqual(result, 1) # one thread state modified
336 if verbose:
337 print(" waiting for worker to say it caught the exception")
338 worker_saw_exception.wait(timeout=support.SHORT_TIMEOUT)
339 self.assertTrue(t.finished)
340 if verbose:
341 print(" all OK -- joining worker")
342 if t.finished:
343 t.join()
344 # else the thread is still running, and we have no way to kill it
345
346 def test_limbo_cleanup(self):
347 # Issue 7481: Failure to start thread should cleanup the limbo map.
348 def fail_new_thread(*args):
349 raise threading.ThreadError()
350 _start_new_thread = threading._start_new_thread
351 threading._start_new_thread = fail_new_thread
352 try:
353 t = threading.Thread(target=lambda: None)
354 self.assertRaises(threading.ThreadError, t.start)
355 self.assertFalse(
356 t in threading._limbo,
357 "Failed to cleanup _limbo map on failure of Thread.start().")
358 finally:
359 threading._start_new_thread = _start_new_thread
360
361 def test_finalize_running_thread(self):
362 # Issue 1402: the PyGILState_Ensure / _Release functions may be called
363 # very late on python exit: on deallocation of a running thread for
364 # example.
365 import_module("ctypes")
366
367 rc, out, err = assert_python_failure("-c", """if 1:
368 import ctypes, sys, time, _thread
369
370 # This lock is used as a simple event variable.
371 ready = _thread.allocate_lock()
372 ready.acquire()
373
374 # Module globals are cleared before __del__ is run
375 # So we save the functions in class dict
376 class C:
377 ensure = ctypes.pythonapi.PyGILState_Ensure
378 release = ctypes.pythonapi.PyGILState_Release
379 def __del__(self):
380 state = self.ensure()
381 self.release(state)
382
383 def waitingThread():
384 x = C()
385 ready.release()
386 time.sleep(100)
387
388 _thread.start_new_thread(waitingThread, ())
389 ready.acquire() # Be sure the other thread is waiting.
390 sys.exit(42)
391 """)
392 self.assertEqual(rc, 42)
393
394 def test_finalize_with_trace(self):
395 # Issue1733757
396 # Avoid a deadlock when sys.settrace steps into threading._shutdown
397 assert_python_ok("-c", """if 1:
398 import sys, threading
399
400 # A deadlock-killer, to prevent the
401 # testsuite to hang forever
402 def killer():
403 import os, time
404 time.sleep(2)
405 print('program blocked; aborting')
406 os._exit(2)
407 t = threading.Thread(target=killer)
408 t.daemon = True
409 t.start()
410
411 # This is the trace function
412 def func(frame, event, arg):
413 threading.current_thread()
414 return func
415
416 sys.settrace(func)
417 """)
418
419 def test_join_nondaemon_on_shutdown(self):
420 # Issue 1722344
421 # Raising SystemExit skipped threading._shutdown
422 rc, out, err = assert_python_ok("-c", """if 1:
423 import threading
424 from time import sleep
425
426 def child():
427 sleep(1)
428 # As a non-daemon thread we SHOULD wake up and nothing
429 # should be torn down yet
430 print("Woke up, sleep function is:", sleep)
431
432 threading.Thread(target=child).start()
433 raise SystemExit
434 """)
435 self.assertEqual(out.strip(),
436 b"Woke up, sleep function is: <built-in function sleep>")
437 self.assertEqual(err, b"")
438
439 def test_enumerate_after_join(self):
440 # Try hard to trigger #1703448: a thread is still returned in
441 # threading.enumerate() after it has been join()ed.
442 enum = threading.enumerate
443 old_interval = sys.getswitchinterval()
444 try:
445 for i in range(1, 100):
446 sys.setswitchinterval(i * 0.0002)
447 t = threading.Thread(target=lambda: None)
448 t.start()
449 t.join()
450 l = enum()
451 self.assertNotIn(t, l,
452 "#1703448 triggered after %d trials: %s" % (i, l))
453 finally:
454 sys.setswitchinterval(old_interval)
455
456 def test_no_refcycle_through_target(self):
457 class ESC[4;38;5;81mRunSelfFunction(ESC[4;38;5;149mobject):
458 def __init__(self, should_raise):
459 # The links in this refcycle from Thread back to self
460 # should be cleaned up when the thread completes.
461 self.should_raise = should_raise
462 self.thread = threading.Thread(target=self._run,
463 args=(self,),
464 kwargs={'yet_another':self})
465 self.thread.start()
466
467 def _run(self, other_ref, yet_another):
468 if self.should_raise:
469 raise SystemExit
470
471 restore_default_excepthook(self)
472
473 cyclic_object = RunSelfFunction(should_raise=False)
474 weak_cyclic_object = weakref.ref(cyclic_object)
475 cyclic_object.thread.join()
476 del cyclic_object
477 self.assertIsNone(weak_cyclic_object(),
478 msg=('%d references still around' %
479 sys.getrefcount(weak_cyclic_object())))
480
481 raising_cyclic_object = RunSelfFunction(should_raise=True)
482 weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
483 raising_cyclic_object.thread.join()
484 del raising_cyclic_object
485 self.assertIsNone(weak_raising_cyclic_object(),
486 msg=('%d references still around' %
487 sys.getrefcount(weak_raising_cyclic_object())))
488
489 def test_old_threading_api(self):
490 # Just a quick sanity check to make sure the old method names are
491 # still present
492 t = threading.Thread()
493 with self.assertWarnsRegex(DeprecationWarning,
494 r'get the daemon attribute'):
495 t.isDaemon()
496 with self.assertWarnsRegex(DeprecationWarning,
497 r'set the daemon attribute'):
498 t.setDaemon(True)
499 with self.assertWarnsRegex(DeprecationWarning,
500 r'get the name attribute'):
501 t.getName()
502 with self.assertWarnsRegex(DeprecationWarning,
503 r'set the name attribute'):
504 t.setName("name")
505
506 e = threading.Event()
507 with self.assertWarnsRegex(DeprecationWarning, 'use is_set()'):
508 e.isSet()
509
510 cond = threading.Condition()
511 cond.acquire()
512 with self.assertWarnsRegex(DeprecationWarning, 'use notify_all()'):
513 cond.notifyAll()
514
515 with self.assertWarnsRegex(DeprecationWarning, 'use active_count()'):
516 threading.activeCount()
517 with self.assertWarnsRegex(DeprecationWarning, 'use current_thread()'):
518 threading.currentThread()
519
520 def test_repr_daemon(self):
521 t = threading.Thread()
522 self.assertNotIn('daemon', repr(t))
523 t.daemon = True
524 self.assertIn('daemon', repr(t))
525
526 def test_daemon_param(self):
527 t = threading.Thread()
528 self.assertFalse(t.daemon)
529 t = threading.Thread(daemon=False)
530 self.assertFalse(t.daemon)
531 t = threading.Thread(daemon=True)
532 self.assertTrue(t.daemon)
533
534 @support.requires_fork()
535 def test_dummy_thread_after_fork(self):
536 # Issue #14308: a dummy thread in the active list doesn't mess up
537 # the after-fork mechanism.
538 code = """if 1:
539 import _thread, threading, os, time, warnings
540
541 def background_thread(evt):
542 # Creates and registers the _DummyThread instance
543 threading.current_thread()
544 evt.set()
545 time.sleep(10)
546
547 evt = threading.Event()
548 _thread.start_new_thread(background_thread, (evt,))
549 evt.wait()
550 assert threading.active_count() == 2, threading.active_count()
551 with warnings.catch_warnings(record=True) as ws:
552 warnings.filterwarnings(
553 "always", category=DeprecationWarning)
554 if os.fork() == 0:
555 assert threading.active_count() == 1, threading.active_count()
556 os._exit(0)
557 else:
558 assert ws[0].category == DeprecationWarning, ws[0]
559 assert 'fork' in str(ws[0].message), ws[0]
560 os.wait()
561 """
562 _, out, err = assert_python_ok("-c", code)
563 self.assertEqual(out, b'')
564 self.assertEqual(err, b'')
565
566 @support.requires_fork()
567 def test_is_alive_after_fork(self):
568 # Try hard to trigger #18418: is_alive() could sometimes be True on
569 # threads that vanished after a fork.
570 old_interval = sys.getswitchinterval()
571 self.addCleanup(sys.setswitchinterval, old_interval)
572
573 # Make the bug more likely to manifest.
574 test.support.setswitchinterval(1e-6)
575
576 for i in range(20):
577 t = threading.Thread(target=lambda: None)
578 t.start()
579 # Ignore the warning about fork with threads.
580 with warnings.catch_warnings(category=DeprecationWarning,
581 action="ignore"):
582 if (pid := os.fork()) == 0:
583 os._exit(11 if t.is_alive() else 10)
584 else:
585 t.join()
586
587 support.wait_process(pid, exitcode=10)
588
589 def test_main_thread(self):
590 main = threading.main_thread()
591 self.assertEqual(main.name, 'MainThread')
592 self.assertEqual(main.ident, threading.current_thread().ident)
593 self.assertEqual(main.ident, threading.get_ident())
594
595 def f():
596 self.assertNotEqual(threading.main_thread().ident,
597 threading.current_thread().ident)
598 th = threading.Thread(target=f)
599 th.start()
600 th.join()
601
602 @support.requires_fork()
603 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
604 def test_main_thread_after_fork(self):
605 code = """if 1:
606 import os, threading
607 from test import support
608
609 pid = os.fork()
610 if pid == 0:
611 main = threading.main_thread()
612 print(main.name)
613 print(main.ident == threading.current_thread().ident)
614 print(main.ident == threading.get_ident())
615 else:
616 support.wait_process(pid, exitcode=0)
617 """
618 _, out, err = assert_python_ok("-c", code)
619 data = out.decode().replace('\r', '')
620 self.assertEqual(err, b"")
621 self.assertEqual(data, "MainThread\nTrue\nTrue\n")
622
623 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
624 @support.requires_fork()
625 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
626 def test_main_thread_after_fork_from_nonmain_thread(self):
627 code = """if 1:
628 import os, threading, sys, warnings
629 from test import support
630
631 def func():
632 with warnings.catch_warnings(record=True) as ws:
633 warnings.filterwarnings(
634 "always", category=DeprecationWarning)
635 pid = os.fork()
636 if pid == 0:
637 main = threading.main_thread()
638 print(main.name)
639 print(main.ident == threading.current_thread().ident)
640 print(main.ident == threading.get_ident())
641 # stdout is fully buffered because not a tty,
642 # we have to flush before exit.
643 sys.stdout.flush()
644 else:
645 assert ws[0].category == DeprecationWarning, ws[0]
646 assert 'fork' in str(ws[0].message), ws[0]
647 support.wait_process(pid, exitcode=0)
648
649 th = threading.Thread(target=func)
650 th.start()
651 th.join()
652 """
653 _, out, err = assert_python_ok("-c", code)
654 data = out.decode().replace('\r', '')
655 self.assertEqual(err.decode('utf-8'), "")
656 self.assertEqual(data, "Thread-1 (func)\nTrue\nTrue\n")
657
658 def test_main_thread_during_shutdown(self):
659 # bpo-31516: current_thread() should still point to the main thread
660 # at shutdown
661 code = """if 1:
662 import gc, threading
663
664 main_thread = threading.current_thread()
665 assert main_thread is threading.main_thread() # sanity check
666
667 class RefCycle:
668 def __init__(self):
669 self.cycle = self
670
671 def __del__(self):
672 print("GC:",
673 threading.current_thread() is main_thread,
674 threading.main_thread() is main_thread,
675 threading.enumerate() == [main_thread])
676
677 RefCycle()
678 gc.collect() # sanity check
679 x = RefCycle()
680 """
681 _, out, err = assert_python_ok("-c", code)
682 data = out.decode()
683 self.assertEqual(err, b"")
684 self.assertEqual(data.splitlines(),
685 ["GC: True True True"] * 2)
686
687 def test_finalization_shutdown(self):
688 # bpo-36402: Py_Finalize() calls threading._shutdown() which must wait
689 # until Python thread states of all non-daemon threads get deleted.
690 #
691 # Test similar to SubinterpThreadingTests.test_threads_join_2(), but
692 # test the finalization of the main interpreter.
693 code = """if 1:
694 import os
695 import threading
696 import time
697 import random
698
699 def random_sleep():
700 seconds = random.random() * 0.010
701 time.sleep(seconds)
702
703 class Sleeper:
704 def __del__(self):
705 random_sleep()
706
707 tls = threading.local()
708
709 def f():
710 # Sleep a bit so that the thread is still running when
711 # Py_Finalize() is called.
712 random_sleep()
713 tls.x = Sleeper()
714 random_sleep()
715
716 threading.Thread(target=f).start()
717 random_sleep()
718 """
719 rc, out, err = assert_python_ok("-c", code)
720 self.assertEqual(err, b"")
721
722 def test_tstate_lock(self):
723 # Test an implementation detail of Thread objects.
724 started = _thread.allocate_lock()
725 finish = _thread.allocate_lock()
726 started.acquire()
727 finish.acquire()
728 def f():
729 started.release()
730 finish.acquire()
731 time.sleep(0.01)
732 # The tstate lock is None until the thread is started
733 t = threading.Thread(target=f)
734 self.assertIs(t._tstate_lock, None)
735 t.start()
736 started.acquire()
737 self.assertTrue(t.is_alive())
738 # The tstate lock can't be acquired when the thread is running
739 # (or suspended).
740 tstate_lock = t._tstate_lock
741 self.assertFalse(tstate_lock.acquire(timeout=0), False)
742 finish.release()
743 # When the thread ends, the state_lock can be successfully
744 # acquired.
745 self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT), False)
746 # But is_alive() is still True: we hold _tstate_lock now, which
747 # prevents is_alive() from knowing the thread's end-of-life C code
748 # is done.
749 self.assertTrue(t.is_alive())
750 # Let is_alive() find out the C code is done.
751 tstate_lock.release()
752 self.assertFalse(t.is_alive())
753 # And verify the thread disposed of _tstate_lock.
754 self.assertIsNone(t._tstate_lock)
755 t.join()
756
757 def test_repr_stopped(self):
758 # Verify that "stopped" shows up in repr(Thread) appropriately.
759 started = _thread.allocate_lock()
760 finish = _thread.allocate_lock()
761 started.acquire()
762 finish.acquire()
763 def f():
764 started.release()
765 finish.acquire()
766 t = threading.Thread(target=f)
767 t.start()
768 started.acquire()
769 self.assertIn("started", repr(t))
770 finish.release()
771 # "stopped" should appear in the repr in a reasonable amount of time.
772 # Implementation detail: as of this writing, that's trivially true
773 # if .join() is called, and almost trivially true if .is_alive() is
774 # called. The detail we're testing here is that "stopped" shows up
775 # "all on its own".
776 LOOKING_FOR = "stopped"
777 for i in range(500):
778 if LOOKING_FOR in repr(t):
779 break
780 time.sleep(0.01)
781 self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds
782 t.join()
783
784 def test_BoundedSemaphore_limit(self):
785 # BoundedSemaphore should raise ValueError if released too often.
786 for limit in range(1, 10):
787 bs = threading.BoundedSemaphore(limit)
788 threads = [threading.Thread(target=bs.acquire)
789 for _ in range(limit)]
790 for t in threads:
791 t.start()
792 for t in threads:
793 t.join()
794 threads = [threading.Thread(target=bs.release)
795 for _ in range(limit)]
796 for t in threads:
797 t.start()
798 for t in threads:
799 t.join()
800 self.assertRaises(ValueError, bs.release)
801
802 @cpython_only
803 def test_frame_tstate_tracing(self):
804 # Issue #14432: Crash when a generator is created in a C thread that is
805 # destroyed while the generator is still used. The issue was that a
806 # generator contains a frame, and the frame kept a reference to the
807 # Python state of the destroyed C thread. The crash occurs when a trace
808 # function is setup.
809
810 def noop_trace(frame, event, arg):
811 # no operation
812 return noop_trace
813
814 def generator():
815 while 1:
816 yield "generator"
817
818 def callback():
819 if callback.gen is None:
820 callback.gen = generator()
821 return next(callback.gen)
822 callback.gen = None
823
824 old_trace = sys.gettrace()
825 sys.settrace(noop_trace)
826 try:
827 # Install a trace function
828 threading.settrace(noop_trace)
829
830 # Create a generator in a C thread which exits after the call
831 import _testcapi
832 _testcapi.call_in_temporary_c_thread(callback)
833
834 # Call the generator in a different Python thread, check that the
835 # generator didn't keep a reference to the destroyed thread state
836 for test in range(3):
837 # The trace function is still called here
838 callback()
839 finally:
840 sys.settrace(old_trace)
841 threading.settrace(old_trace)
842
843 def test_gettrace(self):
844 def noop_trace(frame, event, arg):
845 # no operation
846 return noop_trace
847 old_trace = threading.gettrace()
848 try:
849 threading.settrace(noop_trace)
850 trace_func = threading.gettrace()
851 self.assertEqual(noop_trace,trace_func)
852 finally:
853 threading.settrace(old_trace)
854
855 def test_gettrace_all_threads(self):
856 def fn(*args): pass
857 old_trace = threading.gettrace()
858 first_check = threading.Event()
859 second_check = threading.Event()
860
861 trace_funcs = []
862 def checker():
863 trace_funcs.append(sys.gettrace())
864 first_check.set()
865 second_check.wait()
866 trace_funcs.append(sys.gettrace())
867
868 try:
869 t = threading.Thread(target=checker)
870 t.start()
871 first_check.wait()
872 threading.settrace_all_threads(fn)
873 second_check.set()
874 t.join()
875 self.assertEqual(trace_funcs, [None, fn])
876 self.assertEqual(threading.gettrace(), fn)
877 self.assertEqual(sys.gettrace(), fn)
878 finally:
879 threading.settrace_all_threads(old_trace)
880
881 self.assertEqual(threading.gettrace(), old_trace)
882 self.assertEqual(sys.gettrace(), old_trace)
883
884 def test_getprofile(self):
885 def fn(*args): pass
886 old_profile = threading.getprofile()
887 try:
888 threading.setprofile(fn)
889 self.assertEqual(fn, threading.getprofile())
890 finally:
891 threading.setprofile(old_profile)
892
893 def test_getprofile_all_threads(self):
894 def fn(*args): pass
895 old_profile = threading.getprofile()
896 first_check = threading.Event()
897 second_check = threading.Event()
898
899 profile_funcs = []
900 def checker():
901 profile_funcs.append(sys.getprofile())
902 first_check.set()
903 second_check.wait()
904 profile_funcs.append(sys.getprofile())
905
906 try:
907 t = threading.Thread(target=checker)
908 t.start()
909 first_check.wait()
910 threading.setprofile_all_threads(fn)
911 second_check.set()
912 t.join()
913 self.assertEqual(profile_funcs, [None, fn])
914 self.assertEqual(threading.getprofile(), fn)
915 self.assertEqual(sys.getprofile(), fn)
916 finally:
917 threading.setprofile_all_threads(old_profile)
918
919 self.assertEqual(threading.getprofile(), old_profile)
920 self.assertEqual(sys.getprofile(), old_profile)
921
922 @cpython_only
923 def test_shutdown_locks(self):
924 for daemon in (False, True):
925 with self.subTest(daemon=daemon):
926 event = threading.Event()
927 thread = threading.Thread(target=event.wait, daemon=daemon)
928
929 # Thread.start() must add lock to _shutdown_locks,
930 # but only for non-daemon thread
931 thread.start()
932 tstate_lock = thread._tstate_lock
933 if not daemon:
934 self.assertIn(tstate_lock, threading._shutdown_locks)
935 else:
936 self.assertNotIn(tstate_lock, threading._shutdown_locks)
937
938 # unblock the thread and join it
939 event.set()
940 thread.join()
941
942 # Thread._stop() must remove tstate_lock from _shutdown_locks.
943 # Daemon threads must never add it to _shutdown_locks.
944 self.assertNotIn(tstate_lock, threading._shutdown_locks)
945
946 def test_locals_at_exit(self):
947 # bpo-19466: thread locals must not be deleted before destructors
948 # are called
949 rc, out, err = assert_python_ok("-c", """if 1:
950 import threading
951
952 class Atexit:
953 def __del__(self):
954 print("thread_dict.atexit = %r" % thread_dict.atexit)
955
956 thread_dict = threading.local()
957 thread_dict.atexit = "value"
958
959 atexit = Atexit()
960 """)
961 self.assertEqual(out.rstrip(), b"thread_dict.atexit = 'value'")
962
963 def test_boolean_target(self):
964 # bpo-41149: A thread that had a boolean value of False would not
965 # run, regardless of whether it was callable. The correct behaviour
966 # is for a thread to do nothing if its target is None, and to call
967 # the target otherwise.
968 class ESC[4;38;5;81mBooleanTarget(ESC[4;38;5;149mobject):
969 def __init__(self):
970 self.ran = False
971 def __bool__(self):
972 return False
973 def __call__(self):
974 self.ran = True
975
976 target = BooleanTarget()
977 thread = threading.Thread(target=target)
978 thread.start()
979 thread.join()
980 self.assertTrue(target.ran)
981
982 def test_leak_without_join(self):
983 # bpo-37788: Test that a thread which is not joined explicitly
984 # does not leak. Test written for reference leak checks.
985 def noop(): pass
986 with threading_helper.wait_threads_exit():
987 threading.Thread(target=noop).start()
988 # Thread.join() is not called
989
990 def test_import_from_another_thread(self):
991 # bpo-1596321: If the threading module is first import from a thread
992 # different than the main thread, threading._shutdown() must handle
993 # this case without logging an error at Python exit.
994 code = textwrap.dedent('''
995 import _thread
996 import sys
997
998 event = _thread.allocate_lock()
999 event.acquire()
1000
1001 def import_threading():
1002 import threading
1003 event.release()
1004
1005 if 'threading' in sys.modules:
1006 raise Exception('threading is already imported')
1007
1008 _thread.start_new_thread(import_threading, ())
1009
1010 # wait until the threading module is imported
1011 event.acquire()
1012 event.release()
1013
1014 if 'threading' not in sys.modules:
1015 raise Exception('threading is not imported')
1016
1017 # don't wait until the thread completes
1018 ''')
1019 rc, out, err = assert_python_ok("-c", code)
1020 self.assertEqual(out, b'')
1021 self.assertEqual(err, b'')
1022
1023 def test_start_new_thread_at_exit(self):
1024 code = """if 1:
1025 import atexit
1026 import _thread
1027
1028 def f():
1029 print("shouldn't be printed")
1030
1031 def exit_handler():
1032 _thread.start_new_thread(f, ())
1033
1034 atexit.register(exit_handler)
1035 """
1036 _, out, err = assert_python_ok("-c", code)
1037 self.assertEqual(out, b'')
1038 self.assertIn(b"can't create new thread at interpreter shutdown", err)
1039
1040 class ESC[4;38;5;81mThreadJoinOnShutdown(ESC[4;38;5;149mBaseTestCase):
1041
1042 def _run_and_join(self, script):
1043 script = """if 1:
1044 import sys, os, time, threading
1045
1046 # a thread, which waits for the main program to terminate
1047 def joiningfunc(mainthread):
1048 mainthread.join()
1049 print('end of thread')
1050 # stdout is fully buffered because not a tty, we have to flush
1051 # before exit.
1052 sys.stdout.flush()
1053 \n""" + script
1054
1055 rc, out, err = assert_python_ok("-c", script)
1056 data = out.decode().replace('\r', '')
1057 self.assertEqual(data, "end of main\nend of thread\n")
1058
1059 def test_1_join_on_shutdown(self):
1060 # The usual case: on exit, wait for a non-daemon thread
1061 script = """if 1:
1062 import os
1063 t = threading.Thread(target=joiningfunc,
1064 args=(threading.current_thread(),))
1065 t.start()
1066 time.sleep(0.1)
1067 print('end of main')
1068 """
1069 self._run_and_join(script)
1070
1071 @support.requires_fork()
1072 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
1073 def test_2_join_in_forked_process(self):
1074 # Like the test above, but from a forked interpreter
1075 script = """if 1:
1076 from test import support
1077
1078 childpid = os.fork()
1079 if childpid != 0:
1080 # parent process
1081 support.wait_process(childpid, exitcode=0)
1082 sys.exit(0)
1083
1084 # child process
1085 t = threading.Thread(target=joiningfunc,
1086 args=(threading.current_thread(),))
1087 t.start()
1088 print('end of main')
1089 """
1090 self._run_and_join(script)
1091
1092 @support.requires_fork()
1093 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
1094 def test_3_join_in_forked_from_thread(self):
1095 # Like the test above, but fork() was called from a worker thread
1096 # In the forked process, the main Thread object must be marked as stopped.
1097
1098 script = """if 1:
1099 from test import support
1100
1101 main_thread = threading.current_thread()
1102 def worker():
1103 childpid = os.fork()
1104 if childpid != 0:
1105 # parent process
1106 support.wait_process(childpid, exitcode=0)
1107 sys.exit(0)
1108
1109 # child process
1110 t = threading.Thread(target=joiningfunc,
1111 args=(main_thread,))
1112 print('end of main')
1113 t.start()
1114 t.join() # Should not block: main_thread is already stopped
1115
1116 w = threading.Thread(target=worker)
1117 w.start()
1118 """
1119 self._run_and_join(script)
1120
1121 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
1122 def test_4_daemon_threads(self):
1123 # Check that a daemon thread cannot crash the interpreter on shutdown
1124 # by manipulating internal structures that are being disposed of in
1125 # the main thread.
1126 script = """if True:
1127 import os
1128 import random
1129 import sys
1130 import time
1131 import threading
1132
1133 thread_has_run = set()
1134
1135 def random_io():
1136 '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
1137 import test.test_threading as mod
1138 while True:
1139 with open(mod.__file__, 'rb') as in_f:
1140 stuff = in_f.read(200)
1141 with open(os.devnull, 'wb') as null_f:
1142 null_f.write(stuff)
1143 time.sleep(random.random() / 1995)
1144 thread_has_run.add(threading.current_thread())
1145
1146 def main():
1147 count = 0
1148 for _ in range(40):
1149 new_thread = threading.Thread(target=random_io)
1150 new_thread.daemon = True
1151 new_thread.start()
1152 count += 1
1153 while len(thread_has_run) < count:
1154 time.sleep(0.001)
1155 # Trigger process shutdown
1156 sys.exit(0)
1157
1158 main()
1159 """
1160 rc, out, err = assert_python_ok('-c', script)
1161 self.assertFalse(err)
1162
1163 @support.requires_fork()
1164 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
1165 def test_reinit_tls_after_fork(self):
1166 # Issue #13817: fork() would deadlock in a multithreaded program with
1167 # the ad-hoc TLS implementation.
1168
1169 def do_fork_and_wait():
1170 # just fork a child process and wait it
1171 pid = os.fork()
1172 if pid > 0:
1173 support.wait_process(pid, exitcode=50)
1174 else:
1175 os._exit(50)
1176
1177 # Ignore the warning about fork with threads.
1178 with warnings.catch_warnings(category=DeprecationWarning,
1179 action="ignore"):
1180 # start a bunch of threads that will fork() child processes
1181 threads = []
1182 for i in range(16):
1183 t = threading.Thread(target=do_fork_and_wait)
1184 threads.append(t)
1185 t.start()
1186
1187 for t in threads:
1188 t.join()
1189
1190 @support.requires_fork()
1191 def test_clear_threads_states_after_fork(self):
1192 # Issue #17094: check that threads states are cleared after fork()
1193
1194 # start a bunch of threads
1195 threads = []
1196 for i in range(16):
1197 t = threading.Thread(target=lambda : time.sleep(0.3))
1198 threads.append(t)
1199 t.start()
1200
1201 try:
1202 # Ignore the warning about fork with threads.
1203 with warnings.catch_warnings(category=DeprecationWarning,
1204 action="ignore"):
1205 pid = os.fork()
1206 if pid == 0:
1207 # check that threads states have been cleared
1208 if len(sys._current_frames()) == 1:
1209 os._exit(51)
1210 else:
1211 os._exit(52)
1212 else:
1213 support.wait_process(pid, exitcode=51)
1214 finally:
1215 for t in threads:
1216 t.join()
1217
1218
1219 class ESC[4;38;5;81mSubinterpThreadingTests(ESC[4;38;5;149mBaseTestCase):
1220 def pipe(self):
1221 r, w = os.pipe()
1222 self.addCleanup(os.close, r)
1223 self.addCleanup(os.close, w)
1224 if hasattr(os, 'set_blocking'):
1225 os.set_blocking(r, False)
1226 return (r, w)
1227
1228 def test_threads_join(self):
1229 # Non-daemon threads should be joined at subinterpreter shutdown
1230 # (issue #18808)
1231 r, w = self.pipe()
1232 code = textwrap.dedent(r"""
1233 import os
1234 import random
1235 import threading
1236 import time
1237
1238 def random_sleep():
1239 seconds = random.random() * 0.010
1240 time.sleep(seconds)
1241
1242 def f():
1243 # Sleep a bit so that the thread is still running when
1244 # Py_EndInterpreter is called.
1245 random_sleep()
1246 os.write(%d, b"x")
1247
1248 threading.Thread(target=f).start()
1249 random_sleep()
1250 """ % (w,))
1251 ret = test.support.run_in_subinterp(code)
1252 self.assertEqual(ret, 0)
1253 # The thread was joined properly.
1254 self.assertEqual(os.read(r, 1), b"x")
1255
1256 def test_threads_join_2(self):
1257 # Same as above, but a delay gets introduced after the thread's
1258 # Python code returned but before the thread state is deleted.
1259 # To achieve this, we register a thread-local object which sleeps
1260 # a bit when deallocated.
1261 r, w = self.pipe()
1262 code = textwrap.dedent(r"""
1263 import os
1264 import random
1265 import threading
1266 import time
1267
1268 def random_sleep():
1269 seconds = random.random() * 0.010
1270 time.sleep(seconds)
1271
1272 class Sleeper:
1273 def __del__(self):
1274 random_sleep()
1275
1276 tls = threading.local()
1277
1278 def f():
1279 # Sleep a bit so that the thread is still running when
1280 # Py_EndInterpreter is called.
1281 random_sleep()
1282 tls.x = Sleeper()
1283 os.write(%d, b"x")
1284
1285 threading.Thread(target=f).start()
1286 random_sleep()
1287 """ % (w,))
1288 ret = test.support.run_in_subinterp(code)
1289 self.assertEqual(ret, 0)
1290 # The thread was joined properly.
1291 self.assertEqual(os.read(r, 1), b"x")
1292
1293 @cpython_only
1294 def test_daemon_threads_fatal_error(self):
1295 subinterp_code = f"""if 1:
1296 import os
1297 import threading
1298 import time
1299
1300 def f():
1301 # Make sure the daemon thread is still running when
1302 # Py_EndInterpreter is called.
1303 time.sleep({test.support.SHORT_TIMEOUT})
1304 threading.Thread(target=f, daemon=True).start()
1305 """
1306 script = r"""if 1:
1307 import _testcapi
1308
1309 _testcapi.run_in_subinterp(%r)
1310 """ % (subinterp_code,)
1311 with test.support.SuppressCrashReport():
1312 rc, out, err = assert_python_failure("-c", script)
1313 self.assertIn("Fatal Python error: Py_EndInterpreter: "
1314 "not the last thread", err.decode())
1315
1316 def _check_allowed(self, before_start='', *,
1317 allowed=True,
1318 daemon_allowed=True,
1319 daemon=False,
1320 ):
1321 subinterp_code = textwrap.dedent(f"""
1322 import test.support
1323 import threading
1324 def func():
1325 print('this should not have run!')
1326 t = threading.Thread(target=func, daemon={daemon})
1327 {before_start}
1328 t.start()
1329 """)
1330 script = textwrap.dedent(f"""
1331 import test.support
1332 test.support.run_in_subinterp_with_config(
1333 {subinterp_code!r},
1334 use_main_obmalloc=True,
1335 allow_fork=True,
1336 allow_exec=True,
1337 allow_threads={allowed},
1338 allow_daemon_threads={daemon_allowed},
1339 check_multi_interp_extensions=False,
1340 own_gil=False,
1341 )
1342 """)
1343 with test.support.SuppressCrashReport():
1344 _, _, err = assert_python_ok("-c", script)
1345 return err.decode()
1346
1347 @cpython_only
1348 def test_threads_not_allowed(self):
1349 err = self._check_allowed(
1350 allowed=False,
1351 daemon_allowed=False,
1352 daemon=False,
1353 )
1354 self.assertIn('RuntimeError', err)
1355
1356 @cpython_only
1357 def test_daemon_threads_not_allowed(self):
1358 with self.subTest('via Thread()'):
1359 err = self._check_allowed(
1360 allowed=True,
1361 daemon_allowed=False,
1362 daemon=True,
1363 )
1364 self.assertIn('RuntimeError', err)
1365
1366 with self.subTest('via Thread.daemon setter'):
1367 err = self._check_allowed(
1368 't.daemon = True',
1369 allowed=True,
1370 daemon_allowed=False,
1371 daemon=False,
1372 )
1373 self.assertIn('RuntimeError', err)
1374
1375
1376 class ESC[4;38;5;81mThreadingExceptionTests(ESC[4;38;5;149mBaseTestCase):
1377 # A RuntimeError should be raised if Thread.start() is called
1378 # multiple times.
1379 def test_start_thread_again(self):
1380 thread = threading.Thread()
1381 thread.start()
1382 self.assertRaises(RuntimeError, thread.start)
1383 thread.join()
1384
1385 def test_joining_current_thread(self):
1386 current_thread = threading.current_thread()
1387 self.assertRaises(RuntimeError, current_thread.join);
1388
1389 def test_joining_inactive_thread(self):
1390 thread = threading.Thread()
1391 self.assertRaises(RuntimeError, thread.join)
1392
1393 def test_daemonize_active_thread(self):
1394 thread = threading.Thread()
1395 thread.start()
1396 self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
1397 thread.join()
1398
1399 def test_releasing_unacquired_lock(self):
1400 lock = threading.Lock()
1401 self.assertRaises(RuntimeError, lock.release)
1402
1403 @requires_subprocess()
1404 def test_recursion_limit(self):
1405 # Issue 9670
1406 # test that excessive recursion within a non-main thread causes
1407 # an exception rather than crashing the interpreter on platforms
1408 # like Mac OS X or FreeBSD which have small default stack sizes
1409 # for threads
1410 script = """if True:
1411 import threading
1412
1413 def recurse():
1414 return recurse()
1415
1416 def outer():
1417 try:
1418 recurse()
1419 except RecursionError:
1420 pass
1421
1422 w = threading.Thread(target=outer)
1423 w.start()
1424 w.join()
1425 print('end of main thread')
1426 """
1427 expected_output = "end of main thread\n"
1428 p = subprocess.Popen([sys.executable, "-c", script],
1429 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1430 stdout, stderr = p.communicate()
1431 data = stdout.decode().replace('\r', '')
1432 self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode())
1433 self.assertEqual(data, expected_output)
1434
1435 def test_print_exception(self):
1436 script = r"""if True:
1437 import threading
1438 import time
1439
1440 running = False
1441 def run():
1442 global running
1443 running = True
1444 while running:
1445 time.sleep(0.01)
1446 1/0
1447 t = threading.Thread(target=run)
1448 t.start()
1449 while not running:
1450 time.sleep(0.01)
1451 running = False
1452 t.join()
1453 """
1454 rc, out, err = assert_python_ok("-c", script)
1455 self.assertEqual(out, b'')
1456 err = err.decode()
1457 self.assertIn("Exception in thread", err)
1458 self.assertIn("Traceback (most recent call last):", err)
1459 self.assertIn("ZeroDivisionError", err)
1460 self.assertNotIn("Unhandled exception", err)
1461
1462 def test_print_exception_stderr_is_none_1(self):
1463 script = r"""if True:
1464 import sys
1465 import threading
1466 import time
1467
1468 running = False
1469 def run():
1470 global running
1471 running = True
1472 while running:
1473 time.sleep(0.01)
1474 1/0
1475 t = threading.Thread(target=run)
1476 t.start()
1477 while not running:
1478 time.sleep(0.01)
1479 sys.stderr = None
1480 running = False
1481 t.join()
1482 """
1483 rc, out, err = assert_python_ok("-c", script)
1484 self.assertEqual(out, b'')
1485 err = err.decode()
1486 self.assertIn("Exception in thread", err)
1487 self.assertIn("Traceback (most recent call last):", err)
1488 self.assertIn("ZeroDivisionError", err)
1489 self.assertNotIn("Unhandled exception", err)
1490
1491 def test_print_exception_stderr_is_none_2(self):
1492 script = r"""if True:
1493 import sys
1494 import threading
1495 import time
1496
1497 running = False
1498 def run():
1499 global running
1500 running = True
1501 while running:
1502 time.sleep(0.01)
1503 1/0
1504 sys.stderr = None
1505 t = threading.Thread(target=run)
1506 t.start()
1507 while not running:
1508 time.sleep(0.01)
1509 running = False
1510 t.join()
1511 """
1512 rc, out, err = assert_python_ok("-c", script)
1513 self.assertEqual(out, b'')
1514 self.assertNotIn("Unhandled exception", err.decode())
1515
1516 def test_print_exception_gh_102056(self):
1517 # This used to crash. See gh-102056.
1518 script = r"""if True:
1519 import time
1520 import threading
1521 import _thread
1522
1523 def f():
1524 try:
1525 f()
1526 except RecursionError:
1527 f()
1528
1529 def g():
1530 try:
1531 raise ValueError()
1532 except* ValueError:
1533 f()
1534
1535 def h():
1536 time.sleep(1)
1537 _thread.interrupt_main()
1538
1539 t = threading.Thread(target=h)
1540 t.start()
1541 g()
1542 t.join()
1543 """
1544
1545 assert_python_failure("-c", script)
1546
1547 def test_bare_raise_in_brand_new_thread(self):
1548 def bare_raise():
1549 raise
1550
1551 class ESC[4;38;5;81mIssue27558(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
1552 exc = None
1553
1554 def run(self):
1555 try:
1556 bare_raise()
1557 except Exception as exc:
1558 self.exc = exc
1559
1560 thread = Issue27558()
1561 thread.start()
1562 thread.join()
1563 self.assertIsNotNone(thread.exc)
1564 self.assertIsInstance(thread.exc, RuntimeError)
1565 # explicitly break the reference cycle to not leak a dangling thread
1566 thread.exc = None
1567
1568 def test_multithread_modify_file_noerror(self):
1569 # See issue25872
1570 def modify_file():
1571 with open(os_helper.TESTFN, 'w', encoding='utf-8') as fp:
1572 fp.write(' ')
1573 traceback.format_stack()
1574
1575 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1576 threads = [
1577 threading.Thread(target=modify_file)
1578 for i in range(100)
1579 ]
1580 for t in threads:
1581 t.start()
1582 t.join()
1583
1584
1585 class ESC[4;38;5;81mThreadRunFail(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
1586 def run(self):
1587 raise ValueError("run failed")
1588
1589
1590 class ESC[4;38;5;81mExceptHookTests(ESC[4;38;5;149mBaseTestCase):
1591 def setUp(self):
1592 restore_default_excepthook(self)
1593 super().setUp()
1594
1595 def test_excepthook(self):
1596 with support.captured_output("stderr") as stderr:
1597 thread = ThreadRunFail(name="excepthook thread")
1598 thread.start()
1599 thread.join()
1600
1601 stderr = stderr.getvalue().strip()
1602 self.assertIn(f'Exception in thread {thread.name}:\n', stderr)
1603 self.assertIn('Traceback (most recent call last):\n', stderr)
1604 self.assertIn(' raise ValueError("run failed")', stderr)
1605 self.assertIn('ValueError: run failed', stderr)
1606
1607 @support.cpython_only
1608 def test_excepthook_thread_None(self):
1609 # threading.excepthook called with thread=None: log the thread
1610 # identifier in this case.
1611 with support.captured_output("stderr") as stderr:
1612 try:
1613 raise ValueError("bug")
1614 except Exception as exc:
1615 args = threading.ExceptHookArgs([*sys.exc_info(), None])
1616 try:
1617 threading.excepthook(args)
1618 finally:
1619 # Explicitly break a reference cycle
1620 args = None
1621
1622 stderr = stderr.getvalue().strip()
1623 self.assertIn(f'Exception in thread {threading.get_ident()}:\n', stderr)
1624 self.assertIn('Traceback (most recent call last):\n', stderr)
1625 self.assertIn(' raise ValueError("bug")', stderr)
1626 self.assertIn('ValueError: bug', stderr)
1627
1628 def test_system_exit(self):
1629 class ESC[4;38;5;81mThreadExit(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
1630 def run(self):
1631 sys.exit(1)
1632
1633 # threading.excepthook() silently ignores SystemExit
1634 with support.captured_output("stderr") as stderr:
1635 thread = ThreadExit()
1636 thread.start()
1637 thread.join()
1638
1639 self.assertEqual(stderr.getvalue(), '')
1640
1641 def test_custom_excepthook(self):
1642 args = None
1643
1644 def hook(hook_args):
1645 nonlocal args
1646 args = hook_args
1647
1648 try:
1649 with support.swap_attr(threading, 'excepthook', hook):
1650 thread = ThreadRunFail()
1651 thread.start()
1652 thread.join()
1653
1654 self.assertEqual(args.exc_type, ValueError)
1655 self.assertEqual(str(args.exc_value), 'run failed')
1656 self.assertEqual(args.exc_traceback, args.exc_value.__traceback__)
1657 self.assertIs(args.thread, thread)
1658 finally:
1659 # Break reference cycle
1660 args = None
1661
1662 def test_custom_excepthook_fail(self):
1663 def threading_hook(args):
1664 raise ValueError("threading_hook failed")
1665
1666 err_str = None
1667
1668 def sys_hook(exc_type, exc_value, exc_traceback):
1669 nonlocal err_str
1670 err_str = str(exc_value)
1671
1672 with support.swap_attr(threading, 'excepthook', threading_hook), \
1673 support.swap_attr(sys, 'excepthook', sys_hook), \
1674 support.captured_output('stderr') as stderr:
1675 thread = ThreadRunFail()
1676 thread.start()
1677 thread.join()
1678
1679 self.assertEqual(stderr.getvalue(),
1680 'Exception in threading.excepthook:\n')
1681 self.assertEqual(err_str, 'threading_hook failed')
1682
1683 def test_original_excepthook(self):
1684 def run_thread():
1685 with support.captured_output("stderr") as output:
1686 thread = ThreadRunFail(name="excepthook thread")
1687 thread.start()
1688 thread.join()
1689 return output.getvalue()
1690
1691 def threading_hook(args):
1692 print("Running a thread failed", file=sys.stderr)
1693
1694 default_output = run_thread()
1695 with support.swap_attr(threading, 'excepthook', threading_hook):
1696 custom_hook_output = run_thread()
1697 threading.excepthook = threading.__excepthook__
1698 recovered_output = run_thread()
1699
1700 self.assertEqual(default_output, recovered_output)
1701 self.assertNotEqual(default_output, custom_hook_output)
1702 self.assertEqual(custom_hook_output, "Running a thread failed\n")
1703
1704
1705 class ESC[4;38;5;81mTimerTests(ESC[4;38;5;149mBaseTestCase):
1706
1707 def setUp(self):
1708 BaseTestCase.setUp(self)
1709 self.callback_args = []
1710 self.callback_event = threading.Event()
1711
1712 def test_init_immutable_default_args(self):
1713 # Issue 17435: constructor defaults were mutable objects, they could be
1714 # mutated via the object attributes and affect other Timer objects.
1715 timer1 = threading.Timer(0.01, self._callback_spy)
1716 timer1.start()
1717 self.callback_event.wait()
1718 timer1.args.append("blah")
1719 timer1.kwargs["foo"] = "bar"
1720 self.callback_event.clear()
1721 timer2 = threading.Timer(0.01, self._callback_spy)
1722 timer2.start()
1723 self.callback_event.wait()
1724 self.assertEqual(len(self.callback_args), 2)
1725 self.assertEqual(self.callback_args, [((), {}), ((), {})])
1726 timer1.join()
1727 timer2.join()
1728
1729 def _callback_spy(self, *args, **kwargs):
1730 self.callback_args.append((args[:], kwargs.copy()))
1731 self.callback_event.set()
1732
1733 class ESC[4;38;5;81mLockTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mLockTests):
1734 locktype = staticmethod(threading.Lock)
1735
1736 class ESC[4;38;5;81mPyRLockTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mRLockTests):
1737 locktype = staticmethod(threading._PyRLock)
1738
1739 @unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
1740 class ESC[4;38;5;81mCRLockTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mRLockTests):
1741 locktype = staticmethod(threading._CRLock)
1742
1743 class ESC[4;38;5;81mEventTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mEventTests):
1744 eventtype = staticmethod(threading.Event)
1745
1746 class ESC[4;38;5;81mConditionAsRLockTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mRLockTests):
1747 # Condition uses an RLock by default and exports its API.
1748 locktype = staticmethod(threading.Condition)
1749
1750 class ESC[4;38;5;81mConditionTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mConditionTests):
1751 condtype = staticmethod(threading.Condition)
1752
1753 class ESC[4;38;5;81mSemaphoreTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mSemaphoreTests):
1754 semtype = staticmethod(threading.Semaphore)
1755
1756 class ESC[4;38;5;81mBoundedSemaphoreTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mBoundedSemaphoreTests):
1757 semtype = staticmethod(threading.BoundedSemaphore)
1758
1759 class ESC[4;38;5;81mBarrierTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mBarrierTests):
1760 barriertype = staticmethod(threading.Barrier)
1761
1762
1763 class ESC[4;38;5;81mMiscTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1764 def test__all__(self):
1765 restore_default_excepthook(self)
1766
1767 extra = {"ThreadError"}
1768 not_exported = {'currentThread', 'activeCount'}
1769 support.check__all__(self, threading, ('threading', '_thread'),
1770 extra=extra, not_exported=not_exported)
1771
1772
1773 class ESC[4;38;5;81mInterruptMainTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1774 def check_interrupt_main_with_signal_handler(self, signum):
1775 def handler(signum, frame):
1776 1/0
1777
1778 old_handler = signal.signal(signum, handler)
1779 self.addCleanup(signal.signal, signum, old_handler)
1780
1781 with self.assertRaises(ZeroDivisionError):
1782 _thread.interrupt_main()
1783
1784 def check_interrupt_main_noerror(self, signum):
1785 handler = signal.getsignal(signum)
1786 try:
1787 # No exception should arise.
1788 signal.signal(signum, signal.SIG_IGN)
1789 _thread.interrupt_main(signum)
1790
1791 signal.signal(signum, signal.SIG_DFL)
1792 _thread.interrupt_main(signum)
1793 finally:
1794 # Restore original handler
1795 signal.signal(signum, handler)
1796
1797 def test_interrupt_main_subthread(self):
1798 # Calling start_new_thread with a function that executes interrupt_main
1799 # should raise KeyboardInterrupt upon completion.
1800 def call_interrupt():
1801 _thread.interrupt_main()
1802 t = threading.Thread(target=call_interrupt)
1803 with self.assertRaises(KeyboardInterrupt):
1804 t.start()
1805 t.join()
1806 t.join()
1807
1808 def test_interrupt_main_mainthread(self):
1809 # Make sure that if interrupt_main is called in main thread that
1810 # KeyboardInterrupt is raised instantly.
1811 with self.assertRaises(KeyboardInterrupt):
1812 _thread.interrupt_main()
1813
1814 def test_interrupt_main_with_signal_handler(self):
1815 self.check_interrupt_main_with_signal_handler(signal.SIGINT)
1816 self.check_interrupt_main_with_signal_handler(signal.SIGTERM)
1817
1818 def test_interrupt_main_noerror(self):
1819 self.check_interrupt_main_noerror(signal.SIGINT)
1820 self.check_interrupt_main_noerror(signal.SIGTERM)
1821
1822 def test_interrupt_main_invalid_signal(self):
1823 self.assertRaises(ValueError, _thread.interrupt_main, -1)
1824 self.assertRaises(ValueError, _thread.interrupt_main, signal.NSIG)
1825 self.assertRaises(ValueError, _thread.interrupt_main, 1000000)
1826
1827 @threading_helper.reap_threads
1828 def test_can_interrupt_tight_loops(self):
1829 cont = [True]
1830 started = [False]
1831 interrupted = [False]
1832
1833 def worker(started, cont, interrupted):
1834 iterations = 100_000_000
1835 started[0] = True
1836 while cont[0]:
1837 if iterations:
1838 iterations -= 1
1839 else:
1840 return
1841 pass
1842 interrupted[0] = True
1843
1844 t = threading.Thread(target=worker,args=(started, cont, interrupted))
1845 t.start()
1846 while not started[0]:
1847 pass
1848 cont[0] = False
1849 t.join()
1850 self.assertTrue(interrupted[0])
1851
1852
1853 class ESC[4;38;5;81mAtexitTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1854
1855 def test_atexit_output(self):
1856 rc, out, err = assert_python_ok("-c", """if True:
1857 import threading
1858
1859 def run_last():
1860 print('parrot')
1861
1862 threading._register_atexit(run_last)
1863 """)
1864
1865 self.assertFalse(err)
1866 self.assertEqual(out.strip(), b'parrot')
1867
1868 def test_atexit_called_once(self):
1869 rc, out, err = assert_python_ok("-c", """if True:
1870 import threading
1871 from unittest.mock import Mock
1872
1873 mock = Mock()
1874 threading._register_atexit(mock)
1875 mock.assert_not_called()
1876 # force early shutdown to ensure it was called once
1877 threading._shutdown()
1878 mock.assert_called_once()
1879 """)
1880
1881 self.assertFalse(err)
1882
1883 def test_atexit_after_shutdown(self):
1884 # The only way to do this is by registering an atexit within
1885 # an atexit, which is intended to raise an exception.
1886 rc, out, err = assert_python_ok("-c", """if True:
1887 import threading
1888
1889 def func():
1890 pass
1891
1892 def run_last():
1893 threading._register_atexit(func)
1894
1895 threading._register_atexit(run_last)
1896 """)
1897
1898 self.assertTrue(err)
1899 self.assertIn("RuntimeError: can't register atexit after shutdown",
1900 err.decode())
1901
1902
1903 if __name__ == "__main__":
1904 unittest.main()