• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright 2020 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Tests for detokenize."""
16
17import base64
18import concurrent
19import datetime as dt
20import functools
21import io
22import os
23from pathlib import Path
24import struct
25import tempfile
26from typing import Any, Callable, NamedTuple
27import unittest
28from unittest import mock
29
30from pw_tokenizer import database
31from pw_tokenizer import detokenize
32from pw_tokenizer import elf_reader
33from pw_tokenizer import tokens
34
35
36# This function is not part of this test. It was used to generate the binary
37# strings for EMPTY_ELF and ELF_WITH_TOKENIZER_SECTIONS. It takes a path and
38# returns a Python byte string suitable for copying into Python source code.
39def path_to_byte_string(path):
40    with open(path, 'rb') as fd:
41        data = fd.read()
42
43    output = []
44    indices = iter(range(len(data)))
45
46    while True:
47        line = ''
48
49        while len(line) < 70:
50            try:
51                i = next(indices)
52            except StopIteration:
53                break
54
55            line += repr(data[i : i + 1])[2:-1].replace("'", r'\'')
56
57        if not line:
58            return ''.join(output)
59
60        output.append("    b'{}'\n".format(''.join(line)))
61
62
63# This is an empty ELF file. It was created from the ELF file for
64# tokenize_test.cc with the command:
65#
66#   arm-none-eabi-objcopy -S --only-section NO_SECTIONS_PLEASE <ELF> <OUTPUT>
67#
68# The resulting ELF was converted to a Python binary string using
69# path_to_byte_string function above.
70EMPTY_ELF = (
71    b'\x7fELF\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00(\x00\x01'
72    b'\x00\x00\x00\xd1\x83\x00\x084\x00\x00\x00\xe0\x00\x00\x00\x00\x04\x00\x05'
73    b'4\x00 \x00\x05\x00(\x00\x02\x00\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00'
74    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\x00'
75    b'\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00'
76    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00'
77    b'\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
78    b'\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00'
79    b'\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
80    b'\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\xd4\x00'
81    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
82    b'\x06\x00\x00\x00\x00\x00\x01\x00\x00.shstrtab\x00\x00\x00\x00\x00\x00\x00'
83    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
84    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01'
85    b'\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xd4\x00\x00'
86    b'\x00\x0b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00'
87    b'\x00\x00\x00'
88)
89
90# This is an ELF file with only the pw_tokenizer sections. It was created
91# from a tokenize_test binary built for the STM32F429i Discovery board. The
92# pw_tokenizer sections were extracted with this command:
93#
94#   arm-none-eabi-objcopy -S --only-section ".pw_tokenizer*" <ELF> <OUTPUT>
95#
96ELF_WITH_TOKENIZER_SECTIONS_PATH = Path(__file__).parent.joinpath(
97    'example_binary_with_tokenized_strings.elf'
98)
99ELF_WITH_TOKENIZER_SECTIONS = ELF_WITH_TOKENIZER_SECTIONS_PATH.read_bytes()
100
101TOKENS_IN_ELF = 22
102TOKENS_IN_ELF_WITH_TOKENIZER_SECTIONS = 26
103
104# 0x2e668cd6 is 'Jello, world!' (which is also used in database_test.py).
105JELLO_WORLD_TOKEN = b'\xd6\x8c\x66\x2e'
106
107
108class DetokenizeTest(unittest.TestCase):
109    """Tests the detokenize.Detokenizer."""
110
111    def test_simple(self):
112        detok = detokenize.Detokenizer(
113            tokens.Database(
114                [
115                    tokens.TokenizedStringEntry(
116                        0xCDAB, '%02d %s %c%%', date_removed=dt.datetime.now()
117                    )
118                ]
119            )
120        )
121        self.assertEqual(
122            str(detok.detokenize(b'\xab\xcd\0\0\x02\x03Two\x66')), '01 Two 3%'
123        )
124
125    def test_detokenize_extra_data_is_unsuccessful(self):
126        detok = detokenize.Detokenizer(
127            tokens.Database(
128                [
129                    tokens.TokenizedStringEntry(
130                        1, 'no args', date_removed=dt.datetime(1, 1, 1)
131                    )
132                ]
133            )
134        )
135
136        result = detok.detokenize(b'\x01\0\0\0\x04args')
137        self.assertEqual(len(result.failures), 1)
138        string, args, remaining = result.failures[0]
139        self.assertEqual('no args', string)
140        self.assertFalse(args)
141        self.assertEqual(b'\x04args', remaining)
142        self.assertEqual('no args', string)
143        self.assertEqual('no args', str(result))
144
145    def test_detokenize_zero_extend_short_token_with_no_args(self):
146        detok = detokenize.Detokenizer(
147            tokens.Database(
148                [tokens.TokenizedStringEntry(0xCDAB, 'This token is 16 bits')]
149            )
150        )
151        self.assertEqual(
152            str(detok.detokenize(b'\xab\xcd')), 'This token is 16 bits'
153        )
154
155    def test_detokenize_missing_data_is_unsuccessful(self):
156        detok = detokenize.Detokenizer(
157            tokens.Database(
158                [
159                    tokens.TokenizedStringEntry(
160                        2, '%s', date_removed=dt.datetime(1, 1, 1)
161                    )
162                ]
163            )
164        )
165
166        result = detok.detokenize(b'\x02\0\0\0')
167        string, args, remaining = result.failures[0]
168        self.assertEqual('%s', string)
169        self.assertEqual(len(args), 1)
170        self.assertEqual(b'', remaining)
171        self.assertEqual(len(result.failures), 1)
172        self.assertEqual('%s', str(result))
173
174    def test_detokenize_missing_data_with_errors_is_unsuccessful(self):
175        detok = detokenize.Detokenizer(
176            tokens.Database(
177                [
178                    tokens.TokenizedStringEntry(
179                        2, '%s', date_removed=dt.datetime(1, 1, 1)
180                    )
181                ]
182            ),
183            show_errors=True,
184        )
185
186        result = detok.detokenize(b'\x02\0\0\0')
187        string, args, remaining = result.failures[0]
188        self.assertIn('%s MISSING', string)
189        self.assertEqual(len(args), 1)
190        self.assertEqual(b'', remaining)
191        self.assertEqual(len(result.failures), 1)
192        self.assertIn('%s MISSING', str(result))
193
194    def test_unparsed_data(self):
195        detok = detokenize.Detokenizer(
196            tokens.Database(
197                [
198                    tokens.TokenizedStringEntry(
199                        1, 'no args', date_removed=dt.datetime(100, 1, 1)
200                    ),
201                ]
202            )
203        )
204        result = detok.detokenize(b'\x01\0\0\0o_o')
205        self.assertFalse(result.ok())
206        self.assertEqual('no args', str(result))
207        self.assertIn('o_o', repr(result))
208        self.assertIn('decoding failed', result.error_message())
209
210    def test_empty_db(self):
211        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))
212        self.assertFalse(detok.detokenize(b'\x12\x34\0\0').ok())
213        self.assertIn(
214            'unknown token', detok.detokenize(b'1234').error_message()
215        )
216        self.assertIn('unknown token', repr(detok.detokenize(b'1234')))
217
218        self.assertEqual(
219            '$' + base64.b64encode(b'1234').decode(),
220            str(detok.detokenize(b'1234')),
221        )
222
223        self.assertIsNone(detok.detokenize(b'').token)
224
225    def test_empty_db_show_errors(self):
226        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)
227        self.assertFalse(detok.detokenize(b'\x12\x34\0\0').ok())
228        self.assertIn(
229            'unknown token', detok.detokenize(b'1234').error_message()
230        )
231        self.assertIn('unknown token', repr(detok.detokenize(b'1234')))
232        self.assertIn('unknown token', str(detok.detokenize(b'1234')))
233
234        self.assertIsNone(detok.detokenize(b'').token)
235
236    def test_missing_token_show_errors(self):
237        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)
238        self.assertIn('missing token', detok.detokenize(b'').error_message())
239        self.assertIn('missing token', str(detok.detokenize(b'')))
240
241    def test_missing_token(self):
242        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))
243        self.assertIn('missing token', detok.detokenize(b'').error_message())
244        self.assertEqual('$', str(detok.detokenize(b'')))
245
246    def test_unknown_shorter_token_show_error(self):
247        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)
248
249        self.assertIn('unknown token', detok.detokenize(b'1').error_message())
250        self.assertIn('unknown token', str(detok.detokenize(b'1')))
251        self.assertIn('unknown token', repr(detok.detokenize(b'1')))
252
253        self.assertIn('unknown token', detok.detokenize(b'123').error_message())
254        self.assertIn('unknown token', str(detok.detokenize(b'123')))
255        self.assertIn('unknown token', repr(detok.detokenize(b'123')))
256
257    def test_unknown_shorter_token(self):
258        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))
259
260        self.assertEqual(
261            'unknown token 00000001', detok.detokenize(b'\1').error_message()
262        )
263        self.assertEqual(
264            '$' + base64.b64encode(b'\1\0\0\0').decode(),
265            str(detok.detokenize(b'\1')),
266        )
267        self.assertIn('unknown token 00000001', repr(detok.detokenize(b'\1')))
268
269        self.assertEqual(
270            'unknown token 00030201',
271            detok.detokenize(b'\1\2\3').error_message(),
272        )
273        self.assertEqual(
274            '$' + base64.b64encode(b'\1\2\3\0').decode(),
275            str(detok.detokenize(b'\1\2\3')),
276        )
277        self.assertIn(
278            'unknown token 00030201', repr(detok.detokenize(b'\1\2\3'))
279        )
280
281    def test_decode_from_elf_data(self):
282        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
283
284        self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
285        self.assertEqual(
286            str(detok.detokenize(JELLO_WORLD_TOKEN)), 'Jello, world!'
287        )
288
289        undecoded_args = detok.detokenize(JELLO_WORLD_TOKEN + b'some junk')
290        self.assertFalse(undecoded_args.ok())
291        self.assertEqual(str(undecoded_args), 'Jello, world!')
292
293        self.assertTrue(detok.detokenize(b'\0\0\0\0').ok())
294        self.assertEqual(str(detok.detokenize(b'\0\0\0\0')), '')
295
296    def test_decode_from_elf_file(self):
297        """Test decoding from an elf file."""
298        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
299        expected_tokens = frozenset(detok.database.token_to_entries.keys())
300
301        with tempfile.NamedTemporaryFile('wb', delete=False) as elf:
302            try:
303                elf.write(ELF_WITH_TOKENIZER_SECTIONS)
304                elf.close()
305
306                # Open ELF by file object
307                with open(elf.name, 'rb') as fd:
308                    detok = detokenize.Detokenizer(fd)
309
310                self.assertEqual(
311                    expected_tokens,
312                    frozenset(detok.database.token_to_entries.keys()),
313                )
314
315                # Open ELF by path
316                detok = detokenize.Detokenizer(elf.name)
317                self.assertEqual(
318                    expected_tokens,
319                    frozenset(detok.database.token_to_entries.keys()),
320                )
321
322                # Open ELF by elf_reader.Elf
323                with open(elf.name, 'rb') as fd:
324                    detok = detokenize.Detokenizer(elf_reader.Elf(fd))
325
326                self.assertEqual(
327                    expected_tokens,
328                    frozenset(detok.database.token_to_entries.keys()),
329                )
330            finally:
331                os.unlink(elf.name)
332
333    def test_decode_from_csv_file(self):
334        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
335        expected_tokens = frozenset(detok.database.token_to_entries.keys())
336
337        csv_database = str(detok.database)
338        self.assertEqual(len(csv_database.splitlines()), TOKENS_IN_ELF)
339
340        with tempfile.NamedTemporaryFile('w', delete=False) as csv_file:
341            try:
342                csv_file.write(csv_database)
343                csv_file.close()
344
345                # Open CSV by path
346                detok = detokenize.Detokenizer(csv_file.name)
347                self.assertEqual(
348                    expected_tokens,
349                    frozenset(detok.database.token_to_entries.keys()),
350                )
351
352                # Open CSV by file object
353                with open(csv_file.name) as fd:
354                    detok = detokenize.Detokenizer(fd)
355
356                self.assertEqual(
357                    expected_tokens,
358                    frozenset(detok.database.token_to_entries.keys()),
359                )
360            finally:
361                os.unlink(csv_file.name)
362
363    def test_create_detokenizer_with_token_database(self):
364        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
365        expected_tokens = frozenset(detok.database.token_to_entries.keys())
366
367        detok = detokenize.Detokenizer(detok.database)
368        self.assertEqual(
369            expected_tokens, frozenset(detok.database.token_to_entries.keys())
370        )
371
372
373class DetokenizeWithCollisions(unittest.TestCase):
374    """Tests collision resolution."""
375
376    def setUp(self):
377        super().setUp()
378        token = 0xBAAD
379
380        # Database with several conflicting tokens.
381        self.detok = detokenize.Detokenizer(
382            tokens.Database(
383                [
384                    tokens.TokenizedStringEntry(
385                        token, 'REMOVED', date_removed=dt.datetime(9, 1, 1)
386                    ),
387                    tokens.TokenizedStringEntry(token, 'newer'),
388                    tokens.TokenizedStringEntry(
389                        token, 'A: %d', date_removed=dt.datetime(30, 5, 9)
390                    ),
391                    tokens.TokenizedStringEntry(
392                        token, 'B: %c', date_removed=dt.datetime(30, 5, 10)
393                    ),
394                    tokens.TokenizedStringEntry(token, 'C: %s'),
395                    tokens.TokenizedStringEntry(token, '%d%u'),
396                    tokens.TokenizedStringEntry(token, '%s%u %d'),
397                    tokens.TokenizedStringEntry(1, '%s'),
398                    tokens.TokenizedStringEntry(1, '%d'),
399                    tokens.TokenizedStringEntry(2, 'Three %s %s %s'),
400                    tokens.TokenizedStringEntry(2, 'Five %d %d %d %d %s'),
401                ]
402            )
403        )
404
405    def test_collision_no_args_favors_most_recently_present(self):
406        no_args = self.detok.detokenize(b'\xad\xba\0\0')
407        self.assertFalse(no_args.ok())
408        self.assertEqual(len(no_args.successes), 2)
409        self.assertEqual(len(no_args.failures), 5)
410        self.assertEqual(len(no_args.matches()), 7)
411        self.assertEqual(str(no_args), 'newer')
412        self.assertEqual(len(no_args.best_result()[1]), 0)
413        self.assertEqual(no_args.best_result()[0], 'newer')
414
415    def test_collision_one_integer_arg_favors_most_recently_present(self):
416        multiple_correct = self.detok.detokenize(b'\xad\xba\0\0\x7a')
417        self.assertFalse(multiple_correct.ok())
418        self.assertIn('ERROR', repr(multiple_correct))
419        self.assertEqual(len(multiple_correct.successes), 2)
420        self.assertEqual(len(multiple_correct.failures), 5)
421        self.assertEqual(len(multiple_correct.matches()), 7)
422        self.assertEqual(str(multiple_correct), 'B: =')
423
424    def test_collision_one_integer_arg_favor_successful_decode(self):
425        # One string decodes successfully, since the arg is out of range for %c.
426        int_arg = self.detok.detokenize(b'\xad\xba\0\0\xfe\xff\xff\xff\x0f')
427        self.assertTrue(int_arg.ok())
428        self.assertEqual(str(int_arg), 'A: 2147483647')
429
430    def test_collision_one_string_arg_favors_successful_decode(self):
431        # One string decodes successfully, since decoding the argument as an
432        # integer does not decode all the data.
433        string_arg = self.detok.detokenize(b'\xad\xba\0\0\x02Hi')
434        self.assertTrue(string_arg.ok())
435        self.assertEqual(str(string_arg), 'C: Hi')
436
437    def test_collision_one_string_arg_favors_decoding_all_data(self):
438        result = self.detok.detokenize(b'\1\0\0\0\x83hi')
439        self.assertEqual(len(result.failures), 2)
440        # Should resolve to the string since %d would leave one byte behind.
441        self.assertEqual(str(result), '%s')
442
443    def test_collision_multiple_args_favors_decoding_more_arguments(self):
444        result = self.detok.detokenize(b'\2\0\0\0\1\2\1\4\5')
445        self.assertEqual(len(result.matches()), 2)
446        self.assertEqual(result.matches()[0][0], 'Five -1 1 -1 2 %s')
447        self.assertEqual(result.matches()[1][0], 'Three \2 \4 %s')
448
449    def test_collision_multiple_args_favors_decoding_all_arguments(self):
450        unambiguous = self.detok.detokenize(b'\xad\xba\0\0\x01#\x00\x01')
451        self.assertTrue(unambiguous.ok())
452        self.assertEqual(len(unambiguous.matches()), 7)
453        self.assertEqual('#0 -1', str(unambiguous))
454        self.assertIn('#0 -1', repr(unambiguous))
455
456
457class ManualPoolExecutor(concurrent.futures.Executor):
458    """A stubbed pool executor that captures the most recent work request
459    and holds it until the public process method is manually called."""
460
461    def __init__(self):
462        super().__init__()
463        self._func = None
464
465    # pylint: disable=arguments-differ
466    def submit(self, func, *args, **kwargs):
467        """Submits work to the pool, stashing the partial for later use."""
468        self._func = functools.partial(func, *args, **kwargs)
469
470    def process(self):
471        """Processes the latest func submitted to the pool."""
472        if self._func is not None:
473            self._func()
474            self._func = None
475
476
477class InlinePoolExecutor(concurrent.futures.Executor):
478    """A stubbed pool executor that runs work immediately, inline."""
479
480    # pylint: disable=arguments-differ
481    def submit(self, func, *args, **kwargs):
482        """Submits work to the pool, stashing the partial for later use."""
483        func(*args, **kwargs)
484
485
486@mock.patch('os.path.getmtime')
487class AutoUpdatingDetokenizerTest(unittest.TestCase):
488    """Tests the AutoUpdatingDetokenizer class."""
489
490    def test_update(self, mock_getmtime):
491        """Tests the update command."""
492
493        db = database.load_token_database(
494            io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
495        )
496        self.assertEqual(len(db), TOKENS_IN_ELF)
497
498        the_time = [100]
499
500        def move_back_time_if_file_exists(path):
501            if os.path.exists(path):
502                the_time[0] -= 1
503                return the_time[0]
504
505            raise FileNotFoundError
506
507        mock_getmtime.side_effect = move_back_time_if_file_exists
508
509        with tempfile.NamedTemporaryFile('wb', delete=False) as file:
510            try:
511                file.close()
512
513                pool = ManualPoolExecutor()
514                detok = detokenize.AutoUpdatingDetokenizer(
515                    file.name, min_poll_period_s=0, pool=pool
516                )
517                self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
518
519                with open(file.name, 'wb') as fd:
520                    tokens.write_binary(db, fd)
521
522                # After the change but before the pool runs in another thread,
523                # the token should not exist.
524                self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
525
526                # After the pool is allowed to process, it should.
527                pool.process()
528                self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
529            finally:
530                os.unlink(file.name)
531
532    def test_update_with_directory(self, mock_getmtime):
533        """Tests the update command with a directory format database."""
534        db = database.load_token_database(
535            io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
536        )
537        self.assertEqual(len(db), TOKENS_IN_ELF)
538
539        the_time = [100]
540
541        def move_back_time_if_file_exists(path):
542            if os.path.exists(path):
543                the_time[0] -= 1
544                return the_time[0]
545
546            raise FileNotFoundError
547
548        mock_getmtime.side_effect = move_back_time_if_file_exists
549
550        with tempfile.TemporaryDirectory() as dbdir:
551            with tempfile.NamedTemporaryFile(
552                'wb', delete=False, suffix='.pw_tokenizer.csv', dir=dbdir
553            ) as matching_suffix_file, tempfile.NamedTemporaryFile(
554                'wb', delete=False, suffix='.not.right', dir=dbdir
555            ) as mismatched_suffix_file:
556                try:
557                    matching_suffix_file.close()
558                    mismatched_suffix_file.close()
559
560                    pool = ManualPoolExecutor()
561                    detok = detokenize.AutoUpdatingDetokenizer(
562                        dbdir, min_poll_period_s=0, pool=pool
563                    )
564                    self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
565
566                    with open(mismatched_suffix_file.name, 'wb') as fd:
567                        tokens.write_csv(db, fd)
568                    pool.process()
569                    self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
570
571                    with open(matching_suffix_file.name, 'wb') as fd:
572                        tokens.write_csv(db, fd)
573
574                    # After the change but before the pool runs in another
575                    # thread, the token should not exist.
576                    self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
577                    pool.process()
578
579                    # After the pool is allowed to process, it should.
580                    self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
581                finally:
582                    os.unlink(mismatched_suffix_file.name)
583                    os.unlink(matching_suffix_file.name)
584                    os.rmdir(dbdir)
585
586        # The database stays around if the file is deleted.
587        self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
588
589    def test_no_update_if_time_is_same(self, mock_getmtime):
590        mock_getmtime.return_value = 100
591
592        with tempfile.NamedTemporaryFile('wb', delete=False) as file:
593            try:
594                tokens.write_csv(
595                    database.load_token_database(
596                        io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
597                    ),
598                    file,
599                )
600                file.close()
601
602                detok = detokenize.AutoUpdatingDetokenizer(
603                    file.name, min_poll_period_s=0, pool=InlinePoolExecutor()
604                )
605                self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
606
607                # Empty the database, but keep the mock modified time the same.
608                with open(file.name, 'wb'):
609                    pass
610
611                self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
612                self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
613
614                # Move back time so the now-empty file is reloaded.
615                mock_getmtime.return_value = 50
616                self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
617            finally:
618                os.unlink(file.name)
619
620    def test_token_domain_in_str(self, _) -> None:
621        """Tests a str containing a domain"""
622        detok = detokenize.AutoUpdatingDetokenizer(
623            f'{ELF_WITH_TOKENIZER_SECTIONS_PATH}#.*',
624            min_poll_period_s=0,
625            pool=InlinePoolExecutor(),
626        )
627        self.assertEqual(
628            len(detok.database), TOKENS_IN_ELF_WITH_TOKENIZER_SECTIONS
629        )
630
631    def test_token_domain_in_path(self, _) -> None:
632        """Tests a Path() containing a domain"""
633        detok = detokenize.AutoUpdatingDetokenizer(
634            Path(f'{ELF_WITH_TOKENIZER_SECTIONS_PATH}#.*'),
635            min_poll_period_s=0,
636            pool=InlinePoolExecutor(),
637        )
638        self.assertEqual(
639            len(detok.database), TOKENS_IN_ELF_WITH_TOKENIZER_SECTIONS
640        )
641
642    def test_token_no_domain_in_str(self, _) -> None:
643        """Tests a str without a domain"""
644        detok = detokenize.AutoUpdatingDetokenizer(
645            str(ELF_WITH_TOKENIZER_SECTIONS_PATH),
646            min_poll_period_s=0,
647            pool=InlinePoolExecutor(),
648        )
649        self.assertEqual(len(detok.database), TOKENS_IN_ELF)
650
651    def test_token_no_domain_in_path(self, _) -> None:
652        """Tests a Path() without a domain"""
653        detok = detokenize.AutoUpdatingDetokenizer(
654            ELF_WITH_TOKENIZER_SECTIONS_PATH,
655            min_poll_period_s=0,
656            pool=InlinePoolExecutor(),
657        )
658        self.assertEqual(len(detok.database), TOKENS_IN_ELF)
659
660
661def _next_char(message: bytes) -> bytes:
662    return bytes(b + 1 for b in message)
663
664
665class NestedMessageParserTest(unittest.TestCase):
666    """Tests parsing prefixed messages."""
667
668    class _Case(NamedTuple):
669        data: bytes
670        expected: bytes
671        title: str
672        transform: Callable[[bytes], bytes] = _next_char
673
674    TRANSFORM_TEST_CASES = (
675        _Case(b'$abcd', b'%bcde', 'single message'),
676        _Case(
677            b'$$WHAT?$abc$WHY? is this $ok $',
678            b'%%WHAT?%bcd%WHY? is this %ok %',
679            'message and non-message',
680        ),
681        _Case(b'$1$', b'%1%', 'empty message'),
682        _Case(b'$abc$defgh', b'%bcd%efghh', 'sequential message'),
683        _Case(
684            b'w$abcx$defygh$$abz',
685            b'w$ABCx$DEFygh$$ABz',
686            'interspersed start/end non-message',
687            bytes.upper,
688        ),
689        _Case(
690            b'$abcx$defygh$$ab',
691            b'$ABCx$DEFygh$$AB',
692            'interspersed start/end message ',
693            bytes.upper,
694        ),
695    )
696
697    def setUp(self) -> None:
698        self.decoder = detokenize.NestedMessageParser('$', 'abcdefg')
699
700    def test_transform_io(self) -> None:
701        for data, expected, title, transform in self.TRANSFORM_TEST_CASES:
702            self.assertEqual(
703                expected,
704                b''.join(
705                    self.decoder.transform_io(io.BytesIO(data), transform)
706                ),
707                f'{title}: {data!r}',
708            )
709
710    def test_transform_bytes_with_flush(self) -> None:
711        for data, expected, title, transform in self.TRANSFORM_TEST_CASES:
712            self.assertEqual(
713                expected,
714                self.decoder.transform(data, transform, flush=True),
715                f'{title}: {data!r}',
716            )
717
718    def test_transform_bytes_sequential(self) -> None:
719        transform = lambda message: message.upper().replace(b'$', b'*')
720
721        self.assertEqual(self.decoder.transform(b'abc$abcd', transform), b'abc')
722        self.assertEqual(self.decoder.transform(b'$', transform), b'*ABCD')
723        self.assertEqual(self.decoder.transform(b'$b', transform), b'*')
724        self.assertEqual(self.decoder.transform(b'', transform), b'')
725        self.assertEqual(self.decoder.transform(b' ', transform), b'*B ')
726        self.assertEqual(self.decoder.transform(b'hello', transform), b'hello')
727        self.assertEqual(self.decoder.transform(b'?? $ab', transform), b'?? ')
728        self.assertEqual(
729            self.decoder.transform(b'123$ab4$56$a', transform), b'*AB123*AB4*56'
730        )
731        self.assertEqual(
732            self.decoder.transform(b'bc', transform, flush=True), b'*ABC'
733        )
734
735    MESSAGES_TEST: Any = (
736        (b'123$abc456$a', (False, b'123'), (True, b'$abc'), (False, b'456')),
737        (b'7$abcd', (True, b'$a'), (False, b'7')),
738        (b'e',),
739        (b'',),
740        (b'$', (True, b'$abcde')),
741        (b'$', (True, b'$')),
742        (b'$a$b$c', (True, b'$'), (True, b'$a'), (True, b'$b')),
743        (b'1', (True, b'$c'), (False, b'1')),
744        (b'',),
745        (b'?', (False, b'?')),
746        (b'!@', (False, b'!@')),
747        (b'%^&', (False, b'%^&')),
748    )
749
750    def test_read_messages(self) -> None:
751        for step in self.MESSAGES_TEST:
752            data: bytes = step[0]
753            pieces: tuple[tuple[bool, bytes], ...] = step[1:]
754            self.assertEqual(tuple(self.decoder.read_messages(data)), pieces)
755
756    def test_read_messages_flush(self) -> None:
757        self.assertEqual(
758            list(self.decoder.read_messages(b'123$a')), [(False, b'123')]
759        )
760        self.assertEqual(list(self.decoder.read_messages(b'b')), [])
761        self.assertEqual(
762            list(self.decoder.read_messages(b'', flush=True)), [(True, b'$ab')]
763        )
764
765    def test_read_messages_io(self) -> None:
766        # Rework the read_messages test data for stream input.
767        data = io.BytesIO(b''.join(step[0] for step in self.MESSAGES_TEST))
768        expected_pieces = sum((step[1:] for step in self.MESSAGES_TEST), ())
769
770        result = self.decoder.read_messages_io(data)
771        for expected_is_message, expected_data in expected_pieces:
772            if expected_is_message:
773                is_message, piece = next(result)
774                self.assertTrue(is_message)
775                self.assertEqual(expected_data, piece)
776            else:  # the IO version yields non-messages byte by byte
777                for byte in expected_data:
778                    is_message, piece = next(result)
779                    self.assertFalse(is_message)
780                    self.assertEqual(bytes([byte]), piece)
781
782
783class DetokenizeNested(unittest.TestCase):
784    """Tests detokenizing nested tokens"""
785
786    def test_nested_hashed_arg(self):
787        detok = detokenize.Detokenizer(
788            tokens.Database(
789                [
790                    tokens.TokenizedStringEntry(0xA, 'tokenized argument'),
791                    tokens.TokenizedStringEntry(
792                        2,
793                        'This is a ' + '$#%08x',
794                    ),
795                ]
796            )
797        )
798        self.assertEqual(
799            str(detok.detokenize(b'\x02\0\0\0\x14')),
800            'This is a tokenized argument',
801        )
802
803    def test_nested_base64_arg(self):
804        detok = detokenize.Detokenizer(
805            tokens.Database(
806                [
807                    tokens.TokenizedStringEntry(1, 'base64 argument'),
808                    tokens.TokenizedStringEntry(2, 'This is a %s'),
809                ]
810            )
811        )
812        self.assertEqual(
813            str(detok.detokenize(b'\x02\0\0\0\x09$AQAAAA==')),  # token for 1
814            'This is a base64 argument',
815        )
816
817    def test_deeply_nested_arg(self):
818        detok = detokenize.Detokenizer(
819            tokens.Database(
820                [
821                    tokens.TokenizedStringEntry(1, '$10#0000000005'),
822                    tokens.TokenizedStringEntry(2, 'This is a $#%08x'),
823                    tokens.TokenizedStringEntry(3, 'deeply nested argument'),
824                    tokens.TokenizedStringEntry(4, '$AQAAAA=='),
825                    tokens.TokenizedStringEntry(5, '$AwAAAA=='),
826                ]
827            )
828        )
829        self.assertEqual(
830            str(detok.detokenize(b'\x02\0\0\0\x08')),  # token for 4
831            'This is a deeply nested argument',
832        )
833
834
835class DetokenizeBase64(unittest.TestCase):
836    """Tests detokenizing Base64 messages."""
837
838    JELLO = b'$' + base64.b64encode(JELLO_WORLD_TOKEN)
839
840    RECURSION_STRING = f'The secret message is "{JELLO.decode()}"'
841    RECURSION = b'$' + base64.b64encode(
842        struct.pack('I', tokens.c_hash(RECURSION_STRING))
843    )
844
845    RECURSION_STRING_2 = f"'{RECURSION.decode()}', said the spy."
846    RECURSION_2 = b'$' + base64.b64encode(
847        struct.pack('I', tokens.c_hash(RECURSION_STRING_2))
848    )
849
850    TEST_CASES = (
851        (b'', b''),
852        (b'nothing here', b'nothing here'),
853        (JELLO, b'Jello, world!'),
854        (JELLO + b'a', b'Jello, world!a'),
855        (JELLO + b'abc', b'Jello, world!abc'),
856        (JELLO + b'abc=', b'Jello, world!abc='),
857        (b'$a' + JELLO + b'a', b'$aJello, world!a'),
858        (b'Hello ' + JELLO + b'?', b'Hello Jello, world!?'),
859        (b'$' + JELLO, b'$Jello, world!'),
860        (JELLO + JELLO, b'Jello, world!Jello, world!'),
861        (JELLO + b'$' + JELLO, b'Jello, world!$Jello, world!'),
862        (JELLO + b'$a' + JELLO + b'bcd', b'Jello, world!$aJello, world!bcd'),
863        (b'$3141', b'$3141'),
864        (JELLO + b'$3141', b'Jello, world!$3141'),
865        (
866            JELLO + b'$a' + JELLO + b'b' + JELLO + b'c',
867            b'Jello, world!$aJello, world!bJello, world!c',
868        ),
869        (RECURSION, b'The secret message is "Jello, world!"'),
870        (
871            RECURSION_2,
872            b'\'The secret message is "Jello, world!"\', said the spy.',
873        ),
874    )
875
876    def setUp(self):
877        super().setUp()
878        db = database.load_token_database(
879            io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)
880        )
881        db.add(
882            tokens.TokenizedStringEntry(tokens.c_hash(s), s)
883            for s in [self.RECURSION_STRING, self.RECURSION_STRING_2]
884        )
885        self.detok = detokenize.Detokenizer(db)
886
887    def test_detokenize_base64_live(self):
888        for data, expected in self.TEST_CASES:
889            output = io.BytesIO()
890            self.detok.detokenize_base64_live(io.BytesIO(data), output, '$')
891
892            self.assertEqual(expected, output.getvalue(), f'Input: {data!r}')
893
894    def test_detokenize_base64_to_file(self):
895        for data, expected in self.TEST_CASES:
896            output = io.BytesIO()
897            self.detok.detokenize_base64_to_file(data, output, '$')
898
899            self.assertEqual(expected, output.getvalue())
900
901    def test_detokenize_base64(self):
902        for data, expected in self.TEST_CASES:
903            self.assertEqual(expected, self.detok.detokenize_base64(data, b'$'))
904
905    def test_detokenize_base64_str(self):
906        for data, expected in self.TEST_CASES:
907            self.assertEqual(
908                expected.decode(), self.detok.detokenize_base64(data.decode())
909            )
910
911
912class DetokenizeInfiniteRecursion(unittest.TestCase):
913    """Tests that infinite Base64 token recursion resolves."""
914
915    def setUp(self):
916        super().setUp()
917        self.detok = detokenize.Detokenizer(
918            tokens.Database(
919                [
920                    tokens.TokenizedStringEntry(0, '$AAAAAA=='),  # token for 0
921                    tokens.TokenizedStringEntry(1, '$AgAAAA=='),  # token for 2
922                    tokens.TokenizedStringEntry(2, '$#00000003'),  # token for 3
923                    tokens.TokenizedStringEntry(3, '$AgAAAA=='),  # token for 2
924                ]
925            )
926        )
927
928    def test_detokenize_self_recursion(self):
929        for depth in range(5):
930            self.assertEqual(
931                self.detok.detokenize_text(
932                    b'This one is deep: $AAAAAA==', recursion=depth
933                ),
934                b'This one is deep: $AAAAAA==',
935            )
936
937    def test_detokenize_self_recursion_default(self):
938        self.assertEqual(
939            self.detok.detokenize_text(
940                b'This one is deep: $AAAAAA==',
941            ),
942            b'This one is deep: $AAAAAA==',
943        )
944
945    def test_detokenize_cyclic_recursion_even(self):
946        self.assertEqual(
947            self.detok.detokenize_text(b'I said "$AQAAAA=="', recursion=6),
948            b'I said "$AgAAAA=="',
949        )
950
951    def test_detokenize_cyclic_recursion_odd(self):
952        self.assertEqual(
953            self.detok.detokenize_text(b'I said "$AQAAAA=="', recursion=7),
954            b'I said "$#00000003"',
955        )
956
957
958class DetokenizeBase64InfiniteRecursion(unittest.TestCase):
959    """Tests that infinite Bas64 token recursion resolves."""
960
961    def setUp(self):
962        super().setUp()
963        self.detok = detokenize.Detokenizer(
964            tokens.Database(
965                [
966                    tokens.TokenizedStringEntry(0, '$AAAAAA=='),  # token for 0
967                    tokens.TokenizedStringEntry(1, '$AgAAAA=='),  # token for 2
968                    tokens.TokenizedStringEntry(2, '$AwAAAA=='),  # token for 3
969                    tokens.TokenizedStringEntry(3, '$AgAAAA=='),  # token for 2
970                ]
971            )
972        )
973
974    def test_detokenize_self_recursion(self):
975        for depth in range(5):
976            self.assertEqual(
977                self.detok.detokenize_base64(
978                    b'This one is deep: $AAAAAA==', recursion=depth
979                ),
980                b'This one is deep: $AAAAAA==',
981            )
982
983    def test_detokenize_self_recursion_default(self):
984        self.assertEqual(
985            self.detok.detokenize_base64(b'This one is deep: $64#AAAAAA=='),
986            b'This one is deep: $AAAAAA==',
987        )
988
989    def test_detokenize_cyclic_recursion_even(self):
990        self.assertEqual(
991            self.detok.detokenize_base64(b'I said "$AQAAAA=="', recursion=2),
992            b'I said "$AgAAAA=="',
993        )
994
995    def test_detokenize_cyclic_recursion_odd(self):
996        self.assertEqual(
997            self.detok.detokenize_base64(b'I said "$AQAAAA=="', recursion=3),
998            b'I said "$AwAAAA=="',
999        )
1000
1001
1002if __name__ == '__main__':
1003    unittest.main()
1004