(root)/
Python-3.11.7/
Lib/
test/
test_concurrent_futures/
test_process_pool.py
       1  import os
       2  import sys
       3  import time
       4  import unittest
       5  from concurrent import futures
       6  from concurrent.futures.process import BrokenProcessPool
       7  
       8  from test import support
       9  from test.support import hashlib_helper
      10  
      11  from .executor import ExecutorTest, mul
      12  from .util import (
      13      ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
      14      create_executor_tests, setup_module)
      15  
      16  
      17  class ESC[4;38;5;81mEventfulGCObj():
      18      def __init__(self, mgr):
      19          self.event = mgr.Event()
      20  
      21      def __del__(self):
      22          self.event.set()
      23  
      24  
      25  class ESC[4;38;5;81mProcessPoolExecutorTest(ESC[4;38;5;149mExecutorTest):
      26  
      27      @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
      28      def test_max_workers_too_large(self):
      29          with self.assertRaisesRegex(ValueError,
      30                                      "max_workers must be <= 61"):
      31              futures.ProcessPoolExecutor(max_workers=62)
      32  
      33      def test_killed_child(self):
      34          # When a child process is abruptly terminated, the whole pool gets
      35          # "broken".
      36          futures = [self.executor.submit(time.sleep, 3)]
      37          # Get one of the processes, and terminate (kill) it
      38          p = next(iter(self.executor._processes.values()))
      39          p.terminate()
      40          for fut in futures:
      41              self.assertRaises(BrokenProcessPool, fut.result)
      42          # Submitting other jobs fails as well.
      43          self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
      44  
      45      def test_map_chunksize(self):
      46          def bad_map():
      47              list(self.executor.map(pow, range(40), range(40), chunksize=-1))
      48  
      49          ref = list(map(pow, range(40), range(40)))
      50          self.assertEqual(
      51              list(self.executor.map(pow, range(40), range(40), chunksize=6)),
      52              ref)
      53          self.assertEqual(
      54              list(self.executor.map(pow, range(40), range(40), chunksize=50)),
      55              ref)
      56          self.assertEqual(
      57              list(self.executor.map(pow, range(40), range(40), chunksize=40)),
      58              ref)
      59          self.assertRaises(ValueError, bad_map)
      60  
      61      @classmethod
      62      def _test_traceback(cls):
      63          raise RuntimeError(123) # some comment
      64  
      65      def test_traceback(self):
      66          # We want ensure that the traceback from the child process is
      67          # contained in the traceback raised in the main process.
      68          future = self.executor.submit(self._test_traceback)
      69          with self.assertRaises(Exception) as cm:
      70              future.result()
      71  
      72          exc = cm.exception
      73          self.assertIs(type(exc), RuntimeError)
      74          self.assertEqual(exc.args, (123,))
      75          cause = exc.__cause__
      76          self.assertIs(type(cause), futures.process._RemoteTraceback)
      77          self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
      78  
      79          with support.captured_stderr() as f1:
      80              try:
      81                  raise exc
      82              except RuntimeError:
      83                  sys.excepthook(*sys.exc_info())
      84          self.assertIn('raise RuntimeError(123) # some comment',
      85                        f1.getvalue())
      86  
      87      @hashlib_helper.requires_hashdigest('md5')
      88      def test_ressources_gced_in_workers(self):
      89          # Ensure that argument for a job are correctly gc-ed after the job
      90          # is finished
      91          mgr = self.get_context().Manager()
      92          obj = EventfulGCObj(mgr)
      93          future = self.executor.submit(id, obj)
      94          future.result()
      95  
      96          self.assertTrue(obj.event.wait(timeout=1))
      97  
      98          # explicitly destroy the object to ensure that EventfulGCObj.__del__()
      99          # is called while manager is still running.
     100          obj = None
     101          support.gc_collect()
     102  
     103          mgr.shutdown()
     104          mgr.join()
     105  
     106      def test_saturation(self):
     107          executor = self.executor
     108          mp_context = self.get_context()
     109          sem = mp_context.Semaphore(0)
     110          job_count = 15 * executor._max_workers
     111          for _ in range(job_count):
     112              executor.submit(sem.acquire)
     113          self.assertEqual(len(executor._processes), executor._max_workers)
     114          for _ in range(job_count):
     115              sem.release()
     116  
     117      def test_idle_process_reuse_one(self):
     118          executor = self.executor
     119          assert executor._max_workers >= 4
     120          if self.get_context().get_start_method(allow_none=False) == "fork":
     121              raise unittest.SkipTest("Incompatible with the fork start method.")
     122          executor.submit(mul, 21, 2).result()
     123          executor.submit(mul, 6, 7).result()
     124          executor.submit(mul, 3, 14).result()
     125          self.assertEqual(len(executor._processes), 1)
     126  
     127      def test_idle_process_reuse_multiple(self):
     128          executor = self.executor
     129          assert executor._max_workers <= 5
     130          if self.get_context().get_start_method(allow_none=False) == "fork":
     131              raise unittest.SkipTest("Incompatible with the fork start method.")
     132          executor.submit(mul, 12, 7).result()
     133          executor.submit(mul, 33, 25)
     134          executor.submit(mul, 25, 26).result()
     135          executor.submit(mul, 18, 29)
     136          executor.submit(mul, 1, 2).result()
     137          executor.submit(mul, 0, 9)
     138          self.assertLessEqual(len(executor._processes), 3)
     139          executor.shutdown()
     140  
     141      def test_max_tasks_per_child(self):
     142          context = self.get_context()
     143          if context.get_start_method(allow_none=False) == "fork":
     144              with self.assertRaises(ValueError):
     145                  self.executor_type(1, mp_context=context, max_tasks_per_child=3)
     146              return
     147          # not using self.executor as we need to control construction.
     148          # arguably this could go in another class w/o that mixin.
     149          executor = self.executor_type(
     150                  1, mp_context=context, max_tasks_per_child=3)
     151          f1 = executor.submit(os.getpid)
     152          original_pid = f1.result()
     153          # The worker pid remains the same as the worker could be reused
     154          f2 = executor.submit(os.getpid)
     155          self.assertEqual(f2.result(), original_pid)
     156          self.assertEqual(len(executor._processes), 1)
     157          f3 = executor.submit(os.getpid)
     158          self.assertEqual(f3.result(), original_pid)
     159  
     160          # A new worker is spawned, with a statistically different pid,
     161          # while the previous was reaped.
     162          f4 = executor.submit(os.getpid)
     163          new_pid = f4.result()
     164          self.assertNotEqual(original_pid, new_pid)
     165          self.assertEqual(len(executor._processes), 1)
     166  
     167          executor.shutdown()
     168  
     169      def test_max_tasks_per_child_defaults_to_spawn_context(self):
     170          # not using self.executor as we need to control construction.
     171          # arguably this could go in another class w/o that mixin.
     172          executor = self.executor_type(1, max_tasks_per_child=3)
     173          self.assertEqual(executor._mp_context.get_start_method(), "spawn")
     174  
     175      def test_max_tasks_early_shutdown(self):
     176          context = self.get_context()
     177          if context.get_start_method(allow_none=False) == "fork":
     178              raise unittest.SkipTest("Incompatible with the fork start method.")
     179          # not using self.executor as we need to control construction.
     180          # arguably this could go in another class w/o that mixin.
     181          executor = self.executor_type(
     182                  3, mp_context=context, max_tasks_per_child=1)
     183          futures = []
     184          for i in range(6):
     185              futures.append(executor.submit(mul, i, i))
     186          executor.shutdown()
     187          for i, future in enumerate(futures):
     188              self.assertEqual(future.result(), mul(i, i))
     189  
     190  
     191  create_executor_tests(globals(), ProcessPoolExecutorTest,
     192                        executor_mixins=(ProcessPoolForkMixin,
     193                                         ProcessPoolForkserverMixin,
     194                                         ProcessPoolSpawnMixin))
     195  
     196  
     197  def setUpModule():
     198      setup_module()
     199  
     200  
     201  if __name__ == "__main__":
     202      unittest.main()