python (3.12.0)
1 import faulthandler
2 import locale
3 import os
4 import platform
5 import random
6 import re
7 import sys
8 import sysconfig
9 import tempfile
10 import time
11 import unittest
12 from test.libregrtest.cmdline import _parse_args
13 from test.libregrtest.runtest import (
14 findtests, split_test_packages, runtest, abs_module_name,
15 PROGRESS_MIN_TIME, State, MatchTestsDict, RunTests)
16 from test.libregrtest.setup import setup_tests
17 from test.libregrtest.pgo import setup_pgo_tests
18 from test.libregrtest.utils import (strip_py_suffix, count, format_duration,
19 printlist, get_build_info)
20 from test import support
21 from test.support import TestStats
22 from test.support import os_helper
23 from test.support import threading_helper
24
25
26 # bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
27 # Used to protect against threading._shutdown() hang.
28 # Must be smaller than buildbot "1200 seconds without output" limit.
29 EXIT_TIMEOUT = 120.0
30
31 EXITCODE_BAD_TEST = 2
32 EXITCODE_ENV_CHANGED = 3
33 EXITCODE_NO_TESTS_RAN = 4
34 EXITCODE_RERUN_FAIL = 5
35 EXITCODE_INTERRUPTED = 130
36
37
class Regrtest:
    """Execute a test suite.

    This also parses command-line options and modifies its behavior
    accordingly.

    tests -- a list of strings containing test names (optional)
    testdir -- the directory in which to look for tests (optional)

    Users other than the Python test suite will certainly want to
    specify testdir; if it's omitted, the directory containing the
    Python test suite is searched for.

    If the tests argument is omitted, the tests listed on the
    command-line will be used.  If that's empty, too, then all *.py
    files beginning with test_ will be used.

    The other default arguments (verbose, quiet, exclude,
    single, randomize, use_resources, trace, coverdir,
    print_slow, and random_seed) allow programmers calling main()
    directly to set the values that would normally be set by flags
    on the command line.

    Note: options are parsed from sys.argv by parse_args() (called from
    main()); the resulting namespace is stored on self.ns.
    """
61 def __init__(self):
62 # Namespace of command line options
63 self.ns = None
64
65 # tests
66 self.tests = []
67 self.selected = []
68 self.all_runtests: list[RunTests] = []
69
70 # test results
71 self.good: list[str] = []
72 self.bad: list[str] = []
73 self.rerun_bad: list[str] = []
74 self.skipped: list[str] = []
75 self.resource_denied: list[str] = []
76 self.environment_changed: list[str] = []
77 self.run_no_tests: list[str] = []
78 self.rerun: list[str] = []
79
80 self.need_rerun: list[TestResult] = []
81 self.first_state: str | None = None
82 self.interrupted = False
83 self.total_stats = TestStats()
84
85 # used by --slow
86 self.test_times = []
87
88 # used by --coverage, trace.Trace instance
89 self.tracer = None
90
91 # used to display the progress bar "[ 3/100]"
92 self.start_time = time.perf_counter()
93 self.test_count_text = ''
94 self.test_count_width = 1
95
96 # used by --single
97 self.next_single_test = None
98 self.next_single_filename = None
99
100 # used by --junit-xml
101 self.testsuite_xml = None
102
103 # misc
104 self.win_load_tracker = None
105 self.tmp_dir = None
106
107 def get_executed(self):
108 return (set(self.good) | set(self.bad) | set(self.skipped)
109 | set(self.resource_denied) | set(self.environment_changed)
110 | set(self.run_no_tests))
111
    def accumulate_result(self, result, rerun=False):
        """Record one TestResult into the per-outcome buckets.

        result -- a test result object (state, duration, stats, xml_data).
        rerun -- True when the result comes from a re-run pass; re-runs are
            tracked in self.rerun and excluded from the --slow timings.
        """
        fail_env_changed = self.ns.fail_env_changed
        test_name = result.test_name

        match result.state:
            case State.PASSED:
                self.good.append(test_name)
            case State.ENV_CHANGED:
                self.environment_changed.append(test_name)
            case State.SKIPPED:
                self.skipped.append(test_name)
            case State.RESOURCE_DENIED:
                self.resource_denied.append(test_name)
            case State.INTERRUPTED:
                self.interrupted = True
            case State.DID_NOT_RUN:
                self.run_no_tests.append(test_name)
            case _:
                # Any other state is either a failure (queued for re-run)
                # or an invalid state which indicates a bug in the runner.
                if result.is_failed(fail_env_changed):
                    self.bad.append(test_name)
                    self.need_rerun.append(result)
                else:
                    raise ValueError(f"invalid test state: {result.state!r}")

        # Re-run durations are ignored so --slow reports first-run times only.
        if result.has_meaningful_duration() and not rerun:
            self.test_times.append((result.duration, test_name))
        if result.stats is not None:
            self.total_stats.accumulate(result.stats)
        if rerun:
            self.rerun.append(test_name)

        # Collect JUnit XML fragments when --junit-xml is in use.
        xml_data = result.xml_data
        if xml_data:
            import xml.etree.ElementTree as ET
            for e in xml_data:
                try:
                    self.testsuite_xml.append(ET.fromstring(e))
                except ET.ParseError:
                    # Dump the raw data for debugging before propagating.
                    print(xml_data, file=sys.__stderr__)
                    raise
