python (3.12.0)
1 # tests __main__ module handling in multiprocessing
2 from test import support
3 from test.support import import_helper
4 # Skip tests if _multiprocessing wasn't built.
5 import_helper.import_module('_multiprocessing')
6
7 import importlib
8 import importlib.machinery
9 import unittest
10 import sys
11 import os
12 import os.path
13 import py_compile
14
15 from test.support import os_helper
16 from test.support.script_helper import (
17 make_pkg, make_script, make_zip_pkg, make_zip_script,
18 assert_python_ok)
19
# This module is not a useful PGO training workload, so skip it wholesale.
if support.PGO:
    raise unittest.SkipTest("test is not helpful for PGO")

# Record the start methods this platform provides; setUp() consults this
# set to skip test cases whose start method is missing.
import multiprocessing
AVAILABLE_START_METHODS = {*multiprocessing.get_all_start_methods()}

# Issue #22332: Skip tests if sem_open implementation is broken.
support.skip_if_broken_multiprocessing_synchronize()

# Verbosity level of the test run; _check_output() echoes the child's
# stdout when it is greater than 1.
verbose = support.verbose
31
# Source text of the script executed in a child interpreter by the tests.
# It is the default ``source`` for _make_test_script() and
# _make_test_zip_pkg().  The script takes the start method name as
# sys.argv[1], maps the __main__-defined function f over [1, 2, 3] in a
# Pool, and prints "<start_method> -> [1, 4, 9]", which is the line that
# _check_output() compares against.
test_source = """\
# multiprocessing includes all sorts of shenanigans to make __main__
# attributes accessible in the subprocess in a pickle compatible way.

# We run the "doesn't work in the interactive interpreter" example from
# the docs to make sure it *does* work from an executed __main__,
# regardless of the invocation mechanism

import sys
import time
from multiprocessing import Pool, set_start_method
from test import support

# We use this __main__ defined function in the map call below in order to
# check that multiprocessing in correctly running the unguarded
# code in child processes and then making it available as __main__
def f(x):
    return x*x

# Check explicit relative imports
if "check_sibling" in __file__:
    # We're inside a package and not in a __main__.py file
    # so make sure explicit relative imports work correctly
    from . import sibling

if __name__ == '__main__':
    start_method = sys.argv[1]
    set_start_method(start_method)
    results = []
    with Pool(5) as pool:
        pool.map_async(f, [1, 2, 3], callback=results.extend)

        # up to 1 min to report the results
        for _ in support.sleeping_retry(support.LONG_TIMEOUT,
                                        "Timed out waiting for results"):
            if results:
                break

    results.sort()
    print(start_method, "->", results)

    pool.join()
"""
75
# Variant of the child script with no "if __name__ == '__main__'" guard:
# it raises RuntimeError if it is ever executed under another module name.
# Because no __main__-defined function can be used here, it maps the
# builtin int over [1, 4, 9], so the printed line is still
# "<start_method> -> [1, 4, 9]".
test_source_main_skipped_in_children = """\
# __main__.py files have an implied "if __name__ == '__main__'" so
# multiprocessing should always skip running them in child processes

# This means we can't use __main__ defined functions in child processes,
# so we just use "int" as a passthrough operation below

if __name__ != "__main__":
    raise RuntimeError("Should only be called as __main__!")

import sys
import time
from multiprocessing import Pool, set_start_method
from test import support

start_method = sys.argv[1]
set_start_method(start_method)
results = []
with Pool(5) as pool:
    pool.map_async(int, [1, 4, 9], callback=results.extend)
    # up to 1 min to report the results
    for _ in support.sleeping_retry(support.LONG_TIMEOUT,
                                    "Timed out waiting for results"):
        if results:
            break

results.sort()
print(start_method, "->", results)

pool.join()
"""
107
108 # These helpers were copied from test_cmd_line_script & tweaked a bit...
109
def _make_test_script(script_dir, script_basename,
                      source=test_source, omit_suffix=False):
    """Write ``source`` as a script in ``script_dir`` via make_script().

    Returns whatever make_script() returns for the new script.  For the
    special basename "check_sibling", an empty ``sibling`` module is also
    created in the same directory so that the script's
    ``from . import sibling`` can succeed.
    """
    script_path = make_script(script_dir, script_basename,
                              source, omit_suffix)
    needs_sibling = (script_basename == "check_sibling")
    if needs_sibling:
        # Hack to check explicit relative imports
        make_script(script_dir, "sibling", "")
    # Files were just created on disk; drop stale import-system caches.
    importlib.invalidate_caches()
    return script_path
