(root)/
Python-3.11.7/
Lib/
test/
lock_tests.py
       1  """
       2  Various tests for synchronization primitives.
       3  """
       4  
       5  import os
       6  import gc
       7  import sys
       8  import time
       9  from _thread import start_new_thread, TIMEOUT_MAX
      10  import threading
      11  import unittest
      12  import weakref
      13  
      14  from test import support
      15  from test.support import threading_helper
      16  
      17  
# Skip decorator applied to the test_at_fork_reinit() tests below: they only
# run when test.support reports that the platform has fork support.
requires_fork = unittest.skipUnless(support.has_fork_support,
                                    "platform doesn't support fork "
                                     "(no _at_fork_reinit method)")
      21  
      22  
      23  def wait_threads_blocked(nthread):
      24      # Arbitrary sleep to wait until N threads are blocked,
      25      # like waiting for a lock.
      26      time.sleep(0.010 * nthread)
      27  
      28  
      29  class ESC[4;38;5;81mBunch(ESC[4;38;5;149mobject):
      30      """
      31      A bunch of threads.
      32      """
      33      def __init__(self, func, nthread, wait_before_exit=False):
      34          """
      35          Construct a bunch of `nthread` threads running the same function `func`.
      36          If `wait_before_exit` is True, the threads won't terminate until
      37          do_finish() is called.
      38          """
      39          self.func = func
      40          self.nthread = nthread
      41          self.started = []
      42          self.finished = []
      43          self.exceptions = []
      44          self._can_exit = not wait_before_exit
      45          self._wait_thread = None
      46  
      47      def task(self):
      48          tid = threading.get_ident()
      49          self.started.append(tid)
      50          try:
      51              self.func()
      52          except BaseException as exc:
      53              self.exceptions.append(exc)
      54          finally:
      55              self.finished.append(tid)
      56              for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
      57                  if self._can_exit:
      58                      break
      59  
      60      def __enter__(self):
      61          self._wait_thread = threading_helper.wait_threads_exit(support.SHORT_TIMEOUT)
      62          self._wait_thread.__enter__()
      63  
      64          try:
      65              for _ in range(self.nthread):
      66                  start_new_thread(self.task, ())
      67          except:
      68              self._can_exit = True
      69              raise
      70  
      71          for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
      72              if len(self.started) >= self.nthread:
      73                  break
      74  
      75          return self
      76  
      77      def __exit__(self, exc_type, exc_value, traceback):
      78          for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
      79              if len(self.finished) >= self.nthread:
      80                  break
      81  
      82          # Wait until threads completely exit according to _thread._count()
      83          self._wait_thread.__exit__(None, None, None)
      84  
      85          # Break reference cycle
      86          exceptions = self.exceptions
      87          self.exceptions = None
      88          if exceptions:
      89              raise ExceptionGroup(f"{self.func} threads raised exceptions",
      90                                   exceptions)
      91  
      92      def do_finish(self):
      93          self._can_exit = True
      94  
      95  
      96  class ESC[4;38;5;81mBaseTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      97      def setUp(self):
      98          self._threads = threading_helper.threading_setup()
      99  
     100      def tearDown(self):
     101          threading_helper.threading_cleanup(*self._threads)
     102          support.reap_children()
     103  
     104      def assertTimeout(self, actual, expected):
     105          # The waiting and/or time.monotonic() can be imprecise, which
     106          # is why comparing to the expected value would sometimes fail
     107          # (especially under Windows).
     108          self.assertGreaterEqual(actual, expected * 0.6)
     109          # Test nothing insane happened
     110          self.assertLess(actual, expected * 10.0)
     111  
     112  
     113  class ESC[4;38;5;81mBaseLockTests(ESC[4;38;5;149mBaseTestCase):
     114      """
     115      Tests for both recursive and non-recursive locks.
     116      """
     117  
     118      def wait_phase(self, phase, expected):
     119          for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
     120              if len(phase) >= expected:
     121                  break
     122          self.assertEqual(len(phase), expected)
     123  
     124      def test_constructor(self):
     125          lock = self.locktype()
     126          del lock
     127  
     128      def test_repr(self):
     129          lock = self.locktype()
     130          self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
     131          del lock
     132  
     133      def test_locked_repr(self):
     134          lock = self.locktype()
     135          lock.acquire()
     136          self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>")
     137          del lock
     138  
     139      def test_acquire_destroy(self):
     140          lock = self.locktype()
     141          lock.acquire()
     142          del lock
     143  
     144      def test_acquire_release(self):
     145          lock = self.locktype()
     146          lock.acquire()
     147          lock.release()
     148          del lock
     149  
     150      def test_try_acquire(self):
     151          lock = self.locktype()
     152          self.assertTrue(lock.acquire(False))
     153          lock.release()
     154  
     155      def test_try_acquire_contended(self):
     156          lock = self.locktype()
     157          lock.acquire()
     158          result = []
     159          def f():
     160              result.append(lock.acquire(False))
     161          with Bunch(f, 1):
     162              pass
     163          self.assertFalse(result[0])
     164          lock.release()
     165  
     166      def test_acquire_contended(self):
     167          lock = self.locktype()
     168          lock.acquire()
     169          def f():
     170              lock.acquire()
     171              lock.release()
     172  
     173          N = 5
     174          with Bunch(f, N) as bunch:
     175              # Threads block on lock.acquire()
     176              wait_threads_blocked(N)
     177              self.assertEqual(len(bunch.finished), 0)
     178  
     179              # Threads unblocked
     180              lock.release()
     181  
     182          self.assertEqual(len(bunch.finished), N)
     183  
     184      def test_with(self):
     185          lock = self.locktype()
     186          def f():
     187              lock.acquire()
     188              lock.release()
     189  
     190          def with_lock(err=None):
     191              with lock:
     192                  if err is not None:
     193                      raise err
     194  
     195          # Acquire the lock, do nothing, with releases the lock
     196          with lock:
     197              pass
     198  
     199          # Check that the lock is unacquired
     200          with Bunch(f, 1):
     201              pass
     202  
     203          # Acquire the lock, raise an exception, with releases the lock
     204          with self.assertRaises(TypeError):
     205              with lock:
     206                  raise TypeError
     207  
     208          # Check that the lock is unacquired even if after an exception
     209          # was raised in the previous "with lock:" block
     210          with Bunch(f, 1):
     211              pass
     212  
     213      def test_thread_leak(self):
     214          # The lock shouldn't leak a Thread instance when used from a foreign
     215          # (non-threading) thread.
     216          lock = self.locktype()
     217          def f():
     218              lock.acquire()
     219              lock.release()
     220  
     221          # We run many threads in the hope that existing threads ids won't
     222          # be recycled.
     223          with Bunch(f, 15):
     224              pass
     225  
     226      def test_timeout(self):
     227          lock = self.locktype()
     228          # Can't set timeout if not blocking
     229          self.assertRaises(ValueError, lock.acquire, False, 1)
     230          # Invalid timeout values
     231          self.assertRaises(ValueError, lock.acquire, timeout=-100)
     232          self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
     233          self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
     234          # TIMEOUT_MAX is ok
     235          lock.acquire(timeout=TIMEOUT_MAX)
     236          lock.release()
     237          t1 = time.monotonic()
     238          self.assertTrue(lock.acquire(timeout=5))
     239          t2 = time.monotonic()
     240          # Just a sanity test that it didn't actually wait for the timeout.
     241          self.assertLess(t2 - t1, 5)
     242          results = []
     243          def f():
     244              t1 = time.monotonic()
     245              results.append(lock.acquire(timeout=0.5))
     246              t2 = time.monotonic()
     247              results.append(t2 - t1)
     248          with Bunch(f, 1):
     249              pass
     250          self.assertFalse(results[0])
     251          self.assertTimeout(results[1], 0.5)
     252  
     253      def test_weakref_exists(self):
     254          lock = self.locktype()
     255          ref = weakref.ref(lock)
     256          self.assertIsNotNone(ref())
     257  
     258      def test_weakref_deleted(self):
     259          lock = self.locktype()
     260          ref = weakref.ref(lock)
     261          del lock
     262          gc.collect()  # For PyPy or other GCs.
     263          self.assertIsNone(ref())
     264  
     265  
     266  class ESC[4;38;5;81mLockTests(ESC[4;38;5;149mBaseLockTests):
     267      """
     268      Tests for non-recursive, weak locks
     269      (which can be acquired and released from different threads).
     270      """
     271      def test_reacquire(self):
     272          # Lock needs to be released before re-acquiring.
     273          lock = self.locktype()
     274          phase = []
     275  
     276          def f():
     277              lock.acquire()
     278              phase.append(None)
     279              lock.acquire()
     280              phase.append(None)
     281  
     282          with threading_helper.wait_threads_exit():
     283              # Thread blocked on lock.acquire()
     284              start_new_thread(f, ())
     285              self.wait_phase(phase, 1)
     286  
     287              # Thread unblocked
     288              lock.release()
     289              self.wait_phase(phase, 2)
     290  
     291      def test_different_thread(self):
     292          # Lock can be released from a different thread.
     293          lock = self.locktype()
     294          lock.acquire()
     295          def f():
     296              lock.release()
     297          with Bunch(f, 1):
     298              pass
     299          lock.acquire()
     300          lock.release()
     301  
     302      def test_state_after_timeout(self):
     303          # Issue #11618: check that lock is in a proper state after a
     304          # (non-zero) timeout.
     305          lock = self.locktype()
     306          lock.acquire()
     307          self.assertFalse(lock.acquire(timeout=0.01))
     308          lock.release()
     309          self.assertFalse(lock.locked())
     310          self.assertTrue(lock.acquire(blocking=False))
     311  
     312      @requires_fork
     313      def test_at_fork_reinit(self):
     314          def use_lock(lock):
     315              # make sure that the lock still works normally
     316              # after _at_fork_reinit()
     317              lock.acquire()
     318              lock.release()
     319  
     320          # unlocked
     321          lock = self.locktype()
     322          lock._at_fork_reinit()
     323          use_lock(lock)
     324  
     325          # locked: _at_fork_reinit() resets the lock to the unlocked state
     326          lock2 = self.locktype()
     327          lock2.acquire()
     328          lock2._at_fork_reinit()
     329          use_lock(lock2)
     330  
     331  
     332  class ESC[4;38;5;81mRLockTests(ESC[4;38;5;149mBaseLockTests):
     333      """
     334      Tests for recursive locks.
     335      """
     336      def test_reacquire(self):
     337          lock = self.locktype()
     338          lock.acquire()
     339          lock.acquire()
     340          lock.release()
     341          lock.acquire()
     342          lock.release()
     343          lock.release()
     344  
     345      def test_release_unacquired(self):
     346          # Cannot release an unacquired lock
     347          lock = self.locktype()
     348          self.assertRaises(RuntimeError, lock.release)
     349          lock.acquire()
     350          lock.acquire()
     351          lock.release()
     352          lock.acquire()
     353          lock.release()
     354          lock.release()
     355          self.assertRaises(RuntimeError, lock.release)
     356  
     357      def test_release_save_unacquired(self):
     358          # Cannot _release_save an unacquired lock
     359          lock = self.locktype()
     360          self.assertRaises(RuntimeError, lock._release_save)
     361          lock.acquire()
     362          lock.acquire()
     363          lock.release()
     364          lock.acquire()
     365          lock.release()
     366          lock.release()
     367          self.assertRaises(RuntimeError, lock._release_save)
     368  
     369      def test_recursion_count(self):
     370          lock = self.locktype()
     371          self.assertEqual(0, lock._recursion_count())
     372          lock.acquire()
     373          self.assertEqual(1, lock._recursion_count())
     374          lock.acquire()
     375          lock.acquire()
     376          self.assertEqual(3, lock._recursion_count())
     377          lock.release()
     378          self.assertEqual(2, lock._recursion_count())
     379          lock.release()
     380          lock.release()
     381          self.assertEqual(0, lock._recursion_count())
     382  
     383          phase = []
     384  
     385          def f():
     386              lock.acquire()
     387              phase.append(None)
     388  
     389              self.wait_phase(phase, 2)
     390              lock.release()
     391              phase.append(None)
     392  
     393          with threading_helper.wait_threads_exit():
     394              # Thread blocked on lock.acquire()
     395              start_new_thread(f, ())
     396              self.wait_phase(phase, 1)
     397              self.assertEqual(0, lock._recursion_count())
     398  
     399              # Thread unblocked
     400              phase.append(None)
     401              self.wait_phase(phase, 3)
     402              self.assertEqual(0, lock._recursion_count())
     403  
     404      def test_different_thread(self):
     405          # Cannot release from a different thread
     406          lock = self.locktype()
     407          def f():
     408              lock.acquire()
     409  
     410          with Bunch(f, 1, True) as bunch:
     411              try:
     412                  self.assertRaises(RuntimeError, lock.release)
     413              finally:
     414                  bunch.do_finish()
     415  
     416      def test__is_owned(self):
     417          lock = self.locktype()
     418          self.assertFalse(lock._is_owned())
     419          lock.acquire()
     420          self.assertTrue(lock._is_owned())
     421          lock.acquire()
     422          self.assertTrue(lock._is_owned())
     423          result = []
     424          def f():
     425              result.append(lock._is_owned())
     426          with Bunch(f, 1):
     427              pass
     428          self.assertFalse(result[0])
     429          lock.release()
     430          self.assertTrue(lock._is_owned())
     431          lock.release()
     432          self.assertFalse(lock._is_owned())
     433  
     434  
     435  class ESC[4;38;5;81mEventTests(ESC[4;38;5;149mBaseTestCase):
     436      """
     437      Tests for Event objects.
     438      """
     439  
     440      def test_is_set(self):
     441          evt = self.eventtype()
     442          self.assertFalse(evt.is_set())
     443          evt.set()
     444          self.assertTrue(evt.is_set())
     445          evt.set()
     446          self.assertTrue(evt.is_set())
     447          evt.clear()
     448          self.assertFalse(evt.is_set())
     449          evt.clear()
     450          self.assertFalse(evt.is_set())
     451  
     452      def _check_notify(self, evt):
     453          # All threads get notified
     454          N = 5
     455          results1 = []
     456          results2 = []
     457          def f():
     458              results1.append(evt.wait())
     459              results2.append(evt.wait())
     460  
     461          with Bunch(f, N):
     462              # Threads blocked on first evt.wait()
     463              wait_threads_blocked(N)
     464              self.assertEqual(len(results1), 0)
     465  
     466              # Threads unblocked
     467              evt.set()
     468  
     469          self.assertEqual(results1, [True] * N)
     470          self.assertEqual(results2, [True] * N)
     471  
     472      def test_notify(self):
     473          evt = self.eventtype()
     474          self._check_notify(evt)
     475          # Another time, after an explicit clear()
     476          evt.set()
     477          evt.clear()
     478          self._check_notify(evt)
     479  
     480      def test_timeout(self):
     481          evt = self.eventtype()
     482          results1 = []
     483          results2 = []
     484          N = 5
     485          def f():
     486              results1.append(evt.wait(0.0))
     487              t1 = time.monotonic()
     488              r = evt.wait(0.5)
     489              t2 = time.monotonic()
     490              results2.append((r, t2 - t1))
     491  
     492          with Bunch(f, N):
     493              pass
     494  
     495          self.assertEqual(results1, [False] * N)
     496          for r, dt in results2:
     497              self.assertFalse(r)
     498              self.assertTimeout(dt, 0.5)
     499  
     500          # The event is set
     501          results1 = []
     502          results2 = []
     503          evt.set()
     504          with Bunch(f, N):
     505              pass
     506  
     507          self.assertEqual(results1, [True] * N)
     508          for r, dt in results2:
     509              self.assertTrue(r)
     510  
     511      def test_set_and_clear(self):
     512          # gh-57711: check that wait() returns true even when the event is
     513          # cleared before the waiting thread is woken up.
     514          event = self.eventtype()
     515          results = []
     516          def f():
     517              results.append(event.wait(support.LONG_TIMEOUT))
     518  
     519          N = 5
     520          with Bunch(f, N):
     521              # Threads blocked on event.wait()
     522              wait_threads_blocked(N)
     523  
     524              # Threads unblocked
     525              event.set()
     526              event.clear()
     527  
     528          self.assertEqual(results, [True] * N)
     529  
     530      @requires_fork
     531      def test_at_fork_reinit(self):
     532          # ensure that condition is still using a Lock after reset
     533          evt = self.eventtype()
     534          with evt._cond:
     535              self.assertFalse(evt._cond.acquire(False))
     536          evt._at_fork_reinit()
     537          with evt._cond:
     538              self.assertFalse(evt._cond.acquire(False))
     539  
     540      def test_repr(self):
     541          evt = self.eventtype()
     542          self.assertRegex(repr(evt), r"<\w+\.Event at .*: unset>")
     543          evt.set()
     544          self.assertRegex(repr(evt), r"<\w+\.Event at .*: set>")
     545  
     546  
class ConditionTests(BaseTestCase):
    """
    Tests for condition variables.

    Conditions are created through self.condtype(), which is not defined in
    this class (presumably each concrete test class provides it).
    """

    def test_acquire(self):
        cond = self.condtype()
        # By default we have an RLock: the condition can be acquired multiple
        # times.
        cond.acquire()
        cond.acquire()
        cond.release()
        cond.release()
        # With an explicit non-recursive lock, the condition and the lock
        # share the same locked state.
        lock = threading.Lock()
        cond = self.condtype(lock)
        cond.acquire()
        self.assertFalse(lock.acquire(False))
        cond.release()
        self.assertTrue(lock.acquire(False))
        self.assertFalse(cond.acquire(False))
        lock.release()
        with cond:
            self.assertFalse(lock.acquire(False))

    def test_unacquired_wait(self):
        # wait() without holding the condition's lock raises RuntimeError.
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.wait)

    def test_unacquired_notify(self):
        # notify() without holding the condition's lock raises RuntimeError.
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.notify)

    def _check_notify(self, cond):
        # Note that this test is sensitive to timing.  If the worker threads
        # don't execute in a timely fashion, the main thread may think they
        # are further along than they are.  The main thread therefore issues
        # wait_threads_blocked() statements to try to make sure that it doesn't
        # race ahead of the workers.
        # Secondly, this test assumes that condition variables are not subject
        # to spurious wakeups.  The absence of spurious wakeups is an implementation
        # detail of Condition Variables in current CPython, but in general, not
        # a guaranteed property of condition variables as a programming
        # construct.  In particular, it is possible that this can no longer
        # be conveniently guaranteed should their implementation ever change.
        ready = []
        results1 = []
        results2 = []
        # Rebound by the main thread below; each worker records the value it
        # sees when it is about to wait and when it has been woken up.
        phase_num = 0
        def f():
            # First wait
            cond.acquire()
            ready.append(phase_num)
            result = cond.wait()

            cond.release()
            results1.append((result, phase_num))

            # Second wait
            cond.acquire()
            ready.append(phase_num)

            result = cond.wait()
            cond.release()
            results2.append((result, phase_num))

        N = 5
        with Bunch(f, N):
            # first wait, to ensure all workers settle into cond.wait() before
            # we continue. See issues #8799 and #30727.
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(ready) >= N:
                    break

            ready.clear()
            self.assertEqual(results1, [])

            # Notify 3 threads at first
            count1 = 3
            cond.acquire()
            cond.notify(count1)
            wait_threads_blocked(count1)

            # Phase 1
            phase_num = 1
            cond.release()
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(results1) >= count1:
                    break

            self.assertEqual(results1, [(True, 1)] * count1)
            self.assertEqual(results2, [])

            # Wait until awaken workers are blocked on cond.wait()
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(ready) >= count1 :
                    break

            # Notify 5 threads: they might be in their first or second wait
            cond.acquire()
            cond.notify(5)
            wait_threads_blocked(N)

            # Phase 2
            phase_num = 2
            cond.release()
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(results1) + len(results2) >= (N + count1):
                    break

            count2 = N - count1
            self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2)
            self.assertEqual(results2, [(True, 2)] * count1)

            # Make sure all workers settle into cond.wait()
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(ready) >= N:
                    break

            # Notify all threads: they are all in their second wait
            cond.acquire()
            cond.notify_all()
            wait_threads_blocked(N)

            # Phase 3
            phase_num = 3
            cond.release()
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(results2) >= N:
                    break
            self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2)
            self.assertEqual(results2, [(True, 2)] * count1 + [(True, 3)] * count2)

    def test_notify(self):
        cond = self.condtype()
        self._check_notify(cond)
        # A second time, to check internal state is still ok.
        self._check_notify(cond)

    def test_timeout(self):
        cond = self.condtype()
        timeout = 0.5
        results = []
        def f():
            # Nobody notifies the condition: wait() has to time out.
            cond.acquire()
            t1 = time.monotonic()
            result = cond.wait(timeout)
            t2 = time.monotonic()
            cond.release()
            results.append((t2 - t1, result))

        N = 5
        with Bunch(f, N):
            pass
        self.assertEqual(len(results), N)

        for dt, result in results:
            self.assertTimeout(dt, timeout)
            # Note that conceptually (that's the condition variable protocol)
            # a wait() may succeed even if no one notifies us and before any
            # timeout occurs.  Spurious wakeups can occur.
            # This makes it hard to verify the result value.
            # In practice, this implementation has no spurious wakeups.
            self.assertFalse(result)

    def test_waitfor(self):
        cond = self.condtype()
        # Incremented by the main thread below; read by the predicate that
        # the worker passes to wait_for().
        state = 0
        def f():
            with cond:
                result = cond.wait_for(lambda: state == 4)
                self.assertTrue(result)
                self.assertEqual(state, 4)

        with Bunch(f, 1):
            # Four increments, each followed by a notify(): the predicate
            # becomes true on the last one.
            for i in range(4):
                time.sleep(0.010)
                with cond:
                    state += 1
                    cond.notify()

    def test_waitfor_timeout(self):
        cond = self.condtype()
        state = 0
        # Gets one item when the worker ran all of its checks.
        success = []
        def f():
            with cond:
                dt = time.monotonic()
                result = cond.wait_for(lambda : state==4, timeout=0.1)
                dt = time.monotonic() - dt
                self.assertFalse(result)
                self.assertTimeout(dt, 0.1)
                success.append(None)

        with Bunch(f, 1):
            # Only increment 3 times, so state == 4 is never reached.
            for i in range(3):
                time.sleep(0.010)
                with cond:
                    state += 1
                    cond.notify()

        self.assertEqual(len(success), 1)
     747  
     748  
     749  class ESC[4;38;5;81mBaseSemaphoreTests(ESC[4;38;5;149mBaseTestCase):
     750      """
     751      Common tests for {bounded, unbounded} semaphore objects.
     752      """
     753  
     754      def test_constructor(self):
     755          self.assertRaises(ValueError, self.semtype, value = -1)
     756          self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
     757  
     758      def test_acquire(self):
     759          sem = self.semtype(1)
     760          sem.acquire()
     761          sem.release()
     762          sem = self.semtype(2)
     763          sem.acquire()
     764          sem.acquire()
     765          sem.release()
     766          sem.release()
     767  
     768      def test_acquire_destroy(self):
     769          sem = self.semtype()
     770          sem.acquire()
     771          del sem
     772  
    def test_acquire_contended(self):
        # The main thread takes one of the 7 initial units right away, which
        # leaves 6 for the worker threads.
        sem_value = 7
        sem = self.semtype(sem_value)
        sem.acquire()

        sem_results = []
        results1 = []
        results2 = []
        # Rebound by the main thread below; workers record the value they
        # see right after each successful acquire().
        phase_num = 0

        def func():
            # Each worker acquires the semaphore twice and never releases it.
            sem_results.append(sem.acquire())
            results1.append(phase_num)

            sem_results.append(sem.acquire())
            results2.append(phase_num)

        def wait_count(count):
            # Poll until `count` acquisitions in total have been recorded.
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(results1) + len(results2) >= count:
                    break

        # 10 workers x 2 acquisitions = 20 acquisitions in total.
        N = 10
        with Bunch(func, N):
            # Phase 0
            count1 = sem_value - 1
            wait_count(count1)
            self.assertEqual(results1 + results2, [0] * count1)

            # Phase 1
            phase_num = 1
            for i in range(sem_value):
                sem.release()
            count2 = sem_value
            wait_count(count1 + count2)
            self.assertEqual(sorted(results1 + results2),
                             [0] * count1 + [1] * count2)

            # Phase 2
            phase_num = 2
            count3 = (sem_value - 1)
            for i in range(count3):
                sem.release()
            wait_count(count1 + count2 + count3)
            self.assertEqual(sorted(results1 + results2),
                             [0] * count1 + [1] * count2 + [2] * count3)
            # The semaphore is still locked
            self.assertFalse(sem.acquire(False))

            # Final release, to let the last thread finish
            count4 = 1
            sem.release()

        # 6 + 7 + 6 + 1 = 20: every acquire() call returned True.
        self.assertEqual(sem_results,
                         [True] * (count1 + count2 + count3 + count4))
     828  
     829      def test_multirelease(self):
     830          sem_value = 7
     831          sem = self.semtype(sem_value)
     832          sem.acquire()
     833  
     834          results1 = []
     835          results2 = []
     836          phase_num = 0
     837          def func():
     838              sem.acquire()
     839              results1.append(phase_num)
     840  
     841              sem.acquire()
     842              results2.append(phase_num)
     843  
     844          def wait_count(count):
     845              for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
     846                  if len(results1) + len(results2) >= count:
     847                      break
     848  
     849          with Bunch(func, 10):
     850              # Phase 0
     851              count1 = sem_value - 1
     852              wait_count(count1)
     853              self.assertEqual(results1 + results2, [0] * count1)
     854  
     855              # Phase 1
     856              phase_num = 1
     857              count2 = sem_value
     858              sem.release(count2)
     859              wait_count(count1 + count2)
     860              self.assertEqual(sorted(results1 + results2),
     861                               [0] * count1 + [1] * count2)
     862  
     863              # Phase 2
     864              phase_num = 2
     865              count3 = sem_value - 1
     866              sem.release(count3)
     867              wait_count(count1 + count2 + count3)
     868              self.assertEqual(sorted(results1 + results2),
     869                               [0] * count1 + [1] * count2 + [2] * count3)
     870              # The semaphore is still locked
     871              self.assertFalse(sem.acquire(False))
     872  
     873              # Final release, to let the last thread finish
     874              sem.release()
     875  
     876      def test_try_acquire(self):
     877          sem = self.semtype(2)
     878          self.assertTrue(sem.acquire(False))
     879          self.assertTrue(sem.acquire(False))
     880          self.assertFalse(sem.acquire(False))
     881          sem.release()
     882          self.assertTrue(sem.acquire(False))
     883  
     884      def test_try_acquire_contended(self):
     885          sem = self.semtype(4)
     886          sem.acquire()
     887          results = []
     888          def f():
     889              results.append(sem.acquire(False))
     890              results.append(sem.acquire(False))
     891          with Bunch(f, 5):
     892              pass
     893          # There can be a thread switch between acquiring the semaphore and
     894          # appending the result, therefore results will not necessarily be
     895          # ordered.
     896          self.assertEqual(sorted(results), [False] * 7 + [True] *  3 )
     897  
     898      def test_acquire_timeout(self):
     899          sem = self.semtype(2)
     900          self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
     901          self.assertTrue(sem.acquire(timeout=0.005))
     902          self.assertTrue(sem.acquire(timeout=0.005))
     903          self.assertFalse(sem.acquire(timeout=0.005))
     904          sem.release()
     905          self.assertTrue(sem.acquire(timeout=0.005))
     906          t = time.monotonic()
     907          self.assertFalse(sem.acquire(timeout=0.5))
     908          dt = time.monotonic() - t
     909          self.assertTimeout(dt, 0.5)
     910  
     911      def test_default_value(self):
     912          # The default initial value is 1.
     913          sem = self.semtype()
     914          sem.acquire()
     915          def f():
     916              sem.acquire()
     917              sem.release()
     918  
     919          with Bunch(f, 1) as bunch:
     920              # Thread blocked on sem.acquire()
     921              wait_threads_blocked(1)
     922              self.assertFalse(bunch.finished)
     923  
     924              # Thread unblocked
     925              sem.release()
     926  
     927      def test_with(self):
     928          sem = self.semtype(2)
     929          def _with(err=None):
     930              with sem:
     931                  self.assertTrue(sem.acquire(False))
     932                  sem.release()
     933                  with sem:
     934                      self.assertFalse(sem.acquire(False))
     935                      if err:
     936                          raise err
     937          _with()
     938          self.assertTrue(sem.acquire(False))
     939          sem.release()
     940          self.assertRaises(TypeError, _with, TypeError)
     941          self.assertTrue(sem.acquire(False))
     942          sem.release()
     943  
     944  class ESC[4;38;5;81mSemaphoreTests(ESC[4;38;5;149mBaseSemaphoreTests):
     945      """
     946      Tests for unbounded semaphores.
     947      """
     948  
     949      def test_release_unacquired(self):
     950          # Unbounded releases are allowed and increment the semaphore's value
     951          sem = self.semtype(1)
     952          sem.release()
     953          sem.acquire()
     954          sem.acquire()
     955          sem.release()
     956  
     957      def test_repr(self):
     958          sem = self.semtype(3)
     959          self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=3>")
     960          sem.acquire()
     961          self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=2>")
     962          sem.release()
     963          sem.release()
     964          self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=4>")
     965  
     966  
     967  class ESC[4;38;5;81mBoundedSemaphoreTests(ESC[4;38;5;149mBaseSemaphoreTests):
     968      """
     969      Tests for bounded semaphores.
     970      """
     971  
     972      def test_release_unacquired(self):
     973          # Cannot go past the initial value
     974          sem = self.semtype()
     975          self.assertRaises(ValueError, sem.release)
     976          sem.acquire()
     977          sem.release()
     978          self.assertRaises(ValueError, sem.release)
     979  
     980      def test_repr(self):
     981          sem = self.semtype(3)
     982          self.assertRegex(repr(sem), r"<\w+\.BoundedSemaphore at .*: value=3/3>")
     983          sem.acquire()
     984          self.assertRegex(repr(sem), r"<\w+\.BoundedSemaphore at .*: value=2/3>")
     985  
     986  
     987  class ESC[4;38;5;81mBarrierTests(ESC[4;38;5;149mBaseTestCase):
     988      """
     989      Tests for Barrier objects.
     990      """
     991      N = 5
     992      defaultTimeout = 2.0
     993  
     994      def setUp(self):
     995          self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
     996  
     997      def tearDown(self):
     998          self.barrier.abort()
     999  
    1000      def run_threads(self, f):
    1001          with Bunch(f, self.N):
    1002              pass
    1003  
    1004      def multipass(self, results, n):
    1005          m = self.barrier.parties
    1006          self.assertEqual(m, self.N)
    1007          for i in range(n):
    1008              results[0].append(True)
    1009              self.assertEqual(len(results[1]), i * m)
    1010              self.barrier.wait()
    1011              results[1].append(True)
    1012              self.assertEqual(len(results[0]), (i + 1) * m)
    1013              self.barrier.wait()
    1014          self.assertEqual(self.barrier.n_waiting, 0)
    1015          self.assertFalse(self.barrier.broken)
    1016  
    1017      def test_barrier(self, passes=1):
    1018          """
    1019          Test that a barrier is passed in lockstep
    1020          """
    1021          results = [[],[]]
    1022          def f():
    1023              self.multipass(results, passes)
    1024          self.run_threads(f)
    1025  
    1026      def test_barrier_10(self):
    1027          """
    1028          Test that a barrier works for 10 consecutive runs
    1029          """
    1030          return self.test_barrier(10)
    1031  
    1032      def test_wait_return(self):
    1033          """
    1034          test the return value from barrier.wait
    1035          """
    1036          results = []
    1037          def f():
    1038              r = self.barrier.wait()
    1039              results.append(r)
    1040  
    1041          self.run_threads(f)
    1042          self.assertEqual(sum(results), sum(range(self.N)))
    1043  
    1044      def test_action(self):
    1045          """
    1046          Test the 'action' callback
    1047          """
    1048          results = []
    1049          def action():
    1050              results.append(True)
    1051          barrier = self.barriertype(self.N, action)
    1052          def f():
    1053              barrier.wait()
    1054              self.assertEqual(len(results), 1)
    1055  
    1056          self.run_threads(f)
    1057  
    1058      def test_abort(self):
    1059          """
    1060          Test that an abort will put the barrier in a broken state
    1061          """
    1062          results1 = []
    1063          results2 = []
    1064          def f():
    1065              try:
    1066                  i = self.barrier.wait()
    1067                  if i == self.N//2:
    1068                      raise RuntimeError
    1069                  self.barrier.wait()
    1070                  results1.append(True)
    1071              except threading.BrokenBarrierError:
    1072                  results2.append(True)
    1073              except RuntimeError:
    1074                  self.barrier.abort()
    1075                  pass
    1076  
    1077          self.run_threads(f)
    1078          self.assertEqual(len(results1), 0)
    1079          self.assertEqual(len(results2), self.N-1)
    1080          self.assertTrue(self.barrier.broken)
    1081  
    1082      def test_reset(self):
    1083          """
    1084          Test that a 'reset' on a barrier frees the waiting threads
    1085          """
    1086          results1 = []
    1087          results2 = []
    1088          results3 = []
    1089          def f():
    1090              i = self.barrier.wait()
    1091              if i == self.N//2:
    1092                  # Wait until the other threads are all in the barrier.
    1093                  for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
    1094                      if self.barrier.n_waiting >= (self.N - 1):
    1095                          break
    1096                  self.barrier.reset()
    1097              else:
    1098                  try:
    1099                      self.barrier.wait()
    1100                      results1.append(True)
    1101                  except threading.BrokenBarrierError:
    1102                      results2.append(True)
    1103              # Now, pass the barrier again
    1104              self.barrier.wait()
    1105              results3.append(True)
    1106  
    1107          self.run_threads(f)
    1108          self.assertEqual(len(results1), 0)
    1109          self.assertEqual(len(results2), self.N-1)
    1110          self.assertEqual(len(results3), self.N)
    1111  
    1112  
    1113      def test_abort_and_reset(self):
    1114          """
    1115          Test that a barrier can be reset after being broken.
    1116          """
    1117          results1 = []
    1118          results2 = []
    1119          results3 = []
    1120          barrier2 = self.barriertype(self.N)
    1121          def f():
    1122              try:
    1123                  i = self.barrier.wait()
    1124                  if i == self.N//2:
    1125                      raise RuntimeError
    1126                  self.barrier.wait()
    1127                  results1.append(True)
    1128              except threading.BrokenBarrierError:
    1129                  results2.append(True)
    1130              except RuntimeError:
    1131                  self.barrier.abort()
    1132                  pass
    1133              # Synchronize and reset the barrier.  Must synchronize first so
    1134              # that everyone has left it when we reset, and after so that no
    1135              # one enters it before the reset.
    1136              if barrier2.wait() == self.N//2:
    1137                  self.barrier.reset()
    1138              barrier2.wait()
    1139              self.barrier.wait()
    1140              results3.append(True)
    1141  
    1142          self.run_threads(f)
    1143          self.assertEqual(len(results1), 0)
    1144          self.assertEqual(len(results2), self.N-1)
    1145          self.assertEqual(len(results3), self.N)
    1146  
    1147      def test_timeout(self):
    1148          """
    1149          Test wait(timeout)
    1150          """
    1151          def f():
    1152              i = self.barrier.wait()
    1153              if i == self.N // 2:
    1154                  # One thread is late!
    1155                  time.sleep(self.defaultTimeout / 2)
    1156              # Default timeout is 2.0, so this is shorter.
    1157              self.assertRaises(threading.BrokenBarrierError,
    1158                                self.barrier.wait, self.defaultTimeout / 4)
    1159          self.run_threads(f)
    1160  
    1161      def test_default_timeout(self):
    1162          """
    1163          Test the barrier's default timeout
    1164          """
    1165          timeout = 0.100
    1166          barrier = self.barriertype(2, timeout=timeout)
    1167          def f():
    1168              self.assertRaises(threading.BrokenBarrierError,
    1169                                barrier.wait)
    1170  
    1171          start_time = time.monotonic()
    1172          with Bunch(f, 1):
    1173              pass
    1174          dt = time.monotonic() - start_time
    1175          self.assertGreaterEqual(dt, timeout)
    1176  
    1177      def test_single_thread(self):
    1178          b = self.barriertype(1)
    1179          b.wait()
    1180          b.wait()
    1181  
    1182      def test_repr(self):
    1183          barrier = self.barriertype(3)
    1184          timeout = support.LONG_TIMEOUT
    1185          self.assertRegex(repr(barrier), r"<\w+\.Barrier at .*: waiters=0/3>")
    1186          def f():
    1187              barrier.wait(timeout)
    1188  
    1189          N = 2
    1190          with Bunch(f, N):
    1191              # Threads blocked on barrier.wait()
    1192              for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
    1193                  if barrier.n_waiting >= N:
    1194                      break
    1195              self.assertRegex(repr(barrier),
    1196                               r"<\w+\.Barrier at .*: waiters=2/3>")
    1197  
    1198              # Threads unblocked
    1199              barrier.wait(timeout)
    1200  
    1201          self.assertRegex(repr(barrier),
    1202                           r"<\w+\.Barrier at .*: waiters=0/3>")
    1203  
    1204          # Abort the barrier
    1205          barrier.abort()
    1206          self.assertRegex(repr(barrier),
    1207                           r"<\w+\.Barrier at .*: broken>")