1"""Tests for sys.audit and sys.addaudithook 2""" 3 4import subprocess 5import sys 6import unittest 7from test import support 8from test.support import import_helper 9from test.support import os_helper 10 11 12if not hasattr(sys, "addaudithook") or not hasattr(sys, "audit"): 13 raise unittest.SkipTest("test only relevant when sys.audit is available") 14 15AUDIT_TESTS_PY = support.findfile("audit-tests.py") 16 17 18class AuditTest(unittest.TestCase): 19 def do_test(self, *args): 20 with subprocess.Popen( 21 [sys.executable, "-X utf8", AUDIT_TESTS_PY, *args], 22 encoding="utf-8", 23 stdout=subprocess.PIPE, 24 stderr=subprocess.PIPE, 25 ) as p: 26 p.wait() 27 sys.stdout.writelines(p.stdout) 28 sys.stderr.writelines(p.stderr) 29 if p.returncode: 30 self.fail("".join(p.stderr)) 31 32 def run_python(self, *args): 33 events = [] 34 with subprocess.Popen( 35 [sys.executable, "-X utf8", AUDIT_TESTS_PY, *args], 36 encoding="utf-8", 37 stdout=subprocess.PIPE, 38 stderr=subprocess.PIPE, 39 ) as p: 40 p.wait() 41 sys.stderr.writelines(p.stderr) 42 return ( 43 p.returncode, 44 [line.strip().partition(" ") for line in p.stdout], 45 "".join(p.stderr), 46 ) 47 48 def test_basic(self): 49 self.do_test("test_basic") 50 51 def test_block_add_hook(self): 52 self.do_test("test_block_add_hook") 53 54 def test_block_add_hook_baseexception(self): 55 self.do_test("test_block_add_hook_baseexception") 56 57 def test_marshal(self): 58 import_helper.import_module("marshal") 59 60 self.do_test("test_marshal") 61 62 def test_pickle(self): 63 import_helper.import_module("pickle") 64 65 self.do_test("test_pickle") 66 67 def test_monkeypatch(self): 68 self.do_test("test_monkeypatch") 69 70 def test_open(self): 71 self.do_test("test_open", os_helper.TESTFN) 72 73 def test_cantrace(self): 74 self.do_test("test_cantrace") 75 76 def test_mmap(self): 77 self.do_test("test_mmap") 78 79 def test_excepthook(self): 80 returncode, events, stderr = self.run_python("test_excepthook") 81 if not returncode: 82 self.fail(f"Expected fatal exception\n{stderr}") 83 84 self.assertSequenceEqual( 85 [("sys.excepthook", " ", "RuntimeError('fatal-error')")], events 86 ) 87 88 def test_unraisablehook(self): 89 returncode, events, stderr = self.run_python("test_unraisablehook") 90 if returncode: 91 self.fail(stderr) 92 93 self.assertEqual(events[0][0], "sys.unraisablehook") 94 self.assertEqual( 95 events[0][2], 96 "RuntimeError('nonfatal-error') Exception ignored for audit hook test", 97 ) 98 99 def test_winreg(self): 100 import_helper.import_module("winreg") 101 returncode, events, stderr = self.run_python("test_winreg") 102 if returncode: 103 self.fail(stderr) 104 105 self.assertEqual(events[0][0], "winreg.OpenKey") 106 self.assertEqual(events[1][0], "winreg.OpenKey/result") 107 expected = events[1][2] 108 self.assertTrue(expected) 109 self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 0"], events[2]) 110 self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 10000"], events[3]) 111 self.assertSequenceEqual(["winreg.PyHKEY.Detach", " ", expected], events[4]) 112 113 def test_socket(self): 114 import_helper.import_module("socket") 115 returncode, events, stderr = self.run_python("test_socket") 116 if returncode: 117 self.fail(stderr) 118 119 if support.verbose: 120 print(*events, sep='\n') 121 self.assertEqual(events[0][0], "socket.gethostname") 122 self.assertEqual(events[1][0], "socket.__new__") 123 self.assertEqual(events[2][0], "socket.bind") 124 self.assertTrue(events[2][2].endswith("('127.0.0.1', 8080)")) 125 126 def test_gc(self): 127 returncode, events, stderr = self.run_python("test_gc") 128 if returncode: 129 self.fail(stderr) 130 131 if support.verbose: 132 print(*events, sep='\n') 133 self.assertEqual( 134 [event[0] for event in events], 135 ["gc.get_objects", "gc.get_referrers", "gc.get_referents"] 136 ) 137 138 139 def test_http(self): 140 import_helper.import_module("http.client") 141 returncode, events, stderr = self.run_python("test_http_client") 142 if returncode: 143 self.fail(stderr) 144 145 if support.verbose: 146 print(*events, sep='\n') 147 self.assertEqual(events[0][0], "http.client.connect") 148 self.assertEqual(events[0][2], "www.python.org 80") 149 self.assertEqual(events[1][0], "http.client.send") 150 if events[1][2] != '[cannot send]': 151 self.assertIn('HTTP', events[1][2]) 152 153 154 def test_sqlite3(self): 155 try: 156 import sqlite3 157 except ImportError: 158 return 159 returncode, events, stderr = self.run_python("test_sqlite3") 160 if returncode: 161 self.fail(stderr) 162 163 if support.verbose: 164 print(*events, sep='\n') 165 actual = [ev[0] for ev in events] 166 expected = ["sqlite3.connect", "sqlite3.connect/handle"] * 2 167 168 if hasattr(sqlite3.Connection, "enable_load_extension"): 169 expected += [ 170 "sqlite3.enable_load_extension", 171 "sqlite3.load_extension", 172 ] 173 self.assertEqual(actual, expected) 174 175 176if __name__ == "__main__": 177 unittest.main() 178