(root)/
Python-3.11.7/
Lib/
test/
test_weakref.py
       1  import gc
       2  import sys
       3  import doctest
       4  import unittest
       5  import collections
       6  import weakref
       7  import operator
       8  import contextlib
       9  import copy
      10  import threading
      11  import time
      12  import random
      13  
      14  from test import support
      15  from test.support import script_helper, ALWAYS_EQ
      16  from test.support import gc_collect
      17  from test.support import threading_helper
      18  
# Used in ReferencesTestCase.test_ref_created_during_del().
ref_from_del = None

# Used by FinalizeTestCase as a global that may be replaced by None
# when the interpreter shuts down.
_global_var = 'foobar'
      25  
      26  class ESC[4;38;5;81mC:
      27      def method(self):
      28          pass
      29  
      30  
      31  class ESC[4;38;5;81mCallable:
      32      bar = None
      33  
      34      def __call__(self, x):
      35          self.bar = x
      36  
      37  
      38  def create_function():
      39      def f(): pass
      40      return f
      41  
      42  def create_bound_method():
      43      return C().method
      44  
      45  
      46  class ESC[4;38;5;81mObject:
      47      def __init__(self, arg):
      48          self.arg = arg
      49      def __repr__(self):
      50          return "<Object %r>" % self.arg
      51      def __eq__(self, other):
      52          if isinstance(other, Object):
      53              return self.arg == other.arg
      54          return NotImplemented
      55      def __lt__(self, other):
      56          if isinstance(other, Object):
      57              return self.arg < other.arg
      58          return NotImplemented
      59      def __hash__(self):
      60          return hash(self.arg)
      61      def some_method(self):
      62          return 4
      63      def other_method(self):
      64          return 5
      65  
      66  
      67  class ESC[4;38;5;81mRefCycle:
      68      def __init__(self):
      69          self.cycle = self
      70  
      71  
      72  class ESC[4;38;5;81mTestBase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      73  
      74      def setUp(self):
      75          self.cbcalled = 0
      76  
      77      def callback(self, ref):
      78          self.cbcalled += 1
      79  
      80  
      81  @contextlib.contextmanager
      82  def collect_in_thread(period=0.0001):
      83      """
      84      Ensure GC collections happen in a different thread, at a high frequency.
      85      """
      86      please_stop = False
      87  
      88      def collect():
      89          while not please_stop:
      90              time.sleep(period)
      91              gc.collect()
      92  
      93      with support.disable_gc():
      94          t = threading.Thread(target=collect)
      95          t.start()
      96          try:
      97              yield
      98          finally:
      99              please_stop = True
     100              t.join()
     101  
     102  
     103  class ESC[4;38;5;81mReferencesTestCase(ESC[4;38;5;149mTestBase):
     104  
     105      def test_basic_ref(self):
     106          self.check_basic_ref(C)
     107          self.check_basic_ref(create_function)
     108          self.check_basic_ref(create_bound_method)
     109  
     110          # Just make sure the tp_repr handler doesn't raise an exception.
     111          # Live reference:
     112          o = C()
     113          wr = weakref.ref(o)
     114          repr(wr)
     115          # Dead reference:
     116          del o
     117          repr(wr)
     118  
     119      def test_repr_failure_gh99184(self):
     120          class ESC[4;38;5;81mMyConfig(ESC[4;38;5;149mdict):
     121              def __getattr__(self, x):
     122                  return self[x]
     123  
     124          obj = MyConfig(offset=5)
     125          obj_weakref = weakref.ref(obj)
     126  
     127          self.assertIn('MyConfig', repr(obj_weakref))
     128          self.assertIn('MyConfig', str(obj_weakref))
     129  
     130      def test_basic_callback(self):
     131          self.check_basic_callback(C)
     132          self.check_basic_callback(create_function)
     133          self.check_basic_callback(create_bound_method)
     134  
     135      @support.cpython_only
     136      def test_cfunction(self):
     137          import _testcapi
     138          create_cfunction = _testcapi.create_cfunction
     139          f = create_cfunction()
     140          wr = weakref.ref(f)
     141          self.assertIs(wr(), f)
     142          del f
     143          self.assertIsNone(wr())
     144          self.check_basic_ref(create_cfunction)
     145          self.check_basic_callback(create_cfunction)
     146  
     147      def test_multiple_callbacks(self):
     148          o = C()
     149          ref1 = weakref.ref(o, self.callback)
     150          ref2 = weakref.ref(o, self.callback)
     151          del o
     152          gc_collect()  # For PyPy or other GCs.
     153          self.assertIsNone(ref1(), "expected reference to be invalidated")
     154          self.assertIsNone(ref2(), "expected reference to be invalidated")
     155          self.assertEqual(self.cbcalled, 2,
     156                       "callback not called the right number of times")
     157  
     158      def test_multiple_selfref_callbacks(self):
     159          # Make sure all references are invalidated before callbacks are called
     160          #
     161          # What's important here is that we're using the first
     162          # reference in the callback invoked on the second reference
     163          # (the most recently created ref is cleaned up first).  This
     164          # tests that all references to the object are invalidated
     165          # before any of the callbacks are invoked, so that we only
     166          # have one invocation of _weakref.c:cleanup_helper() active
     167          # for a particular object at a time.
     168          #
     169          def callback(object, self=self):
     170              self.ref()
     171          c = C()
     172          self.ref = weakref.ref(c, callback)
     173          ref1 = weakref.ref(c, callback)
     174          del c
     175  
     176      def test_constructor_kwargs(self):
     177          c = C()
     178          self.assertRaises(TypeError, weakref.ref, c, callback=None)
     179  
     180      def test_proxy_ref(self):
     181          o = C()
     182          o.bar = 1
     183          ref1 = weakref.proxy(o, self.callback)
     184          ref2 = weakref.proxy(o, self.callback)
     185          del o
     186          gc_collect()  # For PyPy or other GCs.
     187  
     188          def check(proxy):
     189              proxy.bar
     190  
     191          self.assertRaises(ReferenceError, check, ref1)
     192          self.assertRaises(ReferenceError, check, ref2)
     193          ref3 = weakref.proxy(C())
     194          gc_collect()  # For PyPy or other GCs.
     195          self.assertRaises(ReferenceError, bool, ref3)
     196          self.assertEqual(self.cbcalled, 2)
     197  
     198      def check_basic_ref(self, factory):
     199          o = factory()
     200          ref = weakref.ref(o)
     201          self.assertIsNotNone(ref(),
     202                       "weak reference to live object should be live")
     203          o2 = ref()
     204          self.assertIs(o, o2,
     205                       "<ref>() should return original object if live")
     206  
     207      def check_basic_callback(self, factory):
     208          self.cbcalled = 0
     209          o = factory()
     210          ref = weakref.ref(o, self.callback)
     211          del o
     212          gc_collect()  # For PyPy or other GCs.
     213          self.assertEqual(self.cbcalled, 1,
     214                       "callback did not properly set 'cbcalled'")
     215          self.assertIsNone(ref(),
     216                       "ref2 should be dead after deleting object reference")
     217  
     218      def test_ref_reuse(self):
     219          o = C()
     220          ref1 = weakref.ref(o)
     221          # create a proxy to make sure that there's an intervening creation
     222          # between these two; it should make no difference
     223          proxy = weakref.proxy(o)
     224          ref2 = weakref.ref(o)
     225          self.assertIs(ref1, ref2,
     226                       "reference object w/out callback should be re-used")
     227  
     228          o = C()
     229          proxy = weakref.proxy(o)
     230          ref1 = weakref.ref(o)
     231          ref2 = weakref.ref(o)
     232          self.assertIs(ref1, ref2,
     233                       "reference object w/out callback should be re-used")
     234          self.assertEqual(weakref.getweakrefcount(o), 2,
     235                       "wrong weak ref count for object")
     236          del proxy
     237          gc_collect()  # For PyPy or other GCs.
     238          self.assertEqual(weakref.getweakrefcount(o), 1,
     239                       "wrong weak ref count for object after deleting proxy")
     240  
     241      def test_proxy_reuse(self):
     242          o = C()
     243          proxy1 = weakref.proxy(o)
     244          ref = weakref.ref(o)
     245          proxy2 = weakref.proxy(o)
     246          self.assertIs(proxy1, proxy2,
     247                       "proxy object w/out callback should have been re-used")
     248  
     249      def test_basic_proxy(self):
     250          o = C()
     251          self.check_proxy(o, weakref.proxy(o))
     252  
     253          L = collections.UserList()
     254          p = weakref.proxy(L)
     255          self.assertFalse(p, "proxy for empty UserList should be false")
     256          p.append(12)
     257          self.assertEqual(len(L), 1)
     258          self.assertTrue(p, "proxy for non-empty UserList should be true")
     259          p[:] = [2, 3]
     260          self.assertEqual(len(L), 2)
     261          self.assertEqual(len(p), 2)
     262          self.assertIn(3, p, "proxy didn't support __contains__() properly")
     263          p[1] = 5
     264          self.assertEqual(L[1], 5)
     265          self.assertEqual(p[1], 5)
     266          L2 = collections.UserList(L)
     267          p2 = weakref.proxy(L2)
     268          self.assertEqual(p, p2)
     269          ## self.assertEqual(repr(L2), repr(p2))
     270          L3 = collections.UserList(range(10))
     271          p3 = weakref.proxy(L3)
     272          self.assertEqual(L3[:], p3[:])
     273          self.assertEqual(L3[5:], p3[5:])
     274          self.assertEqual(L3[:5], p3[:5])
     275          self.assertEqual(L3[2:5], p3[2:5])
     276  
     277      def test_proxy_unicode(self):
     278          # See bug 5037
     279          class ESC[4;38;5;81mC(ESC[4;38;5;149mobject):
     280              def __str__(self):
     281                  return "string"
     282              def __bytes__(self):
     283                  return b"bytes"
     284          instance = C()
     285          self.assertIn("__bytes__", dir(weakref.proxy(instance)))
     286          self.assertEqual(bytes(weakref.proxy(instance)), b"bytes")
     287  
     288      def test_proxy_index(self):
     289          class ESC[4;38;5;81mC:
     290              def __index__(self):
     291                  return 10
     292          o = C()
     293          p = weakref.proxy(o)
     294          self.assertEqual(operator.index(p), 10)
     295  
     296      def test_proxy_div(self):
     297          class ESC[4;38;5;81mC:
     298              def __floordiv__(self, other):
     299                  return 42
     300              def __ifloordiv__(self, other):
     301                  return 21
     302          o = C()
     303          p = weakref.proxy(o)
     304          self.assertEqual(p // 5, 42)
     305          p //= 5
     306          self.assertEqual(p, 21)
     307  
     308      def test_proxy_matmul(self):
     309          class ESC[4;38;5;81mC:
     310              def __matmul__(self, other):
     311                  return 1729
     312              def __rmatmul__(self, other):
     313                  return -163
     314              def __imatmul__(self, other):
     315                  return 561
     316          o = C()
     317          p = weakref.proxy(o)
     318          self.assertEqual(p @ 5, 1729)
     319          self.assertEqual(5 @ p, -163)
     320          p @= 5
     321          self.assertEqual(p, 561)
     322  
    # The PyWeakref_* C API is documented as allowing either NULL or
    # None as the value for the callback, where either means "no
    # callback".  The "no callback" ref and proxy objects are supposed
    # to be shared by all callers for as long as they exist.  In
    # Python 2.3.3 and earlier, this guarantee was not honored, and was
    # broken in different ways for PyWeakref_NewRef() and
    # PyWeakref_NewProxy().  (Two tests.)
     330  
     331      def test_shared_ref_without_callback(self):
     332          self.check_shared_without_callback(weakref.ref)
     333  
     334      def test_shared_proxy_without_callback(self):
     335          self.check_shared_without_callback(weakref.proxy)
     336  
     337      def check_shared_without_callback(self, makeref):
     338          o = Object(1)
     339          p1 = makeref(o, None)
     340          p2 = makeref(o, None)
     341          self.assertIs(p1, p2, "both callbacks were None in the C API")
     342          del p1, p2
     343          p1 = makeref(o)
     344          p2 = makeref(o, None)
     345          self.assertIs(p1, p2, "callbacks were NULL, None in the C API")
     346          del p1, p2
     347          p1 = makeref(o)
     348          p2 = makeref(o)
     349          self.assertIs(p1, p2, "both callbacks were NULL in the C API")
     350          del p1, p2
     351          p1 = makeref(o, None)
     352          p2 = makeref(o)
     353          self.assertIs(p1, p2, "callbacks were None, NULL in the C API")
     354  
     355      def test_callable_proxy(self):
     356          o = Callable()
     357          ref1 = weakref.proxy(o)
     358  
     359          self.check_proxy(o, ref1)
     360  
     361          self.assertIs(type(ref1), weakref.CallableProxyType,
     362                       "proxy is not of callable type")
     363          ref1('twinkies!')
     364          self.assertEqual(o.bar, 'twinkies!',
     365                       "call through proxy not passed through to original")
     366          ref1(x='Splat.')
     367          self.assertEqual(o.bar, 'Splat.',
     368                       "call through proxy not passed through to original")
     369  
     370          # expect due to too few args
     371          self.assertRaises(TypeError, ref1)
     372  
     373          # expect due to too many args
     374          self.assertRaises(TypeError, ref1, 1, 2, 3)
     375  
     376      def check_proxy(self, o, proxy):
     377          o.foo = 1
     378          self.assertEqual(proxy.foo, 1,
     379                       "proxy does not reflect attribute addition")
     380          o.foo = 2
     381          self.assertEqual(proxy.foo, 2,
     382                       "proxy does not reflect attribute modification")
     383          del o.foo
     384          self.assertFalse(hasattr(proxy, 'foo'),
     385                       "proxy does not reflect attribute removal")
     386  
     387          proxy.foo = 1
     388          self.assertEqual(o.foo, 1,
     389                       "object does not reflect attribute addition via proxy")
     390          proxy.foo = 2
     391          self.assertEqual(o.foo, 2,
     392              "object does not reflect attribute modification via proxy")
     393          del proxy.foo
     394          self.assertFalse(hasattr(o, 'foo'),
     395                       "object does not reflect attribute removal via proxy")
     396  
     397      def test_proxy_deletion(self):
     398          # Test clearing of SF bug #762891
     399          class ESC[4;38;5;81mFoo:
     400              result = None
     401              def __delitem__(self, accessor):
     402                  self.result = accessor
     403          g = Foo()
     404          f = weakref.proxy(g)
     405          del f[0]
     406          self.assertEqual(f.result, 0)
     407  
     408      def test_proxy_bool(self):
     409          # Test clearing of SF bug #1170766
     410          class ESC[4;38;5;81mList(ESC[4;38;5;149mlist): pass
     411          lyst = List()
     412          self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))
     413  
    def test_proxy_iter(self):
        # Test fails with a debug build of the interpreter
        # (see bpo-38395).

        # Bound before the class body so that __iter__ can declare the
        # name nonlocal.
        obj = None

        class MyObj:
            def __iter__(self):
                # Unbind the test's own name for the instance while the
                # call made through the proxy is still running.
                nonlocal obj
                del obj
                return NotImplemented

        obj = MyObj()
        p = weakref.proxy(obj)
        with self.assertRaises(TypeError):
            # "blech" in p calls MyObj.__iter__ through the proxy,
            # without keeping a reference to the real object, so it
            # can be killed in the middle of the call
            "blech" in p
     433  
     434      def test_proxy_next(self):
     435          arr = [4, 5, 6]
     436          def iterator_func():
     437              yield from arr
     438          it = iterator_func()
     439  
     440          class ESC[4;38;5;81mIteratesWeakly:
     441              def __iter__(self):
     442                  return weakref.proxy(it)
     443  
     444          weak_it = IteratesWeakly()
     445  
     446          # Calls proxy.__next__
     447          self.assertEqual(list(weak_it), [4, 5, 6])
     448  
     449      def test_proxy_bad_next(self):
     450          # bpo-44720: PyIter_Next() shouldn't be called if the reference
     451          # isn't an iterator.
     452  
     453          not_an_iterator = lambda: 0
     454  
     455          class ESC[4;38;5;81mA:
     456              def __iter__(self):
     457                  return weakref.proxy(not_an_iterator)
     458          a = A()
     459  
     460          msg = "Weakref proxy referenced a non-iterator"
     461          with self.assertRaisesRegex(TypeError, msg):
     462              list(a)
     463  
     464      def test_proxy_reversed(self):
     465          class ESC[4;38;5;81mMyObj:
     466              def __len__(self):
     467                  return 3
     468              def __reversed__(self):
     469                  return iter('cba')
     470  
     471          obj = MyObj()
     472          self.assertEqual("".join(reversed(weakref.proxy(obj))), "cba")
     473  
     474      def test_proxy_hash(self):
     475          class ESC[4;38;5;81mMyObj:
     476              def __hash__(self):
     477                  return 42
     478  
     479          obj = MyObj()
     480          with self.assertRaises(TypeError):
     481              hash(weakref.proxy(obj))
     482  
     483          class ESC[4;38;5;81mMyObj:
     484              __hash__ = None
     485  
     486          obj = MyObj()
     487          with self.assertRaises(TypeError):
     488              hash(weakref.proxy(obj))
     489  
     490      def test_getweakrefcount(self):
     491          o = C()
     492          ref1 = weakref.ref(o)
     493          ref2 = weakref.ref(o, self.callback)
     494          self.assertEqual(weakref.getweakrefcount(o), 2,
     495                       "got wrong number of weak reference objects")
     496  
     497          proxy1 = weakref.proxy(o)
     498          proxy2 = weakref.proxy(o, self.callback)
     499          self.assertEqual(weakref.getweakrefcount(o), 4,
     500                       "got wrong number of weak reference objects")
     501  
     502          del ref1, ref2, proxy1, proxy2
     503          gc_collect()  # For PyPy or other GCs.
     504          self.assertEqual(weakref.getweakrefcount(o), 0,
     505                       "weak reference objects not unlinked from"
     506                       " referent when discarded.")
     507  
     508          # assumes ints do not support weakrefs
     509          self.assertEqual(weakref.getweakrefcount(1), 0,
     510                       "got wrong number of weak reference objects for int")
     511  
     512      def test_getweakrefs(self):
     513          o = C()
     514          ref1 = weakref.ref(o, self.callback)
     515          ref2 = weakref.ref(o, self.callback)
     516          del ref1
     517          gc_collect()  # For PyPy or other GCs.
     518          self.assertEqual(weakref.getweakrefs(o), [ref2],
     519                       "list of refs does not match")
     520  
     521          o = C()
     522          ref1 = weakref.ref(o, self.callback)
     523          ref2 = weakref.ref(o, self.callback)
     524          del ref2
     525          gc_collect()  # For PyPy or other GCs.
     526          self.assertEqual(weakref.getweakrefs(o), [ref1],
     527                       "list of refs does not match")
     528  
     529          del ref1
     530          gc_collect()  # For PyPy or other GCs.
     531          self.assertEqual(weakref.getweakrefs(o), [],
     532                       "list of refs not cleared")
     533  
     534          # assumes ints do not support weakrefs
     535          self.assertEqual(weakref.getweakrefs(1), [],
     536                       "list of refs does not match for int")
     537  
     538      def test_newstyle_number_ops(self):
     539          class ESC[4;38;5;81mF(ESC[4;38;5;149mfloat):
     540              pass
     541          f = F(2.0)
     542          p = weakref.proxy(f)
     543          self.assertEqual(p + 1.0, 3.0)
     544          self.assertEqual(1.0 + p, 3.0)  # this used to SEGV
     545  
     546      def test_callbacks_protected(self):
     547          # Callbacks protected from already-set exceptions?
     548          # Regression test for SF bug #478534.
     549          class ESC[4;38;5;81mBogusError(ESC[4;38;5;149mException):
     550              pass
     551          data = {}
     552          def remove(k):
     553              del data[k]
     554          def encapsulate():
     555              f = lambda : ()
     556              data[weakref.ref(f, remove)] = None
     557              raise BogusError
     558          try:
     559              encapsulate()
     560          except BogusError:
     561              pass
     562          else:
     563              self.fail("exception not properly restored")
     564          try:
     565              encapsulate()
     566          except BogusError:
     567              pass
     568          else:
     569              self.fail("exception not properly restored")
     570  
    def test_sf_bug_840829(self):
        # "weakref callbacks and gc corrupt memory"
        # subtype_dealloc erroneously exposed a new-style instance
        # already in the process of getting deallocated to gc,
        # causing double-deallocation if the instance had a weakref
        # callback that triggered gc.
        # If the bug exists, there probably won't be an obvious symptom
        # in a release build.  In a debug build, a segfault will occur
        # when the second attempt to remove the instance from the "list
        # of all objects" occurs.
        #
        # There are no assertions: the test passes if nothing crashes.

        import gc

        class C(object):
            pass

        c = C()
        # The callback runs a full collection when it is invoked.
        wr = weakref.ref(c, lambda ignore: gc.collect())
        del c

        # There endeth the first part.  It gets worse.
        del wr

        c1 = C()
        c1.i = C()
        # This time the weakref (and its gc-triggering callback) is on an
        # object owned by c1, not on c1 itself.
        wr = weakref.ref(c1.i, lambda ignore: gc.collect())

        c2 = C()
        c2.c1 = c1
        del c1  # still alive because c2 points to it

        # Now when subtype_dealloc gets called on c2, it's not enough just
        # that c2 is immune from gc while the weakref callbacks associated
        # with c2 execute (there are none in this 2nd half of the test, btw).
        # subtype_dealloc goes on to call the base classes' deallocs too,
        # so any gc triggered by weakref callbacks associated with anything
        # torn down by a base class dealloc can also trigger double
        # deallocation of c2.
        del c2
     610  
    def test_callback_in_cycle(self):
        # Crash regression test: a weakref callback reachable only from
        # cyclic trash touches an attribute while gc tears the cycle down
        # (full story below).  There are no assertions; the test passes if
        # gc.collect() returns.
        import gc

        class J(object):
            pass

        class II(object):
            def acallback(self, ignore):
                self.J

        I = II()
        I.J = J
        I.wr = weakref.ref(J, I.acallback)

        # Now J and II are each in a self-cycle (as all new-style class
        # objects are, since their __mro__ points back to them).  I holds
        # both a weak reference (I.wr) and a strong reference (I.J) to class
        # J.  I is also in a cycle (I.wr points to a weakref that references
        # I.acallback).  When we del these three, they all become trash, but
        # the cycles prevent any of them from getting cleaned up immediately.
        # Instead they have to wait for cyclic gc to deduce that they're
        # trash.
        #
        # gc used to call tp_clear on all of them, and the order in which
        # it does that is pretty accidental.  The exact order in which we
        # built up these things manages to provoke gc into running tp_clear
        # in just the right order (I last).  Calling tp_clear on II leaves
        # behind an insane class object (its __mro__ becomes NULL).  Calling
        # tp_clear on J breaks its self-cycle, but J doesn't get deleted
        # just then because of the strong reference from I.J.  Calling
        # tp_clear on I starts to clear I's __dict__, and just happens to
        # clear I.J first -- I.wr is still intact.  That removes the last
        # reference to J, which triggers the weakref callback.  The callback
        # tries to do "self.J", and instances of new-style classes look up
        # attributes ("J") in the class dict first.  The class (II) wants to
        # search II.__mro__, but that's NULL.   The result was a segfault in
        # a release build, and an assert failure in a debug build.
        del I, J, II
        gc.collect()
     650  
    def test_callback_reachable_one_way(self):
        import gc

        # This one broke the first patch that fixed the previous test. In this case,
        # the objects reachable from the callback aren't also reachable
        # from the object (c1) *triggering* the callback:  you can get to
        # c1 from c2, but not vice-versa.  The result was that c2's __dict__
        # got tp_clear'ed by the time the c2.cb callback got invoked.
        #
        # There are no assertions: the test passes if gc.collect() returns
        # without crashing.

        class C:
            def cb(self, ignore):
                # Read every attribute stored on the instance below.
                self.me
                self.c1
                self.wr

        c1, c2 = C(), C()

        # c2 refers to itself, to c1, and to a weakref on c1 whose callback
        # is c2's own bound method; c1 refers to nothing.
        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        del c1, c2
        gc.collect()
     674  
    def test_callback_different_classes(self):
        import gc

        # Like test_callback_reachable_one_way, except c2 and c1 have different
        # classes.  c2's class (C) isn't reachable from c1 then, so protecting
        # objects reachable from the dying object (c1) isn't enough to stop
        # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
        # The result was a segfault (C.__mro__ was NULL when the callback
        # tried to look up self.me).
        #
        # There are no assertions: the test passes if gc.collect() returns
        # without crashing.

        class C(object):
            def cb(self, ignore):
                # Read every attribute stored on the instance below.
                self.me
                self.c1
                self.wr

        class D:
            pass

        c1, c2 = D(), C()

        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        # The classes are deleted too, so they become trash along with
        # their instances.
        del c1, c2, C, D
        gc.collect()
     702  
     703      def test_callback_in_cycle_resurrection(self):
     704          import gc
     705  
     706          # Do something nasty in a weakref callback:  resurrect objects
     707          # from dead cycles.  For this to be attempted, the weakref and
     708          # its callback must also be part of the cyclic trash (else the
     709          # objects reachable via the callback couldn't be in cyclic trash
     710          # to begin with -- the callback would act like an external root).
     711          # But gc clears trash weakrefs with callbacks early now, which
     712          # disables the callbacks, so the callbacks shouldn't get called
     713          # at all (and so nothing actually gets resurrected).
     714  
     715          alist = []
     716          class ESC[4;38;5;81mC(ESC[4;38;5;149mobject):
     717              def __init__(self, value):
     718                  self.attribute = value
     719  
     720              def acallback(self, ignore):
     721                  alist.append(self.c)
     722  
     723          c1, c2 = C(1), C(2)
     724          c1.c = c2
     725          c2.c = c1
     726          c1.wr = weakref.ref(c2, c1.acallback)
     727          c2.wr = weakref.ref(c1, c2.acallback)
     728  
     729          def C_went_away(ignore):
     730              alist.append("C went away")
     731          wr = weakref.ref(C, C_went_away)
     732  
     733          del c1, c2, C   # make them all trash
     734          self.assertEqual(alist, [])  # del isn't enough to reclaim anything
     735  
     736          gc.collect()
     737          # c1.wr and c2.wr were part of the cyclic trash, so should have
     738          # been cleared without their callbacks executing.  OTOH, the weakref
     739          # to C is bound to a function local (wr), and wasn't trash, so that
     740          # callback should have been invoked when C went away.
     741          self.assertEqual(alist, ["C went away"])
     742          # The remaining weakref should be dead now (its callback ran).
     743          self.assertEqual(wr(), None)
     744  
     745          del alist[:]
     746          gc.collect()
     747          self.assertEqual(alist, [])
     748  
     749      def test_callbacks_on_callback(self):
     750          import gc
     751  
     752          # Set up weakref callbacks *on* weakref callbacks.
     753          alist = []
     754          def safe_callback(ignore):
     755              alist.append("safe_callback called")
     756  
     757          class ESC[4;38;5;81mC(ESC[4;38;5;149mobject):
     758              def cb(self, ignore):
     759                  alist.append("cb called")
     760  
     761          c, d = C(), C()
     762          c.other = d
     763          d.other = c
     764          callback = c.cb
     765          c.wr = weakref.ref(d, callback)     # this won't trigger
     766          d.wr = weakref.ref(callback, d.cb)  # ditto
     767          external_wr = weakref.ref(callback, safe_callback)  # but this will
     768          self.assertIs(external_wr(), callback)
     769  
     770          # The weakrefs attached to c and d should get cleared, so that
     771          # C.cb is never called.  But external_wr isn't part of the cyclic
     772          # trash, and no cyclic trash is reachable from it, so safe_callback
     773          # should get invoked when the bound method object callback (c.cb)
     774          # -- which is itself a callback, and also part of the cyclic trash --
     775          # gets reclaimed at the end of gc.
     776  
     777          del callback, c, d, C
     778          self.assertEqual(alist, [])  # del isn't enough to clean up cycles
     779          gc.collect()
     780          self.assertEqual(alist, ["safe_callback called"])
     781          self.assertEqual(external_wr(), None)
     782  
     783          del alist[:]
     784          gc.collect()
     785          self.assertEqual(alist, [])
     786  
     787      def test_gc_during_ref_creation(self):
     788          self.check_gc_during_creation(weakref.ref)
     789  
     790      def test_gc_during_proxy_creation(self):
     791          self.check_gc_during_creation(weakref.proxy)
     792  
     def check_gc_during_creation(self, makeref):
         """Create weak references while the gc thresholds are (1, 1, 1).

         *makeref* is called with one argument, the object to reference
         (``weakref.ref`` and ``weakref.proxy`` are used by the callers).

         The gc thresholds are process-wide state, so everything that runs
         after they are lowered is placed under ``try/finally``: the saved
         thresholds are restored even when *makeref* raises.
         """
         thresholds = gc.get_threshold()
         gc.set_threshold(1, 1, 1)
         try:
             gc.collect()

             class A:
                 pass

             def callback(*args):
                 pass

             referenced = A()

             a = A()
             a.a = a
             a.wr = makeref(referenced)

             # now make sure the object and the ref get labeled as
             # cyclic trash: rebinding `a` drops the only external
             # reference to the self-referencing instance created above.
             a = A()
             weakref.ref(referenced, callback)
         finally:
             gc.set_threshold(*thresholds)
     817  
     818      def test_ref_created_during_del(self):
     819          # Bug #1377858
     820          # A weakref created in an object's __del__() would crash the
     821          # interpreter when the weakref was cleaned up since it would refer to
     822          # non-existent memory.  This test should not segfault the interpreter.
     823          class ESC[4;38;5;81mTarget(ESC[4;38;5;149mobject):
     824              def __del__(self):
     825                  global ref_from_del
     826                  ref_from_del = weakref.ref(self)
     827  
     828          w = Target()
     829  
     830      def test_init(self):
     831          # Issue 3634
     832          # <weakref to class>.__init__() doesn't check errors correctly
     833          r = weakref.ref(Exception)
     834          self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0)
     835          # No exception should be raised here
     836          gc.collect()
     837  
     838      def test_classes(self):
     839          # Check that classes are weakrefable.
     840          class ESC[4;38;5;81mA(ESC[4;38;5;149mobject):
     841              pass
     842          l = []
     843          weakref.ref(int)
     844          a = weakref.ref(A, l.append)
     845          A = None
     846          gc.collect()
     847          self.assertEqual(a(), None)
     848          self.assertEqual(l, [a])
     849  
     def test_equality(self):
         """Check ``==`` / ``!=`` on weakrefs, alive and dead.

         Alive weakrefs defer equality testing to their underlying object.
         The operators are applied directly (not via assertEqual) to stress
         both __eq__ and __ne__.
         """
         def assert_same(lhs, rhs):
             self.assertTrue(lhs == rhs)
             self.assertFalse(lhs != rhs)

         def assert_different(lhs, rhs):
             self.assertFalse(lhs == rhs)
             self.assertTrue(lhs != rhs)

         x = Object(1)
         y = Object(1)
         z = Object(2)
         ref_x = weakref.ref(x)
         ref_y = weakref.ref(y)
         ref_z = weakref.ref(z)
         ref_x_again = weakref.ref(x)

         assert_same(ref_x, ref_y)
         assert_different(ref_x, ref_z)
         assert_same(ref_x, ref_x_again)
         assert_different(ref_x, x)
         assert_same(ref_x, ALWAYS_EQ)

         del x, y, z
         gc.collect()
         for dead in ref_x, ref_y, ref_z:
             # Sanity check
             self.assertIs(dead(), None)

         # Dead weakrefs compare by identity: whether `ref_x` and
         # `ref_x_again` are the same weakref object is an implementation
         # detail, since they pointed to the same original object and didn't
         # have a callback.
         # (see issue #16453).
         assert_different(ref_x, ref_y)
         assert_different(ref_x, ref_z)
         self.assertEqual(ref_x == ref_x_again, ref_x is ref_x_again)
         self.assertEqual(ref_x != ref_x_again, ref_x is not ref_x_again)
     886  
     def test_ordering(self):
         """weakrefs cannot be ordered, even if the underlying objects can."""
         comparisons = (operator.lt, operator.gt, operator.le, operator.ge)

         def assert_unorderable(lhs, rhs):
             for compare in comparisons:
                 with self.assertRaises(TypeError):
                     compare(lhs, rhs)

         x = Object(1)
         y = Object(1)
         ref_x = weakref.ref(x)
         ref_y = weakref.ref(y)
         assert_unorderable(ref_x, ref_y)
         # Same when dead.
         del x, y
         gc.collect()
         assert_unorderable(ref_x, ref_y)
     901  
     def test_hashing(self):
         """Check hashing of weakrefs before and after the referent dies."""
         expected = hash(42)
         x = Object(42)
         y = Object(42)
         hashed_while_alive = weakref.ref(x)
         never_hashed = weakref.ref(y)
         # Alive weakrefs hash the same as the underlying object
         self.assertEqual(hash(hashed_while_alive), expected)
         del x, y
         gc.collect()
         # Dead weakrefs:
         # - retain their hash if they were hashed when alive;
         # - otherwise, cannot be hashed.
         self.assertEqual(hash(hashed_while_alive), expected)
         with self.assertRaises(TypeError):
             hash(never_hashed)
     916  
     917      def test_trashcan_16602(self):
     918          # Issue #16602: when a weakref's target was part of a long
     919          # deallocation chain, the trashcan mechanism could delay clearing
     920          # of the weakref and make the target object visible from outside
     921          # code even though its refcount had dropped to 0.  A crash ensued.
     922          class ESC[4;38;5;81mC:
     923              def __init__(self, parent):
     924                  if not parent:
     925                      return
     926                  wself = weakref.ref(self)
     927                  def cb(wparent):
     928                      o = wself()
     929                  self.wparent = weakref.ref(parent, cb)
     930  
     931          d = weakref.WeakKeyDictionary()
     932          root = c = C(None)
     933          for n in range(100):
     934              d[c] = c = C(c)
     935          del root
     936          gc.collect()
     937  
     def test_callback_attribute(self):
         """``__callback__`` is the registered callback, or None without one."""
         def callback(ref):
             return None

         x = Object(1)
         with_callback = weakref.ref(x, callback)
         self.assertIs(with_callback.__callback__, callback)

         without_callback = weakref.ref(x)
         self.assertIsNone(without_callback.__callback__)
     946  
     def test_callback_attribute_after_deletion(self):
         """``__callback__`` reads as None once the referent has been collected."""
         target = Object(1)
         wr = weakref.ref(target, self.callback)
         self.assertIsNotNone(wr.__callback__)
         del target
         support.gc_collect()
         self.assertIsNone(wr.__callback__)
     954  
     def test_set_callback_attribute(self):
         """Assigning to ``__callback__`` raises AttributeError."""
         def original(ref):
             return None

         def replacement(ref):
             return None

         x = Object(1)
         wr = weakref.ref(x, original)
         self.assertRaises(AttributeError,
                           setattr, wr, "__callback__", replacement)
     961  
     def test_callback_gcs(self):
         """Run a garbage collection from inside a weakref callback."""
         class ObjectWithDel(Object):
             def __del__(self):
                 pass

         def collect_on_death(ref):
             return support.gc_collect()

         x = ObjectWithDel(1)
         ref1 = weakref.ref(x, collect_on_death)
         del x
         support.gc_collect()
     969  
     970  
     class SubclassableWeakrefTestCase(TestBase):
         """Tests for user-defined subclasses of ``weakref.ref``."""

         def test_subclass_refs(self):
             """A subclass may add attributes and override ``__call__``."""
             class TrackingRef(weakref.ref):
                 def __init__(self, ob, callback=None, value=42):
                     self.value = value
                     super().__init__(ob, callback)

                 def __call__(self):
                     self.called = True
                     return super().__call__()

             target = Object("foo")
             tracked = TrackingRef(target, value=24)
             self.assertIs(tracked(), target)
             self.assertTrue(tracked.called)
             self.assertEqual(tracked.value, 24)
             del target
             gc_collect()  # For PyPy or other GCs.
             self.assertIsNone(tracked())
             self.assertTrue(tracked.called)

         def test_subclass_refs_dont_replace_standard_refs(self):
             """Subclass refs and plain refs to one object stay distinct."""
             class MyRef(weakref.ref):
                 pass

             target = Object(42)
             sub_ref = MyRef(target)
             plain_ref = weakref.ref(target)
             self.assertIsNot(sub_ref, plain_ref)
             self.assertEqual(weakref.getweakrefs(target),
                              [plain_ref, sub_ref])
             self.assertEqual(weakref.getweakrefcount(target), 2)
             second_sub_ref = MyRef(target)
             self.assertEqual(weakref.getweakrefcount(target), 3)
             refs = weakref.getweakrefs(target)
             self.assertEqual(len(refs), 3)
             # The plain ref is listed first; the subclass refs follow it.
             self.assertIs(plain_ref, refs[0])
             after_first = refs[1:]
             self.assertIn(sub_ref, after_first)
             self.assertIn(second_sub_ref, after_first)

         def test_subclass_refs_dont_conflate_callbacks(self):
             """Subclass refs with different callbacks are separate objects."""
             class MyRef(weakref.ref):
                 pass

             target = Object(42)
             with_id = MyRef(target, id)
             with_str = MyRef(target, str)
             self.assertIsNot(with_id, with_str)
             refs = weakref.getweakrefs(target)
             self.assertIn(with_id, refs)
             self.assertIn(with_str, refs)

         def test_subclass_refs_with_slots(self):
             """A subclass using ``__slots__`` gets no instance ``__dict__``."""
             class SlottedRef(weakref.ref):
                 __slots__ = "slot1", "slot2"

                 def __new__(cls, ob, callback, slot1, slot2):
                     return weakref.ref.__new__(cls, ob, callback)

                 def __init__(self, ob, callback, slot1, slot2):
                     self.slot1 = slot1
                     self.slot2 = slot2

                 def meth(self):
                     return self.slot1 + self.slot2

             target = Object(42)
             slotted = SlottedRef(target, None, "abc", "def")
             self.assertEqual(slotted.slot1, "abc")
             self.assertEqual(slotted.slot2, "def")
             self.assertEqual(slotted.meth(), "abcdef")
             self.assertFalse(hasattr(slotted, "__dict__"))

         def test_subclass_refs_with_cycle(self):
             """Confirm https://bugs.python.org/issue3100 is fixed."""
             # An instance of a weakref subclass can have attributes.
             # If such a weakref holds the only strong reference to the
             # object, deleting the weakref will delete the object.  In this
             # case, the callback must not be called, because the ref object
             # is being deleted.
             class MyRef(weakref.ref):
                 pass

             # Use a local callback, for "regrtest -R::"
             # to detect refcounting problems
             def callback(w):
                 self.cbcalled += 1

             target = C()
             outer = MyRef(target, callback)
             outer.o = target
             del target

             del outer  # Used to crash here

             self.assertEqual(self.cbcalled, 0)

             # Same test, with two weakrefs to the same object
             # (since code paths are different)
             target = C()
             outer = MyRef(target, callback)
             inner = MyRef(target, callback)
             outer.r = inner
             inner.o = target
             del target
             del inner

             del outer  # Used to crash here

             self.assertEqual(self.cbcalled, 0)
    1073  
    1074  
    class WeakMethodTestCase(unittest.TestCase):
        """Tests for ``weakref.WeakMethod``."""

        def _subclass(self):
            """Return an Object subclass overriding `some_method`."""
            class C(Object):
                def some_method(self):
                    return 6

            return C

        def test_alive(self):
            """A live WeakMethod yields a bound method equivalent to the original."""
            target = Object(1)
            wm = weakref.WeakMethod(target.some_method)
            self.assertIsInstance(wm, weakref.ReferenceType)
            self.assertIsInstance(wm(), type(target.some_method))
            self.assertIs(wm().__self__, target)
            self.assertIs(wm().__func__, target.some_method.__func__)
            self.assertEqual(wm()(), 4)

        def test_object_dead(self):
            """The WeakMethod returns None once the object is gone."""
            target = Object(1)
            wm = weakref.WeakMethod(target.some_method)
            del target
            gc.collect()
            self.assertIs(wm(), None)

        def test_method_dead(self):
            """The WeakMethod returns None once the function is deleted."""
            C = self._subclass()
            target = C(1)
            wm = weakref.WeakMethod(target.some_method)
            del C.some_method
            gc.collect()
            self.assertIs(wm(), None)

        def test_callback_when_object_dead(self):
            """Test callback behaviour when object dies first."""
            C = self._subclass()
            calls = []
            target = C(1)
            wm = weakref.WeakMethod(target.some_method, calls.append)
            del target
            gc.collect()
            self.assertEqual(calls, [wm])
            # Callback is only called once.
            C.some_method = Object.some_method
            gc.collect()
            self.assertEqual(calls, [wm])

        def test_callback_when_method_dead(self):
            """Test callback behaviour when method dies first."""
            C = self._subclass()
            calls = []
            target = C(1)
            wm = weakref.WeakMethod(target.some_method, calls.append)
            del C.some_method
            gc.collect()
            self.assertEqual(calls, [wm])
            # Callback is only called once.
            del target
            gc.collect()
            self.assertEqual(calls, [wm])

        @support.cpython_only
        def test_no_cycles(self):
            """A WeakMethod doesn't create any reference cycle to itself."""
            def ignore(_):
                pass

            target = Object(1)
            wm = weakref.WeakMethod(target.some_method, ignore)
            watcher = weakref.ref(wm)
            # No gc run here: the WeakMethod must go away by refcount alone.
            del wm
            self.assertIs(watcher(), None)

        def test_equality(self):
            """Check ``==`` / ``!=`` between WeakMethods, alive and dead."""
            def assert_eq(lhs, rhs):
                self.assertTrue(lhs == rhs)
                self.assertFalse(lhs != rhs)

            def assert_ne(lhs, rhs):
                self.assertTrue(lhs != rhs)
                self.assertFalse(lhs == rhs)

            x = Object(1)
            y = Object(1)
            a = weakref.WeakMethod(x.some_method)
            b = weakref.WeakMethod(y.some_method)
            c = weakref.WeakMethod(x.other_method)
            d = weakref.WeakMethod(y.other_method)
            # Objects equal, same method
            assert_eq(a, b)
            assert_eq(c, d)
            # Objects equal, different method
            for lhs, rhs in ((a, c), (a, d), (b, c), (b, d)):
                assert_ne(lhs, rhs)
            # Objects unequal, same or different method
            z = Object(2)
            e = weakref.WeakMethod(z.some_method)
            f = weakref.WeakMethod(z.other_method)
            for lhs, rhs in ((a, e), (a, f), (b, e), (b, f)):
                assert_ne(lhs, rhs)
            # Compare with different types.  The bound method is passed
            # directly so that no local keeps `x` alive past the `del` below.
            assert_ne(a, x.some_method)
            assert_eq(a, ALWAYS_EQ)
            del x, y, z
            gc.collect()
            # Dead WeakMethods compare by identity
            dead = a, b, c, d, e, f
            for left in dead:
                for right in dead:
                    self.assertEqual(left == right, left is right)
                    self.assertEqual(left != right, left is not right)

        def test_hashing(self):
            """Check hashing of WeakMethods before and after the objects die."""
            # Alive WeakMethods are hashable if the underlying object is
            # hashable.
            x = Object(1)
            y = Object(1)
            a = weakref.WeakMethod(x.some_method)
            b = weakref.WeakMethod(y.some_method)
            never_hashed = weakref.WeakMethod(y.other_method)
            # Since WeakMethod objects are equal, the hashes should be equal.
            self.assertEqual(hash(a), hash(b))
            alive_hash = hash(a)
            # Dead WeakMethods retain their old hash value
            del x, y
            gc.collect()
            self.assertEqual(hash(a), alive_hash)
            self.assertEqual(hash(b), alive_hash)
            # If it wasn't hashed when alive, a dead WeakMethod cannot be
            # hashed.
            with self.assertRaises(TypeError):
                hash(never_hashed)
    1210  
    1211  
    1212  class ESC[4;38;5;81mMappingTestCase(ESC[4;38;5;149mTestBase):
    1213  
    1214      COUNT = 10
    1215  
    def check_len_cycles(self, dict_type, cons):
        """Check len() of a weak dict whose entries die through cyclic gc.

        *dict_type* is the weak dictionary class; *cons* maps one object to
        the (key, value) pair to insert for it.
        """
        count = 20
        cycles = [RefCycle() for _ in range(count)]
        dct = dict_type(cons(cycle) for cycle in cycles)
        # Keep an iterator alive
        it = dct.items()
        next(it, None)
        del cycles
        gc.collect()
        size_with_iterator = len(dct)
        del it
        gc.collect()
        size_without_iterator = len(dct)
        # one item may be kept alive inside the iterator
        self.assertIn(size_with_iterator, (0, 1))
        self.assertEqual(size_without_iterator, 0)
    1235  
    1236      def test_weak_keyed_len_cycles(self):
    1237          self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1))
    1238  
    1239      def test_weak_valued_len_cycles(self):
    1240          self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k))
    1241  
    def check_len_race(self, dict_type, cons):
        """Extended sanity checks for len() in the face of cyclic collection.

        *dict_type* is the weak dictionary class; *cons* maps one object to
        the (key, value) pair to insert for it.  The gc thresholds in force
        on entry are restored by a registered cleanup.
        """
        self.addCleanup(gc.set_threshold, *gc.get_threshold())
        count = 20
        for threshold in range(1, 100):
            gc.collect(0)
            gc.set_threshold(threshold, threshold, threshold)
            cycles = [RefCycle() for _ in range(count)]
            dct = dict_type(cons(cycle) for cycle in cycles)
            del cycles
            # All items will be collected at next garbage collection pass
            it = dct.items()
            try:
                next(it)
            except StopIteration:
                pass
            size_with_iterator = len(dct)
            del it
            size_without_iterator = len(dct)
            self.assertGreaterEqual(size_with_iterator, 0)
            self.assertLessEqual(size_with_iterator, count)
            self.assertGreaterEqual(size_without_iterator, 0)
            self.assertLessEqual(size_without_iterator, size_with_iterator)
    1265  
    1266      def test_weak_keyed_len_race(self):
    1267          self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1))
    1268  
    1269      def test_weak_valued_len_race(self):
    1270          self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k))
    1271  
    def test_weak_values(self):
        """Exercise d.copy(), d.items(), d[], del d[], len(d) on a
        weak-valued dictionary."""
        wvd, objects = self.make_weak_valued_dict()
        for obj in objects:
            self.assertEqual(weakref.getweakrefcount(obj), 1)
            self.assertIs(obj, wvd[obj.arg],
                          "wrong object returned by weak dict!")
        own_items = sorted(wvd.items())
        cloned_items = sorted(wvd.copy().items())
        self.assertEqual(own_items, cloned_items,
                         "cloning of weak-valued dictionary did not work!")
        # The item lists hold strong references to the values; drop them
        # before checking how the dictionary shrinks.
        del own_items, cloned_items
        self.assertEqual(len(wvd), self.COUNT)
        del objects[0]
        gc_collect()  # For PyPy or other GCs.
        self.assertEqual(len(wvd), self.COUNT - 1,
                         "deleting object did not cause dictionary update")
        del objects, obj
        gc_collect()  # For PyPy or other GCs.
        self.assertEqual(len(wvd), 0,
                         "deleting the values did not clear the dictionary")
        # regression on SF bug #447152:
        wvd = weakref.WeakValueDictionary()
        self.assertRaises(KeyError, wvd.__getitem__, 1)
        wvd[2] = C()
        gc_collect()  # For PyPy or other GCs.
        self.assertRaises(KeyError, wvd.__getitem__, 2)
    1303  
    def test_weak_keys(self):
        """Exercise d.copy(), d.items(), d[] = v, d[], del d[], len(d) and
        k in d on a weak-keyed dictionary."""
        wkd, objects = self.make_weak_keyed_dict()
        for obj in objects:
            self.assertEqual(weakref.getweakrefcount(obj), 1,
                             "wrong number of weak references to %r!" % obj)
            self.assertIs(obj.arg, wkd[obj],
                          "wrong object returned by weak dict!")
        own_items = wkd.items()
        cloned_items = wkd.copy().items()
        self.assertEqual(set(own_items), set(cloned_items),
                         "cloning of weak-keyed dictionary did not work!")
        del own_items, cloned_items
        self.assertEqual(len(wkd), self.COUNT)
        del objects[0]
        gc_collect()  # For PyPy or other GCs.
        self.assertEqual(len(wkd), (self.COUNT - 1),
                         "deleting object did not cause dictionary update")
        del objects, obj
        gc_collect()  # For PyPy or other GCs.
        self.assertEqual(len(wkd), 0,
                         "deleting the keys did not clear the dictionary")
        fresh_key = Object(42)
        wkd[fresh_key] = "What is the meaning of the universe?"
        self.assertIn(fresh_key, wkd)
        self.assertNotIn(34, wkd)
    1333  
    1334      def test_weak_keyed_iters(self):
    1335          dict, objects = self.make_weak_keyed_dict()
    1336          self.check_iters(dict)
    1337  
    1338          # Test keyrefs()
    1339          refs = dict.keyrefs()
    1340          self.assertEqual(len(refs), len(objects))
    1341          objects2 = list(objects)
    1342          for wr in refs:
    1343              ob = wr()
    1344              self.assertIn(ob, dict)
    1345              self.assertIn(ob, dict)
    1346              self.assertEqual(ob.arg, dict[ob])
    1347              objects2.remove(ob)
    1348          self.assertEqual(len(objects2), 0)
    1349  
    1350          # Test iterkeyrefs()
    1351          objects2 = list(objects)
    1352          self.assertEqual(len(list(dict.keyrefs())), len(objects))
    1353          for wr in dict.keyrefs():
    1354              ob = wr()
    1355              self.assertIn(ob, dict)
    1356              self.assertIn(ob, dict)
    1357              self.assertEqual(ob.arg, dict[ob])
    1358              objects2.remove(ob)
    1359          self.assertEqual(len(objects2), 0)
    1360  
    def test_weak_valued_iters(self):
        """Exercise the iteration APIs of a weak-valued dictionary.

        Runs the generic iterator checks, then verifies the weak references
        returned by ``valuerefs()`` and by ``itervaluerefs()``.
        """
        wvd, objects = self.make_weak_valued_dict()
        self.check_iters(wvd)

        def check_valuerefs(refs):
            # Each weakref must resolve to the value stored under its own
            # `arg`, and together they must cover `objects`.
            unseen = list(objects)
            for wr in refs:
                ob = wr()
                self.assertEqual(ob, wvd[ob.arg])
                self.assertEqual(ob.arg, wvd[ob.arg].arg)
                unseen.remove(ob)
            self.assertEqual(len(unseen), 0)

        # Test valuerefs()
        refs = wvd.valuerefs()
        self.assertEqual(len(refs), len(objects))
        check_valuerefs(refs)

        # Test itervaluerefs()
        self.assertEqual(len(list(wvd.itervaluerefs())), len(objects))
        check_valuerefs(wvd.itervaluerefs())
    1385  
    def check_iters(self, dict):
        """Check that each way of iterating *dict* visits every entry.

        For items, keys (through ``__iter__()`` and through ``keys()``) and
        values, a list snapshot is taken and every element produced by the
        iteration is removed from it; the snapshot must end up empty.
        """
        def assert_visits_all(snapshot_source, iterable, message):
            pending = list(snapshot_source)
            for entry in iterable:
                pending.remove(entry)
            self.assertFalse(pending, message)

        # item iterator:
        assert_visits_all(dict.items(), dict.items(),
                          "items() did not touch all items")
        # key iterator, via __iter__():
        assert_visits_all(dict.keys(), dict,
                          "__iter__() did not touch all keys")
        # key iterator, via iterkeys():
        assert_visits_all(dict.keys(), dict.keys(),
                          "iterkeys() did not touch all keys")
        # value iterator:
        assert_visits_all(dict.values(), dict.values(),
                          "itervalues() did not touch all values")
    1411  
    def check_weak_destroy_while_iterating(self, dict, objects, iter_name):
        """Destroy one referenced object while an iterator over *dict* is live.

        *iter_name* names the method of *dict* whose result is iterated.
        The last element of *objects* is deleted after the first item has
        been consumed.
        """
        size_before = len(dict)
        it = iter(getattr(dict, iter_name)())
        next(it)             # Trigger internal iteration
        # Destroy an object
        del objects[-1]
        gc.collect()    # just in case
        # We have removed either the first consumed object, or another one
        remaining = len(list(it))
        alive = len(objects)
        self.assertIn(remaining, [alive, alive - 1])
        del it
        # The removal has been committed
        self.assertEqual(len(dict), size_before - 1)
    1424  
    def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext):
        """Check that we can explicitly mutate the weak dict without
        interfering with delayed removal.

        `testcontext` should create an iterator, destroy one of the
        weakref'ed objects and then return a new key/value pair
        corresponding to the destroyed object.
        """
        with testcontext() as (key, value):
            self.assertNotIn(key, dict)

        with testcontext() as (key, value):
            with self.assertRaises(KeyError):
                del dict[key]
        self.assertNotIn(key, dict)

        with testcontext() as (key, value):
            with self.assertRaises(KeyError):
                dict.pop(key)
        self.assertNotIn(key, dict)

        with testcontext() as (key, value):
            dict[key] = value
        self.assertEqual(dict[key], value)

        snapshot = copy.copy(dict)
        with testcontext() as (key, value):
            dict.update(snapshot)
        self.assertEqual(dict, snapshot)

        with testcontext() as (key, value):
            dict.clear()
        self.assertEqual(len(dict), 0)
    1449  
    def check_weak_del_and_len_while_iterating(self, dict, testcontext):
        """Check that len() works when both iterating and removing keys
        explicitly through various means (.pop(), .clear()...), while
        implicit mutation is deferred because an iterator is alive.

        Each call to testcontext() should schedule one item for removal
        for this test to work properly.
        """
        extra = Object(123456)
        with testcontext():
            size = len(dict)
            # Since underlying dict is ordered, first item is popped
            dict.pop(next(dict.keys()))
            self.assertEqual(len(dict), size - 1)
            dict[extra] = extra
            self.assertEqual(len(dict), size)
        # last item in objects is removed from dict in context shutdown
        with testcontext():
            self.assertEqual(len(dict), size - 1)
            # Then, (extra, extra) is popped
            dict.popitem()
            self.assertEqual(len(dict), size - 2)
        with testcontext():
            self.assertEqual(len(dict), size - 3)
            del dict[next(dict.keys())]
            self.assertEqual(len(dict), size - 4)
        with testcontext():
            self.assertEqual(len(dict), size - 5)
            dict.popitem()
            self.assertEqual(len(dict), size - 6)
        with testcontext():
            dict.clear()
            self.assertEqual(len(dict), 0)
        self.assertEqual(len(dict), 0)
    1482  
    def test_weak_keys_destroy_while_iterating(self):
        # Issue #7105: iterators shouldn't crash when a key is implicitly
        # removed.  Exercise every iteration flavour of a WeakKeyDictionary.
        wkd, alive = self.make_weak_keyed_dict()
        for view in ('keys', 'items', 'values', 'keyrefs'):
            self.check_weak_destroy_while_iterating(wkd, alive, view)

        wkd, alive = self.make_weak_keyed_dict()

        @contextlib.contextmanager
        def testcontext():
            # Reads `wkd` and `alive` at call time, so rebinding them below
            # redirects the context to the new dictionary.
            try:
                pending = iter(wkd.items())
                next(pending)
                # Schedule a key/value for removal and recreate it
                arg = alive.pop().arg
                gc.collect()      # just in case
                yield Object(arg), arg
            finally:
                pending = None      # should commit all removals
                gc.collect()

        self.check_weak_destroy_and_mutate_while_iterating(wkd, testcontext)
        # Issue #21173: len() fragile when keys are both implicitly and
        # explicitly removed.
        wkd, alive = self.make_weak_keyed_dict()
        self.check_weak_del_and_len_while_iterating(wkd, testcontext)
    1508  
    def test_weak_values_destroy_while_iterating(self):
        # Issue #7105: iterators shouldn't crash when a key is implicitly
        # removed.  Exercise every iteration flavour of a WeakValueDictionary.
        wvd, alive = self.make_weak_valued_dict()
        for view in ('keys', 'items', 'values', 'itervaluerefs', 'valuerefs'):
            self.check_weak_destroy_while_iterating(wvd, alive, view)

        wvd, alive = self.make_weak_valued_dict()

        @contextlib.contextmanager
        def testcontext():
            # Reads `wvd` and `alive` at call time, so rebinding them below
            # redirects the context to the new dictionary.
            try:
                pending = iter(wvd.items())
                next(pending)
                # Schedule a key/value for removal and recreate it
                key = alive.pop().arg
                gc.collect()      # just in case
                yield key, Object(key)
            finally:
                pending = None      # should commit all removals
                gc.collect()

        self.check_weak_destroy_and_mutate_while_iterating(wvd, testcontext)
        wvd, alive = self.make_weak_valued_dict()
        self.check_weak_del_and_len_while_iterating(wvd, testcontext)
    1533  
    def test_make_weak_keyed_dict_from_dict(self):
        # A WeakKeyDictionary can be seeded from a plain dict.
        key = Object(3)
        wkd = weakref.WeakKeyDictionary({key: 364})
        self.assertEqual(wkd[key], 364)
    1538  
    def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
        # A WeakKeyDictionary can be seeded from another WeakKeyDictionary.
        # Check the new dictionary as well as the source: previously the
        # copy was built but never inspected.
        key = Object(3)
        source = weakref.WeakKeyDictionary({key: 364})
        clone = weakref.WeakKeyDictionary(source)
        self.assertEqual(source[key], 364)
        self.assertEqual(clone[key], 364)
    1544  
    def make_weak_keyed_dict(self):
        """Return ``(dict, objects)``: a WeakKeyDictionary and its keys.

        The dictionary maps each of ``self.COUNT`` fresh Object instances
        to its ``arg``; the returned list holds the only strong references
        created here.
        """
        objects = [Object(i) for i in range(self.COUNT)]
        wkd = weakref.WeakKeyDictionary()
        for obj in objects:
            wkd[obj] = obj.arg
        return wkd, objects
    1551  
    def test_make_weak_valued_dict_from_dict(self):
        # A WeakValueDictionary can be seeded from a plain dict.
        value = Object(3)
        wvd = weakref.WeakValueDictionary({364: value})
        self.assertEqual(wvd[364], value)
    1556  
    def test_make_weak_valued_dict_from_weak_valued_dict(self):
        # A WeakValueDictionary can be seeded from another
        # WeakValueDictionary.  Check the new dictionary as well as the
        # source: previously the copy was built but never inspected.
        value = Object(3)
        source = weakref.WeakValueDictionary({364: value})
        clone = weakref.WeakValueDictionary(source)
        self.assertEqual(source[364], value)
        self.assertEqual(clone[364], value)
    1562  
    def test_make_weak_valued_dict_misc(self):
        # errors: missing self, or more than one positional argument
        with self.assertRaises(TypeError):
            weakref.WeakValueDictionary.__init__()
        with self.assertRaises(TypeError):
            weakref.WeakValueDictionary({}, {})
        with self.assertRaises(TypeError):
            weakref.WeakValueDictionary((), ())
        # special keyword arguments: these names must be usable as plain
        # keys when passed as keywords
        value = Object(3)
        for kw in ('self', 'dict', 'other', 'iterable'):
            wvd = weakref.WeakValueDictionary(**{kw: value})
            self.assertEqual(list(wvd.keys()), [kw])
            self.assertEqual(wvd[kw], value)
    1574  
    def make_weak_valued_dict(self):
        """Return ``(dict, objects)``: a WeakValueDictionary and its values.

        The dictionary maps the ``arg`` of each of ``self.COUNT`` fresh
        Object instances to the instance itself; the returned list holds
        the only strong references created here.
        """
        objects = [Object(i) for i in range(self.COUNT)]
        wvd = weakref.WeakValueDictionary()
        for obj in objects:
            wvd[obj.arg] = obj
        return wvd, objects
    1581  
    def check_popitem(self, klass, key1, value1, key2, value2):
        """Check popitem() on a two-entry instance of *klass*.

        Both entries are popped in turn; each popped value must be the one
        stored under the popped key, and len() must shrink by one per pop.
        """
        weakdict = klass()
        weakdict[key1] = value1
        weakdict[key2] = value2
        self.assertEqual(len(weakdict), 2)
        for remaining in (1, 0):
            key, value = weakdict.popitem()
            self.assertEqual(len(weakdict), remaining)
            # Any key that is not key1 is taken to be key2.
            expected = value1 if key is key1 else value2
            self.assertIs(value, expected)
    1599  
    def test_weak_valued_dict_popitem(self):
        # String keys, weakly referenced C instances as values.
        self.check_popitem(weakref.WeakValueDictionary,
                           key1="key1", value1=C(),
                           key2="key2", value2=C())
    1603  
    def test_weak_keyed_dict_popitem(self):
        # Weakly referenced C instances as keys, string values.
        self.check_popitem(weakref.WeakKeyDictionary,
                           key1=C(), value1="value 1",
                           key2=C(), value2="value 2")
    1607  
    def check_setdefault(self, klass, key, value1, value2):
        """Check setdefault() on a fresh instance of *klass*.

        The first call must store and return *value1*; the second call,
        offering *value2*, must leave *value1* in place and return it.
        """
        self.assertIsNot(value1, value2,
                     "invalid test"
                     " -- value parameters must be distinct objects")
        weakdict = klass()
        for offered in (value1, value2):
            result = weakdict.setdefault(key, offered)
            # Whatever is offered, the first stored value wins.
            self.assertIs(result, value1)
            self.assertIn(key, weakdict)
            self.assertIs(weakdict.get(key), value1)
            self.assertIs(weakdict[key], value1)
    1624  
    def test_weak_valued_dict_setdefault(self):
        # One string key, two distinct C instances as candidate values.
        self.check_setdefault(weakref.WeakValueDictionary,
                              key="key", value1=C(), value2=C())
    1628  
    def test_weak_keyed_dict_setdefault(self):
        # One C instance as key, two distinct strings as candidate values.
        self.check_setdefault(weakref.WeakKeyDictionary,
                              key=C(), value1="value 1", value2="value 2")
    1632  
    def check_update(self, klass, dict):
        """Fill a fresh *klass* instance from *dict* and compare the two.

        This exercises d.update(), len(d), d.keys(), k in d, d.get() and
        d[]: after the update both mappings must hold exactly the same
        keys, bound to the very same value objects.
        """
        weakdict = klass()
        weakdict.update(dict)
        self.assertEqual(len(weakdict), len(dict))
        # Nothing may appear in the weak dict that was not in the source ...
        for key in weakdict.keys():
            self.assertIn(key, dict, "mysterious new key appeared in weak dict")
            expected = dict.get(key)
            self.assertIs(expected, weakdict[key])
            self.assertIs(expected, weakdict.get(key))
        # ... and nothing from the source may be missing.
        for key, expected in dict.items():
            self.assertIn(key, weakdict, "original key disappeared in weak dict")
            self.assertIs(expected, weakdict[key])
            self.assertIs(expected, weakdict.get(key))
    1651  
    def test_weak_valued_dict_update(self):
        self.check_update(weakref.WeakValueDictionary,
                          dict={1: C(), 'a': C(), C(): C()})
        # errors: missing self, or more than one positional argument
        with self.assertRaises(TypeError):
            weakref.WeakValueDictionary.update()
        wvd = weakref.WeakValueDictionary()
        with self.assertRaises(TypeError):
            wvd.update({}, {})
        with self.assertRaises(TypeError):
            wvd.update((), ())
        # The failed calls must not have stored anything.
        self.assertEqual(list(wvd.keys()), [])
        # special keyword arguments: these names must be usable as plain
        # keys when passed as keywords
        value = Object(3)
        for kw in ('self', 'dict', 'other', 'iterable'):
            wvd = weakref.WeakValueDictionary()
            wvd.update(**{kw: value})
            self.assertEqual(list(wvd.keys()), [kw])
            self.assertEqual(wvd[kw], value)
    1668  
    def test_weak_valued_union_operators(self):
        # Check |, |= and the reflected | on WeakValueDictionary: each
        # result must match the equivalent plain-dict union, must itself be
        # a WeakValueDictionary, and must still hold its values weakly.
        a = C()
        b = C()
        c = C()
        wvd1 = weakref.WeakValueDictionary({1: a})
        wvd2 = weakref.WeakValueDictionary({1: b, 2: a})
        wvd3 = wvd1.copy()
        d1 = {1: c, 3: b}
        pairs = [(5, c), (6, b)]

        tmp1 = wvd1 | wvd2 # Between two WeakValueDictionaries
        self.assertEqual(dict(tmp1), dict(wvd1) | dict(wvd2))
        self.assertIs(type(tmp1), weakref.WeakValueDictionary)
        wvd1 |= wvd2
        self.assertEqual(wvd1, tmp1)

        tmp2 = wvd2 | d1 # Between WeakValueDictionary and mapping
        self.assertEqual(dict(tmp2), dict(wvd2) | d1)
        self.assertIs(type(tmp2), weakref.WeakValueDictionary)
        wvd2 |= d1
        self.assertEqual(wvd2, tmp2)

        tmp3 = wvd3.copy() # Between WeakValueDictionary and iterable key, value
        tmp3 |= pairs
        self.assertEqual(dict(tmp3), dict(wvd3) | dict(pairs))
        self.assertIs(type(tmp3), weakref.WeakValueDictionary)

        tmp4 = d1 | wvd3 # Testing .__ror__
        self.assertEqual(dict(tmp4), d1 | dict(wvd3))
        self.assertIs(type(tmp4), weakref.WeakValueDictionary)

        # Dropping the only strong reference to `a` must remove every entry
        # whose value was `a` from all of the results.
        del a
        gc_collect()  # For PyPy or other GCs.
        self.assertNotIn(2, tmp1)
        self.assertNotIn(2, tmp2)
        self.assertNotIn(1, tmp3)
        self.assertNotIn(1, tmp4)
    1705  
    def test_weak_keyed_dict_update(self):
        # Three distinct C instances as keys, small ints as values.
        self.check_update(weakref.WeakKeyDictionary,
                          dict={C(): 1, C(): 2, C(): 3})
    1709  
    def test_weak_keyed_delitem(self):
        # Explicit `del` removes exactly the requested key.
        first = Object('1')
        second = Object('2')
        wkd = weakref.WeakKeyDictionary()
        wkd[first] = 'something'
        wkd[second] = 'something'
        self.assertEqual(len(wkd), 2)
        del wkd[first]
        self.assertEqual(len(wkd), 1)
        self.assertEqual(list(wkd.keys()), [second])
    1720  
    def test_weak_keyed_union_operators(self):
        # Check |, |= and the reflected | on WeakKeyDictionary: each result
        # must match the equivalent plain-dict union, must itself be a
        # WeakKeyDictionary, and must still hold its keys weakly.
        o1 = C()
        o2 = C()
        o3 = C()
        wkd1 = weakref.WeakKeyDictionary({o1: 1, o2: 2})
        wkd2 = weakref.WeakKeyDictionary({o3: 3, o1: 4})
        wkd3 = wkd1.copy()
        d1 = {o2: '5', o3: '6'}
        pairs = [(o2, 7), (o3, 8)]

        tmp1 = wkd1 | wkd2 # Between two WeakKeyDictionaries
        self.assertEqual(dict(tmp1), dict(wkd1) | dict(wkd2))
        self.assertIs(type(tmp1), weakref.WeakKeyDictionary)
        wkd1 |= wkd2
        self.assertEqual(wkd1, tmp1)

        tmp2 = wkd2 | d1 # Between WeakKeyDictionary and mapping
        self.assertEqual(dict(tmp2), dict(wkd2) | d1)
        self.assertIs(type(tmp2), weakref.WeakKeyDictionary)
        wkd2 |= d1
        self.assertEqual(wkd2, tmp2)

        tmp3 = wkd3.copy() # Between WeakKeyDictionary and iterable key, value
        tmp3 |= pairs
        self.assertEqual(dict(tmp3), dict(wkd3) | dict(pairs))
        self.assertIs(type(tmp3), weakref.WeakKeyDictionary)

        tmp4 = d1 | wkd3 # Testing .__ror__
        self.assertEqual(dict(tmp4), d1 | dict(wkd3))
        self.assertIs(type(tmp4), weakref.WeakKeyDictionary)

        # Dropping the only strong reference to `o1` must remove the entries
        # keyed by it (values 4 and 1) from all of the results.
        del o1
        gc_collect()  # For PyPy or other GCs.
        self.assertNotIn(4, tmp1.values())
        self.assertNotIn(4, tmp2.values())
        self.assertNotIn(1, tmp3.values())
        self.assertNotIn(1, tmp4.values())
    1757  
    def test_weak_valued_delitem(self):
        # Explicit `del` removes exactly the requested key.
        first = Object('1')
        second = Object('2')
        wvd = weakref.WeakValueDictionary()
        wvd['something'] = first
        wvd['something else'] = second
        self.assertEqual(len(wvd), 2)
        del wvd['something']
        self.assertEqual(len(wvd), 1)
        self.assertEqual(list(wvd.items()), [('something else', second)])
    1768  
    def test_weak_keyed_bad_delitem(self):
        wkd = weakref.WeakKeyDictionary()
        absent = Object('1')
        # An attempt to delete an object that isn't there should raise
        # KeyError.  It didn't before 2.3.
        with self.assertRaises(KeyError):
            del wkd[absent]
        with self.assertRaises(KeyError):
            wkd[absent]

        # If a key isn't of a weakly referencable type, __getitem__ and
        # __setitem__ raise TypeError.  __delitem__ should too.
        with self.assertRaises(TypeError):
            del wkd[13]
        with self.assertRaises(TypeError):
            wkd[13]
        with self.assertRaises(TypeError):
            wkd[13] = 13
    1782  
    def test_weak_keyed_cascading_deletes(self):
        # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
        # over the keys via self.data.iterkeys().  If things vanished from
        # the dict during this (or got added), that caused a RuntimeError.

        wkd = weakref.WeakKeyDictionary()
        mutate = False

        class Key:
            # Hashes and compares by `value`; once `mutate` is switched on,
            # every comparison also drops the last entry of `live`.
            def __init__(self, i):
                self.value = i
            def __hash__(self):
                return hash(self.value)
            def __eq__(self, other):
                if mutate:
                    # Side effect that mutates the dict, by removing the
                    # last strong reference to a key.
                    del live[-1]
                return self.value == other.value

        live = [Key(i) for i in range(4)]
        for key in live:
            wkd[key] = key.value
        del key   # now the only strong references to keys are in live
        # Find the order in which iterkeys sees the keys.
        live = list(wkd.keys())
        # Reverse it, so that the iteration implementation of __delitem__
        # has to keep looping to find the first object we delete.
        live.reverse()

        # Turn on mutation in Key.__eq__.  The first time through the loop,
        # under the iterkeys() business the first comparison will delete
        # the last item iterkeys() would see, and that causes a
        #     RuntimeError: dictionary changed size during iteration
        # when the iterkeys() loop goes around to try comparing the next
        # key.  After this was fixed, it just deletes the last object *our*
        # "for key in live" loop would have gotten to.
        mutate = True
        deletions = 0
        for key in live:
            deletions += 1
            del wkd[key]
        gc_collect()  # For PyPy or other GCs.
        self.assertEqual(len(wkd), 0)
        self.assertEqual(deletions, 2)
    1828  
    1829      def test_make_weak_valued_dict_repr(self):
    1830          dict = weakref.WeakValueDictionary()
    1831          self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>')
    1832  
    1833      def test_make_weak_keyed_dict_repr(self):
    1834          dict = weakref.WeakKeyDictionary()
    1835          self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>')
    1836  
    @threading_helper.requires_working_threading()
    def test_threaded_weak_valued_setdefault(self):
        # setdefault() must always hand back a real object while the
        # collect_in_thread() context is active.
        wvd = weakref.WeakValueDictionary()
        with collect_in_thread():
            for _ in range(100000):
                got = wvd.setdefault(10, RefCycle())
                self.assertIsNot(got, None)  # we never put None in there!
                del got
    1845  
    @threading_helper.requires_working_threading()
    def test_threaded_weak_valued_pop(self):
        # pop() must return either the stored object or the supplied
        # default (10), never None, while the collect_in_thread() context
        # is active.
        wvd = weakref.WeakValueDictionary()
        with collect_in_thread():
            for _ in range(100000):
                wvd[10] = RefCycle()
                got = wvd.pop(10, 10)
                self.assertIsNot(got, None)  # we never put None in there!
    1854  
    @threading_helper.requires_working_threading()
    def test_threaded_weak_valued_consistency(self):
        # Issue #28427: old keys should not remove new values from
        # WeakValueDictionary when collecting from another thread.
        wvd = weakref.WeakValueDictionary()
        with collect_in_thread():
            for _ in range(200000):
                cycle = RefCycle()
                wvd[10] = cycle
                # cycle is still alive, so the dict can't be empty
                self.assertEqual(len(wvd), 1)
                cycle = None  # lose ref
    1867  
    def check_threaded_weak_dict_copy(self, type_, deepcopy):
        """Copy a large weak dict while another thread drops its referents.

        `type_` should be either WeakKeyDictionary or WeakValueDictionary.
        `deepcopy` should be either True or False; only the exact value
        True selects copy.deepcopy(), anything else uses the dict's own
        copy() method.  The first exception caught in the copying thread,
        if any, is re-raised in the calling thread.
        """
        errors = []

        class DummyKey:
            def __init__(self, ctr):
                self.ctr = ctr

        class DummyValue:
            def __init__(self, ctr):
                self.ctr = ctr

        def dict_copy(d, exc):
            # Thread body: attempt the copy and report failures via `exc`.
            try:
                _ = copy.deepcopy(d) if deepcopy is True else d.copy()
            except Exception as ex:
                exc.append(ex)

        def pop_and_collect(lst):
            # Thread body: empty `lst` in random order, forcing a collection
            # after every 10000 pops.
            popped = 0
            while lst:
                victim = random.randrange(len(lst))
                popped += 1
                lst.pop(victim)
                if popped % 10000 == 0:
                    gc.collect()  # just in case

        self.assertIn(type_, (weakref.WeakKeyDictionary, weakref.WeakValueDictionary))

        d = type_()
        keys, values = [], []
        # Initialize d with many entries; the two lists end up holding the
        # only strong references to the keys and values.
        for i in range(70000):
            k, v = DummyKey(i), DummyValue(i)
            keys.append(k)
            values.append(v)
            d[k] = v
            del k, v

        # Kill whichever side the dictionary refers to weakly.
        victims = keys if type_ is weakref.WeakKeyDictionary else values
        t_copy = threading.Thread(target=dict_copy, args=(d, errors))
        t_collect = threading.Thread(target=pop_and_collect, args=(victims,))

        t_copy.start()
        t_collect.start()

        t_copy.join()
        t_collect.join()

        # Test exceptions
        if errors:
            raise errors[0]
    1928  
    @threading_helper.requires_working_threading()
    def test_threaded_weak_key_dict_copy(self):
        # Issue #35615: Weakref keys or values getting GC'ed during dict
        # copying should not result in a crash.
        self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary,
                                           deepcopy=False)
    1934  
    @threading_helper.requires_working_threading()
    @support.requires_resource('cpu')
    def test_threaded_weak_key_dict_deepcopy(self):
        # Issue #35615: Weakref keys or values getting GC'ed during dict
        # copying should not result in a crash.
        self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary,
                                           deepcopy=True)
    1941  
    @threading_helper.requires_working_threading()
    def test_threaded_weak_value_dict_copy(self):
        # Issue #35615: Weakref keys or values getting GC'ed during dict
        # copying should not result in a crash.
        self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary,
                                           deepcopy=False)
    1947  
    @threading_helper.requires_working_threading()
    @support.requires_resource('cpu')
    def test_threaded_weak_value_dict_deepcopy(self):
        # Issue #35615: Weakref keys or values getting GC'ed during dict
        # copying should not result in a crash.
        self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary,
                                           deepcopy=True)
    1954  
    @support.cpython_only
    def test_remove_closure(self):
        # The _remove callback of a WeakValueDictionary must not carry a
        # closure.
        callback = weakref.WeakValueDictionary()._remove
        self.assertIsNone(callback.__closure__)
    1959  
    1960  
    1961  from test import mapping_tests
    1962  
    1963  class ESC[4;38;5;81mWeakValueDictionaryTestCase(ESC[4;38;5;149mmapping_testsESC[4;38;5;149m.ESC[4;38;5;149mBasicTestMappingProtocol):
    1964      """Check that WeakValueDictionary conforms to the mapping protocol"""
    1965      __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)}
    1966      type2test = weakref.WeakValueDictionary
    1967      def _reference(self):
    1968          return self.__ref.copy()
    1969  
    1970  class ESC[4;38;5;81mWeakKeyDictionaryTestCase(ESC[4;38;5;149mmapping_testsESC[4;38;5;149m.ESC[4;38;5;149mBasicTestMappingProtocol):
    1971      """Check that WeakKeyDictionary conforms to the mapping protocol"""
    1972      __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3}
    1973      type2test = weakref.WeakKeyDictionary
    1974      def _reference(self):
    1975          return self.__ref.copy()
    1976  
    1977  
    1978  class ESC[4;38;5;81mFinalizeTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1979  
    1980      class ESC[4;38;5;81mA:
    1981          pass
    1982  
    1983      def _collect_if_necessary(self):
    1984          # we create no ref-cycles so in CPython no gc should be needed
    1985          if sys.implementation.name != 'cpython':
    1986              support.gc_collect()
    1987  
    1988      def test_finalize(self):
    1989          def add(x,y,z):
    1990              res.append(x + y + z)
    1991              return x + y + z
    1992  
    1993          a = self.A()
    1994  
    1995          res = []
    1996          f = weakref.finalize(a, add, 67, 43, z=89)
    1997          self.assertEqual(f.alive, True)
    1998          self.assertEqual(f.peek(), (a, add, (67,43), {'z':89}))
    1999          self.assertEqual(f(), 199)
    2000          self.assertEqual(f(), None)
    2001          self.assertEqual(f(), None)
    2002          self.assertEqual(f.peek(), None)
    2003          self.assertEqual(f.detach(), None)
    2004          self.assertEqual(f.alive, False)
    2005          self.assertEqual(res, [199])
    2006  
    2007          res = []
    2008          f = weakref.finalize(a, add, 67, 43, 89)
    2009          self.assertEqual(f.peek(), (a, add, (67,43,89), {}))
    2010          self.assertEqual(f.detach(), (a, add, (67,43,89), {}))
    2011          self.assertEqual(f(), None)
    2012          self.assertEqual(f(), None)
    2013          self.assertEqual(f.peek(), None)
    2014          self.assertEqual(f.detach(), None)
    2015          self.assertEqual(f.alive, False)
    2016          self.assertEqual(res, [])
    2017  
    2018          res = []
    2019          f = weakref.finalize(a, add, x=67, y=43, z=89)
    2020          del a
    2021          self._collect_if_necessary()
    2022          self.assertEqual(f(), None)
    2023          self.assertEqual(f(), None)
    2024          self.assertEqual(f.peek(), None)
    2025          self.assertEqual(f.detach(), None)
    2026          self.assertEqual(f.alive, False)
    2027          self.assertEqual(res, [199])
    2028  
    2029      def test_arg_errors(self):
    2030          def fin(*args, **kwargs):
    2031              res.append((args, kwargs))
    2032  
    2033          a = self.A()
    2034  
    2035          res = []
    2036          f = weakref.finalize(a, fin, 1, 2, func=3, obj=4)
    2037          self.assertEqual(f.peek(), (a, fin, (1, 2), {'func': 3, 'obj': 4}))
    2038          f()
    2039          self.assertEqual(res, [((1, 2), {'func': 3, 'obj': 4})])
    2040  
    2041          with self.assertRaises(TypeError):
    2042              weakref.finalize(a, func=fin, arg=1)
    2043          with self.assertRaises(TypeError):
    2044              weakref.finalize(obj=a, func=fin, arg=1)
    2045          self.assertRaises(TypeError, weakref.finalize, a)
    2046          self.assertRaises(TypeError, weakref.finalize)
    2047  
    2048      def test_order(self):
    2049          a = self.A()
    2050          res = []
    2051  
    2052          f1 = weakref.finalize(a, res.append, 'f1')
    2053          f2 = weakref.finalize(a, res.append, 'f2')
    2054          f3 = weakref.finalize(a, res.append, 'f3')
    2055          f4 = weakref.finalize(a, res.append, 'f4')
    2056          f5 = weakref.finalize(a, res.append, 'f5')
    2057  
    2058          # make sure finalizers can keep themselves alive
    2059          del f1, f4
    2060  
    2061          self.assertTrue(f2.alive)
    2062          self.assertTrue(f3.alive)
    2063          self.assertTrue(f5.alive)
    2064  
    2065          self.assertTrue(f5.detach())
    2066          self.assertFalse(f5.alive)
    2067  
    2068          f5()                       # nothing because previously unregistered
    2069          res.append('A')
    2070          f3()                       # => res.append('f3')
    2071          self.assertFalse(f3.alive)
    2072          res.append('B')
    2073          f3()                       # nothing because previously called
    2074          res.append('C')
    2075          del a
    2076          self._collect_if_necessary()
    2077                                     # => res.append('f4')
    2078                                     # => res.append('f2')
    2079                                     # => res.append('f1')
    2080          self.assertFalse(f2.alive)
    2081          res.append('D')
    2082          f2()                       # nothing because previously called by gc
    2083  
    2084          expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D']
    2085          self.assertEqual(res, expected)
    2086  
    2087      def test_all_freed(self):
    2088          # we want a weakrefable subclass of weakref.finalize
    2089          class ESC[4;38;5;81mMyFinalizer(ESC[4;38;5;149mweakrefESC[4;38;5;149m.ESC[4;38;5;149mfinalize):
    2090              pass
    2091  
    2092          a = self.A()
    2093          res = []
    2094          def callback():
    2095              res.append(123)
    2096          f = MyFinalizer(a, callback)
    2097  
    2098          wr_callback = weakref.ref(callback)
    2099          wr_f = weakref.ref(f)
    2100          del callback, f
    2101  
    2102          self.assertIsNotNone(wr_callback())
    2103          self.assertIsNotNone(wr_f())
    2104  
    2105          del a
    2106          self._collect_if_necessary()
    2107  
    2108          self.assertIsNone(wr_callback())
    2109          self.assertIsNone(wr_f())
    2110          self.assertEqual(res, [123])
    2111  
    2112      @classmethod
    2113      def run_in_child(cls):
    2114          def error():
    2115              # Create an atexit finalizer from inside a finalizer called
    2116              # at exit.  This should be the next to be run.
    2117              g1 = weakref.finalize(cls, print, 'g1')
    2118              print('f3 error')
    2119              1/0
    2120  
    2121          # cls should stay alive till atexit callbacks run
    2122          f1 = weakref.finalize(cls, print, 'f1', _global_var)
    2123          f2 = weakref.finalize(cls, print, 'f2', _global_var)
    2124          f3 = weakref.finalize(cls, error)
    2125          f4 = weakref.finalize(cls, print, 'f4', _global_var)
    2126  
    2127          assert f1.atexit == True
    2128          f2.atexit = False
    2129          assert f3.atexit == True
    2130          assert f4.atexit == True
    2131  
    def test_atexit(self):
        """Check which finalizers run at interpreter exit, and in what order.

        run_in_child() is executed in a fresh interpreter; its stdout and
        stderr are then compared against the expected exit-time behaviour.
        """
        prog = ('from test.test_weakref import FinalizeTestCase;'+
                'FinalizeTestCase.run_in_child()')
        # assert_python_ok() already fails on a non-zero exit status, so
        # the return code itself is not inspected here.
        _, out, err = script_helper.assert_python_ok('-c', prog)
        out = out.decode('ascii').splitlines()
        # Expected: reverse creation order (f4, then f3), followed by g1,
        # which f3's callback registers while running, and finally f1.
        # f2 is absent because run_in_child() sets its atexit flag to False.
        self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar'])
        # The 1/0 in f3's callback must be reported on stderr.  assertIn
        # shows the actual stderr content if the check fails.
        self.assertIn(b'ZeroDivisionError', err)
    2139  
    2140  
    2141  class ESC[4;38;5;81mModuleTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2142      def test_names(self):
    2143          for name in ('ReferenceType', 'ProxyType', 'CallableProxyType',
    2144                       'WeakMethod', 'WeakSet', 'WeakKeyDictionary', 'WeakValueDictionary'):
    2145              obj = getattr(weakref, name)
    2146              if name != 'WeakSet':
    2147                  self.assertEqual(obj.__module__, 'weakref')
    2148              self.assertEqual(obj.__name__, name)
    2149              self.assertEqual(obj.__qualname__, name)
    2150  
    2151  
# Doctest text for the examples in the library reference (weakref.rst),
# plus a few extra checks marked "just testing".  It is a runtime string:
# every character inside it is part of the executed doctest.
libreftest = """ Doctest for examples in the library reference: weakref.rst

>>> from test.support import gc_collect
>>> import weakref
>>> class Dict(dict):
...     pass
...
>>> obj = Dict(red=1, green=2, blue=3)   # this object is weak referencable
>>> r = weakref.ref(obj)
>>> print(r() is obj)
True

>>> import weakref
>>> class Object:
...     pass
...
>>> o = Object()
>>> r = weakref.ref(o)
>>> o2 = r()
>>> o is o2
True
>>> del o, o2
>>> gc_collect()  # For PyPy or other GCs.
>>> print(r())
None

>>> import weakref
>>> class ExtendedRef(weakref.ref):
...     def __init__(self, ob, callback=None, **annotations):
...         super().__init__(ob, callback)
...         self.__counter = 0
...         for k, v in annotations.items():
...             setattr(self, k, v)
...     def __call__(self):
...         '''Return a pair containing the referent and the number of
...         times the reference has been called.
...         '''
...         ob = super().__call__()
...         if ob is not None:
...             self.__counter += 1
...             ob = (ob, self.__counter)
...         return ob
...
>>> class A:   # not in docs from here, just testing the ExtendedRef
...     pass
...
>>> a = A()
>>> r = ExtendedRef(a, foo=1, bar="baz")
>>> r.foo
1
>>> r.bar
'baz'
>>> r()[1]
1
>>> r()[1]
2
>>> r()[0] is a
True


>>> import weakref
>>> _id2obj_dict = weakref.WeakValueDictionary()
>>> def remember(obj):
...     oid = id(obj)
...     _id2obj_dict[oid] = obj
...     return oid
...
>>> def id2obj(oid):
...     return _id2obj_dict[oid]
...
>>> a = A()             # from here, just testing
>>> a_id = remember(a)
>>> id2obj(a_id) is a
True
>>> del a
>>> gc_collect()  # For PyPy or other GCs.
>>> try:
...     id2obj(a_id)
... except KeyError:
...     print('OK')
... else:
...     print('WeakValueDictionary error')
OK

"""

# doctest consults the module-level __test__ mapping, so registering the
# string here lets the DocTestSuite() built in load_tests() run it.
__test__ = {'libreftest' : libreftest}
    2239  
    2240  def load_tests(loader, tests, pattern):
    2241      tests.addTest(doctest.DocTestSuite())
    2242      return tests
    2243  
    2244  
# Run the tests through unittest when this file is executed directly
# as a script.
if __name__ == "__main__":
    unittest.main()