1# Run the _testcapi module tests (tests for the Python/C API): by defn, 2# these are all functions _testcapi exports whose name begins with 'test_'. 3 4import _thread 5from collections import deque 6import contextlib 7import importlib.machinery 8import importlib.util 9import json 10import os 11import pickle 12import queue 13import random 14import sys 15import textwrap 16import threading 17import time 18import types 19import unittest 20import warnings 21import weakref 22import operator 23from test import support 24from test.support import MISSING_C_DOCSTRINGS 25from test.support import import_helper 26from test.support import threading_helper 27from test.support import warnings_helper 28from test.support import requires_limited_api 29from test.support import suppress_immortalization 30from test.support import expected_failure_if_gil_disabled 31from test.support import Py_GIL_DISABLED 32from test.support.script_helper import assert_python_failure, assert_python_ok, run_python_until_end 33try: 34 import _posixsubprocess 35except ImportError: 36 _posixsubprocess = None 37try: 38 import _testmultiphase 39except ImportError: 40 _testmultiphase = None 41try: 42 import _testsinglephase 43except ImportError: 44 _testsinglephase = None 45try: 46 import _interpreters 47except ModuleNotFoundError: 48 _interpreters = None 49 50# Skip this test if the _testcapi module isn't available. 
51_testcapi = import_helper.import_module('_testcapi') 52 53import _testlimitedcapi 54import _testinternalcapi 55 56 57NULL = None 58 59def decode_stderr(err): 60 return err.decode('utf-8', 'replace').replace('\r', '') 61 62 63def requires_subinterpreters(meth): 64 """Decorator to skip a test if subinterpreters are not supported.""" 65 return unittest.skipIf(_interpreters is None, 66 'subinterpreters required')(meth) 67 68 69def testfunction(self): 70 """some doc""" 71 return self 72 73 74class InstanceMethod: 75 id = _testcapi.instancemethod(id) 76 testfunction = _testcapi.instancemethod(testfunction) 77 78class CAPITest(unittest.TestCase): 79 80 def test_instancemethod(self): 81 inst = InstanceMethod() 82 self.assertEqual(id(inst), inst.id()) 83 self.assertTrue(inst.testfunction() is inst) 84 self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__) 85 self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__) 86 87 InstanceMethod.testfunction.attribute = "test" 88 self.assertEqual(testfunction.attribute, "test") 89 self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test") 90 91 @support.requires_subprocess() 92 def test_no_FatalError_infinite_loop(self): 93 code = textwrap.dedent(""" 94 import _testcapi 95 from test import support 96 97 with support.SuppressCrashReport(): 98 _testcapi.crash_no_current_thread() 99 """) 100 101 run_result, _cmd_line = run_python_until_end('-c', code) 102 _rc, out, err = run_result 103 self.assertEqual(out, b'') 104 # This used to cause an infinite loop. 
105 msg = ("Fatal Python error: PyThreadState_Get: " 106 "the function must be called with the GIL held, " 107 "after Python initialization and before Python finalization, " 108 "but the GIL is released " 109 "(the current Python thread state is NULL)").encode() 110 self.assertTrue(err.rstrip().startswith(msg), 111 err) 112 113 def test_memoryview_from_NULL_pointer(self): 114 self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer) 115 116 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.') 117 def test_seq_bytes_to_charp_array(self): 118 # Issue #15732: crash in _PySequence_BytesToCharpArray() 119 class Z(object): 120 def __len__(self): 121 return 1 122 with self.assertRaisesRegex(TypeError, 'indexing'): 123 _posixsubprocess.fork_exec( 124 1,Z(),True,(1, 2),5,6,7,8,9,10,11,12,13,14,True,True,17,False,19,20,21,22,False) 125 # Issue #15736: overflow in _PySequence_BytesToCharpArray() 126 class Z(object): 127 def __len__(self): 128 return sys.maxsize 129 def __getitem__(self, i): 130 return b'x' 131 self.assertRaises(MemoryError, _posixsubprocess.fork_exec, 132 1,Z(),True,(1, 2),5,6,7,8,9,10,11,12,13,14,True,True,17,False,19,20,21,22,False) 133 134 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.') 135 def test_subprocess_fork_exec(self): 136 class Z(object): 137 def __len__(self): 138 return 1 139 140 # Issue #15738: crash in subprocess_fork_exec() 141 self.assertRaises(TypeError, _posixsubprocess.fork_exec, 142 Z(),[b'1'],True,(1, 2),5,6,7,8,9,10,11,12,13,14,True,True,17,False,19,20,21,22,False) 143 144 @unittest.skipIf(MISSING_C_DOCSTRINGS, 145 "Signature information for builtins requires docstrings") 146 def test_docstring_signature_parsing(self): 147 148 self.assertEqual(_testcapi.no_docstring.__doc__, None) 149 self.assertEqual(_testcapi.no_docstring.__text_signature__, None) 150 151 self.assertEqual(_testcapi.docstring_empty.__doc__, None) 152 
self.assertEqual(_testcapi.docstring_empty.__text_signature__, None) 153 154 self.assertEqual(_testcapi.docstring_no_signature.__doc__, 155 "This docstring has no signature.") 156 self.assertEqual(_testcapi.docstring_no_signature.__text_signature__, None) 157 158 self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__, 159 "docstring_with_invalid_signature($module, /, boo)\n" 160 "\n" 161 "This docstring has an invalid signature." 162 ) 163 self.assertEqual(_testcapi.docstring_with_invalid_signature.__text_signature__, None) 164 165 self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__, 166 "docstring_with_invalid_signature2($module, /, boo)\n" 167 "\n" 168 "--\n" 169 "\n" 170 "This docstring also has an invalid signature." 171 ) 172 self.assertEqual(_testcapi.docstring_with_invalid_signature2.__text_signature__, None) 173 174 self.assertEqual(_testcapi.docstring_with_signature.__doc__, 175 "This docstring has a valid signature.") 176 self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)") 177 178 self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__doc__, None) 179 self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__, 180 "($module, /, sig)") 181 182 self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__, 183 "\nThis docstring has a valid signature and some extra newlines.") 184 self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__, 185 "($module, /, parameter)") 186 187 def test_c_type_with_matrix_multiplication(self): 188 M = _testcapi.matmulType 189 m1 = M() 190 m2 = M() 191 self.assertEqual(m1 @ m2, ("matmul", m1, m2)) 192 self.assertEqual(m1 @ 42, ("matmul", m1, 42)) 193 self.assertEqual(42 @ m1, ("matmul", 42, m1)) 194 o = m1 195 o @= m2 196 self.assertEqual(o, ("imatmul", m1, m2)) 197 o = m1 198 o @= 42 199 self.assertEqual(o, ("imatmul", m1, 42)) 200 o = 42 201 o @= m1 202 self.assertEqual(o, 
("matmul", 42, m1)) 203 204 def test_c_type_with_ipow(self): 205 # When the __ipow__ method of a type was implemented in C, using the 206 # modulo param would cause segfaults. 207 o = _testcapi.ipowType() 208 self.assertEqual(o.__ipow__(1), (1, None)) 209 self.assertEqual(o.__ipow__(2, 2), (2, 2)) 210 211 def test_return_null_without_error(self): 212 # Issue #23571: A function must not return NULL without setting an 213 # error 214 if support.Py_DEBUG: 215 code = textwrap.dedent(""" 216 import _testcapi 217 from test import support 218 219 with support.SuppressCrashReport(): 220 _testcapi.return_null_without_error() 221 """) 222 rc, out, err = assert_python_failure('-c', code) 223 err = decode_stderr(err) 224 self.assertRegex(err, 225 r'Fatal Python error: _Py_CheckFunctionResult: ' 226 r'a function returned NULL without setting an exception\n' 227 r'Python runtime state: initialized\n' 228 r'SystemError: <built-in function return_null_without_error> ' 229 r'returned NULL without setting an exception\n' 230 r'\n' 231 r'Current thread.*:\n' 232 r' File .*", line 6 in <module>\n') 233 else: 234 with self.assertRaises(SystemError) as cm: 235 _testcapi.return_null_without_error() 236 self.assertRegex(str(cm.exception), 237 'return_null_without_error.* ' 238 'returned NULL without setting an exception') 239 240 def test_return_result_with_error(self): 241 # Issue #23571: A function must not return a result with an error set 242 if support.Py_DEBUG: 243 code = textwrap.dedent(""" 244 import _testcapi 245 from test import support 246 247 with support.SuppressCrashReport(): 248 _testcapi.return_result_with_error() 249 """) 250 rc, out, err = assert_python_failure('-c', code) 251 err = decode_stderr(err) 252 self.assertRegex(err, 253 r'Fatal Python error: _Py_CheckFunctionResult: ' 254 r'a function returned a result with an exception set\n' 255 r'Python runtime state: initialized\n' 256 r'ValueError\n' 257 r'\n' 258 r'The above exception was the direct cause ' 259 r'of the 
following exception:\n' 260 r'\n' 261 r'SystemError: <built-in ' 262 r'function return_result_with_error> ' 263 r'returned a result with an exception set\n' 264 r'\n' 265 r'Current thread.*:\n' 266 r' File .*, line 6 in <module>\n') 267 else: 268 with self.assertRaises(SystemError) as cm: 269 _testcapi.return_result_with_error() 270 self.assertRegex(str(cm.exception), 271 'return_result_with_error.* ' 272 'returned a result with an exception set') 273 274 def test_getitem_with_error(self): 275 # Test _Py_CheckSlotResult(). Raise an exception and then calls 276 # PyObject_GetItem(): check that the assertion catches the bug. 277 # PyObject_GetItem() must not be called with an exception set. 278 code = textwrap.dedent(""" 279 import _testcapi 280 from test import support 281 282 with support.SuppressCrashReport(): 283 _testcapi.getitem_with_error({1: 2}, 1) 284 """) 285 rc, out, err = assert_python_failure('-c', code) 286 err = decode_stderr(err) 287 if 'SystemError: ' not in err: 288 self.assertRegex(err, 289 r'Fatal Python error: _Py_CheckSlotResult: ' 290 r'Slot __getitem__ of type dict succeeded ' 291 r'with an exception set\n' 292 r'Python runtime state: initialized\n' 293 r'ValueError: bug\n' 294 r'\n' 295 r'Current thread .* \(most recent call first\):\n' 296 r' File .*, line 6 in <module>\n' 297 r'\n' 298 r'Extension modules: _testcapi \(total: 1\)\n') 299 else: 300 # Python built with NDEBUG macro defined: 301 # test _Py_CheckFunctionResult() instead. 
302 self.assertIn('returned a result with an exception set', err) 303 304 def test_buildvalue(self): 305 # Test Py_BuildValue() with object arguments 306 buildvalue = _testcapi.py_buildvalue 307 self.assertEqual(buildvalue(''), None) 308 self.assertEqual(buildvalue('()'), ()) 309 self.assertEqual(buildvalue('[]'), []) 310 self.assertEqual(buildvalue('{}'), {}) 311 self.assertEqual(buildvalue('()[]{}'), ((), [], {})) 312 self.assertEqual(buildvalue('O', 1), 1) 313 self.assertEqual(buildvalue('(O)', 1), (1,)) 314 self.assertEqual(buildvalue('[O]', 1), [1]) 315 self.assertRaises(SystemError, buildvalue, '{O}', 1) 316 self.assertEqual(buildvalue('OO', 1, 2), (1, 2)) 317 self.assertEqual(buildvalue('(OO)', 1, 2), (1, 2)) 318 self.assertEqual(buildvalue('[OO]', 1, 2), [1, 2]) 319 self.assertEqual(buildvalue('{OO}', 1, 2), {1: 2}) 320 self.assertEqual(buildvalue('{OOOO}', 1, 2, 3, 4), {1: 2, 3: 4}) 321 self.assertEqual(buildvalue('((O))', 1), ((1,),)) 322 self.assertEqual(buildvalue('((OO))', 1, 2), ((1, 2),)) 323 324 self.assertEqual(buildvalue(' \t,:'), None) 325 self.assertEqual(buildvalue('O,', 1), 1) 326 self.assertEqual(buildvalue(' O ', 1), 1) 327 self.assertEqual(buildvalue('\tO\t', 1), 1) 328 self.assertEqual(buildvalue('O,O', 1, 2), (1, 2)) 329 self.assertEqual(buildvalue('O, O', 1, 2), (1, 2)) 330 self.assertEqual(buildvalue('O,\tO', 1, 2), (1, 2)) 331 self.assertEqual(buildvalue('O O', 1, 2), (1, 2)) 332 self.assertEqual(buildvalue('O\tO', 1, 2), (1, 2)) 333 self.assertEqual(buildvalue('(O,O)', 1, 2), (1, 2)) 334 self.assertEqual(buildvalue('(O, O,)', 1, 2), (1, 2)) 335 self.assertEqual(buildvalue(' ( O O ) ', 1, 2), (1, 2)) 336 self.assertEqual(buildvalue('\t(\tO\tO\t)\t', 1, 2), (1, 2)) 337 self.assertEqual(buildvalue('[O,O]', 1, 2), [1, 2]) 338 self.assertEqual(buildvalue('[O, O,]', 1, 2), [1, 2]) 339 self.assertEqual(buildvalue(' [ O O ] ', 1, 2), [1, 2]) 340 self.assertEqual(buildvalue(' [\tO\tO\t] ', 1, 2), [1, 2]) 341 
self.assertEqual(buildvalue('{O:O}', 1, 2), {1: 2}) 342 self.assertEqual(buildvalue('{O:O,O:O}', 1, 2, 3, 4), {1: 2, 3: 4}) 343 self.assertEqual(buildvalue('{O: O, O: O,}', 1, 2, 3, 4), {1: 2, 3: 4}) 344 self.assertEqual(buildvalue(' { O O O O } ', 1, 2, 3, 4), {1: 2, 3: 4}) 345 self.assertEqual(buildvalue('\t{\tO\tO\tO\tO\t}\t', 1, 2, 3, 4), {1: 2, 3: 4}) 346 347 self.assertRaises(SystemError, buildvalue, 'O', NULL) 348 self.assertRaises(SystemError, buildvalue, '(O)', NULL) 349 self.assertRaises(SystemError, buildvalue, '[O]', NULL) 350 self.assertRaises(SystemError, buildvalue, '{O}', NULL) 351 self.assertRaises(SystemError, buildvalue, 'OO', 1, NULL) 352 self.assertRaises(SystemError, buildvalue, 'OO', NULL, 2) 353 self.assertRaises(SystemError, buildvalue, '(OO)', 1, NULL) 354 self.assertRaises(SystemError, buildvalue, '(OO)', NULL, 2) 355 self.assertRaises(SystemError, buildvalue, '[OO]', 1, NULL) 356 self.assertRaises(SystemError, buildvalue, '[OO]', NULL, 2) 357 self.assertRaises(SystemError, buildvalue, '{OO}', 1, NULL) 358 self.assertRaises(SystemError, buildvalue, '{OO}', NULL, 2) 359 360 def test_buildvalue_ints(self): 361 # Test Py_BuildValue() with integer arguments 362 buildvalue = _testcapi.py_buildvalue_ints 363 from _testcapi import SHRT_MIN, SHRT_MAX, USHRT_MAX, INT_MIN, INT_MAX, UINT_MAX 364 self.assertEqual(buildvalue('i', INT_MAX), INT_MAX) 365 self.assertEqual(buildvalue('i', INT_MIN), INT_MIN) 366 self.assertEqual(buildvalue('I', UINT_MAX), UINT_MAX) 367 368 self.assertEqual(buildvalue('h', SHRT_MAX), SHRT_MAX) 369 self.assertEqual(buildvalue('h', SHRT_MIN), SHRT_MIN) 370 self.assertEqual(buildvalue('H', USHRT_MAX), USHRT_MAX) 371 372 self.assertEqual(buildvalue('b', 127), 127) 373 self.assertEqual(buildvalue('b', -128), -128) 374 self.assertEqual(buildvalue('B', 255), 255) 375 376 self.assertEqual(buildvalue('c', ord('A')), b'A') 377 self.assertEqual(buildvalue('c', 255), b'\xff') 378 self.assertEqual(buildvalue('c', 256), b'\x00') 379 
self.assertEqual(buildvalue('c', -1), b'\xff') 380 381 self.assertEqual(buildvalue('C', 255), chr(255)) 382 self.assertEqual(buildvalue('C', 256), chr(256)) 383 self.assertEqual(buildvalue('C', sys.maxunicode), chr(sys.maxunicode)) 384 self.assertRaises(ValueError, buildvalue, 'C', -1) 385 self.assertRaises(ValueError, buildvalue, 'C', sys.maxunicode+1) 386 387 # gh-84489 388 self.assertRaises(ValueError, buildvalue, '(C )i', -1, 2) 389 self.assertRaises(ValueError, buildvalue, '[C ]i', -1, 2) 390 self.assertRaises(ValueError, buildvalue, '{Ci }i', -1, 2, 3) 391 392 def test_buildvalue_N(self): 393 _testcapi.test_buildvalue_N() 394 395 def check_negative_refcount(self, code): 396 # bpo-35059: Check that Py_DECREF() reports the correct filename 397 # when calling _Py_NegativeRefcount() to abort Python. 398 code = textwrap.dedent(code) 399 rc, out, err = assert_python_failure('-c', code) 400 self.assertRegex(err, 401 br'_testcapimodule\.c:[0-9]+: ' 402 br'_Py_NegativeRefcount: Assertion failed: ' 403 br'object has negative ref count') 404 405 @unittest.skipUnless(hasattr(_testcapi, 'negative_refcount'), 406 'need _testcapi.negative_refcount()') 407 def test_negative_refcount(self): 408 code = """ 409 import _testcapi 410 from test import support 411 412 with support.SuppressCrashReport(): 413 _testcapi.negative_refcount() 414 """ 415 self.check_negative_refcount(code) 416 417 @unittest.skipUnless(hasattr(_testcapi, 'decref_freed_object'), 418 'need _testcapi.decref_freed_object()') 419 @support.skip_if_sanitizer("use after free on purpose", 420 address=True, memory=True, ub=True) 421 def test_decref_freed_object(self): 422 code = """ 423 import _testcapi 424 from test import support 425 426 with support.SuppressCrashReport(): 427 _testcapi.decref_freed_object() 428 """ 429 self.check_negative_refcount(code) 430 431 def test_trashcan_subclass(self): 432 # bpo-35983: Check that the trashcan mechanism for "list" is NOT 433 # activated when its tp_dealloc is being called 
by a subclass 434 from _testcapi import MyList 435 L = None 436 for i in range(1000): 437 L = MyList((L,)) 438 439 @support.requires_resource('cpu') 440 def test_trashcan_python_class1(self): 441 self.do_test_trashcan_python_class(list) 442 443 @support.requires_resource('cpu') 444 def test_trashcan_python_class2(self): 445 from _testcapi import MyList 446 self.do_test_trashcan_python_class(MyList) 447 448 def do_test_trashcan_python_class(self, base): 449 # Check that the trashcan mechanism works properly for a Python 450 # subclass of a class using the trashcan (this specific test assumes 451 # that the base class "base" behaves like list) 452 class PyList(base): 453 # Count the number of PyList instances to verify that there is 454 # no memory leak 455 num = 0 456 def __init__(self, *args): 457 __class__.num += 1 458 super().__init__(*args) 459 def __del__(self): 460 __class__.num -= 1 461 462 for parity in (0, 1): 463 L = None 464 # We need in the order of 2**20 iterations here such that a 465 # typical 8MB stack would overflow without the trashcan. 
466 for i in range(2**20): 467 L = PyList((L,)) 468 L.attr = i 469 if parity: 470 # Add one additional nesting layer 471 L = (L,) 472 self.assertGreater(PyList.num, 0) 473 del L 474 self.assertEqual(PyList.num, 0) 475 476 @unittest.skipIf(MISSING_C_DOCSTRINGS, 477 "Signature information for builtins requires docstrings") 478 def test_heap_ctype_doc_and_text_signature(self): 479 self.assertEqual(_testcapi.HeapDocCType.__doc__, "somedoc") 480 self.assertEqual(_testcapi.HeapDocCType.__text_signature__, "(arg1, arg2)") 481 482 def test_null_type_doc(self): 483 self.assertEqual(_testcapi.NullTpDocType.__doc__, None) 484 485 @suppress_immortalization() 486 def test_subclass_of_heap_gc_ctype_with_tpdealloc_decrefs_once(self): 487 class HeapGcCTypeSubclass(_testcapi.HeapGcCType): 488 def __init__(self): 489 self.value2 = 20 490 super().__init__() 491 492 subclass_instance = HeapGcCTypeSubclass() 493 type_refcnt = sys.getrefcount(HeapGcCTypeSubclass) 494 495 # Test that subclass instance was fully created 496 self.assertEqual(subclass_instance.value, 10) 497 self.assertEqual(subclass_instance.value2, 20) 498 499 # Test that the type reference count is only decremented once 500 del subclass_instance 501 self.assertEqual(type_refcnt - 1, sys.getrefcount(HeapGcCTypeSubclass)) 502 503 @suppress_immortalization() 504 def test_subclass_of_heap_gc_ctype_with_del_modifying_dunder_class_only_decrefs_once(self): 505 class A(_testcapi.HeapGcCType): 506 def __init__(self): 507 self.value2 = 20 508 super().__init__() 509 510 class B(A): 511 def __init__(self): 512 super().__init__() 513 514 def __del__(self): 515 self.__class__ = A 516 A.refcnt_in_del = sys.getrefcount(A) 517 B.refcnt_in_del = sys.getrefcount(B) 518 519 subclass_instance = B() 520 type_refcnt = sys.getrefcount(B) 521 new_type_refcnt = sys.getrefcount(A) 522 523 # Test that subclass instance was fully created 524 self.assertEqual(subclass_instance.value, 10) 525 self.assertEqual(subclass_instance.value2, 20) 526 527 del 
subclass_instance 528 529 # Test that setting __class__ modified the reference counts of the types 530 if support.Py_DEBUG: 531 # gh-89373: In debug mode, _Py_Dealloc() keeps a strong reference 532 # to the type while calling tp_dealloc() 533 self.assertEqual(type_refcnt, B.refcnt_in_del) 534 else: 535 self.assertEqual(type_refcnt - 1, B.refcnt_in_del) 536 self.assertEqual(new_type_refcnt + 1, A.refcnt_in_del) 537 538 # Test that the original type already has decreased its refcnt 539 self.assertEqual(type_refcnt - 1, sys.getrefcount(B)) 540 541 # Test that subtype_dealloc decref the newly assigned __class__ only once 542 self.assertEqual(new_type_refcnt, sys.getrefcount(A)) 543 544 def test_heaptype_with_dict(self): 545 inst = _testcapi.HeapCTypeWithDict() 546 inst.foo = 42 547 self.assertEqual(inst.foo, 42) 548 self.assertEqual(inst.dictobj, inst.__dict__) 549 self.assertEqual(inst.dictobj, {"foo": 42}) 550 551 inst = _testcapi.HeapCTypeWithDict() 552 self.assertEqual({}, inst.__dict__) 553 554 def test_heaptype_with_managed_dict(self): 555 inst = _testcapi.HeapCTypeWithManagedDict() 556 inst.foo = 42 557 self.assertEqual(inst.foo, 42) 558 self.assertEqual(inst.__dict__, {"foo": 42}) 559 560 inst = _testcapi.HeapCTypeWithManagedDict() 561 self.assertEqual({}, inst.__dict__) 562 563 a = _testcapi.HeapCTypeWithManagedDict() 564 b = _testcapi.HeapCTypeWithManagedDict() 565 a.b = b 566 b.a = a 567 del a, b 568 569 def test_sublclassing_managed_dict(self): 570 571 class C(_testcapi.HeapCTypeWithManagedDict): 572 pass 573 574 i = C() 575 i.spam = i 576 del i 577 578 def test_heaptype_with_negative_dict(self): 579 inst = _testcapi.HeapCTypeWithNegativeDict() 580 inst.foo = 42 581 self.assertEqual(inst.foo, 42) 582 self.assertEqual(inst.dictobj, inst.__dict__) 583 self.assertEqual(inst.dictobj, {"foo": 42}) 584 585 inst = _testcapi.HeapCTypeWithNegativeDict() 586 self.assertEqual({}, inst.__dict__) 587 588 def test_heaptype_with_weakref(self): 589 inst = 
_testcapi.HeapCTypeWithWeakref() 590 ref = weakref.ref(inst) 591 self.assertEqual(ref(), inst) 592 self.assertEqual(inst.weakreflist, ref) 593 594 def test_heaptype_with_managed_weakref(self): 595 inst = _testcapi.HeapCTypeWithManagedWeakref() 596 ref = weakref.ref(inst) 597 self.assertEqual(ref(), inst) 598 599 def test_sublclassing_managed_weakref(self): 600 601 class C(_testcapi.HeapCTypeWithManagedWeakref): 602 pass 603 604 inst = C() 605 ref = weakref.ref(inst) 606 self.assertEqual(ref(), inst) 607 608 def test_sublclassing_managed_both(self): 609 610 class C1(_testcapi.HeapCTypeWithManagedWeakref, _testcapi.HeapCTypeWithManagedDict): 611 pass 612 613 class C2(_testcapi.HeapCTypeWithManagedDict, _testcapi.HeapCTypeWithManagedWeakref): 614 pass 615 616 for cls in (C1, C2): 617 inst = cls() 618 ref = weakref.ref(inst) 619 self.assertEqual(ref(), inst) 620 inst.spam = inst 621 del inst 622 ref = weakref.ref(cls()) 623 self.assertIs(ref(), None) 624 625 def test_heaptype_with_buffer(self): 626 inst = _testcapi.HeapCTypeWithBuffer() 627 b = bytes(inst) 628 self.assertEqual(b, b"1234") 629 630 def test_c_subclass_of_heap_ctype_with_tpdealloc_decrefs_once(self): 631 subclass_instance = _testcapi.HeapCTypeSubclass() 632 type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass) 633 634 # Test that subclass instance was fully created 635 self.assertEqual(subclass_instance.value, 10) 636 self.assertEqual(subclass_instance.value2, 20) 637 638 # Test that the type reference count is only decremented once 639 del subclass_instance 640 self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclass)) 641 642 def test_c_subclass_of_heap_ctype_with_del_modifying_dunder_class_only_decrefs_once(self): 643 subclass_instance = _testcapi.HeapCTypeSubclassWithFinalizer() 644 type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer) 645 new_type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass) 646 647 # Test that subclass instance was fully created 
648 self.assertEqual(subclass_instance.value, 10) 649 self.assertEqual(subclass_instance.value2, 20) 650 651 # The tp_finalize slot will set __class__ to HeapCTypeSubclass 652 del subclass_instance 653 654 # Test that setting __class__ modified the reference counts of the types 655 if support.Py_DEBUG: 656 # gh-89373: In debug mode, _Py_Dealloc() keeps a strong reference 657 # to the type while calling tp_dealloc() 658 self.assertEqual(type_refcnt, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del) 659 else: 660 self.assertEqual(type_refcnt - 1, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del) 661 self.assertEqual(new_type_refcnt + 1, _testcapi.HeapCTypeSubclass.refcnt_in_del) 662 663 # Test that the original type already has decreased its refcnt 664 self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer)) 665 666 # Test that subtype_dealloc decref the newly assigned __class__ only once 667 self.assertEqual(new_type_refcnt, sys.getrefcount(_testcapi.HeapCTypeSubclass)) 668 669 def test_heaptype_with_setattro(self): 670 obj = _testcapi.HeapCTypeSetattr() 671 self.assertEqual(obj.pvalue, 10) 672 obj.value = 12 673 self.assertEqual(obj.pvalue, 12) 674 del obj.value 675 self.assertEqual(obj.pvalue, 0) 676 677 def test_heaptype_with_custom_metaclass(self): 678 metaclass = _testcapi.HeapCTypeMetaclass 679 self.assertTrue(issubclass(metaclass, type)) 680 681 # Class creation from C 682 t = _testcapi.pytype_fromspec_meta(metaclass) 683 self.assertIsInstance(t, type) 684 self.assertEqual(t.__name__, "HeapCTypeViaMetaclass") 685 self.assertIs(type(t), metaclass) 686 687 # Class creation from Python 688 t = metaclass("PyClassViaMetaclass", (), {}) 689 self.assertIsInstance(t, type) 690 self.assertEqual(t.__name__, "PyClassViaMetaclass") 691 692 def test_heaptype_with_custom_metaclass_null_new(self): 693 metaclass = _testcapi.HeapCTypeMetaclassNullNew 694 695 self.assertTrue(issubclass(metaclass, type)) 696 697 # Class creation 
from C 698 t = _testcapi.pytype_fromspec_meta(metaclass) 699 self.assertIsInstance(t, type) 700 self.assertEqual(t.__name__, "HeapCTypeViaMetaclass") 701 self.assertIs(type(t), metaclass) 702 703 # Class creation from Python 704 with self.assertRaisesRegex(TypeError, "cannot create .* instances"): 705 metaclass("PyClassViaMetaclass", (), {}) 706 707 def test_heaptype_with_custom_metaclass_custom_new(self): 708 metaclass = _testcapi.HeapCTypeMetaclassCustomNew 709 710 self.assertTrue(issubclass(_testcapi.HeapCTypeMetaclassCustomNew, type)) 711 712 msg = "Metaclasses with custom tp_new are not supported." 713 with self.assertRaisesRegex(TypeError, msg): 714 t = _testcapi.pytype_fromspec_meta(metaclass) 715 716 def test_heaptype_with_custom_metaclass_deprecation(self): 717 metaclass = _testcapi.HeapCTypeMetaclassCustomNew 718 719 # gh-103968: a metaclass with custom tp_new is deprecated, but still 720 # allowed for functions that existed in 3.11 721 # (PyType_FromSpecWithBases is used here). 
722 class Base(metaclass=metaclass): 723 pass 724 725 # Class creation from C 726 with warnings_helper.check_warnings( 727 ('.* _testcapi.Subclass .* custom tp_new.*in Python 3.14.*', DeprecationWarning), 728 ): 729 sub = _testcapi.make_type_with_base(Base) 730 self.assertTrue(issubclass(sub, Base)) 731 self.assertIsInstance(sub, metaclass) 732 733 def test_multiple_inheritance_ctypes_with_weakref_or_dict(self): 734 735 with self.assertRaises(TypeError): 736 class Both1(_testcapi.HeapCTypeWithWeakref, _testcapi.HeapCTypeWithDict): 737 pass 738 with self.assertRaises(TypeError): 739 class Both2(_testcapi.HeapCTypeWithDict, _testcapi.HeapCTypeWithWeakref): 740 pass 741 742 def test_multiple_inheritance_ctypes_with_weakref_or_dict_and_other_builtin(self): 743 744 with self.assertRaises(TypeError): 745 class C1(_testcapi.HeapCTypeWithDict, list): 746 pass 747 748 with self.assertRaises(TypeError): 749 class C2(_testcapi.HeapCTypeWithWeakref, list): 750 pass 751 752 class C3(_testcapi.HeapCTypeWithManagedDict, list): 753 pass 754 class C4(_testcapi.HeapCTypeWithManagedWeakref, list): 755 pass 756 757 inst = C3() 758 inst.append(0) 759 str(inst.__dict__) 760 761 inst = C4() 762 inst.append(0) 763 str(inst.__weakref__) 764 765 for cls in (_testcapi.HeapCTypeWithManagedDict, _testcapi.HeapCTypeWithManagedWeakref): 766 for cls2 in (_testcapi.HeapCTypeWithDict, _testcapi.HeapCTypeWithWeakref): 767 class S(cls, cls2): 768 pass 769 class B1(C3, cls): 770 pass 771 class B2(C4, cls): 772 pass 773 774 def test_pytype_fromspec_with_repeated_slots(self): 775 for variant in range(2): 776 with self.subTest(variant=variant): 777 with self.assertRaises(SystemError): 778 _testcapi.create_type_from_repeated_slots(variant) 779 780 @warnings_helper.ignore_warnings(category=DeprecationWarning) 781 def test_immutable_type_with_mutable_base(self): 782 # Add deprecation warning here so it's removed in 3.14 783 warnings._deprecated( 784 'creating immutable classes with mutable bases', 
remove=(3, 14)) 785 786 class MutableBase: 787 def meth(self): 788 return 'original' 789 790 with self.assertWarns(DeprecationWarning): 791 ImmutableSubclass = _testcapi.make_immutable_type_with_base( 792 MutableBase) 793 instance = ImmutableSubclass() 794 795 self.assertEqual(instance.meth(), 'original') 796 797 # Cannot override the static type's method 798 with self.assertRaisesRegex( 799 TypeError, 800 "cannot set 'meth' attribute of immutable type"): 801 ImmutableSubclass.meth = lambda self: 'overridden' 802 self.assertEqual(instance.meth(), 'original') 803 804 # Can change the method on the mutable base 805 MutableBase.meth = lambda self: 'changed' 806 self.assertEqual(instance.meth(), 'changed') 807 808 def test_pynumber_tobase(self): 809 from _testcapi import pynumber_tobase 810 small_number = 123 811 large_number = 2**64 812 class IDX: 813 def __init__(self, val): 814 self.val = val 815 def __index__(self): 816 return self.val 817 818 test_cases = ((2, '0b1111011', '0b10000000000000000000000000000000000000000000000000000000000000000'), 819 (8, '0o173', '0o2000000000000000000000'), 820 (10, '123', '18446744073709551616'), 821 (16, '0x7b', '0x10000000000000000')) 822 for base, small_target, large_target in test_cases: 823 with self.subTest(base=base, st=small_target, lt=large_target): 824 # Test for small number 825 self.assertEqual(pynumber_tobase(small_number, base), small_target) 826 self.assertEqual(pynumber_tobase(-small_number, base), '-' + small_target) 827 self.assertEqual(pynumber_tobase(IDX(small_number), base), small_target) 828 # Test for large number(out of range of a longlong,i.e.[-2**63, 2**63-1]) 829 self.assertEqual(pynumber_tobase(large_number, base), large_target) 830 self.assertEqual(pynumber_tobase(-large_number, base), '-' + large_target) 831 self.assertEqual(pynumber_tobase(IDX(large_number), base), large_target) 832 self.assertRaises(TypeError, pynumber_tobase, IDX(123.0), 10) 833 self.assertRaises(TypeError, pynumber_tobase, 
IDX('123'), 10) 834 self.assertRaises(TypeError, pynumber_tobase, 123.0, 10) 835 self.assertRaises(TypeError, pynumber_tobase, '123', 10) 836 self.assertRaises(SystemError, pynumber_tobase, 123, 0) 837 838 def test_pyobject_repr_from_null(self): 839 s = _testcapi.pyobject_repr_from_null() 840 self.assertEqual(s, '<NULL>') 841 842 def test_pyobject_str_from_null(self): 843 s = _testcapi.pyobject_str_from_null() 844 self.assertEqual(s, '<NULL>') 845 846 def test_pyobject_bytes_from_null(self): 847 s = _testcapi.pyobject_bytes_from_null() 848 self.assertEqual(s, b'<NULL>') 849 850 def test_Py_CompileString(self): 851 # Check that Py_CompileString respects the coding cookie 852 _compile = _testcapi.Py_CompileString 853 code = b"# -*- coding: latin1 -*-\nprint('\xc2\xa4')\n" 854 result = _compile(code) 855 expected = compile(code, "<string>", "exec") 856 self.assertEqual(result.co_consts, expected.co_consts) 857 858 def test_export_symbols(self): 859 # bpo-44133: Ensure that the "Py_FrozenMain" and 860 # "PyThread_get_thread_native_id" symbols are exported by the Python 861 # (directly by the binary, or via by the Python dynamic library). 
862 ctypes = import_helper.import_module('ctypes') 863 names = [] 864 865 # Test if the PY_HAVE_THREAD_NATIVE_ID macro is defined 866 if hasattr(_thread, 'get_native_id'): 867 names.append('PyThread_get_thread_native_id') 868 869 # Python/frozenmain.c fails to build on Windows when the symbols are 870 # missing: 871 # - PyWinFreeze_ExeInit 872 # - PyWinFreeze_ExeTerm 873 # - PyInitFrozenExtensions 874 if os.name != 'nt': 875 names.append('Py_FrozenMain') 876 877 for name in names: 878 with self.subTest(name=name): 879 self.assertTrue(hasattr(ctypes.pythonapi, name)) 880 881 def test_clear_managed_dict(self): 882 883 class C: 884 def __init__(self): 885 self.a = 1 886 887 c = C() 888 _testcapi.clear_managed_dict(c) 889 self.assertEqual(c.__dict__, {}) 890 c = C() 891 self.assertEqual(c.__dict__, {'a':1}) 892 _testcapi.clear_managed_dict(c) 893 self.assertEqual(c.__dict__, {}) 894 895 def test_eval_get_func_name(self): 896 def function_example(): ... 897 898 class A: 899 def method_example(self): ... 900 901 self.assertEqual(_testcapi.eval_get_func_name(function_example), 902 "function_example") 903 self.assertEqual(_testcapi.eval_get_func_name(A.method_example), 904 "method_example") 905 self.assertEqual(_testcapi.eval_get_func_name(A().method_example), 906 "method_example") 907 self.assertEqual(_testcapi.eval_get_func_name(sum), "sum") # c function 908 self.assertEqual(_testcapi.eval_get_func_name(A), "type") 909 910 def test_eval_get_func_desc(self): 911 def function_example(): ... 912 913 class A: 914 def method_example(self): ... 
def test_function_set_defaults(self):
    # Check _testcapi.function_set_defaults() against both the C-level
    # getter and the function's __defaults__ attribute.
    def some(
        pos_only1, pos_only2='p',
        /,
        zero=0, optional=None,
        *,
        kw1,
        kw2=True,
    ):
        pass

    def check_defaults(expected):
        # The C getter and the Python attribute must always agree.
        self.assertEqual(_testcapi.function_get_defaults(some), expected)
        self.assertEqual(some.__defaults__, expected)

    initial = ('p', 0, None)
    check_defaults(initial)

    # Rejected calls must leave the defaults untouched.
    with self.assertRaises(SystemError):
        _testcapi.function_set_defaults(some, 1)  # not tuple or None
    check_defaults(initial)

    with self.assertRaises(SystemError):
        _testcapi.function_set_defaults(1, ())  # not a function
    check_defaults(initial)

    class tuplesub(tuple): ...  # tuple subclasses must work

    accepted = (
        ('q', 1, None),
        (),  # Empty tuple is fine
        tuplesub(((1, 2), ['a', 'b'], None)),
    )
    for replacement in accepted:
        _testcapi.function_set_defaults(some, replacement)
        check_defaults(replacement)

    # `None` is special, it sets `defaults` to `NULL`,
    # it needs special handling in `_testcapi`:
    _testcapi.function_set_defaults(some, None)
    check_defaults(None)
def test_get_type_name(self):
    # Compare the four type-name helpers exported by _testcapi with the
    # names expected for a builtin type, a stdlib type, a _testcapi heap
    # type and a locally defined class, before and after overriding
    # __module__, __name__ and __qualname__.
    class MyType:
        pass

    from _testcapi import (
        get_type_name, get_type_qualname,
        get_type_fullyqualname, get_type_module_name)

    from collections import OrderedDict

    def check_names(cls, fullname, modname, qualname, name):
        self.assertEqual(get_type_fullyqualname(cls), fullname)
        self.assertEqual(get_type_module_name(cls), modname)
        self.assertEqual(get_type_qualname(cls), qualname)
        self.assertEqual(get_type_name(cls), name)

    ht = _testcapi.get_heaptype_for_name()
    cases = [
        (int, 'int', 'builtins', 'int', 'int'),
        (OrderedDict,
         'collections.OrderedDict', 'collections',
         'OrderedDict', 'OrderedDict'),
        (ht,
         '_testcapi.HeapTypeNameType', '_testcapi',
         'HeapTypeNameType', 'HeapTypeNameType'),
        (MyType,
         f'{__name__}.CAPITest.test_get_type_name.<locals>.MyType',
         __name__,
         'CAPITest.test_get_type_name.<locals>.MyType',
         'MyType'),
    ]
    for case in cases:
        with self.subTest(cls=repr(case[0])):
            check_names(*case)

    # override __module__
    ht.__module__ = 'test_module'
    check_names(ht,
                'test_module.HeapTypeNameType', 'test_module',
                'HeapTypeNameType', 'HeapTypeNameType')

    # override __name__ and __qualname__
    MyType.__name__ = 'my_name'
    MyType.__qualname__ = 'my_qualname'
    check_names(MyType,
                f'{__name__}.my_qualname', __name__,
                'my_qualname', 'my_name')

    # override also __module__
    MyType.__module__ = 'my_module'
    check_names(MyType,
                'my_module.my_qualname', 'my_module',
                'my_qualname', 'my_name')

    # PyType_GetFullyQualifiedName() ignores the module if it's "builtins"
    # or "__main__", or if it is not a string
    for module in ('builtins', '__main__', 123):
        MyType.__module__ = module
        self.assertEqual(get_type_fullyqualname(MyType), 'my_qualname')
@requires_limited_api
class TestHeapTypeRelative(unittest.TestCase):
    """Test API for extending opaque types (PEP 697)"""

    # Extra sizes shared by the size/offset tests: a handful of small values
    # plus values around object.__basicsize__.
    _EXTRA_SIZES = sorted({0, 1, 2, 3, 4, 7, 8, 123,
                           object.__basicsize__,
                           object.__basicsize__-1,
                           object.__basicsize__+1})

    # The class-level @requires_limited_api already covers every test here,
    # so no per-method decorator is needed.
    def test_heaptype_relative_sizes(self):
        # Test subclassing using "relative" basicsize, see PEP 697
        def check(extra_base_size, extra_size):
            Base, Sub, _instance, _data_ptr, data_offset, data_size = (
                _testlimitedcapi.make_sized_heaptypes(
                    extra_base_size, -extra_size))

            # no alignment shenanigans when inheriting directly
            if extra_size == 0:
                self.assertEqual(Base.__basicsize__, Sub.__basicsize__)
                self.assertEqual(data_size, 0)

            else:
                # The following offsets should be in increasing order:
                offsets = [
                    (0, 'start of object'),
                    (Base.__basicsize__, 'end of base data'),
                    (data_offset, 'subclass data'),
                    (data_offset + extra_size, 'end of requested subcls data'),
                    (data_offset + data_size, 'end of reserved subcls data'),
                    (Sub.__basicsize__, 'end of object'),
                ]
                ordered_offsets = sorted(offsets, key=operator.itemgetter(0))
                self.assertEqual(
                    offsets, ordered_offsets,
                    msg=f'Offsets not in expected order, got: {ordered_offsets}')

                # end of reserved subcls data == end of object
                self.assertEqual(Sub.__basicsize__, data_offset + data_size)

                # we don't reserve (requested + alignment) or more data
                self.assertLess(data_size - extra_size,
                                _testlimitedcapi.ALIGNOF_MAX_ALIGN_T)

            # The offsets/sizes we calculated should be aligned.
            self.assertEqual(data_offset % _testlimitedcapi.ALIGNOF_MAX_ALIGN_T, 0)
            self.assertEqual(data_size % _testlimitedcapi.ALIGNOF_MAX_ALIGN_T, 0)

        sizes = self._EXTRA_SIZES
        for extra_base_size in sizes:
            for extra_size in sizes:
                args = dict(extra_base_size=extra_base_size,
                            extra_size=extra_size)
                with self.subTest(**args):
                    check(**args)

    def test_HeapCCollection(self):
        """Make sure HeapCCollection works properly by itself"""
        collection = _testcapi.HeapCCollection(1, 2, 3)
        self.assertEqual(list(collection), [1, 2, 3])

    def test_heaptype_inherit_itemsize(self):
        """Test HeapCCollection subclasses work properly"""
        for extra_size in self._EXTRA_SIZES:
            with self.subTest(extra_size=extra_size):
                Sub = _testlimitedcapi.subclass_var_heaptype(
                    _testcapi.HeapCCollection, -extra_size, 0, 0)
                collection = Sub(1, 2, 3)
                collection.set_data_to_3s()

                self.assertEqual(list(collection), [1, 2, 3])
                mem = collection.get_data()
                self.assertGreaterEqual(len(mem), extra_size)
                self.assertTrue(set(mem) <= {3}, f'got {mem!r}')

    def test_heaptype_invalid_inheritance(self):
        # assertRaises(..., msg=...) only customises the failure message; it
        # never inspects the exception text.  Use assertRaisesRegex so the
        # error message is actually verified.
        with self.assertRaisesRegex(
                SystemError,
                "Cannot extend variable-size class without "
                + "Py_TPFLAGS_ITEMS_AT_END"):
            _testlimitedcapi.subclass_heaptype(int, -8, 0)

    def test_heaptype_relative_members(self):
        """Test member access on heap types built with a relative basicsize"""
        sizes = self._EXTRA_SIZES
        make = _testlimitedcapi.make_heaptype_with_member
        for extra_base_size in sizes:
            for extra_size in sizes:
                for offset in sizes:
                    with self.subTest(extra_base_size=extra_base_size,
                                      extra_size=extra_size, offset=offset):
                        if offset < extra_size:
                            Sub = make(extra_base_size, -extra_size,
                                       offset, True)
                            Base = Sub.mro()[1]
                            instance = Sub()
                            # The attribute and the C accessors must agree
                            # before and after each kind of write.
                            self.assertEqual(instance.memb, instance.get_memb())
                            instance.set_memb(13)
                            self.assertEqual(instance.memb, instance.get_memb())
                            self.assertEqual(instance.get_memb(), 13)
                            instance.memb = 14
                            self.assertEqual(instance.memb, instance.get_memb())
                            self.assertEqual(instance.get_memb(), 14)
                            # The member must live in the subclass's own data.
                            self.assertGreaterEqual(instance.get_memb_offset(),
                                                    Base.__basicsize__)
                            self.assertLess(instance.get_memb_offset(),
                                            Sub.__basicsize__)
                            with self.assertRaises(SystemError):
                                instance.get_memb_relative()
                            with self.assertRaises(SystemError):
                                instance.set_memb_relative(0)
                        else:
                            # Offset outside the requested extra data.
                            with self.assertRaises(SystemError):
                                make(extra_base_size, -extra_size,
                                     offset, True)
                        # A non-negative basicsize is rejected as well.
                        with self.assertRaises(SystemError):
                            make(extra_base_size, extra_size, offset, True)
                with self.subTest(extra_base_size=extra_base_size,
                                  extra_size=extra_size):
                    # Negative member offsets are never valid.
                    with self.assertRaises(SystemError):
                        make(extra_base_size, -extra_size, -1, True)

    def test_heaptype_relative_members_errors(self):
        with self.assertRaisesRegex(
                SystemError,
                r"With Py_RELATIVE_OFFSET, basicsize must be negative"):
            _testlimitedcapi.make_heaptype_with_member(0, 1234, 0, True)
        with self.assertRaisesRegex(
                SystemError, r"Member offset out of range \(0\.\.-basicsize\)"):
            _testlimitedcapi.make_heaptype_with_member(0, -8, 1234, True)
        with self.assertRaisesRegex(
                SystemError, r"Member offset out of range \(0\.\.-basicsize\)"):
            _testlimitedcapi.make_heaptype_with_member(0, -8, -1, True)

        Sub = _testlimitedcapi.make_heaptype_with_member(0, -8, 0, True)
        instance = Sub()
        with self.assertRaisesRegex(
                SystemError, r"PyMember_GetOne used with Py_RELATIVE_OFFSET"):
            instance.get_memb_relative()
        with self.assertRaisesRegex(
                SystemError, r"PyMember_SetOne used with Py_RELATIVE_OFFSET"):
            instance.set_memb_relative(0)

    def test_pyobject_getitemdata_error(self):
        """Test PyObject_GetItemData fails on unsupported types"""
        with self.assertRaises(TypeError):
            # None is not variable-length
            _testcapi.pyobject_getitemdata(None)
        with self.assertRaises(TypeError):
            # int is variable-length, but doesn't have the
            # Py_TPFLAGS_ITEMS_AT_END layout (and flag)
            _testcapi.pyobject_getitemdata(0)

    def test_function_get_closure(self):
        from types import CellType

        def regular_function(): ...
        def unused_one_level(arg1):
            def inner(arg2, arg3): ...
            return inner
        def unused_two_levels(arg1, arg2):
            def decorator(arg3, arg4):
                def inner(arg5, arg6): ...
                return inner
            return decorator
        def with_one_level(arg1):
            def inner(arg2, arg3):
                return arg1 + arg2 + arg3
            return inner
        def with_two_levels(arg1, arg2):
            def decorator(arg3, arg4):
                def inner(arg5, arg6):
                    return arg1 + arg2 + arg3 + arg4 + arg5 + arg6
                return inner
            return decorator

        # Functions without closures:
        self.assertIsNone(_testcapi.function_get_closure(regular_function))
        self.assertIsNone(regular_function.__closure__)

        func = unused_one_level(1)
        closure = _testcapi.function_get_closure(func)
        self.assertIsNone(closure)
        self.assertIsNone(func.__closure__)

        func = unused_two_levels(1, 2)(3, 4)
        closure = _testcapi.function_get_closure(func)
        self.assertIsNone(closure)
        self.assertIsNone(func.__closure__)

        # Functions with closures:
        func = with_one_level(5)
        closure = _testcapi.function_get_closure(func)
        self.assertEqual(closure, func.__closure__)
        self.assertIsInstance(closure, tuple)
        self.assertEqual(len(closure), 1)
        self.assertEqual(len(closure), len(func.__code__.co_freevars))
        self.assertTrue(all(isinstance(cell, CellType) for cell in closure))
        # assertEqual, not assertTrue: with assertTrue the `5` would be taken
        # as the failure message and the cell value would go unchecked.
        self.assertEqual(closure[0].cell_contents, 5)

        func = with_two_levels(1, 2)(3, 4)
        closure = _testcapi.function_get_closure(func)
        self.assertEqual(closure, func.__closure__)
        self.assertIsInstance(closure, tuple)
        self.assertEqual(len(closure), 4)
        self.assertEqual(len(closure), len(func.__code__.co_freevars))
        self.assertTrue(all(isinstance(cell, CellType) for cell in closure))
        self.assertEqual([cell.cell_contents for cell in closure],
                         [1, 2, 3, 4])

    def test_function_get_closure_error(self):
        with self.assertRaises(SystemError):
            _testcapi.function_get_closure(1)
        with self.assertRaises(SystemError):
            _testcapi.function_get_closure(None)

    def test_function_set_closure(self):
        from types import CellType

        def function_without_closure(): ...
        def function_with_closure(arg):
            def inner():
                return arg
            return inner

        func = function_without_closure
        _testcapi.function_set_closure(func, (CellType(1), CellType(1)))
        closure = _testcapi.function_get_closure(func)
        self.assertEqual([c.cell_contents for c in closure], [1, 1])
        self.assertEqual([c.cell_contents for c in func.__closure__], [1, 1])

        func = function_with_closure(1)
        _testcapi.function_set_closure(func,
                                       (CellType(1), CellType(2), CellType(3)))
        closure = _testcapi.function_get_closure(func)
        self.assertEqual([c.cell_contents for c in closure], [1, 2, 3])
        self.assertEqual([c.cell_contents for c in func.__closure__], [1, 2, 3])

    def test_function_set_closure_none(self):
        def function_without_closure(): ...
        def function_with_closure(arg):
            def inner():
                return arg
            return inner

        _testcapi.function_set_closure(function_without_closure, None)
        self.assertIsNone(
            _testcapi.function_get_closure(function_without_closure))
        self.assertIsNone(function_without_closure.__closure__)

        _testcapi.function_set_closure(function_with_closure, None)
        self.assertIsNone(
            _testcapi.function_get_closure(function_with_closure))
        self.assertIsNone(function_with_closure.__closure__)

        # `function_with_closure` is the outer factory and has no closure of
        # its own, so also clear the closure of a function that really has
        # one (the function is never called afterwards).
        func = function_with_closure(1)
        self.assertIsNotNone(func.__closure__)
        _testcapi.function_set_closure(func, None)
        self.assertIsNone(_testcapi.function_get_closure(func))
        self.assertIsNone(func.__closure__)

    def test_function_set_closure_errors(self):
        def function_without_closure(): ...

        with self.assertRaises(SystemError):
            _testcapi.function_set_closure(None, ())  # not a function

        with self.assertRaises(SystemError):
            _testcapi.function_set_closure(function_without_closure, 1)
        self.assertIsNone(function_without_closure.__closure__)  # no change

        # NOTE: this works, but goes against the docs:
        _testcapi.function_set_closure(function_without_closure, (1, 2))
        self.assertEqual(
            _testcapi.function_get_closure(function_without_closure), (1, 2))
        self.assertEqual(function_without_closure.__closure__, (1, 2))
def pendingcalls_wait(self, l, numadded, context=None):
    # Spin until `l` holds exactly `numadded` entries, failing the test if
    # that takes too many iterations.
    #
    # l:        list the pending-call callbacks append to.
    # numadded: number of entries to wait for.
    # context:  optional object with an `event` attribute; while that event
    #           is not set, iterations do not count towards the timeout.
    count = 0
    while len(l) != numadded:
        # This busy loop is where we expect to be interrupted to
        # run our callbacks.  Note that some callbacks are only run on the
        # main thread.
        for i in range(1000):
            a = i*i
        if context and not context.event.is_set():
            # The submitting threads have not all finished yet; keep
            # spinning without consuming the iteration budget.
            continue
        count += 1
        self.assertLess(
            count, 10000,
            "timeout waiting for %i callbacks, got %i" % (numadded, len(l)))
def test_max_pending(self):
    # For each pending-calls flavour, submit 1, the maximum and one more
    # than the maximum number of callbacks, and check how many were
    # actually added.
    flavours = (
        ('main-only', True, 32),
        # Per-interpreter pending calls has a much higher limit
        # on how many may be pending at a time.
        ('not main-only', False, 300),
    )
    for label, main, maxpending in flavours:
        with self.subTest(label):
            attempts = (
                (1, 1),
                (maxpending, maxpending),
                (maxpending+1, maxpending),
            )
            for requested, expected in attempts:
                l = []
                added = self.pendingcalls_submit(l, requested, main=main)
                # Drain the callbacks before checking the count.
                self.pendingcalls_wait(l, added)
                self.assertEqual(added, expected)
@threading_helper.requires_working_threading()
def test_many_subthreads_can_handle_pending_calls(self):
    # Queue 20 PendingTask objects, have 3 "adder" threads schedule each
    # of them as a pending call, and have 3 "worker" threads spin so the
    # calls can run there.  Afterwards, check which threads requested and
    # ran each task.
    main_tid = threading.get_ident()
    self.assertEqual(threading.main_thread().ident, main_tid)

    # We can't use queue.Queue since it isn't reentrant relative
    # to pending calls.
    # Hand-rolled replacement: _queue holds the tasks still to be
    # scheduled, _active holds one marker per task not yet finished, and
    # _done_lock is held until the last task reports completion.
    _queue = deque()
    _active = deque()
    _done_lock = threading.Lock()
    def queue_put(task):
        _queue.append(task)
        _active.append(True)
    def queue_get():
        try:
            task = _queue.popleft()
        except IndexError:
            raise queue.Empty
        return task
    def queue_task_done():
        # Passed to each task as notify_done (see the loop below).
        _active.pop()
        if not _active:
            try:
                _done_lock.release()
            except RuntimeError:
                # Releasing failed; the lock must already be unlocked.
                assert not _done_lock.locked()
    def queue_empty():
        return not _queue
    def queue_join():
        # Blocks until queue_task_done() has released _done_lock.
        _done_lock.acquire()
        _done_lock.release()

    tasks = []
    for i in range(20):
        task = self.PendingTask(
            req=f'request {i}',
            taskid=i,
            notify_done=queue_task_done,
        )
        tasks.append(task)
        queue_put(task)
    # This will be released once all the tasks have finished.
    _done_lock.acquire()

    def add_tasks(worker_tids):
        # Body of the "adder" threads: take tasks off the queue and
        # schedule each one as a pending call until the queue is empty
        # or `done` is set.
        while True:
            if done:
                return
            try:
                task = queue_get()
            except queue.Empty:
                break
            task.run_in_pending_call(worker_tids)

    # `done` is only read by the nested functions; it is assigned in this
    # (enclosing) scope below.
    done = False
    def run_tasks():
        # Body of the "worker" threads: sleep-loop until the queue has
        # been emptied, then until `done` is set.
        while not queue_empty():
            if done:
                return
            time.sleep(0.01)
        # Give the worker a chance to handle any remaining pending calls.
        while not done:
            time.sleep(0.01)

    # Start the workers and wait for them to finish.
    worker_threads = [threading.Thread(target=run_tasks)
                      for _ in range(3)]
    with threading_helper.start_threads(worker_threads):
        try:
            # Add a pending call for each task.
            worker_tids = [t.ident for t in worker_threads]
            threads = [threading.Thread(target=add_tasks, args=(worker_tids,))
                       for _ in range(3)]
            with threading_helper.start_threads(threads):
                # NOTE(review): the try body is only `pass`, so this
                # handler looks effectively unreachable -- confirm
                # whether it can be dropped.
                try:
                    pass
                except BaseException:
                    done = True
                    raise  # re-raise
            # Wait for the pending calls to finish.
            queue_join()
            # Notify the workers that they can stop.
            done = True
        except BaseException:
            # Make sure the worker loops terminate on any failure.
            done = True
            raise  # re-raise
    runner_tids = [t.runner_tid for t in tasks]

    # No task may have run on the main thread, and no thread that
    # requested a task may also have run one.
    self.assertNotIn(main_tid, runner_tids)
    for task in tasks:
        with self.subTest(f'task {task.id}'):
            self.assertNotEqual(task.requester_tid, main_tid)
            self.assertNotEqual(task.requester_tid, task.runner_tid)
            self.assertNotIn(task.requester_tid, runner_tids)
1815 while not done: 1816 time.sleep(0.01) 1817 if time.time() > {timeout}: 1818 raise Exception('timed out!') 1819 """) 1820 t = threading.Thread(target=do_work) 1821 with threading_helper.start_threads([t]): 1822 os.read(r_ready, 1) 1823 # Add the pending call and wait for it to finish. 1824 actual = _testinternalcapi.pending_identify(interpid) 1825 # Signal the subinterpreter to stop. 1826 os.write(w_done, b'\0') 1827 1828 self.assertEqual(actual, int(interpid)) 1829 1830 with self.subTest('add in main, run in subinterpreter sub-thread'): 1831 r_ready, w_ready = create_pipe() 1832 r_done, w_done= create_pipe() 1833 timeout = time.time() + 30 # seconds 1834 1835 def do_work(): 1836 _interpreters.run_string(interpid, f"""if True: 1837 waiting = False 1838 done = False 1839 def subthread(): 1840 while not waiting: 1841 pass 1842 os.write({w_ready}, b'\\0') 1843 # Loop to trigger the eval breaker. 1844 while not done: 1845 time.sleep(0.01) 1846 if time.time() > {timeout}: 1847 raise Exception('timed out!') 1848 t = threading.Thread(target=subthread) 1849 with threading_helper.start_threads([t]): 1850 # Wait until this interp has handled the pending call. 1851 waiting = True 1852 os.read({r_done}, 1) 1853 done = True 1854 """) 1855 t = threading.Thread(target=do_work) 1856 with threading_helper.start_threads([t]): 1857 os.read(r_ready, 1) 1858 # Add the pending call and wait for it to finish. 1859 actual = _testinternalcapi.pending_identify(interpid) 1860 # Signal the subinterpreter to stop. 1861 os.write(w_done, b'\0') 1862 1863 self.assertEqual(actual, int(interpid)) 1864 1865 with self.subTest('add in subinterpreter, run in main'): 1866 r_ready, w_ready = create_pipe() 1867 r_done, w_done= create_pipe() 1868 r_data, w_data= create_pipe() 1869 timeout = time.time() + 30 # seconds 1870 1871 def add_job(): 1872 os.read(r_ready, 1) 1873 _interpreters.run_string(interpid, f"""if True: 1874 # Add the pending call and wait for it to finish. 
1875 actual = _testinternalcapi.pending_identify({main_interpid}) 1876 # Signal the subinterpreter to stop. 1877 os.write({w_done}, b'\\0') 1878 os.write({w_data}, actual.to_bytes(1, 'little')) 1879 """) 1880 # Wait until this interp has handled the pending call. 1881 waiting = False 1882 done = False 1883 def wait(os_read=os.read): 1884 nonlocal done, waiting 1885 waiting = True 1886 os_read(r_done, 1) 1887 done = True 1888 t1 = threading.Thread(target=add_job) 1889 t2 = threading.Thread(target=wait) 1890 with threading_helper.start_threads([t1, t2]): 1891 while not waiting: 1892 pass 1893 os.write(w_ready, b'\0') 1894 # Loop to trigger the eval breaker. 1895 while not done: 1896 time.sleep(0.01) 1897 if time.time() > timeout: 1898 raise Exception('timed out!') 1899 text = os.read(r_data, 1) 1900 actual = int.from_bytes(text, 'little') 1901 1902 self.assertEqual(actual, int(main_interpid)) 1903 1904 with self.subTest('add in subinterpreter, run in sub-thread'): 1905 r_ready, w_ready = create_pipe() 1906 r_done, w_done= create_pipe() 1907 r_data, w_data= create_pipe() 1908 timeout = time.time() + 30 # seconds 1909 1910 def add_job(): 1911 os.read(r_ready, 1) 1912 _interpreters.run_string(interpid, f"""if True: 1913 # Add the pending call and wait for it to finish. 1914 actual = _testinternalcapi.pending_identify({main_interpid}) 1915 # Signal the subinterpreter to stop. 1916 os.write({w_done}, b'\\0') 1917 os.write({w_data}, actual.to_bytes(1, 'little')) 1918 """) 1919 # Wait until this interp has handled the pending call. 1920 waiting = False 1921 done = False 1922 def wait(os_read=os.read): 1923 nonlocal done, waiting 1924 waiting = True 1925 os_read(r_done, 1) 1926 done = True 1927 def subthread(): 1928 while not waiting: 1929 pass 1930 os.write(w_ready, b'\0') 1931 # Loop to trigger the eval breaker. 
1932 while not done: 1933 time.sleep(0.01) 1934 if time.time() > timeout: 1935 raise Exception('timed out!') 1936 t1 = threading.Thread(target=add_job) 1937 t2 = threading.Thread(target=wait) 1938 t3 = threading.Thread(target=subthread) 1939 with threading_helper.start_threads([t1, t2, t3]): 1940 pass 1941 text = os.read(r_data, 1) 1942 actual = int.from_bytes(text, 'little') 1943 1944 self.assertEqual(actual, int(main_interpid)) 1945 1946 # XXX We can't use the rest until gh-105716 is fixed. 1947 return 1948 1949 with self.subTest('add in subinterpreter, run in subinterpreter sub-thread'): 1950 r_ready, w_ready = create_pipe() 1951 r_done, w_done= create_pipe() 1952 r_data, w_data= create_pipe() 1953 timeout = time.time() + 30 # seconds 1954 1955 def do_work(): 1956 _interpreters.run_string(interpid, f"""if True: 1957 waiting = False 1958 done = False 1959 def subthread(): 1960 while not waiting: 1961 pass 1962 os.write({w_ready}, b'\\0') 1963 # Loop to trigger the eval breaker. 1964 while not done: 1965 time.sleep(0.01) 1966 if time.time() > {timeout}: 1967 raise Exception('timed out!') 1968 t = threading.Thread(target=subthread) 1969 with threading_helper.start_threads([t]): 1970 # Wait until this interp has handled the pending call. 1971 waiting = True 1972 os.read({r_done}, 1) 1973 done = True 1974 """) 1975 t = threading.Thread(target=do_work) 1976 #with threading_helper.start_threads([t]): 1977 t.start() 1978 if True: 1979 os.read(r_ready, 1) 1980 _interpreters.run_string(interpid, f"""if True: 1981 # Add the pending call and wait for it to finish. 1982 actual = _testinternalcapi.pending_identify({interpid}) 1983 # Signal the subinterpreter to stop. 
class SubinterpreterTest(unittest.TestCase):
    # Tests that run scripts in subinterpreters (through the
    # test.support.run_in_subinterp* helpers) and check how much state
    # is, or is not, shared with the interpreter running the tests.

    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_subinterps(self):
        # The subinterpreter pickles the ids of its own sys.modules and
        # builtins into the pipe; they must differ from ours.
        import builtins
        r, w = os.pipe()
        code = """if 1:
            import sys, builtins, pickle
            with open({:d}, "wb") as f:
                pickle.dump(id(sys.modules), f)
                pickle.dump(id(builtins), f)
            """.format(w)
        with open(r, "rb") as f:
            ret = support.run_in_subinterp(code)
            self.assertEqual(ret, 0)
            self.assertNotEqual(pickle.load(f), id(sys.modules))
            self.assertNotEqual(pickle.load(f), id(builtins))

    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_subinterps_recent_language_features(self):
        # The script uses syntax added in several Python versions; the
        # values it computes come back through the pipe.
        r, w = os.pipe()
        code = """if 1:
            import pickle
            with open({:d}, "wb") as f:

                @(lambda x:x)  # Py 3.9
                def noop(x): return x

                a = (b := f'1{{2}}3') + noop('x')  # Py 3.8 (:=) / 3.6 (f'')

                async def foo(arg): return await arg  # Py 3.5

                pickle.dump(dict(a=a, b=b), f)
            """.format(w)

        with open(r, "rb") as f:
            ret = support.run_in_subinterp(code)
            self.assertEqual(ret, 0)
            self.assertEqual(pickle.load(f), {'a': '123x', 'b': '123'})

    def test_py_config_isoloated_per_interpreter(self):
        # A config change in one interpreter must not leak out to others.
        #
        # This test could verify ANY config value, it just happens to have been
        # written around the time of int_max_str_digits. Refactoring is okay.
        code = """if 1:
        import sys, _testinternalcapi

        # Any config value would do, this happens to be the one being
        # double checked at the time this test was written.
        config = _testinternalcapi.get_config()
        config['int_max_str_digits'] = 55555
        config['parse_argv'] = 0
        _testinternalcapi.set_config(config)
        sub_value = _testinternalcapi.get_config()['int_max_str_digits']
        assert sub_value == 55555, sub_value
        """
        before_config = _testinternalcapi.get_config()
        # Precondition: the sentinel value must not already be configured.
        # A real unittest assertion is used so that the check still runs
        # when Python is started with -O (which strips `assert`).
        self.assertNotEqual(before_config['int_max_str_digits'], 55555)
        self.assertEqual(support.run_in_subinterp(code), 0,
                         'subinterp code failure, check stderr.')
        after_config = _testinternalcapi.get_config()
        self.assertIsNot(
                before_config, after_config,
                "Expected get_config() to return a new dict on each call")
        self.assertEqual(before_config, after_config,
                         "CAUTION: Tests executed after this may be "
                         "running under an altered config.")
        # try:...finally: calling set_config(before_config) not done
        # as that results in sys.argv, sys.path, and sys.warnoptions
        # "being modified by test_capi" per test.regrtest.  So if this
        # test fails, assume that the environment in this process may
        # be altered and suspect.

    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_configured_settings(self):
        """
        The config with which an interpreter is created corresponds
        1-to-1 with the new interpreter's settings.  This test verifies
        that they match.
        """

        # Feature-flag bits expected in the reported 'feature_flags'.
        OBMALLOC = 1<<5
        EXTENSIONS = 1<<8
        THREADS = 1<<10
        DAEMON_THREADS = 1<<11
        FORK = 1<<15
        EXEC = 1<<16
        ALL_FLAGS = (OBMALLOC | FORK | EXEC | THREADS | DAEMON_THREADS
                     | EXTENSIONS)

        features = [
            'obmalloc',
            'fork',
            'exec',
            'threads',
            'daemon_threads',
            'extensions',
            'own_gil',
        ]
        # Keyword names in the same order as the tuples used as keys below.
        kwlist = [f'allow_{n}' for n in features]
        kwlist[0] = 'use_main_obmalloc'
        kwlist[-2] = 'check_multi_interp_extensions'
        kwlist[-1] = 'own_gil'

        # config tuple -> (expected feature flags, expected own_gil)
        expected_to_work = {
            (True, True, True, True, True, True, True):
                (ALL_FLAGS, True),
            (True, False, False, False, False, False, False):
                (OBMALLOC, False),
            (False, False, False, True, False, True, False):
                (THREADS | EXTENSIONS, False),
        }

        expected_to_fail = {
            (False, False, False, False, False, False, False),
        }

        # gh-117649: The free-threaded build does not currently allow
        # setting check_multi_interp_extensions to False.
        if Py_GIL_DISABLED:
            # Iterate over a snapshot because entries are deleted below.
            for config in list(expected_to_work):
                kwargs = dict(zip(kwlist, config))
                if not kwargs['check_multi_interp_extensions']:
                    del expected_to_work[config]
                    expected_to_fail.add(config)

        # expected to work
        for config, expected in expected_to_work.items():
            kwargs = dict(zip(kwlist, config))
            exp_flags, exp_gil = expected
            expected = {
                'feature_flags': exp_flags,
                'own_gil': exp_gil,
            }
            with self.subTest(config):
                r, w = os.pipe()
                script = textwrap.dedent(f'''
                    import _testinternalcapi, json, os
                    settings = _testinternalcapi.get_interp_settings()
                    with os.fdopen({w}, "w") as stdin:
                        json.dump(settings, stdin)
                    ''')
                with os.fdopen(r) as stdout:
                    ret = support.run_in_subinterp_with_config(script, **kwargs)
                    self.assertEqual(ret, 0)
                    out = stdout.read()
                settings = json.loads(out)

                self.assertEqual(settings, expected)

        # expected to fail
        for config in expected_to_fail:
            kwargs = dict(zip(kwlist, config))
            with self.subTest(config):
                # Nothing is interpolated here, so a plain string suffices.
                script = textwrap.dedent('''
                    import _testinternalcapi
                    _testinternalcapi.get_interp_settings()
                    raise NotImplementedError('unreachable')
                    ''')
                with self.assertRaises(_interpreters.InterpreterError):
                    support.run_in_subinterp_with_config(script, **kwargs)

    @unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    # gh-117649: The free-threaded build does not currently allow overriding
    # the check_multi_interp_extensions setting.
    @expected_failure_if_gil_disabled()
    def test_overridden_setting_extensions_subinterp_check(self):
        """
        PyInterpreterConfig.check_multi_interp_extensions can be overridden
        with PyInterpreterState.override_multi_interp_extensions_check.
        This verifies that the override works but does not modify
        the underlying setting.
        """

        OBMALLOC = 1<<5
        EXTENSIONS = 1<<8
        THREADS = 1<<10
        DAEMON_THREADS = 1<<11
        FORK = 1<<15
        EXEC = 1<<16
        BASE_FLAGS = OBMALLOC | FORK | EXEC | THREADS | DAEMON_THREADS
        base_kwargs = {
            'use_main_obmalloc': True,
            'allow_fork': True,
            'allow_exec': True,
            'allow_threads': True,
            'allow_daemon_threads': True,
            'own_gil': False,
        }

        def check(enabled, override):
            # Run the single-phase check in a subinterpreter created with
            # check_multi_interp_extensions=enabled and compare the JSON
            # it writes to the pipe with the expected results.
            kwargs = dict(
                base_kwargs,
                check_multi_interp_extensions=enabled,
            )
            flags = BASE_FLAGS | EXTENSIONS if enabled else BASE_FLAGS
            settings = {
                'feature_flags': flags,
                'own_gil': False,
            }

            expected = {
                'requested': override,
                'override__initial': 0,
                'override_after': override,
                'override_restored': 0,
                # The override should not affect the config or settings.
                'settings__initial': settings,
                'settings_after': settings,
                'settings_restored': settings,
                # These are the most likely values to be wrong.
                'allowed__initial': not enabled,
                'allowed_after': not ((override > 0) if override else enabled),
                'allowed_restored': not enabled,
            }

            r, w = os.pipe()
            if Py_GIL_DISABLED:
                # gh-117649: The test fails before `w` is closed
                self.addCleanup(os.close, w)
            script = textwrap.dedent(f'''
                from test.test_capi.check_config import run_singlephase_check
                run_singlephase_check({override}, {w})
                ''')
            with os.fdopen(r) as stdout:
                ret = support.run_in_subinterp_with_config(script, **kwargs)
                self.assertEqual(ret, 0)
                out = stdout.read()
            results = json.loads(out)

            self.assertEqual(results, expected)

        self.maxDiff = None

        # setting: check disabled
        with self.subTest('config: check disabled; override: disabled'):
            check(False, -1)
        with self.subTest('config: check disabled; override: use config'):
            check(False, 0)
        with self.subTest('config: check disabled; override: enabled'):
            check(False, 1)

        # setting: check enabled
        with self.subTest('config: check enabled; override: disabled'):
            check(True, -1)
        with self.subTest('config: check enabled; override: use config'):
            check(True, 0)
        with self.subTest('config: check enabled; override: enabled'):
            check(True, 1)

    def test_mutate_exception(self):
        """
        Exceptions saved in global module state get shared between
        individual module instances. This test checks whether or not
        a change in one interpreter's module gets reflected into the
        other ones.
        """
        import binascii

        support.run_in_subinterp("import binascii; binascii.Error.foobar = 'foobar'")

        self.assertFalse(hasattr(binascii.Error, "foobar"))

    @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module")
    # gh-117649: The free-threaded build does not currently support sharing
    # extension module state between interpreters.
    @expected_failure_if_gil_disabled()
    def test_module_state_shared_in_global(self):
        """
        bpo-44050: Extension module state should be shared between interpreters
        when it doesn't support sub-interpreters.
        """
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)

        # Apple extensions must be distributed as frameworks. This requires
        # a specialist loader.
        if support.is_apple_mobile:
            loader = "AppleFrameworkLoader"
        else:
            loader = "ExtensionFileLoader"

        script = textwrap.dedent(f"""
            import importlib.machinery
            import importlib.util
            import os

            fullname = '_test_module_state_shared'
            origin = importlib.util.find_spec('_testmultiphase').origin
            loader = importlib.machinery.{loader}(fullname, origin)
            spec = importlib.util.spec_from_loader(fullname, loader)
            module = importlib.util.module_from_spec(spec)
            attr_id = str(id(module.Error)).encode()

            os.write({w}, attr_id)
            """)
        # Run the same script here first, then in a subinterpreter, and
        # compare the ids each run wrote to the pipe.
        exec(script)
        main_attr_id = os.read(r, 100)

        ret = support.run_in_subinterp(script)
        self.assertEqual(ret, 0)
        subinterp_attr_id = os.read(r, 100)
        self.assertEqual(main_attr_id, subinterp_attr_id)
