# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Andreas Korb <andreas.korb@e-mundo.de>
# Copyright (C) Friedrich Feigel <friedrich.feigel@e-mundo.de>
# Copyright (C) Nils Weiss <nils@we155.de>

# scapy.contrib.description = OnBoardDiagnosticScanner
# scapy.contrib.status = loads

import copy

from scapy.contrib.automotive.obd.obd import OBD, OBD_S03, OBD_S07, OBD_S0A, \
    OBD_S01, OBD_S06, OBD_S08, OBD_S09, OBD_NR, OBD_S02, OBD_S02_Record
from scapy.config import conf
from scapy.packet import Packet
from scapy.themes import BlackAndWhite

from scapy.contrib.automotive.scanner.enumerator import ServiceEnumerator, \
    _AutomotiveTestCaseScanResult, _AutomotiveTestCaseFilteredScanResult
from scapy.contrib.automotive.scanner.executor import \
    AutomotiveTestCaseExecutor
from scapy.contrib.automotive.ecu import EcuState
from scapy.contrib.automotive.scanner.test_case import AutomotiveTestCaseABC, \
    _SocketUnion

# Typing imports
from typing import (
    List,
    Type,
    Any,
    Iterable,
)


def _repr_without_color(obj):
    # type: (Any) -> str
    """
    Return ``repr(obj)`` rendered with the ``BlackAndWhite`` color theme.

    ``conf.color_theme`` is a global setting, so it is restored in a
    ``finally`` clause: an exception raised by ``repr`` must not leave the
    theme of the whole session changed.

    :param obj: Object to render
    :return: The uncolored representation of ``obj``
    """
    backup_ct = conf.color_theme
    conf.color_theme = BlackAndWhite()
    try:
        return repr(obj)
    finally:
        conf.color_theme = backup_ct


class OBD_Enumerator(ServiceEnumerator):
    """
    Common base class of all OBD enumerators in this module.

    Extends the supported kwargs of ``ServiceEnumerator`` by ``full_scan``
    and provides the helpers to label OBD negative responses.
    """
    # Copy first, so the update does not modify the dict of the parent class.
    _supported_kwargs = copy.copy(ServiceEnumerator._supported_kwargs)
    _supported_kwargs.update({
        'full_scan': (bool, None),
    })

    _supported_kwargs_doc = ServiceEnumerator._supported_kwargs_doc + """
        :param bool full_scan: Specifies if the entire scan range is tested, or
                               if the bitmask with supported identifiers is
                               queried and only supported identifiers
                               are scanned."""

    @staticmethod
    def _get_negative_response_code(resp):
        # type: (Packet) -> int
        """Return the ``response_code`` field of a negative response."""
        return resp.response_code

    @staticmethod
    def _get_negative_response_desc(nrc):
        # type: (int) -> str
        """Return the textual form of the negative response code ``nrc``."""
        return OBD_NR(response_code=nrc).sprintf("%OBD_NR.response_code%")

    @staticmethod
    def _get_negative_response_label(response):
        # type: (Packet) -> str
        """Return the label ("NR: <code>") used for a negative response."""
        return response.sprintf("NR: %OBD_NR.response_code%")

    @property
    def filtered_results(self):
        # type: () -> List[_AutomotiveTestCaseFilteredScanResult]
        """Only results with a positive response are reported."""
        return self.results_with_positive_response


