# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Parses the arguments in a Base64-encoded tokenized message.

This is useful for attempting to decode tokenized messages with arguments for
which the token is not recognized.
"""

import argparse
import base64
import collections
from dataclasses import dataclass
import logging
import sys
from typing import Collection, Iterable, Iterator, Sequence

import pw_cli.log
from pw_tokenizer.decode import FormatString, FormattedString

_LOG: logging.Logger = logging.getLogger('pw_tokenizer')

DEFAULT_FORMAT_SPECS = (
    '%s',
    '%d',
    '%f',
)

DEFAULT_MAX_ARGS = 8
PREFIX = '$'


def attempt_to_decode(
    arg_data: bytes,
    format_specs: Collection[str] = DEFAULT_FORMAT_SPECS,
    max_args: int = DEFAULT_MAX_ARGS,
    yield_failures: bool = False,
) -> Iterator[FormattedString]:
    """Attempts to decode arguments using the provided format specifiers.

    Candidate format strings are built by concatenating entries of
    format_specs and are tried in breadth-first order: first the empty format
    string, then every 1-argument string, then every 2-argument string, and so
    on up to max_args arguments.

    Args:
      arg_data: the encoded argument bytes (the message without its token)
      format_specs: printf-style specifiers to combine into candidates
      max_args: maximum number of specifiers in a candidate format string
      yield_failures: if True, yield every attempt instead of only those
          whose ok() method returns a true value

    Yields:
      The result of FormatString(candidate).format(arg_data) for each
      candidate that passes the filter above.
    """
    # FIFO queue of (argument count, format string). A deque keeps each pop
    # O(1); the queue grows exponentially with the number of arguments.
    format_strings = collections.deque([(0, '')])

    # Each argument requires at least 1 byte.
    max_args = min(max_args, len(arg_data))

    while format_strings:
        arg_count, string = format_strings.popleft()
        decode_attempt = FormatString(string).format(arg_data)

        if yield_failures or decode_attempt.ok():
            yield decode_attempt

        if arg_count < max_args:
            format_strings.extend(
                (arg_count + 1, string + spec) for spec in format_specs
            )


@dataclass(frozen=True)
class TokenizedMessage:
    """A prefixed Base64 tokenized message and its decoded bytes."""

    # The message text as passed to parse(), including the prefix.
    string: str
    # The Base64-decoded payload: a 4-byte token followed by argument data.
    binary: bytes

    @property
    def token(self) -> int:
        """The first four bytes of the payload as a little-endian integer."""
        return int.from_bytes(self.binary[:4], 'little')

    @property
    def binary_args(self) -> bytes:
        """Everything in the payload after the 4-byte token."""
        return self.binary[4:]

    @classmethod
    def parse(cls, message: str, prefix: str = PREFIX) -> 'TokenizedMessage':
        """Decodes a prefixed Base64 message into a TokenizedMessage.

        Args:
          message: the message text, which must start with prefix
          prefix: the marker that precedes the Base64 data

        Raises:
          ValueError: the message does not start with prefix or decodes to
              fewer than 4 bytes. Errors raised by base64.b64decode
              (binascii.Error, a ValueError subclass) propagate unchanged.
        """
        if not message.startswith(prefix):
            raise ValueError(
                f'{message} does not start with {prefix!r} as expected'
            )

        # Strip the whole prefix, which is not necessarily one character.
        binary = base64.b64decode(message[len(prefix):])

        if len(binary) < 4:
            raise ValueError(
                f'{message} is only {len(binary)} bytes; '
                'tokenized messages must be at least 4 bytes'
            )

        return cls(message, binary)


def _read_stdin() -> Iterator[str]:
    """Yields lines from stdin until end-of-input or Ctrl-C."""
    try:
        while True:
            yield input()
    except (EOFError, KeyboardInterrupt):
        # input() raises EOFError when stdin is exhausted (e.g. piped input);
        # treat that, like an interrupt, as a normal end of the stream.
        return


def _text_list(items: Sequence, conjunction: str = 'or') -> str:
    """Joins items into text such as 'a', 'a or b', or 'a, b or c'.

    Returns an empty string when items is empty.
    """
    if not items:
        return ''

    if len(items) == 1:
        return str(items[0])

    return f'{", ".join(str(i) for i in items[:-1])} {conjunction} {items[-1]}'


def main(
    messages: Iterable[str],
    max_args: int,
    specs: Sequence[str],
    show_failures: bool,
) -> int:
    """Parses the arguments for a series of tokenized messages.

    Args:
      messages: Base64 tokenized messages, with or without the prefix; if
          empty, messages are read from stdin instead
      max_args: maximum number of arguments to attempt to decode
      specs: printf-style format specifiers to try
      show_failures: whether to also list attempts that failed to decode

    Returns:
      0 if every message had a successful decoding; otherwise 1 or 2,
      depending on whether the most recent problem was a message with no
      successful decoding (1) or a message that could not be parsed (2).
    """
    exit_code = 0
    spec_list = _text_list(specs)

    for message in iter(messages) if messages else _read_stdin():
        if not message:
            continue

        if not message.startswith(PREFIX):
            message = PREFIX + message

        _LOG.info('Decoding arguments for %r', message)
        try:
            parsed = TokenizedMessage.parse(message, PREFIX)
        except ValueError as exc:
            _LOG.error('%s', exc)
            exit_code = 2
            continue

        _LOG.info(
            'Binary: %r [%s] (%d bytes)',
            parsed.binary,
            parsed.binary.hex(' ', 1),
            len(parsed.binary),
        )
        _LOG.info('Token: 0x%08x', parsed.token)
        _LOG.info(
            'Args: %r [%s] (%d bytes)',
            parsed.binary_args,
            parsed.binary_args.hex(' ', 1),
            len(parsed.binary_args),
        )
        _LOG.info('Decoding with up to %d %s arguments', max_args, spec_list)

        # Highest-scoring attempts first.
        results = sorted(
            attempt_to_decode(
                parsed.binary_args, specs, max_args, show_failures
            ),
            key=FormattedString.score,
            reverse=True,
        )

        if not any(result.ok() for result in results):
            _LOG.warning(
                ' No combinations of up to %d %s arguments decoded '
                'successfully',
                max_args,
                spec_list,
            )
            exit_code = 1

        # Pad the attempt number so the columns line up for any result count.
        width = len(str(len(results)))
        for i, result in enumerate(results, 1):
            _LOG.info(  # pylint: disable=logging-fstring-interpolation
                f' Attempt %{width}d: [%s] %s',
                i,
                ' '.join(str(a.specifier) for a in result.args),
                ' '.join(str(a) for a in result.args),
            )
        print()

    return exit_code


def _parse_args() -> argparse.Namespace:
    """Parses command line arguments; the names match main()'s parameters."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        '--max-args',
        default=DEFAULT_MAX_ARGS,
        type=int,
        help='Maximum number of printf-style arguments',
    )
    parser.add_argument(
        '--specs',
        nargs='*',
        default=DEFAULT_FORMAT_SPECS,
        help='Which printf-style format specifiers to check',
    )
    parser.add_argument(
        '--show-failures',
        action='store_true',
        help='Show argument combinations that fail to decode',
    )
    parser.add_argument(
        'messages',
        nargs='*',
        help=(
            'Base64-encoded tokenized messages to decode; omit to read from '
            'stdin'
        ),
    )
    return parser.parse_args()


if __name__ == '__main__':
    pw_cli.log.install()
    sys.exit(main(**vars(_parse_args())))