#! /usr/bin/env python

# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Nils Weiss <nils@we155.de>

# scapy.contrib.description = Diagnostic over IP (DoIP) / ISO 13400
# scapy.contrib.status = loads

import socket
import ssl
import struct
import time
from typing import (
    Any,
    Union,
    Tuple,
    Optional,
    Dict,
    Type,
)

from scapy.contrib.automotive import log_automotive
from scapy.contrib.automotive.uds import UDS
from scapy.data import MTU
from scapy.fields import (
    ByteEnumField,
    ConditionalField,
    IntField,
    MayEnd,
    StrFixedLenField,
    XByteEnumField,
    XByteField,
    XIntField,
    XShortEnumField,
    XShortField,
    XStrField,
)
from scapy.layers.inet import TCP, UDP
from scapy.packet import Packet, bind_layers, bind_bottom_up
from scapy.supersocket import SSLStreamSocket


# ISO 13400-2 sect 9.2


class DoIP(Packet):
    """
    Implementation of the DoIP (ISO 13400) protocol. DoIP packets can be sent
    via UDP and TCP. Depending on the payload type, the correct connection
    needs to be chosen:

    +--------------+--------------------------------------------------------------+-----------------+
    | Payload Type | Payload Type Name                                            | Connection Kind |
    +--------------+--------------------------------------------------------------+-----------------+
    | 0x0000       | Generic DoIP header negative acknowledge                     | UDP / TCP       |
    +--------------+--------------------------------------------------------------+-----------------+
    | 0x0001       | Vehicle Identification request message                       | UDP             |
    +--------------+--------------------------------------------------------------+-----------------+
    | 0x0002       | Vehicle identification request message with EID              | UDP             |
    +--------------+--------------------------------------------------------------+-----------------+
    | 0x0003       | Vehicle identification request message with VIN              | UDP             |
    +--------------+--------------------------------------------------------------+-----------------+
    | 0x0004       | Vehicle announcement message/vehicle identification response | UDP             |
    +--------------+--------------------------------------------------------------+-----------------+
    | 0x0005       | Routing activation request                                   | TCP             |
    +--------------+--------------------------------------------------------------+-----------------+
    | 0x0006       | Routing activation response                                  | TCP             |
    +--------------+--------------------------------------------------------------+-----------------+
    | 0x0007       | Alive Check request                                          | TCP             |
    +--------------+--------------------------------------------------------------+-----------------+
    | 0x0008       | Alive Check response                                         | TCP             |
    +--------------+--------------------------------------------------------------+-----------------+
    | 0x4001       | DoIP entity status request                                   | UDP             |
    +--------------+--------------------------------------------------------------+-----------------+
    | 0x4002       | DoIP entity status response                                  | UDP             |
    +--------------+--------------------------------------------------------------+-----------------+
    | 0x4003       | Diagnostic power mode information request                    | UDP             |
    +--------------+--------------------------------------------------------------+-----------------+
    | 0x4004       | Diagnostic power mode information response                   | UDP             |
    +--------------+--------------------------------------------------------------+-----------------+
    | 0x8001       | Diagnostic message                                           | TCP             |
    +--------------+--------------------------------------------------------------+-----------------+
    | 0x8002       | Diagnostic message positive acknowledgement                  | TCP             |
    +--------------+--------------------------------------------------------------+-----------------+
    | 0x8003       | Diagnostic message negative acknowledgement                  | TCP             |
    +--------------+--------------------------------------------------------------+-----------------+

    Example with UDP:
        >>> socket = L3RawSocket(iface="eth0")
        >>> resp = socket.sr1(IP(dst="169.254.117.238")/UDP(dport=13400)/DoIP(payload_type=1))

    Example with TCP:
        >>> socket = DoIPSocket("169.254.117.238")
        >>> pkt = DoIP(payload_type=0x8001, source_address=0xe80, target_address=0x1000) / UDS() / UDS_RDBI(identifiers=[0x1000])
        >>> resp = socket.sr1(pkt, timeout=1)

    Example with UDS:
        >>> socket = UDS_DoIPSocket("169.254.117.238")
        >>> pkt = UDS() / UDS_RDBI(identifiers=[0x1000])
        >>> resp = socket.sr1(pkt, timeout=1)
    """  # noqa: E501
    payload_types = {
        0x0000: "Generic DoIP header NACK",
        0x0001: "Vehicle identification request",
        0x0002: "Vehicle identification request with EID",
        0x0003: "Vehicle identification request with VIN",
        0x0004: "Vehicle announcement message/vehicle identification response message",  # noqa: E501
        0x0005: "Routing activation request",
        0x0006: "Routing activation response",
        0x0007: "Alive check request",
        0x0008: "Alive check response",
        0x4001: "DoIP entity status request",
        0x4002: "DoIP entity status response",
        0x4003: "Diagnostic power mode information request",
        0x4004: "Diagnostic power mode information response",
        0x8001: "Diagnostic message",
        0x8002: "Diagnostic message ACK",
        0x8003: "Diagnostic message NACK"}
    name = 'DoIP'
    # Every field after the 8 byte generic header is conditional on the
    # payload_type; only the fields of the selected payload type are
    # built and dissected.
    fields_desc = [
        XByteField("protocol_version", 0x02),
        XByteField("inverse_version", 0xFD),
        XShortEnumField("payload_type", 0, payload_types),
        IntField("payload_length", None),
        ConditionalField(ByteEnumField("nack", 0, {
            0: "Incorrect pattern format", 1: "Unknown payload type",
            2: "Message too large", 3: "Out of memory",
            4: "Invalid payload length"
        }), lambda p: p.payload_type in [0x0]),
        ConditionalField(StrFixedLenField("vin", b"", 17),
                         lambda p: p.payload_type in [3, 4]),
        ConditionalField(XShortField("logical_address", 0),
                         lambda p: p.payload_type in [4]),
        ConditionalField(StrFixedLenField("eid", b"", 6),
                         lambda p: p.payload_type in [2, 4]),
        ConditionalField(StrFixedLenField("gid", b"", 6),
                         lambda p: p.payload_type in [4]),
        ConditionalField(MayEnd(XByteEnumField("further_action", 0, {
            0x00: "No further action required",
            0x01: "Reserved by ISO 13400", 0x02: "Reserved by ISO 13400",
            0x03: "Reserved by ISO 13400", 0x04: "Reserved by ISO 13400",
            0x05: "Reserved by ISO 13400", 0x06: "Reserved by ISO 13400",
            0x07: "Reserved by ISO 13400", 0x08: "Reserved by ISO 13400",
            0x09: "Reserved by ISO 13400", 0x0a: "Reserved by ISO 13400",
            0x0b: "Reserved by ISO 13400", 0x0c: "Reserved by ISO 13400",
            0x0d: "Reserved by ISO 13400", 0x0e: "Reserved by ISO 13400",
            0x0f: "Reserved by ISO 13400",
            0x10: "Routing activation required to initiate central security",
        })), lambda p: p.payload_type in [4]),
        # VIN/GID sync. status is marked as optional, so the packet MayEnd
        # on further_action
        ConditionalField(XByteEnumField("vin_gid_status", 0, {
            0x00: "VIN and/or GID are synchronized",
            0x01: "Reserved by ISO 13400", 0x02: "Reserved by ISO 13400",
            0x03: "Reserved by ISO 13400", 0x04: "Reserved by ISO 13400",
            0x05: "Reserved by ISO 13400", 0x06: "Reserved by ISO 13400",
            0x07: "Reserved by ISO 13400", 0x08: "Reserved by ISO 13400",
            0x09: "Reserved by ISO 13400", 0x0a: "Reserved by ISO 13400",
            0x0b: "Reserved by ISO 13400", 0x0c: "Reserved by ISO 13400",
            0x0d: "Reserved by ISO 13400", 0x0e: "Reserved by ISO 13400",
            0x0f: "Reserved by ISO 13400",
            0x10: "Incomplete: VIN and GID are NOT synchronized"
        }), lambda p: p.payload_type in [4]),
        ConditionalField(XShortField("source_address", 0),
                         lambda p: p.payload_type in [5, 8, 0x8001, 0x8002, 0x8003]),  # noqa: E501
        ConditionalField(XByteEnumField("activation_type", 0, {
            0: "Default", 1: "WWH-OBD", 0xe0: "Central security",
            0x16: "Default", 0x116: "Diagnostic", 0xe016: "Central security"
        }), lambda p: p.payload_type in [5]),
        ConditionalField(XShortField("logical_address_tester", 0),
                         lambda p: p.payload_type in [6]),
        ConditionalField(XShortField("logical_address_doip_entity", 0),
                         lambda p: p.payload_type in [6]),
        ConditionalField(XByteEnumField("routing_activation_response", 0, {
            0x00: "Routing activation denied due to unknown source address.",
            0x01: "Routing activation denied because all concurrently supported TCP_DATA sockets are registered and active.",  # noqa: E501
            0x02: "Routing activation denied because an SA different from the table connection entry was received on the already activated TCP_DATA socket.",  # noqa: E501
            0x03: "Routing activation denied because the SA is already registered and active on a different TCP_DATA socket.",  # noqa: E501
            0x04: "Routing activation denied due to missing authentication.",
            0x05: "Routing activation denied due to rejected confirmation.",
            0x06: "Routing activation denied due to unsupported routing activation type.",  # noqa: E501
            0x07: "Routing activation denied because the specified activation type requires a secure TLS TCP_DATA socket.",  # noqa: E501
            0x08: "Reserved by ISO 13400.",
            0x09: "Reserved by ISO 13400.", 0x0a: "Reserved by ISO 13400.",
            0x0b: "Reserved by ISO 13400.", 0x0c: "Reserved by ISO 13400.",
            0x0d: "Reserved by ISO 13400.", 0x0e: "Reserved by ISO 13400.",
            0x0f: "Reserved by ISO 13400.",
            0x10: "Routing successfully activated.",
            0x11: "Routing will be activated; confirmation required."
        }), lambda p: p.payload_type in [6]),
        ConditionalField(XIntField("reserved_iso", 0),
                         lambda p: p.payload_type in [5, 6]),
        ConditionalField(XStrField("reserved_oem", b""),
                         lambda p: p.payload_type in [5, 6]),
        ConditionalField(XByteEnumField("diagnostic_power_mode", 0, {
            0: "not ready", 1: "ready", 2: "not supported"
        }), lambda p: p.payload_type in [0x4004]),
        ConditionalField(ByteEnumField("node_type", 0, {
            0: "DoIP gateway", 1: "DoIP node"
        }), lambda p: p.payload_type in [0x4002]),
        ConditionalField(XByteField("max_open_sockets", 1),
                         lambda p: p.payload_type in [0x4002]),
        ConditionalField(XByteField("cur_open_sockets", 0),
                         lambda p: p.payload_type in [0x4002]),
        ConditionalField(IntField("max_data_size", 0),
                         lambda p: p.payload_type in [0x4002]),
        ConditionalField(XShortField("target_address", 0),
                         lambda p: p.payload_type in [0x8001, 0x8002, 0x8003]),  # noqa: E501
        ConditionalField(XByteEnumField("ack_code", 0, {0: "ACK"}),
                         lambda p: p.payload_type in [0x8002]),
        ConditionalField(ByteEnumField("nack_code", 0, {
            0x00: "Reserved by ISO 13400", 0x01: "Reserved by ISO 13400",
            0x02: "Invalid source address", 0x03: "Unknown target address",
            0x04: "Diagnostic message too large", 0x05: "Out of memory",
            0x06: "Target unreachable", 0x07: "Unknown network",
            0x08: "Transport protocol error"
        }), lambda p: p.payload_type in [0x8003]),
        ConditionalField(XStrField("previous_msg", b""),
                         lambda p: p.payload_type in [0x8002, 0x8003])
    ]

    def answers(self, other):
        # type: (Packet) -> int
        """DEV: true if self is an answer from other"""
        if isinstance(other, type(self)):
            # A generic header NACK is accepted as answer to any request.
            if self.payload_type == 0:
                return 1

            # (response payload type, request payload type)
            matches = [(4, 1), (4, 2), (4, 3), (6, 5), (8, 7),
                       (0x4002, 0x4001), (0x4004, 0x4003),
                       (0x8001, 0x8001), (0x8003, 0x8001)]
            if (self.payload_type, other.payload_type) in matches:
                if self.payload_type == 0x8001:
                    # Diagnostic messages are matched by their payload.
                    return self.payload.answers(other.payload)
                return 1
        return 0

    def hashret(self):
        # type: () -> bytes
        """
        Return the hash used to pair requests and responses.

        The hash consists of the two version bytes. For diagnostic messages
        (0x8001 - 0x8003) the XOR of source and target address is appended,
        which is identical for both directions of a conversation.
        """
        if self.payload_type in [0x8001, 0x8002, 0x8003]:
            return bytes(self)[:2] + struct.pack(
                "H", self.target_address ^ self.source_address)
        return bytes(self)[:2]

    def post_build(self, pkt, pay):
        # type: (bytes, bytes) -> bytes
        """
        This will set the Field 'payload_length' to the correct value.
        """
        if self.payload_length is None:
            # The payload length counts everything after the 8 byte header.
            pkt = pkt[:4] + struct.pack(
                "!I", len(pay) + len(pkt) - 8) + pkt[8:]
        return pkt + pay

    def extract_padding(self, s):
        # type: (bytes) -> Tuple[bytes, Optional[bytes]]
        """
        Split the remaining bytes into payload and padding.

        Only diagnostic messages (0x8001) carry a payload. Its length is
        payload_length minus the 4 bytes of source and target address.
        """
        if self.payload_type == 0x8001:
            # Clamp to zero: a bogus payload_length smaller than 4 would
            # otherwise yield a negative index and slice from the end.
            data_len = max(self.payload_length - 4, 0)
            return s[:data_len], s[data_len:]
        else:
            return b"", s

    @classmethod
    def tcp_reassemble(cls, data, metadata, session):
        # type: (bytes, Dict[str, Any], Dict[str, Any]) -> Optional[Packet]
        """
        Return a DoIP packet as soon as ``data`` holds a complete message,
        otherwise None to signal that more data is required.
        """
        # The length field is only available once the complete 8 byte
        # generic header has been received.
        if len(data) < 8:
            return None
        length = struct.unpack("!I", data[4:8])[0] + 8
        if len(data) >= length:
            return DoIP(data)
        return None


