• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Run the _testcapi module tests (tests for the Python/C API):  by defn,
2# these are all functions _testcapi exports whose name begins with 'test_'.
3
4import _thread
5from collections import deque
6import contextlib
7import importlib.machinery
8import importlib.util
9import json
10import os
11import pickle
12import queue
13import random
14import sys
15import textwrap
16import threading
17import time
18import types
19import unittest
20import warnings
21import weakref
22import operator
23from test import support
24from test.support import MISSING_C_DOCSTRINGS
25from test.support import import_helper
26from test.support import threading_helper
27from test.support import warnings_helper
28from test.support import requires_limited_api
29from test.support import suppress_immortalization
30from test.support import expected_failure_if_gil_disabled
31from test.support import Py_GIL_DISABLED
32from test.support.script_helper import assert_python_failure, assert_python_ok, run_python_until_end
33try:
34    import _posixsubprocess
35except ImportError:
36    _posixsubprocess = None
37try:
38    import _testmultiphase
39except ImportError:
40    _testmultiphase = None
41try:
42    import _testsinglephase
43except ImportError:
44    _testsinglephase = None
45try:
46    import _interpreters
47except ModuleNotFoundError:
48    _interpreters = None
49
50# Skip this test if the _testcapi module isn't available.
51_testcapi = import_helper.import_module('_testcapi')
52
53import _testlimitedcapi
54import _testinternalcapi
55
56
57NULL = None
58
def decode_stderr(err):
    """Return *err* decoded as UTF-8 text with carriage returns removed.

    Undecodable bytes are substituted (``errors='replace'``) instead of
    raising, and every ``'\\r'`` is dropped from the decoded text.
    """
    text = err.decode('utf-8', 'replace')
    return text.replace('\r', '')
61
62
def requires_subinterpreters(meth):
    """Skip the decorated test when the _interpreters module is missing."""
    skip_without_interpreters = unittest.skipIf(
        _interpreters is None, 'subinterpreters required')
    return skip_without_interpreters(meth)
67
68
# Plain Python function that returns its first argument unchanged.  It is
# wrapped with _testcapi.instancemethod in InstanceMethod, and its docstring
# and attributes are compared against the wrapper in test_instancemethod.
def testfunction(self):
    """some doc"""
    return self
72
73
# Holder class whose attributes are _testcapi.instancemethod wrappers around
# the builtin id() and the module-level testfunction(); exercised by
# CAPITest.test_instancemethod.
class InstanceMethod:
    id = _testcapi.instancemethod(id)
    testfunction = _testcapi.instancemethod(testfunction)
77
78class CAPITest(unittest.TestCase):
79
80    def test_instancemethod(self):
81        inst = InstanceMethod()
82        self.assertEqual(id(inst), inst.id())
83        self.assertTrue(inst.testfunction() is inst)
84        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
85        self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)
86
87        InstanceMethod.testfunction.attribute = "test"
88        self.assertEqual(testfunction.attribute, "test")
89        self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")
90
91    @support.requires_subprocess()
92    def test_no_FatalError_infinite_loop(self):
93        code = textwrap.dedent("""
94            import _testcapi
95            from test import support
96
97            with support.SuppressCrashReport():
98                _testcapi.crash_no_current_thread()
99        """)
100
101        run_result, _cmd_line = run_python_until_end('-c', code)
102        _rc, out, err = run_result
103        self.assertEqual(out, b'')
104        # This used to cause an infinite loop.
105        msg = ("Fatal Python error: PyThreadState_Get: "
106               "the function must be called with the GIL held, "
107               "after Python initialization and before Python finalization, "
108               "but the GIL is released "
109               "(the current Python thread state is NULL)").encode()
110        self.assertTrue(err.rstrip().startswith(msg),
111                        err)
112
113    def test_memoryview_from_NULL_pointer(self):
114        self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)
115
116    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
117    def test_seq_bytes_to_charp_array(self):
118        # Issue #15732: crash in _PySequence_BytesToCharpArray()
119        class Z(object):
120            def __len__(self):
121                return 1
122        with self.assertRaisesRegex(TypeError, 'indexing'):
123            _posixsubprocess.fork_exec(
124                          1,Z(),True,(1, 2),5,6,7,8,9,10,11,12,13,14,True,True,17,False,19,20,21,22,False)
125        # Issue #15736: overflow in _PySequence_BytesToCharpArray()
126        class Z(object):
127            def __len__(self):
128                return sys.maxsize
129            def __getitem__(self, i):
130                return b'x'
131        self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
132                          1,Z(),True,(1, 2),5,6,7,8,9,10,11,12,13,14,True,True,17,False,19,20,21,22,False)
133
134    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
135    def test_subprocess_fork_exec(self):
136        class Z(object):
137            def __len__(self):
138                return 1
139
140        # Issue #15738: crash in subprocess_fork_exec()
141        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
142                          Z(),[b'1'],True,(1, 2),5,6,7,8,9,10,11,12,13,14,True,True,17,False,19,20,21,22,False)
143
144    @unittest.skipIf(MISSING_C_DOCSTRINGS,
145                     "Signature information for builtins requires docstrings")
146    def test_docstring_signature_parsing(self):
147
148        self.assertEqual(_testcapi.no_docstring.__doc__, None)
149        self.assertEqual(_testcapi.no_docstring.__text_signature__, None)
150
151        self.assertEqual(_testcapi.docstring_empty.__doc__, None)
152        self.assertEqual(_testcapi.docstring_empty.__text_signature__, None)
153
154        self.assertEqual(_testcapi.docstring_no_signature.__doc__,
155            "This docstring has no signature.")
156        self.assertEqual(_testcapi.docstring_no_signature.__text_signature__, None)
157
158        self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__,
159            "docstring_with_invalid_signature($module, /, boo)\n"
160            "\n"
161            "This docstring has an invalid signature."
162            )
163        self.assertEqual(_testcapi.docstring_with_invalid_signature.__text_signature__, None)
164
165        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__,
166            "docstring_with_invalid_signature2($module, /, boo)\n"
167            "\n"
168            "--\n"
169            "\n"
170            "This docstring also has an invalid signature."
171            )
172        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__text_signature__, None)
173
174        self.assertEqual(_testcapi.docstring_with_signature.__doc__,
175            "This docstring has a valid signature.")
176        self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)")
177
178        self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__doc__, None)
179        self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__,
180            "($module, /, sig)")
181
182        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__,
183            "\nThis docstring has a valid signature and some extra newlines.")
184        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__,
185            "($module, /, parameter)")
186
187    def test_c_type_with_matrix_multiplication(self):
188        M = _testcapi.matmulType
189        m1 = M()
190        m2 = M()
191        self.assertEqual(m1 @ m2, ("matmul", m1, m2))
192        self.assertEqual(m1 @ 42, ("matmul", m1, 42))
193        self.assertEqual(42 @ m1, ("matmul", 42, m1))
194        o = m1
195        o @= m2
196        self.assertEqual(o, ("imatmul", m1, m2))
197        o = m1
198        o @= 42
199        self.assertEqual(o, ("imatmul", m1, 42))
200        o = 42
201        o @= m1
202        self.assertEqual(o, ("matmul", 42, m1))
203
204    def test_c_type_with_ipow(self):
205        # When the __ipow__ method of a type was implemented in C, using the
206        # modulo param would cause segfaults.
207        o = _testcapi.ipowType()
208        self.assertEqual(o.__ipow__(1), (1, None))
209        self.assertEqual(o.__ipow__(2, 2), (2, 2))
210
211    def test_return_null_without_error(self):
212        # Issue #23571: A function must not return NULL without setting an
213        # error
214        if support.Py_DEBUG:
215            code = textwrap.dedent("""
216                import _testcapi
217                from test import support
218
219                with support.SuppressCrashReport():
220                    _testcapi.return_null_without_error()
221            """)
222            rc, out, err = assert_python_failure('-c', code)
223            err = decode_stderr(err)
224            self.assertRegex(err,
225                r'Fatal Python error: _Py_CheckFunctionResult: '
226                    r'a function returned NULL without setting an exception\n'
227                r'Python runtime state: initialized\n'
228                r'SystemError: <built-in function return_null_without_error> '
229                    r'returned NULL without setting an exception\n'
230                r'\n'
231                r'Current thread.*:\n'
232                r'  File .*", line 6 in <module>\n')
233        else:
234            with self.assertRaises(SystemError) as cm:
235                _testcapi.return_null_without_error()
236            self.assertRegex(str(cm.exception),
237                             'return_null_without_error.* '
238                             'returned NULL without setting an exception')
239
240    def test_return_result_with_error(self):
241        # Issue #23571: A function must not return a result with an error set
242        if support.Py_DEBUG:
243            code = textwrap.dedent("""
244                import _testcapi
245                from test import support
246
247                with support.SuppressCrashReport():
248                    _testcapi.return_result_with_error()
249            """)
250            rc, out, err = assert_python_failure('-c', code)
251            err = decode_stderr(err)
252            self.assertRegex(err,
253                    r'Fatal Python error: _Py_CheckFunctionResult: '
254                        r'a function returned a result with an exception set\n'
255                    r'Python runtime state: initialized\n'
256                    r'ValueError\n'
257                    r'\n'
258                    r'The above exception was the direct cause '
259                        r'of the following exception:\n'
260                    r'\n'
261                    r'SystemError: <built-in '
262                        r'function return_result_with_error> '
263                        r'returned a result with an exception set\n'
264                    r'\n'
265                    r'Current thread.*:\n'
266                    r'  File .*, line 6 in <module>\n')
267        else:
268            with self.assertRaises(SystemError) as cm:
269                _testcapi.return_result_with_error()
270            self.assertRegex(str(cm.exception),
271                             'return_result_with_error.* '
272                             'returned a result with an exception set')
273
274    def test_getitem_with_error(self):
275        # Test _Py_CheckSlotResult(). Raise an exception and then calls
276        # PyObject_GetItem(): check that the assertion catches the bug.
277        # PyObject_GetItem() must not be called with an exception set.
278        code = textwrap.dedent("""
279            import _testcapi
280            from test import support
281
282            with support.SuppressCrashReport():
283                _testcapi.getitem_with_error({1: 2}, 1)
284        """)
285        rc, out, err = assert_python_failure('-c', code)
286        err = decode_stderr(err)
287        if 'SystemError: ' not in err:
288            self.assertRegex(err,
289                    r'Fatal Python error: _Py_CheckSlotResult: '
290                        r'Slot __getitem__ of type dict succeeded '
291                        r'with an exception set\n'
292                    r'Python runtime state: initialized\n'
293                    r'ValueError: bug\n'
294                    r'\n'
295                    r'Current thread .* \(most recent call first\):\n'
296                    r'  File .*, line 6 in <module>\n'
297                    r'\n'
298                    r'Extension modules: _testcapi \(total: 1\)\n')
299        else:
300            # Python built with NDEBUG macro defined:
301            # test _Py_CheckFunctionResult() instead.
302            self.assertIn('returned a result with an exception set', err)
303
304    def test_buildvalue(self):
305        # Test Py_BuildValue() with object arguments
306        buildvalue = _testcapi.py_buildvalue
307        self.assertEqual(buildvalue(''), None)
308        self.assertEqual(buildvalue('()'), ())
309        self.assertEqual(buildvalue('[]'), [])
310        self.assertEqual(buildvalue('{}'), {})
311        self.assertEqual(buildvalue('()[]{}'), ((), [], {}))
312        self.assertEqual(buildvalue('O', 1), 1)
313        self.assertEqual(buildvalue('(O)', 1), (1,))
314        self.assertEqual(buildvalue('[O]', 1), [1])
315        self.assertRaises(SystemError, buildvalue, '{O}', 1)
316        self.assertEqual(buildvalue('OO', 1, 2), (1, 2))
317        self.assertEqual(buildvalue('(OO)', 1, 2), (1, 2))
318        self.assertEqual(buildvalue('[OO]', 1, 2), [1, 2])
319        self.assertEqual(buildvalue('{OO}', 1, 2), {1: 2})
320        self.assertEqual(buildvalue('{OOOO}', 1, 2, 3, 4), {1: 2, 3: 4})
321        self.assertEqual(buildvalue('((O))', 1), ((1,),))
322        self.assertEqual(buildvalue('((OO))', 1, 2), ((1, 2),))
323
324        self.assertEqual(buildvalue(' \t,:'), None)
325        self.assertEqual(buildvalue('O,', 1), 1)
326        self.assertEqual(buildvalue('   O   ', 1), 1)
327        self.assertEqual(buildvalue('\tO\t', 1), 1)
328        self.assertEqual(buildvalue('O,O', 1, 2), (1, 2))
329        self.assertEqual(buildvalue('O, O', 1, 2), (1, 2))
330        self.assertEqual(buildvalue('O,\tO', 1, 2), (1, 2))
331        self.assertEqual(buildvalue('O O', 1, 2), (1, 2))
332        self.assertEqual(buildvalue('O\tO', 1, 2), (1, 2))
333        self.assertEqual(buildvalue('(O,O)', 1, 2), (1, 2))
334        self.assertEqual(buildvalue('(O, O,)', 1, 2), (1, 2))
335        self.assertEqual(buildvalue(' ( O O ) ', 1, 2), (1, 2))
336        self.assertEqual(buildvalue('\t(\tO\tO\t)\t', 1, 2), (1, 2))
337        self.assertEqual(buildvalue('[O,O]', 1, 2), [1, 2])
338        self.assertEqual(buildvalue('[O, O,]', 1, 2), [1, 2])
339        self.assertEqual(buildvalue(' [ O O ] ', 1, 2), [1, 2])
340        self.assertEqual(buildvalue(' [\tO\tO\t] ', 1, 2), [1, 2])
341        self.assertEqual(buildvalue('{O:O}', 1, 2), {1: 2})
342        self.assertEqual(buildvalue('{O:O,O:O}', 1, 2, 3, 4), {1: 2, 3: 4})
343        self.assertEqual(buildvalue('{O: O, O: O,}', 1, 2, 3, 4), {1: 2, 3: 4})
344        self.assertEqual(buildvalue(' { O O O O } ', 1, 2, 3, 4), {1: 2, 3: 4})
345        self.assertEqual(buildvalue('\t{\tO\tO\tO\tO\t}\t', 1, 2, 3, 4), {1: 2, 3: 4})
346
347        self.assertRaises(SystemError, buildvalue, 'O', NULL)
348        self.assertRaises(SystemError, buildvalue, '(O)', NULL)
349        self.assertRaises(SystemError, buildvalue, '[O]', NULL)
350        self.assertRaises(SystemError, buildvalue, '{O}', NULL)
351        self.assertRaises(SystemError, buildvalue, 'OO', 1, NULL)
352        self.assertRaises(SystemError, buildvalue, 'OO', NULL, 2)
353        self.assertRaises(SystemError, buildvalue, '(OO)', 1, NULL)
354        self.assertRaises(SystemError, buildvalue, '(OO)', NULL, 2)
355        self.assertRaises(SystemError, buildvalue, '[OO]', 1, NULL)
356        self.assertRaises(SystemError, buildvalue, '[OO]', NULL, 2)
357        self.assertRaises(SystemError, buildvalue, '{OO}', 1, NULL)
358        self.assertRaises(SystemError, buildvalue, '{OO}', NULL, 2)
359
360    def test_buildvalue_ints(self):
361        # Test Py_BuildValue() with integer arguments
362        buildvalue = _testcapi.py_buildvalue_ints
363        from _testcapi import SHRT_MIN, SHRT_MAX, USHRT_MAX, INT_MIN, INT_MAX, UINT_MAX
364        self.assertEqual(buildvalue('i', INT_MAX), INT_MAX)
365        self.assertEqual(buildvalue('i', INT_MIN), INT_MIN)
366        self.assertEqual(buildvalue('I', UINT_MAX), UINT_MAX)
367
368        self.assertEqual(buildvalue('h', SHRT_MAX), SHRT_MAX)
369        self.assertEqual(buildvalue('h', SHRT_MIN), SHRT_MIN)
370        self.assertEqual(buildvalue('H', USHRT_MAX), USHRT_MAX)
371
372        self.assertEqual(buildvalue('b', 127), 127)
373        self.assertEqual(buildvalue('b', -128), -128)
374        self.assertEqual(buildvalue('B', 255), 255)
375
376        self.assertEqual(buildvalue('c', ord('A')), b'A')
377        self.assertEqual(buildvalue('c', 255), b'\xff')
378        self.assertEqual(buildvalue('c', 256), b'\x00')
379        self.assertEqual(buildvalue('c', -1), b'\xff')
380
381        self.assertEqual(buildvalue('C', 255), chr(255))
382        self.assertEqual(buildvalue('C', 256), chr(256))
383        self.assertEqual(buildvalue('C', sys.maxunicode), chr(sys.maxunicode))
384        self.assertRaises(ValueError, buildvalue, 'C', -1)
385        self.assertRaises(ValueError, buildvalue, 'C', sys.maxunicode+1)
386
387        # gh-84489
388        self.assertRaises(ValueError, buildvalue, '(C )i', -1, 2)
389        self.assertRaises(ValueError, buildvalue, '[C ]i', -1, 2)
390        self.assertRaises(ValueError, buildvalue, '{Ci }i', -1, 2, 3)
391
    def test_buildvalue_N(self):
        # The checks themselves live in the C function
        # _testcapi.test_buildvalue_N(); this method only invokes it.
        # NOTE(review): presumably it covers the 'N' format unit of
        # Py_BuildValue() -- confirm against the C source.
        _testcapi.test_buildvalue_N()
