1 """
2 Tests for the threading module.
3 """
4
5 import test.support
6 from test.support import threading_helper, requires_subprocess
7 from test.support import verbose, cpython_only, os_helper
8 from test.support.import_helper import import_module
9 from test.support.script_helper import assert_python_ok, assert_python_failure
10
11 import random
12 import sys
13 import _thread
14 import threading
15 import time
16 import unittest
17 import weakref
18 import os
19 import subprocess
20 import signal
21 import textwrap
22 import traceback
23
24 from unittest import mock
25 from test import lock_tests
26 from test import support
27
# Module-level requirement check on the platform's threading support.  The
# helper is called with module=True (presumably so that the whole module is
# skipped where threads do not work -- confirm in test.support).
threading_helper.requires_working_threading(module=True)

# Between fork() and exec(), only async-safe functions are allowed (issues
# #12316 and #11870), and fork() from a worker thread is known to trigger
# problems with some operating systems (issue #3863): skip problematic tests
# on platforms known to behave badly.
# The entries are compared against sys.platform by the skip logic below.
platforms_to_skip = ('netbsd5', 'hp-ux11')

# Is Python built with Py_DEBUG macro defined?
# The presence of sys.gettotalrefcount is used as the probe.
Py_DEBUG = hasattr(sys, 'gettotalrefcount')
38
39
def skip_unless_reliable_fork(test):
    """Decorator: skip *test* where mixing threads and fork() is unreliable.

    The checks are made in order (fork support, known-bad platform, ASAN
    fork bug); the first one that applies decides the skip reason.  When
    none applies, *test* is returned unchanged.
    """
    reason = None
    if not support.has_fork_support:
        reason = "requires working os.fork()"
    elif sys.platform in platforms_to_skip:
        reason = "due to known OS bug related to thread+fork"
    elif support.HAVE_ASAN_FORK_BUG:
        reason = "libasan has a pthread_create() dead lock related to thread+fork"

    if reason is None:
        return test
    return unittest.skip(reason)(test)
48
49
50 def restore_default_excepthook(testcase):
51 testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook)
52 threading.excepthook = threading.__excepthook__
53
54
# A trivial mutable counter.
class Counter:
    """Mutable integer cell with explicit inc/dec/get operations."""

    def __init__(self):
        self.value = 0

    def inc(self):
        self.value = self.value + 1

    def dec(self):
        self.value = self.value - 1

    def get(self):
        return self.value
65
class TestThread(threading.Thread):
    """Worker thread that checks a shared "number running" counter.

    Each worker enters the shared semaphore, increments the shared counter
    under the mutex, sleeps for a short random time and then decrements the
    counter again, asserting on the counter's bounds through the owning
    test case.
    """

    def __init__(self, name, testcase, sema, mutex, nrunning):
        super().__init__(name=name)
        self.testcase = testcase
        self.sema = sema
        self.mutex = mutex
        self.nrunning = nrunning

    def run(self):
        testcase, running = self.testcase, self.nrunning
        pause = random.random() / 10000.0
        if verbose:
            print('task %s will run for %.1f usec' %
                  (self.name, pause * 1e6))

        with self.sema:
            # Entering: count ourselves in and check the upper bound.
            with self.mutex:
                running.inc()
                if verbose:
                    print(running.get(), 'tasks are running')
                testcase.assertLessEqual(running.get(), 3)

            time.sleep(pause)
            if verbose:
                print('task', self.name, 'done')

            # Leaving: count ourselves out and check the lower bound.
            with self.mutex:
                running.dec()
                testcase.assertGreaterEqual(running.get(), 0)
                if verbose:
                    print('%s is finished. %d tasks are running' %
                          (self.name, running.get()))
97
98
class BaseTestCase(unittest.TestCase):
    # Common fixture: whatever threading_setup() returns before a test is
    # kept and handed back to threading_cleanup() afterwards; child
    # processes are reaped as the last step.

    def setUp(self):
        self._threads = threading_helper.threading_setup()

    def tearDown(self):
        saved_state = self._threads
        threading_helper.threading_cleanup(*saved_state)
        support.reap_children()
