• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
"""PwPtPythonRepl class."""
15
16import asyncio
17import functools
18import io
19import logging
20import sys
21from typing import Iterable, Optional, TYPE_CHECKING
22
23from prompt_toolkit.buffer import Buffer
24from prompt_toolkit.layout.controls import BufferControl
25from prompt_toolkit.completion import merge_completers
26from prompt_toolkit.filters import (
27    Condition,
28    has_focus,
29    to_filter,
30)
31from ptpython.completer import (  # type: ignore
32    CompletePrivateAttributes, PythonCompleter,
33)
34import ptpython.repl  # type: ignore
35from ptpython.layout import (  # type: ignore
36    CompletionVisualisation, Dimension,
37)
38
39import pw_console.text_formatting
40
41if TYPE_CHECKING:
42    from pw_console.repl_pane import ReplPane
43
44_LOG = logging.getLogger(__package__)
45
46
class MissingPtpythonBufferControl(Exception):
    """Raised when ptpython's input BufferControl cannot be located.

    Signals that the expected BufferControl object was not found inside the
    ptpython layout.
    """
49
50
class PwPtPythonRepl(ptpython.repl.PythonRepl):  # pylint: disable=too-many-instance-attributes
    """A ptpython repl class with changes to code execution and output related
    methods.

    User code entered in the input buffer is scheduled on the parent
    ReplPane's application ``user_code_loop`` with stdout and stderr
    captured into StringIO objects. Results and exceptions are stored as
    plain text and handed to the ReplPane once the code has finished.
    """
    def __init__(
        self,
        *args,
        # pw_console specific kwargs
        extra_completers: Optional[Iterable] = None,
        **ptpython_kwargs,
    ):
        """Create the repl without starting a prompt_toolkit application.

        Args:
          *args: Positional arguments forwarded to ptpython.repl.PythonRepl.
          extra_completers: Optional additional completers. When provided
            they are merged after the default ptpython PythonCompleter.
          **ptpython_kwargs: Keyword arguments forwarded to
            ptpython.repl.PythonRepl.

        Raises:
          MissingPtpythonBufferControl: If the ptpython input BufferControl
            can not be found in the ptpython layout.
        """
        # None lets ptpython fall back to its own default completer.
        completer = None
        if extra_completers:
            # Create the default python completer used by
            # ptpython.repl.PythonRepl
            python_completer = PythonCompleter(
                # No self.get_globals yet so this must be a lambda
                # pylint: disable=unnecessary-lambda
                lambda: self.get_globals(),
                lambda: self.get_locals(),
                lambda: self.enable_dictionary_completion,  # type: ignore
            )

            # Merge default Python completer with the new custom one.
            completer = merge_completers(
                [python_completer, *extra_completers])

        super().__init__(
            *args,
            create_app=False,
            # Absolute minimum height of 1
            _input_buffer_height=Dimension(min=1),
            _completer=completer,
            **ptpython_kwargs,
        )

        self.enable_mouse_support: bool = True
        self.enable_history_search: bool = True
        self.enable_dictionary_completion: bool = True
        self._set_pt_python_input_buffer_control_focusable()

        # Change some ptpython.repl defaults.
        self.show_status_bar = False
        self.show_exit_confirmation = False
        self.complete_private_attributes = (
            CompletePrivateAttributes.IF_NO_PUBLIC)

        # Function signature that shows args, kwargs, and types under the cursor
        # of the input window.
        self.show_signature: bool = True
        # Docstring of the current completed function that appears at the bottom
        # of the input window.
        self.show_docstring: bool = False

        # Turn off the completion menu in ptpython. The CompletionsMenu in
        # ConsoleApp.root_container will handle this.
        self.completion_visualisation: CompletionVisualisation = (
            CompletionVisualisation.NONE)

        # Additional state variables.
        self.repl_pane: 'Optional[ReplPane]' = None
        # Unformatted text of the most recent result / exception. Both are
        # reset by clear_last_result().
        self._last_result = None
        self._last_exception = None

    def _set_pt_python_input_buffer_control_focusable(self) -> None:
        """Enable focus_on_click for ptpython's input buffer.

        Raises:
          MissingPtpythonBufferControl: If the object found at the expected
            position in the ptpython layout is missing or is not a
            BufferControl instance.
        """
        error_message = (
            'Unable to find ptpythons BufferControl input object.\n'
            '  For the last known position see:\n'
            '  https://github.com/prompt-toolkit/ptpython/'
            'blob/6072174eace5b645b0cfd5b21b4c237e2539f577/'
            'ptpython/layout.py#L598\n'
            '\n'
            'The installed version of ptpython may not be compatible with'
            ' pw console; please try re-running environment setup.')

        try:
            # Fetch the Window's BufferControl object.
            # From ptpython/layout.py:
            #   self.root_container = HSplit([
            #     VSplit([
            #       HSplit([
            #         FloatContainer(
            #           content=HSplit(
            #             [create_python_input_window()] + extra_body
            #           ), ...
            ptpython_buffer_control = (
                self.ptpython_layout.root_container.children[0].children[0].
                children[0].content.children[0].content)
        except (IndexError, AttributeError) as error:
            # A layout with a different shape can fail with either a missing
            # list entry or a missing .children/.content attribute; report
            # both the same way and keep the original error as the cause.
            raise MissingPtpythonBufferControl(error_message) from error

        # This should be a BufferControl instance
        if not isinstance(ptpython_buffer_control, BufferControl):
            raise MissingPtpythonBufferControl(error_message)

        # Enable focus options
        ptpython_buffer_control.focusable = to_filter(True)
        ptpython_buffer_control.focus_on_click = to_filter(True)

    def __pt_container__(self):
        """Return the prompt_toolkit root container for class.

        This allows self to be used wherever prompt_toolkit expects a container
        object."""
        return self.ptpython_layout.root_container

    def set_repl_pane(self, repl_pane):
        """Update the parent pw_console.ReplPane reference."""
        self.repl_pane = repl_pane

    def _save_result(self, formatted_text):
        """Save the last repl execution result with formatting removed."""
        self._last_result = pw_console.text_formatting.remove_formatting(
            formatted_text)

    def _save_exception(self, formatted_text):
        """Save the last repl exception with formatting removed."""
        self._last_exception = pw_console.text_formatting.remove_formatting(
            formatted_text)

    def clear_last_result(self):
        """Erase the last repl execution result and exception."""
        self._last_result = None
        self._last_exception = None

    def show_result(self, result):
        """Format and save output results.

        This function is called from the _run_user_code() function which is
        always run from the user code thread, within
        .run_and_show_expression_async().
        """
        formatted_result = self._format_result_output(result)
        self._save_result(formatted_result)

    def _handle_exception(self, e: BaseException) -> None:
        """Format and save the exception raised by user code.

        This function is called from the _run_user_code() function which is
        always run from the user code thread, within
        .run_and_show_expression_async().
        """
        formatted_result = self._format_exception_output(e)
        self._save_exception(formatted_result.__pt_formatted_text__())

    def user_code_complete_callback(self, input_text, future):
        """Callback to run after user repl code is finished.

        Args:
          input_text: The repl input text that was executed.
          future: The future created in _accept_handler() for the
            _run_user_code() coroutine. Its result, when truthy, is the dict
            returned by _run_user_code().
        """
        # A result is saved in self._last_result by show_result(); an
        # exception is saved in self._last_exception by _handle_exception().
        result_text = self._last_result
        result_object = None
        exception_text = self._last_exception

        # _last_result and _last_exception consumed, erase for the next run.
        self.clear_last_result()

        stdout_contents = None
        stderr_contents = None
        future_result = future.result()
        if future_result:
            stdout_contents = future_result['stdout']
            stderr_contents = future_result['stderr']
            result_object = future_result['result']

            if result_object is not None:
                # Use ptpython formatted results:
                formatted_result = self._format_result_output(result_object)
                result_text = pw_console.text_formatting.remove_formatting(
                    formatted_result)

        # Job is finished, append the last result.
        self.repl_pane.append_result_to_executed_code(
            input_text,
            future,
            result_text,
            stdout_contents,
            stderr_contents,
            exception_text=exception_text,
            result_object=result_object,
        )

        # Rebuild output buffer.
        self.repl_pane.update_output_buffer(
            'pw_ptpython_repl.user_code_complete_callback')

        # Trigger a prompt_toolkit application redraw.
        self.repl_pane.application.application.invalidate()

    async def _run_user_code(self, text, stdout_proxy, stdin_proxy):
        """Run user code and capture stdout+err.

        This function should be run in a separate thread from the main
        prompt_toolkit application.

        Args:
          text: The repl input text to run.
          stdout_proxy: File-like object that replaces sys.stdout while the
            code runs; must provide getvalue().
          stdin_proxy: Despite its name this object replaces sys.stderr while
            the code runs; must provide getvalue(). The parameter name is
            kept as-is for compatibility.

        Returns:
          A dict with the captured 'stdout' and 'stderr' text and the
          'result' returned by run_and_show_expression_async().
        """
        # NOTE: This function runs in a separate thread using the asyncio event
        # loop defined by self.repl_pane.application.user_code_loop. Patching
        # stdout here will not affect the stdout used by prompt_toolkit and the
        # main user interface.

        # Patch stdout and stderr to capture repl print() statements.
        original_stdout = sys.stdout
        original_stderr = sys.stderr

        sys.stdout = stdout_proxy
        sys.stderr = stdin_proxy

        # Run user repl code
        try:
            result = await self.run_and_show_expression_async(text)
        finally:
            # Always restore original stdout and stderr
            sys.stdout = original_stdout
            sys.stderr = original_stderr

        # Save the captured output
        return {
            'stdout': stdout_proxy.getvalue(),
            'stderr': stdin_proxy.getvalue(),
            'result': result,
        }

    def _accept_handler(self, buff: Buffer) -> bool:
        """Function executed when pressing enter in the ptpython.repl.PythonRepl
        input buffer.

        Args:
          buff: The input Buffer holding the text to execute.

        Returns:
          Always False, which does not keep the input text in the buffer.
        """
        # Do nothing if no text is entered.
        if not buff.text:
            return False
        if self.repl_pane is None:
            return False

        repl_input_text = buff.text
        stripped_input = repl_input_text.strip()

        # Exit if quit or exit
        if stripped_input in ('quit', 'quit()', 'exit', 'exit()'):
            self.repl_pane.application.application.exit()  # type: ignore
            # The application is shutting down; do not also schedule the
            # quit/exit text as user code.
            return False

        # Create stdout and stderr proxies
        temp_stdout = io.StringIO()
        temp_stderr = io.StringIO()

        # The help() command with no args uses its own interactive prompt which
        # will not work if prompt_toolkit is running.
        if stripped_input == 'help()':
            # Run nothing
            repl_input_text = ''
            # Override stdout
            temp_stdout.write(
                'Error: Interactive help() is not compatible with this repl.')

        # Execute the repl code in the separate user_code thread loop.
        future = asyncio.run_coroutine_threadsafe(
            # This function will be executed in a separate thread.
            self._run_user_code(repl_input_text, temp_stdout, temp_stderr),
            # Using this asyncio event loop.
            self.repl_pane.application.user_code_loop)  # type: ignore

        # Save the input text and future object.
        self.repl_pane.append_executed_code(repl_input_text, future,
                                            temp_stdout,
                                            temp_stderr)  # type: ignore

        # Run user_code_complete_callback() when done.
        done_callback = functools.partial(self.user_code_complete_callback,
                                          repl_input_text)
        future.add_done_callback(done_callback)

        # Rebuild the parent ReplPane output buffer.
        self.repl_pane.update_output_buffer('pw_ptpython_repl._accept_handler')

        # TODO(tonymd): Return True if exception is found?
        # Don't keep input for now. Return True to keep input text.
        return False

    def line_break_count(self) -> int:
        """Return the number of newline characters in the input buffer text."""
        return self.default_buffer.text.count('\n')

    def input_empty_if_in_focus_condition(self) -> Condition:
        """Return a Condition on the focus and contents of the input buffer.

        The condition is True when this repl does not have focus, or when it
        has focus and the input buffer text is empty.
        """
        @Condition
        def test() -> bool:
            # Evaluate the focus filter once; the buffer text is only
            # inspected when this repl is focused.
            if not has_focus(self)():
                return True
            return len(self.default_buffer.text) == 0

        return test
337