python (3.12.0)
1 """Supporting definitions for the Python regression tests."""
2
3 if __name__ != 'test.support':
4 raise ImportError('support must be imported from the test package')
5
6 import contextlib
7 import dataclasses
8 import functools
9 import getpass
10 import opcode
11 import os
12 import re
13 import stat
14 import sys
15 import sysconfig
16 import textwrap
17 import time
18 import types
19 import unittest
20 import warnings
21
22 from .testresult import get_test_runner
23
24
25 __all__ = [
26 # globals
27 "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
28 # exceptions
29 "Error", "TestFailed", "TestDidNotRun", "ResourceDenied",
30 # io
31 "record_original_stdout", "get_original_stdout", "captured_stdout",
32 "captured_stdin", "captured_stderr",
33 # unittest
34 "is_resource_enabled", "requires", "requires_freebsd_version",
35 "requires_linux_version", "requires_mac_ver",
36 "check_syntax_error",
37 "run_unittest", "run_doctest",
38 "requires_gzip", "requires_bz2", "requires_lzma",
39 "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
40 "requires_IEEE_754", "requires_zlib",
41 "has_fork_support", "requires_fork",
42 "has_subprocess_support", "requires_subprocess",
43 "has_socket_support", "requires_working_socket",
44 "anticipate_failure", "load_package_tests", "detect_api_mismatch",
45 "check__all__", "skip_if_buggy_ucrt_strfptime",
46 "check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer",
47 "requires_limited_api", "requires_specialization",
48 # sys
49 "is_jython", "is_android", "is_emscripten", "is_wasi",
50 "check_impl_detail", "unix_shell", "setswitchinterval",
51 # os
52 "get_pagesize",
53 # network
54 "open_urlresource",
55 # processes
56 "reap_children",
57 # miscellaneous
58 "run_with_locale", "swap_item", "findfile", "infinite_recursion",
59 "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
60 "run_with_tz", "PGO", "missing_compiler_executable",
61 "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST",
62 "LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT",
63 "Py_DEBUG", "EXCEEDS_RECURSION_LIMIT", "C_RECURSION_LIMIT",
64 "skip_on_s390x",
65 ]
66
67
68 # Timeout in seconds for tests using a network server listening on the network
69 # local loopback interface like 127.0.0.1.
70 #
71 # The timeout is long enough to prevent test failure: it takes into account
72 # that the client and the server can run in different threads or even different
73 # processes.
74 #
75 # The timeout should be long enough for connect(), recv() and send() methods
76 # of socket.socket.
77 LOOPBACK_TIMEOUT = 5.0
78 if sys.platform == 'win32' and ' 32 bit (ARM)' in sys.version:
79 # bpo-37553: test_socket.SendfileUsingSendTest is taking longer than 2
80 # seconds on Windows ARM32 buildbot
81 LOOPBACK_TIMEOUT = 10
82 elif sys.platform == 'vxworks':
83 LOOPBACK_TIMEOUT = 10
84
# Timeout in seconds for network requests going to the internet. The timeout is
# short enough to prevent a test from waiting too long if the internet request
# is blocked for whatever reason.
88 #
89 # Usually, a timeout using INTERNET_TIMEOUT should not mark a test as failed,
90 # but skip the test instead: see transient_internet().
91 INTERNET_TIMEOUT = 60.0
92
93 # Timeout in seconds to mark a test as failed if the test takes "too long".
94 #
95 # The timeout value depends on the regrtest --timeout command line option.
96 #
97 # If a test using SHORT_TIMEOUT starts to fail randomly on slow buildbots, use
98 # LONG_TIMEOUT instead.
99 SHORT_TIMEOUT = 30.0
100
101 # Timeout in seconds to detect when a test hangs.
102 #
103 # It is long enough to reduce the risk of test failure on the slowest Python
104 # buildbots. It should not be used to mark a test as failed if the test takes
105 # "too long". The timeout value depends on the regrtest --timeout command line
106 # option.
107 LONG_TIMEOUT = 5 * 60.0
108
109 # TEST_HOME_DIR refers to the top level directory of the "test" package
110 # that contains Python's regression test suite
111 TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
112 TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
113 STDLIB_DIR = os.path.dirname(TEST_HOME_DIR)
114 REPO_ROOT = os.path.dirname(STDLIB_DIR)
115
116
class Error(Exception):
    """Base class for regression test exceptions."""
119
class TestFailed(Error):
    """Test failed.

    The first positional argument is kept as ``self.msg`` and is what
    ``str()`` returns; the keyword-only *stats* value is stored unchanged
    as ``self.stats``.
    """
    def __init__(self, msg, *args, stats=None):
        self.msg = msg
        self.stats = stats
        super().__init__(msg, *args)

    def __str__(self):
        # Show only the message, not the full args tuple.
        return self.msg
129
class TestFailedWithDetails(TestFailed):
    """Test failed, with the collected errors and failures attached.

    *errors* and *failures* are stored as attributes and also forwarded
    to the base class as extra positional arguments.
    """
    def __init__(self, msg, errors, failures, stats):
        self.errors = errors
        self.failures = failures
        super().__init__(msg, errors, failures, stats=stats)
136
class TestDidNotRun(Error):
    """Test did not run any subtests."""
139
class ResourceDenied(unittest.SkipTest):
    """Test skipped because it requested a disallowed resource.

    This is raised when a test calls requires() for a resource that
    has not been enabled.  It is used to distinguish between expected
    and unexpected skips.
    """
147
def anticipate_failure(condition):
    """Return a decorator for a test that is known to be broken in some cases.

    When *condition* is true the test is marked with
    unittest.expectedFailure; otherwise it is returned unchanged.

    Any use of this decorator should have a comment identifying the
    associated tracker issue.
    """
    return unittest.expectedFailure if condition else (lambda f: f)
157
def load_package_tests(pkg_dir, loader, standard_tests, pattern):
    """Generic load_tests implementation for simple test packages.

    Discovers tests below *pkg_dir* (with STDLIB_DIR as the top-level
    directory), adds them to *standard_tests* and returns that suite.
    A *pattern* of None means "test*".

    Most packages can implement load_tests using this function as follows:

       def load_tests(*args):
           return load_package_tests(os.path.dirname(__file__), *args)
    """
    discovered = loader.discover(
        start_dir=pkg_dir,
        top_level_dir=STDLIB_DIR,
        pattern="test*" if pattern is None else pattern,
    )
    standard_tests.addTests(discovered)
    return standard_tests
174
175
def get_attribute(obj, name):
    """Return getattr(obj, name), turning AttributeError into SkipTest."""
    try:
        return getattr(obj, name)
    except AttributeError:
        raise unittest.SkipTest(f"object {obj!r} has no attribute {name!r}")
184
185 verbose = 1 # Flag set to 0 by regrtest.py
186 use_resources = None # Flag set to [] by regrtest.py
187 max_memuse = 0 # Disable bigmem tests (they will still be run with
188 # small sizes, to make sure they work.)
189 real_max_memuse = 0
190 junit_xml_list = None # list of testsuite XML elements
191 failfast = False
192
193 # _original_stdout is meant to hold stdout at the time regrtest began.
194 # This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
195 # The point is to have some flavor of stdout the user can actually see.
_original_stdout = None


def record_original_stdout(stdout):
    """Remember *stdout* as the stream returned by get_original_stdout()."""
    global _original_stdout
    _original_stdout = stdout


def get_original_stdout():
    """Return the recorded stdout, or the current sys.stdout if none is set."""
    if _original_stdout:
        return _original_stdout
    return sys.stdout
203
204
def _force_run(path, func, *args):
    """Call func(*args), retrying once after chmod'ing *path* on OSError.

    A FileNotFoundError is re-raised immediately.  Any other OSError makes
    *path* fully accessible to the owner (stat.S_IRWXU) and the call is
    repeated.  Diagnostics are printed when verbose >= 2.
    """
    try:
        return func(*args)
    except OSError as err:
        # chmod() won't fix a missing file.
        retryable = not isinstance(err, FileNotFoundError)
        if verbose >= 2:
            print(f"{err.__class__.__name__}: {err}")
            if retryable:
                print(f"re-run {func.__name__}{args!r}")
        if not retryable:
            raise
        os.chmod(path, stat.S_IRWXU)
        return func(*args)
219
220
221 # Check whether a gui is actually available
def _is_gui_available():
    """Return True if a GUI can actually be used, caching the answer.

    The outcome is stored on the function object itself:
    ``_is_gui_available.result`` holds the boolean and
    ``_is_gui_available.reason`` holds the explanation string (None when
    a GUI is available).  Later calls return the cached result.
    """
    if hasattr(_is_gui_available, 'result'):
        return _is_gui_available.result
    import platform
    reason = None
    if sys.platform.startswith('win') and platform.win32_is_iot():
        reason = "gui is not available on Windows IoT Core"
    elif sys.platform.startswith('win'):
        # if Python is running as a service (such as the buildbot service),
        # gui interaction may be disallowed
        import ctypes
        import ctypes.wintypes
        UOI_FLAGS = 1
        WSF_VISIBLE = 0x0001
        class USEROBJECTFLAGS(ctypes.Structure):
            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                        ("fReserved", ctypes.wintypes.BOOL),
                        ("dwFlags", ctypes.wintypes.DWORD)]
        dll = ctypes.windll.user32
        h = dll.GetProcessWindowStation()
        if not h:
            raise ctypes.WinError()
        uof = USEROBJECTFLAGS()
        needed = ctypes.wintypes.DWORD()
        res = dll.GetUserObjectInformationW(h,
            UOI_FLAGS,
            ctypes.byref(uof),
            ctypes.sizeof(uof),
            ctypes.byref(needed))
        if not res:
            raise ctypes.WinError()
        if not bool(uof.dwFlags & WSF_VISIBLE):
            reason = "gui not available (WSF_VISIBLE flag not set)"
    elif sys.platform == 'darwin':
        # The Aqua Tk implementations on OS X can abort the process if
        # being called in an environment where a window server connection
        # cannot be made, for instance when invoked by a buildbot or ssh
        # process not running under the same user id as the current console
        # user.  To avoid that, raise an exception if the window manager
        # connection is not available.
        from ctypes import cdll, c_int, pointer, Structure
        from ctypes.util import find_library

        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))

        if app_services.CGMainDisplayID() == 0:
            reason = "gui tests cannot run without OS X window manager"
        else:
            class ProcessSerialNumber(Structure):
                _fields_ = [("highLongOfPSN", c_int),
                            ("lowLongOfPSN", c_int)]
            psn = ProcessSerialNumber()
            psn_p = pointer(psn)
            if (  (app_services.GetCurrentProcess(psn_p) < 0) or
                  (app_services.SetFrontProcess(psn_p) < 0) ):
                reason = "cannot run without OS X gui process"

    # check on every platform whether tkinter can actually do anything
    if not reason:
        try:
            from tkinter import Tk
            root = Tk()
            root.withdraw()
            root.update()
            root.destroy()
        except Exception as e:
            # Keep the skip reason short: truncate long error messages.
            err_string = str(e)
            if len(err_string) > 50:
                err_string = err_string[:50] + ' [...]'
            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
                                                           err_string)

    _is_gui_available.reason = reason
    _is_gui_available.result = not reason

    return _is_gui_available.result
298
def is_resource_enabled(resource):
    """Return whether *resource* is enabled.

    Known resources are set by regrtest.py.  While use_resources is still
    None (not running under regrtest.py and never set), every resource
    counts as enabled; otherwise *resource* must be in use_resources.
    """
    if use_resources is None:
        return True
    return resource in use_resources
