#!/usr/bin/env python
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Contains the Python decoder tests and generates C++ and TS decoder tests."""

from typing import Iterator, List, NamedTuple, Tuple, Union
import unittest

from pw_build.generated_tests import Context, PyTest, TestGenerator, GroupOrTest
from pw_build.generated_tests import parse_test_generation_args
from pw_hdlc.decode import Frame, FrameDecoder, FrameStatus, NO_ADDRESS
from pw_hdlc.protocol import frame_check_sequence as fcs
from pw_hdlc.protocol import encode_address


def _encode(address: int, control: int, data: bytes) -> bytes:
    """Builds an encoded frame for use as decoder input.

    The frame body is the encoded address, the single control byte and the
    payload, followed by the frame check sequence computed over those bytes.
    The body is then escaped and wrapped in 0x7e flag bytes.
    """
    frame = encode_address(address) + bytes([control]) + data
    frame += fcs(frame)
    # Escape the escape byte (0x7d) first; otherwise the 0x7d bytes inserted
    # while escaping 0x7e would themselves be escaped a second time.
    frame = frame.replace(b'\x7d', b'\x7d\x5d')
    frame = frame.replace(b'\x7e', b'\x7d\x5e')
    return b''.join([b'\x7e', frame, b'\x7e'])


class Expected(NamedTuple):
    """Expected address, control, data and status of a decoded frame."""
    address: int
    control: bytes
    data: bytes
    status: FrameStatus = FrameStatus.OK

    @classmethod
    def error(cls, status: FrameStatus):
        """Creates the expectation for a frame that failed with status."""
        assert status is not FrameStatus.OK
        return cls(NO_ADDRESS, b'', b'', status)

    def __eq__(self, other) -> bool:
        """Define == so an Expected and a Frame can be compared."""
        try:
            return (self.address == other.address
                    and self.control == other.control
                    and self.data == other.data
                    and self.status is other.status)
        except AttributeError:
            # Not frame-like: let Python try the reflected comparison instead
            # of failing with an AttributeError.
            return NotImplemented

    def __ne__(self, other) -> bool:
        """Define != as the inverse of ==.

        tuple provides its own rich comparisons, so without this override !=
        would not follow the attribute-based __eq__ above.
        """
        result = self.__eq__(other)
        return result if result is NotImplemented else not result


class ExpectedRaw(NamedTuple):
    """Expected raw encoded bytes and status of a decoded frame."""
    raw_encoded: bytes
    status: FrameStatus

    def __eq__(self, other) -> bool:
        """Define == so an ExpectedRaw and a Frame can be compared."""
        try:
            return (self.raw_encoded == other.raw_encoded
                    and self.status is other.status)
        except AttributeError:
            # Not frame-like: let Python try the reflected comparison instead
            # of failing with an AttributeError.
            return NotImplemented

    def __ne__(self, other) -> bool:
        """Define != as the inverse of ==.

        tuple provides its own rich comparisons, so without this override !=
        would not follow the attribute-based __eq__ above.
        """
        result = self.__eq__(other)
        return result if result is NotImplemented else not result


Expectation = Union[Expected, ExpectedRaw]

# Frame check sequence used by the escaped-flag test case below.
_PARTIAL = fcs(b'\x0ACmsg\x5e')
_ESCAPED_FLAG_TEST_CASE = (
    b'\x7e\x0ACmsg\x7d\x7e' + _PARTIAL + b'\x7e',
    [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected.error(FrameStatus.FRAMING_ERROR),
    ],
)

# Each string names a group; the tuples that follow it are that group's test
# cases, written as (bytes fed to the decoder, expected decoded frames).
TEST_CASES: Tuple[GroupOrTest[Tuple[bytes, List[Expectation]]], ...] = (
    'Empty payload',
    (_encode(0, 0, b''), [Expected(0, b'\0', b'')]),
    (_encode(55, 0x99, b''), [Expected(55, b'\x99', b'')]),
    (_encode(55, 0x99, b'') * 3, [Expected(55, b'\x99', b'')] * 3),
    'Simple one-byte payload',
    (_encode(0, 0, b'\0'), [Expected(0, b'\0', b'\0')]),
    (_encode(123, 0, b'A'), [Expected(123, b'\0', b'A')]),
    'Simple multi-byte payload',
    (_encode(0, 0, b'Hello, world!'), [Expected(0, b'\0', b'Hello, world!')]),
    (_encode(123, 0, b'\0\0\1\0\0'), [Expected(123, b'\0', b'\0\0\1\0\0')]),
    'Escaped one-byte payload',
    (_encode(1, 2, b'\x7e'), [Expected(1, b'\2', b'\x7e')]),
    (_encode(1, 2, b'\x7d'), [Expected(1, b'\2', b'\x7d')]),
    (_encode(1, 2, b'\x7e') + _encode(1, 2, b'\x7d'),
     [Expected(1, b'\2', b'\x7e'),
      Expected(1, b'\2', b'\x7d')]),
    'Escaped address',
    (_encode(0x7e, 0, b'A'), [Expected(0x7e, b'\0', b'A')]),
    (_encode(0x7d, 0, b'B'), [Expected(0x7d, b'\0', b'B')]),
    'Escaped control',
    (_encode(0, 0x7e, b'C'), [Expected(0, b'\x7e', b'C')]),
    (_encode(0, 0x7d, b'D'), [Expected(0, b'\x7d', b'D')]),
    'Escaped address and control',
    (_encode(0x7e, 0x7d, b'E'), [Expected(0x7e, b'\x7d', b'E')]),
    (_encode(0x7d, 0x7e, b'F'), [Expected(0x7d, b'\x7e', b'F')]),
    (_encode(0x7e, 0x7e, b'\x7e'), [Expected(0x7e, b'\x7e', b'\x7e')]),
    'Multibyte address',
    (_encode(128, 0, b'big address'), [Expected(128, b'\0', b'big address')]),
    (_encode(0xffffffff, 0, b'\0\0\1\0\0'),
     [Expected(0xffffffff, b'\0', b'\0\0\1\0\0')]),
    'Multiple frames separated by single flag',
    (_encode(0, 0, b'A')[:-1] + _encode(1, 2, b'123'),
     [Expected(0, b'\0', b'A'),
      Expected(1, b'\2', b'123')]),
    (_encode(0xff, 0, b'Yo')[:-1] * 3 + b'\x7e',
     [Expected(0xff, b'\0', b'Yo')] * 3),
    'Ignore empty frames',
    (b'\x7e\x7e', []),
    (b'\x7e' * 10, []),
    (b'\x7e\x7e' + _encode(1, 2, b'3') + b'\x7e' * 5,
     [Expected(1, b'\2', b'3')]),
    (b'\x7e' * 10 + _encode(1, 2, b':O') + b'\x7e' * 3 + _encode(3, 4, b':P'),
     [Expected(1, b'\2', b':O'),
      Expected(3, b'\4', b':P')]),
    'Cannot escape flag',
    (b'\x7e\xAA\x7d\x7e\xab\x00Hello' + fcs(b'\xab\0Hello') + b'\x7e', [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected(0x55, b'\0', b'Hello'),
    ]),
    _ESCAPED_FLAG_TEST_CASE,
    'Frame too short',
    (b'\x7e1\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    (b'\x7e12\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    (b'\x7e12345\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Multibyte address too long',
    (_encode(2 ** 100, 0, b'too long'),
     [Expected.error(FrameStatus.BAD_ADDRESS)]),
    'Incorrect frame check sequence',
    (b'\x7e123456\x7e', [Expected.error(FrameStatus.FCS_MISMATCH)]),
    (b'\x7e\1\2msg\xff\xff\xff\xff\x7e',
     [Expected.error(FrameStatus.FCS_MISMATCH)]),
    (_encode(0xA, 0xB, b'???')[:-2] + _encode(1, 2, b'def'), [
        Expected.error(FrameStatus.FCS_MISMATCH),
        Expected(1, b'\2', b'def'),
    ]),
    'Invalid escape in address',
    (b'\x7e\x7d\x7d\0' + fcs(b'\x5d\0') + b'\x7e',
     [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Invalid escape in control',
    (b'\x7e\0\x7d\x7d' + fcs(b'\0\x5d') + b'\x7e',
     [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Invalid escape in data',
    (b'\x7e\0\1\x7d\x7d' + fcs(b'\0\1\x5d') + b'\x7e',
     [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Frame ends with escape',
    (b'\x7e\x7d\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    (b'\x7e\1\x7d\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    (b'\x7e\1\2abc\x7d\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    (b'\x7e\1\2abcd\x7d\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    (b'\x7e\1\2abcd1234\x7d\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Inter-frame data is only escapes',
    (b'\x7e\x7d\x7e\x7d\x7e', [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected.error(FrameStatus.FRAMING_ERROR),
    ]),
    (b'\x7e\x7d\x7d\x7e\x7d\x7d\x7e', [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected.error(FrameStatus.FRAMING_ERROR),
    ]),
    'Data before first flag',
    (b'\0\1' + fcs(b'\0\1'), []),
    (b'\0\1' + fcs(b'\0\1') + b'\x7e',
     [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'No frames emitted until flag',
    (_encode(1, 2, b'3')[:-1], []),
    (b'\x7e' + _encode(1, 2, b'3')[1:-1] * 2, []),
    'Only flag and escape characters can be escaped',
    (b'\x7e\x7d\0' + _encode(1, 2, b'3'),
     [Expected.error(FrameStatus.FRAMING_ERROR),
      Expected(1, b'\2', b'3')]),
    (b'\x7e1234\x7da' + _encode(1, 2, b'3'),
     [Expected.error(FrameStatus.FRAMING_ERROR),
      Expected(1, b'\2', b'3')]),
    'Invalid frame records raw data',
    (b'Hello?~', [ExpectedRaw(b'Hello?', FrameStatus.FRAMING_ERROR)]),
    (b'~~Hel\x7d\x7dlo~',
     [ExpectedRaw(b'Hel\x7d\x7dlo', FrameStatus.FRAMING_ERROR)]),
    (b'Hello?~~~~~', [ExpectedRaw(b'Hello?', FrameStatus.FRAMING_ERROR)]),
    (b'~~~~Hello?~~~~~', [ExpectedRaw(b'Hello?', FrameStatus.FCS_MISMATCH)]),
    (b'Hello?~~Goodbye~', [
        ExpectedRaw(b'Hello?', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'Goodbye', FrameStatus.FCS_MISMATCH),
    ]),
)  # yapf: disable
# Formatting for the above tuple is very slow, so disable yapf.
_TESTS = TestGenerator(TEST_CASES)


def _expected(frames: List[Frame]) -> Iterator[str]:
    """Yields one C++ initializer line for kExpected per decoded frame."""
    for i, frame in enumerate(frames, 1):
        if frame.ok():
            yield f'      Frame::Parse(kDecodedFrame{i:02}).value(),'
        elif frame.status is FrameStatus.BAD_ADDRESS:
            yield f'      Frame::Parse(kDecodedFrame{i:02}).status(),'
        else:
            yield f'      Status::DataLoss(),  // Frame {i}'


# <cstring> is included because the generated test bodies call std::memcmp.
_CPP_HEADER = """\
#include "pw_hdlc/decoder.h"