119
def _make_test_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
                       source=test_source, depth=1):
    """Build a zipped package containing the test script via make_zip_pkg().

    All arguments are forwarded unchanged.  The result of make_zip_pkg()
    is returned as-is; callers in this file unpack it as a
    ``(zip_name, run_name)`` pair.
    """
    zip_result = make_zip_pkg(zip_dir, zip_basename, pkg_name,
                              script_basename, source, depth)
    # A new archive now exists on disk; drop stale import-system caches.
    importlib.invalidate_caches()
    return zip_result
126
# There's no easy way to pass the script directory in to get
# -m to work (avoiding that is the whole point of making
# directories and zipfiles executable!)
# So we fake it for testing purposes with a custom launch script
#
# Template placeholders, filled in by _make_launch_script():
#   %s - a Python *expression* (already source text) for the directory or
#        zipfile to put at the front of sys.path
#   %r - the dotted name of the module to run as __main__
launch_source = """\
import sys, os.path, runpy
sys.path.insert(0, %s)
runpy._run_module_as_main(%r)
"""
136
def _make_launch_script(script_dir, script_basename, module_name, path=None):
    """Create a launcher script that runs ``module_name`` as ``__main__``.

    The launcher is rendered from ``launch_source``.  When ``path`` is None
    the launcher inserts its own directory into sys.path; otherwise it
    inserts the given ``path`` (embedded as a literal via repr()).
    Returns whatever make_script() returns for the launcher.
    """
    path_expr = "os.path.dirname(__file__)" if path is None else repr(path)
    launcher_code = launch_source % (path_expr, module_name)
    launcher_path = make_script(script_dir, script_basename, launcher_code)
    # A new file now exists on disk; drop stale import-system caches.
    importlib.invalidate_caches()
    return launcher_path
