1 """
2 Various tests for synchronization primitives.
3 """
4
5 import os
6 import gc
7 import sys
8 import time
9 from _thread import start_new_thread, TIMEOUT_MAX
10 import threading
11 import unittest
12 import weakref
13
14 from test import support
15 from test.support import threading_helper
16
17
18 requires_fork = unittest.skipUnless(support.has_fork_support,
19 "platform doesn't support fork "
20 "(no _at_fork_reinit method)")
21
22
def wait_threads_blocked(nthread):
    """Sleep for a short while so that `nthread` threads can block.

    The delay is arbitrary (10 ms per thread); it is meant to leave the
    threads enough time to become blocked, for example waiting for a lock.
    """
    delay = nthread * 0.010
    time.sleep(delay)
27
28
29 class ESC[4;38;5;81mBunch(ESC[4;38;5;149mobject):
30 """
31 A bunch of threads.
32 """
33 def __init__(self, func, nthread, wait_before_exit=False):
34 """
35 Construct a bunch of `nthread` threads running the same function `func`.
36 If `wait_before_exit` is True, the threads won't terminate until
37 do_finish() is called.
38 """
39 self.func = func
40 self.nthread = nthread
41 self.started = []
42 self.finished = []
43 self.exceptions = []
44 self._can_exit = not wait_before_exit
45 self._wait_thread = None
46
47 def task(self):
48 tid = threading.get_ident()
49 self.started.append(tid)
50 try:
51 self.func()
52 except BaseException as exc:
53 self.exceptions.append(exc)
54 finally:
55 self.finished.append(tid)
56 for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
57 if self._can_exit:
58 break
59
60 def __enter__(self):
61 self._wait_thread = threading_helper.wait_threads_exit(support.SHORT_TIMEOUT)
62 self._wait_thread.__enter__()
63
64 try:
65 for _ in range(self.nthread):
66 start_new_thread(self.task, ())
67 except:
68 self._can_exit = True
69 raise
70
71 for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
72 if len(self.started) >= self.nthread:
73 break
74
75 return self
76
77 def __exit__(self, exc_type, exc_value, traceback):
78 for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
79 if len(self.finished) >= self.nthread:
80 break
81
82 # Wait until threads completely exit according to _thread._count()
83 self._wait_thread.__exit__(None, None, None)
84
85 # Break reference cycle
86 exceptions = self.exceptions
87 self.exceptions = None
88 if exceptions:
89 raise ExceptionGroup(f"{self.func} threads raised exceptions",
90 exceptions)
91
92 def do_finish(self):
93 self._can_exit = True
94
95
96 class ESC[4;38;5;81mBaseTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
97 def setUp(self):
98 self._threads = threading_helper.threading_setup()
99
100 def tearDown(self):
101 threading_helper.threading_cleanup(*self._threads)
102 support.reap_children()
103
104 def assertTimeout(self, actual, expected):
105 # The waiting and/or time.monotonic() can be imprecise, which
106 # is why comparing to the expected value would sometimes fail
107 # (especially under Windows).
108 self.assertGreaterEqual(actual, expected * 0.6)
109 # Test nothing insane happened
110 self.assertLess(actual, expected * 10.0)
111
112
class BaseLockTests(BaseTestCase):
    """
    Tests for both recursive and non-recursive locks.

    The lock under test is created with self.locktype(), which is not
    defined in this class: it has to be supplied by the class that actually
    runs these tests.
    """

    def wait_phase(self, phase, expected):
        # Wait until the list `phase` holds at least `expected` items, then
        # check that it holds exactly that many.
        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
            if len(phase) >= expected:
                break
        self.assertEqual(len(phase), expected)

    def test_constructor(self):
        lock = self.locktype()
        del lock

    def test_repr(self):
        lock = self.locktype()
        self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
        del lock

    def test_locked_repr(self):
        lock = self.locktype()
        lock.acquire()
        self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>")
        del lock

    def test_acquire_destroy(self):
        # Dropping the last reference to a lock that is still held.
        lock = self.locktype()
        lock.acquire()
        del lock

    def test_acquire_release(self):
        lock = self.locktype()
        lock.acquire()
        lock.release()
        del lock

    def test_try_acquire(self):
        lock = self.locktype()
        self.assertTrue(lock.acquire(False))
        lock.release()

    def test_try_acquire_contended(self):
        # A non-blocking acquire() from another thread fails while this
        # thread holds the lock.
        lock = self.locktype()
        lock.acquire()
        result = []
        def f():
            result.append(lock.acquire(False))
        with Bunch(f, 1):
            pass
        self.assertFalse(result[0])
        lock.release()

    def test_acquire_contended(self):
        lock = self.locktype()
        lock.acquire()
        def f():
            lock.acquire()
            lock.release()

        N = 5
        with Bunch(f, N) as bunch:
            # Threads block on lock.acquire()
            wait_threads_blocked(N)
            self.assertEqual(len(bunch.finished), 0)

            # Threads unblocked
            lock.release()

        self.assertEqual(len(bunch.finished), N)

    def test_with(self):
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()

        # Acquire the lock, do nothing, with releases the lock
        with lock:
            pass

        # Check that the lock is unacquired
        with Bunch(f, 1):
            pass

        # Acquire the lock, raise an exception, with releases the lock
        with self.assertRaises(TypeError):
            with lock:
                raise TypeError

        # Check that the lock is unacquired even if after an exception
        # was raised in the previous "with lock:" block
        with Bunch(f, 1):
            pass

    def test_thread_leak(self):
        # The lock shouldn't leak a Thread instance when used from a foreign
        # (non-threading) thread.
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()

        # We run many threads in the hope that existing threads ids won't
        # be recycled.
        with Bunch(f, 15):
            pass

    def test_timeout(self):
        lock = self.locktype()
        # Can't set timeout if not blocking
        self.assertRaises(ValueError, lock.acquire, False, 1)
        # Invalid timeout values
        self.assertRaises(ValueError, lock.acquire, timeout=-100)
        self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
        self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
        # TIMEOUT_MAX is ok
        lock.acquire(timeout=TIMEOUT_MAX)
        lock.release()
        t1 = time.monotonic()
        self.assertTrue(lock.acquire(timeout=5))
        t2 = time.monotonic()
        # Just a sanity test that it didn't actually wait for the timeout.
        self.assertLess(t2 - t1, 5)
        # The lock is now held by this thread: a timed acquire() from another
        # thread must fail after roughly the requested timeout.
        results = []
        def f():
            t1 = time.monotonic()
            results.append(lock.acquire(timeout=0.5))
            t2 = time.monotonic()
            results.append(t2 - t1)
        with Bunch(f, 1):
            pass
        self.assertFalse(results[0])
        self.assertTimeout(results[1], 0.5)

    def test_weakref_exists(self):
        lock = self.locktype()
        ref = weakref.ref(lock)
        self.assertIsNotNone(ref())

    def test_weakref_deleted(self):
        lock = self.locktype()
        ref = weakref.ref(lock)
        del lock
        gc.collect()  # For PyPy or other GCs.
        self.assertIsNone(ref())
264
265
class LockTests(BaseLockTests):
    """
    Tests for non-recursive, weak locks, i.e. locks which may be acquired
    in one thread and released from a different one.
    """

    def test_reacquire(self):
        # A held lock has to be released before it can be acquired again.
        plain_lock = self.locktype()
        steps = []

        def worker():
            plain_lock.acquire()
            steps.append(None)
            plain_lock.acquire()
            steps.append(None)

        with threading_helper.wait_threads_exit():
            # The worker takes the lock, then blocks on its second acquire()
            start_new_thread(worker, ())
            self.wait_phase(steps, 1)

            # Releasing from this thread unblocks the worker
            plain_lock.release()
            self.wait_phase(steps, 2)

    def test_different_thread(self):
        # A thread other than the acquiring one can release the lock.
        plain_lock = self.locktype()
        plain_lock.acquire()

        def releaser():
            plain_lock.release()

        with Bunch(releaser, 1):
            pass
        plain_lock.acquire()
        plain_lock.release()

    def test_state_after_timeout(self):
        # Issue #11618: the lock must be left in a proper state after a
        # (non-zero) timeout.
        plain_lock = self.locktype()
        plain_lock.acquire()
        timed_attempt = plain_lock.acquire(timeout=0.01)
        self.assertFalse(timed_attempt)
        plain_lock.release()
        self.assertFalse(plain_lock.locked())
        second_attempt = plain_lock.acquire(blocking=False)
        self.assertTrue(second_attempt)

    @requires_fork
    def test_at_fork_reinit(self):
        def exercise(lock):
            # The lock must still work normally after _at_fork_reinit()
            lock.acquire()
            lock.release()

        # Reinitialize a lock that is not held
        free_lock = self.locktype()
        free_lock._at_fork_reinit()
        exercise(free_lock)

        # Reinitialize a held lock: _at_fork_reinit() resets the lock to
        # the unlocked state
        held_lock = self.locktype()
        held_lock.acquire()
        held_lock._at_fork_reinit()
        exercise(held_lock)
330
331
class RLockTests(BaseLockTests):
    """
    Tests for recursive (re-entrant) locks.
    """

    def test_reacquire(self):
        rlock = self.locktype()
        rlock.acquire()
        rlock.acquire()
        rlock.release()
        rlock.acquire()
        rlock.release()
        rlock.release()

    def test_release_unacquired(self):
        # Releasing a lock that is not held is an error, both for a fresh
        # lock and for one that has been fully released again.
        rlock = self.locktype()
        with self.assertRaises(RuntimeError):
            rlock.release()
        rlock.acquire()
        rlock.acquire()
        rlock.release()
        rlock.acquire()
        rlock.release()
        rlock.release()
        with self.assertRaises(RuntimeError):
            rlock.release()

    def test_release_save_unacquired(self):
        # Same as above, for _release_save().
        rlock = self.locktype()
        with self.assertRaises(RuntimeError):
            rlock._release_save()
        rlock.acquire()
        rlock.acquire()
        rlock.release()
        rlock.acquire()
        rlock.release()
        rlock.release()
        with self.assertRaises(RuntimeError):
            rlock._release_save()

    def test_recursion_count(self):
        rlock = self.locktype()
        self.assertEqual(0, rlock._recursion_count())
        rlock.acquire()
        self.assertEqual(1, rlock._recursion_count())
        rlock.acquire()
        rlock.acquire()
        self.assertEqual(3, rlock._recursion_count())
        rlock.release()
        self.assertEqual(2, rlock._recursion_count())
        rlock.release()
        rlock.release()
        self.assertEqual(0, rlock._recursion_count())

        steps = []

        def worker():
            rlock.acquire()
            steps.append(None)

            self.wait_phase(steps, 2)
            rlock.release()
            steps.append(None)

        with threading_helper.wait_threads_exit():
            # The worker acquires the lock; the count seen from this thread
            # is expected to stay at zero
            start_new_thread(worker, ())
            self.wait_phase(steps, 1)
            self.assertEqual(0, rlock._recursion_count())

            # Tell the worker to release the lock and finish
            steps.append(None)
            self.wait_phase(steps, 3)
            self.assertEqual(0, rlock._recursion_count())

    def test_different_thread(self):
        # A thread which does not hold the lock cannot release it.
        rlock = self.locktype()

        def holder():
            rlock.acquire()

        with Bunch(holder, 1, True) as bunch:
            try:
                with self.assertRaises(RuntimeError):
                    rlock.release()
            finally:
                bunch.do_finish()

    def test__is_owned(self):
        rlock = self.locktype()
        self.assertFalse(rlock._is_owned())
        rlock.acquire()
        self.assertTrue(rlock._is_owned())
        rlock.acquire()
        self.assertTrue(rlock._is_owned())

        # Seen from another thread, the lock is not owned
        seen = []

        def observer():
            seen.append(rlock._is_owned())

        with Bunch(observer, 1):
            pass
        self.assertFalse(seen[0])

        rlock.release()
        self.assertTrue(rlock._is_owned())
        rlock.release()
        self.assertFalse(rlock._is_owned())
433
434
class EventTests(BaseTestCase):
    """
    Tests for Event objects, which are created with self.eventtype().
    """

    def test_is_set(self):
        event = self.eventtype()
        self.assertFalse(event.is_set())
        # Setting twice in a row leaves the event set...
        for _ in range(2):
            event.set()
            self.assertTrue(event.is_set())
        # ...and clearing twice in a row leaves it unset.
        for _ in range(2):
            event.clear()
            self.assertFalse(event.is_set())

    def _check_notify(self, evt):
        # All the waiting threads get notified by a single set()
        nthreads = 5
        first_waits = []
        second_waits = []

        def waiter():
            first_waits.append(evt.wait())
            second_waits.append(evt.wait())

        with Bunch(waiter, nthreads):
            # Leave the threads time to block in their first evt.wait()
            wait_threads_blocked(nthreads)
            self.assertEqual(len(first_waits), 0)

            # Wake all of them up
            evt.set()

        self.assertEqual(first_waits, [True] * nthreads)
        self.assertEqual(second_waits, [True] * nthreads)

    def test_notify(self):
        event = self.eventtype()
        self._check_notify(event)
        # Once more, after the event went through an explicit clear()
        event.set()
        event.clear()
        self._check_notify(event)

    def test_timeout(self):
        event = self.eventtype()
        immediate = []
        timed = []
        nthreads = 5

        def waiter():
            immediate.append(event.wait(0.0))
            begin = time.monotonic()
            flag = event.wait(0.5)
            end = time.monotonic()
            timed.append((flag, end - begin))

        with Bunch(waiter, nthreads):
            pass

        # The event is not set: both waits fail, the second one after
        # roughly its timeout
        self.assertEqual(immediate, [False] * nthreads)
        for flag, elapsed in timed:
            self.assertFalse(flag)
            self.assertTimeout(elapsed, 0.5)

        # The event is set
        immediate = []
        timed = []
        event.set()
        with Bunch(waiter, nthreads):
            pass

        self.assertEqual(immediate, [True] * nthreads)
        for flag, _ in timed:
            self.assertTrue(flag)

    def test_set_and_clear(self):
        # gh-57711: wait() must return true even when the event is cleared
        # before the waiting thread is woken up.
        event = self.eventtype()
        outcomes = []

        def waiter():
            outcomes.append(event.wait(support.LONG_TIMEOUT))

        nthreads = 5
        with Bunch(waiter, nthreads):
            # Leave the threads time to block in event.wait()
            wait_threads_blocked(nthreads)

            # Wake them up, then clear the event right away
            event.set()
            event.clear()

        self.assertEqual(outcomes, [True] * nthreads)

    @requires_fork
    def test_at_fork_reinit(self):
        # The condition must still be using a Lock after the reset: while it
        # is held, a second non-blocking acquire has to fail.
        event = self.eventtype()
        with event._cond:
            self.assertFalse(event._cond.acquire(False))
        event._at_fork_reinit()
        with event._cond:
            self.assertFalse(event._cond.acquire(False))

    def test_repr(self):
        event = self.eventtype()
        self.assertRegex(repr(event), r"<\w+\.Event at .*: unset>")
        event.set()
        self.assertRegex(repr(event), r"<\w+\.Event at .*: set>")
545
546
class ConditionTests(BaseTestCase):
    """
    Tests for condition variables.
    """
    # The condition under test is created with self.condtype(), which is not
    # defined in this class; presumably the concrete test class supplies it.

    def test_acquire(self):
        cond = self.condtype()
        # By default we have an RLock: the condition can be acquired multiple
        # times.
        cond.acquire()
        cond.acquire()
        cond.release()
        cond.release()
        # With an explicit plain Lock, the condition and the lock are expected
        # to share the same locked/unlocked state.
        lock = threading.Lock()
        cond = self.condtype(lock)
        cond.acquire()
        self.assertFalse(lock.acquire(False))
        cond.release()
        self.assertTrue(lock.acquire(False))
        self.assertFalse(cond.acquire(False))
        lock.release()
        with cond:
            self.assertFalse(lock.acquire(False))

    def test_unacquired_wait(self):
        # wait() without holding the condition must raise RuntimeError.
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.wait)

    def test_unacquired_notify(self):
        # notify() without holding the condition must raise RuntimeError.
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.notify)

    def _check_notify(self, cond):
        # Note that this test is sensitive to timing.  If the worker threads
        # don't execute in a timely fashion, the main thread may think they
        # are further along than they are.  The main thread therefore issues
        # wait_threads_blocked() statements to try to make sure that it doesn't
        # race ahead of the workers.
        # Secondly, this test assumes that condition variables are not subject
        # to spurious wakeups.  The absence of spurious wakeups is an implementation
        # detail of Condition Variables in current CPython, but in general, not
        # a guaranteed property of condition variables as a programming
        # construct.  In particular, it is possible that this can no longer
        # be conveniently guaranteed should their implementation ever change.
        ready = []
        results1 = []
        results2 = []
        # Current phase, set by the main thread; each worker records the
        # phase during which it returned from a wait.
        phase_num = 0
        def f():
            # First wait.
            cond.acquire()
            ready.append(phase_num)
            result = cond.wait()

            cond.release()
            results1.append((result, phase_num))

            # Second wait.
            cond.acquire()
            ready.append(phase_num)

            result = cond.wait()
            cond.release()
            results2.append((result, phase_num))

        N = 5
        with Bunch(f, N):
            # first wait, to ensure all workers settle into cond.wait() before
            # we continue. See issues #8799 and #30727.
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(ready) >= N:
                    break

            ready.clear()
            self.assertEqual(results1, [])

            # Notify 3 threads at first
            count1 = 3
            cond.acquire()
            cond.notify(count1)
            wait_threads_blocked(count1)

            # Phase 1
            phase_num = 1
            cond.release()
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(results1) >= count1:
                    break

            self.assertEqual(results1, [(True, 1)] * count1)
            self.assertEqual(results2, [])

            # Wait until awaken workers are blocked on cond.wait()
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(ready) >= count1 :
                    break

            # Notify 5 threads: they might be in their first or second wait
            cond.acquire()
            cond.notify(5)
            wait_threads_blocked(N)

            # Phase 2
            phase_num = 2
            cond.release()
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(results1) + len(results2) >= (N + count1):
                    break

            count2 = N - count1
            self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2)
            self.assertEqual(results2, [(True, 2)] * count1)

            # Make sure all workers settle into cond.wait()
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(ready) >= N:
                    break

            # Notify all threads: they are all in their second wait
            cond.acquire()
            cond.notify_all()
            wait_threads_blocked(N)

            # Phase 3
            phase_num = 3
            cond.release()
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(results2) >= N:
                    break
        self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2)
        self.assertEqual(results2, [(True, 2)] * count1 + [(True, 3)] * count2)

    def test_notify(self):
        cond = self.condtype()
        self._check_notify(cond)
        # A second time, to check internal state is still ok.
        self._check_notify(cond)

    def test_timeout(self):
        # Nobody notifies the condition: every wait(timeout) has to time out.
        cond = self.condtype()
        timeout = 0.5
        results = []
        def f():
            cond.acquire()
            t1 = time.monotonic()
            result = cond.wait(timeout)
            t2 = time.monotonic()
            cond.release()
            results.append((t2 - t1, result))

        N = 5
        with Bunch(f, N):
            pass
        self.assertEqual(len(results), N)

        for dt, result in results:
            self.assertTimeout(dt, timeout)
            # Note that conceptually (that's the condition variable protocol)
            # a wait() may succeed even if no one notifies us and before any
            # timeout occurs.  Spurious wakeups can occur.
            # This makes it hard to verify the result value.
            # In practice, this implementation has no spurious wakeups.
            self.assertFalse(result)

    def test_waitfor(self):
        cond = self.condtype()
        state = 0
        def f():
            with cond:
                result = cond.wait_for(lambda: state == 4)
                self.assertTrue(result)
                self.assertEqual(state, 4)

        with Bunch(f, 1):
            # Increment 4 times, notifying after each step, so that the
            # predicate state == 4 eventually becomes true.
            for i in range(4):
                time.sleep(0.010)
                with cond:
                    state += 1
                    cond.notify()

    def test_waitfor_timeout(self):
        cond = self.condtype()
        state = 0
        # Gets one item once the worker has made all its checks.
        success = []
        def f():
            with cond:
                dt = time.monotonic()
                result = cond.wait_for(lambda : state==4, timeout=0.1)
                dt = time.monotonic() - dt
                self.assertFalse(result)
                self.assertTimeout(dt, 0.1)
                success.append(None)

        with Bunch(f, 1):
            # Only increment 3 times, so state == 4 is never reached.
            for i in range(3):
                time.sleep(0.010)
                with cond:
                    state += 1
                    cond.notify()

        self.assertEqual(len(success), 1)