#include <array>
#include <cstddef>
#include <cstring>
#include <variant>

#include "gtest/gtest.h"
#include "pw_bytes/array.h"

namespace pw::hdlc {
namespace {
"""

_CPP_FOOTER = """\
}  // namespace
}  // namespace pw::hdlc"""

_TS_HEADER = """\
import 'jasmine';

import {Buffer} from 'buffer';

import {Decoder, FrameStatus} from './decoder'
import * as protocol from './protocol'
import * as util from './util'

class Expected {
  address: number
  control: Uint8Array
  data: Uint8Array
  status: FrameStatus

  constructor(
      address: number,
      control: Uint8Array,
      data: Uint8Array,
      status: FrameStatus) {
    this.address = address;
    this.control = control;
    this.data = data;
    this.status = status;
  }
}

class ExpectedRaw {
  raw: Uint8Array
  status: FrameStatus

  constructor(raw: Uint8Array, status: FrameStatus) {
    this.status = status;
    this.raw = raw;
  }
}

describe('Decoder', () => {
  let decoder: Decoder;
  let textEncoder: TextEncoder;

  beforeEach(() => {
    decoder = new Decoder();
    textEncoder = new TextEncoder();
  });

"""
_TS_FOOTER = """\
});
"""


def _cpp_test(ctx: Context) -> Iterator[str]:
    """Generates a C++ test for the provided test data.

    The expectations are derived by running the Python FrameDecoder over the
    test input; the expected frames listed in the test case are not used here.
    """
    data, _ = ctx.test_case
    frames = list(FrameDecoder().process(data))
    data_bytes = ''.join(rf'\x{byte:02x}' for byte in data)

    yield f'TEST(Decoder, {ctx.cc_name()}) {{'
    yield f'  static constexpr auto kData = bytes::String("{data_bytes}");\n'

    for i, frame in enumerate(frames, 1):
        # Only these frames are referenced as kDecodedFrameNN by _expected().
        if frame.ok() or frame.status is FrameStatus.BAD_ADDRESS:
            frame_bytes = ''.join(rf'\x{byte:02x}'
                                  for byte in frame.raw_decoded)
            yield (f'  static constexpr auto kDecodedFrame{i:02} = '
                   f'bytes::String("{frame_bytes}");')
        else:
            yield f'  // Frame {i}: {frame.status.value}'

    yield ''

    expected = '\n'.join(_expected(frames)) or '      // No frames'
    decoder_size = max(len(data), 8)  # Make sure large enough for a frame

    yield f"""\
  DecoderBuffer<{decoder_size}> decoder;

  static std::array<std::variant<Frame, Status>, {len(frames)}> kExpected = {{
{expected}
  }};

  size_t decoded_frames = 0;

  decoder.Process(kData, [&](const Result<Frame>& result) {{
    ASSERT_LT(decoded_frames++, kExpected.size());
    auto& expected = kExpected[decoded_frames - 1];

    if (std::holds_alternative<Status>(expected)) {{
      EXPECT_EQ(Status::DataLoss(), result.status());
    }} else {{
      ASSERT_EQ(OkStatus(), result.status());

      const Frame& decoded_frame = result.value();
      const Frame& expected_frame = std::get<Frame>(expected);
      EXPECT_EQ(expected_frame.address(), decoded_frame.address());
      EXPECT_EQ(expected_frame.control(), decoded_frame.control());
      ASSERT_EQ(expected_frame.data().size(), decoded_frame.data().size());
      EXPECT_EQ(std::memcmp(expected_frame.data().data(),
                            decoded_frame.data().data(),
                            expected_frame.data().size()),
                0);
    }}
  }});

  EXPECT_EQ(decoded_frames, kExpected.size());
}}"""


