python (3.12.0)
1 """Test suite for the sys.monitoring."""
2
3 import collections
4 import dis
5 import functools
6 import operator
7 import sys
8 import textwrap
9 import types
10 import unittest
11 import asyncio
12
PAIR = (0,1)  # Two-item iterable that floop() iterates over.
14
def f1():  # Empty function: the smallest piece of code the tests monitor.
    pass  # LineMonitoringTest hard-codes this statement's line number (16).
17
def f2():  # Makes two builtin calls; the CALL / C_RETURN count tests rely on that.
    len([])
    sys.getsizeof(0)
21
def floop():  # Loops over PAIR; LineMonitoringTest hard-codes lines 23 and 24.
    for item in PAIR:
        pass
25
def gen():
    """Generator with two bare ``yield`` statements (consumed by g1())."""
    yield
    yield
29
def g1():
    """Exhaust gen(); test_resume_count expects two PY_RESUME events from this."""
    for _ in gen():
        pass
33
# Tool ids that the tests in this file claim with sys.monitoring.use_tool_id().
TEST_TOOL = 2
TEST_TOOL2 = 3
TEST_TOOL3 = 4
37
class MonitoringBasicTest(unittest.TestCase):
    """Smoke tests for the ``sys.monitoring`` namespace and tool-id handling."""

    def test_has_objects(self):
        """Check that every ``sys.monitoring`` name this suite relies on exists."""
        m = sys.monitoring
        expected_names = (
            "events",
            "use_tool_id",
            "free_tool_id",
            "get_tool",
            "get_events",
            "set_events",
            "get_local_events",
            "set_local_events",
            "register_callback",
            "restart_events",
            "DISABLE",
            "MISSING",
        )
        for name in expected_names:
            # subTest reports every missing name, not just the first one.
            with self.subTest(name=name):
                self.assertTrue(hasattr(m, name),
                                f"sys.monitoring.{name} is missing")
        self.assertTrue(hasattr(m.events, "NO_EVENTS"),
                        "sys.monitoring.events.NO_EVENTS is missing")

    @staticmethod
    def _release_tool(tool):
        """Switch *tool* off and free its id if it is still in use."""
        if sys.monitoring.get_tool(tool) is not None:
            sys.monitoring.set_events(tool, 0)
            sys.monitoring.free_tool_id(tool)

    def test_tool(self):
        """Claim a tool id, set and validate events, then release the id."""
        sys.monitoring.use_tool_id(TEST_TOOL, "MonitoringTest.Tool")
        # If an assertion below fails, the id (and any enabled events) must
        # still be released: MonitoringTestBase.setUp asserts that TEST_TOOL
        # is unused, so a leak here would fail every later test as well.
        self.addCleanup(self._release_tool, TEST_TOOL)
        self.assertEqual(sys.monitoring.get_tool(TEST_TOOL), "MonitoringTest.Tool")
        sys.monitoring.set_events(TEST_TOOL, 15)
        self.assertEqual(sys.monitoring.get_events(TEST_TOOL), 15)
        sys.monitoring.set_events(TEST_TOOL, 0)
        # C_RETURN and C_RAISE cannot be requested on their own.
        with self.assertRaises(ValueError):
            sys.monitoring.set_events(TEST_TOOL, sys.monitoring.events.C_RETURN)
        with self.assertRaises(ValueError):
            sys.monitoring.set_events(TEST_TOOL, sys.monitoring.events.C_RAISE)
        sys.monitoring.free_tool_id(TEST_TOOL)
        self.assertEqual(sys.monitoring.get_tool(TEST_TOOL), None)
        # Once the id has been freed, setting events for it is an error.
        with self.assertRaises(ValueError):
            sys.monitoring.set_events(TEST_TOOL, sys.monitoring.events.CALL)
70
71
class MonitoringTestBase:
    """Mixin that claims the three test tool ids around each test.

    ``setUp`` verifies that no tool has events enabled and that the test
    tool ids are free, then claims them.  ``tearDown`` verifies that the
    test switched monitoring off again and always releases the ids.
    """

    def setUp(self):
        # Check that a previous test hasn't left monitoring on.
        for tool in range(6):
            self.assertEqual(sys.monitoring.get_events(tool), 0)
        self.assertIs(sys.monitoring.get_tool(TEST_TOOL), None)
        self.assertIs(sys.monitoring.get_tool(TEST_TOOL2), None)
        self.assertIs(sys.monitoring.get_tool(TEST_TOOL3), None)
        sys.monitoring.use_tool_id(TEST_TOOL, "test " + self.__class__.__name__)
        sys.monitoring.use_tool_id(TEST_TOOL2, "test2 " + self.__class__.__name__)
        sys.monitoring.use_tool_id(TEST_TOOL3, "test3 " + self.__class__.__name__)

    def tearDown(self):
        try:
            # Check that test hasn't left monitoring on.
            for tool in range(6):
                self.assertEqual(sys.monitoring.get_events(tool), 0)
        finally:
            # Release the ids even when the check above fails; otherwise the
            # setUp of every following test would fail too, hiding the test
            # that actually leaked.
            for tool in (TEST_TOOL, TEST_TOOL2, TEST_TOOL3):
                if sys.monitoring.get_tool(tool) is not None:
                    sys.monitoring.set_events(tool, 0)
                    sys.monitoring.free_tool_id(tool)
92
93
class MonitoringCountTest(MonitoringTestBase, unittest.TestCase):
    """Check how often individual events fire for small functions."""

    def check_event_count(self, func, event, expected):
        """Assert that calling *func* fires *event* exactly *expected* times.

        A counting callback is registered for *event*, *func* is called and
        the count is compared with *expected*.  The callback is then
        unregistered and *func* is called again to check that no further
        events reach it.  For C_RETURN and C_RAISE the CALL event is
        enabled, because those events cannot be set on their own.
        """

        class Counter:
            def __init__(self):
                self.count = 0
            def __call__(self, *args):
                self.count += 1

        counter = Counter()
        sys.monitoring.register_callback(TEST_TOOL, event, counter)
        try:
            if event == E.C_RETURN or event == E.C_RAISE:
                sys.monitoring.set_events(TEST_TOOL, E.CALL)
            else:
                sys.monitoring.set_events(TEST_TOOL, event)
            self.assertEqual(counter.count, 0)
            # The assertion above runs with monitoring on and may itself
            # trigger events, so reset before the measured call.
            counter.count = 0
            func()
            self.assertEqual(counter.count, expected)
            prev = sys.monitoring.register_callback(TEST_TOOL, event, None)
            counter.count = 0
            func()
            self.assertEqual(counter.count, 0)
            self.assertEqual(prev, counter)
        finally:
            # Runs on failure as well, so a failed assertion cannot leave
            # monitoring enabled or the callback registered for later tests.
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, event, None)

    def test_start_count(self):
        self.check_event_count(f1, E.PY_START, 1)

    def test_resume_count(self):
        self.check_event_count(g1, E.PY_RESUME, 2)

    def test_return_count(self):
        self.check_event_count(f1, E.PY_RETURN, 1)

    def test_call_count(self):
        # The call to f2 itself plus the two calls it makes.
        self.check_event_count(f2, E.CALL, 3)

    def test_c_return_count(self):
        self.check_event_count(f2, E.C_RETURN, 2)
135
136
# Shorthand for the namespace that holds the event constants.
E = sys.monitoring.events

# (event, name) pairs.  TestDisable.test_disable_legal_events expects that
# callbacks for these events may return sys.monitoring.DISABLE.
INSTRUMENTED_EVENTS = [
    (E.PY_START, "start"),
    (E.PY_RESUME, "resume"),
    (E.PY_RETURN, "return"),
    (E.PY_YIELD, "yield"),
    (E.JUMP, "jump"),
    (E.BRANCH, "branch"),
]

# (event, name) pairs.  TestDisable.test_disable_illegal_events expects a
# ValueError when a callback for one of these events returns DISABLE.
EXCEPT_EVENTS = [
    (E.RAISE, "raise"),
    (E.PY_UNWIND, "unwind"),
    (E.EXCEPTION_HANDLED, "exception_handled"),
]

