# Run the _testcapi module tests (tests for the Python/C API): by definition,
# these are all functions _testcapi exports whose name begins with 'test_'.

from collections import OrderedDict
import importlib.machinery
import importlib.util
import os
import pickle
import random
import re
import subprocess
import sys
import textwrap
import threading
import time
import unittest
import weakref
from test import support
from test.support import MISSING_C_DOCSTRINGS
from test.support import import_helper
from test.support import threading_helper
from test.support import warnings_helper
from test.support.script_helper import assert_python_failure, assert_python_ok
try:
    import _posixsubprocess
except ImportError:
    _posixsubprocess = None

# Skip this test if the _testcapi module isn't available.
_testcapi = import_helper.import_module('_testcapi')

import _testinternalcapi

# Were we compiled --with-pydebug or with #define Py_DEBUG?
Py_DEBUG = hasattr(sys, 'gettotalrefcount')


def decode_stderr(err):
    """Decode captured stderr bytes as UTF-8 and drop carriage returns.

    Undecodable bytes are replaced rather than raising, so the result can
    always be matched against the expected-output regexes below.
    """
    return err.decode('utf-8', 'replace').replace('\r', '')


def testfunction(self):
    """some doc"""
    return self


class InstanceMethod:
    # Both attributes wrap a plain callable in _testcapi.instancemethod;
    # CAPITest.test_instancemethod checks how they behave on an instance.
    id = _testcapi.instancemethod(id)
    testfunction = _testcapi.instancemethod(testfunction)


