• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# Copyright 2020 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Contains the Python decoder tests and generates C++ decoder tests."""
16
17from typing import Iterator, List, NamedTuple, Tuple, Union
18import unittest
19
20from pw_build.generated_tests import Context, PyTest, TestGenerator, GroupOrTest
21from pw_build.generated_tests import parse_test_generation_args
22from pw_hdlc.decode import Frame, FrameDecoder, FrameStatus, NO_ADDRESS
23from pw_hdlc.protocol import frame_check_sequence as fcs
24from pw_hdlc.protocol import encode_address
25
26
def _encode(address: int, control: int, data: bytes) -> bytes:
    """Builds a complete, escaped frame delimited by 0x7e flag bytes.

    The unescaped frame is the encoded address, the single control byte, the
    payload, and the frame check sequence computed over all of those. Every
    0x7d or 0x7e byte inside the frame is replaced by 0x7d followed by the
    original byte XOR 0x20 (0x5d and 0x5e respectively).
    """
    payload = encode_address(address) + bytes([control]) + data
    payload += fcs(payload)

    escaped = bytearray(b'\x7e')  # Opening flag.
    for byte in payload:
        if byte in (0x7d, 0x7e):
            escaped += bytes((0x7d, byte ^ 0x20))
        else:
            escaped.append(byte)
    escaped.append(0x7e)  # Closing flag.
    return bytes(escaped)
33
34
class Expected(NamedTuple):
    """Field values a decoded frame is expected to carry."""
    address: int
    control: bytes
    data: bytes
    status: FrameStatus = FrameStatus.OK

    @classmethod
    def error(cls, status: FrameStatus):
        """Returns the expectation for a frame that decodes with an error.

        Error expectations use NO_ADDRESS and empty control and data fields;
        `status` must not be FrameStatus.OK.
        """
        assert status is not FrameStatus.OK
        return cls(address=NO_ADDRESS, control=b'', data=b'', status=status)

    def __eq__(self, other) -> bool:
        """Compares attribute-by-attribute so a Frame can stand in for other.

        Attributes are read lazily in the order address, control, data,
        status, and the comparison stops at the first mismatch.
        """
        fields_match = all(
            getattr(self, field) == getattr(other, field)
            for field in ('address', 'control', 'data'))
        return fields_match and self.status is other.status
50
51
class ExpectedRaw(NamedTuple):
    """Expectation that checks only a frame's raw encoded bytes and status."""
    raw_encoded: bytes
    status: FrameStatus

    def __eq__(self, other) -> bool:
        """Compares raw_encoded and status so a Frame can stand in for other."""
        if not self.raw_encoded == other.raw_encoded:
            return False
        return self.status is other.status
60
61
# A test case's expectation list may hold either kind of expectation.
Expectation = Union[Expected, ExpectedRaw]

# Frame check sequence over the bytes as they would read if the 0x7d 0x7e pair
# below were treated as an escape sequence (0x7e ^ 0x20 == 0x5e) -- presumably
# so that only the illegal escape, not the checksum, can explain the failure.
_PARTIAL = fcs(b'\x0ACmsg\x5e')