306
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    Besides the enabled check, "network" and "urlfetch" need socket
    support and "gui" needs a usable GUI.
    """
    if not is_resource_enabled(resource):
        raise ResourceDenied(
            "Use of the %r resource not enabled" % resource
            if msg is None else msg
        )
    needs_sockets = resource in {"network", "urlfetch"}
    if needs_sockets and not has_socket_support:
        raise ResourceDenied("No socket support")
    if resource == 'gui' and not _is_gui_available():
        raise ResourceDenied(_is_gui_available.reason)
317
def _requires_unix_version(sysname, min_version):
    """Decorator raising SkipTest if the OS is `sysname` and the version is less
    than `min_version`.

    The running version is platform.release() up to its first '-'.  If it
    cannot be parsed as dot-separated integers, the test is not skipped.

    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
    the FreeBSD version is less than 7.2.
    """
    import platform
    required_txt = '.'.join(str(part) for part in min_version)
    release_txt = platform.release().split('-', 1)[0]

    too_old = False
    if platform.system() == sysname:
        try:
            release = tuple(int(part) for part in release_txt.split('.'))
        except ValueError:
            pass
        else:
            too_old = release < min_version

    message = (f"{sysname} version {required_txt} or higher required, not "
               f"{release_txt}")
    return unittest.skipIf(too_old, message)
343
344
def requires_freebsd_version(*min_version):
    """Return a decorator that skips a test on FreeBSD older than `min_version`.

    The version components are given as separate arguments, e.g.
    @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD version
    is less than 7.2.
    """
    return _requires_unix_version('FreeBSD', min_version)
353
def requires_linux_version(*min_version):
    """Return a decorator that skips a test on Linux older than `min_version`.

    The version components are given as separate arguments, e.g.
    @requires_linux_version(2, 6, 32) raises SkipTest if the Linux version
    is less than 2.6.32.
    """
    return _requires_unix_version('Linux', min_version)
362
def requires_mac_ver(*min_version):
    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
    version is less than min_version.

    The check runs each time the decorated function is called.  A version
    string that cannot be parsed as integers never causes a skip.  The
    wrapper exposes the requested version as ``wrapper.min_version``.

    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
    is less than 10.5.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if sys.platform == 'darwin':
                import platform
                found_txt = platform.mac_ver()[0]
                try:
                    found = tuple(int(part) for part in found_txt.split('.'))
                except ValueError:
                    found = None
                if found is not None and found < min_version:
                    required_txt = '.'.join(str(part) for part in min_version)
                    raise unittest.SkipTest(
                        "Mac OS X %s or higher required, not %s"
                        % (required_txt, found_txt))
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator
390
391
def skip_if_buildbot(reason=None):
    """Decorator raising SkipTest if running on a buildbot.

    A buildbot is detected by the current user name being "buildbot"
    (case-insensitive).  If the user name cannot be determined, a
    RuntimeWarning is issued and the test is not skipped.
    """
    try:
        on_buildbot = getpass.getuser().lower() == 'buildbot'
    except (KeyError, EnvironmentError) as err:
        warnings.warn(f'getpass.getuser() failed {err}.', RuntimeWarning)
        on_buildbot = False
    return unittest.skipIf(on_buildbot, reason or 'not suitable for buildbots')
402
def check_sanitizer(*, address=False, memory=False, ub=False):
    """Return True if Python was built with one of the selected sanitizers.

    A sanitizer counts as present when its -fsanitize flag appears in the
    CFLAGS config variable or its configure option appears in CONFIG_ARGS.
    Raises ValueError when no sanitizer is selected.
    """
    if not any((address, memory, ub)):
        raise ValueError('At least one of address, memory, or ub must be True')

    cflags = sysconfig.get_config_var('CFLAGS') or ''
    config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''

    def built_with(compiler_flag, configure_option):
        return compiler_flag in cflags or configure_option in config_args

    return (
        (memory and built_with('-fsanitize=memory',
                               '--with-memory-sanitizer')) or
        (address and built_with('-fsanitize=address',
                                '--with-address-sanitizer')) or
        (ub and built_with('-fsanitize=undefined',
                           '--with-undefined-behavior-sanitizer'))
    )
428
429
def skip_if_sanitizer(reason=None, *, address=False, memory=False, ub=False):
    """Decorator raising SkipTest if running with a sanitizer active.

    The keyword flags choose which sanitizers to look for; they are passed
    straight to check_sanitizer().
    """
    active = check_sanitizer(address=address, memory=memory, ub=ub)
    return unittest.skipIf(active, reason or 'not working with sanitizers active')
436
437
def system_must_validate_cert(f):
    """Skip the test on TLS certificate validation failures.

    The wrapper calls *f* and returns its result.  An OSError whose text
    contains "CERTIFICATE_VERIFY_FAILED" is turned into unittest.SkipTest;
    any other OSError propagates unchanged.
    """
    @functools.wraps(f)
    def dec(*args, **kwargs):
        try:
            # Pass the result through so wrapping a helper that returns a
            # value does not silently turn it into None.
            return f(*args, **kwargs)
        except OSError as e:
            if "CERTIFICATE_VERIFY_FAILED" in str(e):
                raise unittest.SkipTest("system does not contain "
                                        "necessary certificates")
            raise
    return dec
450
451 # A constant likely larger than the underlying OS pipe buffer size, to
452 # make writes blocking.
453 # Windows limit seems to be around 512 B, and many Unix kernels have a
454 # 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
455 # (see issue #17835 for a discussion of this number).
456 PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
457
458 # A constant likely larger than the underlying OS socket buffer size, to make
459 # writes blocking.
460 # The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
461 # on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
462 # for a discussion of this number.
463 SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
464
465 # decorator for skipping tests on non-IEEE 754 platforms
466 requires_IEEE_754 = unittest.skipUnless(
467 float.__getformat__("double").startswith("IEEE"),
468 "test requires IEEE 754 doubles")
469
def requires_zlib(reason='requires zlib'):
    """Return a decorator that skips the test unless zlib can be imported."""
    try:
        import zlib
    except ImportError:
        return unittest.skip(reason)
    return unittest.skipUnless(zlib, reason)
476
def requires_gzip(reason='requires gzip'):
    """Return a decorator that skips the test unless gzip can be imported."""
    try:
        import gzip
    except ImportError:
        return unittest.skip(reason)
    return unittest.skipUnless(gzip, reason)
483
def requires_bz2(reason='requires bz2'):
    """Return a decorator that skips the test unless bz2 can be imported."""
    try:
        import bz2
    except ImportError:
        return unittest.skip(reason)
    return unittest.skipUnless(bz2, reason)
490
def requires_lzma(reason='requires lzma'):
    """Return a decorator that skips the test unless lzma can be imported."""
    try:
        import lzma
    except ImportError:
        return unittest.skip(reason)
    return unittest.skipUnless(lzma, reason)
497
498 def has_no_debug_ranges():
499 try:
500 import _testinternalcapi
501 except ImportError:
502 raise unittest.SkipTest("_testinternalcapi required")
503 config = _testinternalcapi.get_config()
504 return not bool(config['code_debug_ranges'])
505
def requires_debug_ranges(reason='requires co_positions / debug_ranges'):
    """Return a decorator that skips the test when debug ranges are disabled."""
    disabled = has_no_debug_ranges()
    return unittest.skipIf(disabled, reason)
508
def requires_legacy_unicode_capi():
    """Return a decorator that skips the test unless
    _testcapi.unicode_legacy_string can be imported."""
    try:
        from _testcapi import unicode_legacy_string as legacy_api
    except ImportError:
        legacy_api = None
    return unittest.skipUnless(legacy_api, 'requires legacy Unicode C API')
517
518 # Is not actually used in tests, but is kept for compatibility.
519 is_jython = sys.platform.startswith('java')
520
521 is_android = hasattr(sys, 'getandroidapilevel')
522
523 if sys.platform not in ('win32', 'vxworks'):
524 unix_shell = '/system/bin/sh' if is_android else '/bin/sh'
525 else:
526 unix_shell = None
527
528 # wasm32-emscripten and -wasi are POSIX-like but do not
529 # have subprocess or fork support.
530 is_emscripten = sys.platform == "emscripten"
531 is_wasi = sys.platform == "wasi"
532
533 has_fork_support = hasattr(os, "fork") and not is_emscripten and not is_wasi
534
def requires_fork():
    """Return a decorator that skips the test unless has_fork_support is true."""
    reason = "requires working os.fork()"
    return unittest.skipUnless(has_fork_support, reason)
537
538 has_subprocess_support = not is_emscripten and not is_wasi
539
def requires_subprocess():
    """Used for subprocess, os.spawn calls, fd inheritance.

    Returns a decorator that skips the test unless has_subprocess_support
    is true.
    """
    reason = "requires subprocess support"
    return unittest.skipUnless(has_subprocess_support, reason)
543
544 # Emscripten's socket emulation and WASI sockets have limitations.
545 has_socket_support = not is_emscripten and not is_wasi
546
def requires_working_socket(*, module=False):
    """Skip tests or modules that require working sockets.

    With module=False (the default) a function/class decorator is
    returned.  With module=True, SkipTest is raised directly when sockets
    are unsupported, which skips the entire calling module; otherwise
    nothing is returned.
    """
    msg = "requires socket support"
    if not module:
        return unittest.skipUnless(has_socket_support, msg)
    if not has_socket_support:
        raise unittest.SkipTest(msg)
558
559 # Does strftime() support glibc extension like '%4Y'?
has_strftime_extensions = False
# bpo-47037: Windows debug builds crash with "Debug Assertion Failed",
# so the probe is never attempted on win32.
if sys.platform != "win32":
    # A ValueError from strftime() leaves the flag at False.
    with contextlib.suppress(ValueError):
        has_strftime_extensions = time.strftime("%4Y") != "%4Y"
567
568 # Define the URL of a dedicated HTTP server for the network tests.
569 # The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
570 TEST_HTTP_URL = "http://www.pythontest.net"
571
572 # Set by libregrtest/main.py so we can skip tests that are not
573 # useful for PGO
574 PGO = False
575
576 # Set by libregrtest/main.py if we are running the extended (time consuming)
577 # PGO task. If this is True, PGO is also True.
578 PGO_EXTENDED = False
579
580 # TEST_DATA_DIR is used as a target download location for remote resources
581 TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
582
583
def darwin_malloc_err_warning(test_name):
    """Assure user that loud errors generated by macOS libc's malloc are
    expected.

    Prints a notice banner as wide as the terminal; does nothing on
    platforms other than darwin.
    """
    if sys.platform == 'darwin':
        import shutil
        notice = (f'{test_name} may generate "malloc can\'t allocate region"\n'
                  'warnings on macOS systems. This behavior is known. Do not\n'
                  'report a bug unless tests are also failing.\n'
                  'See https://github.com/python/cpython/issues/85100')
        width = shutil.get_terminal_size().columns
        print(' NOTICE '.center(width, '-'))
        print(notice)
        print('-' * width)
601
602
def findfile(filename, subdir=None):
    """Try to find a file on sys.path or in the test directory.

    An absolute *filename* is returned as is.  Otherwise TEST_HOME_DIR and
    then each sys.path entry are searched and the first existing path is
    returned.  If nothing is found, the relative name that was searched
    for is returned (this does not necessarily signal failure; it could
    still be the legitimate path).

    Setting *subdir* indicates a relative path to use to find the file
    rather than looking directly in the path directories.
    """
    if os.path.isabs(filename):
        return filename
    relative = filename if subdir is None else os.path.join(subdir, filename)
    for directory in [TEST_HOME_DIR, *sys.path]:
        candidate = os.path.join(directory, relative)
        if os.path.exists(candidate):
            return candidate
    return relative
620
621
def sortdict(dict):
    "Like repr(dict), but in sorted order."
    body = ", ".join(f"{key!r}: {value!r}" for key, value in sorted(dict.items()))
    return "{" + body + "}"
628
629
def run_code(code: str) -> dict[str, object]:
    """Dedent *code*, execute it in a fresh namespace and return that namespace."""
    namespace: dict[str, object] = {}
    source = textwrap.dedent(code)
    exec(source, namespace)
    return namespace
635
636
def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None):
    """Assert that compiling *statement* raises SyntaxError matching *errtext*.

    The error's lineno and offset must not be None; when *lineno* or
    *offset* is given, the corresponding attribute must equal it.
    """
    with testcase.assertRaisesRegex(SyntaxError, errtext) as caught:
        compile(statement, '<test string>', 'exec')
    error = caught.exception
    for attr, expected in (('lineno', lineno), ('offset', offset)):
        actual = getattr(error, attr)
        testcase.assertIsNotNone(actual)
        if expected is not None:
            testcase.assertEqual(actual, expected)
647
648
def open_urlresource(url, *args, **kw):
    """Return an open file for the resource at *url*, downloading it if needed.

    The file is stored in TEST_DATA_DIR under the last path component of
    *url* and is opened with open(fn, *args, **kw).  The optional keyword
    argument *check* is a callable taking the open file; a file for which
    it returns a false value is treated as invalid.  An invalid cached
    copy is removed and fetched again, which needs the 'urlfetch'
    resource.  Raises TestFailed if the downloaded file is invalid too.
    """
    import urllib.request, urllib.parse
    from .os_helper import unlink
    try:
        import gzip
    except ImportError:
        gzip = None

    check = kw.pop('check', None)

    filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    fn = os.path.join(TEST_DATA_DIR, filename)

    def check_valid_file(fn):
        f = open(fn, *args, **kw)
        if check is None:
            return f
        try:
            if check(f):
                f.seek(0)
                return f
        except BaseException:
            # Don't leak the handle if the validator itself fails.
            f.close()
            raise
        f.close()
        return None

    if os.path.exists(fn):
        f = check_valid_file(fn)
        if f is not None:
            return f
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    if verbose:
        print('\tfetching %s ...' % url, file=get_original_stdout())
    opener = urllib.request.build_opener()
    if gzip:
        opener.addheaders.append(('Accept-Encoding', 'gzip'))
    # GzipFile.close() does not close the underlying fileobj, so the
    # response is managed separately by the with statement.
    with opener.open(url, timeout=INTERNET_TIMEOUT) as response:
        body = response
        if gzip and response.headers.get('Content-Encoding') == 'gzip':
            body = gzip.GzipFile(fileobj=response)
        try:
            with open(fn, "wb") as out:
                while chunk := body.read(64 * 1024):
                    out.write(chunk)
        except BaseException:
            # Remove a partial download so that a later call without
            # *check* does not accept it as a valid cached copy.
            unlink(fn)
            raise
        finally:
            if body is not response:
                body.close()

    f = check_valid_file(fn)
    if f is not None:
        return f
    raise TestFailed('invalid resource %r' % fn)
702
703
@contextlib.contextmanager
def captured_output(stream_name):
    """Context manager behind captured_stdout/stdin/stderr.

    Temporarily replaces the sys attribute named *stream_name* with a new
    io.StringIO, yields that StringIO, and restores the previous stream
    on exit.
    """
    import io
    saved = getattr(sys, stream_name)
    replacement = io.StringIO()
    setattr(sys, stream_name, replacement)
    try:
        yield replacement
    finally:
        setattr(sys, stream_name, saved)
715
def captured_stdout():
    """Return a context manager that captures what is written to sys.stdout.

    Example:

       with captured_stdout() as stdout:
           print("hello")
       self.assertEqual(stdout.getvalue(), "hello\\n")
    """
    stream_name = "stdout"
    return captured_output(stream_name)
724
def captured_stderr():
    """Return a context manager that captures what is written to sys.stderr.

    Example:

       with captured_stderr() as stderr:
           print("hello", file=sys.stderr)
       self.assertEqual(stderr.getvalue(), "hello\\n")
    """
    stream_name = "stderr"
    return captured_output(stream_name)
733
def captured_stdin():
    """Return a context manager that substitutes a StringIO for sys.stdin.

    Example:

       with captured_stdin() as stdin:
           stdin.write('hello\\n')
           stdin.seek(0)
           # call test code that consumes from sys.stdin
           captured = input()
       self.assertEqual(captured, "hello")
    """
    stream_name = "stdin"
    return captured_output(stream_name)
745
746
def gc_collect():
    """Force as many objects as possible to be collected.

    In non-CPython implementations of Python, this is needed because timely
    deallocation is not guaranteed by the garbage collector.  (Even in CPython
    this can be the case in case of reference cycles.)  This means that __del__
    methods may be called later than expected and weakrefs may remain alive for
    longer than expected.  This function tries its best to force all garbage
    objects to disappear by running gc.collect() three times.
    """
    import gc
    for _ in range(3):
        gc.collect()
761
@contextlib.contextmanager
def disable_gc():
    """Context manager that turns the garbage collector off for its body.

    On exit the collector is enabled again only if it was enabled on entry.
    """
    import gc
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        yield
    finally:
        if was_enabled:
            gc.enable()
772
773
def python_is_optimized():
    """Find if Python was built with optimizations.

    Looks at the last -O option in the PY_CFLAGS config variable; no such
    option, '-O0' or '-Og' means "not optimized".
    """
    flags = (sysconfig.get_config_var('PY_CFLAGS') or '').split()
    last_opt = next(
        (flag for flag in reversed(flags) if flag.startswith('-O')), "")
    return last_opt not in ('', '-O0', '-Og')
782
783
# struct format strings for an object header and its trailing alignment;
# builds that provide sys.getobjects use a longer header.
if hasattr(sys, "getobjects"):
    _header, _align = '2PnP', '0P'
else:
    _header, _align = 'nP', '0n'
_vheader = _header + 'n'


def calcobjsize(fmt):
    """Return struct.calcsize() of the object header followed by *fmt*."""
    import struct
    layout = _header + fmt + _align
    return struct.calcsize(layout)


def calcvobjsize(fmt):
    """Return struct.calcsize() of the variable-size object header followed
    by *fmt*."""
    import struct
    layout = _vheader + fmt + _align
    return struct.calcsize(layout)
798
799
_TPFLAGS_HAVE_GC = 1<<14
_TPFLAGS_HEAPTYPE = 1<<9


def check_sizeof(test, o, size):
    """Assert via *test* that sys.getsizeof(o) equals *size* plus any GC header.

    SIZEOF_PYGC_HEAD is added to *size* when *o* is a heap type, or when
    *o* is not a type and its type has the HAVE_GC flag.  Raises SkipTest
    if _testinternalcapi is unavailable.
    """
    try:
        import _testinternalcapi
    except ImportError:
        raise unittest.SkipTest("_testinternalcapi required")
    result = sys.getsizeof(o)
    # add GC header size
    kind = type(o)
    if kind == type:
        has_gc_head = o.__flags__ & _TPFLAGS_HEAPTYPE
    else:
        has_gc_head = kind.__flags__ & _TPFLAGS_HAVE_GC
    if has_gc_head:
        size += _testinternalcapi.SIZEOF_PYGC_HEAD
    msg = 'wrong size for %s: got %d, expected %d' % (kind, result, size)
    test.assertEqual(result, size, msg)
816
817 #=======================================================================
818 # Decorator for running a function in a different locale, correctly resetting
819 # it afterwards.
820
@contextlib.contextmanager
def run_with_locale(catstr, *locales):
    """Run the body under the first of *locales* that can be set.

    *catstr* is the name of a locale category such as 'LC_ALL'.  An
    unknown category name raises AttributeError.  If the current locale
    cannot be read, or none of *locales* can be set, the body runs under
    the unchanged locale.  The original locale is restored on exit.
    """
    try:
        import locale
        category = getattr(locale, catstr)
        orig_locale = locale.setlocale(category)
    except AttributeError:
        # if the test author gives us an invalid category string
        raise
    except Exception:
        # cannot retrieve original locale, so do nothing
        # (KeyboardInterrupt and SystemExit are deliberately not caught)
        locale = orig_locale = None
    else:
        for loc in locales:
            try:
                locale.setlocale(category, loc)
                break
            except Exception:
                # best effort: this locale is unusable, try the next one
                continue

    try:
        yield
    finally:
        if locale and orig_locale:
            locale.setlocale(category, orig_locale)
846
847 #=======================================================================
848 # Decorator for running a function in a specific timezone, correctly
849 # resetting it afterwards.
850
def run_with_tz(tz):
    """Return a decorator that runs a function with the TZ environment
    variable set to *tz*.

    The previous TZ value (or its absence) is restored afterwards and
    time.tzset() is called again, even when the function raises or
    changes TZ itself.  The wrapper raises SkipTest if time.tzset is not
    available.
    """
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                tzset = time.tzset
            except AttributeError:
                raise unittest.SkipTest("tzset required")
            orig_tz = os.environ.get('TZ')

            # Change the environment inside the try block so that it is
            # restored even if tzset() itself fails.
            try:
                os.environ['TZ'] = tz
                tzset()
                return func(*args, **kwds)
            finally:
                if orig_tz is None:
                    # pop() rather than del: the function may already
                    # have removed TZ.
                    os.environ.pop('TZ', None)
                else:
                    os.environ['TZ'] = orig_tz
                tzset()

        return inner
    return decorator
879
880 #=======================================================================
881 # Big-memory-test support. Separate from 'resources' because memory use
882 # should be configurable.
883
884 # Some handy shorthands. Note that these are used for byte-limits as well
885 # as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    """Set the module-level memory limits from a string such as '2.5Gb'.

    *limit* is a decimal number followed by K, M, G or T (case-insensitive)
    and an optional 'b'.  real_max_memuse receives the parsed number of
    bytes; max_memuse receives the same value capped at MAX_Py_ssize_t.

    Raises ValueError if *limit* cannot be parsed or if the capped value
    is below 2 GiB - 1.  Neither global is modified in that case.
    """
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    # re.VERBOSE makes the literal space in the pattern insignificant.
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
    capped = min(memlimit, MAX_Py_ssize_t)
    # Validate before touching the globals so a rejected limit leaves the
    # previous configuration intact.
    if capped < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    real_max_memuse = memlimit
    max_memuse = capped
913
class _MemoryWatchdog:
    """An object which periodically watches the process' memory consumption
    and prints it out.

    The watching is done by a child process running memory_watchdog.py
    with this process's /proc/<pid>/statm file as its stdin.
    """

    def __init__(self):
        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
        self.started = False

    def start(self):
        """Launch the watchdog subprocess.

        If the statm file cannot be opened, a RuntimeWarning is issued and
        nothing is started.
        """
        import warnings
        try:
            f = open(self.procfile, 'r')
        except OSError as e:
            warnings.warn('/proc not available for stats: {}'.format(e),
                          RuntimeWarning)
            sys.stderr.flush()
            return

        import subprocess
        # The child gets the file as its stdin, so this process can close
        # its own handle once Popen() has returned.
        with f:
            watchdog_script = findfile("memory_watchdog.py")
            self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
                                                 stdin=f,
                                                 stderr=subprocess.DEVNULL)
        self.started = True

    def stop(self):
        """Terminate the watchdog subprocess, if one was started, and wait
        for it to exit."""
        if self.started:
            self.mem_watchdog.terminate()
            self.mem_watchdog.wait()
945
946
def bigmemtest(size, memuse, dry_run=True):
    """Decorator for bigmem tests.

    'size' is a requested size for the test (in arbitrary, test-interpreted
    units.) 'memuse' is the number of bytes per unit for the test, or a good
    estimate of it. For example, a test that needs two byte buffers, of 4 GiB
    each, could be decorated with @bigmemtest(size=_4G, memuse=2).

    The 'size' argument is normally passed to the decorated test method as an
    extra argument. If 'dry_run' is true, the value passed to the test method
    may be less than the requested value. If 'dry_run' is false, it means the
    test doesn't support dummy runs when -M is not specified.

    Both values are stored on the wrapper as ``wrapper.size`` and
    ``wrapper.memuse`` and are read back from there on every call.
    """
    def decorator(f):
        def wrapper(self):
            requested = wrapper.size
            per_unit = wrapper.memuse
            # Without a configured memory limit, fall back to a small size.
            actual = requested if real_max_memuse else 5147

            must_fit = real_max_memuse or not dry_run
            if must_fit and real_max_memuse < actual * per_unit:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (requested * per_unit / (1024 ** 3)))

            watchdog = None
            if real_max_memuse and verbose:
                print()
                print(" ... expected peak memory use: {peak:.1f}G"
                      .format(peak=requested * per_unit / (1024 ** 3)))
                watchdog = _MemoryWatchdog()
                watchdog.start()

            try:
                return f(self, actual)
            finally:
                if watchdog:
                    watchdog.stop()

        wrapper.size = size
        wrapper.memuse = memuse
        return wrapper
    return decorator
994
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The test runs only when max_memuse is at least MAX_Py_ssize_t;
    otherwise unittest.SkipTest is raised.
    """
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        # 64-bit build with a limit of at least 2 GiB: the test could only
        # succeed on a 32-bit build.
        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            raise unittest.SkipTest(
                "not enough memory: try a 32-bit build instead")
        raise unittest.SkipTest(
            "not enough memory: %.1fG minimum needed"
            % (MAX_Py_ssize_t / (1024 ** 3)))
    return wrapper
