• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""PwPtPythonPane class."""
15
16from __future__ import annotations
17
18import asyncio
19import functools
20import io
21import logging
22import os
23import sys
24import shlex
25import subprocess
26from typing import Any, Iterable, TYPE_CHECKING
27from unittest.mock import patch
28
29# inclusive-language: disable
30from prompt_toolkit.input import DummyInput as IgnoredInput
31
32# inclusive-language: enable
33
34from prompt_toolkit.output.plain_text import PlainTextOutput
35from prompt_toolkit.buffer import Buffer
36from prompt_toolkit.layout.controls import BufferControl
37from prompt_toolkit.completion import merge_completers
38from prompt_toolkit.filters import (
39    Condition,
40    has_focus,
41    to_filter,
42)
43from prompt_toolkit.formatted_text import StyleAndTextTuples
44
45from ptpython.completer import (  # type: ignore
46    CompletePrivateAttributes,
47    PythonCompleter,
48)
49from ptpython.printer import OutputPrinter
50import ptpython.repl  # type: ignore
51from ptpython.layout import (  # type: ignore
52    CompletionVisualisation,
53    Dimension,
54)
55import pygments.plugin
56
57from pw_console.pigweed_code_style import (
58    PigweedCodeStyle,
59    PigweedCodeLightStyle,
60)
61from pw_console.text_formatting import remove_formatting
62
63if TYPE_CHECKING:
64    from pw_console.repl_pane import ReplPane
65
66_LOG = logging.getLogger(__package__)
67_SYSTEM_COMMAND_LOG = logging.getLogger('pw_console_system_command')
68
69_original_find_plugin_styles = pygments.plugin.find_plugin_styles
70
71
def _wrapped_find_plugin_styles():
    """Yield the Pigweed code styles ahead of pygments' own plugin styles.

    Used as a replacement for pygments' find_plugin_styles so the Pigweed
    themes are usable without registering Python entrypoints.

    Yields:
        (name, style class) tuples: the two Pigweed styles first, followed by
        everything the original find_plugin_styles produces.
    """
    yield 'pigweed-code', PigweedCodeStyle
    yield 'pigweed-code-light', PigweedCodeLightStyle
    yield from _original_find_plugin_styles()
83
84
class MissingPtpythonBufferControl(Exception):
    """Raised when ptpython's input BufferControl cannot be located."""