152
153 def log(self, line=''):
154 empty = not line
155
156 # add the system load prefix: "load avg: 1.80 "
157 load_avg = self.getloadavg()
158 if load_avg is not None:
159 line = f"load avg: {load_avg:.2f} {line}"
160
161 # add the timestamp prefix: "0:01:05 "
162 test_time = time.perf_counter() - self.start_time
163
164 mins, secs = divmod(int(test_time), 60)
165 hours, mins = divmod(mins, 60)
166 test_time = "%d:%02d:%02d" % (hours, mins, secs)
167
168 line = f"{test_time} {line}"
169 if empty:
170 line = line[:-1]
171
172 print(line, flush=True)
173
174 def display_progress(self, test_index, text):
175 quiet = self.ns.quiet
176 pgo = self.ns.pgo
177 if quiet:
178 return
179
180 # "[ 51/405/1] test_tcl passed"
181 line = f"{test_index:{self.test_count_width}}{self.test_count_text}"
182 fails = len(self.bad) + len(self.environment_changed)
183 if fails and not pgo:
184 line = f"{line}/{fails}"
185 self.log(f"[{line}] {text}")
186
187 def parse_args(self, kwargs):
188 ns = _parse_args(sys.argv[1:], **kwargs)
189
190 if ns.xmlpath:
191 support.junit_xml_list = self.testsuite_xml = []
192
193 strip_py_suffix(ns.args)
194
195 if ns.huntrleaks:
196 warmup, repetitions, _ = ns.huntrleaks
197 if warmup < 1 or repetitions < 1:
198 msg = ("Invalid values for the --huntrleaks/-R parameters. The "
199 "number of warmups and repetitions must be at least 1 "
200 "each (1:1).")
201 print(msg, file=sys.stderr, flush=True)
202 sys.exit(2)
203
204 if ns.tempdir:
205 ns.tempdir = os.path.expanduser(ns.tempdir)
206
207 self.ns = ns
208
    def find_tests(self, tests):
        """Compute self.selected, the ordered list of tests to run.

        tests -- optional explicit list of test names; combined with the
            command-line selection stored in self.ns.

        Handles --single (run one test, remember the next one in a state
        file), --fromfile, PGO defaults, --exclude (via ns.args), --start
        and --randomize.
        """
        ns = self.ns
        single = ns.single
        fromfile = ns.fromfile
        pgo = ns.pgo
        exclude = ns.exclude
        test_dir = ns.testdir
        starting_test = ns.start
        randomize = ns.randomize

        self.tests = tests

        if single:
            # --single: resume from the test name saved by the previous run.
            self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
            try:
                with open(self.next_single_filename, 'r') as fp:
                    next_test = fp.read().strip()
                    self.tests = [next_test]
            except OSError:
                # No state file yet: fall back to the normal selection.
                pass

        if fromfile:
            self.tests = []
            # regex to match 'test_builtin' in line:
            # '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec'
            regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
            with open(os.path.join(os_helper.SAVEDCWD, fromfile)) as fp:
                for line in fp:
                    # '#' starts a comment; ignore the rest of the line.
                    line = line.split('#', 1)[0]
                    line = line.strip()
                    match = regex.search(line)
                    if match is not None:
                        self.tests.append(match.group())

        strip_py_suffix(self.tests)

        if pgo:
            # add default PGO tests if no tests are specified
            setup_pgo_tests(ns)

        # With --exclude, positional arguments name tests to skip, not run.
        exclude_tests = set()
        if exclude:
            for arg in ns.args:
                exclude_tests.add(arg)
            ns.args = []

        alltests = findtests(testdir=test_dir, exclude=exclude_tests)

        if not fromfile:
            self.selected = self.tests or ns.args
            if self.selected:
                self.selected = split_test_packages(self.selected)
            else:
                self.selected = alltests
        else:
            self.selected = self.tests

        if single:
            self.selected = self.selected[:1]
            # Remember which test comes next, for the following --single run.
            # NOTE(review): alltests.index() raises ValueError (not
            # IndexError) when the test is missing from alltests; that case
            # would propagate here — confirm it cannot happen.
            try:
                pos = alltests.index(self.selected[0])
                self.next_single_test = alltests[pos + 1]
            except IndexError:
                pass

        # Remove all the selected tests that precede start if it's set.
        if starting_test:
            try:
                del self.selected[:self.selected.index(starting_test)]
            except ValueError:
                print(f"Cannot find starting test: {starting_test}")
                sys.exit(1)

        if randomize:
            # Seed is generated once so it can be reported and reproduced.
            if ns.random_seed is None:
                ns.random_seed = random.randrange(10000000)
            random.seed(ns.random_seed)
            random.shuffle(self.selected)
