• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14
15"""Log decoder tests."""
16
17from dataclasses import dataclass
18import logging
19from random import randint
20from typing import Any
21from unittest import TestCase, main
22
23from pw_log.log_decoder import (
24    Log,
25    LogStreamDecoder,
26    log_decoded_log,
27    pw_status_code_to_name,
28    timestamp_parser_ns_since_boot,
29)
30from pw_log.proto import log_pb2
31import pw_tokenizer
32
# Message whose packed form contains only the msg field (no file field).
_MESSAGE_NO_FILENAME = 'World'
_MESSAGE_AND_ARGS_NO_FILENAME = f'■msg♦{_MESSAGE_NO_FILENAME}'
_MESSAGE_TOKEN_NO_FILENAME = pw_tokenizer.tokens.pw_tokenizer_65599_hash(
    _MESSAGE_AND_ARGS_NO_FILENAME
)

# Token database handed to the decoder under test so that it can detokenize
# tokenized log entries. Each pair below is (token, detokenized string).
_TOKEN_DATABASE = pw_tokenizer.tokens.Database(
    [
        pw_tokenizer.tokens.TokenizedStringEntry(token, string)
        for token, string in (
            (0x01148A48, 'total_dropped'),
            (0x03796798, 'min_queue_remaining'),
            (0x2E668CD6, 'Jello, world!'),
            (0x329481A2, 'parser_errors'),
            (0x7F35A9A5, 'TestName'),
            (0xCC6D3131, 'Jello?'),
            (
                0x144C501D,
                '■msg♦SampleMessage■module♦MODULE■file♦file/path.cc',
            ),
            (0x0000106A, 'ModuleOrMessage'),
            (
                pw_tokenizer.tokens.pw_tokenizer_65599_hash(
                    '■msg♦World■module♦wifi■file♦/path/to/file.cc'
                ),
                '■msg♦World■module♦wifi■file♦/path/to/file.cc',
            ),
            (_MESSAGE_TOKEN_NO_FILENAME, _MESSAGE_AND_ARGS_NO_FILENAME),
        )
    ]
)
_DETOKENIZER = pw_tokenizer.Detokenizer(_TOKEN_DATABASE)
67
68
def _create_log_entry_with_tokenized_fields(
    message: str, module: str, file: str, thread: str, line: int, level: int
) -> log_pb2.LogEntry:
    """Builds a LogEntry whose string fields are sent as tokens.

    The message, module, file and thread strings are each hashed and encoded
    as a token without arguments. The line and level are packed into the
    line_level field.
    """

    def tokenize(text: str) -> bytes:
        return pw_tokenizer.encode.encode_token_and_args(
            pw_tokenizer.tokens.pw_tokenizer_65599_hash(text)
        )

    return log_pb2.LogEntry(
        message=tokenize(message),
        module=tokenize(module),
        file=tokenize(file),
        thread=tokenize(thread),
        line_level=Log.pack_line_level(line, level),
    )
93
94
def _create_random_log_entry() -> log_pb2.LogEntry:
    """Returns an untokenized LogEntry with randomized contents.

    The message number, line, level and thread number are random; the file is
    always 'main.cc'.
    """
    message = f'message {randint(1, 100)}'.encode('utf-8')
    line = randint(0, 2000)
    level = randint(logging.DEBUG, logging.CRITICAL)
    thread = f'thread {randint(1, 5)}'.encode('utf-8')
    return log_pb2.LogEntry(
        message=message,
        line_level=Log.pack_line_level(line, level),
        file=b'main.cc',
        thread=thread,
    )
104
105
def _create_drop_count_message_log_entry(
    drop_count: int, reason: str = ''
) -> log_pb2.LogEntry:
    """Returns a LogEntry that reports drop_count dropped logs.

    The message field is only set when a non-empty reason is given.
    """
    entry = log_pb2.LogEntry(dropped=drop_count)
    if not reason:
        return entry
    entry.message = reason.encode('utf-8')
    return entry
