# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for pw_console.log_store"""

import logging
import unittest
from unittest.mock import MagicMock

from pw_console.log_store import LogStore
from pw_console.console_prefs import ConsolePrefs


def _create_log_store():
    """Create a LogStore with a single mock viewer registered on it.

    The ConsolePrefs instance is constructed with project_file,
    project_user_file and user_file all set to False (presumably so that no
    on-disk configuration influences the test -- confirm against
    ConsolePrefs).

    Returns:
      A (log_store, viewer) tuple where viewer is the MagicMock that was
      passed to log_store.register_viewer().
    """
    log_store = LogStore(
        prefs=ConsolePrefs(
            project_file=False, project_user_file=False, user_file=False
        )
    )

    # Sanity check: the expectations below assume the python file column is
    # hidden in the table prefs.
    assert not log_store.table.prefs.show_python_file
    viewer = MagicMock()
    viewer.new_logs_arrived = MagicMock()
    log_store.register_viewer(viewer)
    return log_store, viewer


class TestLogStore(unittest.TestCase):
    """Tests for LogStore."""

    def setUp(self) -> None:
        # Show full diffs when an assertEqual on a large structure fails.
        self.maxDiff = None  # pylint: disable=invalid-name

    def test_get_total_count(self) -> None:
        """Test log counting, last index tracking and clear_logs()."""
        log_store, viewer = _create_log_store()
        test_log = logging.getLogger('log_store.test')
        # Must use the assertLogs context manager and the addHandler call.
        # The LogStore is attached as a handler inside the assertLogs block so
        # that it receives the records emitted while the block is active.
        with self.assertLogs(test_log, level='DEBUG') as log_context:
            test_log.addHandler(log_store)
            for i in range(5):
                test_log.debug('Test log %s', i)

        # Expected log message content
        self.assertEqual(
            [f'DEBUG:log_store.test:Test log {i}' for i in range(5)],
            log_context.output,
        )
        # LogStore state checks
        viewer.new_logs_arrived.assert_called()
        self.assertEqual(5, log_store.get_total_count())
        self.assertEqual(4, log_store.get_last_log_index())

        log_store.clear_logs()
        self.assertEqual(0, log_store.get_total_count())

    def test_channel_counts_and_prefix_width(self) -> None:
        """Test logger names and prefix width calculations."""
        log_store, _viewer = _create_log_store()

        # Log some messages on 3 separate logger instances. The logger at
        # position i emits 1 + i messages, giving counts of 1, 2 and 3.
        logger_names = [
            'log_store.test',
            'log_store.dev',
            'log_store.production',
        ]
        for i, logger_name in enumerate(logger_names):
            test_log = logging.getLogger(logger_name)
            with self.assertLogs(test_log, level='DEBUG'):
                test_log.addHandler(log_store)
                test_log.debug('Test log message')
                for j in range(i):
                    test_log.debug('%s', j)

        self.assertEqual(
            {
                'log_store.test': 1,
                'log_store.dev': 2,
                'log_store.production': 3,
            },
            log_store.channel_counts,
        )
        self.assertEqual(
            'log_store.test: 1, log_store.dev: 2, log_store.production: 3',
            log_store.get_channel_counts(),
        )

        # NOTE(review): whitespace inside this pattern is significant; confirm
        # it matches the separators LogStore actually emits.
        self.assertRegex(
            log_store.logs[0].ansi_stripped_log,
            r'[0-9]{8} [0-9]{2}:[0-9]{2}:[0-9]{2} DEBUG Test log message',
        )
        self.assertEqual(
            {
                'log_store.test': 0,
                'log_store.dev': 0,
                'log_store.production': 0,
            },
            log_store.channel_formatted_prefix_widths,
        )

    def test_render_table_header_with_metadata(self) -> None:
        """Test that extra metadata fields show up as table header columns."""
        log_store, _viewer = _create_log_store()
        test_log = logging.getLogger('log_store.test')

        # Log table with extra columns
        with self.assertLogs(test_log, level='DEBUG'):
            test_log.addHandler(log_store)
            test_log.debug(
                'Test log',
                extra={
                    'extra_metadata_fields': {
                        'planet': 'Jupiter',
                        'galaxy': 'Milky Way',
                    }
                },
            )

        # NOTE(review): the padding inside each header cell must match the
        # column widths computed by LogStore; confirm the exact spacing.
        self.assertEqual(
            [
                ('bold', 'Time '),
                ('', ' '),
                ('bold', 'Level'),
                ('', ' '),
                ('bold', 'Planet '),
                ('', ' '),
                ('bold', 'Galaxy '),
                ('', ' '),
                ('bold', 'Message'),
            ],
            log_store.render_table_header(),
        )


if __name__ == '__main__':
    unittest.main()