1 """
2 Various tests for synchronization primitives.
3 """
4
5 import gc
6 import sys
7 import time
8 from _thread import start_new_thread, TIMEOUT_MAX
9 import threading
10 import unittest
11 import weakref
12
13 from test import support
14 from test.support import threading_helper
15
16
# Decorator that skips a test unless support.has_fork_support is true.
requires_fork = unittest.skipUnless(
    support.has_fork_support,
    "platform doesn't support fork (no _at_fork_reinit method)",
)
20
21
22 def _wait():
23 # A crude wait/yield function not relying on synchronization primitives.
24 time.sleep(0.01)
25
class Bunch:
    """
    A group of threads that all run the same function.
    """

    def __init__(self, f, n, wait_before_exit=False):
        """
        Start `n` threads, each of which calls `f` once without arguments.

        When `wait_before_exit` is true, every thread keeps spinning after
        `f` has returned and only terminates once do_finish() is called.
        """
        self.f = f
        self.n = n
        # Thread identifiers, appended by the workers as they progress.
        self.started = []
        self.finished = []
        self._can_exit = not wait_before_exit
        # The context is entered here and left in wait_for_finished().
        self.wait_thread = threading_helper.wait_threads_exit()
        self.wait_thread.__enter__()

        def worker():
            ident = threading.get_ident()
            self.started.append(ident)
            try:
                f()
            finally:
                self.finished.append(ident)
                # Stay alive until the threads are allowed to exit.
                while not self._can_exit:
                    _wait()

        try:
            for _ in range(n):
                start_new_thread(worker, ())
        except BaseException:
            # Let the threads that did start terminate, then propagate.
            self._can_exit = True
            raise

    def wait_for_started(self):
        """Poll until `n` threads have recorded themselves as started."""
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        """Poll until `n` threads have finished, then leave the
        wait_threads_exit() context entered by the constructor."""
        while len(self.finished) < self.n:
            _wait()
        # Wait for threads exit
        self.wait_thread.__exit__(None, None, None)

    def do_finish(self):
        """Allow threads created with `wait_before_exit` to terminate."""
        self._can_exit = True
73
74
class BaseTestCase(unittest.TestCase):
    """
    Base class of the test cases: takes a threading snapshot in setUp() and
    hands it back to the cleanup helper in tearDown().
    """

    def setUp(self):
        self._threads = threading_helper.threading_setup()

    def tearDown(self):
        threading_helper.threading_cleanup(*self._threads)
        support.reap_children()

    def assertTimeout(self, actual, expected):
        """Check that the measured duration `actual` is plausible for a
        wait that was expected to last `expected` seconds."""
        # The waiting and/or time.monotonic() can be imprecise, so a
        # measurement somewhat below the expected value is tolerated
        # (this matters especially under Windows).
        lower_bound = expected * 0.6
        # The upper bound only guards against something insane happening.
        upper_bound = expected * 10.0
        self.assertGreaterEqual(actual, lower_bound)
        self.assertLess(actual, upper_bound)
