(root)/
Python-3.11.7/
Lib/
test/
libregrtest/
single.py
       1  import faulthandler
       2  import gc
       3  import importlib
       4  import io
       5  import sys
       6  import time
       7  import traceback
       8  import unittest
       9  
      10  from test import support
      11  from test.support import threading_helper
      12  
      13  from .filter import match_test
      14  from .result import State, TestResult, TestStats
      15  from .runtests import RunTests
      16  from .save_env import saved_test_environment
      17  from .setup import setup_tests
      18  from .testresult import get_test_runner
      19  from .utils import (
      20      TestName,
      21      clear_caches, remove_testfn, abs_module_name, print_warning)
      22  
      23  
# Minimum duration of a test, in seconds, before its duration is displayed
# or before it is mentioned as running in the background.
# NOTE(review): not referenced by the code visible in this module;
# presumably imported by other libregrtest modules -- confirm.
PROGRESS_MIN_TIME: float = 30.0   # seconds
      27  
      28  
      29  def run_unittest(test_mod):
      30      loader = unittest.TestLoader()
      31      tests = loader.loadTestsFromModule(test_mod)
      32      for error in loader.errors:
      33          print(error, file=sys.stderr)
      34      if loader.errors:
      35          raise Exception("errors while loading tests")
      36      _filter_suite(tests, match_test)
      37      return _run_suite(tests)
      38  
      39  def _filter_suite(suite, pred):
      40      """Recursively filter test cases in a suite based on a predicate."""
      41      newtests = []
      42      for test in suite._tests:
      43          if isinstance(test, unittest.TestSuite):
      44              _filter_suite(test, pred)
      45              newtests.append(test)
      46          else:
      47              if pred(test):
      48                  newtests.append(test)
      49      suite._tests = newtests
      50  
      51  def _run_suite(suite):
      52      """Run tests from a unittest.TestSuite-derived class."""
      53      runner = get_test_runner(sys.stdout,
      54                               verbosity=support.verbose,
      55                               capture_output=(support.junit_xml_list is not None))
      56  
      57      result = runner.run(suite)
      58  
      59      if support.junit_xml_list is not None:
      60          support.junit_xml_list.append(result.get_xml_element())
      61  
      62      if not result.testsRun and not result.skipped and not result.errors:
      63          raise support.TestDidNotRun
      64      if not result.wasSuccessful():
      65          stats = TestStats.from_unittest(result)
      66          if len(result.errors) == 1 and not result.failures:
      67              err = result.errors[0][1]
      68          elif len(result.failures) == 1 and not result.errors:
      69              err = result.failures[0][1]
      70          else:
      71              err = "multiple errors occurred"
      72              if not support.verbose: err += "; run in verbose mode for details"
      73          errors = [(str(tc), exc_str) for tc, exc_str in result.errors]
      74          failures = [(str(tc), exc_str) for tc, exc_str in result.failures]
      75          raise support.TestFailedWithDetails(err, errors, failures, stats=stats)
      76      return result
      77  
      78  
      79  def regrtest_runner(result: TestResult, test_func, runtests: RunTests) -> None:
      80      # Run test_func(), collect statistics, and detect reference and memory
      81      # leaks.
      82      if runtests.hunt_refleak:
      83          from .refleak import runtest_refleak
      84          refleak, test_result = runtest_refleak(result.test_name, test_func,
      85                                                 runtests.hunt_refleak,
      86                                                 runtests.quiet)
      87      else:
      88          test_result = test_func()
      89          refleak = False
      90  
      91      if refleak:
      92          result.state = State.REFLEAK
      93  
      94      stats: TestStats | None
      95  
      96      match test_result:
      97          case TestStats():
      98              stats = test_result
      99          case unittest.TestResult():
     100              stats = TestStats.from_unittest(test_result)
     101          case None:
     102              print_warning(f"{result.test_name} test runner returned None: {test_func}")
     103              stats = None
     104          case _:
     105              # Don't import doctest at top level since only few tests return
     106              # a doctest.TestResult instance.
     107              import doctest
     108              if isinstance(test_result, doctest.TestResults):
     109                  stats = TestStats.from_doctest(test_result)
     110              else:
     111                  print_warning(f"Unknown test result type: {type(test_result)}")
     112                  stats = None
     113  
     114      result.stats = stats
     115  
     116  
