python (3.12.0)

(root)/
lib/
python3.12/
test/
test_monitoring.py
       1  """Test suite for the sys.monitoring."""
       2  
       3  import collections
       4  import dis
       5  import functools
       6  import operator
       7  import sys
       8  import textwrap
       9  import types
      10  import unittest
      11  import asyncio
      12  
PAIR = (0,1)  # two-element tuple iterated by floop()
      14  
def f1():  # empty fixture; LineMonitoringTest hard-codes the line number of its body (16)
    pass  # keep this statement on its current line
      17  
def f2():  # fixture making two builtin calls; used by the CALL / C_RETURN count tests
    len([])
    sys.getsizeof(0)
      21  
def floop():  # loop fixture; test_lines_loop hard-codes the line numbers below (23, 24)
    for item in PAIR:
        pass
      25  
def gen():  # generator fixture that yields twice (both values are None)
    yield
    yield
      29  
def g1():  # drains gen(); test_resume_count expects 2 PY_RESUME events from this
    for _ in gen():
        pass
      33  
# Tool ids claimed by the tests in this file (see MonitoringTestBase.setUp).
TEST_TOOL = 2
TEST_TOOL2 = 3
TEST_TOOL3 = 4
      37  
      38  class ESC[4;38;5;81mMonitoringBasicTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      39  
      40      def test_has_objects(self):
      41          m = sys.monitoring
      42          m.events
      43          m.use_tool_id
      44          m.free_tool_id
      45          m.get_tool
      46          m.get_events
      47          m.set_events
      48          m.get_local_events
      49          m.set_local_events
      50          m.register_callback
      51          m.restart_events
      52          m.DISABLE
      53          m.MISSING
      54          m.events.NO_EVENTS
      55  
      56      def test_tool(self):
      57          sys.monitoring.use_tool_id(TEST_TOOL, "MonitoringTest.Tool")
      58          self.assertEqual(sys.monitoring.get_tool(TEST_TOOL), "MonitoringTest.Tool")
      59          sys.monitoring.set_events(TEST_TOOL, 15)
      60          self.assertEqual(sys.monitoring.get_events(TEST_TOOL), 15)
      61          sys.monitoring.set_events(TEST_TOOL, 0)
      62          with self.assertRaises(ValueError):
      63              sys.monitoring.set_events(TEST_TOOL, sys.monitoring.events.C_RETURN)
      64          with self.assertRaises(ValueError):
      65              sys.monitoring.set_events(TEST_TOOL, sys.monitoring.events.C_RAISE)
      66          sys.monitoring.free_tool_id(TEST_TOOL)
      67          self.assertEqual(sys.monitoring.get_tool(TEST_TOOL), None)
      68          with self.assertRaises(ValueError):
      69              sys.monitoring.set_events(TEST_TOOL, sys.monitoring.events.CALL)
      70  
      71  
      72  class ESC[4;38;5;81mMonitoringTestBase:
      73  
      74      def setUp(self):
      75          # Check that a previous test hasn't left monitoring on.
      76          for tool in range(6):
      77              self.assertEqual(sys.monitoring.get_events(tool), 0)
      78          self.assertIs(sys.monitoring.get_tool(TEST_TOOL), None)
      79          self.assertIs(sys.monitoring.get_tool(TEST_TOOL2), None)
      80          self.assertIs(sys.monitoring.get_tool(TEST_TOOL3), None)
      81          sys.monitoring.use_tool_id(TEST_TOOL, "test " + self.__class__.__name__)
      82          sys.monitoring.use_tool_id(TEST_TOOL2, "test2 " + self.__class__.__name__)
      83          sys.monitoring.use_tool_id(TEST_TOOL3, "test3 " + self.__class__.__name__)
      84  
      85      def tearDown(self):
      86          # Check that test hasn't left monitoring on.
      87          for tool in range(6):
      88              self.assertEqual(sys.monitoring.get_events(tool), 0)
      89          sys.monitoring.free_tool_id(TEST_TOOL)
      90          sys.monitoring.free_tool_id(TEST_TOOL2)
      91          sys.monitoring.free_tool_id(TEST_TOOL3)
      92  
      93  
class MonitoringCountTest(MonitoringTestBase, unittest.TestCase):
    # Counts how often a single event type fires while a fixture function
    # runs.  `E` is the module-level alias for sys.monitoring.events; it is
    # looked up when the tests run, not when this class is defined.

    # Run `func` with a counting callback registered for `event` and assert
    # that the callback fired `expected` times; then unregister the callback
    # and assert that running `func` again no longer reaches it.
    # The code below executes while events are enabled, so the order of the
    # statements (and which calls they make) is part of what is measured.
    def check_event_count(self, func, event, expected):

        class Counter:
            def __init__(self):
                self.count = 0
            def __call__(self, *args):
                self.count += 1

        counter = Counter()
        sys.monitoring.register_callback(TEST_TOOL, event, counter)
        # set_events() rejects C_RETURN and C_RAISE (MonitoringBasicTest
        # asserts ValueError for both), so CALL is enabled in their place.
        if event == E.C_RETURN or event == E.C_RAISE:
            sys.monitoring.set_events(TEST_TOOL, E.CALL)
        else:
            sys.monitoring.set_events(TEST_TOOL, event)
        self.assertEqual(counter.count, 0)
        # Reset: presumably the assertEqual() call above can itself trigger
        # the monitored event once monitoring is on.
        counter.count = 0
        func()
        self.assertEqual(counter.count, expected)
        # register_callback() returns the callback it replaces.
        prev = sys.monitoring.register_callback(TEST_TOOL, event, None)
        counter.count = 0
        func()
        self.assertEqual(counter.count, 0)
        self.assertEqual(prev, counter)
        sys.monitoring.set_events(TEST_TOOL, 0)

    def test_start_count(self):
        self.check_event_count(f1, E.PY_START, 1)

    def test_resume_count(self):
        self.check_event_count(g1, E.PY_RESUME, 2)

    def test_return_count(self):
        self.check_event_count(f1, E.PY_RETURN, 1)

    def test_call_count(self):
        self.check_event_count(f2, E.CALL, 3)

    def test_c_return_count(self):
        self.check_event_count(f2, E.C_RETURN, 2)
     135  
     136  
     137  E = sys.monitoring.events
     138  
     139  INSTRUMENTED_EVENTS = [
     140      (E.PY_START, "start"),
     141      (E.PY_RESUME, "resume"),
     142      (E.PY_RETURN, "return"),
     143      (E.PY_YIELD, "yield"),
     144      (E.JUMP, "jump"),
     145      (E.BRANCH, "branch"),
     146  ]
     147  
     148  EXCEPT_EVENTS = [
     149      (E.RAISE, "raise"),
     150      (E.PY_UNWIND, "unwind"),
     151      (E.EXCEPTION_HANDLED, "exception_handled"),
     152  ]
     153  
     154  SIMPLE_EVENTS = INSTRUMENTED_EVENTS + EXCEPT_EVENTS + [
     155      (E.C_RAISE, "c_raise"),
     156      (E.C_RETURN, "c_return"),
     157  ]
     158  
     159  
     160  SIMPLE_EVENT_SET = functools.reduce(operator.or_, [ev for (ev, _) in SIMPLE_EVENTS], 0) | E.CALL
     161  
     162  
# Fixture: an empty Python function.  Its `events` attribute is the label
# sequence MonitoringEventsBase.check_events() compares against what
# gather_events() records while the function runs.
def just_pass():
    pass

just_pass.events = [
    "py_call",
    "start",
    "return",
]
     171  
# Fixture: raises immediately without handling the exception.  `events` is
# the label sequence listed as expected for it.
def just_raise():
    raise Exception

just_raise.events = [
    'py_call',
    "start",
    "raise",
    "unwind",
]
     181  
# Fixture: makes a single call to the builtin len().  `events` is the label
# sequence listed as expected for it ("c_call" is the label gather_events()
# uses for CALL events whose callee is not a Python function or method).
def just_call():
    len([])

just_call.events = [
    'py_call',
    "start",
    "c_call",
    "c_return",
    "return",
]
     192  
# Fixture: raises ZeroDivisionError and handles it locally.  `events` is the
# label sequence listed as expected for it.
# NOTE(review): the "branch" entry presumably comes from matching the
# exception against `except Exception` -- confirm.
def caught():
    try:
        1/0
    except Exception:
        pass

caught.events = [
    'py_call',
    "start",
    "raise",
    "exception_handled",
    "branch",
    "return",
]
     207  
# Fixture: a Python function calling another Python function (just_pass).
# `events` lists the outer call/start, the inner call/start, then the two
# returns.
def nested_call():
    just_pass()

nested_call.events = [
    "py_call",
    "start",
    "py_call",
    "start",
    "return",
    "return",
]
     219  
# Callee types that the CALL callbacks below treat as Python-level calls
# ("py_call"); any other callee is treated as a C call.
PY_CALLABLES = (types.FunctionType, types.MethodType)
     221  
class MonitoringEventsBase(MonitoringTestBase):
    # Helpers that record, as a list of string labels, the events fired
    # while a function runs.

    # Run `func` with a label-appending callback installed for every entry
    # of SIMPLE_EVENTS plus a CALL callback, and return the recorded labels.
    # Any exception raised by `func` is swallowed.  Everything between the
    # two set_events() calls runs under monitoring, so the exact statements
    # used here influence the recorded sequence.
    # NOTE(review): the callbacks are not unregistered afterwards.
    def gather_events(self, func):
        events = []
        for event, event_name in SIMPLE_EVENTS:
            # event_name is bound as a default argument so that each
            # callback keeps its own label rather than the loop's last one.
            def record(*args, event_name=event_name):
                events.append(event_name)
            sys.monitoring.register_callback(TEST_TOOL, event, record)
        def record_call(code, offset, obj, arg):
            # Label the call by the kind of callee.
            if isinstance(obj, PY_CALLABLES):
                events.append("py_call")
            else:
                events.append("c_call")
        sys.monitoring.register_callback(TEST_TOOL, E.CALL, record_call)
        sys.monitoring.set_events(TEST_TOOL, SIMPLE_EVENT_SET)
        # Rebind to a fresh list to drop anything recorded since monitoring
        # was switched on; the callbacks read `events` through the closure,
        # so they append to this new list from here on.
        events = []
        try:
            func()
        except:
            pass
        sys.monitoring.set_events(TEST_TOOL, 0)
        # Remove the final event, the call to `sys.monitoring.set_events`
        events = events[:-1]
        return events

    # Assert that gather_events(func) equals `expected`, which defaults to
    # the `events` attribute stored on the fixture function.
    def check_events(self, func, expected=None):
        events = self.gather_events(func)
        if expected is None:
            expected = func.events
        self.assertEqual(events, expected)
     252  
     253  class ESC[4;38;5;81mMonitoringEventsTest(ESC[4;38;5;149mMonitoringEventsBase, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     254  
     255      def test_just_pass(self):
     256          self.check_events(just_pass)
     257  
     258      def test_just_raise(self):
     259          try:
     260              self.check_events(just_raise)
     261          except Exception:
     262              pass
     263          self.assertEqual(sys.monitoring.get_events(TEST_TOOL), 0)
     264  
     265      def test_just_call(self):
     266          self.check_events(just_call)
     267  
     268      def test_caught(self):
     269          self.check_events(caught)
     270  
     271      def test_nested_call(self):
     272          self.check_events(nested_call)
     273  
# Events for which SimulateProfileTest.test_frame_stack pops its simulated
# frame stack (UP_EVENTS) or pushes onto it (DOWN_EVENTS).
UP_EVENTS = (E.C_RETURN, E.C_RAISE, E.PY_RETURN, E.PY_UNWIND, E.PY_YIELD)
DOWN_EVENTS = (E.PY_START, E.PY_RESUME)
     276  
     277  from test.profilee import testfunc
     278  
class SimulateProfileTest(MonitoringEventsBase, unittest.TestCase):
    # Uses the events the way a profiler would, with test.profilee.testfunc
    # as the workload.

    def test_balanced(self):
        # Entry and exit style events recorded for the workload must pair
        # up: C calls with C returns, starts with returns or unwinds, and
        # raises with handled exceptions or unwinds.
        events = self.gather_events(testfunc)
        c = collections.Counter(events)
        self.assertEqual(c["c_call"], c["c_return"])
        self.assertEqual(c["start"], c["return"] + c["unwind"])
        self.assertEqual(c["raise"], c["exception_handled"] + c["unwind"])

    def test_frame_stack(self):
        # Maintain a simulated frame stack from the callbacks and check that
        # every "up" event pops the frame that the matching "down" event or
        # C call pushed.
        self.maxDiff = None
        stack = []
        errors = []
        seen = set()
        def up(*args):
            # sys._getframe(1) is the frame that invoked this callback.
            frame = sys._getframe(1)
            if not stack:
                errors.append("empty")
            else:
                expected = stack.pop()
                if frame != expected:
                    errors.append(f" Popping {frame} expected {expected}")
        def down(*args):
            frame = sys._getframe(1)
            stack.append(frame)
            seen.add(frame.f_code)
        def call(code, offset, callable, arg):
            # Calls to Python functions are pushed by `down` when they
            # start; other calls are pushed here.
            if not isinstance(callable, PY_CALLABLES):
                stack.append(sys._getframe(1))
        for event in UP_EVENTS:
            sys.monitoring.register_callback(TEST_TOOL, event, up)
        for event in DOWN_EVENTS:
            sys.monitoring.register_callback(TEST_TOOL, event, down)
        sys.monitoring.register_callback(TEST_TOOL, E.CALL, call)
        sys.monitoring.set_events(TEST_TOOL, SIMPLE_EVENT_SET)
        testfunc()
        sys.monitoring.set_events(TEST_TOOL, 0)
        self.assertEqual(errors, [])
        # NOTE(review): the one remaining entry is presumably the push made
        # for the set_events(TEST_TOOL, 0) call above, whose return is no
        # longer monitored -- confirm.
        self.assertEqual(stack, [sys._getframe()])
        # Number of distinct code objects that started or resumed.
        self.assertEqual(len(seen), 9)
     319  
     320  
     321  class ESC[4;38;5;81mCounterWithDisable:
     322  
     323      def __init__(self):
     324          self.disable = False
     325          self.count = 0
     326  
     327      def __call__(self, *args):
     328          self.count += 1
     329          if self.disable:
     330              return sys.monitoring.DISABLE
     331  
     332  
     333  class ESC[4;38;5;81mRecorderWithDisable:
     334  
     335      def __init__(self, events):
     336          self.disable = False
     337          self.events = events
     338  
     339      def __call__(self, code, event):
     340          self.events.append(event)
     341          if self.disable:
     342              return sys.monitoring.DISABLE
     343  
     344  
# NOTE(review): "Montoring" looks like a typo for "Monitoring"; the name is
# left alone because it is part of the test ids.
class MontoringDisableAndRestartTest(MonitoringTestBase, unittest.TestCase):
    # Checks the effect of a callback returning sys.monitoring.DISABLE and
    # of sys.monitoring.restart_events().  The statements inside each `try`
    # run with PY_START enabled, so their order is part of the test.

    def test_disable(self):
        try:
            counter = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            self.assertEqual(counter.count, 0)
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 1)
            # From now on the callback returns DISABLE: the next f1() is
            # still reported once, the one after that is not.
            counter.disable = True
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 1)
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 0)
            sys.monitoring.set_events(TEST_TOOL, 0)
        finally:
            # Re-enable whatever the DISABLE returns switched off.
            sys.monitoring.restart_events()

    def test_restart(self):
        try:
            counter = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            counter.disable = True
            # First call lets the callback return DISABLE for f1.
            f1()
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 0)
            # After restart_events() the event must be delivered again.
            sys.monitoring.restart_events()
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 1)
            sys.monitoring.set_events(TEST_TOOL, 0)
        finally:
            sys.monitoring.restart_events()
     384  
     385  
class MultipleMonitorsTest(MonitoringTestBase, unittest.TestCase):
    # Checks that several tools can monitor at the same time.  Each test
    # asserts that sys.monitoring._all_events() is empty before and after,
    # and compares it in between with a dict mapping an event name to a
    # bitmask that has one bit set per tool id.

    def test_two_same(self):
        # Two tools, same event: both callbacks fire once for f1().
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            counter1 = CounterWithDisable()
            counter2 = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter1)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, counter2)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            sys.monitoring.set_events(TEST_TOOL2, E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_START)
            self.assertEqual(sys.monitoring._all_events(), {'PY_START': (1 << TEST_TOOL) | (1 << TEST_TOOL2)})
            # Discard whatever was counted while the assertions above ran.
            counter1.count = 0
            counter2.count = 0
            f1()
            # Read both counts before making any further call.
            count1 = counter1.count
            count2 = counter2.count
            self.assertEqual((count1, count2), (1, 1))
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, None)
            self.assertEqual(sys.monitoring._all_events(), {})

    def test_three_same(self):
        # Three tools, same event: all three callbacks fire once for f1().
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            counter1 = CounterWithDisable()
            counter2 = CounterWithDisable()
            counter3 = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter1)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, counter2)
            sys.monitoring.register_callback(TEST_TOOL3, E.PY_START, counter3)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            sys.monitoring.set_events(TEST_TOOL2, E.PY_START)
            sys.monitoring.set_events(TEST_TOOL3, E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL3), E.PY_START)
            self.assertEqual(sys.monitoring._all_events(), {'PY_START': (1 << TEST_TOOL) | (1 << TEST_TOOL2) | (1 << TEST_TOOL3)})
            counter1.count = 0
            counter2.count = 0
            counter3.count = 0
            f1()
            count1 = counter1.count
            count2 = counter2.count
            count3 = counter3.count
            self.assertEqual((count1, count2, count3), (1, 1, 1))
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.set_events(TEST_TOOL3, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, None)
            sys.monitoring.register_callback(TEST_TOOL3, E.PY_START, None)
            self.assertEqual(sys.monitoring._all_events(), {})

    def test_two_different(self):
        # Two tools, different events (PY_START and PY_RETURN): each
        # callback fires once for f1().
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            counter1 = CounterWithDisable()
            counter2 = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter1)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_RETURN, counter2)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            sys.monitoring.set_events(TEST_TOOL2, E.PY_RETURN)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_RETURN)
            self.assertEqual(sys.monitoring._all_events(), {'PY_START': 1 << TEST_TOOL, 'PY_RETURN': 1 << TEST_TOOL2})
            counter1.count = 0
            counter2.count = 0
            f1()
            count1 = counter1.count
            count2 = counter2.count
            self.assertEqual((count1, count2), (1, 1))
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_RETURN, None)
            self.assertEqual(sys.monitoring._all_events(), {})

    def test_two_with_disable(self):
        # One tool returning DISABLE must not stop delivery to the other:
        # after the first f1() only the second tool's callback still fires.
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            counter1 = CounterWithDisable()
            counter2 = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter1)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, counter2)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            sys.monitoring.set_events(TEST_TOOL2, E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_START)
            self.assertEqual(sys.monitoring._all_events(), {'PY_START': (1 << TEST_TOOL) | (1 << TEST_TOOL2)})
            counter1.count = 0
            counter2.count = 0
            counter1.disable = True
            f1()
            count1 = counter1.count
            count2 = counter2.count
            self.assertEqual((count1, count2), (1, 1))
            counter1.count = 0
            counter2.count = 0
            f1()
            count1 = counter1.count
            count2 = counter2.count
            self.assertEqual((count1, count2), (0, 1))
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, None)
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.restart_events()

    def test_with_instruction_event(self):
        """Test that the second tool can set events with instruction events set by the first tool."""
        def f():
            pass
        code = f.__code__

        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.set_local_events(TEST_TOOL, code, E.INSTRUCTION | E.LINE)
            sys.monitoring.set_local_events(TEST_TOOL2, code, E.LINE)
        finally:
            # NOTE(review): this clears the global events only; the local
            # events set on `code` above are not explicitly cleared here --
            # confirm that is intended.
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            self.assertEqual(sys.monitoring._all_events(), {})
     518  
     519  
