(root)/
Python-3.11.7/
Lib/
test/
test_concurrent_futures/
test_thread_pool.py
       1  import contextlib
       2  import multiprocessing as mp
       3  import multiprocessing.process
       4  import multiprocessing.util
       5  import os
       6  import threading
       7  import unittest
       8  from concurrent import futures
       9  from test import support
      10  
      11  from .executor import ExecutorTest, mul
      12  from .util import BaseTestCase, ThreadPoolMixin, setup_module
      13  
      14  
      15  class ESC[4;38;5;81mThreadPoolExecutorTest(ESC[4;38;5;149mThreadPoolMixin, ESC[4;38;5;149mExecutorTest, ESC[4;38;5;149mBaseTestCase):
      16      def test_map_submits_without_iteration(self):
      17          """Tests verifying issue 11777."""
      18          finished = []
      19          def record_finished(n):
      20              finished.append(n)
      21  
      22          self.executor.map(record_finished, range(10))
      23          self.executor.shutdown(wait=True)
      24          self.assertCountEqual(finished, range(10))
      25  
      26      def test_default_workers(self):
      27          executor = self.executor_type()
      28          expected = min(32, (os.cpu_count() or 1) + 4)
      29          self.assertEqual(executor._max_workers, expected)
      30  
      31      def test_saturation(self):
      32          executor = self.executor_type(4)
      33          def acquire_lock(lock):
      34              lock.acquire()
      35  
      36          sem = threading.Semaphore(0)
      37          for i in range(15 * executor._max_workers):
      38              executor.submit(acquire_lock, sem)
      39          self.assertEqual(len(executor._threads), executor._max_workers)
      40          for i in range(15 * executor._max_workers):
      41              sem.release()
      42          executor.shutdown(wait=True)
      43  
      44      def test_idle_thread_reuse(self):
      45          executor = self.executor_type()
      46          executor.submit(mul, 21, 2).result()
      47          executor.submit(mul, 6, 7).result()
      48          executor.submit(mul, 3, 14).result()
      49          self.assertEqual(len(executor._threads), 1)
      50          executor.shutdown(wait=True)
      51  
      52      @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
      53      @support.requires_resource('cpu')
      54      def test_hang_global_shutdown_lock(self):
      55          # bpo-45021: _global_shutdown_lock should be reinitialized in the child
      56          # process, otherwise it will never exit
      57          def submit(pool):
      58              pool.submit(submit, pool)
      59  
      60          with futures.ThreadPoolExecutor(1) as pool:
      61              pool.submit(submit, pool)
      62  
      63              for _ in range(50):
      64                  with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
      65                      workers.submit(tuple)
      66  
      67      def test_executor_map_current_future_cancel(self):
      68          stop_event = threading.Event()
      69          log = []
      70  
      71          def log_n_wait(ident):
      72              log.append(f"{ident=} started")
      73              try:
      74                  stop_event.wait()
      75              finally:
      76                  log.append(f"{ident=} stopped")
      77  
      78          with self.executor_type(max_workers=1) as pool:
      79              # submit work to saturate the pool
      80              fut = pool.submit(log_n_wait, ident="first")
      81              try:
      82                  with contextlib.closing(
      83                      pool.map(log_n_wait, ["second", "third"], timeout=0)
      84                  ) as gen:
      85                      with self.assertRaises(TimeoutError):
      86                          next(gen)
      87              finally:
      88                  stop_event.set()
      89              fut.result()
      90          # ident='second' is cancelled as a result of raising a TimeoutError
      91          # ident='third' is cancelled because it remained in the collection of futures
      92          self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
      93  
      94  
      95  def setUpModule():
      96      setup_module()
      97  
      98  
      99  if __name__ == "__main__":
     100      unittest.main()