1009
1010 #=======================================================================
1011 # unittest integration.
1012
def _id(obj):
    """Identity decorator: hand back *obj* unchanged."""
    return obj
1015
def requires_resource(resource):
    """Return a decorator that skips a test unless *resource* is enabled.

    The 'gui' resource is additionally checked with _is_gui_available(),
    whose 'reason' attribute becomes the skip message.
    """
    gui_missing = resource == 'gui' and not _is_gui_available()
    if gui_missing:
        return unittest.skip(_is_gui_available.reason)
    if not is_resource_enabled(resource):
        return unittest.skip("resource {0!r} is not enabled".format(resource))
    return _id
1023
def cpython_only(test):
    """Decorator marking a test as applicable to CPython only."""
    decorate = impl_detail(cpython=True)
    return decorate(test)
1029
def impl_detail(msg=None, **guards):
    """Return a decorator skipping tests that do not apply to this implementation.

    *guards* are interpreted by check_impl_detail().  When the check passes
    the identity decorator is returned; otherwise a unittest.skip decorator
    using *msg*, or a message built from the guard names when *msg* is None.
    """
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        template = ("implementation detail not available on {0}" if default
                    else "implementation detail specific to {0}")
        msg = template.format(' or '.join(sorted(guardnames.keys())))
    return unittest.skip(msg)
