(root)/
Python-3.12.0/
Lib/
test/
test_thread.py
       1  import os
       2  import unittest
       3  import random
       4  from test import support
       5  from test.support import threading_helper
       6  import _thread as thread
       7  import time
       8  import warnings
       9  import weakref
      10  
      11  from test import lock_tests
      12  
      13  threading_helper.requires_working_threading(module=True)
      14  
      15  NUMTASKS = 10
      16  NUMTRIPS = 3
      17  
      18  _print_mutex = thread.allocate_lock()
      19  
      20  def verbose_print(arg):
      21      """Helper function for printing out debugging output."""
      22      if support.verbose:
      23          with _print_mutex:
      24              print(arg)
      25  
      26  
      27  class ESC[4;38;5;81mBasicThreadTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      28  
      29      def setUp(self):
      30          self.done_mutex = thread.allocate_lock()
      31          self.done_mutex.acquire()
      32          self.running_mutex = thread.allocate_lock()
      33          self.random_mutex = thread.allocate_lock()
      34          self.created = 0
      35          self.running = 0
      36          self.next_ident = 0
      37  
      38          key = threading_helper.threading_setup()
      39          self.addCleanup(threading_helper.threading_cleanup, *key)
      40  
      41  
      42  class ESC[4;38;5;81mThreadRunningTests(ESC[4;38;5;149mBasicThreadTest):
      43  
      44      def newtask(self):
      45          with self.running_mutex:
      46              self.next_ident += 1
      47              verbose_print("creating task %s" % self.next_ident)
      48              thread.start_new_thread(self.task, (self.next_ident,))
      49              self.created += 1
      50              self.running += 1
      51  
      52      def task(self, ident):
      53          with self.random_mutex:
      54              delay = random.random() / 10000.0
      55          verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
      56          time.sleep(delay)
      57          verbose_print("task %s done" % ident)
      58          with self.running_mutex:
      59              self.running -= 1
      60              if self.created == NUMTASKS and self.running == 0:
      61                  self.done_mutex.release()
      62  
      63      def test_starting_threads(self):
      64          with threading_helper.wait_threads_exit():
      65              # Basic test for thread creation.
      66              for i in range(NUMTASKS):
      67                  self.newtask()
      68              verbose_print("waiting for tasks to complete...")
      69              self.done_mutex.acquire()
      70              verbose_print("all tasks done")
      71  
      72      def test_stack_size(self):
      73          # Various stack size tests.
      74          self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")
      75  
      76          thread.stack_size(0)
      77          self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")
      78  
      79      @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix')
      80      def test_nt_and_posix_stack_size(self):
      81          try:
      82              thread.stack_size(4096)
      83          except ValueError:
      84              verbose_print("caught expected ValueError setting "
      85                              "stack_size(4096)")
      86          except thread.error:
      87              self.skipTest("platform does not support changing thread stack "
      88                            "size")
      89  
      90          fail_msg = "stack_size(%d) failed - should succeed"
      91          for tss in (262144, 0x100000, 0):
      92              thread.stack_size(tss)
      93              self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
      94              verbose_print("successfully set stack_size(%d)" % tss)
      95  
      96          for tss in (262144, 0x100000):
      97              verbose_print("trying stack_size = (%d)" % tss)
      98              self.next_ident = 0
      99              self.created = 0
     100              with threading_helper.wait_threads_exit():
     101                  for i in range(NUMTASKS):
     102                      self.newtask()
     103  
     104                  verbose_print("waiting for all tasks to complete")
     105                  self.done_mutex.acquire()
     106                  verbose_print("all tasks done")
     107  
     108          thread.stack_size(0)
     109  
     110      def test__count(self):
     111          # Test the _count() function.
     112          orig = thread._count()
     113          mut = thread.allocate_lock()
     114          mut.acquire()
     115          started = []
     116  
     117          def task():
     118              started.append(None)
     119              mut.acquire()
     120              mut.release()
     121  
     122          with threading_helper.wait_threads_exit():
     123              thread.start_new_thread(task, ())
     124              for _ in support.sleeping_retry(support.LONG_TIMEOUT):
     125                  if started:
     126                      break
     127              self.assertEqual(thread._count(), orig + 1)
     128  
     129              # Allow the task to finish.
     130              mut.release()
     131  
     132              # The only reliable way to be sure that the thread ended from the
     133              # interpreter's point of view is to wait for the function object to
     134              # be destroyed.
     135              done = []
     136              wr = weakref.ref(task, lambda _: done.append(None))
     137              del task
     138  
     139              for _ in support.sleeping_retry(support.LONG_TIMEOUT):
     140                  if done:
     141                      break
     142                  support.gc_collect()  # For PyPy or other GCs.
     143              self.assertEqual(thread._count(), orig)
     144  
     145      def test_unraisable_exception(self):
     146          def task():
     147              started.release()
     148              raise ValueError("task failed")
     149  
     150          started = thread.allocate_lock()
     151          with support.catch_unraisable_exception() as cm:
     152              with threading_helper.wait_threads_exit():
     153                  started.acquire()
     154                  thread.start_new_thread(task, ())
     155                  started.acquire()
     156  
     157              self.assertEqual(str(cm.unraisable.exc_value), "task failed")
     158              self.assertIs(cm.unraisable.object, task)
     159              self.assertEqual(cm.unraisable.err_msg,
     160                               "Exception ignored in thread started by")
     161              self.assertIsNotNone(cm.unraisable.exc_traceback)
     162  
     163  
     164  class ESC[4;38;5;81mBarrier:
     165      def __init__(self, num_threads):
     166          self.num_threads = num_threads
     167          self.waiting = 0
     168          self.checkin_mutex  = thread.allocate_lock()
     169          self.checkout_mutex = thread.allocate_lock()
     170          self.checkout_mutex.acquire()
     171  
     172      def enter(self):
     173          self.checkin_mutex.acquire()
     174          self.waiting = self.waiting + 1
     175          if self.waiting == self.num_threads:
     176              self.waiting = self.num_threads - 1
     177              self.checkout_mutex.release()
     178              return
     179          self.checkin_mutex.release()
     180  
     181          self.checkout_mutex.acquire()
     182          self.waiting = self.waiting - 1
     183          if self.waiting == 0:
     184              self.checkin_mutex.release()
     185              return
     186          self.checkout_mutex.release()
     187  
     188  
     189  class ESC[4;38;5;81mBarrierTest(ESC[4;38;5;149mBasicThreadTest):
     190  
     191      def test_barrier(self):
     192          with threading_helper.wait_threads_exit():
     193              self.bar = Barrier(NUMTASKS)
     194              self.running = NUMTASKS
     195              for i in range(NUMTASKS):
     196                  thread.start_new_thread(self.task2, (i,))
     197              verbose_print("waiting for tasks to end")
     198              self.done_mutex.acquire()
     199              verbose_print("tasks done")
     200  
     201      def task2(self, ident):
     202          for i in range(NUMTRIPS):
     203              if ident == 0:
     204                  # give it a good chance to enter the next
     205                  # barrier before the others are all out
     206                  # of the current one
     207                  delay = 0
     208              else:
     209                  with self.random_mutex:
     210                      delay = random.random() / 10000.0
     211              verbose_print("task %s will run for %sus" %
     212                            (ident, round(delay * 1e6)))
     213              time.sleep(delay)
     214              verbose_print("task %s entering %s" % (ident, i))
     215              self.bar.enter()
     216              verbose_print("task %s leaving barrier" % ident)
     217          with self.running_mutex:
     218              self.running -= 1
     219              # Must release mutex before releasing done, else the main thread can
     220              # exit and set mutex to None as part of global teardown; then
     221              # mutex.release() raises AttributeError.
     222              finished = self.running == 0
     223          if finished:
     224              self.done_mutex.release()
     225  
     226  class ESC[4;38;5;81mLockTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mLockTests):
     227      locktype = thread.allocate_lock
     228  
     229  
     230  class ESC[4;38;5;81mTestForkInThread(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     231      def setUp(self):
     232          self.read_fd, self.write_fd = os.pipe()
     233  
     234      @support.requires_fork()
     235      @threading_helper.reap_threads
     236      def test_forkinthread(self):
     237          pid = None
     238  
     239          def fork_thread(read_fd, write_fd):
     240              nonlocal pid
     241  
     242              # Ignore the warning about fork with threads.
     243              with warnings.catch_warnings(category=DeprecationWarning,
     244                                           action="ignore"):
     245                  # fork in a thread (DANGER, undefined per POSIX)
     246                  if (pid := os.fork()):
     247                      # parent process
     248                      return
     249  
     250              # child process
     251              try:
     252                  os.close(read_fd)
     253                  os.write(write_fd, b"OK")
     254              finally:
     255                  os._exit(0)
     256  
     257          with threading_helper.wait_threads_exit():
     258              thread.start_new_thread(fork_thread, (self.read_fd, self.write_fd))
     259              self.assertEqual(os.read(self.read_fd, 2), b"OK")
     260              os.close(self.write_fd)
     261  
     262          self.assertIsNotNone(pid)
     263          support.wait_process(pid, exitcode=0)
     264  
     265      def tearDown(self):
     266          try:
     267              os.close(self.read_fd)
     268          except OSError:
     269              pass
     270  
     271          try:
     272              os.close(self.write_fd)
     273          except OSError:
     274              pass
     275  
     276  
     277  if __name__ == "__main__":
     278      unittest.main()