1# Copyright 2020 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Decoder class for decoding bytes using HDLC protocol""" 15 16import enum 17import logging 18import threading 19import time 20from typing import Iterable, Optional, Callable, Any 21import zlib 22 23from pw_hdlc import protocol 24 25_LOG = logging.getLogger('pw_hdlc') 26 27NO_ADDRESS = -1 28_MIN_FRAME_SIZE = 6 # 1 B address + 1 B control + 4 B CRC-32 29_FLAG_BYTE = bytes([protocol.FLAG]) 30 31 32class FrameStatus(enum.Enum): 33 """Indicates that an error occurred.""" 34 35 OK = 'OK' 36 FCS_MISMATCH = 'frame check sequence failure' 37 FRAMING_ERROR = 'invalid flag or escape characters' 38 BAD_ADDRESS = 'address field too long' 39 40 41class Frame: 42 """Represents an HDLC frame.""" 43 44 def __init__( 45 self, 46 raw_encoded: bytes, 47 raw_decoded: bytes, 48 status: FrameStatus = FrameStatus.OK, 49 ): 50 """Parses fields from an HDLC frame. 51 52 Arguments: 53 raw_encoded: The complete HDLC-encoded frame, including any HDLC 54 flag bytes. In the case of back to back frames, the 55 beginning flag byte may be omitted. 56 raw_decoded: The complete decoded frame (address, control, 57 information, FCS). 58 status: Whether parsing the frame succeeded. 59 """ 60 self.raw_encoded = raw_encoded 61 self.raw_decoded = raw_decoded 62 self.status = status 63 64 self.address: int = NO_ADDRESS 65 self.control: bytes = b'' 66 self.data: bytes = b'' 67 68 if status == FrameStatus.OK: 69 address, address_length = protocol.decode_address(raw_decoded) 70 if address_length == 0: 71 self.status = FrameStatus.BAD_ADDRESS 72 return 73 74 self.address = address 75 self.control = raw_decoded[address_length : address_length + 1] 76 self.data = raw_decoded[address_length + 1 : -4] 77 78 def ok(self) -> bool: 79 """True if this represents a valid frame. 80 81 If false, then parsing failed. The status is set to indicate what type 82 of error occurred, and the data field contains all bytes parsed from the 83 frame (including bytes parsed as address or control bytes). 84 """ 85 return self.status is FrameStatus.OK 86 87 def __repr__(self) -> str: 88 if self.ok(): 89 body = ( 90 f'address={self.address}, control={self.control!r}, ' 91 f'data={self.data!r}' 92 ) 93 else: 94 body = ( 95 f'raw_encoded={self.raw_encoded!r}, ' 96 f'status={str(self.status)}' 97 ) 98 99 return f'{type(self).__name__}({body})' 100 101 102class _State(enum.Enum): 103 INTERFRAME = 0 104 FRAME = 1 105 FRAME_ESCAPE = 2 106 107 108def _check_frame(frame_data: bytes) -> FrameStatus: 109 if len(frame_data) < _MIN_FRAME_SIZE: 110 return FrameStatus.FRAMING_ERROR 111 112 frame_crc = int.from_bytes(frame_data[-4:], 'little') 113 if zlib.crc32(frame_data[:-4]) != frame_crc: 114 return FrameStatus.FCS_MISMATCH 115 116 return FrameStatus.OK 117 118 119class FrameDecoder: 120 """Decodes one or more HDLC frames from a stream of data.""" 121 122 def __init__(self) -> None: 123 self._decoded_data = bytearray() 124 self._raw_data = bytearray() 125 self._state = _State.INTERFRAME 126 127 def process(self, data: bytes) -> Iterable[Frame]: 128 """Decodes and yields HDLC frames, including corrupt frames. 129 130 The ok() method on Frame indicates whether it is valid or represents a 131 frame parsing error. 132 133 Yields: 134 Frames, which may be valid (frame.ok()) or corrupt (!frame.ok()) 135 """ 136 for byte in data: 137 frame = self.process_byte(byte) 138 if frame: 139 yield frame 140 141 def process_valid_frames(self, data: bytes) -> Iterable[Frame]: 142 """Decodes and yields valid HDLC frames, logging any errors.""" 143 for frame in self.process(data): 144 if frame.ok(): 145 yield frame 146 else: 147 _LOG.warning( 148 'Failed to decode frame: %s; discarded %d bytes', 149 frame.status.value, 150 len(frame.raw_encoded), 151 ) 152 _LOG.debug('Discarded data: %s', frame.raw_encoded) 153 154 def _finish_frame(self, status: FrameStatus) -> Frame: 155 # HDLC frames always start and end with a flag character, though the 156 # character may be shared with other frames. Ensure the raw encoding of 157 # OK frames always includes the start and end flags for consistency. 158 if status is FrameStatus.OK: 159 if not self._raw_data.startswith(_FLAG_BYTE): 160 self._raw_data.insert(0, protocol.FLAG) 161 162 frame = Frame(bytes(self._raw_data), bytes(self._decoded_data), status) 163 self._raw_data.clear() 164 self._decoded_data.clear() 165 return frame 166 167 def process_byte(self, byte: int) -> Optional[Frame]: 168 """Processes a single byte and returns a frame if one was completed.""" 169 frame: Optional[Frame] = None 170 171 self._raw_data.append(byte) 172 173 if self._state is _State.INTERFRAME: 174 if byte == protocol.FLAG: 175 if len(self._raw_data) != 1: 176 frame = self._finish_frame(FrameStatus.FRAMING_ERROR) 177 178 self._state = _State.FRAME 179 elif self._state is _State.FRAME: 180 if byte == protocol.FLAG: 181 # On back to back frames, we may see a repeated FLAG byte. 182 if len(self._raw_data) > 1: 183 frame = self._finish_frame(_check_frame(self._decoded_data)) 184 185 self._state = _State.FRAME 186 elif byte == protocol.ESCAPE: 187 self._state = _State.FRAME_ESCAPE 188 else: 189 self._decoded_data.append(byte) 190 elif self._state is _State.FRAME_ESCAPE: 191 if byte == protocol.FLAG: 192 frame = self._finish_frame(FrameStatus.FRAMING_ERROR) 193 self._state = _State.FRAME 194 elif byte in protocol.VALID_ESCAPED_BYTES: 195 self._state = _State.FRAME 196 self._decoded_data.append(protocol.escape(byte)) 197 else: 198 self._state = _State.INTERFRAME 199 else: 200 raise AssertionError(f'Invalid decoder state: {self._state}') 201 202 return frame 203 204 205class FrameAndNonFrameDecoder: 206 """Processes both HDLC frames and non-frame data in a stream.""" 207 208 def __init__( 209 self, 210 non_frame_data_handler: Callable[[bytes], Any], 211 *, 212 mtu: Optional[int] = None, 213 timeout_s: Optional[float] = None, 214 handle_shared_flags: bool = True, 215 ) -> None: 216 """Yields valid HDLC frames and passes non-frame data to callback. 217 218 Args: 219 mtu: Maximum bytes to receive before flushing raw data. If a valid 220 HDLC frame contains more than MTU bytes, the valid frame will be 221 emitted, but part of the frame will be included in the raw data. 222 timeout_s: How long to wait before automatically flushing raw data. If 223 a timeout occurs partway through a valid frame, the frame will be 224 emitted, but part of the frame will be included in the raw data. 225 handle_shared_flags: Whether to permit HDLC frames to share a single 226 flag byte between frames. If False, partial HDLC frames may be 227 emitted as raw data when HDLC frames share a flag byte, but raw 228 data won't have to wait for a timeout or full MTU to be flushed. 229 """ 230 self._non_frame_data_handler = non_frame_data_handler 231 self._mtu = mtu 232 self._shared_flags = handle_shared_flags 233 self._timeout_s = timeout_s 234 235 self._raw_data = bytearray() 236 self._hdlc_decoder = FrameDecoder() 237 self._last_data_time = time.time() 238 self._lock = threading.Lock() 239 240 if self._timeout_s is not None: 241 threading.Thread(target=self._timeout_thread, daemon=True).start() 242 243 def flush_non_frame_data(self) -> None: 244 """Flushes any data in the buffer as non-frame data. 245 246 If a valid HDLC frame was flushed partway, the data for the first part 247 of the frame will be included both in the raw data and in the frame. 248 """ 249 with self._lock: 250 self._flush_non_frame() 251 252 def _flush_non_frame(self, to_index: Optional[int] = None): 253 if self._raw_data: 254 self._non_frame_data_handler(bytes(self._raw_data[:to_index])) 255 del self._raw_data[:to_index] 256 257 def _timeout_thread(self) -> None: 258 assert self._timeout_s is not None 259 260 while True: 261 time.sleep(self._timeout_s) 262 with self._lock: 263 if time.time() - self._last_data_time > self._timeout_s: 264 self._flush_non_frame() 265 266 def process(self, data: bytes) -> Iterable[Frame]: 267 """Processes a stream of mixed HDLC and unstructured data. 268 269 Yields OK frames and calls non_frame_data_handler with non-HDLC data. 270 """ 271 with self._lock: 272 for byte in data: 273 yield from self._process_byte(byte) 274 275 # Flush the data if it is larger than the MTU, or flag bytes are not 276 # being shared and no initial flag was seen. 277 if (self._mtu is not None and len(self._raw_data) > self._mtu) or ( 278 not self._shared_flags 279 and not self._raw_data.startswith(_FLAG_BYTE) 280 ): 281 self._flush_non_frame() 282 283 self._last_data_time = time.time() 284 285 def _process_byte(self, byte: int) -> Iterable[Frame]: 286 self._raw_data.append(byte) 287 frame = self._hdlc_decoder.process_byte(byte) 288 289 if frame is None: 290 return 291 292 if frame.ok(): 293 # Drop the valid frame from the data. Only drop matching bytes in 294 # case the frame was flushed prematurely. 295 for suffix_byte in reversed(frame.raw_encoded): 296 if not self._raw_data or self._raw_data[-1] != suffix_byte: 297 break 298 self._raw_data.pop() 299 300 self._flush_non_frame() # Flush the raw data before the frame. 301 302 if self._mtu is not None and len(frame.raw_encoded) > self._mtu: 303 _LOG.warning( 304 'Found a valid %d B HDLC frame, but the MTU is set to %d! ' 305 'The MTU setting may be incorrect.', 306 self._mtu, 307 len(frame.raw_encoded), 308 ) 309 310 yield frame 311 else: 312 # Don't flush a final flag byte yet because it might be the start of 313 # an HDLC frame. 314 to_index = -1 if self._raw_data[-1] == protocol.FLAG else None 315 self._flush_non_frame(to_index) 316