python (3.11.7)
       1  import gc
       2  import pprint
       3  import sys
       4  import unittest
       5  from test import support
       6  
       7  
       8  class ESC[4;38;5;81mTestGetProfile(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
       9      def setUp(self):
      10          sys.setprofile(None)
      11  
      12      def tearDown(self):
      13          sys.setprofile(None)
      14  
      15      def test_empty(self):
      16          self.assertIsNone(sys.getprofile())
      17  
      18      def test_setget(self):
      19          def fn(*args):
      20              pass
      21  
      22          sys.setprofile(fn)
      23          self.assertIs(sys.getprofile(), fn)
      24  
      25  class ESC[4;38;5;81mHookWatcher:
      26      def __init__(self):
      27          self.frames = []
      28          self.events = []
      29  
      30      def callback(self, frame, event, arg):
      31          if (event == "call"
      32              or event == "return"
      33              or event == "exception"):
      34              self.add_event(event, frame)
      35  
      36      def add_event(self, event, frame=None):
      37          """Add an event to the log."""
      38          if frame is None:
      39              frame = sys._getframe(1)
      40  
      41          try:
      42              frameno = self.frames.index(frame)
      43          except ValueError:
      44              frameno = len(self.frames)
      45              self.frames.append(frame)
      46  
      47          self.events.append((frameno, event, ident(frame)))
      48  
      49      def get_events(self):
      50          """Remove calls to add_event()."""
      51          disallowed = [ident(self.add_event.__func__), ident(ident)]
      52          self.frames = None
      53  
      54          return [item for item in self.events if item[2] not in disallowed]
      55  
      56  
      57  class ESC[4;38;5;81mProfileSimulator(ESC[4;38;5;149mHookWatcher):
      58      def __init__(self, testcase):
      59          self.testcase = testcase
      60          self.stack = []
      61          HookWatcher.__init__(self)
      62  
      63      def callback(self, frame, event, arg):
      64          # Callback registered with sys.setprofile()/sys.settrace()
      65          self.dispatch[event](self, frame)
      66  
      67      def trace_call(self, frame):
      68          self.add_event('call', frame)
      69          self.stack.append(frame)
      70  
      71      def trace_return(self, frame):
      72          self.add_event('return', frame)
      73          self.stack.pop()
      74  
      75      def trace_exception(self, frame):
      76          self.testcase.fail(
      77              "the profiler should never receive exception events")
      78  
      79      def trace_pass(self, frame):
      80          pass
      81  
      82      dispatch = {
      83          'call': trace_call,
      84          'exception': trace_exception,
      85          'return': trace_return,
      86          'c_call': trace_pass,
      87          'c_return': trace_pass,
      88          'c_exception': trace_pass,
      89          }
      90  
      91  
      92  class ESC[4;38;5;81mTestCaseBase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      93      def check_events(self, callable, expected):
      94          events = capture_events(callable, self.new_watcher())
      95          if events != expected:
      96              self.fail("Expected events:\n%s\nReceived events:\n%s"
      97                        % (pprint.pformat(expected), pprint.pformat(events)))
      98  
      99  
class ProfileHookTestCase(TestCaseBase):
    # Event-sequence tests run with a plain HookWatcher.  Each expected entry
    # is (frame number, event name, ident of the frame's code); in every test
    # below the function handed to check_events() shows up as frame 1.
    def new_watcher(self):
        return HookWatcher()

    # A function that does nothing: one call and one return.
    def test_simple(self):
        def f(p):
            pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    # f leaves through an uncaught ZeroDivisionError; the expected log still
    # holds only call and return, with no 'exception' entry.
    def test_exception(self):
        def f(p):
            1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    # f raises and handles the exception itself.
    def test_caught_exception(self):
        def f(p):
            try: 1/0
            except: pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    # NOTE(review): body and expectations are identical to
    # test_caught_exception above; nothing here is actually nested.
    def test_caught_nested_exception(self):
        def f(p):
            try: 1/0
            except: pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    # Same function body as test_exception; the commented-out entry records
    # an 'exception' event for protect() that is not part of the expectation.
    def test_nested_exception(self):
        def f(p):
            1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              # This isn't what I expected:
                              # (0, 'exception', protect_ident),
                              # I expected this again:
                              (1, 'return', f_ident),
                              ])

    # g calls the failing f once in the try body and once more inside the
    # except clause; each call of f gets its own frame number (2, then 3).
    def test_exception_in_except_clause(self):
        def f(p):
            1/0
        def g(p):
            try:
                f(p)
            except:
                try: f(p)
                except: pass
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (3, 'call', f_ident),
                              (3, 'return', f_ident),
                              (1, 'return', g_ident),
                              ])

    # The exception from f passes through g's finally clause.  add_event() is
    # called there without a frame, so the custom "falling through" entry is
    # logged against the caller's frame, i.e. g (frame 1).
    def test_exception_propagation(self):
        def f(p):
            1/0
        def g(p):
            try: f(p)
            finally: p.add_event("falling through")
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (1, 'falling through', g_ident),
                              (1, 'return', g_ident),
                              ])

    # A second exception is raised from inside the except clause.
    def test_raise_twice(self):
        def f(p):
            try: 1/0
            except: 1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    # The caught exception is re-raised with a bare raise.
    def test_raise_reraise(self):
        def f(p):
            try: 1/0
            except: raise
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    # An explicit raise statement instead of a failing expression.
    def test_raise(self):
        def f(p):
            raise Exception()
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    # The exception is raised five calls deep (j -> i -> h -> g -> f); the
    # expected returns appear innermost first.
    def test_distant_exception(self):
        def f():
            1/0
        def g():
            f()
        def h():
            g()
        def i():
            h()
        def j(p):
            i()
        f_ident = ident(f)
        g_ident = ident(g)
        h_ident = ident(h)
        i_ident = ident(i)
        j_ident = ident(j)
        self.check_events(j, [(1, 'call', j_ident),
                              (2, 'call', i_ident),
                              (3, 'call', h_ident),
                              (4, 'call', g_ident),
                              (5, 'call', f_ident),
                              (5, 'return', f_ident),
                              (4, 'return', g_ident),
                              (3, 'return', h_ident),
                              (2, 'return', i_ident),
                              (1, 'return', j_ident),
                              ])

    # Every resumption of the generator f is expected as a call/return pair
    # on the same frame number (2): two yields plus the final exhaustion.
    def test_generator(self):
        def f():
            for i in range(2):
                yield i
        def g(p):
            for i in f():
                pass
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              # call the iterator twice to generate values
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              # once more; returns end-of-iteration with
                              # actually raising an exception
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (1, 'return', g_ident),
                              ])

    # NOTE(review): code and expectations are identical to test_generator;
    # f contains no explicit raise despite the "hit the raise" comment below.
    def test_stop_iteration(self):
        def f():
            for i in range(2):
                yield i
        def g(p):
            for i in f():
                pass
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              # call the iterator twice to generate values
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              # once more to hit the raise:
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (1, 'return', g_ident),
                              ])
     279  
     280  
