python (3.12.0)
1 import dataclasses
2 import doctest
3 import faulthandler
4 import gc
5 import importlib
6 import io
7 import os
8 import sys
9 import time
10 import traceback
11 import unittest
12
13 from test import support
14 from test.support import TestStats
15 from test.support import os_helper
16 from test.support import threading_helper
17 from test.libregrtest.cmdline import Namespace
18 from test.libregrtest.save_env import saved_test_environment
19 from test.libregrtest.utils import clear_caches, format_duration, print_warning
20
21
# Type aliases used by RunTests below:
# - MatchTests: a list of strings.
# - MatchTestsDict: maps a string key (RunTests.get_match_tests() looks it
#   up by test name) to a MatchTests list.
MatchTests = list[str]
MatchTestsDict = dict[str, MatchTests]
24
25
# Plain string constants are used instead of enum.Enum so that fewer modules
# get imported while tests are running.
class State:
    """Names of the possible outcomes of running one test file."""

    PASSED = "PASSED"
    FAILED = "FAILED"
    SKIPPED = "SKIPPED"
    UNCAUGHT_EXC = "UNCAUGHT_EXC"
    REFLEAK = "REFLEAK"
    ENV_CHANGED = "ENV_CHANGED"
    RESOURCE_DENIED = "RESOURCE_DENIED"
    INTERRUPTED = "INTERRUPTED"
    MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR"
    DID_NOT_RUN = "DID_NOT_RUN"
    TIMEOUT = "TIMEOUT"

    @staticmethod
    def is_failed(state):
        """Return True if *state* is one of the failure outcomes.

        ENV_CHANGED is deliberately not in this set.
        """
        failure_states = {
            State.TIMEOUT,
            State.MULTIPROCESSING_ERROR,
            State.REFLEAK,
            State.UNCAUGHT_EXC,
            State.FAILED,
        }
        return state in failure_states

    @staticmethod
    def has_meaningful_duration(state):
        """Return False for outcomes whose measured duration is meaningless.

        For instance, when a whole test file is skipped, the time spent is
        the time needed to run the code which skips it, not the time needed
        to execute its tests.
        """
        meaningless = {
            State.DID_NOT_RUN,
            State.MULTIPROCESSING_ERROR,
            State.INTERRUPTED,
            State.RESOURCE_DENIED,
            State.SKIPPED,
        }
        return not (state in meaningless)

    @staticmethod
    def must_stop(state):
        """Return True if *state* is INTERRUPTED or MULTIPROCESSING_ERROR."""
        stop_states = {State.MULTIPROCESSING_ERROR, State.INTERRUPTED}
        return state in stop_states