106
107
108 class ESC[4;38;5;81mThreadTests(ESC[4;38;5;149mBaseTestCase):
109
@cpython_only
def test_name(self):
    def func(): pass

    # An explicit name is used as given, a non-str name is converted to
    # str, and the target's name is ignored if a name is specified.
    explicit_cases = (
        (dict(name="myname1"), "myname1"),
        (dict(name=123), "123"),
        (dict(target=func, name="myname2"), "myname2"),
    )
    for kwargs, expected in explicit_cases:
        thread = threading.Thread(**kwargs)
        self.assertEqual(thread.name, expected)

    # Otherwise the name is generated from the (mocked) thread counter,
    # followed by the target's name when there is a target.
    generated_cases = (
        (2, dict(name=""), "Thread-2"),
        (3, dict(), "Thread-3"),
        (5, dict(target=func), "Thread-5 (func)"),
    )
    for count, kwargs, expected in generated_cases:
        with mock.patch.object(threading, '_counter', return_value=count):
            thread = threading.Thread(**kwargs)
            self.assertEqual(thread.name, expected)
136
137 def test_args_argument(self):
138 # bpo-45735: Using list or tuple as *args* in constructor could
139 # achieve the same effect.
140 num_list = [1]
141 num_tuple = (1,)
142
143 str_list = ["str"]
144 str_tuple = ("str",)
145
146 list_in_tuple = ([1],)
147 tuple_in_list = [(1,)]
148
149 test_cases = (
150 (num_list, lambda arg: self.assertEqual(arg, 1)),
151 (num_tuple, lambda arg: self.assertEqual(arg, 1)),
152 (str_list, lambda arg: self.assertEqual(arg, "str")),
153 (str_tuple, lambda arg: self.assertEqual(arg, "str")),
154 (list_in_tuple, lambda arg: self.assertEqual(arg, [1])),
155 (tuple_in_list, lambda arg: self.assertEqual(arg, (1,)))
156 )
157
158 for args, target in test_cases:
159 with self.subTest(target=target, args=args):
160 t = threading.Thread(target=target, args=args)
161 t.start()
162 t.join()
163
@cpython_only
def test_disallow_instantiation(self):
    # Ensure that the type disallows instantiation (bpo-43916)
    lock_type = type(threading.Lock())
    support.check_disallow_instantiation(self, lock_type)
169
# Create a bunch of threads, let each do some work, wait until all are
# done.
def test_various_ops(self):
    task_count = 10

    # no more than 3 of the 10 can run at once
    limiter = threading.BoundedSemaphore(value=3)
    guard = threading.RLock()
    running = Counter()

    workers = []
    for index in range(task_count):
        worker = TestThread("<thread %d>" % index, self, limiter, guard,
                            running)
        workers.append(worker)
        # Not started yet: no ident, and the repr says "initial".
        self.assertIsNone(worker.ident)
        self.assertRegex(repr(worker), r'^<TestThread\(.*, initial\)>$')
        worker.start()

    if hasattr(threading, 'get_native_id'):
        # Every worker plus the current thread must have its own native id.
        native_ids = {worker.native_id for worker in workers}
        native_ids.add(threading.get_native_id())
        self.assertNotIn(None, native_ids)
        self.assertEqual(len(native_ids), task_count + 1)

    if verbose:
        print('waiting for all tasks to complete')
    for worker in workers:
        worker.join()
        self.assertFalse(worker.is_alive())
        self.assertNotEqual(worker.ident, 0)
        self.assertIsNotNone(worker.ident)
        self.assertRegex(repr(worker), r'^<TestThread\(.*, stopped -?\d+\)>$')
    if verbose:
        print('all tasks done')
    self.assertEqual(running.get(), 0)
207
def test_ident_of_no_threading_threads(self):
    # The ident still must work for the main thread and dummy threads.
    self.assertIsNotNone(threading.current_thread().ident)

    finished = threading.Event()
    seen_idents = []

    def record_ident():
        seen_idents.append(threading.current_thread().ident)
        finished.set()

    with threading_helper.wait_threads_exit():
        tid = _thread.start_new_thread(record_ident, ())
        finished.wait()
        self.assertEqual(seen_idents[0], tid)
    # Kill the "immortal" _DummyThread
    del threading._active[seen_idents[0]]
222
# run with a small(ish) thread stack size (256 KiB)
def test_various_ops_small_stack(self):
    # Runs test_various_ops() with a 256 KiB thread stack size.  The test
    # is skipped when the platform refuses the new size.
    if verbose:
        print('with 256 KiB thread stack size...')
    try:
        threading.stack_size(262144)
    except _thread.error:
        raise unittest.SkipTest(
            'platform does not support changing thread stack size')
    try:
        self.test_various_ops()
    finally:
        # Always go back to stack_size(0), even when the wrapped test
        # fails, so the custom size cannot leak into later tests.
        threading.stack_size(0)
234
# run with a large thread stack size (1 MiB)
def test_various_ops_large_stack(self):
    # Runs test_various_ops() with a 1 MiB thread stack size.  The test is
    # skipped when the platform refuses the new size.
    if verbose:
        print('with 1 MiB thread stack size...')
    try:
        threading.stack_size(0x100000)
    except _thread.error:
        raise unittest.SkipTest(
            'platform does not support changing thread stack size')
    try:
        self.test_various_ops()
    finally:
        # Always go back to stack_size(0), even when the wrapped test
        # fails, so the custom size cannot leak into later tests.
        threading.stack_size(0)
246
def test_foreign_thread(self):
    # Check that a "foreign" thread can use the threading module.
    def foreign(done):
        # Calling current_thread() forces an entry for the foreign
        # thread to get made in the threading._active map.
        threading.current_thread()
        done.release()

    done = threading.Lock()
    done.acquire()
    with threading_helper.wait_threads_exit():
        tid = _thread.start_new_thread(foreign, (done,))
        # Block until the foreign thread has released the lock.
        done.acquire()
    self.assertIn(tid, threading._active)
    dummy = threading._active[tid]
    self.assertIsInstance(dummy, threading._DummyThread)
    #Issue 29376
    self.assertTrue(dummy.is_alive())
    self.assertRegex(repr(dummy), '_DummyThread')
    del threading._active[tid]
267
# PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
# exposed at the Python level. This test relies on ctypes to get at it.
def test_PyThreadState_SetAsyncExc(self):
    # Two phases: first the exception is set on the current thread, then
    # on a worker thread that loops until it catches the exception.
    ctypes = import_module("ctypes")

    set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
    set_async_exc.argtypes = (ctypes.c_ulong, ctypes.py_object)

    class AsyncExc(Exception):
        pass

    exception = ctypes.py_object(AsyncExc)

    # First check it works when setting the exception from the same thread.
    tid = threading.get_ident()
    self.assertIsInstance(tid, int)
    self.assertGreater(tid, 0)

    try:
        result = set_async_exc(tid, exception)
        # The exception is async, so we might have to keep the VM busy until
        # it notices.
        while True:
            pass
    except AsyncExc:
        pass
    else:
        # This code is unreachable but it reflects the intent. If we wanted
        # to be smarter the above loop wouldn't be infinite.
        self.fail("AsyncExc not raised")
    try:
        self.assertEqual(result, 1) # one thread state modified
    except UnboundLocalError:
        # The exception was raised too quickly for us to get the result.
        pass

    # `worker_started` is set by the thread when it's inside a try/except
    # block waiting to catch the asynchronously set AsyncExc exception.
    # `worker_saw_exception` is set by the thread upon catching that
    # exception.
    worker_started = threading.Event()
    worker_saw_exception = threading.Event()

    class Worker(threading.Thread):
        def run(self):
            # Published for the main thread: the id to target and a flag
            # that only becomes True once AsyncExc has been caught.
            self.id = threading.get_ident()
            self.finished = False

            try:
                while True:
                    worker_started.set()
                    time.sleep(0.1)
            except AsyncExc:
                self.finished = True
                worker_saw_exception.set()

    t = Worker()
    t.daemon = True # so if this fails, we don't hang Python at shutdown
    t.start()
    if verbose:
        print(" started worker thread")

    # Try a thread id that doesn't make sense.
    if verbose:
        print(" trying nonsensical thread id")
    result = set_async_exc(-1, exception)
    self.assertEqual(result, 0) # no thread states modified

    # Now raise an exception in the worker thread.
    if verbose:
        print(" waiting for worker thread to get started")
    # NOTE(review): this wait has no timeout, unlike the one further down.
    ret = worker_started.wait()
    self.assertTrue(ret)
    if verbose:
        print(" verifying worker hasn't exited")
    self.assertFalse(t.finished)
    if verbose:
        print(" attempting to raise asynch exception in worker")
    result = set_async_exc(t.id, exception)
    self.assertEqual(result, 1) # one thread state modified
    if verbose:
        print(" waiting for worker to say it caught the exception")
    worker_saw_exception.wait(timeout=support.SHORT_TIMEOUT)
    self.assertTrue(t.finished)
    if verbose:
        print(" all OK -- joining worker")
    if t.finished:
        t.join()
    # else the thread is still running, and we have no way to kill it
357
358 def test_limbo_cleanup(self):
359 # Issue 7481: Failure to start thread should cleanup the limbo map.
360 def fail_new_thread(*args):
361 raise threading.ThreadError()
362 _start_new_thread = threading._start_new_thread
363 threading._start_new_thread = fail_new_thread
364 try:
365 t = threading.Thread(target=lambda: None)
366 self.assertRaises(threading.ThreadError, t.start)
367 self.assertFalse(
368 t in threading._limbo,
369 "Failed to cleanup _limbo map on failure of Thread.start().")
370 finally:
371 threading._start_new_thread = _start_new_thread
372
def test_finalize_running_thread(self):
    # Issue 1402: the PyGILState_Ensure / _Release functions may be called
    # very late on python exit: on deallocation of a running thread for
    # example.
    # The return value is discarded here; the child script imports ctypes
    # itself (presumably this call skips the test without ctypes).
    import_module("ctypes")

    # The child leaves through sys.exit(42), so a "failure" exit status is
    # the expected outcome and its value is checked below.
    rc, out, err = assert_python_failure("-c", """if 1:
        import ctypes, sys, time, _thread

        # This lock is used as a simple event variable.
        ready = _thread.allocate_lock()
        ready.acquire()

        # Module globals are cleared before __del__ is run
        # So we save the functions in class dict
        class C:
            ensure = ctypes.pythonapi.PyGILState_Ensure
            release = ctypes.pythonapi.PyGILState_Release
            def __del__(self):
                state = self.ensure()
                self.release(state)

        def waitingThread():
            x = C()
            ready.release()
            time.sleep(100)

        _thread.start_new_thread(waitingThread, ())
        ready.acquire() # Be sure the other thread is waiting.
        sys.exit(42)
        """)
    self.assertEqual(rc, 42)
405
def test_finalize_with_trace(self):
    # Issue1733757
    # Avoid a deadlock when sys.settrace steps into threading._shutdown
    # The script is passed to a child interpreter with -c.  Its daemon
    # "killer" thread calls os._exit(2) after two seconds, so a deadlock
    # shows up as a failing exit status instead of a hang.
    assert_python_ok("-c", """if 1:
        import sys, threading

        # A deadlock-killer, to prevent the
        # testsuite to hang forever
        def killer():
            import os, time
            time.sleep(2)
            print('program blocked; aborting')
            os._exit(2)
        t = threading.Thread(target=killer)
        t.daemon = True
        t.start()

        # This is the trace function
        def func(frame, event, arg):
            threading.current_thread()
            return func

        sys.settrace(func)
        """)
430
def test_join_nondaemon_on_shutdown(self):
    # Issue 1722344
    # Raising SystemExit skipped threading._shutdown
    # The child starts a non-daemon thread and then raises SystemExit;
    # the thread's output must still show up on stdout, with nothing on
    # stderr.
    rc, out, err = assert_python_ok("-c", """if 1:
        import threading
        from time import sleep

        def child():
            sleep(1)
            # As a non-daemon thread we SHOULD wake up and nothing
            # should be torn down yet
            print("Woke up, sleep function is:", sleep)

        threading.Thread(target=child).start()
        raise SystemExit
        """)
    self.assertEqual(out.strip(),
        b"Woke up, sleep function is: <built-in function sleep>")
    self.assertEqual(err, b"")
450
451 def test_enumerate_after_join(self):
452 # Try hard to trigger #1703448: a thread is still returned in
453 # threading.enumerate() after it has been join()ed.
454 enum = threading.enumerate
455 old_interval = sys.getswitchinterval()
456 try:
457 for i in range(1, 100):
458 sys.setswitchinterval(i * 0.0002)
459 t = threading.Thread(target=lambda: None)
460 t.start()
461 t.join()
462 l = enum()
463 self.assertNotIn(t, l,
464 "#1703448 triggered after %d trials: %s" % (i, l))
465 finally:
466 sys.setswitchinterval(old_interval)
467
def test_no_refcycle_through_target(self):
    class RunSelfFunction(object):
        def __init__(self, should_raise):
            # The links in this refcycle from Thread back to self
            # should be cleaned up when the thread completes.
            self.should_raise = should_raise
            self.thread = threading.Thread(target=self._run,
                                           args=(self,),
                                           kwargs={'yet_another':self})
            self.thread.start()

        def _run(self, other_ref, yet_another):
            if self.should_raise:
                raise SystemExit

    restore_default_excepthook(self)

    # The same check twice: first with a target that returns normally,
    # then with one that raises SystemExit.
    for should_raise in (False, True):
        cyclic_object = RunSelfFunction(should_raise=should_raise)
        weak_cyclic_object = weakref.ref(cyclic_object)
        cyclic_object.thread.join()
        del cyclic_object
        self.assertIsNone(weak_cyclic_object(),
                          msg=('%d references still around' %
                               sys.getrefcount(weak_cyclic_object())))
500
501 def test_old_threading_api(self):
502 # Just a quick sanity check to make sure the old method names are
503 # still present
504 t = threading.Thread()
505 with self.assertWarnsRegex(DeprecationWarning,
506 r'get the daemon attribute'):
507 t.isDaemon()
508 with self.assertWarnsRegex(DeprecationWarning,
509 r'set the daemon attribute'):
510 t.setDaemon(True)
511 with self.assertWarnsRegex(DeprecationWarning,
512 r'get the name attribute'):
513 t.getName()
514 with self.assertWarnsRegex(DeprecationWarning,
515 r'set the name attribute'):
516 t.setName("name")
517
518 e = threading.Event()
519 with self.assertWarnsRegex(DeprecationWarning, 'use is_set()'):
520 e.isSet()
521
522 cond = threading.Condition()
523 cond.acquire()
524 with self.assertWarnsRegex(DeprecationWarning, 'use notify_all()'):
525 cond.notifyAll()
526
527 with self.assertWarnsRegex(DeprecationWarning, 'use active_count()'):
528 threading.activeCount()
529 with self.assertWarnsRegex(DeprecationWarning, 'use current_thread()'):
530 threading.currentThread()
531
532 def test_repr_daemon(self):
533 t = threading.Thread()
534 self.assertNotIn('daemon', repr(t))
535 t.daemon = True
536 self.assertIn('daemon', repr(t))
537
538 def test_daemon_param(self):
539 t = threading.Thread()
540 self.assertFalse(t.daemon)
541 t = threading.Thread(daemon=False)
542 self.assertFalse(t.daemon)
543 t = threading.Thread(daemon=True)
544 self.assertTrue(t.daemon)
545
@skip_unless_reliable_fork
def test_fork_at_exit(self):
    # bpo-42350: Calling os.fork() after threading._shutdown() must
    # not log an error.
    # The child script forks from an atexit handler; the only stderr
    # output allowed is the marker line printed by the forked process.
    code = textwrap.dedent("""
        import atexit
        import os
        import sys
        from test.support import wait_process

        # Import the threading module to register its "at fork" callback
        import threading

        def exit_handler():
            pid = os.fork()
            if not pid:
                print("child process ok", file=sys.stderr, flush=True)
                # child process
            else:
                wait_process(pid, exitcode=0)

        # exit_handler() will be called after threading._shutdown()
        atexit.register(exit_handler)
    """)
    _, out, err = assert_python_ok("-c", code)
    self.assertEqual(out, b'')
    self.assertEqual(err.rstrip(), b'child process ok')
573
@skip_unless_reliable_fork
def test_dummy_thread_after_fork(self):
    # Issue #14308: a dummy thread in the active list doesn't mess up
    # the after-fork mechanism.
    # The script asserts two active threads before the fork and a single
    # one in the forked child; a failing assert would write to stderr,
    # which must stay empty.
    code = """if 1:
        import _thread, threading, os, time

        def background_thread(evt):
            # Creates and registers the _DummyThread instance
            threading.current_thread()
            evt.set()
            time.sleep(10)

        evt = threading.Event()
        _thread.start_new_thread(background_thread, (evt,))
        evt.wait()
        assert threading.active_count() == 2, threading.active_count()
        if os.fork() == 0:
            assert threading.active_count() == 1, threading.active_count()
            os._exit(0)
        else:
            os.wait()
    """
    _, out, err = assert_python_ok("-c", code)
    self.assertEqual(out, b'')
    self.assertEqual(err, b'')
600
@skip_unless_reliable_fork
def test_is_alive_after_fork(self):
    # Try hard to trigger #18418: is_alive() could sometimes be True on
    # threads that vanished after a fork.
    self.addCleanup(sys.setswitchinterval, sys.getswitchinterval())

    # Make the bug more likely to manifest.
    test.support.setswitchinterval(1e-6)

    for _ in range(20):
        worker = threading.Thread(target=lambda: None)
        worker.start()
        pid = os.fork()
        if pid == 0:
            # Child: the exit status tells the parent whether the thread
            # still looked alive (11) or not (10).
            os._exit(11 if worker.is_alive() else 10)

        # Parent: finish the thread, then check the child's verdict.
        worker.join()
        support.wait_process(pid, exitcode=10)
621
622 def test_main_thread(self):
623 main = threading.main_thread()
624 self.assertEqual(main.name, 'MainThread')
625 self.assertEqual(main.ident, threading.current_thread().ident)
626 self.assertEqual(main.ident, threading.get_ident())
627
628 def f():
629 self.assertNotEqual(threading.main_thread().ident,
630 threading.current_thread().ident)
631 th = threading.Thread(target=f)
632 th.start()
633 th.join()
634
@skip_unless_reliable_fork
@unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
def test_main_thread_after_fork(self):
    # The forked child prints the main thread's name and whether its
    # ident matches the current thread; the parent only waits for it.
    code = """if 1:
        import os, threading
        from test import support

        pid = os.fork()
        if pid == 0:
            main = threading.main_thread()
            print(main.name)
            print(main.ident == threading.current_thread().ident)
            print(main.ident == threading.get_ident())
        else:
            support.wait_process(pid, exitcode=0)
    """
    _, out, err = assert_python_ok("-c", code)
    # Drop carriage returns before comparing the captured output.
    data = out.decode().replace('\r', '')
    self.assertEqual(err, b"")
    self.assertEqual(data, "MainThread\nTrue\nTrue\n")
655
@skip_unless_reliable_fork
@unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
def test_main_thread_after_fork_from_nonmain_thread(self):
    # Same as the main-thread fork test, but the fork happens inside a
    # worker thread.  In the child, main_thread() is expected to report
    # that worker ("Thread-1 (func)") as the main thread.
    code = """if 1:
        import os, threading, sys
        from test import support

        def func():
            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
                # stdout is fully buffered because not a tty,
                # we have to flush before exit.
                sys.stdout.flush()
            else:
                support.wait_process(pid, exitcode=0)

        th = threading.Thread(target=func)
        th.start()
        th.join()
    """
    _, out, err = assert_python_ok("-c", code)
    # Drop carriage returns before comparing the captured output.
    data = out.decode().replace('\r', '')
    self.assertEqual(err, b"")
    self.assertEqual(data, "Thread-1 (func)\nTrue\nTrue\n")
684
def test_main_thread_during_shutdown(self):
    # bpo-31516: current_thread() should still point to the main thread
    # at shutdown
    # The script creates two RefCycle objects: one is collected by the
    # explicit gc.collect(), the other is still referenced by `x` when the
    # script ends.  Each __del__ prints one "GC:" line, so exactly two
    # identical lines are expected.
    code = """if 1:
        import gc, threading

        main_thread = threading.current_thread()
        assert main_thread is threading.main_thread() # sanity check

        class RefCycle:
            def __init__(self):
                self.cycle = self

            def __del__(self):
                print("GC:",
                      threading.current_thread() is main_thread,
                      threading.main_thread() is main_thread,
                      threading.enumerate() == [main_thread])

        RefCycle()
        gc.collect() # sanity check
        x = RefCycle()
    """
    _, out, err = assert_python_ok("-c", code)
    data = out.decode()
    self.assertEqual(err, b"")
    self.assertEqual(data.splitlines(),
                     ["GC: True True True"] * 2)
713
def test_finalization_shutdown(self):
    # bpo-36402: Py_Finalize() calls threading._shutdown() which must wait
    # until Python thread states of all non-daemon threads get deleted.
    #
    # Test similar to SubinterpThreadingTests.test_threads_join_2(), but
    # test the finalization of the main interpreter.
    code = """if 1:
        import os
        import threading
        import time
        import random

        def random_sleep():
            seconds = random.random() * 0.010
            time.sleep(seconds)

        class Sleeper:
            def __del__(self):
                random_sleep()

        tls = threading.local()

        def f():
            # Sleep a bit so that the thread is still running when
            # Py_Finalize() is called.
            random_sleep()
            tls.x = Sleeper()
            random_sleep()

        threading.Thread(target=f).start()
        random_sleep()
    """
    # Only stderr is inspected: the child must finish without writing
    # anything to it.
    rc, out, err = assert_python_ok("-c", code)
    self.assertEqual(err, b"")
748
def test_tstate_lock(self):
    # Test an implementation detail of Thread objects.
    # `started` and `finish` are plain locks used as one-shot signals
    # between this test and the thread.
    started = _thread.allocate_lock()
    finish = _thread.allocate_lock()
    started.acquire()
    finish.acquire()
    def f():
        started.release()
        finish.acquire()
        time.sleep(0.01)
    # The tstate lock is None until the thread is started
    t = threading.Thread(target=f)
    self.assertIsNone(t._tstate_lock)
    t.start()
    started.acquire()
    self.assertTrue(t.is_alive())
    # The tstate lock can't be acquired when the thread is running
    # (or suspended).
    tstate_lock = t._tstate_lock
    self.assertFalse(tstate_lock.acquire(timeout=0))
    finish.release()
    # When the thread ends, the state_lock can be successfully
    # acquired.
    self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT))
    # But is_alive() is still True: we hold _tstate_lock now, which
    # prevents is_alive() from knowing the thread's end-of-life C code
    # is done.
    self.assertTrue(t.is_alive())
    # Let is_alive() find out the C code is done.
    tstate_lock.release()
    self.assertFalse(t.is_alive())
    # And verify the thread disposed of _tstate_lock.
    self.assertIsNone(t._tstate_lock)
    t.join()
