1# Run the _testcapi module tests (tests for the Python/C API): by defn, 2# these are all functions _testcapi exports whose name begins with 'test_'. 3 4import os 5import pickle 6import random 7import re 8import subprocess 9import sys 10import sysconfig 11import textwrap 12import time 13import unittest 14from test import support 15from test.support import MISSING_C_DOCSTRINGS 16from test.support.script_helper import assert_python_failure 17try: 18 import _posixsubprocess 19except ImportError: 20 _posixsubprocess = None 21try: 22 import threading 23except ImportError: 24 threading = None 25# Skip this test if the _testcapi module isn't available. 26_testcapi = support.import_module('_testcapi') 27 28# Were we compiled --with-pydebug or with #define Py_DEBUG? 29Py_DEBUG = hasattr(sys, 'gettotalrefcount') 30 31 32def testfunction(self): 33 """some doc""" 34 return self 35 36class InstanceMethod: 37 id = _testcapi.instancemethod(id) 38 testfunction = _testcapi.instancemethod(testfunction) 39 40class CAPITest(unittest.TestCase): 41 42 def test_instancemethod(self): 43 inst = InstanceMethod() 44 self.assertEqual(id(inst), inst.id()) 45 self.assertTrue(inst.testfunction() is inst) 46 self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__) 47 self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__) 48 49 InstanceMethod.testfunction.attribute = "test" 50 self.assertEqual(testfunction.attribute, "test") 51 self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test") 52 53 @unittest.skipUnless(threading, 'Threading required for this test.') 54 def test_no_FatalError_infinite_loop(self): 55 with support.SuppressCrashReport(): 56 p = subprocess.Popen([sys.executable, "-c", 57 'import _testcapi;' 58 '_testcapi.crash_no_current_thread()'], 59 stdout=subprocess.PIPE, 60 stderr=subprocess.PIPE) 61 (out, err) = p.communicate() 62 self.assertEqual(out, b'') 63 # This used to cause an infinite loop. 64 self.assertTrue(err.rstrip().startswith( 65 b'Fatal Python error:' 66 b' PyThreadState_Get: no current thread')) 67 68 def test_memoryview_from_NULL_pointer(self): 69 self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer) 70 71 def test_exc_info(self): 72 raised_exception = ValueError("5") 73 new_exc = TypeError("TEST") 74 try: 75 raise raised_exception 76 except ValueError as e: 77 tb = e.__traceback__ 78 orig_sys_exc_info = sys.exc_info() 79 orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None) 80 new_sys_exc_info = sys.exc_info() 81 new_exc_info = _testcapi.set_exc_info(*orig_exc_info) 82 reset_sys_exc_info = sys.exc_info() 83 84 self.assertEqual(orig_exc_info[1], e) 85 86 self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb)) 87 self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info) 88 self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info) 89 self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None)) 90 self.assertSequenceEqual(new_sys_exc_info, new_exc_info) 91 else: 92 self.assertTrue(False) 93 94 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.') 95 def test_seq_bytes_to_charp_array(self): 96 # Issue #15732: crash in _PySequence_BytesToCharpArray() 97 class Z(object): 98 def __len__(self): 99 return 1 100 self.assertRaises(TypeError, _posixsubprocess.fork_exec, 101 1,Z(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17) 102 # Issue #15736: overflow in _PySequence_BytesToCharpArray() 103 class Z(object): 104 def __len__(self): 105 return sys.maxsize 106 def __getitem__(self, i): 107 return b'x' 108 self.assertRaises(MemoryError, _posixsubprocess.fork_exec, 109 1,Z(),3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17) 110 111 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.') 112 def test_subprocess_fork_exec(self): 113 class Z(object): 114 def __len__(self): 115 return 1 116 117 # Issue #15738: crash in subprocess_fork_exec() 118 self.assertRaises(TypeError, _posixsubprocess.fork_exec, 119 Z(),[b'1'],3,[1, 2],5,6,7,8,9,10,11,12,13,14,15,16,17) 120 121 @unittest.skipIf(MISSING_C_DOCSTRINGS, 122 "Signature information for builtins requires docstrings") 123 def test_docstring_signature_parsing(self): 124 125 self.assertEqual(_testcapi.no_docstring.__doc__, None) 126 self.assertEqual(_testcapi.no_docstring.__text_signature__, None) 127 128 self.assertEqual(_testcapi.docstring_empty.__doc__, None) 129 self.assertEqual(_testcapi.docstring_empty.__text_signature__, None) 130 131 self.assertEqual(_testcapi.docstring_no_signature.__doc__, 132 "This docstring has no signature.") 133 self.assertEqual(_testcapi.docstring_no_signature.__text_signature__, None) 134 135 self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__, 136 "docstring_with_invalid_signature($module, /, boo)\n" 137 "\n" 138 "This docstring has an invalid signature." 139 ) 140 self.assertEqual(_testcapi.docstring_with_invalid_signature.__text_signature__, None) 141 142 self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__, 143 "docstring_with_invalid_signature2($module, /, boo)\n" 144 "\n" 145 "--\n" 146 "\n" 147 "This docstring also has an invalid signature." 148 ) 149 self.assertEqual(_testcapi.docstring_with_invalid_signature2.__text_signature__, None) 150 151 self.assertEqual(_testcapi.docstring_with_signature.__doc__, 152 "This docstring has a valid signature.") 153 self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)") 154 155 self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__doc__, None) 156 self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__, 157 "($module, /, sig)") 158 159 self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__, 160 "\nThis docstring has a valid signature and some extra newlines.") 161 self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__, 162 "($module, /, parameter)") 163 164 def test_c_type_with_matrix_multiplication(self): 165 M = _testcapi.matmulType 166 m1 = M() 167 m2 = M() 168 self.assertEqual(m1 @ m2, ("matmul", m1, m2)) 169 self.assertEqual(m1 @ 42, ("matmul", m1, 42)) 170 self.assertEqual(42 @ m1, ("matmul", 42, m1)) 171 o = m1 172 o @= m2 173 self.assertEqual(o, ("imatmul", m1, m2)) 174 o = m1 175 o @= 42 176 self.assertEqual(o, ("imatmul", m1, 42)) 177 o = 42 178 o @= m1 179 self.assertEqual(o, ("matmul", 42, m1)) 180 181 def test_return_null_without_error(self): 182 # Issue #23571: A function must not return NULL without setting an 183 # error 184 if Py_DEBUG: 185 code = textwrap.dedent(""" 186 import _testcapi 187 from test import support 188 189 with support.SuppressCrashReport(): 190 _testcapi.return_null_without_error() 191 """) 192 rc, out, err = assert_python_failure('-c', code) 193 self.assertRegex(err.replace(b'\r', b''), 194 br'Fatal Python error: a function returned NULL ' 195 br'without setting an error\n' 196 br'SystemError: <built-in function ' 197 br'return_null_without_error> returned NULL ' 198 br'without setting an error\n' 199 br'\n' 200 br'Current thread.*:\n' 201 br' File .*", line 6 in <module>') 202 else: 203 with self.assertRaises(SystemError) as cm: 204 _testcapi.return_null_without_error() 205 self.assertRegex(str(cm.exception), 206 'return_null_without_error.* ' 207 'returned NULL without setting an error') 208 209 def test_return_result_with_error(self): 210 # Issue #23571: A function must not return a result with an error set 211 if Py_DEBUG: 212 code = textwrap.dedent(""" 213 import _testcapi 214 from test import support 215 216 with support.SuppressCrashReport(): 217 _testcapi.return_result_with_error() 218 """) 219 rc, out, err = assert_python_failure('-c', code) 220 self.assertRegex(err.replace(b'\r', b''), 221 br'Fatal Python error: a function returned a ' 222 br'result with an error set\n' 223 br'ValueError\n' 224 br'\n' 225 br'The above exception was the direct cause ' 226 br'of the following exception:\n' 227 br'\n' 228 br'SystemError: <built-in ' 229 br'function return_result_with_error> ' 230 br'returned a result with an error set\n' 231 br'\n' 232 br'Current thread.*:\n' 233 br' File .*, line 6 in <module>') 234 else: 235 with self.assertRaises(SystemError) as cm: 236 _testcapi.return_result_with_error() 237 self.assertRegex(str(cm.exception), 238 'return_result_with_error.* ' 239 'returned a result with an error set') 240 241 def test_buildvalue_N(self): 242 _testcapi.test_buildvalue_N() 243 244 245@unittest.skipUnless(threading, 'Threading required for this test.') 246class TestPendingCalls(unittest.TestCase): 247 248 def pendingcalls_submit(self, l, n): 249 def callback(): 250 #this function can be interrupted by thread switching so let's 251 #use an atomic operation 252 l.append(None) 253 254 for i in range(n): 255 time.sleep(random.random()*0.02) #0.01 secs on average 256 #try submitting callback until successful. 257 #rely on regular interrupt to flush queue if we are 258 #unsuccessful. 259 while True: 260 if _testcapi._pending_threadfunc(callback): 261 break; 262 263 def pendingcalls_wait(self, l, n, context = None): 264 #now, stick around until l[0] has grown to 10 265 count = 0; 266 while len(l) != n: 267 #this busy loop is where we expect to be interrupted to 268 #run our callbacks. Note that callbacks are only run on the 269 #main thread 270 if False and support.verbose: 271 print("(%i)"%(len(l),),) 272 for i in range(1000): 273 a = i*i 274 if context and not context.event.is_set(): 275 continue 276 count += 1 277 self.assertTrue(count < 10000, 278 "timeout waiting for %i callbacks, got %i"%(n, len(l))) 279 if False and support.verbose: 280 print("(%i)"%(len(l),)) 281 282 def test_pendingcalls_threaded(self): 283 284 #do every callback on a separate thread 285 n = 32 #total callbacks 286 threads = [] 287 class foo(object):pass 288 context = foo() 289 context.l = [] 290 context.n = 2 #submits per thread 291 context.nThreads = n // context.n 292 context.nFinished = 0 293 context.lock = threading.Lock() 294 context.event = threading.Event() 295 296 threads = [threading.Thread(target=self.pendingcalls_thread, 297 args=(context,)) 298 for i in range(context.nThreads)] 299 with support.start_threads(threads): 300 self.pendingcalls_wait(context.l, n, context) 301 302 def pendingcalls_thread(self, context): 303 try: 304 self.pendingcalls_submit(context.l, context.n) 305 finally: 306 with context.lock: 307 context.nFinished += 1 308 nFinished = context.nFinished 309 if False and support.verbose: 310 print("finished threads: ", nFinished) 311 if nFinished == context.nThreads: 312 context.event.set() 313 314 def test_pendingcalls_non_threaded(self): 315 #again, just using the main thread, likely they will all be dispatched at 316 #once. It is ok to ask for too many, because we loop until we find a slot. 317 #the loop can be interrupted to dispatch. 318 #there are only 32 dispatch slots, so we go for twice that! 319 l = [] 320 n = 64 321 self.pendingcalls_submit(l, n) 322 self.pendingcalls_wait(l, n) 323 324 325class SubinterpreterTest(unittest.TestCase): 326 327 def test_subinterps(self): 328 import builtins 329 r, w = os.pipe() 330 code = """if 1: 331 import sys, builtins, pickle 332 with open({:d}, "wb") as f: 333 pickle.dump(id(sys.modules), f) 334 pickle.dump(id(builtins), f) 335 """.format(w) 336 with open(r, "rb") as f: 337 ret = support.run_in_subinterp(code) 338 self.assertEqual(ret, 0) 339 self.assertNotEqual(pickle.load(f), id(sys.modules)) 340 self.assertNotEqual(pickle.load(f), id(builtins)) 341 342 343# Bug #6012 344class Test6012(unittest.TestCase): 345 def test(self): 346 self.assertEqual(_testcapi.argparsing("Hello", "World"), 1) 347 348 349class EmbeddingTests(unittest.TestCase): 350 def setUp(self): 351 here = os.path.abspath(__file__) 352 basepath = os.path.dirname(os.path.dirname(os.path.dirname(here))) 353 exename = "_testembed" 354 if sys.platform.startswith("win"): 355 ext = ("_d" if "_d" in sys.executable else "") + ".exe" 356 exename += ext 357 exepath = os.path.dirname(sys.executable) 358 else: 359 exepath = os.path.join(basepath, "Programs") 360 self.test_exe = exe = os.path.join(exepath, exename) 361 if not os.path.exists(exe): 362 self.skipTest("%r doesn't exist" % exe) 363 # This is needed otherwise we get a fatal error: 364 # "Py_Initialize: Unable to get the locale encoding 365 # LookupError: no codec search functions registered: can't find encoding" 366 self.oldcwd = os.getcwd() 367 os.chdir(basepath) 368 369 def tearDown(self): 370 os.chdir(self.oldcwd) 371 372 def run_embedded_interpreter(self, *args): 373 """Runs a test in the embedded interpreter""" 374 cmd = [self.test_exe] 375 cmd.extend(args) 376 p = subprocess.Popen(cmd, 377 stdout=subprocess.PIPE, 378 stderr=subprocess.PIPE, 379 universal_newlines=True) 380 (out, err) = p.communicate() 381 self.assertEqual(p.returncode, 0, 382 "bad returncode %d, stderr is %r" % 383 (p.returncode, err)) 384 return out, err 385 386 def test_subinterps(self): 387 # This is just a "don't crash" test 388 out, err = self.run_embedded_interpreter() 389 if support.verbose: 390 print() 391 print(out) 392 print(err) 393 394 @staticmethod 395 def _get_default_pipe_encoding(): 396 rp, wp = os.pipe() 397 try: 398 with os.fdopen(wp, 'w') as w: 399 default_pipe_encoding = w.encoding 400 finally: 401 os.close(rp) 402 return default_pipe_encoding 403 404 def test_forced_io_encoding(self): 405 # Checks forced configuration of embedded interpreter IO streams 406 out, err = self.run_embedded_interpreter("forced_io_encoding") 407 if support.verbose: 408 print() 409 print(out) 410 print(err) 411 expected_errors = sys.__stdout__.errors 412 expected_stdin_encoding = sys.__stdin__.encoding 413 expected_pipe_encoding = self._get_default_pipe_encoding() 414 expected_output = '\n'.join([ 415 "--- Use defaults ---", 416 "Expected encoding: default", 417 "Expected errors: default", 418 "stdin: {in_encoding}:{errors}", 419 "stdout: {out_encoding}:{errors}", 420 "stderr: {out_encoding}:backslashreplace", 421 "--- Set errors only ---", 422 "Expected encoding: default", 423 "Expected errors: ignore", 424 "stdin: {in_encoding}:ignore", 425 "stdout: {out_encoding}:ignore", 426 "stderr: {out_encoding}:backslashreplace", 427 "--- Set encoding only ---", 428 "Expected encoding: latin-1", 429 "Expected errors: default", 430 "stdin: latin-1:{errors}", 431 "stdout: latin-1:{errors}", 432 "stderr: latin-1:backslashreplace", 433 "--- Set encoding and errors ---", 434 "Expected encoding: latin-1", 435 "Expected errors: replace", 436 "stdin: latin-1:replace", 437 "stdout: latin-1:replace", 438 "stderr: latin-1:backslashreplace"]) 439 expected_output = expected_output.format( 440 in_encoding=expected_stdin_encoding, 441 out_encoding=expected_pipe_encoding, 442 errors=expected_errors) 443 # This is useful if we ever trip over odd platform behaviour 444 self.maxDiff = None 445 self.assertEqual(out.strip(), expected_output) 446 447 448class SkipitemTest(unittest.TestCase): 449 450 def test_skipitem(self): 451 """ 452 If this test failed, you probably added a new "format unit" 453 in Python/getargs.c, but neglected to update our poor friend 454 skipitem() in the same file. (If so, shame on you!) 455 456 With a few exceptions**, this function brute-force tests all 457 printable ASCII*** characters (32 to 126 inclusive) as format units, 458 checking to see that PyArg_ParseTupleAndKeywords() return consistent 459 errors both when the unit is attempted to be used and when it is 460 skipped. If the format unit doesn't exist, we'll get one of two 461 specific error messages (one for used, one for skipped); if it does 462 exist we *won't* get that error--we'll get either no error or some 463 other error. If we get the specific "does not exist" error for one 464 test and not for the other, there's a mismatch, and the test fails. 465 466 ** Some format units have special funny semantics and it would 467 be difficult to accommodate them here. Since these are all 468 well-established and properly skipped in skipitem() we can 469 get away with not testing them--this test is really intended 470 to catch *new* format units. 471 472 *** Python C source files must be ASCII. Therefore it's impossible 473 to have non-ASCII format units. 474 475 """ 476 empty_tuple = () 477 tuple_1 = (0,) 478 dict_b = {'b':1} 479 keywords = ["a", "b"] 480 481 for i in range(32, 127): 482 c = chr(i) 483 484 # skip parentheses, the error reporting is inconsistent about them 485 # skip 'e', it's always a two-character code 486 # skip '|' and '$', they don't represent arguments anyway 487 if c in '()e|$': 488 continue 489 490 # test the format unit when not skipped 491 format = c + "i" 492 try: 493 # (note: the format string must be bytes!) 494 _testcapi.parse_tuple_and_keywords(tuple_1, dict_b, 495 format.encode("ascii"), keywords) 496 when_not_skipped = False 497 except SystemError as e: 498 s = "argument 1 (impossible<bad format char>)" 499 when_not_skipped = (str(e) == s) 500 except TypeError: 501 when_not_skipped = False 502 503 # test the format unit when skipped 504 optional_format = "|" + format 505 try: 506 _testcapi.parse_tuple_and_keywords(empty_tuple, dict_b, 507 optional_format.encode("ascii"), keywords) 508 when_skipped = False 509 except SystemError as e: 510 s = "impossible<bad format char>: '{}'".format(format) 511 when_skipped = (str(e) == s) 512 513 message = ("test_skipitem_parity: " 514 "detected mismatch between convertsimple and skipitem " 515 "for format unit '{}' ({}), not skipped {}, skipped {}".format( 516 c, i, when_skipped, when_not_skipped)) 517 self.assertIs(when_skipped, when_not_skipped, message) 518 519 def test_parse_tuple_and_keywords(self): 520 # parse_tuple_and_keywords error handling tests 521 self.assertRaises(TypeError, _testcapi.parse_tuple_and_keywords, 522 (), {}, 42, []) 523 self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords, 524 (), {}, b'', 42) 525 self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords, 526 (), {}, b'', [''] * 42) 527 self.assertRaises(ValueError, _testcapi.parse_tuple_and_keywords, 528 (), {}, b'', [42]) 529 530 def test_positional_only(self): 531 parse = _testcapi.parse_tuple_and_keywords 532 533 parse((1, 2, 3), {}, b'OOO', ['', '', 'a']) 534 parse((1, 2), {'a': 3}, b'OOO', ['', '', 'a']) 535 with self.assertRaisesRegex(TypeError, 536 r'Function takes at least 2 positional arguments \(1 given\)'): 537 parse((1,), {'a': 3}, b'OOO', ['', '', 'a']) 538 parse((1,), {}, b'O|OO', ['', '', 'a']) 539 with self.assertRaisesRegex(TypeError, 540 r'Function takes at least 1 positional arguments \(0 given\)'): 541 parse((), {}, b'O|OO', ['', '', 'a']) 542 parse((1, 2), {'a': 3}, b'OO$O', ['', '', 'a']) 543 with self.assertRaisesRegex(TypeError, 544 r'Function takes exactly 2 positional arguments \(1 given\)'): 545 parse((1,), {'a': 3}, b'OO$O', ['', '', 'a']) 546 parse((1,), {}, b'O|O$O', ['', '', 'a']) 547 with self.assertRaisesRegex(TypeError, 548 r'Function takes at least 1 positional arguments \(0 given\)'): 549 parse((), {}, b'O|O$O', ['', '', 'a']) 550 with self.assertRaisesRegex(SystemError, r'Empty parameter name after \$'): 551 parse((1,), {}, b'O|$OO', ['', '', 'a']) 552 with self.assertRaisesRegex(SystemError, 'Empty keyword'): 553 parse((1,), {}, b'O|OO', ['', 'a', '']) 554 555 556@unittest.skipUnless(threading, 'Threading required for this test.') 557class TestThreadState(unittest.TestCase): 558 559 @support.reap_threads 560 def test_thread_state(self): 561 # some extra thread-state tests driven via _testcapi 562 def target(): 563 idents = [] 564 565 def callback(): 566 idents.append(threading.get_ident()) 567 568 _testcapi._test_thread_state(callback) 569 a = b = callback 570 time.sleep(1) 571 # Check our main thread is in the list exactly 3 times. 572 self.assertEqual(idents.count(threading.get_ident()), 3, 573 "Couldn't find main thread correctly in the list") 574 575 target() 576 t = threading.Thread(target=target) 577 t.start() 578 t.join() 579 580 581class Test_testcapi(unittest.TestCase): 582 def test__testcapi(self): 583 for name in dir(_testcapi): 584 if name.startswith('test_'): 585 with self.subTest("internal", name=name): 586 test = getattr(_testcapi, name) 587 test() 588 589 590class PyMemDebugTests(unittest.TestCase): 591 PYTHONMALLOC = 'debug' 592 # '0x04c06e0' or '04C06E0' 593 PTR_REGEX = r'(?:0x)?[0-9a-fA-F]+' 594 595 def check(self, code): 596 with support.SuppressCrashReport(): 597 out = assert_python_failure('-c', code, 598 PYTHONMALLOC=self.PYTHONMALLOC) 599 stderr = out.err 600 return stderr.decode('ascii', 'replace') 601 602 def test_buffer_overflow(self): 603 out = self.check('import _testcapi; _testcapi.pymem_buffer_overflow()') 604 regex = (r"Debug memory block at address p={ptr}: API 'm'\n" 605 r" 16 bytes originally requested\n" 606 r" The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n" 607 r" The [0-9] pad bytes at tail={ptr} are not all FORBIDDENBYTE \(0x[0-9a-f]{{2}}\):\n" 608 r" at tail\+0: 0x78 \*\*\* OUCH\n" 609 r" at tail\+1: 0xfb\n" 610 r" at tail\+2: 0xfb\n" 611 r" .*\n" 612 r" The block was made by call #[0-9]+ to debug malloc/realloc.\n" 613 r" Data at p: cb cb cb .*\n" 614 r"\n" 615 r"Fatal Python error: bad trailing pad byte") 616 regex = regex.format(ptr=self.PTR_REGEX) 617 regex = re.compile(regex, flags=re.DOTALL) 618 self.assertRegex(out, regex) 619 620 def test_api_misuse(self): 621 out = self.check('import _testcapi; _testcapi.pymem_api_misuse()') 622 regex = (r"Debug memory block at address p={ptr}: API 'm'\n" 623 r" 16 bytes originally requested\n" 624 r" The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n" 625 r" The [0-9] pad bytes at tail={ptr} are FORBIDDENBYTE, as expected.\n" 626 r" The block was made by call #[0-9]+ to debug malloc/realloc.\n" 627 r" Data at p: cb cb cb .*\n" 628 r"\n" 629 r"Fatal Python error: bad ID: Allocated using API 'm', verified using API 'r'\n") 630 regex = regex.format(ptr=self.PTR_REGEX) 631 self.assertRegex(out, regex) 632 633 @unittest.skipUnless(threading, 'Test requires a GIL (multithreading)') 634 def check_malloc_without_gil(self, code): 635 out = self.check(code) 636 expected = ('Fatal Python error: Python memory allocator called ' 637 'without holding the GIL') 638 self.assertIn(expected, out) 639 640 def test_pymem_malloc_without_gil(self): 641 # Debug hooks must raise an error if PyMem_Malloc() is called 642 # without holding the GIL 643 code = 'import _testcapi; _testcapi.pymem_malloc_without_gil()' 644 self.check_malloc_without_gil(code) 645 646 def test_pyobject_malloc_without_gil(self): 647 # Debug hooks must raise an error if PyObject_Malloc() is called 648 # without holding the GIL 649 code = 'import _testcapi; _testcapi.pyobject_malloc_without_gil()' 650 self.check_malloc_without_gil(code) 651 652 653class PyMemMallocDebugTests(PyMemDebugTests): 654 PYTHONMALLOC = 'malloc_debug' 655 656 657@unittest.skipUnless(sysconfig.get_config_var('WITH_PYMALLOC') == 1, 658 'need pymalloc') 659class PyMemPymallocDebugTests(PyMemDebugTests): 660 PYTHONMALLOC = 'pymalloc_debug' 661 662 663@unittest.skipUnless(Py_DEBUG, 'need Py_DEBUG') 664class PyMemDefaultTests(PyMemDebugTests): 665 # test default allocator of Python compiled in debug mode 666 PYTHONMALLOC = '' 667 668 669if __name__ == "__main__": 670 unittest.main() 671