# Run the _testcapi module tests (tests for the Python/C API):  by defn,
# these are all functions _testcapi exports whose name begins with 'test_'.

from collections import OrderedDict
import os
import pickle
import random
import re
import subprocess
import sys
import textwrap
import threading
import time
import unittest
from test import support
from test.support import MISSING_C_DOCSTRINGS
from test.support.script_helper import assert_python_failure, assert_python_ok
try:
    import _posixsubprocess
except ImportError:
    _posixsubprocess = None

# Skip this test if the _testcapi module isn't available.
_testcapi = support.import_module('_testcapi')

# Were we compiled --with-pydebug or with #define Py_DEBUG?
Py_DEBUG = hasattr(sys, 'gettotalrefcount')


# Plain function that InstanceMethod wraps with _testcapi.instancemethod;
# test_instancemethod compares its __doc__ and attributes with the wrapper's.
def testfunction(self):
    """some doc"""
    return self


# Class whose attributes are a builtin (id) and a Python function
# (testfunction), both wrapped with _testcapi.instancemethod.
class InstanceMethod:
    id = _testcapi.instancemethod(id)
    testfunction = _testcapi.instancemethod(testfunction)


# Tests that call individual helpers exported by _testcapi (and, for two
# tests, _posixsubprocess) and check their results from Python.
class CAPITest(unittest.TestCase):

    def test_instancemethod(self):
        # The wrapped callables must receive the instance as first argument
        # and expose the wrapped function's __doc__.
        inst = InstanceMethod()
        self.assertEqual(id(inst), inst.id())
        self.assertIs(inst.testfunction(), inst)
        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
        self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)

        # Setting an attribute through the class-level wrapper lands on the
        # wrapped function; setting it through the bound object must fail.
        InstanceMethod.testfunction.attribute = "test"
        self.assertEqual(testfunction.attribute, "test")
        self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")

    def test_no_FatalError_infinite_loop(self):
        # Run the crashing call in a child interpreter and check that it
        # terminates with the expected fatal error on stderr.
        with support.SuppressCrashReport():
            p = subprocess.Popen([sys.executable, "-c",
                                  'import _testcapi;'
                                  '_testcapi.crash_no_current_thread()'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        (out, err) = p.communicate()
        self.assertEqual(out, b'')
        # This used to cause an infinite loop.
        self.assertTrue(err.rstrip().startswith(
                         b'Fatal Python error:'
                         b' PyThreadState_Get: no current thread'))

    def test_memoryview_from_NULL_pointer(self):
        self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)

    def test_exc_info(self):
        # While handling an exception, swap the current exc_info through
        # _testcapi.set_exc_info() and back again, checking sys.exc_info()
        # and the returned "previous" triple at every step.
        raised_exception = ValueError("5")
        new_exc = TypeError("TEST")
        try:
            raise raised_exception
        except ValueError as e:
            tb = e.__traceback__
            orig_sys_exc_info = sys.exc_info()
            orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None)
            new_sys_exc_info = sys.exc_info()
            new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
            reset_sys_exc_info = sys.exc_info()

            self.assertEqual(orig_exc_info[1], e)

            self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb))
            self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None))
            self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
        else:
            # The raise above is unconditional; reaching this branch means
            # the except clause was skipped.
            self.fail("ValueError was not raised")

    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
    def test_seq_bytes_to_charp_array(self):
        # Issue #15732: crash in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return 1
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17)
        # Issue #15736: overflow in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return sys.maxsize
            def __getitem__(self, i):
                return b'x'
        self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17)

    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
    def test_subprocess_fork_exec(self):
        class Z(object):
            def __len__(self):
                return 1

        # Issue #15738: crash in subprocess_fork_exec()
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          Z(),[b'1'],3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17)

    @unittest.skipIf(MISSING_C_DOCSTRINGS,
                     "Signature information for builtins requires docstrings")
    def test_docstring_signature_parsing(self):
        # Each _testcapi.docstring_* function carries a differently shaped
        # C docstring; check how it is split into __doc__ and
        # __text_signature__.

        self.assertIsNone(_testcapi.no_docstring.__doc__)
        self.assertIsNone(_testcapi.no_docstring.__text_signature__)

        self.assertIsNone(_testcapi.docstring_empty.__doc__)
        self.assertIsNone(_testcapi.docstring_empty.__text_signature__)

        self.assertEqual(_testcapi.docstring_no_signature.__doc__,
            "This docstring has no signature.")
        self.assertIsNone(_testcapi.docstring_no_signature.__text_signature__)

        self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__,
            "docstring_with_invalid_signature($module, /, boo)\n"
            "\n"
            "This docstring has an invalid signature."
            )
        self.assertIsNone(_testcapi.docstring_with_invalid_signature.__text_signature__)

        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__,
            "docstring_with_invalid_signature2($module, /, boo)\n"
            "\n"
            "--\n"
            "\n"
            "This docstring also has an invalid signature."
            )
        self.assertIsNone(_testcapi.docstring_with_invalid_signature2.__text_signature__)

        self.assertEqual(_testcapi.docstring_with_signature.__doc__,
            "This docstring has a valid signature.")
        self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)")

        self.assertIsNone(_testcapi.docstring_with_signature_but_no_doc.__doc__)
        self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__,
            "($module, /, sig)")

        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__,
            "\nThis docstring has a valid signature and some extra newlines.")
        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__,
            "($module, /, parameter)")

    def test_c_type_with_matrix_multiplication(self):
        # matmulType reports which slot was invoked and with which operands,
        # for both the binary (@) and in-place (@=) forms.
        M = _testcapi.matmulType
        m1 = M()
        m2 = M()
        self.assertEqual(m1 @ m2, ("matmul", m1, m2))
        self.assertEqual(m1 @ 42, ("matmul", m1, 42))
        self.assertEqual(42 @ m1, ("matmul", 42, m1))
        o = m1
        o @= m2
        self.assertEqual(o, ("imatmul", m1, m2))
        o = m1
        o @= 42
        self.assertEqual(o, ("imatmul", m1, 42))
        # With an int on the left, @= is expected to report "matmul".
        o = 42
        o @= m1
        self.assertEqual(o, ("matmul", 42, m1))

    def test_c_type_with_ipow(self):
        # When the __ipow__ method of a type was implemented in C, using the
        # modulo param would cause segfaults.
        o = _testcapi.ipowType()
        self.assertEqual(o.__ipow__(1), (1, None))
        self.assertEqual(o.__ipow__(2, 2), (2, 2))

    def test_return_null_without_error(self):
        # Issue #23571: A function must not return NULL without setting an
        # error
        if Py_DEBUG:
            # The call sits on line 6 of the dedented script; the regex
            # below checks that line number in the traceback dump.
            code = textwrap.dedent("""
                import _testcapi
                from test import support

                with support.SuppressCrashReport():
                    _testcapi.return_null_without_error()
            """)
            rc, out, err = assert_python_failure('-c', code)
            # NOTE(review): the indentation before "File" is matched with
            # ' +' rather than a fixed width -- confirm against the actual
            # traceback dump if an exact-width match is wanted.
            self.assertRegex(err.replace(b'\r', b''),
                             br'Fatal Python error: a function returned NULL '
                                br'without setting an error\n'
                             br'Python runtime state: initialized\n'
                             br'SystemError: <built-in function '
                                 br'return_null_without_error> returned NULL '
                                 br'without setting an error\n'
                             br'\n'
                             br'Current thread.*:\n'
                             br' +File .*", line 6 in <module>')
        else:
            with self.assertRaises(SystemError) as cm:
                _testcapi.return_null_without_error()
            self.assertRegex(str(cm.exception),
                             'return_null_without_error.* '
                             'returned NULL without setting an error')

    def test_return_result_with_error(self):
        # Issue #23571: A function must not return a result with an error set
        if Py_DEBUG:
            # The call sits on line 6 of the dedented script; the regex
            # below checks that line number in the traceback dump.
            code = textwrap.dedent("""
                import _testcapi
                from test import support

                with support.SuppressCrashReport():
                    _testcapi.return_result_with_error()
            """)
            rc, out, err = assert_python_failure('-c', code)
            # NOTE(review): the indentation before "File" is matched with
            # ' +' rather than a fixed width -- confirm against the actual
            # traceback dump if an exact-width match is wanted.
            self.assertRegex(err.replace(b'\r', b''),
                             br'Fatal Python error: a function returned a '
                                br'result with an error set\n'
                             br'Python runtime state: initialized\n'
                             br'ValueError\n'
                             br'\n'
                             br'The above exception was the direct cause '
                                br'of the following exception:\n'
                             br'\n'
                             br'SystemError: <built-in '
                                br'function return_result_with_error> '
                                br'returned a result with an error set\n'
                             br'\n'
                             br'Current thread.*:\n'
                             br' +File .*, line 6 in <module>')
        else:
            with self.assertRaises(SystemError) as cm:
                _testcapi.return_result_with_error()
            self.assertRegex(str(cm.exception),
                             'return_result_with_error.* '
                             'returned a result with an error set')

    def test_buildvalue_N(self):
        _testcapi.test_buildvalue_N()

    def test_set_nomemory(self):
        # Runs in a child interpreter; the parent only checks the three
        # 'MemoryError <outer_cnt> <j>' lines printed by the script.
        code = """if 1:
            import _testcapi

            class C(): pass

            # The first loop tests both functions and that remove_mem_hooks()
            # can be called twice in a row. The second loop checks a call to
            # set_nomemory() after a call to remove_mem_hooks(). The third
            # loop checks the start and stop arguments of set_nomemory().
            for outer_cnt in range(1, 4):
                start = 10 * outer_cnt
                for j in range(100):
                    if j == 0:
                        if outer_cnt != 3:
                            _testcapi.set_nomemory(start)
                        else:
                            _testcapi.set_nomemory(start, start + 1)
                    try:
                        C()
                    except MemoryError as e:
                        if outer_cnt != 3:
                            _testcapi.remove_mem_hooks()
                        print('MemoryError', outer_cnt, j)
                        _testcapi.remove_mem_hooks()
                        break
        """
        rc, out, err = assert_python_ok('-c', code)
        self.assertIn(b'MemoryError 1 10', out)
        self.assertIn(b'MemoryError 2 20', out)
        self.assertIn(b'MemoryError 3 30', out)

    def test_mapping_keys_values_items(self):
        # dict subclasses whose keys()/values()/items() return a list
        # (Mapping1) or a tuple (Mapping2) instead of a view.
        class Mapping1(dict):
            def keys(self):
                return list(super().keys())
            def values(self):
                return list(super().values())
            def items(self):
                return list(super().items())
        class Mapping2(dict):
            def keys(self):
                return tuple(super().keys())
            def values(self):
                return tuple(super().values())
            def items(self):
                return tuple(super().items())
        dict_obj = {'foo': 1, 'bar': 2, 'spam': 3}

        # Empty and populated variants of every mapping flavour; the C
        # helpers must always hand back a list with the same contents.
        for mapping in [{}, OrderedDict(), Mapping1(), Mapping2(),
                        dict_obj, OrderedDict(dict_obj),
                        Mapping1(dict_obj), Mapping2(dict_obj)]:
            self.assertListEqual(_testcapi.get_mapping_keys(mapping),
                                 list(mapping.keys()))
            self.assertListEqual(_testcapi.get_mapping_values(mapping),
                                 list(mapping.values()))
            self.assertListEqual(_testcapi.get_mapping_items(mapping),
                                 list(mapping.items()))

    def test_mapping_keys_values_items_bad_arg(self):
        # None has no keys()/values()/items() at all.
        self.assertRaises(AttributeError, _testcapi.get_mapping_keys, None)
        self.assertRaises(AttributeError, _testcapi.get_mapping_values, None)
        self.assertRaises(AttributeError, _testcapi.get_mapping_items, None)

        # The methods exist but return None instead of an iterable.
        class BadMapping:
            def keys(self):
                return None
            def values(self):
                return None
            def items(self):
                return None
        bad_mapping = BadMapping()
        self.assertRaises(TypeError, _testcapi.get_mapping_keys, bad_mapping)
        self.assertRaises(TypeError, _testcapi.get_mapping_values, bad_mapping)
        self.assertRaises(TypeError, _testcapi.get_mapping_items, bad_mapping)

    @unittest.skipUnless(hasattr(_testcapi, 'negative_refcount'),
                         'need _testcapi.negative_refcount')
    def test_negative_refcount(self):
        # bpo-35059: Check that Py_DECREF() reports the correct filename
        # when calling _Py_NegativeRefcount() to abort Python.
        code = textwrap.dedent("""
            import _testcapi
            from test import support

            with support.SuppressCrashReport():
                _testcapi.negative_refcount()
        """)
        rc, out, err = assert_python_failure('-c', code)
        self.assertRegex(err,
                         br'_testcapimodule\.c:[0-9]+: '
                         br'_Py_NegativeRefcount: Assertion failed: '
                         br'object has negative ref count')

    def test_trashcan_subclass(self):
        # bpo-35983: Check that the trashcan mechanism for "list" is NOT
        # activated when its tp_dealloc is being called by a subclass
        from _testcapi import MyList
        L = None
        for i in range(1000):
            L = MyList((L,))

    @support.requires_resource('cpu')
    def test_trashcan_python_class1(self):
        self.do_test_trashcan_python_class(list)

    @support.requires_resource('cpu')
    def test_trashcan_python_class2(self):
        from _testcapi import MyList
        self.do_test_trashcan_python_class(MyList)

    def do_test_trashcan_python_class(self, base):
        # Check that the trashcan mechanism works properly for a Python
        # subclass of a class using the trashcan (this specific test assumes
        # that the base class "base" behaves like list)
        class PyList(base):
            # Count the number of PyList instances to verify that there is
            # no memory leak
            num = 0
            def __init__(self, *args):
                __class__.num += 1
                super().__init__(*args)
            def __del__(self):
                __class__.num -= 1

        for parity in (0, 1):
            L = None
            # We need in the order of 2**20 iterations here such that a
            # typical 8MB stack would overflow without the trashcan.
            for i in range(2**20):
                L = PyList((L,))
                L.attr = i
            if parity:
                # Add one additional nesting layer
                L = (L,)
            self.assertGreater(PyList.num, 0)
            del L
            self.assertEqual(PyList.num, 0)

    def test_subclass_of_heap_gc_ctype_with_tpdealloc_decrefs_once(self):
        class HeapGcCTypeSubclass(_testcapi.HeapGcCType):
            def __init__(self):
                self.value2 = 20
                super().__init__()

        subclass_instance = HeapGcCTypeSubclass()
        type_refcnt = sys.getrefcount(HeapGcCTypeSubclass)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # Test that the type reference count is only decremented once
        del subclass_instance
        self.assertEqual(type_refcnt - 1, sys.getrefcount(HeapGcCTypeSubclass))

    def test_subclass_of_heap_gc_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
        class A(_testcapi.HeapGcCType):
            def __init__(self):
                self.value2 = 20
                super().__init__()

        class B(A):
            def __init__(self):
                super().__init__()

            def __del__(self):
                # Switch the dying instance to class A and record both
                # types' refcounts as seen from inside the finalizer.
                self.__class__ = A
                A.refcnt_in_del = sys.getrefcount(A)
                B.refcnt_in_del = sys.getrefcount(B)

        subclass_instance = B()
        type_refcnt = sys.getrefcount(B)
        new_type_refcnt = sys.getrefcount(A)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        del subclass_instance

        # Test that setting __class__ modified the reference counts of the types
        self.assertEqual(type_refcnt - 1, B.refcnt_in_del)
        self.assertEqual(new_type_refcnt + 1, A.refcnt_in_del)

        # Test that the original type already has decreased its refcnt
        self.assertEqual(type_refcnt - 1, sys.getrefcount(B))

        # Test that subtype_dealloc decref the newly assigned __class__ only once
        self.assertEqual(new_type_refcnt, sys.getrefcount(A))

    def test_c_subclass_of_heap_ctype_with_tpdealloc_decrefs_once(self):
        subclass_instance = _testcapi.HeapCTypeSubclass()
        type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # Test that the type reference count is only decremented once
        del subclass_instance
        self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclass))

    def test_c_subclass_of_heap_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
        subclass_instance = _testcapi.HeapCTypeSubclassWithFinalizer()
        type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer)
        new_type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # The tp_finalize slot will set __class__ to HeapCTypeSubclass
        del subclass_instance

        # Test that setting __class__ modified the reference counts of the types
        self.assertEqual(type_refcnt - 1, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del)
        self.assertEqual(new_type_refcnt + 1, _testcapi.HeapCTypeSubclass.refcnt_in_del)

        # Test that the original type already has decreased its refcnt
        self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer))

        # Test that subtype_dealloc decref the newly assigned __class__ only once
        self.assertEqual(new_type_refcnt, sys.getrefcount(_testcapi.HeapCTypeSubclass))