747
748
class BaseSemaphoreTests(BaseTestCase):
    """
    Common tests for {bounded, unbounded} semaphore objects.
    """
    # The semaphore under test is created with self.semtype(), which is not
    # defined in this class; presumably the concrete test class supplies it.

    def test_constructor(self):
        # A negative initial value is rejected.
        self.assertRaises(ValueError, self.semtype, value = -1)
        self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)

    def test_acquire(self):
        sem = self.semtype(1)
        sem.acquire()
        sem.release()
        sem = self.semtype(2)
        sem.acquire()
        sem.acquire()
        sem.release()
        sem.release()

    def test_acquire_destroy(self):
        # Dropping the last reference to a semaphore that is still acquired.
        sem = self.semtype()
        sem.acquire()
        del sem

    def test_acquire_contended(self):
        sem_value = 7
        sem = self.semtype(sem_value)
        sem.acquire()

        sem_results = []
        results1 = []
        results2 = []
        # Current phase, set by the main thread; each worker records the
        # phase during which its acquire() returned.
        phase_num = 0

        def func():
            sem_results.append(sem.acquire())
            results1.append(phase_num)

            sem_results.append(sem.acquire())
            results2.append(phase_num)

        def wait_count(count):
            # Wait until at least `count` acquisitions have been recorded.
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(results1) + len(results2) >= count:
                    break

        N = 10
        with Bunch(func, N):
            # Phase 0
            # The main thread holds one unit, so sem_value - 1 acquisitions
            # are expected to succeed right away.
            count1 = sem_value - 1
            wait_count(count1)
            self.assertEqual(results1 + results2, [0] * count1)

            # Phase 1
            phase_num = 1
            for i in range(sem_value):
                sem.release()
            count2 = sem_value
            wait_count(count1 + count2)
            self.assertEqual(sorted(results1 + results2),
                             [0] * count1 + [1] * count2)

            # Phase 2
            phase_num = 2
            count3 = (sem_value - 1)
            for i in range(count3):
                sem.release()
            wait_count(count1 + count2 + count3)
            self.assertEqual(sorted(results1 + results2),
                             [0] * count1 + [1] * count2 + [2] * count3)
            # The semaphore is still locked
            self.assertFalse(sem.acquire(False))

            # Final release, to let the last thread finish
            count4 = 1
            sem.release()

        self.assertEqual(sem_results,
                         [True] * (count1 + count2 + count3 + count4))

    def test_multirelease(self):
        # Same scenario as test_acquire_contended, but each phase releases
        # several units at once with a single release(n) call.
        sem_value = 7
        sem = self.semtype(sem_value)
        sem.acquire()

        results1 = []
        results2 = []
        phase_num = 0
        def func():
            sem.acquire()
            results1.append(phase_num)

            sem.acquire()
            results2.append(phase_num)

        def wait_count(count):
            # Wait until at least `count` acquisitions have been recorded.
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(results1) + len(results2) >= count:
                    break

        with Bunch(func, 10):
            # Phase 0
            count1 = sem_value - 1
            wait_count(count1)
            self.assertEqual(results1 + results2, [0] * count1)

            # Phase 1
            phase_num = 1
            count2 = sem_value
            sem.release(count2)
            wait_count(count1 + count2)
            self.assertEqual(sorted(results1 + results2),
                             [0] * count1 + [1] * count2)

            # Phase 2
            phase_num = 2
            count3 = sem_value - 1
            sem.release(count3)
            wait_count(count1 + count2 + count3)
            self.assertEqual(sorted(results1 + results2),
                             [0] * count1 + [1] * count2 + [2] * count3)
            # The semaphore is still locked
            self.assertFalse(sem.acquire(False))

            # Final release, to let the last thread finish
            sem.release()

    def test_try_acquire(self):
        sem = self.semtype(2)
        self.assertTrue(sem.acquire(False))
        self.assertTrue(sem.acquire(False))
        self.assertFalse(sem.acquire(False))
        sem.release()
        self.assertTrue(sem.acquire(False))

    def test_try_acquire_contended(self):
        # 3 units are left after the acquire() below; the 5 threads make
        # 10 non-blocking attempts in total, of which 3 should succeed.
        sem = self.semtype(4)
        sem.acquire()
        results = []
        def f():
            results.append(sem.acquire(False))
            results.append(sem.acquire(False))
        with Bunch(f, 5):
            pass
        # There can be a thread switch between acquiring the semaphore and
        # appending the result, therefore results will not necessarily be
        # ordered.
        self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )

    def test_acquire_timeout(self):
        sem = self.semtype(2)
        # A timeout cannot be combined with a non-blocking acquire.
        self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertFalse(sem.acquire(timeout=0.005))
        sem.release()
        self.assertTrue(sem.acquire(timeout=0.005))
        # The semaphore is exhausted again: this acquire has to time out
        # after roughly 0.5 second.
        t = time.monotonic()
        self.assertFalse(sem.acquire(timeout=0.5))
        dt = time.monotonic() - t
        self.assertTimeout(dt, 0.5)

    def test_default_value(self):
        # The default initial value is 1.
        sem = self.semtype()
        sem.acquire()
        def f():
            sem.acquire()
            sem.release()

        with Bunch(f, 1) as bunch:
            # Thread blocked on sem.acquire()
            wait_threads_blocked(1)
            self.assertFalse(bunch.finished)

            # Thread unblocked
            sem.release()

    def test_with(self):
        sem = self.semtype(2)
        def _with(err=None):
            # Nested "with" blocks each take one unit; the non-blocking
            # acquire() calls check how many units are left at each level.
            with sem:
                self.assertTrue(sem.acquire(False))
                sem.release()
                with sem:
                    self.assertFalse(sem.acquire(False))
                    if err:
                        raise err
        _with()
        # Both units were given back when the "with" blocks were left...
        self.assertTrue(sem.acquire(False))
        sem.release()
        # ...including when they were left because of an exception.
        self.assertRaises(TypeError, _with, TypeError)
        self.assertTrue(sem.acquire(False))
        sem.release()
