# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Nils Weiss <nils@we155.de>
# Copyright (C) Enrico Pozzobon <enricopozzobon@gmail.com>
# Copyright (C) Alexander Schroeder <alexander1.schroeder@st.othr.de>

# scapy.contrib.description = ISO-TP (ISO 15765-2) Utilities
# scapy.contrib.status = library

import struct

from scapy.config import conf
from scapy.utils import EDecimal
from scapy.packet import Packet
from scapy.sessions import DefaultSession
from scapy.supersocket import SuperSocket
from scapy.contrib.isotp.isotp_packet import ISOTP, N_PCI_CF, N_PCI_SF, \
    N_PCI_FF, N_PCI_FC

# Typing imports
from typing import (
    cast,
    Iterable,
    Iterator,
    Optional,
    Union,
    List,
    Tuple,
    Dict,
    Any,
    Type,
)


class ISOTPMessageBuilderIter(object):
    """
    Iterator class for ISOTPMessageBuilder

    Every call to ``next()`` pops one ready message from the wrapped
    builder, so iterating consumes the builder's ready queue.
    """
    __slots__ = ["builder"]

    def __init__(self, builder):
        # type: (ISOTPMessageBuilder) -> None
        self.builder = builder

    def __iter__(self):
        # type: () -> ISOTPMessageBuilderIter
        return self

    def __next__(self):
        # type: () -> ISOTP
        # Stop as soon as the builder has nothing ready or pop() yields
        # nothing.
        if self.builder.count:
            p = self.builder.pop()
            if p is not None:
                return p
        raise StopIteration

    next = __next__