394
395    def check_negative_refcount(self, code):
396        # bpo-35059: Check that Py_DECREF() reports the correct filename
397        # when calling _Py_NegativeRefcount() to abort Python.
398        code = textwrap.dedent(code)
399        rc, out, err = assert_python_failure('-c', code)
400        self.assertRegex(err,
401                         br'_testcapimodule\.c:[0-9]+: '
402                         br'_Py_NegativeRefcount: Assertion failed: '
403                         br'object has negative ref count')
404
405    @unittest.skipUnless(hasattr(_testcapi, 'negative_refcount'),
406                         'need _testcapi.negative_refcount()')
407    def test_negative_refcount(self):
408        code = """
409            import _testcapi
410            from test import support
411
412            with support.SuppressCrashReport():
413                _testcapi.negative_refcount()
414        """
415        self.check_negative_refcount(code)
416
417    @unittest.skipUnless(hasattr(_testcapi, 'decref_freed_object'),
418                         'need _testcapi.decref_freed_object()')
419    @support.skip_if_sanitizer("use after free on purpose",
420                               address=True, memory=True, ub=True)
421    def test_decref_freed_object(self):
422        code = """
423            import _testcapi
424            from test import support
425
426            with support.SuppressCrashReport():
427                _testcapi.decref_freed_object()
428        """
429        self.check_negative_refcount(code)
430
431    def test_trashcan_subclass(self):
432        # bpo-35983: Check that the trashcan mechanism for "list" is NOT
433        # activated when its tp_dealloc is being called by a subclass
434        from _testcapi import MyList
435        L = None
436        for i in range(1000):
437            L = MyList((L,))
438
    @support.requires_resource('cpu')
    def test_trashcan_python_class1(self):
        # Run the shared trashcan check with the builtin list as base class.
        self.do_test_trashcan_python_class(list)
442
    @support.requires_resource('cpu')
    def test_trashcan_python_class2(self):
        # Run the shared trashcan check with _testcapi.MyList as base class.
        from _testcapi import MyList
        self.do_test_trashcan_python_class(MyList)
447
    def do_test_trashcan_python_class(self, base):
        # Check that the trashcan mechanism works properly for a Python
        # subclass of a class using the trashcan (this specific test assumes
        # that the base class "base" behaves like list)
        class PyList(base):
            # Count the number of PyList instances to verify that there is
            # no memory leak
            num = 0
            def __init__(self, *args):
                # Incremented on construction, decremented in __del__.
                __class__.num += 1
                super().__init__(*args)
            def __del__(self):
                __class__.num -= 1

        # Run twice: once deleting the PyList chain directly (parity 0) and
        # once with the chain wrapped in an extra tuple (parity 1).
        for parity in (0, 1):
            L = None
            # We need in the order of 2**20 iterations here such that a
            # typical 8MB stack would overflow without the trashcan.
            for i in range(2**20):
                L = PyList((L,))
                L.attr = i
            if parity:
                # Add one additional nesting layer
                L = (L,)
            self.assertGreater(PyList.num, 0)
            # Dropping the last reference must destroy every PyList instance.
            del L
            self.assertEqual(PyList.num, 0)
475
476    @unittest.skipIf(MISSING_C_DOCSTRINGS,
477                     "Signature information for builtins requires docstrings")
478    def test_heap_ctype_doc_and_text_signature(self):
479        self.assertEqual(_testcapi.HeapDocCType.__doc__, "somedoc")
480        self.assertEqual(_testcapi.HeapDocCType.__text_signature__, "(arg1, arg2)")
481
482    def test_null_type_doc(self):
483        self.assertEqual(_testcapi.NullTpDocType.__doc__, None)
484
    @suppress_immortalization()
    def test_subclass_of_heap_gc_ctype_with_tpdealloc_decrefs_once(self):
        # Python subclass of _testcapi.HeapGcCType that adds its own attribute
        # before delegating to the base __init__.
        class HeapGcCTypeSubclass(_testcapi.HeapGcCType):
            def __init__(self):
                self.value2 = 20
                super().__init__()

        subclass_instance = HeapGcCTypeSubclass()
        # Reference count of the type while one instance is alive.
        type_refcnt = sys.getrefcount(HeapGcCTypeSubclass)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # Test that the type reference count is only decremented once
        del subclass_instance
        self.assertEqual(type_refcnt - 1, sys.getrefcount(HeapGcCTypeSubclass))
502
    @suppress_immortalization()
    def test_subclass_of_heap_gc_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
        # A is a Python subclass of _testcapi.HeapGcCType; B derives from A
        # and reassigns the instance's __class__ to A inside __del__.
        class A(_testcapi.HeapGcCType):
            def __init__(self):
                self.value2 = 20
                super().__init__()

        class B(A):
            def __init__(self):
                super().__init__()

            def __del__(self):
                self.__class__ = A
                # Record both type refcounts as seen right after the switch.
                A.refcnt_in_del = sys.getrefcount(A)
                B.refcnt_in_del = sys.getrefcount(B)

        subclass_instance = B()
        # Baseline refcounts while the B instance is still alive.
        type_refcnt = sys.getrefcount(B)
        new_type_refcnt = sys.getrefcount(A)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        del subclass_instance

        # Test that setting __class__ modified the reference counts of the types
        if support.Py_DEBUG:
            # gh-89373: In debug mode, _Py_Dealloc() keeps a strong reference
            # to the type while calling tp_dealloc()
            self.assertEqual(type_refcnt, B.refcnt_in_del)
        else:
            self.assertEqual(type_refcnt - 1, B.refcnt_in_del)
        self.assertEqual(new_type_refcnt + 1, A.refcnt_in_del)

        # Test that the original type already has decreased its refcnt
        self.assertEqual(type_refcnt - 1, sys.getrefcount(B))

        # Test that subtype_dealloc decref the newly assigned __class__ only once
        self.assertEqual(new_type_refcnt, sys.getrefcount(A))
543
544    def test_heaptype_with_dict(self):
545        inst = _testcapi.HeapCTypeWithDict()
546        inst.foo = 42
547        self.assertEqual(inst.foo, 42)
548        self.assertEqual(inst.dictobj, inst.__dict__)
549        self.assertEqual(inst.dictobj, {"foo": 42})
550
551        inst = _testcapi.HeapCTypeWithDict()
552        self.assertEqual({}, inst.__dict__)
553
554    def test_heaptype_with_managed_dict(self):
555        inst = _testcapi.HeapCTypeWithManagedDict()
556        inst.foo = 42
557        self.assertEqual(inst.foo, 42)
558        self.assertEqual(inst.__dict__, {"foo": 42})
559
560        inst = _testcapi.HeapCTypeWithManagedDict()
561        self.assertEqual({}, inst.__dict__)
562
563        a = _testcapi.HeapCTypeWithManagedDict()
564        b = _testcapi.HeapCTypeWithManagedDict()
565        a.b = b
566        b.a = a
567        del a, b
568
    def test_sublclassing_managed_dict(self):
        # A Python subclass of HeapCTypeWithManagedDict must accept instance
        # attributes, including one that refers back to the instance itself.

        class C(_testcapi.HeapCTypeWithManagedDict):
            pass

        i = C()
        # Self-reference stored in the instance, then the name is dropped.
        i.spam = i
        del i
577
578    def test_heaptype_with_negative_dict(self):
579        inst = _testcapi.HeapCTypeWithNegativeDict()
580        inst.foo = 42
581        self.assertEqual(inst.foo, 42)
582        self.assertEqual(inst.dictobj, inst.__dict__)
583        self.assertEqual(inst.dictobj, {"foo": 42})
584
585        inst = _testcapi.HeapCTypeWithNegativeDict()
586        self.assertEqual({}, inst.__dict__)
587
588    def test_heaptype_with_weakref(self):
589        inst = _testcapi.HeapCTypeWithWeakref()
590        ref = weakref.ref(inst)
591        self.assertEqual(ref(), inst)
592        self.assertEqual(inst.weakreflist, ref)
593
594    def test_heaptype_with_managed_weakref(self):
595        inst = _testcapi.HeapCTypeWithManagedWeakref()
596        ref = weakref.ref(inst)
597        self.assertEqual(ref(), inst)
598
599    def test_sublclassing_managed_weakref(self):
600
601        class C(_testcapi.HeapCTypeWithManagedWeakref):
602            pass
603
604        inst = C()
605        ref = weakref.ref(inst)
606        self.assertEqual(ref(), inst)
607
    def test_sublclassing_managed_both(self):
        # Combine the managed-weakref and managed-dict heap types as bases,
        # in both orders.

        class C1(_testcapi.HeapCTypeWithManagedWeakref, _testcapi.HeapCTypeWithManagedDict):
            pass

        class C2(_testcapi.HeapCTypeWithManagedDict, _testcapi.HeapCTypeWithManagedWeakref):
            pass

        for cls in (C1, C2):
            inst = cls()
            ref = weakref.ref(inst)
            self.assertEqual(ref(), inst)
            # Instance attributes work as well; store a self-reference.
            inst.spam = inst
            del inst
            # A weak reference to a temporary instance is dead immediately.
            ref = weakref.ref(cls())
            self.assertIs(ref(), None)
624
625    def test_heaptype_with_buffer(self):
626        inst = _testcapi.HeapCTypeWithBuffer()
627        b = bytes(inst)
628        self.assertEqual(b, b"1234")
629
    def test_c_subclass_of_heap_ctype_with_tpdealloc_decrefs_once(self):
        subclass_instance = _testcapi.HeapCTypeSubclass()
        # Reference count of the type while one instance is alive.
        type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # Test that the type reference count is only decremented once
        del subclass_instance
        self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclass))
641
    def test_c_subclass_of_heap_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
        subclass_instance = _testcapi.HeapCTypeSubclassWithFinalizer()
        # Baseline refcounts of both types while the instance is still alive.
        type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer)
        new_type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # The tp_finalize slot will set __class__ to HeapCTypeSubclass
        del subclass_instance

        # Test that setting __class__ modified the reference counts of the types
        if support.Py_DEBUG:
            # gh-89373: In debug mode, _Py_Dealloc() keeps a strong reference
            # to the type while calling tp_dealloc()
            self.assertEqual(type_refcnt, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del)
        else:
            self.assertEqual(type_refcnt - 1, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del)
        self.assertEqual(new_type_refcnt + 1, _testcapi.HeapCTypeSubclass.refcnt_in_del)

        # Test that the original type already has decreased its refcnt
        self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer))

        # Test that subtype_dealloc decref the newly assigned __class__ only once
        self.assertEqual(new_type_refcnt, sys.getrefcount(_testcapi.HeapCTypeSubclass))
668
669    def test_heaptype_with_setattro(self):
670        obj = _testcapi.HeapCTypeSetattr()
671        self.assertEqual(obj.pvalue, 10)
672        obj.value = 12
673        self.assertEqual(obj.pvalue, 12)
674        del obj.value
675        self.assertEqual(obj.pvalue, 0)
676
677    def test_heaptype_with_custom_metaclass(self):
678        metaclass = _testcapi.HeapCTypeMetaclass
679        self.assertTrue(issubclass(metaclass, type))
680
681        # Class creation from C
682        t = _testcapi.pytype_fromspec_meta(metaclass)
683        self.assertIsInstance(t, type)
684        self.assertEqual(t.__name__, "HeapCTypeViaMetaclass")
685        self.assertIs(type(t), metaclass)
686
687        # Class creation from Python
688        t = metaclass("PyClassViaMetaclass", (), {})
689        self.assertIsInstance(t, type)
690        self.assertEqual(t.__name__, "PyClassViaMetaclass")
691
692    def test_heaptype_with_custom_metaclass_null_new(self):
693        metaclass = _testcapi.HeapCTypeMetaclassNullNew
694
695        self.assertTrue(issubclass(metaclass, type))
696
697        # Class creation from C
698        t = _testcapi.pytype_fromspec_meta(metaclass)
699        self.assertIsInstance(t, type)
700        self.assertEqual(t.__name__, "HeapCTypeViaMetaclass")
701        self.assertIs(type(t), metaclass)
702
703        # Class creation from Python
704        with self.assertRaisesRegex(TypeError, "cannot create .* instances"):
705            metaclass("PyClassViaMetaclass", (), {})
706
707    def test_heaptype_with_custom_metaclass_custom_new(self):
708        metaclass = _testcapi.HeapCTypeMetaclassCustomNew
709
710        self.assertTrue(issubclass(_testcapi.HeapCTypeMetaclassCustomNew, type))
711
712        msg = "Metaclasses with custom tp_new are not supported."
713        with self.assertRaisesRegex(TypeError, msg):
714            t = _testcapi.pytype_fromspec_meta(metaclass)
715
716    def test_heaptype_with_custom_metaclass_deprecation(self):
717        metaclass = _testcapi.HeapCTypeMetaclassCustomNew
718
719        # gh-103968: a metaclass with custom tp_new is deprecated, but still
720        # allowed for functions that existed in 3.11
721        # (PyType_FromSpecWithBases is used here).
722        class Base(metaclass=metaclass):
723            pass
724
725        # Class creation from C
726        with warnings_helper.check_warnings(
727                ('.* _testcapi.Subclass .* custom tp_new.*in Python 3.14.*', DeprecationWarning),
728                ):
729            sub = _testcapi.make_type_with_base(Base)
730        self.assertTrue(issubclass(sub, Base))
731        self.assertIsInstance(sub, metaclass)
732
    def test_multiple_inheritance_ctypes_with_weakref_or_dict(self):
        # Inheriting from both HeapCTypeWithWeakref and HeapCTypeWithDict
        # must raise TypeError, whichever base is listed first.

        with self.assertRaises(TypeError):
            class Both1(_testcapi.HeapCTypeWithWeakref, _testcapi.HeapCTypeWithDict):
                pass
        with self.assertRaises(TypeError):
            class Both2(_testcapi.HeapCTypeWithDict, _testcapi.HeapCTypeWithWeakref):
                pass
