1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Tests for pw_console.console_app""" 15 16import asyncio 17import builtins 18import inspect 19import io 20import sys 21import threading 22import unittest 23from unittest.mock import MagicMock, call 24 25from prompt_toolkit.application import create_app_session 26from prompt_toolkit.output import ( 27 ColorDepth, 28 # inclusive-language: ignore 29 DummyOutput as FakeOutput, 30) 31 32from pw_console.console_app import ConsoleApp 33from pw_console.console_prefs import ConsolePrefs 34from pw_console.repl_pane import ReplPane 35from pw_console.pw_ptpython_repl import PwPtPythonRepl 36 37_PYTHON_3_8 = sys.version_info >= ( 38 3, 39 8, 40) 41 42if _PYTHON_3_8: 43 from unittest import IsolatedAsyncioTestCase # type: ignore # pylint: disable=no-name-in-module 44 45 class TestReplPane(IsolatedAsyncioTestCase): 46 """Tests for ReplPane.""" 47 48 def setUp(self): # pylint: disable=invalid-name 49 self.maxDiff = None # pylint: disable=invalid-name 50 51 def test_repl_code_return_values(self) -> None: 52 """Test stdout, return values, and exceptions can be returned from 53 running user repl code.""" 54 app = MagicMock() 55 56 global_vars = { 57 '__name__': '__main__', 58 '__package__': None, 59 '__doc__': None, 60 '__builtins__': builtins, 61 } 62 63 pw_ptpython_repl = PwPtPythonRepl( 64 get_globals=lambda: global_vars, 65 get_locals=lambda: global_vars, 66 color_depth=ColorDepth.DEPTH_8_BIT, 67 ) 68 repl_pane = ReplPane( 69 application=app, 70 python_repl=pw_ptpython_repl, 71 ) 72 # Check pw_ptpython_repl has a reference to the parent repl_pane. 73 self.assertEqual(repl_pane, pw_ptpython_repl.repl_pane) 74 75 # Define a function, should return nothing. 76 code = inspect.cleandoc( 77 """ 78 def run(): 79 print('The answer is ', end='') 80 return 1+1+4+16+20 81 """ 82 ) 83 temp_stdout = io.StringIO() 84 temp_stderr = io.StringIO() 85 # pylint: disable=protected-access 86 result = asyncio.run( 87 pw_ptpython_repl._run_user_code(code, temp_stdout, temp_stderr) 88 ) 89 self.assertEqual( 90 result, {'stdout': '', 'stderr': '', 'result': None} 91 ) 92 93 temp_stdout = io.StringIO() 94 temp_stderr = io.StringIO() 95 # Check stdout and return value 96 result = asyncio.run( 97 pw_ptpython_repl._run_user_code( 98 'run()', temp_stdout, temp_stderr 99 ) 100 ) 101 self.assertEqual( 102 result, {'stdout': 'The answer is ', 'stderr': '', 'result': 42} 103 ) 104 105 temp_stdout = io.StringIO() 106 temp_stderr = io.StringIO() 107 # Check for repl exception 108 result = asyncio.run( 109 pw_ptpython_repl._run_user_code( 110 'return "blah"', temp_stdout, temp_stderr 111 ) 112 ) 113 self.assertIn( 114 "SyntaxError: 'return' outside function", 115 pw_ptpython_repl._last_exception, # type: ignore 116 ) 117 118 async def test_user_thread(self) -> None: 119 """Test user code thread.""" 120 121 with create_app_session(output=FakeOutput()): 122 # Setup Mocks 123 prefs = ConsolePrefs( 124 project_file=False, project_user_file=False, user_file=False 125 ) 126 prefs.set_code_theme('default') 127 app = ConsoleApp( 128 color_depth=ColorDepth.DEPTH_8_BIT, prefs=prefs 129 ) 130 131 app.start_user_code_thread() 132 133 pw_ptpython_repl = app.pw_ptpython_repl 134 repl_pane = app.repl_pane 135 136 # Mock update_output_buffer to track number of update calls 137 repl_pane.update_output_buffer = MagicMock( # type: ignore 138 wraps=repl_pane.update_output_buffer 139 ) 140 141 # Mock complete callback 142 pw_ptpython_repl.user_code_complete_callback = ( # type: ignore 143 MagicMock( 144 wraps=pw_ptpython_repl.user_code_complete_callback 145 ) 146 ) 147 148 # Repl done flag for tests 149 user_code_done = threading.Event() 150 151 # Run some code 152 code = inspect.cleandoc( 153 """ 154 import time 155 def run(): 156 for i in range(2): 157 time.sleep(0.5) 158 print(i) 159 print('The answer is ', end='') 160 return 1+1+4+16+20 161 """ 162 ) 163 input_buffer = MagicMock(text=code) 164 # pylint: disable=protected-access 165 pw_ptpython_repl._accept_handler(input_buffer) 166 # pylint: enable=protected-access 167 168 # Get last executed code object. 169 user_code1 = repl_pane.executed_code[-1] 170 # Wait for repl code to finish. 171 user_code1.future.add_done_callback( 172 lambda future: user_code_done.set() 173 ) 174 # Wait for stdout monitoring to complete. 175 if user_code1.stdout_check_task: 176 await user_code1.stdout_check_task 177 # Wait for test done callback. 178 user_code_done.wait() 179 180 # Check user_code1 results 181 # NOTE: Avoid using assert_has_calls. Thread timing can make the 182 # test flaky. 183 expected_calls = [ 184 # Initial exec start 185 call('pw_ptpython_repl._accept_handler'), 186 # Code finishes 187 call('repl_pane.append_result_to_executed_code'), 188 # Complete callback 189 call('pw_ptpython_repl.user_code_complete_callback'), 190 ] 191 for expected_call in expected_calls: 192 self.assertIn( 193 expected_call, repl_pane.update_output_buffer.mock_calls 194 ) 195 196 user_code_complete_callback = ( 197 pw_ptpython_repl.user_code_complete_callback 198 ) 199 user_code_complete_callback.assert_called_once() 200 201 self.assertIsNotNone(user_code1) 202 self.assertTrue(user_code1.future.done()) 203 self.assertEqual(user_code1.input, code) 204 self.assertEqual(user_code1.output, None) 205 # stdout / stderr may be '' or None 206 self.assertFalse(user_code1.stdout) 207 self.assertFalse(user_code1.stderr) 208 209 # Reset mocks 210 user_code_done.clear() 211 pw_ptpython_repl.user_code_complete_callback.reset_mock() 212 repl_pane.update_output_buffer.reset_mock() 213 214 # Run some code 215 input_buffer = MagicMock(text='run()') 216 # pylint: disable=protected-access 217 pw_ptpython_repl._accept_handler(input_buffer) 218 # pylint: enable=protected-access 219 220 # Get last executed code object. 221 user_code2 = repl_pane.executed_code[-1] 222 # Wait for repl code to finish. 223 user_code2.future.add_done_callback( 224 lambda future: user_code_done.set() 225 ) 226 # Wait for stdout monitoring to complete. 227 if user_code2.stdout_check_task: 228 await user_code2.stdout_check_task 229 # Wait for test done callback. 230 user_code_done.wait() 231 232 # Check user_code2 results 233 # NOTE: Avoid using assert_has_calls. Thread timing can make the 234 # test flaky. 235 expected_calls = [ 236 # Initial exec start 237 call('pw_ptpython_repl._accept_handler'), 238 # Periodic checks, should be a total of 4: 239 # Code should take 1.0 second to run. 240 # Periodic checks every 0.3 seconds 241 # 1.0 / 0.3 = 3.33 (4) checks 242 call('repl_pane.periodic_check'), 243 call('repl_pane.periodic_check'), 244 call('repl_pane.periodic_check'), 245 # Code finishes 246 call('repl_pane.append_result_to_executed_code'), 247 # Complete callback 248 call('pw_ptpython_repl.user_code_complete_callback'), 249 # Final periodic check 250 call('repl_pane.periodic_check'), 251 ] 252 for expected_call in expected_calls: 253 self.assertIn( 254 expected_call, repl_pane.update_output_buffer.mock_calls 255 ) 256 257 # pylint: disable=line-too-long 258 pw_ptpython_repl.user_code_complete_callback.assert_called_once() 259 # pylint: enable=line-too-long 260 self.assertIsNotNone(user_code2) 261 self.assertTrue(user_code2.future.done()) 262 self.assertEqual(user_code2.input, 'run()') 263 self.assertEqual(user_code2.output, '42') 264 self.assertEqual(user_code2.stdout, '0\n1\nThe answer is ') 265 self.assertFalse(user_code2.stderr) 266 267 # Reset mocks 268 user_code_done.clear() 269 pw_ptpython_repl.user_code_complete_callback.reset_mock() 270 repl_pane.update_output_buffer.reset_mock() 271 272 273if __name__ == '__main__': 274 unittest.main() 275