class CAPITest(unittest.TestCase):

    def test_instancemethod(self):
        inst = InstanceMethod()
        self.assertEqual(id(inst), inst.id())
        self.assertIs(inst.testfunction(), inst)
        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
        self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)

        # Attributes set through the class-level wrapper land on the wrapped
        # function; setting them through the bound object must fail.
        InstanceMethod.testfunction.attribute = "test"
        self.assertEqual(testfunction.attribute, "test")
        self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")

    def test_no_FatalError_infinite_loop(self):
        with support.SuppressCrashReport():
            with subprocess.Popen([sys.executable, "-c",
                                   'import _testcapi;'
                                   '_testcapi.crash_no_current_thread()'],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE) as p:
                (out, err) = p.communicate()
        self.assertEqual(out, b'')
        # This used to cause an infinite loop.
        self.assertTrue(err.rstrip().startswith(
                         b'Fatal Python error: '
                         b'PyThreadState_Get: '
                         b'the function must be called with the GIL held, '
                         b'but the GIL is released '
                         b'(the current Python thread state is NULL)'),
                        err)

    def test_memoryview_from_NULL_pointer(self):
        self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)

    def test_exc_info(self):
        raised_exception = ValueError("5")
        new_exc = TypeError("TEST")
        try:
            raise raised_exception
        except ValueError as e:
            tb = e.__traceback__
            # Swap in a new "current exception", then restore the original
            # one, sampling sys.exc_info() at each step.
            orig_sys_exc_info = sys.exc_info()
            orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None)
            new_sys_exc_info = sys.exc_info()
            new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
            reset_sys_exc_info = sys.exc_info()

            self.assertEqual(orig_exc_info[1], e)

            self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb))
            self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None))
            self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
        else:
            self.fail("raising raised_exception did not reach the except block")

    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
    def test_seq_bytes_to_charp_array(self):
        # Issue #15732: crash in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return 1
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21)
        # Issue #15736: overflow in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return sys.maxsize
            def __getitem__(self, i):
                return b'x'
        self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21)

    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
    def test_subprocess_fork_exec(self):
        class Z(object):
            def __len__(self):
                return 1

        # Issue #15738: crash in subprocess_fork_exec()
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          Z(),[b'1'],3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21)

    @unittest.skipIf(MISSING_C_DOCSTRINGS,
                     "Signature information for builtins requires docstrings")
    def test_docstring_signature_parsing(self):

        self.assertIsNone(_testcapi.no_docstring.__doc__)
        self.assertIsNone(_testcapi.no_docstring.__text_signature__)

        self.assertIsNone(_testcapi.docstring_empty.__doc__)
        self.assertIsNone(_testcapi.docstring_empty.__text_signature__)

        self.assertEqual(_testcapi.docstring_no_signature.__doc__,
            "This docstring has no signature.")
        self.assertIsNone(_testcapi.docstring_no_signature.__text_signature__)

        self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__,
            "docstring_with_invalid_signature($module, /, boo)\n"
            "\n"
            "This docstring has an invalid signature."
            )
        self.assertIsNone(_testcapi.docstring_with_invalid_signature.__text_signature__)

        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__,
            "docstring_with_invalid_signature2($module, /, boo)\n"
            "\n"
            "--\n"
            "\n"
            "This docstring also has an invalid signature."
            )
        self.assertIsNone(_testcapi.docstring_with_invalid_signature2.__text_signature__)

        self.assertEqual(_testcapi.docstring_with_signature.__doc__,
            "This docstring has a valid signature.")
        self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)")

        self.assertIsNone(_testcapi.docstring_with_signature_but_no_doc.__doc__)
        self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__,
            "($module, /, sig)")

        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__,
            "\nThis docstring has a valid signature and some extra newlines.")
        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__,
            "($module, /, parameter)")

    def test_c_type_with_matrix_multiplication(self):
        M = _testcapi.matmulType
        m1 = M()
        m2 = M()
        self.assertEqual(m1 @ m2, ("matmul", m1, m2))
        self.assertEqual(m1 @ 42, ("matmul", m1, 42))
        self.assertEqual(42 @ m1, ("matmul", 42, m1))
        o = m1
        o @= m2
        self.assertEqual(o, ("imatmul", m1, m2))
        o = m1
        o @= 42
        self.assertEqual(o, ("imatmul", m1, 42))
        o = 42
        o @= m1
        self.assertEqual(o, ("matmul", 42, m1))

    def test_c_type_with_ipow(self):
        # When the __ipow__ method of a type was implemented in C, using the
        # modulo param would cause segfaults.
        o = _testcapi.ipowType()
        self.assertEqual(o.__ipow__(1), (1, None))
        self.assertEqual(o.__ipow__(2, 2), (2, 2))

    def test_return_null_without_error(self):
        # Issue #23571: A function must not return NULL without setting an
        # error
        if Py_DEBUG:
            # NOTE: the expected traceback pins "line 6", so the layout of
            # this snippet (including the blank lines) must not change.
            code = textwrap.dedent("""
                import _testcapi
                from test import support

                with support.SuppressCrashReport():
                    _testcapi.return_null_without_error()
            """)
            rc, out, err = assert_python_failure('-c', code)
            err = decode_stderr(err)
            self.assertRegex(err,
                r'Fatal Python error: _Py_CheckFunctionResult: '
                    r'a function returned NULL without setting an exception\n'
                r'Python runtime state: initialized\n'
                r'SystemError: <built-in function return_null_without_error> '
                    r'returned NULL without setting an exception\n'
                r'\n'
                r'Current thread.*:\n'
                r'  File .*", line 6 in <module>\n')
        else:
            with self.assertRaises(SystemError) as cm:
                _testcapi.return_null_without_error()
            self.assertRegex(str(cm.exception),
                             'return_null_without_error.* '
                             'returned NULL without setting an exception')

    def test_return_result_with_error(self):
        # Issue #23571: A function must not return a result with an error set
        if Py_DEBUG:
            # NOTE: the expected traceback pins "line 6", so the layout of
            # this snippet (including the blank lines) must not change.
            code = textwrap.dedent("""
                import _testcapi
                from test import support

                with support.SuppressCrashReport():
                    _testcapi.return_result_with_error()
            """)
            rc, out, err = assert_python_failure('-c', code)
            err = decode_stderr(err)
            self.assertRegex(err,
                    r'Fatal Python error: _Py_CheckFunctionResult: '
                        r'a function returned a result with an exception set\n'
                    r'Python runtime state: initialized\n'
                    r'ValueError\n'
                    r'\n'
                    r'The above exception was the direct cause '
                        r'of the following exception:\n'
                    r'\n'
                    r'SystemError: <built-in '
                        r'function return_result_with_error> '
                        r'returned a result with an exception set\n'
                    r'\n'
                    r'Current thread.*:\n'
                    r'  File .*, line 6 in <module>\n')
        else:
            with self.assertRaises(SystemError) as cm:
                _testcapi.return_result_with_error()
            self.assertRegex(str(cm.exception),
                             'return_result_with_error.* '
                             'returned a result with an exception set')

    def test_getitem_with_error(self):
        # Test _Py_CheckSlotResult(). Raise an exception and then calls
        # PyObject_GetItem(): check that the assertion catches the bug.
        # PyObject_GetItem() must not be called with an exception set.
        code = textwrap.dedent("""
            import _testcapi
            from test import support

            with support.SuppressCrashReport():
                _testcapi.getitem_with_error({1: 2}, 1)
        """)
        rc, out, err = assert_python_failure('-c', code)
        err = decode_stderr(err)
        if 'SystemError: ' not in err:
            self.assertRegex(err,
                    r'Fatal Python error: _Py_CheckSlotResult: '
                        r'Slot __getitem__ of type dict succeeded '
                        r'with an exception set\n'
                    r'Python runtime state: initialized\n'
                    r'ValueError: bug\n'
                    r'\n'
                    r'Current thread .* \(most recent call first\):\n'
                    r'  File .*, line 6 in <module>\n'
                    r'\n'
                    r'Extension modules: _testcapi \(total: 1\)\n')
        else:
            # Python built with NDEBUG macro defined:
            # test _Py_CheckFunctionResult() instead.
            self.assertIn('returned a result with an exception set', err)

    def test_buildvalue_N(self):
        _testcapi.test_buildvalue_N()

    def test_set_nomemory(self):
        code = """if 1:
            import _testcapi

            class C(): pass

            # The first loop tests both functions and that remove_mem_hooks()
            # can be called twice in a row. The second loop checks a call to
            # set_nomemory() after a call to remove_mem_hooks(). The third
            # loop checks the start and stop arguments of set_nomemory().
            for outer_cnt in range(1, 4):
                start = 10 * outer_cnt
                for j in range(100):
                    if j == 0:
                        if outer_cnt != 3:
                            _testcapi.set_nomemory(start)
                        else:
                            _testcapi.set_nomemory(start, start + 1)
                    try:
                        C()
                    except MemoryError as e:
                        if outer_cnt != 3:
                            _testcapi.remove_mem_hooks()
                        print('MemoryError', outer_cnt, j)
                        _testcapi.remove_mem_hooks()
                        break
        """
        rc, out, err = assert_python_ok('-c', code)
        self.assertIn(b'MemoryError 1 10', out)
        self.assertIn(b'MemoryError 2 20', out)
        self.assertIn(b'MemoryError 3 30', out)

    def test_mapping_keys_values_items(self):
        # Mapping1/Mapping2 return lists/tuples instead of dict views, so the
        # C helpers are exercised with non-view results as well.
        class Mapping1(dict):
            def keys(self):
                return list(super().keys())
            def values(self):
                return list(super().values())
            def items(self):
                return list(super().items())
        class Mapping2(dict):
            def keys(self):
                return tuple(super().keys())
            def values(self):
                return tuple(super().values())
            def items(self):
                return tuple(super().items())
        dict_obj = {'foo': 1, 'bar': 2, 'spam': 3}

        for mapping in [{}, OrderedDict(), Mapping1(), Mapping2(),
                        dict_obj, OrderedDict(dict_obj),
                        Mapping1(dict_obj), Mapping2(dict_obj)]:
            self.assertListEqual(_testcapi.get_mapping_keys(mapping),
                                 list(mapping.keys()))
            self.assertListEqual(_testcapi.get_mapping_values(mapping),
                                 list(mapping.values()))
            self.assertListEqual(_testcapi.get_mapping_items(mapping),
                                 list(mapping.items()))

    def test_mapping_keys_values_items_bad_arg(self):
        self.assertRaises(AttributeError, _testcapi.get_mapping_keys, None)
        self.assertRaises(AttributeError, _testcapi.get_mapping_values, None)
        self.assertRaises(AttributeError, _testcapi.get_mapping_items, None)

        class BadMapping:
            def keys(self):
                return None
            def values(self):
                return None
            def items(self):
                return None
        bad_mapping = BadMapping()
        self.assertRaises(TypeError, _testcapi.get_mapping_keys, bad_mapping)
        self.assertRaises(TypeError, _testcapi.get_mapping_values, bad_mapping)
        self.assertRaises(TypeError, _testcapi.get_mapping_items, bad_mapping)

    @unittest.skipUnless(hasattr(_testcapi, 'negative_refcount'),
                         'need _testcapi.negative_refcount')
    def test_negative_refcount(self):
        # bpo-35059: Check that Py_DECREF() reports the correct filename
        # when calling _Py_NegativeRefcount() to abort Python.
        code = textwrap.dedent("""
            import _testcapi
            from test import support

            with support.SuppressCrashReport():
                _testcapi.negative_refcount()
        """)
        rc, out, err = assert_python_failure('-c', code)
        self.assertRegex(err,
                         br'_testcapimodule\.c:[0-9]+: '
                         br'_Py_NegativeRefcount: Assertion failed: '
                         br'object has negative ref count')

    def test_trashcan_subclass(self):
        # bpo-35983: Check that the trashcan mechanism for "list" is NOT
        # activated when its tp_dealloc is being called by a subclass
        from _testcapi import MyList
        L = None
        for i in range(1000):
            L = MyList((L,))

    @support.requires_resource('cpu')
    def test_trashcan_python_class1(self):
        self.do_test_trashcan_python_class(list)

    @support.requires_resource('cpu')
    def test_trashcan_python_class2(self):
        from _testcapi import MyList
        self.do_test_trashcan_python_class(MyList)

    def do_test_trashcan_python_class(self, base):
        # Check that the trashcan mechanism works properly for a Python
        # subclass of a class using the trashcan (this specific test assumes
        # that the base class "base" behaves like list)
        class PyList(base):
            # Count the number of PyList instances to verify that there is
            # no memory leak
            num = 0
            def __init__(self, *args):
                __class__.num += 1
                super().__init__(*args)
            def __del__(self):
                __class__.num -= 1

        for parity in (0, 1):
            L = None
            # We need in the order of 2**20 iterations here such that a
            # typical 8MB stack would overflow without the trashcan.
            for i in range(2**20):
                L = PyList((L,))
                L.attr = i
            if parity:
                # Add one additional nesting layer
                L = (L,)
            self.assertGreater(PyList.num, 0)
            del L
            self.assertEqual(PyList.num, 0)

    def test_heap_ctype_doc_and_text_signature(self):
        self.assertEqual(_testcapi.HeapDocCType.__doc__, "somedoc")
        self.assertEqual(_testcapi.HeapDocCType.__text_signature__, "(arg1, arg2)")

    def test_null_type_doc(self):
        self.assertIsNone(_testcapi.NullTpDocType.__doc__)

    def test_subclass_of_heap_gc_ctype_with_tpdealloc_decrefs_once(self):
        class HeapGcCTypeSubclass(_testcapi.HeapGcCType):
            def __init__(self):
                self.value2 = 20
                super().__init__()

        subclass_instance = HeapGcCTypeSubclass()
        type_refcnt = sys.getrefcount(HeapGcCTypeSubclass)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # Test that the type reference count is only decremented once
        del subclass_instance
        self.assertEqual(type_refcnt - 1, sys.getrefcount(HeapGcCTypeSubclass))

    def test_subclass_of_heap_gc_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
        class A(_testcapi.HeapGcCType):
            def __init__(self):
                self.value2 = 20
                super().__init__()

        class B(A):
            def __init__(self):
                super().__init__()

            def __del__(self):
                # Re-parent the dying instance and record both refcounts so
                # the test can inspect them after deallocation.
                self.__class__ = A
                A.refcnt_in_del = sys.getrefcount(A)
                B.refcnt_in_del = sys.getrefcount(B)

        subclass_instance = B()
        type_refcnt = sys.getrefcount(B)
        new_type_refcnt = sys.getrefcount(A)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        del subclass_instance

        # Test that setting __class__ modified the reference counts of the types
        self.assertEqual(type_refcnt - 1, B.refcnt_in_del)
        self.assertEqual(new_type_refcnt + 1, A.refcnt_in_del)

        # Test that the original type already has decreased its refcnt
        self.assertEqual(type_refcnt - 1, sys.getrefcount(B))

        # Test that subtype_dealloc decref the newly assigned __class__ only once
        self.assertEqual(new_type_refcnt, sys.getrefcount(A))

    def test_heaptype_with_dict(self):
        inst = _testcapi.HeapCTypeWithDict()
        inst.foo = 42
        self.assertEqual(inst.foo, 42)
        self.assertEqual(inst.dictobj, inst.__dict__)
        self.assertEqual(inst.dictobj, {"foo": 42})

        inst = _testcapi.HeapCTypeWithDict()
        self.assertEqual({}, inst.__dict__)

    def test_heaptype_with_negative_dict(self):
        inst = _testcapi.HeapCTypeWithNegativeDict()
        inst.foo = 42
        self.assertEqual(inst.foo, 42)
        self.assertEqual(inst.dictobj, inst.__dict__)
        self.assertEqual(inst.dictobj, {"foo": 42})

        inst = _testcapi.HeapCTypeWithNegativeDict()
        self.assertEqual({}, inst.__dict__)

    def test_heaptype_with_weakref(self):
        inst = _testcapi.HeapCTypeWithWeakref()
        ref = weakref.ref(inst)
        self.assertEqual(ref(), inst)
        self.assertEqual(inst.weakreflist, ref)

    def test_heaptype_with_buffer(self):
        inst = _testcapi.HeapCTypeWithBuffer()
        b = bytes(inst)
        self.assertEqual(b, b"1234")

    def test_c_subclass_of_heap_ctype_with_tpdealloc_decrefs_once(self):
        subclass_instance = _testcapi.HeapCTypeSubclass()
        type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # Test that the type reference count is only decremented once
        del subclass_instance
        self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclass))

    def test_c_subclass_of_heap_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
        subclass_instance = _testcapi.HeapCTypeSubclassWithFinalizer()
        type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer)
        new_type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # The tp_finalize slot will set __class__ to HeapCTypeSubclass
        del subclass_instance

        # Test that setting __class__ modified the reference counts of the types
        self.assertEqual(type_refcnt - 1, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del)
        self.assertEqual(new_type_refcnt + 1, _testcapi.HeapCTypeSubclass.refcnt_in_del)

        # Test that the original type already has decreased its refcnt
        self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer))

        # Test that subtype_dealloc decref the newly assigned __class__ only once
        self.assertEqual(new_type_refcnt, sys.getrefcount(_testcapi.HeapCTypeSubclass))

    def test_heaptype_with_setattro(self):
        obj = _testcapi.HeapCTypeSetattr()
        self.assertEqual(obj.pvalue, 10)
        obj.value = 12
        self.assertEqual(obj.pvalue, 12)
        del obj.value
        self.assertEqual(obj.pvalue, 0)

    def test_pynumber_tobase(self):
        from _testcapi import pynumber_tobase
        self.assertEqual(pynumber_tobase(123, 2), '0b1111011')
        self.assertEqual(pynumber_tobase(123, 8), '0o173')
        self.assertEqual(pynumber_tobase(123, 10), '123')
        self.assertEqual(pynumber_tobase(123, 16), '0x7b')
        self.assertEqual(pynumber_tobase(-123, 2), '-0b1111011')
        self.assertEqual(pynumber_tobase(-123, 8), '-0o173')
        self.assertEqual(pynumber_tobase(-123, 10), '-123')
        self.assertEqual(pynumber_tobase(-123, 16), '-0x7b')
        self.assertRaises(TypeError, pynumber_tobase, 123.0, 10)
        self.assertRaises(TypeError, pynumber_tobase, '123', 10)
        self.assertRaises(SystemError, pynumber_tobase, 123, 0)

    def check_fatal_error(self, code, expected, not_expected=()):
        # Run `code` in a child interpreter that is expected to die with the
        # fatal error "MESSAGE", then check the "Extension modules:" line of
        # its stderr: every name in `expected` must be listed, no name in
        # `not_expected` may be listed, and the reported total must match
        # the number of listed names.
        with support.SuppressCrashReport():
            rc, out, err = assert_python_failure('-sSI', '-c', code)

        err = decode_stderr(err)
        self.assertIn('Fatal Python error: test_fatal_error: MESSAGE\n',
                      err)

        match = re.search(r'^Extension modules:(.*) \(total: ([0-9]+)\)$',
                          err, re.MULTILINE)
        if not match:
            self.fail(f"Cannot find 'Extension modules:' in {err!r}")
        modules = set(match.group(1).strip().split(', '))
        total = int(match.group(2))

        for name in expected:
            self.assertIn(name, modules)
        for name in not_expected:
            self.assertNotIn(name, modules)
        self.assertEqual(len(modules), total)

    def test_fatal_error(self):
        # By default, stdlib extension modules are ignored,
        # but not test modules.
        expected = ('_testcapi',)
        not_expected = ('sys',)
        code = 'import _testcapi, sys; _testcapi.fatal_error(b"MESSAGE")'
        self.check_fatal_error(code, expected, not_expected)

        # Mark _testcapi as stdlib module, but not sys
        expected = ('sys',)
        not_expected = ('_testcapi',)
        code = textwrap.dedent('''
            import _testcapi, sys
            sys.stdlib_module_names = frozenset({"_testcapi"})
            _testcapi.fatal_error(b"MESSAGE")
        ''')
        # Pass not_expected as well: it was previously assigned but never
        # forwarded, so the "_testcapi must not be listed" half of this
        # scenario was silently unchecked.
        self.check_fatal_error(code, expected, not_expected)

    def test_pyobject_repr_from_null(self):
        s = _testcapi.pyobject_repr_from_null()
        self.assertEqual(s, '<NULL>')

    def test_pyobject_str_from_null(self):
        s = _testcapi.pyobject_str_from_null()
        self.assertEqual(s, '<NULL>')

    def test_pyobject_bytes_from_null(self):
        s = _testcapi.pyobject_bytes_from_null()
        self.assertEqual(s, b'<NULL>')

    def test_Py_CompileString(self):
        # Check that Py_CompileString respects the coding cookie
        _compile = _testcapi.Py_CompileString
        code = b"# -*- coding: latin1 -*-\nprint('\xc2\xa4')\n"
        result = _compile(code)
        expected = compile(code, "<string>", "exec")
        self.assertEqual(result.co_consts, expected.co_consts)


