import unittest

from contextlib import contextmanager, ExitStack
from test.support import (
    catch_unraisable_exception, import_helper,
    gc_collect, suppress_immortalization)


# Skip this test if the _testcapi module isn't available.
_testcapi = import_helper.import_module('_testcapi')


class TestDictWatchers(unittest.TestCase):
    """Tests for dict watchers, driven through the _testcapi helpers."""

    # types of watchers testcapimodule can add:
    EVENTS = 0   # appends dict events as strings to global event list
    ERROR = 1    # unconditionally sets and signals a RuntimeException
    SECOND = 2   # always appends "second" to global event list

    def add_watcher(self, kind=EVENTS):
        """Register a dict watcher of the given kind and return its ID."""
        return _testcapi.add_dict_watcher(kind)

    def clear_watcher(self, watcher_id):
        """Unregister the dict watcher with the given ID."""
        _testcapi.clear_dict_watcher(watcher_id)

    @contextmanager
    def watcher(self, kind=EVENTS):
        """Context manager: register a watcher, yield its ID, clear it on exit."""
        wid = self.add_watcher(kind)
        try:
            yield wid
        finally:
            # Always clear, even if the test body failed, so the watcher
            # doesn't stay registered for subsequent tests.
            self.clear_watcher(wid)

    def assert_events(self, expected):
        """Assert that the events reported by _testcapi equal *expected*."""
        actual = _testcapi.get_dict_watcher_events()
        self.assertEqual(actual, expected)

    def watch(self, wid, d):
        """Start watching *d* with watcher *wid*."""
        _testcapi.watch_dict(wid, d)

    def unwatch(self, wid, d):
        """Stop watching *d* with watcher *wid*."""
        _testcapi.unwatch_dict(wid, d)

    def test_set_new_item(self):
        # Inserting a new key reports a "new" event with key and value.
        d = {}
        with self.watcher() as wid:
            self.watch(wid, d)
            d["foo"] = "bar"
            self.assert_events(["new:foo:bar"])

    def test_set_existing_item(self):
        # Overwriting an existing key reports a "mod" event.
        d = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            d["foo"] = "baz"
            self.assert_events(["mod:foo:baz"])

    def test_clone(self):
        # Updating an empty watched dict from another dict reports "clone".
        d = {}
        d2 = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            d.update(d2)
            self.assert_events(["clone"])

    def test_no_event_if_not_watched(self):
        # A registered watcher sees nothing for a dict it was never asked
        # to watch.
        d = {}
        with self.watcher():
            d["foo"] = "bar"
            self.assert_events([])

    def test_del(self):
        # Deleting a key reports a "del" event.
        d = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            del d["foo"]
            self.assert_events(["del:foo"])

    def test_pop(self):
        # pop() is reported the same way as a deletion.
        d = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            d.pop("foo")
            self.assert_events(["del:foo"])

    def test_clear(self):
        # clear() reports a single "clear" event.
        d = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            d.clear()
            self.assert_events(["clear"])

    def test_dealloc(self):
        # Dropping the last reference to a watched dict reports "dealloc".
        d = {"foo": "bar"}
        with self.watcher() as wid:
            self.watch(wid, d)
            del d
            self.assert_events(["dealloc"])

    def test_object_dict(self):
        # Attribute assignment/deletion on an instance is reported through
        # a watcher placed on the instance's __dict__.
        class MyObj: pass
        o = MyObj()

        with self.watcher() as wid:
            self.watch(wid, o.__dict__)
            o.foo = "bar"
            o.foo = "baz"
            del o.foo
            self.assert_events(["new:foo:bar", "mod:foo:baz", "del:foo"])

        # Repeated assignment of the same attribute: the first one is
        # "new", every following one is "mod".
        with self.watcher() as wid:
            self.watch(wid, o.__dict__)
            for _ in range(100):
                o.foo = "bar"
            self.assert_events(["new:foo:bar"] + ["mod:foo:bar"] * 99)

    def test_unwatch(self):
        # After unwatch(), further modifications produce no events.
        d = {}
        with self.watcher() as wid:
            self.watch(wid, d)
            d["foo"] = "bar"
            self.unwatch(wid, d)
            d["hmm"] = "baz"
            self.assert_events(["new:foo:bar"])

    def test_error(self):
        # An error raised by the watcher callback is reported as unraisable
        # and does not propagate to the code that modified the dict.
        d = {}
        with self.watcher(kind=self.ERROR) as wid:
            self.watch(wid, d)
            with catch_unraisable_exception() as cm:
                d["foo"] = "bar"
                # cm.unraisable is only available inside the context
                # manager, so all checks on it must happen here.
                self.assertIn(
                    "Exception ignored in "
                    "PyDict_EVENT_ADDED watcher callback for <dict at ",
                    cm.unraisable.err_msg
                )
                self.assertIsNone(cm.unraisable.object)
                self.assertEqual(str(cm.unraisable.exc_value), "boom!")
            self.assert_events([])

    def test_dealloc_error(self):
        # Same as test_error, but the failing callback is triggered by the
        # dict being deallocated.
        d = {}
        with self.watcher(kind=self.ERROR) as wid:
            self.watch(wid, d)
            with catch_unraisable_exception() as cm:
                del d
                self.assertEqual(str(cm.unraisable.exc_value), "boom!")

    def test_two_watchers(self):
        # Two watchers of different kinds each record events only for the
        # dict they were asked to watch.
        d1 = {}
        d2 = {}
        with self.watcher() as wid1:
            with self.watcher(kind=self.SECOND) as wid2:
                self.watch(wid1, d1)
                self.watch(wid2, d2)
                d1["foo"] = "bar"
                d2["hmm"] = "baz"
                self.assert_events(["new:foo:bar", "second"])

    def test_watch_non_dict(self):
        with self.watcher() as wid:
            with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
                self.watch(wid, 1)

    def test_watch_out_of_range_watcher_id(self):
        d = {}
        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
            self.watch(-1, d)
        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
            self.watch(8, d)  # DICT_MAX_WATCHERS = 8

    def test_watch_unassigned_watcher_id(self):
        d = {}
        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 3"):
            self.watch(3, d)

    def test_unwatch_non_dict(self):
        with self.watcher() as wid:
            with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
                self.unwatch(wid, 1)

    def test_unwatch_out_of_range_watcher_id(self):
        d = {}
        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
            self.unwatch(-1, d)
        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
            self.unwatch(8, d)  # DICT_MAX_WATCHERS = 8

    def test_unwatch_unassigned_watcher_id(self):
        d = {}
        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 3"):
            self.unwatch(3, d)

    def test_clear_out_of_range_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
            self.clear_watcher(-1)
        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
            self.clear_watcher(8)  # DICT_MAX_WATCHERS = 8

    def test_clear_unassigned_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 3"):
            self.clear_watcher(3)