783
784 def test_repr_stopped(self):
785 # Verify that "stopped" shows up in repr(Thread) appropriately.
786 started = _thread.allocate_lock()
787 finish = _thread.allocate_lock()
788 started.acquire()
789 finish.acquire()
790 def f():
791 started.release()
792 finish.acquire()
793 t = threading.Thread(target=f)
794 t.start()
795 started.acquire()
796 self.assertIn("started", repr(t))
797 finish.release()
798 # "stopped" should appear in the repr in a reasonable amount of time.
799 # Implementation detail: as of this writing, that's trivially true
800 # if .join() is called, and almost trivially true if .is_alive() is
801 # called. The detail we're testing here is that "stopped" shows up
802 # "all on its own".
803 LOOKING_FOR = "stopped"
804 for i in range(500):
805 if LOOKING_FOR in repr(t):
806 break
807 time.sleep(0.01)
808 self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds
809 t.join()
810
811 def test_BoundedSemaphore_limit(self):
812 # BoundedSemaphore should raise ValueError if released too often.
813 for limit in range(1, 10):
814 bs = threading.BoundedSemaphore(limit)
815 threads = [threading.Thread(target=bs.acquire)
816 for _ in range(limit)]
817 for t in threads:
818 t.start()
819 for t in threads:
820 t.join()
821 threads = [threading.Thread(target=bs.release)
822 for _ in range(limit)]
823 for t in threads:
824 t.start()
825 for t in threads:
826 t.join()
827 self.assertRaises(ValueError, bs.release)
828
@cpython_only
def test_frame_tstate_tracing(self):
    # Issue #14432: Crash when a generator is created in a C thread that is
    # destroyed while the generator is still used. The issue was that a
    # generator contains a frame, and the frame kept a reference to the
    # Python state of the destroyed C thread. The crash occurs when a trace
    # function is setup.

    def noop_trace(frame, event, arg):
        # no operation
        return noop_trace

    def generator():
        while 1:
            yield "generator"

    def callback():
        if callback.gen is None:
            callback.gen = generator()
        return next(callback.gen)
    callback.gen = None

    # Remember both trace settings so that both can be restored: the one
    # of the current thread and the hook the threading module installs in
    # threads it starts.
    old_trace = sys.gettrace()
    old_threading_trace = threading.gettrace()
    sys.settrace(noop_trace)
    try:
        # Install a trace function
        threading.settrace(noop_trace)

        # Create a generator in a C thread which exits after the call
        import _testcapi
        _testcapi.call_in_temporary_c_thread(callback)

        # Call the generator in a different Python thread, check that the
        # generator didn't keep a reference to the destroyed thread state
        for _ in range(3):
            # The trace function is still called here
            callback()
    finally:
        sys.settrace(old_trace)
        # Without this the no-op hook would stay installed for every
        # thread started by later tests.
        threading.settrace(old_threading_trace)