bind_bottom_up(UDP, DoIP, sport=13400)
bind_bottom_up(UDP, DoIP, dport=13400)
bind_layers(UDP, DoIP, sport=13400, dport=13400)

bind_layers(TCP, DoIP, sport=13400)
bind_layers(TCP, DoIP, dport=13400)

bind_layers(DoIP, UDS, payload_type=0x8001)


class DoIPSSLStreamSocket(SSLStreamSocket):
    """Custom SSLStreamSocket for DoIP communication.

    Received bytes are buffered until one complete DoIP message, as
    announced by the length field of the DoIP header, is available.
    """

    def __init__(self, sock, basecls=None):
        # type: (socket.socket, Optional[Type[Packet]]) -> None
        super(DoIPSSLStreamSocket, self).__init__(sock, basecls or DoIP)
        self.buffer = b""

    def recv(self, x=MTU, **kwargs):
        # type: (Optional[int], **Any) -> Optional[Packet]
        """
        Receive one DoIP message.

        :param x: unused, kept for interface compatibility
        :param kwargs: forwarded to the constructor of ``basecls``
        :return: the dissected packet, or None if the message is not yet
                 complete. Partial data is kept for the next call.
        """
        if len(self.buffer) < 8:
            # Only request the missing header bytes, so that the buffer
            # never contains data beyond the header at this point.
            self.buffer += self.ins.recv(8 - len(self.buffer))
        if len(self.buffer) < 8:
            return None

        total_len = struct.unpack(">I", self.buffer[4:8])[0] + 8

        missing = total_len - len(self.buffer)
        if missing > 0:
            # Never call recv() with a zero or negative size.
            self.buffer += self.ins.recv(missing)
        if len(self.buffer) < total_len:
            return None
        pktbuf = self.buffer[:total_len]
        self.buffer = self.buffer[total_len:]

        pkt = self.basecls(pktbuf, **kwargs)  # type: Packet
        return pkt


