# python (3.12.0)
1 import unittest
2
3 from contextlib import contextmanager, ExitStack
4 from test.support import catch_unraisable_exception, import_helper
5
6
# Skip this test if the _testcapi module isn't available: the test classes
# in this file call its watcher helpers, so nothing can run without it.
_testcapi = import_helper.import_module('_testcapi')
9
10
class TestDictWatchers(unittest.TestCase):
    """Tests for dict watchers, driven through the ``_testcapi`` helpers."""

    # types of watchers testcapimodule can add:
    EVENTS = 0   # appends dict events as strings to global event list
    ERROR = 1    # unconditionally sets and signals an error ("boom!")
    SECOND = 2   # always appends "second" to global event list

    # duplicating the C constant
    DICT_MAX_WATCHERS = 8

    def add_watcher(self, kind=EVENTS):
        # Register a watcher of the given kind and return its watcher ID.
        return _testcapi.add_dict_watcher(kind)

    def clear_watcher(self, watcher_id):
        # Unregister the watcher with the given ID.
        _testcapi.clear_dict_watcher(watcher_id)

    @contextmanager
    def watcher(self, kind=EVENTS):
        # Register a watcher for the duration of the ``with`` body and
        # clear it again on exit, even if the body raises.
        wid = self.add_watcher(kind)
        try:
            yield wid
        finally:
            self.clear_watcher(wid)

    def assert_events(self, expected):
        # Compare the events recorded so far against the expected list.
        actual = _testcapi.get_dict_watcher_events()
        self.assertEqual(actual, expected)

    def watch(self, wid, d):
        _testcapi.watch_dict(wid, d)

    def unwatch(self, wid, d):
        _testcapi.unwatch_dict(wid, d)

    def test_set_new_item(self):
        d = {}
        with self.watcher() as wid:
            self.watch(wid, d)
            d["foo"] = "bar"
            self.assert_events(["new:foo:bar"])

    def test_set_existing_item(self):
        d = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            d["foo"] = "baz"
            self.assert_events(["mod:foo:baz"])

    def test_clone(self):
        # updating an empty watched dict from another dict is reported
        # as a single "clone" event
        d = {}
        d2 = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            d.update(d2)
            self.assert_events(["clone"])

    def test_no_event_if_not_watched(self):
        # a watcher is registered, but the dict is never watched by it
        d = {}
        with self.watcher():
            d["foo"] = "bar"
            self.assert_events([])

    def test_del(self):
        d = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            del d["foo"]
            self.assert_events(["del:foo"])

    def test_pop(self):
        # pop() is reported the same way as ``del d[key]``
        d = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            d.pop("foo")
            self.assert_events(["del:foo"])

    def test_clear(self):
        d = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            d.clear()
            self.assert_events(["clear"])

    def test_dealloc(self):
        # dropping the only reference to the watched dict reports "dealloc"
        d = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            del d
            self.assert_events(["dealloc"])

    def test_unwatch(self):
        d = {}
        with self.watcher() as wid:
            self.watch(wid, d)
            d["foo"] = "bar"
            self.unwatch(wid, d)
            # no event is expected for this second modification
            d["hmm"] = "baz"
            self.assert_events(["new:foo:bar"])

    def test_error(self):
        d = {}
        with self.watcher(kind=self.ERROR) as wid:
            self.watch(wid, d)
            with catch_unraisable_exception() as cm:
                d["foo"] = "bar"
                # cm.unraisable is only available inside this block
                self.assertIn(
                    "PyDict_EVENT_ADDED watcher callback for <dict at",
                    cm.unraisable.object
                )
                self.assertEqual(str(cm.unraisable.exc_value), "boom!")
            self.assert_events([])

    def test_dealloc_error(self):
        d = {}
        with self.watcher(kind=self.ERROR) as wid:
            self.watch(wid, d)
            with catch_unraisable_exception() as cm:
                del d
                self.assertEqual(str(cm.unraisable.exc_value), "boom!")

    def test_two_watchers(self):
        d1 = {}
        d2 = {}
        with self.watcher() as wid1:
            with self.watcher(kind=self.SECOND) as wid2:
                self.watch(wid1, d1)
                self.watch(wid2, d2)
                d1["foo"] = "bar"
                d2["hmm"] = "baz"
                self.assert_events(["new:foo:bar", "second"])

    def test_watch_non_dict(self):
        with self.watcher() as wid:
            with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
                self.watch(wid, 1)

    def test_watch_out_of_range_watcher_id(self):
        d = {}
        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
            self.watch(-1, d)
        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
            self.watch(self.DICT_MAX_WATCHERS, d)

    def test_watch_unassigned_watcher_id(self):
        d = {}
        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
            self.watch(1, d)

    def test_unwatch_non_dict(self):
        with self.watcher() as wid:
            with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
                self.unwatch(wid, 1)

    def test_unwatch_out_of_range_watcher_id(self):
        d = {}
        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
            self.unwatch(-1, d)
        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
            self.unwatch(self.DICT_MAX_WATCHERS, d)

    def test_unwatch_unassigned_watcher_id(self):
        d = {}
        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
            self.unwatch(1, d)

    def test_clear_out_of_range_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
            self.clear_watcher(-1)
        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
            self.clear_watcher(self.DICT_MAX_WATCHERS)

    def test_clear_unassigned_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
            self.clear_watcher(1)
