(root)/
Python-3.11.7/
Lib/
test/
libregrtest/
utils.py
       1  import contextlib
       2  import faulthandler
       3  import locale
       4  import math
       5  import os.path
       6  import platform
       7  import random
       8  import shlex
       9  import signal
      10  import subprocess
      11  import sys
      12  import sysconfig
      13  import tempfile
      14  import textwrap
      15  from collections.abc import Callable, Iterable
      16  
      17  from test import support
      18  from test.support import os_helper
      19  from test.support import threading_helper
      20  
      21  
      22  # All temporary files and temporary directories created by libregrtest should
      23  # use TMP_PREFIX so cleanup_temp_dir() can remove them all.
      24  TMP_PREFIX = 'test_python_'
      25  WORK_DIR_PREFIX = TMP_PREFIX
      26  WORKER_WORK_DIR_PREFIX = WORK_DIR_PREFIX + 'worker_'
      27  
      28  # bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
      29  # Used to protect against threading._shutdown() hang.
      30  # Must be smaller than buildbot "1200 seconds without output" limit.
      31  EXIT_TIMEOUT = 120.0
      32  
      33  
      34  ALL_RESOURCES = ('audio', 'curses', 'largefile', 'network',
      35                   'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui', 'walltime')
      36  
      37  # Other resources excluded from --use=all:
      38  #
      39  # - extralagefile (ex: test_zipfile64): really too slow to be enabled
      40  #   "by default"
      41  # - tzdata: while needed to validate fully test_datetime, it makes
      42  #   test_datetime too slow (15-20 min on some buildbots) and so is disabled by
      43  #   default (see bpo-30822).
      44  RESOURCE_NAMES = ALL_RESOURCES + ('extralargefile', 'tzdata')
      45  
      46  
      47  # Types for types hints
      48  StrPath = str
      49  TestName = str
      50  StrJSON = str
      51  TestTuple = tuple[TestName, ...]
      52  TestList = list[TestName]
      53  # --match and --ignore options: list of patterns
      54  # ('*' joker character can be used)
      55  TestFilter = list[tuple[TestName, bool]]
      56  FilterTuple = tuple[TestName, ...]
      57  FilterDict = dict[TestName, FilterTuple]
      58  
      59  
      60  def format_duration(seconds):
      61      ms = math.ceil(seconds * 1e3)
      62      seconds, ms = divmod(ms, 1000)
      63      minutes, seconds = divmod(seconds, 60)
      64      hours, minutes = divmod(minutes, 60)
      65  
      66      parts = []
      67      if hours:
      68          parts.append('%s hour' % hours)
      69      if minutes:
      70          parts.append('%s min' % minutes)
      71      if seconds:
      72          if parts:
      73              # 2 min 1 sec
      74              parts.append('%s sec' % seconds)
      75          else:
      76              # 1.0 sec
      77              parts.append('%.1f sec' % (seconds + ms / 1000))
      78      if not parts:
      79          return '%s ms' % ms
      80  
      81      parts = parts[:2]
      82      return ' '.join(parts)
      83  
      84  
      85  def strip_py_suffix(names: list[str] | None) -> None:
      86      if not names:
      87          return
      88      for idx, name in enumerate(names):
      89          basename, ext = os.path.splitext(name)
      90          if ext == '.py':
      91              names[idx] = basename
      92  
      93  
      94  def plural(n, singular, plural=None):
      95      if n == 1:
      96          return singular
      97      elif plural is not None:
      98          return plural
      99      else:
     100          return singular + 's'
     101  
     102  
     103  def count(n, word):
     104      if n == 1:
     105          return f"{n} {word}"
     106      else:
     107          return f"{n} {word}s"
     108  
     109  
     110  def printlist(x, width=70, indent=4, file=None):
     111      """Print the elements of iterable x to stdout.
     112  
     113      Optional arg width (default 70) is the maximum line length.
     114      Optional arg indent (default 4) is the number of blanks with which to
     115      begin each line.
     116      """
     117  
     118      blanks = ' ' * indent
     119      # Print the sorted list: 'x' may be a '--random' list or a set()
     120      print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width,
     121                          initial_indent=blanks, subsequent_indent=blanks),
     122            file=file)
     123  
     124  
     125  def print_warning(msg):
     126      support.print_warning(msg)
     127  
     128  
     129  orig_unraisablehook = None
     130  
     131  
     132  def regrtest_unraisable_hook(unraisable):
     133      global orig_unraisablehook
     134      support.environment_altered = True
     135      support.print_warning("Unraisable exception")
     136      old_stderr = sys.stderr
     137      try:
     138          support.flush_std_streams()
     139          sys.stderr = support.print_warning.orig_stderr
     140          orig_unraisablehook(unraisable)
     141          sys.stderr.flush()
     142      finally:
     143          sys.stderr = old_stderr
     144  
     145  
     146  def setup_unraisable_hook():
     147      global orig_unraisablehook
     148      orig_unraisablehook = sys.unraisablehook
     149      sys.unraisablehook = regrtest_unraisable_hook
     150  
     151  
     152  orig_threading_excepthook = None
     153  
     154  
     155  def regrtest_threading_excepthook(args):
     156      global orig_threading_excepthook
     157      support.environment_altered = True
     158      support.print_warning(f"Uncaught thread exception: {args.exc_type.__name__}")
     159      old_stderr = sys.stderr
     160      try:
     161          support.flush_std_streams()
     162          sys.stderr = support.print_warning.orig_stderr
     163          orig_threading_excepthook(args)
     164          sys.stderr.flush()
     165      finally:
     166          sys.stderr = old_stderr
     167  
     168  
     169  def setup_threading_excepthook():
     170      global orig_threading_excepthook
     171      import threading
     172      orig_threading_excepthook = threading.excepthook
     173      threading.excepthook = regrtest_threading_excepthook
     174  
     175  
     176  def clear_caches():
     177      # Clear the warnings registry, so they can be displayed again
     178      for mod in sys.modules.values():
     179          if hasattr(mod, '__warningregistry__'):
     180              del mod.__warningregistry__
     181  
     182      # Flush standard output, so that buffered data is sent to the OS and
     183      # associated Python objects are reclaimed.
     184      for stream in (sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__):
     185          if stream is not None:
     186              stream.flush()
     187  
     188      # Clear assorted module caches.
     189      # Don't worry about resetting the cache if the module is not loaded
     190      try:
     191          distutils_dir_util = sys.modules['distutils.dir_util']
     192      except KeyError:
     193          pass
     194      else:
     195          distutils_dir_util._path_created.clear()
     196  
     197      try:
     198          re = sys.modules['re']
     199      except KeyError:
     200          pass
     201      else:
     202          re.purge()
     203  
     204      try:
     205          _strptime = sys.modules['_strptime']
     206      except KeyError:
     207          pass
     208      else:
     209          _strptime._regex_cache.clear()
     210  
     211      try:
     212          urllib_parse = sys.modules['urllib.parse']
     213      except KeyError:
     214          pass
     215      else:
     216          urllib_parse.clear_cache()
     217  
     218      try:
     219          urllib_request = sys.modules['urllib.request']
     220      except KeyError:
     221          pass
     222      else:
     223          urllib_request.urlcleanup()
     224  
     225      try:
     226          linecache = sys.modules['linecache']
     227      except KeyError:
     228          pass
     229      else:
     230          linecache.clearcache()
     231  
     232      try:
     233          mimetypes = sys.modules['mimetypes']
     234      except KeyError:
     235          pass
     236      else:
     237          mimetypes._default_mime_types()
     238  
     239      try:
     240          filecmp = sys.modules['filecmp']
     241      except KeyError:
     242          pass
     243      else:
     244          filecmp._cache.clear()
     245  
     246      try:
     247          struct = sys.modules['struct']
     248      except KeyError:
     249          pass
     250      else:
     251          struct._clearcache()
     252  
     253      try:
     254          doctest = sys.modules['doctest']
     255      except KeyError:
     256          pass
     257      else:
     258          doctest.master = None
     259  
     260      try:
     261          ctypes = sys.modules['ctypes']
     262      except KeyError:
     263          pass
     264      else:
     265          ctypes._reset_cache()
     266  
     267      try:
     268          typing = sys.modules['typing']
     269      except KeyError:
     270          pass
     271      else:
     272          for f in typing._cleanups:
     273              f()
     274  
     275  
     276  def get_build_info():
     277      # Get most important configure and build options as a list of strings.
     278      # Example: ['debug', 'ASAN+MSAN'] or ['release', 'LTO+PGO'].
     279  
     280      config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
     281      cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
     282      cflags_nodist = sysconfig.get_config_var('PY_CFLAGS_NODIST') or ''
     283      ldflags_nodist = sysconfig.get_config_var('PY_LDFLAGS_NODIST') or ''
     284  
     285      build = []
     286  
     287      # --disable-gil
     288      if sysconfig.get_config_var('Py_NOGIL'):
     289          build.append("nogil")
     290  
     291      if hasattr(sys, 'gettotalrefcount'):
     292          # --with-pydebug
     293          build.append('debug')
     294  
     295          if '-DNDEBUG' in (cflags + cflags_nodist):
     296              build.append('without_assert')
     297      else:
     298          build.append('release')
     299  
     300          if '--with-assertions' in config_args:
     301              build.append('with_assert')
     302          elif '-DNDEBUG' not in (cflags + cflags_nodist):
     303              build.append('with_assert')
     304  
     305      # --enable-framework=name
     306      framework = sysconfig.get_config_var('PYTHONFRAMEWORK')
     307      if framework:
     308          build.append(f'framework={framework}')
     309  
     310      # --enable-shared
     311      shared = int(sysconfig.get_config_var('PY_ENABLE_SHARED') or '0')
     312      if shared:
     313          build.append('shared')
     314  
     315      # --with-lto
     316      optimizations = []
     317      if '-flto=thin' in ldflags_nodist:
     318          optimizations.append('ThinLTO')
     319      elif '-flto' in ldflags_nodist:
     320          optimizations.append('LTO')
     321  
     322      if support.check_cflags_pgo():
     323          # PGO (--enable-optimizations)
     324          optimizations.append('PGO')
     325      if optimizations:
     326          build.append('+'.join(optimizations))
     327  
     328      # --with-address-sanitizer
     329      sanitizers = []
     330      if support.check_sanitizer(address=True):
     331          sanitizers.append("ASAN")
     332      # --with-memory-sanitizer
     333      if support.check_sanitizer(memory=True):
     334          sanitizers.append("MSAN")
     335      # --with-undefined-behavior-sanitizer
     336      if support.check_sanitizer(ub=True):
     337          sanitizers.append("UBSAN")
     338      if sanitizers:
     339          build.append('+'.join(sanitizers))
     340  
     341      # --with-trace-refs
     342      if hasattr(sys, 'getobjects'):
     343          build.append("TraceRefs")
     344      # --enable-pystats
     345      if hasattr(sys, '_stats_on'):
     346          build.append("pystats")
     347      # --with-valgrind
     348      if sysconfig.get_config_var('WITH_VALGRIND'):
     349          build.append("valgrind")
     350      # --with-dtrace
     351      if sysconfig.get_config_var('WITH_DTRACE'):
     352          build.append("dtrace")
     353  
     354      return build
     355  
     356  
     357  def get_temp_dir(tmp_dir: StrPath | None = None) -> StrPath:
     358      if tmp_dir:
     359          tmp_dir = os.path.expanduser(tmp_dir)
     360      else:
     361          # When tests are run from the Python build directory, it is best practice
     362          # to keep the test files in a subfolder.  This eases the cleanup of leftover
     363          # files using the "make distclean" command.
     364          if sysconfig.is_python_build():
     365              if not support.is_wasi:
     366                  tmp_dir = sysconfig.get_config_var('abs_builddir')
     367                  if tmp_dir is None:
     368                      tmp_dir = sysconfig.get_config_var('abs_srcdir')
     369                      if not tmp_dir:
     370                          # gh-74470: On Windows, only srcdir is available. Using
     371                          # abs_builddir mostly matters on UNIX when building
     372                          # Python out of the source tree, especially when the
     373                          # source tree is read only.
     374                          tmp_dir = sysconfig.get_config_var('srcdir')
     375                          if not tmp_dir:
     376                              raise RuntimeError(
     377                                  "Could not determine the correct value for tmp_dir"
     378                              )
     379                  tmp_dir = os.path.join(tmp_dir, 'build')
     380              else:
     381                  # WASI platform
     382                  tmp_dir = sysconfig.get_config_var('projectbase')
     383                  if not tmp_dir:
     384                      raise RuntimeError(
     385                          "sysconfig.get_config_var('projectbase') "
     386                          f"unexpectedly returned {tmp_dir!r} on WASI"
     387                      )
     388                  tmp_dir = os.path.join(tmp_dir, 'build')
     389  
     390                  # When get_temp_dir() is called in a worker process,
     391                  # get_temp_dir() path is different than in the parent process
     392                  # which is not a WASI process. So the parent does not create
     393                  # the same "tmp_dir" than the test worker process.
     394                  os.makedirs(tmp_dir, exist_ok=True)
     395          else:
     396              tmp_dir = tempfile.gettempdir()
     397  
     398      return os.path.abspath(tmp_dir)
     399  
     400  
     401  def fix_umask():
     402      if support.is_emscripten:
     403          # Emscripten has default umask 0o777, which breaks some tests.
     404          # see https://github.com/emscripten-core/emscripten/issues/17269
     405          old_mask = os.umask(0)
     406          if old_mask == 0o777:
     407              os.umask(0o027)
     408          else:
     409              os.umask(old_mask)
     410  
     411  
     412  def get_work_dir(parent_dir: StrPath, worker: bool = False) -> StrPath:
     413      # Define a writable temp dir that will be used as cwd while running
     414      # the tests. The name of the dir includes the pid to allow parallel
     415      # testing (see the -j option).
     416      # Emscripten and WASI have stubbed getpid(), Emscripten has only
     417      # milisecond clock resolution. Use randint() instead.
     418      if support.is_emscripten or support.is_wasi:
     419          nounce = random.randint(0, 1_000_000)
     420      else:
     421          nounce = os.getpid()
     422  
     423      if worker:
     424          work_dir = WORK_DIR_PREFIX + str(nounce)
     425      else:
     426          work_dir = WORKER_WORK_DIR_PREFIX + str(nounce)
     427      work_dir += os_helper.FS_NONASCII
     428      work_dir = os.path.join(parent_dir, work_dir)
     429      return work_dir
     430  
     431  
     432  @contextlib.contextmanager
     433  def exit_timeout():
     434      try:
     435          yield
     436      except SystemExit as exc:
     437          # bpo-38203: Python can hang at exit in Py_Finalize(), especially
     438          # on threading._shutdown() call: put a timeout
     439          if threading_helper.can_start_thread:
     440              faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
     441          sys.exit(exc.code)
     442  
     443  
     444  def remove_testfn(test_name: TestName, verbose: int) -> None:
     445      # Try to clean up os_helper.TESTFN if left behind.
     446      #
     447      # While tests shouldn't leave any files or directories behind, when a test
     448      # fails that can be tedious for it to arrange.  The consequences can be
     449      # especially nasty on Windows, since if a test leaves a file open, it
     450      # cannot be deleted by name (while there's nothing we can do about that
     451      # here either, we can display the name of the offending test, which is a
     452      # real help).
     453      name = os_helper.TESTFN
     454      if not os.path.exists(name):
     455          return
     456  
     457      nuker: Callable[[str], None]
     458      if os.path.isdir(name):
     459          import shutil
     460          kind, nuker = "directory", shutil.rmtree
     461      elif os.path.isfile(name):
     462          kind, nuker = "file", os.unlink
     463      else:
     464          raise RuntimeError(f"os.path says {name!r} exists but is neither "
     465                             f"directory nor file")
     466  
     467      if verbose:
     468          print_warning(f"{test_name} left behind {kind} {name!r}")
     469          support.environment_altered = True
     470  
     471      try:
     472          import stat
     473          # fix possible permissions problems that might prevent cleanup
     474          os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
     475          nuker(name)
     476      except Exception as exc:
     477          print_warning(f"{test_name} left behind {kind} {name!r} "
     478                        f"and it couldn't be removed: {exc}")
     479  
     480  
     481  def abs_module_name(test_name: TestName, test_dir: StrPath | None) -> TestName:
     482      if test_name.startswith('test.') or test_dir:
     483          return test_name
     484      else:
     485          # Import it from the test package
     486          return 'test.' + test_name
     487  
     488  
     489  # gh-90681: When rerunning tests, we might need to rerun the whole
     490  # class or module suite if some its life-cycle hooks fail.
     491  # Test level hooks are not affected.
     492  _TEST_LIFECYCLE_HOOKS = frozenset((
     493      'setUpClass', 'tearDownClass',
     494      'setUpModule', 'tearDownModule',
     495  ))
     496  
     497  def normalize_test_name(test_full_name, *, is_error=False):
     498      short_name = test_full_name.split(" ")[0]
     499      if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
     500          if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
     501              # if setUpModule() or tearDownModule() failed, don't filter
     502              # tests with the test file name, don't use use filters.
     503              return None
     504  
     505          # This means that we have a failure in a life-cycle hook,
     506          # we need to rerun the whole module or class suite.
     507          # Basically the error looks like this:
     508          #    ERROR: setUpClass (test.test_reg_ex.RegTest)
     509          # or
     510          #    ERROR: setUpModule (test.test_reg_ex)
     511          # So, we need to parse the class / module name.
     512          lpar = test_full_name.index('(')
     513          rpar = test_full_name.index(')')
     514          return test_full_name[lpar + 1: rpar].split('.')[-1]
     515      return short_name
     516  
     517  
     518  def adjust_rlimit_nofile():
     519      """
     520      On macOS the default fd limit (RLIMIT_NOFILE) is sometimes too low (256)
     521      for our test suite to succeed. Raise it to something more reasonable. 1024
     522      is a common Linux default.
     523      """
     524      try:
     525          import resource
     526      except ImportError:
     527          return
     528  
     529      fd_limit, max_fds = resource.getrlimit(resource.RLIMIT_NOFILE)
     530  
     531      desired_fds = 1024
     532  
     533      if fd_limit < desired_fds and fd_limit < max_fds:
     534          new_fd_limit = min(desired_fds, max_fds)
     535          try:
     536              resource.setrlimit(resource.RLIMIT_NOFILE,
     537                                 (new_fd_limit, max_fds))
     538              print(f"Raised RLIMIT_NOFILE: {fd_limit} -> {new_fd_limit}")
     539          except (ValueError, OSError) as err:
     540              print_warning(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to "
     541                            f"{new_fd_limit}: {err}.")
     542  
     543  
     544  def get_host_runner():
     545      if (hostrunner := os.environ.get("_PYTHON_HOSTRUNNER")) is None:
     546          hostrunner = sysconfig.get_config_var("HOSTRUNNER")
     547      return hostrunner
     548  
     549  
     550  def is_cross_compiled():
     551      return ('_PYTHON_HOST_PLATFORM' in os.environ)
     552  
     553  
     554  def format_resources(use_resources: Iterable[str]):
     555      use_resources = set(use_resources)
     556      all_resources = set(ALL_RESOURCES)
     557  
     558      # Express resources relative to "all"
     559      relative_all = ['all']
     560      for name in sorted(all_resources - use_resources):
     561          relative_all.append(f'-{name}')
     562      for name in sorted(use_resources - all_resources):
     563          relative_all.append(f'{name}')
     564      all_text = ','.join(relative_all)
     565      all_text = f"resources: {all_text}"
     566  
     567      # List of enabled resources
     568      text = ','.join(sorted(use_resources))
     569      text = f"resources ({len(use_resources)}): {text}"
     570  
     571      # Pick the shortest string (prefer relative to all if lengths are equal)
     572      if len(all_text) <= len(text):
     573          return all_text
     574      else:
     575          return text
     576  
     577  
     578  def process_cpu_count():
     579      if hasattr(os, 'sched_getaffinity'):
     580          return len(os.sched_getaffinity(0))
     581      else:
     582          return os.cpu_count()
     583  
     584  
     585  def display_header(use_resources: tuple[str, ...],
     586                     python_cmd: tuple[str, ...] | None):
     587      # Print basic platform information
     588      print("==", platform.python_implementation(), *sys.version.split())
     589      print("==", platform.platform(aliased=True),
     590                    "%s-endian" % sys.byteorder)
     591      print("== Python build:", ' '.join(get_build_info()))
     592      print("== cwd:", os.getcwd())
     593  
     594      cpu_count: object = os.cpu_count()
     595      if cpu_count:
     596          affinity = process_cpu_count()
     597          if affinity and affinity != cpu_count:
     598              cpu_count = f"{affinity} (process) / {cpu_count} (system)"
     599          print("== CPU count:", cpu_count)
     600      print("== encodings: locale=%s FS=%s"
     601            % (locale.getencoding(), sys.getfilesystemencoding()))
     602  
     603      if use_resources:
     604          text = format_resources(use_resources)
     605          print(f"== {text}")
     606      else:
     607          print("== resources: all test resources are disabled, "
     608                "use -u option to unskip tests")
     609  
     610      cross_compile = is_cross_compiled()
     611      if cross_compile:
     612          print("== cross compiled: Yes")
     613      if python_cmd:
     614          cmd = shlex.join(python_cmd)
     615          print(f"== host python: {cmd}")
     616  
     617          get_cmd = [*python_cmd, '-m', 'platform']
     618          proc = subprocess.run(
     619              get_cmd,
     620              stdout=subprocess.PIPE,
     621              text=True,
     622              cwd=os_helper.SAVEDCWD)
     623          stdout = proc.stdout.replace('\n', ' ').strip()
     624          if stdout:
     625              print(f"== host platform: {stdout}")
     626          elif proc.returncode:
     627              print(f"== host platform: <command failed with exit code {proc.returncode}>")
     628      else:
     629          hostrunner = get_host_runner()
     630          if hostrunner:
     631              print(f"== host runner: {hostrunner}")
     632  
     633      # This makes it easier to remember what to set in your local
     634      # environment when trying to reproduce a sanitizer failure.
     635      asan = support.check_sanitizer(address=True)
     636      msan = support.check_sanitizer(memory=True)
     637      ubsan = support.check_sanitizer(ub=True)
     638      sanitizers = []
     639      if asan:
     640          sanitizers.append("address")
     641      if msan:
     642          sanitizers.append("memory")
     643      if ubsan:
     644          sanitizers.append("undefined behavior")
     645      if sanitizers:
     646          print(f"== sanitizers: {', '.join(sanitizers)}")
     647          for sanitizer, env_var in (
     648              (asan, "ASAN_OPTIONS"),
     649              (msan, "MSAN_OPTIONS"),
     650              (ubsan, "UBSAN_OPTIONS"),
     651          ):
     652              options= os.environ.get(env_var)
     653              if sanitizer and options is not None:
     654                  print(f"== {env_var}={options!r}")
     655  
     656      print(flush=True)
     657  
     658  
     659  def cleanup_temp_dir(tmp_dir: StrPath):
     660      import glob
     661  
     662      path = os.path.join(glob.escape(tmp_dir), TMP_PREFIX + '*')
     663      print("Cleanup %s directory" % tmp_dir)
     664      for name in glob.glob(path):
     665          if os.path.isdir(name):
     666              print("Remove directory: %s" % name)
     667              os_helper.rmtree(name)
     668          else:
     669              print("Remove file: %s" % name)
     670              os_helper.unlink(name)
     671  
     672  WINDOWS_STATUS = {
     673      0xC0000005: "STATUS_ACCESS_VIOLATION",
     674      0xC00000FD: "STATUS_STACK_OVERFLOW",
     675      0xC000013A: "STATUS_CONTROL_C_EXIT",
     676  }
     677  
     678  def get_signal_name(exitcode):
     679      if exitcode < 0:
     680          signum = -exitcode
     681          try:
     682              return signal.Signals(signum).name
     683          except ValueError:
     684              pass
     685  
     686      try:
     687          return WINDOWS_STATUS[exitcode]
     688      except KeyError:
     689          pass
     690  
     691      return None