1 import contextlib
2 import faulthandler
3 import locale
4 import math
5 import os.path
6 import platform
7 import random
8 import shlex
9 import signal
10 import subprocess
11 import sys
12 import sysconfig
13 import tempfile
14 import textwrap
15 from collections.abc import Callable, Iterable
16
17 from test import support
18 from test.support import os_helper
19 from test.support import threading_helper
20
21
# All temporary files and temporary directories created by libregrtest should
# use TMP_PREFIX so cleanup_temp_dir() can remove them all.
TMP_PREFIX = 'test_python_'
# Prefixes of the working directory names built by get_work_dir(). Both start
# with TMP_PREFIX, so cleanup_temp_dir() matches them as well.
WORK_DIR_PREFIX = TMP_PREFIX
WORKER_WORK_DIR_PREFIX = WORK_DIR_PREFIX + 'worker_'

# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
# Used to protect against threading._shutdown() hang.
# Must be smaller than buildbot "1200 seconds without output" limit.
# exit_timeout() passes this value to faulthandler.dump_traceback_later().
EXIT_TIMEOUT = 120.0
32
33
# Resources covered by --use=all.
ALL_RESOURCES = ('audio', 'curses', 'largefile', 'network',
                 'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui', 'walltime')

# Other resources excluded from --use=all:
#
# - extralargefile (ex: test_zipfile64): really too slow to be enabled
#   "by default"
# - tzdata: while needed to validate fully test_datetime, it makes
#   test_datetime too slow (15-20 min on some buildbots) and so is disabled by
#   default (see bpo-30822).
RESOURCE_NAMES = ALL_RESOURCES + ('extralargefile', 'tzdata')
45
46
# Type aliases used in type hints. Most are plain aliases of str: they only
# document the role of a value, they do not add any runtime checking.
StrPath = str
TestName = str
StrJSON = str
TestTuple = tuple[TestName, ...]
TestList = list[TestName]
# --match and --ignore options: list of patterns
# ('*' joker character can be used)
TestFilter = list[tuple[TestName, bool]]
FilterTuple = tuple[TestName, ...]
FilterDict = dict[TestName, FilterTuple]
58
59
def format_duration(seconds):
    """Format a duration given in seconds as a short human-readable string.

    The duration is rounded up to the next millisecond. At most the two
    largest non-zero units among hours, minutes and seconds are shown
    (e.g. '3 hour 2 min'). A duration below one minute is shown with one
    decimal ('1.5 sec'), and a duration below one second as milliseconds
    ('10 ms').
    """
    total_ms = math.ceil(seconds * 1e3)
    whole_seconds, ms = divmod(total_ms, 1000)
    whole_minutes, secs = divmod(whole_seconds, 60)
    hours, minutes = divmod(whole_minutes, 60)

    chunks = []
    if hours:
        chunks.append('%s hour' % hours)
    if minutes:
        chunks.append('%s min' % minutes)

    if secs and chunks:
        # A larger unit is already present: "2 min 1 sec"
        chunks.append('%s sec' % secs)
    elif secs:
        # Seconds are the largest unit: keep the fraction, "1.0 sec"
        chunks.append('%.1f sec' % (secs + ms / 1000))

    if chunks:
        # Only keep the two most significant units
        return ' '.join(chunks[:2])
    return '%s ms' % ms
83
84
85 def strip_py_suffix(names: list[str] | None) -> None:
86 if not names:
87 return
88 for idx, name in enumerate(names):
89 basename, ext = os.path.splitext(name)
90 if ext == '.py':
91 names[idx] = basename
92
93
def plural(n, singular, plural=None):
    """Return the form of a word that matches the quantity n.

    singular is returned when n == 1. Otherwise the explicit plural form is
    returned if one was given, else singular with an 's' appended.
    """
    if n == 1:
        return singular
    return singular + 's' if plural is None else plural
