#!/usr/bin/env python
# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for core / infrastructure code."""

import copy
import io
import json
import os
import socket
import sys
import tempfile
import time
import unittest
from pathlib import Path
from typing import Any

from unittest.mock import patch

from pw_emu.core import (
    AlreadyRunning,
    Config,
    ConfigError,
    Handles,
    InvalidTarget,
    InvalidChannelName,
    InvalidChannelType,
    Launcher,
)
from mock_emu_frontend import _mock_emu
from config_helper import ConfigHelper


# Upper bound, in seconds, on how long a test waits for a log file to be
# populated before reading it anyway.
_FILE_WAIT_TIMEOUT_S = 100


def _wait_for_file_size(
    path: str, size: int, timeout: float = _FILE_WAIT_TIMEOUT_S
) -> None:
    """Polls until the file at `path` holds at least `size` bytes.

    Gives up silently once `timeout` seconds have elapsed; the caller's
    assertion on the file content then reports the failure.

    Args:
      path: File to poll. It must already exist (os.path.getsize raises
        otherwise).
      size: Minimum size, in bytes, to wait for.
      timeout: Maximum time to wait, in seconds.
    """
    deadline = time.monotonic() + timeout
    while os.path.getsize(path) < size:
        time.sleep(0.1)
        if time.monotonic() > deadline:
            break


class ConfigHelperWithLauncher(ConfigHelper):
    """ConfigHelper that also creates a mock-emu Launcher for the config."""

    def setUp(self) -> None:
        super().setUp()
        self._launcher = Launcher.get('mock-emu', Path(self._config_file))


class TestInvalidTarget(ConfigHelperWithLauncher):
    """Check that InvalidTarget is raised when the config has no targets."""

    _config: dict[str, Any] = {
        'emulators': {
            'mock-emu': {
                'launcher': 'mock_emu_frontend.MockEmuLauncher',
                'connector': 'mock_emu_frontend.MockEmuConnector',
            }
        },
    }

    def test_invalid_target(self) -> None:
        with self.assertRaises(InvalidTarget):
            self._launcher.start(Path(self._wdir.name), 'test-target')


class TestStart(ConfigHelperWithLauncher):
    """Start tests for valid config."""

    _config: dict[str, Any] = {
        'emulators': {
            'mock-emu': {
                'launcher': 'mock_emu_frontend.MockEmuLauncher',
                'connector': 'mock_emu_frontend.MockEmuConnector',
            }
        },
        'targets': {'test-target': {'mock-emu': {}}},
    }

    def setUp(self) -> None:
        super().setUp()
        self._connector = self._launcher.start(
            Path(self._wdir.name), 'test-target'
        )

    def tearDown(self) -> None:
        self._connector.stop()
        super().tearDown()

    def test_running(self) -> None:
        self.assertTrue(self._connector.running())

    def test_pid_file(self) -> None:
        self.assertTrue(
            os.path.exists(os.path.join(self._wdir.name, 'mock-emu.pid'))
        )

    def test_handles_file(self) -> None:
        self.assertTrue(
            os.path.exists(os.path.join(self._wdir.name, 'handles.json'))
        )

    def test_already_running(self) -> None:
        with self.assertRaises(AlreadyRunning):
            self._launcher.start(Path(self._wdir.name), 'test-target')

    def test_log(self) -> None:
        exp = 'starting mock emulator'
        path = os.path.join(self._wdir.name, 'mock-emu.log')
        # The emulator writes its log asynchronously; wait for enough output.
        _wait_for_file_size(path, len(exp))

        with open(path, 'rt') as file:
            data = file.read()
        self.assertIn(exp, data)