67
68
69 # gh-90681: When rerunning tests, we might need to rerun the whole
70 # class or module suite if some its life-cycle hooks fail.
71 # Test level hooks are not affected.
72 _TEST_LIFECYCLE_HOOKS = frozenset((
73 'setUpClass', 'tearDownClass',
74 'setUpModule', 'tearDownModule',
75 ))
76
77 def normalize_test_name(test_full_name, *, is_error=False):
78 short_name = test_full_name.split(" ")[0]
79 if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
80 if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
81 # if setUpModule() or tearDownModule() failed, don't filter
82 # tests with the test file name, don't use use filters.
83 return None
84
85 # This means that we have a failure in a life-cycle hook,
86 # we need to rerun the whole module or class suite.
87 # Basically the error looks like this:
88 # ERROR: setUpClass (test.test_reg_ex.RegTest)
89 # or
90 # ERROR: setUpModule (test.test_reg_ex)
91 # So, we need to parse the class / module name.
92 lpar = test_full_name.index('(')
93 rpar = test_full_name.index(')')
94 return test_full_name[lpar + 1: rpar].split('.')[-1]
95 return short_name
96
97
98 @dataclasses.dataclass(slots=True)
99 class ESC[4;38;5;81mTestResult:
100 test_name: str
101 state: str | None = None
102 # Test duration in seconds
103 duration: float | None = None
104 xml_data: list[str] | None = None
105 stats: TestStats | None = None
106
107 # errors and failures copied from support.TestFailedWithDetails
108 errors: list[tuple[str, str]] | None = None
109 failures: list[tuple[str, str]] | None = None
110
111 def is_failed(self, fail_env_changed: bool) -> bool:
112 if self.state == State.ENV_CHANGED:
113 return fail_env_changed
114 return State.is_failed(self.state)
115
116 def _format_failed(self):
117 if self.errors and self.failures:
118 le = len(self.errors)
119 lf = len(self.failures)
120 error_s = "error" + ("s" if le > 1 else "")
121 failure_s = "failure" + ("s" if lf > 1 else "")
122 return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
123
124 if self.errors:
125 le = len(self.errors)
126 error_s = "error" + ("s" if le > 1 else "")
127 return f"{self.test_name} failed ({le} {error_s})"
128
129 if self.failures:
130 lf = len(self.failures)
131 failure_s = "failure" + ("s" if lf > 1 else "")
132 return f"{self.test_name} failed ({lf} {failure_s})"
133
134 return f"{self.test_name} failed"
135
136 def __str__(self) -> str:
137 match self.state:
138 case State.PASSED:
139 return f"{self.test_name} passed"
140 case State.FAILED:
141 return self._format_failed()
142 case State.SKIPPED:
143 return f"{self.test_name} skipped"
144 case State.UNCAUGHT_EXC:
145 return f"{self.test_name} failed (uncaught exception)"
146 case State.REFLEAK:
147 return f"{self.test_name} failed (reference leak)"
148 case State.ENV_CHANGED:
149 return f"{self.test_name} failed (env changed)"
150 case State.RESOURCE_DENIED:
151 return f"{self.test_name} skipped (resource denied)"
152 case State.INTERRUPTED:
153 return f"{self.test_name} interrupted"
154 case State.MULTIPROCESSING_ERROR:
155 return f"{self.test_name} process crashed"
156 case State.DID_NOT_RUN:
157 return f"{self.test_name} ran no tests"
158 case State.TIMEOUT:
159 return f"{self.test_name} timed out ({format_duration(self.duration)})"
160 case _:
161 raise ValueError("unknown result state: {state!r}")
162
163 def has_meaningful_duration(self):
164 return State.has_meaningful_duration(self.state)
165
166 def set_env_changed(self):
167 if self.state is None or self.state == State.PASSED:
168 self.state = State.ENV_CHANGED
169
170 def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
171 if State.must_stop(self.state):
172 return True
173 if fail_fast and self.is_failed(fail_env_changed):
174 return True
175 return False
176
177 def get_rerun_match_tests(self):
178 match_tests = []
179
180 errors = self.errors or []
181 failures = self.failures or []
182 for error_list, is_error in (
183 (errors, True),
184 (failures, False),
185 ):
186 for full_name, *_ in error_list:
187 match_name = normalize_test_name(full_name, is_error=is_error)
188 if match_name is None:
189 # 'setUpModule (test.test_sys)': don't filter tests
190 return None
191 if not match_name:
192 error_type = "ERROR" if is_error else "FAIL"
193 print_warning(f"rerun failed to parse {error_type} test name: "
194 f"{full_name!r}: don't filter tests")
195 return None
196 match_tests.append(match_name)
197
198 return match_tests
199
200
@dataclasses.dataclass(slots=True, frozen=True)
class RunTests:
    """Immutable description of a set of test files to run."""
    tests: list[str]
    match_tests: MatchTestsDict | None = None
    rerun: bool = False
    forever: bool = False

    def get_match_tests(self, test_name) -> MatchTests | None:
        """Return the match list registered for *test_name*, or None.

        None is returned both when no mapping is set and when the mapping
        has no entry for *test_name*.
        """
        if self.match_tests is None:
            return None
        return self.match_tests.get(test_name)

    def iter_tests(self):
        """Yield the test names once, or endlessly when *forever* is set.

        The names are snapshotted into a tuple before iterating. With
        *forever* set and no test at all the generator finishes at once:
        cycling over an empty sequence would loop forever without ever
        yielding, hanging the consumer.
        """
        tests = tuple(self.tests)
        if self.forever and tests:
            while True:
                yield from tests
        else:
            yield from tests
221
222
# Minimum duration of a test to display its duration or to mention that
# the test is running in the background.
PROGRESS_MIN_TIME = 30.0   # seconds
226
# If these test directories are encountered, recurse into them and treat each
# test_*.py file or directory as a separate test module. This can increase
# parallelism.
# Beware: this can't generally be done for any directory with sub-tests, as
# the __init__.py may do things which alter what tests are to be run.

SPLITTESTDIRS = {
    "test_asyncio",
    "test_concurrent_futures",
    "test_multiprocessing_fork",
    "test_multiprocessing_forkserver",
    "test_multiprocessing_spawn",
}
239
240
def findtestdir(path=None):
    """Return *path* if it is truthy, else the default test directory.

    The default is the grandparent directory of this file, or os.curdir
    when that turns out to be an empty string.
    """
    if path:
        return path
    grandparent = os.path.dirname(os.path.dirname(__file__))
    if grandparent:
        return grandparent
    return os.curdir
243
244
def findtests(*, testdir=None, exclude=(),
              split_test_dirs=SPLITTESTDIRS, base_mod=""):
    """Return a sorted list of all applicable test modules in *testdir*.

    Only entries whose name (without extension) starts with "test_" and is
    not in *exclude* are considered. Entries named in *split_test_dirs* are
    scanned recursively and their sub-tests are reported with a dotted
    prefix; other entries are kept if they end with ".py" or have no
    extension, prefixed with *base_mod* when it is set.
    """
    testdir = findtestdir(testdir)
    found = []
    for entry in os.listdir(testdir):
        stem, suffix = os.path.splitext(entry)
        if not stem.startswith("test_") or stem in exclude:
            continue

        if stem in split_test_dirs:
            # Top-level split directories are qualified with "test.".
            package = f"{base_mod or 'test'}.{stem}"
            found += findtests(testdir=os.path.join(testdir, stem),
                               exclude=exclude,
                               split_test_dirs=split_test_dirs,
                               base_mod=package)
        elif suffix in (".py", ""):
            found.append(f"{base_mod}.{stem}" if base_mod else stem)

    found.sort()
    return found
262
263
def split_test_packages(tests, *, testdir=None, exclude=(),
                        split_test_dirs=SPLITTESTDIRS):
    """Expand the names of *tests* found in *split_test_dirs*.

    Each such name is replaced, in place, by the tests that findtests()
    discovers in the sub-directory of that name; other names are kept
    unchanged. Returns a new list.
    """
    testdir = findtestdir(testdir)
    expanded = []
    for name in tests:
        if name not in split_test_dirs:
            expanded.append(name)
            continue

        expanded.extend(findtests(testdir=os.path.join(testdir, name),
                                  exclude=exclude,
                                  split_test_dirs=split_test_dirs,
                                  base_mod=name))
    return expanded
277
278
279 def abs_module_name(test_name: str, test_dir: str | None) -> str:
280 if test_name.startswith('test.') or test_dir:
281 return test_name
282 else:
283 # Import it from the test package
284 return 'test.' + test_name
285
286
def setup_support(ns: Namespace):
    """Copy the command line options of *ns* into the test.support module.

    support.junit_xml_list becomes a fresh empty list when ns.xmlpath is
    set, and None otherwise.
    """
    support.PGO = ns.pgo
    support.PGO_EXTENDED = ns.pgo_extended
    support.set_match_tests(ns.match_tests, ns.ignore_tests)
    support.failfast = ns.failfast
    support.verbose = ns.verbose
    support.junit_xml_list = [] if ns.xmlpath else None
297
298
def _runtest(result: TestResult, ns: Namespace) -> None:
    """Run the test, handling output capture, timeout and JUnit XML.

    * When ns.timeout is set and threads can be started, faulthandler is
      armed to dump the traceback and exit once the timeout expires.
    * When ns.verbose3 is set, stdout/stderr are captured and only written
      to stderr if the test did not pass.
    * Collected JUnit XML elements are stored in result.xml_data as
      strings.
    """
    verbosity = ns.verbose
    capture_output = ns.verbose3
    timeout = ns.timeout

    watchdog = (timeout is not None
                and threading_helper.can_start_thread)
    if watchdog:
        faulthandler.dump_traceback_later(timeout, exit=True)

    try:
        setup_support(ns)

        if not capture_output:
            # Tell tests to be moderately quiet
            support.verbose = verbosity
            _runtest_env_changed_exc(result, ns,
                                     display_failure=not verbosity)
        else:
            support.verbose = True

            buffer = io.StringIO()
            saved_stdout, saved_stderr = sys.stdout, sys.stderr
            print_warning = support.print_warning
            saved_warning_stderr = print_warning.orig_stderr

            captured = None
            try:
                sys.stdout = sys.stderr = buffer
                # print_warning() also writes into the temporary buffer so
                # that the order of messages is preserved. If
                # support.environment_altered becomes true, warnings will be
                # written to sys.stderr below.
                print_warning.orig_stderr = buffer

                _runtest_env_changed_exc(result, ns, display_failure=False)
                # The output of a test which passed is dropped
                if result.state != State.PASSED:
                    captured = buffer.getvalue()
            finally:
                sys.stdout, sys.stderr = saved_stdout, saved_stderr
                print_warning.orig_stderr = saved_warning_stderr

            if captured is not None:
                sys.stderr.write(captured)
                sys.stderr.flush()

        junit_elements = support.junit_xml_list
        if junit_elements:
            import xml.etree.ElementTree as ET
            result.xml_data = [ET.tostring(element).decode('us-ascii')
                               for element in junit_elements]
    finally:
        if watchdog:
            faulthandler.cancel_dump_traceback_later()
        support.junit_xml_list = None
359
360
def runtest(ns: Namespace, test_name: str) -> TestResult:
    """Run a single test and return its TestResult.

    ns -- regrtest namespace of options
    test_name -- the name of the test

    Any exception escaping the test machinery is reported on stderr
    (unless ns.pgo is set) and recorded as State.UNCAUGHT_EXC. The
    duration is always stored in the result.

    If ns.xmlpath is not None, xml_data is a list containing each
    generated testsuite element.
    """
    started = time.perf_counter()
    result = TestResult(test_name)
    try:
        _runtest(result, ns)
    except BaseException:
        # Catch everything, like a bare "except:" does.
        if not ns.pgo:
            details = traceback.format_exc()
            print(f"test {test_name} crashed -- {details}",
                  file=sys.stderr, flush=True)
        result.state = State.UNCAUGHT_EXC
    elapsed = time.perf_counter() - started
    result.duration = elapsed
    return result
384
385
def run_unittest(test_mod):
    """Load the tests of *test_mod* and run them with support.run_unittest().

    Loading errors are printed to stderr, one per line, and then reported
    by raising Exception.
    """
    loader = unittest.TestLoader()
    suite = loader.loadTestsFromModule(test_mod)
    load_errors = loader.errors
    if load_errors:
        for message in load_errors:
            print(message, file=sys.stderr)
        raise Exception("errors while loading tests")
    return support.run_unittest(suite)
394
395
def save_env(ns: Namespace, test_name: str):
    """Build a saved_test_environment for *test_name* from the options of *ns*."""
    verbose, quiet = ns.verbose, ns.quiet
    return saved_test_environment(test_name, verbose, quiet, pgo=ns.pgo)
398
399
def regrtest_runner(result, test_func, ns) -> None:
    """Run test_func(), hunt reference leaks if asked, and store statistics.

    With ns.huntrleaks the function is run through dash_R(), and a
    detected leak sets result.state to State.REFLEAK. The value returned by
    the test function is converted to a TestStats stored in result.stats;
    None or an unknown type emits a warning and stores None.
    """
    if not ns.huntrleaks:
        outcome = test_func()
        leaked = False
    else:
        from test.libregrtest.refleak import dash_R
        leaked, outcome = dash_R(ns, result.test_name, test_func)

    if leaked:
        result.state = State.REFLEAK

    if isinstance(outcome, TestStats):
        stats = outcome
    elif isinstance(outcome, unittest.TestResult):
        stats = TestStats.from_unittest(outcome)
    elif isinstance(outcome, doctest.TestResults):
        stats = TestStats.from_doctest(outcome)
    else:
        stats = None
        if outcome is None:
            print_warning(f"{result.test_name} test runner returned None: {test_func}")
        else:
            print_warning(f"Unknown test result type: {type(outcome)}")

    result.stats = stats
428
429
# Storage of uncollectable objects: the contents of gc.garbage are moved
# here by _load_run_test() so that they are not reported again.
FOUND_GARBAGE = []
432
433
def _load_run_test(result: TestResult, ns: Namespace) -> None:
    """Import the test module afresh, run it, then clean up after it.

    Raises Exception if the module still defines test_main(). After the
    run, leftover TESTFN files are removed, uncollectable objects are moved
    to FOUND_GARBAGE (flagging the environment as altered) and child
    processes are reaped.
    """
    test_name = result.test_name
    module_name = abs_module_name(test_name, ns.testdir)

    # Drop any previously imported copy so that the module is reloaded
    sys.modules.pop(module_name, None)
    test_mod = importlib.import_module(module_name)

    if hasattr(test_mod, "test_main"):
        # https://github.com/python/cpython/issues/89392
        raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest")

    def test_func():
        return run_unittest(test_mod)

    try:
        with save_env(ns, test_name):
            regrtest_runner(result, test_func, ns)
    finally:
        # First kill any dangling references to open files etc.
        # This can also issue some ResourceWarnings which would otherwise get
        # triggered during the following test run, and possibly produce
        # failures.
        support.gc_collect()

        remove_testfn(test_name, ns.verbose)

    uncollectable = gc.garbage
    if uncollectable:
        support.environment_altered = True
        print_warning(f"{test_name} created {len(uncollectable)} "
                      f"uncollectable object(s)")

        # Stash the uncollectable objects away so that they are not
        # reported again by a later test.
        FOUND_GARBAGE.extend(uncollectable)
        uncollectable.clear()

    support.reap_children()
472
473
def _runtest_env_changed_exc(result: TestResult, ns: Namespace,
                             display_failure: bool = True) -> None:
    """Run the test, map exceptions to states, detect environment changes.

    Every exception raised while running the test is translated into a
    State stored in result.state. When no exception is raised, the result
    becomes ENV_CHANGED if the environment was altered, and PASSED if no
    state was set by then.
    """
    # Reset the flag so that an alteration made by this test is detected
    support.environment_altered = False

    if ns.pgo:
        display_failure = False

    test_name = result.test_name

    def report_skip(reason) -> None:
        if not (ns.quiet or ns.pgo):
            print(f"{test_name} skipped -- {reason}", flush=True)

    def report_failure(exc) -> None:
        text = f"test {test_name} failed"
        if display_failure:
            text = f"{text} -- {exc}"
        print(text, file=sys.stderr, flush=True)

    try:
        clear_caches()
        support.gc_collect()

        with save_env(ns, test_name):
            _load_run_test(result, ns)
    except support.ResourceDenied as reason:
        # Handled ahead of unittest.SkipTest, as in the original order
        report_skip(reason)
        result.state = State.RESOURCE_DENIED
    except unittest.SkipTest as reason:
        report_skip(reason)
        result.state = State.SKIPPED
    except support.TestFailedWithDetails as exc:
        # Handled ahead of support.TestFailed, as in the original order
        report_failure(exc)
        result.state = State.FAILED
        result.errors = exc.errors
        result.failures = exc.failures
        result.stats = exc.stats
    except support.TestFailed as exc:
        report_failure(exc)
        result.state = State.FAILED
        result.stats = exc.stats
    except support.TestDidNotRun:
        result.state = State.DID_NOT_RUN
    except KeyboardInterrupt:
        print()
        result.state = State.INTERRUPTED
    except BaseException:
        # Catch everything else, like a bare "except:" does.
        if not ns.pgo:
            details = traceback.format_exc()
            print(f"test {test_name} crashed -- {details}",
                  file=sys.stderr, flush=True)
        result.state = State.UNCAUGHT_EXC
    else:
        if support.environment_altered:
            result.set_env_changed()
        # Keep a state which was already set (REFLEAK or ENV_CHANGED)
        if result.state is None:
            result.state = State.PASSED
540
541
def remove_testfn(test_name: str, verbose: int) -> None:
    """Try to clean up os_helper.TESTFN if the test left it behind.

    While tests shouldn't leave any files or directories behind, when a
    test fails that can be tedious for it to arrange. The consequences can
    be especially nasty on Windows, since if a test leaves a file open, it
    cannot be deleted by name (while there's nothing we can do about that
    here either, we can display the name of the offending test, which is a
    real help).

    Raises RuntimeError if the leftover exists but is neither a directory
    nor a regular file. A failure to remove it is only reported as a
    warning.
    """
    leftover = os_helper.TESTFN
    if not os.path.exists(leftover):
        return

    if os.path.isdir(leftover):
        import shutil
        kind = "directory"
        remove = shutil.rmtree
    elif os.path.isfile(leftover):
        kind = "file"
        remove = os.unlink
    else:
        raise RuntimeError(f"os.path says {leftover!r} exists but is neither "
                           f"directory nor file")

    if verbose:
        print_warning(f"{test_name} left behind {kind} {leftover!r}")
        support.environment_altered = True

    try:
        import stat
        # Grant every permission first: a restrictive mode could otherwise
        # prevent the cleanup.
        all_permissions = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
        os.chmod(leftover, all_permissions)
        remove(leftover)
    except Exception as exc:
        print_warning(f"{test_name} left behind {kind} {leftover!r} "
                      f"and it couldn't be removed: {exc}")