1042
def _parse_guards(guards):
    """Normalize implementation guards.

    Returns a tuple ({platform_name: run_me}, default_value).  With no
    guards the result is ({'cpython': True}, False).  All guard values must
    agree (all True or all False); the default is the opposite of that
    shared value.
    """
    if not guards:
        return ({'cpython': True}, False)
    values = list(guards.values())
    is_true = values[0]
    assert values == [is_true] * len(guards)   # all True or all False
    return (guards, not is_true)
1050
# Guard CPython's implementation-specific tests with this check, or use it
# to run them only on the implementation(s) named by the arguments.
def check_impl_detail(**guards):
    """Return True or False depending on the running implementation.

    The guards are looked up by sys.implementation.name.
    Examples:
       if check_impl_detail():               # only on CPython (default)
       if check_impl_detail(jython=True):    # only on Jython
       if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    by_implementation, fallback = _parse_guards(guards)
    return by_implementation.get(sys.implementation.name, fallback)
1062
1063
def no_tracing(func):
    """Decorator that switches tracing off while the test runs.

    The trace function active on entry is reinstated afterwards, even when
    the test raises.  When sys has no gettrace(), *func* is returned as is.
    """
    if not hasattr(sys, 'gettrace'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        saved_trace = sys.gettrace()
        try:
            sys.settrace(None)
            return func(*args, **kwargs)
        finally:
            sys.settrace(saved_trace)
    return wrapper
1078
1079
def refcount_test(test):
    """Decorator for tests which involve reference counting.

    The test is first restricted to CPython.  Any trace function is then
    unset while it runs, to prevent unexpected refcounts caused by the
    trace function.
    """
    cpython_test = cpython_only(test)
    return no_tracing(cpython_test)
1089
1090
1091 def requires_limited_api(test):
1092 try:
1093 import _testcapi
1094 except ImportError:
1095 return unittest.skip('needs _testcapi module')(test)
1096 return unittest.skipUnless(
1097 _testcapi.LIMITED_API_AVAILABLE, 'needs Limited API support')(test)
1098
def requires_specialization(test):
    """Skip *test* unless opcode.ENABLE_SPECIALIZATION is true."""
    enabled = opcode.ENABLE_SPECIALIZATION
    return unittest.skipUnless(enabled, "requires specialization")(test)
1102
def _filter_suite(suite, pred):
    """Recursively filter the test cases of *suite* in place.

    Individual tests are kept only when pred(test) is true.  Nested
    TestSuite objects are filtered recursively and always kept, even when
    they end up empty.
    """
    kept = []
    for item in suite._tests:
        if isinstance(item, unittest.TestSuite):
            _filter_suite(item, pred)
            kept.append(item)
        elif pred(item):
            kept.append(item)
    suite._tests = kept
1114
1115 @dataclasses.dataclass(slots=True)
1116 class ESC[4;38;5;81mTestStats:
1117 tests_run: int = 0
1118 failures: int = 0
1119 skipped: int = 0
1120
1121 @staticmethod
1122 def from_unittest(result):
1123 return TestStats(result.testsRun,
1124 len(result.failures),
1125 len(result.skipped))
1126
1127 @staticmethod
1128 def from_doctest(results):
1129 return TestStats(results.attempted,
1130 results.failed)
1131
1132 def accumulate(self, stats):
1133 self.tests_run += stats.tests_run
1134 self.failures += stats.failures
1135 self.skipped += stats.skipped
1136
1137
def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class.

    Returns the result object on success.  Raises TestDidNotRun when no
    test ran, was skipped or errored, and TestFailedWithDetails when the
    run was not successful.
    """
    capture = junit_xml_list is not None
    runner = get_test_runner(sys.stdout,
                             verbosity=verbose,
                             capture_output=capture)

    result = runner.run(suite)

    if junit_xml_list is not None:
        junit_xml_list.append(result.get_xml_element())

    if not (result.testsRun or result.skipped or result.errors):
        raise TestDidNotRun
    if result.wasSuccessful():
        return result

    stats = TestStats.from_unittest(result)
    # With exactly one error or failure, report its traceback text directly.
    if len(result.errors) == 1 and not result.failures:
        err = result.errors[0][1]
    elif len(result.failures) == 1 and not result.errors:
        err = result.failures[0][1]
    else:
        err = "multiple errors occurred"
        if not verbose:
            err += "; run in verbose mode for details"
    errors = [(str(case), text) for case, text in result.errors]
    failures = [(str(case), text) for case, text in result.failures]
    raise TestFailedWithDetails(err, errors, failures, stats=stats)
1164
1165
# By default, don't filter tests: match_test() accepts every test while
# this is None.
_match_test_func = None

# Patterns most recently recorded by set_match_tests(), stored as tuples.
# They stay None until set_match_tests() is called for the first time.
_accept_test_patterns = None
_ignore_test_patterns = None
1171
1172
def match_test(test):
    """Return True if *test* passes the filter installed by set_match_tests().

    Every test matches while no filter is installed.
    """
    # Function used by support.run_unittest() and regrtest --list-cases
    if _match_test_func is None:
        return True
    return _match_test_func(test.id())


def _is_full_match_test(pattern):
    """Return True if *pattern* is a full test identifier.

    A pattern containing at least one dot is considered a full test
    identifier, for example 'test.test_os.FileTests.test_access'.
    Patterns using fnmatch syntax ('*', '?', '[...]' or '[!...]'), such as
    'test_access*', are not.
    """
    return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern))


def set_match_tests(accept_patterns=None, ignore_patterns=None):
    """Install the test filter used by match_test().

    A test identifier is selected when it matches *accept_patterns* (or no
    accept patterns are given) and does not match *ignore_patterns*.
    Passing None or an empty sequence for both removes the filter.

    Both pattern sets are recompiled on every call.  Reusing only the half
    that changed made the new matcher forget the unchanged half, and
    clearing the patterns left the previous filter installed.
    """
    global _match_test_func, _accept_test_patterns, _ignore_test_patterns

    # Snapshot as tuples: the caller's sequences may be mutated later.
    accept_patterns = tuple(accept_patterns or ())
    ignore_patterns = tuple(ignore_patterns or ())

    accept_patterns, accept_func = _compile_match_function(accept_patterns)
    ignore_patterns, ignore_func = _compile_match_function(ignore_patterns)

    _accept_test_patterns = tuple(accept_patterns)
    _ignore_test_patterns = tuple(ignore_patterns)

    if accept_func is None and ignore_func is None:
        # No pattern at all: go back to "match everything".
        _match_test_func = None
        return

    def match_function(test_id):
        if accept_func is not None and not accept_func(test_id):
            return False
        if ignore_func is not None and ignore_func(test_id):
            return False
        return True

    _match_test_func = match_function


def _compile_match_function(patterns):
    """Compile *patterns* into a predicate over test identifiers.

    Returns (patterns, func).  func is None when there are no patterns, in
    which case the returned patterns are the empty tuple.
    """
    if not patterns:
        func = None
        # set_match_tests(None) behaves as set_match_tests(())
        patterns = ()
    elif all(map(_is_full_match_test, patterns)):
        # Simple case: all patterns are full test identifier.
        # The test.bisect_cmd utility only uses such full test identifiers.
        func = set(patterns).__contains__
    else:
        import fnmatch
        regex = '|'.join(map(fnmatch.translate, patterns))
        # The search *is* case sensitive on purpose:
        # don't use flags=re.IGNORECASE
        regex_match = re.compile(regex).match

        def match_test_regex(test_id):
            if regex_match(test_id):
                # The regex matches the whole identifier, for example
                # 'test.test_os.FileTests.test_access'.
                return True
            # Try to match parts of the test identifier.
            # For example, split 'test.test_os.FileTests.test_access'
            # into: 'test', 'test_os', 'FileTests' and 'test_access'.
            return any(map(regex_match, test_id.split(".")))

        func = match_test_regex

    return patterns, func