# Every event that gather_events() records under a fixed name.
SIMPLE_EVENTS = INSTRUMENTED_EVENTS + EXCEPT_EVENTS + [
    (E.C_RAISE, "c_raise"),
    (E.C_RETURN, "c_return"),
]


# Bitwise OR of all SIMPLE_EVENTS plus CALL.  CALL has no entry in
# SIMPLE_EVENTS because gather_events() registers a dedicated callback for it.
SIMPLE_EVENT_SET = functools.reduce(operator.or_, [ev for (ev, _) in SIMPLE_EVENTS], 0) | E.CALL
161
162
def just_pass():
    """Do nothing."""
    pass

# Event names check_events() expects gather_events() to record for just_pass.
just_pass.events = [
    "py_call",
    "start",
    "return",
]
171
def just_raise():
    """Raise ``Exception`` without handling it."""
    raise Exception

# Event names check_events() expects gather_events() to record for just_raise.
just_raise.events = [
    'py_call',
    "start",
    "raise",
    "unwind",
]
181
def just_call():
    """Make a single call to the builtin ``len``."""
    len([])

# Event names check_events() expects gather_events() to record for just_call.
just_call.events = [
    'py_call',
    "start",
    "c_call",
    "c_return",
    "return",
]
192
def caught():
    """Raise ZeroDivisionError and handle it inside the same function."""
    try:
        1/0
    except Exception:
        pass

# Event names check_events() expects gather_events() to record for caught.
caught.events = [
    'py_call',
    "start",
    "raise",
    "exception_handled",
    "branch",
    "return",
]
207
def nested_call():
    """Call just_pass(), so that one Python call is nested in another."""
    just_pass()

# Event names check_events() expects gather_events() to record for nested_call.
nested_call.events = [
    "py_call",
    "start",
    "py_call",
    "start",
    "return",
    "return",
]
219
# Callables reported as "py_call" by the CALL callbacks; anything else is "c_call".
PY_CALLABLES = (types.FunctionType, types.MethodType)
221
class MonitoringEventsBase(MonitoringTestBase):
    """Helpers that record the sequence of events fired while a function runs."""

    def gather_events(self, func):
        """Call *func* with SIMPLE_EVENT_SET enabled and return the event names.

        Each event in SIMPLE_EVENTS is recorded under its table name.  CALL
        events are recorded as "py_call" or "c_call" depending on the type
        of the callee.  Any exception raised by *func* is swallowed.
        """
        events = []
        for event, event_name in SIMPLE_EVENTS:
            # Bind event_name as a default: each callback needs its own name.
            def record(*args, event_name=event_name):
                events.append(event_name)
            sys.monitoring.register_callback(TEST_TOOL, event, record)
        def record_call(code, offset, obj, arg):
            if isinstance(obj, PY_CALLABLES):
                events.append("py_call")
            else:
                events.append("c_call")
        sys.monitoring.register_callback(TEST_TOOL, E.CALL, record_call)
        sys.monitoring.set_events(TEST_TOOL, SIMPLE_EVENT_SET)
        # Everything from here to set_events(TEST_TOOL, 0) runs with
        # monitoring enabled, so the statements are kept exactly as they
        # are: any extra call or branch would show up in the recording.
        events = []
        try:
            func()
        except:
            # Deliberately broad: func may raise on purpose (see just_raise).
            pass
        sys.monitoring.set_events(TEST_TOOL, 0)
        # Monitoring is off now; drop the callbacks so they (and the lists
        # they close over) do not stay registered after this helper returns.
        for event, _ in SIMPLE_EVENTS:
            sys.monitoring.register_callback(TEST_TOOL, event, None)
        sys.monitoring.register_callback(TEST_TOOL, E.CALL, None)
        #Remove the final event, the call to `sys.monitoring.set_events`
        events = events[:-1]
        return events

    def check_events(self, func, expected=None):
        """Assert that gather_events(func) equals *expected*.

        When *expected* is None, the list stored in ``func.events`` is used.
        """
        events = self.gather_events(func)
        if expected is None:
            expected = func.events
        self.assertEqual(events, expected)
252
class MonitoringEventsTest(MonitoringEventsBase, unittest.TestCase):
    """Compare the events recorded for each sample function with its ``.events``."""

    def test_just_pass(self):
        self.check_events(just_pass)

    def test_just_raise(self):
        # NOTE(review): AssertionError is an Exception, so this try/except
        # also swallows a mismatch reported by check_events(); only the final
        # assertion can fail this test.  Confirm whether that is intended.
        try:
            self.check_events(just_raise)
        except Exception:
            pass
        self.assertEqual(sys.monitoring.get_events(TEST_TOOL), 0)

    def test_just_call(self):
        self.check_events(just_call)

    def test_caught(self):
        self.check_events(caught)

    def test_nested_call(self):
        self.check_events(nested_call)
273
# Events for which SimulateProfileTest.test_frame_stack pops its frame stack.
UP_EVENTS = (E.C_RETURN, E.C_RAISE, E.PY_RETURN, E.PY_UNWIND, E.PY_YIELD)
# Events for which SimulateProfileTest.test_frame_stack pushes a frame.
DOWN_EVENTS = (E.PY_START, E.PY_RESUME)
276
277 from test.profilee import testfunc
278
class SimulateProfileTest(MonitoringEventsBase, unittest.TestCase):
    """Use monitoring events the way a profiler would, on test.profilee."""

    def test_balanced(self):
        """Every "entry" event must be matched by an "exit" event."""
        events = self.gather_events(testfunc)
        c = collections.Counter(events)
        self.assertEqual(c["c_call"], c["c_return"])
        self.assertEqual(c["start"], c["return"] + c["unwind"])
        self.assertEqual(c["raise"], c["exception_handled"] + c["unwind"])

    def test_frame_stack(self):
        """Maintain a frame stack from the events and check that it stays consistent."""
        self.maxDiff = None
        stack = []
        errors = []
        seen = set()
        def up(*args):
            frame = sys._getframe(1)
            if not stack:
                errors.append("empty")
            else:
                expected = stack.pop()
                if frame != expected:
                    errors.append(f" Popping {frame} expected {expected}")
        def down(*args):
            frame = sys._getframe(1)
            stack.append(frame)
            seen.add(frame.f_code)
        def call(code, offset, func, arg):
            # Calls to non-Python callables never produce a "down" event, so
            # push the calling frame here; C_RETURN / C_RAISE pop it again.
            if not isinstance(func, PY_CALLABLES):
                stack.append(sys._getframe(1))
        for event in UP_EVENTS:
            sys.monitoring.register_callback(TEST_TOOL, event, up)
        for event in DOWN_EVENTS:
            sys.monitoring.register_callback(TEST_TOOL, event, down)
        sys.monitoring.register_callback(TEST_TOOL, E.CALL, call)
        try:
            sys.monitoring.set_events(TEST_TOOL, SIMPLE_EVENT_SET)
            testfunc()
        finally:
            # Also runs when testfunc() raises, so monitoring is never left
            # enabled and the callbacks never stay registered.
            sys.monitoring.set_events(TEST_TOOL, 0)
            for event in UP_EVENTS + DOWN_EVENTS + (E.CALL,):
                sys.monitoring.register_callback(TEST_TOOL, event, None)
        self.assertEqual(errors, [])
        # The CALL for the final set_events() pushed this frame, and
        # monitoring was off before a matching C_RETURN could pop it.
        self.assertEqual(stack, [sys._getframe()])
        # NOTE(review): 9 is presumably the number of distinct code objects
        # that test.profilee.testfunc executes -- confirm against that module.
        self.assertEqual(len(seen), 9)
319
320
class CounterWithDisable:
    """Event callback that counts its invocations.

    While ``disable`` is true, each call returns ``sys.monitoring.DISABLE``;
    otherwise it returns None.
    """

    def __init__(self):
        self.count = 0
        self.disable = False

    def __call__(self, *args):
        self.count += 1
        return sys.monitoring.DISABLE if self.disable else None
331
332
class RecorderWithDisable:
    """Two-argument event callback that appends its second argument to a list.

    While ``disable`` is true, each call returns ``sys.monitoring.DISABLE``;
    otherwise it returns None.
    """

    def __init__(self, events):
        self.events = events
        self.disable = False

    def __call__(self, code, event):
        self.events.append(event)
        return sys.monitoring.DISABLE if self.disable else None