class TestPendingCalls(unittest.TestCase):

    def pendingcalls_submit(self, l, n):
        # Submit `n` pending calls, each of which appends None to `l`.
        def callback():
            #this function can be interrupted by thread switching so let's
            #use an atomic operation
            l.append(None)

        for i in range(n):
            time.sleep(random.random()*0.02) #0.01 secs on average
            #try submitting callback until successful.
            #rely on regular interrupt to flush queue if we are
            #unsuccessful.
            while True:
                if _testcapi._pending_threadfunc(callback):
                    break

    def pendingcalls_wait(self, l, n, context = None):
        #now, stick around until l has grown to n entries
        count = 0
        while len(l) != n:
            #this busy loop is where we expect to be interrupted to
            #run our callbacks.  Note that callbacks are only run on the
            #main thread
            for i in range(1000):
                a = i*i
            # While submitter threads are still running, do not count this
            # iteration towards the timeout.
            if context and not context.event.is_set():
                continue
            count += 1
            self.assertLess(count, 10000,
                "timeout waiting for %i callbacks, got %i"%(n, len(l)))

    def test_pendingcalls_threaded(self):

        #do every callback on a separate thread
        n = 32 #total callbacks
        class foo(object):pass
        context = foo()
        context.l = []
        context.n = 2 #submits per thread
        context.nThreads = n // context.n
        context.nFinished = 0
        context.lock = threading.Lock()
        context.event = threading.Event()

        threads = [threading.Thread(target=self.pendingcalls_thread,
                                    args=(context,))
                   for i in range(context.nThreads)]
        with threading_helper.start_threads(threads):
            self.pendingcalls_wait(context.l, n, context)

    def pendingcalls_thread(self, context):
        # Thread body: submit this thread's share of callbacks, then record
        # completion; the last thread to finish sets context.event.
        try:
            self.pendingcalls_submit(context.l, context.n)
        finally:
            with context.lock:
                context.nFinished += 1
                nFinished = context.nFinished
            if nFinished == context.nThreads:
                context.event.set()

    def test_pendingcalls_non_threaded(self):
        #again, just using the main thread, likely they will all be dispatched at
        #once.  It is ok to ask for too many, because we loop until we find a slot.
        #the loop can be interrupted to dispatch.
        #there are only 32 dispatch slots, so we go for twice that!
        l = []
        n = 64
        self.pendingcalls_submit(l, n)
        self.pendingcalls_wait(l, n)


