1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) Nils Weiss <nils@we155.de> 5 6# scapy.contrib.description = Unified Diagnostic Service (UDS) 7# scapy.contrib.status = loads 8 9""" 10UDS 11""" 12 13import struct 14from collections import defaultdict 15 16from scapy.fields import ByteEnumField, StrField, ConditionalField, \ 17 BitEnumField, BitField, XByteField, FieldListField, \ 18 XShortField, X3BytesField, XIntField, ByteField, \ 19 ShortField, ObservableDict, XShortEnumField, XByteEnumField, StrLenField, \ 20 FieldLenField, XStrFixedLenField, XStrLenField, FlagsField, PacketListField, \ 21 PacketField 22from scapy.packet import Packet, bind_layers, NoPayload, Raw 23from scapy.config import conf 24from scapy.error import log_loading 25from scapy.utils import PeriodicSenderThread 26from scapy.contrib.isotp import ISOTP 27 28# Typing imports 29from typing import ( 30 Dict, 31 Union, 32) 33 34try: 35 if conf.contribs['UDS']['treat-response-pending-as-answer']: 36 pass 37except KeyError: 38 log_loading.info("Specify \"conf.contribs['UDS'] = " 39 "{'treat-response-pending-as-answer': True}\" to treat " 40 "a negative response 'requestCorrectlyReceived-" 41 "ResponsePending' as answer of a request. \n" 42 "The default value is False.") 43 conf.contribs['UDS'] = {'treat-response-pending-as-answer': False} 44 45 46conf.debug_dissector = True 47 48 49class UDS(ISOTP): 50 services = ObservableDict( 51 {0x10: 'DiagnosticSessionControl', 52 0x11: 'ECUReset', 53 0x14: 'ClearDiagnosticInformation', 54 0x19: 'ReadDTCInformation', 55 0x22: 'ReadDataByIdentifier', 56 0x23: 'ReadMemoryByAddress', 57 0x24: 'ReadScalingDataByIdentifier', 58 0x27: 'SecurityAccess', 59 0x28: 'CommunicationControl', 60 0x29: 'Authentication', 61 0x2A: 'ReadDataPeriodicIdentifier', 62 0x2C: 'DynamicallyDefineDataIdentifier', 63 0x2E: 'WriteDataByIdentifier', 64 0x2F: 'InputOutputControlByIdentifier', 65 0x31: 'RoutineControl', 66 0x34: 'RequestDownload', 67 0x35: 'RequestUpload', 68 0x36: 'TransferData', 69 0x37: 'RequestTransferExit', 70 0x38: 'RequestFileTransfer', 71 0x3D: 'WriteMemoryByAddress', 72 0x3E: 'TesterPresent', 73 0x50: 'DiagnosticSessionControlPositiveResponse', 74 0x51: 'ECUResetPositiveResponse', 75 0x54: 'ClearDiagnosticInformationPositiveResponse', 76 0x59: 'ReadDTCInformationPositiveResponse', 77 0x62: 'ReadDataByIdentifierPositiveResponse', 78 0x63: 'ReadMemoryByAddressPositiveResponse', 79 0x64: 'ReadScalingDataByIdentifierPositiveResponse', 80 0x67: 'SecurityAccessPositiveResponse', 81 0x68: 'CommunicationControlPositiveResponse', 82 0x69: 'AuthenticationPositiveResponse', 83 0x6A: 'ReadDataPeriodicIdentifierPositiveResponse', 84 0x6C: 'DynamicallyDefineDataIdentifierPositiveResponse', 85 0x6E: 'WriteDataByIdentifierPositiveResponse', 86 0x6F: 'InputOutputControlByIdentifierPositiveResponse', 87 0x71: 'RoutineControlPositiveResponse', 88 0x74: 'RequestDownloadPositiveResponse', 89 0x75: 'RequestUploadPositiveResponse', 90 0x76: 'TransferDataPositiveResponse', 91 0x77: 'RequestTransferExitPositiveResponse', 92 0x78: 'RequestFileTransferPositiveResponse', 93 0x7D: 'WriteMemoryByAddressPositiveResponse', 94 0x7E: 'TesterPresentPositiveResponse', 95 0x83: 'AccessTimingParameter', 96 0x84: 'SecuredDataTransmission', 97 0x85: 'ControlDTCSetting', 98 0x86: 'ResponseOnEvent', 99 0x87: 'LinkControl', 100 0xC3: 'AccessTimingParameterPositiveResponse', 101 0xC4: 'SecuredDataTransmissionPositiveResponse', 102 0xC5: 'ControlDTCSettingPositiveResponse', 103 0xC6: 'ResponseOnEventPositiveResponse', 104 0xC7: 'LinkControlPositiveResponse', 105 0x7f: 'NegativeResponse'}) # type: Dict[int, str] 106 name = 'UDS' 107 fields_desc = [ 108 XByteEnumField('service', 0, services) 109 ] 110 111 def answers(self, other): 112 # type: (Union[UDS, Packet]) -> bool 113 if other.__class__ != self.__class__: 114 return False 115 if self.service == 0x7f: 116 return self.payload.answers(other) 117 if self.service == (other.service + 0x40): 118 if isinstance(self.payload, NoPayload) or \ 119 isinstance(other.payload, NoPayload): 120 return len(self) <= len(other) 121 else: 122 return self.payload.answers(other.payload) 123 return False 124 125 def hashret(self): 126 # type: () -> bytes 127 if self.service == 0x7f and len(self) >= 3: 128 return struct.pack('B', bytes(self)[1] & ~0x40) 129 return struct.pack('B', self.service & ~0x40) 130 131 132# ########################DSC################################### 133class UDS_DSC(Packet): 134 diagnosticSessionTypes = ObservableDict({ 135 0x00: 'ISOSAEReserved', 136 0x01: 'defaultSession', 137 0x02: 'programmingSession', 138 0x03: 'extendedDiagnosticSession', 139 0x04: 'safetySystemDiagnosticSession', 140 0x7F: 'ISOSAEReserved'}) 141 name = 'DiagnosticSessionControl' 142 fields_desc = [ 143 ByteEnumField('diagnosticSessionType', 0, diagnosticSessionTypes) 144 ] 145 146 147bind_layers(UDS, UDS_DSC, service=0x10) 148 149 150class UDS_DSCPR(Packet): 151 name = 'DiagnosticSessionControlPositiveResponse' 152 fields_desc = [ 153 ByteEnumField('diagnosticSessionType', 0, 154 UDS_DSC.diagnosticSessionTypes), 155 StrField('sessionParameterRecord', b"") 156 ] 157 158 def answers(self, other): 159 return isinstance(other, UDS_DSC) and \ 160 other.diagnosticSessionType == self.diagnosticSessionType 161 162 163bind_layers(UDS, UDS_DSCPR, service=0x50) 164 165 166# #########################ER################################### 167class UDS_ER(Packet): 168 resetTypes = { 169 0x00: 'ISOSAEReserved', 170 0x01: 'hardReset', 171 0x02: 'keyOffOnReset', 172 0x03: 'softReset', 173 0x04: 'enableRapidPowerShutDown', 174 0x05: 'disableRapidPowerShutDown', 175 0x41: 'powerDown', 176 0x7F: 'ISOSAEReserved'} 177 name = 'ECUReset' 178 fields_desc = [ 179 ByteEnumField('resetType', 0, resetTypes) 180 ] 181 182 183bind_layers(UDS, UDS_ER, service=0x11) 184 185 186class UDS_ERPR(Packet): 187 name = 'ECUResetPositiveResponse' 188 fields_desc = [ 189 ByteEnumField('resetType', 0, UDS_ER.resetTypes), 190 ConditionalField(ByteField('powerDownTime', 0), 191 lambda pkt: pkt.resetType == 0x04) 192 ] 193 194 def answers(self, other): 195 return isinstance(other, UDS_ER) and other.resetType == self.resetType 196 197 198bind_layers(UDS, UDS_ERPR, service=0x51) 199 200 201# #########################SA################################### 202class UDS_SA(Packet): 203 name = 'SecurityAccess' 204 fields_desc = [ 205 ByteField('securityAccessType', 0), 206 ConditionalField(StrField('securityAccessDataRecord', b""), 207 lambda pkt: pkt.securityAccessType % 2 == 1), 208 ConditionalField(StrField('securityKey', b""), 209 lambda pkt: pkt.securityAccessType % 2 == 0) 210 ] 211 212 213bind_layers(UDS, UDS_SA, service=0x27) 214 215 216class UDS_SAPR(Packet): 217 name = 'SecurityAccessPositiveResponse' 218 fields_desc = [ 219 ByteField('securityAccessType', 0), 220 ConditionalField(StrField('securitySeed', b""), 221 lambda pkt: pkt.securityAccessType % 2 == 1), 222 ] 223 224 def answers(self, other): 225 return isinstance(other, UDS_SA) \ 226 and other.securityAccessType == self.securityAccessType 227 228 229bind_layers(UDS, UDS_SAPR, service=0x67) 230 231 232# #########################CC################################### 233class UDS_CC(Packet): 234 controlTypes = { 235 0x00: 'enableRxAndTx', 236 0x01: 'enableRxAndDisableTx', 237 0x02: 'disableRxAndEnableTx', 238 0x03: 'disableRxAndTx' 239 } 240 name = 'CommunicationControl' 241 fields_desc = [ 242 ByteEnumField('controlType', 0, controlTypes), 243 BitEnumField('communicationType0', 0, 2, 244 {0: 'ISOSAEReserved', 245 1: 'normalCommunicationMessages', 246 2: 'networkManagmentCommunicationMessages', 247 3: 'networkManagmentCommunicationMessages and ' 248 'normalCommunicationMessages'}), 249 BitField('communicationType1', 0, 2), 250 BitEnumField('communicationType2', 0, 4, 251 {0: 'Disable/Enable specified communication Type', 252 1: 'Disable/Enable specific subnet', 253 2: 'Disable/Enable specific subnet', 254 3: 'Disable/Enable specific subnet', 255 4: 'Disable/Enable specific subnet', 256 5: 'Disable/Enable specific subnet', 257 6: 'Disable/Enable specific subnet', 258 7: 'Disable/Enable specific subnet', 259 8: 'Disable/Enable specific subnet', 260 9: 'Disable/Enable specific subnet', 261 10: 'Disable/Enable specific subnet', 262 11: 'Disable/Enable specific subnet', 263 12: 'Disable/Enable specific subnet', 264 13: 'Disable/Enable specific subnet', 265 14: 'Disable/Enable specific subnet', 266 15: 'Disable/Enable network'}) 267 ] 268 269 270bind_layers(UDS, UDS_CC, service=0x28) 271 272 273class UDS_CCPR(Packet): 274 name = 'CommunicationControlPositiveResponse' 275 fields_desc = [ 276 ByteEnumField('controlType', 0, UDS_CC.controlTypes) 277 ] 278 279 def answers(self, other): 280 return isinstance(other, UDS_CC) \ 281 and other.controlType == self.controlType 282 283 284bind_layers(UDS, UDS_CCPR, service=0x68) 285 286 287# #########################AUTH################################### 288class UDS_AUTH(Packet): 289 subFunctions = { 290 0x00: 'deAuthenticate', 291 0x01: 'verifyCertificateUnidirectional', 292 0x02: 'verifyCertificateBidirectional', 293 0x03: 'proofOfOwnership', 294 0x04: 'transmitCertificate', 295 0x05: 'requestChallengeForAuthentication', 296 0x06: 'verifyProofOfOwnershipUnidirectional', 297 0x07: 'verifyProofOfOwnershipBidirectional', 298 0x08: 'authenticationConfiguration', 299 0x7F: 'ISOSAEReserved' 300 } 301 name = "Authentication" 302 fields_desc = [ 303 ByteEnumField('subFunction', 0, subFunctions), 304 ConditionalField(XByteField('communicationConfiguration', 0), 305 lambda pkt: pkt.subFunction in [0x01, 0x02, 0x5]), 306 ConditionalField(XShortField('certificateEvaluationId', 0), 307 lambda pkt: pkt.subFunction == 0x04), 308 ConditionalField(XStrFixedLenField('algorithmIndicator', 0, length=16), 309 lambda pkt: pkt.subFunction in [0x05, 0x06, 0x07]), 310 ConditionalField(FieldLenField('lengthOfCertificateClient', None, 311 fmt="H", length_of='certificateClient'), 312 lambda pkt: pkt.subFunction in [0x01, 0x02]), 313 ConditionalField(XStrLenField('certificateClient', b"", 314 length_from=lambda p: 315 p.lengthOfCertificateClient), 316 lambda pkt: pkt.subFunction in [0x01, 0x02]), 317 ConditionalField(FieldLenField('lengthOfProofOfOwnershipClient', None, 318 fmt="H", 319 length_of='proofOfOwnershipClient'), 320 lambda pkt: pkt.subFunction in [0x03, 0x06, 0x07]), 321 ConditionalField(XStrLenField('proofOfOwnershipClient', b"", 322 length_from=lambda p: 323 p.lengthOfProofOfOwnershipClient), 324 lambda pkt: pkt.subFunction in [0x03, 0x06, 0x07]), 325 ConditionalField(FieldLenField('lengthOfChallengeClient', None, 326 fmt="H", length_of='challengeClient'), 327 lambda pkt: pkt.subFunction in [0x01, 0x02, 0x06, 328 0x07]), 329 ConditionalField(XStrLenField('challengeClient', b"", 330 length_from=lambda p: 331 p.lengthOfChallengeClient), 332 lambda pkt: pkt.subFunction in [0x01, 0x02, 0x06, 333 0x07]), 334 ConditionalField(FieldLenField('lengthOfEphemeralPublicKeyClient', 335 None, fmt="H", 336 length_of='ephemeralPublicKeyClient'), 337 lambda pkt: pkt.subFunction == 0x03), 338 ConditionalField(XStrLenField('ephemeralPublicKeyClient', b"", 339 length_from=lambda p: 340 p.lengthOfEphemeralPublicKeyClient), 341 lambda pkt: pkt.subFunction == 0x03), 342 ConditionalField(FieldLenField('lengthOfCertificateData', None, 343 fmt="H", length_of='certificateData'), 344 lambda pkt: pkt.subFunction == 0x04), 345 ConditionalField(XStrLenField('certificateData', b"", 346 length_from=lambda p: 347 p.lengthOfCertificateData), 348 lambda pkt: pkt.subFunction == 0x04), 349 ConditionalField(FieldLenField('lengthOfAdditionalParameter', None, 350 fmt="H", 351 length_of='additionalParameter'), 352 lambda pkt: pkt.subFunction in [0x06, 0x07]), 353 ConditionalField(XStrLenField('additionalParameter', b"", 354 length_from=lambda p: 355 p.lengthOfAdditionalParameter), 356 lambda pkt: pkt.subFunction in [0x06, 0x07]), 357 ] 358 359 360bind_layers(UDS, UDS_AUTH, service=0x29) 361 362 363class UDS_AUTHPR(Packet): 364 authenticationReturnParameterTypes = { 365 0x00: 'requestAccepted', 366 0x01: 'generalReject', 367 # Authentication with PKI Certificate Exchange (ACPE) 368 0x02: 'authenticationConfigurationAPCE', 369 # Authentication with Challenge-Response (ACR) 370 0x03: 'authenticationConfigurationACRWithAsymmetricCryptography', 371 0x04: 'authenticationConfigurationACRWithSymmetricCryptography', 372 0x05: 'ISOSAEReserved', 373 0x0F: 'ISOSAEReserved', 374 0x10: 'deAuthenticationSuccessful', 375 0x11: 'certificateVerifiedOwnershipVerificationNecessary', 376 0x12: 'ownershipVerifiedAuthenticationComplete', 377 0x13: 'certificateVerified', 378 0x14: 'ISOSAEReserved', 379 0x9F: 'ISOSAEReserved', 380 0xFF: 'ISOSAEReserved' 381 } 382 name = 'AuthenticationPositiveResponse' 383 fields_desc = [ 384 ByteEnumField('subFunction', 0, UDS_AUTH.subFunctions), 385 ByteEnumField('returnValue', 0, authenticationReturnParameterTypes), 386 ConditionalField(XStrFixedLenField('algorithmIndicator', 0, length=16), 387 lambda pkt: pkt.subFunction in [0x05, 0x06, 0x07]), 388 ConditionalField(FieldLenField('lengthOfChallengeServer', None, 389 fmt="H", length_of='challengeServer'), 390 lambda pkt: pkt.subFunction in [0x01, 0x02, 0x05]), 391 ConditionalField(XStrLenField('challengeServer', b"", 392 length_from=lambda p: 393 p.lengthOfChallengeServer), 394 lambda pkt: pkt.subFunction in [0x01, 0x02, 0x05]), 395 ConditionalField(FieldLenField('lengthOfCertificateServer', None, 396 fmt="H", length_of='certificateServer'), 397 lambda pkt: pkt.subFunction == 0x02), 398 ConditionalField(XStrLenField('certificateServer', b"", 399 length_from=lambda p: 400 p.lengthOfCertificateServer), 401 lambda pkt: pkt.subFunction == 0x02), 402 ConditionalField(FieldLenField('lengthOfProofOfOwnershipServer', None, 403 fmt="H", 404 length_of='proofOfOwnershipServer'), 405 lambda pkt: pkt.subFunction in [0x02, 0x07]), 406 ConditionalField(XStrLenField('proofOfOwnershipServer', b"", 407 length_from=lambda p: 408 p.lengthOfProofOfOwnershipServer), 409 lambda pkt: pkt.subFunction in [0x02, 0x07]), 410 ConditionalField(FieldLenField('lengthOfSessionKeyInfo', None, fmt="H", 411 length_of='sessionKeyInfo'), 412 lambda pkt: pkt.subFunction in [0x03, 0x06, 0x07]), 413 ConditionalField(XStrLenField('sessionKeyInfo', b"", 414 length_from=lambda p: 415 p.lengthOfSessionKeyInfo), 416 lambda pkt: pkt.subFunction in [0x03, 0x06, 0x07]), 417 ConditionalField(FieldLenField('lengthOfEphemeralPublicKeyServer', 418 None, fmt="H", 419 length_of='ephemeralPublicKeyServer'), 420 lambda pkt: pkt.subFunction in [0x01, 0x02]), 421 ConditionalField(XStrLenField('ephemeralPublicKeyServer', b"", 422 length_from=lambda p: 423 p.lengthOfEphemeralPublicKeyServer), 424 lambda pkt: pkt.subFunction in [0x1, 0x02]), 425 ConditionalField(FieldLenField('lengthOfNeededAdditionalParameter', 426 None, fmt="H", 427 length_of='neededAdditionalParameter'), 428 lambda pkt: pkt.subFunction == 0x05), 429 ConditionalField(XStrLenField('neededAdditionalParameter', b"", 430 length_from=lambda p: 431 p.lengthOfNeededAdditionalParameter), 432 lambda pkt: pkt.subFunction == 0x05), 433 ] 434 435 def answers(self, other): 436 return isinstance(other, UDS_AUTH) \ 437 and other.subFunction == self.subFunction 438 439 440bind_layers(UDS, UDS_AUTHPR, service=0x69) 441 442 443# #########################TP################################### 444class UDS_TP(Packet): 445 name = 'TesterPresent' 446 fields_desc = [ 447 ByteField('subFunction', 0) 448 ] 449 450 451bind_layers(UDS, UDS_TP, service=0x3E) 452 453 454class UDS_TPPR(Packet): 455 name = 'TesterPresentPositiveResponse' 456 fields_desc = [ 457 ByteField('zeroSubFunction', 0) 458 ] 459 460 def answers(self, other): 461 return isinstance(other, UDS_TP) 462 463 464bind_layers(UDS, UDS_TPPR, service=0x7E) 465 466 467# #########################ATP################################### 468class UDS_ATP(Packet): 469 timingParameterAccessTypes = { 470 0: 'ISOSAEReserved', 471 1: 'readExtendedTimingParameterSet', 472 2: 'setTimingParametersToDefaultValues', 473 3: 'readCurrentlyActiveTimingParameters', 474 4: 'setTimingParametersToGivenValues' 475 } 476 name = 'AccessTimingParameter' 477 fields_desc = [ 478 ByteEnumField('timingParameterAccessType', 0, 479 timingParameterAccessTypes), 480 ConditionalField(StrField('timingParameterRequestRecord', b""), 481 lambda pkt: pkt.timingParameterAccessType == 0x4) 482 ] 483 484 485bind_layers(UDS, UDS_ATP, service=0x83) 486 487 488class UDS_ATPPR(Packet): 489 name = 'AccessTimingParameterPositiveResponse' 490 fields_desc = [ 491 ByteEnumField('timingParameterAccessType', 0, 492 UDS_ATP.timingParameterAccessTypes), 493 ConditionalField(StrField('timingParameterResponseRecord', b""), 494 lambda pkt: pkt.timingParameterAccessType == 0x3) 495 ] 496 497 def answers(self, other): 498 return isinstance(other, UDS_ATP) \ 499 and other.timingParameterAccessType == \ 500 self.timingParameterAccessType 501 502 503bind_layers(UDS, UDS_ATPPR, service=0xC3) 504 505 506# #########################SDT################################### 507# TODO: Implement correct internal message service handling here, 508# instead of using just the dataRecord 509class UDS_SDT(Packet): 510 name = 'SecuredDataTransmission' 511 fields_desc = [ 512 BitField('requestMessage', 0, 1), 513 BitField('ISOSAEReservedBackwardsCompatibility', 0, 2), 514 BitField('preEstablishedKeyUsed', 0, 1), 515 BitField('encryptedMessage', 0, 1), 516 BitField('signedMessage', 0, 1), 517 BitField('signedResponseRequested', 0, 1), 518 BitField('ISOSAEReserved', 0, 9), 519 ByteField('signatureEncryptionCalculation', 0), 520 XShortField('signatureLength', 0), 521 XShortField('antiReplayCounter', 0), 522 ByteField('internalMessageServiceRequestId', 0), 523 StrField('dataRecord', b"", fmt="B") 524 ] 525 526 527bind_layers(UDS, UDS_SDT, service=0x84) 528 529 530class UDS_SDTPR(Packet): 531 name = 'SecuredDataTransmissionPositiveResponse' 532 fields_desc = [ 533 BitField('requestMessage', 0, 1), 534 BitField('ISOSAEReservedBackwardsCompatibility', 0, 2), 535 BitField('preEstablishedKeyUsed', 0, 1), 536 BitField('encryptedMessage', 0, 1), 537 BitField('signedMessage', 0, 1), 538 BitField('signedResponseRequested', 0, 1), 539 BitField('ISOSAEReserved', 0, 9), 540 ByteField('signatureEncryptionCalculation', 0), 541 XShortField('signatureLength', 0), 542 XShortField('antiReplayCounter', 0), 543 ByteField('internalMessageServiceResponseId', 0), 544 StrField('dataRecord', b"", fmt="B") 545 ] 546 547 def answers(self, other): 548 return isinstance(other, UDS_SDT) 549 550 551bind_layers(UDS, UDS_SDTPR, service=0xC4) 552 553 554# #########################CDTCS################################### 555class UDS_CDTCS(Packet): 556 DTCSettingTypes = { 557 0: 'ISOSAEReserved', 558 1: 'on', 559 2: 'off' 560 } 561 name = 'ControlDTCSetting' 562 fields_desc = [ 563 ByteEnumField('DTCSettingType', 0, DTCSettingTypes), 564 StrField('DTCSettingControlOptionRecord', b"") 565 ] 566 567 568bind_layers(UDS, UDS_CDTCS, service=0x85) 569 570 571class UDS_CDTCSPR(Packet): 572 name = 'ControlDTCSettingPositiveResponse' 573 fields_desc = [ 574 ByteEnumField('DTCSettingType', 0, UDS_CDTCS.DTCSettingTypes) 575 ] 576 577 def answers(self, other): 578 return isinstance(other, UDS_CDTCS) 579 580 581bind_layers(UDS, UDS_CDTCSPR, service=0xC5) 582 583 584# #########################ROE################################### 585# TODO: improve this protocol implementation 586class UDS_ROE(Packet): 587 eventTypes = { 588 0: 'doNotStoreEvent', 589 1: 'storeEvent' 590 } 591 name = 'ResponseOnEvent' 592 fields_desc = [ 593 ByteEnumField('eventType', 0, eventTypes), 594 ByteField('eventWindowTime', 0), 595 StrField('eventTypeRecord', b"") 596 ] 597 598 599bind_layers(UDS, UDS_ROE, service=0x86) 600 601 602class UDS_ROEPR(Packet): 603 name = 'ResponseOnEventPositiveResponse' 604 fields_desc = [ 605 ByteEnumField('eventType', 0, UDS_ROE.eventTypes), 606 ByteField('numberOfIdentifiedEvents', 0), 607 ByteField('eventWindowTime', 0), 608 StrField('eventTypeRecord', b"") 609 ] 610 611 def answers(self, other): 612 return isinstance(other, UDS_ROE) \ 613 and other.eventType == self.eventType 614 615 616bind_layers(UDS, UDS_ROEPR, service=0xC6) 617 618 619# #########################LC################################### 620class UDS_LC(Packet): 621 linkControlTypes = { 622 0: 'ISOSAEReserved', 623 1: 'verifyBaudrateTransitionWithFixedBaudrate', 624 2: 'verifyBaudrateTransitionWithSpecificBaudrate', 625 3: 'transitionBaudrate' 626 } 627 name = 'LinkControl' 628 fields_desc = [ 629 ByteEnumField('linkControlType', 0, linkControlTypes), 630 ConditionalField(ByteField('baudrateIdentifier', 0), 631 lambda pkt: pkt.linkControlType == 0x1), 632 ConditionalField(ByteField('baudrateHighByte', 0), 633 lambda pkt: pkt.linkControlType == 0x2), 634 ConditionalField(ByteField('baudrateMiddleByte', 0), 635 lambda pkt: pkt.linkControlType == 0x2), 636 ConditionalField(ByteField('baudrateLowByte', 0), 637 lambda pkt: pkt.linkControlType == 0x2) 638 ] 639 640 641bind_layers(UDS, UDS_LC, service=0x87) 642 643 644class UDS_LCPR(Packet): 645 name = 'LinkControlPositiveResponse' 646 fields_desc = [ 647 ByteEnumField('linkControlType', 0, UDS_LC.linkControlTypes) 648 ] 649 650 def answers(self, other): 651 return isinstance(other, UDS_LC) \ 652 and other.linkControlType == self.linkControlType 653 654 655bind_layers(UDS, UDS_LCPR, service=0xC7) 656 657 658# #########################RDBI################################### 659class UDS_RDBI(Packet): 660 dataIdentifiers = ObservableDict() 661 name = 'ReadDataByIdentifier' 662 fields_desc = [ 663 FieldListField("identifiers", None, 664 XShortEnumField('dataIdentifier', 0, 665 dataIdentifiers)) 666 ] 667 668 669bind_layers(UDS, UDS_RDBI, service=0x22) 670 671 672class UDS_RDBIPR(Packet): 673 name = 'ReadDataByIdentifierPositiveResponse' 674 fields_desc = [ 675 XShortEnumField('dataIdentifier', 0, 676 UDS_RDBI.dataIdentifiers), 677 ] 678 679 def answers(self, other): 680 return isinstance(other, UDS_RDBI) \ 681 and self.dataIdentifier in other.identifiers 682 683 684bind_layers(UDS, UDS_RDBIPR, service=0x62) 685 686 687# #########################RMBA################################### 688class UDS_RMBA(Packet): 689 name = 'ReadMemoryByAddress' 690 fields_desc = [ 691 BitField('memorySizeLen', 0, 4), 692 BitField('memoryAddressLen', 0, 4), 693 ConditionalField(XByteField('memoryAddress1', 0), 694 lambda pkt: pkt.memoryAddressLen == 1), 695 ConditionalField(XShortField('memoryAddress2', 0), 696 lambda pkt: pkt.memoryAddressLen == 2), 697 ConditionalField(X3BytesField('memoryAddress3', 0), 698 lambda pkt: pkt.memoryAddressLen == 3), 699 ConditionalField(XIntField('memoryAddress4', 0), 700 lambda pkt: pkt.memoryAddressLen == 4), 701 ConditionalField(XByteField('memorySize1', 0), 702 lambda pkt: pkt.memorySizeLen == 1), 703 ConditionalField(XShortField('memorySize2', 0), 704 lambda pkt: pkt.memorySizeLen == 2), 705 ConditionalField(X3BytesField('memorySize3', 0), 706 lambda pkt: pkt.memorySizeLen == 3), 707 ConditionalField(XIntField('memorySize4', 0), 708 lambda pkt: pkt.memorySizeLen == 4), 709 ] 710 711 712bind_layers(UDS, UDS_RMBA, service=0x23) 713 714 715class UDS_RMBAPR(Packet): 716 name = 'ReadMemoryByAddressPositiveResponse' 717 fields_desc = [ 718 StrField('dataRecord', b"", fmt="B") 719 ] 720 721 def answers(self, other): 722 return isinstance(other, UDS_RMBA) 723 724 725bind_layers(UDS, UDS_RMBAPR, service=0x63) 726 727 728# #########################RSDBI################################### 729class UDS_RSDBI(Packet): 730 name = 'ReadScalingDataByIdentifier' 731 dataIdentifiers = ObservableDict() 732 fields_desc = [ 733 XShortEnumField('dataIdentifier', 0, dataIdentifiers) 734 ] 735 736 737bind_layers(UDS, UDS_RSDBI, service=0x24) 738 739 740# TODO: Implement correct scaling here, instead of using just the dataRecord 741class UDS_RSDBIPR(Packet): 742 name = 'ReadScalingDataByIdentifierPositiveResponse' 743 fields_desc = [ 744 XShortEnumField('dataIdentifier', 0, UDS_RSDBI.dataIdentifiers), 745 ByteField('scalingByte', 0), 746 StrField('dataRecord', b"", fmt="B") 747 ] 748 749 def answers(self, other): 750 return isinstance(other, UDS_RSDBI) \ 751 and other.dataIdentifier == self.dataIdentifier 752 753 754bind_layers(UDS, UDS_RSDBIPR, service=0x64) 755 756 757# #########################RDBPI################################### 758class UDS_RDBPI(Packet): 759 periodicDataIdentifiers = ObservableDict() 760 transmissionModes = { 761 0: 'ISOSAEReserved', 762 1: 'sendAtSlowRate', 763 2: 'sendAtMediumRate', 764 3: 'sendAtFastRate', 765 4: 'stopSending' 766 } 767 name = 'ReadDataByPeriodicIdentifier' 768 fields_desc = [ 769 ByteEnumField('transmissionMode', 0, transmissionModes), 770 ByteEnumField('periodicDataIdentifier', 0, periodicDataIdentifiers), 771 StrField('furtherPeriodicDataIdentifier', b"", fmt="B") 772 ] 773 774 775bind_layers(UDS, UDS_RDBPI, service=0x2A) 776 777 778# TODO: Implement correct scaling here, instead of using just the dataRecord 779class UDS_RDBPIPR(Packet): 780 name = 'ReadDataByPeriodicIdentifierPositiveResponse' 781 fields_desc = [ 782 ByteField('periodicDataIdentifier', 0), 783 StrField('dataRecord', b"", fmt="B") 784 ] 785 786 def answers(self, other): 787 return isinstance(other, UDS_RDBPI) \ 788 and other.periodicDataIdentifier == self.periodicDataIdentifier 789 790 791bind_layers(UDS, UDS_RDBPIPR, service=0x6A) 792 793 794# #########################DDDI################################### 795# TODO: Implement correct interpretation here, 796# instead of using just the dataRecord 797class UDS_DDDI(Packet): 798 name = 'DynamicallyDefineDataIdentifier' 799 subFunctions = {0x1: "defineByIdentifier", 800 0x2: "defineByMemoryAddress", 801 0x3: "clearDynamicallyDefinedDataIdentifier"} 802 fields_desc = [ 803 ByteEnumField('subFunction', 0, subFunctions), 804 StrField('dataRecord', b"", fmt="B") 805 ] 806 807 808bind_layers(UDS, UDS_DDDI, service=0x2C) 809 810 811class UDS_DDDIPR(Packet): 812 name = 'DynamicallyDefineDataIdentifierPositiveResponse' 813 fields_desc = [ 814 ByteEnumField('subFunction', 0, UDS_DDDI.subFunctions), 815 XShortField('dynamicallyDefinedDataIdentifier', 0) 816 ] 817 818 def answers(self, other): 819 return isinstance(other, UDS_DDDI) \ 820 and other.subFunction == self.subFunction 821 822 823bind_layers(UDS, UDS_DDDIPR, service=0x6C) 824 825 826# #########################WDBI################################### 827class UDS_WDBI(Packet): 828 name = 'WriteDataByIdentifier' 829 fields_desc = [ 830 XShortEnumField('dataIdentifier', 0, 831 UDS_RDBI.dataIdentifiers) 832 ] 833 834 835bind_layers(UDS, UDS_WDBI, service=0x2E) 836 837 838class UDS_WDBIPR(Packet): 839 name = 'WriteDataByIdentifierPositiveResponse' 840 fields_desc = [ 841 XShortEnumField('dataIdentifier', 0, 842 UDS_RDBI.dataIdentifiers), 843 ] 844 845 def answers(self, other): 846 return isinstance(other, UDS_WDBI) \ 847 and other.dataIdentifier == self.dataIdentifier 848 849 850bind_layers(UDS, UDS_WDBIPR, service=0x6E) 851 852 853# #########################WMBA################################### 854class UDS_WMBA(Packet): 855 name = 'WriteMemoryByAddress' 856 fields_desc = [ 857 BitField('memorySizeLen', 0, 4), 858 BitField('memoryAddressLen', 0, 4), 859 ConditionalField(XByteField('memoryAddress1', 0), 860 lambda pkt: pkt.memoryAddressLen == 1), 861 ConditionalField(XShortField('memoryAddress2', 0), 862 lambda pkt: pkt.memoryAddressLen == 2), 863 ConditionalField(X3BytesField('memoryAddress3', 0), 864 lambda pkt: pkt.memoryAddressLen == 3), 865 ConditionalField(XIntField('memoryAddress4', 0), 866 lambda pkt: pkt.memoryAddressLen == 4), 867 ConditionalField(XByteField('memorySize1', 0), 868 lambda pkt: pkt.memorySizeLen == 1), 869 ConditionalField(XShortField('memorySize2', 0), 870 lambda pkt: pkt.memorySizeLen == 2), 871 ConditionalField(X3BytesField('memorySize3', 0), 872 lambda pkt: pkt.memorySizeLen == 3), 873 ConditionalField(XIntField('memorySize4', 0), 874 lambda pkt: pkt.memorySizeLen == 4), 875 StrField('dataRecord', b'', fmt="B"), 876 877 ] 878 879 880bind_layers(UDS, UDS_WMBA, service=0x3D) 881 882 883class UDS_WMBAPR(Packet): 884 name = 'WriteMemoryByAddressPositiveResponse' 885 fields_desc = [ 886 BitField('memorySizeLen', 0, 4), 887 BitField('memoryAddressLen', 0, 4), 888 ConditionalField(XByteField('memoryAddress1', 0), 889 lambda pkt: pkt.memoryAddressLen == 1), 890 ConditionalField(XShortField('memoryAddress2', 0), 891 lambda pkt: pkt.memoryAddressLen == 2), 892 ConditionalField(X3BytesField('memoryAddress3', 0), 893 lambda pkt: pkt.memoryAddressLen == 3), 894 ConditionalField(XIntField('memoryAddress4', 0), 895 lambda pkt: pkt.memoryAddressLen == 4), 896 ConditionalField(XByteField('memorySize1', 0), 897 lambda pkt: pkt.memorySizeLen == 1), 898 ConditionalField(XShortField('memorySize2', 0), 899 lambda pkt: pkt.memorySizeLen == 2), 900 ConditionalField(X3BytesField('memorySize3', 0), 901 lambda pkt: pkt.memorySizeLen == 3), 902 ConditionalField(XIntField('memorySize4', 0), 903 lambda pkt: pkt.memorySizeLen == 4) 904 ] 905 906 def answers(self, other): 907 return isinstance(other, UDS_WMBA) \ 908 and other.memorySizeLen == self.memorySizeLen \ 909 and other.memoryAddressLen == self.memoryAddressLen 910 911 912bind_layers(UDS, UDS_WMBAPR, service=0x7D) 913 914 915# ##########################DTC##################################### 916class DTC(Packet): 917 name = 'Diagnostic Trouble Code' 918 dtc_descriptions = {} # Customize this dictionary for each individual ECU / OEM 919 920 fields_desc = [ 921 BitEnumField("system", 0, 2, { 922 0: "Powertrain", 923 1: "Chassis", 924 2: "Body", 925 3: "Network"}), 926 BitEnumField("type", 0, 2, { 927 0: "Generic", 928 1: "ManufacturerSpecific", 929 2: "Generic", 930 3: "Generic"}), 931 BitField("numeric_value_code", 0, 12), 932 ByteField("additional_information_code", 0), 933 ] 934 935 def extract_padding(self, s): 936 return '', s 937 938 939# #########################CDTCI################################### 940class UDS_CDTCI(Packet): 941 name = 'ClearDiagnosticInformation' 942 fields_desc = [ 943 ByteField('groupOfDTCHighByte', 0), 944 ByteField('groupOfDTCMiddleByte', 0), 945 ByteField('groupOfDTCLowByte', 0), 946 ] 947 948 949bind_layers(UDS, UDS_CDTCI, service=0x14) 950 951 952class UDS_CDTCIPR(Packet): 953 name = 'ClearDiagnosticInformationPositiveResponse' 954 955 def answers(self, other): 956 return isinstance(other, UDS_CDTCI) 957 958 959bind_layers(UDS, UDS_CDTCIPR, service=0x54) 960 961 962# #########################RDTCI################################### 963class UDS_RDTCI(Packet): 964 reportTypes = { 965 0: 'ISOSAEReserved', 966 1: 'reportNumberOfDTCByStatusMask', 967 2: 'reportDTCByStatusMask', 968 3: 'reportDTCSnapshotIdentification', 969 4: 'reportDTCSnapshotRecordByDTCNumber', 970 5: 'reportDTCSnapshotRecordByRecordNumber', 971 6: 'reportDTCExtendedDataRecordByDTCNumber', 972 7: 'reportNumberOfDTCBySeverityMaskRecord', 973 8: 'reportDTCBySeverityMaskRecord', 974 9: 'reportSeverityInformationOfDTC', 975 10: 'reportSupportedDTC', 976 11: 'reportFirstTestFailedDTC', 977 12: 'reportFirstConfirmedDTC', 978 13: 'reportMostRecentTestFailedDTC', 979 14: 'reportMostRecentConfirmedDTC', 980 15: 'reportMirrorMemoryDTCByStatusMask', 981 16: 'reportMirrorMemoryDTCExtendedDataRecordByDTCNumber', 982 17: 'reportNumberOfMirrorMemoryDTCByStatusMask', 983 18: 'reportNumberOfEmissionsRelatedOBDDTCByStatusMask', 984 19: 'reportEmissionsRelatedOBDDTCByStatusMask', 985 20: 'reportDTCFaultDetectionCounter', 986 21: 'reportDTCWithPermanentStatus' 987 } 988 dtcStatus = { 989 1: 'TestFailed', 990 2: 'TestFailedThisOperationCycle', 991 4: 'PendingDTC', 992 8: 'ConfirmedDTC', 993 16: 'TestNotCompletedSinceLastClear', 994 32: 'TestFailedSinceLastClear', 995 64: 'TestNotCompletedThisOperationCycle', 996 128: 'WarningIndicatorRequested' 997 } 998 dtcStatusMask = { 999 1: 'ActiveDTCs', 1000 4: 'PendingDTCs', 1001 8: 'ConfirmedOrStoredDTCs', 1002 255: 'AllRecordDTCs' 1003 } 1004 dtcSeverityMask = { 1005 # 0: 'NoSeverityInformation', 1006 1: 'NoClassInformation', 1007 2: 'WWH-OBDClassA', 1008 4: 'WWH-OBDClassB1', 1009 8: 'WWH-OBDClassB2', 1010 16: 'WWH-OBDClassC', 1011 32: 'MaintenanceRequired', 1012 64: 'CheckAtNextHalt', 1013 128: 'CheckImmediately' 1014 } 1015 name = 'ReadDTCInformation' 1016 fields_desc = [ 1017 ByteEnumField('reportType', 0, reportTypes), 1018 ConditionalField(FlagsField('DTCSeverityMask', 0, 8, dtcSeverityMask), 1019 lambda pkt: pkt.reportType in [0x07, 0x08]), 1020 ConditionalField(FlagsField('DTCStatusMask', 0, 8, dtcStatusMask), 1021 lambda pkt: pkt.reportType in [ 1022 0x01, 0x02, 0x07, 0x08, 0x0f, 0x11, 0x12, 0x13]), 1023 ConditionalField(PacketField("dtc", None, pkt_cls=DTC), 1024 lambda pkt: pkt.reportType in [0x3, 0x4, 0x6, 1025 0x10, 0x09]), 1026 ConditionalField(ByteField('DTCSnapshotRecordNumber', 0), 1027 lambda pkt: pkt.reportType in [0x3, 0x4, 0x5]), 1028 ConditionalField(ByteField('DTCExtendedDataRecordNumber', 0), 1029 lambda pkt: pkt.reportType in [0x6, 0x10]) 1030 ] 1031 1032 1033bind_layers(UDS, UDS_RDTCI, service=0x19) 1034 1035 1036class DTCAndStatusRecord(Packet): 1037 name = 'DTC and status record' 1038 fields_desc = [ 1039 PacketField("dtc", None, pkt_cls=DTC), 1040 FlagsField("status", 0, 8, UDS_RDTCI.dtcStatus) 1041 ] 1042 1043 def extract_padding(self, s): 1044 return '', s 1045 1046 1047class DTCExtendedData(Packet): 1048 name = 'Diagnostic Trouble Code Extended Data' 1049 dataTypes = ObservableDict() 1050 fields_desc = [ 1051 ByteEnumField("data_type", 0, dataTypes), 1052 XByteField("record", 0) 1053 ] 1054 1055 def extract_padding(self, s): 1056 return '', s 1057 1058 1059class DTCExtendedDataRecord(Packet): 1060 fields_desc = [ 1061 PacketField("dtcAndStatus", None, pkt_cls=DTCAndStatusRecord), 1062 PacketListField("extendedData", None, pkt_cls=DTCExtendedData) 1063 ] 1064 1065 1066class DTCSnapshot(Packet): 1067 identifiers = defaultdict(list) # for later extension 1068 1069 @staticmethod 1070 def next_identifier_cb(pkt, lst, cur, remain): 1071 return Raw 1072 1073 fields_desc = [ 1074 ByteField("record_number", 0), 1075 ByteField("record_number_of_identifiers", 0), 1076 PacketListField( 1077 "snapshotData", None, 1078 next_cls_cb=lambda pkt, lst, cur, remain: DTCSnapshot.next_identifier_cb( 1079 pkt, lst, cur, remain)) 1080 ] 1081 1082 def extract_padding(self, s): 1083 return '', s 1084 1085 1086class DTCSnapshotRecord(Packet): 1087 fields_desc = [ 1088 PacketField("dtcAndStatus", None, pkt_cls=DTCAndStatusRecord), 1089 PacketListField("snapshots", None, pkt_cls=DTCSnapshot) 1090 ] 1091 1092 1093class UDS_RDTCIPR(Packet): 1094 name = 'ReadDTCInformationPositiveResponse' 1095 fields_desc = [ 1096 ByteEnumField('reportType', 0, UDS_RDTCI.reportTypes), 1097 ConditionalField( 1098 FlagsField('DTCStatusAvailabilityMask', 0, 8, UDS_RDTCI.dtcStatus), 1099 lambda pkt: pkt.reportType in [0x01, 0x07, 0x11, 0x12, 0x02, 0x0A, 1100 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x13, 1101 0x15]), 1102 ConditionalField(ByteEnumField('DTCFormatIdentifier', 0, 1103 {0: 'ISO15031-6DTCFormat', 1104 1: 'UDS-1DTCFormat', 1105 2: 'SAEJ1939-73DTCFormat', 1106 3: 'ISO11992-4DTCFormat'}), 1107 lambda pkt: pkt.reportType in [0x01, 0x07, 1108 0x11, 0x12]), 1109 ConditionalField(ShortField('DTCCount', 0), 1110 lambda pkt: pkt.reportType in [0x01, 0x07, 1111 0x11, 0x12]), 1112 ConditionalField(PacketListField('DTCAndStatusRecord', None, 1113 pkt_cls=DTCAndStatusRecord), 1114 lambda pkt: pkt.reportType in [0x02, 0x0A, 0x0B, 1115 0x0C, 0x0D, 0x0E, 1116 0x0F, 0x13, 0x15]), 1117 ConditionalField(StrField('dataRecord', b""), 1118 lambda pkt: pkt.reportType in [0x03, 0x08, 0x09, 1119 0x10, 0x14]), 1120 ConditionalField(PacketField('snapshotRecord', None, 1121 pkt_cls=DTCSnapshotRecord), 1122 lambda pkt: pkt.reportType in [0x04]), 1123 ConditionalField(PacketField('extendedDataRecord', None, 1124 pkt_cls=DTCExtendedDataRecord), 1125 lambda pkt: pkt.reportType in [0x06]) 1126 ] 1127 1128 def answers(self, other): 1129 if not isinstance(other, UDS_RDTCI): 1130 return False 1131 if not other.reportType == self.reportType: 1132 return False 1133 if self.reportType == 0x06: 1134 return other.dtc == self.extendedDataRecord.dtcAndStatus.dtc 1135 if self.reportType == 0x04: 1136 return other.dtc == self.snapshotRecord.dtcAndStatus.dtc 1137 return True 1138 1139 1140bind_layers(UDS, UDS_RDTCIPR, service=0x59) 1141 1142 1143# #########################RC################################### 1144class UDS_RC(Packet): 1145 routineControlTypes = { 1146 0: 'ISOSAEReserved', 1147 1: 'startRoutine', 1148 2: 'stopRoutine', 1149 3: 'requestRoutineResults' 1150 } 1151 routineControlIdentifiers = ObservableDict() 1152 name = 'RoutineControl' 1153 fields_desc = [ 1154 ByteEnumField('routineControlType', 0, routineControlTypes), 1155 XShortEnumField('routineIdentifier', 0, routineControlIdentifiers) 1156 ] 1157 1158 1159bind_layers(UDS, UDS_RC, service=0x31) 1160 1161 1162class UDS_RCPR(Packet): 1163 name = 'RoutineControlPositiveResponse' 1164 fields_desc = [ 1165 ByteEnumField('routineControlType', 0, UDS_RC.routineControlTypes), 1166 XShortEnumField('routineIdentifier', 0, 1167 UDS_RC.routineControlIdentifiers), 1168 ] 1169 1170 def answers(self, other): 1171 return isinstance(other, UDS_RC) \ 1172 and other.routineControlType == self.routineControlType \ 1173 and other.routineIdentifier == self.routineIdentifier 1174 1175 1176bind_layers(UDS, UDS_RCPR, service=0x71) 1177 1178 1179# #########################RD################################### 1180class UDS_RD(Packet): 1181 dataFormatIdentifiers = ObservableDict({ 1182 0: 'noCompressionNoEncryption' 1183 }) 1184 name = 'RequestDownload' 1185 fields_desc = [ 1186 ByteEnumField('dataFormatIdentifier', 0, dataFormatIdentifiers), 1187 BitField('memorySizeLen', 0, 4), 1188 BitField('memoryAddressLen', 0, 4), 1189 ConditionalField(XByteField('memoryAddress1', 0), 1190 lambda pkt: pkt.memoryAddressLen == 1), 1191 ConditionalField(XShortField('memoryAddress2', 0), 1192 lambda pkt: pkt.memoryAddressLen == 2), 1193 ConditionalField(X3BytesField('memoryAddress3', 0), 1194 lambda pkt: pkt.memoryAddressLen == 3), 1195 ConditionalField(XIntField('memoryAddress4', 0), 1196 lambda pkt: pkt.memoryAddressLen == 4), 1197 ConditionalField(XByteField('memorySize1', 0), 1198 lambda pkt: pkt.memorySizeLen == 1), 1199 ConditionalField(XShortField('memorySize2', 0), 1200 lambda pkt: pkt.memorySizeLen == 2), 1201 ConditionalField(X3BytesField('memorySize3', 0), 1202 lambda pkt: pkt.memorySizeLen == 3), 1203 ConditionalField(XIntField('memorySize4', 0), 1204 lambda pkt: pkt.memorySizeLen == 4) 1205 ] 1206 1207 1208bind_layers(UDS, UDS_RD, service=0x34) 1209 1210 1211class UDS_RDPR(Packet): 1212 name = 'RequestDownloadPositiveResponse' 1213 fields_desc = [ 1214 BitField('memorySizeLen', 0, 4), 1215 BitField('reserved', 0, 4), 1216 StrField('maxNumberOfBlockLength', b"", fmt="B"), 1217 ] 1218 1219 def answers(self, other): 1220 return isinstance(other, UDS_RD) 1221 1222 1223bind_layers(UDS, UDS_RDPR, service=0x74) 1224 1225 1226# #########################RU################################### 1227class UDS_RU(Packet): 1228 name = 'RequestUpload' 1229 fields_desc = [ 1230 ByteEnumField('dataFormatIdentifier', 0, 1231 UDS_RD.dataFormatIdentifiers), 1232 BitField('memorySizeLen', 0, 4), 1233 BitField('memoryAddressLen', 0, 4), 1234 ConditionalField(XByteField('memoryAddress1', 0), 1235 lambda pkt: pkt.memoryAddressLen == 1), 1236 ConditionalField(XShortField('memoryAddress2', 0), 1237 lambda pkt: pkt.memoryAddressLen == 2), 1238 ConditionalField(X3BytesField('memoryAddress3', 0), 1239 lambda pkt: pkt.memoryAddressLen == 3), 1240 ConditionalField(XIntField('memoryAddress4', 0), 1241 lambda pkt: pkt.memoryAddressLen == 4), 1242 ConditionalField(XByteField('memorySize1', 0), 1243 lambda pkt: pkt.memorySizeLen == 1), 1244 ConditionalField(XShortField('memorySize2', 0), 1245 lambda pkt: pkt.memorySizeLen == 2), 1246 ConditionalField(X3BytesField('memorySize3', 0), 1247 lambda pkt: pkt.memorySizeLen == 3), 1248 ConditionalField(XIntField('memorySize4', 0), 1249 lambda pkt: pkt.memorySizeLen == 4) 1250 ] 1251 1252 1253bind_layers(UDS, UDS_RU, service=0x35) 1254 1255 1256class UDS_RUPR(Packet): 1257 name = 'RequestUploadPositiveResponse' 1258 fields_desc = [ 1259 BitField('memorySizeLen', 0, 4), 1260 BitField('reserved', 0, 4), 1261 StrField('maxNumberOfBlockLength', b"", fmt="B"), 1262 ] 1263 1264 def answers(self, other): 1265 return isinstance(other, UDS_RU) 1266 1267 1268bind_layers(UDS, UDS_RUPR, service=0x75) 1269 1270 1271# #########################TD################################### 1272class UDS_TD(Packet): 1273 name = 'TransferData' 1274 fields_desc = [ 1275 ByteField('blockSequenceCounter', 0), 1276 StrField('transferRequestParameterRecord', b"", fmt="B") 1277 ] 1278 1279 1280bind_layers(UDS, UDS_TD, service=0x36) 1281 1282 1283class UDS_TDPR(Packet): 1284 name = 'TransferDataPositiveResponse' 1285 fields_desc = [ 1286 ByteField('blockSequenceCounter', 0), 1287 StrField('transferResponseParameterRecord', b"", fmt="B") 1288 ] 1289 1290 def answers(self, other): 1291 return isinstance(other, UDS_TD) \ 1292 and other.blockSequenceCounter == self.blockSequenceCounter 1293 1294 1295bind_layers(UDS, UDS_TDPR, service=0x76) 1296 1297 1298# #########################RTE################################### 1299class UDS_RTE(Packet): 1300 name = 'RequestTransferExit' 1301 fields_desc = [ 1302 StrField('transferRequestParameterRecord', b"", fmt="B") 1303 ] 1304 1305 1306bind_layers(UDS, UDS_RTE, service=0x37) 1307 1308 1309class UDS_RTEPR(Packet): 1310 name = 'RequestTransferExitPositiveResponse' 1311 fields_desc = [ 1312 StrField('transferResponseParameterRecord', b"", fmt="B") 1313 ] 1314 1315 def answers(self, other): 1316 return isinstance(other, UDS_RTE) 1317 1318 1319bind_layers(UDS, UDS_RTEPR, service=0x77) 1320 1321 1322# #########################RFT################################### 1323class UDS_RFT(Packet): 1324 name = 'RequestFileTransfer' 1325 1326 modeOfOperations = { 1327 0x00: "ISO/SAE Reserved", 1328 0x01: "Add File", 1329 0x02: "Delete File", 1330 0x03: "Replace File", 1331 0x04: "Read File", 1332 0x05: "Read Directory" 1333 } 1334 1335 @staticmethod 1336 def _contains_file_size(packet): 1337 return packet.modeOfOperation not in [2, 4, 5] 1338 1339 fields_desc = [ 1340 XByteEnumField('modeOfOperation', 0, modeOfOperations), 1341 FieldLenField('filePathAndNameLength', None, 1342 length_of='filePathAndName', fmt='H'), 1343 StrLenField('filePathAndName', b"", 1344 length_from=lambda p: p.filePathAndNameLength), 1345 ConditionalField(BitField('compressionMethod', 0, 4), 1346 lambda p: p.modeOfOperation not in [2, 5]), 1347 ConditionalField(BitField('encryptingMethod', 0, 4), 1348 lambda p: p.modeOfOperation not in [2, 5]), 1349 ConditionalField(FieldLenField('fileSizeParameterLength', None, 1350 fmt="B", 1351 length_of='fileSizeUnCompressed'), 1352 lambda p: UDS_RFT._contains_file_size(p)), 1353 ConditionalField(StrLenField('fileSizeUnCompressed', b"", 1354 length_from=lambda p: 1355 p.fileSizeParameterLength), 1356 lambda p: UDS_RFT._contains_file_size(p)), 1357 ConditionalField(StrLenField('fileSizeCompressed', b"", 1358 length_from=lambda p: 1359 p.fileSizeParameterLength), 1360 lambda p: UDS_RFT._contains_file_size(p)) 1361 ] 1362 1363 1364bind_layers(UDS, UDS_RFT, service=0x38) 1365 1366 1367class UDS_RFTPR(Packet): 1368 name = 'RequestFileTransferPositiveResponse' 1369 1370 @staticmethod 1371 def _contains_data_format_identifier(packet): 1372 return packet.modeOfOperation != 0x02 1373 1374 fields_desc = [ 1375 XByteEnumField('modeOfOperation', 0, UDS_RFT.modeOfOperations), 1376 ConditionalField(FieldLenField('lengthFormatIdentifier', None, 1377 length_of='maxNumberOfBlockLength', 1378 fmt='B'), 1379 lambda p: p.modeOfOperation != 2), 1380 ConditionalField(StrLenField('maxNumberOfBlockLength', b"", 1381 length_from=lambda p: p.lengthFormatIdentifier), 1382 lambda p: p.modeOfOperation != 2), 1383 ConditionalField(BitField('compressionMethod', 0, 4), 1384 lambda p: p.modeOfOperation != 0x02), 1385 ConditionalField(BitField('encryptingMethod', 0, 4), 1386 lambda p: p.modeOfOperation != 0x02), 1387 ConditionalField(FieldLenField('fileSizeOrDirInfoParameterLength', 1388 None, 1389 length_of='fileSizeUncompressedOrDirInfoLength'), 1390 lambda p: p.modeOfOperation not in [1, 2, 3]), 1391 ConditionalField(StrLenField('fileSizeUncompressedOrDirInfoLength', 1392 b"", 1393 length_from=lambda p: 1394 p.fileSizeOrDirInfoParameterLength), 1395 lambda p: p.modeOfOperation not in [1, 2, 3]), 1396 ConditionalField(StrLenField('fileSizeCompressed', b"", 1397 length_from=lambda p: 1398 p.fileSizeOrDirInfoParameterLength), 1399 lambda p: p.modeOfOperation not in [1, 2, 3, 5]), 1400 ] 1401 1402 def answers(self, other): 1403 return isinstance(other, UDS_RFT) 1404 1405 1406bind_layers(UDS, UDS_RFTPR, service=0x78) 1407 1408 1409# #########################IOCBI################################### 1410class UDS_IOCBI(Packet): 1411 name = 'InputOutputControlByIdentifier' 1412 fields_desc = [ 1413 XShortEnumField('dataIdentifier', 0, UDS_RDBI.dataIdentifiers), 1414 ] 1415 1416 1417bind_layers(UDS, UDS_IOCBI, service=0x2F) 1418 1419 1420class UDS_IOCBIPR(Packet): 1421 name = 'InputOutputControlByIdentifierPositiveResponse' 1422 fields_desc = [ 1423 XShortEnumField('dataIdentifier', 0, UDS_RDBI.dataIdentifiers), 1424 ] 1425 1426 def answers(self, other): 1427 return isinstance(other, UDS_IOCBI) \ 1428 and other.dataIdentifier == self.dataIdentifier 1429 1430 1431bind_layers(UDS, UDS_IOCBIPR, service=0x6F) 1432 1433 1434# #########################NR################################### 1435class UDS_NR(Packet): 1436 negativeResponseCodes = { 1437 0x00: 'positiveResponse', 1438 0x10: 'generalReject', 1439 0x11: 'serviceNotSupported', 1440 0x12: 'subFunctionNotSupported', 1441 0x13: 'incorrectMessageLengthOrInvalidFormat', 1442 0x14: 'responseTooLong', 1443 0x20: 'ISOSAEReserved', 1444 0x21: 'busyRepeatRequest', 1445 0x22: 'conditionsNotCorrect', 1446 0x23: 'ISOSAEReserved', 1447 0x24: 'requestSequenceError', 1448 0x25: 'noResponseFromSubnetComponent', 1449 0x26: 'failurePreventsExecutionOfRequestedAction', 1450 0x31: 'requestOutOfRange', 1451 0x33: 'securityAccessDenied', 1452 0x34: 'authenticationRequired', 1453 0x35: 'invalidKey', 1454 0x36: 'exceedNumberOfAttempts', 1455 0x37: 'requiredTimeDelayNotExpired', 1456 0x3A: 'secureDataVerificationFailed', 1457 0x50: 'certificateVerificationFailedInvalidTimePeriod', 1458 0x51: 'certificateVerificationFailedInvalidSignature', 1459 0x52: 'certificateVerificationFailedInvalidChainOfTrust', 1460 0x53: 'certificateVerificationFailedInvalidType', 1461 0x54: 'certificateVerificationFailedInvalidFormat', 1462 0x55: 'certificateVerificationFailedInvalidContent', 1463 0x56: 'certificateVerificationFailedInvalidScope', 1464 0x57: 'certificateVerificationFailedInvalidCertificateRevoked', 1465 0x58: 'ownershipVerificationFailed', 1466 0x59: 'challengeCalculationFailed', 1467 0x5a: 'settingAccessRightsFailed', 1468 0x5b: 'sessionKeyCreationOrDerivationFailed', 1469 0x5c: 'configurationDataUsageFailed', 1470 0x5d: 'deAuthenticationFailed', 1471 0x70: 'uploadDownloadNotAccepted', 1472 0x71: 'transferDataSuspended', 1473 0x72: 'generalProgrammingFailure', 1474 0x73: 'wrongBlockSequenceCounter', 1475 0x78: 'requestCorrectlyReceived-ResponsePending', 1476 0x7E: 'subFunctionNotSupportedInActiveSession', 1477 0x7F: 'serviceNotSupportedInActiveSession', 1478 0x80: 'ISOSAEReserved', 1479 0x81: 'rpmTooHigh', 1480 0x82: 'rpmTooLow', 1481 0x83: 'engineIsRunning', 1482 0x84: 'engineIsNotRunning', 1483 0x85: 'engineRunTimeTooLow', 1484 0x86: 'temperatureTooHigh', 1485 0x87: 'temperatureTooLow', 1486 0x88: 'vehicleSpeedTooHigh', 1487 0x89: 'vehicleSpeedTooLow', 1488 0x8a: 'throttle/PedalTooHigh', 1489 0x8b: 'throttle/PedalTooLow', 1490 0x8c: 'transmissionRangeNotInNeutral', 1491 0x8d: 'transmissionRangeNotInGear', 1492 0x8e: 'ISOSAEReserved', 1493 0x8f: 'brakeSwitch(es)NotClosed', 1494 0x90: 'shifterLeverNotInPark', 1495 0x91: 'torqueConverterClutchLocked', 1496 0x92: 'voltageTooHigh', 1497 0x93: 'voltageTooLow', 1498 } 1499 name = 'NegativeResponse' 1500 fields_desc = [ 1501 XByteEnumField('requestServiceId', 0, UDS.services), 1502 ByteEnumField('negativeResponseCode', 0, negativeResponseCodes) 1503 ] 1504 1505 def answers(self, other): 1506 return self.requestServiceId == other.service and \ 1507 (self.negativeResponseCode != 0x78 or 1508 conf.contribs['UDS']['treat-response-pending-as-answer']) 1509 1510 1511bind_layers(UDS, UDS_NR, service=0x7f) 1512 1513 1514# ################################################################## 1515# ######################## UTILS ################################### 1516# ################################################################## 1517 1518 1519class UDS_TesterPresentSender(PeriodicSenderThread): 1520 def __init__(self, sock, pkt=UDS() / UDS_TP(subFunction=0x80), interval=2): 1521 """ Thread to send TesterPresent messages packets periodically 1522 1523 Args: 1524 sock: socket where packet is sent periodically 1525 pkt: packet to send 1526 interval: interval between two packets 1527 """ 1528 PeriodicSenderThread.__init__(self, sock, pkt, interval) 1529