• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright 2020 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Tests for the database module."""
16
17import json
18import io
19import os
20from pathlib import Path
21import shutil
22import stat
23import subprocess
24import sys
25import tempfile
26import unittest
27from unittest import mock
28
29from pw_tokenizer import database
30
# This is an ELF file with only the pw_tokenizer sections. It was created
# from a tokenize_test binary built for the STM32F429i Discovery board. The
# pw_tokenizer sections were extracted with this command:
#
#   arm-none-eabi-objcopy -S --only-section ".pw_tokenize*" <ELF> <OUTPUT>
#
# The path is resolved relative to the directory that contains this test
# module, so the tests do not depend on the current working directory.
TOKENIZED_ENTRIES_ELF = (
    Path(__file__).parent / 'example_binary_with_tokenized_strings.elf'
)
40
# Expected CSV database contents when a database is created from
# TOKENIZED_ENTRIES_ELF without naming a domain (22 rows). Each row is a hex
# token, a date column (left blank here; the mark_removed test fills it with
# a removal date), and the quoted string.
CSV_DEFAULT_DOMAIN = '''\
00000000,          ,""
141c35d5,          ,"The answer: ""%s"""
29aef586,          ,"1234"
2b78825f,          ,"[:-)"
2e668cd6,          ,"Jello, world!"
31631781,          ,"%d"
61fd1e26,          ,"%ld"
68ab92da,          ,"%s there are %x (%.2f) of them%c"
7b940e2a,          ,"Hello %s! %hd %e"
7da55d52,          ,">:-[]"
7f35a9a5,          ,"TestName"
851beeb6,          ,"%u %d"
881436a0,          ,"The answer is: %s"
88808930,          ,"%u%d%02x%X%hu%hhd%d%ld%lu%lld%llu%c%c%c"
92723f44,          ,"???"
a09d6698,          ,"won-won-won-wonderful"
aa9ffa66,          ,"void pw::tokenizer::{anonymous}::TestName()"
ad002c97,          ,"%llx"
b3653e13,          ,"Jello!"
cc6d3131,          ,"Jello?"
e13b0f94,          ,"%llu"
e65aefef,          ,"Won't fit : %s%d"
'''
65
# Expected CSV database contents when only the TEST_DOMAIN domain of
# TOKENIZED_ENTRIES_ELF is read (5 rows). Token 881436a0 also appears in
# CSV_DEFAULT_DOMAIN.
CSV_TEST_DOMAIN = """\
17fa86d3,          ,"hello"
18c5017c,          ,"yes"
59b2701c,          ,"The answer was: %s"
881436a0,          ,"The answer is: %s"
d18ada0f,          ,"something"
"""
73
# Expected CSV database contents when every domain of TOKENIZED_ENTRIES_ELF
# is read (domain pattern '.*'): the rows of CSV_DEFAULT_DOMAIN and
# CSV_TEST_DOMAIN merged and ordered by token.
CSV_ALL_DOMAINS = '''\
00000000,          ,""
141c35d5,          ,"The answer: ""%s"""
17fa86d3,          ,"hello"
18c5017c,          ,"yes"
29aef586,          ,"1234"
2b78825f,          ,"[:-)"
2e668cd6,          ,"Jello, world!"
31631781,          ,"%d"
59b2701c,          ,"The answer was: %s"
61fd1e26,          ,"%ld"
68ab92da,          ,"%s there are %x (%.2f) of them%c"
7b940e2a,          ,"Hello %s! %hd %e"
7da55d52,          ,">:-[]"
7f35a9a5,          ,"TestName"
851beeb6,          ,"%u %d"
881436a0,          ,"The answer is: %s"
88808930,          ,"%u%d%02x%X%hu%hhd%d%ld%lu%lld%llu%c%c%c"
92723f44,          ,"???"
a09d6698,          ,"won-won-won-wonderful"
aa9ffa66,          ,"void pw::tokenizer::{anonymous}::TestName()"
ad002c97,          ,"%llx"
b3653e13,          ,"Jello!"
cc6d3131,          ,"Jello?"
d18ada0f,          ,"something"
e13b0f94,          ,"%llu"
e65aefef,          ,"Won't fit : %s%d"
'''
102
# JSON array of plain strings. test_json_strings writes it to a file and
# passes that file to the `create` command as a token source.
JSON_SOURCE_STRINGS = '''\
[
  "pigweed/pw_polyfill/standard_library_public/pw_polyfill/standard_library/assert.h",
  "protocol_buffer/gen/pigweed/pw_protobuf/common_protos.proto_library/nanopb/pw_protobuf_protos/status.pb.h",
  "pigweed/pw_rpc/client_server.cc",
  "pigweed/pw_rpc/public/pw_rpc/client_server.h",
  "This is a very long string that will produce two tokens; one for C++ and one for C. This is because this string exceeds the default C hash length."
]
'''
112
# Expected CSV database contents for JSON_SOURCE_STRINGS. The long string is
# listed twice (tokens b25a9932 and f815dc5c), so five source strings yield
# six rows.
CSV_STRINGS = '''\
2cbf627a,          ,"pigweed/pw_rpc/client_server.cc"
666562a1,          ,"protocol_buffer/gen/pigweed/pw_protobuf/common_protos.proto_library/nanopb/pw_protobuf_protos/status.pb.h"
6c1e6eb3,          ,"pigweed/pw_rpc/public/pw_rpc/client_server.h"
b25a9932,          ,"This is a very long string that will produce two tokens; one for C++ and one for C. This is because this string exceeds the default C hash length."
eadf017f,          ,"pigweed/pw_polyfill/standard_library_public/pw_polyfill/standard_library/assert.h"
f815dc5c,          ,"This is a very long string that will produce two tokens; one for C++ and one for C. This is because this string exceeds the default C hash length."
'''
121
# Expected parsed JSON output of the `report` command for the test ELF. The
# outer key is the ELF path as a string; the inner keys are domain names. The
# entry counts match the row counts of CSV_DEFAULT_DOMAIN (22, under the ''
# key) and CSV_TEST_DOMAIN (5).
EXPECTED_REPORT = {
    str(TOKENIZED_ENTRIES_ELF): {
        '': {
            'present_entries': 22,
            'present_size_bytes': 289,
            'total_entries': 22,
            'total_size_bytes': 289,
            'collisions': {},
        },
        'TEST_DOMAIN': {
            'present_entries': 5,
            'present_size_bytes': 57,
            'total_entries': 5,
            'total_size_bytes': 57,
            'collisions': {},
        },
    }
}
140
141
def run_cli(*args) -> None:
    """Runs the database command line interface in-process.

    Every argument is converted with ``str`` and installed in ``sys.argv``
    after a placeholder program name, then the database module parses the
    arguments and runs its main function. ``sys.argv`` is restored afterwards,
    including when the command raises.
    """
    saved_argv = sys.argv
    sys.argv = ['database.py'] + [str(arg) for arg in args]
    try:
        parsed_args = database._parse_args()  # pylint: disable=protected-access
        database._main(*parsed_args)  # pylint: disable=protected-access
    finally:
        # _main adds a log handler; drop the most recent handler so repeated
        # invocations do not produce duplicate logs.
        log_handlers = database._LOG.handlers  # pylint: disable=protected-access
        if log_handlers:
            log_handlers.pop()

        sys.argv = saved_argv
155
156
def _mock_output() -> io.TextIOWrapper:
    """Creates an in-memory text stream suitable for replacing sys.stdout.

    The returned wrapper writes straight through to a BytesIO buffer whose
    ``name`` is '<fake stdout>'; the bytes written so far can be read with
    ``.buffer.getvalue()``.
    """
    fake_stdout = io.BytesIO()
    fake_stdout.name = '<fake stdout>'
    text_stream = io.TextIOWrapper(fake_stdout, write_through=True)
    return text_stream
161
162
def _remove_readonly(  # pylint: disable=unused-argument
    func, path, excinfo
) -> None:
    """Error handler for shutil.rmtree that retries on non-writable paths.

    Args:
      func: The function that failed; it is called again with ``path``.
      path: The path that could not be removed.
      excinfo: Exception information for the failure (unused).

    If ``path`` is already writable, nothing is retried and the handler simply
    returns.
    """
    print('Path attempted to be deleted:', path)
    if os.access(path, os.W_OK):
        return

    # Make the path writable, then repeat the operation that failed.
    os.chmod(path, stat.S_IWUSR)
    func(path)
173
174
class DatabaseCommandLineTest(unittest.TestCase):
    """Tests the database.py command line interface."""

    def setUp(self) -> None:
        # Every test works in its own scratch directory.
        scratch_dir = Path(tempfile.mkdtemp('_pw_tokenizer_test'))
        self._dir = scratch_dir
        self._csv = scratch_dir / 'db.csv'
        self._elf = TOKENIZED_ENTRIES_ELF

        self._csv_test_domain = CSV_TEST_DOMAIN

    def tearDown(self) -> None:
        shutil.rmtree(self._dir)

    def _csv_lines(self) -> list:
        """Returns the lines currently stored in the CSV database file."""
        return self._csv.read_text().splitlines()

    def test_create_csv(self) -> None:
        """Creating from the ELF alone yields the default domain's tokens."""
        run_cli('create', '--database', self._csv, self._elf)

        self.assertEqual(CSV_DEFAULT_DOMAIN.splitlines(), self._csv_lines())

    def test_create_csv_test_domain(self) -> None:
        """Creating from <ELF>#TEST_DOMAIN yields only that domain's tokens."""
        source = f'{self._elf}#TEST_DOMAIN'
        run_cli('create', '--database', self._csv, source)

        self.assertEqual(
            self._csv_test_domain.splitlines(), self._csv_lines()
        )

    def test_create_csv_all_domains(self) -> None:
        """Creating from <ELF>#.* yields the tokens of every domain."""
        source = f'{self._elf}#.*'
        run_cli('create', '--database', self._csv, source)

        self.assertEqual(CSV_ALL_DOMAINS.splitlines(), self._csv_lines())

    def test_create_force(self) -> None:
        """An existing database is only overwritten when --force is given."""
        self._csv.write_text(CSV_ALL_DOMAINS)
        create_args = ('--database', self._csv, self._elf)

        with self.assertRaises(FileExistsError):
            run_cli('create', *create_args)

        run_cli('create', '--force', *create_args)

    def test_create_binary(self) -> None:
        """A binary database round-trips to the expected CSV contents."""
        binary_db = self._dir / 'db.bin'
        run_cli(
            'create', '--type', 'binary', '--database', binary_db, self._elf
        )

        # Convert the binary database to CSV so its contents can be checked.
        run_cli('create', '--database', self._csv, binary_db)

        self.assertEqual(CSV_DEFAULT_DOMAIN.splitlines(), self._csv_lines())

    def test_add_does_not_recalculate_tokens(self) -> None:
        """Tokens read from an added CSV are stored exactly as given."""
        db_with_custom_token = '01234567,          ,"hello"'

        source_csv = self._dir / 'add_this.csv'
        source_csv.write_text(db_with_custom_token + '\n')
        self._csv.touch()

        run_cli('add', '--database', self._csv, source_csv)
        self.assertEqual(
            db_with_custom_token.splitlines(), self._csv_lines()
        )

    def test_mark_removed(self) -> None:
        """Tokens missing from the ELF's default domain get a removal date."""
        self._csv.write_text(CSV_ALL_DOMAINS)

        run_cli(
            'mark_removed',
            '--database',
            self._csv,
            '--date',
            '1998-09-04',
            self._elf,
        )

        # Add the removal date to the four tokens not in the default domain.
        only_in_test_domain = (
            ('17fa86d3', 'hello'),
            ('18c5017c', 'yes'),
            ('59b2701c', 'The answer was: %s'),
            ('d18ada0f', 'something'),
        )
        expected_csv = CSV_ALL_DOMAINS
        for token, text in only_in_test_domain:
            expected_csv = expected_csv.replace(
                f'{token},          ,"{text}"',
                f'{token},1998-09-04,"{text}"',
            )
        self.assertNotEqual(CSV_ALL_DOMAINS, expected_csv)

        self.assertEqual(expected_csv.splitlines(), self._csv_lines())

    def test_purge(self) -> None:
        """Purging drops every entry that was marked as removed."""
        self._csv.write_text(CSV_ALL_DOMAINS)

        # Mark everything not in TEST_DOMAIN as removed.
        source = f'{self._elf}#TEST_DOMAIN'
        run_cli('mark_removed', '--database', self._csv, source)

        # Delete all entries except those in TEST_DOMAIN.
        run_cli('purge', '--database', self._csv)

        self.assertEqual(
            self._csv_test_domain.splitlines(), self._csv_lines()
        )

    @mock.patch('sys.stdout', new_callable=_mock_output)
    def test_report(self, mock_stdout) -> None:
        """The report command prints the expected JSON to stdout."""
        run_cli('report', self._elf)

        printed_report = json.loads(mock_stdout.buffer.getvalue())
        self.assertEqual(printed_report, EXPECTED_REPORT)

    def test_replace(self) -> None:
        """--replace substitutes regex matches in the stored strings."""
        sub = 'replace/ment'
        pattern = r'(?i)\b[jh]ello\b/'
        run_cli(
            'create',
            '--database',
            self._csv,
            self._elf,
            '--replace',
            pattern + sub,
        )
        expected_csv = CSV_DEFAULT_DOMAIN.replace('Jello', sub)
        expected_csv = expected_csv.replace('Hello', sub)
        self.assertEqual(expected_csv, self._csv.read_text())

    def test_json_strings(self) -> None:
        """A JSON file of strings can be used as a token source."""
        strings_file = self._dir / "strings.json"
        strings_file.write_text(JSON_SOURCE_STRINGS)

        run_cli('create', '--force', '--database', self._csv, strings_file)
        self.assertEqual(CSV_STRINGS.splitlines(), self._csv_lines())
324
325
class TestDirectoryDatabaseCommandLine(unittest.TestCase):
    """Tests the directory database command line interface."""

    def setUp(self) -> None:
        self._dir = Path(tempfile.mkdtemp('_pw_tokenizer_test'))
        self._db_dir = self._dir / '_dir_database_test'
        self._db_dir.mkdir(exist_ok=True)
        self._db_csv = self._db_dir / '8123913.pw_tokenizer.csv'
        self._elf = TOKENIZED_ENTRIES_ELF
        self._csv_test_domain = CSV_TEST_DOMAIN

    def _git(self, *command: str) -> None:
        """Runs git in self._dir with forced user name and email values.

        Prevents accidentally running git in the wrong directory and avoids
        errors if the name and email are not configured.
        """
        subprocess.run(
            [
                'git',
                '-c',
                'user.name=pw_tokenizer tests',
                '-c',
                'user.email=noreply@google.com',
                *command,
            ],
            cwd=self._dir,
            check=True,
        )

    def _add(self, *sources, discard_temporary=None) -> None:
        """Runs the `add` command against the directory database.

        Args:
          *sources: Token sources (ELF paths, optionally with a #DOMAIN
            suffix) passed after the options.
          discard_temporary: When not None, passed as the value of the
            --discard-temporary option.
        """
        cli_args = ['add', '--database', self._db_dir]
        if discard_temporary is not None:
            cli_args += ['--discard-temporary', discard_temporary]
        run_cli(*cli_args, *sources)

    def _only_path_in_db(self) -> Path:
        """Asserts the database directory has exactly one entry; returns it."""
        directory = list(self._db_dir.iterdir())

        self.assertEqual(1, len(directory))

        return directory.pop()

    def tearDown(self) -> None:
        # The handler retries removal of paths that are not writable
        # (presumably the files git creates in its repository; confirm).
        shutil.rmtree(self._dir, onerror=_remove_readonly)

    def test_add_csv_to_dir(self) -> None:
        """Tests a CSV can be created within the database."""
        self._add(f'{self._elf}#TEST_DOMAIN')

        self._db_csv = self._only_path_in_db()

        self.assertEqual(
            self._csv_test_domain.splitlines(),
            self._db_csv.read_text().splitlines(),
        )

    def test_add_all_domains_to_dir(self) -> None:
        """Tests a CSV with all domains can be added to the database."""
        self._add(f'{self._elf}#.*')

        self._db_csv = self._only_path_in_db()

        self.assertEqual(
            CSV_ALL_DOMAINS.splitlines(), self._db_csv.read_text().splitlines()
        )

    def test_not_adding_existing_tokens(self) -> None:
        """Tests duplicate tokens are not added to the database."""
        self._add(f'{self._elf}#TEST_DOMAIN')
        self._add(f'{self._elf}#TEST_DOMAIN')

        self._db_csv = self._only_path_in_db()

        self.assertEqual(
            self._csv_test_domain.splitlines(),
            self._db_csv.read_text().splitlines(),
        )

    def test_adding_tokens_without_git_repo(self) -> None:
        """Tests creating new files with new entries when no repo exists."""
        # Add CSV_TEST_DOMAIN to a new CSV in the directory database.
        self._add(f'{self._elf}#TEST_DOMAIN')

        first_csv_in_db = self._only_path_in_db()

        self.assertEqual(
            self._csv_test_domain.splitlines(),
            first_csv_in_db.read_text().splitlines(),
        )
        # Add CSV_ALL_DOMAINS to a new CSV in the directory database.
        self._add(f'{self._elf}#.*')
        directory = list(self._db_dir.iterdir())
        # Assert two different CSVs were created to store new tokens.
        self.assertEqual(2, len(directory))
        # Retrieve the other CSV in the directory.
        second_csv_in_db = next(
            path for path in directory if path != first_csv_in_db
        )

        self.assertNotEqual(first_csv_in_db, second_csv_in_db)
        # The first CSV must be left untouched by the second add.
        self.assertEqual(
            self._csv_test_domain.splitlines(),
            first_csv_in_db.read_text().splitlines(),
        )

        # Retrieve entries that exclusively exist in CSV_ALL_DOMAINS
        # as CSV_ALL_DOMAINS contains all entries in TEST_DOMAIN.
        entries_exclusively_in_all_domain = set(
            CSV_ALL_DOMAINS.splitlines()
        ) - set(self._csv_test_domain.splitlines())
        # Ensure only new tokens not in CSV_TEST_DOMAIN were added to
        # the second CSV added to the directory database.
        self.assertEqual(
            entries_exclusively_in_all_domain,
            set(second_csv_in_db.read_text().splitlines()),
        )

    def test_untracked_files_in_dir(self) -> None:
        """Tests untracked CSVs are reused by the database."""
        self._git('init')
        # Add CSV_TEST_DOMAIN to a new CSV in the directory database.
        self._add(f'{self._elf}#TEST_DOMAIN', discard_temporary='HEAD')

        first_path_in_db = self._only_path_in_db()

        self.assertEqual(
            self._csv_test_domain.splitlines(),
            first_path_in_db.read_text().splitlines(),
        )
        # Retrieve the untracked CSV in the Git repository and discard
        # tokens that do not exist in CSV_DEFAULT_DOMAIN.
        self._add(self._elf, discard_temporary='HEAD')

        reused_path_in_db = self._only_path_in_db()
        # Ensure the first path created is the same being reused. Also,
        # the CSV content is the same as CSV_DEFAULT_DOMAIN.
        self.assertEqual(first_path_in_db, reused_path_in_db)
        self.assertEqual(
            CSV_DEFAULT_DOMAIN.splitlines(),
            reused_path_in_db.read_text().splitlines(),
        )

    def test_adding_multiple_elf_files(self) -> None:
        """Tests adding multiple elf files to a file in the database."""
        # Add CSV_TEST_DOMAIN and CSV_DEFAULT_DOMAIN in a single command.
        self._add(f'{self._elf}#TEST_DOMAIN', self._elf)

        only_csv_in_db = self._only_path_in_db()
        # Combines CSV_DEFAULT_DOMAIN and TEST_DOMAIN into a unique set
        # of token entries.
        entries_from_default_and_test_domain = set(
            CSV_DEFAULT_DOMAIN.splitlines()
        ).union(set(self._csv_test_domain.splitlines()))
        # Multiple ELF files were added at once to a single CSV.
        self.assertEqual(
            entries_from_default_and_test_domain,
            set(only_csv_in_db.read_text().splitlines()),
        )

    def test_discarding_old_entries(self) -> None:
        """Tests discarding old entries for new entries when re-adding."""
        self._git('init')
        # Add CSV_ALL_DOMAINS to a new CSV in the directory database.
        self._add(f'{self._elf}#.*', discard_temporary='HEAD')

        untracked_path_in_db = self._only_path_in_db()

        self.assertEqual(
            CSV_ALL_DOMAINS.splitlines(),
            untracked_path_in_db.read_text().splitlines(),
        )
        # Add CSV_DEFAULT_DOMAIN and CSV_TEST_DOMAIN to a CSV in the
        # directory database, while replacing entries in CSV_ALL_DOMAINS
        # that no longer exist.
        self._add(
            f'{self._elf}#TEST_DOMAIN', self._elf, discard_temporary='HEAD'
        )

        reused_path_in_db = self._only_path_in_db()
        # Combines CSV_DEFAULT_DOMAIN and TEST_DOMAIN.
        entries_from_default_and_test_domain = set(
            CSV_DEFAULT_DOMAIN.splitlines()
        ).union(set(self._csv_test_domain.splitlines()))

        self.assertEqual(untracked_path_in_db, reused_path_in_db)
        self.assertEqual(
            entries_from_default_and_test_domain,
            set(reused_path_in_db.read_text().splitlines()),
        )

    def test_retrieving_csv_from_commit(self) -> None:
        """Tests retrieving a CSV from a commit and removing temp tokens."""
        self._git('init')
        self._git('commit', '--allow-empty', '-m', 'First Commit')
        # Add CSV_ALL_DOMAINS to a new CSV in the directory database.
        self._add(f'{self._elf}#.*')

        tracked_path_in_db = self._only_path_in_db()

        self.assertEqual(
            CSV_ALL_DOMAINS.splitlines(),
            tracked_path_in_db.read_text().splitlines(),
        )
        # Commit the CSV to avoid retrieving the CSV with the checks
        # for untracked changes.
        self._git('add', '--all')
        self._git('commit', '-m', 'Adding a CSV to a new commit.')
        # Retrieve the committed CSV and discard tokens that exist in
        # CSV_ALL_DOMAINS but do not exist in CSV_TEST_DOMAIN.
        # NOTE(review): only two commits are created above, so HEAD~2 does
        # not appear to name a commit; confirm HEAD~2 (not HEAD~) is intended.
        self._add(f'{self._elf}#TEST_DOMAIN', discard_temporary='HEAD~2')

        reused_path_in_db = self._only_path_in_db()

        self.assertEqual(
            self._csv_test_domain.splitlines(),
            reused_path_in_db.read_text().splitlines(),
        )
602
603
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
606