python (3.11.7)
1 import gc
2 import sys
3 import doctest
4 import unittest
5 import collections
6 import weakref
7 import operator
8 import contextlib
9 import copy
10 import threading
11 import time
12 import random
13
14 from test import support
15 from test.support import script_helper, ALWAYS_EQ
16 from test.support import gc_collect
17 from test.support import threading_helper
18
19 # Used in ReferencesTestCase.test_ref_created_during_del() .
20 ref_from_del = None
21
22 # Used by FinalizeTestCase as a global that may be replaced by None
23 # when the interpreter shuts down.
24 _global_var = 'foobar'
25
26 class ESC[4;38;5;81mC:
27 def method(self):
28 pass
29
30
31 class ESC[4;38;5;81mCallable:
32 bar = None
33
34 def __call__(self, x):
35 self.bar = x
36
37
38 def create_function():
39 def f(): pass
40 return f
41
42 def create_bound_method():
43 return C().method
44
45
46 class ESC[4;38;5;81mObject:
47 def __init__(self, arg):
48 self.arg = arg
49 def __repr__(self):
50 return "<Object %r>" % self.arg
51 def __eq__(self, other):
52 if isinstance(other, Object):
53 return self.arg == other.arg
54 return NotImplemented
55 def __lt__(self, other):
56 if isinstance(other, Object):
57 return self.arg < other.arg
58 return NotImplemented
59 def __hash__(self):
60 return hash(self.arg)
61 def some_method(self):
62 return 4
63 def other_method(self):
64 return 5
65
66
67 class ESC[4;38;5;81mRefCycle:
68 def __init__(self):
69 self.cycle = self
70
71
72 class ESC[4;38;5;81mTestBase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
73
74 def setUp(self):
75 self.cbcalled = 0
76
77 def callback(self, ref):
78 self.cbcalled += 1
79
80
81 @contextlib.contextmanager
82 def collect_in_thread(period=0.0001):
83 """
84 Ensure GC collections happen in a different thread, at a high frequency.
85 """
86 please_stop = False
87
88 def collect():
89 while not please_stop:
90 time.sleep(period)
91 gc.collect()
92
93 with support.disable_gc():
94 t = threading.Thread(target=collect)
95 t.start()
96 try:
97 yield
98 finally:
99 please_stop = True
100 t.join()
101
102
103 class ESC[4;38;5;81mReferencesTestCase(ESC[4;38;5;149mTestBase):
104
105 def test_basic_ref(self):
106 self.check_basic_ref(C)
107 self.check_basic_ref(create_function)
108 self.check_basic_ref(create_bound_method)
109
110 # Just make sure the tp_repr handler doesn't raise an exception.
111 # Live reference:
112 o = C()
113 wr = weakref.ref(o)
114 repr(wr)
115 # Dead reference:
116 del o
117 repr(wr)
118
119 def test_repr_failure_gh99184(self):
120 class ESC[4;38;5;81mMyConfig(ESC[4;38;5;149mdict):
121 def __getattr__(self, x):
122 return self[x]
123
124 obj = MyConfig(offset=5)
125 obj_weakref = weakref.ref(obj)
126
127 self.assertIn('MyConfig', repr(obj_weakref))
128 self.assertIn('MyConfig', str(obj_weakref))
129
130 def test_basic_callback(self):
131 self.check_basic_callback(C)
132 self.check_basic_callback(create_function)
133 self.check_basic_callback(create_bound_method)
134
135 @support.cpython_only
136 def test_cfunction(self):
137 import _testcapi
138 create_cfunction = _testcapi.create_cfunction
139 f = create_cfunction()
140 wr = weakref.ref(f)
141 self.assertIs(wr(), f)
142 del f
143 self.assertIsNone(wr())
144 self.check_basic_ref(create_cfunction)
145 self.check_basic_callback(create_cfunction)
146
147 def test_multiple_callbacks(self):
148 o = C()
149 ref1 = weakref.ref(o, self.callback)
150 ref2 = weakref.ref(o, self.callback)
151 del o
152 gc_collect() # For PyPy or other GCs.
153 self.assertIsNone(ref1(), "expected reference to be invalidated")
154 self.assertIsNone(ref2(), "expected reference to be invalidated")
155 self.assertEqual(self.cbcalled, 2,
156 "callback not called the right number of times")
157
158 def test_multiple_selfref_callbacks(self):
159 # Make sure all references are invalidated before callbacks are called
160 #
161 # What's important here is that we're using the first
162 # reference in the callback invoked on the second reference
163 # (the most recently created ref is cleaned up first). This
164 # tests that all references to the object are invalidated
165 # before any of the callbacks are invoked, so that we only
166 # have one invocation of _weakref.c:cleanup_helper() active
167 # for a particular object at a time.
168 #
169 def callback(object, self=self):
170 self.ref()
171 c = C()
172 self.ref = weakref.ref(c, callback)
173 ref1 = weakref.ref(c, callback)
174 del c
175
176 def test_constructor_kwargs(self):
177 c = C()
178 self.assertRaises(TypeError, weakref.ref, c, callback=None)
179
180 def test_proxy_ref(self):
181 o = C()
182 o.bar = 1
183 ref1 = weakref.proxy(o, self.callback)
184 ref2 = weakref.proxy(o, self.callback)
185 del o
186 gc_collect() # For PyPy or other GCs.
187
188 def check(proxy):
189 proxy.bar
190
191 self.assertRaises(ReferenceError, check, ref1)
192 self.assertRaises(ReferenceError, check, ref2)
193 ref3 = weakref.proxy(C())
194 gc_collect() # For PyPy or other GCs.
195 self.assertRaises(ReferenceError, bool, ref3)
196 self.assertEqual(self.cbcalled, 2)
197
198 def check_basic_ref(self, factory):
199 o = factory()
200 ref = weakref.ref(o)
201 self.assertIsNotNone(ref(),
202 "weak reference to live object should be live")
203 o2 = ref()
204 self.assertIs(o, o2,
205 "<ref>() should return original object if live")
206
207 def check_basic_callback(self, factory):
208 self.cbcalled = 0
209 o = factory()
210 ref = weakref.ref(o, self.callback)
211 del o
212 gc_collect() # For PyPy or other GCs.
213 self.assertEqual(self.cbcalled, 1,
214 "callback did not properly set 'cbcalled'")
215 self.assertIsNone(ref(),
216 "ref2 should be dead after deleting object reference")
217
218 def test_ref_reuse(self):
219 o = C()
220 ref1 = weakref.ref(o)
221 # create a proxy to make sure that there's an intervening creation
222 # between these two; it should make no difference
223 proxy = weakref.proxy(o)
224 ref2 = weakref.ref(o)
225 self.assertIs(ref1, ref2,
226 "reference object w/out callback should be re-used")
227
228 o = C()
229 proxy = weakref.proxy(o)
230 ref1 = weakref.ref(o)
231 ref2 = weakref.ref(o)
232 self.assertIs(ref1, ref2,
233 "reference object w/out callback should be re-used")
234 self.assertEqual(weakref.getweakrefcount(o), 2,
235 "wrong weak ref count for object")
236 del proxy
237 gc_collect() # For PyPy or other GCs.
238 self.assertEqual(weakref.getweakrefcount(o), 1,
239 "wrong weak ref count for object after deleting proxy")
240
241 def test_proxy_reuse(self):
242 o = C()
243 proxy1 = weakref.proxy(o)
244 ref = weakref.ref(o)
245 proxy2 = weakref.proxy(o)
246 self.assertIs(proxy1, proxy2,
247 "proxy object w/out callback should have been re-used")
248
249 def test_basic_proxy(self):
250 o = C()
251 self.check_proxy(o, weakref.proxy(o))
252
253 L = collections.UserList()
254 p = weakref.proxy(L)
255 self.assertFalse(p, "proxy for empty UserList should be false")
256 p.append(12)
257 self.assertEqual(len(L), 1)
258 self.assertTrue(p, "proxy for non-empty UserList should be true")
259 p[:] = [2, 3]
260 self.assertEqual(len(L), 2)
261 self.assertEqual(len(p), 2)
262 self.assertIn(3, p, "proxy didn't support __contains__() properly")
263 p[1] = 5
264 self.assertEqual(L[1], 5)
265 self.assertEqual(p[1], 5)
266 L2 = collections.UserList(L)
267 p2 = weakref.proxy(L2)
268 self.assertEqual(p, p2)
269 ## self.assertEqual(repr(L2), repr(p2))
270 L3 = collections.UserList(range(10))
271 p3 = weakref.proxy(L3)
272 self.assertEqual(L3[:], p3[:])
273 self.assertEqual(L3[5:], p3[5:])
274 self.assertEqual(L3[:5], p3[:5])
275 self.assertEqual(L3[2:5], p3[2:5])
276
277 def test_proxy_unicode(self):
278 # See bug 5037
279 class ESC[4;38;5;81mC(ESC[4;38;5;149mobject):
280 def __str__(self):
281 return "string"
282 def __bytes__(self):
283 return b"bytes"
284 instance = C()
285 self.assertIn("__bytes__", dir(weakref.proxy(instance)))
286 self.assertEqual(bytes(weakref.proxy(instance)), b"bytes")
287
288 def test_proxy_index(self):
289 class ESC[4;38;5;81mC:
290 def __index__(self):
291 return 10
292 o = C()
293 p = weakref.proxy(o)
294 self.assertEqual(operator.index(p), 10)
295
296 def test_proxy_div(self):
297 class ESC[4;38;5;81mC:
298 def __floordiv__(self, other):
299 return 42
300 def __ifloordiv__(self, other):
301 return 21
302 o = C()
303 p = weakref.proxy(o)
304 self.assertEqual(p // 5, 42)
305 p //= 5
306 self.assertEqual(p, 21)
307
308 def test_proxy_matmul(self):
309 class ESC[4;38;5;81mC:
310 def __matmul__(self, other):
311 return 1729
312 def __rmatmul__(self, other):
313 return -163
314 def __imatmul__(self, other):
315 return 561
316 o = C()
317 p = weakref.proxy(o)
318 self.assertEqual(p @ 5, 1729)
319 self.assertEqual(5 @ p, -163)
320 p @= 5
321 self.assertEqual(p, 561)
322
323 # The PyWeakref_* C API is documented as allowing either NULL or
324 # None as the value for the callback, where either means "no
325 # callback". The "no callback" ref and proxy objects are supposed
326 # to be shared so long as they exist by all callers so long as
327 # they are active. In Python 2.3.3 and earlier, this guarantee
328 # was not honored, and was broken in different ways for
329 # PyWeakref_NewRef() and PyWeakref_NewProxy(). (Two tests.)
330
331 def test_shared_ref_without_callback(self):
332 self.check_shared_without_callback(weakref.ref)
333
334 def test_shared_proxy_without_callback(self):
335 self.check_shared_without_callback(weakref.proxy)
336
337 def check_shared_without_callback(self, makeref):
338 o = Object(1)
339 p1 = makeref(o, None)
340 p2 = makeref(o, None)
341 self.assertIs(p1, p2, "both callbacks were None in the C API")
342 del p1, p2
343 p1 = makeref(o)
344 p2 = makeref(o, None)
345 self.assertIs(p1, p2, "callbacks were NULL, None in the C API")
346 del p1, p2
347 p1 = makeref(o)
348 p2 = makeref(o)
349 self.assertIs(p1, p2, "both callbacks were NULL in the C API")
350 del p1, p2
351 p1 = makeref(o, None)
352 p2 = makeref(o)
353 self.assertIs(p1, p2, "callbacks were None, NULL in the C API")
354
355 def test_callable_proxy(self):
356 o = Callable()
357 ref1 = weakref.proxy(o)
358
359 self.check_proxy(o, ref1)
360
361 self.assertIs(type(ref1), weakref.CallableProxyType,
362 "proxy is not of callable type")
363 ref1('twinkies!')
364 self.assertEqual(o.bar, 'twinkies!',
365 "call through proxy not passed through to original")
366 ref1(x='Splat.')
367 self.assertEqual(o.bar, 'Splat.',
368 "call through proxy not passed through to original")
369
370 # expect due to too few args
371 self.assertRaises(TypeError, ref1)
372
373 # expect due to too many args
374 self.assertRaises(TypeError, ref1, 1, 2, 3)
375
376 def check_proxy(self, o, proxy):
377 o.foo = 1
378 self.assertEqual(proxy.foo, 1,
379 "proxy does not reflect attribute addition")
380 o.foo = 2
381 self.assertEqual(proxy.foo, 2,
382 "proxy does not reflect attribute modification")
383 del o.foo
384 self.assertFalse(hasattr(proxy, 'foo'),
385 "proxy does not reflect attribute removal")
386
387 proxy.foo = 1
388 self.assertEqual(o.foo, 1,
389 "object does not reflect attribute addition via proxy")
390 proxy.foo = 2
391 self.assertEqual(o.foo, 2,
392 "object does not reflect attribute modification via proxy")
393 del proxy.foo
394 self.assertFalse(hasattr(o, 'foo'),
395 "object does not reflect attribute removal via proxy")
396
397 def test_proxy_deletion(self):
398 # Test clearing of SF bug #762891
399 class ESC[4;38;5;81mFoo:
400 result = None
401 def __delitem__(self, accessor):
402 self.result = accessor
403 g = Foo()
404 f = weakref.proxy(g)
405 del f[0]
406 self.assertEqual(f.result, 0)
407
408 def test_proxy_bool(self):
409 # Test clearing of SF bug #1170766
410 class ESC[4;38;5;81mList(ESC[4;38;5;149mlist): pass
411 lyst = List()
412 self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))
413
414 def test_proxy_iter(self):
415 # Test fails with a debug build of the interpreter
416 # (see bpo-38395).
417
418 obj = None
419
420 class ESC[4;38;5;81mMyObj:
421 def __iter__(self):
422 nonlocal obj
423 del obj
424 return NotImplemented
425
426 obj = MyObj()
427 p = weakref.proxy(obj)
428 with self.assertRaises(TypeError):
429 # "blech" in p calls MyObj.__iter__ through the proxy,
430 # without keeping a reference to the real object, so it
431 # can be killed in the middle of the call
432 "blech" in p
433
434 def test_proxy_next(self):
435 arr = [4, 5, 6]
436 def iterator_func():
437 yield from arr
438 it = iterator_func()
439
440 class ESC[4;38;5;81mIteratesWeakly:
441 def __iter__(self):
442 return weakref.proxy(it)
443
444 weak_it = IteratesWeakly()
445
446 # Calls proxy.__next__
447 self.assertEqual(list(weak_it), [4, 5, 6])
448
449 def test_proxy_bad_next(self):
450 # bpo-44720: PyIter_Next() shouldn't be called if the reference
451 # isn't an iterator.
452
453 not_an_iterator = lambda: 0
454
455 class ESC[4;38;5;81mA:
456 def __iter__(self):
457 return weakref.proxy(not_an_iterator)
458 a = A()
459
460 msg = "Weakref proxy referenced a non-iterator"
461 with self.assertRaisesRegex(TypeError, msg):
462 list(a)
463
464 def test_proxy_reversed(self):
465 class ESC[4;38;5;81mMyObj:
466 def __len__(self):
467 return 3
468 def __reversed__(self):
469 return iter('cba')
470
471 obj = MyObj()
472 self.assertEqual("".join(reversed(weakref.proxy(obj))), "cba")
473
474 def test_proxy_hash(self):
475 class ESC[4;38;5;81mMyObj:
476 def __hash__(self):
477 return 42
478
479 obj = MyObj()
480 with self.assertRaises(TypeError):
481 hash(weakref.proxy(obj))
482
483 class ESC[4;38;5;81mMyObj:
484 __hash__ = None
485
486 obj = MyObj()
487 with self.assertRaises(TypeError):
488 hash(weakref.proxy(obj))
489
490 def test_getweakrefcount(self):
491 o = C()
492 ref1 = weakref.ref(o)
493 ref2 = weakref.ref(o, self.callback)
494 self.assertEqual(weakref.getweakrefcount(o), 2,
495 "got wrong number of weak reference objects")
496
497 proxy1 = weakref.proxy(o)
498 proxy2 = weakref.proxy(o, self.callback)
499 self.assertEqual(weakref.getweakrefcount(o), 4,
500 "got wrong number of weak reference objects")
501
502 del ref1, ref2, proxy1, proxy2
503 gc_collect() # For PyPy or other GCs.
504 self.assertEqual(weakref.getweakrefcount(o), 0,
505 "weak reference objects not unlinked from"
506 " referent when discarded.")
507
508 # assumes ints do not support weakrefs
509 self.assertEqual(weakref.getweakrefcount(1), 0,
510 "got wrong number of weak reference objects for int")
511
512 def test_getweakrefs(self):
513 o = C()
514 ref1 = weakref.ref(o, self.callback)
515 ref2 = weakref.ref(o, self.callback)
516 del ref1
517 gc_collect() # For PyPy or other GCs.
518 self.assertEqual(weakref.getweakrefs(o), [ref2],
519 "list of refs does not match")
520
521 o = C()
522 ref1 = weakref.ref(o, self.callback)
523 ref2 = weakref.ref(o, self.callback)
524 del ref2
525 gc_collect() # For PyPy or other GCs.
526 self.assertEqual(weakref.getweakrefs(o), [ref1],
527 "list of refs does not match")
528
529 del ref1
530 gc_collect() # For PyPy or other GCs.
531 self.assertEqual(weakref.getweakrefs(o), [],
532 "list of refs not cleared")
533
534 # assumes ints do not support weakrefs
535 self.assertEqual(weakref.getweakrefs(1), [],
536 "list of refs does not match for int")
537
538 def test_newstyle_number_ops(self):
539 class ESC[4;38;5;81mF(ESC[4;38;5;149mfloat):
540 pass
541 f = F(2.0)
542 p = weakref.proxy(f)
543 self.assertEqual(p + 1.0, 3.0)
544 self.assertEqual(1.0 + p, 3.0) # this used to SEGV
545
546 def test_callbacks_protected(self):
547 # Callbacks protected from already-set exceptions?
548 # Regression test for SF bug #478534.
549 class ESC[4;38;5;81mBogusError(ESC[4;38;5;149mException):
550 pass
551 data = {}
552 def remove(k):
553 del data[k]
554 def encapsulate():
555 f = lambda : ()
556 data[weakref.ref(f, remove)] = None
557 raise BogusError
558 try:
559 encapsulate()
560 except BogusError:
561 pass
562 else:
563 self.fail("exception not properly restored")
564 try:
565 encapsulate()
566 except BogusError:
567 pass
568 else:
569 self.fail("exception not properly restored")
570
571 def test_sf_bug_840829(self):
572 # "weakref callbacks and gc corrupt memory"
573 # subtype_dealloc erroneously exposed a new-style instance
574 # already in the process of getting deallocated to gc,
575 # causing double-deallocation if the instance had a weakref
576 # callback that triggered gc.
577 # If the bug exists, there probably won't be an obvious symptom
578 # in a release build. In a debug build, a segfault will occur
579 # when the second attempt to remove the instance from the "list
580 # of all objects" occurs.
581
582 import gc
583
584 class ESC[4;38;5;81mC(ESC[4;38;5;149mobject):
585 pass
586
587 c = C()
588 wr = weakref.ref(c, lambda ignore: gc.collect())
589 del c
590
591 # There endeth the first part. It gets worse.
592 del wr
593
594 c1 = C()
595 c1.i = C()
596 wr = weakref.ref(c1.i, lambda ignore: gc.collect())
597
598 c2 = C()
599 c2.c1 = c1
600 del c1 # still alive because c2 points to it
601
602 # Now when subtype_dealloc gets called on c2, it's not enough just
603 # that c2 is immune from gc while the weakref callbacks associated
604 # with c2 execute (there are none in this 2nd half of the test, btw).
605 # subtype_dealloc goes on to call the base classes' deallocs too,
606 # so any gc triggered by weakref callbacks associated with anything
607 # torn down by a base class dealloc can also trigger double
608 # deallocation of c2.
609 del c2
610
611 def test_callback_in_cycle(self):
612 import gc
613
614 class ESC[4;38;5;81mJ(ESC[4;38;5;149mobject):
615 pass
616
617 class ESC[4;38;5;81mII(ESC[4;38;5;149mobject):
618 def acallback(self, ignore):
619 self.J
620
621 I = II()
622 I.J = J
623 I.wr = weakref.ref(J, I.acallback)
624
625 # Now J and II are each in a self-cycle (as all new-style class
626 # objects are, since their __mro__ points back to them). I holds
627 # both a weak reference (I.wr) and a strong reference (I.J) to class
628 # J. I is also in a cycle (I.wr points to a weakref that references
629 # I.acallback). When we del these three, they all become trash, but
630 # the cycles prevent any of them from getting cleaned up immediately.
631 # Instead they have to wait for cyclic gc to deduce that they're
632 # trash.
633 #
634 # gc used to call tp_clear on all of them, and the order in which
635 # it does that is pretty accidental. The exact order in which we
636 # built up these things manages to provoke gc into running tp_clear
637 # in just the right order (I last). Calling tp_clear on II leaves
638 # behind an insane class object (its __mro__ becomes NULL). Calling
639 # tp_clear on J breaks its self-cycle, but J doesn't get deleted
640 # just then because of the strong reference from I.J. Calling
641 # tp_clear on I starts to clear I's __dict__, and just happens to
642 # clear I.J first -- I.wr is still intact. That removes the last
643 # reference to J, which triggers the weakref callback. The callback
644 # tries to do "self.J", and instances of new-style classes look up
645 # attributes ("J") in the class dict first. The class (II) wants to
646 # search II.__mro__, but that's NULL. The result was a segfault in
647 # a release build, and an assert failure in a debug build.
648 del I, J, II
649 gc.collect()
650
651 def test_callback_reachable_one_way(self):
652 import gc
653
654 # This one broke the first patch that fixed the previous test. In this case,
655 # the objects reachable from the callback aren't also reachable
656 # from the object (c1) *triggering* the callback: you can get to
657 # c1 from c2, but not vice-versa. The result was that c2's __dict__
658 # got tp_clear'ed by the time the c2.cb callback got invoked.
659
660 class ESC[4;38;5;81mC:
661 def cb(self, ignore):
662 self.me
663 self.c1
664 self.wr
665
666 c1, c2 = C(), C()
667
668 c2.me = c2
669 c2.c1 = c1
670 c2.wr = weakref.ref(c1, c2.cb)
671
672 del c1, c2
673 gc.collect()
674
675 def test_callback_different_classes(self):
676 import gc
677
678 # Like test_callback_reachable_one_way, except c2 and c1 have different
679 # classes. c2's class (C) isn't reachable from c1 then, so protecting
680 # objects reachable from the dying object (c1) isn't enough to stop
681 # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
682 # The result was a segfault (C.__mro__ was NULL when the callback
683 # tried to look up self.me).
684
685 class ESC[4;38;5;81mC(ESC[4;38;5;149mobject):
686 def cb(self, ignore):
687 self.me
688 self.c1
689 self.wr
690
691 class ESC[4;38;5;81mD:
692 pass
693
694 c1, c2 = D(), C()
695
696 c2.me = c2
697 c2.c1 = c1
698 c2.wr = weakref.ref(c1, c2.cb)
699
700 del c1, c2, C, D
701 gc.collect()
702
703 def test_callback_in_cycle_resurrection(self):
704 import gc
705
706 # Do something nasty in a weakref callback: resurrect objects
707 # from dead cycles. For this to be attempted, the weakref and
708 # its callback must also be part of the cyclic trash (else the
709 # objects reachable via the callback couldn't be in cyclic trash
710 # to begin with -- the callback would act like an external root).
711 # But gc clears trash weakrefs with callbacks early now, which
712 # disables the callbacks, so the callbacks shouldn't get called
713 # at all (and so nothing actually gets resurrected).
714
715 alist = []
716 class ESC[4;38;5;81mC(ESC[4;38;5;149mobject):
717 def __init__(self, value):
718 self.attribute = value
719
720 def acallback(self, ignore):
721 alist.append(self.c)
722
723 c1, c2 = C(1), C(2)
724 c1.c = c2
725 c2.c = c1
726 c1.wr = weakref.ref(c2, c1.acallback)
727 c2.wr = weakref.ref(c1, c2.acallback)
728
729 def C_went_away(ignore):
730 alist.append("C went away")
731 wr = weakref.ref(C, C_went_away)
732
733 del c1, c2, C # make them all trash
734 self.assertEqual(alist, []) # del isn't enough to reclaim anything
735
736 gc.collect()
737 # c1.wr and c2.wr were part of the cyclic trash, so should have
738 # been cleared without their callbacks executing. OTOH, the weakref
739 # to C is bound to a function local (wr), and wasn't trash, so that
740 # callback should have been invoked when C went away.
741 self.assertEqual(alist, ["C went away"])
742 # The remaining weakref should be dead now (its callback ran).
743 self.assertEqual(wr(), None)
744
745 del alist[:]
746 gc.collect()
747 self.assertEqual(alist, [])
748
749 def test_callbacks_on_callback(self):
750 import gc
751
752 # Set up weakref callbacks *on* weakref callbacks.
753 alist = []
754 def safe_callback(ignore):
755 alist.append("safe_callback called")
756
757 class ESC[4;38;5;81mC(ESC[4;38;5;149mobject):
758 def cb(self, ignore):
759 alist.append("cb called")
760
761 c, d = C(), C()
762 c.other = d
763 d.other = c
764 callback = c.cb
765 c.wr = weakref.ref(d, callback) # this won't trigger
766 d.wr = weakref.ref(callback, d.cb) # ditto
767 external_wr = weakref.ref(callback, safe_callback) # but this will
768 self.assertIs(external_wr(), callback)
769
770 # The weakrefs attached to c and d should get cleared, so that
771 # C.cb is never called. But external_wr isn't part of the cyclic
772 # trash, and no cyclic trash is reachable from it, so safe_callback
773 # should get invoked when the bound method object callback (c.cb)
774 # -- which is itself a callback, and also part of the cyclic trash --
775 # gets reclaimed at the end of gc.
776
777 del callback, c, d, C
778 self.assertEqual(alist, []) # del isn't enough to clean up cycles
779 gc.collect()
780 self.assertEqual(alist, ["safe_callback called"])
781 self.assertEqual(external_wr(), None)
782
783 del alist[:]
784 gc.collect()
785 self.assertEqual(alist, [])
786
787 def test_gc_during_ref_creation(self):
788 self.check_gc_during_creation(weakref.ref)
789
790 def test_gc_during_proxy_creation(self):
791 self.check_gc_during_creation(weakref.proxy)
792
793 def check_gc_during_creation(self, makeref):
794 thresholds = gc.get_threshold()
795 gc.set_threshold(1, 1, 1)
796 gc.collect()
797 class ESC[4;38;5;81mA:
798 pass
799
800 def callback(*args):
801 pass
802
803 referenced = A()
804
805 a = A()
806 a.a = a
807 a.wr = makeref(referenced)
808
809 try:
810 # now make sure the object and the ref get labeled as
811 # cyclic trash:
812 a = A()
813 weakref.ref(referenced, callback)
814
815 finally:
816 gc.set_threshold(*thresholds)
817
818 def test_ref_created_during_del(self):
819 # Bug #1377858
820 # A weakref created in an object's __del__() would crash the
821 # interpreter when the weakref was cleaned up since it would refer to
822 # non-existent memory. This test should not segfault the interpreter.
823 class ESC[4;38;5;81mTarget(ESC[4;38;5;149mobject):
824 def __del__(self):
825 global ref_from_del
826 ref_from_del = weakref.ref(self)
827
828 w = Target()
829
830 def test_init(self):
831 # Issue 3634
832 # <weakref to class>.__init__() doesn't check errors correctly
833 r = weakref.ref(Exception)
834 self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0)
835 # No exception should be raised here
836 gc.collect()
837
838 def test_classes(self):
839 # Check that classes are weakrefable.
840 class ESC[4;38;5;81mA(ESC[4;38;5;149mobject):
841 pass
842 l = []
843 weakref.ref(int)
844 a = weakref.ref(A, l.append)
845 A = None
846 gc.collect()
847 self.assertEqual(a(), None)
848 self.assertEqual(l, [a])
849
850 def test_equality(self):
851 # Alive weakrefs defer equality testing to their underlying object.
852 x = Object(1)
853 y = Object(1)
854 z = Object(2)
855 a = weakref.ref(x)
856 b = weakref.ref(y)
857 c = weakref.ref(z)
858 d = weakref.ref(x)
859 # Note how we directly test the operators here, to stress both
860 # __eq__ and __ne__.
861 self.assertTrue(a == b)
862 self.assertFalse(a != b)
863 self.assertFalse(a == c)
864 self.assertTrue(a != c)
865 self.assertTrue(a == d)
866 self.assertFalse(a != d)
867 self.assertFalse(a == x)
868 self.assertTrue(a != x)
869 self.assertTrue(a == ALWAYS_EQ)
870 self.assertFalse(a != ALWAYS_EQ)
871 del x, y, z
872 gc.collect()
873 for r in a, b, c:
874 # Sanity check
875 self.assertIs(r(), None)
876 # Dead weakrefs compare by identity: whether `a` and `d` are the
877 # same weakref object is an implementation detail, since they pointed
878 # to the same original object and didn't have a callback.
879 # (see issue #16453).
880 self.assertFalse(a == b)
881 self.assertTrue(a != b)
882 self.assertFalse(a == c)
883 self.assertTrue(a != c)
884 self.assertEqual(a == d, a is d)
885 self.assertEqual(a != d, a is not d)
886
887 def test_ordering(self):
888 # weakrefs cannot be ordered, even if the underlying objects can.
889 ops = [operator.lt, operator.gt, operator.le, operator.ge]
890 x = Object(1)
891 y = Object(1)
892 a = weakref.ref(x)
893 b = weakref.ref(y)
894 for op in ops:
895 self.assertRaises(TypeError, op, a, b)
896 # Same when dead.
897 del x, y
898 gc.collect()
899 for op in ops:
900 self.assertRaises(TypeError, op, a, b)
901
902 def test_hashing(self):
903 # Alive weakrefs hash the same as the underlying object
904 x = Object(42)
905 y = Object(42)
906 a = weakref.ref(x)
907 b = weakref.ref(y)
908 self.assertEqual(hash(a), hash(42))
909 del x, y
910 gc.collect()
911 # Dead weakrefs:
912 # - retain their hash is they were hashed when alive;
913 # - otherwise, cannot be hashed.
914 self.assertEqual(hash(a), hash(42))
915 self.assertRaises(TypeError, hash, b)
916
917 def test_trashcan_16602(self):
918 # Issue #16602: when a weakref's target was part of a long
919 # deallocation chain, the trashcan mechanism could delay clearing
920 # of the weakref and make the target object visible from outside
921 # code even though its refcount had dropped to 0. A crash ensued.
922 class ESC[4;38;5;81mC:
923 def __init__(self, parent):
924 if not parent:
925 return
926 wself = weakref.ref(self)
927 def cb(wparent):
928 o = wself()
929 self.wparent = weakref.ref(parent, cb)
930
931 d = weakref.WeakKeyDictionary()
932 root = c = C(None)
933 for n in range(100):
934 d[c] = c = C(c)
935 del root
936 gc.collect()
937
938 def test_callback_attribute(self):
939 x = Object(1)
940 callback = lambda ref: None
941 ref1 = weakref.ref(x, callback)
942 self.assertIs(ref1.__callback__, callback)
943
944 ref2 = weakref.ref(x)
945 self.assertIsNone(ref2.__callback__)
946
947 def test_callback_attribute_after_deletion(self):
948 x = Object(1)
949 ref = weakref.ref(x, self.callback)
950 self.assertIsNotNone(ref.__callback__)
951 del x
952 support.gc_collect()
953 self.assertIsNone(ref.__callback__)
954
955 def test_set_callback_attribute(self):
956 x = Object(1)
957 callback = lambda ref: None
958 ref1 = weakref.ref(x, callback)
959 with self.assertRaises(AttributeError):
960 ref1.__callback__ = lambda ref: None
961
962 def test_callback_gcs(self):
963 class ESC[4;38;5;81mObjectWithDel(ESC[4;38;5;149mObject):
964 def __del__(self): pass
965 x = ObjectWithDel(1)
966 ref1 = weakref.ref(x, lambda ref: support.gc_collect())
967 del x
968 support.gc_collect()
969
970
971 class ESC[4;38;5;81mSubclassableWeakrefTestCase(ESC[4;38;5;149mTestBase):
972
973 def test_subclass_refs(self):
974 class ESC[4;38;5;81mMyRef(ESC[4;38;5;149mweakrefESC[4;38;5;149m.ESC[4;38;5;149mref):
975 def __init__(self, ob, callback=None, value=42):
976 self.value = value
977 super().__init__(ob, callback)
978 def __call__(self):
979 self.called = True
980 return super().__call__()
981 o = Object("foo")
982 mr = MyRef(o, value=24)
983 self.assertIs(mr(), o)
984 self.assertTrue(mr.called)
985 self.assertEqual(mr.value, 24)
986 del o
987 gc_collect() # For PyPy or other GCs.
988 self.assertIsNone(mr())
989 self.assertTrue(mr.called)
990
991 def test_subclass_refs_dont_replace_standard_refs(self):
992 class ESC[4;38;5;81mMyRef(ESC[4;38;5;149mweakrefESC[4;38;5;149m.ESC[4;38;5;149mref):
993 pass
994 o = Object(42)
995 r1 = MyRef(o)
996 r2 = weakref.ref(o)
997 self.assertIsNot(r1, r2)
998 self.assertEqual(weakref.getweakrefs(o), [r2, r1])
999 self.assertEqual(weakref.getweakrefcount(o), 2)
1000 r3 = MyRef(o)
1001 self.assertEqual(weakref.getweakrefcount(o), 3)
1002 refs = weakref.getweakrefs(o)
1003 self.assertEqual(len(refs), 3)
1004 self.assertIs(r2, refs[0])
1005 self.assertIn(r1, refs[1:])
1006 self.assertIn(r3, refs[1:])
1007
1008 def test_subclass_refs_dont_conflate_callbacks(self):
1009 class ESC[4;38;5;81mMyRef(ESC[4;38;5;149mweakrefESC[4;38;5;149m.ESC[4;38;5;149mref):
1010 pass
1011 o = Object(42)
1012 r1 = MyRef(o, id)
1013 r2 = MyRef(o, str)
1014 self.assertIsNot(r1, r2)
1015 refs = weakref.getweakrefs(o)
1016 self.assertIn(r1, refs)
1017 self.assertIn(r2, refs)
1018
1019 def test_subclass_refs_with_slots(self):
1020 class ESC[4;38;5;81mMyRef(ESC[4;38;5;149mweakrefESC[4;38;5;149m.ESC[4;38;5;149mref):
1021 __slots__ = "slot1", "slot2"
1022 def __new__(type, ob, callback, slot1, slot2):
1023 return weakref.ref.__new__(type, ob, callback)
1024 def __init__(self, ob, callback, slot1, slot2):
1025 self.slot1 = slot1
1026 self.slot2 = slot2
1027 def meth(self):
1028 return self.slot1 + self.slot2
1029 o = Object(42)
1030 r = MyRef(o, None, "abc", "def")
1031 self.assertEqual(r.slot1, "abc")
1032 self.assertEqual(r.slot2, "def")
1033 self.assertEqual(r.meth(), "abcdef")
1034 self.assertFalse(hasattr(r, "__dict__"))
1035
1036 def test_subclass_refs_with_cycle(self):
1037 """Confirm https://bugs.python.org/issue3100 is fixed."""
1038 # An instance of a weakref subclass can have attributes.
1039 # If such a weakref holds the only strong reference to the object,
1040 # deleting the weakref will delete the object. In this case,
1041 # the callback must not be called, because the ref object is
1042 # being deleted.
1043 class ESC[4;38;5;81mMyRef(ESC[4;38;5;149mweakrefESC[4;38;5;149m.ESC[4;38;5;149mref):
1044 pass
1045
1046 # Use a local callback, for "regrtest -R::"
1047 # to detect refcounting problems
1048 def callback(w):
1049 self.cbcalled += 1
1050
1051 o = C()
1052 r1 = MyRef(o, callback)
1053 r1.o = o
1054 del o
1055
1056 del r1 # Used to crash here
1057
1058 self.assertEqual(self.cbcalled, 0)
1059
1060 # Same test, with two weakrefs to the same object
1061 # (since code paths are different)
1062 o = C()
1063 r1 = MyRef(o, callback)
1064 r2 = MyRef(o, callback)
1065 r1.r = r2
1066 r2.o = o
1067 del o
1068 del r2
1069
1070 del r1 # Used to crash here
1071
1072 self.assertEqual(self.cbcalled, 0)
1073
1074
1075 class ESC[4;38;5;81mWeakMethodTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1076
1077 def _subclass(self):
1078 """Return an Object subclass overriding `some_method`."""
1079 class ESC[4;38;5;81mC(ESC[4;38;5;149mObject):
1080 def some_method(self):
1081 return 6
1082 return C
1083
1084 def test_alive(self):
1085 o = Object(1)
1086 r = weakref.WeakMethod(o.some_method)
1087 self.assertIsInstance(r, weakref.ReferenceType)
1088 self.assertIsInstance(r(), type(o.some_method))
1089 self.assertIs(r().__self__, o)
1090 self.assertIs(r().__func__, o.some_method.__func__)
1091 self.assertEqual(r()(), 4)
1092
1093 def test_object_dead(self):
1094 o = Object(1)
1095 r = weakref.WeakMethod(o.some_method)
1096 del o
1097 gc.collect()
1098 self.assertIs(r(), None)
1099
1100 def test_method_dead(self):
1101 C = self._subclass()
1102 o = C(1)
1103 r = weakref.WeakMethod(o.some_method)
1104 del C.some_method
1105 gc.collect()
1106 self.assertIs(r(), None)
1107
1108 def test_callback_when_object_dead(self):
1109 # Test callback behaviour when object dies first.
1110 C = self._subclass()
1111 calls = []
1112 def cb(arg):
1113 calls.append(arg)
1114 o = C(1)
1115 r = weakref.WeakMethod(o.some_method, cb)
1116 del o
1117 gc.collect()
1118 self.assertEqual(calls, [r])
1119 # Callback is only called once.
1120 C.some_method = Object.some_method
1121 gc.collect()
1122 self.assertEqual(calls, [r])
1123
1124 def test_callback_when_method_dead(self):
1125 # Test callback behaviour when method dies first.
1126 C = self._subclass()
1127 calls = []
1128 def cb(arg):
1129 calls.append(arg)
1130 o = C(1)
1131 r = weakref.WeakMethod(o.some_method, cb)
1132 del C.some_method
1133 gc.collect()
1134 self.assertEqual(calls, [r])
1135 # Callback is only called once.
1136 del o
1137 gc.collect()
1138 self.assertEqual(calls, [r])
1139
1140 @support.cpython_only
1141 def test_no_cycles(self):
1142 # A WeakMethod doesn't create any reference cycle to itself.
1143 o = Object(1)
1144 def cb(_):
1145 pass
1146 r = weakref.WeakMethod(o.some_method, cb)
1147 wr = weakref.ref(r)
1148 del r
1149 self.assertIs(wr(), None)
1150
1151 def test_equality(self):
1152 def _eq(a, b):
1153 self.assertTrue(a == b)
1154 self.assertFalse(a != b)
1155 def _ne(a, b):
1156 self.assertTrue(a != b)
1157 self.assertFalse(a == b)
1158 x = Object(1)
1159 y = Object(1)
1160 a = weakref.WeakMethod(x.some_method)
1161 b = weakref.WeakMethod(y.some_method)
1162 c = weakref.WeakMethod(x.other_method)
1163 d = weakref.WeakMethod(y.other_method)
1164 # Objects equal, same method
1165 _eq(a, b)
1166 _eq(c, d)
1167 # Objects equal, different method
1168 _ne(a, c)
1169 _ne(a, d)
1170 _ne(b, c)
1171 _ne(b, d)
1172 # Objects unequal, same or different method
1173 z = Object(2)
1174 e = weakref.WeakMethod(z.some_method)
1175 f = weakref.WeakMethod(z.other_method)
1176 _ne(a, e)
1177 _ne(a, f)
1178 _ne(b, e)
1179 _ne(b, f)
1180 # Compare with different types
1181 _ne(a, x.some_method)
1182 _eq(a, ALWAYS_EQ)
1183 del x, y, z
1184 gc.collect()
1185 # Dead WeakMethods compare by identity
1186 refs = a, b, c, d, e, f
1187 for q in refs:
1188 for r in refs:
1189 self.assertEqual(q == r, q is r)
1190 self.assertEqual(q != r, q is not r)
1191
1192 def test_hashing(self):
1193 # Alive WeakMethods are hashable if the underlying object is
1194 # hashable.
1195 x = Object(1)
1196 y = Object(1)
1197 a = weakref.WeakMethod(x.some_method)
1198 b = weakref.WeakMethod(y.some_method)
1199 c = weakref.WeakMethod(y.other_method)
1200 # Since WeakMethod objects are equal, the hashes should be equal.
1201 self.assertEqual(hash(a), hash(b))
1202 ha = hash(a)
1203 # Dead WeakMethods retain their old hash value
1204 del x, y
1205 gc.collect()
1206 self.assertEqual(hash(a), ha)
1207 self.assertEqual(hash(b), ha)
1208 # If it wasn't hashed when alive, a dead WeakMethod cannot be hashed.
1209 self.assertRaises(TypeError, hash, c)
1210
1211
1212 class ESC[4;38;5;81mMappingTestCase(ESC[4;38;5;149mTestBase):
1213
1214 COUNT = 10
1215
1216 def check_len_cycles(self, dict_type, cons):
1217 N = 20
1218 items = [RefCycle() for i in range(N)]
1219 dct = dict_type(cons(o) for o in items)
1220 # Keep an iterator alive
1221 it = dct.items()
1222 try:
1223 next(it)
1224 except StopIteration:
1225 pass
1226 del items
1227 gc.collect()
1228 n1 = len(dct)
1229 del it
1230 gc.collect()
1231 n2 = len(dct)
1232 # one item may be kept alive inside the iterator
1233 self.assertIn(n1, (0, 1))
1234 self.assertEqual(n2, 0)
1235
1236 def test_weak_keyed_len_cycles(self):
1237 self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1))
1238
1239 def test_weak_valued_len_cycles(self):
1240 self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k))
1241
1242 def check_len_race(self, dict_type, cons):
1243 # Extended sanity checks for len() in the face of cyclic collection
1244 self.addCleanup(gc.set_threshold, *gc.get_threshold())
1245 for th in range(1, 100):
1246 N = 20
1247 gc.collect(0)
1248 gc.set_threshold(th, th, th)
1249 items = [RefCycle() for i in range(N)]
1250 dct = dict_type(cons(o) for o in items)
1251 del items
1252 # All items will be collected at next garbage collection pass
1253 it = dct.items()
1254 try:
1255 next(it)
1256 except StopIteration:
1257 pass
1258 n1 = len(dct)
1259 del it
1260 n2 = len(dct)
1261 self.assertGreaterEqual(n1, 0)
1262 self.assertLessEqual(n1, N)
1263 self.assertGreaterEqual(n2, 0)
1264 self.assertLessEqual(n2, n1)
1265
1266 def test_weak_keyed_len_race(self):
1267 self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1))
1268
1269 def test_weak_valued_len_race(self):
1270 self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k))
1271
1272 def test_weak_values(self):
1273 #
1274 # This exercises d.copy(), d.items(), d[], del d[], len(d).
1275 #
1276 dict, objects = self.make_weak_valued_dict()
1277 for o in objects:
1278 self.assertEqual(weakref.getweakrefcount(o), 1)
1279 self.assertIs(o, dict[o.arg],
1280 "wrong object returned by weak dict!")
1281 items1 = list(dict.items())
1282 items2 = list(dict.copy().items())
1283 items1.sort()
1284 items2.sort()
1285 self.assertEqual(items1, items2,
1286 "cloning of weak-valued dictionary did not work!")
1287 del items1, items2
1288 self.assertEqual(len(dict), self.COUNT)
1289 del objects[0]
1290 gc_collect() # For PyPy or other GCs.
1291 self.assertEqual(len(dict), self.COUNT - 1,
1292 "deleting object did not cause dictionary update")
1293 del objects, o
1294 gc_collect() # For PyPy or other GCs.
1295 self.assertEqual(len(dict), 0,
1296 "deleting the values did not clear the dictionary")
1297 # regression on SF bug #447152:
1298 dict = weakref.WeakValueDictionary()
1299 self.assertRaises(KeyError, dict.__getitem__, 1)
1300 dict[2] = C()
1301 gc_collect() # For PyPy or other GCs.
1302 self.assertRaises(KeyError, dict.__getitem__, 2)
1303
1304 def test_weak_keys(self):
1305 #
1306 # This exercises d.copy(), d.items(), d[] = v, d[], del d[],
1307 # len(d), k in d.
1308 #
1309 dict, objects = self.make_weak_keyed_dict()
1310 for o in objects:
1311 self.assertEqual(weakref.getweakrefcount(o), 1,
1312 "wrong number of weak references to %r!" % o)
1313 self.assertIs(o.arg, dict[o],
1314 "wrong object returned by weak dict!")
1315 items1 = dict.items()
1316 items2 = dict.copy().items()
1317 self.assertEqual(set(items1), set(items2),
1318 "cloning of weak-keyed dictionary did not work!")
1319 del items1, items2
1320 self.assertEqual(len(dict), self.COUNT)
1321 del objects[0]
1322 gc_collect() # For PyPy or other GCs.
1323 self.assertEqual(len(dict), (self.COUNT - 1),
1324 "deleting object did not cause dictionary update")
1325 del objects, o
1326 gc_collect() # For PyPy or other GCs.
1327 self.assertEqual(len(dict), 0,
1328 "deleting the keys did not clear the dictionary")
1329 o = Object(42)
1330 dict[o] = "What is the meaning of the universe?"
1331 self.assertIn(o, dict)
1332 self.assertNotIn(34, dict)
1333
1334 def test_weak_keyed_iters(self):
1335 dict, objects = self.make_weak_keyed_dict()
1336 self.check_iters(dict)
1337
1338 # Test keyrefs()
1339 refs = dict.keyrefs()
1340 self.assertEqual(len(refs), len(objects))
1341 objects2 = list(objects)
1342 for wr in refs:
1343 ob = wr()
1344 self.assertIn(ob, dict)
1345 self.assertIn(ob, dict)
1346 self.assertEqual(ob.arg, dict[ob])
1347 objects2.remove(ob)
1348 self.assertEqual(len(objects2), 0)
1349
1350 # Test iterkeyrefs()
1351 objects2 = list(objects)
1352 self.assertEqual(len(list(dict.keyrefs())), len(objects))
1353 for wr in dict.keyrefs():
1354 ob = wr()
1355 self.assertIn(ob, dict)
1356 self.assertIn(ob, dict)
1357 self.assertEqual(ob.arg, dict[ob])
1358 objects2.remove(ob)
1359 self.assertEqual(len(objects2), 0)
1360
1361 def test_weak_valued_iters(self):
1362 dict, objects = self.make_weak_valued_dict()
1363 self.check_iters(dict)
1364
1365 # Test valuerefs()
1366 refs = dict.valuerefs()
1367 self.assertEqual(len(refs), len(objects))
1368 objects2 = list(objects)
1369 for wr in refs:
1370 ob = wr()
1371 self.assertEqual(ob, dict[ob.arg])
1372 self.assertEqual(ob.arg, dict[ob.arg].arg)
1373 objects2.remove(ob)
1374 self.assertEqual(len(objects2), 0)
1375
1376 # Test itervaluerefs()
1377 objects2 = list(objects)
1378 self.assertEqual(len(list(dict.itervaluerefs())), len(objects))
1379 for wr in dict.itervaluerefs():
1380 ob = wr()
1381 self.assertEqual(ob, dict[ob.arg])
1382 self.assertEqual(ob.arg, dict[ob.arg].arg)
1383 objects2.remove(ob)
1384 self.assertEqual(len(objects2), 0)
1385
1386 def check_iters(self, dict):
1387 # item iterator:
1388 items = list(dict.items())
1389 for item in dict.items():
1390 items.remove(item)
1391 self.assertFalse(items, "items() did not touch all items")
1392
1393 # key iterator, via __iter__():
1394 keys = list(dict.keys())
1395 for k in dict:
1396 keys.remove(k)
1397 self.assertFalse(keys, "__iter__() did not touch all keys")
1398
1399 # key iterator, via iterkeys():
1400 keys = list(dict.keys())
1401 for k in dict.keys():
1402 keys.remove(k)
1403 self.assertFalse(keys, "iterkeys() did not touch all keys")
1404
1405 # value iterator:
1406 values = list(dict.values())
1407 for v in dict.values():
1408 values.remove(v)
1409 self.assertFalse(values,
1410 "itervalues() did not touch all values")
1411
1412 def check_weak_destroy_while_iterating(self, dict, objects, iter_name):
1413 n = len(dict)
1414 it = iter(getattr(dict, iter_name)())
1415 next(it) # Trigger internal iteration
1416 # Destroy an object
1417 del objects[-1]
1418 gc.collect() # just in case
1419 # We have removed either the first consumed object, or another one
1420 self.assertIn(len(list(it)), [len(objects), len(objects) - 1])
1421 del it
1422 # The removal has been committed
1423 self.assertEqual(len(dict), n - 1)
1424
1425 def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext):
1426 # Check that we can explicitly mutate the weak dict without
1427 # interfering with delayed removal.
1428 # `testcontext` should create an iterator, destroy one of the
1429 # weakref'ed objects and then return a new key/value pair corresponding
1430 # to the destroyed object.
1431 with testcontext() as (k, v):
1432 self.assertNotIn(k, dict)
1433 with testcontext() as (k, v):
1434 self.assertRaises(KeyError, dict.__delitem__, k)
1435 self.assertNotIn(k, dict)
1436 with testcontext() as (k, v):
1437 self.assertRaises(KeyError, dict.pop, k)
1438 self.assertNotIn(k, dict)
1439 with testcontext() as (k, v):
1440 dict[k] = v
1441 self.assertEqual(dict[k], v)
1442 ddict = copy.copy(dict)
1443 with testcontext() as (k, v):
1444 dict.update(ddict)
1445 self.assertEqual(dict, ddict)
1446 with testcontext() as (k, v):
1447 dict.clear()
1448 self.assertEqual(len(dict), 0)
1449
1450 def check_weak_del_and_len_while_iterating(self, dict, testcontext):
1451 # Check that len() works when both iterating and removing keys
1452 # explicitly through various means (.pop(), .clear()...), while
1453 # implicit mutation is deferred because an iterator is alive.
1454 # (each call to testcontext() should schedule one item for removal
1455 # for this test to work properly)
1456 o = Object(123456)
1457 with testcontext():
1458 n = len(dict)
1459 # Since underlying dict is ordered, first item is popped
1460 dict.pop(next(dict.keys()))
1461 self.assertEqual(len(dict), n - 1)
1462 dict[o] = o
1463 self.assertEqual(len(dict), n)
1464 # last item in objects is removed from dict in context shutdown
1465 with testcontext():
1466 self.assertEqual(len(dict), n - 1)
1467 # Then, (o, o) is popped
1468 dict.popitem()
1469 self.assertEqual(len(dict), n - 2)
1470 with testcontext():
1471 self.assertEqual(len(dict), n - 3)
1472 del dict[next(dict.keys())]
1473 self.assertEqual(len(dict), n - 4)
1474 with testcontext():
1475 self.assertEqual(len(dict), n - 5)
1476 dict.popitem()
1477 self.assertEqual(len(dict), n - 6)
1478 with testcontext():
1479 dict.clear()
1480 self.assertEqual(len(dict), 0)
1481 self.assertEqual(len(dict), 0)
1482
1483 def test_weak_keys_destroy_while_iterating(self):
1484 # Issue #7105: iterators shouldn't crash when a key is implicitly removed
1485 dict, objects = self.make_weak_keyed_dict()
1486 self.check_weak_destroy_while_iterating(dict, objects, 'keys')
1487 self.check_weak_destroy_while_iterating(dict, objects, 'items')
1488 self.check_weak_destroy_while_iterating(dict, objects, 'values')
1489 self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
1490 dict, objects = self.make_weak_keyed_dict()
1491 @contextlib.contextmanager
1492 def testcontext():
1493 try:
1494 it = iter(dict.items())
1495 next(it)
1496 # Schedule a key/value for removal and recreate it
1497 v = objects.pop().arg
1498 gc.collect() # just in case
1499 yield Object(v), v
1500 finally:
1501 it = None # should commit all removals
1502 gc.collect()
1503 self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
1504 # Issue #21173: len() fragile when keys are both implicitly and
1505 # explicitly removed.
1506 dict, objects = self.make_weak_keyed_dict()
1507 self.check_weak_del_and_len_while_iterating(dict, testcontext)
1508
1509 def test_weak_values_destroy_while_iterating(self):
1510 # Issue #7105: iterators shouldn't crash when a key is implicitly removed
1511 dict, objects = self.make_weak_valued_dict()
1512 self.check_weak_destroy_while_iterating(dict, objects, 'keys')
1513 self.check_weak_destroy_while_iterating(dict, objects, 'items')
1514 self.check_weak_destroy_while_iterating(dict, objects, 'values')
1515 self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
1516 self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
1517 dict, objects = self.make_weak_valued_dict()
1518 @contextlib.contextmanager
1519 def testcontext():
1520 try:
1521 it = iter(dict.items())
1522 next(it)
1523 # Schedule a key/value for removal and recreate it
1524 k = objects.pop().arg
1525 gc.collect() # just in case
1526 yield k, Object(k)
1527 finally:
1528 it = None # should commit all removals
1529 gc.collect()
1530 self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
1531 dict, objects = self.make_weak_valued_dict()
1532 self.check_weak_del_and_len_while_iterating(dict, testcontext)
1533
1534 def test_make_weak_keyed_dict_from_dict(self):
1535 o = Object(3)
1536 dict = weakref.WeakKeyDictionary({o:364})
1537 self.assertEqual(dict[o], 364)
1538
1539 def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
1540 o = Object(3)
1541 dict = weakref.WeakKeyDictionary({o:364})
1542 dict2 = weakref.WeakKeyDictionary(dict)
1543 self.assertEqual(dict[o], 364)
1544
1545 def make_weak_keyed_dict(self):
1546 dict = weakref.WeakKeyDictionary()
1547 objects = list(map(Object, range(self.COUNT)))
1548 for o in objects:
1549 dict[o] = o.arg
1550 return dict, objects
1551
1552 def test_make_weak_valued_dict_from_dict(self):
1553 o = Object(3)
1554 dict = weakref.WeakValueDictionary({364:o})
1555 self.assertEqual(dict[364], o)
1556
1557 def test_make_weak_valued_dict_from_weak_valued_dict(self):
1558 o = Object(3)
1559 dict = weakref.WeakValueDictionary({364:o})
1560 dict2 = weakref.WeakValueDictionary(dict)
1561 self.assertEqual(dict[364], o)
1562
1563 def test_make_weak_valued_dict_misc(self):
1564 # errors
1565 self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__)
1566 self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {})
1567 self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ())
1568 # special keyword arguments
1569 o = Object(3)
1570 for kw in 'self', 'dict', 'other', 'iterable':
1571 d = weakref.WeakValueDictionary(**{kw: o})
1572 self.assertEqual(list(d.keys()), [kw])
1573 self.assertEqual(d[kw], o)
1574
1575 def make_weak_valued_dict(self):
1576 dict = weakref.WeakValueDictionary()
1577 objects = list(map(Object, range(self.COUNT)))
1578 for o in objects:
1579 dict[o.arg] = o
1580 return dict, objects
1581
1582 def check_popitem(self, klass, key1, value1, key2, value2):
1583 weakdict = klass()
1584 weakdict[key1] = value1
1585 weakdict[key2] = value2
1586 self.assertEqual(len(weakdict), 2)
1587 k, v = weakdict.popitem()
1588 self.assertEqual(len(weakdict), 1)
1589 if k is key1:
1590 self.assertIs(v, value1)
1591 else:
1592 self.assertIs(v, value2)
1593 k, v = weakdict.popitem()
1594 self.assertEqual(len(weakdict), 0)
1595 if k is key1:
1596 self.assertIs(v, value1)
1597 else:
1598 self.assertIs(v, value2)
1599
1600 def test_weak_valued_dict_popitem(self):
1601 self.check_popitem(weakref.WeakValueDictionary,
1602 "key1", C(), "key2", C())
1603
1604 def test_weak_keyed_dict_popitem(self):
1605 self.check_popitem(weakref.WeakKeyDictionary,
1606 C(), "value 1", C(), "value 2")
1607
1608 def check_setdefault(self, klass, key, value1, value2):
1609 self.assertIsNot(value1, value2,
1610 "invalid test"
1611 " -- value parameters must be distinct objects")
1612 weakdict = klass()
1613 o = weakdict.setdefault(key, value1)
1614 self.assertIs(o, value1)
1615 self.assertIn(key, weakdict)
1616 self.assertIs(weakdict.get(key), value1)
1617 self.assertIs(weakdict[key], value1)
1618
1619 o = weakdict.setdefault(key, value2)
1620 self.assertIs(o, value1)
1621 self.assertIn(key, weakdict)
1622 self.assertIs(weakdict.get(key), value1)
1623 self.assertIs(weakdict[key], value1)
1624
1625 def test_weak_valued_dict_setdefault(self):
1626 self.check_setdefault(weakref.WeakValueDictionary,
1627 "key", C(), C())
1628
1629 def test_weak_keyed_dict_setdefault(self):
1630 self.check_setdefault(weakref.WeakKeyDictionary,
1631 C(), "value 1", "value 2")
1632
1633 def check_update(self, klass, dict):
1634 #
1635 # This exercises d.update(), len(d), d.keys(), k in d,
1636 # d.get(), d[].
1637 #
1638 weakdict = klass()
1639 weakdict.update(dict)
1640 self.assertEqual(len(weakdict), len(dict))
1641 for k in weakdict.keys():
1642 self.assertIn(k, dict, "mysterious new key appeared in weak dict")
1643 v = dict.get(k)
1644 self.assertIs(v, weakdict[k])
1645 self.assertIs(v, weakdict.get(k))
1646 for k in dict.keys():
1647 self.assertIn(k, weakdict, "original key disappeared in weak dict")
1648 v = dict[k]
1649 self.assertIs(v, weakdict[k])
1650 self.assertIs(v, weakdict.get(k))
1651
1652 def test_weak_valued_dict_update(self):
1653 self.check_update(weakref.WeakValueDictionary,
1654 {1: C(), 'a': C(), C(): C()})
1655 # errors
1656 self.assertRaises(TypeError, weakref.WeakValueDictionary.update)
1657 d = weakref.WeakValueDictionary()
1658 self.assertRaises(TypeError, d.update, {}, {})
1659 self.assertRaises(TypeError, d.update, (), ())
1660 self.assertEqual(list(d.keys()), [])
1661 # special keyword arguments
1662 o = Object(3)
1663 for kw in 'self', 'dict', 'other', 'iterable':
1664 d = weakref.WeakValueDictionary()
1665 d.update(**{kw: o})
1666 self.assertEqual(list(d.keys()), [kw])
1667 self.assertEqual(d[kw], o)
1668
1669 def test_weak_valued_union_operators(self):
1670 a = C()
1671 b = C()
1672 c = C()
1673 wvd1 = weakref.WeakValueDictionary({1: a})
1674 wvd2 = weakref.WeakValueDictionary({1: b, 2: a})
1675 wvd3 = wvd1.copy()
1676 d1 = {1: c, 3: b}
1677 pairs = [(5, c), (6, b)]
1678
1679 tmp1 = wvd1 | wvd2 # Between two WeakValueDictionaries
1680 self.assertEqual(dict(tmp1), dict(wvd1) | dict(wvd2))
1681 self.assertIs(type(tmp1), weakref.WeakValueDictionary)
1682 wvd1 |= wvd2
1683 self.assertEqual(wvd1, tmp1)
1684
1685 tmp2 = wvd2 | d1 # Between WeakValueDictionary and mapping
1686 self.assertEqual(dict(tmp2), dict(wvd2) | d1)
1687 self.assertIs(type(tmp2), weakref.WeakValueDictionary)
1688 wvd2 |= d1
1689 self.assertEqual(wvd2, tmp2)
1690
1691 tmp3 = wvd3.copy() # Between WeakValueDictionary and iterable key, value
1692 tmp3 |= pairs
1693 self.assertEqual(dict(tmp3), dict(wvd3) | dict(pairs))
1694 self.assertIs(type(tmp3), weakref.WeakValueDictionary)
1695
1696 tmp4 = d1 | wvd3 # Testing .__ror__
1697 self.assertEqual(dict(tmp4), d1 | dict(wvd3))
1698 self.assertIs(type(tmp4), weakref.WeakValueDictionary)
1699
1700 del a
1701 self.assertNotIn(2, tmp1)
1702 self.assertNotIn(2, tmp2)
1703 self.assertNotIn(1, tmp3)
1704 self.assertNotIn(1, tmp4)
1705
1706 def test_weak_keyed_dict_update(self):
1707 self.check_update(weakref.WeakKeyDictionary,
1708 {C(): 1, C(): 2, C(): 3})
1709
1710 def test_weak_keyed_delitem(self):
1711 d = weakref.WeakKeyDictionary()
1712 o1 = Object('1')
1713 o2 = Object('2')
1714 d[o1] = 'something'
1715 d[o2] = 'something'
1716 self.assertEqual(len(d), 2)
1717 del d[o1]
1718 self.assertEqual(len(d), 1)
1719 self.assertEqual(list(d.keys()), [o2])
1720
1721 def test_weak_keyed_union_operators(self):
1722 o1 = C()
1723 o2 = C()
1724 o3 = C()
1725 wkd1 = weakref.WeakKeyDictionary({o1: 1, o2: 2})
1726 wkd2 = weakref.WeakKeyDictionary({o3: 3, o1: 4})
1727 wkd3 = wkd1.copy()
1728 d1 = {o2: '5', o3: '6'}
1729 pairs = [(o2, 7), (o3, 8)]
1730
1731 tmp1 = wkd1 | wkd2 # Between two WeakKeyDictionaries
1732 self.assertEqual(dict(tmp1), dict(wkd1) | dict(wkd2))
1733 self.assertIs(type(tmp1), weakref.WeakKeyDictionary)
1734 wkd1 |= wkd2
1735 self.assertEqual(wkd1, tmp1)
1736
1737 tmp2 = wkd2 | d1 # Between WeakKeyDictionary and mapping
1738 self.assertEqual(dict(tmp2), dict(wkd2) | d1)
1739 self.assertIs(type(tmp2), weakref.WeakKeyDictionary)
1740 wkd2 |= d1
1741 self.assertEqual(wkd2, tmp2)
1742
1743 tmp3 = wkd3.copy() # Between WeakKeyDictionary and iterable key, value
1744 tmp3 |= pairs
1745 self.assertEqual(dict(tmp3), dict(wkd3) | dict(pairs))
1746 self.assertIs(type(tmp3), weakref.WeakKeyDictionary)
1747
1748 tmp4 = d1 | wkd3 # Testing .__ror__
1749 self.assertEqual(dict(tmp4), d1 | dict(wkd3))
1750 self.assertIs(type(tmp4), weakref.WeakKeyDictionary)
1751
1752 del o1
1753 self.assertNotIn(4, tmp1.values())
1754 self.assertNotIn(4, tmp2.values())
1755 self.assertNotIn(1, tmp3.values())
1756 self.assertNotIn(1, tmp4.values())
1757
1758 def test_weak_valued_delitem(self):
1759 d = weakref.WeakValueDictionary()
1760 o1 = Object('1')
1761 o2 = Object('2')
1762 d['something'] = o1
1763 d['something else'] = o2
1764 self.assertEqual(len(d), 2)
1765 del d['something']
1766 self.assertEqual(len(d), 1)
1767 self.assertEqual(list(d.items()), [('something else', o2)])
1768
1769 def test_weak_keyed_bad_delitem(self):
1770 d = weakref.WeakKeyDictionary()
1771 o = Object('1')
1772 # An attempt to delete an object that isn't there should raise
1773 # KeyError. It didn't before 2.3.
1774 self.assertRaises(KeyError, d.__delitem__, o)
1775 self.assertRaises(KeyError, d.__getitem__, o)
1776
1777 # If a key isn't of a weakly referencable type, __getitem__ and
1778 # __setitem__ raise TypeError. __delitem__ should too.
1779 self.assertRaises(TypeError, d.__delitem__, 13)
1780 self.assertRaises(TypeError, d.__getitem__, 13)
1781 self.assertRaises(TypeError, d.__setitem__, 13, 13)
1782
1783 def test_weak_keyed_cascading_deletes(self):
1784 # SF bug 742860. For some reason, before 2.3 __delitem__ iterated
1785 # over the keys via self.data.iterkeys(). If things vanished from
1786 # the dict during this (or got added), that caused a RuntimeError.
1787
1788 d = weakref.WeakKeyDictionary()
1789 mutate = False
1790
1791 class ESC[4;38;5;81mC(ESC[4;38;5;149mobject):
1792 def __init__(self, i):
1793 self.value = i
1794 def __hash__(self):
1795 return hash(self.value)
1796 def __eq__(self, other):
1797 if mutate:
1798 # Side effect that mutates the dict, by removing the
1799 # last strong reference to a key.
1800 del objs[-1]
1801 return self.value == other.value
1802
1803 objs = [C(i) for i in range(4)]
1804 for o in objs:
1805 d[o] = o.value
1806 del o # now the only strong references to keys are in objs
1807 # Find the order in which iterkeys sees the keys.
1808 objs = list(d.keys())
1809 # Reverse it, so that the iteration implementation of __delitem__
1810 # has to keep looping to find the first object we delete.
1811 objs.reverse()
1812
1813 # Turn on mutation in C.__eq__. The first time through the loop,
1814 # under the iterkeys() business the first comparison will delete
1815 # the last item iterkeys() would see, and that causes a
1816 # RuntimeError: dictionary changed size during iteration
1817 # when the iterkeys() loop goes around to try comparing the next
1818 # key. After this was fixed, it just deletes the last object *our*
1819 # "for o in obj" loop would have gotten to.
1820 mutate = True
1821 count = 0
1822 for o in objs:
1823 count += 1
1824 del d[o]
1825 gc_collect() # For PyPy or other GCs.
1826 self.assertEqual(len(d), 0)
1827 self.assertEqual(count, 2)
1828
1829 def test_make_weak_valued_dict_repr(self):
1830 dict = weakref.WeakValueDictionary()
1831 self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>')
1832
1833 def test_make_weak_keyed_dict_repr(self):
1834 dict = weakref.WeakKeyDictionary()
1835 self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>')
1836
1837 @threading_helper.requires_working_threading()
1838 def test_threaded_weak_valued_setdefault(self):
1839 d = weakref.WeakValueDictionary()
1840 with collect_in_thread():
1841 for i in range(100000):
1842 x = d.setdefault(10, RefCycle())
1843 self.assertIsNot(x, None) # we never put None in there!
1844 del x
1845
1846 @threading_helper.requires_working_threading()
1847 def test_threaded_weak_valued_pop(self):
1848 d = weakref.WeakValueDictionary()
1849 with collect_in_thread():
1850 for i in range(100000):
1851 d[10] = RefCycle()
1852 x = d.pop(10, 10)
1853 self.assertIsNot(x, None) # we never put None in there!
1854
1855 @threading_helper.requires_working_threading()
1856 def test_threaded_weak_valued_consistency(self):
1857 # Issue #28427: old keys should not remove new values from
1858 # WeakValueDictionary when collecting from another thread.
1859 d = weakref.WeakValueDictionary()
1860 with collect_in_thread():
1861 for i in range(200000):
1862 o = RefCycle()
1863 d[10] = o
1864 # o is still alive, so the dict can't be empty
1865 self.assertEqual(len(d), 1)
1866 o = None # lose ref
1867
1868 def check_threaded_weak_dict_copy(self, type_, deepcopy):
1869 # `type_` should be either WeakKeyDictionary or WeakValueDictionary.
1870 # `deepcopy` should be either True or False.
1871 exc = []
1872
1873 class ESC[4;38;5;81mDummyKey:
1874 def __init__(self, ctr):
1875 self.ctr = ctr
1876
1877 class ESC[4;38;5;81mDummyValue:
1878 def __init__(self, ctr):
1879 self.ctr = ctr
1880
1881 def dict_copy(d, exc):
1882 try:
1883 if deepcopy is True:
1884 _ = copy.deepcopy(d)
1885 else:
1886 _ = d.copy()
1887 except Exception as ex:
1888 exc.append(ex)
1889
1890 def pop_and_collect(lst):
1891 gc_ctr = 0
1892 while lst:
1893 i = random.randint(0, len(lst) - 1)
1894 gc_ctr += 1
1895 lst.pop(i)
1896 if gc_ctr % 10000 == 0:
1897 gc.collect() # just in case
1898
1899 self.assertIn(type_, (weakref.WeakKeyDictionary, weakref.WeakValueDictionary))
1900
1901 d = type_()
1902 keys = []
1903 values = []
1904 # Initialize d with many entries
1905 for i in range(70000):
1906 k, v = DummyKey(i), DummyValue(i)
1907 keys.append(k)
1908 values.append(v)
1909 d[k] = v
1910 del k
1911 del v
1912
1913 t_copy = threading.Thread(target=dict_copy, args=(d, exc,))
1914 if type_ is weakref.WeakKeyDictionary:
1915 t_collect = threading.Thread(target=pop_and_collect, args=(keys,))
1916 else: # weakref.WeakValueDictionary
1917 t_collect = threading.Thread(target=pop_and_collect, args=(values,))
1918
1919 t_copy.start()
1920 t_collect.start()
1921
1922 t_copy.join()
1923 t_collect.join()
1924
1925 # Test exceptions
1926 if exc:
1927 raise exc[0]
1928
1929 @threading_helper.requires_working_threading()
1930 def test_threaded_weak_key_dict_copy(self):
1931 # Issue #35615: Weakref keys or values getting GC'ed during dict
1932 # copying should not result in a crash.
1933 self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, False)
1934
1935 @threading_helper.requires_working_threading()
1936 @support.requires_resource('cpu')
1937 def test_threaded_weak_key_dict_deepcopy(self):
1938 # Issue #35615: Weakref keys or values getting GC'ed during dict
1939 # copying should not result in a crash.
1940 self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, True)
1941
1942 @threading_helper.requires_working_threading()
1943 def test_threaded_weak_value_dict_copy(self):
1944 # Issue #35615: Weakref keys or values getting GC'ed during dict
1945 # copying should not result in a crash.
1946 self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, False)
1947
1948 @threading_helper.requires_working_threading()
1949 @support.requires_resource('cpu')
1950 def test_threaded_weak_value_dict_deepcopy(self):
1951 # Issue #35615: Weakref keys or values getting GC'ed during dict
1952 # copying should not result in a crash.
1953 self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, True)
1954
1955 @support.cpython_only
1956 def test_remove_closure(self):
1957 d = weakref.WeakValueDictionary()
1958 self.assertIsNone(d._remove.__closure__)
1959
1960
1961 from test import mapping_tests
1962
1963 class ESC[4;38;5;81mWeakValueDictionaryTestCase(ESC[4;38;5;149mmapping_testsESC[4;38;5;149m.ESC[4;38;5;149mBasicTestMappingProtocol):
1964 """Check that WeakValueDictionary conforms to the mapping protocol"""
1965 __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)}
1966 type2test = weakref.WeakValueDictionary
1967 def _reference(self):
1968 return self.__ref.copy()
1969
1970 class ESC[4;38;5;81mWeakKeyDictionaryTestCase(ESC[4;38;5;149mmapping_testsESC[4;38;5;149m.ESC[4;38;5;149mBasicTestMappingProtocol):
1971 """Check that WeakKeyDictionary conforms to the mapping protocol"""
1972 __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3}
1973 type2test = weakref.WeakKeyDictionary
1974 def _reference(self):
1975 return self.__ref.copy()
1976
1977
1978 class ESC[4;38;5;81mFinalizeTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1979
1980 class ESC[4;38;5;81mA:
1981 pass
1982
1983 def _collect_if_necessary(self):
1984 # we create no ref-cycles so in CPython no gc should be needed
1985 if sys.implementation.name != 'cpython':
1986 support.gc_collect()
1987
1988 def test_finalize(self):
1989 def add(x,y,z):
1990 res.append(x + y + z)
1991 return x + y + z
1992
1993 a = self.A()
1994
1995 res = []
1996 f = weakref.finalize(a, add, 67, 43, z=89)
1997 self.assertEqual(f.alive, True)
1998 self.assertEqual(f.peek(), (a, add, (67,43), {'z':89}))
1999 self.assertEqual(f(), 199)
2000 self.assertEqual(f(), None)
2001 self.assertEqual(f(), None)
2002 self.assertEqual(f.peek(), None)
2003 self.assertEqual(f.detach(), None)
2004 self.assertEqual(f.alive, False)
2005 self.assertEqual(res, [199])
2006
2007 res = []
2008 f = weakref.finalize(a, add, 67, 43, 89)
2009 self.assertEqual(f.peek(), (a, add, (67,43,89), {}))
2010 self.assertEqual(f.detach(), (a, add, (67,43,89), {}))
2011 self.assertEqual(f(), None)
2012 self.assertEqual(f(), None)
2013 self.assertEqual(f.peek(), None)
2014 self.assertEqual(f.detach(), None)
2015 self.assertEqual(f.alive, False)
2016 self.assertEqual(res, [])
2017
2018 res = []
2019 f = weakref.finalize(a, add, x=67, y=43, z=89)
2020 del a
2021 self._collect_if_necessary()
2022 self.assertEqual(f(), None)
2023 self.assertEqual(f(), None)
2024 self.assertEqual(f.peek(), None)
2025 self.assertEqual(f.detach(), None)
2026 self.assertEqual(f.alive, False)
2027 self.assertEqual(res, [199])
2028
2029 def test_arg_errors(self):
2030 def fin(*args, **kwargs):
2031 res.append((args, kwargs))
2032
2033 a = self.A()
2034
2035 res = []
2036 f = weakref.finalize(a, fin, 1, 2, func=3, obj=4)
2037 self.assertEqual(f.peek(), (a, fin, (1, 2), {'func': 3, 'obj': 4}))
2038 f()
2039 self.assertEqual(res, [((1, 2), {'func': 3, 'obj': 4})])
2040
2041 with self.assertRaises(TypeError):
2042 weakref.finalize(a, func=fin, arg=1)
2043 with self.assertRaises(TypeError):
2044 weakref.finalize(obj=a, func=fin, arg=1)
2045 self.assertRaises(TypeError, weakref.finalize, a)
2046 self.assertRaises(TypeError, weakref.finalize)
2047
2048 def test_order(self):
2049 a = self.A()
2050 res = []
2051
2052 f1 = weakref.finalize(a, res.append, 'f1')
2053 f2 = weakref.finalize(a, res.append, 'f2')
2054 f3 = weakref.finalize(a, res.append, 'f3')
2055 f4 = weakref.finalize(a, res.append, 'f4')
2056 f5 = weakref.finalize(a, res.append, 'f5')
2057
2058 # make sure finalizers can keep themselves alive
2059 del f1, f4
2060
2061 self.assertTrue(f2.alive)
2062 self.assertTrue(f3.alive)
2063 self.assertTrue(f5.alive)
2064
2065 self.assertTrue(f5.detach())
2066 self.assertFalse(f5.alive)
2067
2068 f5() # nothing because previously unregistered
2069 res.append('A')
2070 f3() # => res.append('f3')
2071 self.assertFalse(f3.alive)
2072 res.append('B')
2073 f3() # nothing because previously called
2074 res.append('C')
2075 del a
2076 self._collect_if_necessary()
2077 # => res.append('f4')
2078 # => res.append('f2')
2079 # => res.append('f1')
2080 self.assertFalse(f2.alive)
2081 res.append('D')
2082 f2() # nothing because previously called by gc
2083
2084 expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D']
2085 self.assertEqual(res, expected)
2086
2087 def test_all_freed(self):
2088 # we want a weakrefable subclass of weakref.finalize
2089 class ESC[4;38;5;81mMyFinalizer(ESC[4;38;5;149mweakrefESC[4;38;5;149m.ESC[4;38;5;149mfinalize):
2090 pass
2091
2092 a = self.A()
2093 res = []
2094 def callback():
2095 res.append(123)
2096 f = MyFinalizer(a, callback)
2097
2098 wr_callback = weakref.ref(callback)
2099 wr_f = weakref.ref(f)
2100 del callback, f
2101
2102 self.assertIsNotNone(wr_callback())
2103 self.assertIsNotNone(wr_f())
2104
2105 del a
2106 self._collect_if_necessary()
2107
2108 self.assertIsNone(wr_callback())
2109 self.assertIsNone(wr_f())
2110 self.assertEqual(res, [123])
2111
2112 @classmethod
2113 def run_in_child(cls):
2114 def error():
2115 # Create an atexit finalizer from inside a finalizer called
2116 # at exit. This should be the next to be run.
2117 g1 = weakref.finalize(cls, print, 'g1')
2118 print('f3 error')
2119 1/0
2120
2121 # cls should stay alive till atexit callbacks run
2122 f1 = weakref.finalize(cls, print, 'f1', _global_var)
2123 f2 = weakref.finalize(cls, print, 'f2', _global_var)
2124 f3 = weakref.finalize(cls, error)
2125 f4 = weakref.finalize(cls, print, 'f4', _global_var)
2126
2127 assert f1.atexit == True
2128 f2.atexit = False
2129 assert f3.atexit == True
2130 assert f4.atexit == True
2131
2132 def test_atexit(self):
2133 prog = ('from test.test_weakref import FinalizeTestCase;'+
2134 'FinalizeTestCase.run_in_child()')
2135 rc, out, err = script_helper.assert_python_ok('-c', prog)
2136 out = out.decode('ascii').splitlines()
2137 self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar'])
2138 self.assertTrue(b'ZeroDivisionError' in err)
2139
2140
2141 class ESC[4;38;5;81mModuleTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2142 def test_names(self):
2143 for name in ('ReferenceType', 'ProxyType', 'CallableProxyType',
2144 'WeakMethod', 'WeakSet', 'WeakKeyDictionary', 'WeakValueDictionary'):
2145 obj = getattr(weakref, name)
2146 if name != 'WeakSet':
2147 self.assertEqual(obj.__module__, 'weakref')
2148 self.assertEqual(obj.__name__, name)
2149 self.assertEqual(obj.__qualname__, name)
2150
2151
2152 libreftest = """ Doctest for examples in the library reference: weakref.rst
2153
2154 >>> from test.support import gc_collect
2155 >>> import weakref
2156 >>> class Dict(dict):
2157 ... pass
2158 ...
2159 >>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable
2160 >>> r = weakref.ref(obj)
2161 >>> print(r() is obj)
2162 True
2163
2164 >>> import weakref
2165 >>> class Object:
2166 ... pass
2167 ...
2168 >>> o = Object()
2169 >>> r = weakref.ref(o)
2170 >>> o2 = r()
2171 >>> o is o2
2172 True
2173 >>> del o, o2
2174 >>> gc_collect() # For PyPy or other GCs.
2175 >>> print(r())
2176 None
2177
2178 >>> import weakref
2179 >>> class ExtendedRef(weakref.ref):
2180 ... def __init__(self, ob, callback=None, **annotations):
2181 ... super().__init__(ob, callback)
2182 ... self.__counter = 0
2183 ... for k, v in annotations.items():
2184 ... setattr(self, k, v)
2185 ... def __call__(self):
2186 ... '''Return a pair containing the referent and the number of
2187 ... times the reference has been called.
2188 ... '''
2189 ... ob = super().__call__()
2190 ... if ob is not None:
2191 ... self.__counter += 1
2192 ... ob = (ob, self.__counter)
2193 ... return ob
2194 ...
2195 >>> class A: # not in docs from here, just testing the ExtendedRef
2196 ... pass
2197 ...
2198 >>> a = A()
2199 >>> r = ExtendedRef(a, foo=1, bar="baz")
2200 >>> r.foo
2201 1
2202 >>> r.bar
2203 'baz'
2204 >>> r()[1]
2205 1
2206 >>> r()[1]
2207 2
2208 >>> r()[0] is a
2209 True
2210
2211
2212 >>> import weakref
2213 >>> _id2obj_dict = weakref.WeakValueDictionary()
2214 >>> def remember(obj):
2215 ... oid = id(obj)
2216 ... _id2obj_dict[oid] = obj
2217 ... return oid
2218 ...
2219 >>> def id2obj(oid):
2220 ... return _id2obj_dict[oid]
2221 ...
2222 >>> a = A() # from here, just testing
2223 >>> a_id = remember(a)
2224 >>> id2obj(a_id) is a
2225 True
2226 >>> del a
2227 >>> gc_collect() # For PyPy or other GCs.
2228 >>> try:
2229 ... id2obj(a_id)
2230 ... except KeyError:
2231 ... print('OK')
2232 ... else:
2233 ... print('WeakValueDictionary error')
2234 OK
2235
2236 """
2237
2238 __test__ = {'libreftest' : libreftest}
2239
2240 def load_tests(loader, tests, pattern):
2241 tests.addTest(doctest.DocTestSuite())
2242 return tests
2243
2244
2245 if __name__ == "__main__":
2246 unittest.main()