1 import sys
2 import threading
3 import time
4 import unittest
5 from concurrent import futures
6
7 from .util import (
8 CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE,
9 SUCCESSFUL_FUTURE,
10 create_executor_tests, setup_module,
11 BaseTestCase, ThreadPoolMixin,
12 ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)
13
14
def mul(x, y):
    """Return ``x * y``.

    The tests in this module submit this function to an executor as a
    task that needs no sleeping.
    """
    product = x * y
    return product
17
def sleep_and_raise(t):
    """Sleep for ``t`` seconds, then always raise ``Exception``.

    The tests in this module submit this function to an executor to get a
    future that fails after a delay.
    """
    time.sleep(t)
    failure = Exception('this is an exception')
    raise failure
21
22
class WaitTests:
    """Checks of ``futures.wait()`` for each ``return_when`` mode.

    This class is a mixin: it reads ``self.executor`` and calls
    ``self.assertEqual``, both of which must be supplied by the classes it
    is combined with (for example ``ThreadPoolWaitTests`` in this module).
    """

    def test_20369(self):
        # See https://bugs.python.org/issue20369
        # The same future is passed twice; it must be reported once, as done.
        sleeper = self.executor.submit(time.sleep, 1.5)
        watched = [sleeper, sleeper]
        done, not_done = futures.wait(watched,
                                      return_when=futures.ALL_COMPLETED)
        self.assertEqual({sleeper}, done)
        self.assertEqual(set(), not_done)

    def test_first_completed(self):
        # The quick multiplication is expected to be the only future
        # reported as done when wait() returns.
        quick = self.executor.submit(mul, 21, 2)
        slow = self.executor.submit(time.sleep, 1.5)
        watched = [CANCELLED_FUTURE, quick, slow]

        done, not_done = futures.wait(
            watched, return_when=futures.FIRST_COMPLETED)

        self.assertEqual({quick}, done)
        self.assertEqual({CANCELLED_FUTURE, slow}, not_done)

    def test_first_completed_some_already_completed(self):
        # Two of the watched futures are pre-built fixtures that the test
        # expects to be reported as finished straight away.
        slow = self.executor.submit(time.sleep, 1.5)
        watched = [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, slow]

        finished, pending = futures.wait(
            watched, return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
            {CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE},
            finished)
        self.assertEqual({slow}, pending)

    def test_first_exception(self):
        # The failing task raises after 1.5s, while the longest task sleeps
        # for 3s and is expected to still be pending when wait() returns.
        quick = self.executor.submit(mul, 2, 21)
        failing = self.executor.submit(sleep_and_raise, 1.5)
        slowest = self.executor.submit(time.sleep, 3)
        watched = [quick, failing, slowest]

        finished, pending = futures.wait(
            watched, return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({quick, failing}, finished)
        self.assertEqual({slowest}, pending)

    def test_first_exception_some_already_complete(self):
        # divmod(21, 0) raises ZeroDivisionError, giving a future that fails
        # without any sleeping.
        failing = self.executor.submit(divmod, 21, 0)
        slow = self.executor.submit(time.sleep, 1.5)
        watched = [
            SUCCESSFUL_FUTURE,
            CANCELLED_FUTURE,
            CANCELLED_AND_NOTIFIED_FUTURE,
            failing,
            slow,
        ]

        finished, pending = futures.wait(
            watched, return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          failing}, finished)
        self.assertEqual({CANCELLED_FUTURE, slow}, pending)

    def test_first_exception_one_already_failed(self):
        # EXCEPTION_FUTURE is a pre-built fixture; the test expects it alone
        # to be reported as finished while the sleeping task is pending.
        slow = self.executor.submit(time.sleep, 2)
        watched = [EXCEPTION_FUTURE, slow]

        finished, pending = futures.wait(
            watched, return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({EXCEPTION_FUTURE}, finished)
        self.assertEqual({slow}, pending)

    def test_all_completed(self):
        # With ALL_COMPLETED every watched future, including the one that
        # raises ZeroDivisionError, must end up in the finished set.
        failing = self.executor.submit(divmod, 2, 0)
        quick = self.executor.submit(mul, 2, 21)
        watched = [
            SUCCESSFUL_FUTURE,
            CANCELLED_AND_NOTIFIED_FUTURE,
            EXCEPTION_FUTURE,
            failing,
            quick,
        ]

        finished, pending = futures.wait(
            watched, return_when=futures.ALL_COMPLETED)

        self.assertEqual({SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          failing,
                          quick}, finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        # The submitted task sleeps ten times longer than the wait() timeout,
        # so it is expected to be the only future still pending.
        wait_limit = 0.050
        task_duration = wait_limit * 10

        sleeper = self.executor.submit(time.sleep, task_duration)
        watched = [
            CANCELLED_AND_NOTIFIED_FUTURE,
            EXCEPTION_FUTURE,
            SUCCESSFUL_FUTURE,
            sleeper,
        ]

        finished, pending = futures.wait(
            watched,
            timeout=wait_limit,
            return_when=futures.ALL_COMPLETED)

        self.assertEqual({CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE},
                         finished)
        self.assertEqual({sleeper}, pending)
132
133
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):
    """``WaitTests`` combined with ``ThreadPoolMixin``, plus one extra test."""

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        gate = threading.Event()

        def block_until_released():
            gate.wait()

        saved_interval = sys.getswitchinterval()
        # Use a very small thread switch interval while the futures are
        # completing (presumably to make the race easier to hit); the
        # previous value is restored in the ``finally`` clause.
        sys.setswitchinterval(1e-6)
        try:
            submitted = {self.executor.submit(block_until_released)
                         for _ in range(100)}
            # Release every blocked task, then wait for all of them.
            gate.set()
            futures.wait(submitted, return_when=futures.ALL_COMPLETED)
        finally:
            sys.setswitchinterval(saved_interval)
150
151
# Request WaitTests variants for the three process-pool mixins (fork,
# forkserver and spawn).  ThreadPoolMixin is not listed because the
# thread-pool variant is written out by hand above as ThreadPoolWaitTests.
# NOTE(review): create_executor_tests is defined in .util; presumably it
# adds the generated test classes to the globals() dict passed here --
# confirm against that helper.
create_executor_tests(globals(), WaitTests,
                  executor_mixins=(ProcessPoolForkMixin,
                                   ProcessPoolForkserverMixin,
                                   ProcessPoolSpawnMixin))
156
157
def setUpModule():
    """Module-level unittest fixture; delegates to ``setup_module``.

    ``setup_module`` is the helper imported from ``.util`` at the top of
    this file.
    """
    setup_module()
160
161
# Run this module's tests with unittest when it is executed as a script.
if __name__ == "__main__":
    unittest.main()