90
91
class BaseLockTests(BaseTestCase):
    """
    Tests common to recursive and non-recursive locks.

    The lock under test is created by calling `self.locktype()`, which is
    not defined in this class.
    """

    def test_constructor(self):
        # Creating and dropping a lock must simply work.
        lock = self.locktype()
        del lock

    def test_repr(self):
        lock = self.locktype()
        text = repr(lock)
        self.assertRegex(text, "<unlocked .* object (.*)?at .*>")
        del lock

    def test_locked_repr(self):
        lock = self.locktype()
        lock.acquire()
        text = repr(lock)
        self.assertRegex(text, "<locked .* object (.*)?at .*>")
        del lock

    def test_acquire_destroy(self):
        # A lock may be dropped while it is still held.
        lock = self.locktype()
        lock.acquire()
        del lock

    def test_acquire_release(self):
        lock = self.locktype()
        lock.acquire()
        lock.release()
        del lock

    def test_try_acquire(self):
        lock = self.locktype()
        acquired = lock.acquire(False)
        self.assertTrue(acquired)
        lock.release()

    def test_try_acquire_contended(self):
        lock = self.locktype()
        lock.acquire()
        outcome = []

        def try_lock():
            # Non-blocking attempt made from another thread.
            outcome.append(lock.acquire(False))

        Bunch(try_lock, 1).wait_for_finished()
        self.assertFalse(outcome[0])
        lock.release()

    def test_acquire_contended(self):
        lock = self.locktype()
        lock.acquire()
        nthreads = 5

        def acquire_release():
            lock.acquire()
            lock.release()

        bunch = Bunch(acquire_release, nthreads)
        bunch.wait_for_started()
        _wait()
        # Nobody can finish while this thread holds the lock.
        self.assertEqual(len(bunch.finished), 0)
        lock.release()
        bunch.wait_for_finished()
        self.assertEqual(len(bunch.finished), nthreads)

    def test_with(self):
        lock = self.locktype()

        def acquire_release():
            lock.acquire()
            lock.release()

        def use_context(err=None):
            with lock:
                if err is not None:
                    raise err

        use_context()
        # Another thread can take the lock after a normal exit.
        Bunch(acquire_release, 1).wait_for_finished()
        self.assertRaises(TypeError, use_context, TypeError)
        # ... and also after the block was left through an exception.
        Bunch(acquire_release, 1).wait_for_finished()

    def test_thread_leak(self):
        # The lock shouldn't leak a Thread instance when used from a foreign
        # (non-threading) thread.
        lock = self.locktype()

        def acquire_release():
            lock.acquire()
            lock.release()

        before = len(threading.enumerate())
        # We run many threads in the hope that existing threads ids won't
        # be recycled.
        Bunch(acquire_release, 15).wait_for_finished()
        if len(threading.enumerate()) != before:
            # There is a small window during which a Thread instance's
            # target function has finished running, but the Thread is still
            # alive and registered.  Avoid spurious failures by waiting a
            # bit more (seen on a buildbot).
            time.sleep(0.4)
            self.assertEqual(before, len(threading.enumerate()))

    def test_timeout(self):
        lock = self.locktype()
        # Can't set timeout if not blocking
        self.assertRaises(ValueError, lock.acquire, False, 1)
        # Invalid timeout values
        self.assertRaises(ValueError, lock.acquire, timeout=-100)
        self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
        self.assertRaises(OverflowError, lock.acquire,
                          timeout=TIMEOUT_MAX + 1)
        # TIMEOUT_MAX is ok
        lock.acquire(timeout=TIMEOUT_MAX)
        lock.release()
        begin = time.monotonic()
        self.assertTrue(lock.acquire(timeout=5))
        end = time.monotonic()
        # Just a sanity test that it didn't actually wait for the timeout.
        self.assertLess(end - begin, 5)
        report = []

        def timed_acquire():
            # Runs in another thread while this thread holds the lock.
            start = time.monotonic()
            report.append(lock.acquire(timeout=0.5))
            stop = time.monotonic()
            report.append(stop - start)

        Bunch(timed_acquire, 1).wait_for_finished()
        got_lock, duration = report[0], report[1]
        self.assertFalse(got_lock)
        self.assertTimeout(duration, 0.5)

    def test_weakref_exists(self):
        lock = self.locktype()
        ref = weakref.ref(lock)
        self.assertIsNotNone(ref())

    def test_weakref_deleted(self):
        lock = self.locktype()
        ref = weakref.ref(lock)
        del lock
        gc.collect()  # For PyPy or other GCs.
        self.assertIsNone(ref())
226
227
class LockTests(BaseLockTests):
    """
    Tests for non-recursive, weak locks
    (which can be acquired and released from different threads).
    """

    def test_reacquire(self):
        # Lock needs to be released before re-acquiring.
        lock = self.locktype()
        steps = []

        def acquire_twice():
            lock.acquire()
            steps.append(None)
            lock.acquire()
            steps.append(None)

        with threading_helper.wait_threads_exit():
            start_new_thread(acquire_twice, ())
            while not steps:
                _wait()
            _wait()
            # The worker is stuck in its second acquire().
            self.assertEqual(len(steps), 1)
            lock.release()
            while len(steps) == 1:
                _wait()
            self.assertEqual(len(steps), 2)

    def test_different_thread(self):
        # Lock can be released from a different thread.
        lock = self.locktype()
        lock.acquire()

        def release_it():
            lock.release()

        bunch = Bunch(release_it, 1)
        bunch.wait_for_finished()
        lock.acquire()
        lock.release()

    def test_state_after_timeout(self):
        # Issue #11618: check that lock is in a proper state after a
        # (non-zero) timeout.
        lock = self.locktype()
        lock.acquire()
        timed_out = lock.acquire(timeout=0.01)
        self.assertFalse(timed_out)
        lock.release()
        self.assertFalse(lock.locked())
        self.assertTrue(lock.acquire(blocking=False))

    @requires_fork
    def test_at_fork_reinit(self):
        def check_usable(lock):
            # make sure that the lock still works normally
            # after _at_fork_reinit()
            lock.acquire()
            lock.release()

        # unlocked
        free_lock = self.locktype()
        free_lock._at_fork_reinit()
        check_usable(free_lock)

        # locked: _at_fork_reinit() resets the lock to the unlocked state
        held_lock = self.locktype()
        held_lock.acquire()
        held_lock._at_fork_reinit()
        check_usable(held_lock)