146
class MultiProcessingCmdLineMixin():
    """Shared tests launching a multiprocessing script in various ways.

    Concrete subclasses must define two class attributes:

    * ``start_method`` - the multiprocessing start method name, passed to
      the child script as its only command line argument.
    * ``main_in_children_source`` - the script text to use when the code
      under test lives in a ``__main__`` module (directory, zipfile and
      package execution).

    Test methods deliberately carry comments instead of docstrings so that
    unittest keeps reporting them by method name.
    """

    maxDiff = None  # Show full tracebacks on subprocess failure

    def setUp(self):
        """Skip the test when this class's start method is unavailable."""
        if self.start_method not in AVAILABLE_START_METHODS:
            self.skipTest(f"{self.start_method!r} start method not available")

    def _check_output(self, script_name, exit_code, out, err):
        """Assert a zero exit code, empty stderr and the expected stdout.

        ``out`` and ``err`` are the child's captured output as bytes; both
        are decoded as UTF-8 before comparison.
        """
        if verbose > 1:
            print(f"Output from test script {script_name!r}:")
            print(repr(out))
        self.assertEqual(exit_code, 0)
        self.assertEqual(err.decode('utf-8'), '')
        expected_results = f"{self.start_method} -> [1, 4, 9]"
        self.assertEqual(out.decode('utf-8').strip(), expected_results)

    def _check_script(self, script_name, *cmd_line_switches):
        """Run ``script_name`` through assert_python_ok() and check output.

        ``cmd_line_switches`` are placed before the script name; the start
        method follows the script name as its sole argument.
        """
        if not __debug__:
            # We are running optimized: pass the same -O level to the child.
            cmd_line_switches += ('-' + 'O' * sys.flags.optimize,)
        run_args = cmd_line_switches + (script_name, self.start_method)
        rc, out, err = assert_python_ok(*run_args, __isolated=False)
        self._check_output(script_name, rc, out, err)

    def test_basic_script(self):
        with os_helper.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'script')
            self._check_script(script_name)

    def test_basic_script_no_suffix(self):
        with os_helper.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'script',
                                            omit_suffix=True)
            self._check_script(script_name)

    def test_ipython_workaround(self):
        # Some versions of the IPython launch script are missing the
        # __name__ = "__main__" guard, and multiprocessing has long had
        # a workaround for that case
        # See https://github.com/ipython/ipython/issues/4698
        source = test_source_main_skipped_in_children
        with os_helper.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'ipython',
                                            source=source)
            self._check_script(script_name)
            script_no_suffix = _make_test_script(script_dir, 'ipython',
                                                 source=source,
                                                 omit_suffix=True)
            self._check_script(script_no_suffix)

    def test_script_compiled(self):
        # Run the legacy .pyc directly after removing the source file.
        with os_helper.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'script')
            py_compile.compile(script_name, doraise=True)
            os.remove(script_name)
            pyc_file = import_helper.make_legacy_pyc(script_name)
            self._check_script(pyc_file)

    def test_directory(self):
        # Execute a directory containing __main__.py.
        source = self.main_in_children_source
        with os_helper.temp_dir() as script_dir:
            _make_test_script(script_dir, '__main__', source=source)
            self._check_script(script_dir)

    def test_directory_compiled(self):
        # Execute a directory that only contains a compiled __main__.
        source = self.main_in_children_source
        with os_helper.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, '__main__',
                                            source=source)
            py_compile.compile(script_name, doraise=True)
            os.remove(script_name)
            import_helper.make_legacy_pyc(script_name)
            self._check_script(script_dir)

    def test_zipfile(self):
        # Execute a zipfile containing __main__.py.
        source = self.main_in_children_source
        with os_helper.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, '__main__',
                                            source=source)
            zip_name, _ = make_zip_script(script_dir, 'test_zip', script_name)
            self._check_script(zip_name)

    def test_zipfile_compiled(self):
        # Execute a zipfile containing only the compiled __main__.
        source = self.main_in_children_source
        with os_helper.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, '__main__',
                                            source=source)
            compiled_name = py_compile.compile(script_name, doraise=True)
            zip_name, _ = make_zip_script(script_dir, 'test_zip',
                                          compiled_name)
            self._check_script(zip_name)

    def test_module_in_package(self):
        # "check_sibling" also creates a sibling module so the script's
        # explicit relative import is exercised.
        with os_helper.temp_dir() as script_dir:
            pkg_dir = os.path.join(script_dir, 'test_pkg')
            make_pkg(pkg_dir)
            _make_test_script(pkg_dir, 'check_sibling')
            launch_name = _make_launch_script(script_dir, 'launch',
                                              'test_pkg.check_sibling')
            self._check_script(launch_name)

    def test_module_in_package_in_zipfile(self):
        with os_helper.temp_dir() as script_dir:
            zip_name, _ = _make_test_zip_pkg(script_dir, 'test_zip',
                                             'test_pkg', 'script')
            launch_name = _make_launch_script(script_dir, 'launch',
                                              'test_pkg.script', zip_name)
            self._check_script(launch_name)

    def test_module_in_subpackage_in_zipfile(self):
        with os_helper.temp_dir() as script_dir:
            zip_name, _ = _make_test_zip_pkg(script_dir, 'test_zip',
                                             'test_pkg', 'script', depth=2)
            launch_name = _make_launch_script(script_dir, 'launch',
                                              'test_pkg.test_pkg.script',
                                              zip_name)
            self._check_script(launch_name)

    def test_package(self):
        # Run a package (its __main__ module) through the launch script.
        source = self.main_in_children_source
        with os_helper.temp_dir() as script_dir:
            pkg_dir = os.path.join(script_dir, 'test_pkg')
            make_pkg(pkg_dir)
            _make_test_script(pkg_dir, '__main__', source=source)
            launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg')
            self._check_script(launch_name)

    def test_package_compiled(self):
        # Same as test_package, but only the compiled __main__ remains.
        source = self.main_in_children_source
        with os_helper.temp_dir() as script_dir:
            pkg_dir = os.path.join(script_dir, 'test_pkg')
            make_pkg(pkg_dir)
            script_name = _make_test_script(pkg_dir, '__main__',
                                            source=source)
            py_compile.compile(script_name, doraise=True)
            os.remove(script_name)
            import_helper.make_legacy_pyc(script_name)
            launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg')
            self._check_script(launch_name)
281
# Test all supported start methods (setUp skips as appropriate)
283
class SpawnCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    # __main__ modules use the script that must never run in children.
    main_in_children_source = test_source_main_skipped_in_children
    start_method = 'spawn'
287
class ForkCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    # __main__ modules use the regular script, which maps a function
    # defined in __main__ itself.
    main_in_children_source = test_source
    start_method = 'fork'
291
class ForkServerCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    # __main__ modules use the script that must never run in children.
    main_in_children_source = test_source_main_skipped_in_children
    start_method = 'forkserver'
295
def tearDownModule():
    """unittest module-level teardown hook: call support.reap_children()."""
    support.reap_children()
298
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()