• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# tests __main__ module handling in multiprocessing
2from test import support
3from test.support import import_helper
4# Skip tests if _multiprocessing wasn't built.
5import_helper.import_module('_multiprocessing')
6
7import importlib
8import importlib.machinery
9import unittest
10import sys
11import os
12import os.path
13import py_compile
14
15from test.support import os_helper
16from test.support.script_helper import (
17    make_pkg, make_script, make_zip_pkg, make_zip_script,
18    assert_python_ok)
19
# Raising SkipTest while the module is being imported skips every test in it.
if support.PGO:
    raise unittest.SkipTest("test is not helpful for PGO")

# Look up which start methods are available to test
import multiprocessing
# Consulted by MultiProcessingCmdLineMixin.setUp() to skip the tests of any
# start method that multiprocessing does not report here.
AVAILABLE_START_METHODS = set(multiprocessing.get_all_start_methods())

# Issue #22332: Skip tests if sem_open implementation is broken.
support.skip_if_broken_multiprocessing_synchronize()

# _check_output() echoes the child's captured output when this is above 1.
verbose = support.verbose
31
# Default source for the generated test scripts.  When run as __main__ it
# takes the start method name from sys.argv[1], maps the __main__-defined
# function f over [1, 2, 3] in a Pool and prints
# "<start_method> -> <sorted results>".
test_source = """\
# multiprocessing includes all sorts of shenanigans to make __main__
# attributes accessible in the subprocess in a pickle compatible way.

# We run the "doesn't work in the interactive interpreter" example from
# the docs to make sure it *does* work from an executed __main__,
# regardless of the invocation mechanism

import sys
import time
from multiprocessing import Pool, set_start_method

# We use this __main__ defined function in the map call below in order to
# check that multiprocessing in correctly running the unguarded
# code in child processes and then making it available as __main__
def f(x):
    return x*x

# Check explicit relative imports
if "check_sibling" in __file__:
    # We're inside a package and not in a __main__.py file
    # so make sure explicit relative imports work correctly
    from . import sibling

if __name__ == '__main__':
    start_method = sys.argv[1]
    set_start_method(start_method)
    results = []
    with Pool(5) as pool:
        pool.map_async(f, [1, 2, 3], callback=results.extend)
        start_time = time.monotonic()
        while not results:
            time.sleep(0.05)
            # up to 1 min to report the results
            dt = time.monotonic() - start_time
            if dt > 60.0:
                raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)

    results.sort()
    print(start_method, "->", results)

    pool.join()
"""
75
# Alternative script source with no __name__ guard around its body: it raises
# RuntimeError if executed under any name other than "__main__".  It maps the
# builtin int over [1, 4, 9], so it prints the same
# "<start_method> -> [1, 4, 9]" line that _check_output() expects.
test_source_main_skipped_in_children = """\
# __main__.py files have an implied "if __name__ == '__main__'" so
# multiprocessing should always skip running them in child processes

# This means we can't use __main__ defined functions in child processes,
# so we just use "int" as a passthrough operation below

if __name__ != "__main__":
    raise RuntimeError("Should only be called as __main__!")

import sys
import time
from multiprocessing import Pool, set_start_method

start_method = sys.argv[1]
set_start_method(start_method)
results = []
with Pool(5) as pool:
    pool.map_async(int, [1, 4, 9], callback=results.extend)
    start_time = time.monotonic()
    while not results:
        time.sleep(0.05)
        # up to 1 min to report the results
        dt = time.monotonic() - start_time
        if dt > 60.0:
            raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)

results.sort()
print(start_method, "->", results)

pool.join()
"""
108
109# These helpers were copied from test_cmd_line_script & tweaked a bit...
110
def _make_test_script(script_dir, script_basename,
                      source=test_source, omit_suffix=False):
    """Create a test script with make_script() and return what it returns.

    script_dir, script_basename, source and omit_suffix are forwarded to
    make_script() unchanged.  The import caches are invalidated before
    returning so the freshly written files are visible to the import system.
    """
    script_path = make_script(script_dir, script_basename, source, omit_suffix)
    wants_sibling = script_basename == "check_sibling"
    if wants_sibling:
        # test_source runs "from . import sibling" when its file name
        # contains "check_sibling", so provide an empty module next to it.
        make_script(script_dir, "sibling", "")
    importlib.invalidate_caches()
    return script_path
