(root)/
Python-3.12.0/
Lib/
test/
lock_tests.py
       1  """
       2  Various tests for synchronization primitives.
       3  """
       4  
       5  import gc
       6  import sys
       7  import time
       8  from _thread import start_new_thread, TIMEOUT_MAX
       9  import threading
      10  import unittest
      11  import weakref
      12  
      13  from test import support
      14  from test.support import threading_helper
      15  
      16  
      17  requires_fork = unittest.skipUnless(support.has_fork_support,
      18                                      "platform doesn't support fork "
      19                                       "(no _at_fork_reinit method)")
      20  
      21  
      22  def _wait():
      23      # A crude wait/yield function not relying on synchronization primitives.
      24      time.sleep(0.01)
      25  
      26  class ESC[4;38;5;81mBunch(ESC[4;38;5;149mobject):
      27      """
      28      A bunch of threads.
      29      """
      30      def __init__(self, f, n, wait_before_exit=False):
      31          """
      32          Construct a bunch of `n` threads running the same function `f`.
      33          If `wait_before_exit` is True, the threads won't terminate until
      34          do_finish() is called.
      35          """
      36          self.f = f
      37          self.n = n
      38          self.started = []
      39          self.finished = []
      40          self._can_exit = not wait_before_exit
      41          self.wait_thread = threading_helper.wait_threads_exit()
      42          self.wait_thread.__enter__()
      43  
      44          def task():
      45              tid = threading.get_ident()
      46              self.started.append(tid)
      47              try:
      48                  f()
      49              finally:
      50                  self.finished.append(tid)
      51                  while not self._can_exit:
      52                      _wait()
      53  
      54          try:
      55              for i in range(n):
      56                  start_new_thread(task, ())
      57          except:
      58              self._can_exit = True
      59              raise
      60  
      61      def wait_for_started(self):
      62          while len(self.started) < self.n:
      63              _wait()
      64  
      65      def wait_for_finished(self):
      66          while len(self.finished) < self.n:
      67              _wait()
      68          # Wait for threads exit
      69          self.wait_thread.__exit__(None, None, None)
      70  
      71      def do_finish(self):
      72          self._can_exit = True
      73  
      74  
      75  class ESC[4;38;5;81mBaseTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      76      def setUp(self):
      77          self._threads = threading_helper.threading_setup()
      78  
      79      def tearDown(self):
      80          threading_helper.threading_cleanup(*self._threads)
      81          support.reap_children()
      82  
      83      def assertTimeout(self, actual, expected):
      84          # The waiting and/or time.monotonic() can be imprecise, which
      85          # is why comparing to the expected value would sometimes fail
      86          # (especially under Windows).
      87          self.assertGreaterEqual(actual, expected * 0.6)
      88          # Test nothing insane happened
      89          self.assertLess(actual, expected * 10.0)
      90  
      91  
      92  class ESC[4;38;5;81mBaseLockTests(ESC[4;38;5;149mBaseTestCase):
      93      """
      94      Tests for both recursive and non-recursive locks.
      95      """
      96  
      97      def test_constructor(self):
      98          lock = self.locktype()
      99          del lock
     100  
     101      def test_repr(self):
     102          lock = self.locktype()
     103          self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
     104          del lock
     105  
     106      def test_locked_repr(self):
     107          lock = self.locktype()
     108          lock.acquire()
     109          self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>")
     110          del lock
     111  
     112      def test_acquire_destroy(self):
     113          lock = self.locktype()
     114          lock.acquire()
     115          del lock
     116  
     117      def test_acquire_release(self):
     118          lock = self.locktype()
     119          lock.acquire()
     120          lock.release()
     121          del lock
     122  
     123      def test_try_acquire(self):
     124          lock = self.locktype()
     125          self.assertTrue(lock.acquire(False))
     126          lock.release()
     127  
     128      def test_try_acquire_contended(self):
     129          lock = self.locktype()
     130          lock.acquire()
     131          result = []
     132          def f():
     133              result.append(lock.acquire(False))
     134          Bunch(f, 1).wait_for_finished()
     135          self.assertFalse(result[0])
     136          lock.release()
     137  
     138      def test_acquire_contended(self):
     139          lock = self.locktype()
     140          lock.acquire()
     141          N = 5
     142          def f():
     143              lock.acquire()
     144              lock.release()
     145  
     146          b = Bunch(f, N)
     147          b.wait_for_started()
     148          _wait()
     149          self.assertEqual(len(b.finished), 0)
     150          lock.release()
     151          b.wait_for_finished()
     152          self.assertEqual(len(b.finished), N)
     153  
     154      def test_with(self):
     155          lock = self.locktype()
     156          def f():
     157              lock.acquire()
     158              lock.release()
     159          def _with(err=None):
     160              with lock:
     161                  if err is not None:
     162                      raise err
     163          _with()
     164          # Check the lock is unacquired
     165          Bunch(f, 1).wait_for_finished()
     166          self.assertRaises(TypeError, _with, TypeError)
     167          # Check the lock is unacquired
     168          Bunch(f, 1).wait_for_finished()
     169  
     170      def test_thread_leak(self):
     171          # The lock shouldn't leak a Thread instance when used from a foreign
     172          # (non-threading) thread.
     173          lock = self.locktype()
     174          def f():
     175              lock.acquire()
     176              lock.release()
     177          n = len(threading.enumerate())
     178          # We run many threads in the hope that existing threads ids won't
     179          # be recycled.
     180          Bunch(f, 15).wait_for_finished()
     181          if len(threading.enumerate()) != n:
     182              # There is a small window during which a Thread instance's
     183              # target function has finished running, but the Thread is still
     184              # alive and registered.  Avoid spurious failures by waiting a
     185              # bit more (seen on a buildbot).
     186              time.sleep(0.4)
     187              self.assertEqual(n, len(threading.enumerate()))
     188  
     189      def test_timeout(self):
     190          lock = self.locktype()
     191          # Can't set timeout if not blocking
     192          self.assertRaises(ValueError, lock.acquire, False, 1)
     193          # Invalid timeout values
     194          self.assertRaises(ValueError, lock.acquire, timeout=-100)
     195          self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
     196          self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
     197          # TIMEOUT_MAX is ok
     198          lock.acquire(timeout=TIMEOUT_MAX)
     199          lock.release()
     200          t1 = time.monotonic()
     201          self.assertTrue(lock.acquire(timeout=5))
     202          t2 = time.monotonic()
     203          # Just a sanity test that it didn't actually wait for the timeout.
     204          self.assertLess(t2 - t1, 5)
     205          results = []
     206          def f():
     207              t1 = time.monotonic()
     208              results.append(lock.acquire(timeout=0.5))
     209              t2 = time.monotonic()
     210              results.append(t2 - t1)
     211          Bunch(f, 1).wait_for_finished()
     212          self.assertFalse(results[0])
     213          self.assertTimeout(results[1], 0.5)
     214  
     215      def test_weakref_exists(self):
     216          lock = self.locktype()
     217          ref = weakref.ref(lock)
     218          self.assertIsNotNone(ref())
     219  
     220      def test_weakref_deleted(self):
     221          lock = self.locktype()
     222          ref = weakref.ref(lock)
     223          del lock
     224          gc.collect()  # For PyPy or other GCs.
     225          self.assertIsNone(ref())
     226  
     227  
     228  class ESC[4;38;5;81mLockTests(ESC[4;38;5;149mBaseLockTests):
     229      """
     230      Tests for non-recursive, weak locks
     231      (which can be acquired and released from different threads).
     232      """
     233      def test_reacquire(self):
     234          # Lock needs to be released before re-acquiring.
     235          lock = self.locktype()
     236          phase = []
     237  
     238          def f():
     239              lock.acquire()
     240              phase.append(None)
     241              lock.acquire()
     242              phase.append(None)
     243  
     244          with threading_helper.wait_threads_exit():
     245              start_new_thread(f, ())
     246              while len(phase) == 0:
     247                  _wait()
     248              _wait()
     249              self.assertEqual(len(phase), 1)
     250              lock.release()
     251              while len(phase) == 1:
     252                  _wait()
     253              self.assertEqual(len(phase), 2)
     254  
     255      def test_different_thread(self):
     256          # Lock can be released from a different thread.
     257          lock = self.locktype()
     258          lock.acquire()
     259          def f():
     260              lock.release()
     261          b = Bunch(f, 1)
     262          b.wait_for_finished()
     263          lock.acquire()
     264          lock.release()
     265  
     266      def test_state_after_timeout(self):
     267          # Issue #11618: check that lock is in a proper state after a
     268          # (non-zero) timeout.
     269          lock = self.locktype()
     270          lock.acquire()
     271          self.assertFalse(lock.acquire(timeout=0.01))
     272          lock.release()
     273          self.assertFalse(lock.locked())
     274          self.assertTrue(lock.acquire(blocking=False))
     275  
     276      @requires_fork
     277      def test_at_fork_reinit(self):
     278          def use_lock(lock):
     279              # make sure that the lock still works normally
     280              # after _at_fork_reinit()
     281              lock.acquire()
     282              lock.release()
     283  
     284          # unlocked
     285          lock = self.locktype()
     286          lock._at_fork_reinit()
     287          use_lock(lock)
     288  
     289          # locked: _at_fork_reinit() resets the lock to the unlocked state
     290          lock2 = self.locktype()
     291          lock2.acquire()
     292          lock2._at_fork_reinit()
     293          use_lock(lock2)
     294  
     295  
     296  class ESC[4;38;5;81mRLockTests(ESC[4;38;5;149mBaseLockTests):
     297      """
     298      Tests for recursive locks.
     299      """
     300      def test_reacquire(self):
     301          lock = self.locktype()
     302          lock.acquire()
     303          lock.acquire()
     304          lock.release()
     305          lock.acquire()
     306          lock.release()
     307          lock.release()
     308  
     309      def test_release_unacquired(self):
     310          # Cannot release an unacquired lock
     311          lock = self.locktype()
     312          self.assertRaises(RuntimeError, lock.release)
     313          lock.acquire()
     314          lock.acquire()
     315          lock.release()
     316          lock.acquire()
     317          lock.release()
     318          lock.release()
     319          self.assertRaises(RuntimeError, lock.release)
     320  
     321      def test_release_save_unacquired(self):
     322          # Cannot _release_save an unacquired lock
     323          lock = self.locktype()
     324          self.assertRaises(RuntimeError, lock._release_save)
     325          lock.acquire()
     326          lock.acquire()
     327          lock.release()
     328          lock.acquire()
     329          lock.release()
     330          lock.release()
     331          self.assertRaises(RuntimeError, lock._release_save)
     332  
     333      def test_different_thread(self):
     334          # Cannot release from a different thread
     335          lock = self.locktype()
     336          def f():
     337              lock.acquire()
     338          b = Bunch(f, 1, True)
     339          try:
     340              self.assertRaises(RuntimeError, lock.release)
     341          finally:
     342              b.do_finish()
     343          b.wait_for_finished()
     344  
     345      def test__is_owned(self):
     346          lock = self.locktype()
     347          self.assertFalse(lock._is_owned())
     348          lock.acquire()
     349          self.assertTrue(lock._is_owned())
     350          lock.acquire()
     351          self.assertTrue(lock._is_owned())
     352          result = []
     353          def f():
     354              result.append(lock._is_owned())
     355          Bunch(f, 1).wait_for_finished()
     356          self.assertFalse(result[0])
     357          lock.release()
     358          self.assertTrue(lock._is_owned())
     359          lock.release()
     360          self.assertFalse(lock._is_owned())
     361  
     362  
     363  class ESC[4;38;5;81mEventTests(ESC[4;38;5;149mBaseTestCase):
     364      """
     365      Tests for Event objects.
     366      """
     367  
     368      def test_is_set(self):
     369          evt = self.eventtype()
     370          self.assertFalse(evt.is_set())
     371          evt.set()
     372          self.assertTrue(evt.is_set())
     373          evt.set()
     374          self.assertTrue(evt.is_set())
     375          evt.clear()
     376          self.assertFalse(evt.is_set())
     377          evt.clear()
     378          self.assertFalse(evt.is_set())
     379  
     380      def _check_notify(self, evt):
     381          # All threads get notified
     382          N = 5
     383          results1 = []
     384          results2 = []
     385          def f():
     386              results1.append(evt.wait())
     387              results2.append(evt.wait())
     388          b = Bunch(f, N)
     389          b.wait_for_started()
     390          _wait()
     391          self.assertEqual(len(results1), 0)
     392          evt.set()
     393          b.wait_for_finished()
     394          self.assertEqual(results1, [True] * N)
     395          self.assertEqual(results2, [True] * N)
     396  
     397      def test_notify(self):
     398          evt = self.eventtype()
     399          self._check_notify(evt)
     400          # Another time, after an explicit clear()
     401          evt.set()
     402          evt.clear()
     403          self._check_notify(evt)
     404  
     405      def test_timeout(self):
     406          evt = self.eventtype()
     407          results1 = []
     408          results2 = []
     409          N = 5
     410          def f():
     411              results1.append(evt.wait(0.0))
     412              t1 = time.monotonic()
     413              r = evt.wait(0.5)
     414              t2 = time.monotonic()
     415              results2.append((r, t2 - t1))
     416          Bunch(f, N).wait_for_finished()
     417          self.assertEqual(results1, [False] * N)
     418          for r, dt in results2:
     419              self.assertFalse(r)
     420              self.assertTimeout(dt, 0.5)
     421          # The event is set
     422          results1 = []
     423          results2 = []
     424          evt.set()
     425          Bunch(f, N).wait_for_finished()
     426          self.assertEqual(results1, [True] * N)
     427          for r, dt in results2:
     428              self.assertTrue(r)
     429  
     430      def test_set_and_clear(self):
     431          # Issue #13502: check that wait() returns true even when the event is
     432          # cleared before the waiting thread is woken up.
     433          evt = self.eventtype()
     434          results = []
     435          timeout = 0.250
     436          N = 5
     437          def f():
     438              results.append(evt.wait(timeout * 4))
     439          b = Bunch(f, N)
     440          b.wait_for_started()
     441          time.sleep(timeout)
     442          evt.set()
     443          evt.clear()
     444          b.wait_for_finished()
     445          self.assertEqual(results, [True] * N)
     446  
     447      @requires_fork
     448      def test_at_fork_reinit(self):
     449          # ensure that condition is still using a Lock after reset
     450          evt = self.eventtype()
     451          with evt._cond:
     452              self.assertFalse(evt._cond.acquire(False))
     453          evt._at_fork_reinit()
     454          with evt._cond:
     455              self.assertFalse(evt._cond.acquire(False))
     456  
     457      def test_repr(self):
     458          evt = self.eventtype()
     459          self.assertRegex(repr(evt), r"<\w+\.Event at .*: unset>")
     460          evt.set()
     461          self.assertRegex(repr(evt), r"<\w+\.Event at .*: set>")
     462  
     463  
     464  class ESC[4;38;5;81mConditionTests(ESC[4;38;5;149mBaseTestCase):
     465      """
     466      Tests for condition variables.
     467      """
     468  
     469      def test_acquire(self):
     470          cond = self.condtype()
     471          # Be default we have an RLock: the condition can be acquired multiple
     472          # times.
     473          cond.acquire()
     474          cond.acquire()
     475          cond.release()
     476          cond.release()
     477          lock = threading.Lock()
     478          cond = self.condtype(lock)
     479          cond.acquire()
     480          self.assertFalse(lock.acquire(False))
     481          cond.release()
     482          self.assertTrue(lock.acquire(False))
     483          self.assertFalse(cond.acquire(False))
     484          lock.release()
     485          with cond:
     486              self.assertFalse(lock.acquire(False))
     487  
     488      def test_unacquired_wait(self):
     489          cond = self.condtype()
     490          self.assertRaises(RuntimeError, cond.wait)
     491  
     492      def test_unacquired_notify(self):
     493          cond = self.condtype()
     494          self.assertRaises(RuntimeError, cond.notify)
     495  
     496      def _check_notify(self, cond):
     497          # Note that this test is sensitive to timing.  If the worker threads
     498          # don't execute in a timely fashion, the main thread may think they
     499          # are further along then they are.  The main thread therefore issues
     500          # _wait() statements to try to make sure that it doesn't race ahead
     501          # of the workers.
     502          # Secondly, this test assumes that condition variables are not subject
     503          # to spurious wakeups.  The absence of spurious wakeups is an implementation
     504          # detail of Condition Variables in current CPython, but in general, not
     505          # a guaranteed property of condition variables as a programming
     506          # construct.  In particular, it is possible that this can no longer
     507          # be conveniently guaranteed should their implementation ever change.
     508          N = 5
     509          ready = []
     510          results1 = []
     511          results2 = []
     512          phase_num = 0
     513          def f():
     514              cond.acquire()
     515              ready.append(phase_num)
     516              result = cond.wait()
     517              cond.release()
     518              results1.append((result, phase_num))
     519              cond.acquire()
     520              ready.append(phase_num)
     521              result = cond.wait()
     522              cond.release()
     523              results2.append((result, phase_num))
     524          b = Bunch(f, N)
     525          b.wait_for_started()
     526          # first wait, to ensure all workers settle into cond.wait() before
     527          # we continue. See issues #8799 and #30727.
     528          while len(ready) < 5:
     529              _wait()
     530          ready.clear()
     531          self.assertEqual(results1, [])
     532          # Notify 3 threads at first
     533          cond.acquire()
     534          cond.notify(3)
     535          _wait()
     536          phase_num = 1
     537          cond.release()
     538          while len(results1) < 3:
     539              _wait()
     540          self.assertEqual(results1, [(True, 1)] * 3)
     541          self.assertEqual(results2, [])
     542          # make sure all awaken workers settle into cond.wait()
     543          while len(ready) < 3:
     544              _wait()
     545          # Notify 5 threads: they might be in their first or second wait
     546          cond.acquire()
     547          cond.notify(5)
     548          _wait()
     549          phase_num = 2
     550          cond.release()
     551          while len(results1) + len(results2) < 8:
     552              _wait()
     553          self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
     554          self.assertEqual(results2, [(True, 2)] * 3)
     555          # make sure all workers settle into cond.wait()
     556          while len(ready) < 5:
     557              _wait()
     558          # Notify all threads: they are all in their second wait
     559          cond.acquire()
     560          cond.notify_all()
     561          _wait()
     562          phase_num = 3
     563          cond.release()
     564          while len(results2) < 5:
     565              _wait()
     566          self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2)
     567          self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
     568          b.wait_for_finished()
     569  
     570      def test_notify(self):
     571          cond = self.condtype()
     572          self._check_notify(cond)
     573          # A second time, to check internal state is still ok.
     574          self._check_notify(cond)
     575  
     576      def test_timeout(self):
     577          cond = self.condtype()
     578          results = []
     579          N = 5
     580          def f():
     581              cond.acquire()
     582              t1 = time.monotonic()
     583              result = cond.wait(0.5)
     584              t2 = time.monotonic()
     585              cond.release()
     586              results.append((t2 - t1, result))
     587          Bunch(f, N).wait_for_finished()
     588          self.assertEqual(len(results), N)
     589          for dt, result in results:
     590              self.assertTimeout(dt, 0.5)
     591              # Note that conceptually (that"s the condition variable protocol)
     592              # a wait() may succeed even if no one notifies us and before any
     593              # timeout occurs.  Spurious wakeups can occur.
     594              # This makes it hard to verify the result value.
     595              # In practice, this implementation has no spurious wakeups.
     596              self.assertFalse(result)
     597  
     598      def test_waitfor(self):
     599          cond = self.condtype()
     600          state = 0
     601          def f():
     602              with cond:
     603                  result = cond.wait_for(lambda : state==4)
     604                  self.assertTrue(result)
     605                  self.assertEqual(state, 4)
     606          b = Bunch(f, 1)
     607          b.wait_for_started()
     608          for i in range(4):
     609              time.sleep(0.01)
     610              with cond:
     611                  state += 1
     612                  cond.notify()
     613          b.wait_for_finished()
     614  
     615      def test_waitfor_timeout(self):
     616          cond = self.condtype()
     617          state = 0
     618          success = []
     619          def f():
     620              with cond:
     621                  dt = time.monotonic()
     622                  result = cond.wait_for(lambda : state==4, timeout=0.1)
     623                  dt = time.monotonic() - dt
     624                  self.assertFalse(result)
     625                  self.assertTimeout(dt, 0.1)
     626                  success.append(None)
     627          b = Bunch(f, 1)
     628          b.wait_for_started()
     629          # Only increment 3 times, so state == 4 is never reached.
     630          for i in range(3):
     631              time.sleep(0.01)
     632              with cond:
     633                  state += 1
     634                  cond.notify()
     635          b.wait_for_finished()
     636          self.assertEqual(len(success), 1)
     637  
     638  
     639  class ESC[4;38;5;81mBaseSemaphoreTests(ESC[4;38;5;149mBaseTestCase):
     640      """
     641      Common tests for {bounded, unbounded} semaphore objects.
     642      """
     643  
     644      def test_constructor(self):
     645          self.assertRaises(ValueError, self.semtype, value = -1)
     646          self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
     647  
     648      def test_acquire(self):
     649          sem = self.semtype(1)
     650          sem.acquire()
     651          sem.release()
     652          sem = self.semtype(2)
     653          sem.acquire()
     654          sem.acquire()
     655          sem.release()
     656          sem.release()
     657  
     658      def test_acquire_destroy(self):
     659          sem = self.semtype()
     660          sem.acquire()
     661          del sem
     662  
     663      def test_acquire_contended(self):
     664          sem = self.semtype(7)
     665          sem.acquire()
     666          N = 10
     667          sem_results = []
     668          results1 = []
     669          results2 = []
     670          phase_num = 0
     671          def f():
     672              sem_results.append(sem.acquire())
     673              results1.append(phase_num)
     674              sem_results.append(sem.acquire())
     675              results2.append(phase_num)
     676          b = Bunch(f, 10)
     677          b.wait_for_started()
     678          while len(results1) + len(results2) < 6:
     679              _wait()
     680          self.assertEqual(results1 + results2, [0] * 6)
     681          phase_num = 1
     682          for i in range(7):
     683              sem.release()
     684          while len(results1) + len(results2) < 13:
     685              _wait()
     686          self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
     687          phase_num = 2
     688          for i in range(6):
     689              sem.release()
     690          while len(results1) + len(results2) < 19:
     691              _wait()
     692          self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
     693          # The semaphore is still locked
     694          self.assertFalse(sem.acquire(False))
     695          # Final release, to let the last thread finish
     696          sem.release()
     697          b.wait_for_finished()
     698          self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1))
     699  
     700      def test_multirelease(self):
     701          sem = self.semtype(7)
     702          sem.acquire()
     703          results1 = []
     704          results2 = []
     705          phase_num = 0
     706          def f():
     707              sem.acquire()
     708              results1.append(phase_num)
     709              sem.acquire()
     710              results2.append(phase_num)
     711          b = Bunch(f, 10)
     712          b.wait_for_started()
     713          while len(results1) + len(results2) < 6:
     714              _wait()
     715          self.assertEqual(results1 + results2, [0] * 6)
     716          phase_num = 1
     717          sem.release(7)
     718          while len(results1) + len(results2) < 13:
     719              _wait()
     720          self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
     721          phase_num = 2
     722          sem.release(6)
     723          while len(results1) + len(results2) < 19:
     724              _wait()
     725          self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
     726          # The semaphore is still locked
     727          self.assertFalse(sem.acquire(False))
     728          # Final release, to let the last thread finish
     729          sem.release()
     730          b.wait_for_finished()
     731  
     732      def test_try_acquire(self):
     733          sem = self.semtype(2)
     734          self.assertTrue(sem.acquire(False))
     735          self.assertTrue(sem.acquire(False))
     736          self.assertFalse(sem.acquire(False))
     737          sem.release()
     738          self.assertTrue(sem.acquire(False))
     739  
     740      def test_try_acquire_contended(self):
     741          sem = self.semtype(4)
     742          sem.acquire()
     743          results = []
     744          def f():
     745              results.append(sem.acquire(False))
     746              results.append(sem.acquire(False))
     747          Bunch(f, 5).wait_for_finished()
     748          # There can be a thread switch between acquiring the semaphore and
     749          # appending the result, therefore results will not necessarily be
     750          # ordered.
     751          self.assertEqual(sorted(results), [False] * 7 + [True] *  3 )
     752  
     753      def test_acquire_timeout(self):
     754          sem = self.semtype(2)
     755          self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
     756          self.assertTrue(sem.acquire(timeout=0.005))
     757          self.assertTrue(sem.acquire(timeout=0.005))
     758          self.assertFalse(sem.acquire(timeout=0.005))
     759          sem.release()
     760          self.assertTrue(sem.acquire(timeout=0.005))
     761          t = time.monotonic()
     762          self.assertFalse(sem.acquire(timeout=0.5))
     763          dt = time.monotonic() - t
     764          self.assertTimeout(dt, 0.5)
     765  
     766      def test_default_value(self):
     767          # The default initial value is 1.
     768          sem = self.semtype()
     769          sem.acquire()
     770          def f():
     771              sem.acquire()
     772              sem.release()
     773          b = Bunch(f, 1)
     774          b.wait_for_started()
     775          _wait()
     776          self.assertFalse(b.finished)
     777          sem.release()
     778          b.wait_for_finished()
     779  
     780      def test_with(self):
     781          sem = self.semtype(2)
     782          def _with(err=None):
     783              with sem:
     784                  self.assertTrue(sem.acquire(False))
     785                  sem.release()
     786                  with sem:
     787                      self.assertFalse(sem.acquire(False))
     788                      if err:
     789                          raise err
     790          _with()
     791          self.assertTrue(sem.acquire(False))
     792          sem.release()
     793          self.assertRaises(TypeError, _with, TypeError)
     794          self.assertTrue(sem.acquire(False))
     795          sem.release()
     796  
     797  class ESC[4;38;5;81mSemaphoreTests(ESC[4;38;5;149mBaseSemaphoreTests):
     798      """
     799      Tests for unbounded semaphores.
     800      """
     801  
     802      def test_release_unacquired(self):
     803          # Unbounded releases are allowed and increment the semaphore's value
     804          sem = self.semtype(1)
     805          sem.release()
     806          sem.acquire()
     807          sem.acquire()
     808          sem.release()
     809  
     810      def test_repr(self):
     811          sem = self.semtype(3)
     812          self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=3>")
     813          sem.acquire()
     814          self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=2>")
     815          sem.release()
     816          sem.release()
     817          self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=4>")
     818  
     819  
     820  class ESC[4;38;5;81mBoundedSemaphoreTests(ESC[4;38;5;149mBaseSemaphoreTests):
     821      """
     822      Tests for bounded semaphores.
     823      """
     824  
     825      def test_release_unacquired(self):
     826          # Cannot go past the initial value
     827          sem = self.semtype()
     828          self.assertRaises(ValueError, sem.release)
     829          sem.acquire()
     830          sem.release()
     831          self.assertRaises(ValueError, sem.release)
     832  
     833      def test_repr(self):
     834          sem = self.semtype(3)
     835          self.assertRegex(repr(sem), r"<\w+\.BoundedSemaphore at .*: value=3/3>")
     836          sem.acquire()
     837          self.assertRegex(repr(sem), r"<\w+\.BoundedSemaphore at .*: value=2/3>")
     838  
     839  
     840  class ESC[4;38;5;81mBarrierTests(ESC[4;38;5;149mBaseTestCase):
     841      """
     842      Tests for Barrier objects.
     843      """
     844      N = 5
     845      defaultTimeout = 2.0
     846  
     847      def setUp(self):
     848          self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
     849      def tearDown(self):
     850          self.barrier.abort()
     851  
     852      def run_threads(self, f):
     853          b = Bunch(f, self.N-1)
     854          f()
     855          b.wait_for_finished()
     856  
     857      def multipass(self, results, n):
     858          m = self.barrier.parties
     859          self.assertEqual(m, self.N)
     860          for i in range(n):
     861              results[0].append(True)
     862              self.assertEqual(len(results[1]), i * m)
     863              self.barrier.wait()
     864              results[1].append(True)
     865              self.assertEqual(len(results[0]), (i + 1) * m)
     866              self.barrier.wait()
     867          self.assertEqual(self.barrier.n_waiting, 0)
     868          self.assertFalse(self.barrier.broken)
     869  
     870      def test_barrier(self, passes=1):
     871          """
     872          Test that a barrier is passed in lockstep
     873          """
     874          results = [[],[]]
     875          def f():
     876              self.multipass(results, passes)
     877          self.run_threads(f)
     878  
     879      def test_barrier_10(self):
     880          """
     881          Test that a barrier works for 10 consecutive runs
     882          """
     883          return self.test_barrier(10)
     884  
     885      def test_wait_return(self):
     886          """
     887          test the return value from barrier.wait
     888          """
     889          results = []
     890          def f():
     891              r = self.barrier.wait()
     892              results.append(r)
     893  
     894          self.run_threads(f)
     895          self.assertEqual(sum(results), sum(range(self.N)))
     896  
     897      def test_action(self):
     898          """
     899          Test the 'action' callback
     900          """
     901          results = []
     902          def action():
     903              results.append(True)
     904          barrier = self.barriertype(self.N, action)
     905          def f():
     906              barrier.wait()
     907              self.assertEqual(len(results), 1)
     908  
     909          self.run_threads(f)
     910  
     911      def test_abort(self):
     912          """
     913          Test that an abort will put the barrier in a broken state
     914          """
     915          results1 = []
     916          results2 = []
     917          def f():
     918              try:
     919                  i = self.barrier.wait()
     920                  if i == self.N//2:
     921                      raise RuntimeError
     922                  self.barrier.wait()
     923                  results1.append(True)
     924              except threading.BrokenBarrierError:
     925                  results2.append(True)
     926              except RuntimeError:
     927                  self.barrier.abort()
     928                  pass
     929  
     930          self.run_threads(f)
     931          self.assertEqual(len(results1), 0)
     932          self.assertEqual(len(results2), self.N-1)
     933          self.assertTrue(self.barrier.broken)
     934  
     935      def test_reset(self):
     936          """
     937          Test that a 'reset' on a barrier frees the waiting threads
     938          """
     939          results1 = []
     940          results2 = []
     941          results3 = []
     942          def f():
     943              i = self.barrier.wait()
     944              if i == self.N//2:
     945                  # Wait until the other threads are all in the barrier.
     946                  while self.barrier.n_waiting < self.N-1:
     947                      time.sleep(0.001)
     948                  self.barrier.reset()
     949              else:
     950                  try:
     951                      self.barrier.wait()
     952                      results1.append(True)
     953                  except threading.BrokenBarrierError:
     954                      results2.append(True)
     955              # Now, pass the barrier again
     956              self.barrier.wait()
     957              results3.append(True)
     958  
     959          self.run_threads(f)
     960          self.assertEqual(len(results1), 0)
     961          self.assertEqual(len(results2), self.N-1)
     962          self.assertEqual(len(results3), self.N)
     963  
     964  
     965      def test_abort_and_reset(self):
     966          """
     967          Test that a barrier can be reset after being broken.
     968          """
     969          results1 = []
     970          results2 = []
     971          results3 = []
     972          barrier2 = self.barriertype(self.N)
     973          def f():
     974              try:
     975                  i = self.barrier.wait()
     976                  if i == self.N//2:
     977                      raise RuntimeError
     978                  self.barrier.wait()
     979                  results1.append(True)
     980              except threading.BrokenBarrierError:
     981                  results2.append(True)
     982              except RuntimeError:
     983                  self.barrier.abort()
     984                  pass
     985              # Synchronize and reset the barrier.  Must synchronize first so
     986              # that everyone has left it when we reset, and after so that no
     987              # one enters it before the reset.
     988              if barrier2.wait() == self.N//2:
     989                  self.barrier.reset()
     990              barrier2.wait()
     991              self.barrier.wait()
     992              results3.append(True)
     993  
     994          self.run_threads(f)
     995          self.assertEqual(len(results1), 0)
     996          self.assertEqual(len(results2), self.N-1)
     997          self.assertEqual(len(results3), self.N)
     998  
     999      def test_timeout(self):
    1000          """
    1001          Test wait(timeout)
    1002          """
    1003          def f():
    1004              i = self.barrier.wait()
    1005              if i == self.N // 2:
    1006                  # One thread is late!
    1007                  time.sleep(1.0)
    1008              # Default timeout is 2.0, so this is shorter.
    1009              self.assertRaises(threading.BrokenBarrierError,
    1010                                self.barrier.wait, 0.5)
    1011          self.run_threads(f)
    1012  
    1013      def test_default_timeout(self):
    1014          """
    1015          Test the barrier's default timeout
    1016          """
    1017          # create a barrier with a low default timeout
    1018          barrier = self.barriertype(self.N, timeout=0.3)
    1019          def f():
    1020              i = barrier.wait()
    1021              if i == self.N // 2:
    1022                  # One thread is later than the default timeout of 0.3s.
    1023                  time.sleep(1.0)
    1024              self.assertRaises(threading.BrokenBarrierError, barrier.wait)
    1025          self.run_threads(f)
    1026  
    1027      def test_single_thread(self):
    1028          b = self.barriertype(1)
    1029          b.wait()
    1030          b.wait()
    1031  
    1032      def test_repr(self):
    1033          b = self.barriertype(3)
    1034          self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>")
    1035          def f():
    1036              b.wait(3)
    1037          bunch = Bunch(f, 2)
    1038          bunch.wait_for_started()
    1039          time.sleep(0.2)
    1040          self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=2/3>")
    1041          b.wait(3)
    1042          bunch.wait_for_finished()
    1043          self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>")
    1044          b.abort()
    1045          self.assertRegex(repr(b), r"<\w+\.Barrier at .*: broken>")