(root)/
Python-3.11.7/
Lib/
test/
support/
threading_helper.py
       1  import _thread
       2  import contextlib
       3  import functools
       4  import sys
       5  import threading
       6  import time
       7  import unittest
       8  
       9  from test import support
      10  
      11  
      12  #=======================================================================
      13  # Threading support to prevent reporting refleaks when running regrtest.py -R
      14  
      15  # NOTE: we use thread._count() rather than threading.enumerate() (or the
      16  # moral equivalent thereof) because a threading.Thread object is still alive
      17  # until its __bootstrap() method has returned, even after it has been
      18  # unregistered from the threading module.
      19  # thread._count(), on the other hand, only gets decremented *after* the
      20  # __bootstrap() method has returned, which gives us reliable reference counts
      21  # at the end of a test run.
      22  
      23  
      24  def threading_setup():
      25      return _thread._count(), len(threading._dangling)
      26  
      27  
      28  def threading_cleanup(*original_values):
      29      orig_count, orig_ndangling = original_values
      30  
      31      timeout = 1.0
      32      for _ in support.sleeping_retry(timeout, error=False):
      33          # Copy the thread list to get a consistent output. threading._dangling
      34          # is a WeakSet, its value changes when it's read.
      35          dangling_threads = list(threading._dangling)
      36          count = _thread._count()
      37  
      38          if count <= orig_count:
      39              return
      40  
      41      # Timeout!
      42      support.environment_altered = True
      43      support.print_warning(
      44          f"threading_cleanup() failed to clean up threads "
      45          f"in {timeout:.1f} seconds\n"
      46          f"  before: thread count={orig_count}, dangling={orig_ndangling}\n"
      47          f"  after: thread count={count}, dangling={len(dangling_threads)}")
      48      for thread in dangling_threads:
      49          support.print_warning(f"Dangling thread: {thread!r}")
      50  
      51      # The warning happens when a test spawns threads and some of these threads
      52      # are still running after the test completes. To fix this warning, join
      53      # threads explicitly to wait until they complete.
      54      #
      55      # To make the warning more likely, reduce the timeout.
      56  
      57  
      58  def reap_threads(func):
      59      """Use this function when threads are being used.  This will
      60      ensure that the threads are cleaned up even when the test fails.
      61      """
      62      @functools.wraps(func)
      63      def decorator(*args):
      64          key = threading_setup()
      65          try:
      66              return func(*args)
      67          finally:
      68              threading_cleanup(*key)
      69      return decorator
      70  
      71  
      72  @contextlib.contextmanager
      73  def wait_threads_exit(timeout=None):
      74      """
      75      bpo-31234: Context manager to wait until all threads created in the with
      76      statement exit.
      77  
      78      Use _thread.count() to check if threads exited. Indirectly, wait until
      79      threads exit the internal t_bootstrap() C function of the _thread module.
      80  
      81      threading_setup() and threading_cleanup() are designed to emit a warning
      82      if a test leaves running threads in the background. This context manager
      83      is designed to cleanup threads started by the _thread.start_new_thread()
      84      which doesn't allow to wait for thread exit, whereas thread.Thread has a
      85      join() method.
      86      """
      87      if timeout is None:
      88          timeout = support.SHORT_TIMEOUT
      89      old_count = _thread._count()
      90      try:
      91          yield
      92      finally:
      93          start_time = time.monotonic()
      94          for _ in support.sleeping_retry(timeout, error=False):
      95              support.gc_collect()
      96              count = _thread._count()
      97              if count <= old_count:
      98                  break
      99          else:
     100              dt = time.monotonic() - start_time
     101              msg = (f"wait_threads() failed to cleanup {count - old_count} "
     102                     f"threads after {dt:.1f} seconds "
     103                     f"(count: {count}, old count: {old_count})")
     104              raise AssertionError(msg)
     105  
     106  
     107  def join_thread(thread, timeout=None):
     108      """Join a thread. Raise an AssertionError if the thread is still alive
     109      after timeout seconds.
     110      """
     111      if timeout is None:
     112          timeout = support.SHORT_TIMEOUT
     113      thread.join(timeout)
     114      if thread.is_alive():
     115          msg = f"failed to join the thread in {timeout:.1f} seconds"
     116          raise AssertionError(msg)
     117  
     118  
     119  @contextlib.contextmanager
     120  def start_threads(threads, unlock=None):
     121      import faulthandler
     122      threads = list(threads)
     123      started = []
     124      try:
     125          try:
     126              for t in threads:
     127                  t.start()
     128                  started.append(t)
     129          except:
     130              if support.verbose:
     131                  print("Can't start %d threads, only %d threads started" %
     132                        (len(threads), len(started)))
     133              raise
     134          yield
     135      finally:
     136          try:
     137              if unlock:
     138                  unlock()
     139              endtime = time.monotonic()
     140              for timeout in range(1, 16):
     141                  endtime += 60
     142                  for t in started:
     143                      t.join(max(endtime - time.monotonic(), 0.01))
     144                  started = [t for t in started if t.is_alive()]
     145                  if not started:
     146                      break
     147                  if support.verbose:
     148                      print('Unable to join %d threads during a period of '
     149                            '%d minutes' % (len(started), timeout))
     150          finally:
     151              started = [t for t in started if t.is_alive()]
     152              if started:
     153                  faulthandler.dump_traceback(sys.stdout)
     154                  raise AssertionError('Unable to join %d threads' % len(started))
     155  
     156  
     157  class ESC[4;38;5;81mcatch_threading_exception:
     158      """
     159      Context manager catching threading.Thread exception using
     160      threading.excepthook.
     161  
     162      Attributes set when an exception is caught:
     163  
     164      * exc_type
     165      * exc_value
     166      * exc_traceback
     167      * thread
     168  
     169      See threading.excepthook() documentation for these attributes.
     170  
     171      These attributes are deleted at the context manager exit.
     172  
     173      Usage:
     174  
     175          with threading_helper.catch_threading_exception() as cm:
     176              # code spawning a thread which raises an exception
     177              ...
     178  
     179              # check the thread exception, use cm attributes:
     180              # exc_type, exc_value, exc_traceback, thread
     181              ...
     182  
     183          # exc_type, exc_value, exc_traceback, thread attributes of cm no longer
     184          # exists at this point
     185          # (to avoid reference cycles)
     186      """
     187  
     188      def __init__(self):
     189          self.exc_type = None
     190          self.exc_value = None
     191          self.exc_traceback = None
     192          self.thread = None
     193          self._old_hook = None
     194  
     195      def _hook(self, args):
     196          self.exc_type = args.exc_type
     197          self.exc_value = args.exc_value
     198          self.exc_traceback = args.exc_traceback
     199          self.thread = args.thread
     200  
     201      def __enter__(self):
     202          self._old_hook = threading.excepthook
     203          threading.excepthook = self._hook
     204          return self
     205  
     206      def __exit__(self, *exc_info):
     207          threading.excepthook = self._old_hook
     208          del self.exc_type
     209          del self.exc_value
     210          del self.exc_traceback
     211          del self.thread
     212  
     213  
     214  def _can_start_thread() -> bool:
     215      """Detect whether Python can start new threads.
     216  
     217      Some WebAssembly platforms do not provide a working pthread
     218      implementation. Thread support is stubbed and any attempt
     219      to create a new thread fails.
     220  
     221      - wasm32-wasi does not have threading.
     222      - wasm32-emscripten can be compiled with or without pthread
     223        support (-s USE_PTHREADS / __EMSCRIPTEN_PTHREADS__).
     224      """
     225      if sys.platform == "emscripten":
     226          return sys._emscripten_info.pthreads
     227      elif sys.platform == "wasi":
     228          return False
     229      else:
     230          # assume all other platforms have working thread support.
     231          return True
     232  
     233  can_start_thread = _can_start_thread()
     234  
     235  def requires_working_threading(*, module=False):
     236      """Skip tests or modules that require working threading.
     237  
     238      Can be used as a function/class decorator or to skip an entire module.
     239      """
     240      msg = "requires threading support"
     241      if module:
     242          if not can_start_thread:
     243              raise unittest.SkipTest(msg)
     244      else:
     245          return unittest.skipUnless(can_start_thread, msg)