287
288 def list_tests(self):
289 for name in self.selected:
290 print(name)
291
292 def _list_cases(self, suite):
293 for test in suite:
294 if isinstance(test, unittest.loader._FailedTest):
295 continue
296 if isinstance(test, unittest.TestSuite):
297 self._list_cases(test)
298 elif isinstance(test, unittest.TestCase):
299 if support.match_test(test):
300 print(test.id())
301
    def list_cases(self):
        """Print the id of every selected test case matching the filters.

        Test modules that raise unittest.SkipTest at import/load time are
        collected and reported on stderr instead.
        """
        ns = self.ns
        test_dir = ns.testdir
        # Silence per-test output; apply --match/--ignore filters.
        support.verbose = False
        support.set_match_tests(ns.match_tests, ns.ignore_tests)

        skipped = []
        for test_name in self.selected:
            module_name = abs_module_name(test_name, test_dir)
            try:
                suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
                self._list_cases(suite)
            except unittest.SkipTest:
                skipped.append(test_name)

        if skipped:
            # Flush stdout first so the stderr report is not interleaved.
            sys.stdout.flush()
            stderr = sys.stderr
            print(file=stderr)
            print(count(len(skipped), "test"), "skipped:", file=stderr)
            printlist(skipped, file=stderr)
323
324 def get_rerun_match(self, rerun_list) -> MatchTestsDict:
325 rerun_match_tests = {}
326 for result in rerun_list:
327 match_tests = result.get_rerun_match_tests()
328 # ignore empty match list
329 if match_tests:
330 rerun_match_tests[result.test_name] = match_tests
331 return rerun_match_tests
332
    def _rerun_failed_tests(self, need_rerun):
        """Re-run the failed tests of *need_rerun* in verbose subprocesses.

        Mutates self.ns to force verbose single-pass execution, moves the
        current failures into rerun_bad, then runs the re-run pass via
        multiprocessing.
        """
        # Configure the runner to re-run tests
        ns = self.ns
        ns.verbose = True
        ns.failfast = False
        ns.verbose3 = False
        ns.forever = False
        # Re-runs always go through the multiprocessing path.
        if ns.use_mp is None:
            ns.use_mp = 1

        # Get tests to re-run
        tests = [result.test_name for result in need_rerun]
        match_tests = self.get_rerun_match(need_rerun)
        self.set_tests(tests)

        # Clear previously failed tests
        self.rerun_bad.extend(self.bad)
        self.bad.clear()
        self.need_rerun.clear()

        # Re-run failed tests
        self.log(f"Re-running {len(tests)} failed tests in verbose mode in subprocesses")
        runtests = RunTests(tests, match_tests=match_tests, rerun=True)
        self.all_runtests.append(runtests)
        self._run_tests_mp(runtests)
