1 import itertools
2 import time
3 import unittest
4 import weakref
5 from concurrent import futures
6 from concurrent.futures._base import (
7 CANCELLED_AND_NOTIFIED, FINISHED, Future)
8
9 from test import support
10
11 from .util import (
12 PENDING_FUTURE, RUNNING_FUTURE,
13 CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE,
14 create_future, create_executor_tests, setup_module)
15
16
def mul(x, y):
    """Return the product ``x * y``.

    Used by the tests below as a quick task to submit to the executor.
    """
    product = x * y
    return product
19
20
class AsCompletedTests:
    """Executor-agnostic tests for ``futures.as_completed``.

    The methods rely on ``self.executor`` and on unittest assertion helpers
    that this class does not define itself; the class is handed to
    ``create_executor_tests`` at module level (presumably to be combined with
    executor-specific TestCase mixins -- confirm in ``.util``).
    """

    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        """Without a timeout, all given futures are eventually yielded."""
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(mul, 7, 6)
        all_futures = [
            CANCELLED_AND_NOTIFIED_FUTURE,
            EXCEPTION_FUTURE,
            SUCCESSFUL_FUTURE,
            future1,
            future2,
        ]

        completed = set(futures.as_completed(all_futures))
        self.assertEqual(set(all_futures), completed)

    def test_zero_timeout(self):
        """With ``timeout=0`` only already-finished futures are yielded."""
        # Sleeps for 2 seconds, so it is still unfinished when the zero
        # timeout is evaluated.
        future1 = self.executor.submit(time.sleep, 2)
        already_done = [
            CANCELLED_AND_NOTIFIED_FUTURE,
            EXCEPTION_FUTURE,
            SUCCESSFUL_FUTURE,
        ]
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    already_done + [future1], timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            # Deliberately ignored: the assertion below checks what was
            # yielded before the timeout fired.
            pass

        self.assertEqual(set(already_done), completed_futures)

    def test_duplicate_futures(self):
        """The same future passed several times is yielded only once."""
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        # Issue #31641: accept arbitrary iterables.
        future1 = self.executor.submit(time.sleep, 2)
        completed = list(
            futures.as_completed(itertools.repeat(future1, 3)))
        self.assertEqual(len(completed), 1)

    def test_free_reference_yielded_future(self):
        """``as_completed`` must not keep yielded futures alive."""
        # Issue #14406: Generator should not keep references
        # to finished futures.
        futures_list = [Future() for _ in range(8)]
        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
        futures_list.append(create_future(state=FINISHED, result=42))

        # Phase 1: only the two pre-finished futures can be yielded before
        # the zero timeout expires.
        with self.assertRaises(futures.TimeoutError):
            for future in futures.as_completed(futures_list, timeout=0):
                # Drop our own references (list entry and loop variable) so
                # that the weak reference is the only thing left to inspect.
                futures_list.remove(future)
                wr = weakref.ref(future)
                del future
                support.gc_collect()  # For PyPy or other GCs.
                self.assertIsNone(wr())

        # Phase 2: finish the remaining futures one at a time; each yielded
        # future completes the next one so the iteration can drain the list.
        futures_list[0].set_result("test")
        for future in futures.as_completed(futures_list):
            futures_list.remove(future)
            wr = weakref.ref(future)
            del future
            support.gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(wr())
            if futures_list:
                futures_list[0].set_result("test")

    def test_correct_timeout_exception_msg(self):
        """The ``TimeoutError`` message counts the unfinished futures."""
        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]

        with self.assertRaises(futures.TimeoutError) as cm:
            list(futures.as_completed(futures_list, timeout=0))

        self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
101
102
# Hand the shared test body to the helper imported from ``.util`` together
# with this module's namespace (presumably so it can define one concrete
# TestCase per executor flavour here -- confirm in ``.util``).
create_executor_tests(
    globals(),
    AsCompletedTests,
)
104
105
def setUpModule():
    """Module-level unittest fixture hook; delegates to ``setup_module``.

    ``setup_module`` is imported from ``.util``; what it configures is not
    visible from this file.
    """
    setup_module()
108
109
# Run the tests when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()