(root)/
Python-3.11.7/
Lib/
test/
test_concurrent_futures/
test_shutdown.py
       1  import signal
       2  import sys
       3  import threading
       4  import time
       5  import unittest
       6  from concurrent import futures
       7  
       8  from test import support
       9  from test.support.script_helper import assert_python_ok
      10  
      11  from .util import (
      12      BaseTestCase, ThreadPoolMixin, ProcessPoolForkMixin,
      13      ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
      14      create_executor_tests, setup_module)
      15  
      16  
      17  def sleep_and_print(t, msg):
      18      time.sleep(t)
      19      print(msg)
      20      sys.stdout.flush()
      21  
      22  
      23  class ESC[4;38;5;81mExecutorShutdownTest:
      24      def test_run_after_shutdown(self):
      25          self.executor.shutdown()
      26          self.assertRaises(RuntimeError,
      27                            self.executor.submit,
      28                            pow, 2, 5)
      29  
      30      def test_interpreter_shutdown(self):
      31          # Test the atexit hook for shutdown of worker threads and processes
      32          rc, out, err = assert_python_ok('-c', """if 1:
      33              from concurrent.futures import {executor_type}
      34              from time import sleep
      35              from test.test_concurrent_futures.test_shutdown import sleep_and_print
      36              if __name__ == "__main__":
      37                  context = '{context}'
      38                  if context == "":
      39                      t = {executor_type}(5)
      40                  else:
      41                      from multiprocessing import get_context
      42                      context = get_context(context)
      43                      t = {executor_type}(5, mp_context=context)
      44                  t.submit(sleep_and_print, 1.0, "apple")
      45              """.format(executor_type=self.executor_type.__name__,
      46                         context=getattr(self, "ctx", "")))
      47          # Errors in atexit hooks don't change the process exit code, check
      48          # stderr manually.
      49          self.assertFalse(err)
      50          self.assertEqual(out.strip(), b"apple")
      51  
      52      def test_submit_after_interpreter_shutdown(self):
      53          # Test the atexit hook for shutdown of worker threads and processes
      54          rc, out, err = assert_python_ok('-c', """if 1:
      55              import atexit
      56              @atexit.register
      57              def run_last():
      58                  try:
      59                      t.submit(id, None)
      60                  except RuntimeError:
      61                      print("runtime-error")
      62                      raise
      63              from concurrent.futures import {executor_type}
      64              if __name__ == "__main__":
      65                  context = '{context}'
      66                  if not context:
      67                      t = {executor_type}(5)
      68                  else:
      69                      from multiprocessing import get_context
      70                      context = get_context(context)
      71                      t = {executor_type}(5, mp_context=context)
      72                      t.submit(id, 42).result()
      73              """.format(executor_type=self.executor_type.__name__,
      74                         context=getattr(self, "ctx", "")))
      75          # Errors in atexit hooks don't change the process exit code, check
      76          # stderr manually.
      77          self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
      78          self.assertEqual(out.strip(), b"runtime-error")
      79  
      80      def test_hang_issue12364(self):
      81          fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
      82          self.executor.shutdown()
      83          for f in fs:
      84              f.result()
      85  
      86      def test_cancel_futures(self):
      87          assert self.worker_count <= 5, "test needs few workers"
      88          fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
      89          self.executor.shutdown(cancel_futures=True)
      90          # We can't guarantee the exact number of cancellations, but we can
      91          # guarantee that *some* were cancelled. With few workers, many of
      92          # the submitted futures should have been cancelled.
      93          cancelled = [fut for fut in fs if fut.cancelled()]
      94          self.assertGreater(len(cancelled), 20)
      95  
      96          # Ensure the other futures were able to finish.
      97          # Use "not fut.cancelled()" instead of "fut.done()" to include futures
      98          # that may have been left in a pending state.
      99          others = [fut for fut in fs if not fut.cancelled()]
     100          for fut in others:
     101              self.assertTrue(fut.done(), msg=f"{fut._state=}")
     102              self.assertIsNone(fut.exception())
     103  
     104          # Similar to the number of cancelled futures, we can't guarantee the
     105          # exact number that completed. But, we can guarantee that at least
     106          # one finished.
     107          self.assertGreater(len(others), 0)
     108  
     109      def test_hang_gh83386(self):
     110          """shutdown(wait=False) doesn't hang at exit with running futures.
     111  
     112          See https://github.com/python/cpython/issues/83386.
     113          """
     114          if self.executor_type == futures.ProcessPoolExecutor:
     115              raise unittest.SkipTest(
     116                  "Hangs, see https://github.com/python/cpython/issues/83386")
     117  
     118          rc, out, err = assert_python_ok('-c', """if True:
     119              from concurrent.futures import {executor_type}
     120              from test.test_concurrent_futures.test_shutdown import sleep_and_print
     121              if __name__ == "__main__":
     122                  if {context!r}: multiprocessing.set_start_method({context!r})
     123                  t = {executor_type}(max_workers=3)
     124                  t.submit(sleep_and_print, 1.0, "apple")
     125                  t.shutdown(wait=False)
     126              """.format(executor_type=self.executor_type.__name__,
     127                         context=getattr(self, 'ctx', None)))
     128          self.assertFalse(err)
     129          self.assertEqual(out.strip(), b"apple")
     130  
     131      def test_hang_gh94440(self):
     132          """shutdown(wait=True) doesn't hang when a future was submitted and
     133          quickly canceled right before shutdown.
     134  
     135          See https://github.com/python/cpython/issues/94440.
     136          """
     137          if not hasattr(signal, 'alarm'):
     138              raise unittest.SkipTest(
     139                  "Tested platform does not support the alarm signal")
     140  
     141          def timeout(_signum, _frame):
     142              raise RuntimeError("timed out waiting for shutdown")
     143  
     144          kwargs = {}
     145          if getattr(self, 'ctx', None):
     146              kwargs['mp_context'] = self.get_context()
     147          executor = self.executor_type(max_workers=1, **kwargs)
     148          executor.submit(int).result()
     149          old_handler = signal.signal(signal.SIGALRM, timeout)
     150          try:
     151              signal.alarm(5)
     152              executor.submit(int).cancel()
     153              executor.shutdown(wait=True)
     154          finally:
     155              signal.alarm(0)
     156              signal.signal(signal.SIGALRM, old_handler)
     157  
     158  
     159  class ESC[4;38;5;81mThreadPoolShutdownTest(ESC[4;38;5;149mThreadPoolMixin, ESC[4;38;5;149mExecutorShutdownTest, ESC[4;38;5;149mBaseTestCase):
     160      def test_threads_terminate(self):
     161          def acquire_lock(lock):
     162              lock.acquire()
     163  
     164          sem = threading.Semaphore(0)
     165          for i in range(3):
     166              self.executor.submit(acquire_lock, sem)
     167          self.assertEqual(len(self.executor._threads), 3)
     168          for i in range(3):
     169              sem.release()
     170          self.executor.shutdown()
     171          for t in self.executor._threads:
     172              t.join()
     173  
     174      def test_context_manager_shutdown(self):
     175          with futures.ThreadPoolExecutor(max_workers=5) as e:
     176              executor = e
     177              self.assertEqual(list(e.map(abs, range(-5, 5))),
     178                               [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
     179  
     180          for t in executor._threads:
     181              t.join()
     182  
     183      def test_del_shutdown(self):
     184          executor = futures.ThreadPoolExecutor(max_workers=5)
     185          res = executor.map(abs, range(-5, 5))
     186          threads = executor._threads
     187          del executor
     188  
     189          for t in threads:
     190              t.join()
     191  
     192          # Make sure the results were all computed before the
     193          # executor got shutdown.
     194          assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
     195  
     196      def test_shutdown_no_wait(self):
     197          # Ensure that the executor cleans up the threads when calling
     198          # shutdown with wait=False
     199          executor = futures.ThreadPoolExecutor(max_workers=5)
     200          res = executor.map(abs, range(-5, 5))
     201          threads = executor._threads
     202          executor.shutdown(wait=False)
     203          for t in threads:
     204              t.join()
     205  
     206          # Make sure the results were all computed before the
     207          # executor got shutdown.
     208          assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
     209  
     210  
     211      def test_thread_names_assigned(self):
     212          executor = futures.ThreadPoolExecutor(
     213              max_workers=5, thread_name_prefix='SpecialPool')
     214          executor.map(abs, range(-5, 5))
     215          threads = executor._threads
     216          del executor
     217          support.gc_collect()  # For PyPy or other GCs.
     218  
     219          for t in threads:
     220              self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
     221              t.join()
     222  
     223      def test_thread_names_default(self):
     224          executor = futures.ThreadPoolExecutor(max_workers=5)
     225          executor.map(abs, range(-5, 5))
     226          threads = executor._threads
     227          del executor
     228          support.gc_collect()  # For PyPy or other GCs.
     229  
     230          for t in threads:
     231              # Ensure that our default name is reasonably sane and unique when
     232              # no thread_name_prefix was supplied.
     233              self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
     234              t.join()
     235  
     236      def test_cancel_futures_wait_false(self):
     237          # Can only be reliably tested for TPE, since PPE often hangs with
     238          # `wait=False` (even without *cancel_futures*).
     239          rc, out, err = assert_python_ok('-c', """if True:
     240              from concurrent.futures import ThreadPoolExecutor
     241              from test.test_concurrent_futures.test_shutdown import sleep_and_print
     242              if __name__ == "__main__":
     243                  t = ThreadPoolExecutor()
     244                  t.submit(sleep_and_print, .1, "apple")
     245                  t.shutdown(wait=False, cancel_futures=True)
     246              """)
     247          # Errors in atexit hooks don't change the process exit code, check
     248          # stderr manually.
     249          self.assertFalse(err)
     250          self.assertEqual(out.strip(), b"apple")
     251  
     252  
     253  class ESC[4;38;5;81mProcessPoolShutdownTest(ESC[4;38;5;149mExecutorShutdownTest):
     254      def test_processes_terminate(self):
     255          def acquire_lock(lock):
     256              lock.acquire()
     257  
     258          mp_context = self.get_context()
     259          if mp_context.get_start_method(allow_none=False) == "fork":
     260              # fork pre-spawns, not on demand.
     261              expected_num_processes = self.worker_count
     262          else:
     263              expected_num_processes = 3
     264  
     265          sem = mp_context.Semaphore(0)
     266          for _ in range(3):
     267              self.executor.submit(acquire_lock, sem)
     268          self.assertEqual(len(self.executor._processes), expected_num_processes)
     269          for _ in range(3):
     270              sem.release()
     271          processes = self.executor._processes
     272          self.executor.shutdown()
     273  
     274          for p in processes.values():
     275              p.join()
     276  
     277      def test_context_manager_shutdown(self):
     278          with futures.ProcessPoolExecutor(
     279                  max_workers=5, mp_context=self.get_context()) as e:
     280              processes = e._processes
     281              self.assertEqual(list(e.map(abs, range(-5, 5))),
     282                               [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
     283  
     284          for p in processes.values():
     285              p.join()
     286  
     287      def test_del_shutdown(self):
     288          executor = futures.ProcessPoolExecutor(
     289                  max_workers=5, mp_context=self.get_context())
     290          res = executor.map(abs, range(-5, 5))
     291          executor_manager_thread = executor._executor_manager_thread
     292          processes = executor._processes
     293          call_queue = executor._call_queue
     294          executor_manager_thread = executor._executor_manager_thread
     295          del executor
     296          support.gc_collect()  # For PyPy or other GCs.
     297  
     298          # Make sure that all the executor resources were properly cleaned by
     299          # the shutdown process
     300          executor_manager_thread.join()
     301          for p in processes.values():
     302              p.join()
     303          call_queue.join_thread()
     304  
     305          # Make sure the results were all computed before the
     306          # executor got shutdown.
     307          assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
     308  
     309      def test_shutdown_no_wait(self):
     310          # Ensure that the executor cleans up the processes when calling
     311          # shutdown with wait=False
     312          executor = futures.ProcessPoolExecutor(
     313                  max_workers=5, mp_context=self.get_context())
     314          res = executor.map(abs, range(-5, 5))
     315          processes = executor._processes
     316          call_queue = executor._call_queue
     317          executor_manager_thread = executor._executor_manager_thread
     318          executor.shutdown(wait=False)
     319  
     320          # Make sure that all the executor resources were properly cleaned by
     321          # the shutdown process
     322          executor_manager_thread.join()
     323          for p in processes.values():
     324              p.join()
     325          call_queue.join_thread()
     326  
     327          # Make sure the results were all computed before the executor got
     328          # shutdown.
     329          assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
     330  
     331  
# Register the process-pool variants of the shutdown tests.  The shared
# ProcessPoolShutdownTest body is paired with each start-method mixin (fork,
# forkserver, spawn); presumably create_executor_tests() injects one concrete
# test class per mixin into this module's globals() -- confirm against
# test.test_concurrent_futures.util.
create_executor_tests(globals(), ProcessPoolShutdownTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
     336  
     337  
def setUpModule():
    """unittest module fixture: delegate to the shared setup_module()."""
    setup_module()
     340  
     341  
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()