1  import contextlib
       2  import sys
       3  import time
       4  import unittest
       5  from pickle import PicklingError
       6  from concurrent import futures
       7  from concurrent.futures.process import BrokenProcessPool
       8  
       9  from test import support
      10  
      11  from .util import (
      12      create_executor_tests, setup_module,
      13      ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)
      14  
      15  
      16  def _crash(delay=None):
      17      """Induces a segfault."""
      18      if delay:
      19          time.sleep(delay)
      20      import faulthandler
      21      faulthandler.disable()
      22      faulthandler._sigsegv()
      23  
      24  
      25  def _crash_with_data(data):
      26      """Induces a segfault with dummy data in input."""
      27      _crash()
      28  
      29  
      30  def _exit():
      31      """Induces a sys exit with exitcode 1."""
      32      sys.exit(1)
      33  
      34  
      35  def _raise_error(Err):
      36      """Function that raises an Exception in process."""
      37      raise Err()
      38  
      39  
      40  def _raise_error_ignore_stderr(Err):
      41      """Function that raises an Exception in process and ignores stderr."""
      42      import io
      43      sys.stderr = io.StringIO()
      44      raise Err()
      45  
      46  
      47  def _return_instance(cls):
      48      """Function that returns a instance of cls."""
      49      return cls()
      50  
      51  
      52  class ESC[4;38;5;81mCrashAtPickle(ESC[4;38;5;149mobject):
      53      """Bad object that triggers a segfault at pickling time."""
      54      def __reduce__(self):
      55          _crash()
      56  
      57  
      58  class ESC[4;38;5;81mCrashAtUnpickle(ESC[4;38;5;149mobject):
      59      """Bad object that triggers a segfault at unpickling time."""
      60      def __reduce__(self):
      61          return _crash, ()
      62  
      63  
      64  class ESC[4;38;5;81mExitAtPickle(ESC[4;38;5;149mobject):
      65      """Bad object that triggers a process exit at pickling time."""
      66      def __reduce__(self):
      67          _exit()
      68  
      69  
      70  class ESC[4;38;5;81mExitAtUnpickle(ESC[4;38;5;149mobject):
      71      """Bad object that triggers a process exit at unpickling time."""
      72      def __reduce__(self):
      73          return _exit, ()
      74  
      75  
      76  class ESC[4;38;5;81mErrorAtPickle(ESC[4;38;5;149mobject):
      77      """Bad object that triggers an error at pickling time."""
      78      def __reduce__(self):
      79          from pickle import PicklingError
      80          raise PicklingError("Error in pickle")
      81  
      82  
      83  class ESC[4;38;5;81mErrorAtUnpickle(ESC[4;38;5;149mobject):
      84      """Bad object that triggers an error at unpickling time."""
      85      def __reduce__(self):
      86          from pickle import UnpicklingError
      87          return _raise_error_ignore_stderr, (UnpicklingError, )
      88  
      89  
class ExecutorDeadlockTest:
    """Regression tests: worker crashes/exits must break the pool, not deadlock it.

    Mixed into concrete TestCases per start method (fork/forkserver/spawn);
    the mixins provide ``executor_type``, ``get_context`` and ``self.executor``.
    Each test drives a fresh ProcessPoolExecutor into a failure mode at a
    specific stage of the task life-cycle (pickle, unpickle, execution) and
    asserts the expected exception surfaces within TIMEOUT seconds.
    """
    # Upper bound on how long a single future may take before we declare the
    # executor deadlocked.
    TIMEOUT = support.SHORT_TIMEOUT

    def _fail_on_deadlock(self, executor):
        """Force-clean a deadlocked *executor* and fail with all thread tracebacks."""
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean all its
        # components.
        import faulthandler
        from tempfile import TemporaryFile
        # Dump every thread's traceback into a temp file so it can be embedded
        # in the failure message.
        with TemporaryFile(mode="w+") as f:
            faulthandler.dump_traceback(file=f)
            f.seek(0)
            tb = f.read()
        # Kill the worker processes so shutdown() below cannot block on them.
        for p in executor._processes.values():
            p.terminate()
        # This should be safe to call executor.shutdown here as all possible
        # deadlocks should have been broken.
        executor.shutdown(wait=True)
        # sys.__stderr__ is the original stderr, untouched by any redirection
        # a test may have installed.
        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
        self.fail(f"Executor deadlock:\n\n{tb}")


    def _check_crash(self, error, func, *args, ignore_stderr=False):
        """Submit ``func(*args)`` on a fresh 2-worker pool and expect *error*.

        If the future does not resolve within TIMEOUT, the pool is treated as
        deadlocked and the test fails via _fail_on_deadlock.  With
        ``ignore_stderr=True`` the failing task's stderr output is swallowed.
        """
        # test for deadlock caused by crashes in a pool
        # Shut down the mixin-provided executor first so its workers do not
        # interfere with this test's dedicated pool.
        self.executor.shutdown(wait=True)

        executor = self.executor_type(
            max_workers=2, mp_context=self.get_context())
        res = executor.submit(func, *args)

        if ignore_stderr:
            cm = support.captured_stderr()
        else:
            cm = contextlib.nullcontext()

        try:
            with self.assertRaises(error):
                with cm:
                    res.result(timeout=self.TIMEOUT)
        except futures.TimeoutError:
            # If we did not recover before TIMEOUT seconds,
            # consider that the executor is in a deadlock state
            self._fail_on_deadlock(executor)
        executor.shutdown(wait=True)

    def test_error_at_task_pickle(self):
        # Check problem occurring while pickling a task in
        # the task_handler thread
        self._check_crash(PicklingError, id, ErrorAtPickle())

    def test_exit_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())

    def test_error_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())

    def test_crash_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())

    def test_crash_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(BrokenProcessPool, _crash)

    def test_exit_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(SystemExit, _exit)

    def test_error_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(RuntimeError, _raise_error, RuntimeError)

    def test_crash_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)

    def test_exit_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(SystemExit, _return_instance, ExitAtPickle)

    def test_error_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(PicklingError, _return_instance, ErrorAtPickle)

    def test_error_during_result_unpickle_in_result_handler(self):
        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(BrokenProcessPool,
                          _return_instance, ErrorAtUnpickle,
                          ignore_stderr=True)

    def test_exit_during_result_unpickle_in_result_handler(self):
        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)

    def test_shutdown_deadlock(self):
        # Test that the pool calling shutdown do not cause deadlock
        # if a worker fails after the shutdown call.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            # The 0.1s delay lets shutdown() start before the worker crashes.
            f = executor.submit(_crash, delay=.1)
            executor.shutdown(wait=True)
            with self.assertRaises(BrokenProcessPool):
                f.result()

    def test_shutdown_deadlock_pickle(self):
        # Test that the pool calling shutdown with wait=False does not cause
        # a deadlock if a task fails at pickle after the shutdown call.
        # Reported in bpo-39104.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock

            # Start the executor and get the executor_manager_thread to collect
            # the threads and avoid dangling thread that should be cleaned up
            # asynchronously.
            executor.submit(id, 42).result()
            executor_manager = executor._executor_manager_thread

            # Submit a task that fails at pickle and shutdown the executor
            # without waiting
            f = executor.submit(id, ErrorAtPickle())
            executor.shutdown(wait=False)
            with self.assertRaises(PicklingError):
                f.result()

        # Make sure the executor is eventually shutdown and do not leave
        # dangling threads
        executor_manager.join()

    def test_crash_big_data(self):
        # Test that there is a clean exception instead of a deadlock when a
        # child process crashes while some data is being written into the
        # queue.
        # https://github.com/python/cpython/issues/94777
        self.executor.shutdown(wait=True)
        # Payload larger than the pipe buffer so the write is still in flight
        # when the worker segfaults.
        data = "a" * support.PIPE_MAX_SIZE
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            with self.assertRaises(BrokenProcessPool):
                list(executor.map(_crash_with_data, [data] * 10))
     241  
     242  
# Generate concrete TestCase classes from ExecutorDeadlockTest, one per
# multiprocessing start method (fork, forkserver, spawn), into this module's
# namespace so unittest discovery finds them.
create_executor_tests(globals(), ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
     247  
def setUpModule():
    """unittest module-level hook: run shared multiprocessing test setup."""
    setup_module()
     250  
     251  
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()