• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for sys.audit and sys.addaudithook
2"""
3
4import subprocess
5import sys
6import unittest
7from test import support
8from test.support import import_helper
9from test.support import os_helper
10
11
# Both halves of the audit API must exist on the running interpreter;
# otherwise the whole module is skipped.
if not (hasattr(sys, "addaudithook") and hasattr(sys, "audit")):
    raise unittest.SkipTest("test only relevant when sys.audit is available")

# Location of the helper script that every test below runs in a subprocess.
AUDIT_TESTS_PY = support.findfile("audit-tests.py")
16
17
class AuditTest(unittest.TestCase):
    """Drive the scenarios in ``AUDIT_TESTS_PY`` and check what they report.

    Each test launches the helper script in a child interpreter, passing the
    scenario name (and any extra arguments) on the command line.  Tests
    either only check the exit status (``do_test``) or additionally inspect
    the lines the child printed on stdout (``run_python``).
    """

    def _run_audit_script(self, *args):
        """Run the helper script with *args* in a child interpreter.

        Returns ``(returncode, stdout_text, stderr_text)``.

        ``communicate()`` is used instead of ``wait()`` followed by reading
        the pipes: it drains stdout and stderr while the child runs, so a
        child that produces a lot of output cannot block on a full pipe, and
        the captured text can be reused as often as needed.
        """
        # NOTE(review): "-X utf8" is passed as a single argv element (option
        # and value joined by a space).  Left unchanged here -- confirm the
        # interpreter really enables UTF-8 mode for this spelling.
        with subprocess.Popen(
            [sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
            encoding="utf-8",
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        ) as proc:
            out, err = proc.communicate()
        return proc.returncode, out, err

    def do_test(self, *args):
        """Run a scenario and fail the test if the child exits non-zero.

        The child's stdout and stderr are echoed to this process's streams.
        On failure the captured stderr text is used as the failure message.
        """
        returncode, out, err = self._run_audit_script(*args)
        sys.stdout.write(out)
        sys.stderr.write(err)
        if returncode:
            # Use the captured text: the pipe itself can only be read once.
            self.fail(err)

    def run_python(self, *args):
        """Run a scenario and return ``(returncode, events, stderr_text)``.

        *events* holds one 3-tuple per stdout line, produced by stripping the
        line and splitting it at its first space with ``str.partition``.
        The child's stderr is echoed to this process's stderr as well as
        being returned.
        """
        returncode, out, err = self._run_audit_script(*args)
        sys.stderr.write(err)

        # Split on "\n" only, mirroring line-by-line iteration over the pipe;
        # the empty piece after a trailing newline (or for empty output) is
        # not a line and is dropped.
        lines = out.split("\n")
        if lines[-1] == "":
            lines.pop()
        events = [line.strip().partition(" ") for line in lines]
        return returncode, events, err

    def test_basic(self):
        self.do_test("test_basic")

    def test_block_add_hook(self):
        self.do_test("test_block_add_hook")

    def test_block_add_hook_baseexception(self):
        self.do_test("test_block_add_hook_baseexception")

    def test_marshal(self):
        # Skips the test when the module cannot be imported.
        import_helper.import_module("marshal")

        self.do_test("test_marshal")

    def test_pickle(self):
        import_helper.import_module("pickle")

        self.do_test("test_pickle")

    def test_monkeypatch(self):
        self.do_test("test_monkeypatch")

    def test_open(self):
        # The scenario receives the scratch file name as an extra argument.
        self.do_test("test_open", os_helper.TESTFN)

    def test_cantrace(self):
        self.do_test("test_cantrace")

    def test_mmap(self):
        self.do_test("test_mmap")

    def test_excepthook(self):
        returncode, events, stderr = self.run_python("test_excepthook")
        # Unlike the other scenarios, this one must exit with an error.
        if not returncode:
            self.fail(f"Expected fatal exception\n{stderr}")

        self.assertSequenceEqual(
            [("sys.excepthook", " ", "RuntimeError('fatal-error')")], events
        )

    def test_unraisablehook(self):
        returncode, events, stderr = self.run_python("test_unraisablehook")
        if returncode:
            self.fail(stderr)

        self.assertEqual(events[0][0], "sys.unraisablehook")
        self.assertEqual(
            events[0][2],
            "RuntimeError('nonfatal-error') Exception ignored for audit hook test",
        )

    def test_winreg(self):
        import_helper.import_module("winreg")
        returncode, events, stderr = self.run_python("test_winreg")
        if returncode:
            self.fail(stderr)

        self.assertEqual(events[0][0], "winreg.OpenKey")
        self.assertEqual(events[1][0], "winreg.OpenKey/result")
        # The text reported with the OpenKey result must be non-empty and
        # must reappear in the argument text of the following events.
        expected = events[1][2]
        self.assertTrue(expected)
        self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 0"], events[2])
        self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 10000"], events[3])
        self.assertSequenceEqual(["winreg.PyHKEY.Detach", " ", expected], events[4])

    def test_socket(self):
        import_helper.import_module("socket")
        returncode, events, stderr = self.run_python("test_socket")
        if returncode:
            self.fail(stderr)

        if support.verbose:
            print(*events, sep='\n')
        self.assertEqual(events[0][0], "socket.gethostname")
        self.assertEqual(events[1][0], "socket.__new__")
        self.assertEqual(events[2][0], "socket.bind")
        self.assertTrue(events[2][2].endswith("('127.0.0.1', 8080)"))

    def test_gc(self):
        returncode, events, stderr = self.run_python("test_gc")
        if returncode:
            self.fail(stderr)

        if support.verbose:
            print(*events, sep='\n')
        self.assertEqual(
            [event[0] for event in events],
            ["gc.get_objects", "gc.get_referrers", "gc.get_referents"]
        )

    def test_http(self):
        import_helper.import_module("http.client")
        returncode, events, stderr = self.run_python("test_http_client")
        if returncode:
            self.fail(stderr)

        if support.verbose:
            print(*events, sep='\n')
        self.assertEqual(events[0][0], "http.client.connect")
        self.assertEqual(events[0][2], "www.python.org 80")
        self.assertEqual(events[1][0], "http.client.send")
        # '[cannot send]' is accepted as an alternative payload; presumably
        # the script reports it when nothing could be sent -- verify against
        # the helper script.
        if events[1][2] != '[cannot send]':
            self.assertIn('HTTP', events[1][2])

    def test_sqlite3(self):
        # Report a skip (rather than silently passing) when sqlite3 is not
        # available, consistent with the other optional-module tests.
        sqlite3 = import_helper.import_module("sqlite3")
        returncode, events, stderr = self.run_python("test_sqlite3")
        if returncode:
            self.fail(stderr)

        if support.verbose:
            print(*events, sep='\n')
        actual = [ev[0] for ev in events]
        expected = ["sqlite3.connect", "sqlite3.connect/handle"] * 2

        # The extension-loading events are only expected when this sqlite3
        # build exposes enable_load_extension.
        if hasattr(sqlite3.Connection, "enable_load_extension"):
            expected += [
                "sqlite3.enable_load_extension",
                "sqlite3.load_extension",
            ]
        self.assertEqual(actual, expected)
174
175
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
178