(root)/
Python-3.11.7/
Lib/
test/
test_multiprocessing_main_handling.py
       1  # tests __main__ module handling in multiprocessing
       2  from test import support
       3  from test.support import import_helper
       4  # Skip tests if _multiprocessing wasn't built.
       5  import_helper.import_module('_multiprocessing')
       6  
       7  import importlib
       8  import importlib.machinery
       9  import unittest
      10  import sys
      11  import os
      12  import os.path
      13  import py_compile
      14  
      15  from test.support import os_helper
      16  from test.support.script_helper import (
      17      make_pkg, make_script, make_zip_pkg, make_zip_script,
      18      assert_python_ok)
      19  
# Skip the whole module for PGO (profile-guided optimization) runs.
if support.PGO:
    raise unittest.SkipTest("test is not helpful for PGO")

# Look up which start methods are available to test.
# Each test case's setUp() checks its start_method against this set and
# skips the test when the method is missing.
import multiprocessing
AVAILABLE_START_METHODS = set(multiprocessing.get_all_start_methods())

# Issue #22332: Skip tests if sem_open implementation is broken.
support.skip_if_broken_multiprocessing_synchronize()

# Module-level alias; _check_output() echoes the subprocess output when
# this is greater than 1.
verbose = support.verbose
      31  
# Source of the child script used by default by the script-building helpers
# in this file.  The script takes the start method as sys.argv[1], squares
# [1, 2, 3] in a Pool using a function defined in __main__, and prints
# "<start_method> -> <sorted results>".  The text is written to disk
# verbatim, so every character below is part of the test's behavior.
test_source = """\
# multiprocessing includes all sorts of shenanigans to make __main__
# attributes accessible in the subprocess in a pickle compatible way.

# We run the "doesn't work in the interactive interpreter" example from
# the docs to make sure it *does* work from an executed __main__,
# regardless of the invocation mechanism

import sys
import time
from multiprocessing import Pool, set_start_method
from test import support

# We use this __main__ defined function in the map call below in order to
# check that multiprocessing in correctly running the unguarded
# code in child processes and then making it available as __main__
def f(x):
    return x*x

# Check explicit relative imports
if "check_sibling" in __file__:
    # We're inside a package and not in a __main__.py file
    # so make sure explicit relative imports work correctly
    from . import sibling

if __name__ == '__main__':
    start_method = sys.argv[1]
    set_start_method(start_method)
    results = []
    with Pool(5) as pool:
        pool.map_async(f, [1, 2, 3], callback=results.extend)

        # up to 1 min to report the results
        for _ in support.sleeping_retry(60, "Timed out waiting for results"):
            if results:
                break

    results.sort()
    print(start_method, "->", results)

    pool.join()
"""
      74  
# Alternative child-script source with no "if __name__ == '__main__'" guard:
# it raises RuntimeError if it is ever executed under another module name.
# It maps the builtin int over [1, 4, 9], so it prints the same
# "<start_method> -> [1, 4, 9]" line as test_source without needing a
# function defined in __main__.  The text is written to disk verbatim.
test_source_main_skipped_in_children = """\
# __main__.py files have an implied "if __name__ == '__main__'" so
# multiprocessing should always skip running them in child processes

# This means we can't use __main__ defined functions in child processes,
# so we just use "int" as a passthrough operation below

if __name__ != "__main__":
    raise RuntimeError("Should only be called as __main__!")

import sys
import time
from multiprocessing import Pool, set_start_method
from test import support

start_method = sys.argv[1]
set_start_method(start_method)
results = []
with Pool(5) as pool:
    pool.map_async(int, [1, 4, 9], callback=results.extend)
    # up to 1 min to report the results
    for _ in support.sleeping_retry(60, "Timed out waiting for results"):
        if results:
            break

results.sort()
print(start_method, "->", results)

pool.join()
"""
     105  
     106  # These helpers were copied from test_cmd_line_script & tweaked a bit...
     107  
     108  def _make_test_script(script_dir, script_basename,
     109                        source=test_source, omit_suffix=False):
     110      to_return = make_script(script_dir, script_basename,
     111                              source, omit_suffix)
     112      # Hack to check explicit relative imports
     113      if script_basename == "check_sibling":
     114          make_script(script_dir, "sibling", "")
     115      importlib.invalidate_caches()
     116      return to_return
     117  
     118  def _make_test_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
     119                         source=test_source, depth=1):
     120      to_return = make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
     121                               source, depth)
     122      importlib.invalidate_caches()
     123      return to_return
     124  
     125  # There's no easy way to pass the script directory in to get
     126  # -m to work (avoiding that is the whole point of making
     127  # directories and zipfiles executable!)
     128  # So we fake it for testing purposes with a custom launch script