# Storage of uncollectable GC objects (gc.garbage).
# _load_run_test() moves the content of gc.garbage into this list after a
# test, so the same objects are not seen again on the next check.
GC_GARBAGE: list = []
     119  
     120  
     121  def _load_run_test(result: TestResult, runtests: RunTests) -> None:
     122      # Load the test module and run the tests.
     123      test_name = result.test_name
     124      module_name = abs_module_name(test_name, runtests.test_dir)
     125      test_mod = importlib.import_module(module_name)
     126  
     127      if hasattr(test_mod, "test_main"):
     128          # https://github.com/python/cpython/issues/89392
     129          raise Exception(f"Module {test_name} defines test_main() which "
     130                          f"is no longer supported by regrtest")
     131      def test_func():
     132          return run_unittest(test_mod)
     133  
     134      try:
     135          regrtest_runner(result, test_func, runtests)
     136      finally:
     137          # First kill any dangling references to open files etc.
     138          # This can also issue some ResourceWarnings which would otherwise get
     139          # triggered during the following test run, and possibly produce
     140          # failures.
     141          support.gc_collect()
     142  
     143          remove_testfn(test_name, runtests.verbose)
     144  
     145      if gc.garbage:
     146          support.environment_altered = True
     147          print_warning(f"{test_name} created {len(gc.garbage)} "
     148                        f"uncollectable object(s)")
     149  
     150          # move the uncollectable objects somewhere,
     151          # so we don't see them again
     152          GC_GARBAGE.extend(gc.garbage)
     153          gc.garbage.clear()
     154  
     155      support.reap_children()
     156  
     157  
     158  def _runtest_env_changed_exc(result: TestResult, runtests: RunTests,
     159                               display_failure: bool = True) -> None:
     160      # Handle exceptions, detect environment changes.
     161  
     162      # Reset the environment_altered flag to detect if a test altered
     163      # the environment
     164      support.environment_altered = False
     165  
     166      pgo = runtests.pgo
     167      if pgo:
     168          display_failure = False
     169      quiet = runtests.quiet
     170  
     171      test_name = result.test_name
     172      try:
     173          clear_caches()
     174          support.gc_collect()
     175  
     176          with saved_test_environment(test_name,
     177                                      runtests.verbose, quiet, pgo=pgo):
     178              _load_run_test(result, runtests)
     179      except support.ResourceDenied as exc:
     180          if not quiet and not pgo:
     181              print(f"{test_name} skipped -- {exc}", flush=True)
     182          result.state = State.RESOURCE_DENIED
     183          return
     184      except unittest.SkipTest as exc:
     185          if not quiet and not pgo:
     186              print(f"{test_name} skipped -- {exc}", flush=True)
     187          result.state = State.SKIPPED
     188          return
     189      except support.TestFailedWithDetails as exc:
     190          msg = f"test {test_name} failed"
     191          if display_failure:
     192              msg = f"{msg} -- {exc}"
     193          print(msg, file=sys.stderr, flush=True)
     194          result.state = State.FAILED
     195          result.errors = exc.errors
     196          result.failures = exc.failures
     197          result.stats = exc.stats
     198          return
     199      except support.TestFailed as exc:
     200          msg = f"test {test_name} failed"
     201          if display_failure:
     202              msg = f"{msg} -- {exc}"
     203          print(msg, file=sys.stderr, flush=True)
     204          result.state = State.FAILED
     205          result.stats = exc.stats
     206          return
     207      except support.TestDidNotRun:
     208          result.state = State.DID_NOT_RUN
     209          return
     210      except KeyboardInterrupt:
     211          print()
     212          result.state = State.INTERRUPTED
     213          return
     214      except:
     215          if not pgo:
     216              msg = traceback.format_exc()
     217              print(f"test {test_name} crashed -- {msg}",
     218                    file=sys.stderr, flush=True)
     219          result.state = State.UNCAUGHT_EXC
     220          return
     221  
     222      if support.environment_altered:
     223          result.set_env_changed()
     224      # Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
     225      if result.state is None:
     226          result.state = State.PASSED
     227  
     228  
     229  def _runtest(result: TestResult, runtests: RunTests) -> None:
     230      # Capture stdout and stderr, set faulthandler timeout,
     231      # and create JUnit XML report.
     232      verbose = runtests.verbose
     233      output_on_failure = runtests.output_on_failure
     234      timeout = runtests.timeout
     235  
     236      if timeout is not None and threading_helper.can_start_thread:
     237          use_timeout = True
     238          faulthandler.dump_traceback_later(timeout, exit=True)
     239      else:
     240          use_timeout = False
     241  
     242      try:
     243          setup_tests(runtests)
     244  
     245          if output_on_failure:
     246              support.verbose = True
     247  
     248              stream = io.StringIO()
     249              orig_stdout = sys.stdout
     250              orig_stderr = sys.stderr
     251              print_warning = support.print_warning
     252              orig_print_warnings_stderr = print_warning.orig_stderr
     253  
     254              output = None
     255              try:
     256                  sys.stdout = stream
     257                  sys.stderr = stream
     258                  # print_warning() writes into the temporary stream to preserve
     259                  # messages order. If support.environment_altered becomes true,
     260                  # warnings will be written to sys.stderr below.
     261                  print_warning.orig_stderr = stream
     262  
     263                  _runtest_env_changed_exc(result, runtests, display_failure=False)
     264                  # Ignore output if the test passed successfully
     265                  if result.state != State.PASSED:
     266                      output = stream.getvalue()
     267              finally:
     268                  sys.stdout = orig_stdout
     269                  sys.stderr = orig_stderr
     270                  print_warning.orig_stderr = orig_print_warnings_stderr
     271  
     272              if output is not None:
     273                  sys.stderr.write(output)
     274                  sys.stderr.flush()
     275          else:
     276              # Tell tests to be moderately quiet
     277              support.verbose = verbose
     278              _runtest_env_changed_exc(result, runtests,
     279                                       display_failure=not verbose)
     280  
     281          xml_list = support.junit_xml_list
     282          if xml_list:
     283              import xml.etree.ElementTree as ET
     284              result.xml_data = [ET.tostring(x).decode('us-ascii')
     285                                 for x in xml_list]
     286      finally:
     287          if use_timeout:
     288              faulthandler.cancel_dump_traceback_later()
     289          support.junit_xml_list = None
     290  
     291  
     292  def run_single_test(test_name: TestName, runtests: RunTests) -> TestResult:
     293      """Run a single test.
     294  
     295      test_name -- the name of the test
     296  
     297      Returns a TestResult.
     298  
     299      If runtests.use_junit, xml_data is a list containing each generated
     300      testsuite element.
     301      """
     302      start_time = time.perf_counter()
     303      result = TestResult(test_name)
     304      pgo = runtests.pgo
     305      try:
     306          _runtest(result, runtests)
     307      except:
     308          if not pgo:
     309              msg = traceback.format_exc()
     310              print(f"test {test_name} crashed -- {msg}",
     311                    file=sys.stderr, flush=True)
     312          result.state = State.UNCAUGHT_EXC
     313  
     314      sys.stdout.flush()
     315      sys.stderr.flush()
     316  
     317      result.duration = time.perf_counter() - start_time
     318      return result