import ast
import asyncio
import code
import concurrent.futures
import inspect
import sys
import threading
import types
import warnings

from . import futures


class AsyncIOInteractiveConsole(code.InteractiveConsole):
    """Interactive console that accepts top-level ``await``.

    Compiled input is executed on the event loop given to the constructor,
    while the console itself blocks (in whichever thread calls ``runcode``)
    until the result is available.
    """

    def __init__(self, locals, loop):
        """Create the console.

        Args:
            locals: Namespace dict handed to ``code.InteractiveConsole`` and
                used as the globals of every executed code object.
            loop: Event loop on which the entered code is run.
        """
        super().__init__(locals)
        # Let the compiler accept ``await`` outside of a coroutine function.
        self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT

        self.loop = loop

    def runcode(self, code):
        """Run a compiled code object on ``self.loop`` and wait for it.

        The code object is wrapped in a function and called from a callback
        scheduled on the loop. If the call yields a coroutine, it is turned
        into a task and awaited; otherwise the returned value is the result.

        Args:
            code: Code object produced by the console's compiler.

        Returns:
            The value produced by the code, or ``None`` when an exception
            was reported to the user instead.

        Raises:
            SystemExit: Re-raised unchanged if it comes out of the future.
        """
        # Thread-safe future used to hand the outcome from the loop thread
        # back to the thread blocked in this method.
        future = concurrent.futures.Future()

        def callback():
            # These module globals are shared with the ``__main__`` block,
            # which uses them to cancel the running task on Ctrl-C.
            global repl_future
            global repl_future_interrupted

            repl_future = None
            repl_future_interrupted = False

            func = types.FunctionType(code, self.locals)
            try:
                coro = func()
            except SystemExit:
                # Let it propagate out of the callback instead of storing
                # it on the future.
                raise
            except KeyboardInterrupt as ex:
                repl_future_interrupted = True
                future.set_exception(ex)
                return
            except BaseException as ex:
                future.set_exception(ex)
                return

            if not inspect.iscoroutine(coro):
                # Plain synchronous code: its return value is the result.
                future.set_result(coro)
                return

            try:
                repl_future = self.loop.create_task(coro)
                # Mirror the task's outcome onto the thread-safe future.
                futures._chain_future(repl_future, future)
            except BaseException as exc:
                future.set_exception(exc)

        # Schedule on the loop this console was constructed with rather
        # than on a module-level ``loop`` global, so the callback and the
        # task it creates always use the same loop.
        self.loop.call_soon_threadsafe(callback)

        try:
            return future.result()
        except SystemExit:
            raise
        except BaseException:
            if repl_future_interrupted:
                self.write("\nKeyboardInterrupt\n")
            else:
                self.showtraceback()


class REPLThread(threading.Thread):
    """Thread that runs the interactive console.

    Relies on the module globals ``console`` and ``loop`` that are set up
    in the ``__main__`` block below.
    """

    def run(self):
        """Run the console until it exits, then ask the loop to stop."""
        try:
            banner = (
                f'asyncio REPL {sys.version} on {sys.platform}\n'
                f'Use "await" directly instead of "asyncio.run()".\n'
                f'Type "help", "copyright", "credits" or "license" '
                f'for more information.\n'
                f'{getattr(sys, "ps1", ">>> ")}import asyncio'
            )

            console.interact(
                banner=banner,
                exitmsg='exiting asyncio REPL...')
        finally:
            # Silence "coroutine ... was never awaited" warnings once the
            # REPL is finishing.
            warnings.filterwarnings(
                'ignore',
                message=r'^coroutine .* was never awaited$',
                category=RuntimeWarning)

            # ``run`` executes in this thread, so the stop request has to
            # be scheduled through the thread-safe API.
            loop.call_soon_threadsafe(loop.stop)


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Namespace exposed to the REPL: ``asyncio`` plus this module's
    # identifying attributes.
    repl_locals = {'asyncio': asyncio}
    for key in {'__name__', '__package__',
                '__loader__', '__spec__',
                '__builtins__', '__file__'}:
        repl_locals[key] = locals()[key]

    console = AsyncIOInteractiveConsole(repl_locals, loop)

    # Shared with AsyncIOInteractiveConsole.runcode(): the task currently
    # running REPL input, and whether it was interrupted.
    repl_future = None
    repl_future_interrupted = False

    try:
        import readline  # NoQA
    except ImportError:
        pass

    repl_thread = REPLThread()
    repl_thread.daemon = True
    repl_thread.start()

    while True:
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            # Ctrl-C cancels the running REPL task, if any, and the loop
            # is restarted either way.
            if repl_future and not repl_future.done():
                repl_future.cancel()
                repl_future_interrupted = True
            continue
        else:
            break