# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""PwPtPythonRepl class."""

import asyncio
import functools
import io
import logging
import sys
from typing import Iterable, Optional, TYPE_CHECKING

from prompt_toolkit.buffer import Buffer
from prompt_toolkit.layout.controls import BufferControl
from prompt_toolkit.completion import merge_completers
from prompt_toolkit.filters import (
    Condition,
    has_focus,
    to_filter,
)
from ptpython.completer import (  # type: ignore
    CompletePrivateAttributes, PythonCompleter,
)
import ptpython.repl  # type: ignore
from ptpython.layout import (  # type: ignore
    CompletionVisualisation, Dimension,
)

import pw_console.text_formatting

if TYPE_CHECKING:
    from pw_console.repl_pane import ReplPane

_LOG = logging.getLogger(__package__)


class MissingPtpythonBufferControl(Exception):
    """Exception for a missing ptpython BufferControl object."""


class PwPtPythonRepl(ptpython.repl.PythonRepl):  # pylint: disable=too-many-instance-attributes
    """A ptpython repl class with changes to code execution and output related
    methods."""
    def __init__(
        self,
        *args,
        # pw_console specific kwargs
        extra_completers: Optional[Iterable] = None,
        **ptpython_kwargs,
    ):
        """Create the repl without its own prompt_toolkit application.

        Args:
          *args: Positional arguments forwarded to ptpython.repl.PythonRepl.
          extra_completers: Optional completers that are merged after the
            default Python completer. When empty or None no custom completer
            is passed to ptpython.
          **ptpython_kwargs: Keyword arguments forwarded to
            ptpython.repl.PythonRepl.

        Raises:
          MissingPtpythonBufferControl: If the input BufferControl can not be
            located in the ptpython layout.
        """
        completer = None
        if extra_completers:
            # Create the default python completer used by
            # ptpython.repl.PythonRepl
            python_completer = PythonCompleter(
                # No self.get_globals yet so this must be a lambda
                # pylint: disable=unnecessary-lambda
                lambda: self.get_globals(),
                lambda: self.get_locals(),
                lambda: self.enable_dictionary_completion,  # type: ignore
            )

            all_completers = [python_completer]
            all_completers.extend(extra_completers)
            # Merge default Python completer with the new custom one.
            completer = merge_completers(all_completers)

        super().__init__(
            *args,
            create_app=False,
            # Absolute minimum height of 1
            _input_buffer_height=Dimension(min=1),
            _completer=completer,
            **ptpython_kwargs,
        )

        self.enable_mouse_support: bool = True
        self.enable_history_search: bool = True
        self.enable_dictionary_completion: bool = True
        self._set_pt_python_input_buffer_control_focusable()

        # Change some ptpython.repl defaults.
        self.show_status_bar = False
        self.show_exit_confirmation = False
        self.complete_private_attributes = (
            CompletePrivateAttributes.IF_NO_PUBLIC)

        # Function signature that shows args, kwargs, and types under the cursor
        # of the input window.
        self.show_signature: bool = True
        # Docstring of the current completed function that appears at the bottom
        # of the input window.
        self.show_docstring: bool = False

        # Turn off the completion menu in ptpython. The CompletionsMenu in
        # ConsoleApp.root_container will handle this.
        self.completion_visualisation: CompletionVisualisation = (
            CompletionVisualisation.NONE)

        # Additional state variables.
        self.repl_pane: 'Optional[ReplPane]' = None
        self._last_result = None
        self._last_exception = None

    def _set_pt_python_input_buffer_control_focusable(self) -> None:
        """Enable focus_on_click for ptpython's input buffer.

        Raises:
          MissingPtpythonBufferControl: If the object found at the expected
            position in the ptpython layout is missing or is not a
            BufferControl.
        """
        error_message = (
            'Unable to find ptpythons BufferControl input object.\n'
            ' For the last known position see:\n'
            ' https://github.com/prompt-toolkit/ptpython/'
            'blob/6072174eace5b645b0cfd5b21b4c237e2539f577/'
            'ptpython/layout.py#L598\n'
            '\n'
            'The installed version of ptpython may not be compatible with'
            ' pw console; please try re-running environment setup.')

        try:
            # Fetch the Window's BufferControl object.
            # From ptpython/layout.py:
            #   self.root_container = HSplit([
            #     VSplit([
            #       HSplit([
            #         FloatContainer(
            #           content=HSplit(
            #             [create_python_input_window()] + extra_body
            #           ), ...
            ptpython_buffer_control = (
                self.ptpython_layout.root_container.children[0].children[0].
                children[0].content.children[0].content)
        except (AttributeError, IndexError) as error:
            # A different layout can fail on a missing attribute just as
            # easily as on a short children list; report both the same way and
            # keep the original error as the cause.
            raise MissingPtpythonBufferControl(error_message) from error

        # This should be a BufferControl instance
        if not isinstance(ptpython_buffer_control, BufferControl):
            raise MissingPtpythonBufferControl(error_message)

        # Enable focus options
        ptpython_buffer_control.focusable = to_filter(True)
        ptpython_buffer_control.focus_on_click = to_filter(True)

    def __pt_container__(self):
        """Return the prompt_toolkit root container for class.

        This allows self to be used wherever prompt_toolkit expects a container
        object."""
        return self.ptpython_layout.root_container

    def set_repl_pane(self, repl_pane):
        """Update the parent pw_console.ReplPane reference."""
        self.repl_pane = repl_pane

    def _save_result(self, formatted_text):
        """Save the last repl execution result with formatting removed."""
        self._last_result = pw_console.text_formatting.remove_formatting(
            formatted_text)

    def _save_exception(self, formatted_text):
        """Save the last repl exception with formatting removed."""
        self._last_exception = pw_console.text_formatting.remove_formatting(
            formatted_text)

    def clear_last_result(self):
        """Erase the last repl execution result and exception."""
        self._last_result = None
        self._last_exception = None

    def show_result(self, result):
        """Format and save output results.

        This function is called from the _run_user_code() function which is
        always run from the user code thread, within
        .run_and_show_expression_async().
        """
        formatted_result = self._format_result_output(result)
        self._save_result(formatted_result)

    def _handle_exception(self, e: BaseException) -> None:
        """Format and save the exception raised by user code.

        This function is called from the _run_user_code() function which is
        always run from the user code thread, within
        .run_and_show_expression_async().
        """
        formatted_result = self._format_exception_output(e)
        self._save_exception(formatted_result.__pt_formatted_text__())

    def user_code_complete_callback(self, input_text, future):
        """Callback to run after user repl code is finished.

        Args:
          input_text: The repl input text that was executed.
          future: The finished future for the _run_user_code() coroutine. Its
            result, when truthy, is a dict with 'stdout', 'stderr' and
            'result' keys.
        """
        # A result is saved in self._last_result and an exception is saved in
        # self._last_exception.
        result_text = self._last_result
        exception_text = self._last_exception

        # Saved results are consumed, erase them for the next run.
        self.clear_last_result()

        stdout_contents = None
        stderr_contents = None
        result_object = None

        # Fetch the result once; it is the dict returned by _run_user_code().
        future_result = future.result()
        if future_result:
            stdout_contents = future_result['stdout']
            stderr_contents = future_result['stderr']
            result_object = future_result['result']

            if result_object is not None:
                # Use ptpython formatted results:
                formatted_result = self._format_result_output(result_object)
                result_text = pw_console.text_formatting.remove_formatting(
                    formatted_result)

        # Job is finished, append the last result.
        self.repl_pane.append_result_to_executed_code(
            input_text,
            future,
            result_text,
            stdout_contents,
            stderr_contents,
            exception_text=exception_text,
            result_object=result_object,
        )

        # Rebuild output buffer.
        self.repl_pane.update_output_buffer(
            'pw_ptpython_repl.user_code_complete_callback')

        # Trigger a prompt_toolkit application redraw.
        self.repl_pane.application.application.invalidate()

    async def _run_user_code(self, text, stdout_proxy, stdin_proxy):
        """Run user code and capture stdout+err.

        This function should be run in a separate thread from the main
        prompt_toolkit application.

        Args:
          text: The repl code to run.
          stdout_proxy: File-like object that replaces sys.stdout while the
            code runs; must provide getvalue().
          stdin_proxy: File-like object that replaces sys.stderr while the
            code runs; must provide getvalue(). NOTE: despite its name this
            captures stderr, not stdin.

        Returns:
          A dict with the captured 'stdout' and 'stderr' text and the 'result'
          of run_and_show_expression_async().
        """
        # NOTE: This function runs in a separate thread using the asyncio event
        # loop defined by self.repl_pane.application.user_code_loop. Patching
        # stdout here will not affect the stdout used by prompt_toolkit and the
        # main user interface.

        # Patch stdout and stderr to capture repl print() statements.
        original_stdout = sys.stdout
        original_stderr = sys.stderr

        sys.stdout = stdout_proxy
        sys.stderr = stdin_proxy

        # Run user repl code
        try:
            result = await self.run_and_show_expression_async(text)
        finally:
            # Always restore original stdout and stderr
            sys.stdout = original_stdout
            sys.stderr = original_stderr

        # Save the captured output
        stdout_contents = stdout_proxy.getvalue()
        stderr_contents = stdin_proxy.getvalue()

        return {
            'stdout': stdout_contents,
            'stderr': stderr_contents,
            'result': result
        }

    def _accept_handler(self, buff: Buffer) -> bool:
        """Function executed when pressing enter in the ptpython.repl.PythonRepl
        input buffer.

        Args:
          buff: The input buffer holding the text to execute.

        Returns:
          Always False.
        """
        # Do nothing if no text is entered.
        if not buff.text:
            return False
        if self.repl_pane is None:
            return False

        repl_input_text = buff.text
        # Exit if quit or exit
        if repl_input_text.strip() in ['quit', 'quit()', 'exit', 'exit()']:
            self.repl_pane.application.application.exit()  # type: ignore
            # The application was asked to exit; do not also schedule the
            # quit/exit text as user code.
            return False

        # Create stdout and stderr proxies
        temp_stdout = io.StringIO()
        temp_stderr = io.StringIO()

        # The help() command with no args uses its own interactive prompt which
        # will not work if prompt_toolkit is running.
        if repl_input_text.strip() in ['help()']:
            # Run nothing
            repl_input_text = ''
            # Override stdout
            temp_stdout.write(
                'Error: Interactive help() is not compatible with this repl.')

        # Execute the repl code in the separate user_code thread loop.
        future = asyncio.run_coroutine_threadsafe(
            # This function will be executed in a separate thread.
            self._run_user_code(repl_input_text, temp_stdout, temp_stderr),
            # Using this asyncio event loop.
            self.repl_pane.application.user_code_loop)  # type: ignore

        # Save the input text and future object.
        self.repl_pane.append_executed_code(repl_input_text, future,
                                            temp_stdout,
                                            temp_stderr)  # type: ignore

        # Run user_code_complete_callback() when done.
        done_callback = functools.partial(self.user_code_complete_callback,
                                          repl_input_text)
        future.add_done_callback(done_callback)

        # Rebuild the parent ReplPane output buffer.
        self.repl_pane.update_output_buffer('pw_ptpython_repl._accept_handler')

        # TODO(tonymd): Return True if exception is found?
        # Don't keep input for now. Return True to keep input text.
        return False

    def line_break_count(self) -> int:
        """Return the number of newline characters in the input buffer text."""
        return self.default_buffer.text.count('\n')

    def input_empty_if_in_focus_condition(self) -> Condition:
        """Return a Condition that is true when this repl is not in focus, or
        is in focus with an empty input buffer."""
        @Condition
        def test() -> bool:
            return not has_focus(self)() or not self.default_buffer.text

        return test