1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""ReplPane class.""" 15 16import asyncio 17import concurrent 18import functools 19import logging 20import pprint 21from dataclasses import dataclass 22from typing import ( 23 Any, 24 Awaitable, 25 Callable, 26 Dict, 27 List, 28 Optional, 29 Tuple, 30 TYPE_CHECKING, 31 Union, 32) 33 34from prompt_toolkit.filters import ( 35 Condition, 36 has_focus, 37) 38from prompt_toolkit.document import Document 39from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent 40from prompt_toolkit.layout.dimension import AnyDimension 41from prompt_toolkit.widgets import TextArea 42from prompt_toolkit.layout import ( 43 ConditionalContainer, 44 DynamicContainer, 45 Dimension, 46 FloatContainer, 47 HSplit, 48 Window, 49) 50from prompt_toolkit.lexers import PygmentsLexer # type: ignore 51from pygments.lexers.python import PythonConsoleLexer # type: ignore 52 53# Alternative Formatting 54# from IPython.lib.lexers import IPythonConsoleLexer # type: ignore 55 56from pw_console.progress_bar.progress_bar_state import TASKS_CONTEXTVAR 57from pw_console.pw_ptpython_repl import PwPtPythonRepl 58from pw_console.style import get_pane_style 59from pw_console.widgets import ( 60 ToolbarButton, 61 WindowPane, 62 WindowPaneHSplit, 63 WindowPaneToolbar, 64) 65 66if TYPE_CHECKING: 67 from pw_console.console_app import ConsoleApp 68 69_LOG = logging.getLogger(__package__) 70 71_Namespace = Dict[str, Any] 72_GetNamespace = Callable[[], _Namespace] 73 74_REPL_OUTPUT_SCROLL_AMOUNT = 5 75 76 77@dataclass 78class UserCodeExecution: 79 """Class to hold a single user repl execution event.""" 80 81 input: str 82 future: concurrent.futures.Future 83 output: str 84 stdout: str 85 stderr: str 86 stdout_check_task: Optional[Awaitable] = None 87 result_object: Optional[Any] = None 88 result_str: Optional[str] = None 89 exception_text: Optional[str] = None 90 91 @property 92 def is_running(self): 93 return not self.future.done() 94 95 def update_stdout(self, text: Optional[str]): 96 if text: 97 self.stdout = text 98 99 def update_stderr(self, text: Optional[str]): 100 if text: 101 self.stderr = text 102 103 104class ReplPane(WindowPane): 105 """Pane for reading Python input.""" 106 107 # pylint: disable=too-many-instance-attributes,too-many-public-methods 108 def __init__( 109 self, 110 application: 'ConsoleApp', 111 python_repl: PwPtPythonRepl, 112 pane_title: str = 'Python Repl', 113 startup_message: Optional[str] = None, 114 ) -> None: 115 super().__init__(application, pane_title) 116 117 self.executed_code: List[UserCodeExecution] = [] 118 self.application = application 119 120 self.pw_ptpython_repl = python_repl 121 self.pw_ptpython_repl.set_repl_pane(self) 122 123 self.wrap_output_lines = True 124 125 self.startup_message = startup_message if startup_message else '' 126 127 self.output_field = TextArea( 128 text=self.startup_message, 129 focusable=True, 130 focus_on_click=True, 131 scrollbar=True, 132 wrap_lines=Condition(lambda: self.wrap_output_lines), 133 lexer=PygmentsLexer(PythonConsoleLexer), 134 ) 135 136 # Additional keybindings for the text area. 137 key_bindings = KeyBindings() 138 register = self.application.prefs.register_keybinding 139 140 @register('python-repl.copy-output-selection', key_bindings) 141 def _copy_selection(_event: KeyPressEvent) -> None: 142 """Copy selected text.""" 143 self.copy_output_selection() 144 145 self.output_field.control.key_bindings = key_bindings 146 147 # Override output buffer mouse wheel scroll 148 self.output_field.window._scroll_up = ( # type: ignore 149 self.scroll_output_up 150 ) 151 self.output_field.window._scroll_down = ( # type: ignore 152 self.scroll_output_down 153 ) 154 155 self.bottom_toolbar = self._create_input_toolbar() 156 self.results_toolbar = self._create_output_toolbar() 157 158 self.progress_state = TASKS_CONTEXTVAR.get() 159 160 # ReplPane root container 161 self.container = ConditionalContainer( 162 FloatContainer( 163 # Horizontal split of all Repl pane sections. 164 WindowPaneHSplit( 165 self, 166 [ 167 HSplit( 168 [ 169 # 1. Repl Output 170 self.output_field, 171 # 2. Progress bars if any 172 ConditionalContainer( 173 DynamicContainer( 174 self.get_progress_bar_task_container 175 ), 176 filter=Condition( 177 # pylint: disable=line-too-long 178 lambda: not self.progress_state.all_tasks_complete 179 # pylint: enable=line-too-long 180 ), 181 ), 182 # 3. Static separator toolbar. 183 self.results_toolbar, 184 ], 185 # Output area only dimensions 186 height=self.get_output_height, 187 ), 188 HSplit( 189 [ 190 # 3. Repl Input 191 self.pw_ptpython_repl, 192 # 4. Bottom toolbar 193 self.bottom_toolbar, 194 ], 195 # Input area only dimensions 196 height=self.get_input_height, 197 ), 198 ], 199 # Repl pane dimensions 200 height=lambda: self.height, 201 width=lambda: self.width, 202 style=functools.partial(get_pane_style, self), 203 ), 204 floats=[], 205 ), 206 filter=Condition(lambda: self.show_pane), 207 ) 208 209 def toggle_wrap_output_lines(self): 210 """Enable or disable output line wraping/truncation.""" 211 self.wrap_output_lines = not self.wrap_output_lines 212 213 def scroll_output_down(self) -> None: 214 """Scroll the output buffer down on mouse wheel down events.""" 215 for _i in range(_REPL_OUTPUT_SCROLL_AMOUNT): 216 # There is no move cursor more than one line at a time function. 217 self.output_field.control.move_cursor_down() 218 self.output_field.window.vertical_scroll += _REPL_OUTPUT_SCROLL_AMOUNT 219 220 def scroll_output_up(self) -> None: 221 """Scroll the output buffer up on mouse wheel up events.""" 222 for _i in range(_REPL_OUTPUT_SCROLL_AMOUNT): 223 # There is no move cursor more than one line at a time function. 224 self.output_field.control.move_cursor_up() 225 self.output_field.window.vertical_scroll -= _REPL_OUTPUT_SCROLL_AMOUNT 226 227 def focus_output(self): 228 self.application.focus_on_container(self.output_field) 229 230 def focus_input(self): 231 self.application.focus_on_container(self.pw_ptpython_repl) 232 233 def get_progress_bar_task_container(self): 234 bar_container = self.progress_state.get_container() 235 if bar_container: 236 return bar_container 237 return Window() 238 239 def get_output_height(self) -> AnyDimension: 240 # pylint: disable=no-self-use 241 return Dimension(min=1) 242 243 def get_input_height(self) -> AnyDimension: 244 desired_max_height = 10 245 # Check number of line breaks in the input buffer. 246 input_line_count = self.pw_ptpython_repl.line_break_count() 247 if input_line_count > desired_max_height: 248 desired_max_height = input_line_count 249 # Check if it's taller than the available space 250 if desired_max_height > self.current_pane_height: 251 # Leave space for minimum of 252 # 1 line of content in the output 253 # + 1 for output toolbar 254 # + 1 for input toolbar 255 desired_max_height = self.current_pane_height - 3 256 257 if desired_max_height > 1: 258 return Dimension(min=1, max=desired_max_height) 259 # Fall back to at least a height of 1 260 return Dimension(min=1) 261 262 def _create_input_toolbar(self): 263 bottom_toolbar = WindowPaneToolbar( 264 self, 265 focus_action_callable=self.focus_input, 266 focus_check_container=self.pw_ptpython_repl, 267 ) 268 bottom_toolbar.add_button( 269 ToolbarButton( 270 'Ctrl-v', 'Paste', self.paste_system_clipboard_to_input_buffer 271 ) 272 ) 273 bottom_toolbar.add_button( 274 ToolbarButton( 275 'Ctrl-c', 'Copy / Clear', self.copy_or_clear_input_buffer 276 ) 277 ) 278 bottom_toolbar.add_button(ToolbarButton('Enter', 'Run', self.run_code)) 279 bottom_toolbar.add_button(ToolbarButton('F2', 'Settings')) 280 bottom_toolbar.add_button(ToolbarButton('F3', 'History')) 281 return bottom_toolbar 282 283 def _create_output_toolbar(self): 284 results_toolbar = WindowPaneToolbar( 285 self, 286 title='Python Results', 287 focus_action_callable=self.focus_output, 288 focus_check_container=self.output_field, 289 include_resize_handle=False, 290 ) 291 results_toolbar.add_button( 292 ToolbarButton( 293 description='Wrap lines', 294 mouse_handler=self.toggle_wrap_output_lines, 295 is_checkbox=True, 296 checked=lambda: self.wrap_output_lines, 297 ) 298 ) 299 results_toolbar.add_button( 300 ToolbarButton( 301 'Ctrl-Alt-c', 'Copy All Output', self.copy_all_output_text 302 ) 303 ) 304 results_toolbar.add_button( 305 ToolbarButton( 306 'Ctrl-c', 'Copy Selected Text', self.copy_output_selection 307 ) 308 ) 309 310 results_toolbar.add_button( 311 ToolbarButton( 312 description='Clear', mouse_handler=self.clear_output_buffer 313 ) 314 ) 315 results_toolbar.add_button( 316 ToolbarButton('Shift+Arrows / Mouse Drag', 'Select Text') 317 ) 318 319 return results_toolbar 320 321 def copy_output_selection(self): 322 """Copy highlighted output text to the system clipboard.""" 323 clipboard_data = self.output_field.buffer.copy_selection() 324 self.application.application.clipboard.set_data(clipboard_data) 325 326 def copy_input_selection(self): 327 """Copy highlighted input text to the system clipboard.""" 328 clipboard_data = self.pw_ptpython_repl.default_buffer.copy_selection() 329 self.application.application.clipboard.set_data(clipboard_data) 330 331 def copy_all_output_text(self): 332 """Copy all text in the Python output to the system clipboard.""" 333 self.application.application.clipboard.set_text( 334 self.output_field.buffer.text 335 ) 336 337 def copy_all_input_text(self): 338 """Copy all text in the Python input to the system clipboard.""" 339 self.application.application.clipboard.set_text( 340 self.pw_ptpython_repl.default_buffer.text 341 ) 342 343 # pylint: disable=no-self-use 344 def get_all_key_bindings(self) -> List: 345 """Return all keybinds for this plugin.""" 346 # ptpython native bindings: 347 # return [load_python_bindings(self.pw_ptpython_repl)] 348 349 # Hand-crafted bindings for display in the HelpWindow: 350 return [ 351 { 352 'Execute code': ['Enter', 'Option-Enter', 'Alt-Enter'], 353 'Reverse search history': ['Ctrl-r'], 354 'Erase input buffer.': ['Ctrl-c'], 355 'Show settings.': ['F2'], 356 'Show history.': ['F3'], 357 } 358 ] 359 360 def get_window_menu_options( 361 self, 362 ) -> List[Tuple[str, Union[Callable, None]]]: 363 return [ 364 ( 365 'Python Input > Paste', 366 self.paste_system_clipboard_to_input_buffer, 367 ), 368 ('Python Input > Copy or Clear', self.copy_or_clear_input_buffer), 369 ('Python Input > Run', self.run_code), 370 # Menu separator 371 ('-', None), 372 ( 373 'Python Output > Toggle Wrap lines', 374 self.toggle_wrap_output_lines, 375 ), 376 ('Python Output > Copy All', self.copy_all_output_text), 377 ('Python Output > Copy Selection', self.copy_output_selection), 378 ('Python Output > Clear', self.clear_output_buffer), 379 ] 380 381 def run_code(self): 382 """Trigger a repl code execution on mouse click.""" 383 self.pw_ptpython_repl.default_buffer.validate_and_handle() 384 385 def ctrl_c(self): 386 """Ctrl-C keybinding behavior.""" 387 # If there is text in the input buffer 388 if self.pw_ptpython_repl.default_buffer.text: 389 self.copy_or_clear_input_buffer() 390 else: 391 self.interrupt_last_code_execution() 392 393 def insert_text_into_input_buffer(self, text: str) -> None: 394 self.pw_ptpython_repl.default_buffer.insert_text(text) 395 396 def paste_system_clipboard_to_input_buffer(self, erase_buffer=False): 397 if erase_buffer: 398 self.clear_input_buffer() 399 400 clip_data = self.application.application.clipboard.get_data() 401 self.pw_ptpython_repl.default_buffer.paste_clipboard_data(clip_data) 402 403 def clear_input_buffer(self): 404 # Erase input buffer. 405 self.pw_ptpython_repl.default_buffer.reset() 406 # Clear any displayed function signatures. 407 self.pw_ptpython_repl.on_reset() 408 409 def clear_output_buffer(self): 410 self.executed_code.clear() 411 self.update_output_buffer() 412 413 def copy_or_clear_input_buffer(self): 414 # Copy selected text if a selection is active. 415 if self.pw_ptpython_repl.default_buffer.selection_state: 416 self.copy_input_selection() 417 return 418 # Otherwise, clear the input buffer 419 self.clear_input_buffer() 420 421 def interrupt_last_code_execution(self): 422 code = self._get_currently_running_code() 423 if code: 424 code.future.cancel() 425 code.output = 'Canceled' 426 self.progress_state.cancel_all_tasks() 427 self.pw_ptpython_repl.clear_last_result() 428 self.update_output_buffer('repl_pane.interrupt_last_code_execution') 429 430 def _get_currently_running_code(self): 431 for code in self.executed_code: 432 if not code.future.done(): 433 return code 434 return None 435 436 def _get_executed_code(self, future): 437 for code in self.executed_code: 438 if code.future == future: 439 return code 440 return None 441 442 def _log_executed_code(self, code, prefix=''): 443 """Log repl command input text along with a prefix string.""" 444 text = self.get_output_buffer_text([code], show_index=False) 445 text = text.strip() 446 for line in text.splitlines(): 447 _LOG.debug('[PYTHON %s] %s', prefix, line.strip()) 448 449 async def periodically_check_stdout( 450 self, user_code: UserCodeExecution, stdout_proxy, stderr_proxy 451 ): 452 while not user_code.future.done(): 453 await asyncio.sleep(0.3) 454 stdout_text_so_far = stdout_proxy.getvalue() 455 stderr_text_so_far = stderr_proxy.getvalue() 456 if stdout_text_so_far: 457 user_code.update_stdout(stdout_text_so_far) 458 if stderr_text_so_far: 459 user_code.update_stderr(stderr_text_so_far) 460 461 # if stdout_text_so_far or stderr_text_so_far: 462 self.update_output_buffer('repl_pane.periodic_check') 463 464 def append_executed_code(self, text, future, temp_stdout, temp_stderr): 465 user_code = UserCodeExecution( 466 input=text, future=future, output=None, stdout=None, stderr=None 467 ) 468 469 background_stdout_check = asyncio.create_task( 470 self.periodically_check_stdout(user_code, temp_stdout, temp_stderr) 471 ) 472 user_code.stdout_check_task = background_stdout_check 473 self.executed_code.append(user_code) 474 self._log_executed_code(user_code, prefix='START') 475 476 def append_result_to_executed_code( 477 self, 478 _input_text, 479 future, 480 result_text, 481 stdout_text='', 482 stderr_text='', 483 exception_text='', 484 result_object=None, 485 ): 486 code = self._get_executed_code(future) 487 if code: 488 code.output = result_text 489 code.stdout = stdout_text 490 code.stderr = stderr_text 491 code.exception_text = exception_text 492 code.result_object = result_object 493 if result_object is not None: 494 code.result_str = self._format_result_object(result_object) 495 496 self._log_executed_code(code, prefix='FINISH') 497 self.update_output_buffer('repl_pane.append_result_to_executed_code') 498 499 def _format_result_object(self, result_object: Any) -> str: 500 """Pretty print format a Python object respecting the window width.""" 501 content_width = ( 502 self.current_pane_width if self.current_pane_width else 80 503 ) 504 pprint_respecting_width = pprint.PrettyPrinter( 505 indent=2, width=content_width 506 ).pformat 507 508 return pprint_respecting_width(result_object) 509 510 def get_output_buffer_text( 511 self, 512 code_items: Optional[List[UserCodeExecution]] = None, 513 show_index: bool = True, 514 ): 515 executed_code = code_items or self.executed_code 516 517 template = self.application.get_template('repl_output.jinja') 518 519 return template.render( 520 code_items=executed_code, 521 show_index=show_index, 522 ) 523 524 def update_output_buffer(self, *unused_args): 525 text = self.get_output_buffer_text() 526 # Add an extra line break so the last cursor position is in column 0 527 # instead of the end of the last line. 528 text += '\n' 529 self.output_field.buffer.set_document( 530 Document(text=text, cursor_position=len(text)) 531 ) 532 533 self.application.redraw_ui() 534 535 def input_or_output_has_focus(self) -> Condition: 536 @Condition 537 def test() -> bool: 538 if ( 539 has_focus(self.output_field)() 540 or has_focus(self.pw_ptpython_repl)() 541 ): 542 return True 543 return False 544 545 return test 546 547 def history_completions(self) -> List[Tuple[str, str]]: 548 return [ 549 ( 550 ' '.join([line.lstrip() for line in text.splitlines()]), 551 # Pass original text as the completion result. 552 text, 553 ) 554 for text in list( 555 self.pw_ptpython_repl.history.load_history_strings() 556 ) 557 ] 558