(root)/
Python-3.12.0/
Lib/
test/
test_finalization.py
       1  """
       2  Tests for object finalization semantics, as outlined in PEP 442.
       3  """
       4  
       5  import contextlib
       6  import gc
       7  import unittest
       8  import weakref
       9  
      10  try:
      11      from _testcapi import with_tp_del
      12  except ImportError:
      13      def with_tp_del(cls):
      14          class ESC[4;38;5;81mC(ESC[4;38;5;149mobject):
      15              def __new__(cls, *args, **kwargs):
      16                  raise TypeError('requires _testcapi.with_tp_del')
      17          return C
      18  
      19  try:
      20      from _testcapi import without_gc
      21  except ImportError:
      22      def without_gc(cls):
      23          class ESC[4;38;5;81mC:
      24              def __new__(cls, *args, **kwargs):
      25                  raise TypeError('requires _testcapi.without_gc')
      26          return C
      27  
      28  from test import support
      29  
      30  
      31  class ESC[4;38;5;81mNonGCSimpleBase:
      32      """
      33      The base class for all the objects under test, equipped with various
      34      testing features.
      35      """
      36  
      37      survivors = []
      38      del_calls = []
      39      tp_del_calls = []
      40      errors = []
      41  
      42      _cleaning = False
      43  
      44      __slots__ = ()
      45  
      46      @classmethod
      47      def _cleanup(cls):
      48          cls.survivors.clear()
      49          cls.errors.clear()
      50          gc.garbage.clear()
      51          gc.collect()
      52          cls.del_calls.clear()
      53          cls.tp_del_calls.clear()
      54  
      55      @classmethod
      56      @contextlib.contextmanager
      57      def test(cls):
      58          """
      59          A context manager to use around all finalization tests.
      60          """
      61          with support.disable_gc():
      62              cls.del_calls.clear()
      63              cls.tp_del_calls.clear()
      64              NonGCSimpleBase._cleaning = False
      65              try:
      66                  yield
      67                  if cls.errors:
      68                      raise cls.errors[0]
      69              finally:
      70                  NonGCSimpleBase._cleaning = True
      71                  cls._cleanup()
      72  
      73      def check_sanity(self):
      74          """
      75          Check the object is sane (non-broken).
      76          """
      77  
      78      def __del__(self):
      79          """
      80          PEP 442 finalizer.  Record that this was called, check the
      81          object is in a sane state, and invoke a side effect.
      82          """
      83          try:
      84              if not self._cleaning:
      85                  self.del_calls.append(id(self))
      86                  self.check_sanity()
      87                  self.side_effect()
      88          except Exception as e:
      89              self.errors.append(e)
      90  
      91      def side_effect(self):
      92          """
      93          A side effect called on destruction.
      94          """
      95  
      96  
      97  class ESC[4;38;5;81mSimpleBase(ESC[4;38;5;149mNonGCSimpleBase):
      98  
      99      def __init__(self):
     100          self.id_ = id(self)
     101  
     102      def check_sanity(self):
     103          assert self.id_ == id(self)
     104  
     105  
@without_gc
class NonGC(NonGCSimpleBase):
    # Slot-less test object passed through the without_gc decorator;
    # test_non_gc asserts that its instances are not tracked by the GC.
    __slots__ = ()
     109  
     110  @without_gc
     111  class ESC[4;38;5;81mNonGCResurrector(ESC[4;38;5;149mNonGCSimpleBase):
     112      __slots__ = ()
     113  
     114      def side_effect(self):
     115          """
     116          Resurrect self by storing self in a class-wide list.
     117          """
     118          self.survivors.append(self)
     119  
     120  class ESC[4;38;5;81mSimple(ESC[4;38;5;149mSimpleBase):
     121      pass
     122  
     123  # Can't inherit from NonGCResurrector, in case importing without_gc fails.
     124  class ESC[4;38;5;81mSimpleResurrector(ESC[4;38;5;149mSimpleBase):
     125  
     126      def side_effect(self):
     127          """
     128          Resurrect self by storing self in a class-wide list.
     129          """
     130          self.survivors.append(self)
     131  
     132  
     133  class ESC[4;38;5;81mTestBase:
     134  
     135      def setUp(self):
     136          self.old_garbage = gc.garbage[:]
     137          gc.garbage[:] = []
     138  
     139      def tearDown(self):
     140          # None of the tests here should put anything in gc.garbage
     141          try:
     142              self.assertEqual(gc.garbage, [])
     143          finally:
     144              del self.old_garbage
     145              gc.collect()
     146  
     147      def assert_del_calls(self, ids):
     148          self.assertEqual(sorted(SimpleBase.del_calls), sorted(ids))
     149  
     150      def assert_tp_del_calls(self, ids):
     151          self.assertEqual(sorted(SimpleBase.tp_del_calls), sorted(ids))
     152  
     153      def assert_survivors(self, ids):
     154          self.assertEqual(sorted(id(x) for x in SimpleBase.survivors), sorted(ids))
     155  
     156      def assert_garbage(self, ids):
     157          self.assertEqual(sorted(id(x) for x in gc.garbage), sorted(ids))
     158  
     159      def clear_survivors(self):
     160          SimpleBase.survivors.clear()
     161  
     162  
class SimpleFinalizationTest(TestBase, unittest.TestCase):
    """
    Test finalization without refcycles.
    """

    # Comments are used instead of method docstrings so that unittest's
    # verbose output keeps showing the test method names.

    def test_simple(self):
        # A plain object: __del__ is recorded once, nothing survives and
        # the weakref is dead after the only reference is deleted.
        with SimpleBase.test():
            s = Simple()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
            # A second collection must not record another __del__ call.
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])

    def test_simple_resurrect(self):
        # An object that resurrects itself from __del__: it shows up in
        # survivors and its weakref is still alive.
        with SimpleBase.test():
            s = SimpleResurrector()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors(ids)
            self.assertIsNot(wr(), None)
            # Releasing the survivor must not record a second __del__
            # call, nor resurrect the object again.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
        # Checked after the test() context has exited.
        self.assertIs(wr(), None)

    @support.cpython_only
    def test_non_gc(self):
        # Same as test_simple, for an object not tracked by the GC.
        with SimpleBase.test():
            s = NonGC()
            self.assertFalse(gc.is_tracked(s))
            ids = [id(s)]
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])

    @support.cpython_only
    def test_non_gc_resurrect(self):
        # Resurrection of an object not tracked by the GC.
        with SimpleBase.test():
            s = NonGCResurrector()
            self.assertFalse(gc.is_tracked(s))
            ids = [id(s)]
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors(ids)
            # Unlike the GC-tracked case above, the expectations here are
            # that releasing the survivor records __del__ a second time
            # and that the object resurrects itself again.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids * 2)
            self.assert_survivors(ids)
     226  
     227  
     228  class ESC[4;38;5;81mSelfCycleBase:
     229  
     230      def __init__(self):
     231          super().__init__()
     232          self.ref = self
     233  
     234      def check_sanity(self):
     235          super().check_sanity()
     236          assert self.ref is self
     237  
     238  class ESC[4;38;5;81mSimpleSelfCycle(ESC[4;38;5;149mSelfCycleBase, ESC[4;38;5;149mSimple):
     239      pass
     240  
     241  class ESC[4;38;5;81mSelfCycleResurrector(ESC[4;38;5;149mSelfCycleBase, ESC[4;38;5;149mSimpleResurrector):
     242      pass
     243  
     244  class ESC[4;38;5;81mSuicidalSelfCycle(ESC[4;38;5;149mSelfCycleBase, ESC[4;38;5;149mSimple):
     245  
     246      def side_effect(self):
     247          """
     248          Explicitly break the reference cycle.
     249          """
     250          self.ref = None
     251  
     252  
class SelfCycleFinalizationTest(TestBase, unittest.TestCase):
    """
    Test finalization of an object having a single cyclic reference to
    itself.
    """

    def test_simple(self):
        # A self-referencing object with no side effect: after deleting
        # the last outside reference and collecting, __del__ is recorded
        # once, nothing survives and the weakref is dead.
        with SimpleBase.test():
            s = SimpleSelfCycle()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
            # A second collection must not record another __del__ call.
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])

    def test_simple_resurrect(self):
        # Test that __del__ can resurrect the object being finalized.
        with SimpleBase.test():
            s = SelfCycleResurrector()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors(ids)
            # The weakref is expected to be dead even though the object
            # was resurrected.
            # XXX is this desirable?
            self.assertIs(wr(), None)
            # When trying to destroy the object a second time, __del__
            # isn't called anymore (and the object isn't resurrected).
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)

    def test_simple_suicide(self):
        # Test the GC is able to deal with an object that kills its last
        # reference during __del__.
        with SimpleBase.test():
            s = SuicidalSelfCycle()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
            # Collecting again must leave every expectation unchanged.
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
     309  
     310  
     311  class ESC[4;38;5;81mChainedBase:
     312  
     313      def chain(self, left):
     314          self.suicided = False
     315          self.left = left
     316          left.right = self
     317  
     318      def check_sanity(self):
     319          super().check_sanity()
     320          if self.suicided:
     321              assert self.left is None
     322              assert self.right is None
     323          else:
     324              left = self.left
     325              if left.suicided:
     326                  assert left.right is None
     327              else:
     328                  assert left.right is self
     329              right = self.right
     330              if right.suicided:
     331                  assert right.left is None
     332              else:
     333                  assert right.left is self
     334  
     335  class ESC[4;38;5;81mSimpleChained(ESC[4;38;5;149mChainedBase, ESC[4;38;5;149mSimple):
     336      pass
     337  
     338  class ESC[4;38;5;81mChainedResurrector(ESC[4;38;5;149mChainedBase, ESC[4;38;5;149mSimpleResurrector):
     339      pass
     340  
     341  class ESC[4;38;5;81mSuicidalChained(ESC[4;38;5;149mChainedBase, ESC[4;38;5;149mSimple):
     342  
     343      def side_effect(self):
     344          """
     345          Explicitly break the reference cycle.
     346          """
     347          self.suicided = True
     348          self.left = None
     349          self.right = None
     350  
     351  
     352  class ESC[4;38;5;81mCycleChainFinalizationTest(ESC[4;38;5;149mTestBase, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     353      """
     354      Test finalization of a cyclic chain.  These tests are similar in
     355      spirit to the self-cycle tests above, but the collectable object
     356      graph isn't trivial anymore.
     357      """
     358  
     359      def build_chain(self, classes):
     360          nodes = [cls() for cls in classes]
     361          for i in range(len(nodes)):
     362              nodes[i].chain(nodes[i-1])
     363          return nodes
     364  
     365      def check_non_resurrecting_chain(self, classes):
     366          N = len(classes)
     367          with SimpleBase.test():
     368              nodes = self.build_chain(classes)
     369              ids = [id(s) for s in nodes]
     370              wrs = [weakref.ref(s) for s in nodes]
     371              del nodes
     372              gc.collect()
     373              self.assert_del_calls(ids)
     374              self.assert_survivors([])
     375              self.assertEqual([wr() for wr in wrs], [None] * N)
     376              gc.collect()
     377              self.assert_del_calls(ids)
     378  
     379      def check_resurrecting_chain(self, classes):
     380          N = len(classes)
     381          with SimpleBase.test():
     382              nodes = self.build_chain(classes)
     383              N = len(nodes)
     384              ids = [id(s) for s in nodes]
     385              survivor_ids = [id(s) for s in nodes if isinstance(s, SimpleResurrector)]
     386              wrs = [weakref.ref(s) for s in nodes]
     387              del nodes
     388              gc.collect()
     389              self.assert_del_calls(ids)
     390              self.assert_survivors(survivor_ids)
     391              # XXX desirable?
     392              self.assertEqual([wr() for wr in wrs], [None] * N)
     393              self.clear_survivors()
     394              gc.collect()
     395              self.assert_del_calls(ids)
     396              self.assert_survivors([])
     397  
     398      def test_homogenous(self):
     399          self.check_non_resurrecting_chain([SimpleChained] * 3)
     400  
     401      def test_homogenous_resurrect(self):
     402          self.check_resurrecting_chain([ChainedResurrector] * 3)
     403  
     404      def test_homogenous_suicidal(self):
     405          self.check_non_resurrecting_chain([SuicidalChained] * 3)
     406  
     407      def test_heterogenous_suicidal_one(self):
     408          self.check_non_resurrecting_chain([SuicidalChained, SimpleChained] * 2)
     409  
     410      def test_heterogenous_suicidal_two(self):
     411          self.check_non_resurrecting_chain(
     412              [SuicidalChained] * 2 + [SimpleChained] * 2)
     413  
     414      def test_heterogenous_resurrect_one(self):
     415          self.check_resurrecting_chain([ChainedResurrector, SimpleChained] * 2)
     416  
     417      def test_heterogenous_resurrect_two(self):
     418          self.check_resurrecting_chain(
     419              [ChainedResurrector, SimpleChained, SuicidalChained] * 2)
     420  
     421      def test_heterogenous_resurrect_three(self):
     422          self.check_resurrecting_chain(
     423              [ChainedResurrector] * 2 + [SimpleChained] * 2 + [SuicidalChained] * 2)
     424  
     425  
     426  # NOTE: the tp_del slot isn't automatically inherited, so we have to call
     427  # with_tp_del() for each instantiated class.
     428  
     429  class ESC[4;38;5;81mLegacyBase(ESC[4;38;5;149mSimpleBase):
     430  
     431      def __del__(self):
     432          try:
     433              # Do not invoke side_effect here, since we are now exercising
     434              # the tp_del slot.
     435              if not self._cleaning:
     436                  self.del_calls.append(id(self))
     437                  self.check_sanity()
     438          except Exception as e:
     439              self.errors.append(e)
     440  
     441      def __tp_del__(self):
     442          """
     443          Legacy (pre-PEP 442) finalizer, mapped to a tp_del slot.
     444          """
     445          try:
     446              if not self._cleaning:
     447                  self.tp_del_calls.append(id(self))
     448                  self.check_sanity()
     449                  self.side_effect()
     450          except Exception as e:
     451              self.errors.append(e)
     452  
     453  @with_tp_del
     454  class ESC[4;38;5;81mLegacy(ESC[4;38;5;149mLegacyBase):
     455      pass
     456  
     457  @with_tp_del
     458  class ESC[4;38;5;81mLegacyResurrector(ESC[4;38;5;149mLegacyBase):
     459  
     460      def side_effect(self):
     461          """
     462          Resurrect self by storing self in a class-wide list.
     463          """
     464          self.survivors.append(self)
     465  
     466  @with_tp_del
     467  class ESC[4;38;5;81mLegacySelfCycle(ESC[4;38;5;149mSelfCycleBase, ESC[4;38;5;149mLegacyBase):
     468      pass
     469  
     470  
@support.cpython_only
class LegacyFinalizationTest(TestBase, unittest.TestCase):
    """
    Test finalization of objects with a tp_del.
    """

    def tearDown(self):
        # These tests need to clean up a bit more, since they create
        # uncollectable objects.
        gc.garbage.clear()
        gc.collect()
        super().tearDown()

    def test_legacy(self):
        # Both __del__ and the tp_del finalizer are expected to be
        # recorded exactly once, with nothing surviving.
        with SimpleBase.test():
            s = Legacy()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_tp_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
            # A second collection must not record further calls.
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_tp_del_calls(ids)

    def test_legacy_resurrect(self):
        # The object resurrects itself from its tp_del finalizer.
        with SimpleBase.test():
            s = LegacyResurrector()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_tp_del_calls(ids)
            self.assert_survivors(ids)
            # weakrefs are cleared before tp_del is called.
            self.assertIs(wr(), None)
            # After releasing the survivor, the expectations are: no new
            # __del__ call, a second tp_del call, and the object back in
            # the survivors list.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_tp_del_calls(ids * 2)
            self.assert_survivors(ids)
        # Checked after the test() context has exited.
        self.assertIs(wr(), None)

    def test_legacy_self_cycle(self):
        # Self-cycles with legacy finalizers end up in gc.garbage.
        with SimpleBase.test():
            s = LegacySelfCycle()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            # No finalizer is expected to have been recorded; the object
            # sits in gc.garbage and is still alive.
            self.assert_del_calls([])
            self.assert_tp_del_calls([])
            self.assert_survivors([])
            self.assert_garbage(ids)
            self.assertIsNot(wr(), None)
            # Break the cycle to allow collection
            gc.garbage[0].ref = None
        # Checked after the test() context has exited.
        self.assert_garbage([])
        self.assertIs(wr(), None)
     535  
     536  
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()