# Run the _testcapi module tests (tests for the Python/C API):  by defn,
# these are all functions _testcapi exports whose name begins with 'test_'.

from collections import OrderedDict
import os
import pickle
import random
import re
import subprocess
import sys
import textwrap
import threading
import time
import unittest
import weakref
import importlib.machinery
import importlib.util
from test import support
from test.support import MISSING_C_DOCSTRINGS
from test.support.script_helper import assert_python_failure, assert_python_ok
try:
    import _posixsubprocess
except ImportError:
    _posixsubprocess = None

# Skip this test if the _testcapi module isn't available.
_testcapi = support.import_module('_testcapi')

import _testinternalcapi

# Were we compiled --with-pydebug or with #define Py_DEBUG?
Py_DEBUG = hasattr(sys, 'gettotalrefcount')


def testfunction(self):
    """some doc"""
    return self


class InstanceMethod:
    """Fixture whose attributes are wrapped with _testcapi.instancemethod."""

    id = _testcapi.instancemethod(id)
    testfunction = _testcapi.instancemethod(testfunction)


class CAPITest(unittest.TestCase):
    """Assorted tests driven through helpers exported by _testcapi."""

    def test_instancemethod(self):
        inst = InstanceMethod()
        self.assertEqual(id(inst), inst.id())
        self.assertIs(inst.testfunction(), inst)
        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
        self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)

        # Attributes set through the class-level wrapper land on the
        # wrapped function; setting them through a bound instance fails.
        InstanceMethod.testfunction.attribute = "test"
        self.assertEqual(testfunction.attribute, "test")
        self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")

    def test_no_FatalError_infinite_loop(self):
        with support.SuppressCrashReport():
            p = subprocess.Popen([sys.executable, "-c",
                                  'import _testcapi;'
                                  '_testcapi.crash_no_current_thread()'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        (out, err) = p.communicate()
        self.assertEqual(out, b'')
        # This used to cause an infinite loop.
        self.assertTrue(err.rstrip().startswith(
                         b'Fatal Python error: '
                         b'PyThreadState_Get: '
                         b'the function must be called with the GIL held, '
                         b'but the GIL is released '
                         b'(the current Python thread state is NULL)'),
                        err)

    def test_memoryview_from_NULL_pointer(self):
        self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)

    def test_exc_info(self):
        raised_exception = ValueError("5")
        new_exc = TypeError("TEST")
        try:
            raise raised_exception
        except ValueError as e:
            tb = e.__traceback__
            # Swap in a new exception state, then restore the original one,
            # sampling sys.exc_info() at every step.
            orig_sys_exc_info = sys.exc_info()
            orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None)
            new_sys_exc_info = sys.exc_info()
            new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
            reset_sys_exc_info = sys.exc_info()

            self.assertEqual(orig_exc_info[1], e)

            self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb))
            self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
            self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None))
            self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
        else:
            self.fail("ValueError was not raised")

    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
    def test_seq_bytes_to_charp_array(self):
        # Issue #15732: crash in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return 1
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21)
        # Issue #15736: overflow in _PySequence_BytesToCharpArray()
        class Z(object):
            def __len__(self):
                return sys.maxsize
            def __getitem__(self, i):
                return b'x'
        self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21)

    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
    def test_subprocess_fork_exec(self):
        class Z(object):
            def __len__(self):
                return 1

        # Issue #15738: crash in subprocess_fork_exec()
        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
                          Z(),[b'1'],3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21)

    @unittest.skipIf(MISSING_C_DOCSTRINGS,
                     "Signature information for builtins requires docstrings")
    def test_docstring_signature_parsing(self):

        self.assertIsNone(_testcapi.no_docstring.__doc__)
        self.assertIsNone(_testcapi.no_docstring.__text_signature__)

        self.assertIsNone(_testcapi.docstring_empty.__doc__)
        self.assertIsNone(_testcapi.docstring_empty.__text_signature__)

        self.assertEqual(_testcapi.docstring_no_signature.__doc__,
            "This docstring has no signature.")
        self.assertIsNone(_testcapi.docstring_no_signature.__text_signature__)

        self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__,
            "docstring_with_invalid_signature($module, /, boo)\n"
            "\n"
            "This docstring has an invalid signature."
            )
        self.assertIsNone(_testcapi.docstring_with_invalid_signature.__text_signature__)

        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__,
            "docstring_with_invalid_signature2($module, /, boo)\n"
            "\n"
            "--\n"
            "\n"
            "This docstring also has an invalid signature."
            )
        self.assertIsNone(_testcapi.docstring_with_invalid_signature2.__text_signature__)

        self.assertEqual(_testcapi.docstring_with_signature.__doc__,
            "This docstring has a valid signature.")
        self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)")

        self.assertIsNone(_testcapi.docstring_with_signature_but_no_doc.__doc__)
        self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__,
            "($module, /, sig)")

        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__,
            "\nThis docstring has a valid signature and some extra newlines.")
        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__,
            "($module, /, parameter)")

    def test_c_type_with_matrix_multiplication(self):
        M = _testcapi.matmulType
        m1 = M()
        m2 = M()
        self.assertEqual(m1 @ m2, ("matmul", m1, m2))
        self.assertEqual(m1 @ 42, ("matmul", m1, 42))
        self.assertEqual(42 @ m1, ("matmul", 42, m1))
        o = m1
        o @= m2
        self.assertEqual(o, ("imatmul", m1, m2))
        o = m1
        o @= 42
        self.assertEqual(o, ("imatmul", m1, 42))
        o = 42
        o @= m1
        self.assertEqual(o, ("matmul", 42, m1))

    def test_c_type_with_ipow(self):
        # When the __ipow__ method of a type was implemented in C, using the
        # modulo param would cause segfaults.
        o = _testcapi.ipowType()
        self.assertEqual(o.__ipow__(1), (1, None))
        self.assertEqual(o.__ipow__(2, 2), (2, 2))

    def test_return_null_without_error(self):
        # Issue #23571: A function must not return NULL without setting an
        # error
        if Py_DEBUG:
            # In debug builds the check is fatal, so run it in a child
            # process and inspect its stderr.
            code = textwrap.dedent("""
                import _testcapi
                from test import support

                with support.SuppressCrashReport():
                    _testcapi.return_null_without_error()
            """)
            rc, out, err = assert_python_failure('-c', code)
            self.assertRegex(err.replace(b'\r', b''),
                             br'Fatal Python error: _Py_CheckFunctionResult: '
                                 br'a function returned NULL '
                                 br'without setting an error\n'
                             br'Python runtime state: initialized\n'
                             br'SystemError: <built-in function '
                                 br'return_null_without_error> returned NULL '
                                 br'without setting an error\n'
                             br'\n'
                             br'Current thread.*:\n'
                             br'  File .*", line 6 in <module>')
        else:
            with self.assertRaises(SystemError) as cm:
                _testcapi.return_null_without_error()
            self.assertRegex(str(cm.exception),
                             'return_null_without_error.* '
                             'returned NULL without setting an error')

    def test_return_result_with_error(self):
        # Issue #23571: A function must not return a result with an error set
        if Py_DEBUG:
            # In debug builds the check is fatal, so run it in a child
            # process and inspect its stderr.
            code = textwrap.dedent("""
                import _testcapi
                from test import support

                with support.SuppressCrashReport():
                    _testcapi.return_result_with_error()
            """)
            rc, out, err = assert_python_failure('-c', code)
            self.assertRegex(err.replace(b'\r', b''),
                             br'Fatal Python error: _Py_CheckFunctionResult: '
                                 br'a function returned a result '
                                 br'with an error set\n'
                             br'Python runtime state: initialized\n'
                             br'ValueError\n'
                             br'\n'
                             br'The above exception was the direct cause '
                                 br'of the following exception:\n'
                             br'\n'
                             br'SystemError: <built-in '
                                 br'function return_result_with_error> '
                                 br'returned a result with an error set\n'
                             br'\n'
                             br'Current thread.*:\n'
                             br'  File .*, line 6 in <module>')
        else:
            with self.assertRaises(SystemError) as cm:
                _testcapi.return_result_with_error()
            self.assertRegex(str(cm.exception),
                             'return_result_with_error.* '
                             'returned a result with an error set')

    def test_buildvalue_N(self):
        _testcapi.test_buildvalue_N()

    def test_set_nomemory(self):
        code = """if 1:
            import _testcapi

            class C(): pass

            # The first loop tests both functions and that remove_mem_hooks()
            # can be called twice in a row. The second loop checks a call to
            # set_nomemory() after a call to remove_mem_hooks(). The third
            # loop checks the start and stop arguments of set_nomemory().
            for outer_cnt in range(1, 4):
                start = 10 * outer_cnt
                for j in range(100):
                    if j == 0:
                        if outer_cnt != 3:
                            _testcapi.set_nomemory(start)
                        else:
                            _testcapi.set_nomemory(start, start + 1)
                    try:
                        C()
                    except MemoryError as e:
                        if outer_cnt != 3:
                            _testcapi.remove_mem_hooks()
                        print('MemoryError', outer_cnt, j)
                        _testcapi.remove_mem_hooks()
                        break
        """
        rc, out, err = assert_python_ok('-c', code)
        self.assertIn(b'MemoryError 1 10', out)
        self.assertIn(b'MemoryError 2 20', out)
        self.assertIn(b'MemoryError 3 30', out)

    def test_mapping_keys_values_items(self):
        # dict subclasses whose keys()/values()/items() return a list ...
        class Mapping1(dict):
            def keys(self):
                return list(super().keys())
            def values(self):
                return list(super().values())
            def items(self):
                return list(super().items())
        # ... or a tuple instead of a view object.
        class Mapping2(dict):
            def keys(self):
                return tuple(super().keys())
            def values(self):
                return tuple(super().values())
            def items(self):
                return tuple(super().items())
        dict_obj = {'foo': 1, 'bar': 2, 'spam': 3}

        for mapping in [{}, OrderedDict(), Mapping1(), Mapping2(),
                        dict_obj, OrderedDict(dict_obj),
                        Mapping1(dict_obj), Mapping2(dict_obj)]:
            self.assertListEqual(_testcapi.get_mapping_keys(mapping),
                                 list(mapping.keys()))
            self.assertListEqual(_testcapi.get_mapping_values(mapping),
                                 list(mapping.values()))
            self.assertListEqual(_testcapi.get_mapping_items(mapping),
                                 list(mapping.items()))

    def test_mapping_keys_values_items_bad_arg(self):
        self.assertRaises(AttributeError, _testcapi.get_mapping_keys, None)
        self.assertRaises(AttributeError, _testcapi.get_mapping_values, None)
        self.assertRaises(AttributeError, _testcapi.get_mapping_items, None)

        # The methods exist but return a non-iterable value.
        class BadMapping:
            def keys(self):
                return None
            def values(self):
                return None
            def items(self):
                return None
        bad_mapping = BadMapping()
        self.assertRaises(TypeError, _testcapi.get_mapping_keys, bad_mapping)
        self.assertRaises(TypeError, _testcapi.get_mapping_values, bad_mapping)
        self.assertRaises(TypeError, _testcapi.get_mapping_items, bad_mapping)

    @unittest.skipUnless(hasattr(_testcapi, 'negative_refcount'),
                         'need _testcapi.negative_refcount')
    def test_negative_refcount(self):
        # bpo-35059: Check that Py_DECREF() reports the correct filename
        # when calling _Py_NegativeRefcount() to abort Python.
        code = textwrap.dedent("""
            import _testcapi
            from test import support

            with support.SuppressCrashReport():
                _testcapi.negative_refcount()
        """)
        rc, out, err = assert_python_failure('-c', code)
        self.assertRegex(err,
                         br'_testcapimodule\.c:[0-9]+: '
                         br'_Py_NegativeRefcount: Assertion failed: '
                         br'object has negative ref count')

    def test_trashcan_subclass(self):
        # bpo-35983: Check that the trashcan mechanism for "list" is NOT
        # activated when its tp_dealloc is being called by a subclass
        from _testcapi import MyList
        L = None
        for i in range(1000):
            L = MyList((L,))

    @support.requires_resource('cpu')
    def test_trashcan_python_class1(self):
        self.do_test_trashcan_python_class(list)

    @support.requires_resource('cpu')
    def test_trashcan_python_class2(self):
        from _testcapi import MyList
        self.do_test_trashcan_python_class(MyList)

    def do_test_trashcan_python_class(self, base):
        # Check that the trashcan mechanism works properly for a Python
        # subclass of a class using the trashcan (this specific test assumes
        # that the base class "base" behaves like list)
        class PyList(base):
            # Count the number of PyList instances to verify that there is
            # no memory leak
            num = 0
            def __init__(self, *args):
                __class__.num += 1
                super().__init__(*args)
            def __del__(self):
                __class__.num -= 1

        for parity in (0, 1):
            L = None
            # We need in the order of 2**20 iterations here such that a
            # typical 8MB stack would overflow without the trashcan.
            for i in range(2**20):
                L = PyList((L,))
                L.attr = i
            if parity:
                # Add one additional nesting layer
                L = (L,)
            self.assertGreater(PyList.num, 0)
            del L
            self.assertEqual(PyList.num, 0)

    def test_subclass_of_heap_gc_ctype_with_tpdealloc_decrefs_once(self):
        class HeapGcCTypeSubclass(_testcapi.HeapGcCType):
            def __init__(self):
                self.value2 = 20
                super().__init__()

        subclass_instance = HeapGcCTypeSubclass()
        type_refcnt = sys.getrefcount(HeapGcCTypeSubclass)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # Test that the type reference count is only decremented once
        del subclass_instance
        self.assertEqual(type_refcnt - 1, sys.getrefcount(HeapGcCTypeSubclass))

    def test_subclass_of_heap_gc_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
        class A(_testcapi.HeapGcCType):
            def __init__(self):
                self.value2 = 20
                super().__init__()

        class B(A):
            def __init__(self):
                super().__init__()

            def __del__(self):
                # Re-type the dying instance and record both refcounts as
                # seen from inside the finalizer.
                self.__class__ = A
                A.refcnt_in_del = sys.getrefcount(A)
                B.refcnt_in_del = sys.getrefcount(B)

        subclass_instance = B()
        type_refcnt = sys.getrefcount(B)
        new_type_refcnt = sys.getrefcount(A)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        del subclass_instance

        # Test that setting __class__ modified the reference counts of the types
        self.assertEqual(type_refcnt - 1, B.refcnt_in_del)
        self.assertEqual(new_type_refcnt + 1, A.refcnt_in_del)

        # Test that the original type already has decreased its refcnt
        self.assertEqual(type_refcnt - 1, sys.getrefcount(B))

        # Test that subtype_dealloc decref the newly assigned __class__ only once
        self.assertEqual(new_type_refcnt, sys.getrefcount(A))

    def test_heaptype_with_dict(self):
        inst = _testcapi.HeapCTypeWithDict()
        inst.foo = 42
        self.assertEqual(inst.foo, 42)
        self.assertEqual(inst.dictobj, inst.__dict__)
        self.assertEqual(inst.dictobj, {"foo": 42})

        # A fresh instance starts with an empty __dict__.
        inst = _testcapi.HeapCTypeWithDict()
        self.assertEqual({}, inst.__dict__)

    def test_heaptype_with_negative_dict(self):
        inst = _testcapi.HeapCTypeWithNegativeDict()
        inst.foo = 42
        self.assertEqual(inst.foo, 42)
        self.assertEqual(inst.dictobj, inst.__dict__)
        self.assertEqual(inst.dictobj, {"foo": 42})

        # A fresh instance starts with an empty __dict__.
        inst = _testcapi.HeapCTypeWithNegativeDict()
        self.assertEqual({}, inst.__dict__)

    def test_heaptype_with_weakref(self):
        inst = _testcapi.HeapCTypeWithWeakref()
        ref = weakref.ref(inst)
        self.assertEqual(ref(), inst)
        self.assertEqual(inst.weakreflist, ref)

    def test_heaptype_with_buffer(self):
        inst = _testcapi.HeapCTypeWithBuffer()
        b = bytes(inst)
        self.assertEqual(b, b"1234")

    def test_c_subclass_of_heap_ctype_with_tpdealloc_decrefs_once(self):
        subclass_instance = _testcapi.HeapCTypeSubclass()
        type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # Test that the type reference count is only decremented once
        del subclass_instance
        self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclass))

    def test_c_subclass_of_heap_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
        subclass_instance = _testcapi.HeapCTypeSubclassWithFinalizer()
        type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer)
        new_type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # The tp_finalize slot will set __class__ to HeapCTypeSubclass
        del subclass_instance

        # Test that setting __class__ modified the reference counts of the types
        self.assertEqual(type_refcnt - 1, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del)
        self.assertEqual(new_type_refcnt + 1, _testcapi.HeapCTypeSubclass.refcnt_in_del)

        # Test that the original type already has decreased its refcnt
        self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer))

        # Test that subtype_dealloc decref the newly assigned __class__ only once
        self.assertEqual(new_type_refcnt, sys.getrefcount(_testcapi.HeapCTypeSubclass))

    def test_heaptype_with_setattro(self):
        obj = _testcapi.HeapCTypeSetattr()
        self.assertEqual(obj.pvalue, 10)
        obj.value = 12
        self.assertEqual(obj.pvalue, 12)
        del obj.value
        self.assertEqual(obj.pvalue, 0)

    def test_pynumber_tobase(self):
        from _testcapi import pynumber_tobase
        self.assertEqual(pynumber_tobase(123, 2), '0b1111011')
        self.assertEqual(pynumber_tobase(123, 8), '0o173')
        self.assertEqual(pynumber_tobase(123, 10), '123')
        self.assertEqual(pynumber_tobase(123, 16), '0x7b')
        self.assertEqual(pynumber_tobase(-123, 2), '-0b1111011')
        self.assertEqual(pynumber_tobase(-123, 8), '-0o173')
        self.assertEqual(pynumber_tobase(-123, 10), '-123')
        self.assertEqual(pynumber_tobase(-123, 16), '-0x7b')
        self.assertRaises(TypeError, pynumber_tobase, 123.0, 10)
        self.assertRaises(TypeError, pynumber_tobase, '123', 10)
        self.assertRaises(SystemError, pynumber_tobase, 123, 0)


