(root)/
Python-3.12.0/
Lib/
test/
test_threading_local.py
       1  import sys
       2  import unittest
       3  from doctest import DocTestSuite
       4  from test import support
       5  from test.support import threading_helper
       6  from test.support.import_helper import import_module
       7  import weakref
       8  
       9  # Modules under test
      10  import _thread
      11  import threading
      12  import _threading_local
      13  
      14  
# Module-level guard: every test below starts threads, so the whole module
# requires working thread support (module=True applies the requirement to
# the module rather than to a single test).
threading_helper.requires_working_threading(module=True)
      16  
      17  
      18  class ESC[4;38;5;81mWeak(ESC[4;38;5;149mobject):
      19      pass
      20  
      21  def target(local, weaklist):
      22      weak = Weak()
      23      local.weak = weak
      24      weaklist.append(weakref.ref(weak))
      25  
      26  
      27  class ESC[4;38;5;81mBaseLocalTest:
      28  
      29      def test_local_refs(self):
      30          self._local_refs(20)
      31          self._local_refs(50)
      32          self._local_refs(100)
      33  
      34      def _local_refs(self, n):
      35          local = self._local()
      36          weaklist = []
      37          for i in range(n):
      38              t = threading.Thread(target=target, args=(local, weaklist))
      39              t.start()
      40              t.join()
      41          del t
      42  
      43          support.gc_collect()  # For PyPy or other GCs.
      44          self.assertEqual(len(weaklist), n)
      45  
      46          # XXX _threading_local keeps the local of the last stopped thread alive.
      47          deadlist = [weak for weak in weaklist if weak() is None]
      48          self.assertIn(len(deadlist), (n-1, n))
      49  
      50          # Assignment to the same thread local frees it sometimes (!)
      51          local.someothervar = None
      52          support.gc_collect()  # For PyPy or other GCs.
      53          deadlist = [weak for weak in weaklist if weak() is None]
      54          self.assertIn(len(deadlist), (n-1, n), (n, len(deadlist)))
      55  
      56      def test_derived(self):
      57          # Issue 3088: if there is a threads switch inside the __init__
      58          # of a threading.local derived class, the per-thread dictionary
      59          # is created but not correctly set on the object.
      60          # The first member set may be bogus.
      61          import time
      62          class ESC[4;38;5;81mLocal(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149m_local):
      63              def __init__(self):
      64                  time.sleep(0.01)
      65          local = Local()
      66  
      67          def f(i):
      68              local.x = i
      69              # Simply check that the variable is correctly set
      70              self.assertEqual(local.x, i)
      71  
      72          with threading_helper.start_threads(threading.Thread(target=f, args=(i,))
      73                                              for i in range(10)):
      74              pass
      75  
      76      def test_derived_cycle_dealloc(self):
      77          # http://bugs.python.org/issue6990
      78          class ESC[4;38;5;81mLocal(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149m_local):
      79              pass
      80          locals = None
      81          passed = False
      82          e1 = threading.Event()
      83          e2 = threading.Event()
      84  
      85          def f():
      86              nonlocal passed
      87              # 1) Involve Local in a cycle
      88              cycle = [Local()]
      89              cycle.append(cycle)
      90              cycle[0].foo = 'bar'
      91  
      92              # 2) GC the cycle (triggers threadmodule.c::local_clear
      93              # before local_dealloc)
      94              del cycle
      95              support.gc_collect()  # For PyPy or other GCs.
      96              e1.set()
      97              e2.wait()
      98  
      99              # 4) New Locals should be empty
     100              passed = all(not hasattr(local, 'foo') for local in locals)
     101  
     102          t = threading.Thread(target=f)
     103          t.start()
     104          e1.wait()
     105  
     106          # 3) New Locals should recycle the original's address. Creating
     107          # them in the thread overwrites the thread state and avoids the
     108          # bug
     109          locals = [Local() for i in range(10)]
     110          e2.set()
     111          t.join()
     112  
     113          self.assertTrue(passed)
     114  
     115      def test_arguments(self):
     116          # Issue 1522237
     117          class ESC[4;38;5;81mMyLocal(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149m_local):
     118              def __init__(self, *args, **kwargs):
     119                  pass
     120  
     121          MyLocal(a=1)
     122          MyLocal(1)
     123          self.assertRaises(TypeError, self._local, a=1)
     124          self.assertRaises(TypeError, self._local, 1)
     125  
     126      def _test_one_class(self, c):
     127          self._failed = "No error message set or cleared."
     128          obj = c()
     129          e1 = threading.Event()
     130          e2 = threading.Event()
     131  
     132          def f1():
     133              obj.x = 'foo'
     134              obj.y = 'bar'
     135              del obj.y
     136              e1.set()
     137              e2.wait()
     138  
     139          def f2():
     140              try:
     141                  foo = obj.x
     142              except AttributeError:
     143                  # This is expected -- we haven't set obj.x in this thread yet!
     144                  self._failed = ""  # passed
     145              else:
     146                  self._failed = ('Incorrectly got value %r from class %r\n' %
     147                                  (foo, c))
     148                  sys.stderr.write(self._failed)
     149  
     150          t1 = threading.Thread(target=f1)
     151          t1.start()
     152          e1.wait()
     153          t2 = threading.Thread(target=f2)
     154          t2.start()
     155          t2.join()
     156          # The test is done; just let t1 know it can exit, and wait for it.
     157          e2.set()
     158          t1.join()
     159  
     160          self.assertFalse(self._failed, self._failed)
     161  
     162      def test_threading_local(self):
     163          self._test_one_class(self._local)
     164  
     165      def test_threading_local_subclass(self):
     166          class ESC[4;38;5;81mLocalSubclass(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149m_local):
     167              """To test that subclasses behave properly."""
     168          self._test_one_class(LocalSubclass)
     169  
     170      def _test_dict_attribute(self, cls):
     171          obj = cls()
     172          obj.x = 5
     173          self.assertEqual(obj.__dict__, {'x': 5})
     174          with self.assertRaises(AttributeError):
     175              obj.__dict__ = {}
     176          with self.assertRaises(AttributeError):
     177              del obj.__dict__
     178  
     179      def test_dict_attribute(self):
     180          self._test_dict_attribute(self._local)
     181  
     182      def test_dict_attribute_subclass(self):
     183          class ESC[4;38;5;81mLocalSubclass(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149m_local):
     184              """To test that subclasses behave properly."""
     185          self._test_dict_attribute(LocalSubclass)
     186  
     187      def test_cycle_collection(self):
     188          class ESC[4;38;5;81mX:
     189              pass
     190  
     191          x = X()
     192          x.local = self._local()
     193          x.local.x = x
     194          wr = weakref.ref(x)
     195          del x
     196          support.gc_collect()  # For PyPy or other GCs.
     197          self.assertIsNone(wr())
     198  
     199  
     200      def test_threading_local_clear_race(self):
     201          # See https://github.com/python/cpython/issues/100892
     202  
     203          _testcapi = import_module('_testcapi')
     204          _testcapi.call_in_temporary_c_thread(lambda: None, False)
     205  
     206          for _ in range(1000):
     207              _ = threading.local()
     208  
     209          _testcapi.join_temporary_c_thread()
     210  
     211  
     212  class ESC[4;38;5;81mThreadLocalTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase, ESC[4;38;5;149mBaseLocalTest):
     213      _local = _thread._local
     214  
     215  class ESC[4;38;5;81mPyThreadingLocalTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase, ESC[4;38;5;149mBaseLocalTest):
     216      _local = _threading_local.local
     217  
     218  
     219  def load_tests(loader, tests, pattern):
     220      tests.addTest(DocTestSuite('_threading_local'))
     221  
     222      local_orig = _threading_local.local
     223      def setUp(test):
     224          _threading_local.local = _thread._local
     225      def tearDown(test):
     226          _threading_local.local = local_orig
     227      tests.addTests(DocTestSuite('_threading_local',
     228                                  setUp=setUp, tearDown=tearDown)
     229                     )
     230      return tests
     231  
     232  
# Allow this test file to be run directly as a script.
if __name__ == '__main__':
    unittest.main()