• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Andreas Korb <andreas.korb@e-mundo.de>
5# Copyright (C) Friedrich Feigel <friedrich.feigel@e-mundo.de>
6# Copyright (C) Nils Weiss <nils@we155.de>
7
8# scapy.contrib.description = OnBoardDiagnosticScanner
9# scapy.contrib.status = loads
10
11import copy
12
13from scapy.contrib.automotive.obd.obd import OBD, OBD_S03, OBD_S07, OBD_S0A, \
14    OBD_S01, OBD_S06, OBD_S08, OBD_S09, OBD_NR, OBD_S02, OBD_S02_Record
15from scapy.config import conf
16from scapy.packet import Packet
17from scapy.themes import BlackAndWhite
18
19from scapy.contrib.automotive.scanner.enumerator import ServiceEnumerator, \
20    _AutomotiveTestCaseScanResult, _AutomotiveTestCaseFilteredScanResult
21from scapy.contrib.automotive.scanner.executor import \
22    AutomotiveTestCaseExecutor
23from scapy.contrib.automotive.ecu import EcuState
24from scapy.contrib.automotive.scanner.test_case import AutomotiveTestCaseABC, \
25    _SocketUnion
26
27# Typing imports
28from typing import (
29    List,
30    Type,
31    Any,
32    Iterable,
33)
34
35
class OBD_Enumerator(ServiceEnumerator):
    """
    Common base class of the OBD enumerators in this module.

    It extends the keyword arguments accepted by ``ServiceEnumerator`` with
    ``full_scan`` and supplies the helpers which read and label the
    ``response_code`` of an ``OBD_NR`` negative response.
    """

    # Work on a shallow copy so the mapping owned by ServiceEnumerator is
    # not modified when the OBD specific argument is registered.
    _supported_kwargs = copy.copy(ServiceEnumerator._supported_kwargs)
    _supported_kwargs['full_scan'] = (bool, None)

    _supported_kwargs_doc = ServiceEnumerator._supported_kwargs_doc + """
        :param bool full_scan: Specifies if the entire scan range is tested, or
                               if the bitmask with supported identifiers is
                               queried and only supported identifiers
                               are scanned."""

    @staticmethod
    def _get_negative_response_code(resp):
        # type: (Packet) -> int
        """Return the ``response_code`` attribute of the given response."""
        code = resp.response_code
        return code

    @staticmethod
    def _get_negative_response_desc(nrc):
        # type: (int) -> str
        """Return the textual form of a negative response code.

        A temporary ``OBD_NR`` packet is built around ``nrc`` and its
        ``response_code`` field is rendered through ``sprintf``.
        """
        nr_packet = OBD_NR(response_code=nrc)
        return nr_packet.sprintf("%OBD_NR.response_code%")

    @staticmethod
    def _get_negative_response_label(response):
        # type: (Packet) -> str
        """Return the ``"NR: ..."`` label rendered from ``response``."""
        label = response.sprintf("NR: %OBD_NR.response_code%")
        return label

    @property
    def filtered_results(self):
        # type: () -> List[_AutomotiveTestCaseFilteredScanResult]
        """Results of interest: exactly the results with positive response."""
        positive_results = self.results_with_positive_response
        return positive_results
