(root)/
Python-3.11.7/
Lib/
test/
libregrtest/
save_env.py
       1  import builtins
       2  import locale
       3  import os
       4  import sys
       5  import threading
       6  
       7  from test import support
       8  from test.support import os_helper
       9  
      10  from .utils import print_warning
      11  
      12  
      13  class ESC[4;38;5;81mSkipTestEnvironment(ESC[4;38;5;149mException):
      14      pass
      15  
      16  
      17  # Unit tests are supposed to leave the execution environment unchanged
      18  # once they complete.  But sometimes tests have bugs, especially when
      19  # tests fail, and the changes to environment go on to mess up other
      20  # tests.  This can cause issues with buildbot stability, since tests
      21  # are run in random order and so problems may appear to come and go.
      22  # There are a few things we can save and restore to mitigate this, and
      23  # the following context manager handles this task.
      24  
      25  class ESC[4;38;5;81msaved_test_environment:
      26      """Save bits of the test environment and restore them at block exit.
      27  
      28          with saved_test_environment(test_name, verbose, quiet):
      29              #stuff
      30  
      31      Unless quiet is True, a warning is printed to stderr if any of
      32      the saved items was changed by the test. The support.environment_altered
      33      attribute is set to True if a change is detected.
      34  
      35      If verbose is more than 1, the before and after state of changed
      36      items is also printed.
      37      """
      38  
      39      def __init__(self, test_name, verbose, quiet, *, pgo):
      40          self.test_name = test_name
      41          self.verbose = verbose
      42          self.quiet = quiet
      43          self.pgo = pgo
      44  
      45      # To add things to save and restore, add a name XXX to the resources list
      46      # and add corresponding get_XXX/restore_XXX functions.  get_XXX should
      47      # return the value to be saved and compared against a second call to the
      48      # get function when test execution completes.  restore_XXX should accept
      49      # the saved value and restore the resource using it.  It will be called if
      50      # and only if a change in the value is detected.
      51      #
      52      # Note: XXX will have any '.' replaced with '_' characters when determining
      53      # the corresponding method names.
      54  
      55      resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr',
      56                   'os.environ', 'sys.path', 'sys.path_hooks', '__import__',
      57                   'warnings.filters', 'asyncore.socket_map',
      58                   'logging._handlers', 'logging._handlerList', 'sys.gettrace',
      59                   'sys.warnoptions',
      60                   # multiprocessing.process._cleanup() may release ref
      61                   # to a thread, so check processes first.
      62                   'multiprocessing.process._dangling', 'threading._dangling',
      63                   'sysconfig._CONFIG_VARS', 'sysconfig._INSTALL_SCHEMES',
      64                   'files', 'locale', 'warnings.showwarning',
      65                   'shutil_archive_formats', 'shutil_unpack_formats',
      66                   'asyncio.events._event_loop_policy',
      67                   'urllib.requests._url_tempfiles', 'urllib.requests._opener',
      68                  )
      69  
      70      def get_module(self, name):
      71          # function for restore() methods
      72          return sys.modules[name]
      73  
      74      def try_get_module(self, name):
      75          # function for get() methods
      76          try:
      77              return self.get_module(name)
      78          except KeyError:
      79              raise SkipTestEnvironment
      80  
      81      def get_urllib_requests__url_tempfiles(self):
      82          urllib_request = self.try_get_module('urllib.request')
      83          return list(urllib_request._url_tempfiles)
      84      def restore_urllib_requests__url_tempfiles(self, tempfiles):
      85          for filename in tempfiles:
      86              os_helper.unlink(filename)
      87  
      88      def get_urllib_requests__opener(self):
      89          urllib_request = self.try_get_module('urllib.request')
      90          return urllib_request._opener
      91      def restore_urllib_requests__opener(self, opener):
      92          urllib_request = self.get_module('urllib.request')
      93          urllib_request._opener = opener
      94  
      95      def get_asyncio_events__event_loop_policy(self):
      96          self.try_get_module('asyncio')
      97          return support.maybe_get_event_loop_policy()
      98      def restore_asyncio_events__event_loop_policy(self, policy):
      99          asyncio = self.get_module('asyncio')
     100          asyncio.set_event_loop_policy(policy)
     101  
     102      def get_sys_argv(self):
     103          return id(sys.argv), sys.argv, sys.argv[:]
     104      def restore_sys_argv(self, saved_argv):
     105          sys.argv = saved_argv[1]
     106          sys.argv[:] = saved_argv[2]
     107  
     108      def get_cwd(self):
     109          return os.getcwd()
     110      def restore_cwd(self, saved_cwd):
     111          os.chdir(saved_cwd)
     112  
     113      def get_sys_stdout(self):
     114          return sys.stdout
     115      def restore_sys_stdout(self, saved_stdout):
     116          sys.stdout = saved_stdout
     117  
     118      def get_sys_stderr(self):
     119          return sys.stderr
     120      def restore_sys_stderr(self, saved_stderr):
     121          sys.stderr = saved_stderr
     122  
     123      def get_sys_stdin(self):
     124          return sys.stdin
     125      def restore_sys_stdin(self, saved_stdin):
     126          sys.stdin = saved_stdin
     127  
     128      def get_os_environ(self):
     129          return id(os.environ), os.environ, dict(os.environ)
     130      def restore_os_environ(self, saved_environ):
     131          os.environ = saved_environ[1]
     132          os.environ.clear()
     133          os.environ.update(saved_environ[2])
     134  
     135      def get_sys_path(self):
     136          return id(sys.path), sys.path, sys.path[:]
     137      def restore_sys_path(self, saved_path):
     138          sys.path = saved_path[1]
     139          sys.path[:] = saved_path[2]
     140  
     141      def get_sys_path_hooks(self):
     142          return id(sys.path_hooks), sys.path_hooks, sys.path_hooks[:]
     143      def restore_sys_path_hooks(self, saved_hooks):
     144          sys.path_hooks = saved_hooks[1]
     145          sys.path_hooks[:] = saved_hooks[2]
     146  
     147      def get_sys_gettrace(self):
     148          return sys.gettrace()
     149      def restore_sys_gettrace(self, trace_fxn):
     150          sys.settrace(trace_fxn)
     151  
     152      def get___import__(self):
     153          return builtins.__import__
     154      def restore___import__(self, import_):
     155          builtins.__import__ = import_
     156  
     157      def get_warnings_filters(self):
     158          warnings = self.try_get_module('warnings')
     159          return id(warnings.filters), warnings.filters, warnings.filters[:]
     160      def restore_warnings_filters(self, saved_filters):
     161          warnings = self.get_module('warnings')
     162          warnings.filters = saved_filters[1]
     163          warnings.filters[:] = saved_filters[2]
     164  
     165      def get_asyncore_socket_map(self):
     166          asyncore = sys.modules.get('test.support.asyncore')
     167          # XXX Making a copy keeps objects alive until __exit__ gets called.
     168          return asyncore and asyncore.socket_map.copy() or {}
     169      def restore_asyncore_socket_map(self, saved_map):
     170          asyncore = sys.modules.get('test.support.asyncore')
     171          if asyncore is not None:
     172              asyncore.close_all(ignore_all=True)
     173              asyncore.socket_map.update(saved_map)
     174  
     175      def get_shutil_archive_formats(self):
     176          shutil = self.try_get_module('shutil')
     177          # we could call get_archives_formats() but that only returns the
     178          # registry keys; we want to check the values too (the functions that
     179          # are registered)
     180          return shutil._ARCHIVE_FORMATS, shutil._ARCHIVE_FORMATS.copy()
     181      def restore_shutil_archive_formats(self, saved):
     182          shutil = self.get_module('shutil')
     183          shutil._ARCHIVE_FORMATS = saved[0]
     184          shutil._ARCHIVE_FORMATS.clear()
     185          shutil._ARCHIVE_FORMATS.update(saved[1])
     186  
     187      def get_shutil_unpack_formats(self):
     188          shutil = self.try_get_module('shutil')
     189          return shutil._UNPACK_FORMATS, shutil._UNPACK_FORMATS.copy()
     190      def restore_shutil_unpack_formats(self, saved):
     191          shutil = self.get_module('shutil')
     192          shutil._UNPACK_FORMATS = saved[0]
     193          shutil._UNPACK_FORMATS.clear()
     194          shutil._UNPACK_FORMATS.update(saved[1])
     195  
     196      def get_logging__handlers(self):
     197          logging = self.try_get_module('logging')
     198          # _handlers is a WeakValueDictionary
     199          return id(logging._handlers), logging._handlers, logging._handlers.copy()
     200      def restore_logging__handlers(self, saved_handlers):
     201          # Can't easily revert the logging state
     202          pass
     203  
     204      def get_logging__handlerList(self):
     205          logging = self.try_get_module('logging')
     206          # _handlerList is a list of weakrefs to handlers
     207          return id(logging._handlerList), logging._handlerList, logging._handlerList[:]
     208      def restore_logging__handlerList(self, saved_handlerList):
     209          # Can't easily revert the logging state
     210          pass
     211  
     212      def get_sys_warnoptions(self):
     213          return id(sys.warnoptions), sys.warnoptions, sys.warnoptions[:]
     214      def restore_sys_warnoptions(self, saved_options):
     215          sys.warnoptions = saved_options[1]
     216          sys.warnoptions[:] = saved_options[2]
     217  
     218      # Controlling dangling references to Thread objects can make it easier
     219      # to track reference leaks.
     220      def get_threading__dangling(self):
     221          # This copies the weakrefs without making any strong reference
     222          return threading._dangling.copy()
     223      def restore_threading__dangling(self, saved):
     224          threading._dangling.clear()
     225          threading._dangling.update(saved)
     226  
     227      # Same for Process objects
     228      def get_multiprocessing_process__dangling(self):
     229          multiprocessing_process = self.try_get_module('multiprocessing.process')
     230          # Unjoined process objects can survive after process exits
     231          multiprocessing_process._cleanup()
     232          # This copies the weakrefs without making any strong reference
     233          return multiprocessing_process._dangling.copy()
     234      def restore_multiprocessing_process__dangling(self, saved):
     235          multiprocessing_process = self.get_module('multiprocessing.process')
     236          multiprocessing_process._dangling.clear()
     237          multiprocessing_process._dangling.update(saved)
     238  
     239      def get_sysconfig__CONFIG_VARS(self):
     240          # make sure the dict is initialized
     241          sysconfig = self.try_get_module('sysconfig')
     242          sysconfig.get_config_var('prefix')
     243          return (id(sysconfig._CONFIG_VARS), sysconfig._CONFIG_VARS,
     244                  dict(sysconfig._CONFIG_VARS))
     245      def restore_sysconfig__CONFIG_VARS(self, saved):
     246          sysconfig = self.get_module('sysconfig')
     247          sysconfig._CONFIG_VARS = saved[1]
     248          sysconfig._CONFIG_VARS.clear()
     249          sysconfig._CONFIG_VARS.update(saved[2])
     250  
     251      def get_sysconfig__INSTALL_SCHEMES(self):
     252          sysconfig = self.try_get_module('sysconfig')
     253          return (id(sysconfig._INSTALL_SCHEMES), sysconfig._INSTALL_SCHEMES,
     254                  sysconfig._INSTALL_SCHEMES.copy())
     255      def restore_sysconfig__INSTALL_SCHEMES(self, saved):
     256          sysconfig = self.get_module('sysconfig')
     257          sysconfig._INSTALL_SCHEMES = saved[1]
     258          sysconfig._INSTALL_SCHEMES.clear()
     259          sysconfig._INSTALL_SCHEMES.update(saved[2])
     260  
     261      def get_files(self):
     262          # XXX: Maybe add an allow-list here?
     263          return sorted(fn + ('/' if os.path.isdir(fn) else '')
     264                        for fn in os.listdir()
     265                        if not fn.startswith(".hypothesis"))
     266      def restore_files(self, saved_value):
     267          fn = os_helper.TESTFN
     268          if fn not in saved_value and (fn + '/') not in saved_value:
     269              if os.path.isfile(fn):
     270                  os_helper.unlink(fn)
     271              elif os.path.isdir(fn):
     272                  os_helper.rmtree(fn)
     273  
     274      _lc = [getattr(locale, lc) for lc in dir(locale)
     275             if lc.startswith('LC_')]
     276      def get_locale(self):
     277          pairings = []
     278          for lc in self._lc:
     279              try:
     280                  pairings.append((lc, locale.setlocale(lc, None)))
     281              except (TypeError, ValueError):
     282                  continue
     283          return pairings
     284      def restore_locale(self, saved):
     285          for lc, setting in saved:
     286              locale.setlocale(lc, setting)
     287  
     288      def get_warnings_showwarning(self):
     289          warnings = self.try_get_module('warnings')
     290          return warnings.showwarning
     291      def restore_warnings_showwarning(self, fxn):
     292          warnings = self.get_module('warnings')
     293          warnings.showwarning = fxn
     294  
     295      def resource_info(self):
     296          for name in self.resources:
     297              method_suffix = name.replace('.', '_')
     298              get_name = 'get_' + method_suffix
     299              restore_name = 'restore_' + method_suffix
     300              yield name, getattr(self, get_name), getattr(self, restore_name)
     301  
     302      def __enter__(self):
     303          self.saved_values = []
     304          for name, get, restore in self.resource_info():
     305              try:
     306                  original = get()
     307              except SkipTestEnvironment:
     308                  continue
     309  
     310              self.saved_values.append((name, get, restore, original))
     311          return self
     312  
     313      def __exit__(self, exc_type, exc_val, exc_tb):
     314          saved_values = self.saved_values
     315          self.saved_values = None
     316  
     317          # Some resources use weak references
     318          support.gc_collect()
     319  
     320          for name, get, restore, original in saved_values:
     321              current = get()
     322              # Check for changes to the resource's value
     323              if current != original:
     324                  support.environment_altered = True
     325                  restore(original)
     326                  if not self.quiet and not self.pgo:
     327                      print_warning(
     328                          f"{name} was modified by {self.test_name}\n"
     329                          f"  Before: {original}\n"
     330                          f"  After:  {current} ")
     331          return False