@requires_subinterpreters
class InterpreterConfigTests(unittest.TestCase):
    # Tests for the interpreter config namespaces produced by
    # _interpreters.new_config() / get_config() and for creating
    # interpreters from them.

    # The field values expected for each predefined config name.
    supported = {
        'isolated': types.SimpleNamespace(
            use_main_obmalloc=False,
            allow_fork=False,
            allow_exec=False,
            allow_threads=True,
            allow_daemon_threads=False,
            check_multi_interp_extensions=True,
            gil='own',
        ),
        'legacy': types.SimpleNamespace(
            use_main_obmalloc=True,
            allow_fork=True,
            allow_exec=True,
            allow_threads=True,
            allow_daemon_threads=True,
            check_multi_interp_extensions=bool(Py_GIL_DISABLED),
            gil='shared',
        ),
        'empty': types.SimpleNamespace(
            use_main_obmalloc=False,
            allow_fork=False,
            allow_exec=False,
            allow_threads=False,
            allow_daemon_threads=False,
            check_multi_interp_extensions=False,
            gil='default',
        ),
    }
    gil_supported = ['default', 'shared', 'own']

    def iter_all_configs(self):
        """Yield a SimpleNamespace for every combination of config values.

        The six boolean fields each take (True, False) and ``gil`` takes
        ('shared', 'own', 'default'); the first field varies slowest and
        ``gil`` varies fastest.
        """
        import itertools

        bools = (True, False)
        fields = (
            'use_main_obmalloc',
            'allow_fork',
            'allow_exec',
            'allow_threads',
            'allow_daemon_threads',
            'check_multi_interp_extensions',
            'gil',
        )
        # product() advances the right-most iterable fastest, which is
        # the order a nest of loops over the same values would produce.
        for values in itertools.product(
            bools, bools, bools, bools, bools, bools,
            ('shared', 'own', 'default'),
        ):
            yield types.SimpleNamespace(**dict(zip(fields, values)))

    def assert_ns_equal(self, ns1, ns2, msg=None):
        """Fail with a field-by-field diff unless *ns1* equals *ns2*.

        The two objects must also be of the same type.
        """
        # This is mostly copied from TestCase.assertDictEqual.
        self.assertEqual(type(ns1), type(ns2))
        if ns1 == ns2:
            return

        import difflib
        import pprint
        from unittest.util import _common_shorten_repr
        standardMsg = '%s != %s' % _common_shorten_repr(ns1, ns2)
        diff = ('\n' + '\n'.join(difflib.ndiff(
                       pprint.pformat(vars(ns1)).splitlines(),
                       pprint.pformat(vars(ns2)).splitlines())))
        diff = f'namespace({diff})'
        standardMsg = self._truncateMessage(standardMsg, diff)
        self.fail(self._formatMessage(msg, standardMsg))

    def test_predefined_config(self):
        def check(name, expected):
            # new_config(name) must return a fresh namespace, equal to
            # the expected values, on every call.
            expected = self.supported[expected]
            args = (name,) if name else ()

            config1 = _interpreters.new_config(*args)
            self.assert_ns_equal(config1, expected)
            self.assertIsNot(config1, expected)

            config2 = _interpreters.new_config(*args)
            self.assert_ns_equal(config2, expected)
            self.assertIsNot(config2, expected)
            self.assertIsNot(config2, config1)

        with self.subTest('default'):
            check(None, 'isolated')

        for name in self.supported:
            with self.subTest(name):
                check(name, name)

    def test_update_from_dict(self):
        for name, vanilla in self.supported.items():
            with self.subTest(f'noop ({name})'):
                expected = vanilla
                overrides = vars(vanilla)
                config = _interpreters.new_config(name, **overrides)
                self.assert_ns_equal(config, expected)

            with self.subTest(f'change all ({name})'):
                # Flip every bool field; 'gil' is replaced in the loop.
                overrides = {k: not v for k, v in vars(vanilla).items()}
                for gil in self.gil_supported:
                    if vanilla.gil == gil:
                        continue
                    overrides['gil'] = gil
                    expected = types.SimpleNamespace(**overrides)
                    config = _interpreters.new_config(
                                                        name, **overrides)
                    self.assert_ns_equal(config, expected)

            # Override individual fields.
            for field, old in vars(vanilla).items():
                if field == 'gil':
                    values = [v for v in self.gil_supported if v != old]
                else:
                    values = [not old]
                for val in values:
                    with self.subTest(f'{name}.{field} ({old!r} -> {val!r})'):
                        overrides = {field: val}
                        expected = types.SimpleNamespace(
                            **dict(vars(vanilla), **overrides),
                        )
                        config = _interpreters.new_config(
                                                            name, **overrides)
                        self.assert_ns_equal(config, expected)

        with self.subTest('unsupported field'):
            for name in self.supported:
                with self.assertRaises(ValueError):
                    _interpreters.new_config(name, spam=True)

        # Bad values for bool fields.
        for field, value in vars(self.supported['empty']).items():
            if field == 'gil':
                continue
            # Sanity check of the table above; a unittest assertion is
            # used so it is not stripped when running with -O.
            self.assertIsInstance(value, bool)
            for bad in [1, '', 'spam', 1.0, None, object()]:
                with self.subTest(f'unsupported value ({field}={bad!r})'):
                    with self.assertRaises(TypeError):
                        _interpreters.new_config(**{field: bad})

        # Bad values for .gil.
        for value in [True, 1, 1.0, None, object()]:
            with self.subTest(f'unsupported value(gil={value!r})'):
                with self.assertRaises(TypeError):
                    _interpreters.new_config(gil=value)
        for value in ['', 'spam']:
            with self.subTest(f'unsupported value (gil={value!r})'):
                with self.assertRaises(ValueError):
                    _interpreters.new_config(gil=value)

    def test_interp_init(self):
        # Partial configs: a full config "matches" a case when it already
        # has every value the case lists.
        questionable = [
            # strange
            dict(
                allow_fork=True,
                allow_exec=False,
            ),
            dict(
                gil='shared',
                use_main_obmalloc=False,
            ),
            # risky
            dict(
                allow_fork=True,
                allow_threads=True,
            ),
            # ought to be invalid?
            dict(
                allow_threads=False,
                allow_daemon_threads=True,
            ),
            dict(
                gil='own',
                use_main_obmalloc=True,
            ),
        ]
        invalid = [
            dict(
                use_main_obmalloc=False,
                check_multi_interp_extensions=False
            ),
        ]
        if Py_GIL_DISABLED:
            invalid.append(dict(check_multi_interp_extensions=False))

        def match(config, override_cases):
            # True if applying any of the cases leaves config unchanged.
            ns = vars(config)
            for overrides in override_cases:
                if dict(ns, **overrides) == ns:
                    return True
            return False

        def check(config):
            script = 'pass'
            rc = _testinternalcapi.run_in_subinterp_with_config(script, config)
            self.assertEqual(rc, 0)

        for config in self.iter_all_configs():
            if config.gil == 'default':
                continue
            if match(config, invalid):
                with self.subTest(f'invalid: {config}'):
                    with self.assertRaises(_interpreters.InterpreterError):
                        check(config)
            elif match(config, questionable):
                with self.subTest(f'questionable: {config}'):
                    check(config)
            else:
                with self.subTest(f'valid: {config}'):
                    check(config)

    def test_get_config(self):
        @contextlib.contextmanager
        def new_interp(config):
            # Create an interpreter for the duration of the block and
            # destroy it afterwards, tolerating one that is already gone.
            interpid = _interpreters.create(config, reqrefs=False)
            try:
                yield interpid
            finally:
                try:
                    _interpreters.destroy(interpid)
                except _interpreters.InterpreterNotFoundError:
                    pass

        with self.subTest('main'):
            expected = _interpreters.new_config('legacy')
            expected.gil = 'own'
            if Py_GIL_DISABLED:
                expected.check_multi_interp_extensions = False
            interpid, *_ = _interpreters.get_main()
            config = _interpreters.get_config(interpid)
            self.assert_ns_equal(config, expected)

        with self.subTest('isolated'):
            expected = _interpreters.new_config('isolated')
            with new_interp('isolated') as interpid:
                config = _interpreters.get_config(interpid)
            self.assert_ns_equal(config, expected)

        with self.subTest('legacy'):
            expected = _interpreters.new_config('legacy')
            with new_interp('legacy') as interpid:
                config = _interpreters.get_config(interpid)
            self.assert_ns_equal(config, expected)

        with self.subTest('custom'):
            orig = _interpreters.new_config(
                'empty',
                use_main_obmalloc=True,
                gil='shared',
                check_multi_interp_extensions=bool(Py_GIL_DISABLED),
            )
            with new_interp(orig) as interpid:
                config = _interpreters.get_config(interpid)
            self.assert_ns_equal(config, orig)