67
68
class OBD_Service_Enumerator(OBD_Enumerator):
    """
    Base class for OBD_Service_Enumerators

    Unless ``full_scan`` is requested, the identifiers 0x00, 0x20, ... are
    queried first (see ``get_supported``) and only the identifiers extracted
    from the positive responses are scanned afterwards.
    """

    def get_supported(self, socket, state, **kwargs):
        # type: (_SocketUnion, EcuState, Any) -> List[int]
        """
        Query every 0x20-th identifier and collect the identifiers found in
        the positive responses.

        :param socket: socket handed to the parent ``execute``
        :param state: ECU state handed to the parent ``execute``
        :param kwargs: further arguments forwarded to the parent ``execute``
        :return: identifiers parsed from the responses, without duplicates
                 and without multiples of 0x20 (those are the identifiers
                 already queried here). The order is not defined.
        """
        super(OBD_Service_Enumerator, self).execute(
            socket, state, scan_range=range(0, 0xff, 0x20),
            exit_scan_on_first_negative_response=True, **kwargs)

        supported = list()  # type: List[int]
        for _, _, r, _, _ in self.results_with_positive_response:
            dr = r.data_records[0]
            # Name of the first field of the innermost layer of the record.
            key = next(iter(dr.lastlayer().fields))
            try:
                # Presumably the field holds names whose last two characters
                # are the identifier in hex - TODO confirm against the OBD
                # packet definitions.
                supported += [int(i[-2:], 16) for i in
                              getattr(dr, key, ["xxx00"])]
            except TypeError:
                # Deliberate best effort: a field value that can not be
                # processed this way contributes no identifiers.
                pass
        return list({i for i in supported if i % 0x20})

    def execute(self, socket, state, **kwargs):
        # type: (_SocketUnion, EcuState, Any) -> None
        # No docstring here on purpose: ``execute.__doc__`` is assigned
        # right after this definition.
        full_scan = kwargs.pop("full_scan", False)  # type: bool
        if full_scan:
            super(OBD_Service_Enumerator, self).execute(socket, state, **kwargs)
        else:
            supported_pids = self.get_supported(socket, state, **kwargs)
            # Drop the request iterator left behind by get_supported() so
            # the following call starts with the new scan_range.
            del self._request_iterators[state]
            super(OBD_Service_Enumerator, self).execute(
                socket, state, scan_range=supported_pids, **kwargs)

    execute.__doc__ = OBD_Enumerator._supported_kwargs_doc

    @staticmethod
    def print_payload(resp):
        # type: (Packet) -> str
        """
        Return the ``repr`` of the innermost layer of the first data record.

        ``conf.color_theme`` is switched to ``BlackAndWhite`` while the
        representation is built and is restored afterwards, also if building
        the representation raises.

        :param resp: response providing ``data_records``
        :return: representation of ``resp.data_records[0].lastlayer()``
        """
        backup_ct = conf.color_theme
        conf.color_theme = BlackAndWhite()
        try:
            return repr(resp.data_records[0].lastlayer())
        finally:
            # Always restore the global theme, otherwise an exception above
            # would leave the whole session in black and white.
            conf.color_theme = backup_ct

    def _get_table_entry_z(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the label built from the response (``tup[2]``), using
        ``print_payload`` as formatter."""
        return self._get_label(tup[2], self.print_payload)
116
117
class OBD_DTC_Enumerator(OBD_Enumerator):
    """
    Base class of the enumerators whose responses carry a ``dtcs`` attribute.
    """

    @staticmethod
    def print_payload(resp):
        # type: (Packet) -> str
        """
        Return the ``repr`` of ``resp.dtcs``.

        ``conf.color_theme`` is switched to ``BlackAndWhite`` while the
        representation is built and is restored afterwards, also if building
        the representation raises.

        :param resp: response providing ``dtcs``
        :return: representation of ``resp.dtcs``
        """
        backup_ct = conf.color_theme
        conf.color_theme = BlackAndWhite()
        try:
            return repr(resp.dtcs)
        finally:
            # Always restore the global theme, otherwise an exception above
            # would leave the whole session in black and white.
            conf.color_theme = backup_ct
127
128
class OBD_S03_Enumerator(OBD_DTC_Enumerator):
    """Enumerator which sends a single ``OBD_S03`` request."""

    _description = "Available DTCs in OBD service 03"

    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """Return the one request of this enumerator; ``kwargs`` is unused."""
        request = OBD() / OBD_S03()
        return [request]

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the constant column label of this service."""
        return "Service 03"

    def _get_table_entry_y(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the row label derived from the response in ``tup[2]``.

        ``"Timeout"`` without a response, ``"NR"`` if the response service
        is 0x7f, otherwise the number of DTCs taken from ``count``.
        """
        response = tup[2]
        if response is None:
            return "Timeout"
        if response.service == 0x7f:
            return "NR"
        return "%d DTCs" % response.count
147
148
class OBD_S07_Enumerator(OBD_DTC_Enumerator):
    """Enumerator which sends a single ``OBD_S07`` request."""

    _description = "Available DTCs in OBD service 07"

    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """Return the one request of this enumerator; ``kwargs`` is unused."""
        request = OBD() / OBD_S07()
        return [request]

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the constant column label of this service."""
        return "Service 07"
159
160
class OBD_S0A_Enumerator(OBD_DTC_Enumerator):
    """Enumerator which sends a single ``OBD_S0A`` request."""

    # NOTE(review): the description spells the service as "10" while the
    # column label below uses "0A" - presumably decimal vs. hex notation of
    # the same service; confirm before unifying.
    _description = "Available DTCs in OBD service 10"

    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """Return the one request of this enumerator; ``kwargs`` is unused."""
        request = OBD() / OBD_S0A()
        return [request]

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the constant column label of this service."""
        return "Service 0A"
171
172
class OBD_S01_Enumerator(OBD_Service_Enumerator):
    """OBD_S01_Enumerator"""

    _description = "Available data in OBD service 01"

    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """Lazily build one ``OBD_S01`` request per identifier.

        The identifiers come from the ``scan_range`` keyword argument and
        default to ``range(0x100)``.
        """
        pids = kwargs.pop("scan_range", range(0x100))  # type: Iterable[int]
        return (OBD() / OBD_S01(pid=[pid]) for pid in pids)

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the constant column label of this service."""
        return "Service 01"

    def _get_table_entry_y(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the row label derived from the response in ``tup[2]``.

        ``"Timeout"`` without a response, ``"NR"`` if the response service
        is 0x7f, otherwise the name of the innermost layer of the first
        data record.
        """
        response = tup[2]
        if response is None:
            return "Timeout"
        if response.service == 0x7f:
            return "NR"
        innermost = response.data_records[0].lastlayer()
        return "%s" % innermost.name
195
196
class OBD_S02_Enumerator(OBD_Service_Enumerator):
    """Enumerator which sends ``OBD_S02`` requests, one record each."""

    _description = "Available data in OBD service 02"

    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """Lazily build one ``OBD_S02`` request per identifier.

        Every request carries a single ``OBD_S02_Record``. The identifiers
        come from the ``scan_range`` keyword argument and default to
        ``range(0x100)``.
        """
        pids = kwargs.pop("scan_range", range(0x100))  # type: Iterable[int]
        return (OBD() / OBD_S02(requests=[OBD_S02_Record(pid=[pid])])
                for pid in pids)

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the constant column label of this service."""
        return "Service 02"

    def _get_table_entry_y(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the row label derived from the response in ``tup[2]``.

        ``"Timeout"`` without a response, ``"NR"`` if the response service
        is 0x7f, otherwise the name of the innermost layer of the first
        data record.
        """
        response = tup[2]
        if response is None:
            return "Timeout"
        if response.service == 0x7f:
            return "NR"
        innermost = response.data_records[0].lastlayer()
        return "%s" % innermost.name
218
219
class OBD_S06_Enumerator(OBD_Service_Enumerator):
    """Enumerator which sends ``OBD_S06`` requests, one ``mid`` each."""

    _description = "Available data in OBD service 06"

    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """Lazily build one ``OBD_S06`` request per identifier.

        The identifiers come from the ``scan_range`` keyword argument and
        default to ``range(0x100)``.
        """
        mids = kwargs.pop("scan_range", range(0x100))  # type: Iterable[int]
        return (OBD() / OBD_S06(mid=[mid]) for mid in mids)

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the constant column label of this service."""
        return "Service 06"

    def _get_table_entry_y(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the row label derived from request and response.

        ``"Timeout"`` without a response, ``"NR"`` if the response service
        is 0x7f, otherwise the requested ``mid`` (``tup[1]``) in hex followed
        by the ``mid`` field rendered from the first data record.
        """
        response = tup[2]
        if response is None:
            return "Timeout"
        if response.service == 0x7f:
            return "NR"
        request = tup[1]
        rendered_mid = response.data_records[0].sprintf(
            "%OBD_S06_PR_Record.mid%")
        return "0x%02x %s" % (request.mid[0], rendered_mid)
243
244
class OBD_S08_Enumerator(OBD_Service_Enumerator):
    """Enumerator which sends ``OBD_S08`` requests, one ``tid`` each."""

    _description = "Available data in OBD service 08"

    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """Lazily build one ``OBD_S08`` request per identifier.

        The identifiers come from the ``scan_range`` keyword argument and
        default to ``range(0x100)``.
        """
        tids = kwargs.pop("scan_range", range(0x100))  # type: Iterable[int]
        return (OBD() / OBD_S08(tid=[tid]) for tid in tids)

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the constant column label of this service."""
        return "Service 08"

    def _get_table_entry_y(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the row label derived from request and response.

        ``"Timeout"`` without a response, ``"NR"`` if the response service
        is 0x7f, otherwise the requested ``tid`` (``tup[1]``) in hex followed
        by the name of the innermost layer of the first data record.
        """
        response = tup[2]
        if response is None:
            return "Timeout"
        if response.service == 0x7f:
            return "NR"
        requested_tid = tup[1].tid[0]
        layer_name = response.data_records[0].lastlayer().name
        return "0x%02x %s" % (requested_tid, layer_name)
265
266
class OBD_S09_Enumerator(OBD_Service_Enumerator):
    """Enumerator which sends ``OBD_S09`` requests, one ``iid`` each."""

    _description = "Available data in OBD service 09"

    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """Lazily build one ``OBD_S09`` request per identifier.

        The identifiers come from the ``scan_range`` keyword argument and
        default to ``range(0x100)``.
        """
        iids = kwargs.pop("scan_range", range(0x100))  # type: Iterable[int]
        return (OBD() / OBD_S09(iid=[iid]) for iid in iids)

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the constant column label of this service."""
        return "Service 09"

    def _get_table_entry_y(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the row label derived from request and response.

        ``"Timeout"`` without a response, ``"NR"`` if the response service
        is 0x7f, otherwise the requested ``iid`` (``tup[1]``) in hex followed
        by the name of the innermost layer of the first data record.
        """
        response = tup[2]
        if response is None:
            return "Timeout"
        if response.service == 0x7f:
            return "NR"
        requested_iid = tup[1].iid[0]
        layer_name = response.data_records[0].lastlayer().name
        return "0x%02x %s" % (requested_iid, layer_name)
288
289
class OBD_Scanner(AutomotiveTestCaseExecutor):
    """Test case executor preconfigured with the OBD enumerators."""

    @property
    def enumerators(self):
        # type: () -> List[AutomotiveTestCaseABC]
        """Test cases held by the configuration of this executor."""
        test_cases = self.configuration.test_cases
        return test_cases

    @property
    def default_test_case_clss(self):
        # type: () -> List[Type[AutomotiveTestCaseABC]]
        """Enumerator classes used by default, in this order."""
        return [
            OBD_S01_Enumerator,
            OBD_S02_Enumerator,
            OBD_S06_Enumerator,
            OBD_S08_Enumerator,
            OBD_S09_Enumerator,
            OBD_S03_Enumerator,
            OBD_S07_Enumerator,
            OBD_S0A_Enumerator,
        ]
302