1from collections import namedtuple 2import contextlib 3import json 4import io 5import os 6import os.path 7import pickle 8import queue 9#import select 10import subprocess 11import sys 12import tempfile 13from textwrap import dedent, indent 14import threading 15import types 16import unittest 17import warnings 18 19from test import support 20 21# We would use test.support.import_helper.import_module(), 22# but the indirect import of test.support.os_helper causes refleaks. 23try: 24 import _interpreters 25except ImportError as exc: 26 raise unittest.SkipTest(str(exc)) 27from test.support import interpreters 28 29 30try: 31 import _testinternalcapi 32 import _testcapi 33except ImportError: 34 _testinternalcapi = None 35 _testcapi = None 36 37def requires_test_modules(func): 38 return unittest.skipIf(_testinternalcapi is None, "test requires _testinternalcapi module")(func) 39 40 41def _dump_script(text): 42 lines = text.splitlines() 43 print() 44 print('-' * 20) 45 for i, line in enumerate(lines, 1): 46 print(f' {i:>{len(str(len(lines)))}} {line}') 47 print('-' * 20) 48 49 50def _close_file(file): 51 try: 52 if hasattr(file, 'close'): 53 file.close() 54 else: 55 os.close(file) 56 except OSError as exc: 57 if exc.errno != 9: 58 raise # re-raise 59 # It was closed already. 


def pack_exception(exc=None):
    """Return a JSON string describing *exc*.

    The data comes from _interpreters.capture_exception(); the captured
    object's attributes and those of its ``type`` attribute are flattened
    into plain dicts so they can be serialized.
    """
    captured = _interpreters.capture_exception(exc)
    data = dict(captured.__dict__)
    data['type'] = dict(captured.type.__dict__)
    return json.dumps(data)


def unpack_exception(packed):
    """Rebuild a namespace from the JSON produced by pack_exception().

    *packed* may be str or bytes.  If it is not valid JSON, a
    RuntimeWarning is issued, the raw text is printed, and None is
    returned.  Otherwise the result is a SimpleNamespace whose ``type``
    attribute is itself a SimpleNamespace.
    """
    try:
        data = json.loads(packed)
    except json.decoder.JSONDecodeError:
        warnings.warn('incomplete exception data', RuntimeWarning)
        print(packed if isinstance(packed, str) else packed.decode('utf-8'))
        return None
    exc = types.SimpleNamespace(**data)
    exc.type = types.SimpleNamespace(**exc.type)
    return exc