@requires_subinterpreters
class InterpreterIDTests(unittest.TestCase):
    # Tests for interpreter ID normalization, interpreter lookup, and
    # the interaction between an interpreter's ID refcount and its
    # "linked" state.

    def add_interp_cleanup(self, interpid):
        # Make sure the interpreter is gone once the test finishes,
        # whether or not the test already destroyed it.
        def ensure_destroyed():
            with contextlib.suppress(_interpreters.InterpreterNotFoundError):
                _interpreters.destroy(interpid)
        self.addCleanup(ensure_destroyed)

    def new_interpreter(self):
        # Create an interpreter that is cleaned up automatically.
        interpid = _interpreters.create()
        self.add_interp_cleanup(interpid)
        return interpid

    def test_conversion_int(self):
        normalize = _testinternalcapi.normalize_interp_id
        self.assertEqual(normalize(10), 10)

    def test_conversion_coerced(self):
        normalize = _testinternalcapi.normalize_interp_id

        class MyInt(str):
            def __index__(self):
                return 10

        self.assertEqual(normalize(MyInt()), 10)

    def test_conversion_from_interpreter(self):
        normalize = _testinternalcapi.normalize_interp_id
        interpid = self.new_interpreter()
        self.assertEqual(normalize(interpid), interpid)

    def test_conversion_bad(self):
        normalize = _testinternalcapi.normalize_interp_id

        # Objects of the wrong type.
        wrong_type = [object(), 10.0, '10', b'10']
        for badid in wrong_type:
            with self.subTest(f'bad: {badid!r}'):
                with self.assertRaises(TypeError):
                    normalize(badid)

        # Integers outside the accepted range.
        badid = -1
        with self.subTest(f'bad: {badid!r}'):
            with self.assertRaises(ValueError):
                normalize(badid)

        badid = 2**64
        with self.subTest(f'bad: {badid!r}'):
            with self.assertRaises(OverflowError):
                normalize(badid)

    def test_lookup_exists(self):
        interpid = self.new_interpreter()
        found = _testinternalcapi.interpreter_exists(interpid)
        self.assertTrue(found)

    def test_lookup_does_not_exist(self):
        interpid = _testinternalcapi.unused_interpreter_id()
        found = _testinternalcapi.interpreter_exists(interpid)
        self.assertFalse(found)

    def test_lookup_destroyed(self):
        interpid = _interpreters.create()
        _interpreters.destroy(interpid)
        found = _testinternalcapi.interpreter_exists(interpid)
        self.assertFalse(found)

    def get_refcount_helpers(self):
        # Return (get_refcount, incref, decref); incref never links.
        def incref_unlinked(interpid):
            return _interpreters.incref(interpid, implieslink=False)

        return (
            _testinternalcapi.get_interpreter_refcount,
            incref_unlinked,
            _interpreters.decref,
        )

    def test_linked_lifecycle_does_not_exist(self):
        exists = _testinternalcapi.interpreter_exists
        get_refcount, incref, decref = self.get_refcount_helpers()
        # Every operation, in the order it is tried on a missing ID.
        operations = (
            _testinternalcapi.interpreter_refcount_linked,
            _testinternalcapi.link_interpreter_refcount,
            _testinternalcapi.unlink_interpreter_refcount,
            get_refcount,
            incref,
            decref,
        )

        def check_missing(interpid):
            self.assertFalse(exists(interpid))
            for operation in operations:
                with self.assertRaises(_interpreters.InterpreterNotFoundError):
                    operation(interpid)

        with self.subTest('never existed'):
            check_missing(_testinternalcapi.unused_interpreter_id())

        with self.subTest('destroyed'):
            interpid = _interpreters.create()
            _interpreters.destroy(interpid)
            check_missing(interpid)

    def test_linked_lifecycle_initial(self):
        is_linked = _testinternalcapi.interpreter_refcount_linked
        get_refcount = self.get_refcount_helpers()[0]

        # A new interpreter will start out not linked, with a refcount of 0.
        interpid = self.new_interpreter()
        linked = is_linked(interpid)
        refcount = get_refcount(interpid)

        self.assertFalse(linked)
        self.assertEqual(refcount, 0)

    def test_linked_lifecycle_never_linked(self):
        exists = _testinternalcapi.interpreter_exists
        is_linked = _testinternalcapi.interpreter_refcount_linked
        get_refcount, incref, decref = self.get_refcount_helpers()

        interpid = self.new_interpreter()

        # Incref will not automatically link it.
        incref(interpid)
        self.assertFalse(is_linked(interpid))
        self.assertEqual(1, get_refcount(interpid))

        # It isn't linked so it isn't destroyed.
        decref(interpid)
        self.assertTrue(exists(interpid))
        self.assertFalse(is_linked(interpid))
        self.assertEqual(0, get_refcount(interpid))

    def test_linked_lifecycle_link_unlink(self):
        exists = _testinternalcapi.interpreter_exists
        is_linked = _testinternalcapi.interpreter_refcount_linked
        link = _testinternalcapi.link_interpreter_refcount
        unlink = _testinternalcapi.unlink_interpreter_refcount

        interpid = self.new_interpreter()

        # Linking at refcount 0 does not destroy the interpreter.
        link(interpid)
        self.assertTrue(exists(interpid))
        self.assertTrue(is_linked(interpid))

        # Unlinking at refcount 0 does not destroy the interpreter.
        unlink(interpid)
        self.assertTrue(exists(interpid))
        self.assertFalse(is_linked(interpid))

    def test_linked_lifecycle_link_incref_decref(self):
        exists = _testinternalcapi.interpreter_exists
        is_linked = _testinternalcapi.interpreter_refcount_linked
        link = _testinternalcapi.link_interpreter_refcount
        get_refcount, incref, decref = self.get_refcount_helpers()

        interpid = self.new_interpreter()

        # Linking it will not change the refcount.
        link(interpid)
        self.assertTrue(is_linked(interpid))
        self.assertEqual(0, get_refcount(interpid))

        # Decref with a refcount of 0 is not allowed.
        incref(interpid)
        self.assertEqual(1, get_refcount(interpid))

        # When linked, decref back to 0 destroys the interpreter.
        decref(interpid)
        self.assertFalse(exists(interpid))

    def test_linked_lifecycle_incref_link(self):
        is_linked = _testinternalcapi.interpreter_refcount_linked
        link = _testinternalcapi.link_interpreter_refcount
        get_refcount, incref, _unused_decref = self.get_refcount_helpers()

        interpid = self.new_interpreter()

        incref(interpid)
        self.assertEqual(1, get_refcount(interpid))

        # Linking it will not reset the refcount.
        link(interpid)
        self.assertTrue(is_linked(interpid))
        self.assertEqual(1, get_refcount(interpid))

    def test_linked_lifecycle_link_incref_unlink_decref(self):
        exists = _testinternalcapi.interpreter_exists
        is_linked = _testinternalcapi.interpreter_refcount_linked
        link = _testinternalcapi.link_interpreter_refcount
        unlink = _testinternalcapi.unlink_interpreter_refcount
        get_refcount, incref, decref = self.get_refcount_helpers()

        interpid = self.new_interpreter()

        link(interpid)
        self.assertTrue(is_linked(interpid))

        incref(interpid)
        self.assertEqual(1, get_refcount(interpid))

        # Unlinking it will not change the refcount.
        unlink(interpid)
        self.assertFalse(is_linked(interpid))
        self.assertEqual(1, get_refcount(interpid))

        # Unlinked: decref back to 0 does not destroy the interpreter.
        decref(interpid)
        self.assertTrue(exists(interpid))
        self.assertEqual(0, get_refcount(interpid))