class SubinterpreterTest(unittest.TestCase):

    def test_subinterps(self):
        import builtins
        r, w = os.pipe()
        # The subinterpreter pickles the ids of its own sys.modules and
        # builtins into the pipe; they must differ from this interpreter's.
        code = """if 1:
            import sys, builtins, pickle
            with open({:d}, "wb") as f:
                pickle.dump(id(sys.modules), f)
                pickle.dump(id(builtins), f)
            """.format(w)
        with open(r, "rb") as f:
            ret = support.run_in_subinterp(code)
            self.assertEqual(ret, 0)
            self.assertNotEqual(pickle.load(f), id(sys.modules))
            self.assertNotEqual(pickle.load(f), id(builtins))

    def test_subinterps_recent_language_features(self):
        r, w = os.pipe()
        code = """if 1:
            import pickle
            with open({:d}, "wb") as f:

                @(lambda x:x)  # Py 3.9
                def noop(x): return x

                a = (b := f'1{{2}}3') + noop('x')  # Py 3.8 (:=) / 3.6 (f'')

                async def foo(arg): return await arg  # Py 3.5

                pickle.dump(dict(a=a, b=b), f)
            """.format(w)

        with open(r, "rb") as f:
            ret = support.run_in_subinterp(code)
            self.assertEqual(ret, 0)
            self.assertEqual(pickle.load(f), {'a': '123x', 'b': '123'})

    def test_mutate_exception(self):
        """
        Exceptions saved in global module state get shared between
        individual module instances. This test checks whether or not
        a change in one interpreter's module gets reflected into the
        other ones.
        """
        import binascii

        support.run_in_subinterp("import binascii; binascii.Error.foobar = 'foobar'")

        self.assertFalse(hasattr(binascii.Error, "foobar"))

    def test_module_state_shared_in_global(self):
        """
        bpo-44050: Extension module state should be shared between interpreters
        when it doesn't support sub-interpreters.
        """
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)

        # The same script runs once here (via exec) and once in a
        # subinterpreter; each run writes id(module.Error) to the pipe.
        script = textwrap.dedent(f"""
            import importlib.machinery
            import importlib.util
            import os

            fullname = '_test_module_state_shared'
            origin = importlib.util.find_spec('_testmultiphase').origin
            loader = importlib.machinery.ExtensionFileLoader(fullname, origin)
            spec = importlib.util.spec_from_loader(fullname, loader)
            module = importlib.util.module_from_spec(spec)
            attr_id = str(id(module.Error)).encode()

            os.write({w}, attr_id)
            """)
        exec(script)
        main_attr_id = os.read(r, 100)

        ret = support.run_in_subinterp(script)
        self.assertEqual(ret, 0)
        subinterp_attr_id = os.read(r, 100)
        self.assertEqual(main_attr_id, subinterp_attr_id)


