• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Tests for pw_console.log_store"""
15
16import logging
17import unittest
18from unittest.mock import MagicMock
19
20from pw_console.log_store import LogStore
21from pw_console.console_prefs import ConsolePrefs
22
23
24def _create_log_store():
25    log_store = LogStore(
26        prefs=ConsolePrefs(
27            project_file=False, project_user_file=False, user_file=False
28        )
29    )
30
31    assert not log_store.table.prefs.show_python_file
32    viewer = MagicMock()
33    viewer.new_logs_arrived = MagicMock()
34    log_store.register_viewer(viewer)
35    return log_store, viewer
36
37
38class TestLogStore(unittest.TestCase):
39    """Tests for LogStore."""
40
41    def setUp(self):
42        self.maxDiff = None  # pylint: disable=invalid-name
43
44    def test_get_total_count(self) -> None:
45        log_store, viewer = _create_log_store()
46        test_log = logging.getLogger('log_store.test')
47        # Must use the assertLogs context manager and the addHandler call.
48        with self.assertLogs(test_log, level='DEBUG') as log_context:
49            test_log.addHandler(log_store)
50            for i in range(5):
51                test_log.debug('Test log %s', i)
52
53        # Expected log message content
54        self.assertEqual(
55            ['DEBUG:log_store.test:Test log {}'.format(i) for i in range(5)],
56            log_context.output,
57        )
58        # LogStore state checks
59        viewer.new_logs_arrived.assert_called()
60        self.assertEqual(5, log_store.get_total_count())
61        self.assertEqual(4, log_store.get_last_log_index())
62
63        log_store.clear_logs()
64        self.assertEqual(0, log_store.get_total_count())
65
66    def test_channel_counts_and_prefix_width(self) -> None:
67        """Test logger names and prefix width calculations."""
68        log_store, _viewer = _create_log_store()
69
70        # Log some messagse on 3 separate logger instances
71        for i, logger_name in enumerate(
72            [
73                'log_store.test',
74                'log_store.dev',
75                'log_store.production',
76            ]
77        ):
78            test_log = logging.getLogger(logger_name)
79            with self.assertLogs(test_log, level='DEBUG') as _log_context:
80                test_log.addHandler(log_store)
81                test_log.debug('Test log message')
82                for j in range(i):
83                    test_log.debug('%s', j)
84
85        self.assertEqual(
86            {
87                'log_store.test': 1,
88                'log_store.dev': 2,
89                'log_store.production': 3,
90            },
91            log_store.channel_counts,
92        )
93        self.assertEqual(
94            'log_store.test: 1, log_store.dev: 2, log_store.production: 3',
95            log_store.get_channel_counts(),
96        )
97
98        self.assertRegex(
99            log_store.logs[0].ansi_stripped_log,
100            r'[0-9]{8} [0-9]{2}:[0-9]{2}:[0-9]{2} DEBUG Test log message',
101        )
102        self.assertEqual(
103            {
104                'log_store.test': 0,
105                'log_store.dev': 0,
106                'log_store.production': 0,
107            },
108            log_store.channel_formatted_prefix_widths,
109        )
110
111    def test_render_table_header_with_metadata(self) -> None:
112        log_store, _viewer = _create_log_store()
113        test_log = logging.getLogger('log_store.test')
114
115        # Log table with extra columns
116        with self.assertLogs(test_log, level='DEBUG') as _log_context:
117            test_log.addHandler(log_store)
118            test_log.debug(
119                'Test log %s',
120                extra=dict(
121                    extra_metadata_fields={
122                        'planet': 'Jupiter',
123                        'galaxy': 'Milky Way',
124                    }
125                ),
126            )
127
128        self.assertEqual(
129            [
130                ('bold', 'Time             '),
131                ('', '  '),
132                ('bold', 'Level'),
133                ('', '  '),
134                ('bold', 'Planet '),
135                ('', '  '),
136                ('bold', 'Galaxy   '),
137                ('', '  '),
138                ('bold', 'Message'),
139            ],
140            log_store.render_table_header(),
141        )
142
143
144if __name__ == '__main__':
145    unittest.main()
146