Python-3.11.7/Lib/test/test_concurrent_futures/test_deadlock.py
import contextlib
import queue
import signal
import sys
import time
import unittest
import unittest.mock
from pickle import PicklingError
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup
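# _ThreadWakeup is the private pipe-based object the executor manager
# thread blocks on; MockWakeup below subclasses it to simulate a full
# wakeup pipe.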

from test import support

from .util import (
    create_executor_tests, setup_module,
    ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)


def _crash(delay=None):
    """Induces a segfault."""
    if delay:
        time.sleep(delay)
    import faulthandler
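    # Disable faulthandler first so the induced SIGSEGV kills the process
    # without dumping a traceback to stderr; _sigsegv() is faulthandler's
    # private helper that triggers a real segmentation fault.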
    faulthandler.disable()
    faulthandler._sigsegv()


def _crash_with_data(data):
    """Induces a segfault with dummy data as input."""
    _crash()


def _exit():
    """Induces a sys exit with exit code 1."""
    sys.exit(1)


def _raise_error(Err):
    """Function that raises an Exception in the process."""
    raise Err()


def _raise_error_ignore_stderr(Err):
    """Function that raises an Exception in the process and ignores stderr."""
    import io
    sys.stderr = io.StringIO()
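    # Silence stderr in the process raising the error so the expected
    # traceback does not pollute the test output.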
    raise Err()


def _return_instance(cls):
    """Function that returns an instance of cls."""
    return cls()


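# The classes below exploit __reduce__ to control *when* the failure
# happens: raising or crashing inside __reduce__ itself fails while the
# object is being pickled (on the sending side), whereas returning a
# (callable, args) pair defers the failure until the object is unpickled
# (on the receiving side).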
class CrashAtPickle(object):
    """Bad object that triggers a segfault at pickling time."""
    def __reduce__(self):
        _crash()


class CrashAtUnpickle(object):
    """Bad object that triggers a segfault at unpickling time."""
    def __reduce__(self):
        return _crash, ()


class ExitAtPickle(object):
    """Bad object that triggers a process exit at pickling time."""
    def __reduce__(self):
        _exit()


class ExitAtUnpickle(object):
    """Bad object that triggers a process exit at unpickling time."""
    def __reduce__(self):
        return _exit, ()


class ErrorAtPickle(object):
    """Bad object that triggers an error at pickling time."""
    def __reduce__(self):
        from pickle import PicklingError
        raise PicklingError("Error in pickle")


class ErrorAtUnpickle(object):
    """Bad object that triggers an error at unpickling time."""
    def __reduce__(self):
        from pickle import UnpicklingError
        return _raise_error_ignore_stderr, (UnpicklingError,)


class ExecutorDeadlockTest:
    TIMEOUT = support.LONG_TIMEOUT

    def _fail_on_deadlock(self, executor):
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean all its
        # components.
        import faulthandler
        from tempfile import TemporaryFile
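        # Dump the stacks of all threads to a temporary file so the test
        # failure message shows where the executor got stuck.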
        with TemporaryFile(mode="w+") as f:
            faulthandler.dump_traceback(file=f)
            f.seek(0)
            tb = f.read()
        for p in executor._processes.values():
            p.terminate()
        # It should be safe to call executor.shutdown() here as all
        # possible deadlocks should have been broken.
        executor.shutdown(wait=True)
        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
        self.fail(f"Executor deadlock:\n\n{tb}")

    def _check_crash(self, error, func, *args, ignore_stderr=False):
        # Test for a deadlock caused by crashes in a pool.
        self.executor.shutdown(wait=True)

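        # Use a fresh two-worker executor (the mixin's default executor
        # was shut down above) so each check starts from a known state.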
        executor = self.executor_type(
            max_workers=2, mp_context=self.get_context())
        res = executor.submit(func, *args)

        if ignore_stderr:
            cm = support.captured_stderr()
        else:
            cm = contextlib.nullcontext()

        try:
            with self.assertRaises(error):
                with cm:
                    res.result(timeout=self.TIMEOUT)
        except futures.TimeoutError:
            # If we did not recover before TIMEOUT seconds,
            # consider that the executor is in a deadlock state
            self._fail_on_deadlock(executor)
        executor.shutdown(wait=True)

    def test_error_at_task_pickle(self):
        # Check problem occurring while pickling a task in
        # the task_handler thread
        self._check_crash(PicklingError, id, ErrorAtPickle())

    def test_exit_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())

    def test_error_at_task_unpickle(self):
        # gh-109832: Restore stderr overridden by _raise_error_ignore_stderr()
        self.addCleanup(setattr, sys, 'stderr', sys.stderr)

        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())

    def test_crash_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())

    def test_crash_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(BrokenProcessPool, _crash)

    def test_exit_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(SystemExit, _exit)

    def test_error_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(RuntimeError, _raise_error, RuntimeError)

    def test_crash_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)

    def test_exit_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(SystemExit, _return_instance, ExitAtPickle)

    def test_error_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(PicklingError, _return_instance, ErrorAtPickle)

    def test_error_during_result_unpickle_in_result_handler(self):
        # gh-109832: Restore stderr overridden by _raise_error_ignore_stderr()
        self.addCleanup(setattr, sys, 'stderr', sys.stderr)

        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(BrokenProcessPool,
                          _return_instance, ErrorAtUnpickle,
                          ignore_stderr=True)

    def test_exit_during_result_unpickle_in_result_handler(self):
        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)

    def test_shutdown_deadlock(self):
        # Test that calling shutdown on the pool does not cause a deadlock
        # if a worker fails after the shutdown call.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            f = executor.submit(_crash, delay=.1)
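            # The 0.1s delay gives shutdown() time to be called before the
            # worker actually segfaults.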
            executor.shutdown(wait=True)
            with self.assertRaises(BrokenProcessPool):
                f.result()

    def test_shutdown_deadlock_pickle(self):
        # Test that calling shutdown with wait=False on the pool does not
        # cause a deadlock if a task fails at pickle after the shutdown
        # call. Reported in bpo-39104.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock

            # Start the executor and get the executor_manager_thread to
            # collect the threads and avoid a dangling thread that should
            # be cleaned up asynchronously.
            executor.submit(id, 42).result()
            executor_manager = executor._executor_manager_thread

            # Submit a task that fails at pickle and shutdown the executor
            # without waiting
            f = executor.submit(id, ErrorAtPickle())
            executor.shutdown(wait=False)
            with self.assertRaises(PicklingError):
                f.result()

        # Make sure the executor is eventually shut down and does not
        # leave dangling threads
        executor_manager.join()

    def test_crash_big_data(self):
        # Test that there is a clean exception instead of a deadlock when
        # a child process crashes while some data is being written into
        # the queue.
        # https://github.com/python/cpython/issues/94777
        self.executor.shutdown(wait=True)
        data = "a" * support.PIPE_MAX_SIZE
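        # support.PIPE_MAX_SIZE is chosen to be larger than the underlying
        # OS pipe buffer, so sending the payload blocks mid-transfer while
        # the worker crashes.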
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            with self.assertRaises(BrokenProcessPool):
                list(executor.map(_crash_with_data, [data] * 10))

    def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
        # Issue #105829: The _ExecutorManagerThread wakeup pipe could
        # fill up and block. See: https://github.com/python/cpython/issues/105829

        # Lots of cargo culting while writing this test, apologies if
        # something is really stupid...

        self.executor.shutdown(wait=True)

        if not hasattr(signal, 'alarm'):
            raise unittest.SkipTest(
                "Tested platform does not support the alarm signal")

        def timeout(_signum, _frame):
            import faulthandler
            faulthandler.dump_traceback()

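            # The handler runs in the main thread, so raising here aborts
            # whatever call the main thread was blocked in.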
            raise RuntimeError("timed out while submitting jobs?")

        thread_run = futures.process._ExecutorManagerThread.run
        def mock_run(self):
            # Delay thread startup so the wakeup pipe can fill up and block
            time.sleep(3)
            thread_run(self)

        class MockWakeup(_ThreadWakeup):
            """Mock wakeup object to force the wakeup to block"""
            def __init__(self):
                super().__init__()
                self._dummy_queue = queue.Queue(maxsize=1)
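                # With maxsize=1, a second wakeup() call blocks until
                # clear() drains the queue, emulating a full wakeup pipe.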

            def wakeup(self):
                self._dummy_queue.put(None, block=True)
                super().wakeup()

            def clear(self):
                super().clear()
                try:
                    while True:
                        self._dummy_queue.get_nowait()
                except queue.Empty:
                    pass

        with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
                                         'run', mock_run),
              unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
                                  MockWakeup)):
            with self.executor_type(max_workers=2,
                                    mp_context=self.get_context()) as executor:
                self.executor = executor  # Allow clean up in fail_on_deadlock

                job_num = 100
                job_data = range(job_num)

                # Need to use SIGALRM for timeout detection because
                # Executor.submit is not guarded by any timeout (both
                # self._work_ids.put(self._queue_count) and
                # self._executor_manager_thread_wakeup.wakeup() might
                # block, maybe more?). In this specific case it was
                # the wakeup call that deadlocked on a blocking pipe.
                old_handler = signal.signal(signal.SIGALRM, timeout)
                try:
                    signal.alarm(int(self.TIMEOUT))
                    self.assertEqual(job_num, len(list(executor.map(int, job_data))))
                finally:
                    signal.alarm(0)
                    signal.signal(signal.SIGALRM, old_handler)


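# Generate one concrete TestCase per start method (fork, forkserver,
# spawn) by combining ExecutorDeadlockTest with each ProcessPool mixin.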
create_executor_tests(globals(), ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))

def setUpModule():
    setup_module()


if __name__ == "__main__":
    unittest.main()