1# Copyright 2023 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14 15"""Log decoder tests.""" 16 17from dataclasses import dataclass 18import logging 19from random import randint 20from typing import Any 21from unittest import TestCase, main 22 23from pw_log.log_decoder import ( 24 Log, 25 LogStreamDecoder, 26 log_decoded_log, 27 pw_status_code_to_name, 28 timestamp_parser_ns_since_boot, 29) 30from pw_log.proto import log_pb2 31import pw_tokenizer 32 33_MESSAGE_NO_FILENAME = 'World' 34_MESSAGE_AND_ARGS_NO_FILENAME = f'■msg♦{_MESSAGE_NO_FILENAME}' 35_MESSAGE_TOKEN_NO_FILENAME = pw_tokenizer.tokens.pw_tokenizer_65599_hash( 36 _MESSAGE_AND_ARGS_NO_FILENAME 37) 38 39# Creating a database with tokenized information for the core to detokenize 40# tokenized log entries. 
_TOKEN_DATABASE = pw_tokenizer.tokens.Database(
    [
        pw_tokenizer.tokens.TokenizedStringEntry(0x01148A48, 'total_dropped'),
        pw_tokenizer.tokens.TokenizedStringEntry(
            0x03796798, 'min_queue_remaining'
        ),
        pw_tokenizer.tokens.TokenizedStringEntry(0x2E668CD6, 'Jello, world!'),
        pw_tokenizer.tokens.TokenizedStringEntry(0x329481A2, 'parser_errors'),
        pw_tokenizer.tokens.TokenizedStringEntry(0x7F35A9A5, 'TestName'),
        pw_tokenizer.tokens.TokenizedStringEntry(0xCC6D3131, 'Jello?'),
        pw_tokenizer.tokens.TokenizedStringEntry(
            0x144C501D, '■msg♦SampleMessage■module♦MODULE■file♦file/path.cc'
        ),
        pw_tokenizer.tokens.TokenizedStringEntry(0x0000106A, 'ModuleOrMessage'),
        pw_tokenizer.tokens.TokenizedStringEntry(
            pw_tokenizer.tokens.pw_tokenizer_65599_hash(
                '■msg♦World■module♦wifi■file♦/path/to/file.cc'
            ),
            '■msg♦World■module♦wifi■file♦/path/to/file.cc',
        ),
        pw_tokenizer.tokens.TokenizedStringEntry(
            _MESSAGE_TOKEN_NO_FILENAME, _MESSAGE_AND_ARGS_NO_FILENAME
        ),
    ]
)
_DETOKENIZER = pw_tokenizer.Detokenizer(_TOKEN_DATABASE)


def _create_log_entry_with_tokenized_fields(
    message: str, module: str, file: str, thread: str, line: int, level: int
) -> log_pb2.LogEntry:
    """Tokenizes the tokenizable LogEntry fields of a log entry.

    Each string field is hashed and encoded the same way a device would
    encode it, so the decoder under test must detokenize it.
    """
    tokenized_message = pw_tokenizer.encode.encode_token_and_args(
        pw_tokenizer.tokens.pw_tokenizer_65599_hash(message)
    )
    tokenized_module = pw_tokenizer.encode.encode_token_and_args(
        pw_tokenizer.tokens.pw_tokenizer_65599_hash(module)
    )
    tokenized_file = pw_tokenizer.encode.encode_token_and_args(
        pw_tokenizer.tokens.pw_tokenizer_65599_hash(file)
    )
    tokenized_thread = pw_tokenizer.encode.encode_token_and_args(
        pw_tokenizer.tokens.pw_tokenizer_65599_hash(thread)
    )

    return log_pb2.LogEntry(
        message=tokenized_message,
        module=tokenized_module,
        file=tokenized_file,
        line_level=Log.pack_line_level(line, level),
        thread=tokenized_thread,
    )


def _create_random_log_entry() -> log_pb2.LogEntry:
    """Creates a plain-text log entry with random message, line, level and
    thread."""
    return log_pb2.LogEntry(
        message=f'message {randint(1,100)}'.encode(),
        line_level=Log.pack_line_level(
            randint(0, 2000), randint(logging.DEBUG, logging.CRITICAL)
        ),
        file=b'main.cc',
        thread=f'thread {randint(1,5)}'.encode(),
    )


