Python-3.12.0/Lib/test/libregrtest/runtest.py
import dataclasses
import doctest
import faulthandler
import gc
import importlib
import io
import os
import sys
import time
import traceback
import unittest

from test import support
from test.support import TestStats
from test.support import os_helper
from test.support import threading_helper
from test.libregrtest.cmdline import Namespace
from test.libregrtest.save_env import saved_test_environment
from test.libregrtest.utils import clear_caches, format_duration, print_warning


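# A test filter is a list of test name patterns; the dict maps a test module
# name to the filter to use when re-running it.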
MatchTests = list[str]
MatchTestsDict = dict[str, MatchTests]


# Avoid enum.Enum to reduce the number of imports when tests are run
class State:
    PASSED = "PASSED"
    FAILED = "FAILED"
    SKIPPED = "SKIPPED"
    UNCAUGHT_EXC = "UNCAUGHT_EXC"
    REFLEAK = "REFLEAK"
    ENV_CHANGED = "ENV_CHANGED"
    RESOURCE_DENIED = "RESOURCE_DENIED"
    INTERRUPTED = "INTERRUPTED"
    MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR"
    DID_NOT_RUN = "DID_NOT_RUN"
    TIMEOUT = "TIMEOUT"

    @staticmethod
    def is_failed(state):
        return state in {
            State.FAILED,
            State.UNCAUGHT_EXC,
            State.REFLEAK,
            State.MULTIPROCESSING_ERROR,
            State.TIMEOUT}

    @staticmethod
    def has_meaningful_duration(state):
        # The duration is meaningless for these states. For example, if a
        # whole test file is skipped, its duration is not the time spent
        # executing its tests, but just the time spent in the code which
        # skips them.
        return state not in {
            State.SKIPPED,
            State.RESOURCE_DENIED,
            State.INTERRUPTED,
            State.MULTIPROCESSING_ERROR,
            State.DID_NOT_RUN}

    @staticmethod
    def must_stop(state):
        return state in {
            State.INTERRUPTED,
            State.MULTIPROCESSING_ERROR}


# gh-90681: When rerunning tests, we might need to rerun the whole
# class or module suite if some of its life-cycle hooks fail.
# Test-level hooks are not affected.
_TEST_LIFECYCLE_HOOKS = frozenset((
    'setUpClass', 'tearDownClass',
    'setUpModule', 'tearDownModule',
))

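# Examples: normalize_test_name("test_one (test.test_x.TestCase.test_one)")
# returns "test_one". With is_error=True, "setUpClass (test.test_x.TestCase)"
# returns "TestCase", while "setUpModule (test.test_x)" returns None.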
def normalize_test_name(test_full_name, *, is_error=False):
    short_name = test_full_name.split(" ")[0]
    if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
        if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
            # If setUpModule() or tearDownModule() failed, rerun the whole
            # test file: don't filter tests by name.
            return None

        # A failure in a life-cycle hook means that the whole module or
        # class suite must be rerun. The error looks like this:
        #    ERROR: setUpClass (test.test_reg_ex.RegTest)
        # or
        #    ERROR: setUpModule (test.test_reg_ex)
        # So, parse out the class / module name.
        lpar = test_full_name.index('(')
        rpar = test_full_name.index(')')
        return test_full_name[lpar + 1: rpar].split('.')[-1]
    return short_name


@dataclasses.dataclass(slots=True)
class TestResult:
    test_name: str
    state: str | None = None
    # Test duration in seconds
    duration: float | None = None
    xml_data: list[str] | None = None
    stats: TestStats | None = None

    # errors and failures copied from support.TestFailedWithDetails
    errors: list[tuple[str, str]] | None = None
    failures: list[tuple[str, str]] | None = None

    def is_failed(self, fail_env_changed: bool) -> bool:
        if self.state == State.ENV_CHANGED:
            return fail_env_changed
        return State.is_failed(self.state)

    def _format_failed(self):
        if self.errors and self.failures:
            le = len(self.errors)
            lf = len(self.failures)
            error_s = "error" + ("s" if le > 1 else "")
            failure_s = "failure" + ("s" if lf > 1 else "")
            return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"

        if self.errors:
            le = len(self.errors)
            error_s = "error" + ("s" if le > 1 else "")
            return f"{self.test_name} failed ({le} {error_s})"

        if self.failures:
            lf = len(self.failures)
            failure_s = "failure" + ("s" if lf > 1 else "")
            return f"{self.test_name} failed ({lf} {failure_s})"

        return f"{self.test_name} failed"

    def __str__(self) -> str:
        match self.state:
            case State.PASSED:
                return f"{self.test_name} passed"
            case State.FAILED:
                return self._format_failed()
            case State.SKIPPED:
                return f"{self.test_name} skipped"
            case State.UNCAUGHT_EXC:
                return f"{self.test_name} failed (uncaught exception)"
            case State.REFLEAK:
                return f"{self.test_name} failed (reference leak)"
            case State.ENV_CHANGED:
                return f"{self.test_name} failed (env changed)"
            case State.RESOURCE_DENIED:
                return f"{self.test_name} skipped (resource denied)"
            case State.INTERRUPTED:
                return f"{self.test_name} interrupted"
            case State.MULTIPROCESSING_ERROR:
                return f"{self.test_name} process crashed"
            case State.DID_NOT_RUN:
                return f"{self.test_name} ran no tests"
            case State.TIMEOUT:
                return f"{self.test_name} timed out ({format_duration(self.duration)})"
            case _:
                raise ValueError(f"unknown result state: {self.state!r}")

    def has_meaningful_duration(self):
        return State.has_meaningful_duration(self.state)

    def set_env_changed(self):
        if self.state is None or self.state == State.PASSED:
            self.state = State.ENV_CHANGED

    def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
        if State.must_stop(self.state):
            return True
        if fail_fast and self.is_failed(fail_env_changed):
            return True
        return False

    def get_rerun_match_tests(self):
        match_tests = []

        errors = self.errors or []
        failures = self.failures or []
        for error_list, is_error in (
            (errors, True),
            (failures, False),
        ):
            for full_name, *_ in error_list:
                match_name = normalize_test_name(full_name, is_error=is_error)
                if match_name is None:
                    # 'setUpModule (test.test_sys)': don't filter tests
                    return None
                if not match_name:
                    error_type = "ERROR" if is_error else "FAIL"
                    print_warning(f"rerun failed to parse {error_type} test name: "
                                  f"{full_name!r}: don't filter tests")
                    return None
                match_tests.append(match_name)

        return match_tests


@dataclasses.dataclass(slots=True, frozen=True)
class RunTests:
    tests: list[str]
    match_tests: MatchTestsDict | None = None
    rerun: bool = False
    forever: bool = False

    def get_match_tests(self, test_name) -> MatchTests | None:
        if self.match_tests is not None:
            return self.match_tests.get(test_name, None)
        else:
            return None

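    # With --forever, cycle through the test list endlessly; the caller is
    # expected to stop iterating at the first failure.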
    def iter_tests(self):
        tests = tuple(self.tests)
        if self.forever:
            while True:
                yield from tests
        else:
            yield from tests


# Minimum duration of a test to display its duration or to mention that
# the test is running in background
PROGRESS_MIN_TIME = 30.0   # seconds

# If these test directories are encountered, recurse into them and treat
# each test_*.py file or subdirectory as a separate test module. This can
# increase parallelism. Beware: this can't generally be done for any
# directory with sub-tests, as the __init__.py may do things which alter
# what tests are to be run.
SPLITTESTDIRS = {
    "test_asyncio",
    "test_concurrent_futures",
    "test_multiprocessing_fork",
    "test_multiprocessing_forkserver",
    "test_multiprocessing_spawn",
}


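# With the default of None, findtestdir() resolves to the directory
# containing the "test" package (Lib/test in a source checkout).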
def findtestdir(path=None):
    return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir


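# For example, with the default SPLITTESTDIRS, findtests() does not return
# "test_asyncio" as a single entry: each of its submodules is listed
# separately, such as "test.test_asyncio.test_futures".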
def findtests(*, testdir=None, exclude=(),
              split_test_dirs=SPLITTESTDIRS, base_mod=""):
    """Return a list of all applicable test modules."""
    testdir = findtestdir(testdir)
    tests = []
    for name in os.listdir(testdir):
        mod, ext = os.path.splitext(name)
        if (not mod.startswith("test_")) or (mod in exclude):
            continue
        if mod in split_test_dirs:
            subdir = os.path.join(testdir, mod)
            mod = f"{base_mod or 'test'}.{mod}"
            tests.extend(findtests(testdir=subdir, exclude=exclude,
                                   split_test_dirs=split_test_dirs, base_mod=mod))
        elif ext in (".py", ""):
            tests.append(f"{base_mod}.{mod}" if base_mod else mod)
    return sorted(tests)


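# Example: split_test_packages(["test_os", "test_asyncio"]) keeps "test_os"
# as-is but replaces "test_asyncio" with its individual submodules.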
def split_test_packages(tests, *, testdir=None, exclude=(),
                        split_test_dirs=SPLITTESTDIRS):
    testdir = findtestdir(testdir)
    splitted = []
    for name in tests:
        if name in split_test_dirs:
            subdir = os.path.join(testdir, name)
            splitted.extend(findtests(testdir=subdir, exclude=exclude,
                                      split_test_dirs=split_test_dirs,
                                      base_mod=name))
        else:
            splitted.append(name)
    return splitted


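# Examples: abs_module_name("test_os", None) returns "test.test_os", while
# abs_module_name("test.test_os", None) returns its argument unchanged.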
def abs_module_name(test_name: str, test_dir: str | None) -> str:
    if test_name.startswith('test.') or test_dir:
        return test_name
    else:
        # Import it from the test package
        return 'test.' + test_name


def setup_support(ns: Namespace):
    support.PGO = ns.pgo
    support.PGO_EXTENDED = ns.pgo_extended
    support.set_match_tests(ns.match_tests, ns.ignore_tests)
    support.failfast = ns.failfast
    support.verbose = ns.verbose
    if ns.xmlpath:
        support.junit_xml_list = []
    else:
        support.junit_xml_list = None


def _runtest(result: TestResult, ns: Namespace) -> None:
    # Capture stdout and stderr, set faulthandler timeout,
    # and create JUnit XML report.
    verbose = ns.verbose
    output_on_failure = ns.verbose3
    timeout = ns.timeout

    use_timeout = (
        timeout is not None and threading_helper.can_start_thread
    )
    if use_timeout:
        faulthandler.dump_traceback_later(timeout, exit=True)

    try:
        setup_support(ns)

        if output_on_failure:
            support.verbose = True

            stream = io.StringIO()
            orig_stdout = sys.stdout
            orig_stderr = sys.stderr
            print_warning = support.print_warning
            orig_print_warnings_stderr = print_warning.orig_stderr

            output = None
            try:
                sys.stdout = stream
                sys.stderr = stream
                # print_warning() writes into the temporary stream to preserve
                # message order. If support.environment_altered becomes true,
                # warnings will be written to sys.stderr below.
                print_warning.orig_stderr = stream

                _runtest_env_changed_exc(result, ns, display_failure=False)
                # Ignore the output if the test passed
                if result.state != State.PASSED:
                    output = stream.getvalue()
            finally:
                sys.stdout = orig_stdout
                sys.stderr = orig_stderr
                print_warning.orig_stderr = orig_print_warnings_stderr

            if output is not None:
                sys.stderr.write(output)
                sys.stderr.flush()
        else:
            # Tell tests to be moderately quiet
            support.verbose = verbose
            _runtest_env_changed_exc(result, ns, display_failure=not verbose)

        xml_list = support.junit_xml_list
        if xml_list:
            import xml.etree.ElementTree as ET
            result.xml_data = [ET.tostring(x).decode('us-ascii')
                               for x in xml_list]
    finally:
        if use_timeout:
            faulthandler.cancel_dump_traceback_later()
        support.junit_xml_list = None


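# Illustrative usage (attribute names assumed from regrtest's Namespace):
#     result = runtest(ns, "test_os")
#     if result.must_stop(ns.failfast, ns.fail_env_changed): ...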
def runtest(ns: Namespace, test_name: str) -> TestResult:
    """Run a single test.

    ns -- regrtest namespace of options
    test_name -- the name of the test

    Returns a TestResult.

    If ns.xmlpath is not None, xml_data is a list containing each
    generated testsuite element.
    """
    start_time = time.perf_counter()
    result = TestResult(test_name)
    try:
        _runtest(result, ns)
    except:
        if not ns.pgo:
            msg = traceback.format_exc()
            print(f"test {test_name} crashed -- {msg}",
                  file=sys.stderr, flush=True)
        result.state = State.UNCAUGHT_EXC
    result.duration = time.perf_counter() - start_time
    return result


def run_unittest(test_mod):
    loader = unittest.TestLoader()
    tests = loader.loadTestsFromModule(test_mod)
    for error in loader.errors:
        print(error, file=sys.stderr)
    if loader.errors:
        raise Exception("errors while loading tests")
    return support.run_unittest(tests)


def save_env(ns: Namespace, test_name: str):
    return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo)


def regrtest_runner(result, test_func, ns) -> None:
    # Run test_func(), collect statistics, and detect reference and memory
    # leaks.
    if ns.huntrleaks:
        from test.libregrtest.refleak import dash_R
        refleak, test_result = dash_R(ns, result.test_name, test_func)
    else:
        test_result = test_func()
        refleak = False

    if refleak:
        result.state = State.REFLEAK

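    # Normalize the runner's return value to a TestStats instance
    # (or None if the result type is unknown).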
    match test_result:
        case TestStats():
            stats = test_result
        case unittest.TestResult():
            stats = TestStats.from_unittest(test_result)
        case doctest.TestResults():
            stats = TestStats.from_doctest(test_result)
        case None:
            print_warning(f"{result.test_name} test runner returned None: {test_func}")
            stats = None
        case _:
            print_warning(f"Unknown test result type: {type(test_result)}")
            stats = None

    result.stats = stats


# Storage of uncollectable objects
FOUND_GARBAGE = []


def _load_run_test(result: TestResult, ns: Namespace) -> None:
    # Load the test module, then run the tests it defines.
    module_name = abs_module_name(result.test_name, ns.testdir)

    # Remove the module from sys.modules to reload it if it was already
    # imported
    sys.modules.pop(module_name, None)

    test_mod = importlib.import_module(module_name)

    if hasattr(test_mod, "test_main"):
        # https://github.com/python/cpython/issues/89392
        raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest")

    def test_func():
        return run_unittest(test_mod)

    try:
        with save_env(ns, result.test_name):
            regrtest_runner(result, test_func, ns)
    finally:
        # First kill any dangling references to open files etc.
        # This can also issue some ResourceWarnings which would otherwise get
        # triggered during the following test run, and possibly produce
        # failures.
        support.gc_collect()

        remove_testfn(result.test_name, ns.verbose)

    if gc.garbage:
        support.environment_altered = True
        print_warning(f"{result.test_name} created {len(gc.garbage)} "
                      f"uncollectable object(s)")

        # Move the uncollectable objects somewhere so we don't see
        # them again.
        FOUND_GARBAGE.extend(gc.garbage)
        gc.garbage.clear()

    support.reap_children()


def _runtest_env_changed_exc(result: TestResult, ns: Namespace,
                             display_failure: bool = True) -> None:
    # Detect environment changes and handle exceptions.

    # Reset the environment_altered flag to detect if a test altered
    # the environment
    support.environment_altered = False

    if ns.pgo:
        display_failure = False

    test_name = result.test_name
    try:
        clear_caches()
        support.gc_collect()

        with save_env(ns, test_name):
            _load_run_test(result, ns)
    except support.ResourceDenied as msg:
        if not ns.quiet and not ns.pgo:
            print(f"{test_name} skipped -- {msg}", flush=True)
        result.state = State.RESOURCE_DENIED
        return
    except unittest.SkipTest as msg:
        if not ns.quiet and not ns.pgo:
            print(f"{test_name} skipped -- {msg}", flush=True)
        result.state = State.SKIPPED
        return
    except support.TestFailedWithDetails as exc:
        msg = f"test {test_name} failed"
        if display_failure:
            msg = f"{msg} -- {exc}"
        print(msg, file=sys.stderr, flush=True)
        result.state = State.FAILED
        result.errors = exc.errors
        result.failures = exc.failures
        result.stats = exc.stats
        return
    except support.TestFailed as exc:
        msg = f"test {test_name} failed"
        if display_failure:
            msg = f"{msg} -- {exc}"
        print(msg, file=sys.stderr, flush=True)
        result.state = State.FAILED
        result.stats = exc.stats
        return
    except support.TestDidNotRun:
        result.state = State.DID_NOT_RUN
        return
    except KeyboardInterrupt:
        print()
        result.state = State.INTERRUPTED
        return
    except:
        if not ns.pgo:
            msg = traceback.format_exc()
            print(f"test {test_name} crashed -- {msg}",
                  file=sys.stderr, flush=True)
        result.state = State.UNCAUGHT_EXC
        return

    if support.environment_altered:
        result.set_env_changed()
    # Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
    if result.state is None:
        result.state = State.PASSED


def remove_testfn(test_name: str, verbose: int) -> None:
    # Try to clean up os_helper.TESTFN if left behind.
    #
    # While tests shouldn't leave any files or directories behind, arranging
    # that can be tedious when a test fails. The consequences can be
    # especially nasty on Windows, since if a test leaves a file open, it
    # cannot be deleted by name (while there's nothing we can do about that
    # here either, we can display the name of the offending test, which is
    # a real help).
    name = os_helper.TESTFN
    if not os.path.exists(name):
        return

    if os.path.isdir(name):
        import shutil
        kind, nuker = "directory", shutil.rmtree
    elif os.path.isfile(name):
        kind, nuker = "file", os.unlink
    else:
        raise RuntimeError(f"os.path says {name!r} exists but is neither "
                           f"directory nor file")

    if verbose:
        print_warning(f"{test_name} left behind {kind} {name!r}")
        support.environment_altered = True

    try:
        import stat
        # Fix possible permissions problems that might prevent cleanup
        os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
        nuker(name)
    except Exception as exc:
        print_warning(f"{test_name} left behind {kind} {name!r} "
                      f"and it couldn't be removed: {exc}")