# Tests for callbacks queued through _testcapi._pending_threadfunc().
class TestPendingCalls(unittest.TestCase):

    def pendingcalls_submit(self, l, n):
        # Queue n callbacks, each of which appends None to the list l.
        def callback():
            #this function can be interrupted by thread switching so let's
            #use an atomic operation
            l.append(None)

        for i in range(n):
            time.sleep(random.random()*0.02) #0.01 secs on average
            #try submitting callback until successful.
            #rely on regular interrupt to flush queue if we are
            #unsuccessful.
            while not _testcapi._pending_threadfunc(callback):
                pass

    def pendingcalls_wait(self, l, n, context = None):
        # Busy-wait until len(l) reaches n.  When a context is given, the
        # timeout counter only starts advancing once context.event is set.
        count = 0
        while len(l) != n:
            #this busy loop is where we expect to be interrupted to
            #run our callbacks.  Note that callbacks are only run on the
            #main thread
            for i in range(1000):
                a = i*i
            if context and not context.event.is_set():
                continue
            count += 1
            self.assertLess(count, 10000,
                "timeout waiting for %i callbacks, got %i"%(n, len(l)))

    def test_pendingcalls_threaded(self):

        #do every callback on a separate thread
        n = 32 #total callbacks

        # Bare attribute holder shared between this test and the workers.
        class Context(object):
            pass

        context = Context()
        context.l = []
        context.n = 2 #submits per thread
        context.nThreads = n // context.n
        context.nFinished = 0
        context.lock = threading.Lock()
        context.event = threading.Event()

        threads = [threading.Thread(target=self.pendingcalls_thread,
                                    args=(context,))
                   for i in range(context.nThreads)]
        with support.start_threads(threads):
            self.pendingcalls_wait(context.l, n, context)

    def pendingcalls_thread(self, context):
        # Worker body: submit context.n callbacks, then count this thread as
        # finished; the last thread to finish sets context.event.
        try:
            self.pendingcalls_submit(context.l, context.n)
        finally:
            with context.lock:
                context.nFinished += 1
                nFinished = context.nFinished
            if nFinished == context.nThreads:
                context.event.set()

    def test_pendingcalls_non_threaded(self):
        #again, just using the main thread, likely they will all be dispatched at
        #once.  It is ok to ask for too many, because we loop until we find a slot.
        #the loop can be interrupted to dispatch.
        #there are only 32 dispatch slots, so we go for twice that!
        l = []
        n = 64
        self.pendingcalls_submit(l, n)
        self.pendingcalls_wait(l, n)


