1 import faulthandler
2 import gc
3 import importlib
4 import io
5 import sys
6 import time
7 import traceback
8 import unittest
9
10 from test import support
11 from test.support import threading_helper
12
13 from .filter import match_test
14 from .result import State, TestResult, TestStats
15 from .runtests import RunTests
16 from .save_env import saved_test_environment
17 from .setup import setup_tests
18 from .testresult import get_test_runner
19 from .utils import (
20 TestName,
21 clear_caches, remove_testfn, abs_module_name, print_warning)
22
23
24 # Minimum duration of a test to display its duration or to mention that
25 # the test is running in background
26 PROGRESS_MIN_TIME = 30.0 # seconds
27
28
29 def run_unittest(test_mod):
30 loader = unittest.TestLoader()
31 tests = loader.loadTestsFromModule(test_mod)
32 for error in loader.errors:
33 print(error, file=sys.stderr)
34 if loader.errors:
35 raise Exception("errors while loading tests")
36 _filter_suite(tests, match_test)
37 return _run_suite(tests)
38
39 def _filter_suite(suite, pred):
40 """Recursively filter test cases in a suite based on a predicate."""
41 newtests = []
42 for test in suite._tests:
43 if isinstance(test, unittest.TestSuite):
44 _filter_suite(test, pred)
45 newtests.append(test)
46 else:
47 if pred(test):
48 newtests.append(test)
49 suite._tests = newtests
50
51 def _run_suite(suite):
52 """Run tests from a unittest.TestSuite-derived class."""
53 runner = get_test_runner(sys.stdout,
54 verbosity=support.verbose,
55 capture_output=(support.junit_xml_list is not None))
56
57 result = runner.run(suite)
58
59 if support.junit_xml_list is not None:
60 support.junit_xml_list.append(result.get_xml_element())
61
62 if not result.testsRun and not result.skipped and not result.errors:
63 raise support.TestDidNotRun
64 if not result.wasSuccessful():
65 stats = TestStats.from_unittest(result)
66 if len(result.errors) == 1 and not result.failures:
67 err = result.errors[0][1]
68 elif len(result.failures) == 1 and not result.errors:
69 err = result.failures[0][1]
70 else:
71 err = "multiple errors occurred"
72 if not support.verbose: err += "; run in verbose mode for details"
73 errors = [(str(tc), exc_str) for tc, exc_str in result.errors]
74 failures = [(str(tc), exc_str) for tc, exc_str in result.failures]
75 raise support.TestFailedWithDetails(err, errors, failures, stats=stats)
76 return result
77
78
79 def regrtest_runner(result: TestResult, test_func, runtests: RunTests) -> None:
80 # Run test_func(), collect statistics, and detect reference and memory
81 # leaks.
82 if runtests.hunt_refleak:
83 from .refleak import runtest_refleak
84 refleak, test_result = runtest_refleak(result.test_name, test_func,
85 runtests.hunt_refleak,
86 runtests.quiet)
87 else:
88 test_result = test_func()
89 refleak = False
90
91 if refleak:
92 result.state = State.REFLEAK
93
94 stats: TestStats | None
95
96 match test_result:
97 case TestStats():
98 stats = test_result
99 case unittest.TestResult():
100 stats = TestStats.from_unittest(test_result)
101 case None:
102 print_warning(f"{result.test_name} test runner returned None: {test_func}")
103 stats = None
104 case _:
105 # Don't import doctest at top level since only few tests return
106 # a doctest.TestResult instance.
107 import doctest
108 if isinstance(test_result, doctest.TestResults):
109 stats = TestStats.from_doctest(test_result)
110 else:
111 print_warning(f"Unknown test result type: {type(test_result)}")
112 stats = None
113
114 result.stats = stats
115
116
117 # Storage of uncollectable GC objects (gc.garbage)
118 GC_GARBAGE = []
119
120
121 def _load_run_test(result: TestResult, runtests: RunTests) -> None:
122 # Load the test module and run the tests.
123 test_name = result.test_name
124 module_name = abs_module_name(test_name, runtests.test_dir)
125 test_mod = importlib.import_module(module_name)
126
127 if hasattr(test_mod, "test_main"):
128 # https://github.com/python/cpython/issues/89392
129 raise Exception(f"Module {test_name} defines test_main() which "
130 f"is no longer supported by regrtest")
131 def test_func():
132 return run_unittest(test_mod)
133
134 try:
135 regrtest_runner(result, test_func, runtests)
136 finally:
137 # First kill any dangling references to open files etc.
138 # This can also issue some ResourceWarnings which would otherwise get
139 # triggered during the following test run, and possibly produce
140 # failures.
141 support.gc_collect()
142
143 remove_testfn(test_name, runtests.verbose)
144
145 if gc.garbage:
146 support.environment_altered = True
147 print_warning(f"{test_name} created {len(gc.garbage)} "
148 f"uncollectable object(s)")
149
150 # move the uncollectable objects somewhere,
151 # so we don't see them again
152 GC_GARBAGE.extend(gc.garbage)
153 gc.garbage.clear()
154
155 support.reap_children()
156
157
158 def _runtest_env_changed_exc(result: TestResult, runtests: RunTests,
159 display_failure: bool = True) -> None:
160 # Handle exceptions, detect environment changes.
161
162 # Reset the environment_altered flag to detect if a test altered
163 # the environment
164 support.environment_altered = False
165
166 pgo = runtests.pgo
167 if pgo:
168 display_failure = False
169 quiet = runtests.quiet
170
171 test_name = result.test_name
172 try:
173 clear_caches()
174 support.gc_collect()
175
176 with saved_test_environment(test_name,
177 runtests.verbose, quiet, pgo=pgo):
178 _load_run_test(result, runtests)
179 except support.ResourceDenied as exc:
180 if not quiet and not pgo:
181 print(f"{test_name} skipped -- {exc}", flush=True)
182 result.state = State.RESOURCE_DENIED
183 return
184 except unittest.SkipTest as exc:
185 if not quiet and not pgo:
186 print(f"{test_name} skipped -- {exc}", flush=True)
187 result.state = State.SKIPPED
188 return
189 except support.TestFailedWithDetails as exc:
190 msg = f"test {test_name} failed"
191 if display_failure:
192 msg = f"{msg} -- {exc}"
193 print(msg, file=sys.stderr, flush=True)
194 result.state = State.FAILED
195 result.errors = exc.errors
196 result.failures = exc.failures
197 result.stats = exc.stats
198 return
199 except support.TestFailed as exc:
200 msg = f"test {test_name} failed"
201 if display_failure:
202 msg = f"{msg} -- {exc}"
203 print(msg, file=sys.stderr, flush=True)
204 result.state = State.FAILED
205 result.stats = exc.stats
206 return
207 except support.TestDidNotRun:
208 result.state = State.DID_NOT_RUN
209 return
210 except KeyboardInterrupt:
211 print()
212 result.state = State.INTERRUPTED
213 return
214 except:
215 if not pgo:
216 msg = traceback.format_exc()
217 print(f"test {test_name} crashed -- {msg}",
218 file=sys.stderr, flush=True)
219 result.state = State.UNCAUGHT_EXC
220 return
221
222 if support.environment_altered:
223 result.set_env_changed()
224 # Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
225 if result.state is None:
226 result.state = State.PASSED
227
228
229 def _runtest(result: TestResult, runtests: RunTests) -> None:
230 # Capture stdout and stderr, set faulthandler timeout,
231 # and create JUnit XML report.
232 verbose = runtests.verbose
233 output_on_failure = runtests.output_on_failure
234 timeout = runtests.timeout
235
236 if timeout is not None and threading_helper.can_start_thread:
237 use_timeout = True
238 faulthandler.dump_traceback_later(timeout, exit=True)
239 else:
240 use_timeout = False
241
242 try:
243 setup_tests(runtests)
244
245 if output_on_failure:
246 support.verbose = True
247
248 stream = io.StringIO()
249 orig_stdout = sys.stdout
250 orig_stderr = sys.stderr
251 print_warning = support.print_warning
252 orig_print_warnings_stderr = print_warning.orig_stderr
253
254 output = None
255 try:
256 sys.stdout = stream
257 sys.stderr = stream
258 # print_warning() writes into the temporary stream to preserve
259 # messages order. If support.environment_altered becomes true,
260 # warnings will be written to sys.stderr below.
261 print_warning.orig_stderr = stream
262
263 _runtest_env_changed_exc(result, runtests, display_failure=False)
264 # Ignore output if the test passed successfully
265 if result.state != State.PASSED:
266 output = stream.getvalue()
267 finally:
268 sys.stdout = orig_stdout
269 sys.stderr = orig_stderr
270 print_warning.orig_stderr = orig_print_warnings_stderr
271
272 if output is not None:
273 sys.stderr.write(output)
274 sys.stderr.flush()
275 else:
276 # Tell tests to be moderately quiet
277 support.verbose = verbose
278 _runtest_env_changed_exc(result, runtests,
279 display_failure=not verbose)
280
281 xml_list = support.junit_xml_list
282 if xml_list:
283 import xml.etree.ElementTree as ET
284 result.xml_data = [ET.tostring(x).decode('us-ascii')
285 for x in xml_list]
286 finally:
287 if use_timeout:
288 faulthandler.cancel_dump_traceback_later()
289 support.junit_xml_list = None
290
291
292 def run_single_test(test_name: TestName, runtests: RunTests) -> TestResult:
293 """Run a single test.
294
295 test_name -- the name of the test
296
297 Returns a TestResult.
298
299 If runtests.use_junit, xml_data is a list containing each generated
300 testsuite element.
301 """
302 start_time = time.perf_counter()
303 result = TestResult(test_name)
304 pgo = runtests.pgo
305 try:
306 _runtest(result, runtests)
307 except:
308 if not pgo:
309 msg = traceback.format_exc()
310 print(f"test {test_name} crashed -- {msg}",
311 file=sys.stderr, flush=True)
312 result.state = State.UNCAUGHT_EXC
313
314 sys.stdout.flush()
315 sys.stderr.flush()
316
317 result.duration = time.perf_counter() - start_time
318 return result