1 import builtins
2 import locale
3 import os
4 import sys
5 import threading
6
7 from test import support
8 from test.support import os_helper
9
10 from .utils import print_warning
11
12
class SkipTestEnvironment(Exception):
    """Signal that a resource cannot be inspected and should be skipped.

    Raised by saved_test_environment.try_get_module() when the requested
    module is not present in sys.modules.
    """
15
16
17 # Unit tests are supposed to leave the execution environment unchanged
18 # once they complete. But sometimes tests have bugs, especially when
19 # tests fail, and the changes to environment go on to mess up other
20 # tests. This can cause issues with buildbot stability, since tests
21 # are run in random order and so problems may appear to come and go.
22 # There are a few things we can save and restore to mitigate this, and
23 # the following context manager handles this task.
24
class saved_test_environment:
    """Save bits of the test environment and restore them at block exit.

        with saved_test_environment(test_name, verbose, quiet, pgo=pgo):
            #stuff

    Unless quiet or pgo is true, a warning showing the before and after
    state is emitted through print_warning() for every saved item that was
    changed by the test.  The support.environment_altered attribute is set
    to True if a change is detected, and the changed item is restored.

    verbose is stored on the instance; the code in this class does not
    otherwise read it.
    """

    def __init__(self, test_name, verbose, quiet, *, pgo):
        """Record the test name and the reporting options.

        pgo is keyword-only; when true, change warnings are suppressed in
        the same way as with quiet.
        """
        self.test_name = test_name
        self.verbose = verbose
        self.quiet = quiet
        self.pgo = pgo

    # To add things to save and restore, add a name XXX to the resources list
    # and add corresponding get_XXX/restore_XXX functions.  get_XXX should
    # return the value to be saved and compared against a second call to the
    # get function when test execution completes.  restore_XXX should accept
    # the saved value and restore the resource using it.  It will be called if
    # and only if a change in the value is detected.
    #
    # Note: XXX will have any '.' replaced with '_' characters when determining
    # the corresponding method names.

    resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr',
                 'os.environ', 'sys.path', 'sys.path_hooks', '__import__',
                 'warnings.filters', 'asyncore.socket_map',
                 'logging._handlers', 'logging._handlerList', 'sys.gettrace',
                 'sys.warnoptions',
                 # multiprocessing.process._cleanup() may release ref
                 # to a thread, so check processes first.
                 'multiprocessing.process._dangling', 'threading._dangling',
                 'sysconfig._CONFIG_VARS', 'sysconfig._INSTALL_SCHEMES',
                 'files', 'locale', 'warnings.showwarning',
                 'shutil_archive_formats', 'shutil_unpack_formats',
                 'asyncio.events._event_loop_policy',
                 'urllib.requests._url_tempfiles', 'urllib.requests._opener',
                )

    def get_module(self, name):
        """Return sys.modules[name]; raises KeyError if it is not loaded."""
        # function for restore() methods
        return sys.modules[name]

    def try_get_module(self, name):
        """Return the loaded module, or raise SkipTestEnvironment if absent."""
        # function for get() methods
        try:
            return self.get_module(name)
        except KeyError:
            raise SkipTestEnvironment

    def get_urllib_requests__url_tempfiles(self):
        urllib_request = self.try_get_module('urllib.request')
        return list(urllib_request._url_tempfiles)
    def restore_urllib_requests__url_tempfiles(self, tempfiles):
        for filename in tempfiles:
            os_helper.unlink(filename)

    def get_urllib_requests__opener(self):
        urllib_request = self.try_get_module('urllib.request')
        return urllib_request._opener
    def restore_urllib_requests__opener(self, opener):
        urllib_request = self.get_module('urllib.request')
        urllib_request._opener = opener

    def get_asyncio_events__event_loop_policy(self):
        # Only inspect the policy when asyncio has already been imported.
        self.try_get_module('asyncio')
        return support.maybe_get_event_loop_policy()
    def restore_asyncio_events__event_loop_policy(self, policy):
        asyncio = self.get_module('asyncio')
        asyncio.set_event_loop_policy(policy)

    # The (id, object, copy) triples used below let a single != comparison
    # detect both rebinding of the name (id differs) and in-place mutation
    # of the same object (copy differs).  Restoring rebinds the original
    # object and then resets its contents from the copy.
    def get_sys_argv(self):
        return id(sys.argv), sys.argv, sys.argv[:]
    def restore_sys_argv(self, saved_argv):
        sys.argv = saved_argv[1]
        sys.argv[:] = saved_argv[2]

    def get_cwd(self):
        return os.getcwd()
    def restore_cwd(self, saved_cwd):
        os.chdir(saved_cwd)

    def get_sys_stdout(self):
        return sys.stdout
    def restore_sys_stdout(self, saved_stdout):
        sys.stdout = saved_stdout

    def get_sys_stderr(self):
        return sys.stderr
    def restore_sys_stderr(self, saved_stderr):
        sys.stderr = saved_stderr

    def get_sys_stdin(self):
        return sys.stdin
    def restore_sys_stdin(self, saved_stdin):
        sys.stdin = saved_stdin

    def get_os_environ(self):
        return id(os.environ), os.environ, dict(os.environ)
    def restore_os_environ(self, saved_environ):
        os.environ = saved_environ[1]
        os.environ.clear()
        os.environ.update(saved_environ[2])

    def get_sys_path(self):
        return id(sys.path), sys.path, sys.path[:]
    def restore_sys_path(self, saved_path):
        sys.path = saved_path[1]
        sys.path[:] = saved_path[2]

    def get_sys_path_hooks(self):
        return id(sys.path_hooks), sys.path_hooks, sys.path_hooks[:]
    def restore_sys_path_hooks(self, saved_hooks):
        sys.path_hooks = saved_hooks[1]
        sys.path_hooks[:] = saved_hooks[2]

    def get_sys_gettrace(self):
        return sys.gettrace()
    def restore_sys_gettrace(self, trace_fxn):
        sys.settrace(trace_fxn)

    def get___import__(self):
        return builtins.__import__
    def restore___import__(self, import_):
        builtins.__import__ = import_

    def get_warnings_filters(self):
        warnings = self.try_get_module('warnings')
        return id(warnings.filters), warnings.filters, warnings.filters[:]
    def restore_warnings_filters(self, saved_filters):
        warnings = self.get_module('warnings')
        warnings.filters = saved_filters[1]
        warnings.filters[:] = saved_filters[2]

    def get_asyncore_socket_map(self):
        asyncore = sys.modules.get('test.support.asyncore')
        # XXX Making a copy keeps objects alive until __exit__ gets called.
        # An unloaded module is reported as an empty map, which compares
        # equal to an empty copy of a loaded module's socket_map.
        return asyncore.socket_map.copy() if asyncore is not None else {}
    def restore_asyncore_socket_map(self, saved_map):
        asyncore = sys.modules.get('test.support.asyncore')
        if asyncore is not None:
            asyncore.close_all(ignore_all=True)
            asyncore.socket_map.update(saved_map)

    def get_shutil_archive_formats(self):
        shutil = self.try_get_module('shutil')
        # we could call get_archives_formats() but that only returns the
        # registry keys; we want to check the values too (the functions that
        # are registered)
        return shutil._ARCHIVE_FORMATS, shutil._ARCHIVE_FORMATS.copy()
    def restore_shutil_archive_formats(self, saved):
        shutil = self.get_module('shutil')
        shutil._ARCHIVE_FORMATS = saved[0]
        shutil._ARCHIVE_FORMATS.clear()
        shutil._ARCHIVE_FORMATS.update(saved[1])

    def get_shutil_unpack_formats(self):
        shutil = self.try_get_module('shutil')
        return shutil._UNPACK_FORMATS, shutil._UNPACK_FORMATS.copy()
    def restore_shutil_unpack_formats(self, saved):
        shutil = self.get_module('shutil')
        shutil._UNPACK_FORMATS = saved[0]
        shutil._UNPACK_FORMATS.clear()
        shutil._UNPACK_FORMATS.update(saved[1])

    def get_logging__handlers(self):
        logging = self.try_get_module('logging')
        # _handlers is a WeakValueDictionary
        return id(logging._handlers), logging._handlers, logging._handlers.copy()
    def restore_logging__handlers(self, saved_handlers):
        # Can't easily revert the logging state
        pass

    def get_logging__handlerList(self):
        logging = self.try_get_module('logging')
        # _handlerList is a list of weakrefs to handlers
        return id(logging._handlerList), logging._handlerList, logging._handlerList[:]
    def restore_logging__handlerList(self, saved_handlerList):
        # Can't easily revert the logging state
        pass

    def get_sys_warnoptions(self):
        return id(sys.warnoptions), sys.warnoptions, sys.warnoptions[:]
    def restore_sys_warnoptions(self, saved_options):
        sys.warnoptions = saved_options[1]
        sys.warnoptions[:] = saved_options[2]

    # Controlling dangling references to Thread objects can make it easier
    # to track reference leaks.
    def get_threading__dangling(self):
        # This copies the weakrefs without making any strong reference
        return threading._dangling.copy()
    def restore_threading__dangling(self, saved):
        threading._dangling.clear()
        threading._dangling.update(saved)

    # Same for Process objects
    def get_multiprocessing_process__dangling(self):
        multiprocessing_process = self.try_get_module('multiprocessing.process')
        # Unjoined process objects can survive after process exits
        multiprocessing_process._cleanup()
        # This copies the weakrefs without making any strong reference
        return multiprocessing_process._dangling.copy()
    def restore_multiprocessing_process__dangling(self, saved):
        multiprocessing_process = self.get_module('multiprocessing.process')
        multiprocessing_process._dangling.clear()
        multiprocessing_process._dangling.update(saved)

    def get_sysconfig__CONFIG_VARS(self):
        # make sure the dict is initialized
        sysconfig = self.try_get_module('sysconfig')
        sysconfig.get_config_var('prefix')
        return (id(sysconfig._CONFIG_VARS), sysconfig._CONFIG_VARS,
                dict(sysconfig._CONFIG_VARS))
    def restore_sysconfig__CONFIG_VARS(self, saved):
        sysconfig = self.get_module('sysconfig')
        sysconfig._CONFIG_VARS = saved[1]
        sysconfig._CONFIG_VARS.clear()
        sysconfig._CONFIG_VARS.update(saved[2])

    def get_sysconfig__INSTALL_SCHEMES(self):
        sysconfig = self.try_get_module('sysconfig')
        return (id(sysconfig._INSTALL_SCHEMES), sysconfig._INSTALL_SCHEMES,
                sysconfig._INSTALL_SCHEMES.copy())
    def restore_sysconfig__INSTALL_SCHEMES(self, saved):
        sysconfig = self.get_module('sysconfig')
        sysconfig._INSTALL_SCHEMES = saved[1]
        sysconfig._INSTALL_SCHEMES.clear()
        sysconfig._INSTALL_SCHEMES.update(saved[2])

    def get_files(self):
        # Sorted listing of the current directory; directories get a
        # trailing '/' so that a file replaced by a directory is noticed.
        # XXX: Maybe add an allow-list here?
        return sorted(fn + ('/' if os.path.isdir(fn) else '')
                      for fn in os.listdir()
                      if not fn.startswith(".hypothesis"))
    def restore_files(self, saved_value):
        # Only os_helper.TESTFN is cleaned up, and only when it did not
        # exist (as a file or a directory) before the block ran.
        fn = os_helper.TESTFN
        if fn not in saved_value and (fn + '/') not in saved_value:
            if os.path.isfile(fn):
                os_helper.unlink(fn)
            elif os.path.isdir(fn):
                os_helper.rmtree(fn)

    # Every LC_* attribute exposed by the locale module.
    _lc = [getattr(locale, lc) for lc in dir(locale)
           if lc.startswith('LC_')]
    def get_locale(self):
        pairings = []
        for lc in self._lc:
            try:
                pairings.append((lc, locale.setlocale(lc, None)))
            except (TypeError, ValueError):
                # Categories that cannot be queried are left out.
                continue
        return pairings
    def restore_locale(self, saved):
        for lc, setting in saved:
            locale.setlocale(lc, setting)

    def get_warnings_showwarning(self):
        warnings = self.try_get_module('warnings')
        return warnings.showwarning
    def restore_warnings_showwarning(self, fxn):
        warnings = self.get_module('warnings')
        warnings.showwarning = fxn

    def resource_info(self):
        """Yield (name, get_method, restore_method) for each resource.

        Method names are derived from the resource name with every '.'
        replaced by '_'.
        """
        for name in self.resources:
            method_suffix = name.replace('.', '_')
            get_name = 'get_' + method_suffix
            restore_name = 'restore_' + method_suffix
            yield name, getattr(self, get_name), getattr(self, restore_name)

    def __enter__(self):
        """Snapshot every resource whose getter does not ask to be skipped."""
        self.saved_values = []
        for name, get, restore in self.resource_info():
            try:
                original = get()
            except SkipTestEnvironment:
                continue

            self.saved_values.append((name, get, restore, original))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Compare each snapshot with the current state and undo changes.

        Always returns False, so an exception raised inside the block is
        never suppressed.
        """
        saved_values = self.saved_values
        self.saved_values = None

        # Some resources use weak references
        support.gc_collect()

        for name, get, restore, original in saved_values:
            try:
                current = get()
            except SkipTestEnvironment:
                # The module backing this resource was removed from
                # sys.modules while the block ran: there is nothing left
                # to compare or restore, so skip it as __enter__ would.
                continue
            # Check for changes to the resource's value
            if current != original:
                support.environment_altered = True
                restore(original)
                if not self.quiet and not self.pgo:
                    print_warning(
                        f"{name} was modified by {self.test_name}\n"
                        f" Before: {original}\n"
                        f" After: {current} ")
        return False