358
    def rerun_failed_tests(self, need_rerun):
        """Re-run failed tests, then display the combined result.

        Remembers the pre-rerun state in self.first_state so the final
        summary can show "FAILURE then SUCCESS"-style transitions.
        """
        if self.ns.python:
            # Temp patch for https://github.com/python/cpython/issues/94052
            self.log(
                "Re-running failed tests is not supported with --python "
                "host runner option."
            )
            return

        self.first_state = self.get_tests_state()

        print()
        self._rerun_failed_tests(need_rerun)

        if self.bad:
            print(count(len(self.bad), 'test'), "failed again:")
            printlist(self.bad)

        self.display_result()
378
    def display_result(self):
        """Print the aggregated results of the run to stdout.

        Skipped entirely for PGO runs. Prints the overall state followed by
        one section per non-empty outcome bucket.
        """
        pgo = self.ns.pgo
        quiet = self.ns.quiet
        print_slow = self.ns.print_slow

        # If running the test suite for PGO then no one cares about results.
        if pgo:
            return

        print()
        print("== Tests result: %s ==" % self.get_tests_state())

        if self.interrupted:
            print("Test suite interrupted by signal SIGINT.")

        # Tests that were selected but produced no recorded outcome.
        omitted = set(self.selected) - self.get_executed()
        if omitted:
            print()
            print(count(len(omitted), "test"), "omitted:")
            printlist(omitted)

        if self.good and not quiet:
            print()
            # "All ... OK." only for a fully clean multi-test run.
            if (not self.bad
                and not self.skipped
                and not self.interrupted
                and len(self.good) > 1):
                print("All", end=' ')
            print(count(len(self.good), "test"), "OK.")

        if print_slow:
            self.test_times.sort(reverse=True)
            print()
            print("10 slowest tests:")
            for test_time, test in self.test_times[:10]:
                print("- %s: %s" % (test, format_duration(test_time)))

        if self.bad:
            print()
            print(count(len(self.bad), "test"), "failed:")
            printlist(self.bad)

        if self.environment_changed:
            print()
            print("{} altered the execution environment:".format(
                     count(len(self.environment_changed), "test")))
            printlist(self.environment_changed)

        if self.skipped and not quiet:
            print()
            print(count(len(self.skipped), "test"), "skipped:")
            printlist(self.skipped)

        if self.resource_denied and not quiet:
            print()
            print(count(len(self.resource_denied), "test"), "skipped (resource denied):")
            printlist(self.resource_denied)

        if self.rerun:
            print()
            print("%s:" % count(len(self.rerun), "re-run test"))
            printlist(self.rerun)

        if self.run_no_tests:
            print()
            print(count(len(self.run_no_tests), "test"), "run no tests:")
            printlist(self.run_no_tests)