class OBD_Service_Enumerator(OBD_Enumerator):
    """
    Base class for OBD_Service_Enumerators
    """

    def get_supported(self, socket, state, **kwargs):
        # type: (_SocketUnion, EcuState, Any) -> List[int]
        """
        Query every 0x20th identifier and collect the identifiers which are
        listed in the positive responses.

        :param socket: Socket used for the scan
        :param state: ECU state the scan is executed in
        :param kwargs: Forwarded to ``ServiceEnumerator.execute``
        :return: Unique identifiers found in the responses, without the
                 multiples of 0x20 that were queried here
        """
        super(OBD_Service_Enumerator, self).execute(
            socket, state, scan_range=range(0, 0xff, 0x20),
            exit_scan_on_first_negative_response=True, **kwargs)

        supported = list()
        for _, _, r, _, _ in self.results_with_positive_response:
            dr = r.data_records[0]
            # Name of the first field of the innermost layer of the record.
            key = next(iter(dr.lastlayer().fields))
            try:
                # The last two characters of every entry are parsed as a hex
                # identifier. The fallback "xxx00" yields 0, which is dropped
                # by the filter below.
                supported += [int(i[-2:], 16) for i in
                              getattr(dr, key, ["xxx00"])]
            except TypeError:
                # Best effort: presumably the field value is not a list of
                # strings in this case, so this record is skipped.
                pass
        # Deduplicate and drop the multiples of 0x20 (queried above).
        return list({i for i in supported if i % 0x20})

    def execute(self, socket, state, **kwargs):
        # type: (_SocketUnion, EcuState, Any) -> None
        full_scan = kwargs.pop("full_scan", False)  # type: bool
        if full_scan:
            super(OBD_Service_Enumerator, self).execute(socket, state, **kwargs)
        else:
            supported_pids = self.get_supported(socket, state, **kwargs)
            # Drop the request iterator left over from get_supported, before
            # the scan is executed again with the supported identifiers only.
            del self._request_iterators[state]
            super(OBD_Service_Enumerator, self).execute(
                socket, state, scan_range=supported_pids, **kwargs)

    execute.__doc__ = OBD_Enumerator._supported_kwargs_doc

    @staticmethod
    def print_payload(resp):
        # type: (Packet) -> str
        """
        Return the uncolored representation of the innermost layer of the
        first data record of ``resp``.
        """
        return _repr_without_color(resp.data_records[0].lastlayer())

    def _get_table_entry_z(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """Return the table cell content for the response of ``tup``."""
        return self._get_label(tup[2], self.print_payload)


class OBD_DTC_Enumerator(OBD_Enumerator):
    """
    Base class for the enumerators of the services which return DTCs
    """

    @staticmethod
    def print_payload(resp):
        # type: (Packet) -> str
        """Return the uncolored representation of ``resp.dtcs``."""
        return _repr_without_color(resp.dtcs)


class OBD_S03_Enumerator(OBD_DTC_Enumerator):
    _description = "Available DTCs in OBD service 03"

    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """Return the single service 03 request."""
        return [OBD() / OBD_S03()]

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        return "Service 03"

    def _get_table_entry_y(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """
        Return "Timeout" without a response, "NR" for a response with
        service 0x7f, otherwise the number of DTCs of the response.
        """
        resp = tup[2]
        if resp is None:
            return "Timeout"
        else:
            return "NR" if resp.service == 0x7f else "%d DTCs" % resp.count


class OBD_S07_Enumerator(OBD_DTC_Enumerator):
    _description = "Available DTCs in OBD service 07"

    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """Return the single service 07 request."""
        return [OBD() / OBD_S07()]

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        return "Service 07"


class OBD_S0A_Enumerator(OBD_DTC_Enumerator):
    # The service identifier is written in hex, like in all other
    # descriptions and in the table label below.
    _description = "Available DTCs in OBD service 0A"

    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """Return the single service 0A request."""
        return [OBD() / OBD_S0A()]

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        return "Service 0A"


class OBD_S01_Enumerator(OBD_Service_Enumerator):
    """OBD_S01_Enumerator"""

    _description = "Available data in OBD service 01"

    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """Return one service 01 request per pid in ``scan_range``."""
        scan_range = kwargs.pop("scan_range", range(0x100))  # type: Iterable[int]  # noqa: E501
        return (OBD() / OBD_S01(pid=[x]) for x in scan_range)

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        return "Service 01"

    def _get_table_entry_y(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """
        Return "Timeout" without a response, "NR" for a response with
        service 0x7f, otherwise the name of the innermost layer of the
        first data record.
        """
        resp = tup[2]
        if resp is None:
            return "Timeout"
        else:
            return "NR" if resp.service == 0x7f else \
                "%s" % resp.data_records[0].lastlayer().name


class OBD_S02_Enumerator(OBD_Service_Enumerator):
    _description = "Available data in OBD service 02"

    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """Return one service 02 request per pid in ``scan_range``."""
        scan_range = kwargs.pop("scan_range", range(0x100))  # type: Iterable[int]  # noqa: E501
        return (OBD() / OBD_S02(requests=[OBD_S02_Record(pid=[x])])
                for x in scan_range)

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        return "Service 02"

    def _get_table_entry_y(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """
        Return "Timeout" without a response, "NR" for a response with
        service 0x7f, otherwise the name of the innermost layer of the
        first data record.
        """
        resp = tup[2]
        if resp is None:
            return "Timeout"
        else:
            return "NR" if resp.service == 0x7f else \
                "%s" % resp.data_records[0].lastlayer().name


class OBD_S06_Enumerator(OBD_Service_Enumerator):
    _description = "Available data in OBD service 06"

    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """Return one service 06 request per mid in ``scan_range``."""
        scan_range = kwargs.pop("scan_range", range(0x100))  # type: Iterable[int]  # noqa: E501
        return (OBD() / OBD_S06(mid=[x]) for x in scan_range)

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        return "Service 06"

    def _get_table_entry_y(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """
        Return "Timeout" without a response, "NR" for a response with
        service 0x7f, otherwise the requested mid in hex followed by the
        mid of the first data record as text.
        """
        req = tup[1]
        resp = tup[2]
        if resp is None:
            return "Timeout"
        else:
            return "NR" if resp.service == 0x7f else \
                "0x%02x %s" % (
                    req.mid[0],
                    resp.data_records[0].sprintf("%OBD_S06_PR_Record.mid%"))


class OBD_S08_Enumerator(OBD_Service_Enumerator):
    _description = "Available data in OBD service 08"

    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """Return one service 08 request per tid in ``scan_range``."""
        scan_range = kwargs.pop("scan_range", range(0x100))  # type: Iterable[int]  # noqa: E501
        return (OBD() / OBD_S08(tid=[x]) for x in scan_range)

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        return "Service 08"

    def _get_table_entry_y(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """
        Return "Timeout" without a response, "NR" for a response with
        service 0x7f, otherwise the requested tid in hex followed by the
        name of the innermost layer of the first data record.
        """
        resp = tup[2]
        if resp is None:
            return "Timeout"
        else:
            return "NR" if resp.service == 0x7f else "0x%02x %s" % (
                tup[1].tid[0], resp.data_records[0].lastlayer().name)


class OBD_S09_Enumerator(OBD_Service_Enumerator):
    _description = "Available data in OBD service 09"

    def _get_initial_requests(self, **kwargs):
        # type: (Any) -> Iterable[Packet]
        """Return one service 09 request per iid in ``scan_range``."""
        scan_range = kwargs.pop("scan_range", range(0x100))  # type: Iterable[int]  # noqa: E501
        return (OBD() / OBD_S09(iid=[x]) for x in scan_range)

    def _get_table_entry_x(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        return "Service 09"

    def _get_table_entry_y(self, tup):
        # type: (_AutomotiveTestCaseScanResult) -> str
        """
        Return "Timeout" without a response, "NR" for a response with
        service 0x7f, otherwise the requested iid in hex followed by the
        name of the innermost layer of the first data record.
        """
        resp = tup[2]
        if resp is None:
            return "Timeout"
        else:
            return "NR" if resp.service == 0x7f else \
                "0x%02x %s" % (tup[1].iid[0],
                               resp.data_records[0].lastlayer().name)


class OBD_Scanner(AutomotiveTestCaseExecutor):
    """
    Executor which runs the OBD enumerators of this module
    """

    @property
    def enumerators(self):
        # type: () -> List[AutomotiveTestCaseABC]
        """Return the test cases of the current configuration."""
        return self.configuration.test_cases

    @property
    def default_test_case_clss(self):
        # type: () -> List[Type[AutomotiveTestCaseABC]]
        """Return the enumerator classes which are used by default."""
        return [OBD_S01_Enumerator, OBD_S02_Enumerator, OBD_S06_Enumerator,
                OBD_S08_Enumerator, OBD_S09_Enumerator, OBD_S03_Enumerator,
                OBD_S07_Enumerator, OBD_S0A_Enumerator]