120
def _make_test_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
                       source=test_source, depth=1):
    """Build a zipped package with make_zip_pkg() and return what it returns.

    Every argument is forwarded to make_zip_pkg() in the same order; the
    import caches are invalidated afterwards.
    """
    zip_result = make_zip_pkg(
        zip_dir, zip_basename, pkg_name, script_basename, source, depth)
    importlib.invalidate_caches()
    return zip_result
127
# There's no easy way to pass the script directory in to get
# -m to work (avoiding that is the whole point of making
# directories and zipfiles executable!)
# So we fake it for testing purposes with a custom launch script
#
# Template filled in by _make_launch_script().  The first placeholder (%s)
# receives the *source text* of an expression for the sys.path entry to
# prepend -- either the literal text os.path.dirname(__file__) or the repr()
# of an explicit path -- and the second (%r) receives the module name to run.
launch_source = """\
import sys, os.path, runpy
sys.path.insert(0, %s)
runpy._run_module_as_main(%r)
"""
137
def _make_launch_script(script_dir, script_basename, module_name, path=None):
    """Write a launch script that runs *module_name* as __main__.

    The script is generated from launch_source.  With path=None it prepends
    its own directory to sys.path at run time; otherwise it prepends the
    given *path*.  Returns what make_script() returns.
    """
    # The template expects source text for the path expression, so an
    # explicit path is embedded through its repr().
    path_expr = "os.path.dirname(__file__)" if path is None else repr(path)
    launcher = make_script(script_dir, script_basename,
                           launch_source % (path_expr, module_name))
    importlib.invalidate_caches()
    return launcher