181
182
class TestTypeWatchers(unittest.TestCase):
    """Tests for type watchers, driven through the ``_testcapi`` helpers."""

    # Watcher kinds accepted by _testcapi.add_type_watcher():
    TYPES = 0   # records each modified type in the global event list
    ERROR = 1   # always fails with an error whose message is "boom!"
    WRAP = 2    # records each modified type wrapped in a list

    # Python-side copy of the C constant.
    TYPE_MAX_WATCHERS = 8

    def add_watcher(self, kind=TYPES):
        # Register a watcher of the requested kind; the new ID is returned.
        watcher_id = _testcapi.add_type_watcher(kind)
        return watcher_id

    def clear_watcher(self, watcher_id):
        # Unregister the watcher identified by watcher_id.
        _testcapi.clear_type_watcher(watcher_id)

    @contextmanager
    def watcher(self, kind=TYPES):
        # Keep a watcher registered for the ``with`` body only; it is
        # cleared on the way out whether or not the body raised.
        watcher_id = self.add_watcher(kind)
        try:
            yield watcher_id
        finally:
            self.clear_watcher(watcher_id)

    def assert_events(self, expected):
        # The recorded events must equal the expected list exactly.
        self.assertEqual(_testcapi.get_type_modified_events(), expected)

    def watch(self, wid, t):
        _testcapi.watch_type(wid, t)

    def unwatch(self, wid, t):
        _testcapi.unwatch_type(wid, t)

    def test_watch_type(self):
        class C:
            pass

        with self.watcher() as watcher_id:
            self.watch(watcher_id, C)
            C.foo = "bar"
            self.assert_events([C])

    def test_event_aggregation(self):
        class C:
            pass

        with self.watcher() as watcher_id:
            self.watch(watcher_id, C)
            C.foo = "bar"
            C.bar = "baz"
            # two back-to-back modifications yield a single event
            self.assert_events([C])

    def test_lookup_resets_aggregation(self):
        class C:
            pass

        with self.watcher() as watcher_id:
            self.watch(watcher_id, C)
            C.foo = "bar"
            # an attribute lookup in between resets the type version tag ...
            self.assertEqual(C.foo, "bar")
            C.bar = "baz"
            # ... so each modification is reported separately
            self.assert_events([C, C])

    def test_unwatch_type(self):
        class C:
            pass

        with self.watcher() as watcher_id:
            self.watch(watcher_id, C)
            C.foo = "bar"
            self.assertEqual(C.foo, "bar")
            self.assert_events([C])
            self.unwatch(watcher_id, C)
            # after unwatching, this modification adds no event
            C.bar = "baz"
            self.assert_events([C])

    def test_clear_watcher(self):
        class C:
            pass

        # The outer watcher watches nothing; it only keeps the events
        # list alive after the inner watcher has been cleared.
        with self.watcher():
            with self.watcher() as watcher_id:
                self.watch(watcher_id, C)
                C.foo = "bar"
                self.assertEqual(C.foo, "bar")
                self.assert_events([C])
            C.bar = "baz"
            # the watcher on C is gone by now, so no new event shows up
            self.assert_events([C])

    def test_watch_type_subclass(self):
        class C:
            pass

        class D(C):
            pass

        with self.watcher() as watcher_id:
            self.watch(watcher_id, D)
            # modifying the base class is reported for the watched subclass
            C.foo = "bar"
            self.assert_events([D])

    def test_error(self):
        class C:
            pass

        with self.watcher(kind=self.ERROR) as watcher_id:
            self.watch(watcher_id, C)
            with catch_unraisable_exception() as cm:
                C.foo = "bar"
                # cm.unraisable can only be inspected inside this block
                self.assertIs(cm.unraisable.object, C)
                self.assertEqual(str(cm.unraisable.exc_value), "boom!")
            self.assert_events([])

    def test_two_watchers(self):
        class C1:
            pass

        class C2:
            pass

        with self.watcher() as first, self.watcher(kind=self.WRAP) as second:
            self.assertNotEqual(first, second)
            self.watch(first, C1)
            self.watch(second, C2)
            C1.foo = "bar"
            C2.hmm = "baz"
            self.assert_events([C1, [C2]])

    def test_all_watchers(self):
        class C:
            pass

        highest_id = self.TYPE_MAX_WATCHERS - 1
        with ExitStack() as managers:
            newest_id = -1
            # Make no assumption about how many watchers already exist:
            # keep registering until the highest possible ID is handed out.
            while newest_id < highest_id:
                newest_id = managers.enter_context(self.watcher())
            self.watch(newest_id, C)
            C.foo = "bar"
            self.assert_events([C])

    def test_watch_non_type(self):
        with self.watcher() as watcher_id:
            with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
                self.watch(watcher_id, 1)

    def test_watch_out_of_range_watcher_id(self):
        class C:
            pass

        cases = (
            (-1, r"Invalid type watcher ID -1"),
            (self.TYPE_MAX_WATCHERS, r"Invalid type watcher ID 8"),
        )
        for bad_id, message in cases:
            with self.assertRaisesRegex(ValueError, message):
                self.watch(bad_id, C)

    def test_watch_unassigned_watcher_id(self):
        class C:
            pass

        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
            self.watch(1, C)

    def test_unwatch_non_type(self):
        with self.watcher() as watcher_id:
            with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
                self.unwatch(watcher_id, 1)

    def test_unwatch_out_of_range_watcher_id(self):
        class C:
            pass

        cases = (
            (-1, r"Invalid type watcher ID -1"),
            (self.TYPE_MAX_WATCHERS, r"Invalid type watcher ID 8"),
        )
        for bad_id, message in cases:
            with self.assertRaisesRegex(ValueError, message):
                self.unwatch(bad_id, C)

    def test_unwatch_unassigned_watcher_id(self):
        class C:
            pass

        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
            self.unwatch(1, C)

    def test_clear_out_of_range_watcher_id(self):
        cases = (
            (-1, r"Invalid type watcher ID -1"),
            (self.TYPE_MAX_WATCHERS, r"Invalid type watcher ID 8"),
        )
        for bad_id, message in cases:
            with self.assertRaisesRegex(ValueError, message):
                self.clear_watcher(bad_id)

    def test_clear_unassigned_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
            self.clear_watcher(1)

    def test_no_more_ids_available(self):
        # Asking for one watcher more than the maximum must fail; the
        # ExitStack clears the ones that were registered successfully.
        attempts = self.TYPE_MAX_WATCHERS + 1
        with self.assertRaisesRegex(RuntimeError, r"no more type watcher IDs"), \
                ExitStack() as managers:
            for _ in range(attempts):
                managers.enter_context(self.watcher())
