• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""PwPtPythonPane class."""
15
16import asyncio
17import functools
18import io
19import logging
20import os
21import sys
22import shlex
23import subprocess
24from typing import Iterable, Optional, TYPE_CHECKING
25from unittest.mock import patch
26
27from prompt_toolkit.buffer import Buffer
28from prompt_toolkit.layout.controls import BufferControl
29from prompt_toolkit.completion import merge_completers
30from prompt_toolkit.filters import (
31    Condition,
32    has_focus,
33    to_filter,
34)
35from ptpython.completer import (  # type: ignore
36    CompletePrivateAttributes,
37    PythonCompleter,
38)
39import ptpython.repl  # type: ignore
40from ptpython.layout import (  # type: ignore
41    CompletionVisualisation,
42    Dimension,
43)
44import pygments.plugin
45
46from pw_console.pigweed_code_style import (
47    PigweedCodeStyle,
48    PigweedCodeLightStyle,
49)
50from pw_console.text_formatting import remove_formatting
51
52if TYPE_CHECKING:
53    from pw_console.repl_pane import ReplPane
54
55_LOG = logging.getLogger(__package__)
56_SYSTEM_COMMAND_LOG = logging.getLogger('pw_console_system_command')
57
58_original_find_plugin_styles = pygments.plugin.find_plugin_styles
59
60
61def _wrapped_find_plugin_styles():
62    """Patch pygment find_plugin_styles to also include Pigweed codes styles
63
64    This allows using these themes without requiring Python entrypoints.
65    """
66    for style in [
67        ('pigweed-code', PigweedCodeStyle),
68        ('pigweed-code-light', PigweedCodeLightStyle),
69    ]:
70        yield style
71    yield from _original_find_plugin_styles()
72
73
74class MissingPtpythonBufferControl(Exception):
75    """Exception for a missing ptpython BufferControl object."""
76
77
78def _user_input_is_a_shell_command(text: str) -> bool:
79    return text.startswith('!')
80
81
82class PwPtPythonRepl(
83    ptpython.repl.PythonRepl
84):  # pylint: disable=too-many-instance-attributes
85    """A ptpython repl class with changes to code execution and output related
86    methods."""
87
88    @patch(
89        'pygments.styles.find_plugin_styles', new=_wrapped_find_plugin_styles
90    )
91    def __init__(
92        self,
93        *args,
94        # pw_console specific kwargs
95        extra_completers: Optional[Iterable] = None,
96        **ptpython_kwargs,
97    ):
98        completer = None
99        if extra_completers:
100            # Create the default python completer used by
101            # ptpython.repl.PythonRepl
102            python_completer = PythonCompleter(
103                # No self.get_globals yet so this must be a lambda
104                # pylint: disable=unnecessary-lambda
105                lambda: self.get_globals(),
106                lambda: self.get_locals(),
107                lambda: self.enable_dictionary_completion,  # type: ignore
108            )
109
110            all_completers = [python_completer]
111            all_completers.extend(extra_completers)
112            # Merge default Python completer with the new custom one.
113            completer = merge_completers(all_completers)
114
115        super().__init__(
116            *args,
117            create_app=False,
118            # Absolute minimum height of 1
119            _input_buffer_height=Dimension(min=1),
120            _completer=completer,
121            **ptpython_kwargs,
122        )
123
124        self.enable_mouse_support: bool = True
125        self.enable_history_search: bool = True
126        self.enable_dictionary_completion: bool = True
127        self._set_pt_python_input_buffer_control_focusable()
128
129        # Change some ptpython.repl defaults.
130        self.show_status_bar = False
131        self.show_exit_confirmation = False
132        self.complete_private_attributes = (
133            CompletePrivateAttributes.IF_NO_PUBLIC
134        )
135
136        # Function signature that shows args, kwargs, and types under the cursor
137        # of the input window.
138        self.show_signature: bool = True
139        # Docstring of the current completed function that appears at the bottom
140        # of the input window.
141        self.show_docstring: bool = False
142
143        # Turn off the completion menu in ptpython. The CompletionsMenu in
144        # ConsoleApp.root_container will handle this.
145        self.completion_visualisation: CompletionVisualisation = (
146            CompletionVisualisation.NONE
147        )
148
149        # Additional state variables.
150        self.repl_pane: 'Optional[ReplPane]' = None
151        self._last_result = None
152        self._last_exception = None
153
154    def _set_pt_python_input_buffer_control_focusable(self) -> None:
155        """Enable focus_on_click for ptpython's input buffer."""
156        error_message = (
157            'Unable to find ptpythons BufferControl input object.\n'
158            '  For the last known position see:\n'
159            '  https://github.com/prompt-toolkit/ptpython/'
160            'blob/6072174eace5b645b0cfd5b21b4c237e2539f577/'
161            'ptpython/layout.py#L598\n'
162            '\n'
163            'The installed version of ptpython may not be compatible with'
164            ' pw console; please try re-running environment setup.'
165        )
166
167        try:
168            # Fetch the Window's BufferControl object.
169            # From ptpython/layout.py:
170            #   self.root_container = HSplit([
171            #     VSplit([
172            #       HSplit([
173            #         FloatContainer(
174            #           content=HSplit(
175            #             [create_python_input_window()] + extra_body
176            #           ), ...
177            ptpython_buffer_control = (
178                self.ptpython_layout.root_container.children[0]  # type: ignore
179                .children[0]
180                .children[0]
181                .content.children[0]
182                .content
183            )
184            # This should be a BufferControl instance
185            if not isinstance(ptpython_buffer_control, BufferControl):
186                raise MissingPtpythonBufferControl(error_message)
187            # Enable focus options
188            ptpython_buffer_control.focusable = to_filter(True)
189            ptpython_buffer_control.focus_on_click = to_filter(True)
190        except IndexError as _error:
191            raise MissingPtpythonBufferControl(error_message)
192
193    def __pt_container__(self):
194        """Return the prompt_toolkit root container for class.
195
196        This allows self to be used wherever prompt_toolkit expects a container
197        object."""
198        return self.ptpython_layout.root_container
199
200    def set_repl_pane(self, repl_pane):
201        """Update the parent pw_console.ReplPane reference."""
202        self.repl_pane = repl_pane
203
204    def _save_result(self, formatted_text):
205        """Save the last repl execution result."""
206        unformatted_result = remove_formatting(formatted_text)
207        self._last_result = unformatted_result
208
209    def _save_exception(self, formatted_text):
210        """Save the last repl exception."""
211        unformatted_result = remove_formatting(formatted_text)
212        self._last_exception = unformatted_result
213
214    def clear_last_result(self):
215        """Erase the last repl execution result."""
216        self._last_result = None
217        self._last_exception = None
218
219    def show_result(self, result):
220        """Format and save output results.
221
222        This function is called from the _run_user_code() function which is
223        always run from the user code thread, within
224        .run_and_show_expression_async().
225        """
226        formatted_result = self._format_result_output(result)
227        self._save_result(formatted_result)
228
229    def _handle_exception(self, e: BaseException) -> None:
230        """Format and save output results.
231
232        This function is called from the _run_user_code() function which is
233        always run from the user code thread, within
234        .run_and_show_expression_async().
235        """
236        formatted_result = self._format_exception_output(e)
237        self._save_exception(formatted_result.__pt_formatted_text__())
238
239    def user_code_complete_callback(self, input_text, future):
240        """Callback to run after user repl code is finished."""
241        # If there was an exception it will be saved in self._last_result
242        result_text = self._last_result
243        result_object = None
244        exception_text = self._last_exception
245
246        # _last_results consumed, erase for the next run.
247        self.clear_last_result()
248
249        stdout_contents = None
250        stderr_contents = None
251        if future.result():
252            future_result = future.result()
253            stdout_contents = future_result['stdout']
254            stderr_contents = future_result['stderr']
255            result_object = future_result['result']
256
257            if result_object is not None:
258                # Use ptpython formatted results:
259                formatted_result = self._format_result_output(result_object)
260                result_text = remove_formatting(formatted_result)
261
262        # Job is finished, append the last result.
263        self.repl_pane.append_result_to_executed_code(
264            input_text,
265            future,
266            result_text,
267            stdout_contents,
268            stderr_contents,
269            exception_text=exception_text,
270            result_object=result_object,
271        )
272
273        # Rebuild output buffer.
274        self.repl_pane.update_output_buffer(
275            'pw_ptpython_repl.user_code_complete_callback'
276        )
277
278        # Trigger a prompt_toolkit application redraw.
279        self.repl_pane.application.application.invalidate()
280
281    async def _run_system_command(  # pylint: disable=no-self-use
282        self, text, stdout_proxy, _stdin_proxy
283    ) -> int:
284        """Run a shell command and print results to the repl."""
285        command = shlex.split(text)
286        returncode = None
287        env = os.environ.copy()
288        # Force colors in Pigweed subcommands and some terminal apps.
289        env['PW_USE_COLOR'] = '1'
290        env['CLICOLOR_FORCE'] = '1'
291
292        def _handle_output(output):
293            # Force tab characters to 8 spaces to prevent \t from showing in
294            # prompt_toolkit.
295            output = output.replace('\t', '        ')
296            # Strip some ANSI sequences that don't render.
297            output = output.replace('\x1b(B\x1b[m', '')
298            output = output.replace('\x1b[1m', '')
299            stdout_proxy.write(output)
300            _SYSTEM_COMMAND_LOG.info(output.rstrip())
301
302        with subprocess.Popen(
303            command,
304            env=env,
305            stdout=subprocess.PIPE,
306            stderr=subprocess.STDOUT,
307            errors='replace',
308        ) as proc:
309            # Print the command
310            _SYSTEM_COMMAND_LOG.info('')
311            _SYSTEM_COMMAND_LOG.info('$ %s', text)
312            while returncode is None:
313                if not proc.stdout:
314                    continue
315
316                # Check for one line and update.
317                output = proc.stdout.readline()
318                _handle_output(output)
319
320                returncode = proc.poll()
321
322            # Print any remaining lines.
323            for output in proc.stdout.readlines():
324                _handle_output(output)
325
326        return returncode
327
328    async def _run_user_code(self, text, stdout_proxy, stdin_proxy):
329        """Run user code and capture stdout+err.
330
331        This fuction should be run in a separate thread from the main
332        prompt_toolkit application."""
333        # NOTE: This function runs in a separate thread using the asyncio event
334        # loop defined by self.repl_pane.application.user_code_loop. Patching
335        # stdout here will not effect the stdout used by prompt_toolkit and the
336        # main user interface.
337
338        # Patch stdout and stderr to capture repl print() statements.
339        original_stdout = sys.stdout
340        original_stderr = sys.stderr
341
342        sys.stdout = stdout_proxy
343        sys.stderr = stdin_proxy
344
345        # Run user repl code
346        try:
347            if _user_input_is_a_shell_command(text):
348                result = await self._run_system_command(
349                    text[1:], stdout_proxy, stdin_proxy
350                )
351            else:
352                result = await self.run_and_show_expression_async(text)
353        finally:
354            # Always restore original stdout and stderr
355            sys.stdout = original_stdout
356            sys.stderr = original_stderr
357
358        # Save the captured output
359        stdout_contents = stdout_proxy.getvalue()
360        stderr_contents = stdin_proxy.getvalue()
361
362        return {
363            'stdout': stdout_contents,
364            'stderr': stderr_contents,
365            'result': result,
366        }
367
368    def _accept_handler(self, buff: Buffer) -> bool:
369        """Function executed when pressing enter in the ptpython.repl.PythonRepl
370        input buffer."""
371        # Do nothing if no text is entered.
372        if len(buff.text) == 0:
373            return False
374        if self.repl_pane is None:
375            return False
376
377        repl_input_text = buff.text
378        # Exit if quit or exit
379        if repl_input_text.strip() in ['quit', 'quit()', 'exit', 'exit()']:
380            self.repl_pane.application.application.exit()  # type: ignore
381
382        # Create stdout and stderr proxies
383        temp_stdout = io.StringIO()
384        temp_stderr = io.StringIO()
385
386        # The help() command with no args uses it's own interactive prompt which
387        # will not work if prompt_toolkit is running.
388        if repl_input_text.strip() in ['help()']:
389            # Run nothing
390            repl_input_text = ''
391            # Override stdout
392            temp_stdout.write(
393                'Error: Interactive help() is not compatible with this repl.'
394            )
395
396        # Pop open the system command log pane for shell commands.
397        if _user_input_is_a_shell_command(repl_input_text):
398            self.repl_pane.application.setup_command_runner_log_pane()
399
400        # Execute the repl code in the the separate user_code thread loop.
401        future = asyncio.run_coroutine_threadsafe(
402            # This function will be executed in a separate thread.
403            self._run_user_code(repl_input_text, temp_stdout, temp_stderr),
404            # Using this asyncio event loop.
405            self.repl_pane.application.user_code_loop,
406        )  # type: ignore
407
408        # Save the input text and future object.
409        self.repl_pane.append_executed_code(
410            repl_input_text, future, temp_stdout, temp_stderr
411        )  # type: ignore
412
413        # Run user_code_complete_callback() when done.
414        done_callback = functools.partial(
415            self.user_code_complete_callback, repl_input_text
416        )
417        future.add_done_callback(done_callback)
418
419        # Rebuild the parent ReplPane output buffer.
420        self.repl_pane.update_output_buffer('pw_ptpython_repl._accept_handler')
421
422        # TODO(tonymd): Return True if exception is found?
423        # Don't keep input for now. Return True to keep input text.
424        return False
425
426    def line_break_count(self) -> int:
427        return self.default_buffer.text.count('\n')
428
429    def input_empty_if_in_focus_condition(self) -> Condition:
430        @Condition
431        def test() -> bool:
432            if has_focus(self)() and len(self.default_buffer.text) == 0:
433                return True
434            return not has_focus(self)()
435
436        return test
437