1 import unittest
2 import string
3 import subprocess
4 import sys
5 import sysconfig
6 import os
7 import pathlib
8 from test import support
9 from test.support.script_helper import (
10 make_script,
11 assert_python_failure,
12 assert_python_ok,
13 )
14 from test.support.os_helper import temp_dir
15
16
# Every test in this module drives a child interpreter, so there is nothing
# useful to run on platforms that cannot spawn subprocesses.
if not support.has_subprocess_support:
    raise unittest.SkipTest("test module requires subprocess")
19
20
def supports_trampoline_profiling():
    """Return True when the build configuration enables the perf trampoline.

    Reads the ``PY_HAVE_PERF_TRAMPOLINE`` config variable.  An unset or falsy
    value means "not supported"; otherwise the value must convert to the
    integer 1.
    """
    flag = sysconfig.get_config_var("PY_HAVE_PERF_TRAMPOLINE")
    return bool(flag) and int(flag) == 1
26
27
# Skip the whole module on builds that were configured without the perf
# trampoline.
if not supports_trampoline_profiling():
    raise unittest.SkipTest("perf trampoline profiling not supported")
30
31
class TestPerfTrampoline(unittest.TestCase):
    """Check the perf map files written while the perf trampoline is active.

    Most tests run a short script in a child interpreter and then inspect
    ``/tmp/perf-<pid>.map`` for the ``py::<function>:<script>`` symbols that
    the child should (or should not) have recorded.
    """

    def setUp(self):
        super().setUp()
        # Snapshot the map files that already exist so tearDown only removes
        # the ones that appeared while the test was running.
        self.perf_files = set(pathlib.Path("/tmp/").glob("perf-*.map"))

    def tearDown(self) -> None:
        super().tearDown()
        files_to_delete = (
            set(pathlib.Path("/tmp/").glob("perf-*.map")) - self.perf_files
        )
        for file in files_to_delete:
            # The file may vanish between the glob and the unlink (for
            # example when another process cleans up its own map file).
            file.unlink(missing_ok=True)

    def _run_script(self, script_dir, code, *interpreter_args):
        """Write *code* into *script_dir* and run it in a child interpreter.

        *interpreter_args* are placed between the interpreter and the script
        path on the command line.  Returns ``(script, process, stdout,
        stderr)`` where *process* has already terminated and the two output
        streams are text.
        """
        script = make_script(script_dir, "perftest", code)
        with subprocess.Popen(
            [sys.executable, *interpreter_args, script],
            text=True,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE,
        ) as process:
            stdout, stderr = process.communicate()
        return script, process, stdout, stderr

    def test_trampoline_works(self):
        code = """if 1:
            def foo():
                pass

            def bar():
                foo()

            def baz():
                bar()

            baz()
            """
        with temp_dir() as script_dir:
            script, process, stdout, stderr = self._run_script(
                script_dir, code, "-Xperf"
            )

            self.assertEqual(stderr, "")
            self.assertEqual(stdout, "")

            perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
            self.assertTrue(perf_file.exists())
            perf_file_contents = perf_file.read_text()
            perf_lines = perf_file_contents.splitlines()
            expected_symbols = [
                f"py::foo:{script}",
                f"py::bar:{script}",
                f"py::baz:{script}",
            ]
            for expected_symbol in expected_symbols:
                perf_line = next(
                    (line for line in perf_lines if expected_symbol in line),
                    None,
                )
                self.assertIsNotNone(
                    perf_line,
                    f"Could not find {expected_symbol} in perf file",
                )
                # The first space-separated field of a map line is the
                # address; it must be bare hexadecimal.
                perf_addr = perf_line.split(" ")[0]
                self.assertFalse(
                    perf_addr.startswith("0x"),
                    "Address should not be prefixed with 0x",
                )
                self.assertTrue(
                    set(perf_addr).issubset(string.hexdigits),
                    "Address should contain only hex characters",
                )

    def test_trampoline_works_with_forks(self):
        code = """if 1:
            import os, sys

            def foo_fork():
                pass

            def bar_fork():
                foo_fork()

            def baz_fork():
                bar_fork()

            def foo():
                pid = os.fork()
                if pid == 0:
                    print(os.getpid())
                    baz_fork()
                else:
                    _, status = os.waitpid(-1, 0)
                    sys.exit(status)

            def bar():
                foo()

            def baz():
                bar()

            baz()
            """
        with temp_dir() as script_dir:
            script, process, stdout, stderr = self._run_script(
                script_dir, code, "-Xperf"
            )

            self.assertEqual(process.returncode, 0)
            self.assertEqual(stderr, "")
            # The forked child prints its own pid, which names its map file.
            child_pid = int(stdout.strip())
            perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
            perf_child_file = pathlib.Path(f"/tmp/perf-{child_pid}.map")
            self.assertTrue(perf_file.exists())
            self.assertTrue(perf_child_file.exists())

            perf_file_contents = perf_file.read_text()
            self.assertIn(f"py::foo:{script}", perf_file_contents)
            self.assertIn(f"py::bar:{script}", perf_file_contents)
            self.assertIn(f"py::baz:{script}", perf_file_contents)

            child_perf_file_contents = perf_child_file.read_text()
            self.assertIn(f"py::foo_fork:{script}", child_perf_file_contents)
            self.assertIn(f"py::bar_fork:{script}", child_perf_file_contents)
            self.assertIn(f"py::baz_fork:{script}", child_perf_file_contents)

    def test_sys_api(self):
        code = """if 1:
            import sys
            def foo():
                pass

            def spam():
                pass

            def bar():
                sys.deactivate_stack_trampoline()
                foo()
                sys.activate_stack_trampoline("perf")
                spam()

            def baz():
                bar()

            sys.activate_stack_trampoline("perf")
            baz()
            """
        with temp_dir() as script_dir:
            # No -Xperf here: the script switches the trampoline on and off
            # itself through the sys API.
            script, process, stdout, stderr = self._run_script(
                script_dir, code
            )

            self.assertEqual(stderr, "")
            self.assertEqual(stdout, "")

            perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
            self.assertTrue(perf_file.exists())
            perf_file_contents = perf_file.read_text()
            # foo() is only called while the trampoline is deactivated.
            self.assertNotIn(f"py::foo:{script}", perf_file_contents)
            self.assertIn(f"py::spam:{script}", perf_file_contents)
            self.assertIn(f"py::bar:{script}", perf_file_contents)
            self.assertIn(f"py::baz:{script}", perf_file_contents)

    def test_sys_api_with_existing_trampoline(self):
        # Activating the same backend twice must not fail.
        code = """if 1:
            import sys
            sys.activate_stack_trampoline("perf")
            sys.activate_stack_trampoline("perf")
            """
        assert_python_ok("-c", code)

    def test_sys_api_with_invalid_trampoline(self):
        code = """if 1:
            import sys
            sys.activate_stack_trampoline("invalid")
            """
        rc, out, err = assert_python_failure("-c", code)
        self.assertIn("invalid backend: invalid", err.decode())

    def test_sys_api_get_status(self):
        code = """if 1:
            import sys
            sys.activate_stack_trampoline("perf")
            assert sys.is_stack_trampoline_active() is True
            sys.deactivate_stack_trampoline()
            assert sys.is_stack_trampoline_active() is False
            """
        assert_python_ok("-c", code)
