• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
"""Tests for pw_console.repl_pane"""
15
16import asyncio
17import builtins
18import inspect
19import io
20import sys
21import threading
22import unittest
23from unittest.mock import MagicMock, call
24
25from prompt_toolkit.application import create_app_session
26from prompt_toolkit.output import (
27    ColorDepth,
28    # inclusive-language: ignore
29    DummyOutput as FakeOutput,
30)
31
32from pw_console.console_app import ConsoleApp
33from pw_console.console_prefs import ConsolePrefs
34from pw_console.repl_pane import ReplPane
35from pw_console.pw_ptpython_repl import PwPtPythonRepl
36
# unittest only provides IsolatedAsyncioTestCase on Python 3.8 and newer, so
# the import (and the tests built on it) are gated on the interpreter version.
_PYTHON_3_8 = sys.version_info >= (3, 8)

if _PYTHON_3_8:
    from unittest import IsolatedAsyncioTestCase  # type: ignore # pylint: disable=no-name-in-module
44
45    class TestReplPane(IsolatedAsyncioTestCase):
46        """Tests for ReplPane."""
47
48        def setUp(self):  # pylint: disable=invalid-name
49            self.maxDiff = None  # pylint: disable=invalid-name
50
51        def test_repl_code_return_values(self) -> None:
52            """Test stdout, return values, and exceptions can be returned from
53            running user repl code."""
54            app = MagicMock()
55
56            global_vars = {
57                '__name__': '__main__',
58                '__package__': None,
59                '__doc__': None,
60                '__builtins__': builtins,
61            }
62
63            pw_ptpython_repl = PwPtPythonRepl(
64                get_globals=lambda: global_vars,
65                get_locals=lambda: global_vars,
66                color_depth=ColorDepth.DEPTH_8_BIT,
67            )
68            repl_pane = ReplPane(
69                application=app,
70                python_repl=pw_ptpython_repl,
71            )
72            # Check pw_ptpython_repl has a reference to the parent repl_pane.
73            self.assertEqual(repl_pane, pw_ptpython_repl.repl_pane)
74
75            # Define a function, should return nothing.
76            code = inspect.cleandoc(
77                """
78                def run():
79                    print('The answer is ', end='')
80                    return 1+1+4+16+20
81            """
82            )
83            temp_stdout = io.StringIO()
84            temp_stderr = io.StringIO()
85            # pylint: disable=protected-access
86            result = asyncio.run(
87                pw_ptpython_repl._run_user_code(code, temp_stdout, temp_stderr)
88            )
89            self.assertEqual(
90                result, {'stdout': '', 'stderr': '', 'result': None}
91            )
92
93            temp_stdout = io.StringIO()
94            temp_stderr = io.StringIO()
95            # Check stdout and return value
96            result = asyncio.run(
97                pw_ptpython_repl._run_user_code(
98                    'run()', temp_stdout, temp_stderr
99                )
100            )
101            self.assertEqual(
102                result, {'stdout': 'The answer is ', 'stderr': '', 'result': 42}
103            )
104
105            temp_stdout = io.StringIO()
106            temp_stderr = io.StringIO()
107            # Check for repl exception
108            result = asyncio.run(
109                pw_ptpython_repl._run_user_code(
110                    'return "blah"', temp_stdout, temp_stderr
111                )
112            )
113            self.assertIn(
114                "SyntaxError: 'return' outside function",
115                pw_ptpython_repl._last_exception,  # type: ignore
116            )
117
118        async def test_user_thread(self) -> None:
119            """Test user code thread."""
120
121            with create_app_session(output=FakeOutput()):
122                # Setup Mocks
123                prefs = ConsolePrefs(
124                    project_file=False, project_user_file=False, user_file=False
125                )
126                prefs.set_code_theme('default')
127                app = ConsoleApp(
128                    color_depth=ColorDepth.DEPTH_8_BIT, prefs=prefs
129                )
130
131                app.start_user_code_thread()
132
133                pw_ptpython_repl = app.pw_ptpython_repl
134                repl_pane = app.repl_pane
135
136                # Mock update_output_buffer to track number of update calls
137                repl_pane.update_output_buffer = MagicMock(  # type: ignore
138                    wraps=repl_pane.update_output_buffer
139                )
140
141                # Mock complete callback
142                pw_ptpython_repl.user_code_complete_callback = (  # type: ignore
143                    MagicMock(
144                        wraps=pw_ptpython_repl.user_code_complete_callback
145                    )
146                )
147
148                # Repl done flag for tests
149                user_code_done = threading.Event()
150
151                # Run some code
152                code = inspect.cleandoc(
153                    """
154                    import time
155                    def run():
156                        for i in range(2):
157                            time.sleep(0.5)
158                            print(i)
159                        print('The answer is ', end='')
160                        return 1+1+4+16+20
161                """
162                )
163                input_buffer = MagicMock(text=code)
164                # pylint: disable=protected-access
165                pw_ptpython_repl._accept_handler(input_buffer)
166                # pylint: enable=protected-access
167
168                # Get last executed code object.
169                user_code1 = repl_pane.executed_code[-1]
170                # Wait for repl code to finish.
171                user_code1.future.add_done_callback(
172                    lambda future: user_code_done.set()
173                )
174                # Wait for stdout monitoring to complete.
175                if user_code1.stdout_check_task:
176                    await user_code1.stdout_check_task
177                # Wait for test done callback.
178                user_code_done.wait()
179
180                # Check user_code1 results
181                # NOTE: Avoid using assert_has_calls. Thread timing can make the
182                # test flaky.
183                expected_calls = [
184                    # Initial exec start
185                    call('pw_ptpython_repl._accept_handler'),
186                    # Code finishes
187                    call('repl_pane.append_result_to_executed_code'),
188                    # Complete callback
189                    call('pw_ptpython_repl.user_code_complete_callback'),
190                ]
191                for expected_call in expected_calls:
192                    self.assertIn(
193                        expected_call, repl_pane.update_output_buffer.mock_calls
194                    )
195
196                user_code_complete_callback = (
197                    pw_ptpython_repl.user_code_complete_callback
198                )
199                user_code_complete_callback.assert_called_once()
200
201                self.assertIsNotNone(user_code1)
202                self.assertTrue(user_code1.future.done())
203                self.assertEqual(user_code1.input, code)
204                self.assertEqual(user_code1.output, None)
205                # stdout / stderr may be '' or None
206                self.assertFalse(user_code1.stdout)
207                self.assertFalse(user_code1.stderr)
208
209                # Reset mocks
210                user_code_done.clear()
211                pw_ptpython_repl.user_code_complete_callback.reset_mock()
212                repl_pane.update_output_buffer.reset_mock()
213
214                # Run some code
215                input_buffer = MagicMock(text='run()')
216                # pylint: disable=protected-access
217                pw_ptpython_repl._accept_handler(input_buffer)
218                # pylint: enable=protected-access
219
220                # Get last executed code object.
221                user_code2 = repl_pane.executed_code[-1]
222                # Wait for repl code to finish.
223                user_code2.future.add_done_callback(
224                    lambda future: user_code_done.set()
225                )
226                # Wait for stdout monitoring to complete.
227                if user_code2.stdout_check_task:
228                    await user_code2.stdout_check_task
229                # Wait for test done callback.
230                user_code_done.wait()
231
232                # Check user_code2 results
233                # NOTE: Avoid using assert_has_calls. Thread timing can make the
234                # test flaky.
235                expected_calls = [
236                    # Initial exec start
237                    call('pw_ptpython_repl._accept_handler'),
238                    # Periodic checks, should be a total of 4:
239                    #   Code should take 1.0 second to run.
240                    #   Periodic checks every 0.3 seconds
241                    #   1.0 / 0.3 = 3.33 (4) checks
242                    call('repl_pane.periodic_check'),
243                    call('repl_pane.periodic_check'),
244                    call('repl_pane.periodic_check'),
245                    # Code finishes
246                    call('repl_pane.append_result_to_executed_code'),
247                    # Complete callback
248                    call('pw_ptpython_repl.user_code_complete_callback'),
249                    # Final periodic check
250                    call('repl_pane.periodic_check'),
251                ]
252                for expected_call in expected_calls:
253                    self.assertIn(
254                        expected_call, repl_pane.update_output_buffer.mock_calls
255                    )
256
257                # pylint: disable=line-too-long
258                pw_ptpython_repl.user_code_complete_callback.assert_called_once()
259                # pylint: enable=line-too-long
260                self.assertIsNotNone(user_code2)
261                self.assertTrue(user_code2.future.done())
262                self.assertEqual(user_code2.input, 'run()')
263                self.assertEqual(user_code2.output, '42')
264                self.assertEqual(user_code2.stdout, '0\n1\nThe answer is ')
265                self.assertFalse(user_code2.stderr)
266
267                # Reset mocks
268                user_code_done.clear()
269                pw_ptpython_repl.user_code_complete_callback.reset_mock()
270                repl_pane.update_output_buffer.reset_mock()
271
272
# Run this module's tests with the unittest runner when executed as a script.
if __name__ == '__main__':
    unittest.main()
275