868
869 def test_gettrace(self):
870 def noop_trace(frame, event, arg):
871 # no operation
872 return noop_trace
873 old_trace = threading.gettrace()
874 try:
875 threading.settrace(noop_trace)
876 trace_func = threading.gettrace()
877 self.assertEqual(noop_trace,trace_func)
878 finally:
879 threading.settrace(old_trace)
880
881 def test_getprofile(self):
882 def fn(*args): pass
883 old_profile = threading.getprofile()
884 try:
885 threading.setprofile(fn)
886 self.assertEqual(fn, threading.getprofile())
887 finally:
888 threading.setprofile(old_profile)
889
@cpython_only
def test_shutdown_locks(self):
    for daemon in (False, True):
        with self.subTest(daemon=daemon):
            gate = threading.Event()
            worker = threading.Thread(target=gate.wait, daemon=daemon)

            # Thread.start() must add lock to _shutdown_locks,
            # but only for non-daemon thread
            worker.start()
            tstate_lock = worker._tstate_lock
            check_membership = self.assertNotIn if daemon else self.assertIn
            check_membership(tstate_lock, threading._shutdown_locks)

            # unblock the thread and join it
            gate.set()
            worker.join()

            # Thread._stop() must remove tstate_lock from _shutdown_locks.
            # Daemon threads must never add it to _shutdown_locks.
            self.assertNotIn(tstate_lock, threading._shutdown_locks)
