python (3.12.0)

(root)/
lib/
python3.12/
test/
support/
__init__.py
       1  """Supporting definitions for the Python regression tests."""
       2  
       3  if __name__ != 'test.support':
       4      raise ImportError('support must be imported from the test package')
       5  
       6  import contextlib
       7  import dataclasses
       8  import functools
       9  import getpass
      10  import opcode
      11  import os
      12  import re
      13  import stat
      14  import sys
      15  import sysconfig
      16  import textwrap
      17  import time
      18  import types
      19  import unittest
      20  import warnings
      21  
      22  from .testresult import get_test_runner
      23  
      24  
      25  __all__ = [
      26      # globals
      27      "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
      28      # exceptions
      29      "Error", "TestFailed", "TestDidNotRun", "ResourceDenied",
      30      # io
      31      "record_original_stdout", "get_original_stdout", "captured_stdout",
      32      "captured_stdin", "captured_stderr",
      33      # unittest
      34      "is_resource_enabled", "requires", "requires_freebsd_version",
      35      "requires_linux_version", "requires_mac_ver",
      36      "check_syntax_error",
      37      "run_unittest", "run_doctest",
      38      "requires_gzip", "requires_bz2", "requires_lzma",
      39      "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
      40      "requires_IEEE_754", "requires_zlib",
      41      "has_fork_support", "requires_fork",
      42      "has_subprocess_support", "requires_subprocess",
      43      "has_socket_support", "requires_working_socket",
      44      "anticipate_failure", "load_package_tests", "detect_api_mismatch",
      45      "check__all__", "skip_if_buggy_ucrt_strfptime",
      46      "check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer",
      47      "requires_limited_api", "requires_specialization",
      48      # sys
      49      "is_jython", "is_android", "is_emscripten", "is_wasi",
      50      "check_impl_detail", "unix_shell", "setswitchinterval",
      51      # os
      52      "get_pagesize",
      53      # network
      54      "open_urlresource",
      55      # processes
      56      "reap_children",
      57      # miscellaneous
      58      "run_with_locale", "swap_item", "findfile", "infinite_recursion",
      59      "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
      60      "run_with_tz", "PGO", "missing_compiler_executable",
      61      "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST",
      62      "LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT",
      63      "Py_DEBUG", "EXCEEDS_RECURSION_LIMIT", "C_RECURSION_LIMIT",
      64      "skip_on_s390x",
      65      ]
      66  
      67  
      68  # Timeout in seconds for tests using a network server listening on the network
      69  # local loopback interface like 127.0.0.1.
      70  #
      71  # The timeout is long enough to prevent test failure: it takes into account
      72  # that the client and the server can run in different threads or even different
      73  # processes.
      74  #
      75  # The timeout should be long enough for connect(), recv() and send() methods
      76  # of socket.socket.
      77  LOOPBACK_TIMEOUT = 5.0
      78  if sys.platform == 'win32' and ' 32 bit (ARM)' in sys.version:
      79      # bpo-37553: test_socket.SendfileUsingSendTest is taking longer than 2
      80      # seconds on Windows ARM32 buildbot
      81      LOOPBACK_TIMEOUT = 10
      82  elif sys.platform == 'vxworks':
      83      LOOPBACK_TIMEOUT = 10
      84  
      85  # Timeout in seconds for network requests going to the internet. The timeout is
      86  # short enough to prevent a test to wait for too long if the internet request
      87  # is blocked for whatever reason.
      88  #
      89  # Usually, a timeout using INTERNET_TIMEOUT should not mark a test as failed,
      90  # but skip the test instead: see transient_internet().
      91  INTERNET_TIMEOUT = 60.0
      92  
      93  # Timeout in seconds to mark a test as failed if the test takes "too long".
      94  #
      95  # The timeout value depends on the regrtest --timeout command line option.
      96  #
      97  # If a test using SHORT_TIMEOUT starts to fail randomly on slow buildbots, use
      98  # LONG_TIMEOUT instead.
      99  SHORT_TIMEOUT = 30.0
     100  
     101  # Timeout in seconds to detect when a test hangs.
     102  #
     103  # It is long enough to reduce the risk of test failure on the slowest Python
     104  # buildbots. It should not be used to mark a test as failed if the test takes
     105  # "too long". The timeout value depends on the regrtest --timeout command line
     106  # option.
     107  LONG_TIMEOUT = 5 * 60.0
     108  
     109  # TEST_HOME_DIR refers to the top level directory of the "test" package
     110  # that contains Python's regression test suite
     111  TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
     112  TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
     113  STDLIB_DIR = os.path.dirname(TEST_HOME_DIR)
     114  REPO_ROOT = os.path.dirname(STDLIB_DIR)
     115  
     116  
     117  class ESC[4;38;5;81mError(ESC[4;38;5;149mException):
     118      """Base class for regression test exceptions."""
     119  
     120  class ESC[4;38;5;81mTestFailed(ESC[4;38;5;149mError):
     121      """Test failed."""
     122      def __init__(self, msg, *args, stats=None):
     123          self.msg = msg
     124          self.stats = stats
     125          super().__init__(msg, *args)
     126  
     127      def __str__(self):
     128          return self.msg
     129  
     130  class ESC[4;38;5;81mTestFailedWithDetails(ESC[4;38;5;149mTestFailed):
     131      """Test failed."""
     132      def __init__(self, msg, errors, failures, stats):
     133          self.errors = errors
     134          self.failures = failures
     135          super().__init__(msg, errors, failures, stats=stats)
     136  
     137  class ESC[4;38;5;81mTestDidNotRun(ESC[4;38;5;149mError):
     138      """Test did not run any subtests."""
     139  
     140  class ESC[4;38;5;81mResourceDenied(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mSkipTest):
     141      """Test skipped because it requested a disallowed resource.
     142  
     143      This is raised when a test calls requires() for a resource that
     144      has not be enabled.  It is used to distinguish between expected
     145      and unexpected skips.
     146      """
     147  
     148  def anticipate_failure(condition):
     149      """Decorator to mark a test that is known to be broken in some cases
     150  
     151         Any use of this decorator should have a comment identifying the
     152         associated tracker issue.
     153      """
     154      if condition:
     155          return unittest.expectedFailure
     156      return lambda f: f
     157  
     158  def load_package_tests(pkg_dir, loader, standard_tests, pattern):
     159      """Generic load_tests implementation for simple test packages.
     160  
     161      Most packages can implement load_tests using this function as follows:
     162  
     163         def load_tests(*args):
     164             return load_package_tests(os.path.dirname(__file__), *args)
     165      """
     166      if pattern is None:
     167          pattern = "test*"
     168      top_dir = STDLIB_DIR
     169      package_tests = loader.discover(start_dir=pkg_dir,
     170                                      top_level_dir=top_dir,
     171                                      pattern=pattern)
     172      standard_tests.addTests(package_tests)
     173      return standard_tests
     174  
     175  
     176  def get_attribute(obj, name):
     177      """Get an attribute, raising SkipTest if AttributeError is raised."""
     178      try:
     179          attribute = getattr(obj, name)
     180      except AttributeError:
     181          raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
     182      else:
     183          return attribute
     184  
     185  verbose = 1              # Flag set to 0 by regrtest.py
     186  use_resources = None     # Flag set to [] by regrtest.py
     187  max_memuse = 0           # Disable bigmem tests (they will still be run with
     188                           # small sizes, to make sure they work.)
     189  real_max_memuse = 0
     190  junit_xml_list = None    # list of testsuite XML elements
     191  failfast = False
     192  
     193  # _original_stdout is meant to hold stdout at the time regrtest began.
     194  # This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
     195  # The point is to have some flavor of stdout the user can actually see.
     196  _original_stdout = None
     197  def record_original_stdout(stdout):
     198      global _original_stdout
     199      _original_stdout = stdout
     200  
     201  def get_original_stdout():
     202      return _original_stdout or sys.stdout
     203  
     204  
     205  def _force_run(path, func, *args):
     206      try:
     207          return func(*args)
     208      except FileNotFoundError as err:
     209          # chmod() won't fix a missing file.
     210          if verbose >= 2:
     211              print('%s: %s' % (err.__class__.__name__, err))
     212          raise
     213      except OSError as err:
     214          if verbose >= 2:
     215              print('%s: %s' % (err.__class__.__name__, err))
     216              print('re-run %s%r' % (func.__name__, args))
     217          os.chmod(path, stat.S_IRWXU)
     218          return func(*args)
     219  
     220  
     221  # Check whether a gui is actually available
     222  def _is_gui_available():
     223      if hasattr(_is_gui_available, 'result'):
     224          return _is_gui_available.result
     225      import platform
     226      reason = None
     227      if sys.platform.startswith('win') and platform.win32_is_iot():
     228          reason = "gui is not available on Windows IoT Core"
     229      elif sys.platform.startswith('win'):
     230          # if Python is running as a service (such as the buildbot service),
     231          # gui interaction may be disallowed
     232          import ctypes
     233          import ctypes.wintypes
     234          UOI_FLAGS = 1
     235          WSF_VISIBLE = 0x0001
     236          class ESC[4;38;5;81mUSEROBJECTFLAGS(ESC[4;38;5;149mctypesESC[4;38;5;149m.ESC[4;38;5;149mStructure):
     237              _fields_ = [("fInherit", ctypes.wintypes.BOOL),
     238                          ("fReserved", ctypes.wintypes.BOOL),
     239                          ("dwFlags", ctypes.wintypes.DWORD)]
     240          dll = ctypes.windll.user32
     241          h = dll.GetProcessWindowStation()
     242          if not h:
     243              raise ctypes.WinError()
     244          uof = USEROBJECTFLAGS()
     245          needed = ctypes.wintypes.DWORD()
     246          res = dll.GetUserObjectInformationW(h,
     247              UOI_FLAGS,
     248              ctypes.byref(uof),
     249              ctypes.sizeof(uof),
     250              ctypes.byref(needed))
     251          if not res:
     252              raise ctypes.WinError()
     253          if not bool(uof.dwFlags & WSF_VISIBLE):
     254              reason = "gui not available (WSF_VISIBLE flag not set)"
     255      elif sys.platform == 'darwin':
     256          # The Aqua Tk implementations on OS X can abort the process if
     257          # being called in an environment where a window server connection
     258          # cannot be made, for instance when invoked by a buildbot or ssh
     259          # process not running under the same user id as the current console
     260          # user.  To avoid that, raise an exception if the window manager
     261          # connection is not available.
     262          from ctypes import cdll, c_int, pointer, Structure
     263          from ctypes.util import find_library
     264  
     265          app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
     266  
     267          if app_services.CGMainDisplayID() == 0:
     268              reason = "gui tests cannot run without OS X window manager"
     269          else:
     270              class ESC[4;38;5;81mProcessSerialNumber(ESC[4;38;5;149mStructure):
     271                  _fields_ = [("highLongOfPSN", c_int),
     272                              ("lowLongOfPSN", c_int)]
     273              psn = ProcessSerialNumber()
     274              psn_p = pointer(psn)
     275              if (  (app_services.GetCurrentProcess(psn_p) < 0) or
     276                    (app_services.SetFrontProcess(psn_p) < 0) ):
     277                  reason = "cannot run without OS X gui process"
     278  
     279      # check on every platform whether tkinter can actually do anything
     280      if not reason:
     281          try:
     282              from tkinter import Tk
     283              root = Tk()
     284              root.withdraw()
     285              root.update()
     286              root.destroy()
     287          except Exception as e:
     288              err_string = str(e)
     289              if len(err_string) > 50:
     290                  err_string = err_string[:50] + ' [...]'
     291              reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
     292                                                             err_string)
     293  
     294      _is_gui_available.reason = reason
     295      _is_gui_available.result = not reason
     296  
     297      return _is_gui_available.result
     298  
     299  def is_resource_enabled(resource):
     300      """Test whether a resource is enabled.
     301  
     302      Known resources are set by regrtest.py.  If not running under regrtest.py,
     303      all resources are assumed enabled unless use_resources has been set.
     304      """
     305      return use_resources is None or resource in use_resources
     306  
     307  def requires(resource, msg=None):
     308      """Raise ResourceDenied if the specified resource is not available."""
     309      if not is_resource_enabled(resource):
     310          if msg is None:
     311              msg = "Use of the %r resource not enabled" % resource
     312          raise ResourceDenied(msg)
     313      if resource in {"network", "urlfetch"} and not has_socket_support:
     314          raise ResourceDenied("No socket support")
     315      if resource == 'gui' and not _is_gui_available():
     316          raise ResourceDenied(_is_gui_available.reason)
     317  
     318  def _requires_unix_version(sysname, min_version):
     319      """Decorator raising SkipTest if the OS is `sysname` and the version is less
     320      than `min_version`.
     321  
     322      For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
     323      the FreeBSD version is less than 7.2.
     324      """
     325      import platform
     326      min_version_txt = '.'.join(map(str, min_version))
     327      version_txt = platform.release().split('-', 1)[0]
     328      if platform.system() == sysname:
     329          try:
     330              version = tuple(map(int, version_txt.split('.')))
     331          except ValueError:
     332              skip = False
     333          else:
     334              skip = version < min_version
     335      else:
     336          skip = False
     337  
     338      return unittest.skipIf(
     339          skip,
     340          f"{sysname} version {min_version_txt} or higher required, not "
     341          f"{version_txt}"
     342      )
     343  
     344  
     345  def requires_freebsd_version(*min_version):
     346      """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
     347      less than `min_version`.
     348  
     349      For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
     350      version is less than 7.2.
     351      """
     352      return _requires_unix_version('FreeBSD', min_version)
     353  
     354  def requires_linux_version(*min_version):
     355      """Decorator raising SkipTest if the OS is Linux and the Linux version is
     356      less than `min_version`.
     357  
     358      For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
     359      version is less than 2.6.32.
     360      """
     361      return _requires_unix_version('Linux', min_version)
     362  
     363  def requires_mac_ver(*min_version):
     364      """Decorator raising SkipTest if the OS is Mac OS X and the OS X
     365      version if less than min_version.
     366  
     367      For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
     368      is lesser than 10.5.
     369      """
     370      def decorator(func):
     371          @functools.wraps(func)
     372          def wrapper(*args, **kw):
     373              if sys.platform == 'darwin':
     374                  import platform
     375                  version_txt = platform.mac_ver()[0]
     376                  try:
     377                      version = tuple(map(int, version_txt.split('.')))
     378                  except ValueError:
     379                      pass
     380                  else:
     381                      if version < min_version:
     382                          min_version_txt = '.'.join(map(str, min_version))
     383                          raise unittest.SkipTest(
     384                              "Mac OS X %s or higher required, not %s"
     385                              % (min_version_txt, version_txt))
     386              return func(*args, **kw)
     387          wrapper.min_version = min_version
     388          return wrapper
     389      return decorator
     390  
     391  
     392  def skip_if_buildbot(reason=None):
     393      """Decorator raising SkipTest if running on a buildbot."""
     394      if not reason:
     395          reason = 'not suitable for buildbots'
     396      try:
     397          isbuildbot = getpass.getuser().lower() == 'buildbot'
     398      except (KeyError, EnvironmentError) as err:
     399          warnings.warn(f'getpass.getuser() failed {err}.', RuntimeWarning)
     400          isbuildbot = False
     401      return unittest.skipIf(isbuildbot, reason)
     402  
     403  def check_sanitizer(*, address=False, memory=False, ub=False):
     404      """Returns True if Python is compiled with sanitizer support"""
     405      if not (address or memory or ub):
     406          raise ValueError('At least one of address, memory, or ub must be True')
     407  
     408  
     409      _cflags = sysconfig.get_config_var('CFLAGS') or ''
     410      _config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
     411      memory_sanitizer = (
     412          '-fsanitize=memory' in _cflags or
     413          '--with-memory-sanitizer' in _config_args
     414      )
     415      address_sanitizer = (
     416          '-fsanitize=address' in _cflags or
     417          '--with-address-sanitizer' in _config_args
     418      )
     419      ub_sanitizer = (
     420          '-fsanitize=undefined' in _cflags or
     421          '--with-undefined-behavior-sanitizer' in _config_args
     422      )
     423      return (
     424          (memory and memory_sanitizer) or
     425          (address and address_sanitizer) or
     426          (ub and ub_sanitizer)
     427      )
     428  
     429  
     430  def skip_if_sanitizer(reason=None, *, address=False, memory=False, ub=False):
     431      """Decorator raising SkipTest if running with a sanitizer active."""
     432      if not reason:
     433          reason = 'not working with sanitizers active'
     434      skip = check_sanitizer(address=address, memory=memory, ub=ub)
     435      return unittest.skipIf(skip, reason)
     436  
     437  
     438  def system_must_validate_cert(f):
     439      """Skip the test on TLS certificate validation failures."""
     440      @functools.wraps(f)
     441      def dec(*args, **kwargs):
     442          try:
     443              f(*args, **kwargs)
     444          except OSError as e:
     445              if "CERTIFICATE_VERIFY_FAILED" in str(e):
     446                  raise unittest.SkipTest("system does not contain "
     447                                          "necessary certificates")
     448              raise
     449      return dec
     450  
     451  # A constant likely larger than the underlying OS pipe buffer size, to
     452  # make writes blocking.
     453  # Windows limit seems to be around 512 B, and many Unix kernels have a
     454  # 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
     455  # (see issue #17835 for a discussion of this number).
     456  PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
     457  
     458  # A constant likely larger than the underlying OS socket buffer size, to make
     459  # writes blocking.
     460  # The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
     461  # on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF).  See issue #18643
     462  # for a discussion of this number.
     463  SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
     464  
     465  # decorator for skipping tests on non-IEEE 754 platforms
     466  requires_IEEE_754 = unittest.skipUnless(
     467      float.__getformat__("double").startswith("IEEE"),
     468      "test requires IEEE 754 doubles")
     469  
     470  def requires_zlib(reason='requires zlib'):
     471      try:
     472          import zlib
     473      except ImportError:
     474          zlib = None
     475      return unittest.skipUnless(zlib, reason)
     476  
     477  def requires_gzip(reason='requires gzip'):
     478      try:
     479          import gzip
     480      except ImportError:
     481          gzip = None
     482      return unittest.skipUnless(gzip, reason)
     483  
     484  def requires_bz2(reason='requires bz2'):
     485      try:
     486          import bz2
     487      except ImportError:
     488          bz2 = None
     489      return unittest.skipUnless(bz2, reason)
     490  
     491  def requires_lzma(reason='requires lzma'):
     492      try:
     493          import lzma
     494      except ImportError:
     495          lzma = None
     496      return unittest.skipUnless(lzma, reason)
     497  
     498  def has_no_debug_ranges():
     499      try:
     500          import _testinternalcapi
     501      except ImportError:
     502          raise unittest.SkipTest("_testinternalcapi required")
     503      config = _testinternalcapi.get_config()
     504      return not bool(config['code_debug_ranges'])
     505  
     506  def requires_debug_ranges(reason='requires co_positions / debug_ranges'):
     507      return unittest.skipIf(has_no_debug_ranges(), reason)
     508  
     509  def requires_legacy_unicode_capi():
     510      try:
     511          from _testcapi import unicode_legacy_string
     512      except ImportError:
     513          unicode_legacy_string = None
     514  
     515      return unittest.skipUnless(unicode_legacy_string,
     516                                 'requires legacy Unicode C API')
     517  
     518  # Is not actually used in tests, but is kept for compatibility.
     519  is_jython = sys.platform.startswith('java')
     520  
     521  is_android = hasattr(sys, 'getandroidapilevel')
     522  
     523  if sys.platform not in ('win32', 'vxworks'):
     524      unix_shell = '/system/bin/sh' if is_android else '/bin/sh'
     525  else:
     526      unix_shell = None
     527  
     528  # wasm32-emscripten and -wasi are POSIX-like but do not
     529  # have subprocess or fork support.
     530  is_emscripten = sys.platform == "emscripten"
     531  is_wasi = sys.platform == "wasi"
     532  
     533  has_fork_support = hasattr(os, "fork") and not is_emscripten and not is_wasi
     534  
     535  def requires_fork():
     536      return unittest.skipUnless(has_fork_support, "requires working os.fork()")
     537  
     538  has_subprocess_support = not is_emscripten and not is_wasi
     539  
     540  def requires_subprocess():
     541      """Used for subprocess, os.spawn calls, fd inheritance"""
     542      return unittest.skipUnless(has_subprocess_support, "requires subprocess support")
     543  
     544  # Emscripten's socket emulation and WASI sockets have limitations.
     545  has_socket_support = not is_emscripten and not is_wasi
     546  
     547  def requires_working_socket(*, module=False):
     548      """Skip tests or modules that require working sockets
     549  
     550      Can be used as a function/class decorator or to skip an entire module.
     551      """
     552      msg = "requires socket support"
     553      if module:
     554          if not has_socket_support:
     555              raise unittest.SkipTest(msg)
     556      else:
     557          return unittest.skipUnless(has_socket_support, msg)
     558  
     559  # Does strftime() support glibc extension like '%4Y'?
     560  has_strftime_extensions = False
     561  if sys.platform != "win32":
     562      # bpo-47037: Windows debug builds crash with "Debug Assertion Failed"
     563      try:
     564          has_strftime_extensions = time.strftime("%4Y") != "%4Y"
     565      except ValueError:
     566          pass
     567  
     568  # Define the URL of a dedicated HTTP server for the network tests.
     569  # The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
     570  TEST_HTTP_URL = "http://www.pythontest.net"
     571  
     572  # Set by libregrtest/main.py so we can skip tests that are not
     573  # useful for PGO
     574  PGO = False
     575  
     576  # Set by libregrtest/main.py if we are running the extended (time consuming)
     577  # PGO task.  If this is True, PGO is also True.
     578  PGO_EXTENDED = False
     579  
     580  # TEST_DATA_DIR is used as a target download location for remote resources
     581  TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
     582  
     583  
     584  def darwin_malloc_err_warning(test_name):
     585      """Assure user that loud errors generated by macOS libc's malloc are
     586      expected."""
     587      if sys.platform != 'darwin':
     588          return
     589  
     590      import shutil
     591      msg = ' NOTICE '
     592      detail = (f'{test_name} may generate "malloc can\'t allocate region"\n'
     593                'warnings on macOS systems. This behavior is known. Do not\n'
     594                'report a bug unless tests are also failing.\n'
     595                'See https://github.com/python/cpython/issues/85100')
     596  
     597      padding, _ = shutil.get_terminal_size()
     598      print(msg.center(padding, '-'))
     599      print(detail)
     600      print('-' * padding)
     601  
     602  
     603  def findfile(filename, subdir=None):
     604      """Try to find a file on sys.path or in the test directory.  If it is not
     605      found the argument passed to the function is returned (this does not
     606      necessarily signal failure; could still be the legitimate path).
     607  
     608      Setting *subdir* indicates a relative path to use to find the file
     609      rather than looking directly in the path directories.
     610      """
     611      if os.path.isabs(filename):
     612          return filename
     613      if subdir is not None:
     614          filename = os.path.join(subdir, filename)
     615      path = [TEST_HOME_DIR] + sys.path
     616      for dn in path:
     617          fn = os.path.join(dn, filename)
     618          if os.path.exists(fn): return fn
     619      return filename
     620  
     621  
     622  def sortdict(dict):
     623      "Like repr(dict), but in sorted order."
     624      items = sorted(dict.items())
     625      reprpairs = ["%r: %r" % pair for pair in items]
     626      withcommas = ", ".join(reprpairs)
     627      return "{%s}" % withcommas
     628  
     629  
     630  def run_code(code: str) -> dict[str, object]:
     631      """Run a piece of code after dedenting it, and return its global namespace."""
     632      ns = {}
     633      exec(textwrap.dedent(code), ns)
     634      return ns
     635  
     636  
     637  def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None):
     638      with testcase.assertRaisesRegex(SyntaxError, errtext) as cm:
     639          compile(statement, '<test string>', 'exec')
     640      err = cm.exception
     641      testcase.assertIsNotNone(err.lineno)
     642      if lineno is not None:
     643          testcase.assertEqual(err.lineno, lineno)
     644      testcase.assertIsNotNone(err.offset)
     645      if offset is not None:
     646          testcase.assertEqual(err.offset, offset)
     647  
     648  
     649  def open_urlresource(url, *args, **kw):
     650      import urllib.request, urllib.parse
     651      from .os_helper import unlink
     652      try:
     653          import gzip
     654      except ImportError:
     655          gzip = None
     656  
     657      check = kw.pop('check', None)
     658  
     659      filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
     660  
     661      fn = os.path.join(TEST_DATA_DIR, filename)
     662  
     663      def check_valid_file(fn):
     664          f = open(fn, *args, **kw)
     665          if check is None:
     666              return f
     667          elif check(f):
     668              f.seek(0)
     669              return f
     670          f.close()
     671  
     672      if os.path.exists(fn):
     673          f = check_valid_file(fn)
     674          if f is not None:
     675              return f
     676          unlink(fn)
     677  
     678      # Verify the requirement before downloading the file
     679      requires('urlfetch')
     680  
     681      if verbose:
     682          print('\tfetching %s ...' % url, file=get_original_stdout())
     683      opener = urllib.request.build_opener()
     684      if gzip:
     685          opener.addheaders.append(('Accept-Encoding', 'gzip'))
     686      f = opener.open(url, timeout=INTERNET_TIMEOUT)
     687      if gzip and f.headers.get('Content-Encoding') == 'gzip':
     688          f = gzip.GzipFile(fileobj=f)
     689      try:
     690          with open(fn, "wb") as out:
     691              s = f.read()
     692              while s:
     693                  out.write(s)
     694                  s = f.read()
     695      finally:
     696          f.close()
     697  
     698      f = check_valid_file(fn)
     699      if f is not None:
     700          return f
     701      raise TestFailed('invalid resource %r' % fn)
     702  
     703  
     704  @contextlib.contextmanager
     705  def captured_output(stream_name):
     706      """Return a context manager used by captured_stdout/stdin/stderr
     707      that temporarily replaces the sys stream *stream_name* with a StringIO."""
     708      import io
     709      orig_stdout = getattr(sys, stream_name)
     710      setattr(sys, stream_name, io.StringIO())
     711      try:
     712          yield getattr(sys, stream_name)
     713      finally:
     714          setattr(sys, stream_name, orig_stdout)
     715  
     716  def captured_stdout():
     717      """Capture the output of sys.stdout:
     718  
     719         with captured_stdout() as stdout:
     720             print("hello")
     721         self.assertEqual(stdout.getvalue(), "hello\\n")
     722      """
     723      return captured_output("stdout")
     724  
     725  def captured_stderr():
     726      """Capture the output of sys.stderr:
     727  
     728         with captured_stderr() as stderr:
     729             print("hello", file=sys.stderr)
     730         self.assertEqual(stderr.getvalue(), "hello\\n")
     731      """
     732      return captured_output("stderr")
     733  
     734  def captured_stdin():
     735      """Capture the input to sys.stdin:
     736  
     737         with captured_stdin() as stdin:
     738             stdin.write('hello\\n')
     739             stdin.seek(0)
     740             # call test code that consumes from sys.stdin
     741             captured = input()
     742         self.assertEqual(captured, "hello")
     743      """
     744      return captured_output("stdin")
     745  
     746  
     747  def gc_collect():
     748      """Force as many objects as possible to be collected.
     749  
     750      In non-CPython implementations of Python, this is needed because timely
     751      deallocation is not guaranteed by the garbage collector.  (Even in CPython
     752      this can be the case in case of reference cycles.)  This means that __del__
     753      methods may be called later than expected and weakrefs may remain alive for
     754      longer than expected.  This function tries its best to force all garbage
     755      objects to disappear.
     756      """
     757      import gc
     758      gc.collect()
     759      gc.collect()
     760      gc.collect()
     761  
     762  @contextlib.contextmanager
     763  def disable_gc():
     764      import gc
     765      have_gc = gc.isenabled()
     766      gc.disable()
     767      try:
     768          yield
     769      finally:
     770          if have_gc:
     771              gc.enable()
     772  
     773  
     774  def python_is_optimized():
     775      """Find if Python was built with optimizations."""
     776      cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
     777      final_opt = ""
     778      for opt in cflags.split():
     779          if opt.startswith('-O'):
     780              final_opt = opt
     781      return final_opt not in ('', '-O0', '-Og')
     782  
     783  
     784  _header = 'nP'
     785  _align = '0n'
     786  if hasattr(sys, "getobjects"):
     787      _header = '2P' + _header
     788      _align = '0P'
     789  _vheader = _header + 'n'
     790  
     791  def calcobjsize(fmt):
     792      import struct
     793      return struct.calcsize(_header + fmt + _align)
     794  
     795  def calcvobjsize(fmt):
     796      import struct
     797      return struct.calcsize(_vheader + fmt + _align)
     798  
     799  
     800  _TPFLAGS_HAVE_GC = 1<<14
     801  _TPFLAGS_HEAPTYPE = 1<<9
     802  
     803  def check_sizeof(test, o, size):
     804      try:
     805          import _testinternalcapi
     806      except ImportError:
     807          raise unittest.SkipTest("_testinternalcapi required")
     808      result = sys.getsizeof(o)
     809      # add GC header size
     810      if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
     811          ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
     812          size += _testinternalcapi.SIZEOF_PYGC_HEAD
     813      msg = 'wrong size for %s: got %d, expected %d' \
     814              % (type(o), result, size)
     815      test.assertEqual(result, size, msg)
     816  
     817  #=======================================================================
     818  # Decorator for running a function in a different locale, correctly resetting
     819  # it afterwards.
     820  
     821  @contextlib.contextmanager
     822  def run_with_locale(catstr, *locales):
     823      try:
     824          import locale
     825          category = getattr(locale, catstr)
     826          orig_locale = locale.setlocale(category)
     827      except AttributeError:
     828          # if the test author gives us an invalid category string
     829          raise
     830      except:
     831          # cannot retrieve original locale, so do nothing
     832          locale = orig_locale = None
     833      else:
     834          for loc in locales:
     835              try:
     836                  locale.setlocale(category, loc)
     837                  break
     838              except:
     839                  pass
     840  
     841      try:
     842          yield
     843      finally:
     844          if locale and orig_locale:
     845              locale.setlocale(category, orig_locale)
     846  
     847  #=======================================================================
     848  # Decorator for running a function in a specific timezone, correctly
     849  # resetting it afterwards.
     850  
     851  def run_with_tz(tz):
     852      def decorator(func):
     853          def inner(*args, **kwds):
     854              try:
     855                  tzset = time.tzset
     856              except AttributeError:
     857                  raise unittest.SkipTest("tzset required")
     858              if 'TZ' in os.environ:
     859                  orig_tz = os.environ['TZ']
     860              else:
     861                  orig_tz = None
     862              os.environ['TZ'] = tz
     863              tzset()
     864  
     865              # now run the function, resetting the tz on exceptions
     866              try:
     867                  return func(*args, **kwds)
     868              finally:
     869                  if orig_tz is None:
     870                      del os.environ['TZ']
     871                  else:
     872                      os.environ['TZ'] = orig_tz
     873                  time.tzset()
     874  
     875          inner.__name__ = func.__name__
     876          inner.__doc__ = func.__doc__
     877          return inner
     878      return decorator
     879  
     880  #=======================================================================
     881  # Big-memory-test support. Separate from 'resources' because memory use
     882  # should be configurable.
     883  
     884  # Some handy shorthands. Note that these are used for byte-limits as well
     885  # as size-limits, in the various bigmem tests
     886  _1M = 1024*1024
     887  _1G = 1024 * _1M
     888  _2G = 2 * _1G
     889  _4G = 4 * _1G
     890  
     891  MAX_Py_ssize_t = sys.maxsize
     892  
     893  def set_memlimit(limit):
     894      global max_memuse
     895      global real_max_memuse
     896      sizes = {
     897          'k': 1024,
     898          'm': _1M,
     899          'g': _1G,
     900          't': 1024*_1G,
     901      }
     902      m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
     903                   re.IGNORECASE | re.VERBOSE)
     904      if m is None:
     905          raise ValueError('Invalid memory limit %r' % (limit,))
     906      memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
     907      real_max_memuse = memlimit
     908      if memlimit > MAX_Py_ssize_t:
     909          memlimit = MAX_Py_ssize_t
     910      if memlimit < _2G - 1:
     911          raise ValueError('Memory limit %r too low to be useful' % (limit,))
     912      max_memuse = memlimit
     913  
     914  class ESC[4;38;5;81m_MemoryWatchdog:
     915      """An object which periodically watches the process' memory consumption
     916      and prints it out.
     917      """
     918  
     919      def __init__(self):
     920          self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
     921          self.started = False
     922  
     923      def start(self):
     924          import warnings
     925          try:
     926              f = open(self.procfile, 'r')
     927          except OSError as e:
     928              warnings.warn('/proc not available for stats: {}'.format(e),
     929                            RuntimeWarning)
     930              sys.stderr.flush()
     931              return
     932  
     933          import subprocess
     934          with f:
     935              watchdog_script = findfile("memory_watchdog.py")
     936              self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
     937                                                   stdin=f,
     938                                                   stderr=subprocess.DEVNULL)
     939          self.started = True
     940  
     941      def stop(self):
     942          if self.started:
     943              self.mem_watchdog.terminate()
     944              self.mem_watchdog.wait()
     945  
     946  
     947  def bigmemtest(size, memuse, dry_run=True):
     948      """Decorator for bigmem tests.
     949  
     950      'size' is a requested size for the test (in arbitrary, test-interpreted
     951      units.) 'memuse' is the number of bytes per unit for the test, or a good
     952      estimate of it. For example, a test that needs two byte buffers, of 4 GiB
     953      each, could be decorated with @bigmemtest(size=_4G, memuse=2).
     954  
     955      The 'size' argument is normally passed to the decorated test method as an
     956      extra argument. If 'dry_run' is true, the value passed to the test method
     957      may be less than the requested value. If 'dry_run' is false, it means the
     958      test doesn't support dummy runs when -M is not specified.
     959      """
     960      def decorator(f):
     961          def wrapper(self):
     962              size = wrapper.size
     963              memuse = wrapper.memuse
     964              if not real_max_memuse:
     965                  maxsize = 5147
     966              else:
     967                  maxsize = size
     968  
     969              if ((real_max_memuse or not dry_run)
     970                  and real_max_memuse < maxsize * memuse):
     971                  raise unittest.SkipTest(
     972                      "not enough memory: %.1fG minimum needed"
     973                      % (size * memuse / (1024 ** 3)))
     974  
     975              if real_max_memuse and verbose:
     976                  print()
     977                  print(" ... expected peak memory use: {peak:.1f}G"
     978                        .format(peak=size * memuse / (1024 ** 3)))
     979                  watchdog = _MemoryWatchdog()
     980                  watchdog.start()
     981              else:
     982                  watchdog = None
     983  
     984              try:
     985                  return f(self, maxsize)
     986              finally:
     987                  if watchdog:
     988                      watchdog.stop()
     989  
     990          wrapper.size = size
     991          wrapper.memuse = memuse
     992          return wrapper
     993      return decorator
     994  
     995  def bigaddrspacetest(f):
     996      """Decorator for tests that fill the address space."""
     997      def wrapper(self):
     998          if max_memuse < MAX_Py_ssize_t:
     999              if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
    1000                  raise unittest.SkipTest(
    1001                      "not enough memory: try a 32-bit build instead")
    1002              else:
    1003                  raise unittest.SkipTest(
    1004                      "not enough memory: %.1fG minimum needed"
    1005                      % (MAX_Py_ssize_t / (1024 ** 3)))
    1006          else:
    1007              return f(self)
    1008      return wrapper
    1009  
    1010  #=======================================================================
    1011  # unittest integration.
    1012  
    1013  def _id(obj):
    1014      return obj
    1015  
    1016  def requires_resource(resource):
    1017      if resource == 'gui' and not _is_gui_available():
    1018          return unittest.skip(_is_gui_available.reason)
    1019      if is_resource_enabled(resource):
    1020          return _id
    1021      else:
    1022          return unittest.skip("resource {0!r} is not enabled".format(resource))
    1023  
    1024  def cpython_only(test):
    1025      """
    1026      Decorator for tests only applicable on CPython.
    1027      """
    1028      return impl_detail(cpython=True)(test)
    1029  
    1030  def impl_detail(msg=None, **guards):
    1031      if check_impl_detail(**guards):
    1032          return _id
    1033      if msg is None:
    1034          guardnames, default = _parse_guards(guards)
    1035          if default:
    1036              msg = "implementation detail not available on {0}"
    1037          else:
    1038              msg = "implementation detail specific to {0}"
    1039          guardnames = sorted(guardnames.keys())
    1040          msg = msg.format(' or '.join(guardnames))
    1041      return unittest.skip(msg)
    1042  
    1043  def _parse_guards(guards):
    1044      # Returns a tuple ({platform_name: run_me}, default_value)
    1045      if not guards:
    1046          return ({'cpython': True}, False)
    1047      is_true = list(guards.values())[0]
    1048      assert list(guards.values()) == [is_true] * len(guards)   # all True or all False
    1049      return (guards, not is_true)
    1050  
    1051  # Use the following check to guard CPython's implementation-specific tests --
    1052  # or to run them only on the implementation(s) guarded by the arguments.
    1053  def check_impl_detail(**guards):
    1054      """This function returns True or False depending on the host platform.
    1055         Examples:
    1056            if check_impl_detail():               # only on CPython (default)
    1057            if check_impl_detail(jython=True):    # only on Jython
    1058            if check_impl_detail(cpython=False):  # everywhere except on CPython
    1059      """
    1060      guards, default = _parse_guards(guards)
    1061      return guards.get(sys.implementation.name, default)
    1062  
    1063  
    1064  def no_tracing(func):
    1065      """Decorator to temporarily turn off tracing for the duration of a test."""
    1066      if not hasattr(sys, 'gettrace'):
    1067          return func
    1068      else:
    1069          @functools.wraps(func)
    1070          def wrapper(*args, **kwargs):
    1071              original_trace = sys.gettrace()
    1072              try:
    1073                  sys.settrace(None)
    1074                  return func(*args, **kwargs)
    1075              finally:
    1076                  sys.settrace(original_trace)
    1077          return wrapper
    1078  
    1079  
    1080  def refcount_test(test):
    1081      """Decorator for tests which involve reference counting.
    1082  
    1083      To start, the decorator does not run the test if is not run by CPython.
    1084      After that, any trace function is unset during the test to prevent
    1085      unexpected refcounts caused by the trace function.
    1086  
    1087      """
    1088      return no_tracing(cpython_only(test))
    1089  
    1090  
    1091  def requires_limited_api(test):
    1092      try:
    1093          import _testcapi
    1094      except ImportError:
    1095          return unittest.skip('needs _testcapi module')(test)
    1096      return unittest.skipUnless(
    1097          _testcapi.LIMITED_API_AVAILABLE, 'needs Limited API support')(test)
    1098  
    1099  def requires_specialization(test):
    1100      return unittest.skipUnless(
    1101          opcode.ENABLE_SPECIALIZATION, "requires specialization")(test)
    1102  
    1103  def _filter_suite(suite, pred):
    1104      """Recursively filter test cases in a suite based on a predicate."""
    1105      newtests = []
    1106      for test in suite._tests:
    1107          if isinstance(test, unittest.TestSuite):
    1108              _filter_suite(test, pred)
    1109              newtests.append(test)
    1110          else:
    1111              if pred(test):
    1112                  newtests.append(test)
    1113      suite._tests = newtests
    1114  
    1115  @dataclasses.dataclass(slots=True)
    1116  class ESC[4;38;5;81mTestStats:
    1117      tests_run: int = 0
    1118      failures: int = 0
    1119      skipped: int = 0
    1120  
    1121      @staticmethod
    1122      def from_unittest(result):
    1123          return TestStats(result.testsRun,
    1124                           len(result.failures),
    1125                           len(result.skipped))
    1126  
    1127      @staticmethod
    1128      def from_doctest(results):
    1129          return TestStats(results.attempted,
    1130                           results.failed)
    1131  
    1132      def accumulate(self, stats):
    1133          self.tests_run += stats.tests_run
    1134          self.failures += stats.failures
    1135          self.skipped += stats.skipped
    1136  
    1137  
    1138  def _run_suite(suite):
    1139      """Run tests from a unittest.TestSuite-derived class."""
    1140      runner = get_test_runner(sys.stdout,
    1141                               verbosity=verbose,
    1142                               capture_output=(junit_xml_list is not None))
    1143  
    1144      result = runner.run(suite)
    1145  
    1146      if junit_xml_list is not None:
    1147          junit_xml_list.append(result.get_xml_element())
    1148  
    1149      if not result.testsRun and not result.skipped and not result.errors:
    1150          raise TestDidNotRun
    1151      if not result.wasSuccessful():
    1152          stats = TestStats.from_unittest(result)
    1153          if len(result.errors) == 1 and not result.failures:
    1154              err = result.errors[0][1]
    1155          elif len(result.failures) == 1 and not result.errors:
    1156              err = result.failures[0][1]
    1157          else:
    1158              err = "multiple errors occurred"
    1159              if not verbose: err += "; run in verbose mode for details"
    1160          errors = [(str(tc), exc_str) for tc, exc_str in result.errors]
    1161          failures = [(str(tc), exc_str) for tc, exc_str in result.failures]
    1162          raise TestFailedWithDetails(err, errors, failures, stats=stats)
    1163      return result
    1164  
    1165  
    1166  # By default, don't filter tests
    1167  _match_test_func = None
    1168  
    1169  _accept_test_patterns = None
    1170  _ignore_test_patterns = None
    1171  
    1172  
    1173  def match_test(test):
    1174      # Function used by support.run_unittest() and regrtest --list-cases
    1175      if _match_test_func is None:
    1176          return True
    1177      else:
    1178          return _match_test_func(test.id())
    1179  
    1180  
    1181  def _is_full_match_test(pattern):
    1182      # If a pattern contains at least one dot, it's considered
    1183      # as a full test identifier.
    1184      # Example: 'test.test_os.FileTests.test_access'.
    1185      #
    1186      # ignore patterns which contain fnmatch patterns: '*', '?', '[...]'
    1187      # or '[!...]'. For example, ignore 'test_access*'.
    1188      return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern))
    1189  
    1190  
    1191  def set_match_tests(accept_patterns=None, ignore_patterns=None):
    1192      global _match_test_func, _accept_test_patterns, _ignore_test_patterns
    1193  
    1194      if accept_patterns is None:
    1195          accept_patterns = ()
    1196      if ignore_patterns is None:
    1197          ignore_patterns = ()
    1198  
    1199      accept_func = ignore_func = None
    1200  
    1201      if accept_patterns != _accept_test_patterns:
    1202          accept_patterns, accept_func = _compile_match_function(accept_patterns)
    1203      if ignore_patterns != _ignore_test_patterns:
    1204          ignore_patterns, ignore_func = _compile_match_function(ignore_patterns)
    1205  
    1206      # Create a copy since patterns can be mutable and so modified later
    1207      _accept_test_patterns = tuple(accept_patterns)
    1208      _ignore_test_patterns = tuple(ignore_patterns)
    1209  
    1210      if accept_func is not None or ignore_func is not None:
    1211          def match_function(test_id):
    1212              accept = True
    1213              ignore = False
    1214              if accept_func:
    1215                  accept = accept_func(test_id)
    1216              if ignore_func:
    1217                  ignore = ignore_func(test_id)
    1218              return accept and not ignore
    1219  
    1220          _match_test_func = match_function
    1221  
    1222  
    1223  def _compile_match_function(patterns):
    1224      if not patterns:
    1225          func = None
    1226          # set_match_tests(None) behaves as set_match_tests(())
    1227          patterns = ()
    1228      elif all(map(_is_full_match_test, patterns)):
    1229          # Simple case: all patterns are full test identifier.
    1230          # The test.bisect_cmd utility only uses such full test identifiers.
    1231          func = set(patterns).__contains__
    1232      else:
    1233          import fnmatch
    1234          regex = '|'.join(map(fnmatch.translate, patterns))
    1235          # The search *is* case sensitive on purpose:
    1236          # don't use flags=re.IGNORECASE
    1237          regex_match = re.compile(regex).match
    1238  
    1239          def match_test_regex(test_id):
    1240              if regex_match(test_id):
    1241                  # The regex matches the whole identifier, for example
    1242                  # 'test.test_os.FileTests.test_access'.
    1243                  return True
    1244              else:
    1245                  # Try to match parts of the test identifier.
    1246                  # For example, split 'test.test_os.FileTests.test_access'
    1247                  # into: 'test', 'test_os', 'FileTests' and 'test_access'.
    1248                  return any(map(regex_match, test_id.split(".")))
    1249  
    1250          func = match_test_regex
    1251  
    1252      return patterns, func
    1253  
    1254  
    1255  def run_unittest(*classes):
    1256      """Run tests from unittest.TestCase-derived classes."""
    1257      valid_types = (unittest.TestSuite, unittest.TestCase)
    1258      loader = unittest.TestLoader()
    1259      suite = unittest.TestSuite()
    1260      for cls in classes:
    1261          if isinstance(cls, str):
    1262              if cls in sys.modules:
    1263                  suite.addTest(loader.loadTestsFromModule(sys.modules[cls]))
    1264              else:
    1265                  raise ValueError("str arguments must be keys in sys.modules")
    1266          elif isinstance(cls, valid_types):
    1267              suite.addTest(cls)
    1268          else:
    1269              suite.addTest(loader.loadTestsFromTestCase(cls))
    1270      _filter_suite(suite, match_test)
    1271      return _run_suite(suite)
    1272  
    1273  #=======================================================================
    1274  # Check for the presence of docstrings.
    1275  
    1276  # Rather than trying to enumerate all the cases where docstrings may be
    1277  # disabled, we just check for that directly
    1278  
    1279  def _check_docstrings():
    1280      """Just used to check if docstrings are enabled"""
    1281  
    1282  MISSING_C_DOCSTRINGS = (check_impl_detail() and
    1283                          sys.platform != 'win32' and
    1284                          not sysconfig.get_config_var('WITH_DOC_STRINGS'))
    1285  
    1286  HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
    1287                     not MISSING_C_DOCSTRINGS)
    1288  
    1289  requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
    1290                                            "test requires docstrings")
    1291  
    1292  
    1293  #=======================================================================
    1294  # doctest driver.
    1295  
    1296  def run_doctest(module, verbosity=None, optionflags=0):
    1297      """Run doctest on the given module.  Return (#failures, #tests).
    1298  
    1299      If optional argument verbosity is not specified (or is None), pass
    1300      support's belief about verbosity on to doctest.  Else doctest's
    1301      usual behavior is used (it searches sys.argv for -v).
    1302      """
    1303  
    1304      import doctest
    1305  
    1306      if verbosity is None:
    1307          verbosity = verbose
    1308      else:
    1309          verbosity = None
    1310  
    1311      results = doctest.testmod(module,
    1312                               verbose=verbosity,
    1313                               optionflags=optionflags)
    1314      if results.failed:
    1315          stats = TestStats.from_doctest(results)
    1316          raise TestFailed(f"{results.failed} of {results.attempted} "
    1317                           f"doctests failed",
    1318                           stats=stats)
    1319      if verbose:
    1320          print('doctest (%s) ... %d tests with zero failures' %
    1321                (module.__name__, results.attempted))
    1322      return results
    1323  
    1324  
    1325  #=======================================================================
    1326  # Support for saving and restoring the imported modules.
    1327  
    1328  def flush_std_streams():
    1329      if sys.stdout is not None:
    1330          sys.stdout.flush()
    1331      if sys.stderr is not None:
    1332          sys.stderr.flush()
    1333  
    1334  
    1335  def print_warning(msg):
    1336      # bpo-45410: Explicitly flush stdout to keep logs in order
    1337      flush_std_streams()
    1338      stream = print_warning.orig_stderr
    1339      for line in msg.splitlines():
    1340          print(f"Warning -- {line}", file=stream)
    1341      stream.flush()
    1342  
    1343  # bpo-39983: Store the original sys.stderr at Python startup to be able to
    1344  # log warnings even if sys.stderr is captured temporarily by a test.
    1345  print_warning.orig_stderr = sys.stderr
    1346  
    1347  
    1348  # Flag used by saved_test_environment of test.libregrtest.save_env,
    1349  # to check if a test modified the environment. The flag should be set to False
    1350  # before running a new test.
    1351  #
    1352  # For example, threading_helper.threading_cleanup() sets the flag is the function fails
    1353  # to cleanup threads.
    1354  environment_altered = False
    1355  
    1356  def reap_children():
    1357      """Use this function at the end of test_main() whenever sub-processes
    1358      are started.  This will help ensure that no extra children (zombies)
    1359      stick around to hog resources and create problems when looking
    1360      for refleaks.
    1361      """
    1362      global environment_altered
    1363  
    1364      # Need os.waitpid(-1, os.WNOHANG): Windows is not supported
    1365      if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
    1366          return
    1367      elif not has_subprocess_support:
    1368          return
    1369  
    1370      # Reap all our dead child processes so we don't leave zombies around.
    1371      # These hog resources and might be causing some of the buildbots to die.
    1372      while True:
    1373          try:
    1374              # Read the exit status of any child process which already completed
    1375              pid, status = os.waitpid(-1, os.WNOHANG)
    1376          except OSError:
    1377              break
    1378  
    1379          if pid == 0:
    1380              break
    1381  
    1382          print_warning(f"reap_children() reaped child process {pid}")
    1383          environment_altered = True
    1384  
    1385  
    1386  @contextlib.contextmanager
    1387  def swap_attr(obj, attr, new_val):
    1388      """Temporary swap out an attribute with a new object.
    1389  
    1390      Usage:
    1391          with swap_attr(obj, "attr", 5):
    1392              ...
    1393  
    1394          This will set obj.attr to 5 for the duration of the with: block,
    1395          restoring the old value at the end of the block. If `attr` doesn't
    1396          exist on `obj`, it will be created and then deleted at the end of the
    1397          block.
    1398  
    1399          The old value (or None if it doesn't exist) will be assigned to the
    1400          target of the "as" clause, if there is one.
    1401      """
    1402      if hasattr(obj, attr):
    1403          real_val = getattr(obj, attr)
    1404          setattr(obj, attr, new_val)
    1405          try:
    1406              yield real_val
    1407          finally:
    1408              setattr(obj, attr, real_val)
    1409      else:
    1410          setattr(obj, attr, new_val)
    1411          try:
    1412              yield
    1413          finally:
    1414              if hasattr(obj, attr):
    1415                  delattr(obj, attr)
    1416  
    1417  @contextlib.contextmanager
    1418  def swap_item(obj, item, new_val):
    1419      """Temporary swap out an item with a new object.
    1420  
    1421      Usage:
    1422          with swap_item(obj, "item", 5):
    1423              ...
    1424  
    1425          This will set obj["item"] to 5 for the duration of the with: block,
    1426          restoring the old value at the end of the block. If `item` doesn't
    1427          exist on `obj`, it will be created and then deleted at the end of the
    1428          block.
    1429  
    1430          The old value (or None if it doesn't exist) will be assigned to the
    1431          target of the "as" clause, if there is one.
    1432      """
    1433      if item in obj:
    1434          real_val = obj[item]
    1435          obj[item] = new_val
    1436          try:
    1437              yield real_val
    1438          finally:
    1439              obj[item] = real_val
    1440      else:
    1441          obj[item] = new_val
    1442          try:
    1443              yield
    1444          finally:
    1445              if item in obj:
    1446                  del obj[item]
    1447  
    1448  def args_from_interpreter_flags():
    1449      """Return a list of command-line arguments reproducing the current
    1450      settings in sys.flags and sys.warnoptions."""
    1451      import subprocess
    1452      return subprocess._args_from_interpreter_flags()
    1453  
    1454  def optim_args_from_interpreter_flags():
    1455      """Return a list of command-line arguments reproducing the current
    1456      optimization settings in sys.flags."""
    1457      import subprocess
    1458      return subprocess._optim_args_from_interpreter_flags()
    1459  
    1460  
    1461  class ESC[4;38;5;81mMatcher(ESC[4;38;5;149mobject):
    1462  
    1463      _partial_matches = ('msg', 'message')
    1464  
    1465      def matches(self, d, **kwargs):
    1466          """
    1467          Try to match a single dict with the supplied arguments.
    1468  
    1469          Keys whose values are strings and which are in self._partial_matches
    1470          will be checked for partial (i.e. substring) matches. You can extend
    1471          this scheme to (for example) do regular expression matching, etc.
    1472          """
    1473          result = True
    1474          for k in kwargs:
    1475              v = kwargs[k]
    1476              dv = d.get(k)
    1477              if not self.match_value(k, dv, v):
    1478                  result = False
    1479                  break
    1480          return result
    1481  
    1482      def match_value(self, k, dv, v):
    1483          """
    1484          Try to match a single stored value (dv) with a supplied value (v).
    1485          """
    1486          if type(v) != type(dv):
    1487              result = False
    1488          elif type(dv) is not str or k not in self._partial_matches:
    1489              result = (v == dv)
    1490          else:
    1491              result = dv.find(v) >= 0
    1492          return result
    1493  
    1494  
    1495  _buggy_ucrt = None
    1496  def skip_if_buggy_ucrt_strfptime(test):
    1497      """
    1498      Skip decorator for tests that use buggy strptime/strftime
    1499  
    1500      If the UCRT bugs are present time.localtime().tm_zone will be
    1501      an empty string, otherwise we assume the UCRT bugs are fixed
    1502  
    1503      See bpo-37552 [Windows] strptime/strftime return invalid
    1504      results with UCRT version 17763.615
    1505      """
    1506      import locale
    1507      global _buggy_ucrt
    1508      if _buggy_ucrt is None:
    1509          if(sys.platform == 'win32' and
    1510                  locale.getencoding() == 'cp65001' and
    1511                  time.localtime().tm_zone == ''):
    1512              _buggy_ucrt = True
    1513          else:
    1514              _buggy_ucrt = False
    1515      return unittest.skip("buggy MSVC UCRT strptime/strftime")(test) if _buggy_ucrt else test
    1516  
    1517  class ESC[4;38;5;81mPythonSymlink:
    1518      """Creates a symlink for the current Python executable"""
    1519      def __init__(self, link=None):
    1520          from .os_helper import TESTFN
    1521  
    1522          self.link = link or os.path.abspath(TESTFN)
    1523          self._linked = []
    1524          self.real = os.path.realpath(sys.executable)
    1525          self._also_link = []
    1526  
    1527          self._env = None
    1528  
    1529          self._platform_specific()
    1530  
    1531      if sys.platform == "win32":
    1532          def _platform_specific(self):
    1533              import glob
    1534              import _winapi
    1535  
    1536              if os.path.lexists(self.real) and not os.path.exists(self.real):
    1537                  # App symlink appears to not exist, but we want the
    1538                  # real executable here anyway
    1539                  self.real = _winapi.GetModuleFileName(0)
    1540  
    1541              dll = _winapi.GetModuleFileName(sys.dllhandle)
    1542              src_dir = os.path.dirname(dll)
    1543              dest_dir = os.path.dirname(self.link)
    1544              self._also_link.append((
    1545                  dll,
    1546                  os.path.join(dest_dir, os.path.basename(dll))
    1547              ))
    1548              for runtime in glob.glob(os.path.join(glob.escape(src_dir), "vcruntime*.dll")):
    1549                  self._also_link.append((
    1550                      runtime,
    1551                      os.path.join(dest_dir, os.path.basename(runtime))
    1552                  ))
    1553  
    1554              self._env = {k.upper(): os.getenv(k) for k in os.environ}
    1555              self._env["PYTHONHOME"] = os.path.dirname(self.real)
    1556              if sysconfig.is_python_build():
    1557                  self._env["PYTHONPATH"] = STDLIB_DIR
    1558      else:
    1559          def _platform_specific(self):
    1560              pass
    1561  
    1562      def __enter__(self):
    1563          os.symlink(self.real, self.link)
    1564          self._linked.append(self.link)
    1565          for real, link in self._also_link:
    1566              os.symlink(real, link)
    1567              self._linked.append(link)
    1568          return self
    1569  
    1570      def __exit__(self, exc_type, exc_value, exc_tb):
    1571          for link in self._linked:
    1572              try:
    1573                  os.remove(link)
    1574              except IOError as ex:
    1575                  if verbose:
    1576                      print("failed to clean up {}: {}".format(link, ex))
    1577  
    1578      def _call(self, python, args, env, returncode):
    1579          import subprocess
    1580          cmd = [python, *args]
    1581          p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
    1582                               stderr=subprocess.PIPE, env=env)
    1583          r = p.communicate()
    1584          if p.returncode != returncode:
    1585              if verbose:
    1586                  print(repr(r[0]))
    1587                  print(repr(r[1]), file=sys.stderr)
    1588              raise RuntimeError(
    1589                  'unexpected return code: {0} (0x{0:08X})'.format(p.returncode))
    1590          return r
    1591  
    1592      def call_real(self, *args, returncode=0):
    1593          return self._call(self.real, args, None, returncode)
    1594  
    1595      def call_link(self, *args, returncode=0):
    1596          return self._call(self.link, args, self._env, returncode)
    1597  
    1598  
    1599  def skip_if_pgo_task(test):
    1600      """Skip decorator for tests not run in (non-extended) PGO task"""
    1601      ok = not PGO or PGO_EXTENDED
    1602      msg = "Not run for (non-extended) PGO task"
    1603      return test if ok else unittest.skip(msg)(test)
    1604  
    1605  
    1606  def detect_api_mismatch(ref_api, other_api, *, ignore=()):
    1607      """Returns the set of items in ref_api not in other_api, except for a
    1608      defined list of items to be ignored in this check.
    1609  
    1610      By default this skips private attributes beginning with '_' but
    1611      includes all magic methods, i.e. those starting and ending in '__'.
    1612      """
    1613      missing_items = set(dir(ref_api)) - set(dir(other_api))
    1614      if ignore:
    1615          missing_items -= set(ignore)
    1616      missing_items = set(m for m in missing_items
    1617                          if not m.startswith('_') or m.endswith('__'))
    1618      return missing_items
    1619  
    1620  
    1621  def check__all__(test_case, module, name_of_module=None, extra=(),
    1622                   not_exported=()):
    1623      """Assert that the __all__ variable of 'module' contains all public names.
    1624  
    1625      The module's public names (its API) are detected automatically based on
    1626      whether they match the public name convention and were defined in
    1627      'module'.
    1628  
    1629      The 'name_of_module' argument can specify (as a string or tuple thereof)
    1630      what module(s) an API could be defined in in order to be detected as a
    1631      public API. One case for this is when 'module' imports part of its public
    1632      API from other modules, possibly a C backend (like 'csv' and its '_csv').
    1633  
    1634      The 'extra' argument can be a set of names that wouldn't otherwise be
    1635      automatically detected as "public", like objects without a proper
    1636      '__module__' attribute. If provided, it will be added to the
    1637      automatically detected ones.
    1638  
    1639      The 'not_exported' argument can be a set of names that must not be treated
    1640      as part of the public API even though their names indicate otherwise.
    1641  
    1642      Usage:
    1643          import bar
    1644          import foo
    1645          import unittest
    1646          from test import support
    1647  
    1648          class MiscTestCase(unittest.TestCase):
    1649              def test__all__(self):
    1650                  support.check__all__(self, foo)
    1651  
    1652          class OtherTestCase(unittest.TestCase):
    1653              def test__all__(self):
    1654                  extra = {'BAR_CONST', 'FOO_CONST'}
    1655                  not_exported = {'baz'}  # Undocumented name.
    1656                  # bar imports part of its API from _bar.
    1657                  support.check__all__(self, bar, ('bar', '_bar'),
    1658                                       extra=extra, not_exported=not_exported)
    1659  
    1660      """
    1661  
    1662      if name_of_module is None:
    1663          name_of_module = (module.__name__, )
    1664      elif isinstance(name_of_module, str):
    1665          name_of_module = (name_of_module, )
    1666  
    1667      expected = set(extra)
    1668  
    1669      for name in dir(module):
    1670          if name.startswith('_') or name in not_exported:
    1671              continue
    1672          obj = getattr(module, name)
    1673          if (getattr(obj, '__module__', None) in name_of_module or
    1674                  (not hasattr(obj, '__module__') and
    1675                   not isinstance(obj, types.ModuleType))):
    1676              expected.add(name)
    1677      test_case.assertCountEqual(module.__all__, expected)
    1678  
    1679  
    1680  def suppress_msvcrt_asserts(verbose=False):
    1681      try:
    1682          import msvcrt
    1683      except ImportError:
    1684          return
    1685  
    1686      msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS
    1687                          | msvcrt.SEM_NOALIGNMENTFAULTEXCEPT
    1688                          | msvcrt.SEM_NOGPFAULTERRORBOX
    1689                          | msvcrt.SEM_NOOPENFILEERRORBOX)
    1690  
    1691      # CrtSetReportMode() is only available in debug build
    1692      if hasattr(msvcrt, 'CrtSetReportMode'):
    1693          for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]:
    1694              if verbose:
    1695                  msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE)
    1696                  msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR)
    1697              else:
    1698                  msvcrt.CrtSetReportMode(m, 0)
    1699  
    1700  
    1701  class ESC[4;38;5;81mSuppressCrashReport:
    1702      """Try to prevent a crash report from popping up.
    1703  
    1704      On Windows, don't display the Windows Error Reporting dialog.  On UNIX,
    1705      disable the creation of coredump file.
    1706      """
    1707      old_value = None
    1708      old_modes = None
    1709  
    1710      def __enter__(self):
    1711          """On Windows, disable Windows Error Reporting dialogs using
    1712          SetErrorMode() and CrtSetReportMode().
    1713  
    1714          On UNIX, try to save the previous core file size limit, then set
    1715          soft limit to 0.
    1716          """
    1717          if sys.platform.startswith('win'):
    1718              # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
    1719              try:
    1720                  import msvcrt
    1721              except ImportError:
    1722                  return
    1723  
    1724              self.old_value = msvcrt.GetErrorMode()
    1725  
    1726              msvcrt.SetErrorMode(self.old_value | msvcrt.SEM_NOGPFAULTERRORBOX)
    1727  
    1728              # bpo-23314: Suppress assert dialogs in debug builds.
    1729              # CrtSetReportMode() is only available in debug build.
    1730              if hasattr(msvcrt, 'CrtSetReportMode'):
    1731                  self.old_modes = {}
    1732                  for report_type in [msvcrt.CRT_WARN,
    1733                                      msvcrt.CRT_ERROR,
    1734                                      msvcrt.CRT_ASSERT]:
    1735                      old_mode = msvcrt.CrtSetReportMode(report_type,
    1736                              msvcrt.CRTDBG_MODE_FILE)
    1737                      old_file = msvcrt.CrtSetReportFile(report_type,
    1738                              msvcrt.CRTDBG_FILE_STDERR)
    1739                      self.old_modes[report_type] = old_mode, old_file
    1740  
    1741          else:
    1742              try:
    1743                  import resource
    1744                  self.resource = resource
    1745              except ImportError:
    1746                  self.resource = None
    1747              if self.resource is not None:
    1748                  try:
    1749                      self.old_value = self.resource.getrlimit(self.resource.RLIMIT_CORE)
    1750                      self.resource.setrlimit(self.resource.RLIMIT_CORE,
    1751                                              (0, self.old_value[1]))
    1752                  except (ValueError, OSError):
    1753                      pass
    1754  
    1755              if sys.platform == 'darwin':
    1756                  import subprocess
    1757                  # Check if the 'Crash Reporter' on OSX was configured
    1758                  # in 'Developer' mode and warn that it will get triggered
    1759                  # when it is.
    1760                  #
    1761                  # This assumes that this context manager is used in tests
    1762                  # that might trigger the next manager.
    1763                  cmd = ['/usr/bin/defaults', 'read',
    1764                         'com.apple.CrashReporter', 'DialogType']
    1765                  proc = subprocess.Popen(cmd,
    1766                                          stdout=subprocess.PIPE,
    1767                                          stderr=subprocess.PIPE)
    1768                  with proc:
    1769                      stdout = proc.communicate()[0]
    1770                  if stdout.strip() == b'developer':
    1771                      print("this test triggers the Crash Reporter, "
    1772                            "that is intentional", end='', flush=True)
    1773  
    1774          return self
    1775  
    1776      def __exit__(self, *ignore_exc):
    1777          """Restore Windows ErrorMode or core file behavior to initial value."""
    1778          if self.old_value is None:
    1779              return
    1780  
    1781          if sys.platform.startswith('win'):
    1782              import msvcrt
    1783              msvcrt.SetErrorMode(self.old_value)
    1784  
    1785              if self.old_modes:
    1786                  for report_type, (old_mode, old_file) in self.old_modes.items():
    1787                      msvcrt.CrtSetReportMode(report_type, old_mode)
    1788                      msvcrt.CrtSetReportFile(report_type, old_file)
    1789          else:
    1790              if self.resource is not None:
    1791                  try:
    1792                      self.resource.setrlimit(self.resource.RLIMIT_CORE, self.old_value)
    1793                  except (ValueError, OSError):
    1794                      pass
    1795  
    1796  
    1797  def patch(test_instance, object_to_patch, attr_name, new_value):
    1798      """Override 'object_to_patch'.'attr_name' with 'new_value'.
    1799  
    1800      Also, add a cleanup procedure to 'test_instance' to restore
    1801      'object_to_patch' value for 'attr_name'.
    1802      The 'attr_name' should be a valid attribute for 'object_to_patch'.
    1803  
    1804      """
    1805      # check that 'attr_name' is a real attribute for 'object_to_patch'
    1806      # will raise AttributeError if it does not exist
    1807      getattr(object_to_patch, attr_name)
    1808  
    1809      # keep a copy of the old value
    1810      attr_is_local = False
    1811      try:
    1812          old_value = object_to_patch.__dict__[attr_name]
    1813      except (AttributeError, KeyError):
    1814          old_value = getattr(object_to_patch, attr_name, None)
    1815      else:
    1816          attr_is_local = True
    1817  
    1818      # restore the value when the test is done
    1819      def cleanup():
    1820          if attr_is_local:
    1821              setattr(object_to_patch, attr_name, old_value)
    1822          else:
    1823              delattr(object_to_patch, attr_name)
    1824  
    1825      test_instance.addCleanup(cleanup)
    1826  
    1827      # actually override the attribute
    1828      setattr(object_to_patch, attr_name, new_value)
    1829  
    1830  
    1831  @contextlib.contextmanager
    1832  def patch_list(orig):
    1833      """Like unittest.mock.patch.dict, but for lists."""
    1834      try:
    1835          saved = orig[:]
    1836          yield
    1837      finally:
    1838          orig[:] = saved
    1839  
    1840  
    1841  def run_in_subinterp(code):
    1842      """
    1843      Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
    1844      module is enabled.
    1845      """
    1846      _check_tracemalloc()
    1847      import _testcapi
    1848      return _testcapi.run_in_subinterp(code)
    1849  
    1850  
    1851  def run_in_subinterp_with_config(code, *, own_gil=None, **config):
    1852      """
    1853      Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
    1854      module is enabled.
    1855      """
    1856      _check_tracemalloc()
    1857      import _testcapi
    1858      if own_gil is not None:
    1859          assert 'gil' not in config, (own_gil, config)
    1860          config['gil'] = 2 if own_gil else 1
    1861      return _testcapi.run_in_subinterp_with_config(code, **config)
    1862  
    1863  
    1864  def _check_tracemalloc():
    1865      # Issue #10915, #15751: PyGILState_*() functions don't work with
    1866      # sub-interpreters, the tracemalloc module uses these functions internally
    1867      try:
    1868          import tracemalloc
    1869      except ImportError:
    1870          pass
    1871      else:
    1872          if tracemalloc.is_tracing():
    1873              raise unittest.SkipTest("run_in_subinterp() cannot be used "
    1874                                       "if tracemalloc module is tracing "
    1875                                       "memory allocations")
    1876  
    1877  
    1878  def check_free_after_iterating(test, iter, cls, args=()):
    1879      class ESC[4;38;5;81mA(ESC[4;38;5;149mcls):
    1880          def __del__(self):
    1881              nonlocal done
    1882              done = True
    1883              try:
    1884                  next(it)
    1885              except StopIteration:
    1886                  pass
    1887  
    1888      done = False
    1889      it = iter(A(*args))
    1890      # Issue 26494: Shouldn't crash
    1891      test.assertRaises(StopIteration, next, it)
    1892      # The sequence should be deallocated just after the end of iterating
    1893      gc_collect()
    1894      test.assertTrue(done)
    1895  
    1896  
    1897  def missing_compiler_executable(cmd_names=[]):
    1898      """Check if the compiler components used to build the interpreter exist.
    1899  
    1900      Check for the existence of the compiler executables whose names are listed
    1901      in 'cmd_names' or all the compiler executables when 'cmd_names' is empty
    1902      and return the first missing executable or None when none is found
    1903      missing.
    1904  
    1905      """
    1906      from setuptools._distutils import ccompiler, sysconfig, spawn
    1907      from setuptools import errors
    1908  
    1909      compiler = ccompiler.new_compiler()
    1910      sysconfig.customize_compiler(compiler)
    1911      if compiler.compiler_type == "msvc":
    1912          # MSVC has no executables, so check whether initialization succeeds
    1913          try:
    1914              compiler.initialize()
    1915          except errors.PlatformError:
    1916              return "msvc"
    1917      for name in compiler.executables:
    1918          if cmd_names and name not in cmd_names:
    1919              continue
    1920          cmd = getattr(compiler, name)
    1921          if cmd_names:
    1922              assert cmd is not None, \
    1923                      "the '%s' executable is not configured" % name
    1924          elif not cmd:
    1925              continue
    1926          if spawn.find_executable(cmd[0]) is None:
    1927              return cmd[0]
    1928  
    1929  
    1930  _is_android_emulator = None
    1931  def setswitchinterval(interval):
    1932      # Setting a very low gil interval on the Android emulator causes python
    1933      # to hang (issue #26939).
    1934      minimum_interval = 1e-5
    1935      if is_android and interval < minimum_interval:
    1936          global _is_android_emulator
    1937          if _is_android_emulator is None:
    1938              import subprocess
    1939              _is_android_emulator = (subprocess.check_output(
    1940                                 ['getprop', 'ro.kernel.qemu']).strip() == b'1')
    1941          if _is_android_emulator:
    1942              interval = minimum_interval
    1943      return sys.setswitchinterval(interval)
    1944  
    1945  
    1946  def get_pagesize():
    1947      """Get size of a page in bytes."""
    1948      try:
    1949          page_size = os.sysconf('SC_PAGESIZE')
    1950      except (ValueError, AttributeError):
    1951          try:
    1952              page_size = os.sysconf('SC_PAGE_SIZE')
    1953          except (ValueError, AttributeError):
    1954              page_size = 4096
    1955      return page_size
    1956  
    1957  
    1958  @contextlib.contextmanager
    1959  def disable_faulthandler():
    1960      import faulthandler
    1961  
    1962      # use sys.__stderr__ instead of sys.stderr, since regrtest replaces
    1963      # sys.stderr with a StringIO which has no file descriptor when a test
    1964      # is run with -W/--verbose3.
    1965      fd = sys.__stderr__.fileno()
    1966  
    1967      is_enabled = faulthandler.is_enabled()
    1968      try:
    1969          faulthandler.disable()
    1970          yield
    1971      finally:
    1972          if is_enabled:
    1973              faulthandler.enable(file=fd, all_threads=True)
    1974  
    1975  
    1976  class ESC[4;38;5;81mSaveSignals:
    1977      """
    1978      Save and restore signal handlers.
    1979  
    1980      This class is only able to save/restore signal handlers registered
    1981      by the Python signal module: see bpo-13285 for "external" signal
    1982      handlers.
    1983      """
    1984  
    1985      def __init__(self):
    1986          import signal
    1987          self.signal = signal
    1988          self.signals = signal.valid_signals()
    1989          # SIGKILL and SIGSTOP signals cannot be ignored nor caught
    1990          for signame in ('SIGKILL', 'SIGSTOP'):
    1991              try:
    1992                  signum = getattr(signal, signame)
    1993              except AttributeError:
    1994                  continue
    1995              self.signals.remove(signum)
    1996          self.handlers = {}
    1997  
    1998      def save(self):
    1999          for signum in self.signals:
    2000              handler = self.signal.getsignal(signum)
    2001              if handler is None:
    2002                  # getsignal() returns None if a signal handler was not
    2003                  # registered by the Python signal module,
    2004                  # and the handler is not SIG_DFL nor SIG_IGN.
    2005                  #
    2006                  # Ignore the signal: we cannot restore the handler.
    2007                  continue
    2008              self.handlers[signum] = handler
    2009  
    2010      def restore(self):
    2011          for signum, handler in self.handlers.items():
    2012              self.signal.signal(signum, handler)
    2013  
    2014  
    2015  def with_pymalloc():
    2016      import _testcapi
    2017      return _testcapi.WITH_PYMALLOC
    2018  
    2019  
    2020  class ESC[4;38;5;81m_ALWAYS_EQ:
    2021      """
    2022      Object that is equal to anything.
    2023      """
    2024      def __eq__(self, other):
    2025          return True
    2026      def __ne__(self, other):
    2027          return False
    2028  
    2029  ALWAYS_EQ = _ALWAYS_EQ()
    2030  
    2031  class ESC[4;38;5;81m_NEVER_EQ:
    2032      """
    2033      Object that is not equal to anything.
    2034      """
    2035      def __eq__(self, other):
    2036          return False
    2037      def __ne__(self, other):
    2038          return True
    2039      def __hash__(self):
    2040          return 1
    2041  
    2042  NEVER_EQ = _NEVER_EQ()
    2043  
    2044  @functools.total_ordering
    2045  class ESC[4;38;5;81m_LARGEST:
    2046      """
    2047      Object that is greater than anything (except itself).
    2048      """
    2049      def __eq__(self, other):
    2050          return isinstance(other, _LARGEST)
    2051      def __lt__(self, other):
    2052          return False
    2053  
    2054  LARGEST = _LARGEST()
    2055  
    2056  @functools.total_ordering
    2057  class ESC[4;38;5;81m_SMALLEST:
    2058      """
    2059      Object that is less than anything (except itself).
    2060      """
    2061      def __eq__(self, other):
    2062          return isinstance(other, _SMALLEST)
    2063      def __gt__(self, other):
    2064          return False
    2065  
    2066  SMALLEST = _SMALLEST()
    2067  
    2068  def maybe_get_event_loop_policy():
    2069      """Return the global event loop policy if one is set, else return None."""
    2070      import asyncio.events
    2071      return asyncio.events._event_loop_policy
    2072  
    2073  # Helpers for testing hashing.
    2074  NHASHBITS = sys.hash_info.width # number of bits in hash() result
    2075  assert NHASHBITS in (32, 64)
    2076  
    2077  # Return mean and sdev of number of collisions when tossing nballs balls
    2078  # uniformly at random into nbins bins.  By definition, the number of
    2079  # collisions is the number of balls minus the number of occupied bins at
    2080  # the end.
    2081  def collision_stats(nbins, nballs):
    2082      n, k = nbins, nballs
    2083      # prob a bin empty after k trials = (1 - 1/n)**k
    2084      # mean # empty is then n * (1 - 1/n)**k
    2085      # so mean # occupied is n - n * (1 - 1/n)**k
    2086      # so collisions = k - (n - n*(1 - 1/n)**k)
    2087      #
    2088      # For the variance:
    2089      # n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 =
    2090      # n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty)
    2091      #
    2092      # Massive cancellation occurs, and, e.g., for a 64-bit hash code
    2093      # 1-1/2**64 rounds uselessly to 1.0.  Rather than make heroic (and
    2094      # error-prone) efforts to rework the naive formulas to avoid those,
    2095      # we use the `decimal` module to get plenty of extra precision.
    2096      #
    2097      # Note:  the exact values are straightforward to compute with
    2098      # rationals, but in context that's unbearably slow, requiring
    2099      # multi-million bit arithmetic.
    2100      import decimal
    2101      with decimal.localcontext() as ctx:
    2102          bits = n.bit_length() * 2  # bits in n**2
    2103          # At least that many bits will likely cancel out.
    2104          # Use that many decimal digits instead.
    2105          ctx.prec = max(bits, 30)
    2106          dn = decimal.Decimal(n)
    2107          p1empty = ((dn - 1) / dn) ** k
    2108          meanempty = n * p1empty
    2109          occupied = n - meanempty
    2110          collisions = k - occupied
    2111          var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty)
    2112          return float(collisions), float(var.sqrt())
    2113  
    2114  
    2115  class ESC[4;38;5;81mcatch_unraisable_exception:
    2116      """
    2117      Context manager catching unraisable exception using sys.unraisablehook.
    2118  
    2119      Storing the exception value (cm.unraisable.exc_value) creates a reference
    2120      cycle. The reference cycle is broken explicitly when the context manager
    2121      exits.
    2122  
    2123      Storing the object (cm.unraisable.object) can resurrect it if it is set to
    2124      an object which is being finalized. Exiting the context manager clears the
    2125      stored object.
    2126  
    2127      Usage:
    2128  
    2129          with support.catch_unraisable_exception() as cm:
    2130              # code creating an "unraisable exception"
    2131              ...
    2132  
    2133              # check the unraisable exception: use cm.unraisable
    2134              ...
    2135  
    2136          # cm.unraisable attribute no longer exists at this point
    2137          # (to break a reference cycle)
    2138      """
    2139  
    2140      def __init__(self):
    2141          self.unraisable = None
    2142          self._old_hook = None
    2143  
    2144      def _hook(self, unraisable):
    2145          # Storing unraisable.object can resurrect an object which is being
    2146          # finalized. Storing unraisable.exc_value creates a reference cycle.
    2147          self.unraisable = unraisable
    2148  
    2149      def __enter__(self):
    2150          self._old_hook = sys.unraisablehook
    2151          sys.unraisablehook = self._hook
    2152          return self
    2153  
    2154      def __exit__(self, *exc_info):
    2155          sys.unraisablehook = self._old_hook
    2156          del self.unraisable
    2157  
    2158  
    2159  def wait_process(pid, *, exitcode, timeout=None):
    2160      """
    2161      Wait until process pid completes and check that the process exit code is
    2162      exitcode.
    2163  
    2164      Raise an AssertionError if the process exit code is not equal to exitcode.
    2165  
    2166      If the process runs longer than timeout seconds (LONG_TIMEOUT by default),
    2167      kill the process (if signal.SIGKILL is available) and raise an
    2168      AssertionError. The timeout feature is not available on Windows.
    2169      """
    2170      if os.name != "nt":
    2171          import signal
    2172  
    2173          if timeout is None:
    2174              timeout = LONG_TIMEOUT
    2175  
    2176          start_time = time.monotonic()
    2177          for _ in sleeping_retry(timeout, error=False):
    2178              pid2, status = os.waitpid(pid, os.WNOHANG)
    2179              if pid2 != 0:
    2180                  break
    2181              # rety: the process is still running
    2182          else:
    2183              try:
    2184                  os.kill(pid, signal.SIGKILL)
    2185                  os.waitpid(pid, 0)
    2186              except OSError:
    2187                  # Ignore errors like ChildProcessError or PermissionError
    2188                  pass
    2189  
    2190              dt = time.monotonic() - start_time
    2191              raise AssertionError(f"process {pid} is still running "
    2192                                   f"after {dt:.1f} seconds")
    2193      else:
    2194          # Windows implementation: don't support timeout :-(
    2195          pid2, status = os.waitpid(pid, 0)
    2196  
    2197      exitcode2 = os.waitstatus_to_exitcode(status)
    2198      if exitcode2 != exitcode:
    2199          raise AssertionError(f"process {pid} exited with code {exitcode2}, "
    2200                               f"but exit code {exitcode} is expected")
    2201  
    2202      # sanity check: it should not fail in practice
    2203      if pid2 != pid:
    2204          raise AssertionError(f"pid {pid2} != pid {pid}")
    2205  
    2206  def skip_if_broken_multiprocessing_synchronize():
    2207      """
    2208      Skip tests if the multiprocessing.synchronize module is missing, if there
    2209      is no available semaphore implementation, or if creating a lock raises an
    2210      OSError (on Linux only).
    2211      """
    2212      from .import_helper import import_module
    2213  
    2214      # Skip tests if the _multiprocessing extension is missing.
    2215      import_module('_multiprocessing')
    2216  
    2217      # Skip tests if there is no available semaphore implementation:
    2218      # multiprocessing.synchronize requires _multiprocessing.SemLock.
    2219      synchronize = import_module('multiprocessing.synchronize')
    2220  
    2221      if sys.platform == "linux":
    2222          try:
    2223              # bpo-38377: On Linux, creating a semaphore fails with OSError
    2224              # if the current user does not have the permission to create
    2225              # a file in /dev/shm/ directory.
    2226              synchronize.Lock(ctx=None)
    2227          except OSError as exc:
    2228              raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}")
    2229  
    2230  
    2231  def check_disallow_instantiation(testcase, tp, *args, **kwds):
    2232      """
    2233      Check that given type cannot be instantiated using *args and **kwds.
    2234  
    2235      See bpo-43916: Add Py_TPFLAGS_DISALLOW_INSTANTIATION type flag.
    2236      """
    2237      mod = tp.__module__
    2238      name = tp.__name__
    2239      if mod != 'builtins':
    2240          qualname = f"{mod}.{name}"
    2241      else:
    2242          qualname = f"{name}"
    2243      msg = f"cannot create '{re.escape(qualname)}' instances"
    2244      testcase.assertRaisesRegex(TypeError, msg, tp, *args, **kwds)
    2245  
    2246  def get_recursion_depth():
    2247      """Get the recursion depth of the caller function.
    2248  
    2249      In the __main__ module, at the module level, it should be 1.
    2250      """
    2251      try:
    2252          import _testinternalcapi
    2253          depth = _testinternalcapi.get_recursion_depth()
    2254      except (ImportError, RecursionError) as exc:
    2255          # sys._getframe() + frame.f_back implementation.
    2256          try:
    2257              depth = 0
    2258              frame = sys._getframe()
    2259              while frame is not None:
    2260                  depth += 1
    2261                  frame = frame.f_back
    2262          finally:
    2263              # Break any reference cycles.
    2264              frame = None
    2265  
    2266      # Ignore get_recursion_depth() frame.
    2267      return max(depth - 1, 1)
    2268  
    2269  def get_recursion_available():
    2270      """Get the number of available frames before RecursionError.
    2271  
    2272      It depends on the current recursion depth of the caller function and
    2273      sys.getrecursionlimit().
    2274      """
    2275      limit = sys.getrecursionlimit()
    2276      depth = get_recursion_depth()
    2277      return limit - depth
    2278  
    2279  @contextlib.contextmanager
    2280  def set_recursion_limit(limit):
    2281      """Temporarily change the recursion limit."""
    2282      original_limit = sys.getrecursionlimit()
    2283      try:
    2284          sys.setrecursionlimit(limit)
    2285          yield
    2286      finally:
    2287          sys.setrecursionlimit(original_limit)
    2288  
    2289  def infinite_recursion(max_depth=100):
    2290      """Set a lower limit for tests that interact with infinite recursions
    2291      (e.g test_ast.ASTHelpers_Test.test_recursion_direct) since on some
    2292      debug windows builds, due to not enough functions being inlined the
    2293      stack size might not handle the default recursion limit (1000). See
    2294      bpo-11105 for details."""
    2295      if max_depth < 3:
    2296          raise ValueError("max_depth must be at least 3, got {max_depth}")
    2297      depth = get_recursion_depth()
    2298      depth = max(depth - 1, 1)  # Ignore infinite_recursion() frame.
    2299      limit = depth + max_depth
    2300      return set_recursion_limit(limit)
    2301  
    2302  def ignore_deprecations_from(module: str, *, like: str) -> object:
    2303      token = object()
    2304      warnings.filterwarnings(
    2305          "ignore",
    2306          category=DeprecationWarning,
    2307          module=module,
    2308          message=like + fr"(?#support{id(token)})",
    2309      )
    2310      return token
    2311  
    2312  def clear_ignored_deprecations(*tokens: object) -> None:
    2313      if not tokens:
    2314          raise ValueError("Provide token or tokens returned by ignore_deprecations_from")
    2315  
    2316      new_filters = []
    2317      endswith = tuple(rf"(?#support{id(token)})" for token in tokens)
    2318      for action, message, category, module, lineno in warnings.filters:
    2319          if action == "ignore" and category is DeprecationWarning:
    2320              if isinstance(message, re.Pattern):
    2321                  msg = message.pattern
    2322              else:
    2323                  msg = message or ""
    2324              if msg.endswith(endswith):
    2325                  continue
    2326          new_filters.append((action, message, category, module, lineno))
    2327      if warnings.filters != new_filters:
    2328          warnings.filters[:] = new_filters
    2329          warnings._filters_mutated()
    2330  
    2331  
    2332  # Skip a test if venv with pip is known to not work.
    2333  def requires_venv_with_pip():
    2334      # ensurepip requires zlib to open ZIP archives (.whl binary wheel packages)
    2335      try:
    2336          import zlib
    2337      except ImportError:
    2338          return unittest.skipIf(True, "venv: ensurepip requires zlib")
    2339  
    2340      # bpo-26610: pip/pep425tags.py requires ctypes.
    2341      # gh-92820: setuptools/windows_support.py uses ctypes (setuptools 58.1).
    2342      try:
    2343          import ctypes
    2344      except ImportError:
    2345          ctypes = None
    2346      return unittest.skipUnless(ctypes, 'venv: pip requires ctypes')
    2347  
    2348  
    2349  @functools.cache
    2350  def _findwheel(pkgname):
    2351      """Try to find a wheel with the package specified as pkgname.
    2352  
    2353      If set, the wheels are searched for in WHEEL_PKG_DIR (see ensurepip).
    2354      Otherwise, they are searched for in the test directory.
    2355      """
    2356      wheel_dir = sysconfig.get_config_var('WHEEL_PKG_DIR') or TEST_HOME_DIR
    2357      filenames = os.listdir(wheel_dir)
    2358      filenames = sorted(filenames, reverse=True)  # approximate "newest" first
    2359      for filename in filenames:
    2360          # filename is like 'setuptools-67.6.1-py3-none-any.whl'
    2361          if not filename.endswith(".whl"):
    2362              continue
    2363          prefix = pkgname + '-'
    2364          if filename.startswith(prefix):
    2365              return os.path.join(wheel_dir, filename)
    2366      raise FileNotFoundError(f"No wheel for {pkgname} found in {wheel_dir}")
    2367  
    2368  
    2369  # Context manager that creates a virtual environment, install setuptools and wheel in it
    2370  # and returns the path to the venv directory and the path to the python executable
    2371  @contextlib.contextmanager
    2372  def setup_venv_with_pip_setuptools_wheel(venv_dir):
    2373      import subprocess
    2374      from .os_helper import temp_cwd
    2375  
    2376      with temp_cwd() as temp_dir:
    2377          # Create virtual environment to get setuptools
    2378          cmd = [sys.executable, '-X', 'dev', '-m', 'venv', venv_dir]
    2379          if verbose:
    2380              print()
    2381              print('Run:', ' '.join(cmd))
    2382          subprocess.run(cmd, check=True)
    2383  
    2384          venv = os.path.join(temp_dir, venv_dir)
    2385  
    2386          # Get the Python executable of the venv
    2387          python_exe = os.path.basename(sys.executable)
    2388          if sys.platform == 'win32':
    2389              python = os.path.join(venv, 'Scripts', python_exe)
    2390          else:
    2391              python = os.path.join(venv, 'bin', python_exe)
    2392  
    2393          cmd = [python, '-X', 'dev',
    2394                 '-m', 'pip', 'install',
    2395                 _findwheel('setuptools'),
    2396                 _findwheel('wheel')]
    2397          if verbose:
    2398              print()
    2399              print('Run:', ' '.join(cmd))
    2400          subprocess.run(cmd, check=True)
    2401  
    2402          yield python
    2403  
    2404  
    2405  # True if Python is built with the Py_DEBUG macro defined: if
    2406  # Python is built in debug mode (./configure --with-pydebug).
    2407  Py_DEBUG = hasattr(sys, 'gettotalrefcount')
    2408  
    2409  
    2410  def late_deletion(obj):
    2411      """
    2412      Keep a Python alive as long as possible.
    2413  
    2414      Create a reference cycle and store the cycle in an object deleted late in
    2415      Python finalization. Try to keep the object alive until the very last
    2416      garbage collection.
    2417  
    2418      The function keeps a strong reference by design. It should be called in a
    2419      subprocess to not mark a test as "leaking a reference".
    2420      """
    2421  
    2422      # Late CPython finalization:
    2423      # - finalize_interp_clear()
    2424      # - _PyInterpreterState_Clear(): Clear PyInterpreterState members
    2425      #   (ex: codec_search_path, before_forkers)
    2426      # - clear os.register_at_fork() callbacks
    2427      # - clear codecs.register() callbacks
    2428  
    2429      ref_cycle = [obj]
    2430      ref_cycle.append(ref_cycle)
    2431  
    2432      # Store a reference in PyInterpreterState.codec_search_path
    2433      import codecs
    2434      def search_func(encoding):
    2435          return None
    2436      search_func.reference = ref_cycle
    2437      codecs.register(search_func)
    2438  
    2439      if hasattr(os, 'register_at_fork'):
    2440          # Store a reference in PyInterpreterState.before_forkers
    2441          def atfork_func():
    2442              pass
    2443          atfork_func.reference = ref_cycle
    2444          os.register_at_fork(before=atfork_func)
    2445  
    2446  
    2447  def busy_retry(timeout, err_msg=None, /, *, error=True):
    2448      """
    2449      Run the loop body until "break" stops the loop.
    2450  
    2451      After *timeout* seconds, raise an AssertionError if *error* is true,
    2452      or just stop if *error is false.
    2453  
    2454      Example:
    2455  
    2456          for _ in support.busy_retry(support.SHORT_TIMEOUT):
    2457              if check():
    2458                  break
    2459  
    2460      Example of error=False usage:
    2461  
    2462          for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
    2463              if check():
    2464                  break
    2465          else:
    2466              raise RuntimeError('my custom error')
    2467  
    2468      """
    2469      if timeout <= 0:
    2470          raise ValueError("timeout must be greater than zero")
    2471  
    2472      start_time = time.monotonic()
    2473      deadline = start_time + timeout
    2474  
    2475      while True:
    2476          yield
    2477  
    2478          if time.monotonic() >= deadline:
    2479              break
    2480  
    2481      if error:
    2482          dt = time.monotonic() - start_time
    2483          msg = f"timeout ({dt:.1f} seconds)"
    2484          if err_msg:
    2485              msg = f"{msg}: {err_msg}"
    2486          raise AssertionError(msg)
    2487  
    2488  
    2489  def sleeping_retry(timeout, err_msg=None, /,
    2490                       *, init_delay=0.010, max_delay=1.0, error=True):
    2491      """
    2492      Wait strategy that applies exponential backoff.
    2493  
    2494      Run the loop body until "break" stops the loop. Sleep at each loop
    2495      iteration, but not at the first iteration. The sleep delay is doubled at
    2496      each iteration (up to *max_delay* seconds).
    2497  
    2498      See busy_retry() documentation for the parameters usage.
    2499  
    2500      Example raising an exception after SHORT_TIMEOUT seconds:
    2501  
    2502          for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
    2503              if check():
    2504                  break
    2505  
    2506      Example of error=False usage:
    2507  
    2508          for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
    2509              if check():
    2510                  break
    2511          else:
    2512              raise RuntimeError('my custom error')
    2513      """
    2514  
    2515      delay = init_delay
    2516      for _ in busy_retry(timeout, err_msg, error=error):
    2517          yield
    2518  
    2519          time.sleep(delay)
    2520          delay = min(delay * 2, max_delay)
    2521  
    2522  
    2523  @contextlib.contextmanager
    2524  def adjust_int_max_str_digits(max_digits):
    2525      """Temporarily change the integer string conversion length limit."""
    2526      current = sys.get_int_max_str_digits()
    2527      try:
    2528          sys.set_int_max_str_digits(max_digits)
    2529          yield
    2530      finally:
    2531          sys.set_int_max_str_digits(current)
    2532  
    2533  #For recursion tests, easily exceeds default recursion limit
    2534  EXCEEDS_RECURSION_LIMIT = 5000
    2535  
    2536  # The default C recursion limit (from Include/cpython/pystate.h).
    2537  C_RECURSION_LIMIT = 1500
    2538  
    2539  #Windows doesn't have os.uname() but it doesn't support s390x.
    2540  skip_on_s390x = unittest.skipIf(hasattr(os, 'uname') and os.uname().machine == 's390x',
    2541                                  'skipped on s390x')