(root)/
Python-3.11.7/
Lib/
test/
test_thread.py
       1  import os
       2  import unittest
       3  import random
       4  from test import support
       5  from test.support import threading_helper
       6  import _thread as thread
       7  import time
       8  import weakref
       9  
      10  from test import lock_tests
      11  
      12  threading_helper.requires_working_threading(module=True)
      13  
      14  NUMTASKS = 10
      15  NUMTRIPS = 3
      16  POLL_SLEEP = 0.010 # seconds = 10 ms
      17  
      18  _print_mutex = thread.allocate_lock()
      19  
      20  def verbose_print(arg):
      21      """Helper function for printing out debugging output."""
      22      if support.verbose:
      23          with _print_mutex:
      24              print(arg)
      25  
      26  
      27  class ESC[4;38;5;81mBasicThreadTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      28  
      29      def setUp(self):
      30          self.done_mutex = thread.allocate_lock()
      31          self.done_mutex.acquire()
      32          self.running_mutex = thread.allocate_lock()
      33          self.random_mutex = thread.allocate_lock()
      34          self.created = 0
      35          self.running = 0
      36          self.next_ident = 0
      37  
      38          key = threading_helper.threading_setup()
      39          self.addCleanup(threading_helper.threading_cleanup, *key)
      40  
      41  
      42  class ESC[4;38;5;81mThreadRunningTests(ESC[4;38;5;149mBasicThreadTest):
      43  
      44      def newtask(self):
      45          with self.running_mutex:
      46              self.next_ident += 1
      47              verbose_print("creating task %s" % self.next_ident)
      48              thread.start_new_thread(self.task, (self.next_ident,))
      49              self.created += 1
      50              self.running += 1
      51  
      52      def task(self, ident):
      53          with self.random_mutex:
      54              delay = random.random() / 10000.0
      55          verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
      56          time.sleep(delay)
      57          verbose_print("task %s done" % ident)
      58          with self.running_mutex:
      59              self.running -= 1
      60              if self.created == NUMTASKS and self.running == 0:
      61                  self.done_mutex.release()
      62  
      63      def test_starting_threads(self):
      64          with threading_helper.wait_threads_exit():
      65              # Basic test for thread creation.
      66              for i in range(NUMTASKS):
      67                  self.newtask()
      68              verbose_print("waiting for tasks to complete...")
      69              self.done_mutex.acquire()
      70              verbose_print("all tasks done")
      71  
      72      def test_stack_size(self):
      73          # Various stack size tests.
      74          self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")
      75  
      76          thread.stack_size(0)
      77          self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")
      78  
      79      @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix')
      80      def test_nt_and_posix_stack_size(self):
      81          try:
      82              thread.stack_size(4096)
      83          except ValueError:
      84              verbose_print("caught expected ValueError setting "
      85                              "stack_size(4096)")
      86          except thread.error:
      87              self.skipTest("platform does not support changing thread stack "
      88                            "size")
      89  
      90          fail_msg = "stack_size(%d) failed - should succeed"
      91          for tss in (262144, 0x100000, 0):
      92              thread.stack_size(tss)
      93              self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
      94              verbose_print("successfully set stack_size(%d)" % tss)
      95  
      96          for tss in (262144, 0x100000):
      97              verbose_print("trying stack_size = (%d)" % tss)
      98              self.next_ident = 0
      99              self.created = 0
     100              with threading_helper.wait_threads_exit():
     101                  for i in range(NUMTASKS):
     102                      self.newtask()
     103  
     104                  verbose_print("waiting for all tasks to complete")
     105                  self.done_mutex.acquire()
     106                  verbose_print("all tasks done")
     107  
     108          thread.stack_size(0)
     109  
     110      def test__count(self):
     111          # Test the _count() function.
     112          orig = thread._count()
     113          mut = thread.allocate_lock()
     114          mut.acquire()
     115          started = []
     116  
     117          def task():
     118              started.append(None)
     119              mut.acquire()
     120              mut.release()
     121  
     122          with threading_helper.wait_threads_exit():
     123              thread.start_new_thread(task, ())
     124              while not started:
     125                  time.sleep(POLL_SLEEP)
     126              self.assertEqual(thread._count(), orig + 1)
     127              # Allow the task to finish.
     128              mut.release()
     129              # The only reliable way to be sure that the thread ended from the
     130              # interpreter's point of view is to wait for the function object to be
     131              # destroyed.
     132              done = []
     133              wr = weakref.ref(task, lambda _: done.append(None))
     134              del task
     135              while not done:
     136                  time.sleep(POLL_SLEEP)
     137                  support.gc_collect()  # For PyPy or other GCs.
     138              self.assertEqual(thread._count(), orig)
     139  
     140      def test_unraisable_exception(self):
     141          def task():
     142              started.release()
     143              raise ValueError("task failed")
     144  
     145          started = thread.allocate_lock()
     146          with support.catch_unraisable_exception() as cm:
     147              with threading_helper.wait_threads_exit():
     148                  started.acquire()
     149                  thread.start_new_thread(task, ())
     150                  started.acquire()
     151  
     152              self.assertEqual(str(cm.unraisable.exc_value), "task failed")
     153              self.assertIs(cm.unraisable.object, task)
     154              self.assertEqual(cm.unraisable.err_msg,
     155                               "Exception ignored in thread started by")
     156              self.assertIsNotNone(cm.unraisable.exc_traceback)
     157  
     158  
     159  class ESC[4;38;5;81mBarrier:
     160      def __init__(self, num_threads):
     161          self.num_threads = num_threads
     162          self.waiting = 0
     163          self.checkin_mutex  = thread.allocate_lock()
     164          self.checkout_mutex = thread.allocate_lock()
     165          self.checkout_mutex.acquire()
     166  
     167      def enter(self):
     168          self.checkin_mutex.acquire()
     169          self.waiting = self.waiting + 1
     170          if self.waiting == self.num_threads:
     171              self.waiting = self.num_threads - 1
     172              self.checkout_mutex.release()
     173              return
     174          self.checkin_mutex.release()
     175  
     176          self.checkout_mutex.acquire()
     177          self.waiting = self.waiting - 1
     178          if self.waiting == 0:
     179              self.checkin_mutex.release()
     180              return
     181          self.checkout_mutex.release()
     182  
     183  
     184  class ESC[4;38;5;81mBarrierTest(ESC[4;38;5;149mBasicThreadTest):
     185  
     186      def test_barrier(self):
     187          with threading_helper.wait_threads_exit():
     188              self.bar = Barrier(NUMTASKS)
     189              self.running = NUMTASKS
     190              for i in range(NUMTASKS):
     191                  thread.start_new_thread(self.task2, (i,))
     192              verbose_print("waiting for tasks to end")
     193              self.done_mutex.acquire()
     194              verbose_print("tasks done")
     195  
     196      def task2(self, ident):
     197          for i in range(NUMTRIPS):
     198              if ident == 0:
     199                  # give it a good chance to enter the next
     200                  # barrier before the others are all out
     201                  # of the current one
     202                  delay = 0
     203              else:
     204                  with self.random_mutex:
     205                      delay = random.random() / 10000.0
     206              verbose_print("task %s will run for %sus" %
     207                            (ident, round(delay * 1e6)))
     208              time.sleep(delay)
     209              verbose_print("task %s entering %s" % (ident, i))
     210              self.bar.enter()
     211              verbose_print("task %s leaving barrier" % ident)
     212          with self.running_mutex:
     213              self.running -= 1
     214              # Must release mutex before releasing done, else the main thread can
     215              # exit and set mutex to None as part of global teardown; then
     216              # mutex.release() raises AttributeError.
     217              finished = self.running == 0
     218          if finished:
     219              self.done_mutex.release()
     220  
     221  class ESC[4;38;5;81mLockTests(ESC[4;38;5;149mlock_testsESC[4;38;5;149m.ESC[4;38;5;149mLockTests):
     222      locktype = thread.allocate_lock
     223  
     224  
     225  class ESC[4;38;5;81mTestForkInThread(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     226      def setUp(self):
     227          self.read_fd, self.write_fd = os.pipe()
     228  
     229      @support.requires_fork()
     230      @threading_helper.reap_threads
     231      def test_forkinthread(self):
     232          pid = None
     233  
     234          def fork_thread(read_fd, write_fd):
     235              nonlocal pid
     236  
     237              # fork in a thread
     238              pid = os.fork()
     239              if pid:
     240                  # parent process
     241                  return
     242  
     243              # child process
     244              try:
     245                  os.close(read_fd)
     246                  os.write(write_fd, b"OK")
     247              finally:
     248                  os._exit(0)
     249  
     250          with threading_helper.wait_threads_exit():
     251              thread.start_new_thread(fork_thread, (self.read_fd, self.write_fd))
     252              self.assertEqual(os.read(self.read_fd, 2), b"OK")
     253              os.close(self.write_fd)
     254  
     255          self.assertIsNotNone(pid)
     256          support.wait_process(pid, exitcode=0)
     257  
     258      def tearDown(self):
     259          try:
     260              os.close(self.read_fd)
     261          except OSError:
     262              pass
     263  
     264          try:
     265              os.close(self.write_fd)
     266          except OSError:
     267              pass
     268  
     269  
     270  if __name__ == "__main__":
     271      unittest.main()