class TestPrePostStartCmds(ConfigHelperWithLauncher):
    """Tests for configurations with pre-start and post-start commands."""

    _config: dict[str, Any] = {
        'emulators': {
            'mock-emu': {
                'launcher': 'mock_emu_frontend.MockEmuLauncher',
                'connector': 'mock_emu_frontend.MockEmuConnector',
            }
        },
        'targets': {
            'test-target': {
                'pre-start-cmds': {
                    'pre-1': _mock_emu + ['pre-1'],
                    'pre-2': _mock_emu + ['$pw_emu_wdir{pre-2}'],
                },
                'post-start-cmds': {
                    'post-1': _mock_emu
                    + ['$pw_emu_channel_path{test_subst_pty}'],
                    'post-2': _mock_emu
                    + [
                        '$pw_emu_channel_host{test_subst_tcp}',
                        '$pw_emu_channel_port{test_subst_tcp}',
                    ],
                },
                'mock-emu': {},
            }
        },
    }

    def setUp(self) -> None:
        super().setUp()
        self._connector = self._launcher.start(
            Path(self._wdir.name), 'test-target'
        )

    def tearDown(self) -> None:
        self._connector.stop()
        super().tearDown()

    def test_running(self) -> None:
        for proc in self._connector.get_procs():
            self.assertTrue(self._connector.proc_running(proc))

    def test_stop(self) -> None:
        self._connector.stop()
        for proc in self._connector.get_procs():
            self.assertFalse(self._connector.proc_running(proc))

    def test_pid_files(self) -> None:
        for proc in ['pre-1', 'pre-2', 'post-1', 'post-2']:
            self.assertTrue(
                os.path.exists(os.path.join(self._wdir.name, f'{proc}.pid'))
            )

    def test_logs(self) -> None:
        # Maps each command's log file to the text its substituted
        # arguments are expected to produce.
        expect = {
            'pre-1.log': 'pre-1',
            'pre-2.log': os.path.join(self._wdir.name, 'pre-2'),
            'post-1.log': 'pty-path',
            'post-2.log': 'localhost 1234',
        }

        for log, pattern in expect.items():
            path = os.path.join(self._wdir.name, log)
            # Commands write their logs asynchronously; wait for output.
            _wait_for_file_size(path, len(pattern))
            with open(path) as file:
                data = file.read()
            self.assertIn(pattern, data, f'`{pattern}` not in `{data}`')


class TestStop(ConfigHelperWithLauncher):
    """Stop tests for valid config."""

    _config: dict[str, Any] = {
        'emulators': {
            'mock-emu': {
                'launcher': 'mock_emu_frontend.MockEmuLauncher',
                'connector': 'mock_emu_frontend.MockEmuConnector',
            }
        },
        'targets': {'test-target': {'mock-emu': {}}},
    }

    def setUp(self) -> None:
        super().setUp()
        self._connector = self._launcher.start(
            Path(self._wdir.name), 'test-target'
        )
        self._connector.stop()

    def test_pid_files(self) -> None:
        # NOTE(review): TestStart expects the pid file to be named
        # 'mock-emu.pid', so checking 'emu.pid' here may pass vacuously.
        # Confirm the intended file name against Connector.stop().
        self.assertFalse(
            os.path.exists(os.path.join(self._wdir.name, 'emu.pid'))
        )

    def test_target_file(self) -> None:
        self.assertFalse(
            os.path.exists(os.path.join(self._wdir.name, 'target'))
        )

    def test_running(self) -> None:
        self.assertFalse(self._connector.running())


class TestChannels(ConfigHelperWithLauncher):
    """Test Connector channels APIs."""

    _config: dict[str, Any] = {
        'emulators': {
            'mock-emu': {
                'launcher': 'mock_emu_frontend.MockEmuLauncher',
                'connector': 'mock_emu_frontend.MockEmuConnector',
            }
        },
        'mock-emu': {
            'tcp_channel': True,
            'pty_channel': sys.platform != 'win32',
        },
        'targets': {'test-target': {'mock-emu': {}}},
    }

    def setUp(self) -> None:
        super().setUp()
        self._connector = self._launcher.start(
            Path(self._wdir.name), 'test-target'
        )

    def tearDown(self) -> None:
        self._connector.stop()
        super().tearDown()

    def test_tcp_channel_addr(self) -> None:
        # The context manager closes the socket even if connect() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect(self._connector.get_channel_addr('tcp'))

    def test_get_channel_type(self) -> None:
        self.assertEqual(self._connector.get_channel_type('tcp'), 'tcp')
        if sys.platform != 'win32':
            self.assertEqual(self._connector.get_channel_type('pty'), 'pty')
        with self.assertRaises(InvalidChannelName):
            self._connector.get_channel_type('invalid channel')

    def test_pty_channel_path(self) -> None:
        if sys.platform == 'win32':
            self.skipTest('pty not supported on win32')
        self.assertTrue(os.path.exists(self._connector.get_channel_path('pty')))
        with self.assertRaises(InvalidChannelType):
            self._connector.get_channel_path('tcp')

    def _test_stream(self, stream: io.RawIOBase) -> None:
        """Writes single bytes to `stream` and expects each one read back."""
        for char in [b'1', b'2', b'3', b'4']:
            stream.write(char)
            self.assertEqual(stream.read(1), char)

    def test_tcp_stream(self) -> None:
        with self._connector.get_channel_stream('tcp') as stream:
            self._test_stream(stream)

    def test_pty_stream(self) -> None:
        if sys.platform == 'win32':
            self.skipTest('pty not supported on win32')
        with self._connector.get_channel_stream('pty') as stream:
            self._test_stream(stream)