class TestPendingCalls(unittest.TestCase):
    """Tests that submit callbacks via _testcapi._pending_threadfunc."""

    def pendingcalls_submit(self, l, n):
        # Submit n callbacks; each one appends a marker to the list l.
        def callback():
            #this function can be interrupted by thread switching so let's
            #use an atomic operation
            l.append(None)

        for i in range(n):
            time.sleep(random.random()*0.02) #0.01 secs on average
            #try submitting callback until successful.
            #rely on regular interrupt to flush queue if we are
            #unsuccessful.
            while not _testcapi._pending_threadfunc(callback):
                pass

    def pendingcalls_wait(self, l, n, context=None):
        # Spin until l holds n entries, i.e. until every submitted callback
        # has run.  When a context is given, the timeout counter only starts
        # ticking once context.event is set.
        count = 0
        while len(l) != n:
            #this busy loop is where we expect to be interrupted to
            #run our callbacks.  Note that callbacks are only run on the
            #main thread
            for i in range(1000):
                a = i*i
            if context and not context.event.is_set():
                continue
            count += 1
            self.assertLess(count, 10000,
                "timeout waiting for %i callbacks, got %i"%(n, len(l)))

    def test_pendingcalls_threaded(self):

        #do every callback on a separate thread
        n = 32 #total callbacks
        class foo(object):pass
        context = foo()
        context.l = []
        context.n = 2 #submits per thread
        context.nThreads = n // context.n
        context.nFinished = 0
        context.lock = threading.Lock()
        context.event = threading.Event()

        threads = [threading.Thread(target=self.pendingcalls_thread,
                                    args=(context,))
                   for i in range(context.nThreads)]
        with support.start_threads(threads):
            self.pendingcalls_wait(context.l, n, context)

    def pendingcalls_thread(self, context):
        # Worker body: submit this thread's callbacks, then record completion
        # under the lock.  The last thread to finish sets context.event.
        try:
            self.pendingcalls_submit(context.l, context.n)
        finally:
            with context.lock:
                context.nFinished += 1
                nFinished = context.nFinished
            if nFinished == context.nThreads:
                context.event.set()

    def test_pendingcalls_non_threaded(self):
        #again, just using the main thread, likely they will all be dispatched at
        #once.  It is ok to ask for too many, because we loop until we find a slot.
        #the loop can be interrupted to dispatch.
        #there are only 32 dispatch slots, so we go for twice that!
        l = []
        n = 64
        self.pendingcalls_submit(l, n)
        self.pendingcalls_wait(l, n)