101
102
def count(n, word):
    """Return 'n word', with an 's' appended to word unless n == 1."""
    return f"{n} {word}" if n == 1 else f"{n} {word}s"
108
109
def printlist(x, width=70, indent=4, file=None):
    """Print the sorted elements of iterable x, wrapped and indented.

    Optional arg width (default 70) is the maximum line length.
    Optional arg indent (default 4) is the number of blanks with which to
    begin each line.
    Optional arg file is passed to print(); None means sys.stdout.
    """
    margin = ' ' * indent
    # Sort first: 'x' may be a '--random' list or a set()
    joined = ' '.join(map(str, sorted(x)))
    wrapped = textwrap.fill(joined, width,
                            initial_indent=margin,
                            subsequent_indent=margin)
    print(wrapped, file=file)
123
124
def print_warning(msg):
    """Report msg as a warning by delegating to support.print_warning()."""
    support.print_warning(msg)
127
128
# Value of sys.unraisablehook saved by setup_unraisable_hook(); None until
# setup_unraisable_hook() has been called.
orig_unraisablehook = None


def regrtest_unraisable_hook(unraisable):
    """sys.unraisablehook replacement installed by setup_unraisable_hook().

    Marks the test environment as altered, prints a warning, then calls the
    saved hook with sys.stderr temporarily replaced by
    support.print_warning.orig_stderr.
    """
    global orig_unraisablehook
    support.environment_altered = True
    support.print_warning("Unraisable exception")
    old_stderr = sys.stderr
    try:
        support.flush_std_streams()
        # Let the saved hook write to the stream used by print_warning()
        sys.stderr = support.print_warning.orig_stderr
        orig_unraisablehook(unraisable)
        sys.stderr.flush()
    finally:
        # Always restore the stream that was current on entry
        sys.stderr = old_stderr


def setup_unraisable_hook():
    """Save the current sys.unraisablehook and install the regrtest one."""
    global orig_unraisablehook
    orig_unraisablehook = sys.unraisablehook
    sys.unraisablehook = regrtest_unraisable_hook
150
151
# Value of threading.excepthook saved by setup_threading_excepthook(); None
# until setup_threading_excepthook() has been called.
orig_threading_excepthook = None


def regrtest_threading_excepthook(args):
    """threading.excepthook replacement installed by setup_threading_excepthook().

    Marks the test environment as altered, prints a warning naming the
    exception type, then calls the saved hook with sys.stderr temporarily
    replaced by support.print_warning.orig_stderr.
    """
    global orig_threading_excepthook
    support.environment_altered = True
    support.print_warning(f"Uncaught thread exception: {args.exc_type.__name__}")
    old_stderr = sys.stderr
    try:
        support.flush_std_streams()
        # Let the saved hook write to the stream used by print_warning()
        sys.stderr = support.print_warning.orig_stderr
        orig_threading_excepthook(args)
        sys.stderr.flush()
    finally:
        # Always restore the stream that was current on entry
        sys.stderr = old_stderr


def setup_threading_excepthook():
    """Save the current threading.excepthook and install the regrtest one."""
    global orig_threading_excepthook
    # threading is imported here, not at module level
    import threading
    orig_threading_excepthook = threading.excepthook
    threading.excepthook = regrtest_threading_excepthook
