(root)/
Python-3.11.7/
Lib/
test/
test_concurrent_futures/
util.py
       1  import multiprocessing
       2  import sys
       3  import time
       4  import unittest
       5  from concurrent import futures
       6  from concurrent.futures._base import (
       7      PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
       8      )
       9  from concurrent.futures.process import _check_system_limits
      10  
      11  from test import support
      12  from test.support import threading_helper
      13  
      14  
      15  def create_future(state=PENDING, exception=None, result=None):
      16      f = Future()
      17      f._state = state
      18      f._exception = exception
      19      f._result = result
      20      return f
      21  
      22  
      23  PENDING_FUTURE = create_future(state=PENDING)
      24  RUNNING_FUTURE = create_future(state=RUNNING)
      25  CANCELLED_FUTURE = create_future(state=CANCELLED)
      26  CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
      27  EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
      28  SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
      29  
      30  
      31  class ESC[4;38;5;81mBaseTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      32      def setUp(self):
      33          self._thread_key = threading_helper.threading_setup()
      34  
      35      def tearDown(self):
      36          support.reap_children()
      37          threading_helper.threading_cleanup(*self._thread_key)
      38  
      39  
      40  class ESC[4;38;5;81mExecutorMixin:
      41      worker_count = 5
      42      executor_kwargs = {}
      43  
      44      def setUp(self):
      45          super().setUp()
      46  
      47          self.t1 = time.monotonic()
      48          if hasattr(self, "ctx"):
      49              self.executor = self.executor_type(
      50                  max_workers=self.worker_count,
      51                  mp_context=self.get_context(),
      52                  **self.executor_kwargs)
      53          else:
      54              self.executor = self.executor_type(
      55                  max_workers=self.worker_count,
      56                  **self.executor_kwargs)
      57  
      58      def tearDown(self):
      59          self.executor.shutdown(wait=True)
      60          self.executor = None
      61  
      62          dt = time.monotonic() - self.t1
      63          if support.verbose:
      64              print("%.2fs" % dt, end=' ')
      65          self.assertLess(dt, 300, "synchronization issue: test lasted too long")
      66  
      67          super().tearDown()
      68  
      69      def get_context(self):
      70          return multiprocessing.get_context(self.ctx)
      71  
      72  
      73  class ESC[4;38;5;81mThreadPoolMixin(ESC[4;38;5;149mExecutorMixin):
      74      executor_type = futures.ThreadPoolExecutor
      75  
      76  
      77  class ESC[4;38;5;81mProcessPoolForkMixin(ESC[4;38;5;149mExecutorMixin):
      78      executor_type = futures.ProcessPoolExecutor
      79      ctx = "fork"
      80  
      81      def get_context(self):
      82          try:
      83              _check_system_limits()
      84          except NotImplementedError:
      85              self.skipTest("ProcessPoolExecutor unavailable on this system")
      86          if sys.platform == "win32":
      87              self.skipTest("require unix system")
      88          return super().get_context()
      89  
      90  
      91  class ESC[4;38;5;81mProcessPoolSpawnMixin(ESC[4;38;5;149mExecutorMixin):
      92      executor_type = futures.ProcessPoolExecutor
      93      ctx = "spawn"
      94  
      95      def get_context(self):
      96          try:
      97              _check_system_limits()
      98          except NotImplementedError:
      99              self.skipTest("ProcessPoolExecutor unavailable on this system")
     100          return super().get_context()
     101  
     102  
     103  class ESC[4;38;5;81mProcessPoolForkserverMixin(ESC[4;38;5;149mExecutorMixin):
     104      executor_type = futures.ProcessPoolExecutor
     105      ctx = "forkserver"
     106  
     107      def get_context(self):
     108          try:
     109              _check_system_limits()
     110          except NotImplementedError:
     111              self.skipTest("ProcessPoolExecutor unavailable on this system")
     112          if sys.platform == "win32":
     113              self.skipTest("require unix system")
     114          return super().get_context()
     115  
     116  
     117  def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,),
     118                            executor_mixins=(ThreadPoolMixin,
     119                                             ProcessPoolForkMixin,
     120                                             ProcessPoolForkserverMixin,
     121                                             ProcessPoolSpawnMixin)):
     122      def strip_mixin(name):
     123          if name.endswith(('Mixin', 'Tests')):
     124              return name[:-5]
     125          elif name.endswith('Test'):
     126              return name[:-4]
     127          else:
     128              return name
     129  
     130      module = remote_globals['__name__']
     131      for exe in executor_mixins:
     132          name = ("%s%sTest"
     133                  % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__)))
     134          cls = type(name, (mixin,) + (exe,) + bases, {'__module__': module})
     135          remote_globals[name] = cls
     136  
     137  
     138  def setup_module():
     139      unittest.addModuleCleanup(multiprocessing.util._cleanup_tests)
     140      thread_info = threading_helper.threading_setup()
     141      unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)