343
344
class MontoringDisableAndRestartTest(MonitoringTestBase, unittest.TestCase):
    """Tests for returning DISABLE from a callback and for restart_events()."""

    def test_disable(self):
        """Once a callback returns DISABLE, that location stops reporting."""
        try:
            counter = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            self.assertEqual(counter.count, 0)
            # The assertion itself starts Python functions; reset the count.
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 1)
            counter.disable = True
            counter.count = 0
            # This call is still reported, and the callback answers DISABLE ...
            f1()
            self.assertEqual(counter.count, 1)
            counter.count = 0
            # ... so this one no longer is.
            f1()
            self.assertEqual(counter.count, 0)
        finally:
            # Switch monitoring off here rather than at the end of the try
            # body, so that a failed assertion cannot leave it enabled.
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None)
            sys.monitoring.restart_events()

    def test_restart(self):
        """restart_events() re-enables locations that were disabled."""
        try:
            counter = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            counter.disable = True
            f1()
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 0)
            sys.monitoring.restart_events()
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 1)
        finally:
            # See test_disable: always leave monitoring switched off.
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None)
            sys.monitoring.restart_events()
384
385
class MultipleMonitorsTest(MonitoringTestBase, unittest.TestCase):
    """Tests with more than one tool monitoring at the same time.

    In each test the counters are reset and read immediately around the
    call to f1(), with no other call in between: helper calls made while
    monitoring is enabled would be counted as well.
    """

    def test_two_same(self):
        """Two tools watching the same event each see it once."""
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            counter1 = CounterWithDisable()
            counter2 = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter1)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, counter2)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            sys.monitoring.set_events(TEST_TOOL2, E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_START)
            self.assertEqual(sys.monitoring._all_events(), {'PY_START': (1 << TEST_TOOL) | (1 << TEST_TOOL2)})
            counter1.count = 0
            counter2.count = 0
            f1()
            count1 = counter1.count
            count2 = counter2.count
            self.assertEqual((count1, count2), (1, 1))
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, None)
            self.assertEqual(sys.monitoring._all_events(), {})

    def test_three_same(self):
        """Three tools watching the same event each see it once."""
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            counter1 = CounterWithDisable()
            counter2 = CounterWithDisable()
            counter3 = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter1)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, counter2)
            sys.monitoring.register_callback(TEST_TOOL3, E.PY_START, counter3)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            sys.monitoring.set_events(TEST_TOOL2, E.PY_START)
            sys.monitoring.set_events(TEST_TOOL3, E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL3), E.PY_START)
            self.assertEqual(sys.monitoring._all_events(), {'PY_START': (1 << TEST_TOOL) | (1 << TEST_TOOL2) | (1 << TEST_TOOL3)})
            counter1.count = 0
            counter2.count = 0
            counter3.count = 0
            f1()
            count1 = counter1.count
            count2 = counter2.count
            count3 = counter3.count
            self.assertEqual((count1, count2, count3), (1, 1, 1))
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.set_events(TEST_TOOL3, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, None)
            sys.monitoring.register_callback(TEST_TOOL3, E.PY_START, None)
            self.assertEqual(sys.monitoring._all_events(), {})

    def test_two_different(self):
        """Two tools watching different events each see only their own."""
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            counter1 = CounterWithDisable()
            counter2 = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter1)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_RETURN, counter2)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            sys.monitoring.set_events(TEST_TOOL2, E.PY_RETURN)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_RETURN)
            self.assertEqual(sys.monitoring._all_events(), {'PY_START': 1 << TEST_TOOL, 'PY_RETURN': 1 << TEST_TOOL2})
            counter1.count = 0
            counter2.count = 0
            f1()
            count1 = counter1.count
            count2 = counter2.count
            self.assertEqual((count1, count2), (1, 1))
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_RETURN, None)
            self.assertEqual(sys.monitoring._all_events(), {})

    def test_two_with_disable(self):
        """One tool returning DISABLE must not silence the other tool."""
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            counter1 = CounterWithDisable()
            counter2 = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter1)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, counter2)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            sys.monitoring.set_events(TEST_TOOL2, E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_START)
            self.assertEqual(sys.monitoring._all_events(), {'PY_START': (1 << TEST_TOOL) | (1 << TEST_TOOL2)})
            counter1.count = 0
            counter2.count = 0
            counter1.disable = True
            f1()
            count1 = counter1.count
            count2 = counter2.count
            self.assertEqual((count1, count2), (1, 1))
            counter1.count = 0
            counter2.count = 0
            f1()
            count1 = counter1.count
            count2 = counter2.count
            self.assertEqual((count1, count2), (0, 1))
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, None)
            # Restart before asserting: a failing assertion must not leave
            # the location in f1() disabled for the tests that follow.
            sys.monitoring.restart_events()
            self.assertEqual(sys.monitoring._all_events(), {})

    def test_with_instruction_event(self):
        """Test that the second tool can set events with instruction events set by the first tool."""
        def f():
            pass
        code = f.__code__

        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.set_local_events(TEST_TOOL, code, E.INSTRUCTION | E.LINE)
            sys.monitoring.set_local_events(TEST_TOOL2, code, E.LINE)
            # Check that both requests actually took effect.
            self.assertEqual(sys.monitoring.get_local_events(TEST_TOOL, code),
                             E.INSTRUCTION | E.LINE)
            self.assertEqual(sys.monitoring.get_local_events(TEST_TOOL2, code),
                             E.LINE)
        finally:
            # The events were set locally on `code`, so they have to be
            # cleared locally; set_events() only changes global events.
            sys.monitoring.set_local_events(TEST_TOOL, code, 0)
            sys.monitoring.set_local_events(TEST_TOOL2, code, 0)
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            self.assertEqual(sys.monitoring._all_events(), {})
518
519
class LineMonitoringTest(MonitoringTestBase, unittest.TestCase):
    # The expectations in this class are line numbers: offsets from the first
    # line of each test method or nested function, plus the absolute line
    # numbers of f1() and floop().  Do not add or remove lines inside the
    # methods below (or above f1/floop in this file) without updating them.

    # One tool: the line calling f1(), the body of f1 and the following line.
    def test_lines_single(self):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            events = []
            recorder = RecorderWithDisable(events)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, recorder)
            sys.monitoring.set_events(TEST_TOOL, E.LINE)
            f1()  # start+7
            sys.monitoring.set_events(TEST_TOOL, 0)  # start+8
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            start = LineMonitoringTest.test_lines_single.__code__.co_firstlineno
            self.assertEqual(events, [start+7, 16, start+8])
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.restart_events()

    # One tool: the two lines of floop()'s loop are reported on every pass.
    def test_lines_loop(self):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            events = []
            recorder = RecorderWithDisable(events)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, recorder)
            sys.monitoring.set_events(TEST_TOOL, E.LINE)
            floop()  # start+7
            sys.monitoring.set_events(TEST_TOOL, 0)  # start+8
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            start = LineMonitoringTest.test_lines_loop.__code__.co_firstlineno
            self.assertEqual(events, [start+7, 23, 24, 23, 24, 23, start+8])
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.restart_events()

    # Two tools: both must see the same lines.  The paired set_events() calls
    # share one source line each, so both tools change state on the same line.
    def test_lines_two(self):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            events = []
            recorder = RecorderWithDisable(events)
            events2 = []
            recorder2 = RecorderWithDisable(events2)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, recorder)
            sys.monitoring.register_callback(TEST_TOOL2, E.LINE, recorder2)
            sys.monitoring.set_events(TEST_TOOL, E.LINE); sys.monitoring.set_events(TEST_TOOL2, E.LINE)
            f1()  # start+10
            sys.monitoring.set_events(TEST_TOOL, 0); sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.LINE, None)
            start = LineMonitoringTest.test_lines_two.__code__.co_firstlineno
            expected = [start+10, 16, start+11]
            self.assertEqual(events, expected)
            self.assertEqual(events2, expected)
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.LINE, None)
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.restart_events()

    # Run func() with LINE events enabled for `tool` and compare the reported
    # lines, made relative to func's first line, with `expected`.  The first
    # and last recorded lines are dropped (presumably the func() call line and
    # the set_events() line of this helper, as in the tests above).
    def check_lines(self, func, expected, tool=TEST_TOOL):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            events = []
            recorder = RecorderWithDisable(events)
            sys.monitoring.register_callback(tool, E.LINE, recorder)
            sys.monitoring.set_events(tool, E.LINE)
            func()
            sys.monitoring.set_events(tool, 0)
            sys.monitoring.register_callback(tool, E.LINE, None)
            lines = [ line - func.__code__.co_firstlineno for line in events[1:-1] ]
            self.assertEqual(lines, expected)
        finally:
            sys.monitoring.set_events(tool, 0)


    # In the nested functions below, the value assigned to `line` is the
    # offset of that statement from the function's `def` line.
    def test_linear(self):

        def func():
            line = 1
            line = 2
            line = 3
            line = 4
            line = 5

        self.check_lines(func, [1,2,3,4,5])

    # Only the lines of the branch that is taken are reported.
    def test_branch(self):
        def func():
            if "true".startswith("t"):
                line = 2
                line = 3
            else:
                line = 5
            line = 6

        self.check_lines(func, [1,2,3,6])

    # func1 never raises, so its handler lines are skipped; func2 raises and
    # every line, including the `except:` line, is reported.
    def test_try_except(self):

        def func1():
            try:
                line = 2
                line = 3
            except:
                line = 5
            line = 6

        self.check_lines(func1, [1,2,3,6])

        def func2():
            try:
                line = 2
                raise 3  # not an exception: raises TypeError, caught below
            except:
                line = 5
            line = 6

        self.check_lines(func2, [1,2,3,4,5,6])
