#!/usr/bin/env python3
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
r"""Decodes and detokenizes strings from binary or Base64 input.

The main class provided by this module is the Detokenizer class. To use it,
construct it with the path to an ELF or CSV database, a tokens.Database, or a
file object for an ELF file or CSV. Then, call the detokenize method with
encoded messages, one at a time. The detokenize method returns a
DetokenizedString object with the result.

For example::

    from pw_tokenizer import detokenize

    detok = detokenize.Detokenizer('path/to/firmware/image.elf')
    print(detok.detokenize(b'\x12\x34\x56\x78\x03hi!'))

This module also provides a command line interface for decoding and
detokenizing messages from a file or stdin.
"""

import argparse
import base64
import binascii
from concurrent.futures import Executor, ThreadPoolExecutor
import enum
import io
import logging
import os
from pathlib import Path
import re
import string
import struct
import sys
import threading
import time
from typing import (
    AnyStr,
    BinaryIO,
    Callable,
    Iterable,
    Iterator,
    Match,
    NamedTuple,
    Pattern,
)

try:
    from pw_tokenizer import database, decode, encode, tokens
except ImportError:
    # Append this path to the module search path to allow running this module
    # without installing the pw_tokenizer package.
    sys.path.append(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    )
    from pw_tokenizer import database, decode, encode, tokens

_LOG = logging.getLogger('pw_tokenizer')

ENCODED_TOKEN = struct.Struct('<I')
_BASE64_CHARS = string.ascii_letters + string.digits + '+/-_='
DEFAULT_RECURSION = 9
NESTED_TOKEN_PREFIX = encode.NESTED_TOKEN_PREFIX.encode()
NESTED_TOKEN_BASE_PREFIX = encode.NESTED_TOKEN_BASE_PREFIX.encode()

_BASE8_TOKEN_REGEX = rb'(?P<base8>[0-7]{11})'
_BASE10_TOKEN_REGEX = rb'(?P<base10>[0-9]{10})'
_BASE16_TOKEN_REGEX = rb'(?P<base16>[A-Fa-f0-9]{8})'
_BASE64_TOKEN_REGEX = (
    rb'(?P<base64>'
    # Tokenized Base64 contains 0 or more blocks of four Base64 chars.
    rb'(?:[A-Za-z0-9+/\-_]{4})*'
    # The last block of 4 chars may have one or two padding chars (=).
    rb'(?:[A-Za-z0-9+/\-_]{3}=|[A-Za-z0-9+/\-_]{2}==)?'
    rb')'
)
_NESTED_TOKEN_FORMATS = (
    _BASE8_TOKEN_REGEX,
    _BASE10_TOKEN_REGEX,
    _BASE16_TOKEN_REGEX,
    _BASE64_TOKEN_REGEX,
)


def _token_regex(prefix: bytes) -> Pattern[bytes]:
    """Returns a regular expression for prefixed tokenized strings."""
    return re.compile(
        # Tokenized strings start with the prefix character ($).
        re.escape(prefix)
        # Optional; no base specifier defaults to Base64.
        # A hash (#) with no base number specified defaults to Base-16.
        + rb'(?P<basespec>(?P<base>[0-9]*)?'
        + NESTED_TOKEN_BASE_PREFIX
        + rb')?'
        # Match one of the following token formats.
        + rb'('
        + rb'|'.join(_NESTED_TOKEN_FORMATS)
        + rb')'
    )
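
# For illustration, with the default b'$' prefix the pattern above matches
# strings such as the following (example literals only, not taken from a real
# token database):
#
#   b'$QA19pfEQ'       - Base64 token data (no base specifier)
#   b'$#89abcdef'      - '#' with no base number, so Base-16 (8 hex digits)
#   b'$10#0086025943'  - explicit Base-10 token (10 decimal digits)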


class DetokenizedString:
    """A detokenized string, with all results if there are collisions."""

    def __init__(
        self,
        token: int | None,
        format_string_entries: Iterable[tuple],
        encoded_message: bytes,
        show_errors: bool = False,
        recursive_detokenize: Callable[[str], str] | None = None,
    ):
        self.token = token
        self.encoded_message = encoded_message
        self._show_errors = show_errors

        self.successes: list[decode.FormattedString] = []
        self.failures: list[decode.FormattedString] = []

        decode_attempts: list[tuple[tuple, decode.FormattedString]] = []

        for entry, fmt in format_string_entries:
            result = fmt.format(
                encoded_message[ENCODED_TOKEN.size :], show_errors
            )
            if recursive_detokenize:
                result = decode.FormattedString(
                    recursive_detokenize(result.value),
                    result.args,
                    result.remaining,
                )
            decode_attempts.append((result.score(entry.date_removed), result))

        # Sort the attempts by the score so the most likely results are first.
        decode_attempts.sort(key=lambda value: value[0], reverse=True)

        # Split out the successful decodes from the failures.
        for score, result in decode_attempts:
            if score[0]:
                self.successes.append(result)
            else:
                self.failures.append(result)

    def ok(self) -> bool:
        """True if exactly one string decoded the arguments successfully."""
        return len(self.successes) == 1

    def matches(self) -> list[decode.FormattedString]:
        """Returns the strings that matched the token, best matches first."""
        return self.successes + self.failures

    def best_result(self) -> decode.FormattedString | None:
        """Returns the string and args for the most likely decoded string."""
        for string_and_args in self.matches():
            return string_and_args

        return None

    def error_message(self) -> str:
        """If detokenization failed, returns a descriptive message."""
        if self.ok():
            return ''

        if not self.matches():
            if self.token is None:
                return 'missing token'

            return 'unknown token {:08x}'.format(self.token)

        if len(self.matches()) == 1:
            return 'decoding failed for {!r}'.format(self.matches()[0].value)

        return '{} matches'.format(len(self.matches()))

    def __str__(self) -> str:
        """Returns the string for the most likely result."""
        result = self.best_result()
        if result:
            return result[0]

        if self._show_errors:
            return '<[ERROR: {}|{!r}]>'.format(
                self.error_message(), self.encoded_message
            )

        # Display the string as prefixed Base64 if it cannot be decoded.
        return encode.prefixed_base64(self.encoded_message)

    def __repr__(self) -> str:
        if self.ok():
            message = repr(str(self))
        else:
            message = 'ERROR: {}|{!r}'.format(
                self.error_message(), self.encoded_message
            )

        return '{}({})'.format(type(self).__name__, message)
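
# Illustrative sketch of how DetokenizedString results are typically consumed
# (the encoded message here is made up and may not be in the loaded database):
#
#   result = detok.detokenize(b'\x12\x34\x56\x78')
#   if result.ok():
#       print(str(result))             # The single successful decode.
#   else:
#       print(result.error_message())  # e.g. 'unknown token 78563412'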


class _TokenizedFormatString(NamedTuple):
    entry: tokens.TokenizedStringEntry
    format: decode.FormatString


class Detokenizer:
    """Main detokenization class; detokenizes strings and caches results."""

    def __init__(self, *token_database_or_elf, show_errors: bool = False):
        """Decodes and detokenizes binary messages.

        Args:
            *token_database_or_elf: a path or file object for an ELF or CSV
                database, a tokens.Database, or an elf_reader.Elf
            show_errors: if True, an error message is used in place of the %
                conversion specifier when an argument fails to decode
        """
        self.show_errors = show_errors

        self._database_lock = threading.Lock()

        # Cache FormatStrings for faster lookup & formatting.
        self._cache: dict[int, list[_TokenizedFormatString]] = {}

        self._initialize_database(token_database_or_elf)

    def _initialize_database(self, token_sources: Iterable) -> None:
        with self._database_lock:
            self.database = database.load_token_database(*token_sources)
            self._cache.clear()

    def lookup(self, token: int) -> list[_TokenizedFormatString]:
        """Returns (TokenizedStringEntry, FormatString) list for matches."""
        with self._database_lock:
            try:
                return self._cache[token]
            except KeyError:
                format_strings = [
                    _TokenizedFormatString(
                        entry, decode.FormatString(str(entry))
                    )
                    for entry in self.database.token_to_entries[token]
                ]
                self._cache[token] = format_strings
                return format_strings

    def detokenize(
        self,
        encoded_message: bytes,
        prefix: str | bytes = NESTED_TOKEN_PREFIX,
        recursion: int = DEFAULT_RECURSION,
    ) -> DetokenizedString:
        """Decodes and detokenizes a message as a DetokenizedString."""
        if not encoded_message:
            return DetokenizedString(
                None, (), encoded_message, self.show_errors
            )

        # Pad messages smaller than ENCODED_TOKEN.size with zeroes to support
        # tokens smaller than a uint32. Messages with arguments must always use
        # a full 32-bit token.
        missing_token_bytes = ENCODED_TOKEN.size - len(encoded_message)
        if missing_token_bytes > 0:
            encoded_message += b'\0' * missing_token_bytes

        (token,) = ENCODED_TOKEN.unpack_from(encoded_message)

        recursive_detokenize = None
        if recursion > 0:
            recursive_detokenize = self._detokenize_nested_callback(
                prefix, recursion
            )

        return DetokenizedString(
            token,
            self.lookup(token),
            encoded_message,
            self.show_errors,
            recursive_detokenize,
        )
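
    # Example (a sketch; the message bytes are made up): a 4-byte message such
    # as b'\x01\x02\x03\x04' is interpreted as the little-endian token
    # 0x04030201 with no arguments, while shorter messages are zero-padded to
    # 32 bits before lookup:
    #
    #   detok.detokenize(b'\x01\x02')  # Looks up token 0x00000201.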

    def detokenize_text(
        self,
        data: AnyStr,
        prefix: str | bytes = NESTED_TOKEN_PREFIX,
        recursion: int = DEFAULT_RECURSION,
    ) -> AnyStr:
        """Decodes and replaces prefixed Base64 messages in the provided data.

        Args:
            data: the binary data to decode
            prefix: one-character byte string that signals the start of a
                message
            recursion: how many levels to recursively decode

        Returns:
            copy of the data with all recognized tokens decoded
        """
        return self._detokenize_nested_callback(prefix, recursion)(data)

    # TODO(gschen): remove unnecessary function
    def detokenize_base64(
        self,
        data: AnyStr,
        prefix: str | bytes = NESTED_TOKEN_PREFIX,
        recursion: int = DEFAULT_RECURSION,
    ) -> AnyStr:
        """Alias of detokenize_text for backwards compatibility."""
        return self.detokenize_text(data, prefix, recursion)

    def detokenize_text_to_file(
        self,
        data: AnyStr,
        output: BinaryIO,
        prefix: str | bytes = NESTED_TOKEN_PREFIX,
        recursion: int = DEFAULT_RECURSION,
    ) -> None:
        """Decodes prefixed Base64 messages in data; decodes to output file."""
        output.write(self._detokenize_nested(data, prefix, recursion))

    # TODO(gschen): remove unnecessary function
    def detokenize_base64_to_file(
        self,
        data: AnyStr,
        output: BinaryIO,
        prefix: str | bytes = NESTED_TOKEN_PREFIX,
        recursion: int = DEFAULT_RECURSION,
    ) -> None:
        """Alias of detokenize_text_to_file for backwards compatibility."""
        self.detokenize_text_to_file(data, output, prefix, recursion)

    def detokenize_text_live(
        self,
        input_file: io.RawIOBase | BinaryIO,
        output: BinaryIO,
        prefix: str | bytes = NESTED_TOKEN_PREFIX,
        recursion: int = DEFAULT_RECURSION,
    ) -> None:
        """Reads chars one-at-a-time, decoding messages; SLOW for big files."""

        def transform(data: bytes) -> bytes:
            return self._detokenize_nested(data.decode(), prefix, recursion)

        for message in NestedMessageParser(prefix, _BASE64_CHARS).transform_io(
            input_file, transform
        ):
            output.write(message)

            # Flush each line to prevent delays when piping between processes.
            if b'\n' in message:
                output.flush()

    # TODO(gschen): remove unnecessary function
    def detokenize_base64_live(
        self,
        input_file: io.RawIOBase | BinaryIO,
        output: BinaryIO,
        prefix: str | bytes = NESTED_TOKEN_PREFIX,
        recursion: int = DEFAULT_RECURSION,
    ) -> None:
        """Alias of detokenize_text_live for backwards compatibility."""
        self.detokenize_text_live(input_file, output, prefix, recursion)

    def _detokenize_nested_callback(
        self,
        prefix: str | bytes,
        recursion: int,
    ) -> Callable[[AnyStr], AnyStr]:
        """Returns a function that replaces all tokens for a given string."""

        def detokenize(message: AnyStr) -> AnyStr:
            result = self._detokenize_nested(message, prefix, recursion)
            return result.decode() if isinstance(message, str) else result

        return detokenize
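
    # Sketch of detokenize_text on log-style text; '$EjRWeA==' is the prefixed
    # Base64 form of the 4-byte token b'\x12\x34\x56\x78' (whether it decodes
    # depends on the loaded database), and unrecognized tokens pass through
    # unchanged:
    #
    #   detok.detokenize_text('INF $EjRWeA==\n')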

    def _detokenize_nested(
        self,
        message: str | bytes,
        prefix: str | bytes,
        recursion: int,
    ) -> bytes:
        """Returns the message with recognized tokens replaced.

        Message data is handled internally as bytes regardless of the input
        type, and the result is returned as bytes.
        """
        # A unified format across the token types is required for regex
        # consistency.
        message = message.encode() if isinstance(message, str) else message
        prefix = prefix.encode() if isinstance(prefix, str) else prefix

        if not self.database:
            return message

        result = message
        for _ in range(recursion - 1):
            result = _token_regex(prefix).sub(self._detokenize_scan, result)

            if result == message:
                return result
        return result

    def _detokenize_scan(self, match: Match[bytes]) -> bytes:
        """Decodes prefixed tokens for one of multiple formats."""
        basespec = match.group('basespec')
        base = match.group('base')

        if not basespec or (base == b'64'):
            return self._detokenize_once_base64(match)

        if not base:
            base = b'16'

        return self._detokenize_once(match, base)

    def _detokenize_once(
        self,
        match: Match[bytes],
        base: bytes,
    ) -> bytes:
        """Performs a database lookup on a plain (non-Base64) token."""
        original = match.group(0)
        token = match.group('base' + base.decode())
        if not token:
            return original

        token = int(token, int(base))
        entries = self.database.token_to_entries[token]

        if len(entries) == 1:
            return str(entries[0]).encode()

        # TODO(gschen): improve token collision reporting

        return original

    def _detokenize_once_base64(
        self,
        match: Match[bytes],
    ) -> bytes:
        """Performs a database lookup on a Base64-encoded token."""
        original = match.group(0)

        try:
            encoded_token = match.group('base64')
            if not encoded_token:
                return original

            detokenized_string = self.detokenize(
                base64.b64decode(encoded_token, validate=True), recursion=0
            )

            if detokenized_string.matches():
                return str(detokenized_string).encode()

        except binascii.Error:
            pass

        return original


# TODO: b/265334753 - Reuse this function in database.py:LoadTokenDatabases
def _parse_domain(path: Path | str) -> tuple[Path, Pattern[str] | None]:
    """Extracts an optional domain regex pattern suffix from a path."""

    if isinstance(path, Path):
        path = str(path)

    delimiters = path.count('#')

    if delimiters == 0:
        return Path(path), None

    if delimiters == 1:
        path, domain = path.split('#')
        return Path(path), re.compile(domain)

    raise ValueError(
        f'Too many # delimiters. Expected 0 or 1, found {delimiters}'
    )
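
# For illustration, _parse_domain splits an optional '#domain' regex suffix
# from a database path (the paths shown are hypothetical):
#
#   _parse_domain('tokens.csv')       -> (Path('tokens.csv'), None)
#   _parse_domain('image.elf#log.*')  -> (Path('image.elf'), re.compile('log.*'))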


class AutoUpdatingDetokenizer(Detokenizer):
    """Loads and updates a detokenizer from database paths."""

    class _DatabasePath:
        """Tracks the modified time of a path or file object."""

        def __init__(self, path: Path | str) -> None:
            self.path, self.domain = _parse_domain(path)
            self._modified_time: float | None = self._last_modified_time()

        def updated(self) -> bool:
            """True if the path has been updated since the last call."""
            modified_time = self._last_modified_time()
            if modified_time is None or modified_time == self._modified_time:
                return False

            self._modified_time = modified_time
            return True

        def _last_modified_time(self) -> float | None:
            if self.path.is_dir():
                mtime = -1.0
                for child in self.path.glob(tokens.DIR_DB_GLOB):
                    mtime = max(mtime, os.path.getmtime(child))
                return mtime if mtime >= 0 else None

            try:
                return os.path.getmtime(self.path)
            except FileNotFoundError:
                return None

        def load(self) -> tokens.Database:
            try:
                if self.domain is not None:
                    return database.load_token_database(
                        self.path, domain=self.domain
                    )
                return database.load_token_database(self.path)
            except FileNotFoundError:
                return database.load_token_database()

    def __init__(
        self,
        *paths_or_files: Path | str,
        min_poll_period_s: float = 1.0,
        pool: Executor = ThreadPoolExecutor(max_workers=1),
    ) -> None:
        self.paths = tuple(self._DatabasePath(path) for path in paths_or_files)
        self.min_poll_period_s = min_poll_period_s
        self._last_checked_time: float = time.time()
        # Thread pool to use for loading the databases. Limit to a single
        # worker since this is low volume and not time critical.
        self._pool = pool
        super().__init__(*(path.load() for path in self.paths))

    def __del__(self) -> None:
        self._pool.shutdown(wait=False)

    def _reload_paths(self) -> None:
        self._initialize_database([path.load() for path in self.paths])

    def _reload_if_changed(self) -> None:
        if time.time() - self._last_checked_time >= self.min_poll_period_s:
            self._last_checked_time = time.time()

            if any(path.updated() for path in self.paths):
                _LOG.info('Changes detected; reloading token database')
                self._pool.submit(self._reload_paths)

    def lookup(self, token: int) -> list[_TokenizedFormatString]:
        self._reload_if_changed()
        return super().lookup(token)
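
# Usage sketch (the database path is hypothetical): an AutoUpdatingDetokenizer
# rechecks its sources' modification times at most once per min_poll_period_s
# and reloads them in a background thread when they change.
#
#   detok = AutoUpdatingDetokenizer('out/tokens.csv', min_poll_period_s=5.0)
#   detok.detokenize(b'\x12\x34\x56\x78')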


class NestedMessageParser:
    """Parses nested tokenized messages from a byte stream or string."""

    class _State(enum.Enum):
        MESSAGE = 1
        NON_MESSAGE = 2

    def __init__(
        self,
        prefix: str | bytes = NESTED_TOKEN_PREFIX,
        chars: str | bytes = _BASE64_CHARS,
    ) -> None:
        """Initializes a parser.

        Args:
            prefix: one character that signifies the start of a message (``$``)
            chars: characters allowed in a message
        """
        self._prefix = ord(prefix)

        if isinstance(chars, str):
            chars = chars.encode()

        # Store the valid message bytes as a set of byte values.
        self._message_bytes = frozenset(chars)

        if len(prefix) != 1 or self._prefix in self._message_bytes:
            raise ValueError(
                f'Invalid prefix {prefix!r}: the prefix must be a single '
                'character that is not a valid message character.'
            )

        self._buffer = bytearray()
        self._state: NestedMessageParser._State = self._State.NON_MESSAGE

    def read_messages_io(
        self, binary_io: io.RawIOBase | BinaryIO
    ) -> Iterator[tuple[bool, bytes]]:
        """Reads prefixed messages from a byte stream (BinaryIO object).

        Reads until EOF. If the stream is nonblocking (``read(1)`` returns
        ``None``), then this function returns and may be called again with the
        same IO object to continue parsing. Partial messages are preserved
        between calls.

        Yields:
            ``(is_message, contents)`` chunks.
        """
        # The read may block indefinitely, depending on the IO object.
        while (read_byte := binary_io.read(1)) != b'':
            # Handle non-blocking IO by returning when no bytes are available.
            if read_byte is None:
                return

            for byte in read_byte:
                yield from self._handle_byte(byte)

            if self._state is self._State.NON_MESSAGE:  # yield non-message byte
                yield from self._flush()

        yield from self._flush()  # Always flush after EOF
        self._state = self._State.NON_MESSAGE

    def read_messages(
        self, chunk: bytes, *, flush: bool = False
    ) -> Iterator[tuple[bool, bytes]]:
        """Reads prefixed messages from a byte string.

        This function may be called repeatedly with chunks of a stream. Partial
        messages are preserved between calls, unless ``flush=True``.

        Args:
            chunk: byte string that may contain nested messages
            flush: whether to flush any incomplete messages after processing
                this chunk

        Yields:
            ``(is_message, contents)`` chunks.
        """
        for byte in chunk:
            yield from self._handle_byte(byte)

        if flush or self._state is self._State.NON_MESSAGE:
            yield from self._flush()

    def _handle_byte(self, byte: int) -> Iterator[tuple[bool, bytes]]:
        if self._state is self._State.MESSAGE:
            if byte not in self._message_bytes:
                yield from self._flush()
                if byte != self._prefix:
                    self._state = self._State.NON_MESSAGE
        elif self._state is self._State.NON_MESSAGE:
            if byte == self._prefix:
                yield from self._flush()
                self._state = self._State.MESSAGE
        else:
            raise NotImplementedError(f'Unsupported state: {self._state}')

        self._buffer.append(byte)

    def _flush(self) -> Iterator[tuple[bool, bytes]]:
        data = bytes(self._buffer)
        self._buffer.clear()
        if data:
            yield self._state is self._State.MESSAGE, data

    def transform_io(
        self,
        binary_io: io.RawIOBase | BinaryIO,
        transform: Callable[[bytes], bytes],
    ) -> Iterator[bytes]:
        """Yields the file with a transformation applied to the messages."""
        for is_message, chunk in self.read_messages_io(binary_io):
            yield transform(chunk) if is_message else chunk

    def transform(
        self,
        chunk: bytes,
        transform: Callable[[bytes], bytes],
        *,
        flush: bool = False,
    ) -> bytes:
        """Returns the chunk with a transformation applied to the messages.

        Partial messages are preserved between calls unless ``flush=True``.
        """
        return b''.join(
            transform(data) if is_message else data
            for is_message, data in self.read_messages(chunk, flush=flush)
        )
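
# Behavior sketch: read_messages() splits a chunk into message and non-message
# spans, tagging each with is_message (the payload below is illustrative):
#
#   parser = NestedMessageParser()
#   list(parser.read_messages(b'Text $EjRWeA== more', flush=True))
#   # -> [(False, b'Text '), (True, b'$EjRWeA=='), (False, b' more')]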


# TODO(hepler): Remove this unnecessary function.
def detokenize_base64(
    detokenizer: Detokenizer,
    data: bytes,
    prefix: str | bytes = NESTED_TOKEN_PREFIX,
    recursion: int = DEFAULT_RECURSION,
) -> bytes:
    """Alias for detokenizer.detokenize_base64 for backwards compatibility.

    This function is deprecated; do not call it.
    """
    return detokenizer.detokenize_base64(data, prefix, recursion)


def _follow_and_detokenize_file(
    detokenizer: Detokenizer,
    file: BinaryIO,
    output: BinaryIO,
    prefix: str | bytes,
    poll_period_s: float = 0.01,
) -> None:
    """Polls a file to detokenize it and any appended data."""

    try:
        while True:
            data = file.read()
            if data:
                detokenizer.detokenize_base64_to_file(data, output, prefix)
                output.flush()
            else:
                time.sleep(poll_period_s)
    except KeyboardInterrupt:
        pass


def _handle_base64(
    databases,
    input_file: BinaryIO,
    output: BinaryIO,
    prefix: str,
    show_errors: bool,
    follow: bool,
) -> None:
    """Handles the base64 command line option."""
    # argparse.FileType doesn't correctly handle '-' for binary files.
    if input_file is sys.stdin:
        input_file = sys.stdin.buffer

    if output is sys.stdout:
        output = sys.stdout.buffer

    detokenizer = Detokenizer(
        tokens.Database.merged(*databases), show_errors=show_errors
    )

    if follow:
        _follow_and_detokenize_file(detokenizer, input_file, output, prefix)
    elif input_file.seekable():
        # Process seekable files all at once, which is MUCH faster.
        detokenizer.detokenize_base64_to_file(
            input_file.read(), output, prefix
        )
    else:
        # For non-seekable inputs (e.g. pipes), read one character at a time.
        detokenizer.detokenize_base64_live(input_file, output, prefix)


def _parse_args() -> argparse.Namespace:
    """Parses and returns command line arguments."""

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.set_defaults(handler=lambda **_: parser.print_help())

    subparsers = parser.add_subparsers(help='Encoding of the input.')

    base64_help = 'Detokenize Base64-encoded data from a file or stdin.'
    subparser = subparsers.add_parser(
        'base64',
        description=base64_help,
        parents=[database.token_databases_parser()],
        help=base64_help,
    )
    subparser.set_defaults(handler=_handle_base64)
    subparser.add_argument(
        '-i',
        '--input',
        dest='input_file',
        type=argparse.FileType('rb'),
        default=sys.stdin.buffer,
        help='The file from which to read; provide - or omit for stdin.',
    )
    subparser.add_argument(
        '-f',
        '--follow',
        action='store_true',
        help=(
            'Detokenize data appended to input_file as it grows; similar to '
            'tail -f.'
        ),
    )
    subparser.add_argument(
        '-o',
        '--output',
        type=argparse.FileType('wb'),
        default=sys.stdout.buffer,
        help=(
            'The file to which to write the output; '
            'provide - or omit for stdout.'
        ),
    )
    subparser.add_argument(
        '-p',
        '--prefix',
        default=NESTED_TOKEN_PREFIX,
        help=(
            'The one-character prefix that signals the start of a '
            'nested tokenized message. (default: $)'
        ),
    )
    subparser.add_argument(
        '-s',
        '--show_errors',
        action='store_true',
        help=(
            'Show error messages instead of conversion specifiers when '
            'arguments cannot be decoded.'
        ),
    )

    return parser.parse_args()
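
# Invocation sketch for the base64 subcommand. The token database arguments
# come from database.token_databases_parser(), so their exact form depends on
# that parser; assuming it accepts database paths positionally, usage looks
# roughly like:
#
#   python detokenize.py base64 tokens.csv -i logs.txt -o decoded.txt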


def main() -> int:
    args = _parse_args()

    handler = args.handler
    del args.handler

    handler(**vars(args))
    return 0


if __name__ == '__main__':
    if sys.version_info[0] < 3:
        sys.exit('ERROR: The detokenizer command line tools require Python 3.')
    sys.exit(main())