1# Run the _testcapi module tests (tests for the Python/C API): by defn, 2# these are all functions _testcapi exports whose name begins with 'test_'. 3 4from collections import OrderedDict 5import os 6import pickle 7import random 8import re 9import subprocess 10import sys 11import sysconfig 12import textwrap 13import threading 14import time 15import unittest 16from test import support 17from test.support import MISSING_C_DOCSTRINGS 18from test.support.script_helper import assert_python_failure, assert_python_ok 19try: 20 import _posixsubprocess 21except ImportError: 22 _posixsubprocess = None 23 24# Skip this test if the _testcapi module isn't available. 25_testcapi = support.import_module('_testcapi') 26 27# Were we compiled --with-pydebug or with #define Py_DEBUG? 28Py_DEBUG = hasattr(sys, 'gettotalrefcount') 29 30 31def testfunction(self): 32 """some doc""" 33 return self 34 35class InstanceMethod: 36 id = _testcapi.instancemethod(id) 37 testfunction = _testcapi.instancemethod(testfunction) 38 39class CAPITest(unittest.TestCase): 40 41 def test_instancemethod(self): 42 inst = InstanceMethod() 43 self.assertEqual(id(inst), inst.id()) 44 self.assertTrue(inst.testfunction() is inst) 45 self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__) 46 self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__) 47 48 InstanceMethod.testfunction.attribute = "test" 49 self.assertEqual(testfunction.attribute, "test") 50 self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test") 51 52 def test_no_FatalError_infinite_loop(self): 53 with support.SuppressCrashReport(): 54 p = subprocess.Popen([sys.executable, "-c", 55 'import _testcapi;' 56 '_testcapi.crash_no_current_thread()'], 57 stdout=subprocess.PIPE, 58 stderr=subprocess.PIPE) 59 (out, err) = p.communicate() 60 self.assertEqual(out, b'') 61 # This used to cause an infinite loop. 62 self.assertTrue(err.rstrip().startswith( 63 b'Fatal Python error:' 64 b' PyThreadState_Get: no current thread')) 65 66 def test_memoryview_from_NULL_pointer(self): 67 self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer) 68 69 def test_exc_info(self): 70 raised_exception = ValueError("5") 71 new_exc = TypeError("TEST") 72 try: 73 raise raised_exception 74 except ValueError as e: 75 tb = e.__traceback__ 76 orig_sys_exc_info = sys.exc_info() 77 orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None) 78 new_sys_exc_info = sys.exc_info() 79 new_exc_info = _testcapi.set_exc_info(*orig_exc_info) 80 reset_sys_exc_info = sys.exc_info() 81 82 self.assertEqual(orig_exc_info[1], e) 83 84 self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb)) 85 self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info) 86 self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info) 87 self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None)) 88 self.assertSequenceEqual(new_sys_exc_info, new_exc_info) 89 else: 90 self.assertTrue(False) 91 92 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.') 93 def test_seq_bytes_to_charp_array(self): 94 # Issue #15732: crash in _PySequence_BytesToCharpArray() 95 class Z(object): 96 def __len__(self): 97 return 1 98 self.assertRaises(TypeError, _posixsubprocess.fork_exec, 99 1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17) 100 # Issue #15736: overflow in _PySequence_BytesToCharpArray() 101 class Z(object): 102 def __len__(self): 103 return sys.maxsize 104 def __getitem__(self, i): 105 return b'x' 106 self.assertRaises(MemoryError, _posixsubprocess.fork_exec, 107 1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17) 108 109 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.') 110 def test_subprocess_fork_exec(self): 111 class Z(object): 112 def __len__(self): 113 return 1 114 115 # Issue #15738: crash in subprocess_fork_exec() 116 self.assertRaises(TypeError, _posixsubprocess.fork_exec, 117 Z(),[b'1'],3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17) 118 119 @unittest.skipIf(MISSING_C_DOCSTRINGS, 120 "Signature information for builtins requires docstrings") 121 def test_docstring_signature_parsing(self): 122 123 self.assertEqual(_testcapi.no_docstring.__doc__, None) 124 self.assertEqual(_testcapi.no_docstring.__text_signature__, None) 125 126 self.assertEqual(_testcapi.docstring_empty.__doc__, None) 127 self.assertEqual(_testcapi.docstring_empty.__text_signature__, None) 128 129 self.assertEqual(_testcapi.docstring_no_signature.__doc__, 130 "This docstring has no signature.") 131 self.assertEqual(_testcapi.docstring_no_signature.__text_signature__, None) 132 133 self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__, 134 "docstring_with_invalid_signature($module, /, boo)\n" 135 "\n" 136 "This docstring has an invalid signature." 137 ) 138 self.assertEqual(_testcapi.docstring_with_invalid_signature.__text_signature__, None) 139 140 self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__, 141 "docstring_with_invalid_signature2($module, /, boo)\n" 142 "\n" 143 "--\n" 144 "\n" 145 "This docstring also has an invalid signature." 146 ) 147 self.assertEqual(_testcapi.docstring_with_invalid_signature2.__text_signature__, None) 148 149 self.assertEqual(_testcapi.docstring_with_signature.__doc__, 150 "This docstring has a valid signature.") 151 self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)") 152 153 self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__doc__, None) 154 self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__, 155 "($module, /, sig)") 156 157 self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__, 158 "\nThis docstring has a valid signature and some extra newlines.") 159 self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__, 160 "($module, /, parameter)") 161 162 def test_c_type_with_matrix_multiplication(self): 163 M = _testcapi.matmulType 164 m1 = M() 165 m2 = M() 166 self.assertEqual(m1 @ m2, ("matmul", m1, m2)) 167 self.assertEqual(m1 @ 42, ("matmul", m1, 42)) 168 self.assertEqual(42 @ m1, ("matmul", 42, m1)) 169 o = m1 170 o @= m2 171 self.assertEqual(o, ("imatmul", m1, m2)) 172 o = m1 173 o @= 42 174 self.assertEqual(o, ("imatmul", m1, 42)) 175 o = 42 176 o @= m1 177 self.assertEqual(o, ("matmul", 42, m1)) 178 179 def test_return_null_without_error(self): 180 # Issue #23571: A function must not return NULL without setting an 181 # error 182 if Py_DEBUG: 183 code = textwrap.dedent(""" 184 import _testcapi 185 from test import support 186 187 with support.SuppressCrashReport(): 188 _testcapi.return_null_without_error() 189 """) 190 rc, out, err = assert_python_failure('-c', code) 191 self.assertRegex(err.replace(b'\r', b''), 192 br'Fatal Python error: a function returned NULL ' 193 br'without setting an error\n' 194 br'SystemError: <built-in function ' 195 br'return_null_without_error> returned NULL ' 196 br'without setting an error\n' 197 br'\n' 198 br'Current thread.*:\n' 199 br' File .*", line 6 in <module>') 200 else: 201 with self.assertRaises(SystemError) as cm: 202 _testcapi.return_null_without_error() 203 self.assertRegex(str(cm.exception), 204 'return_null_without_error.* ' 205 'returned NULL without setting an error') 206 207 def test_return_result_with_error(self): 208 # Issue #23571: A function must not return a result with an error set 209 if Py_DEBUG: 210 code = textwrap.dedent(""" 211 import _testcapi 212 from test import support 213 214 with support.SuppressCrashReport(): 215 _testcapi.return_result_with_error() 216 """) 217 rc, out, err = assert_python_failure('-c', code) 218 self.assertRegex(err.replace(b'\r', b''), 219 br'Fatal Python error: a function returned a ' 220 br'result with an error set\n' 221 br'ValueError\n' 222 br'\n' 223 br'The above exception was the direct cause ' 224 br'of the following exception:\n' 225 br'\n' 226 br'SystemError: <built-in ' 227 br'function return_result_with_error> ' 228 br'returned a result with an error set\n' 229 br'\n' 230 br'Current thread.*:\n' 231 br' File .*, line 6 in <module>') 232 else: 233 with self.assertRaises(SystemError) as cm: 234 _testcapi.return_result_with_error() 235 self.assertRegex(str(cm.exception), 236 'return_result_with_error.* ' 237 'returned a result with an error set') 238 239 def test_buildvalue_N(self): 240 _testcapi.test_buildvalue_N() 241 242 def test_set_nomemory(self): 243 code = """if 1: 244 import _testcapi 245 246 class C(): pass 247 248 # The first loop tests both functions and that remove_mem_hooks() 249 # can be called twice in a row. The second loop checks a call to 250 # set_nomemory() after a call to remove_mem_hooks(). The third 251 # loop checks the start and stop arguments of set_nomemory(). 252 for outer_cnt in range(1, 4): 253 start = 10 * outer_cnt 254 for j in range(100): 255 if j == 0: 256 if outer_cnt != 3: 257 _testcapi.set_nomemory(start) 258 else: 259 _testcapi.set_nomemory(start, start + 1) 260 try: 261 C() 262 except MemoryError as e: 263 if outer_cnt != 3: 264 _testcapi.remove_mem_hooks() 265 print('MemoryError', outer_cnt, j) 266 _testcapi.remove_mem_hooks() 267 break 268 """ 269 rc, out, err = assert_python_ok('-c', code) 270 self.assertIn(b'MemoryError 1 10', out) 271 self.assertIn(b'MemoryError 2 20', out) 272 self.assertIn(b'MemoryError 3 30', out) 273 274 def test_mapping_keys_values_items(self): 275 class Mapping1(dict): 276 def keys(self): 277 return list(super().keys()) 278 def values(self): 279 return list(super().values()) 280 def items(self): 281 return list(super().items()) 282 class Mapping2(dict): 283 def keys(self): 284 return tuple(super().keys()) 285 def values(self): 286 return tuple(super().values()) 287 def items(self): 288 return tuple(super().items()) 289 dict_obj = {'foo': 1, 'bar': 2, 'spam': 3} 290 291 for mapping in [{}, OrderedDict(), Mapping1(), Mapping2(), 292 dict_obj, OrderedDict(dict_obj), 293 Mapping1(dict_obj), Mapping2(dict_obj)]: 294 self.assertListEqual(_testcapi.get_mapping_keys(mapping), 295 list(mapping.keys())) 296 self.assertListEqual(_testcapi.get_mapping_values(mapping), 297 list(mapping.values())) 298 self.assertListEqual(_testcapi.get_mapping_items(mapping), 299 list(mapping.items())) 300 301 def test_mapping_keys_values_items_bad_arg(self): 302 self.assertRaises(AttributeError, _testcapi.get_mapping_keys, None) 303 self.assertRaises(AttributeError, _testcapi.get_mapping_values, None) 304 self.assertRaises(AttributeError, _testcapi.get_mapping_items, None) 305 306 class BadMapping: 307 def keys(self): 308 return None 309 def values(self): 310 return None 311 def items(self): 312 return None 313 bad_mapping = BadMapping() 314 self.assertRaises(TypeError, _testcapi.get_mapping_keys, bad_mapping) 315 self.assertRaises(TypeError, _testcapi.get_mapping_values, bad_mapping) 316 self.assertRaises(TypeError, _testcapi.get_mapping_items, bad_mapping) 317 318 319class TestPendingCalls(unittest.TestCase): 320 321 def pendingcalls_submit(self, l, n): 322 def callback(): 323 #this function can be interrupted by thread switching so let's 324 #use an atomic operation 325 l.append(None) 326 327 for i in range(n): 328 time.sleep(random.random()*0.02) #0.01 secs on average 329 #try submitting callback until successful. 330 #rely on regular interrupt to flush queue if we are 331 #unsuccessful. 332 while True: 333 if _testcapi._pending_threadfunc(callback): 334 break; 335 336 def pendingcalls_wait(self, l, n, context = None): 337 #now, stick around until l[0] has grown to 10 338 count = 0; 339 while len(l) != n: 340 #this busy loop is where we expect to be interrupted to 341 #run our callbacks. Note that callbacks are only run on the 342 #main thread 343 if False and support.verbose: 344 print("(%i)"%(len(l),),) 345 for i in range(1000): 346 a = i*i 347 if context and not context.event.is_set(): 348 continue 349 count += 1 350 self.assertTrue(count < 10000, 351 "timeout waiting for %i callbacks, got %i"%(n, len(l))) 352 if False and support.verbose: 353 print("(%i)"%(len(l),)) 354 355 def test_pendingcalls_threaded(self): 356 357 #do every callback on a separate thread 358 n = 32 #total callbacks 359 threads = [] 360 class foo(object):pass 361 context = foo() 362 context.l = [] 363 context.n = 2 #submits per thread 364 context.nThreads = n // context.n 365 context.nFinished = 0 366 context.lock = threading.Lock() 367 context.event = threading.Event() 368 369 threads = [threading.Thread(target=self.pendingcalls_thread, 370 args=(context,)) 371 for i in range(context.nThreads)] 372 with support.start_threads(threads): 373 self.pendingcalls_wait(context.l, n, context) 374 375 def pendingcalls_thread(self, context): 376 try: 377 self.pendingcalls_submit(context.l, context.n) 378 finally: 379 with context.lock: 380 context.nFinished += 1 381 nFinished = context.nFinished 382 if False and support.verbose: 383 print("finished threads: ", nFinished) 384 if nFinished == context.nThreads: 385 context.event.set() 386 387 def test_pendingcalls_non_threaded(self): 388 #again, just using the main thread, likely they will all be dispatched at 389 #once. It is ok to ask for too many, because we loop until we find a slot. 390 #the loop can be interrupted to dispatch. 391 #there are only 32 dispatch slots, so we go for twice that! 392 l = [] 393 n = 64 394 self.pendingcalls_submit(l, n) 395 self.pendingcalls_wait(l, n) 396 397 398class SubinterpreterTest(unittest.TestCase): 399 400 def test_subinterps(self): 401 import builtins 402 r, w = os.pipe() 403 code = """if 1: 404 import sys, builtins, pickle 405 with open({:d}, "wb") as f: 406 pickle.dump(id(sys.modules), f) 407 pickle.dump(id(builtins), f) 408 """.format(w) 409 with open(r, "rb") as f: 410 ret = support.run_in_subinterp(code) 411 self.assertEqual(ret, 0) 412 self.assertNotEqual(pickle.load(f), id(sys.modules)) 413 self.assertNotEqual(pickle.load(f), id(builtins)) 414 415 416class TestThreadState(unittest.TestCase): 417 418 @support.reap_threads 419 def test_thread_state(self): 420 # some extra thread-state tests driven via _testcapi 421 def target(): 422 idents = [] 423 424 def callback(): 425 idents.append(threading.get_ident()) 426 427 _testcapi._test_thread_state(callback) 428 a = b = callback 429 time.sleep(1) 430 # Check our main thread is in the list exactly 3 times. 431 self.assertEqual(idents.count(threading.get_ident()), 3, 432 "Couldn't find main thread correctly in the list") 433 434 target() 435 t = threading.Thread(target=target) 436 t.start() 437 t.join() 438 439 440class Test_testcapi(unittest.TestCase): 441 locals().update((name, getattr(_testcapi, name)) 442 for name in dir(_testcapi) 443 if name.startswith('test_') and not name.endswith('_code')) 444 445 446class PyMemDebugTests(unittest.TestCase): 447 PYTHONMALLOC = 'debug' 448 # '0x04c06e0' or '04C06E0' 449 PTR_REGEX = r'(?:0x)?[0-9a-fA-F]+' 450 451 def check(self, code): 452 with support.SuppressCrashReport(): 453 out = assert_python_failure('-c', code, 454 PYTHONMALLOC=self.PYTHONMALLOC) 455 stderr = out.err 456 return stderr.decode('ascii', 'replace') 457 458 def test_buffer_overflow(self): 459 out = self.check('import _testcapi; _testcapi.pymem_buffer_overflow()') 460 regex = (r"Debug memory block at address p={ptr}: API 'm'\n" 461 r" 16 bytes originally requested\n" 462 r" The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n" 463 r" The [0-9] pad bytes at tail={ptr} are not all FORBIDDENBYTE \(0x[0-9a-f]{{2}}\):\n" 464 r" at tail\+0: 0x78 \*\*\* OUCH\n" 465 r" at tail\+1: 0xfb\n" 466 r" at tail\+2: 0xfb\n" 467 r" .*\n" 468 r" The block was made by call #[0-9]+ to debug malloc/realloc.\n" 469 r" Data at p: cb cb cb .*\n" 470 r"\n" 471 r"Enable tracemalloc to get the memory block allocation traceback\n" 472 r"\n" 473 r"Fatal Python error: bad trailing pad byte") 474 regex = regex.format(ptr=self.PTR_REGEX) 475 regex = re.compile(regex, flags=re.DOTALL) 476 self.assertRegex(out, regex) 477 478 def test_api_misuse(self): 479 out = self.check('import _testcapi; _testcapi.pymem_api_misuse()') 480 regex = (r"Debug memory block at address p={ptr}: API 'm'\n" 481 r" 16 bytes originally requested\n" 482 r" The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n" 483 r" The [0-9] pad bytes at tail={ptr} are FORBIDDENBYTE, as expected.\n" 484 r" The block was made by call #[0-9]+ to debug malloc/realloc.\n" 485 r" Data at p: cb cb cb .*\n" 486 r"\n" 487 r"Enable tracemalloc to get the memory block allocation traceback\n" 488 r"\n" 489 r"Fatal Python error: bad ID: Allocated using API 'm', verified using API 'r'\n") 490 regex = regex.format(ptr=self.PTR_REGEX) 491 self.assertRegex(out, regex) 492 493 def check_malloc_without_gil(self, code): 494 out = self.check(code) 495 expected = ('Fatal Python error: Python memory allocator called ' 496 'without holding the GIL') 497 self.assertIn(expected, out) 498 499 def test_pymem_malloc_without_gil(self): 500 # Debug hooks must raise an error if PyMem_Malloc() is called 501 # without holding the GIL 502 code = 'import _testcapi; _testcapi.pymem_malloc_without_gil()' 503 self.check_malloc_without_gil(code) 504 505 def test_pyobject_malloc_without_gil(self): 506 # Debug hooks must raise an error if PyObject_Malloc() is called 507 # without holding the GIL 508 code = 'import _testcapi; _testcapi.pyobject_malloc_without_gil()' 509 self.check_malloc_without_gil(code) 510 511 512class PyMemMallocDebugTests(PyMemDebugTests): 513 PYTHONMALLOC = 'malloc_debug' 514 515 516@unittest.skipUnless(support.with_pymalloc(), 'need pymalloc') 517class PyMemPymallocDebugTests(PyMemDebugTests): 518 PYTHONMALLOC = 'pymalloc_debug' 519 520 521@unittest.skipUnless(Py_DEBUG, 'need Py_DEBUG') 522class PyMemDefaultTests(PyMemDebugTests): 523 # test default allocator of Python compiled in debug mode 524 PYTHONMALLOC = '' 525 526 527if __name__ == "__main__": 528 unittest.main() 529