class ProfileSimulatorTestCase(TestCaseBase):
    # Event-sequence tests run with a ProfileSimulator.  Its dispatch table
    # also handles c_call/c_return/c_exception (ignored) and fails the test
    # if an 'exception' event is ever delivered.
    def new_watcher(self):
        return ProfileSimulator(self)

    # A function that does nothing: one call and one return.
    def test_simple(self):
        def f(p):
            pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    # f leaves through an uncaught ZeroDivisionError.
    def test_basic_exception(self):
        def f(p):
            1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    # f raises and handles the exception itself.
    def test_caught_exception(self):
        def f(p):
            try: 1/0
            except: pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    # The exception is raised five calls deep (j -> i -> h -> g -> f); the
    # expected returns appear innermost first.
    def test_distant_exception(self):
        def f():
            1/0
        def g():
            f()
        def h():
            g()
        def i():
            h()
        def j(p):
            i()
        f_ident = ident(f)
        g_ident = ident(g)
        h_ident = ident(h)
        i_ident = ident(i)
        j_ident = ident(j)
        self.check_events(j, [(1, 'call', j_ident),
                              (2, 'call', i_ident),
                              (3, 'call', h_ident),
                              (4, 'call', g_ident),
                              (5, 'call', f_ident),
                              (5, 'return', f_ident),
                              (4, 'return', g_ident),
                              (3, 'return', h_ident),
                              (2, 'return', i_ident),
                              (1, 'return', j_ident),
                              ])

    # The remaining tests call dict.get through the type, with and without
    # **kwargs.  Only f's own call and return are expected in the log; do not
    # rewrite the call expressions, they are the regression cases themselves.

    # bpo-34125: profiling method_descriptor with **kwargs
    def test_unbound_method(self):
        kwargs = {}
        def f(p):
            dict.get({}, 42, **kwargs)
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])

    # Test an invalid call (bpo-34126)
    # No arguments at all are passed to dict.get.
    def test_unbound_method_no_args(self):
        def f(p):
            dict.get()
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])

    # Test an invalid call (bpo-34126)
    # The first argument (print) is not a dict.
    def test_unbound_method_invalid_args(self):
        def f(p):
            dict.get(print, 42)
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])

    # Test an invalid call (bpo-34125)
    # Only an empty **kwargs is passed, so there are no positional arguments.
    def test_unbound_method_no_keyword_args(self):
        kwargs = {}
        def f(p):
            dict.get(**kwargs)
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])

    # Test an invalid call (bpo-34125)
    # Non-dict first argument combined with an empty **kwargs.
    def test_unbound_method_invalid_keyword_args(self):
        kwargs = {}
        def f(p):
            dict.get(print, 42, **kwargs)
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])
     380  
     381  
     382  def ident(function):
     383      if hasattr(function, "f_code"):
     384          code = function.f_code
     385      else:
     386          code = function.__code__
     387      return code.co_firstlineno, code.co_name
     388  
     389  
     390  def protect(f, p):
     391      try: f(p)
     392      except: pass
     393  