294
295
class RLockTests(BaseLockTests):
    """
    Tests for recursive locks.
    """

    def test_reacquire(self):
        # The owning thread may acquire the lock several times, as long as
        # every acquire() is matched by a release().
        lock = self.locktype()
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()

    def test_release_unacquired(self):
        # Cannot release an unacquired lock
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock.release)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        # Fully released again: one more release() must fail.
        self.assertRaises(RuntimeError, lock.release)

    def test_release_save_unacquired(self):
        # Cannot _release_save an unacquired lock
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock._release_save)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        # Fully released again: _release_save() must fail as well.
        self.assertRaises(RuntimeError, lock._release_save)

    def test_different_thread(self):
        # Cannot release from a different thread
        lock = self.locktype()

        def f():
            lock.acquire()

        b = Bunch(f, 1, True)
        try:
            # Wait until the worker has returned from f(), i.e. until it
            # really owns the lock; it then stays alive until do_finish().
            # Without this, release() below could run before the worker's
            # acquire() and raise only because the lock is not acquired at
            # all, which is not what this test is about.
            while not b.finished:
                _wait()
            self.assertRaises(RuntimeError, lock.release)
        finally:
            b.do_finish()
        b.wait_for_finished()

    def test__is_owned(self):
        lock = self.locktype()
        self.assertFalse(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        result = []

        def f():
            # Ownership as seen from a thread that never acquired the lock.
            result.append(lock._is_owned())

        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        lock.release()
        # Still owned: only one of the two acquisitions was released.
        self.assertTrue(lock._is_owned())
        lock.release()
        self.assertFalse(lock._is_owned())
361
362
class EventTests(BaseTestCase):
    """
    Tests for Event objects.

    The event under test is created by calling `self.eventtype()`, which is
    not defined in this class.
    """

    def test_is_set(self):
        evt = self.eventtype()
        self.assertFalse(evt.is_set())
        # set() and clear() are each applied twice in a row.
        for _ in range(2):
            evt.set()
            self.assertTrue(evt.is_set())
        for _ in range(2):
            evt.clear()
            self.assertFalse(evt.is_set())

    def _check_notify(self, evt):
        # All threads get notified
        nthreads = 5
        first_waits = []
        second_waits = []

        def waiter():
            first_waits.append(evt.wait())
            second_waits.append(evt.wait())

        bunch = Bunch(waiter, nthreads)
        bunch.wait_for_started()
        _wait()
        # The event is not set yet, so no wait() may have returned.
        self.assertEqual(len(first_waits), 0)
        evt.set()
        bunch.wait_for_finished()
        expected = [True] * nthreads
        self.assertEqual(first_waits, expected)
        self.assertEqual(second_waits, expected)

    def test_notify(self):
        evt = self.eventtype()
        self._check_notify(evt)
        # Another time, after an explicit clear()
        evt.set()
        evt.clear()
        self._check_notify(evt)

    def test_timeout(self):
        evt = self.eventtype()
        immediate = []
        timed = []
        nthreads = 5

        def waiter():
            immediate.append(evt.wait(0.0))
            begin = time.monotonic()
            flag = evt.wait(0.5)
            end = time.monotonic()
            timed.append((flag, end - begin))

        Bunch(waiter, nthreads).wait_for_finished()
        self.assertEqual(immediate, [False] * nthreads)
        for flag, duration in timed:
            self.assertFalse(flag)
            self.assertTimeout(duration, 0.5)
        # The event is set
        # (the lists are rebound; waiter() sees the new ones)
        immediate = []
        timed = []
        evt.set()
        Bunch(waiter, nthreads).wait_for_finished()
        self.assertEqual(immediate, [True] * nthreads)
        for flag, duration in timed:
            self.assertTrue(flag)

    def test_set_and_clear(self):
        # Issue #13502: check that wait() returns true even when the event is
        # cleared before the waiting thread is woken up.
        evt = self.eventtype()
        outcomes = []
        timeout = 0.250
        nthreads = 5

        def waiter():
            outcomes.append(evt.wait(timeout * 4))

        bunch = Bunch(waiter, nthreads)
        bunch.wait_for_started()
        time.sleep(timeout)
        evt.set()
        evt.clear()
        bunch.wait_for_finished()
        self.assertEqual(outcomes, [True] * nthreads)

    @requires_fork
    def test_at_fork_reinit(self):
        # ensure that condition is still using a Lock after reset
        evt = self.eventtype()
        with evt._cond:
            reacquired = evt._cond.acquire(False)
            self.assertFalse(reacquired)
        evt._at_fork_reinit()
        with evt._cond:
            reacquired = evt._cond.acquire(False)
            self.assertFalse(reacquired)

    def test_repr(self):
        evt = self.eventtype()
        self.assertRegex(repr(evt), r"<\w+\.Event at .*: unset>")
        evt.set()
        self.assertRegex(repr(evt), r"<\w+\.Event at .*: set>")
462
463
class ConditionTests(BaseTestCase):
    """
    Tests for condition variables.

    The condition under test is created by calling `self.condtype(...)`,
    which is not defined in this class.
    """

    def test_acquire(self):
        cond = self.condtype()
        # By default we have an RLock: the condition can be acquired multiple
        # times.
        cond.acquire()
        cond.acquire()
        cond.release()
        cond.release()
        # With an explicit Lock, the condition and the lock share state.
        lock = threading.Lock()
        cond = self.condtype(lock)
        cond.acquire()
        self.assertFalse(lock.acquire(False))
        cond.release()
        self.assertTrue(lock.acquire(False))
        self.assertFalse(cond.acquire(False))
        lock.release()
        with cond:
            self.assertFalse(lock.acquire(False))

    def test_unacquired_wait(self):
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.wait)

    def test_unacquired_notify(self):
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.notify)

    def _check_notify(self, cond):
        # Note that this test is sensitive to timing.  If the worker threads
        # don't execute in a timely fashion, the main thread may think they
        # are further along than they are.  The main thread therefore issues
        # _wait() statements to try to make sure that it doesn't race ahead
        # of the workers.
        # Secondly, this test assumes that condition variables are not subject
        # to spurious wakeups.  The absence of spurious wakeups is an
        # implementation detail of Condition Variables in current CPython,
        # but in general, not a guaranteed property of condition variables as
        # a programming construct.  In particular, it is possible that this
        # can no longer be conveniently guaranteed should their
        # implementation ever change.
        #
        # The literal counts below (3, 2, 8) describe how the N = 5 workers
        # are split between the notifications.
        N = 5
        ready = []
        results1 = []
        results2 = []
        phase_num = 0

        def f():
            # phase_num is read from the enclosing scope each time, so the
            # recorded value is whatever the main thread last assigned.
            cond.acquire()
            ready.append(phase_num)
            result = cond.wait()
            cond.release()
            results1.append((result, phase_num))
            cond.acquire()
            ready.append(phase_num)
            result = cond.wait()
            cond.release()
            results2.append((result, phase_num))

        b = Bunch(f, N)
        b.wait_for_started()
        # first wait, to ensure all workers settle into cond.wait() before
        # we continue. See issues #8799 and #30727.
        while len(ready) < N:
            _wait()
        ready.clear()
        self.assertEqual(results1, [])
        # Notify 3 threads at first
        cond.acquire()
        cond.notify(3)
        _wait()
        phase_num = 1
        cond.release()
        while len(results1) < 3:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3)
        self.assertEqual(results2, [])
        # make sure all awaken workers settle into cond.wait()
        while len(ready) < 3:
            _wait()
        # Notify 5 threads: they might be in their first or second wait
        cond.acquire()
        cond.notify(5)
        _wait()
        phase_num = 2
        cond.release()
        while len(results1) + len(results2) < 8:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3)
        # make sure all workers settle into cond.wait()
        while len(ready) < N:
            _wait()
        # Notify all threads: they are all in their second wait
        cond.acquire()
        cond.notify_all()
        _wait()
        phase_num = 3
        cond.release()
        while len(results2) < N:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
        b.wait_for_finished()

    def test_notify(self):
        cond = self.condtype()
        self._check_notify(cond)
        # A second time, to check internal state is still ok.
        self._check_notify(cond)

    def test_timeout(self):
        cond = self.condtype()
        results = []
        N = 5

        def f():
            cond.acquire()
            t1 = time.monotonic()
            result = cond.wait(0.5)
            t2 = time.monotonic()
            cond.release()
            results.append((t2 - t1, result))

        Bunch(f, N).wait_for_finished()
        self.assertEqual(len(results), N)
        for dt, result in results:
            self.assertTimeout(dt, 0.5)
            # Note that conceptually (that's the condition variable protocol)
            # a wait() may succeed even if no one notifies us and before any
            # timeout occurs.  Spurious wakeups can occur.
            # This makes it hard to verify the result value.
            # In practice, this implementation has no spurious wakeups.
            self.assertFalse(result)

    def test_waitfor(self):
        cond = self.condtype()
        state = 0

        def f():
            with cond:
                result = cond.wait_for(lambda: state == 4)
                self.assertTrue(result)
                self.assertEqual(state, 4)

        b = Bunch(f, 1)
        b.wait_for_started()
        # Four increments: the predicate becomes true on the last one.
        for i in range(4):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()

    def test_waitfor_timeout(self):
        cond = self.condtype()
        state = 0
        success = []

        def f():
            with cond:
                dt = time.monotonic()
                result = cond.wait_for(lambda: state == 4, timeout=0.1)
                dt = time.monotonic() - dt
                self.assertFalse(result)
                self.assertTimeout(dt, 0.1)
                # Only reached when both assertions above passed.
                success.append(None)

        b = Bunch(f, 1)
        b.wait_for_started()
        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()
        self.assertEqual(len(success), 1)
