# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for pw_console.log_view"""

import logging
import time
import sys
import unittest
from datetime import datetime
from unittest.mock import MagicMock, patch
from parameterized import parameterized  # type: ignore
from prompt_toolkit.data_structures import Point

from pw_console.console_prefs import ConsolePrefs
from pw_console.log_view import LogView
from pw_console.log_screen import ScreenLine
from pw_console.text_formatting import (
    flatten_formatted_text_tuples,
    join_adjacent_style_tuples,
)

_PYTHON_3_8 = sys.version_info >= (
    3,
    8,
)


def _create_log_view():
    """Build a LogView wired to mocked pane and application objects.

    The mocked log pane reports an 80 column by 10 row window and always
    returns True from pane_resized(). The mocked application carries a
    ConsolePrefs instance created with project_file, project_user_file and
    user_file all set to False, followed by reset_config().

    Returns:
      A (log_view, log_pane) tuple.
    """
    log_pane = MagicMock()
    log_pane.pane_resized = MagicMock(return_value=True)
    log_pane.current_log_pane_width = 80
    log_pane.current_log_pane_height = 10

    application = MagicMock()
    application.prefs = ConsolePrefs(
        project_file=False, project_user_file=False, user_file=False
    )
    application.prefs.reset_config()
    log_view = LogView(log_pane, application)
    return log_view, log_pane


