1 import dis
2 import os.path
3 import re
4 import subprocess
5 import sys
6 import sysconfig
7 import types
8 import unittest
9
10 from test import support
11 from test.support import findfile
12
13
14 if not support.has_subprocess_support:
15 raise unittest.SkipTest("test module requires subprocess")
16
17
18 def abspath(filename):
19 return os.path.abspath(findfile(filename, subdir="dtracedata"))
20
21
22 def normalize_trace_output(output):
23 """Normalize DTrace output for comparison.
24
25 DTrace keeps a per-CPU buffer, and when showing the fired probes, buffers
26 are concatenated. So if the operating system moves our thread around, the
27 straight result can be "non-causal". So we add timestamps to the probe
28 firing, sort by that field, then strip it from the output"""
29
30 # When compiling with '--with-pydebug', strip '[# refs]' debug output.
31 output = re.sub(r"\[[0-9]+ refs\]", "", output)
32 try:
33 result = [
34 row.split("\t")
35 for row in output.splitlines()
36 if row and not row.startswith('#')
37 ]
38 result.sort(key=lambda row: int(row[0]))
39 result = [row[1] for row in result]
40 return "\n".join(result)
41 except (IndexError, ValueError):
42 raise AssertionError(
43 "tracer produced unparsable output:\n{}".format(output)
44 )
45
46
47 class ESC[4;38;5;81mTraceBackend:
48 EXTENSION = None
49 COMMAND = None
50 COMMAND_ARGS = []
51
52 def run_case(self, name, optimize_python=None):
53 actual_output = normalize_trace_output(self.trace_python(
54 script_file=abspath(name + self.EXTENSION),
55 python_file=abspath(name + ".py"),
56 optimize_python=optimize_python))
57
58 with open(abspath(name + self.EXTENSION + ".expected")) as f:
59 expected_output = f.read().rstrip()
60
61 return (expected_output, actual_output)
62
63 def generate_trace_command(self, script_file, subcommand=None):
64 command = self.COMMAND + [script_file]
65 if subcommand:
66 command += ["-c", subcommand]
67 return command
68
69 def trace(self, script_file, subcommand=None):
70 command = self.generate_trace_command(script_file, subcommand)
71 stdout, _ = subprocess.Popen(command,
72 stdout=subprocess.PIPE,
73 stderr=subprocess.STDOUT,
74 universal_newlines=True).communicate()
75 return stdout
76
77 def trace_python(self, script_file, python_file, optimize_python=None):
78 python_flags = []
79 if optimize_python:
80 python_flags.extend(["-O"] * optimize_python)
81 subcommand = " ".join([sys.executable] + python_flags + [python_file])
82 return self.trace(script_file, subcommand)
83
84 def assert_usable(self):
85 try:
86 output = self.trace(abspath("assert_usable" + self.EXTENSION))
87 output = output.strip()
88 except (FileNotFoundError, NotADirectoryError, PermissionError) as fnfe:
89 output = str(fnfe)
90 if output != "probe: success":
91 raise unittest.SkipTest(
92 "{}(1) failed: {}".format(self.COMMAND[0], output)
93 )
94
95
96 class ESC[4;38;5;81mDTraceBackend(ESC[4;38;5;149mTraceBackend):
97 EXTENSION = ".d"
98 COMMAND = ["dtrace", "-q", "-s"]
99
100
101 class ESC[4;38;5;81mSystemTapBackend(ESC[4;38;5;149mTraceBackend):
102 EXTENSION = ".stp"
103 COMMAND = ["stap", "-g"]
104
105
106 class ESC[4;38;5;81mTraceTests:
107 # unittest.TestCase options
108 maxDiff = None
109
110 # TraceTests options
111 backend = None
112 optimize_python = 0
113
114 @classmethod
115 def setUpClass(self):
116 self.backend.assert_usable()
117
118 def run_case(self, name):
119 actual_output, expected_output = self.backend.run_case(
120 name, optimize_python=self.optimize_python)
121 self.assertEqual(actual_output, expected_output)
122
123 def test_function_entry_return(self):
124 self.run_case("call_stack")
125
126 def test_verify_call_opcodes(self):
127 """Ensure our call stack test hits all function call opcodes"""
128
129 opcodes = set(["CALL_FUNCTION", "CALL_FUNCTION_EX", "CALL_FUNCTION_KW"])
130
131 with open(abspath("call_stack.py")) as f:
132 code_string = f.read()
133
134 def get_function_instructions(funcname):
135 # Recompile with appropriate optimization setting
136 code = compile(source=code_string,
137 filename="<string>",
138 mode="exec",
139 optimize=self.optimize_python)
140
141 for c in code.co_consts:
142 if isinstance(c, types.CodeType) and c.co_name == funcname:
143 return dis.get_instructions(c)
144 return []
145
146 for instruction in get_function_instructions('start'):
147 opcodes.discard(instruction.opname)
148
149 self.assertEqual(set(), opcodes)
150
151 def test_gc(self):
152 self.run_case("gc")
153
154 def test_line(self):
155 self.run_case("line")
156
157
158 class ESC[4;38;5;81mDTraceNormalTests(ESC[4;38;5;149mTraceTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
159 backend = DTraceBackend()
160 optimize_python = 0
161
162
163 class ESC[4;38;5;81mDTraceOptimizedTests(ESC[4;38;5;149mTraceTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
164 backend = DTraceBackend()
165 optimize_python = 2
166
167
168 class ESC[4;38;5;81mSystemTapNormalTests(ESC[4;38;5;149mTraceTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
169 backend = SystemTapBackend()
170 optimize_python = 0
171
172
173 class ESC[4;38;5;81mSystemTapOptimizedTests(ESC[4;38;5;149mTraceTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
174 backend = SystemTapBackend()
175 optimize_python = 2
176
177 class ESC[4;38;5;81mCheckDtraceProbes(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
178 @classmethod
179 def setUpClass(cls):
180 if sysconfig.get_config_var('WITH_DTRACE'):
181 readelf_major_version, readelf_minor_version = cls.get_readelf_version()
182 if support.verbose:
183 print(f"readelf version: {readelf_major_version}.{readelf_minor_version}")
184 else:
185 raise unittest.SkipTest("CPython must be configured with the --with-dtrace option.")
186
187
188 @staticmethod
189 def get_readelf_version():
190 try:
191 cmd = ["readelf", "--version"]
192 proc = subprocess.Popen(
193 cmd,
194 stdout=subprocess.PIPE,
195 stderr=subprocess.PIPE,
196 universal_newlines=True,
197 )
198 with proc:
199 version, stderr = proc.communicate()
200
201 if proc.returncode:
202 raise Exception(
203 f"Command {' '.join(cmd)!r} failed "
204 f"with exit code {proc.returncode}: "
205 f"stdout={version!r} stderr={stderr!r}"
206 )
207 except OSError:
208 raise unittest.SkipTest("Couldn't find readelf on the path")
209
210 # Regex to parse:
211 # 'GNU readelf (GNU Binutils) 2.40.0\n' -> 2.40
212 match = re.search(r"^(?:GNU) readelf.*?\b(\d+)\.(\d+)", version)
213 if match is None:
214 raise unittest.SkipTest(f"Unable to parse readelf version: {version}")
215
216 return int(match.group(1)), int(match.group(2))
217
218 def get_readelf_output(self):
219 command = ["readelf", "-n", sys.executable]
220 stdout, _ = subprocess.Popen(
221 command,
222 stdout=subprocess.PIPE,
223 stderr=subprocess.STDOUT,
224 universal_newlines=True,
225 ).communicate()
226 return stdout
227
228 def test_check_probes(self):
229 readelf_output = self.get_readelf_output()
230
231 available_probe_names = [
232 "Name: import__find__load__done",
233 "Name: import__find__load__start",
234 "Name: audit",
235 "Name: gc__start",
236 "Name: gc__done",
237 ]
238
239 for probe_name in available_probe_names:
240 with self.subTest(probe_name=probe_name):
241 self.assertIn(probe_name, readelf_output)
242
243 @unittest.expectedFailure
244 def test_missing_probes(self):
245 readelf_output = self.get_readelf_output()
246
247 # Missing probes will be added in the future.
248 missing_probe_names = [
249 "Name: function__entry",
250 "Name: function__return",
251 "Name: line",
252 ]
253
254 for probe_name in missing_probe_names:
255 with self.subTest(probe_name=probe_name):
256 self.assertIn(probe_name, readelf_output)
257
258
259 if __name__ == '__main__':
260 unittest.main()