def _define_py_test(ctx: Context) -> PyTest:
    """Creates a Python test method for the provided test data.

    The test decodes the input once in a single call and once one byte at a
    time, and checks both results against the expected frames.
    """
    data, expected_frames = ctx.test_case

    def test(self) -> None:
        # Decode in one call
        self.assertEqual(expected_frames,
                         list(FrameDecoder().process(data)),
                         msg=f'{ctx.group}: {data!r}')

        # Decode byte-by-byte
        decoder = FrameDecoder()
        decoded_frames: List[Frame] = []
        for byte in data:
            decoded_frames.extend(decoder.process(bytes([byte])))

        self.assertEqual(expected_frames,
                         decoded_frames,
                         msg=f'{ctx.group} (byte-by-byte): {data!r}')

    return test


def _ts_byte_array(data: bytes) -> str:
    """Formats bytes as a TS array literal, e.g. [0x7e, 0x01]."""
    return '[' + ', '.join(f'0x{byte:02x}' for byte in data) + ']'


def _ts_test(ctx: Context) -> Iterator[str]:
    """Generates a TS test for the provided test data.

    The expectations are derived by running the Python FrameDecoder over the
    test input; the expected frames listed in the test case are not used here.
    """
    data, _ = ctx.test_case
    frames = list(FrameDecoder().process(data))
    data_bytes = _ts_byte_array(data)

    yield f'  it(\'{ctx.ts_name()}\', () => {{'
    yield f'    const data = new Uint8Array({data_bytes});'

    yield '    const expectedFrames = ['
    for frame in frames:
        # Valid frames are checked field by field; invalid frames are checked
        # by their raw encoded bytes. This used to test `frame is Expected`,
        # which is never true for a decoded frame, so the field-by-field
        # expectations were never generated.
        if frame.ok():
            control_bytes = _ts_byte_array(frame.control)
            frame_bytes = _ts_byte_array(frame.data)
            yield (f'      new Expected({frame.address}, '
                   f'new Uint8Array({control_bytes}), '
                   f'new Uint8Array({frame_bytes}), {frame.status}),')
        else:
            raw = _ts_byte_array(frame.raw_encoded)
            yield (
                f'      new ExpectedRaw(new Uint8Array({raw}), {frame.status}),'
            )

    yield '    ].values();\n'

    yield """\
    const result = decoder.process(data);

    while (true) {
      const expectedFrame = expectedFrames.next();
      const actualFrame = result.next();
      if (expectedFrame.done && actualFrame.done) {
        break;
      }
      expect(expectedFrame.done).toBeFalse();
      expect(actualFrame.done).toBeFalse();

      const expected = expectedFrame.value;
      const actual = actualFrame.value;
      if (expected instanceof Expected) {
        expect(actual.address).toEqual(expected.address);
        expect(actual.control).toEqual(expected.control);
        expect(actual.data).toEqual(expected.data);
        expect(actual.status).toEqual(expected.status);
      } else {
        // Expected Raw
        expect(actual.rawEncoded).toEqual(expected.raw);
        expect(actual.status).toEqual(expected.status);
      }
    }
  });
"""


# Class that tests all cases in TEST_CASES.
DecoderTest = _TESTS.python_tests('DecoderTest', _define_py_test)

if __name__ == '__main__':
    args = parse_test_generation_args()
    if args.generate_cc_test:
        _TESTS.cc_tests(args.generate_cc_test, _cpp_test, _CPP_HEADER,
                        _CPP_FOOTER)
    elif args.generate_ts_test:
        _TESTS.ts_tests(args.generate_ts_test, _ts_test, _TS_HEADER,
                        _TS_FOOTER)
    else:
        unittest.main()