1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""PwPtPythonPane class.""" 15 16from __future__ import annotations 17 18import asyncio 19import functools 20import io 21import logging 22import os 23import sys 24import shlex 25import subprocess 26from typing import Any, Iterable, TYPE_CHECKING 27from unittest.mock import patch 28 29# inclusive-language: disable 30from prompt_toolkit.input import DummyInput as IgnoredInput 31 32# inclusive-language: enable 33 34from prompt_toolkit.output.plain_text import PlainTextOutput 35from prompt_toolkit.buffer import Buffer 36from prompt_toolkit.layout.controls import BufferControl 37from prompt_toolkit.completion import merge_completers 38from prompt_toolkit.filters import ( 39 Condition, 40 has_focus, 41 to_filter, 42) 43from prompt_toolkit.formatted_text import StyleAndTextTuples 44 45from ptpython.completer import ( # type: ignore 46 CompletePrivateAttributes, 47 PythonCompleter, 48) 49from ptpython.printer import OutputPrinter 50import ptpython.repl # type: ignore 51from ptpython.layout import ( # type: ignore 52 CompletionVisualisation, 53 Dimension, 54) 55import pygments.plugin 56 57from pw_console.pigweed_code_style import ( 58 PigweedCodeStyle, 59 PigweedCodeLightStyle, 60) 61from pw_console.text_formatting import remove_formatting 62 63if TYPE_CHECKING: 64 from pw_console.repl_pane import ReplPane 65 66_LOG = logging.getLogger(__package__) 67_SYSTEM_COMMAND_LOG = logging.getLogger('pw_console_system_command') 68 69_original_find_plugin_styles = pygments.plugin.find_plugin_styles 70 71 72def _wrapped_find_plugin_styles(): 73 """Patch pygment find_plugin_styles to also include Pigweed codes styles 74 75 This allows using these themes without requiring Python entrypoints. 76 """ 77 for style in [ 78 ('pigweed-code', PigweedCodeStyle), 79 ('pigweed-code-light', PigweedCodeLightStyle), 80 ]: 81 yield style 82 yield from _original_find_plugin_styles() 83 84 85class MissingPtpythonBufferControl(Exception): 86 """Exception for a missing ptpython BufferControl object.""" 87 88 89def _user_input_is_a_shell_command(text: str) -> bool: 90 return text.startswith('!') 91 92 93class PwPtPythonRepl( 94 ptpython.repl.PythonRepl 95): # pylint: disable=too-many-instance-attributes 96 """A ptpython repl class with changes to code execution and output related 97 methods.""" 98 99 @patch( 100 'pygments.styles.find_plugin_styles', new=_wrapped_find_plugin_styles 101 ) 102 def __init__( 103 self, 104 *args, 105 # pw_console specific kwargs 106 extra_completers: Iterable | None = None, 107 **ptpython_kwargs, 108 ): 109 completer = None 110 if extra_completers: 111 # Create the default python completer used by 112 # ptpython.repl.PythonRepl 113 python_completer = PythonCompleter( 114 # No self.get_globals yet so this must be a lambda 115 # pylint: disable=unnecessary-lambda 116 lambda: self.get_globals(), 117 lambda: self.get_locals(), 118 lambda: self.enable_dictionary_completion, # type: ignore 119 ) 120 121 all_completers = [python_completer] 122 all_completers.extend(extra_completers) 123 # Merge default Python completer with the new custom one. 124 completer = merge_completers(all_completers) 125 126 super().__init__( 127 *args, 128 create_app=False, 129 # Absolute minimum height of 1 130 _input_buffer_height=Dimension(min=1), 131 _completer=completer, 132 **ptpython_kwargs, 133 ) 134 135 self.enable_mouse_support: bool = True 136 self.enable_history_search: bool = True 137 self.enable_dictionary_completion: bool = True 138 self._set_pt_python_input_buffer_control_focusable() 139 140 # Change some ptpython.repl defaults. 141 self.show_status_bar = False 142 self.show_exit_confirmation = False 143 self.complete_private_attributes = CompletePrivateAttributes.NEVER 144 145 # Function signature that shows args, kwargs, and types under the cursor 146 # of the input window. 147 self.show_signature: bool = True 148 # Docstring of the current completed function that appears at the bottom 149 # of the input window. 150 self.show_docstring: bool = False 151 152 # Turn off the completion menu in ptpython. The CompletionsMenu in 153 # ConsoleApp.root_container will handle this. 154 self.completion_visualisation: CompletionVisualisation = ( 155 CompletionVisualisation.NONE 156 ) 157 158 # Additional state variables. 159 self.repl_pane: ReplPane | None = None 160 self._last_result = None 161 self._last_exception = None 162 163 def _set_pt_python_input_buffer_control_focusable(self) -> None: 164 """Enable focus_on_click for ptpython's input buffer.""" 165 error_message = ( 166 'Unable to find ptpythons BufferControl input object.\n' 167 ' For the last known position see:\n' 168 ' https://github.com/prompt-toolkit/ptpython/' 169 'blob/6072174eace5b645b0cfd5b21b4c237e2539f577/' 170 'ptpython/layout.py#L598\n' 171 '\n' 172 'The installed version of ptpython may not be compatible with' 173 ' pw console; please try re-running environment setup.' 174 ) 175 176 try: 177 # Fetch the Window's BufferControl object. 178 # From ptpython/layout.py: 179 # self.root_container = HSplit([ 180 # VSplit([ 181 # HSplit([ 182 # FloatContainer( 183 # content=HSplit( 184 # [create_python_input_window()] + extra_body 185 # ), ... 186 ptpython_buffer_control = ( 187 self.ptpython_layout.root_container.children[0] # type: ignore 188 .children[0] 189 .children[0] 190 .content.children[0] 191 .content 192 ) 193 # This should be a BufferControl instance 194 if not isinstance(ptpython_buffer_control, BufferControl): 195 raise MissingPtpythonBufferControl(error_message) 196 # Enable focus options 197 ptpython_buffer_control.focusable = to_filter(True) 198 ptpython_buffer_control.focus_on_click = to_filter(True) 199 except IndexError as _error: 200 raise MissingPtpythonBufferControl(error_message) 201 202 def __pt_container__(self): 203 """Return the prompt_toolkit root container for class. 204 205 This allows self to be used wherever prompt_toolkit expects a container 206 object.""" 207 return self.ptpython_layout.root_container 208 209 def set_repl_pane(self, repl_pane): 210 """Update the parent pw_console.ReplPane reference.""" 211 self.repl_pane = repl_pane 212 213 def _save_result(self, formatted_text): 214 """Save the last repl execution result.""" 215 unformatted_result = formatted_text 216 self._last_result = unformatted_result 217 218 def _save_exception(self, formatted_text): 219 """Save the last repl exception.""" 220 unformatted_result = remove_formatting(formatted_text) 221 self._last_exception = unformatted_result 222 223 def clear_last_result(self): 224 """Erase the last repl execution result.""" 225 self._last_result = None 226 self._last_exception = None 227 228 def _format_result_output(self, result: Any) -> str | None: 229 """Return a plaintext repr of any object.""" 230 try: 231 formatted_result = repr(result) 232 except BaseException as e: # pylint: disable=broad-exception-caught 233 self._handle_exception(e) 234 formatted_result = None 235 return formatted_result 236 237 def _show_result(self, result: Any): 238 """Format and save output results. 239 240 This function is called from the _run_user_code() function which is 241 always run from the user code thread, within 242 .run_and_show_expression_async(). 243 """ 244 self._save_result(self._format_result_output(result)) 245 246 def _get_output_printer(self) -> OutputPrinter: 247 return OutputPrinter( 248 output=PlainTextOutput(io.StringIO()), 249 input=IgnoredInput(), 250 style=self._current_style, 251 style_transformation=self.style_transformation, 252 title=self.title, 253 ) 254 255 def _format_exception_output(self, e: BaseException) -> StyleAndTextTuples: 256 output_printer = self._get_output_printer() 257 formatted_result = output_printer._format_exception_output( # pylint: disable=protected-access 258 e, highlight=False 259 ) 260 return list(formatted_result) 261 262 def _handle_exception(self, e: BaseException) -> None: 263 """Format and save output results. 264 265 This function is called from the _run_user_code() function which is 266 always run from the user code thread, within 267 .run_and_show_expression_async(). 268 """ 269 self._save_exception(self._format_exception_output(e)) 270 271 async def run_and_show_expression_async(self, text: str) -> Any: 272 """Run user code and handle the result. 273 274 This function is similar to ptpython version v3.0.23. 275 """ 276 loop = asyncio.get_event_loop() 277 278 try: 279 result = await self.eval_async(text) 280 except KeyboardInterrupt: 281 raise 282 except SystemExit: 283 return 284 except BaseException as e: # pylint: disable=broad-exception-caught 285 self._handle_exception(e) 286 else: 287 # Print. 288 if result is not None: 289 await loop.run_in_executor( 290 None, lambda: self._show_result(result) 291 ) 292 293 self.current_statement_index += 1 294 self.signatures = [] 295 return result 296 297 def user_code_complete_callback(self, input_text, future): 298 """Callback to run after user repl code is finished.""" 299 # If there was an exception it will be saved in self._last_result 300 result_text = self._last_result 301 result_object = None 302 exception_text = self._last_exception 303 304 # _last_results consumed, erase for the next run. 305 self.clear_last_result() 306 307 stdout_contents = None 308 stderr_contents = None 309 if future.result(): 310 future_result = future.result() 311 stdout_contents = future_result['stdout'] 312 stderr_contents = future_result['stderr'] 313 result_object = future_result['result'] 314 315 if result_object is not None: 316 # Use ptpython formatted results: 317 result_text = self._format_result_output(result_object) 318 319 # Job is finished, append the last result. 320 self.repl_pane.append_result_to_executed_code( 321 input_text, 322 future, 323 result_text, 324 stdout_contents, 325 stderr_contents, 326 exception_text=exception_text, 327 result_object=result_object, 328 ) 329 330 # Rebuild output buffer. 331 self.repl_pane.update_output_buffer( 332 'pw_ptpython_repl.user_code_complete_callback' 333 ) 334 335 # Trigger a prompt_toolkit application redraw. 336 self.repl_pane.application.application.invalidate() 337 338 async def _run_system_command( # pylint: disable=no-self-use 339 self, text, stdout_proxy, _stdin_proxy 340 ) -> int: 341 """Run a shell command and print results to the repl.""" 342 command = shlex.split(text) 343 returncode = None 344 env = os.environ.copy() 345 # Force colors in Pigweed subcommands and some terminal apps. 346 env['PW_USE_COLOR'] = '1' 347 env['CLICOLOR_FORCE'] = '1' 348 349 def _handle_output(output): 350 # Force tab characters to 8 spaces to prevent \t from showing in 351 # prompt_toolkit. 352 output = output.replace('\t', ' ') 353 # Strip some ANSI sequences that don't render. 354 output = output.replace('\x1b(B\x1b[m', '') 355 output = output.replace('\x1b[1m', '') 356 stdout_proxy.write(output) 357 _SYSTEM_COMMAND_LOG.info(output.rstrip()) 358 359 with subprocess.Popen( 360 command, 361 env=env, 362 stdout=subprocess.PIPE, 363 stderr=subprocess.STDOUT, 364 errors='replace', 365 ) as proc: 366 # Print the command 367 _SYSTEM_COMMAND_LOG.info('') 368 _SYSTEM_COMMAND_LOG.info('$ %s', text) 369 while returncode is None: 370 if not proc.stdout: 371 continue 372 373 # Check for one line and update. 374 output = proc.stdout.readline() 375 _handle_output(output) 376 377 returncode = proc.poll() 378 379 # Print any remaining lines. 380 for output in proc.stdout.readlines(): 381 _handle_output(output) 382 383 return returncode 384 385 async def _run_user_code(self, text, stdout_proxy, stdin_proxy): 386 """Run user code and capture stdout+err. 387 388 This fuction should be run in a separate thread from the main 389 prompt_toolkit application.""" 390 # NOTE: This function runs in a separate thread using the asyncio event 391 # loop defined by self.repl_pane.application.user_code_loop. Patching 392 # stdout here will not effect the stdout used by prompt_toolkit and the 393 # main user interface. 394 395 # Patch stdout and stderr to capture repl print() statements. 396 original_stdout = sys.stdout 397 original_stderr = sys.stderr 398 399 sys.stdout = stdout_proxy 400 sys.stderr = stdin_proxy 401 402 # Run user repl code 403 try: 404 if _user_input_is_a_shell_command(text): 405 result = await self._run_system_command( 406 text[1:], stdout_proxy, stdin_proxy 407 ) 408 else: 409 result = await self.run_and_show_expression_async(text) 410 finally: 411 # Always restore original stdout and stderr 412 sys.stdout = original_stdout 413 sys.stderr = original_stderr 414 415 # Save the captured output 416 stdout_contents = stdout_proxy.getvalue() 417 stderr_contents = stdin_proxy.getvalue() 418 419 return { 420 'stdout': stdout_contents, 421 'stderr': stderr_contents, 422 'result': result, 423 } 424 425 def _accept_handler(self, buff: Buffer) -> bool: 426 """Function executed when pressing enter in the ptpython.repl.PythonRepl 427 input buffer.""" 428 # Do nothing if no text is entered. 429 if len(buff.text) == 0: 430 return False 431 if self.repl_pane is None: 432 return False 433 434 repl_input_text = buff.text 435 # Exit if quit or exit 436 if repl_input_text.strip() in ['quit', 'quit()', 'exit', 'exit()']: 437 self.repl_pane.application.application.exit() # type: ignore 438 439 # Create stdout and stderr proxies 440 temp_stdout = io.StringIO() 441 temp_stderr = io.StringIO() 442 443 # The help() command with no args uses it's own interactive prompt which 444 # will not work if prompt_toolkit is running. 445 if repl_input_text.strip() in ['help()']: 446 # Run nothing 447 repl_input_text = '' 448 # Override stdout 449 temp_stdout.write( 450 'Error: Interactive help() is not compatible with this repl.' 451 ) 452 453 # Pop open the system command log pane for shell commands. 454 if _user_input_is_a_shell_command(repl_input_text): 455 self.repl_pane.application.setup_command_runner_log_pane() 456 457 # Execute the repl code in the the separate user_code thread loop. 458 future = asyncio.run_coroutine_threadsafe( 459 # This function will be executed in a separate thread. 460 self._run_user_code(repl_input_text, temp_stdout, temp_stderr), 461 # Using this asyncio event loop. 462 self.repl_pane.application.user_code_loop, 463 ) # type: ignore 464 465 # Save the input text and future object. 466 self.repl_pane.append_executed_code( 467 repl_input_text, future, temp_stdout, temp_stderr 468 ) # type: ignore 469 470 # Run user_code_complete_callback() when done. 471 done_callback = functools.partial( 472 self.user_code_complete_callback, repl_input_text 473 ) 474 future.add_done_callback(done_callback) 475 476 # Rebuild the parent ReplPane output buffer. 477 self.repl_pane.update_output_buffer('pw_ptpython_repl._accept_handler') 478 479 # TODO(tonymd): Return True if exception is found? 480 # Don't keep input for now. Return True to keep input text. 481 return False 482 483 def line_break_count(self) -> int: 484 return self.default_buffer.text.count('\n') 485 486 def input_empty_if_in_focus_condition(self) -> Condition: 487 @Condition 488 def test() -> bool: 489 if has_focus(self)() and len(self.default_buffer.text) == 0: 490 return True 491 return not has_focus(self)() 492 493 return test 494