python (3.12.0)

(root)/
lib/
python3.12/
test/
test_threading.py
       1  """
       2  Tests for the threading module.
       3  """
       4  
       5  import test.support
       6  from test.support import threading_helper, requires_subprocess
       7  from test.support import verbose, cpython_only, os_helper
       8  from test.support.import_helper import import_module
       9  from test.support.script_helper import assert_python_ok, assert_python_failure
      10  
      11  import random
      12  import sys
      13  import _thread
      14  import threading
      15  import time
      16  import unittest
      17  import weakref
      18  import os
      19  import subprocess
      20  import signal
      21  import textwrap
      22  import traceback
      23  import warnings
      24  
      25  from unittest import mock
      26  from test import lock_tests
      27  from test import support
      28  
      29  threading_helper.requires_working_threading(module=True)
      30  
      31  # Between fork() and exec(), only async-safe functions are allowed (issues
      32  # #12316 and #11870), and fork() from a worker thread is known to trigger
      33  # problems with some operating systems (issue #3863): skip problematic tests
      34  # on platforms known to behave badly.
      35  platforms_to_skip = ('netbsd5', 'hp-ux11')
      36  
      37  
      38  def restore_default_excepthook(testcase):
      39      testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook)
      40      threading.excepthook = threading.__excepthook__
      41  
      42  
      43  # A trivial mutable counter.
      44  class ESC[4;38;5;81mCounter(ESC[4;38;5;149mobject):
      45      def __init__(self):
      46          self.value = 0
      47      def inc(self):
      48          self.value += 1
      49      def dec(self):
      50          self.value -= 1
      51      def get(self):
      52          return self.value
      53  
      54  class ESC[4;38;5;81mTestThread(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
      55      def __init__(self, name, testcase, sema, mutex, nrunning):
      56          threading.Thread.__init__(self, name=name)
      57          self.testcase = testcase
      58          self.sema = sema
      59          self.mutex = mutex
      60          self.nrunning = nrunning
      61  
      62      def run(self):
      63          delay = random.random() / 10000.0
      64          if verbose:
      65              print('task %s will run for %.1f usec' %
      66                    (self.name, delay * 1e6))
      67  
      68          with self.sema:
      69              with self.mutex:
      70                  self.nrunning.inc()
      71                  if verbose:
      72                      print(self.nrunning.get(), 'tasks are running')
      73                  self.testcase.assertLessEqual(self.nrunning.get(), 3)
      74  
      75              time.sleep(delay)
      76              if verbose:
      77                  print('task', self.name, 'done')
      78  
      79              with self.mutex:
      80                  self.nrunning.dec()
      81                  self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
      82                  if verbose:
      83                      print('%s is finished. %d tasks are running' %
      84                            (self.name, self.nrunning.get()))
      85  
      86  
      87  class ESC[4;38;5;81mBaseTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      88      def setUp(self):
      89          self._threads = threading_helper.threading_setup()
      90  
      91      def tearDown(self):
      92          threading_helper.threading_cleanup(*self._threads)
      93          test.support.reap_children()
      94  
      95  
      96  class ESC[4;38;5;81mThreadTests(ESC[4;38;5;149mBaseTestCase):
      97  
      98      @cpython_only
      99      def test_name(self):
     100          def func(): pass
     101  
     102          thread = threading.Thread(name="myname1")
     103          self.assertEqual(thread.name, "myname1")
     104  
     105          # Convert int name to str
     106          thread = threading.Thread(name=123)
     107          self.assertEqual(thread.name, "123")
     108  
     109          # target name is ignored if name is specified
     110          thread = threading.Thread(target=func, name="myname2")
     111          self.assertEqual(thread.name, "myname2")
     112  
     113          with mock.patch.object(threading, '_counter', return_value=2):
     114              thread = threading.Thread(name="")
     115              self.assertEqual(thread.name, "Thread-2")
     116  
     117          with mock.patch.object(threading, '_counter', return_value=3):
     118              thread = threading.Thread()
     119              self.assertEqual(thread.name, "Thread-3")
     120  
     121          with mock.patch.object(threading, '_counter', return_value=5):
     122              thread = threading.Thread(target=func)
     123              self.assertEqual(thread.name, "Thread-5 (func)")
     124  
     125      def test_args_argument(self):
     126          # bpo-45735: Using list or tuple as *args* in constructor could
     127          # achieve the same effect.
     128          num_list = [1]
     129          num_tuple = (1,)
     130  
     131          str_list = ["str"]
     132          str_tuple = ("str",)
     133  
     134          list_in_tuple = ([1],)
     135          tuple_in_list = [(1,)]
     136  
     137          test_cases = (
     138              (num_list, lambda arg: self.assertEqual(arg, 1)),
     139              (num_tuple, lambda arg: self.assertEqual(arg, 1)),
     140              (str_list, lambda arg: self.assertEqual(arg, "str")),
     141              (str_tuple, lambda arg: self.assertEqual(arg, "str")),
     142              (list_in_tuple, lambda arg: self.assertEqual(arg, [1])),
     143              (tuple_in_list, lambda arg: self.assertEqual(arg, (1,)))
     144          )
     145  
     146          for args, target in test_cases:
     147              with self.subTest(target=target, args=args):
     148                  t = threading.Thread(target=target, args=args)
     149                  t.start()
     150                  t.join()
     151  
     152      @cpython_only
     153      def test_disallow_instantiation(self):
     154          # Ensure that the type disallows instantiation (bpo-43916)
     155          lock = threading.Lock()
     156          test.support.check_disallow_instantiation(self, type(lock))
     157  
     158      # Create a bunch of threads, let each do some work, wait until all are
     159      # done.
     160      def test_various_ops(self):
     161          # This takes about n/3 seconds to run (about n/3 clumps of tasks,
     162          # times about 1 second per clump).
     163          NUMTASKS = 10
     164  
     165          # no more than 3 of the 10 can run at once
     166          sema = threading.BoundedSemaphore(value=3)
     167          mutex = threading.RLock()
     168          numrunning = Counter()
     169  
     170          threads = []
     171  
     172          for i in range(NUMTASKS):
     173              t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
     174              threads.append(t)
     175              self.assertIsNone(t.ident)
     176              self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$')
     177              t.start()
     178  
     179          if hasattr(threading, 'get_native_id'):
     180              native_ids = set(t.native_id for t in threads) | {threading.get_native_id()}
     181              self.assertNotIn(None, native_ids)
     182              self.assertEqual(len(native_ids), NUMTASKS + 1)
     183  
     184          if verbose:
     185              print('waiting for all tasks to complete')
     186          for t in threads:
     187              t.join()
     188              self.assertFalse(t.is_alive())
     189              self.assertNotEqual(t.ident, 0)
     190              self.assertIsNotNone(t.ident)
     191              self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$')
     192          if verbose:
     193              print('all tasks done')
     194          self.assertEqual(numrunning.get(), 0)
     195  
     196      def test_ident_of_no_threading_threads(self):
     197          # The ident still must work for the main thread and dummy threads.
     198          self.assertIsNotNone(threading.current_thread().ident)
     199          def f():
     200              ident.append(threading.current_thread().ident)
     201              done.set()
     202          done = threading.Event()
     203          ident = []
     204          with threading_helper.wait_threads_exit():
     205              tid = _thread.start_new_thread(f, ())
     206              done.wait()
     207              self.assertEqual(ident[0], tid)
     208          # Kill the "immortal" _DummyThread
     209          del threading._active[ident[0]]
     210  
     211      # run with a small(ish) thread stack size (256 KiB)
     212      def test_various_ops_small_stack(self):
     213          if verbose:
     214              print('with 256 KiB thread stack size...')
     215          try:
     216              threading.stack_size(262144)
     217          except _thread.error:
     218              raise unittest.SkipTest(
     219                  'platform does not support changing thread stack size')
     220          self.test_various_ops()
     221          threading.stack_size(0)
     222  
     223      # run with a large thread stack size (1 MiB)
     224      def test_various_ops_large_stack(self):
     225          if verbose:
     226              print('with 1 MiB thread stack size...')
     227          try:
     228              threading.stack_size(0x100000)
     229          except _thread.error:
     230              raise unittest.SkipTest(
     231                  'platform does not support changing thread stack size')
     232          self.test_various_ops()
     233          threading.stack_size(0)
     234  
     235      def test_foreign_thread(self):
     236          # Check that a "foreign" thread can use the threading module.
     237          def f(mutex):
     238              # Calling current_thread() forces an entry for the foreign
     239              # thread to get made in the threading._active map.
     240              threading.current_thread()
     241              mutex.release()
     242  
     243          mutex = threading.Lock()
     244          mutex.acquire()
     245          with threading_helper.wait_threads_exit():
     246              tid = _thread.start_new_thread(f, (mutex,))
     247              # Wait for the thread to finish.
     248              mutex.acquire()
     249          self.assertIn(tid, threading._active)
     250          self.assertIsInstance(threading._active[tid], threading._DummyThread)
     251          #Issue 29376
     252          self.assertTrue(threading._active[tid].is_alive())
     253          self.assertRegex(repr(threading._active[tid]), '_DummyThread')
     254          del threading._active[tid]
     255  
     256      # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
     257      # exposed at the Python level.  This test relies on ctypes to get at it.
     258      def test_PyThreadState_SetAsyncExc(self):
     259          ctypes = import_module("ctypes")
     260  
     261          set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
     262          set_async_exc.argtypes = (ctypes.c_ulong, ctypes.py_object)
     263  
     264          class ESC[4;38;5;81mAsyncExc(ESC[4;38;5;149mException):
     265              pass
     266  
     267          exception = ctypes.py_object(AsyncExc)
     268  
     269          # First check it works when setting the exception from the same thread.
     270          tid = threading.get_ident()
     271          self.assertIsInstance(tid, int)
     272          self.assertGreater(tid, 0)
     273  
     274          try:
     275              result = set_async_exc(tid, exception)
     276              # The exception is async, so we might have to keep the VM busy until
     277              # it notices.
     278              while True:
     279                  pass
     280          except AsyncExc:
     281              pass
     282          else:
     283              # This code is unreachable but it reflects the intent. If we wanted
     284              # to be smarter the above loop wouldn't be infinite.
     285              self.fail("AsyncExc not raised")
     286          try:
     287              self.assertEqual(result, 1) # one thread state modified
     288          except UnboundLocalError:
     289              # The exception was raised too quickly for us to get the result.
     290              pass
     291  
     292          # `worker_started` is set by the thread when it's inside a try/except
     293          # block waiting to catch the asynchronously set AsyncExc exception.
     294          # `worker_saw_exception` is set by the thread upon catching that
     295          # exception.
     296          worker_started = threading.Event()
     297          worker_saw_exception = threading.Event()
     298  
     299          class ESC[4;38;5;81mWorker(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
     300              def run(self):
     301                  self.id = threading.get_ident()
     302                  self.finished = False
     303  
     304                  try:
     305                      while True:
     306                          worker_started.set()
     307                          time.sleep(0.1)
     308                  except AsyncExc:
     309                      self.finished = True
     310                      worker_saw_exception.set()
     311  
     312          t = Worker()
     313          t.daemon = True # so if this fails, we don't hang Python at shutdown
     314          t.start()
     315          if verbose:
     316              print("    started worker thread")
     317  
     318          # Try a thread id that doesn't make sense.
     319          if verbose:
     320              print("    trying nonsensical thread id")
     321          result = set_async_exc(-1, exception)
     322          self.assertEqual(result, 0)  # no thread states modified
     323  
     324          # Now raise an exception in the worker thread.
     325          if verbose:
     326              print("    waiting for worker thread to get started")
     327          ret = worker_started.wait()
     328          self.assertTrue(ret)
     329          if verbose:
     330              print("    verifying worker hasn't exited")
     331          self.assertFalse(t.finished)
     332          if verbose:
     333              print("    attempting to raise asynch exception in worker")
     334          result = set_async_exc(t.id, exception)
     335          self.assertEqual(result, 1) # one thread state modified
     336          if verbose:
     337              print("    waiting for worker to say it caught the exception")
     338          worker_saw_exception.wait(timeout=support.SHORT_TIMEOUT)
     339          self.assertTrue(t.finished)
     340          if verbose:
     341              print("    all OK -- joining worker")
     342          if t.finished:
     343              t.join()
     344          # else the thread is still running, and we have no way to kill it
     345  
     346      def test_limbo_cleanup(self):
     347          # Issue 7481: Failure to start thread should cleanup the limbo map.
     348          def fail_new_thread(*args):
     349              raise threading.ThreadError()
     350          _start_new_thread = threading._start_new_thread
     351          threading._start_new_thread = fail_new_thread
     352          try:
     353              t = threading.Thread(target=lambda: None)
     354              self.assertRaises(threading.ThreadError, t.start)
     355              self.assertFalse(
     356                  t in threading._limbo,
     357                  "Failed to cleanup _limbo map on failure of Thread.start().")
     358          finally:
     359              threading._start_new_thread = _start_new_thread
     360  
     361      def test_finalize_running_thread(self):
     362          # Issue 1402: the PyGILState_Ensure / _Release functions may be called
     363          # very late on python exit: on deallocation of a running thread for
     364          # example.
     365          import_module("ctypes")
     366  
     367          rc, out, err = assert_python_failure("-c", """if 1:
     368              import ctypes, sys, time, _thread
     369  
     370              # This lock is used as a simple event variable.
     371              ready = _thread.allocate_lock()
     372              ready.acquire()
     373  
     374              # Module globals are cleared before __del__ is run
     375              # So we save the functions in class dict
     376              class C:
     377                  ensure = ctypes.pythonapi.PyGILState_Ensure
     378                  release = ctypes.pythonapi.PyGILState_Release
     379                  def __del__(self):
     380                      state = self.ensure()
     381                      self.release(state)
     382  
     383              def waitingThread():
     384                  x = C()
     385                  ready.release()
     386                  time.sleep(100)
     387  
     388              _thread.start_new_thread(waitingThread, ())
     389              ready.acquire()  # Be sure the other thread is waiting.
     390              sys.exit(42)
     391              """)
     392          self.assertEqual(rc, 42)
     393  
     394      def test_finalize_with_trace(self):
     395          # Issue1733757
     396          # Avoid a deadlock when sys.settrace steps into threading._shutdown
     397          assert_python_ok("-c", """if 1:
     398              import sys, threading
     399  
     400              # A deadlock-killer, to prevent the
     401              # testsuite to hang forever
     402              def killer():
     403                  import os, time
     404                  time.sleep(2)
     405                  print('program blocked; aborting')
     406                  os._exit(2)
     407              t = threading.Thread(target=killer)
     408              t.daemon = True
     409              t.start()
     410  
     411              # This is the trace function
     412              def func(frame, event, arg):
     413                  threading.current_thread()
     414                  return func
     415  
     416              sys.settrace(func)
     417              """)
     418  
     419      def test_join_nondaemon_on_shutdown(self):
     420          # Issue 1722344
     421          # Raising SystemExit skipped threading._shutdown
     422          rc, out, err = assert_python_ok("-c", """if 1:
     423                  import threading
     424                  from time import sleep
     425  
     426                  def child():
     427                      sleep(1)
     428                      # As a non-daemon thread we SHOULD wake up and nothing
     429                      # should be torn down yet
     430                      print("Woke up, sleep function is:", sleep)
     431  
     432                  threading.Thread(target=child).start()
     433                  raise SystemExit
     434              """)
     435          self.assertEqual(out.strip(),
     436              b"Woke up, sleep function is: <built-in function sleep>")
     437          self.assertEqual(err, b"")
     438  
     439      def test_enumerate_after_join(self):
     440          # Try hard to trigger #1703448: a thread is still returned in
     441          # threading.enumerate() after it has been join()ed.
     442          enum = threading.enumerate
     443          old_interval = sys.getswitchinterval()
     444          try:
     445              for i in range(1, 100):
     446                  sys.setswitchinterval(i * 0.0002)
     447                  t = threading.Thread(target=lambda: None)
     448                  t.start()
     449                  t.join()
     450                  l = enum()
     451                  self.assertNotIn(t, l,
     452                      "#1703448 triggered after %d trials: %s" % (i, l))
     453          finally:
     454              sys.setswitchinterval(old_interval)
     455  
     456      def test_no_refcycle_through_target(self):
     457          class ESC[4;38;5;81mRunSelfFunction(ESC[4;38;5;149mobject):
     458              def __init__(self, should_raise):
     459                  # The links in this refcycle from Thread back to self
     460                  # should be cleaned up when the thread completes.
     461                  self.should_raise = should_raise
     462                  self.thread = threading.Thread(target=self._run,
     463                                                 args=(self,),
     464                                                 kwargs={'yet_another':self})
     465                  self.thread.start()
     466  
     467              def _run(self, other_ref, yet_another):
     468                  if self.should_raise:
     469                      raise SystemExit
     470  
     471          restore_default_excepthook(self)
     472  
     473          cyclic_object = RunSelfFunction(should_raise=False)
     474          weak_cyclic_object = weakref.ref(cyclic_object)
     475          cyclic_object.thread.join()
     476          del cyclic_object
     477          self.assertIsNone(weak_cyclic_object(),
     478                           msg=('%d references still around' %
     479                                sys.getrefcount(weak_cyclic_object())))
     480  
     481          raising_cyclic_object = RunSelfFunction(should_raise=True)
     482          weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
     483          raising_cyclic_object.thread.join()
     484          del raising_cyclic_object
     485          self.assertIsNone(weak_raising_cyclic_object(),
     486                           msg=('%d references still around' %
     487                                sys.getrefcount(weak_raising_cyclic_object())))
     488  
     489      def test_old_threading_api(self):
     490          # Just a quick sanity check to make sure the old method names are
     491          # still present
     492          t = threading.Thread()
     493          with self.assertWarnsRegex(DeprecationWarning,
     494                                     r'get the daemon attribute'):
     495              t.isDaemon()
     496          with self.assertWarnsRegex(DeprecationWarning,
     497                                     r'set the daemon attribute'):
     498              t.setDaemon(True)
     499          with self.assertWarnsRegex(DeprecationWarning,
     500                                     r'get the name attribute'):
     501              t.getName()
     502          with self.assertWarnsRegex(DeprecationWarning,
     503                                     r'set the name attribute'):
     504              t.setName("name")
     505  
     506          e = threading.Event()
     507          with self.assertWarnsRegex(DeprecationWarning, 'use is_set()'):
     508              e.isSet()
     509  
     510          cond = threading.Condition()
     511          cond.acquire()
     512          with self.assertWarnsRegex(DeprecationWarning, 'use notify_all()'):
     513              cond.notifyAll()
     514  
     515          with self.assertWarnsRegex(DeprecationWarning, 'use active_count()'):
     516              threading.activeCount()
     517          with self.assertWarnsRegex(DeprecationWarning, 'use current_thread()'):
     518              threading.currentThread()
     519  
     520      def test_repr_daemon(self):
     521          t = threading.Thread()
     522          self.assertNotIn('daemon', repr(t))
     523          t.daemon = True
     524          self.assertIn('daemon', repr(t))
     525  
     526      def test_daemon_param(self):
     527          t = threading.Thread()
     528          self.assertFalse(t.daemon)
     529          t = threading.Thread(daemon=False)
     530          self.assertFalse(t.daemon)
     531          t = threading.Thread(daemon=True)
     532          self.assertTrue(t.daemon)
     533  
     534      @support.requires_fork()
     535      def test_dummy_thread_after_fork(self):
     536          # Issue #14308: a dummy thread in the active list doesn't mess up
     537          # the after-fork mechanism.
     538          code = """if 1:
     539              import _thread, threading, os, time, warnings
     540  
     541              def background_thread(evt):
     542                  # Creates and registers the _DummyThread instance
     543                  threading.current_thread()
     544                  evt.set()
     545                  time.sleep(10)
     546  
     547              evt = threading.Event()
     548              _thread.start_new_thread(background_thread, (evt,))
     549              evt.wait()
     550              assert threading.active_count() == 2, threading.active_count()
     551              with warnings.catch_warnings(record=True) as ws:
     552                  warnings.filterwarnings(
     553                          "always", category=DeprecationWarning)
     554                  if os.fork() == 0:
     555                      assert threading.active_count() == 1, threading.active_count()
     556                      os._exit(0)
     557                  else:
     558                      assert ws[0].category == DeprecationWarning, ws[0]
     559                      assert 'fork' in str(ws[0].message), ws[0]
     560                      os.wait()
     561          """
     562          _, out, err = assert_python_ok("-c", code)
     563          self.assertEqual(out, b'')
     564          self.assertEqual(err, b'')
     565  
     566      @support.requires_fork()
     567      def test_is_alive_after_fork(self):
     568          # Try hard to trigger #18418: is_alive() could sometimes be True on
     569          # threads that vanished after a fork.
     570          old_interval = sys.getswitchinterval()
     571          self.addCleanup(sys.setswitchinterval, old_interval)
     572  
     573          # Make the bug more likely to manifest.
     574          test.support.setswitchinterval(1e-6)
     575  
     576          for i in range(20):
     577              t = threading.Thread(target=lambda: None)
     578              t.start()
     579              # Ignore the warning about fork with threads.
     580              with warnings.catch_warnings(category=DeprecationWarning,
     581                                           action="ignore"):
     582                  if (pid := os.fork()) == 0:
     583                      os._exit(11 if t.is_alive() else 10)
     584                  else:
     585                      t.join()
     586  
     587                      support.wait_process(pid, exitcode=10)
     588  
     589      def test_main_thread(self):
     590          main = threading.main_thread()
     591          self.assertEqual(main.name, 'MainThread')
     592          self.assertEqual(main.ident, threading.current_thread().ident)
     593          self.assertEqual(main.ident, threading.get_ident())
     594  
     595          def f():
     596              self.assertNotEqual(threading.main_thread().ident,
     597                                  threading.current_thread().ident)
     598          th = threading.Thread(target=f)
     599          th.start()
     600          th.join()
     601  
     602      @support.requires_fork()
     603      @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
     604      def test_main_thread_after_fork(self):
     605          code = """if 1:
     606              import os, threading
     607              from test import support
     608  
     609              pid = os.fork()
     610              if pid == 0:
     611                  main = threading.main_thread()
     612                  print(main.name)
     613                  print(main.ident == threading.current_thread().ident)
     614                  print(main.ident == threading.get_ident())
     615              else:
     616                  support.wait_process(pid, exitcode=0)
     617          """
     618          _, out, err = assert_python_ok("-c", code)
     619          data = out.decode().replace('\r', '')
     620          self.assertEqual(err, b"")
     621          self.assertEqual(data, "MainThread\nTrue\nTrue\n")
     622  
     623      @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
     624      @support.requires_fork()
     625      @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
     626      def test_main_thread_after_fork_from_nonmain_thread(self):
     627          code = """if 1:
     628              import os, threading, sys, warnings
     629              from test import support
     630  
     631              def func():
     632                  with warnings.catch_warnings(record=True) as ws:
     633                      warnings.filterwarnings(
     634                              "always", category=DeprecationWarning)
     635                      pid = os.fork()
     636                      if pid == 0:
     637                          main = threading.main_thread()
     638                          print(main.name)
     639                          print(main.ident == threading.current_thread().ident)
     640                          print(main.ident == threading.get_ident())
     641                          # stdout is fully buffered because not a tty,
     642                          # we have to flush before exit.
     643                          sys.stdout.flush()
     644                      else:
     645                          assert ws[0].category == DeprecationWarning, ws[0]
     646                          assert 'fork' in str(ws[0].message), ws[0]
     647                          support.wait_process(pid, exitcode=0)
     648  
     649              th = threading.Thread(target=func)
     650              th.start()
     651              th.join()
     652          """
     653          _, out, err = assert_python_ok("-c", code)
     654          data = out.decode().replace('\r', '')
     655          self.assertEqual(err.decode('utf-8'), "")
     656          self.assertEqual(data, "Thread-1 (func)\nTrue\nTrue\n")
     657  
     658      def test_main_thread_during_shutdown(self):
     659          # bpo-31516: current_thread() should still point to the main thread
     660          # at shutdown
     661          code = """if 1:
     662              import gc, threading
     663  
     664              main_thread = threading.current_thread()
     665              assert main_thread is threading.main_thread()  # sanity check
     666  
     667              class RefCycle:
     668                  def __init__(self):
     669                      self.cycle = self
     670  
     671                  def __del__(self):
     672                      print("GC:",
     673                            threading.current_thread() is main_thread,
     674                            threading.main_thread() is main_thread,
     675                            threading.enumerate() == [main_thread])
     676  
     677              RefCycle()
     678              gc.collect()  # sanity check
     679              x = RefCycle()
     680          """
     681          _, out, err = assert_python_ok("-c", code)
     682          data = out.decode()
     683          self.assertEqual(err, b"")
     684          self.assertEqual(data.splitlines(),
     685                           ["GC: True True True"] * 2)
     686  
     687      def test_finalization_shutdown(self):
     688          # bpo-36402: Py_Finalize() calls threading._shutdown() which must wait
     689          # until Python thread states of all non-daemon threads get deleted.
     690          #
     691          # Test similar to SubinterpThreadingTests.test_threads_join_2(), but
     692          # test the finalization of the main interpreter.
     693          code = """if 1:
     694              import os
     695              import threading
     696              import time
     697              import random
     698  
     699              def random_sleep():
     700                  seconds = random.random() * 0.010
     701                  time.sleep(seconds)
     702  
     703              class Sleeper:
     704                  def __del__(self):
     705                      random_sleep()
     706  
     707              tls = threading.local()
     708  
     709              def f():
     710                  # Sleep a bit so that the thread is still running when
     711                  # Py_Finalize() is called.
     712                  random_sleep()
     713                  tls.x = Sleeper()
     714                  random_sleep()
     715  
     716              threading.Thread(target=f).start()
     717              random_sleep()
     718          """
     719          rc, out, err = assert_python_ok("-c", code)
     720          self.assertEqual(err, b"")
     721  
     722      def test_tstate_lock(self):
     723          # Test an implementation detail of Thread objects.
     724          started = _thread.allocate_lock()
     725          finish = _thread.allocate_lock()
     726          started.acquire()
     727          finish.acquire()
     728          def f():
     729              started.release()
     730              finish.acquire()
     731              time.sleep(0.01)
     732          # The tstate lock is None until the thread is started
     733          t = threading.Thread(target=f)
     734          self.assertIs(t._tstate_lock, None)
     735          t.start()
     736          started.acquire()
     737          self.assertTrue(t.is_alive())
     738          # The tstate lock can't be acquired when the thread is running
     739          # (or suspended).
     740          tstate_lock = t._tstate_lock
     741          self.assertFalse(tstate_lock.acquire(timeout=0), False)
     742          finish.release()
     743          # When the thread ends, the state_lock can be successfully
     744          # acquired.
     745          self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT), False)
     746          # But is_alive() is still True:  we hold _tstate_lock now, which
     747          # prevents is_alive() from knowing the thread's end-of-life C code
     748          # is done.
     749          self.assertTrue(t.is_alive())
     750          # Let is_alive() find out the C code is done.
     751          tstate_lock.release()
     752          self.assertFalse(t.is_alive())
     753          # And verify the thread disposed of _tstate_lock.
     754          self.assertIsNone(t._tstate_lock)
     755          t.join()
     756  
     757      def test_repr_stopped(self):
     758          # Verify that "stopped" shows up in repr(Thread) appropriately.
     759          started = _thread.allocate_lock()
     760          finish = _thread.allocate_lock()
     761          started.acquire()
     762          finish.acquire()
     763          def f():
     764              started.release()
     765              finish.acquire()
     766          t = threading.Thread(target=f)
     767          t.start()
     768          started.acquire()
     769          self.assertIn("started", repr(t))
     770          finish.release()
     771          # "stopped" should appear in the repr in a reasonable amount of time.
     772          # Implementation detail:  as of this writing, that's trivially true
     773          # if .join() is called, and almost trivially true if .is_alive() is
     774          # called.  The detail we're testing here is that "stopped" shows up
     775          # "all on its own".
     776          LOOKING_FOR = "stopped"
     777          for i in range(500):
     778              if LOOKING_FOR in repr(t):
     779                  break
     780              time.sleep(0.01)
     781          self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds
     782          t.join()
     783  
     784      def test_BoundedSemaphore_limit(self):
     785          # BoundedSemaphore should raise ValueError if released too often.
     786          for limit in range(1, 10):
     787              bs = threading.BoundedSemaphore(limit)
     788              threads = [threading.Thread(target=bs.acquire)
     789                         for _ in range(limit)]
     790              for t in threads:
     791                  t.start()
     792              for t in threads:
     793                  t.join()
     794              threads = [threading.Thread(target=bs.release)
     795                         for _ in range(limit)]
     796              for t in threads:
     797                  t.start()
     798              for t in threads:
     799                  t.join()
     800              self.assertRaises(ValueError, bs.release)
     801  
     802      @cpython_only
     803      def test_frame_tstate_tracing(self):
     804          # Issue #14432: Crash when a generator is created in a C thread that is
     805          # destroyed while the generator is still used. The issue was that a
     806          # generator contains a frame, and the frame kept a reference to the
     807          # Python state of the destroyed C thread. The crash occurs when a trace
     808          # function is setup.
     809  
     810          def noop_trace(frame, event, arg):
     811              # no operation
     812              return noop_trace
     813  
     814          def generator():
     815              while 1:
     816                  yield "generator"
     817  
     818          def callback():
     819              if callback.gen is None:
     820                  callback.gen = generator()
     821              return next(callback.gen)
     822          callback.gen = None
     823  
     824          old_trace = sys.gettrace()
     825          sys.settrace(noop_trace)
     826          try:
     827              # Install a trace function
     828              threading.settrace(noop_trace)
     829  
     830              # Create a generator in a C thread which exits after the call
     831              import _testcapi
     832              _testcapi.call_in_temporary_c_thread(callback)
     833  
     834              # Call the generator in a different Python thread, check that the
     835              # generator didn't keep a reference to the destroyed thread state
     836              for test in range(3):
     837                  # The trace function is still called here
     838                  callback()
     839          finally:
     840              sys.settrace(old_trace)
     841              threading.settrace(old_trace)
     842  
     843      def test_gettrace(self):
     844          def noop_trace(frame, event, arg):
     845              # no operation
     846              return noop_trace
     847          old_trace = threading.gettrace()
     848          try:
     849              threading.settrace(noop_trace)
     850              trace_func = threading.gettrace()
     851              self.assertEqual(noop_trace,trace_func)
     852          finally:
     853              threading.settrace(old_trace)
     854  
     855      def test_gettrace_all_threads(self):
     856          def fn(*args): pass
     857          old_trace = threading.gettrace()
     858          first_check = threading.Event()
     859          second_check = threading.Event()
     860  
     861          trace_funcs = []
     862          def checker():
     863              trace_funcs.append(sys.gettrace())
     864              first_check.set()
     865              second_check.wait()
     866              trace_funcs.append(sys.gettrace())
     867  
     868          try:
     869              t = threading.Thread(target=checker)
     870              t.start()
     871              first_check.wait()
     872              threading.settrace_all_threads(fn)
     873              second_check.set()
     874              t.join()
     875              self.assertEqual(trace_funcs, [None, fn])
     876              self.assertEqual(threading.gettrace(), fn)
     877              self.assertEqual(sys.gettrace(), fn)
     878          finally:
     879              threading.settrace_all_threads(old_trace)
     880  
     881          self.assertEqual(threading.gettrace(), old_trace)
     882          self.assertEqual(sys.gettrace(), old_trace)
     883  
     884      def test_getprofile(self):
     885          def fn(*args): pass
     886          old_profile = threading.getprofile()
     887          try:
     888              threading.setprofile(fn)
     889              self.assertEqual(fn, threading.getprofile())
     890          finally:
     891              threading.setprofile(old_profile)
     892  
     893      def test_getprofile_all_threads(self):
     894          def fn(*args): pass
     895          old_profile = threading.getprofile()
     896          first_check = threading.Event()
     897          second_check = threading.Event()
     898  
     899          profile_funcs = []
     900          def checker():
     901              profile_funcs.append(sys.getprofile())
     902              first_check.set()
     903              second_check.wait()
     904              profile_funcs.append(sys.getprofile())
     905  
     906          try:
     907              t = threading.Thread(target=checker)
     908              t.start()
     909              first_check.wait()
     910              threading.setprofile_all_threads(fn)
     911              second_check.set()
     912              t.join()
     913              self.assertEqual(profile_funcs, [None, fn])
     914              self.assertEqual(threading.getprofile(), fn)
     915              self.assertEqual(sys.getprofile(), fn)
     916          finally:
     917              threading.setprofile_all_threads(old_profile)
     918  
     919          self.assertEqual(threading.getprofile(), old_profile)
     920          self.assertEqual(sys.getprofile(), old_profile)
     921  
     922      @cpython_only
     923      def test_shutdown_locks(self):
     924          for daemon in (False, True):
     925              with self.subTest(daemon=daemon):
     926                  event = threading.Event()
     927                  thread = threading.Thread(target=event.wait, daemon=daemon)
     928  
     929                  # Thread.start() must add lock to _shutdown_locks,
     930                  # but only for non-daemon thread
     931                  thread.start()
     932                  tstate_lock = thread._tstate_lock
     933                  if not daemon:
     934                      self.assertIn(tstate_lock, threading._shutdown_locks)
     935                  else:
     936                      self.assertNotIn(tstate_lock, threading._shutdown_locks)
     937  
     938                  # unblock the thread and join it
     939                  event.set()
     940                  thread.join()
     941  
     942                  # Thread._stop() must remove tstate_lock from _shutdown_locks.
     943                  # Daemon threads must never add it to _shutdown_locks.
     944                  self.assertNotIn(tstate_lock, threading._shutdown_locks)
     945  
     946      def test_locals_at_exit(self):
     947          # bpo-19466: thread locals must not be deleted before destructors
     948          # are called
     949          rc, out, err = assert_python_ok("-c", """if 1:
     950              import threading
     951  
     952              class Atexit:
     953                  def __del__(self):
     954                      print("thread_dict.atexit = %r" % thread_dict.atexit)
     955  
     956              thread_dict = threading.local()
     957              thread_dict.atexit = "value"
     958  
     959              atexit = Atexit()
     960          """)
     961          self.assertEqual(out.rstrip(), b"thread_dict.atexit = 'value'")
     962  
     963      def test_boolean_target(self):
     964          # bpo-41149: A thread that had a boolean value of False would not
     965          # run, regardless of whether it was callable. The correct behaviour
     966          # is for a thread to do nothing if its target is None, and to call
     967          # the target otherwise.
     968          class ESC[4;38;5;81mBooleanTarget(ESC[4;38;5;149mobject):
     969              def __init__(self):
     970                  self.ran = False
     971              def __bool__(self):
     972                  return False
     973              def __call__(self):
     974                  self.ran = True
     975  
     976          target = BooleanTarget()
     977          thread = threading.Thread(target=target)
     978          thread.start()
     979          thread.join()
     980          self.assertTrue(target.ran)
     981  
     982      def test_leak_without_join(self):
     983          # bpo-37788: Test that a thread which is not joined explicitly
     984          # does not leak. Test written for reference leak checks.
     985          def noop(): pass
     986          with threading_helper.wait_threads_exit():
     987              threading.Thread(target=noop).start()
     988              # Thread.join() is not called
     989  
     990      def test_import_from_another_thread(self):
     991          # bpo-1596321: If the threading module is first import from a thread
     992          # different than the main thread, threading._shutdown() must handle
     993          # this case without logging an error at Python exit.
     994          code = textwrap.dedent('''
     995              import _thread
     996              import sys
     997  
     998              event = _thread.allocate_lock()
     999              event.acquire()
    1000  
    1001              def import_threading():
    1002                  import threading
    1003                  event.release()
    1004  
    1005              if 'threading' in sys.modules:
    1006                  raise Exception('threading is already imported')
    1007  
    1008              _thread.start_new_thread(import_threading, ())
    1009  
    1010              # wait until the threading module is imported
    1011              event.acquire()
    1012              event.release()
    1013  
    1014              if 'threading' not in sys.modules:
    1015                  raise Exception('threading is not imported')
    1016  
    1017              # don't wait until the thread completes
    1018          ''')
    1019          rc, out, err = assert_python_ok("-c", code)
    1020          self.assertEqual(out, b'')
    1021          self.assertEqual(err, b'')
    1022  
    1023      def test_start_new_thread_at_exit(self):
    1024          code = """if 1:
    1025              import atexit
    1026              import _thread
    1027  
    1028              def f():
    1029                  print("shouldn't be printed")
    1030  
    1031              def exit_handler():
    1032                  _thread.start_new_thread(f, ())
    1033  
    1034              atexit.register(exit_handler)
    1035          """
    1036          _, out, err = assert_python_ok("-c", code)
    1037          self.assertEqual(out, b'')
    1038          self.assertIn(b"can't create new thread at interpreter shutdown", err)
    1039  
    1040  class ESC[4;38;5;81mThreadJoinOnShutdown(ESC[4;38;5;149mBaseTestCase):
    1041  
    1042      def _run_and_join(self, script):
    1043          script = """if 1:
    1044              import sys, os, time, threading
    1045  
    1046              # a thread, which waits for the main program to terminate
    1047              def joiningfunc(mainthread):
    1048                  mainthread.join()
    1049                  print('end of thread')
    1050                  # stdout is fully buffered because not a tty, we have to flush
    1051                  # before exit.
    1052                  sys.stdout.flush()
    1053          \n""" + script
    1054  
    1055          rc, out, err = assert_python_ok("-c", script)
    1056          data = out.decode().replace('\r', '')
    1057          self.assertEqual(data, "end of main\nend of thread\n")
    1058  
    1059      def test_1_join_on_shutdown(self):
    1060          # The usual case: on exit, wait for a non-daemon thread
    1061          script = """if 1:
    1062              import os
    1063              t = threading.Thread(target=joiningfunc,
    1064                                   args=(threading.current_thread(),))
    1065              t.start()
    1066              time.sleep(0.1)
    1067              print('end of main')
    1068              """
    1069          self._run_and_join(script)
    1070  
    1071      @support.requires_fork()
    1072      @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    1073      def test_2_join_in_forked_process(self):
    1074          # Like the test above, but from a forked interpreter
    1075          script = """if 1:
    1076              from test import support
    1077  
    1078              childpid = os.fork()
    1079              if childpid != 0:
    1080                  # parent process
    1081                  support.wait_process(childpid, exitcode=0)
    1082                  sys.exit(0)
    1083  
    1084              # child process
    1085              t = threading.Thread(target=joiningfunc,
    1086                                   args=(threading.current_thread(),))
    1087              t.start()
    1088              print('end of main')
    1089              """
    1090          self._run_and_join(script)
    1091  
    1092      @support.requires_fork()
    1093      @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    1094      def test_3_join_in_forked_from_thread(self):
    1095          # Like the test above, but fork() was called from a worker thread
    1096          # In the forked process, the main Thread object must be marked as stopped.
    1097  
    1098          script = """if 1:
    1099              from test import support
    1100  
    1101              main_thread = threading.current_thread()
    1102              def worker():
    1103                  childpid = os.fork()
    1104                  if childpid != 0:
    1105                      # parent process
    1106                      support.wait_process(childpid, exitcode=0)
    1107                      sys.exit(0)
    1108  
    1109                  # child process
    1110                  t = threading.Thread(target=joiningfunc,
    1111                                       args=(main_thread,))
    1112                  print('end of main')
    1113                  t.start()
    1114                  t.join() # Should not block: main_thread is already stopped
    1115  
    1116              w = threading.Thread(target=worker)
    1117              w.start()
    1118              """
    1119          self._run_and_join(script)
    1120  
    1121      @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    1122      def test_4_daemon_threads(self):
    1123          # Check that a daemon thread cannot crash the interpreter on shutdown
    1124          # by manipulating internal structures that are being disposed of in
    1125          # the main thread.
    1126          script = """if True:
    1127              import os
    1128              import random
    1129              import sys
    1130              import time
    1131              import threading
    1132  
    1133              thread_has_run = set()
    1134  
    1135              def random_io():
    1136                  '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
    1137                  import test.test_threading as mod
    1138                  while True:
    1139                      with open(mod.__file__, 'rb') as in_f:
    1140                          stuff = in_f.read(200)
    1141                          with open(os.devnull, 'wb') as null_f:
    1142                              null_f.write(stuff)
    1143                              time.sleep(random.random() / 1995)
    1144                      thread_has_run.add(threading.current_thread())
    1145  
    1146              def main():
    1147                  count = 0
    1148                  for _ in range(40):
    1149                      new_thread = threading.Thread(target=random_io)
    1150                      new_thread.daemon = True
    1151                      new_thread.start()
    1152                      count += 1
    1153                  while len(thread_has_run) < count:
    1154                      time.sleep(0.001)
    1155                  # Trigger process shutdown
    1156                  sys.exit(0)
    1157  
    1158              main()
    1159              """
    1160          rc, out, err = assert_python_ok('-c', script)
    1161          self.assertFalse(err)
    1162  
    1163      @support.requires_fork()
    1164      @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    1165      def test_reinit_tls_after_fork(self):
    1166          # Issue #13817: fork() would deadlock in a multithreaded program with
    1167          # the ad-hoc TLS implementation.
    1168  
    1169          def do_fork_and_wait():
    1170              # just fork a child process and wait it
    1171              pid = os.fork()
    1172              if pid > 0:
    1173                  support.wait_process(pid, exitcode=50)
    1174              else:
    1175                  os._exit(50)
    1176  
    1177          # Ignore the warning about fork with threads.
    1178          with warnings.catch_warnings(category=DeprecationWarning,
    1179                                       action="ignore"):
    1180              # start a bunch of threads that will fork() child processes
    1181              threads = []
    1182              for i in range(16):
    1183                  t = threading.Thread(target=do_fork_and_wait)
    1184                  threads.append(t)
    1185                  t.start()
    1186  
    1187              for t in threads:
    1188                  t.join()
    1189  
    1190      @support.requires_fork()
    1191      def test_clear_threads_states_after_fork(self):
    1192          # Issue #17094: check that threads states are cleared after fork()
    1193  
    1194          # start a bunch of threads
    1195          threads = []
    1196          for i in range(16):
    1197              t = threading.Thread(target=lambda : time.sleep(0.3))
    1198              threads.append(t)
    1199              t.start()
    1200  
    1201          try:
    1202              # Ignore the warning about fork with threads.
    1203              with warnings.catch_warnings(category=DeprecationWarning,
    1204                                           action="ignore"):
    1205                  pid = os.fork()
    1206                  if pid == 0:
    1207                      # check that threads states have been cleared
    1208                      if len(sys._current_frames()) == 1:
    1209                          os._exit(51)
    1210                      else:
    1211                          os._exit(52)
    1212                  else:
    1213                      support.wait_process(pid, exitcode=51)
    1214          finally:
    1215              for t in threads:
    1216                  t.join()
    1217  
    1218  
    1219  class ESC[4;38;5;81mSubinterpThreadingTests(ESC[4;38;5;149mBaseTestCase):
    1220      def pipe(self):
    1221          r, w = os.pipe()
    1222          self.addCleanup(os.close, r)
    1223          self.addCleanup(os.close, w)
    1224          if hasattr(os, 'set_blocking'):
    1225              os.set_blocking(r, False)
    1226          return (r, w)
    1227  
    1228      def test_threads_join(self):
    1229          # Non-daemon threads should be joined at subinterpreter shutdown
    1230          # (issue #18808)
    1231          r, w = self.pipe()
    1232          code = textwrap.dedent(r"""
    1233              import os
    1234              import random
    1235              import threading
    1236              import time
    1237  
    1238              def random_sleep():
    1239                  seconds = random.random() * 0.010
    1240                  time.sleep(seconds)
    1241  
    1242              def f():
    1243                  # Sleep a bit so that the thread is still running when
    1244                  # Py_EndInterpreter is called.
    1245                  random_sleep()
    1246                  os.write(%d, b"x")
    1247  
    1248              threading.Thread(target=f).start()
    1249              random_sleep()
    1250          """ % (w,))
    1251          ret = test.support.run_in_subinterp(code)
    1252          self.assertEqual(ret, 0)
    1253          # The thread was joined properly.
    1254          self.assertEqual(os.read(r, 1), b"x")
    1255  
    1256      def test_threads_join_2(self):
    1257          # Same as above, but a delay gets introduced after the thread's
    1258          # Python code returned but before the thread state is deleted.
    1259          # To achieve this, we register a thread-local object which sleeps
    1260          # a bit when deallocated.
    1261          r, w = self.pipe()
    1262          code = textwrap.dedent(r"""
    1263              import os
    1264              import random
    1265              import threading
    1266              import time
    1267  
    1268              def random_sleep():
    1269                  seconds = random.random() * 0.010
    1270                  time.sleep(seconds)
    1271  
    1272              class Sleeper:
    1273                  def __del__(self):
    1274                      random_sleep()
    1275  
    1276              tls = threading.local()
    1277  
    1278              def f():
    1279                  # Sleep a bit so that the thread is still running when
    1280                  # Py_EndInterpreter is called.
    1281                  random_sleep()
    1282                  tls.x = Sleeper()
    1283                  os.write(%d, b"x")
    1284  
    1285              threading.Thread(target=f).start()
    1286              random_sleep()
    1287          """ % (w,))
    1288          ret = test.support.run_in_subinterp(code)
    1289          self.assertEqual(ret, 0)
    1290          # The thread was joined properly.
    1291          self.assertEqual(os.read(r, 1), b"x")
    1292  
    1293      @cpython_only
    1294      def test_daemon_threads_fatal_error(self):
    1295          subinterp_code = f"""if 1:
    1296              import os
    1297              import threading
    1298              import time
    1299  
    1300              def f():
    1301                  # Make sure the daemon thread is still running when
    1302                  # Py_EndInterpreter is called.
    1303                  time.sleep({test.support.SHORT_TIMEOUT})
    1304              threading.Thread(target=f, daemon=True).start()
    1305              """
    1306          script = r"""if 1:
    1307              import _testcapi
    1308  
    1309              _testcapi.run_in_subinterp(%r)
    1310              """ % (subinterp_code,)
    1311          with test.support.SuppressCrashReport():
    1312              rc, out, err = assert_python_failure("-c", script)
    1313          self.assertIn("Fatal Python error: Py_EndInterpreter: "
    1314                        "not the last thread", err.decode())
    1315  
    1316      def _check_allowed(self, before_start='', *,
    1317                         allowed=True,
    1318                         daemon_allowed=True,
    1319                         daemon=False,
    1320                         ):
    1321          subinterp_code = textwrap.dedent(f"""
    1322              import test.support
    1323              import threading
    1324              def func():
    1325                  print('this should not have run!')
    1326              t = threading.Thread(target=func, daemon={daemon})
    1327              {before_start}
    1328              t.start()
    1329              """)
    1330          script = textwrap.dedent(f"""
    1331              import test.support
    1332              test.support.run_in_subinterp_with_config(
    1333                  {subinterp_code!r},
    1334                  use_main_obmalloc=True,
    1335                  allow_fork=True,
    1336                  allow_exec=True,
    1337                  allow_threads={allowed},
    1338                  allow_daemon_threads={daemon_allowed},
    1339                  check_multi_interp_extensions=False,
    1340                  own_gil=False,
    1341              )
    1342              """)
    1343          with test.support.SuppressCrashReport():
    1344              _, _, err = assert_python_ok("-c", script)
    1345          return err.decode()
    1346  
    1347      @cpython_only
    1348      def test_threads_not_allowed(self):
    1349          err = self._check_allowed(
    1350              allowed=False,
    1351              daemon_allowed=False,
    1352              daemon=False,
    1353          )
    1354          self.assertIn('RuntimeError', err)
    1355  
    1356      @cpython_only
    1357      def test_daemon_threads_not_allowed(self):
    1358          with self.subTest('via Thread()'):
    1359              err = self._check_allowed(
    1360                  allowed=True,
    1361                  daemon_allowed=False,
    1362                  daemon=True,
    1363              )
    1364              self.assertIn('RuntimeError', err)
    1365  
    1366          with self.subTest('via Thread.daemon setter'):
    1367              err = self._check_allowed(
    1368                  't.daemon = True',
    1369                  allowed=True,
    1370                  daemon_allowed=False,
    1371                  daemon=False,
    1372              )
    1373              self.assertIn('RuntimeError', err)
    1374  
    1375  
    1376  class ESC[4;38;5;81mThreadingExceptionTests(ESC[4;38;5;149mBaseTestCase):
    1377      # A RuntimeError should be raised if Thread.start() is called
    1378      # multiple times.
    1379      def test_start_thread_again(self):
    1380          thread = threading.Thread()
    1381          thread.start()
    1382          self.assertRaises(RuntimeError, thread.start)
    1383          thread.join()
    1384  
    1385      def test_joining_current_thread(self):
    1386          current_thread = threading.current_thread()
    1387          self.assertRaises(RuntimeError, current_thread.join);
    1388  
    1389      def test_joining_inactive_thread(self):
    1390          thread = threading.Thread()
    1391          self.assertRaises(RuntimeError, thread.join)
    1392  
    1393      def test_daemonize_active_thread(self):
    1394          thread = threading.Thread()
    1395          thread.start()
    1396          self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
    1397          thread.join()
    1398  
    1399      def test_releasing_unacquired_lock(self):
    1400          lock = threading.Lock()
    1401          self.assertRaises(RuntimeError, lock.release)
    1402  
    1403      @requires_subprocess()
    1404      def test_recursion_limit(self):
    1405          # Issue 9670
    1406          # test that excessive recursion within a non-main thread causes
    1407          # an exception rather than crashing the interpreter on platforms
    1408          # like Mac OS X or FreeBSD which have small default stack sizes
    1409          # for threads
    1410          script = """if True:
    1411              import threading
    1412  
    1413              def recurse():
    1414                  return recurse()
    1415  
    1416              def outer():
    1417                  try:
    1418                      recurse()
    1419                  except RecursionError:
    1420                      pass
    1421  
    1422              w = threading.Thread(target=outer)
    1423              w.start()
    1424              w.join()
    1425              print('end of main thread')
    1426              """
    1427          expected_output = "end of main thread\n"
    1428          p = subprocess.Popen([sys.executable, "-c", script],
    1429                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    1430          stdout, stderr = p.communicate()
    1431          data = stdout.decode().replace('\r', '')
    1432          self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode())
    1433          self.assertEqual(data, expected_output)
    1434  
    1435      def test_print_exception(self):
    1436          script = r"""if True:
    1437              import threading
    1438              import time
    1439  
    1440              running = False
    1441              def run():
    1442                  global running
    1443                  running = True
    1444                  while running:
    1445                      time.sleep(0.01)
    1446                  1/0
    1447              t = threading.Thread(target=run)
    1448              t.start()
    1449              while not running:
    1450                  time.sleep(0.01)
    1451              running = False
    1452              t.join()
    1453              """
    1454          rc, out, err = assert_python_ok("-c", script)
    1455          self.assertEqual(out, b'')
    1456          err = err.decode()
    1457          self.assertIn("Exception in thread", err)
    1458          self.assertIn("Traceback (most recent call last):", err)
    1459          self.assertIn("ZeroDivisionError", err)
    1460          self.assertNotIn("Unhandled exception", err)
    1461  
    1462      def test_print_exception_stderr_is_none_1(self):
    1463          script = r"""if True:
    1464              import sys
    1465              import threading
    1466              import time
    1467  
    1468              running = False
    1469              def run():
    1470                  global running
    1471                  running = True
    1472                  while running:
    1473                      time.sleep(0.01)
    1474                  1/0
    1475              t = threading.Thread(target=run)
    1476              t.start()
    1477              while not running:
    1478                  time.sleep(0.01)
    1479              sys.stderr = None
    1480              running = False
    1481              t.join()
    1482              """
    1483          rc, out, err = assert_python_ok("-c", script)
    1484          self.assertEqual(out, b'')
    1485          err = err.decode()
    1486          self.assertIn("Exception in thread", err)
    1487          self.assertIn("Traceback (most recent call last):", err)
    1488          self.assertIn("ZeroDivisionError", err)
    1489          self.assertNotIn("Unhandled exception", err)
    1490  
    1491      def test_print_exception_stderr_is_none_2(self):
    1492          script = r"""if True:
    1493              import sys
    1494              import threading
    1495              import time
    1496  
    1497              running = False
    1498              def run():
    1499                  global running
    1500                  running = True
    1501                  while running:
    1502                      time.sleep(0.01)
    1503                  1/0
    1504              sys.stderr = None
    1505              t = threading.Thread(target=run)
    1506              t.start()
    1507              while not running:
    1508                  time.sleep(0.01)
    1509              running = False
    1510              t.join()
    1511              """
    1512          rc, out, err = assert_python_ok("-c", script)
    1513          self.assertEqual(out, b'')
    1514          self.assertNotIn("Unhandled exception", err.decode())
    1515  
    1516      def test_print_exception_gh_102056(self):
    1517          # This used to crash. See gh-102056.
    1518          script = r"""if True:
    1519              import time
    1520              import threading
    1521              import _thread
    1522  
    1523              def f():
    1524                  try:
    1525                      f()
    1526                  except RecursionError:
    1527                      f()
    1528  
    1529              def g():
    1530                  try:
    1531                      raise ValueError()
    1532                  except* ValueError:
    1533                      f()
    1534  
    1535              def h():
    1536                  time.sleep(1)
    1537                  _thread.interrupt_main()
    1538  
    1539              t = threading.Thread(target=h)
    1540              t.start()
    1541              g()
    1542              t.join()
    1543              """
    1544  
    1545          assert_python_failure("-c", script)
    1546  
    1547      def test_bare_raise_in_brand_new_thread(self):
    1548          def bare_raise():
    1549              raise
    1550  
    1551          class ESC[4;38;5;81mIssue27558(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
    1552              exc = None
    1553  
    1554              def run(self):
    1555                  try:
    1556                      bare_raise()
    1557                  except Exception as exc:
    1558                      self.exc = exc
    1559  
    1560          thread = Issue27558()
    1561          thread.start()
    1562          thread.join()
    1563          self.assertIsNotNone(thread.exc)
    1564          self.assertIsInstance(thread.exc, RuntimeError)
    1565          # explicitly break the reference cycle to not leak a dangling thread
    1566          thread.exc = None
    1567  
    1568      def test_multithread_modify_file_noerror(self):
    1569          # See issue25872
    1570          def modify_file():
    1571              with open(os_helper.TESTFN, 'w', encoding='utf-8') as fp:
    1572                  fp.write(' ')
    1573                  traceback.format_stack()
    1574  
    1575          self.addCleanup(os_helper.unlink, os_helper.TESTFN)
    1576          threads = [
    1577              threading.Thread(target=modify_file)
    1578              for i in range(100)
    1579          ]
    1580          for t in threads:
    1581              t.start()
    1582              t.join()
    1583  
    1584  
    1585  class ESC[4;38;5;81mThreadRunFail(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
    1586      def run(self):
    1587          raise ValueError("run failed")
    1588  
    1589  
    1590  class ESC[4;38;5;81mExceptHookTests(ESC[4;38;5;149mBaseTestCase):
    1591      def setUp(self):
    1592          restore_default_excepthook(self)
    1593          super().setUp()
    1594  
    1595      def test_excepthook(self):
    1596          with support.captured_output("stderr") as stderr:
    1597              thread = ThreadRunFail(name="excepthook thread")
    1598              thread.start()
    1599              thread.join()
    1600  
    1601          stderr = stderr.getvalue().strip()
    1602          self.assertIn(f'Exception in thread {thread.name}:\n', stderr)
    1603          self.assertIn('Traceback (most recent call last):\n', stderr)
    1604          self.assertIn('  raise ValueError("run failed")', stderr)
    1605          self.assertIn('ValueError: run failed', stderr)
    1606  
    1607      @support.cpython_only
    1608      def test_excepthook_thread_None(self):
    1609          # threading.excepthook called with thread=None: log the thread
    1610          # identifier in this case.
    1611          with support.captured_output("stderr") as stderr:
    1612              try:
    1613                  raise ValueError("bug")
    1614              except Exception as exc:
    1615                  args = threading.ExceptHookArgs([*sys.exc_info(), None])
    1616                  try:
    1617                      threading.excepthook(args)
    1618                  finally:
    1619                      # Explicitly break a reference cycle
    1620                      args = None
    1621  
    1622          stderr = stderr.getvalue().strip()
    1623          self.assertIn(f'Exception in thread {threading.get_ident()}:\n', stderr)
    1624          self.assertIn('Traceback (most recent call last):\n', stderr)
    1625          self.assertIn('  raise ValueError("bug")', stderr)
    1626          self.assertIn('ValueError: bug', stderr)
    1627  
    1628      def test_system_exit(self):
    1629          class ESC[4;38;5;81mThreadExit(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
    1630              def run(self):
    1631                  sys.exit(1)
    1632  
    1633          # threading.excepthook() silently ignores SystemExit
    1634          with support.captured_output("stderr") as stderr:
    1635              thread = ThreadExit()
    1636              thread.start()
    1637              thread.join()
    1638  
    1639          self.assertEqual(stderr.getvalue(), '')
    1640  
    1641      def test_custom_excepthook(self):
    1642          args = None
    1643  
    1644          def hook(hook_args):
    1645              nonlocal args
    1646              args = hook_args
    1647  
    1648          try:
    1649              with support.swap_attr(threading, 'excepthook', hook):
    1650                  thread = ThreadRunFail()
    1651                  thread.start()
    1652                  thread.join()
    1653  
    1654              self.assertEqual(args.exc_type, ValueError)
    1655              self.assertEqual(str(args.exc_value), 'run failed')
    1656              self.assertEqual(args.exc_traceback, args.exc_value.__traceback__)
    1657              self.assertIs(args.thread, thread)
    1658          finally:
    1659              # Break reference cycle
    1660              args = None
    1661  
    1662      def test_custom_excepthook_fail(self):
    1663          def threading_hook(args):
    1664              raise ValueError("threading_hook failed")
    1665  
    1666          err_str = None
    1667  
    1668          def sys_hook(exc_type, exc_value, exc_traceback):
    1669              nonlocal err_str
    1670              err_str = str(exc_value)
    1671  
    1672          with support.swap_attr(threading, 'excepthook', threading_hook), \
    1673               support.swap_attr(sys, 'excepthook', sys_hook), \
    1674               support.captured_output('stderr') as stderr:
    1675              thread = ThreadRunFail()
    1676              thread.start()
    1677              thread.join()
    1678  
    1679          self.assertEqual(stderr.getvalue(),
    1680                           'Exception in threading.excepthook:\n')
    1681          self.assertEqual(err_str, 'threading_hook failed')
    1682  
    1683      def test_original_excepthook(self):
    1684          def run_thread():
    1685              with support.captured_output("stderr") as output:
    1686                  thread = ThreadRunFail(name="excepthook thread")
    1687                  thread.start()
    1688                  thread.join()
    1689              return output.getvalue()
    1690  
    1691          def threading_hook(args):
    1692              print("Running a thread failed", file=sys.stderr)
    1693  
    1694          default_output = run_thread()
    1695          with support.swap_attr(threading, 'excepthook', threading_hook):
    1696              custom_hook_output = run_thread()
    1697              threading.excepthook = threading.__excepthook__
    1698              recovered_output = run_thread()
    1699  
    1700          self.assertEqual(default_output, recovered_output)
    1701          self.assertNotEqual(default_output, custom_hook_output)
    1702          self.assertEqual(custom_hook_output, "Running a thread failed\n")
    1703  
    1704  
    1705  class ESC[4;38;5;81mTimerTests(ESC[4;38;5;149mBaseTestCase):
    1706  
    1707      def setUp(self):
    1708          BaseTestCase.setUp(self)
    1709          self.callback_args = []
    1710          self.callback_event = threading.Event()
    1711  
    1712      def test_init_immutable_default_args(self):
    1713          # Issue 17435: constructor defaults were mutable objects, they could be
    1714          # mutated via the object attributes and affect other Timer objects.
    1715          timer1 = threading.Timer(0.01, self._callback_spy)
    1716          timer1.start()
    1717          self.callback_event.wait()
    1718          timer1.args.append("blah")
    1719          timer1.kwargs["foo"] = "bar"
    1720          self.callback_event.clear()
    1721          timer2 = threading.Timer(0.01, self._callback_spy)
    1722          timer2.start()
    1723          self.callback_event.wait()
    1724          self.assertEqual(len(self.callback_args), 2)
    1725          self.assertEqual(self.callback_args, [((), {}), ((), {})])
    1726          timer1.join()
    1727          timer2.join()
    1728  
    1729      def _callback_spy(self, *args, **kwargs):
    1730          self.callback_args.append((args[:], kwargs.copy()))
    1731          self.callback_event.set()
    1732  
    1733  class ESC[4;38;5;81mLockTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mLockTests):
    1734      locktype = staticmethod(threading.Lock)
    1735  
    1736  class ESC[4;38;5;81mPyRLockTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mRLockTests):
    1737      locktype = staticmethod(threading._PyRLock)
    1738  
    1739  @unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
    1740  class ESC[4;38;5;81mCRLockTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mRLockTests):
    1741      locktype = staticmethod(threading._CRLock)
    1742  
    1743  class ESC[4;38;5;81mEventTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mEventTests):
    1744      eventtype = staticmethod(threading.Event)
    1745  
    1746  class ESC[4;38;5;81mConditionAsRLockTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mRLockTests):
    1747      # Condition uses an RLock by default and exports its API.
    1748      locktype = staticmethod(threading.Condition)
    1749  
    1750  class ESC[4;38;5;81mConditionTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mConditionTests):
    1751      condtype = staticmethod(threading.Condition)
    1752  
    1753  class ESC[4;38;5;81mSemaphoreTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mSemaphoreTests):
    1754      semtype = staticmethod(threading.Semaphore)
    1755  
    1756  class ESC[4;38;5;81mBoundedSemaphoreTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mBoundedSemaphoreTests):
    1757      semtype = staticmethod(threading.BoundedSemaphore)
    1758  
    1759  class ESC[4;38;5;81mBarrierTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mBarrierTests):
    1760      barriertype = staticmethod(threading.Barrier)
    1761  
    1762  
    1763  class ESC[4;38;5;81mMiscTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1764      def test__all__(self):
    1765          restore_default_excepthook(self)
    1766  
    1767          extra = {"ThreadError"}
    1768          not_exported = {'currentThread', 'activeCount'}
    1769          support.check__all__(self, threading, ('threading', '_thread'),
    1770                               extra=extra, not_exported=not_exported)
    1771  
    1772  
    1773  class ESC[4;38;5;81mInterruptMainTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1774      def check_interrupt_main_with_signal_handler(self, signum):
    1775          def handler(signum, frame):
    1776              1/0
    1777  
    1778          old_handler = signal.signal(signum, handler)
    1779          self.addCleanup(signal.signal, signum, old_handler)
    1780  
    1781          with self.assertRaises(ZeroDivisionError):
    1782              _thread.interrupt_main()
    1783  
    1784      def check_interrupt_main_noerror(self, signum):
    1785          handler = signal.getsignal(signum)
    1786          try:
    1787              # No exception should arise.
    1788              signal.signal(signum, signal.SIG_IGN)
    1789              _thread.interrupt_main(signum)
    1790  
    1791              signal.signal(signum, signal.SIG_DFL)
    1792              _thread.interrupt_main(signum)
    1793          finally:
    1794              # Restore original handler
    1795              signal.signal(signum, handler)
    1796  
    1797      def test_interrupt_main_subthread(self):
    1798          # Calling start_new_thread with a function that executes interrupt_main
    1799          # should raise KeyboardInterrupt upon completion.
    1800          def call_interrupt():
    1801              _thread.interrupt_main()
    1802          t = threading.Thread(target=call_interrupt)
    1803          with self.assertRaises(KeyboardInterrupt):
    1804              t.start()
    1805              t.join()
    1806          t.join()
    1807  
    1808      def test_interrupt_main_mainthread(self):
    1809          # Make sure that if interrupt_main is called in main thread that
    1810          # KeyboardInterrupt is raised instantly.
    1811          with self.assertRaises(KeyboardInterrupt):
    1812              _thread.interrupt_main()
    1813  
    1814      def test_interrupt_main_with_signal_handler(self):
    1815          self.check_interrupt_main_with_signal_handler(signal.SIGINT)
    1816          self.check_interrupt_main_with_signal_handler(signal.SIGTERM)
    1817  
    1818      def test_interrupt_main_noerror(self):
    1819          self.check_interrupt_main_noerror(signal.SIGINT)
    1820          self.check_interrupt_main_noerror(signal.SIGTERM)
    1821  
    1822      def test_interrupt_main_invalid_signal(self):
    1823          self.assertRaises(ValueError, _thread.interrupt_main, -1)
    1824          self.assertRaises(ValueError, _thread.interrupt_main, signal.NSIG)
    1825          self.assertRaises(ValueError, _thread.interrupt_main, 1000000)
    1826  
    1827      @threading_helper.reap_threads
    1828      def test_can_interrupt_tight_loops(self):
    1829          cont = [True]
    1830          started = [False]
    1831          interrupted = [False]
    1832  
    1833          def worker(started, cont, interrupted):
    1834              iterations = 100_000_000
    1835              started[0] = True
    1836              while cont[0]:
    1837                  if iterations:
    1838                      iterations -= 1
    1839                  else:
    1840                      return
    1841                  pass
    1842              interrupted[0] = True
    1843  
    1844          t = threading.Thread(target=worker,args=(started, cont, interrupted))
    1845          t.start()
    1846          while not started[0]:
    1847              pass
    1848          cont[0] = False
    1849          t.join()
    1850          self.assertTrue(interrupted[0])
    1851  
    1852  
    1853  class ESC[4;38;5;81mAtexitTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1854  
    1855      def test_atexit_output(self):
    1856          rc, out, err = assert_python_ok("-c", """if True:
    1857              import threading
    1858  
    1859              def run_last():
    1860                  print('parrot')
    1861  
    1862              threading._register_atexit(run_last)
    1863          """)
    1864  
    1865          self.assertFalse(err)
    1866          self.assertEqual(out.strip(), b'parrot')
    1867  
    1868      def test_atexit_called_once(self):
    1869          rc, out, err = assert_python_ok("-c", """if True:
    1870              import threading
    1871              from unittest.mock import Mock
    1872  
    1873              mock = Mock()
    1874              threading._register_atexit(mock)
    1875              mock.assert_not_called()
    1876              # force early shutdown to ensure it was called once
    1877              threading._shutdown()
    1878              mock.assert_called_once()
    1879          """)
    1880  
    1881          self.assertFalse(err)
    1882  
    1883      def test_atexit_after_shutdown(self):
    1884          # The only way to do this is by registering an atexit within
    1885          # an atexit, which is intended to raise an exception.
    1886          rc, out, err = assert_python_ok("-c", """if True:
    1887              import threading
    1888  
    1889              def func():
    1890                  pass
    1891  
    1892              def run_last():
    1893                  threading._register_atexit(func)
    1894  
    1895              threading._register_atexit(run_last)
    1896          """)
    1897  
    1898          self.assertTrue(err)
    1899          self.assertIn("RuntimeError: can't register atexit after shutdown",
    1900                  err.decode())
    1901  
    1902  
    1903  if __name__ == "__main__":
    1904      unittest.main()