• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Nils Weiss <nils@we155.de>
5# Copyright (C) Enrico Pozzobon <enricopozzobon@gmail.com>
6# Copyright (C) Alexander Schroeder <alexander1.schroeder@st.othr.de>
7
8# scapy.contrib.description = ISO-TP (ISO 15765-2) Utilities
9# scapy.contrib.status = library
10
11import struct
12
13from scapy.config import conf
14from scapy.utils import EDecimal
15from scapy.packet import Packet
16from scapy.sessions import DefaultSession
17from scapy.supersocket import SuperSocket
18from scapy.contrib.isotp.isotp_packet import ISOTP, N_PCI_CF, N_PCI_SF, \
19    N_PCI_FF, N_PCI_FC
20
21# Typing imports
22from typing import (
23    cast,
24    Iterable,
25    Iterator,
26    Optional,
27    Union,
28    List,
29    Tuple,
30    Dict,
31    Any,
32    Type,
33)
34
35
36class ISOTPMessageBuilderIter(object):
37    """
38    Iterator class for ISOTPMessageBuilder
39    """
40    slots = ["builder"]
41
42    def __init__(self, builder):
43        # type: (ISOTPMessageBuilder) -> None
44        self.builder = builder
45
46    def __iter__(self):
47        # type: () -> ISOTPMessageBuilderIter
48        return self
49
50    def __next__(self):
51        # type: () -> ISOTP
52        while self.builder.count:
53            p = self.builder.pop()
54            if p is None:
55                break
56            else:
57                return p
58        raise StopIteration
59
60    next = __next__
61
62
63class ISOTPMessageBuilder(object):
64    """
65    Initialize a ISOTPMessageBuilder object
66
67    Utility class to build ISOTP messages out of CAN frames, used by both
68    ISOTP.defragment() and ISOTPSession.
69
70    This class attempts to interpret some CAN frames as ISOTP frames, both with
71    and without extended addressing at the same time. For example, if an
72    extended address of 07 is being used, all frames will also be interpreted
73    as ISOTP single-frame messages.
74
75    CAN frames are fed to an ISOTPMessageBuilder object with the feed() method
76    and the resulting ISOTP frames can be extracted using the pop() method.
77
78    :param use_ext_address: True for only attempting to defragment with
79                         extended addressing, False for only attempting
80                         to defragment without extended addressing,
81                         or None for both
82    :param rx_id: Destination Identifier
83    :param basecls: The class of packets that will be returned,
84                    defaults to ISOTP
85    """
86
87    class Bucket(object):
88        """
89        Helper class to store not finished ISOTP messages while building.
90        """
91
92        def __init__(self, total_len, first_piece, ts):
93            # type: (int, bytes, Union[EDecimal, float]) -> None
94            self.pieces = list()  # type: List[bytes]
95            self.total_len = total_len
96            self.current_len = 0
97            self.ready = None  # type: Optional[bytes]
98            self.tx_id = None  # type: Optional[int]
99            self.ext_address = None  # type: Optional[int]
100            self.time = ts  # type: Union[float, EDecimal]
101            self.push(first_piece)
102
103        def push(self, piece):
104            # type: (bytes) -> None
105            self.pieces.append(piece)
106            self.current_len += len(piece)
107            if self.current_len >= self.total_len:
108                isotp_data = b"".join(self.pieces)
109                self.ready = isotp_data[:self.total_len]
110
111    def __init__(
112            self,
113            use_ext_address=None,  # type: Optional[bool]
114            rx_id=None,  # type: Optional[Union[int, List[int], Iterable[int]]]
115            basecls=ISOTP  # type: Type[ISOTP]
116    ):
117        # type: (...) -> None
118        self.ready = []  # type: List[Tuple[int, Optional[int], ISOTPMessageBuilder.Bucket]]  # noqa: E501
119        self.buckets = {}  # type: Dict[Tuple[Optional[int], int, int], ISOTPMessageBuilder.Bucket]  # noqa: E501
120        self.use_ext_addr = use_ext_address
121        self.basecls = basecls
122        self.rx_ids = None  # type: Optional[Iterable[int]]
123        self.last_ff = None  # type: Optional[Tuple[Optional[int], int, int]]
124        self.last_ff_ex = None  # type: Optional[Tuple[Optional[int], int, int]]  # noqa: E501
125        if rx_id is not None:
126            if isinstance(rx_id, list):
127                self.rx_ids = rx_id
128            elif isinstance(rx_id, int):
129                self.rx_ids = [rx_id]
130            elif hasattr(rx_id, "__iter__"):
131                self.rx_ids = rx_id
132            else:
133                raise TypeError("Invalid type for argument rx_id!")
134
135    def feed(self, can):
136        # type: (Union[Iterable[Packet], Packet]) -> None
137        """Attempt to feed an incoming CAN frame into the state machine"""
138        if not isinstance(can, Packet) and hasattr(can, "__iter__"):
139            for p in can:
140                self.feed(p)
141            return
142
143        if not isinstance(can, Packet):
144            return
145
146        if self.rx_ids is not None and can.identifier not in self.rx_ids:
147            return
148
149        data = bytes(can.data)
150
151        if len(data) > 1 and self.use_ext_addr is not True:
152            self._try_feed(can.identifier, None, data, can.time)
153        if len(data) > 2 and self.use_ext_addr is not False:
154            ea = data[0]
155            self._try_feed(can.identifier, ea, data[1:], can.time)
156
157    @property
158    def count(self):
159        # type: () -> int
160        """Returns the number of ready ISOTP messages built from the provided
161        can frames
162
163        :return: Number of ready ISOTP messages
164        """
165        return len(self.ready)
166
167    def __len__(self):
168        # type: () -> int
169        return self.count
170
171    def pop(self, identifier=None, ext_addr=None):
172        # type: (Optional[int], Optional[int]) -> Optional[ISOTP]
173        """Returns a built ISOTP message
174
175        :param identifier: if not None, only return isotp messages with this
176                           destination
177        :param ext_addr: if identifier is not None, only return isotp messages
178                         with this extended address for destination
179        :returns: an ISOTP packet, or None if no message is ready
180        """
181
182        if identifier is not None:
183            for i in range(len(self.ready)):
184                b = self.ready[i]
185                iden = b[0]
186                ea = b[1]
187                if iden == identifier and ext_addr == ea:
188                    return ISOTPMessageBuilder._build(self.ready.pop(i),
189                                                      self.basecls)
190            return None
191
192        if len(self.ready) > 0:
193            return ISOTPMessageBuilder._build(self.ready.pop(0), self.basecls)
194        return None
195
196    def __iter__(self):
197        # type: () -> ISOTPMessageBuilderIter
198        return ISOTPMessageBuilderIter(self)
199
200    @staticmethod
201    def _build(
202            t,  # type: Tuple[int, Optional[int], ISOTPMessageBuilder.Bucket]
203            basecls=ISOTP  # type: Type[ISOTP]
204    ):
205        # type: (...) -> ISOTP
206        bucket = t[2]
207        data = bucket.ready or b""
208        try:
209            p = basecls(data)
210        except Exception:
211            if conf.debug_dissector:
212                from scapy.sendrecv import debug
213                debug.crashed_on = (basecls, data)
214            raise
215        if hasattr(p, "rx_id"):
216            p.rx_id = t[0]
217        if hasattr(p, "rx_ext_address"):
218            p.rx_ext_address = t[1]
219        if hasattr(p, "tx_id"):
220            p.tx_id = bucket.tx_id
221        if hasattr(p, "ext_address"):
222            p.ext_address = bucket.ext_address
223        if hasattr(p, "time"):
224            p.time = bucket.time
225        return p
226
227    def _feed_first_frame(self, identifier, ea, data, ts):
228        # type: (int, Optional[int], bytes, Union[EDecimal, float]) -> bool
229        if len(data) < 3:
230            # At least 3 bytes are necessary: 2 for length and 1 for data
231            return False
232
233        header = struct.unpack('>H', bytes(data[:2]))[0]
234        expected_length = header & 0x0fff
235        isotp_data = data[2:]
236        if expected_length == 0 and len(data) >= 6:
237            expected_length = struct.unpack('>I', bytes(data[2:6]))[0]
238            isotp_data = data[6:]
239
240        key = (ea, identifier, 1)
241        if ea is None:
242            self.last_ff = key
243        else:
244            self.last_ff_ex = key
245        self.buckets[key] = self.Bucket(expected_length, isotp_data, ts)
246        return True
247
248    def _feed_single_frame(self, identifier, ea, data, ts):
249        # type: (int, Optional[int], bytes, Union[EDecimal, float]) -> bool
250        if len(data) < 2:
251            # At least 2 bytes are necessary: 1 for length and 1 for data
252            return False
253
254        length = data[0] & 0x0f
255        isotp_data = data[1:length + 1]
256
257        if length > len(isotp_data):
258            # CAN frame has less data than expected
259            return False
260
261        self.ready.append((identifier, ea,
262                           self.Bucket(length, isotp_data, ts)))
263        return True
264
265    def _feed_consecutive_frame(self, identifier, ea, data):
266        # type: (int, Optional[int], bytes) -> bool
267        if len(data) < 2:
268            # At least 2 bytes are necessary: 1 for sequence number and
269            # 1 for data
270            return False
271
272        first_byte = data[0]
273        seq_no = first_byte & 0x0f
274        isotp_data = data[1:]
275
276        key = (ea, identifier, seq_no)
277        bucket = self.buckets.pop(key, None)
278
279        if bucket is None:
280            # There is no message constructor waiting for this frame
281            return False
282
283        bucket.push(isotp_data)
284        if bucket.ready is None:
285            # full ISOTP message is not ready yet, put it back in
286            # buckets list
287            next_seq = (seq_no + 1) % 16
288            key = (ea, identifier, next_seq)
289            self.buckets[key] = bucket
290        else:
291            self.ready.append((identifier, ea, bucket))
292
293        return True
294
295    def _feed_flow_control_frame(self, identifier, ea, data):
296        # type: (int, Optional[int], bytes) -> bool
297        if len(data) < 3:
298            # At least 2 bytes are necessary: 1 for sequence number and
299            # 1 for data
300            return False
301
302        keys = [x for x in (self.last_ff, self.last_ff_ex) if x is not None]
303        buckets = [self.buckets.pop(k, None) for k in keys]
304
305        self.last_ff = None
306        self.last_ff_ex = None
307
308        if not any(buckets) or not any(keys):
309            # There is no message constructor waiting for this frame
310            return False
311
312        for key, bucket in zip(keys, buckets):
313            if bucket is None:
314                continue
315            bucket.tx_id = identifier
316            bucket.ext_address = ea
317            self.buckets[key] = bucket
318        return True
319
320    def _try_feed(self, identifier, ea, data, ts):
321        # type: (int, Optional[int], bytes, Union[EDecimal, float]) -> None
322        first_byte = data[0]
323        if len(data) > 1 and first_byte & 0xf0 == N_PCI_SF:
324            self._feed_single_frame(identifier, ea, data, ts)
325        if len(data) > 2 and first_byte & 0xf0 == N_PCI_FF:
326            self._feed_first_frame(identifier, ea, data, ts)
327        if len(data) > 1 and first_byte & 0xf0 == N_PCI_CF:
328            self._feed_consecutive_frame(identifier, ea, data)
329        if len(data) > 1 and first_byte & 0xf0 == N_PCI_FC:
330            self._feed_flow_control_frame(identifier, ea, data)
331
332
333class ISOTPSession(DefaultSession):
334    """Defragment ISOTP packets 'on-the-flow'.
335
336    Usage:
337        >>> sniff(session=ISOTPSession)
338    """
339
340    def __init__(self, *args, **kwargs):
341        # type: (Any, Any) -> None
342        self.m = ISOTPMessageBuilder(
343            use_ext_address=kwargs.pop("use_ext_address", None),
344            rx_id=kwargs.pop("rx_id", None),
345            basecls=kwargs.pop("basecls", ISOTP))
346        super(ISOTPSession, self).__init__(*args, **kwargs)
347
348    def recv(self, sock: SuperSocket) -> Iterator[Packet]:
349        """
350        Will be called by sniff() to ask for a packet
351        """
352        pkt = sock.recv()
353        if not pkt:
354            return
355        self.m.feed(pkt)
356        while len(self.m) > 0:
357            rcvd = cast(Optional[Packet], self.m.pop())
358            if rcvd:
359                rcvd = self.process(rcvd)
360            if rcvd:
361                yield rcvd
362