208
209
def is_unwinding_reliable():
    """Return True when the core C flags mention ``no-omit-frame-pointer``.

    Reads the ``PY_CORE_CFLAGS`` config variable; an unset or empty value
    yields False.
    """
    core_cflags = sysconfig.get_config_var("PY_CORE_CFLAGS")
    return bool(core_cflags) and "no-omit-frame-pointer" in core_cflags
215
216
217 def perf_command_works():
218 try:
219 cmd = ["perf", "--help"]
220 stdout = subprocess.check_output(cmd, text=True)
221 except (subprocess.SubprocessError, OSError):
222 return False
223
224 # perf version does not return a version number on Fedora. Use presence
225 # of "perf.data" in help as indicator that it's perf from Linux tools.
226 if "perf.data" not in stdout:
227 return False
228
229 # Check that we can run a simple perf run
230 with temp_dir() as script_dir:
231 try:
232 output_file = script_dir + "/perf_output.perf"
233 cmd = (
234 "perf",
235 "record",
236 "-g",
237 "--call-graph=fp",
238 "-o",
239 output_file,
240 "--",
241 sys.executable,
242 "-c",
243 'print("hello")',
244 )
245 stdout = subprocess.check_output(
246 cmd, cwd=script_dir, text=True, stderr=subprocess.STDOUT
247 )
248 except (subprocess.SubprocessError, OSError):
249 return False
250
251 if "hello" not in stdout:
252 return False
253
254 return True
255
256
def run_perf(cwd, *args, **env_vars):
    """Profile a command with ``perf record`` and return the ``perf script`` dump.

    Parameters:
        cwd: directory in which the ``perf_output.perf`` data file is written.
            It is only used to build that path; it is not passed to
            ``subprocess.run`` as the working directory.
        *args: the command line to profile, appended after ``--``.
        **env_vars: extra environment variables layered over a copy of
            ``os.environ`` for both perf invocations.  When none are given
            the child processes inherit the current environment unchanged.

    Returns:
        A ``(stdout, stderr)`` tuple from ``perf script``, each decoded as
        UTF-8 with undecodable bytes replaced.

    Raises:
        ValueError: if ``perf record`` exits with a non-zero status.
        subprocess.CalledProcessError: if ``perf script`` exits with a
            non-zero status.
    """
    if env_vars:
        env = os.environ.copy()
        env.update(env_vars)
    else:
        env = None
    output_file = cwd + "/perf_output.perf"
    record_cmd = (
        "perf",
        "record",
        "-g",
        "--call-graph=fp",
        "-o",
        output_file,
        "--",
    )
    proc = subprocess.run(
        record_cmd + args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
    )
    if proc.returncode:
        # Surface perf's own diagnostics as readable text on stderr before
        # failing, rather than printing a bytes repr to stdout.
        print(proc.stderr.decode("utf-8", "replace"), file=sys.stderr)
        raise ValueError(f"Perf failed with return code {proc.returncode}")

    proc = subprocess.run(
        ("perf", "script", "-i", output_file),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
        check=True,
    )
    return (
        proc.stdout.decode("utf-8", "replace"),
        proc.stderr.decode("utf-8", "replace"),
    )