174
175
def clear_caches():
    """Reset warning registries and assorted module-level caches.

    The warning registry of every loaded module is removed, the standard
    streams are flushed, and the caches of a fixed list of modules are
    cleared. Only modules already present in sys.modules are touched: no
    module is imported just to clear its cache.
    """
    # Clear the warnings registry, so they can be displayed again.
    # Iterate over a snapshot: hasattr() can run a module-level __getattr__
    # (ex: lazy modules) which may import something and resize sys.modules,
    # and that would raise "dictionary changed size during iteration".
    for mod in list(sys.modules.values()):
        if hasattr(mod, '__warningregistry__'):
            del mod.__warningregistry__

    # Flush standard output, so that buffered data is sent to the OS and
    # associated Python objects are reclaimed.
    for stream in (sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__):
        if stream is not None:
            stream.flush()

    # Clear assorted module caches.
    # Don't worry about resetting the cache if the module is not loaded
    try:
        distutils_dir_util = sys.modules['distutils.dir_util']
    except KeyError:
        pass
    else:
        distutils_dir_util._path_created.clear()

    try:
        re = sys.modules['re']
    except KeyError:
        pass
    else:
        re.purge()

    try:
        _strptime = sys.modules['_strptime']
    except KeyError:
        pass
    else:
        _strptime._regex_cache.clear()

    try:
        urllib_parse = sys.modules['urllib.parse']
    except KeyError:
        pass
    else:
        urllib_parse.clear_cache()

    try:
        urllib_request = sys.modules['urllib.request']
    except KeyError:
        pass
    else:
        urllib_request.urlcleanup()

    try:
        linecache = sys.modules['linecache']
    except KeyError:
        pass
    else:
        linecache.clearcache()

    try:
        mimetypes = sys.modules['mimetypes']
    except KeyError:
        pass
    else:
        mimetypes._default_mime_types()

    try:
        filecmp = sys.modules['filecmp']
    except KeyError:
        pass
    else:
        filecmp._cache.clear()

    try:
        struct = sys.modules['struct']
    except KeyError:
        pass
    else:
        struct._clearcache()

    try:
        doctest = sys.modules['doctest']
    except KeyError:
        pass
    else:
        doctest.master = None

    try:
        ctypes = sys.modules['ctypes']
    except KeyError:
        pass
    else:
        ctypes._reset_cache()

    try:
        typing = sys.modules['typing']
    except KeyError:
        pass
    else:
        for f in typing._cleanups:
            f()
274
275
def get_build_info():
    """Return the most important configure and build options.

    The result is a list of strings built from sysconfig variables, sys
    attributes and test.support checks.
    Example: ['debug', 'ASAN+MSAN'] or ['release', 'LTO+PGO'].
    """
    config_var = sysconfig.get_config_var

    configure_args = config_var('CONFIG_ARGS') or ''
    # -DNDEBUG is searched in both sets of compiler flags
    all_cflags = ((config_var('PY_CFLAGS') or '')
                  + (config_var('PY_CFLAGS_NODIST') or ''))
    link_flags = config_var('PY_LDFLAGS_NODIST') or ''
    asserts_disabled = '-DNDEBUG' in all_cflags

    info = []

    # --disable-gil
    if config_var('Py_NOGIL'):
        info.append("nogil")

    if hasattr(sys, 'gettotalrefcount'):
        # --with-pydebug
        info.append('debug')
        if asserts_disabled:
            info.append('without_assert')
    else:
        info.append('release')
        if '--with-assertions' in configure_args or not asserts_disabled:
            info.append('with_assert')

    # --enable-framework=name
    framework_name = config_var('PYTHONFRAMEWORK')
    if framework_name:
        info.append(f'framework={framework_name}')

    # --enable-shared
    if int(config_var('PY_ENABLE_SHARED') or '0'):
        info.append('shared')

    # --with-lto
    optimizations = []
    if '-flto=thin' in link_flags:
        optimizations.append('ThinLTO')
    elif '-flto' in link_flags:
        optimizations.append('LTO')
    # PGO (--enable-optimizations)
    if support.check_cflags_pgo():
        optimizations.append('PGO')
    if optimizations:
        info.append('+'.join(optimizations))

    # --with-address-sanitizer, --with-memory-sanitizer,
    # --with-undefined-behavior-sanitizer
    sanitizer_checks = (
        ("ASAN", support.check_sanitizer(address=True)),
        ("MSAN", support.check_sanitizer(memory=True)),
        ("UBSAN", support.check_sanitizer(ub=True)),
    )
    sanitizers = [label for label, enabled in sanitizer_checks if enabled]
    if sanitizers:
        info.append('+'.join(sanitizers))

    extras = (
        # --with-trace-refs
        ("TraceRefs", hasattr(sys, 'getobjects')),
        # --enable-pystats
        ("pystats", hasattr(sys, '_stats_on')),
        # --with-valgrind
        ("valgrind", config_var('WITH_VALGRIND')),
        # --with-dtrace
        ("dtrace", config_var('WITH_DTRACE')),
    )
    info.extend(label for label, enabled in extras if enabled)

    return info