741
742    def test_multiple_inheritance_ctypes_with_weakref_or_dict_and_other_builtin(self):
743
744        with self.assertRaises(TypeError):
745            class C1(_testcapi.HeapCTypeWithDict, list):
746                pass
747
748        with self.assertRaises(TypeError):
749            class C2(_testcapi.HeapCTypeWithWeakref, list):
750                pass
751
752        class C3(_testcapi.HeapCTypeWithManagedDict, list):
753            pass
754        class C4(_testcapi.HeapCTypeWithManagedWeakref, list):
755            pass
756
757        inst = C3()
758        inst.append(0)
759        str(inst.__dict__)
760
761        inst = C4()
762        inst.append(0)
763        str(inst.__weakref__)
764
765        for cls in (_testcapi.HeapCTypeWithManagedDict, _testcapi.HeapCTypeWithManagedWeakref):
766            for cls2 in (_testcapi.HeapCTypeWithDict, _testcapi.HeapCTypeWithWeakref):
767                class S(cls, cls2):
768                    pass
769            class B1(C3, cls):
770                pass
771            class B2(C4, cls):
772                pass
773
774    def test_pytype_fromspec_with_repeated_slots(self):
775        for variant in range(2):
776            with self.subTest(variant=variant):
777                with self.assertRaises(SystemError):
778                    _testcapi.create_type_from_repeated_slots(variant)
779
780    @warnings_helper.ignore_warnings(category=DeprecationWarning)
781    def test_immutable_type_with_mutable_base(self):
782        # Add deprecation warning here so it's removed in 3.14
783        warnings._deprecated(
784            'creating immutable classes with mutable bases', remove=(3, 14))
785
786        class MutableBase:
787            def meth(self):
788                return 'original'
789
790        with self.assertWarns(DeprecationWarning):
791            ImmutableSubclass = _testcapi.make_immutable_type_with_base(
792                MutableBase)
793        instance = ImmutableSubclass()
794
795        self.assertEqual(instance.meth(), 'original')
796
797        # Cannot override the static type's method
798        with self.assertRaisesRegex(
799                TypeError,
800                "cannot set 'meth' attribute of immutable type"):
801            ImmutableSubclass.meth = lambda self: 'overridden'
802        self.assertEqual(instance.meth(), 'original')
803
804        # Can change the method on the mutable base
805        MutableBase.meth = lambda self: 'changed'
806        self.assertEqual(instance.meth(), 'changed')
807
808    def test_pynumber_tobase(self):
809        from _testcapi import pynumber_tobase
810        small_number = 123
811        large_number = 2**64
812        class IDX:
813            def __init__(self, val):
814                self.val = val
815            def __index__(self):
816                return self.val
817
818        test_cases = ((2, '0b1111011', '0b10000000000000000000000000000000000000000000000000000000000000000'),
819                      (8, '0o173', '0o2000000000000000000000'),
820                      (10, '123', '18446744073709551616'),
821                      (16, '0x7b', '0x10000000000000000'))
822        for base, small_target, large_target in test_cases:
823            with self.subTest(base=base, st=small_target, lt=large_target):
824                # Test for small number
825                self.assertEqual(pynumber_tobase(small_number, base), small_target)
826                self.assertEqual(pynumber_tobase(-small_number, base), '-' + small_target)
827                self.assertEqual(pynumber_tobase(IDX(small_number), base), small_target)
828                # Test for large number(out of range of a longlong,i.e.[-2**63, 2**63-1])
829                self.assertEqual(pynumber_tobase(large_number, base), large_target)
830                self.assertEqual(pynumber_tobase(-large_number, base), '-' + large_target)
831                self.assertEqual(pynumber_tobase(IDX(large_number), base), large_target)
832        self.assertRaises(TypeError, pynumber_tobase, IDX(123.0), 10)
833        self.assertRaises(TypeError, pynumber_tobase, IDX('123'), 10)
834        self.assertRaises(TypeError, pynumber_tobase, 123.0, 10)
835        self.assertRaises(TypeError, pynumber_tobase, '123', 10)
836        self.assertRaises(SystemError, pynumber_tobase, 123, 0)
837
838    def test_pyobject_repr_from_null(self):
839        s = _testcapi.pyobject_repr_from_null()
840        self.assertEqual(s, '<NULL>')
841
842    def test_pyobject_str_from_null(self):
843        s = _testcapi.pyobject_str_from_null()
844        self.assertEqual(s, '<NULL>')
845
846    def test_pyobject_bytes_from_null(self):
847        s = _testcapi.pyobject_bytes_from_null()
848        self.assertEqual(s, b'<NULL>')
849
850    def test_Py_CompileString(self):
851        # Check that Py_CompileString respects the coding cookie
852        _compile = _testcapi.Py_CompileString
853        code = b"# -*- coding: latin1 -*-\nprint('\xc2\xa4')\n"
854        result = _compile(code)
855        expected = compile(code, "<string>", "exec")
856        self.assertEqual(result.co_consts, expected.co_consts)
857
858    def test_export_symbols(self):
859        # bpo-44133: Ensure that the "Py_FrozenMain" and
860        # "PyThread_get_thread_native_id" symbols are exported by the Python
861        # (directly by the binary, or via by the Python dynamic library).
862        ctypes = import_helper.import_module('ctypes')
863        names = []
864
865        # Test if the PY_HAVE_THREAD_NATIVE_ID macro is defined
866        if hasattr(_thread, 'get_native_id'):
867            names.append('PyThread_get_thread_native_id')
868
869        # Python/frozenmain.c fails to build on Windows when the symbols are
870        # missing:
871        # - PyWinFreeze_ExeInit
872        # - PyWinFreeze_ExeTerm
873        # - PyInitFrozenExtensions
874        if os.name != 'nt':
875            names.append('Py_FrozenMain')
876
877        for name in names:
878            with self.subTest(name=name):
879                self.assertTrue(hasattr(ctypes.pythonapi, name))
880
881    def test_clear_managed_dict(self):
882
883        class C:
884            def __init__(self):
885                self.a = 1
886
887        c = C()
888        _testcapi.clear_managed_dict(c)
889        self.assertEqual(c.__dict__, {})
890        c = C()
891        self.assertEqual(c.__dict__, {'a':1})
892        _testcapi.clear_managed_dict(c)
893        self.assertEqual(c.__dict__, {})
894
895    def test_eval_get_func_name(self):
896        def function_example(): ...
897
898        class A:
899            def method_example(self): ...
900
901        self.assertEqual(_testcapi.eval_get_func_name(function_example),
902                         "function_example")
903        self.assertEqual(_testcapi.eval_get_func_name(A.method_example),
904                         "method_example")
905        self.assertEqual(_testcapi.eval_get_func_name(A().method_example),
906                         "method_example")
907        self.assertEqual(_testcapi.eval_get_func_name(sum), "sum")  # c function
908        self.assertEqual(_testcapi.eval_get_func_name(A), "type")
909
910    def test_eval_get_func_desc(self):
911        def function_example(): ...
912
913        class A:
914            def method_example(self): ...
915
916        self.assertEqual(_testcapi.eval_get_func_desc(function_example),
917                         "()")
918        self.assertEqual(_testcapi.eval_get_func_desc(A.method_example),
919                         "()")
920        self.assertEqual(_testcapi.eval_get_func_desc(A().method_example),
921                         "()")
922        self.assertEqual(_testcapi.eval_get_func_desc(sum), "()")  # c function
923        self.assertEqual(_testcapi.eval_get_func_desc(A), " object")
924
925    def test_function_get_code(self):
926        import types
927
928        def some():
929            pass
930
931        code = _testcapi.function_get_code(some)
932        self.assertIsInstance(code, types.CodeType)
933        self.assertEqual(code, some.__code__)
934
935        with self.assertRaises(SystemError):
936            _testcapi.function_get_code(None)  # not a function
937
938    def test_function_get_globals(self):
939        def some():
940            pass
941
942        globals_ = _testcapi.function_get_globals(some)
943        self.assertIsInstance(globals_, dict)
944        self.assertEqual(globals_, some.__globals__)
945
946        with self.assertRaises(SystemError):
947            _testcapi.function_get_globals(None)  # not a function
948
949    def test_function_get_module(self):
950        def some():
951            pass
952
953        module = _testcapi.function_get_module(some)
954        self.assertIsInstance(module, str)
955        self.assertEqual(module, some.__module__)
956
957        with self.assertRaises(SystemError):
958            _testcapi.function_get_module(None)  # not a function
959
960    def test_function_get_defaults(self):
961        def some(
962            pos_only1, pos_only2='p',
963            /,
964            zero=0, optional=None,
965            *,
966            kw1,
967            kw2=True,
968        ):
969            pass
970
971        defaults = _testcapi.function_get_defaults(some)
972        self.assertEqual(defaults, ('p', 0, None))
973        self.assertEqual(defaults, some.__defaults__)
974
975        with self.assertRaises(SystemError):
976            _testcapi.function_get_defaults(None)  # not a function
977
978    def test_function_set_defaults(self):
979        def some(
980            pos_only1, pos_only2='p',
981            /,
982            zero=0, optional=None,
983            *,
984            kw1,
985            kw2=True,
986        ):
987            pass
988
989        old_defaults = ('p', 0, None)
990        self.assertEqual(_testcapi.function_get_defaults(some), old_defaults)
991        self.assertEqual(some.__defaults__, old_defaults)
992
993        with self.assertRaises(SystemError):
994            _testcapi.function_set_defaults(some, 1)  # not tuple or None
995        self.assertEqual(_testcapi.function_get_defaults(some), old_defaults)
996        self.assertEqual(some.__defaults__, old_defaults)
997
998        with self.assertRaises(SystemError):
999            _testcapi.function_set_defaults(1, ())    # not a function
1000        self.assertEqual(_testcapi.function_get_defaults(some), old_defaults)
1001        self.assertEqual(some.__defaults__, old_defaults)
1002
1003        new_defaults = ('q', 1, None)
1004        _testcapi.function_set_defaults(some, new_defaults)
1005        self.assertEqual(_testcapi.function_get_defaults(some), new_defaults)
1006        self.assertEqual(some.__defaults__, new_defaults)
1007
1008        # Empty tuple is fine:
1009        new_defaults = ()
1010        _testcapi.function_set_defaults(some, new_defaults)
1011        self.assertEqual(_testcapi.function_get_defaults(some), new_defaults)
1012        self.assertEqual(some.__defaults__, new_defaults)
1013
1014        class tuplesub(tuple): ...  # tuple subclasses must work
1015
1016        new_defaults = tuplesub(((1, 2), ['a', 'b'], None))
1017        _testcapi.function_set_defaults(some, new_defaults)
1018        self.assertEqual(_testcapi.function_get_defaults(some), new_defaults)
1019        self.assertEqual(some.__defaults__, new_defaults)
1020
1021        # `None` is special, it sets `defaults` to `NULL`,
1022        # it needs special handling in `_testcapi`:
1023        _testcapi.function_set_defaults(some, None)
1024        self.assertEqual(_testcapi.function_get_defaults(some), None)
1025        self.assertEqual(some.__defaults__, None)
1026
1027    def test_function_get_kw_defaults(self):
1028        def some(
1029            pos_only1, pos_only2='p',
1030            /,
1031            zero=0, optional=None,
1032            *,
1033            kw1,
1034            kw2=True,
1035        ):
1036            pass
1037
1038        defaults = _testcapi.function_get_kw_defaults(some)
1039        self.assertEqual(defaults, {'kw2': True})
1040        self.assertEqual(defaults, some.__kwdefaults__)
1041
1042        with self.assertRaises(SystemError):
1043            _testcapi.function_get_kw_defaults(None)  # not a function
1044
1045    def test_function_set_kw_defaults(self):
1046        def some(
1047            pos_only1, pos_only2='p',
1048            /,
1049            zero=0, optional=None,
1050            *,
1051            kw1,
1052            kw2=True,
1053        ):
1054            pass
1055
1056        old_defaults = {'kw2': True}
1057        self.assertEqual(_testcapi.function_get_kw_defaults(some), old_defaults)
1058        self.assertEqual(some.__kwdefaults__, old_defaults)
1059
1060        with self.assertRaises(SystemError):
1061            _testcapi.function_set_kw_defaults(some, 1)  # not dict or None
1062        self.assertEqual(_testcapi.function_get_kw_defaults(some), old_defaults)
1063        self.assertEqual(some.__kwdefaults__, old_defaults)
1064
1065        with self.assertRaises(SystemError):
1066            _testcapi.function_set_kw_defaults(1, {})    # not a function
1067        self.assertEqual(_testcapi.function_get_kw_defaults(some), old_defaults)
1068        self.assertEqual(some.__kwdefaults__, old_defaults)
1069
1070        new_defaults = {'kw2': (1, 2, 3)}
1071        _testcapi.function_set_kw_defaults(some, new_defaults)
1072        self.assertEqual(_testcapi.function_get_kw_defaults(some), new_defaults)
1073        self.assertEqual(some.__kwdefaults__, new_defaults)
1074
1075        # Empty dict is fine:
1076        new_defaults = {}
1077        _testcapi.function_set_kw_defaults(some, new_defaults)
1078        self.assertEqual(_testcapi.function_get_kw_defaults(some), new_defaults)
1079        self.assertEqual(some.__kwdefaults__, new_defaults)
1080
1081        class dictsub(dict): ...  # dict subclasses must work
1082
1083        new_defaults = dictsub({'kw2': None})
1084        _testcapi.function_set_kw_defaults(some, new_defaults)
1085        self.assertEqual(_testcapi.function_get_kw_defaults(some), new_defaults)
1086        self.assertEqual(some.__kwdefaults__, new_defaults)
1087
1088        # `None` is special, it sets `kwdefaults` to `NULL`,
1089        # it needs special handling in `_testcapi`:
1090        _testcapi.function_set_kw_defaults(some, None)
1091        self.assertEqual(_testcapi.function_get_kw_defaults(some), None)
1092        self.assertEqual(some.__kwdefaults__, None)
1093
1094    def test_unstable_gc_new_with_extra_data(self):
1095        class Data(_testcapi.ObjExtraData):
1096            __slots__ = ('x', 'y')
1097
1098        d = Data()
1099        d.x = 10
1100        d.y = 20
1101        d.extra = 30
1102        self.assertEqual(d.x, 10)
1103        self.assertEqual(d.y, 20)
1104        self.assertEqual(d.extra, 30)
1105        del d.extra
1106        self.assertIsNone(d.extra)
1107
1108    def test_get_type_name(self):
1109        class MyType:
1110            pass
1111
1112        from _testcapi import (
1113            get_type_name, get_type_qualname,
1114            get_type_fullyqualname, get_type_module_name)
1115
1116        from collections import OrderedDict
1117        ht = _testcapi.get_heaptype_for_name()
1118        for cls, fullname, modname, qualname, name in (
1119            (int,
1120             'int',
1121             'builtins',
1122             'int',
1123             'int'),
1124            (OrderedDict,
1125             'collections.OrderedDict',
1126             'collections',
1127             'OrderedDict',
1128             'OrderedDict'),
1129            (ht,
1130             '_testcapi.HeapTypeNameType',
1131             '_testcapi',
1132             'HeapTypeNameType',
1133             'HeapTypeNameType'),
1134            (MyType,
1135             f'{__name__}.CAPITest.test_get_type_name.<locals>.MyType',
1136             __name__,
1137             'CAPITest.test_get_type_name.<locals>.MyType',
1138             'MyType'),
1139        ):
1140            with self.subTest(cls=repr(cls)):
1141                self.assertEqual(get_type_fullyqualname(cls), fullname)
1142                self.assertEqual(get_type_module_name(cls), modname)
1143                self.assertEqual(get_type_qualname(cls), qualname)
1144                self.assertEqual(get_type_name(cls), name)
1145
1146        # override __module__
1147        ht.__module__ = 'test_module'
1148        self.assertEqual(get_type_fullyqualname(ht), 'test_module.HeapTypeNameType')
1149        self.assertEqual(get_type_module_name(ht), 'test_module')
1150        self.assertEqual(get_type_qualname(ht), 'HeapTypeNameType')
1151        self.assertEqual(get_type_name(ht), 'HeapTypeNameType')
1152
1153        # override __name__ and __qualname__
1154        MyType.__name__ = 'my_name'
1155        MyType.__qualname__ = 'my_qualname'
1156        self.assertEqual(get_type_fullyqualname(MyType), f'{__name__}.my_qualname')
1157        self.assertEqual(get_type_module_name(MyType), __name__)
1158        self.assertEqual(get_type_qualname(MyType), 'my_qualname')
1159        self.assertEqual(get_type_name(MyType), 'my_name')
1160
1161        # override also __module__
1162        MyType.__module__ = 'my_module'
1163        self.assertEqual(get_type_fullyqualname(MyType), 'my_module.my_qualname')
1164        self.assertEqual(get_type_module_name(MyType), 'my_module')
1165        self.assertEqual(get_type_qualname(MyType), 'my_qualname')
1166        self.assertEqual(get_type_name(MyType), 'my_name')
1167
1168        # PyType_GetFullyQualifiedName() ignores the module if it's "builtins"
1169        # or "__main__" of it is not a string
1170        MyType.__module__ = 'builtins'
1171        self.assertEqual(get_type_fullyqualname(MyType), 'my_qualname')
1172        MyType.__module__ = '__main__'
1173        self.assertEqual(get_type_fullyqualname(MyType), 'my_qualname')
1174        MyType.__module__ = 123
1175        self.assertEqual(get_type_fullyqualname(MyType), 'my_qualname')
1176
1177
1178    def test_gen_get_code(self):
1179        def genf(): yield
1180        gen = genf()
1181        self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)
1182
    def test_pyeval_getlocals(self):
        # Test PyEval_GetLocals()
        # NOTE: the expected dicts below list every local of this method,
        # so adding or renaming a local variable requires updating them.
        x = 1
        self.assertEqual(_testcapi.pyeval_getlocals(),
            {'self': self,
             'x': 1})

        # A local bound after the first call must show up in the next one.
        y = 2
        self.assertEqual(_testcapi.pyeval_getlocals(),
            {'self': self,
             'x': 1,
             'y': 2})