637
638
class BaseSemaphoreTests(BaseTestCase):
    """
    Common tests for {bounded, unbounded} semaphore objects.

    The semaphore under test is created by calling `self.semtype(...)`,
    which is not defined in this class.
    """

    def test_constructor(self):
        # Negative initial values are rejected.
        self.assertRaises(ValueError, self.semtype, value=-1)
        self.assertRaises(ValueError, self.semtype, value=-sys.maxsize)

    def test_acquire(self):
        sem = self.semtype(1)
        sem.acquire()
        sem.release()
        sem = self.semtype(2)
        sem.acquire()
        sem.acquire()
        sem.release()
        sem.release()

    def test_acquire_destroy(self):
        # A semaphore may be dropped while it is still acquired.
        sem = self.semtype()
        sem.acquire()
        del sem

    def test_acquire_contended(self):
        sem = self.semtype(7)
        sem.acquire()
        # Number of worker threads; each one acquires the semaphore twice.
        # The literal counts below (6, 7, 6, 1) add up to 2 * N.
        N = 10
        sem_results = []
        results1 = []
        results2 = []
        phase_num = 0

        def f():
            # phase_num is read from the enclosing scope each time, so the
            # recorded value is whatever the main thread last assigned.
            sem_results.append(sem.acquire())
            results1.append(phase_num)
            sem_results.append(sem.acquire())
            results2.append(phase_num)

        b = Bunch(f, N)
        b.wait_for_started()
        # 7 - 1 = 6 acquisitions can succeed before any release.
        while len(results1) + len(results2) < 6:
            _wait()
        self.assertEqual(results1 + results2, [0] * 6)
        phase_num = 1
        for i in range(7):
            sem.release()
        while len(results1) + len(results2) < 13:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
        phase_num = 2
        for i in range(6):
            sem.release()
        while len(results1) + len(results2) < 19:
            _wait()
        self.assertEqual(sorted(results1 + results2),
                         [0] * 6 + [1] * 7 + [2] * 6)
        # The semaphore is still locked
        self.assertFalse(sem.acquire(False))
        # Final release, to let the last thread finish
        sem.release()
        b.wait_for_finished()
        self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1))

    def test_multirelease(self):
        sem = self.semtype(7)
        sem.acquire()
        # Number of worker threads; each one acquires the semaphore twice.
        N = 10
        results1 = []
        results2 = []
        phase_num = 0

        def f():
            sem.acquire()
            results1.append(phase_num)
            sem.acquire()
            results2.append(phase_num)

        b = Bunch(f, N)
        b.wait_for_started()
        while len(results1) + len(results2) < 6:
            _wait()
        self.assertEqual(results1 + results2, [0] * 6)
        phase_num = 1
        # Same scenario as test_acquire_contended, but using release(n).
        sem.release(7)
        while len(results1) + len(results2) < 13:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
        phase_num = 2
        sem.release(6)
        while len(results1) + len(results2) < 19:
            _wait()
        self.assertEqual(sorted(results1 + results2),
                         [0] * 6 + [1] * 7 + [2] * 6)
        # The semaphore is still locked
        self.assertFalse(sem.acquire(False))
        # Final release, to let the last thread finish
        sem.release()
        b.wait_for_finished()

    def test_try_acquire(self):
        sem = self.semtype(2)
        self.assertTrue(sem.acquire(False))
        self.assertTrue(sem.acquire(False))
        self.assertFalse(sem.acquire(False))
        sem.release()
        self.assertTrue(sem.acquire(False))

    def test_try_acquire_contended(self):
        sem = self.semtype(4)
        sem.acquire()
        results = []

        def f():
            results.append(sem.acquire(False))
            results.append(sem.acquire(False))

        Bunch(f, 5).wait_for_finished()
        # There can be a thread switch between acquiring the semaphore and
        # appending the result, therefore results will not necessarily be
        # ordered.
        self.assertEqual(sorted(results), [False] * 7 + [True] * 3)

    def test_acquire_timeout(self):
        sem = self.semtype(2)
        # A timeout cannot be combined with a non-blocking acquire.
        self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertFalse(sem.acquire(timeout=0.005))
        sem.release()
        self.assertTrue(sem.acquire(timeout=0.005))
        t = time.monotonic()
        self.assertFalse(sem.acquire(timeout=0.5))
        dt = time.monotonic() - t
        self.assertTimeout(dt, 0.5)

    def test_default_value(self):
        # The default initial value is 1.
        sem = self.semtype()
        sem.acquire()

        def f():
            sem.acquire()
            sem.release()

        b = Bunch(f, 1)
        b.wait_for_started()
        _wait()
        # The worker is blocked: the single unit is held by this thread.
        self.assertFalse(b.finished)
        sem.release()
        b.wait_for_finished()

    def test_with(self):
        sem = self.semtype(2)

        def _with(err=None):
            with sem:
                self.assertTrue(sem.acquire(False))
                sem.release()
                with sem:
                    self.assertFalse(sem.acquire(False))
                    if err:
                        raise err

        _with()
        # Both units were given back on normal exit.
        self.assertTrue(sem.acquire(False))
        sem.release()
        self.assertRaises(TypeError, _with, TypeError)
        # ... and also when the blocks were left through an exception.
        self.assertTrue(sem.acquire(False))
        sem.release()