446
447 def run_test(self, test_index, test_name, previous_test, save_modules):
448 text = test_name
449 if previous_test:
450 text = '%s -- %s' % (text, previous_test)
451 self.display_progress(test_index, text)
452
453 if self.tracer:
454 # If we're tracing code coverage, then we don't exit with status
455 # if on a false return value from main.
456 cmd = ('result = runtest(self.ns, test_name); '
457 'self.accumulate_result(result)')
458 ns = dict(locals())
459 self.tracer.runctx(cmd, globals=globals(), locals=ns)
460 result = ns['result']
461 else:
462 result = runtest(self.ns, test_name)
463 self.accumulate_result(result)
464
465 # Unload the newly imported modules (best effort finalization)
466 for module in sys.modules.keys():
467 if module not in save_modules and module.startswith("test."):
468 support.unload(module)
469
470 return result
471
472 def run_tests_sequentially(self, runtests):
473 ns = self.ns
474 coverage = ns.trace
475 fail_fast = ns.failfast
476 fail_env_changed = ns.fail_env_changed
477 timeout = ns.timeout
478
479 if coverage:
480 import trace
481 self.tracer = trace.Trace(trace=False, count=True)
482
483 save_modules = sys.modules.keys()
484
485 msg = "Run tests sequentially"
486 if timeout:
487 msg += " (timeout: %s)" % format_duration(timeout)
488 self.log(msg)
489
490 previous_test = None
491 tests_iter = runtests.iter_tests()
492 for test_index, test_name in enumerate(tests_iter, 1):
493 start_time = time.perf_counter()
494
495 result = self.run_test(test_index, test_name,
496 previous_test, save_modules)
497
498 if result.must_stop(fail_fast, fail_env_changed):
499 break
500
501 previous_test = str(result)
502 test_time = time.perf_counter() - start_time
503 if test_time >= PROGRESS_MIN_TIME:
504 previous_test = "%s in %s" % (previous_test, format_duration(test_time))
505 elif result.state == State.PASSED:
506 # be quiet: say nothing if the test passed shortly
507 previous_test = None
508
509 if previous_test:
510 print(previous_test)
511
    def display_header(self):
        """Print basic platform and build information at the start of a run."""
        # Print basic platform information
        print("==", platform.python_implementation(), *sys.version.split())
        print("==", platform.platform(aliased=True),
                      "%s-endian" % sys.byteorder)
        print("== Python build:", ' '.join(get_build_info()))
        print("== cwd:", os.getcwd())
        cpu_count = os.cpu_count()
        if cpu_count:
            print("== CPU count:", cpu_count)
        print("== encodings: locale=%s, FS=%s"
              % (locale.getencoding(), sys.getfilesystemencoding()))
        self.display_sanitizers()
525
526 def display_sanitizers(self):
527 # This makes it easier to remember what to set in your local
528 # environment when trying to reproduce a sanitizer failure.
529 asan = support.check_sanitizer(address=True)
530 msan = support.check_sanitizer(memory=True)
531 ubsan = support.check_sanitizer(ub=True)
532 sanitizers = []
533 if asan:
534 sanitizers.append("address")
535 if msan:
536 sanitizers.append("memory")
537 if ubsan:
538 sanitizers.append("undefined behavior")
539 if not sanitizers:
540 return
541
542 print(f"== sanitizers: {', '.join(sanitizers)}")
543 for sanitizer, env_var in (
544 (asan, "ASAN_OPTIONS"),
545 (msan, "MSAN_OPTIONS"),
546 (ubsan, "UBSAN_OPTIONS"),
547 ):
548 options= os.environ.get(env_var)
549 if sanitizer and options is not None:
550 print(f"== {env_var}={options!r}")
551
552 def no_tests_run(self):
553 return not any((self.good, self.bad, self.skipped, self.interrupted,
554 self.environment_changed))
555
556 def get_tests_state(self):
557 fail_env_changed = self.ns.fail_env_changed
558
559 result = []
560 if self.bad:
561 result.append("FAILURE")
562 elif fail_env_changed and self.environment_changed:
563 result.append("ENV CHANGED")
564 elif self.no_tests_run():
565 result.append("NO TESTS RAN")
566
567 if self.interrupted:
568 result.append("INTERRUPTED")
569
570 if not result:
571 result.append("SUCCESS")
572
573 result = ', '.join(result)
574 if self.first_state:
575 result = '%s then %s' % (self.first_state, result)
576 return result
577
    def _run_tests_mp(self, runtests: RunTests) -> None:
        """Run *runtests* using worker subprocesses (-j option).

        On Windows the parent runner also tracks the system load average
        via WindowsLoadTracker; it is always closed afterwards.
        """
        from test.libregrtest.runtest_mp import run_tests_multiprocess
        # If we're on windows and this is the parent runner (not a worker),
        # track the load average.
        if sys.platform == 'win32':
            from test.libregrtest.win_utils import WindowsLoadTracker

            try:
                self.win_load_tracker = WindowsLoadTracker()
            except PermissionError as error:
                # Standard accounts may not have access to the performance
                # counters.
                print(f'Failed to create WindowsLoadTracker: {error}')

        try:
            run_tests_multiprocess(self, runtests)
        finally:
            # Release the tracker even if the run raised.
            if self.win_load_tracker is not None:
                self.win_load_tracker.close()
                self.win_load_tracker = None