def _create_drop_count_message_log_entry(
    drop_count: int, reason: str = ''
) -> log_pb2.LogEntry:
    """Creates a drop-report log entry with an optional drop reason."""
    log_entry = log_pb2.LogEntry(dropped=drop_count)
    if reason:
        log_entry.message = reason.encode()
    return log_entry


class TestLogStreamDecoderBase(TestCase):
    """Base Test class for LogStreamDecoder."""

    def setUp(self) -> None:
        """Sets up a LogStreamDecoder that captures decoded logs."""

        def parse_pw_status(msg: str) -> str:
            return pw_status_code_to_name(msg)

        self.captured_logs: list[Log] = []

        def decoded_log_handler(log: Log) -> None:
            self.captured_logs.append(log)

        self.decoder = LogStreamDecoder(
            decoded_log_handler=decoded_log_handler,
            detokenizer=_DETOKENIZER,
            source_name='source',
            timestamp_parser=timestamp_parser_ns_since_boot,
            message_parser=parse_pw_status,
        )

    def _captured_logs_as_str(self) -> str:
        return '\n'.join(map(str, self.captured_logs))


class TestLogStreamDecoderDecodingFunctionality(TestLogStreamDecoderBase):
    """Tests LogStreamDecoder decoding functionality."""

    def test_parse_log_entry_valid_non_tokenized(self) -> None:
        """Tests that a plain-text LogEntry proto is parsed correctly."""
        expected_log = Log(
            message='Hello',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Hello',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(result, expected_log)

    def test_parse_log_entry_valid_packed_message(self) -> None:
        """Tests that metadata packed in the message field is extracted."""
        log_with_metadata_in_message = Log(
            message='World',
            file_and_line='/path/to/file.cc',
            level=logging.DEBUG,
            source_name=self.decoder.source_name,
            module_name='wifi',
            timestamp='0:00',
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message='■msg♦World■module♦wifi■file♦/path/to/file.cc'.encode(),
                line_level=Log.pack_line_level(0, logging.DEBUG),
                timestamp=100,
            )
        )
        self.assertEqual(result, log_with_metadata_in_message)

    def test_parse_log_entry_valid_logs_drop_message(self) -> None:
        """Tests that a LogEntry with a drop count becomes a drop message."""
        dropped_message = Log(
            message='Dropped 30 logs due to buffer too small',
            level=logging.WARNING,
            source_name=self.decoder.source_name,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(message=b'buffer too small', dropped=30)
        )
        self.assertEqual(result, dropped_message)

    def test_parse_log_entry_valid_tokenized(self) -> None:
        """Test that tokenized LogEntry protos are parsed correctly."""
        # All of these strings have entries in _TOKEN_DATABASE.
        message = 'Jello, world!'
        module_name = 'TestName'
        file = 'parser_errors'
        thread_name = 'Jello?'
        line = 123
        level = logging.INFO

        expected_log = Log(
            message=message,
            module_name=module_name,
            file_and_line=file + ':123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
            thread_name=thread_name,
        )

        log_entry = _create_log_entry_with_tokenized_fields(
            message, module_name, file, thread_name, line, level
        )
        result = self.decoder.parse_log_entry_proto(log_entry)
        self.assertEqual(result, expected_log, msg='Log was not detokenized')

    def test_tokenized_contents_not_detokenized(self):
        """Test fields with tokens not in the database are not decrypted."""
        # The following strings do not have tokens in the device token db.
        message_not_in_db = 'device is shutting down.'
        module_name_not_in_db = 'Battery'
        file_not_in_db = 'charger.cc'
        thread_name_not_in_db = 'BatteryStatus'
        line = 123
        level = logging.INFO

        log_entry = _create_log_entry_with_tokenized_fields(
            message_not_in_db,
            module_name_not_in_db,
            file_not_in_db,
            thread_name_not_in_db,
            line,
            level,
        )
        # The expected fields are whatever decode_optionally_tokenized emits
        # for a token that is absent from the database.
        message = pw_tokenizer.proto.decode_optionally_tokenized(
            _DETOKENIZER, log_entry.message
        )
        module = pw_tokenizer.proto.decode_optionally_tokenized(
            _DETOKENIZER, log_entry.module
        )
        file = pw_tokenizer.proto.decode_optionally_tokenized(
            _DETOKENIZER, log_entry.file
        )
        thread = pw_tokenizer.proto.decode_optionally_tokenized(
            _DETOKENIZER, log_entry.thread
        )
        expected_log = Log(
            message=message,
            module_name=module,
            file_and_line=file + ':123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
            thread_name=thread,
        )
        result = self.decoder.parse_log_entry_proto(log_entry)
        self.assertEqual(
            result, expected_log, msg='Log was unexpectedly detokenized'
        )

    def test_extracting_log_entry_fields_from_tokenized_metadata(self):
        """Test that tokenized metadata can be used to extract other fields."""
        metadata = '■msg♦World■module♦wifi■file♦/path/to/file.cc'
        thread_name = 'M0Log'

        log_entry = log_pb2.LogEntry(
            message=pw_tokenizer.encode.encode_token_and_args(
                pw_tokenizer.tokens.pw_tokenizer_65599_hash(metadata)
            ),
            line_level=Log.pack_line_level(0, logging.DEBUG),
            thread=thread_name.encode(),
        )

        log_with_metadata_in_message = Log(
            message='World',
            file_and_line='/path/to/file.cc',
            level=logging.DEBUG,
            source_name=self.decoder.source_name,
            module_name='wifi',
            timestamp='0:00',
            thread_name=thread_name,
        )

        result = self.decoder.parse_log_entry_proto(log_entry)
        self.assertEqual(
            result,
            log_with_metadata_in_message,
            msg='Log was not detokenized correctly.',
        )

    def test_extracting_status_argument_from_log_message(self):
        """Test extract status from log message."""
        expected_log = Log(
            message='Could not start flux capacitor: PERMISSION_DENIED',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Could not start flux capacitor: pw::Status=7',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was not extracted from log message.',
        )

        expected_log = Log(
            message='Error connecting to server: UNAVAILABLE',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Error connecting to server: pw::Status=14',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was not extracted from log message.',
        )

    def test_extracting_status_with_improper_spacing(self):
        """Test spaces before pw::Status are ignored."""
        expected_log = Log(
            message='Error connecting to server:UNAVAILABLE',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )

        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Error connecting to server:pw::Status=14',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was not extracted from log message.',
        )

        expected_log = Log(
            message='Error connecting to server: UNAVAILABLE',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )

        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Error connecting to server: pw::Status=14',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was not extracted from log message.',
        )

    def test_not_extracting_status_extra_space_before_code(self):
        """Test spaces after pw::Status are not allowed."""
        # The message must pass through unchanged.
        expected_log = Log(
            message='Could not start flux capacitor: pw::Status= 7',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )

        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Could not start flux capacitor: pw::Status= 7',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status unexpectedly extracted from log message.',
        )

    def test_not_extracting_status_new_line_before_code(self):
        """Test new line characters after pw::Status are not allowed."""
        # The message must pass through unchanged.
        expected_log = Log(
            message='Could not start flux capacitor: pw::Status=\n7',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Could not start flux capacitor: pw::Status=\n7',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status unexpectedly extracted from log message.',
        )

    def test_not_extracting_status_from_log_message_with_improper_format(self):
        """Test status not extracted from log message with incorrect format."""
        expected_log = Log(
            message='Could not start flux capacitor: pw::Status12',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Could not start flux capacitor: pw::Status12',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status unexpectedly extracted from log message.',
        )

    def test_status_code_in_message_does_not_exist(self):
        """Test status does not exist in pw_status."""
        # Code 17 has no name, so the message must pass through unchanged.
        expected_log = Log(
            message='Could not start flux capacitor: pw::Status=17',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Could not start flux capacitor: pw::Status=17',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Nonexistent status code was unexpectedly replaced.',
        )

    def test_status_code_in_message_is_negative(self):
        """Test status code is negative."""
        # Negative codes are invalid; the message must pass through unchanged.
        expected_log = Log(
            message='Could not start flux capacitor: pw::Status=-1',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Could not start flux capacitor: pw::Status=-1',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Negative status code was unexpectedly replaced.',
        )

    def test_status_code_is_name(self):
        """Test if the status code format includes the name instead."""
        # A status name instead of a number must pass through unchanged.
        expected_log = Log(
            message='Cannot use flux capacitor: pw::Status=PERMISSION_DENIED',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=(
                    b'Cannot use flux capacitor: pw::Status=PERMISSION_DENIED'
                ),
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status name was unexpectedly replaced.',
        )

    def test_spelling_mistakes_with_status_keyword(self):
        """Test spelling mistakes with status keyword."""
        # A misspelled keyword must pass through unchanged.
        expected_log = Log(
            message='Could not start flux capacitor: pw::Rtatus=12',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Could not start flux capacitor: pw::Rtatus=12',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status unexpectedly extracted from log message.',
        )

    def test_spelling_mistakes_with_status_keyword_lowercase_s(self):
        """Test spelling mistakes with status keyword."""
        # The keyword is case-sensitive; pw::status must pass through
        # unchanged.
        expected_log = Log(
            message='Could not start flux capacitor: pw::status=13',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Could not start flux capacitor: pw::status=13',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status unexpectedly extracted from log message.',
        )

    def test_status_code_at_beginning_of_message(self):
        """Test embedded status argument is found."""
        expected_log = Log(
            message='UNAVAILABLE to connect to server.',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'pw::Status=14 to connect to server.',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was not extracted from log message.',
        )

    def test_status_code_in_the_middle_of_message(self):
        """Test embedded status argument is found."""
        expected_log = Log(
            message='Connection error: UNAVAILABLE connecting to server.',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=(
                    b'Connection error: pw::Status=14 connecting to server.'
                ),
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was not extracted from log message.',
        )

    def test_status_code_with_no_surrounding_spaces(self):
        """Test embedded status argument is found."""
        expected_log = Log(
            message='Connection error:UNAVAILABLEconnecting to server.',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Connection error:pw::Status=14connecting to server.',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was not extracted from log message.',
        )

    def test_multiple_status_arguments_in_log_message(self):
        """Test replacement of multiple status arguments into status string."""
        expected_log = Log(
            message='Connection error: UNAVAILABLE and PERMISSION_DENIED.',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp='0:00',
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Connection error: pw::Status=14 and pw::Status=7.',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was not extracted from log message.',
        )

    def test_no_filename_in_message_parses_successfully(self):
        """Test that if the file name is not present the log entry is parsed."""
        thread_name = 'thread'

        log_entry = log_pb2.LogEntry(
            message=pw_tokenizer.encode.encode_token_and_args(
                _MESSAGE_TOKEN_NO_FILENAME
            ),
            line_level=Log.pack_line_level(123, logging.DEBUG),
            thread=thread_name.encode(),
        )
        expected_log = Log(
            message=_MESSAGE_NO_FILENAME,
            file_and_line=':123',
            level=logging.DEBUG,
            source_name=self.decoder.source_name,
            timestamp='0:00',
            thread_name=thread_name,
        )
        result = self.decoder.parse_log_entry_proto(log_entry)
        self.assertEqual(result, expected_log)

    def test_log_decoded_log(self):
        """Test that the logger correctly formats a decoded log."""
        test_log = Log(
            message="SampleMessage",
            level=logging.DEBUG,
            timestamp='1:30',
            module_name="MyModule",
            source_name="MySource",
            thread_name="MyThread",
            file_and_line='my_file.cc:123',
            metadata_fields={'field1': 432, 'field2': 'value'},
        )

        class CapturingLogger(logging.Logger):
            """Captures values passed to log().

            Tests that calls to log() have the correct level, extra arguments
            and the message format string and arguments match.
            """

            @dataclass(frozen=True)
            class LoggerLog:
                """Represents a process log() call."""

                level: int
                message: str
                kwargs: Any

            def __init__(self):
                super().__init__(name="CapturingLogger")
                self.log_calls: list[CapturingLogger.LoggerLog] = []

            def log(self, level, msg, *args, **kwargs) -> None:
                log = CapturingLogger.LoggerLog(
                    level=level, message=msg % args, kwargs=kwargs
                )
                self.log_calls.append(log)

        test_logger = CapturingLogger()
        log_decoded_log(test_log, test_logger)
        self.assertEqual(len(test_logger.log_calls), 1)
        self.assertEqual(test_logger.log_calls[0].level, test_log.level)
        self.assertEqual(
            test_logger.log_calls[0].message,
            '[%s] %s %s %s %s'
            % (
                test_log.source_name,
                test_log.module_name,
                test_log.timestamp,
                test_log.message,
                test_log.file_and_line,
            ),
        )
        self.assertEqual(
            test_logger.log_calls[0].kwargs['extra']['extra_metadata_fields'],
            test_log.metadata_fields,
        )
