• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Parses the arguments in a Base64-encoded tokenized message.
15
16This is useful for attempting to decode tokenized messages with arguments for
17which the token is not recognized.
18"""
19
20import argparse
21import base64
22from dataclasses import dataclass
23import logging
24import sys
25from typing import Collection, Iterable, Iterator, Sequence
26
27import pw_cli.log
28from pw_tokenizer.decode import FormatString, FormattedString
29
30_LOG: logging.Logger = logging.getLogger('pw_tokenizer')
31
32DEFAULT_FORMAT_SPECS = (
33    '%s',
34    '%d',
35    '%f',
36)
37
38DEFAULT_MAX_ARGS = 8
39PREFIX = '$'
40
41
42def attempt_to_decode(
43    arg_data: bytes,
44    format_specs: Collection[str] = DEFAULT_FORMAT_SPECS,
45    max_args: int = DEFAULT_MAX_ARGS,
46    yield_failures: bool = False,
47) -> Iterator[FormattedString]:
48    """Attempts to decode arguments using the provided format specifiers."""
49    format_strings = [(0, '')]  # (argument count, format string)
50
51    # Each argument requires at least 1 byte.
52    max_args = min(max_args, len(arg_data))
53
54    while format_strings:
55        arg_count, string = format_strings.pop(0)
56        decode_attempt = FormatString(string).format(arg_data)
57
58        if yield_failures or decode_attempt.ok():
59            yield decode_attempt
60
61        if arg_count < max_args:
62            format_strings.extend(
63                (arg_count + 1, string + spec) for spec in format_specs
64            )
65
66
67@dataclass(frozen=True)
68class TokenizedMessage:
69    string: str
70    binary: bytes
71
72    @property
73    def token(self) -> int:
74        return int.from_bytes(self.binary[:4], 'little')
75
76    @property
77    def binary_args(self) -> bytes:
78        return self.binary[4:]
79
80    @classmethod
81    def parse(cls, message: str, prefix: str = '$') -> 'TokenizedMessage':
82        if not message.startswith(prefix):
83            raise ValueError(
84                f'{message} does not start with {prefix!r} as expected'
85            )
86
87        binary = base64.b64decode(message[1:])
88
89        if len(binary) < 4:
90            raise ValueError(
91                f'{message} is only {len(binary)} bytes; '
92                'tokenized messages must be at least 4 bytes'
93            )
94
95        return cls(message, binary)
96
97
98def _read_stdin():
99    try:
100        while True:
101            yield input()
102    except KeyboardInterrupt:
103        return
104
105
106def _text_list(items: Sequence, conjunction: str = 'or') -> str:
107    if len(items) == 1:
108        return str(items[0])
109
110    return f'{", ".join(str(i) for i in items[:-1])} {conjunction} {items[-1]}'
111
112
113def main(
114    messages: Iterable[str],
115    max_args: int,
116    specs: Sequence[str],
117    show_failures: bool,
118) -> int:
119    """Parses the arguments for a series of tokenized messages."""
120    exit_code = 0
121
122    for message in iter(messages) if messages else _read_stdin():
123        if not message:
124            continue
125
126        if not message.startswith(PREFIX):
127            message = PREFIX + message
128
129        _LOG.info('Decoding arguments for %r', message)
130        try:
131            parsed = TokenizedMessage.parse(message)
132        except ValueError as exc:
133            _LOG.error('%s', exc)
134            exit_code = 2
135            continue
136
137        _LOG.info(
138            'Binary: %r [%s] (%d bytes)',
139            parsed.binary,
140            parsed.binary.hex(' ', 1),
141            len(parsed.binary),
142        )
143        _LOG.info('Token:  0x%08x', parsed.token)
144        _LOG.info(
145            'Args:   %r [%s] (%d bytes)',
146            parsed.binary_args,
147            parsed.binary_args.hex(' ', 1),
148            len(parsed.binary_args),
149        )
150        _LOG.info(
151            'Decoding with up to %d %s arguments', max_args, _text_list(specs)
152        )
153
154        results = sorted(
155            attempt_to_decode(
156                parsed.binary_args, specs, max_args, show_failures
157            ),
158            key=FormattedString.score,
159            reverse=True,
160        )
161
162        if not any(result.ok() for result in results):
163            _LOG.warning(
164                '  No combinations of up to %d %s arguments decoded '
165                'successfully',
166                max_args,
167                _text_list(specs),
168            )
169            exit_code = 1
170
171        for i, result in enumerate(results, 1):
172            _LOG.info(  # pylint: disable=logging-fstring-interpolation
173                f'  Attempt %{len(str(len(results)))}d: [%s] %s',
174                i,
175                ' '.join(str(a.specifier) for a in result.args),
176                ' '.join(str(a) for a in result.args),
177            )
178        print()
179
180    return exit_code
181
182
183def _parse_args() -> argparse.Namespace:
184    parser = argparse.ArgumentParser(
185        description=__doc__,
186        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
187    )
188    parser.add_argument(
189        '--max-args',
190        default=DEFAULT_MAX_ARGS,
191        type=int,
192        help='Maximum number of printf-style arguments',
193    )
194    parser.add_argument(
195        '--specs',
196        nargs='*',
197        default=DEFAULT_FORMAT_SPECS,
198        help='Which printf-style format specifiers to check',
199    )
200    parser.add_argument(
201        '--show-failures',
202        action='store_true',
203        help='Show argument combintations that fail to decode',
204    )
205    parser.add_argument(
206        'messages',
207        nargs='*',
208        help=(
209            'Base64-encoded tokenized messages to decode; omit to read from '
210            'stdin'
211        ),
212    )
213    return parser.parse_args()
214
215
216if __name__ == '__main__':
217    pw_cli.log.install()
218    sys.exit(main(**vars(_parse_args())))
219