class CapturingResults:
    """Collect stdout/stderr/exception data that a wrapped script writes
    to pipes.

    Use wrap_script() to get both the wrapped script text and the
    matching CapturingResults.  Run the wrapped script while the object
    is open, then close it (or leave its ``with`` block) to drain the
    pipes and make final() available.
    """

    # Both templates put the wrapped script two levels deep (8 columns
    # after dedent()); wrap_script() re-indents continuation lines to
    # match.  The [:-1] drops the trailing newline.
    STDIO = dedent("""\
        with open({w_pipe}, 'wb', buffering=0) as _spipe_{stream}:
            _captured_std{stream} = io.StringIO()
            with contextlib.redirect_std{stream}(_captured_std{stream}):
                #########################
                # begin wrapped script

                {indented}

                # end wrapped script
                #########################
            text = _captured_std{stream}.getvalue()
            _spipe_{stream}.write(text.encode('utf-8'))
        """)[:-1]
    EXC = dedent("""\
        with open({w_pipe}, 'wb', buffering=0) as _spipe_exc:
            try:
                #########################
                # begin wrapped script

                {indented}

                # end wrapped script
                #########################
            except Exception as exc:
                text = _interp_utils.pack_exception(exc)
                _spipe_exc.write(text.encode('utf-8'))
        """)[:-1]

    @classmethod
    def wrap_script(cls, script, *, stdout=True, stderr=False, exc=False):
        """Wrap *script* so that its output is written to fresh pipes.

        Each truthy option creates an os.pipe() and nests the script
        inside the corresponding template: exception capture innermost,
        then stdout, then stderr.  Passing ``stderr='stdout'`` disables
        separate stderr capture.

        Returns ``(wrapped_script, results)`` where *results* is a
        CapturingResults holding the pipes.  Raises NotImplementedError
        if no wrapping was applied (i.e. no option was enabled).
        """
        script = dedent(script).strip(os.linesep)
        imports = [
            f'import {__name__} as _interp_utils',
        ]
        wrapped = script

        # Handle exc.
        if exc:
            exc = os.pipe()
            _, w_exc = exc
            indented = wrapped.replace('\n', '\n        ')
            wrapped = cls.EXC.format(
                w_pipe=w_exc,
                indented=indented,
            )
        else:
            exc = None

        # Handle stdout.
        if stdout:
            imports.extend([
                'import contextlib, io',
            ])
            stdout = os.pipe()
            _, w_out = stdout
            indented = wrapped.replace('\n', '\n        ')
            wrapped = cls.STDIO.format(
                w_pipe=w_out,
                indented=indented,
                stream='out',
            )
        else:
            stdout = None

        # Handle stderr.
        if stderr == 'stdout':
            stderr = None
        elif stderr:
            if not stdout:
                # The stdout branch did not add this import already.
                imports.extend([
                    'import contextlib, io',
                ])
            stderr = os.pipe()
            _, w_err = stderr
            indented = wrapped.replace('\n', '\n        ')
            wrapped = cls.STDIO.format(
                w_pipe=w_err,
                indented=indented,
                stream='err',
            )
        else:
            stderr = None

        if wrapped == script:
            raise NotImplementedError
        else:
            # Each import is prepended, so they end up in reverse order.
            for line in imports:
                wrapped = f'{line}{os.linesep}{wrapped}'

        results = cls(stdout, stderr, exc)
        return wrapped, results

    def __init__(self, out, err, exc):
        """Take ownership of up to three ``(read_fd, write_fd)`` pipes.

        Each argument is either None or a pair as returned by os.pipe().
        The read end is wrapped in an unbuffered binary file object; the
        write end is kept as a raw descriptor until close().
        """
        self._rf_out = None
        self._rf_err = None
        self._rf_exc = None
        self._w_out = None
        self._w_err = None
        self._w_exc = None

        if out is not None:
            r_out, w_out = out
            self._rf_out = open(r_out, 'rb', buffering=0)
            self._w_out = w_out

        if err is not None:
            r_err, w_err = err
            self._rf_err = open(r_err, 'rb', buffering=0)
            self._w_err = w_err

        if exc is not None:
            r_exc, w_exc = exc
            self._rf_exc = open(r_exc, 'rb', buffering=0)
            self._w_exc = w_exc

        self._buf_out = b''
        self._buf_err = b''
        self._buf_exc = b''
        self._exc = None

        self._closed = False

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    @property
    def closed(self):
        """True once close() has been called."""
        return self._closed

    def close(self):
        """Close the write ends, drain the read ends, then close them.

        Calling close() more than once is a no-op.
        """
        if self._closed:
            return
        self._closed = True

        # Close our copies of the write ends first so that the reads in
        # _capture() can reach EOF.
        if self._w_out is not None:
            _close_file(self._w_out)
            self._w_out = None
        if self._w_err is not None:
            _close_file(self._w_err)
            self._w_err = None
        if self._w_exc is not None:
            _close_file(self._w_exc)
            self._w_exc = None

        self._capture()

        if self._rf_out is not None:
            _close_file(self._rf_out)
            self._rf_out = None
        if self._rf_err is not None:
            _close_file(self._rf_err)
            self._rf_err = None
        if self._rf_exc is not None:
            _close_file(self._rf_exc)
            self._rf_exc = None

    @staticmethod
    def _drain(rfile):
        """Read *rfile* in 100-byte chunks until read() returns nothing."""
        data = b''
        chunk = rfile.read(100)
        while chunk:
            data += chunk
            chunk = rfile.read(100)
        return data

    def _capture(self):
        """Append whatever is readable on each open pipe to its buffer."""
        # Ideally this is called only after the script finishes
        # (and thus has closed the write end of the pipe).
        if self._rf_out is not None:
            self._buf_out += self._drain(self._rf_out)
        if self._rf_err is not None:
            self._buf_err += self._drain(self._rf_err)
        if self._rf_exc is not None:
            self._buf_exc += self._drain(self._rf_exc)

    def _unpack_stdout(self):
        return self._buf_out.decode('utf-8')

    def _unpack_stderr(self):
        return self._buf_err.decode('utf-8')

    def _unpack_exc(self):
        """Return the unpacked exception, or None if none was captured.

        The unpacked value is cached after the first successful unpack.
        """
        if self._exc is not None:
            return self._exc
        if not self._buf_exc:
            return None
        self._exc = unpack_exception(self._buf_exc)
        return self._exc

    def stdout(self):
        """Return the captured stdout text (draining the pipe if open)."""
        if self.closed:
            return self.final().stdout
        self._capture()
        return self._unpack_stdout()

    def stderr(self):
        """Return the captured stderr text (draining the pipe if open)."""
        if self.closed:
            return self.final().stderr
        self._capture()
        return self._unpack_stderr()

    def exc(self):
        """Return the captured exception info (draining the pipe if open)."""
        if self.closed:
            return self.final().exc
        self._capture()
        return self._unpack_exc()

    def final(self, *, force=False):
        """Return the CapturedResults, which is cached once built.

        If this object is not closed yet, raise Exception unless *force*
        is true, in which case a CapturedResults.Proxy is returned that
        resolves the real results on first use.
        """
        try:
            return self._final
        except AttributeError:
            if not self._closed:
                if not force:
                    raise Exception('no final results available yet')
                else:
                    return CapturedResults.Proxy(self)
            self._final = CapturedResults(
                self._unpack_stdout(),
                self._unpack_stderr(),
                self._unpack_exc(),
            )
            return self._final