class TestThreadState(unittest.TestCase):

    @threading_helper.reap_threads
    def test_thread_state(self):
        # some extra thread-state tests driven via _testcapi
        def target():
            idents = []

            def callback():
                idents.append(threading.get_ident())

            _testcapi._test_thread_state(callback)
            # NOTE(review): a and b are never read; presumably they only hold
            # extra references to callback -- confirm before removing.
            a = b = callback
            time.sleep(1)
            # Check our main thread is in the list exactly 3 times.
            self.assertEqual(idents.count(threading.get_ident()), 3,
                             "Couldn't find main thread correctly in the list")

        # Run once on the current thread and once on a fresh thread.
        target()
        t = threading.Thread(target=target)
        t.start()
        t.join()


class Test_testcapi(unittest.TestCase):
    # Expose every _testcapi function named test_* (except *_code) as a
    # test method of this class.
    locals().update((name, getattr(_testcapi, name))
                    for name in dir(_testcapi)
                    if name.startswith('test_') and not name.endswith('_code'))

    # Suppress warning from PyUnicode_FromUnicode().
    @warnings_helper.ignore_warnings(category=DeprecationWarning)
    def test_widechar(self):
        _testcapi.test_widechar()


class Test_testinternalcapi(unittest.TestCase):
    # Expose every _testinternalcapi function named test_* as a test method.
    locals().update((name, getattr(_testinternalcapi, name))
                    for name in dir(_testinternalcapi)
                    if name.startswith('test_'))


