python (3.12.0)
1 import builtins
2 import locale
3 import os
4 import sys
5 import threading
6 from test import support
7 from test.support import os_helper
8 from test.libregrtest.utils import print_warning
9
10
class SkipTestEnvironment(Exception):
    """Signal that a resource cannot be inspected and should be skipped.

    Raised by the ``get_*`` helpers when the module that owns a resource
    has not been imported (it is absent from ``sys.modules``).
    """
13
14
# Unit tests are supposed to leave the execution environment unchanged
# once they complete.  But sometimes tests have bugs, especially when
# tests fail, and the changes to the environment go on to mess up other
# tests.  This can cause issues with buildbot stability, since tests
# are run in random order and so problems may appear to come and go.
# There are a few things we can save and restore to mitigate this, and
# the following context manager handles this task.
22
class saved_test_environment:
    """Save bits of the test environment and restore them at block exit.

        with saved_test_environment(test_name, verbose, quiet):
            #stuff

    Each name in ``resources`` is snapshotted on entry and compared with a
    fresh snapshot on exit.  If a snapshot differs, the
    support.environment_altered attribute is set to True and the resource
    is restored from the saved value.

    Unless quiet is True or pgo is True, a warning is also emitted with
    print_warning() for every changed item; the warning contains the
    before and after state of that item.

    verbose is stored on the instance but is not consulted by this class.
    """

    def __init__(self, test_name, verbose=0, quiet=False, *, pgo=False):
        self.test_name = test_name
        self.verbose = verbose
        self.quiet = quiet
        self.pgo = pgo

    # To add things to save and restore, add a name XXX to the resources list
    # and add corresponding get_XXX/restore_XXX functions.  get_XXX should
    # return the value to be saved and compared against a second call to the
    # get function when test execution completes.  restore_XXX should accept
    # the saved value and restore the resource using it.  It will be called if
    # and only if a change in the value is detected.
    #
    # Note: XXX will have any '.' replaced with '_' characters when determining
    # the corresponding method names.

    resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr',
                 'os.environ', 'sys.path', 'sys.path_hooks', '__import__',
                 'warnings.filters', 'asyncore.socket_map',
                 'logging._handlers', 'logging._handlerList', 'sys.gettrace',
                 'sys.warnoptions',
                 # multiprocessing.process._cleanup() may release ref
                 # to a thread, so check processes first.
                 'multiprocessing.process._dangling', 'threading._dangling',
                 'sysconfig._CONFIG_VARS', 'sysconfig._INSTALL_SCHEMES',
                 'files', 'locale', 'warnings.showwarning',
                 'shutil_archive_formats', 'shutil_unpack_formats',
                 'asyncio.events._event_loop_policy',
                 'urllib.requests._url_tempfiles', 'urllib.requests._opener',
                )

    def get_module(self, name):
        """Return sys.modules[name]; raises KeyError if it is not loaded."""
        # function for restore() methods
        return sys.modules[name]

    def try_get_module(self, name):
        """Return sys.modules[name], or raise SkipTestEnvironment if absent."""
        # function for get() methods
        try:
            return self.get_module(name)
        except KeyError:
            raise SkipTestEnvironment

    def get_urllib_requests__url_tempfiles(self):
        urllib_request = self.try_get_module('urllib.request')
        return list(urllib_request._url_tempfiles)
    def restore_urllib_requests__url_tempfiles(self, tempfiles):
        # NOTE(review): this unlinks the files named in the *saved* list,
        # not the ones added since the snapshot -- confirm this is intended.
        for filename in tempfiles:
            os_helper.unlink(filename)

    def get_urllib_requests__opener(self):
        urllib_request = self.try_get_module('urllib.request')
        return urllib_request._opener
    def restore_urllib_requests__opener(self, opener):
        urllib_request = self.get_module('urllib.request')
        urllib_request._opener = opener

    def get_asyncio_events__event_loop_policy(self):
        # Only checked when asyncio has already been imported.
        self.try_get_module('asyncio')
        return support.maybe_get_event_loop_policy()
    def restore_asyncio_events__event_loop_policy(self, policy):
        asyncio = self.get_module('asyncio')
        asyncio.set_event_loop_policy(policy)

    # Several snapshots below are (id(obj), obj, copy-of-obj) tuples: the id
    # differs if the name was rebound to another object, and the copy differs
    # if the object was mutated in place.  The matching restore method rebinds
    # the original object and then resets its contents from the copy.

    def get_sys_argv(self):
        return id(sys.argv), sys.argv, sys.argv[:]
    def restore_sys_argv(self, saved_argv):
        sys.argv = saved_argv[1]
        sys.argv[:] = saved_argv[2]

    def get_cwd(self):
        return os.getcwd()
    def restore_cwd(self, saved_cwd):
        os.chdir(saved_cwd)

    def get_sys_stdout(self):
        return sys.stdout
    def restore_sys_stdout(self, saved_stdout):
        sys.stdout = saved_stdout

    def get_sys_stderr(self):
        return sys.stderr
    def restore_sys_stderr(self, saved_stderr):
        sys.stderr = saved_stderr

    def get_sys_stdin(self):
        return sys.stdin
    def restore_sys_stdin(self, saved_stdin):
        sys.stdin = saved_stdin

    def get_os_environ(self):
        return id(os.environ), os.environ, dict(os.environ)
    def restore_os_environ(self, saved_environ):
        os.environ = saved_environ[1]
        os.environ.clear()
        os.environ.update(saved_environ[2])

    def get_sys_path(self):
        return id(sys.path), sys.path, sys.path[:]
    def restore_sys_path(self, saved_path):
        sys.path = saved_path[1]
        sys.path[:] = saved_path[2]

    def get_sys_path_hooks(self):
        return id(sys.path_hooks), sys.path_hooks, sys.path_hooks[:]
    def restore_sys_path_hooks(self, saved_hooks):
        sys.path_hooks = saved_hooks[1]
        sys.path_hooks[:] = saved_hooks[2]

    def get_sys_gettrace(self):
        return sys.gettrace()
    def restore_sys_gettrace(self, trace_fxn):
        sys.settrace(trace_fxn)

    def get___import__(self):
        return builtins.__import__
    def restore___import__(self, import_):
        builtins.__import__ = import_

    def get_warnings_filters(self):
        warnings = self.try_get_module('warnings')
        return id(warnings.filters), warnings.filters, warnings.filters[:]
    def restore_warnings_filters(self, saved_filters):
        warnings = self.get_module('warnings')
        warnings.filters = saved_filters[1]
        warnings.filters[:] = saved_filters[2]

    def get_asyncore_socket_map(self):
        asyncore = sys.modules.get('test.support.asyncore')
        # XXX Making a copy keeps objects alive until __exit__ gets called.
        # An unloaded module is reported as an empty map rather than skipped.
        return asyncore.socket_map.copy() if asyncore is not None else {}
    def restore_asyncore_socket_map(self, saved_map):
        asyncore = sys.modules.get('test.support.asyncore')
        if asyncore is not None:
            asyncore.close_all(ignore_all=True)
            asyncore.socket_map.update(saved_map)

    def get_shutil_archive_formats(self):
        shutil = self.try_get_module('shutil')
        # we could call get_archives_formats() but that only returns the
        # registry keys; we want to check the values too (the functions that
        # are registered)
        return shutil._ARCHIVE_FORMATS, shutil._ARCHIVE_FORMATS.copy()
    def restore_shutil_archive_formats(self, saved):
        shutil = self.get_module('shutil')
        shutil._ARCHIVE_FORMATS = saved[0]
        shutil._ARCHIVE_FORMATS.clear()
        shutil._ARCHIVE_FORMATS.update(saved[1])

    def get_shutil_unpack_formats(self):
        shutil = self.try_get_module('shutil')
        return shutil._UNPACK_FORMATS, shutil._UNPACK_FORMATS.copy()
    def restore_shutil_unpack_formats(self, saved):
        shutil = self.get_module('shutil')
        shutil._UNPACK_FORMATS = saved[0]
        shutil._UNPACK_FORMATS.clear()
        shutil._UNPACK_FORMATS.update(saved[1])

    def get_logging__handlers(self):
        logging = self.try_get_module('logging')
        # _handlers is a WeakValueDictionary
        return id(logging._handlers), logging._handlers, logging._handlers.copy()
    def restore_logging__handlers(self, saved_handlers):
        # Can't easily revert the logging state
        pass

    def get_logging__handlerList(self):
        logging = self.try_get_module('logging')
        # _handlerList is a list of weakrefs to handlers
        return id(logging._handlerList), logging._handlerList, logging._handlerList[:]
    def restore_logging__handlerList(self, saved_handlerList):
        # Can't easily revert the logging state
        pass

    def get_sys_warnoptions(self):
        return id(sys.warnoptions), sys.warnoptions, sys.warnoptions[:]
    def restore_sys_warnoptions(self, saved_options):
        sys.warnoptions = saved_options[1]
        sys.warnoptions[:] = saved_options[2]

    # Controlling dangling references to Thread objects can make it easier
    # to track reference leaks.
    def get_threading__dangling(self):
        # This copies the weakrefs without making any strong reference
        return threading._dangling.copy()
    def restore_threading__dangling(self, saved):
        threading._dangling.clear()
        threading._dangling.update(saved)

    # Same for Process objects
    def get_multiprocessing_process__dangling(self):
        multiprocessing_process = self.try_get_module('multiprocessing.process')
        # Unjoined process objects can survive after process exits
        multiprocessing_process._cleanup()
        # This copies the weakrefs without making any strong reference
        return multiprocessing_process._dangling.copy()
    def restore_multiprocessing_process__dangling(self, saved):
        multiprocessing_process = self.get_module('multiprocessing.process')
        multiprocessing_process._dangling.clear()
        multiprocessing_process._dangling.update(saved)

    def get_sysconfig__CONFIG_VARS(self):
        # make sure the dict is initialized
        sysconfig = self.try_get_module('sysconfig')
        sysconfig.get_config_var('prefix')
        return (id(sysconfig._CONFIG_VARS), sysconfig._CONFIG_VARS,
                dict(sysconfig._CONFIG_VARS))
    def restore_sysconfig__CONFIG_VARS(self, saved):
        sysconfig = self.get_module('sysconfig')
        sysconfig._CONFIG_VARS = saved[1]
        sysconfig._CONFIG_VARS.clear()
        sysconfig._CONFIG_VARS.update(saved[2])

    def get_sysconfig__INSTALL_SCHEMES(self):
        sysconfig = self.try_get_module('sysconfig')
        return (id(sysconfig._INSTALL_SCHEMES), sysconfig._INSTALL_SCHEMES,
                sysconfig._INSTALL_SCHEMES.copy())
    def restore_sysconfig__INSTALL_SCHEMES(self, saved):
        sysconfig = self.get_module('sysconfig')
        sysconfig._INSTALL_SCHEMES = saved[1]
        sysconfig._INSTALL_SCHEMES.clear()
        sysconfig._INSTALL_SCHEMES.update(saved[2])

    def get_files(self):
        """Return the sorted entries of the current directory.

        Directories get a trailing '/'; names starting with ".hypothesis"
        are left out.
        """
        # XXX: Maybe add an allow-list here?
        return sorted(fn + ('/' if os.path.isdir(fn) else '')
                      for fn in os.listdir()
                      if not fn.startswith(".hypothesis"))
    def restore_files(self, saved_value):
        # Only os_helper.TESTFN is cleaned up, and only when it was not
        # already present (as a file or a directory) in the saved listing.
        fn = os_helper.TESTFN
        if fn not in saved_value and (fn + '/') not in saved_value:
            if os.path.isfile(fn):
                os_helper.unlink(fn)
            elif os.path.isdir(fn):
                os_helper.rmtree(fn)

    # Every LC_* constant exposed by the locale module.
    _lc = [getattr(locale, lc) for lc in dir(locale)
           if lc.startswith('LC_')]
    def get_locale(self):
        """Return (category, current setting) pairs for each queryable LC_*."""
        pairings = []
        for lc in self._lc:
            try:
                pairings.append((lc, locale.setlocale(lc, None)))
            except (TypeError, ValueError):
                # Category cannot be queried; leave it out of the snapshot.
                continue
        return pairings
    def restore_locale(self, saved):
        for lc, setting in saved:
            locale.setlocale(lc, setting)

    def get_warnings_showwarning(self):
        warnings = self.try_get_module('warnings')
        return warnings.showwarning
    def restore_warnings_showwarning(self, fxn):
        warnings = self.get_module('warnings')
        warnings.showwarning = fxn

    def resource_info(self):
        """Yield (name, get_method, restore_method) for every resource."""
        for name in self.resources:
            method_suffix = name.replace('.', '_')
            get_name = 'get_' + method_suffix
            restore_name = 'restore_' + method_suffix
            yield name, getattr(self, get_name), getattr(self, restore_name)

    def __enter__(self):
        """Snapshot every resource whose get method does not ask to skip."""
        self.saved_values = []
        for name, get, restore in self.resource_info():
            try:
                original = get()
            except SkipTestEnvironment:
                # Owning module is not loaded; nothing to save or compare.
                continue

            self.saved_values.append((name, get, restore, original))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Compare each saved resource with its current state and restore it.

        Always returns False, so an exception raised inside the ``with``
        block is never suppressed.
        """
        saved_values = self.saved_values
        self.saved_values = None

        # Some resources use weak references
        support.gc_collect()

        for name, get, restore, original in saved_values:
            # NOTE(review): get() may raise SkipTestEnvironment here if the
            # owning module was removed from sys.modules in the meantime;
            # that is not caught -- confirm whether this is intended.
            current = get()
            # Check for changes to the resource's value
            if current != original:
                support.environment_altered = True
                restore(original)
                if not self.quiet and not self.pgo:
                    print_warning(
                        f"{name} was modified by {self.test_name}\n"
                        f" Before: {original}\n"
                        f" After: {current} ")
        return False