355
356
def get_temp_dir(tmp_dir: StrPath | None = None) -> StrPath:
    """Return the absolute path of the directory holding temporary test files.

    An explicit tmp_dir is used after expanding '~'. Otherwise, when Python
    runs from its build directory, a 'build' sub-directory is selected;
    in every other case tempfile.gettempdir() is used.

    Raise RuntimeError if no suitable sysconfig variable is set when running
    from the build directory.
    """
    if tmp_dir:
        return os.path.abspath(os.path.expanduser(tmp_dir))

    if not sysconfig.is_python_build():
        return os.path.abspath(tempfile.gettempdir())

    # When tests are run from the Python build directory, it is best practice
    # to keep the test files in a subfolder. This eases the cleanup of leftover
    # files using the "make distclean" command.
    if support.is_wasi:
        tmp_dir = sysconfig.get_config_var('projectbase')
        if not tmp_dir:
            raise RuntimeError(
                "sysconfig.get_config_var('projectbase') "
                f"unexpectedly returned {tmp_dir!r} on WASI"
            )
        tmp_dir = os.path.join(tmp_dir, 'build')

        # When get_temp_dir() is called in a worker process,
        # get_temp_dir() path is different than in the parent process
        # which is not a WASI process. So the parent does not create
        # the same "tmp_dir" than the test worker process.
        os.makedirs(tmp_dir, exist_ok=True)
        return os.path.abspath(tmp_dir)

    tmp_dir = sysconfig.get_config_var('abs_builddir')
    if tmp_dir is None:
        # gh-74470: On Windows, only srcdir is available. Using
        # abs_builddir mostly matters on UNIX when building
        # Python out of the source tree, especially when the
        # source tree is read only.
        tmp_dir = (sysconfig.get_config_var('abs_srcdir')
                   or sysconfig.get_config_var('srcdir'))
        if not tmp_dir:
            raise RuntimeError(
                "Could not determine the correct value for tmp_dir"
            )
    return os.path.abspath(os.path.join(tmp_dir, 'build'))
399
400
def fix_umask():
    """Replace the 0o777 umask by 0o027 on Emscripten.

    Nothing is done on other platforms, and any other umask value is kept.
    """
    if not support.is_emscripten:
        return
    # Emscripten has default umask 0o777, which breaks some tests.
    # see https://github.com/emscripten-core/emscripten/issues/17269
    # os.umask() returns the previous mask, so setting one is the way to
    # read the current value.
    previous_mask = os.umask(0)
    os.umask(0o027 if previous_mask == 0o777 else previous_mask)
410
411
def get_work_dir(parent_dir: StrPath, worker: bool = False) -> StrPath:
    """Return the path of the directory to use as cwd while running tests.

    The directory name is made of a prefix (WORKER_WORK_DIR_PREFIX for a
    worker process, WORK_DIR_PREFIX otherwise), a number identifying the
    process and os_helper.FS_NONASCII. It is joined to parent_dir.

    The directory is not created by this function.
    """
    # The name of the dir includes the pid to allow parallel
    # testing (see the -j option).
    # Emscripten and WASI have stubbed getpid(), Emscripten has only
    # millisecond clock resolution. Use randint() instead.
    if support.is_emscripten or support.is_wasi:
        nonce = random.randint(0, 1_000_000)
    else:
        nonce = os.getpid()

    # Both prefixes start with TMP_PREFIX, so cleanup_temp_dir() removes
    # leftover directories of the main process and of workers alike.
    prefix = WORKER_WORK_DIR_PREFIX if worker else WORK_DIR_PREFIX
    work_dir = prefix + str(nonce) + os_helper.FS_NONASCII
    return os.path.join(parent_dir, work_dir)
