1 """Tests for sys.audit and sys.addaudithook
2 """
3
4 import subprocess
5 import sys
6 import unittest
7 from test import support
8 from test.support import import_helper
9 from test.support import os_helper
10
11
12 if not hasattr(sys, "addaudithook") or not hasattr(sys, "audit"):
13 raise unittest.SkipTest("test only relevant when sys.audit is available")
14
15 AUDIT_TESTS_PY = support.findfile("audit-tests.py")
16
17
18 class ESC[4;38;5;81mAuditTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
19 maxDiff = None
20
21 @support.requires_subprocess()
22 def do_test(self, *args):
23 with subprocess.Popen(
24 [sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
25 encoding="utf-8",
26 stdout=subprocess.PIPE,
27 stderr=subprocess.PIPE,
28 ) as p:
29 p.wait()
30 sys.stdout.writelines(p.stdout)
31 sys.stderr.writelines(p.stderr)
32 if p.returncode:
33 self.fail("".join(p.stderr))
34
35 @support.requires_subprocess()
36 def run_python(self, *args):
37 events = []
38 with subprocess.Popen(
39 [sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
40 encoding="utf-8",
41 stdout=subprocess.PIPE,
42 stderr=subprocess.PIPE,
43 ) as p:
44 p.wait()
45 sys.stderr.writelines(p.stderr)
46 return (
47 p.returncode,
48 [line.strip().partition(" ") for line in p.stdout],
49 "".join(p.stderr),
50 )
51
52 def test_basic(self):
53 self.do_test("test_basic")
54
55 def test_block_add_hook(self):
56 self.do_test("test_block_add_hook")
57
58 def test_block_add_hook_baseexception(self):
59 self.do_test("test_block_add_hook_baseexception")
60
61 def test_marshal(self):
62 import_helper.import_module("marshal")
63
64 self.do_test("test_marshal")
65
66 def test_pickle(self):
67 import_helper.import_module("pickle")
68
69 self.do_test("test_pickle")
70
71 def test_monkeypatch(self):
72 self.do_test("test_monkeypatch")
73
74 def test_open(self):
75 self.do_test("test_open", os_helper.TESTFN)
76
77 def test_cantrace(self):
78 self.do_test("test_cantrace")
79
80 def test_mmap(self):
81 self.do_test("test_mmap")
82
83 def test_excepthook(self):
84 returncode, events, stderr = self.run_python("test_excepthook")
85 if not returncode:
86 self.fail(f"Expected fatal exception\n{stderr}")
87
88 self.assertSequenceEqual(
89 [("sys.excepthook", " ", "RuntimeError('fatal-error')")], events
90 )
91
92 def test_unraisablehook(self):
93 returncode, events, stderr = self.run_python("test_unraisablehook")
94 if returncode:
95 self.fail(stderr)
96
97 self.assertEqual(events[0][0], "sys.unraisablehook")
98 self.assertEqual(
99 events[0][2],
100 "RuntimeError('nonfatal-error') Exception ignored for audit hook test",
101 )
102
103 def test_winreg(self):
104 import_helper.import_module("winreg")
105 returncode, events, stderr = self.run_python("test_winreg")
106 if returncode:
107 self.fail(stderr)
108
109 self.assertEqual(events[0][0], "winreg.OpenKey")
110 self.assertEqual(events[1][0], "winreg.OpenKey/result")
111 expected = events[1][2]
112 self.assertTrue(expected)
113 self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 0"], events[2])
114 self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 10000"], events[3])
115 self.assertSequenceEqual(["winreg.PyHKEY.Detach", " ", expected], events[4])
116
117 def test_socket(self):
118 import_helper.import_module("socket")
119 returncode, events, stderr = self.run_python("test_socket")
120 if returncode:
121 self.fail(stderr)
122
123 if support.verbose:
124 print(*events, sep='\n')
125 self.assertEqual(events[0][0], "socket.gethostname")
126 self.assertEqual(events[1][0], "socket.__new__")
127 self.assertEqual(events[2][0], "socket.bind")
128 self.assertTrue(events[2][2].endswith("('127.0.0.1', 8080)"))
129
130 def test_gc(self):
131 returncode, events, stderr = self.run_python("test_gc")
132 if returncode:
133 self.fail(stderr)
134
135 if support.verbose:
136 print(*events, sep='\n')
137 self.assertEqual(
138 [event[0] for event in events],
139 ["gc.get_objects", "gc.get_referrers", "gc.get_referents"]
140 )
141
142
143 def test_http(self):
144 import_helper.import_module("http.client")
145 returncode, events, stderr = self.run_python("test_http_client")
146 if returncode:
147 self.fail(stderr)
148
149 if support.verbose:
150 print(*events, sep='\n')
151 self.assertEqual(events[0][0], "http.client.connect")
152 self.assertEqual(events[0][2], "www.python.org 80")
153 self.assertEqual(events[1][0], "http.client.send")
154 if events[1][2] != '[cannot send]':
155 self.assertIn('HTTP', events[1][2])
156
157
158 def test_sqlite3(self):
159 sqlite3 = import_helper.import_module("sqlite3")
160 returncode, events, stderr = self.run_python("test_sqlite3")
161 if returncode:
162 self.fail(stderr)
163
164 if support.verbose:
165 print(*events, sep='\n')
166 actual = [ev[0] for ev in events]
167 expected = ["sqlite3.connect", "sqlite3.connect/handle"] * 2
168
169 if hasattr(sqlite3.Connection, "enable_load_extension"):
170 expected += [
171 "sqlite3.enable_load_extension",
172 "sqlite3.load_extension",
173 ]
174 self.assertEqual(actual, expected)
175
176
177 def test_sys_getframe(self):
178 returncode, events, stderr = self.run_python("test_sys_getframe")
179 if returncode:
180 self.fail(stderr)
181
182 if support.verbose:
183 print(*events, sep='\n')
184 actual = [(ev[0], ev[2]) for ev in events]
185 expected = [("sys._getframe", "test_sys_getframe")]
186
187 self.assertEqual(actual, expected)
188
189 def test_sys_getframemodulename(self):
190 returncode, events, stderr = self.run_python("test_sys_getframemodulename")
191 if returncode:
192 self.fail(stderr)
193
194 if support.verbose:
195 print(*events, sep='\n')
196 actual = [(ev[0], ev[2]) for ev in events]
197 expected = [("sys._getframemodulename", "0")]
198
199 self.assertEqual(actual, expected)
200
201
202 def test_threading(self):
203 returncode, events, stderr = self.run_python("test_threading")
204 if returncode:
205 self.fail(stderr)
206
207 if support.verbose:
208 print(*events, sep='\n')
209 actual = [(ev[0], ev[2]) for ev in events]
210 expected = [
211 ("_thread.start_new_thread", "(<test_func>, (), None)"),
212 ("test.test_func", "()"),
213 ]
214
215 self.assertEqual(actual, expected)
216
217
218 def test_wmi_exec_query(self):
219 import_helper.import_module("_wmi")
220 returncode, events, stderr = self.run_python("test_wmi_exec_query")
221 if returncode:
222 self.fail(stderr)
223
224 if support.verbose:
225 print(*events, sep='\n')
226 actual = [(ev[0], ev[2]) for ev in events]
227 expected = [("_wmi.exec_query", "SELECT * FROM Win32_OperatingSystem")]
228
229 self.assertEqual(actual, expected)
230
231 def test_syslog(self):
232 syslog = import_helper.import_module("syslog")
233
234 returncode, events, stderr = self.run_python("test_syslog")
235 if returncode:
236 self.fail(stderr)
237
238 if support.verbose:
239 print('Events:', *events, sep='\n ')
240
241 self.assertSequenceEqual(
242 events,
243 [('syslog.openlog', ' ', f'python 0 {syslog.LOG_USER}'),
244 ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test'),
245 ('syslog.setlogmask', ' ', f'{syslog.LOG_DEBUG}'),
246 ('syslog.closelog', '', ''),
247 ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test2'),
248 ('syslog.openlog', ' ', f'audit-tests.py 0 {syslog.LOG_USER}'),
249 ('syslog.openlog', ' ', f'audit-tests.py {syslog.LOG_NDELAY} {syslog.LOG_LOCAL0}'),
250 ('syslog.openlog', ' ', f'None 0 {syslog.LOG_USER}'),
251 ('syslog.closelog', '', '')]
252 )
253
254 def test_not_in_gc(self):
255 returncode, _, stderr = self.run_python("test_not_in_gc")
256 if returncode:
257 self.fail(stderr)
258
259
260 def test_sys_monitoring_register_callback(self):
261 returncode, events, stderr = self.run_python("test_sys_monitoring_register_callback")
262 if returncode:
263 self.fail(stderr)
264
265 if support.verbose:
266 print(*events, sep='\n')
267 actual = [(ev[0], ev[2]) for ev in events]
268 expected = [("sys.monitoring.register_callback", "(None,)")]
269
270 self.assertEqual(actual, expected)
271
272
273 if __name__ == "__main__":
274 unittest.main()