598
599 def set_tests(self, tests):
600 self.tests = tests
601 if self.ns.forever:
602 self.test_count_text = ''
603 self.test_count_width = 3
604 else:
605 self.test_count_text = '/{}'.format(len(self.tests))
606 self.test_count_width = len(self.test_count_text) - 1
607
    def run_tests(self):
        """Run self.selected, either sequentially or with worker processes."""
        # For a partial run, we do not need to clutter the output.
        if (self.ns.header
            or not(self.ns.pgo or self.ns.quiet or self.ns.single
                   or self.tests or self.ns.args)):
            self.display_header()

        if self.ns.huntrleaks:
            warmup, repetitions, _ = self.ns.huntrleaks
            if warmup < 3:
                msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
                        "3 warmup repetitions can give false positives!")
                print(msg, file=sys.stdout, flush=True)

        if self.ns.randomize:
            # Report the seed so a shuffled run can be reproduced.
            print("Using random seed", self.ns.random_seed)

        tests = self.selected
        self.set_tests(tests)
        runtests = RunTests(tests, forever=self.ns.forever)
        self.all_runtests.append(runtests)
        if self.ns.use_mp:
            self._run_tests_mp(runtests)
        else:
            self.run_tests_sequentially(runtests)
633
    def finalize(self):
        """Persist end-of-run state: --single bookmark, coverage report,
        optional "leaks" check and the JUnit XML file."""
        if self.next_single_filename:
            if self.next_single_test:
                # Remember which test the next --single run should execute.
                with open(self.next_single_filename, 'w') as fp:
                    fp.write(self.next_single_test + '\n')
            else:
                # All tests done: drop the state file.
                os.unlink(self.next_single_filename)

        if self.tracer:
            r = self.tracer.results()
            r.write_results(show_missing=True, summary=True,
                            coverdir=self.ns.coverdir)

        if self.ns.runleaks:
            # --runleaks: run the macOS "leaks" tool against this process.
            os.system("leaks %d" % os.getpid())

        self.save_xml_result()
651
    def display_summary(self):
        """Print the final summary: duration, test totals, per-file totals
        and the overall result string."""
        duration = time.perf_counter() - self.start_time
        first_runtests = self.all_runtests[0]
        # the second runtests (re-run failed tests) disables forever,
        # use the first runtests
        forever = first_runtests.forever
        filtered = bool(self.ns.match_tests) or bool(self.ns.ignore_tests)

        # Total duration
        print()
        print("Total duration: %s" % format_duration(duration))

        # Total tests (individual test cases, from accumulated stats)
        total = self.total_stats
        text = f'run={total.tests_run:,}'
        if filtered:
            text = f"{text} (filtered)"
        stats = [text]
        if total.failures:
            stats.append(f'failures={total.failures:,}')
        if total.skipped:
            stats.append(f'skipped={total.skipped:,}')
        print(f"Total tests: {' '.join(stats)}")

        # Total test files (per-module outcomes)
        all_tests = [self.good, self.bad, self.rerun,
                     self.skipped,
                     self.environment_changed, self.run_no_tests]
        run = sum(map(len, all_tests))
        text = f'run={run}'
        if not forever:
            # "run/total" only makes sense when the run is finite.
            ntest = len(first_runtests.tests)
            text = f"{text}/{ntest}"
        if filtered:
            text = f"{text} (filtered)"
        report = [text]
        for name, tests in (
            ('failed', self.bad),
            ('env_changed', self.environment_changed),
            ('skipped', self.skipped),
            ('resource_denied', self.resource_denied),
            ('rerun', self.rerun),
            ('run_no_tests', self.run_no_tests),
        ):
            if tests:
                report.append(f'{name}={len(tests)}')
        print(f"Total test files: {' '.join(report)}")

        # Result
        result = self.get_tests_state()
        print(f"Result: {result}")