643
class TestDisable(MonitoringTestBase, unittest.TestCase):
    """Check for which events a callback may return sys.monitoring.DISABLE."""

    def gen(self, cond):
        """Yield ten times: 1 on each pass if *cond* is true, otherwise 2."""
        for i in range(10):
            if cond:
                yield 1
            else:
                yield 2

    def raise_handle_reraise(self):
        """Raise ZeroDivisionError, catch it and re-raise it to the caller."""
        try:
            1/0
        except:
            raise

    def test_disable_legal_events(self):
        """Returning DISABLE is accepted for every event in INSTRUMENTED_EVENTS.

        The callback always answers DISABLE, so the count is expected to
        stay below 4 even though the generator is resumed ten times.
        """
        for event, name in INSTRUMENTED_EVENTS:
            try:
                counter = CounterWithDisable()
                counter.disable = True
                sys.monitoring.register_callback(TEST_TOOL, event, counter)
                sys.monitoring.set_events(TEST_TOOL, event)
                for _ in self.gen(1):
                    pass
                self.assertLess(counter.count, 4)
            finally:
                sys.monitoring.set_events(TEST_TOOL, 0)
                sys.monitoring.register_callback(TEST_TOOL, event, None)


    def test_disable_illegal_events(self):
        """Returning DISABLE for an event in EXCEPT_EVENTS raises ValueError."""
        for event, name in EXCEPT_EVENTS:
            try:
                counter = CounterWithDisable()
                counter.disable = True
                sys.monitoring.register_callback(TEST_TOOL, event, counter)
                sys.monitoring.set_events(TEST_TOOL, event)
                with self.assertRaises(ValueError):
                    self.raise_handle_reraise()
            finally:
                sys.monitoring.set_events(TEST_TOOL, 0)
                sys.monitoring.register_callback(TEST_TOOL, event, None)
686
687
class ExceptionRecorder:
    """Callback for RAISE events; appends ``("raise", <exception type>)``.

    Instances are created with the shared list that the entries go into.
    Subclasses override ``event_type`` (and usually ``__call__``) to record
    other exception-related events.
    """

    event_type = E.RAISE

    def __init__(self, events):
        self.events = events

    def __call__(self, code, offset, exc):
        entry = ("raise", type(exc))
        self.events.append(entry)
697
class CheckEvents(MonitoringTestBase, unittest.TestCase):

    # Run func() with the events of all `recorders` enabled for `tool` and
    # return the list that the recorder instances appended to.
    #
    # NOTE: tests in this file expect LINE events at offsets 10 and 11 from
    # this method's `def` line (the `func()` call and the following
    # `set_events` call).  Do not add or remove lines inside this method
    # without updating those expectations.
    def get_events(self, func, tool, recorders):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            event_list = []
            all_events = 0
            for recorder in recorders:
                ev = recorder.event_type
                sys.monitoring.register_callback(tool, ev, recorder(event_list))
                all_events |= ev
            sys.monitoring.set_events(tool, all_events)
            func()
            sys.monitoring.set_events(tool, 0)
            for recorder in recorders:
                sys.monitoring.register_callback(tool, recorder.event_type, None)
            return event_list
        finally:
            sys.monitoring.set_events(tool, 0)
            for recorder in recorders:
                sys.monitoring.register_callback(tool, recorder.event_type, None)

    def check_events(self, func, expected, tool=TEST_TOOL, recorders=(ExceptionRecorder,)):
        """Assert that get_events() for *func* returns exactly *expected*.

        On a mismatch the recorded events are also printed to stderr.
        """
        events = self.get_events(func, tool, recorders)
        if events != expected:
            print(events, file = sys.stderr)
        self.assertEqual(events, expected)

    def check_balanced(self, func, recorders):
        """Assert that the recorded events come in matching pairs.

        Each even-indexed event must be a "raise" or "reraise", followed by
        a "handled" or "unwind" event for the same exception type.
        """
        events = self.get_events(func, TEST_TOOL, recorders)
        self.assertEqual(len(events)%2, 0)
        for r, h in zip(events[::2],events[1::2]):
            r0 = r[0]
            self.assertIn(r0, ("raise", "reraise"))
            h0 = h[0]
            self.assertIn(h0, ("handled", "unwind"))
            self.assertEqual(r[1], h[1])
735
736
class StopiterationRecorder(ExceptionRecorder):
    """Recorder for STOP_ITERATION events.

    ``__call__`` is inherited, so entries are tagged "raise".
    """

    event_type = E.STOP_ITERATION
740
class ReraiseRecorder(ExceptionRecorder):
    """Recorder for RERAISE events; appends ``("reraise", <exception type>)``."""

    event_type = E.RERAISE

    def __call__(self, code, offset, exc):
        entry = ("reraise", type(exc))
        self.events.append(entry)
747
class UnwindRecorder(ExceptionRecorder):
    """Recorder for PY_UNWIND events; appends ``("unwind", <exception type>)``."""

    event_type = E.PY_UNWIND

    def __call__(self, code, offset, exc):
        entry = ("unwind", type(exc))
        self.events.append(entry)
754
class ExceptionHandledRecorder(ExceptionRecorder):
    """Recorder for EXCEPTION_HANDLED events; appends ``("handled", <exception type>)``."""

    event_type = E.EXCEPTION_HANDLED

    def __call__(self, code, offset, exc):
        entry = ("handled", type(exc))
        self.events.append(entry)
761
class ThrowRecorder(ExceptionRecorder):
    """Recorder for PY_THROW events; appends ``("throw", <exception type>)``."""

    event_type = E.PY_THROW

    def __call__(self, code, offset, exc):
        entry = ("throw", type(exc))
        self.events.append(entry)