class TestTargetFragments(unittest.TestCase):
    """Tests for configurations using target fragments."""

    _config_templ: dict[str, Any] = {
        'pw': {
            'pw_emu': {
                'target_files': [],
                'targets': {
                    'test-target': {
                        'test-key': 'test-value',
                    }
                },
            }
        }
    }

    _tf1_config: dict[str, Any] = {
        'targets': {
            'test-target1': {},
            'test-target': {
                'test-key': 'test-value-file1',
            },
        }
    }

    _tf2_config: dict[str, Any] = {'targets': {'test-target2': {}}}

    def setUp(self) -> None:
        with tempfile.NamedTemporaryFile(
            'wt', delete=False
        ) as config_file, tempfile.NamedTemporaryFile(
            'wt', delete=False
        ) as targets1_file, tempfile.NamedTemporaryFile(
            'wt', delete=False
        ) as targets2_file:
            self._config_path = config_file.name
            self._targets1_path = targets1_file.name
            self._targets2_path = targets2_file.name
            json.dump(self._tf1_config, targets1_file)
            json.dump(self._tf2_config, targets2_file)
            # Deep copy so the class-level template is not mutated per test.
            config = copy.deepcopy(self._config_templ)
            config['pw']['pw_emu']['target_files'].append(self._targets1_path)
            config['pw']['pw_emu']['target_files'].append(self._targets2_path)
            json.dump(config, config_file)

        # Register the removals before loading the config: cleanups run even
        # when the rest of setUp raises, whereas tearDown would be skipped.
        for path in (
            self._config_path,
            self._targets1_path,
            self._targets2_path,
        ):
            self.addCleanup(os.unlink, path)

        self._config = Config(Path(self._config_path))

    def test_targets_loaded(self) -> None:
        self.assertIsNotNone(self._config.get(['targets', 'test-target']))
        self.assertIsNotNone(self._config.get(['targets', 'test-target1']))
        self.assertIsNotNone(self._config.get(['targets', 'test-target2']))

    def test_targets_priority(self) -> None:
        # 'test-target' is defined both in the main config and in the first
        # target file; the value from the main config is the expected one.
        self.assertEqual(
            self._config.get(['targets', 'test-target', 'test-key']),
            'test-value',
        )


