• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Decodes arguments and formats tokenized messages.

The decode(format_string, encoded_arguments) function provides a simple way to
format a string with encoded arguments. The FormatString class may also be used.

Missing, truncated, or otherwise corrupted arguments are handled and displayed
in the resulting string with an error message.
"""
22
from datetime import datetime
import re
import struct
from typing import (Iterable, List, Match, NamedTuple, Optional, Sequence,
                    Tuple)
27
28
def zigzag_decode(value: int) -> int:
    """ZigZag decode function from protobuf's wire_format module.

    Even encoded values map to non-negative results and odd encoded values map
    to negative results: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, and so on.

    Args:
      value: the ZigZag-encoded integer

    Returns:
      the decoded signed integer
    """
    half = value >> 1

    # The low bit is the sign flag. For odd inputs the result is the bitwise
    # complement of the remaining bits, i.e. -(half + 1).
    return ~half if value & 0x1 else half
34
35
class FormatSpec:
    """Represents a format specifier parsed from a printf-style string.

    Instances expose these attributes:

      match: the re.Match for FORMAT_SPEC that this spec was built from
      specifier: the full matched text (e.g. "%-5.2f" or "%%")
      flags: the flags, width, and precision text; '' if there are none
      length: the length modifier (e.g. "ll"); '' if there is none
      type: the conversion type character, or '%' for a literal "%%"
      compatible: an equivalent format string for Python's % operator
    """

    # Regular expression for finding format specifiers.
    FORMAT_SPEC = re.compile(r'%(?:(?P<flags>[+\- #0]*\d*(?:\.\d+)?)'
                             r'(?P<length>hh|h|ll|l|j|z|t|L)?'
                             r'(?P<type>[csdioxXufFeEaAgGnp])|%)')

    # Conversions to make format strings Python compatible. Length modifiers
    # are never copied into the Python-compatible spec (see __init__), so
    # _UNSUPPORTED_LENGTH is informational only.
    _UNSUPPORTED_LENGTH = frozenset(['hh', 'll', 'j', 'z', 't'])
    _REMAP_TYPE = {'a': 'f', 'A': 'F'}

    # Conversion specifiers by type; n is not supported.
    _SIGNED_INT = frozenset('di')
    _UNSIGNED_INT = frozenset('oxXup')
    _FLOATING_POINT = frozenset('fFeEaAgG')

    # Floats are encoded as 4-byte little-endian values.
    _PACKED_FLOAT = struct.Struct('<f')

    @classmethod
    def from_string(cls, format_specifier: str) -> 'FormatSpec':
        """Creates a FormatSpec from a str with a single format specifier.

        Args:
          format_specifier: text that is exactly one specifier (e.g. "%02d")

        Raises:
          ValueError: the text is not exactly one valid format specifier
        """
        match = cls.FORMAT_SPEC.fullmatch(format_specifier)

        if not match:
            raise ValueError(
                '{!r} is not a valid single format specifier'.format(
                    format_specifier))

        return cls(match)

    def __init__(self, re_match: Match):
        """Constructs a FormatSpec from an re.Match object for FORMAT_SPEC."""
        self.match = re_match
        self.specifier: str = self.match.group()

        self.flags: str = self.match.group('flags') or ''
        self.length: str = self.match.group('length') or ''

        # If there is no type, the format spec is %%.
        self.type: str = self.match.group('type') or '%'

        # %p prints as 0xFEEDBEEF; other specs may need length/type switched
        if self.type == 'p':
            self.compatible = '0x%08X'
        else:
            # The length modifier is always omitted: Python's % operator
            # ignores the ones it accepts (h, l, L) and rejects the rest.
            self.compatible = ''.join(
                ('%', self.flags, self._REMAP_TYPE.get(self.type, self.type)))

    def decode(self, encoded_arg: bytes) -> 'DecodedArg':
        """Decodes the provided data according to this format specifier.

        Args:
          encoded_arg: encoded data that starts with this argument; trailing
              bytes that belong to later arguments are permitted

        Returns:
          a DecodedArg; its raw_data holds the bytes this argument consumed
        """
        if self.type == '%':  # literal %
            # Use () as the value for % formatting.
            return DecodedArg(self, (), b'')

        if self.type == 's':  # string
            return self._decode_string(encoded_arg)

        if self.type == 'c':  # character
            return self._decode_char(encoded_arg)

        if self.type in self._SIGNED_INT:
            return self._decode_signed_integer(encoded_arg)

        if self.type in self._UNSIGNED_INT:
            return self._decode_unsigned_integer(encoded_arg)

        if self.type in self._FLOATING_POINT:
            return self._decode_float(encoded_arg)

        # Unsupported specifier (e.g. %n)
        return DecodedArg(
            self, None, b'', DecodedArg.DECODE_ERROR,
            'Unsupported conversion specifier "{}"'.format(self.type))

    def _decode_signed_integer(self, encoded: bytes) -> 'DecodedArg':
        """Decodes a signed variable-length integer.

        Each byte contributes its low 7 bits, least-significant group first;
        a set high bit means another byte follows. The assembled value is then
        ZigZag decoded.
        """
        if not encoded:
            return DecodedArg.missing(self)

        count = 0
        result = 0
        shift = 0

        for byte in encoded:
            count += 1
            result |= (byte & 0x7f) << shift

            if not byte & 0x80:
                return DecodedArg(self, zigzag_decode(result), encoded[:count])

            # Stop once 64 bits' worth of 7-bit groups have been read without
            # finding a terminating byte.
            shift += 7
            if shift >= 64:
                break

        return DecodedArg(self, None, encoded[:count], DecodedArg.DECODE_ERROR,
                          'Unterminated variable-length integer')

    def _decode_unsigned_integer(self, encoded: bytes) -> 'DecodedArg':
        """Decodes a variable-length integer and masks it to size_bits()."""
        arg = self._decode_signed_integer(encoded)

        # Since ZigZag encoding is used, unsigned integers must be masked off to
        # their original bit length.
        if arg.value is not None:
            arg.value &= (1 << self.size_bits()) - 1

        return arg

    def _decode_float(self, encoded: bytes) -> 'DecodedArg':
        """Decodes a 4-byte float; reports MISSING if under 4 bytes remain."""
        if len(encoded) < 4:
            return DecodedArg.missing(self)

        return DecodedArg(self,
                          self._PACKED_FLOAT.unpack_from(encoded)[0],
                          encoded[:4])

    def _decode_string(self, encoded: bytes) -> 'DecodedArg':
        """Reads a unicode string from the encoded data.

        The first byte holds the string size in its low 7 bits; its high bit
        flags that the string was truncated during encoding.
        """
        if not encoded:
            return DecodedArg.missing(self)

        size_and_status = encoded[0]
        status = DecodedArg.OK

        if size_and_status & 0x80:
            status |= DecodedArg.TRUNCATED
            size_and_status &= 0x7f

        raw_data = encoded[0:size_and_status + 1]
        data = raw_data[1:]

        # Fewer bytes are available than the size byte promised.
        if len(data) < size_and_status:
            status |= DecodedArg.DECODE_ERROR

        try:
            decoded = data.decode()
        except UnicodeDecodeError as err:
            # Show the undecodable bytes as an escaped literal without the
            # leading b.
            return DecodedArg(self,
                              repr(bytes(data)).lstrip('b'), raw_data,
                              status | DecodedArg.DECODE_ERROR, err)

        return DecodedArg(self, decoded, raw_data, status)

    def _decode_char(self, encoded: bytes) -> 'DecodedArg':
        """Reads an integer from the data, then converts it to a string."""
        arg = self._decode_signed_integer(encoded)

        if arg.ok():
            try:
                arg.value = chr(arg.value)
            except (OverflowError, ValueError) as err:
                # The integer is not a valid code point; keep the integer
                # value and flag the argument as an error.
                arg.error = err
                arg.status |= DecodedArg.DECODE_ERROR

        return arg

    def size_bits(self) -> int:
        """Size of the argument in bits; 0 for strings."""
        if self.type == 's':
            return 0

        # TODO(hepler): 64-bit targets likely have 64-bit l, j, z, and t.
        return 64 if self.length in ['ll', 'j'] else 32

    def __str__(self) -> str:
        return self.specifier
205
206
class DecodedArg:
    """Represents a decoded argument that is ready to be formatted."""

    # Status flags for a decoded argument. These values should match the
    # DecodingStatus enum in pw_tokenizer/internal/decode.h.
    OK = 0  # decoding was successful
    MISSING = 1  # the argument was not present in the data
    TRUNCATED = 2  # the argument was truncated during encoding
    DECODE_ERROR = 4  # an error occurred while decoding the argument
    SKIPPED = 8  # argument was skipped due to a previous error

    @classmethod
    def missing(cls, specifier: 'FormatSpec') -> 'DecodedArg':
        """Creates a MISSING argument with no value and no consumed bytes."""
        return cls(specifier, None, b'', cls.MISSING)

    def __init__(self,
                 specifier: 'FormatSpec',
                 value,
                 raw_data: bytes,
                 status: int = OK,
                 error=None):
        """Stores the outcome of decoding one argument.

        Args:
          specifier: FormatSpec (e.g. to represent "%0.2f")
          value: the decoded value, or None if decoding failed
          raw_data: the exact bytes used to decode this arg
          status: bitwise OR of the status flags defined on this class
          error: exception or message describing a failure, if any
        """
        self.specifier = specifier
        self.value = value
        self.raw_data = bytes(raw_data)  # copy into an immutable bytes object
        self._status = status
        self.error = error

    def ok(self) -> bool:
        """The argument was decoded without errors."""
        return self.status in (self.OK, self.TRUNCATED)

    @property
    def status(self) -> int:
        return self._status

    @status.setter
    def status(self, status: int):
        # The %% specifier is always OK and should always be printed normally.
        self._status = status if self.specifier.type != '%' else self.OK

    def format(self) -> str:
        """Returns formatted version of this argument, with error handling.

        A successfully decoded argument is formatted with the specifier's
        Python-compatible format string; truncated values get a "[...]"
        suffix. If formatting fails, or decoding did not succeed, a
        "<[spec STATUS]>" marker is returned, with the value appended in
        parentheses when there is one.

        Raises:
          AssertionError: the status is not one this method knows how to report
        """
        if self.ok():
            try:
                if self.status == self.TRUNCATED:
                    # Inside the try so that a truncated value that cannot
                    # take the suffix is reported as an error, not raised.
                    return self.specifier.compatible % (self.value + '[...]')

                return self.specifier.compatible % self.value
            except (OverflowError, TypeError, ValueError) as err:
                self.status |= self.DECODE_ERROR
                self.error = err

        if self.status & self.SKIPPED:
            message = '{} SKIPPED'.format(self.specifier)
        elif self.status == self.MISSING:
            message = '{} MISSING'.format(self.specifier)
        elif self.status & self.DECODE_ERROR:
            message = '{} ERROR'.format(self.specifier)
        else:
            raise AssertionError('Unhandled DecodedArg status {:x}!'.format(
                self.status))

        # Omit the parenthesized value when there is nothing to show.
        if self.value is None or not str(self.value):
            return '<[{}]>'.format(message)

        return '<[{} ({})]>'.format(message, self.value)

    def __str__(self) -> str:
        return self.format()

    def __repr__(self) -> str:
        return f'DecodedArg({self})'
280
281
def parse_format_specifiers(format_string: str) -> Iterable['FormatSpec']:
    """Yields a FormatSpec for each format specifier found in the string.

    Specifiers are produced lazily, in the order they appear. Literal "%%"
    sequences are included, since FormatSpec.FORMAT_SPEC matches them too.

    Args:
      format_string: the printf-style format string to scan
    """
    for spec in FormatSpec.FORMAT_SPEC.finditer(format_string):
        yield FormatSpec(spec)
285
286
class FormattedString(NamedTuple):
    """A formatted string with its decoded arguments and leftover data."""
    value: str  # the formatted string
    args: Sequence['DecodedArg']  # the decoded arguments, in order
    remaining: bytes  # encoded bytes not consumed by any argument

    def ok(self) -> bool:
        """Arg data decoded successfully and all expected args were found."""
        return all(arg.ok() for arg in self.args) and not self.remaining

    def score(self, date_removed: Optional[datetime] = None) -> tuple:
        """Returns a key for sorting by how successful a decode was.

        Decoded strings are sorted by whether they

          1. decoded all bytes for all arguments without errors,
          2. decoded all data,
          3. have the fewest decoding errors,
          4. decoded the most arguments successfully, or
          5. have the most recent removal date, if they were removed.

        This must match the collision resolution logic in detokenize.cc.

        To format a list of FormattedStrings from most to least successful,
        use sort(key=FormattedString.score, reverse=True).

        Args:
          date_removed: when the string was removed, or None if it has not
              been removed; None sorts as the most recent possible date
        """
        return (
            self.ok(),  # decoded all data and all expected args were found
            not self.remaining,  # decoded all data
            -sum(not arg.ok() for arg in self.args),  # fewest errors
            len(self.args),  # decoded the most arguments
            date_removed or datetime.max)  # most recently present
318
319
class FormatString:
    """Represents a printf-style format string."""
    def __init__(self, format_string: str):
        """Parses format specifiers in the format string."""
        self.format_string = format_string
        self.specifiers = tuple(parse_format_specifiers(self.format_string))

        # List of non-specifier string pieces with room for formatted arguments.
        self._segments = self._parse_string_segments()

    def _parse_string_segments(self) -> List:
        """Splits the format string by format specifiers.

        Returns:
          a list alternating between literal text pieces (even indices) and
          None placeholders (odd indices), one placeholder per specifier; with
          no specifiers, the list holds just the whole format string
        """
        segments: List = []
        position = 0

        for spec in self.specifiers:
            start, end = spec.match.span()
            segments.append(self.format_string[position:start])
            segments.append(None)  # spot for the formatted argument
            position = end

        # Text after the last specifier (or the whole string if there are none).
        segments.append(self.format_string[position:])
        return segments

    def decode(self,
               encoded: bytes) -> Tuple[Sequence['DecodedArg'], bytes]:
        """Decodes arguments according to the format string.

        Args:
          encoded: bytes; the encoded arguments

        Returns:
          tuple with the decoded arguments and any unparsed data
        """
        decoded_args = []

        fatal_error = False
        index = 0

        for spec in self.specifiers:
            arg = spec.decode(encoded[index:])

            if fatal_error:
                # After an error is encountered, continue to attempt to parse
                # arguments, but mark them all as SKIPPED. If an error occurs,
                # it's impossible to know if subsequent arguments are valid.
                arg.status |= DecodedArg.SKIPPED
            elif not arg.ok():
                fatal_error = True

            decoded_args.append(arg)
            index += len(arg.raw_data)

        return tuple(decoded_args), encoded[index:]

    def format(self,
               encoded_args: bytes,
               show_errors: bool = False) -> 'FormattedString':
        """Decodes arguments and formats the string with them.

        Args:
          encoded_args: the arguments to decode and format the string with
          show_errors: if True, an error message is used in place of the %
              conversion specifier when an argument fails to decode

        Returns:
          tuple with the formatted string, decoded arguments, and remaining data
        """
        args, remaining = self.decode(encoded_args)

        # Fill a per-call copy so that formatting never writes results into
        # the shared template held by this object.
        segments = list(self._segments)

        # Insert formatted arguments in place of each format specifier.
        if show_errors:
            segments[1::2] = [arg.format() for arg in args]
        else:
            segments[1::2] = [
                arg.format() if arg.ok() else arg.specifier.specifier
                for arg in args
            ]

        return FormattedString(''.join(segments), args, remaining)
406
407
def decode(format_string: str,
           encoded_arguments: bytes,
           show_errors: bool = False) -> str:
    """Decodes arguments and formats them with the provided format string.

    This is a convenience wrapper that builds a FormatString, formats it with
    the encoded arguments, and returns only the resulting text.

    Args:
      format_string: the printf-style format string
      encoded_arguments: encoded arguments with which to format
          format_string; must exclude the 4-byte string token
      show_errors: if True, an error message is used in place of the %
          conversion specifier when an argument fails to decode

    Returns:
      the printf-style formatted string
    """
    formatted = FormatString(format_string).format(encoded_arguments,
                                                   show_errors)
    return formatted.value
425