(root)/
Python-3.11.7/
Lib/
test/
test_asyncio/
test_locks.py
       1  """Tests for locks.py"""
       2  
       3  import unittest
       4  from unittest import mock
       5  import re
       6  
       7  import asyncio
       8  import collections
       9  
      10  STR_RGX_REPR = (
      11      r'^<(?P<class>.*?) object at (?P<address>.*?)'
      12      r'\[(?P<extras>'
      13      r'(set|unset|locked|unlocked|filling|draining|resetting|broken)'
      14      r'(, value:\d)?'
      15      r'(, waiters:\d+)?'
      16      r'(, waiters:\d+\/\d+)?' # barrier
      17      r')\]>\Z'
      18  )
      19  RGX_REPR = re.compile(STR_RGX_REPR)
      20  
      21  
      22  def tearDownModule():
      23      asyncio.set_event_loop_policy(None)
      24  
      25  
      26  class ESC[4;38;5;81mLockTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
      27  
      28      async def test_repr(self):
      29          lock = asyncio.Lock()
      30          self.assertTrue(repr(lock).endswith('[unlocked]>'))
      31          self.assertTrue(RGX_REPR.match(repr(lock)))
      32  
      33          await lock.acquire()
      34          self.assertTrue(repr(lock).endswith('[locked]>'))
      35          self.assertTrue(RGX_REPR.match(repr(lock)))
      36  
      37      async def test_lock(self):
      38          lock = asyncio.Lock()
      39  
      40          with self.assertRaisesRegex(
      41              TypeError,
      42              "object Lock can't be used in 'await' expression"
      43          ):
      44              await lock
      45  
      46          self.assertFalse(lock.locked())
      47  
      48      async def test_lock_doesnt_accept_loop_parameter(self):
      49          primitives_cls = [
      50              asyncio.Lock,
      51              asyncio.Condition,
      52              asyncio.Event,
      53              asyncio.Semaphore,
      54              asyncio.BoundedSemaphore,
      55          ]
      56  
      57          loop = asyncio.get_running_loop()
      58  
      59          for cls in primitives_cls:
      60              with self.assertRaisesRegex(
      61                  TypeError,
      62                  rf"{cls.__name__}\.__init__\(\) got an unexpected "
      63                  rf"keyword argument 'loop'"
      64              ):
      65                  cls(loop=loop)
      66  
      67      async def test_lock_by_with_statement(self):
      68          primitives = [
      69              asyncio.Lock(),
      70              asyncio.Condition(),
      71              asyncio.Semaphore(),
      72              asyncio.BoundedSemaphore(),
      73          ]
      74  
      75          for lock in primitives:
      76              await asyncio.sleep(0.01)
      77              self.assertFalse(lock.locked())
      78              with self.assertRaisesRegex(
      79                  TypeError,
      80                  r"object \w+ can't be used in 'await' expression"
      81              ):
      82                  with await lock:
      83                      pass
      84              self.assertFalse(lock.locked())
      85  
      86      async def test_acquire(self):
      87          lock = asyncio.Lock()
      88          result = []
      89  
      90          self.assertTrue(await lock.acquire())
      91  
      92          async def c1(result):
      93              if await lock.acquire():
      94                  result.append(1)
      95              return True
      96  
      97          async def c2(result):
      98              if await lock.acquire():
      99                  result.append(2)
     100              return True
     101  
     102          async def c3(result):
     103              if await lock.acquire():
     104                  result.append(3)
     105              return True
     106  
     107          t1 = asyncio.create_task(c1(result))
     108          t2 = asyncio.create_task(c2(result))
     109  
     110          await asyncio.sleep(0)
     111          self.assertEqual([], result)
     112  
     113          lock.release()
     114          await asyncio.sleep(0)
     115          self.assertEqual([1], result)
     116  
     117          await asyncio.sleep(0)
     118          self.assertEqual([1], result)
     119  
     120          t3 = asyncio.create_task(c3(result))
     121  
     122          lock.release()
     123          await asyncio.sleep(0)
     124          self.assertEqual([1, 2], result)
     125  
     126          lock.release()
     127          await asyncio.sleep(0)
     128          self.assertEqual([1, 2, 3], result)
     129  
     130          self.assertTrue(t1.done())
     131          self.assertTrue(t1.result())
     132          self.assertTrue(t2.done())
     133          self.assertTrue(t2.result())
     134          self.assertTrue(t3.done())
     135          self.assertTrue(t3.result())
     136  
     137      async def test_acquire_cancel(self):
     138          lock = asyncio.Lock()
     139          self.assertTrue(await lock.acquire())
     140  
     141          task = asyncio.create_task(lock.acquire())
     142          asyncio.get_running_loop().call_soon(task.cancel)
     143          with self.assertRaises(asyncio.CancelledError):
     144              await task
     145          self.assertFalse(lock._waiters)
     146  
     147      async def test_cancel_race(self):
     148          # Several tasks:
     149          # - A acquires the lock
     150          # - B is blocked in acquire()
     151          # - C is blocked in acquire()
     152          #
     153          # Now, concurrently:
     154          # - B is cancelled
     155          # - A releases the lock
     156          #
     157          # If B's waiter is marked cancelled but not yet removed from
     158          # _waiters, A's release() call will crash when trying to set
     159          # B's waiter; instead, it should move on to C's waiter.
     160  
     161          # Setup: A has the lock, b and c are waiting.
     162          lock = asyncio.Lock()
     163  
     164          async def lockit(name, blocker):
     165              await lock.acquire()
     166              try:
     167                  if blocker is not None:
     168                      await blocker
     169              finally:
     170                  lock.release()
     171  
     172          fa = asyncio.get_running_loop().create_future()
     173          ta = asyncio.create_task(lockit('A', fa))
     174          await asyncio.sleep(0)
     175          self.assertTrue(lock.locked())
     176          tb = asyncio.create_task(lockit('B', None))
     177          await asyncio.sleep(0)
     178          self.assertEqual(len(lock._waiters), 1)
     179          tc = asyncio.create_task(lockit('C', None))
     180          await asyncio.sleep(0)
     181          self.assertEqual(len(lock._waiters), 2)
     182  
     183          # Create the race and check.
     184          # Without the fix this failed at the last assert.
     185          fa.set_result(None)
     186          tb.cancel()
     187          self.assertTrue(lock._waiters[0].cancelled())
     188          await asyncio.sleep(0)
     189          self.assertFalse(lock.locked())
     190          self.assertTrue(ta.done())
     191          self.assertTrue(tb.cancelled())
     192          await tc
     193  
     194      async def test_cancel_release_race(self):
     195          # Issue 32734
     196          # Acquire 4 locks, cancel second, release first
     197          # and 2 locks are taken at once.
     198          loop = asyncio.get_running_loop()
     199          lock = asyncio.Lock()
     200          lock_count = 0
     201          call_count = 0
     202  
     203          async def lockit():
     204              nonlocal lock_count
     205              nonlocal call_count
     206              call_count += 1
     207              await lock.acquire()
     208              lock_count += 1
     209  
     210          def trigger():
     211              t1.cancel()
     212              lock.release()
     213  
     214          await lock.acquire()
     215  
     216          t1 = asyncio.create_task(lockit())
     217          t2 = asyncio.create_task(lockit())
     218          t3 = asyncio.create_task(lockit())
     219  
     220          # Start scheduled tasks
     221          await asyncio.sleep(0)
     222  
     223          loop.call_soon(trigger)
     224          with self.assertRaises(asyncio.CancelledError):
     225              # Wait for cancellation
     226              await t1
     227  
     228          # Make sure only one lock was taken
     229          self.assertEqual(lock_count, 1)
     230          # While 3 calls were made to lockit()
     231          self.assertEqual(call_count, 3)
     232          self.assertTrue(t1.cancelled() and t2.done())
     233  
     234          # Cleanup the task that is stuck on acquire.
     235          t3.cancel()
     236          await asyncio.sleep(0)
     237          self.assertTrue(t3.cancelled())
     238  
     239      async def test_finished_waiter_cancelled(self):
     240          lock = asyncio.Lock()
     241  
     242          await lock.acquire()
     243          self.assertTrue(lock.locked())
     244  
     245          tb = asyncio.create_task(lock.acquire())
     246          await asyncio.sleep(0)
     247          self.assertEqual(len(lock._waiters), 1)
     248  
     249          # Create a second waiter, wake up the first, and cancel it.
     250          # Without the fix, the second was not woken up.
     251          tc = asyncio.create_task(lock.acquire())
     252          tb.cancel()
     253          lock.release()
     254          await asyncio.sleep(0)
     255  
     256          self.assertTrue(lock.locked())
     257          self.assertTrue(tb.cancelled())
     258  
     259          # Cleanup
     260          await tc
     261  
     262      async def test_release_not_acquired(self):
     263          lock = asyncio.Lock()
     264  
     265          self.assertRaises(RuntimeError, lock.release)
     266  
     267      async def test_release_no_waiters(self):
     268          lock = asyncio.Lock()
     269          await lock.acquire()
     270          self.assertTrue(lock.locked())
     271  
     272          lock.release()
     273          self.assertFalse(lock.locked())
     274  
     275      async def test_context_manager(self):
     276          lock = asyncio.Lock()
     277          self.assertFalse(lock.locked())
     278  
     279          async with lock:
     280              self.assertTrue(lock.locked())
     281  
     282          self.assertFalse(lock.locked())
     283  
     284  
     285  class ESC[4;38;5;81mEventTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     286  
     287      def test_repr(self):
     288          ev = asyncio.Event()
     289          self.assertTrue(repr(ev).endswith('[unset]>'))
     290          match = RGX_REPR.match(repr(ev))
     291          self.assertEqual(match.group('extras'), 'unset')
     292  
     293          ev.set()
     294          self.assertTrue(repr(ev).endswith('[set]>'))
     295          self.assertTrue(RGX_REPR.match(repr(ev)))
     296  
     297          ev._waiters.append(mock.Mock())
     298          self.assertTrue('waiters:1' in repr(ev))
     299          self.assertTrue(RGX_REPR.match(repr(ev)))
     300  
     301      async def test_wait(self):
     302          ev = asyncio.Event()
     303          self.assertFalse(ev.is_set())
     304  
     305          result = []
     306  
     307          async def c1(result):
     308              if await ev.wait():
     309                  result.append(1)
     310  
     311          async def c2(result):
     312              if await ev.wait():
     313                  result.append(2)
     314  
     315          async def c3(result):
     316              if await ev.wait():
     317                  result.append(3)
     318  
     319          t1 = asyncio.create_task(c1(result))
     320          t2 = asyncio.create_task(c2(result))
     321  
     322          await asyncio.sleep(0)
     323          self.assertEqual([], result)
     324  
     325          t3 = asyncio.create_task(c3(result))
     326  
     327          ev.set()
     328          await asyncio.sleep(0)
     329          self.assertEqual([3, 1, 2], result)
     330  
     331          self.assertTrue(t1.done())
     332          self.assertIsNone(t1.result())
     333          self.assertTrue(t2.done())
     334          self.assertIsNone(t2.result())
     335          self.assertTrue(t3.done())
     336          self.assertIsNone(t3.result())
     337  
     338      async def test_wait_on_set(self):
     339          ev = asyncio.Event()
     340          ev.set()
     341  
     342          res = await ev.wait()
     343          self.assertTrue(res)
     344  
     345      async def test_wait_cancel(self):
     346          ev = asyncio.Event()
     347  
     348          wait = asyncio.create_task(ev.wait())
     349          asyncio.get_running_loop().call_soon(wait.cancel)
     350          with self.assertRaises(asyncio.CancelledError):
     351              await wait
     352          self.assertFalse(ev._waiters)
     353  
     354      async def test_clear(self):
     355          ev = asyncio.Event()
     356          self.assertFalse(ev.is_set())
     357  
     358          ev.set()
     359          self.assertTrue(ev.is_set())
     360  
     361          ev.clear()
     362          self.assertFalse(ev.is_set())
     363  
     364      async def test_clear_with_waiters(self):
     365          ev = asyncio.Event()
     366          result = []
     367  
     368          async def c1(result):
     369              if await ev.wait():
     370                  result.append(1)
     371              return True
     372  
     373          t = asyncio.create_task(c1(result))
     374          await asyncio.sleep(0)
     375          self.assertEqual([], result)
     376  
     377          ev.set()
     378          ev.clear()
     379          self.assertFalse(ev.is_set())
     380  
     381          ev.set()
     382          ev.set()
     383          self.assertEqual(1, len(ev._waiters))
     384  
     385          await asyncio.sleep(0)
     386          self.assertEqual([1], result)
     387          self.assertEqual(0, len(ev._waiters))
     388  
     389          self.assertTrue(t.done())
     390          self.assertTrue(t.result())
     391  
     392  
     393  class ESC[4;38;5;81mConditionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     394  
     395      async def test_wait(self):
     396          cond = asyncio.Condition()
     397          result = []
     398  
     399          async def c1(result):
     400              await cond.acquire()
     401              if await cond.wait():
     402                  result.append(1)
     403              return True
     404  
     405          async def c2(result):
     406              await cond.acquire()
     407              if await cond.wait():
     408                  result.append(2)
     409              return True
     410  
     411          async def c3(result):
     412              await cond.acquire()
     413              if await cond.wait():
     414                  result.append(3)
     415              return True
     416  
     417          t1 = asyncio.create_task(c1(result))
     418          t2 = asyncio.create_task(c2(result))
     419          t3 = asyncio.create_task(c3(result))
     420  
     421          await asyncio.sleep(0)
     422          self.assertEqual([], result)
     423          self.assertFalse(cond.locked())
     424  
     425          self.assertTrue(await cond.acquire())
     426          cond.notify()
     427          await asyncio.sleep(0)
     428          self.assertEqual([], result)
     429          self.assertTrue(cond.locked())
     430  
     431          cond.release()
     432          await asyncio.sleep(0)
     433          self.assertEqual([1], result)
     434          self.assertTrue(cond.locked())
     435  
     436          cond.notify(2)
     437          await asyncio.sleep(0)
     438          self.assertEqual([1], result)
     439          self.assertTrue(cond.locked())
     440  
     441          cond.release()
     442          await asyncio.sleep(0)
     443          self.assertEqual([1, 2], result)
     444          self.assertTrue(cond.locked())
     445  
     446          cond.release()
     447          await asyncio.sleep(0)
     448          self.assertEqual([1, 2, 3], result)
     449          self.assertTrue(cond.locked())
     450  
     451          self.assertTrue(t1.done())
     452          self.assertTrue(t1.result())
     453          self.assertTrue(t2.done())
     454          self.assertTrue(t2.result())
     455          self.assertTrue(t3.done())
     456          self.assertTrue(t3.result())
     457  
     458      async def test_wait_cancel(self):
     459          cond = asyncio.Condition()
     460          await cond.acquire()
     461  
     462          wait = asyncio.create_task(cond.wait())
     463          asyncio.get_running_loop().call_soon(wait.cancel)
     464          with self.assertRaises(asyncio.CancelledError):
     465              await wait
     466          self.assertFalse(cond._waiters)
     467          self.assertTrue(cond.locked())
     468  
     469      async def test_wait_cancel_contested(self):
     470          cond = asyncio.Condition()
     471  
     472          await cond.acquire()
     473          self.assertTrue(cond.locked())
     474  
     475          wait_task = asyncio.create_task(cond.wait())
     476          await asyncio.sleep(0)
     477          self.assertFalse(cond.locked())
     478  
     479          # Notify, but contest the lock before cancelling
     480          await cond.acquire()
     481          self.assertTrue(cond.locked())
     482          cond.notify()
     483          asyncio.get_running_loop().call_soon(wait_task.cancel)
     484          asyncio.get_running_loop().call_soon(cond.release)
     485  
     486          try:
     487              await wait_task
     488          except asyncio.CancelledError:
     489              # Should not happen, since no cancellation points
     490              pass
     491  
     492          self.assertTrue(cond.locked())
     493  
     494      async def test_wait_cancel_after_notify(self):
     495          # See bpo-32841
     496          waited = False
     497  
     498          cond = asyncio.Condition()
     499  
     500          async def wait_on_cond():
     501              nonlocal waited
     502              async with cond:
     503                  waited = True  # Make sure this area was reached
     504                  await cond.wait()
     505  
     506          waiter = asyncio.create_task(wait_on_cond())
     507          await asyncio.sleep(0)  # Start waiting
     508  
     509          await cond.acquire()
     510          cond.notify()
     511          await asyncio.sleep(0)  # Get to acquire()
     512          waiter.cancel()
     513          await asyncio.sleep(0)  # Activate cancellation
     514          cond.release()
     515          await asyncio.sleep(0)  # Cancellation should occur
     516  
     517          self.assertTrue(waiter.cancelled())
     518          self.assertTrue(waited)
     519  
     520      async def test_wait_unacquired(self):
     521          cond = asyncio.Condition()
     522          with self.assertRaises(RuntimeError):
     523              await cond.wait()
     524  
     525      async def test_wait_for(self):
     526          cond = asyncio.Condition()
     527          presult = False
     528  
     529          def predicate():
     530              return presult
     531  
     532          result = []
     533  
     534          async def c1(result):
     535              await cond.acquire()
     536              if await cond.wait_for(predicate):
     537                  result.append(1)
     538                  cond.release()
     539              return True
     540  
     541          t = asyncio.create_task(c1(result))
     542  
     543          await asyncio.sleep(0)
     544          self.assertEqual([], result)
     545  
     546          await cond.acquire()
     547          cond.notify()
     548          cond.release()
     549          await asyncio.sleep(0)
     550          self.assertEqual([], result)
     551  
     552          presult = True
     553          await cond.acquire()
     554          cond.notify()
     555          cond.release()
     556          await asyncio.sleep(0)
     557          self.assertEqual([1], result)
     558  
     559          self.assertTrue(t.done())
     560          self.assertTrue(t.result())
     561  
     562      async def test_wait_for_unacquired(self):
     563          cond = asyncio.Condition()
     564  
     565          # predicate can return true immediately
     566          res = await cond.wait_for(lambda: [1, 2, 3])
     567          self.assertEqual([1, 2, 3], res)
     568  
     569          with self.assertRaises(RuntimeError):
     570              await cond.wait_for(lambda: False)
     571  
     572      async def test_notify(self):
     573          cond = asyncio.Condition()
     574          result = []
     575  
     576          async def c1(result):
     577              await cond.acquire()
     578              if await cond.wait():
     579                  result.append(1)
     580                  cond.release()
     581              return True
     582  
     583          async def c2(result):
     584              await cond.acquire()
     585              if await cond.wait():
     586                  result.append(2)
     587                  cond.release()
     588              return True
     589  
     590          async def c3(result):
     591              await cond.acquire()
     592              if await cond.wait():
     593                  result.append(3)
     594                  cond.release()
     595              return True
     596  
     597          t1 = asyncio.create_task(c1(result))
     598          t2 = asyncio.create_task(c2(result))
     599          t3 = asyncio.create_task(c3(result))
     600  
     601          await asyncio.sleep(0)
     602          self.assertEqual([], result)
     603  
     604          await cond.acquire()
     605          cond.notify(1)
     606          cond.release()
     607          await asyncio.sleep(0)
     608          self.assertEqual([1], result)
     609  
     610          await cond.acquire()
     611          cond.notify(1)
     612          cond.notify(2048)
     613          cond.release()
     614          await asyncio.sleep(0)
     615          self.assertEqual([1, 2, 3], result)
     616  
     617          self.assertTrue(t1.done())
     618          self.assertTrue(t1.result())
     619          self.assertTrue(t2.done())
     620          self.assertTrue(t2.result())
     621          self.assertTrue(t3.done())
     622          self.assertTrue(t3.result())
     623  
     624      async def test_notify_all(self):
     625          cond = asyncio.Condition()
     626  
     627          result = []
     628  
     629          async def c1(result):
     630              await cond.acquire()
     631              if await cond.wait():
     632                  result.append(1)
     633                  cond.release()
     634              return True
     635  
     636          async def c2(result):
     637              await cond.acquire()
     638              if await cond.wait():
     639                  result.append(2)
     640                  cond.release()
     641              return True
     642  
     643          t1 = asyncio.create_task(c1(result))
     644          t2 = asyncio.create_task(c2(result))
     645  
     646          await asyncio.sleep(0)
     647          self.assertEqual([], result)
     648  
     649          await cond.acquire()
     650          cond.notify_all()
     651          cond.release()
     652          await asyncio.sleep(0)
     653          self.assertEqual([1, 2], result)
     654  
     655          self.assertTrue(t1.done())
     656          self.assertTrue(t1.result())
     657          self.assertTrue(t2.done())
     658          self.assertTrue(t2.result())
     659  
     660      def test_notify_unacquired(self):
     661          cond = asyncio.Condition()
     662          self.assertRaises(RuntimeError, cond.notify)
     663  
     664      def test_notify_all_unacquired(self):
     665          cond = asyncio.Condition()
     666          self.assertRaises(RuntimeError, cond.notify_all)
     667  
     668      async def test_repr(self):
     669          cond = asyncio.Condition()
     670          self.assertTrue('unlocked' in repr(cond))
     671          self.assertTrue(RGX_REPR.match(repr(cond)))
     672  
     673          await cond.acquire()
     674          self.assertTrue('locked' in repr(cond))
     675  
     676          cond._waiters.append(mock.Mock())
     677          self.assertTrue('waiters:1' in repr(cond))
     678          self.assertTrue(RGX_REPR.match(repr(cond)))
     679  
     680          cond._waiters.append(mock.Mock())
     681          self.assertTrue('waiters:2' in repr(cond))
     682          self.assertTrue(RGX_REPR.match(repr(cond)))
     683  
     684      async def test_context_manager(self):
     685          cond = asyncio.Condition()
     686          self.assertFalse(cond.locked())
     687          async with cond:
     688              self.assertTrue(cond.locked())
     689          self.assertFalse(cond.locked())
     690  
     691      async def test_explicit_lock(self):
     692          async def f(lock=None, cond=None):
     693              if lock is None:
     694                  lock = asyncio.Lock()
     695              if cond is None:
     696                  cond = asyncio.Condition(lock)
     697              self.assertIs(cond._lock, lock)
     698              self.assertFalse(lock.locked())
     699              self.assertFalse(cond.locked())
     700              async with cond:
     701                  self.assertTrue(lock.locked())
     702                  self.assertTrue(cond.locked())
     703              self.assertFalse(lock.locked())
     704              self.assertFalse(cond.locked())
     705              async with lock:
     706                  self.assertTrue(lock.locked())
     707                  self.assertTrue(cond.locked())
     708              self.assertFalse(lock.locked())
     709              self.assertFalse(cond.locked())
     710  
     711          # All should work in the same way.
     712          await f()
     713          await f(asyncio.Lock())
     714          lock = asyncio.Lock()
     715          await f(lock, asyncio.Condition(lock))
     716  
     717      async def test_ambiguous_loops(self):
     718          loop = asyncio.new_event_loop()
     719          self.addCleanup(loop.close)
     720  
     721          async def wrong_loop_in_lock():
     722              with self.assertRaises(TypeError):
     723                  asyncio.Lock(loop=loop)  # actively disallowed since 3.10
     724              lock = asyncio.Lock()
     725              lock._loop = loop  # use private API for testing
     726              async with lock:
     727                  # acquired immediately via the fast-path
     728                  # without interaction with any event loop.
     729                  cond = asyncio.Condition(lock)
     730                  # cond.acquire() will trigger waiting on the lock
     731                  # and it will discover the event loop mismatch.
     732                  with self.assertRaisesRegex(
     733                      RuntimeError,
     734                      "is bound to a different event loop",
     735                  ):
     736                      await cond.acquire()
     737  
     738          async def wrong_loop_in_cond():
     739              # Same analogy here with the condition's loop.
     740              lock = asyncio.Lock()
     741              async with lock:
     742                  with self.assertRaises(TypeError):
     743                      asyncio.Condition(lock, loop=loop)
     744                  cond = asyncio.Condition(lock)
     745                  cond._loop = loop
     746                  with self.assertRaisesRegex(
     747                      RuntimeError,
     748                      "is bound to a different event loop",
     749                  ):
     750                      await cond.wait()
     751  
     752          await wrong_loop_in_lock()
     753          await wrong_loop_in_cond()
     754  
     755      async def test_timeout_in_block(self):
     756          condition = asyncio.Condition()
     757          async with condition:
     758              with self.assertRaises(asyncio.TimeoutError):
     759                  await asyncio.wait_for(condition.wait(), timeout=0.5)
     760  
     761  
     762  class ESC[4;38;5;81mSemaphoreTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     763  
     764      def test_initial_value_zero(self):
     765          sem = asyncio.Semaphore(0)
     766          self.assertTrue(sem.locked())
     767  
     768      async def test_repr(self):
     769          sem = asyncio.Semaphore()
     770          self.assertTrue(repr(sem).endswith('[unlocked, value:1]>'))
     771          self.assertTrue(RGX_REPR.match(repr(sem)))
     772  
     773          await sem.acquire()
     774          self.assertTrue(repr(sem).endswith('[locked]>'))
     775          self.assertTrue('waiters' not in repr(sem))
     776          self.assertTrue(RGX_REPR.match(repr(sem)))
     777  
     778          if sem._waiters is None:
     779              sem._waiters = collections.deque()
     780  
     781          sem._waiters.append(mock.Mock())
     782          self.assertTrue('waiters:1' in repr(sem))
     783          self.assertTrue(RGX_REPR.match(repr(sem)))
     784  
     785          sem._waiters.append(mock.Mock())
     786          self.assertTrue('waiters:2' in repr(sem))
     787          self.assertTrue(RGX_REPR.match(repr(sem)))
     788  
     789      async def test_semaphore(self):
     790          sem = asyncio.Semaphore()
     791          self.assertEqual(1, sem._value)
     792  
     793          with self.assertRaisesRegex(
     794              TypeError,
     795              "object Semaphore can't be used in 'await' expression",
     796          ):
     797              await sem
     798  
     799          self.assertFalse(sem.locked())
     800          self.assertEqual(1, sem._value)
     801  
     802      def test_semaphore_value(self):
     803          self.assertRaises(ValueError, asyncio.Semaphore, -1)
     804  
     805      async def test_acquire(self):
     806          sem = asyncio.Semaphore(3)
     807          result = []
     808  
     809          self.assertTrue(await sem.acquire())
     810          self.assertTrue(await sem.acquire())
     811          self.assertFalse(sem.locked())
     812  
     813          async def c1(result):
     814              await sem.acquire()
     815              result.append(1)
     816              return True
     817  
     818          async def c2(result):
     819              await sem.acquire()
     820              result.append(2)
     821              return True
     822  
     823          async def c3(result):
     824              await sem.acquire()
     825              result.append(3)
     826              return True
     827  
     828          async def c4(result):
     829              await sem.acquire()
     830              result.append(4)
     831              return True
     832  
     833          t1 = asyncio.create_task(c1(result))
     834          t2 = asyncio.create_task(c2(result))
     835          t3 = asyncio.create_task(c3(result))
     836  
     837          await asyncio.sleep(0)
     838          self.assertEqual([1], result)
     839          self.assertTrue(sem.locked())
     840          self.assertEqual(2, len(sem._waiters))
     841          self.assertEqual(0, sem._value)
     842  
     843          t4 = asyncio.create_task(c4(result))
     844  
     845          sem.release()
     846          sem.release()
     847          self.assertEqual(0, sem._value)
     848  
     849          await asyncio.sleep(0)
     850          self.assertEqual(0, sem._value)
     851          self.assertEqual(3, len(result))
     852          self.assertTrue(sem.locked())
     853          self.assertEqual(1, len(sem._waiters))
     854          self.assertEqual(0, sem._value)
     855  
     856          self.assertTrue(t1.done())
     857          self.assertTrue(t1.result())
     858          race_tasks = [t2, t3, t4]
     859          done_tasks = [t for t in race_tasks if t.done() and t.result()]
     860          self.assertEqual(2, len(done_tasks))
     861  
     862          # cleanup locked semaphore
     863          sem.release()
     864          await asyncio.gather(*race_tasks)
     865  
     866      async def test_acquire_cancel(self):
     867          sem = asyncio.Semaphore()
     868          await sem.acquire()
     869  
     870          acquire = asyncio.create_task(sem.acquire())
     871          asyncio.get_running_loop().call_soon(acquire.cancel)
     872          with self.assertRaises(asyncio.CancelledError):
     873              await acquire
     874          self.assertTrue((not sem._waiters) or
     875                          all(waiter.done() for waiter in sem._waiters))
     876  
     877      async def test_acquire_cancel_before_awoken(self):
     878          sem = asyncio.Semaphore(value=0)
     879  
     880          t1 = asyncio.create_task(sem.acquire())
     881          t2 = asyncio.create_task(sem.acquire())
     882          t3 = asyncio.create_task(sem.acquire())
     883          t4 = asyncio.create_task(sem.acquire())
     884  
     885          await asyncio.sleep(0)
     886  
     887          t1.cancel()
     888          t2.cancel()
     889          sem.release()
     890  
     891          await asyncio.sleep(0)
     892          await asyncio.sleep(0)
     893          num_done = sum(t.done() for t in [t3, t4])
     894          self.assertEqual(num_done, 1)
     895          self.assertTrue(t3.done())
     896          self.assertFalse(t4.done())
     897  
     898          t3.cancel()
     899          t4.cancel()
     900          await asyncio.sleep(0)
     901  
     902      async def test_acquire_hang(self):
     903          sem = asyncio.Semaphore(value=0)
     904  
     905          t1 = asyncio.create_task(sem.acquire())
     906          t2 = asyncio.create_task(sem.acquire())
     907          await asyncio.sleep(0)
     908  
     909          t1.cancel()
     910          sem.release()
     911          await asyncio.sleep(0)
     912          await asyncio.sleep(0)
     913          self.assertTrue(sem.locked())
     914          self.assertTrue(t2.done())
     915  
     916      async def test_acquire_no_hang(self):
     917  
     918          sem = asyncio.Semaphore(1)
     919  
     920          async def c1():
     921              async with sem:
     922                  await asyncio.sleep(0)
     923              t2.cancel()
     924  
     925          async def c2():
     926              async with sem:
     927                  self.assertFalse(True)
     928  
     929          t1 = asyncio.create_task(c1())
     930          t2 = asyncio.create_task(c2())
     931  
     932          r1, r2 = await asyncio.gather(t1, t2, return_exceptions=True)
     933          self.assertTrue(r1 is None)
     934          self.assertTrue(isinstance(r2, asyncio.CancelledError))
     935  
     936          await asyncio.wait_for(sem.acquire(), timeout=1.0)
     937  
     938      def test_release_not_acquired(self):
     939          sem = asyncio.BoundedSemaphore()
     940  
     941          self.assertRaises(ValueError, sem.release)
     942  
     943      async def test_release_no_waiters(self):
     944          sem = asyncio.Semaphore()
     945          await sem.acquire()
     946          self.assertTrue(sem.locked())
     947  
     948          sem.release()
     949          self.assertFalse(sem.locked())
     950  
     951      async def test_acquire_fifo_order(self):
     952          sem = asyncio.Semaphore(1)
     953          result = []
     954  
     955          async def coro(tag):
     956              await sem.acquire()
     957              result.append(f'{tag}_1')
     958              await asyncio.sleep(0.01)
     959              sem.release()
     960  
     961              await sem.acquire()
     962              result.append(f'{tag}_2')
     963              await asyncio.sleep(0.01)
     964              sem.release()
     965  
     966          async with asyncio.TaskGroup() as tg:
     967              tg.create_task(coro('c1'))
     968              tg.create_task(coro('c2'))
     969              tg.create_task(coro('c3'))
     970  
     971          self.assertEqual(
     972              ['c1_1', 'c2_1', 'c3_1', 'c1_2', 'c2_2', 'c3_2'],
     973              result
     974          )
     975  
     976      async def test_acquire_fifo_order_2(self):
     977          sem = asyncio.Semaphore(1)
     978          result = []
     979  
     980          async def c1(result):
     981              await sem.acquire()
     982              result.append(1)
     983              return True
     984  
     985          async def c2(result):
     986              await sem.acquire()
     987              result.append(2)
     988              sem.release()
     989              await sem.acquire()
     990              result.append(4)
     991              return True
     992  
     993          async def c3(result):
     994              await sem.acquire()
     995              result.append(3)
     996              return True
     997  
     998          t1 = asyncio.create_task(c1(result))
     999          t2 = asyncio.create_task(c2(result))
    1000          t3 = asyncio.create_task(c3(result))
    1001  
    1002          await asyncio.sleep(0)
    1003  
    1004          sem.release()
    1005          sem.release()
    1006  
    1007          tasks = [t1, t2, t3]
    1008          await asyncio.gather(*tasks)
    1009          self.assertEqual([1, 2, 3, 4], result)
    1010  
    1011      async def test_acquire_fifo_order_3(self):
    1012          sem = asyncio.Semaphore(0)
    1013          result = []
    1014  
    1015          async def c1(result):
    1016              await sem.acquire()
    1017              result.append(1)
    1018              return True
    1019  
    1020          async def c2(result):
    1021              await sem.acquire()
    1022              result.append(2)
    1023              return True
    1024  
    1025          async def c3(result):
    1026              await sem.acquire()
    1027              result.append(3)
    1028              return True
    1029  
    1030          t1 = asyncio.create_task(c1(result))
    1031          t2 = asyncio.create_task(c2(result))
    1032          t3 = asyncio.create_task(c3(result))
    1033  
    1034          await asyncio.sleep(0)
    1035  
    1036          t1.cancel()
    1037  
    1038          await asyncio.sleep(0)
    1039  
    1040          sem.release()
    1041          sem.release()
    1042  
    1043          tasks = [t1, t2, t3]
    1044          await asyncio.gather(*tasks, return_exceptions=True)
    1045          self.assertEqual([2, 3], result)
    1046  
    1047  
    1048  class ESC[4;38;5;81mBarrierTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
    1049  
    async def asyncSetUp(self):
        """Run the base-class async setup, then set the default party count."""
        await super().asyncSetUp()
        # default number of barrier parties; some tests overwrite it
        self.N = 5
    1053  
    def make_tasks(self, n, coro):
        """Create *n* tasks, each wrapping a fresh ``coro()`` call, and return them as a list."""
        spawned = []
        for _ in range(n):
            spawned.append(asyncio.create_task(coro()))
        return spawned
    1057  
    async def gather_tasks(self, n, coro):
        """Run *n* ``coro()`` tasks to completion and return ``(results, tasks)``."""
        spawned = self.make_tasks(n, coro)
        outcomes = await asyncio.gather(*spawned)
        return outcomes, spawned
    1062  
    1063      async def test_barrier(self):
    1064          barrier = asyncio.Barrier(self.N)
    1065          self.assertIn("filling", repr(barrier))
    1066          with self.assertRaisesRegex(
    1067              TypeError,
    1068              "object Barrier can't be used in 'await' expression",
    1069          ):
    1070              await barrier
    1071  
    1072          self.assertIn("filling", repr(barrier))
    1073  
    async def test_repr(self):
        # Walk a barrier through filling, draining, resetting and broken,
        # checking after each step that repr() matches RGX_REPR and names
        # the expected state.
        barrier = asyncio.Barrier(self.N)

        self.assertTrue(RGX_REPR.match(repr(barrier)))
        self.assertIn("filling", repr(barrier))

        waiters = []
        async def wait(barrier):
            await barrier.wait()

        incr = 2
        for _ in range(incr):
            waiters.append(asyncio.create_task(wait(barrier)))
        await asyncio.sleep(0)

        self.assertTrue(RGX_REPR.match(repr(barrier)))
        self.assertIn(f"waiters:{incr}/{self.N}", repr(barrier))
        self.assertIn("filling", repr(barrier))

        # create missing waiters
        for _ in range(barrier.parties - barrier.n_waiting):
            waiters.append(asyncio.create_task(wait(barrier)))
        await asyncio.sleep(0)

        self.assertTrue(RGX_REPR.match(repr(barrier)))
        self.assertIn("draining", repr(barrier))

        # add a part of waiters
        for _ in range(incr):
            waiters.append(asyncio.create_task(wait(barrier)))
        await asyncio.sleep(0)
        # and reset
        await barrier.reset()

        self.assertTrue(RGX_REPR.match(repr(barrier)))
        self.assertIn("resetting", repr(barrier))

        # add a part of waiters again
        for _ in range(incr):
            waiters.append(asyncio.create_task(wait(barrier)))
        await asyncio.sleep(0)
        # and abort
        await barrier.abort()

        self.assertTrue(RGX_REPR.match(repr(barrier)))
        self.assertIn("broken", repr(barrier))
        self.assertTrue(barrier.broken)

        # suppress unhandled exceptions
        await asyncio.gather(*waiters, return_exceptions=True)
    1124  
    1125      async def test_barrier_parties(self):
    1126          self.assertRaises(ValueError, lambda: asyncio.Barrier(0))
    1127          self.assertRaises(ValueError, lambda: asyncio.Barrier(-4))
    1128  
    1129          self.assertIsInstance(asyncio.Barrier(self.N), asyncio.Barrier)
    1130  
    1131      async def test_context_manager(self):
    1132          self.N = 3
    1133          barrier = asyncio.Barrier(self.N)
    1134          results = []
    1135  
    1136          async def coro():
    1137              async with barrier as i:
    1138                  results.append(i)
    1139  
    1140          await self.gather_tasks(self.N, coro)
    1141  
    1142          self.assertListEqual(sorted(results), list(range(self.N)))
    1143          self.assertEqual(barrier.n_waiting, 0)
    1144          self.assertFalse(barrier.broken)
    1145  
    1146      async def test_filling_one_task(self):
    1147          barrier = asyncio.Barrier(1)
    1148  
    1149          async def f():
    1150              async with barrier as i:
    1151                  return True
    1152  
    1153          ret = await f()
    1154  
    1155          self.assertTrue(ret)
    1156          self.assertEqual(barrier.n_waiting, 0)
    1157          self.assertFalse(barrier.broken)
    1158  
    1159      async def test_filling_one_task_twice(self):
    1160          barrier = asyncio.Barrier(1)
    1161  
    1162          t1 = asyncio.create_task(barrier.wait())
    1163          await asyncio.sleep(0)
    1164          self.assertEqual(barrier.n_waiting, 0)
    1165  
    1166          t2 = asyncio.create_task(barrier.wait())
    1167          await asyncio.sleep(0)
    1168  
    1169          self.assertEqual(t1.result(), t2.result())
    1170          self.assertEqual(t1.done(), t2.done())
    1171  
    1172          self.assertEqual(barrier.n_waiting, 0)
    1173          self.assertFalse(barrier.broken)
    1174  
    1175      async def test_filling_task_by_task(self):
    1176          self.N = 3
    1177          barrier = asyncio.Barrier(self.N)
    1178  
    1179          t1 = asyncio.create_task(barrier.wait())
    1180          await asyncio.sleep(0)
    1181          self.assertEqual(barrier.n_waiting, 1)
    1182          self.assertIn("filling", repr(barrier))
    1183  
    1184          t2 = asyncio.create_task(barrier.wait())
    1185          await asyncio.sleep(0)
    1186          self.assertEqual(barrier.n_waiting, 2)
    1187          self.assertIn("filling", repr(barrier))
    1188  
    1189          t3 = asyncio.create_task(barrier.wait())
    1190          await asyncio.sleep(0)
    1191  
    1192          await asyncio.wait([t1, t2, t3])
    1193  
    1194          self.assertEqual(barrier.n_waiting, 0)
    1195          self.assertFalse(barrier.broken)
    1196  
    1197      async def test_filling_tasks_wait_twice(self):
    1198          barrier = asyncio.Barrier(self.N)
    1199          results = []
    1200  
    1201          async def coro():
    1202              async with barrier:
    1203                  results.append(True)
    1204  
    1205                  async with barrier:
    1206                      results.append(False)
    1207  
    1208          await self.gather_tasks(self.N, coro)
    1209  
    1210          self.assertEqual(len(results), self.N*2)
    1211          self.assertEqual(results.count(True), self.N)
    1212          self.assertEqual(results.count(False), self.N)
    1213  
    1214          self.assertEqual(barrier.n_waiting, 0)
    1215          self.assertFalse(barrier.broken)
    1216  
    1217      async def test_filling_tasks_check_return_value(self):
    1218          barrier = asyncio.Barrier(self.N)
    1219          results1 = []
    1220          results2 = []
    1221  
    1222          async def coro():
    1223              async with barrier:
    1224                  results1.append(True)
    1225  
    1226                  async with barrier as i:
    1227                      results2.append(True)
    1228                      return i
    1229  
    1230          res, _ = await self.gather_tasks(self.N, coro)
    1231  
    1232          self.assertEqual(len(results1), self.N)
    1233          self.assertTrue(all(results1))
    1234          self.assertEqual(len(results2), self.N)
    1235          self.assertTrue(all(results2))
    1236          self.assertListEqual(sorted(res), list(range(self.N)))
    1237  
    1238          self.assertEqual(barrier.n_waiting, 0)
    1239          self.assertFalse(barrier.broken)
    1240  
    1241      async def test_draining_state(self):
    1242          barrier = asyncio.Barrier(self.N)
    1243          results = []
    1244  
    1245          async def coro():
    1246              async with barrier:
    1247                  # barrier state change to filling for the last task release
    1248                  results.append("draining" in repr(barrier))
    1249  
    1250          await self.gather_tasks(self.N, coro)
    1251  
    1252          self.assertEqual(len(results), self.N)
    1253          self.assertEqual(results[-1], False)
    1254          self.assertTrue(all(results[:self.N-1]))
    1255  
    1256          self.assertEqual(barrier.n_waiting, 0)
    1257          self.assertFalse(barrier.broken)
    1258  
    1259      async def test_blocking_tasks_while_draining(self):
    1260          rewait = 2
    1261          barrier = asyncio.Barrier(self.N)
    1262          barrier_nowaiting = asyncio.Barrier(self.N - rewait)
    1263          results = []
    1264          rewait_n = rewait
    1265          counter = 0
    1266  
    1267          async def coro():
    1268              nonlocal rewait_n
    1269  
    1270              # first time waiting
    1271              await barrier.wait()
    1272  
    1273              # after wainting once for all tasks
    1274              if rewait_n > 0:
    1275                  rewait_n -= 1
    1276                  # wait again only for rewait tasks
    1277                  await barrier.wait()
    1278              else:
    1279                  # wait for end of draining state`
    1280                  await barrier_nowaiting.wait()
    1281                  # wait for other waiting tasks
    1282                  await barrier.wait()
    1283  
    1284          # a success means that barrier_nowaiting
    1285          # was waited for exactly N-rewait=3 times
    1286          await self.gather_tasks(self.N, coro)
    1287  
    1288      async def test_filling_tasks_cancel_one(self):
    1289          self.N = 3
    1290          barrier = asyncio.Barrier(self.N)
    1291          results = []
    1292  
    1293          async def coro():
    1294              await barrier.wait()
    1295              results.append(True)
    1296  
    1297          t1 = asyncio.create_task(coro())
    1298          await asyncio.sleep(0)
    1299          self.assertEqual(barrier.n_waiting, 1)
    1300  
    1301          t2 = asyncio.create_task(coro())
    1302          await asyncio.sleep(0)
    1303          self.assertEqual(barrier.n_waiting, 2)
    1304  
    1305          t1.cancel()
    1306          await asyncio.sleep(0)
    1307          self.assertEqual(barrier.n_waiting, 1)
    1308          with self.assertRaises(asyncio.CancelledError):
    1309              await t1
    1310          self.assertTrue(t1.cancelled())
    1311  
    1312          t3 = asyncio.create_task(coro())
    1313          await asyncio.sleep(0)
    1314          self.assertEqual(barrier.n_waiting, 2)
    1315  
    1316          t4 = asyncio.create_task(coro())
    1317          await asyncio.gather(t2, t3, t4)
    1318  
    1319          self.assertEqual(len(results), self.N)
    1320          self.assertTrue(all(results))
    1321  
    1322          self.assertEqual(barrier.n_waiting, 0)
    1323          self.assertFalse(barrier.broken)
    1324  
    1325      async def test_reset_barrier(self):
    1326          barrier = asyncio.Barrier(1)
    1327  
    1328          asyncio.create_task(barrier.reset())
    1329          await asyncio.sleep(0)
    1330  
    1331          self.assertEqual(barrier.n_waiting, 0)
    1332          self.assertFalse(barrier.broken)
    1333  
    1334      async def test_reset_barrier_while_tasks_waiting(self):
    1335          barrier = asyncio.Barrier(self.N)
    1336          results = []
    1337  
    1338          async def coro():
    1339              try:
    1340                  await barrier.wait()
    1341              except asyncio.BrokenBarrierError:
    1342                  results.append(True)
    1343  
    1344          async def coro_reset():
    1345              await barrier.reset()
    1346  
    1347          # N-1 tasks waiting on barrier with N parties
    1348          tasks  = self.make_tasks(self.N-1, coro)
    1349          await asyncio.sleep(0)
    1350  
    1351          # reset the barrier
    1352          asyncio.create_task(coro_reset())
    1353          await asyncio.gather(*tasks)
    1354  
    1355          self.assertEqual(len(results), self.N-1)
    1356          self.assertTrue(all(results))
    1357          self.assertEqual(barrier.n_waiting, 0)
    1358          self.assertNotIn("resetting", repr(barrier))
    1359          self.assertFalse(barrier.broken)
    1360  
    async def test_reset_barrier_when_tasks_half_draining(self):
        # Reset the barrier from a task that has already passed it, at the
        # moment the barrier still counts ``rest_of_tasks`` parties.  The
        # final assertions expect exactly that many BrokenBarrierError.
        barrier = asyncio.Barrier(self.N)
        results1 = []
        rest_of_tasks = self.N//2

        async def coro():
            try:
                await barrier.wait()
            except asyncio.BrokenBarrierError:
                # tasks whose wait() was broken are recorded here
                results1.append(True)
            else:
                # this task got through wait() without an error
                if rest_of_tasks == barrier._count:
                    # the barrier's private counter equals rest_of_tasks:
                    # reset the barrier now
                    await barrier.reset()

        await self.gather_tasks(self.N, coro)

        self.assertEqual(results1, [True]*rest_of_tasks)
        self.assertEqual(barrier.n_waiting, 0)
        self.assertNotIn("resetting", repr(barrier))
        self.assertFalse(barrier.broken)
    1384  
    async def test_reset_barrier_when_tasks_half_draining_half_blocking(self):
        # Reset the barrier once more than ``blocking_tasks`` tasks have
        # passed it.  Every code path below waits on the barrier a second
        # time.  The final assertions expect ``blocking_tasks`` tasks to
        # see BrokenBarrierError on their first wait and none on the second.
        barrier = asyncio.Barrier(self.N)
        results1 = []
        results2 = []
        blocking_tasks = self.N//2
        count = 0

        async def coro():
            nonlocal count
            try:
                await barrier.wait()
            except asyncio.BrokenBarrierError:
                # first wait was broken: record it
                results1.append(True)

                # so now waiting again to reach nb_parties
                await barrier.wait()
            else:
                # count the tasks that passed the first wait normally
                count += 1
                if count > blocking_tasks:
                    # reset now: raise asyncio.BrokenBarrierError for waiting tasks
                    await barrier.reset()

                    # so now waiting again to reach nb_parties
                    await barrier.wait()
                else:
                    try:
                        await barrier.wait()
                    except asyncio.BrokenBarrierError:
                        # not expected: results2 is asserted to stay empty
                        results2.append(True)

        await self.gather_tasks(self.N, coro)

        self.assertEqual(results1, [True]*blocking_tasks)
        self.assertEqual(results2, [])
        self.assertEqual(barrier.n_waiting, 0)
        self.assertNotIn("resetting", repr(barrier))
        self.assertFalse(barrier.broken)
    1424  
    1425      async def test_reset_barrier_while_tasks_waiting_and_waiting_again(self):
    1426          barrier = asyncio.Barrier(self.N)
    1427          results1 = []
    1428          results2 = []
    1429  
    1430          async def coro1():
    1431              try:
    1432                  await barrier.wait()
    1433              except asyncio.BrokenBarrierError:
    1434                  results1.append(True)
    1435              finally:
    1436                  await barrier.wait()
    1437                  results2.append(True)
    1438  
    1439          async def coro2():
    1440              async with barrier:
    1441                  results2.append(True)
    1442  
    1443          tasks = self.make_tasks(self.N-1, coro1)
    1444  
    1445          # reset barrier, N-1 waiting tasks raise an BrokenBarrierError
    1446          asyncio.create_task(barrier.reset())
    1447          await asyncio.sleep(0)
    1448  
    1449          # complete waiting tasks in the `finally`
    1450          asyncio.create_task(coro2())
    1451  
    1452          await asyncio.gather(*tasks)
    1453  
    1454          self.assertFalse(barrier.broken)
    1455          self.assertEqual(len(results1), self.N-1)
    1456          self.assertTrue(all(results1))
    1457          self.assertEqual(len(results2), self.N)
    1458          self.assertTrue(all(results2))
    1459  
    1460          self.assertEqual(barrier.n_waiting, 0)
    1461  
    1462  
    1463      async def test_reset_barrier_while_tasks_draining(self):
    1464          barrier = asyncio.Barrier(self.N)
    1465          results1 = []
    1466          results2 = []
    1467          results3 = []
    1468          count = 0
    1469  
    1470          async def coro():
    1471              nonlocal count
    1472  
    1473              i = await barrier.wait()
    1474              count += 1
    1475              if count == self.N:
    1476                  # last task exited from barrier
    1477                  await barrier.reset()
    1478  
    1479                  # wit here to reach the `parties`
    1480                  await barrier.wait()
    1481              else:
    1482                  try:
    1483                      # second waiting
    1484                      await barrier.wait()
    1485  
    1486                      # N-1 tasks here
    1487                      results1.append(True)
    1488                  except Exception as e:
    1489                      # never goes here
    1490                      results2.append(True)
    1491  
    1492              # Now, pass the barrier again
    1493              # last wait, must be completed
    1494              k = await barrier.wait()
    1495              results3.append(True)
    1496  
    1497          await self.gather_tasks(self.N, coro)
    1498  
    1499          self.assertFalse(barrier.broken)
    1500          self.assertTrue(all(results1))
    1501          self.assertEqual(len(results1), self.N-1)
    1502          self.assertEqual(len(results2), 0)
    1503          self.assertEqual(len(results3), self.N)
    1504          self.assertTrue(all(results3))
    1505  
    1506          self.assertEqual(barrier.n_waiting, 0)
    1507  
    1508      async def test_abort_barrier(self):
    1509          barrier = asyncio.Barrier(1)
    1510  
    1511          asyncio.create_task(barrier.abort())
    1512          await asyncio.sleep(0)
    1513  
    1514          self.assertEqual(barrier.n_waiting, 0)
    1515          self.assertTrue(barrier.broken)
    1516  
    async def test_abort_barrier_when_tasks_half_draining_half_blocking(self):
        # Abort the barrier once more than ``blocking_tasks`` tasks have
        # passed it.  The final assertions expect ``blocking_tasks`` tasks
        # to see BrokenBarrierError on their first wait and
        # N-blocking_tasks-1 tasks to see it on their second wait.
        barrier = asyncio.Barrier(self.N)
        results1 = []
        results2 = []
        blocking_tasks = self.N//2
        count = 0

        async def coro():
            nonlocal count
            try:
                await barrier.wait()
            except asyncio.BrokenBarrierError:
                # first wait was broken: record it
                results1.append(True)
            else:
                # count the tasks that passed the first wait normally
                count += 1
                if count > blocking_tasks:
                    # abort now: raise asyncio.BrokenBarrierError for all tasks
                    await barrier.abort()
                else:
                    try:
                        await barrier.wait()
                    except asyncio.BrokenBarrierError:
                        # second wait was broken: record it separately
                        results2.append(True)

        await self.gather_tasks(self.N, coro)

        self.assertTrue(barrier.broken)
        self.assertEqual(results1, [True]*blocking_tasks)
        self.assertEqual(results2, [True]*(self.N-blocking_tasks-1))
        self.assertEqual(barrier.n_waiting, 0)
        self.assertNotIn("resetting", repr(barrier))
    1550  
    1551      async def test_abort_barrier_when_exception(self):
    1552          # test from threading.Barrier: see `lock_tests.test_reset`
    1553          barrier = asyncio.Barrier(self.N)
    1554          results1 = []
    1555          results2 = []
    1556  
    1557          async def coro():
    1558              try:
    1559                  async with barrier as i :
    1560                      if i == self.N//2:
    1561                          raise RuntimeError
    1562                  async with barrier:
    1563                      results1.append(True)
    1564              except asyncio.BrokenBarrierError:
    1565                  results2.append(True)
    1566              except RuntimeError:
    1567                  await barrier.abort()
    1568  
    1569          await self.gather_tasks(self.N, coro)
    1570  
    1571          self.assertTrue(barrier.broken)
    1572          self.assertEqual(len(results1), 0)
    1573          self.assertEqual(len(results2), self.N-1)
    1574          self.assertTrue(all(results2))
    1575          self.assertEqual(barrier.n_waiting, 0)
    1576  
    async def test_abort_barrier_when_exception_then_resetting(self):
        # test from threading.Barrier: see `lock_tests.test_abort_and_reset`
        # barrier1 is aborted after a RuntimeError, then reset by one task
        # and passed once more by all tasks.  barrier2 only serves to
        # synchronize the tasks around that reset.
        barrier1 = asyncio.Barrier(self.N)
        barrier2 = asyncio.Barrier(self.N)
        results1 = []
        results2 = []
        results3 = []

        async def coro():
            try:
                i = await barrier1.wait()
                if i == self.N//2:
                    # exactly one task (the one given this index) fails
                    raise RuntimeError
                await barrier1.wait()
                # asserted below to never be reached
                results1.append(True)
            except asyncio.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                await barrier1.abort()

            # Synchronize and reset the barrier.  Must synchronize first so
            # that everyone has left it when we reset, and after so that no
            # one enters it before the reset.
            i = await barrier2.wait()
            if  i == self.N//2:
                await barrier1.reset()
            await barrier2.wait()
            await barrier1.wait()
            results3.append(True)

        await self.gather_tasks(self.N, coro)

        self.assertFalse(barrier1.broken)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(all(results2))
        self.assertEqual(len(results3), self.N)
        self.assertTrue(all(results3))

        self.assertEqual(barrier1.n_waiting, 0)
    1617  
    1618  
# Run the tests when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()