796
class SemaphoreTests(BaseSemaphoreTests):
    """
    Tests for unbounded semaphores.
    """

    def test_release_unacquired(self):
        # Unbounded releases are allowed and increment the semaphore's value
        sem = self.semtype(1)
        sem.release()
        # Two acquisitions succeed thanks to the extra release above.
        for _ in range(2):
            sem.acquire()
        sem.release()

    def test_repr(self):
        sem = self.semtype(3)
        text = repr(sem)
        self.assertRegex(text, r"<\w+\.Semaphore at .*: value=3>")
        sem.acquire()
        text = repr(sem)
        self.assertRegex(text, r"<\w+\.Semaphore at .*: value=2>")
        for _ in range(2):
            sem.release()
        text = repr(sem)
        self.assertRegex(text, r"<\w+\.Semaphore at .*: value=4>")
818
819
class BoundedSemaphoreTests(BaseSemaphoreTests):
    """
    Tests for bounded semaphores.
    """

    def test_release_unacquired(self):
        # Cannot go past the initial value
        sem = self.semtype()
        with self.assertRaises(ValueError):
            sem.release()
        sem.acquire()
        sem.release()
        with self.assertRaises(ValueError):
            sem.release()

    def test_repr(self):
        sem = self.semtype(3)
        text = repr(sem)
        self.assertRegex(text, r"<\w+\.BoundedSemaphore at .*: value=3/3>")
        sem.acquire()
        text = repr(sem)
        self.assertRegex(text, r"<\w+\.BoundedSemaphore at .*: value=2/3>")