943
class SemaphoreTests(BaseSemaphoreTests):
    """
    Tests specific to unbounded semaphores.
    """

    def test_release_unacquired(self):
        # Releasing more often than acquiring is allowed: each extra
        # release() increments the semaphore's value.
        semaphore = self.semtype(1)
        semaphore.release()
        semaphore.acquire()
        semaphore.acquire()
        semaphore.release()

    def test_repr(self):
        semaphore = self.semtype(3)
        self.assertRegex(repr(semaphore), r"<\w+\.Semaphore at .*: value=3>")

        # The value shown follows acquire()...
        semaphore.acquire()
        self.assertRegex(repr(semaphore), r"<\w+\.Semaphore at .*: value=2>")

        # ...and release(), even past the initial value.
        semaphore.release()
        semaphore.release()
        self.assertRegex(repr(semaphore), r"<\w+\.Semaphore at .*: value=4>")
965
966
class BoundedSemaphoreTests(BaseSemaphoreTests):
    """
    Tests specific to bounded semaphores.
    """

    def test_release_unacquired(self):
        # The value cannot go past its initial value
        bounded = self.semtype()
        with self.assertRaises(ValueError):
            bounded.release()
        bounded.acquire()
        bounded.release()
        with self.assertRaises(ValueError):
            bounded.release()

    def test_repr(self):
        bounded = self.semtype(3)
        initial_pattern = r"<\w+\.BoundedSemaphore at .*: value=3/3>"
        self.assertRegex(repr(bounded), initial_pattern)
        bounded.acquire()
        acquired_pattern = r"<\w+\.BoundedSemaphore at .*: value=2/3>"
        self.assertRegex(repr(bounded), acquired_pattern)