class PyMemDebugTests(unittest.TestCase):
    # Value of the PYTHONMALLOC environment variable for child interpreters;
    # subclasses override it to cover other allocator configurations.
    PYTHONMALLOC = 'debug'
    # '0x04c06e0' or '04C06E0'
    PTR_REGEX = r'(?:0x)?[0-9a-fA-F]+'

    def check(self, code):
        # Run `code` in a child interpreter that is expected to fail and
        # return its stderr decoded as ASCII (undecodable bytes replaced).
        with support.SuppressCrashReport():
            out = assert_python_failure(
                '-c', code,
                PYTHONMALLOC=self.PYTHONMALLOC,
                # FreeBSD: instruct jemalloc to not fill freed() memory
                # with junk byte 0x5a, see JEMALLOC(3)
                MALLOC_CONF="junk:false",
            )
        stderr = out.err
        return stderr.decode('ascii', 'replace')

    def test_buffer_overflow(self):
        out = self.check('import _testcapi; _testcapi.pymem_buffer_overflow()')
        regex = (r"Debug memory block at address p={ptr}: API 'm'\n"
                 r"    16 bytes originally requested\n"
                 r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
                 r"    The [0-9] pad bytes at tail={ptr} are not all FORBIDDENBYTE \(0x[0-9a-f]{{2}}\):\n"
                 r"        at tail\+0: 0x78 \*\*\* OUCH\n"
                 r"        at tail\+1: 0xfd\n"
                 r"        at tail\+2: 0xfd\n"
                 r"        .*\n"
                 r"(    The block was made by call #[0-9]+ to debug malloc/realloc.\n)?"
                 r"    Data at p: cd cd cd .*\n"
                 r"\n"
                 r"Enable tracemalloc to get the memory block allocation traceback\n"
                 r"\n"
                 r"Fatal Python error: _PyMem_DebugRawFree: bad trailing pad byte")
        regex = regex.format(ptr=self.PTR_REGEX)
        regex = re.compile(regex, flags=re.DOTALL)
        self.assertRegex(out, regex)

    def test_api_misuse(self):
        out = self.check('import _testcapi; _testcapi.pymem_api_misuse()')
        regex = (r"Debug memory block at address p={ptr}: API 'm'\n"
                 r"    16 bytes originally requested\n"
                 r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
                 r"    The [0-9] pad bytes at tail={ptr} are FORBIDDENBYTE, as expected.\n"
                 r"(    The block was made by call #[0-9]+ to debug malloc/realloc.\n)?"
                 r"    Data at p: cd cd cd .*\n"
                 r"\n"
                 r"Enable tracemalloc to get the memory block allocation traceback\n"
                 r"\n"
                 r"Fatal Python error: _PyMem_DebugRawFree: bad ID: Allocated using API 'm', verified using API 'r'\n")
        regex = regex.format(ptr=self.PTR_REGEX)
        self.assertRegex(out, regex)

    def check_malloc_without_gil(self, code):
        # The child interpreter must die with the "called without holding
        # the GIL" fatal error.
        out = self.check(code)
        expected = ('Fatal Python error: _PyMem_DebugMalloc: '
                    'Python memory allocator called without holding the GIL')
        self.assertIn(expected, out)

    def test_pymem_malloc_without_gil(self):
        # Debug hooks must raise an error if PyMem_Malloc() is called
        # without holding the GIL
        code = 'import _testcapi; _testcapi.pymem_malloc_without_gil()'
        self.check_malloc_without_gil(code)

    def test_pyobject_malloc_without_gil(self):
        # Debug hooks must raise an error if PyObject_Malloc() is called
        # without holding the GIL
        code = 'import _testcapi; _testcapi.pyobject_malloc_without_gil()'
        self.check_malloc_without_gil(code)

    def check_pyobject_is_freed(self, func_name):
        # Call _testcapi.<func_name>() in a child interpreter; the child
        # exits with status 0 unless the call raises _testcapi.error.
        code = textwrap.dedent(f'''
            import gc, os, sys, _testcapi
            # Disable the GC to avoid crash on GC collection
            gc.disable()
            try:
                _testcapi.{func_name}()
                # Exit immediately to avoid a crash while deallocating
                # the invalid object
                os._exit(0)
            except _testcapi.error:
                os._exit(1)
        ''')
        assert_python_ok(
            '-c', code,
            PYTHONMALLOC=self.PYTHONMALLOC,
            MALLOC_CONF="junk:false",
        )

    def test_pyobject_null_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_null_is_freed')

    def test_pyobject_uninitialized_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_uninitialized_is_freed')

    def test_pyobject_forbidden_bytes_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_forbidden_bytes_is_freed')

    def test_pyobject_freed_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_freed_is_freed')