838
839
class BarrierTests(BaseTestCase):
    """
    Tests for Barrier objects.

    Barriers are created by calling `self.barriertype(...)`, which is not
    defined in this class.
    """
    # Number of parties of the barrier created in setUp().
    N = 5
    # Default timeout, in seconds, of the barrier created in setUp().
    defaultTimeout = 2.0

    def setUp(self):
        # Chain to the base class so that its thread bookkeeping is not
        # lost by overriding setUp().
        super().setUp()
        self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        self.barrier.abort()
        # Chain to the base class so that its cleanup still runs.
        super().tearDown()

    def run_threads(self, f):
        """Run `f` in N-1 new threads and in the calling thread, then wait
        for the new threads to finish."""
        b = Bunch(f, self.N-1)
        f()
        b.wait_for_finished()

    def multipass(self, results, n):
        """Pass self.barrier 2*n times, checking that all parties advance
        together; `results` is a pair of lists shared by all parties."""
        m = self.barrier.parties
        self.assertEqual(m, self.N)
        for i in range(n):
            results[0].append(True)
            self.assertEqual(len(results[1]), i * m)
            self.barrier.wait()
            results[1].append(True)
            self.assertEqual(len(results[0]), (i + 1) * m)
            self.barrier.wait()
        self.assertEqual(self.barrier.n_waiting, 0)
        self.assertFalse(self.barrier.broken)

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [[], []]

        def f():
            self.multipass(results, passes)

        self.run_threads(f)

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        # No `return`: a test method is not supposed to return a value.
        self.test_barrier(10)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        results = []

        def f():
            r = self.barrier.wait()
            results.append(r)

        self.run_threads(f)
        # Every party gets a distinct index in range(N).
        self.assertEqual(sum(results), sum(range(self.N)))

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = []

        def action():
            results.append(True)

        barrier = self.barriertype(self.N, action)

        def f():
            barrier.wait()
            # The action ran exactly once for the whole group.
            self.assertEqual(len(results), 1)

        self.run_threads(f)

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = []
        results2 = []

        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    # One party drops out instead of waiting again.
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = []
        results2 = []
        results3 = []

        def f():
            i = self.barrier.wait()
            if i == self.N//2:
                # Wait until the other threads are all in the barrier.
                while self.barrier.n_waiting < self.N-1:
                    time.sleep(0.001)
                self.barrier.reset()
            else:
                try:
                    self.barrier.wait()
                    results1.append(True)
                except threading.BrokenBarrierError:
                    results2.append(True)
            # Now, pass the barrier again
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = []
        results2 = []
        results3 = []
        # Second barrier, used only to synchronize around the reset.
        barrier2 = self.barriertype(self.N)

        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()
            # Synchronize and reset the barrier.  Must synchronize first so
            # that everyone has left it when we reset, and after so that no
            # one enters it before the reset.
            if barrier2.wait() == self.N//2:
                self.barrier.reset()
            barrier2.wait()
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        def f():
            i = self.barrier.wait()
            if i == self.N // 2:
                # One thread is late!
                time.sleep(1.0)
            # Default timeout is 2.0, so this is shorter.
            self.assertRaises(threading.BrokenBarrierError,
                              self.barrier.wait, 0.5)

        self.run_threads(f)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        # create a barrier with a low default timeout
        barrier = self.barriertype(self.N, timeout=0.3)

        def f():
            i = barrier.wait()
            if i == self.N // 2:
                # One thread is later than the default timeout of 0.3s.
                time.sleep(1.0)
            self.assertRaises(threading.BrokenBarrierError, barrier.wait)

        self.run_threads(f)

    def test_single_thread(self):
        # A barrier with a single party never blocks.
        b = self.barriertype(1)
        b.wait()
        b.wait()

    def test_repr(self):
        b = self.barriertype(3)
        self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>")

        def f():
            b.wait(3)

        bunch = Bunch(f, 2)
        bunch.wait_for_started()
        # Give both workers time to block in wait().
        time.sleep(0.2)
        self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=2/3>")
        b.wait(3)
        bunch.wait_for_finished()
        self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>")
        b.abort()
        self.assertRegex(repr(b), r"<\w+\.Barrier at .*: broken>")