python (3.12.0)

(root)/
lib/
python3.12/
test/
libregrtest/
main.py
       1  import faulthandler
       2  import locale
       3  import os
       4  import platform
       5  import random
       6  import re
       7  import sys
       8  import sysconfig
       9  import tempfile
      10  import time
      11  import unittest
      12  from test.libregrtest.cmdline import _parse_args
      13  from test.libregrtest.runtest import (
      14      findtests, split_test_packages, runtest, abs_module_name,
      15      PROGRESS_MIN_TIME, State, MatchTestsDict, RunTests)
      16  from test.libregrtest.setup import setup_tests
      17  from test.libregrtest.pgo import setup_pgo_tests
      18  from test.libregrtest.utils import (strip_py_suffix, count, format_duration,
      19                                      printlist, get_build_info)
      20  from test import support
      21  from test.support import TestStats
      22  from test.support import os_helper
      23  from test.support import threading_helper
      24  
      25  
# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
# Used to protect against threading._shutdown() hang.
# Must be smaller than buildbot "1200 seconds without output" limit.
EXIT_TIMEOUT = 120.0

# Numeric status codes, one per outcome named by the constant.
# NOTE(review): presumably used as process exit codes by code that is not
# visible in this part of the file -- confirm against the callers.
EXITCODE_BAD_TEST = 2
EXITCODE_ENV_CHANGED = 3
EXITCODE_NO_TESTS_RAN = 4
EXITCODE_RERUN_FAIL = 5
EXITCODE_INTERRUPTED = 130
      36  
      37  
      38  class ESC[4;38;5;81mRegrtest:
      39      """Execute a test suite.
      40  
      41      This also parses command-line options and modifies its behavior
      42      accordingly.
      43  
      44      tests -- a list of strings containing test names (optional)
      45      testdir -- the directory in which to look for tests (optional)
      46  
      47      Users other than the Python test suite will certainly want to
      48      specify testdir; if it's omitted, the directory containing the
      49      Python test suite is searched for.
      50  
      51      If the tests argument is omitted, the tests listed on the
      52      command-line will be used.  If that's empty, too, then all *.py
      53      files beginning with test_ will be used.
      54  
      55      The other default arguments (verbose, quiet, exclude,
      56      single, randomize, use_resources, trace, coverdir,
      57      print_slow, and random_seed) allow programmers calling main()
      58      directly to set the values that would normally be set by flags
      59      on the command line.
      60      """
      61      def __init__(self):
      62          # Namespace of command line options
      63          self.ns = None
      64  
      65          # tests
      66          self.tests = []
      67          self.selected = []
      68          self.all_runtests: list[RunTests] = []
      69  
      70          # test results
      71          self.good: list[str] = []
      72          self.bad: list[str] = []
      73          self.rerun_bad: list[str] = []
      74          self.skipped: list[str] = []
      75          self.resource_denied: list[str] = []
      76          self.environment_changed: list[str] = []
      77          self.run_no_tests: list[str] = []
      78          self.rerun: list[str] = []
      79  
      80          self.need_rerun: list[TestResult] = []
      81          self.first_state: str | None = None
      82          self.interrupted = False
      83          self.total_stats = TestStats()
      84  
      85          # used by --slow
      86          self.test_times = []
      87  
      88          # used by --coverage, trace.Trace instance
      89          self.tracer = None
      90  
      91          # used to display the progress bar "[ 3/100]"
      92          self.start_time = time.perf_counter()
      93          self.test_count_text = ''
      94          self.test_count_width = 1
      95  
      96          # used by --single
      97          self.next_single_test = None
      98          self.next_single_filename = None
      99  
     100          # used by --junit-xml
     101          self.testsuite_xml = None
     102  
     103          # misc
     104          self.win_load_tracker = None
     105          self.tmp_dir = None
     106  
     107      def get_executed(self):
     108          return (set(self.good) | set(self.bad) | set(self.skipped)
     109                  | set(self.resource_denied) | set(self.environment_changed)
     110                  | set(self.run_no_tests))
     111  
     112      def accumulate_result(self, result, rerun=False):
     113          fail_env_changed = self.ns.fail_env_changed
     114          test_name = result.test_name
     115  
     116          match result.state:
     117              case State.PASSED:
     118                  self.good.append(test_name)
     119              case State.ENV_CHANGED:
     120                  self.environment_changed.append(test_name)
     121              case State.SKIPPED:
     122                  self.skipped.append(test_name)
     123              case State.RESOURCE_DENIED:
     124                  self.resource_denied.append(test_name)
     125              case State.INTERRUPTED:
     126                  self.interrupted = True
     127              case State.DID_NOT_RUN:
     128                  self.run_no_tests.append(test_name)
     129              case _:
     130                  if result.is_failed(fail_env_changed):
     131                      self.bad.append(test_name)
     132                      self.need_rerun.append(result)
     133                  else:
     134                      raise ValueError(f"invalid test state: {result.state!r}")
     135  
     136          if result.has_meaningful_duration() and not rerun:
     137              self.test_times.append((result.duration, test_name))
     138          if result.stats is not None:
     139              self.total_stats.accumulate(result.stats)
     140          if rerun:
     141              self.rerun.append(test_name)
     142  
     143          xml_data = result.xml_data
     144          if xml_data:
     145              import xml.etree.ElementTree as ET
     146              for e in xml_data:
     147                  try:
     148                      self.testsuite_xml.append(ET.fromstring(e))
     149                  except ET.ParseError:
     150                      print(xml_data, file=sys.__stderr__)
     151                      raise
     152  
     153      def log(self, line=''):
     154          empty = not line
     155  
     156          # add the system load prefix: "load avg: 1.80 "
     157          load_avg = self.getloadavg()
     158          if load_avg is not None:
     159              line = f"load avg: {load_avg:.2f} {line}"
     160  
     161          # add the timestamp prefix:  "0:01:05 "
     162          test_time = time.perf_counter() - self.start_time
     163  
     164          mins, secs = divmod(int(test_time), 60)
     165          hours, mins = divmod(mins, 60)
     166          test_time = "%d:%02d:%02d" % (hours, mins, secs)
     167  
     168          line = f"{test_time} {line}"
     169          if empty:
     170              line = line[:-1]
     171  
     172          print(line, flush=True)
     173  
     174      def display_progress(self, test_index, text):
     175          quiet = self.ns.quiet
     176          pgo = self.ns.pgo
     177          if quiet:
     178              return
     179  
     180          # "[ 51/405/1] test_tcl passed"
     181          line = f"{test_index:{self.test_count_width}}{self.test_count_text}"
     182          fails = len(self.bad) + len(self.environment_changed)
     183          if fails and not pgo:
     184              line = f"{line}/{fails}"
     185          self.log(f"[{line}] {text}")
     186  
     187      def parse_args(self, kwargs):
     188          ns = _parse_args(sys.argv[1:], **kwargs)
     189  
     190          if ns.xmlpath:
     191              support.junit_xml_list = self.testsuite_xml = []
     192  
     193          strip_py_suffix(ns.args)
     194  
     195          if ns.huntrleaks:
     196              warmup, repetitions, _ = ns.huntrleaks
     197              if warmup < 1 or repetitions < 1:
     198                  msg = ("Invalid values for the --huntrleaks/-R parameters. The "
     199                         "number of warmups and repetitions must be at least 1 "
     200                         "each (1:1).")
     201                  print(msg, file=sys.stderr, flush=True)
     202                  sys.exit(2)
     203  
     204          if ns.tempdir:
     205              ns.tempdir = os.path.expanduser(ns.tempdir)
     206  
     207          self.ns = ns
     208  
     209      def find_tests(self, tests):
     210          ns = self.ns
     211          single = ns.single
     212          fromfile = ns.fromfile
     213          pgo = ns.pgo
     214          exclude = ns.exclude
     215          test_dir = ns.testdir
     216          starting_test = ns.start
     217          randomize = ns.randomize
     218  
     219          self.tests = tests
     220  
     221          if single:
     222              self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
     223              try:
     224                  with open(self.next_single_filename, 'r') as fp:
     225                      next_test = fp.read().strip()
     226                      self.tests = [next_test]
     227              except OSError:
     228                  pass
     229  
     230          if fromfile:
     231              self.tests = []
     232              # regex to match 'test_builtin' in line:
     233              # '0:00:00 [  4/400] test_builtin -- test_dict took 1 sec'
     234              regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
     235              with open(os.path.join(os_helper.SAVEDCWD, fromfile)) as fp:
     236                  for line in fp:
     237                      line = line.split('#', 1)[0]
     238                      line = line.strip()
     239                      match = regex.search(line)
     240                      if match is not None:
     241                          self.tests.append(match.group())
     242  
     243          strip_py_suffix(self.tests)
     244  
     245          if pgo:
     246              # add default PGO tests if no tests are specified
     247              setup_pgo_tests(ns)
     248  
     249          exclude_tests = set()
     250          if exclude:
     251              for arg in ns.args:
     252                  exclude_tests.add(arg)
     253              ns.args = []
     254  
     255          alltests = findtests(testdir=test_dir, exclude=exclude_tests)
     256  
     257          if not fromfile:
     258              self.selected = self.tests or ns.args
     259              if self.selected:
     260                  self.selected = split_test_packages(self.selected)
     261              else:
     262                  self.selected = alltests
     263          else:
     264              self.selected = self.tests
     265  
     266          if single:
     267              self.selected = self.selected[:1]
     268              try:
     269                  pos = alltests.index(self.selected[0])
     270                  self.next_single_test = alltests[pos + 1]
     271              except IndexError:
     272                  pass
     273  
     274          # Remove all the selected tests that precede start if it's set.
     275          if starting_test:
     276              try:
     277                  del self.selected[:self.selected.index(starting_test)]
     278              except ValueError:
     279                  print(f"Cannot find starting test: {starting_test}")
     280                  sys.exit(1)
     281  
     282          if randomize:
     283              if ns.random_seed is None:
     284                  ns.random_seed = random.randrange(10000000)
     285              random.seed(ns.random_seed)
     286              random.shuffle(self.selected)
     287  
     288      def list_tests(self):
     289          for name in self.selected:
     290              print(name)
     291  
     292      def _list_cases(self, suite):
     293          for test in suite:
     294              if isinstance(test, unittest.loader._FailedTest):
     295                  continue
     296              if isinstance(test, unittest.TestSuite):
     297                  self._list_cases(test)
     298              elif isinstance(test, unittest.TestCase):
     299                  if support.match_test(test):
     300                      print(test.id())
     301  
     302      def list_cases(self):
     303          ns = self.ns
     304          test_dir = ns.testdir
     305          support.verbose = False
     306          support.set_match_tests(ns.match_tests, ns.ignore_tests)
     307  
     308          skipped = []
     309          for test_name in self.selected:
     310              module_name = abs_module_name(test_name, test_dir)
     311              try:
     312                  suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
     313                  self._list_cases(suite)
     314              except unittest.SkipTest:
     315                  skipped.append(test_name)
     316  
     317          if skipped:
     318              sys.stdout.flush()
     319              stderr = sys.stderr
     320              print(file=stderr)
     321              print(count(len(skipped), "test"), "skipped:", file=stderr)
     322              printlist(skipped, file=stderr)
     323  
     324      def get_rerun_match(self, rerun_list) -> MatchTestsDict:
     325          rerun_match_tests = {}
     326          for result in rerun_list:
     327              match_tests = result.get_rerun_match_tests()
     328              # ignore empty match list
     329              if match_tests:
     330                  rerun_match_tests[result.test_name] = match_tests
     331          return rerun_match_tests
     332  
     333      def _rerun_failed_tests(self, need_rerun):
     334          # Configure the runner to re-run tests
     335          ns = self.ns
     336          ns.verbose = True
     337          ns.failfast = False
     338          ns.verbose3 = False
     339          ns.forever = False
     340          if ns.use_mp is None:
     341              ns.use_mp = 1
     342  
     343          # Get tests to re-run
     344          tests = [result.test_name for result in need_rerun]
     345          match_tests = self.get_rerun_match(need_rerun)
     346          self.set_tests(tests)
     347  
     348          # Clear previously failed tests
     349          self.rerun_bad.extend(self.bad)
     350          self.bad.clear()
     351          self.need_rerun.clear()
     352  
     353          # Re-run failed tests
     354          self.log(f"Re-running {len(tests)} failed tests in verbose mode in subprocesses")
     355          runtests = RunTests(tests, match_tests=match_tests, rerun=True)
     356          self.all_runtests.append(runtests)
     357          self._run_tests_mp(runtests)
     358  
     359      def rerun_failed_tests(self, need_rerun):
     360          if self.ns.python:
     361              # Temp patch for https://github.com/python/cpython/issues/94052
     362              self.log(
     363                  "Re-running failed tests is not supported with --python "
     364                  "host runner option."
     365              )
     366              return
     367  
     368          self.first_state = self.get_tests_state()
     369  
     370          print()
     371          self._rerun_failed_tests(need_rerun)
     372  
     373          if self.bad:
     374              print(count(len(self.bad), 'test'), "failed again:")
     375              printlist(self.bad)
     376  
     377          self.display_result()
     378  
     379      def display_result(self):
     380          pgo = self.ns.pgo
     381          quiet = self.ns.quiet
     382          print_slow = self.ns.print_slow
     383  
     384          # If running the test suite for PGO then no one cares about results.
     385          if pgo:
     386              return
     387  
     388          print()
     389          print("== Tests result: %s ==" % self.get_tests_state())
     390  
     391          if self.interrupted:
     392              print("Test suite interrupted by signal SIGINT.")
     393  
     394          omitted = set(self.selected) - self.get_executed()
     395          if omitted:
     396              print()
     397              print(count(len(omitted), "test"), "omitted:")
     398              printlist(omitted)
     399  
     400          if self.good and not quiet:
     401              print()
     402              if (not self.bad
     403                  and not self.skipped
     404                  and not self.interrupted
     405                  and len(self.good) > 1):
     406                  print("All", end=' ')
     407              print(count(len(self.good), "test"), "OK.")
     408  
     409          if print_slow:
     410              self.test_times.sort(reverse=True)
     411              print()
     412              print("10 slowest tests:")
     413              for test_time, test in self.test_times[:10]:
     414                  print("- %s: %s" % (test, format_duration(test_time)))
     415  
     416          if self.bad:
     417              print()
     418              print(count(len(self.bad), "test"), "failed:")
     419              printlist(self.bad)
     420  
     421          if self.environment_changed:
     422              print()
     423              print("{} altered the execution environment:".format(
     424                       count(len(self.environment_changed), "test")))
     425              printlist(self.environment_changed)
     426  
     427          if self.skipped and not quiet:
     428              print()
     429              print(count(len(self.skipped), "test"), "skipped:")
     430              printlist(self.skipped)
     431  
     432          if self.resource_denied and not quiet:
     433              print()
     434              print(count(len(self.resource_denied), "test"), "skipped (resource denied):")
     435              printlist(self.resource_denied)
     436  
     437          if self.rerun:
     438              print()
     439              print("%s:" % count(len(self.rerun), "re-run test"))
     440              printlist(self.rerun)
     441  
     442          if self.run_no_tests:
     443              print()
     444              print(count(len(self.run_no_tests), "test"), "run no tests:")
     445              printlist(self.run_no_tests)
     446  
     447      def run_test(self, test_index, test_name, previous_test, save_modules):
     448          text = test_name
     449          if previous_test:
     450              text = '%s -- %s' % (text, previous_test)
     451          self.display_progress(test_index, text)
     452  
     453          if self.tracer:
     454              # If we're tracing code coverage, then we don't exit with status
     455              # if on a false return value from main.
     456              cmd = ('result = runtest(self.ns, test_name); '
     457                     'self.accumulate_result(result)')
     458              ns = dict(locals())
     459              self.tracer.runctx(cmd, globals=globals(), locals=ns)
     460              result = ns['result']
     461          else:
     462              result = runtest(self.ns, test_name)
     463              self.accumulate_result(result)
     464  
     465          # Unload the newly imported modules (best effort finalization)
     466          for module in sys.modules.keys():
     467              if module not in save_modules and module.startswith("test."):
     468                  support.unload(module)
     469  
     470          return result
     471  
     472      def run_tests_sequentially(self, runtests):
     473          ns = self.ns
     474          coverage = ns.trace
     475          fail_fast = ns.failfast
     476          fail_env_changed = ns.fail_env_changed
     477          timeout = ns.timeout
     478  
     479          if coverage:
     480              import trace
     481              self.tracer = trace.Trace(trace=False, count=True)
     482  
     483          save_modules = sys.modules.keys()
     484  
     485          msg = "Run tests sequentially"
     486          if timeout:
     487              msg += " (timeout: %s)" % format_duration(timeout)
     488          self.log(msg)
     489  
     490          previous_test = None
     491          tests_iter = runtests.iter_tests()
     492          for test_index, test_name in enumerate(tests_iter, 1):
     493              start_time = time.perf_counter()
     494  
     495              result = self.run_test(test_index, test_name,
     496                                     previous_test, save_modules)
     497  
     498              if result.must_stop(fail_fast, fail_env_changed):
     499                  break
     500  
     501              previous_test = str(result)
     502              test_time = time.perf_counter() - start_time
     503              if test_time >= PROGRESS_MIN_TIME:
     504                  previous_test = "%s in %s" % (previous_test, format_duration(test_time))
     505              elif result.state == State.PASSED:
     506                  # be quiet: say nothing if the test passed shortly
     507                  previous_test = None
     508  
     509          if previous_test:
     510              print(previous_test)
     511  
     512      def display_header(self):
     513          # Print basic platform information
     514          print("==", platform.python_implementation(), *sys.version.split())
     515          print("==", platform.platform(aliased=True),
     516                        "%s-endian" % sys.byteorder)
     517          print("== Python build:", ' '.join(get_build_info()))
     518          print("== cwd:", os.getcwd())
     519          cpu_count = os.cpu_count()
     520          if cpu_count:
     521              print("== CPU count:", cpu_count)
     522          print("== encodings: locale=%s, FS=%s"
     523                % (locale.getencoding(), sys.getfilesystemencoding()))
     524          self.display_sanitizers()
     525  
     526      def display_sanitizers(self):
     527          # This makes it easier to remember what to set in your local
     528          # environment when trying to reproduce a sanitizer failure.
     529          asan = support.check_sanitizer(address=True)
     530          msan = support.check_sanitizer(memory=True)
     531          ubsan = support.check_sanitizer(ub=True)
     532          sanitizers = []
     533          if asan:
     534              sanitizers.append("address")
     535          if msan:
     536              sanitizers.append("memory")
     537          if ubsan:
     538              sanitizers.append("undefined behavior")
     539          if not sanitizers:
     540              return
     541  
     542          print(f"== sanitizers: {', '.join(sanitizers)}")
     543          for sanitizer, env_var in (
     544              (asan, "ASAN_OPTIONS"),
     545              (msan, "MSAN_OPTIONS"),
     546              (ubsan, "UBSAN_OPTIONS"),
     547          ):
     548              options= os.environ.get(env_var)
     549              if sanitizer and options is not None:
     550                  print(f"== {env_var}={options!r}")
     551  
     552      def no_tests_run(self):
     553          return not any((self.good, self.bad, self.skipped, self.interrupted,
     554                          self.environment_changed))
     555  
     556      def get_tests_state(self):
     557          fail_env_changed = self.ns.fail_env_changed
     558  
     559          result = []
     560          if self.bad:
     561              result.append("FAILURE")
     562          elif fail_env_changed and self.environment_changed:
     563              result.append("ENV CHANGED")
     564          elif self.no_tests_run():
     565              result.append("NO TESTS RAN")
     566  
     567          if self.interrupted:
     568              result.append("INTERRUPTED")
     569  
     570          if not result:
     571              result.append("SUCCESS")
     572  
     573          result = ', '.join(result)
     574          if self.first_state:
     575              result = '%s then %s' % (self.first_state, result)
     576          return result
     577  
     578      def _run_tests_mp(self, runtests: RunTests) -> None:
     579          from test.libregrtest.runtest_mp import run_tests_multiprocess
     580          # If we're on windows and this is the parent runner (not a worker),
     581          # track the load average.
     582          if sys.platform == 'win32':
     583              from test.libregrtest.win_utils import WindowsLoadTracker
     584  
     585              try:
     586                  self.win_load_tracker = WindowsLoadTracker()
     587              except PermissionError as error:
     588                  # Standard accounts may not have access to the performance
     589                  # counters.
     590                  print(f'Failed to create WindowsLoadTracker: {error}')
     591  
     592          try:
     593              run_tests_multiprocess(self, runtests)
     594          finally:
     595              if self.win_load_tracker is not None:
     596                  self.win_load_tracker.close()
     597                  self.win_load_tracker = None
     598  
     599      def set_tests(self, tests):
     600          self.tests = tests
     601          if self.ns.forever:
     602              self.test_count_text = ''
     603              self.test_count_width = 3
     604          else:
     605              self.test_count_text = '/{}'.format(len(self.tests))
     606              self.test_count_width = len(self.test_count_text) - 1
     607  
     608      def run_tests(self):
     609          # For a partial run, we do not need to clutter the output.
     610          if (self.ns.header
     611              or not(self.ns.pgo or self.ns.quiet or self.ns.single
     612                     or self.tests or self.ns.args)):
     613              self.display_header()
     614  
     615          if self.ns.huntrleaks:
     616              warmup, repetitions, _ = self.ns.huntrleaks
     617              if warmup < 3:
     618                  msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
     619                          "3 warmup repetitions can give false positives!")
     620                  print(msg, file=sys.stdout, flush=True)
     621  
     622          if self.ns.randomize:
     623              print("Using random seed", self.ns.random_seed)
     624  
     625          tests = self.selected
     626          self.set_tests(tests)
     627          runtests = RunTests(tests, forever=self.ns.forever)
     628          self.all_runtests.append(runtests)
     629          if self.ns.use_mp:
     630              self._run_tests_mp(runtests)
     631          else:
     632              self.run_tests_sequentially(runtests)
     633  
     634      def finalize(self):
     635          if self.next_single_filename:
     636              if self.next_single_test:
     637                  with open(self.next_single_filename, 'w') as fp:
     638                      fp.write(self.next_single_test + '\n')
     639              else:
     640                  os.unlink(self.next_single_filename)
     641  
     642          if self.tracer:
     643              r = self.tracer.results()
     644              r.write_results(show_missing=True, summary=True,
     645                              coverdir=self.ns.coverdir)
     646  
     647          if self.ns.runleaks:
     648              os.system("leaks %d" % os.getpid())
     649  
     650          self.save_xml_result()
     651  
     652      def display_summary(self):
     653          duration = time.perf_counter() - self.start_time
     654          first_runtests = self.all_runtests[0]
     655          # the second runtests (re-run failed tests) disables forever,
     656          # use the first runtests
     657          forever = first_runtests.forever
     658          filtered = bool(self.ns.match_tests) or bool(self.ns.ignore_tests)
     659  
     660          # Total duration
     661          print()
     662          print("Total duration: %s" % format_duration(duration))
     663  
     664          # Total tests
     665          total = self.total_stats
     666          text = f'run={total.tests_run:,}'
     667          if filtered:
     668              text = f"{text} (filtered)"
     669          stats = [text]
     670          if total.failures:
     671              stats.append(f'failures={total.failures:,}')
     672          if total.skipped:
     673              stats.append(f'skipped={total.skipped:,}')
     674          print(f"Total tests: {' '.join(stats)}")
     675  
     676          # Total test files
     677          all_tests = [self.good, self.bad, self.rerun,
     678                       self.skipped,
     679                       self.environment_changed, self.run_no_tests]
     680          run = sum(map(len, all_tests))
     681          text = f'run={run}'
     682          if not forever:
     683              ntest = len(first_runtests.tests)
     684              text = f"{text}/{ntest}"
     685          if filtered:
     686              text = f"{text} (filtered)"
     687          report = [text]
     688          for name, tests in (
     689              ('failed', self.bad),
     690              ('env_changed', self.environment_changed),
     691              ('skipped', self.skipped),
     692              ('resource_denied', self.resource_denied),
     693              ('rerun', self.rerun),
     694              ('run_no_tests', self.run_no_tests),
     695          ):
     696              if tests:
     697                  report.append(f'{name}={len(tests)}')
     698          print(f"Total test files: {' '.join(report)}")
     699  
     700          # Result
     701          result = self.get_tests_state()
     702          print(f"Result: {result}")
     703  
     704      def save_xml_result(self):
     705          if not self.ns.xmlpath and not self.testsuite_xml:
     706              return
     707  
     708          import xml.etree.ElementTree as ET
     709          root = ET.Element("testsuites")
     710  
     711          # Manually count the totals for the overall summary
     712          totals = {'tests': 0, 'errors': 0, 'failures': 0}
     713          for suite in self.testsuite_xml:
     714              root.append(suite)
     715              for k in totals:
     716                  try:
     717                      totals[k] += int(suite.get(k, 0))
     718                  except ValueError:
     719                      pass
     720  
     721          for k, v in totals.items():
     722              root.set(k, str(v))
     723  
     724          xmlpath = os.path.join(os_helper.SAVEDCWD, self.ns.xmlpath)
     725          with open(xmlpath, 'wb') as f:
     726              for s in ET.tostringlist(root):
     727                  f.write(s)
     728  
     729      def fix_umask(self):
     730          if support.is_emscripten:
     731              # Emscripten has default umask 0o777, which breaks some tests.
     732              # see https://github.com/emscripten-core/emscripten/issues/17269
     733              old_mask = os.umask(0)
     734              if old_mask == 0o777:
     735                  os.umask(0o027)
     736              else:
     737                  os.umask(old_mask)
     738  
     739      def set_temp_dir(self):
     740          if self.ns.tempdir:
     741              self.tmp_dir = self.ns.tempdir
     742  
     743          if not self.tmp_dir:
     744              # When tests are run from the Python build directory, it is best practice
     745              # to keep the test files in a subfolder.  This eases the cleanup of leftover
     746              # files using the "make distclean" command.
     747              if sysconfig.is_python_build():
     748                  self.tmp_dir = sysconfig.get_config_var('abs_builddir')
     749                  if self.tmp_dir is None:
     750                      # bpo-30284: On Windows, only srcdir is available. Using
     751                      # abs_builddir mostly matters on UNIX when building Python
     752                      # out of the source tree, especially when the source tree
     753                      # is read only.
     754                      self.tmp_dir = sysconfig.get_config_var('srcdir')
     755                  self.tmp_dir = os.path.join(self.tmp_dir, 'build')
     756              else:
     757                  self.tmp_dir = tempfile.gettempdir()
     758  
     759          self.tmp_dir = os.path.abspath(self.tmp_dir)
     760  
     761      def is_worker(self):
     762          return (self.ns.worker_args is not None)
     763  
     764      def create_temp_dir(self):
     765          os.makedirs(self.tmp_dir, exist_ok=True)
     766  
     767          # Define a writable temp dir that will be used as cwd while running
     768          # the tests. The name of the dir includes the pid to allow parallel
     769          # testing (see the -j option).
     770          # Emscripten and WASI have stubbed getpid(), Emscripten has only
     771          # milisecond clock resolution. Use randint() instead.
     772          if sys.platform in {"emscripten", "wasi"}:
     773              nounce = random.randint(0, 1_000_000)
     774          else:
     775              nounce = os.getpid()
     776  
     777          if self.is_worker():
     778              test_cwd = 'test_python_worker_{}'.format(nounce)
     779          else:
     780              test_cwd = 'test_python_{}'.format(nounce)
     781          test_cwd += os_helper.FS_NONASCII
     782          test_cwd = os.path.join(self.tmp_dir, test_cwd)
     783          return test_cwd
     784  
     785      def cleanup(self):
     786          import glob
     787  
     788          path = os.path.join(glob.escape(self.tmp_dir), 'test_python_*')
     789          print("Cleanup %s directory" % self.tmp_dir)
     790          for name in glob.glob(path):
     791              if os.path.isdir(name):
     792                  print("Remove directory: %s" % name)
     793                  os_helper.rmtree(name)
     794              else:
     795                  print("Remove file: %s" % name)
     796                  os_helper.unlink(name)
     797  
     798      def main(self, tests=None, **kwargs):
     799          self.parse_args(kwargs)
     800  
     801          self.set_temp_dir()
     802  
     803          self.fix_umask()
     804  
     805          if self.ns.cleanup:
     806              self.cleanup()
     807              sys.exit(0)
     808  
     809          test_cwd = self.create_temp_dir()
     810  
     811          try:
     812              # Run the tests in a context manager that temporarily changes the CWD
     813              # to a temporary and writable directory. If it's not possible to
     814              # create or change the CWD, the original CWD will be used.
     815              # The original CWD is available from os_helper.SAVEDCWD.
     816              with os_helper.temp_cwd(test_cwd, quiet=True):
     817                  # When using multiprocessing, worker processes will use test_cwd
     818                  # as their parent temporary directory. So when the main process
     819                  # exit, it removes also subdirectories of worker processes.
     820                  self.ns.tempdir = test_cwd
     821  
     822                  self._main(tests, kwargs)
     823          except SystemExit as exc:
     824              # bpo-38203: Python can hang at exit in Py_Finalize(), especially
     825              # on threading._shutdown() call: put a timeout
     826              if threading_helper.can_start_thread:
     827                  faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
     828  
     829              sys.exit(exc.code)
     830  
     831      def getloadavg(self):
     832          if self.win_load_tracker is not None:
     833              return self.win_load_tracker.getloadavg()
     834  
     835          if hasattr(os, 'getloadavg'):
     836              return os.getloadavg()[0]
     837  
     838          return None
     839  
     840      def get_exitcode(self):
     841          exitcode = 0
     842          if self.bad:
     843              exitcode = EXITCODE_BAD_TEST
     844          elif self.interrupted:
     845              exitcode = EXITCODE_INTERRUPTED
     846          elif self.ns.fail_env_changed and self.environment_changed:
     847              exitcode = EXITCODE_ENV_CHANGED
     848          elif self.no_tests_run():
     849              exitcode = EXITCODE_NO_TESTS_RAN
     850          elif self.rerun and self.ns.fail_rerun:
     851              exitcode = EXITCODE_RERUN_FAIL
     852          return exitcode
     853  
     854      def action_run_tests(self):
     855          self.run_tests()
     856          self.display_result()
     857  
     858          need_rerun = self.need_rerun
     859          if self.ns.rerun and need_rerun:
     860              self.rerun_failed_tests(need_rerun)
     861  
     862          self.display_summary()
     863          self.finalize()
     864  
     865      def _main(self, tests, kwargs):
     866          if self.is_worker():
     867              from test.libregrtest.runtest_mp import run_tests_worker
     868              run_tests_worker(self.ns.worker_args)
     869              return
     870  
     871          if self.ns.wait:
     872              input("Press any key to continue...")
     873  
     874          setup_tests(self.ns)
     875          self.find_tests(tests)
     876  
     877          exitcode = 0
     878          if self.ns.list_tests:
     879              self.list_tests()
     880          elif self.ns.list_cases:
     881              self.list_cases()
     882          else:
     883              self.action_run_tests()
     884              exitcode = self.get_exitcode()
     885  
     886          sys.exit(exitcode)
     887  
     888  
     889  def main(tests=None, **kwargs):
     890      """Run the Python suite."""
     891      Regrtest().main(tests=tests, **kwargs)