# Template for the launch script; filled in by _make_launch_script() with
# the % operator.  The %s slot receives a Python *expression* for the path
# to prepend to sys.path (either a repr()'d literal or the text
# "os.path.dirname(__file__)"); the %r slot receives the module name.
launch_source = """\
import sys, os.path, runpy
sys.path.insert(0, %s)
runpy._run_module_as_main(%r)
"""
     134  
     135  def _make_launch_script(script_dir, script_basename, module_name, path=None):
     136      if path is None:
     137          path = "os.path.dirname(__file__)"
     138      else:
     139          path = repr(path)
     140      source = launch_source % (path, module_name)
     141      to_return = make_script(script_dir, script_basename, source)
     142      importlib.invalidate_caches()
     143      return to_return
     144  
     145  class ESC[4;38;5;81mMultiProcessingCmdLineMixin():
     146      maxDiff = None # Show full tracebacks on subprocess failure
     147  
     148      def setUp(self):
     149          if self.start_method not in AVAILABLE_START_METHODS:
     150              self.skipTest("%r start method not available" % self.start_method)
     151  
     152      def _check_output(self, script_name, exit_code, out, err):
     153          if verbose > 1:
     154              print("Output from test script %r:" % script_name)
     155              print(repr(out))
     156          self.assertEqual(exit_code, 0)
     157          self.assertEqual(err.decode('utf-8'), '')
     158          expected_results = "%s -> [1, 4, 9]" % self.start_method
     159          self.assertEqual(out.decode('utf-8').strip(), expected_results)
     160  
     161      def _check_script(self, script_name, *cmd_line_switches):
     162          if not __debug__:
     163              cmd_line_switches += ('-' + 'O' * sys.flags.optimize,)
     164          run_args = cmd_line_switches + (script_name, self.start_method)
     165          rc, out, err = assert_python_ok(*run_args, __isolated=False)
     166          self._check_output(script_name, rc, out, err)
     167  
     168      def test_basic_script(self):
     169          with os_helper.temp_dir() as script_dir:
     170              script_name = _make_test_script(script_dir, 'script')
     171              self._check_script(script_name)
     172  
     173      def test_basic_script_no_suffix(self):
     174          with os_helper.temp_dir() as script_dir:
     175              script_name = _make_test_script(script_dir, 'script',
     176                                              omit_suffix=True)
     177              self._check_script(script_name)
     178  
     179      def test_ipython_workaround(self):
     180          # Some versions of the IPython launch script are missing the
     181          # __name__ = "__main__" guard, and multiprocessing has long had
     182          # a workaround for that case
     183          # See https://github.com/ipython/ipython/issues/4698
     184          source = test_source_main_skipped_in_children
     185          with os_helper.temp_dir() as script_dir:
     186              script_name = _make_test_script(script_dir, 'ipython',
     187                                              source=source)
     188              self._check_script(script_name)
     189              script_no_suffix = _make_test_script(script_dir, 'ipython',
     190                                                   source=source,
     191                                                   omit_suffix=True)
     192              self._check_script(script_no_suffix)
     193  
     194      def test_script_compiled(self):
     195          with os_helper.temp_dir() as script_dir:
     196              script_name = _make_test_script(script_dir, 'script')
     197              py_compile.compile(script_name, doraise=True)
     198              os.remove(script_name)
     199              pyc_file = import_helper.make_legacy_pyc(script_name)
     200              self._check_script(pyc_file)
     201  
     202      def test_directory(self):
     203          source = self.main_in_children_source
     204          with os_helper.temp_dir() as script_dir:
     205              script_name = _make_test_script(script_dir, '__main__',
     206                                              source=source)
     207              self._check_script(script_dir)
     208  
     209      def test_directory_compiled(self):
     210          source = self.main_in_children_source
     211          with os_helper.temp_dir() as script_dir:
     212              script_name = _make_test_script(script_dir, '__main__',
     213                                              source=source)
     214              py_compile.compile(script_name, doraise=True)
     215              os.remove(script_name)
     216              pyc_file = import_helper.make_legacy_pyc(script_name)
     217              self._check_script(script_dir)
     218  
     219      def test_zipfile(self):
     220          source = self.main_in_children_source
     221          with os_helper.temp_dir() as script_dir:
     222              script_name = _make_test_script(script_dir, '__main__',
     223                                              source=source)
     224              zip_name, run_name = make_zip_script(script_dir, 'test_zip', script_name)
     225              self._check_script(zip_name)
     226  
     227      def test_zipfile_compiled(self):
     228          source = self.main_in_children_source
     229          with os_helper.temp_dir() as script_dir:
     230              script_name = _make_test_script(script_dir, '__main__',
     231                                              source=source)
     232              compiled_name = py_compile.compile(script_name, doraise=True)
     233              zip_name, run_name = make_zip_script(script_dir, 'test_zip', compiled_name)
     234              self._check_script(zip_name)
     235  
     236      def test_module_in_package(self):
     237          with os_helper.temp_dir() as script_dir:
     238              pkg_dir = os.path.join(script_dir, 'test_pkg')
     239              make_pkg(pkg_dir)
     240              script_name = _make_test_script(pkg_dir, 'check_sibling')
     241              launch_name = _make_launch_script(script_dir, 'launch',
     242                                                'test_pkg.check_sibling')
     243              self._check_script(launch_name)
     244  
     245      def test_module_in_package_in_zipfile(self):
     246          with os_helper.temp_dir() as script_dir:
     247              zip_name, run_name = _make_test_zip_pkg(script_dir, 'test_zip', 'test_pkg', 'script')
     248              launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg.script', zip_name)
     249              self._check_script(launch_name)
     250  
     251      def test_module_in_subpackage_in_zipfile(self):
     252          with os_helper.temp_dir() as script_dir:
     253              zip_name, run_name = _make_test_zip_pkg(script_dir, 'test_zip', 'test_pkg', 'script', depth=2)
     254              launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg.test_pkg.script', zip_name)
     255              self._check_script(launch_name)
     256  
     257      def test_package(self):
     258          source = self.main_in_children_source
     259          with os_helper.temp_dir() as script_dir:
     260              pkg_dir = os.path.join(script_dir, 'test_pkg')
     261              make_pkg(pkg_dir)
     262              script_name = _make_test_script(pkg_dir, '__main__',
     263                                              source=source)
     264              launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg')
     265              self._check_script(launch_name)
     266  
     267      def test_package_compiled(self):
     268          source = self.main_in_children_source
     269          with os_helper.temp_dir() as script_dir:
     270              pkg_dir = os.path.join(script_dir, 'test_pkg')
     271              make_pkg(pkg_dir)
     272              script_name = _make_test_script(pkg_dir, '__main__',
     273                                              source=source)
     274              compiled_name = py_compile.compile(script_name, doraise=True)
     275              os.remove(script_name)
     276              pyc_file = import_helper.make_legacy_pyc(script_name)
     277              launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg')
     278              self._check_script(launch_name)
     279  
# Test all supported start methods (setUp skips as appropriate)
     281  
     282  class ESC[4;38;5;81mSpawnCmdLineTest(ESC[4;38;5;149mMultiProcessingCmdLineMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     283      start_method = 'spawn'
     284      main_in_children_source = test_source_main_skipped_in_children
     285  
     286  class ESC[4;38;5;81mForkCmdLineTest(ESC[4;38;5;149mMultiProcessingCmdLineMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     287      start_method = 'fork'
     288      main_in_children_source = test_source
     289  
     290  class ESC[4;38;5;81mForkServerCmdLineTest(ESC[4;38;5;149mMultiProcessingCmdLineMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     291      start_method = 'forkserver'
     292      main_in_children_source = test_source_main_skipped_in_children
     293  
def tearDownModule():
    """Module-level unittest teardown hook: calls support.reap_children()."""
    support.reap_children()
     296  
# Allow running this test file directly as a script.
if __name__ == '__main__':
    unittest.main()