1 import threading
2 import time
3 import weakref
4 from concurrent import futures
5 from test import support
6
7
def mul(x, y):
    """Return ``x * y``."""
    return x * y
10
def capture(*args, **kwargs):
    """Return the received arguments unchanged as an ``(args, kwargs)`` pair."""
    return args, kwargs
13
14
class MyObject:
    """Minimal object with a no-op method."""

    def my_method(self):
        """Do nothing and return ``None``."""
        pass
18
19
def make_dummy_object(_):
    """Return a new ``MyObject``; the single argument is ignored."""
    return MyObject()
22
23
class ExecutorTest:
    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    #
    # The tests below read self.executor, self.executor_type and
    # self.worker_count and call unittest-style assert* methods.  None of
    # those are defined in this class, so they are presumably supplied by the
    # concrete test class this one is mixed into.

    def test_submit(self):
        # A submitted call with positional arguments yields its return value
        # through the future.
        future = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, future.result())

    def test_submit_keyword(self):
        # Keyword arguments are forwarded to the submitted callable.
        future = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, future.result())
        # 'self' and 'fn' are expected to reach the callable as ordinary
        # keyword arguments.
        future = self.executor.submit(capture, 1, self=2, fn=3)
        self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
        # The callable itself has to be given positionally: passing it as
        # fn=... or omitting it altogether is expected to raise TypeError.
        with self.assertRaises(TypeError):
            self.executor.submit(fn=capture, arg=1)
        with self.assertRaises(TypeError):
            self.executor.submit(arg=1)

    def test_map(self):
        # Executor.map() must produce the same results as the builtin map(),
        # both without and with an explicit chunksize.
        self.assertEqual(
            list(self.executor.map(pow, range(10), range(10))),
            list(map(pow, range(10), range(10))))

        self.assertEqual(
            list(self.executor.map(pow, range(10), range(10), chunksize=3)),
            list(map(pow, range(10), range(10))))

    def test_map_exception(self):
        # The third pair divides by zero: the first two results are still
        # delivered and the exception is raised on the third retrieval.
        results = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(next(results), (0, 1))
        self.assertEqual(next(results), (0, 1))
        self.assertRaises(ZeroDivisionError, next, results)

    def test_map_timeout(self):
        # The last call sleeps for 6 seconds, which exceeds the 5 second
        # timeout given to map(): the two immediate results are expected to
        # be yielded before TimeoutError is raised.
        results = []
        try:
            for result in self.executor.map(time.sleep,
                                            [0, 0, 6],
                                            timeout=5):
                results.append(result)
        except futures.TimeoutError:
            pass
        else:
            self.fail('expected TimeoutError')

        self.assertEqual([None, None], results)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        # One more task than there are workers is queued; the result iterator
        # is deliberately not consumed before shutting down.
        self.executor.map(str, [2] * (self.worker_count + 1))
        self.executor.shutdown()

    @support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        my_object = MyObject()
        my_object_collected = threading.Event()
        # The weakref object must be kept alive for its callback to be
        # invoked, hence the otherwise unused name.
        my_object_callback = weakref.ref(
            my_object, lambda obj: my_object_collected.set())
        # Deliberately discarding the future.
        self.executor.submit(my_object.my_method)
        del my_object

        collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
        self.assertTrue(collected,
                        "Stale reference not collected within timeout.")

    def test_max_workers_negative(self):
        # Both zero and negative worker counts must be rejected at
        # construction time.
        for number in (0, -1):
            with self.assertRaisesRegex(ValueError,
                                        "max_workers must be greater "
                                        "than 0"):
                self.executor_type(max_workers=number)

    def test_free_reference(self):
        # Issue #14406: Result iterator should not keep an internal
        # reference to result objects.
        for obj in self.executor.map(make_dummy_object, range(10)):
            wr = weakref.ref(obj)
            del obj
            support.gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(wr())