(root)/
Python-3.11.7/
Lib/
test/
test_concurrent_futures/
test_init.py
       1  import contextlib
       2  import logging
       3  import queue
       4  import time
       5  import unittest
       6  from concurrent.futures._base import BrokenExecutor
       7  from logging.handlers import QueueHandler
       8  
       9  from test import support
      10  
      11  from .util import ExecutorMixin, create_executor_tests, setup_module
      12  
      13  
      14  INITIALIZER_STATUS = 'uninitialized'
      15  
      16  def init(x):
      17      global INITIALIZER_STATUS
      18      INITIALIZER_STATUS = x
      19  
      20  def get_init_status():
      21      return INITIALIZER_STATUS
      22  
      23  def init_fail(log_queue=None):
      24      if log_queue is not None:
      25          logger = logging.getLogger('concurrent.futures')
      26          logger.addHandler(QueueHandler(log_queue))
      27          logger.setLevel('CRITICAL')
      28          logger.propagate = False
      29      time.sleep(0.1)  # let some futures be scheduled
      30      raise ValueError('error in initializer')
      31  
      32  
      33  class ESC[4;38;5;81mInitializerMixin(ESC[4;38;5;149mExecutorMixin):
      34      worker_count = 2
      35  
      36      def setUp(self):
      37          global INITIALIZER_STATUS
      38          INITIALIZER_STATUS = 'uninitialized'
      39          self.executor_kwargs = dict(initializer=init,
      40                                      initargs=('initialized',))
      41          super().setUp()
      42  
      43      def test_initializer(self):
      44          futures = [self.executor.submit(get_init_status)
      45                     for _ in range(self.worker_count)]
      46  
      47          for f in futures:
      48              self.assertEqual(f.result(), 'initialized')
      49  
      50  
      51  class ESC[4;38;5;81mFailingInitializerMixin(ESC[4;38;5;149mExecutorMixin):
      52      worker_count = 2
      53  
      54      def setUp(self):
      55          if hasattr(self, "ctx"):
      56              # Pass a queue to redirect the child's logging output
      57              self.mp_context = self.get_context()
      58              self.log_queue = self.mp_context.Queue()
      59              self.executor_kwargs = dict(initializer=init_fail,
      60                                          initargs=(self.log_queue,))
      61          else:
      62              # In a thread pool, the child shares our logging setup
      63              # (see _assert_logged())
      64              self.mp_context = None
      65              self.log_queue = None
      66              self.executor_kwargs = dict(initializer=init_fail)
      67          super().setUp()
      68  
      69      def test_initializer(self):
      70          with self._assert_logged('ValueError: error in initializer'):
      71              try:
      72                  future = self.executor.submit(get_init_status)
      73              except BrokenExecutor:
      74                  # Perhaps the executor is already broken
      75                  pass
      76              else:
      77                  with self.assertRaises(BrokenExecutor):
      78                      future.result()
      79  
      80              # At some point, the executor should break
      81              for _ in support.sleeping_retry(5, "executor not broken"):
      82                  if self.executor._broken:
      83                      break
      84  
      85              # ... and from this point submit() is guaranteed to fail
      86              with self.assertRaises(BrokenExecutor):
      87                  self.executor.submit(get_init_status)
      88  
      89      @contextlib.contextmanager
      90      def _assert_logged(self, msg):
      91          if self.log_queue is not None:
      92              yield
      93              output = []
      94              try:
      95                  while True:
      96                      output.append(self.log_queue.get_nowait().getMessage())
      97              except queue.Empty:
      98                  pass
      99          else:
     100              with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
     101                  yield
     102              output = cm.output
     103          self.assertTrue(any(msg in line for line in output),
     104                          output)
     105  
     106  
     107  create_executor_tests(globals(), InitializerMixin)  # presumably generates the per-executor test classes into this module; confirm in .util
     108  create_executor_tests(globals(), FailingInitializerMixin)  # same registration for the failing-initializer scenario
     109  
     110  
     111  def setUpModule():  # module-level fixture hook; unittest looks it up by this name
     112      setup_module()  # delegate to the shared helper imported from .util
     113  
     114  
     115  if __name__ == "__main__":  # only when the file is executed directly as a script
     116      unittest.main()  # run the tests through unittest's command-line entry point