class DoIPSocket(DoIPSSLStreamSocket):
    """Socket for DoIP communication. This socket automatically
    sends a routing activation request as soon as a TCP or TLS connection is
    established.

    :param ip: IP address of destination
    :param port: destination port, usually 13400
    :param tls_port: destination port for TLS connection, usually 3496
    :param activate_routing: If true, routing activation request is
                             automatically sent
    :param source_address: DoIP source address
    :param target_address: DoIP target address, this is automatically
                           determined if routing activation request is sent
    :param activation_type: This allows to set a different activation type for
                            the routing activation request
    :param reserved_oem: Optional parameter to set value for reserved_oem field
                         of routing activation request
    :param force_tls: Skip establishing of a TCP connection and directly try to
                      connect via SSL/TLS
    :param context: Optional ssl.SSLContext object for initialization of ssl socket
                    connections.

    Example:
        >>> socket = DoIPSocket("169.254.0.131")
        >>> pkt = DoIP(payload_type=0x8001, source_address=0xe80, target_address=0x1000) / UDS() / UDS_RDBI(identifiers=[0x1000])
        >>> resp = socket.sr1(pkt, timeout=1)
    """  # noqa: E501

    def __init__(self,
                 ip='127.0.0.1',  # type: str
                 port=13400,  # type: int
                 tls_port=3496,  # type: int
                 activate_routing=True,  # type: bool
                 source_address=0xe80,  # type: int
                 target_address=0,  # type: int
                 activation_type=0,  # type: int
                 reserved_oem=b"",  # type: bytes
                 force_tls=False,  # type: bool
                 context=None  # type: Optional[ssl.SSLContext]
                 ):  # type: (...) -> None
        self.ip = ip
        self.port = port
        self.tls_port = tls_port
        self.activate_routing = activate_routing
        self.source_address = source_address
        self.target_address = target_address
        self.activation_type = activation_type
        self.reserved_oem = reserved_oem
        self.force_tls = force_tls
        self.context = context
        try:
            self._init_socket()
        except Exception:
            self.close()
            raise

    @staticmethod
    def _new_tcp_socket(family):
        # type: (int) -> socket.socket
        """Create an unconnected TCP socket with the DoIP socket options."""
        s = socket.socket(family, socket.SOCK_STREAM)
        s.settimeout(5)
        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        return s

    def _init_socket(self):
        # type: () -> None
        """
        Establish the connection and, if requested, activate routing.

        A plain TCP connection is tried first, unless ``force_tls`` is set.
        If the DoIP entity demands a TLS socket (routing activation
        response 0x07), the plain connection is replaced by a TLS
        connection to ``tls_port``.

        :raises ValueError: if TLS is required but no ``context`` is given
        :raises Exception: if the routing activation fails
        """
        addrinfo = socket.getaddrinfo(self.ip, self.port, proto=socket.IPPROTO_TCP)
        sock_family = addrinfo[0][0]
        plain_sock = None  # type: Optional[socket.socket]

        if not self.force_tls:
            plain_sock = self._new_tcp_socket(sock_family)
            try:
                plain_sock.connect(addrinfo[0][-1])
            except Exception:
                # Not yet owned by this object, close() would not reach it.
                plain_sock.close()
                raise
            DoIPSSLStreamSocket.__init__(self, plain_sock)

            if not self.activate_routing:
                return

            activation_return = self._activate_routing()
        else:
            # Let's overwrite activation_return to force TLS Connection
            activation_return = 0x07

        if activation_return == 0x10:
            # Routing successfully activated.
            return
        elif activation_return == 0x07:
            # Routing activation denied because the specified activation
            # type requires a secure TLS TCP_DATA socket.
            if self.context is None:
                raise ValueError("SSLContext 'context' can not be None")
            if plain_sock is not None:
                plain_sock.close()

            raw_sock = self._new_tcp_socket(sock_family)
            try:
                ss = self.context.wrap_socket(raw_sock)
            except Exception:
                raw_sock.close()
                raise
            try:
                addrinfo = socket.getaddrinfo(
                    self.ip, self.tls_port, proto=socket.IPPROTO_TCP)
                ss.connect(addrinfo[0][-1])
            except Exception:
                ss.close()
                raise
            DoIPSSLStreamSocket.__init__(self, ss)

            if not self.activate_routing:
                return

            activation_return = self._activate_routing()
            if activation_return == 0x10:
                # Routing successfully activated.
                return
            else:
                raise Exception(
                    "DoIPSocket activate_routing failed with "
                    "routing_activation_response 0x%x" % activation_return)

        elif activation_return == -1:
            raise Exception("DoIPSocket._activate_routing failed")
        else:
            raise Exception(
                "DoIPSocket activate_routing failed with "
                "routing_activation_response 0x%x!" % activation_return)

    def _activate_routing(self):  # type: (...) -> int
        """
        Send a routing activation request and evaluate the response.

        On success, ``target_address`` is set to the logical address of the
        DoIP entity unless a target address was already configured.

        :return: the routing_activation_response code of the response, or
                 -1 if no routing activation response was received
        """
        resp = self.sr1(
            DoIP(payload_type=0x5, activation_type=self.activation_type,
                 source_address=self.source_address, reserved_oem=self.reserved_oem),
            verbose=False, timeout=1)
        if resp and resp.payload_type == 0x6 and \
                resp.routing_activation_response == 0x10:
            self.target_address = (
                self.target_address or resp.logical_address_doip_entity)
            log_automotive.info(
                "Routing activation successful! Target address set to: 0x%x",
                self.target_address)
        else:
            log_automotive.error(
                "Routing activation failed! Response: %s", repr(resp))

        if resp and resp.payload_type == 0x6:
            return resp.routing_activation_response
        else:
            return -1