358
359
class TestCodeObjectWatchers(unittest.TestCase):
    """Tests for code object watchers, driven through ``_testcapi``.

    The ``which_watcher`` values used below select a callback inside
    ``_testcapi``: the tests use 0 and 1 for watchers whose created and
    destroyed event counts can be queried, and 2 for a watcher whose
    callback fails with the message "boom!".
    """

    # duplicating the C constant
    CODE_MAX_WATCHERS = 8

    @contextmanager
    def code_watcher(self, which_watcher):
        # Register the selected watcher for the ``with`` body and clear it
        # again on exit, even if the body raises.
        wid = _testcapi.add_code_watcher(which_watcher)
        try:
            yield wid
        finally:
            _testcapi.clear_code_watcher(wid)

    def assert_event_counts(self, exp_created_0, exp_destroyed_0,
                            exp_created_1, exp_destroyed_1):
        # Check the created/destroyed event counters of watchers 0 and 1.
        self.assertEqual(
            exp_created_0, _testcapi.get_code_watcher_num_created_events(0))
        self.assertEqual(
            exp_destroyed_0, _testcapi.get_code_watcher_num_destroyed_events(0))
        self.assertEqual(
            exp_created_1, _testcapi.get_code_watcher_num_created_events(1))
        self.assertEqual(
            exp_destroyed_1, _testcapi.get_code_watcher_num_destroyed_events(1))

    def test_code_object_events_dispatched(self):
        # verify that all counts are zero before any watchers are registered
        self.assert_event_counts(0, 0, 0, 0)

        # verify that all counts remain zero when a code object is
        # created and destroyed with no watchers registered
        co1 = _testcapi.code_newempty("test_watchers", "dummy1", 0)
        self.assert_event_counts(0, 0, 0, 0)
        del co1
        self.assert_event_counts(0, 0, 0, 0)

        # verify counts are as expected when first watcher is registered
        with self.code_watcher(0):
            self.assert_event_counts(0, 0, 0, 0)
            co2 = _testcapi.code_newempty("test_watchers", "dummy2", 0)
            self.assert_event_counts(1, 0, 0, 0)
            del co2
            self.assert_event_counts(1, 1, 0, 0)

            # again with second watcher registered; the first watcher is
            # still active, so both sets of counters advance
            with self.code_watcher(1):
                self.assert_event_counts(1, 1, 0, 0)
                co3 = _testcapi.code_newempty("test_watchers", "dummy3", 0)
                self.assert_event_counts(2, 1, 1, 0)
                del co3
                self.assert_event_counts(2, 2, 1, 1)

        # verify counts are reset and don't change after both watchers are cleared
        co4 = _testcapi.code_newempty("test_watchers", "dummy4", 0)
        self.assert_event_counts(0, 0, 0, 0)
        del co4
        self.assert_event_counts(0, 0, 0, 0)

    def test_error(self):
        with self.code_watcher(2):
            with catch_unraisable_exception() as cm:
                co = _testcapi.code_newempty("test_watchers", "dummy0", 0)

                # cm.unraisable is only available inside this block
                self.assertEqual(
                    cm.unraisable.object,
                    f"PY_CODE_EVENT_CREATE watcher callback for {co!r}"
                )
                self.assertEqual(str(cm.unraisable.exc_value), "boom!")

    def test_dealloc_error(self):
        co = _testcapi.code_newempty("test_watchers", "dummy0", 0)
        with self.code_watcher(2):
            with catch_unraisable_exception() as cm:
                del co

                self.assertEqual(str(cm.unraisable.exc_value), "boom!")

    def test_clear_out_of_range_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"Invalid code watcher ID -1"):
            _testcapi.clear_code_watcher(-1)
        with self.assertRaisesRegex(ValueError, r"Invalid code watcher ID 8"):
            _testcapi.clear_code_watcher(self.CODE_MAX_WATCHERS)

    def test_clear_unassigned_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"No code watcher set for ID 1"):
            _testcapi.clear_code_watcher(1)

    def test_allocate_too_many_watchers(self):
        with self.assertRaisesRegex(RuntimeError, r"no more code watcher IDs available"):
            _testcapi.allocate_too_many_code_watchers()