class BuiltinStaticTypesTests(unittest.TestCase):
    # Checks that type slots are populated for a sample of builtin types.

    TYPES = [
        object,
        type,
        int,
        str,
        dict,
        type(None),
        bool,
        BaseException,
        Exception,
        Warning,
        DeprecationWarning,  # Warning subclass
    ]

    def test_tp_bases_is_set(self):
        # PyTypeObject.tp_bases is documented as public API.
        # See https://github.com/python/cpython/issues/105020.
        for typeobj in self.TYPES:
            with self.subTest(typeobj):
                bases = _testcapi.type_get_tp_bases(typeobj)
                self.assertIsNotNone(bases)

    def test_tp_mro_is_set(self):
        # Same check as test_tp_bases_is_set, for PyTypeObject.tp_mro.
        # See https://github.com/python/cpython/issues/105020.
        for typeobj in self.TYPES:
            with self.subTest(typeobj):
                mro = _testcapi.type_get_tp_mro(typeobj)
                self.assertIsNotNone(mro)


class TestStaticTypes(unittest.TestCase):
    # Tests around a static type obtained from
    # _testcapi.get_basic_static_type().

    # Set by the first setUpClass() call; later runs are skipped.
    _has_run = False

    @classmethod
    def setUpClass(cls):
        # The tests here don't play nice with our approach to refleak
        # detection, so we bail out in that case.
        if cls._has_run:
            raise unittest.SkipTest('these tests do not support re-running')
        cls._has_run = True

    @contextlib.contextmanager
    def basic_static_type(self, *args):
        # Yield the type returned by _testcapi.get_basic_static_type(*args);
        # nothing is done when the block exits.
        cls = _testcapi.get_basic_static_type(*args)
        yield cls

    def test_pytype_ready_always_sets_tp_type(self):
        # The point of this test is to prevent something like
        # https://github.com/python/cpython/issues/104614
        # from happening again.

        # First check when tp_base/tp_bases is *not* set before PyType_Ready().
        with self.basic_static_type() as cls:
            self.assertIs(cls.__base__, object)
            self.assertEqual(cls.__bases__, (object,))
            self.assertIs(type(cls), type(object))

        # Then check when we *do* set tp_base/tp_bases first.
        with self.basic_static_type(object) as cls:
            self.assertIs(cls.__base__, object)
            self.assertEqual(cls.__bases__, (object,))
            self.assertIs(type(cls), type(object))