1195
1196
1197@requires_limited_api
1198class TestHeapTypeRelative(unittest.TestCase):
1199    """Test API for extending opaque types (PEP 697)"""
1200
1201    @requires_limited_api
1202    def test_heaptype_relative_sizes(self):
1203        # Test subclassing using "relative" basicsize, see PEP 697
1204        def check(extra_base_size, extra_size):
1205            Base, Sub, instance, data_ptr, data_offset, data_size = (
1206                _testlimitedcapi.make_sized_heaptypes(
1207                    extra_base_size, -extra_size))
1208
1209            # no alignment shenanigans when inheriting directly
1210            if extra_size == 0:
1211                self.assertEqual(Base.__basicsize__, Sub.__basicsize__)
1212                self.assertEqual(data_size, 0)
1213
1214            else:
1215                # The following offsets should be in increasing order:
1216                offsets = [
1217                    (0, 'start of object'),
1218                    (Base.__basicsize__, 'end of base data'),
1219                    (data_offset, 'subclass data'),
1220                    (data_offset + extra_size, 'end of requested subcls data'),
1221                    (data_offset + data_size, 'end of reserved subcls data'),
1222                    (Sub.__basicsize__, 'end of object'),
1223                ]
1224                ordered_offsets = sorted(offsets, key=operator.itemgetter(0))
1225                self.assertEqual(
1226                    offsets, ordered_offsets,
1227                    msg=f'Offsets not in expected order, got: {ordered_offsets}')
1228
1229                # end of reserved subcls data == end of object
1230                self.assertEqual(Sub.__basicsize__, data_offset + data_size)
1231
1232                # we don't reserve (requested + alignment) or more data
1233                self.assertLess(data_size - extra_size,
1234                                _testlimitedcapi.ALIGNOF_MAX_ALIGN_T)
1235
1236            # The offsets/sizes we calculated should be aligned.
1237            self.assertEqual(data_offset % _testlimitedcapi.ALIGNOF_MAX_ALIGN_T, 0)
1238            self.assertEqual(data_size % _testlimitedcapi.ALIGNOF_MAX_ALIGN_T, 0)
1239
1240        sizes = sorted({0, 1, 2, 3, 4, 7, 8, 123,
1241                        object.__basicsize__,
1242                        object.__basicsize__-1,
1243                        object.__basicsize__+1})
1244        for extra_base_size in sizes:
1245            for extra_size in sizes:
1246                args = dict(extra_base_size=extra_base_size,
1247                            extra_size=extra_size)
1248                with self.subTest(**args):
1249                    check(**args)
1250
1251    def test_HeapCCollection(self):
1252        """Make sure HeapCCollection works properly by itself"""
1253        collection = _testcapi.HeapCCollection(1, 2, 3)
1254        self.assertEqual(list(collection), [1, 2, 3])
1255
1256    def test_heaptype_inherit_itemsize(self):
1257        """Test HeapCCollection subclasses work properly"""
1258        sizes = sorted({0, 1, 2, 3, 4, 7, 8, 123,
1259                        object.__basicsize__,
1260                        object.__basicsize__-1,
1261                        object.__basicsize__+1})
1262        for extra_size in sizes:
1263            with self.subTest(extra_size=extra_size):
1264                Sub = _testlimitedcapi.subclass_var_heaptype(
1265                    _testcapi.HeapCCollection, -extra_size, 0, 0)
1266                collection = Sub(1, 2, 3)
1267                collection.set_data_to_3s()
1268
1269                self.assertEqual(list(collection), [1, 2, 3])
1270                mem = collection.get_data()
1271                self.assertGreaterEqual(len(mem), extra_size)
1272                self.assertTrue(set(mem) <= {3}, f'got {mem!r}')
1273
1274    def test_heaptype_invalid_inheritance(self):
1275        with self.assertRaises(SystemError,
1276                               msg="Cannot extend variable-size class without "
1277                               + "Py_TPFLAGS_ITEMS_AT_END"):
1278            _testlimitedcapi.subclass_heaptype(int, -8, 0)
1279
1280    def test_heaptype_relative_members(self):
1281        """Test HeapCCollection subclasses work properly"""
1282        sizes = sorted({0, 1, 2, 3, 4, 7, 8, 123,
1283                        object.__basicsize__,
1284                        object.__basicsize__-1,
1285                        object.__basicsize__+1})
1286        for extra_base_size in sizes:
1287            for extra_size in sizes:
1288                for offset in sizes:
1289                    with self.subTest(extra_base_size=extra_base_size, extra_size=extra_size, offset=offset):
1290                        if offset < extra_size:
1291                            Sub = _testlimitedcapi.make_heaptype_with_member(
1292                                extra_base_size, -extra_size, offset, True)
1293                            Base = Sub.mro()[1]
1294                            instance = Sub()
1295                            self.assertEqual(instance.memb, instance.get_memb())
1296                            instance.set_memb(13)
1297                            self.assertEqual(instance.memb, instance.get_memb())
1298                            self.assertEqual(instance.get_memb(), 13)
1299                            instance.memb = 14
1300                            self.assertEqual(instance.memb, instance.get_memb())
1301                            self.assertEqual(instance.get_memb(), 14)
1302                            self.assertGreaterEqual(instance.get_memb_offset(), Base.__basicsize__)
1303                            self.assertLess(instance.get_memb_offset(), Sub.__basicsize__)
1304                            with self.assertRaises(SystemError):
1305                                instance.get_memb_relative()
1306                            with self.assertRaises(SystemError):
1307                                instance.set_memb_relative(0)
1308                        else:
1309                            with self.assertRaises(SystemError):
1310                                Sub = _testlimitedcapi.make_heaptype_with_member(
1311                                    extra_base_size, -extra_size, offset, True)
1312                        with self.assertRaises(SystemError):
1313                            Sub = _testlimitedcapi.make_heaptype_with_member(
1314                                extra_base_size, extra_size, offset, True)
1315                with self.subTest(extra_base_size=extra_base_size, extra_size=extra_size):
1316                    with self.assertRaises(SystemError):
1317                        Sub = _testlimitedcapi.make_heaptype_with_member(
1318                            extra_base_size, -extra_size, -1, True)
1319
1320    def test_heaptype_relative_members_errors(self):
1321        with self.assertRaisesRegex(
1322                SystemError,
1323                r"With Py_RELATIVE_OFFSET, basicsize must be negative"):
1324            _testlimitedcapi.make_heaptype_with_member(0, 1234, 0, True)
1325        with self.assertRaisesRegex(
1326                SystemError, r"Member offset out of range \(0\.\.-basicsize\)"):
1327            _testlimitedcapi.make_heaptype_with_member(0, -8, 1234, True)
1328        with self.assertRaisesRegex(
1329                SystemError, r"Member offset out of range \(0\.\.-basicsize\)"):
1330            _testlimitedcapi.make_heaptype_with_member(0, -8, -1, True)
1331
1332        Sub = _testlimitedcapi.make_heaptype_with_member(0, -8, 0, True)
1333        instance = Sub()
1334        with self.assertRaisesRegex(
1335                SystemError, r"PyMember_GetOne used with Py_RELATIVE_OFFSET"):
1336            instance.get_memb_relative()
1337        with self.assertRaisesRegex(
1338                SystemError, r"PyMember_SetOne used with Py_RELATIVE_OFFSET"):
1339            instance.set_memb_relative(0)
1340
1341    def test_pyobject_getitemdata_error(self):
1342        """Test PyObject_GetItemData fails on unsupported types"""
1343        with self.assertRaises(TypeError):
1344            # None is not variable-length
1345            _testcapi.pyobject_getitemdata(None)
1346        with self.assertRaises(TypeError):
1347            # int is variable-length, but doesn't have the
1348            # Py_TPFLAGS_ITEMS_AT_END layout (and flag)
1349            _testcapi.pyobject_getitemdata(0)
1350
1351
1352    def test_function_get_closure(self):
1353        from types import CellType
1354
1355        def regular_function(): ...
1356        def unused_one_level(arg1):
1357            def inner(arg2, arg3): ...
1358            return inner
1359        def unused_two_levels(arg1, arg2):
1360            def decorator(arg3, arg4):
1361                def inner(arg5, arg6): ...
1362                return inner
1363            return decorator
1364        def with_one_level(arg1):
1365            def inner(arg2, arg3):
1366                return arg1 + arg2 + arg3
1367            return inner
1368        def with_two_levels(arg1, arg2):
1369            def decorator(arg3, arg4):
1370                def inner(arg5, arg6):
1371                    return arg1 + arg2 + arg3 + arg4 + arg5 + arg6
1372                return inner
1373            return decorator
1374
1375        # Functions without closures:
1376        self.assertIsNone(_testcapi.function_get_closure(regular_function))
1377        self.assertIsNone(regular_function.__closure__)
1378
1379        func = unused_one_level(1)
1380        closure = _testcapi.function_get_closure(func)
1381        self.assertIsNone(closure)
1382        self.assertIsNone(func.__closure__)
1383
1384        func = unused_two_levels(1, 2)(3, 4)
1385        closure = _testcapi.function_get_closure(func)
1386        self.assertIsNone(closure)
1387        self.assertIsNone(func.__closure__)
1388
1389        # Functions with closures:
1390        func = with_one_level(5)
1391        closure = _testcapi.function_get_closure(func)
1392        self.assertEqual(closure, func.__closure__)
1393        self.assertIsInstance(closure, tuple)
1394        self.assertEqual(len(closure), 1)
1395        self.assertEqual(len(closure), len(func.__code__.co_freevars))
1396        self.assertTrue(all(isinstance(cell, CellType) for cell in closure))
1397        self.assertTrue(closure[0].cell_contents, 5)
1398
1399        func = with_two_levels(1, 2)(3, 4)
1400        closure = _testcapi.function_get_closure(func)
1401        self.assertEqual(closure, func.__closure__)
1402        self.assertIsInstance(closure, tuple)
1403        self.assertEqual(len(closure), 4)
1404        self.assertEqual(len(closure), len(func.__code__.co_freevars))
1405        self.assertTrue(all(isinstance(cell, CellType) for cell in closure))
1406        self.assertEqual([cell.cell_contents for cell in closure],
1407                         [1, 2, 3, 4])
1408
1409    def test_function_get_closure_error(self):
1410        with self.assertRaises(SystemError):
1411            _testcapi.function_get_closure(1)
1412        with self.assertRaises(SystemError):
1413            _testcapi.function_get_closure(None)
1414
1415    def test_function_set_closure(self):
1416        from types import CellType
1417
1418        def function_without_closure(): ...
1419        def function_with_closure(arg):
1420            def inner():
1421                return arg
1422            return inner
1423
1424        func = function_without_closure
1425        _testcapi.function_set_closure(func, (CellType(1), CellType(1)))
1426        closure = _testcapi.function_get_closure(func)
1427        self.assertEqual([c.cell_contents for c in closure], [1, 1])
1428        self.assertEqual([c.cell_contents for c in func.__closure__], [1, 1])
1429
1430        func = function_with_closure(1)
1431        _testcapi.function_set_closure(func,
1432                                       (CellType(1), CellType(2), CellType(3)))
1433        closure = _testcapi.function_get_closure(func)
1434        self.assertEqual([c.cell_contents for c in closure], [1, 2, 3])
1435        self.assertEqual([c.cell_contents for c in func.__closure__], [1, 2, 3])
1436
1437    def test_function_set_closure_none(self):
1438        def function_without_closure(): ...
1439        def function_with_closure(arg):
1440            def inner():
1441                return arg
1442            return inner
1443
1444        _testcapi.function_set_closure(function_without_closure, None)
1445        self.assertIsNone(
1446            _testcapi.function_get_closure(function_without_closure))
1447        self.assertIsNone(function_without_closure.__closure__)
1448
1449        _testcapi.function_set_closure(function_with_closure, None)
1450        self.assertIsNone(
1451            _testcapi.function_get_closure(function_with_closure))
1452        self.assertIsNone(function_with_closure.__closure__)
1453
1454    def test_function_set_closure_errors(self):
1455        def function_without_closure(): ...
1456
1457        with self.assertRaises(SystemError):
1458            _testcapi.function_set_closure(None, ())  # not a function
1459
1460        with self.assertRaises(SystemError):
1461            _testcapi.function_set_closure(function_without_closure, 1)
1462        self.assertIsNone(function_without_closure.__closure__)  # no change
1463
1464        # NOTE: this works, but goes against the docs:
1465        _testcapi.function_set_closure(function_without_closure, (1, 2))
1466        self.assertEqual(
1467            _testcapi.function_get_closure(function_without_closure), (1, 2))
1468        self.assertEqual(function_without_closure.__closure__, (1, 2))
1469
1470
1471class TestPendingCalls(unittest.TestCase):
1472
1473    # See the comment in ceval.c (at the "handle_eval_breaker" label)
1474    # about when pending calls get run.  This is especially relevant
1475    # here for creating deterministic tests.
1476
1477    def main_pendingcalls_submit(self, l, n):
1478        def callback():
1479            #this function can be interrupted by thread switching so let's
1480            #use an atomic operation
1481            l.append(None)
1482
1483        for i in range(n):
1484            time.sleep(random.random()*0.02) #0.01 secs on average
1485            #try submitting callback until successful.
1486            #rely on regular interrupt to flush queue if we are
1487            #unsuccessful.
1488            while True:
1489                if _testcapi._pending_threadfunc(callback):
1490                    break
1491
1492    def pendingcalls_submit(self, l, n, *, main=True, ensure=False):
1493        def callback():
1494            #this function can be interrupted by thread switching so let's
1495            #use an atomic operation
1496            l.append(None)
1497
1498        if main:
1499            return _testcapi._pending_threadfunc(callback, n,
1500                                                 blocking=False,
1501                                                 ensure_added=ensure)
1502        else:
1503            return _testinternalcapi.pending_threadfunc(callback, n,
1504                                                        blocking=False,
1505                                                        ensure_added=ensure)
1506
1507    def pendingcalls_wait(self, l, numadded, context = None):
1508        #now, stick around until l[0] has grown to 10
1509        count = 0
1510        while len(l) != numadded:
1511            #this busy loop is where we expect to be interrupted to
1512            #run our callbacks.  Note that some callbacks are only run on the
1513            #main thread
1514            if False and support.verbose:
1515                print("(%i)"%(len(l),),)
1516            for i in range(1000):
1517                a = i*i
1518            if context and not context.event.is_set():
1519                continue
1520            count += 1
1521            self.assertTrue(count < 10000,
1522                "timeout waiting for %i callbacks, got %i"%(numadded, len(l)))
1523        if False and support.verbose:
1524            print("(%i)"%(len(l),))
1525
1526    @threading_helper.requires_working_threading()
1527    def test_main_pendingcalls_threaded(self):
1528
1529        #do every callback on a separate thread
1530        n = 32 #total callbacks
1531        threads = []
1532        class foo(object):pass
1533        context = foo()
1534        context.l = []
1535        context.n = 2 #submits per thread
1536        context.nThreads = n // context.n
1537        context.nFinished = 0
1538        context.lock = threading.Lock()
1539        context.event = threading.Event()
1540
1541        threads = [threading.Thread(target=self.main_pendingcalls_thread,
1542                                    args=(context,))
1543                   for i in range(context.nThreads)]
1544        with threading_helper.start_threads(threads):
1545            self.pendingcalls_wait(context.l, n, context)
1546
1547    def main_pendingcalls_thread(self, context):
1548        try:
1549            self.main_pendingcalls_submit(context.l, context.n)
1550        finally:
1551            with context.lock:
1552                context.nFinished += 1
1553                nFinished = context.nFinished
1554                if False and support.verbose:
1555                    print("finished threads: ", nFinished)
1556            if nFinished == context.nThreads:
1557                context.event.set()
1558
1559    def test_main_pendingcalls_non_threaded(self):
1560        #again, just using the main thread, likely they will all be dispatched at
1561        #once.  It is ok to ask for too many, because we loop until we find a slot.
1562        #the loop can be interrupted to dispatch.
1563        #there are only 32 dispatch slots, so we go for twice that!
1564        l = []
1565        n = 64
1566        self.main_pendingcalls_submit(l, n)
1567        self.pendingcalls_wait(l, n)
1568
1569    def test_max_pending(self):
1570        with self.subTest('main-only'):
1571            maxpending = 32
1572
1573            l = []
1574            added = self.pendingcalls_submit(l, 1, main=True)
1575            self.pendingcalls_wait(l, added)
1576            self.assertEqual(added, 1)
1577
1578            l = []
1579            added = self.pendingcalls_submit(l, maxpending, main=True)
1580            self.pendingcalls_wait(l, added)
1581            self.assertEqual(added, maxpending)
1582
1583            l = []
1584            added = self.pendingcalls_submit(l, maxpending+1, main=True)
1585            self.pendingcalls_wait(l, added)
1586            self.assertEqual(added, maxpending)
1587
1588        with self.subTest('not main-only'):
1589            # Per-interpreter pending calls has a much higher limit
1590            # on how many may be pending at a time.
1591            maxpending = 300
1592
1593            l = []
1594            added = self.pendingcalls_submit(l, 1, main=False)
1595            self.pendingcalls_wait(l, added)
1596            self.assertEqual(added, 1)
1597
1598            l = []
1599            added = self.pendingcalls_submit(l, maxpending, main=False)
1600            self.pendingcalls_wait(l, added)
1601            self.assertEqual(added, maxpending)
1602
1603            l = []
1604            added = self.pendingcalls_submit(l, maxpending+1, main=False)
1605            self.pendingcalls_wait(l, added)
1606            self.assertEqual(added, maxpending)
1607
1608    class PendingTask(types.SimpleNamespace):
1609
1610        _add_pending = _testinternalcapi.pending_threadfunc
1611
1612        def __init__(self, req, taskid=None, notify_done=None):
1613            self.id = taskid
1614            self.req = req
1615            self.notify_done = notify_done
1616
1617            self.creator_tid = threading.get_ident()
1618            self.requester_tid = None
1619            self.runner_tid = None
1620            self.result = None
1621
1622        def run(self):
1623            assert self.result is None
1624            self.runner_tid = threading.get_ident()
1625            self._run()
1626            if self.notify_done is not None:
1627                self.notify_done()
1628
1629        def _run(self):
1630            self.result = self.req
1631
1632        def run_in_pending_call(self, worker_tids):
1633            assert self._add_pending is _testinternalcapi.pending_threadfunc
1634            self.requester_tid = threading.get_ident()
1635            def callback():
1636                assert self.result is None
1637                # It can be tricky to control which thread handles
1638                # the eval breaker, so we take a naive approach to
1639                # make sure.
1640                if threading.get_ident() not in worker_tids:
1641                    self._add_pending(callback, ensure_added=True)
1642                    return
1643                self.run()
1644            self._add_pending(callback, ensure_added=True)
1645
1646        def create_thread(self, worker_tids):
1647            return threading.Thread(
1648                target=self.run_in_pending_call,
1649                args=(worker_tids,),
1650            )
1651
1652        def wait_for_result(self):
1653            while self.result is None:
1654                time.sleep(0.01)
1655
1656    @threading_helper.requires_working_threading()
1657    def test_subthreads_can_handle_pending_calls(self):
1658        payload = 'Spam spam spam spam. Lovely spam! Wonderful spam!'
1659
1660        task = self.PendingTask(payload)
1661        def do_the_work():
1662            tid = threading.get_ident()
1663            t = task.create_thread({tid})
1664            with threading_helper.start_threads([t]):
1665                task.wait_for_result()
1666        t = threading.Thread(target=do_the_work)
1667        with threading_helper.start_threads([t]):
1668            pass
1669
1670        self.assertEqual(task.result, payload)
1671
    @threading_helper.requires_working_threading()
    def test_many_subthreads_can_handle_pending_calls(self):
        # 20 tasks are queued.  Three "adder" threads turn each task into a
        # pending call, and three "worker" threads idle in a sleep loop so
        # that the pending calls can run on them.  Afterwards we check that
        # no task ran on the main thread or on a thread that requested it.
        main_tid = threading.get_ident()
        self.assertEqual(threading.main_thread().ident, main_tid)

        # We can't use queue.Queue since it isn't reentrant relative
        # to pending calls.
        _queue = deque()   # tasks that still need a pending call
        _active = deque()  # one entry per task that has not finished yet
        _done_lock = threading.Lock()  # held until every task has finished
        def queue_put(task):
            _queue.append(task)
            _active.append(True)
        def queue_get():
            # Mimic queue.Queue.get_nowait(): raise queue.Empty when drained.
            try:
                task = _queue.popleft()
            except IndexError:
                raise queue.Empty
            return task
        def queue_task_done():
            # Passed to each task as its notify_done callback.
            _active.pop()
            if not _active:
                # That was the last outstanding task: unblock queue_join().
                try:
                    _done_lock.release()
                except RuntimeError:
                    assert not _done_lock.locked()
        def queue_empty():
            return not _queue
        def queue_join():
            # Blocks until queue_task_done() releases the lock.
            _done_lock.acquire()
            _done_lock.release()

        tasks = []
        for i in range(20):
            task = self.PendingTask(
                req=f'request {i}',
                taskid=i,
                notify_done=queue_task_done,
            )
            tasks.append(task)
            queue_put(task)
        # This will be released once all the tasks have finished.
        _done_lock.acquire()

        # Body of the "adder" threads: drain the queue, requesting a
        # pending call for every task taken from it.
        def add_tasks(worker_tids):
            while True:
                if done:
                    return
                try:
                    task = queue_get()
                except queue.Empty:
                    break
                task.run_in_pending_call(worker_tids)

        # Flag read by the closures above and below; set once the test is
        # finished (or has failed) so that the threads can exit.
        done = False
        # Body of the "worker" threads: just sleep in a loop until told
        # to stop.
        def run_tasks():
            while not queue_empty():
                if done:
                    return
                time.sleep(0.01)
            # Give the worker a chance to handle any remaining pending calls.
            while not done:
                time.sleep(0.01)

        # Start the workers and wait for them to finish.
        worker_threads = [threading.Thread(target=run_tasks)
                          for _ in range(3)]
        with threading_helper.start_threads(worker_threads):
            try:
                # Add a pending call for each task.
                worker_tids = [t.ident for t in worker_threads]
                threads = [threading.Thread(target=add_tasks, args=(worker_tids,))
                           for _ in range(3)]
                with threading_helper.start_threads(threads):
                    # NOTE(review): the guarded body is only ``pass``, so
                    # this handler looks unreachable as written.
                    try:
                        pass
                    except BaseException:
                        done = True
                        raise  # re-raise
                # Wait for the pending calls to finish.
                queue_join()
                # Notify the workers that they can stop.
                done = True
            except BaseException:
                # Make sure the workers stop even when something failed.
                done = True
                raise  # re-raise
        runner_tids = [t.runner_tid for t in tasks]

        self.assertNotIn(main_tid, runner_tids)
        for task in tasks:
            with self.subTest(f'task {task.id}'):
                self.assertNotEqual(task.requester_tid, main_tid)
                self.assertNotEqual(task.requester_tid, task.runner_tid)
                self.assertNotIn(task.requester_tid, runner_tids)