# Tests that run code through support.run_in_subinterp().
class SubinterpreterTest(unittest.TestCase):

    def test_subinterps(self):
        # The subinterpreter pickles id(sys.modules) and id(builtins) into
        # the write end of a pipe; both must differ from this interpreter's.
        import builtins
        r, w = os.pipe()
        code = """if 1:
            import sys, builtins, pickle
            with open({:d}, "wb") as f:
                pickle.dump(id(sys.modules), f)
                pickle.dump(id(builtins), f)
            """.format(w)
        with open(r, "rb") as f:
            ret = support.run_in_subinterp(code)
            self.assertEqual(ret, 0)
            self.assertNotEqual(pickle.load(f), id(sys.modules))
            self.assertNotEqual(pickle.load(f), id(builtins))

    def test_mutate_exception(self):
        """
        Exceptions saved in global module state get shared between
        individual module instances. This test checks whether or not
        a change in one interpreter's module gets reflected into the
        other ones.
        """
        import binascii

        support.run_in_subinterp("import binascii; binascii.Error.foobar = 'foobar'")

        self.assertFalse(hasattr(binascii.Error, "foobar"))


class TestThreadState(unittest.TestCase):

    @support.reap_threads
    def test_thread_state(self):
        # some extra thread-state tests driven via _testcapi
        def target():
            idents = []

            def callback():
                idents.append(threading.get_ident())

            _testcapi._test_thread_state(callback)
            # Two extra local names bound to callback.
            # NOTE(review): presumably keeps additional references alive
            # during the sleep below -- confirm intent.
            a = b = callback
            time.sleep(1)
            # Check our main thread is in the list exactly 3 times.
            self.assertEqual(idents.count(threading.get_ident()), 3,
                             "Couldn't find main thread correctly in the list")

        # Run once on the current thread, then once more on a new thread.
        target()
        t = threading.Thread(target=target)
        t.start()
        t.join()


