(root)/
Python-3.11.7/
Lib/
test/
test_threading.py
       1  """
       2  Tests for the threading module.
       3  """
       4  
       5  import test.support
       6  from test.support import threading_helper, requires_subprocess
       7  from test.support import verbose, cpython_only, os_helper
       8  from test.support.import_helper import import_module
       9  from test.support.script_helper import assert_python_ok, assert_python_failure
      10  
      11  import random
      12  import sys
      13  import _thread
      14  import threading
      15  import time
      16  import unittest
      17  import weakref
      18  import os
      19  import subprocess
      20  import signal
      21  import textwrap
      22  import traceback
      23  
      24  from unittest import mock
      25  from test import lock_tests
      26  from test import support
      27  
# NOTE(review): presumably skips the whole module (module=True) when the
# platform has no usable thread support -- confirm against threading_helper.
threading_helper.requires_working_threading(module=True)

# Between fork() and exec(), only async-safe functions are allowed (issues
# #12316 and #11870), and fork() from a worker thread is known to trigger
# problems with some operating systems (issue #3863): skip problematic tests
# on platforms known to behave badly.
# The tuple is compared against sys.platform by skip_unless_reliable_fork().
platforms_to_skip = ('netbsd5', 'hp-ux11')

# Is Python built with Py_DEBUG macro defined?
# Detected through the presence of sys.gettotalrefcount.
Py_DEBUG = hasattr(sys, 'gettotalrefcount')
      38  
      39  
      40  def skip_unless_reliable_fork(test):
      41      if not support.has_fork_support:
      42          return unittest.skip("requires working os.fork()")(test)
      43      if sys.platform in platforms_to_skip:
      44          return unittest.skip("due to known OS bug related to thread+fork")(test)
      45      if support.HAVE_ASAN_FORK_BUG:
      46          return unittest.skip("libasan has a pthread_create() dead lock related to thread+fork")(test)
      47      return test
      48  
      49  
      50  def restore_default_excepthook(testcase):
      51      testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook)
      52      threading.excepthook = threading.__excepthook__
      53  
      54  
      55  # A trivial mutable counter.
      56  class ESC[4;38;5;81mCounter(ESC[4;38;5;149mobject):
      57      def __init__(self):
      58          self.value = 0
      59      def inc(self):
      60          self.value += 1
      61      def dec(self):
      62          self.value -= 1
      63      def get(self):
      64          return self.value
      65  
      66  class ESC[4;38;5;81mTestThread(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
      67      def __init__(self, name, testcase, sema, mutex, nrunning):
      68          threading.Thread.__init__(self, name=name)
      69          self.testcase = testcase
      70          self.sema = sema
      71          self.mutex = mutex
      72          self.nrunning = nrunning
      73  
      74      def run(self):
      75          delay = random.random() / 10000.0
      76          if verbose:
      77              print('task %s will run for %.1f usec' %
      78                    (self.name, delay * 1e6))
      79  
      80          with self.sema:
      81              with self.mutex:
      82                  self.nrunning.inc()
      83                  if verbose:
      84                      print(self.nrunning.get(), 'tasks are running')
      85                  self.testcase.assertLessEqual(self.nrunning.get(), 3)
      86  
      87              time.sleep(delay)
      88              if verbose:
      89                  print('task', self.name, 'done')
      90  
      91              with self.mutex:
      92                  self.nrunning.dec()
      93                  self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
      94                  if verbose:
      95                      print('%s is finished. %d tasks are running' %
      96                            (self.name, self.nrunning.get()))
      97  
      98  
      99  class ESC[4;38;5;81mBaseTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     100      def setUp(self):
     101          self._threads = threading_helper.threading_setup()
     102  
     103      def tearDown(self):
     104          threading_helper.threading_cleanup(*self._threads)
     105          test.support.reap_children()
     106  
     107  
     108  class ESC[4;38;5;81mThreadTests(ESC[4;38;5;149mBaseTestCase):
     109  
     110      @cpython_only
     111      def test_name(self):
     112          def func(): pass
     113  
     114          thread = threading.Thread(name="myname1")
     115          self.assertEqual(thread.name, "myname1")
     116  
     117          # Convert int name to str
     118          thread = threading.Thread(name=123)
     119          self.assertEqual(thread.name, "123")
     120  
     121          # target name is ignored if name is specified
     122          thread = threading.Thread(target=func, name="myname2")
     123          self.assertEqual(thread.name, "myname2")
     124  
     125          with mock.patch.object(threading, '_counter', return_value=2):
     126              thread = threading.Thread(name="")
     127              self.assertEqual(thread.name, "Thread-2")
     128  
     129          with mock.patch.object(threading, '_counter', return_value=3):
     130              thread = threading.Thread()
     131              self.assertEqual(thread.name, "Thread-3")
     132  
     133          with mock.patch.object(threading, '_counter', return_value=5):
     134              thread = threading.Thread(target=func)
     135              self.assertEqual(thread.name, "Thread-5 (func)")
     136  
     137      def test_args_argument(self):
     138          # bpo-45735: Using list or tuple as *args* in constructor could
     139          # achieve the same effect.
     140          num_list = [1]
     141          num_tuple = (1,)
     142  
     143          str_list = ["str"]
     144          str_tuple = ("str",)
     145  
     146          list_in_tuple = ([1],)
     147          tuple_in_list = [(1,)]
     148  
     149          test_cases = (
     150              (num_list, lambda arg: self.assertEqual(arg, 1)),
     151              (num_tuple, lambda arg: self.assertEqual(arg, 1)),
     152              (str_list, lambda arg: self.assertEqual(arg, "str")),
     153              (str_tuple, lambda arg: self.assertEqual(arg, "str")),
     154              (list_in_tuple, lambda arg: self.assertEqual(arg, [1])),
     155              (tuple_in_list, lambda arg: self.assertEqual(arg, (1,)))
     156          )
     157  
     158          for args, target in test_cases:
     159              with self.subTest(target=target, args=args):
     160                  t = threading.Thread(target=target, args=args)
     161                  t.start()
     162                  t.join()
     163  
     164      @cpython_only
     165      def test_disallow_instantiation(self):
     166          # Ensure that the type disallows instantiation (bpo-43916)
     167          lock = threading.Lock()
     168          test.support.check_disallow_instantiation(self, type(lock))
     169  
     170      # Create a bunch of threads, let each do some work, wait until all are
     171      # done.
     172      def test_various_ops(self):
     173          # This takes about n/3 seconds to run (about n/3 clumps of tasks,
     174          # times about 1 second per clump).
     175          NUMTASKS = 10
     176  
     177          # no more than 3 of the 10 can run at once
     178          sema = threading.BoundedSemaphore(value=3)
     179          mutex = threading.RLock()
     180          numrunning = Counter()
     181  
     182          threads = []
     183  
     184          for i in range(NUMTASKS):
     185              t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
     186              threads.append(t)
     187              self.assertIsNone(t.ident)
     188              self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$')
     189              t.start()
     190  
     191          if hasattr(threading, 'get_native_id'):
     192              native_ids = set(t.native_id for t in threads) | {threading.get_native_id()}
     193              self.assertNotIn(None, native_ids)
     194              self.assertEqual(len(native_ids), NUMTASKS + 1)
     195  
     196          if verbose:
     197              print('waiting for all tasks to complete')
     198          for t in threads:
     199              t.join()
     200              self.assertFalse(t.is_alive())
     201              self.assertNotEqual(t.ident, 0)
     202              self.assertIsNotNone(t.ident)
     203              self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$')
     204          if verbose:
     205              print('all tasks done')
     206          self.assertEqual(numrunning.get(), 0)
     207  
     208      def test_ident_of_no_threading_threads(self):
     209          # The ident still must work for the main thread and dummy threads.
     210          self.assertIsNotNone(threading.current_thread().ident)
     211          def f():
     212              ident.append(threading.current_thread().ident)
     213              done.set()
     214          done = threading.Event()
     215          ident = []
     216          with threading_helper.wait_threads_exit():
     217              tid = _thread.start_new_thread(f, ())
     218              done.wait()
     219              self.assertEqual(ident[0], tid)
     220          # Kill the "immortal" _DummyThread
     221          del threading._active[ident[0]]
     222  
     223      # run with a small(ish) thread stack size (256 KiB)
     224      def test_various_ops_small_stack(self):
     225          if verbose:
     226              print('with 256 KiB thread stack size...')
     227          try:
     228              threading.stack_size(262144)
     229          except _thread.error:
     230              raise unittest.SkipTest(
     231                  'platform does not support changing thread stack size')
     232          self.test_various_ops()
     233          threading.stack_size(0)
     234  
     235      # run with a large thread stack size (1 MiB)
     236      def test_various_ops_large_stack(self):
     237          if verbose:
     238              print('with 1 MiB thread stack size...')
     239          try:
     240              threading.stack_size(0x100000)
     241          except _thread.error:
     242              raise unittest.SkipTest(
     243                  'platform does not support changing thread stack size')
     244          self.test_various_ops()
     245          threading.stack_size(0)
     246  
     247      def test_foreign_thread(self):
     248          # Check that a "foreign" thread can use the threading module.
     249          def f(mutex):
     250              # Calling current_thread() forces an entry for the foreign
     251              # thread to get made in the threading._active map.
     252              threading.current_thread()
     253              mutex.release()
     254  
     255          mutex = threading.Lock()
     256          mutex.acquire()
     257          with threading_helper.wait_threads_exit():
     258              tid = _thread.start_new_thread(f, (mutex,))
     259              # Wait for the thread to finish.
     260              mutex.acquire()
     261          self.assertIn(tid, threading._active)
     262          self.assertIsInstance(threading._active[tid], threading._DummyThread)
     263          #Issue 29376
     264          self.assertTrue(threading._active[tid].is_alive())
     265          self.assertRegex(repr(threading._active[tid]), '_DummyThread')
     266          del threading._active[tid]
     267  
     268      # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
     269      # exposed at the Python level.  This test relies on ctypes to get at it.
     270      def test_PyThreadState_SetAsyncExc(self):
     271          ctypes = import_module("ctypes")
     272  
     273          set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
     274          set_async_exc.argtypes = (ctypes.c_ulong, ctypes.py_object)
     275  
     276          class ESC[4;38;5;81mAsyncExc(ESC[4;38;5;149mException):
     277              pass
     278  
     279          exception = ctypes.py_object(AsyncExc)
     280  
     281          # First check it works when setting the exception from the same thread.
     282          tid = threading.get_ident()
     283          self.assertIsInstance(tid, int)
     284          self.assertGreater(tid, 0)
     285  
     286          try:
     287              result = set_async_exc(tid, exception)
     288              # The exception is async, so we might have to keep the VM busy until
     289              # it notices.
     290              while True:
     291                  pass
     292          except AsyncExc:
     293              pass
     294          else:
     295              # This code is unreachable but it reflects the intent. If we wanted
     296              # to be smarter the above loop wouldn't be infinite.
     297              self.fail("AsyncExc not raised")
     298          try:
     299              self.assertEqual(result, 1) # one thread state modified
     300          except UnboundLocalError:
     301              # The exception was raised too quickly for us to get the result.
     302              pass
     303  
     304          # `worker_started` is set by the thread when it's inside a try/except
     305          # block waiting to catch the asynchronously set AsyncExc exception.
     306          # `worker_saw_exception` is set by the thread upon catching that
     307          # exception.
     308          worker_started = threading.Event()
     309          worker_saw_exception = threading.Event()
     310  
     311          class ESC[4;38;5;81mWorker(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
     312              def run(self):
     313                  self.id = threading.get_ident()
     314                  self.finished = False
     315  
     316                  try:
     317                      while True:
     318                          worker_started.set()
     319                          time.sleep(0.1)
     320                  except AsyncExc:
     321                      self.finished = True
     322                      worker_saw_exception.set()
     323  
     324          t = Worker()
     325          t.daemon = True # so if this fails, we don't hang Python at shutdown
     326          t.start()
     327          if verbose:
     328              print("    started worker thread")
     329  
     330          # Try a thread id that doesn't make sense.
     331          if verbose:
     332              print("    trying nonsensical thread id")
     333          result = set_async_exc(-1, exception)
     334          self.assertEqual(result, 0)  # no thread states modified
     335  
     336          # Now raise an exception in the worker thread.
     337          if verbose:
     338              print("    waiting for worker thread to get started")
     339          ret = worker_started.wait()
     340          self.assertTrue(ret)
     341          if verbose:
     342              print("    verifying worker hasn't exited")
     343          self.assertFalse(t.finished)
     344          if verbose:
     345              print("    attempting to raise asynch exception in worker")
     346          result = set_async_exc(t.id, exception)
     347          self.assertEqual(result, 1) # one thread state modified
     348          if verbose:
     349              print("    waiting for worker to say it caught the exception")
     350          worker_saw_exception.wait(timeout=support.SHORT_TIMEOUT)
     351          self.assertTrue(t.finished)
     352          if verbose:
     353              print("    all OK -- joining worker")
     354          if t.finished:
     355              t.join()
     356          # else the thread is still running, and we have no way to kill it
     357  
     358      def test_limbo_cleanup(self):
     359          # Issue 7481: Failure to start thread should cleanup the limbo map.
     360          def fail_new_thread(*args):
     361              raise threading.ThreadError()
     362          _start_new_thread = threading._start_new_thread
     363          threading._start_new_thread = fail_new_thread
     364          try:
     365              t = threading.Thread(target=lambda: None)
     366              self.assertRaises(threading.ThreadError, t.start)
     367              self.assertFalse(
     368                  t in threading._limbo,
     369                  "Failed to cleanup _limbo map on failure of Thread.start().")
     370          finally:
     371              threading._start_new_thread = _start_new_thread
     372  
     373      def test_finalize_running_thread(self):
     374          # Issue 1402: the PyGILState_Ensure / _Release functions may be called
     375          # very late on python exit: on deallocation of a running thread for
     376          # example.
     377          import_module("ctypes")
     378  
     379          rc, out, err = assert_python_failure("-c", """if 1:
     380              import ctypes, sys, time, _thread
     381  
     382              # This lock is used as a simple event variable.
     383              ready = _thread.allocate_lock()
     384              ready.acquire()
     385  
     386              # Module globals are cleared before __del__ is run
     387              # So we save the functions in class dict
     388              class C:
     389                  ensure = ctypes.pythonapi.PyGILState_Ensure
     390                  release = ctypes.pythonapi.PyGILState_Release
     391                  def __del__(self):
     392                      state = self.ensure()
     393                      self.release(state)
     394  
     395              def waitingThread():
     396                  x = C()
     397                  ready.release()
     398                  time.sleep(100)
     399  
     400              _thread.start_new_thread(waitingThread, ())
     401              ready.acquire()  # Be sure the other thread is waiting.
     402              sys.exit(42)
     403              """)
     404          self.assertEqual(rc, 42)
     405  
     406      def test_finalize_with_trace(self):
     407          # Issue1733757
     408          # Avoid a deadlock when sys.settrace steps into threading._shutdown
     409          assert_python_ok("-c", """if 1:
     410              import sys, threading
     411  
     412              # A deadlock-killer, to prevent the
     413              # testsuite to hang forever
     414              def killer():
     415                  import os, time
     416                  time.sleep(2)
     417                  print('program blocked; aborting')
     418                  os._exit(2)
     419              t = threading.Thread(target=killer)
     420              t.daemon = True
     421              t.start()
     422  
     423              # This is the trace function
     424              def func(frame, event, arg):
     425                  threading.current_thread()
     426                  return func
     427  
     428              sys.settrace(func)
     429              """)
     430  
     431      def test_join_nondaemon_on_shutdown(self):
     432          # Issue 1722344
     433          # Raising SystemExit skipped threading._shutdown
     434          rc, out, err = assert_python_ok("-c", """if 1:
     435                  import threading
     436                  from time import sleep
     437  
     438                  def child():
     439                      sleep(1)
     440                      # As a non-daemon thread we SHOULD wake up and nothing
     441                      # should be torn down yet
     442                      print("Woke up, sleep function is:", sleep)
     443  
     444                  threading.Thread(target=child).start()
     445                  raise SystemExit
     446              """)
     447          self.assertEqual(out.strip(),
     448              b"Woke up, sleep function is: <built-in function sleep>")
     449          self.assertEqual(err, b"")
     450  
     451      def test_enumerate_after_join(self):
     452          # Try hard to trigger #1703448: a thread is still returned in
     453          # threading.enumerate() after it has been join()ed.
     454          enum = threading.enumerate
     455          old_interval = sys.getswitchinterval()
     456          try:
     457              for i in range(1, 100):
     458                  sys.setswitchinterval(i * 0.0002)
     459                  t = threading.Thread(target=lambda: None)
     460                  t.start()
     461                  t.join()
     462                  l = enum()
     463                  self.assertNotIn(t, l,
     464                      "#1703448 triggered after %d trials: %s" % (i, l))
     465          finally:
     466              sys.setswitchinterval(old_interval)
     467  
     468      def test_no_refcycle_through_target(self):
     469          class ESC[4;38;5;81mRunSelfFunction(ESC[4;38;5;149mobject):
     470              def __init__(self, should_raise):
     471                  # The links in this refcycle from Thread back to self
     472                  # should be cleaned up when the thread completes.
     473                  self.should_raise = should_raise
     474                  self.thread = threading.Thread(target=self._run,
     475                                                 args=(self,),
     476                                                 kwargs={'yet_another':self})
     477                  self.thread.start()
     478  
     479              def _run(self, other_ref, yet_another):
     480                  if self.should_raise:
     481                      raise SystemExit
     482  
     483          restore_default_excepthook(self)
     484  
     485          cyclic_object = RunSelfFunction(should_raise=False)
     486          weak_cyclic_object = weakref.ref(cyclic_object)
     487          cyclic_object.thread.join()
     488          del cyclic_object
     489          self.assertIsNone(weak_cyclic_object(),
     490                           msg=('%d references still around' %
     491                                sys.getrefcount(weak_cyclic_object())))
     492  
     493          raising_cyclic_object = RunSelfFunction(should_raise=True)
     494          weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
     495          raising_cyclic_object.thread.join()
     496          del raising_cyclic_object
     497          self.assertIsNone(weak_raising_cyclic_object(),
     498                           msg=('%d references still around' %
     499                                sys.getrefcount(weak_raising_cyclic_object())))
     500  
     501      def test_old_threading_api(self):
     502          # Just a quick sanity check to make sure the old method names are
     503          # still present
     504          t = threading.Thread()
     505          with self.assertWarnsRegex(DeprecationWarning,
     506                                     r'get the daemon attribute'):
     507              t.isDaemon()
     508          with self.assertWarnsRegex(DeprecationWarning,
     509                                     r'set the daemon attribute'):
     510              t.setDaemon(True)
     511          with self.assertWarnsRegex(DeprecationWarning,
     512                                     r'get the name attribute'):
     513              t.getName()
     514          with self.assertWarnsRegex(DeprecationWarning,
     515                                     r'set the name attribute'):
     516              t.setName("name")
     517  
     518          e = threading.Event()
     519          with self.assertWarnsRegex(DeprecationWarning, 'use is_set()'):
     520              e.isSet()
     521  
     522          cond = threading.Condition()
     523          cond.acquire()
     524          with self.assertWarnsRegex(DeprecationWarning, 'use notify_all()'):
     525              cond.notifyAll()
     526  
     527          with self.assertWarnsRegex(DeprecationWarning, 'use active_count()'):
     528              threading.activeCount()
     529          with self.assertWarnsRegex(DeprecationWarning, 'use current_thread()'):
     530              threading.currentThread()
     531  
     532      def test_repr_daemon(self):
     533          t = threading.Thread()
     534          self.assertNotIn('daemon', repr(t))
     535          t.daemon = True
     536          self.assertIn('daemon', repr(t))
     537  
     538      def test_daemon_param(self):
     539          t = threading.Thread()
     540          self.assertFalse(t.daemon)
     541          t = threading.Thread(daemon=False)
     542          self.assertFalse(t.daemon)
     543          t = threading.Thread(daemon=True)
     544          self.assertTrue(t.daemon)
     545  
     546      @skip_unless_reliable_fork
     547      def test_fork_at_exit(self):
     548          # bpo-42350: Calling os.fork() after threading._shutdown() must
     549          # not log an error.
     550          code = textwrap.dedent("""
     551              import atexit
     552              import os
     553              import sys
     554              from test.support import wait_process
     555  
     556              # Import the threading module to register its "at fork" callback
     557              import threading
     558  
     559              def exit_handler():
     560                  pid = os.fork()
     561                  if not pid:
     562                      print("child process ok", file=sys.stderr, flush=True)
     563                      # child process
     564                  else:
     565                      wait_process(pid, exitcode=0)
     566  
     567              # exit_handler() will be called after threading._shutdown()
     568              atexit.register(exit_handler)
     569          """)
     570          _, out, err = assert_python_ok("-c", code)
     571          self.assertEqual(out, b'')
     572          self.assertEqual(err.rstrip(), b'child process ok')
     573  
     574      @skip_unless_reliable_fork
     575      def test_dummy_thread_after_fork(self):
     576          # Issue #14308: a dummy thread in the active list doesn't mess up
     577          # the after-fork mechanism.
     578          code = """if 1:
     579              import _thread, threading, os, time
     580  
     581              def background_thread(evt):
     582                  # Creates and registers the _DummyThread instance
     583                  threading.current_thread()
     584                  evt.set()
     585                  time.sleep(10)
     586  
     587              evt = threading.Event()
     588              _thread.start_new_thread(background_thread, (evt,))
     589              evt.wait()
     590              assert threading.active_count() == 2, threading.active_count()
     591              if os.fork() == 0:
     592                  assert threading.active_count() == 1, threading.active_count()
     593                  os._exit(0)
     594              else:
     595                  os.wait()
     596          """
     597          _, out, err = assert_python_ok("-c", code)
     598          self.assertEqual(out, b'')
     599          self.assertEqual(err, b'')
     600  
     601      @skip_unless_reliable_fork
     602      def test_is_alive_after_fork(self):
     603          # Try hard to trigger #18418: is_alive() could sometimes be True on
     604          # threads that vanished after a fork.
     605          old_interval = sys.getswitchinterval()
     606          self.addCleanup(sys.setswitchinterval, old_interval)
     607  
     608          # Make the bug more likely to manifest.
     609          test.support.setswitchinterval(1e-6)
     610  
     611          for i in range(20):
     612              t = threading.Thread(target=lambda: None)
     613              t.start()
     614              pid = os.fork()
     615              if pid == 0:
     616                  os._exit(11 if t.is_alive() else 10)
     617              else:
     618                  t.join()
     619  
     620                  support.wait_process(pid, exitcode=10)
     621  
     622      def test_main_thread(self):
     623          main = threading.main_thread()
     624          self.assertEqual(main.name, 'MainThread')
     625          self.assertEqual(main.ident, threading.current_thread().ident)
     626          self.assertEqual(main.ident, threading.get_ident())
     627  
     628          def f():
     629              self.assertNotEqual(threading.main_thread().ident,
     630                                  threading.current_thread().ident)
     631          th = threading.Thread(target=f)
     632          th.start()
     633          th.join()
     634  
     635      @skip_unless_reliable_fork
     636      @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
     637      def test_main_thread_after_fork(self):
     638          code = """if 1:
     639              import os, threading
     640              from test import support
     641  
     642              pid = os.fork()
     643              if pid == 0:
     644                  main = threading.main_thread()
     645                  print(main.name)
     646                  print(main.ident == threading.current_thread().ident)
     647                  print(main.ident == threading.get_ident())
     648              else:
     649                  support.wait_process(pid, exitcode=0)
     650          """
     651          _, out, err = assert_python_ok("-c", code)
     652          data = out.decode().replace('\r', '')
     653          self.assertEqual(err, b"")
     654          self.assertEqual(data, "MainThread\nTrue\nTrue\n")
     655  
     656      @skip_unless_reliable_fork
     657      @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
     658      def test_main_thread_after_fork_from_nonmain_thread(self):
     659          code = """if 1:
     660              import os, threading, sys
     661              from test import support
     662  
     663              def func():
     664                  pid = os.fork()
     665                  if pid == 0:
     666                      main = threading.main_thread()
     667                      print(main.name)
     668                      print(main.ident == threading.current_thread().ident)
     669                      print(main.ident == threading.get_ident())
     670                      # stdout is fully buffered because not a tty,
     671                      # we have to flush before exit.
     672                      sys.stdout.flush()
     673                  else:
     674                      support.wait_process(pid, exitcode=0)
     675  
     676              th = threading.Thread(target=func)
     677              th.start()
     678              th.join()
     679          """
     680          _, out, err = assert_python_ok("-c", code)
     681          data = out.decode().replace('\r', '')
     682          self.assertEqual(err, b"")
     683          self.assertEqual(data, "Thread-1 (func)\nTrue\nTrue\n")
     684  
     685      def test_main_thread_during_shutdown(self):
     686          # bpo-31516: current_thread() should still point to the main thread
     687          # at shutdown
     688          code = """if 1:
     689              import gc, threading
     690  
     691              main_thread = threading.current_thread()
     692              assert main_thread is threading.main_thread()  # sanity check
     693  
     694              class RefCycle:
     695                  def __init__(self):
     696                      self.cycle = self
     697  
     698                  def __del__(self):
     699                      print("GC:",
     700                            threading.current_thread() is main_thread,
     701                            threading.main_thread() is main_thread,
     702                            threading.enumerate() == [main_thread])
     703  
     704              RefCycle()
     705              gc.collect()  # sanity check
     706              x = RefCycle()
     707          """
     708          _, out, err = assert_python_ok("-c", code)
     709          data = out.decode()
     710          self.assertEqual(err, b"")
     711          self.assertEqual(data.splitlines(),
     712                           ["GC: True True True"] * 2)
     713  
     714      def test_finalization_shutdown(self):
     715          # bpo-36402: Py_Finalize() calls threading._shutdown() which must wait
     716          # until Python thread states of all non-daemon threads get deleted.
     717          #
     718          # Test similar to SubinterpThreadingTests.test_threads_join_2(), but
     719          # test the finalization of the main interpreter.
     720          code = """if 1:
     721              import os
     722              import threading
     723              import time
     724              import random
     725  
     726              def random_sleep():
     727                  seconds = random.random() * 0.010
     728                  time.sleep(seconds)
     729  
     730              class Sleeper:
     731                  def __del__(self):
     732                      random_sleep()
     733  
     734              tls = threading.local()
     735  
     736              def f():
     737                  # Sleep a bit so that the thread is still running when
     738                  # Py_Finalize() is called.
     739                  random_sleep()
     740                  tls.x = Sleeper()
     741                  random_sleep()
     742  
     743              threading.Thread(target=f).start()
     744              random_sleep()
     745          """
     746          rc, out, err = assert_python_ok("-c", code)
     747          self.assertEqual(err, b"")
     748  
     749      def test_tstate_lock(self):
     750          # Test an implementation detail of Thread objects.
     751          started = _thread.allocate_lock()
     752          finish = _thread.allocate_lock()
     753          started.acquire()
     754          finish.acquire()
     755          def f():
     756              started.release()
     757              finish.acquire()
     758              time.sleep(0.01)
     759          # The tstate lock is None until the thread is started
     760          t = threading.Thread(target=f)
     761          self.assertIs(t._tstate_lock, None)
     762          t.start()
     763          started.acquire()
     764          self.assertTrue(t.is_alive())
     765          # The tstate lock can't be acquired when the thread is running
     766          # (or suspended).
     767          tstate_lock = t._tstate_lock
     768          self.assertFalse(tstate_lock.acquire(timeout=0), False)
     769          finish.release()
     770          # When the thread ends, the state_lock can be successfully
     771          # acquired.
     772          self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT), False)
     773          # But is_alive() is still True:  we hold _tstate_lock now, which
     774          # prevents is_alive() from knowing the thread's end-of-life C code
     775          # is done.
     776          self.assertTrue(t.is_alive())
     777          # Let is_alive() find out the C code is done.
     778          tstate_lock.release()
     779          self.assertFalse(t.is_alive())
     780          # And verify the thread disposed of _tstate_lock.
     781          self.assertIsNone(t._tstate_lock)
     782          t.join()
     783  
     784      def test_repr_stopped(self):
     785          # Verify that "stopped" shows up in repr(Thread) appropriately.
     786          started = _thread.allocate_lock()
     787          finish = _thread.allocate_lock()
     788          started.acquire()
     789          finish.acquire()
     790          def f():
     791              started.release()
     792              finish.acquire()
     793          t = threading.Thread(target=f)
     794          t.start()
     795          started.acquire()
     796          self.assertIn("started", repr(t))
     797          finish.release()
     798          # "stopped" should appear in the repr in a reasonable amount of time.
     799          # Implementation detail:  as of this writing, that's trivially true
     800          # if .join() is called, and almost trivially true if .is_alive() is
     801          # called.  The detail we're testing here is that "stopped" shows up
     802          # "all on its own".
     803          LOOKING_FOR = "stopped"
     804          for i in range(500):
     805              if LOOKING_FOR in repr(t):
     806                  break
     807              time.sleep(0.01)
     808          self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds
     809          t.join()
     810  
     811      def test_BoundedSemaphore_limit(self):
     812          # BoundedSemaphore should raise ValueError if released too often.
     813          for limit in range(1, 10):
     814              bs = threading.BoundedSemaphore(limit)
     815              threads = [threading.Thread(target=bs.acquire)
     816                         for _ in range(limit)]
     817              for t in threads:
     818                  t.start()
     819              for t in threads:
     820                  t.join()
     821              threads = [threading.Thread(target=bs.release)
     822                         for _ in range(limit)]
     823              for t in threads:
     824                  t.start()
     825              for t in threads:
     826                  t.join()
     827              self.assertRaises(ValueError, bs.release)
     828  
     829      @cpython_only
     830      def test_frame_tstate_tracing(self):
     831          # Issue #14432: Crash when a generator is created in a C thread that is
     832          # destroyed while the generator is still used. The issue was that a
     833          # generator contains a frame, and the frame kept a reference to the
     834          # Python state of the destroyed C thread. The crash occurs when a trace
     835          # function is setup.
     836  
     837          def noop_trace(frame, event, arg):
     838              # no operation
     839              return noop_trace
     840  
     841          def generator():
     842              while 1:
     843                  yield "generator"
     844  
     845          def callback():
     846              if callback.gen is None:
     847                  callback.gen = generator()
     848              return next(callback.gen)
     849          callback.gen = None
     850  
     851          old_trace = sys.gettrace()
     852          sys.settrace(noop_trace)
     853          try:
     854              # Install a trace function
     855              threading.settrace(noop_trace)
     856  
     857              # Create a generator in a C thread which exits after the call
     858              import _testcapi
     859              _testcapi.call_in_temporary_c_thread(callback)
     860  
     861              # Call the generator in a different Python thread, check that the
     862              # generator didn't keep a reference to the destroyed thread state
     863              for test in range(3):
     864                  # The trace function is still called here
     865                  callback()
     866          finally:
     867              sys.settrace(old_trace)
     868  
     869      def test_gettrace(self):
     870          def noop_trace(frame, event, arg):
     871              # no operation
     872              return noop_trace
     873          old_trace = threading.gettrace()
     874          try:
     875              threading.settrace(noop_trace)
     876              trace_func = threading.gettrace()
     877              self.assertEqual(noop_trace,trace_func)
     878          finally:
     879              threading.settrace(old_trace)
     880  
     881      def test_getprofile(self):
     882          def fn(*args): pass
     883          old_profile = threading.getprofile()
     884          try:
     885              threading.setprofile(fn)
     886              self.assertEqual(fn, threading.getprofile())
     887          finally:
     888              threading.setprofile(old_profile)
     889  
     890      @cpython_only
     891      def test_shutdown_locks(self):
     892          for daemon in (False, True):
     893              with self.subTest(daemon=daemon):
     894                  event = threading.Event()
     895                  thread = threading.Thread(target=event.wait, daemon=daemon)
     896  
     897                  # Thread.start() must add lock to _shutdown_locks,
     898                  # but only for non-daemon thread
     899                  thread.start()
     900                  tstate_lock = thread._tstate_lock
     901                  if not daemon:
     902                      self.assertIn(tstate_lock, threading._shutdown_locks)
     903                  else:
     904                      self.assertNotIn(tstate_lock, threading._shutdown_locks)
     905  
     906                  # unblock the thread and join it
     907                  event.set()
     908                  thread.join()
     909  
     910                  # Thread._stop() must remove tstate_lock from _shutdown_locks.
     911                  # Daemon threads must never add it to _shutdown_locks.
     912                  self.assertNotIn(tstate_lock, threading._shutdown_locks)
     913  
     914      def test_locals_at_exit(self):
     915          # bpo-19466: thread locals must not be deleted before destructors
     916          # are called
     917          rc, out, err = assert_python_ok("-c", """if 1:
     918              import threading
     919  
     920              class Atexit:
     921                  def __del__(self):
     922                      print("thread_dict.atexit = %r" % thread_dict.atexit)
     923  
     924              thread_dict = threading.local()
     925              thread_dict.atexit = "value"
     926  
     927              atexit = Atexit()
     928          """)
     929          self.assertEqual(out.rstrip(), b"thread_dict.atexit = 'value'")
     930  
     931      def test_boolean_target(self):
     932          # bpo-41149: A thread that had a boolean value of False would not
     933          # run, regardless of whether it was callable. The correct behaviour
     934          # is for a thread to do nothing if its target is None, and to call
     935          # the target otherwise.
     936          class ESC[4;38;5;81mBooleanTarget(ESC[4;38;5;149mobject):
     937              def __init__(self):
     938                  self.ran = False
     939              def __bool__(self):
     940                  return False
     941              def __call__(self):
     942                  self.ran = True
     943  
     944          target = BooleanTarget()
     945          thread = threading.Thread(target=target)
     946          thread.start()
     947          thread.join()
     948          self.assertTrue(target.ran)
     949  
     950      def test_leak_without_join(self):
     951          # bpo-37788: Test that a thread which is not joined explicitly
     952          # does not leak. Test written for reference leak checks.
     953          def noop(): pass
     954          with threading_helper.wait_threads_exit():
     955              threading.Thread(target=noop).start()
     956              # Thread.join() is not called
     957  
     958      @unittest.skipUnless(Py_DEBUG, 'need debug build (Py_DEBUG)')
     959      def test_debug_deprecation(self):
     960          # bpo-44584: The PYTHONTHREADDEBUG environment variable is deprecated
     961          rc, out, err = assert_python_ok("-Wdefault", "-c", "pass",
     962                                          PYTHONTHREADDEBUG="1")
     963          msg = (b'DeprecationWarning: The threading debug '
     964                 b'(PYTHONTHREADDEBUG environment variable) '
     965                 b'is deprecated and will be removed in Python 3.12')
     966          self.assertIn(msg, err)
     967  
     968      def test_import_from_another_thread(self):
     969          # bpo-1596321: If the threading module is first import from a thread
     970          # different than the main thread, threading._shutdown() must handle
     971          # this case without logging an error at Python exit.
     972          code = textwrap.dedent('''
     973              import _thread
     974              import sys
     975  
     976              event = _thread.allocate_lock()
     977              event.acquire()
     978  
     979              def import_threading():
     980                  import threading
     981                  event.release()
     982  
     983              if 'threading' in sys.modules:
     984                  raise Exception('threading is already imported')
     985  
     986              _thread.start_new_thread(import_threading, ())
     987  
     988              # wait until the threading module is imported
     989              event.acquire()
     990              event.release()
     991  
     992              if 'threading' not in sys.modules:
     993                  raise Exception('threading is not imported')
     994  
     995              # don't wait until the thread completes
     996          ''')
     997          rc, out, err = assert_python_ok("-c", code)
     998          self.assertEqual(out, b'')
     999          self.assertEqual(err, b'')
    1000  
    1001  
    1002  class ESC[4;38;5;81mThreadJoinOnShutdown(ESC[4;38;5;149mBaseTestCase):
    1003  
    1004      def _run_and_join(self, script):
    1005          script = """if 1:
    1006              import sys, os, time, threading
    1007  
    1008              # a thread, which waits for the main program to terminate
    1009              def joiningfunc(mainthread):
    1010                  mainthread.join()
    1011                  print('end of thread')
    1012                  # stdout is fully buffered because not a tty, we have to flush
    1013                  # before exit.
    1014                  sys.stdout.flush()
    1015          \n""" + script
    1016  
    1017          rc, out, err = assert_python_ok("-c", script)
    1018          data = out.decode().replace('\r', '')
    1019          self.assertEqual(data, "end of main\nend of thread\n")
    1020  
    1021      def test_1_join_on_shutdown(self):
    1022          # The usual case: on exit, wait for a non-daemon thread
    1023          script = """if 1:
    1024              import os
    1025              t = threading.Thread(target=joiningfunc,
    1026                                   args=(threading.current_thread(),))
    1027              t.start()
    1028              time.sleep(0.1)
    1029              print('end of main')
    1030              """
    1031          self._run_and_join(script)
    1032  
    1033      @skip_unless_reliable_fork
    1034      def test_2_join_in_forked_process(self):
    1035          # Like the test above, but from a forked interpreter
    1036          script = """if 1:
    1037              from test import support
    1038  
    1039              childpid = os.fork()
    1040              if childpid != 0:
    1041                  # parent process
    1042                  support.wait_process(childpid, exitcode=0)
    1043                  sys.exit(0)
    1044  
    1045              # child process
    1046              t = threading.Thread(target=joiningfunc,
    1047                                   args=(threading.current_thread(),))
    1048              t.start()
    1049              print('end of main')
    1050              """
    1051          self._run_and_join(script)
    1052  
    1053      @skip_unless_reliable_fork
    1054      def test_3_join_in_forked_from_thread(self):
    1055          # Like the test above, but fork() was called from a worker thread
    1056          # In the forked process, the main Thread object must be marked as stopped.
    1057  
    1058          script = """if 1:
    1059              from test import support
    1060  
    1061              main_thread = threading.current_thread()
    1062              def worker():
    1063                  childpid = os.fork()
    1064                  if childpid != 0:
    1065                      # parent process
    1066                      support.wait_process(childpid, exitcode=0)
    1067                      sys.exit(0)
    1068  
    1069                  # child process
    1070                  t = threading.Thread(target=joiningfunc,
    1071                                       args=(main_thread,))
    1072                  print('end of main')
    1073                  t.start()
    1074                  t.join() # Should not block: main_thread is already stopped
    1075  
    1076              w = threading.Thread(target=worker)
    1077              w.start()
    1078              """
    1079          self._run_and_join(script)
    1080  
    1081      @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    1082      def test_4_daemon_threads(self):
    1083          # Check that a daemon thread cannot crash the interpreter on shutdown
    1084          # by manipulating internal structures that are being disposed of in
    1085          # the main thread.
    1086          script = """if True:
    1087              import os
    1088              import random
    1089              import sys
    1090              import time
    1091              import threading
    1092  
    1093              thread_has_run = set()
    1094  
    1095              def random_io():
    1096                  '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
    1097                  import test.test_threading as mod
    1098                  while True:
    1099                      with open(mod.__file__, 'rb') as in_f:
    1100                          stuff = in_f.read(200)
    1101                          with open(os.devnull, 'wb') as null_f:
    1102                              null_f.write(stuff)
    1103                              time.sleep(random.random() / 1995)
    1104                      thread_has_run.add(threading.current_thread())
    1105  
    1106              def main():
    1107                  count = 0
    1108                  for _ in range(40):
    1109                      new_thread = threading.Thread(target=random_io)
    1110                      new_thread.daemon = True
    1111                      new_thread.start()
    1112                      count += 1
    1113                  while len(thread_has_run) < count:
    1114                      time.sleep(0.001)
    1115                  # Trigger process shutdown
    1116                  sys.exit(0)
    1117  
    1118              main()
    1119              """
    1120          rc, out, err = assert_python_ok('-c', script)
    1121          self.assertFalse(err)
    1122  
    1123      @skip_unless_reliable_fork
    1124      def test_reinit_tls_after_fork(self):
    1125          # Issue #13817: fork() would deadlock in a multithreaded program with
    1126          # the ad-hoc TLS implementation.
    1127  
    1128          def do_fork_and_wait():
    1129              # just fork a child process and wait it
    1130              pid = os.fork()
    1131              if pid > 0:
    1132                  support.wait_process(pid, exitcode=50)
    1133              else:
    1134                  os._exit(50)
    1135  
    1136          # start a bunch of threads that will fork() child processes
    1137          threads = []
    1138          for i in range(16):
    1139              t = threading.Thread(target=do_fork_and_wait)
    1140              threads.append(t)
    1141              t.start()
    1142  
    1143          for t in threads:
    1144              t.join()
    1145  
    1146      @skip_unless_reliable_fork
    1147      def test_clear_threads_states_after_fork(self):
    1148          # Issue #17094: check that threads states are cleared after fork()
    1149  
    1150          # start a bunch of threads
    1151          threads = []
    1152          for i in range(16):
    1153              t = threading.Thread(target=lambda : time.sleep(0.3))
    1154              threads.append(t)
    1155              t.start()
    1156  
    1157          pid = os.fork()
    1158          if pid == 0:
    1159              # check that threads states have been cleared
    1160              if len(sys._current_frames()) == 1:
    1161                  os._exit(51)
    1162              else:
    1163                  os._exit(52)
    1164          else:
    1165              support.wait_process(pid, exitcode=51)
    1166  
    1167          for t in threads:
    1168              t.join()
    1169  
    1170  
    1171  class ESC[4;38;5;81mSubinterpThreadingTests(ESC[4;38;5;149mBaseTestCase):
    1172      def pipe(self):
    1173          r, w = os.pipe()
    1174          self.addCleanup(os.close, r)
    1175          self.addCleanup(os.close, w)
    1176          if hasattr(os, 'set_blocking'):
    1177              os.set_blocking(r, False)
    1178          return (r, w)
    1179  
    1180      def test_threads_join(self):
    1181          # Non-daemon threads should be joined at subinterpreter shutdown
    1182          # (issue #18808)
    1183          r, w = self.pipe()
    1184          code = textwrap.dedent(r"""
    1185              import os
    1186              import random
    1187              import threading
    1188              import time
    1189  
    1190              def random_sleep():
    1191                  seconds = random.random() * 0.010
    1192                  time.sleep(seconds)
    1193  
    1194              def f():
    1195                  # Sleep a bit so that the thread is still running when
    1196                  # Py_EndInterpreter is called.
    1197                  random_sleep()
    1198                  os.write(%d, b"x")
    1199  
    1200              threading.Thread(target=f).start()
    1201              random_sleep()
    1202          """ % (w,))
    1203          ret = test.support.run_in_subinterp(code)
    1204          self.assertEqual(ret, 0)
    1205          # The thread was joined properly.
    1206          self.assertEqual(os.read(r, 1), b"x")
    1207  
    1208      def test_threads_join_2(self):
    1209          # Same as above, but a delay gets introduced after the thread's
    1210          # Python code returned but before the thread state is deleted.
    1211          # To achieve this, we register a thread-local object which sleeps
    1212          # a bit when deallocated.
    1213          r, w = self.pipe()
    1214          code = textwrap.dedent(r"""
    1215              import os
    1216              import random
    1217              import threading
    1218              import time
    1219  
    1220              def random_sleep():
    1221                  seconds = random.random() * 0.010
    1222                  time.sleep(seconds)
    1223  
    1224              class Sleeper:
    1225                  def __del__(self):
    1226                      random_sleep()
    1227  
    1228              tls = threading.local()
    1229  
    1230              def f():
    1231                  # Sleep a bit so that the thread is still running when
    1232                  # Py_EndInterpreter is called.
    1233                  random_sleep()
    1234                  tls.x = Sleeper()
    1235                  os.write(%d, b"x")
    1236  
    1237              threading.Thread(target=f).start()
    1238              random_sleep()
    1239          """ % (w,))
    1240          ret = test.support.run_in_subinterp(code)
    1241          self.assertEqual(ret, 0)
    1242          # The thread was joined properly.
    1243          self.assertEqual(os.read(r, 1), b"x")
    1244  
    1245      @cpython_only
    1246      def test_daemon_threads_fatal_error(self):
    1247          subinterp_code = f"""if 1:
    1248              import os
    1249              import threading
    1250              import time
    1251  
    1252              def f():
    1253                  # Make sure the daemon thread is still running when
    1254                  # Py_EndInterpreter is called.
    1255                  time.sleep({test.support.SHORT_TIMEOUT})
    1256              threading.Thread(target=f, daemon=True).start()
    1257              """
    1258          script = r"""if 1:
    1259              import _testcapi
    1260  
    1261              _testcapi.run_in_subinterp(%r)
    1262              """ % (subinterp_code,)
    1263          with test.support.SuppressCrashReport():
    1264              rc, out, err = assert_python_failure("-c", script)
    1265          self.assertIn("Fatal Python error: Py_EndInterpreter: "
    1266                        "not the last thread", err.decode())
    1267  
    1268  
    1269  class ESC[4;38;5;81mThreadingExceptionTests(ESC[4;38;5;149mBaseTestCase):
    1270      # A RuntimeError should be raised if Thread.start() is called
    1271      # multiple times.
    1272      def test_start_thread_again(self):
    1273          thread = threading.Thread()
    1274          thread.start()
    1275          self.assertRaises(RuntimeError, thread.start)
    1276          thread.join()
    1277  
    1278      def test_joining_current_thread(self):
    1279          current_thread = threading.current_thread()
    1280          self.assertRaises(RuntimeError, current_thread.join);
    1281  
    1282      def test_joining_inactive_thread(self):
    1283          thread = threading.Thread()
    1284          self.assertRaises(RuntimeError, thread.join)
    1285  
    1286      def test_daemonize_active_thread(self):
    1287          thread = threading.Thread()
    1288          thread.start()
    1289          self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
    1290          thread.join()
    1291  
    1292      def test_releasing_unacquired_lock(self):
    1293          lock = threading.Lock()
    1294          self.assertRaises(RuntimeError, lock.release)
    1295  
    1296      @requires_subprocess()
    1297      def test_recursion_limit(self):
    1298          # Issue 9670
    1299          # test that excessive recursion within a non-main thread causes
    1300          # an exception rather than crashing the interpreter on platforms
    1301          # like Mac OS X or FreeBSD which have small default stack sizes
    1302          # for threads
    1303          script = """if True:
    1304              import threading
    1305  
    1306              def recurse():
    1307                  return recurse()
    1308  
    1309              def outer():
    1310                  try:
    1311                      recurse()
    1312                  except RecursionError:
    1313                      pass
    1314  
    1315              w = threading.Thread(target=outer)
    1316              w.start()
    1317              w.join()
    1318              print('end of main thread')
    1319              """
    1320          expected_output = "end of main thread\n"
    1321          p = subprocess.Popen([sys.executable, "-c", script],
    1322                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    1323          stdout, stderr = p.communicate()
    1324          data = stdout.decode().replace('\r', '')
    1325          self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode())
    1326          self.assertEqual(data, expected_output)
    1327  
    1328      def test_print_exception(self):
    1329          script = r"""if True:
    1330              import threading
    1331              import time
    1332  
    1333              running = False
    1334              def run():
    1335                  global running
    1336                  running = True
    1337                  while running:
    1338                      time.sleep(0.01)
    1339                  1/0
    1340              t = threading.Thread(target=run)
    1341              t.start()
    1342              while not running:
    1343                  time.sleep(0.01)
    1344              running = False
    1345              t.join()
    1346              """
    1347          rc, out, err = assert_python_ok("-c", script)
    1348          self.assertEqual(out, b'')
    1349          err = err.decode()
    1350          self.assertIn("Exception in thread", err)
    1351          self.assertIn("Traceback (most recent call last):", err)
    1352          self.assertIn("ZeroDivisionError", err)
    1353          self.assertNotIn("Unhandled exception", err)
    1354  
    1355      def test_print_exception_stderr_is_none_1(self):
    1356          script = r"""if True:
    1357              import sys
    1358              import threading
    1359              import time
    1360  
    1361              running = False
    1362              def run():
    1363                  global running
    1364                  running = True
    1365                  while running:
    1366                      time.sleep(0.01)
    1367                  1/0
    1368              t = threading.Thread(target=run)
    1369              t.start()
    1370              while not running:
    1371                  time.sleep(0.01)
    1372              sys.stderr = None
    1373              running = False
    1374              t.join()
    1375              """
    1376          rc, out, err = assert_python_ok("-c", script)
    1377          self.assertEqual(out, b'')
    1378          err = err.decode()
    1379          self.assertIn("Exception in thread", err)
    1380          self.assertIn("Traceback (most recent call last):", err)
    1381          self.assertIn("ZeroDivisionError", err)
    1382          self.assertNotIn("Unhandled exception", err)
    1383  
    def test_print_exception_stderr_is_none_2(self):
        # Here the child script sets sys.stderr to None *before* the worker
        # thread is created.  The only requirements checked are that the
        # child exits successfully, prints nothing on stdout and does not
        # emit an "Unhandled exception" message.
        script = r"""if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            sys.stderr = None
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            """
        _rc, stdout, stderr = assert_python_ok("-c", script)
        self.assertEqual(stdout, b'')
        report = stderr.decode()
        self.assertNotIn("Unhandled exception", report)
    1408  
    1409      def test_bare_raise_in_brand_new_thread(self):
    1410          def bare_raise():
    1411              raise
    1412  
    1413          class ESC[4;38;5;81mIssue27558(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
    1414              exc = None
    1415  
    1416              def run(self):
    1417                  try:
    1418                      bare_raise()
    1419                  except Exception as exc:
    1420                      self.exc = exc
    1421  
    1422          thread = Issue27558()
    1423          thread.start()
    1424          thread.join()
    1425          self.assertIsNotNone(thread.exc)
    1426          self.assertIsInstance(thread.exc, RuntimeError)
    1427          # explicitly break the reference cycle to not leak a dangling thread
    1428          thread.exc = None
    1429  
    def test_multithread_modify_file_noerror(self):
        # See issue25872
        def rewrite_testfn():
            # Rewrite the scratch file and format the current stack while
            # the file is still open.
            with open(os_helper.TESTFN, 'w', encoding='utf-8') as fp:
                fp.write(' ')
                traceback.format_stack()

        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        # 100 threads are run one after another: each is joined before the
        # next one is started.
        workers = [threading.Thread(target=rewrite_testfn) for _ in range(100)]
        for worker in workers:
            worker.start()
            worker.join()
    1445  
    1446  
    1447  class ESC[4;38;5;81mThreadRunFail(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
    1448      def run(self):
    1449          raise ValueError("run failed")
    1450  
    1451  
    1452  class ESC[4;38;5;81mExceptHookTests(ESC[4;38;5;149mBaseTestCase):
    1453      def setUp(self):
    1454          restore_default_excepthook(self)
    1455          super().setUp()
    1456  
    1457      def test_excepthook(self):
    1458          with support.captured_output("stderr") as stderr:
    1459              thread = ThreadRunFail(name="excepthook thread")
    1460              thread.start()
    1461              thread.join()
    1462  
    1463          stderr = stderr.getvalue().strip()
    1464          self.assertIn(f'Exception in thread {thread.name}:\n', stderr)
    1465          self.assertIn('Traceback (most recent call last):\n', stderr)
    1466          self.assertIn('  raise ValueError("run failed")', stderr)
    1467          self.assertIn('ValueError: run failed', stderr)
    1468  
    1469      @support.cpython_only
    1470      def test_excepthook_thread_None(self):
    1471          # threading.excepthook called with thread=None: log the thread
    1472          # identifier in this case.
    1473          with support.captured_output("stderr") as stderr:
    1474              try:
    1475                  raise ValueError("bug")
    1476              except Exception as exc:
    1477                  args = threading.ExceptHookArgs([*sys.exc_info(), None])
    1478                  try:
    1479                      threading.excepthook(args)
    1480                  finally:
    1481                      # Explicitly break a reference cycle
    1482                      args = None
    1483  
    1484          stderr = stderr.getvalue().strip()
    1485          self.assertIn(f'Exception in thread {threading.get_ident()}:\n', stderr)
    1486          self.assertIn('Traceback (most recent call last):\n', stderr)
    1487          self.assertIn('  raise ValueError("bug")', stderr)
    1488          self.assertIn('ValueError: bug', stderr)
    1489  
    1490      def test_system_exit(self):
    1491          class ESC[4;38;5;81mThreadExit(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
    1492              def run(self):
    1493                  sys.exit(1)
    1494  
    1495          # threading.excepthook() silently ignores SystemExit
    1496          with support.captured_output("stderr") as stderr:
    1497              thread = ThreadExit()
    1498              thread.start()
    1499              thread.join()
    1500  
    1501          self.assertEqual(stderr.getvalue(), '')
    1502  
    1503      def test_custom_excepthook(self):
    1504          args = None
    1505  
    1506          def hook(hook_args):
    1507              nonlocal args
    1508              args = hook_args
    1509  
    1510          try:
    1511              with support.swap_attr(threading, 'excepthook', hook):
    1512                  thread = ThreadRunFail()
    1513                  thread.start()
    1514                  thread.join()
    1515  
    1516              self.assertEqual(args.exc_type, ValueError)
    1517              self.assertEqual(str(args.exc_value), 'run failed')
    1518              self.assertEqual(args.exc_traceback, args.exc_value.__traceback__)
    1519              self.assertIs(args.thread, thread)
    1520          finally:
    1521              # Break reference cycle
    1522              args = None
    1523  
    1524      def test_custom_excepthook_fail(self):
    1525          def threading_hook(args):
    1526              raise ValueError("threading_hook failed")
    1527  
    1528          err_str = None
    1529  
    1530          def sys_hook(exc_type, exc_value, exc_traceback):
    1531              nonlocal err_str
    1532              err_str = str(exc_value)
    1533  
    1534          with support.swap_attr(threading, 'excepthook', threading_hook), \
    1535               support.swap_attr(sys, 'excepthook', sys_hook), \
    1536               support.captured_output('stderr') as stderr:
    1537              thread = ThreadRunFail()
    1538              thread.start()
    1539              thread.join()
    1540  
    1541          self.assertEqual(stderr.getvalue(),
    1542                           'Exception in threading.excepthook:\n')
    1543          self.assertEqual(err_str, 'threading_hook failed')
    1544  
    1545      def test_original_excepthook(self):
    1546          def run_thread():
    1547              with support.captured_output("stderr") as output:
    1548                  thread = ThreadRunFail(name="excepthook thread")
    1549                  thread.start()
    1550                  thread.join()
    1551              return output.getvalue()
    1552  
    1553          def threading_hook(args):
    1554              print("Running a thread failed", file=sys.stderr)
    1555  
    1556          default_output = run_thread()
    1557          with support.swap_attr(threading, 'excepthook', threading_hook):
    1558              custom_hook_output = run_thread()
    1559              threading.excepthook = threading.__excepthook__
    1560              recovered_output = run_thread()
    1561  
    1562          self.assertEqual(default_output, recovered_output)
    1563          self.assertNotEqual(default_output, custom_hook_output)
    1564          self.assertEqual(custom_hook_output, "Running a thread failed\n")
    1565  
    1566  
    1567  class ESC[4;38;5;81mTimerTests(ESC[4;38;5;149mBaseTestCase):
    1568  
    1569      def setUp(self):
    1570          BaseTestCase.setUp(self)
    1571          self.callback_args = []
    1572          self.callback_event = threading.Event()
    1573  
    1574      def test_init_immutable_default_args(self):
    1575          # Issue 17435: constructor defaults were mutable objects, they could be
    1576          # mutated via the object attributes and affect other Timer objects.
    1577          timer1 = threading.Timer(0.01, self._callback_spy)
    1578          timer1.start()
    1579          self.callback_event.wait()
    1580          timer1.args.append("blah")
    1581          timer1.kwargs["foo"] = "bar"
    1582          self.callback_event.clear()
    1583          timer2 = threading.Timer(0.01, self._callback_spy)
    1584          timer2.start()
    1585          self.callback_event.wait()
    1586          self.assertEqual(len(self.callback_args), 2)
    1587          self.assertEqual(self.callback_args, [((), {}), ((), {})])
    1588          timer1.join()
    1589          timer2.join()
    1590  
    1591      def _callback_spy(self, *args, **kwargs):
    1592          self.callback_args.append((args[:], kwargs.copy()))
    1593          self.callback_event.set()
    1594  
    1595  class ESC[4;38;5;81mLockTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mLockTests):
    1596      locktype = staticmethod(threading.Lock)
    1597  
    1598  class ESC[4;38;5;81mPyRLockTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mRLockTests):
    1599      locktype = staticmethod(threading._PyRLock)
    1600  
    1601  @unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
    1602  class ESC[4;38;5;81mCRLockTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mRLockTests):
    1603      locktype = staticmethod(threading._CRLock)
    1604  
    1605  class ESC[4;38;5;81mEventTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mEventTests):
    1606      eventtype = staticmethod(threading.Event)
    1607  
    1608  class ESC[4;38;5;81mConditionAsRLockTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mRLockTests):
    1609      # Condition uses an RLock by default and exports its API.
    1610      locktype = staticmethod(threading.Condition)
    1611  
    1612      def test_recursion_count(self):
    1613          self.skipTest("Condition does not expose _recursion_count()")
    1614  
    1615  class ESC[4;38;5;81mConditionTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mConditionTests):
    1616      condtype = staticmethod(threading.Condition)
    1617  
    1618  class ESC[4;38;5;81mSemaphoreTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mSemaphoreTests):
    1619      semtype = staticmethod(threading.Semaphore)
    1620  
    1621  class ESC[4;38;5;81mBoundedSemaphoreTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mBoundedSemaphoreTests):
    1622      semtype = staticmethod(threading.BoundedSemaphore)
    1623  
    1624  class ESC[4;38;5;81mBarrierTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mBarrierTests):
    1625      barriertype = staticmethod(threading.Barrier)
    1626  
    1627  
    1628  class ESC[4;38;5;81mMiscTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1629      def test__all__(self):
    1630          restore_default_excepthook(self)
    1631  
    1632          extra = {"ThreadError"}
    1633          not_exported = {'currentThread', 'activeCount'}
    1634          support.check__all__(self, threading, ('threading', '_thread'),
    1635                               extra=extra, not_exported=not_exported)
    1636  
    1637  
    1638  class ESC[4;38;5;81mInterruptMainTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1639      def check_interrupt_main_with_signal_handler(self, signum):
    1640          def handler(signum, frame):
    1641              1/0
    1642  
    1643          old_handler = signal.signal(signum, handler)
    1644          self.addCleanup(signal.signal, signum, old_handler)
    1645  
    1646          with self.assertRaises(ZeroDivisionError):
    1647              _thread.interrupt_main()
    1648  
    1649      def check_interrupt_main_noerror(self, signum):
    1650          handler = signal.getsignal(signum)
    1651          try:
    1652              # No exception should arise.
    1653              signal.signal(signum, signal.SIG_IGN)
    1654              _thread.interrupt_main(signum)
    1655  
    1656              signal.signal(signum, signal.SIG_DFL)
    1657              _thread.interrupt_main(signum)
    1658          finally:
    1659              # Restore original handler
    1660              signal.signal(signum, handler)
    1661  
    1662      def test_interrupt_main_subthread(self):
    1663          # Calling start_new_thread with a function that executes interrupt_main
    1664          # should raise KeyboardInterrupt upon completion.
    1665          def call_interrupt():
    1666              _thread.interrupt_main()
    1667          t = threading.Thread(target=call_interrupt)
    1668          with self.assertRaises(KeyboardInterrupt):
    1669              t.start()
    1670              t.join()
    1671          t.join()
    1672  
    1673      def test_interrupt_main_mainthread(self):
    1674          # Make sure that if interrupt_main is called in main thread that
    1675          # KeyboardInterrupt is raised instantly.
    1676          with self.assertRaises(KeyboardInterrupt):
    1677              _thread.interrupt_main()
    1678  
    1679      def test_interrupt_main_with_signal_handler(self):
    1680          self.check_interrupt_main_with_signal_handler(signal.SIGINT)
    1681          self.check_interrupt_main_with_signal_handler(signal.SIGTERM)
    1682  
    1683      def test_interrupt_main_noerror(self):
    1684          self.check_interrupt_main_noerror(signal.SIGINT)
    1685          self.check_interrupt_main_noerror(signal.SIGTERM)
    1686  
    1687      def test_interrupt_main_invalid_signal(self):
    1688          self.assertRaises(ValueError, _thread.interrupt_main, -1)
    1689          self.assertRaises(ValueError, _thread.interrupt_main, signal.NSIG)
    1690          self.assertRaises(ValueError, _thread.interrupt_main, 1000000)
    1691  
    1692      @threading_helper.reap_threads
    1693      def test_can_interrupt_tight_loops(self):
    1694          cont = [True]
    1695          started = [False]
    1696          interrupted = [False]
    1697  
    1698          def worker(started, cont, interrupted):
    1699              iterations = 100_000_000
    1700              started[0] = True
    1701              while cont[0]:
    1702                  if iterations:
    1703                      iterations -= 1
    1704                  else:
    1705                      return
    1706                  pass
    1707              interrupted[0] = True
    1708  
    1709          t = threading.Thread(target=worker,args=(started, cont, interrupted))
    1710          t.start()
    1711          while not started[0]:
    1712              pass
    1713          cont[0] = False
    1714          t.join()
    1715          self.assertTrue(interrupted[0])
    1716  
    1717  
    1718  class ESC[4;38;5;81mAtexitTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1719  
    1720      def test_atexit_output(self):
    1721          rc, out, err = assert_python_ok("-c", """if True:
    1722              import threading
    1723  
    1724              def run_last():
    1725                  print('parrot')
    1726  
    1727              threading._register_atexit(run_last)
    1728          """)
    1729  
    1730          self.assertFalse(err)
    1731          self.assertEqual(out.strip(), b'parrot')
    1732  
    1733      def test_atexit_called_once(self):
    1734          rc, out, err = assert_python_ok("-c", """if True:
    1735              import threading
    1736              from unittest.mock import Mock
    1737  
    1738              mock = Mock()
    1739              threading._register_atexit(mock)
    1740              mock.assert_not_called()
    1741              # force early shutdown to ensure it was called once
    1742              threading._shutdown()
    1743              mock.assert_called_once()
    1744          """)
    1745  
    1746          self.assertFalse(err)
    1747  
    1748      def test_atexit_after_shutdown(self):
    1749          # The only way to do this is by registering an atexit within
    1750          # an atexit, which is intended to raise an exception.
    1751          rc, out, err = assert_python_ok("-c", """if True:
    1752              import threading
    1753  
    1754              def func():
    1755                  pass
    1756  
    1757              def run_last():
    1758                  threading._register_atexit(func)
    1759  
    1760              threading._register_atexit(run_last)
    1761          """)
    1762  
    1763          self.assertTrue(err)
    1764          self.assertIn("RuntimeError: can't register atexit after shutdown",
    1765                  err.decode())
    1766  
    1767  
# Run this module's tests with unittest when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()