#!/usr/bin/env python3
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for detokenize."""

import base64
import concurrent.futures
import datetime as dt
import functools
import io
import os
from pathlib import Path
import struct
import tempfile
from typing import Any, Callable, NamedTuple
import unittest
from unittest import mock

from pw_tokenizer import database
from pw_tokenizer import detokenize
from pw_tokenizer import elf_reader
from pw_tokenizer import tokens


# This function is not part of this test. It was used to generate the binary
# strings for EMPTY_ELF and ELF_WITH_TOKENIZER_SECTIONS. It takes a path and
# returns a Python byte string suitable for copying into Python source code.
def path_to_byte_string(path):
    with open(path, 'rb') as fd:
        data = fd.read()

    output = []
    indices = iter(range(len(data)))

    while True:
        line = ''

        while len(line) < 70:
            try:
                i = next(indices)
            except StopIteration:
                break

            line += repr(data[i : i + 1])[2:-1].replace("'", r'\'')

        if not line:
            return ''.join(output)

        output.append("    b'{}'\n".format(''.join(line)))


# This is an empty ELF file. It was created from the ELF file for
# tokenize_test.cc with the command:
#
#   arm-none-eabi-objcopy -S --only-section NO_SECTIONS_PLEASE <ELF> <OUTPUT>
#
# The resulting ELF was converted to a Python binary string using the
# path_to_byte_string function above.
EMPTY_ELF = (
    b'\x7fELF\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00(\x00\x01'
    b'\x00\x00\x00\xd1\x83\x00\x084\x00\x00\x00\xe0\x00\x00\x00\x00\x04\x00\x05'
    b'4\x00 \x00\x05\x00(\x00\x02\x00\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\x00'
    b'\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00'
    b'\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00'
    b'\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\xd4\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    b'\x06\x00\x00\x00\x00\x00\x01\x00\x00.shstrtab\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01'
    b'\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xd4\x00\x00'
    b'\x00\x0b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00'
    b'\x00\x00\x00'
)
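
# Regenerating EMPTY_ELF by hand is not normally needed, but as a rough
# sketch (the 'empty.elf' path here is hypothetical), the constant above can
# be reproduced by pasting the output of the helper defined earlier:
#
#   print('EMPTY_ELF = (\n' + path_to_byte_string('empty.elf') + ')')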

# This is an ELF file with only the pw_tokenizer sections. It was created
# from a tokenize_test binary built for the STM32F429i Discovery board. The
# pw_tokenizer sections were extracted with this command:
#
#   arm-none-eabi-objcopy -S --only-section ".pw_tokenizer*" <ELF> <OUTPUT>
#
ELF_WITH_TOKENIZER_SECTIONS_PATH = Path(__file__).parent.joinpath(
    'example_binary_with_tokenized_strings.elf'
)
ELF_WITH_TOKENIZER_SECTIONS = ELF_WITH_TOKENIZER_SECTIONS_PATH.read_bytes()

TOKENS_IN_ELF = 22
TOKENS_IN_ELF_WITH_TOKENIZER_SECTIONS = 26

# 0x2e668cd6 is 'Jello, world!' (which is also used in database_test.py).
JELLO_WORLD_TOKEN = b'\xd6\x8c\x66\x2e'


class DetokenizeTest(unittest.TestCase):
    """Tests the detokenize.Detokenizer."""

    def test_simple(self):
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(
                        0xCDAB, '%02d %s %c%%', date_removed=dt.datetime.now()
                    )
                ]
            )
        )
        self.assertEqual(
            str(detok.detokenize(b'\xab\xcd\0\0\x02\x03Two\x66')), '01 Two 3%'
        )

    def test_detokenize_extra_data_is_unsuccessful(self):
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(
                        1, 'no args', date_removed=dt.datetime(1, 1, 1)
                    )
                ]
            )
        )

        result = detok.detokenize(b'\x01\0\0\0\x04args')
        self.assertEqual(len(result.failures), 1)
        string, args, remaining = result.failures[0]
        self.assertEqual('no args', string)
        self.assertFalse(args)
        self.assertEqual(b'\x04args', remaining)
        self.assertEqual('no args', string)
        self.assertEqual('no args', str(result))

    def test_detokenize_zero_extend_short_token_with_no_args(self):
        detok = detokenize.Detokenizer(
            tokens.Database(
                [tokens.TokenizedStringEntry(0xCDAB, 'This token is 16 bits')]
            )
        )
        self.assertEqual(
            str(detok.detokenize(b'\xab\xcd')), 'This token is 16 bits'
        )

    def test_detokenize_missing_data_is_unsuccessful(self):
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(
                        2, '%s', date_removed=dt.datetime(1, 1, 1)
                    )
                ]
            )
        )

        result = detok.detokenize(b'\x02\0\0\0')
        string, args, remaining = result.failures[0]
        self.assertEqual('%s', string)
        self.assertEqual(len(args), 1)
        self.assertEqual(b'', remaining)
        self.assertEqual(len(result.failures), 1)
        self.assertEqual('%s', str(result))

    def test_detokenize_missing_data_with_errors_is_unsuccessful(self):
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(
                        2, '%s', date_removed=dt.datetime(1, 1, 1)
                    )
                ]
            ),
            show_errors=True,
        )

        result = detok.detokenize(b'\x02\0\0\0')
        string, args, remaining = result.failures[0]
        self.assertIn('%s MISSING', string)
        self.assertEqual(len(args), 1)
        self.assertEqual(b'', remaining)
        self.assertEqual(len(result.failures), 1)
        self.assertIn('%s MISSING', str(result))

    def test_unparsed_data(self):
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(
                        1, 'no args', date_removed=dt.datetime(100, 1, 1)
                    ),
                ]
            )
        )
        result = detok.detokenize(b'\x01\0\0\0o_o')
        self.assertFalse(result.ok())
        self.assertEqual('no args', str(result))
        self.assertIn('o_o', repr(result))
        self.assertIn('decoding failed', result.error_message())

    def test_empty_db(self):
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))
        self.assertFalse(detok.detokenize(b'\x12\x34\0\0').ok())
        self.assertIn(
            'unknown token', detok.detokenize(b'1234').error_message()
        )
        self.assertIn('unknown token', repr(detok.detokenize(b'1234')))

        self.assertEqual(
            '$' + base64.b64encode(b'1234').decode(),
            str(detok.detokenize(b'1234')),
        )

        self.assertIsNone(detok.detokenize(b'').token)

    def test_empty_db_show_errors(self):
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)
        self.assertFalse(detok.detokenize(b'\x12\x34\0\0').ok())
        self.assertIn(
            'unknown token', detok.detokenize(b'1234').error_message()
        )
        self.assertIn('unknown token', repr(detok.detokenize(b'1234')))
        self.assertIn('unknown token', str(detok.detokenize(b'1234')))

        self.assertIsNone(detok.detokenize(b'').token)

    def test_missing_token_show_errors(self):
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)
        self.assertIn('missing token', detok.detokenize(b'').error_message())
        self.assertIn('missing token', str(detok.detokenize(b'')))

    def test_missing_token(self):
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))
        self.assertIn('missing token', detok.detokenize(b'').error_message())
        self.assertEqual('$', str(detok.detokenize(b'')))

    def test_unknown_shorter_token_show_error(self):
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)

        self.assertIn('unknown token', detok.detokenize(b'1').error_message())
        self.assertIn('unknown token', str(detok.detokenize(b'1')))
        self.assertIn('unknown token', repr(detok.detokenize(b'1')))

        self.assertIn('unknown token', detok.detokenize(b'123').error_message())
        self.assertIn('unknown token', str(detok.detokenize(b'123')))
        self.assertIn('unknown token', repr(detok.detokenize(b'123')))

    def test_unknown_shorter_token(self):
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))

        self.assertEqual(
            'unknown token 00000001', detok.detokenize(b'\1').error_message()
        )
        self.assertEqual(
            '$' + base64.b64encode(b'\1\0\0\0').decode(),
            str(detok.detokenize(b'\1')),
        )
        self.assertIn('unknown token 00000001', repr(detok.detokenize(b'\1')))

        self.assertEqual(
            'unknown token 00030201',
            detok.detokenize(b'\1\2\3').error_message(),
        )
        self.assertEqual(
            '$' + base64.b64encode(b'\1\2\3\0').decode(),
            str(detok.detokenize(b'\1\2\3')),
        )
        self.assertIn(
            'unknown token 00030201', repr(detok.detokenize(b'\1\2\3'))
        )

    def test_decode_from_elf_data(self):
        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))

        self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
        self.assertEqual(
            str(detok.detokenize(JELLO_WORLD_TOKEN)), 'Jello, world!'
        )

        undecoded_args = detok.detokenize(JELLO_WORLD_TOKEN + b'some junk')
        self.assertFalse(undecoded_args.ok())
        self.assertEqual(str(undecoded_args), 'Jello, world!')

        self.assertTrue(detok.detokenize(b'\0\0\0\0').ok())
        self.assertEqual(str(detok.detokenize(b'\0\0\0\0')), '')

    def test_decode_from_elf_file(self):
        """Test decoding from an ELF file."""
        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
        expected_tokens = frozenset(detok.database.token_to_entries.keys())

        with tempfile.NamedTemporaryFile('wb', delete=False) as elf:
            try:
                elf.write(ELF_WITH_TOKENIZER_SECTIONS)
                elf.close()

                # Open ELF by file object
                with open(elf.name, 'rb') as fd:
                    detok = detokenize.Detokenizer(fd)

                self.assertEqual(
                    expected_tokens,
                    frozenset(detok.database.token_to_entries.keys()),
                )

                # Open ELF by path
                detok = detokenize.Detokenizer(elf.name)
                self.assertEqual(
                    expected_tokens,
                    frozenset(detok.database.token_to_entries.keys()),
                )

                # Open ELF by elf_reader.Elf
                with open(elf.name, 'rb') as fd:
                    detok = detokenize.Detokenizer(elf_reader.Elf(fd))

                self.assertEqual(
                    expected_tokens,
                    frozenset(detok.database.token_to_entries.keys()),
                )
            finally:
                os.unlink(elf.name)

    def test_decode_from_csv_file(self):
        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
        expected_tokens = frozenset(detok.database.token_to_entries.keys())

        csv_database = str(detok.database)
        self.assertEqual(len(csv_database.splitlines()), TOKENS_IN_ELF)

        with tempfile.NamedTemporaryFile('w', delete=False) as csv_file:
            try:
                csv_file.write(csv_database)
                csv_file.close()

                # Open CSV by path
                detok = detokenize.Detokenizer(csv_file.name)
                self.assertEqual(
                    expected_tokens,
                    frozenset(detok.database.token_to_entries.keys()),
                )

                # Open CSV by file object
                with open(csv_file.name) as fd:
                    detok = detokenize.Detokenizer(fd)

                self.assertEqual(
                    expected_tokens,
                    frozenset(detok.database.token_to_entries.keys()),
                )
            finally:
                os.unlink(csv_file.name)

    def test_create_detokenizer_with_token_database(self):
        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
        expected_tokens = frozenset(detok.database.token_to_entries.keys())

        detok = detokenize.Detokenizer(detok.database)
        self.assertEqual(
            expected_tokens, frozenset(detok.database.token_to_entries.keys())
        )
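
# A rough sketch of the payload layout the tests above exercise, inferred
# from the test data rather than quoted from the pw_tokenizer docs: a binary
# tokenized message is the 32-bit token in little-endian order followed by
# the encoded arguments. Breaking down test_simple's input
# b'\xab\xcd\0\0\x02\x03Two\x66' against its format string '%02d %s %c%%':
#
#   b'\xab\xcd\x00\x00'  token 0x0000CDAB
#   b'\x02'              varint, zig-zag decoded to 1    -> '%02d' gives '01'
#   b'\x03Two'           length-prefixed string          -> '%s' gives 'Two'
#   b'\x66'              varint 102, zig-zag decoded 51  -> '%c' gives '3'
#
# The trailing '%%' is a literal percent sign, so the result is '01 Two 3%'.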


class DetokenizeWithCollisions(unittest.TestCase):
    """Tests collision resolution."""

    def setUp(self):
        super().setUp()
        token = 0xBAAD

        # Database with several conflicting tokens.
        self.detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(
                        token, 'REMOVED', date_removed=dt.datetime(9, 1, 1)
                    ),
                    tokens.TokenizedStringEntry(token, 'newer'),
                    tokens.TokenizedStringEntry(
                        token, 'A: %d', date_removed=dt.datetime(30, 5, 9)
                    ),
                    tokens.TokenizedStringEntry(
                        token, 'B: %c', date_removed=dt.datetime(30, 5, 10)
                    ),
                    tokens.TokenizedStringEntry(token, 'C: %s'),
                    tokens.TokenizedStringEntry(token, '%d%u'),
                    tokens.TokenizedStringEntry(token, '%s%u %d'),
                    tokens.TokenizedStringEntry(1, '%s'),
                    tokens.TokenizedStringEntry(1, '%d'),
                    tokens.TokenizedStringEntry(2, 'Three %s %s %s'),
                    tokens.TokenizedStringEntry(2, 'Five %d %d %d %d %s'),
                ]
            )
        )
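
    # The tests below take apart how a collision is resolved. Roughly, as
    # inferred from the assertions rather than from any API documentation:
    # every matching entry is tried, successful decodes outrank failures,
    # decodes that consume all of the data and decode more arguments outrank
    # those that do not, and remaining ties favor the entry most recently
    # present in the database (no date_removed).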

    def test_collision_no_args_favors_most_recently_present(self):
        no_args = self.detok.detokenize(b'\xad\xba\0\0')
        self.assertFalse(no_args.ok())
        self.assertEqual(len(no_args.successes), 2)
        self.assertEqual(len(no_args.failures), 5)
        self.assertEqual(len(no_args.matches()), 7)
        self.assertEqual(str(no_args), 'newer')
        self.assertEqual(len(no_args.best_result()[1]), 0)
        self.assertEqual(no_args.best_result()[0], 'newer')

    def test_collision_one_integer_arg_favors_most_recently_present(self):
        multiple_correct = self.detok.detokenize(b'\xad\xba\0\0\x7a')
        self.assertFalse(multiple_correct.ok())
        self.assertIn('ERROR', repr(multiple_correct))
        self.assertEqual(len(multiple_correct.successes), 2)
        self.assertEqual(len(multiple_correct.failures), 5)
        self.assertEqual(len(multiple_correct.matches()), 7)
        self.assertEqual(str(multiple_correct), 'B: =')

    def test_collision_one_integer_arg_favors_successful_decode(self):
        # One string decodes successfully, since the arg is out of range for %c.
        int_arg = self.detok.detokenize(b'\xad\xba\0\0\xfe\xff\xff\xff\x0f')
        self.assertTrue(int_arg.ok())
        self.assertEqual(str(int_arg), 'A: 2147483647')

    def test_collision_one_string_arg_favors_successful_decode(self):
        # One string decodes successfully, since decoding the argument as an
        # integer does not decode all the data.
        string_arg = self.detok.detokenize(b'\xad\xba\0\0\x02Hi')
        self.assertTrue(string_arg.ok())
        self.assertEqual(str(string_arg), 'C: Hi')

    def test_collision_one_string_arg_favors_decoding_all_data(self):
        result = self.detok.detokenize(b'\1\0\0\0\x83hi')
        self.assertEqual(len(result.failures), 2)
        # Should resolve to the string since %d would leave one byte behind.
        self.assertEqual(str(result), '%s')

    def test_collision_multiple_args_favors_decoding_more_arguments(self):
        result = self.detok.detokenize(b'\2\0\0\0\1\2\1\4\5')
        self.assertEqual(len(result.matches()), 2)
        self.assertEqual(result.matches()[0][0], 'Five -1 1 -1 2 %s')
        self.assertEqual(result.matches()[1][0], 'Three \2 \4 %s')

    def test_collision_multiple_args_favors_decoding_all_arguments(self):
        unambiguous = self.detok.detokenize(b'\xad\xba\0\0\x01#\x00\x01')
        self.assertTrue(unambiguous.ok())
        self.assertEqual(len(unambiguous.matches()), 7)
        self.assertEqual('#0 -1', str(unambiguous))
        self.assertIn('#0 -1', repr(unambiguous))


class ManualPoolExecutor(concurrent.futures.Executor):
    """A stubbed pool executor that captures the most recent work request
    and holds it until the public process method is manually called."""

    def __init__(self):
        super().__init__()
        self._func = None

    # pylint: disable=arguments-differ
    def submit(self, func, *args, **kwargs):
        """Submits work to the pool, stashing the partial for later use."""
        self._func = functools.partial(func, *args, **kwargs)

    def process(self):
        """Processes the latest func submitted to the pool."""
        if self._func is not None:
            self._func()
            self._func = None


class InlinePoolExecutor(concurrent.futures.Executor):
    """A stubbed pool executor that runs work immediately, inline."""

    # pylint: disable=arguments-differ
    def submit(self, func, *args, **kwargs):
        """Submits work to the pool, running it inline right away."""
        func(*args, **kwargs)
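
# Why the stub executors above exist: AutoUpdatingDetokenizer hands database
# reloads to an executor so they can happen off the calling thread. Swapping
# in ManualPoolExecutor lets a test change the database file, confirm that
# stale data is still served, and only then apply the pending reload with
# process(); InlinePoolExecutor instead runs the reload synchronously inside
# submit(). (This reflects how the tests below use the stubs; in normal use
# the pool is presumably a real concurrent.futures executor.)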


@mock.patch('os.path.getmtime')
class AutoUpdatingDetokenizerTest(unittest.TestCase):
    """Tests the AutoUpdatingDetokenizer class."""

    def test_update(self, mock_getmtime):
        """Tests the update command."""

        db = database.load_token_database(
            io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
        )
        self.assertEqual(len(db), TOKENS_IN_ELF)

        the_time = [100]

        def move_back_time_if_file_exists(path):
            if os.path.exists(path):
                the_time[0] -= 1
                return the_time[0]

            raise FileNotFoundError

        mock_getmtime.side_effect = move_back_time_if_file_exists

        with tempfile.NamedTemporaryFile('wb', delete=False) as file:
            try:
                file.close()

                pool = ManualPoolExecutor()
                detok = detokenize.AutoUpdatingDetokenizer(
                    file.name, min_poll_period_s=0, pool=pool
                )
                self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())

                with open(file.name, 'wb') as fd:
                    tokens.write_binary(db, fd)

                # After the change but before the pool runs in another thread,
                # the token should not exist.
                self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())

                # After the pool is allowed to process, it should.
                pool.process()
                self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
            finally:
                os.unlink(file.name)

    def test_update_with_directory(self, mock_getmtime):
        """Tests the update command with a directory format database."""
        db = database.load_token_database(
            io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
        )
        self.assertEqual(len(db), TOKENS_IN_ELF)

        the_time = [100]

        def move_back_time_if_file_exists(path):
            if os.path.exists(path):
                the_time[0] -= 1
                return the_time[0]

            raise FileNotFoundError

        mock_getmtime.side_effect = move_back_time_if_file_exists

        with tempfile.TemporaryDirectory() as dbdir:
            with tempfile.NamedTemporaryFile(
                'wb', delete=False, suffix='.pw_tokenizer.csv', dir=dbdir
            ) as matching_suffix_file, tempfile.NamedTemporaryFile(
                'wb', delete=False, suffix='.not.right', dir=dbdir
            ) as mismatched_suffix_file:
                try:
                    matching_suffix_file.close()
                    mismatched_suffix_file.close()

                    pool = ManualPoolExecutor()
                    detok = detokenize.AutoUpdatingDetokenizer(
                        dbdir, min_poll_period_s=0, pool=pool
                    )
                    self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())

                    with open(mismatched_suffix_file.name, 'wb') as fd:
                        tokens.write_csv(db, fd)
                    pool.process()
                    self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())

                    with open(matching_suffix_file.name, 'wb') as fd:
                        tokens.write_csv(db, fd)

                    # After the change but before the pool runs in another
                    # thread, the token should not exist.
                    self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
                    pool.process()

                    # After the pool is allowed to process, it should.
                    self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
                finally:
                    os.unlink(mismatched_suffix_file.name)
                    os.unlink(matching_suffix_file.name)
                    os.rmdir(dbdir)

        # The database stays around if the file is deleted.
        self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
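
    # Both tests above lean on the os.path.getmtime patch: the helper returns
    # a strictly decreasing timestamp whenever the file exists, so every poll
    # sees a changed modification time and schedules a reload, and it raises
    # FileNotFoundError while the database has not been written yet. The test
    # below pins the mocked time instead, to verify that an unchanged mtime
    # does not trigger a reload.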

    def test_no_update_if_time_is_same(self, mock_getmtime):
        mock_getmtime.return_value = 100

        with tempfile.NamedTemporaryFile('wb', delete=False) as file:
            try:
                tokens.write_csv(
                    database.load_token_database(
                        io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
                    ),
                    file,
                )
                file.close()

                detok = detokenize.AutoUpdatingDetokenizer(
                    file.name, min_poll_period_s=0, pool=InlinePoolExecutor()
                )
                self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())

                # Empty the database, but keep the mock modified time the same.
                with open(file.name, 'wb'):
                    pass

                self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
                self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())

                # Move back time so the now-empty file is reloaded.
                mock_getmtime.return_value = 50
                self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
            finally:
                os.unlink(file.name)

    def test_token_domain_in_str(self, _) -> None:
        """Tests a str containing a domain."""
        detok = detokenize.AutoUpdatingDetokenizer(
            f'{ELF_WITH_TOKENIZER_SECTIONS_PATH}#.*',
            min_poll_period_s=0,
            pool=InlinePoolExecutor(),
        )
        self.assertEqual(
            len(detok.database), TOKENS_IN_ELF_WITH_TOKENIZER_SECTIONS
        )

    def test_token_domain_in_path(self, _) -> None:
        """Tests a Path() containing a domain."""
        detok = detokenize.AutoUpdatingDetokenizer(
            Path(f'{ELF_WITH_TOKENIZER_SECTIONS_PATH}#.*'),
            min_poll_period_s=0,
            pool=InlinePoolExecutor(),
        )
        self.assertEqual(
            len(detok.database), TOKENS_IN_ELF_WITH_TOKENIZER_SECTIONS
        )

    def test_token_no_domain_in_str(self, _) -> None:
        """Tests a str without a domain."""
        detok = detokenize.AutoUpdatingDetokenizer(
            str(ELF_WITH_TOKENIZER_SECTIONS_PATH),
            min_poll_period_s=0,
            pool=InlinePoolExecutor(),
        )
        self.assertEqual(len(detok.database), TOKENS_IN_ELF)

    def test_token_no_domain_in_path(self, _) -> None:
        """Tests a Path() without a domain."""
        detok = detokenize.AutoUpdatingDetokenizer(
            ELF_WITH_TOKENIZER_SECTIONS_PATH,
            min_poll_period_s=0,
            pool=InlinePoolExecutor(),
        )
        self.assertEqual(len(detok.database), TOKENS_IN_ELF)
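
# The four domain tests above exercise the path syntax that
# AutoUpdatingDetokenizer accepts: an optional '#<domain regex>' suffix after
# the ELF path. With '#.*' every token domain in the ELF is loaded, giving
# TOKENS_IN_ELF_WITH_TOKENIZER_SECTIONS entries; with no suffix only the
# default domain is loaded, giving TOKENS_IN_ELF entries.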


def _next_char(message: bytes) -> bytes:
    return bytes(b + 1 for b in message)


class NestedMessageParserTest(unittest.TestCase):
    """Tests parsing prefixed messages."""

    class _Case(NamedTuple):
        data: bytes
        expected: bytes
        title: str
        transform: Callable[[bytes], bytes] = _next_char

    TRANSFORM_TEST_CASES = (
        _Case(b'$abcd', b'%bcde', 'single message'),
        _Case(
            b'$$WHAT?$abc$WHY? is this $ok $',
            b'%%WHAT?%bcd%WHY? is this %ok %',
            'message and non-message',
        ),
        _Case(b'$1$', b'%1%', 'empty message'),
        _Case(b'$abc$defgh', b'%bcd%efghh', 'sequential message'),
        _Case(
            b'w$abcx$defygh$$abz',
            b'w$ABCx$DEFygh$$ABz',
            'interspersed start/end non-message',
            bytes.upper,
        ),
        _Case(
            b'$abcx$defygh$$ab',
            b'$ABCx$DEFygh$$AB',
            'interspersed start/end message ',
            bytes.upper,
        ),
    )

    def setUp(self) -> None:
        self.decoder = detokenize.NestedMessageParser('$', 'abcdefg')

    def test_transform_io(self) -> None:
        for data, expected, title, transform in self.TRANSFORM_TEST_CASES:
            self.assertEqual(
                expected,
                b''.join(
                    self.decoder.transform_io(io.BytesIO(data), transform)
                ),
                f'{title}: {data!r}',
            )

    def test_transform_bytes_with_flush(self) -> None:
        for data, expected, title, transform in self.TRANSFORM_TEST_CASES:
            self.assertEqual(
                expected,
                self.decoder.transform(data, transform, flush=True),
                f'{title}: {data!r}',
            )

    def test_transform_bytes_sequential(self) -> None:
        transform = lambda message: message.upper().replace(b'$', b'*')

        self.assertEqual(self.decoder.transform(b'abc$abcd', transform), b'abc')
        self.assertEqual(self.decoder.transform(b'$', transform), b'*ABCD')
        self.assertEqual(self.decoder.transform(b'$b', transform), b'*')
        self.assertEqual(self.decoder.transform(b'', transform), b'')
        self.assertEqual(self.decoder.transform(b' ', transform), b'*B ')
        self.assertEqual(self.decoder.transform(b'hello', transform), b'hello')
        self.assertEqual(self.decoder.transform(b'?? $ab', transform), b'?? ')
        self.assertEqual(
            self.decoder.transform(b'123$ab4$56$a', transform), b'*AB123*AB4*56'
        )
        self.assertEqual(
            self.decoder.transform(b'bc', transform, flush=True), b'*ABC'
        )

    MESSAGES_TEST: Any = (
        (b'123$abc456$a', (False, b'123'), (True, b'$abc'), (False, b'456')),
        (b'7$abcd', (True, b'$a'), (False, b'7')),
        (b'e',),
        (b'',),
        (b'$', (True, b'$abcde')),
        (b'$', (True, b'$')),
        (b'$a$b$c', (True, b'$'), (True, b'$a'), (True, b'$b')),
        (b'1', (True, b'$c'), (False, b'1')),
        (b'',),
        (b'?', (False, b'?')),
        (b'!@', (False, b'!@')),
        (b'%^&', (False, b'%^&')),
    )
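
    # How to read MESSAGES_TEST above: each tuple starts with the next chunk
    # of input to feed to the parser, and the remaining elements are the
    # (is_message, data) pairs read_messages() is expected to yield for that
    # chunk. Parser state carries over between chunks, which is why a message
    # started in one entry may only be reported in a later one.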

    def test_read_messages(self) -> None:
        for step in self.MESSAGES_TEST:
            data: bytes = step[0]
            pieces: tuple[tuple[bool, bytes], ...] = step[1:]
            self.assertEqual(tuple(self.decoder.read_messages(data)), pieces)

    def test_read_messages_flush(self) -> None:
        self.assertEqual(
            list(self.decoder.read_messages(b'123$a')), [(False, b'123')]
        )
        self.assertEqual(list(self.decoder.read_messages(b'b')), [])
        self.assertEqual(
            list(self.decoder.read_messages(b'', flush=True)), [(True, b'$ab')]
        )

    def test_read_messages_io(self) -> None:
        # Rework the read_messages test data for stream input.
        data = io.BytesIO(b''.join(step[0] for step in self.MESSAGES_TEST))
        expected_pieces = sum((step[1:] for step in self.MESSAGES_TEST), ())

        result = self.decoder.read_messages_io(data)
        for expected_is_message, expected_data in expected_pieces:
            if expected_is_message:
                is_message, piece = next(result)
                self.assertTrue(is_message)
                self.assertEqual(expected_data, piece)
            else:  # the IO version yields non-messages byte by byte
                for byte in expected_data:
                    is_message, piece = next(result)
                    self.assertFalse(is_message)
                    self.assertEqual(bytes([byte]), piece)


class DetokenizeNested(unittest.TestCase):
    """Tests detokenizing nested tokens."""

    def test_nested_hashed_arg(self):
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(0xA, 'tokenized argument'),
                    tokens.TokenizedStringEntry(
                        2,
                        'This is a ' + '$#%08x',
                    ),
                ]
            )
        )
        self.assertEqual(
            str(detok.detokenize(b'\x02\0\0\0\x14')),
            'This is a tokenized argument',
        )

    def test_nested_base64_arg(self):
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(1, 'base64 argument'),
                    tokens.TokenizedStringEntry(2, 'This is a %s'),
                ]
            )
        )
        self.assertEqual(
            str(detok.detokenize(b'\x02\0\0\0\x09$AQAAAA==')),  # token for 1
            'This is a base64 argument',
        )

    def test_deeply_nested_arg(self):
        detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(1, '$10#0000000005'),
                    tokens.TokenizedStringEntry(2, 'This is a $#%08x'),
                    tokens.TokenizedStringEntry(3, 'deeply nested argument'),
                    tokens.TokenizedStringEntry(4, '$AQAAAA=='),
                    tokens.TokenizedStringEntry(5, '$AwAAAA=='),
                ]
            )
        )
        self.assertEqual(
            str(detok.detokenize(b'\x02\0\0\0\x08')),  # token for 4
            'This is a deeply nested argument',
        )
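
# The nested-token strings used above and below ('$#%08x', '$AQAAAA==',
# '$10#0000000005', '$64#AAAAAA==') follow the detokenizer's nested-argument
# notation: '$' starts an embedded token, an optional base prefix such as
# '10#' or '64#' selects how the token value is written out (decimal or
# Base64, with the bare '#' form holding eight hex digits), and a plain '$'
# with no prefix defaults to Base64. This summary is inferred from the test
# data here rather than quoted from the format's documentation.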


class DetokenizeBase64(unittest.TestCase):
    """Tests detokenizing Base64 messages."""

    JELLO = b'$' + base64.b64encode(JELLO_WORLD_TOKEN)

    RECURSION_STRING = f'The secret message is "{JELLO.decode()}"'
    RECURSION = b'$' + base64.b64encode(
        struct.pack('I', tokens.c_hash(RECURSION_STRING))
    )

    RECURSION_STRING_2 = f"'{RECURSION.decode()}', said the spy."
    RECURSION_2 = b'$' + base64.b64encode(
        struct.pack('I', tokens.c_hash(RECURSION_STRING_2))
    )

    TEST_CASES = (
        (b'', b''),
        (b'nothing here', b'nothing here'),
        (JELLO, b'Jello, world!'),
        (JELLO + b'a', b'Jello, world!a'),
        (JELLO + b'abc', b'Jello, world!abc'),
        (JELLO + b'abc=', b'Jello, world!abc='),
        (b'$a' + JELLO + b'a', b'$aJello, world!a'),
        (b'Hello ' + JELLO + b'?', b'Hello Jello, world!?'),
        (b'$' + JELLO, b'$Jello, world!'),
        (JELLO + JELLO, b'Jello, world!Jello, world!'),
        (JELLO + b'$' + JELLO, b'Jello, world!$Jello, world!'),
        (JELLO + b'$a' + JELLO + b'bcd', b'Jello, world!$aJello, world!bcd'),
        (b'$3141', b'$3141'),
        (JELLO + b'$3141', b'Jello, world!$3141'),
        (
            JELLO + b'$a' + JELLO + b'b' + JELLO + b'c',
            b'Jello, world!$aJello, world!bJello, world!c',
        ),
        (RECURSION, b'The secret message is "Jello, world!"'),
        (
            RECURSION_2,
            b'\'The secret message is "Jello, world!"\', said the spy.',
        ),
    )

    def setUp(self):
        super().setUp()
        db = database.load_token_database(
            io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
        )
        db.add(
            tokens.TokenizedStringEntry(tokens.c_hash(s), s)
            for s in [self.RECURSION_STRING, self.RECURSION_STRING_2]
        )
        self.detok = detokenize.Detokenizer(db)

    def test_detokenize_base64_live(self):
        for data, expected in self.TEST_CASES:
            output = io.BytesIO()
            self.detok.detokenize_base64_live(io.BytesIO(data), output, '$')

            self.assertEqual(expected, output.getvalue(), f'Input: {data!r}')

    def test_detokenize_base64_to_file(self):
        for data, expected in self.TEST_CASES:
            output = io.BytesIO()
            self.detok.detokenize_base64_to_file(data, output, '$')

            self.assertEqual(expected, output.getvalue())

    def test_detokenize_base64(self):
        for data, expected in self.TEST_CASES:
            self.assertEqual(expected, self.detok.detokenize_base64(data, b'$'))

    def test_detokenize_base64_str(self):
        for data, expected in self.TEST_CASES:
            self.assertEqual(
                expected.decode(), self.detok.detokenize_base64(data.decode())
            )


class DetokenizeInfiniteRecursion(unittest.TestCase):
    """Tests that infinite Base64 token recursion resolves."""

    def setUp(self):
        super().setUp()
        self.detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(0, '$AAAAAA=='),  # token for 0
                    tokens.TokenizedStringEntry(1, '$AgAAAA=='),  # token for 2
                    tokens.TokenizedStringEntry(2, '$#00000003'),  # token for 3
                    tokens.TokenizedStringEntry(3, '$AgAAAA=='),  # token for 2
                ]
            )
        )

    def test_detokenize_self_recursion(self):
        for depth in range(5):
            self.assertEqual(
                self.detok.detokenize_text(
                    b'This one is deep: $AAAAAA==', recursion=depth
                ),
                b'This one is deep: $AAAAAA==',
            )

    def test_detokenize_self_recursion_default(self):
        self.assertEqual(
            self.detok.detokenize_text(
                b'This one is deep: $AAAAAA==',
            ),
            b'This one is deep: $AAAAAA==',
        )

    def test_detokenize_cyclic_recursion_even(self):
        self.assertEqual(
            self.detok.detokenize_text(b'I said "$AQAAAA=="', recursion=6),
            b'I said "$AgAAAA=="',
        )

    def test_detokenize_cyclic_recursion_odd(self):
        self.assertEqual(
            self.detok.detokenize_text(b'I said "$AQAAAA=="', recursion=7),
            b'I said "$#00000003"',
        )


class DetokenizeBase64InfiniteRecursion(unittest.TestCase):
    """Tests that infinite Base64 token recursion resolves."""

    def setUp(self):
        super().setUp()
        self.detok = detokenize.Detokenizer(
            tokens.Database(
                [
                    tokens.TokenizedStringEntry(0, '$AAAAAA=='),  # token for 0
                    tokens.TokenizedStringEntry(1, '$AgAAAA=='),  # token for 2
                    tokens.TokenizedStringEntry(2, '$AwAAAA=='),  # token for 3
                    tokens.TokenizedStringEntry(3, '$AgAAAA=='),  # token for 2
                ]
            )
        )

    def test_detokenize_self_recursion(self):
        for depth in range(5):
            self.assertEqual(
                self.detok.detokenize_base64(
                    b'This one is deep: $AAAAAA==', recursion=depth
                ),
                b'This one is deep: $AAAAAA==',
            )

    def test_detokenize_self_recursion_default(self):
        self.assertEqual(
            self.detok.detokenize_base64(b'This one is deep: $64#AAAAAA=='),
            b'This one is deep: $AAAAAA==',
        )

    def test_detokenize_cyclic_recursion_even(self):
        self.assertEqual(
            self.detok.detokenize_base64(b'I said "$AQAAAA=="', recursion=2),
            b'I said "$AgAAAA=="',
        )

    def test_detokenize_cyclic_recursion_odd(self):
        self.assertEqual(
            self.detok.detokenize_base64(b'I said "$AQAAAA=="', recursion=3),
            b'I said "$AwAAAA=="',
        )


if __name__ == '__main__':
    unittest.main()