• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1from collections import namedtuple
2import contextlib
3import json
4import io
5import os
6import os.path
7import pickle
8import queue
9#import select
10import subprocess
11import sys
12import tempfile
13from textwrap import dedent, indent
14import threading
15import types
16import unittest
17import warnings
18
19from test import support
20
21# We would use test.support.import_helper.import_module(),
22# but the indirect import of test.support.os_helper causes refleaks.
23try:
24    import _interpreters
25except ImportError as exc:
26    raise unittest.SkipTest(str(exc))
27from test.support import interpreters
28
29
30try:
31    import _testinternalcapi
32    import _testcapi
33except ImportError:
34    _testinternalcapi = None
35    _testcapi = None
36
37def requires_test_modules(func):
38    return unittest.skipIf(_testinternalcapi is None, "test requires _testinternalcapi module")(func)
39
40
41def _dump_script(text):
42    lines = text.splitlines()
43    print()
44    print('-' * 20)
45    for i, line in enumerate(lines, 1):
46        print(f' {i:>{len(str(len(lines)))}}  {line}')
47    print('-' * 20)
48
49
50def _close_file(file):
51    try:
52        if hasattr(file, 'close'):
53            file.close()
54        else:
55            os.close(file)
56    except OSError as exc:
57        if exc.errno != 9:
58            raise  # re-raise
59        # It was closed already.
60
61
62def pack_exception(exc=None):
63    captured = _interpreters.capture_exception(exc)
64    data = dict(captured.__dict__)
65    data['type'] = dict(captured.type.__dict__)
66    return json.dumps(data)
67
68
69def unpack_exception(packed):
70    try:
71        data = json.loads(packed)
72    except json.decoder.JSONDecodeError:
73        warnings.warn('incomplete exception data', RuntimeWarning)
74        print(packed if isinstance(packed, str) else packed.decode('utf-8'))
75        return None
76    exc = types.SimpleNamespace(**data)
77    exc.type = types.SimpleNamespace(**exc.type)
78    return exc;
79
80
81class CapturingResults:
82
83    STDIO = dedent("""\
84        with open({w_pipe}, 'wb', buffering=0) as _spipe_{stream}:
85            _captured_std{stream} = io.StringIO()
86            with contextlib.redirect_std{stream}(_captured_std{stream}):
87                #########################
88                # begin wrapped script
89
90                {indented}
91
92                # end wrapped script
93                #########################
94            text = _captured_std{stream}.getvalue()
95            _spipe_{stream}.write(text.encode('utf-8'))
96        """)[:-1]
97    EXC = dedent("""\
98        with open({w_pipe}, 'wb', buffering=0) as _spipe_exc:
99            try:
100                #########################
101                # begin wrapped script
102
103                {indented}
104
105                # end wrapped script
106                #########################
107            except Exception as exc:
108                text = _interp_utils.pack_exception(exc)
109                _spipe_exc.write(text.encode('utf-8'))
110        """)[:-1]
111
112    @classmethod
113    def wrap_script(cls, script, *, stdout=True, stderr=False, exc=False):
114        script = dedent(script).strip(os.linesep)
115        imports = [
116            f'import {__name__} as _interp_utils',
117        ]
118        wrapped = script
119
120        # Handle exc.
121        if exc:
122            exc = os.pipe()
123            r_exc, w_exc = exc
124            indented = wrapped.replace('\n', '\n        ')
125            wrapped = cls.EXC.format(
126                w_pipe=w_exc,
127                indented=indented,
128            )
129        else:
130            exc = None
131
132        # Handle stdout.
133        if stdout:
134            imports.extend([
135                'import contextlib, io',
136            ])
137            stdout = os.pipe()
138            r_out, w_out = stdout
139            indented = wrapped.replace('\n', '\n        ')
140            wrapped = cls.STDIO.format(
141                w_pipe=w_out,
142                indented=indented,
143                stream='out',
144            )
145        else:
146            stdout = None
147
148        # Handle stderr.
149        if stderr == 'stdout':
150            stderr = None
151        elif stderr:
152            if not stdout:
153                imports.extend([
154                    'import contextlib, io',
155                ])
156            stderr = os.pipe()
157            r_err, w_err = stderr
158            indented = wrapped.replace('\n', '\n        ')
159            wrapped = cls.STDIO.format(
160                w_pipe=w_err,
161                indented=indented,
162                stream='err',
163            )
164        else:
165            stderr = None
166
167        if wrapped == script:
168            raise NotImplementedError
169        else:
170            for line in imports:
171                wrapped = f'{line}{os.linesep}{wrapped}'
172
173        results = cls(stdout, stderr, exc)
174        return wrapped, results
175
176    def __init__(self, out, err, exc):
177        self._rf_out = None
178        self._rf_err = None
179        self._rf_exc = None
180        self._w_out = None
181        self._w_err = None
182        self._w_exc = None
183
184        if out is not None:
185            r_out, w_out = out
186            self._rf_out = open(r_out, 'rb', buffering=0)
187            self._w_out = w_out
188
189        if err is not None:
190            r_err, w_err = err
191            self._rf_err = open(r_err, 'rb', buffering=0)
192            self._w_err = w_err
193
194        if exc is not None:
195            r_exc, w_exc = exc
196            self._rf_exc = open(r_exc, 'rb', buffering=0)
197            self._w_exc = w_exc
198
199        self._buf_out = b''
200        self._buf_err = b''
201        self._buf_exc = b''
202        self._exc = None
203
204        self._closed = False
205
206    def __enter__(self):
207        return self
208
209    def __exit__(self, *args):
210        self.close()
211
212    @property
213    def closed(self):
214        return self._closed
215
216    def close(self):
217        if self._closed:
218            return
219        self._closed = True
220
221        if self._w_out is not None:
222            _close_file(self._w_out)
223            self._w_out = None
224        if self._w_err is not None:
225            _close_file(self._w_err)
226            self._w_err = None
227        if self._w_exc is not None:
228            _close_file(self._w_exc)
229            self._w_exc = None
230
231        self._capture()
232
233        if self._rf_out is not None:
234            _close_file(self._rf_out)
235            self._rf_out = None
236        if self._rf_err is not None:
237            _close_file(self._rf_err)
238            self._rf_err = None
239        if self._rf_exc is not None:
240            _close_file(self._rf_exc)
241            self._rf_exc = None
242
243    def _capture(self):
244        # Ideally this is called only after the script finishes
245        # (and thus has closed the write end of the pipe.
246        if self._rf_out is not None:
247            chunk = self._rf_out.read(100)
248            while chunk:
249                self._buf_out += chunk
250                chunk = self._rf_out.read(100)
251        if self._rf_err is not None:
252            chunk = self._rf_err.read(100)
253            while chunk:
254                self._buf_err += chunk
255                chunk = self._rf_err.read(100)
256        if self._rf_exc is not None:
257            chunk = self._rf_exc.read(100)
258            while chunk:
259                self._buf_exc += chunk
260                chunk = self._rf_exc.read(100)
261
262    def _unpack_stdout(self):
263        return self._buf_out.decode('utf-8')
264
265    def _unpack_stderr(self):
266        return self._buf_err.decode('utf-8')
267
268    def _unpack_exc(self):
269        if self._exc is not None:
270            return self._exc
271        if not self._buf_exc:
272            return None
273        self._exc = unpack_exception(self._buf_exc)
274        return self._exc
275
276    def stdout(self):
277        if self.closed:
278            return self.final().stdout
279        self._capture()
280        return self._unpack_stdout()
281
282    def stderr(self):
283        if self.closed:
284            return self.final().stderr
285        self._capture()
286        return self._unpack_stderr()
287
288    def exc(self):
289        if self.closed:
290            return self.final().exc
291        self._capture()
292        return self._unpack_exc()
293
294    def final(self, *, force=False):
295        try:
296            return self._final
297        except AttributeError:
298            if not self._closed:
299                if not force:
300                    raise Exception('no final results available yet')
301                else:
302                    return CapturedResults.Proxy(self)
303            self._final = CapturedResults(
304                self._unpack_stdout(),
305                self._unpack_stderr(),
306                self._unpack_exc(),
307            )
308            return self._final
309
310
311class CapturedResults(namedtuple('CapturedResults', 'stdout stderr exc')):
312
313    class Proxy:
314        def __init__(self, capturing):
315            self._capturing = capturing
316        def _finish(self):
317            if self._capturing is None:
318                return
319            self._final = self._capturing.final()
320            self._capturing = None
321        def __iter__(self):
322            self._finish()
323            yield from self._final
324        def __len__(self):
325            self._finish()
326            return len(self._final)
327        def __getattr__(self, name):
328            self._finish()
329            if name.startswith('_'):
330                raise AttributeError(name)
331            return getattr(self._final, name)
332
333    def raise_if_failed(self):
334        if self.exc is not None:
335            raise interpreters.ExecutionFailed(self.exc)
336
337
338def _captured_script(script, *, stdout=True, stderr=False, exc=False):
339    return CapturingResults.wrap_script(
340        script,
341        stdout=stdout,
342        stderr=stderr,
343        exc=exc,
344    )
345
346
347def clean_up_interpreters():
348    for interp in interpreters.list_all():
349        if interp.id == 0:  # main
350            continue
351        try:
352            interp.close()
353        except _interpreters.InterpreterError:
354            pass  # already destroyed
355
356
357def _run_output(interp, request, init=None):
358    script, results = _captured_script(request)
359    with results:
360        if init:
361            interp.prepare_main(init)
362        interp.exec(script)
363    return results.stdout()
364
365
366@contextlib.contextmanager
367def _running(interp):
368    r, w = os.pipe()
369    def run():
370        interp.exec(dedent(f"""
371            # wait for "signal"
372            with open({r}) as rpipe:
373                rpipe.read()
374            """))
375
376    t = threading.Thread(target=run)
377    t.start()
378
379    yield
380
381    with open(w, 'w') as spipe:
382        spipe.write('done')
383    t.join()
384
385
386class TestBase(unittest.TestCase):
387
388    def tearDown(self):
389        clean_up_interpreters()
390
391    def pipe(self):
392        def ensure_closed(fd):
393            try:
394                os.close(fd)
395            except OSError:
396                pass
397        r, w = os.pipe()
398        self.addCleanup(lambda: ensure_closed(r))
399        self.addCleanup(lambda: ensure_closed(w))
400        return r, w
401
402    def temp_dir(self):
403        tempdir = tempfile.mkdtemp()
404        tempdir = os.path.realpath(tempdir)
405        from test.support import os_helper
406        self.addCleanup(lambda: os_helper.rmtree(tempdir))
407        return tempdir
408
409    @contextlib.contextmanager
410    def captured_thread_exception(self):
411        ctx = types.SimpleNamespace(caught=None)
412        def excepthook(args):
413            ctx.caught = args
414        orig_excepthook = threading.excepthook
415        threading.excepthook = excepthook
416        try:
417            yield ctx
418        finally:
419            threading.excepthook = orig_excepthook
420
421    def make_script(self, filename, dirname=None, text=None):
422        if text:
423            text = dedent(text)
424        if dirname is None:
425            dirname = self.temp_dir()
426        filename = os.path.join(dirname, filename)
427
428        os.makedirs(os.path.dirname(filename), exist_ok=True)
429        with open(filename, 'w', encoding='utf-8') as outfile:
430            outfile.write(text or '')
431        return filename
432
433    def make_module(self, name, pathentry=None, text=None):
434        if text:
435            text = dedent(text)
436        if pathentry is None:
437            pathentry = self.temp_dir()
438        else:
439            os.makedirs(pathentry, exist_ok=True)
440        *subnames, basename = name.split('.')
441
442        dirname = pathentry
443        for subname in subnames:
444            dirname = os.path.join(dirname, subname)
445            if os.path.isdir(dirname):
446                pass
447            elif os.path.exists(dirname):
448                raise Exception(dirname)
449            else:
450                os.mkdir(dirname)
451            initfile = os.path.join(dirname, '__init__.py')
452            if not os.path.exists(initfile):
453                with open(initfile, 'w'):
454                    pass
455        filename = os.path.join(dirname, basename + '.py')
456
457        with open(filename, 'w', encoding='utf-8') as outfile:
458            outfile.write(text or '')
459        return filename
460
461    @support.requires_subprocess()
462    def run_python(self, *argv):
463        proc = subprocess.run(
464            [sys.executable, *argv],
465            capture_output=True,
466            text=True,
467        )
468        return proc.returncode, proc.stdout, proc.stderr
469
470    def assert_python_ok(self, *argv):
471        exitcode, stdout, stderr = self.run_python(*argv)
472        self.assertNotEqual(exitcode, 1)
473        return stdout, stderr
474
475    def assert_python_failure(self, *argv):
476        exitcode, stdout, stderr = self.run_python(*argv)
477        self.assertNotEqual(exitcode, 0)
478        return stdout, stderr
479
480    def assert_ns_equal(self, ns1, ns2, msg=None):
481        # This is mostly copied from TestCase.assertDictEqual.
482        self.assertEqual(type(ns1), type(ns2))
483        if ns1 == ns2:
484            return
485
486        import difflib
487        import pprint
488        from unittest.util import _common_shorten_repr
489        standardMsg = '%s != %s' % _common_shorten_repr(ns1, ns2)
490        diff = ('\n' + '\n'.join(difflib.ndiff(
491                       pprint.pformat(vars(ns1)).splitlines(),
492                       pprint.pformat(vars(ns2)).splitlines())))
493        diff = f'namespace({diff})'
494        standardMsg = self._truncateMessage(standardMsg, diff)
495        self.fail(self._formatMessage(msg, standardMsg))
496
497    def _run_string(self, interp, script):
498        wrapped, results = _captured_script(script, exc=False)
499        #_dump_script(wrapped)
500        with results:
501            if isinstance(interp, interpreters.Interpreter):
502                interp.exec(script)
503            else:
504                err = _interpreters.run_string(interp, wrapped)
505                if err is not None:
506                    return None, err
507        return results.stdout(), None
508
509    def run_and_capture(self, interp, script):
510        text, err = self._run_string(interp, script)
511        if err is not None:
512            raise interpreters.ExecutionFailed(err)
513        else:
514            return text
515
516    def interp_exists(self, interpid):
517        try:
518            _interpreters.whence(interpid)
519        except _interpreters.InterpreterNotFoundError:
520            return False
521        else:
522            return True
523
524    @requires_test_modules
525    @contextlib.contextmanager
526    def interpreter_from_capi(self, config=None, whence=None):
527        if config is False:
528            if whence is None:
529                whence = _interpreters.WHENCE_LEGACY_CAPI
530            else:
531                assert whence in (_interpreters.WHENCE_LEGACY_CAPI,
532                                  _interpreters.WHENCE_UNKNOWN), repr(whence)
533            config = None
534        elif config is True:
535            config = _interpreters.new_config('default')
536        elif config is None:
537            if whence not in (
538                _interpreters.WHENCE_LEGACY_CAPI,
539                _interpreters.WHENCE_UNKNOWN,
540            ):
541                config = _interpreters.new_config('legacy')
542        elif isinstance(config, str):
543            config = _interpreters.new_config(config)
544
545        if whence is None:
546            whence = _interpreters.WHENCE_XI
547
548        interpid = _testinternalcapi.create_interpreter(config, whence=whence)
549        try:
550            yield interpid
551        finally:
552            try:
553                _testinternalcapi.destroy_interpreter(interpid)
554            except _interpreters.InterpreterNotFoundError:
555                pass
556
557    @contextlib.contextmanager
558    def interpreter_obj_from_capi(self, config='legacy'):
559        with self.interpreter_from_capi(config) as interpid:
560            interp = interpreters.Interpreter(
561                interpid,
562                _whence=_interpreters.WHENCE_CAPI,
563                _ownsref=False,
564            )
565            yield interp, interpid
566
567    @contextlib.contextmanager
568    def capturing(self, script):
569        wrapped, capturing = _captured_script(script, stdout=True, exc=True)
570        #_dump_script(wrapped)
571        with capturing:
572            yield wrapped, capturing.final(force=True)
573
574    @requires_test_modules
575    def run_from_capi(self, interpid, script, *, main=False):
576        with self.capturing(script) as (wrapped, results):
577            rc = _testinternalcapi.exec_interpreter(interpid, wrapped, main=main)
578            assert rc == 0, rc
579        results.raise_if_failed()
580        return results.stdout
581
582    @contextlib.contextmanager
583    def _running(self, run_interp, exec_interp):
584        token = b'\0'
585        r_in, w_in = self.pipe()
586        r_out, w_out = self.pipe()
587
588        def close():
589            _close_file(r_in)
590            _close_file(w_in)
591            _close_file(r_out)
592            _close_file(w_out)
593
594        # Start running (and wait).
595        script = dedent(f"""
596            import os
597            try:
598                # handshake
599                token = os.read({r_in}, 1)
600                os.write({w_out}, token)
601                # Wait for the "done" message.
602                os.read({r_in}, 1)
603            except BrokenPipeError:
604                pass
605            except OSError as exc:
606                if exc.errno != 9:
607                    raise  # re-raise
608                # It was closed already.
609            """)
610        failed = None
611        def run():
612            nonlocal failed
613            try:
614                run_interp(script)
615            except Exception as exc:
616                failed = exc
617                close()
618        t = threading.Thread(target=run)
619        t.start()
620
621        # handshake
622        try:
623            os.write(w_in, token)
624            token2 = os.read(r_out, 1)
625            assert token2 == token, (token2, token)
626        except OSError:
627            t.join()
628            if failed is not None:
629                raise failed
630
631        # CM __exit__()
632        try:
633            try:
634                yield
635            finally:
636                # Send "done".
637                os.write(w_in, b'\0')
638        finally:
639            close()
640            t.join()
641            if failed is not None:
642                raise failed
643
644    @contextlib.contextmanager
645    def running(self, interp):
646        if isinstance(interp, int):
647            interpid = interp
648            def exec_interp(script):
649                exc = _interpreters.exec(interpid, script)
650                assert exc is None, exc
651            run_interp = exec_interp
652        else:
653            def run_interp(script):
654                text = self.run_and_capture(interp, script)
655                assert text == '', repr(text)
656            def exec_interp(script):
657                interp.exec(script)
658        with self._running(run_interp, exec_interp):
659            yield
660
661    @requires_test_modules
662    @contextlib.contextmanager
663    def running_from_capi(self, interpid, *, main=False):
664        def run_interp(script):
665            text = self.run_from_capi(interpid, script, main=main)
666            assert text == '', repr(text)
667        def exec_interp(script):
668            rc = _testinternalcapi.exec_interpreter(interpid, script)
669            assert rc == 0, rc
670        with self._running(run_interp, exec_interp):
671            yield
672
673    @requires_test_modules
674    def run_temp_from_capi(self, script, config='legacy'):
675        if config is False:
676            # Force using Py_NewInterpreter().
677            run_in_interp = (lambda s, c: _testcapi.run_in_subinterp(s))
678            config = None
679        else:
680            run_in_interp = _testinternalcapi.run_in_subinterp_with_config
681            if config is True:
682                config = 'default'
683            if isinstance(config, str):
684                config = _interpreters.new_config(config)
685        with self.capturing(script) as (wrapped, results):
686            rc = run_in_interp(wrapped, config)
687            assert rc == 0, rc
688        results.raise_if_failed()
689        return results.stdout
690