147
class MultiProcessingCmdLineMixin:
    """Run a multiprocessing script through many launch mechanisms.

    Concrete test classes combine this mixin with unittest.TestCase and
    must define two class attributes:

    start_method -- start method name handed to the script as its argument
    main_in_children_source -- script source used by the tests that execute
        a __main__ file (the directory, zipfile and package tests)

    The test methods deliberately carry comments rather than docstrings so
    that unittest keeps reporting them by method name.
    """

    maxDiff = None # Show full tracebacks on subprocess failure

    def setUp(self):
        """Skip the test when this class's start method is unavailable."""
        if self.start_method not in AVAILABLE_START_METHODS:
            self.skipTest("%r start method not available" % self.start_method)

    def _check_output(self, script_name, exit_code, out, err):
        """Check the outcome of one script run.

        exit_code must be 0, *err* must decode to an empty string and *out*
        (stripped) must equal "<start_method> -> [1, 4, 9]".  *out* and *err*
        are bytes and are decoded as UTF-8.
        """
        if verbose > 1:
            print("Output from test script %r:" % script_name)
            print(repr(out))
        self.assertEqual(exit_code, 0)
        self.assertEqual(err.decode('utf-8'), '')
        expected_results = "%s -> [1, 4, 9]" % self.start_method
        self.assertEqual(out.decode('utf-8').strip(), expected_results)

    def _check_script(self, script_name, *cmd_line_switches):
        """Run *script_name* in a child interpreter and check its output.

        The child's command line is cmd_line_switches, then script_name,
        then the start method under test.
        """
        if not __debug__:
            # This process runs optimized, so pass the matching -O/-OO
            # switch on to the child interpreter as well.
            cmd_line_switches += ('-' + 'O' * sys.flags.optimize,)
        run_args = cmd_line_switches + (script_name, self.start_method)
        rc, out, err = assert_python_ok(*run_args, __isolated=False)
        self._check_output(script_name, rc, out, err)

    @staticmethod
    def _compile_and_remove_source(script_name):
        """Byte-compile *script_name*, then delete the source file.

        Returns the value of import_helper.make_legacy_pyc(script_name),
        which test_script_compiled runs directly as the compiled script.
        """
        py_compile.compile(script_name, doraise=True)
        os.remove(script_name)
        return import_helper.make_legacy_pyc(script_name)

    def test_basic_script(self):
        with os_helper.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'script')
            self._check_script(script_name)

    def test_basic_script_no_suffix(self):
        with os_helper.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'script',
                                            omit_suffix=True)
            self._check_script(script_name)

    def test_ipython_workaround(self):
        # Some versions of the IPython launch script are missing the
        # __name__ == "__main__" guard, and multiprocessing has long had
        # a workaround for that case
        # See https://github.com/ipython/ipython/issues/4698
        source = test_source_main_skipped_in_children
        with os_helper.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'ipython',
                                            source=source)
            self._check_script(script_name)
            script_no_suffix = _make_test_script(script_dir, 'ipython',
                                                 source=source,
                                                 omit_suffix=True)
            self._check_script(script_no_suffix)

    def test_script_compiled(self):
        with os_helper.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'script')
            pyc_file = self._compile_and_remove_source(script_name)
            self._check_script(pyc_file)

    def test_directory(self):
        # The directory itself is executed, so only the __main__ file
        # written into it matters, not the path returned for it.
        source = self.main_in_children_source
        with os_helper.temp_dir() as script_dir:
            _make_test_script(script_dir, '__main__', source=source)
            self._check_script(script_dir)

    def test_directory_compiled(self):
        source = self.main_in_children_source
        with os_helper.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, '__main__',
                                            source=source)
            self._compile_and_remove_source(script_name)
            self._check_script(script_dir)

    def test_zipfile(self):
        source = self.main_in_children_source
        with os_helper.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, '__main__',
                                            source=source)
            zip_name, _ = make_zip_script(script_dir, 'test_zip', script_name)
            self._check_script(zip_name)

    def test_zipfile_compiled(self):
        source = self.main_in_children_source
        with os_helper.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, '__main__',
                                            source=source)
            compiled_name = py_compile.compile(script_name, doraise=True)
            zip_name, _ = make_zip_script(script_dir, 'test_zip', compiled_name)
            self._check_script(zip_name)

    def test_module_in_package(self):
        with os_helper.temp_dir() as script_dir:
            pkg_dir = os.path.join(script_dir, 'test_pkg')
            make_pkg(pkg_dir)
            _make_test_script(pkg_dir, 'check_sibling')
            launch_name = _make_launch_script(script_dir, 'launch',
                                              'test_pkg.check_sibling')
            self._check_script(launch_name)

    def test_module_in_package_in_zipfile(self):
        with os_helper.temp_dir() as script_dir:
            zip_name, _ = _make_test_zip_pkg(script_dir, 'test_zip',
                                             'test_pkg', 'script')
            launch_name = _make_launch_script(script_dir, 'launch',
                                              'test_pkg.script', zip_name)
            self._check_script(launch_name)

    def test_module_in_subpackage_in_zipfile(self):
        with os_helper.temp_dir() as script_dir:
            zip_name, _ = _make_test_zip_pkg(script_dir, 'test_zip',
                                             'test_pkg', 'script', depth=2)
            launch_name = _make_launch_script(script_dir, 'launch',
                                              'test_pkg.test_pkg.script',
                                              zip_name)
            self._check_script(launch_name)

    def test_package(self):
        source = self.main_in_children_source
        with os_helper.temp_dir() as script_dir:
            pkg_dir = os.path.join(script_dir, 'test_pkg')
            make_pkg(pkg_dir)
            _make_test_script(pkg_dir, '__main__', source=source)
            launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg')
            self._check_script(launch_name)

    def test_package_compiled(self):
        source = self.main_in_children_source
        with os_helper.temp_dir() as script_dir:
            pkg_dir = os.path.join(script_dir, 'test_pkg')
            make_pkg(pkg_dir)
            script_name = _make_test_script(pkg_dir, '__main__',
                                            source=source)
            self._compile_and_remove_source(script_name)
            launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg')
            self._check_script(launch_name)
282
# Test all supported start methods (setUp skips as appropriate)
284
class SpawnCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    # Runs the mixin's tests with the 'spawn' start method.
    start_method = 'spawn'
    # The __main__-file tests use the source that raises if it is ever
    # executed under a name other than "__main__".
    main_in_children_source = test_source_main_skipped_in_children
288
class ForkCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    # Runs the mixin's tests with the 'fork' start method.
    start_method = 'fork'
    # The __main__-file tests use test_source, whose worker function f is
    # defined in __main__ itself -- presumably usable here because forked
    # children inherit the parent's __main__ (NOTE(review): confirm).
    main_in_children_source = test_source
292
class ForkServerCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    # Runs the mixin's tests with the 'forkserver' start method.
    start_method = 'forkserver'
    # The __main__-file tests use the source that raises if it is ever
    # executed under a name other than "__main__".
    main_in_children_source = test_source_main_skipped_in_children
296
def tearDownModule():
    """Call support.reap_children() once this module's tests have finished."""
    support.reap_children()
299
300if __name__ == '__main__':
301    unittest.main()
302