class SubinterpreterTest(unittest.TestCase):
    """Tests that run code through support.run_in_subinterp."""

    def test_subinterps(self):
        import builtins
        # The subinterpreter reports its object ids back through a pipe.
        r, w = os.pipe()
        code = """if 1:
            import sys, builtins, pickle
            with open({:d}, "wb") as f:
                pickle.dump(id(sys.modules), f)
                pickle.dump(id(builtins), f)
            """.format(w)
        with open(r, "rb") as f:
            ret = support.run_in_subinterp(code)
            self.assertEqual(ret, 0)
            self.assertNotEqual(pickle.load(f), id(sys.modules))
            self.assertNotEqual(pickle.load(f), id(builtins))

    def test_subinterps_recent_language_features(self):
        r, w = os.pipe()
        code = """if 1:
            import pickle
            with open({:d}, "wb") as f:

                @(lambda x:x)  # Py 3.9
                def noop(x): return x

                a = (b := f'1{{2}}3') + noop('x')  # Py 3.8 (:=) / 3.6 (f'')

                async def foo(arg): return await arg  # Py 3.5

                pickle.dump(dict(a=a, b=b), f)
            """.format(w)

        with open(r, "rb") as f:
            ret = support.run_in_subinterp(code)
            self.assertEqual(ret, 0)
            self.assertEqual(pickle.load(f), {'a': '123x', 'b': '123'})

    def test_mutate_exception(self):
        """
        Exceptions saved in global module state get shared between
        individual module instances. This test checks whether or not
        a change in one interpreter's module gets reflected into the
        other ones.
        """
        import binascii

        support.run_in_subinterp("import binascii; binascii.Error.foobar = 'foobar'")

        self.assertFalse(hasattr(binascii.Error, "foobar"))