286
287
@unittest.skipUnless(perf_command_works(), "perf command doesn't work")
@unittest.skipUnless(is_unwinding_reliable(), "Unwinding is unreliable")
@support.skip_if_sanitizer(address=True, memory=True, ub=True)
class TestPerfProfiler(unittest.TestCase):
    """Run a script under ``perf record`` and inspect the ``perf script`` dump.

    The ``py::<function>:<script>`` symbols must show up in the dump when the
    interpreter is started with ``-Xperf`` and must be absent otherwise.
    """

    def setUp(self):
        super().setUp()
        # Snapshot the map files that already exist so tearDown only removes
        # the ones that appeared while the test was running.
        self.perf_files = set(pathlib.Path("/tmp/").glob("perf-*.map"))

    def tearDown(self) -> None:
        super().tearDown()
        files_to_delete = (
            set(pathlib.Path("/tmp/").glob("perf-*.map")) - self.perf_files
        )
        for file in files_to_delete:
            # The file may vanish between the glob and the unlink (for
            # example when another process cleans up its own map file).
            file.unlink(missing_ok=True)

    def test_python_calls_appear_in_the_stack_if_perf_activated(self):
        with temp_dir() as script_dir:
            code = """if 1:
                def foo(n):
                    x = 0
                    for i in range(n):
                        x += i

                def bar(n):
                    foo(n)

                def baz(n):
                    bar(n)

                baz(10000000)
                """
            script = make_script(script_dir, "perftest", code)
            stdout, stderr = run_perf(
                script_dir, sys.executable, "-Xperf", script
            )
            self.assertEqual(stderr, "")

            self.assertIn(f"py::foo:{script}", stdout)
            self.assertIn(f"py::bar:{script}", stdout)
            self.assertIn(f"py::baz:{script}", stdout)

    def test_python_calls_do_not_appear_in_the_stack_if_perf_activated(self):
        # NOTE: despite the method name, this covers the case where perf
        # support is NOT activated: the interpreter is started without
        # -Xperf and no py:: symbol may appear.  The name is kept so that
        # selecting the test by name keeps working.
        with temp_dir() as script_dir:
            code = """if 1:
                def foo(n):
                    x = 0
                    for i in range(n):
                        x += i

                def bar(n):
                    foo(n)

                def baz(n):
                    bar(n)

                baz(10000000)
                """
            script = make_script(script_dir, "perftest", code)
            stdout, stderr = run_perf(script_dir, sys.executable, script)
            self.assertEqual(stderr, "")

            self.assertNotIn(f"py::foo:{script}", stdout)
            self.assertNotIn(f"py::bar:{script}", stdout)
            self.assertNotIn(f"py::baz:{script}", stdout)
351
352
# Allow running this test file directly as a script.
if __name__ == "__main__":
    unittest.main()