class CapturedResults(namedtuple('CapturedResults', 'stdout stderr exc')):
    """The final (stdout, stderr, exc) captured from a wrapped script."""

    class Proxy:
        """Stand-in that fetches the real CapturedResults on first use.

        Resolution calls CapturingResults.final(), which raises if the
        capturing object has not been closed yet.
        """

        def __init__(self, capturing):
            self._capturing = capturing

        def _finish(self):
            # Resolve at most once; _capturing is dropped afterwards.
            if self._capturing is None:
                return
            self._final = self._capturing.final()
            self._capturing = None

        def __iter__(self):
            self._finish()
            yield from self._final

        def __len__(self):
            self._finish()
            return len(self._final)

        def __getattr__(self, name):
            # Reject private names before touching any state.  This is
            # only reached when normal lookup fails, so going through
            # _finish() for a missing "_capturing"/"_final" would
            # recurse without bound.
            if name.startswith('_'):
                raise AttributeError(name)
            self._finish()
            return getattr(self._final, name)

    def raise_if_failed(self):
        """Raise interpreters.ExecutionFailed if an exception was captured."""
        if self.exc is not None:
            raise interpreters.ExecutionFailed(self.exc)


def _captured_script(script, *, stdout=True, stderr=False, exc=False):
    """Shorthand for CapturingResults.wrap_script()."""
    return CapturingResults.wrap_script(
        script,
        stdout=stdout,
        stderr=stderr,
        exc=exc,
    )


def clean_up_interpreters():
    """Close every interpreter except the main one (id 0)."""
    for interp in interpreters.list_all():
        if interp.id == 0:  # main
            continue
        try:
            interp.close()
        except _interpreters.InterpreterError:
            pass  # already destroyed


def _run_output(interp, request, init=None):
    """Run *request* in *interp* and return what it wrote to stdout.

    If *init* is truthy it is passed to interp.prepare_main() first.
    """
    script, results = _captured_script(request)
    with results:
        if init:
            interp.prepare_main(init)
        interp.exec(script)
    # Read only after the "with" block has closed the write end.
    return results.stdout()


@contextlib.contextmanager
def _running(interp):
    """Keep *interp* busy in a background thread for the ``with`` body.

    The interpreter blocks reading a pipe until the context exits, at
    which point 'done' is written and the thread is joined.
    """
    r, w = os.pipe()
    def run():
        interp.exec(dedent(f"""
            # wait for "signal"
            with open({r}) as rpipe:
                rpipe.read()
            """))

    t = threading.Thread(target=run)
    t.start()

    try:
        yield
    finally:
        # Always release the thread, even if the body raised; otherwise
        # it would stay blocked on the pipe forever.
        with open(w, 'w', encoding='utf-8') as spipe:
            spipe.write('done')
        t.join()