class Test_testcapi(unittest.TestCase):
    # Copy every _testcapi attribute named 'test_*' (except those ending in
    # '_code') into the class namespace so unittest runs each one as a test.
    locals().update((name, getattr(_testcapi, name))
                    for name in dir(_testcapi)
                    if name.startswith('test_') and not name.endswith('_code'))


# Tests that run small scripts in a child interpreter with the PYTHONMALLOC
# environment variable set to self.PYTHONMALLOC; subclasses override that
# value to repeat the same tests under other allocator settings.
class PyMemDebugTests(unittest.TestCase):
    PYTHONMALLOC = 'debug'
    # '0x04c06e0' or '04C06E0'
    PTR_REGEX = r'(?:0x)?[0-9a-fA-F]+'

    def check(self, code):
        # Run code with "-c" in a child that is expected to fail and return
        # its stderr decoded as ASCII (undecodable bytes are replaced).
        with support.SuppressCrashReport():
            out = assert_python_failure('-c', code,
                                        PYTHONMALLOC=self.PYTHONMALLOC)
        stderr = out.err
        return stderr.decode('ascii', 'replace')

    def test_buffer_overflow(self):
        out = self.check('import _testcapi; _testcapi.pymem_buffer_overflow()')
        # NOTE(review): the indentation of the dump lines is matched with
        # ' +' rather than a fixed width -- confirm against the allocator's
        # actual output if an exact-width match is wanted.
        regex = (r"Debug memory block at address p={ptr}: API 'm'\n"
                 r" +16 bytes originally requested\n"
                 r" +The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
                 r" +The [0-9] pad bytes at tail={ptr} are not all FORBIDDENBYTE \(0x[0-9a-f]{{2}}\):\n"
                 r" +at tail\+0: 0x78 \*\*\* OUCH\n"
                 r" +at tail\+1: 0xfd\n"
                 r" +at tail\+2: 0xfd\n"
                 r" +.*\n"
                 r"( +The block was made by call #[0-9]+ to debug malloc/realloc.\n)?"
                 r" +Data at p: cd cd cd .*\n"
                 r"\n"
                 r"Enable tracemalloc to get the memory block allocation traceback\n"
                 r"\n"
                 r"Fatal Python error: bad trailing pad byte")
        # {{2}} above survives str.format() as the regex quantifier {2}.
        regex = regex.format(ptr=self.PTR_REGEX)
        regex = re.compile(regex, flags=re.DOTALL)
        self.assertRegex(out, regex)

    def test_api_misuse(self):
        out = self.check('import _testcapi; _testcapi.pymem_api_misuse()')
        # NOTE(review): the indentation of the dump lines is matched with
        # ' +' rather than a fixed width -- confirm against the allocator's
        # actual output if an exact-width match is wanted.
        regex = (r"Debug memory block at address p={ptr}: API 'm'\n"
                 r" +16 bytes originally requested\n"
                 r" +The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
                 r" +The [0-9] pad bytes at tail={ptr} are FORBIDDENBYTE, as expected.\n"
                 r"( +The block was made by call #[0-9]+ to debug malloc/realloc.\n)?"
                 r" +Data at p: cd cd cd .*\n"
                 r"\n"
                 r"Enable tracemalloc to get the memory block allocation traceback\n"
                 r"\n"
                 r"Fatal Python error: bad ID: Allocated using API 'm', verified using API 'r'\n")
        regex = regex.format(ptr=self.PTR_REGEX)
        self.assertRegex(out, regex)

    def check_malloc_without_gil(self, code):
        # Shared assertion for the two *_malloc_without_gil tests.
        out = self.check(code)
        expected = ('Fatal Python error: Python memory allocator called '
                    'without holding the GIL')
        self.assertIn(expected, out)

    def test_pymem_malloc_without_gil(self):
        # Debug hooks must raise an error if PyMem_Malloc() is called
        # without holding the GIL
        code = 'import _testcapi; _testcapi.pymem_malloc_without_gil()'
        self.check_malloc_without_gil(code)

    def test_pyobject_malloc_without_gil(self):
        # Debug hooks must raise an error if PyObject_Malloc() is called
        # without holding the GIL
        code = 'import _testcapi; _testcapi.pyobject_malloc_without_gil()'
        self.check_malloc_without_gil(code)

    def check_pyobject_is_freed(self, func_name):
        # Call _testcapi.<func_name>() in a child interpreter; the child
        # exits with 0 on success and 1 if _testcapi.error is raised.
        code = textwrap.dedent(f'''
            import gc, os, sys, _testcapi
            # Disable the GC to avoid crash on GC collection
            gc.disable()
            try:
                _testcapi.{func_name}()
                # Exit immediately to avoid a crash while deallocating
                # the invalid object
                os._exit(0)
            except _testcapi.error:
                os._exit(1)
        ''')
        assert_python_ok('-c', code, PYTHONMALLOC=self.PYTHONMALLOC)

    def test_pyobject_null_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_null_is_freed')

    def test_pyobject_uninitialized_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_uninitialized_is_freed')

    def test_pyobject_forbidden_bytes_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_forbidden_bytes_is_freed')

    def test_pyobject_freed_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_freed_is_freed')


class PyMemMallocDebugTests(PyMemDebugTests):
    PYTHONMALLOC = 'malloc_debug'


@unittest.skipUnless(support.with_pymalloc(), 'need pymalloc')
class PyMemPymallocDebugTests(PyMemDebugTests):
    PYTHONMALLOC = 'pymalloc_debug'


@unittest.skipUnless(Py_DEBUG, 'need Py_DEBUG')
class PyMemDefaultTests(PyMemDebugTests):
    # test default allocator of Python compiled in debug mode
    PYTHONMALLOC = ''


if __name__ == "__main__":
    unittest.main()