class TestLogStreamDecoderLogDropDetectionFunctionality(
    TestLogStreamDecoderBase
):
    """Tests LogStreamDecoder log drop detection functionality."""

    def test_log_drops_transport_error(self):
        """Tests log drops at transport."""
        log_entry_in_log_entries_1 = 3
        log_entries_1 = log_pb2.LogEntries(
            first_entry_sequence_id=0,
            entries=[
                _create_random_log_entry()
                for _ in range(log_entry_in_log_entries_1)
            ],
        )
        self.decoder.parse_log_entries_proto(log_entries_1)
        # Assume a second LogEntries packet with 5 log entries was dropped in
        # transport. The third packet received then starts at sequence id 8:
        # the 3 entries from the first packet plus the 5 dropped entries.
        log_entry_in_log_entries_2 = 5
        log_entry_in_log_entries_3 = 3
        log_entries_3 = log_pb2.LogEntries(
            first_entry_sequence_id=log_entry_in_log_entries_1
            + log_entry_in_log_entries_2,
            entries=[
                _create_random_log_entry()
                for _ in range(log_entry_in_log_entries_3)
            ],
        )
        self.decoder.parse_log_entries_proto(log_entries_3)

        # The drop message is placed where the dropped logs were detected.
        self.assertEqual(
            len(self.captured_logs),
            log_entry_in_log_entries_1 + 1 + log_entry_in_log_entries_3,
            msg=(
                'Unexpected number of messages received: '
                f'{self._captured_logs_as_str()}'
            ),
        )
        self.assertEqual(
            (
                f'Dropped {log_entry_in_log_entries_2} logs due to '
                f'{LogStreamDecoder.DROP_REASON_LOSS_AT_TRANSPORT}'
            ),
            self.captured_logs[log_entry_in_log_entries_1].message,
        )

    def test_log_drops_source_not_connected(self):
        """Tests log drops when source of the logs was not connected."""
        log_entry_in_log_entries = 4
        drop_count = 7
        # A nonzero first sequence id means logs were produced before the
        # decoder connected to the source.
        log_entries = log_pb2.LogEntries(
            first_entry_sequence_id=drop_count,
            entries=[
                _create_random_log_entry()
                for _ in range(log_entry_in_log_entries)
            ],
        )
        self.decoder.parse_log_entries_proto(log_entries)

        # The drop message is placed where the log drops were detected.
        self.assertEqual(
            len(self.captured_logs),
            1 + log_entry_in_log_entries,
            msg=(
                'Unexpected number of messages received: '
                f'{self._captured_logs_as_str()}'
            ),
        )
        self.assertEqual(
            (
                f'Dropped {drop_count} logs due to '
                f'{LogStreamDecoder.DROP_REASON_SOURCE_NOT_CONNECTED}'
            ),
            self.captured_logs[0].message,
        )

    def test_log_drops_source_enqueue_failure_no_message(self):
        """Tests log drops when source reports log drops."""
        drop_count = 5
        log_entries = log_pb2.LogEntries(
            first_entry_sequence_id=0,
            entries=[
                _create_random_log_entry(),
                _create_random_log_entry(),
                _create_drop_count_message_log_entry(drop_count),
                _create_random_log_entry(),
            ],
        )
        self.decoder.parse_log_entries_proto(log_entries)

        # The drop message is placed where the log drops were detected.
        self.assertEqual(
            len(self.captured_logs),
            4,
            msg=(
                'Unexpected number of messages received: '
                f'{self._captured_logs_as_str()}'
            ),
        )
        self.assertEqual(
            (
                f'Dropped {drop_count} logs due to '
                f'{LogStreamDecoder.DROP_REASON_SOURCE_ENQUEUE_FAILURE}'
            ),
            self.captured_logs[2].message,
        )

    def test_log_drops_source_enqueue_failure_with_message(self):
        """Tests log drops when source reports log drops."""
        drop_count = 8
        reason = 'Flux Capacitor exploded'
        log_entries = log_pb2.LogEntries(
            first_entry_sequence_id=0,
            entries=[
                _create_random_log_entry(),
                _create_random_log_entry(),
                _create_drop_count_message_log_entry(drop_count, reason),
                _create_random_log_entry(),
            ],
        )
        self.decoder.parse_log_entries_proto(log_entries)

        # The drop message is placed where the log drops were detected.
        self.assertEqual(
            len(self.captured_logs),
            4,
            msg=(
                'Unexpected number of messages received: '
                f'{self._captured_logs_as_str()}'
            ),
        )
        self.assertEqual(
            f'Dropped {drop_count} logs due to {reason.lower()}',
            self.captured_logs[2].message,
        )


if __name__ == '__main__':
    main()