python (3.12.0)

(root)/
lib/
python3.12/
test/
test_multiprocessing_main_handling.py
       1  # tests __main__ module handling in multiprocessing
       2  from test import support
       3  from test.support import import_helper
       4  # Skip tests if _multiprocessing wasn't built.
       5  import_helper.import_module('_multiprocessing')
       6  
       7  import importlib
       8  import importlib.machinery
       9  import unittest
      10  import sys
      11  import os
      12  import os.path
      13  import py_compile
      14  
      15  from test.support import os_helper
      16  from test.support.script_helper import (
      17      make_pkg, make_script, make_zip_pkg, make_zip_script,
      18      assert_python_ok)
      19  
      20  if support.PGO:
      21      raise unittest.SkipTest("test is not helpful for PGO")
      22  
      23  # Look up which start methods are available to test
      24  import multiprocessing
      25  AVAILABLE_START_METHODS = set(multiprocessing.get_all_start_methods())
      26  
      27  # Issue #22332: Skip tests if sem_open implementation is broken.
      28  support.skip_if_broken_multiprocessing_synchronize()
      29  
      30  verbose = support.verbose
      31  
      32  test_source = """\
      33  # multiprocessing includes all sorts of shenanigans to make __main__
      34  # attributes accessible in the subprocess in a pickle compatible way.
      35  
      36  # We run the "doesn't work in the interactive interpreter" example from
      37  # the docs to make sure it *does* work from an executed __main__,
      38  # regardless of the invocation mechanism
      39  
      40  import sys
      41  import time
      42  from multiprocessing import Pool, set_start_method
      43  from test import support
      44  
      45  # We use this __main__ defined function in the map call below in order to
      46  # check that multiprocessing in correctly running the unguarded
      47  # code in child processes and then making it available as __main__
      48  def f(x):
      49      return x*x
      50  
      51  # Check explicit relative imports
      52  if "check_sibling" in __file__:
      53      # We're inside a package and not in a __main__.py file
      54      # so make sure explicit relative imports work correctly
      55      from . import sibling
      56  
      57  if __name__ == '__main__':
      58      start_method = sys.argv[1]
      59      set_start_method(start_method)
      60      results = []
      61      with Pool(5) as pool:
      62          pool.map_async(f, [1, 2, 3], callback=results.extend)
      63  
      64          # up to 1 min to report the results
      65          for _ in support.sleeping_retry(support.LONG_TIMEOUT,
      66                                          "Timed out waiting for results"):
      67              if results:
      68                  break
      69  
      70      results.sort()
      71      print(start_method, "->", results)
      72  
      73      pool.join()
      74  """
      75  
      76  test_source_main_skipped_in_children = """\
      77  # __main__.py files have an implied "if __name__ == '__main__'" so
      78  # multiprocessing should always skip running them in child processes
      79  
      80  # This means we can't use __main__ defined functions in child processes,
      81  # so we just use "int" as a passthrough operation below
      82  
      83  if __name__ != "__main__":
      84      raise RuntimeError("Should only be called as __main__!")
      85  
      86  import sys
      87  import time
      88  from multiprocessing import Pool, set_start_method
      89  from test import support
      90  
      91  start_method = sys.argv[1]
      92  set_start_method(start_method)
      93  results = []
      94  with Pool(5) as pool:
      95      pool.map_async(int, [1, 4, 9], callback=results.extend)
      96      # up to 1 min to report the results
      97      for _ in support.sleeping_retry(support.LONG_TIMEOUT,
      98                                      "Timed out waiting for results"):
      99          if results:
     100              break
     101  
     102  results.sort()
     103  print(start_method, "->", results)
     104  
     105  pool.join()
     106  """
     107  
     108  # These helpers were copied from test_cmd_line_script & tweaked a bit...
     109  
     110  def _make_test_script(script_dir, script_basename,
     111                        source=test_source, omit_suffix=False):
     112      to_return = make_script(script_dir, script_basename,
     113                              source, omit_suffix)
     114      # Hack to check explicit relative imports
     115      if script_basename == "check_sibling":
     116          make_script(script_dir, "sibling", "")
     117      importlib.invalidate_caches()
     118      return to_return
     119  
     120  def _make_test_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
     121                         source=test_source, depth=1):
     122      to_return = make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
     123                               source, depth)
     124      importlib.invalidate_caches()
     125      return to_return
     126  
     127  # There's no easy way to pass the script directory in to get
     128  # -m to work (avoiding that is the whole point of making
     129  # directories and zipfiles executable!)
     130  # So we fake it for testing purposes with a custom launch script
     131  launch_source = """\
     132  import sys, os.path, runpy
     133  sys.path.insert(0, %s)
     134  runpy._run_module_as_main(%r)
     135  """
     136  
     137  def _make_launch_script(script_dir, script_basename, module_name, path=None):
     138      if path is None:
     139          path = "os.path.dirname(__file__)"
     140      else:
     141          path = repr(path)
     142      source = launch_source % (path, module_name)
     143      to_return = make_script(script_dir, script_basename, source)
     144      importlib.invalidate_caches()
     145      return to_return
     146  
     147  class ESC[4;38;5;81mMultiProcessingCmdLineMixin():
     148      maxDiff = None # Show full tracebacks on subprocess failure
     149  
     150      def setUp(self):
     151          if self.start_method not in AVAILABLE_START_METHODS:
     152              self.skipTest("%r start method not available" % self.start_method)
     153  
     154      def _check_output(self, script_name, exit_code, out, err):
     155          if verbose > 1:
     156              print("Output from test script %r:" % script_name)
     157              print(repr(out))
     158          self.assertEqual(exit_code, 0)
     159          self.assertEqual(err.decode('utf-8'), '')
     160          expected_results = "%s -> [1, 4, 9]" % self.start_method
     161          self.assertEqual(out.decode('utf-8').strip(), expected_results)
     162  
     163      def _check_script(self, script_name, *cmd_line_switches):
     164          if not __debug__:
     165              cmd_line_switches += ('-' + 'O' * sys.flags.optimize,)
     166          run_args = cmd_line_switches + (script_name, self.start_method)
     167          rc, out, err = assert_python_ok(*run_args, __isolated=False)
     168          self._check_output(script_name, rc, out, err)
     169  
     170      def test_basic_script(self):
     171          with os_helper.temp_dir() as script_dir:
     172              script_name = _make_test_script(script_dir, 'script')
     173              self._check_script(script_name)
     174  
     175      def test_basic_script_no_suffix(self):
     176          with os_helper.temp_dir() as script_dir:
     177              script_name = _make_test_script(script_dir, 'script',
     178                                              omit_suffix=True)
     179              self._check_script(script_name)
     180  
     181      def test_ipython_workaround(self):
     182          # Some versions of the IPython launch script are missing the
     183          # __name__ = "__main__" guard, and multiprocessing has long had
     184          # a workaround for that case
     185          # See https://github.com/ipython/ipython/issues/4698
     186          source = test_source_main_skipped_in_children
     187          with os_helper.temp_dir() as script_dir:
     188              script_name = _make_test_script(script_dir, 'ipython',
     189                                              source=source)
     190              self._check_script(script_name)
     191              script_no_suffix = _make_test_script(script_dir, 'ipython',
     192                                                   source=source,
     193                                                   omit_suffix=True)
     194              self._check_script(script_no_suffix)
     195  
     196      def test_script_compiled(self):
     197          with os_helper.temp_dir() as script_dir:
     198              script_name = _make_test_script(script_dir, 'script')
     199              py_compile.compile(script_name, doraise=True)
     200              os.remove(script_name)
     201              pyc_file = import_helper.make_legacy_pyc(script_name)
     202              self._check_script(pyc_file)
     203  
     204      def test_directory(self):
     205          source = self.main_in_children_source
     206          with os_helper.temp_dir() as script_dir:
     207              script_name = _make_test_script(script_dir, '__main__',
     208                                              source=source)
     209              self._check_script(script_dir)
     210  
     211      def test_directory_compiled(self):
     212          source = self.main_in_children_source
     213          with os_helper.temp_dir() as script_dir:
     214              script_name = _make_test_script(script_dir, '__main__',
     215                                              source=source)
     216              py_compile.compile(script_name, doraise=True)
     217              os.remove(script_name)
     218              pyc_file = import_helper.make_legacy_pyc(script_name)
     219              self._check_script(script_dir)
     220  
     221      def test_zipfile(self):
     222          source = self.main_in_children_source
     223          with os_helper.temp_dir() as script_dir:
     224              script_name = _make_test_script(script_dir, '__main__',
     225                                              source=source)
     226              zip_name, run_name = make_zip_script(script_dir, 'test_zip', script_name)
     227              self._check_script(zip_name)
     228  
     229      def test_zipfile_compiled(self):
     230          source = self.main_in_children_source
     231          with os_helper.temp_dir() as script_dir:
     232              script_name = _make_test_script(script_dir, '__main__',
     233                                              source=source)
     234              compiled_name = py_compile.compile(script_name, doraise=True)
     235              zip_name, run_name = make_zip_script(script_dir, 'test_zip', compiled_name)
     236              self._check_script(zip_name)
     237  
     238      def test_module_in_package(self):
     239          with os_helper.temp_dir() as script_dir:
     240              pkg_dir = os.path.join(script_dir, 'test_pkg')
     241              make_pkg(pkg_dir)
     242              script_name = _make_test_script(pkg_dir, 'check_sibling')
     243              launch_name = _make_launch_script(script_dir, 'launch',
     244                                                'test_pkg.check_sibling')
     245              self._check_script(launch_name)
     246  
     247      def test_module_in_package_in_zipfile(self):
     248          with os_helper.temp_dir() as script_dir:
     249              zip_name, run_name = _make_test_zip_pkg(script_dir, 'test_zip', 'test_pkg', 'script')
     250              launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg.script', zip_name)
     251              self._check_script(launch_name)
     252  
     253      def test_module_in_subpackage_in_zipfile(self):
     254          with os_helper.temp_dir() as script_dir:
     255              zip_name, run_name = _make_test_zip_pkg(script_dir, 'test_zip', 'test_pkg', 'script', depth=2)
     256              launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg.test_pkg.script', zip_name)
     257              self._check_script(launch_name)
     258  
     259      def test_package(self):
     260          source = self.main_in_children_source
     261          with os_helper.temp_dir() as script_dir:
     262              pkg_dir = os.path.join(script_dir, 'test_pkg')
     263              make_pkg(pkg_dir)
     264              script_name = _make_test_script(pkg_dir, '__main__',
     265                                              source=source)
     266              launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg')
     267              self._check_script(launch_name)
     268  
     269      def test_package_compiled(self):
     270          source = self.main_in_children_source
     271          with os_helper.temp_dir() as script_dir:
     272              pkg_dir = os.path.join(script_dir, 'test_pkg')
     273              make_pkg(pkg_dir)
     274              script_name = _make_test_script(pkg_dir, '__main__',
     275                                              source=source)
     276              compiled_name = py_compile.compile(script_name, doraise=True)
     277              os.remove(script_name)
     278              pyc_file = import_helper.make_legacy_pyc(script_name)
     279              launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg')
     280              self._check_script(launch_name)
     281  
     282  # Test all supported start methods (setupClass skips as appropriate)
     283  
     284  class ESC[4;38;5;81mSpawnCmdLineTest(ESC[4;38;5;149mMultiProcessingCmdLineMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     285      start_method = 'spawn'
     286      main_in_children_source = test_source_main_skipped_in_children
     287  
     288  class ESC[4;38;5;81mForkCmdLineTest(ESC[4;38;5;149mMultiProcessingCmdLineMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     289      start_method = 'fork'
     290      main_in_children_source = test_source
     291  
     292  class ESC[4;38;5;81mForkServerCmdLineTest(ESC[4;38;5;149mMultiProcessingCmdLineMixin, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     293      start_method = 'forkserver'
     294      main_in_children_source = test_source_main_skipped_in_children
     295  
     296  def tearDownModule():
     297      support.reap_children()
     298  
     299  if __name__ == '__main__':
     300      unittest.main()