703
    def save_xml_result(self):
        """Write collected JUnit XML fragments to the --junit-xml file."""
        if not self.ns.xmlpath and not self.testsuite_xml:
            return

        import xml.etree.ElementTree as ET
        root = ET.Element("testsuites")

        # Manually count the totals for the overall summary
        totals = {'tests': 0, 'errors': 0, 'failures': 0}
        for suite in self.testsuite_xml:
            root.append(suite)
            for k in totals:
                try:
                    totals[k] += int(suite.get(k, 0))
                except ValueError:
                    # Ignore non-numeric attribute values.
                    pass

        for k, v in totals.items():
            root.set(k, str(v))

        # Resolve relative --junit-xml paths against the original CWD.
        xmlpath = os.path.join(os_helper.SAVEDCWD, self.ns.xmlpath)
        with open(xmlpath, 'wb') as f:
            for s in ET.tostringlist(root):
                f.write(s)
728
729 def fix_umask(self):
730 if support.is_emscripten:
731 # Emscripten has default umask 0o777, which breaks some tests.
732 # see https://github.com/emscripten-core/emscripten/issues/17269
733 old_mask = os.umask(0)
734 if old_mask == 0o777:
735 os.umask(0o027)
736 else:
737 os.umask(old_mask)
738
739 def set_temp_dir(self):
740 if self.ns.tempdir:
741 self.tmp_dir = self.ns.tempdir
742
743 if not self.tmp_dir:
744 # When tests are run from the Python build directory, it is best practice
745 # to keep the test files in a subfolder. This eases the cleanup of leftover
746 # files using the "make distclean" command.
747 if sysconfig.is_python_build():
748 self.tmp_dir = sysconfig.get_config_var('abs_builddir')
749 if self.tmp_dir is None:
750 # bpo-30284: On Windows, only srcdir is available. Using
751 # abs_builddir mostly matters on UNIX when building Python
752 # out of the source tree, especially when the source tree
753 # is read only.
754 self.tmp_dir = sysconfig.get_config_var('srcdir')
755 self.tmp_dir = os.path.join(self.tmp_dir, 'build')
756 else:
757 self.tmp_dir = tempfile.gettempdir()
758
759 self.tmp_dir = os.path.abspath(self.tmp_dir)
760
761 def is_worker(self):
762 return (self.ns.worker_args is not None)
763
764 def create_temp_dir(self):
765 os.makedirs(self.tmp_dir, exist_ok=True)
766
767 # Define a writable temp dir that will be used as cwd while running
768 # the tests. The name of the dir includes the pid to allow parallel
769 # testing (see the -j option).
770 # Emscripten and WASI have stubbed getpid(), Emscripten has only
771 # milisecond clock resolution. Use randint() instead.
772 if sys.platform in {"emscripten", "wasi"}:
773 nounce = random.randint(0, 1_000_000)
774 else:
775 nounce = os.getpid()
776
777 if self.is_worker():
778 test_cwd = 'test_python_worker_{}'.format(nounce)
779 else:
780 test_cwd = 'test_python_{}'.format(nounce)
781 test_cwd += os_helper.FS_NONASCII
782 test_cwd = os.path.join(self.tmp_dir, test_cwd)
783 return test_cwd
784
785 def cleanup(self):
786 import glob
787
788 path = os.path.join(glob.escape(self.tmp_dir), 'test_python_*')
789 print("Cleanup %s directory" % self.tmp_dir)
790 for name in glob.glob(path):
791 if os.path.isdir(name):
792 print("Remove directory: %s" % name)
793 os_helper.rmtree(name)
794 else:
795 print("Remove file: %s" % name)
796 os_helper.unlink(name)
797
    def main(self, tests=None, **kwargs):
        """Top-level entry point: parse options, set up the temporary
        directory, then run _main() inside a temporary CWD.

        tests -- optional explicit list of test names.
        kwargs -- option overrides forwarded to the argument parser.
        """
        self.parse_args(kwargs)

        self.set_temp_dir()

        self.fix_umask()

        if self.ns.cleanup:
            self.cleanup()
            sys.exit(0)

        test_cwd = self.create_temp_dir()

        try:
            # Run the tests in a context manager that temporarily changes the CWD
            # to a temporary and writable directory. If it's not possible to
            # create or change the CWD, the original CWD will be used.
            # The original CWD is available from os_helper.SAVEDCWD.
            with os_helper.temp_cwd(test_cwd, quiet=True):
                # When using multiprocessing, worker processes will use test_cwd
                # as their parent temporary directory. So when the main process
                # exit, it removes also subdirectories of worker processes.
                self.ns.tempdir = test_cwd

                self._main(tests, kwargs)
        except SystemExit as exc:
            # bpo-38203: Python can hang at exit in Py_Finalize(), especially
            # on threading._shutdown() call: put a timeout
            if threading_helper.can_start_thread:
                faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)

            sys.exit(exc.code)