768
class ExceptionMonitoringTest(CheckEvents):
    """Events fired while exceptions are raised, re-raised, handled and unwound.

    The nested functions are the code under test; their structure is part
    of each test case.
    """


    # Recorders whose combined output check_balanced() expects to be paired.
    exception_recorders = (
        ExceptionRecorder,
        ReraiseRecorder,
        ExceptionHandledRecorder,
        UnwindRecorder
    )

    def test_simple_try_except(self):
        """A raise caught in the same function produces a single RAISE event."""

        def func1():
            try:
                line = 2
                raise KeyError
            except:
                line = 5
            line = 6

        self.check_events(func1, [("raise", KeyError)])

    def test_implicit_stop_iteration(self):
        """A generator finishing inside a ``for`` loop fires STOP_ITERATION."""

        def gen():
            yield 1
            return 2

        def implicit_stop_iteration():
            for _ in gen():
                pass

        self.check_events(implicit_stop_iteration, [("raise", StopIteration)], recorders=(StopiterationRecorder,))

    # NOTE(review): `initial` and `reraise` are not referenced by any test
    # visible in this part of the file -- confirm whether they are still used.
    initial = [
        ("raise", ZeroDivisionError),
        ("handled", ZeroDivisionError)
    ]

    reraise = [
        ("reraise", ZeroDivisionError),
        ("handled", ZeroDivisionError)
    ]

    def test_explicit_reraise(self):
        """A bare ``raise`` in the inner handler passes the exception outwards."""

        def func():
            try:
                try:
                    1/0
                except:
                    raise
            except:
                pass

        self.check_balanced(
            func,
            recorders = self.exception_recorders)

    def test_explicit_reraise_named(self):
        """As test_explicit_reraise, with the exception bound to a name."""

        def func():
            try:
                try:
                    1/0
                except Exception as ex:
                    raise
            except:
                pass

        self.check_balanced(
            func,
            recorders = self.exception_recorders)

    def test_implicit_reraise(self):
        """The inner handler does not match, so the exception continues outwards."""

        def func():
            try:
                try:
                    1/0
                except ValueError:
                    pass
            except:
                pass

        self.check_balanced(
            func,
            recorders = self.exception_recorders)


    def test_implicit_reraise_named(self):
        """As test_implicit_reraise, with the non-matching handler binding a name."""

        def func():
            try:
                try:
                    1/0
                except ValueError as ex:
                    pass
            except:
                pass

        self.check_balanced(
            func,
            recorders = self.exception_recorders)

    def test_try_finally(self):
        """The exception passes through a ``finally`` block before being handled."""

        def func():
            try:
                try:
                    1/0
                finally:
                    pass
            except:
                pass

        self.check_balanced(
            func,
            recorders = self.exception_recorders)

    def test_async_for(self):
        """An async generator raises inside ``async for``; the coroutine handles it."""

        def func():

            async def async_generator():
                for i in range(1):
                    raise ZeroDivisionError
                    yield i

            async def async_loop():
                try:
                    async for item in async_generator():
                        pass
                except Exception:
                    pass

            try:
                async_loop().send(None)
            except StopIteration:
                pass

        self.check_balanced(
            func,
            recorders = self.exception_recorders)

    def test_throw(self):
        """An exception thrown into a generator is balanced and fires PY_THROW first."""

        def gen():
            yield 1
            yield 2

        def func():
            try:
                g = gen()
                next(g)
                g.throw(IndexError)
            except IndexError:
                pass

        self.check_balanced(
            func,
            recorders = self.exception_recorders)

        events = self.get_events(
            func,
            TEST_TOOL,
            self.exception_recorders + (ThrowRecorder,)
        )
        self.assertEqual(events[0], ("throw", IndexError))
938
class LineRecorder:
    """Callback for LINE events.

    Appends ``("line", <code name>, <offset>)``, where the offset is the
    line number relative to the first line of the code object.
    """

    event_type = E.LINE

    def __init__(self, events):
        self.events = events

    def __call__(self, code, line):
        relative_line = line - code.co_firstlineno
        self.events.append(("line", code.co_name, relative_line))
949
class CallRecorder:
    """Callback for CALL events; appends ``("call", <callee name>, <arg>)``."""

    event_type = E.CALL

    def __init__(self, events):
        self.events = events

    def __call__(self, code, offset, func, arg):
        callee_name = func.__name__
        self.events.append(("call", callee_name, arg))
959
class CEventRecorder:
    """Base for recorders that append ``(event_name, <callee name>, <arg>)``.

    ``event_name`` is not defined here; subclasses are expected to provide it.
    """

    def __init__(self, events):
        self.events = events

    def __call__(self, code, offset, func, arg):
        entry = (self.event_name, func.__name__, arg)
        self.events.append(entry)
967
class CReturnRecorder(CEventRecorder):
    """Recorder for C_RETURN events; entries are tagged "C return"."""

    event_type = E.C_RETURN
    event_name = "C return"
972
class CRaiseRecorder(CEventRecorder):
    """Recorder for C_RAISE events; entries are tagged "C raise"."""

    event_type = E.C_RAISE
    event_name = "C raise"
977
# Recorder classes that TestManyEvents enables together.
MANY_RECORDERS = ExceptionRecorder, CallRecorder, LineRecorder, CReturnRecorder, CRaiseRecorder
979
class TestManyEvents(CheckEvents):
    # Each expected list starts and ends with events from
    # CheckEvents.get_events itself: the line that calls func(), and the line
    # and CALL of the set_events() call that switches monitoring off.
    # 'line' entries hold offsets from the first line of the named function.

    def test_simple(self):
        """Straight-line code: one CALL for func1, then one LINE per statement."""

        def func1():
            line1 = 1
            line2 = 2
            line3 = 3

        self.check_events(func1, recorders = MANY_RECORDERS, expected = [
            ('line', 'get_events', 10),
            ('call', 'func1', sys.monitoring.MISSING),
            ('line', 'func1', 1),
            ('line', 'func1', 2),
            ('line', 'func1', 3),
            ('line', 'get_events', 11),
            ('call', 'set_events', 2)])

    def test_c_call(self):
        """A builtin method call adds a CALL and a C_RETURN event."""

        def func2():
            line1 = 1
            [].append(2)
            line3 = 3

        self.check_events(func2, recorders = MANY_RECORDERS, expected = [
            ('line', 'get_events', 10),
            ('call', 'func2', sys.monitoring.MISSING),
            ('line', 'func2', 1),
            ('line', 'func2', 2),
            ('call', 'append', [2]),
            ('C return', 'append', [2]),
            ('line', 'func2', 3),
            ('line', 'get_events', 11),
            ('call', 'set_events', 2)])

    def test_try_except(self):
        """A caught KeyError adds a RAISE event between the line events."""

        def func3():
            try:
                line = 2
                raise KeyError
            except:
                line = 5
            line = 6

        self.check_events(func3, recorders = MANY_RECORDERS, expected = [
            ('line', 'get_events', 10),
            ('call', 'func3', sys.monitoring.MISSING),
            ('line', 'func3', 1),
            ('line', 'func3', 2),
            ('line', 'func3', 3),
            ('raise', KeyError),
            ('line', 'func3', 4),
            ('line', 'func3', 5),
            ('line', 'func3', 6),
            ('line', 'get_events', 11),
            ('call', 'set_events', 2)])
1038
class InstructionRecorder:
    """Callback for INSTRUCTION events.

    Appends ``("instruction", <code name>, <offset>)`` for every instruction
    except those executed by a code object named "get_events".
    """

    event_type = E.INSTRUCTION

    def __init__(self, events):
        self.events = events

    def __call__(self, code, offset):
        # Instructions of the get_events helper itself are only noise.
        if code.co_name == "get_events":
            return
        self.events.append(("instruction", code.co_name, offset))
