# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for pw_console.repl_pane"""

import asyncio
import builtins
import inspect
import io
import sys
import threading
import unittest
from unittest.mock import MagicMock, call

from prompt_toolkit.application import create_app_session
from prompt_toolkit.output import (
    ColorDepth,
    # inclusive-language: ignore
    DummyOutput as FakeOutput,
)

from pw_console.console_app import ConsoleApp
from pw_console.console_prefs import ConsolePrefs
from pw_console.repl_pane import ReplPane
from pw_console.pw_ptpython_repl import PwPtPythonRepl

# IsolatedAsyncioTestCase only exists on Python 3.8 and newer; on older
# interpreters the test class below is not defined at all.
_PYTHON_3_8 = sys.version_info >= (3, 8)

# Upper bound, in seconds, on how long a test waits for submitted user code.
_USER_CODE_TIMEOUT_SECONDS = 3

if _PYTHON_3_8:
    from unittest import IsolatedAsyncioTestCase  # type: ignore # pylint: disable=no-name-in-module

    class TestReplPane(IsolatedAsyncioTestCase):
        """Tests for ReplPane."""
        def setUp(self):  # pylint: disable=invalid-name
            # Show full diffs when an assertEqual on large values fails.
            self.maxDiff = None  # pylint: disable=invalid-name

        async def _run_and_wait(self, pw_ptpython_repl: PwPtPythonRepl,
                                repl_pane: ReplPane, code: str):
            """Submit code through the repl and wait for it to finish.

            Args:
              pw_ptpython_repl: Repl whose accept handler receives the code.
              repl_pane: Pane whose executed_code list records the run.
              code: Source text to submit.

            Returns:
              The last entry of repl_pane.executed_code after submission.
            """
            input_buffer = MagicMock(text=code)
            pw_ptpython_repl._accept_handler(input_buffer)  # pylint: disable=protected-access

            # Get last executed code object.
            user_code = repl_pane.executed_code[-1]
            # Check before dereferencing so a missing entry is reported
            # clearly instead of as an AttributeError.
            self.assertIsNotNone(user_code)

            # A fresh event per run, so no state carries over between runs.
            user_code_done = threading.Event()
            # Wait for repl code to finish.
            user_code.future.add_done_callback(
                lambda future: user_code_done.set())
            # Wait for stdout monitoring to complete.
            if user_code.stdout_check_task:
                await user_code.stdout_check_task
            # Wait for test done callback. Event.wait() returns False on
            # timeout; assert it so a hang is reported here and not as a
            # confusing downstream failure.
            self.assertTrue(
                user_code_done.wait(timeout=_USER_CODE_TIMEOUT_SECONDS),
                'User code did not finish before the timeout.')
            return user_code

        def test_repl_code_return_values(self) -> None:
            """Test stdout, return values, and exceptions can be returned from
            running user repl code."""
            # pylint: disable=protected-access
            app = MagicMock()

            global_vars = {
                '__name__': '__main__',
                '__package__': None,
                '__doc__': None,
                '__builtins__': builtins,
            }

            pw_ptpython_repl = PwPtPythonRepl(
                get_globals=lambda: global_vars,
                get_locals=lambda: global_vars,
                color_depth=ColorDepth.DEPTH_8_BIT)
            repl_pane = ReplPane(
                application=app,
                python_repl=pw_ptpython_repl,
            )
            # Check pw_ptpython_repl has a reference to the parent repl_pane.
            self.assertEqual(repl_pane, pw_ptpython_repl.repl_pane)

            def run_user_code(source: str):
                """Run source with fresh stdout/stderr buffers."""
                return asyncio.run(
                    pw_ptpython_repl._run_user_code(source, io.StringIO(),
                                                    io.StringIO()))

            # Define a function, should return nothing.
            code = inspect.cleandoc("""
                def run():
                    print('The answer is ', end='')
                    return 1+1+4+16+20
            """)
            self.assertEqual(run_user_code(code), {
                'stdout': '',
                'stderr': '',
                'result': None
            })

            # Check stdout and return value
            self.assertEqual(run_user_code('run()'), {
                'stdout': 'The answer is ',
                'stderr': '',
                'result': 42
            })

            # Check for repl exception. The return value is not inspected;
            # the failure is read from _last_exception.
            run_user_code('return "blah"')
            self.assertIn("SyntaxError: 'return' outside function",
                          pw_ptpython_repl._last_exception)  # type: ignore

        async def test_user_thread(self) -> None:
            """Test user code thread."""

            with create_app_session(output=FakeOutput()):
                # Setup Mocks
                app = ConsoleApp(color_depth=ColorDepth.DEPTH_8_BIT,
                                 prefs=ConsolePrefs(project_file=False,
                                                    project_user_file=False,
                                                    user_file=False))

                app.start_user_code_thread()

                pw_ptpython_repl = app.pw_ptpython_repl
                repl_pane = app.repl_pane

                # Mock update_output_buffer to track number of update calls
                repl_pane.update_output_buffer = MagicMock(
                    wraps=repl_pane.update_output_buffer)

                # Mock complete callback
                pw_ptpython_repl.user_code_complete_callback = MagicMock(
                    wraps=pw_ptpython_repl.user_code_complete_callback)

                # Run some code
                code = inspect.cleandoc("""
                    import time
                    def run():
                        for i in range(2):
                            time.sleep(0.5)
                            print(i)
                        print('The answer is ', end='')
                        return 1+1+4+16+20
                """)
                user_code1 = await self._run_and_wait(pw_ptpython_repl,
                                                      repl_pane, code)

                # Check user_code1 results
                # NOTE: Avoid using assert_has_calls. Thread timing can make the
                # test flaky.
                expected_calls = [
                    # Initial exec start
                    call('pw_ptpython_repl._accept_handler'),
                    # Code finishes
                    call('repl_pane.append_result_to_executed_code'),
                    # Complete callback
                    call('pw_ptpython_repl.user_code_complete_callback'),
                ]
                for expected_call in expected_calls:
                    self.assertIn(expected_call,
                                  repl_pane.update_output_buffer.mock_calls)

                pw_ptpython_repl.user_code_complete_callback.assert_called_once(
                )

                self.assertTrue(user_code1.future.done())
                self.assertEqual(user_code1.input, code)
                self.assertIsNone(user_code1.output)
                # stdout / stderr may be '' or None
                self.assertFalse(user_code1.stdout)
                self.assertFalse(user_code1.stderr)

                # Reset mocks so the second run is checked in isolation.
                pw_ptpython_repl.user_code_complete_callback.reset_mock()
                repl_pane.update_output_buffer.reset_mock()

                # Run some code
                user_code2 = await self._run_and_wait(pw_ptpython_repl,
                                                      repl_pane, 'run()')

                # Check user_code2 results
                # NOTE: Avoid using assert_has_calls. Thread timing can make the
                # test flaky. assertIn only checks membership, so the repeated
                # periodic_check entries document the expected sequence but
                # do not verify a call count.
                expected_calls = [
                    # Initial exec start
                    call('pw_ptpython_repl._accept_handler'),
                    # Periodic checks, should be a total of 4:
                    #   Code should take 1.0 second to run.
                    #   Periodic checks every 0.3 seconds
                    #   1.0 / 0.3 = 3.33 (4) checks
                    call('repl_pane.periodic_check'),
                    call('repl_pane.periodic_check'),
                    call('repl_pane.periodic_check'),
                    # Code finishes
                    call('repl_pane.append_result_to_executed_code'),
                    # Complete callback
                    call('pw_ptpython_repl.user_code_complete_callback'),
                    # Final periodic check
                    call('repl_pane.periodic_check'),
                ]
                for expected_call in expected_calls:
                    self.assertIn(expected_call,
                                  repl_pane.update_output_buffer.mock_calls)

                pw_ptpython_repl.user_code_complete_callback.assert_called_once(
                )
                self.assertTrue(user_code2.future.done())
                self.assertEqual(user_code2.input, 'run()')
                self.assertEqual(user_code2.output, '42')
                self.assertEqual(user_code2.stdout, '0\n1\nThe answer is ')
                self.assertFalse(user_code2.stderr)


if __name__ == '__main__':
    unittest.main()