class LineMonitoringTest(MonitoringTestBase, unittest.TestCase):
    # Checks the line numbers reported by LINE events.  The expected values
    # are line offsets, so no line may be added or removed inside the three
    # test_lines_* methods or inside the nested fixture functions; comments
    # are therefore kept above the `def` lines.

    # Expected: the `f1()` line of this method (start+7), the body line of
    # f1 (absolute line 16), then the set_events(..., 0) line (start+8).
    def test_lines_single(self):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            events = []
            recorder = RecorderWithDisable(events)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, recorder)
            sys.monitoring.set_events(TEST_TOOL, E.LINE)
            f1()
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            start = LineMonitoringTest.test_lines_single.__code__.co_firstlineno
            self.assertEqual(events, [start+7, 16, start+8])
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.restart_events()

    # Same as above for floop(): its `for` line (absolute 23) and body line
    # (absolute 24) alternate, ending on the `for` line.
    def test_lines_loop(self):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            events = []
            recorder = RecorderWithDisable(events)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, recorder)
            sys.monitoring.set_events(TEST_TOOL, E.LINE)
            floop()
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            start = LineMonitoringTest.test_lines_loop.__code__.co_firstlineno
            self.assertEqual(events, [start+7, 23, 24, 23, 24, 23, start+8])
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.restart_events()

    # Two tools recording LINE events must see the same sequence.  The two
    # set_events() calls for the tools share one source line each time;
    # presumably so that both tools are switched on and off on the same
    # line -- confirm before reformatting.
    def test_lines_two(self):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            events = []
            recorder = RecorderWithDisable(events)
            events2 = []
            recorder2 = RecorderWithDisable(events2)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, recorder)
            sys.monitoring.register_callback(TEST_TOOL2, E.LINE, recorder2)
            sys.monitoring.set_events(TEST_TOOL, E.LINE); sys.monitoring.set_events(TEST_TOOL2, E.LINE)
            f1()
            sys.monitoring.set_events(TEST_TOOL, 0); sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.LINE, None)
            start = LineMonitoringTest.test_lines_two.__code__.co_firstlineno
            expected = [start+10, 16, start+11]
            self.assertEqual(events, expected)
            self.assertEqual(events2, expected)
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.LINE, None)
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.restart_events()

    # Run `func` with LINE events recorded for `tool` and assert that the
    # reported lines, expressed relative to func's first line, equal
    # `expected`.  The first and last recorded entries are dropped; they
    # are presumably the lines of this helper around the func() call.
    def check_lines(self, func, expected, tool=TEST_TOOL):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            events = []
            recorder = RecorderWithDisable(events)
            sys.monitoring.register_callback(tool, E.LINE, recorder)
            sys.monitoring.set_events(tool, E.LINE)
            func()
            sys.monitoring.set_events(tool, 0)
            sys.monitoring.register_callback(tool, E.LINE, None)
            lines = [ line - func.__code__.co_firstlineno for line in events[1:-1] ]
            self.assertEqual(lines, expected)
        finally:
            sys.monitoring.set_events(tool, 0)


    # In the fixtures below, the value assigned to `line` is the offset of
    # that statement from the fixture's `def` line.
    def test_linear(self):

        def func():
            line = 1
            line = 2
            line = 3
            line = 4
            line = 5

        self.check_lines(func, [1,2,3,4,5])

    # Only the branch that is taken is reported (offsets 4 and 5 are absent).
    def test_branch(self):
        def func():
            if "true".startswith("t"):
                line = 2
                line = 3
            else:
                line = 5
            line = 6

        self.check_lines(func, [1,2,3,6])

    # func1 never enters its handler; func2 does, because `raise 3` fails
    # (3 is not an exception), so the `except:` and handler lines (4, 5) are
    # reported as well.
    def test_try_except(self):

        def func1():
            try:
                line = 2
                line = 3
            except:
                line = 5
            line = 6

        self.check_lines(func1, [1,2,3,6])

        def func2():
            try:
                line = 2
                raise 3
            except:
                line = 5
            line = 6

        self.check_lines(func2, [1,2,3,4,5,6])
     643  