1766
    @requires_subinterpreters
    def test_isolated_subinterpreter(self):
        # We exercise the most important permutations.

        # This test relies on pending calls getting called
        # (eval breaker tripped) at each loop iteration
        # and at each call.

        # In every scenario one side calls
        # _testinternalcapi.pending_identify() with the ID of the *other*
        # interpreter, and the value it hands back is expected to equal
        # that ID.  Pipes are used to synchronize the two sides:
        #   ready: the target side is about to start looping
        #   done:  the pending call has finished, the target may stop
        #   data:  carries the result back (when added in the subinterpreter)

        maxtext = 250  # NOTE(review): not referenced anywhere in this method
        main_interpid = 0
        interpid = _interpreters.create()
        self.addCleanup(lambda: _interpreters.destroy(interpid))
        # Import up front everything the scripts below rely on.
        _interpreters.run_string(interpid, f"""if True:
            import json
            import os
            import threading
            import time
            import _testinternalcapi
            from test.support import threading_helper
            """)

        def create_pipe():
            # Both ends are closed by cleanups registered on the test.
            r, w = os.pipe()
            self.addCleanup(lambda: os.close(r))
            self.addCleanup(lambda: os.close(w))
            return r, w

        with self.subTest('add in main, run in subinterpreter'):
            r_ready, w_ready = create_pipe()
            r_done, w_done= create_pipe()
            timeout = time.time() + 30  # seconds

            # The script's own thread loops in the subinterpreter while a
            # helper thread started by the script blocks on the "done" pipe.
            def do_work():
                _interpreters.run_string(interpid, f"""if True:
                    # Wait until this interp has handled the pending call.
                    waiting = False
                    done = False
                    def wait(os_read=os.read):
                        global done, waiting
                        waiting = True
                        os_read({r_done}, 1)
                        done = True
                    t = threading.Thread(target=wait)
                    with threading_helper.start_threads([t]):
                        while not waiting:
                            pass
                        os.write({w_ready}, b'\\0')
                        # Loop to trigger the eval breaker.
                        while not done:
                            time.sleep(0.01)
                            if time.time() > {timeout}:
                                raise Exception('timed out!')
                    """)
            t = threading.Thread(target=do_work)
            with threading_helper.start_threads([t]):
                # Block until the subinterpreter says it is ready.
                os.read(r_ready, 1)
                # Add the pending call and wait for it to finish.
                actual = _testinternalcapi.pending_identify(interpid)
                # Signal the subinterpreter to stop.
                os.write(w_done, b'\0')

            self.assertEqual(actual, int(interpid))

        with self.subTest('add in main, run in subinterpreter sub-thread'):
            r_ready, w_ready = create_pipe()
            r_done, w_done= create_pipe()
            timeout = time.time() + 30  # seconds

            # Roles are swapped relative to the previous scenario: a thread
            # started by the script does the looping, while the script's
            # own thread blocks on the "done" pipe.
            def do_work():
                _interpreters.run_string(interpid, f"""if True:
                    waiting = False
                    done = False
                    def subthread():
                        while not waiting:
                            pass
                        os.write({w_ready}, b'\\0')
                        # Loop to trigger the eval breaker.
                        while not done:
                            time.sleep(0.01)
                            if time.time() > {timeout}:
                                raise Exception('timed out!')
                    t = threading.Thread(target=subthread)
                    with threading_helper.start_threads([t]):
                        # Wait until this interp has handled the pending call.
                        waiting = True
                        os.read({r_done}, 1)
                        done = True
                    """)
            t = threading.Thread(target=do_work)
            with threading_helper.start_threads([t]):
                # Block until the subinterpreter says it is ready.
                os.read(r_ready, 1)
                # Add the pending call and wait for it to finish.
                actual = _testinternalcapi.pending_identify(interpid)
                # Signal the subinterpreter to stop.
                os.write(w_done, b'\0')

            self.assertEqual(actual, int(interpid))

        with self.subTest('add in subinterpreter, run in main'):
            r_ready, w_ready = create_pipe()
            r_done, w_done= create_pipe()
            r_data, w_data= create_pipe()
            timeout = time.time() + 30  # seconds

            # Runs in a thread: once the looping side is ready, the
            # subinterpreter adds the pending call, then reports the
            # result as a single byte through the "data" pipe.
            def add_job():
                os.read(r_ready, 1)
                _interpreters.run_string(interpid, f"""if True:
                    # Add the pending call and wait for it to finish.
                    actual = _testinternalcapi.pending_identify({main_interpid})
                    # Signal the subinterpreter to stop.
                    os.write({w_done}, b'\\0')
                    os.write({w_data}, actual.to_bytes(1, 'little'))
                    """)
            # Wait until this interp has handled the pending call.
            waiting = False
            done = False
            def wait(os_read=os.read):
                nonlocal done, waiting
                waiting = True
                os_read(r_done, 1)
                done = True
            t1 = threading.Thread(target=add_job)
            t2 = threading.Thread(target=wait)
            with threading_helper.start_threads([t1, t2]):
                # The thread running this test does the looping here.
                while not waiting:
                    pass
                os.write(w_ready, b'\0')
                # Loop to trigger the eval breaker.
                while not done:
                    time.sleep(0.01)
                    if time.time() > timeout:
                        raise Exception('timed out!')
                text = os.read(r_data, 1)
            actual = int.from_bytes(text, 'little')

            self.assertEqual(actual, int(main_interpid))

        with self.subTest('add in subinterpreter, run in sub-thread'):
            r_ready, w_ready = create_pipe()
            r_done, w_done= create_pipe()
            r_data, w_data= create_pipe()
            timeout = time.time() + 30  # seconds

            # Same as the previous scenario, except that the looping is
            # done by a dedicated thread (subthread) of this interpreter.
            def add_job():
                os.read(r_ready, 1)
                _interpreters.run_string(interpid, f"""if True:
                    # Add the pending call and wait for it to finish.
                    actual = _testinternalcapi.pending_identify({main_interpid})
                    # Signal the subinterpreter to stop.
                    os.write({w_done}, b'\\0')
                    os.write({w_data}, actual.to_bytes(1, 'little'))
                    """)
            # Wait until this interp has handled the pending call.
            waiting = False
            done = False
            def wait(os_read=os.read):
                nonlocal done, waiting
                waiting = True
                os_read(r_done, 1)
                done = True
            def subthread():
                while not waiting:
                    pass
                os.write(w_ready, b'\0')
                # Loop to trigger the eval breaker.
                while not done:
                    time.sleep(0.01)
                    if time.time() > timeout:
                        raise Exception('timed out!')
            t1 = threading.Thread(target=add_job)
            t2 = threading.Thread(target=wait)
            t3 = threading.Thread(target=subthread)
            # Nothing to do here but let the three threads run; presumably
            # start_threads() joins them on exit -- TODO confirm.
            with threading_helper.start_threads([t1, t2, t3]):
                pass
            text = os.read(r_data, 1)
            actual = int.from_bytes(text, 'little')

            self.assertEqual(actual, int(main_interpid))

        # XXX We can't use the rest until gh-105716 is fixed.
        return

        # Everything below is unreachable because of the ``return`` above;
        # it is kept so that it can be re-enabled later.
        with self.subTest('add in subinterpreter, run in subinterpreter sub-thread'):
            r_ready, w_ready = create_pipe()
            r_done, w_done= create_pipe()
            r_data, w_data= create_pipe()
            timeout = time.time() + 30  # seconds

            def do_work():
                _interpreters.run_string(interpid, f"""if True:
                    waiting = False
                    done = False
                    def subthread():
                        while not waiting:
                            pass
                        os.write({w_ready}, b'\\0')
                        # Loop to trigger the eval breaker.
                        while not done:
                            time.sleep(0.01)
                            if time.time() > {timeout}:
                                raise Exception('timed out!')
                    t = threading.Thread(target=subthread)
                    with threading_helper.start_threads([t]):
                        # Wait until this interp has handled the pending call.
                        waiting = True
                        os.read({r_done}, 1)
                        done = True
                    """)
            t = threading.Thread(target=do_work)
            #with threading_helper.start_threads([t]):
            t.start()
            if True:
                os.read(r_ready, 1)
                _interpreters.run_string(interpid, f"""if True:
                    # Add the pending call and wait for it to finish.
                    actual = _testinternalcapi.pending_identify({interpid})
                    # Signal the subinterpreter to stop.
                    os.write({w_done}, b'\\0')
                    os.write({w_data}, actual.to_bytes(1, 'little'))
                    """)
            t.join()
            text = os.read(r_data, 1)
            actual = int.from_bytes(text, 'little')

            self.assertEqual(actual, int(interpid))
