import contextlib
import multiprocessing as mp
import multiprocessing.process
import multiprocessing.util
import os
import threading
import unittest
from concurrent import futures
from test import support

from .executor import ExecutorTest, mul
from .util import BaseTestCase, ThreadPoolMixin, setup_module


class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777: map() submits its calls without
        requiring the returned iterator to be consumed."""
        finished = []
        def record_finished(n):
            finished.append(n)

        self.executor.map(record_finished, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
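        # With no explicit max_workers, the pool should size itself using the
        # default formula min(32, (os.cpu_count() or 1) + 4).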
        executor = self.executor_type()
        expected = min(32, (os.cpu_count() or 1) + 4)
        self.assertEqual(executor._max_workers, expected)

    def test_saturation(self):
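        # Flooding the executor with many more tasks than workers must not
        # spawn extra threads: the thread count should cap at max_workers.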
        executor = self.executor_type(4)
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for i in range(15 * executor._max_workers):
            executor.submit(acquire_lock, sem)
        self.assertEqual(len(executor._threads), executor._max_workers)
        for i in range(15 * executor._max_workers):
            sem.release()
        executor.shutdown(wait=True)

    def test_idle_thread_reuse(self):
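        # Back-to-back submissions should be served by a single idle worker
        # thread instead of starting a new thread for each task.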
        executor = self.executor_type()
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._threads), 1)
        executor.shutdown(wait=True)

    @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
    @support.requires_resource('cpu')
    def test_hang_global_shutdown_lock(self):
        # bpo-45021: _global_shutdown_lock should be reinitialized in the child
        # process, otherwise it will never exit
        def submit(pool):
            pool.submit(submit, pool)

        with futures.ThreadPoolExecutor(1) as pool:
            pool.submit(submit, pool)

            for _ in range(50):
                with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
                    workers.submit(tuple)

    def test_executor_map_current_future_cancel(self):
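        # A map() call that times out should cancel both the future it is
        # currently waiting on and the futures that have not started yet, so
        # only the explicitly submitted "first" task ever runs.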
        stop_event = threading.Event()
        log = []

        def log_n_wait(ident):
            log.append(f"{ident=} started")
            try:
                stop_event.wait()
            finally:
                log.append(f"{ident=} stopped")

        with self.executor_type(max_workers=1) as pool:
            # submit work to saturate the pool
            fut = pool.submit(log_n_wait, ident="first")
            try:
                with contextlib.closing(
                    pool.map(log_n_wait, ["second", "third"], timeout=0)
                ) as gen:
                    with self.assertRaises(TimeoutError):
                        next(gen)
            finally:
                stop_event.set()
            fut.result()
        # ident='second' is cancelled as a result of raising a TimeoutError
        # ident='third' is cancelled because it remained in the collection of futures
        self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])


def setUpModule():
    setup_module()


if __name__ == "__main__":
    unittest.main()