• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Tests for pw_console.log_view"""
15
16import logging
17import time
18import sys
19import unittest
20from datetime import datetime
21from unittest.mock import MagicMock, patch
22from parameterized import parameterized  # type: ignore
23from prompt_toolkit.data_structures import Point
24
25from pw_console.console_prefs import ConsolePrefs
26from pw_console.log_view import LogView
27from pw_console.log_screen import ScreenLine
28from pw_console.text_formatting import (
29    flatten_formatted_text_tuples,
30    join_adjacent_style_tuples,
31)
32
33_PYTHON_3_8 = sys.version_info >= (
34    3,
35    8,
36)
37
38
39def _create_log_view():
40    log_pane = MagicMock()
41    log_pane.pane_resized = MagicMock(return_value=True)
42    log_pane.current_log_pane_width = 80
43    log_pane.current_log_pane_height = 10
44
45    application = MagicMock()
46    application.prefs = ConsolePrefs(
47        project_file=False, project_user_file=False, user_file=False
48    )
49    application.prefs.reset_config()
50    log_view = LogView(log_pane, application)
51    return log_view, log_pane
52
53
54class TestLogView(unittest.TestCase):
55    """Tests for LogView."""
56
57    # pylint: disable=invalid-name
58    def setUp(self):
59        self.maxDiff = None
60
61    # pylint: enable=invalid-name
62
63    def _create_log_view_with_logs(self, log_count=100):
64        log_view, log_pane = _create_log_view()
65
66        if log_count > 0:
67            test_log = logging.getLogger('log_view.test')
68            with self.assertLogs(test_log, level='DEBUG') as _log_context:
69                test_log.addHandler(log_view.log_store)
70                for i in range(log_count):
71                    test_log.debug('Test log %s', i)
72
73        return log_view, log_pane
74
75    def test_follow_toggle(self) -> None:
76        log_view, _pane = _create_log_view()
77        self.assertTrue(log_view.follow)
78        log_view.toggle_follow()
79        self.assertFalse(log_view.follow)
80
81    def test_follow_scrolls_to_bottom(self) -> None:
82        log_view, _pane = _create_log_view()
83        log_view.toggle_follow()
84        _fragments = log_view.render_content()
85        self.assertFalse(log_view.follow)
86        self.assertEqual(log_view.get_current_line(), 0)
87
88        test_log = logging.getLogger('log_view.test')
89
90        # Log 5 messagse, current_line should stay at 0
91        with self.assertLogs(test_log, level='DEBUG') as _log_context:
92            test_log.addHandler(log_view.log_store)
93            for i in range(5):
94                test_log.debug('Test log %s', i)
95        _fragments = log_view.render_content()
96
97        self.assertEqual(log_view.get_total_count(), 5)
98        self.assertEqual(log_view.get_current_line(), 0)
99        # Turn follow on
100        log_view.toggle_follow()
101        self.assertTrue(log_view.follow)
102
103        # Log another messagse, current_line should move to the last.
104        with self.assertLogs(test_log, level='DEBUG') as _log_context:
105            test_log.addHandler(log_view.log_store)
106            test_log.debug('Test log')
107        _fragments = log_view.render_content()
108
109        self.assertEqual(log_view.get_total_count(), 6)
110        self.assertEqual(log_view.get_current_line(), 5)
111
112    def test_scrolling(self) -> None:
113        """Test all scrolling methods."""
114        log_view, log_pane = self._create_log_view_with_logs(log_count=100)
115
116        # Page scrolling needs to know the current window height.
117        log_pane.pane_resized = MagicMock(return_value=True)
118        log_pane.current_log_pane_width = 80
119        log_pane.current_log_pane_height = 10
120
121        log_view.render_content()
122
123        # Follow is on by default, current line should be at the end.
124        self.assertEqual(log_view.get_current_line(), 99)
125
126        # Move to the beginning.
127        log_view.scroll_to_top()
128        log_view.render_content()
129        self.assertEqual(log_view.get_current_line(), 0)
130
131        # Should not be able to scroll before the beginning.
132        log_view.scroll_up()
133        log_view.render_content()
134        self.assertEqual(log_view.get_current_line(), 0)
135        log_view.scroll_up_one_page()
136        log_view.render_content()
137        self.assertEqual(log_view.get_current_line(), 0)
138
139        # Single and multi line movement.
140        log_view.scroll_down()
141        log_view.render_content()
142        self.assertEqual(log_view.get_current_line(), 1)
143        log_view.scroll(5)
144        log_view.render_content()
145        self.assertEqual(log_view.get_current_line(), 6)
146        log_view.scroll_up()
147        log_view.render_content()
148        self.assertEqual(log_view.get_current_line(), 5)
149
150        # Page down and up.
151        log_view.scroll_down_one_page()
152        self.assertEqual(log_view.get_current_line(), 15)
153
154        log_view.scroll_up_one_page()
155        self.assertEqual(log_view.get_current_line(), 5)
156
157        # Move to the end.
158        log_view.scroll_to_bottom()
159        log_view.render_content()
160        self.assertEqual(log_view.get_current_line(), 99)
161
162        # Should not be able to scroll beyond the end.
163        log_view.scroll_down()
164        log_view.render_content()
165        self.assertEqual(log_view.get_current_line(), 99)
166        log_view.scroll_down_one_page()
167        log_view.render_content()
168        self.assertEqual(log_view.get_current_line(), 99)
169
170        # Move up a bit to turn off follow
171        self.assertEqual(log_view.log_screen.cursor_position, 9)
172        log_view.scroll(-1)
173        self.assertEqual(log_view.log_screen.cursor_position, 8)
174        log_view.render_content()
175        self.assertEqual(log_view.get_current_line(), 98)
176
177        # Simulate a mouse click to scroll.
178        # Click 1 lines from the top of the window.
179        log_view.scroll_to_position(Point(0, 1))
180        log_view.render_content()
181        self.assertEqual(log_view.get_current_line(), 90)
182
183        # Disable follow mode if mouse click on line.
184        log_view.toggle_follow()
185        log_view.render_content()
186        self.assertTrue(log_view.follow)
187        self.assertEqual(log_view.get_current_line(), 99)
188        log_view.scroll_to_position(Point(0, 5))
189        log_view.render_content()
190        self.assertEqual(log_view.get_current_line(), 95)
191        self.assertFalse(log_view.follow)
192
193    def test_render_content_and_cursor_position(self) -> None:
194        """Test render_content results and get_cursor_position
195
196        get_cursor_position() should return the correct position depending on
197        what line is selected."""
198
199        # Mock time to always return the same value.
200        mock_time = MagicMock(  # type: ignore
201            return_value=time.mktime(datetime(2021, 7, 13, 0, 0, 0).timetuple())
202        )
203        with patch('time.time', new=mock_time):
204            log_view, log_pane = self._create_log_view_with_logs(log_count=4)
205
206        # Mock needed LogPane functions that pull info from prompt_toolkit.
207        log_pane.get_horizontal_scroll_amount = MagicMock(return_value=0)
208        log_pane.current_log_pane_width = 80
209        log_pane.current_log_pane_height = 10
210
211        log_view.render_content()
212        log_view.scroll_to_top()
213        log_view.render_content()
214        # Scroll to top keeps the cursor on the bottom of the window.
215        self.assertEqual(log_view.get_cursor_position(), Point(x=0, y=9))
216
217        log_view.scroll_to_bottom()
218        log_view.render_content()
219        self.assertEqual(log_view.get_cursor_position(), Point(x=0, y=9))
220
221        expected_formatted_text = [
222            ('', ''),
223            ('class:log-time', '20210713 00:00:00'),
224            ('', '  '),
225            ('class:log-level-10', 'DEBUG'),
226            ('', '  Test log 0'),
227            ('class:log-time', '20210713 00:00:00'),
228            ('', '  '),
229            ('class:log-level-10', 'DEBUG'),
230            ('', '  Test log 1'),
231            ('class:log-time', '20210713 00:00:00'),
232            ('', '  '),
233            ('class:log-level-10', 'DEBUG'),
234            ('', '  Test log 2'),
235            ('class:selected-log-line class:log-time', '20210713 00:00:00'),
236            ('class:selected-log-line ', '  '),
237            ('class:selected-log-line class:log-level-10', 'DEBUG'),
238            (
239                'class:selected-log-line ',
240                '  Test log 3                                             ',
241            ),
242        ]
243
244        # pylint: disable=protected-access
245        result_text = join_adjacent_style_tuples(
246            flatten_formatted_text_tuples(log_view._line_fragment_cache)
247        )
248        # pylint: enable=protected-access
249
250        self.assertEqual(result_text, expected_formatted_text)
251
252    def test_clear_scrollback(self) -> None:
253        """Test various functions with clearing log scrollback history."""
254        # pylint: disable=protected-access
255        # Create log_view with 4 logs
256        starting_log_count = 4
257        log_view, _pane = self._create_log_view_with_logs(
258            log_count=starting_log_count
259        )
260        log_view.render_content()
261
262        # Check setup is correct
263        self.assertTrue(log_view.follow)
264        self.assertEqual(log_view.get_current_line(), 3)
265        self.assertEqual(log_view.get_total_count(), 4)
266        self.assertEqual(
267            list(
268                log.record.message for log in log_view._get_visible_log_lines()
269            ),
270            ['Test log 0', 'Test log 1', 'Test log 2', 'Test log 3'],
271        )
272
273        # Clear scrollback
274        log_view.clear_scrollback()
275        log_view.render_content()
276        # Follow is still on
277        self.assertTrue(log_view.follow)
278        self.assertEqual(log_view.hidden_line_count(), 4)
279        # Current line index should stay the same
280        self.assertEqual(log_view.get_current_line(), 3)
281        # Total count should stay the same
282        self.assertEqual(log_view.get_total_count(), 4)
283        # No lines returned
284        self.assertEqual(
285            list(
286                log.record.message for log in log_view._get_visible_log_lines()
287            ),
288            [],
289        )
290
291        # Add Log 4 more lines
292        test_log = logging.getLogger('log_view.test')
293        with self.assertLogs(test_log, level='DEBUG') as _log_context:
294            test_log.addHandler(log_view.log_store)
295            for i in range(4):
296                test_log.debug('Test log %s', i + starting_log_count)
297        log_view.render_content()
298
299        # Current line
300        self.assertEqual(log_view.hidden_line_count(), 4)
301        self.assertEqual(log_view.get_last_log_index(), 7)
302        self.assertEqual(log_view.get_current_line(), 7)
303        self.assertEqual(log_view.get_total_count(), 8)
304        # Only the last 4 logs should appear
305        self.assertEqual(
306            list(
307                log.record.message for log in log_view._get_visible_log_lines()
308            ),
309            ['Test log 4', 'Test log 5', 'Test log 6', 'Test log 7'],
310        )
311
312        log_view.scroll_to_bottom()
313        log_view.render_content()
314        self.assertEqual(log_view.get_current_line(), 7)
315        # Turn follow back on
316        log_view.toggle_follow()
317
318        log_view.undo_clear_scrollback()
319        # Current line and total are the same
320        self.assertEqual(log_view.get_current_line(), 7)
321        self.assertEqual(log_view.get_total_count(), 8)
322        # All logs should appear
323        self.assertEqual(
324            list(
325                log.record.message for log in log_view._get_visible_log_lines()
326            ),
327            [
328                'Test log 0',
329                'Test log 1',
330                'Test log 2',
331                'Test log 3',
332                'Test log 4',
333                'Test log 5',
334                'Test log 6',
335                'Test log 7',
336            ],
337        )
338
339        log_view.scroll_to_bottom()
340        log_view.render_content()
341        self.assertEqual(log_view.get_current_line(), 7)
342
343    def test_get_line_at_cursor_position(self) -> None:
344        """Tests fuctions that rely on getting a log_index for the current
345        cursor position.
346
347        Including:
348        - LogScreen.fetch_subline_up
349        - LogScreen.fetch_subline_down
350        - LogView._update_log_index
351        """
352        # pylint: disable=protected-access
353        # Create log_view with 4 logs
354        starting_log_count = 4
355        log_view, _pane = self._create_log_view_with_logs(
356            log_count=starting_log_count
357        )
358        log_view.render_content()
359
360        # Check setup is correct
361        self.assertTrue(log_view.follow)
362        self.assertEqual(log_view.get_current_line(), 3)
363        self.assertEqual(log_view.get_total_count(), 4)
364        self.assertEqual(
365            list(
366                log.record.message for log in log_view._get_visible_log_lines()
367            ),
368            ['Test log 0', 'Test log 1', 'Test log 2', 'Test log 3'],
369        )
370
371        self.assertEqual(log_view.log_screen.cursor_position, 9)
372        # Force the cursor_position to be larger than the log_screen
373        # line_buffer.
374        log_view.log_screen.cursor_position = 10
375        # Attempt to get the current line, no exception should be raised
376        result = log_view.log_screen.get_line_at_cursor_position()
377        # Log index should be None
378        self.assertEqual(result.log_index, None)
379
380        # Force the cursor_position to be < 0. This won't produce an error but
381        # would wrap around to the beginning.
382        log_view.log_screen.cursor_position = -1
383        # Attempt to get the current line, no exception should be raised
384        result = log_view.log_screen.get_line_at_cursor_position()
385        # Result should be a blank line
386        self.assertEqual(result, ScreenLine([('', '')]))
387        # Log index should be None
388        self.assertEqual(result.log_index, None)
389
390    def test_visual_select(self) -> None:
391        """Test log line selection."""
392        log_view, log_pane = self._create_log_view_with_logs(log_count=100)
393        self.assertEqual(100, log_view.get_total_count())
394
395        # Page scrolling needs to know the current window height.
396        log_pane.pane_resized = MagicMock(return_value=True)
397        log_pane.current_log_pane_width = 80
398        log_pane.current_log_pane_height = 10
399
400        log_view.log_screen.reset_logs = MagicMock(
401            wraps=log_view.log_screen.reset_logs
402        )
403        log_view.log_screen.get_lines = MagicMock(
404            wraps=log_view.log_screen.get_lines
405        )
406
407        log_view.render_content()
408        log_view.log_screen.reset_logs.assert_called_once()
409        log_view.log_screen.get_lines.assert_called_once_with(
410            marked_logs_start=None, marked_logs_end=None
411        )
412        log_view.log_screen.get_lines.reset_mock()
413        log_view.log_screen.reset_logs.reset_mock()
414
415        self.assertIsNone(log_view.marked_logs_start)
416        self.assertIsNone(log_view.marked_logs_end)
417        log_view.visual_select_line(Point(0, 9))
418        self.assertEqual(
419            (99, 99), (log_view.marked_logs_start, log_view.marked_logs_end)
420        )
421
422        log_view.visual_select_line(Point(0, 8))
423        log_view.visual_select_line(Point(0, 7))
424        self.assertEqual(
425            (97, 99), (log_view.marked_logs_start, log_view.marked_logs_end)
426        )
427
428        log_view.clear_visual_selection()
429        self.assertIsNone(log_view.marked_logs_start)
430        self.assertIsNone(log_view.marked_logs_end)
431
432        log_view.visual_select_line(Point(0, 1))
433        log_view.visual_select_line(Point(0, 2))
434        log_view.visual_select_line(Point(0, 3))
435        log_view.visual_select_line(Point(0, 4))
436        self.assertEqual(
437            (91, 94), (log_view.marked_logs_start, log_view.marked_logs_end)
438        )
439
440        # Make sure the log screen was not re-generated.
441        log_view.log_screen.reset_logs.assert_not_called()
442        log_view.log_screen.reset_logs.reset_mock()
443
444        # Render the screen
445        log_view.render_content()
446        log_view.log_screen.reset_logs.assert_called_once()
447        # Check the visual selection was specified
448        log_view.log_screen.get_lines.assert_called_once_with(
449            marked_logs_start=91, marked_logs_end=94
450        )
451        log_view.log_screen.get_lines.reset_mock()
452        log_view.log_screen.reset_logs.reset_mock()
453
454
455if _PYTHON_3_8:
456    from unittest import IsolatedAsyncioTestCase  # type: ignore # pylint: disable=no-name-in-module
457
458    class TestLogViewFiltering(
459        IsolatedAsyncioTestCase
460    ):  # pylint: disable=undefined-variable
461        """Test LogView log filtering capabilities."""
462
463        # pylint: disable=invalid-name
464        def setUp(self):
465            self.maxDiff = None
466
467        # pylint: enable=invalid-name
468
469        def _create_log_view_from_list(self, log_messages):
470            log_view, log_pane = _create_log_view()
471
472            test_log = logging.getLogger('log_view.test')
473            with self.assertLogs(test_log, level='DEBUG') as _log_context:
474                test_log.addHandler(log_view.log_store)
475                for log, extra_arg in log_messages:
476                    test_log.debug('%s', log, extra=extra_arg)
477
478            return log_view, log_pane
479
480        @parameterized.expand(
481            [
482                (
483                    # Test name
484                    'regex filter',
485                    # Search input_text
486                    'log.*item',
487                    # input_logs
488                    [
489                        ('Log some item', dict()),
490                        ('Log another item', dict()),
491                        ('Some exception', dict()),
492                    ],
493                    # expected_matched_lines
494                    [
495                        'Log some item',
496                        'Log another item',
497                    ],
498                    # expected_match_line_numbers
499                    {0: 0, 1: 1},
500                    # expected_export_text
501                    ('  DEBUG  Log some item\n  DEBUG  Log another item\n'),
502                    None,  # field
503                    False,  # invert
504                ),
505                (
506                    # Test name
507                    'regex filter with field',
508                    # Search input_text
509                    'earth',
510                    # input_logs
511                    [
512                        (
513                            'Log some item',
514                            dict(extra_metadata_fields={'planet': 'Jupiter'}),
515                        ),
516                        (
517                            'Log another item',
518                            dict(extra_metadata_fields={'planet': 'Earth'}),
519                        ),
520                        (
521                            'Some exception',
522                            dict(extra_metadata_fields={'planet': 'Earth'}),
523                        ),
524                    ],
525                    # expected_matched_lines
526                    [
527                        'Log another item',
528                        'Some exception',
529                    ],
530                    # expected_match_line_numbers
531                    {1: 0, 2: 1},
532                    # expected_export_text
533                    (
534                        '  DEBUG  Earth    Log another item\n'
535                        '  DEBUG  Earth    Some exception\n'
536                    ),
537                    'planet',  # field
538                    False,  # invert
539                ),
540                (
541                    # Test name
542                    'regex filter with field inverted',
543                    # Search input_text
544                    'earth',
545                    # input_logs
546                    [
547                        (
548                            'Log some item',
549                            dict(extra_metadata_fields={'planet': 'Jupiter'}),
550                        ),
551                        (
552                            'Log another item',
553                            dict(extra_metadata_fields={'planet': 'Earth'}),
554                        ),
555                        (
556                            'Some exception',
557                            dict(extra_metadata_fields={'planet': 'Earth'}),
558                        ),
559                    ],
560                    # expected_matched_lines
561                    [
562                        'Log some item',
563                    ],
564                    # expected_match_line_numbers
565                    {0: 0},
566                    # expected_export_text
567                    ('  DEBUG  Jupiter  Log some item\n'),
568                    'planet',  # field
569                    True,  # invert
570                ),
571            ]
572        )
573        async def test_log_filtering(
574            self,
575            _test_name,
576            input_text,
577            input_logs,
578            expected_matched_lines,
579            expected_match_line_numbers,
580            expected_export_text,
581            field=None,
582            invert=False,
583        ) -> None:
584            """Test run log view filtering."""
585            log_view, _log_pane = self._create_log_view_from_list(input_logs)
586            log_view.render_content()
587
588            self.assertEqual(log_view.get_total_count(), len(input_logs))
589            # Apply the search and wait for the match count background task
590            log_view.new_search(input_text, invert=invert, field=field)
591            await log_view.search_match_count_task
592            self.assertEqual(
593                log_view.search_matched_lines, expected_match_line_numbers
594            )
595
596            # Apply the filter and wait for the filter background task
597            log_view.apply_filter()
598            await log_view.filter_existing_logs_task
599
600            # Do the number of logs match the expected count?
601            self.assertEqual(
602                log_view.get_total_count(), len(expected_matched_lines)
603            )
604            self.assertEqual(
605                [log.record.message for log in log_view.filtered_logs],
606                expected_matched_lines,
607            )
608
609            # Check exported text respects filtering
610            log_text = (
611                log_view._logs_to_text(  # pylint: disable=protected-access
612                    use_table_formatting=True
613                )
614            )
615            # Remove leading time from resulting logs
616            log_text_no_datetime = ''
617            for line in log_text.splitlines():
618                log_text_no_datetime += line[17:] + '\n'
619            self.assertEqual(log_text_no_datetime, expected_export_text)
620
621            # Select the bottom log line
622            log_view.render_content()
623            log_view.visual_select_line(Point(0, 9))  # Window height is 10
624            # Export to text
625            log_text = (
626                log_view._logs_to_text(  # pylint: disable=protected-access
627                    selected_lines_only=True, use_table_formatting=False
628                )
629            )
630            self.assertEqual(
631                # Remove date, time, and level
632                log_text[24:].strip(),
633                expected_matched_lines[0].strip(),
634            )
635
636            # Clear filters and check the numbe of lines is back to normal.
637            log_view.clear_filters()
638            self.assertEqual(log_view.get_total_count(), len(input_logs))
639
640
641if __name__ == '__main__':
642    unittest.main()
643