class TestDisable(MonitoringTestBase, unittest.TestCase):
    # Checks for which events a callback may return sys.monitoring.DISABLE.

    # Generator fixture: yields ten times, from one of two yield statements
    # depending on `cond`.
    def gen(self, cond):
        for i in range(10):
            if cond:
                yield 1
            else:
                yield 2

    # Fixture: raises ZeroDivisionError, catches it and re-raises it.
    def raise_handle_reraise(self):
        try:
            1/0
        except:
            raise

    def test_disable_legal_events(self):
        # For every INSTRUMENTED_EVENTS entry the callback always returns
        # DISABLE, so it must be invoked fewer than 4 times even though the
        # generator loop runs ten iterations.
        for event, name in INSTRUMENTED_EVENTS:
            try:
                counter = CounterWithDisable()
                counter.disable = True
                sys.monitoring.register_callback(TEST_TOOL, event, counter)
                sys.monitoring.set_events(TEST_TOOL, event)
                for _ in self.gen(1):
                    pass
                self.assertLess(counter.count, 4)
            finally:
                sys.monitoring.set_events(TEST_TOOL, 0)
                sys.monitoring.register_callback(TEST_TOOL, event, None)


    def test_disable_illegal_events(self):
        # For every EXCEPT_EVENTS entry, returning DISABLE from the callback
        # is expected to surface as a ValueError out of the fixture call.
        for event, name in EXCEPT_EVENTS:
            try:
                counter = CounterWithDisable()
                counter.disable = True
                sys.monitoring.register_callback(TEST_TOOL, event, counter)
                sys.monitoring.set_events(TEST_TOOL, event)
                with self.assertRaises(ValueError):
                    self.raise_handle_reraise()
            finally:
                sys.monitoring.set_events(TEST_TOOL, 0)
                sys.monitoring.register_callback(TEST_TOOL, event, None)
     686  
     687  
     688  class ESC[4;38;5;81mExceptionRecorder:
     689  
     690      event_type = E.RAISE
     691  
     692      def __init__(self, events):
     693          self.events = events
     694  
     695      def __call__(self, code, offset, exc):
     696          self.events.append(("raise", type(exc)))
     697  
class CheckEvents(MonitoringTestBase, unittest.TestCase):
    # Helpers for tests that record events through recorder classes such as
    # ExceptionRecorder.

    # Instantiate each class in `recorders` around one shared list, register
    # the instance for that class's `event_type` on `tool`, run `func` with
    # those events enabled and return the shared list.  Events and callbacks
    # are cleared again even if `func` raises.
    # NOTE(review): func() runs under monitoring from inside this method, so
    # a recorder that reports line offsets would be sensitive to this
    # method's line layout -- confirm before adding or removing lines here.
    def get_events(self, func, tool, recorders):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            event_list = []
            all_events = 0
            for recorder in recorders:
                ev = recorder.event_type
                sys.monitoring.register_callback(tool, ev, recorder(event_list))
                all_events |= ev
            sys.monitoring.set_events(tool, all_events)
            func()
            sys.monitoring.set_events(tool, 0)
            for recorder in recorders:
                sys.monitoring.register_callback(tool, recorder.event_type, None)
            return event_list
        finally:
            sys.monitoring.set_events(tool, 0)
            for recorder in recorders:
                sys.monitoring.register_callback(tool, recorder.event_type, None)

    # Assert that the events recorded for `func` equal `expected`; on a
    # mismatch the recorded list is also printed to stderr.
    def check_events(self, func, expected, tool=TEST_TOOL, recorders=(ExceptionRecorder,)):
        events = self.get_events(func, tool, recorders)
        if events != expected:
            print(events, file = sys.stderr)
        self.assertEqual(events, expected)

    # Assert that the recorded events come in pairs: a "raise" or "reraise"
    # entry followed by a "handled" or "unwind" entry carrying the same
    # second element.
    def check_balanced(self, func, recorders):
        events = self.get_events(func, TEST_TOOL, recorders)
        self.assertEqual(len(events)%2, 0)
        for r, h in zip(events[::2],events[1::2]):
            r0 = r[0]
            self.assertIn(r0, ("raise", "reraise"))
            h0 = h[0]
            self.assertIn(h0, ("handled", "unwind"))
            self.assertEqual(r[1], h[1])
     735  
     736  
class StopiterationRecorder(ExceptionRecorder):
    # Listens for STOP_ITERATION events.  __call__ is inherited unchanged from
    # ExceptionRecorder, so entries are still tagged "raise".

    event_type = E.STOP_ITERATION
     740  
     741  class ESC[4;38;5;81mReraiseRecorder(ESC[4;38;5;149mExceptionRecorder):
     742  
     743      event_type = E.RERAISE
     744  
     745      def __call__(self, code, offset, exc):
     746          self.events.append(("reraise", type(exc)))
     747  
     748  class ESC[4;38;5;81mUnwindRecorder(ESC[4;38;5;149mExceptionRecorder):
     749  
     750      event_type = E.PY_UNWIND
     751  
     752      def __call__(self, code, offset, exc):
     753          self.events.append(("unwind", type(exc)))
     754  
     755  class ESC[4;38;5;81mExceptionHandledRecorder(ESC[4;38;5;149mExceptionRecorder):
     756  
     757      event_type = E.EXCEPTION_HANDLED
     758  
     759      def __call__(self, code, offset, exc):
     760          self.events.append(("handled", type(exc)))
     761  
     762  class ESC[4;38;5;81mThrowRecorder(ESC[4;38;5;149mExceptionRecorder):
     763  
     764      event_type = E.PY_THROW
     765  
     766      def __call__(self, code, offset, exc):
     767          self.events.append(("throw", type(exc)))
     768  
     769  class ESC[4;38;5;81mExceptionMonitoringTest(ESC[4;38;5;149mCheckEvents):
     770  
     771  
     772      exception_recorders = (
     773          ExceptionRecorder,
     774          ReraiseRecorder,
     775          ExceptionHandledRecorder,
     776          UnwindRecorder
     777      )
     778  
     779      def test_simple_try_except(self):
     780  
     781          def func1():
     782              try:
     783                  line = 2
     784                  raise KeyError
     785              except:
     786                  line = 5
     787              line = 6
     788  
     789          self.check_events(func1, [("raise", KeyError)])
     790  
     791      def test_implicit_stop_iteration(self):
     792  
     793          def gen():
     794              yield 1
     795              return 2
     796  
     797          def implicit_stop_iteration():
     798              for _ in gen():
     799                  pass
     800  
     801          self.check_events(implicit_stop_iteration, [("raise", StopIteration)], recorders=(StopiterationRecorder,))
     802  
     803      initial = [
     804          ("raise", ZeroDivisionError),
     805          ("handled", ZeroDivisionError)
     806      ]
     807  
     808      reraise = [
     809          ("reraise", ZeroDivisionError),
     810          ("handled", ZeroDivisionError)
     811      ]
     812  
     813      def test_explicit_reraise(self):
     814  
     815          def func():
     816              try:
     817                  try:
     818                      1/0
     819                  except:
     820                      raise
     821              except:
     822                  pass
     823  
     824          self.check_balanced(
     825              func,
     826              recorders = self.exception_recorders)
     827  
     828      def test_explicit_reraise_named(self):
     829  
     830          def func():
     831              try:
     832                  try:
     833                      1/0
     834                  except Exception as ex:
     835                      raise
     836              except:
     837                  pass
     838  
     839          self.check_balanced(
     840              func,
     841              recorders = self.exception_recorders)
     842  
     843      def test_implicit_reraise(self):
     844  
     845          def func():
     846              try:
     847                  try:
     848                      1/0
     849                  except ValueError:
     850                      pass
     851              except:
     852                  pass
     853  
     854          self.check_balanced(
     855              func,
     856              recorders = self.exception_recorders)
     857  
     858  
     859      def test_implicit_reraise_named(self):
     860  
     861          def func():
     862              try:
     863                  try:
     864                      1/0
     865                  except ValueError as ex:
     866                      pass
     867              except:
     868                  pass
     869  
     870          self.check_balanced(
     871              func,
     872              recorders = self.exception_recorders)
     873  
     874      def test_try_finally(self):
     875  
     876          def func():
     877              try:
     878                  try:
     879                      1/0
     880                  finally:
     881                      pass
     882              except:
     883                  pass
     884  
     885          self.check_balanced(
     886              func,
     887              recorders = self.exception_recorders)
     888  
     889      def test_async_for(self):
     890  
     891          def func():
     892  
     893              async def async_generator():
     894                  for i in range(1):
     895                      raise ZeroDivisionError
     896                      yield i
     897  
     898              async def async_loop():
     899                  try:
     900                      async for item in async_generator():
     901                          pass
     902                  except Exception:
     903                      pass
     904  
     905              try:
     906                  async_loop().send(None)
     907              except StopIteration:
     908                  pass
     909  
     910          self.check_balanced(
     911              func,
     912              recorders = self.exception_recorders)
     913  
     914      def test_throw(self):
     915  
     916          def gen():
     917              yield 1
     918              yield 2
     919  
     920          def func():
     921              try:
     922                  g = gen()
     923                  next(g)
     924                  g.throw(IndexError)
     925              except IndexError:
     926                  pass
     927  
     928          self.check_balanced(
     929              func,
     930              recorders = self.exception_recorders)
     931  
     932          events = self.get_events(
     933              func,
     934              TEST_TOOL,
     935              self.exception_recorders + (ThrowRecorder,)
     936          )
     937          self.assertEqual(events[0], ("throw", IndexError))
     938  
     939  class ESC[4;38;5;81mLineRecorder:
     940  
     941      event_type = E.LINE
     942  
     943  
     944      def __init__(self, events):
     945          self.events = events
     946  
     947      def __call__(self, code, line):
     948          self.events.append(("line", code.co_name, line - code.co_firstlineno))
     949  
     950  class ESC[4;38;5;81mCallRecorder:
     951  
     952      event_type = E.CALL
     953  
     954      def __init__(self, events):
     955          self.events = events
     956  
     957      def __call__(self, code, offset, func, arg):
     958          self.events.append(("call", func.__name__, arg))
     959  
     960  class ESC[4;38;5;81mCEventRecorder:
     961  
     962      def __init__(self, events):
     963          self.events = events
     964  
     965      def __call__(self, code, offset, func, arg):
     966          self.events.append((self.event_name, func.__name__, arg))
     967  
class CReturnRecorder(CEventRecorder):
    # Listens for C_RETURN events; CEventRecorder.__call__ tags each entry
    # with the event_name below.

    event_type = E.C_RETURN
    event_name = "C return"
     972  
class CRaiseRecorder(CEventRecorder):
    # Listens for C_RAISE events; CEventRecorder.__call__ tags each entry
    # with the event_name below.

    event_type = E.C_RAISE
    event_name = "C raise"
     977  
     978  MANY_RECORDERS = ExceptionRecorder, CallRecorder, LineRecorder, CReturnRecorder, CRaiseRecorder
     979  
     980  class ESC[4;38;5;81mTestManyEvents(ESC[4;38;5;149mCheckEvents):
     981  
     982      def test_simple(self):
     983  
     984          def func1():
     985              line1 = 1
     986              line2 = 2
     987              line3 = 3
     988  
     989          self.check_events(func1, recorders = MANY_RECORDERS, expected = [
     990              ('line', 'get_events', 10),
     991              ('call', 'func1', sys.monitoring.MISSING),
     992              ('line', 'func1', 1),
     993              ('line', 'func1', 2),
     994              ('line', 'func1', 3),
     995              ('line', 'get_events', 11),
     996              ('call', 'set_events', 2)])
     997  
     998      def test_c_call(self):
     999  
    1000          def func2():
    1001              line1 = 1
    1002              [].append(2)
    1003              line3 = 3
    1004  
    1005          self.check_events(func2, recorders = MANY_RECORDERS, expected = [
    1006              ('line', 'get_events', 10),
    1007              ('call', 'func2', sys.monitoring.MISSING),
    1008              ('line', 'func2', 1),
    1009              ('line', 'func2', 2),
    1010              ('call', 'append', [2]),
    1011              ('C return', 'append', [2]),
    1012              ('line', 'func2', 3),
    1013              ('line', 'get_events', 11),
    1014              ('call', 'set_events', 2)])
    1015  
    1016      def test_try_except(self):
    1017  
    1018          def func3():
    1019              try:
    1020                  line = 2
    1021                  raise KeyError
    1022              except:
    1023                  line = 5
    1024              line = 6
    1025  
    1026          self.check_events(func3, recorders = MANY_RECORDERS, expected = [
    1027              ('line', 'get_events', 10),
    1028              ('call', 'func3', sys.monitoring.MISSING),
    1029              ('line', 'func3', 1),
    1030              ('line', 'func3', 2),
    1031              ('line', 'func3', 3),
    1032              ('raise', KeyError),
    1033              ('line', 'func3', 4),
    1034              ('line', 'func3', 5),
    1035              ('line', 'func3', 6),
    1036              ('line', 'get_events', 11),
    1037              ('call', 'set_events', 2)])
    1038  
    1039  class ESC[4;38;5;81mInstructionRecorder:
    1040  
    1041      event_type = E.INSTRUCTION
    1042  
    1043      def __init__(self, events):
    1044          self.events = events
    1045  
    1046      def __call__(self, code, offset):
    1047          # Filter out instructions in check_events to lower noise
    1048          if code.co_name != "get_events":
    1049              self.events.append(("instruction", code.co_name, offset))
    1050  
    1051  
    1052  LINE_AND_INSTRUCTION_RECORDERS = InstructionRecorder, LineRecorder
    1053  
    1054  class ESC[4;38;5;81mTestLineAndInstructionEvents(ESC[4;38;5;149mCheckEvents):
    1055      maxDiff = None
    1056  
    1057      def test_simple(self):
    1058  
    1059          def func1():
    1060              line1 = 1
    1061              line2 = 2
    1062              line3 = 3
    1063  
    1064          self.check_events(func1, recorders = LINE_AND_INSTRUCTION_RECORDERS, expected = [
    1065              ('line', 'get_events', 10),
    1066              ('line', 'func1', 1),
    1067              ('instruction', 'func1', 2),
    1068              ('instruction', 'func1', 4),
    1069              ('line', 'func1', 2),
    1070              ('instruction', 'func1', 6),
    1071              ('instruction', 'func1', 8),
    1072              ('line', 'func1', 3),
    1073              ('instruction', 'func1', 10),
    1074              ('instruction', 'func1', 12),
    1075              ('instruction', 'func1', 14),
    1076              ('line', 'get_events', 11)])
    1077  
    1078      def test_c_call(self):
    1079  
    1080          def func2():
    1081              line1 = 1
    1082              [].append(2)
    1083              line3 = 3
    1084  
    1085          self.check_events(func2, recorders = LINE_AND_INSTRUCTION_RECORDERS, expected = [
    1086              ('line', 'get_events', 10),
    1087              ('line', 'func2', 1),
    1088              ('instruction', 'func2', 2),
    1089              ('instruction', 'func2', 4),
    1090              ('line', 'func2', 2),
    1091              ('instruction', 'func2', 6),
    1092              ('instruction', 'func2', 8),
    1093              ('instruction', 'func2', 28),
    1094              ('instruction', 'func2', 30),
    1095              ('instruction', 'func2', 38),
    1096              ('line', 'func2', 3),
    1097              ('instruction', 'func2', 40),
    1098              ('instruction', 'func2', 42),
    1099              ('instruction', 'func2', 44),
    1100              ('line', 'get_events', 11)])
    1101  
    1102      def test_try_except(self):
    1103  
    1104          def func3():
    1105              try:
    1106                  line = 2
    1107                  raise KeyError
    1108              except:
    1109                  line = 5
    1110              line = 6
    1111  
    1112          self.check_events(func3, recorders = LINE_AND_INSTRUCTION_RECORDERS, expected = [
    1113              ('line', 'get_events', 10),
    1114              ('line', 'func3', 1),
    1115              ('instruction', 'func3', 2),
    1116              ('line', 'func3', 2),
    1117              ('instruction', 'func3', 4),
    1118              ('instruction', 'func3', 6),
    1119              ('line', 'func3', 3),
    1120              ('instruction', 'func3', 8),
    1121              ('instruction', 'func3', 18),
    1122              ('instruction', 'func3', 20),
    1123              ('line', 'func3', 4),
    1124              ('instruction', 'func3', 22),
    1125              ('line', 'func3', 5),
    1126              ('instruction', 'func3', 24),
    1127              ('instruction', 'func3', 26),
    1128              ('instruction', 'func3', 28),
    1129              ('line', 'func3', 6),
    1130              ('instruction', 'func3', 30),
    1131              ('instruction', 'func3', 32),
    1132              ('instruction', 'func3', 34),
    1133              ('line', 'get_events', 11)])
    1134  
    1135      def test_with_restart(self):
    1136          def func1():
    1137              line1 = 1
    1138              line2 = 2
    1139              line3 = 3
    1140  
    1141          self.check_events(func1, recorders = LINE_AND_INSTRUCTION_RECORDERS, expected = [
    1142              ('line', 'get_events', 10),
    1143              ('line', 'func1', 1),
    1144              ('instruction', 'func1', 2),
    1145              ('instruction', 'func1', 4),
    1146              ('line', 'func1', 2),
    1147              ('instruction', 'func1', 6),
    1148              ('instruction', 'func1', 8),
    1149              ('line', 'func1', 3),
    1150              ('instruction', 'func1', 10),
    1151              ('instruction', 'func1', 12),
    1152              ('instruction', 'func1', 14),
    1153              ('line', 'get_events', 11)])
    1154  
    1155          sys.monitoring.restart_events()
    1156  
    1157          self.check_events(func1, recorders = LINE_AND_INSTRUCTION_RECORDERS, expected = [
    1158              ('line', 'get_events', 10),
    1159              ('line', 'func1', 1),
    1160              ('instruction', 'func1', 2),
    1161              ('instruction', 'func1', 4),
    1162              ('line', 'func1', 2),
    1163              ('instruction', 'func1', 6),
    1164              ('instruction', 'func1', 8),
    1165              ('line', 'func1', 3),
    1166              ('instruction', 'func1', 10),
    1167              ('instruction', 'func1', 12),
    1168              ('instruction', 'func1', 14),
    1169              ('line', 'get_events', 11)])
    1170  
    1171  class ESC[4;38;5;81mTestInstallIncrementallly(ESC[4;38;5;149mMonitoringTestBase, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1172  
    1173      def check_events(self, func, must_include, tool=TEST_TOOL, recorders=(ExceptionRecorder,)):
    1174          try:
    1175              self.assertEqual(sys.monitoring._all_events(), {})
    1176              event_list = []
    1177              all_events = 0
    1178              for recorder in recorders:
    1179                  all_events |= recorder.event_type
    1180                  sys.monitoring.set_events(tool, all_events)
    1181              for recorder in recorders:
    1182                  sys.monitoring.register_callback(tool, recorder.event_type, recorder(event_list))
    1183              func()
    1184              sys.monitoring.set_events(tool, 0)
    1185              for recorder in recorders:
    1186                  sys.monitoring.register_callback(tool, recorder.event_type, None)
    1187              for line in must_include:
    1188                  self.assertIn(line, event_list)
    1189          finally:
    1190              sys.monitoring.set_events(tool, 0)
    1191              for recorder in recorders:
    1192                  sys.monitoring.register_callback(tool, recorder.event_type, None)
    1193  
    1194      @staticmethod
    1195      def func1():
    1196          line1 = 1
    1197  
    1198      MUST_INCLUDE_LI = [
    1199              ('instruction', 'func1', 2),
    1200              ('line', 'func1', 1),
    1201              ('instruction', 'func1', 4),
    1202              ('instruction', 'func1', 6)]
    1203  
    1204      def test_line_then_instruction(self):
    1205          recorders = [ LineRecorder, InstructionRecorder ]
    1206          self.check_events(self.func1,
    1207                            recorders = recorders, must_include = self.EXPECTED_LI)
    1208  
    1209      def test_instruction_then_line(self):
    1210          recorders = [ InstructionRecorder, LineRecorderLowNoise ]
    1211          self.check_events(self.func1,
    1212                            recorders = recorders, must_include = self.EXPECTED_LI)
    1213  
    1214      @staticmethod
    1215      def func2():
    1216          len(())
    1217  
    1218      MUST_INCLUDE_CI = [
    1219              ('instruction', 'func2', 2),
    1220              ('call', 'func2', sys.monitoring.MISSING),
    1221              ('call', 'len', ()),
    1222              ('instruction', 'func2', 12),
    1223              ('instruction', 'func2', 14)]
    1224  
    1225  
    1226  
    1227      def test_line_then_instruction(self):
    1228          recorders = [ CallRecorder, InstructionRecorder ]
    1229          self.check_events(self.func2,
    1230                            recorders = recorders, must_include = self.MUST_INCLUDE_CI)
    1231  
    1232      def test_instruction_then_line(self):
    1233          recorders = [ InstructionRecorder, CallRecorder ]
    1234          self.check_events(self.func2,
    1235                            recorders = recorders, must_include = self.MUST_INCLUDE_CI)
    1236  
    1237  LOCAL_RECORDERS = CallRecorder, LineRecorder, CReturnRecorder, CRaiseRecorder
    1238  
    1239  class ESC[4;38;5;81mTestLocalEvents(ESC[4;38;5;149mMonitoringTestBase, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1240  
    1241      def check_events(self, func, expected, tool=TEST_TOOL, recorders=()):
    1242          try:
    1243              self.assertEqual(sys.monitoring._all_events(), {})
    1244              event_list = []
    1245              all_events = 0
    1246              for recorder in recorders:
    1247                  ev = recorder.event_type
    1248                  sys.monitoring.register_callback(tool, ev, recorder(event_list))
    1249                  all_events |= ev
    1250              sys.monitoring.set_local_events(tool, func.__code__, all_events)
    1251              func()
    1252              sys.monitoring.set_local_events(tool, func.__code__, 0)
    1253              for recorder in recorders:
    1254                  sys.monitoring.register_callback(tool, recorder.event_type, None)
    1255              self.assertEqual(event_list, expected)
    1256          finally:
    1257              sys.monitoring.set_local_events(tool, func.__code__, 0)
    1258              for recorder in recorders:
    1259                  sys.monitoring.register_callback(tool, recorder.event_type, None)
    1260  
    1261  
    1262      def test_simple(self):
    1263  
    1264          def func1():
    1265              line1 = 1
    1266              line2 = 2
    1267              line3 = 3
    1268  
    1269          self.check_events(func1, recorders = LOCAL_RECORDERS, expected = [
    1270              ('line', 'func1', 1),
    1271              ('line', 'func1', 2),
    1272              ('line', 'func1', 3)])
    1273  
    1274      def test_c_call(self):
    1275  
    1276          def func2():
    1277              line1 = 1
    1278              [].append(2)
    1279              line3 = 3
    1280  
    1281          self.check_events(func2, recorders = LOCAL_RECORDERS, expected = [
    1282              ('line', 'func2', 1),
    1283              ('line', 'func2', 2),
    1284              ('call', 'append', [2]),
    1285              ('C return', 'append', [2]),
    1286              ('line', 'func2', 3)])
    1287  
    1288      def test_try_except(self):
    1289  
    1290          def func3():
    1291              try:
    1292                  line = 2
    1293                  raise KeyError
    1294              except:
    1295                  line = 5
    1296              line = 6
    1297  
    1298          self.check_events(func3, recorders = LOCAL_RECORDERS, expected = [
    1299              ('line', 'func3', 1),
    1300              ('line', 'func3', 2),
    1301              ('line', 'func3', 3),
    1302              ('line', 'func3', 4),
    1303              ('line', 'func3', 5),
    1304              ('line', 'func3', 6)])
    1305  
    1306      def test_set_non_local_event(self):
    1307          with self.assertRaises(ValueError):
    1308              sys.monitoring.set_local_events(TEST_TOOL, just_call.__code__, E.RAISE)
    1309  
    1310  def line_from_offset(code, offset):
    1311      for start, end, line in code.co_lines():
    1312          if start <= offset < end:
    1313              if line is None:
    1314                  return f"[offset={offset}]"
    1315              return line - code.co_firstlineno
    1316      return -1
    1317  
    1318  class ESC[4;38;5;81mJumpRecorder:
    1319  
    1320      event_type = E.JUMP
    1321      name = "jump"
    1322  
    1323      def __init__(self, events):
    1324          self.events = events
    1325  
    1326      def __call__(self, code, from_, to):
    1327          from_line = line_from_offset(code, from_)
    1328          to_line = line_from_offset(code, to)
    1329          self.events.append((self.name, code.co_name, from_line, to_line))
    1330  
    1331  
class BranchRecorder(JumpRecorder):
    # Listens for BRANCH events; reuses JumpRecorder.__call__, which tags each
    # entry with the `name` below.

    event_type = E.BRANCH
    name = "branch"
    1336  
    1337  class ESC[4;38;5;81mReturnRecorder:
    1338  
    1339      event_type = E.PY_RETURN
    1340  
    1341      def __init__(self, events):
    1342          self.events = events
    1343  
    1344      def __call__(self, code, offset, val):
    1345          self.events.append(("return", val))
    1346  
    1347  
    1348  JUMP_AND_BRANCH_RECORDERS = JumpRecorder, BranchRecorder
    1349  JUMP_BRANCH_AND_LINE_RECORDERS = JumpRecorder, BranchRecorder, LineRecorder
    1350  FLOW_AND_LINE_RECORDERS = JumpRecorder, BranchRecorder, LineRecorder, ExceptionRecorder, ReturnRecorder
    1351  
    1352  class ESC[4;38;5;81mTestBranchAndJumpEvents(ESC[4;38;5;149mCheckEvents):
    1353      maxDiff = None
    1354  
    1355      def test_loop(self):
    1356  
    1357          def func():
    1358              x = 1
    1359              for a in range(2):
    1360                  if a:
    1361                      x = 4
    1362                  else:
    1363                      x = 6
    1364  
    1365          self.check_events(func, recorders = JUMP_AND_BRANCH_RECORDERS, expected = [
    1366              ('branch', 'func', 2, 2),
    1367              ('branch', 'func', 3, 6),
    1368              ('jump', 'func', 6, 2),
    1369              ('branch', 'func', 2, 2),
    1370              ('branch', 'func', 3, 4),
    1371              ('jump', 'func', 4, 2),
    1372              ('branch', 'func', 2, 2)])
    1373  
    1374          self.check_events(func, recorders = JUMP_BRANCH_AND_LINE_RECORDERS, expected = [
    1375              ('line', 'get_events', 10),
    1376              ('line', 'func', 1),
    1377              ('line', 'func', 2),
    1378              ('branch', 'func', 2, 2),
    1379              ('line', 'func', 3),
    1380              ('branch', 'func', 3, 6),
    1381              ('line', 'func', 6),
    1382              ('jump', 'func', 6, 2),
    1383              ('line', 'func', 2),
    1384              ('branch', 'func', 2, 2),
    1385              ('line', 'func', 3),
    1386              ('branch', 'func', 3, 4),
    1387              ('line', 'func', 4),
    1388              ('jump', 'func', 4, 2),
    1389              ('line', 'func', 2),
    1390              ('branch', 'func', 2, 2),
    1391              ('line', 'get_events', 11)])
    1392  
    1393      def test_except_star(self):
    1394  
    1395          class ESC[4;38;5;81mFoo:
    1396              def meth(self):
    1397                  pass
    1398  
    1399          def func():
    1400              try:
    1401                  try:
    1402                      raise KeyError
    1403                  except* Exception as e:
    1404                      f = Foo(); f.meth()
    1405              except KeyError:
    1406                  pass
    1407  
    1408  
    1409          self.check_events(func, recorders = JUMP_BRANCH_AND_LINE_RECORDERS, expected = [
    1410              ('line', 'get_events', 10),
    1411              ('line', 'func', 1),
    1412              ('line', 'func', 2),
    1413              ('line', 'func', 3),
    1414              ('line', 'func', 4),
    1415              ('branch', 'func', 4, 4),
    1416              ('line', 'func', 5),
    1417              ('line', 'meth', 1),
    1418              ('jump', 'func', 5, 5),
    1419              ('jump', 'func', 5, '[offset=114]'),
    1420              ('branch', 'func', '[offset=120]', '[offset=122]'),
    1421              ('line', 'get_events', 11)])
    1422  
    1423          self.check_events(func, recorders = FLOW_AND_LINE_RECORDERS, expected = [
    1424              ('line', 'get_events', 10),
    1425              ('line', 'func', 1),
    1426              ('line', 'func', 2),
    1427              ('line', 'func', 3),
    1428              ('raise', KeyError),
    1429              ('line', 'func', 4),
    1430              ('branch', 'func', 4, 4),
    1431              ('line', 'func', 5),
    1432              ('line', 'meth', 1),
    1433              ('return', None),
    1434              ('jump', 'func', 5, 5),
    1435              ('jump', 'func', 5, '[offset=114]'),
    1436              ('branch', 'func', '[offset=120]', '[offset=122]'),
    1437              ('return', None),
    1438              ('line', 'get_events', 11)])
    1439  
    1440  class ESC[4;38;5;81mTestLoadSuperAttr(ESC[4;38;5;149mCheckEvents):
    1441      RECORDERS = CallRecorder, LineRecorder, CRaiseRecorder, CReturnRecorder
    1442  
    def _exec(self, co):
        # Execute the code object with one fresh dict serving as both globals
        # and locals, and hand that namespace back to the caller.
        namespace = {}
        exec(co, namespace, namespace)
        return namespace
    1447  
    def _exec_super(self, codestr, optimized=False):
        # The compiler looks for statically visible shadowing of the name
        # `super` and declines to emit `LOAD_SUPER_ATTR` when it finds any.
        # Prepending `super = super` therefore keeps the compiler on the
        # un-optimized `LOAD_GLOBAL super; CALL; LOAD_ATTR` form, which lets
        # us check that monitoring events for `LOAD_SUPER_ATTR` are
        # equivalent to those of the un-optimized form.  The optimized
        # variant gets a different one-line prefix instead.
        if optimized:
            assignment = "x = 1"
        else:
            assignment = "super = super"
        codestr = f"{assignment}\n{textwrap.dedent(codestr)}"
        co = compile(codestr, "<string>", "exec")
        # LOAD_SUPER_ATTR must be present exactly when the optimized form was
        # requested.
        found_super_attr = self._has_load_super_attr(co)
        self.assertEqual(found_super_attr, optimized)
        return self._exec(co)
    1461  
    def _has_load_super_attr(self, co):
        # True when `co`, or any code object nested in its constants, contains
        # a LOAD_SUPER_ATTR instruction.
        for instr in dis.get_instructions(co):
            if instr.opname == "LOAD_SUPER_ATTR":
                return True
        return any(
            isinstance(const, types.CodeType) and self._has_load_super_attr(const)
            for const in co.co_consts
        )
    1470  
    def _super_method_call(self, optimized=False):
        # Build the function under test plus the events expected from it.
        # The layout of the code string matters: the 'line' entries below are
        # relative line numbers inside `f` and `method`.
        codestr = """
            class A:
                def method(self, x):
                    return x

            class B(A):
                def method(self, x):
                    return super(
                    ).method(
                        x
                    )

            b = B()
            def f():
                return b.method(1)
        """
        namespace = self._exec_super(codestr, optimized)
        receiver = namespace["b"]
        expected = [
            ('line', 'get_events', 10),
            ('call', 'f', sys.monitoring.MISSING),
            ('line', 'f', 1),
            ('call', 'method', receiver),
            ('line', 'method', 1),
            ('call', 'super', sys.monitoring.MISSING),
            ('C return', 'super', sys.monitoring.MISSING),
            ('line', 'method', 2),
            ('line', 'method', 3),
            ('line', 'method', 2),
            ('call', 'method', 1),
            ('line', 'method', 1),
            ('line', 'method', 1),
            ('line', 'get_events', 11),
            ('call', 'set_events', 2),
        ]
        return namespace["f"], expected
    1507  
    def test_method_call(self):
        # Build the non-optimized and the optimized scenario first (in that
        # order), then check the recorded events of each against its
        # expected list.
        scenarios = [
            self._super_method_call(optimized=flag) for flag in (False, True)
        ]

        for func, expected in scenarios:
            self.check_events(func, recorders=self.RECORDERS, expected=expected)
    1514  
    def _super_method_call_error(self, optimized=False):
        """Set up the scenario where the ``super(x, self)`` call raises.

        The snippet's ``f`` expects ``b.method(1)`` to raise ``TypeError``
        and swallows it.  Returns a ``(func, expected)`` pair: that ``f``
        and the event list that ``test_method_call_error`` passes to
        ``check_events`` as ``expected``.
        """
        source = """
            class A:
                def method(self, x):
                    return x

            class B(A):
                def method(self, x):
                    return super(
                        x,
                        self,
                    ).method(
                        x
                    )

            b = B()
            def f():
                try:
                    return b.method(1)
                except TypeError:
                    pass
                else:
                    assert False, "should have raised TypeError"
        """
        namespace = self._exec_super(source, optimized)
        missing = sys.monitoring.MISSING
        receiver = namespace["b"]
        events = [
            ("line", "get_events", 10),
            ("call", "f", missing),
            ("line", "f", 1),
            ("line", "f", 2),
            ("call", "method", receiver),
            ("line", "method", 1),
            ("line", "method", 2),
            ("line", "method", 3),
            ("line", "method", 1),
            ("call", "super", 1),
            ("C raise", "super", 1),
            ("line", "f", 3),
            ("line", "f", 4),
            ("line", "get_events", 11),
            ("call", "set_events", 2),
        ]
        return namespace["f"], events
    1558  
    def test_method_call_error(self):
        # Build the non-optimized and the optimized error scenario first (in
        # that order), then check each one's recorded events.
        scenarios = [
            self._super_method_call_error(optimized=flag) for flag in (False, True)
        ]

        for func, expected in scenarios:
            self.check_events(func, recorders=self.RECORDERS, expected=expected)
    1565  
    def _super_attr(self, optimized=False):
        """Set up the ``super().x`` attribute-load scenario.

        Returns a ``(func, expected)`` pair: the ``f`` defined by the snippet
        (run through ``self._exec_super`` with the given *optimized* flag)
        and the event list that ``test_attr`` passes to ``check_events`` as
        ``expected``.
        """
        source = """
            class A:
                x = 1

            class B(A):
                def method(self):
                    return super(
                    ).x

            b = B()
            def f():
                return b.method()
        """
        namespace = self._exec_super(source, optimized)
        missing = sys.monitoring.MISSING
        receiver = namespace["b"]
        events = [
            ("line", "get_events", 10),
            ("call", "f", missing),
            ("line", "f", 1),
            ("call", "method", receiver),
            ("line", "method", 1),
            ("call", "super", missing),
            ("C return", "super", missing),
            ("line", "method", 2),
            ("line", "method", 1),
            ("line", "get_events", 11),
            ("call", "set_events", 2),
        ]
        return namespace["f"], events
    1595  
    def test_attr(self):
        # Build the non-optimized and the optimized attribute scenario first
        # (in that order), then check each one's recorded events.
        scenarios = [self._super_attr(optimized=flag) for flag in (False, True)]

        for func, expected in scenarios:
            self.check_events(func, recorders=self.RECORDERS, expected=expected)
    1602  
    def test_vs_other_type_call(self):
        # Run the same snippet once with ``super`` and once with ``int`` as
        # the called type, both with and without calling the looked-up
        # ``__repr__``, and check each run against its expected event list.
        template = textwrap.dedent("""
            class C:
                def method(self):
                    return {cls}().__repr__{call}
            c = C()
            def f():
                return c.method()
        """)

        def get_expected(name, call_method, ns):
            missing = sys.monitoring.MISSING
            events = [
                ("line", "get_events", 10),
                ("call", "f", missing),
                ("line", "f", 1),
                ("call", "method", ns["c"]),
                ("line", "method", 1),
                ("call", name, missing),
                ("C return", name, missing),
            ]
            if call_method:
                # The __repr__ events carry 0 for ``int`` and MISSING otherwise.
                repr_arg = 0 if name == "int" else missing
                events.append(("call", "__repr__", repr_arg))
                events.append(("C return", "__repr__", repr_arg))
            events.append(("line", "get_events", 11))
            events.append(("call", "set_events", 2))
            return events

        for call_method in (True, False):
            with self.subTest(call_method=call_method):
                suffix = "()" if call_method else ""
                super_source = template.format(cls="super", call=suffix)
                int_source = template.format(cls="int", call=suffix)
                super_code = compile(super_source, "<string>", "exec")
                # The super variant must really compile to LOAD_SUPER_ATTR.
                self.assertTrue(self._has_load_super_attr(super_code))
                super_ns = self._exec(super_code)
                # The int variant is handed to _exec as source text.
                int_ns = self._exec(int_source)

                for name, ns in (("super", super_ns), ("int", int_ns)):
                    self.check_events(
                        ns["f"],
                        recorders=self.RECORDERS,
                        expected=get_expected(name, call_method, ns),
                    )
    1653  
    1654  
    1655  class ESC[4;38;5;81mTestSetGetEvents(ESC[4;38;5;149mMonitoringTestBase, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1656  
    def test_global(self):
        # Global event sets are kept per tool: enable PY_START for both test
        # tools and read it back, then clear both and read back 0.
        tools = (TEST_TOOL, TEST_TOOL2)
        try:
            for tool in tools:
                sys.monitoring.set_events(tool, E.PY_START)
                self.assertEqual(sys.monitoring.get_events(tool), E.PY_START)
            for tool in tools:
                sys.monitoring.set_events(tool, 0)
                self.assertEqual(sys.monitoring.get_events(tool), 0)
        finally:
            # A failed assertion above must not leave PY_START monitoring
            # enabled for whatever runs next; resetting to 0 again after a
            # successful run is harmless.
            for tool in tools:
                sys.monitoring.set_events(tool, 0)
    1666  
    def test_local(self):
        # Local (per code object) event sets are kept per tool: enable
        # PY_START on f1's code for both test tools and read it back, then
        # clear both and read back 0.
        code = f1.__code__
        tools = (TEST_TOOL, TEST_TOOL2)
        try:
            for tool in tools:
                sys.monitoring.set_local_events(tool, code, E.PY_START)
                self.assertEqual(
                    sys.monitoring.get_local_events(tool, code), E.PY_START
                )
            for tool in tools:
                sys.monitoring.set_local_events(tool, code, 0)
                self.assertEqual(sys.monitoring.get_local_events(tool, code), 0)
        finally:
            # f1's code object is shared module state: a failed assertion
            # above must not leave local events attached to it.
            for tool in tools:
                sys.monitoring.set_local_events(tool, code, 0)
    1677  
    1678  class ESC[4;38;5;81mTestUninitialized(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase, ESC[4;38;5;149mMonitoringTestBase):
    1679  
    # Empty on purpose: the test in this class only uses this function's
    # ``__code__`` object as the argument to ``get_local_events``.
    @staticmethod
    def f():
        pass
    1683  
    def test_get_local_events_uninitialized(self):
        # Querying local events for f's code object must report no events.
        code = self.f.__code__
        events = sys.monitoring.get_local_events(TEST_TOOL, code)
        self.assertEqual(events, 0)
    1686  
    1687  class ESC[4;38;5;81mTestRegressions(ESC[4;38;5;149mMonitoringTestBase, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1688  
    def test_105162(self):
        # An exception thrown into a generator that is delegating through
        # ``yield from`` must be handled by the inner generator, both with
        # monitoring off and with PY_RESUME monitoring on.
        # NOTE(review): the number in the name presumably refers to issue
        # 105162 -- confirm.
        caught = None

        def inner():
            nonlocal caught
            try:
                yield
            except Exception:
                # Records that the inner generator saw the exception.
                caught = "inner"
                yield

        def outer():
            nonlocal caught
            try:
                yield from inner()
            except Exception:
                # Only reached if the exception was not handled by inner().
                caught = "outer"
                yield

        def run():
            gen = outer()
            # Advance to the first ``yield`` in inner(), then throw into it.
            gen.send(None)
            gen.throw(Exception)
        # Baseline run without any events enabled.
        run()
        self.assertEqual(caught, "inner")
        caught = None
        try:
            # Same scenario with PY_RESUME events enabled for TEST_TOOL.
            sys.monitoring.set_events(TEST_TOOL, E.PY_RESUME)
            run()
            self.assertEqual(caught, "inner")
        finally:
            # Always switch the events back off, even if the assertion fails.
            sys.monitoring.set_events(TEST_TOOL, 0)
    1721  
    def test_108390(self):
        # Enables PY_RESUME events from inside ``Foo.__init__`` on the last of
        # 100 constructions.  There are no assertions: the test passes when
        # this completes without raising.
        # NOTE(review): per the helper's name, the first 99 calls presumably
        # let the interpreter optimize the ``Foo(...)`` call before the event
        # set changes -- confirm against the referenced issue.

        class Foo:
            def __init__(self, set_event):
                if set_event:
                    sys.monitoring.set_events(TEST_TOOL, E.PY_RESUME)

        def make_foo_optimized_then_set_event():
            for i in range(100):
                # Only the final iteration (i == 99) asks for the event.
                Foo(i == 99)

        try:
            make_foo_optimized_then_set_event()
        finally:
            # Always switch the events back off.
            sys.monitoring.set_events(TEST_TOOL, 0)
    1737  
    def test_gh108976(self):
        # Uses tool id 0 directly and releases it again via addCleanup.
        # There are no assertions: the test passes when the sequence below
        # completes without raising.
        sys.monitoring.use_tool_id(0, "test")
        self.addCleanup(sys.monitoring.free_tool_id, 0)
        sys.monitoring.set_events(0, 0)
        # The LINE callback clears the tool's events from inside the callback.
        sys.monitoring.register_callback(0, E.LINE, lambda *args: sys.monitoring.set_events(0, 0))
        sys.monitoring.register_callback(0, E.INSTRUCTION, lambda *args: 0)
        # Turn both LINE and INSTRUCTION events on, then off again.
        sys.monitoring.set_events(0, E.LINE | E.INSTRUCTION)
        sys.monitoring.set_events(0, 0)