1import os 2import pickle 3import sys 4from textwrap import dedent, indent 5import threading 6import types 7import unittest 8 9from test import support 10from test.support import import_helper 11# Raise SkipTest if subinterpreters not supported. 12_interpreters = import_helper.import_module('_interpreters') 13from test.support import Py_GIL_DISABLED 14from test.support import interpreters 15from test.support import force_not_colorized 16from test.support.interpreters import ( 17 InterpreterError, InterpreterNotFoundError, ExecutionFailed, 18) 19from .utils import ( 20 _captured_script, _run_output, _running, TestBase, 21 requires_test_modules, _testinternalcapi, 22) 23 24 25WHENCE_STR_UNKNOWN = 'unknown' 26WHENCE_STR_RUNTIME = 'runtime init' 27WHENCE_STR_LEGACY_CAPI = 'legacy C-API' 28WHENCE_STR_CAPI = 'C-API' 29WHENCE_STR_XI = 'cross-interpreter C-API' 30WHENCE_STR_STDLIB = '_interpreters module' 31 32 33class ModuleTests(TestBase): 34 35 def test_queue_aliases(self): 36 first = [ 37 interpreters.create_queue, 38 interpreters.Queue, 39 interpreters.QueueEmpty, 40 interpreters.QueueFull, 41 ] 42 second = [ 43 interpreters.create_queue, 44 interpreters.Queue, 45 interpreters.QueueEmpty, 46 interpreters.QueueFull, 47 ] 48 self.assertEqual(second, first) 49 50 51class CreateTests(TestBase): 52 53 def test_in_main(self): 54 interp = interpreters.create() 55 self.assertIsInstance(interp, interpreters.Interpreter) 56 self.assertIn(interp, interpreters.list_all()) 57 58 # GH-126221: Passing an invalid Unicode character used to cause a SystemError 59 self.assertRaises(UnicodeEncodeError, _interpreters.create, '\udc80') 60 61 def test_in_thread(self): 62 lock = threading.Lock() 63 interp = None 64 def f(): 65 nonlocal interp 66 interp = interpreters.create() 67 lock.acquire() 68 lock.release() 69 t = threading.Thread(target=f) 70 with lock: 71 t.start() 72 t.join() 73 self.assertIn(interp, interpreters.list_all()) 74 75 def test_in_subinterpreter(self): 76 main, = interpreters.list_all() 77 interp = interpreters.create() 78 out = _run_output(interp, dedent(""" 79 from test.support import interpreters 80 interp = interpreters.create() 81 print(interp.id) 82 """)) 83 interp2 = interpreters.Interpreter(int(out)) 84 self.assertEqual(interpreters.list_all(), [main, interp, interp2]) 85 86 def test_after_destroy_all(self): 87 before = set(interpreters.list_all()) 88 # Create 3 subinterpreters. 89 interp_lst = [] 90 for _ in range(3): 91 interps = interpreters.create() 92 interp_lst.append(interps) 93 # Now destroy them. 94 for interp in interp_lst: 95 interp.close() 96 # Finally, create another. 97 interp = interpreters.create() 98 self.assertEqual(set(interpreters.list_all()), before | {interp}) 99 100 def test_after_destroy_some(self): 101 before = set(interpreters.list_all()) 102 # Create 3 subinterpreters. 103 interp1 = interpreters.create() 104 interp2 = interpreters.create() 105 interp3 = interpreters.create() 106 # Now destroy 2 of them. 107 interp1.close() 108 interp2.close() 109 # Finally, create another. 110 interp = interpreters.create() 111 self.assertEqual(set(interpreters.list_all()), before | {interp3, interp}) 112 113 114class GetMainTests(TestBase): 115 116 def test_id(self): 117 main = interpreters.get_main() 118 self.assertEqual(main.id, 0) 119 120 def test_current(self): 121 main = interpreters.get_main() 122 current = interpreters.get_current() 123 self.assertIs(main, current) 124 125 def test_idempotent(self): 126 main1 = interpreters.get_main() 127 main2 = interpreters.get_main() 128 self.assertIs(main1, main2) 129 130 131class GetCurrentTests(TestBase): 132 133 def test_main(self): 134 main = interpreters.get_main() 135 current = interpreters.get_current() 136 self.assertEqual(current, main) 137 138 def test_subinterpreter(self): 139 main = interpreters.get_main() 140 interp = interpreters.create() 141 out = _run_output(interp, dedent(""" 142 from test.support import interpreters 143 cur = interpreters.get_current() 144 print(cur.id) 145 """)) 146 current = interpreters.Interpreter(int(out)) 147 self.assertEqual(current, interp) 148 self.assertNotEqual(current, main) 149 150 def test_idempotent(self): 151 with self.subTest('main'): 152 cur1 = interpreters.get_current() 153 cur2 = interpreters.get_current() 154 self.assertIs(cur1, cur2) 155 156 with self.subTest('subinterpreter'): 157 interp = interpreters.create() 158 out = _run_output(interp, dedent(""" 159 from test.support import interpreters 160 cur = interpreters.get_current() 161 print(id(cur)) 162 cur = interpreters.get_current() 163 print(id(cur)) 164 """)) 165 objid1, objid2 = (int(v) for v in out.splitlines()) 166 self.assertEqual(objid1, objid2) 167 168 with self.subTest('per-interpreter'): 169 interp = interpreters.create() 170 out = _run_output(interp, dedent(""" 171 from test.support import interpreters 172 cur = interpreters.get_current() 173 print(id(cur)) 174 """)) 175 id1 = int(out) 176 id2 = id(interp) 177 self.assertNotEqual(id1, id2) 178 179 @requires_test_modules 180 def test_created_with_capi(self): 181 expected = _testinternalcapi.next_interpreter_id() 182 text = self.run_temp_from_capi(f""" 183 import {interpreters.__name__} as interpreters 184 interp = interpreters.get_current() 185 print((interp.id, interp.whence)) 186 """) 187 interpid, whence = eval(text) 188 self.assertEqual(interpid, expected) 189 self.assertEqual(whence, WHENCE_STR_CAPI) 190 191 192class ListAllTests(TestBase): 193 194 def test_initial(self): 195 interps = interpreters.list_all() 196 self.assertEqual(1, len(interps)) 197 198 def test_after_creating(self): 199 main = interpreters.get_current() 200 first = interpreters.create() 201 second = interpreters.create() 202 203 ids = [] 204 for interp in interpreters.list_all(): 205 ids.append(interp.id) 206 207 self.assertEqual(ids, [main.id, first.id, second.id]) 208 209 def test_after_destroying(self): 210 main = interpreters.get_current() 211 first = interpreters.create() 212 second = interpreters.create() 213 first.close() 214 215 ids = [] 216 for interp in interpreters.list_all(): 217 ids.append(interp.id) 218 219 self.assertEqual(ids, [main.id, second.id]) 220 221 def test_idempotent(self): 222 main = interpreters.get_current() 223 first = interpreters.create() 224 second = interpreters.create() 225 expected = [main, first, second] 226 227 actual = interpreters.list_all() 228 229 self.assertEqual(actual, expected) 230 for interp1, interp2 in zip(actual, expected): 231 self.assertIs(interp1, interp2) 232 233 def test_created_with_capi(self): 234 mainid, *_ = _interpreters.get_main() 235 interpid1 = _interpreters.create() 236 interpid2 = _interpreters.create() 237 interpid3 = _interpreters.create() 238 interpid4 = interpid3 + 1 239 interpid5 = interpid4 + 1 240 expected = [ 241 (mainid, WHENCE_STR_RUNTIME), 242 (interpid1, WHENCE_STR_STDLIB), 243 (interpid2, WHENCE_STR_STDLIB), 244 (interpid3, WHENCE_STR_STDLIB), 245 (interpid4, WHENCE_STR_CAPI), 246 (interpid5, WHENCE_STR_STDLIB), 247 ] 248 expected2 = expected[:-2] 249 text = self.run_temp_from_capi(f""" 250 import {interpreters.__name__} as interpreters 251 interp = interpreters.create() 252 print( 253 [(i.id, i.whence) for i in interpreters.list_all()]) 254 """) 255 res = eval(text) 256 res2 = [(i.id, i.whence) for i in interpreters.list_all()] 257 self.assertEqual(res, expected) 258 self.assertEqual(res2, expected2) 259 260 261class InterpreterObjectTests(TestBase): 262 263 def test_init_int(self): 264 interpid = interpreters.get_current().id 265 interp = interpreters.Interpreter(interpid) 266 self.assertEqual(interp.id, interpid) 267 268 def test_init_interpreter_id(self): 269 interpid = interpreters.get_current()._id 270 interp = interpreters.Interpreter(interpid) 271 self.assertEqual(interp._id, interpid) 272 273 def test_init_unsupported(self): 274 actualid = interpreters.get_current().id 275 for interpid in [ 276 str(actualid), 277 float(actualid), 278 object(), 279 None, 280 '', 281 ]: 282 with self.subTest(repr(interpid)): 283 with self.assertRaises(TypeError): 284 interpreters.Interpreter(interpid) 285 286 def test_idempotent(self): 287 main = interpreters.get_main() 288 interp = interpreters.Interpreter(main.id) 289 self.assertIs(interp, main) 290 291 def test_init_does_not_exist(self): 292 with self.assertRaises(InterpreterNotFoundError): 293 interpreters.Interpreter(1_000_000) 294 295 def test_init_bad_id(self): 296 with self.assertRaises(ValueError): 297 interpreters.Interpreter(-1) 298 299 def test_id_type(self): 300 main = interpreters.get_main() 301 current = interpreters.get_current() 302 interp = interpreters.create() 303 self.assertIsInstance(main.id, int) 304 self.assertIsInstance(current.id, int) 305 self.assertIsInstance(interp.id, int) 306 307 def test_id_readonly(self): 308 interp = interpreters.create() 309 with self.assertRaises(AttributeError): 310 interp.id = 1_000_000 311 312 def test_whence(self): 313 main = interpreters.get_main() 314 interp = interpreters.create() 315 316 with self.subTest('main'): 317 self.assertEqual(main.whence, WHENCE_STR_RUNTIME) 318 319 with self.subTest('from _interpreters'): 320 self.assertEqual(interp.whence, WHENCE_STR_STDLIB) 321 322 with self.subTest('from C-API'): 323 text = self.run_temp_from_capi(f""" 324 import {interpreters.__name__} as interpreters 325 interp = interpreters.get_current() 326 print(repr(interp.whence)) 327 """) 328 whence = eval(text) 329 self.assertEqual(whence, WHENCE_STR_CAPI) 330 331 with self.subTest('readonly'): 332 for value in [ 333 None, 334 WHENCE_STR_UNKNOWN, 335 WHENCE_STR_RUNTIME, 336 WHENCE_STR_STDLIB, 337 WHENCE_STR_CAPI, 338 ]: 339 with self.assertRaises(AttributeError): 340 interp.whence = value 341 with self.assertRaises(AttributeError): 342 main.whence = value 343 344 def test_hashable(self): 345 interp = interpreters.create() 346 expected = hash(interp.id) 347 actual = hash(interp) 348 self.assertEqual(actual, expected) 349 350 def test_equality(self): 351 interp1 = interpreters.create() 352 interp2 = interpreters.create() 353 self.assertEqual(interp1, interp1) 354 self.assertNotEqual(interp1, interp2) 355 356 def test_pickle(self): 357 interp = interpreters.create() 358 data = pickle.dumps(interp) 359 unpickled = pickle.loads(data) 360 self.assertEqual(unpickled, interp) 361 362 363class TestInterpreterIsRunning(TestBase): 364 365 def test_main(self): 366 main = interpreters.get_main() 367 self.assertTrue(main.is_running()) 368 369 # XXX Is this still true? 370 @unittest.skip('Fails on FreeBSD') 371 def test_subinterpreter(self): 372 interp = interpreters.create() 373 self.assertFalse(interp.is_running()) 374 375 with _running(interp): 376 self.assertTrue(interp.is_running()) 377 self.assertFalse(interp.is_running()) 378 379 def test_finished(self): 380 r, w = self.pipe() 381 interp = interpreters.create() 382 interp.exec(f"""if True: 383 import os 384 os.write({w}, b'x') 385 """) 386 self.assertFalse(interp.is_running()) 387 self.assertEqual(os.read(r, 1), b'x') 388 389 def test_from_subinterpreter(self): 390 interp = interpreters.create() 391 out = _run_output(interp, dedent(f""" 392 import _interpreters 393 if _interpreters.is_running({interp.id}): 394 print(True) 395 else: 396 print(False) 397 """)) 398 self.assertEqual(out.strip(), 'True') 399 400 def test_already_destroyed(self): 401 interp = interpreters.create() 402 interp.close() 403 with self.assertRaises(InterpreterNotFoundError): 404 interp.is_running() 405 406 def test_with_only_background_threads(self): 407 r_interp, w_interp = self.pipe() 408 r_thread, w_thread = self.pipe() 409 410 DONE = b'D' 411 FINISHED = b'F' 412 413 interp = interpreters.create() 414 interp.exec(f"""if True: 415 import os 416 import threading 417 418 def task(): 419 v = os.read({r_thread}, 1) 420 assert v == {DONE!r} 421 os.write({w_interp}, {FINISHED!r}) 422 t = threading.Thread(target=task) 423 t.start() 424 """) 425 self.assertFalse(interp.is_running()) 426 427 os.write(w_thread, DONE) 428 interp.exec('t.join()') 429 self.assertEqual(os.read(r_interp, 1), FINISHED) 430 431 def test_created_with_capi(self): 432 script = dedent(f""" 433 import {interpreters.__name__} as interpreters 434 interp = interpreters.get_current() 435 print(interp.is_running()) 436 """) 437 def parse_results(text): 438 self.assertNotEqual(text, "") 439 try: 440 return eval(text) 441 except Exception: 442 raise Exception(repr(text)) 443 444 with self.subTest('running __main__ (from self)'): 445 with self.interpreter_from_capi() as interpid: 446 text = self.run_from_capi(interpid, script, main=True) 447 running = parse_results(text) 448 self.assertTrue(running) 449 450 with self.subTest('running, but not __main__ (from self)'): 451 text = self.run_temp_from_capi(script) 452 running = parse_results(text) 453 self.assertFalse(running) 454 455 with self.subTest('running __main__ (from other)'): 456 with self.interpreter_obj_from_capi() as (interp, interpid): 457 before = interp.is_running() 458 with self.running_from_capi(interpid, main=True): 459 during = interp.is_running() 460 after = interp.is_running() 461 self.assertFalse(before) 462 self.assertTrue(during) 463 self.assertFalse(after) 464 465 with self.subTest('running, but not __main__ (from other)'): 466 with self.interpreter_obj_from_capi() as (interp, interpid): 467 before = interp.is_running() 468 with self.running_from_capi(interpid, main=False): 469 during = interp.is_running() 470 after = interp.is_running() 471 self.assertFalse(before) 472 self.assertFalse(during) 473 self.assertFalse(after) 474 475 with self.subTest('not running (from other)'): 476 with self.interpreter_obj_from_capi() as (interp, _): 477 running = interp.is_running() 478 self.assertFalse(running) 479 480 481class TestInterpreterClose(TestBase): 482 483 def test_basic(self): 484 main = interpreters.get_main() 485 interp1 = interpreters.create() 486 interp2 = interpreters.create() 487 interp3 = interpreters.create() 488 self.assertEqual(set(interpreters.list_all()), 489 {main, interp1, interp2, interp3}) 490 interp2.close() 491 self.assertEqual(set(interpreters.list_all()), 492 {main, interp1, interp3}) 493 494 def test_all(self): 495 before = set(interpreters.list_all()) 496 interps = set() 497 for _ in range(3): 498 interp = interpreters.create() 499 interps.add(interp) 500 self.assertEqual(set(interpreters.list_all()), before | interps) 501 for interp in interps: 502 interp.close() 503 self.assertEqual(set(interpreters.list_all()), before) 504 505 def test_main(self): 506 main, = interpreters.list_all() 507 with self.assertRaises(InterpreterError): 508 main.close() 509 510 def f(): 511 with self.assertRaises(InterpreterError): 512 main.close() 513 514 t = threading.Thread(target=f) 515 t.start() 516 t.join() 517 518 def test_already_destroyed(self): 519 interp = interpreters.create() 520 interp.close() 521 with self.assertRaises(InterpreterNotFoundError): 522 interp.close() 523 524 def test_from_current(self): 525 main, = interpreters.list_all() 526 interp = interpreters.create() 527 out = _run_output(interp, dedent(f""" 528 from test.support import interpreters 529 interp = interpreters.Interpreter({interp.id}) 530 try: 531 interp.close() 532 except interpreters.InterpreterError: 533 print('failed') 534 """)) 535 self.assertEqual(out.strip(), 'failed') 536 self.assertEqual(set(interpreters.list_all()), {main, interp}) 537 538 def test_from_sibling(self): 539 main, = interpreters.list_all() 540 interp1 = interpreters.create() 541 interp2 = interpreters.create() 542 self.assertEqual(set(interpreters.list_all()), 543 {main, interp1, interp2}) 544 interp1.exec(dedent(f""" 545 from test.support import interpreters 546 interp2 = interpreters.Interpreter({interp2.id}) 547 interp2.close() 548 interp3 = interpreters.create() 549 interp3.close() 550 """)) 551 self.assertEqual(set(interpreters.list_all()), {main, interp1}) 552 553 def test_from_other_thread(self): 554 interp = interpreters.create() 555 def f(): 556 interp.close() 557 558 t = threading.Thread(target=f) 559 t.start() 560 t.join() 561 562 # XXX Is this still true? 563 @unittest.skip('Fails on FreeBSD') 564 def test_still_running(self): 565 main, = interpreters.list_all() 566 interp = interpreters.create() 567 with _running(interp): 568 with self.assertRaises(InterpreterError): 569 interp.close() 570 self.assertTrue(interp.is_running()) 571 572 def test_subthreads_still_running(self): 573 r_interp, w_interp = self.pipe() 574 r_thread, w_thread = self.pipe() 575 576 FINISHED = b'F' 577 578 interp = interpreters.create() 579 interp.exec(f"""if True: 580 import os 581 import threading 582 import time 583 584 done = False 585 586 def notify_fini(): 587 global done 588 done = True 589 t.join() 590 threading._register_atexit(notify_fini) 591 592 def task(): 593 while not done: 594 time.sleep(0.1) 595 os.write({w_interp}, {FINISHED!r}) 596 t = threading.Thread(target=task) 597 t.start() 598 """) 599 interp.close() 600 601 self.assertEqual(os.read(r_interp, 1), FINISHED) 602 603 def test_created_with_capi(self): 604 script = dedent(f""" 605 import {interpreters.__name__} as interpreters 606 interp = interpreters.get_current() 607 interp.close() 608 """) 609 610 with self.subTest('running __main__ (from self)'): 611 with self.interpreter_from_capi() as interpid: 612 with self.assertRaisesRegex(ExecutionFailed, 613 'InterpreterError.*unrecognized'): 614 self.run_from_capi(interpid, script, main=True) 615 616 with self.subTest('running, but not __main__ (from self)'): 617 with self.assertRaisesRegex(ExecutionFailed, 618 'InterpreterError.*unrecognized'): 619 self.run_temp_from_capi(script) 620 621 with self.subTest('running __main__ (from other)'): 622 with self.interpreter_obj_from_capi() as (interp, interpid): 623 with self.running_from_capi(interpid, main=True): 624 with self.assertRaisesRegex(InterpreterError, 'unrecognized'): 625 interp.close() 626 # Make sure it wssn't closed. 627 self.assertTrue( 628 self.interp_exists(interpid)) 629 630 # The rest would be skipped until we deal with running threads when 631 # interp.close() is called. However, the "whence" restrictions 632 # trigger first. 633 634 with self.subTest('running, but not __main__ (from other)'): 635 with self.interpreter_obj_from_capi() as (interp, interpid): 636 with self.running_from_capi(interpid, main=False): 637 with self.assertRaisesRegex(InterpreterError, 'unrecognized'): 638 interp.close() 639 # Make sure it wssn't closed. 640 self.assertTrue( 641 self.interp_exists(interpid)) 642 643 with self.subTest('not running (from other)'): 644 with self.interpreter_obj_from_capi() as (interp, interpid): 645 with self.assertRaisesRegex(InterpreterError, 'unrecognized'): 646 interp.close() 647 self.assertTrue( 648 self.interp_exists(interpid)) 649 650 651class TestInterpreterPrepareMain(TestBase): 652 653 def test_empty(self): 654 interp = interpreters.create() 655 with self.assertRaises(ValueError): 656 interp.prepare_main() 657 658 def test_dict(self): 659 values = {'spam': 42, 'eggs': 'ham'} 660 interp = interpreters.create() 661 interp.prepare_main(values) 662 out = _run_output(interp, dedent(""" 663 print(spam, eggs) 664 """)) 665 self.assertEqual(out.strip(), '42 ham') 666 667 def test_tuple(self): 668 values = {'spam': 42, 'eggs': 'ham'} 669 values = tuple(values.items()) 670 interp = interpreters.create() 671 interp.prepare_main(values) 672 out = _run_output(interp, dedent(""" 673 print(spam, eggs) 674 """)) 675 self.assertEqual(out.strip(), '42 ham') 676 677 def test_kwargs(self): 678 values = {'spam': 42, 'eggs': 'ham'} 679 interp = interpreters.create() 680 interp.prepare_main(**values) 681 out = _run_output(interp, dedent(""" 682 print(spam, eggs) 683 """)) 684 self.assertEqual(out.strip(), '42 ham') 685 686 def test_dict_and_kwargs(self): 687 values = {'spam': 42, 'eggs': 'ham'} 688 interp = interpreters.create() 689 interp.prepare_main(values, foo='bar') 690 out = _run_output(interp, dedent(""" 691 print(spam, eggs, foo) 692 """)) 693 self.assertEqual(out.strip(), '42 ham bar') 694 695 def test_not_shareable(self): 696 interp = interpreters.create() 697 # XXX TypeError? 698 with self.assertRaises(ValueError): 699 interp.prepare_main(spam={'spam': 'eggs', 'foo': 'bar'}) 700 701 # Make sure neither was actually bound. 702 with self.assertRaises(ExecutionFailed): 703 interp.exec('print(foo)') 704 with self.assertRaises(ExecutionFailed): 705 interp.exec('print(spam)') 706 707 def test_running(self): 708 interp = interpreters.create() 709 interp.prepare_main({'spam': True}) 710 with self.running(interp): 711 with self.assertRaisesRegex(InterpreterError, 'running'): 712 interp.prepare_main({'spam': False}) 713 interp.exec('assert spam is True') 714 715 @requires_test_modules 716 def test_created_with_capi(self): 717 with self.interpreter_obj_from_capi() as (interp, interpid): 718 with self.assertRaisesRegex(InterpreterError, 'unrecognized'): 719 interp.prepare_main({'spam': True}) 720 with self.assertRaisesRegex(ExecutionFailed, 'NameError'): 721 self.run_from_capi(interpid, 'assert spam is True') 722 723 724class TestInterpreterExec(TestBase): 725 726 def test_success(self): 727 interp = interpreters.create() 728 script, results = _captured_script('print("it worked!", end="")') 729 with results: 730 interp.exec(script) 731 results = results.final() 732 results.raise_if_failed() 733 out = results.stdout 734 735 self.assertEqual(out, 'it worked!') 736 737 def test_failure(self): 738 interp = interpreters.create() 739 with self.assertRaises(ExecutionFailed): 740 interp.exec('raise Exception') 741 742 @force_not_colorized 743 def test_display_preserved_exception(self): 744 tempdir = self.temp_dir() 745 modfile = self.make_module('spam', tempdir, text=""" 746 def ham(): 747 raise RuntimeError('uh-oh!') 748 749 def eggs(): 750 ham() 751 """) 752 scriptfile = self.make_script('script.py', tempdir, text=""" 753 from test.support import interpreters 754 755 def script(): 756 import spam 757 spam.eggs() 758 759 interp = interpreters.create() 760 interp.exec(script) 761 """) 762 763 stdout, stderr = self.assert_python_failure(scriptfile) 764 self.maxDiff = None 765 interpmod_line, = (l for l in stderr.splitlines() if ' exec' in l) 766 # File "{interpreters.__file__}", line 179, in exec 767 self.assertEqual(stderr, dedent(f"""\ 768 Traceback (most recent call last): 769 File "{scriptfile}", line 9, in <module> 770 interp.exec(script) 771 ~~~~~~~~~~~^^^^^^^^ 772 {interpmod_line.strip()} 773 raise ExecutionFailed(excinfo) 774 test.support.interpreters.ExecutionFailed: RuntimeError: uh-oh! 775 776 Uncaught in the interpreter: 777 778 Traceback (most recent call last): 779 File "{scriptfile}", line 6, in script 780 spam.eggs() 781 ~~~~~~~~~^^ 782 File "{modfile}", line 6, in eggs 783 ham() 784 ~~~^^ 785 File "{modfile}", line 3, in ham 786 raise RuntimeError('uh-oh!') 787 RuntimeError: uh-oh! 788 """)) 789 self.assertEqual(stdout, '') 790 791 def test_in_thread(self): 792 interp = interpreters.create() 793 script, results = _captured_script('print("it worked!", end="")') 794 with results: 795 def f(): 796 interp.exec(script) 797 798 t = threading.Thread(target=f) 799 t.start() 800 t.join() 801 results = results.final() 802 results.raise_if_failed() 803 out = results.stdout 804 805 self.assertEqual(out, 'it worked!') 806 807 @support.requires_fork() 808 def test_fork(self): 809 interp = interpreters.create() 810 import tempfile 811 with tempfile.NamedTemporaryFile('w+', encoding='utf-8') as file: 812 file.write('') 813 file.flush() 814 815 expected = 'spam spam spam spam spam' 816 script = dedent(f""" 817 import os 818 try: 819 os.fork() 820 except RuntimeError: 821 with open('{file.name}', 'w', encoding='utf-8') as out: 822 out.write('{expected}') 823 """) 824 interp.exec(script) 825 826 file.seek(0) 827 content = file.read() 828 self.assertEqual(content, expected) 829 830 # XXX Is this still true? 831 @unittest.skip('Fails on FreeBSD') 832 def test_already_running(self): 833 interp = interpreters.create() 834 with _running(interp): 835 with self.assertRaises(RuntimeError): 836 interp.exec('print("spam")') 837 838 def test_bad_script(self): 839 interp = interpreters.create() 840 with self.assertRaises(TypeError): 841 interp.exec(10) 842 843 def test_bytes_for_script(self): 844 interp = interpreters.create() 845 with self.assertRaises(TypeError): 846 interp.exec(b'print("spam")') 847 848 def test_with_background_threads_still_running(self): 849 r_interp, w_interp = self.pipe() 850 r_thread, w_thread = self.pipe() 851 852 RAN = b'R' 853 DONE = b'D' 854 FINISHED = b'F' 855 856 interp = interpreters.create() 857 interp.exec(f"""if True: 858 import os 859 import threading 860 861 def task(): 862 v = os.read({r_thread}, 1) 863 assert v == {DONE!r} 864 os.write({w_interp}, {FINISHED!r}) 865 t = threading.Thread(target=task) 866 t.start() 867 os.write({w_interp}, {RAN!r}) 868 """) 869 interp.exec(f"""if True: 870 os.write({w_interp}, {RAN!r}) 871 """) 872 873 os.write(w_thread, DONE) 874 interp.exec('t.join()') 875 self.assertEqual(os.read(r_interp, 1), RAN) 876 self.assertEqual(os.read(r_interp, 1), RAN) 877 self.assertEqual(os.read(r_interp, 1), FINISHED) 878 879 def test_created_with_capi(self): 880 with self.interpreter_obj_from_capi() as (interp, _): 881 with self.assertRaisesRegex(InterpreterError, 'unrecognized'): 882 interp.exec('raise Exception("it worked!")') 883 884 # test__interpreters covers the remaining 885 # Interpreter.exec() behavior. 886 887 888def call_func_noop(): 889 pass 890 891 892def call_func_return_shareable(): 893 return (1, None) 894 895 896def call_func_return_not_shareable(): 897 return [1, 2, 3] 898 899 900def call_func_failure(): 901 raise Exception('spam!') 902 903 904def call_func_ident(value): 905 return value 906 907 908def get_call_func_closure(value): 909 def call_func_closure(): 910 return value 911 return call_func_closure 912 913 914class Spam: 915 916 @staticmethod 917 def noop(): 918 pass 919 920 @classmethod 921 def from_values(cls, *values): 922 return cls(values) 923 924 def __init__(self, value): 925 self.value = value 926 927 def __call__(self, *args, **kwargs): 928 return (self.value, args, kwargs) 929 930 def __eq__(self, other): 931 if not isinstance(other, Spam): 932 return NotImplemented 933 return self.value == other.value 934 935 def run(self, *args, **kwargs): 936 return (self.value, args, kwargs) 937 938 939def call_func_complex(op, /, value=None, *args, exc=None, **kwargs): 940 if exc is not None: 941 raise exc 942 if op == '': 943 raise ValueError('missing op') 944 elif op == 'ident': 945 if args or kwargs: 946 raise Exception((args, kwargs)) 947 return value 948 elif op == 'full-ident': 949 return (value, args, kwargs) 950 elif op == 'globals': 951 if value is not None or args or kwargs: 952 raise Exception((value, args, kwargs)) 953 return __name__ 954 elif op == 'interpid': 955 if value is not None or args or kwargs: 956 raise Exception((value, args, kwargs)) 957 return interpreters.get_current().id 958 elif op == 'closure': 959 if args or kwargs: 960 raise Exception((args, kwargs)) 961 return get_call_func_closure(value) 962 elif op == 'custom': 963 if args or kwargs: 964 raise Exception((args, kwargs)) 965 return Spam(value) 966 elif op == 'custom-inner': 967 if args or kwargs: 968 raise Exception((args, kwargs)) 969 class Eggs(Spam): 970 pass 971 return Eggs(value) 972 elif not isinstance(op, str): 973 raise TypeError(op) 974 else: 975 raise NotImplementedError(op) 976 977 978class TestInterpreterCall(TestBase): 979 980 # signature 981 # - blank 982 # - args 983 # - kwargs 984 # - args, kwargs 985 # return 986 # - nothing (None) 987 # - simple 988 # - closure 989 # - custom 990 # ops: 991 # - do nothing 992 # - fail 993 # - echo 994 # - do complex, relative to interpreter 995 # scope 996 # - global func 997 # - local closure 998 # - returned closure 999 # - callable type instance 1000 # - type 1001 # - classmethod 1002 # - staticmethod 1003 # - instance method 1004 # exception 1005 # - builtin 1006 # - custom 1007 # - preserves info (e.g. SyntaxError) 1008 # - matching error display 1009 1010 def test_call(self): 1011 interp = interpreters.create() 1012 1013 for i, (callable, args, kwargs) in enumerate([ 1014 (call_func_noop, (), {}), 1015 (call_func_return_shareable, (), {}), 1016 (call_func_return_not_shareable, (), {}), 1017 (Spam.noop, (), {}), 1018 ]): 1019 with self.subTest(f'success case #{i+1}'): 1020 res = interp.call(callable) 1021 self.assertIs(res, None) 1022 1023 for i, (callable, args, kwargs) in enumerate([ 1024 (call_func_ident, ('spamspamspam',), {}), 1025 (get_call_func_closure, (42,), {}), 1026 (get_call_func_closure(42), (), {}), 1027 (Spam.from_values, (), {}), 1028 (Spam.from_values, (1, 2, 3), {}), 1029 (Spam, ('???'), {}), 1030 (Spam(101), (), {}), 1031 (Spam(10101).run, (), {}), 1032 (call_func_complex, ('ident', 'spam'), {}), 1033 (call_func_complex, ('full-ident', 'spam'), {}), 1034 (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}), 1035 (call_func_complex, ('globals',), {}), 1036 (call_func_complex, ('interpid',), {}), 1037 (call_func_complex, ('closure',), {'value': '~~~'}), 1038 (call_func_complex, ('custom', 'spam!'), {}), 1039 (call_func_complex, ('custom-inner', 'eggs!'), {}), 1040 (call_func_complex, ('???',), {'exc': ValueError('spam')}), 1041 ]): 1042 with self.subTest(f'invalid case #{i+1}'): 1043 with self.assertRaises(Exception): 1044 if args or kwargs: 1045 raise Exception((args, kwargs)) 1046 interp.call(callable) 1047 1048 with self.assertRaises(ExecutionFailed): 1049 interp.call(call_func_failure) 1050 1051 def test_call_in_thread(self): 1052 interp = interpreters.create() 1053 1054 for i, (callable, args, kwargs) in enumerate([ 1055 (call_func_noop, (), {}), 1056 (call_func_return_shareable, (), {}), 1057 (call_func_return_not_shareable, (), {}), 1058 (Spam.noop, (), {}), 1059 ]): 1060 with self.subTest(f'success case #{i+1}'): 1061 with self.captured_thread_exception() as ctx: 1062 t = interp.call_in_thread(callable) 1063 t.join() 1064 self.assertIsNone(ctx.caught) 1065 1066 for i, (callable, args, kwargs) in enumerate([ 1067 (call_func_ident, ('spamspamspam',), {}), 1068 (get_call_func_closure, (42,), {}), 1069 (get_call_func_closure(42), (), {}), 1070 (Spam.from_values, (), {}), 1071 (Spam.from_values, (1, 2, 3), {}), 1072 (Spam, ('???'), {}), 1073 (Spam(101), (), {}), 1074 (Spam(10101).run, (), {}), 1075 (call_func_complex, ('ident', 'spam'), {}), 1076 (call_func_complex, ('full-ident', 'spam'), {}), 1077 (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}), 1078 (call_func_complex, ('globals',), {}), 1079 (call_func_complex, ('interpid',), {}), 1080 (call_func_complex, ('closure',), {'value': '~~~'}), 1081 (call_func_complex, ('custom', 'spam!'), {}), 1082 (call_func_complex, ('custom-inner', 'eggs!'), {}), 1083 (call_func_complex, ('???',), {'exc': ValueError('spam')}), 1084 ]): 1085 with self.subTest(f'invalid case #{i+1}'): 1086 if args or kwargs: 1087 continue 1088 with self.captured_thread_exception() as ctx: 1089 t = interp.call_in_thread(callable) 1090 t.join() 1091 self.assertIsNotNone(ctx.caught) 1092 1093 with self.captured_thread_exception() as ctx: 1094 t = interp.call_in_thread(call_func_failure) 1095 t.join() 1096 self.assertIsNotNone(ctx.caught) 1097 1098 1099class TestIsShareable(TestBase): 1100 1101 def test_default_shareables(self): 1102 shareables = [ 1103 # singletons 1104 None, 1105 # builtin objects 1106 b'spam', 1107 'spam', 1108 10, 1109 -10, 1110 True, 1111 False, 1112 100.0, 1113 (), 1114 (1, ('spam', 'eggs'), True), 1115 ] 1116 for obj in shareables: 1117 with self.subTest(obj): 1118 shareable = interpreters.is_shareable(obj) 1119 self.assertTrue(shareable) 1120 1121 def test_not_shareable(self): 1122 class Cheese: 1123 def __init__(self, name): 1124 self.name = name 1125 def __str__(self): 1126 return self.name 1127 1128 class SubBytes(bytes): 1129 """A subclass of a shareable type.""" 1130 1131 not_shareables = [ 1132 # singletons 1133 NotImplemented, 1134 ..., 1135 # builtin types and objects 1136 type, 1137 object, 1138 object(), 1139 Exception(), 1140 # user-defined types and objects 1141 Cheese, 1142 Cheese('Wensleydale'), 1143 SubBytes(b'spam'), 1144 ] 1145 for obj in not_shareables: 1146 with self.subTest(repr(obj)): 1147 self.assertFalse( 1148 interpreters.is_shareable(obj)) 1149 1150 1151class LowLevelTests(TestBase): 1152 1153 # The behaviors in the low-level module are important in as much 1154 # as they are exercised by the high-level module. Therefore the 1155 # most important testing happens in the high-level tests. 1156 # These low-level tests cover corner cases that are not 1157 # encountered by the high-level module, thus they 1158 # mostly shouldn't matter as much. 1159 1160 def test_new_config(self): 1161 # This test overlaps with 1162 # test.test_capi.test_misc.InterpreterConfigTests. 1163 1164 default = _interpreters.new_config('isolated') 1165 with self.subTest('no arg'): 1166 config = _interpreters.new_config() 1167 self.assert_ns_equal(config, default) 1168 self.assertIsNot(config, default) 1169 1170 with self.subTest('default'): 1171 config1 = _interpreters.new_config('default') 1172 self.assert_ns_equal(config1, default) 1173 self.assertIsNot(config1, default) 1174 1175 config2 = _interpreters.new_config('default') 1176 self.assert_ns_equal(config2, config1) 1177 self.assertIsNot(config2, config1) 1178 1179 for arg in ['', 'default']: 1180 with self.subTest(f'default ({arg!r})'): 1181 config = _interpreters.new_config(arg) 1182 self.assert_ns_equal(config, default) 1183 self.assertIsNot(config, default) 1184 1185 supported = { 1186 'isolated': types.SimpleNamespace( 1187 use_main_obmalloc=False, 1188 allow_fork=False, 1189 allow_exec=False, 1190 allow_threads=True, 1191 allow_daemon_threads=False, 1192 check_multi_interp_extensions=True, 1193 gil='own', 1194 ), 1195 'legacy': types.SimpleNamespace( 1196 use_main_obmalloc=True, 1197 allow_fork=True, 1198 allow_exec=True, 1199 allow_threads=True, 1200 allow_daemon_threads=True, 1201 check_multi_interp_extensions=bool(Py_GIL_DISABLED), 1202 gil='shared', 1203 ), 1204 'empty': types.SimpleNamespace( 1205 use_main_obmalloc=False, 1206 allow_fork=False, 1207 allow_exec=False, 1208 allow_threads=False, 1209 allow_daemon_threads=False, 1210 check_multi_interp_extensions=False, 1211 gil='default', 1212 ), 1213 } 1214 gil_supported = ['default', 'shared', 'own'] 1215 1216 for name, vanilla in supported.items(): 1217 with self.subTest(f'supported ({name})'): 1218 expected = vanilla 1219 config1 = _interpreters.new_config(name) 1220 self.assert_ns_equal(config1, expected) 1221 self.assertIsNot(config1, expected) 1222 1223 config2 = _interpreters.new_config(name) 1224 self.assert_ns_equal(config2, config1) 1225 self.assertIsNot(config2, config1) 1226 1227 with self.subTest(f'noop override ({name})'): 1228 expected = vanilla 1229 overrides = vars(vanilla) 1230 config = _interpreters.new_config(name, **overrides) 1231 self.assert_ns_equal(config, expected) 1232 1233 with self.subTest(f'override all ({name})'): 1234 overrides = {k: not v for k, v in vars(vanilla).items()} 1235 for gil in gil_supported: 1236 if vanilla.gil == gil: 1237 continue 1238 overrides['gil'] = gil 1239 expected = types.SimpleNamespace(**overrides) 1240 config = _interpreters.new_config(name, **overrides) 1241 self.assert_ns_equal(config, expected) 1242 1243 # Override individual fields. 1244 for field, old in vars(vanilla).items(): 1245 if field == 'gil': 1246 values = [v for v in gil_supported if v != old] 1247 else: 1248 values = [not old] 1249 for val in values: 1250 with self.subTest(f'{name}.{field} ({old!r} -> {val!r})'): 1251 overrides = {field: val} 1252 expected = types.SimpleNamespace( 1253 **dict(vars(vanilla), **overrides), 1254 ) 1255 config = _interpreters.new_config(name, **overrides) 1256 self.assert_ns_equal(config, expected) 1257 1258 with self.subTest('extra override'): 1259 with self.assertRaises(ValueError): 1260 _interpreters.new_config(spam=True) 1261 1262 # Bad values for bool fields. 1263 for field, value in vars(supported['empty']).items(): 1264 if field == 'gil': 1265 continue 1266 assert isinstance(value, bool) 1267 for value in [1, '', 'spam', 1.0, None, object()]: 1268 with self.subTest(f'bad override ({field}={value!r})'): 1269 with self.assertRaises(TypeError): 1270 _interpreters.new_config(**{field: value}) 1271 1272 # Bad values for .gil. 1273 for value in [True, 1, 1.0, None, object()]: 1274 with self.subTest(f'bad override (gil={value!r})'): 1275 with self.assertRaises(TypeError): 1276 _interpreters.new_config(gil=value) 1277 for value in ['', 'spam']: 1278 with self.subTest(f'bad override (gil={value!r})'): 1279 with self.assertRaises(ValueError): 1280 _interpreters.new_config(gil=value) 1281 1282 def test_get_main(self): 1283 interpid, whence = _interpreters.get_main() 1284 self.assertEqual(interpid, 0) 1285 self.assertEqual(whence, _interpreters.WHENCE_RUNTIME) 1286 self.assertEqual( 1287 _interpreters.whence(interpid), 1288 _interpreters.WHENCE_RUNTIME) 1289 1290 def test_get_current(self): 1291 with self.subTest('main'): 1292 main, *_ = _interpreters.get_main() 1293 interpid, whence = _interpreters.get_current() 1294 self.assertEqual(interpid, main) 1295 self.assertEqual(whence, _interpreters.WHENCE_RUNTIME) 1296 1297 script = f""" 1298 import _interpreters 1299 interpid, whence = _interpreters.get_current() 1300 print((interpid, whence)) 1301 """ 1302 def parse_stdout(text): 1303 interpid, whence = eval(text) 1304 return interpid, whence 1305 1306 with self.subTest('from _interpreters'): 1307 orig = _interpreters.create() 1308 text = self.run_and_capture(orig, script) 1309 interpid, whence = parse_stdout(text) 1310 self.assertEqual(interpid, orig) 1311 self.assertEqual(whence, _interpreters.WHENCE_STDLIB) 1312 1313 with self.subTest('from C-API'): 1314 last = 0 1315 for id, *_ in _interpreters.list_all(): 1316 last = max(last, id) 1317 expected = last + 1 1318 text = self.run_temp_from_capi(script) 1319 interpid, whence = parse_stdout(text) 1320 self.assertEqual(interpid, expected) 1321 self.assertEqual(whence, _interpreters.WHENCE_CAPI) 1322 1323 def test_list_all(self): 1324 mainid, *_ = _interpreters.get_main() 1325 interpid1 = _interpreters.create() 1326 interpid2 = _interpreters.create() 1327 interpid3 = _interpreters.create() 1328 expected = [ 1329 (mainid, _interpreters.WHENCE_RUNTIME), 1330 (interpid1, _interpreters.WHENCE_STDLIB), 1331 (interpid2, _interpreters.WHENCE_STDLIB), 1332 (interpid3, _interpreters.WHENCE_STDLIB), 1333 ] 1334 1335 with self.subTest('main'): 1336 res = _interpreters.list_all() 1337 self.assertEqual(res, expected) 1338 1339 with self.subTest('via interp from _interpreters'): 1340 text = self.run_and_capture(interpid2, f""" 1341 import _interpreters 1342 print( 1343 _interpreters.list_all()) 1344 """) 1345 1346 res = eval(text) 1347 self.assertEqual(res, expected) 1348 1349 with self.subTest('via interp from C-API'): 1350 interpid4 = interpid3 + 1 1351 interpid5 = interpid4 + 1 1352 expected2 = expected + [ 1353 (interpid4, _interpreters.WHENCE_CAPI), 1354 (interpid5, _interpreters.WHENCE_STDLIB), 1355 ] 1356 expected3 = expected + [ 1357 (interpid5, _interpreters.WHENCE_STDLIB), 1358 ] 1359 text = self.run_temp_from_capi(f""" 1360 import _interpreters 1361 _interpreters.create() 1362 print( 1363 _interpreters.list_all()) 1364 """) 1365 res2 = eval(text) 1366 res3 = _interpreters.list_all() 1367 self.assertEqual(res2, expected2) 1368 self.assertEqual(res3, expected3) 1369 1370 def test_create(self): 1371 isolated = _interpreters.new_config('isolated') 1372 legacy = _interpreters.new_config('legacy') 1373 default = isolated 1374 1375 with self.subTest('no args'): 1376 interpid = _interpreters.create() 1377 config = _interpreters.get_config(interpid) 1378 self.assert_ns_equal(config, default) 1379 1380 with self.subTest('config: None'): 1381 interpid = _interpreters.create(None) 1382 config = _interpreters.get_config(interpid) 1383 self.assert_ns_equal(config, default) 1384 1385 with self.subTest('config: \'empty\''): 1386 with self.assertRaises(InterpreterError): 1387 # The "empty" config isn't viable on its own. 1388 _interpreters.create('empty') 1389 1390 for arg, expected in { 1391 '': default, 1392 'default': default, 1393 'isolated': isolated, 1394 'legacy': legacy, 1395 }.items(): 1396 with self.subTest(f'str arg: {arg!r}'): 1397 interpid = _interpreters.create(arg) 1398 config = _interpreters.get_config(interpid) 1399 self.assert_ns_equal(config, expected) 1400 1401 with self.subTest('custom'): 1402 orig = _interpreters.new_config('empty') 1403 orig.use_main_obmalloc = True 1404 orig.check_multi_interp_extensions = bool(Py_GIL_DISABLED) 1405 orig.gil = 'shared' 1406 interpid = _interpreters.create(orig) 1407 config = _interpreters.get_config(interpid) 1408 self.assert_ns_equal(config, orig) 1409 1410 with self.subTest('missing fields'): 1411 orig = _interpreters.new_config() 1412 del orig.gil 1413 with self.assertRaises(ValueError): 1414 _interpreters.create(orig) 1415 1416 with self.subTest('extra fields'): 1417 orig = _interpreters.new_config() 1418 orig.spam = True 1419 with self.assertRaises(ValueError): 1420 _interpreters.create(orig) 1421 1422 with self.subTest('whence'): 1423 interpid = _interpreters.create() 1424 self.assertEqual( 1425 _interpreters.whence(interpid), 1426 _interpreters.WHENCE_STDLIB) 1427 1428 @requires_test_modules 1429 def test_destroy(self): 1430 with self.subTest('from _interpreters'): 1431 interpid = _interpreters.create() 1432 before = [id for id, *_ in _interpreters.list_all()] 1433 _interpreters.destroy(interpid) 1434 after = [id for id, *_ in _interpreters.list_all()] 1435 1436 self.assertIn(interpid, before) 1437 self.assertNotIn(interpid, after) 1438 self.assertFalse( 1439 self.interp_exists(interpid)) 1440 1441 with self.subTest('main'): 1442 interpid, *_ = _interpreters.get_main() 1443 with self.assertRaises(InterpreterError): 1444 # It is the current interpreter. 1445 _interpreters.destroy(interpid) 1446 1447 with self.subTest('from C-API'): 1448 interpid = _testinternalcapi.create_interpreter() 1449 with self.assertRaisesRegex(InterpreterError, 'unrecognized'): 1450 _interpreters.destroy(interpid, restrict=True) 1451 self.assertTrue( 1452 self.interp_exists(interpid)) 1453 _interpreters.destroy(interpid) 1454 self.assertFalse( 1455 self.interp_exists(interpid)) 1456 1457 def test_get_config(self): 1458 # This test overlaps with 1459 # test.test_capi.test_misc.InterpreterConfigTests. 1460 1461 with self.subTest('main'): 1462 expected = _interpreters.new_config('legacy') 1463 expected.gil = 'own' 1464 if Py_GIL_DISABLED: 1465 expected.check_multi_interp_extensions = False 1466 interpid, *_ = _interpreters.get_main() 1467 config = _interpreters.get_config(interpid) 1468 self.assert_ns_equal(config, expected) 1469 1470 with self.subTest('isolated'): 1471 expected = _interpreters.new_config('isolated') 1472 interpid = _interpreters.create('isolated') 1473 config = _interpreters.get_config(interpid) 1474 self.assert_ns_equal(config, expected) 1475 1476 with self.subTest('legacy'): 1477 expected = _interpreters.new_config('legacy') 1478 interpid = _interpreters.create('legacy') 1479 config = _interpreters.get_config(interpid) 1480 self.assert_ns_equal(config, expected) 1481 1482 with self.subTest('from C-API'): 1483 orig = _interpreters.new_config('isolated') 1484 with self.interpreter_from_capi(orig) as interpid: 1485 with self.assertRaisesRegex(InterpreterError, 'unrecognized'): 1486 _interpreters.get_config(interpid, restrict=True) 1487 config = _interpreters.get_config(interpid) 1488 self.assert_ns_equal(config, orig) 1489 1490 @requires_test_modules 1491 def test_whence(self): 1492 with self.subTest('main'): 1493 interpid, *_ = _interpreters.get_main() 1494 whence = _interpreters.whence(interpid) 1495 self.assertEqual(whence, _interpreters.WHENCE_RUNTIME) 1496 1497 with self.subTest('stdlib'): 1498 interpid = _interpreters.create() 1499 whence = _interpreters.whence(interpid) 1500 self.assertEqual(whence, _interpreters.WHENCE_STDLIB) 1501 1502 for orig, name in { 1503 _interpreters.WHENCE_UNKNOWN: 'not ready', 1504 _interpreters.WHENCE_LEGACY_CAPI: 'legacy C-API', 1505 _interpreters.WHENCE_CAPI: 'C-API', 1506 _interpreters.WHENCE_XI: 'cross-interpreter C-API', 1507 }.items(): 1508 with self.subTest(f'from C-API ({orig}: {name})'): 1509 with self.interpreter_from_capi(whence=orig) as interpid: 1510 whence = _interpreters.whence(interpid) 1511 self.assertEqual(whence, orig) 1512 1513 with self.subTest('from C-API, running'): 1514 text = self.run_temp_from_capi(dedent(f""" 1515 import _interpreters 1516 interpid, *_ = _interpreters.get_current() 1517 print(_interpreters.whence(interpid)) 1518 """), 1519 config=True) 1520 whence = eval(text) 1521 self.assertEqual(whence, _interpreters.WHENCE_CAPI) 1522 1523 with self.subTest('from legacy C-API, running'): 1524 ... 1525 text = self.run_temp_from_capi(dedent(f""" 1526 import _interpreters 1527 interpid, *_ = _interpreters.get_current() 1528 print(_interpreters.whence(interpid)) 1529 """), 1530 config=False) 1531 whence = eval(text) 1532 self.assertEqual(whence, _interpreters.WHENCE_LEGACY_CAPI) 1533 1534 def test_is_running(self): 1535 def check(interpid, expected): 1536 with self.assertRaisesRegex(InterpreterError, 'unrecognized'): 1537 _interpreters.is_running(interpid, restrict=True) 1538 running = _interpreters.is_running(interpid) 1539 self.assertIs(running, expected) 1540 1541 with self.subTest('from _interpreters (running)'): 1542 interpid = _interpreters.create() 1543 with self.running(interpid): 1544 running = _interpreters.is_running(interpid) 1545 self.assertTrue(running) 1546 1547 with self.subTest('from _interpreters (not running)'): 1548 interpid = _interpreters.create() 1549 running = _interpreters.is_running(interpid) 1550 self.assertFalse(running) 1551 1552 with self.subTest('main'): 1553 interpid, *_ = _interpreters.get_main() 1554 check(interpid, True) 1555 1556 with self.subTest('from C-API (running __main__)'): 1557 with self.interpreter_from_capi() as interpid: 1558 with self.running_from_capi(interpid, main=True): 1559 check(interpid, True) 1560 1561 with self.subTest('from C-API (running, but not __main__)'): 1562 with self.interpreter_from_capi() as interpid: 1563 with self.running_from_capi(interpid, main=False): 1564 check(interpid, False) 1565 1566 with self.subTest('from C-API (not running)'): 1567 with self.interpreter_from_capi() as interpid: 1568 check(interpid, False) 1569 1570 def test_exec(self): 1571 with self.subTest('run script'): 1572 interpid = _interpreters.create() 1573 script, results = _captured_script('print("it worked!", end="")') 1574 with results: 1575 exc = _interpreters.exec(interpid, script) 1576 results = results.final() 1577 results.raise_if_failed() 1578 out = results.stdout 1579 self.assertEqual(out, 'it worked!') 1580 1581 with self.subTest('uncaught exception'): 1582 interpid = _interpreters.create() 1583 script, results = _captured_script(""" 1584 raise Exception('uh-oh!') 1585 print("it worked!", end="") 1586 """) 1587 with results: 1588 exc = _interpreters.exec(interpid, script) 1589 out = results.stdout() 1590 self.assertEqual(out, '') 1591 self.assert_ns_equal(exc, types.SimpleNamespace( 1592 type=types.SimpleNamespace( 1593 __name__='Exception', 1594 __qualname__='Exception', 1595 __module__='builtins', 1596 ), 1597 msg='uh-oh!', 1598 # We check these in other tests. 1599 formatted=exc.formatted, 1600 errdisplay=exc.errdisplay, 1601 )) 1602 1603 with self.subTest('from C-API'): 1604 with self.interpreter_from_capi() as interpid: 1605 with self.assertRaisesRegex(InterpreterError, 'unrecognized'): 1606 _interpreters.exec(interpid, 'raise Exception("it worked!")', 1607 restrict=True) 1608 exc = _interpreters.exec(interpid, 'raise Exception("it worked!")') 1609 self.assertIsNot(exc, None) 1610 self.assertEqual(exc.msg, 'it worked!') 1611 1612 def test_call(self): 1613 with self.subTest('no args'): 1614 interpid = _interpreters.create() 1615 exc = _interpreters.call(interpid, call_func_return_shareable) 1616 self.assertIs(exc, None) 1617 1618 with self.subTest('uncaught exception'): 1619 interpid = _interpreters.create() 1620 exc = _interpreters.call(interpid, call_func_failure) 1621 self.assertEqual(exc, types.SimpleNamespace( 1622 type=types.SimpleNamespace( 1623 __name__='Exception', 1624 __qualname__='Exception', 1625 __module__='builtins', 1626 ), 1627 msg='spam!', 1628 # We check these in other tests. 1629 formatted=exc.formatted, 1630 errdisplay=exc.errdisplay, 1631 )) 1632 1633 @requires_test_modules 1634 def test_set___main___attrs(self): 1635 with self.subTest('from _interpreters'): 1636 interpid = _interpreters.create() 1637 before1 = _interpreters.exec(interpid, 'assert spam == \'eggs\'') 1638 before2 = _interpreters.exec(interpid, 'assert ham == 42') 1639 self.assertEqual(before1.type.__name__, 'NameError') 1640 self.assertEqual(before2.type.__name__, 'NameError') 1641 1642 _interpreters.set___main___attrs(interpid, dict( 1643 spam='eggs', 1644 ham=42, 1645 )) 1646 after1 = _interpreters.exec(interpid, 'assert spam == \'eggs\'') 1647 after2 = _interpreters.exec(interpid, 'assert ham == 42') 1648 after3 = _interpreters.exec(interpid, 'assert spam == 42') 1649 self.assertIs(after1, None) 1650 self.assertIs(after2, None) 1651 self.assertEqual(after3.type.__name__, 'AssertionError') 1652 1653 with self.assertRaises(ValueError): 1654 # GH-127165: Embedded NULL characters broke the lookup 1655 _interpreters.set___main___attrs(interpid, {"\x00": 1}) 1656 1657 with self.subTest('from C-API'): 1658 with self.interpreter_from_capi() as interpid: 1659 with self.assertRaisesRegex(InterpreterError, 'unrecognized'): 1660 _interpreters.set___main___attrs(interpid, {'spam': True}, 1661 restrict=True) 1662 _interpreters.set___main___attrs(interpid, {'spam': True}) 1663 rc = _testinternalcapi.exec_interpreter( 1664 interpid, 1665 'assert spam is True', 1666 ) 1667 self.assertEqual(rc, 0) 1668 1669 1670if __name__ == '__main__': 1671 # Test needs to be a package, so we can do relative imports. 1672 unittest.main() 1673