class TestLogView(unittest.TestCase):
    """Tests for LogView."""

    # pylint: disable=invalid-name
    def setUp(self):
        # Show complete diffs when an assertEqual on a long value fails.
        self.maxDiff = None

    # pylint: enable=invalid-name

    def _create_log_view_with_logs(self, log_count=100):
        """Create a LogView and log `log_count` DEBUG messages into it.

        Messages are emitted on the 'log_view.test' logger with the text
        'Test log <i>' for i in range(log_count).

        Args:
          log_count: Number of messages to log. Nothing is logged when this
            is 0 or less.

        Returns:
          A (log_view, log_pane) tuple.
        """
        log_view, log_pane = _create_log_view()

        # assertLogs fails if nothing is logged inside it, so skip the whole
        # block when no logs were requested.
        if log_count > 0:
            test_log = logging.getLogger('log_view.test')
            with self.assertLogs(test_log, level='DEBUG') as _log_context:
                test_log.addHandler(log_view.log_store)
                for i in range(log_count):
                    test_log.debug('Test log %s', i)

        return log_view, log_pane

    def test_follow_toggle(self) -> None:
        """Follow starts enabled and toggle_follow() turns it off."""
        log_view, _pane = _create_log_view()
        self.assertTrue(log_view.follow)
        log_view.toggle_follow()
        self.assertFalse(log_view.follow)

    def test_follow_scrolls_to_bottom(self) -> None:
        """The current line only tracks new logs while follow is enabled."""
        log_view, _pane = _create_log_view()
        log_view.toggle_follow()
        _fragments = log_view.render_content()
        self.assertFalse(log_view.follow)
        self.assertEqual(log_view.get_current_line(), 0)

        test_log = logging.getLogger('log_view.test')

        # Log 5 messages, current_line should stay at 0
        with self.assertLogs(test_log, level='DEBUG') as _log_context:
            test_log.addHandler(log_view.log_store)
            for i in range(5):
                test_log.debug('Test log %s', i)
        _fragments = log_view.render_content()

        self.assertEqual(log_view.get_total_count(), 5)
        self.assertEqual(log_view.get_current_line(), 0)
        # Turn follow on
        log_view.toggle_follow()
        self.assertTrue(log_view.follow)

        # Log another message, current_line should move to the last.
        with self.assertLogs(test_log, level='DEBUG') as _log_context:
            test_log.addHandler(log_view.log_store)
            test_log.debug('Test log')
        _fragments = log_view.render_content()

        self.assertEqual(log_view.get_total_count(), 6)
        self.assertEqual(log_view.get_current_line(), 5)

    def test_scrolling(self) -> None:
        """Test all scrolling methods."""
        log_view, log_pane = self._create_log_view_with_logs(log_count=100)

        # Page scrolling needs to know the current window height.
        log_pane.pane_resized = MagicMock(return_value=True)
        log_pane.current_log_pane_width = 80
        log_pane.current_log_pane_height = 10

        log_view.render_content()

        # Follow is on by default, current line should be at the end.
        self.assertEqual(log_view.get_current_line(), 99)

        # Move to the beginning.
        log_view.scroll_to_top()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 0)

        # Should not be able to scroll before the beginning.
        log_view.scroll_up()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 0)
        log_view.scroll_up_one_page()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 0)

        # Single and multi line movement.
        log_view.scroll_down()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 1)
        log_view.scroll(5)
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 6)
        log_view.scroll_up()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 5)

        # Page down and up.
        log_view.scroll_down_one_page()
        self.assertEqual(log_view.get_current_line(), 15)

        log_view.scroll_up_one_page()
        self.assertEqual(log_view.get_current_line(), 5)

        # Move to the end.
        log_view.scroll_to_bottom()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 99)

        # Should not be able to scroll beyond the end.
        log_view.scroll_down()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 99)
        log_view.scroll_down_one_page()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 99)

        # Move up a bit to turn off follow
        self.assertEqual(log_view.log_screen.cursor_position, 9)
        log_view.scroll(-1)
        self.assertEqual(log_view.log_screen.cursor_position, 8)
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 98)

        # Simulate a mouse click to scroll.
        # Click 1 line from the top of the window.
        log_view.scroll_to_position(Point(0, 1))
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 90)

        # Disable follow mode if mouse click on line.
        log_view.toggle_follow()
        log_view.render_content()
        self.assertTrue(log_view.follow)
        self.assertEqual(log_view.get_current_line(), 99)
        log_view.scroll_to_position(Point(0, 5))
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 95)
        self.assertFalse(log_view.follow)

    def test_render_content_and_cursor_position(self) -> None:
        """Test render_content results and get_cursor_position

        get_cursor_position() should return the correct position depending on
        what line is selected."""

        # Mock time to always return the same value.
        mock_time = MagicMock(  # type: ignore
            return_value=time.mktime(datetime(2021, 7, 13, 0, 0, 0).timetuple())
        )
        with patch('time.time', new=mock_time):
            log_view, log_pane = self._create_log_view_with_logs(log_count=4)

        # Mock needed LogPane functions that pull info from prompt_toolkit.
        log_pane.get_horizontal_scroll_amount = MagicMock(return_value=0)
        log_pane.current_log_pane_width = 80
        log_pane.current_log_pane_height = 10

        log_view.render_content()
        log_view.scroll_to_top()
        log_view.render_content()
        # Scroll to top keeps the cursor on the bottom of the window.
        self.assertEqual(log_view.get_cursor_position(), Point(x=0, y=9))

        log_view.scroll_to_bottom()
        log_view.render_content()
        self.assertEqual(log_view.get_cursor_position(), Point(x=0, y=9))

        # NOTE(review): runs of spaces inside these expected strings may have
        # been collapsed in this copy of the file (the selected line is
        # presumably padded out to the pane width). Confirm the literals
        # against a real test run before relying on them.
        expected_formatted_text = [
            ('', ''),
            ('class:log-time', '20210713 00:00:00'),
            ('', ' '),
            ('class:log-level-10', 'DEBUG'),
            ('', ' Test log 0'),
            ('class:log-time', '20210713 00:00:00'),
            ('', ' '),
            ('class:log-level-10', 'DEBUG'),
            ('', ' Test log 1'),
            ('class:log-time', '20210713 00:00:00'),
            ('', ' '),
            ('class:log-level-10', 'DEBUG'),
            ('', ' Test log 2'),
            ('class:selected-log-line class:log-time', '20210713 00:00:00'),
            ('class:selected-log-line ', ' '),
            ('class:selected-log-line class:log-level-10', 'DEBUG'),
            (
                'class:selected-log-line ',
                ' Test log 3 ',
            ),
        ]

        # pylint: disable=protected-access
        result_text = join_adjacent_style_tuples(
            flatten_formatted_text_tuples(log_view._line_fragment_cache)
        )
        # pylint: enable=protected-access

        self.assertEqual(result_text, expected_formatted_text)

    def test_clear_scrollback(self) -> None:
        """Test various functions with clearing log scrollback history."""
        # pylint: disable=protected-access
        # Create log_view with 4 logs
        starting_log_count = 4
        log_view, _pane = self._create_log_view_with_logs(
            log_count=starting_log_count
        )
        log_view.render_content()

        # Check setup is correct
        self.assertTrue(log_view.follow)
        self.assertEqual(log_view.get_current_line(), 3)
        self.assertEqual(log_view.get_total_count(), 4)
        self.assertEqual(
            [log.record.message for log in log_view._get_visible_log_lines()],
            ['Test log 0', 'Test log 1', 'Test log 2', 'Test log 3'],
        )

        # Clear scrollback
        log_view.clear_scrollback()
        log_view.render_content()
        # Follow is still on
        self.assertTrue(log_view.follow)
        self.assertEqual(log_view.hidden_line_count(), 4)
        # Current line index should stay the same
        self.assertEqual(log_view.get_current_line(), 3)
        # Total count should stay the same
        self.assertEqual(log_view.get_total_count(), 4)
        # No lines returned
        self.assertEqual(
            [log.record.message for log in log_view._get_visible_log_lines()],
            [],
        )

        # Log 4 more lines
        test_log = logging.getLogger('log_view.test')
        with self.assertLogs(test_log, level='DEBUG') as _log_context:
            test_log.addHandler(log_view.log_store)
            for i in range(4):
                test_log.debug('Test log %s', i + starting_log_count)
        log_view.render_content()

        # Current line
        self.assertEqual(log_view.hidden_line_count(), 4)
        self.assertEqual(log_view.get_last_log_index(), 7)
        self.assertEqual(log_view.get_current_line(), 7)
        self.assertEqual(log_view.get_total_count(), 8)
        # Only the last 4 logs should appear
        self.assertEqual(
            [log.record.message for log in log_view._get_visible_log_lines()],
            ['Test log 4', 'Test log 5', 'Test log 6', 'Test log 7'],
        )

        log_view.scroll_to_bottom()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 7)
        # Turn follow back on
        log_view.toggle_follow()

        log_view.undo_clear_scrollback()
        # Current line and total are the same
        self.assertEqual(log_view.get_current_line(), 7)
        self.assertEqual(log_view.get_total_count(), 8)
        # All logs should appear
        self.assertEqual(
            [log.record.message for log in log_view._get_visible_log_lines()],
            [
                'Test log 0',
                'Test log 1',
                'Test log 2',
                'Test log 3',
                'Test log 4',
                'Test log 5',
                'Test log 6',
                'Test log 7',
            ],
        )

        log_view.scroll_to_bottom()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 7)

    def test_get_line_at_cursor_position(self) -> None:
        """Tests functions that rely on getting a log_index for the current
        cursor position.

        Including:
        - LogScreen.fetch_subline_up
        - LogScreen.fetch_subline_down
        - LogView._update_log_index
        """
        # pylint: disable=protected-access
        # Create log_view with 4 logs
        starting_log_count = 4
        log_view, _pane = self._create_log_view_with_logs(
            log_count=starting_log_count
        )
        log_view.render_content()

        # Check setup is correct
        self.assertTrue(log_view.follow)
        self.assertEqual(log_view.get_current_line(), 3)
        self.assertEqual(log_view.get_total_count(), 4)
        self.assertEqual(
            [log.record.message for log in log_view._get_visible_log_lines()],
            ['Test log 0', 'Test log 1', 'Test log 2', 'Test log 3'],
        )

        self.assertEqual(log_view.log_screen.cursor_position, 9)
        # Force the cursor_position to be larger than the log_screen
        # line_buffer.
        log_view.log_screen.cursor_position = 10
        # Attempt to get the current line, no exception should be raised
        result = log_view.log_screen.get_line_at_cursor_position()
        # Log index should be None
        self.assertIsNone(result.log_index)

        # Force the cursor_position to be < 0. This won't produce an error but
        # would wrap around to the beginning.
        log_view.log_screen.cursor_position = -1
        # Attempt to get the current line, no exception should be raised
        result = log_view.log_screen.get_line_at_cursor_position()
        # Result should be a blank line
        self.assertEqual(result, ScreenLine([('', '')]))
        # Log index should be None
        self.assertIsNone(result.log_index)

    def test_visual_select(self) -> None:
        """Test log line selection."""
        log_view, log_pane = self._create_log_view_with_logs(log_count=100)
        self.assertEqual(100, log_view.get_total_count())

        # Page scrolling needs to know the current window height.
        log_pane.pane_resized = MagicMock(return_value=True)
        log_pane.current_log_pane_width = 80
        log_pane.current_log_pane_height = 10

        # Wrap the real methods so calls can be counted without changing
        # what they do.
        log_view.log_screen.reset_logs = MagicMock(
            wraps=log_view.log_screen.reset_logs
        )
        log_view.log_screen.get_lines = MagicMock(
            wraps=log_view.log_screen.get_lines
        )

        log_view.render_content()
        log_view.log_screen.reset_logs.assert_called_once()
        log_view.log_screen.get_lines.assert_called_once_with(
            marked_logs_start=None, marked_logs_end=None
        )
        log_view.log_screen.get_lines.reset_mock()
        log_view.log_screen.reset_logs.reset_mock()

        self.assertIsNone(log_view.marked_logs_start)
        self.assertIsNone(log_view.marked_logs_end)
        log_view.visual_select_line(Point(0, 9))
        self.assertEqual(
            (99, 99), (log_view.marked_logs_start, log_view.marked_logs_end)
        )

        log_view.visual_select_line(Point(0, 8))
        log_view.visual_select_line(Point(0, 7))
        self.assertEqual(
            (97, 99), (log_view.marked_logs_start, log_view.marked_logs_end)
        )

        log_view.clear_visual_selection()
        self.assertIsNone(log_view.marked_logs_start)
        self.assertIsNone(log_view.marked_logs_end)

        log_view.visual_select_line(Point(0, 1))
        log_view.visual_select_line(Point(0, 2))
        log_view.visual_select_line(Point(0, 3))
        log_view.visual_select_line(Point(0, 4))
        self.assertEqual(
            (91, 94), (log_view.marked_logs_start, log_view.marked_logs_end)
        )

        # Make sure the log screen was not re-generated.
        log_view.log_screen.reset_logs.assert_not_called()
        log_view.log_screen.reset_logs.reset_mock()

        # Render the screen
        log_view.render_content()
        log_view.log_screen.reset_logs.assert_called_once()
        # Check the visual selection was specified
        log_view.log_screen.get_lines.assert_called_once_with(
            marked_logs_start=91, marked_logs_end=94
        )
        log_view.log_screen.get_lines.reset_mock()
        log_view.log_screen.reset_logs.reset_mock()


