• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Tests for pw_console.console_app"""
15
16import asyncio
17import builtins
18import inspect
19import io
20import sys
21import threading
22import unittest
23from unittest.mock import MagicMock, call
24
25from prompt_toolkit.application import create_app_session
26from prompt_toolkit.output import (
27    ColorDepth,
28    # inclusive-language: ignore
29    DummyOutput as FakeOutput,
30)
31
32from pw_console.console_app import ConsoleApp
33from pw_console.console_prefs import ConsolePrefs
34from pw_console.repl_pane import ReplPane
35from pw_console.pw_ptpython_repl import PwPtPythonRepl
36
37_PYTHON_3_8 = sys.version_info >= (
38    3,
39    8,
40)
41
42if _PYTHON_3_8:
43    from unittest import IsolatedAsyncioTestCase  # type: ignore # pylint: disable=no-name-in-module
44
45    class TestReplPane(IsolatedAsyncioTestCase):
46        """Tests for ReplPane."""
47        def setUp(self):  # pylint: disable=invalid-name
48            self.maxDiff = None  # pylint: disable=invalid-name
49
50        def test_repl_code_return_values(self) -> None:
51            """Test stdout, return values, and exceptions can be returned from
52            running user repl code."""
53            app = MagicMock()
54
55            global_vars = {
56                '__name__': '__main__',
57                '__package__': None,
58                '__doc__': None,
59                '__builtins__': builtins,
60            }
61
62            pw_ptpython_repl = PwPtPythonRepl(
63                get_globals=lambda: global_vars,
64                get_locals=lambda: global_vars,
65                color_depth=ColorDepth.DEPTH_8_BIT)
66            repl_pane = ReplPane(
67                application=app,
68                python_repl=pw_ptpython_repl,
69            )
70            # Check pw_ptpython_repl has a reference to the parent repl_pane.
71            self.assertEqual(repl_pane, pw_ptpython_repl.repl_pane)
72
73            # Define a function, should return nothing.
74            code = inspect.cleandoc("""
75                def run():
76                    print('The answer is ', end='')
77                    return 1+1+4+16+20
78            """)
79            temp_stdout = io.StringIO()
80            temp_stderr = io.StringIO()
81            # pylint: disable=protected-access
82            result = asyncio.run(
83                pw_ptpython_repl._run_user_code(code, temp_stdout,
84                                                temp_stderr))
85            self.assertEqual(result, {
86                'stdout': '',
87                'stderr': '',
88                'result': None
89            })
90
91            temp_stdout = io.StringIO()
92            temp_stderr = io.StringIO()
93            # Check stdout and return value
94            result = asyncio.run(
95                pw_ptpython_repl._run_user_code('run()', temp_stdout,
96                                                temp_stderr))
97            self.assertEqual(result, {
98                'stdout': 'The answer is ',
99                'stderr': '',
100                'result': 42
101            })
102
103            temp_stdout = io.StringIO()
104            temp_stderr = io.StringIO()
105            # Check for repl exception
106            result = asyncio.run(
107                pw_ptpython_repl._run_user_code('return "blah"', temp_stdout,
108                                                temp_stderr))
109            self.assertIn("SyntaxError: 'return' outside function",
110                          pw_ptpython_repl._last_exception)  # type: ignore
111
112        async def test_user_thread(self) -> None:
113            """Test user code thread."""
114
115            with create_app_session(output=FakeOutput()):
116                # Setup Mocks
117                app = ConsoleApp(color_depth=ColorDepth.DEPTH_8_BIT,
118                                 prefs=ConsolePrefs(project_file=False,
119                                                    project_user_file=False,
120                                                    user_file=False))
121
122                app.start_user_code_thread()
123
124                pw_ptpython_repl = app.pw_ptpython_repl
125                repl_pane = app.repl_pane
126
127                # Mock update_output_buffer to track number of update calls
128                repl_pane.update_output_buffer = MagicMock(
129                    wraps=repl_pane.update_output_buffer)
130
131                # Mock complete callback
132                pw_ptpython_repl.user_code_complete_callback = MagicMock(
133                    wraps=pw_ptpython_repl.user_code_complete_callback)
134
135                # Repl done flag for tests
136                user_code_done = threading.Event()
137
138                # Run some code
139                code = inspect.cleandoc("""
140                    import time
141                    def run():
142                        for i in range(2):
143                            time.sleep(0.5)
144                            print(i)
145                        print('The answer is ', end='')
146                        return 1+1+4+16+20
147                """)
148                input_buffer = MagicMock(text=code)
149                pw_ptpython_repl._accept_handler(input_buffer)  # pylint: disable=protected-access
150
151                # Get last executed code object.
152                user_code1 = repl_pane.executed_code[-1]
153                # Wait for repl code to finish.
154                user_code1.future.add_done_callback(
155                    lambda future: user_code_done.set())
156                # Wait for stdout monitoring to complete.
157                if user_code1.stdout_check_task:
158                    await user_code1.stdout_check_task
159                # Wait for test done callback.
160                user_code_done.wait(timeout=3)
161
162                # Check user_code1 results
163                # NOTE: Avoid using assert_has_calls. Thread timing can make the
164                # test flaky.
165                expected_calls = [
166                    # Initial exec start
167                    call('pw_ptpython_repl._accept_handler'),
168                    # Code finishes
169                    call('repl_pane.append_result_to_executed_code'),
170                    # Complete callback
171                    call('pw_ptpython_repl.user_code_complete_callback'),
172                ]
173                for expected_call in expected_calls:
174                    self.assertIn(expected_call,
175                                  repl_pane.update_output_buffer.mock_calls)
176
177                pw_ptpython_repl.user_code_complete_callback.assert_called_once(
178                )
179
180                self.assertIsNotNone(user_code1)
181                self.assertTrue(user_code1.future.done())
182                self.assertEqual(user_code1.input, code)
183                self.assertEqual(user_code1.output, None)
184                # stdout / stderr may be '' or None
185                self.assertFalse(user_code1.stdout)
186                self.assertFalse(user_code1.stderr)
187
188                # Reset mocks
189                user_code_done.clear()
190                pw_ptpython_repl.user_code_complete_callback.reset_mock()
191                repl_pane.update_output_buffer.reset_mock()
192
193                # Run some code
194                input_buffer = MagicMock(text='run()')
195                pw_ptpython_repl._accept_handler(input_buffer)  # pylint: disable=protected-access
196
197                # Get last executed code object.
198                user_code2 = repl_pane.executed_code[-1]
199                # Wait for repl code to finish.
200                user_code2.future.add_done_callback(
201                    lambda future: user_code_done.set())
202                # Wait for stdout monitoring to complete.
203                if user_code2.stdout_check_task:
204                    await user_code2.stdout_check_task
205                # Wait for test done callback.
206                user_code_done.wait(timeout=3)
207
208                # Check user_code2 results
209                # NOTE: Avoid using assert_has_calls. Thread timing can make the
210                # test flaky.
211                expected_calls = [
212                    # Initial exec start
213                    call('pw_ptpython_repl._accept_handler'),
214                    # Periodic checks, should be a total of 4:
215                    #   Code should take 1.0 second to run.
216                    #   Periodic checks every 0.3 seconds
217                    #   1.0 / 0.3 = 3.33 (4) checks
218                    call('repl_pane.periodic_check'),
219                    call('repl_pane.periodic_check'),
220                    call('repl_pane.periodic_check'),
221                    # Code finishes
222                    call('repl_pane.append_result_to_executed_code'),
223                    # Complete callback
224                    call('pw_ptpython_repl.user_code_complete_callback'),
225                    # Final periodic check
226                    call('repl_pane.periodic_check'),
227                ]
228                for expected_call in expected_calls:
229                    self.assertIn(expected_call,
230                                  repl_pane.update_output_buffer.mock_calls)
231
232                pw_ptpython_repl.user_code_complete_callback.assert_called_once(
233                )
234                self.assertIsNotNone(user_code2)
235                self.assertTrue(user_code2.future.done())
236                self.assertEqual(user_code2.input, 'run()')
237                self.assertEqual(user_code2.output, '42')
238                self.assertEqual(user_code2.stdout, '0\n1\nThe answer is ')
239                self.assertFalse(user_code2.stderr)
240
241                # Reset mocks
242                user_code_done.clear()
243                pw_ptpython_repl.user_code_complete_callback.reset_mock()
244                repl_pane.update_output_buffer.reset_mock()
245
246
247if __name__ == '__main__':
248    unittest.main()
249