430
431
@contextlib.contextmanager
def exit_timeout():
    """Context manager arming a watchdog when the body raises SystemExit.

    If threads can be started, faulthandler is asked to dump the traceback
    and exit the process after EXIT_TIMEOUT seconds. SystemExit is then
    raised again with the same exit code. Other exceptions pass through
    untouched.
    """
    try:
        yield
    except SystemExit as exit_request:
        # bpo-38203: Python can hang at exit in Py_Finalize(), especially
        # on threading._shutdown() call: put a timeout
        watchdog_usable = threading_helper.can_start_thread
        if watchdog_usable:
            faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
        sys.exit(exit_request.code)
442
443
def remove_testfn(test_name: TestName, verbose: int) -> None:
    """Try to remove os_helper.TESTFN if a test left it behind.

    When verbose is true, a warning naming the test is printed and the
    environment is flagged as altered. A failure to remove the file or
    directory is reported as a warning, it is not raised.

    Raise RuntimeError if the path exists but is neither a directory nor a
    regular file.
    """
    # While tests shouldn't leave any files or directories behind, when a test
    # fails that can be tedious for it to arrange. The consequences can be
    # especially nasty on Windows, since if a test leaves a file open, it
    # cannot be deleted by name (while there's nothing we can do about that
    # here either, we can display the name of the offending test, which is a
    # real help).
    name = os_helper.TESTFN
    if not os.path.exists(name):
        return

    nuker: Callable[[str], None]
    if os.path.isdir(name):
        import shutil
        kind = "directory"
        nuker = shutil.rmtree
    elif os.path.isfile(name):
        kind = "file"
        nuker = os.unlink
    else:
        raise RuntimeError(f"os.path says {name!r} exists but is neither "
                           f"directory nor file")

    if verbose:
        print_warning(f"{test_name} left behind {kind} {name!r}")
        support.environment_altered = True

    try:
        import stat
        # fix possible permissions problems that might prevent cleanup
        full_access = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
        os.chmod(name, full_access)
        nuker(name)
    except Exception as exc:
        print_warning(f"{test_name} left behind {kind} {name!r} "
                      f"and it couldn't be removed: {exc}")
479
480
def abs_module_name(test_name: TestName, test_dir: StrPath | None) -> TestName:
    """Return the module name to import for test_name.

    The name is returned unchanged when it already starts with 'test.' or
    when a test directory is given; otherwise 'test.' is prepended so the
    module is imported from the test package.
    """
    already_absolute = test_name.startswith('test.') or test_dir
    return test_name if already_absolute else 'test.' + test_name
487
488
# gh-90681: When rerunning tests, we might need to rerun the whole
# class or module suite if some of its life-cycle hooks fail.
# Test level hooks are not affected.
_TEST_LIFECYCLE_HOOKS = frozenset({
    'setUpClass', 'tearDownClass',
    'setUpModule', 'tearDownModule',
})


def normalize_test_name(test_full_name, *, is_error=False):
    """Return the name to use as a filter when rerunning a test.

    Usually this is the first word of test_full_name, i.e. the test method
    name. If is_error is true and that word is a life-cycle hook:

    - for setUpModule() / tearDownModule(), None is returned: no filter
      must be used and the whole module is rerun;
    - for setUpClass() / tearDownClass(), the class name found between the
      parentheses is returned, so the whole class is rerun.
    """
    short_name = test_full_name.partition(" ")[0]
    if not (is_error and short_name in _TEST_LIFECYCLE_HOOKS):
        return short_name

    if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
        return None

    # The error looks like this:
    #    ERROR: setUpClass (test.test_reg_ex.RegTest)
    # Keep the last component of the dotted name in the parentheses.
    start = test_full_name.index('(') + 1
    stop = test_full_name.index(')')
    dotted_name = test_full_name[start:stop]
    return dotted_name.rpartition('.')[2]
