1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""PwPtPythonPane class.""" 15 16import asyncio 17import functools 18import io 19import logging 20import os 21import sys 22import shlex 23import subprocess 24from typing import Iterable, Optional, TYPE_CHECKING 25from unittest.mock import patch 26 27from prompt_toolkit.buffer import Buffer 28from prompt_toolkit.layout.controls import BufferControl 29from prompt_toolkit.completion import merge_completers 30from prompt_toolkit.filters import ( 31 Condition, 32 has_focus, 33 to_filter, 34) 35from ptpython.completer import ( # type: ignore 36 CompletePrivateAttributes, 37 PythonCompleter, 38) 39import ptpython.repl # type: ignore 40from ptpython.layout import ( # type: ignore 41 CompletionVisualisation, 42 Dimension, 43) 44import pygments.plugin 45 46from pw_console.pigweed_code_style import ( 47 PigweedCodeStyle, 48 PigweedCodeLightStyle, 49) 50from pw_console.text_formatting import remove_formatting 51 52if TYPE_CHECKING: 53 from pw_console.repl_pane import ReplPane 54 55_LOG = logging.getLogger(__package__) 56_SYSTEM_COMMAND_LOG = logging.getLogger('pw_console_system_command') 57 58_original_find_plugin_styles = pygments.plugin.find_plugin_styles 59 60 61def _wrapped_find_plugin_styles(): 62 """Patch pygment find_plugin_styles to also include Pigweed codes styles 63 64 This allows using these themes without requiring Python entrypoints. 65 """ 66 for style in [ 67 ('pigweed-code', PigweedCodeStyle), 68 ('pigweed-code-light', PigweedCodeLightStyle), 69 ]: 70 yield style 71 yield from _original_find_plugin_styles() 72 73 74class MissingPtpythonBufferControl(Exception): 75 """Exception for a missing ptpython BufferControl object.""" 76 77 78def _user_input_is_a_shell_command(text: str) -> bool: 79 return text.startswith('!') 80 81 82class PwPtPythonRepl( 83 ptpython.repl.PythonRepl 84): # pylint: disable=too-many-instance-attributes 85 """A ptpython repl class with changes to code execution and output related 86 methods.""" 87 88 @patch( 89 'pygments.styles.find_plugin_styles', new=_wrapped_find_plugin_styles 90 ) 91 def __init__( 92 self, 93 *args, 94 # pw_console specific kwargs 95 extra_completers: Optional[Iterable] = None, 96 **ptpython_kwargs, 97 ): 98 completer = None 99 if extra_completers: 100 # Create the default python completer used by 101 # ptpython.repl.PythonRepl 102 python_completer = PythonCompleter( 103 # No self.get_globals yet so this must be a lambda 104 # pylint: disable=unnecessary-lambda 105 lambda: self.get_globals(), 106 lambda: self.get_locals(), 107 lambda: self.enable_dictionary_completion, # type: ignore 108 ) 109 110 all_completers = [python_completer] 111 all_completers.extend(extra_completers) 112 # Merge default Python completer with the new custom one. 113 completer = merge_completers(all_completers) 114 115 super().__init__( 116 *args, 117 create_app=False, 118 # Absolute minimum height of 1 119 _input_buffer_height=Dimension(min=1), 120 _completer=completer, 121 **ptpython_kwargs, 122 ) 123 124 self.enable_mouse_support: bool = True 125 self.enable_history_search: bool = True 126 self.enable_dictionary_completion: bool = True 127 self._set_pt_python_input_buffer_control_focusable() 128 129 # Change some ptpython.repl defaults. 130 self.show_status_bar = False 131 self.show_exit_confirmation = False 132 self.complete_private_attributes = ( 133 CompletePrivateAttributes.IF_NO_PUBLIC 134 ) 135 136 # Function signature that shows args, kwargs, and types under the cursor 137 # of the input window. 138 self.show_signature: bool = True 139 # Docstring of the current completed function that appears at the bottom 140 # of the input window. 141 self.show_docstring: bool = False 142 143 # Turn off the completion menu in ptpython. The CompletionsMenu in 144 # ConsoleApp.root_container will handle this. 145 self.completion_visualisation: CompletionVisualisation = ( 146 CompletionVisualisation.NONE 147 ) 148 149 # Additional state variables. 150 self.repl_pane: 'Optional[ReplPane]' = None 151 self._last_result = None 152 self._last_exception = None 153 154 def _set_pt_python_input_buffer_control_focusable(self) -> None: 155 """Enable focus_on_click for ptpython's input buffer.""" 156 error_message = ( 157 'Unable to find ptpythons BufferControl input object.\n' 158 ' For the last known position see:\n' 159 ' https://github.com/prompt-toolkit/ptpython/' 160 'blob/6072174eace5b645b0cfd5b21b4c237e2539f577/' 161 'ptpython/layout.py#L598\n' 162 '\n' 163 'The installed version of ptpython may not be compatible with' 164 ' pw console; please try re-running environment setup.' 165 ) 166 167 try: 168 # Fetch the Window's BufferControl object. 169 # From ptpython/layout.py: 170 # self.root_container = HSplit([ 171 # VSplit([ 172 # HSplit([ 173 # FloatContainer( 174 # content=HSplit( 175 # [create_python_input_window()] + extra_body 176 # ), ... 177 ptpython_buffer_control = ( 178 self.ptpython_layout.root_container.children[0] # type: ignore 179 .children[0] 180 .children[0] 181 .content.children[0] 182 .content 183 ) 184 # This should be a BufferControl instance 185 if not isinstance(ptpython_buffer_control, BufferControl): 186 raise MissingPtpythonBufferControl(error_message) 187 # Enable focus options 188 ptpython_buffer_control.focusable = to_filter(True) 189 ptpython_buffer_control.focus_on_click = to_filter(True) 190 except IndexError as _error: 191 raise MissingPtpythonBufferControl(error_message) 192 193 def __pt_container__(self): 194 """Return the prompt_toolkit root container for class. 195 196 This allows self to be used wherever prompt_toolkit expects a container 197 object.""" 198 return self.ptpython_layout.root_container 199 200 def set_repl_pane(self, repl_pane): 201 """Update the parent pw_console.ReplPane reference.""" 202 self.repl_pane = repl_pane 203 204 def _save_result(self, formatted_text): 205 """Save the last repl execution result.""" 206 unformatted_result = remove_formatting(formatted_text) 207 self._last_result = unformatted_result 208 209 def _save_exception(self, formatted_text): 210 """Save the last repl exception.""" 211 unformatted_result = remove_formatting(formatted_text) 212 self._last_exception = unformatted_result 213 214 def clear_last_result(self): 215 """Erase the last repl execution result.""" 216 self._last_result = None 217 self._last_exception = None 218 219 def show_result(self, result): 220 """Format and save output results. 221 222 This function is called from the _run_user_code() function which is 223 always run from the user code thread, within 224 .run_and_show_expression_async(). 225 """ 226 formatted_result = self._format_result_output(result) 227 self._save_result(formatted_result) 228 229 def _handle_exception(self, e: BaseException) -> None: 230 """Format and save output results. 231 232 This function is called from the _run_user_code() function which is 233 always run from the user code thread, within 234 .run_and_show_expression_async(). 235 """ 236 formatted_result = self._format_exception_output(e) 237 self._save_exception(formatted_result.__pt_formatted_text__()) 238 239 def user_code_complete_callback(self, input_text, future): 240 """Callback to run after user repl code is finished.""" 241 # If there was an exception it will be saved in self._last_result 242 result_text = self._last_result 243 result_object = None 244 exception_text = self._last_exception 245 246 # _last_results consumed, erase for the next run. 247 self.clear_last_result() 248 249 stdout_contents = None 250 stderr_contents = None 251 if future.result(): 252 future_result = future.result() 253 stdout_contents = future_result['stdout'] 254 stderr_contents = future_result['stderr'] 255 result_object = future_result['result'] 256 257 if result_object is not None: 258 # Use ptpython formatted results: 259 formatted_result = self._format_result_output(result_object) 260 result_text = remove_formatting(formatted_result) 261 262 # Job is finished, append the last result. 263 self.repl_pane.append_result_to_executed_code( 264 input_text, 265 future, 266 result_text, 267 stdout_contents, 268 stderr_contents, 269 exception_text=exception_text, 270 result_object=result_object, 271 ) 272 273 # Rebuild output buffer. 274 self.repl_pane.update_output_buffer( 275 'pw_ptpython_repl.user_code_complete_callback' 276 ) 277 278 # Trigger a prompt_toolkit application redraw. 279 self.repl_pane.application.application.invalidate() 280 281 async def _run_system_command( # pylint: disable=no-self-use 282 self, text, stdout_proxy, _stdin_proxy 283 ) -> int: 284 """Run a shell command and print results to the repl.""" 285 command = shlex.split(text) 286 returncode = None 287 env = os.environ.copy() 288 # Force colors in Pigweed subcommands and some terminal apps. 289 env['PW_USE_COLOR'] = '1' 290 env['CLICOLOR_FORCE'] = '1' 291 292 def _handle_output(output): 293 # Force tab characters to 8 spaces to prevent \t from showing in 294 # prompt_toolkit. 295 output = output.replace('\t', ' ') 296 # Strip some ANSI sequences that don't render. 297 output = output.replace('\x1b(B\x1b[m', '') 298 output = output.replace('\x1b[1m', '') 299 stdout_proxy.write(output) 300 _SYSTEM_COMMAND_LOG.info(output.rstrip()) 301 302 with subprocess.Popen( 303 command, 304 env=env, 305 stdout=subprocess.PIPE, 306 stderr=subprocess.STDOUT, 307 errors='replace', 308 ) as proc: 309 # Print the command 310 _SYSTEM_COMMAND_LOG.info('') 311 _SYSTEM_COMMAND_LOG.info('$ %s', text) 312 while returncode is None: 313 if not proc.stdout: 314 continue 315 316 # Check for one line and update. 317 output = proc.stdout.readline() 318 _handle_output(output) 319 320 returncode = proc.poll() 321 322 # Print any remaining lines. 323 for output in proc.stdout.readlines(): 324 _handle_output(output) 325 326 return returncode 327 328 async def _run_user_code(self, text, stdout_proxy, stdin_proxy): 329 """Run user code and capture stdout+err. 330 331 This fuction should be run in a separate thread from the main 332 prompt_toolkit application.""" 333 # NOTE: This function runs in a separate thread using the asyncio event 334 # loop defined by self.repl_pane.application.user_code_loop. Patching 335 # stdout here will not effect the stdout used by prompt_toolkit and the 336 # main user interface. 337 338 # Patch stdout and stderr to capture repl print() statements. 339 original_stdout = sys.stdout 340 original_stderr = sys.stderr 341 342 sys.stdout = stdout_proxy 343 sys.stderr = stdin_proxy 344 345 # Run user repl code 346 try: 347 if _user_input_is_a_shell_command(text): 348 result = await self._run_system_command( 349 text[1:], stdout_proxy, stdin_proxy 350 ) 351 else: 352 result = await self.run_and_show_expression_async(text) 353 finally: 354 # Always restore original stdout and stderr 355 sys.stdout = original_stdout 356 sys.stderr = original_stderr 357 358 # Save the captured output 359 stdout_contents = stdout_proxy.getvalue() 360 stderr_contents = stdin_proxy.getvalue() 361 362 return { 363 'stdout': stdout_contents, 364 'stderr': stderr_contents, 365 'result': result, 366 } 367 368 def _accept_handler(self, buff: Buffer) -> bool: 369 """Function executed when pressing enter in the ptpython.repl.PythonRepl 370 input buffer.""" 371 # Do nothing if no text is entered. 372 if len(buff.text) == 0: 373 return False 374 if self.repl_pane is None: 375 return False 376 377 repl_input_text = buff.text 378 # Exit if quit or exit 379 if repl_input_text.strip() in ['quit', 'quit()', 'exit', 'exit()']: 380 self.repl_pane.application.application.exit() # type: ignore 381 382 # Create stdout and stderr proxies 383 temp_stdout = io.StringIO() 384 temp_stderr = io.StringIO() 385 386 # The help() command with no args uses it's own interactive prompt which 387 # will not work if prompt_toolkit is running. 388 if repl_input_text.strip() in ['help()']: 389 # Run nothing 390 repl_input_text = '' 391 # Override stdout 392 temp_stdout.write( 393 'Error: Interactive help() is not compatible with this repl.' 394 ) 395 396 # Pop open the system command log pane for shell commands. 397 if _user_input_is_a_shell_command(repl_input_text): 398 self.repl_pane.application.setup_command_runner_log_pane() 399 400 # Execute the repl code in the the separate user_code thread loop. 401 future = asyncio.run_coroutine_threadsafe( 402 # This function will be executed in a separate thread. 403 self._run_user_code(repl_input_text, temp_stdout, temp_stderr), 404 # Using this asyncio event loop. 405 self.repl_pane.application.user_code_loop, 406 ) # type: ignore 407 408 # Save the input text and future object. 409 self.repl_pane.append_executed_code( 410 repl_input_text, future, temp_stdout, temp_stderr 411 ) # type: ignore 412 413 # Run user_code_complete_callback() when done. 414 done_callback = functools.partial( 415 self.user_code_complete_callback, repl_input_text 416 ) 417 future.add_done_callback(done_callback) 418 419 # Rebuild the parent ReplPane output buffer. 420 self.repl_pane.update_output_buffer('pw_ptpython_repl._accept_handler') 421 422 # TODO(tonymd): Return True if exception is found? 423 # Don't keep input for now. Return True to keep input text. 424 return False 425 426 def line_break_count(self) -> int: 427 return self.default_buffer.text.count('\n') 428 429 def input_empty_if_in_focus_condition(self) -> Condition: 430 @Condition 431 def test() -> bool: 432 if has_focus(self)() and len(self.default_buffer.text) == 0: 433 return True 434 return not has_focus(self)() 435 436 return test 437