#!/usr/bin/env python3
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for the database module."""

import json
import io
import os
from pathlib import Path
import shutil
import stat
import subprocess
import sys
import tempfile
import unittest
from unittest import mock

from pw_tokenizer import database

# This is an ELF file with only the pw_tokenizer sections. It was created
# from a tokenize_test binary built for the STM32F429i Discovery board. The
# pw_tokenizer sections were extracted with this command:
#
#   arm-none-eabi-objcopy -S --only-section ".pw_tokenize*" <ELF> <OUTPUT>
#
TOKENIZED_ENTRIES_ELF = (
    Path(__file__).parent / 'example_binary_with_tokenized_strings.elf'
)

# NOTE(review): the whitespace inside the CSV and JSON literals below is
# reproduced exactly as it appeared in the reviewed text. The tests compare
# these strings against generated output, so confirm the date-column padding
# and the spacing inside the long sentences against a real database file.
CSV_DEFAULT_DOMAIN = '''\
00000000, ,""
141c35d5, ,"The answer: ""%s"""
29aef586, ,"1234"
2b78825f, ,"[:-)"
2e668cd6, ,"Jello, world!"
31631781, ,"%d"
61fd1e26, ,"%ld"
68ab92da, ,"%s there are %x (%.2f) of them%c"
7b940e2a, ,"Hello %s! %hd %e"
7da55d52, ,">:-[]"
7f35a9a5, ,"TestName"
851beeb6, ,"%u %d"
881436a0, ,"The answer is: %s"
88808930, ,"%u%d%02x%X%hu%hhd%d%ld%lu%lld%llu%c%c%c"
92723f44, ,"???"
a09d6698, ,"won-won-won-wonderful"
aa9ffa66, ,"void pw::tokenizer::{anonymous}::TestName()"
ad002c97, ,"%llx"
b3653e13, ,"Jello!"
cc6d3131, ,"Jello?"
e13b0f94, ,"%llu"
e65aefef, ,"Won't fit : %s%d"
'''

CSV_TEST_DOMAIN = """\
17fa86d3, ,"hello"
18c5017c, ,"yes"
59b2701c, ,"The answer was: %s"
881436a0, ,"The answer is: %s"
d18ada0f, ,"something"
"""

CSV_ALL_DOMAINS = '''\
00000000, ,""
141c35d5, ,"The answer: ""%s"""
17fa86d3, ,"hello"
18c5017c, ,"yes"
29aef586, ,"1234"
2b78825f, ,"[:-)"
2e668cd6, ,"Jello, world!"
31631781, ,"%d"
59b2701c, ,"The answer was: %s"
61fd1e26, ,"%ld"
68ab92da, ,"%s there are %x (%.2f) of them%c"
7b940e2a, ,"Hello %s! %hd %e"
7da55d52, ,">:-[]"
7f35a9a5, ,"TestName"
851beeb6, ,"%u %d"
881436a0, ,"The answer is: %s"
88808930, ,"%u%d%02x%X%hu%hhd%d%ld%lu%lld%llu%c%c%c"
92723f44, ,"???"
a09d6698, ,"won-won-won-wonderful"
aa9ffa66, ,"void pw::tokenizer::{anonymous}::TestName()"
ad002c97, ,"%llx"
b3653e13, ,"Jello!"
cc6d3131, ,"Jello?"
d18ada0f, ,"something"
e13b0f94, ,"%llu"
e65aefef, ,"Won't fit : %s%d"
'''

JSON_SOURCE_STRINGS = '''\
[
  "pigweed/pw_polyfill/standard_library_public/pw_polyfill/standard_library/assert.h",
  "protocol_buffer/gen/pigweed/pw_protobuf/common_protos.proto_library/nanopb/pw_protobuf_protos/status.pb.h",
  "pigweed/pw_rpc/client_server.cc",
  "pigweed/pw_rpc/public/pw_rpc/client_server.h",
  "This is a very long string that will produce two tokens; one for C++ and one for C. This is because this string exceeds the default C hash length."
]
'''

CSV_STRINGS = '''\
2cbf627a, ,"pigweed/pw_rpc/client_server.cc"
666562a1, ,"protocol_buffer/gen/pigweed/pw_protobuf/common_protos.proto_library/nanopb/pw_protobuf_protos/status.pb.h"
6c1e6eb3, ,"pigweed/pw_rpc/public/pw_rpc/client_server.h"
b25a9932, ,"This is a very long string that will produce two tokens; one for C++ and one for C. This is because this string exceeds the default C hash length."
eadf017f, ,"pigweed/pw_polyfill/standard_library_public/pw_polyfill/standard_library/assert.h"
f815dc5c, ,"This is a very long string that will produce two tokens; one for C++ and one for C. This is because this string exceeds the default C hash length."
'''

# Expected JSON output of the `report` command for TOKENIZED_ENTRIES_ELF,
# keyed by ELF path and then by domain name.
EXPECTED_REPORT = {
    str(TOKENIZED_ENTRIES_ELF): {
        '': {
            'present_entries': 22,
            'present_size_bytes': 289,
            'total_entries': 22,
            'total_size_bytes': 289,
            'collisions': {},
        },
        'TEST_DOMAIN': {
            'present_entries': 5,
            'present_size_bytes': 57,
            'total_entries': 5,
            'total_size_bytes': 57,
            'collisions': {},
        },
    }
}


def run_cli(*args) -> None:
    """Runs the database.py command line entry point in-process.

    Each argument is converted with str() and placed in sys.argv after a
    fake program name, then the database module parses sys.argv and runs.
    sys.argv is restored afterwards, even if the command raises.

    Args:
      *args: command line arguments; Path objects are accepted.
    """
    original_argv = sys.argv
    sys.argv = ['database.py', *(str(a) for a in args)]
    # pylint: disable=protected-access
    try:
        database._main(*database._parse_args())
    finally:
        # Remove the log handler added by _main to avoid duplicate logs.
        if database._LOG.handlers:
            database._LOG.handlers.pop()
        # pylint: enable=protected-access

        sys.argv = original_argv


def _mock_output() -> io.TextIOWrapper:
    """Creates an in-memory text stream to stand in for sys.stdout.

    The bytes written can be read back through the wrapper's `buffer`
    attribute; write_through=True makes them visible without a flush.
    """
    output = io.BytesIO()
    output.name = '<fake stdout>'
    return io.TextIOWrapper(output, write_through=True)


def _remove_readonly(  # pylint: disable=unused-argument
    func, path, excinfo
) -> None:
    """Changes file permission and recalls the calling function.

    Used as the `onerror` callback for shutil.rmtree.

    Args:
      func: the function that failed on `path`; it is called again.
      path: the path that could not be removed.
      excinfo: exception information for the failure (unused).
    """
    print('Path attempted to be deleted:', path)
    if not os.access(path, os.W_OK):
        # Change file permissions.
        os.chmod(path, stat.S_IWUSR)
        # Call the calling function again.
        func(path)


class DatabaseCommandLineTest(unittest.TestCase):
    """Tests the database.py command line interface."""

    def setUp(self) -> None:
        """Creates a temporary directory to hold the CSV database."""
        self._dir = Path(tempfile.mkdtemp('_pw_tokenizer_test'))
        self._csv = self._dir / 'db.csv'
        self._elf = TOKENIZED_ENTRIES_ELF

        self._csv_test_domain = CSV_TEST_DOMAIN

    def tearDown(self) -> None:
        """Deletes the temporary directory."""
        shutil.rmtree(self._dir)

    def test_create_csv(self) -> None:
        run_cli('create', '--database', self._csv, self._elf)

        self.assertEqual(
            CSV_DEFAULT_DOMAIN.splitlines(), self._csv.read_text().splitlines()
        )

    def test_create_csv_test_domain(self) -> None:
        run_cli('create', '--database', self._csv, f'{self._elf}#TEST_DOMAIN')

        self.assertEqual(
            self._csv_test_domain.splitlines(),
            self._csv.read_text().splitlines(),
        )

    def test_create_csv_all_domains(self) -> None:
        run_cli('create', '--database', self._csv, f'{self._elf}#.*')

        self.assertEqual(
            CSV_ALL_DOMAINS.splitlines(), self._csv.read_text().splitlines()
        )

    def test_create_force(self) -> None:
        self._csv.write_text(CSV_ALL_DOMAINS)

        # Without --force, an existing database file must not be overwritten.
        with self.assertRaises(FileExistsError):
            run_cli('create', '--database', self._csv, self._elf)

        run_cli('create', '--force', '--database', self._csv, self._elf)

    def test_create_binary(self) -> None:
        binary = self._dir / 'db.bin'
        run_cli('create', '--type', 'binary', '--database', binary, self._elf)

        # Write the binary database as CSV to verify its contents.
        run_cli('create', '--database', self._csv, binary)

        self.assertEqual(
            CSV_DEFAULT_DOMAIN.splitlines(), self._csv.read_text().splitlines()
        )

    def test_add_does_not_recalculate_tokens(self) -> None:
        db_with_custom_token = '01234567, ,"hello"'

        to_add = self._dir / 'add_this.csv'
        to_add.write_text(db_with_custom_token + '\n')
        self._csv.touch()

        run_cli('add', '--database', self._csv, to_add)
        self.assertEqual(
            db_with_custom_token.splitlines(),
            self._csv.read_text().splitlines(),
        )

    def test_mark_removed(self) -> None:
        self._csv.write_text(CSV_ALL_DOMAINS)

        run_cli(
            'mark_removed',
            '--database',
            self._csv,
            '--date',
            '1998-09-04',
            self._elf,
        )

        # Add the removal date to the four tokens not in the default domain
        new_csv = CSV_ALL_DOMAINS
        new_csv = new_csv.replace(
            '17fa86d3, ,"hello"', '17fa86d3,1998-09-04,"hello"'
        )
        new_csv = new_csv.replace(
            '18c5017c, ,"yes"', '18c5017c,1998-09-04,"yes"'
        )
        new_csv = new_csv.replace(
            '59b2701c, ,"The answer was: %s"',
            '59b2701c,1998-09-04,"The answer was: %s"',
        )
        new_csv = new_csv.replace(
            'd18ada0f, ,"something"', 'd18ada0f,1998-09-04,"something"'
        )
        # Guard against the replacements above silently matching nothing.
        self.assertNotEqual(CSV_ALL_DOMAINS, new_csv)

        self.assertEqual(
            new_csv.splitlines(), self._csv.read_text().splitlines()
        )

    def test_purge(self) -> None:
        self._csv.write_text(CSV_ALL_DOMAINS)

        # Mark everything not in TEST_DOMAIN as removed.
        run_cli(
            'mark_removed', '--database', self._csv, f'{self._elf}#TEST_DOMAIN'
        )

        # Delete all entries except those in TEST_DOMAIN.
        run_cli('purge', '--database', self._csv)

        self.assertEqual(
            self._csv_test_domain.splitlines(),
            self._csv.read_text().splitlines(),
        )

    @mock.patch('sys.stdout', new_callable=_mock_output)
    def test_report(self, mock_stdout) -> None:
        run_cli('report', self._elf)

        self.assertEqual(
            json.loads(mock_stdout.buffer.getvalue()), EXPECTED_REPORT
        )

    def test_replace(self) -> None:
        sub = 'replace/ment'
        run_cli(
            'create',
            '--database',
            self._csv,
            self._elf,
            '--replace',
            r'(?i)\b[jh]ello\b/' + sub,
        )
        self.assertEqual(
            CSV_DEFAULT_DOMAIN.replace('Jello', sub).replace('Hello', sub),
            self._csv.read_text(),
        )

    def test_json_strings(self) -> None:
        strings_file = self._dir / 'strings.json'
        strings_file.write_text(JSON_SOURCE_STRINGS)

        run_cli('create', '--force', '--database', self._csv, strings_file)
        self.assertEqual(
            CSV_STRINGS.splitlines(), self._csv.read_text().splitlines()
        )


class TestDirectoryDatabaseCommandLine(unittest.TestCase):
    """Tests the directory database command line interface."""

    def setUp(self) -> None:
        """Creates a temporary directory containing a database directory."""
        self._dir = Path(tempfile.mkdtemp('_pw_tokenizer_test'))
        self._db_dir = self._dir / '_dir_database_test'
        self._db_dir.mkdir(exist_ok=True)
        self._db_csv = self._db_dir / '8123913.pw_tokenizer.csv'
        self._elf = TOKENIZED_ENTRIES_ELF
        self._csv_test_domain = CSV_TEST_DOMAIN

    def _git(self, *command: str) -> None:
        """Runs git in self._dir with forced user name and email values.

        Prevents accidentally running git in the wrong directory and avoids
        errors if the name and email are not configured.
        """
        subprocess.run(
            [
                'git',
                '-c',
                'user.name=pw_tokenizer tests',
                '-c',
                'user.email=noreply@google.com',
                *command,
            ],
            cwd=self._dir,
            check=True,
        )

    def tearDown(self) -> None:
        """Deletes the temporary directory, retrying unwritable paths."""
        shutil.rmtree(self._dir, onerror=_remove_readonly)

    def test_add_csv_to_dir(self) -> None:
        """Tests a CSV can be created within the database."""
        run_cli('add', '--database', self._db_dir, f'{self._elf}#TEST_DOMAIN')
        directory = list(self._db_dir.iterdir())

        self.assertEqual(1, len(directory))

        self._db_csv = directory.pop()

        self.assertEqual(
            self._csv_test_domain.splitlines(),
            self._db_csv.read_text().splitlines(),
        )

    def test_add_all_domains_to_dir(self) -> None:
        """Tests a CSV with all domains can be added to the database."""
        run_cli('add', '--database', self._db_dir, f'{self._elf}#.*')
        directory = list(self._db_dir.iterdir())

        self.assertEqual(1, len(directory))

        self._db_csv = directory.pop()

        self.assertEqual(
            CSV_ALL_DOMAINS.splitlines(), self._db_csv.read_text().splitlines()
        )

    def test_not_adding_existing_tokens(self) -> None:
        """Tests duplicate tokens are not added to the database."""
        run_cli('add', '--database', self._db_dir, f'{self._elf}#TEST_DOMAIN')
        run_cli('add', '--database', self._db_dir, f'{self._elf}#TEST_DOMAIN')
        directory = list(self._db_dir.iterdir())

        self.assertEqual(1, len(directory))

        self._db_csv = directory.pop()

        self.assertEqual(
            self._csv_test_domain.splitlines(),
            self._db_csv.read_text().splitlines(),
        )

    def test_adding_tokens_without_git_repo(self) -> None:
        """Tests creating new files with new entries when no repo exists."""
        # Add CSV_TEST_DOMAIN to a new CSV in the directory database.
        run_cli('add', '--database', self._db_dir, f'{self._elf}#TEST_DOMAIN')
        directory = list(self._db_dir.iterdir())

        self.assertEqual(1, len(directory))

        first_csv_in_db = directory.pop()

        self.assertEqual(
            self._csv_test_domain.splitlines(),
            first_csv_in_db.read_text().splitlines(),
        )
        # Add CSV_ALL_DOMAINS to a new CSV in the directory database.
        run_cli('add', '--database', self._db_dir, f'{self._elf}#.*')
        directory = list(self._db_dir.iterdir())
        # Assert two different CSVs were created to store new tokens.
        self.assertEqual(2, len(directory))
        # Retrieve the other CSV in the directory.
        second_csv_in_db = (
            directory[0] if directory[0] != first_csv_in_db else directory[1]
        )

        self.assertNotEqual(first_csv_in_db, second_csv_in_db)
        self.assertEqual(
            self._csv_test_domain.splitlines(),
            first_csv_in_db.read_text().splitlines(),
        )

        # Retrieve entries that exclusively exist in CSV_ALL_DOMAINS
        # as CSV_ALL_DOMAINS contains all entries in TEST_DOMAIN.
        entries_exclusively_in_all_domain = set(
            CSV_ALL_DOMAINS.splitlines()
        ) - set(self._csv_test_domain.splitlines())
        # Ensure only new tokens not in CSV_TEST_DOMAIN were added to
        # the second CSV added to the directory database.
        self.assertEqual(
            entries_exclusively_in_all_domain,
            set(second_csv_in_db.read_text().splitlines()),
        )

    def test_untracked_files_in_dir(self) -> None:
        """Tests untracked CSVs are reused by the database."""
        self._git('init')
        # Add CSV_TEST_DOMAIN to a new CSV in the directory database.
        run_cli(
            'add',
            '--database',
            self._db_dir,
            '--discard-temporary',
            'HEAD',
            f'{self._elf}#TEST_DOMAIN',
        )
        directory = list(self._db_dir.iterdir())

        self.assertEqual(1, len(directory))

        first_path_in_db = directory.pop()

        self.assertEqual(
            self._csv_test_domain.splitlines(),
            first_path_in_db.read_text().splitlines(),
        )
        # Retrieve the untracked CSV in the Git repository and discard
        # tokens that do not exist in CSV_DEFAULT_DOMAIN.
        run_cli(
            'add',
            '--database',
            self._db_dir,
            '--discard-temporary',
            'HEAD',
            self._elf,
        )
        directory = list(self._db_dir.iterdir())

        self.assertEqual(1, len(directory))

        reused_path_in_db = directory.pop()
        # Ensure the first path created is the same being reused. Also,
        # the CSV content is the same as CSV_DEFAULT_DOMAIN.
        self.assertEqual(first_path_in_db, reused_path_in_db)
        self.assertEqual(
            CSV_DEFAULT_DOMAIN.splitlines(),
            reused_path_in_db.read_text().splitlines(),
        )

    def test_adding_multiple_elf_files(self) -> None:
        """Tests adding multiple elf files to a file in the database."""
        # Add CSV_TEST_DOMAIN to a new CSV in the directory database.
        run_cli(
            'add',
            '--database',
            self._db_dir,
            f'{self._elf}#TEST_DOMAIN',
            self._elf,
        )
        directory = list(self._db_dir.iterdir())

        self.assertEqual(1, len(directory))
        # Combines CSV_DEFAULT_DOMAIN and TEST_DOMAIN into a unique set
        # of token entries.
        entries_from_default_and_test_domain = set(
            CSV_DEFAULT_DOMAIN.splitlines()
        ).union(set(self._csv_test_domain.splitlines()))
        # Multiple ELF files were added at once to a single CSV.
        self.assertEqual(
            entries_from_default_and_test_domain,
            set(directory.pop().read_text().splitlines()),
        )

    def test_discarding_old_entries(self) -> None:
        """Tests discarding old entries for new entries when re-adding."""
        self._git('init')
        # Add CSV_ALL_DOMAINS to a new CSV in the directory database.
        run_cli(
            'add',
            '--database',
            self._db_dir,
            '--discard-temporary',
            'HEAD',
            f'{self._elf}#.*',
        )
        directory = list(self._db_dir.iterdir())

        self.assertEqual(1, len(directory))

        untracked_path_in_db = directory.pop()

        self.assertEqual(
            CSV_ALL_DOMAINS.splitlines(),
            untracked_path_in_db.read_text().splitlines(),
        )
        # Add CSV_DEFAULT_DOMAIN and CSV_TEST_DOMAIN to a CSV in the
        # directory database, while replacing entries in CSV_ALL_DOMAINS
        # that no longer exist.
        run_cli(
            'add',
            '--database',
            self._db_dir,
            '--discard-temporary',
            'HEAD',
            f'{self._elf}#TEST_DOMAIN',
            self._elf,
        )
        directory = list(self._db_dir.iterdir())

        self.assertEqual(1, len(directory))

        reused_path_in_db = directory.pop()
        # Combines CSV_DEFAULT_DOMAIN and TEST_DOMAIN.
        entries_from_default_and_test_domain = set(
            CSV_DEFAULT_DOMAIN.splitlines()
        ).union(set(self._csv_test_domain.splitlines()))

        self.assertEqual(untracked_path_in_db, reused_path_in_db)
        self.assertEqual(
            entries_from_default_and_test_domain,
            set(reused_path_in_db.read_text().splitlines()),
        )

    def test_retrieving_csv_from_commit(self) -> None:
        """Tests retrieving a CSV from a commit and removing temp tokens."""
        self._git('init')
        self._git('commit', '--allow-empty', '-m', 'First Commit')
        # Add CSV_ALL_DOMAINS to a new CSV in the directory database.
        run_cli('add', '--database', self._db_dir, f'{self._elf}#.*')
        directory = list(self._db_dir.iterdir())

        self.assertEqual(1, len(directory))

        tracked_path_in_db = directory.pop()

        self.assertEqual(
            CSV_ALL_DOMAINS.splitlines(),
            tracked_path_in_db.read_text().splitlines(),
        )
        # Commit the CSV to avoid retrieving the CSV with the checks
        # for untracked changes.
        self._git('add', '--all')
        self._git('commit', '-m', 'Adding a CSV to a new commit.')
        # Retrieve the committed CSV and discard tokens that exist in
        # CSV_ALL_DOMAINS and not exist in CSV_TEST_DOMAIN.
        # NOTE(review): this comment previously named HEAD~ while the
        # command below passes HEAD~2; confirm which revision is intended.
        run_cli(
            'add',
            '--database',
            self._db_dir,
            '--discard-temporary',
            'HEAD~2',
            f'{self._elf}#TEST_DOMAIN',
        )
        directory = list(self._db_dir.iterdir())

        self.assertEqual(1, len(directory))

        reused_path_in_db = directory.pop()

        self.assertEqual(
            self._csv_test_domain.splitlines(),
            reused_path_in_db.read_text().splitlines(),
        )


if __name__ == '__main__':
    unittest.main()