• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import os
2import pickle
3import sys
4from textwrap import dedent, indent
5import threading
6import types
7import unittest
8
9from test import support
10from test.support import import_helper
11# Raise SkipTest if subinterpreters not supported.
12_interpreters = import_helper.import_module('_interpreters')
13from test.support import Py_GIL_DISABLED
14from test.support import interpreters
15from test.support import force_not_colorized
16from test.support.interpreters import (
17    InterpreterError, InterpreterNotFoundError, ExecutionFailed,
18)
19from .utils import (
20    _captured_script, _run_output, _running, TestBase,
21    requires_test_modules, _testinternalcapi,
22)
23
24
25WHENCE_STR_UNKNOWN = 'unknown'
26WHENCE_STR_RUNTIME = 'runtime init'
27WHENCE_STR_LEGACY_CAPI = 'legacy C-API'
28WHENCE_STR_CAPI = 'C-API'
29WHENCE_STR_XI = 'cross-interpreter C-API'
30WHENCE_STR_STDLIB = '_interpreters module'
31
32
33class ModuleTests(TestBase):
34
35    def test_queue_aliases(self):
36        first = [
37            interpreters.create_queue,
38            interpreters.Queue,
39            interpreters.QueueEmpty,
40            interpreters.QueueFull,
41        ]
42        second = [
43            interpreters.create_queue,
44            interpreters.Queue,
45            interpreters.QueueEmpty,
46            interpreters.QueueFull,
47        ]
48        self.assertEqual(second, first)
49
50
51class CreateTests(TestBase):
52
53    def test_in_main(self):
54        interp = interpreters.create()
55        self.assertIsInstance(interp, interpreters.Interpreter)
56        self.assertIn(interp, interpreters.list_all())
57
58        # GH-126221: Passing an invalid Unicode character used to cause a SystemError
59        self.assertRaises(UnicodeEncodeError, _interpreters.create, '\udc80')
60
61    def test_in_thread(self):
62        lock = threading.Lock()
63        interp = None
64        def f():
65            nonlocal interp
66            interp = interpreters.create()
67            lock.acquire()
68            lock.release()
69        t = threading.Thread(target=f)
70        with lock:
71            t.start()
72        t.join()
73        self.assertIn(interp, interpreters.list_all())
74
75    def test_in_subinterpreter(self):
76        main, = interpreters.list_all()
77        interp = interpreters.create()
78        out = _run_output(interp, dedent("""
79            from test.support import interpreters
80            interp = interpreters.create()
81            print(interp.id)
82            """))
83        interp2 = interpreters.Interpreter(int(out))
84        self.assertEqual(interpreters.list_all(), [main, interp, interp2])
85
86    def test_after_destroy_all(self):
87        before = set(interpreters.list_all())
88        # Create 3 subinterpreters.
89        interp_lst = []
90        for _ in range(3):
91            interps = interpreters.create()
92            interp_lst.append(interps)
93        # Now destroy them.
94        for interp in interp_lst:
95            interp.close()
96        # Finally, create another.
97        interp = interpreters.create()
98        self.assertEqual(set(interpreters.list_all()), before | {interp})
99
100    def test_after_destroy_some(self):
101        before = set(interpreters.list_all())
102        # Create 3 subinterpreters.
103        interp1 = interpreters.create()
104        interp2 = interpreters.create()
105        interp3 = interpreters.create()
106        # Now destroy 2 of them.
107        interp1.close()
108        interp2.close()
109        # Finally, create another.
110        interp = interpreters.create()
111        self.assertEqual(set(interpreters.list_all()), before | {interp3, interp})
112
113
114class GetMainTests(TestBase):
115
116    def test_id(self):
117        main = interpreters.get_main()
118        self.assertEqual(main.id, 0)
119
120    def test_current(self):
121        main = interpreters.get_main()
122        current = interpreters.get_current()
123        self.assertIs(main, current)
124
125    def test_idempotent(self):
126        main1 = interpreters.get_main()
127        main2 = interpreters.get_main()
128        self.assertIs(main1, main2)
129
130
131class GetCurrentTests(TestBase):
132
133    def test_main(self):
134        main = interpreters.get_main()
135        current = interpreters.get_current()
136        self.assertEqual(current, main)
137
138    def test_subinterpreter(self):
139        main = interpreters.get_main()
140        interp = interpreters.create()
141        out = _run_output(interp, dedent("""
142            from test.support import interpreters
143            cur = interpreters.get_current()
144            print(cur.id)
145            """))
146        current = interpreters.Interpreter(int(out))
147        self.assertEqual(current, interp)
148        self.assertNotEqual(current, main)
149
150    def test_idempotent(self):
151        with self.subTest('main'):
152            cur1 = interpreters.get_current()
153            cur2 = interpreters.get_current()
154            self.assertIs(cur1, cur2)
155
156        with self.subTest('subinterpreter'):
157            interp = interpreters.create()
158            out = _run_output(interp, dedent("""
159                from test.support import interpreters
160                cur = interpreters.get_current()
161                print(id(cur))
162                cur = interpreters.get_current()
163                print(id(cur))
164                """))
165            objid1, objid2 = (int(v) for v in out.splitlines())
166            self.assertEqual(objid1, objid2)
167
168        with self.subTest('per-interpreter'):
169            interp = interpreters.create()
170            out = _run_output(interp, dedent("""
171                from test.support import interpreters
172                cur = interpreters.get_current()
173                print(id(cur))
174                """))
175            id1 = int(out)
176            id2 = id(interp)
177            self.assertNotEqual(id1, id2)
178
179    @requires_test_modules
180    def test_created_with_capi(self):
181        expected = _testinternalcapi.next_interpreter_id()
182        text = self.run_temp_from_capi(f"""
183            import {interpreters.__name__} as interpreters
184            interp = interpreters.get_current()
185            print((interp.id, interp.whence))
186            """)
187        interpid, whence = eval(text)
188        self.assertEqual(interpid, expected)
189        self.assertEqual(whence, WHENCE_STR_CAPI)
190
191
192class ListAllTests(TestBase):
193
194    def test_initial(self):
195        interps = interpreters.list_all()
196        self.assertEqual(1, len(interps))
197
198    def test_after_creating(self):
199        main = interpreters.get_current()
200        first = interpreters.create()
201        second = interpreters.create()
202
203        ids = []
204        for interp in interpreters.list_all():
205            ids.append(interp.id)
206
207        self.assertEqual(ids, [main.id, first.id, second.id])
208
209    def test_after_destroying(self):
210        main = interpreters.get_current()
211        first = interpreters.create()
212        second = interpreters.create()
213        first.close()
214
215        ids = []
216        for interp in interpreters.list_all():
217            ids.append(interp.id)
218
219        self.assertEqual(ids, [main.id, second.id])
220
221    def test_idempotent(self):
222        main = interpreters.get_current()
223        first = interpreters.create()
224        second = interpreters.create()
225        expected = [main, first, second]
226
227        actual = interpreters.list_all()
228
229        self.assertEqual(actual, expected)
230        for interp1, interp2 in zip(actual, expected):
231            self.assertIs(interp1, interp2)
232
233    def test_created_with_capi(self):
234        mainid, *_ = _interpreters.get_main()
235        interpid1 = _interpreters.create()
236        interpid2 = _interpreters.create()
237        interpid3 = _interpreters.create()
238        interpid4 = interpid3 + 1
239        interpid5 = interpid4 + 1
240        expected = [
241            (mainid, WHENCE_STR_RUNTIME),
242            (interpid1, WHENCE_STR_STDLIB),
243            (interpid2, WHENCE_STR_STDLIB),
244            (interpid3, WHENCE_STR_STDLIB),
245            (interpid4, WHENCE_STR_CAPI),
246            (interpid5, WHENCE_STR_STDLIB),
247        ]
248        expected2 = expected[:-2]
249        text = self.run_temp_from_capi(f"""
250            import {interpreters.__name__} as interpreters
251            interp = interpreters.create()
252            print(
253                [(i.id, i.whence) for i in interpreters.list_all()])
254            """)
255        res = eval(text)
256        res2 = [(i.id, i.whence) for i in interpreters.list_all()]
257        self.assertEqual(res, expected)
258        self.assertEqual(res2, expected2)
259
260
261class InterpreterObjectTests(TestBase):
262
263    def test_init_int(self):
264        interpid = interpreters.get_current().id
265        interp = interpreters.Interpreter(interpid)
266        self.assertEqual(interp.id, interpid)
267
268    def test_init_interpreter_id(self):
269        interpid = interpreters.get_current()._id
270        interp = interpreters.Interpreter(interpid)
271        self.assertEqual(interp._id, interpid)
272
273    def test_init_unsupported(self):
274        actualid = interpreters.get_current().id
275        for interpid in [
276            str(actualid),
277            float(actualid),
278            object(),
279            None,
280            '',
281        ]:
282            with self.subTest(repr(interpid)):
283                with self.assertRaises(TypeError):
284                    interpreters.Interpreter(interpid)
285
286    def test_idempotent(self):
287        main = interpreters.get_main()
288        interp = interpreters.Interpreter(main.id)
289        self.assertIs(interp, main)
290
291    def test_init_does_not_exist(self):
292        with self.assertRaises(InterpreterNotFoundError):
293            interpreters.Interpreter(1_000_000)
294
295    def test_init_bad_id(self):
296        with self.assertRaises(ValueError):
297            interpreters.Interpreter(-1)
298
299    def test_id_type(self):
300        main = interpreters.get_main()
301        current = interpreters.get_current()
302        interp = interpreters.create()
303        self.assertIsInstance(main.id, int)
304        self.assertIsInstance(current.id, int)
305        self.assertIsInstance(interp.id, int)
306
307    def test_id_readonly(self):
308        interp = interpreters.create()
309        with self.assertRaises(AttributeError):
310            interp.id = 1_000_000
311
312    def test_whence(self):
313        main = interpreters.get_main()
314        interp = interpreters.create()
315
316        with self.subTest('main'):
317            self.assertEqual(main.whence, WHENCE_STR_RUNTIME)
318
319        with self.subTest('from _interpreters'):
320            self.assertEqual(interp.whence, WHENCE_STR_STDLIB)
321
322        with self.subTest('from C-API'):
323            text = self.run_temp_from_capi(f"""
324                import {interpreters.__name__} as interpreters
325                interp = interpreters.get_current()
326                print(repr(interp.whence))
327                """)
328            whence = eval(text)
329            self.assertEqual(whence, WHENCE_STR_CAPI)
330
331        with self.subTest('readonly'):
332            for value in [
333                None,
334                WHENCE_STR_UNKNOWN,
335                WHENCE_STR_RUNTIME,
336                WHENCE_STR_STDLIB,
337                WHENCE_STR_CAPI,
338            ]:
339                with self.assertRaises(AttributeError):
340                    interp.whence = value
341                with self.assertRaises(AttributeError):
342                    main.whence = value
343
344    def test_hashable(self):
345        interp = interpreters.create()
346        expected = hash(interp.id)
347        actual = hash(interp)
348        self.assertEqual(actual, expected)
349
350    def test_equality(self):
351        interp1 = interpreters.create()
352        interp2 = interpreters.create()
353        self.assertEqual(interp1, interp1)
354        self.assertNotEqual(interp1, interp2)
355
356    def test_pickle(self):
357        interp = interpreters.create()
358        data = pickle.dumps(interp)
359        unpickled = pickle.loads(data)
360        self.assertEqual(unpickled, interp)
361
362
363class TestInterpreterIsRunning(TestBase):
364
365    def test_main(self):
366        main = interpreters.get_main()
367        self.assertTrue(main.is_running())
368
369    # XXX Is this still true?
370    @unittest.skip('Fails on FreeBSD')
371    def test_subinterpreter(self):
372        interp = interpreters.create()
373        self.assertFalse(interp.is_running())
374
375        with _running(interp):
376            self.assertTrue(interp.is_running())
377        self.assertFalse(interp.is_running())
378
379    def test_finished(self):
380        r, w = self.pipe()
381        interp = interpreters.create()
382        interp.exec(f"""if True:
383            import os
384            os.write({w}, b'x')
385            """)
386        self.assertFalse(interp.is_running())
387        self.assertEqual(os.read(r, 1), b'x')
388
389    def test_from_subinterpreter(self):
390        interp = interpreters.create()
391        out = _run_output(interp, dedent(f"""
392            import _interpreters
393            if _interpreters.is_running({interp.id}):
394                print(True)
395            else:
396                print(False)
397            """))
398        self.assertEqual(out.strip(), 'True')
399
400    def test_already_destroyed(self):
401        interp = interpreters.create()
402        interp.close()
403        with self.assertRaises(InterpreterNotFoundError):
404            interp.is_running()
405
406    def test_with_only_background_threads(self):
407        r_interp, w_interp = self.pipe()
408        r_thread, w_thread = self.pipe()
409
410        DONE = b'D'
411        FINISHED = b'F'
412
413        interp = interpreters.create()
414        interp.exec(f"""if True:
415            import os
416            import threading
417
418            def task():
419                v = os.read({r_thread}, 1)
420                assert v == {DONE!r}
421                os.write({w_interp}, {FINISHED!r})
422            t = threading.Thread(target=task)
423            t.start()
424            """)
425        self.assertFalse(interp.is_running())
426
427        os.write(w_thread, DONE)
428        interp.exec('t.join()')
429        self.assertEqual(os.read(r_interp, 1), FINISHED)
430
431    def test_created_with_capi(self):
432        script = dedent(f"""
433            import {interpreters.__name__} as interpreters
434            interp = interpreters.get_current()
435            print(interp.is_running())
436            """)
437        def parse_results(text):
438            self.assertNotEqual(text, "")
439            try:
440                return eval(text)
441            except Exception:
442                raise Exception(repr(text))
443
444        with self.subTest('running __main__ (from self)'):
445            with self.interpreter_from_capi() as interpid:
446                text = self.run_from_capi(interpid, script, main=True)
447            running = parse_results(text)
448            self.assertTrue(running)
449
450        with self.subTest('running, but not __main__ (from self)'):
451            text = self.run_temp_from_capi(script)
452            running = parse_results(text)
453            self.assertFalse(running)
454
455        with self.subTest('running __main__ (from other)'):
456            with self.interpreter_obj_from_capi() as (interp, interpid):
457                before = interp.is_running()
458                with self.running_from_capi(interpid, main=True):
459                    during = interp.is_running()
460                after = interp.is_running()
461            self.assertFalse(before)
462            self.assertTrue(during)
463            self.assertFalse(after)
464
465        with self.subTest('running, but not __main__ (from other)'):
466            with self.interpreter_obj_from_capi() as (interp, interpid):
467                before = interp.is_running()
468                with self.running_from_capi(interpid, main=False):
469                    during = interp.is_running()
470                after = interp.is_running()
471            self.assertFalse(before)
472            self.assertFalse(during)
473            self.assertFalse(after)
474
475        with self.subTest('not running (from other)'):
476            with self.interpreter_obj_from_capi() as (interp, _):
477                running = interp.is_running()
478            self.assertFalse(running)
479
480
481class TestInterpreterClose(TestBase):
482
483    def test_basic(self):
484        main = interpreters.get_main()
485        interp1 = interpreters.create()
486        interp2 = interpreters.create()
487        interp3 = interpreters.create()
488        self.assertEqual(set(interpreters.list_all()),
489                         {main, interp1, interp2, interp3})
490        interp2.close()
491        self.assertEqual(set(interpreters.list_all()),
492                         {main, interp1, interp3})
493
494    def test_all(self):
495        before = set(interpreters.list_all())
496        interps = set()
497        for _ in range(3):
498            interp = interpreters.create()
499            interps.add(interp)
500        self.assertEqual(set(interpreters.list_all()), before | interps)
501        for interp in interps:
502            interp.close()
503        self.assertEqual(set(interpreters.list_all()), before)
504
505    def test_main(self):
506        main, = interpreters.list_all()
507        with self.assertRaises(InterpreterError):
508            main.close()
509
510        def f():
511            with self.assertRaises(InterpreterError):
512                main.close()
513
514        t = threading.Thread(target=f)
515        t.start()
516        t.join()
517
518    def test_already_destroyed(self):
519        interp = interpreters.create()
520        interp.close()
521        with self.assertRaises(InterpreterNotFoundError):
522            interp.close()
523
524    def test_from_current(self):
525        main, = interpreters.list_all()
526        interp = interpreters.create()
527        out = _run_output(interp, dedent(f"""
528            from test.support import interpreters
529            interp = interpreters.Interpreter({interp.id})
530            try:
531                interp.close()
532            except interpreters.InterpreterError:
533                print('failed')
534            """))
535        self.assertEqual(out.strip(), 'failed')
536        self.assertEqual(set(interpreters.list_all()), {main, interp})
537
538    def test_from_sibling(self):
539        main, = interpreters.list_all()
540        interp1 = interpreters.create()
541        interp2 = interpreters.create()
542        self.assertEqual(set(interpreters.list_all()),
543                         {main, interp1, interp2})
544        interp1.exec(dedent(f"""
545            from test.support import interpreters
546            interp2 = interpreters.Interpreter({interp2.id})
547            interp2.close()
548            interp3 = interpreters.create()
549            interp3.close()
550            """))
551        self.assertEqual(set(interpreters.list_all()), {main, interp1})
552
553    def test_from_other_thread(self):
554        interp = interpreters.create()
555        def f():
556            interp.close()
557
558        t = threading.Thread(target=f)
559        t.start()
560        t.join()
561
562    # XXX Is this still true?
563    @unittest.skip('Fails on FreeBSD')
564    def test_still_running(self):
565        main, = interpreters.list_all()
566        interp = interpreters.create()
567        with _running(interp):
568            with self.assertRaises(InterpreterError):
569                interp.close()
570            self.assertTrue(interp.is_running())
571
572    def test_subthreads_still_running(self):
573        r_interp, w_interp = self.pipe()
574        r_thread, w_thread = self.pipe()
575
576        FINISHED = b'F'
577
578        interp = interpreters.create()
579        interp.exec(f"""if True:
580            import os
581            import threading
582            import time
583
584            done = False
585
586            def notify_fini():
587                global done
588                done = True
589                t.join()
590            threading._register_atexit(notify_fini)
591
592            def task():
593                while not done:
594                    time.sleep(0.1)
595                os.write({w_interp}, {FINISHED!r})
596            t = threading.Thread(target=task)
597            t.start()
598            """)
599        interp.close()
600
601        self.assertEqual(os.read(r_interp, 1), FINISHED)
602
603    def test_created_with_capi(self):
604        script = dedent(f"""
605            import {interpreters.__name__} as interpreters
606            interp = interpreters.get_current()
607            interp.close()
608            """)
609
610        with self.subTest('running __main__ (from self)'):
611            with self.interpreter_from_capi() as interpid:
612                with self.assertRaisesRegex(ExecutionFailed,
613                                            'InterpreterError.*unrecognized'):
614                    self.run_from_capi(interpid, script, main=True)
615
616        with self.subTest('running, but not __main__ (from self)'):
617            with self.assertRaisesRegex(ExecutionFailed,
618                                        'InterpreterError.*unrecognized'):
619                self.run_temp_from_capi(script)
620
621        with self.subTest('running __main__ (from other)'):
622            with self.interpreter_obj_from_capi() as (interp, interpid):
623                with self.running_from_capi(interpid, main=True):
624                    with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
625                        interp.close()
626                    # Make sure it wssn't closed.
627                    self.assertTrue(
628                        self.interp_exists(interpid))
629
630        # The rest would be skipped until we deal with running threads when
631        # interp.close() is called.  However, the "whence" restrictions
632        # trigger first.
633
634        with self.subTest('running, but not __main__ (from other)'):
635            with self.interpreter_obj_from_capi() as (interp, interpid):
636                with self.running_from_capi(interpid, main=False):
637                    with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
638                        interp.close()
639                    # Make sure it wssn't closed.
640                    self.assertTrue(
641                        self.interp_exists(interpid))
642
643        with self.subTest('not running (from other)'):
644            with self.interpreter_obj_from_capi() as (interp, interpid):
645                with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
646                    interp.close()
647                self.assertTrue(
648                    self.interp_exists(interpid))
649
650
651class TestInterpreterPrepareMain(TestBase):
652
653    def test_empty(self):
654        interp = interpreters.create()
655        with self.assertRaises(ValueError):
656            interp.prepare_main()
657
658    def test_dict(self):
659        values = {'spam': 42, 'eggs': 'ham'}
660        interp = interpreters.create()
661        interp.prepare_main(values)
662        out = _run_output(interp, dedent("""
663            print(spam, eggs)
664            """))
665        self.assertEqual(out.strip(), '42 ham')
666
667    def test_tuple(self):
668        values = {'spam': 42, 'eggs': 'ham'}
669        values = tuple(values.items())
670        interp = interpreters.create()
671        interp.prepare_main(values)
672        out = _run_output(interp, dedent("""
673            print(spam, eggs)
674            """))
675        self.assertEqual(out.strip(), '42 ham')
676
677    def test_kwargs(self):
678        values = {'spam': 42, 'eggs': 'ham'}
679        interp = interpreters.create()
680        interp.prepare_main(**values)
681        out = _run_output(interp, dedent("""
682            print(spam, eggs)
683            """))
684        self.assertEqual(out.strip(), '42 ham')
685
686    def test_dict_and_kwargs(self):
687        values = {'spam': 42, 'eggs': 'ham'}
688        interp = interpreters.create()
689        interp.prepare_main(values, foo='bar')
690        out = _run_output(interp, dedent("""
691            print(spam, eggs, foo)
692            """))
693        self.assertEqual(out.strip(), '42 ham bar')
694
695    def test_not_shareable(self):
696        interp = interpreters.create()
697        # XXX TypeError?
698        with self.assertRaises(ValueError):
699            interp.prepare_main(spam={'spam': 'eggs', 'foo': 'bar'})
700
701        # Make sure neither was actually bound.
702        with self.assertRaises(ExecutionFailed):
703            interp.exec('print(foo)')
704        with self.assertRaises(ExecutionFailed):
705            interp.exec('print(spam)')
706
707    def test_running(self):
708        interp = interpreters.create()
709        interp.prepare_main({'spam': True})
710        with self.running(interp):
711            with self.assertRaisesRegex(InterpreterError, 'running'):
712                interp.prepare_main({'spam': False})
713        interp.exec('assert spam is True')
714
715    @requires_test_modules
716    def test_created_with_capi(self):
717        with self.interpreter_obj_from_capi() as (interp, interpid):
718            with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
719                interp.prepare_main({'spam': True})
720            with self.assertRaisesRegex(ExecutionFailed, 'NameError'):
721                self.run_from_capi(interpid, 'assert spam is True')
722
723
724class TestInterpreterExec(TestBase):
725
726    def test_success(self):
727        interp = interpreters.create()
728        script, results = _captured_script('print("it worked!", end="")')
729        with results:
730            interp.exec(script)
731        results = results.final()
732        results.raise_if_failed()
733        out = results.stdout
734
735        self.assertEqual(out, 'it worked!')
736
737    def test_failure(self):
738        interp = interpreters.create()
739        with self.assertRaises(ExecutionFailed):
740            interp.exec('raise Exception')
741
742    @force_not_colorized
743    def test_display_preserved_exception(self):
744        tempdir = self.temp_dir()
745        modfile = self.make_module('spam', tempdir, text="""
746            def ham():
747                raise RuntimeError('uh-oh!')
748
749            def eggs():
750                ham()
751            """)
752        scriptfile = self.make_script('script.py', tempdir, text="""
753            from test.support import interpreters
754
755            def script():
756                import spam
757                spam.eggs()
758
759            interp = interpreters.create()
760            interp.exec(script)
761            """)
762
763        stdout, stderr = self.assert_python_failure(scriptfile)
764        self.maxDiff = None
765        interpmod_line, = (l for l in stderr.splitlines() if ' exec' in l)
766        #      File "{interpreters.__file__}", line 179, in exec
767        self.assertEqual(stderr, dedent(f"""\
768            Traceback (most recent call last):
769              File "{scriptfile}", line 9, in <module>
770                interp.exec(script)
771                ~~~~~~~~~~~^^^^^^^^
772              {interpmod_line.strip()}
773                raise ExecutionFailed(excinfo)
774            test.support.interpreters.ExecutionFailed: RuntimeError: uh-oh!
775
776            Uncaught in the interpreter:
777
778            Traceback (most recent call last):
779              File "{scriptfile}", line 6, in script
780                spam.eggs()
781                ~~~~~~~~~^^
782              File "{modfile}", line 6, in eggs
783                ham()
784                ~~~^^
785              File "{modfile}", line 3, in ham
786                raise RuntimeError('uh-oh!')
787            RuntimeError: uh-oh!
788            """))
789        self.assertEqual(stdout, '')
790
791    def test_in_thread(self):
792        interp = interpreters.create()
793        script, results = _captured_script('print("it worked!", end="")')
794        with results:
795            def f():
796                interp.exec(script)
797
798            t = threading.Thread(target=f)
799            t.start()
800            t.join()
801        results = results.final()
802        results.raise_if_failed()
803        out = results.stdout
804
805        self.assertEqual(out, 'it worked!')
806
807    @support.requires_fork()
808    def test_fork(self):
809        interp = interpreters.create()
810        import tempfile
811        with tempfile.NamedTemporaryFile('w+', encoding='utf-8') as file:
812            file.write('')
813            file.flush()
814
815            expected = 'spam spam spam spam spam'
816            script = dedent(f"""
817                import os
818                try:
819                    os.fork()
820                except RuntimeError:
821                    with open('{file.name}', 'w', encoding='utf-8') as out:
822                        out.write('{expected}')
823                """)
824            interp.exec(script)
825
826            file.seek(0)
827            content = file.read()
828            self.assertEqual(content, expected)
829
830    # XXX Is this still true?
831    @unittest.skip('Fails on FreeBSD')
832    def test_already_running(self):
833        interp = interpreters.create()
834        with _running(interp):
835            with self.assertRaises(RuntimeError):
836                interp.exec('print("spam")')
837
838    def test_bad_script(self):
839        interp = interpreters.create()
840        with self.assertRaises(TypeError):
841            interp.exec(10)
842
843    def test_bytes_for_script(self):
844        interp = interpreters.create()
845        with self.assertRaises(TypeError):
846            interp.exec(b'print("spam")')
847
848    def test_with_background_threads_still_running(self):
849        r_interp, w_interp = self.pipe()
850        r_thread, w_thread = self.pipe()
851
852        RAN = b'R'
853        DONE = b'D'
854        FINISHED = b'F'
855
856        interp = interpreters.create()
857        interp.exec(f"""if True:
858            import os
859            import threading
860
861            def task():
862                v = os.read({r_thread}, 1)
863                assert v == {DONE!r}
864                os.write({w_interp}, {FINISHED!r})
865            t = threading.Thread(target=task)
866            t.start()
867            os.write({w_interp}, {RAN!r})
868            """)
869        interp.exec(f"""if True:
870            os.write({w_interp}, {RAN!r})
871            """)
872
873        os.write(w_thread, DONE)
874        interp.exec('t.join()')
875        self.assertEqual(os.read(r_interp, 1), RAN)
876        self.assertEqual(os.read(r_interp, 1), RAN)
877        self.assertEqual(os.read(r_interp, 1), FINISHED)
878
879    def test_created_with_capi(self):
880        with self.interpreter_obj_from_capi() as (interp, _):
881            with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
882                interp.exec('raise Exception("it worked!")')
883
884    # test__interpreters covers the remaining
885    # Interpreter.exec() behavior.
886
887
888def call_func_noop():
889    pass
890
891
892def call_func_return_shareable():
893    return (1, None)
894
895
896def call_func_return_not_shareable():
897    return [1, 2, 3]
898
899
900def call_func_failure():
901    raise Exception('spam!')
902
903
904def call_func_ident(value):
905    return value
906
907
908def get_call_func_closure(value):
909    def call_func_closure():
910        return value
911    return call_func_closure
912
913
914class Spam:
915
916    @staticmethod
917    def noop():
918        pass
919
920    @classmethod
921    def from_values(cls, *values):
922        return cls(values)
923
924    def __init__(self, value):
925        self.value = value
926
927    def __call__(self, *args, **kwargs):
928        return (self.value, args, kwargs)
929
930    def __eq__(self, other):
931        if not isinstance(other, Spam):
932            return NotImplemented
933        return self.value == other.value
934
935    def run(self, *args, **kwargs):
936        return (self.value, args, kwargs)
937
938
939def call_func_complex(op, /, value=None, *args, exc=None, **kwargs):
940    if exc is not None:
941        raise exc
942    if op == '':
943        raise ValueError('missing op')
944    elif op == 'ident':
945        if args or kwargs:
946            raise Exception((args, kwargs))
947        return value
948    elif op == 'full-ident':
949        return (value, args, kwargs)
950    elif op == 'globals':
951        if value is not None or args or kwargs:
952            raise Exception((value, args, kwargs))
953        return __name__
954    elif op == 'interpid':
955        if value is not None or args or kwargs:
956            raise Exception((value, args, kwargs))
957        return interpreters.get_current().id
958    elif op == 'closure':
959        if args or kwargs:
960            raise Exception((args, kwargs))
961        return get_call_func_closure(value)
962    elif op == 'custom':
963        if args or kwargs:
964            raise Exception((args, kwargs))
965        return Spam(value)
966    elif op == 'custom-inner':
967        if args or kwargs:
968            raise Exception((args, kwargs))
969        class Eggs(Spam):
970            pass
971        return Eggs(value)
972    elif not isinstance(op, str):
973        raise TypeError(op)
974    else:
975        raise NotImplementedError(op)
976
977
978class TestInterpreterCall(TestBase):
979
980    # signature
981    #  - blank
982    #  - args
983    #  - kwargs
984    #  - args, kwargs
985    # return
986    #  - nothing (None)
987    #  - simple
988    #  - closure
989    #  - custom
990    # ops:
991    #  - do nothing
992    #  - fail
993    #  - echo
994    #  - do complex, relative to interpreter
995    # scope
996    #  - global func
997    #  - local closure
998    #  - returned closure
999    #  - callable type instance
1000    #  - type
1001    #  - classmethod
1002    #  - staticmethod
1003    #  - instance method
1004    # exception
1005    #  - builtin
1006    #  - custom
1007    #  - preserves info (e.g. SyntaxError)
1008    #  - matching error display
1009
1010    def test_call(self):
1011        interp = interpreters.create()
1012
1013        for i, (callable, args, kwargs) in enumerate([
1014            (call_func_noop, (), {}),
1015            (call_func_return_shareable, (), {}),
1016            (call_func_return_not_shareable, (), {}),
1017            (Spam.noop, (), {}),
1018        ]):
1019            with self.subTest(f'success case #{i+1}'):
1020                res = interp.call(callable)
1021                self.assertIs(res, None)
1022
1023        for i, (callable, args, kwargs) in enumerate([
1024            (call_func_ident, ('spamspamspam',), {}),
1025            (get_call_func_closure, (42,), {}),
1026            (get_call_func_closure(42), (), {}),
1027            (Spam.from_values, (), {}),
1028            (Spam.from_values, (1, 2, 3), {}),
1029            (Spam, ('???'), {}),
1030            (Spam(101), (), {}),
1031            (Spam(10101).run, (), {}),
1032            (call_func_complex, ('ident', 'spam'), {}),
1033            (call_func_complex, ('full-ident', 'spam'), {}),
1034            (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}),
1035            (call_func_complex, ('globals',), {}),
1036            (call_func_complex, ('interpid',), {}),
1037            (call_func_complex, ('closure',), {'value': '~~~'}),
1038            (call_func_complex, ('custom', 'spam!'), {}),
1039            (call_func_complex, ('custom-inner', 'eggs!'), {}),
1040            (call_func_complex, ('???',), {'exc': ValueError('spam')}),
1041        ]):
1042            with self.subTest(f'invalid case #{i+1}'):
1043                with self.assertRaises(Exception):
1044                    if args or kwargs:
1045                        raise Exception((args, kwargs))
1046                    interp.call(callable)
1047
1048        with self.assertRaises(ExecutionFailed):
1049            interp.call(call_func_failure)
1050
1051    def test_call_in_thread(self):
1052        interp = interpreters.create()
1053
1054        for i, (callable, args, kwargs) in enumerate([
1055            (call_func_noop, (), {}),
1056            (call_func_return_shareable, (), {}),
1057            (call_func_return_not_shareable, (), {}),
1058            (Spam.noop, (), {}),
1059        ]):
1060            with self.subTest(f'success case #{i+1}'):
1061                with self.captured_thread_exception() as ctx:
1062                    t = interp.call_in_thread(callable)
1063                    t.join()
1064                self.assertIsNone(ctx.caught)
1065
1066        for i, (callable, args, kwargs) in enumerate([
1067            (call_func_ident, ('spamspamspam',), {}),
1068            (get_call_func_closure, (42,), {}),
1069            (get_call_func_closure(42), (), {}),
1070            (Spam.from_values, (), {}),
1071            (Spam.from_values, (1, 2, 3), {}),
1072            (Spam, ('???'), {}),
1073            (Spam(101), (), {}),
1074            (Spam(10101).run, (), {}),
1075            (call_func_complex, ('ident', 'spam'), {}),
1076            (call_func_complex, ('full-ident', 'spam'), {}),
1077            (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}),
1078            (call_func_complex, ('globals',), {}),
1079            (call_func_complex, ('interpid',), {}),
1080            (call_func_complex, ('closure',), {'value': '~~~'}),
1081            (call_func_complex, ('custom', 'spam!'), {}),
1082            (call_func_complex, ('custom-inner', 'eggs!'), {}),
1083            (call_func_complex, ('???',), {'exc': ValueError('spam')}),
1084        ]):
1085            with self.subTest(f'invalid case #{i+1}'):
1086                if args or kwargs:
1087                    continue
1088                with self.captured_thread_exception() as ctx:
1089                    t = interp.call_in_thread(callable)
1090                    t.join()
1091                self.assertIsNotNone(ctx.caught)
1092
1093        with self.captured_thread_exception() as ctx:
1094            t = interp.call_in_thread(call_func_failure)
1095            t.join()
1096        self.assertIsNotNone(ctx.caught)
1097
1098
1099class TestIsShareable(TestBase):
1100
1101    def test_default_shareables(self):
1102        shareables = [
1103                # singletons
1104                None,
1105                # builtin objects
1106                b'spam',
1107                'spam',
1108                10,
1109                -10,
1110                True,
1111                False,
1112                100.0,
1113                (),
1114                (1, ('spam', 'eggs'), True),
1115                ]
1116        for obj in shareables:
1117            with self.subTest(obj):
1118                shareable = interpreters.is_shareable(obj)
1119                self.assertTrue(shareable)
1120
1121    def test_not_shareable(self):
1122        class Cheese:
1123            def __init__(self, name):
1124                self.name = name
1125            def __str__(self):
1126                return self.name
1127
1128        class SubBytes(bytes):
1129            """A subclass of a shareable type."""
1130
1131        not_shareables = [
1132                # singletons
1133                NotImplemented,
1134                ...,
1135                # builtin types and objects
1136                type,
1137                object,
1138                object(),
1139                Exception(),
1140                # user-defined types and objects
1141                Cheese,
1142                Cheese('Wensleydale'),
1143                SubBytes(b'spam'),
1144                ]
1145        for obj in not_shareables:
1146            with self.subTest(repr(obj)):
1147                self.assertFalse(
1148                    interpreters.is_shareable(obj))
1149
1150
1151class LowLevelTests(TestBase):
1152
1153    # The behaviors in the low-level module are important in as much
1154    # as they are exercised by the high-level module.  Therefore the
1155    # most important testing happens in the high-level tests.
1156    # These low-level tests cover corner cases that are not
1157    # encountered by the high-level module, thus they
1158    # mostly shouldn't matter as much.
1159
1160    def test_new_config(self):
1161        # This test overlaps with
1162        # test.test_capi.test_misc.InterpreterConfigTests.
1163
1164        default = _interpreters.new_config('isolated')
1165        with self.subTest('no arg'):
1166            config = _interpreters.new_config()
1167            self.assert_ns_equal(config, default)
1168            self.assertIsNot(config, default)
1169
1170        with self.subTest('default'):
1171            config1 = _interpreters.new_config('default')
1172            self.assert_ns_equal(config1, default)
1173            self.assertIsNot(config1, default)
1174
1175            config2 = _interpreters.new_config('default')
1176            self.assert_ns_equal(config2, config1)
1177            self.assertIsNot(config2, config1)
1178
1179        for arg in ['', 'default']:
1180            with self.subTest(f'default ({arg!r})'):
1181                config = _interpreters.new_config(arg)
1182                self.assert_ns_equal(config, default)
1183                self.assertIsNot(config, default)
1184
1185        supported = {
1186            'isolated': types.SimpleNamespace(
1187                use_main_obmalloc=False,
1188                allow_fork=False,
1189                allow_exec=False,
1190                allow_threads=True,
1191                allow_daemon_threads=False,
1192                check_multi_interp_extensions=True,
1193                gil='own',
1194            ),
1195            'legacy': types.SimpleNamespace(
1196                use_main_obmalloc=True,
1197                allow_fork=True,
1198                allow_exec=True,
1199                allow_threads=True,
1200                allow_daemon_threads=True,
1201                check_multi_interp_extensions=bool(Py_GIL_DISABLED),
1202                gil='shared',
1203            ),
1204            'empty': types.SimpleNamespace(
1205                use_main_obmalloc=False,
1206                allow_fork=False,
1207                allow_exec=False,
1208                allow_threads=False,
1209                allow_daemon_threads=False,
1210                check_multi_interp_extensions=False,
1211                gil='default',
1212            ),
1213        }
1214        gil_supported = ['default', 'shared', 'own']
1215
1216        for name, vanilla in supported.items():
1217            with self.subTest(f'supported ({name})'):
1218                expected = vanilla
1219                config1 = _interpreters.new_config(name)
1220                self.assert_ns_equal(config1, expected)
1221                self.assertIsNot(config1, expected)
1222
1223                config2 = _interpreters.new_config(name)
1224                self.assert_ns_equal(config2, config1)
1225                self.assertIsNot(config2, config1)
1226
1227            with self.subTest(f'noop override ({name})'):
1228                expected = vanilla
1229                overrides = vars(vanilla)
1230                config = _interpreters.new_config(name, **overrides)
1231                self.assert_ns_equal(config, expected)
1232
1233            with self.subTest(f'override all ({name})'):
1234                overrides = {k: not v for k, v in vars(vanilla).items()}
1235                for gil in gil_supported:
1236                    if vanilla.gil == gil:
1237                        continue
1238                    overrides['gil'] = gil
1239                    expected = types.SimpleNamespace(**overrides)
1240                    config = _interpreters.new_config(name, **overrides)
1241                    self.assert_ns_equal(config, expected)
1242
1243            # Override individual fields.
1244            for field, old in vars(vanilla).items():
1245                if field == 'gil':
1246                    values = [v for v in gil_supported if v != old]
1247                else:
1248                    values = [not old]
1249                for val in values:
1250                    with self.subTest(f'{name}.{field} ({old!r} -> {val!r})'):
1251                        overrides = {field: val}
1252                        expected = types.SimpleNamespace(
1253                            **dict(vars(vanilla), **overrides),
1254                        )
1255                        config = _interpreters.new_config(name, **overrides)
1256                        self.assert_ns_equal(config, expected)
1257
1258        with self.subTest('extra override'):
1259            with self.assertRaises(ValueError):
1260                _interpreters.new_config(spam=True)
1261
1262        # Bad values for bool fields.
1263        for field, value in vars(supported['empty']).items():
1264            if field == 'gil':
1265                continue
1266            assert isinstance(value, bool)
1267            for value in [1, '', 'spam', 1.0, None, object()]:
1268                with self.subTest(f'bad override ({field}={value!r})'):
1269                    with self.assertRaises(TypeError):
1270                        _interpreters.new_config(**{field: value})
1271
1272        # Bad values for .gil.
1273        for value in [True, 1, 1.0, None, object()]:
1274            with self.subTest(f'bad override (gil={value!r})'):
1275                with self.assertRaises(TypeError):
1276                    _interpreters.new_config(gil=value)
1277        for value in ['', 'spam']:
1278            with self.subTest(f'bad override (gil={value!r})'):
1279                with self.assertRaises(ValueError):
1280                    _interpreters.new_config(gil=value)
1281
1282    def test_get_main(self):
1283        interpid, whence = _interpreters.get_main()
1284        self.assertEqual(interpid, 0)
1285        self.assertEqual(whence, _interpreters.WHENCE_RUNTIME)
1286        self.assertEqual(
1287            _interpreters.whence(interpid),
1288            _interpreters.WHENCE_RUNTIME)
1289
1290    def test_get_current(self):
1291        with self.subTest('main'):
1292            main, *_ = _interpreters.get_main()
1293            interpid, whence = _interpreters.get_current()
1294            self.assertEqual(interpid, main)
1295            self.assertEqual(whence, _interpreters.WHENCE_RUNTIME)
1296
1297        script = f"""
1298            import _interpreters
1299            interpid, whence = _interpreters.get_current()
1300            print((interpid, whence))
1301            """
1302        def parse_stdout(text):
1303            interpid, whence = eval(text)
1304            return interpid, whence
1305
1306        with self.subTest('from _interpreters'):
1307            orig = _interpreters.create()
1308            text = self.run_and_capture(orig, script)
1309            interpid, whence = parse_stdout(text)
1310            self.assertEqual(interpid, orig)
1311            self.assertEqual(whence, _interpreters.WHENCE_STDLIB)
1312
1313        with self.subTest('from C-API'):
1314            last = 0
1315            for id, *_ in _interpreters.list_all():
1316                last = max(last, id)
1317            expected = last + 1
1318            text = self.run_temp_from_capi(script)
1319            interpid, whence = parse_stdout(text)
1320            self.assertEqual(interpid, expected)
1321            self.assertEqual(whence, _interpreters.WHENCE_CAPI)
1322
1323    def test_list_all(self):
1324        mainid, *_ = _interpreters.get_main()
1325        interpid1 = _interpreters.create()
1326        interpid2 = _interpreters.create()
1327        interpid3 = _interpreters.create()
1328        expected = [
1329            (mainid, _interpreters.WHENCE_RUNTIME),
1330            (interpid1, _interpreters.WHENCE_STDLIB),
1331            (interpid2, _interpreters.WHENCE_STDLIB),
1332            (interpid3, _interpreters.WHENCE_STDLIB),
1333        ]
1334
1335        with self.subTest('main'):
1336            res = _interpreters.list_all()
1337            self.assertEqual(res, expected)
1338
1339        with self.subTest('via interp from _interpreters'):
1340            text = self.run_and_capture(interpid2, f"""
1341                import _interpreters
1342                print(
1343                    _interpreters.list_all())
1344                """)
1345
1346            res = eval(text)
1347            self.assertEqual(res, expected)
1348
1349        with self.subTest('via interp from C-API'):
1350            interpid4 = interpid3 + 1
1351            interpid5 = interpid4 + 1
1352            expected2 = expected + [
1353                (interpid4, _interpreters.WHENCE_CAPI),
1354                (interpid5, _interpreters.WHENCE_STDLIB),
1355            ]
1356            expected3 = expected + [
1357                (interpid5, _interpreters.WHENCE_STDLIB),
1358            ]
1359            text = self.run_temp_from_capi(f"""
1360                import _interpreters
1361                _interpreters.create()
1362                print(
1363                    _interpreters.list_all())
1364                """)
1365            res2 = eval(text)
1366            res3 = _interpreters.list_all()
1367            self.assertEqual(res2, expected2)
1368            self.assertEqual(res3, expected3)
1369
1370    def test_create(self):
1371        isolated = _interpreters.new_config('isolated')
1372        legacy = _interpreters.new_config('legacy')
1373        default = isolated
1374
1375        with self.subTest('no args'):
1376            interpid = _interpreters.create()
1377            config = _interpreters.get_config(interpid)
1378            self.assert_ns_equal(config, default)
1379
1380        with self.subTest('config: None'):
1381            interpid = _interpreters.create(None)
1382            config = _interpreters.get_config(interpid)
1383            self.assert_ns_equal(config, default)
1384
1385        with self.subTest('config: \'empty\''):
1386            with self.assertRaises(InterpreterError):
1387                # The "empty" config isn't viable on its own.
1388                _interpreters.create('empty')
1389
1390        for arg, expected in {
1391            '': default,
1392            'default': default,
1393            'isolated': isolated,
1394            'legacy': legacy,
1395        }.items():
1396            with self.subTest(f'str arg: {arg!r}'):
1397                interpid = _interpreters.create(arg)
1398                config = _interpreters.get_config(interpid)
1399                self.assert_ns_equal(config, expected)
1400
1401        with self.subTest('custom'):
1402            orig = _interpreters.new_config('empty')
1403            orig.use_main_obmalloc = True
1404            orig.check_multi_interp_extensions = bool(Py_GIL_DISABLED)
1405            orig.gil = 'shared'
1406            interpid = _interpreters.create(orig)
1407            config = _interpreters.get_config(interpid)
1408            self.assert_ns_equal(config, orig)
1409
1410        with self.subTest('missing fields'):
1411            orig = _interpreters.new_config()
1412            del orig.gil
1413            with self.assertRaises(ValueError):
1414                _interpreters.create(orig)
1415
1416        with self.subTest('extra fields'):
1417            orig = _interpreters.new_config()
1418            orig.spam = True
1419            with self.assertRaises(ValueError):
1420                _interpreters.create(orig)
1421
1422        with self.subTest('whence'):
1423            interpid = _interpreters.create()
1424            self.assertEqual(
1425                _interpreters.whence(interpid),
1426                _interpreters.WHENCE_STDLIB)
1427
1428    @requires_test_modules
1429    def test_destroy(self):
1430        with self.subTest('from _interpreters'):
1431            interpid = _interpreters.create()
1432            before = [id for id, *_ in _interpreters.list_all()]
1433            _interpreters.destroy(interpid)
1434            after = [id for id, *_ in _interpreters.list_all()]
1435
1436            self.assertIn(interpid, before)
1437            self.assertNotIn(interpid, after)
1438            self.assertFalse(
1439                self.interp_exists(interpid))
1440
1441        with self.subTest('main'):
1442            interpid, *_ = _interpreters.get_main()
1443            with self.assertRaises(InterpreterError):
1444                # It is the current interpreter.
1445                _interpreters.destroy(interpid)
1446
1447        with self.subTest('from C-API'):
1448            interpid = _testinternalcapi.create_interpreter()
1449            with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
1450                _interpreters.destroy(interpid, restrict=True)
1451            self.assertTrue(
1452                self.interp_exists(interpid))
1453            _interpreters.destroy(interpid)
1454            self.assertFalse(
1455                self.interp_exists(interpid))
1456
1457    def test_get_config(self):
1458        # This test overlaps with
1459        # test.test_capi.test_misc.InterpreterConfigTests.
1460
1461        with self.subTest('main'):
1462            expected = _interpreters.new_config('legacy')
1463            expected.gil = 'own'
1464            if Py_GIL_DISABLED:
1465                expected.check_multi_interp_extensions = False
1466            interpid, *_ = _interpreters.get_main()
1467            config = _interpreters.get_config(interpid)
1468            self.assert_ns_equal(config, expected)
1469
1470        with self.subTest('isolated'):
1471            expected = _interpreters.new_config('isolated')
1472            interpid = _interpreters.create('isolated')
1473            config = _interpreters.get_config(interpid)
1474            self.assert_ns_equal(config, expected)
1475
1476        with self.subTest('legacy'):
1477            expected = _interpreters.new_config('legacy')
1478            interpid = _interpreters.create('legacy')
1479            config = _interpreters.get_config(interpid)
1480            self.assert_ns_equal(config, expected)
1481
1482        with self.subTest('from C-API'):
1483            orig = _interpreters.new_config('isolated')
1484            with self.interpreter_from_capi(orig) as interpid:
1485                with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
1486                    _interpreters.get_config(interpid, restrict=True)
1487                config = _interpreters.get_config(interpid)
1488            self.assert_ns_equal(config, orig)
1489
1490    @requires_test_modules
1491    def test_whence(self):
1492        with self.subTest('main'):
1493            interpid, *_ = _interpreters.get_main()
1494            whence = _interpreters.whence(interpid)
1495            self.assertEqual(whence, _interpreters.WHENCE_RUNTIME)
1496
1497        with self.subTest('stdlib'):
1498            interpid = _interpreters.create()
1499            whence = _interpreters.whence(interpid)
1500            self.assertEqual(whence, _interpreters.WHENCE_STDLIB)
1501
1502        for orig, name in {
1503            _interpreters.WHENCE_UNKNOWN: 'not ready',
1504            _interpreters.WHENCE_LEGACY_CAPI: 'legacy C-API',
1505            _interpreters.WHENCE_CAPI: 'C-API',
1506            _interpreters.WHENCE_XI: 'cross-interpreter C-API',
1507        }.items():
1508            with self.subTest(f'from C-API ({orig}: {name})'):
1509                with self.interpreter_from_capi(whence=orig) as interpid:
1510                    whence = _interpreters.whence(interpid)
1511                self.assertEqual(whence, orig)
1512
1513        with self.subTest('from C-API, running'):
1514            text = self.run_temp_from_capi(dedent(f"""
1515                import _interpreters
1516                interpid, *_ = _interpreters.get_current()
1517                print(_interpreters.whence(interpid))
1518                """),
1519                config=True)
1520            whence = eval(text)
1521            self.assertEqual(whence, _interpreters.WHENCE_CAPI)
1522
1523        with self.subTest('from legacy C-API, running'):
1524            ...
1525            text = self.run_temp_from_capi(dedent(f"""
1526                import _interpreters
1527                interpid, *_ = _interpreters.get_current()
1528                print(_interpreters.whence(interpid))
1529                """),
1530                config=False)
1531            whence = eval(text)
1532            self.assertEqual(whence, _interpreters.WHENCE_LEGACY_CAPI)
1533
1534    def test_is_running(self):
1535        def check(interpid, expected):
1536            with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
1537                _interpreters.is_running(interpid, restrict=True)
1538            running = _interpreters.is_running(interpid)
1539            self.assertIs(running, expected)
1540
1541        with self.subTest('from _interpreters (running)'):
1542            interpid = _interpreters.create()
1543            with self.running(interpid):
1544                running = _interpreters.is_running(interpid)
1545                self.assertTrue(running)
1546
1547        with self.subTest('from _interpreters (not running)'):
1548            interpid = _interpreters.create()
1549            running = _interpreters.is_running(interpid)
1550            self.assertFalse(running)
1551
1552        with self.subTest('main'):
1553            interpid, *_ = _interpreters.get_main()
1554            check(interpid, True)
1555
1556        with self.subTest('from C-API (running __main__)'):
1557            with self.interpreter_from_capi() as interpid:
1558                with self.running_from_capi(interpid, main=True):
1559                    check(interpid, True)
1560
1561        with self.subTest('from C-API (running, but not __main__)'):
1562            with self.interpreter_from_capi() as interpid:
1563                with self.running_from_capi(interpid, main=False):
1564                    check(interpid, False)
1565
1566        with self.subTest('from C-API (not running)'):
1567            with self.interpreter_from_capi() as interpid:
1568                check(interpid, False)
1569
1570    def test_exec(self):
1571        with self.subTest('run script'):
1572            interpid = _interpreters.create()
1573            script, results = _captured_script('print("it worked!", end="")')
1574            with results:
1575                exc = _interpreters.exec(interpid, script)
1576            results = results.final()
1577            results.raise_if_failed()
1578            out = results.stdout
1579            self.assertEqual(out, 'it worked!')
1580
1581        with self.subTest('uncaught exception'):
1582            interpid = _interpreters.create()
1583            script, results = _captured_script("""
1584                raise Exception('uh-oh!')
1585                print("it worked!", end="")
1586                """)
1587            with results:
1588                exc = _interpreters.exec(interpid, script)
1589                out = results.stdout()
1590            self.assertEqual(out, '')
1591            self.assert_ns_equal(exc, types.SimpleNamespace(
1592                type=types.SimpleNamespace(
1593                    __name__='Exception',
1594                    __qualname__='Exception',
1595                    __module__='builtins',
1596                ),
1597                msg='uh-oh!',
1598                # We check these in other tests.
1599                formatted=exc.formatted,
1600                errdisplay=exc.errdisplay,
1601            ))
1602
1603        with self.subTest('from C-API'):
1604            with self.interpreter_from_capi() as interpid:
1605                with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
1606                    _interpreters.exec(interpid, 'raise Exception("it worked!")',
1607                                       restrict=True)
1608                exc = _interpreters.exec(interpid, 'raise Exception("it worked!")')
1609            self.assertIsNot(exc, None)
1610            self.assertEqual(exc.msg, 'it worked!')
1611
1612    def test_call(self):
1613        with self.subTest('no args'):
1614            interpid = _interpreters.create()
1615            exc = _interpreters.call(interpid, call_func_return_shareable)
1616            self.assertIs(exc, None)
1617
1618        with self.subTest('uncaught exception'):
1619            interpid = _interpreters.create()
1620            exc = _interpreters.call(interpid, call_func_failure)
1621            self.assertEqual(exc, types.SimpleNamespace(
1622                type=types.SimpleNamespace(
1623                    __name__='Exception',
1624                    __qualname__='Exception',
1625                    __module__='builtins',
1626                ),
1627                msg='spam!',
1628                # We check these in other tests.
1629                formatted=exc.formatted,
1630                errdisplay=exc.errdisplay,
1631            ))
1632
1633    @requires_test_modules
1634    def test_set___main___attrs(self):
1635        with self.subTest('from _interpreters'):
1636            interpid = _interpreters.create()
1637            before1 = _interpreters.exec(interpid, 'assert spam == \'eggs\'')
1638            before2 = _interpreters.exec(interpid, 'assert ham == 42')
1639            self.assertEqual(before1.type.__name__, 'NameError')
1640            self.assertEqual(before2.type.__name__, 'NameError')
1641
1642            _interpreters.set___main___attrs(interpid, dict(
1643                spam='eggs',
1644                ham=42,
1645            ))
1646            after1 = _interpreters.exec(interpid, 'assert spam == \'eggs\'')
1647            after2 = _interpreters.exec(interpid, 'assert ham == 42')
1648            after3 = _interpreters.exec(interpid, 'assert spam == 42')
1649            self.assertIs(after1, None)
1650            self.assertIs(after2, None)
1651            self.assertEqual(after3.type.__name__, 'AssertionError')
1652
1653            with self.assertRaises(ValueError):
1654                # GH-127165: Embedded NULL characters broke the lookup
1655                _interpreters.set___main___attrs(interpid, {"\x00": 1})
1656
1657        with self.subTest('from C-API'):
1658            with self.interpreter_from_capi() as interpid:
1659                with self.assertRaisesRegex(InterpreterError, 'unrecognized'):
1660                    _interpreters.set___main___attrs(interpid, {'spam': True},
1661                                                     restrict=True)
1662                _interpreters.set___main___attrs(interpid, {'spam': True})
1663                rc = _testinternalcapi.exec_interpreter(
1664                    interpid,
1665                    'assert spam is True',
1666                )
1667            self.assertEqual(rc, 0)
1668
1669
1670if __name__ == '__main__':
1671    # Test needs to be a package, so we can do relative imports.
1672    unittest.main()
1673