class PyMemMallocDebugTests(PyMemDebugTests):
    PYTHONMALLOC = 'malloc_debug'


@unittest.skipUnless(support.with_pymalloc(), 'need pymalloc')
class PyMemPymallocDebugTests(PyMemDebugTests):
    PYTHONMALLOC = 'pymalloc_debug'


@unittest.skipUnless(Py_DEBUG, 'need Py_DEBUG')
class PyMemDefaultTests(PyMemDebugTests):
    # test default allocator of Python compiled in debug mode
    PYTHONMALLOC = ''


class Test_ModuleStateAccess(unittest.TestCase):
    """Test access to module state (PEP 573)"""

    # The C part of the tests lives in _testmultiphase, in a module called
    # _testmultiphase_meth_state_access.
    # This module has multi-phase initialization, unlike _testcapi.

    def setUp(self):
        # Load a fresh module object from the _testmultiphase shared library
        # under the alternate name, so each test gets its own instance.
        fullname = '_testmultiphase_meth_state_access'  # XXX
        origin = importlib.util.find_spec('_testmultiphase').origin
        loader = importlib.machinery.ExtensionFileLoader(fullname, origin)
        spec = importlib.util.spec_from_loader(fullname, loader)
        module = importlib.util.module_from_spec(spec)
        loader.exec_module(module)
        self.module = module

    def test_subclass_get_module(self):
        """PyType_GetModule for defining_class"""
        class StateAccessType_Subclass(self.module.StateAccessType):
            pass

        instance = StateAccessType_Subclass()
        self.assertIs(instance.get_defining_module(), self.module)

    def test_subclass_get_module_with_super(self):
        class StateAccessType_Subclass(self.module.StateAccessType):
            def get_defining_module(self):
                return super().get_defining_module()

        instance = StateAccessType_Subclass()
        self.assertIs(instance.get_defining_module(), self.module)

    def test_state_access(self):
        """Checks methods defined with and without argument clinic

        This tests a no-arg method (get_count) and a method with
        both a positional and keyword argument.
        """

        a = self.module.StateAccessType()
        b = self.module.StateAccessType()

        methods = {
            'clinic': a.increment_count_clinic,
            'noclinic': a.increment_count_noclinic,
        }

        for name, increment_count in methods.items():
            with self.subTest(name):
                # a and b must always report the same count.
                self.assertEqual(a.get_count(), b.get_count())
                self.assertEqual(a.get_count(), 0)

                increment_count()
                self.assertEqual(a.get_count(), b.get_count())
                self.assertEqual(a.get_count(), 1)

                increment_count(3)
                self.assertEqual(a.get_count(), b.get_count())
                self.assertEqual(a.get_count(), 4)

                increment_count(-2, twice=True)
                self.assertEqual(a.get_count(), b.get_count())
                self.assertEqual(a.get_count(), 0)

                with self.assertRaises(TypeError):
                    increment_count(thrice=3)

                with self.assertRaises(TypeError):
                    increment_count(1, 2, 3)


if __name__ == "__main__":
    unittest.main()