• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""ReplPane class."""
15
16import asyncio
17import concurrent
18import functools
19import logging
20import pprint
21from dataclasses import dataclass
22from typing import (
23    Any,
24    Callable,
25    Dict,
26    List,
27    Optional,
28    TYPE_CHECKING,
29)
30
31from prompt_toolkit.filters import (
32    Condition,
33    has_focus,
34)
35from prompt_toolkit.document import Document
36from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent
37from prompt_toolkit.layout.dimension import AnyDimension
38from prompt_toolkit.widgets import TextArea
39from prompt_toolkit.layout import (
40    ConditionalContainer,
41    DynamicContainer,
42    Dimension,
43    FloatContainer,
44    HSplit,
45    Window,
46)
47from prompt_toolkit.lexers import PygmentsLexer  # type: ignore
48from pygments.lexers.python import PythonConsoleLexer  # type: ignore
49# Alternative Formatting
50# from IPython.lib.lexers import IPythonConsoleLexer  # type: ignore
51
52from pw_console.progress_bar.progress_bar_state import TASKS_CONTEXTVAR
53from pw_console.pw_ptpython_repl import PwPtPythonRepl
54from pw_console.widgets import (
55    ToolbarButton,
56    WindowPane,
57    WindowPaneHSplit,
58    WindowPaneToolbar,
59)
60import pw_console.mouse
61import pw_console.style
62
63if TYPE_CHECKING:
64    from pw_console.console_app import ConsoleApp
65
66_LOG = logging.getLogger(__package__)
67
68_Namespace = Dict[str, Any]
69_GetNamespace = Callable[[], _Namespace]
70
71_REPL_OUTPUT_SCROLL_AMOUNT = 5
72
73
@dataclass
class UserCodeExecution:
    """Record of a single user repl execution event.

    ``output``, ``stdout`` and ``stderr`` may be ``None``: this file creates
    new records with all three unset and fills them in later.
    """
    # Text of the code that was submitted.
    input: str
    # Future tracking the execution; the record counts as running until it
    # reports done().
    future: concurrent.futures.Future
    # Result text, or None while nothing has been recorded.
    output: Optional[str]
    # Captured stdout text, or None while nothing has been recorded.
    stdout: Optional[str]
    # Captured stderr text, or None while nothing has been recorded.
    stderr: Optional[str]
    # Background task polling stdout/stderr. In this file it is assigned the
    # asyncio task returned by asyncio.create_task().
    stdout_check_task: Optional[asyncio.Future] = None
    # Object the code evaluated to, if any.
    result_object: Optional[Any] = None
    # Exception text recorded for the execution, if any.
    exception_text: Optional[str] = None

    @property
    def is_running(self) -> bool:
        """Return True while the execution future has not completed."""
        return not self.future.done()

    def update_stdout(self, text: Optional[str]) -> None:
        """Replace the stored stdout; None or empty text is ignored."""
        if text:
            self.stdout = text

    def update_stderr(self, text: Optional[str]) -> None:
        """Replace the stored stderr; None or empty text is ignored."""
        if text:
            self.stderr = text
97
98
99class ReplPane(WindowPane):
100    """Pane for reading Python input."""
101
102    # pylint: disable=too-many-instance-attributes,too-many-public-methods
    def __init__(
        self,
        application: 'ConsoleApp',
        python_repl: PwPtPythonRepl,
        pane_title: str = 'Python Repl',
        startup_message: Optional[str] = None,
    ) -> None:
        """Build the output field, toolbars and root container of the pane.

        Args:
            application: The ConsoleApp this pane belongs to. Its prefs are
                used to register keybindings.
            python_repl: Repl widget placed in the input area. This pane
                registers itself with it through set_repl_pane().
            pane_title: Title forwarded to the WindowPane constructor.
            startup_message: Initial text of the output field. An empty
                string is used when it is None or empty.
        """
        super().__init__(application, pane_title)

        # Execution records; append_executed_code() adds one per execution.
        self.executed_code: List = []
        self.application = application

        self.pw_ptpython_repl = python_repl
        self.pw_ptpython_repl.set_repl_pane(self)

        # Read lazily by the output field's wrap_lines condition below and
        # flipped by toggle_wrap_output_lines().
        self.wrap_output_lines = True

        self.startup_message = startup_message if startup_message else ''

        self.output_field = TextArea(
            text=self.startup_message,
            focusable=True,
            focus_on_click=True,
            scrollbar=True,
            wrap_lines=Condition(lambda: self.wrap_output_lines),
            lexer=PygmentsLexer(PythonConsoleLexer),
        )

        # Additional keybindings for the text area.
        key_bindings = KeyBindings()
        register = self.application.prefs.register_keybinding

        @register('python-repl.copy-output-selection', key_bindings)
        def _copy_selection(_event: KeyPressEvent) -> None:
            """Copy selected text."""
            self.copy_output_selection()

        self.output_field.control.key_bindings = key_bindings

        # Override output buffer mouse wheel scroll
        # (replaces the window's private _scroll_up/_scroll_down attributes
        # with this pane's scroll methods).
        self.output_field.window._scroll_up = (  # type: ignore
            self.scroll_output_up)
        self.output_field.window._scroll_down = (  # type: ignore
            self.scroll_output_down)

        # Both toolbars reference output_field / pw_ptpython_repl, so they
        # are created only after those attributes exist.
        self.bottom_toolbar = self._create_input_toolbar()
        self.results_toolbar = self._create_output_toolbar()

        # Progress bar state taken from the context variable at construction.
        self.progress_state = TASKS_CONTEXTVAR.get()

        # ReplPane root container
        # (rendered only while self.show_pane is true).
        self.container = ConditionalContainer(
            FloatContainer(
                # Horizontal split of all Repl pane sections.
                WindowPaneHSplit(
                    self,
                    [
                        HSplit(
                            [
                                # 1. Repl Output
                                self.output_field,
                                # 2. Progress bars if any; hidden once all
                                #    tasks are complete.
                                ConditionalContainer(
                                    DynamicContainer(
                                        self.get_progress_bar_task_container),
                                    filter=Condition(
                                        lambda: not self.progress_state.
                                        all_tasks_complete)),
                                # 3. Static separator toolbar.
                                self.results_toolbar,
                            ],
                            # Output area only dimensions
                            height=self.get_output_height,
                        ),
                        HSplit(
                            [
                                # 4. Repl Input
                                self.pw_ptpython_repl,
                                # 5. Bottom toolbar
                                self.bottom_toolbar,
                            ],
                            # Input area only dimensions
                            height=self.get_input_height,
                        ),
                    ],
                    # Repl pane dimensions
                    height=lambda: self.height,
                    width=lambda: self.width,
                    style=functools.partial(pw_console.style.get_pane_style,
                                            self),
                ),
                floats=[]),
            filter=Condition(lambda: self.show_pane))
196
197    def toggle_wrap_output_lines(self):
198        """Enable or disable output line wraping/truncation."""
199        self.wrap_output_lines = not self.wrap_output_lines
200
201    def scroll_output_down(self) -> None:
202        """Scroll the output buffer down on mouse wheel down events."""
203        for _i in range(_REPL_OUTPUT_SCROLL_AMOUNT):
204            # There is no move cursor more than one line at a time function.
205            self.output_field.control.move_cursor_down()
206        self.output_field.window.vertical_scroll += _REPL_OUTPUT_SCROLL_AMOUNT
207
208    def scroll_output_up(self) -> None:
209        """Scroll the output buffer up on mouse wheel up events."""
210        for _i in range(_REPL_OUTPUT_SCROLL_AMOUNT):
211            # There is no move cursor more than one line at a time function.
212            self.output_field.control.move_cursor_up()
213        self.output_field.window.vertical_scroll -= _REPL_OUTPUT_SCROLL_AMOUNT
214
215    def focus_output(self):
216        self.application.focus_on_container(self.output_field)
217
218    def focus_input(self):
219        self.application.focus_on_container(self.pw_ptpython_repl)
220
221    def get_progress_bar_task_container(self):
222        bar_container = self.progress_state.get_container()
223        if bar_container:
224            return bar_container
225        return Window()
226
227    def get_output_height(self) -> AnyDimension:
228        # pylint: disable=no-self-use
229        return Dimension(min=1)
230
231    def get_input_height(self) -> AnyDimension:
232        desired_max_height = 10
233        # Check number of line breaks in the input buffer.
234        input_line_count = self.pw_ptpython_repl.line_break_count()
235        if input_line_count > desired_max_height:
236            desired_max_height = input_line_count
237        # Check if it's taller than the available space
238        if desired_max_height > self.current_pane_height:
239            # Leave space for minimum of
240            #   1 line of content in the output
241            #   + 1 for output toolbar
242            #   + 1 for input toolbar
243            desired_max_height = self.current_pane_height - 3
244
245        if desired_max_height > 1:
246            return Dimension(min=1, max=desired_max_height)
247        # Fall back to at least a height of 1
248        return Dimension(min=1)
249
250    def _create_input_toolbar(self):
251        bottom_toolbar = WindowPaneToolbar(
252            self,
253            focus_action_callable=self.focus_input,
254            focus_check_container=self.pw_ptpython_repl,
255        )
256        bottom_toolbar.add_button(
257            ToolbarButton('Ctrl-v', 'Paste',
258                          self.paste_system_clipboard_to_input_buffer))
259        bottom_toolbar.add_button(
260            ToolbarButton('Ctrl-c', 'Copy / Clear',
261                          self.copy_or_clear_input_buffer))
262        bottom_toolbar.add_button(ToolbarButton('Enter', 'Run', self.run_code))
263        bottom_toolbar.add_button(ToolbarButton('F2', 'Settings'))
264        bottom_toolbar.add_button(ToolbarButton('F3', 'History'))
265        return bottom_toolbar
266
267    def _create_output_toolbar(self):
268        results_toolbar = WindowPaneToolbar(
269            self,
270            title='Python Results',
271            focus_action_callable=self.focus_output,
272            focus_check_container=self.output_field,
273            include_resize_handle=False,
274        )
275        results_toolbar.add_button(
276            ToolbarButton(description='Wrap lines',
277                          mouse_handler=self.toggle_wrap_output_lines,
278                          is_checkbox=True,
279                          checked=lambda: self.wrap_output_lines))
280        results_toolbar.add_button(
281            ToolbarButton('Ctrl-Alt-c', 'Copy All Output',
282                          self.copy_all_output_text))
283        results_toolbar.add_button(
284            ToolbarButton('Ctrl-c', 'Copy Selected Text',
285                          self.copy_output_selection))
286
287        results_toolbar.add_button(
288            ToolbarButton('Shift+Arrows / Mouse Drag', 'Select Text'))
289
290        return results_toolbar
291
292    def copy_output_selection(self):
293        """Copy highlighted output text to the system clipboard."""
294        clipboard_data = self.output_field.buffer.copy_selection()
295        self.application.application.clipboard.set_data(clipboard_data)
296
297    def copy_input_selection(self):
298        """Copy highlighted input text to the system clipboard."""
299        clipboard_data = self.pw_ptpython_repl.default_buffer.copy_selection()
300        self.application.application.clipboard.set_data(clipboard_data)
301
302    def copy_all_output_text(self):
303        """Copy all text in the Python output to the system clipboard."""
304        self.application.application.clipboard.set_text(
305            self.output_field.buffer.text)
306
307    def copy_all_input_text(self):
308        """Copy all text in the Python input to the system clipboard."""
309        self.application.application.clipboard.set_text(
310            self.pw_ptpython_repl.default_buffer.text)
311
312    # pylint: disable=no-self-use
313    def get_all_key_bindings(self) -> List:
314        """Return all keybinds for this plugin."""
315        # ptpython native bindings:
316        # return [load_python_bindings(self.pw_ptpython_repl)]
317
318        # Hand-crafted bindings for display in the HelpWindow:
319        return [{
320            'Execute code': ['Enter', 'Option-Enter', 'Alt-Enter'],
321            'Reverse search history': ['Ctrl-r'],
322            'Erase input buffer.': ['Ctrl-c'],
323            'Show settings.': ['F2'],
324            'Show history.': ['F3'],
325        }]
326
327    def get_all_menu_options(self):
328        return []
329
330    def run_code(self):
331        """Trigger a repl code execution on mouse click."""
332        self.pw_ptpython_repl.default_buffer.validate_and_handle()
333
334    def ctrl_c(self):
335        """Ctrl-C keybinding behavior."""
336        # If there is text in the input buffer
337        if self.pw_ptpython_repl.default_buffer.text:
338            self.copy_or_clear_input_buffer()
339        else:
340            self.interrupt_last_code_execution()
341
342    def paste_system_clipboard_to_input_buffer(self, erase_buffer=False):
343        if erase_buffer:
344            self.clear_input_buffer()
345
346        clip_data = self.application.application.clipboard.get_data()
347        self.pw_ptpython_repl.default_buffer.paste_clipboard_data(clip_data)
348
349    def clear_input_buffer(self):
350        # Erase input buffer.
351        self.pw_ptpython_repl.default_buffer.reset()
352        # Clear any displayed function signatures.
353        self.pw_ptpython_repl.on_reset()
354
355    def copy_or_clear_input_buffer(self):
356        # Copy selected text if a selection is active.
357        if self.pw_ptpython_repl.default_buffer.selection_state:
358            self.copy_input_selection()
359            return
360        # Otherwise, clear the input buffer
361        self.clear_input_buffer()
362
363    def interrupt_last_code_execution(self):
364        code = self._get_currently_running_code()
365        if code:
366            code.future.cancel()
367            code.output = 'Canceled'
368            self.progress_state.cancel_all_tasks()
369        self.pw_ptpython_repl.clear_last_result()
370        self.update_output_buffer('repl_pane.interrupt_last_code_execution')
371
372    def _get_currently_running_code(self):
373        for code in self.executed_code:
374            if not code.future.done():
375                return code
376        return None
377
378    def _get_executed_code(self, future):
379        for code in self.executed_code:
380            if code.future == future:
381                return code
382        return None
383
384    def _log_executed_code(self, code, prefix=''):
385        """Log repl command input text along with a prefix string."""
386        text = self.get_output_buffer_text([code], show_index=False)
387        text = text.strip()
388        for line in text.splitlines():
389            _LOG.debug('[PYTHON %s]  %s', prefix, line.strip())
390
391    async def periodically_check_stdout(self, user_code: UserCodeExecution,
392                                        stdout_proxy, stderr_proxy):
393        while not user_code.future.done():
394            await asyncio.sleep(0.3)
395            stdout_text_so_far = stdout_proxy.getvalue()
396            stderr_text_so_far = stderr_proxy.getvalue()
397            if stdout_text_so_far:
398                user_code.update_stdout(stdout_text_so_far)
399            if stderr_text_so_far:
400                user_code.update_stderr(stderr_text_so_far)
401
402            # if stdout_text_so_far or stderr_text_so_far:
403            self.update_output_buffer('repl_pane.periodic_check')
404
405    def append_executed_code(self, text, future, temp_stdout, temp_stderr):
406        user_code = UserCodeExecution(input=text,
407                                      future=future,
408                                      output=None,
409                                      stdout=None,
410                                      stderr=None)
411
412        background_stdout_check = asyncio.create_task(
413            self.periodically_check_stdout(user_code, temp_stdout,
414                                           temp_stderr))
415        user_code.stdout_check_task = background_stdout_check
416        self.executed_code.append(user_code)
417        self._log_executed_code(user_code, prefix='START')
418
419    def append_result_to_executed_code(
420        self,
421        _input_text,
422        future,
423        result_text,
424        stdout_text='',
425        stderr_text='',
426        exception_text='',
427        result_object=None,
428    ):
429
430        code = self._get_executed_code(future)
431        if code:
432            code.output = result_text
433            code.stdout = stdout_text
434            code.stderr = stderr_text
435            code.exception_text = exception_text
436            code.result_object = result_object
437        self._log_executed_code(code, prefix='FINISH')
438        self.update_output_buffer('repl_pane.append_result_to_executed_code')
439
440    def get_output_buffer_text(self, code_items=None, show_index=True):
441        content_width = (self.current_pane_width
442                         if self.current_pane_width else 80)
443        pprint_respecting_width = pprint.PrettyPrinter(
444            indent=2, width=content_width).pformat
445
446        executed_code = code_items or self.executed_code
447
448        template = self.application.get_template('repl_output.jinja')
449        return template.render(code_items=executed_code,
450                               result_format=pprint_respecting_width,
451                               show_index=show_index)
452
453    def update_output_buffer(self, *unused_args):
454        text = self.get_output_buffer_text()
455        # Add an extra line break so the last cursor position is in column 0
456        # instead of the end of the last line.
457        text += '\n'
458        self.output_field.buffer.set_document(
459            Document(text=text, cursor_position=len(text)))
460
461        self.application.redraw_ui()
462
463    def input_or_output_has_focus(self) -> Condition:
464        @Condition
465        def test() -> bool:
466            if has_focus(self.output_field)() or has_focus(
467                    self.pw_ptpython_repl)():
468                return True
469            return False
470
471        return test
472