class TestTypeWatchers(unittest.TestCase):
    """Tests for type watchers, driven through the _testcapi helpers."""

    # types of watchers testcapimodule can add:
    TYPES = 0    # appends modified types to global event list
    ERROR = 1    # unconditionally sets and signals a RuntimeException
    WRAP = 2     # appends modified type wrapped in list to global event list

    # duplicating the C constant
    TYPE_MAX_WATCHERS = 8

    def add_watcher(self, kind=TYPES):
        """Register a type watcher of the given kind and return its ID."""
        return _testcapi.add_type_watcher(kind)

    def clear_watcher(self, watcher_id):
        """Unregister the type watcher with the given ID."""
        _testcapi.clear_type_watcher(watcher_id)

    @contextmanager
    def watcher(self, kind=TYPES):
        """Context manager: register a watcher, yield its ID, clear it on exit."""
        wid = self.add_watcher(kind)
        try:
            yield wid
        finally:
            # Always clear, even if the test body failed, so the watcher
            # doesn't stay registered for subsequent tests.
            self.clear_watcher(wid)

    def assert_events(self, expected):
        """Assert that the events reported by _testcapi equal *expected*."""
        actual = _testcapi.get_type_modified_events()
        self.assertEqual(actual, expected)

    def watch(self, wid, t):
        """Start watching type *t* with watcher *wid*."""
        _testcapi.watch_type(wid, t)

    def unwatch(self, wid, t):
        """Stop watching type *t* with watcher *wid*."""
        _testcapi.unwatch_type(wid, t)

    def test_watch_type(self):
        # Setting a class attribute reports the modified type.
        class C: pass
        with self.watcher() as wid:
            self.watch(wid, C)
            C.foo = "bar"
            self.assert_events([C])

    def test_event_aggregation(self):
        class C: pass
        with self.watcher() as wid:
            self.watch(wid, C)
            C.foo = "bar"
            C.bar = "baz"
            # only one event registered for both modifications
            self.assert_events([C])

    def test_lookup_resets_aggregation(self):
        class C: pass
        with self.watcher() as wid:
            self.watch(wid, C)
            C.foo = "bar"
            # lookup resets type version tag
            self.assertEqual(C.foo, "bar")
            C.bar = "baz"
            # both events registered
            self.assert_events([C, C])

    def test_unwatch_type(self):
        # After unwatch(), further modifications produce no events.
        class C: pass
        with self.watcher() as wid:
            self.watch(wid, C)
            C.foo = "bar"
            # The attribute lookup keeps the second modification below from
            # being aggregated with the first one (see
            # test_lookup_resets_aggregation).
            self.assertEqual(C.foo, "bar")
            self.assert_events([C])
            self.unwatch(wid, C)
            C.bar = "baz"
            self.assert_events([C])

    def test_clear_watcher(self):
        class C: pass
        # outer watcher is unused, it's just to keep events list alive
        with self.watcher() as _:
            with self.watcher() as wid:
                self.watch(wid, C)
                C.foo = "bar"
                self.assertEqual(C.foo, "bar")
                self.assert_events([C])
            C.bar = "baz"
            # Watcher on C has been cleared, no new event
            self.assert_events([C])

    def test_watch_type_subclass(self):
        # Modifying a base class is reported for a watched subclass.
        class C: pass
        class D(C): pass
        with self.watcher() as wid:
            self.watch(wid, D)
            C.foo = "bar"
            self.assert_events([D])

    def test_error(self):
        # An error raised by the watcher callback is reported as unraisable
        # and does not propagate to the code that modified the type.
        class C: pass
        with self.watcher(kind=self.ERROR) as wid:
            self.watch(wid, C)
            with catch_unraisable_exception() as cm:
                C.foo = "bar"
                # The callback number in the message is the watcher ID.
                # Use the ID we were actually given instead of assuming 0,
                # since other type watchers may already be registered in
                # this interpreter.
                self.assertEqual(cm.unraisable.err_msg,
                    f"Exception ignored in type watcher callback #{wid} for {C!r}")
                self.assertIs(cm.unraisable.object, None)
                self.assertEqual(str(cm.unraisable.exc_value), "boom!")
            self.assert_events([])

    def test_two_watchers(self):
        # Two watchers of different kinds get distinct IDs and each records
        # events only for the type it was asked to watch.
        class C1: pass
        class C2: pass
        with self.watcher() as wid1:
            with self.watcher(kind=self.WRAP) as wid2:
                self.assertNotEqual(wid1, wid2)
                self.watch(wid1, C1)
                self.watch(wid2, C2)
                C1.foo = "bar"
                C2.hmm = "baz"
                self.assert_events([C1, [C2]])

    def test_all_watchers(self):
        # The watcher with the highest possible ID works too.
        class C: pass
        with ExitStack() as stack:
            last_wid = -1
            # don't make assumptions about how many watchers are already
            # registered, just go until we reach the max ID
            while last_wid < self.TYPE_MAX_WATCHERS - 1:
                last_wid = stack.enter_context(self.watcher())
            self.watch(last_wid, C)
            C.foo = "bar"
            self.assert_events([C])

    def test_watch_non_type(self):
        with self.watcher() as wid:
            with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
                self.watch(wid, 1)

    def test_watch_out_of_range_watcher_id(self):
        class C: pass
        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
            self.watch(-1, C)
        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
            self.watch(self.TYPE_MAX_WATCHERS, C)

    def test_watch_unassigned_watcher_id(self):
        class C: pass
        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
            self.watch(1, C)

    def test_unwatch_non_type(self):
        with self.watcher() as wid:
            with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
                self.unwatch(wid, 1)

    def test_unwatch_out_of_range_watcher_id(self):
        class C: pass
        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
            self.unwatch(-1, C)
        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
            self.unwatch(self.TYPE_MAX_WATCHERS, C)

    def test_unwatch_unassigned_watcher_id(self):
        class C: pass
        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
            self.unwatch(1, C)

    def test_clear_out_of_range_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
            self.clear_watcher(-1)
        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
            self.clear_watcher(self.TYPE_MAX_WATCHERS)

    def test_clear_unassigned_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
            self.clear_watcher(1)

    def test_no_more_ids_available(self):
        # Asking for one watcher more than the maximum must fail; the
        # ExitStack clears the ones that were successfully registered.
        with self.assertRaisesRegex(RuntimeError, r"no more type watcher IDs"):
            with ExitStack() as stack:
                for _ in range(self.TYPE_MAX_WATCHERS + 1):
                    stack.enter_context(self.watcher())