class ISOTPMessageBuilder(object):
    """
    Initialize an ISOTPMessageBuilder object

    Utility class to build ISOTP messages out of CAN frames, used by both
    ISOTP.defragment() and ISOTPSession.

    This class attempts to interpret some CAN frames as ISOTP frames, both with
    and without extended addressing at the same time. For example, if an
    extended address of 07 is being used, all frames will also be interpreted
    as ISOTP single-frame messages.

    CAN frames are fed to an ISOTPMessageBuilder object with the feed() method
    and the resulting ISOTP frames can be extracted using the pop() method.

    :param use_ext_address: True for only attempting to defragment with
                            extended addressing, False for only attempting
                            to defragment without extended addressing,
                            or None for both
    :param rx_id: Destination Identifier, or an iterable of identifiers.
                  If given, CAN frames whose identifier is not contained
                  in it are ignored by feed().
    :param basecls: The class of packets that will be returned,
                    defaults to ISOTP
    :raises TypeError: if rx_id is neither None, an int nor an iterable
    """

    class Bucket(object):
        """
        Helper class to store not finished ISOTP messages while building.

        :param total_len: announced length of the complete message
        :param first_piece: payload bytes of the first received frame
        :param ts: timestamp that will be assigned to the built message
        """

        def __init__(self, total_len, first_piece, ts):
            # type: (int, bytes, Union[EDecimal, float]) -> None
            self.pieces = list()  # type: List[bytes]
            self.total_len = total_len
            self.current_len = 0
            # Complete payload, set by push() once enough bytes arrived
            self.ready = None  # type: Optional[bytes]
            # Filled in from a flow control frame, if one is observed
            self.tx_id = None  # type: Optional[int]
            self.ext_address = None  # type: Optional[int]
            self.time = ts  # type: Union[float, EDecimal]
            self.push(first_piece)

        def push(self, piece):
            # type: (bytes) -> None
            """Append a payload piece; mark the bucket ready when full."""
            self.pieces.append(piece)
            self.current_len += len(piece)
            if self.current_len >= self.total_len:
                isotp_data = b"".join(self.pieces)
                # The last frame may carry padding: cut to announced length
                self.ready = isotp_data[:self.total_len]

    def __init__(
            self,
            use_ext_address=None,  # type: Optional[bool]
            rx_id=None,  # type: Optional[Union[int, List[int], Iterable[int]]]
            basecls=ISOTP  # type: Type[ISOTP]
    ):
        # type: (...) -> None
        # Completed messages as (identifier, ext_address, bucket)
        self.ready = []  # type: List[Tuple[int, Optional[int], ISOTPMessageBuilder.Bucket]]  # noqa: E501
        # Messages in progress, keyed by
        # (ext_address, identifier, next expected sequence number)
        self.buckets = {}  # type: Dict[Tuple[Optional[int], int, int], ISOTPMessageBuilder.Bucket]  # noqa: E501
        self.use_ext_addr = use_ext_address
        self.basecls = basecls
        self.rx_ids = None  # type: Optional[Iterable[int]]
        # Bucket keys of the most recent first frames (normal / extended
        # addressing), used to attribute a following flow control frame
        self.last_ff = None  # type: Optional[Tuple[Optional[int], int, int]]
        self.last_ff_ex = None  # type: Optional[Tuple[Optional[int], int, int]]  # noqa: E501
        if rx_id is not None:
            if isinstance(rx_id, list):
                self.rx_ids = rx_id
            elif isinstance(rx_id, int):
                self.rx_ids = [rx_id]
            elif hasattr(rx_id, "__iter__"):
                if hasattr(rx_id, "__next__"):
                    # One-shot iterators (e.g. generators) would be
                    # exhausted by the first membership test in feed();
                    # materialize them once.
                    self.rx_ids = list(rx_id)
                else:
                    self.rx_ids = rx_id
            else:
                raise TypeError("Invalid type for argument rx_id!")

    def feed(self, can):
        # type: (Union[Iterable[Packet], Packet]) -> None
        """Attempt to feed an incoming CAN frame into the state machine

        :param can: a single CAN packet or an iterable of CAN packets.
                    Objects that are neither are silently ignored.
        """
        if not isinstance(can, Packet) and hasattr(can, "__iter__"):
            for p in can:
                self.feed(p)
            return

        if not isinstance(can, Packet):
            return

        if self.rx_ids is not None and can.identifier not in self.rx_ids:
            return

        data = bytes(can.data)

        if len(data) > 1 and self.use_ext_addr is not True:
            self._try_feed(can.identifier, None, data, can.time)
        if len(data) > 2 and self.use_ext_addr is not False:
            # With extended addressing, the first byte is the address
            ea = data[0]
            self._try_feed(can.identifier, ea, data[1:], can.time)

    @property
    def count(self):
        # type: () -> int
        """Returns the number of ready ISOTP messages built from the provided
        can frames

        :return: Number of ready ISOTP messages
        """
        return len(self.ready)

    def __len__(self):
        # type: () -> int
        return self.count

    def pop(self, identifier=None, ext_addr=None):
        # type: (Optional[int], Optional[int]) -> Optional[ISOTP]
        """Returns a built ISOTP message

        :param identifier: if not None, only return isotp messages with this
                           destination
        :param ext_addr: if identifier is not None, only return isotp messages
                         with this extended address for destination
        :returns: an ISOTP packet, or None if no message is ready
        """

        if identifier is not None:
            for i, (iden, ea, _) in enumerate(self.ready):
                if iden == identifier and ext_addr == ea:
                    return ISOTPMessageBuilder._build(self.ready.pop(i),
                                                      self.basecls)
            return None

        if self.ready:
            return ISOTPMessageBuilder._build(self.ready.pop(0), self.basecls)
        return None

    def __iter__(self):
        # type: () -> ISOTPMessageBuilderIter
        return ISOTPMessageBuilderIter(self)

    @staticmethod
    def _build(
            t,  # type: Tuple[int, Optional[int], ISOTPMessageBuilder.Bucket]
            basecls=ISOTP  # type: Type[ISOTP]
    ):
        # type: (...) -> ISOTP
        """Dissect a finished bucket with basecls and copy the addressing
        information and timestamp onto the resulting packet.

        :param t: (identifier, ext_address, bucket) entry of the ready list
        :param basecls: packet class used to dissect the payload
        :returns: the built packet
        """
        bucket = t[2]
        data = bucket.ready or b""
        try:
            p = basecls(data)
        except Exception:
            if conf.debug_dissector:
                from scapy.sendrecv import debug
                debug.crashed_on = (basecls, data)
            raise
        # basecls may be any class: only set the attributes it provides
        if hasattr(p, "rx_id"):
            p.rx_id = t[0]
        if hasattr(p, "rx_ext_address"):
            p.rx_ext_address = t[1]
        if hasattr(p, "tx_id"):
            p.tx_id = bucket.tx_id
        if hasattr(p, "ext_address"):
            p.ext_address = bucket.ext_address
        if hasattr(p, "time"):
            p.time = bucket.time
        return p

    def _feed_first_frame(self, identifier, ea, data, ts):
        # type: (int, Optional[int], bytes, Union[EDecimal, float]) -> bool
        """Start a new bucket from a first frame.

        :returns: True if the frame was accepted, False if it is too short
        """
        if len(data) < 3:
            # At least 3 bytes are necessary: 2 for length and 1 for data
            return False

        header = struct.unpack('>H', bytes(data[:2]))[0]
        expected_length = header & 0x0fff
        isotp_data = data[2:]
        if expected_length == 0 and len(data) >= 6:
            # A 12 bit length of 0 means that the length is carried in the
            # following 4 bytes
            expected_length = struct.unpack('>I', bytes(data[2:6]))[0]
            isotp_data = data[6:]

        # The first consecutive frame is expected with sequence number 1
        key = (ea, identifier, 1)
        if ea is None:
            self.last_ff = key
        else:
            self.last_ff_ex = key
        self.buckets[key] = self.Bucket(expected_length, isotp_data, ts)
        return True

    def _feed_single_frame(self, identifier, ea, data, ts):
        # type: (int, Optional[int], bytes, Union[EDecimal, float]) -> bool
        """Queue the message contained in a single frame as ready.

        :returns: True if the frame was accepted, False if it is too short
        """
        if len(data) < 2:
            # At least 2 bytes are necessary: 1 for length and 1 for data
            return False

        length = data[0] & 0x0f
        isotp_data = data[1:length + 1]

        if length > len(isotp_data):
            # CAN frame has less data than expected
            return False

        self.ready.append((identifier, ea,
                           self.Bucket(length, isotp_data, ts)))
        return True

    def _feed_consecutive_frame(self, identifier, ea, data):
        # type: (int, Optional[int], bytes) -> bool
        """Append a consecutive frame to the bucket waiting for it.

        :returns: True if a waiting bucket consumed the frame, else False
        """
        if len(data) < 2:
            # At least 2 bytes are necessary: 1 for sequence number and
            # 1 for data
            return False

        first_byte = data[0]
        seq_no = first_byte & 0x0f
        isotp_data = data[1:]

        key = (ea, identifier, seq_no)
        bucket = self.buckets.pop(key, None)

        if bucket is None:
            # There is no message constructor waiting for this frame
            return False

        bucket.push(isotp_data)
        if bucket.ready is None:
            # full ISOTP message is not ready yet, put it back in
            # buckets list
            next_seq = (seq_no + 1) % 16
            key = (ea, identifier, next_seq)
            self.buckets[key] = bucket
        else:
            self.ready.append((identifier, ea, bucket))

        return True

    def _feed_flow_control_frame(self, identifier, ea, data):
        # type: (int, Optional[int], bytes) -> bool
        """Attribute a flow control frame to the most recent first frames.

        The identifier and extended address of the flow control frame are
        stored as tx_id / ext_address of the buckets opened by the last
        first frames.

        :returns: True if at least one bucket was updated, else False
        """
        if len(data) < 3:
            # A flow control frame with fewer than 3 bytes is rejected
            return False

        keys = [x for x in (self.last_ff, self.last_ff_ex) if x is not None]
        buckets = [self.buckets.pop(k, None) for k in keys]

        self.last_ff = None
        self.last_ff_ex = None

        if all(bucket is None for bucket in buckets):
            # There is no message constructor waiting for this frame
            return False

        for key, bucket in zip(keys, buckets):
            if bucket is None:
                continue
            bucket.tx_id = identifier
            bucket.ext_address = ea
            self.buckets[key] = bucket
        return True

    def _try_feed(self, identifier, ea, data, ts):
        # type: (int, Optional[int], bytes, Union[EDecimal, float]) -> None
        """Dispatch the frame payload to the handler matching the frame
        type encoded in the upper nibble of its first byte.
        """
        if not data:
            # Nothing to interpret; avoids an IndexError on data[0]
            return

        frame_type = data[0] & 0xf0
        size = len(data)
        if size > 1 and frame_type == N_PCI_SF:
            self._feed_single_frame(identifier, ea, data, ts)
        if size > 2 and frame_type == N_PCI_FF:
            self._feed_first_frame(identifier, ea, data, ts)
        if size > 1 and frame_type == N_PCI_CF:
            self._feed_consecutive_frame(identifier, ea, data)
        if size > 1 and frame_type == N_PCI_FC:
            self._feed_flow_control_frame(identifier, ea, data)


class ISOTPSession(DefaultSession):
    """Defragment ISOTP packets 'on-the-flow'.

    The keyword arguments ``use_ext_address``, ``rx_id`` and ``basecls``
    are forwarded to the internal ISOTPMessageBuilder; all remaining
    arguments are passed to DefaultSession.

    Usage:
        >>> sniff(session=ISOTPSession)
    """

    def __init__(self, *args, **kwargs):
        # type: (Any, Any) -> None
        self.m = ISOTPMessageBuilder(
            use_ext_address=kwargs.pop("use_ext_address", None),
            rx_id=kwargs.pop("rx_id", None),
            basecls=kwargs.pop("basecls", ISOTP))
        super(ISOTPSession, self).__init__(*args, **kwargs)

    def recv(self, sock: SuperSocket) -> Iterator[Packet]:
        """
        Will be called by sniff() to ask for a packet

        Reads one packet from sock, feeds it to the message builder and
        yields every message that became ready, after passing it through
        process().
        """
        pkt = sock.recv()
        if not pkt:
            return
        self.m.feed(pkt)
        while len(self.m) > 0:
            rcvd = cast(Optional[Packet], self.m.pop())
            if rcvd:
                rcvd = self.process(rcvd)
            if rcvd:
                yield rcvd