1992
1993
1994class SubinterpreterTest(unittest.TestCase):
1995
1996    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
1997    def test_subinterps(self):
1998        import builtins
1999        r, w = os.pipe()
2000        code = """if 1:
2001            import sys, builtins, pickle
2002            with open({:d}, "wb") as f:
2003                pickle.dump(id(sys.modules), f)
2004                pickle.dump(id(builtins), f)
2005            """.format(w)
2006        with open(r, "rb") as f:
2007            ret = support.run_in_subinterp(code)
2008            self.assertEqual(ret, 0)
2009            self.assertNotEqual(pickle.load(f), id(sys.modules))
2010            self.assertNotEqual(pickle.load(f), id(builtins))
2011
2012    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
2013    def test_subinterps_recent_language_features(self):
2014        r, w = os.pipe()
2015        code = """if 1:
2016            import pickle
2017            with open({:d}, "wb") as f:
2018
2019                @(lambda x:x)  # Py 3.9
2020                def noop(x): return x
2021
2022                a = (b := f'1{{2}}3') + noop('x')  # Py 3.8 (:=) / 3.6 (f'')
2023
2024                async def foo(arg): return await arg  # Py 3.5
2025
2026                pickle.dump(dict(a=a, b=b), f)
2027            """.format(w)
2028
2029        with open(r, "rb") as f:
2030            ret = support.run_in_subinterp(code)
2031            self.assertEqual(ret, 0)
2032            self.assertEqual(pickle.load(f), {'a': '123x', 'b': '123'})
2033
2034    def test_py_config_isoloated_per_interpreter(self):
2035        # A config change in one interpreter must not leak to out to others.
2036        #
2037        # This test could verify ANY config value, it just happens to have been
2038        # written around the time of int_max_str_digits. Refactoring is okay.
2039        code = """if 1:
2040        import sys, _testinternalcapi
2041
2042        # Any config value would do, this happens to be the one being
2043        # double checked at the time this test was written.
2044        config = _testinternalcapi.get_config()
2045        config['int_max_str_digits'] = 55555
2046        config['parse_argv'] = 0
2047        _testinternalcapi.set_config(config)
2048        sub_value = _testinternalcapi.get_config()['int_max_str_digits']
2049        assert sub_value == 55555, sub_value
2050        """
2051        before_config = _testinternalcapi.get_config()
2052        assert before_config['int_max_str_digits'] != 55555
2053        self.assertEqual(support.run_in_subinterp(code), 0,
2054                         'subinterp code failure, check stderr.')
2055        after_config = _testinternalcapi.get_config()
2056        self.assertIsNot(
2057                before_config, after_config,
2058                "Expected get_config() to return a new dict on each call")
2059        self.assertEqual(before_config, after_config,
2060                         "CAUTION: Tests executed after this may be "
2061                         "running under an altered config.")
2062        # try:...finally: calling set_config(before_config) not done
2063        # as that results in sys.argv, sys.path, and sys.warnoptions
2064        # "being modified by test_capi" per test.regrtest.  So if this
2065        # test fails, assume that the environment in this process may
2066        # be altered and suspect.
2067
2068    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
2069    def test_configured_settings(self):
2070        """
2071        The config with which an interpreter is created corresponds
2072        1-to-1 with the new interpreter's settings.  This test verifies
2073        that they match.
2074        """
2075
2076        OBMALLOC = 1<<5
2077        EXTENSIONS = 1<<8
2078        THREADS = 1<<10
2079        DAEMON_THREADS = 1<<11
2080        FORK = 1<<15
2081        EXEC = 1<<16
2082        ALL_FLAGS = (OBMALLOC | FORK | EXEC | THREADS | DAEMON_THREADS
2083                     | EXTENSIONS);
2084
2085        features = [
2086            'obmalloc',
2087            'fork',
2088            'exec',
2089            'threads',
2090            'daemon_threads',
2091            'extensions',
2092            'own_gil',
2093        ]
2094        kwlist = [f'allow_{n}' for n in features]
2095        kwlist[0] = 'use_main_obmalloc'
2096        kwlist[-2] = 'check_multi_interp_extensions'
2097        kwlist[-1] = 'own_gil'
2098
2099        expected_to_work = {
2100            (True, True, True, True, True, True, True):
2101                (ALL_FLAGS, True),
2102            (True, False, False, False, False, False, False):
2103                (OBMALLOC, False),
2104            (False, False, False, True, False, True, False):
2105                (THREADS | EXTENSIONS, False),
2106        }
2107
2108        expected_to_fail = {
2109            (False, False, False, False, False, False, False),
2110        }
2111
2112        # gh-117649: The free-threaded build does not currently allow
2113        # setting check_multi_interp_extensions to False.
2114        if Py_GIL_DISABLED:
2115            for config in list(expected_to_work.keys()):
2116                kwargs = dict(zip(kwlist, config))
2117                if not kwargs['check_multi_interp_extensions']:
2118                    del expected_to_work[config]
2119                    expected_to_fail.add(config)
2120
2121        # expected to work
2122        for config, expected in expected_to_work.items():
2123            kwargs = dict(zip(kwlist, config))
2124            exp_flags, exp_gil = expected
2125            expected = {
2126                'feature_flags': exp_flags,
2127                'own_gil': exp_gil,
2128            }
2129            with self.subTest(config):
2130                r, w = os.pipe()
2131                script = textwrap.dedent(f'''
2132                    import _testinternalcapi, json, os
2133                    settings = _testinternalcapi.get_interp_settings()
2134                    with os.fdopen({w}, "w") as stdin:
2135                        json.dump(settings, stdin)
2136                    ''')
2137                with os.fdopen(r) as stdout:
2138                    ret = support.run_in_subinterp_with_config(script, **kwargs)
2139                    self.assertEqual(ret, 0)
2140                    out = stdout.read()
2141                settings = json.loads(out)
2142
2143                self.assertEqual(settings, expected)
2144
2145        # expected to fail
2146        for config in expected_to_fail:
2147            kwargs = dict(zip(kwlist, config))
2148            with self.subTest(config):
2149                script = textwrap.dedent(f'''
2150                    import _testinternalcapi
2151                    _testinternalcapi.get_interp_settings()
2152                    raise NotImplementedError('unreachable')
2153                    ''')
2154                with self.assertRaises(_interpreters.InterpreterError):
2155                    support.run_in_subinterp_with_config(script, **kwargs)
2156
2157    @unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module")
2158    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
2159    # gh-117649: The free-threaded build does not currently allow overriding
2160    # the check_multi_interp_extensions setting.
2161    @expected_failure_if_gil_disabled()
2162    def test_overridden_setting_extensions_subinterp_check(self):
2163        """
2164        PyInterpreterConfig.check_multi_interp_extensions can be overridden
2165        with PyInterpreterState.override_multi_interp_extensions_check.
2166        This verifies that the override works but does not modify
2167        the underlying setting.
2168        """
2169
2170        OBMALLOC = 1<<5
2171        EXTENSIONS = 1<<8
2172        THREADS = 1<<10
2173        DAEMON_THREADS = 1<<11
2174        FORK = 1<<15
2175        EXEC = 1<<16
2176        BASE_FLAGS = OBMALLOC | FORK | EXEC | THREADS | DAEMON_THREADS
2177        base_kwargs = {
2178            'use_main_obmalloc': True,
2179            'allow_fork': True,
2180            'allow_exec': True,
2181            'allow_threads': True,
2182            'allow_daemon_threads': True,
2183            'own_gil': False,
2184        }
2185
2186        def check(enabled, override):
2187            kwargs = dict(
2188                base_kwargs,
2189                check_multi_interp_extensions=enabled,
2190            )
2191            flags = BASE_FLAGS | EXTENSIONS if enabled else BASE_FLAGS
2192            settings = {
2193                'feature_flags': flags,
2194                'own_gil': False,
2195            }
2196
2197            expected = {
2198                'requested': override,
2199                'override__initial': 0,
2200                'override_after': override,
2201                'override_restored': 0,
2202                # The override should not affect the config or settings.
2203                'settings__initial': settings,
2204                'settings_after': settings,
2205                'settings_restored': settings,
2206                # These are the most likely values to be wrong.
2207                'allowed__initial': not enabled,
2208                'allowed_after': not ((override > 0) if override else enabled),
2209                'allowed_restored': not enabled,
2210            }
2211
2212            r, w = os.pipe()
2213            if Py_GIL_DISABLED:
2214                # gh-117649: The test fails before `w` is closed
2215                self.addCleanup(os.close, w)
2216            script = textwrap.dedent(f'''
2217                from test.test_capi.check_config import run_singlephase_check
2218                run_singlephase_check({override}, {w})
2219                ''')
2220            with os.fdopen(r) as stdout:
2221                ret = support.run_in_subinterp_with_config(script, **kwargs)
2222                self.assertEqual(ret, 0)
2223                out = stdout.read()
2224            results = json.loads(out)
2225
2226            self.assertEqual(results, expected)
2227
2228        self.maxDiff = None
2229
2230        # setting: check disabled
2231        with self.subTest('config: check disabled; override: disabled'):
2232            check(False, -1)
2233        with self.subTest('config: check disabled; override: use config'):
2234            check(False, 0)
2235        with self.subTest('config: check disabled; override: enabled'):
2236            check(False, 1)
2237
2238        # setting: check enabled
2239        with self.subTest('config: check enabled; override: disabled'):
2240            check(True, -1)
2241        with self.subTest('config: check enabled; override: use config'):
2242            check(True, 0)
2243        with self.subTest('config: check enabled; override: enabled'):
2244            check(True, 1)
2245
2246    def test_mutate_exception(self):
2247        """
2248        Exceptions saved in global module state get shared between
2249        individual module instances. This test checks whether or not
2250        a change in one interpreter's module gets reflected into the
2251        other ones.
2252        """
2253        import binascii
2254
2255        support.run_in_subinterp("import binascii; binascii.Error.foobar = 'foobar'")
2256
2257        self.assertFalse(hasattr(binascii.Error, "foobar"))
2258
2259    @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module")
2260    # gh-117649: The free-threaded build does not currently support sharing
2261    # extension module state between interpreters.
2262    @expected_failure_if_gil_disabled()
2263    def test_module_state_shared_in_global(self):
2264        """
2265        bpo-44050: Extension module state should be shared between interpreters
2266        when it doesn't support sub-interpreters.
2267        """
2268        r, w = os.pipe()
2269        self.addCleanup(os.close, r)
2270        self.addCleanup(os.close, w)
2271
2272        # Apple extensions must be distributed as frameworks. This requires
2273        # a specialist loader.
2274        if support.is_apple_mobile:
2275            loader = "AppleFrameworkLoader"
2276        else:
2277            loader = "ExtensionFileLoader"
2278
2279        script = textwrap.dedent(f"""
2280            import importlib.machinery
2281            import importlib.util
2282            import os
2283
2284            fullname = '_test_module_state_shared'
2285            origin = importlib.util.find_spec('_testmultiphase').origin
2286            loader = importlib.machinery.{loader}(fullname, origin)
2287            spec = importlib.util.spec_from_loader(fullname, loader)
2288            module = importlib.util.module_from_spec(spec)
2289            attr_id = str(id(module.Error)).encode()
2290
2291            os.write({w}, attr_id)
2292            """)
2293        exec(script)
2294        main_attr_id = os.read(r, 100)
2295
2296        ret = support.run_in_subinterp(script)
2297        self.assertEqual(ret, 0)
2298        subinterp_attr_id = os.read(r, 100)
2299        self.assertEqual(main_attr_id, subinterp_attr_id)
2300
2301
@requires_subinterpreters
class InterpreterConfigTests(unittest.TestCase):
    """Tests for creating, overriding and reading back interpreter configs.

    The configs are the namespaces returned by _interpreters.new_config()
    and _interpreters.get_config().
    """

    # Expected field values of each predefined config, keyed by its name.
    supported = {
        'isolated': types.SimpleNamespace(
            use_main_obmalloc=False,
            allow_fork=False,
            allow_exec=False,
            allow_threads=True,
            allow_daemon_threads=False,
            check_multi_interp_extensions=True,
            gil='own',
        ),
        'legacy': types.SimpleNamespace(
            use_main_obmalloc=True,
            allow_fork=True,
            allow_exec=True,
            allow_threads=True,
            allow_daemon_threads=True,
            check_multi_interp_extensions=bool(Py_GIL_DISABLED),
            gil='shared',
        ),
        'empty': types.SimpleNamespace(
            use_main_obmalloc=False,
            allow_fork=False,
            allow_exec=False,
            allow_threads=False,
            allow_daemon_threads=False,
            check_multi_interp_extensions=False,
            gil='default',
        ),
    }
    # The values exercised for the "gil" field.
    gil_supported = ['default', 'shared', 'own']

    def iter_all_configs(self):
        """Yield a namespace for every combination of config field values.

        Each boolean field takes True then False and "gil" takes 'shared',
        'own' and 'default'.  The last field varies fastest.
        """
        # Imported locally so the module-level import block stays untouched.
        import itertools

        booleans = (True, False)
        fields = {
            'use_main_obmalloc': booleans,
            'allow_fork': booleans,
            'allow_exec': booleans,
            'allow_threads': booleans,
            'allow_daemon_threads': booleans,
            'check_multi_interp_extensions': booleans,
            'gil': ('shared', 'own', 'default'),
        }
        # product() cycles the rightmost iterable fastest, i.e. the same
        # order as one nested loop per field.
        for values in itertools.product(*fields.values()):
            yield types.SimpleNamespace(**dict(zip(fields, values)))

    def assert_ns_equal(self, ns1, ns2, msg=None):
        """Fail with a line-by-line diff unless *ns1* equals *ns2*.

        The two namespaces must also have the same type.
        """
        # This is mostly copied from TestCase.assertDictEqual.
        self.assertEqual(type(ns1), type(ns2))
        if ns1 == ns2:
            return

        import difflib
        import pprint
        from unittest.util import _common_shorten_repr
        standardMsg = '%s != %s' % _common_shorten_repr(ns1, ns2)
        diff = ('\n' + '\n'.join(difflib.ndiff(
                       pprint.pformat(vars(ns1)).splitlines(),
                       pprint.pformat(vars(ns2)).splitlines())))
        diff = f'namespace({diff})'
        standardMsg = self._truncateMessage(standardMsg, diff)
        self.fail(self._formatMessage(msg, standardMsg))

    def test_predefined_config(self):
        def check(name, expected):
            # new_config() must return a fresh, equal namespace on each call.
            expected = self.supported[expected]
            args = (name,) if name else ()

            config1 = _interpreters.new_config(*args)
            self.assert_ns_equal(config1, expected)
            self.assertIsNot(config1, expected)

            config2 = _interpreters.new_config(*args)
            self.assert_ns_equal(config2, expected)
            self.assertIsNot(config2, expected)
            self.assertIsNot(config2, config1)

        # Calling new_config() without a name is expected to give 'isolated'.
        with self.subTest('default'):
            check(None, 'isolated')

        for name in self.supported:
            with self.subTest(name):
                check(name, name)

    def test_update_from_dict(self):
        for name, vanilla in self.supported.items():
            with self.subTest(f'noop ({name})'):
                expected = vanilla
                overrides = vars(vanilla)
                config = _interpreters.new_config(name, **overrides)
                self.assert_ns_equal(config, expected)

            with self.subTest(f'change all ({name})'):
                # Flip every bool field.  The 'gil' entry produced here is
                # only a placeholder; it is replaced inside the loop below.
                overrides = {k: not v for k, v in vars(vanilla).items()}
                for gil in self.gil_supported:
                    if vanilla.gil == gil:
                        continue
                    overrides['gil'] = gil
                    expected = types.SimpleNamespace(**overrides)
                    config = _interpreters.new_config(name, **overrides)
                    self.assert_ns_equal(config, expected)

            # Override individual fields.
            for field, old in vars(vanilla).items():
                if field == 'gil':
                    values = [v for v in self.gil_supported if v != old]
                else:
                    values = [not old]
                for val in values:
                    with self.subTest(f'{name}.{field} ({old!r} -> {val!r})'):
                        overrides = {field: val}
                        expected = types.SimpleNamespace(
                            **dict(vars(vanilla), **overrides),
                        )
                        config = _interpreters.new_config(name, **overrides)
                        self.assert_ns_equal(config, expected)

        with self.subTest('unsupported field'):
            for name in self.supported:
                with self.assertRaises(ValueError):
                    _interpreters.new_config(name, spam=True)

        # Bad values for bool fields.
        for field, value in vars(self.supported['empty']).items():
            if field == 'gil':
                continue
            assert isinstance(value, bool)
            # A distinct loop name keeps the field's real value intact.
            for bad in [1, '', 'spam', 1.0, None, object()]:
                with self.subTest(f'unsupported value ({field}={bad!r})'):
                    with self.assertRaises(TypeError):
                        _interpreters.new_config(**{field: bad})

        # Bad values for .gil.
        for value in [True, 1, 1.0, None, object()]:
            with self.subTest(f'unsupported value (gil={value!r})'):
                with self.assertRaises(TypeError):
                    _interpreters.new_config(gil=value)
        for value in ['', 'spam']:
            with self.subTest(f'unsupported value (gil={value!r})'):
                with self.assertRaises(ValueError):
                    _interpreters.new_config(gil=value)

    def test_interp_init(self):
        # Field combinations that look odd but must still initialize.
        questionable = [
            # strange
            dict(
                allow_fork=True,
                allow_exec=False,
            ),
            dict(
                gil='shared',
                use_main_obmalloc=False,
            ),
            # risky
            dict(
                allow_fork=True,
                allow_threads=True,
            ),
            # ought to be invalid?
            dict(
                allow_threads=False,
                allow_daemon_threads=True,
            ),
            dict(
                gil='own',
                use_main_obmalloc=True,
            ),
        ]
        # Field combinations that must be rejected with InterpreterError.
        invalid = [
            dict(
                use_main_obmalloc=False,
                check_multi_interp_extensions=False
            ),
        ]
        if Py_GIL_DISABLED:
            invalid.append(dict(check_multi_interp_extensions=False))

        def match(config, override_cases):
            # True if config already has every value of at least one case
            # (applying that case's overrides changes nothing).
            ns = vars(config)
            return any(dict(ns, **overrides) == ns
                       for overrides in override_cases)

        def check(config):
            script = 'pass'
            rc = _testinternalcapi.run_in_subinterp_with_config(script, config)
            self.assertEqual(rc, 0)

        for config in self.iter_all_configs():
            if config.gil == 'default':
                continue
            if match(config, invalid):
                with self.subTest(f'invalid: {config}'):
                    with self.assertRaises(_interpreters.InterpreterError):
                        check(config)
            elif match(config, questionable):
                with self.subTest(f'questionable: {config}'):
                    check(config)
            else:
                with self.subTest(f'valid: {config}'):
                    check(config)

    def test_get_config(self):
        @contextlib.contextmanager
        def new_interp(config):
            # Create an interpreter and destroy it on exit, tolerating the
            # case where it is already gone.
            interpid = _interpreters.create(config, reqrefs=False)
            try:
                yield interpid
            finally:
                try:
                    _interpreters.destroy(interpid)
                except _interpreters.InterpreterNotFoundError:
                    pass

        with self.subTest('main'):
            # The main interpreter is expected to report the 'legacy'
            # config with the adjustments made below.
            expected = _interpreters.new_config('legacy')
            expected.gil = 'own'
            if Py_GIL_DISABLED:
                expected.check_multi_interp_extensions = False
            interpid, *_ = _interpreters.get_main()
            config = _interpreters.get_config(interpid)
            self.assert_ns_equal(config, expected)

        with self.subTest('isolated'):
            expected = _interpreters.new_config('isolated')
            with new_interp('isolated') as interpid:
                config = _interpreters.get_config(interpid)
            self.assert_ns_equal(config, expected)

        with self.subTest('legacy'):
            expected = _interpreters.new_config('legacy')
            with new_interp('legacy') as interpid:
                config = _interpreters.get_config(interpid)
            self.assert_ns_equal(config, expected)

        with self.subTest('custom'):
            orig = _interpreters.new_config(
                'empty',
                use_main_obmalloc=True,
                gil='shared',
                check_multi_interp_extensions=bool(Py_GIL_DISABLED),
            )
            with new_interp(orig) as interpid:
                config = _interpreters.get_config(interpid)
            self.assert_ns_equal(config, orig)