class TestThreadState(unittest.TestCase):
    # Thread-state / GILState tests driven through helpers in _testcapi.

    @threading_helper.reap_threads
    @threading_helper.requires_working_threading()
    def test_thread_state(self):
        # some extra thread-state tests driven via _testcapi
        def target():
            # Thread idents recorded by each invocation of callback().
            idents = []

            def callback():
                idents.append(threading.get_ident())

            _testcapi._test_thread_state(callback)
            # NOTE(review): a and b are never read; presumably they only
            # hold extra references to callback -- confirm before removing.
            a = b = callback
            # NOTE(review): presumably leaves time for pending callback
            # invocations to run before counting -- confirm.
            time.sleep(1)
            # Check our main thread is in the list exactly 3 times.
            self.assertEqual(idents.count(threading.get_ident()), 3,
                             "Couldn't find main thread correctly in the list")

        # Run the check once in the current thread and once in a new one.
        target()
        t = threading.Thread(target=target)
        t.start()
        t.join()

    @threading_helper.reap_threads
    @threading_helper.requires_working_threading()
    def test_thread_gilstate_in_clear(self):
        # See https://github.com/python/cpython/issues/119585
        class C:
            # The finalizer calls back into the GILState helper.
            def __del__(self):
                _testcapi.gilstate_ensure_release()

        # Thread-local variables are destroyed in `PyThreadState_Clear()`.
        local_var = threading.local()

        def callback():
            # Store a C instance in the thread-local of the calling thread.
            local_var.x = C()

        _testcapi._test_thread_state(callback)

    @threading_helper.reap_threads
    @threading_helper.requires_working_threading()
    def test_gilstate_ensure_no_deadlock(self):
        # See https://github.com/python/cpython/issues/96071
        # The script runs in a child process started with -X tracemalloc;
        # the test passes if it exits successfully and printed the marker.
        code = textwrap.dedent("""
            import _testcapi

            def callback():
                print('callback called')

            _testcapi._test_thread_state(callback)
            """)
        ret = assert_python_ok('-X', 'tracemalloc', '-c', code)
        self.assertIn(b'callback called', ret.out)

    def test_gilstate_matches_current(self):
        # The check itself is implemented in C by _testcapi.
        _testcapi.test_current_tstate_matches()