1050
1051
# Recorder classes that TestLineAndInstructionEvents enables together.
LINE_AND_INSTRUCTION_RECORDERS = InstructionRecorder, LineRecorder
1053
1054 class ESC[4;38;5;81mTestLineAndInstructionEvents(ESC[4;38;5;149mCheckEvents):
1055 maxDiff = None
1056
1057 def test_simple(self):
1058
1059 def func1():
1060 line1 = 1
1061 line2 = 2
1062 line3 = 3
1063
1064 self.check_events(func1, recorders = LINE_AND_INSTRUCTION_RECORDERS, expected = [
1065 ('line', 'get_events', 10),
1066 ('line', 'func1', 1),
1067 ('instruction', 'func1', 2),
1068 ('instruction', 'func1', 4),
1069 ('line', 'func1', 2),
1070 ('instruction', 'func1', 6),
1071 ('instruction', 'func1', 8),
1072 ('line', 'func1', 3),
1073 ('instruction', 'func1', 10),
1074 ('instruction', 'func1', 12),
1075 ('instruction', 'func1', 14),
1076 ('line', 'get_events', 11)])
1077
1078 def test_c_call(self):
1079
1080 def func2():
1081 line1 = 1
1082 [].append(2)
1083 line3 = 3
1084
1085 self.check_events(func2, recorders = LINE_AND_INSTRUCTION_RECORDERS, expected = [
1086 ('line', 'get_events', 10),
1087 ('line', 'func2', 1),
1088 ('instruction', 'func2', 2),
1089 ('instruction', 'func2', 4),
1090 ('line', 'func2', 2),
1091 ('instruction', 'func2', 6),
1092 ('instruction', 'func2', 8),
1093 ('instruction', 'func2', 28),
1094 ('instruction', 'func2', 30),
1095 ('instruction', 'func2', 38),
1096 ('line', 'func2', 3),
1097 ('instruction', 'func2', 40),
1098 ('instruction', 'func2', 42),
1099 ('instruction', 'func2', 44),
1100 ('line', 'get_events', 11)])
1101
def test_try_except(self):
    # Line and instruction events for a function that raises and handles
    # an exception.

    # Keep the nested function exactly as written: the expected instruction
    # offsets below depend on its bytecode and line layout.
    def func3():
        try:
            line = 2
            raise KeyError
        except:
            line = 5
        line = 6

    # (source line of func3, instruction offsets reported on that line)
    offsets_by_line = [
        (1, (2,)),
        (2, (4, 6)),
        (3, (8, 18, 20)),
        (4, (22,)),
        (5, (24, 26, 28)),
        (6, (30, 32, 34)),
    ]

    expected = [('line', 'get_events', 10)]
    for lineno, offsets in offsets_by_line:
        expected.append(('line', 'func3', lineno))
        expected.extend(('instruction', 'func3', offset) for offset in offsets)
    expected.append(('line', 'get_events', 11))

    self.check_events(
        func3,
        recorders=LINE_AND_INSTRUCTION_RECORDERS,
        expected=expected,
    )
1134
def test_with_restart(self):
    # The same trace must be recorded before and after a call to
    # sys.monitoring.restart_events().

    # Keep the nested function exactly as written: the expected instruction
    # offsets below depend on its bytecode and line layout.
    def func1():
        line1 = 1
        line2 = 2
        line3 = 3

    def expected_trace():
        # Build a fresh copy of the expected event list for each run.
        trace = [('line', 'get_events', 10)]
        for lineno, offsets in ((1, (2, 4)), (2, (6, 8)), (3, (10, 12, 14))):
            trace.append(('line', 'func1', lineno))
            trace.extend(('instruction', 'func1', offset) for offset in offsets)
        trace.append(('line', 'get_events', 11))
        return trace

    self.check_events(
        func1,
        recorders=LINE_AND_INSTRUCTION_RECORDERS,
        expected=expected_trace(),
    )

    sys.monitoring.restart_events()

    self.check_events(
        func1,
        recorders=LINE_AND_INSTRUCTION_RECORDERS,
        expected=expected_trace(),
    )
1170
class TestInstallIncrementallly(MonitoringTestBase, unittest.TestCase):
    # Events must still be delivered when a tool's event set is widened one
    # event type at a time rather than installed in a single call.

    def check_events(self, func, must_include, tool=TEST_TOOL, recorders=(ExceptionRecorder,)):
        # Run func() with `recorders` active for `tool` and require every
        # entry of `must_include` to appear among the recorded events.
        # Order and any additional events are ignored.
        event_list = []
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            all_events = 0
            for recorder in recorders:
                # Widen the mask after each recorder instead of setting the
                # union once.
                all_events |= recorder.event_type
                sys.monitoring.set_events(tool, all_events)
            for recorder in recorders:
                sys.monitoring.register_callback(tool, recorder.event_type, recorder(event_list))
            func()
        finally:
            # Always switch the tool off and drop its callbacks, whether or
            # not the body above succeeded.
            sys.monitoring.set_events(tool, 0)
            for recorder in recorders:
                sys.monitoring.register_callback(tool, recorder.event_type, None)
        for entry in must_include:
            self.assertIn(entry, event_list)

    # Do not add comments or a docstring inside func1: the expectations
    # below depend on its bytecode offsets and line layout.
    @staticmethod
    def func1():
        line1 = 1

    # func1 is decorated, so its code object's co_firstlineno is the
    # @staticmethod line and the body sits two lines below it.
    # NOTE(review): assumes LineRecorder (defined elsewhere in this file)
    # reports `line - co_firstlineno`, as the other expectations in this file
    # suggest. This entry was never exercised while these two tests were
    # shadowed, so confirm by running them.
    MUST_INCLUDE_LI = [
            ('instruction', 'func1', 2),
            ('line', 'func1', 2),
            ('instruction', 'func1', 4),
            ('instruction', 'func1', 6)]

    def test_line_then_instruction(self):
        recorders = [ LineRecorder, InstructionRecorder ]
        self.check_events(self.func1,
                          recorders = recorders, must_include = self.MUST_INCLUDE_LI)

    def test_instruction_then_line(self):
        recorders = [ InstructionRecorder, LineRecorder ]
        self.check_events(self.func1,
                          recorders = recorders, must_include = self.MUST_INCLUDE_LI)

    # Do not add comments or a docstring inside func2 either.
    @staticmethod
    def func2():
        len(())

    MUST_INCLUDE_CI = [
            ('instruction', 'func2', 2),
            ('call', 'func2', sys.monitoring.MISSING),
            ('call', 'len', ()),
            ('instruction', 'func2', 12),
            ('instruction', 'func2', 14)]

    # These two tests used to reuse the names of the line/instruction tests
    # above, which replaced them in the class namespace so they never ran.
    def test_call_then_instruction(self):
        recorders = [ CallRecorder, InstructionRecorder ]
        self.check_events(self.func2,
                          recorders = recorders, must_include = self.MUST_INCLUDE_CI)

    def test_instruction_then_call(self):
        recorders = [ InstructionRecorder, CallRecorder ]
        self.check_events(self.func2,
                          recorders = recorders, must_include = self.MUST_INCLUDE_CI)
1236
# Recorder classes installed by the per-code-object (local) event tests.
LOCAL_RECORDERS = (CallRecorder, LineRecorder, CReturnRecorder, CRaiseRecorder)
1238
class TestLocalEvents(MonitoringTestBase, unittest.TestCase):
    # Events enabled for a single code object through set_local_events().

    def check_events(self, func, expected, tool=TEST_TOOL, recorders=()):
        # Monitor only func's code object with `recorders`, call func(), and
        # require the recorded events to equal `expected` exactly.
        event_list = []
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            mask = 0
            for recorder_cls in recorders:
                event = recorder_cls.event_type
                sys.monitoring.register_callback(tool, event, recorder_cls(event_list))
                mask |= event
            sys.monitoring.set_local_events(tool, func.__code__, mask)
            func()
        finally:
            # Switch local monitoring off and drop the callbacks on every path.
            sys.monitoring.set_local_events(tool, func.__code__, 0)
            for recorder_cls in recorders:
                sys.monitoring.register_callback(tool, recorder_cls.event_type, None)
        self.assertEqual(event_list, expected)

    def test_simple(self):

        # Nested functions in these tests are kept verbatim: the expected
        # line numbers are relative to their first line.
        def func1():
            line1 = 1
            line2 = 2
            line3 = 3

        expected = [('line', 'func1', lineno) for lineno in (1, 2, 3)]
        self.check_events(func1, recorders=LOCAL_RECORDERS, expected=expected)

    def test_c_call(self):

        def func2():
            line1 = 1
            [].append(2)
            line3 = 3

        expected = [
            ('line', 'func2', 1),
            ('line', 'func2', 2),
            ('call', 'append', [2]),
            ('C return', 'append', [2]),
            ('line', 'func2', 3),
        ]
        self.check_events(func2, recorders=LOCAL_RECORDERS, expected=expected)

    def test_try_except(self):

        def func3():
            try:
                line = 2
                raise KeyError
            except:
                line = 5
            line = 6

        expected = [('line', 'func3', lineno) for lineno in range(1, 7)]
        self.check_events(func3, recorders=LOCAL_RECORDERS, expected=expected)

    def test_set_non_local_event(self):
        # E.RAISE must be rejected as a local event.
        code = just_call.__code__
        self.assertRaises(
            ValueError,
            sys.monitoring.set_local_events, TEST_TOOL, code, E.RAISE,
        )
1309
def line_from_offset(code, offset):
    """Translate a bytecode offset into a line number relative to the code's first line.

    The first ``(start, end, line)`` entry of ``code.co_lines()`` whose
    half-open range contains *offset* decides the result:

    * ``line - code.co_firstlineno`` when the entry has a line number;
    * the string ``"[offset=N]"`` when the entry's line is ``None``;
    * ``-1`` when no entry contains *offset*.
    """
    not_found = object()
    line = next(
        (
            lineno
            for begin, stop, lineno in code.co_lines()
            if begin <= offset < stop
        ),
        not_found,
    )
    if line is not_found:
        return -1
    if line is None:
        return f"[offset={offset}]"
    return line - code.co_firstlineno
1317
class JumpRecorder:
    """Callback that records E.JUMP events as ``(name, co_name, from_line, to_line)``."""

    event_type = E.JUMP
    name = "jump"

    def __init__(self, events):
        # `events` is the shared list that recorded tuples are appended to.
        self.events = events

    def __call__(self, code, from_, to):
        # Both offsets are translated with line_from_offset() before recording.
        source, destination = (
            line_from_offset(code, offset) for offset in (from_, to)
        )
        entry = (self.name, code.co_name, source, destination)
        self.events.append(entry)
1330
1331
class BranchRecorder(JumpRecorder):
    """Same recording as JumpRecorder, for E.BRANCH events labelled "branch"."""

    name = "branch"
    event_type = E.BRANCH
1336
class ReturnRecorder:
    """Callback that records E.PY_RETURN events as ``("return", value)``."""

    event_type = E.PY_RETURN

    def __init__(self, events):
        # `events` is the shared list that recorded tuples are appended to.
        self.events = events

    def __call__(self, code, offset, val):
        # Only the returned value is kept; code and offset are ignored.
        entry = ("return", val)
        self.events.append(entry)
1346
1347
# Recorder sets used by the branch/jump tests; each one extends the previous.
JUMP_AND_BRANCH_RECORDERS = (JumpRecorder, BranchRecorder)
JUMP_BRANCH_AND_LINE_RECORDERS = (*JUMP_AND_BRANCH_RECORDERS, LineRecorder)
FLOW_AND_LINE_RECORDERS = (
    *JUMP_BRANCH_AND_LINE_RECORDERS,
    ExceptionRecorder,
    ReturnRecorder,
)
1351
1352 class ESC[4;38;5;81mTestBranchAndJumpEvents(ESC[4;38;5;149mCheckEvents):
1353 maxDiff = None
1354
1355 def test_loop(self):
1356
1357 def func():
1358 x = 1
1359 for a in range(2):
1360 if a:
1361 x = 4
1362 else:
1363 x = 6
1364
1365 self.check_events(func, recorders = JUMP_AND_BRANCH_RECORDERS, expected = [
1366 ('branch', 'func', 2, 2),
1367 ('branch', 'func', 3, 6),
1368 ('jump', 'func', 6, 2),
1369 ('branch', 'func', 2, 2),
1370 ('branch', 'func', 3, 4),
1371 ('jump', 'func', 4, 2),
1372 ('branch', 'func', 2, 2)])
1373
1374 self.check_events(func, recorders = JUMP_BRANCH_AND_LINE_RECORDERS, expected = [
1375 ('line', 'get_events', 10),
1376 ('line', 'func', 1),
1377 ('line', 'func', 2),
1378 ('branch', 'func', 2, 2),
1379 ('line', 'func', 3),
1380 ('branch', 'func', 3, 6),
1381 ('line', 'func', 6),
1382 ('jump', 'func', 6, 2),
1383 ('line', 'func', 2),
1384 ('branch', 'func', 2, 2),
1385 ('line', 'func', 3),
1386 ('branch', 'func', 3, 4),
1387 ('line', 'func', 4),
1388 ('jump', 'func', 4, 2),
1389 ('line', 'func', 2),
1390 ('branch', 'func', 2, 2),
1391 ('line', 'get_events', 11)])
1392
1393 def test_except_star(self):
1394
1395 class ESC[4;38;5;81mFoo:
1396 def meth(self):
1397 pass
1398
1399 def func():
1400 try:
1401 try:
1402 raise KeyError
1403 except* Exception as e:
1404 f = Foo(); f.meth()
1405 except KeyError:
1406 pass
1407
1408
1409 self.check_events(func, recorders = JUMP_BRANCH_AND_LINE_RECORDERS, expected = [
1410 ('line', 'get_events', 10),
1411 ('line', 'func', 1),
1412 ('line', 'func', 2),
1413 ('line', 'func', 3),
1414 ('line', 'func', 4),
1415 ('branch', 'func', 4, 4),
1416 ('line', 'func', 5),
1417 ('line', 'meth', 1),
1418 ('jump', 'func', 5, 5),
1419 ('jump', 'func', 5, '[offset=114]'),
1420 ('branch', 'func', '[offset=120]', '[offset=122]'),
1421 ('line', 'get_events', 11)])
1422
1423 self.check_events(func, recorders = FLOW_AND_LINE_RECORDERS, expected = [
1424 ('line', 'get_events', 10),
1425 ('line', 'func', 1),
1426 ('line', 'func', 2),
1427 ('line', 'func', 3),
1428 ('raise', KeyError),
1429 ('line', 'func', 4),
1430 ('branch', 'func', 4, 4),
1431 ('line', 'func', 5),
1432 ('line', 'meth', 1),
1433 ('return', None),
1434 ('jump', 'func', 5, 5),
1435 ('jump', 'func', 5, '[offset=114]'),
1436 ('branch', 'func', '[offset=120]', '[offset=122]'),
1437 ('return', None),
1438 ('line', 'get_events', 11)])
1439
class TestLoadSuperAttr(CheckEvents):
    # Monitoring events for code compiled with LOAD_SUPER_ATTR are compared
    # against the events for the same code compiled without it.
    RECORDERS = CallRecorder, LineRecorder, CRaiseRecorder, CReturnRecorder

    def _exec(self, co):
        # Execute `co` in a fresh dict used as both globals and locals, and
        # return that dict.
        namespace = {}
        exec(co, namespace, namespace)
        return namespace

    def _exec_super(self, codestr, optimized=False):
        # The compiler declines to emit `LOAD_SUPER_ATTR` when it can see
        # that the name `super` is shadowed. Prepending `super = super`
        # therefore yields the un-optimized `LOAD_GLOBAL super; CALL;
        # LOAD_ATTR` form, whose monitoring events should match those of
        # `LOAD_SUPER_ATTR`. The optimized variant gets `x = 1` in the same
        # position instead (presumably so both sources have the same number
        # of lines).
        if optimized:
            assignment = "x = 1"
        else:
            assignment = "super = super"
        source = assignment + "\n" + textwrap.dedent(codestr)
        co = compile(source, "<string>", "exec")
        # LOAD_SUPER_ATTR must be present exactly when `optimized` is true.
        self.assertEqual(self._has_load_super_attr(co), optimized)
        return self._exec(co)

    def _has_load_super_attr(self, co):
        # True if `co`, or any code object nested in its constants, contains
        # a LOAD_SUPER_ATTR instruction.
        if any(instr.opname == "LOAD_SUPER_ATTR" for instr in dis.get_instructions(co)):
            return True
        return any(
            isinstance(const, types.CodeType) and self._has_load_super_attr(const)
            for const in co.co_consts
        )

    def _check_both_variants(self, build):
        # Build the un-optimized and the optimized case first, then check the
        # events of each, in that order.
        cases = [build(optimized=flag) for flag in (False, True)]
        for func, expected in cases:
            self.check_events(func, recorders=self.RECORDERS, expected=expected)

    def _super_method_call(self, optimized=False):
        codestr = """
            class A:
                def method(self, x):
                    return x

            class B(A):
                def method(self, x):
                    return super(
                    ).method(
                        x
                    )

            b = B()
            def f():
                return b.method(1)
        """
        ns = self._exec_super(codestr, optimized)
        missing = sys.monitoring.MISSING
        expected = [
            ('line', 'get_events', 10),
            ('call', 'f', missing),
            ('line', 'f', 1),
            ('call', 'method', ns["b"]),
            ('line', 'method', 1),
            ('call', 'super', missing),
            ('C return', 'super', missing),
            ('line', 'method', 2),
            ('line', 'method', 3),
            ('line', 'method', 2),
            ('call', 'method', 1),
            ('line', 'method', 1),
            ('line', 'method', 1),
            ('line', 'get_events', 11),
            ('call', 'set_events', 2),
        ]
        return ns["f"], expected

    def test_method_call(self):
        self._check_both_variants(self._super_method_call)

    def _super_method_call_error(self, optimized=False):
        codestr = """
            class A:
                def method(self, x):
                    return x

            class B(A):
                def method(self, x):
                    return super(
                        x,
                        self,
                    ).method(
                        x
                    )

            b = B()
            def f():
                try:
                    return b.method(1)
                except TypeError:
                    pass
                else:
                    assert False, "should have raised TypeError"
        """
        ns = self._exec_super(codestr, optimized)
        expected = [
            ('line', 'get_events', 10),
            ('call', 'f', sys.monitoring.MISSING),
            ('line', 'f', 1),
            ('line', 'f', 2),
            ('call', 'method', ns["b"]),
            ('line', 'method', 1),
            ('line', 'method', 2),
            ('line', 'method', 3),
            ('line', 'method', 1),
            ('call', 'super', 1),
            ('C raise', 'super', 1),
            ('line', 'f', 3),
            ('line', 'f', 4),
            ('line', 'get_events', 11),
            ('call', 'set_events', 2),
        ]
        return ns["f"], expected

    def test_method_call_error(self):
        self._check_both_variants(self._super_method_call_error)

    def _super_attr(self, optimized=False):
        codestr = """
            class A:
                x = 1

            class B(A):
                def method(self):
                    return super(
                    ).x

            b = B()
            def f():
                return b.method()
        """
        ns = self._exec_super(codestr, optimized)
        missing = sys.monitoring.MISSING
        expected = [
            ('line', 'get_events', 10),
            ('call', 'f', missing),
            ('line', 'f', 1),
            ('call', 'method', ns["b"]),
            ('line', 'method', 1),
            ('call', 'super', missing),
            ('C return', 'super', missing),
            ('line', 'method', 2),
            ('line', 'method', 1),
            ('line', 'get_events', 11),
            ('call', 'set_events', 2),
        ]
        return ns["f"], expected

    def test_attr(self):
        self._check_both_variants(self._super_attr)

    def test_vs_other_type_call(self):
        # `super().__repr__` is compared with the same access on `int()`.
        code_template = textwrap.dedent("""
            class C:
                def method(self):
                    return {cls}().__repr__{call}
            c = C()
            def f():
                return c.method()
        """)

        def get_expected(name, call_method, ns):
            missing = sys.monitoring.MISSING
            if name == "int":
                repr_arg = 0
            else:
                repr_arg = missing
            events = [
                ('line', 'get_events', 10),
                ('call', 'f', missing),
                ('line', 'f', 1),
                ('call', 'method', ns["c"]),
                ('line', 'method', 1),
                ('call', name, missing),
                ('C return', name, missing),
            ]
            if call_method:
                # The bound __repr__ is only called in the "()" variant.
                events += [
                    ('call', '__repr__', repr_arg),
                    ('C return', '__repr__', repr_arg),
                ]
            events += [
                ('line', 'get_events', 11),
                ('call', 'set_events', 2),
            ]
            return events

        for call_method in [True, False]:
            with self.subTest(call_method=call_method):
                call_str = "()" if call_method else ""
                code_super = code_template.format(cls="super", call=call_str)
                code_int = code_template.format(cls="int", call=call_str)
                co_super = compile(code_super, '<string>', 'exec')
                self.assertTrue(self._has_load_super_attr(co_super))
                ns_super = self._exec(co_super)
                ns_int = self._exec(code_int)

                for name, ns in (("super", ns_super), ("int", ns_int)):
                    self.check_events(
                        ns["f"],
                        recorders=self.RECORDERS,
                        expected=get_expected(name, call_method, ns),
                    )
1653
1654
class TestSetGetEvents(MonitoringTestBase, unittest.TestCase):
    # Whatever is written with set_events() / set_local_events() must be
    # read back by the matching getter, independently for two tools.

    def test_global(self):
        tools = (TEST_TOOL, TEST_TOOL2)
        # Enable PY_START for each tool in turn, then clear each in turn.
        for tool in tools:
            sys.monitoring.set_events(tool, E.PY_START)
            self.assertEqual(sys.monitoring.get_events(tool), E.PY_START)
        for tool in tools:
            sys.monitoring.set_events(tool, 0)
            self.assertEqual(sys.monitoring.get_events(tool), 0)

    def test_local(self):
        code = f1.__code__
        tools = (TEST_TOOL, TEST_TOOL2)
        # Same round trip as test_global, scoped to f1's code object.
        for tool in tools:
            sys.monitoring.set_local_events(tool, code, E.PY_START)
            self.assertEqual(sys.monitoring.get_local_events(tool, code), E.PY_START)
        for tool in tools:
            sys.monitoring.set_local_events(tool, code, 0)
            self.assertEqual(sys.monitoring.get_local_events(tool, code), 0)
1677
class TestUninitialized(unittest.TestCase, MonitoringTestBase):
    # Local events of a function that is never called or monitored here.
    #
    # NOTE(review): unittest.TestCase is listed before MonitoringTestBase,
    # unlike the other test classes in this file. With this order
    # TestCase.setUp precedes MonitoringTestBase.setUp in the MRO -- confirm
    # that is intended.

    @staticmethod
    def f():
        pass

    def test_get_local_events_uninitialized(self):
        code = self.f.__code__
        events = sys.monitoring.get_local_events(TEST_TOOL, code)
        self.assertEqual(events, 0)
1686
class TestRegressions(MonitoringTestBase, unittest.TestCase):
    # Regression tests; the numbers in the method names presumably refer to
    # issue numbers. Statement order matters in these tests, so change them
    # with care.

    def test_105162(self):
        # Throwing into a generator that delegates with `yield from` must be
        # handled by the inner generator, both without monitoring and with
        # PY_RESUME events enabled.
        caught = None

        def inner():
            nonlocal caught
            try:
                yield
            except Exception:
                caught = "inner"
                yield

        def outer():
            nonlocal caught
            try:
                yield from inner()
            except Exception:
                caught = "outer"
                yield

        def run():
            # Advance to the first yield in inner(), then throw into it.
            gen = outer()
            gen.send(None)
            gen.throw(Exception)
        # Baseline run without monitoring.
        run()
        self.assertEqual(caught, "inner")
        caught = None
        try:
            # Same scenario with PY_RESUME events switched on.
            sys.monitoring.set_events(TEST_TOOL, E.PY_RESUME)
            run()
            self.assertEqual(caught, "inner")
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)

    def test_108390(self):
        # Events are switched on from inside Foo.__init__, and only on the
        # last of 100 constructions. The test makes no assertion; it passes
        # if this completes.

        class Foo:
            def __init__(self, set_event):
                if set_event:
                    sys.monitoring.set_events(TEST_TOOL, E.PY_RESUME)

        def make_foo_optimized_then_set_event():
            # The first 99 calls pass False; presumably they let the call
            # site become optimized before the final call enables
            # monitoring (going by this function's name).
            for i in range(100):
                Foo(i == 99)

        try:
            make_foo_optimized_then_set_event()
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)

    def test_gh108976(self):
        # Uses tool id 0 directly and releases it via addCleanup.
        sys.monitoring.use_tool_id(0, "test")
        self.addCleanup(sys.monitoring.free_tool_id, 0)
        sys.monitoring.set_events(0, 0)
        # The LINE callback itself turns all events for the tool off, while
        # an INSTRUCTION callback is registered as well.
        sys.monitoring.register_callback(0, E.LINE, lambda *args: sys.monitoring.set_events(0, 0))
        sys.monitoring.register_callback(0, E.INSTRUCTION, lambda *args: 0)
        sys.monitoring.set_events(0, E.LINE | E.INSTRUCTION)
        sys.monitoring.set_events(0, 0)