1253
1254
def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be a TestCase class, a TestSuite or TestCase
    instance, or the name of a module present in sys.modules.  The
    collected suite is filtered with match_test() before being run.
    """
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    ready_made = (unittest.TestSuite, unittest.TestCase)
    for item in classes:
        if isinstance(item, str):
            if item not in sys.modules:
                raise ValueError("str arguments must be keys in sys.modules")
            tests = loader.loadTestsFromModule(sys.modules[item])
        elif isinstance(item, ready_made):
            tests = item
        else:
            tests = loader.loadTestsFromTestCase(item)
        suite.addTest(tests)
    _filter_suite(suite, match_test)
    return _run_suite(suite)
1272
1273 #=======================================================================
1274 # Check for the presence of docstrings.
1275
1276 # Rather than trying to enumerate all the cases where docstrings may be
1277 # disabled, we just check for that directly
1278
# Probe function: only the presence of its docstring matters.  __doc__ is
# None when the interpreter discards docstrings.
def _check_docstrings():
    """Just used to check if docstrings are enabled"""

# True when running on CPython (check_impl_detail() with its default guard),
# not on Windows, and the WITH_DOC_STRINGS build variable is unset or false.
MISSING_C_DOCSTRINGS = (check_impl_detail() and
                        sys.platform != 'win32' and
                        not sysconfig.get_config_var('WITH_DOC_STRINGS'))

# Docstrings are considered available only when the probe kept its docstring
# and MISSING_C_DOCSTRINGS is false.
HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
                   not MISSING_C_DOCSTRINGS)

# Decorator: skip a test when HAVE_DOCSTRINGS is false.
requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
                                          "test requires docstrings")
1291
1292
1293 #=======================================================================
1294 # doctest driver.
1295
def run_doctest(module, verbosity=None, optionflags=0):
    """Run doctest on the given module and return doctest's results object.

    When 'verbosity' is not given (or is None), support's own 'verbose'
    setting is passed on to doctest.  Any other value leaves doctest to its
    usual behavior (it searches sys.argv for -v).

    Raises TestFailed when at least one doctest fails.
    """

    import doctest

    # An explicit verbosity is not forwarded; doctest then decides itself.
    doctest_verbose = verbose if verbosity is None else None

    results = doctest.testmod(module,
                              verbose=doctest_verbose,
                              optionflags=optionflags)
    if results.failed:
        stats = TestStats.from_doctest(results)
        raise TestFailed(f"{results.failed} of {results.attempted} "
                         f"doctests failed",
                         stats=stats)
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' %
              (module.__name__, results.attempted))
    return results
1323
1324
1325 #=======================================================================
1326 # Support for saving and restoring the imported modules.
1327
def flush_std_streams():
    """Flush sys.stdout, then sys.stderr, skipping any that is None."""
    for name in ('stdout', 'stderr'):
        stream = getattr(sys, name)
        if stream is not None:
            stream.flush()
1333
1334
def print_warning(msg):
    """Write each line of *msg* to the original stderr, prefixed 'Warning -- '."""
    # bpo-45410: Explicitly flush stdout to keep logs in order
    flush_std_streams()
    out = print_warning.orig_stderr
    for text in msg.splitlines():
        print(f"Warning -- {text}", file=out)
    out.flush()

# bpo-39983: Remember sys.stderr as it was when this module was loaded, so
# warnings can still be logged while a test temporarily captures sys.stderr.
print_warning.orig_stderr = sys.stderr
1346
1347
# Flag used by saved_test_environment of test.libregrtest.save_env,
# to check if a test modified the environment. The flag should be set to False
# before running a new test.
#
# For example, threading_helper.threading_cleanup() sets the flag if the
# function fails to clean up threads.
environment_altered = False
1355
def reap_children():
    """Reap child processes that have already exited.

    Call this at the end of test_main() whenever sub-processes are started,
    so that no zombies stick around to hog resources and create problems
    when looking for refleaks.  Each reaped child is reported with
    print_warning() and marks the environment as altered.
    """
    global environment_altered

    # Need os.waitpid(-1, os.WNOHANG): Windows is not supported
    can_wait = hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')
    if not can_wait or not has_subprocess_support:
        return

    # Zombies hog resources and might be causing some of the buildbots to
    # die, so collect every child that is already dead.
    while True:
        try:
            # Read the exit status of any child process which already completed
            pid, status = os.waitpid(-1, os.WNOHANG)
        except OSError:
            # os.waitpid() failed: stop reaping
            return

        if pid == 0:
            # No child has exited
            return

        print_warning(f"reap_children() reaped child process {pid}")
        environment_altered = True
1384
1385
@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
    """Temporarily replace an attribute with a new object.

    Usage:
        with swap_attr(obj, "attr", 5):
            ...

        For the duration of the with: block obj.attr is 5; the old value is
        put back when the block ends.  If `attr` doesn't exist on `obj`, it
        is created, and deleted again at the end of the block (unless it was
        already removed inside the block).

        The old value (or None if it doesn't exist) is assigned to the
        target of the "as" clause, if there is one.
    """
    had_attr = hasattr(obj, attr)
    if had_attr:
        real_val = getattr(obj, attr)
    setattr(obj, attr, new_val)
    try:
        yield real_val if had_attr else None
    finally:
        if had_attr:
            setattr(obj, attr, real_val)
        elif hasattr(obj, attr):
            delattr(obj, attr)
1416
@contextlib.contextmanager
def swap_item(obj, item, new_val):
    """Temporarily replace an item with a new object.

    Usage:
        with swap_item(obj, "item", 5):
            ...

        For the duration of the with: block obj["item"] is 5; the old value
        is put back when the block ends.  If `item` doesn't exist on `obj`,
        it is created, and deleted again at the end of the block (unless it
        was already removed inside the block).

        The old value (or None if it doesn't exist) is assigned to the
        target of the "as" clause, if there is one.
    """
    present = item in obj
    if present:
        real_val = obj[item]
    obj[item] = new_val
    try:
        yield real_val if present else None
    finally:
        if present:
            obj[item] = real_val
        elif item in obj:
            del obj[item]
1447
def args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    settings in sys.flags and sys.warnoptions.

    Thin wrapper around subprocess._args_from_interpreter_flags().
    """
    import subprocess
    build_args = subprocess._args_from_interpreter_flags
    return build_args()
1453
def optim_args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    optimization settings in sys.flags.

    Thin wrapper around subprocess._optim_args_from_interpreter_flags().
    """
    import subprocess
    build_args = subprocess._optim_args_from_interpreter_flags
    return build_args()
1459
1460
class Matcher(object):
    """Match dicts against expected key/value pairs."""

    # Keys whose string values are compared by substring instead of equality.
    _partial_matches = ('msg', 'message')

    def matches(self, d, **kwargs):
        """
        Try to match a single dict with the supplied arguments.

        Every keyword must match the value stored in *d* under the same key
        (None when the key is absent).  String values stored under a key
        listed in self._partial_matches are checked for partial (i.e.
        substring) matches.  Subclasses can extend this scheme to (for
        example) do regular expression matching.
        """
        return all(self.match_value(key, d.get(key), expected)
                   for key, expected in kwargs.items())

    def match_value(self, k, dv, v):
        """
        Try to match a single stored value (dv) with a supplied value (v).

        Values of different types never match.
        """
        if type(v) != type(dv):
            return False
        if type(dv) is str and k in self._partial_matches:
            return dv.find(v) >= 0
        return v == dv
1493
1494
# Cached result of the UCRT bug detection; None until first checked.
_buggy_ucrt = None
def skip_if_buggy_ucrt_strfptime(test):
    """
    Skip decorator for tests that use buggy strptime/strftime

    The UCRT bugs are assumed present when, on Windows with the cp65001
    locale encoding, time.localtime().tm_zone is an empty string; otherwise
    they are assumed fixed.  The detection runs once and is cached.

    See bpo-37552 [Windows] strptime/strftime return invalid
    results with UCRT version 17763.615
    """
    import locale
    global _buggy_ucrt
    if _buggy_ucrt is None:
        _buggy_ucrt = bool(sys.platform == 'win32' and
                           locale.getencoding() == 'cp65001' and
                           time.localtime().tm_zone == '')
    if _buggy_ucrt:
        return unittest.skip("buggy MSVC UCRT strptime/strftime")(test)
    return test
1516
class PythonSymlink:
    """Creates a symlink for the current Python executable"""
    def __init__(self, link=None):
        from .os_helper import TESTFN

        # Path of the symlink to create; defaults to TESTFN made absolute.
        self.link = link or os.path.abspath(TESTFN)
        # Links created by __enter__(), removed again by __exit__().
        self._linked = []
        self.real = os.path.realpath(sys.executable)
        # Extra (source, link) pairs to create alongside the main link.
        self._also_link = []

        # Environment for processes started through the link (None: inherit).
        self._env = None

        self._platform_specific()

    if sys.platform == "win32":
        def _platform_specific(self):
            import glob
            import _winapi

            if os.path.lexists(self.real) and not os.path.exists(self.real):
                # App symlink appears to not exist, but we want the
                # real executable here anyway
                self.real = _winapi.GetModuleFileName(0)

            dll = _winapi.GetModuleFileName(sys.dllhandle)
            src_dir = os.path.dirname(dll)
            dest_dir = os.path.dirname(self.link)

            def beside_link(path):
                # Pair *path* with a same-named entry next to the link.
                return (path, os.path.join(dest_dir, os.path.basename(path)))

            self._also_link.append(beside_link(dll))
            pattern = os.path.join(glob.escape(src_dir), "vcruntime*.dll")
            for runtime in glob.glob(pattern):
                self._also_link.append(beside_link(runtime))

            self._env = {k.upper(): os.getenv(k) for k in os.environ}
            self._env["PYTHONHOME"] = os.path.dirname(self.real)
            if sysconfig.is_python_build():
                self._env["PYTHONPATH"] = STDLIB_DIR
    else:
        def _platform_specific(self):
            pass

    def __enter__(self):
        # The main link first, then every additional link.
        for real, link in [(self.real, self.link), *self._also_link]:
            os.symlink(real, link)
            self._linked.append(link)
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Best-effort cleanup: a failed removal is only reported.
        for link in self._linked:
            try:
                os.remove(link)
            except IOError as ex:
                if verbose:
                    print("failed to clean up {}: {}".format(link, ex))

    def _call(self, python, args, env, returncode):
        """Run *python* with *args*; return (stdout, stderr) as bytes.

        Raises RuntimeError if the exit code differs from *returncode*.
        """
        import subprocess
        proc = subprocess.Popen([python, *args],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                env=env)
        output = proc.communicate()
        if proc.returncode == returncode:
            return output
        if verbose:
            print(repr(output[0]))
            print(repr(output[1]), file=sys.stderr)
        raise RuntimeError(
            'unexpected return code: {0} (0x{0:08X})'.format(proc.returncode))

    def call_real(self, *args, returncode=0):
        """Run the real executable with *args*."""
        return self._call(self.real, args, None, returncode)

    def call_link(self, *args, returncode=0):
        """Run the symlinked executable with *args* and the prepared environment."""
        return self._call(self.link, args, self._env, returncode)
1597
1598
def skip_if_pgo_task(test):
    """Skip decorator for tests not run in (non-extended) PGO task"""
    if PGO and not PGO_EXTENDED:
        return unittest.skip("Not run for (non-extended) PGO task")(test)
    return test
1604
1605
def detect_api_mismatch(ref_api, other_api, *, ignore=()):
    """Return the set of names found on ref_api but not on other_api.

    Names listed in *ignore* are left out of the result.

    Private attributes beginning with '_' are skipped, but all magic
    methods (names ending in '__') are included.
    """
    missing = set(dir(ref_api)).difference(dir(other_api))
    if ignore:
        missing.difference_update(ignore)
    return {name for name in missing
            if name.endswith('__') or not name.startswith('_')}
1619
1620
def check__all__(test_case, module, name_of_module=None, extra=(),
                 not_exported=()):
    """Assert that the __all__ variable of 'module' contains all public names.

    The public names of the module (its API) are detected automatically:
    a name counts as public when it does not start with an underscore and
    the object was defined in 'module'.

    'name_of_module' may give (as a string or a tuple of strings) the
    module(s) in which an API may be defined and still be detected as
    public.  This is useful when 'module' imports part of its public API
    from other modules, possibly a C backend (like 'csv' and its '_csv').

    'extra' may be a set of names that would not otherwise be detected as
    "public", like objects without a proper '__module__' attribute.  They
    are added to the automatically detected ones.

    'not_exported' may be a set of names that must not be treated as part
    of the public API even though their names indicate otherwise.

    Usage:
        import bar
        import foo
        import unittest
        from test import support

        class MiscTestCase(unittest.TestCase):
            def test__all__(self):
                support.check__all__(self, foo)

        class OtherTestCase(unittest.TestCase):
            def test__all__(self):
                extra = {'BAR_CONST', 'FOO_CONST'}
                not_exported = {'baz'}  # Undocumented name.
                # bar imports part of its API from _bar.
                support.check__all__(self, bar, ('bar', '_bar'),
                                     extra=extra, not_exported=not_exported)

    """

    if name_of_module is None:
        name_of_module = (module.__name__, )
    elif isinstance(name_of_module, str):
        name_of_module = (name_of_module, )

    expected = set(extra)

    for name in dir(module):
        if name.startswith('_') or name in not_exported:
            continue
        obj = getattr(module, name)
        is_public = getattr(obj, '__module__', None) in name_of_module
        if not is_public:
            # Objects without __module__ count as public, except modules
            # (i.e. plain imports).
            is_public = (not hasattr(obj, '__module__') and
                         not isinstance(obj, types.ModuleType))
        if is_public:
            expected.add(name)
    test_case.assertCountEqual(module.__all__, expected)
1678
1679
def suppress_msvcrt_asserts(verbose=False):
    """Disable Windows error dialogs and CRT report dialogs.

    Does nothing when the msvcrt module cannot be imported.  With *verbose*
    true, CRT reports are redirected to stderr instead of being turned off.
    """
    try:
        import msvcrt
    except ImportError:
        return

    error_mode = (msvcrt.SEM_FAILCRITICALERRORS
                  | msvcrt.SEM_NOALIGNMENTFAULTEXCEPT
                  | msvcrt.SEM_NOGPFAULTERRORBOX
                  | msvcrt.SEM_NOOPENFILEERRORBOX)
    msvcrt.SetErrorMode(error_mode)

    # CrtSetReportMode() is only available in debug build
    if not hasattr(msvcrt, 'CrtSetReportMode'):
        return
    for report_type in (msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT):
        if verbose:
            msvcrt.CrtSetReportMode(report_type, msvcrt.CRTDBG_MODE_FILE)
            msvcrt.CrtSetReportFile(report_type, msvcrt.CRTDBG_FILE_STDERR)
        else:
            msvcrt.CrtSetReportMode(report_type, 0)
1699
1700
class SuppressCrashReport:
    """Try to prevent a crash report from popping up.

    On Windows, don't display the Windows Error Reporting dialog.  On UNIX,
    disable the creation of coredump file.
    """
    # Previous error mode (Windows) or core file size limits (UNIX).
    old_value = None
    # Previous CRT report (mode, file) per report type; Windows debug builds.
    old_modes = None

    def __enter__(self):
        """On Windows, disable Windows Error Reporting dialogs using
        SetErrorMode() and CrtSetReportMode().

        On UNIX, try to save the previous core file size limit, then set
        soft limit to 0.
        """
        if sys.platform.startswith('win'):
            # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
            try:
                import msvcrt
            except ImportError:
                return

            previous_mode = msvcrt.GetErrorMode()
            self.old_value = previous_mode
            msvcrt.SetErrorMode(previous_mode | msvcrt.SEM_NOGPFAULTERRORBOX)

            # bpo-23314: Suppress assert dialogs in debug builds.
            # CrtSetReportMode() is only available in debug build.
            if hasattr(msvcrt, 'CrtSetReportMode'):
                self.old_modes = {}
                report_types = [msvcrt.CRT_WARN,
                                msvcrt.CRT_ERROR,
                                msvcrt.CRT_ASSERT]
                for report_type in report_types:
                    old_mode = msvcrt.CrtSetReportMode(
                        report_type, msvcrt.CRTDBG_MODE_FILE)
                    old_file = msvcrt.CrtSetReportFile(
                        report_type, msvcrt.CRTDBG_FILE_STDERR)
                    self.old_modes[report_type] = old_mode, old_file

        else:
            try:
                import resource
            except ImportError:
                resource = None
            self.resource = resource

            if resource is not None:
                try:
                    self.old_value = resource.getrlimit(resource.RLIMIT_CORE)
                    # Only the soft limit is lowered; the hard limit is kept.
                    resource.setrlimit(resource.RLIMIT_CORE,
                                       (0, self.old_value[1]))
                except (ValueError, OSError):
                    pass

            if sys.platform == 'darwin':
                import subprocess
                # Check if the 'Crash Reporter' on OSX was configured
                # in 'Developer' mode and warn that it will get triggered
                # when it is.
                #
                # This assumes that this context manager is used in tests
                # that might trigger the next manager.
                cmd = ['/usr/bin/defaults', 'read',
                       'com.apple.CrashReporter', 'DialogType']
                proc = subprocess.Popen(cmd,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE)
                with proc:
                    stdout = proc.communicate()[0]
                if stdout.strip() == b'developer':
                    print("this test triggers the Crash Reporter, "
                          "that is intentional", end='', flush=True)

        return self

    def __exit__(self, *ignore_exc):
        """Restore Windows ErrorMode or core file behavior to initial value."""
        if self.old_value is None:
            # Nothing was saved on entry, so there is nothing to restore.
            return

        if sys.platform.startswith('win'):
            import msvcrt
            msvcrt.SetErrorMode(self.old_value)

            if self.old_modes:
                for report_type, saved in self.old_modes.items():
                    old_mode, old_file = saved
                    msvcrt.CrtSetReportMode(report_type, old_mode)
                    msvcrt.CrtSetReportFile(report_type, old_file)
            return

        if self.resource is None:
            return
        try:
            self.resource.setrlimit(self.resource.RLIMIT_CORE, self.old_value)
        except (ValueError, OSError):
            pass
1795
1796
def patch(test_instance, object_to_patch, attr_name, new_value):
    """Override 'object_to_patch'.'attr_name' with 'new_value'.

    A cleanup is registered on 'test_instance' that undoes the override:
    an attribute stored in the object's own __dict__ gets its old value
    back, any other attribute is deleted from the object again.

    'attr_name' must be a valid attribute of 'object_to_patch', otherwise
    AttributeError is raised.
    """
    # Raises AttributeError if 'attr_name' is not a real attribute.
    getattr(object_to_patch, attr_name)

    # Remember the old value and whether it lives on the object itself.
    try:
        old_value = object_to_patch.__dict__[attr_name]
    except (AttributeError, KeyError):
        attr_is_local = False
        old_value = getattr(object_to_patch, attr_name, None)
    else:
        attr_is_local = True

    def cleanup():
        # Undo the override when the test is done.
        if not attr_is_local:
            delattr(object_to_patch, attr_name)
            return
        setattr(object_to_patch, attr_name, old_value)

    test_instance.addCleanup(cleanup)

    # Install the override only once the cleanup is registered.
    setattr(object_to_patch, attr_name, new_value)
1829
1830
@contextlib.contextmanager
def patch_list(orig):
    """Like unittest.mock.patch.dict, but for lists.

    The contents of *orig* are snapshotted on entry and written back in
    place (orig[:] = snapshot) on exit, even if the block raises.
    """
    # Take the snapshot outside the try block: if slicing fails there is
    # nothing to restore, and the real error must propagate instead of an
    # UnboundLocalError raised from the finally clause.
    saved = orig[:]
    try:
        yield
    finally:
        orig[:] = saved
1839
1840
def run_in_subinterp(code):
    """
    Run code in a subinterpreter and return what
    _testcapi.run_in_subinterp() returns.  Raise unittest.SkipTest if the
    tracemalloc module is tracing memory allocations.
    """
    _check_tracemalloc()
    import _testcapi
    runner = _testcapi.run_in_subinterp
    return runner(code)
1849
1850
def run_in_subinterp_with_config(code, *, own_gil=None, **config):
    """
    Run code in a subinterpreter created with the given configuration.
    Raise unittest.SkipTest if the tracemalloc module is tracing memory
    allocations.

    'own_gil', when not None, is translated into the 'gil' config entry
    (2 if true, 1 otherwise) and must not be combined with an explicit
    'gil' keyword.
    """
    _check_tracemalloc()
    import _testcapi
    if own_gil is not None:
        assert 'gil' not in config, (own_gil, config)
        if own_gil:
            config['gil'] = 2
        else:
            config['gil'] = 1
    return _testcapi.run_in_subinterp_with_config(code, **config)
1862
1863
def _check_tracemalloc():
    """Raise unittest.SkipTest if tracemalloc is currently tracing.

    Does nothing when the tracemalloc module cannot be imported.
    """
    # Issue #10915, #15751: PyGILState_*() functions don't work with
    # sub-interpreters, the tracemalloc module uses these functions internally
    try:
        import tracemalloc
    except ImportError:
        return
    if tracemalloc.is_tracing():
        raise unittest.SkipTest("run_in_subinterp() cannot be used "
                                 "if tracemalloc module is tracing "
                                 "memory allocations")
1876
1877
def check_free_after_iterating(test, iter, cls, args=()):
    """Check that the iterated object is freed once iteration has ended.

    A subclass of *cls* with a __del__ hook is instantiated with *args* and
    wrapped with *iter*.  The iterator must raise StopIteration right away,
    and the instance must have been finalized after a gc_collect().
    """
    done = False

    class A(cls):
        def __del__(self):
            nonlocal done
            done = True
            # Touch the iterator again from the finalizer.
            try:
                next(it)
            except StopIteration:
                pass

    it = iter(A(*args))
    # Issue 26494: Shouldn't crash
    test.assertRaises(StopIteration, next, it)
    # The sequence should be deallocated just after the end of iterating
    gc_collect()
    test.assertTrue(done)
1895
1896
def missing_compiler_executable(cmd_names=[]):
    """Check if the compiler components used to build the interpreter exist.

    Looks for the compiler executables whose names are listed in
    'cmd_names', or for all of them when 'cmd_names' is empty.  Returns the
    first missing executable, or None when none is found missing.

    """
    # 'cmd_names' is only read, never modified.
    from setuptools._distutils import ccompiler, sysconfig, spawn
    from setuptools import errors

    compiler = ccompiler.new_compiler()
    sysconfig.customize_compiler(compiler)
    if compiler.compiler_type == "msvc":
        # MSVC has no executables, so check whether initialization succeeds
        try:
            compiler.initialize()
        except errors.PlatformError:
            return "msvc"
    for name in compiler.executables:
        if cmd_names and name not in cmd_names:
            continue
        cmd = getattr(compiler, name)
        if cmd_names:
            # An explicitly requested executable must be configured.
            assert cmd is not None, \
                    "the '%s' executable is not configured" % name
        elif not cmd:
            continue
        program = cmd[0]
        if spawn.find_executable(program) is None:
            return program
    return None
1928
1929
# Cached answer to "are we on the Android emulator?"; None until checked.
_is_android_emulator = None
def setswitchinterval(interval):
    """Call sys.setswitchinterval(), clamping the value on the Android emulator."""
    global _is_android_emulator
    # Setting a very low gil interval on the Android emulator causes python
    # to hang (issue #26939).
    minimum_interval = 1e-5
    if is_android and interval < minimum_interval:
        if _is_android_emulator is None:
            import subprocess
            qemu_prop = subprocess.check_output(
                ['getprop', 'ro.kernel.qemu'])
            _is_android_emulator = (qemu_prop.strip() == b'1')
        if _is_android_emulator:
            interval = minimum_interval
    return sys.setswitchinterval(interval)
1944
1945
def get_pagesize():
    """Get size of a page in bytes.

    Queries os.sysconf() for 'SC_PAGESIZE', then 'SC_PAGE_SIZE'; falls back
    to 4096 when neither lookup works.
    """
    for name in ('SC_PAGESIZE', 'SC_PAGE_SIZE'):
        try:
            return os.sysconf(name)
        except (ValueError, AttributeError):
            continue
    return 4096
1956
1957
@contextlib.contextmanager
def disable_faulthandler():
    """Context manager that disables faulthandler for the enclosed block.

    If faulthandler was enabled on entry, it is enabled again on exit,
    writing to the file descriptor of sys.__stderr__ for all threads.
    """
    import faulthandler

    # use sys.__stderr__ instead of sys.stderr, since regrtest replaces
    # sys.stderr with a StringIO which has no file descriptor when a test
    # is run with -W/--verbose3.
    stderr_fd = sys.__stderr__.fileno()

    was_enabled = faulthandler.is_enabled()
    try:
        faulthandler.disable()
        yield
    finally:
        if was_enabled:
            faulthandler.enable(file=stderr_fd, all_threads=True)
1974
1975
class SaveSignals:
    """
    Save and restore signal handlers.

    Only handlers registered through the Python signal module can be saved
    and restored: see bpo-13285 for "external" signal handlers.
    """

    def __init__(self):
        import signal
        self.signal = signal
        self.signals = signal.valid_signals()
        # SIGKILL and SIGSTOP signals cannot be ignored nor caught
        for signame in ('SIGKILL', 'SIGSTOP'):
            signum = getattr(signal, signame, None)
            if signum is None:
                # This platform does not define the signal.
                continue
            self.signals.remove(signum)
        # Maps signal number -> handler recorded by save().
        self.handlers = {}

    def save(self):
        """Record the current handler of every signal being tracked."""
        getsignal = self.signal.getsignal
        for signum in self.signals:
            handler = getsignal(signum)
            # getsignal() returns None if a signal handler was not
            # registered by the Python signal module,
            # and the handler is not SIG_DFL nor SIG_IGN.
            #
            # Such a signal is ignored: its handler cannot be restored.
            if handler is not None:
                self.handlers[signum] = handler

    def restore(self):
        """Reinstall every handler recorded by save()."""
        install = self.signal.signal
        for signum, handler in self.handlers.items():
            install(signum, handler)
2013
2014
def with_pymalloc():
    """Return the WITH_PYMALLOC value exposed by the _testcapi module."""
    import _testcapi as capi
    return capi.WITH_PYMALLOC
2018
2019
class _ALWAYS_EQ:
    """
    Object that is equal to anything.
    """

    def __eq__(self, other):
        # Equality holds no matter what *other* is.
        return True

    def __ne__(self, other):
        # ... and, consistently, inequality never holds.
        return False


# Shared instance for tests that need an "equal to everything" operand.
ALWAYS_EQ = _ALWAYS_EQ()
2030
class _NEVER_EQ:
    """
    Object that is not equal to anything.
    """

    def __eq__(self, other):
        # Never equal, not even to itself.
        return False

    def __ne__(self, other):
        return True

    def __hash__(self):
        # Defining __eq__ would otherwise make instances unhashable; every
        # instance hashes to the same constant.
        return 1


# Shared instance for tests that need an "equal to nothing" operand.
NEVER_EQ = _NEVER_EQ()
2043
@functools.total_ordering
class _LARGEST:
    """
    Object that is greater than anything (except itself).
    """

    def __eq__(self, other):
        # Equal only to other _LARGEST instances.
        return isinstance(other, _LARGEST)

    def __lt__(self, other):
        # Never less than anything; total_ordering derives the remaining
        # rich comparisons from __eq__ and __lt__.
        return False


# Shared instance for tests that need a maximal operand.
LARGEST = _LARGEST()
2055
@functools.total_ordering
class _SMALLEST:
    """
    Object that is less than anything (except itself).
    """

    def __eq__(self, other):
        # Equal only to other _SMALLEST instances.
        return isinstance(other, _SMALLEST)

    def __gt__(self, other):
        # Never greater than anything; total_ordering derives the remaining
        # rich comparisons from __eq__ and __gt__.
        return False


# Shared instance for tests that need a minimal operand.
SMALLEST = _SMALLEST()
2067
def maybe_get_event_loop_policy():
    """Return the global event loop policy if one is set, else return None."""
    # Read asyncio's private module global directly.
    from asyncio import events
    return events._event_loop_policy
2072
# Helpers for testing hashing.

# Number of bits in a hash() result, as reported by sys.hash_info.
NHASHBITS = sys.hash_info.width
assert NHASHBITS in {32, 64}
2076
# Return mean and sdev of number of collisions when tossing nballs balls
# uniformly at random into nbins bins.  By definition, the number of
# collisions is the number of balls minus the number of occupied bins at
# the end.
def collision_stats(nbins, nballs):
    """Return (mean, standard deviation) of the collision count as floats."""
    # With n bins and k balls:
    #   prob a bin empty after k trials = (1 - 1/n)**k
    #   mean # empty is then n * (1 - 1/n)**k
    #   so mean # occupied is n - n * (1 - 1/n)**k
    #   so collisions = k - (n - n*(1 - 1/n)**k)
    #
    # For the variance:
    #   n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 =
    #   n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty)
    #
    # Massive cancellation occurs, and, e.g., for a 64-bit hash code
    # 1-1/2**64 rounds uselessly to 1.0.  Rather than make heroic (and
    # error-prone) efforts to rework the naive formulas to avoid those,
    # we use the `decimal` module to get plenty of extra precision.
    #
    # Note:  the exact values are straightforward to compute with
    # rationals, but in context that's unbearably slow, requiring
    # multi-million bit arithmetic.
    import decimal
    with decimal.localcontext() as ctx:
        # About as many bits as n**2 has will likely cancel out, so work
        # with that many decimal digits (and never fewer than 30).
        ctx.prec = max(nbins.bit_length() * 2, 30)
        dbins = decimal.Decimal(nbins)
        prob_empty = ((dbins - 1) / dbins) ** nballs
        mean_empty = nbins * prob_empty
        mean_occupied = nbins - mean_empty
        mean_collisions = nballs - mean_occupied
        pair_term = dbins*(dbins-1)*((dbins-2)/dbins)**nballs
        variance = pair_term + mean_empty * (1 - mean_empty)
        # sqrt() must run while the high-precision context is active.
        return float(mean_collisions), float(variance.sqrt())
2113
2114
class catch_unraisable_exception:
    """
    Context manager catching unraisable exception using sys.unraisablehook.

    While the block runs, sys.unraisablehook is replaced by a hook that
    stores its argument as cm.unraisable.  On exit the previous hook is put
    back and the attribute is deleted.

    Storing the exception value (cm.unraisable.exc_value) creates a reference
    cycle. The reference cycle is broken explicitly when the context manager
    exits.

    Storing the object (cm.unraisable.object) can resurrect it if it is set to
    an object which is being finalized. Exiting the context manager clears the
    stored object.

    Usage:

        with support.catch_unraisable_exception() as cm:
            # code creating an "unraisable exception"
            ...

            # check the unraisable exception: use cm.unraisable
            ...

        # cm.unraisable attribute no longer exists at this point
        # (to break a reference cycle)
    """

    def __init__(self):
        self._old_hook = None
        self.unraisable = None

    def _hook(self, unraisable):
        # Storing unraisable.object can resurrect an object which is being
        # finalized. Storing unraisable.exc_value creates a reference cycle.
        self.unraisable = unraisable

    def __enter__(self):
        # Remember the hook currently installed and swap ours in.
        self._old_hook, sys.unraisablehook = sys.unraisablehook, self._hook
        return self

    def __exit__(self, *exc_info):
        sys.unraisablehook = self._old_hook
        # Drop the attribute entirely (not just reset it to None).
        delattr(self, 'unraisable')
2157
2158
def wait_process(pid, *, exitcode, timeout=None):
    """
    Wait until process pid completes and check that the process exit code is
    exitcode.

    Raise an AssertionError if the process exit code is not equal to exitcode.

    If the process runs longer than timeout seconds (LONG_TIMEOUT by default),
    kill the process (if signal.SIGKILL is available) and raise an
    AssertionError. The timeout feature is not available on Windows.
    """
    if os.name == "nt":
        # Windows implementation: don't support timeout :-(
        pid2, status = os.waitpid(pid, 0)
    else:
        import signal

        if timeout is None:
            timeout = LONG_TIMEOUT

        started = time.monotonic()
        for _ in sleeping_retry(timeout, error=False):
            pid2, status = os.waitpid(pid, os.WNOHANG)
            if pid2 != 0:
                break
            # retry: the process is still running
        else:
            # The retry loop ran out of time: kill the child and reap it.
            # Ignore errors like ChildProcessError or PermissionError.
            with contextlib.suppress(OSError):
                os.kill(pid, signal.SIGKILL)
                os.waitpid(pid, 0)

            elapsed = time.monotonic() - started
            raise AssertionError(f"process {pid} is still running "
                                 f"after {elapsed:.1f} seconds")

    exitcode2 = os.waitstatus_to_exitcode(status)
    if exitcode2 != exitcode:
        raise AssertionError(f"process {pid} exited with code {exitcode2}, "
                             f"but exit code {exitcode} is expected")

    # sanity check: it should not fail in practice
    if pid2 != pid:
        raise AssertionError(f"pid {pid2} != pid {pid}")
2205
def skip_if_broken_multiprocessing_synchronize():
    """
    Skip tests if the multiprocessing.synchronize module is missing, if there
    is no available semaphore implementation, or if creating a lock raises an
    OSError (on Linux only).
    """
    from .import_helper import import_module

    # Skip tests if the _multiprocessing extension is missing.
    import_module('_multiprocessing')

    # Skip tests if there is no available semaphore implementation:
    # multiprocessing.synchronize requires _multiprocessing.SemLock.
    synchronize = import_module('multiprocessing.synchronize')

    # The lock-creation probe below only applies to Linux.
    if sys.platform != "linux":
        return

    try:
        # bpo-38377: On Linux, creating a semaphore fails with OSError
        # if the current user does not have the permission to create
        # a file in /dev/shm/ directory.
        synchronize.Lock(ctx=None)
    except OSError as exc:
        raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}")
2229
2230
def check_disallow_instantiation(testcase, tp, *args, **kwds):
    """
    Check that given type cannot be instantiated using *args and **kwds.

    *testcase* must provide assertRaisesRegex(); calling tp(*args, **kwds)
    is expected to raise TypeError with a "cannot create '<name>' instances"
    message, where builtins types are named without their module.

    See bpo-43916: Add Py_TPFLAGS_DISALLOW_INSTANTIATION type flag.
    """
    mod, name = tp.__module__, tp.__name__
    qualname = f"{name}" if mod == 'builtins' else f"{mod}.{name}"
    # The message is used as a regex, so the dotted name must be escaped.
    msg = f"cannot create '{re.escape(qualname)}' instances"
    testcase.assertRaisesRegex(TypeError, msg, tp, *args, **kwds)
2245
def get_recursion_depth():
    """Get the recursion depth of the caller function.

    In the __main__ module, at the module level, it should be 1.
    """
    try:
        import _testinternalcapi
        nframes = _testinternalcapi.get_recursion_depth()
    except (ImportError, RecursionError):
        # sys._getframe() + frame.f_back implementation: count every frame
        # from this one up to the outermost.
        nframes = 0
        current = sys._getframe()
        try:
            while current is not None:
                nframes += 1
                current = current.f_back
        finally:
            # Break any reference cycles.
            current = None

    # Ignore get_recursion_depth() frame.
    return max(nframes - 1, 1)
2268
def get_recursion_available():
    """Get the number of available frames before RecursionError.

    It depends on the current recursion depth of the caller function and
    sys.getrecursionlimit(): the result is the limit minus the depth
    reported by get_recursion_depth().
    """
    return sys.getrecursionlimit() - get_recursion_depth()
2278
@contextlib.contextmanager
def set_recursion_limit(limit):
    """Temporarily change the recursion limit.

    The limit in force on entry is reinstated on exit, including when the
    block (or setting the new limit) raises.
    """
    saved_limit = sys.getrecursionlimit()
    try:
        sys.setrecursionlimit(limit)
        yield
    finally:
        sys.setrecursionlimit(saved_limit)
2288
def infinite_recursion(max_depth=100):
    """Set a lower limit for tests that interact with infinite recursions
    (e.g test_ast.ASTHelpers_Test.test_recursion_direct) since on some
    debug windows builds, due to not enough functions being inlined the
    stack size might not handle the default recursion limit (1000). See
    bpo-11105 for details.

    Return the set_recursion_limit() context manager, configured with the
    caller's current recursion depth plus *max_depth*.

    Raise ValueError if *max_depth* is lower than 3.
    """
    if max_depth < 3:
        # The message must be an f-string so the rejected value is shown.
        raise ValueError(f"max_depth must be at least 3, got {max_depth}")
    depth = get_recursion_depth()
    depth = max(depth - 1, 1)  # Ignore infinite_recursion() frame.
    limit = depth + max_depth
    return set_recursion_limit(limit)
2301
def ignore_deprecations_from(module: str, *, like: str) -> object:
    """Install an "ignore" filter for DeprecationWarning and return a token.

    The filter matches warnings from *module* whose message matches *like*.
    A regex comment built from id(token) is appended to the message pattern
    so that clear_ignored_deprecations() can later find this exact filter.
    """
    token = object()
    # "(?#...)" is a regex comment: it tags the pattern without changing
    # what the pattern matches.
    marker = fr"(?#support{id(token)})"
    warnings.filterwarnings(
        "ignore",
        message=like + marker,
        category=DeprecationWarning,
        module=module,
    )
    return token
2311
def clear_ignored_deprecations(*tokens: object) -> None:
    """Remove the filters installed by ignore_deprecations_from().

    Every "ignore" DeprecationWarning filter whose message pattern ends with
    the marker of one of *tokens* is dropped from warnings.filters.

    Raise ValueError when called without any token.
    """
    if not tokens:
        raise ValueError("Provide token or tokens returned by ignore_deprecations_from")

    markers = tuple(rf"(?#support{id(token)})" for token in tokens)
    kept = []
    for entry in warnings.filters:
        action, message, category, _module, _lineno = entry
        if action == "ignore" and category is DeprecationWarning:
            # The message is a compiled pattern, a plain string or None.
            if isinstance(message, re.Pattern):
                text = message.pattern
            else:
                text = message or ""
            if text.endswith(markers):
                continue
        kept.append(entry)

    # Only touch the global filter list when something was removed.
    if warnings.filters != kept:
        warnings.filters[:] = kept
        warnings._filters_mutated()
2330
2331
# Skip a test if venv with pip is known to not work.
def requires_venv_with_pip():
    """Return a unittest decorator that skips when zlib or ctypes is missing."""
    # ensurepip requires zlib to open ZIP archives (.whl binary wheel packages)
    try:
        import zlib
    except ImportError:
        return unittest.skipIf(True, "venv: ensurepip requires zlib")

    # bpo-26610: pip/pep425tags.py requires ctypes.
    # gh-92820: setuptools/windows_support.py uses ctypes (setuptools 58.1).
    try:
        import ctypes
    except ImportError:
        have_ctypes = False
    else:
        have_ctypes = True
    return unittest.skipUnless(have_ctypes, 'venv: pip requires ctypes')
2347
2348
@functools.cache
def _findwheel(pkgname):
    """Try to find a wheel with the package specified as pkgname.

    If set, the wheels are searched for in WHEEL_PKG_DIR (see ensurepip).
    Otherwise, they are searched for in the test directory.

    Return the path of the first match, or raise FileNotFoundError.
    """
    wheel_dir = sysconfig.get_config_var('WHEEL_PKG_DIR') or TEST_HOME_DIR
    # Reverse lexical order approximates "newest" first.
    for candidate in sorted(os.listdir(wheel_dir), reverse=True):
        # candidate is like 'setuptools-67.6.1-py3-none-any.whl'
        if candidate.endswith(".whl") and candidate.startswith(pkgname + '-'):
            return os.path.join(wheel_dir, candidate)
    raise FileNotFoundError(f"No wheel for {pkgname} found in {wheel_dir}")
2367
2368
# Context manager that creates a virtual environment, installs setuptools and
# wheel in it, and yields the path to the Python executable of the venv.
@contextlib.contextmanager
def setup_venv_with_pip_setuptools_wheel(venv_dir):
    """Create a venv in a temporary cwd with setuptools and wheel installed.

    *venv_dir* is the directory name of the venv, created under the
    temporary working directory.  The wheels come from _findwheel().
    Yield the path of the venv's Python executable; the temporary working
    directory stays active while the caller's block runs.
    """
    import subprocess
    from .os_helper import temp_cwd

    def run(cmd):
        # Echo the command in verbose mode, then run it; a non-zero exit
        # status raises subprocess.CalledProcessError.
        if verbose:
            print()
            print('Run:', ' '.join(cmd))
        subprocess.run(cmd, check=True)

    with temp_cwd() as temp_dir:
        # Create virtual environment to get setuptools
        run([sys.executable, '-X', 'dev', '-m', 'venv', venv_dir])

        venv = os.path.join(temp_dir, venv_dir)

        # Get the Python executable of the venv
        scripts_dir = 'Scripts' if sys.platform == 'win32' else 'bin'
        python = os.path.join(venv, scripts_dir,
                              os.path.basename(sys.executable))

        run([python, '-X', 'dev',
             '-m', 'pip', 'install',
             _findwheel('setuptools'),
             _findwheel('wheel')])

        yield python
2403
2404
# True if Python is built with the Py_DEBUG macro defined, i.e. in debug
# mode (./configure --with-pydebug).  Detected through the presence of
# sys.gettotalrefcount.
Py_DEBUG = hasattr(sys, 'gettotalrefcount')
2408
2409
def late_deletion(obj):
    """
    Keep a Python object alive as long as possible.

    Create a reference cycle and store the cycle in an object deleted late in
    Python finalization. Try to keep the object alive until the very last
    garbage collection.

    The function keeps a strong reference by design. It should be called in a
    subprocess to not mark a test as "leaking a reference".
    """

    # Late CPython finalization:
    # - finalize_interp_clear()
    # - _PyInterpreterState_Clear(): Clear PyInterpreterState members
    #   (ex: codec_search_path, before_forkers)
    #   - clear os.register_at_fork() callbacks
    #   - clear codecs.register() callbacks

    import codecs

    # A list that contains both obj and itself.
    cycle = [obj]
    cycle.append(cycle)

    # Store a reference in PyInterpreterState.codec_search_path
    def search_func(encoding):
        return None
    search_func.reference = cycle
    codecs.register(search_func)

    if not hasattr(os, 'register_at_fork'):
        return

    # Store a reference in PyInterpreterState.before_forkers
    def atfork_func():
        pass
    atfork_func.reference = cycle
    os.register_at_fork(before=atfork_func)
2445
2446
def busy_retry(timeout, err_msg=None, /, *, error=True):
    """
    Run the loop body until "break" stops the loop.

    After *timeout* seconds, raise an AssertionError if *error* is true,
    or just stop if *error* is false.  *err_msg*, when given, is appended
    to the AssertionError message.  Raise ValueError if *timeout* is not
    greater than zero.

    Example:

        for _ in support.busy_retry(support.SHORT_TIMEOUT):
            if check():
                break

    Example of error=False usage:

        for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
            if check():
                break
        else:
            raise RuntimeError('my custom error')

    """
    if timeout <= 0:
        raise ValueError("timeout must be greater than zero")

    began = time.monotonic()
    deadline = began + timeout

    # The loop body always runs at least once; the clock is only consulted
    # after each run.
    while True:
        yield

        if time.monotonic() >= deadline:
            break

    if not error:
        return

    elapsed = time.monotonic() - began
    msg = f"timeout ({elapsed:.1f} seconds)"
    if err_msg:
        msg = f"{msg}: {err_msg}"
    raise AssertionError(msg)
2487
2488
def sleeping_retry(timeout, err_msg=None, /,
                     *, init_delay=0.010, max_delay=1.0, error=True):
    """
    Wait strategy that applies exponential backoff.

    Run the loop body until "break" stops the loop. Sleep at each loop
    iteration, but not at the first iteration. The sleep delay starts at
    *init_delay* seconds and is doubled at each iteration (up to *max_delay*
    seconds).

    See busy_retry() documentation for the parameters usage.

    Example raising an exception after SHORT_TIMEOUT seconds:

        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
            if check():
                break

    Example of error=False usage:

        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
            if check():
                break
        else:
            raise RuntimeError('my custom error')
    """

    pause = init_delay
    for _ in busy_retry(timeout, err_msg, error=error):
        yield

        # The body did not break: back off before busy_retry() checks the
        # deadline and hands control to the body again.
        time.sleep(pause)
        pause = min(pause * 2, max_delay)
2521
2522
2523 @contextlib.contextmanager
2524 def adjust_int_max_str_digits(max_digits):
2525 """Temporarily change the integer string conversion length limit."""
2526 current = sys.get_int_max_str_digits()
2527 try:
2528 sys.set_int_max_str_digits(max_digits)
2529 yield
2530 finally:
2531 sys.set_int_max_str_digits(current)
2532
# For recursion tests: a depth that easily exceeds the default recursion
# limit.
EXCEEDS_RECURSION_LIMIT = 5000

# The default C recursion limit (from Include/cpython/pystate.h).
C_RECURSION_LIMIT = 1500

# Windows doesn't have os.uname() but it doesn't support s390x.
skip_on_s390x = unittest.skipIf(
    hasattr(os, 'uname') and os.uname().machine == 's390x',
    'skipped on s390x',
)