class TestCodeObjectWatchers(unittest.TestCase):
    """Tests for code object watchers, driven through the _testcapi helpers."""

    @contextmanager
    def code_watcher(self, which_watcher):
        """Context manager: register code watcher *which_watcher*, yield its
        ID, and clear it on exit."""
        wid = _testcapi.add_code_watcher(which_watcher)
        try:
            yield wid
        finally:
            _testcapi.clear_code_watcher(wid)

    def assert_event_counts(self, exp_created_0, exp_destroyed_0,
                            exp_created_1, exp_destroyed_1):
        """Assert the created/destroyed event counters of watchers 0 and 1."""
        gc_collect()  # code objects are collected by GC in free-threaded build
        self.assertEqual(
            exp_created_0, _testcapi.get_code_watcher_num_created_events(0))
        self.assertEqual(
            exp_destroyed_0, _testcapi.get_code_watcher_num_destroyed_events(0))
        self.assertEqual(
            exp_created_1, _testcapi.get_code_watcher_num_created_events(1))
        self.assertEqual(
            exp_destroyed_1, _testcapi.get_code_watcher_num_destroyed_events(1))

    @suppress_immortalization()
    def test_code_object_events_dispatched(self):
        # verify that all counts are zero before any watchers are registered
        self.assert_event_counts(0, 0, 0, 0)

        # verify that all counts remain zero when a code object is
        # created and destroyed with no watchers registered
        co1 = _testcapi.code_newempty("test_watchers", "dummy1", 0)
        self.assert_event_counts(0, 0, 0, 0)
        del co1
        self.assert_event_counts(0, 0, 0, 0)

        # verify counts are as expected when first watcher is registered
        with self.code_watcher(0):
            self.assert_event_counts(0, 0, 0, 0)
            co2 = _testcapi.code_newempty("test_watchers", "dummy2", 0)
            self.assert_event_counts(1, 0, 0, 0)
            del co2
            self.assert_event_counts(1, 1, 0, 0)

            # again with second watcher registered
            with self.code_watcher(1):
                self.assert_event_counts(1, 1, 0, 0)
                co3 = _testcapi.code_newempty("test_watchers", "dummy3", 0)
                self.assert_event_counts(2, 1, 1, 0)
                del co3
                self.assert_event_counts(2, 2, 1, 1)

        # verify counts are reset and don't change after both watchers are cleared
        co4 = _testcapi.code_newempty("test_watchers", "dummy4", 0)
        self.assert_event_counts(0, 0, 0, 0)
        del co4
        self.assert_event_counts(0, 0, 0, 0)

    def test_error(self):
        # An error raised by the watcher callback on code object creation
        # is reported as unraisable; creation itself still succeeds.
        with self.code_watcher(2):
            with catch_unraisable_exception() as cm:
                co = _testcapi.code_newempty("test_watchers", "dummy0", 0)

                self.assertEqual(
                    cm.unraisable.err_msg,
                    f"Exception ignored in "
                    f"PY_CODE_EVENT_CREATE watcher callback for {co!r}"
                )
                self.assertIsNone(cm.unraisable.object)
                self.assertEqual(str(cm.unraisable.exc_value), "boom!")

    @suppress_immortalization()
    def test_dealloc_error(self):
        # Same as test_error, but the failing callback is triggered by the
        # code object being destroyed.
        co = _testcapi.code_newempty("test_watchers", "dummy0", 0)
        with self.code_watcher(2):
            with catch_unraisable_exception() as cm:
                del co
                gc_collect()

                self.assertEqual(str(cm.unraisable.exc_value), "boom!")

    def test_clear_out_of_range_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"Invalid code watcher ID -1"):
            _testcapi.clear_code_watcher(-1)
        with self.assertRaisesRegex(ValueError, r"Invalid code watcher ID 8"):
            _testcapi.clear_code_watcher(8)  # CODE_MAX_WATCHERS = 8

    def test_clear_unassigned_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"No code watcher set for ID 1"):
            _testcapi.clear_code_watcher(1)

    def test_allocate_too_many_watchers(self):
        with self.assertRaisesRegex(RuntimeError, r"no more code watcher IDs available"):
            _testcapi.allocate_too_many_code_watchers()