class TestThreadState(unittest.TestCase):
    """Thread-state test driven via _testcapi._test_thread_state."""

    @support.reap_threads
    def test_thread_state(self):
        # some extra thread-state tests driven via _testcapi
        def target():
            idents = []

            def callback():
                idents.append(threading.get_ident())

            _testcapi._test_thread_state(callback)
            a = b = callback
            time.sleep(1)
            # Check our main thread is in the list exactly 3 times.
            self.assertEqual(idents.count(threading.get_ident()), 3,
                             "Couldn't find main thread correctly in the list")

        # Run once on the current thread and once on a new thread.
        target()
        t = threading.Thread(target=target)
        t.start()
        t.join()


class Test_testcapi(unittest.TestCase):
    """Exposes every _testcapi 'test_*' function (except '*_code') as a test."""

    locals().update((name, getattr(_testcapi, name))
                    for name in dir(_testcapi)
                    if name.startswith('test_') and not name.endswith('_code'))


class Test_testinternalcapi(unittest.TestCase):
    """Exposes every _testinternalcapi 'test_*' function as a test."""

    locals().update((name, getattr(_testinternalcapi, name))
                    for name in dir(_testinternalcapi)
                    if name.startswith('test_'))


class PyMemDebugTests(unittest.TestCase):
    """Memory-debug-hook tests run in a child process with PYTHONMALLOC set.

    Subclasses override PYTHONMALLOC to repeat the tests with another
    allocator setting.
    """

    PYTHONMALLOC = 'debug'
    # '0x04c06e0' or '04C06E0'
    PTR_REGEX = r'(?:0x)?[0-9a-fA-F]+'

    def check(self, code):
        # Run code in a child interpreter that is expected to fail and
        # return its stderr decoded as text.
        with support.SuppressCrashReport():
            out = assert_python_failure('-c', code,
                                        PYTHONMALLOC=self.PYTHONMALLOC)
        stderr = out.err
        return stderr.decode('ascii', 'replace')

    def test_buffer_overflow(self):
        out = self.check('import _testcapi; _testcapi.pymem_buffer_overflow()')
        regex = (r"Debug memory block at address p={ptr}: API 'm'\n"
                 r"    16 bytes originally requested\n"
                 r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
                 r"    The [0-9] pad bytes at tail={ptr} are not all FORBIDDENBYTE \(0x[0-9a-f]{{2}}\):\n"
                 r"        at tail\+0: 0x78 \*\*\* OUCH\n"
                 r"        at tail\+1: 0xfd\n"
                 r"        at tail\+2: 0xfd\n"
                 r"        .*\n"
                 r"(    The block was made by call #[0-9]+ to debug malloc/realloc.\n)?"
                 r"    Data at p: cd cd cd .*\n"
                 r"\n"
                 r"Enable tracemalloc to get the memory block allocation traceback\n"
                 r"\n"
                 r"Fatal Python error: _PyMem_DebugRawFree: bad trailing pad byte")
        regex = regex.format(ptr=self.PTR_REGEX)
        # DOTALL lets the '.*' lines span any further 'at tail+N' rows.
        regex = re.compile(regex, flags=re.DOTALL)
        self.assertRegex(out, regex)

    def test_api_misuse(self):
        out = self.check('import _testcapi; _testcapi.pymem_api_misuse()')
        regex = (r"Debug memory block at address p={ptr}: API 'm'\n"
                 r"    16 bytes originally requested\n"
                 r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
                 r"    The [0-9] pad bytes at tail={ptr} are FORBIDDENBYTE, as expected.\n"
                 r"(    The block was made by call #[0-9]+ to debug malloc/realloc.\n)?"
                 r"    Data at p: cd cd cd .*\n"
                 r"\n"
                 r"Enable tracemalloc to get the memory block allocation traceback\n"
                 r"\n"
                 r"Fatal Python error: _PyMem_DebugRawFree: bad ID: Allocated using API 'm', verified using API 'r'\n")
        regex = regex.format(ptr=self.PTR_REGEX)
        self.assertRegex(out, regex)

    def check_malloc_without_gil(self, code):
        out = self.check(code)
        expected = ('Fatal Python error: _PyMem_DebugMalloc: '
                    'Python memory allocator called without holding the GIL')
        self.assertIn(expected, out)

    def test_pymem_malloc_without_gil(self):
        # Debug hooks must raise an error if PyMem_Malloc() is called
        # without holding the GIL
        code = 'import _testcapi; _testcapi.pymem_malloc_without_gil()'
        self.check_malloc_without_gil(code)

    def test_pyobject_malloc_without_gil(self):
        # Debug hooks must raise an error if PyObject_Malloc() is called
        # without holding the GIL
        code = 'import _testcapi; _testcapi.pyobject_malloc_without_gil()'
        self.check_malloc_without_gil(code)

    def check_pyobject_is_freed(self, func_name):
        # Call _testcapi.<func_name>() in a child process; the child exits
        # with 0 on success and 1 if _testcapi.error is raised.
        code = textwrap.dedent(f'''
            import gc, os, sys, _testcapi
            # Disable the GC to avoid crash on GC collection
            gc.disable()
            try:
                _testcapi.{func_name}()
                # Exit immediately to avoid a crash while deallocating
                # the invalid object
                os._exit(0)
            except _testcapi.error:
                os._exit(1)
        ''')
        assert_python_ok('-c', code, PYTHONMALLOC=self.PYTHONMALLOC)

    def test_pyobject_null_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_null_is_freed')

    def test_pyobject_uninitialized_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_uninitialized_is_freed')

    def test_pyobject_forbidden_bytes_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_forbidden_bytes_is_freed')

    def test_pyobject_freed_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_freed_is_freed')