516
517
def adjust_rlimit_nofile():
    """
    Raise the soft limit on open file descriptors (RLIMIT_NOFILE) if needed.

    On macOS the default limit is sometimes too low (256) for our test suite
    to succeed. Raise it to something more reasonable: 1024 is a common
    Linux default. Nothing is done if the resource module is missing.
    """
    try:
        import resource
    except ImportError:
        return

    fd_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
    desired_fds = 1024

    # Already high enough, or no room left below the hard limit
    if fd_limit >= desired_fds or fd_limit >= hard_limit:
        return

    new_fd_limit = min(desired_fds, hard_limit)
    try:
        resource.setrlimit(resource.RLIMIT_NOFILE,
                           (new_fd_limit, hard_limit))
        print(f"Raised RLIMIT_NOFILE: {fd_limit} -> {new_fd_limit}")
    except (ValueError, OSError) as err:
        print_warning(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to "
                      f"{new_fd_limit}: {err}.")
542
543
def get_host_runner():
    """Return the host runner command, or a false value if there is none.

    The _PYTHON_HOSTRUNNER environment variable wins whenever it is set,
    even to an empty string. Otherwise the HOSTRUNNER sysconfig variable is
    returned.
    """
    from_environ = os.environ.get("_PYTHON_HOSTRUNNER")
    if from_environ is not None:
        return from_environ
    return sysconfig.get_config_var("HOSTRUNNER")
548
549
def is_cross_compiled():
    """Return True if the _PYTHON_HOST_PLATFORM environment variable is set."""
    marker = '_PYTHON_HOST_PLATFORM'
    return marker in os.environ
552
553
def format_resources(use_resources: Iterable[str]):
    """Describe the enabled test resources in the shortest possible way.

    Two descriptions are built: one relative to ALL_RESOURCES ("all", then
    '-name' for each missing resource and 'name' for each extra one), and a
    plain sorted list of the enabled resources prefixed by their count.
    The shortest one is returned; the relative form wins a tie.
    """
    enabled = set(use_resources)
    everything = set(ALL_RESOURCES)

    # Express resources relative to "all"
    missing = [f'-{name}' for name in sorted(everything - enabled)]
    extra = [f'{name}' for name in sorted(enabled - everything)]
    all_text = ','.join(['all', *missing, *extra])
    all_text = f"resources: {all_text}"

    # List of enabled resources
    text = ','.join(sorted(enabled))
    text = f"resources ({len(enabled)}): {text}"

    # Pick the shortest string (prefer relative to all if lengths are equal)
    return all_text if len(all_text) <= len(text) else text
576
577
def process_cpu_count():
    """Return the number of CPUs the current process may run on.

    This is the size of the scheduling affinity set where
    os.sched_getaffinity() exists, else os.cpu_count() (which can be None).
    """
    if not hasattr(os, 'sched_getaffinity'):
        return os.cpu_count()
    allowed_cpus = os.sched_getaffinity(0)
    return len(allowed_cpus)
