"""Tests for the atexit module.

GeneralTest drives the registry in-process through the private hooks
atexit._clear() and atexit._run_exitfuncs(); SubinterpreterTest checks
per-subinterpreter callback state.

NOTE(review): several tests assert that atexit._run_exitfuncs() re-raises
an exception raised by a handler.  Newer CPython releases appear to report
such errors as unraisable instead -- confirm the interpreter version this
file targets before changing those assertions.
"""

import sys
import unittest
import io
import atexit
import os
from test import support
from test.support import script_helper


### helpers
def h1():
    """Handler that prints "h1"."""
    print("h1")


def h2():
    """Handler that prints "h2"."""
    print("h2")


def h3():
    """Handler that prints "h3"."""
    print("h3")


def h4(*args, **kwargs):
    """Handler that prints the positional and keyword arguments it got."""
    print("h4", args, kwargs)


def raise1():
    """Handler that raises TypeError."""
    raise TypeError


def raise2():
    """Handler that raises SystemError."""
    raise SystemError


def exit():
    """Handler that raises SystemExit."""
    raise SystemExit


class GeneralTest(unittest.TestCase):
    """In-process tests of register/unregister and handler execution."""

    def setUp(self):
        # Route both stdout and stderr into one in-memory stream so the
        # tests can inspect handler output and printed tracebacks.
        self.save_stdout = sys.stdout
        self.save_stderr = sys.stderr
        self.stream = io.StringIO()
        sys.stdout = sys.stderr = self.stream
        # Start every test from an empty handler registry.
        atexit._clear()

    def tearDown(self):
        sys.stdout = self.save_stdout
        sys.stderr = self.save_stderr
        # Do not let handlers registered by a test survive it.
        atexit._clear()

    def test_args(self):
        # be sure args are handled properly
        atexit.register(h1)
        atexit.register(h4)
        atexit.register(h4, 4, kw="abc")
        atexit._run_exitfuncs()

        self.assertEqual(self.stream.getvalue(),
                         "h4 (4,) {'kw': 'abc'}\nh4 () {}\nh1\n")

    def test_badargs(self):
        # The lambda accepts no arguments but is registered with five, so
        # calling it must fail with TypeError.
        atexit.register(lambda: 1, 0, 0, (x for x in (1, 2)), 0, 0)
        self.assertRaises(TypeError, atexit._run_exitfuncs)

    def test_order(self):
        # be sure handlers are executed in reverse order
        atexit.register(h1)
        atexit.register(h2)
        atexit.register(h3)
        atexit._run_exitfuncs()

        self.assertEqual(self.stream.getvalue(), "h3\nh2\nh1\n")

    def test_raise(self):
        # be sure raises are handled properly
        # Handlers run in reverse order, so raise1 (TypeError) runs last;
        # its exception is the one expected to propagate.
        atexit.register(raise1)
        atexit.register(raise2)

        self.assertRaises(TypeError, atexit._run_exitfuncs)

    def test_raise_unnormalized(self):
        # Issue #10756: Make sure that an unnormalized exception is
        # handled properly
        atexit.register(lambda: 1 / 0)

        self.assertRaises(ZeroDivisionError, atexit._run_exitfuncs)
        self.assertIn("ZeroDivisionError", self.stream.getvalue())

    def test_exit(self):
        # be sure a SystemExit is handled properly
        atexit.register(exit)

        self.assertRaises(SystemExit, atexit._run_exitfuncs)
        # Nothing is expected on the captured stdout/stderr stream.
        self.assertEqual(self.stream.getvalue(), '')

    def test_print_tracebacks(self):
        # Issue #18776: the tracebacks should be printed when errors occur.
        # The trailing "# one/two/three" comments are looked up in the
        # printed tracebacks below; keep them on the failing lines.
        def f():
            1/0  # one
        def g():
            1/0  # two
        def h():
            1/0  # three
        atexit.register(f)
        atexit.register(g)
        atexit.register(h)

        self.assertRaises(ZeroDivisionError, atexit._run_exitfuncs)
        stderr = self.stream.getvalue()
        self.assertEqual(stderr.count("ZeroDivisionError"), 3)
        self.assertIn("# one", stderr)
        self.assertIn("# two", stderr)
        self.assertIn("# three", stderr)

    def test_stress(self):
        # Registering the same callable many times must call it that many
        # times.
        a = [0]
        def inc():
            a[0] += 1

        for _ in range(128):
            atexit.register(inc)
        atexit._run_exitfuncs()

        self.assertEqual(a[0], 128)

    def test_clear(self):
        # A handler removed by _clear() must not run.
        a = [0]
        def inc():
            a[0] += 1

        atexit.register(inc)
        atexit._clear()
        atexit._run_exitfuncs()

        self.assertEqual(a[0], 0)

    def test_unregister(self):
        # unregister() must drop every registration of inc, leaving only
        # the single dec call.
        a = [0]
        def inc():
            a[0] += 1
        def dec():
            a[0] -= 1

        for _ in range(4):
            atexit.register(inc)
        atexit.register(dec)
        atexit.unregister(inc)
        atexit._run_exitfuncs()

        self.assertEqual(a[0], -1)

    def test_bound_methods(self):
        # Bound methods can be registered and later unregistered through a
        # freshly created, equal bound-method object.
        items = []
        atexit.register(items.append, 5)
        atexit._run_exitfuncs()
        self.assertEqual(items, [5])

        atexit.unregister(items.append)
        atexit._run_exitfuncs()
        self.assertEqual(items, [5])

    def test_shutdown(self):
        # Actually test the shutdown mechanism in a subprocess
        code = """if 1:
            import atexit

            def f(msg):
                print(msg)

            atexit.register(f, "one")
            atexit.register(f, "two")
            """
        res = script_helper.assert_python_ok("-c", code)
        self.assertEqual(res.out.decode().splitlines(), ["two", "one"])
        self.assertFalse(res.err)


@support.cpython_only
class SubinterpreterTest(unittest.TestCase):
    """Tests of atexit callback handling inside subinterpreters."""

    def test_callbacks_leak(self):
        # This test shows a leak in refleak mode if atexit doesn't
        # take care to free callbacks in its per-subinterpreter module
        # state.
        n = atexit._ncallbacks()
        code = r"""if 1:
            import atexit
            def f():
                pass
            atexit.register(f)
            del atexit
            """
        ret = support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        self.assertEqual(atexit._ncallbacks(), n)

    def test_callbacks_leak_refcycle(self):
        # Similar to the above, but with a refcycle through the atexit
        # module.
        n = atexit._ncallbacks()
        code = r"""if 1:
            import atexit
            def f():
                pass
            atexit.register(f)
            atexit.__atexit = atexit
            """
        ret = support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        self.assertEqual(atexit._ncallbacks(), n)

    def test_callback_on_subinterpreter_teardown(self):
        # This tests if a callback is called on
        # subinterpreter teardown.
        expected = b"The test has passed!"
        r, w = os.pipe()
        # Close the read end even when an assertion below fails.
        self.addCleanup(os.close, r)

        code = r"""if 1:
            import os
            import atexit
            def callback():
                os.write({:d}, b"The test has passed!")
            atexit.register(callback)
            """.format(w)
        try:
            ret = support.run_in_subinterp(code)
        finally:
            # Always release the write end, whether or not the
            # subinterpreter run raised.
            os.close(w)
        self.assertEqual(ret, 0)
        self.assertEqual(os.read(r, len(expected)), expected)


if __name__ == "__main__":
    unittest.main()