985
986
class BarrierTests(BaseTestCase):
    """
    Tests for Barrier objects.

    Every test gets a fresh `self.barrier` for `N` parties, created with
    self.barriertype(), which is not defined in this class.
    """
    N = 5
    defaultTimeout = 2.0

    def setUp(self):
        # Run the base class fixture too, so that its thread bookkeeping
        # also covers the barrier tests.
        super().setUp()
        self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        # Abort the barrier first, then let the base class do its cleanup.
        self.barrier.abort()
        super().tearDown()

    def run_threads(self, f):
        # Run `f` in N threads and wait for all of them to finish.
        with Bunch(f, self.N):
            pass

    def multipass(self, results, n):
        # Pass the barrier 2 * n times; `results` is a pair of lists used to
        # check that all the threads proceed in lockstep.
        m = self.barrier.parties
        self.assertEqual(m, self.N)
        for i in range(n):
            results[0].append(True)
            self.assertEqual(len(results[1]), i * m)
            self.barrier.wait()
            results[1].append(True)
            self.assertEqual(len(results[0]), (i + 1) * m)
            self.barrier.wait()
        self.assertEqual(self.barrier.n_waiting, 0)
        self.assertFalse(self.barrier.broken)

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [[],[]]
        def f():
            self.multipass(results, passes)
        self.run_threads(f)

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        self.test_barrier(10)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        results = []
        def f():
            r = self.barrier.wait()
            results.append(r)

        self.run_threads(f)
        # The values returned to the N threads must add up to 0 + 1 + ... + N-1.
        self.assertEqual(sum(results), sum(range(self.N)))

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = []
        def action():
            results.append(True)
        barrier = self.barriertype(self.N, action)
        def f():
            barrier.wait()
            self.assertEqual(len(results), 1)

        self.run_threads(f)

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = []
        results2 = []
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    # One thread fails instead of waiting a second time.
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = []
        results2 = []
        results3 = []
        def f():
            i = self.barrier.wait()
            if i == self.N//2:
                # Wait until the other threads are all in the barrier.
                for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                    if self.barrier.n_waiting >= (self.N - 1):
                        break
                self.barrier.reset()
            else:
                try:
                    self.barrier.wait()
                    results1.append(True)
                except threading.BrokenBarrierError:
                    results2.append(True)
            # Now, pass the barrier again
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = []
        results2 = []
        results3 = []
        barrier2 = self.barriertype(self.N)
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()
            # Synchronize and reset the barrier.  Must synchronize first so
            # that everyone has left it when we reset, and after so that no
            # one enters it before the reset.
            if barrier2.wait() == self.N//2:
                self.barrier.reset()
            barrier2.wait()
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        def f():
            i = self.barrier.wait()
            if i == self.N // 2:
                # One thread is late!
                time.sleep(self.defaultTimeout / 2)
            # Default timeout is 2.0, so this is shorter.
            self.assertRaises(threading.BrokenBarrierError,
                              self.barrier.wait, self.defaultTimeout / 4)
        self.run_threads(f)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        timeout = 0.100
        # Two parties but a single thread: wait() can only time out.
        barrier = self.barriertype(2, timeout=timeout)
        def f():
            self.assertRaises(threading.BrokenBarrierError,
                              barrier.wait)

        start_time = time.monotonic()
        with Bunch(f, 1):
            pass
        dt = time.monotonic() - start_time
        self.assertGreaterEqual(dt, timeout)

    def test_single_thread(self):
        # A barrier for a single party can be passed repeatedly by one thread.
        b = self.barriertype(1)
        b.wait()
        b.wait()

    def test_repr(self):
        barrier = self.barriertype(3)
        timeout = support.LONG_TIMEOUT
        self.assertRegex(repr(barrier), r"<\w+\.Barrier at .*: waiters=0/3>")
        def f():
            barrier.wait(timeout)

        N = 2
        with Bunch(f, N):
            # Threads blocked on barrier.wait()
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if barrier.n_waiting >= N:
                    break
            self.assertRegex(repr(barrier),
                             r"<\w+\.Barrier at .*: waiters=2/3>")

            # Threads unblocked
            barrier.wait(timeout)

        self.assertRegex(repr(barrier),
                         r"<\w+\.Barrier at .*: waiters=0/3>")

        # Abort the barrier
        barrier.abort()
        self.assertRegex(repr(barrier),
                         r"<\w+\.Barrier at .*: broken>")