#!/usr/bin/env python3
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
r"""Decodes and detokenizes strings from binary or Base64 input.

The main class provided by this module is the Detokenizer class. To use it,
construct it with the path to an ELF or CSV database, a tokens.Database,
or a file object for an ELF file or CSV. Then, call the detokenize method with
encoded messages, one at a time. The detokenize method returns a
DetokenizedString object with the result.

For example,

  from pw_tokenizer import detokenize

  detok = detokenize.Detokenizer('path/to/my/image.elf')
  print(detok.detokenize(b'\x12\x34\x56\x78\x03hi!'))

This module also provides a command line interface for decoding and detokenizing
messages from a file or stdin.
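
For example, a hypothetical invocation might look like the following (the
base64 subcommand and the -i/-o flags are defined by the argument parser in
this file; the token database argument is handled by
database.token_databases_parser(), and the paths are placeholders):

  python detokenize.py base64 path/to/tokens.csv -i device_log.txt -o out.txt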
32"""
33
34import argparse
35import base64
36import binascii
37from datetime import datetime
38import io
39import logging
40import os
41from pathlib import Path
42import re
43import string
44import struct
45import sys
46import time
47from typing import (BinaryIO, Callable, Dict, List, Iterable, Iterator, Match,
48                    NamedTuple, Optional, Pattern, Tuple, Union)
49
50try:
51    from pw_tokenizer import database, decode, encode, tokens
52except ImportError:
53    # Append this path to the module search path to allow running this module
54    # without installing the pw_tokenizer package.
55    sys.path.append(os.path.dirname(os.path.dirname(
56        os.path.abspath(__file__))))
57    from pw_tokenizer import database, decode, encode, tokens
58
59ENCODED_TOKEN = struct.Struct('<I')
60_LOG = logging.getLogger('pw_tokenizer')
61
62
63class DetokenizedString:
64    """A detokenized string, with all results if there are collisions."""
65    def __init__(self,
66                 token: Optional[int],
67                 format_string_entries: Iterable[tuple],
68                 encoded_message: bytes,
69                 show_errors: bool = False):
70        self.token = token
71        self.encoded_message = encoded_message
72        self._show_errors = show_errors
73
74        self.successes: List[decode.FormattedString] = []
75        self.failures: List[decode.FormattedString] = []
76
77        decode_attempts: List[Tuple[Tuple, decode.FormattedString]] = []
78
79        for entry, fmt in format_string_entries:
80            result = fmt.format(encoded_message[ENCODED_TOKEN.size:],
81                                show_errors)
82
83            # Sort competing entries so the most likely matches appear first.
84            # Decoded strings are prioritized by whether they
85            #
86            #   1. decoded all bytes for all arguments without errors,
87            #   2. decoded all data,
88            #   3. have the fewest decoding errors,
89            #   4. decoded the most arguments successfully, or
90            #   5. have the most recent removal date, if they were removed.
91            #
92            # This must match the collision resolution logic in detokenize.cc.
93            score: Tuple = (
94                all(arg.ok() for arg in result.args) and not result.remaining,
95                not result.remaining,  # decoded all data
96                -sum(not arg.ok() for arg in result.args),  # fewest errors
97                len(result.args),  # decoded the most arguments
98                entry.date_removed or datetime.max)  # most recently present
99
100            decode_attempts.append((score, result))
101
102        # Sort the attempts by the score so the most likely results are first.
103        decode_attempts.sort(key=lambda value: value[0], reverse=True)
104
        # Split out the successful decodes from the failures.
        for score, result in decode_attempts:
            if score[0]:
                self.successes.append(result)
            else:
                self.failures.append(result)

    def ok(self) -> bool:
        """True if exactly one string decoded the arguments successfully."""
        return len(self.successes) == 1

    def matches(self) -> List[decode.FormattedString]:
        """Returns the strings that matched the token, best matches first."""
        return self.successes + self.failures

    def best_result(self) -> Optional[decode.FormattedString]:
        """Returns the string and args for the most likely decoded string."""
        for string_and_args in self.matches():
            return string_and_args

        return None

    def error_message(self) -> str:
        """If detokenization failed, returns a descriptive message."""
        if self.ok():
            return ''

        if not self.matches():
            if self.token is None:
                return 'missing token'

            return 'unknown token {:08x}'.format(self.token)

        if len(self.matches()) == 1:
            return 'decoding failed for {!r}'.format(self.matches()[0].value)

        return '{} matches'.format(len(self.matches()))

    def __str__(self) -> str:
        """Returns the string for the most likely result."""
        result = self.best_result()
        if result:
            return result[0]

        if self._show_errors:
            return '<[ERROR: {}|{!r}]>'.format(self.error_message(),
                                               self.encoded_message)

        # Display the string as prefixed Base64 if it cannot be decoded.
        return encode.prefixed_base64(self.encoded_message)

    def __repr__(self) -> str:
        if self.ok():
            message = repr(str(self))
        else:
            message = 'ERROR: {}|{!r}'.format(self.error_message(),
                                              self.encoded_message)

        return '{}({})'.format(type(self).__name__, message)

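# A hypothetical sketch of inspecting a DetokenizedString when a token has
# collisions (detok and encoded_message are placeholders):
#
#   result = detok.detokenize(encoded_message)
#   if result.ok():  # exactly one format string decoded successfully
#       print(result)
#   else:
#       print(result.error_message())
#       for match in result.matches():  # best matches first
#           print(match.value)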

class _TokenizedFormatString(NamedTuple):
    entry: tokens.TokenizedStringEntry
    format: decode.FormatString


class Detokenizer:
    """Main detokenization class; detokenizes strings and caches results."""
    def __init__(self, *token_database_or_elf, show_errors: bool = False):
        """Decodes and detokenizes binary messages.

        Args:
          *token_database_or_elf: a path or file object for an ELF or CSV
              database, a tokens.Database, or an elf_reader.Elf
          show_errors: if True, an error message is used in place of the %
              conversion specifier when an argument fails to decode
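
        Example (an illustrative sketch; the paths are placeholders, and any
        mix of paths, file objects, tokens.Database, or elf_reader.Elf
        objects may be passed):

          detok = Detokenizer('build/app.elf', 'extra_tokens.csv',
                              show_errors=True)
          print(detok.detokenize(encoded_message))  # bytes from a device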
        """
        self.database = database.load_token_database(*token_database_or_elf)
        self.show_errors = show_errors

        # Cache FormatStrings for faster lookup & formatting.
        self._cache: Dict[int, List[_TokenizedFormatString]] = {}

    def lookup(self, token: int) -> List[_TokenizedFormatString]:
        """Returns (TokenizedStringEntry, FormatString) list for matches."""
        try:
            return self._cache[token]
        except KeyError:
            format_strings = [
                _TokenizedFormatString(entry, decode.FormatString(str(entry)))
                for entry in self.database.token_to_entries[token]
            ]
            self._cache[token] = format_strings
            return format_strings

    def detokenize(self, encoded_message: bytes) -> DetokenizedString:
        """Decodes and detokenizes a message as a DetokenizedString."""
        if len(encoded_message) < ENCODED_TOKEN.size:
            return DetokenizedString(None, (), encoded_message,
                                     self.show_errors)

        token, = ENCODED_TOKEN.unpack_from(encoded_message)
        return DetokenizedString(token, self.lookup(token), encoded_message,
                                 self.show_errors)


class AutoUpdatingDetokenizer:
    """Loads and updates a detokenizer from database paths."""
    class _DatabasePath:
        """Tracks the modified time of a path or file object."""
        def __init__(self, path):
            self.path = path if isinstance(path, (str, Path)) else path.name
            self._modified_time: Optional[float] = self._last_modified_time()

        def updated(self) -> bool:
            """True if the path has been updated since the last call."""
            modified_time = self._last_modified_time()
            if modified_time is None or modified_time == self._modified_time:
                return False

            self._modified_time = modified_time
            return True

        def _last_modified_time(self) -> Optional[float]:
            try:
                return os.path.getmtime(self.path)
            except FileNotFoundError:
                return None

        def load(self) -> tokens.Database:
            try:
                return database.load_token_database(self.path)
            except FileNotFoundError:
                return database.load_token_database()

    def __init__(self,
                 *paths_or_files,
                 min_poll_period_s: float = 1.0) -> None:
        self.paths = tuple(self._DatabasePath(path) for path in paths_or_files)
        self.min_poll_period_s = min_poll_period_s
        self._last_checked_time: float = time.time()
        self._detokenizer = Detokenizer(*(path.load() for path in self.paths))

    def detokenize(self, data: bytes) -> DetokenizedString:
        """Updates the token database if it has changed, then detokenizes."""
        if time.time() - self._last_checked_time >= self.min_poll_period_s:
            self._last_checked_time = time.time()

            if any(path.updated() for path in self.paths):
                _LOG.info('Changes detected; reloading token database')
                self._detokenizer = Detokenizer(*(path.load()
                                                  for path in self.paths))

        return self._detokenizer.detokenize(data)

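# A hypothetical usage sketch for AutoUpdatingDetokenizer (the paths and the
# poll period are placeholders): the database files are re-checked at most
# once per min_poll_period_s seconds and reloaded if their modification times
# change.
#
#   detok = AutoUpdatingDetokenizer('tokens.csv', 'app.elf',
#                                   min_poll_period_s=5.0)
#   detok.detokenize(encoded_message)  # reloads the databases if they changed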

_Detokenizer = Union[Detokenizer, AutoUpdatingDetokenizer]


class PrefixedMessageDecoder:
    """Parses messages that start with a prefix character from a byte stream."""
    def __init__(self, prefix: Union[str, bytes], chars: Union[str, bytes]):
        """Parses prefixed messages.

        Args:
          prefix: one character that signifies the start of a message
          chars: characters allowed in a message
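
        Example (a hypothetical sketch using the same prefix and character
        set as the Base64 helpers below; sys.stdin.buffer stands in for any
        binary file object):

          decoder = PrefixedMessageDecoder(
              '$', string.ascii_letters + string.digits + '+/-_=')
          for is_message, chunk in decoder.read_messages(sys.stdin.buffer):
              ...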
        """
        self._prefix = prefix.encode() if isinstance(prefix, str) else prefix

        if isinstance(chars, str):
            chars = chars.encode()

        # Store the valid message bytes as a set of binary strings.
        self._message_bytes = frozenset(chars[i:i + 1]
                                        for i in range(len(chars)))

        if len(self._prefix) != 1 or self._prefix in self._message_bytes:
            raise ValueError(
                'Invalid prefix {!r}: the prefix must be a single '
                'character that is not a valid message character.'.format(
                    prefix))

        self.data = bytearray()

    def _read_next(self, fd: BinaryIO) -> Tuple[bytes, int]:
        """Returns the next character and its index."""
        char = fd.read(1)
        index = len(self.data)
        self.data += char
        return char, index

    def read_messages(self,
                      binary_fd: BinaryIO) -> Iterator[Tuple[bool, bytes]]:
        """Parses prefixed messages; yields (is_message, contents) chunks."""
        message_start = None

        while True:
            # This reads the file character-by-character. Non-message characters
            # are yielded right away; message characters are grouped.
            char, index = self._read_next(binary_fd)

            # If in a message, keep reading until the message completes.
            if message_start is not None:
                if char in self._message_bytes:
                    continue

                yield True, self.data[message_start:index]
                message_start = None

            # Handle a non-message character.
            if not char:
                return

            if char == self._prefix:
                message_start = index
            else:
                yield False, char

    def transform(self, binary_fd: BinaryIO,
                  transform: Callable[[bytes], bytes]) -> Iterator[bytes]:
        """Yields the file with a transformation applied to the messages."""
        for is_message, chunk in self.read_messages(binary_fd):
            yield transform(chunk) if is_message else chunk


def _detokenize_prefixed_base64(
        detokenizer: _Detokenizer, prefix: bytes,
        recursion: int) -> Callable[[Match[bytes]], bytes]:
    """Returns a function that decodes prefixed Base64 with the detokenizer."""
    def decode_and_detokenize(match: Match[bytes]) -> bytes:
        """Decodes prefixed base64 with the provided detokenizer."""
        original = match.group(0)

        try:
            detokenized_string = detokenizer.detokenize(
                base64.b64decode(original[1:], validate=True))
            if detokenized_string.matches():
                result = str(detokenized_string).encode()

                if recursion > 0 and original != result:
                    result = detokenize_base64(detokenizer, result, prefix,
                                               recursion - 1)

                return result
        except binascii.Error:
            pass

        return original

    return decode_and_detokenize


BASE64_PREFIX = encode.BASE64_PREFIX.encode()
DEFAULT_RECURSION = 9


def _base64_message_regex(prefix: bytes) -> Pattern[bytes]:
    """Returns a regular expression for prefixed base64 tokenized strings."""
    return re.compile(
        # Base64 tokenized strings start with the prefix character ($)
        re.escape(prefix) + (
            # Tokenized strings contain 0 or more blocks of four Base64 chars.
            br'(?:[A-Za-z0-9+/\-_]{4})*'
            # The last block of 4 chars may have one or two padding chars (=).
            br'(?:[A-Za-z0-9+/\-_]{3}=|[A-Za-z0-9+/\-_]{2}==)?'))


def detokenize_base64_live(detokenizer: _Detokenizer,
                           input_file: BinaryIO,
                           output: BinaryIO,
                           prefix: Union[str, bytes] = BASE64_PREFIX,
                           recursion: int = DEFAULT_RECURSION) -> None:
    """Reads chars one-at-a-time and decodes messages; SLOW for big files."""
    prefix_bytes = prefix.encode() if isinstance(prefix, str) else prefix

    base64_message = _base64_message_regex(prefix_bytes)

    def transform(data: bytes) -> bytes:
        return base64_message.sub(
            _detokenize_prefixed_base64(detokenizer, prefix_bytes, recursion),
            data)

    for message in PrefixedMessageDecoder(
            prefix, string.ascii_letters + string.digits + '+/-_=').transform(
                input_file, transform):
        output.write(message)

        # Flush each line to prevent delays when piping between processes.
        if b'\n' in message:
            output.flush()


def detokenize_base64_to_file(detokenizer: _Detokenizer,
                              data: bytes,
                              output: BinaryIO,
                              prefix: Union[str, bytes] = BASE64_PREFIX,
                              recursion: int = DEFAULT_RECURSION) -> None:
    """Decodes prefixed Base64 messages in data and writes them to output."""
    prefix = prefix.encode() if isinstance(prefix, str) else prefix
    output.write(
        _base64_message_regex(prefix).sub(
            _detokenize_prefixed_base64(detokenizer, prefix, recursion), data))


def detokenize_base64(detokenizer: _Detokenizer,
                      data: bytes,
                      prefix: Union[str, bytes] = BASE64_PREFIX,
                      recursion: int = DEFAULT_RECURSION) -> bytes:
    """Decodes and replaces prefixed Base64 messages in the provided data.

    Args:
      detokenizer: the detokenizer with which to decode messages
      data: the binary data to decode
      prefix: one-character byte string that signals the start of a message
      recursion: how many levels to recursively decode

    Returns:
      copy of the data with all recognized tokens decoded
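
    Example (an illustrative sketch; the database path is a placeholder and
    the prefixed Base64 token shown is not a real database entry):

      detok = Detokenizer('path/to/tokens.csv')
      print(detokenize_base64(detok, b'Status: $EjRWeA=='))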
    """
    output = io.BytesIO()
    detokenize_base64_to_file(detokenizer, data, output, prefix, recursion)
    return output.getvalue()


def _follow_and_detokenize_file(detokenizer: _Detokenizer,
                                file: BinaryIO,
                                output: BinaryIO,
                                prefix: Union[str, bytes],
                                poll_period_s: float = 0.01) -> None:
    """Polls a file to detokenize it and any appended data."""

    try:
        while True:
            data = file.read()
            if data:
                detokenize_base64_to_file(detokenizer, data, output, prefix)
                output.flush()
            else:
                time.sleep(poll_period_s)
    except KeyboardInterrupt:
        pass


def _handle_base64(databases, input_file: BinaryIO, output: BinaryIO,
                   prefix: str, show_errors: bool, follow: bool) -> None:
    """Handles the base64 command line option."""
    # argparse.FileType doesn't correctly handle - for binary files.
    if input_file is sys.stdin:
        input_file = sys.stdin.buffer

    if output is sys.stdout:
        output = sys.stdout.buffer

    detokenizer = Detokenizer(tokens.Database.merged(*databases),
                              show_errors=show_errors)

    if follow:
        _follow_and_detokenize_file(detokenizer, input_file, output, prefix)
    elif input_file.seekable():
        # Process seekable files all at once, which is MUCH faster.
        detokenize_base64_to_file(detokenizer, input_file.read(), output,
                                  prefix)
    else:
        # For non-seekable inputs (e.g. pipes), read one character at a time.
        detokenize_base64_live(detokenizer, input_file, output, prefix)


def _parse_args() -> argparse.Namespace:
    """Parses and returns command line arguments."""

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.set_defaults(handler=lambda **_: parser.print_help())

    subparsers = parser.add_subparsers(help='Encoding of the input.')

    base64_help = 'Detokenize Base64-encoded data from a file or stdin.'
    subparser = subparsers.add_parser(
        'base64',
        description=base64_help,
        parents=[database.token_databases_parser()],
        help=base64_help)
    subparser.set_defaults(handler=_handle_base64)
    subparser.add_argument(
        '-i',
        '--input',
        dest='input_file',
        type=argparse.FileType('rb'),
        default=sys.stdin.buffer,
        help='The file from which to read; provide - or omit for stdin.')
    subparser.add_argument(
        '-f',
        '--follow',
        action='store_true',
        help=('Detokenize data appended to input_file as it grows; similar to '
              'tail -f.'))
    subparser.add_argument('-o',
                           '--output',
                           type=argparse.FileType('wb'),
                           default=sys.stdout.buffer,
                           help=('The file to which to write the output; '
                                 'provide - or omit for stdout.'))
    subparser.add_argument(
        '-p',
        '--prefix',
        default=BASE64_PREFIX,
        help=('The one-character prefix that signals the start of a '
              'Base64-encoded message. (default: $)'))
    subparser.add_argument(
        '-s',
        '--show_errors',
        action='store_true',
        help=('Show error messages instead of conversion specifiers when '
              'arguments cannot be decoded.'))

    return parser.parse_args()


def main() -> int:
    args = _parse_args()

    handler = args.handler
    del args.handler

    handler(**vars(args))
    return 0


if __name__ == '__main__':
    if sys.version_info[0] < 3:
        sys.exit('ERROR: The detokenizer command line tools require Python 3.')
    sys.exit(main())