if _PYTHON_3_8:
    from unittest import IsolatedAsyncioTestCase  # type: ignore # pylint: disable=no-name-in-module

    class TestLogViewFiltering(
        IsolatedAsyncioTestCase
    ):  # pylint: disable=undefined-variable
        """Test LogView log filtering capabilities."""

        # pylint: disable=invalid-name
        def setUp(self):
            # Show complete diffs when an assertEqual on a long value fails.
            self.maxDiff = None

        # pylint: enable=invalid-name

        def _create_log_view_from_list(self, log_messages):
            """Create a LogView and log each given message at DEBUG level.

            Args:
              log_messages: Iterable of (message, extra) pairs. Each message
                is logged on the 'log_view.test' logger with `extra` passed
                as the logging call's `extra` argument.

            Returns:
              A (log_view, log_pane) tuple.
            """
            log_view, log_pane = _create_log_view()

            test_log = logging.getLogger('log_view.test')
            with self.assertLogs(test_log, level='DEBUG') as _log_context:
                test_log.addHandler(log_view.log_store)
                for log, extra_arg in log_messages:
                    test_log.debug('%s', log, extra=extra_arg)

            return log_view, log_pane

        # NOTE(review): runs of spaces inside the expected_export_text
        # literals below may have been collapsed in this copy of the file;
        # verify them against the actual table formatted output.
        @parameterized.expand(
            [
                (
                    # Test name
                    'regex filter',
                    # Search input_text
                    'log.*item',
                    # input_logs
                    [
                        ('Log some item', {}),
                        ('Log another item', {}),
                        ('Some exception', {}),
                    ],
                    # expected_matched_lines
                    [
                        'Log some item',
                        'Log another item',
                    ],
                    # expected_match_line_numbers
                    {0: 0, 1: 1},
                    # expected_export_text
                    (' DEBUG Log some item\n DEBUG Log another item\n'),
                    None,  # field
                    False,  # invert
                ),
                (
                    # Test name
                    'regex filter with field',
                    # Search input_text
                    'earth',
                    # input_logs
                    [
                        (
                            'Log some item',
                            {'extra_metadata_fields': {'planet': 'Jupiter'}},
                        ),
                        (
                            'Log another item',
                            {'extra_metadata_fields': {'planet': 'Earth'}},
                        ),
                        (
                            'Some exception',
                            {'extra_metadata_fields': {'planet': 'Earth'}},
                        ),
                    ],
                    # expected_matched_lines
                    [
                        'Log another item',
                        'Some exception',
                    ],
                    # expected_match_line_numbers
                    {1: 0, 2: 1},
                    # expected_export_text
                    (
                        ' DEBUG Earth Log another item\n'
                        ' DEBUG Earth Some exception\n'
                    ),
                    'planet',  # field
                    False,  # invert
                ),
                (
                    # Test name
                    'regex filter with field inverted',
                    # Search input_text
                    'earth',
                    # input_logs
                    [
                        (
                            'Log some item',
                            {'extra_metadata_fields': {'planet': 'Jupiter'}},
                        ),
                        (
                            'Log another item',
                            {'extra_metadata_fields': {'planet': 'Earth'}},
                        ),
                        (
                            'Some exception',
                            {'extra_metadata_fields': {'planet': 'Earth'}},
                        ),
                    ],
                    # expected_matched_lines
                    [
                        'Log some item',
                    ],
                    # expected_match_line_numbers
                    {0: 0},
                    # expected_export_text
                    (' DEBUG Jupiter Log some item\n'),
                    'planet',  # field
                    True,  # invert
                ),
            ]
        )
        async def test_log_filtering(
            self,
            _test_name,
            input_text,
            input_logs,
            expected_matched_lines,
            expected_match_line_numbers,
            expected_export_text,
            field=None,
            invert=False,
        ) -> None:
            """Test run log view filtering."""
            log_view, _log_pane = self._create_log_view_from_list(input_logs)
            log_view.render_content()

            self.assertEqual(log_view.get_total_count(), len(input_logs))
            # Apply the search and wait for the match count background task
            log_view.new_search(input_text, invert=invert, field=field)
            await log_view.search_match_count_task
            self.assertEqual(
                log_view.search_matched_lines, expected_match_line_numbers
            )

            # Apply the filter and wait for the filter background task
            log_view.apply_filter()
            await log_view.filter_existing_logs_task

            # Do the number of logs match the expected count?
            self.assertEqual(
                log_view.get_total_count(), len(expected_matched_lines)
            )
            self.assertEqual(
                [log.record.message for log in log_view.filtered_logs],
                expected_matched_lines,
            )

            # Check exported text respects filtering
            log_text = (
                log_view._logs_to_text(  # pylint: disable=protected-access
                    use_table_formatting=True
                )
            )
            # Remove leading time from resulting logs
            log_text_no_datetime = ''.join(
                line[17:] + '\n' for line in log_text.splitlines()
            )
            self.assertEqual(log_text_no_datetime, expected_export_text)

            # Select the bottom log line
            log_view.render_content()
            log_view.visual_select_line(Point(0, 9))  # Window height is 10
            # Export to text
            log_text = (
                log_view._logs_to_text(  # pylint: disable=protected-access
                    selected_lines_only=True, use_table_formatting=False
                )
            )
            self.assertEqual(
                # Remove date, time, and level
                log_text[24:].strip(),
                expected_matched_lines[0].strip(),
            )

            # Clear filters and check the number of lines is back to normal.
            log_view.clear_filters()
            self.assertEqual(log_view.get_total_count(), len(input_logs))


if __name__ == '__main__':
    unittest.main()