445
446
class TestFuncWatchers(unittest.TestCase):
    """Tests for function watchers, driven through ``_testcapi``."""

    # duplicating the C constant
    FUNC_MAX_WATCHERS = 8

    @contextmanager
    def add_watcher(self, func):
        # Register the Python callable ``func`` as a function watcher for
        # the ``with`` body and clear it on exit, even if the body raises.
        # Nothing is yielded; the watcher ID stays internal.
        wid = _testcapi.add_func_watcher(func)
        try:
            yield
        finally:
            _testcapi.clear_func_watcher(wid)

    def test_func_events_dispatched(self):
        events = []
        def watcher(*args):
            events.append(args)

        with self.add_watcher(watcher):
            def myfunc():
                pass
            self.assertIn((_testcapi.PYFUNC_EVENT_CREATE, myfunc, None), events)
            myfunc_id = id(myfunc)

            new_code = self.test_func_events_dispatched.__code__
            myfunc.__code__ = new_code
            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_CODE, myfunc, new_code), events)

            new_defaults = (123,)
            myfunc.__defaults__ = new_defaults
            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_DEFAULTS, myfunc, new_defaults), events)

            new_defaults = (456,)
            _testcapi.set_func_defaults_via_capi(myfunc, new_defaults)
            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_DEFAULTS, myfunc, new_defaults), events)

            new_kwdefaults = {"self": 123}
            myfunc.__kwdefaults__ = new_kwdefaults
            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_KWDEFAULTS, myfunc, new_kwdefaults), events)

            new_kwdefaults = {"self": 456}
            _testcapi.set_func_kwdefaults_via_capi(myfunc, new_kwdefaults)
            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_KWDEFAULTS, myfunc, new_kwdefaults), events)

            # Clear events reference to func: the recorded tuples hold
            # myfunc, so rebind to a fresh list before deleting it. The
            # watcher closure sees the rebinding and appends to the new list.
            events = []
            del myfunc
            self.assertIn((_testcapi.PYFUNC_EVENT_DESTROY, myfunc_id, None), events)

    def test_multiple_watchers(self):
        events0 = []
        def first_watcher(*args):
            events0.append(args)

        events1 = []
        def second_watcher(*args):
            events1.append(args)

        with self.add_watcher(first_watcher):
            with self.add_watcher(second_watcher):
                def myfunc():
                    pass

                # both registered watchers must see the same event
                event = (_testcapi.PYFUNC_EVENT_CREATE, myfunc, None)
                self.assertIn(event, events0)
                self.assertIn(event, events1)

    def test_watcher_raises_error(self):
        class MyError(Exception):
            pass

        def watcher(*args):
            raise MyError("testing 123")

        with self.add_watcher(watcher):
            with catch_unraisable_exception() as cm:
                def myfunc():
                    pass

                # cm.unraisable is only available inside this block
                self.assertEqual(
                    cm.unraisable.object,
                    f"PyFunction_EVENT_CREATE watcher callback for {myfunc!r}"
                )
                # same check as test_dealloc_watcher_raises_error: the
                # reported exception is the one the watcher raised
                self.assertIsInstance(cm.unraisable.exc_value, MyError)

    def test_dealloc_watcher_raises_error(self):
        class MyError(Exception):
            pass

        def watcher(*args):
            raise MyError("testing 123")

        def myfunc():
            pass

        with self.add_watcher(watcher):
            with catch_unraisable_exception() as cm:
                del myfunc

                self.assertIsInstance(cm.unraisable.exc_value, MyError)

    def test_clear_out_of_range_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"invalid func watcher ID -1"):
            _testcapi.clear_func_watcher(-1)
        with self.assertRaisesRegex(ValueError, r"invalid func watcher ID 8"):
            _testcapi.clear_func_watcher(self.FUNC_MAX_WATCHERS)

    def test_clear_unassigned_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"no func watcher set for ID 1"):
            _testcapi.clear_func_watcher(1)

    def test_allocate_too_many_watchers(self):
        with self.assertRaisesRegex(RuntimeError, r"no more func watcher IDs"):
            _testcapi.allocate_too_many_func_watchers()
556
557
# Run the tests with unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()