• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import sys
2import unittest
3import io
4import atexit
5import os
6from test import support
7from test.support import script_helper
8
9### helpers
10def h1():
11    print("h1")
12
13def h2():
14    print("h2")
15
16def h3():
17    print("h3")
18
19def h4(*args, **kwargs):
20    print("h4", args, kwargs)
21
22def raise1():
23    raise TypeError
24
25def raise2():
26    raise SystemError
27
28def exit():
29    raise SystemExit
30
31
32class GeneralTest(unittest.TestCase):
33
34    def setUp(self):
35        self.save_stdout = sys.stdout
36        self.save_stderr = sys.stderr
37        self.stream = io.StringIO()
38        sys.stdout = sys.stderr = self.stream
39        atexit._clear()
40
41    def tearDown(self):
42        sys.stdout = self.save_stdout
43        sys.stderr = self.save_stderr
44        atexit._clear()
45
46    def test_args(self):
47        # be sure args are handled properly
48        atexit.register(h1)
49        atexit.register(h4)
50        atexit.register(h4, 4, kw="abc")
51        atexit._run_exitfuncs()
52
53        self.assertEqual(self.stream.getvalue(),
54                            "h4 (4,) {'kw': 'abc'}\nh4 () {}\nh1\n")
55
56    def test_badargs(self):
57        atexit.register(lambda: 1, 0, 0, (x for x in (1,2)), 0, 0)
58        self.assertRaises(TypeError, atexit._run_exitfuncs)
59
60    def test_order(self):
61        # be sure handlers are executed in reverse order
62        atexit.register(h1)
63        atexit.register(h2)
64        atexit.register(h3)
65        atexit._run_exitfuncs()
66
67        self.assertEqual(self.stream.getvalue(), "h3\nh2\nh1\n")
68
69    def test_raise(self):
70        # be sure raises are handled properly
71        atexit.register(raise1)
72        atexit.register(raise2)
73
74        self.assertRaises(TypeError, atexit._run_exitfuncs)
75
76    def test_raise_unnormalized(self):
77        # Issue #10756: Make sure that an unnormalized exception is
78        # handled properly
79        atexit.register(lambda: 1 / 0)
80
81        self.assertRaises(ZeroDivisionError, atexit._run_exitfuncs)
82        self.assertIn("ZeroDivisionError", self.stream.getvalue())
83
84    def test_exit(self):
85        # be sure a SystemExit is handled properly
86        atexit.register(exit)
87
88        self.assertRaises(SystemExit, atexit._run_exitfuncs)
89        self.assertEqual(self.stream.getvalue(), '')
90
91    def test_print_tracebacks(self):
92        # Issue #18776: the tracebacks should be printed when errors occur.
93        def f():
94            1/0  # one
95        def g():
96            1/0  # two
97        def h():
98            1/0  # three
99        atexit.register(f)
100        atexit.register(g)
101        atexit.register(h)
102
103        self.assertRaises(ZeroDivisionError, atexit._run_exitfuncs)
104        stderr = self.stream.getvalue()
105        self.assertEqual(stderr.count("ZeroDivisionError"), 3)
106        self.assertIn("# one", stderr)
107        self.assertIn("# two", stderr)
108        self.assertIn("# three", stderr)
109
110    def test_stress(self):
111        a = [0]
112        def inc():
113            a[0] += 1
114
115        for i in range(128):
116            atexit.register(inc)
117        atexit._run_exitfuncs()
118
119        self.assertEqual(a[0], 128)
120
121    def test_clear(self):
122        a = [0]
123        def inc():
124            a[0] += 1
125
126        atexit.register(inc)
127        atexit._clear()
128        atexit._run_exitfuncs()
129
130        self.assertEqual(a[0], 0)
131
132    def test_unregister(self):
133        a = [0]
134        def inc():
135            a[0] += 1
136        def dec():
137            a[0] -= 1
138
139        for i in range(4):
140            atexit.register(inc)
141        atexit.register(dec)
142        atexit.unregister(inc)
143        atexit._run_exitfuncs()
144
145        self.assertEqual(a[0], -1)
146
147    def test_bound_methods(self):
148        l = []
149        atexit.register(l.append, 5)
150        atexit._run_exitfuncs()
151        self.assertEqual(l, [5])
152
153        atexit.unregister(l.append)
154        atexit._run_exitfuncs()
155        self.assertEqual(l, [5])
156
157    def test_shutdown(self):
158        # Actually test the shutdown mechanism in a subprocess
159        code = """if 1:
160            import atexit
161
162            def f(msg):
163                print(msg)
164
165            atexit.register(f, "one")
166            atexit.register(f, "two")
167            """
168        res = script_helper.assert_python_ok("-c", code)
169        self.assertEqual(res.out.decode().splitlines(), ["two", "one"])
170        self.assertFalse(res.err)
171
172
173@support.cpython_only
174class SubinterpreterTest(unittest.TestCase):
175
176    def test_callbacks_leak(self):
177        # This test shows a leak in refleak mode if atexit doesn't
178        # take care to free callbacks in its per-subinterpreter module
179        # state.
180        n = atexit._ncallbacks()
181        code = r"""if 1:
182            import atexit
183            def f():
184                pass
185            atexit.register(f)
186            del atexit
187            """
188        ret = support.run_in_subinterp(code)
189        self.assertEqual(ret, 0)
190        self.assertEqual(atexit._ncallbacks(), n)
191
192    def test_callbacks_leak_refcycle(self):
193        # Similar to the above, but with a refcycle through the atexit
194        # module.
195        n = atexit._ncallbacks()
196        code = r"""if 1:
197            import atexit
198            def f():
199                pass
200            atexit.register(f)
201            atexit.__atexit = atexit
202            """
203        ret = support.run_in_subinterp(code)
204        self.assertEqual(ret, 0)
205        self.assertEqual(atexit._ncallbacks(), n)
206
207    def test_callback_on_subinterpreter_teardown(self):
208        # This tests if a callback is called on
209        # subinterpreter teardown.
210        expected = b"The test has passed!"
211        r, w = os.pipe()
212
213        code = r"""if 1:
214            import os
215            import atexit
216            def callback():
217                os.write({:d}, b"The test has passed!")
218            atexit.register(callback)
219        """.format(w)
220        ret = support.run_in_subinterp(code)
221        os.close(w)
222        self.assertEqual(os.read(r, len(expected)), expected)
223        os.close(r)
224
225
226if __name__ == "__main__":
227    unittest.main()
228