113
114
class TestLogStreamDecoderBase(TestCase):
    """Common fixture for the LogStreamDecoder test cases."""

    def setUp(self) -> None:
        """Creates a decoder that appends every decoded log to a list."""
        self.captured_logs: list[Log] = []

        def capture_log(log: Log) -> None:
            self.captured_logs.append(log)

        self.decoder = LogStreamDecoder(
            decoded_log_handler=capture_log,
            detokenizer=_DETOKENIZER,
            source_name='source',
            timestamp_parser=timestamp_parser_ns_since_boot,
            message_parser=pw_status_code_to_name,
        )

    def _captured_logs_as_str(self) -> str:
        """Returns the captured logs as text, one log per line."""
        return '\n'.join(str(log) for log in self.captured_logs)
139
140
141class TestLogStreamDecoderDecodingFunctionality(TestLogStreamDecoderBase):
142    """Tests LogStreamDecoder decoding functionality."""
143
144    def test_parse_log_entry_valid_non_tokenized(self) -> None:
145        """Test that valid LogEntry protos are parsed correctly."""
146        expected_log = Log(
147            message='Hello',
148            file_and_line='my/path/file.cc:123',
149            level=logging.INFO,
150            source_name=self.decoder.source_name,
151            timestamp='0:00',
152        )
153        result = self.decoder.parse_log_entry_proto(
154            log_pb2.LogEntry(
155                message=b'Hello',
156                file=b'my/path/file.cc',
157                line_level=Log.pack_line_level(123, logging.INFO),
158            )
159        )
160        self.assertEqual(result, expected_log)
161
162    def test_parse_log_entry_valid_packed_message(self) -> None:
163        """Test that valid LogEntry protos are parsed correctly."""
164        log_with_metadata_in_message = Log(
165            message='World',
166            file_and_line='/path/to/file.cc',
167            level=logging.DEBUG,
168            source_name=self.decoder.source_name,
169            module_name='wifi',
170            timestamp='0:00',
171        )
172        result = self.decoder.parse_log_entry_proto(
173            log_pb2.LogEntry(
174                message=bytes(
175                    '■msg♦World■module♦wifi■file♦/path/to/file.cc'.encode(
176                        'utf-8'
177                    )
178                ),
179                line_level=Log.pack_line_level(0, logging.DEBUG),
180                timestamp=100,
181            )
182        )
183        self.assertEqual(result, log_with_metadata_in_message)
184
185    def test_parse_log_entry_valid_logs_drop_message(self) -> None:
186        """Test that valid LogEntry protos are parsed correctly."""
187        dropped_message = Log(
188            message='Dropped 30 logs due to buffer too small',
189            level=logging.WARNING,
190            source_name=self.decoder.source_name,
191        )
192        result = self.decoder.parse_log_entry_proto(
193            log_pb2.LogEntry(message=b'buffer too small', dropped=30)
194        )
195        self.assertEqual(result, dropped_message)
196
197    def test_parse_log_entry_valid_tokenized(self) -> None:
198        """Test that tokenized LogEntry protos are parsed correctly."""
199        message = 'Jello, world!'
200        module_name = 'TestName'
201        file = 'parser_errors'
202        thread_name = 'Jello?'
203        line = 123
204        level = logging.INFO
205
206        expected_log = Log(
207            message=message,
208            module_name=module_name,
209            file_and_line=file + ':123',
210            level=logging.INFO,
211            source_name=self.decoder.source_name,
212            timestamp='0:00',
213            thread_name=thread_name,
214        )
215
216        log_entry = _create_log_entry_with_tokenized_fields(
217            message, module_name, file, thread_name, line, level
218        )
219        result = self.decoder.parse_log_entry_proto(log_entry)
220        self.assertEqual(result, expected_log, msg='Log was not detokenized')
221
222    def test_tokenized_contents_not_detokenized(self):
223        """Test fields with tokens not in the database are not decrypted."""
224        # The following strings do not have tokens in the device token db.
225        message_not_in_db = 'device is shutting down.'
226        module_name_not_in_db = 'Battery'
227        file_not_in_db = 'charger.cc'
228        thread_name_not_in_db = 'BatteryStatus'
229        line = 123
230        level = logging.INFO
231
232        log_entry = _create_log_entry_with_tokenized_fields(
233            message_not_in_db,
234            module_name_not_in_db,
235            file_not_in_db,
236            thread_name_not_in_db,
237            line,
238            level,
239        )
240        message = pw_tokenizer.proto.decode_optionally_tokenized(
241            _DETOKENIZER, log_entry.message
242        )
243        module = pw_tokenizer.proto.decode_optionally_tokenized(
244            _DETOKENIZER, log_entry.module
245        )
246        file = pw_tokenizer.proto.decode_optionally_tokenized(
247            _DETOKENIZER, log_entry.file
248        )
249        thread = pw_tokenizer.proto.decode_optionally_tokenized(
250            _DETOKENIZER, log_entry.thread
251        )
252        expected_log = Log(
253            message=message,
254            module_name=module,
255            file_and_line=file + ':123',
256            level=logging.INFO,
257            source_name=self.decoder.source_name,
258            timestamp='0:00',
259            thread_name=thread,
260        )
261        result = self.decoder.parse_log_entry_proto(log_entry)
262        self.assertEqual(
263            result, expected_log, msg='Log was unexpectedly detokenized'
264        )
265
266    def test_extracting_log_entry_fields_from_tokenized_metadata(self):
267        """Test that tokenized metadata can be used to extract other fields."""
268        metadata = '■msg♦World■module♦wifi■file♦/path/to/file.cc'
269        thread_name = 'M0Log'
270
271        log_entry = log_pb2.LogEntry(
272            message=pw_tokenizer.encode.encode_token_and_args(
273                pw_tokenizer.tokens.pw_tokenizer_65599_hash(metadata)
274            ),
275            line_level=Log.pack_line_level(0, logging.DEBUG),
276            thread=bytes(thread_name.encode('utf-8')),
277        )
278
279        log_with_metadata_in_message = Log(
280            message='World',
281            file_and_line='/path/to/file.cc',
282            level=logging.DEBUG,
283            source_name=self.decoder.source_name,
284            module_name='wifi',
285            timestamp='0:00',
286            thread_name=thread_name,
287        )
288
289        result = self.decoder.parse_log_entry_proto(log_entry)
290        self.assertEqual(
291            result, log_with_metadata_in_message, msg='Log was detokenized.'
292        )
293
294    def test_extracting_status_argument_from_log_message(self):
295        """Test extract status from log message."""
296        expected_log = Log(
297            message='Could not start flux capacitor: PERMISSION_DENIED',
298            file_and_line='my/path/file.cc:123',
299            level=logging.INFO,
300            source_name=self.decoder.source_name,
301            timestamp='0:00',
302        )
303        result = self.decoder.parse_log_entry_proto(
304            log_pb2.LogEntry(
305                message=b'Could not start flux capacitor: pw::Status=7',
306                file=b'my/path/file.cc',
307                line_level=Log.pack_line_level(123, logging.INFO),
308            )
309        )
310        self.assertEqual(
311            result,
312            expected_log,
313            msg='Status was not extracted from log message.',
314        )
315
316        expected_log = Log(
317            message='Error connecting to server: UNAVAILABLE',
318            file_and_line='my/path/file.cc:123',
319            level=logging.INFO,
320            source_name=self.decoder.source_name,
321            timestamp='0:00',
322        )
323        result = self.decoder.parse_log_entry_proto(
324            log_pb2.LogEntry(
325                message=b'Error connecting to server: pw::Status=14',
326                file=b'my/path/file.cc',
327                line_level=Log.pack_line_level(123, logging.INFO),
328            )
329        )
330        self.assertEqual(
331            result,
332            expected_log,
333            msg='Status was not extracted from log message.',
334        )
335
336    def test_extracting_status_with_improper_spacing(self):
337        """Test spaces before pw::Status are ignored."""
338        expected_log = Log(
339            message='Error connecting to server:UNAVAILABLE',
340            file_and_line='my/path/file.cc:123',
341            level=logging.INFO,
342            source_name=self.decoder.source_name,
343            timestamp='0:00',
344        )
345
346        result = self.decoder.parse_log_entry_proto(
347            log_pb2.LogEntry(
348                message=b'Error connecting to server:pw::Status=14',
349                file=b'my/path/file.cc',
350                line_level=Log.pack_line_level(123, logging.INFO),
351            )
352        )
353        self.assertEqual(
354            result,
355            expected_log,
356            msg='Status was not extracted from log message.',
357        )
358
359        expected_log = Log(
360            message='Error connecting to server:    UNAVAILABLE',
361            file_and_line='my/path/file.cc:123',
362            level=logging.INFO,
363            source_name=self.decoder.source_name,
364            timestamp='0:00',
365        )
366
367        result = self.decoder.parse_log_entry_proto(
368            log_pb2.LogEntry(
369                message=b'Error connecting to server:    pw::Status=14',
370                file=b'my/path/file.cc',
371                line_level=Log.pack_line_level(123, logging.INFO),
372            )
373        )
374        self.assertEqual(
375            result,
376            expected_log,
377            msg='Status was not extracted from log message.',
378        )
379
380    def test_not_extracting_status_extra_space_before_code(self):
381        """Test spaces after pw::Status are not allowed."""
382        expected_log = Log(
383            message='Could not start flux capacitor: pw::Status=    7',
384            file_and_line='my/path/file.cc:123',
385            level=logging.INFO,
386            source_name=self.decoder.source_name,
387            timestamp='0:00',
388        )
389
390        result = self.decoder.parse_log_entry_proto(
391            log_pb2.LogEntry(
392                message=b'Could not start flux capacitor: pw::Status=    7',
393                file=b'my/path/file.cc',
394                line_level=Log.pack_line_level(123, logging.INFO),
395            )
396        )
397        self.assertEqual(
398            result,
399            expected_log,
400            msg='Status was not extracted from log message.',
401        )
402
403    def test_not_extracting_status_new_line_before_code(self):
404        """Test new line characters after pw::Status are not allowed."""
405        expected_log = Log(
406            message='Could not start flux capacitor: pw::Status=\n7',
407            file_and_line='my/path/file.cc:123',
408            level=logging.INFO,
409            source_name=self.decoder.source_name,
410            timestamp='0:00',
411        )
412        result = self.decoder.parse_log_entry_proto(
413            log_pb2.LogEntry(
414                message=b'Could not start flux capacitor: pw::Status=\n7',
415                file=b'my/path/file.cc',
416                line_level=Log.pack_line_level(123, logging.INFO),
417            )
418        )
419        self.assertEqual(
420            result,
421            expected_log,
422            msg='Status was not extracted from log message.',
423        )
424
425    def test_not_extracting_status_from_log_message_with_improper_format(self):
426        """Test status not extracted from log message with incorrect format."""
427        expected_log = Log(
428            message='Could not start flux capacitor: pw::Status12',
429            file_and_line='my/path/file.cc:123',
430            level=logging.INFO,
431            source_name=self.decoder.source_name,
432            timestamp='0:00',
433        )
434        result = self.decoder.parse_log_entry_proto(
435            log_pb2.LogEntry(
436                message=b'Could not start flux capacitor: pw::Status12',
437                file=b'my/path/file.cc',
438                line_level=Log.pack_line_level(123, logging.INFO),
439            )
440        )
441        self.assertEqual(
442            result,
443            expected_log,
444            msg='Status was not extracted from log message.',
445        )
446
447    def test_status_code_in_message_does_not_exist(self):
448        """Test status does not exist in pw_status."""
449        expected_log = Log(
450            message='Could not start flux capacitor: pw::Status=17',
451            file_and_line='my/path/file.cc:123',
452            level=logging.INFO,
453            source_name=self.decoder.source_name,
454            timestamp='0:00',
455        )
456        result = self.decoder.parse_log_entry_proto(
457            log_pb2.LogEntry(
458                message=b'Could not start flux capacitor: pw::Status=17',
459                file=b'my/path/file.cc',
460                line_level=Log.pack_line_level(123, logging.INFO),
461            )
462        )
463        self.assertEqual(
464            result,
465            expected_log,
466            msg='Status was not extracted from log message.',
467        )
468
469    def test_status_code_in_message_is_negative(self):
470        """Test status code is negative."""
471        expected_log = Log(
472            message='Could not start flux capacitor: pw::Status=-1',
473            file_and_line='my/path/file.cc:123',
474            level=logging.INFO,
475            source_name=self.decoder.source_name,
476            timestamp='0:00',
477        )
478        result = self.decoder.parse_log_entry_proto(
479            log_pb2.LogEntry(
480                message=b'Could not start flux capacitor: pw::Status=-1',
481                file=b'my/path/file.cc',
482                line_level=Log.pack_line_level(123, logging.INFO),
483            )
484        )
485        self.assertEqual(
486            result,
487            expected_log,
488            msg='Status was not extracted from log message.',
489        )
490
491    def test_status_code_is_name(self):
492        """Test if the status code format includes the name instead."""
493        expected_log = Log(
494            message='Cannot use flux capacitor: pw::Status=PERMISSION_DENIED',
495            file_and_line='my/path/file.cc:123',
496            level=logging.INFO,
497            source_name=self.decoder.source_name,
498            timestamp='0:00',
499        )
500        result = self.decoder.parse_log_entry_proto(
501            log_pb2.LogEntry(
502                message=(
503                    b'Cannot use flux capacitor: pw::Status=PERMISSION_DENIED'
504                ),
505                file=b'my/path/file.cc',
506                line_level=Log.pack_line_level(123, logging.INFO),
507            )
508        )
509        self.assertEqual(
510            result,
511            expected_log,
512            msg='Status was not extracted from log message.',
513        )
514
515    def test_spelling_mistakes_with_status_keyword(self):
516        """Test spelling mistakes with status keyword."""
517        expected_log = Log(
518            message='Could not start flux capacitor: pw::Rtatus=12',
519            file_and_line='my/path/file.cc:123',
520            level=logging.INFO,
521            source_name=self.decoder.source_name,
522            timestamp='0:00',
523        )
524        result = self.decoder.parse_log_entry_proto(
525            log_pb2.LogEntry(
526                message=b'Could not start flux capacitor: pw::Rtatus=12',
527                file=b'my/path/file.cc',
528                line_level=Log.pack_line_level(123, logging.INFO),
529            )
530        )
531        self.assertEqual(
532            result,
533            expected_log,
534            msg='Status was not extracted from log message.',
535        )
536
537    def test_spelling_mistakes_with_status_keyword_lowercase_s(self):
538        """Test spelling mistakes with status keyword."""
539        expected_log = Log(
540            message='Could not start flux capacitor: pw::status=13',
541            file_and_line='my/path/file.cc:123',
542            level=logging.INFO,
543            source_name=self.decoder.source_name,
544            timestamp='0:00',
545        )
546        result = self.decoder.parse_log_entry_proto(
547            log_pb2.LogEntry(
548                message=b'Could not start flux capacitor: pw::status=13',
549                file=b'my/path/file.cc',
550                line_level=Log.pack_line_level(123, logging.INFO),
551            )
552        )
553        self.assertEqual(
554            result,
555            expected_log,
556            msg='Status was not extracted from log message.',
557        )
558
559    def test_status_code_at_beginning_of_message(self):
560        """Test embedded status argument is found."""
561        expected_log = Log(
562            message='UNAVAILABLE to connect to server.',
563            file_and_line='my/path/file.cc:123',
564            level=logging.INFO,
565            source_name=self.decoder.source_name,
566            timestamp='0:00',
567        )
568        result = self.decoder.parse_log_entry_proto(
569            log_pb2.LogEntry(
570                message=b'pw::Status=14 to connect to server.',
571                file=b'my/path/file.cc',
572                line_level=Log.pack_line_level(123, logging.INFO),
573            )
574        )
575        self.assertEqual(
576            result,
577            expected_log,
578            msg='Status was not extracted from log message.',
579        )
580
581    def test_status_code_in_the_middle_of_message(self):
582        """Test embedded status argument is found."""
583        expected_log = Log(
584            message='Connection error: UNAVAILABLE connecting to server.',
585            file_and_line='my/path/file.cc:123',
586            level=logging.INFO,
587            source_name=self.decoder.source_name,
588            timestamp='0:00',
589        )
590        result = self.decoder.parse_log_entry_proto(
591            log_pb2.LogEntry(
592                message=(
593                    b'Connection error: pw::Status=14 connecting to server.'
594                ),
595                file=b'my/path/file.cc',
596                line_level=Log.pack_line_level(123, logging.INFO),
597            )
598        )
599        self.assertEqual(
600            result,
601            expected_log,
602            msg='Status was not extracted from log message.',
603        )
604
605    def test_status_code_with_no_surrounding_spaces(self):
606        """Test embedded status argument is found."""
607        expected_log = Log(
608            message='Connection error:UNAVAILABLEconnecting to server.',
609            file_and_line='my/path/file.cc:123',
610            level=logging.INFO,
611            source_name=self.decoder.source_name,
612            timestamp='0:00',
613        )
614        result = self.decoder.parse_log_entry_proto(
615            log_pb2.LogEntry(
616                message=b'Connection error:pw::Status=14connecting to server.',
617                file=b'my/path/file.cc',
618                line_level=Log.pack_line_level(123, logging.INFO),
619            )
620        )
621        self.assertEqual(
622            result,
623            expected_log,
624            msg='Status was not extracted from log message.',
625        )
626
627    def test_multiple_status_arguments_in_log_message(self):
628        """Test replacement of multiple status arguments into status string."""
629        expected_log = Log(
630            message='Connection error: UNAVAILABLE and PERMISSION_DENIED.',
631            file_and_line='my/path/file.cc:123',
632            level=logging.INFO,
633            source_name=self.decoder.source_name,
634            timestamp='0:00',
635        )
636        result = self.decoder.parse_log_entry_proto(
637            log_pb2.LogEntry(
638                message=b'Connection error: pw::Status=14 and pw::Status=7.',
639                file=b'my/path/file.cc',
640                line_level=Log.pack_line_level(123, logging.INFO),
641            )
642        )
643        self.assertEqual(
644            result,
645            expected_log,
646            msg='Status was not extracted from log message.',
647        )
648
649    def test_no_filename_in_message_parses_successfully(self):
650        """Test that if the file name is not present the log entry is parsed."""
651        thread_name = 'thread'
652
653        log_entry = log_pb2.LogEntry(
654            message=pw_tokenizer.encode.encode_token_and_args(
655                _MESSAGE_TOKEN_NO_FILENAME
656            ),
657            line_level=Log.pack_line_level(123, logging.DEBUG),
658            thread=bytes(thread_name.encode('utf-8')),
659        )
660        expected_log = Log(
661            message=_MESSAGE_NO_FILENAME,
662            file_and_line=':123',
663            level=logging.DEBUG,
664            source_name=self.decoder.source_name,
665            timestamp='0:00',
666            thread_name=thread_name,
667        )
668        result = self.decoder.parse_log_entry_proto(log_entry)
669        self.assertEqual(result, expected_log)
670
671    def test_log_decoded_log(self):
672        """Test that the logger correctly formats a decoded log."""
673        test_log = Log(
674            message="SampleMessage",
675            level=logging.DEBUG,
676            timestamp='1:30',
677            module_name="MyModule",
678            source_name="MySource",
679            thread_name="MyThread",
680            file_and_line='my_file.cc:123',
681            metadata_fields={'field1': 432, 'field2': 'value'},
682        )
683
684        class CapturingLogger(logging.Logger):
685            """Captures values passed to log().
686
687            Tests that calls to log() have the correct level, extra arguments
688            and the message format string and arguments match.
689            """
690
691            @dataclass(frozen=True)
692            class LoggerLog:
693                """Represents a process log() call."""
694
695                level: int
696                message: str
697                kwargs: Any
698
699            def __init__(self):
700                super().__init__(name="CapturingLogger")
701                self.log_calls: list[CapturingLogger.LoggerLog] = []
702
703            def log(self, level, msg, *args, **kwargs) -> None:
704                log = CapturingLogger.LoggerLog(
705                    level=level, message=msg % args, kwargs=kwargs
706                )
707                self.log_calls.append(log)
708
709        test_logger = CapturingLogger()
710        log_decoded_log(test_log, test_logger)
711        self.assertEqual(len(test_logger.log_calls), 1)
712        self.assertEqual(test_logger.log_calls[0].level, test_log.level)
713        self.assertEqual(
714            test_logger.log_calls[0].message,
715            '[%s] %s %s %s %s'
716            % (
717                test_log.source_name,
718                test_log.module_name,
719                test_log.timestamp,
720                test_log.message,
721                test_log.file_and_line,
722            ),
723        )
724        self.assertEqual(
725            test_logger.log_calls[0].kwargs['extra']['extra_metadata_fields'],
726            test_log.metadata_fields,
727        )
728
729
730class TestLogStreamDecoderLogDropDetectionFunctionality(
731    TestLogStreamDecoderBase
732):
733    """Tests LogStreamDecoder log drop detection functionality."""
734
735    def test_log_drops_transport_error(self):
736        """Tests log drops at transport."""
737        log_entry_in_log_entries_1 = 3
738        log_entries_1 = log_pb2.LogEntries(
739            first_entry_sequence_id=0,
740            entries=[
741                _create_random_log_entry()
742                for _ in range(log_entry_in_log_entries_1)
743            ],
744        )
745        self.decoder.parse_log_entries_proto(log_entries_1)
746        # Assume a second LogEntries was dropped with 5 log entries. I.e. a
747        # log_entries_2 with sequence_ie = 4 and 5 log entries.
748        log_entry_in_log_entries_2 = 5
749        log_entry_in_log_entries_3 = 3
750        log_entries_3 = log_pb2.LogEntries(
751            first_entry_sequence_id=log_entry_in_log_entries_2
752            + log_entry_in_log_entries_3,
753            entries=[
754                _create_random_log_entry()
755                for _ in range(log_entry_in_log_entries_3)
756            ],
757        )
758        self.decoder.parse_log_entries_proto(log_entries_3)
759
760        # The drop message is placed where the dropped logs were detected.
761        self.assertEqual(
762            len(self.captured_logs),
763            log_entry_in_log_entries_1 + 1 + log_entry_in_log_entries_3,
764            msg=(
765                'Unexpected number of messages received: '
766                f'{self._captured_logs_as_str()}'
767            ),
768        )
769        self.assertEqual(
770            (
771                f'Dropped {log_entry_in_log_entries_2} logs due to '
772                f'{LogStreamDecoder.DROP_REASON_LOSS_AT_TRANSPORT}'
773            ),
774            self.captured_logs[log_entry_in_log_entries_1].message,
775        )
776
777    def test_log_drops_source_not_connected(self):
778        """Tests log drops when source of the logs was not connected."""
779        log_entry_in_log_entries = 4
780        drop_count = 7
781        log_entries = log_pb2.LogEntries(
782            first_entry_sequence_id=drop_count,
783            entries=[
784                _create_random_log_entry()
785                for _ in range(log_entry_in_log_entries)
786            ],
787        )
788        self.decoder.parse_log_entries_proto(log_entries)
789
790        # The drop message is placed where the log drops was detected.
791        self.assertEqual(
792            len(self.captured_logs),
793            1 + log_entry_in_log_entries,
794            msg=(
795                'Unexpected number of messages received: '
796                f'{self._captured_logs_as_str()}'
797            ),
798        )
799        self.assertEqual(
800            (
801                f'Dropped {drop_count} logs due to '
802                f'{LogStreamDecoder.DROP_REASON_SOURCE_NOT_CONNECTED}'
803            ),
804            self.captured_logs[0].message,
805        )
806
807    def test_log_drops_source_enqueue_failure_no_message(self):
808        """Tests log drops when source reports log drops."""
809        drop_count = 5
810        log_entries = log_pb2.LogEntries(
811            first_entry_sequence_id=0,
812            entries=[
813                _create_random_log_entry(),
814                _create_random_log_entry(),
815                _create_drop_count_message_log_entry(drop_count),
816                _create_random_log_entry(),
817            ],
818        )
819        self.decoder.parse_log_entries_proto(log_entries)
820
821        # The drop message is placed where the log drops was detected.
822        self.assertEqual(
823            len(self.captured_logs),
824            4,
825            msg=(
826                'Unexpected number of messages received: '
827                f'{self._captured_logs_as_str()}'
828            ),
829        )
830        self.assertEqual(
831            (
832                f'Dropped {drop_count} logs due to '
833                f'{LogStreamDecoder.DROP_REASON_SOURCE_ENQUEUE_FAILURE}'
834            ),
835            self.captured_logs[2].message,
836        )
837
838    def test_log_drops_source_enqueue_failure_with_message(self):
839        """Tests log drops when source reports log drops."""
840        drop_count = 8
841        reason = 'Flux Capacitor exploded'
842        log_entries = log_pb2.LogEntries(
843            first_entry_sequence_id=0,
844            entries=[
845                _create_random_log_entry(),
846                _create_random_log_entry(),
847                _create_drop_count_message_log_entry(drop_count, reason),
848                _create_random_log_entry(),
849            ],
850        )
851        self.decoder.parse_log_entries_proto(log_entries)
852
853        # The drop message is placed where the log drops was detected.
854        self.assertEqual(
855            len(self.captured_logs),
856            4,
857            msg=(
858                'Unexpected number of messages received: '
859                f'{self._captured_logs_as_str()}'
860            ),
861        )
862        self.assertEqual(
863            f'Dropped {drop_count} logs due to {reason.lower()}',
864            self.captured_logs[2].message,
865        )
866
867
# Run the tests in this module with unittest when executed as a script.
if __name__ == '__main__':
    main()
870