1  import unittest
       2  
       3  from contextlib import contextmanager, ExitStack
       4  from test.support import catch_unraisable_exception, import_helper
       5  
       6  
       7  # Skip this test if the _testcapi module isn't available.
       8  _testcapi = import_helper.import_module('_testcapi')
       9  
      10  
      11  class ESC[4;38;5;81mTestDictWatchers(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      12      # types of watchers testcapimodule can add:
      13      EVENTS = 0   # appends dict events as strings to global event list
      14      ERROR = 1    # unconditionally sets and signals a RuntimeException
      15      SECOND = 2   # always appends "second" to global event list
      16  
      17      def add_watcher(self, kind=EVENTS):
      18          return _testcapi.add_dict_watcher(kind)
      19  
      20      def clear_watcher(self, watcher_id):
      21          _testcapi.clear_dict_watcher(watcher_id)
      22  
      23      @contextmanager
      24      def watcher(self, kind=EVENTS):
      25          wid = self.add_watcher(kind)
      26          try:
      27              yield wid
      28          finally:
      29              self.clear_watcher(wid)
      30  
      31      def assert_events(self, expected):
      32          actual = _testcapi.get_dict_watcher_events()
      33          self.assertEqual(actual, expected)
      34  
      35      def watch(self, wid, d):
      36          _testcapi.watch_dict(wid, d)
      37  
      38      def unwatch(self, wid, d):
      39          _testcapi.unwatch_dict(wid, d)
      40  
      41      def test_set_new_item(self):
      42          d = {}
      43          with self.watcher() as wid:
      44              self.watch(wid, d)
      45              d["foo"] = "bar"
      46              self.assert_events(["new:foo:bar"])
      47  
      48      def test_set_existing_item(self):
      49          d = {"foo": "bar"}
      50          with self.watcher() as wid:
      51              self.watch(wid, d)
      52              d["foo"] = "baz"
      53              self.assert_events(["mod:foo:baz"])
      54  
      55      def test_clone(self):
      56          d = {}
      57          d2 = {"foo": "bar"}
      58          with self.watcher() as wid:
      59              self.watch(wid, d)
      60              d.update(d2)
      61              self.assert_events(["clone"])
      62  
      63      def test_no_event_if_not_watched(self):
      64          d = {}
      65          with self.watcher() as wid:
      66              d["foo"] = "bar"
      67              self.assert_events([])
      68  
      69      def test_del(self):
      70          d = {"foo": "bar"}
      71          with self.watcher() as wid:
      72              self.watch(wid, d)
      73              del d["foo"]
      74              self.assert_events(["del:foo"])
      75  
      76      def test_pop(self):
      77          d = {"foo": "bar"}
      78          with self.watcher() as wid:
      79              self.watch(wid, d)
      80              d.pop("foo")
      81              self.assert_events(["del:foo"])
      82  
      83      def test_clear(self):
      84          d = {"foo": "bar"}
      85          with self.watcher() as wid:
      86              self.watch(wid, d)
      87              d.clear()
      88              self.assert_events(["clear"])
      89  
      90      def test_dealloc(self):
      91          d = {"foo": "bar"}
      92          with self.watcher() as wid:
      93              self.watch(wid, d)
      94              del d
      95              self.assert_events(["dealloc"])
      96  
      97      def test_unwatch(self):
      98          d = {}
      99          with self.watcher() as wid:
     100              self.watch(wid, d)
     101              d["foo"] = "bar"
     102              self.unwatch(wid, d)
     103              d["hmm"] = "baz"
     104              self.assert_events(["new:foo:bar"])
     105  
     106      def test_error(self):
     107          d = {}
     108          with self.watcher(kind=self.ERROR) as wid:
     109              self.watch(wid, d)
     110              with catch_unraisable_exception() as cm:
     111                  d["foo"] = "bar"
     112                  self.assertIn(
     113                      "PyDict_EVENT_ADDED watcher callback for <dict at",
     114                      cm.unraisable.object
     115                  )
     116                  self.assertEqual(str(cm.unraisable.exc_value), "boom!")
     117              self.assert_events([])
     118  
     119      def test_dealloc_error(self):
     120          d = {}
     121          with self.watcher(kind=self.ERROR) as wid:
     122              self.watch(wid, d)
     123              with catch_unraisable_exception() as cm:
     124                  del d
     125                  self.assertEqual(str(cm.unraisable.exc_value), "boom!")
     126  
     127      def test_two_watchers(self):
     128          d1 = {}
     129          d2 = {}
     130          with self.watcher() as wid1:
     131              with self.watcher(kind=self.SECOND) as wid2:
     132                  self.watch(wid1, d1)
     133                  self.watch(wid2, d2)
     134                  d1["foo"] = "bar"
     135                  d2["hmm"] = "baz"
     136                  self.assert_events(["new:foo:bar", "second"])
     137  
     138      def test_watch_non_dict(self):
     139          with self.watcher() as wid:
     140              with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
     141                  self.watch(wid, 1)
     142  
     143      def test_watch_out_of_range_watcher_id(self):
     144          d = {}
     145          with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
     146              self.watch(-1, d)
     147          with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
     148              self.watch(8, d)  # DICT_MAX_WATCHERS = 8
     149  
     150      def test_watch_unassigned_watcher_id(self):
     151          d = {}
     152          with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
     153              self.watch(1, d)
     154  
     155      def test_unwatch_non_dict(self):
     156          with self.watcher() as wid:
     157              with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
     158                  self.unwatch(wid, 1)
     159  
     160      def test_unwatch_out_of_range_watcher_id(self):
     161          d = {}
     162          with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
     163              self.unwatch(-1, d)
     164          with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
     165              self.unwatch(8, d)  # DICT_MAX_WATCHERS = 8
     166  
     167      def test_unwatch_unassigned_watcher_id(self):
     168          d = {}
     169          with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
     170              self.unwatch(1, d)
     171  
     172      def test_clear_out_of_range_watcher_id(self):
     173          with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
     174              self.clear_watcher(-1)
     175          with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
     176              self.clear_watcher(8)  # DICT_MAX_WATCHERS = 8
     177  
     178      def test_clear_unassigned_watcher_id(self):
     179          with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
     180              self.clear_watcher(1)
     181  
     182  
     183  class ESC[4;38;5;81mTestTypeWatchers(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     184      # types of watchers testcapimodule can add:
     185      TYPES = 0    # appends modified types to global event list
     186      ERROR = 1    # unconditionally sets and signals a RuntimeException
     187      WRAP = 2     # appends modified type wrapped in list to global event list
     188  
     189      # duplicating the C constant
     190      TYPE_MAX_WATCHERS = 8
     191  
     192      def add_watcher(self, kind=TYPES):
     193          return _testcapi.add_type_watcher(kind)
     194  
     195      def clear_watcher(self, watcher_id):
     196          _testcapi.clear_type_watcher(watcher_id)
     197  
     198      @contextmanager
     199      def watcher(self, kind=TYPES):
     200          wid = self.add_watcher(kind)
     201          try:
     202              yield wid
     203          finally:
     204              self.clear_watcher(wid)
     205  
     206      def assert_events(self, expected):
     207          actual = _testcapi.get_type_modified_events()
     208          self.assertEqual(actual, expected)
     209  
     210      def watch(self, wid, t):
     211          _testcapi.watch_type(wid, t)
     212  
     213      def unwatch(self, wid, t):
     214          _testcapi.unwatch_type(wid, t)
     215  
     216      def test_watch_type(self):
     217          class ESC[4;38;5;81mC: pass
     218          with self.watcher() as wid:
     219              self.watch(wid, C)
     220              C.foo = "bar"
     221              self.assert_events([C])
     222  
     223      def test_event_aggregation(self):
     224          class ESC[4;38;5;81mC: pass
     225          with self.watcher() as wid:
     226              self.watch(wid, C)
     227              C.foo = "bar"
     228              C.bar = "baz"
     229              # only one event registered for both modifications
     230              self.assert_events([C])
     231  
     232      def test_lookup_resets_aggregation(self):
     233          class ESC[4;38;5;81mC: pass
     234          with self.watcher() as wid:
     235              self.watch(wid, C)
     236              C.foo = "bar"
     237              # lookup resets type version tag
     238              self.assertEqual(C.foo, "bar")
     239              C.bar = "baz"
     240              # both events registered
     241              self.assert_events([C, C])
     242  
     243      def test_unwatch_type(self):
     244          class ESC[4;38;5;81mC: pass
     245          with self.watcher() as wid:
     246              self.watch(wid, C)
     247              C.foo = "bar"
     248              self.assertEqual(C.foo, "bar")
     249              self.assert_events([C])
     250              self.unwatch(wid, C)
     251              C.bar = "baz"
     252              self.assert_events([C])
     253  
     254      def test_clear_watcher(self):
     255          class ESC[4;38;5;81mC: pass
     256          # outer watcher is unused, it's just to keep events list alive
     257          with self.watcher() as _:
     258              with self.watcher() as wid:
     259                  self.watch(wid, C)
     260                  C.foo = "bar"
     261                  self.assertEqual(C.foo, "bar")
     262                  self.assert_events([C])
     263              C.bar = "baz"
     264              # Watcher on C has been cleared, no new event
     265              self.assert_events([C])
     266  
     267      def test_watch_type_subclass(self):
     268          class ESC[4;38;5;81mC: pass
     269          class ESC[4;38;5;81mD(ESC[4;38;5;149mC): pass
     270          with self.watcher() as wid:
     271              self.watch(wid, D)
     272              C.foo = "bar"
     273              self.assert_events([D])
     274  
     275      def test_error(self):
     276          class ESC[4;38;5;81mC: pass
     277          with self.watcher(kind=self.ERROR) as wid:
     278              self.watch(wid, C)
     279              with catch_unraisable_exception() as cm:
     280                  C.foo = "bar"
     281                  self.assertIs(cm.unraisable.object, C)
     282                  self.assertEqual(str(cm.unraisable.exc_value), "boom!")
     283              self.assert_events([])
     284  
     285      def test_two_watchers(self):
     286          class ESC[4;38;5;81mC1: pass
     287          class ESC[4;38;5;81mC2: pass
     288          with self.watcher() as wid1:
     289              with self.watcher(kind=self.WRAP) as wid2:
     290                  self.assertNotEqual(wid1, wid2)
     291                  self.watch(wid1, C1)
     292                  self.watch(wid2, C2)
     293                  C1.foo = "bar"
     294                  C2.hmm = "baz"
     295                  self.assert_events([C1, [C2]])
     296  
     297      def test_all_watchers(self):
     298          class ESC[4;38;5;81mC: pass
     299          with ExitStack() as stack:
     300              last_wid = -1
     301              # don't make assumptions about how many watchers are already
     302              # registered, just go until we reach the max ID
     303              while last_wid < self.TYPE_MAX_WATCHERS - 1:
     304                  last_wid = stack.enter_context(self.watcher())
     305              self.watch(last_wid, C)
     306              C.foo = "bar"
     307              self.assert_events([C])
     308  
     309      def test_watch_non_type(self):
     310          with self.watcher() as wid:
     311              with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
     312                  self.watch(wid, 1)
     313  
     314      def test_watch_out_of_range_watcher_id(self):
     315          class ESC[4;38;5;81mC: pass
     316          with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
     317              self.watch(-1, C)
     318          with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
     319              self.watch(self.TYPE_MAX_WATCHERS, C)
     320  
     321      def test_watch_unassigned_watcher_id(self):
     322          class ESC[4;38;5;81mC: pass
     323          with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
     324              self.watch(1, C)
     325  
     326      def test_unwatch_non_type(self):
     327          with self.watcher() as wid:
     328              with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
     329                  self.unwatch(wid, 1)
     330  
     331      def test_unwatch_out_of_range_watcher_id(self):
     332          class ESC[4;38;5;81mC: pass
     333          with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
     334              self.unwatch(-1, C)
     335          with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
     336              self.unwatch(self.TYPE_MAX_WATCHERS, C)
     337  
     338      def test_unwatch_unassigned_watcher_id(self):
     339          class ESC[4;38;5;81mC: pass
     340          with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
     341              self.unwatch(1, C)
     342  
     343      def test_clear_out_of_range_watcher_id(self):
     344          with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
     345              self.clear_watcher(-1)
     346          with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
     347              self.clear_watcher(self.TYPE_MAX_WATCHERS)
     348  
     349      def test_clear_unassigned_watcher_id(self):
     350          with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
     351              self.clear_watcher(1)
     352  
     353      def test_no_more_ids_available(self):
     354          with self.assertRaisesRegex(RuntimeError, r"no more type watcher IDs"):
     355              with ExitStack() as stack:
     356                  for _ in range(self.TYPE_MAX_WATCHERS + 1):
     357                      stack.enter_context(self.watcher())
     358  
     359  
     360  class ESC[4;38;5;81mTestCodeObjectWatchers(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     361      @contextmanager
     362      def code_watcher(self, which_watcher):
     363          wid = _testcapi.add_code_watcher(which_watcher)
     364          try:
     365              yield wid
     366          finally:
     367              _testcapi.clear_code_watcher(wid)
     368  
     369      def assert_event_counts(self, exp_created_0, exp_destroyed_0,
     370                              exp_created_1, exp_destroyed_1):
     371          self.assertEqual(
     372              exp_created_0, _testcapi.get_code_watcher_num_created_events(0))
     373          self.assertEqual(
     374              exp_destroyed_0, _testcapi.get_code_watcher_num_destroyed_events(0))
     375          self.assertEqual(
     376              exp_created_1, _testcapi.get_code_watcher_num_created_events(1))
     377          self.assertEqual(
     378              exp_destroyed_1, _testcapi.get_code_watcher_num_destroyed_events(1))
     379  
     380      def test_code_object_events_dispatched(self):
     381          # verify that all counts are zero before any watchers are registered
     382          self.assert_event_counts(0, 0, 0, 0)
     383  
     384          # verify that all counts remain zero when a code object is
     385          # created and destroyed with no watchers registered
     386          co1 = _testcapi.code_newempty("test_watchers", "dummy1", 0)
     387          self.assert_event_counts(0, 0, 0, 0)
     388          del co1
     389          self.assert_event_counts(0, 0, 0, 0)
     390  
     391          # verify counts are as expected when first watcher is registered
     392          with self.code_watcher(0):
     393              self.assert_event_counts(0, 0, 0, 0)
     394              co2 = _testcapi.code_newempty("test_watchers", "dummy2", 0)
     395              self.assert_event_counts(1, 0, 0, 0)
     396              del co2
     397              self.assert_event_counts(1, 1, 0, 0)
     398  
     399              # again with second watcher registered
     400              with self.code_watcher(1):
     401                  self.assert_event_counts(1, 1, 0, 0)
     402                  co3 = _testcapi.code_newempty("test_watchers", "dummy3", 0)
     403                  self.assert_event_counts(2, 1, 1, 0)
     404                  del co3
     405                  self.assert_event_counts(2, 2, 1, 1)
     406  
     407          # verify counts are reset and don't change after both watchers are cleared
     408          co4 = _testcapi.code_newempty("test_watchers", "dummy4", 0)
     409          self.assert_event_counts(0, 0, 0, 0)
     410          del co4
     411          self.assert_event_counts(0, 0, 0, 0)
     412  
     413      def test_error(self):
     414          with self.code_watcher(2):
     415              with catch_unraisable_exception() as cm:
     416                  co = _testcapi.code_newempty("test_watchers", "dummy0", 0)
     417  
     418                  self.assertEqual(
     419                      cm.unraisable.object,
     420                      f"PY_CODE_EVENT_CREATE watcher callback for {co!r}"
     421                  )
     422                  self.assertEqual(str(cm.unraisable.exc_value), "boom!")
     423  
     424      def test_dealloc_error(self):
     425          co = _testcapi.code_newempty("test_watchers", "dummy0", 0)
     426          with self.code_watcher(2):
     427              with catch_unraisable_exception() as cm:
     428                  del co
     429  
     430                  self.assertEqual(str(cm.unraisable.exc_value), "boom!")
     431  
     432      def test_clear_out_of_range_watcher_id(self):
     433          with self.assertRaisesRegex(ValueError, r"Invalid code watcher ID -1"):
     434              _testcapi.clear_code_watcher(-1)
     435          with self.assertRaisesRegex(ValueError, r"Invalid code watcher ID 8"):
     436              _testcapi.clear_code_watcher(8)  # CODE_MAX_WATCHERS = 8
     437  
     438      def test_clear_unassigned_watcher_id(self):
     439          with self.assertRaisesRegex(ValueError, r"No code watcher set for ID 1"):
     440              _testcapi.clear_code_watcher(1)
     441  
     442      def test_allocate_too_many_watchers(self):
     443          with self.assertRaisesRegex(RuntimeError, r"no more code watcher IDs available"):
     444              _testcapi.allocate_too_many_code_watchers()
     445  
     446  
     447  class ESC[4;38;5;81mTestFuncWatchers(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     448      @contextmanager
     449      def add_watcher(self, func):
     450          wid = _testcapi.add_func_watcher(func)
     451          try:
     452              yield
     453          finally:
     454              _testcapi.clear_func_watcher(wid)
     455  
     456      def test_func_events_dispatched(self):
     457          events = []
     458          def watcher(*args):
     459              events.append(args)
     460  
     461          with self.add_watcher(watcher):
     462              def myfunc():
     463                  pass
     464              self.assertIn((_testcapi.PYFUNC_EVENT_CREATE, myfunc, None), events)
     465              myfunc_id = id(myfunc)
     466  
     467              new_code = self.test_func_events_dispatched.__code__
     468              myfunc.__code__ = new_code
     469              self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_CODE, myfunc, new_code), events)
     470  
     471              new_defaults = (123,)
     472              myfunc.__defaults__ = new_defaults
     473              self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_DEFAULTS, myfunc, new_defaults), events)
     474  
     475              new_defaults = (456,)
     476              _testcapi.set_func_defaults_via_capi(myfunc, new_defaults)
     477              self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_DEFAULTS, myfunc, new_defaults), events)
     478  
     479              new_kwdefaults = {"self": 123}
     480              myfunc.__kwdefaults__ = new_kwdefaults
     481              self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_KWDEFAULTS, myfunc, new_kwdefaults), events)
     482  
     483              new_kwdefaults = {"self": 456}
     484              _testcapi.set_func_kwdefaults_via_capi(myfunc, new_kwdefaults)
     485              self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_KWDEFAULTS, myfunc, new_kwdefaults), events)
     486  
     487              # Clear events reference to func
     488              events = []
     489              del myfunc
     490              self.assertIn((_testcapi.PYFUNC_EVENT_DESTROY, myfunc_id, None), events)
     491  
     492      def test_multiple_watchers(self):
     493          events0 = []
     494          def first_watcher(*args):
     495              events0.append(args)
     496  
     497          events1 = []
     498          def second_watcher(*args):
     499              events1.append(args)
     500  
     501          with self.add_watcher(first_watcher):
     502              with self.add_watcher(second_watcher):
     503                  def myfunc():
     504                      pass
     505  
     506                  event = (_testcapi.PYFUNC_EVENT_CREATE, myfunc, None)
     507                  self.assertIn(event, events0)
     508                  self.assertIn(event, events1)
     509  
     510      def test_watcher_raises_error(self):
     511          class ESC[4;38;5;81mMyError(ESC[4;38;5;149mException):
     512              pass
     513  
     514          def watcher(*args):
     515              raise MyError("testing 123")
     516  
     517          with self.add_watcher(watcher):
     518              with catch_unraisable_exception() as cm:
     519                  def myfunc():
     520                      pass
     521  
     522                  self.assertEqual(
     523                      cm.unraisable.object,
     524                      f"PyFunction_EVENT_CREATE watcher callback for {myfunc!r}"
     525                  )
     526  
     527      def test_dealloc_watcher_raises_error(self):
     528          class ESC[4;38;5;81mMyError(ESC[4;38;5;149mException):
     529              pass
     530  
     531          def watcher(*args):
     532              raise MyError("testing 123")
     533  
     534          def myfunc():
     535              pass
     536  
     537          with self.add_watcher(watcher):
     538              with catch_unraisable_exception() as cm:
     539                  del myfunc
     540  
     541                  self.assertIsInstance(cm.unraisable.exc_value, MyError)
     542  
     543      def test_clear_out_of_range_watcher_id(self):
     544          with self.assertRaisesRegex(ValueError, r"invalid func watcher ID -1"):
     545              _testcapi.clear_func_watcher(-1)
     546          with self.assertRaisesRegex(ValueError, r"invalid func watcher ID 8"):
     547              _testcapi.clear_func_watcher(8)  # FUNC_MAX_WATCHERS = 8
     548  
     549      def test_clear_unassigned_watcher_id(self):
     550          with self.assertRaisesRegex(ValueError, r"no func watcher set for ID 1"):
     551              _testcapi.clear_func_watcher(1)
     552  
     553      def test_allocate_too_many_watchers(self):
     554          with self.assertRaisesRegex(RuntimeError, r"no more func watcher IDs"):
     555              _testcapi.allocate_too_many_func_watchers()
     556  
     557  
     558  if __name__ == "__main__":
     559      unittest.main()