class TestHandles(unittest.TestCase):
    """Tests for Handles."""

    _config: dict[str, Any] = {
        'emu': 'mock-emu',
        'config': 'test-config',
        'target': 'test-target',
        'gdb_cmd': ['test-gdb'],
        'channels': {
            'tcp_chan': {'type': 'tcp', 'host': 'localhost', 'port': 1234},
            'pty_chan': {'type': 'pty', 'path': 'path'},
        },
        'procs': {'proc0': {'pid': 1983}, 'proc1': {'pid': 1234}},
    }

    def test_serialize(self) -> None:
        handles = Handles('mock-emu', 'test-config')
        handles.add_channel_tcp('tcp_chan', 'localhost', 1234)
        handles.add_channel_pty('pty_chan', 'path')
        handles.add_proc('proc0', 1983)
        handles.add_proc('proc1', 1234)
        handles.set_target('test-target')
        handles.set_gdb_cmd(['test-gdb'])
        # The context manager removes the directory even if an assertion
        # fails.
        with tempfile.TemporaryDirectory() as tmp:
            handles.save(Path(tmp))
            with open(os.path.join(tmp, 'handles.json'), 'rt') as file:
                self.assertEqual(json.load(file), self._config)

    def test_load(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            with open(os.path.join(tmp, 'handles.json'), 'wt') as file:
                json.dump(self._config, file)
            handles = Handles.load(Path(tmp))
            self.assertEqual(handles.emu, 'mock-emu')
            self.assertEqual(handles.gdb_cmd, ['test-gdb'])
            self.assertEqual(handles.target, 'test-target')
            self.assertEqual(handles.config, 'test-config')


class TestConfig(ConfigHelper):
    """Tests for Config lookups, substitutions and type checking."""

    _config: dict[str, Any] = {
        'top': 'entry',
        'multi': {
            'level': {
                'entry': 0,
            },
        },
        'subst': 'a/$pw_env{PW_EMU_TEST_ENV_SUBST}/c',
        'targets': {
            'test-target': {
                'entry': [1, 2, 3],
                'mock-emu': {
                    'entry': 'test',
                },
            }
        },
        'mock-emu': {
            'executable': _mock_emu,
        },
        'list': ['a', '$pw_env{PW_EMU_TEST_ENV_SUBST}', 'c'],
        'bad-subst-type': '$pw_bad_subst_type{test}',
    }

    def setUp(self) -> None:
        super().setUp()
        self._cfg = Config(Path(self._config_file), 'test-target', 'mock-emu')

    def test_top_entry(self) -> None:
        self.assertEqual(self._cfg.get(['top']), 'entry')

    def test_empty_subst(self) -> None:
        with self.assertRaises(ConfigError):
            self._cfg.get(['subst'])

    def test_subst(self) -> None:
        with patch.dict('os.environ', {'PW_EMU_TEST_ENV_SUBST': 'b'}):
            self.assertEqual(self._cfg.get(['subst']), 'a/b/c')

    def test_multi_level_entry(self) -> None:
        self.assertEqual(self._cfg.get(['multi', 'level', 'entry']), 0)

    def test_get_target(self) -> None:
        self.assertEqual(self._cfg.get_targets(), ['test-target'])

    def test_target(self) -> None:
        self.assertEqual(self._cfg.get_target(['entry']), [1, 2, 3])

    def test_target_emu(self) -> None:
        self.assertEqual(self._cfg.get_target_emu(['entry']), 'test')

    def test_type_checking(self) -> None:
        # Only the mismatching lookup goes inside assertRaises; anything
        # placed after the raising call in that block would never execute.
        with self.assertRaises(ConfigError):
            self._cfg.get(['top'], entry_type=int)
        # Lookups with the matching type must not raise.
        self._cfg.get(['top'], entry_type=str)
        self._cfg.get_target(['entry'], entry_type=list)
        self._cfg.get_target_emu(['entry'], entry_type=str)
        self._cfg.get(['targets'], entry_type=dict)

    def test_non_optional(self) -> None:
        with self.assertRaises(ConfigError):
            self._cfg.get(['non-existing'], optional=False)

    def test_optional(self) -> None:
        self.assertIsNone(self._cfg.get(['non-existing']))
        self.assertEqual(self._cfg.get(['non-existing'], entry_type=int), 0)
        self.assertEqual(self._cfg.get(['non-existing'], entry_type=str), '')
        self.assertEqual(self._cfg.get(['non-existing'], entry_type=list), [])

    def test_list(self) -> None:
        with self.assertRaises(ConfigError):
            self._cfg.get(['list'])
        with patch.dict('os.environ', {'PW_EMU_TEST_ENV_SUBST': 'b'}):
            self.assertEqual(self._cfg.get(['list']), ['a', 'b', 'c'])

    def test_bad_subst(self) -> None:
        with self.assertRaises(ConfigError):
            self._cfg.get(['bad-subst-type'])


if __name__ == '__main__':
    unittest.main()