2555
2556
@requires_subinterpreters
class InterpreterIDTests(unittest.TestCase):
    """Tests for interpreter ID conversion, lookup and refcount linking."""

    def add_interp_cleanup(self, interpid):
        """Register a cleanup that destroys *interpid* if it still exists."""
        def ensure_destroyed():
            with contextlib.suppress(_interpreters.InterpreterNotFoundError):
                _interpreters.destroy(interpid)
        self.addCleanup(ensure_destroyed)

    def new_interpreter(self):
        """Create an interpreter, schedule its cleanup and return its ID."""
        interpid = _interpreters.create()
        self.add_interp_cleanup(interpid)
        return interpid

    def test_conversion_int(self):
        normalize = _testinternalcapi.normalize_interp_id
        result = normalize(10)
        self.assertEqual(result, 10)

    def test_conversion_coerced(self):
        normalize = _testinternalcapi.normalize_interp_id

        # A non-int object that provides __index__().
        class MyInt(str):
            def __index__(self):
                return 10

        result = normalize(MyInt())
        self.assertEqual(result, 10)

    def test_conversion_from_interpreter(self):
        normalize = _testinternalcapi.normalize_interp_id
        interpid = self.new_interpreter()
        result = normalize(interpid)
        self.assertEqual(result, interpid)

    def test_conversion_bad(self):
        normalize = _testinternalcapi.normalize_interp_id

        # (bad ID, exception expected from the conversion)
        cases = [
            (object(), TypeError),
            (10.0, TypeError),
            ('10', TypeError),
            (b'10', TypeError),
            (-1, ValueError),
            (2**64, OverflowError),
        ]
        for badid, exctype in cases:
            with self.subTest(f'bad: {badid!r}'):
                with self.assertRaises(exctype):
                    normalize(badid)

    def test_lookup_exists(self):
        interpid = self.new_interpreter()
        self.assertTrue(_testinternalcapi.interpreter_exists(interpid))

    def test_lookup_does_not_exist(self):
        interpid = _testinternalcapi.unused_interpreter_id()
        self.assertFalse(_testinternalcapi.interpreter_exists(interpid))

    def test_lookup_destroyed(self):
        interpid = _interpreters.create()
        _interpreters.destroy(interpid)
        self.assertFalse(_testinternalcapi.interpreter_exists(interpid))

    def get_refcount_helpers(self):
        """Return the (get_refcount, incref, decref) callables.

        The incref helper passes implieslink=False.
        """
        get_refcount = _testinternalcapi.get_interpreter_refcount
        incref = (lambda interpid:
                  _interpreters.incref(interpid, implieslink=False))
        decref = _interpreters.decref
        return (get_refcount, incref, decref)

    def test_linked_lifecycle_does_not_exist(self):
        exists = _testinternalcapi.interpreter_exists
        is_linked = _testinternalcapi.interpreter_refcount_linked
        link = _testinternalcapi.link_interpreter_refcount
        unlink = _testinternalcapi.unlink_interpreter_refcount
        get_refcount, incref, decref = self.get_refcount_helpers()

        operations = (is_linked, link, unlink, get_refcount, incref, decref)

        def check_not_found(interpid):
            # The ID must not exist and every operation must reject it.
            self.assertFalse(exists(interpid))
            for operation in operations:
                with self.assertRaises(_interpreters.InterpreterNotFoundError):
                    operation(interpid)

        with self.subTest('never existed'):
            interpid = _testinternalcapi.unused_interpreter_id()
            check_not_found(interpid)

        with self.subTest('destroyed'):
            interpid = _interpreters.create()
            _interpreters.destroy(interpid)
            check_not_found(interpid)

    def test_linked_lifecycle_initial(self):
        is_linked = _testinternalcapi.interpreter_refcount_linked
        get_refcount = self.get_refcount_helpers()[0]

        # A new interpreter will start out not linked, with a refcount of 0.
        interpid = self.new_interpreter()
        initially_linked = is_linked(interpid)
        initial_refcount = get_refcount(interpid)

        self.assertFalse(initially_linked)
        self.assertEqual(initial_refcount, 0)

    def test_linked_lifecycle_never_linked(self):
        exists = _testinternalcapi.interpreter_exists
        is_linked = _testinternalcapi.interpreter_refcount_linked
        get_refcount, incref, decref = self.get_refcount_helpers()

        interpid = self.new_interpreter()

        # Incref will not automatically link it.
        incref(interpid)
        self.assertFalse(is_linked(interpid))
        self.assertEqual(1, get_refcount(interpid))

        # It isn't linked so it isn't destroyed.
        decref(interpid)
        self.assertTrue(exists(interpid))
        self.assertFalse(is_linked(interpid))
        self.assertEqual(0, get_refcount(interpid))

    def test_linked_lifecycle_link_unlink(self):
        exists = _testinternalcapi.interpreter_exists
        is_linked = _testinternalcapi.interpreter_refcount_linked
        link = _testinternalcapi.link_interpreter_refcount
        unlink = _testinternalcapi.unlink_interpreter_refcount

        interpid = self.new_interpreter()

        # Linking at refcount 0 does not destroy the interpreter.
        link(interpid)
        self.assertTrue(exists(interpid))
        self.assertTrue(is_linked(interpid))

        # Unlinking at refcount 0 does not destroy the interpreter.
        unlink(interpid)
        self.assertTrue(exists(interpid))
        self.assertFalse(is_linked(interpid))

    def test_linked_lifecycle_link_incref_decref(self):
        exists = _testinternalcapi.interpreter_exists
        is_linked = _testinternalcapi.interpreter_refcount_linked
        link = _testinternalcapi.link_interpreter_refcount
        get_refcount, incref, decref = self.get_refcount_helpers()

        interpid = self.new_interpreter()

        # Linking it will not change the refcount.
        link(interpid)
        self.assertTrue(is_linked(interpid))
        self.assertEqual(0, get_refcount(interpid))

        # Raise the refcount to 1 first (the original note here reads:
        # "Decref with a refcount of 0 is not allowed").
        incref(interpid)
        self.assertEqual(1, get_refcount(interpid))

        # When linked, decref back to 0 destroys the interpreter.
        decref(interpid)
        self.assertFalse(exists(interpid))

    def test_linked_lifecycle_incref_link(self):
        is_linked = _testinternalcapi.interpreter_refcount_linked
        link = _testinternalcapi.link_interpreter_refcount
        get_refcount, incref, _ = self.get_refcount_helpers()

        interpid = self.new_interpreter()

        incref(interpid)
        self.assertEqual(1, get_refcount(interpid))

        # Linking it will not reset the refcount.
        link(interpid)
        self.assertTrue(is_linked(interpid))
        self.assertEqual(1, get_refcount(interpid))

    def test_linked_lifecycle_link_incref_unlink_decref(self):
        exists = _testinternalcapi.interpreter_exists
        is_linked = _testinternalcapi.interpreter_refcount_linked
        link = _testinternalcapi.link_interpreter_refcount
        unlink = _testinternalcapi.unlink_interpreter_refcount
        get_refcount, incref, decref = self.get_refcount_helpers()

        interpid = self.new_interpreter()

        link(interpid)
        self.assertTrue(is_linked(interpid))

        incref(interpid)
        self.assertEqual(1, get_refcount(interpid))

        # Unlinking it will not change the refcount.
        unlink(interpid)
        self.assertFalse(is_linked(interpid))
        self.assertEqual(1, get_refcount(interpid))

        # Unlinked: decref back to 0 does not destroy the interpreter.
        decref(interpid)
        self.assertTrue(exists(interpid))
        self.assertEqual(0, get_refcount(interpid))
2810
2811
class BuiltinStaticTypesTests(unittest.TestCase):
    """Checks that tp_bases and tp_mro are populated on builtin types."""

    # The builtin types that are inspected by every test below.
    TYPES = [
        object,
        type,
        int,
        str,
        dict,
        type(None),
        bool,
        BaseException,
        Exception,
        Warning,
        DeprecationWarning,  # Warning subclass
    ]

    def test_tp_bases_is_set(self):
        # PyTypeObject.tp_bases is documented as public API.
        # See https://github.com/python/cpython/issues/105020.
        for builtin_type in self.TYPES:
            with self.subTest(builtin_type):
                self.assertIsNot(
                    _testcapi.type_get_tp_bases(builtin_type), None)

    def test_tp_mro_is_set(self):
        # Same check as for tp_bases, but for PyTypeObject.tp_mro.
        # See https://github.com/python/cpython/issues/105020.
        for builtin_type in self.TYPES:
            with self.subTest(builtin_type):
                self.assertIsNot(
                    _testcapi.type_get_tp_mro(builtin_type), None)
