1 # Common utility functions used by various script execution tests
2 # e.g. test_cmd_line, test_cmd_line_script and test_runpy
3
4 import collections
5 import importlib
6 import sys
7 import os
8 import os.path
9 import subprocess
10 import py_compile
11 import zipfile
12
13 from importlib.util import source_from_cache
14 from test import support
15 from test.support.import_helper import make_legacy_pyc
16
17
18 # Cached result of the expensive test performed in the function below.
19 __cached_interp_requires_environment = None
20
21
22 def interpreter_requires_environment():
23 """
24 Returns True if our sys.executable interpreter requires environment
25 variables in order to be able to run at all.
26
27 This is designed to be used with @unittest.skipIf() to annotate tests
28 that need to use an assert_python*() function to launch an isolated
29 mode (-I) or no environment mode (-E) sub-interpreter process.
30
31 A normal build & test does not run into this situation but it can happen
32 when trying to run the standard library test suite from an interpreter that
33 doesn't have an obvious home with Python's current home finding logic.
34
35 Setting PYTHONHOME is one way to get most of the testsuite to run in that
36 situation. PYTHONPATH or PYTHONUSERSITE are other common environment
37 variables that might impact whether or not the interpreter can start.
38 """
39 global __cached_interp_requires_environment
40 if __cached_interp_requires_environment is None:
41 # If PYTHONHOME is set, assume that we need it
42 if 'PYTHONHOME' in os.environ:
43 __cached_interp_requires_environment = True
44 return True
45 # cannot run subprocess, assume we don't need it
46 if not support.has_subprocess_support:
47 __cached_interp_requires_environment = False
48 return False
49
50 # Try running an interpreter with -E to see if it works or not.
51 try:
52 subprocess.check_call([sys.executable, '-E',
53 '-c', 'import sys; sys.exit(0)'])
54 except subprocess.CalledProcessError:
55 __cached_interp_requires_environment = True
56 else:
57 __cached_interp_requires_environment = False
58
59 return __cached_interp_requires_environment
60
61
62 class ESC[4;38;5;81m_PythonRunResult(ESC[4;38;5;149mcollectionsESC[4;38;5;149m.ESC[4;38;5;149mnamedtuple("_PythonRunResult",
63 ("rc", "out", "err"))):
64 """Helper for reporting Python subprocess run results"""
65 def fail(self, cmd_line):
66 """Provide helpful details about failed subcommand runs"""
67 # Limit to 80 lines to ASCII characters
68 maxlen = 80 * 100
69 out, err = self.out, self.err
70 if len(out) > maxlen:
71 out = b'(... truncated stdout ...)' + out[-maxlen:]
72 if len(err) > maxlen:
73 err = b'(... truncated stderr ...)' + err[-maxlen:]
74 out = out.decode('ascii', 'replace').rstrip()
75 err = err.decode('ascii', 'replace').rstrip()
76 raise AssertionError("Process return code is %d\n"
77 "command line: %r\n"
78 "\n"
79 "stdout:\n"
80 "---\n"
81 "%s\n"
82 "---\n"
83 "\n"
84 "stderr:\n"
85 "---\n"
86 "%s\n"
87 "---"
88 % (self.rc, cmd_line,
89 out,
90 err))
91
92
93 # Executing the interpreter in a subprocess
94 @support.requires_subprocess()
95 def run_python_until_end(*args, **env_vars):
96 env_required = interpreter_requires_environment()
97 cwd = env_vars.pop('__cwd', None)
98 if '__isolated' in env_vars:
99 isolated = env_vars.pop('__isolated')
100 else:
101 isolated = not env_vars and not env_required
102 cmd_line = [sys.executable, '-X', 'faulthandler']
103 if isolated:
104 # isolated mode: ignore Python environment variables, ignore user
105 # site-packages, and don't add the current directory to sys.path
106 cmd_line.append('-I')
107 elif not env_vars and not env_required:
108 # ignore Python environment variables
109 cmd_line.append('-E')
110
111 # But a special flag that can be set to override -- in this case, the
112 # caller is responsible to pass the full environment.
113 if env_vars.pop('__cleanenv', None):
114 env = {}
115 if sys.platform == 'win32':
116 # Windows requires at least the SYSTEMROOT environment variable to
117 # start Python.
118 env['SYSTEMROOT'] = os.environ['SYSTEMROOT']
119
120 # Other interesting environment variables, not copied currently:
121 # COMSPEC, HOME, PATH, TEMP, TMPDIR, TMP.
122 else:
123 # Need to preserve the original environment, for in-place testing of
124 # shared library builds.
125 env = os.environ.copy()
126
127 # set TERM='' unless the TERM environment variable is passed explicitly
128 # see issues #11390 and #18300
129 if 'TERM' not in env_vars:
130 env['TERM'] = ''
131
132 env.update(env_vars)
133 cmd_line.extend(args)
134 proc = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
135 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
136 env=env, cwd=cwd)
137 with proc:
138 try:
139 out, err = proc.communicate()
140 finally:
141 proc.kill()
142 subprocess._cleanup()
143 rc = proc.returncode
144 return _PythonRunResult(rc, out, err), cmd_line
145
146
147 @support.requires_subprocess()
148 def _assert_python(expected_success, /, *args, **env_vars):
149 res, cmd_line = run_python_until_end(*args, **env_vars)
150 if (res.rc and expected_success) or (not res.rc and not expected_success):
151 res.fail(cmd_line)
152 return res
153
154
155 def assert_python_ok(*args, **env_vars):
156 """
157 Assert that running the interpreter with `args` and optional environment
158 variables `env_vars` succeeds (rc == 0) and return a (return code, stdout,
159 stderr) tuple.
160
161 If the __cleanenv keyword is set, env_vars is used as a fresh environment.
162
163 Python is started in isolated mode (command line option -I),
164 except if the __isolated keyword is set to False.
165 """
166 return _assert_python(True, *args, **env_vars)
167
168
169 def assert_python_failure(*args, **env_vars):
170 """
171 Assert that running the interpreter with `args` and optional environment
172 variables `env_vars` fails (rc != 0) and return a (return code, stdout,
173 stderr) tuple.
174
175 See assert_python_ok() for more options.
176 """
177 return _assert_python(False, *args, **env_vars)
178
179
180 @support.requires_subprocess()
181 def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw):
182 """Run a Python subprocess with the given arguments.
183
184 kw is extra keyword args to pass to subprocess.Popen. Returns a Popen
185 object.
186 """
187 cmd_line = [sys.executable]
188 if not interpreter_requires_environment():
189 cmd_line.append('-E')
190 cmd_line.extend(args)
191 # Under Fedora (?), GNU readline can output junk on stderr when initialized,
192 # depending on the TERM setting. Setting TERM=vt100 is supposed to disable
193 # that. References:
194 # - http://reinout.vanrees.org/weblog/2009/08/14/readline-invisible-character-hack.html
195 # - http://stackoverflow.com/questions/15760712/python-readline-module-prints-escape-character-during-import
196 # - http://lists.gnu.org/archive/html/bug-readline/2007-08/msg00004.html
197 env = kw.setdefault('env', dict(os.environ))
198 env['TERM'] = 'vt100'
199 return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
200 stdout=stdout, stderr=stderr,
201 **kw)
202
203
204 def kill_python(p):
205 """Run the given Popen process until completion and return stdout."""
206 p.stdin.close()
207 data = p.stdout.read()
208 p.stdout.close()
209 # try to cleanup the child so we don't appear to leak when running
210 # with regrtest -R.
211 p.wait()
212 subprocess._cleanup()
213 return data
214
215
216 def make_script(script_dir, script_basename, source, omit_suffix=False):
217 script_filename = script_basename
218 if not omit_suffix:
219 script_filename += os.extsep + 'py'
220 script_name = os.path.join(script_dir, script_filename)
221 # The script should be encoded to UTF-8, the default string encoding
222 with open(script_name, 'w', encoding='utf-8') as script_file:
223 script_file.write(source)
224 importlib.invalidate_caches()
225 return script_name
226
227
228 def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None):
229 zip_filename = zip_basename+os.extsep+'zip'
230 zip_name = os.path.join(zip_dir, zip_filename)
231 with zipfile.ZipFile(zip_name, 'w') as zip_file:
232 if name_in_zip is None:
233 parts = script_name.split(os.sep)
234 if len(parts) >= 2 and parts[-2] == '__pycache__':
235 legacy_pyc = make_legacy_pyc(source_from_cache(script_name))
236 name_in_zip = os.path.basename(legacy_pyc)
237 script_name = legacy_pyc
238 else:
239 name_in_zip = os.path.basename(script_name)
240 zip_file.write(script_name, name_in_zip)
241 #if test.support.verbose:
242 # with zipfile.ZipFile(zip_name, 'r') as zip_file:
243 # print 'Contents of %r:' % zip_name
244 # zip_file.printdir()
245 return zip_name, os.path.join(zip_name, name_in_zip)
246
247
248 def make_pkg(pkg_dir, init_source=''):
249 os.mkdir(pkg_dir)
250 make_script(pkg_dir, '__init__', init_source)
251
252
253 def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
254 source, depth=1, compiled=False):
255 unlink = []
256 init_name = make_script(zip_dir, '__init__', '')
257 unlink.append(init_name)
258 init_basename = os.path.basename(init_name)
259 script_name = make_script(zip_dir, script_basename, source)
260 unlink.append(script_name)
261 if compiled:
262 init_name = py_compile.compile(init_name, doraise=True)
263 script_name = py_compile.compile(script_name, doraise=True)
264 unlink.extend((init_name, script_name))
265 pkg_names = [os.sep.join([pkg_name]*i) for i in range(1, depth+1)]
266 script_name_in_zip = os.path.join(pkg_names[-1], os.path.basename(script_name))
267 zip_filename = zip_basename+os.extsep+'zip'
268 zip_name = os.path.join(zip_dir, zip_filename)
269 with zipfile.ZipFile(zip_name, 'w') as zip_file:
270 for name in pkg_names:
271 init_name_in_zip = os.path.join(name, init_basename)
272 zip_file.write(init_name, init_name_in_zip)
273 zip_file.write(script_name, script_name_in_zip)
274 for name in unlink:
275 os.unlink(name)
276 #if test.support.verbose:
277 # with zipfile.ZipFile(zip_name, 'r') as zip_file:
278 # print 'Contents of %r:' % zip_name
279 # zip_file.printdir()
280 return zip_name, os.path.join(zip_name, script_name_in_zip)
281
282
283 @support.requires_subprocess()
284 def run_test_script(script):
285 # use -u to try to get the full output if the test hangs or crash
286 if support.verbose:
287 def title(text):
288 return f"===== {text} ======"
289
290 name = f"script {os.path.basename(script)}"
291 print()
292 print(title(name), flush=True)
293 # In verbose mode, the child process inherit stdout and stdout,
294 # to see output in realtime and reduce the risk of losing output.
295 args = [sys.executable, "-E", "-X", "faulthandler", "-u", script, "-v"]
296 proc = subprocess.run(args)
297 print(title(f"{name} completed: exit code {proc.returncode}"),
298 flush=True)
299 if proc.returncode:
300 raise AssertionError(f"{name} failed")
301 else:
302 assert_python_ok("-u", script, "-v")