# An escape byte immediately followed by a flag byte, used in the
# 'Cannot escape flag' group; both resulting frames must be framing errors.
_ESCAPED_FLAG_TEST_CASE = (
    b''.join([b'\x7e\x0ACmsg\x7d\x7e', _PARTIAL, b'\x7e']),
    [Expected.error(FrameStatus.FRAMING_ERROR)] * 2,
)
72
# Shared test vectors for the Python tests and the generated C++ tests.
#
# A bare string starts a named group; each tuple that follows is one test case
# of the form (bytes fed to the decoder, list of expected frames in order).
# 0x7e is the frame delimiter (flag) and 0x7d is the escape byte, as used by
# _encode() above.
TEST_CASES: Tuple[GroupOrTest[Tuple[bytes, List[Expectation]]], ...] = (
    'Empty payload',
    (_encode(0, 0, b''), [Expected(0, b'\0', b'')]),
    (_encode(55, 0x99, b''), [Expected(55, b'\x99', b'')]),
    (_encode(55, 0x99, b'') * 3, [Expected(55, b'\x99', b'')] * 3),
    'Simple one-byte payload',
    (_encode(0, 0, b'\0'), [Expected(0, b'\0', b'\0')]),
    (_encode(123, 0, b'A'), [Expected(123, b'\0', b'A')]),
    'Simple multi-byte payload',
    (_encode(0, 0, b'Hello, world!'), [Expected(0, b'\0', b'Hello, world!')]),
    (_encode(123, 0, b'\0\0\1\0\0'), [Expected(123, b'\0', b'\0\0\1\0\0')]),
    'Escaped one-byte payload',
    (_encode(1, 2, b'\x7e'), [Expected(1, b'\2', b'\x7e')]),
    (_encode(1, 2, b'\x7d'), [Expected(1, b'\2', b'\x7d')]),
    (_encode(1, 2, b'\x7e') + _encode(1, 2, b'\x7d'),
     [Expected(1, b'\2', b'\x7e'),
      Expected(1, b'\2', b'\x7d')]),
    'Escaped address',
    (_encode(0x7e, 0, b'A'), [Expected(0x7e, b'\0', b'A')]),
    (_encode(0x7d, 0, b'B'), [Expected(0x7d, b'\0', b'B')]),
    'Escaped control',
    (_encode(0, 0x7e, b'C'), [Expected(0, b'\x7e', b'C')]),
    (_encode(0, 0x7d, b'D'), [Expected(0, b'\x7d', b'D')]),
    'Escaped address and control',
    (_encode(0x7e, 0x7d, b'E'), [Expected(0x7e, b'\x7d', b'E')]),
    (_encode(0x7d, 0x7e, b'F'), [Expected(0x7d, b'\x7e', b'F')]),
    (_encode(0x7e, 0x7e, b'\x7e'), [Expected(0x7e, b'\x7e', b'\x7e')]),
    'Multibyte address',
    (_encode(128, 0, b'big address'), [Expected(128, b'\0', b'big address')]),
    (_encode(0xffffffff, 0, b'\0\0\1\0\0'),
     [Expected(0xffffffff, b'\0', b'\0\0\1\0\0')]),
    # Dropping the trailing flag ([:-1]) leaves one flag between frames.
    'Multiple frames separated by single flag',
    (_encode(0, 0, b'A')[:-1] + _encode(1, 2, b'123'),
     [Expected(0, b'\0', b'A'),
      Expected(1, b'\2', b'123')]),
    (_encode(0xff, 0, b'Yo')[:-1] * 3 + b'\x7e',
     [Expected(0xff, b'\0', b'Yo')] * 3),
    'Ignore empty frames',
    (b'\x7e\x7e', []),
    (b'\x7e' * 10, []),
    (b'\x7e\x7e' + _encode(1, 2, b'3') + b'\x7e' * 5,
     [Expected(1, b'\2', b'3')]),
    (b'\x7e' * 10 + _encode(1, 2, b':O') + b'\x7e' * 3 + _encode(3, 4, b':P'),
     [Expected(1, b'\2', b':O'),
      Expected(3, b'\4', b':P')]),
    'Cannot escape flag',
    (b'\x7e\xAA\x7d\x7e\xab\x00Hello' + fcs(b'\xab\0Hello') + b'\x7e', [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected(0x55, b'\0', b'Hello'),
    ]),
    _ESCAPED_FLAG_TEST_CASE,
    'Frame too short',
    (b'\x7e1\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    (b'\x7e12\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    (b'\x7e12345\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Multibyte address too long',
    (_encode(2 ** 100, 0, b'too long'),
     [Expected.error(FrameStatus.BAD_ADDRESS)]),
    'Incorrect frame check sequence',
    (b'\x7e123456\x7e', [Expected.error(FrameStatus.FCS_MISMATCH)]),
    (b'\x7e\1\2msg\xff\xff\xff\xff\x7e',
     [Expected.error(FrameStatus.FCS_MISMATCH)]),
    (_encode(0xA, 0xB, b'???')[:-2] + _encode(1, 2, b'def'), [
        Expected.error(FrameStatus.FCS_MISMATCH),
        Expected(1, b'\2', b'def'),
    ]),
    'Invalid escape in address',
    (b'\x7e\x7d\x7d\0' + fcs(b'\x5d\0') + b'\x7e',
     [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Invalid escape in control',
    (b'\x7e\0\x7d\x7d' + fcs(b'\0\x5d') + b'\x7e',
     [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Invalid escape in data',
    (b'\x7e\0\1\x7d\x7d' + fcs(b'\0\1\x5d') + b'\x7e',
     [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Frame ends with escape',
    (b'\x7e\x7d\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    (b'\x7e\1\x7d\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    (b'\x7e\1\2abc\x7d\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    (b'\x7e\1\2abcd\x7d\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    (b'\x7e\1\2abcd1234\x7d\x7e', [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'Inter-frame data is only escapes',
    (b'\x7e\x7d\x7e\x7d\x7e', [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected.error(FrameStatus.FRAMING_ERROR),
    ]),
    (b'\x7e\x7d\x7d\x7e\x7d\x7d\x7e', [
        Expected.error(FrameStatus.FRAMING_ERROR),
        Expected.error(FrameStatus.FRAMING_ERROR),
    ]),
    'Data before first flag',
    (b'\0\1' + fcs(b'\0\1'), []),
    (b'\0\1' + fcs(b'\0\1') + b'\x7e',
     [Expected.error(FrameStatus.FRAMING_ERROR)]),
    'No frames emitted until flag',
    (_encode(1, 2, b'3')[:-1], []),
    (b'\x7e' + _encode(1, 2, b'3')[1:-1] * 2, []),
    'Only flag and escape characters can be escaped',
    (b'\x7e\x7d\0' + _encode(1, 2, b'3'),
     [Expected.error(FrameStatus.FRAMING_ERROR),
      Expected(1, b'\2', b'3')]),
    (b'\x7e1234\x7da' + _encode(1, 2, b'3'),
     [Expected.error(FrameStatus.FRAMING_ERROR),
      Expected(1, b'\2', b'3')]),
    # These cases use ExpectedRaw, which compares only raw_encoded and status.
    # Note that b'~' is the flag byte 0x7e.
    'Invalid frame records raw data',
    (b'Hello?~', [ExpectedRaw(b'Hello?', FrameStatus.FRAMING_ERROR)]),
    (b'~~Hel\x7d\x7dlo~',
     [ExpectedRaw(b'Hel\x7d\x7dlo', FrameStatus.FRAMING_ERROR)]),
    (b'Hello?~~~~~', [ExpectedRaw(b'Hello?', FrameStatus.FRAMING_ERROR)]),
    (b'~~~~Hello?~~~~~', [ExpectedRaw(b'Hello?', FrameStatus.FCS_MISMATCH)]),
    (b'Hello?~~Goodbye~', [
        ExpectedRaw(b'Hello?', FrameStatus.FRAMING_ERROR),
        ExpectedRaw(b'Goodbye', FrameStatus.FCS_MISMATCH),
    ]),
)  # yapf: disable
# Formatting for the above tuple is very slow, so disable yapf.

# Produces both the Python test class and the generated C++ test file from
# TEST_CASES (see the uses of _TESTS at the bottom of this file).
_TESTS = TestGenerator(TEST_CASES)
191
192
def _expected(frames: List[Frame]) -> Iterator[str]:
    """Yields one C++ initializer line per frame for the kExpected array.

    OK frames become the parsed Frame value, BAD_ADDRESS frames become the
    status returned by Frame::Parse, and every other frame becomes
    Status::DataLoss(). Frames are numbered starting at 1.
    """
    for index, frame in enumerate(frames, start=1):
        parse_call = f'      Frame::Parse(kDecodedFrame{index:02})'
        if frame.ok():
            line = f'{parse_call}.value(),'
        elif frame.status is FrameStatus.BAD_ADDRESS:
            line = f'{parse_call}.status(),'
        else:
            line = f'      Status::DataLoss(),  // Frame {index}'
        yield line
201
202
203_CPP_HEADER = """\
204#include "pw_hdlc/decoder.h"
205
206#include <array>
207#include <cstddef>
208#include <variant>
209
210#include "gtest/gtest.h"
211#include "pw_bytes/array.h"
212
213namespace pw::hdlc {
214namespace {
215"""
216
217_CPP_FOOTER = """\
218}  // namespace
219}  // namespace pw::hdlc"""
220
221
def _cpp_test(ctx: Context) -> Iterator[str]:
    """Yields the source text of one C++ TEST() for the given test case.

    The expected C++ results are derived by running the Python FrameDecoder
    over the test case's input bytes; the test case's own expectation list is
    not used here.
    """
    encoded, _ = ctx.test_case
    py_frames = list(FrameDecoder().process(encoded))

    def hex_escape(raw: bytes) -> str:
        # Spell every byte as a \xNN escape for use in a C++ string literal.
        return ''.join(rf'\x{value:02x}' for value in raw)

    yield f'TEST(Decoder, {ctx.cc_name()}) {{'
    yield ('  static constexpr auto kData = '
           f'bytes::String("{hex_escape(encoded)}");\n')

    for index, frame in enumerate(py_frames, 1):
        needs_bytes = frame.ok() or frame.status is FrameStatus.BAD_ADDRESS
        if not needs_bytes:
            # Other failures are only noted in a comment; no bytes are needed.
            yield f'  // Frame {index}: {frame.status.value}'
            continue

        yield (f'  static constexpr auto kDecodedFrame{index:02} = '
               f'bytes::String("{hex_escape(frame.raw_decoded)}");')

    yield ''

    expected_entries = '\n'.join(_expected(py_frames)) or '      // No frames'
    # Make sure the buffer is large enough for a frame.
    buffer_size = max(len(encoded), 8)

    yield f"""\
  DecoderBuffer<{buffer_size}> decoder;

  static std::array<std::variant<Frame, Status>, {len(py_frames)}> kExpected = {{
{expected_entries}
  }};

  size_t decoded_frames = 0;

  decoder.Process(kData, [&](const Result<Frame>& result) {{
    ASSERT_LT(decoded_frames++, kExpected.size());
    auto& expected = kExpected[decoded_frames - 1];

    if (std::holds_alternative<Status>(expected)) {{
      EXPECT_EQ(Status::DataLoss(), result.status());
    }} else {{
      ASSERT_EQ(OkStatus(), result.status());

      const Frame& decoded_frame = result.value();
      const Frame& expected_frame = std::get<Frame>(expected);
      EXPECT_EQ(expected_frame.address(), decoded_frame.address());
      EXPECT_EQ(expected_frame.control(), decoded_frame.control());
      ASSERT_EQ(expected_frame.data().size(), decoded_frame.data().size());
      EXPECT_EQ(std::memcmp(expected_frame.data().data(),
                            decoded_frame.data().data(),
                            expected_frame.data().size()),
                0);
    }}
  }});

  EXPECT_EQ(decoded_frames, kExpected.size());
}}"""
277
278
def _define_py_test(ctx: Context) -> PyTest:
    """Returns a unittest method that checks one test case two ways.

    The input is decoded once in a single process() call and once one byte at
    a time with a single decoder; both results must equal the expected frames.
    """
    encoded, expectations = ctx.test_case

    def test(self) -> None:
        # Whole input in a single call. The expectations are passed first so
        # that their __eq__ is the one used to compare against each Frame.
        one_shot = list(FrameDecoder().process(encoded))
        self.assertEqual(expectations,
                         one_shot,
                         msg=f'{ctx.group}: {encoded!r}')

        # Same input, fed to one decoder a single byte per call.
        streaming_decoder = FrameDecoder()
        streamed: List[Frame] = []
        for position in range(len(encoded)):
            chunk = encoded[position:position + 1]
            streamed.extend(streaming_decoder.process(chunk))

        self.assertEqual(expectations,
                         streamed,
                         msg=f'{ctx.group} (byte-by-byte): {encoded!r}')

    return test
299
300
# unittest class with one test per entry in TEST_CASES.
DecoderTest = _TESTS.python_tests('DecoderTest', _define_py_test)

if __name__ == '__main__':
    # With a C++ output path, generate the C++ test file; otherwise run the
    # Python tests.
    cc_test_output = parse_test_generation_args().generate_cc_test
    if cc_test_output:
        _TESTS.cc_tests(cc_test_output, _cpp_test, _CPP_HEADER, _CPP_FOOTER)
    else:
        unittest.main()
311