583
584
def display_header(use_resources: tuple[str, ...],
                   python_cmd: tuple[str, ...] | None):
    """Print a '=='-prefixed summary of the test environment to stdout.

    The summary covers the Python implementation and version, the platform,
    build options, current directory, CPU count, encodings, enabled
    resources, cross-compilation details and active sanitizers. It ends with
    an empty line and a flush.

    use_resources: names of the enabled test resources.
    python_cmd: command line of the host Python, or a false value. When
        set, it is run with '-m platform' to display the host platform.
    """
    # Print basic platform information
    print("==", platform.python_implementation(), *sys.version.split())
    print("==", platform.platform(aliased=True),
                  "%s-endian" % sys.byteorder)
    print("== Python build:", ' '.join(get_build_info()))
    print("== cwd:", os.getcwd())

    cpu_count: object = os.cpu_count()
    if cpu_count:
        # Show both numbers when the process is restricted to fewer CPUs
        # than the system has.
        affinity = process_cpu_count()
        if affinity and affinity != cpu_count:
            cpu_count = f"{affinity} (process) / {cpu_count} (system)"
        print("== CPU count:", cpu_count)
    print("== encodings: locale=%s FS=%s"
          % (locale.getencoding(), sys.getfilesystemencoding()))

    if use_resources:
        text = format_resources(use_resources)
        print(f"== {text}")
    else:
        print("== resources: all test resources are disabled, "
              "use -u option to unskip tests")

    cross_compile = is_cross_compiled()
    if cross_compile:
        print("== cross compiled: Yes")
    if python_cmd:
        cmd = shlex.join(python_cmd)
        print(f"== host python: {cmd}")

        # Ask the host Python to describe its own platform
        get_cmd = [*python_cmd, '-m', 'platform']
        proc = subprocess.run(
            get_cmd,
            stdout=subprocess.PIPE,
            text=True,
            cwd=os_helper.SAVEDCWD)
        stdout = proc.stdout.replace('\n', ' ').strip()
        if stdout:
            print(f"== host platform: {stdout}")
        elif proc.returncode:
            # No output and a non-zero exit code: report the failure
            print(f"== host platform: <command failed with exit code {proc.returncode}>")
    else:
        hostrunner = get_host_runner()
        if hostrunner:
            print(f"== host runner: {hostrunner}")

    # This makes it easier to remember what to set in your local
    # environment when trying to reproduce a sanitizer failure.
    asan = support.check_sanitizer(address=True)
    msan = support.check_sanitizer(memory=True)
    ubsan = support.check_sanitizer(ub=True)
    sanitizers = []
    if asan:
        sanitizers.append("address")
    if msan:
        sanitizers.append("memory")
    if ubsan:
        sanitizers.append("undefined behavior")
    if sanitizers:
        print(f"== sanitizers: {', '.join(sanitizers)}")
        # Only show the options of the sanitizers which are enabled, and
        # only when the environment variable is set.
        for sanitizer, env_var in (
            (asan, "ASAN_OPTIONS"),
            (msan, "MSAN_OPTIONS"),
            (ubsan, "UBSAN_OPTIONS"),
        ):
            options= os.environ.get(env_var)
            if sanitizer and options is not None:
                print(f"== {env_var}={options!r}")

    print(flush=True)
657
658
def cleanup_temp_dir(tmp_dir: StrPath):
    """Remove every entry of tmp_dir whose name starts with TMP_PREFIX.

    Each removed file or directory is reported on stdout.
    """
    import glob

    # Escape tmp_dir: only the trailing '*' must act as a wildcard
    pattern = os.path.join(glob.escape(tmp_dir), TMP_PREFIX + '*')
    print("Cleanup %s directory" % tmp_dir)
    for leftover in glob.glob(pattern):
        if not os.path.isdir(leftover):
            print("Remove file: %s" % leftover)
            os_helper.unlink(leftover)
            continue
        print("Remove directory: %s" % leftover)
        os_helper.rmtree(leftover)
671
# Names of the Windows status codes recognized by get_signal_name()
WINDOWS_STATUS = {
    0xC0000005: "STATUS_ACCESS_VIOLATION",
    0xC00000FD: "STATUS_STACK_OVERFLOW",
    0xC000013A: "STATUS_CONTROL_C_EXIT",
}


def get_signal_name(exitcode):
    """Return a symbolic name for a process exit code, or None.

    A negative exit code is looked up as a signal number (ex: -2 gives
    'SIGINT'). If that gives nothing, the exit code is looked up in
    WINDOWS_STATUS.
    """
    if exitcode < 0:
        # signal.Signals() raises ValueError for an unknown signal number
        with contextlib.suppress(ValueError):
            return signal.Signals(-exitcode).name

    return WINDOWS_STATUS.get(exitcode)