• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright 2020 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15r"""Decodes and detokenizes strings from binary or Base64 input.
16
17The main class provided by this module is the Detokenize class. To use it,
18construct it with the path to an ELF or CSV database, a tokens.Database,
19or a file object for an ELF file or CSV. Then, call the detokenize method with
20encoded messages, one at a time. The detokenize method returns a
21DetokenizedString object with the result.
22
23For example::
24
25  from pw_tokenizer import detokenize
26
27  detok = detokenize.Detokenizer('path/to/firmware/image.elf')
28  print(detok.detokenize(b'\x12\x34\x56\x78\x03hi!'))
29
30This module also provides a command line interface for decoding and detokenizing
31messages from a file or stdin.
32"""
33
34import argparse
35import base64
36import binascii
37from concurrent.futures import Executor, ThreadPoolExecutor
38import enum
39import io
40import logging
41import os
42from pathlib import Path
43import re
44import string
45import struct
46import sys
47import threading
48import time
49from typing import (
50    AnyStr,
51    BinaryIO,
52    Callable,
53    Iterable,
54    Iterator,
55    Match,
56    NamedTuple,
57    Pattern,
58)
59
60try:
61    from pw_tokenizer import database, decode, encode, tokens
62except ImportError:
63    # Append this path to the module search path to allow running this module
64    # without installing the pw_tokenizer package.
65    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
66    from pw_tokenizer import database, decode, encode, tokens
67
68_LOG = logging.getLogger('pw_tokenizer')
69
70ENCODED_TOKEN = struct.Struct('<I')
71_BASE64_CHARS = string.ascii_letters + string.digits + '+/-_='
72DEFAULT_RECURSION = 9
73NESTED_TOKEN_PREFIX = encode.NESTED_TOKEN_PREFIX.encode()
74NESTED_TOKEN_BASE_PREFIX = encode.NESTED_TOKEN_BASE_PREFIX.encode()
75
76_BASE8_TOKEN_REGEX = rb'(?P<base8>[0-7]{11})'
77_BASE10_TOKEN_REGEX = rb'(?P<base10>[0-9]{10})'
78_BASE16_TOKEN_REGEX = rb'(?P<base16>[A-Fa-f0-9]{8})'
79_BASE64_TOKEN_REGEX = (
80    rb'(?P<base64>'
81    # Tokenized Base64 contains 0 or more blocks of four Base64 chars.
82    rb'(?:[A-Za-z0-9+/\-_]{4})*'
83    # The last block of 4 chars may have one or two padding chars (=).
84    rb'(?:[A-Za-z0-9+/\-_]{3}=|[A-Za-z0-9+/\-_]{2}==)?'
85    rb')'
86)
87_NESTED_TOKEN_FORMATS = (
88    _BASE8_TOKEN_REGEX,
89    _BASE10_TOKEN_REGEX,
90    _BASE16_TOKEN_REGEX,
91    _BASE64_TOKEN_REGEX,
92)
93
94
95def _token_regex(prefix: bytes) -> Pattern[bytes]:
96    """Returns a regular expression for prefixed tokenized strings."""
97    return re.compile(
98        # Tokenized strings start with the prefix character ($).
99        re.escape(prefix)
100        # Optional; no base specifier defaults to BASE64.
101        # Hash (#) with no number specified defaults to Base-16.
102        + rb'(?P<basespec>(?P<base>[0-9]*)?'
103        + NESTED_TOKEN_BASE_PREFIX
104        + rb')?'
105        # Match one of the following token formats.
106        + rb'('
107        + rb'|'.join(_NESTED_TOKEN_FORMATS)
108        + rb')'
109    )
110
111
112class DetokenizedString:
113    """A detokenized string, with all results if there are collisions."""
114
115    def __init__(
116        self,
117        token: int | None,
118        format_string_entries: Iterable[tuple],
119        encoded_message: bytes,
120        show_errors: bool = False,
121        recursive_detokenize: Callable[[str], str] | None = None,
122    ):
123        self.token = token
124        self.encoded_message = encoded_message
125        self._show_errors = show_errors
126
127        self.successes: list[decode.FormattedString] = []
128        self.failures: list[decode.FormattedString] = []
129
130        decode_attempts: list[tuple[tuple, decode.FormattedString]] = []
131
132        for entry, fmt in format_string_entries:
133            result = fmt.format(
134                encoded_message[ENCODED_TOKEN.size :], show_errors
135            )
136            if recursive_detokenize:
137                result = decode.FormattedString(
138                    recursive_detokenize(result.value),
139                    result.args,
140                    result.remaining,
141                )
142            decode_attempts.append((result.score(entry.date_removed), result))
143
144        # Sort the attempts by the score so the most likely results are first.
145        decode_attempts.sort(key=lambda value: value[0], reverse=True)
146
147        # Split out the successesful decodes from the failures.
148        for score, result in decode_attempts:
149            if score[0]:
150                self.successes.append(result)
151            else:
152                self.failures.append(result)
153
154    def ok(self) -> bool:
155        """True if exactly one string decoded the arguments successfully."""
156        return len(self.successes) == 1
157
158    def matches(self) -> list[decode.FormattedString]:
159        """Returns the strings that matched the token, best matches first."""
160        return self.successes + self.failures
161
162    def best_result(self) -> decode.FormattedString | None:
163        """Returns the string and args for the most likely decoded string."""
164        for string_and_args in self.matches():
165            return string_and_args
166
167        return None
168
169    def error_message(self) -> str:
170        """If detokenization failed, returns a descriptive message."""
171        if self.ok():
172            return ''
173
174        if not self.matches():
175            if self.token is None:
176                return 'missing token'
177
178            return 'unknown token {:08x}'.format(self.token)
179
180        if len(self.matches()) == 1:
181            return 'decoding failed for {!r}'.format(self.matches()[0].value)
182
183        return '{} matches'.format(len(self.matches()))
184
185    def __str__(self) -> str:
186        """Returns the string for the most likely result."""
187        result = self.best_result()
188        if result:
189            return result[0]
190
191        if self._show_errors:
192            return '<[ERROR: {}|{!r}]>'.format(
193                self.error_message(), self.encoded_message
194            )
195
196        # Display the string as prefixed Base64 if it cannot be decoded.
197        return encode.prefixed_base64(self.encoded_message)
198
199    def __repr__(self) -> str:
200        if self.ok():
201            message = repr(str(self))
202        else:
203            message = 'ERROR: {}|{!r}'.format(
204                self.error_message(), self.encoded_message
205            )
206
207        return '{}({})'.format(type(self).__name__, message)
208
209
210class _TokenizedFormatString(NamedTuple):
211    entry: tokens.TokenizedStringEntry
212    format: decode.FormatString
213
214
215class Detokenizer:
216    """Main detokenization class; detokenizes strings and caches results."""
217
218    def __init__(self, *token_database_or_elf, show_errors: bool = False):
219        """Decodes and detokenizes binary messages.
220
221        Args:
222          *token_database_or_elf: a path or file object for an ELF or CSV
223              database, a tokens.Database, or an elf_reader.Elf
224          show_errors: if True, an error message is used in place of the %
225              conversion specifier when an argument fails to decode
226        """
227        self.show_errors = show_errors
228
229        self._database_lock = threading.Lock()
230
231        # Cache FormatStrings for faster lookup & formatting.
232        self._cache: dict[int, list[_TokenizedFormatString]] = {}
233
234        self._initialize_database(token_database_or_elf)
235
236    def _initialize_database(self, token_sources: Iterable) -> None:
237        with self._database_lock:
238            self.database = database.load_token_database(*token_sources)
239            self._cache.clear()
240
241    def lookup(self, token: int) -> list[_TokenizedFormatString]:
242        """Returns (TokenizedStringEntry, FormatString) list for matches."""
243        with self._database_lock:
244            try:
245                return self._cache[token]
246            except KeyError:
247                format_strings = [
248                    _TokenizedFormatString(
249                        entry, decode.FormatString(str(entry))
250                    )
251                    for entry in self.database.token_to_entries[token]
252                ]
253                self._cache[token] = format_strings
254                return format_strings
255
256    def detokenize(
257        self,
258        encoded_message: bytes,
259        prefix: str | bytes = NESTED_TOKEN_PREFIX,
260        recursion: int = DEFAULT_RECURSION,
261    ) -> DetokenizedString:
262        """Decodes and detokenizes a message as a DetokenizedString."""
263        if not encoded_message:
264            return DetokenizedString(
265                None, (), encoded_message, self.show_errors
266            )
267
268        # Pad messages smaller than ENCODED_TOKEN.size with zeroes to support
269        # tokens smaller than a uint32. Messages with arguments must always use
270        # a full 32-bit token.
271        missing_token_bytes = ENCODED_TOKEN.size - len(encoded_message)
272        if missing_token_bytes > 0:
273            encoded_message += b'\0' * missing_token_bytes
274
275        (token,) = ENCODED_TOKEN.unpack_from(encoded_message)
276
277        recursive_detokenize = None
278        if recursion > 0:
279            recursive_detokenize = self._detokenize_nested_callback(
280                prefix, recursion
281            )
282
283        return DetokenizedString(
284            token,
285            self.lookup(token),
286            encoded_message,
287            self.show_errors,
288            recursive_detokenize,
289        )
290
291    def detokenize_text(
292        self,
293        data: AnyStr,
294        prefix: str | bytes = NESTED_TOKEN_PREFIX,
295        recursion: int = DEFAULT_RECURSION,
296    ) -> AnyStr:
297        """Decodes and replaces prefixed Base64 messages in the provided data.
298
299        Args:
300          data: the binary data to decode
301          prefix: one-character byte string that signals the start of a message
302          recursion: how many levels to recursively decode
303
304        Returns:
305          copy of the data with all recognized tokens decoded
306        """
307        return self._detokenize_nested_callback(prefix, recursion)(data)
308
309    # TODO(gschen): remove unnecessary function
310    def detokenize_base64(
311        self,
312        data: AnyStr,
313        prefix: str | bytes = NESTED_TOKEN_PREFIX,
314        recursion: int = DEFAULT_RECURSION,
315    ) -> AnyStr:
316        """Alias of detokenize_text for backwards compatibility."""
317        return self.detokenize_text(data, prefix, recursion)
318
319    def detokenize_text_to_file(
320        self,
321        data: AnyStr,
322        output: BinaryIO,
323        prefix: str | bytes = NESTED_TOKEN_PREFIX,
324        recursion: int = DEFAULT_RECURSION,
325    ) -> None:
326        """Decodes prefixed Base64 messages in data; decodes to output file."""
327        output.write(self._detokenize_nested(data, prefix, recursion))
328
329    # TODO(gschen): remove unnecessary function
330    def detokenize_base64_to_file(
331        self,
332        data: AnyStr,
333        output: BinaryIO,
334        prefix: str | bytes = NESTED_TOKEN_PREFIX,
335        recursion: int = DEFAULT_RECURSION,
336    ) -> None:
337        """Alias of detokenize_text_to_file for backwards compatibility."""
338        self.detokenize_text_to_file(data, output, prefix, recursion)
339
340    def detokenize_text_live(
341        self,
342        input_file: io.RawIOBase | BinaryIO,
343        output: BinaryIO,
344        prefix: str | bytes = NESTED_TOKEN_PREFIX,
345        recursion: int = DEFAULT_RECURSION,
346    ) -> None:
347        """Reads chars one-at-a-time, decoding messages; SLOW for big files."""
348
349        def transform(data: bytes) -> bytes:
350            return self._detokenize_nested(data.decode(), prefix, recursion)
351
352        for message in NestedMessageParser(prefix, _BASE64_CHARS).transform_io(
353            input_file, transform
354        ):
355            output.write(message)
356
357            # Flush each line to prevent delays when piping between processes.
358            if b'\n' in message:
359                output.flush()
360
361    # TODO(gschen): remove unnecessary function
362    def detokenize_base64_live(
363        self,
364        input_file: io.RawIOBase | BinaryIO,
365        output: BinaryIO,
366        prefix: str | bytes = NESTED_TOKEN_PREFIX,
367        recursion: int = DEFAULT_RECURSION,
368    ) -> None:
369        """Alias of detokenize_text_live for backwards compatibility."""
370        self.detokenize_text_live(input_file, output, prefix, recursion)
371
372    def _detokenize_nested_callback(
373        self,
374        prefix: str | bytes,
375        recursion: int,
376    ) -> Callable[[AnyStr], AnyStr]:
377        """Returns a function that replaces all tokens for a given string."""
378
379        def detokenize(message: AnyStr) -> AnyStr:
380            result = self._detokenize_nested(message, prefix, recursion)
381            return result.decode() if isinstance(message, str) else result
382
383        return detokenize
384
385    def _detokenize_nested(
386        self,
387        message: str | bytes,
388        prefix: str | bytes,
389        recursion: int,
390    ) -> bytes:
391        """Returns the message with recognized tokens replaced.
392
393        Message data is internally handled as bytes regardless of input message
394        type and returns the result as bytes.
395        """
396        # A unified format across the token types is required for regex
397        # consistency.
398        message = message.encode() if isinstance(message, str) else message
399        prefix = prefix.encode() if isinstance(prefix, str) else prefix
400
401        if not self.database:
402            return message
403
404        result = message
405        for _ in range(recursion - 1):
406            result = _token_regex(prefix).sub(self._detokenize_scan, result)
407
408            if result == message:
409                return result
410        return result
411
412    def _detokenize_scan(self, match: Match[bytes]) -> bytes:
413        """Decodes prefixed tokens for one of multiple formats."""
414        basespec = match.group('basespec')
415        base = match.group('base')
416
417        if not basespec or (base == b'64'):
418            return self._detokenize_once_base64(match)
419
420        if not base:
421            base = b'16'
422
423        return self._detokenize_once(match, base)
424
425    def _detokenize_once(
426        self,
427        match: Match[bytes],
428        base: bytes,
429    ) -> bytes:
430        """Performs lookup on a plain token"""
431        original = match.group(0)
432        token = match.group('base' + base.decode())
433        if not token:
434            return original
435
436        token = int(token, int(base))
437        entries = self.database.token_to_entries[token]
438
439        if len(entries) == 1:
440            return str(entries[0]).encode()
441
442        # TODO(gschen): improve token collision reporting
443
444        return original
445
446    def _detokenize_once_base64(
447        self,
448        match: Match[bytes],
449    ) -> bytes:
450        """Performs lookup on a Base64 token"""
451        original = match.group(0)
452
453        try:
454            encoded_token = match.group('base64')
455            if not encoded_token:
456                return original
457
458            detokenized_string = self.detokenize(
459                base64.b64decode(encoded_token, validate=True), recursion=0
460            )
461
462            if detokenized_string.matches():
463                return str(detokenized_string).encode()
464
465        except binascii.Error:
466            pass
467
468        return original
469
470
471# TODO: b/265334753 - Reuse this function in database.py:LoadTokenDatabases
472def _parse_domain(path: Path | str) -> tuple[Path, Pattern[str] | None]:
473    """Extracts an optional domain regex pattern suffix from a path"""
474
475    if isinstance(path, Path):
476        path = str(path)
477
478    delimiters = path.count('#')
479
480    if delimiters == 0:
481        return Path(path), None
482
483    if delimiters == 1:
484        path, domain = path.split('#')
485        return Path(path), re.compile(domain)
486
487    raise ValueError(
488        f'Too many # delimiters. Expected 0 or 1, found {delimiters}'
489    )
490
491
492class AutoUpdatingDetokenizer(Detokenizer):
493    """Loads and updates a detokenizer from database paths."""
494
495    class _DatabasePath:
496        """Tracks the modified time of a path or file object."""
497
498        def __init__(self, path: Path | str) -> None:
499            self.path, self.domain = _parse_domain(path)
500            self._modified_time: float | None = self._last_modified_time()
501
502        def updated(self) -> bool:
503            """True if the path has been updated since the last call."""
504            modified_time = self._last_modified_time()
505            if modified_time is None or modified_time == self._modified_time:
506                return False
507
508            self._modified_time = modified_time
509            return True
510
511        def _last_modified_time(self) -> float | None:
512            if self.path.is_dir():
513                mtime = -1.0
514                for child in self.path.glob(tokens.DIR_DB_GLOB):
515                    mtime = max(mtime, os.path.getmtime(child))
516                return mtime if mtime >= 0 else None
517
518            try:
519                return os.path.getmtime(self.path)
520            except FileNotFoundError:
521                return None
522
523        def load(self) -> tokens.Database:
524            try:
525                if self.domain is not None:
526                    return database.load_token_database(
527                        self.path, domain=self.domain
528                    )
529                return database.load_token_database(self.path)
530            except FileNotFoundError:
531                return database.load_token_database()
532
533    def __init__(
534        self,
535        *paths_or_files: Path | str,
536        min_poll_period_s: float = 1.0,
537        pool: Executor = ThreadPoolExecutor(max_workers=1),
538    ) -> None:
539        self.paths = tuple(self._DatabasePath(path) for path in paths_or_files)
540        self.min_poll_period_s = min_poll_period_s
541        self._last_checked_time: float = time.time()
542        # Thread pool to use for loading the databases. Limit to a single
543        # worker since this is low volume and not time critical.
544        self._pool = pool
545        super().__init__(*(path.load() for path in self.paths))
546
547    def __del__(self) -> None:
548        self._pool.shutdown(wait=False)
549
550    def _reload_paths(self) -> None:
551        self._initialize_database([path.load() for path in self.paths])
552
553    def _reload_if_changed(self) -> None:
554        if time.time() - self._last_checked_time >= self.min_poll_period_s:
555            self._last_checked_time = time.time()
556
557            if any(path.updated() for path in self.paths):
558                _LOG.info('Changes detected; reloading token database')
559                self._pool.submit(self._reload_paths)
560
561    def lookup(self, token: int) -> list[_TokenizedFormatString]:
562        self._reload_if_changed()
563        return super().lookup(token)
564
565
566class NestedMessageParser:
567    """Parses nested tokenized messages from a byte stream or string."""
568
569    class _State(enum.Enum):
570        MESSAGE = 1
571        NON_MESSAGE = 2
572
573    def __init__(
574        self,
575        prefix: str | bytes = NESTED_TOKEN_PREFIX,
576        chars: str | bytes = _BASE64_CHARS,
577    ) -> None:
578        """Initializes a parser.
579
580        Args:
581            prefix: one character that signifies the start of a message (``$``).
582            chars: characters allowed in a message
583        """
584        self._prefix = ord(prefix)
585
586        if isinstance(chars, str):
587            chars = chars.encode()
588
589        # Store the valid message bytes as a set of byte values.
590        self._message_bytes = frozenset(chars)
591
592        if len(prefix) != 1 or self._prefix in self._message_bytes:
593            raise ValueError(
594                f'Invalid prefix {prefix!r}: the prefix must be a single '
595                'character that is not a valid message character.'
596            )
597
598        self._buffer = bytearray()
599        self._state: NestedMessageParser._State = self._State.NON_MESSAGE
600
601    def read_messages_io(
602        self, binary_io: io.RawIOBase | BinaryIO
603    ) -> Iterator[tuple[bool, bytes]]:
604        """Reads prefixed messages from a byte stream (BinaryIO object).
605
606        Reads until EOF. If the stream is nonblocking (``read(1)`` returns
607        ``None``), then this function returns and may be called again with the
608        same IO object to continue parsing. Partial messages are preserved
609        between calls.
610
611        Yields:
612            ``(is_message, contents)`` chunks.
613        """
614        # The read may block indefinitely, depending on the IO object.
615        while (read_byte := binary_io.read(1)) != b'':
616            # Handle non-blocking IO by returning when no bytes are available.
617            if read_byte is None:
618                return
619
620            for byte in read_byte:
621                yield from self._handle_byte(byte)
622
623            if self._state is self._State.NON_MESSAGE:  # yield non-message byte
624                yield from self._flush()
625
626        yield from self._flush()  # Always flush after EOF
627        self._state = self._State.NON_MESSAGE
628
629    def read_messages(
630        self, chunk: bytes, *, flush: bool = False
631    ) -> Iterator[tuple[bool, bytes]]:
632        """Reads prefixed messages from a byte string.
633
634        This function may be called repeatedly with chunks of a stream. Partial
635        messages are preserved between calls, unless ``flush=True``.
636
637        Args:
638            chunk: byte string that may contain nested messagses
639            flush: whether to flush any incomplete messages after processing
640                this chunk
641
642        Yields:
643            ``(is_message, contents)`` chunks.
644        """
645        for byte in chunk:
646            yield from self._handle_byte(byte)
647
648        if flush or self._state is self._State.NON_MESSAGE:
649            yield from self._flush()
650
651    def _handle_byte(self, byte: int) -> Iterator[tuple[bool, bytes]]:
652        if self._state is self._State.MESSAGE:
653            if byte not in self._message_bytes:
654                yield from self._flush()
655                if byte != self._prefix:
656                    self._state = self._State.NON_MESSAGE
657        elif self._state is self._State.NON_MESSAGE:
658            if byte == self._prefix:
659                yield from self._flush()
660                self._state = self._State.MESSAGE
661        else:
662            raise NotImplementedError(f'Unsupported state: {self._state}')
663
664        self._buffer.append(byte)
665
666    def _flush(self) -> Iterator[tuple[bool, bytes]]:
667        data = bytes(self._buffer)
668        self._buffer.clear()
669        if data:
670            yield self._state is self._State.MESSAGE, data
671
672    def transform_io(
673        self,
674        binary_io: io.RawIOBase | BinaryIO,
675        transform: Callable[[bytes], bytes],
676    ) -> Iterator[bytes]:
677        """Yields the file with a transformation applied to the messages."""
678        for is_message, chunk in self.read_messages_io(binary_io):
679            yield transform(chunk) if is_message else chunk
680
681    def transform(
682        self,
683        chunk: bytes,
684        transform: Callable[[bytes], bytes],
685        *,
686        flush: bool = False,
687    ) -> bytes:
688        """Yields the chunk with a transformation applied to the messages.
689
690        Partial messages are preserved between calls unless ``flush=True``.
691        """
692        return b''.join(
693            transform(data) if is_message else data
694            for is_message, data in self.read_messages(chunk, flush=flush)
695        )
696
697
698# TODO(hepler): Remove this unnecessary function.
699def detokenize_base64(
700    detokenizer: Detokenizer,
701    data: bytes,
702    prefix: str | bytes = NESTED_TOKEN_PREFIX,
703    recursion: int = DEFAULT_RECURSION,
704) -> bytes:
705    """Alias for detokenizer.detokenize_base64 for backwards compatibility.
706
707    This function is deprecated; do not call it.
708    """
709    return detokenizer.detokenize_base64(data, prefix, recursion)
710
711
712def _follow_and_detokenize_file(
713    detokenizer: Detokenizer,
714    file: BinaryIO,
715    output: BinaryIO,
716    prefix: str | bytes,
717    poll_period_s: float = 0.01,
718) -> None:
719    """Polls a file to detokenize it and any appended data."""
720
721    try:
722        while True:
723            data = file.read()
724            if data:
725                detokenizer.detokenize_base64_to_file(data, output, prefix)
726                output.flush()
727            else:
728                time.sleep(poll_period_s)
729    except KeyboardInterrupt:
730        pass
731
732
733def _handle_base64(
734    databases,
735    input_file: BinaryIO,
736    output: BinaryIO,
737    prefix: str,
738    show_errors: bool,
739    follow: bool,
740) -> None:
741    """Handles the base64 command line option."""
742    # argparse.FileType doesn't correctly handle - for binary files.
743    if input_file is sys.stdin:
744        input_file = sys.stdin.buffer
745
746    if output is sys.stdout:
747        output = sys.stdout.buffer
748
749    detokenizer = Detokenizer(
750        tokens.Database.merged(*databases), show_errors=show_errors
751    )
752
753    if follow:
754        _follow_and_detokenize_file(detokenizer, input_file, output, prefix)
755    elif input_file.seekable():
756        # Process seekable files all at once, which is MUCH faster.
757        detokenizer.detokenize_base64_to_file(input_file.read(), output, prefix)
758    else:
759        # For non-seekable inputs (e.g. pipes), read one character at a time.
760        detokenizer.detokenize_base64_live(input_file, output, prefix)
761
762
763def _parse_args() -> argparse.Namespace:
764    """Parses and return command line arguments."""
765
766    parser = argparse.ArgumentParser(
767        description=__doc__,
768        formatter_class=argparse.RawDescriptionHelpFormatter,
769    )
770    parser.set_defaults(handler=lambda **_: parser.print_help())
771
772    subparsers = parser.add_subparsers(help='Encoding of the input.')
773
774    base64_help = 'Detokenize Base64-encoded data from a file or stdin.'
775    subparser = subparsers.add_parser(
776        'base64',
777        description=base64_help,
778        parents=[database.token_databases_parser()],
779        help=base64_help,
780    )
781    subparser.set_defaults(handler=_handle_base64)
782    subparser.add_argument(
783        '-i',
784        '--input',
785        dest='input_file',
786        type=argparse.FileType('rb'),
787        default=sys.stdin.buffer,
788        help='The file from which to read; provide - or omit for stdin.',
789    )
790    subparser.add_argument(
791        '-f',
792        '--follow',
793        action='store_true',
794        help=(
795            'Detokenize data appended to input_file as it grows; similar to '
796            'tail -f.'
797        ),
798    )
799    subparser.add_argument(
800        '-o',
801        '--output',
802        type=argparse.FileType('wb'),
803        default=sys.stdout.buffer,
804        help=(
805            'The file to which to write the output; '
806            'provide - or omit for stdout.'
807        ),
808    )
809    subparser.add_argument(
810        '-p',
811        '--prefix',
812        default=NESTED_TOKEN_PREFIX,
813        help=(
814            'The one-character prefix that signals the start of a '
815            'nested tokenized message. (default: $)'
816        ),
817    )
818    subparser.add_argument(
819        '-s',
820        '--show_errors',
821        action='store_true',
822        help=(
823            'Show error messages instead of conversion specifiers when '
824            'arguments cannot be decoded.'
825        ),
826    )
827
828    return parser.parse_args()
829
830
831def main() -> int:
832    args = _parse_args()
833
834    handler = args.handler
835    del args.handler
836
837    handler(**vars(args))
838    return 0
839
840
841if __name__ == '__main__':
842    if sys.version_info[0] < 3:
843        sys.exit('ERROR: The detokenizer command line tools require Python 3.')
844    sys.exit(main())
845