class UDS_DoIPSocket(DoIPSocket):
    """
    Application-Layer socket for DoIP endpoints. This socket takes care about
    the encapsulation of UDS packets into DoIP packets.

    Example:
        >>> socket = UDS_DoIPSocket("169.254.117.238")
        >>> pkt = UDS() / UDS_RDBI(identifiers=[0x1000])
        >>> resp = socket.sr1(pkt, timeout=1)
    """

    def send(self, x):
        # type: (Union[Packet, bytes]) -> int
        """
        Send ``x``. UDS packets are wrapped into a DoIP diagnostic message
        using the source and target address of this socket; anything else
        is sent unmodified.
        """
        if isinstance(x, UDS):
            pkt = DoIP(payload_type=0x8001,
                       source_address=self.source_address,
                       target_address=self.target_address
                       ) / x
        else:
            pkt = x

        # The timestamp is stored on the object handed in by the caller,
        # not on the wrapping DoIP packet. Raw bytes have no such attribute.
        try:
            x.sent_time = time.time()  # type: ignore
        except AttributeError:
            pass

        return super().send(pkt)

    def recv(self, x=MTU, **kwargs):
        # type: (Optional[int], **Any) -> Optional[Packet]
        """
        Receive one message. For diagnostic messages (0x8001) only the
        payload is returned, any other DoIP packet is returned as is.
        """
        pkt = super().recv(x, **kwargs)
        if pkt and pkt.payload_type == 0x8001:
            return pkt.payload
        else:
            return pkt