# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Provides functionality for encoding tokenized messages."""

import base64
import struct
from typing import Union

_INT32_MIN = -(2**31)
_INT32_MAX = 2**31 - 1
_UINT32_MAX = 2**32 - 1
BASE64_PREFIX = '$'

# Strings are prefixed with one status byte: the low seven bits hold the
# length, so at most 127 bytes of payload fit.
_MAX_STRING_BYTES = 127
_TRUNCATED_STRING_STATUS = 0xff


def _zig_zag_encode(value: int) -> int:
    """Encodes signed integers to give a compact varint encoding.

    Non-negative values map to even numbers (2 * value) and negative values
    map to odd numbers (-2 * value - 1), so the result is never negative.
    """
    doubled = value << 1
    return doubled if value >= 0 else ~doubled


def _little_endian_base128_encode(integer: int) -> bytearray:
    """Encodes a non-negative integer as a little-endian base-128 varint.

    Each output byte carries seven bits of the value, least significant group
    first. The top bit is set on every byte except the last one.

    Raises:
      ValueError: integer is negative. A negative value never shifts down to
        zero, so encoding it would never terminate.
    """
    if integer < 0:
        raise ValueError(
            f'Cannot varint-encode {integer}: the value must not be negative')

    data = bytearray()

    # Emit continuation bytes while more than seven bits remain.
    while integer > 0x7f:
        data.append((integer & 0x7f) | 0x80)
        integer >>= 7

    # The final byte has its top bit clear to mark the end of the varint.
    data.append(integer)
    return data


def _encode_int32(arg: int) -> bytearray:
    """Encodes a 32-bit integer as a zig-zag varint.

    Values above the signed 32-bit maximum are treated as unsigned and
    reinterpreted as the signed value with the same 32-bit pattern.
    """
    # Convert large unsigned numbers into their corresponding signed values.
    if arg > _INT32_MAX:
        arg -= 2**32

    return _little_endian_base128_encode(_zig_zag_encode(arg))


def _encode_string(arg: bytes) -> bytes:
    """Encodes a string as a status byte followed by up to 127 bytes.

    The status byte is the length for strings shorter than 128 bytes. Longer
    strings are cut to their first 127 bytes and the status byte is 0xff.
    """
    if len(arg) <= _MAX_STRING_BYTES:
        status = len(arg)
    else:
        status = _TRUNCATED_STRING_STATUS

    return struct.pack('B', status) + arg[:_MAX_STRING_BYTES]


def encode_token_and_args(token: int, *args: Union[int, float, bytes,
                                                   str]) -> bytes:
    """Encodes a tokenized message given its token and arguments.

    This function assumes that the token represents a format string with
    conversion specifiers that correspond with the provided argument types.

    Integers are encoded as 32-bit zig-zag varints, floats as little-endian
    32-bit values, and str (UTF-8 encoded) or bytes arguments as
    length-prefixed strings that are truncated to 127 bytes.

    Args:
      token: the message token; must fit in an unsigned 32-bit integer
      *args: the arguments to encode after the token, in order

    Returns:
      The little-endian token followed by the encoded arguments.

    Raises:
      ValueError: the token or an integer argument does not fit in 32 bits,
        or an argument has an unsupported type.
    """

    if not 0 <= token <= _UINT32_MAX:
        raise ValueError(
            f'The token ({token}) must be an unsigned 32-bit integer')

    data = bytearray(struct.pack('<I', token))

    for arg in args:
        if isinstance(arg, int):
            # Accept both the signed and unsigned 32-bit ranges. A plain
            # bit_length() check would let through negative values below
            # -2**31, which do not fit in 32 bits.
            if not _INT32_MIN <= arg <= _UINT32_MAX:
                raise ValueError(
                    f'Cannot encode {arg}: only 32-bit integers may be encoded'
                )
            data += _encode_int32(arg)
        elif isinstance(arg, float):
            data += struct.pack('<f', arg)
        elif isinstance(arg, str):
            data += _encode_string(arg.encode())
        elif isinstance(arg, bytes):
            data += _encode_string(arg)
        else:
            raise ValueError(
                f'{arg} has type {type(arg)}, which is not supported')

    return bytes(data)


def prefixed_base64(data: bytes, prefix: str = BASE64_PREFIX) -> str:
    """Encodes a tokenized message as prefixed Base64.

    Args:
      data: the encoded binary message
      prefix: text placed before the Base64 data; defaults to BASE64_PREFIX

    Returns:
      The prefix followed by the standard Base64 encoding of data.
    """
    return prefix + base64.b64encode(data).decode()