830
831 def getloadavg(self):
832 if self.win_load_tracker is not None:
833 return self.win_load_tracker.getloadavg()
834
835 if hasattr(os, 'getloadavg'):
836 return os.getloadavg()[0]
837
838 return None
839
840 def get_exitcode(self):
841 exitcode = 0
842 if self.bad:
843 exitcode = EXITCODE_BAD_TEST
844 elif self.interrupted:
845 exitcode = EXITCODE_INTERRUPTED
846 elif self.ns.fail_env_changed and self.environment_changed:
847 exitcode = EXITCODE_ENV_CHANGED
848 elif self.no_tests_run():
849 exitcode = EXITCODE_NO_TESTS_RAN
850 elif self.rerun and self.ns.fail_rerun:
851 exitcode = EXITCODE_RERUN_FAIL
852 return exitcode
853
    def action_run_tests(self):
        """Full test pass: run, display, optionally re-run failures, then
        summarize and finalize."""
        self.run_tests()
        self.display_result()

        # With --rerun, failed tests are re-run in verbose subprocesses.
        need_rerun = self.need_rerun
        if self.ns.rerun and need_rerun:
            self.rerun_failed_tests(need_rerun)

        self.display_summary()
        self.finalize()
864
    def _main(self, tests, kwargs):
        """Dispatch to worker mode, test listing, or the full run.

        Always terminates the process via sys.exit() (except in worker
        mode, which returns after the worker finishes).
        """
        if self.is_worker():
            # Worker subprocess: run the tests handed over by the parent.
            from test.libregrtest.runtest_mp import run_tests_worker
            run_tests_worker(self.ns.worker_args)
            return

        if self.ns.wait:
            # --wait: let the user attach a debugger before starting.
            input("Press any key to continue...")

        setup_tests(self.ns)
        self.find_tests(tests)

        exitcode = 0
        if self.ns.list_tests:
            self.list_tests()
        elif self.ns.list_cases:
            self.list_cases()
        else:
            self.action_run_tests()
            exitcode = self.get_exitcode()

        sys.exit(exitcode)
887
888
def main(tests=None, **kwargs):
    """Run the Python suite."""
    regrtest = Regrtest()
    regrtest.main(tests=tests, **kwargs)