913
def test_locals_at_exit(self):
    # bpo-19466: thread locals must not be deleted before destructors
    # are called
    # The script's Atexit.__del__ prints a value read from a
    # threading.local() object; the value must still be readable when the
    # destructor runs.
    rc, out, err = assert_python_ok("-c", """if 1:
        import threading

        class Atexit:
            def __del__(self):
                print("thread_dict.atexit = %r" % thread_dict.atexit)

        thread_dict = threading.local()
        thread_dict.atexit = "value"

        atexit = Atexit()
    """)
    self.assertEqual(out.rstrip(), b"thread_dict.atexit = 'value'")
930
931 def test_boolean_target(self):
932 # bpo-41149: A thread that had a boolean value of False would not
933 # run, regardless of whether it was callable. The correct behaviour
934 # is for a thread to do nothing if its target is None, and to call
935 # the target otherwise.
936 class ESC[4;38;5;81mBooleanTarget(ESC[4;38;5;149mobject):
937 def __init__(self):
938 self.ran = False
939 def __bool__(self):
940 return False
941 def __call__(self):
942 self.ran = True
943
944 target = BooleanTarget()
945 thread = threading.Thread(target=target)
946 thread.start()
947 thread.join()
948 self.assertTrue(target.ran)
949
def test_leak_without_join(self):
    # bpo-37788: Test that a thread which is not joined explicitly
    # does not leak. Test written for reference leak checks.
    def noop(): pass

    with threading_helper.wait_threads_exit():
        unjoined = threading.Thread(target=noop)
        unjoined.start()
        # Thread.join() is not called
957
@unittest.skipUnless(Py_DEBUG, 'need debug build (Py_DEBUG)')
def test_debug_deprecation(self):
    # bpo-44584: The PYTHONTHREADDEBUG environment variable is deprecated
    expected = (b'DeprecationWarning: The threading debug '
                b'(PYTHONTHREADDEBUG environment variable) '
                b'is deprecated and will be removed in Python 3.12')
    # Run a trivial child with the variable set and warnings enabled.
    _, _, stderr = assert_python_ok("-Wdefault", "-c", "pass",
                                    PYTHONTHREADDEBUG="1")
    self.assertIn(expected, stderr)
967
def test_import_from_another_thread(self):
    # bpo-1596321: If the threading module is first imported from a thread
    # other than the main thread, threading._shutdown() must handle
    # this case without logging an error at Python exit.
    # The script uses a bare _thread lock as an event so that the main
    # thread never imports threading itself.
    code = textwrap.dedent('''
        import _thread
        import sys

        event = _thread.allocate_lock()
        event.acquire()

        def import_threading():
            import threading
            event.release()

        if 'threading' in sys.modules:
            raise Exception('threading is already imported')

        _thread.start_new_thread(import_threading, ())

        # wait until the threading module is imported
        event.acquire()
        event.release()

        if 'threading' not in sys.modules:
            raise Exception('threading is not imported')

        # don't wait until the thread completes
    ''')
    rc, out, err = assert_python_ok("-c", code)
    self.assertEqual(out, b'')
    self.assertEqual(err, b'')
1000
1001
1002 class ESC[4;38;5;81mThreadJoinOnShutdown(ESC[4;38;5;149mBaseTestCase):
1003
def _run_and_join(self, script):
    # Run `script` in a child interpreter, prefixed with a preamble that
    # imports sys/os/time/threading and defines joiningfunc(), and check
    # that the child printed 'end of main' followed by 'end of thread'.
    script = """if 1:
        import sys, os, time, threading

        # a thread, which waits for the main program to terminate
        def joiningfunc(mainthread):
            mainthread.join()
            print('end of thread')
            # stdout is fully buffered because not a tty, we have to flush
            # before exit.
            sys.stdout.flush()
    \n""" + script

    rc, out, err = assert_python_ok("-c", script)
    # Drop carriage returns before comparing the captured output.
    data = out.decode().replace('\r', '')
    self.assertEqual(data, "end of main\nend of thread\n")
1020
1021 def test_1_join_on_shutdown(self):
1022 # The usual case: on exit, wait for a non-daemon thread
1023 script = """if 1:
1024 import os
1025 t = threading.Thread(target=joiningfunc,
1026 args=(threading.current_thread(),))
1027 t.start()
1028 time.sleep(0.1)
1029 print('end of main')
1030 """
1031 self._run_and_join(script)
1032
1033 @skip_unless_reliable_fork
1034 def test_2_join_in_forked_process(self):
1035 # Like the test above, but from a forked interpreter
1036 script = """if 1:
1037 from test import support
1038
1039 childpid = os.fork()
1040 if childpid != 0:
1041 # parent process
1042 support.wait_process(childpid, exitcode=0)
1043 sys.exit(0)
1044
1045 # child process
1046 t = threading.Thread(target=joiningfunc,
1047 args=(threading.current_thread(),))
1048 t.start()
1049 print('end of main')
1050 """
1051 self._run_and_join(script)
1052
1053 @skip_unless_reliable_fork
1054 def test_3_join_in_forked_from_thread(self):
1055 # Like the test above, but fork() was called from a worker thread
1056 # In the forked process, the main Thread object must be marked as stopped.
1057
1058 script = """if 1:
1059 from test import support
1060
1061 main_thread = threading.current_thread()
1062 def worker():
1063 childpid = os.fork()
1064 if childpid != 0:
1065 # parent process
1066 support.wait_process(childpid, exitcode=0)
1067 sys.exit(0)
1068
1069 # child process
1070 t = threading.Thread(target=joiningfunc,
1071 args=(main_thread,))
1072 print('end of main')
1073 t.start()
1074 t.join() # Should not block: main_thread is already stopped
1075
1076 w = threading.Thread(target=worker)
1077 w.start()
1078 """
1079 self._run_and_join(script)
1080
1081 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
1082 def test_4_daemon_threads(self):
1083 # Check that a daemon thread cannot crash the interpreter on shutdown
1084 # by manipulating internal structures that are being disposed of in
1085 # the main thread.
1086 script = """if True:
1087 import os
1088 import random
1089 import sys
1090 import time
1091 import threading
1092
1093 thread_has_run = set()
1094
1095 def random_io():
1096 '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
1097 import test.test_threading as mod
1098 while True:
1099 with open(mod.__file__, 'rb') as in_f:
1100 stuff = in_f.read(200)
1101 with open(os.devnull, 'wb') as null_f:
1102 null_f.write(stuff)
1103 time.sleep(random.random() / 1995)
1104 thread_has_run.add(threading.current_thread())
1105
1106 def main():
1107 count = 0
1108 for _ in range(40):
1109 new_thread = threading.Thread(target=random_io)
1110 new_thread.daemon = True
1111 new_thread.start()
1112 count += 1
1113 while len(thread_has_run) < count:
1114 time.sleep(0.001)
1115 # Trigger process shutdown
1116 sys.exit(0)
1117
1118 main()
1119 """
1120 rc, out, err = assert_python_ok('-c', script)
1121 self.assertFalse(err)
1122
1123 @skip_unless_reliable_fork
1124 def test_reinit_tls_after_fork(self):
1125 # Issue #13817: fork() would deadlock in a multithreaded program with
1126 # the ad-hoc TLS implementation.
1127
1128 def do_fork_and_wait():
1129 # just fork a child process and wait it
1130 pid = os.fork()
1131 if pid > 0:
1132 support.wait_process(pid, exitcode=50)
1133 else:
1134 os._exit(50)
1135
1136 # start a bunch of threads that will fork() child processes
1137 threads = []
1138 for i in range(16):
1139 t = threading.Thread(target=do_fork_and_wait)
1140 threads.append(t)
1141 t.start()
1142
1143 for t in threads:
1144 t.join()
1145
1146 @skip_unless_reliable_fork
1147 def test_clear_threads_states_after_fork(self):
1148 # Issue #17094: check that threads states are cleared after fork()
1149
1150 # start a bunch of threads
1151 threads = []
1152 for i in range(16):
1153 t = threading.Thread(target=lambda : time.sleep(0.3))
1154 threads.append(t)
1155 t.start()
1156
1157 pid = os.fork()
1158 if pid == 0:
1159 # check that threads states have been cleared
1160 if len(sys._current_frames()) == 1:
1161 os._exit(51)
1162 else:
1163 os._exit(52)
1164 else:
1165 support.wait_process(pid, exitcode=51)
1166
1167 for t in threads:
1168 t.join()
1169
1170
class SubinterpThreadingTests(BaseTestCase):
    # Threads started inside a subinterpreter: non-daemon threads must be
    # joined when the subinterpreter ends, and a daemon thread that is still
    # running at that point is a fatal error.

    def pipe(self):
        # Create an OS pipe whose two ends are closed on test cleanup and
        # return it as (read_fd, write_fd).  Where os.set_blocking() exists,
        # the read end is made non-blocking.
        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, read_fd)
        self.addCleanup(os.close, write_fd)
        set_blocking = getattr(os, 'set_blocking', None)
        if set_blocking is not None:
            set_blocking(read_fd, False)
        return (read_fd, write_fd)

    def _check_thread_joined(self, code_template):
        # Fill the pipe's write descriptor into *code_template*, run the
        # resulting code in a subinterpreter, and check that it returned 0
        # and that the byte written by the thread is available on the pipe.
        read_fd, write_fd = self.pipe()
        code = textwrap.dedent(code_template % (write_fd,))
        status = test.support.run_in_subinterp(code)
        self.assertEqual(status, 0)
        # The thread was joined properly.
        self.assertEqual(os.read(read_fd, 1), b"x")

    def test_threads_join(self):
        # Non-daemon threads should be joined at subinterpreter shutdown
        # (issue #18808)
        self._check_thread_joined(r"""
            import os
            import random
            import threading
            import time

            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                random_sleep()
                os.write(%d, b"x")

            threading.Thread(target=f).start()
            random_sleep()
        """)

    def test_threads_join_2(self):
        # Same as above, but a delay gets introduced after the thread's
        # Python code returned but before the thread state is deleted.
        # To achieve this, we register a thread-local object which sleeps
        # a bit when deallocated.
        self._check_thread_joined(r"""
            import os
            import random
            import threading
            import time

            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)

            class Sleeper:
                def __del__(self):
                    random_sleep()

            tls = threading.local()

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                random_sleep()
                tls.x = Sleeper()
                os.write(%d, b"x")

            threading.Thread(target=f).start()
            random_sleep()
        """)

    @cpython_only
    def test_daemon_threads_fatal_error(self):
        # Code executed inside the subinterpreter: it leaves a daemon thread
        # sleeping for SHORT_TIMEOUT seconds.
        subinterp_code = f"""if 1:
            import os
            import threading
            import time

            def f():
                # Make sure the daemon thread is still running when
                # Py_EndInterpreter is called.
                time.sleep({test.support.SHORT_TIMEOUT})
            threading.Thread(target=f, daemon=True).start()
            """
        # Outer script, run in a child process, that hands the code above to
        # _testcapi.run_in_subinterp().
        script = r"""if 1:
            import _testcapi

            _testcapi.run_in_subinterp(%r)
            """ % (subinterp_code,)
        with test.support.SuppressCrashReport():
            _rc, _stdout, stderr = assert_python_failure("-c", script)
        expected_error = ("Fatal Python error: Py_EndInterpreter: "
                          "not the last thread")
        self.assertIn(expected_error, stderr.decode())
1267
1268
class ThreadingExceptionTests(BaseTestCase):
    # Misuse of the threading API must raise RuntimeError, and exceptions
    # escaping a thread's run() must be reported in the expected format.

    def _check_traceback_reported(self, out, err):
        # Shared assertions: nothing on stdout, and stderr carries the
        # standard "Exception in thread" report for a ZeroDivisionError.
        self.assertEqual(out, b'')
        report = err.decode()
        self.assertIn("Exception in thread", report)
        self.assertIn("Traceback (most recent call last):", report)
        self.assertIn("ZeroDivisionError", report)
        self.assertNotIn("Unhandled exception", report)

    # A RuntimeError should be raised if Thread.start() is called
    # multiple times.
    def test_start_thread_again(self):
        worker = threading.Thread()
        worker.start()
        with self.assertRaises(RuntimeError):
            worker.start()
        worker.join()

    def test_joining_current_thread(self):
        # A thread is not allowed to join itself.
        me = threading.current_thread()
        with self.assertRaises(RuntimeError):
            me.join()

    def test_joining_inactive_thread(self):
        # Joining a thread that was never started is an error.
        unstarted = threading.Thread()
        with self.assertRaises(RuntimeError):
            unstarted.join()

    def test_daemonize_active_thread(self):
        # The daemon flag cannot be set once the thread has been started.
        worker = threading.Thread()
        worker.start()
        with self.assertRaises(RuntimeError):
            worker.daemon = True
        worker.join()

    def test_releasing_unacquired_lock(self):
        unlocked = threading.Lock()
        with self.assertRaises(RuntimeError):
            unlocked.release()

    @requires_subprocess()
    def test_recursion_limit(self):
        # Issue 9670
        # test that excessive recursion within a non-main thread causes
        # an exception rather than crashing the interpreter on platforms
        # like Mac OS X or FreeBSD which have small default stack sizes
        # for threads
        script = """if True:
            import threading

            def recurse():
                return recurse()

            def outer():
                try:
                    recurse()
                except RecursionError:
                    pass

            w = threading.Thread(target=outer)
            w.start()
            w.join()
            print('end of main thread')
            """
        child = subprocess.Popen(
            [sys.executable, "-c", script],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        raw_out, raw_err = child.communicate()
        # Drop carriage returns so the comparison ignores line-ending style.
        printed = raw_out.decode().replace('\r', '')
        self.assertEqual(child.returncode, 0,
                         "Unexpected error: " + raw_err.decode())
        self.assertEqual(printed, "end of main thread\n")

    def test_print_exception(self):
        # The script's thread spins until the main thread clears "running",
        # then fails with 1/0; the failure must be reported on stderr.
        script = r"""if True:
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            """
        _rc, out, err = assert_python_ok("-c", script)
        self._check_traceback_reported(out, err)

    def test_print_exception_stderr_is_none_1(self):
        # Same as test_print_exception, except that sys.stderr is replaced
        # by None after the thread has started and before it fails.
        script = r"""if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            sys.stderr = None
            running = False
            t.join()
            """
        _rc, out, err = assert_python_ok("-c", script)
        self._check_traceback_reported(out, err)

    def test_print_exception_stderr_is_none_2(self):
        # Here sys.stderr is already None when the thread is created; only
        # the absence of an "Unhandled exception" message is checked.
        script = r"""if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            sys.stderr = None
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            """
        _rc, out, err = assert_python_ok("-c", script)
        self.assertEqual(out, b'')
        self.assertNotIn("Unhandled exception", err.decode())

    def test_bare_raise_in_brand_new_thread(self):
        # A bare "raise" executed in a new thread, with no exception being
        # handled, must surface as RuntimeError.
        def bare_raise():
            raise

        class Issue27558(threading.Thread):
            # Exception caught inside run(), or None if nothing was caught.
            exc = None

            def run(self):
                try:
                    bare_raise()
                except Exception as caught:
                    self.exc = caught

        thread = Issue27558()
        thread.start()
        thread.join()
        self.assertIsNotNone(thread.exc)
        self.assertIsInstance(thread.exc, RuntimeError)
        # explicitly break the reference cycle to not leak a dangling thread
        thread.exc = None

    def test_multithread_modify_file_noerror(self):
        # See issue25872
        def modify_file():
            with open(os_helper.TESTFN, 'w', encoding='utf-8') as fp:
                fp.write(' ')
                traceback.format_stack()

        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        # NOTE(review): each thread is joined right after it is started, so
        # the 100 writers run one after another rather than concurrently --
        # confirm that this is intended.
        for _ in range(100):
            writer = threading.Thread(target=modify_file)
            writer.start()
            writer.join()
1445
1446
class ThreadRunFail(threading.Thread):
    # Thread whose run() unconditionally raises ValueError("run failed").
    # ExceptHookTests.test_excepthook looks for the text of the raise
    # statement below in the printed traceback, so keep that line unchanged.
    def run(self):
        raise ValueError("run failed")
1450
1451
class ExceptHookTests(BaseTestCase):
    # Tests for threading.excepthook: the default report format, custom
    # hooks, failing hooks, and restoring threading.__excepthook__.

    def setUp(self):
        # Install the default hook first; the previous one is put back by a
        # cleanup registered in restore_default_excepthook().
        restore_default_excepthook(self)
        super().setUp()

    def test_excepthook(self):
        # The default hook writes the thread name and the full traceback
        # to stderr.
        with support.captured_output("stderr") as captured:
            thread = ThreadRunFail(name="excepthook thread")
            thread.start()
            thread.join()

        report = captured.getvalue().strip()
        self.assertIn(f'Exception in thread {thread.name}:\n', report)
        self.assertIn('Traceback (most recent call last):\n', report)
        self.assertIn(' raise ValueError("run failed")', report)
        self.assertIn('ValueError: run failed', report)

    @support.cpython_only
    def test_excepthook_thread_None(self):
        # threading.excepthook called with thread=None: log the thread
        # identifier in this case.
        with support.captured_output("stderr") as captured:
            try:
                raise ValueError("bug")
            except Exception:
                args = threading.ExceptHookArgs([*sys.exc_info(), None])
                try:
                    threading.excepthook(args)
                finally:
                    # Explicitly break a reference cycle
                    args = None

        report = captured.getvalue().strip()
        self.assertIn(f'Exception in thread {threading.get_ident()}:\n', report)
        self.assertIn('Traceback (most recent call last):\n', report)
        self.assertIn(' raise ValueError("bug")', report)
        self.assertIn('ValueError: bug', report)

    def test_system_exit(self):
        class ThreadExit(threading.Thread):
            def run(self):
                sys.exit(1)

        # threading.excepthook() silently ignores SystemExit
        with support.captured_output("stderr") as captured:
            exiting = ThreadExit()
            exiting.start()
            exiting.join()

        self.assertEqual(captured.getvalue(), '')

    def test_custom_excepthook(self):
        # A user-installed hook receives the exception type, value and
        # traceback together with the failing Thread object.
        received = None

        def hook(hook_args):
            nonlocal received
            received = hook_args

        try:
            with support.swap_attr(threading, 'excepthook', hook):
                thread = ThreadRunFail()
                thread.start()
                thread.join()

            self.assertEqual(received.exc_type, ValueError)
            self.assertEqual(str(received.exc_value), 'run failed')
            self.assertEqual(received.exc_traceback,
                             received.exc_value.__traceback__)
            self.assertIs(received.thread, thread)
        finally:
            # Break reference cycle
            received = None

    def test_custom_excepthook_fail(self):
        # When the threading hook itself raises, a header line is written to
        # stderr and the new exception is passed on to sys.excepthook.
        def threading_hook(args):
            raise ValueError("threading_hook failed")

        sys_hook_message = None

        def sys_hook(exc_type, exc_value, exc_traceback):
            nonlocal sys_hook_message
            sys_hook_message = str(exc_value)

        with support.swap_attr(threading, 'excepthook', threading_hook), \
             support.swap_attr(sys, 'excepthook', sys_hook), \
             support.captured_output('stderr') as captured:
            failing = ThreadRunFail()
            failing.start()
            failing.join()

        self.assertEqual(captured.getvalue(),
                         'Exception in threading.excepthook:\n')
        self.assertEqual(sys_hook_message, 'threading_hook failed')

    def test_original_excepthook(self):
        # Assigning threading.__excepthook__ back to threading.excepthook
        # must restore the default reporting.
        def run_thread():
            # Run one failing thread and return what it wrote to stderr.
            with support.captured_output("stderr") as captured:
                failing = ThreadRunFail(name="excepthook thread")
                failing.start()
                failing.join()
            return captured.getvalue()

        def threading_hook(args):
            print("Running a thread failed", file=sys.stderr)

        default_output = run_thread()
        with support.swap_attr(threading, 'excepthook', threading_hook):
            custom_hook_output = run_thread()
            threading.excepthook = threading.__excepthook__
            recovered_output = run_thread()

        self.assertEqual(default_output, recovered_output)
        self.assertNotEqual(default_output, custom_hook_output)
        self.assertEqual(custom_hook_output, "Running a thread failed\n")
1565
1566
class TimerTests(BaseTestCase):
    # Tests for threading.Timer.

    def setUp(self):
        super().setUp()
        # One (args, kwargs) entry is appended per _callback_spy() call.
        self.callback_args = []
        # Set by _callback_spy() so the test can wait for a timer to fire.
        self.callback_event = threading.Event()

    def _callback_spy(self, *args, **kwargs):
        # Timer callback: record a copy of the received arguments, then
        # wake up the waiting test.
        snapshot = (tuple(args), dict(kwargs))
        self.callback_args.append(snapshot)
        self.callback_event.set()

    def test_init_immutable_default_args(self):
        # Issue 17435: constructor defaults were mutable objects, they could be
        # mutated via the object attributes and affect other Timer objects.
        first = threading.Timer(0.01, self._callback_spy)
        first.start()
        self.callback_event.wait()

        # Mutate the first timer's argument containers; a timer created
        # afterwards must not see these changes.
        first.args.append("blah")
        first.kwargs["foo"] = "bar"
        self.callback_event.clear()

        second = threading.Timer(0.01, self._callback_spy)
        second.start()
        self.callback_event.wait()

        self.assertEqual(len(self.callback_args), 2)
        self.assertEqual(self.callback_args, [((), {}), ((), {})])
        for timer in (first, second):
            timer.join()
1594
class LockTests(lock_tests.LockTests):
    # Bind the inherited lock_tests.LockTests cases to threading.Lock.
    # staticmethod() keeps the callable from being bound to the instance
    # when it is looked up through self.
    locktype = staticmethod(threading.Lock)
1597
class PyRLockTests(lock_tests.RLockTests):
    # Bind the inherited lock_tests.RLockTests cases to threading._PyRLock.
    locktype = staticmethod(threading._PyRLock)
1600
# Bind the inherited lock_tests.RLockTests cases to threading._CRLock; the
# whole class is skipped when threading._CRLock is None.
@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
class CRLockTests(lock_tests.RLockTests):
    locktype = staticmethod(threading._CRLock)
1604
class EventTests(lock_tests.EventTests):
    # Bind the inherited lock_tests.EventTests cases to threading.Event.
    eventtype = staticmethod(threading.Event)
1607
class ConditionAsRLockTests(lock_tests.RLockTests):
    # Condition uses an RLock by default and exports its API.
    # The inherited RLock cases are therefore run against
    # threading.Condition itself.
    locktype = staticmethod(threading.Condition)

    def test_recursion_count(self):
        # Overrides the inherited test of the same name and always skips it.
        self.skipTest("Condition does not expose _recursion_count()")
1614
class ConditionTests(lock_tests.ConditionTests):
    # Bind the inherited lock_tests.ConditionTests cases to
    # threading.Condition.
    condtype = staticmethod(threading.Condition)
1617
class SemaphoreTests(lock_tests.SemaphoreTests):
    # Bind the inherited lock_tests.SemaphoreTests cases to
    # threading.Semaphore.
    semtype = staticmethod(threading.Semaphore)
1620
class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
    # Bind the inherited lock_tests.BoundedSemaphoreTests cases to
    # threading.BoundedSemaphore.
    semtype = staticmethod(threading.BoundedSemaphore)
1623
class BarrierTests(lock_tests.BarrierTests):
    # Bind the inherited lock_tests.BarrierTests cases to threading.Barrier.
    barriertype = staticmethod(threading.Barrier)
1626
1627
class MiscTestCase(unittest.TestCase):
    def test__all__(self):
        # Validate threading.__all__ with support.check__all__().
        # "ThreadError" is passed as an extra expected name, while the
        # "currentThread" and "activeCount" names are declared as
        # deliberately not exported.
        restore_default_excepthook(self)

        support.check__all__(
            self,
            threading,
            ('threading', '_thread'),
            extra={"ThreadError"},
            not_exported={'currentThread', 'activeCount'},
        )
1636
1637
class InterruptMainTests(unittest.TestCase):
    # Tests for _thread.interrupt_main(), which simulates the arrival of a
    # signal (SIGINT by default) in the main thread.

    def check_interrupt_main_with_signal_handler(self, signum):
        # Install a Python-level handler for *signum* that raises
        # ZeroDivisionError, then check that simulating *signum* runs it.
        # The previous handler is restored when the test is cleaned up.
        def handler(signum, frame):
            1/0

        old_handler = signal.signal(signum, handler)
        self.addCleanup(signal.signal, signum, old_handler)

        with self.assertRaises(ZeroDivisionError):
            # Pass signum explicitly: without an argument interrupt_main()
            # always simulates SIGINT, so any other signal number handed to
            # this helper would never actually be exercised.
            _thread.interrupt_main(signum)

    def check_interrupt_main_noerror(self, signum):
        # Simulating *signum* while it is set to SIG_IGN or SIG_DFL must not
        # raise.  The original handler is restored afterwards.
        handler = signal.getsignal(signum)
        try:
            # No exception should arise.
            signal.signal(signum, signal.SIG_IGN)
            _thread.interrupt_main(signum)

            signal.signal(signum, signal.SIG_DFL)
            _thread.interrupt_main(signum)
        finally:
            # Restore original handler
            signal.signal(signum, handler)

    def test_interrupt_main_subthread(self):
        # Calling start_new_thread with a function that executes interrupt_main
        # should raise KeyboardInterrupt upon completion.
        def call_interrupt():
            _thread.interrupt_main()
        t = threading.Thread(target=call_interrupt)
        with self.assertRaises(KeyboardInterrupt):
            t.start()
            t.join()
        # Join again outside the block: the KeyboardInterrupt may have cut
        # the first join() short.
        t.join()

    def test_interrupt_main_mainthread(self):
        # Make sure that if interrupt_main is called in main thread that
        # KeyboardInterrupt is raised instantly.
        with self.assertRaises(KeyboardInterrupt):
            _thread.interrupt_main()

    def test_interrupt_main_with_signal_handler(self):
        self.check_interrupt_main_with_signal_handler(signal.SIGINT)
        self.check_interrupt_main_with_signal_handler(signal.SIGTERM)

    def test_interrupt_main_noerror(self):
        self.check_interrupt_main_noerror(signal.SIGINT)
        self.check_interrupt_main_noerror(signal.SIGTERM)

    def test_interrupt_main_invalid_signal(self):
        # Signal numbers outside the valid range are rejected.
        self.assertRaises(ValueError, _thread.interrupt_main, -1)
        self.assertRaises(ValueError, _thread.interrupt_main, signal.NSIG)
        self.assertRaises(ValueError, _thread.interrupt_main, 1000000)

    @threading_helper.reap_threads
    def test_can_interrupt_tight_loops(self):
        # A worker spinning in a tight loop must notice a flag flipped by
        # the main thread.  One-element lists are used as mutable cells
        # shared between the two threads.
        cont = [True]
        started = [False]
        interrupted = [False]

        def worker(started, cont, interrupted):
            # Upper bound on the spinning, so a worker that never sees the
            # flag change still terminates (without setting interrupted).
            iterations = 100_000_000
            started[0] = True
            while cont[0]:
                if iterations:
                    iterations -= 1
                else:
                    return
                pass
            interrupted[0] = True

        t = threading.Thread(target=worker, args=(started, cont, interrupted))
        t.start()
        # Busy-wait until the worker has entered its loop.
        while not started[0]:
            pass
        cont[0] = False
        t.join()
        self.assertTrue(interrupted[0])
1716
1717
class AtexitTests(unittest.TestCase):
    # Tests for threading._register_atexit(), each run as a script passed
    # to assert_python_ok("-c", ...).

    def test_atexit_output(self):
        # A registered callback runs, and its output reaches stdout.
        script = """if True:
            import threading

            def run_last():
                print('parrot')

            threading._register_atexit(run_last)
        """
        _rc, stdout, stderr = assert_python_ok("-c", script)

        self.assertFalse(stderr)
        self.assertEqual(stdout.strip(), b'parrot')

    def test_atexit_called_once(self):
        # The assertions on the mock are made inside the script itself; the
        # test only checks that the script succeeded with an empty stderr.
        script = """if True:
            import threading
            from unittest.mock import Mock

            mock = Mock()
            threading._register_atexit(mock)
            mock.assert_not_called()
            # force early shutdown to ensure it was called once
            threading._shutdown()
            mock.assert_called_once()
        """
        _rc, _stdout, stderr = assert_python_ok("-c", script)

        self.assertFalse(stderr)

    def test_atexit_after_shutdown(self):
        # The only way to do this is by registering an atexit within
        # an atexit, which is intended to raise an exception.
        script = """if True:
            import threading

            def func():
                pass

            def run_last():
                threading._register_atexit(func)

            threading._register_atexit(run_last)
        """
        _rc, _stdout, stderr = assert_python_ok("-c", script)

        self.assertTrue(stderr)
        self.assertIn("RuntimeError: can't register atexit after shutdown",
                      stderr.decode())
1766
1767
# Run the tests with unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()