• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# Copyright 2023 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Tests for core / infrastructure code."""
16
17import copy
18import io
19import json
20import os
21import socket
22import sys
23import tempfile
24import time
25import unittest
26from pathlib import Path
27from typing import Any
28
29from unittest.mock import patch
30
31from pw_emu.core import (
32    AlreadyRunning,
33    Config,
34    ConfigError,
35    Handles,
36    InvalidTarget,
37    InvalidChannelName,
38    InvalidChannelType,
39    Launcher,
40)
41from mock_emu_frontend import _mock_emu
42from config_helper import ConfigHelper
43
44
45class ConfigHelperWithLauncher(ConfigHelper):
46    def setUp(self) -> None:
47        super().setUp()
48        self._launcher = Launcher.get('mock-emu', Path(self._config_file))
49
50
51class TestInvalidTarget(ConfigHelperWithLauncher):
52    """Check that InvalidTarget is raised with an empty config."""
53
54    _config: dict[str, Any] = {
55        'emulators': {
56            'mock-emu': {
57                'launcher': 'mock_emu_frontend.MockEmuLauncher',
58                'connector': 'mock_emu_frontend.MockEmuConnector',
59            }
60        },
61    }
62
63    def test_invalid_target(self) -> None:
64        with self.assertRaises(InvalidTarget):
65            self._launcher.start(Path(self._wdir.name), 'test-target')
66
67
68class TestStart(ConfigHelperWithLauncher):
69    """Start tests for valid config."""
70
71    _config: dict[str, Any] = {
72        'emulators': {
73            'mock-emu': {
74                'launcher': 'mock_emu_frontend.MockEmuLauncher',
75                'connector': 'mock_emu_frontend.MockEmuConnector',
76            }
77        },
78        'targets': {'test-target': {'mock-emu': {}}},
79    }
80
81    def setUp(self) -> None:
82        super().setUp()
83        self._connector = self._launcher.start(
84            Path(self._wdir.name), 'test-target'
85        )
86
87    def tearDown(self) -> None:
88        self._connector.stop()
89        super().tearDown()
90
91    def test_running(self) -> None:
92        self.assertTrue(self._connector.running())
93
94    def test_pid_file(self) -> None:
95        self.assertTrue(
96            os.path.exists(os.path.join(self._wdir.name, 'mock-emu.pid'))
97        )
98
99    def test_handles_file(self) -> None:
100        self.assertTrue(
101            os.path.exists(os.path.join(self._wdir.name, 'handles.json'))
102        )
103
104    def test_already_running(self) -> None:
105        with self.assertRaises(AlreadyRunning):
106            self._launcher.start(Path(self._wdir.name), 'test-target')
107
108    def test_log(self) -> None:
109        exp = 'starting mock emulator'
110        path = os.path.join(self._wdir.name, 'mock-emu.log')
111        deadline = time.monotonic() + 100
112        while os.path.getsize(path) < len(exp):
113            time.sleep(0.1)
114            if time.monotonic() > deadline:
115                break
116
117        with open(os.path.join(self._wdir.name, 'mock-emu.log'), 'rt') as file:
118            data = file.read()
119            self.assertTrue(exp in data, data)
120
121
122class TestPrePostStartCmds(ConfigHelperWithLauncher):
123    """Tests for configurations with pre-start commands."""
124
125    _config: dict[str, Any] = {
126        'emulators': {
127            'mock-emu': {
128                'launcher': 'mock_emu_frontend.MockEmuLauncher',
129                'connector': 'mock_emu_frontend.MockEmuConnector',
130            }
131        },
132        'targets': {
133            'test-target': {
134                'pre-start-cmds': {
135                    'pre-1': _mock_emu + ['pre-1'],
136                    'pre-2': _mock_emu + ['$pw_emu_wdir{pre-2}'],
137                },
138                'post-start-cmds': {
139                    'post-1': _mock_emu
140                    + ['$pw_emu_channel_path{test_subst_pty}'],
141                    'post-2': _mock_emu
142                    + [
143                        '$pw_emu_channel_host{test_subst_tcp}',
144                        '$pw_emu_channel_port{test_subst_tcp}',
145                    ],
146                },
147                'mock-emu': {},
148            }
149        },
150    }
151
152    def setUp(self) -> None:
153        super().setUp()
154        self._connector = self._launcher.start(
155            Path(self._wdir.name), 'test-target'
156        )
157
158    def tearDown(self) -> None:
159        self._connector.stop()
160        super().tearDown()
161
162    def test_running(self) -> None:
163        for proc in self._connector.get_procs().keys():
164            self.assertTrue(self._connector.proc_running(proc))
165
166    def test_stop(self) -> None:
167        self._connector.stop()
168        for proc in self._connector.get_procs().keys():
169            self.assertFalse(self._connector.proc_running(proc))
170
171    def test_pid_files(self) -> None:
172        for proc in ['pre-1', 'pre-2', 'post-1', 'post-2']:
173            self.assertTrue(
174                os.path.exists(os.path.join(self._wdir.name, f'{proc}.pid'))
175            )
176
177    def test_logs(self):
178        expect = {
179            'pre-1.log': 'pre-1',
180            'pre-2.log': os.path.join(self._wdir.name, 'pre-2'),
181            'post-1.log': 'pty-path',
182            'post-2.log': 'localhost 1234',
183        }
184
185        for log, pattern in expect.items():
186            path = os.path.join(self._wdir.name, log)
187            deadline = time.monotonic() + 100
188            while os.path.getsize(path) < len(pattern):
189                time.sleep(0.1)
190                if time.monotonic() > deadline:
191                    break
192            with open(os.path.join(self._wdir.name, log)) as file:
193                data = file.read()
194            self.assertTrue(pattern in data, f'`{pattern}` not in `{data}`')
195
196
197class TestStop(ConfigHelperWithLauncher):
198    """Stop tests for valid config."""
199
200    _config: dict[str, Any] = {
201        'emulators': {
202            'mock-emu': {
203                'launcher': 'mock_emu_frontend.MockEmuLauncher',
204                'connector': 'mock_emu_frontend.MockEmuConnector',
205            }
206        },
207        'targets': {'test-target': {'mock-emu': {}}},
208    }
209
210    def setUp(self) -> None:
211        super().setUp()
212        self._connector = self._launcher.start(
213            Path(self._wdir.name), 'test-target'
214        )
215        self._connector.stop()
216
217    def test_pid_files(self) -> None:
218        self.assertFalse(
219            os.path.exists(os.path.join(self._wdir.name, 'emu.pid'))
220        )
221
222    def test_target_file(self) -> None:
223        self.assertFalse(
224            os.path.exists(os.path.join(self._wdir.name, 'target'))
225        )
226
227    def test_running(self) -> None:
228        self.assertFalse(self._connector.running())
229
230
231class TestChannels(ConfigHelperWithLauncher):
232    """Test Connector channels APIs."""
233
234    _config: dict[str, Any] = {
235        'emulators': {
236            'mock-emu': {
237                'launcher': 'mock_emu_frontend.MockEmuLauncher',
238                'connector': 'mock_emu_frontend.MockEmuConnector',
239            }
240        },
241        'mock-emu': {
242            'tcp_channel': True,
243            'pty_channel': sys.platform != 'win32',
244        },
245        'targets': {'test-target': {'mock-emu': {}}},
246    }
247
248    def setUp(self) -> None:
249        super().setUp()
250        self._connector = self._launcher.start(
251            Path(self._wdir.name), 'test-target'
252        )
253
254    def tearDown(self) -> None:
255        self._connector.stop()
256        super().tearDown()
257
258    def test_tcp_channel_addr(self) -> None:
259        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
260        sock.connect(self._connector.get_channel_addr('tcp'))
261        sock.close()
262
263    def test_get_channel_type(self) -> None:
264        self.assertEqual(self._connector.get_channel_type('tcp'), 'tcp')
265        if sys.platform != 'win32':
266            self.assertEqual(self._connector.get_channel_type('pty'), 'pty')
267        with self.assertRaises(InvalidChannelName):
268            self._connector.get_channel_type('invalid channel')
269
270    def test_pty_channel_path(self) -> None:
271        if sys.platform == 'win32':
272            self.skipTest('pty not supported on win32')
273        self.assertTrue(os.path.exists(self._connector.get_channel_path('pty')))
274        with self.assertRaises(InvalidChannelType):
275            self._connector.get_channel_path('tcp')
276
277    def _test_stream(self, stream: io.RawIOBase) -> None:
278        for char in [b'1', b'2', b'3', b'4']:
279            stream.write(char)
280            self.assertEqual(stream.read(1), char)
281
282    def test_tcp_stream(self) -> None:
283        with self._connector.get_channel_stream('tcp') as stream:
284            self._test_stream(stream)
285
286    def test_pty_stream(self) -> None:
287        if sys.platform == 'win32':
288            self.skipTest('pty not supported on win32')
289        with self._connector.get_channel_stream('pty') as stream:
290            self._test_stream(stream)
291
292
293class TestTargetFragments(unittest.TestCase):
294    """Tests for configurations using target fragments."""
295
296    _config_templ: dict[str, Any] = {
297        'pw': {
298            'pw_emu': {
299                'target_files': [],
300                'targets': {
301                    'test-target': {
302                        'test-key': 'test-value',
303                    }
304                },
305            }
306        }
307    }
308
309    _tf1_config: dict[str, Any] = {
310        'targets': {
311            'test-target1': {},
312            'test-target': {
313                'test-key': 'test-value-file1',
314            },
315        }
316    }
317
318    _tf2_config: dict[str, Any] = {'targets': {'test-target2': {}}}
319
320    def setUp(self) -> None:
321        with tempfile.NamedTemporaryFile(
322            'wt', delete=False
323        ) as config_file, tempfile.NamedTemporaryFile(
324            'wt', delete=False
325        ) as targets1_file, tempfile.NamedTemporaryFile(
326            'wt', delete=False
327        ) as targets2_file:
328            self._config_path = config_file.name
329            self._targets1_path = targets1_file.name
330            self._targets2_path = targets2_file.name
331            json.dump(self._tf1_config, targets1_file)
332            json.dump(self._tf2_config, targets2_file)
333            config = copy.deepcopy(self._config_templ)
334            config['pw']['pw_emu']['target_files'].append(self._targets1_path)
335            config['pw']['pw_emu']['target_files'].append(self._targets2_path)
336            json.dump(config, config_file)
337        self._config = Config(Path(self._config_path))
338
339    def tearDown(self) -> None:
340        os.unlink(self._config_path)
341        os.unlink(self._targets1_path)
342        os.unlink(self._targets2_path)
343
344    def test_targets_loaded(self) -> None:
345        self.assertIsNotNone(self._config.get(['targets', 'test-target']))
346        self.assertIsNotNone(self._config.get(['targets', 'test-target1']))
347        self.assertIsNotNone(self._config.get(['targets', 'test-target2']))
348
349    def test_targets_priority(self) -> None:
350        self.assertEqual(
351            self._config.get(['targets', 'test-target', 'test-key']),
352            'test-value',
353        )
354
355
356class TestHandles(unittest.TestCase):
357    """Tests for Handles."""
358
359    _config = {
360        'emu': 'mock-emu',
361        'config': 'test-config',
362        'target': 'test-target',
363        'gdb_cmd': ['test-gdb'],
364        'channels': {
365            'tcp_chan': {'type': 'tcp', 'host': 'localhost', 'port': 1234},
366            'pty_chan': {'type': 'pty', 'path': 'path'},
367        },
368        'procs': {'proc0': {'pid': 1983}, 'proc1': {'pid': 1234}},
369    }
370
371    def test_serialize(self):
372        handles = Handles('mock-emu', 'test-config')
373        handles.add_channel_tcp('tcp_chan', 'localhost', 1234)
374        handles.add_channel_pty('pty_chan', 'path')
375        handles.add_proc('proc0', 1983)
376        handles.add_proc('proc1', 1234)
377        handles.set_target('test-target')
378        handles.set_gdb_cmd(['test-gdb'])
379        tmp = tempfile.TemporaryDirectory()
380        handles.save(Path(tmp.name))
381        with open(os.path.join(tmp.name, 'handles.json'), 'rt') as file:
382            self.assertTrue(json.load(file) == self._config)
383        tmp.cleanup()
384
385    def test_load(self):
386        tmp = tempfile.TemporaryDirectory()
387        with open(os.path.join(tmp.name, 'handles.json'), 'wt') as file:
388            json.dump(self._config, file)
389        handles = Handles.load(Path(tmp.name))
390        self.assertEqual(handles.emu, 'mock-emu')
391        self.assertEqual(handles.gdb_cmd, ['test-gdb'])
392        self.assertEqual(handles.target, 'test-target')
393        self.assertEqual(handles.config, 'test-config')
394        tmp.cleanup()
395
396
397class TestConfig(ConfigHelper):
398    """Stop tests for valid config."""
399
400    _config: dict[str, Any] = {
401        'top': 'entry',
402        'multi': {
403            'level': {
404                'entry': 0,
405            },
406        },
407        'subst': 'a/$pw_env{PW_EMU_TEST_ENV_SUBST}/c',
408        'targets': {
409            'test-target': {
410                'entry': [1, 2, 3],
411                'mock-emu': {
412                    'entry': 'test',
413                },
414            }
415        },
416        'mock-emu': {
417            'executable': _mock_emu,
418        },
419        'list': ['a', '$pw_env{PW_EMU_TEST_ENV_SUBST}', 'c'],
420        'bad-subst-type': '$pw_bad_subst_type{test}',
421    }
422
423    def setUp(self) -> None:
424        super().setUp()
425        self._cfg = Config(Path(self._config_file), 'test-target', 'mock-emu')
426
427    def test_top_entry(self) -> None:
428        self.assertEqual(self._cfg.get(['top']), 'entry')
429
430    def test_empty_subst(self) -> None:
431        with self.assertRaises(ConfigError):
432            self._cfg.get(['subst'])
433
434    def test_subst(self) -> None:
435        with patch.dict('os.environ', {'PW_EMU_TEST_ENV_SUBST': 'b'}):
436            self.assertEqual(self._cfg.get(['subst']), 'a/b/c')
437
438    def test_multi_level_entry(self) -> None:
439        self.assertEqual(self._cfg.get(['multi', 'level', 'entry']), 0)
440
441    def test_get_target(self) -> None:
442        self.assertEqual(self._cfg.get_targets(), ['test-target'])
443
444    def test_target(self) -> None:
445        self.assertEqual(self._cfg.get_target(['entry']), [1, 2, 3])
446
447    def test_target_emu(self) -> None:
448        self.assertEqual(self._cfg.get_target_emu(['entry']), 'test')
449
450    def test_type_checking(self) -> None:
451        with self.assertRaises(ConfigError):
452            self._cfg.get(['top'], entry_type=int)
453        self._cfg.get(['top'], entry_type=str)
454        self._cfg.get_target(['entry'], entry_type=list)
455        self._cfg.get_target_emu(['entry'], entry_type=str)
456        self._cfg.get(['targets'], entry_type=dict)
457
458    def test_non_optional(self) -> None:
459        with self.assertRaises(ConfigError):
460            self._cfg.get(['non-existing'], optional=False)
461
462    def test_optional(self) -> None:
463        self.assertEqual(self._cfg.get(['non-existing']), None)
464        self.assertEqual(self._cfg.get(['non-existing'], entry_type=int), 0)
465        self.assertEqual(self._cfg.get(['non-existing'], entry_type=str), '')
466        self.assertEqual(self._cfg.get(['non-existing'], entry_type=list), [])
467
468    def test_list(self) -> None:
469        with self.assertRaises(ConfigError):
470            self._cfg.get(['list'])
471        with patch.dict('os.environ', {'PW_EMU_TEST_ENV_SUBST': 'b'}):
472            self.assertEqual(self._cfg.get(['list']), ['a', 'b', 'c'])
473
474    def test_bad_subst(self) -> None:
475        with self.assertRaises(ConfigError):
476            self._cfg.get(['bad-subst-type'])
477
478
479if __name__ == '__main__':
480    unittest.main()
481