• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""ReplPane class."""
15
16import asyncio
17import concurrent
18import functools
19import logging
20import pprint
21from dataclasses import dataclass
22from typing import (
23    Any,
24    Awaitable,
25    Callable,
26    Dict,
27    List,
28    Optional,
29    Tuple,
30    TYPE_CHECKING,
31    Union,
32)
33
34from prompt_toolkit.filters import (
35    Condition,
36    has_focus,
37)
38from prompt_toolkit.document import Document
39from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent
40from prompt_toolkit.layout.dimension import AnyDimension
41from prompt_toolkit.widgets import TextArea
42from prompt_toolkit.layout import (
43    ConditionalContainer,
44    DynamicContainer,
45    Dimension,
46    FloatContainer,
47    HSplit,
48    Window,
49)
50from prompt_toolkit.lexers import PygmentsLexer  # type: ignore
51from pygments.lexers.python import PythonConsoleLexer  # type: ignore
52
53# Alternative Formatting
54# from IPython.lib.lexers import IPythonConsoleLexer  # type: ignore
55
56from pw_console.progress_bar.progress_bar_state import TASKS_CONTEXTVAR
57from pw_console.pw_ptpython_repl import PwPtPythonRepl
58from pw_console.style import get_pane_style
59from pw_console.widgets import (
60    ToolbarButton,
61    WindowPane,
62    WindowPaneHSplit,
63    WindowPaneToolbar,
64)
65
66if TYPE_CHECKING:
67    from pw_console.console_app import ConsoleApp
68
69_LOG = logging.getLogger(__package__)
70
71_Namespace = Dict[str, Any]
72_GetNamespace = Callable[[], _Namespace]
73
74_REPL_OUTPUT_SCROLL_AMOUNT = 5
75
76
77@dataclass
78class UserCodeExecution:
79    """Class to hold a single user repl execution event."""
80
81    input: str
82    future: concurrent.futures.Future
83    output: str
84    stdout: str
85    stderr: str
86    stdout_check_task: Optional[Awaitable] = None
87    result_object: Optional[Any] = None
88    result_str: Optional[str] = None
89    exception_text: Optional[str] = None
90
91    @property
92    def is_running(self):
93        return not self.future.done()
94
95    def update_stdout(self, text: Optional[str]):
96        if text:
97            self.stdout = text
98
99    def update_stderr(self, text: Optional[str]):
100        if text:
101            self.stderr = text
102
103
104class ReplPane(WindowPane):
105    """Pane for reading Python input."""
106
107    # pylint: disable=too-many-instance-attributes,too-many-public-methods
108    def __init__(
109        self,
110        application: 'ConsoleApp',
111        python_repl: PwPtPythonRepl,
112        pane_title: str = 'Python Repl',
113        startup_message: Optional[str] = None,
114    ) -> None:
115        super().__init__(application, pane_title)
116
117        self.executed_code: List[UserCodeExecution] = []
118        self.application = application
119
120        self.pw_ptpython_repl = python_repl
121        self.pw_ptpython_repl.set_repl_pane(self)
122
123        self.wrap_output_lines = True
124
125        self.startup_message = startup_message if startup_message else ''
126
127        self.output_field = TextArea(
128            text=self.startup_message,
129            focusable=True,
130            focus_on_click=True,
131            scrollbar=True,
132            wrap_lines=Condition(lambda: self.wrap_output_lines),
133            lexer=PygmentsLexer(PythonConsoleLexer),
134        )
135
136        # Additional keybindings for the text area.
137        key_bindings = KeyBindings()
138        register = self.application.prefs.register_keybinding
139
140        @register('python-repl.copy-output-selection', key_bindings)
141        def _copy_selection(_event: KeyPressEvent) -> None:
142            """Copy selected text."""
143            self.copy_output_selection()
144
145        self.output_field.control.key_bindings = key_bindings
146
147        # Override output buffer mouse wheel scroll
148        self.output_field.window._scroll_up = (  # type: ignore
149            self.scroll_output_up
150        )
151        self.output_field.window._scroll_down = (  # type: ignore
152            self.scroll_output_down
153        )
154
155        self.bottom_toolbar = self._create_input_toolbar()
156        self.results_toolbar = self._create_output_toolbar()
157
158        self.progress_state = TASKS_CONTEXTVAR.get()
159
160        # ReplPane root container
161        self.container = ConditionalContainer(
162            FloatContainer(
163                # Horizontal split of all Repl pane sections.
164                WindowPaneHSplit(
165                    self,
166                    [
167                        HSplit(
168                            [
169                                # 1. Repl Output
170                                self.output_field,
171                                # 2. Progress bars if any
172                                ConditionalContainer(
173                                    DynamicContainer(
174                                        self.get_progress_bar_task_container
175                                    ),
176                                    filter=Condition(
177                                        # pylint: disable=line-too-long
178                                        lambda: not self.progress_state.all_tasks_complete
179                                        # pylint: enable=line-too-long
180                                    ),
181                                ),
182                                # 3. Static separator toolbar.
183                                self.results_toolbar,
184                            ],
185                            # Output area only dimensions
186                            height=self.get_output_height,
187                        ),
188                        HSplit(
189                            [
190                                # 3. Repl Input
191                                self.pw_ptpython_repl,
192                                # 4. Bottom toolbar
193                                self.bottom_toolbar,
194                            ],
195                            # Input area only dimensions
196                            height=self.get_input_height,
197                        ),
198                    ],
199                    # Repl pane dimensions
200                    height=lambda: self.height,
201                    width=lambda: self.width,
202                    style=functools.partial(get_pane_style, self),
203                ),
204                floats=[],
205            ),
206            filter=Condition(lambda: self.show_pane),
207        )
208
209    def toggle_wrap_output_lines(self):
210        """Enable or disable output line wraping/truncation."""
211        self.wrap_output_lines = not self.wrap_output_lines
212
213    def scroll_output_down(self) -> None:
214        """Scroll the output buffer down on mouse wheel down events."""
215        for _i in range(_REPL_OUTPUT_SCROLL_AMOUNT):
216            # There is no move cursor more than one line at a time function.
217            self.output_field.control.move_cursor_down()
218        self.output_field.window.vertical_scroll += _REPL_OUTPUT_SCROLL_AMOUNT
219
220    def scroll_output_up(self) -> None:
221        """Scroll the output buffer up on mouse wheel up events."""
222        for _i in range(_REPL_OUTPUT_SCROLL_AMOUNT):
223            # There is no move cursor more than one line at a time function.
224            self.output_field.control.move_cursor_up()
225        self.output_field.window.vertical_scroll -= _REPL_OUTPUT_SCROLL_AMOUNT
226
227    def focus_output(self):
228        self.application.focus_on_container(self.output_field)
229
230    def focus_input(self):
231        self.application.focus_on_container(self.pw_ptpython_repl)
232
233    def get_progress_bar_task_container(self):
234        bar_container = self.progress_state.get_container()
235        if bar_container:
236            return bar_container
237        return Window()
238
239    def get_output_height(self) -> AnyDimension:
240        # pylint: disable=no-self-use
241        return Dimension(min=1)
242
243    def get_input_height(self) -> AnyDimension:
244        desired_max_height = 10
245        # Check number of line breaks in the input buffer.
246        input_line_count = self.pw_ptpython_repl.line_break_count()
247        if input_line_count > desired_max_height:
248            desired_max_height = input_line_count
249        # Check if it's taller than the available space
250        if desired_max_height > self.current_pane_height:
251            # Leave space for minimum of
252            #   1 line of content in the output
253            #   + 1 for output toolbar
254            #   + 1 for input toolbar
255            desired_max_height = self.current_pane_height - 3
256
257        if desired_max_height > 1:
258            return Dimension(min=1, max=desired_max_height)
259        # Fall back to at least a height of 1
260        return Dimension(min=1)
261
262    def _create_input_toolbar(self):
263        bottom_toolbar = WindowPaneToolbar(
264            self,
265            focus_action_callable=self.focus_input,
266            focus_check_container=self.pw_ptpython_repl,
267        )
268        bottom_toolbar.add_button(
269            ToolbarButton(
270                'Ctrl-v', 'Paste', self.paste_system_clipboard_to_input_buffer
271            )
272        )
273        bottom_toolbar.add_button(
274            ToolbarButton(
275                'Ctrl-c', 'Copy / Clear', self.copy_or_clear_input_buffer
276            )
277        )
278        bottom_toolbar.add_button(ToolbarButton('Enter', 'Run', self.run_code))
279        bottom_toolbar.add_button(ToolbarButton('F2', 'Settings'))
280        bottom_toolbar.add_button(ToolbarButton('F3', 'History'))
281        return bottom_toolbar
282
283    def _create_output_toolbar(self):
284        results_toolbar = WindowPaneToolbar(
285            self,
286            title='Python Results',
287            focus_action_callable=self.focus_output,
288            focus_check_container=self.output_field,
289            include_resize_handle=False,
290        )
291        results_toolbar.add_button(
292            ToolbarButton(
293                description='Wrap lines',
294                mouse_handler=self.toggle_wrap_output_lines,
295                is_checkbox=True,
296                checked=lambda: self.wrap_output_lines,
297            )
298        )
299        results_toolbar.add_button(
300            ToolbarButton(
301                'Ctrl-Alt-c', 'Copy All Output', self.copy_all_output_text
302            )
303        )
304        results_toolbar.add_button(
305            ToolbarButton(
306                'Ctrl-c', 'Copy Selected Text', self.copy_output_selection
307            )
308        )
309
310        results_toolbar.add_button(
311            ToolbarButton(
312                description='Clear', mouse_handler=self.clear_output_buffer
313            )
314        )
315        results_toolbar.add_button(
316            ToolbarButton('Shift+Arrows / Mouse Drag', 'Select Text')
317        )
318
319        return results_toolbar
320
321    def copy_output_selection(self):
322        """Copy highlighted output text to the system clipboard."""
323        clipboard_data = self.output_field.buffer.copy_selection()
324        self.application.application.clipboard.set_data(clipboard_data)
325
326    def copy_input_selection(self):
327        """Copy highlighted input text to the system clipboard."""
328        clipboard_data = self.pw_ptpython_repl.default_buffer.copy_selection()
329        self.application.application.clipboard.set_data(clipboard_data)
330
331    def copy_all_output_text(self):
332        """Copy all text in the Python output to the system clipboard."""
333        self.application.application.clipboard.set_text(
334            self.output_field.buffer.text
335        )
336
337    def copy_all_input_text(self):
338        """Copy all text in the Python input to the system clipboard."""
339        self.application.application.clipboard.set_text(
340            self.pw_ptpython_repl.default_buffer.text
341        )
342
343    # pylint: disable=no-self-use
344    def get_all_key_bindings(self) -> List:
345        """Return all keybinds for this plugin."""
346        # ptpython native bindings:
347        # return [load_python_bindings(self.pw_ptpython_repl)]
348
349        # Hand-crafted bindings for display in the HelpWindow:
350        return [
351            {
352                'Execute code': ['Enter', 'Option-Enter', 'Alt-Enter'],
353                'Reverse search history': ['Ctrl-r'],
354                'Erase input buffer.': ['Ctrl-c'],
355                'Show settings.': ['F2'],
356                'Show history.': ['F3'],
357            }
358        ]
359
360    def get_window_menu_options(
361        self,
362    ) -> List[Tuple[str, Union[Callable, None]]]:
363        return [
364            (
365                'Python Input > Paste',
366                self.paste_system_clipboard_to_input_buffer,
367            ),
368            ('Python Input > Copy or Clear', self.copy_or_clear_input_buffer),
369            ('Python Input > Run', self.run_code),
370            # Menu separator
371            ('-', None),
372            (
373                'Python Output > Toggle Wrap lines',
374                self.toggle_wrap_output_lines,
375            ),
376            ('Python Output > Copy All', self.copy_all_output_text),
377            ('Python Output > Copy Selection', self.copy_output_selection),
378            ('Python Output > Clear', self.clear_output_buffer),
379        ]
380
381    def run_code(self):
382        """Trigger a repl code execution on mouse click."""
383        self.pw_ptpython_repl.default_buffer.validate_and_handle()
384
385    def ctrl_c(self):
386        """Ctrl-C keybinding behavior."""
387        # If there is text in the input buffer
388        if self.pw_ptpython_repl.default_buffer.text:
389            self.copy_or_clear_input_buffer()
390        else:
391            self.interrupt_last_code_execution()
392
393    def insert_text_into_input_buffer(self, text: str) -> None:
394        self.pw_ptpython_repl.default_buffer.insert_text(text)
395
396    def paste_system_clipboard_to_input_buffer(self, erase_buffer=False):
397        if erase_buffer:
398            self.clear_input_buffer()
399
400        clip_data = self.application.application.clipboard.get_data()
401        self.pw_ptpython_repl.default_buffer.paste_clipboard_data(clip_data)
402
403    def clear_input_buffer(self):
404        # Erase input buffer.
405        self.pw_ptpython_repl.default_buffer.reset()
406        # Clear any displayed function signatures.
407        self.pw_ptpython_repl.on_reset()
408
409    def clear_output_buffer(self):
410        self.executed_code.clear()
411        self.update_output_buffer()
412
413    def copy_or_clear_input_buffer(self):
414        # Copy selected text if a selection is active.
415        if self.pw_ptpython_repl.default_buffer.selection_state:
416            self.copy_input_selection()
417            return
418        # Otherwise, clear the input buffer
419        self.clear_input_buffer()
420
421    def interrupt_last_code_execution(self):
422        code = self._get_currently_running_code()
423        if code:
424            code.future.cancel()
425            code.output = 'Canceled'
426            self.progress_state.cancel_all_tasks()
427        self.pw_ptpython_repl.clear_last_result()
428        self.update_output_buffer('repl_pane.interrupt_last_code_execution')
429
430    def _get_currently_running_code(self):
431        for code in self.executed_code:
432            if not code.future.done():
433                return code
434        return None
435
436    def _get_executed_code(self, future):
437        for code in self.executed_code:
438            if code.future == future:
439                return code
440        return None
441
442    def _log_executed_code(self, code, prefix=''):
443        """Log repl command input text along with a prefix string."""
444        text = self.get_output_buffer_text([code], show_index=False)
445        text = text.strip()
446        for line in text.splitlines():
447            _LOG.debug('[PYTHON %s]  %s', prefix, line.strip())
448
449    async def periodically_check_stdout(
450        self, user_code: UserCodeExecution, stdout_proxy, stderr_proxy
451    ):
452        while not user_code.future.done():
453            await asyncio.sleep(0.3)
454            stdout_text_so_far = stdout_proxy.getvalue()
455            stderr_text_so_far = stderr_proxy.getvalue()
456            if stdout_text_so_far:
457                user_code.update_stdout(stdout_text_so_far)
458            if stderr_text_so_far:
459                user_code.update_stderr(stderr_text_so_far)
460
461            # if stdout_text_so_far or stderr_text_so_far:
462            self.update_output_buffer('repl_pane.periodic_check')
463
464    def append_executed_code(self, text, future, temp_stdout, temp_stderr):
465        user_code = UserCodeExecution(
466            input=text, future=future, output=None, stdout=None, stderr=None
467        )
468
469        background_stdout_check = asyncio.create_task(
470            self.periodically_check_stdout(user_code, temp_stdout, temp_stderr)
471        )
472        user_code.stdout_check_task = background_stdout_check
473        self.executed_code.append(user_code)
474        self._log_executed_code(user_code, prefix='START')
475
476    def append_result_to_executed_code(
477        self,
478        _input_text,
479        future,
480        result_text,
481        stdout_text='',
482        stderr_text='',
483        exception_text='',
484        result_object=None,
485    ):
486        code = self._get_executed_code(future)
487        if code:
488            code.output = result_text
489            code.stdout = stdout_text
490            code.stderr = stderr_text
491            code.exception_text = exception_text
492            code.result_object = result_object
493            if result_object is not None:
494                code.result_str = self._format_result_object(result_object)
495
496        self._log_executed_code(code, prefix='FINISH')
497        self.update_output_buffer('repl_pane.append_result_to_executed_code')
498
499    def _format_result_object(self, result_object: Any) -> str:
500        """Pretty print format a Python object respecting the window width."""
501        content_width = (
502            self.current_pane_width if self.current_pane_width else 80
503        )
504        pprint_respecting_width = pprint.PrettyPrinter(
505            indent=2, width=content_width
506        ).pformat
507
508        return pprint_respecting_width(result_object)
509
510    def get_output_buffer_text(
511        self,
512        code_items: Optional[List[UserCodeExecution]] = None,
513        show_index: bool = True,
514    ):
515        executed_code = code_items or self.executed_code
516
517        template = self.application.get_template('repl_output.jinja')
518
519        return template.render(
520            code_items=executed_code,
521            show_index=show_index,
522        )
523
524    def update_output_buffer(self, *unused_args):
525        text = self.get_output_buffer_text()
526        # Add an extra line break so the last cursor position is in column 0
527        # instead of the end of the last line.
528        text += '\n'
529        self.output_field.buffer.set_document(
530            Document(text=text, cursor_position=len(text))
531        )
532
533        self.application.redraw_ui()
534
535    def input_or_output_has_focus(self) -> Condition:
536        @Condition
537        def test() -> bool:
538            if (
539                has_focus(self.output_field)()
540                or has_focus(self.pw_ptpython_repl)()
541            ):
542                return True
543            return False
544
545        return test
546
547    def history_completions(self) -> List[Tuple[str, str]]:
548        return [
549            (
550                ' '.join([line.lstrip() for line in text.splitlines()]),
551                # Pass original text as the completion result.
552                text,
553            )
554            for text in list(
555                self.pw_ptpython_repl.history.load_history_strings()
556            )
557        ]
558