• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
"""This test case provides support for checking forking and wait behavior.

To test different wait behavior, override the wait_impl method.

We want fork1() semantics -- only the forking thread survives in the
child after a fork().

On some systems (e.g. Solaris without POSIX threads) we find that all
active threads survive in the child after a fork(); this is an error.
"""
11
12import os, time, unittest
13import threading
14from test import support
15from test.support import threading_helper
16import warnings
17
18
# Seconds the forked child sleeps before comparing the workers' pid records
# against the pre-fork snapshot.
LONGSLEEP = 2
# Seconds each worker thread sleeps between two pid recordings.
SHORTSLEEP = 0.5
# Number of worker threads started before forking.
NUM_THREADS = 4
22
class ForkWait(unittest.TestCase):
    """Test case checking that worker threads do not survive a fork().

    test_wait() starts NUM_THREADS worker threads that keep recording the
    pid of the process they run in, forks, and lets the child count how many
    of those records changed after the fork.  The parent then waits for the
    child through wait_impl() and expects exit code 0.

    Override wait_impl() to exercise a different way of waiting.
    """

    def setUp(self):
        """Save the threading state and reset the per-test bookkeeping."""
        self._threading_key = threading_helper.threading_setup()
        # Maps worker id -> pid most recently recorded by that worker.
        self.alive = {}
        # Becomes true when the workers should leave their loop.
        self.stop = 0
        self.threads = []

    def tearDown(self):
        """Stop and join every worker, then run the threading cleanup."""
        # Stop threads
        self.stop = 1
        for thread in self.threads:
            thread.join()
        # Release the loop variable's reference to the last joined thread
        # before the cleanup helper runs.
        thread = None
        self.threads.clear()
        threading_helper.threading_cleanup(*self._threading_key)

    def f(self, id):
        """Worker loop: record the current pid under *id* until stopped."""
        while not self.stop:
            self.alive[id] = os.getpid()
            try:
                time.sleep(SHORTSLEEP)
            except OSError:
                # Deliberately best-effort: a failed sleep just means the
                # next pid recording happens sooner.
                pass

    def wait_impl(self, cpid, *, exitcode):
        """Wait for child *cpid* and check that it exited with *exitcode*.

        Subclasses override this hook to test other wait mechanisms.
        """
        support.wait_process(cpid, exitcode=exitcode)

    def test_wait(self):
        """Fork while the workers run; the child must not see them running."""
        for i in range(NUM_THREADS):
            thread = threading.Thread(target=self.f, args=(i,))
            thread.start()
            self.threads.append(thread)

        # busy-loop to wait for threads
        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
            if len(self.alive) >= NUM_THREADS:
                break

        self.assertEqual(sorted(self.alive), list(range(NUM_THREADS)))

        # Snapshot taken in the parent: every value is the parent's pid.
        prefork_lives = self.alive.copy()

        # Ignore the warning about fork with threads.
        with warnings.catch_warnings(category=DeprecationWarning,
                                     action="ignore"):
            if (cpid := os.fork()) == 0:
                # Child
                # The child must always leave through os._exit(): if an
                # exception escaped from here it would unwind into the test
                # runner and the child would keep running the suite as a
                # duplicate process.  Start from a non-zero status so that
                # such a failure is still reported to the parent.
                status = 255
                try:
                    time.sleep(LONGSLEEP)
                    # A worker that survived the fork has overwritten its
                    # entry with the child's pid in the meantime; count
                    # those entries and report the count as exit status.
                    status = sum(
                        pid != prefork_lives[key]
                        for key, pid in self.alive.items()
                    )
                finally:
                    os._exit(status)
            else:
                # Parent
                self.wait_impl(cpid, exitcode=0)
81