class TestBase(unittest.TestCase):
    """Base class with helpers shared by the interpreter tests."""

    def tearDown(self):
        clean_up_interpreters()

    def pipe(self):
        """Return a new ``(r, w)`` pipe whose ends are closed on cleanup."""
        def ensure_closed(fd):
            try:
                os.close(fd)
            except OSError:
                # Best effort: the test may have closed it already.
                pass
        r, w = os.pipe()
        self.addCleanup(lambda: ensure_closed(r))
        self.addCleanup(lambda: ensure_closed(w))
        return r, w

    def temp_dir(self):
        """Create a temp directory (real path) that is removed on cleanup."""
        tempdir = tempfile.mkdtemp()
        tempdir = os.path.realpath(tempdir)
        # Imported here rather than at module level (see the refleak
        # note next to the _interpreters import).
        from test.support import os_helper
        self.addCleanup(lambda: os_helper.rmtree(tempdir))
        return tempdir

    @contextlib.contextmanager
    def captured_thread_exception(self):
        """Temporarily replace threading.excepthook.

        Yields a namespace whose ``caught`` attribute is set to the
        hook's argument if a thread raises while the context is active.
        """
        ctx = types.SimpleNamespace(caught=None)
        def excepthook(args):
            ctx.caught = args
        orig_excepthook = threading.excepthook
        threading.excepthook = excepthook
        try:
            yield ctx
        finally:
            threading.excepthook = orig_excepthook

    def make_script(self, filename, dirname=None, text=None):
        """Write *text* (dedented) to *filename* under *dirname*.

        A temp directory is used when *dirname* is None.  Missing parent
        directories are created.  Returns the full path.
        """
        if text:
            text = dedent(text)
        if dirname is None:
            dirname = self.temp_dir()
        filename = os.path.join(dirname, filename)

        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'w', encoding='utf-8') as outfile:
            outfile.write(text or '')
        return filename

    def make_module(self, name, pathentry=None, text=None):
        """Create the module *name* (dotted names allowed) under *pathentry*.

        Each parent package directory is created if needed, along with
        an empty ``__init__.py``.  Raises Exception if a parent path
        exists but is not a directory.  Returns the module's filename.
        """
        if text:
            text = dedent(text)
        if pathentry is None:
            pathentry = self.temp_dir()
        else:
            os.makedirs(pathentry, exist_ok=True)
        *subnames, basename = name.split('.')

        dirname = pathentry
        for subname in subnames:
            dirname = os.path.join(dirname, subname)
            if os.path.isdir(dirname):
                pass
            elif os.path.exists(dirname):
                raise Exception(dirname)
            else:
                os.mkdir(dirname)
            initfile = os.path.join(dirname, '__init__.py')
            if not os.path.exists(initfile):
                with open(initfile, 'w'):
                    pass
        filename = os.path.join(dirname, basename + '.py')

        with open(filename, 'w', encoding='utf-8') as outfile:
            outfile.write(text or '')
        return filename

    @support.requires_subprocess()
    def run_python(self, *argv):
        """Run sys.executable with *argv*; return (returncode, stdout, stderr)."""
        proc = subprocess.run(
            [sys.executable, *argv],
            capture_output=True,
            text=True,
        )
        return proc.returncode, proc.stdout, proc.stderr

    def assert_python_ok(self, *argv):
        """Run Python with *argv* and return (stdout, stderr)."""
        exitcode, stdout, stderr = self.run_python(*argv)
        # NOTE(review): this only rejects exit code 1, so other non-zero
        # codes pass.  Confirm whether assertEqual(exitcode, 0) was meant.
        self.assertNotEqual(exitcode, 1)
        return stdout, stderr

    def assert_python_failure(self, *argv):
        """Run Python with *argv*, require a non-zero exit code, and
        return (stdout, stderr)."""
        exitcode, stdout, stderr = self.run_python(*argv)
        self.assertNotEqual(exitcode, 0)
        return stdout, stderr

    def assert_ns_equal(self, ns1, ns2, msg=None):
        """Fail with a diff of vars() unless the two namespaces are equal."""
        # This is mostly copied from TestCase.assertDictEqual.
        self.assertEqual(type(ns1), type(ns2))
        if ns1 == ns2:
            return

        import difflib
        import pprint
        from unittest.util import _common_shorten_repr
        standardMsg = '%s != %s' % _common_shorten_repr(ns1, ns2)
        diff = ('\n' + '\n'.join(difflib.ndiff(
                       pprint.pformat(vars(ns1)).splitlines(),
                       pprint.pformat(vars(ns2)).splitlines())))
        diff = f'namespace({diff})'
        standardMsg = self._truncateMessage(standardMsg, diff)
        self.fail(self._formatMessage(msg, standardMsg))

    def _run_string(self, interp, script):
        """Run *script* in *interp*; return ``(stdout_text, err)``.

        *interp* is either an interpreters.Interpreter or something
        accepted by _interpreters.run_string().  In the latter case a
        non-None result from run_string() is returned as *err* with a
        text of None.
        """
        wrapped, results = _captured_script(script, exc=False)
        #_dump_script(wrapped)
        with results:
            if isinstance(interp, interpreters.Interpreter):
                # NOTE(review): this runs the unwrapped script, so its
                # stdout is not captured here.  Confirm that is intended.
                interp.exec(script)
            else:
                err = _interpreters.run_string(interp, wrapped)
                if err is not None:
                    return None, err
        # Read only after the "with" block has closed the write end.
        return results.stdout(), None

    def run_and_capture(self, interp, script):
        """Run *script* and return its stdout text.

        Raises interpreters.ExecutionFailed if _run_string() reports an
        error.
        """
        text, err = self._run_string(interp, script)
        if err is not None:
            raise interpreters.ExecutionFailed(err)
        else:
            return text

    def interp_exists(self, interpid):
        """Return False if _interpreters.whence() says *interpid* is not found."""
        try:
            _interpreters.whence(interpid)
        except _interpreters.InterpreterNotFoundError:
            return False
        else:
            return True

    @requires_test_modules
    @contextlib.contextmanager
    def interpreter_from_capi(self, config=None, whence=None):
        """Create an interpreter via _testinternalcapi and yield its ID.

        *config* may be False (no config; *whence* defaults to
        WHENCE_LEGACY_CAPI), True (the 'default' config), None (the
        'legacy' config unless *whence* is WHENCE_LEGACY_CAPI or
        WHENCE_UNKNOWN), a config name, or anything else, which is
        passed through unchanged.  *whence* otherwise defaults to
        WHENCE_XI.  The interpreter is destroyed on exit if it still
        exists.
        """
        if config is False:
            if whence is None:
                whence = _interpreters.WHENCE_LEGACY_CAPI
            else:
                assert whence in (_interpreters.WHENCE_LEGACY_CAPI,
                                  _interpreters.WHENCE_UNKNOWN), repr(whence)
            config = None
        elif config is True:
            config = _interpreters.new_config('default')
        elif config is None:
            if whence not in (
                _interpreters.WHENCE_LEGACY_CAPI,
                _interpreters.WHENCE_UNKNOWN,
            ):
                config = _interpreters.new_config('legacy')
        elif isinstance(config, str):
            config = _interpreters.new_config(config)

        if whence is None:
            whence = _interpreters.WHENCE_XI

        interpid = _testinternalcapi.create_interpreter(config, whence=whence)
        try:
            yield interpid
        finally:
            try:
                _testinternalcapi.destroy_interpreter(interpid)
            except _interpreters.InterpreterNotFoundError:
                pass

    @contextlib.contextmanager
    def interpreter_obj_from_capi(self, config='legacy'):
        """Like interpreter_from_capi(), but yield ``(Interpreter, id)``."""
        with self.interpreter_from_capi(config) as interpid:
            interp = interpreters.Interpreter(
                interpid,
                _whence=_interpreters.WHENCE_CAPI,
                _ownsref=False,
            )
            yield interp, interpid

    @contextlib.contextmanager
    def capturing(self, script):
        """Yield ``(wrapped_script, results_proxy)`` for *script*.

        Stdout and exceptions are captured.  The proxy can only be
        resolved after the ``with`` block has exited.
        """
        wrapped, capturing = _captured_script(script, stdout=True, exc=True)
        #_dump_script(wrapped)
        with capturing:
            yield wrapped, capturing.final(force=True)

    @requires_test_modules
    def run_from_capi(self, interpid, script, *, main=False):
        """Run *script* via _testinternalcapi.exec_interpreter().

        Returns the captured stdout; raises ExecutionFailed if the
        script raised.
        """
        with self.capturing(script) as (wrapped, results):
            rc = _testinternalcapi.exec_interpreter(interpid, wrapped, main=main)
            assert rc == 0, rc
        # The proxy is usable only now that capturing has been closed.
        results.raise_if_failed()
        return results.stdout

    @contextlib.contextmanager
    def _running(self, run_interp, exec_interp):
        """Keep an interpreter running a blocking script for the
        ``with`` body.

        *run_interp(script)* is called in a new thread.  A one-byte
        handshake over a pair of pipes confirms the script has started;
        a second byte sent on exit lets it finish.  An exception raised
        by *run_interp* is re-raised in the calling thread.
        """
        token = b'\0'
        r_in, w_in = self.pipe()
        r_out, w_out = self.pipe()

        def close():
            _close_file(r_in)
            _close_file(w_in)
            _close_file(r_out)
            _close_file(w_out)

        # Start running (and wait).
        script = dedent(f"""
            import os
            try:
                # handshake
                token = os.read({r_in}, 1)
                os.write({w_out}, token)
                # Wait for the "done" message.
                os.read({r_in}, 1)
            except BrokenPipeError:
                pass
            except OSError as exc:
                if exc.errno != 9:
                    raise  # re-raise
                # It was closed already.
            """)
        failed = None
        def run():
            nonlocal failed
            try:
                run_interp(script)
            except Exception as exc:
                failed = exc
                # Closing the pipes unblocks the main thread's handshake.
                close()
        t = threading.Thread(target=run)
        t.start()

        # handshake
        try:
            os.write(w_in, token)
            token2 = os.read(r_out, 1)
            assert token2 == token, (token2, token)
        except OSError:
            t.join()
            if failed is not None:
                raise failed

        # CM __exit__()
        try:
            try:
                yield
            finally:
                # Send "done".
                os.write(w_in, b'\0')
        finally:
            close()
            t.join()
            if failed is not None:
                raise failed

    @contextlib.contextmanager
    def running(self, interp):
        """Keep *interp* (an ID or an Interpreter) running for the
        ``with`` body."""
        if isinstance(interp, int):
            interpid = interp
            def exec_interp(script):
                exc = _interpreters.exec(interpid, script)
                assert exc is None, exc
            run_interp = exec_interp
        else:
            def run_interp(script):
                text = self.run_and_capture(interp, script)
                assert text == '', repr(text)
            def exec_interp(script):
                interp.exec(script)
        with self._running(run_interp, exec_interp):
            yield

    @requires_test_modules
    @contextlib.contextmanager
    def running_from_capi(self, interpid, *, main=False):
        """Like running(), but execute through _testinternalcapi."""
        def run_interp(script):
            text = self.run_from_capi(interpid, script, main=main)
            assert text == '', repr(text)
        def exec_interp(script):
            rc = _testinternalcapi.exec_interpreter(interpid, script)
            assert rc == 0, rc
        with self._running(run_interp, exec_interp):
            yield

    @requires_test_modules
    def run_temp_from_capi(self, script, config='legacy'):
        """Run *script* in a temporary subinterpreter; return its stdout.

        With ``config=False`` the script runs through
        _testcapi.run_in_subinterp(); otherwise through
        _testinternalcapi.run_in_subinterp_with_config(), with True
        meaning the 'default' config and a string naming a config.
        Raises ExecutionFailed if the script raised.
        """
        if config is False:
            # Force using Py_NewInterpreter().
            run_in_interp = (lambda s, c: _testcapi.run_in_subinterp(s))
            config = None
        else:
            run_in_interp = _testinternalcapi.run_in_subinterp_with_config
            if config is True:
                config = 'default'
            if isinstance(config, str):
                config = _interpreters.new_config(config)
        with self.capturing(script) as (wrapped, results):
            rc = run_in_interp(wrapped, config)
            assert rc == 0, rc
        # The proxy is usable only now that capturing has been closed.
        results.raise_if_failed()
        return results.stdout