class PyMemMallocDebugTests(PyMemDebugTests):
    PYTHONMALLOC = 'malloc_debug'


@unittest.skipUnless(support.with_pymalloc(), 'need pymalloc')
class PyMemPymallocDebugTests(PyMemDebugTests):
    PYTHONMALLOC = 'pymalloc_debug'


@unittest.skipUnless(Py_DEBUG, 'need Py_DEBUG')
class PyMemDefaultTests(PyMemDebugTests):
    # test default allocator of Python compiled in debug mode
    PYTHONMALLOC = ''


class Test_ModuleStateAccess(unittest.TestCase):
    """Test access to module state (PEP 573)"""

    # The C part of the tests lives in _testmultiphase, in a module called
    # _testmultiphase_meth_state_access.
    # This module has multi-phase initialization, unlike _testcapi.

    def setUp(self):
        # Load the state-access module by name from the shared library
        # that provides _testmultiphase.
        fullname = '_testmultiphase_meth_state_access'  # XXX
        origin = importlib.util.find_spec('_testmultiphase').origin
        loader = importlib.machinery.ExtensionFileLoader(fullname, origin)
        spec = importlib.util.spec_from_loader(fullname, loader)
        module = importlib.util.module_from_spec(spec)
        loader.exec_module(module)
        self.module = module

    def test_subclass_get_module(self):
        """PyType_GetModule for defining_class"""
        class StateAccessType_Subclass(self.module.StateAccessType):
            pass

        instance = StateAccessType_Subclass()
        self.assertIs(instance.get_defining_module(), self.module)

    def test_subclass_get_module_with_super(self):
        class StateAccessType_Subclass(self.module.StateAccessType):
            def get_defining_module(self):
                return super().get_defining_module()

        instance = StateAccessType_Subclass()
        self.assertIs(instance.get_defining_module(), self.module)

    def test_state_access(self):
        """Checks methods defined with and without argument clinic

        This tests a no-arg method (get_count) and a method with
        both a positional and keyword argument.
        """

        a = self.module.StateAccessType()
        b = self.module.StateAccessType()

        methods = {
            'clinic': a.increment_count_clinic,
            'noclinic': a.increment_count_noclinic,
        }

        for name, increment_count in methods.items():
            with self.subTest(name):
                # Both instances must always report the same count.
                self.assertEqual(a.get_count(), b.get_count())
                self.assertEqual(a.get_count(), 0)

                increment_count()
                self.assertEqual(a.get_count(), b.get_count())
                self.assertEqual(a.get_count(), 1)

                increment_count(3)
                self.assertEqual(a.get_count(), b.get_count())
                self.assertEqual(a.get_count(), 4)

                increment_count(-2, twice=True)
                self.assertEqual(a.get_count(), b.get_count())
                self.assertEqual(a.get_count(), 0)

                with self.assertRaises(TypeError):
                    increment_count(thrice=3)

                with self.assertRaises(TypeError):
                    increment_count(1, 2, 3)


if __name__ == "__main__":
    unittest.main()