87
88
89def _user_input_is_a_shell_command(text: str) -> bool:
90    return text.startswith('!')
91
92
93class PwPtPythonRepl(
94    ptpython.repl.PythonRepl
95):  # pylint: disable=too-many-instance-attributes
96    """A ptpython repl class with changes to code execution and output related
97    methods."""
98
99    @patch(
100        'pygments.styles.find_plugin_styles', new=_wrapped_find_plugin_styles
101    )
102    def __init__(
103        self,
104        *args,
105        # pw_console specific kwargs
106        extra_completers: Iterable | None = None,
107        **ptpython_kwargs,
108    ):
109        completer = None
110        if extra_completers:
111            # Create the default python completer used by
112            # ptpython.repl.PythonRepl
113            python_completer = PythonCompleter(
114                # No self.get_globals yet so this must be a lambda
115                # pylint: disable=unnecessary-lambda
116                lambda: self.get_globals(),
117                lambda: self.get_locals(),
118                lambda: self.enable_dictionary_completion,  # type: ignore
119            )
120
121            all_completers = [python_completer]
122            all_completers.extend(extra_completers)
123            # Merge default Python completer with the new custom one.
124            completer = merge_completers(all_completers)
125
126        super().__init__(
127            *args,
128            create_app=False,
129            # Absolute minimum height of 1
130            _input_buffer_height=Dimension(min=1),
131            _completer=completer,
132            **ptpython_kwargs,
133        )
134
135        self.enable_mouse_support: bool = True
136        self.enable_history_search: bool = True
137        self.enable_dictionary_completion: bool = True
138        self._set_pt_python_input_buffer_control_focusable()
139
140        # Change some ptpython.repl defaults.
141        self.show_status_bar = False
142        self.show_exit_confirmation = False
143        self.complete_private_attributes = CompletePrivateAttributes.NEVER
144
145        # Function signature that shows args, kwargs, and types under the cursor
146        # of the input window.
147        self.show_signature: bool = True
148        # Docstring of the current completed function that appears at the bottom
149        # of the input window.
150        self.show_docstring: bool = False
151
152        # Turn off the completion menu in ptpython. The CompletionsMenu in
153        # ConsoleApp.root_container will handle this.
154        self.completion_visualisation: CompletionVisualisation = (
155            CompletionVisualisation.NONE
156        )
157
158        # Additional state variables.
159        self.repl_pane: ReplPane | None = None
160        self._last_result = None
161        self._last_exception = None
162
163    def _set_pt_python_input_buffer_control_focusable(self) -> None:
164        """Enable focus_on_click for ptpython's input buffer."""
165        error_message = (
166            'Unable to find ptpythons BufferControl input object.\n'
167            '  For the last known position see:\n'
168            '  https://github.com/prompt-toolkit/ptpython/'
169            'blob/6072174eace5b645b0cfd5b21b4c237e2539f577/'
170            'ptpython/layout.py#L598\n'
171            '\n'
172            'The installed version of ptpython may not be compatible with'
173            ' pw console; please try re-running environment setup.'
174        )
175
176        try:
177            # Fetch the Window's BufferControl object.
178            # From ptpython/layout.py:
179            #   self.root_container = HSplit([
180            #     VSplit([
181            #       HSplit([
182            #         FloatContainer(
183            #           content=HSplit(
184            #             [create_python_input_window()] + extra_body
185            #           ), ...
186            ptpython_buffer_control = (
187                self.ptpython_layout.root_container.children[0]  # type: ignore
188                .children[0]
189                .children[0]
190                .content.children[0]
191                .content
192            )
193            # This should be a BufferControl instance
194            if not isinstance(ptpython_buffer_control, BufferControl):
195                raise MissingPtpythonBufferControl(error_message)
196            # Enable focus options
197            ptpython_buffer_control.focusable = to_filter(True)
198            ptpython_buffer_control.focus_on_click = to_filter(True)
199        except IndexError as _error:
200            raise MissingPtpythonBufferControl(error_message)
201
202    def __pt_container__(self):
203        """Return the prompt_toolkit root container for class.
204
205        This allows self to be used wherever prompt_toolkit expects a container
206        object."""
207        return self.ptpython_layout.root_container
208
209    def set_repl_pane(self, repl_pane):
210        """Update the parent pw_console.ReplPane reference."""
211        self.repl_pane = repl_pane
212
213    def _save_result(self, formatted_text):
214        """Save the last repl execution result."""
215        unformatted_result = formatted_text
216        self._last_result = unformatted_result
217
218    def _save_exception(self, formatted_text):
219        """Save the last repl exception."""
220        unformatted_result = remove_formatting(formatted_text)
221        self._last_exception = unformatted_result
222
223    def clear_last_result(self):
224        """Erase the last repl execution result."""
225        self._last_result = None
226        self._last_exception = None
227
228    def _format_result_output(self, result: Any) -> str | None:
229        """Return a plaintext repr of any object."""
230        try:
231            formatted_result = repr(result)
232        except BaseException as e:  # pylint: disable=broad-exception-caught
233            self._handle_exception(e)
234            formatted_result = None
235        return formatted_result
236
237    def _show_result(self, result: Any):
238        """Format and save output results.
239
240        This function is called from the _run_user_code() function which is
241        always run from the user code thread, within
242        .run_and_show_expression_async().
243        """
244        self._save_result(self._format_result_output(result))
245
246    def _get_output_printer(self) -> OutputPrinter:
247        return OutputPrinter(
248            output=PlainTextOutput(io.StringIO()),
249            input=IgnoredInput(),
250            style=self._current_style,
251            style_transformation=self.style_transformation,
252            title=self.title,
253        )
254
255    def _format_exception_output(self, e: BaseException) -> StyleAndTextTuples:
256        output_printer = self._get_output_printer()
257        formatted_result = output_printer._format_exception_output(  # pylint: disable=protected-access
258            e, highlight=False
259        )
260        return list(formatted_result)
261
262    def _handle_exception(self, e: BaseException) -> None:
263        """Format and save output results.
264
265        This function is called from the _run_user_code() function which is
266        always run from the user code thread, within
267        .run_and_show_expression_async().
268        """
269        self._save_exception(self._format_exception_output(e))
270
271    async def run_and_show_expression_async(self, text: str) -> Any:
272        """Run user code and handle the result.
273
274        This function is similar to ptpython version v3.0.23.
275        """
276        loop = asyncio.get_event_loop()
277
278        try:
279            result = await self.eval_async(text)
280        except KeyboardInterrupt:
281            raise
282        except SystemExit:
283            return
284        except BaseException as e:  # pylint: disable=broad-exception-caught
285            self._handle_exception(e)
286        else:
287            # Print.
288            if result is not None:
289                await loop.run_in_executor(
290                    None, lambda: self._show_result(result)
291                )
292
293            self.current_statement_index += 1
294            self.signatures = []
295            return result
296
297    def user_code_complete_callback(self, input_text, future):
298        """Callback to run after user repl code is finished."""
299        # If there was an exception it will be saved in self._last_result
300        result_text = self._last_result
301        result_object = None
302        exception_text = self._last_exception
303
304        # _last_results consumed, erase for the next run.
305        self.clear_last_result()
306
307        stdout_contents = None
308        stderr_contents = None
309        if future.result():
310            future_result = future.result()
311            stdout_contents = future_result['stdout']
312            stderr_contents = future_result['stderr']
313            result_object = future_result['result']
314
315            if result_object is not None:
316                # Use ptpython formatted results:
317                result_text = self._format_result_output(result_object)
318
319        # Job is finished, append the last result.
320        self.repl_pane.append_result_to_executed_code(
321            input_text,
322            future,
323            result_text,
324            stdout_contents,
325            stderr_contents,
326            exception_text=exception_text,
327            result_object=result_object,
328        )
329
330        # Rebuild output buffer.
331        self.repl_pane.update_output_buffer(
332            'pw_ptpython_repl.user_code_complete_callback'
333        )
334
335        # Trigger a prompt_toolkit application redraw.
336        self.repl_pane.application.application.invalidate()
337
338    async def _run_system_command(  # pylint: disable=no-self-use
339        self, text, stdout_proxy, _stdin_proxy
340    ) -> int:
341        """Run a shell command and print results to the repl."""
342        command = shlex.split(text)
343        returncode = None
344        env = os.environ.copy()
345        # Force colors in Pigweed subcommands and some terminal apps.
346        env['PW_USE_COLOR'] = '1'
347        env['CLICOLOR_FORCE'] = '1'
348
349        def _handle_output(output):
350            # Force tab characters to 8 spaces to prevent \t from showing in
351            # prompt_toolkit.
352            output = output.replace('\t', '        ')
353            # Strip some ANSI sequences that don't render.
354            output = output.replace('\x1b(B\x1b[m', '')
355            output = output.replace('\x1b[1m', '')
356            stdout_proxy.write(output)
357            _SYSTEM_COMMAND_LOG.info(output.rstrip())
358
359        with subprocess.Popen(
360            command,
361            env=env,
362            stdout=subprocess.PIPE,
363            stderr=subprocess.STDOUT,
364            errors='replace',
365        ) as proc:
366            # Print the command
367            _SYSTEM_COMMAND_LOG.info('')
368            _SYSTEM_COMMAND_LOG.info('$ %s', text)
369            while returncode is None:
370                if not proc.stdout:
371                    continue
372
373                # Check for one line and update.
374                output = proc.stdout.readline()
375                _handle_output(output)
376
377                returncode = proc.poll()
378
379            # Print any remaining lines.
380            for output in proc.stdout.readlines():
381                _handle_output(output)
382
383        return returncode
384
385    async def _run_user_code(self, text, stdout_proxy, stdin_proxy):
386        """Run user code and capture stdout+err.
387
388        This fuction should be run in a separate thread from the main
389        prompt_toolkit application."""
390        # NOTE: This function runs in a separate thread using the asyncio event
391        # loop defined by self.repl_pane.application.user_code_loop. Patching
392        # stdout here will not effect the stdout used by prompt_toolkit and the
393        # main user interface.
394
395        # Patch stdout and stderr to capture repl print() statements.
396        original_stdout = sys.stdout
397        original_stderr = sys.stderr
398
399        sys.stdout = stdout_proxy
400        sys.stderr = stdin_proxy
401
402        # Run user repl code
403        try:
404            if _user_input_is_a_shell_command(text):
405                result = await self._run_system_command(
406                    text[1:], stdout_proxy, stdin_proxy
407                )
408            else:
409                result = await self.run_and_show_expression_async(text)
410        finally:
411            # Always restore original stdout and stderr
412            sys.stdout = original_stdout
413            sys.stderr = original_stderr
414
415        # Save the captured output
416        stdout_contents = stdout_proxy.getvalue()
417        stderr_contents = stdin_proxy.getvalue()
418
419        return {
420            'stdout': stdout_contents,
421            'stderr': stderr_contents,
422            'result': result,
423        }
424
425    def _accept_handler(self, buff: Buffer) -> bool:
426        """Function executed when pressing enter in the ptpython.repl.PythonRepl
427        input buffer."""
428        # Do nothing if no text is entered.
429        if len(buff.text) == 0:
430            return False
431        if self.repl_pane is None:
432            return False
433
434        repl_input_text = buff.text
435        # Exit if quit or exit
436        if repl_input_text.strip() in ['quit', 'quit()', 'exit', 'exit()']:
437            self.repl_pane.application.application.exit()  # type: ignore
438
439        # Create stdout and stderr proxies
440        temp_stdout = io.StringIO()
441        temp_stderr = io.StringIO()
442
443        # The help() command with no args uses it's own interactive prompt which
444        # will not work if prompt_toolkit is running.
445        if repl_input_text.strip() in ['help()']:
446            # Run nothing
447            repl_input_text = ''
448            # Override stdout
449            temp_stdout.write(
450                'Error: Interactive help() is not compatible with this repl.'
451            )
452
453        # Pop open the system command log pane for shell commands.
454        if _user_input_is_a_shell_command(repl_input_text):
455            self.repl_pane.application.setup_command_runner_log_pane()
456
457        # Execute the repl code in the the separate user_code thread loop.
458        future = asyncio.run_coroutine_threadsafe(
459            # This function will be executed in a separate thread.
460            self._run_user_code(repl_input_text, temp_stdout, temp_stderr),
461            # Using this asyncio event loop.
462            self.repl_pane.application.user_code_loop,
463        )  # type: ignore
464
465        # Save the input text and future object.
466        self.repl_pane.append_executed_code(
467            repl_input_text, future, temp_stdout, temp_stderr
468        )  # type: ignore
469
470        # Run user_code_complete_callback() when done.
471        done_callback = functools.partial(
472            self.user_code_complete_callback, repl_input_text
473        )
474        future.add_done_callback(done_callback)
475
476        # Rebuild the parent ReplPane output buffer.
477        self.repl_pane.update_output_buffer('pw_ptpython_repl._accept_handler')
478
479        # TODO(tonymd): Return True if exception is found?
480        # Don't keep input for now. Return True to keep input text.
481        return False
482
483    def line_break_count(self) -> int:
484        return self.default_buffer.text.count('\n')
485
486    def input_empty_if_in_focus_condition(self) -> Condition:
487        @Condition
488        def test() -> bool:
489            if has_focus(self)() and len(self.default_buffer.text) == 0:
490                return True
491            return not has_focus(self)()
492
493        return test
494