2913 local_var = threading.local() 2914 2915 def callback(): 2916 local_var.x = C() 2917 2918 _testcapi._test_thread_state(callback) 2919 2920 @threading_helper.reap_threads 2921 @threading_helper.requires_working_threading() 2922 def test_gilstate_ensure_no_deadlock(self): 2923 # See https://github.com/python/cpython/issues/96071 2924 code = textwrap.dedent(""" 2925 import _testcapi 2926 2927 def callback(): 2928 print('callback called') 2929 2930 _testcapi._test_thread_state(callback) 2931 """) 2932 ret = assert_python_ok('-X', 'tracemalloc', '-c', code) 2933 self.assertIn(b'callback called', ret.out) 2934 2935 def test_gilstate_matches_current(self): 2936 _testcapi.test_current_tstate_matches() 2937 2938 2939def get_test_funcs(mod, exclude_prefix=None): 2940 funcs = {} 2941 for name in dir(mod): 2942 if not name.startswith('test_'): 2943 continue 2944 if exclude_prefix is not None and name.startswith(exclude_prefix): 2945 continue 2946 funcs[name] = getattr(mod, name) 2947 return funcs 2948 2949 2950class Test_testcapi(unittest.TestCase): 2951 locals().update(get_test_funcs(_testcapi)) 2952 2953 # Suppress warning from PyUnicode_FromUnicode(). 
2954 @warnings_helper.ignore_warnings(category=DeprecationWarning) 2955 def test_widechar(self): 2956 _testlimitedcapi.test_widechar() 2957 2958 def test_version_api_data(self): 2959 self.assertEqual(_testcapi.Py_Version, sys.hexversion) 2960 2961 2962class Test_testlimitedcapi(unittest.TestCase): 2963 locals().update(get_test_funcs(_testlimitedcapi)) 2964 2965 2966class Test_testinternalcapi(unittest.TestCase): 2967 locals().update(get_test_funcs(_testinternalcapi, 2968 exclude_prefix='test_lock_')) 2969 2970 2971@threading_helper.requires_working_threading() 2972class Test_PyLock(unittest.TestCase): 2973 locals().update((name, getattr(_testinternalcapi, name)) 2974 for name in dir(_testinternalcapi) 2975 if name.startswith('test_lock_')) 2976 2977 2978@unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module") 2979class Test_ModuleStateAccess(unittest.TestCase): 2980 """Test access to module start (PEP 573)""" 2981 2982 # The C part of the tests lives in _testmultiphase, in a module called 2983 # _testmultiphase_meth_state_access. 2984 # This module has multi-phase initialization, unlike _testcapi. 2985 2986 def setUp(self): 2987 fullname = '_testmultiphase_meth_state_access' # XXX 2988 origin = importlib.util.find_spec('_testmultiphase').origin 2989 # Apple extensions must be distributed as frameworks. This requires 2990 # a specialist loader. 
2991 if support.is_apple_mobile: 2992 loader = importlib.machinery.AppleFrameworkLoader(fullname, origin) 2993 else: 2994 loader = importlib.machinery.ExtensionFileLoader(fullname, origin) 2995 spec = importlib.util.spec_from_loader(fullname, loader) 2996 module = importlib.util.module_from_spec(spec) 2997 loader.exec_module(module) 2998 self.module = module 2999 3000 def test_subclass_get_module(self): 3001 """PyType_GetModule for defining_class""" 3002 class StateAccessType_Subclass(self.module.StateAccessType): 3003 pass 3004 3005 instance = StateAccessType_Subclass() 3006 self.assertIs(instance.get_defining_module(), self.module) 3007 3008 def test_subclass_get_module_with_super(self): 3009 class StateAccessType_Subclass(self.module.StateAccessType): 3010 def get_defining_module(self): 3011 return super().get_defining_module() 3012 3013 instance = StateAccessType_Subclass() 3014 self.assertIs(instance.get_defining_module(), self.module) 3015 3016 def test_state_access(self): 3017 """Checks methods defined with and without argument clinic 3018 3019 This tests a no-arg method (get_count) and a method with 3020 both a positional and keyword argument. 
3021 """ 3022 3023 a = self.module.StateAccessType() 3024 b = self.module.StateAccessType() 3025 3026 methods = { 3027 'clinic': a.increment_count_clinic, 3028 'noclinic': a.increment_count_noclinic, 3029 } 3030 3031 for name, increment_count in methods.items(): 3032 with self.subTest(name): 3033 self.assertEqual(a.get_count(), b.get_count()) 3034 self.assertEqual(a.get_count(), 0) 3035 3036 increment_count() 3037 self.assertEqual(a.get_count(), b.get_count()) 3038 self.assertEqual(a.get_count(), 1) 3039 3040 increment_count(3) 3041 self.assertEqual(a.get_count(), b.get_count()) 3042 self.assertEqual(a.get_count(), 4) 3043 3044 increment_count(-2, twice=True) 3045 self.assertEqual(a.get_count(), b.get_count()) 3046 self.assertEqual(a.get_count(), 0) 3047 3048 with self.assertRaises(TypeError): 3049 increment_count(thrice=3) 3050 3051 with self.assertRaises(TypeError): 3052 increment_count(1, 2, 3) 3053 3054 def test_get_module_bad_def(self): 3055 # PyType_GetModuleByDef fails gracefully if it doesn't 3056 # find what it's looking for. 3057 # see bpo-46433 3058 instance = self.module.StateAccessType() 3059 with self.assertRaises(TypeError): 3060 instance.getmodulebydef_bad_def() 3061 3062 def test_get_module_static_in_mro(self): 3063 # Here, the class PyType_GetModuleByDef is looking for 3064 # appears in the MRO after a static type (Exception). 
3065 # see bpo-46433 3066 class Subclass(BaseException, self.module.StateAccessType): 3067 pass 3068 self.assertIs(Subclass().get_defining_module(), self.module) 3069 3070 3071class TestInternalFrameApi(unittest.TestCase): 3072 3073 @staticmethod 3074 def func(): 3075 return sys._getframe() 3076 3077 def test_code(self): 3078 frame = self.func() 3079 code = _testinternalcapi.iframe_getcode(frame) 3080 self.assertIs(code, self.func.__code__) 3081 3082 def test_lasti(self): 3083 frame = self.func() 3084 lasti = _testinternalcapi.iframe_getlasti(frame) 3085 self.assertGreater(lasti, 0) 3086 self.assertLess(lasti, len(self.func.__code__.co_code)) 3087 3088 def test_line(self): 3089 frame = self.func() 3090 line = _testinternalcapi.iframe_getline(frame) 3091 firstline = self.func.__code__.co_firstlineno 3092 self.assertEqual(line, firstline + 2) 3093 3094 3095SUFFICIENT_TO_DEOPT_AND_SPECIALIZE = 100 3096 3097class Test_Pep523API(unittest.TestCase): 3098 3099 def do_test(self, func, names): 3100 actual_calls = [] 3101 start = SUFFICIENT_TO_DEOPT_AND_SPECIALIZE 3102 count = start + SUFFICIENT_TO_DEOPT_AND_SPECIALIZE 3103 try: 3104 for i in range(count): 3105 if i == start: 3106 _testinternalcapi.set_eval_frame_record(actual_calls) 3107 func() 3108 finally: 3109 _testinternalcapi.set_eval_frame_default() 3110 expected_calls = names * SUFFICIENT_TO_DEOPT_AND_SPECIALIZE 3111 self.assertEqual(len(expected_calls), len(actual_calls)) 3112 for expected, actual in zip(expected_calls, actual_calls, strict=True): 3113 self.assertEqual(expected, actual) 3114 3115 def test_inlined_binary_subscr(self): 3116 class C: 3117 def __getitem__(self, other): 3118 return None 3119 def func(): 3120 C()[42] 3121 names = ["func", "__getitem__"] 3122 self.do_test(func, names) 3123 3124 def test_inlined_call(self): 3125 def inner(x=42): 3126 pass 3127 def func(): 3128 inner() 3129 inner(42) 3130 names = ["func", "inner", "inner"] 3131 self.do_test(func, names) 3132 3133 def 
test_inlined_call_function_ex(self): 3134 def inner(x): 3135 pass 3136 def func(): 3137 inner(*[42]) 3138 names = ["func", "inner"] 3139 self.do_test(func, names) 3140 3141 def test_inlined_for_iter(self): 3142 def gen(): 3143 yield 42 3144 def func(): 3145 for _ in gen(): 3146 pass 3147 names = ["func", "gen", "gen", "gen"] 3148 self.do_test(func, names) 3149 3150 def test_inlined_load_attr(self): 3151 class C: 3152 @property 3153 def a(self): 3154 return 42 3155 class D: 3156 def __getattribute__(self, name): 3157 return 42 3158 def func(): 3159 C().a 3160 D().a 3161 names = ["func", "a", "__getattribute__"] 3162 self.do_test(func, names) 3163 3164 def test_inlined_send(self): 3165 def inner(): 3166 yield 42 3167 def outer(): 3168 yield from inner() 3169 def func(): 3170 list(outer()) 3171 names = ["func", "outer", "outer", "inner", "inner", "outer", "inner"] 3172 self.do_test(func, names) 3173 3174 3175@unittest.skipUnless(support.Py_GIL_DISABLED, 'need Py_GIL_DISABLED') 3176class TestPyThreadId(unittest.TestCase): 3177 def test_py_thread_id(self): 3178 # gh-112535: Test _Py_ThreadId(): make sure that thread identifiers 3179 # in a few threads are unique 3180 py_thread_id = _testinternalcapi.py_thread_id 3181 short_sleep = 0.010 3182 3183 class GetThreadId(threading.Thread): 3184 def __init__(self): 3185 super().__init__() 3186 self.get_lock = threading.Lock() 3187 self.get_lock.acquire() 3188 self.started_lock = threading.Event() 3189 self.py_tid = None 3190 3191 def run(self): 3192 self.started_lock.set() 3193 self.get_lock.acquire() 3194 self.py_tid = py_thread_id() 3195 time.sleep(short_sleep) 3196 self.py_tid2 = py_thread_id() 3197 3198 nthread = 5 3199 threads = [GetThreadId() for _ in range(nthread)] 3200 3201 # first make run sure that all threads are running 3202 for thread in threads: 3203 thread.start() 3204 for thread in threads: 3205 thread.started_lock.wait() 3206 3207 # call _Py_ThreadId() in the main thread 3208 py_thread_ids = [py_thread_id()] 
3209 3210 # now call _Py_ThreadId() in each thread 3211 for thread in threads: 3212 thread.get_lock.release() 3213 3214 # call _Py_ThreadId() in each thread and wait until threads complete 3215 for thread in threads: 3216 thread.join() 3217 py_thread_ids.append(thread.py_tid) 3218 # _PyThread_Id() should not change for a given thread. 3219 # For example, it should remain the same after a short sleep. 3220 self.assertEqual(thread.py_tid2, thread.py_tid) 3221 3222 # make sure that all _Py_ThreadId() are unique 3223 for tid in py_thread_ids: 3224 self.assertIsInstance(tid, int) 3225 self.assertGreater(tid, 0) 3226 self.assertEqual(len(set(py_thread_ids)), len(py_thread_ids), 3227 py_thread_ids) 3228 3229 3230if __name__ == "__main__": 3231 unittest.main() 3232