1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""ReplPane class.""" 15 16import asyncio 17import concurrent 18import functools 19import logging 20import pprint 21from dataclasses import dataclass 22from typing import ( 23 Any, 24 Callable, 25 Dict, 26 List, 27 Optional, 28 TYPE_CHECKING, 29) 30 31from prompt_toolkit.filters import ( 32 Condition, 33 has_focus, 34) 35from prompt_toolkit.document import Document 36from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent 37from prompt_toolkit.layout.dimension import AnyDimension 38from prompt_toolkit.widgets import TextArea 39from prompt_toolkit.layout import ( 40 ConditionalContainer, 41 DynamicContainer, 42 Dimension, 43 FloatContainer, 44 HSplit, 45 Window, 46) 47from prompt_toolkit.lexers import PygmentsLexer # type: ignore 48from pygments.lexers.python import PythonConsoleLexer # type: ignore 49# Alternative Formatting 50# from IPython.lib.lexers import IPythonConsoleLexer # type: ignore 51 52from pw_console.progress_bar.progress_bar_state import TASKS_CONTEXTVAR 53from pw_console.pw_ptpython_repl import PwPtPythonRepl 54from pw_console.widgets import ( 55 ToolbarButton, 56 WindowPane, 57 WindowPaneHSplit, 58 WindowPaneToolbar, 59) 60import pw_console.mouse 61import pw_console.style 62 63if TYPE_CHECKING: 64 from pw_console.console_app import ConsoleApp 65 66_LOG = logging.getLogger(__package__) 67 68_Namespace = Dict[str, Any] 69_GetNamespace = Callable[[], _Namespace] 70 71_REPL_OUTPUT_SCROLL_AMOUNT = 5 72 73 74@dataclass 75class UserCodeExecution: 76 """Class to hold a single user repl execution event.""" 77 input: str 78 future: concurrent.futures.Future 79 output: str 80 stdout: str 81 stderr: str 82 stdout_check_task: Optional[concurrent.futures.Future] = None 83 result_object: Optional[Any] = None 84 exception_text: Optional[str] = None 85 86 @property 87 def is_running(self): 88 return not self.future.done() 89 90 def update_stdout(self, text: Optional[str]): 91 if text: 92 self.stdout = text 93 94 def update_stderr(self, text: Optional[str]): 95 if text: 96 self.stderr = text 97 98 99class ReplPane(WindowPane): 100 """Pane for reading Python input.""" 101 102 # pylint: disable=too-many-instance-attributes,too-many-public-methods 103 def __init__( 104 self, 105 application: 'ConsoleApp', 106 python_repl: PwPtPythonRepl, 107 pane_title: str = 'Python Repl', 108 startup_message: Optional[str] = None, 109 ) -> None: 110 super().__init__(application, pane_title) 111 112 self.executed_code: List = [] 113 self.application = application 114 115 self.pw_ptpython_repl = python_repl 116 self.pw_ptpython_repl.set_repl_pane(self) 117 118 self.wrap_output_lines = True 119 120 self.startup_message = startup_message if startup_message else '' 121 122 self.output_field = TextArea( 123 text=self.startup_message, 124 focusable=True, 125 focus_on_click=True, 126 scrollbar=True, 127 wrap_lines=Condition(lambda: self.wrap_output_lines), 128 lexer=PygmentsLexer(PythonConsoleLexer), 129 ) 130 131 # Additional keybindings for the text area. 132 key_bindings = KeyBindings() 133 register = self.application.prefs.register_keybinding 134 135 @register('python-repl.copy-output-selection', key_bindings) 136 def _copy_selection(_event: KeyPressEvent) -> None: 137 """Copy selected text.""" 138 self.copy_output_selection() 139 140 self.output_field.control.key_bindings = key_bindings 141 142 # Override output buffer mouse wheel scroll 143 self.output_field.window._scroll_up = ( # type: ignore 144 self.scroll_output_up) 145 self.output_field.window._scroll_down = ( # type: ignore 146 self.scroll_output_down) 147 148 self.bottom_toolbar = self._create_input_toolbar() 149 self.results_toolbar = self._create_output_toolbar() 150 151 self.progress_state = TASKS_CONTEXTVAR.get() 152 153 # ReplPane root container 154 self.container = ConditionalContainer( 155 FloatContainer( 156 # Horizontal split of all Repl pane sections. 157 WindowPaneHSplit( 158 self, 159 [ 160 HSplit( 161 [ 162 # 1. Repl Output 163 self.output_field, 164 # 2. Progress bars if any 165 ConditionalContainer( 166 DynamicContainer( 167 self.get_progress_bar_task_container), 168 filter=Condition( 169 lambda: not self.progress_state. 170 all_tasks_complete)), 171 # 3. Static separator toolbar. 172 self.results_toolbar, 173 ], 174 # Output area only dimensions 175 height=self.get_output_height, 176 ), 177 HSplit( 178 [ 179 # 3. Repl Input 180 self.pw_ptpython_repl, 181 # 4. Bottom toolbar 182 self.bottom_toolbar, 183 ], 184 # Input area only dimensions 185 height=self.get_input_height, 186 ), 187 ], 188 # Repl pane dimensions 189 height=lambda: self.height, 190 width=lambda: self.width, 191 style=functools.partial(pw_console.style.get_pane_style, 192 self), 193 ), 194 floats=[]), 195 filter=Condition(lambda: self.show_pane)) 196 197 def toggle_wrap_output_lines(self): 198 """Enable or disable output line wraping/truncation.""" 199 self.wrap_output_lines = not self.wrap_output_lines 200 201 def scroll_output_down(self) -> None: 202 """Scroll the output buffer down on mouse wheel down events.""" 203 for _i in range(_REPL_OUTPUT_SCROLL_AMOUNT): 204 # There is no move cursor more than one line at a time function. 205 self.output_field.control.move_cursor_down() 206 self.output_field.window.vertical_scroll += _REPL_OUTPUT_SCROLL_AMOUNT 207 208 def scroll_output_up(self) -> None: 209 """Scroll the output buffer up on mouse wheel up events.""" 210 for _i in range(_REPL_OUTPUT_SCROLL_AMOUNT): 211 # There is no move cursor more than one line at a time function. 212 self.output_field.control.move_cursor_up() 213 self.output_field.window.vertical_scroll -= _REPL_OUTPUT_SCROLL_AMOUNT 214 215 def focus_output(self): 216 self.application.focus_on_container(self.output_field) 217 218 def focus_input(self): 219 self.application.focus_on_container(self.pw_ptpython_repl) 220 221 def get_progress_bar_task_container(self): 222 bar_container = self.progress_state.get_container() 223 if bar_container: 224 return bar_container 225 return Window() 226 227 def get_output_height(self) -> AnyDimension: 228 # pylint: disable=no-self-use 229 return Dimension(min=1) 230 231 def get_input_height(self) -> AnyDimension: 232 desired_max_height = 10 233 # Check number of line breaks in the input buffer. 234 input_line_count = self.pw_ptpython_repl.line_break_count() 235 if input_line_count > desired_max_height: 236 desired_max_height = input_line_count 237 # Check if it's taller than the available space 238 if desired_max_height > self.current_pane_height: 239 # Leave space for minimum of 240 # 1 line of content in the output 241 # + 1 for output toolbar 242 # + 1 for input toolbar 243 desired_max_height = self.current_pane_height - 3 244 245 if desired_max_height > 1: 246 return Dimension(min=1, max=desired_max_height) 247 # Fall back to at least a height of 1 248 return Dimension(min=1) 249 250 def _create_input_toolbar(self): 251 bottom_toolbar = WindowPaneToolbar( 252 self, 253 focus_action_callable=self.focus_input, 254 focus_check_container=self.pw_ptpython_repl, 255 ) 256 bottom_toolbar.add_button( 257 ToolbarButton('Ctrl-v', 'Paste', 258 self.paste_system_clipboard_to_input_buffer)) 259 bottom_toolbar.add_button( 260 ToolbarButton('Ctrl-c', 'Copy / Clear', 261 self.copy_or_clear_input_buffer)) 262 bottom_toolbar.add_button(ToolbarButton('Enter', 'Run', self.run_code)) 263 bottom_toolbar.add_button(ToolbarButton('F2', 'Settings')) 264 bottom_toolbar.add_button(ToolbarButton('F3', 'History')) 265 return bottom_toolbar 266 267 def _create_output_toolbar(self): 268 results_toolbar = WindowPaneToolbar( 269 self, 270 title='Python Results', 271 focus_action_callable=self.focus_output, 272 focus_check_container=self.output_field, 273 include_resize_handle=False, 274 ) 275 results_toolbar.add_button( 276 ToolbarButton(description='Wrap lines', 277 mouse_handler=self.toggle_wrap_output_lines, 278 is_checkbox=True, 279 checked=lambda: self.wrap_output_lines)) 280 results_toolbar.add_button( 281 ToolbarButton('Ctrl-Alt-c', 'Copy All Output', 282 self.copy_all_output_text)) 283 results_toolbar.add_button( 284 ToolbarButton('Ctrl-c', 'Copy Selected Text', 285 self.copy_output_selection)) 286 287 results_toolbar.add_button( 288 ToolbarButton('Shift+Arrows / Mouse Drag', 'Select Text')) 289 290 return results_toolbar 291 292 def copy_output_selection(self): 293 """Copy highlighted output text to the system clipboard.""" 294 clipboard_data = self.output_field.buffer.copy_selection() 295 self.application.application.clipboard.set_data(clipboard_data) 296 297 def copy_input_selection(self): 298 """Copy highlighted input text to the system clipboard.""" 299 clipboard_data = self.pw_ptpython_repl.default_buffer.copy_selection() 300 self.application.application.clipboard.set_data(clipboard_data) 301 302 def copy_all_output_text(self): 303 """Copy all text in the Python output to the system clipboard.""" 304 self.application.application.clipboard.set_text( 305 self.output_field.buffer.text) 306 307 def copy_all_input_text(self): 308 """Copy all text in the Python input to the system clipboard.""" 309 self.application.application.clipboard.set_text( 310 self.pw_ptpython_repl.default_buffer.text) 311 312 # pylint: disable=no-self-use 313 def get_all_key_bindings(self) -> List: 314 """Return all keybinds for this plugin.""" 315 # ptpython native bindings: 316 # return [load_python_bindings(self.pw_ptpython_repl)] 317 318 # Hand-crafted bindings for display in the HelpWindow: 319 return [{ 320 'Execute code': ['Enter', 'Option-Enter', 'Alt-Enter'], 321 'Reverse search history': ['Ctrl-r'], 322 'Erase input buffer.': ['Ctrl-c'], 323 'Show settings.': ['F2'], 324 'Show history.': ['F3'], 325 }] 326 327 def get_all_menu_options(self): 328 return [] 329 330 def run_code(self): 331 """Trigger a repl code execution on mouse click.""" 332 self.pw_ptpython_repl.default_buffer.validate_and_handle() 333 334 def ctrl_c(self): 335 """Ctrl-C keybinding behavior.""" 336 # If there is text in the input buffer 337 if self.pw_ptpython_repl.default_buffer.text: 338 self.copy_or_clear_input_buffer() 339 else: 340 self.interrupt_last_code_execution() 341 342 def paste_system_clipboard_to_input_buffer(self, erase_buffer=False): 343 if erase_buffer: 344 self.clear_input_buffer() 345 346 clip_data = self.application.application.clipboard.get_data() 347 self.pw_ptpython_repl.default_buffer.paste_clipboard_data(clip_data) 348 349 def clear_input_buffer(self): 350 # Erase input buffer. 351 self.pw_ptpython_repl.default_buffer.reset() 352 # Clear any displayed function signatures. 353 self.pw_ptpython_repl.on_reset() 354 355 def copy_or_clear_input_buffer(self): 356 # Copy selected text if a selection is active. 357 if self.pw_ptpython_repl.default_buffer.selection_state: 358 self.copy_input_selection() 359 return 360 # Otherwise, clear the input buffer 361 self.clear_input_buffer() 362 363 def interrupt_last_code_execution(self): 364 code = self._get_currently_running_code() 365 if code: 366 code.future.cancel() 367 code.output = 'Canceled' 368 self.progress_state.cancel_all_tasks() 369 self.pw_ptpython_repl.clear_last_result() 370 self.update_output_buffer('repl_pane.interrupt_last_code_execution') 371 372 def _get_currently_running_code(self): 373 for code in self.executed_code: 374 if not code.future.done(): 375 return code 376 return None 377 378 def _get_executed_code(self, future): 379 for code in self.executed_code: 380 if code.future == future: 381 return code 382 return None 383 384 def _log_executed_code(self, code, prefix=''): 385 """Log repl command input text along with a prefix string.""" 386 text = self.get_output_buffer_text([code], show_index=False) 387 text = text.strip() 388 for line in text.splitlines(): 389 _LOG.debug('[PYTHON %s] %s', prefix, line.strip()) 390 391 async def periodically_check_stdout(self, user_code: UserCodeExecution, 392 stdout_proxy, stderr_proxy): 393 while not user_code.future.done(): 394 await asyncio.sleep(0.3) 395 stdout_text_so_far = stdout_proxy.getvalue() 396 stderr_text_so_far = stderr_proxy.getvalue() 397 if stdout_text_so_far: 398 user_code.update_stdout(stdout_text_so_far) 399 if stderr_text_so_far: 400 user_code.update_stderr(stderr_text_so_far) 401 402 # if stdout_text_so_far or stderr_text_so_far: 403 self.update_output_buffer('repl_pane.periodic_check') 404 405 def append_executed_code(self, text, future, temp_stdout, temp_stderr): 406 user_code = UserCodeExecution(input=text, 407 future=future, 408 output=None, 409 stdout=None, 410 stderr=None) 411 412 background_stdout_check = asyncio.create_task( 413 self.periodically_check_stdout(user_code, temp_stdout, 414 temp_stderr)) 415 user_code.stdout_check_task = background_stdout_check 416 self.executed_code.append(user_code) 417 self._log_executed_code(user_code, prefix='START') 418 419 def append_result_to_executed_code( 420 self, 421 _input_text, 422 future, 423 result_text, 424 stdout_text='', 425 stderr_text='', 426 exception_text='', 427 result_object=None, 428 ): 429 430 code = self._get_executed_code(future) 431 if code: 432 code.output = result_text 433 code.stdout = stdout_text 434 code.stderr = stderr_text 435 code.exception_text = exception_text 436 code.result_object = result_object 437 self._log_executed_code(code, prefix='FINISH') 438 self.update_output_buffer('repl_pane.append_result_to_executed_code') 439 440 def get_output_buffer_text(self, code_items=None, show_index=True): 441 content_width = (self.current_pane_width 442 if self.current_pane_width else 80) 443 pprint_respecting_width = pprint.PrettyPrinter( 444 indent=2, width=content_width).pformat 445 446 executed_code = code_items or self.executed_code 447 448 template = self.application.get_template('repl_output.jinja') 449 return template.render(code_items=executed_code, 450 result_format=pprint_respecting_width, 451 show_index=show_index) 452 453 def update_output_buffer(self, *unused_args): 454 text = self.get_output_buffer_text() 455 # Add an extra line break so the last cursor position is in column 0 456 # instead of the end of the last line. 457 text += '\n' 458 self.output_field.buffer.set_document( 459 Document(text=text, cursor_position=len(text))) 460 461 self.application.redraw_ui() 462 463 def input_or_output_has_focus(self) -> Condition: 464 @Condition 465 def test() -> bool: 466 if has_focus(self.output_field)() or has_focus( 467 self.pw_ptpython_repl)(): 468 return True 469 return False 470 471 return test 472