(root)/
Python-3.12.0/
Lib/
test/
support/
threading_helper.py
       1  import _thread
       2  import contextlib
       3  import functools
       4  import sys
       5  import threading
       6  import time
       7  import unittest
       8  
       9  from test import support
      10  
      11  
      12  #=======================================================================
      13  # Threading support to prevent reporting refleaks when running regrtest.py -R
      14  
      15  # NOTE: we use thread._count() rather than threading.enumerate() (or the
      16  # moral equivalent thereof) because a threading.Thread object is still alive
      17  # until its __bootstrap() method has returned, even after it has been
      18  # unregistered from the threading module.
      19  # thread._count(), on the other hand, only gets decremented *after* the
      20  # __bootstrap() method has returned, which gives us reliable reference counts
      21  # at the end of a test run.
      22  
      23  
      24  def threading_setup():
      25      return _thread._count(), threading._dangling.copy()
      26  
      27  
      28  def threading_cleanup(*original_values):
      29      _MAX_COUNT = 100
      30  
      31      for count in range(_MAX_COUNT):
      32          values = _thread._count(), threading._dangling
      33          if values == original_values:
      34              break
      35  
      36          if not count:
      37              # Display a warning at the first iteration
      38              support.environment_altered = True
      39              dangling_threads = values[1]
      40              support.print_warning(f"threading_cleanup() failed to cleanup "
      41                                    f"{values[0] - original_values[0]} threads "
      42                                    f"(count: {values[0]}, "
      43                                    f"dangling: {len(dangling_threads)})")
      44              for thread in dangling_threads:
      45                  support.print_warning(f"Dangling thread: {thread!r}")
      46  
      47              # Don't hold references to threads
      48              dangling_threads = None
      49          values = None
      50  
      51          time.sleep(0.01)
      52          support.gc_collect()
      53  
      54  
      55  def reap_threads(func):
      56      """Use this function when threads are being used.  This will
      57      ensure that the threads are cleaned up even when the test fails.
      58      """
      59      @functools.wraps(func)
      60      def decorator(*args):
      61          key = threading_setup()
      62          try:
      63              return func(*args)
      64          finally:
      65              threading_cleanup(*key)
      66      return decorator
      67  
      68  
      69  @contextlib.contextmanager
      70  def wait_threads_exit(timeout=None):
      71      """
      72      bpo-31234: Context manager to wait until all threads created in the with
      73      statement exit.
      74  
      75      Use _thread.count() to check if threads exited. Indirectly, wait until
      76      threads exit the internal t_bootstrap() C function of the _thread module.
      77  
      78      threading_setup() and threading_cleanup() are designed to emit a warning
      79      if a test leaves running threads in the background. This context manager
      80      is designed to cleanup threads started by the _thread.start_new_thread()
      81      which doesn't allow to wait for thread exit, whereas thread.Thread has a
      82      join() method.
      83      """
      84      if timeout is None:
      85          timeout = support.SHORT_TIMEOUT
      86      old_count = _thread._count()
      87      try:
      88          yield
      89      finally:
      90          start_time = time.monotonic()
      91          for _ in support.sleeping_retry(timeout, error=False):
      92              support.gc_collect()
      93              count = _thread._count()
      94              if count <= old_count:
      95                  break
      96          else:
      97              dt = time.monotonic() - start_time
      98              msg = (f"wait_threads() failed to cleanup {count - old_count} "
      99                     f"threads after {dt:.1f} seconds "
     100                     f"(count: {count}, old count: {old_count})")
     101              raise AssertionError(msg)
     102  
     103  
     104  def join_thread(thread, timeout=None):
     105      """Join a thread. Raise an AssertionError if the thread is still alive
     106      after timeout seconds.
     107      """
     108      if timeout is None:
     109          timeout = support.SHORT_TIMEOUT
     110      thread.join(timeout)
     111      if thread.is_alive():
     112          msg = f"failed to join the thread in {timeout:.1f} seconds"
     113          raise AssertionError(msg)
     114  
     115  
     116  @contextlib.contextmanager
     117  def start_threads(threads, unlock=None):
     118      try:
     119          import faulthandler
     120      except ImportError:
     121          # It isn't supported on subinterpreters yet.
     122          faulthandler = None
     123      threads = list(threads)
     124      started = []
     125      try:
     126          try:
     127              for t in threads:
     128                  t.start()
     129                  started.append(t)
     130          except:
     131              if support.verbose:
     132                  print("Can't start %d threads, only %d threads started" %
     133                        (len(threads), len(started)))
     134              raise
     135          yield
     136      finally:
     137          try:
     138              if unlock:
     139                  unlock()
     140              endtime = time.monotonic()
     141              for timeout in range(1, 16):
     142                  endtime += 60
     143                  for t in started:
     144                      t.join(max(endtime - time.monotonic(), 0.01))
     145                  started = [t for t in started if t.is_alive()]
     146                  if not started:
     147                      break
     148                  if support.verbose:
     149                      print('Unable to join %d threads during a period of '
     150                            '%d minutes' % (len(started), timeout))
     151          finally:
     152              started = [t for t in started if t.is_alive()]
     153              if started:
     154                  if faulthandler is not None:
     155                      faulthandler.dump_traceback(sys.stdout)
     156                  raise AssertionError('Unable to join %d threads' % len(started))
     157  
     158  
     159  class ESC[4;38;5;81mcatch_threading_exception:
     160      """
     161      Context manager catching threading.Thread exception using
     162      threading.excepthook.
     163  
     164      Attributes set when an exception is caught:
     165  
     166      * exc_type
     167      * exc_value
     168      * exc_traceback
     169      * thread
     170  
     171      See threading.excepthook() documentation for these attributes.
     172  
     173      These attributes are deleted at the context manager exit.
     174  
     175      Usage:
     176  
     177          with threading_helper.catch_threading_exception() as cm:
     178              # code spawning a thread which raises an exception
     179              ...
     180  
     181              # check the thread exception, use cm attributes:
     182              # exc_type, exc_value, exc_traceback, thread
     183              ...
     184  
     185          # exc_type, exc_value, exc_traceback, thread attributes of cm no longer
     186          # exists at this point
     187          # (to avoid reference cycles)
     188      """
     189  
     190      def __init__(self):
     191          self.exc_type = None
     192          self.exc_value = None
     193          self.exc_traceback = None
     194          self.thread = None
     195          self._old_hook = None
     196  
     197      def _hook(self, args):
     198          self.exc_type = args.exc_type
     199          self.exc_value = args.exc_value
     200          self.exc_traceback = args.exc_traceback
     201          self.thread = args.thread
     202  
     203      def __enter__(self):
     204          self._old_hook = threading.excepthook
     205          threading.excepthook = self._hook
     206          return self
     207  
     208      def __exit__(self, *exc_info):
     209          threading.excepthook = self._old_hook
     210          del self.exc_type
     211          del self.exc_value
     212          del self.exc_traceback
     213          del self.thread
     214  
     215  
     216  def _can_start_thread() -> bool:
     217      """Detect whether Python can start new threads.
     218  
     219      Some WebAssembly platforms do not provide a working pthread
     220      implementation. Thread support is stubbed and any attempt
     221      to create a new thread fails.
     222  
     223      - wasm32-wasi does not have threading.
     224      - wasm32-emscripten can be compiled with or without pthread
     225        support (-s USE_PTHREADS / __EMSCRIPTEN_PTHREADS__).
     226      """
     227      if sys.platform == "emscripten":
     228          return sys._emscripten_info.pthreads
     229      elif sys.platform == "wasi":
     230          return False
     231      else:
     232          # assume all other platforms have working thread support.
     233          return True
     234  
     235  can_start_thread = _can_start_thread()
     236  
     237  def requires_working_threading(*, module=False):
     238      """Skip tests or modules that require working threading.
     239  
     240      Can be used as a function/class decorator or to skip an entire module.
     241      """
     242      msg = "requires threading support"
     243      if module:
     244          if not can_start_thread:
     245              raise unittest.SkipTest(msg)
     246      else:
     247          return unittest.skipUnless(can_start_thread, msg)