• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2
3from contextlib import contextmanager, ExitStack
4from test.support import (
5    catch_unraisable_exception, import_helper,
6    gc_collect, suppress_immortalization)
7
8
9# Skip this test if the _testcapi module isn't available.
10_testcapi = import_helper.import_module('_testcapi')
11
12
13class TestDictWatchers(unittest.TestCase):
14    # types of watchers testcapimodule can add:
15    EVENTS = 0   # appends dict events as strings to global event list
16    ERROR = 1    # unconditionally sets and signals a RuntimeException
17    SECOND = 2   # always appends "second" to global event list
18
19    def add_watcher(self, kind=EVENTS):
20        return _testcapi.add_dict_watcher(kind)
21
22    def clear_watcher(self, watcher_id):
23        _testcapi.clear_dict_watcher(watcher_id)
24
25    @contextmanager
26    def watcher(self, kind=EVENTS):
27        wid = self.add_watcher(kind)
28        try:
29            yield wid
30        finally:
31            self.clear_watcher(wid)
32
33    def assert_events(self, expected):
34        actual = _testcapi.get_dict_watcher_events()
35        self.assertEqual(actual, expected)
36
37    def watch(self, wid, d):
38        _testcapi.watch_dict(wid, d)
39
40    def unwatch(self, wid, d):
41        _testcapi.unwatch_dict(wid, d)
42
43    def test_set_new_item(self):
44        d = {}
45        with self.watcher() as wid:
46            self.watch(wid, d)
47            d["foo"] = "bar"
48            self.assert_events(["new:foo:bar"])
49
50    def test_set_existing_item(self):
51        d = {"foo": "bar"}
52        with self.watcher() as wid:
53            self.watch(wid, d)
54            d["foo"] = "baz"
55            self.assert_events(["mod:foo:baz"])
56
57    def test_clone(self):
58        d = {}
59        d2 = {"foo": "bar"}
60        with self.watcher() as wid:
61            self.watch(wid, d)
62            d.update(d2)
63            self.assert_events(["clone"])
64
65    def test_no_event_if_not_watched(self):
66        d = {}
67        with self.watcher() as wid:
68            d["foo"] = "bar"
69            self.assert_events([])
70
71    def test_del(self):
72        d = {"foo": "bar"}
73        with self.watcher() as wid:
74            self.watch(wid, d)
75            del d["foo"]
76            self.assert_events(["del:foo"])
77
78    def test_pop(self):
79        d = {"foo": "bar"}
80        with self.watcher() as wid:
81            self.watch(wid, d)
82            d.pop("foo")
83            self.assert_events(["del:foo"])
84
85    def test_clear(self):
86        d = {"foo": "bar"}
87        with self.watcher() as wid:
88            self.watch(wid, d)
89            d.clear()
90            self.assert_events(["clear"])
91
92    def test_dealloc(self):
93        d = {"foo": "bar"}
94        with self.watcher() as wid:
95            self.watch(wid, d)
96            del d
97            self.assert_events(["dealloc"])
98
99    def test_object_dict(self):
100        class MyObj: pass
101        o = MyObj()
102
103        with self.watcher() as wid:
104            self.watch(wid, o.__dict__)
105            o.foo = "bar"
106            o.foo = "baz"
107            del o.foo
108            self.assert_events(["new:foo:bar", "mod:foo:baz", "del:foo"])
109
110        with self.watcher() as wid:
111            self.watch(wid, o.__dict__)
112            for _ in range(100):
113                o.foo = "bar"
114            self.assert_events(["new:foo:bar"] + ["mod:foo:bar"] * 99)
115
116    def test_unwatch(self):
117        d = {}
118        with self.watcher() as wid:
119            self.watch(wid, d)
120            d["foo"] = "bar"
121            self.unwatch(wid, d)
122            d["hmm"] = "baz"
123            self.assert_events(["new:foo:bar"])
124
125    def test_error(self):
126        d = {}
127        with self.watcher(kind=self.ERROR) as wid:
128            self.watch(wid, d)
129            with catch_unraisable_exception() as cm:
130                d["foo"] = "bar"
131                self.assertIn(
132                    "Exception ignored in "
133                    "PyDict_EVENT_ADDED watcher callback for <dict at ",
134                    cm.unraisable.err_msg
135                )
136                self.assertIsNone(cm.unraisable.object)
137                self.assertEqual(str(cm.unraisable.exc_value), "boom!")
138            self.assert_events([])
139
140    def test_dealloc_error(self):
141        d = {}
142        with self.watcher(kind=self.ERROR) as wid:
143            self.watch(wid, d)
144            with catch_unraisable_exception() as cm:
145                del d
146                self.assertEqual(str(cm.unraisable.exc_value), "boom!")
147
148    def test_two_watchers(self):
149        d1 = {}
150        d2 = {}
151        with self.watcher() as wid1:
152            with self.watcher(kind=self.SECOND) as wid2:
153                self.watch(wid1, d1)
154                self.watch(wid2, d2)
155                d1["foo"] = "bar"
156                d2["hmm"] = "baz"
157                self.assert_events(["new:foo:bar", "second"])
158
159    def test_watch_non_dict(self):
160        with self.watcher() as wid:
161            with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
162                self.watch(wid, 1)
163
164    def test_watch_out_of_range_watcher_id(self):
165        d = {}
166        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
167            self.watch(-1, d)
168        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
169            self.watch(8, d)  # DICT_MAX_WATCHERS = 8
170
171    def test_watch_unassigned_watcher_id(self):
172        d = {}
173        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 3"):
174            self.watch(3, d)
175
176    def test_unwatch_non_dict(self):
177        with self.watcher() as wid:
178            with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
179                self.unwatch(wid, 1)
180
181    def test_unwatch_out_of_range_watcher_id(self):
182        d = {}
183        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
184            self.unwatch(-1, d)
185        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
186            self.unwatch(8, d)  # DICT_MAX_WATCHERS = 8
187
188    def test_unwatch_unassigned_watcher_id(self):
189        d = {}
190        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 3"):
191            self.unwatch(3, d)
192
193    def test_clear_out_of_range_watcher_id(self):
194        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
195            self.clear_watcher(-1)
196        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
197            self.clear_watcher(8)  # DICT_MAX_WATCHERS = 8
198
199    def test_clear_unassigned_watcher_id(self):
200        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 3"):
201            self.clear_watcher(3)
202
203
204class TestTypeWatchers(unittest.TestCase):
205    # types of watchers testcapimodule can add:
206    TYPES = 0    # appends modified types to global event list
207    ERROR = 1    # unconditionally sets and signals a RuntimeException
208    WRAP = 2     # appends modified type wrapped in list to global event list
209
210    # duplicating the C constant
211    TYPE_MAX_WATCHERS = 8
212
213    def add_watcher(self, kind=TYPES):
214        return _testcapi.add_type_watcher(kind)
215
216    def clear_watcher(self, watcher_id):
217        _testcapi.clear_type_watcher(watcher_id)
218
219    @contextmanager
220    def watcher(self, kind=TYPES):
221        wid = self.add_watcher(kind)
222        try:
223            yield wid
224        finally:
225            self.clear_watcher(wid)
226
227    def assert_events(self, expected):
228        actual = _testcapi.get_type_modified_events()
229        self.assertEqual(actual, expected)
230
231    def watch(self, wid, t):
232        _testcapi.watch_type(wid, t)
233
234    def unwatch(self, wid, t):
235        _testcapi.unwatch_type(wid, t)
236
237    def test_watch_type(self):
238        class C: pass
239        with self.watcher() as wid:
240            self.watch(wid, C)
241            C.foo = "bar"
242            self.assert_events([C])
243
244    def test_event_aggregation(self):
245        class C: pass
246        with self.watcher() as wid:
247            self.watch(wid, C)
248            C.foo = "bar"
249            C.bar = "baz"
250            # only one event registered for both modifications
251            self.assert_events([C])
252
253    def test_lookup_resets_aggregation(self):
254        class C: pass
255        with self.watcher() as wid:
256            self.watch(wid, C)
257            C.foo = "bar"
258            # lookup resets type version tag
259            self.assertEqual(C.foo, "bar")
260            C.bar = "baz"
261            # both events registered
262            self.assert_events([C, C])
263
264    def test_unwatch_type(self):
265        class C: pass
266        with self.watcher() as wid:
267            self.watch(wid, C)
268            C.foo = "bar"
269            self.assertEqual(C.foo, "bar")
270            self.assert_events([C])
271            self.unwatch(wid, C)
272            C.bar = "baz"
273            self.assert_events([C])
274
275    def test_clear_watcher(self):
276        class C: pass
277        # outer watcher is unused, it's just to keep events list alive
278        with self.watcher() as _:
279            with self.watcher() as wid:
280                self.watch(wid, C)
281                C.foo = "bar"
282                self.assertEqual(C.foo, "bar")
283                self.assert_events([C])
284            C.bar = "baz"
285            # Watcher on C has been cleared, no new event
286            self.assert_events([C])
287
288    def test_watch_type_subclass(self):
289        class C: pass
290        class D(C): pass
291        with self.watcher() as wid:
292            self.watch(wid, D)
293            C.foo = "bar"
294            self.assert_events([D])
295
296    def test_error(self):
297        class C: pass
298        with self.watcher(kind=self.ERROR) as wid:
299            self.watch(wid, C)
300            with catch_unraisable_exception() as cm:
301                C.foo = "bar"
302                self.assertEqual(cm.unraisable.err_msg,
303                    f"Exception ignored in type watcher callback #0 for {C!r}")
304                self.assertIs(cm.unraisable.object, None)
305                self.assertEqual(str(cm.unraisable.exc_value), "boom!")
306            self.assert_events([])
307
308    def test_two_watchers(self):
309        class C1: pass
310        class C2: pass
311        with self.watcher() as wid1:
312            with self.watcher(kind=self.WRAP) as wid2:
313                self.assertNotEqual(wid1, wid2)
314                self.watch(wid1, C1)
315                self.watch(wid2, C2)
316                C1.foo = "bar"
317                C2.hmm = "baz"
318                self.assert_events([C1, [C2]])
319
320    def test_all_watchers(self):
321        class C: pass
322        with ExitStack() as stack:
323            last_wid = -1
324            # don't make assumptions about how many watchers are already
325            # registered, just go until we reach the max ID
326            while last_wid < self.TYPE_MAX_WATCHERS - 1:
327                last_wid = stack.enter_context(self.watcher())
328            self.watch(last_wid, C)
329            C.foo = "bar"
330            self.assert_events([C])
331
332    def test_watch_non_type(self):
333        with self.watcher() as wid:
334            with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
335                self.watch(wid, 1)
336
337    def test_watch_out_of_range_watcher_id(self):
338        class C: pass
339        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
340            self.watch(-1, C)
341        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
342            self.watch(self.TYPE_MAX_WATCHERS, C)
343
344    def test_watch_unassigned_watcher_id(self):
345        class C: pass
346        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
347            self.watch(1, C)
348
349    def test_unwatch_non_type(self):
350        with self.watcher() as wid:
351            with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
352                self.unwatch(wid, 1)
353
354    def test_unwatch_out_of_range_watcher_id(self):
355        class C: pass
356        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
357            self.unwatch(-1, C)
358        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
359            self.unwatch(self.TYPE_MAX_WATCHERS, C)
360
361    def test_unwatch_unassigned_watcher_id(self):
362        class C: pass
363        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
364            self.unwatch(1, C)
365
366    def test_clear_out_of_range_watcher_id(self):
367        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
368            self.clear_watcher(-1)
369        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
370            self.clear_watcher(self.TYPE_MAX_WATCHERS)
371
372    def test_clear_unassigned_watcher_id(self):
373        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
374            self.clear_watcher(1)
375
376    def test_no_more_ids_available(self):
377        with self.assertRaisesRegex(RuntimeError, r"no more type watcher IDs"):
378            with ExitStack() as stack:
379                for _ in range(self.TYPE_MAX_WATCHERS + 1):
380                    stack.enter_context(self.watcher())
381
382
383class TestCodeObjectWatchers(unittest.TestCase):
384    @contextmanager
385    def code_watcher(self, which_watcher):
386        wid = _testcapi.add_code_watcher(which_watcher)
387        try:
388            yield wid
389        finally:
390            _testcapi.clear_code_watcher(wid)
391
392    def assert_event_counts(self, exp_created_0, exp_destroyed_0,
393                            exp_created_1, exp_destroyed_1):
394        gc_collect()  # code objects are collected by GC in free-threaded build
395        self.assertEqual(
396            exp_created_0, _testcapi.get_code_watcher_num_created_events(0))
397        self.assertEqual(
398            exp_destroyed_0, _testcapi.get_code_watcher_num_destroyed_events(0))
399        self.assertEqual(
400            exp_created_1, _testcapi.get_code_watcher_num_created_events(1))
401        self.assertEqual(
402            exp_destroyed_1, _testcapi.get_code_watcher_num_destroyed_events(1))
403
404    @suppress_immortalization()
405    def test_code_object_events_dispatched(self):
406        # verify that all counts are zero before any watchers are registered
407        self.assert_event_counts(0, 0, 0, 0)
408
409        # verify that all counts remain zero when a code object is
410        # created and destroyed with no watchers registered
411        co1 = _testcapi.code_newempty("test_watchers", "dummy1", 0)
412        self.assert_event_counts(0, 0, 0, 0)
413        del co1
414        self.assert_event_counts(0, 0, 0, 0)
415
416        # verify counts are as expected when first watcher is registered
417        with self.code_watcher(0):
418            self.assert_event_counts(0, 0, 0, 0)
419            co2 = _testcapi.code_newempty("test_watchers", "dummy2", 0)
420            self.assert_event_counts(1, 0, 0, 0)
421            del co2
422            self.assert_event_counts(1, 1, 0, 0)
423
424            # again with second watcher registered
425            with self.code_watcher(1):
426                self.assert_event_counts(1, 1, 0, 0)
427                co3 = _testcapi.code_newempty("test_watchers", "dummy3", 0)
428                self.assert_event_counts(2, 1, 1, 0)
429                del co3
430                self.assert_event_counts(2, 2, 1, 1)
431
432        # verify counts are reset and don't change after both watchers are cleared
433        co4 = _testcapi.code_newempty("test_watchers", "dummy4", 0)
434        self.assert_event_counts(0, 0, 0, 0)
435        del co4
436        self.assert_event_counts(0, 0, 0, 0)
437
438    def test_error(self):
439        with self.code_watcher(2):
440            with catch_unraisable_exception() as cm:
441                co = _testcapi.code_newempty("test_watchers", "dummy0", 0)
442
443                self.assertEqual(
444                    cm.unraisable.err_msg,
445                    f"Exception ignored in "
446                    f"PY_CODE_EVENT_CREATE watcher callback for {co!r}"
447                )
448                self.assertIsNone(cm.unraisable.object)
449                self.assertEqual(str(cm.unraisable.exc_value), "boom!")
450
451    @suppress_immortalization()
452    def test_dealloc_error(self):
453        co = _testcapi.code_newempty("test_watchers", "dummy0", 0)
454        with self.code_watcher(2):
455            with catch_unraisable_exception() as cm:
456                del co
457                gc_collect()
458
459                self.assertEqual(str(cm.unraisable.exc_value), "boom!")
460
461    def test_clear_out_of_range_watcher_id(self):
462        with self.assertRaisesRegex(ValueError, r"Invalid code watcher ID -1"):
463            _testcapi.clear_code_watcher(-1)
464        with self.assertRaisesRegex(ValueError, r"Invalid code watcher ID 8"):
465            _testcapi.clear_code_watcher(8)  # CODE_MAX_WATCHERS = 8
466
467    def test_clear_unassigned_watcher_id(self):
468        with self.assertRaisesRegex(ValueError, r"No code watcher set for ID 1"):
469            _testcapi.clear_code_watcher(1)
470
471    def test_allocate_too_many_watchers(self):
472        with self.assertRaisesRegex(RuntimeError, r"no more code watcher IDs available"):
473            _testcapi.allocate_too_many_code_watchers()
474
475
476class TestFuncWatchers(unittest.TestCase):
477    @contextmanager
478    def add_watcher(self, func):
479        wid = _testcapi.add_func_watcher(func)
480        try:
481            yield
482        finally:
483            _testcapi.clear_func_watcher(wid)
484
485    def test_func_events_dispatched(self):
486        events = []
487        def watcher(*args):
488            events.append(args)
489
490        with self.add_watcher(watcher):
491            def myfunc():
492                pass
493            self.assertIn((_testcapi.PYFUNC_EVENT_CREATE, myfunc, None), events)
494            myfunc_id = id(myfunc)
495
496            new_code = self.test_func_events_dispatched.__code__
497            myfunc.__code__ = new_code
498            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_CODE, myfunc, new_code), events)
499
500            new_defaults = (123,)
501            myfunc.__defaults__ = new_defaults
502            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_DEFAULTS, myfunc, new_defaults), events)
503
504            new_defaults = (456,)
505            _testcapi.set_func_defaults_via_capi(myfunc, new_defaults)
506            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_DEFAULTS, myfunc, new_defaults), events)
507
508            new_kwdefaults = {"self": 123}
509            myfunc.__kwdefaults__ = new_kwdefaults
510            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_KWDEFAULTS, myfunc, new_kwdefaults), events)
511
512            new_kwdefaults = {"self": 456}
513            _testcapi.set_func_kwdefaults_via_capi(myfunc, new_kwdefaults)
514            self.assertIn((_testcapi.PYFUNC_EVENT_MODIFY_KWDEFAULTS, myfunc, new_kwdefaults), events)
515
516            # Clear events reference to func
517            events = []
518            del myfunc
519            self.assertIn((_testcapi.PYFUNC_EVENT_DESTROY, myfunc_id, None), events)
520
521    def test_multiple_watchers(self):
522        events0 = []
523        def first_watcher(*args):
524            events0.append(args)
525
526        events1 = []
527        def second_watcher(*args):
528            events1.append(args)
529
530        with self.add_watcher(first_watcher):
531            with self.add_watcher(second_watcher):
532                def myfunc():
533                    pass
534
535                event = (_testcapi.PYFUNC_EVENT_CREATE, myfunc, None)
536                self.assertIn(event, events0)
537                self.assertIn(event, events1)
538
539    def test_watcher_raises_error(self):
540        class MyError(Exception):
541            pass
542
543        def watcher(*args):
544            raise MyError("testing 123")
545
546        with self.add_watcher(watcher):
547            with catch_unraisable_exception() as cm:
548                def myfunc():
549                    pass
550
551                self.assertEqual(
552                    cm.unraisable.err_msg,
553                    f"Exception ignored in "
554                    f"PyFunction_EVENT_CREATE watcher callback for {repr(myfunc)[1:-1]}"
555                )
556                self.assertIsNone(cm.unraisable.object)
557
558    def test_dealloc_watcher_raises_error(self):
559        class MyError(Exception):
560            pass
561
562        def watcher(*args):
563            raise MyError("testing 123")
564
565        def myfunc():
566            pass
567
568        with self.add_watcher(watcher):
569            with catch_unraisable_exception() as cm:
570                del myfunc
571
572                self.assertIsInstance(cm.unraisable.exc_value, MyError)
573
574    def test_clear_out_of_range_watcher_id(self):
575        with self.assertRaisesRegex(ValueError, r"invalid func watcher ID -1"):
576            _testcapi.clear_func_watcher(-1)
577        with self.assertRaisesRegex(ValueError, r"invalid func watcher ID 8"):
578            _testcapi.clear_func_watcher(8)  # FUNC_MAX_WATCHERS = 8
579
580    def test_clear_unassigned_watcher_id(self):
581        with self.assertRaisesRegex(ValueError, r"no func watcher set for ID 1"):
582            _testcapi.clear_func_watcher(1)
583
584    def test_allocate_too_many_watchers(self):
585        with self.assertRaisesRegex(RuntimeError, r"no more func watcher IDs"):
586            _testcapi.allocate_too_many_func_watchers()
587
588
589if __name__ == "__main__":
590    unittest.main()
591