2843
2844
class TestStaticTypes(unittest.TestCase):
    """Checks on the static type from _testcapi.get_basic_static_type()."""

    # Set by setUpClass() on the first run so that a re-run can be detected.
    _has_run = False

    @classmethod
    def setUpClass(cls):
        # The tests here don't play nice with our approach to refleak
        # detection, so we bail out in that case.
        if cls._has_run:
            raise unittest.SkipTest('these tests do not support re-running')
        cls._has_run = True

    @contextlib.contextmanager
    def basic_static_type(self, *args):
        """Yield the type returned by _testcapi.get_basic_static_type(*args)."""
        cls = _testcapi.get_basic_static_type(*args)
        yield cls

    def test_pytype_ready_always_sets_tp_type(self):
        # The point of this test is to prevent something like
        # https://github.com/python/cpython/issues/104614
        # from happening again.

        # First check when tp_base/tp_bases is *not* set before PyType_Ready().
        with self.basic_static_type() as cls:
            self.assertIs(cls.__base__, object)
            self.assertEqual(cls.__bases__, (object,))
            self.assertIs(type(cls), type(object))

        # Then check when we *do* set tp_base/tp_bases first.
        with self.basic_static_type(object) as cls:
            self.assertIs(cls.__base__, object)
            self.assertEqual(cls.__bases__, (object,))
            self.assertIs(type(cls), type(object))
2878
2879
class TestThreadState(unittest.TestCase):
    """Thread-state and GIL-state checks driven through _testcapi helpers."""

    @threading_helper.reap_threads
    @threading_helper.requires_working_threading()
    def test_thread_state(self):
        # some extra thread-state tests driven via _testcapi
        def target():
            seen_idents = []

            def callback():
                seen_idents.append(threading.get_ident())

            _testcapi._test_thread_state(callback)
            # NOTE(review): two extra references to the callback; their
            # purpose is not evident from this code alone.
            a = b = callback
            time.sleep(1)
            # Check our main thread is in the list exactly 3 times.
            this_thread = threading.get_ident()
            occurrences = seen_idents.count(this_thread)
            self.assertEqual(occurrences, 3,
                             "Couldn't find main thread correctly in the list")

        # Run the check in the current thread, then again in a new thread.
        target()
        worker = threading.Thread(target=target)
        worker.start()
        worker.join()

    @threading_helper.reap_threads
    @threading_helper.requires_working_threading()
    def test_thread_gilstate_in_clear(self):
        # See https://github.com/python/cpython/issues/119585
        class C:
            def __del__(self):
                _testcapi.gilstate_ensure_release()

        # Thread-local variables are destroyed in `PyThreadState_Clear()`.
        tls = threading.local()

        def callback():
            tls.x = C()

        _testcapi._test_thread_state(callback)

    @threading_helper.reap_threads
    @threading_helper.requires_working_threading()
    def test_gilstate_ensure_no_deadlock(self):
        # See https://github.com/python/cpython/issues/96071
        code = textwrap.dedent("""
            import _testcapi

            def callback():
                print('callback called')

            _testcapi._test_thread_state(callback)
            """)
        # Run the snippet in a child interpreter with tracemalloc enabled.
        result = assert_python_ok('-X', 'tracemalloc', '-c', code)
        self.assertIn(b'callback called', result.out)

    def test_gilstate_matches_current(self):
        _testcapi.test_current_tstate_matches()
2937
2938
def get_test_funcs(mod, exclude_prefix=None):
    """Return a dict of the ``test_*`` attributes of *mod*, keyed by name.

    Names that start with *exclude_prefix* are left out when it is given.
    """
    def is_wanted(name):
        if not name.startswith('test_'):
            return False
        return exclude_prefix is None or not name.startswith(exclude_prefix)

    return {name: getattr(mod, name) for name in dir(mod) if is_wanted(name)}
2948
2949
class Test_testcapi(unittest.TestCase):
    # Add every ``test_*`` attribute of _testcapi to the class namespace
    # (see get_test_funcs()).  This must stay first: the methods defined
    # below would replace any injected entry with the same name.
    locals().update(get_test_funcs(_testcapi))

    # Suppress warning from PyUnicode_FromUnicode().
    @warnings_helper.ignore_warnings(category=DeprecationWarning)
    def test_widechar(self):
        # Delegates to the _testlimitedcapi implementation.
        _testlimitedcapi.test_widechar()

    def test_version_api_data(self):
        # The Py_Version value exposed by _testcapi must equal sys.hexversion.
        self.assertEqual(_testcapi.Py_Version, sys.hexversion)
2960
2961
class Test_testlimitedcapi(unittest.TestCase):
    # Add every ``test_*`` attribute of _testlimitedcapi to the class
    # namespace (see get_test_funcs()).
    locals().update(get_test_funcs(_testlimitedcapi))
2964
2965
class Test_testinternalcapi(unittest.TestCase):
    # Add the ``test_*`` attributes of _testinternalcapi to the class
    # namespace, leaving out the ``test_lock_*`` ones.
    locals().update(get_test_funcs(_testinternalcapi, 'test_lock_'))
2969
2970
@threading_helper.requires_working_threading()
class Test_PyLock(unittest.TestCase):
    # Add the ``test_lock_*`` attributes of _testinternalcapi to the class
    # namespace.
    locals().update({
        name: getattr(_testinternalcapi, name)
        for name in dir(_testinternalcapi)
        if name.startswith('test_lock_')
    })
2976
2977
@unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module")
class Test_ModuleStateAccess(unittest.TestCase):
    """Test access to module state (PEP 573)"""

    # The C half of these tests is implemented in _testmultiphase, under the
    # module name _testmultiphase_meth_state_access.  Unlike _testcapi, that
    # module uses multi-phase initialization.

    def setUp(self):
        # Load a module named _testmultiphase_meth_state_access from the
        # file that provides _testmultiphase and keep it on self.module.
        modname = '_testmultiphase_meth_state_access'  # XXX
        path = importlib.util.find_spec('_testmultiphase').origin
        # Apple extensions must be distributed as frameworks, which calls
        # for a specialist loader.
        loader_type = (importlib.machinery.AppleFrameworkLoader
                       if support.is_apple_mobile
                       else importlib.machinery.ExtensionFileLoader)
        loader = loader_type(modname, path)
        mod = importlib.util.module_from_spec(
            importlib.util.spec_from_loader(modname, loader))
        loader.exec_module(mod)
        self.module = mod

    def test_subclass_get_module(self):
        """PyType_GetModule for defining_class"""
        class StateAccessType_Subclass(self.module.StateAccessType):
            pass

        # A plain Python subclass must still report the module loaded in
        # setUp() as its defining module.
        obj = StateAccessType_Subclass()
        self.assertIs(obj.get_defining_module(), self.module)

    def test_subclass_get_module_with_super(self):
        # Same check, but the C method is reached through super() from an
        # overriding Python method.
        class StateAccessType_Subclass(self.module.StateAccessType):
            def get_defining_module(self):
                return super().get_defining_module()

        obj = StateAccessType_Subclass()
        self.assertIs(obj.get_defining_module(), self.module)

    def test_state_access(self):
        """Checks methods defined with and without argument clinic

        This tests a no-arg method (get_count) and a method with
        both a positional and keyword argument.
        """

        first = self.module.StateAccessType()
        second = self.module.StateAccessType()

        # Both bound methods are looked up on the first instance before any
        # sub-test starts.
        variants = {
            'clinic': first.increment_count_clinic,
            'noclinic': first.increment_count_noclinic,
        }

        for label, bump in variants.items():
            with self.subTest(label):
                # The two instances must always report the same count, and
                # every variant starts from (and finishes at) zero.
                self.assertEqual(first.get_count(), second.get_count())
                self.assertEqual(first.get_count(), 0)

                bump()
                self.assertEqual(first.get_count(), second.get_count())
                self.assertEqual(first.get_count(), 1)

                bump(3)
                self.assertEqual(first.get_count(), second.get_count())
                self.assertEqual(first.get_count(), 4)

                bump(-2, twice=True)
                self.assertEqual(first.get_count(), second.get_count())
                self.assertEqual(first.get_count(), 0)

                # An unknown keyword and surplus positional arguments are
                # both rejected with TypeError.
                with self.assertRaises(TypeError):
                    bump(thrice=3)

                with self.assertRaises(TypeError):
                    bump(1, 2, 3)

    def test_get_module_bad_def(self):
        # PyType_GetModuleByDef must fail gracefully (TypeError) when it
        # cannot find what it is looking for.
        # see bpo-46433
        obj = self.module.StateAccessType()
        with self.assertRaises(TypeError):
            obj.getmodulebydef_bad_def()

    def test_get_module_static_in_mro(self):
        # The class PyType_GetModuleByDef is looking for comes after a
        # static type (BaseException here) in the MRO.
        # see bpo-46433
        class Subclass(BaseException, self.module.StateAccessType):
            pass
        self.assertIs(Subclass().get_defining_module(), self.module)
3069
3070
class TestInternalFrameApi(unittest.TestCase):
    """Check the _testinternalcapi.iframe_* helpers against a known frame."""

    # func() hands back its own frame object, giving the tests a frame whose
    # code object is known.
    # NOTE: test_line() asserts that the reported line equals
    # co_firstlineno + 2, so no line (comment, docstring, blank) may be
    # inserted between the decorator and the return statement below.
    @staticmethod
    def func():
        return sys._getframe()

    def test_code(self):
        # The code object reported for the frame must be func's own.
        frame = self.func()
        code = _testinternalcapi.iframe_getcode(frame)
        self.assertIs(code, self.func.__code__)

    def test_lasti(self):
        # The reported value must lie strictly between 0 and the length of
        # func's co_code.
        frame = self.func()
        lasti = _testinternalcapi.iframe_getlasti(frame)
        self.assertGreater(lasti, 0)
        self.assertLess(lasti, len(self.func.__code__.co_code))

    def test_line(self):
        # The reported line must be two lines past func's co_firstlineno,
        # i.e. it depends on the exact source layout of func() above.
        frame = self.func()
        line = _testinternalcapi.iframe_getline(frame)
        firstline = self.func.__code__.co_firstlineno
        self.assertEqual(line, firstline + 2)
3093
3094
# Iteration count used by Test_Pep523API.do_test(): the function under test
# is called this many times before recording is switched on, and this many
# times again while recording.  Going by the name, this is presumably enough
# calls for the interpreter to specialize (and de-optimize) the bytecode
# involved -- confirm against the interpreter's specialization thresholds.
SUFFICIENT_TO_DEOPT_AND_SPECIALIZE = 100
3096
class Test_Pep523API(unittest.TestCase):
    """Check the calls recorded through _testinternalcapi's eval-frame hooks.

    Each test defines a small func() and lists the names it expects to be
    recorded for a single call of func().  Going by the class and test
    names, the operations exercised are ones the interpreter may otherwise
    inline, and the hook relates to the PEP 523 frame evaluation API --
    NOTE(review): confirm against the _testinternalcapi sources.
    """

    def do_test(self, func, names):
        # Call func() 2 * SUFFICIENT_TO_DEOPT_AND_SPECIALIZE times, with
        # recording enabled only for the second half, then compare what was
        # recorded with *names* repeated once per recorded call.
        actual_calls = []
        start = SUFFICIENT_TO_DEOPT_AND_SPECIALIZE
        count = start + SUFFICIENT_TO_DEOPT_AND_SPECIALIZE
        try:
            for i in range(count):
                if i == start:
                    # Warm-up is over: from now on calls are recorded into
                    # actual_calls (presumably by installing a recording
                    # frame evaluation function).
                    _testinternalcapi.set_eval_frame_record(actual_calls)
                func()
        finally:
            # Runs even if func() raised; presumably restores the default
            # frame evaluation function.
            _testinternalcapi.set_eval_frame_default()
        expected_calls = names * SUFFICIENT_TO_DEOPT_AND_SPECIALIZE
        # Compare lengths first, then element by element so that a mismatch
        # is reported for the specific entry.
        self.assertEqual(len(expected_calls), len(actual_calls))
        for expected, actual in zip(expected_calls, actual_calls, strict=True):
            self.assertEqual(expected, actual)

    def test_inlined_binary_subscr(self):
        # Subscripting an instance whose class implements __getitem__ in
        # Python.
        class C:
            def __getitem__(self, other):
                return None
        def func():
            C()[42]
        names = ["func", "__getitem__"]
        self.do_test(func, names)

    def test_inlined_call(self):
        # Plain calls of a Python function, relying on the default argument
        # and passing it explicitly.
        def inner(x=42):
            pass
        def func():
            inner()
            inner(42)
        names = ["func", "inner", "inner"]
        self.do_test(func, names)

    def test_inlined_call_function_ex(self):
        # Call of a Python function with unpacked (*args) arguments.
        def inner(x):
            pass
        def func():
            inner(*[42])
        names = ["func", "inner"]
        self.do_test(func, names)

    def test_inlined_for_iter(self):
        # A for loop over a Python generator; "gen" is expected three times
        # per call of func().
        def gen():
            yield 42
        def func():
            for _ in gen():
                pass
        names = ["func", "gen", "gen", "gen"]
        self.do_test(func, names)

    def test_inlined_load_attr(self):
        # Attribute access served by a Python property and by a Python
        # __getattribute__.
        class C:
            @property
            def a(self):
                return 42
        class D:
            def __getattribute__(self, name):
                return 42
        def func():
            C().a
            D().a
        names = ["func", "a", "__getattribute__"]
        self.do_test(func, names)

    def test_inlined_send(self):
        # "yield from" delegation between two Python generators, driven to
        # exhaustion by list().
        def inner():
            yield 42
        def outer():
            yield from inner()
        def func():
            list(outer())
        names = ["func", "outer", "outer", "inner", "inner", "outer", "inner"]
        self.do_test(func, names)
3173
3174
@unittest.skipUnless(support.Py_GIL_DISABLED, 'need Py_GIL_DISABLED')
class TestPyThreadId(unittest.TestCase):
    """Check the identifiers returned by _testinternalcapi.py_thread_id()."""

    def test_py_thread_id(self):
        # gh-112535: Test _Py_ThreadId(): make sure that thread identifiers
        # in a few threads are unique
        py_thread_id = _testinternalcapi.py_thread_id
        short_sleep = 0.010

        class GetThreadId(threading.Thread):
            def __init__(self):
                super().__init__()
                # Held by the creator until the thread is allowed to sample
                # its identifier.
                self.get_lock = threading.Lock()
                self.get_lock.acquire()
                # Set by run() as soon as the thread is running.
                self.started_event = threading.Event()
                # Both samples start out as None so that a thread which
                # dies early produces an assertion failure below instead
                # of an AttributeError.
                self.py_tid = None
                self.py_tid2 = None

            def run(self):
                self.started_event.set()
                # Block until the main thread releases the lock.
                self.get_lock.acquire()
                self.py_tid = py_thread_id()
                time.sleep(short_sleep)
                self.py_tid2 = py_thread_id()

        nthread = 5
        threads = [GetThreadId() for _ in range(nthread)]
        started = []

        try:
            # first make sure that all threads are running
            for thread in threads:
                thread.start()
                started.append(thread)
            for thread in started:
                thread.started_event.wait()

            # call _Py_ThreadId() in the main thread
            py_thread_ids = [py_thread_id()]
        finally:
            # now let each started thread call _Py_ThreadId() and wait for
            # it to complete.  Doing this in a finally block guarantees
            # that no non-daemon thread is left blocked on its lock (or
            # left unjoined) if anything above fails.
            for thread in started:
                thread.get_lock.release()
            for thread in started:
                thread.join()

        # All threads are joined at this point, so a failing assertion
        # cannot leave a thread behind.
        for thread in threads:
            py_thread_ids.append(thread.py_tid)
            # _Py_ThreadId() should not change for a given thread.
            # For example, it should remain the same after a short sleep.
            self.assertEqual(thread.py_tid2, thread.py_tid)

        # make sure that all _Py_ThreadId() are unique
        for tid in py_thread_ids:
            self.assertIsInstance(tid, int)
            self.assertGreater(tid, 0)
        self.assertEqual(len(set(py_thread_ids)), len(py_thread_ids),
                         py_thread_ids)
3228
3229
# Run the tests through unittest's command-line entry point when this file
# is executed as a script.
if __name__ == "__main__":
    unittest.main()
3232