# (first line number, name) of protect(), as computed by ident().
protect_ident = ident(protect)
     395  
     396  
     397  def capture_events(callable, p=None):
     398      if p is None:
     399          p = HookWatcher()
     400      # Disable the garbage collector. This prevents __del__s from showing up in
     401      # traces.
     402      old_gc = gc.isenabled()
     403      gc.disable()
     404      try:
     405          sys.setprofile(p.callback)
     406          protect(callable, p)
     407          sys.setprofile(None)
     408      finally:
     409          if old_gc:
     410              gc.enable()
     411      return p.get_events()[1:-1]
     412  
     413  
     414  def show_events(callable):
     415      import pprint
     416      pprint.pprint(capture_events(callable))
     417  
     418  
class TestEdgeCases(unittest.TestCase):
    # Edge cases of sys.setprofile() itself.  Both tests depend on exactly
    # when objects lose their last reference, so keep the statements as is.

    def setUp(self):
        # Restore whatever profile function was active before the test, then
        # start with none installed.
        self.addCleanup(sys.setprofile, sys.getprofile())
        sys.setprofile(None)

    # The installed A() instance has no other reference, and its __del__
    # calls sys.setprofile(bar).  Replacing it with foo is expected to report
    # that nested call as an unraisable RuntimeError attributed to A.__del__,
    # and foo must end up as the installed profile function.
    def test_reentrancy(self):
        def foo(*args):
            ...

        def bar(*args):
            ...

        class A:
            def __call__(self, *args):
                pass

            def __del__(self):
                sys.setprofile(bar)

        sys.setprofile(A())
        with support.catch_unraisable_exception() as cm:
            sys.setprofile(foo)
            self.assertEqual(cm.unraisable.object, A.__del__)
            self.assertIsInstance(cm.unraisable.exc_value, RuntimeError)

        self.assertEqual(sys.getprofile(), foo)


    # Re-installs the current profile function after the local name for it
    # has been deleted.  There are no assertions; the test only requires that
    # the calls complete.
    def test_same_object(self):
        def foo(*args):
            ...

        sys.setprofile(foo)
        del foo
        sys.setprofile(sys.getprofile())
     455  
     456  
# Run this module's tests when it is executed as a script.
if __name__ == "__main__":
    unittest.main()