1 """Tests for locks.py"""
2
3 import unittest
4 from unittest import mock
5 import re
6
7 import asyncio
8 import collections
9
10 STR_RGX_REPR = (
11 r'^<(?P<class>.*?) object at (?P<address>.*?)'
12 r'\[(?P<extras>'
13 r'(set|unset|locked|unlocked|filling|draining|resetting|broken)'
14 r'(, value:\d)?'
15 r'(, waiters:\d+)?'
16 r'(, waiters:\d+\/\d+)?' # barrier
17 r')\]>\Z'
18 )
19 RGX_REPR = re.compile(STR_RGX_REPR)
20
21
22 def tearDownModule():
23 asyncio.set_event_loop_policy(None)
24
25
26 class ESC[4;38;5;81mLockTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
27
28 async def test_repr(self):
29 lock = asyncio.Lock()
30 self.assertTrue(repr(lock).endswith('[unlocked]>'))
31 self.assertTrue(RGX_REPR.match(repr(lock)))
32
33 await lock.acquire()
34 self.assertTrue(repr(lock).endswith('[locked]>'))
35 self.assertTrue(RGX_REPR.match(repr(lock)))
36
37 async def test_lock(self):
38 lock = asyncio.Lock()
39
40 with self.assertRaisesRegex(
41 TypeError,
42 "object Lock can't be used in 'await' expression"
43 ):
44 await lock
45
46 self.assertFalse(lock.locked())
47
48 async def test_lock_doesnt_accept_loop_parameter(self):
49 primitives_cls = [
50 asyncio.Lock,
51 asyncio.Condition,
52 asyncio.Event,
53 asyncio.Semaphore,
54 asyncio.BoundedSemaphore,
55 ]
56
57 loop = asyncio.get_running_loop()
58
59 for cls in primitives_cls:
60 with self.assertRaisesRegex(
61 TypeError,
62 rf"{cls.__name__}\.__init__\(\) got an unexpected "
63 rf"keyword argument 'loop'"
64 ):
65 cls(loop=loop)
66
67 async def test_lock_by_with_statement(self):
68 primitives = [
69 asyncio.Lock(),
70 asyncio.Condition(),
71 asyncio.Semaphore(),
72 asyncio.BoundedSemaphore(),
73 ]
74
75 for lock in primitives:
76 await asyncio.sleep(0.01)
77 self.assertFalse(lock.locked())
78 with self.assertRaisesRegex(
79 TypeError,
80 r"object \w+ can't be used in 'await' expression"
81 ):
82 with await lock:
83 pass
84 self.assertFalse(lock.locked())
85
86 async def test_acquire(self):
87 lock = asyncio.Lock()
88 result = []
89
90 self.assertTrue(await lock.acquire())
91
92 async def c1(result):
93 if await lock.acquire():
94 result.append(1)
95 return True
96
97 async def c2(result):
98 if await lock.acquire():
99 result.append(2)
100 return True
101
102 async def c3(result):
103 if await lock.acquire():
104 result.append(3)
105 return True
106
107 t1 = asyncio.create_task(c1(result))
108 t2 = asyncio.create_task(c2(result))
109
110 await asyncio.sleep(0)
111 self.assertEqual([], result)
112
113 lock.release()
114 await asyncio.sleep(0)
115 self.assertEqual([1], result)
116
117 await asyncio.sleep(0)
118 self.assertEqual([1], result)
119
120 t3 = asyncio.create_task(c3(result))
121
122 lock.release()
123 await asyncio.sleep(0)
124 self.assertEqual([1, 2], result)
125
126 lock.release()
127 await asyncio.sleep(0)
128 self.assertEqual([1, 2, 3], result)
129
130 self.assertTrue(t1.done())
131 self.assertTrue(t1.result())
132 self.assertTrue(t2.done())
133 self.assertTrue(t2.result())
134 self.assertTrue(t3.done())
135 self.assertTrue(t3.result())
136
137 async def test_acquire_cancel(self):
138 lock = asyncio.Lock()
139 self.assertTrue(await lock.acquire())
140
141 task = asyncio.create_task(lock.acquire())
142 asyncio.get_running_loop().call_soon(task.cancel)
143 with self.assertRaises(asyncio.CancelledError):
144 await task
145 self.assertFalse(lock._waiters)
146
147 async def test_cancel_race(self):
148 # Several tasks:
149 # - A acquires the lock
150 # - B is blocked in acquire()
151 # - C is blocked in acquire()
152 #
153 # Now, concurrently:
154 # - B is cancelled
155 # - A releases the lock
156 #
157 # If B's waiter is marked cancelled but not yet removed from
158 # _waiters, A's release() call will crash when trying to set
159 # B's waiter; instead, it should move on to C's waiter.
160
161 # Setup: A has the lock, b and c are waiting.
162 lock = asyncio.Lock()
163
164 async def lockit(name, blocker):
165 await lock.acquire()
166 try:
167 if blocker is not None:
168 await blocker
169 finally:
170 lock.release()
171
172 fa = asyncio.get_running_loop().create_future()
173 ta = asyncio.create_task(lockit('A', fa))
174 await asyncio.sleep(0)
175 self.assertTrue(lock.locked())
176 tb = asyncio.create_task(lockit('B', None))
177 await asyncio.sleep(0)
178 self.assertEqual(len(lock._waiters), 1)
179 tc = asyncio.create_task(lockit('C', None))
180 await asyncio.sleep(0)
181 self.assertEqual(len(lock._waiters), 2)
182
183 # Create the race and check.
184 # Without the fix this failed at the last assert.
185 fa.set_result(None)
186 tb.cancel()
187 self.assertTrue(lock._waiters[0].cancelled())
188 await asyncio.sleep(0)
189 self.assertFalse(lock.locked())
190 self.assertTrue(ta.done())
191 self.assertTrue(tb.cancelled())
192 await tc
193
194 async def test_cancel_release_race(self):
195 # Issue 32734
196 # Acquire 4 locks, cancel second, release first
197 # and 2 locks are taken at once.
198 loop = asyncio.get_running_loop()
199 lock = asyncio.Lock()
200 lock_count = 0
201 call_count = 0
202
203 async def lockit():
204 nonlocal lock_count
205 nonlocal call_count
206 call_count += 1
207 await lock.acquire()
208 lock_count += 1
209
210 def trigger():
211 t1.cancel()
212 lock.release()
213
214 await lock.acquire()
215
216 t1 = asyncio.create_task(lockit())
217 t2 = asyncio.create_task(lockit())
218 t3 = asyncio.create_task(lockit())
219
220 # Start scheduled tasks
221 await asyncio.sleep(0)
222
223 loop.call_soon(trigger)
224 with self.assertRaises(asyncio.CancelledError):
225 # Wait for cancellation
226 await t1
227
228 # Make sure only one lock was taken
229 self.assertEqual(lock_count, 1)
230 # While 3 calls were made to lockit()
231 self.assertEqual(call_count, 3)
232 self.assertTrue(t1.cancelled() and t2.done())
233
234 # Cleanup the task that is stuck on acquire.
235 t3.cancel()
236 await asyncio.sleep(0)
237 self.assertTrue(t3.cancelled())
238
239 async def test_finished_waiter_cancelled(self):
240 lock = asyncio.Lock()
241
242 await lock.acquire()
243 self.assertTrue(lock.locked())
244
245 tb = asyncio.create_task(lock.acquire())
246 await asyncio.sleep(0)
247 self.assertEqual(len(lock._waiters), 1)
248
249 # Create a second waiter, wake up the first, and cancel it.
250 # Without the fix, the second was not woken up.
251 tc = asyncio.create_task(lock.acquire())
252 tb.cancel()
253 lock.release()
254 await asyncio.sleep(0)
255
256 self.assertTrue(lock.locked())
257 self.assertTrue(tb.cancelled())
258
259 # Cleanup
260 await tc
261
262 async def test_release_not_acquired(self):
263 lock = asyncio.Lock()
264
265 self.assertRaises(RuntimeError, lock.release)
266
267 async def test_release_no_waiters(self):
268 lock = asyncio.Lock()
269 await lock.acquire()
270 self.assertTrue(lock.locked())
271
272 lock.release()
273 self.assertFalse(lock.locked())
274
275 async def test_context_manager(self):
276 lock = asyncio.Lock()
277 self.assertFalse(lock.locked())
278
279 async with lock:
280 self.assertTrue(lock.locked())
281
282 self.assertFalse(lock.locked())
283
284
285 class ESC[4;38;5;81mEventTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
286
287 def test_repr(self):
288 ev = asyncio.Event()
289 self.assertTrue(repr(ev).endswith('[unset]>'))
290 match = RGX_REPR.match(repr(ev))
291 self.assertEqual(match.group('extras'), 'unset')
292
293 ev.set()
294 self.assertTrue(repr(ev).endswith('[set]>'))
295 self.assertTrue(RGX_REPR.match(repr(ev)))
296
297 ev._waiters.append(mock.Mock())
298 self.assertTrue('waiters:1' in repr(ev))
299 self.assertTrue(RGX_REPR.match(repr(ev)))
300
301 async def test_wait(self):
302 ev = asyncio.Event()
303 self.assertFalse(ev.is_set())
304
305 result = []
306
307 async def c1(result):
308 if await ev.wait():
309 result.append(1)
310
311 async def c2(result):
312 if await ev.wait():
313 result.append(2)
314
315 async def c3(result):
316 if await ev.wait():
317 result.append(3)
318
319 t1 = asyncio.create_task(c1(result))
320 t2 = asyncio.create_task(c2(result))
321
322 await asyncio.sleep(0)
323 self.assertEqual([], result)
324
325 t3 = asyncio.create_task(c3(result))
326
327 ev.set()
328 await asyncio.sleep(0)
329 self.assertEqual([3, 1, 2], result)
330
331 self.assertTrue(t1.done())
332 self.assertIsNone(t1.result())
333 self.assertTrue(t2.done())
334 self.assertIsNone(t2.result())
335 self.assertTrue(t3.done())
336 self.assertIsNone(t3.result())
337
338 async def test_wait_on_set(self):
339 ev = asyncio.Event()
340 ev.set()
341
342 res = await ev.wait()
343 self.assertTrue(res)
344
345 async def test_wait_cancel(self):
346 ev = asyncio.Event()
347
348 wait = asyncio.create_task(ev.wait())
349 asyncio.get_running_loop().call_soon(wait.cancel)
350 with self.assertRaises(asyncio.CancelledError):
351 await wait
352 self.assertFalse(ev._waiters)
353
354 async def test_clear(self):
355 ev = asyncio.Event()
356 self.assertFalse(ev.is_set())
357
358 ev.set()
359 self.assertTrue(ev.is_set())
360
361 ev.clear()
362 self.assertFalse(ev.is_set())
363
364 async def test_clear_with_waiters(self):
365 ev = asyncio.Event()
366 result = []
367
368 async def c1(result):
369 if await ev.wait():
370 result.append(1)
371 return True
372
373 t = asyncio.create_task(c1(result))
374 await asyncio.sleep(0)
375 self.assertEqual([], result)
376
377 ev.set()
378 ev.clear()
379 self.assertFalse(ev.is_set())
380
381 ev.set()
382 ev.set()
383 self.assertEqual(1, len(ev._waiters))
384
385 await asyncio.sleep(0)
386 self.assertEqual([1], result)
387 self.assertEqual(0, len(ev._waiters))
388
389 self.assertTrue(t.done())
390 self.assertTrue(t.result())
391
392
393 class ESC[4;38;5;81mConditionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
394
395 async def test_wait(self):
396 cond = asyncio.Condition()
397 result = []
398
399 async def c1(result):
400 await cond.acquire()
401 if await cond.wait():
402 result.append(1)
403 return True
404
405 async def c2(result):
406 await cond.acquire()
407 if await cond.wait():
408 result.append(2)
409 return True
410
411 async def c3(result):
412 await cond.acquire()
413 if await cond.wait():
414 result.append(3)
415 return True
416
417 t1 = asyncio.create_task(c1(result))
418 t2 = asyncio.create_task(c2(result))
419 t3 = asyncio.create_task(c3(result))
420
421 await asyncio.sleep(0)
422 self.assertEqual([], result)
423 self.assertFalse(cond.locked())
424
425 self.assertTrue(await cond.acquire())
426 cond.notify()
427 await asyncio.sleep(0)
428 self.assertEqual([], result)
429 self.assertTrue(cond.locked())
430
431 cond.release()
432 await asyncio.sleep(0)
433 self.assertEqual([1], result)
434 self.assertTrue(cond.locked())
435
436 cond.notify(2)
437 await asyncio.sleep(0)
438 self.assertEqual([1], result)
439 self.assertTrue(cond.locked())
440
441 cond.release()
442 await asyncio.sleep(0)
443 self.assertEqual([1, 2], result)
444 self.assertTrue(cond.locked())
445
446 cond.release()
447 await asyncio.sleep(0)
448 self.assertEqual([1, 2, 3], result)
449 self.assertTrue(cond.locked())
450
451 self.assertTrue(t1.done())
452 self.assertTrue(t1.result())
453 self.assertTrue(t2.done())
454 self.assertTrue(t2.result())
455 self.assertTrue(t3.done())
456 self.assertTrue(t3.result())
457
458 async def test_wait_cancel(self):
459 cond = asyncio.Condition()
460 await cond.acquire()
461
462 wait = asyncio.create_task(cond.wait())
463 asyncio.get_running_loop().call_soon(wait.cancel)
464 with self.assertRaises(asyncio.CancelledError):
465 await wait
466 self.assertFalse(cond._waiters)
467 self.assertTrue(cond.locked())
468
469 async def test_wait_cancel_contested(self):
470 cond = asyncio.Condition()
471
472 await cond.acquire()
473 self.assertTrue(cond.locked())
474
475 wait_task = asyncio.create_task(cond.wait())
476 await asyncio.sleep(0)
477 self.assertFalse(cond.locked())
478
479 # Notify, but contest the lock before cancelling
480 await cond.acquire()
481 self.assertTrue(cond.locked())
482 cond.notify()
483 asyncio.get_running_loop().call_soon(wait_task.cancel)
484 asyncio.get_running_loop().call_soon(cond.release)
485
486 try:
487 await wait_task
488 except asyncio.CancelledError:
489 # Should not happen, since no cancellation points
490 pass
491
492 self.assertTrue(cond.locked())
493
494 async def test_wait_cancel_after_notify(self):
495 # See bpo-32841
496 waited = False
497
498 cond = asyncio.Condition()
499
500 async def wait_on_cond():
501 nonlocal waited
502 async with cond:
503 waited = True # Make sure this area was reached
504 await cond.wait()
505
506 waiter = asyncio.create_task(wait_on_cond())
507 await asyncio.sleep(0) # Start waiting
508
509 await cond.acquire()
510 cond.notify()
511 await asyncio.sleep(0) # Get to acquire()
512 waiter.cancel()
513 await asyncio.sleep(0) # Activate cancellation
514 cond.release()
515 await asyncio.sleep(0) # Cancellation should occur
516
517 self.assertTrue(waiter.cancelled())
518 self.assertTrue(waited)
519
520 async def test_wait_unacquired(self):
521 cond = asyncio.Condition()
522 with self.assertRaises(RuntimeError):
523 await cond.wait()
524
525 async def test_wait_for(self):
526 cond = asyncio.Condition()
527 presult = False
528
529 def predicate():
530 return presult
531
532 result = []
533
534 async def c1(result):
535 await cond.acquire()
536 if await cond.wait_for(predicate):
537 result.append(1)
538 cond.release()
539 return True
540
541 t = asyncio.create_task(c1(result))
542
543 await asyncio.sleep(0)
544 self.assertEqual([], result)
545
546 await cond.acquire()
547 cond.notify()
548 cond.release()
549 await asyncio.sleep(0)
550 self.assertEqual([], result)
551
552 presult = True
553 await cond.acquire()
554 cond.notify()
555 cond.release()
556 await asyncio.sleep(0)
557 self.assertEqual([1], result)
558
559 self.assertTrue(t.done())
560 self.assertTrue(t.result())
561
562 async def test_wait_for_unacquired(self):
563 cond = asyncio.Condition()
564
565 # predicate can return true immediately
566 res = await cond.wait_for(lambda: [1, 2, 3])
567 self.assertEqual([1, 2, 3], res)
568
569 with self.assertRaises(RuntimeError):
570 await cond.wait_for(lambda: False)
571
572 async def test_notify(self):
573 cond = asyncio.Condition()
574 result = []
575
576 async def c1(result):
577 await cond.acquire()
578 if await cond.wait():
579 result.append(1)
580 cond.release()
581 return True
582
583 async def c2(result):
584 await cond.acquire()
585 if await cond.wait():
586 result.append(2)
587 cond.release()
588 return True
589
590 async def c3(result):
591 await cond.acquire()
592 if await cond.wait():
593 result.append(3)
594 cond.release()
595 return True
596
597 t1 = asyncio.create_task(c1(result))
598 t2 = asyncio.create_task(c2(result))
599 t3 = asyncio.create_task(c3(result))
600
601 await asyncio.sleep(0)
602 self.assertEqual([], result)
603
604 await cond.acquire()
605 cond.notify(1)
606 cond.release()
607 await asyncio.sleep(0)
608 self.assertEqual([1], result)
609
610 await cond.acquire()
611 cond.notify(1)
612 cond.notify(2048)
613 cond.release()
614 await asyncio.sleep(0)
615 self.assertEqual([1, 2, 3], result)
616
617 self.assertTrue(t1.done())
618 self.assertTrue(t1.result())
619 self.assertTrue(t2.done())
620 self.assertTrue(t2.result())
621 self.assertTrue(t3.done())
622 self.assertTrue(t3.result())
623
624 async def test_notify_all(self):
625 cond = asyncio.Condition()
626
627 result = []
628
629 async def c1(result):
630 await cond.acquire()
631 if await cond.wait():
632 result.append(1)
633 cond.release()
634 return True
635
636 async def c2(result):
637 await cond.acquire()
638 if await cond.wait():
639 result.append(2)
640 cond.release()
641 return True
642
643 t1 = asyncio.create_task(c1(result))
644 t2 = asyncio.create_task(c2(result))
645
646 await asyncio.sleep(0)
647 self.assertEqual([], result)
648
649 await cond.acquire()
650 cond.notify_all()
651 cond.release()
652 await asyncio.sleep(0)
653 self.assertEqual([1, 2], result)
654
655 self.assertTrue(t1.done())
656 self.assertTrue(t1.result())
657 self.assertTrue(t2.done())
658 self.assertTrue(t2.result())
659
660 def test_notify_unacquired(self):
661 cond = asyncio.Condition()
662 self.assertRaises(RuntimeError, cond.notify)
663
664 def test_notify_all_unacquired(self):
665 cond = asyncio.Condition()
666 self.assertRaises(RuntimeError, cond.notify_all)
667
668 async def test_repr(self):
669 cond = asyncio.Condition()
670 self.assertTrue('unlocked' in repr(cond))
671 self.assertTrue(RGX_REPR.match(repr(cond)))
672
673 await cond.acquire()
674 self.assertTrue('locked' in repr(cond))
675
676 cond._waiters.append(mock.Mock())
677 self.assertTrue('waiters:1' in repr(cond))
678 self.assertTrue(RGX_REPR.match(repr(cond)))
679
680 cond._waiters.append(mock.Mock())
681 self.assertTrue('waiters:2' in repr(cond))
682 self.assertTrue(RGX_REPR.match(repr(cond)))
683
684 async def test_context_manager(self):
685 cond = asyncio.Condition()
686 self.assertFalse(cond.locked())
687 async with cond:
688 self.assertTrue(cond.locked())
689 self.assertFalse(cond.locked())
690
691 async def test_explicit_lock(self):
692 async def f(lock=None, cond=None):
693 if lock is None:
694 lock = asyncio.Lock()
695 if cond is None:
696 cond = asyncio.Condition(lock)
697 self.assertIs(cond._lock, lock)
698 self.assertFalse(lock.locked())
699 self.assertFalse(cond.locked())
700 async with cond:
701 self.assertTrue(lock.locked())
702 self.assertTrue(cond.locked())
703 self.assertFalse(lock.locked())
704 self.assertFalse(cond.locked())
705 async with lock:
706 self.assertTrue(lock.locked())
707 self.assertTrue(cond.locked())
708 self.assertFalse(lock.locked())
709 self.assertFalse(cond.locked())
710
711 # All should work in the same way.
712 await f()
713 await f(asyncio.Lock())
714 lock = asyncio.Lock()
715 await f(lock, asyncio.Condition(lock))
716
717 async def test_ambiguous_loops(self):
718 loop = asyncio.new_event_loop()
719 self.addCleanup(loop.close)
720
721 async def wrong_loop_in_lock():
722 with self.assertRaises(TypeError):
723 asyncio.Lock(loop=loop) # actively disallowed since 3.10
724 lock = asyncio.Lock()
725 lock._loop = loop # use private API for testing
726 async with lock:
727 # acquired immediately via the fast-path
728 # without interaction with any event loop.
729 cond = asyncio.Condition(lock)
730 # cond.acquire() will trigger waiting on the lock
731 # and it will discover the event loop mismatch.
732 with self.assertRaisesRegex(
733 RuntimeError,
734 "is bound to a different event loop",
735 ):
736 await cond.acquire()
737
738 async def wrong_loop_in_cond():
739 # Same analogy here with the condition's loop.
740 lock = asyncio.Lock()
741 async with lock:
742 with self.assertRaises(TypeError):
743 asyncio.Condition(lock, loop=loop)
744 cond = asyncio.Condition(lock)
745 cond._loop = loop
746 with self.assertRaisesRegex(
747 RuntimeError,
748 "is bound to a different event loop",
749 ):
750 await cond.wait()
751
752 await wrong_loop_in_lock()
753 await wrong_loop_in_cond()
754
755 async def test_timeout_in_block(self):
756 condition = asyncio.Condition()
757 async with condition:
758 with self.assertRaises(asyncio.TimeoutError):
759 await asyncio.wait_for(condition.wait(), timeout=0.5)
760
761
762 class ESC[4;38;5;81mSemaphoreTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
763
764 def test_initial_value_zero(self):
765 sem = asyncio.Semaphore(0)
766 self.assertTrue(sem.locked())
767
768 async def test_repr(self):
769 sem = asyncio.Semaphore()
770 self.assertTrue(repr(sem).endswith('[unlocked, value:1]>'))
771 self.assertTrue(RGX_REPR.match(repr(sem)))
772
773 await sem.acquire()
774 self.assertTrue(repr(sem).endswith('[locked]>'))
775 self.assertTrue('waiters' not in repr(sem))
776 self.assertTrue(RGX_REPR.match(repr(sem)))
777
778 if sem._waiters is None:
779 sem._waiters = collections.deque()
780
781 sem._waiters.append(mock.Mock())
782 self.assertTrue('waiters:1' in repr(sem))
783 self.assertTrue(RGX_REPR.match(repr(sem)))
784
785 sem._waiters.append(mock.Mock())
786 self.assertTrue('waiters:2' in repr(sem))
787 self.assertTrue(RGX_REPR.match(repr(sem)))
788
789 async def test_semaphore(self):
790 sem = asyncio.Semaphore()
791 self.assertEqual(1, sem._value)
792
793 with self.assertRaisesRegex(
794 TypeError,
795 "object Semaphore can't be used in 'await' expression",
796 ):
797 await sem
798
799 self.assertFalse(sem.locked())
800 self.assertEqual(1, sem._value)
801
802 def test_semaphore_value(self):
803 self.assertRaises(ValueError, asyncio.Semaphore, -1)
804
805 async def test_acquire(self):
806 sem = asyncio.Semaphore(3)
807 result = []
808
809 self.assertTrue(await sem.acquire())
810 self.assertTrue(await sem.acquire())
811 self.assertFalse(sem.locked())
812
813 async def c1(result):
814 await sem.acquire()
815 result.append(1)
816 return True
817
818 async def c2(result):
819 await sem.acquire()
820 result.append(2)
821 return True
822
823 async def c3(result):
824 await sem.acquire()
825 result.append(3)
826 return True
827
828 async def c4(result):
829 await sem.acquire()
830 result.append(4)
831 return True
832
833 t1 = asyncio.create_task(c1(result))
834 t2 = asyncio.create_task(c2(result))
835 t3 = asyncio.create_task(c3(result))
836
837 await asyncio.sleep(0)
838 self.assertEqual([1], result)
839 self.assertTrue(sem.locked())
840 self.assertEqual(2, len(sem._waiters))
841 self.assertEqual(0, sem._value)
842
843 t4 = asyncio.create_task(c4(result))
844
845 sem.release()
846 sem.release()
847 self.assertEqual(0, sem._value)
848
849 await asyncio.sleep(0)
850 self.assertEqual(0, sem._value)
851 self.assertEqual(3, len(result))
852 self.assertTrue(sem.locked())
853 self.assertEqual(1, len(sem._waiters))
854 self.assertEqual(0, sem._value)
855
856 self.assertTrue(t1.done())
857 self.assertTrue(t1.result())
858 race_tasks = [t2, t3, t4]
859 done_tasks = [t for t in race_tasks if t.done() and t.result()]
860 self.assertEqual(2, len(done_tasks))
861
862 # cleanup locked semaphore
863 sem.release()
864 await asyncio.gather(*race_tasks)
865
866 async def test_acquire_cancel(self):
867 sem = asyncio.Semaphore()
868 await sem.acquire()
869
870 acquire = asyncio.create_task(sem.acquire())
871 asyncio.get_running_loop().call_soon(acquire.cancel)
872 with self.assertRaises(asyncio.CancelledError):
873 await acquire
874 self.assertTrue((not sem._waiters) or
875 all(waiter.done() for waiter in sem._waiters))
876
877 async def test_acquire_cancel_before_awoken(self):
878 sem = asyncio.Semaphore(value=0)
879
880 t1 = asyncio.create_task(sem.acquire())
881 t2 = asyncio.create_task(sem.acquire())
882 t3 = asyncio.create_task(sem.acquire())
883 t4 = asyncio.create_task(sem.acquire())
884
885 await asyncio.sleep(0)
886
887 t1.cancel()
888 t2.cancel()
889 sem.release()
890
891 await asyncio.sleep(0)
892 await asyncio.sleep(0)
893 num_done = sum(t.done() for t in [t3, t4])
894 self.assertEqual(num_done, 1)
895 self.assertTrue(t3.done())
896 self.assertFalse(t4.done())
897
898 t3.cancel()
899 t4.cancel()
900 await asyncio.sleep(0)
901
902 async def test_acquire_hang(self):
903 sem = asyncio.Semaphore(value=0)
904
905 t1 = asyncio.create_task(sem.acquire())
906 t2 = asyncio.create_task(sem.acquire())
907 await asyncio.sleep(0)
908
909 t1.cancel()
910 sem.release()
911 await asyncio.sleep(0)
912 await asyncio.sleep(0)
913 self.assertTrue(sem.locked())
914 self.assertTrue(t2.done())
915
916 async def test_acquire_no_hang(self):
917
918 sem = asyncio.Semaphore(1)
919
920 async def c1():
921 async with sem:
922 await asyncio.sleep(0)
923 t2.cancel()
924
925 async def c2():
926 async with sem:
927 self.assertFalse(True)
928
929 t1 = asyncio.create_task(c1())
930 t2 = asyncio.create_task(c2())
931
932 r1, r2 = await asyncio.gather(t1, t2, return_exceptions=True)
933 self.assertTrue(r1 is None)
934 self.assertTrue(isinstance(r2, asyncio.CancelledError))
935
936 await asyncio.wait_for(sem.acquire(), timeout=1.0)
937
938 def test_release_not_acquired(self):
939 sem = asyncio.BoundedSemaphore()
940
941 self.assertRaises(ValueError, sem.release)
942
943 async def test_release_no_waiters(self):
944 sem = asyncio.Semaphore()
945 await sem.acquire()
946 self.assertTrue(sem.locked())
947
948 sem.release()
949 self.assertFalse(sem.locked())
950
951 async def test_acquire_fifo_order(self):
952 sem = asyncio.Semaphore(1)
953 result = []
954
955 async def coro(tag):
956 await sem.acquire()
957 result.append(f'{tag}_1')
958 await asyncio.sleep(0.01)
959 sem.release()
960
961 await sem.acquire()
962 result.append(f'{tag}_2')
963 await asyncio.sleep(0.01)
964 sem.release()
965
966 async with asyncio.TaskGroup() as tg:
967 tg.create_task(coro('c1'))
968 tg.create_task(coro('c2'))
969 tg.create_task(coro('c3'))
970
971 self.assertEqual(
972 ['c1_1', 'c2_1', 'c3_1', 'c1_2', 'c2_2', 'c3_2'],
973 result
974 )
975
976 async def test_acquire_fifo_order_2(self):
977 sem = asyncio.Semaphore(1)
978 result = []
979
980 async def c1(result):
981 await sem.acquire()
982 result.append(1)
983 return True
984
985 async def c2(result):
986 await sem.acquire()
987 result.append(2)
988 sem.release()
989 await sem.acquire()
990 result.append(4)
991 return True
992
993 async def c3(result):
994 await sem.acquire()
995 result.append(3)
996 return True
997
998 t1 = asyncio.create_task(c1(result))
999 t2 = asyncio.create_task(c2(result))
1000 t3 = asyncio.create_task(c3(result))
1001
1002 await asyncio.sleep(0)
1003
1004 sem.release()
1005 sem.release()
1006
1007 tasks = [t1, t2, t3]
1008 await asyncio.gather(*tasks)
1009 self.assertEqual([1, 2, 3, 4], result)
1010
1011 async def test_acquire_fifo_order_3(self):
1012 sem = asyncio.Semaphore(0)
1013 result = []
1014
1015 async def c1(result):
1016 await sem.acquire()
1017 result.append(1)
1018 return True
1019
1020 async def c2(result):
1021 await sem.acquire()
1022 result.append(2)
1023 return True
1024
1025 async def c3(result):
1026 await sem.acquire()
1027 result.append(3)
1028 return True
1029
1030 t1 = asyncio.create_task(c1(result))
1031 t2 = asyncio.create_task(c2(result))
1032 t3 = asyncio.create_task(c3(result))
1033
1034 await asyncio.sleep(0)
1035
1036 t1.cancel()
1037
1038 await asyncio.sleep(0)
1039
1040 sem.release()
1041 sem.release()
1042
1043 tasks = [t1, t2, t3]
1044 await asyncio.gather(*tasks, return_exceptions=True)
1045 self.assertEqual([2, 3], result)
1046
1047
1048 class ESC[4;38;5;81mBarrierTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
1049
1050 async def asyncSetUp(self):
1051 await super().asyncSetUp()
1052 self.N = 5
1053
1054 def make_tasks(self, n, coro):
1055 tasks = [asyncio.create_task(coro()) for _ in range(n)]
1056 return tasks
1057
1058 async def gather_tasks(self, n, coro):
1059 tasks = self.make_tasks(n, coro)
1060 res = await asyncio.gather(*tasks)
1061 return res, tasks
1062
1063 async def test_barrier(self):
1064 barrier = asyncio.Barrier(self.N)
1065 self.assertIn("filling", repr(barrier))
1066 with self.assertRaisesRegex(
1067 TypeError,
1068 "object Barrier can't be used in 'await' expression",
1069 ):
1070 await barrier
1071
1072 self.assertIn("filling", repr(barrier))
1073
1074 async def test_repr(self):
1075 barrier = asyncio.Barrier(self.N)
1076
1077 self.assertTrue(RGX_REPR.match(repr(barrier)))
1078 self.assertIn("filling", repr(barrier))
1079
1080 waiters = []
1081 async def wait(barrier):
1082 await barrier.wait()
1083
1084 incr = 2
1085 for i in range(incr):
1086 waiters.append(asyncio.create_task(wait(barrier)))
1087 await asyncio.sleep(0)
1088
1089 self.assertTrue(RGX_REPR.match(repr(barrier)))
1090 self.assertTrue(f"waiters:{incr}/{self.N}" in repr(barrier))
1091 self.assertIn("filling", repr(barrier))
1092
1093 # create missing waiters
1094 for i in range(barrier.parties - barrier.n_waiting):
1095 waiters.append(asyncio.create_task(wait(barrier)))
1096 await asyncio.sleep(0)
1097
1098 self.assertTrue(RGX_REPR.match(repr(barrier)))
1099 self.assertIn("draining", repr(barrier))
1100
1101 # add a part of waiters
1102 for i in range(incr):
1103 waiters.append(asyncio.create_task(wait(barrier)))
1104 await asyncio.sleep(0)
1105 # and reset
1106 await barrier.reset()
1107
1108 self.assertTrue(RGX_REPR.match(repr(barrier)))
1109 self.assertIn("resetting", repr(barrier))
1110
1111 # add a part of waiters again
1112 for i in range(incr):
1113 waiters.append(asyncio.create_task(wait(barrier)))
1114 await asyncio.sleep(0)
1115 # and abort
1116 await barrier.abort()
1117
1118 self.assertTrue(RGX_REPR.match(repr(barrier)))
1119 self.assertIn("broken", repr(barrier))
1120 self.assertTrue(barrier.broken)
1121
1122 # suppress unhandled exceptions
1123 await asyncio.gather(*waiters, return_exceptions=True)
1124
1125 async def test_barrier_parties(self):
1126 self.assertRaises(ValueError, lambda: asyncio.Barrier(0))
1127 self.assertRaises(ValueError, lambda: asyncio.Barrier(-4))
1128
1129 self.assertIsInstance(asyncio.Barrier(self.N), asyncio.Barrier)
1130
1131 async def test_context_manager(self):
1132 self.N = 3
1133 barrier = asyncio.Barrier(self.N)
1134 results = []
1135
1136 async def coro():
1137 async with barrier as i:
1138 results.append(i)
1139
1140 await self.gather_tasks(self.N, coro)
1141
1142 self.assertListEqual(sorted(results), list(range(self.N)))
1143 self.assertEqual(barrier.n_waiting, 0)
1144 self.assertFalse(barrier.broken)
1145
1146 async def test_filling_one_task(self):
1147 barrier = asyncio.Barrier(1)
1148
1149 async def f():
1150 async with barrier as i:
1151 return True
1152
1153 ret = await f()
1154
1155 self.assertTrue(ret)
1156 self.assertEqual(barrier.n_waiting, 0)
1157 self.assertFalse(barrier.broken)
1158
1159 async def test_filling_one_task_twice(self):
1160 barrier = asyncio.Barrier(1)
1161
1162 t1 = asyncio.create_task(barrier.wait())
1163 await asyncio.sleep(0)
1164 self.assertEqual(barrier.n_waiting, 0)
1165
1166 t2 = asyncio.create_task(barrier.wait())
1167 await asyncio.sleep(0)
1168
1169 self.assertEqual(t1.result(), t2.result())
1170 self.assertEqual(t1.done(), t2.done())
1171
1172 self.assertEqual(barrier.n_waiting, 0)
1173 self.assertFalse(barrier.broken)
1174
1175 async def test_filling_task_by_task(self):
1176 self.N = 3
1177 barrier = asyncio.Barrier(self.N)
1178
1179 t1 = asyncio.create_task(barrier.wait())
1180 await asyncio.sleep(0)
1181 self.assertEqual(barrier.n_waiting, 1)
1182 self.assertIn("filling", repr(barrier))
1183
1184 t2 = asyncio.create_task(barrier.wait())
1185 await asyncio.sleep(0)
1186 self.assertEqual(barrier.n_waiting, 2)
1187 self.assertIn("filling", repr(barrier))
1188
1189 t3 = asyncio.create_task(barrier.wait())
1190 await asyncio.sleep(0)
1191
1192 await asyncio.wait([t1, t2, t3])
1193
1194 self.assertEqual(barrier.n_waiting, 0)
1195 self.assertFalse(barrier.broken)
1196
1197 async def test_filling_tasks_wait_twice(self):
1198 barrier = asyncio.Barrier(self.N)
1199 results = []
1200
1201 async def coro():
1202 async with barrier:
1203 results.append(True)
1204
1205 async with barrier:
1206 results.append(False)
1207
1208 await self.gather_tasks(self.N, coro)
1209
1210 self.assertEqual(len(results), self.N*2)
1211 self.assertEqual(results.count(True), self.N)
1212 self.assertEqual(results.count(False), self.N)
1213
1214 self.assertEqual(barrier.n_waiting, 0)
1215 self.assertFalse(barrier.broken)
1216
1217 async def test_filling_tasks_check_return_value(self):
1218 barrier = asyncio.Barrier(self.N)
1219 results1 = []
1220 results2 = []
1221
1222 async def coro():
1223 async with barrier:
1224 results1.append(True)
1225
1226 async with barrier as i:
1227 results2.append(True)
1228 return i
1229
1230 res, _ = await self.gather_tasks(self.N, coro)
1231
1232 self.assertEqual(len(results1), self.N)
1233 self.assertTrue(all(results1))
1234 self.assertEqual(len(results2), self.N)
1235 self.assertTrue(all(results2))
1236 self.assertListEqual(sorted(res), list(range(self.N)))
1237
1238 self.assertEqual(barrier.n_waiting, 0)
1239 self.assertFalse(barrier.broken)
1240
1241 async def test_draining_state(self):
1242 barrier = asyncio.Barrier(self.N)
1243 results = []
1244
1245 async def coro():
1246 async with barrier:
1247 # barrier state change to filling for the last task release
1248 results.append("draining" in repr(barrier))
1249
1250 await self.gather_tasks(self.N, coro)
1251
1252 self.assertEqual(len(results), self.N)
1253 self.assertEqual(results[-1], False)
1254 self.assertTrue(all(results[:self.N-1]))
1255
1256 self.assertEqual(barrier.n_waiting, 0)
1257 self.assertFalse(barrier.broken)
1258
1259 async def test_blocking_tasks_while_draining(self):
1260 rewait = 2
1261 barrier = asyncio.Barrier(self.N)
1262 barrier_nowaiting = asyncio.Barrier(self.N - rewait)
1263 results = []
1264 rewait_n = rewait
1265 counter = 0
1266
1267 async def coro():
1268 nonlocal rewait_n
1269
1270 # first time waiting
1271 await barrier.wait()
1272
1273 # after wainting once for all tasks
1274 if rewait_n > 0:
1275 rewait_n -= 1
1276 # wait again only for rewait tasks
1277 await barrier.wait()
1278 else:
1279 # wait for end of draining state`
1280 await barrier_nowaiting.wait()
1281 # wait for other waiting tasks
1282 await barrier.wait()
1283
1284 # a success means that barrier_nowaiting
1285 # was waited for exactly N-rewait=3 times
1286 await self.gather_tasks(self.N, coro)
1287
1288 async def test_filling_tasks_cancel_one(self):
1289 self.N = 3
1290 barrier = asyncio.Barrier(self.N)
1291 results = []
1292
1293 async def coro():
1294 await barrier.wait()
1295 results.append(True)
1296
1297 t1 = asyncio.create_task(coro())
1298 await asyncio.sleep(0)
1299 self.assertEqual(barrier.n_waiting, 1)
1300
1301 t2 = asyncio.create_task(coro())
1302 await asyncio.sleep(0)
1303 self.assertEqual(barrier.n_waiting, 2)
1304
1305 t1.cancel()
1306 await asyncio.sleep(0)
1307 self.assertEqual(barrier.n_waiting, 1)
1308 with self.assertRaises(asyncio.CancelledError):
1309 await t1
1310 self.assertTrue(t1.cancelled())
1311
1312 t3 = asyncio.create_task(coro())
1313 await asyncio.sleep(0)
1314 self.assertEqual(barrier.n_waiting, 2)
1315
1316 t4 = asyncio.create_task(coro())
1317 await asyncio.gather(t2, t3, t4)
1318
1319 self.assertEqual(len(results), self.N)
1320 self.assertTrue(all(results))
1321
1322 self.assertEqual(barrier.n_waiting, 0)
1323 self.assertFalse(barrier.broken)
1324
1325 async def test_reset_barrier(self):
1326 barrier = asyncio.Barrier(1)
1327
1328 asyncio.create_task(barrier.reset())
1329 await asyncio.sleep(0)
1330
1331 self.assertEqual(barrier.n_waiting, 0)
1332 self.assertFalse(barrier.broken)
1333
1334 async def test_reset_barrier_while_tasks_waiting(self):
1335 barrier = asyncio.Barrier(self.N)
1336 results = []
1337
1338 async def coro():
1339 try:
1340 await barrier.wait()
1341 except asyncio.BrokenBarrierError:
1342 results.append(True)
1343
1344 async def coro_reset():
1345 await barrier.reset()
1346
1347 # N-1 tasks waiting on barrier with N parties
1348 tasks = self.make_tasks(self.N-1, coro)
1349 await asyncio.sleep(0)
1350
1351 # reset the barrier
1352 asyncio.create_task(coro_reset())
1353 await asyncio.gather(*tasks)
1354
1355 self.assertEqual(len(results), self.N-1)
1356 self.assertTrue(all(results))
1357 self.assertEqual(barrier.n_waiting, 0)
1358 self.assertNotIn("resetting", repr(barrier))
1359 self.assertFalse(barrier.broken)
1360
1361 async def test_reset_barrier_when_tasks_half_draining(self):
1362 barrier = asyncio.Barrier(self.N)
1363 results1 = []
1364 rest_of_tasks = self.N//2
1365
1366 async def coro():
1367 try:
1368 await barrier.wait()
1369 except asyncio.BrokenBarrierError:
1370 # catch here waiting tasks
1371 results1.append(True)
1372 else:
1373 # here drained task ouside the barrier
1374 if rest_of_tasks == barrier._count:
1375 # tasks outside the barrier
1376 await barrier.reset()
1377
1378 await self.gather_tasks(self.N, coro)
1379
1380 self.assertEqual(results1, [True]*rest_of_tasks)
1381 self.assertEqual(barrier.n_waiting, 0)
1382 self.assertNotIn("resetting", repr(barrier))
1383 self.assertFalse(barrier.broken)
1384
1385 async def test_reset_barrier_when_tasks_half_draining_half_blocking(self):
1386 barrier = asyncio.Barrier(self.N)
1387 results1 = []
1388 results2 = []
1389 blocking_tasks = self.N//2
1390 count = 0
1391
1392 async def coro():
1393 nonlocal count
1394 try:
1395 await barrier.wait()
1396 except asyncio.BrokenBarrierError:
1397 # here catch still waiting tasks
1398 results1.append(True)
1399
1400 # so now waiting again to reach nb_parties
1401 await barrier.wait()
1402 else:
1403 count += 1
1404 if count > blocking_tasks:
1405 # reset now: raise asyncio.BrokenBarrierError for waiting tasks
1406 await barrier.reset()
1407
1408 # so now waiting again to reach nb_parties
1409 await barrier.wait()
1410 else:
1411 try:
1412 await barrier.wait()
1413 except asyncio.BrokenBarrierError:
1414 # here no catch - blocked tasks go to wait
1415 results2.append(True)
1416
1417 await self.gather_tasks(self.N, coro)
1418
1419 self.assertEqual(results1, [True]*blocking_tasks)
1420 self.assertEqual(results2, [])
1421 self.assertEqual(barrier.n_waiting, 0)
1422 self.assertNotIn("resetting", repr(barrier))
1423 self.assertFalse(barrier.broken)
1424
1425 async def test_reset_barrier_while_tasks_waiting_and_waiting_again(self):
1426 barrier = asyncio.Barrier(self.N)
1427 results1 = []
1428 results2 = []
1429
1430 async def coro1():
1431 try:
1432 await barrier.wait()
1433 except asyncio.BrokenBarrierError:
1434 results1.append(True)
1435 finally:
1436 await barrier.wait()
1437 results2.append(True)
1438
1439 async def coro2():
1440 async with barrier:
1441 results2.append(True)
1442
1443 tasks = self.make_tasks(self.N-1, coro1)
1444
1445 # reset barrier, N-1 waiting tasks raise an BrokenBarrierError
1446 asyncio.create_task(barrier.reset())
1447 await asyncio.sleep(0)
1448
1449 # complete waiting tasks in the `finally`
1450 asyncio.create_task(coro2())
1451
1452 await asyncio.gather(*tasks)
1453
1454 self.assertFalse(barrier.broken)
1455 self.assertEqual(len(results1), self.N-1)
1456 self.assertTrue(all(results1))
1457 self.assertEqual(len(results2), self.N)
1458 self.assertTrue(all(results2))
1459
1460 self.assertEqual(barrier.n_waiting, 0)
1461
1462
1463 async def test_reset_barrier_while_tasks_draining(self):
1464 barrier = asyncio.Barrier(self.N)
1465 results1 = []
1466 results2 = []
1467 results3 = []
1468 count = 0
1469
1470 async def coro():
1471 nonlocal count
1472
1473 i = await barrier.wait()
1474 count += 1
1475 if count == self.N:
1476 # last task exited from barrier
1477 await barrier.reset()
1478
1479 # wit here to reach the `parties`
1480 await barrier.wait()
1481 else:
1482 try:
1483 # second waiting
1484 await barrier.wait()
1485
1486 # N-1 tasks here
1487 results1.append(True)
1488 except Exception as e:
1489 # never goes here
1490 results2.append(True)
1491
1492 # Now, pass the barrier again
1493 # last wait, must be completed
1494 k = await barrier.wait()
1495 results3.append(True)
1496
1497 await self.gather_tasks(self.N, coro)
1498
1499 self.assertFalse(barrier.broken)
1500 self.assertTrue(all(results1))
1501 self.assertEqual(len(results1), self.N-1)
1502 self.assertEqual(len(results2), 0)
1503 self.assertEqual(len(results3), self.N)
1504 self.assertTrue(all(results3))
1505
1506 self.assertEqual(barrier.n_waiting, 0)
1507
1508 async def test_abort_barrier(self):
1509 barrier = asyncio.Barrier(1)
1510
1511 asyncio.create_task(barrier.abort())
1512 await asyncio.sleep(0)
1513
1514 self.assertEqual(barrier.n_waiting, 0)
1515 self.assertTrue(barrier.broken)
1516
1517 async def test_abort_barrier_when_tasks_half_draining_half_blocking(self):
1518 barrier = asyncio.Barrier(self.N)
1519 results1 = []
1520 results2 = []
1521 blocking_tasks = self.N//2
1522 count = 0
1523
1524 async def coro():
1525 nonlocal count
1526 try:
1527 await barrier.wait()
1528 except asyncio.BrokenBarrierError:
1529 # here catch tasks waiting to drain
1530 results1.append(True)
1531 else:
1532 count += 1
1533 if count > blocking_tasks:
1534 # abort now: raise asyncio.BrokenBarrierError for all tasks
1535 await barrier.abort()
1536 else:
1537 try:
1538 await barrier.wait()
1539 except asyncio.BrokenBarrierError:
1540 # here catch blocked tasks (already drained)
1541 results2.append(True)
1542
1543 await self.gather_tasks(self.N, coro)
1544
1545 self.assertTrue(barrier.broken)
1546 self.assertEqual(results1, [True]*blocking_tasks)
1547 self.assertEqual(results2, [True]*(self.N-blocking_tasks-1))
1548 self.assertEqual(barrier.n_waiting, 0)
1549 self.assertNotIn("resetting", repr(barrier))
1550
1551 async def test_abort_barrier_when_exception(self):
1552 # test from threading.Barrier: see `lock_tests.test_reset`
1553 barrier = asyncio.Barrier(self.N)
1554 results1 = []
1555 results2 = []
1556
1557 async def coro():
1558 try:
1559 async with barrier as i :
1560 if i == self.N//2:
1561 raise RuntimeError
1562 async with barrier:
1563 results1.append(True)
1564 except asyncio.BrokenBarrierError:
1565 results2.append(True)
1566 except RuntimeError:
1567 await barrier.abort()
1568
1569 await self.gather_tasks(self.N, coro)
1570
1571 self.assertTrue(barrier.broken)
1572 self.assertEqual(len(results1), 0)
1573 self.assertEqual(len(results2), self.N-1)
1574 self.assertTrue(all(results2))
1575 self.assertEqual(barrier.n_waiting, 0)
1576
1577 async def test_abort_barrier_when_exception_then_resetting(self):
1578 # test from threading.Barrier: see `lock_tests.test_abort_and_reset``
1579 barrier1 = asyncio.Barrier(self.N)
1580 barrier2 = asyncio.Barrier(self.N)
1581 results1 = []
1582 results2 = []
1583 results3 = []
1584
1585 async def coro():
1586 try:
1587 i = await barrier1.wait()
1588 if i == self.N//2:
1589 raise RuntimeError
1590 await barrier1.wait()
1591 results1.append(True)
1592 except asyncio.BrokenBarrierError:
1593 results2.append(True)
1594 except RuntimeError:
1595 await barrier1.abort()
1596
1597 # Synchronize and reset the barrier. Must synchronize first so
1598 # that everyone has left it when we reset, and after so that no
1599 # one enters it before the reset.
1600 i = await barrier2.wait()
1601 if i == self.N//2:
1602 await barrier1.reset()
1603 await barrier2.wait()
1604 await barrier1.wait()
1605 results3.append(True)
1606
1607 await self.gather_tasks(self.N, coro)
1608
1609 self.assertFalse(barrier1.broken)
1610 self.assertEqual(len(results1), 0)
1611 self.assertEqual(len(results2), self.N-1)
1612 self.assertTrue(all(results2))
1613 self.assertEqual(len(results3), self.N)
1614 self.assertTrue(all(results3))
1615
1616 self.assertEqual(barrier1.n_waiting, 0)
1617
1618
1619 if __name__ == '__main__':
1620 unittest.main()