class TestFuncWatchers(unittest.TestCase):
    """Tests for function watchers, driven through the _testcapi helpers."""

    @contextmanager
    def add_watcher(self, func):
        """Context manager: register *func* as a function watcher and clear
        it on exit.  Nothing is yielded."""
        wid = _testcapi.add_func_watcher(func)
        try:
            yield
        finally:
            _testcapi.clear_func_watcher(wid)

    def test_func_events_dispatched(self):
        events = []
        def watcher(*args):
            # `events` is looked up in the enclosing scope on every call,
            # so rebinding it below redirects later events to the new list.
            events.append(args)

        with self.add_watcher(watcher):
            def myfunc():
                pass
            self.assertIn((_testcapi.PYFUNC_EVENT_CREATE, myfunc, None), events)
            # Remember the id now: the destroy event is checked against it
            # after the function object itself is gone.
            myfunc_id = id(myfunc)

            new_code = self.test_func_events_dispatched.__code__
            myfunc.__code__ = new_code
            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_CODE, myfunc, new_code), events)

            new_defaults = (123,)
            myfunc.__defaults__ = new_defaults
            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_DEFAULTS, myfunc, new_defaults), events)

            new_defaults = (456,)
            _testcapi.set_func_defaults_via_capi(myfunc, new_defaults)
            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_DEFAULTS, myfunc, new_defaults), events)

            new_kwdefaults = {"self": 123}
            myfunc.__kwdefaults__ = new_kwdefaults
            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_KWDEFAULTS, myfunc, new_kwdefaults), events)

            new_kwdefaults = {"self": 456}
            _testcapi.set_func_kwdefaults_via_capi(myfunc, new_kwdefaults)
            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_KWDEFAULTS, myfunc, new_kwdefaults), events)

            # Clear events reference to func
            events = []
            del myfunc
            self.assertIn((_testcapi.PYFUNC_EVENT_DESTROY, myfunc_id, None), events)

    def test_multiple_watchers(self):
        # Every registered watcher receives the same event.
        events0 = []
        def first_watcher(*args):
            events0.append(args)

        events1 = []
        def second_watcher(*args):
            events1.append(args)

        with self.add_watcher(first_watcher):
            with self.add_watcher(second_watcher):
                def myfunc():
                    pass

                event = (_testcapi.PYFUNC_EVENT_CREATE, myfunc, None)
                self.assertIn(event, events0)
                self.assertIn(event, events1)

    def test_watcher_raises_error(self):
        # An exception raised by the watcher is reported as unraisable;
        # the function is still created.
        class MyError(Exception):
            pass

        def watcher(*args):
            raise MyError("testing 123")

        with self.add_watcher(watcher):
            with catch_unraisable_exception() as cm:
                def myfunc():
                    pass

                # The expected message contains the function's repr
                # without the surrounding "<" and ">".
                self.assertEqual(
                    cm.unraisable.err_msg,
                    f"Exception ignored in "
                    f"PyFunction_EVENT_CREATE watcher callback for {repr(myfunc)[1:-1]}"
                )
                self.assertIsNone(cm.unraisable.object)

    def test_dealloc_watcher_raises_error(self):
        # Same as test_watcher_raises_error, but the failing callback is
        # triggered by the function being destroyed.
        class MyError(Exception):
            pass

        def watcher(*args):
            raise MyError("testing 123")

        def myfunc():
            pass

        with self.add_watcher(watcher):
            with catch_unraisable_exception() as cm:
                del myfunc

                self.assertIsInstance(cm.unraisable.exc_value, MyError)

    def test_clear_out_of_range_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"invalid func watcher ID -1"):
            _testcapi.clear_func_watcher(-1)
        with self.assertRaisesRegex(ValueError, r"invalid func watcher ID 8"):
            _testcapi.clear_func_watcher(8)  # FUNC_MAX_WATCHERS = 8

    def test_clear_unassigned_watcher_id(self):
        with self.assertRaisesRegex(ValueError, r"no func watcher set for ID 1"):
            _testcapi.clear_func_watcher(1)

    def test_allocate_too_many_watchers(self):
        with self.assertRaisesRegex(RuntimeError, r"no more func watcher IDs"):
            _testcapi.allocate_too_many_func_watchers()


if __name__ == "__main__":
    unittest.main()